Fix params order in assertEqual

Fix params order to correspond to real signature:
assertEqual(expected, actual)

Change-Id: I7888ca97afad663185a508002c6d9c62357e0cde
Closes-Bug: #1277104
This commit is contained in:
Yatin Kumbhare 2016-01-13 15:27:52 +05:30
parent 6576b7061e
commit b4db9c2644
9 changed files with 119 additions and 121 deletions

View File

@ -242,10 +242,10 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
# Verify that default egress rules have been created
sg_rules = security_group['security_group']['security_group_rules']
self.assertEqual(len(sg_rules), 2)
self.assertEqual(2, len(sg_rules))
v4_rules = [r for r in sg_rules if r['ethertype'] == const.IPv4]
self.assertEqual(len(v4_rules), 1)
self.assertEqual(1, len(v4_rules))
v4_rule = v4_rules[0]
expected = {'direction': 'egress',
'ethertype': const.IPv4,
@ -257,7 +257,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
self._assert_sg_rule_has_kvs(v4_rule, expected)
v6_rules = [r for r in sg_rules if r['ethertype'] == const.IPv6]
self.assertEqual(len(v6_rules), 1)
self.assertEqual(1, len(v6_rules))
v6_rule = v6_rules[0]
expected = {'direction': 'egress',
'ethertype': const.IPv6,
@ -305,10 +305,10 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
data,
sg['security_group']['id'])
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
self.assertEqual(res['security_group']['name'],
data['security_group']['name'])
self.assertEqual(res['security_group']['description'],
data['security_group']['description'])
self.assertEqual(data['security_group']['name'],
res['security_group']['name'])
self.assertEqual(data['security_group']['description'],
res['security_group']['description'])
def test_update_security_group_name_to_default_fail(self):
with self.security_group() as sg:
@ -319,7 +319,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
sg['security_group']['id'])
req.environ['neutron.context'] = context.Context('', 'somebody')
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
def test_update_default_security_group_name_fail(self):
with self.network():
@ -332,7 +332,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
sg['security_groups'][0]['id'])
req.environ['neutron.context'] = context.Context('', 'somebody')
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)
self.assertEqual(webob.exc.HTTPNotFound.code, res.status_int)
def test_update_default_security_group_with_description(self):
with self.network():
@ -343,8 +343,8 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
data,
sg['security_groups'][0]['id'])
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
self.assertEqual(res['security_group']['description'],
data['security_group']['description'])
self.assertEqual(data['security_group']['description'],
res['security_group']['description'])
def test_check_default_security_group_description(self):
with self.network():
@ -357,21 +357,21 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
with self.network():
res = self.new_list_request('security-groups')
groups = self.deserialize(self.fmt, res.get_response(self.ext_api))
self.assertEqual(len(groups['security_groups']), 1)
self.assertEqual(1, len(groups['security_groups']))
def test_create_default_security_group_fail(self):
name = 'default'
description = 'my webservers'
res = self._create_security_group(self.fmt, name, description)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
def test_create_default_security_group_check_case_insensitive(self):
name = 'DEFAULT'
description = 'my webservers'
res = self._create_security_group(self.fmt, name, description)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
def test_list_security_groups(self):
with self.security_group(name='sg1', description='sg') as v1,\
@ -419,7 +419,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
'22', None, None, ethertype=ethertype)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
self.assertEqual(webob.exc.HTTPBadRequest.code, res.status_int)
def test_create_security_group_rule_ethertype_invalid_for_protocol(self):
name = 'webservers'
@ -430,7 +430,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
security_group_id, 'ingress', const.PROTO_NAME_ICMP_V6)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
self.assertEqual(webob.exc.HTTPBadRequest.code, res.status_int)
def test_create_security_group_rule_invalid_ip_prefix(self):
name = 'webservers'
@ -446,7 +446,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
'22', '22',
remote_ip_prefix)
res = self._create_security_group_rule(self.fmt, rule)
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
self.assertEqual(webob.exc.HTTPBadRequest.code, res.status_int)
def test_create_security_group_rule_invalid_ethertype_for_prefix(self):
name = 'webservers'
@ -466,7 +466,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
None,
ethertype=ethertype)
res = self._create_security_group_rule(self.fmt, rule)
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
self.assertEqual(webob.exc.HTTPBadRequest.code, res.status_int)
def test_create_security_group_rule_with_unmasked_prefix(self):
name = 'webservers'
@ -502,7 +502,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
security_group_id, 'ingress', protocol, '22', '22')
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
self.assertEqual(webob.exc.HTTPCreated.code, res.status_int)
def test_create_security_group_rule_protocol_as_number(self):
name = 'webservers'
@ -514,7 +514,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
security_group_id, 'ingress', protocol)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
self.assertEqual(webob.exc.HTTPCreated.code, res.status_int)
def test_create_security_group_rule_case_insensitive(self):
name = 'webservers'
@ -567,7 +567,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
sg_rule = group['security_group']['security_group_rules']
self.assertEqual(group['security_group']['id'],
remote_group_id)
self.assertEqual(len(sg_rule), 3)
self.assertEqual(3, len(sg_rule))
sg_rule = [r for r in sg_rule if r['direction'] == 'ingress']
for k, v, in keys:
self.assertEqual(sg_rule[0][k], v)
@ -615,7 +615,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
neutron_context = context.Context('', 'test-tenant')
sg = self._list('security-groups',
neutron_context=neutron_context).get('security_groups')
self.assertEqual(len(sg), 1)
self.assertEqual(1, len(sg))
def test_security_group_port_create_creates_default_security_group(self):
res = self._create_network(self.fmt, 'net1', True,
@ -625,7 +625,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
res = self._create_port(self.fmt, net1['network']['id'],
tenant_id='not_admin', set_context=True)
sg = self._list('security-groups').get('security_groups')
self.assertEqual(len(sg), 1)
self.assertEqual(1, len(sg))
def test_default_security_group_rules(self):
with self.network():
@ -643,7 +643,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
r for r in sg_rules
if r['direction'] == 'egress' and r['ethertype'] == const.IPv4
]
self.assertEqual(len(rules), 1)
self.assertEqual(1, len(rules))
v4_egress = rules[0]
expected = {'direction': 'egress',
@ -660,7 +660,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
r for r in sg_rules
if r['direction'] == 'egress' and r['ethertype'] == const.IPv6
]
self.assertEqual(len(rules), 1)
self.assertEqual(1, len(rules))
v6_egress = rules[0]
expected = {'direction': 'egress',
@ -677,7 +677,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
r for r in sg_rules
if r['direction'] == 'ingress' and r['ethertype'] == const.IPv4
]
self.assertEqual(len(rules), 1)
self.assertEqual(1, len(rules))
v4_ingress = rules[0]
expected = {'direction': 'ingress',
@ -694,7 +694,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
r for r in sg_rules
if r['direction'] == 'ingress' and r['ethertype'] == const.IPv6
]
self.assertEqual(len(rules), 1)
self.assertEqual(1, len(rules))
v6_ingress = rules[0]
expected = {'direction': 'ingress',
@ -1280,7 +1280,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
security_groups=['bad_id'])
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
self.assertEqual(webob.exc.HTTPBadRequest.code, res.status_int)
def test_create_delete_security_group_port_in_use(self):
with self.network() as n:
@ -1316,7 +1316,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
rule2['security_group_rule']]}
res = self._create_security_group_rule(self.fmt, rules)
ret = self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
self.assertEqual(webob.exc.HTTPCreated.code, res.status_int)
self.assertEqual(2, len(ret['security_group_rules']))
def test_create_security_group_rule_bulk_emulated(self):
@ -1342,7 +1342,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
}
res = self._create_security_group_rule(self.fmt, rules)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
self.assertEqual(webob.exc.HTTPCreated.code, res.status_int)
def test_create_security_group_rule_allow_all_ipv4(self):
with self.security_group() as sg:
@ -1354,7 +1354,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
res = self._create_security_group_rule(
self.fmt, {'security_group_rule': rule})
rule = self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
self.assertEqual(webob.exc.HTTPCreated.code, res.status_int)
def test_create_security_group_rule_allow_all_ipv4_v6_bulk(self):
if self._skip_native_bulk:
@ -1373,7 +1373,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
rules = {'security_group_rules': [rule_v4, rule_v6]}
res = self._create_security_group_rule(self.fmt, rules)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
self.assertEqual(webob.exc.HTTPCreated.code, res.status_int)
def test_create_security_group_rule_duplicate_rule_in_post(self):
if self._skip_native_bulk:
@ -1388,7 +1388,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
rule['security_group_rule']]}
res = self._create_security_group_rule(self.fmt, rules)
rule = self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
def test_create_security_group_rule_duplicate_rule_in_post_emulated(self):
real_has_attr = hasattr
@ -1410,7 +1410,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
rule['security_group_rule']]}
res = self._create_security_group_rule(self.fmt, rules)
rule = self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
def test_create_security_group_rule_duplicate_rule_db(self):
if self._skip_native_bulk:
@ -1425,7 +1425,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
self._create_security_group_rule(self.fmt, rules)
res = self._create_security_group_rule(self.fmt, rules)
rule = self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
def test_create_security_group_rule_duplicate_rule_db_emulated(self):
real_has_attr = hasattr
@ -1446,7 +1446,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
self._create_security_group_rule(self.fmt, rules)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
def test_create_security_group_rule_different_security_group_ids(self):
if self._skip_native_bulk:
@ -1466,7 +1466,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
}
res = self._create_security_group_rule(self.fmt, rules)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
self.assertEqual(webob.exc.HTTPBadRequest.code, res.status_int)
def test_create_security_group_rule_with_invalid_ethertype(self):
security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
@ -1484,7 +1484,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
ethertype='IPv5')
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
self.assertEqual(webob.exc.HTTPBadRequest.code, res.status_int)
def test_create_security_group_rule_with_invalid_protocol(self):
security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
@ -1501,7 +1501,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
remote_group_id)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
self.assertEqual(webob.exc.HTTPBadRequest.code, res.status_int)
def test_create_security_group_rule_with_invalid_tcp_or_udp_protocol(self):
security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
@ -1518,7 +1518,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
remote_group_id)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
self.assertEqual(webob.exc.HTTPBadRequest.code, res.status_int)
def test_create_port_with_non_uuid(self):
with self.network() as n:
@ -1562,7 +1562,7 @@ class TestConvertIPPrefixToCIDR(base.BaseTestCase):
def test_convert_ip_prefix_with_netmask_to_cidr(self):
addresses = ['10.1.0.0/16', '10.1.2.3/32', '2001:db8:1234::/48']
for addr in addresses:
self.assertEqual(ext_sg.convert_ip_prefix_to_cidr(addr), addr)
self.assertEqual(addr, ext_sg.convert_ip_prefix_to_cidr(addr))
class TestConvertProtocol(base.BaseTestCase):

View File

@ -73,13 +73,13 @@ class ServiceTypeManagerTestCase(testlib_api.SqlTestCase):
ctx,
filters=dict(service_type=[constants.LOADBALANCER])
)
self.assertEqual(len(res), 1)
self.assertEqual(1, len(res))
res = self.manager.get_service_providers(
ctx,
filters=dict(service_type=[constants.FIREWALL])
)
self.assertEqual(len(res), 1)
self.assertEqual(1, len(res))
def test_multiple_default_providers_specified_for_service(self):
self.assertRaises(
@ -98,10 +98,10 @@ class ServiceTypeManagerTestCase(testlib_api.SqlTestCase):
# can pass None as a context
p = self.manager.get_default_service_provider(None,
constants.LOADBALANCER)
self.assertEqual(p, {'service_type': constants.LOADBALANCER,
'name': 'lbaas1',
'driver': 'driver_path',
'default': True})
self.assertEqual({'service_type': constants.LOADBALANCER,
'name': 'lbaas1',
'driver': 'driver_path',
'default': True}, p)
self.assertRaises(
provconf.DefaultServiceProviderNotFound,
@ -186,7 +186,7 @@ class ServiceTypeExtensionTestCase(ServiceTypeExtensionTestCaseBase):
instance.get_service_providers.assert_called_with(mock.ANY,
filters={},
fields=[])
self.assertEqual(res.status_int, webexc.HTTPOk.code)
self.assertEqual(webexc.HTTPOk.code, res.status_int)
class ServiceTypeManagerExtTestCase(ServiceTypeExtensionTestCaseBase):
@ -212,7 +212,7 @@ class ServiceTypeManagerExtTestCase(ServiceTypeExtensionTestCaseBase):
def test_list_service_providers(self):
res = self._list_service_providers()
self.assertEqual(res.status_int, webexc.HTTPOk.code)
self.assertEqual(webexc.HTTPOk.code, res.status_int)
data = self.deserialize(res)
self.assertIn('service_providers', data)
self.assertGreaterEqual(len(data['service_providers']), 2)

View File

@ -643,7 +643,7 @@ class TestL2PopulationRpcTestCase(test_plugin.Ml2PluginV2TestCase):
req = self.new_update_request('ports', data, p1['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
ips = res['port']['fixed_ips']
self.assertEqual(len(ips), 2)
self.assertEqual(2, len(ips))
add_expected = {'chg_ip':
{p1['network_id']:
@ -661,7 +661,7 @@ class TestL2PopulationRpcTestCase(test_plugin.Ml2PluginV2TestCase):
req = self.new_update_request('ports', data, p1['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
ips = res['port']['fixed_ips']
self.assertEqual(len(ips), 2)
self.assertEqual(2, len(ips))
upd_expected = {'chg_ip':
{p1['network_id']:
@ -680,7 +680,7 @@ class TestL2PopulationRpcTestCase(test_plugin.Ml2PluginV2TestCase):
req = self.new_update_request('ports', data, p1['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
ips = res['port']['fixed_ips']
self.assertEqual(len(ips), 1)
self.assertEqual(1, len(ips))
del_expected = {'chg_ip':
{p1['network_id']:

View File

@ -530,7 +530,7 @@ class TestLinuxBridgeAgent(base.BaseTestCase):
self.assertTrue(
port_2_data in self.agent.network_ports[NETWORK_ID]
)
self.assertEqual(cleaned_port_id, PORT_1)
self.assertEqual(PORT_1, cleaned_port_id)
#and now remove last port from network:
cleaned_port_id = self.agent._clean_network_ports(
port_2_data['device']
@ -538,7 +538,7 @@ class TestLinuxBridgeAgent(base.BaseTestCase):
self.assertTrue(
NETWORK_ID not in self.agent.network_ports.keys()
)
self.assertEqual(cleaned_port_id, port_2_data['port_id'])
self.assertEqual(port_2_data['port_id'], cleaned_port_id)
class TestLinuxBridgeManager(base.BaseTestCase):
@ -575,30 +575,28 @@ class TestLinuxBridgeManager(base.BaseTestCase):
def test_get_bridge_name(self):
nw_id = "123456789101112"
self.assertEqual(self.lbm.get_bridge_name(nw_id),
"brq" + nw_id[0:11])
self.assertEqual("brq" + nw_id[0:11],
self.lbm.get_bridge_name(nw_id))
nw_id = ""
self.assertEqual(self.lbm.get_bridge_name(nw_id),
"brq")
self.assertEqual("brq", self.lbm.get_bridge_name(nw_id))
def test_get_subinterface_name(self):
self.assertEqual(self.lbm.get_subinterface_name("eth0", "0"),
"eth0.0")
self.assertEqual(self.lbm.get_subinterface_name("eth0", ""),
"eth0.")
self.assertEqual("eth0.0",
self.lbm.get_subinterface_name("eth0", "0"))
self.assertEqual("eth0.", self.lbm.get_subinterface_name("eth0", ""))
def test_get_tap_device_name(self):
if_id = "123456789101112"
self.assertEqual(self.lbm.get_tap_device_name(if_id),
constants.TAP_DEVICE_PREFIX + if_id[0:11])
self.assertEqual(constants.TAP_DEVICE_PREFIX + if_id[0:11],
self.lbm.get_tap_device_name(if_id))
if_id = ""
self.assertEqual(self.lbm.get_tap_device_name(if_id),
constants.TAP_DEVICE_PREFIX)
self.assertEqual(constants.TAP_DEVICE_PREFIX,
self.lbm.get_tap_device_name(if_id))
def test_get_vxlan_device_name(self):
vn_id = p_const.MAX_VXLAN_VNI
self.assertEqual(self.lbm.get_vxlan_device_name(vn_id),
"vxlan-" + str(vn_id))
self.assertEqual("vxlan-" + str(vn_id),
self.lbm.get_vxlan_device_name(vn_id))
self.assertIsNone(self.lbm.get_vxlan_device_name(vn_id + 1))
def test_get_vxlan_group(self):

View File

@ -63,7 +63,7 @@ class TestSriovAgentConfig(base.BaseTestCase):
'SRIOV_NIC')
device_mappings = n_utils.parse_mappings(
cfg.CONF.SRIOV_NIC.physical_device_mappings)
self.assertEqual(device_mappings, self.DEVICE_MAPPING)
self.assertEqual(self.DEVICE_MAPPING, device_mappings)
def test_device_mappings_with_error(self):
cfg.CONF.set_override('physical_device_mappings',
@ -78,7 +78,7 @@ class TestSriovAgentConfig(base.BaseTestCase):
'SRIOV_NIC')
device_mappings = n_utils.parse_mappings(
cfg.CONF.SRIOV_NIC.physical_device_mappings)
self.assertEqual(device_mappings, self.DEVICE_MAPPING)
self.assertEqual(self.DEVICE_MAPPING, device_mappings)
def test_exclude_devices(self):
cfg.CONF.set_override('exclude_devices',
@ -86,7 +86,7 @@ class TestSriovAgentConfig(base.BaseTestCase):
'SRIOV_NIC')
exclude_devices = config.parse_exclude_devices(
cfg.CONF.SRIOV_NIC.exclude_devices)
self.assertEqual(exclude_devices, self.EXCLUDE_DEVICES)
self.assertEqual(self.EXCLUDE_DEVICES, exclude_devices)
def test_exclude_devices_with_spaces(self):
cfg.CONF.set_override('exclude_devices',
@ -94,7 +94,7 @@ class TestSriovAgentConfig(base.BaseTestCase):
'SRIOV_NIC')
exclude_devices = config.parse_exclude_devices(
cfg.CONF.SRIOV_NIC.exclude_devices)
self.assertEqual(exclude_devices, self.EXCLUDE_DEVICES)
self.assertEqual(self.EXCLUDE_DEVICES, exclude_devices)
def test_exclude_devices_with_error(self):
cfg.CONF.set_override('exclude_devices',
@ -114,8 +114,8 @@ class TestSriovAgentConfig(base.BaseTestCase):
config_parser.parse()
device_mappings = config_parser.device_mappings
exclude_devices = config_parser.exclude_devices
self.assertEqual(exclude_devices, self.EXCLUDE_DEVICES)
self.assertEqual(device_mappings, self.DEVICE_MAPPING)
self.assertEqual(self.EXCLUDE_DEVICES, exclude_devices)
self.assertEqual(self.DEVICE_MAPPING, device_mappings)
def test_validate_config_fail(self):
cfg.CONF.set_override('physical_device_mappings',

View File

@ -75,31 +75,31 @@ class TestMeteringOperations(base.BaseTestCase):
def test_add_metering_label(self):
self.agent.add_metering_label(None, ROUTERS)
self.assertEqual(self.driver.add_metering_label.call_count, 1)
self.assertEqual(1, self.driver.add_metering_label.call_count)
def test_remove_metering_label(self):
self.agent.remove_metering_label(None, ROUTERS)
self.assertEqual(self.driver.remove_metering_label.call_count, 1)
self.assertEqual(1, self.driver.remove_metering_label.call_count)
def test_update_metering_label_rule(self):
self.agent.update_metering_label_rules(None, ROUTERS)
self.assertEqual(self.driver.update_metering_label_rules.call_count, 1)
self.assertEqual(1, self.driver.update_metering_label_rules.call_count)
def test_add_metering_label_rule(self):
self.agent.add_metering_label_rule(None, ROUTERS_WITH_RULE)
self.assertEqual(self.driver.add_metering_label_rule.call_count, 1)
self.assertEqual(1, self.driver.add_metering_label_rule.call_count)
def test_remove_metering_label_rule(self):
self.agent.remove_metering_label_rule(None, ROUTERS_WITH_RULE)
self.assertEqual(self.driver.remove_metering_label_rule.call_count, 1)
self.assertEqual(1, self.driver.remove_metering_label_rule.call_count)
def test_routers_updated(self):
self.agent.routers_updated(None, ROUTERS)
self.assertEqual(self.driver.update_routers.call_count, 1)
self.assertEqual(1, self.driver.update_routers.call_count)
def test_get_traffic_counters(self):
self.agent._get_traffic_counters(None, ROUTERS)
self.assertEqual(self.driver.get_traffic_counters.call_count, 1)
self.assertEqual(1, self.driver.get_traffic_counters.call_count)
def test_notification_report(self):
self.agent.routers_updated(None, ROUTERS)
@ -114,13 +114,13 @@ class TestMeteringOperations(base.BaseTestCase):
if n['event_type'] == 'l3.meter':
break
self.assertEqual(n['event_type'], 'l3.meter')
self.assertEqual('l3.meter', n['event_type'])
payload = n['payload']
self.assertEqual(payload['tenant_id'], TENANT_ID)
self.assertEqual(payload['label_id'], LABEL_ID)
self.assertEqual(payload['pkts'], 88)
self.assertEqual(payload['bytes'], 444)
self.assertEqual(TENANT_ID, payload['tenant_id'])
self.assertEqual(LABEL_ID, payload['label_id'])
self.assertEqual(88, payload['pkts'])
self.assertEqual(444, payload['bytes'])
def test_router_deleted(self):
label_id = _uuid()
@ -133,8 +133,8 @@ class TestMeteringOperations(base.BaseTestCase):
self.agent.routers_updated(None, ROUTERS)
self.agent.router_deleted(None, ROUTERS[0]['id'])
self.assertEqual(self.agent._add_metering_info.call_count, 1)
self.assertEqual(self.driver.remove_router.call_count, 1)
self.assertEqual(1, self.agent._add_metering_info.call_count)
self.assertEqual(1, self.driver.remove_router.call_count)
self.agent._add_metering_info.assert_called_with(label_id, 44, 222)

View File

@ -45,8 +45,8 @@ class ParseServiceProviderConfigurationTestCase(base.BaseTestCase):
'driver': 'driver_path',
'default': False}
res = provconf.parse_service_provider_opt()
self.assertEqual(len(res), 1)
self.assertEqual(res, [expected])
self.assertEqual(1, len(res))
self.assertEqual([expected], res)
def test_parse_single_default_service_provider_opt(self):
self._set_override([constants.LOADBALANCER +
@ -56,8 +56,8 @@ class ParseServiceProviderConfigurationTestCase(base.BaseTestCase):
'driver': 'driver_path',
'default': True}
res = provconf.parse_service_provider_opt()
self.assertEqual(len(res), 1)
self.assertEqual(res, [expected])
self.assertEqual(1, len(res))
self.assertEqual([expected], res)
def test_parse_multi_service_provider_opt(self):
self._set_override([constants.LOADBALANCER +
@ -124,11 +124,11 @@ class ProviderConfigurationTestCase(base.BaseTestCase):
'driver': 'path',
'default': False}
pconf.add_provider(prov)
self.assertEqual(len(pconf.providers), 1)
self.assertEqual(list(pconf.providers.keys()),
[(constants.LOADBALANCER, 'name')])
self.assertEqual(list(pconf.providers.values()),
[{'driver': 'path', 'default': False}])
self.assertEqual(1, len(pconf.providers))
self.assertEqual([(constants.LOADBALANCER, 'name')],
list(pconf.providers.keys()))
self.assertEqual([{'driver': 'path', 'default': False}],
list(pconf.providers.values()))
def test_add_duplicate_provider(self):
pconf = provconf.ProviderConfiguration()
@ -138,7 +138,7 @@ class ProviderConfigurationTestCase(base.BaseTestCase):
'default': False}
pconf.add_provider(prov)
self.assertRaises(n_exc.Invalid, pconf.add_provider, prov)
self.assertEqual(len(pconf.providers), 1)
self.assertEqual(1, len(pconf.providers))
def test_get_service_providers(self):
self._set_override([constants.LOADBALANCER + ':name:path',
@ -168,7 +168,7 @@ class ProviderConfigurationTestCase(base.BaseTestCase):
filters={'name': [prov['name']],
'service_type': prov['service_type']}
)
self.assertEqual(p, [prov])
self.assertEqual([prov], p)
def test_get_service_providers_with_fields(self):
self._set_override([constants.LOADBALANCER + ":name:path",
@ -188,7 +188,7 @@ class ProviderConfigurationTestCase(base.BaseTestCase):
'service_type': prov['service_type']},
fields=['name']
)
self.assertEqual(p, [{'name': prov['name']}])
self.assertEqual([{'name': prov['name']}], p)
class GetProviderDriverClassTestCase(base.BaseTestCase):

View File

@ -37,32 +37,32 @@ class NeutronKeystoneContextTestCase(base.BaseTestCase):
def test_no_user_id(self):
self.request.headers['X_PROJECT_ID'] = 'testtenantid'
response = self.request.get_response(self.middleware)
self.assertEqual(response.status, '401 Unauthorized')
self.assertEqual('401 Unauthorized', response.status)
def test_with_user_id(self):
self.request.headers['X_PROJECT_ID'] = 'testtenantid'
self.request.headers['X_USER_ID'] = 'testuserid'
response = self.request.get_response(self.middleware)
self.assertEqual(response.status, '200 OK')
self.assertEqual(self.context.user_id, 'testuserid')
self.assertEqual(self.context.user, 'testuserid')
self.assertEqual('200 OK', response.status)
self.assertEqual('testuserid', self.context.user_id)
self.assertEqual('testuserid', self.context.user)
def test_with_tenant_id(self):
self.request.headers['X_PROJECT_ID'] = 'testtenantid'
self.request.headers['X_USER_ID'] = 'test_user_id'
response = self.request.get_response(self.middleware)
self.assertEqual(response.status, '200 OK')
self.assertEqual(self.context.tenant_id, 'testtenantid')
self.assertEqual(self.context.tenant, 'testtenantid')
self.assertEqual('200 OK', response.status)
self.assertEqual('testtenantid', self.context.tenant_id)
self.assertEqual('testtenantid', self.context.tenant)
def test_roles_no_admin(self):
self.request.headers['X_PROJECT_ID'] = 'testtenantid'
self.request.headers['X_USER_ID'] = 'testuserid'
self.request.headers['X_ROLES'] = 'role1, role2 , role3,role4,role5'
response = self.request.get_response(self.middleware)
self.assertEqual(response.status, '200 OK')
self.assertEqual(self.context.roles, ['role1', 'role2', 'role3',
'role4', 'role5'])
self.assertEqual('200 OK', response.status)
self.assertEqual(['role1', 'role2', 'role3', 'role4', 'role5'],
self.context.roles)
self.assertFalse(self.context.is_admin)
def test_roles_with_admin(self):
@ -71,9 +71,9 @@ class NeutronKeystoneContextTestCase(base.BaseTestCase):
self.request.headers['X_ROLES'] = ('role1, role2 , role3,role4,role5,'
'AdMiN')
response = self.request.get_response(self.middleware)
self.assertEqual(response.status, '200 OK')
self.assertEqual(self.context.roles, ['role1', 'role2', 'role3',
'role4', 'role5', 'AdMiN'])
self.assertEqual('200 OK', response.status)
self.assertEqual(['role1', 'role2', 'role3', 'role4', 'role5',
'AdMiN'], self.context.roles)
self.assertTrue(self.context.is_admin)
def test_with_user_tenant_name(self):
@ -82,11 +82,11 @@ class NeutronKeystoneContextTestCase(base.BaseTestCase):
self.request.headers['X_PROJECT_NAME'] = 'testtenantname'
self.request.headers['X_USER_NAME'] = 'testusername'
response = self.request.get_response(self.middleware)
self.assertEqual(response.status, '200 OK')
self.assertEqual(self.context.user_id, 'testuserid')
self.assertEqual(self.context.user_name, 'testusername')
self.assertEqual(self.context.tenant_id, 'testtenantid')
self.assertEqual(self.context.tenant_name, 'testtenantname')
self.assertEqual('200 OK', response.status)
self.assertEqual('testuserid', self.context.user_id)
self.assertEqual('testusername', self.context.user_name)
self.assertEqual('testtenantid', self.context.tenant_id)
self.assertEqual('testtenantname', self.context.tenant_name)
def test_request_id_extracted_from_env(self):
req_id = 'dummy-request-id'
@ -100,8 +100,8 @@ class NeutronKeystoneContextTestCase(base.BaseTestCase):
self.request.headers['X_PROJECT_ID'] = 'testtenantid'
self.request.headers['X_USER_ID'] = 'testuserid'
response = self.request.get_response(self.middleware)
self.assertEqual(response.status, '200 OK')
self.assertEqual(self.context.auth_token, 'testauthtoken')
self.assertEqual('200 OK', response.status)
self.assertEqual('testauthtoken', self.context.auth_token)
def test_without_auth_token(self):
self.request.headers['X_PROJECT_ID'] = 'testtenantid'

View File

@ -95,7 +95,7 @@ class TestGetIgnoredTraceback(base.BaseTestCase):
actual = post_mortem_debug.get_ignored_traceback(root_tb)
if expected is not None:
expected = tracebacks[expected]
self.assertEqual(actual, expected)
self.assertEqual(expected, actual)
def test_no_ignored_tracebacks(self):
self._test_get_ignored_traceback([0, 0, 0], None)