Simplify logic around option lists in port set

Use a common pattern to handle option pairs --XYZ and --no-XYZ for managing
lists of attributes. This pattern looks at the presence of the option
in parsed_args first and branches as necessary.

Some specific steps are included for the SDK Network resources to reliably
set the 'dirty' flag for changed attributes via one or both of the following:
* iterate over lists of original resource attributes to force the creation
  of a new list object
* use [].extend() rather than += to add to the existing list (substitute
  {}.update() for dicts)

Change-Id: I0c3f9a52ffe1ae2b5b230cb13d6376dd9131aaf9
This commit is contained in:
Dean Troyer 2017-02-28 10:27:02 -06:00
parent 3e621c9a9c
commit 82a86d2d58
3 changed files with 193 additions and 137 deletions

View File

@ -114,8 +114,6 @@ def _get_attrs(client_manager, parsed_args):
))
if parsed_args.description is not None:
attrs['description'] = parsed_args.description
if parsed_args.fixed_ip is not None:
attrs['fixed_ips'] = parsed_args.fixed_ip
if parsed_args.device:
attrs['device_id'] = parsed_args.device
if parsed_args.device_owner is not None:
@ -124,8 +122,6 @@ def _get_attrs(client_manager, parsed_args):
attrs['admin_state_up'] = True
if parsed_args.disable:
attrs['admin_state_up'] = False
if parsed_args.binding_profile is not None:
attrs['binding:profile'] = parsed_args.binding_profile
if parsed_args.vnic_type is not None:
attrs['binding:vnic_type'] = parsed_args.vnic_type
if parsed_args.host:
@ -389,6 +385,12 @@ class CreatePort(command.ShowOne):
_prepare_fixed_ips(self.app.client_manager, parsed_args)
attrs = _get_attrs(self.app.client_manager, parsed_args)
if parsed_args.binding_profile is not None:
attrs['binding:profile'] = parsed_args.binding_profile
if parsed_args.fixed_ip:
attrs['fixed_ips'] = parsed_args.fixed_ip
if parsed_args.security_group:
attrs['security_group_ids'] = [client.find_security_group(
sg, ignore_missing=False).id
@ -396,6 +398,7 @@ class CreatePort(command.ShowOne):
parsed_args.security_group]
elif parsed_args.no_security_group:
attrs['security_group_ids'] = []
if parsed_args.allowed_address_pairs:
attrs['allowed_address_pairs'] = (
_convert_address_pairs(parsed_args))
@ -674,48 +677,50 @@ class SetPort(command.Command):
_prepare_fixed_ips(self.app.client_manager, parsed_args)
attrs = _get_attrs(self.app.client_manager, parsed_args)
obj = client.find_port(parsed_args.port, ignore_missing=False)
if 'binding:profile' in attrs:
# Do not modify attrs if both binding_profile/no_binding given
if not parsed_args.no_binding_profile:
tmp_binding_profile = copy.deepcopy(obj.binding_profile)
tmp_binding_profile.update(attrs['binding:profile'])
attrs['binding:profile'] = tmp_binding_profile
elif parsed_args.no_binding_profile:
if parsed_args.no_binding_profile:
attrs['binding:profile'] = {}
if 'fixed_ips' in attrs:
# When user unsets the fixed_ips, obj.fixed_ips = [{}].
# Adding the obj.fixed_ips list to attrs['fixed_ips']
# would therefore add an empty dictionary, while we need
# to append the attrs['fixed_ips'] iff there is some info
# in the obj.fixed_ips. Therefore I have opted for this `for` loop
# Do not modify attrs if fixed_ip/no_fixed_ip given
if not parsed_args.no_fixed_ip:
attrs['fixed_ips'] += [ip for ip in obj.fixed_ips if ip]
elif parsed_args.no_fixed_ip:
if parsed_args.binding_profile:
if 'binding:profile' not in attrs:
attrs['binding:profile'] = copy.deepcopy(obj.binding_profile)
attrs['binding:profile'].update(parsed_args.binding_profile)
if parsed_args.no_fixed_ip:
attrs['fixed_ips'] = []
if parsed_args.fixed_ip:
if 'fixed_ips' not in attrs:
# obj.fixed_ips = [{}] if no fixed IPs are set.
# Only append this to attrs['fixed_ips'] if actual fixed
# IPs are present to avoid adding an empty dict.
attrs['fixed_ips'] = [ip for ip in obj.fixed_ips if ip]
attrs['fixed_ips'].extend(parsed_args.fixed_ip)
if parsed_args.security_group:
attrs['security_group_ids'] = [
client.find_security_group(sg, ignore_missing=False).id for
sg in parsed_args.security_group]
if not parsed_args.no_security_group:
attrs['security_group_ids'] += obj.security_group_ids
elif parsed_args.no_security_group:
if parsed_args.no_security_group:
attrs['security_group_ids'] = []
if parsed_args.security_group:
if 'security_group_ids' not in attrs:
# NOTE(dtroyer): Get existing security groups, iterate the
# list to force a new list object to be
# created and make sure the SDK Resource
# marks the attribute 'dirty'.
attrs['security_group_ids'] = [
id for id in obj.security_group_ids
]
attrs['security_group_ids'].extend(
client.find_security_group(sg, ignore_missing=False).id
for sg in parsed_args.security_group
)
if (parsed_args.allowed_address_pairs and
parsed_args.no_allowed_address_pair):
attrs['allowed_address_pairs'] = (
_convert_address_pairs(parsed_args))
elif parsed_args.allowed_address_pairs:
attrs['allowed_address_pairs'] = (
[addr for addr in obj.allowed_address_pairs if addr] +
_convert_address_pairs(parsed_args))
elif parsed_args.no_allowed_address_pair:
if parsed_args.no_allowed_address_pair:
attrs['allowed_address_pairs'] = []
if parsed_args.allowed_address_pairs:
if 'allowed_address_pairs' not in attrs:
attrs['allowed_address_pairs'] = (
[addr for addr in obj.allowed_address_pairs if addr]
)
attrs['allowed_address_pairs'].extend(
_convert_address_pairs(parsed_args)
)
client.update_port(obj, **attrs)

View File

@ -20,7 +20,6 @@ class PortTests(base.TestCase):
"""Functional tests for port. """
NAME = uuid.uuid4().hex
NETWORK_NAME = uuid.uuid4().hex
SG_NAME = uuid.uuid4().hex
@classmethod
def setUpClass(cls):
@ -112,29 +111,32 @@ class PortTests(base.TestCase):
def test_port_set(self):
"""Test create, set, show, delete"""
name = uuid.uuid4().hex
json_output = json.loads(self.openstack(
'port create -f json ' +
'--network ' + self.NETWORK_NAME + ' ' +
'--description xyzpdq '
'--description xyzpdq ' +
'--disable ' +
self.NAME
name
))
id1 = json_output.get('id')
self.addCleanup(self.openstack, 'port delete ' + id1)
self.assertEqual(self.NAME, json_output.get('name'))
self.assertEqual(name, json_output.get('name'))
self.assertEqual('xyzpdq', json_output.get('description'))
self.assertEqual('DOWN', json_output.get('admin_state_up'))
raw_output = self.openstack(
'port set ' + '--enable ' + self.NAME)
'port set ' + '--enable ' +
name
)
self.assertOutput('', raw_output)
json_output = json.loads(self.openstack(
'port show -f json ' + self.NAME
'port show -f json ' + name
))
sg_id = json_output.get('security_group_ids')
self.assertEqual(self.NAME, json_output.get('name'))
self.assertEqual(name, json_output.get('name'))
self.assertEqual('xyzpdq', json_output.get('description'))
self.assertEqual('UP', json_output.get('admin_state_up'))
self.assertIsNotNone(json_output.get('mac_address'))
@ -144,7 +146,7 @@ class PortTests(base.TestCase):
self.assertOutput('', raw_output)
json_output = json.loads(self.openstack(
'port show -f json ' + self.NAME
'port show -f json ' + name
))
self.assertEqual('', json_output.get('security_group_ids'))
@ -166,3 +168,68 @@ class PortTests(base.TestCase):
'port show -f json ' + self.NAME
))
self.assertEqual(json_output.get('mac_address'), '11:22:33:44:55:66')
def test_port_set_sg(self):
"""Test create, set, show, delete"""
sg_name1 = uuid.uuid4().hex
json_output = json.loads(self.openstack(
'security group create -f json ' +
sg_name1
))
sg_id1 = json_output.get('id')
self.addCleanup(self.openstack, 'security group delete ' + sg_id1)
sg_name2 = uuid.uuid4().hex
json_output = json.loads(self.openstack(
'security group create -f json ' +
sg_name2
))
sg_id2 = json_output.get('id')
self.addCleanup(self.openstack, 'security group delete ' + sg_id2)
name = uuid.uuid4().hex
json_output = json.loads(self.openstack(
'port create -f json ' +
'--network ' + self.NETWORK_NAME + ' ' +
'--security-group ' + sg_name1 + ' ' +
name
))
id1 = json_output.get('id')
self.addCleanup(self.openstack, 'port delete ' + id1)
self.assertEqual(name, json_output.get('name'))
self.assertEqual(sg_id1, json_output.get('security_group_ids'))
raw_output = self.openstack(
'port set ' +
'--security-group ' + sg_name2 + ' ' +
name
)
self.assertOutput('', raw_output)
json_output = json.loads(self.openstack(
'port show -f json ' + name
))
self.assertEqual(name, json_output.get('name'))
self.assertIn(
# TODO(dtroyer): output formatters should not mess with JSON!
sg_id1,
json_output.get('security_group_ids'),
)
self.assertIn(
# TODO(dtroyer): output formatters should not mess with JSON!
sg_id2,
json_output.get('security_group_ids'),
)
raw_output = self.openstack(
'port unset --security-group ' + sg_id1 + ' ' + id1)
self.assertOutput('', raw_output)
json_output = json.loads(self.openstack(
'port show -f json ' + name
))
self.assertEqual(
# TODO(dtroyer): output formatters should do this on JSON!
sg_id2,
json_output.get('security_group_ids'),
)

View File

@ -883,27 +883,69 @@ class TestSetPort(TestPort):
# Get the command object to test
self.cmd = port.SetPort(self.app, self.namespace)
def test_set_fixed_ip(self):
def test_set_port_defaults(self):
arglist = [
'--fixed-ip', 'ip-address=10.0.0.11',
self._port.name,
'--no-fixed-ip',
]
verifylist = [
('fixed_ip', [{'ip-address': '10.0.0.11'}]),
('port', self._port.name),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
attrs = {
'fixed_ips': [{'ip_address': '10.0.0.11'}],
}
result = self.cmd.take_action(parsed_args)
attrs = {}
self.network.update_port.assert_called_once_with(self._port, **attrs)
self.assertIsNone(result)
def test_set_dns_name(self):
def test_set_port_fixed_ip(self):
_testport = network_fakes.FakePort.create_one_port(
{'fixed_ips': [{'ip_address': '0.0.0.1'}]})
self.network.find_port = mock.Mock(return_value=_testport)
arglist = [
'--fixed-ip', 'ip-address=10.0.0.12',
_testport.name,
]
verifylist = [
('fixed_ip', [{'ip-address': '10.0.0.12'}]),
('port', _testport.name),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
attrs = {
'fixed_ips': [
{'ip_address': '0.0.0.1'},
{'ip_address': '10.0.0.12'},
],
}
self.network.update_port.assert_called_once_with(_testport, **attrs)
self.assertIsNone(result)
def test_set_port_fixed_ip_clear(self):
_testport = network_fakes.FakePort.create_one_port(
{'fixed_ips': [{'ip_address': '0.0.0.1'}]})
self.network.find_port = mock.Mock(return_value=_testport)
arglist = [
'--fixed-ip', 'ip-address=10.0.0.12',
'--no-fixed-ip',
_testport.name,
]
verifylist = [
('fixed_ip', [{'ip-address': '10.0.0.12'}]),
('no_fixed_ip', True)
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
attrs = {
'fixed_ips': [
{'ip_address': '10.0.0.12'},
],
}
self.network.update_port.assert_called_once_with(_testport, **attrs)
self.assertIsNone(result)
def test_set_port_dns_name(self):
arglist = [
'--dns-name', '8.8.8.8',
self._port.name,
@ -922,28 +964,7 @@ class TestSetPort(TestPort):
self.network.update_port.assert_called_once_with(self._port, **attrs)
self.assertIsNone(result)
def test_append_fixed_ip(self):
_testport = network_fakes.FakePort.create_one_port(
{'fixed_ips': [{'ip_address': '0.0.0.1'}]})
self.network.find_port = mock.Mock(return_value=_testport)
arglist = [
'--fixed-ip', 'ip-address=10.0.0.12',
_testport.name,
]
verifylist = [
('fixed_ip', [{'ip-address': '10.0.0.12'}]),
('port', _testport.name),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
attrs = {
'fixed_ips': [
{'ip_address': '10.0.0.12'}, {'ip_address': '0.0.0.1'}],
}
self.network.update_port.assert_called_once_with(_testport, **attrs)
self.assertIsNone(result)
def test_overwrite_binding_profile(self):
def test_set_port_overwrite_binding_profile(self):
_testport = network_fakes.FakePort.create_one_port(
{'binding_profile': {'lok_i': 'visi_on'}})
self.network.find_port = mock.Mock(return_value=_testport)
@ -965,28 +986,6 @@ class TestSetPort(TestPort):
self.network.update_port.assert_called_once_with(_testport, **attrs)
self.assertIsNone(result)
def test_overwrite_fixed_ip(self):
_testport = network_fakes.FakePort.create_one_port(
{'fixed_ips': [{'ip_address': '0.0.0.1'}]})
self.network.find_port = mock.Mock(return_value=_testport)
arglist = [
'--fixed-ip', 'ip-address=10.0.0.12',
'--no-fixed-ip',
_testport.name,
]
verifylist = [
('fixed_ip', [{'ip-address': '10.0.0.12'}]),
('no_fixed_ip', True)
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
attrs = {
'fixed_ips': [
{'ip_address': '10.0.0.12'}],
}
self.network.update_port.assert_called_once_with(_testport, **attrs)
self.assertIsNone(result)
def test_overwrite_mac_address(self):
_testport = network_fakes.FakePort.create_one_port(
{'mac_address': '11:22:33:44:55:66'})
@ -1006,7 +1005,7 @@ class TestSetPort(TestPort):
self.network.update_port.assert_called_once_with(_testport, **attrs)
self.assertIsNone(result)
def test_set_this(self):
def test_set_port_this(self):
arglist = [
'--disable',
'--no-fixed-ip',
@ -1031,7 +1030,7 @@ class TestSetPort(TestPort):
self.network.update_port.assert_called_once_with(self._port, **attrs)
self.assertIsNone(result)
def test_set_that(self):
def test_set_port_that(self):
arglist = [
'--description', 'newDescription',
'--enable',
@ -1065,22 +1064,7 @@ class TestSetPort(TestPort):
self.network.update_port.assert_called_once_with(self._port, **attrs)
self.assertIsNone(result)
def test_set_nothing(self):
arglist = [
self._port.name,
]
verifylist = [
('port', self._port.name),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
attrs = {}
self.network.update_port.assert_called_once_with(self._port, **attrs)
self.assertIsNone(result)
def test_set_invalid_json_binding_profile(self):
def test_set_port_invalid_json_binding_profile(self):
arglist = [
'--binding-profile', '{"parent_name"}',
'test-port',
@ -1091,7 +1075,7 @@ class TestSetPort(TestPort):
arglist,
None)
def test_set_invalid_key_value_binding_profile(self):
def test_set_port_invalid_key_value_binding_profile(self):
arglist = [
'--binding-profile', 'key',
'test-port',
@ -1102,7 +1086,7 @@ class TestSetPort(TestPort):
arglist,
None)
def test_set_mixed_binding_profile(self):
def test_set_port_mixed_binding_profile(self):
arglist = [
'--binding-profile', 'foo=bar',
'--binding-profile', '{"foo2": "bar2"}',
@ -1122,7 +1106,7 @@ class TestSetPort(TestPort):
self.network.update_port.assert_called_once_with(self._port, **attrs)
self.assertIsNone(result)
def test_set_security_group(self):
def test_set_port_security_group(self):
sg = network_fakes.FakeSecurityGroup.create_one_security_group()
self.network.find_security_group = mock.Mock(return_value=sg)
arglist = [
@ -1133,17 +1117,16 @@ class TestSetPort(TestPort):
('security_group', [sg.id]),
('port', self._port.name),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
result = self.cmd.take_action(parsed_args)
attrs = {
'security_group_ids': [sg.id],
}
self.network.update_port.assert_called_once_with(self._port, **attrs)
self.assertIsNone(result)
def test_append_security_group(self):
def test_set_port_security_group_append(self):
sg_1 = network_fakes.FakeSecurityGroup.create_one_security_group()
sg_2 = network_fakes.FakeSecurityGroup.create_one_security_group()
sg_3 = network_fakes.FakeSecurityGroup.create_one_security_group()
@ -1161,14 +1144,15 @@ class TestSetPort(TestPort):
('port', _testport.name),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
attrs = {
'security_group_ids': [sg_2.id, sg_3.id, sg_1.id],
'security_group_ids': [sg_1.id, sg_2.id, sg_3.id],
}
self.network.update_port.assert_called_once_with(_testport, **attrs)
self.assertIsNone(result)
def test_set_no_security_groups(self):
def test_set_port_security_group_clear(self):
arglist = [
'--no-security-group',
self._port.name,
@ -1177,17 +1161,16 @@ class TestSetPort(TestPort):
('no_security_group', True),
('port', self._port.name),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
result = self.cmd.take_action(parsed_args)
attrs = {
'security_group_ids': [],
}
self.network.update_port.assert_called_once_with(self._port, **attrs)
self.assertIsNone(result)
def test_overwrite_security_group(self):
def test_set_port_security_group_replace(self):
sg1 = network_fakes.FakeSecurityGroup.create_one_security_group()
sg2 = network_fakes.FakeSecurityGroup.create_one_security_group()
_testport = network_fakes.FakePort.create_one_port(
@ -1204,6 +1187,7 @@ class TestSetPort(TestPort):
('no_security_group', True)
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
attrs = {
'security_group_ids': [sg2.id],
@ -1211,7 +1195,7 @@ class TestSetPort(TestPort):
self.network.update_port.assert_called_once_with(_testport, **attrs)
self.assertIsNone(result)
def test_set_allowed_address_pair(self):
def test_set_port_allowed_address_pair(self):
arglist = [
'--allowed-address', 'ip-address=192.168.1.123',
self._port.name,
@ -1230,7 +1214,7 @@ class TestSetPort(TestPort):
self.network.update_port.assert_called_once_with(self._port, **attrs)
self.assertIsNone(result)
def test_append_allowed_address_pair(self):
def test_set_port_append_allowed_address_pair(self):
_testport = network_fakes.FakePort.create_one_port(
{'allowed_address_pairs': [{'ip_address': '192.168.1.123'}]})
self.network.find_port = mock.Mock(return_value=_testport)
@ -1253,7 +1237,7 @@ class TestSetPort(TestPort):
self.network.update_port.assert_called_once_with(_testport, **attrs)
self.assertIsNone(result)
def test_overwrite_allowed_address_pair(self):
def test_set_port_overwrite_allowed_address_pair(self):
_testport = network_fakes.FakePort.create_one_port(
{'allowed_address_pairs': [{'ip_address': '192.168.1.123'}]})
self.network.find_port = mock.Mock(return_value=_testport)
@ -1277,7 +1261,7 @@ class TestSetPort(TestPort):
self.network.update_port.assert_called_once_with(_testport, **attrs)
self.assertIsNone(result)
def test_set_no_allowed_address_pairs(self):
def test_set_port_no_allowed_address_pairs(self):
arglist = [
'--no-allowed-address',
self._port.name,
@ -1296,7 +1280,7 @@ class TestSetPort(TestPort):
self.network.update_port.assert_called_once_with(self._port, **attrs)
self.assertIsNone(result)
def test_port_security_enabled(self):
def test_set_port_security_enabled(self):
arglist = [
'--enable-port-security',
self._port.id,
@ -1314,7 +1298,7 @@ class TestSetPort(TestPort):
'port_security_enabled': True,
})
def test_port_security_disabled(self):
def test_set_port_security_disabled(self):
arglist = [
'--disable-port-security',
self._port.id,