Merge "Simplify logic around option lists in port set"

This commit is contained in:
Jenkins 2017-03-27 17:33:23 +00:00 committed by Gerrit Code Review
commit dd81ca0692
3 changed files with 193 additions and 137 deletions

View File

@ -114,8 +114,6 @@ def _get_attrs(client_manager, parsed_args):
))
if parsed_args.description is not None:
attrs['description'] = parsed_args.description
if parsed_args.fixed_ip is not None:
attrs['fixed_ips'] = parsed_args.fixed_ip
if parsed_args.device:
attrs['device_id'] = parsed_args.device
if parsed_args.device_owner is not None:
@ -124,8 +122,6 @@ def _get_attrs(client_manager, parsed_args):
attrs['admin_state_up'] = True
if parsed_args.disable:
attrs['admin_state_up'] = False
if parsed_args.binding_profile is not None:
attrs['binding:profile'] = parsed_args.binding_profile
if parsed_args.vnic_type is not None:
attrs['binding:vnic_type'] = parsed_args.vnic_type
if parsed_args.host:
@ -389,6 +385,12 @@ class CreatePort(command.ShowOne):
_prepare_fixed_ips(self.app.client_manager, parsed_args)
attrs = _get_attrs(self.app.client_manager, parsed_args)
if parsed_args.binding_profile is not None:
attrs['binding:profile'] = parsed_args.binding_profile
if parsed_args.fixed_ip:
attrs['fixed_ips'] = parsed_args.fixed_ip
if parsed_args.security_group:
attrs['security_group_ids'] = [client.find_security_group(
sg, ignore_missing=False).id
@ -396,6 +398,7 @@ class CreatePort(command.ShowOne):
parsed_args.security_group]
elif parsed_args.no_security_group:
attrs['security_group_ids'] = []
if parsed_args.allowed_address_pairs:
attrs['allowed_address_pairs'] = (
_convert_address_pairs(parsed_args))
@ -674,48 +677,50 @@ class SetPort(command.Command):
_prepare_fixed_ips(self.app.client_manager, parsed_args)
attrs = _get_attrs(self.app.client_manager, parsed_args)
obj = client.find_port(parsed_args.port, ignore_missing=False)
if 'binding:profile' in attrs:
# Do not modify attrs if both binding_profile/no_binding given
if not parsed_args.no_binding_profile:
tmp_binding_profile = copy.deepcopy(obj.binding_profile)
tmp_binding_profile.update(attrs['binding:profile'])
attrs['binding:profile'] = tmp_binding_profile
elif parsed_args.no_binding_profile:
if parsed_args.no_binding_profile:
attrs['binding:profile'] = {}
if 'fixed_ips' in attrs:
# When user unsets the fixed_ips, obj.fixed_ips = [{}].
# Adding the obj.fixed_ips list to attrs['fixed_ips']
# would therefore add an empty dictionary, while we need
# to append the attrs['fixed_ips'] iff there is some info
# in the obj.fixed_ips. Therefore I have opted for this `for` loop
# Do not modify attrs if fixed_ip/no_fixed_ip given
if not parsed_args.no_fixed_ip:
attrs['fixed_ips'] += [ip for ip in obj.fixed_ips if ip]
elif parsed_args.no_fixed_ip:
if parsed_args.binding_profile:
if 'binding:profile' not in attrs:
attrs['binding:profile'] = copy.deepcopy(obj.binding_profile)
attrs['binding:profile'].update(parsed_args.binding_profile)
if parsed_args.no_fixed_ip:
attrs['fixed_ips'] = []
if parsed_args.fixed_ip:
if 'fixed_ips' not in attrs:
# obj.fixed_ips = [{}] if no fixed IPs are set.
# Only append this to attrs['fixed_ips'] if actual fixed
# IPs are present to avoid adding an empty dict.
attrs['fixed_ips'] = [ip for ip in obj.fixed_ips if ip]
attrs['fixed_ips'].extend(parsed_args.fixed_ip)
if parsed_args.security_group:
attrs['security_group_ids'] = [
client.find_security_group(sg, ignore_missing=False).id for
sg in parsed_args.security_group]
if not parsed_args.no_security_group:
attrs['security_group_ids'] += obj.security_group_ids
elif parsed_args.no_security_group:
if parsed_args.no_security_group:
attrs['security_group_ids'] = []
if parsed_args.security_group:
if 'security_group_ids' not in attrs:
# NOTE(dtroyer): Get existing security groups, iterate the
# list to force a new list object to be
# created and make sure the SDK Resource
# marks the attribute 'dirty'.
attrs['security_group_ids'] = [
id for id in obj.security_group_ids
]
attrs['security_group_ids'].extend(
client.find_security_group(sg, ignore_missing=False).id
for sg in parsed_args.security_group
)
if (parsed_args.allowed_address_pairs and
parsed_args.no_allowed_address_pair):
attrs['allowed_address_pairs'] = (
_convert_address_pairs(parsed_args))
elif parsed_args.allowed_address_pairs:
attrs['allowed_address_pairs'] = (
[addr for addr in obj.allowed_address_pairs if addr] +
_convert_address_pairs(parsed_args))
elif parsed_args.no_allowed_address_pair:
if parsed_args.no_allowed_address_pair:
attrs['allowed_address_pairs'] = []
if parsed_args.allowed_address_pairs:
if 'allowed_address_pairs' not in attrs:
attrs['allowed_address_pairs'] = (
[addr for addr in obj.allowed_address_pairs if addr]
)
attrs['allowed_address_pairs'].extend(
_convert_address_pairs(parsed_args)
)
client.update_port(obj, **attrs)

View File

@ -20,7 +20,6 @@ class PortTests(base.TestCase):
"""Functional tests for port. """
NAME = uuid.uuid4().hex
NETWORK_NAME = uuid.uuid4().hex
SG_NAME = uuid.uuid4().hex
@classmethod
def setUpClass(cls):
@ -112,29 +111,32 @@ class PortTests(base.TestCase):
def test_port_set(self):
"""Test create, set, show, delete"""
name = uuid.uuid4().hex
json_output = json.loads(self.openstack(
'port create -f json ' +
'--network ' + self.NETWORK_NAME + ' ' +
'--description xyzpdq '
'--description xyzpdq ' +
'--disable ' +
self.NAME
name
))
id1 = json_output.get('id')
self.addCleanup(self.openstack, 'port delete ' + id1)
self.assertEqual(self.NAME, json_output.get('name'))
self.assertEqual(name, json_output.get('name'))
self.assertEqual('xyzpdq', json_output.get('description'))
self.assertEqual('DOWN', json_output.get('admin_state_up'))
raw_output = self.openstack(
'port set ' + '--enable ' + self.NAME)
'port set ' + '--enable ' +
name
)
self.assertOutput('', raw_output)
json_output = json.loads(self.openstack(
'port show -f json ' + self.NAME
'port show -f json ' + name
))
sg_id = json_output.get('security_group_ids')
self.assertEqual(self.NAME, json_output.get('name'))
self.assertEqual(name, json_output.get('name'))
self.assertEqual('xyzpdq', json_output.get('description'))
self.assertEqual('UP', json_output.get('admin_state_up'))
self.assertIsNotNone(json_output.get('mac_address'))
@ -144,7 +146,7 @@ class PortTests(base.TestCase):
self.assertOutput('', raw_output)
json_output = json.loads(self.openstack(
'port show -f json ' + self.NAME
'port show -f json ' + name
))
self.assertEqual('', json_output.get('security_group_ids'))
@ -166,3 +168,68 @@ class PortTests(base.TestCase):
'port show -f json ' + self.NAME
))
self.assertEqual(json_output.get('mac_address'), '11:22:33:44:55:66')
def test_port_set_sg(self):
"""Test create, set, show, delete"""
sg_name1 = uuid.uuid4().hex
json_output = json.loads(self.openstack(
'security group create -f json ' +
sg_name1
))
sg_id1 = json_output.get('id')
self.addCleanup(self.openstack, 'security group delete ' + sg_id1)
sg_name2 = uuid.uuid4().hex
json_output = json.loads(self.openstack(
'security group create -f json ' +
sg_name2
))
sg_id2 = json_output.get('id')
self.addCleanup(self.openstack, 'security group delete ' + sg_id2)
name = uuid.uuid4().hex
json_output = json.loads(self.openstack(
'port create -f json ' +
'--network ' + self.NETWORK_NAME + ' ' +
'--security-group ' + sg_name1 + ' ' +
name
))
id1 = json_output.get('id')
self.addCleanup(self.openstack, 'port delete ' + id1)
self.assertEqual(name, json_output.get('name'))
self.assertEqual(sg_id1, json_output.get('security_group_ids'))
raw_output = self.openstack(
'port set ' +
'--security-group ' + sg_name2 + ' ' +
name
)
self.assertOutput('', raw_output)
json_output = json.loads(self.openstack(
'port show -f json ' + name
))
self.assertEqual(name, json_output.get('name'))
self.assertIn(
# TODO(dtroyer): output formatters should not mess with JSON!
sg_id1,
json_output.get('security_group_ids'),
)
self.assertIn(
# TODO(dtroyer): output formatters should not mess with JSON!
sg_id2,
json_output.get('security_group_ids'),
)
raw_output = self.openstack(
'port unset --security-group ' + sg_id1 + ' ' + id1)
self.assertOutput('', raw_output)
json_output = json.loads(self.openstack(
'port show -f json ' + name
))
self.assertEqual(
# TODO(dtroyer): output formatters should do this on JSON!
sg_id2,
json_output.get('security_group_ids'),
)

View File

@ -883,27 +883,69 @@ class TestSetPort(TestPort):
# Get the command object to test
self.cmd = port.SetPort(self.app, self.namespace)
def test_set_fixed_ip(self):
def test_set_port_defaults(self):
arglist = [
'--fixed-ip', 'ip-address=10.0.0.11',
self._port.name,
'--no-fixed-ip',
]
verifylist = [
('fixed_ip', [{'ip-address': '10.0.0.11'}]),
('port', self._port.name),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
attrs = {
'fixed_ips': [{'ip_address': '10.0.0.11'}],
}
result = self.cmd.take_action(parsed_args)
attrs = {}
self.network.update_port.assert_called_once_with(self._port, **attrs)
self.assertIsNone(result)
def test_set_dns_name(self):
def test_set_port_fixed_ip(self):
_testport = network_fakes.FakePort.create_one_port(
{'fixed_ips': [{'ip_address': '0.0.0.1'}]})
self.network.find_port = mock.Mock(return_value=_testport)
arglist = [
'--fixed-ip', 'ip-address=10.0.0.12',
_testport.name,
]
verifylist = [
('fixed_ip', [{'ip-address': '10.0.0.12'}]),
('port', _testport.name),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
attrs = {
'fixed_ips': [
{'ip_address': '0.0.0.1'},
{'ip_address': '10.0.0.12'},
],
}
self.network.update_port.assert_called_once_with(_testport, **attrs)
self.assertIsNone(result)
def test_set_port_fixed_ip_clear(self):
_testport = network_fakes.FakePort.create_one_port(
{'fixed_ips': [{'ip_address': '0.0.0.1'}]})
self.network.find_port = mock.Mock(return_value=_testport)
arglist = [
'--fixed-ip', 'ip-address=10.0.0.12',
'--no-fixed-ip',
_testport.name,
]
verifylist = [
('fixed_ip', [{'ip-address': '10.0.0.12'}]),
('no_fixed_ip', True)
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
attrs = {
'fixed_ips': [
{'ip_address': '10.0.0.12'},
],
}
self.network.update_port.assert_called_once_with(_testport, **attrs)
self.assertIsNone(result)
def test_set_port_dns_name(self):
arglist = [
'--dns-name', '8.8.8.8',
self._port.name,
@ -922,28 +964,7 @@ class TestSetPort(TestPort):
self.network.update_port.assert_called_once_with(self._port, **attrs)
self.assertIsNone(result)
def test_append_fixed_ip(self):
_testport = network_fakes.FakePort.create_one_port(
{'fixed_ips': [{'ip_address': '0.0.0.1'}]})
self.network.find_port = mock.Mock(return_value=_testport)
arglist = [
'--fixed-ip', 'ip-address=10.0.0.12',
_testport.name,
]
verifylist = [
('fixed_ip', [{'ip-address': '10.0.0.12'}]),
('port', _testport.name),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
attrs = {
'fixed_ips': [
{'ip_address': '10.0.0.12'}, {'ip_address': '0.0.0.1'}],
}
self.network.update_port.assert_called_once_with(_testport, **attrs)
self.assertIsNone(result)
def test_overwrite_binding_profile(self):
def test_set_port_overwrite_binding_profile(self):
_testport = network_fakes.FakePort.create_one_port(
{'binding_profile': {'lok_i': 'visi_on'}})
self.network.find_port = mock.Mock(return_value=_testport)
@ -965,28 +986,6 @@ class TestSetPort(TestPort):
self.network.update_port.assert_called_once_with(_testport, **attrs)
self.assertIsNone(result)
def test_overwrite_fixed_ip(self):
_testport = network_fakes.FakePort.create_one_port(
{'fixed_ips': [{'ip_address': '0.0.0.1'}]})
self.network.find_port = mock.Mock(return_value=_testport)
arglist = [
'--fixed-ip', 'ip-address=10.0.0.12',
'--no-fixed-ip',
_testport.name,
]
verifylist = [
('fixed_ip', [{'ip-address': '10.0.0.12'}]),
('no_fixed_ip', True)
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
attrs = {
'fixed_ips': [
{'ip_address': '10.0.0.12'}],
}
self.network.update_port.assert_called_once_with(_testport, **attrs)
self.assertIsNone(result)
def test_overwrite_mac_address(self):
_testport = network_fakes.FakePort.create_one_port(
{'mac_address': '11:22:33:44:55:66'})
@ -1006,7 +1005,7 @@ class TestSetPort(TestPort):
self.network.update_port.assert_called_once_with(_testport, **attrs)
self.assertIsNone(result)
def test_set_this(self):
def test_set_port_this(self):
arglist = [
'--disable',
'--no-fixed-ip',
@ -1031,7 +1030,7 @@ class TestSetPort(TestPort):
self.network.update_port.assert_called_once_with(self._port, **attrs)
self.assertIsNone(result)
def test_set_that(self):
def test_set_port_that(self):
arglist = [
'--description', 'newDescription',
'--enable',
@ -1065,22 +1064,7 @@ class TestSetPort(TestPort):
self.network.update_port.assert_called_once_with(self._port, **attrs)
self.assertIsNone(result)
def test_set_nothing(self):
arglist = [
self._port.name,
]
verifylist = [
('port', self._port.name),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
attrs = {}
self.network.update_port.assert_called_once_with(self._port, **attrs)
self.assertIsNone(result)
def test_set_invalid_json_binding_profile(self):
def test_set_port_invalid_json_binding_profile(self):
arglist = [
'--binding-profile', '{"parent_name"}',
'test-port',
@ -1091,7 +1075,7 @@ class TestSetPort(TestPort):
arglist,
None)
def test_set_invalid_key_value_binding_profile(self):
def test_set_port_invalid_key_value_binding_profile(self):
arglist = [
'--binding-profile', 'key',
'test-port',
@ -1102,7 +1086,7 @@ class TestSetPort(TestPort):
arglist,
None)
def test_set_mixed_binding_profile(self):
def test_set_port_mixed_binding_profile(self):
arglist = [
'--binding-profile', 'foo=bar',
'--binding-profile', '{"foo2": "bar2"}',
@ -1122,7 +1106,7 @@ class TestSetPort(TestPort):
self.network.update_port.assert_called_once_with(self._port, **attrs)
self.assertIsNone(result)
def test_set_security_group(self):
def test_set_port_security_group(self):
sg = network_fakes.FakeSecurityGroup.create_one_security_group()
self.network.find_security_group = mock.Mock(return_value=sg)
arglist = [
@ -1133,17 +1117,16 @@ class TestSetPort(TestPort):
('security_group', [sg.id]),
('port', self._port.name),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
result = self.cmd.take_action(parsed_args)
attrs = {
'security_group_ids': [sg.id],
}
self.network.update_port.assert_called_once_with(self._port, **attrs)
self.assertIsNone(result)
def test_append_security_group(self):
def test_set_port_security_group_append(self):
sg_1 = network_fakes.FakeSecurityGroup.create_one_security_group()
sg_2 = network_fakes.FakeSecurityGroup.create_one_security_group()
sg_3 = network_fakes.FakeSecurityGroup.create_one_security_group()
@ -1161,14 +1144,15 @@ class TestSetPort(TestPort):
('port', _testport.name),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
attrs = {
'security_group_ids': [sg_2.id, sg_3.id, sg_1.id],
'security_group_ids': [sg_1.id, sg_2.id, sg_3.id],
}
self.network.update_port.assert_called_once_with(_testport, **attrs)
self.assertIsNone(result)
def test_set_no_security_groups(self):
def test_set_port_security_group_clear(self):
arglist = [
'--no-security-group',
self._port.name,
@ -1177,17 +1161,16 @@ class TestSetPort(TestPort):
('no_security_group', True),
('port', self._port.name),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
result = self.cmd.take_action(parsed_args)
attrs = {
'security_group_ids': [],
}
self.network.update_port.assert_called_once_with(self._port, **attrs)
self.assertIsNone(result)
def test_overwrite_security_group(self):
def test_set_port_security_group_replace(self):
sg1 = network_fakes.FakeSecurityGroup.create_one_security_group()
sg2 = network_fakes.FakeSecurityGroup.create_one_security_group()
_testport = network_fakes.FakePort.create_one_port(
@ -1204,6 +1187,7 @@ class TestSetPort(TestPort):
('no_security_group', True)
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
attrs = {
'security_group_ids': [sg2.id],
@ -1211,7 +1195,7 @@ class TestSetPort(TestPort):
self.network.update_port.assert_called_once_with(_testport, **attrs)
self.assertIsNone(result)
def test_set_allowed_address_pair(self):
def test_set_port_allowed_address_pair(self):
arglist = [
'--allowed-address', 'ip-address=192.168.1.123',
self._port.name,
@ -1230,7 +1214,7 @@ class TestSetPort(TestPort):
self.network.update_port.assert_called_once_with(self._port, **attrs)
self.assertIsNone(result)
def test_append_allowed_address_pair(self):
def test_set_port_append_allowed_address_pair(self):
_testport = network_fakes.FakePort.create_one_port(
{'allowed_address_pairs': [{'ip_address': '192.168.1.123'}]})
self.network.find_port = mock.Mock(return_value=_testport)
@ -1253,7 +1237,7 @@ class TestSetPort(TestPort):
self.network.update_port.assert_called_once_with(_testport, **attrs)
self.assertIsNone(result)
def test_overwrite_allowed_address_pair(self):
def test_set_port_overwrite_allowed_address_pair(self):
_testport = network_fakes.FakePort.create_one_port(
{'allowed_address_pairs': [{'ip_address': '192.168.1.123'}]})
self.network.find_port = mock.Mock(return_value=_testport)
@ -1277,7 +1261,7 @@ class TestSetPort(TestPort):
self.network.update_port.assert_called_once_with(_testport, **attrs)
self.assertIsNone(result)
def test_set_no_allowed_address_pairs(self):
def test_set_port_no_allowed_address_pairs(self):
arglist = [
'--no-allowed-address',
self._port.name,
@ -1296,7 +1280,7 @@ class TestSetPort(TestPort):
self.network.update_port.assert_called_once_with(self._port, **attrs)
self.assertIsNone(result)
def test_port_security_enabled(self):
def test_set_port_security_enabled(self):
arglist = [
'--enable-port-security',
self._port.id,
@ -1314,7 +1298,7 @@ class TestSetPort(TestPort):
'port_security_enabled': True,
})
def test_port_security_disabled(self):
def test_set_port_security_disabled(self):
arglist = [
'--disable-port-security',
self._port.id,