Fix params order in assertEqual

Fix params order to correspond to real signature:
assertEqual(expected, actual)

Change-Id: I722b998f6eae47076f3d10213073296a0a9a2081
Closes-Bug: #1277104
This commit is contained in:
Yatin Kumbhare 2016-01-21 13:55:34 +05:30
parent b1999202b8
commit 6b4be76e14
27 changed files with 594 additions and 600 deletions

View File

@ -70,23 +70,23 @@ class ResourceIndexTestCase(base.BaseTestCase):
res = index.get('')
self.assertIn('resources', res.json)
self.assertEqual(len(res.json['resources']), 1)
self.assertEqual(1, len(res.json['resources']))
resource = res.json['resources'][0]
self.assertIn('collection', resource)
self.assertEqual(resource['collection'], 'bar')
self.assertEqual('bar', resource['collection'])
self.assertIn('name', resource)
self.assertEqual(resource['name'], 'foo')
self.assertEqual('foo', resource['name'])
self.assertIn('links', resource)
self.assertEqual(len(resource['links']), 1)
self.assertEqual(1, len(resource['links']))
link = resource['links'][0]
self.assertIn('href', link)
self.assertEqual(link['href'], 'http://localhost/bar')
self.assertIn('rel', link)
self.assertEqual(link['rel'], 'self')
self.assertEqual('self', link['rel'])
class APIv2TestBase(base.BaseTestCase):
@ -326,7 +326,7 @@ class APIv2TestCase(APIv2TestBase):
res = self.api.get(_get_path('networks'), {'limit': -1},
expect_errors=True)
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
self.assertEqual(exc.HTTPBadRequest.code, res.status_int)
def test_limit_with_non_integer(self):
instance = self.plugin.return_value
@ -334,7 +334,7 @@ class APIv2TestCase(APIv2TestBase):
res = self.api.get(_get_path('networks'),
{'limit': 'abc'}, expect_errors=True)
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
self.assertEqual(exc.HTTPBadRequest.code, res.status_int)
def test_limit_with_infinite_pagination_max_limit(self):
instance = self.plugin.return_value
@ -436,7 +436,7 @@ class APIv2TestCase(APIv2TestBase):
res = self.api.get(_get_path('networks'), {'sort_key': ['name']},
expect_errors=True)
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
self.assertEqual(exc.HTTPBadRequest.code, res.status_int)
def test_sort_with_invalid_attribute(self):
instance = self.plugin.return_value
@ -446,7 +446,7 @@ class APIv2TestCase(APIv2TestBase):
{'sort_key': 'abc',
'sort_dir': 'asc'},
expect_errors=True)
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
self.assertEqual(exc.HTTPBadRequest.code, res.status_int)
def test_sort_with_invalid_dirs(self):
instance = self.plugin.return_value
@ -456,7 +456,7 @@ class APIv2TestCase(APIv2TestBase):
{'sort_key': 'name',
'sort_dir': 'abc'},
expect_errors=True)
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
self.assertEqual(exc.HTTPBadRequest.code, res.status_int)
def test_emulated_sort(self):
instance = self.plugin.return_value
@ -546,7 +546,7 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
self.assertIn('networks', res)
if not req_tenant_id or req_tenant_id == real_tenant_id:
# expect full list returned
self.assertEqual(len(res['networks']), 1)
self.assertEqual(1, len(res['networks']))
output_dict = res['networks'][0]
input_dict['shared'] = False
self.assertEqual(len(input_dict), len(output_dict))
@ -554,7 +554,7 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
self.assertEqual(v, output_dict[k])
else:
# expect no results
self.assertEqual(len(res['networks']), 0)
self.assertEqual(0, len(res['networks']))
def test_list_noauth(self):
self._test_list(None, _uuid())
@ -594,7 +594,7 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
res = self.api.get(_get_path('networks'),
params=params).json
self.assertEqual(len(res['networks']), 2)
self.assertEqual(2, len(res['networks']))
self.assertEqual(sorted([id1, id2]),
sorted([res['networks'][0]['id'],
res['networks'][1]['id']]))
@ -607,19 +607,19 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
next_links.append(r)
if r['rel'] == 'previous':
previous_links.append(r)
self.assertEqual(len(next_links), 1)
self.assertEqual(len(previous_links), 1)
self.assertEqual(1, len(next_links))
self.assertEqual(1, len(previous_links))
url = urlparse.urlparse(next_links[0]['href'])
self.assertEqual(url.path, _get_path('networks'))
params['marker'] = [id2]
self.assertEqual(urlparse.parse_qs(url.query), params)
self.assertEqual(params, urlparse.parse_qs(url.query))
url = urlparse.urlparse(previous_links[0]['href'])
self.assertEqual(url.path, _get_path('networks'))
params['marker'] = [id1]
params['page_reverse'] = ['True']
self.assertEqual(urlparse.parse_qs(url.query), params)
self.assertEqual(params, urlparse.parse_qs(url.query))
def test_list_pagination_with_last_page(self):
id = str(_uuid())
@ -638,7 +638,7 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
res = self.api.get(_get_path('networks'),
params=params).json
self.assertEqual(len(res['networks']), 1)
self.assertEqual(1, len(res['networks']))
self.assertEqual(id, res['networks'][0]['id'])
self.assertIn('networks_links', res)
@ -647,14 +647,14 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
self.assertNotEqual(r['rel'], 'next')
if r['rel'] == 'previous':
previous_links.append(r)
self.assertEqual(len(previous_links), 1)
self.assertEqual(1, len(previous_links))
url = urlparse.urlparse(previous_links[0]['href'])
self.assertEqual(url.path, _get_path('networks'))
expect_params = params.copy()
expect_params['marker'] = [id]
expect_params['page_reverse'] = ['True']
self.assertEqual(urlparse.parse_qs(url.query), expect_params)
self.assertEqual(expect_params, urlparse.parse_qs(url.query))
def test_list_pagination_with_empty_page(self):
return_value = []
@ -673,14 +673,14 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
self.assertNotEqual(r['rel'], 'next')
if r['rel'] == 'previous':
previous_links.append(r)
self.assertEqual(len(previous_links), 1)
self.assertEqual(1, len(previous_links))
url = urlparse.urlparse(previous_links[0]['href'])
self.assertEqual(url.path, _get_path('networks'))
expect_params = params.copy()
del expect_params['marker']
expect_params['page_reverse'] = ['True']
self.assertEqual(urlparse.parse_qs(url.query), expect_params)
self.assertEqual(expect_params, urlparse.parse_qs(url.query))
def test_list_pagination_reverse_with_last_page(self):
id = str(_uuid())
@ -709,15 +709,15 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
self.assertNotEqual(r['rel'], 'previous')
if r['rel'] == 'next':
next_links.append(r)
self.assertEqual(len(next_links), 1)
self.assertEqual(1, len(next_links))
url = urlparse.urlparse(next_links[0]['href'])
self.assertEqual(url.path, _get_path('networks'))
expected_params = params.copy()
del expected_params['page_reverse']
expected_params['marker'] = [id]
self.assertEqual(urlparse.parse_qs(url.query),
expected_params)
self.assertEqual(expected_params,
urlparse.parse_qs(url.query))
def test_list_pagination_reverse_with_empty_page(self):
return_value = []
@ -736,14 +736,14 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
self.assertNotEqual(r['rel'], 'previous')
if r['rel'] == 'next':
next_links.append(r)
self.assertEqual(len(next_links), 1)
self.assertEqual(1, len(next_links))
url = urlparse.urlparse(next_links[0]['href'])
self.assertEqual(url.path, _get_path('networks'))
expect_params = params.copy()
del expect_params['marker']
del expect_params['page_reverse']
self.assertEqual(urlparse.parse_qs(url.query), expect_params)
self.assertEqual(expect_params, urlparse.parse_qs(url.query))
def test_create(self):
net_id = _uuid()
@ -760,12 +760,12 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
res = self.api.post(_get_path('networks', fmt=self.fmt),
self.serialize(data),
content_type='application/' + self.fmt)
self.assertEqual(res.status_int, exc.HTTPCreated.code)
self.assertEqual(exc.HTTPCreated.code, res.status_int)
res = self.deserialize(res)
self.assertIn('network', res)
net = res['network']
self.assertEqual(net['id'], net_id)
self.assertEqual(net['status'], "ACTIVE")
self.assertEqual(net_id, net['id'])
self.assertEqual("ACTIVE", net['status'])
def test_create_use_defaults(self):
net_id = _uuid()
@ -786,13 +786,13 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
content_type='application/' + self.fmt)
instance.create_network.assert_called_with(mock.ANY,
network=full_input)
self.assertEqual(res.status_int, exc.HTTPCreated.code)
self.assertEqual(exc.HTTPCreated.code, res.status_int)
res = self.deserialize(res)
self.assertIn('network', res)
net = res['network']
self.assertEqual(net['id'], net_id)
self.assertEqual(net_id, net['id'])
self.assertTrue(net['admin_state_up'])
self.assertEqual(net['status'], "ACTIVE")
self.assertEqual("ACTIVE", net['status'])
def test_create_no_keystone_env(self):
data = {'name': 'net1'}
@ -822,7 +822,7 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
instance.create_network.assert_called_with(mock.ANY,
network=full_input)
self.assertEqual(res.status_int, exc.HTTPCreated.code)
self.assertEqual(exc.HTTPCreated.code, res.status_int)
def test_create_bad_keystone_tenant(self):
tenant_id = _uuid()
@ -864,7 +864,7 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
self.serialize(data),
content_type='application/' + self.fmt,
expect_errors=True)
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
self.assertEqual(exc.HTTPBadRequest.code, res.status_int)
def test_create_bulk(self):
data = {'networks': [{'name': 'net1',
@ -885,14 +885,14 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
res = self.api.post(_get_path('networks', fmt=self.fmt),
self.serialize(data),
content_type='application/' + self.fmt)
self.assertEqual(res.status_int, exc.HTTPCreated.code)
self.assertEqual(exc.HTTPCreated.code, res.status_int)
def _test_create_failure_bad_request(self, resource, data, **kwargs):
res = self.api.post(_get_path(resource, fmt=self.fmt),
self.serialize(data),
content_type='application/' + self.fmt,
expect_errors=True, **kwargs)
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
self.assertEqual(exc.HTTPBadRequest.code, res.status_int)
def test_create_bulk_networks_none(self):
self._test_create_failure_bad_request('networks', {'networks': None})
@ -940,12 +940,12 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
self.serialize(initial_input),
content_type='application/' + self.fmt)
instance.create_port.assert_called_with(mock.ANY, port=full_input)
self.assertEqual(res.status_int, exc.HTTPCreated.code)
self.assertEqual(exc.HTTPCreated.code, res.status_int)
res = self.deserialize(res)
self.assertIn('port', res)
port = res['port']
self.assertEqual(port['network_id'], net_id)
self.assertEqual(port['mac_address'], 'ca:fe:de:ad:be:ef')
self.assertEqual(net_id, port['network_id'])
self.assertEqual('ca:fe:de:ad:be:ef', port['mac_address'])
def test_create_return_extra_attr(self):
net_id = _uuid()
@ -962,12 +962,12 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
res = self.api.post(_get_path('networks', fmt=self.fmt),
self.serialize(data),
content_type='application/' + self.fmt)
self.assertEqual(res.status_int, exc.HTTPCreated.code)
self.assertEqual(exc.HTTPCreated.code, res.status_int)
res = self.deserialize(res)
self.assertIn('network', res)
net = res['network']
self.assertEqual(net['id'], net_id)
self.assertEqual(net['status'], "ACTIVE")
self.assertEqual(net_id, net['id'])
self.assertEqual("ACTIVE", net['status'])
self.assertNotIn('v2attrs:something', net)
def test_fields(self):
@ -996,7 +996,7 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
fmt=self.fmt),
extra_environ=env,
expect_errors=expect_errors)
self.assertEqual(res.status_int, expected_code)
self.assertEqual(expected_code, res.status_int)
def test_delete_noauth(self):
self._test_delete(None, _uuid(), exc.HTTPNoContent.code)
@ -1029,7 +1029,7 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
fmt=self.fmt),
extra_environ=env,
expect_errors=expect_errors)
self.assertEqual(res.status_int, expected_code)
self.assertEqual(expected_code, res.status_int)
return res
def test_get_noauth(self):
@ -1103,7 +1103,7 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
self.serialize(data),
content_type='application/' + self.fmt,
expect_errors=True)
self.assertEqual(res.status_int, 400)
self.assertEqual(400, res.status_int)
def test_invalid_attribute_field(self):
data = {'network': {'invalid_key1': "foo1", 'invalid_key2': "foo2"}}
@ -1111,7 +1111,7 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
self.serialize(data),
content_type='application/' + self.fmt,
expect_errors=True)
self.assertEqual(res.status_int, 400)
self.assertEqual(400, res.status_int)
class SubresourceTest(base.BaseTestCase):
@ -1308,13 +1308,13 @@ class NotificationTest(APIv2TestBase):
expected_events = ('.'.join([resource, opname, "start"]),
'.'.join([resource, opname, "end"]))
self.assertEqual(len(fake_notifier.NOTIFICATIONS),
len(expected_events))
self.assertEqual(len(expected_events),
len(fake_notifier.NOTIFICATIONS))
for msg, event in zip(fake_notifier.NOTIFICATIONS, expected_events):
self.assertEqual('INFO', msg['priority'])
self.assertEqual(event, msg['event_type'])
self.assertEqual(res.status_int, expected_code)
self.assertEqual(expected_code, res.status_int)
def test_network_create_notifer(self):
self._resource_op_notifier('create', 'network')
@ -1440,7 +1440,7 @@ class QuotaTest(APIv2TestBase):
instance.get_networks_count.return_value = 3
res = self.api.post_json(
_get_path('networks'), initial_input)
self.assertEqual(res.status_int, exc.HTTPCreated.code)
self.assertEqual(exc.HTTPCreated.code, res.status_int)
class ExtensionTestCase(base.BaseTestCase):
@ -1500,12 +1500,12 @@ class ExtensionTestCase(base.BaseTestCase):
instance.create_network.assert_called_with(mock.ANY,
network=data)
self.assertEqual(res.status_int, exc.HTTPCreated.code)
self.assertEqual(exc.HTTPCreated.code, res.status_int)
self.assertIn('network', res.json)
net = res.json['network']
self.assertEqual(net['id'], net_id)
self.assertEqual(net['status'], "ACTIVE")
self.assertEqual(net['v2attrs:something'], "123")
self.assertEqual(net_id, net['id'])
self.assertEqual("ACTIVE", net['status'])
self.assertEqual("123", net['v2attrs:something'])
self.assertNotIn('v2attrs:something_else', net)
@ -1537,7 +1537,7 @@ class ListArgsTestCase(base.BaseTestCase):
request = webob.Request.blank(path)
expect_val = ['2', '4']
actual_val = api_common.list_args(request, 'fields')
self.assertEqual(sorted(actual_val), expect_val)
self.assertEqual(expect_val, sorted(actual_val))
def test_list_args_with_empty(self):
path = '/?foo=4&bar=3&baz=2&qux=1'
@ -1562,7 +1562,7 @@ class FiltersTestCase(base.BaseTestCase):
request = webob.Request.blank(path)
expect_val = {'foo': ['4'], 'bar': ['3'], 'baz': ['2'], 'qux': ['1']}
actual_val = api_common.get_filters(request, {})
self.assertEqual(actual_val, expect_val)
self.assertEqual(expect_val, actual_val)
def test_attr_info_without_conversion(self):
path = '/?foo=4&bar=3&baz=2&qux=1'
@ -1570,7 +1570,7 @@ class FiltersTestCase(base.BaseTestCase):
attr_info = {'foo': {'key': 'val'}}
expect_val = {'foo': ['4'], 'bar': ['3'], 'baz': ['2'], 'qux': ['1']}
actual_val = api_common.get_filters(request, attr_info)
self.assertEqual(actual_val, expect_val)
self.assertEqual(expect_val, actual_val)
def test_attr_info_with_convert_list_to(self):
path = '/?foo=key=4&bar=3&foo=key=2&qux=1'
@ -1590,7 +1590,7 @@ class FiltersTestCase(base.BaseTestCase):
attr_info = {'foo': {'convert_to': attributes.convert_to_int}}
expect_val = {'foo': [4], 'bar': ['3'], 'baz': ['2'], 'qux': ['1']}
actual_val = api_common.get_filters(request, attr_info)
self.assertEqual(actual_val, expect_val)
self.assertEqual(expect_val, actual_val)
class CreateResourceTestCase(base.BaseTestCase):

View File

@ -40,47 +40,47 @@ class RequestTestCase(base.BaseTestCase):
request = wsgi.Request.blank('/tests/123')
request.headers["Content-Type"] = "application/json; charset=UTF-8"
result = request.get_content_type()
self.assertEqual(result, "application/json")
self.assertEqual("application/json", result)
def test_content_type_from_accept(self):
content_type = 'application/json'
request = wsgi.Request.blank('/tests/123')
request.headers["Accept"] = content_type
result = request.best_match_content_type()
self.assertEqual(result, content_type)
self.assertEqual(content_type, result)
def test_content_type_from_accept_best(self):
request = wsgi.Request.blank('/tests/123')
request.headers["Accept"] = "application/json"
result = request.best_match_content_type()
self.assertEqual(result, "application/json")
self.assertEqual("application/json", result)
request = wsgi.Request.blank('/tests/123')
request.headers["Accept"] = ("application/json; q=0.3, "
"application/xml; q=0.9")
result = request.best_match_content_type()
self.assertEqual(result, "application/json")
self.assertEqual("application/json", result)
def test_content_type_from_query_extension(self):
request = wsgi.Request.blank('/tests/123.json')
result = request.best_match_content_type()
self.assertEqual(result, "application/json")
self.assertEqual("application/json", result)
request = wsgi.Request.blank('/tests/123.invalid')
result = request.best_match_content_type()
self.assertEqual(result, "application/json")
self.assertEqual("application/json", result)
def test_content_type_accept_and_query_extension(self):
request = wsgi.Request.blank('/tests/123.json')
request.headers["Accept"] = "application/xml"
result = request.best_match_content_type()
self.assertEqual(result, "application/json")
self.assertEqual("application/json", result)
def test_content_type_accept_default(self):
request = wsgi.Request.blank('/tests/123.unsupported')
request.headers["Accept"] = "application/unsupported1"
result = request.best_match_content_type()
self.assertEqual(result, "application/json")
self.assertEqual("application/json", result)
def test_context_with_neutron_context(self):
ctxt = context.Context('fake_user', 'fake_tenant')
@ -108,7 +108,7 @@ class RequestTestCase(base.BaseTestCase):
'es', 'zh']
request.headers['Accept-Language'] = 'known-language'
language = request.best_match_language()
self.assertEqual(language, 'known-language')
self.assertEqual('known-language', language)
# If the Accept-Leader is an unknown language, missing or empty,
# the best match locale should be None
@ -147,9 +147,9 @@ class ResourceTestCase(base.BaseTestCase):
environ = {'wsgiorg.routing_args': (None, {'action': 'test',
'format': 'json'})}
res = resource.get('', extra_environ=environ, expect_errors=True)
self.assertEqual(res.status_int, exc.HTTPInternalServerError.code)
self.assertEqual(wsgi.JSONDeserializer().deserialize(res.body),
expected_res)
self.assertEqual(exc.HTTPInternalServerError.code, res.status_int)
self.assertEqual(expected_res,
wsgi.JSONDeserializer().deserialize(res.body))
@mock.patch('oslo_i18n.translate')
def test_unmapped_neutron_error_localized(self, mock_translation):
@ -168,7 +168,7 @@ class ResourceTestCase(base.BaseTestCase):
'format': 'json'})}
res = resource.get('', extra_environ=environ, expect_errors=True)
self.assertEqual(res.status_int, exc.HTTPInternalServerError.code)
self.assertEqual(exc.HTTPInternalServerError.code, res.status_int)
self.assertIn(msg_translation,
str(wsgi.JSONDeserializer().deserialize(res.body)))
@ -192,9 +192,9 @@ class ResourceTestCase(base.BaseTestCase):
environ = {'wsgiorg.routing_args': (None, {'action': 'test',
'format': 'json'})}
res = resource.get('', extra_environ=environ, expect_errors=True)
self.assertEqual(res.status_int, exc.HTTPGatewayTimeout.code)
self.assertEqual(wsgi.JSONDeserializer().deserialize(res.body),
expected_res)
self.assertEqual(exc.HTTPGatewayTimeout.code, res.status_int)
self.assertEqual(expected_res,
wsgi.JSONDeserializer().deserialize(res.body))
@mock.patch('oslo_i18n.translate')
def test_mapped_neutron_error_localized(self, mock_translation):
@ -278,7 +278,7 @@ class ResourceTestCase(base.BaseTestCase):
environ = {'wsgiorg.routing_args': (None, {'action': 'test'})}
res = resource.get('', extra_environ=environ)
self.assertEqual(res.status_int, 200)
self.assertEqual(200, res.status_int)
def test_status_204(self):
controller = mock.MagicMock()
@ -288,7 +288,7 @@ class ResourceTestCase(base.BaseTestCase):
environ = {'wsgiorg.routing_args': (None, {'action': 'delete'})}
res = resource.delete('', extra_environ=environ)
self.assertEqual(res.status_int, 204)
self.assertEqual(204, res.status_int)
def _test_error_log_level(self, expected_webob_exc, expect_log_info=False,
use_fault_map=True, exc_raised=None):
@ -304,7 +304,7 @@ class ResourceTestCase(base.BaseTestCase):
environ = {'wsgiorg.routing_args': (None, {'action': 'test'})}
with mock.patch.object(wsgi_resource, 'LOG') as log:
res = resource.get('', extra_environ=environ, expect_errors=True)
self.assertEqual(res.status_int, expected_webob_exc.code)
self.assertEqual(expected_webob_exc.code, res.status_int)
self.assertEqual(expect_log_info, log.info.called)
self.assertNotEqual(expect_log_info, log.exception.called)
@ -336,7 +336,7 @@ class ResourceTestCase(base.BaseTestCase):
environ = {}
res = resource.get('', extra_environ=environ, expect_errors=True)
self.assertEqual(res.status_int, exc.HTTPInternalServerError.code)
self.assertEqual(exc.HTTPInternalServerError.code, res.status_int)
def test_post_with_body(self):
controller = mock.MagicMock()
@ -347,4 +347,4 @@ class ResourceTestCase(base.BaseTestCase):
environ = {'wsgiorg.routing_args': (None, {'action': 'test'})}
res = resource.post('', params='{"key": "val"}',
extra_environ=environ)
self.assertEqual(res.status_int, 200)
self.assertEqual(200, res.status_int)

View File

@ -60,15 +60,15 @@ class TestParseMappings(base.BaseTestCase):
self.parse(['key1:val', 'key2:val'])
def test_parse_mappings_succeeds_for_one_mapping(self):
self.assertEqual(self.parse(['key:val']), {'key': 'val'})
self.assertEqual({'key': 'val'}, self.parse(['key:val']))
def test_parse_mappings_succeeds_for_n_mappings(self):
self.assertEqual(self.parse(['key1:val1', 'key2:val2']),
{'key1': 'val1', 'key2': 'val2'})
self.assertEqual({'key1': 'val1', 'key2': 'val2'},
self.parse(['key1:val1', 'key2:val2']))
def test_parse_mappings_succeeds_for_duplicate_value(self):
self.assertEqual(self.parse(['key1:val', 'key2:val'], False),
{'key1': 'val', 'key2': 'val'})
self.assertEqual({'key1': 'val', 'key2': 'val'},
self.parse(['key1:val', 'key2:val'], False))
def test_parse_mappings_succeeds_for_no_mappings(self):
self.assertEqual({}, self.parse(['']))
@ -253,12 +253,12 @@ class TestParseOneVlanRange(UtilTestParseVlanRanges):
def test_parse_one_net_no_vlan_range(self):
config_str = "net1"
expected_networks = ("net1", None)
self.assertEqual(self.parse_one(config_str), expected_networks)
self.assertEqual(expected_networks, self.parse_one(config_str))
def test_parse_one_net_and_vlan_range(self):
config_str = "net1:100:199"
expected_networks = ("net1", (100, 199))
self.assertEqual(self.parse_one(config_str), expected_networks)
self.assertEqual(expected_networks, self.parse_one(config_str))
def test_parse_one_net_incomplete_range(self):
config_str = "net1:100"
@ -294,7 +294,7 @@ class TestParseOneVlanRange(UtilTestParseVlanRanges):
def test_parse_one_net_and_max_range(self):
config_str = "net1:1:4094"
expected_networks = ("net1", (1, 4094))
self.assertEqual(self.parse_one(config_str), expected_networks)
self.assertEqual(expected_networks, self.parse_one(config_str))
def test_parse_one_net_range_bad_vlan1(self):
config_str = "net1:9000:150"
@ -318,33 +318,33 @@ class TestParseVlanRangeList(UtilTestParseVlanRanges):
def test_parse_list_one_net_no_vlan_range(self):
config_list = ["net1"]
expected_networks = {"net1": []}
self.assertEqual(self.parse_list(config_list), expected_networks)
self.assertEqual(expected_networks, self.parse_list(config_list))
def test_parse_list_one_net_vlan_range(self):
config_list = ["net1:100:199"]
expected_networks = {"net1": [(100, 199)]}
self.assertEqual(self.parse_list(config_list), expected_networks)
self.assertEqual(expected_networks, self.parse_list(config_list))
def test_parse_two_nets_no_vlan_range(self):
config_list = ["net1",
"net2"]
expected_networks = {"net1": [],
"net2": []}
self.assertEqual(self.parse_list(config_list), expected_networks)
self.assertEqual(expected_networks, self.parse_list(config_list))
def test_parse_two_nets_range_and_no_range(self):
config_list = ["net1:100:199",
"net2"]
expected_networks = {"net1": [(100, 199)],
"net2": []}
self.assertEqual(self.parse_list(config_list), expected_networks)
self.assertEqual(expected_networks, self.parse_list(config_list))
def test_parse_two_nets_no_range_and_range(self):
config_list = ["net1",
"net2:200:299"]
expected_networks = {"net1": [],
"net2": [(200, 299)]}
self.assertEqual(self.parse_list(config_list), expected_networks)
self.assertEqual(expected_networks, self.parse_list(config_list))
def test_parse_two_nets_bad_vlan_range1(self):
config_list = ["net1:100",
@ -369,7 +369,7 @@ class TestParseVlanRangeList(UtilTestParseVlanRanges):
expected_networks = {"net1": [(100, 199),
(1000, 1099)],
"net2": [(200, 299)]}
self.assertEqual(self.parse_list(config_list), expected_networks)
self.assertEqual(expected_networks, self.parse_list(config_list))
def test_parse_two_nets_and_append_1_3(self):
config_list = ["net1:100:199",
@ -378,23 +378,23 @@ class TestParseVlanRangeList(UtilTestParseVlanRanges):
expected_networks = {"net1": [(100, 199),
(1000, 1099)],
"net2": [(200, 299)]}
self.assertEqual(self.parse_list(config_list), expected_networks)
self.assertEqual(expected_networks, self.parse_list(config_list))
class TestDictUtils(base.BaseTestCase):
def test_dict2str(self):
dic = {"key1": "value1", "key2": "value2", "key3": "value3"}
expected = "key1=value1,key2=value2,key3=value3"
self.assertEqual(utils.dict2str(dic), expected)
self.assertEqual(expected, utils.dict2str(dic))
def test_str2dict(self):
string = "key1=value1,key2=value2,key3=value3"
expected = {"key1": "value1", "key2": "value2", "key3": "value3"}
self.assertEqual(utils.str2dict(string), expected)
self.assertEqual(expected, utils.str2dict(string))
def test_dict_str_conversion(self):
dic = {"key1": "value1", "key2": "value2"}
self.assertEqual(utils.str2dict(utils.dict2str(dic)), dic)
self.assertEqual(dic, utils.str2dict(utils.dict2str(dic)))
def test_diff_list_of_dict(self):
old_list = [{"key1": "value1"},
@ -633,12 +633,12 @@ class TestCidrIsHost(base.BaseTestCase):
class TestIpVersionFromInt(base.BaseTestCase):
def test_ip_version_from_int_ipv4(self):
self.assertEqual(utils.ip_version_from_int(4),
constants.IPv4)
self.assertEqual(constants.IPv4,
utils.ip_version_from_int(4))
def test_ip_version_from_int_ipv6(self):
self.assertEqual(utils.ip_version_from_int(6),
constants.IPv6)
self.assertEqual(constants.IPv6,
utils.ip_version_from_int(6))
def test_ip_version_from_int_illegal_int(self):
self.assertRaises(ValueError,

View File

@ -63,7 +63,7 @@ class AgentSchedulerTestMixIn(object):
expected_code=exc.HTTPOk.code):
req = self._path_req(path, admin_context=admin_context)
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, expected_code)
self.assertEqual(expected_code, res.status_int)
return self.deserialize(self.fmt, res)
def _path_req(self, path, method='GET', data=None,
@ -142,7 +142,7 @@ class AgentSchedulerTestMixIn(object):
{'router_id': router_id},
admin_context=admin_context)
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, expected_code)
self.assertEqual(expected_code, res.status_int)
def _add_network_to_dhcp_agent(self, id, network_id,
expected_code=exc.HTTPCreated.code,
@ -154,7 +154,7 @@ class AgentSchedulerTestMixIn(object):
{'network_id': network_id},
admin_context=admin_context)
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, expected_code)
self.assertEqual(expected_code, res.status_int)
def _remove_network_from_dhcp_agent(self, id, network_id,
expected_code=exc.HTTPNoContent.code,
@ -166,7 +166,7 @@ class AgentSchedulerTestMixIn(object):
req = self._path_delete_request(path,
admin_context=admin_context)
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, expected_code)
self.assertEqual(expected_code, res.status_int)
def _remove_router_from_l3_agent(self, id, router_id,
expected_code=exc.HTTPNoContent.code,
@ -177,7 +177,7 @@ class AgentSchedulerTestMixIn(object):
self.fmt)
req = self._path_delete_request(path, admin_context=admin_context)
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, expected_code)
self.assertEqual(expected_code, res.status_int)
def _assert_notify(self, notifications, expected_event_type):
event_types = [event['event_type'] for event in notifications]

View File

@ -151,7 +151,7 @@ class TestAllowedAddressPairs(AllowedAddressPairDBTestCase):
port_security_enabled=False,
allowed_address_pairs=address_pairs)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 409)
self.assertEqual(409, res.status_int)
address_pairs = []
res = self._create_port(self.fmt, net['network']['id'],

File diff suppressed because it is too large Load Diff

View File

@ -462,7 +462,7 @@ class TestDbBasePluginIpam(test_db_base.NeutronDbPluginV2TestCase):
with self.port(subnet=subnet) as port:
ips = port['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual(ips[0]['ip_address'], auto_ip)
self.assertEqual(auto_ip, ips[0]['ip_address'])
# Update port with another new ip
data = {"port": {"fixed_ips": [{
'subnet_id': subnet['subnet']['id'],
@ -489,7 +489,7 @@ class TestDbBasePluginIpam(test_db_base.NeutronDbPluginV2TestCase):
with self.port(subnet=subnet) as port:
ips = port['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual(ips[0]['ip_address'], auto_ip)
self.assertEqual(auto_ip, ips[0]['ip_address'])
req = self.new_delete_request('ports', port['port']['id'])
res = req.get_response(self.api)
@ -502,14 +502,14 @@ class TestDbBasePluginIpam(test_db_base.NeutronDbPluginV2TestCase):
with self.port(subnet=subnet) as port:
ips = port['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual(ips[0]['ip_address'], ip)
self.assertEqual(ip, ips[0]['ip_address'])
req = self.new_delete_request('ports', port['port']['id'])
res = req.get_response(self.api)
self.assertEqual(webob.exc.HTTPNoContent.code, res.status_int)
with self.port(subnet=subnet, fixed_ips=ips) as port:
ips = port['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual(ips[0]['ip_address'], ip)
self.assertEqual(ip, ips[0]['ip_address'])
@mock.patch('neutron.ipam.driver.Pool')
def test_update_ips_for_port_passes_port_dict_to_factory(self, pool_mock):

View File

@ -117,4 +117,4 @@ class ExtensionTestCase(testlib_api.WebTestCase):
test_base._get_path(path, id=entity_id, fmt=self.fmt))
delete_entity = getattr(self.plugin.return_value, "delete_" + entity)
delete_entity.assert_called_with(mock.ANY, entity_id)
self.assertEqual(res.status_int, exc.HTTPNoContent.code)
self.assertEqual(exc.HTTPNoContent.code, res.status_int)

View File

@ -66,7 +66,7 @@ class AddressScopeTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
address_scope_res = address_scope_req.get_response(self.ext_api)
if expected_res_status:
self.assertEqual(address_scope_res.status_int, expected_res_status)
self.assertEqual(expected_res_status, address_scope_res.status_int)
return address_scope_res
def _make_address_scope(self, fmt, ip_version, admin=False, **kwargs):

View File

@ -76,7 +76,7 @@ class AgentDBTestMixIn(object):
neutron_context=neutron_context,
query_params=query_string)
if expected_res_status:
self.assertEqual(agent_res.status_int, expected_res_status)
self.assertEqual(expected_res_status, agent_res.status_int)
return agent_res
def _register_agent_states(self, lbaas_agents=False):
@ -142,7 +142,7 @@ class AgentDBTestCase(AgentDBTestMixIn,
_req.environ['neutron.context'] = context.Context(
'', 'tenant_id')
res = _req.get_response(self.ext_api)
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
self.assertEqual(exc.HTTPBadRequest.code, res.status_int)
def test_list_agent(self):
agents = self._register_agent_states()

View File

@ -84,7 +84,7 @@ class DnsExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2):
port_res = port_req.get_response(self.api)
if expected_res_status:
self.assertEqual(port_res.status_int, expected_res_status)
self.assertEqual(expected_res_status, port_res.status_int)
return port_res
def _test_list_resources(self, resource, items, neutron_context=None,
@ -104,8 +104,8 @@ class DnsExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2):
self.assertEqual(port['port'][k], v)
self.assertIn('mac_address', port['port'])
ips = port['port']['fixed_ips']
self.assertEqual(len(ips), 1)
self.assertEqual(ips[0]['ip_address'], '10.0.0.2')
self.assertEqual(1, len(ips))
self.assertEqual('10.0.0.2', ips[0]['ip_address'])
self.assertEqual('myname', port['port']['name'])
self._verify_dns_assigment(port['port'],
ips_list=['10.0.0.2'])
@ -135,8 +135,8 @@ class DnsExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2):
data = {'port': {'admin_state_up': False, 'dns_name': 'vm1'}}
req = self.new_update_request('ports', data, port['port']['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertEqual(res['port']['admin_state_up'],
data['port']['admin_state_up'])
self.assertEqual(data['port']['admin_state_up'],
res['port']['admin_state_up'])
self._verify_dns_assigment(res['port'],
ips_list=['10.0.0.2'],
dns_name='vm1')
@ -146,8 +146,8 @@ class DnsExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2):
data = {'port': {'admin_state_up': False, 'dns_name': 'vm1'}}
req = self.new_update_request('ports', data, port['port']['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertEqual(res['port']['admin_state_up'],
data['port']['admin_state_up'])
self.assertEqual(data['port']['admin_state_up'],
res['port']['admin_state_up'])
self._verify_dns_assigment(res['port'],
ips_list=['10.0.0.2'])
@ -157,7 +157,7 @@ class DnsExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2):
ips_list = ips_list or []
ipv4_cidrs = ipv4_cidrs or []
ipv6_cidrs = ipv6_cidrs or []
self.assertEqual(port['dns_name'], dns_name)
self.assertEqual(dns_name, port['dns_name'])
dns_assignment = port['dns_assignment']
if ips_list:
self.assertEqual(len(dns_assignment), len(ips_list))
@ -240,8 +240,8 @@ class DnsExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2):
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
ips = port['port']['fixed_ips']
self.assertEqual(len(ips), 1)
self.assertEqual(ips[0]['ip_address'], '10.0.0.2')
self.assertEqual(1, len(ips))
self.assertEqual('10.0.0.2', ips[0]['ip_address'])
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
data = {'port': {'fixed_ips': [{'subnet_id':
subnet['subnet']['id'],
@ -250,7 +250,7 @@ class DnsExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2):
port['port']['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
ips = res['port']['fixed_ips']
self.assertEqual(len(ips), 1)
self.assertEqual(1, len(ips))
self.assertEqual(ips[0]['ip_address'], '10.0.0.10')
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
self._verify_dns_assigment(res['port'], ips_list=['10.0.0.10'])
@ -259,8 +259,8 @@ class DnsExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2):
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
ips = port['port']['fixed_ips']
self.assertEqual(len(ips), 1)
self.assertEqual(ips[0]['ip_address'], '10.0.0.2')
self.assertEqual(1, len(ips))
self.assertEqual('10.0.0.2', ips[0]['ip_address'])
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
data = {'port': {'fixed_ips': [{'subnet_id':
subnet['subnet']['id'],
@ -270,7 +270,7 @@ class DnsExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2):
port['port']['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
ips = res['port']['fixed_ips']
self.assertEqual(len(ips), 2)
self.assertEqual(2, len(ips))
self.assertIn({'ip_address': '10.0.0.2',
'subnet_id': subnet['subnet']['id']}, ips)
self.assertIn({'ip_address': '10.0.0.10',
@ -281,54 +281,54 @@ class DnsExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2):
def test_create_port_with_multiple_ipv4_and_ipv6_subnets(self):
res = self._test_create_port_with_multiple_ipv4_and_ipv6_subnets()
self.assertEqual(res.status_code, 201)
self.assertEqual(201, res.status_code)
def test_create_port_multiple_v4_v6_subnets_pqdn_and_dns_domain_no_period(
self):
cfg.CONF.set_override('dns_domain', 'example.com')
res = self._test_create_port_with_multiple_ipv4_and_ipv6_subnets(
dns_name='vm1')
self.assertEqual(res.status_code, 201)
self.assertEqual(201, res.status_code)
def test_create_port_multiple_v4_v6_subnets_pqdn_and_dns_domain_period(
self):
cfg.CONF.set_override('dns_domain', 'example.com.')
res = self._test_create_port_with_multiple_ipv4_and_ipv6_subnets(
dns_name='vm1')
self.assertEqual(res.status_code, 201)
self.assertEqual(201, res.status_code)
def test_create_port_multiple_v4_v6_subnets_pqdn_and_no_dns_domain(
self):
cfg.CONF.set_override('dns_domain', '')
res = self._test_create_port_with_multiple_ipv4_and_ipv6_subnets()
self.assertEqual(res.status_code, 201)
self.assertEqual(201, res.status_code)
def test_create_port_multiple_v4_v6_subnets_fqdn_and_dns_domain_no_period(
self):
cfg.CONF.set_override('dns_domain', 'example.com')
res = self._test_create_port_with_multiple_ipv4_and_ipv6_subnets(
dns_name='vm1.example.com.')
self.assertEqual(res.status_code, 201)
self.assertEqual(201, res.status_code)
def test_create_port_multiple_v4_v6_subnets_fqdn_and_dns_domain_period(
self):
cfg.CONF.set_override('dns_domain', 'example.com.')
res = self._test_create_port_with_multiple_ipv4_and_ipv6_subnets(
dns_name='vm1.example.com.')
self.assertEqual(res.status_code, 201)
self.assertEqual(201, res.status_code)
def test_create_port_multiple_v4_v6_subnets_fqdn_default_domain_period(
self):
cfg.CONF.set_override('dns_domain', 'openstacklocal.')
res = self._test_create_port_with_multiple_ipv4_and_ipv6_subnets()
self.assertEqual(res.status_code, 201)
self.assertEqual(201, res.status_code)
def test_create_port_multiple_v4_v6_subnets_bad_fqdn_and_dns_domain(
self):
cfg.CONF.set_override('dns_domain', 'example.com')
res = self._test_create_port_with_multiple_ipv4_and_ipv6_subnets(
dns_name='vm1.bad-domain.com.')
self.assertEqual(res.status_code, 400)
self.assertEqual(400, res.status_code)
expected_error = ('The dns_name passed is a FQDN. Its higher level '
'labels must be equal to the dns_domain option in '
'neutron.conf')
@ -345,7 +345,7 @@ class DnsExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2):
num_labels + 'a' * filler_len)
res = self._test_create_port_with_multiple_ipv4_and_ipv6_subnets(
dns_name=dns_name)
self.assertEqual(res.status_code, 400)
self.assertEqual(400, res.status_code)
expected_error = ("When the two are concatenated to form a FQDN "
"(with a '.' at the end), the resulting length "
"exceeds the maximum size")
@ -435,7 +435,7 @@ class DnsExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2):
for dns_name in dns_names:
res = self._create_port(self.fmt, net_id=network['network']['id'],
dns_name=dns_name)
self.assertEqual(res.status_code, 400)
self.assertEqual(400, res.status_code)
is_expected_message = (
'cannot be converted to lowercase string' in res.text or
'not a valid PQDN or FQDN. Reason:' in res.text)
@ -471,4 +471,4 @@ class DnsExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2):
for dns_name in dns_names:
res = self._create_port(self.fmt, net_id=network['network']['id'],
dns_name=dns_name)
self.assertEqual(res.status_code, 201)
self.assertEqual(201, res.status_code)

View File

@ -69,17 +69,17 @@ class ExtNetDBTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
self._set_net_external(n1['network']['id'])
with self.network():
body = self._list('networks')
self.assertEqual(len(body['networks']), 2)
self.assertEqual(2, len(body['networks']))
body = self._list('networks',
query_params="%s=True" %
external_net.EXTERNAL)
self.assertEqual(len(body['networks']), 1)
self.assertEqual(1, len(body['networks']))
body = self._list('networks',
query_params="%s=False" %
external_net.EXTERNAL)
self.assertEqual(len(body['networks']), 1)
self.assertEqual(1, len(body['networks']))
def test_list_nets_external_pagination(self):
if self._skip_native_pagination:
@ -157,7 +157,7 @@ class ExtNetDBTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
set_context='True',
tenant_id='noadmin'):
pass
self.assertEqual(ctx_manager.exception.code, 403)
self.assertEqual(403, ctx_manager.exception.code)
def test_create_port_external_network_admin_succeeds(self):
with self.network(router__external=True) as ext_net:
@ -172,7 +172,7 @@ class ExtNetDBTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
set_context='True',
tenant_id='noadmin'):
pass
self.assertEqual(ctx_manager.exception.code, 403)
self.assertEqual(403, ctx_manager.exception.code)
def test_create_external_network_admin_succeeds(self):
with self.network(router__external=True) as ext_net:
@ -186,6 +186,6 @@ class ExtNetDBTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
with self.network() as net:
req = self.new_delete_request('networks', net['network']['id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int, exc.HTTPNoContent.code)
self.assertEqual(exc.HTTPNoContent.code, res.status_int)
(l3_mock.delete_disassociated_floatingips
.assert_called_once_with(mock.ANY, net['network']['id']))

View File

@ -163,7 +163,7 @@ class TestExtraDhcpOpt(ExtraDhcpOptDBTestCase):
req = self.new_update_request('ports', update_port,
port['port']['id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPOk.code)
self.assertEqual(webob.exc.HTTPOk.code, res.status_int)
port = self.deserialize('json', res)
self._check_opts(expected_opts,
port['port'][edo_ext.EXTRADHCPOPTS])
@ -258,7 +258,7 @@ class TestExtraDhcpOpt(ExtraDhcpOptDBTestCase):
req = self.new_update_request('ports', update_port,
port['port']['id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
self.assertEqual(webob.exc.HTTPBadRequest.code, res.status_int)
def test_update_port_with_blank_name_extradhcpopt(self):
opt_list = [{'opt_name': 'bootfile-name', 'opt_value': 'pxelinux.0'},
@ -277,7 +277,7 @@ class TestExtraDhcpOpt(ExtraDhcpOptDBTestCase):
req = self.new_update_request('ports', update_port,
port['port']['id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
self.assertEqual(webob.exc.HTTPBadRequest.code, res.status_int)
def test_update_port_with_blank_router_extradhcpopt(self):
opt_list = [{'opt_name': 'bootfile-name',

View File

@ -77,7 +77,7 @@ class ExtraRouteDBTestCaseBase(object):
body = self._routes_update_prepare(r['router']['id'],
None, p['port']['id'],
routes)
self.assertEqual(body['router']['routes'], routes)
self.assertEqual(routes, body['router']['routes'])
self._routes_update_cleanup(p['port']['id'],
None, r['router']['id'], [])
@ -108,7 +108,7 @@ class ExtraRouteDBTestCaseBase(object):
body = self._routes_update_prepare(r['router']['id'],
None, p['port']['id'],
routes)
self.assertEqual(body['router']['routes'], routes)
self.assertEqual(routes, body['router']['routes'])
self._router_interface_action(
'remove',
r['router']['id'],
@ -151,12 +151,12 @@ class ExtraRouteDBTestCaseBase(object):
body = self._routes_update_prepare(r1['router']['id'],
None, p1['port']['id'],
routes1)
self.assertEqual(body['router']['routes'], routes1)
self.assertEqual(routes1, body['router']['routes'])
body = self._routes_update_prepare(r2['router']['id'],
None, p2['port']['id'],
routes2)
self.assertEqual(body['router']['routes'], routes2)
self.assertEqual(routes2, body['router']['routes'])
self._routes_update_cleanup(p1['port']['id'],
None, r1['router']['id'], [])
@ -405,7 +405,7 @@ class ExtraRouteDBTestCaseBase(object):
tenant_id=r['router']['tenant_id'],
device_owner=constants.DEVICE_OWNER_ROUTER_GW)
port_list = self.deserialize('json', port_res)
self.assertEqual(len(port_list['ports']), 1)
self.assertEqual(1, len(port_list['ports']))
routes = [{'destination': '135.207.0.0/16',
'nexthop': '10.0.1.3'}]
@ -415,8 +415,7 @@ class ExtraRouteDBTestCaseBase(object):
routes}})
body = self._show('routers', r['router']['id'])
self.assertEqual(body['router']['routes'],
routes)
self.assertEqual(routes, body['router']['routes'])
self._remove_external_gateway_from_router(
r['router']['id'],

View File

@ -114,12 +114,12 @@ class L3NatExtensionTestCase(test_extensions_base.ExtensionTestCase):
content_type='application/%s' % self.fmt)
instance.create_router.assert_called_with(mock.ANY,
router=data)
self.assertEqual(res.status_int, exc.HTTPCreated.code)
self.assertEqual(exc.HTTPCreated.code, res.status_int)
res = self.deserialize(res)
self.assertIn('router', res)
router = res['router']
self.assertEqual(router['id'], router_id)
self.assertEqual(router['status'], "ACTIVE")
self.assertEqual(router_id, router['id'])
self.assertEqual("ACTIVE", router['status'])
self.assertTrue(router['admin_state_up'])
def test_router_list(self):
@ -138,7 +138,7 @@ class L3NatExtensionTestCase(test_extensions_base.ExtensionTestCase):
limit=mock.ANY,
marker=mock.ANY,
page_reverse=mock.ANY)
self.assertEqual(res.status_int, exc.HTTPOk.code)
self.assertEqual(exc.HTTPOk.code, res.status_int)
res = self.deserialize(res)
self.assertIn('routers', res)
self.assertEqual(1, len(res['routers']))
@ -160,12 +160,12 @@ class L3NatExtensionTestCase(test_extensions_base.ExtensionTestCase):
instance.update_router.assert_called_with(mock.ANY, router_id,
router=update_data)
self.assertEqual(res.status_int, exc.HTTPOk.code)
self.assertEqual(exc.HTTPOk.code, res.status_int)
res = self.deserialize(res)
self.assertIn('router', res)
router = res['router']
self.assertEqual(router['id'], router_id)
self.assertEqual(router['status'], "ACTIVE")
self.assertEqual(router_id, router['id'])
self.assertEqual("ACTIVE", router['status'])
self.assertFalse(router['admin_state_up'])
def test_router_get(self):
@ -182,12 +182,12 @@ class L3NatExtensionTestCase(test_extensions_base.ExtensionTestCase):
instance.get_router.assert_called_with(mock.ANY, router_id,
fields=mock.ANY)
self.assertEqual(res.status_int, exc.HTTPOk.code)
self.assertEqual(exc.HTTPOk.code, res.status_int)
res = self.deserialize(res)
self.assertIn('router', res)
router = res['router']
self.assertEqual(router['id'], router_id)
self.assertEqual(router['status'], "ACTIVE")
self.assertEqual(router_id, router['id'])
self.assertEqual("ACTIVE", router['status'])
self.assertFalse(router['admin_state_up'])
def test_router_delete(self):
@ -197,7 +197,7 @@ class L3NatExtensionTestCase(test_extensions_base.ExtensionTestCase):
instance = self.plugin.return_value
instance.delete_router.assert_called_with(mock.ANY, router_id)
self.assertEqual(res.status_int, exc.HTTPNoContent.code)
self.assertEqual(exc.HTTPNoContent.code, res.status_int)
def test_router_add_interface(self):
router_id = _uuid()
@ -218,11 +218,11 @@ class L3NatExtensionTestCase(test_extensions_base.ExtensionTestCase):
instance.add_router_interface.assert_called_with(mock.ANY, router_id,
interface_data)
self.assertEqual(res.status_int, exc.HTTPOk.code)
self.assertEqual(exc.HTTPOk.code, res.status_int)
res = self.deserialize(res)
self.assertIn('port_id', res)
self.assertEqual(res['port_id'], port_id)
self.assertEqual(res['subnet_id'], subnet_id)
self.assertEqual(port_id, res['port_id'])
self.assertEqual(subnet_id, res['subnet_id'])
# This base plugin class is for tests.
@ -397,10 +397,10 @@ class L3NatTestCaseMixin(object):
req.environ['neutron.context'] = context.Context(
'', tenant_id)
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, expected_code, msg)
self.assertEqual(expected_code, res.status_int, msg)
response = self.deserialize(self.fmt, res)
if expected_body:
self.assertEqual(response, expected_body, msg)
self.assertEqual(expected_body, response, msg)
return response
@contextlib.contextmanager
@ -444,12 +444,12 @@ class L3NatTestCaseMixin(object):
http_status=exc.HTTPCreated.code):
res = self._create_floatingip(fmt, network_id, port_id,
fixed_ip, set_context, floating_ip)
self.assertEqual(res.status_int, http_status)
self.assertEqual(http_status, res.status_int)
return self.deserialize(fmt, res)
def _validate_floating_ip(self, fip):
body = self._list('floatingips')
self.assertEqual(len(body['floatingips']), 1)
self.assertEqual(1, len(body['floatingips']))
self.assertEqual(body['floatingips'][0]['id'],
fip['floatingip']['id'])
@ -1771,7 +1771,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
router_id = router['router']['id']
req = self.new_show_request('router', router_id)
res = req.get_response(self._api_for_resource('router'))
self.assertEqual(res.status_int, 404)
self.assertEqual(404, res.status_int)
def test_router_delete_with_port_existed_returns_409(self):
with self.subnet() as subnet:
@ -1801,7 +1801,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
res = self._create_floatingip(
self.fmt, public_sub['subnet']['network_id'],
port_id=p['port']['id'])
self.assertEqual(res.status_int, exc.HTTPCreated.code)
self.assertEqual(exc.HTTPCreated.code, res.status_int)
self._delete('routers', r['router']['id'],
expected_code=exc.HTTPConflict.code)
@ -1846,7 +1846,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
# post-delete, check that it is really gone
body = self._list('floatingips')
self.assertEqual(len(body['floatingips']), 0)
self.assertEqual(0, len(body['floatingips']))
self._show('floatingips', fip['floatingip']['id'],
expected_code=exc.HTTPNotFound.code)
@ -1872,7 +1872,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
self.fmt,
public_sub['subnet']['network_id'],
port_id=private_port['port']['id'])
self.assertEqual(res.status_int, 400)
self.assertEqual(400, res.status_int)
for p in self._list('ports')['ports']:
if (p['device_owner'] ==
l3_constants.DEVICE_OWNER_FLOATINGIP):
@ -1939,15 +1939,15 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
body = self._show('floatingips', fip['floatingip']['id'])
self.assertIsNone(body['floatingip']['port_id'])
self.assertIsNone(body['floatingip']['fixed_ip_address'])
self.assertEqual(body['floatingip']['status'], expected_status)
self.assertEqual(expected_status, body['floatingip']['status'])
port_id = p['port']['id']
ip_address = p['port']['fixed_ips'][0]['ip_address']
body = self._update('floatingips', fip['floatingip']['id'],
{'floatingip': {'port_id': port_id}})
self.assertEqual(body['floatingip']['port_id'], port_id)
self.assertEqual(body['floatingip']['fixed_ip_address'],
ip_address)
self.assertEqual(port_id, body['floatingip']['port_id'])
self.assertEqual(ip_address,
body['floatingip']['fixed_ip_address'])
def test_floatingip_create_different_fixed_ip_same_port(self):
'''This tests that it is possible to delete a port that has
@ -2150,11 +2150,11 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
body = self._update(
'floatingips', fip['floatingip']['id'],
{'floatingip': {'port_id': port_id}})
self.assertEqual(body['floatingip']['port_id'],
port_id)
self.assertEqual(port_id,
body['floatingip']['port_id'])
self.assertEqual(
body['floatingip']['fixed_ip_address'],
ip_address)
ip_address,
body['floatingip']['fixed_ip_address'])
return body['floatingip']['router_id']
fip1_r1_res = associate_and_assert(fip1, p1)
@ -2196,7 +2196,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
self.fmt,
fip1['floatingip']['floating_network_id'],
fip1['floatingip']['port_id'])
self.assertEqual(res.status_int, exc.HTTPConflict.code)
self.assertEqual(exc.HTTPConflict.code, res.status_int)
def test_floating_ip_direct_port_delete_returns_409(self):
found = False
@ -2229,8 +2229,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
res = self._create_floatingip(
self.fmt, public_sub['subnet']['network_id'],
port_id=p['port']['id'])
self.assertEqual(res.status_int,
exc.HTTPBadRequest.code)
self.assertEqual(exc.HTTPBadRequest.code, res.status_int)
def test_floatingip_with_invalid_create_port(self):
self._test_floatingip_with_invalid_create_port(
@ -2245,7 +2244,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
public_sub['subnet']['network_id'],
subnet_id=public_sub['subnet']['id'],
set_context=True)
self.assertEqual(res.status_int, exc.HTTPCreated.code)
self.assertEqual(exc.HTTPCreated.code, res.status_int)
def test_create_floatingip_with_multisubnet_id(self):
with self.network() as network:
@ -2280,7 +2279,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
self.fmt,
subnet1['subnet']['network_id'],
subnet_id=subnet2['subnet']['id'])
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
self.assertEqual(exc.HTTPBadRequest.code, res.status_int)
def test_create_floatingip_no_ext_gateway_return_404(self):
with self.subnet() as public_sub:
@ -2292,7 +2291,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
public_sub['subnet']['network_id'],
port_id=private_port['port']['id'])
# this should be some kind of error
self.assertEqual(res.status_int, exc.HTTPNotFound.code)
self.assertEqual(exc.HTTPNotFound.code, res.status_int)
def test_create_floating_non_ext_network_returns_400(self):
with self.subnet() as public_sub:
@ -2303,7 +2302,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
res = self._create_floatingip(
self.fmt,
public_sub['subnet']['network_id'])
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
self.assertEqual(exc.HTTPBadRequest.code, res.status_int)
def test_create_floatingip_no_public_subnet_returns_400(self):
with self.network() as public_network:
@ -2319,25 +2318,25 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
self.fmt,
public_network['network']['id'],
port_id=private_port['port']['id'])
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
self.assertEqual(exc.HTTPBadRequest.code, res.status_int)
def test_create_floatingip_invalid_floating_network_id_returns_400(self):
# API-level test - no need to create all objects for l3 plugin
res = self._create_floatingip(self.fmt, 'iamnotanuuid',
uuidutils.generate_uuid(), '192.168.0.1')
self.assertEqual(res.status_int, 400)
self.assertEqual(400, res.status_int)
def test_create_floatingip_invalid_floating_port_id_returns_400(self):
# API-level test - no need to create all objects for l3 plugin
res = self._create_floatingip(self.fmt, uuidutils.generate_uuid(),
'iamnotanuuid', '192.168.0.1')
self.assertEqual(res.status_int, 400)
self.assertEqual(400, res.status_int)
def test_create_floatingip_invalid_fixed_ip_address_returns_400(self):
# API-level test - no need to create all objects for l3 plugin
res = self._create_floatingip(self.fmt, uuidutils.generate_uuid(),
uuidutils.generate_uuid(), 'iamnotnanip')
self.assertEqual(res.status_int, 400)
self.assertEqual(400, res.status_int)
def test_floatingip_list_with_sort(self):
with self.subnet(cidr="10.0.0.0/24") as s1,\
@ -2360,9 +2359,9 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
port_id = fip['floatingip']['port_id']
res = self._list('floatingips',
query_params="port_id=%s" % port_id)
self.assertEqual(len(res['floatingips']), 1)
self.assertEqual(1, len(res['floatingips']))
res = self._list('floatingips', query_params="port_id=aaa")
self.assertEqual(len(res['floatingips']), 0)
self.assertEqual(0, len(res['floatingips']))
def test_floatingip_list_with_pagination(self):
with self.subnet(cidr="10.0.0.0/24") as s1,\
@ -2547,8 +2546,8 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
self._set_net_external(network_id)
fp = self._make_floatingip(self.fmt, network_id,
floating_ip='10.0.0.10')
self.assertEqual(fp['floatingip']['floating_ip_address'],
'10.0.0.10')
self.assertEqual('10.0.0.10',
fp['floatingip']['floating_ip_address'])
def test_create_floatingip_with_specific_ip_out_of_allocation(self):
with self.subnet(cidr='10.0.0.0/24',
@ -2559,8 +2558,8 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
self._set_net_external(network_id)
fp = self._make_floatingip(self.fmt, network_id,
floating_ip='10.0.0.30')
self.assertEqual(fp['floatingip']['floating_ip_address'],
'10.0.0.30')
self.assertEqual('10.0.0.30',
fp['floatingip']['floating_ip_address'])
def test_create_floatingip_with_specific_ip_non_admin(self):
ctx = context.Context('user_id', 'tenant_id')
@ -2601,7 +2600,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
'tenant_id': 'foo',
'admin_state_up': True}}
result = plugin.create_router(context.Context('', 'foo'), router_req)
self.assertEqual(result['id'], router_req['router']['id'])
self.assertEqual(router_req['router']['id'], result['id'])
def test_create_floatingip_ipv6_only_network_returns_400(self):
with self.subnet(cidr="2001:db8::/48", ip_version=6) as public_sub:
@ -2609,7 +2608,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
res = self._create_floatingip(
self.fmt,
public_sub['subnet']['network_id'])
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
self.assertEqual(exc.HTTPBadRequest.code, res.status_int)
def test_create_floatingip_ipv6_and_ipv4_network_creates_ipv4(self):
with self.network() as n,\
@ -2617,8 +2616,8 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
self.subnet(cidr="192.168.1.0/24", ip_version=4, network=n):
self._set_net_external(n['network']['id'])
fip = self._make_floatingip(self.fmt, n['network']['id'])
self.assertEqual(fip['floatingip']['floating_ip_address'],
'192.168.1.2')
self.assertEqual('192.168.1.2',
fip['floatingip']['floating_ip_address'])
def test_create_floatingip_with_assoc_to_ipv6_subnet(self):
with self.subnet() as public_sub:
@ -2630,14 +2629,14 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
self.fmt,
public_sub['subnet']['network_id'],
port_id=private_port['port']['id'])
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
self.assertEqual(exc.HTTPBadRequest.code, res.status_int)
def test_create_floatingip_with_assoc_to_ipv4_and_ipv6_port(self):
with self.network() as n,\
self.subnet(cidr='10.0.0.0/24', network=n) as s4,\
self.subnet(cidr='2001:db8::/64', ip_version=6, network=n),\
self.port(subnet=s4) as p:
self.assertEqual(len(p['port']['fixed_ips']), 2)
self.assertEqual(2, len(p['port']['fixed_ips']))
ipv4_address = next(i['ip_address'] for i in
p['port']['fixed_ips'] if
netaddr.IPAddress(i['ip_address']).version == 4)
@ -2646,7 +2645,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
ipv4_address)
floating_ip = netaddr.IPAddress(
fip['floatingip']['floating_ip_address'])
self.assertEqual(floating_ip.version, 4)
self.assertEqual(4, floating_ip.version)
def test_update_subnet_gateway_for_external_net(self):
"""Test to make sure notification to routers occurs when the gateway
@ -2678,8 +2677,8 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
subnet['subnet']['id'])
res = self.deserialize(self.fmt,
req.get_response(self.api))
self.assertEqual(res['subnet']['gateway_ip'],
data['subnet']['gateway_ip'])
self.assertEqual(data['subnet']['gateway_ip'],
res['subnet']['gateway_ip'])
chk_method.assert_called_with(mock.ANY,
['fake_device'], None)
@ -2913,7 +2912,7 @@ class L3NatDBIntAgentSchedulingTestCase(L3BaseForIntTests,
service_constants.L3_ROUTER_NAT)
agents = plugin.list_l3_agents_hosting_router(
self.adminContext, router_id)['agents']
self.assertEqual(len(agents), 1)
self.assertEqual(1, len(agents))
self.assertEqual(agents[0]['host'], agent_host)
def test_update_gateway_agent_exists_supporting_network(self):

View File

@ -469,9 +469,9 @@ class ExtGwModeIntTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase,
return
body = self._show('routers', r['router']['id'])
res_gw_info = body['router']['external_gateway_info']
self.assertEqual(res_gw_info['network_id'], ext_net_id)
self.assertEqual(res_gw_info['enable_snat'],
snat_expected_value)
self.assertEqual(ext_net_id, res_gw_info['network_id'])
self.assertEqual(snat_expected_value,
res_gw_info['enable_snat'])
finally:
self._remove_external_gateway_from_router(
r['router']['id'], ext_net_id)

View File

@ -61,8 +61,8 @@ class NetmtuExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2):
self.assertEqual(1, len(res['networks']))
self.assertEqual(res['networks'][0]['name'],
net1['network']['name'])
self.assertEqual(res['networks'][0].get('mtu'),
constants.DEFAULT_NETWORK_MTU)
self.assertEqual(constants.DEFAULT_NETWORK_MTU,
res['networks'][0].get('mtu'))
def test_show_network_mtu(self):
with self.network(name='net1') as net:
@ -70,5 +70,5 @@ class NetmtuExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2):
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertEqual(res['network']['name'],
net['network']['name'])
self.assertEqual(res['network']['mtu'],
constants.DEFAULT_NETWORK_MTU)
self.assertEqual(constants.DEFAULT_NETWORK_MTU,
res['network']['mtu'])

View File

@ -251,7 +251,7 @@ class TestPortSecurity(PortSecurityDBTestCase):
'port_security_enabled'),
security_groups=[security_group_id],
port_security_enabled=False)
self.assertEqual(res.status_int, 400)
self.assertEqual(400, res.status_int)
def test_create_port_with_default_security_group(self):
if self._skip_security_group:
@ -261,7 +261,7 @@ class TestPortSecurity(PortSecurityDBTestCase):
res = self._create_port('json', net['network']['id'])
port = self.deserialize('json', res)
self.assertTrue(port['port'][psec.PORTSECURITY])
self.assertEqual(len(port['port'][ext_sg.SECURITYGROUPS]), 1)
self.assertEqual(1, len(port['port'][ext_sg.SECURITYGROUPS]))
self._delete('ports', port['port']['id'])
def test_create_port_with_security_group_and_net_sec_false(self):
@ -312,7 +312,7 @@ class TestPortSecurity(PortSecurityDBTestCase):
req = self.new_update_request('ports', update_port,
port['port']['id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int, 409)
self.assertEqual(409, res.status_int)
# remove security group on port
update_port = {'port': {ext_sg.SECURITYGROUPS: None}}
req = self.new_update_request('ports', update_port,
@ -339,7 +339,7 @@ class TestPortSecurity(PortSecurityDBTestCase):
port['port']['id'])
port = self.deserialize('json', req.get_response(self.api))
self.assertFalse(port['port'][psec.PORTSECURITY])
self.assertEqual(len(port['port'][ext_sg.SECURITYGROUPS]), 0)
self.assertEqual(0, len(port['port'][ext_sg.SECURITYGROUPS]))
self._delete('ports', port['port']['id'])
def test_update_port_remove_port_security_security_group_read(self):
@ -369,7 +369,7 @@ class TestPortSecurity(PortSecurityDBTestCase):
port = self.deserialize('json', req.get_response(self.api))
self.assertTrue(port['port'][psec.PORTSECURITY])
self.assertEqual(len(port['port'][ext_sg.SECURITYGROUPS]), 1)
self.assertEqual(1, len(port['port'][ext_sg.SECURITYGROUPS]))
self._delete('ports', port['port']['id'])
def test_create_port_security_off_shared_network(self):
@ -381,7 +381,7 @@ class TestPortSecurity(PortSecurityDBTestCase):
tenant_id='not_network_owner',
set_context=True)
self.deserialize('json', res)
self.assertEqual(res.status_int, 403)
self.assertEqual(403, res.status_int)
def test_update_port_security_off_shared_network(self):
with self.network(shared=True) as net:
@ -398,4 +398,4 @@ class TestPortSecurity(PortSecurityDBTestCase):
req.environ['neutron.context'] = context.Context(
'', 'not_network_owner')
res = req.get_response(self.api)
self.assertEqual(res.status_int, exc.HTTPForbidden.code)
self.assertEqual(exc.HTTPForbidden.code, res.status_int)

View File

@ -134,7 +134,7 @@ class ProvidernetExtensionTestCase(testlib_api.WebTestCase):
'shared': False})
instance.create_network.assert_called_with(mock.ANY,
network=exp_input)
self.assertEqual(res.status_int, web_exc.HTTPCreated.code)
self.assertEqual(web_exc.HTTPCreated.code, res.status_int)
def test_network_create_with_bad_provider_attrs_400(self):
ctx = context.get_admin_context()
@ -153,16 +153,16 @@ class ProvidernetExtensionTestCase(testlib_api.WebTestCase):
instance.update_network.assert_called_with(mock.ANY,
net_id,
network=exp_input)
self.assertEqual(res.status_int, web_exc.HTTPOk.code)
self.assertEqual(web_exc.HTTPOk.code, res.status_int)
def test_network_create_with_provider_attrs_noadmin_returns_403(self):
tenant_id = 'no_admin'
ctx = context.Context('', tenant_id, is_admin=False)
res, _1 = self._post_network_with_provider_attrs(ctx, True)
self.assertEqual(res.status_int, web_exc.HTTPForbidden.code)
self.assertEqual(web_exc.HTTPForbidden.code, res.status_int)
def test_network_update_with_provider_attrs_noadmin_returns_403(self):
tenant_id = 'no_admin'
ctx = context.Context('', tenant_id, is_admin=False)
res, _1, _2 = self._put_network_with_provider_attrs(ctx, True)
self.assertEqual(res.status_int, web_exc.HTTPForbidden.code)
self.assertEqual(web_exc.HTTPForbidden.code, res.status_int)

View File

@ -75,7 +75,7 @@ class TestSubnetAllocation(testlib_api.SqlTestCase):
prefix_set = netaddr.IPSet(iterable=prefix_list)
allocated_set = netaddr.IPSet(iterable=[detail.subnet_cidr])
self.assertTrue(allocated_set.issubset(prefix_set))
self.assertEqual(detail.prefixlen, 21)
self.assertEqual(21, detail.prefixlen)
def test_allocate_specific_subnet(self):
sp = self._create_subnet_pool(self.plugin, self.ctx, 'test-sp',
@ -90,8 +90,8 @@ class TestSubnetAllocation(testlib_api.SqlTestCase):
res = sa.allocate_subnet(req)
detail = res.get_details()
sp = self._get_subnetpool(self.ctx, self.plugin, sp['id'])
self.assertEqual(str(detail.subnet_cidr), '10.1.2.0/24')
self.assertEqual(detail.prefixlen, 24)
self.assertEqual('10.1.2.0/24', str(detail.subnet_cidr))
self.assertEqual(24, detail.prefixlen)
def test_insufficient_prefix_space_for_any_allocation(self):
sp = self._create_subnet_pool(self.plugin, self.ctx, 'test-sp',
@ -146,8 +146,8 @@ class TestSubnetAllocation(testlib_api.SqlTestCase):
gateway_ip='10.1.2.254')
res = sa.allocate_subnet(req)
detail = res.get_details()
self.assertEqual(detail.gateway_ip,
netaddr.IPAddress('10.1.2.254'))
self.assertEqual(netaddr.IPAddress('10.1.2.254'),
detail.gateway_ip)
def test_allocate_specific_ipv6_subnet_specific_gateway(self):
# Same scenario as described in bug #1466322
@ -163,8 +163,8 @@ class TestSubnetAllocation(testlib_api.SqlTestCase):
'2210::ffff:ffff:ffff:ffff')
res = sa.allocate_subnet(req)
detail = res.get_details()
self.assertEqual(detail.gateway_ip,
netaddr.IPAddress('2210::ffff:ffff:ffff:ffff'))
self.assertEqual(netaddr.IPAddress('2210::ffff:ffff:ffff:ffff'),
detail.gateway_ip)
def test__allocation_value_for_tenant_no_allocations(self):
sp = self._create_subnet_pool(self.plugin, self.ctx, 'test-sp',
@ -172,7 +172,7 @@ class TestSubnetAllocation(testlib_api.SqlTestCase):
21, 4)
sa = subnet_alloc.SubnetAllocator(sp, self.ctx)
value = sa._allocations_used_by_tenant(32)
self.assertEqual(value, 0)
self.assertEqual(0, value)
def test_subnetpool_default_quota_exceeded(self):
sp = self._create_subnet_pool(self.plugin, self.ctx, 'test-sp',

View File

@ -649,7 +649,7 @@ class TestLinuxBridgeManager(base.BaseTestCase):
with mock.patch.object(
bridge_lib.BridgeDevice, 'get_interfaces') as get_ifs_fn:
get_ifs_fn.return_value = ['tap2101', 'eth0.100', 'vxlan-1000']
self.assertEqual(self.lbm.get_tap_devices_count('br0'), 1)
self.assertEqual(1, self.lbm.get_tap_devices_count('br0'))
def test_get_interface_details(self):
with mock.patch.object(ip_lib.IpAddrCommand, 'list') as list_fn,\
@ -683,9 +683,8 @@ class TestLinuxBridgeManager(base.BaseTestCase):
list_fn.return_value = ipdict
with mock.patch.object(self.lbm, 'ensure_bridge') as ens:
self.assertEqual(
self.lbm.ensure_flat_bridge("123", None, "eth0"),
"eth0"
)
"eth0",
self.lbm.ensure_flat_bridge("123", None, "eth0"))
self.assertTrue(list_fn.called)
self.assertTrue(getgw_fn.called)
ens.assert_called_once_with("brq123", "eth0",
@ -707,19 +706,19 @@ class TestLinuxBridgeManager(base.BaseTestCase):
'get_interface_details') as get_int_det_fn:
ens_vl_fn.return_value = "eth0.1"
get_int_det_fn.return_value = (None, None)
self.assertEqual(self.lbm.ensure_vlan_bridge("123",
self.assertEqual("eth0.1",
self.lbm.ensure_vlan_bridge("123",
None,
"eth0",
"1"),
"eth0.1")
"1"))
ens.assert_called_with("brq123", "eth0.1", None, None)
get_int_det_fn.return_value = ("ips", "gateway")
self.assertEqual(self.lbm.ensure_vlan_bridge("123",
self.assertEqual("eth0.1",
self.lbm.ensure_vlan_bridge("123",
None,
"eth0",
"1"),
"eth0.1")
"1"))
ens.assert_called_with("brq123", "eth0.1", "ips", "gateway")
def test_ensure_vlan_bridge_with_existed_brq(self):
@ -748,16 +747,16 @@ class TestLinuxBridgeManager(base.BaseTestCase):
def test_ensure_vlan(self):
with mock.patch.object(ip_lib, 'device_exists') as de_fn:
de_fn.return_value = True
self.assertEqual(self.lbm.ensure_vlan("eth0", "1"), "eth0.1")
self.assertEqual("eth0.1", self.lbm.ensure_vlan("eth0", "1"))
de_fn.return_value = False
with mock.patch.object(utils, 'execute') as exec_fn:
exec_fn.return_value = False
self.assertEqual(self.lbm.ensure_vlan("eth0", "1"), "eth0.1")
self.assertEqual("eth0.1", self.lbm.ensure_vlan("eth0", "1"))
# FIXME(kevinbenton): validate the params to the exec_fn calls
self.assertEqual(exec_fn.call_count, 2)
self.assertEqual(2, exec_fn.call_count)
exec_fn.return_value = True
self.assertIsNone(self.lbm.ensure_vlan("eth0", "1"))
self.assertEqual(exec_fn.call_count, 3)
self.assertEqual(3, exec_fn.call_count)
def test_ensure_vxlan(self):
seg_id = "12345678"
@ -765,19 +764,19 @@ class TestLinuxBridgeManager(base.BaseTestCase):
self.lbm.vxlan_mode = lconst.VXLAN_MCAST
with mock.patch.object(ip_lib, 'device_exists') as de_fn:
de_fn.return_value = True
self.assertEqual(self.lbm.ensure_vxlan(seg_id), "vxlan-" + seg_id)
self.assertEqual("vxlan-" + seg_id, self.lbm.ensure_vxlan(seg_id))
de_fn.return_value = False
with mock.patch.object(self.lbm.ip,
'add_vxlan') as add_vxlan_fn:
add_vxlan_fn.return_value = FakeIpDevice()
self.assertEqual(self.lbm.ensure_vxlan(seg_id),
"vxlan-" + seg_id)
self.assertEqual("vxlan-" + seg_id,
self.lbm.ensure_vxlan(seg_id))
add_vxlan_fn.assert_called_with("vxlan-" + seg_id, seg_id,
group="224.0.0.1",
dev=self.lbm.local_int)
cfg.CONF.set_override('l2_population', 'True', 'VXLAN')
self.assertEqual(self.lbm.ensure_vxlan(seg_id),
"vxlan-" + seg_id)
self.assertEqual("vxlan-" + seg_id,
self.lbm.ensure_vxlan(seg_id))
add_vxlan_fn.assert_called_with("vxlan-" + seg_id, seg_id,
group="224.0.0.1",
dev=self.lbm.local_int,
@ -835,7 +834,7 @@ class TestLinuxBridgeManager(base.BaseTestCase):
bridge_device.disable_stp.return_value = False
bridge_device.disable_ipv6.return_value = False
bridge_device.link.set_up.return_value = False
self.assertEqual(self.lbm.ensure_bridge("br0", None), "br0")
self.assertEqual("br0", self.lbm.ensure_bridge("br0", None))
bridge_device.owns_interface.return_value = False
self.lbm.ensure_bridge("br0", "eth0")

View File

@ -1064,10 +1064,10 @@ class TestOvsNeutronAgent(object):
'int-br-eth'),
]
parent.assert_has_calls(expected_calls)
self.assertEqual(self.agent.int_ofports["physnet1"],
"int_ofport")
self.assertEqual(self.agent.phys_ofports["physnet1"],
"phy_ofport")
self.assertEqual("int_ofport",
self.agent.int_ofports["physnet1"])
self.assertEqual("phy_ofport",
self.agent.phys_ofports["physnet1"])
def test_setup_physical_bridges_using_veth_interconnection(self):
self.agent.use_veth_interconnection = True
@ -1101,10 +1101,10 @@ class TestOvsNeutronAgent(object):
mock.call.add_veth('int-br-eth',
'phy-br-eth')]
parent.assert_has_calls(expected_calls, any_order=False)
self.assertEqual(self.agent.int_ofports["physnet1"],
"int_veth_ofport")
self.assertEqual(self.agent.phys_ofports["physnet1"],
"phys_veth_ofport")
self.assertEqual("int_veth_ofport",
self.agent.int_ofports["physnet1"])
self.assertEqual("phys_veth_ofport",
self.agent.phys_ofports["physnet1"])
def test_setup_physical_bridges_change_from_veth_to_patch_conf(self):
with mock.patch.object(sys, "exit"),\
@ -1142,10 +1142,10 @@ class TestOvsNeutronAgent(object):
'int-br-eth'),
]
parent.assert_has_calls(expected_calls)
self.assertEqual(self.agent.int_ofports["physnet1"],
"int_ofport")
self.assertEqual(self.agent.phys_ofports["physnet1"],
"phy_ofport")
self.assertEqual("int_ofport",
self.agent.int_ofports["physnet1"])
self.assertEqual("phy_ofport",
self.agent.phys_ofports["physnet1"])
def test_setup_tunnel_br(self):
self.tun_br = mock.Mock()
@ -1220,11 +1220,11 @@ class TestOvsNeutronAgent(object):
lvm.vif_ports = {}
self.agent.port_unbound("vif1", "netuid12345")
self.assertEqual(reclvl_fn.call_count, 2)
self.assertEqual(2, reclvl_fn.call_count)
lvm.vif_ports = {"vif1": mock.Mock()}
self.agent.port_unbound("vif3", "netuid12345")
self.assertEqual(reclvl_fn.call_count, 2)
self.assertEqual(2, reclvl_fn.call_count)
def _prepare_l2_pop_ofports(self):
lvm1 = mock.Mock()
@ -1431,7 +1431,7 @@ class TestOvsNeutronAgent(object):
log_error_fn.assert_called_once_with(
_("Failed to set-up %(type)s tunnel port to %(ip)s"),
{'type': p_const.TYPE_GRE, 'ip': 'remote_ip'})
self.assertEqual(ofport, 0)
self.assertEqual(0, ofport)
def test_setup_tunnel_port_error_negative_df_disabled(self):
with mock.patch.object(
@ -1450,7 +1450,7 @@ class TestOvsNeutronAgent(object):
log_error_fn.assert_called_once_with(
_("Failed to set-up %(type)s tunnel port to %(ip)s"),
{'type': p_const.TYPE_GRE, 'ip': 'remote_ip'})
self.assertEqual(ofport, 0)
self.assertEqual(0, ofport)
def test_setup_tunnel_port_error_negative_tunnel_csum(self):
with mock.patch.object(
@ -1469,7 +1469,7 @@ class TestOvsNeutronAgent(object):
log_error_fn.assert_called_once_with(
_("Failed to set-up %(type)s tunnel port to %(ip)s"),
{'type': p_const.TYPE_GRE, 'ip': 'remote_ip'})
self.assertEqual(ofport, 0)
self.assertEqual(0, ofport)
def test_tunnel_sync_with_ml2_plugin(self):
fake_tunnel_details = {'tunnels': [{'ip_address': '100.101.31.15'}]}
@ -1771,7 +1771,7 @@ class TestOvsNeutronAgent(object):
[mock.call(port=1)],
self.agent.int_br.delete_arp_spoofing_protection.mock_calls)
# make sure the state was updated with the new map
self.assertEqual(self.agent.vifname_to_ofport_map, newmap)
self.assertEqual(newmap, self.agent.vifname_to_ofport_map)
def test_update_stale_ofport_rules_treats_moved(self):
self.agent.prevent_arp_spoofing = True

View File

@ -320,7 +320,7 @@ class Ml2DvrDBTestCase(testlib_api.SqlTestCase):
self.ctx.session, port_id, 'foo_host', router.id)
expected = (self.ctx.session.query(models.DVRPortBinding).
filter_by(port_id=port_id).one())
self.assertEqual(expected.port_id, port_id)
self.assertEqual(port_id, expected.port_id)
def test_ensure_dvr_port_binding_multiple_bindings(self):
network_id = 'foo_network_id'

View File

@ -60,7 +60,7 @@ class PSExtDriverTestCase(test_plugin.Ml2PluginV2TestCase,
'port_security_enabled'),
security_groups=[],
port_security_enabled=False)
self.assertEqual(res.status_int, 201)
self.assertEqual(201, res.status_int)
port = self.deserialize('json', res)
self.assertFalse(port['port'][psec.PORTSECURITY])
self.assertEqual([], port['port']['security_groups'])

View File

@ -393,7 +393,7 @@ class TestMl2NetworksWithVlanTransparencyAndMTU(TestMl2NetworksV2):
res = network_req.get_response(self.api)
self.assertEqual(201, res.status_int)
network = self.deserialize(self.fmt, res)['network']
self.assertEqual(network['mtu'], 1000)
self.assertEqual(1000, network['mtu'])
self.assertIn('vlan_transparent', network)

View File

@ -47,16 +47,16 @@ class PortBindingTestCase(test_plugin.NeutronDbPluginV2TestCase):
self.plugin.start_rpc_listeners()
def _check_response(self, port, vif_type, has_port_filter, bound, status):
self.assertEqual(port[portbindings.VIF_TYPE], vif_type)
self.assertEqual(vif_type, port[portbindings.VIF_TYPE])
vif_details = port[portbindings.VIF_DETAILS]
port_status = port['status']
if bound:
# TODO(rkukura): Replace with new VIF security details
self.assertEqual(vif_details[portbindings.CAP_PORT_FILTER],
has_port_filter)
self.assertEqual(port_status, status or 'DOWN')
self.assertEqual(has_port_filter,
vif_details[portbindings.CAP_PORT_FILTER])
self.assertEqual(status or 'DOWN', port_status)
else:
self.assertEqual(port_status, 'DOWN')
self.assertEqual('DOWN', port_status)
def _test_port_binding(self, host, vif_type, has_port_filter, bound,
status=None, network_type='local'):
@ -72,7 +72,7 @@ class PortBindingTestCase(test_plugin.NeutronDbPluginV2TestCase):
details = self.plugin.endpoints[0].get_device_details(
neutron_context, agent_id="theAgentId", device=port_id)
if bound:
self.assertEqual(details['network_type'], network_type)
self.assertEqual(network_type, details['network_type'])
self.assertEqual(mac_address, details['mac_address'])
else:
self.assertNotIn('network_type', details)
@ -152,10 +152,10 @@ class PortBindingTestCase(test_plugin.NeutronDbPluginV2TestCase):
neutron_context=neutron_context)
port_data = updated_port['port']
if new_host is not None:
self.assertEqual(port_data[portbindings.HOST_ID],
new_host)
self.assertEqual(new_host,
port_data[portbindings.HOST_ID])
else:
self.assertEqual(port_data[portbindings.HOST_ID], host)
self.assertEqual(host, port_data[portbindings.HOST_ID])
if new_host is not None and new_host != host:
notify_mock.assert_called_once_with(mock.ANY)
else:

View File

@ -608,9 +608,9 @@ class L3SchedulerTestBaseMixin(object):
self, router, agent_list, exp_host, count=1):
candidates = self.get_l3_agent_candidates(self.adminContext,
router, agent_list)
self.assertEqual(len(candidates), count)
self.assertEqual(count, len(candidates))
if count:
self.assertEqual(candidates[0]['host'], exp_host)
self.assertEqual(exp_host, candidates[0]['host'])
def test_get_l3_agent_candidates_legacy(self):
self._register_l3_dvr_agents()
@ -768,8 +768,8 @@ class L3AgentChanceSchedulerTestCase(L3SchedulerTestCaseMixin,
self.adminContext, [r1['router']['id']],
admin_state_up=True)
self.assertEqual(len(agents), 1)
self.assertEqual(random_mock.call_count, 1)
self.assertEqual(1, len(agents))
self.assertEqual(1, random_mock.call_count)
with self.router_with_ext_gw(name='r2', subnet=subnet) as r2:
agents = self.get_l3_agents_hosting_routers(
@ -777,7 +777,7 @@ class L3AgentChanceSchedulerTestCase(L3SchedulerTestCaseMixin,
admin_state_up=True)
self.assertEqual(len(agents), 1)
self.assertEqual(random_mock.call_count, 2)
self.assertEqual(2, random_mock.call_count)
random_patch.stop()
@ -828,7 +828,7 @@ class L3AgentLeastRoutersSchedulerTestCase(L3SchedulerTestCaseMixin,
agents = self.get_l3_agents_hosting_routers(
self.adminContext, [r1['router']['id']],
admin_state_up=True)
self.assertEqual(len(agents), 1)
self.assertEqual(1, len(agents))
agent_id1 = agents[0]['id']
@ -836,7 +836,7 @@ class L3AgentLeastRoutersSchedulerTestCase(L3SchedulerTestCaseMixin,
agents = self.get_l3_agents_hosting_routers(
self.adminContext, [r2['router']['id']],
admin_state_up=True)
self.assertEqual(len(agents), 1)
self.assertEqual(1, len(agents))
agent_id2 = agents[0]['id']
@ -852,7 +852,7 @@ class L3AgentLeastRoutersSchedulerTestCase(L3SchedulerTestCaseMixin,
agents = self.get_l3_agents_hosting_routers(
self.adminContext, [r3['router']['id']],
admin_state_up=True)
self.assertEqual(len(agents), 1)
self.assertEqual(1, len(agents))
agent_id3 = agents[0]['id']
@ -1186,7 +1186,7 @@ class L3DvrSchedulerTestCase(testlib_api.SqlTestCase):
'.get_ports', return_value=[dvr_port]):
router_id = self.dut.get_dvr_routers_by_subnet_ids(
self.adminContext, [subnet_id])
self.assertEqual(router_id.pop(), r1['id'])
self.assertEqual(r1['id'], router_id.pop())
def test_get_subnet_ids_on_router(self):
dvr_port = {
@ -1228,7 +1228,7 @@ class L3DvrSchedulerTestCase(testlib_api.SqlTestCase):
return_value=[dvr_port]):
sub_ids = self.dut.get_subnet_ids_on_router(self.adminContext,
r1['id'])
self.assertEqual(len(sub_ids), 0)
self.assertEqual(0, len(sub_ids))
def _prepare_schedule_snat_tests(self):
agent = agents_db.Agent()