Fix params order in assertEqual
Fix params order to correspond to real signature: assertEqual(expected, actual) Change-Id: I722b998f6eae47076f3d10213073296a0a9a2081 Closes-Bug: #1277104
This commit is contained in:
parent
b1999202b8
commit
6b4be76e14
|
@ -70,23 +70,23 @@ class ResourceIndexTestCase(base.BaseTestCase):
|
|||
res = index.get('')
|
||||
|
||||
self.assertIn('resources', res.json)
|
||||
self.assertEqual(len(res.json['resources']), 1)
|
||||
self.assertEqual(1, len(res.json['resources']))
|
||||
|
||||
resource = res.json['resources'][0]
|
||||
self.assertIn('collection', resource)
|
||||
self.assertEqual(resource['collection'], 'bar')
|
||||
self.assertEqual('bar', resource['collection'])
|
||||
|
||||
self.assertIn('name', resource)
|
||||
self.assertEqual(resource['name'], 'foo')
|
||||
self.assertEqual('foo', resource['name'])
|
||||
|
||||
self.assertIn('links', resource)
|
||||
self.assertEqual(len(resource['links']), 1)
|
||||
self.assertEqual(1, len(resource['links']))
|
||||
|
||||
link = resource['links'][0]
|
||||
self.assertIn('href', link)
|
||||
self.assertEqual(link['href'], 'http://localhost/bar')
|
||||
self.assertIn('rel', link)
|
||||
self.assertEqual(link['rel'], 'self')
|
||||
self.assertEqual('self', link['rel'])
|
||||
|
||||
|
||||
class APIv2TestBase(base.BaseTestCase):
|
||||
|
@ -326,7 +326,7 @@ class APIv2TestCase(APIv2TestBase):
|
|||
|
||||
res = self.api.get(_get_path('networks'), {'limit': -1},
|
||||
expect_errors=True)
|
||||
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
|
||||
self.assertEqual(exc.HTTPBadRequest.code, res.status_int)
|
||||
|
||||
def test_limit_with_non_integer(self):
|
||||
instance = self.plugin.return_value
|
||||
|
@ -334,7 +334,7 @@ class APIv2TestCase(APIv2TestBase):
|
|||
|
||||
res = self.api.get(_get_path('networks'),
|
||||
{'limit': 'abc'}, expect_errors=True)
|
||||
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
|
||||
self.assertEqual(exc.HTTPBadRequest.code, res.status_int)
|
||||
|
||||
def test_limit_with_infinite_pagination_max_limit(self):
|
||||
instance = self.plugin.return_value
|
||||
|
@ -436,7 +436,7 @@ class APIv2TestCase(APIv2TestBase):
|
|||
|
||||
res = self.api.get(_get_path('networks'), {'sort_key': ['name']},
|
||||
expect_errors=True)
|
||||
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
|
||||
self.assertEqual(exc.HTTPBadRequest.code, res.status_int)
|
||||
|
||||
def test_sort_with_invalid_attribute(self):
|
||||
instance = self.plugin.return_value
|
||||
|
@ -446,7 +446,7 @@ class APIv2TestCase(APIv2TestBase):
|
|||
{'sort_key': 'abc',
|
||||
'sort_dir': 'asc'},
|
||||
expect_errors=True)
|
||||
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
|
||||
self.assertEqual(exc.HTTPBadRequest.code, res.status_int)
|
||||
|
||||
def test_sort_with_invalid_dirs(self):
|
||||
instance = self.plugin.return_value
|
||||
|
@ -456,7 +456,7 @@ class APIv2TestCase(APIv2TestBase):
|
|||
{'sort_key': 'name',
|
||||
'sort_dir': 'abc'},
|
||||
expect_errors=True)
|
||||
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
|
||||
self.assertEqual(exc.HTTPBadRequest.code, res.status_int)
|
||||
|
||||
def test_emulated_sort(self):
|
||||
instance = self.plugin.return_value
|
||||
|
@ -546,7 +546,7 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
|
|||
self.assertIn('networks', res)
|
||||
if not req_tenant_id or req_tenant_id == real_tenant_id:
|
||||
# expect full list returned
|
||||
self.assertEqual(len(res['networks']), 1)
|
||||
self.assertEqual(1, len(res['networks']))
|
||||
output_dict = res['networks'][0]
|
||||
input_dict['shared'] = False
|
||||
self.assertEqual(len(input_dict), len(output_dict))
|
||||
|
@ -554,7 +554,7 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
|
|||
self.assertEqual(v, output_dict[k])
|
||||
else:
|
||||
# expect no results
|
||||
self.assertEqual(len(res['networks']), 0)
|
||||
self.assertEqual(0, len(res['networks']))
|
||||
|
||||
def test_list_noauth(self):
|
||||
self._test_list(None, _uuid())
|
||||
|
@ -594,7 +594,7 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
|
|||
res = self.api.get(_get_path('networks'),
|
||||
params=params).json
|
||||
|
||||
self.assertEqual(len(res['networks']), 2)
|
||||
self.assertEqual(2, len(res['networks']))
|
||||
self.assertEqual(sorted([id1, id2]),
|
||||
sorted([res['networks'][0]['id'],
|
||||
res['networks'][1]['id']]))
|
||||
|
@ -607,19 +607,19 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
|
|||
next_links.append(r)
|
||||
if r['rel'] == 'previous':
|
||||
previous_links.append(r)
|
||||
self.assertEqual(len(next_links), 1)
|
||||
self.assertEqual(len(previous_links), 1)
|
||||
self.assertEqual(1, len(next_links))
|
||||
self.assertEqual(1, len(previous_links))
|
||||
|
||||
url = urlparse.urlparse(next_links[0]['href'])
|
||||
self.assertEqual(url.path, _get_path('networks'))
|
||||
params['marker'] = [id2]
|
||||
self.assertEqual(urlparse.parse_qs(url.query), params)
|
||||
self.assertEqual(params, urlparse.parse_qs(url.query))
|
||||
|
||||
url = urlparse.urlparse(previous_links[0]['href'])
|
||||
self.assertEqual(url.path, _get_path('networks'))
|
||||
params['marker'] = [id1]
|
||||
params['page_reverse'] = ['True']
|
||||
self.assertEqual(urlparse.parse_qs(url.query), params)
|
||||
self.assertEqual(params, urlparse.parse_qs(url.query))
|
||||
|
||||
def test_list_pagination_with_last_page(self):
|
||||
id = str(_uuid())
|
||||
|
@ -638,7 +638,7 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
|
|||
res = self.api.get(_get_path('networks'),
|
||||
params=params).json
|
||||
|
||||
self.assertEqual(len(res['networks']), 1)
|
||||
self.assertEqual(1, len(res['networks']))
|
||||
self.assertEqual(id, res['networks'][0]['id'])
|
||||
|
||||
self.assertIn('networks_links', res)
|
||||
|
@ -647,14 +647,14 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
|
|||
self.assertNotEqual(r['rel'], 'next')
|
||||
if r['rel'] == 'previous':
|
||||
previous_links.append(r)
|
||||
self.assertEqual(len(previous_links), 1)
|
||||
self.assertEqual(1, len(previous_links))
|
||||
|
||||
url = urlparse.urlparse(previous_links[0]['href'])
|
||||
self.assertEqual(url.path, _get_path('networks'))
|
||||
expect_params = params.copy()
|
||||
expect_params['marker'] = [id]
|
||||
expect_params['page_reverse'] = ['True']
|
||||
self.assertEqual(urlparse.parse_qs(url.query), expect_params)
|
||||
self.assertEqual(expect_params, urlparse.parse_qs(url.query))
|
||||
|
||||
def test_list_pagination_with_empty_page(self):
|
||||
return_value = []
|
||||
|
@ -673,14 +673,14 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
|
|||
self.assertNotEqual(r['rel'], 'next')
|
||||
if r['rel'] == 'previous':
|
||||
previous_links.append(r)
|
||||
self.assertEqual(len(previous_links), 1)
|
||||
self.assertEqual(1, len(previous_links))
|
||||
|
||||
url = urlparse.urlparse(previous_links[0]['href'])
|
||||
self.assertEqual(url.path, _get_path('networks'))
|
||||
expect_params = params.copy()
|
||||
del expect_params['marker']
|
||||
expect_params['page_reverse'] = ['True']
|
||||
self.assertEqual(urlparse.parse_qs(url.query), expect_params)
|
||||
self.assertEqual(expect_params, urlparse.parse_qs(url.query))
|
||||
|
||||
def test_list_pagination_reverse_with_last_page(self):
|
||||
id = str(_uuid())
|
||||
|
@ -709,15 +709,15 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
|
|||
self.assertNotEqual(r['rel'], 'previous')
|
||||
if r['rel'] == 'next':
|
||||
next_links.append(r)
|
||||
self.assertEqual(len(next_links), 1)
|
||||
self.assertEqual(1, len(next_links))
|
||||
|
||||
url = urlparse.urlparse(next_links[0]['href'])
|
||||
self.assertEqual(url.path, _get_path('networks'))
|
||||
expected_params = params.copy()
|
||||
del expected_params['page_reverse']
|
||||
expected_params['marker'] = [id]
|
||||
self.assertEqual(urlparse.parse_qs(url.query),
|
||||
expected_params)
|
||||
self.assertEqual(expected_params,
|
||||
urlparse.parse_qs(url.query))
|
||||
|
||||
def test_list_pagination_reverse_with_empty_page(self):
|
||||
return_value = []
|
||||
|
@ -736,14 +736,14 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
|
|||
self.assertNotEqual(r['rel'], 'previous')
|
||||
if r['rel'] == 'next':
|
||||
next_links.append(r)
|
||||
self.assertEqual(len(next_links), 1)
|
||||
self.assertEqual(1, len(next_links))
|
||||
|
||||
url = urlparse.urlparse(next_links[0]['href'])
|
||||
self.assertEqual(url.path, _get_path('networks'))
|
||||
expect_params = params.copy()
|
||||
del expect_params['marker']
|
||||
del expect_params['page_reverse']
|
||||
self.assertEqual(urlparse.parse_qs(url.query), expect_params)
|
||||
self.assertEqual(expect_params, urlparse.parse_qs(url.query))
|
||||
|
||||
def test_create(self):
|
||||
net_id = _uuid()
|
||||
|
@ -760,12 +760,12 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
|
|||
res = self.api.post(_get_path('networks', fmt=self.fmt),
|
||||
self.serialize(data),
|
||||
content_type='application/' + self.fmt)
|
||||
self.assertEqual(res.status_int, exc.HTTPCreated.code)
|
||||
self.assertEqual(exc.HTTPCreated.code, res.status_int)
|
||||
res = self.deserialize(res)
|
||||
self.assertIn('network', res)
|
||||
net = res['network']
|
||||
self.assertEqual(net['id'], net_id)
|
||||
self.assertEqual(net['status'], "ACTIVE")
|
||||
self.assertEqual(net_id, net['id'])
|
||||
self.assertEqual("ACTIVE", net['status'])
|
||||
|
||||
def test_create_use_defaults(self):
|
||||
net_id = _uuid()
|
||||
|
@ -786,13 +786,13 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
|
|||
content_type='application/' + self.fmt)
|
||||
instance.create_network.assert_called_with(mock.ANY,
|
||||
network=full_input)
|
||||
self.assertEqual(res.status_int, exc.HTTPCreated.code)
|
||||
self.assertEqual(exc.HTTPCreated.code, res.status_int)
|
||||
res = self.deserialize(res)
|
||||
self.assertIn('network', res)
|
||||
net = res['network']
|
||||
self.assertEqual(net['id'], net_id)
|
||||
self.assertEqual(net_id, net['id'])
|
||||
self.assertTrue(net['admin_state_up'])
|
||||
self.assertEqual(net['status'], "ACTIVE")
|
||||
self.assertEqual("ACTIVE", net['status'])
|
||||
|
||||
def test_create_no_keystone_env(self):
|
||||
data = {'name': 'net1'}
|
||||
|
@ -822,7 +822,7 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
|
|||
|
||||
instance.create_network.assert_called_with(mock.ANY,
|
||||
network=full_input)
|
||||
self.assertEqual(res.status_int, exc.HTTPCreated.code)
|
||||
self.assertEqual(exc.HTTPCreated.code, res.status_int)
|
||||
|
||||
def test_create_bad_keystone_tenant(self):
|
||||
tenant_id = _uuid()
|
||||
|
@ -864,7 +864,7 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
|
|||
self.serialize(data),
|
||||
content_type='application/' + self.fmt,
|
||||
expect_errors=True)
|
||||
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
|
||||
self.assertEqual(exc.HTTPBadRequest.code, res.status_int)
|
||||
|
||||
def test_create_bulk(self):
|
||||
data = {'networks': [{'name': 'net1',
|
||||
|
@ -885,14 +885,14 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
|
|||
res = self.api.post(_get_path('networks', fmt=self.fmt),
|
||||
self.serialize(data),
|
||||
content_type='application/' + self.fmt)
|
||||
self.assertEqual(res.status_int, exc.HTTPCreated.code)
|
||||
self.assertEqual(exc.HTTPCreated.code, res.status_int)
|
||||
|
||||
def _test_create_failure_bad_request(self, resource, data, **kwargs):
|
||||
res = self.api.post(_get_path(resource, fmt=self.fmt),
|
||||
self.serialize(data),
|
||||
content_type='application/' + self.fmt,
|
||||
expect_errors=True, **kwargs)
|
||||
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
|
||||
self.assertEqual(exc.HTTPBadRequest.code, res.status_int)
|
||||
|
||||
def test_create_bulk_networks_none(self):
|
||||
self._test_create_failure_bad_request('networks', {'networks': None})
|
||||
|
@ -940,12 +940,12 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
|
|||
self.serialize(initial_input),
|
||||
content_type='application/' + self.fmt)
|
||||
instance.create_port.assert_called_with(mock.ANY, port=full_input)
|
||||
self.assertEqual(res.status_int, exc.HTTPCreated.code)
|
||||
self.assertEqual(exc.HTTPCreated.code, res.status_int)
|
||||
res = self.deserialize(res)
|
||||
self.assertIn('port', res)
|
||||
port = res['port']
|
||||
self.assertEqual(port['network_id'], net_id)
|
||||
self.assertEqual(port['mac_address'], 'ca:fe:de:ad:be:ef')
|
||||
self.assertEqual(net_id, port['network_id'])
|
||||
self.assertEqual('ca:fe:de:ad:be:ef', port['mac_address'])
|
||||
|
||||
def test_create_return_extra_attr(self):
|
||||
net_id = _uuid()
|
||||
|
@ -962,12 +962,12 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
|
|||
res = self.api.post(_get_path('networks', fmt=self.fmt),
|
||||
self.serialize(data),
|
||||
content_type='application/' + self.fmt)
|
||||
self.assertEqual(res.status_int, exc.HTTPCreated.code)
|
||||
self.assertEqual(exc.HTTPCreated.code, res.status_int)
|
||||
res = self.deserialize(res)
|
||||
self.assertIn('network', res)
|
||||
net = res['network']
|
||||
self.assertEqual(net['id'], net_id)
|
||||
self.assertEqual(net['status'], "ACTIVE")
|
||||
self.assertEqual(net_id, net['id'])
|
||||
self.assertEqual("ACTIVE", net['status'])
|
||||
self.assertNotIn('v2attrs:something', net)
|
||||
|
||||
def test_fields(self):
|
||||
|
@ -996,7 +996,7 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
|
|||
fmt=self.fmt),
|
||||
extra_environ=env,
|
||||
expect_errors=expect_errors)
|
||||
self.assertEqual(res.status_int, expected_code)
|
||||
self.assertEqual(expected_code, res.status_int)
|
||||
|
||||
def test_delete_noauth(self):
|
||||
self._test_delete(None, _uuid(), exc.HTTPNoContent.code)
|
||||
|
@ -1029,7 +1029,7 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
|
|||
fmt=self.fmt),
|
||||
extra_environ=env,
|
||||
expect_errors=expect_errors)
|
||||
self.assertEqual(res.status_int, expected_code)
|
||||
self.assertEqual(expected_code, res.status_int)
|
||||
return res
|
||||
|
||||
def test_get_noauth(self):
|
||||
|
@ -1103,7 +1103,7 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
|
|||
self.serialize(data),
|
||||
content_type='application/' + self.fmt,
|
||||
expect_errors=True)
|
||||
self.assertEqual(res.status_int, 400)
|
||||
self.assertEqual(400, res.status_int)
|
||||
|
||||
def test_invalid_attribute_field(self):
|
||||
data = {'network': {'invalid_key1': "foo1", 'invalid_key2': "foo2"}}
|
||||
|
@ -1111,7 +1111,7 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
|
|||
self.serialize(data),
|
||||
content_type='application/' + self.fmt,
|
||||
expect_errors=True)
|
||||
self.assertEqual(res.status_int, 400)
|
||||
self.assertEqual(400, res.status_int)
|
||||
|
||||
|
||||
class SubresourceTest(base.BaseTestCase):
|
||||
|
@ -1308,13 +1308,13 @@ class NotificationTest(APIv2TestBase):
|
|||
|
||||
expected_events = ('.'.join([resource, opname, "start"]),
|
||||
'.'.join([resource, opname, "end"]))
|
||||
self.assertEqual(len(fake_notifier.NOTIFICATIONS),
|
||||
len(expected_events))
|
||||
self.assertEqual(len(expected_events),
|
||||
len(fake_notifier.NOTIFICATIONS))
|
||||
for msg, event in zip(fake_notifier.NOTIFICATIONS, expected_events):
|
||||
self.assertEqual('INFO', msg['priority'])
|
||||
self.assertEqual(event, msg['event_type'])
|
||||
|
||||
self.assertEqual(res.status_int, expected_code)
|
||||
self.assertEqual(expected_code, res.status_int)
|
||||
|
||||
def test_network_create_notifer(self):
|
||||
self._resource_op_notifier('create', 'network')
|
||||
|
@ -1440,7 +1440,7 @@ class QuotaTest(APIv2TestBase):
|
|||
instance.get_networks_count.return_value = 3
|
||||
res = self.api.post_json(
|
||||
_get_path('networks'), initial_input)
|
||||
self.assertEqual(res.status_int, exc.HTTPCreated.code)
|
||||
self.assertEqual(exc.HTTPCreated.code, res.status_int)
|
||||
|
||||
|
||||
class ExtensionTestCase(base.BaseTestCase):
|
||||
|
@ -1500,12 +1500,12 @@ class ExtensionTestCase(base.BaseTestCase):
|
|||
|
||||
instance.create_network.assert_called_with(mock.ANY,
|
||||
network=data)
|
||||
self.assertEqual(res.status_int, exc.HTTPCreated.code)
|
||||
self.assertEqual(exc.HTTPCreated.code, res.status_int)
|
||||
self.assertIn('network', res.json)
|
||||
net = res.json['network']
|
||||
self.assertEqual(net['id'], net_id)
|
||||
self.assertEqual(net['status'], "ACTIVE")
|
||||
self.assertEqual(net['v2attrs:something'], "123")
|
||||
self.assertEqual(net_id, net['id'])
|
||||
self.assertEqual("ACTIVE", net['status'])
|
||||
self.assertEqual("123", net['v2attrs:something'])
|
||||
self.assertNotIn('v2attrs:something_else', net)
|
||||
|
||||
|
||||
|
@ -1537,7 +1537,7 @@ class ListArgsTestCase(base.BaseTestCase):
|
|||
request = webob.Request.blank(path)
|
||||
expect_val = ['2', '4']
|
||||
actual_val = api_common.list_args(request, 'fields')
|
||||
self.assertEqual(sorted(actual_val), expect_val)
|
||||
self.assertEqual(expect_val, sorted(actual_val))
|
||||
|
||||
def test_list_args_with_empty(self):
|
||||
path = '/?foo=4&bar=3&baz=2&qux=1'
|
||||
|
@ -1562,7 +1562,7 @@ class FiltersTestCase(base.BaseTestCase):
|
|||
request = webob.Request.blank(path)
|
||||
expect_val = {'foo': ['4'], 'bar': ['3'], 'baz': ['2'], 'qux': ['1']}
|
||||
actual_val = api_common.get_filters(request, {})
|
||||
self.assertEqual(actual_val, expect_val)
|
||||
self.assertEqual(expect_val, actual_val)
|
||||
|
||||
def test_attr_info_without_conversion(self):
|
||||
path = '/?foo=4&bar=3&baz=2&qux=1'
|
||||
|
@ -1570,7 +1570,7 @@ class FiltersTestCase(base.BaseTestCase):
|
|||
attr_info = {'foo': {'key': 'val'}}
|
||||
expect_val = {'foo': ['4'], 'bar': ['3'], 'baz': ['2'], 'qux': ['1']}
|
||||
actual_val = api_common.get_filters(request, attr_info)
|
||||
self.assertEqual(actual_val, expect_val)
|
||||
self.assertEqual(expect_val, actual_val)
|
||||
|
||||
def test_attr_info_with_convert_list_to(self):
|
||||
path = '/?foo=key=4&bar=3&foo=key=2&qux=1'
|
||||
|
@ -1590,7 +1590,7 @@ class FiltersTestCase(base.BaseTestCase):
|
|||
attr_info = {'foo': {'convert_to': attributes.convert_to_int}}
|
||||
expect_val = {'foo': [4], 'bar': ['3'], 'baz': ['2'], 'qux': ['1']}
|
||||
actual_val = api_common.get_filters(request, attr_info)
|
||||
self.assertEqual(actual_val, expect_val)
|
||||
self.assertEqual(expect_val, actual_val)
|
||||
|
||||
|
||||
class CreateResourceTestCase(base.BaseTestCase):
|
||||
|
|
|
@ -40,47 +40,47 @@ class RequestTestCase(base.BaseTestCase):
|
|||
request = wsgi.Request.blank('/tests/123')
|
||||
request.headers["Content-Type"] = "application/json; charset=UTF-8"
|
||||
result = request.get_content_type()
|
||||
self.assertEqual(result, "application/json")
|
||||
self.assertEqual("application/json", result)
|
||||
|
||||
def test_content_type_from_accept(self):
|
||||
content_type = 'application/json'
|
||||
request = wsgi.Request.blank('/tests/123')
|
||||
request.headers["Accept"] = content_type
|
||||
result = request.best_match_content_type()
|
||||
self.assertEqual(result, content_type)
|
||||
self.assertEqual(content_type, result)
|
||||
|
||||
def test_content_type_from_accept_best(self):
|
||||
request = wsgi.Request.blank('/tests/123')
|
||||
request.headers["Accept"] = "application/json"
|
||||
result = request.best_match_content_type()
|
||||
self.assertEqual(result, "application/json")
|
||||
self.assertEqual("application/json", result)
|
||||
|
||||
request = wsgi.Request.blank('/tests/123')
|
||||
request.headers["Accept"] = ("application/json; q=0.3, "
|
||||
"application/xml; q=0.9")
|
||||
result = request.best_match_content_type()
|
||||
self.assertEqual(result, "application/json")
|
||||
self.assertEqual("application/json", result)
|
||||
|
||||
def test_content_type_from_query_extension(self):
|
||||
request = wsgi.Request.blank('/tests/123.json')
|
||||
result = request.best_match_content_type()
|
||||
self.assertEqual(result, "application/json")
|
||||
self.assertEqual("application/json", result)
|
||||
|
||||
request = wsgi.Request.blank('/tests/123.invalid')
|
||||
result = request.best_match_content_type()
|
||||
self.assertEqual(result, "application/json")
|
||||
self.assertEqual("application/json", result)
|
||||
|
||||
def test_content_type_accept_and_query_extension(self):
|
||||
request = wsgi.Request.blank('/tests/123.json')
|
||||
request.headers["Accept"] = "application/xml"
|
||||
result = request.best_match_content_type()
|
||||
self.assertEqual(result, "application/json")
|
||||
self.assertEqual("application/json", result)
|
||||
|
||||
def test_content_type_accept_default(self):
|
||||
request = wsgi.Request.blank('/tests/123.unsupported')
|
||||
request.headers["Accept"] = "application/unsupported1"
|
||||
result = request.best_match_content_type()
|
||||
self.assertEqual(result, "application/json")
|
||||
self.assertEqual("application/json", result)
|
||||
|
||||
def test_context_with_neutron_context(self):
|
||||
ctxt = context.Context('fake_user', 'fake_tenant')
|
||||
|
@ -108,7 +108,7 @@ class RequestTestCase(base.BaseTestCase):
|
|||
'es', 'zh']
|
||||
request.headers['Accept-Language'] = 'known-language'
|
||||
language = request.best_match_language()
|
||||
self.assertEqual(language, 'known-language')
|
||||
self.assertEqual('known-language', language)
|
||||
|
||||
# If the Accept-Leader is an unknown language, missing or empty,
|
||||
# the best match locale should be None
|
||||
|
@ -147,9 +147,9 @@ class ResourceTestCase(base.BaseTestCase):
|
|||
environ = {'wsgiorg.routing_args': (None, {'action': 'test',
|
||||
'format': 'json'})}
|
||||
res = resource.get('', extra_environ=environ, expect_errors=True)
|
||||
self.assertEqual(res.status_int, exc.HTTPInternalServerError.code)
|
||||
self.assertEqual(wsgi.JSONDeserializer().deserialize(res.body),
|
||||
expected_res)
|
||||
self.assertEqual(exc.HTTPInternalServerError.code, res.status_int)
|
||||
self.assertEqual(expected_res,
|
||||
wsgi.JSONDeserializer().deserialize(res.body))
|
||||
|
||||
@mock.patch('oslo_i18n.translate')
|
||||
def test_unmapped_neutron_error_localized(self, mock_translation):
|
||||
|
@ -168,7 +168,7 @@ class ResourceTestCase(base.BaseTestCase):
|
|||
'format': 'json'})}
|
||||
|
||||
res = resource.get('', extra_environ=environ, expect_errors=True)
|
||||
self.assertEqual(res.status_int, exc.HTTPInternalServerError.code)
|
||||
self.assertEqual(exc.HTTPInternalServerError.code, res.status_int)
|
||||
self.assertIn(msg_translation,
|
||||
str(wsgi.JSONDeserializer().deserialize(res.body)))
|
||||
|
||||
|
@ -192,9 +192,9 @@ class ResourceTestCase(base.BaseTestCase):
|
|||
environ = {'wsgiorg.routing_args': (None, {'action': 'test',
|
||||
'format': 'json'})}
|
||||
res = resource.get('', extra_environ=environ, expect_errors=True)
|
||||
self.assertEqual(res.status_int, exc.HTTPGatewayTimeout.code)
|
||||
self.assertEqual(wsgi.JSONDeserializer().deserialize(res.body),
|
||||
expected_res)
|
||||
self.assertEqual(exc.HTTPGatewayTimeout.code, res.status_int)
|
||||
self.assertEqual(expected_res,
|
||||
wsgi.JSONDeserializer().deserialize(res.body))
|
||||
|
||||
@mock.patch('oslo_i18n.translate')
|
||||
def test_mapped_neutron_error_localized(self, mock_translation):
|
||||
|
@ -278,7 +278,7 @@ class ResourceTestCase(base.BaseTestCase):
|
|||
|
||||
environ = {'wsgiorg.routing_args': (None, {'action': 'test'})}
|
||||
res = resource.get('', extra_environ=environ)
|
||||
self.assertEqual(res.status_int, 200)
|
||||
self.assertEqual(200, res.status_int)
|
||||
|
||||
def test_status_204(self):
|
||||
controller = mock.MagicMock()
|
||||
|
@ -288,7 +288,7 @@ class ResourceTestCase(base.BaseTestCase):
|
|||
|
||||
environ = {'wsgiorg.routing_args': (None, {'action': 'delete'})}
|
||||
res = resource.delete('', extra_environ=environ)
|
||||
self.assertEqual(res.status_int, 204)
|
||||
self.assertEqual(204, res.status_int)
|
||||
|
||||
def _test_error_log_level(self, expected_webob_exc, expect_log_info=False,
|
||||
use_fault_map=True, exc_raised=None):
|
||||
|
@ -304,7 +304,7 @@ class ResourceTestCase(base.BaseTestCase):
|
|||
environ = {'wsgiorg.routing_args': (None, {'action': 'test'})}
|
||||
with mock.patch.object(wsgi_resource, 'LOG') as log:
|
||||
res = resource.get('', extra_environ=environ, expect_errors=True)
|
||||
self.assertEqual(res.status_int, expected_webob_exc.code)
|
||||
self.assertEqual(expected_webob_exc.code, res.status_int)
|
||||
self.assertEqual(expect_log_info, log.info.called)
|
||||
self.assertNotEqual(expect_log_info, log.exception.called)
|
||||
|
||||
|
@ -336,7 +336,7 @@ class ResourceTestCase(base.BaseTestCase):
|
|||
|
||||
environ = {}
|
||||
res = resource.get('', extra_environ=environ, expect_errors=True)
|
||||
self.assertEqual(res.status_int, exc.HTTPInternalServerError.code)
|
||||
self.assertEqual(exc.HTTPInternalServerError.code, res.status_int)
|
||||
|
||||
def test_post_with_body(self):
|
||||
controller = mock.MagicMock()
|
||||
|
@ -347,4 +347,4 @@ class ResourceTestCase(base.BaseTestCase):
|
|||
environ = {'wsgiorg.routing_args': (None, {'action': 'test'})}
|
||||
res = resource.post('', params='{"key": "val"}',
|
||||
extra_environ=environ)
|
||||
self.assertEqual(res.status_int, 200)
|
||||
self.assertEqual(200, res.status_int)
|
||||
|
|
|
@ -60,15 +60,15 @@ class TestParseMappings(base.BaseTestCase):
|
|||
self.parse(['key1:val', 'key2:val'])
|
||||
|
||||
def test_parse_mappings_succeeds_for_one_mapping(self):
|
||||
self.assertEqual(self.parse(['key:val']), {'key': 'val'})
|
||||
self.assertEqual({'key': 'val'}, self.parse(['key:val']))
|
||||
|
||||
def test_parse_mappings_succeeds_for_n_mappings(self):
|
||||
self.assertEqual(self.parse(['key1:val1', 'key2:val2']),
|
||||
{'key1': 'val1', 'key2': 'val2'})
|
||||
self.assertEqual({'key1': 'val1', 'key2': 'val2'},
|
||||
self.parse(['key1:val1', 'key2:val2']))
|
||||
|
||||
def test_parse_mappings_succeeds_for_duplicate_value(self):
|
||||
self.assertEqual(self.parse(['key1:val', 'key2:val'], False),
|
||||
{'key1': 'val', 'key2': 'val'})
|
||||
self.assertEqual({'key1': 'val', 'key2': 'val'},
|
||||
self.parse(['key1:val', 'key2:val'], False))
|
||||
|
||||
def test_parse_mappings_succeeds_for_no_mappings(self):
|
||||
self.assertEqual({}, self.parse(['']))
|
||||
|
@ -253,12 +253,12 @@ class TestParseOneVlanRange(UtilTestParseVlanRanges):
|
|||
def test_parse_one_net_no_vlan_range(self):
|
||||
config_str = "net1"
|
||||
expected_networks = ("net1", None)
|
||||
self.assertEqual(self.parse_one(config_str), expected_networks)
|
||||
self.assertEqual(expected_networks, self.parse_one(config_str))
|
||||
|
||||
def test_parse_one_net_and_vlan_range(self):
|
||||
config_str = "net1:100:199"
|
||||
expected_networks = ("net1", (100, 199))
|
||||
self.assertEqual(self.parse_one(config_str), expected_networks)
|
||||
self.assertEqual(expected_networks, self.parse_one(config_str))
|
||||
|
||||
def test_parse_one_net_incomplete_range(self):
|
||||
config_str = "net1:100"
|
||||
|
@ -294,7 +294,7 @@ class TestParseOneVlanRange(UtilTestParseVlanRanges):
|
|||
def test_parse_one_net_and_max_range(self):
|
||||
config_str = "net1:1:4094"
|
||||
expected_networks = ("net1", (1, 4094))
|
||||
self.assertEqual(self.parse_one(config_str), expected_networks)
|
||||
self.assertEqual(expected_networks, self.parse_one(config_str))
|
||||
|
||||
def test_parse_one_net_range_bad_vlan1(self):
|
||||
config_str = "net1:9000:150"
|
||||
|
@ -318,33 +318,33 @@ class TestParseVlanRangeList(UtilTestParseVlanRanges):
|
|||
def test_parse_list_one_net_no_vlan_range(self):
|
||||
config_list = ["net1"]
|
||||
expected_networks = {"net1": []}
|
||||
self.assertEqual(self.parse_list(config_list), expected_networks)
|
||||
self.assertEqual(expected_networks, self.parse_list(config_list))
|
||||
|
||||
def test_parse_list_one_net_vlan_range(self):
|
||||
config_list = ["net1:100:199"]
|
||||
expected_networks = {"net1": [(100, 199)]}
|
||||
self.assertEqual(self.parse_list(config_list), expected_networks)
|
||||
self.assertEqual(expected_networks, self.parse_list(config_list))
|
||||
|
||||
def test_parse_two_nets_no_vlan_range(self):
|
||||
config_list = ["net1",
|
||||
"net2"]
|
||||
expected_networks = {"net1": [],
|
||||
"net2": []}
|
||||
self.assertEqual(self.parse_list(config_list), expected_networks)
|
||||
self.assertEqual(expected_networks, self.parse_list(config_list))
|
||||
|
||||
def test_parse_two_nets_range_and_no_range(self):
|
||||
config_list = ["net1:100:199",
|
||||
"net2"]
|
||||
expected_networks = {"net1": [(100, 199)],
|
||||
"net2": []}
|
||||
self.assertEqual(self.parse_list(config_list), expected_networks)
|
||||
self.assertEqual(expected_networks, self.parse_list(config_list))
|
||||
|
||||
def test_parse_two_nets_no_range_and_range(self):
|
||||
config_list = ["net1",
|
||||
"net2:200:299"]
|
||||
expected_networks = {"net1": [],
|
||||
"net2": [(200, 299)]}
|
||||
self.assertEqual(self.parse_list(config_list), expected_networks)
|
||||
self.assertEqual(expected_networks, self.parse_list(config_list))
|
||||
|
||||
def test_parse_two_nets_bad_vlan_range1(self):
|
||||
config_list = ["net1:100",
|
||||
|
@ -369,7 +369,7 @@ class TestParseVlanRangeList(UtilTestParseVlanRanges):
|
|||
expected_networks = {"net1": [(100, 199),
|
||||
(1000, 1099)],
|
||||
"net2": [(200, 299)]}
|
||||
self.assertEqual(self.parse_list(config_list), expected_networks)
|
||||
self.assertEqual(expected_networks, self.parse_list(config_list))
|
||||
|
||||
def test_parse_two_nets_and_append_1_3(self):
|
||||
config_list = ["net1:100:199",
|
||||
|
@ -378,23 +378,23 @@ class TestParseVlanRangeList(UtilTestParseVlanRanges):
|
|||
expected_networks = {"net1": [(100, 199),
|
||||
(1000, 1099)],
|
||||
"net2": [(200, 299)]}
|
||||
self.assertEqual(self.parse_list(config_list), expected_networks)
|
||||
self.assertEqual(expected_networks, self.parse_list(config_list))
|
||||
|
||||
|
||||
class TestDictUtils(base.BaseTestCase):
|
||||
def test_dict2str(self):
|
||||
dic = {"key1": "value1", "key2": "value2", "key3": "value3"}
|
||||
expected = "key1=value1,key2=value2,key3=value3"
|
||||
self.assertEqual(utils.dict2str(dic), expected)
|
||||
self.assertEqual(expected, utils.dict2str(dic))
|
||||
|
||||
def test_str2dict(self):
|
||||
string = "key1=value1,key2=value2,key3=value3"
|
||||
expected = {"key1": "value1", "key2": "value2", "key3": "value3"}
|
||||
self.assertEqual(utils.str2dict(string), expected)
|
||||
self.assertEqual(expected, utils.str2dict(string))
|
||||
|
||||
def test_dict_str_conversion(self):
|
||||
dic = {"key1": "value1", "key2": "value2"}
|
||||
self.assertEqual(utils.str2dict(utils.dict2str(dic)), dic)
|
||||
self.assertEqual(dic, utils.str2dict(utils.dict2str(dic)))
|
||||
|
||||
def test_diff_list_of_dict(self):
|
||||
old_list = [{"key1": "value1"},
|
||||
|
@ -633,12 +633,12 @@ class TestCidrIsHost(base.BaseTestCase):
|
|||
|
||||
class TestIpVersionFromInt(base.BaseTestCase):
|
||||
def test_ip_version_from_int_ipv4(self):
|
||||
self.assertEqual(utils.ip_version_from_int(4),
|
||||
constants.IPv4)
|
||||
self.assertEqual(constants.IPv4,
|
||||
utils.ip_version_from_int(4))
|
||||
|
||||
def test_ip_version_from_int_ipv6(self):
|
||||
self.assertEqual(utils.ip_version_from_int(6),
|
||||
constants.IPv6)
|
||||
self.assertEqual(constants.IPv6,
|
||||
utils.ip_version_from_int(6))
|
||||
|
||||
def test_ip_version_from_int_illegal_int(self):
|
||||
self.assertRaises(ValueError,
|
||||
|
|
|
@ -63,7 +63,7 @@ class AgentSchedulerTestMixIn(object):
|
|||
expected_code=exc.HTTPOk.code):
|
||||
req = self._path_req(path, admin_context=admin_context)
|
||||
res = req.get_response(self.ext_api)
|
||||
self.assertEqual(res.status_int, expected_code)
|
||||
self.assertEqual(expected_code, res.status_int)
|
||||
return self.deserialize(self.fmt, res)
|
||||
|
||||
def _path_req(self, path, method='GET', data=None,
|
||||
|
@ -142,7 +142,7 @@ class AgentSchedulerTestMixIn(object):
|
|||
{'router_id': router_id},
|
||||
admin_context=admin_context)
|
||||
res = req.get_response(self.ext_api)
|
||||
self.assertEqual(res.status_int, expected_code)
|
||||
self.assertEqual(expected_code, res.status_int)
|
||||
|
||||
def _add_network_to_dhcp_agent(self, id, network_id,
|
||||
expected_code=exc.HTTPCreated.code,
|
||||
|
@ -154,7 +154,7 @@ class AgentSchedulerTestMixIn(object):
|
|||
{'network_id': network_id},
|
||||
admin_context=admin_context)
|
||||
res = req.get_response(self.ext_api)
|
||||
self.assertEqual(res.status_int, expected_code)
|
||||
self.assertEqual(expected_code, res.status_int)
|
||||
|
||||
def _remove_network_from_dhcp_agent(self, id, network_id,
|
||||
expected_code=exc.HTTPNoContent.code,
|
||||
|
@ -166,7 +166,7 @@ class AgentSchedulerTestMixIn(object):
|
|||
req = self._path_delete_request(path,
|
||||
admin_context=admin_context)
|
||||
res = req.get_response(self.ext_api)
|
||||
self.assertEqual(res.status_int, expected_code)
|
||||
self.assertEqual(expected_code, res.status_int)
|
||||
|
||||
def _remove_router_from_l3_agent(self, id, router_id,
|
||||
expected_code=exc.HTTPNoContent.code,
|
||||
|
@ -177,7 +177,7 @@ class AgentSchedulerTestMixIn(object):
|
|||
self.fmt)
|
||||
req = self._path_delete_request(path, admin_context=admin_context)
|
||||
res = req.get_response(self.ext_api)
|
||||
self.assertEqual(res.status_int, expected_code)
|
||||
self.assertEqual(expected_code, res.status_int)
|
||||
|
||||
def _assert_notify(self, notifications, expected_event_type):
|
||||
event_types = [event['event_type'] for event in notifications]
|
||||
|
|
|
@ -151,7 +151,7 @@ class TestAllowedAddressPairs(AllowedAddressPairDBTestCase):
|
|||
port_security_enabled=False,
|
||||
allowed_address_pairs=address_pairs)
|
||||
self.deserialize(self.fmt, res)
|
||||
self.assertEqual(res.status_int, 409)
|
||||
self.assertEqual(409, res.status_int)
|
||||
|
||||
address_pairs = []
|
||||
res = self._create_port(self.fmt, net['network']['id'],
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -462,7 +462,7 @@ class TestDbBasePluginIpam(test_db_base.NeutronDbPluginV2TestCase):
|
|||
with self.port(subnet=subnet) as port:
|
||||
ips = port['port']['fixed_ips']
|
||||
self.assertEqual(1, len(ips))
|
||||
self.assertEqual(ips[0]['ip_address'], auto_ip)
|
||||
self.assertEqual(auto_ip, ips[0]['ip_address'])
|
||||
# Update port with another new ip
|
||||
data = {"port": {"fixed_ips": [{
|
||||
'subnet_id': subnet['subnet']['id'],
|
||||
|
@ -489,7 +489,7 @@ class TestDbBasePluginIpam(test_db_base.NeutronDbPluginV2TestCase):
|
|||
with self.port(subnet=subnet) as port:
|
||||
ips = port['port']['fixed_ips']
|
||||
self.assertEqual(1, len(ips))
|
||||
self.assertEqual(ips[0]['ip_address'], auto_ip)
|
||||
self.assertEqual(auto_ip, ips[0]['ip_address'])
|
||||
req = self.new_delete_request('ports', port['port']['id'])
|
||||
res = req.get_response(self.api)
|
||||
|
||||
|
@ -502,14 +502,14 @@ class TestDbBasePluginIpam(test_db_base.NeutronDbPluginV2TestCase):
|
|||
with self.port(subnet=subnet) as port:
|
||||
ips = port['port']['fixed_ips']
|
||||
self.assertEqual(1, len(ips))
|
||||
self.assertEqual(ips[0]['ip_address'], ip)
|
||||
self.assertEqual(ip, ips[0]['ip_address'])
|
||||
req = self.new_delete_request('ports', port['port']['id'])
|
||||
res = req.get_response(self.api)
|
||||
self.assertEqual(webob.exc.HTTPNoContent.code, res.status_int)
|
||||
with self.port(subnet=subnet, fixed_ips=ips) as port:
|
||||
ips = port['port']['fixed_ips']
|
||||
self.assertEqual(1, len(ips))
|
||||
self.assertEqual(ips[0]['ip_address'], ip)
|
||||
self.assertEqual(ip, ips[0]['ip_address'])
|
||||
|
||||
@mock.patch('neutron.ipam.driver.Pool')
|
||||
def test_update_ips_for_port_passes_port_dict_to_factory(self, pool_mock):
|
||||
|
|
|
@ -117,4 +117,4 @@ class ExtensionTestCase(testlib_api.WebTestCase):
|
|||
test_base._get_path(path, id=entity_id, fmt=self.fmt))
|
||||
delete_entity = getattr(self.plugin.return_value, "delete_" + entity)
|
||||
delete_entity.assert_called_with(mock.ANY, entity_id)
|
||||
self.assertEqual(res.status_int, exc.HTTPNoContent.code)
|
||||
self.assertEqual(exc.HTTPNoContent.code, res.status_int)
|
||||
|
|
|
@ -66,7 +66,7 @@ class AddressScopeTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
|
|||
|
||||
address_scope_res = address_scope_req.get_response(self.ext_api)
|
||||
if expected_res_status:
|
||||
self.assertEqual(address_scope_res.status_int, expected_res_status)
|
||||
self.assertEqual(expected_res_status, address_scope_res.status_int)
|
||||
return address_scope_res
|
||||
|
||||
def _make_address_scope(self, fmt, ip_version, admin=False, **kwargs):
|
||||
|
|
|
@ -76,7 +76,7 @@ class AgentDBTestMixIn(object):
|
|||
neutron_context=neutron_context,
|
||||
query_params=query_string)
|
||||
if expected_res_status:
|
||||
self.assertEqual(agent_res.status_int, expected_res_status)
|
||||
self.assertEqual(expected_res_status, agent_res.status_int)
|
||||
return agent_res
|
||||
|
||||
def _register_agent_states(self, lbaas_agents=False):
|
||||
|
@ -142,7 +142,7 @@ class AgentDBTestCase(AgentDBTestMixIn,
|
|||
_req.environ['neutron.context'] = context.Context(
|
||||
'', 'tenant_id')
|
||||
res = _req.get_response(self.ext_api)
|
||||
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
|
||||
self.assertEqual(exc.HTTPBadRequest.code, res.status_int)
|
||||
|
||||
def test_list_agent(self):
|
||||
agents = self._register_agent_states()
|
||||
|
|
|
@ -84,7 +84,7 @@ class DnsExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2):
|
|||
|
||||
port_res = port_req.get_response(self.api)
|
||||
if expected_res_status:
|
||||
self.assertEqual(port_res.status_int, expected_res_status)
|
||||
self.assertEqual(expected_res_status, port_res.status_int)
|
||||
return port_res
|
||||
|
||||
def _test_list_resources(self, resource, items, neutron_context=None,
|
||||
|
@ -104,8 +104,8 @@ class DnsExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2):
|
|||
self.assertEqual(port['port'][k], v)
|
||||
self.assertIn('mac_address', port['port'])
|
||||
ips = port['port']['fixed_ips']
|
||||
self.assertEqual(len(ips), 1)
|
||||
self.assertEqual(ips[0]['ip_address'], '10.0.0.2')
|
||||
self.assertEqual(1, len(ips))
|
||||
self.assertEqual('10.0.0.2', ips[0]['ip_address'])
|
||||
self.assertEqual('myname', port['port']['name'])
|
||||
self._verify_dns_assigment(port['port'],
|
||||
ips_list=['10.0.0.2'])
|
||||
|
@ -135,8 +135,8 @@ class DnsExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2):
|
|||
data = {'port': {'admin_state_up': False, 'dns_name': 'vm1'}}
|
||||
req = self.new_update_request('ports', data, port['port']['id'])
|
||||
res = self.deserialize(self.fmt, req.get_response(self.api))
|
||||
self.assertEqual(res['port']['admin_state_up'],
|
||||
data['port']['admin_state_up'])
|
||||
self.assertEqual(data['port']['admin_state_up'],
|
||||
res['port']['admin_state_up'])
|
||||
self._verify_dns_assigment(res['port'],
|
||||
ips_list=['10.0.0.2'],
|
||||
dns_name='vm1')
|
||||
|
@ -146,8 +146,8 @@ class DnsExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2):
|
|||
data = {'port': {'admin_state_up': False, 'dns_name': 'vm1'}}
|
||||
req = self.new_update_request('ports', data, port['port']['id'])
|
||||
res = self.deserialize(self.fmt, req.get_response(self.api))
|
||||
self.assertEqual(res['port']['admin_state_up'],
|
||||
data['port']['admin_state_up'])
|
||||
self.assertEqual(data['port']['admin_state_up'],
|
||||
res['port']['admin_state_up'])
|
||||
self._verify_dns_assigment(res['port'],
|
||||
ips_list=['10.0.0.2'])
|
||||
|
||||
|
@ -157,7 +157,7 @@ class DnsExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2):
|
|||
ips_list = ips_list or []
|
||||
ipv4_cidrs = ipv4_cidrs or []
|
||||
ipv6_cidrs = ipv6_cidrs or []
|
||||
self.assertEqual(port['dns_name'], dns_name)
|
||||
self.assertEqual(dns_name, port['dns_name'])
|
||||
dns_assignment = port['dns_assignment']
|
||||
if ips_list:
|
||||
self.assertEqual(len(dns_assignment), len(ips_list))
|
||||
|
@ -240,8 +240,8 @@ class DnsExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2):
|
|||
with self.subnet() as subnet:
|
||||
with self.port(subnet=subnet) as port:
|
||||
ips = port['port']['fixed_ips']
|
||||
self.assertEqual(len(ips), 1)
|
||||
self.assertEqual(ips[0]['ip_address'], '10.0.0.2')
|
||||
self.assertEqual(1, len(ips))
|
||||
self.assertEqual('10.0.0.2', ips[0]['ip_address'])
|
||||
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
|
||||
data = {'port': {'fixed_ips': [{'subnet_id':
|
||||
subnet['subnet']['id'],
|
||||
|
@ -250,7 +250,7 @@ class DnsExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2):
|
|||
port['port']['id'])
|
||||
res = self.deserialize(self.fmt, req.get_response(self.api))
|
||||
ips = res['port']['fixed_ips']
|
||||
self.assertEqual(len(ips), 1)
|
||||
self.assertEqual(1, len(ips))
|
||||
self.assertEqual(ips[0]['ip_address'], '10.0.0.10')
|
||||
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
|
||||
self._verify_dns_assigment(res['port'], ips_list=['10.0.0.10'])
|
||||
|
@ -259,8 +259,8 @@ class DnsExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2):
|
|||
with self.subnet() as subnet:
|
||||
with self.port(subnet=subnet) as port:
|
||||
ips = port['port']['fixed_ips']
|
||||
self.assertEqual(len(ips), 1)
|
||||
self.assertEqual(ips[0]['ip_address'], '10.0.0.2')
|
||||
self.assertEqual(1, len(ips))
|
||||
self.assertEqual('10.0.0.2', ips[0]['ip_address'])
|
||||
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
|
||||
data = {'port': {'fixed_ips': [{'subnet_id':
|
||||
subnet['subnet']['id'],
|
||||
|
@ -270,7 +270,7 @@ class DnsExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2):
|
|||
port['port']['id'])
|
||||
res = self.deserialize(self.fmt, req.get_response(self.api))
|
||||
ips = res['port']['fixed_ips']
|
||||
self.assertEqual(len(ips), 2)
|
||||
self.assertEqual(2, len(ips))
|
||||
self.assertIn({'ip_address': '10.0.0.2',
|
||||
'subnet_id': subnet['subnet']['id']}, ips)
|
||||
self.assertIn({'ip_address': '10.0.0.10',
|
||||
|
@ -281,54 +281,54 @@ class DnsExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2):
|
|||
|
||||
def test_create_port_with_multiple_ipv4_and_ipv6_subnets(self):
|
||||
res = self._test_create_port_with_multiple_ipv4_and_ipv6_subnets()
|
||||
self.assertEqual(res.status_code, 201)
|
||||
self.assertEqual(201, res.status_code)
|
||||
|
||||
def test_create_port_multiple_v4_v6_subnets_pqdn_and_dns_domain_no_period(
|
||||
self):
|
||||
cfg.CONF.set_override('dns_domain', 'example.com')
|
||||
res = self._test_create_port_with_multiple_ipv4_and_ipv6_subnets(
|
||||
dns_name='vm1')
|
||||
self.assertEqual(res.status_code, 201)
|
||||
self.assertEqual(201, res.status_code)
|
||||
|
||||
def test_create_port_multiple_v4_v6_subnets_pqdn_and_dns_domain_period(
|
||||
self):
|
||||
cfg.CONF.set_override('dns_domain', 'example.com.')
|
||||
res = self._test_create_port_with_multiple_ipv4_and_ipv6_subnets(
|
||||
dns_name='vm1')
|
||||
self.assertEqual(res.status_code, 201)
|
||||
self.assertEqual(201, res.status_code)
|
||||
|
||||
def test_create_port_multiple_v4_v6_subnets_pqdn_and_no_dns_domain(
|
||||
self):
|
||||
cfg.CONF.set_override('dns_domain', '')
|
||||
res = self._test_create_port_with_multiple_ipv4_and_ipv6_subnets()
|
||||
self.assertEqual(res.status_code, 201)
|
||||
self.assertEqual(201, res.status_code)
|
||||
|
||||
def test_create_port_multiple_v4_v6_subnets_fqdn_and_dns_domain_no_period(
|
||||
self):
|
||||
cfg.CONF.set_override('dns_domain', 'example.com')
|
||||
res = self._test_create_port_with_multiple_ipv4_and_ipv6_subnets(
|
||||
dns_name='vm1.example.com.')
|
||||
self.assertEqual(res.status_code, 201)
|
||||
self.assertEqual(201, res.status_code)
|
||||
|
||||
def test_create_port_multiple_v4_v6_subnets_fqdn_and_dns_domain_period(
|
||||
self):
|
||||
cfg.CONF.set_override('dns_domain', 'example.com.')
|
||||
res = self._test_create_port_with_multiple_ipv4_and_ipv6_subnets(
|
||||
dns_name='vm1.example.com.')
|
||||
self.assertEqual(res.status_code, 201)
|
||||
self.assertEqual(201, res.status_code)
|
||||
|
||||
def test_create_port_multiple_v4_v6_subnets_fqdn_default_domain_period(
|
||||
self):
|
||||
cfg.CONF.set_override('dns_domain', 'openstacklocal.')
|
||||
res = self._test_create_port_with_multiple_ipv4_and_ipv6_subnets()
|
||||
self.assertEqual(res.status_code, 201)
|
||||
self.assertEqual(201, res.status_code)
|
||||
|
||||
def test_create_port_multiple_v4_v6_subnets_bad_fqdn_and_dns_domain(
|
||||
self):
|
||||
cfg.CONF.set_override('dns_domain', 'example.com')
|
||||
res = self._test_create_port_with_multiple_ipv4_and_ipv6_subnets(
|
||||
dns_name='vm1.bad-domain.com.')
|
||||
self.assertEqual(res.status_code, 400)
|
||||
self.assertEqual(400, res.status_code)
|
||||
expected_error = ('The dns_name passed is a FQDN. Its higher level '
|
||||
'labels must be equal to the dns_domain option in '
|
||||
'neutron.conf')
|
||||
|
@ -345,7 +345,7 @@ class DnsExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2):
|
|||
num_labels + 'a' * filler_len)
|
||||
res = self._test_create_port_with_multiple_ipv4_and_ipv6_subnets(
|
||||
dns_name=dns_name)
|
||||
self.assertEqual(res.status_code, 400)
|
||||
self.assertEqual(400, res.status_code)
|
||||
expected_error = ("When the two are concatenated to form a FQDN "
|
||||
"(with a '.' at the end), the resulting length "
|
||||
"exceeds the maximum size")
|
||||
|
@ -435,7 +435,7 @@ class DnsExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2):
|
|||
for dns_name in dns_names:
|
||||
res = self._create_port(self.fmt, net_id=network['network']['id'],
|
||||
dns_name=dns_name)
|
||||
self.assertEqual(res.status_code, 400)
|
||||
self.assertEqual(400, res.status_code)
|
||||
is_expected_message = (
|
||||
'cannot be converted to lowercase string' in res.text or
|
||||
'not a valid PQDN or FQDN. Reason:' in res.text)
|
||||
|
@ -471,4 +471,4 @@ class DnsExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2):
|
|||
for dns_name in dns_names:
|
||||
res = self._create_port(self.fmt, net_id=network['network']['id'],
|
||||
dns_name=dns_name)
|
||||
self.assertEqual(res.status_code, 201)
|
||||
self.assertEqual(201, res.status_code)
|
||||
|
|
|
@ -69,17 +69,17 @@ class ExtNetDBTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
|
|||
self._set_net_external(n1['network']['id'])
|
||||
with self.network():
|
||||
body = self._list('networks')
|
||||
self.assertEqual(len(body['networks']), 2)
|
||||
self.assertEqual(2, len(body['networks']))
|
||||
|
||||
body = self._list('networks',
|
||||
query_params="%s=True" %
|
||||
external_net.EXTERNAL)
|
||||
self.assertEqual(len(body['networks']), 1)
|
||||
self.assertEqual(1, len(body['networks']))
|
||||
|
||||
body = self._list('networks',
|
||||
query_params="%s=False" %
|
||||
external_net.EXTERNAL)
|
||||
self.assertEqual(len(body['networks']), 1)
|
||||
self.assertEqual(1, len(body['networks']))
|
||||
|
||||
def test_list_nets_external_pagination(self):
|
||||
if self._skip_native_pagination:
|
||||
|
@ -157,7 +157,7 @@ class ExtNetDBTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
|
|||
set_context='True',
|
||||
tenant_id='noadmin'):
|
||||
pass
|
||||
self.assertEqual(ctx_manager.exception.code, 403)
|
||||
self.assertEqual(403, ctx_manager.exception.code)
|
||||
|
||||
def test_create_port_external_network_admin_succeeds(self):
|
||||
with self.network(router__external=True) as ext_net:
|
||||
|
@ -172,7 +172,7 @@ class ExtNetDBTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
|
|||
set_context='True',
|
||||
tenant_id='noadmin'):
|
||||
pass
|
||||
self.assertEqual(ctx_manager.exception.code, 403)
|
||||
self.assertEqual(403, ctx_manager.exception.code)
|
||||
|
||||
def test_create_external_network_admin_succeeds(self):
|
||||
with self.network(router__external=True) as ext_net:
|
||||
|
@ -186,6 +186,6 @@ class ExtNetDBTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
|
|||
with self.network() as net:
|
||||
req = self.new_delete_request('networks', net['network']['id'])
|
||||
res = req.get_response(self.api)
|
||||
self.assertEqual(res.status_int, exc.HTTPNoContent.code)
|
||||
self.assertEqual(exc.HTTPNoContent.code, res.status_int)
|
||||
(l3_mock.delete_disassociated_floatingips
|
||||
.assert_called_once_with(mock.ANY, net['network']['id']))
|
||||
|
|
|
@ -163,7 +163,7 @@ class TestExtraDhcpOpt(ExtraDhcpOptDBTestCase):
|
|||
req = self.new_update_request('ports', update_port,
|
||||
port['port']['id'])
|
||||
res = req.get_response(self.api)
|
||||
self.assertEqual(res.status_int, webob.exc.HTTPOk.code)
|
||||
self.assertEqual(webob.exc.HTTPOk.code, res.status_int)
|
||||
port = self.deserialize('json', res)
|
||||
self._check_opts(expected_opts,
|
||||
port['port'][edo_ext.EXTRADHCPOPTS])
|
||||
|
@ -258,7 +258,7 @@ class TestExtraDhcpOpt(ExtraDhcpOptDBTestCase):
|
|||
req = self.new_update_request('ports', update_port,
|
||||
port['port']['id'])
|
||||
res = req.get_response(self.api)
|
||||
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
|
||||
self.assertEqual(webob.exc.HTTPBadRequest.code, res.status_int)
|
||||
|
||||
def test_update_port_with_blank_name_extradhcpopt(self):
|
||||
opt_list = [{'opt_name': 'bootfile-name', 'opt_value': 'pxelinux.0'},
|
||||
|
@ -277,7 +277,7 @@ class TestExtraDhcpOpt(ExtraDhcpOptDBTestCase):
|
|||
req = self.new_update_request('ports', update_port,
|
||||
port['port']['id'])
|
||||
res = req.get_response(self.api)
|
||||
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
|
||||
self.assertEqual(webob.exc.HTTPBadRequest.code, res.status_int)
|
||||
|
||||
def test_update_port_with_blank_router_extradhcpopt(self):
|
||||
opt_list = [{'opt_name': 'bootfile-name',
|
||||
|
|
|
@ -77,7 +77,7 @@ class ExtraRouteDBTestCaseBase(object):
|
|||
body = self._routes_update_prepare(r['router']['id'],
|
||||
None, p['port']['id'],
|
||||
routes)
|
||||
self.assertEqual(body['router']['routes'], routes)
|
||||
self.assertEqual(routes, body['router']['routes'])
|
||||
self._routes_update_cleanup(p['port']['id'],
|
||||
None, r['router']['id'], [])
|
||||
|
||||
|
@ -108,7 +108,7 @@ class ExtraRouteDBTestCaseBase(object):
|
|||
body = self._routes_update_prepare(r['router']['id'],
|
||||
None, p['port']['id'],
|
||||
routes)
|
||||
self.assertEqual(body['router']['routes'], routes)
|
||||
self.assertEqual(routes, body['router']['routes'])
|
||||
self._router_interface_action(
|
||||
'remove',
|
||||
r['router']['id'],
|
||||
|
@ -151,12 +151,12 @@ class ExtraRouteDBTestCaseBase(object):
|
|||
body = self._routes_update_prepare(r1['router']['id'],
|
||||
None, p1['port']['id'],
|
||||
routes1)
|
||||
self.assertEqual(body['router']['routes'], routes1)
|
||||
self.assertEqual(routes1, body['router']['routes'])
|
||||
|
||||
body = self._routes_update_prepare(r2['router']['id'],
|
||||
None, p2['port']['id'],
|
||||
routes2)
|
||||
self.assertEqual(body['router']['routes'], routes2)
|
||||
self.assertEqual(routes2, body['router']['routes'])
|
||||
|
||||
self._routes_update_cleanup(p1['port']['id'],
|
||||
None, r1['router']['id'], [])
|
||||
|
@ -405,7 +405,7 @@ class ExtraRouteDBTestCaseBase(object):
|
|||
tenant_id=r['router']['tenant_id'],
|
||||
device_owner=constants.DEVICE_OWNER_ROUTER_GW)
|
||||
port_list = self.deserialize('json', port_res)
|
||||
self.assertEqual(len(port_list['ports']), 1)
|
||||
self.assertEqual(1, len(port_list['ports']))
|
||||
|
||||
routes = [{'destination': '135.207.0.0/16',
|
||||
'nexthop': '10.0.1.3'}]
|
||||
|
@ -415,8 +415,7 @@ class ExtraRouteDBTestCaseBase(object):
|
|||
routes}})
|
||||
|
||||
body = self._show('routers', r['router']['id'])
|
||||
self.assertEqual(body['router']['routes'],
|
||||
routes)
|
||||
self.assertEqual(routes, body['router']['routes'])
|
||||
|
||||
self._remove_external_gateway_from_router(
|
||||
r['router']['id'],
|
||||
|
|
|
@ -114,12 +114,12 @@ class L3NatExtensionTestCase(test_extensions_base.ExtensionTestCase):
|
|||
content_type='application/%s' % self.fmt)
|
||||
instance.create_router.assert_called_with(mock.ANY,
|
||||
router=data)
|
||||
self.assertEqual(res.status_int, exc.HTTPCreated.code)
|
||||
self.assertEqual(exc.HTTPCreated.code, res.status_int)
|
||||
res = self.deserialize(res)
|
||||
self.assertIn('router', res)
|
||||
router = res['router']
|
||||
self.assertEqual(router['id'], router_id)
|
||||
self.assertEqual(router['status'], "ACTIVE")
|
||||
self.assertEqual(router_id, router['id'])
|
||||
self.assertEqual("ACTIVE", router['status'])
|
||||
self.assertTrue(router['admin_state_up'])
|
||||
|
||||
def test_router_list(self):
|
||||
|
@ -138,7 +138,7 @@ class L3NatExtensionTestCase(test_extensions_base.ExtensionTestCase):
|
|||
limit=mock.ANY,
|
||||
marker=mock.ANY,
|
||||
page_reverse=mock.ANY)
|
||||
self.assertEqual(res.status_int, exc.HTTPOk.code)
|
||||
self.assertEqual(exc.HTTPOk.code, res.status_int)
|
||||
res = self.deserialize(res)
|
||||
self.assertIn('routers', res)
|
||||
self.assertEqual(1, len(res['routers']))
|
||||
|
@ -160,12 +160,12 @@ class L3NatExtensionTestCase(test_extensions_base.ExtensionTestCase):
|
|||
|
||||
instance.update_router.assert_called_with(mock.ANY, router_id,
|
||||
router=update_data)
|
||||
self.assertEqual(res.status_int, exc.HTTPOk.code)
|
||||
self.assertEqual(exc.HTTPOk.code, res.status_int)
|
||||
res = self.deserialize(res)
|
||||
self.assertIn('router', res)
|
||||
router = res['router']
|
||||
self.assertEqual(router['id'], router_id)
|
||||
self.assertEqual(router['status'], "ACTIVE")
|
||||
self.assertEqual(router_id, router['id'])
|
||||
self.assertEqual("ACTIVE", router['status'])
|
||||
self.assertFalse(router['admin_state_up'])
|
||||
|
||||
def test_router_get(self):
|
||||
|
@ -182,12 +182,12 @@ class L3NatExtensionTestCase(test_extensions_base.ExtensionTestCase):
|
|||
|
||||
instance.get_router.assert_called_with(mock.ANY, router_id,
|
||||
fields=mock.ANY)
|
||||
self.assertEqual(res.status_int, exc.HTTPOk.code)
|
||||
self.assertEqual(exc.HTTPOk.code, res.status_int)
|
||||
res = self.deserialize(res)
|
||||
self.assertIn('router', res)
|
||||
router = res['router']
|
||||
self.assertEqual(router['id'], router_id)
|
||||
self.assertEqual(router['status'], "ACTIVE")
|
||||
self.assertEqual(router_id, router['id'])
|
||||
self.assertEqual("ACTIVE", router['status'])
|
||||
self.assertFalse(router['admin_state_up'])
|
||||
|
||||
def test_router_delete(self):
|
||||
|
@ -197,7 +197,7 @@ class L3NatExtensionTestCase(test_extensions_base.ExtensionTestCase):
|
|||
|
||||
instance = self.plugin.return_value
|
||||
instance.delete_router.assert_called_with(mock.ANY, router_id)
|
||||
self.assertEqual(res.status_int, exc.HTTPNoContent.code)
|
||||
self.assertEqual(exc.HTTPNoContent.code, res.status_int)
|
||||
|
||||
def test_router_add_interface(self):
|
||||
router_id = _uuid()
|
||||
|
@ -218,11 +218,11 @@ class L3NatExtensionTestCase(test_extensions_base.ExtensionTestCase):
|
|||
|
||||
instance.add_router_interface.assert_called_with(mock.ANY, router_id,
|
||||
interface_data)
|
||||
self.assertEqual(res.status_int, exc.HTTPOk.code)
|
||||
self.assertEqual(exc.HTTPOk.code, res.status_int)
|
||||
res = self.deserialize(res)
|
||||
self.assertIn('port_id', res)
|
||||
self.assertEqual(res['port_id'], port_id)
|
||||
self.assertEqual(res['subnet_id'], subnet_id)
|
||||
self.assertEqual(port_id, res['port_id'])
|
||||
self.assertEqual(subnet_id, res['subnet_id'])
|
||||
|
||||
|
||||
# This base plugin class is for tests.
|
||||
|
@ -397,10 +397,10 @@ class L3NatTestCaseMixin(object):
|
|||
req.environ['neutron.context'] = context.Context(
|
||||
'', tenant_id)
|
||||
res = req.get_response(self.ext_api)
|
||||
self.assertEqual(res.status_int, expected_code, msg)
|
||||
self.assertEqual(expected_code, res.status_int, msg)
|
||||
response = self.deserialize(self.fmt, res)
|
||||
if expected_body:
|
||||
self.assertEqual(response, expected_body, msg)
|
||||
self.assertEqual(expected_body, response, msg)
|
||||
return response
|
||||
|
||||
@contextlib.contextmanager
|
||||
|
@ -444,12 +444,12 @@ class L3NatTestCaseMixin(object):
|
|||
http_status=exc.HTTPCreated.code):
|
||||
res = self._create_floatingip(fmt, network_id, port_id,
|
||||
fixed_ip, set_context, floating_ip)
|
||||
self.assertEqual(res.status_int, http_status)
|
||||
self.assertEqual(http_status, res.status_int)
|
||||
return self.deserialize(fmt, res)
|
||||
|
||||
def _validate_floating_ip(self, fip):
|
||||
body = self._list('floatingips')
|
||||
self.assertEqual(len(body['floatingips']), 1)
|
||||
self.assertEqual(1, len(body['floatingips']))
|
||||
self.assertEqual(body['floatingips'][0]['id'],
|
||||
fip['floatingip']['id'])
|
||||
|
||||
|
@ -1771,7 +1771,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
|
|||
router_id = router['router']['id']
|
||||
req = self.new_show_request('router', router_id)
|
||||
res = req.get_response(self._api_for_resource('router'))
|
||||
self.assertEqual(res.status_int, 404)
|
||||
self.assertEqual(404, res.status_int)
|
||||
|
||||
def test_router_delete_with_port_existed_returns_409(self):
|
||||
with self.subnet() as subnet:
|
||||
|
@ -1801,7 +1801,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
|
|||
res = self._create_floatingip(
|
||||
self.fmt, public_sub['subnet']['network_id'],
|
||||
port_id=p['port']['id'])
|
||||
self.assertEqual(res.status_int, exc.HTTPCreated.code)
|
||||
self.assertEqual(exc.HTTPCreated.code, res.status_int)
|
||||
self._delete('routers', r['router']['id'],
|
||||
expected_code=exc.HTTPConflict.code)
|
||||
|
||||
|
@ -1846,7 +1846,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
|
|||
|
||||
# post-delete, check that it is really gone
|
||||
body = self._list('floatingips')
|
||||
self.assertEqual(len(body['floatingips']), 0)
|
||||
self.assertEqual(0, len(body['floatingips']))
|
||||
|
||||
self._show('floatingips', fip['floatingip']['id'],
|
||||
expected_code=exc.HTTPNotFound.code)
|
||||
|
@ -1872,7 +1872,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
|
|||
self.fmt,
|
||||
public_sub['subnet']['network_id'],
|
||||
port_id=private_port['port']['id'])
|
||||
self.assertEqual(res.status_int, 400)
|
||||
self.assertEqual(400, res.status_int)
|
||||
for p in self._list('ports')['ports']:
|
||||
if (p['device_owner'] ==
|
||||
l3_constants.DEVICE_OWNER_FLOATINGIP):
|
||||
|
@ -1939,15 +1939,15 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
|
|||
body = self._show('floatingips', fip['floatingip']['id'])
|
||||
self.assertIsNone(body['floatingip']['port_id'])
|
||||
self.assertIsNone(body['floatingip']['fixed_ip_address'])
|
||||
self.assertEqual(body['floatingip']['status'], expected_status)
|
||||
self.assertEqual(expected_status, body['floatingip']['status'])
|
||||
|
||||
port_id = p['port']['id']
|
||||
ip_address = p['port']['fixed_ips'][0]['ip_address']
|
||||
body = self._update('floatingips', fip['floatingip']['id'],
|
||||
{'floatingip': {'port_id': port_id}})
|
||||
self.assertEqual(body['floatingip']['port_id'], port_id)
|
||||
self.assertEqual(body['floatingip']['fixed_ip_address'],
|
||||
ip_address)
|
||||
self.assertEqual(port_id, body['floatingip']['port_id'])
|
||||
self.assertEqual(ip_address,
|
||||
body['floatingip']['fixed_ip_address'])
|
||||
|
||||
def test_floatingip_create_different_fixed_ip_same_port(self):
|
||||
'''This tests that it is possible to delete a port that has
|
||||
|
@ -2150,11 +2150,11 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
|
|||
body = self._update(
|
||||
'floatingips', fip['floatingip']['id'],
|
||||
{'floatingip': {'port_id': port_id}})
|
||||
self.assertEqual(body['floatingip']['port_id'],
|
||||
port_id)
|
||||
self.assertEqual(port_id,
|
||||
body['floatingip']['port_id'])
|
||||
self.assertEqual(
|
||||
body['floatingip']['fixed_ip_address'],
|
||||
ip_address)
|
||||
ip_address,
|
||||
body['floatingip']['fixed_ip_address'])
|
||||
return body['floatingip']['router_id']
|
||||
|
||||
fip1_r1_res = associate_and_assert(fip1, p1)
|
||||
|
@ -2196,7 +2196,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
|
|||
self.fmt,
|
||||
fip1['floatingip']['floating_network_id'],
|
||||
fip1['floatingip']['port_id'])
|
||||
self.assertEqual(res.status_int, exc.HTTPConflict.code)
|
||||
self.assertEqual(exc.HTTPConflict.code, res.status_int)
|
||||
|
||||
def test_floating_ip_direct_port_delete_returns_409(self):
|
||||
found = False
|
||||
|
@ -2229,8 +2229,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
|
|||
res = self._create_floatingip(
|
||||
self.fmt, public_sub['subnet']['network_id'],
|
||||
port_id=p['port']['id'])
|
||||
self.assertEqual(res.status_int,
|
||||
exc.HTTPBadRequest.code)
|
||||
self.assertEqual(exc.HTTPBadRequest.code, res.status_int)
|
||||
|
||||
def test_floatingip_with_invalid_create_port(self):
|
||||
self._test_floatingip_with_invalid_create_port(
|
||||
|
@ -2245,7 +2244,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
|
|||
public_sub['subnet']['network_id'],
|
||||
subnet_id=public_sub['subnet']['id'],
|
||||
set_context=True)
|
||||
self.assertEqual(res.status_int, exc.HTTPCreated.code)
|
||||
self.assertEqual(exc.HTTPCreated.code, res.status_int)
|
||||
|
||||
def test_create_floatingip_with_multisubnet_id(self):
|
||||
with self.network() as network:
|
||||
|
@ -2280,7 +2279,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
|
|||
self.fmt,
|
||||
subnet1['subnet']['network_id'],
|
||||
subnet_id=subnet2['subnet']['id'])
|
||||
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
|
||||
self.assertEqual(exc.HTTPBadRequest.code, res.status_int)
|
||||
|
||||
def test_create_floatingip_no_ext_gateway_return_404(self):
|
||||
with self.subnet() as public_sub:
|
||||
|
@ -2292,7 +2291,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
|
|||
public_sub['subnet']['network_id'],
|
||||
port_id=private_port['port']['id'])
|
||||
# this should be some kind of error
|
||||
self.assertEqual(res.status_int, exc.HTTPNotFound.code)
|
||||
self.assertEqual(exc.HTTPNotFound.code, res.status_int)
|
||||
|
||||
def test_create_floating_non_ext_network_returns_400(self):
|
||||
with self.subnet() as public_sub:
|
||||
|
@ -2303,7 +2302,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
|
|||
res = self._create_floatingip(
|
||||
self.fmt,
|
||||
public_sub['subnet']['network_id'])
|
||||
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
|
||||
self.assertEqual(exc.HTTPBadRequest.code, res.status_int)
|
||||
|
||||
def test_create_floatingip_no_public_subnet_returns_400(self):
|
||||
with self.network() as public_network:
|
||||
|
@ -2319,25 +2318,25 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
|
|||
self.fmt,
|
||||
public_network['network']['id'],
|
||||
port_id=private_port['port']['id'])
|
||||
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
|
||||
self.assertEqual(exc.HTTPBadRequest.code, res.status_int)
|
||||
|
||||
def test_create_floatingip_invalid_floating_network_id_returns_400(self):
|
||||
# API-level test - no need to create all objects for l3 plugin
|
||||
res = self._create_floatingip(self.fmt, 'iamnotanuuid',
|
||||
uuidutils.generate_uuid(), '192.168.0.1')
|
||||
self.assertEqual(res.status_int, 400)
|
||||
self.assertEqual(400, res.status_int)
|
||||
|
||||
def test_create_floatingip_invalid_floating_port_id_returns_400(self):
|
||||
# API-level test - no need to create all objects for l3 plugin
|
||||
res = self._create_floatingip(self.fmt, uuidutils.generate_uuid(),
|
||||
'iamnotanuuid', '192.168.0.1')
|
||||
self.assertEqual(res.status_int, 400)
|
||||
self.assertEqual(400, res.status_int)
|
||||
|
||||
def test_create_floatingip_invalid_fixed_ip_address_returns_400(self):
|
||||
# API-level test - no need to create all objects for l3 plugin
|
||||
res = self._create_floatingip(self.fmt, uuidutils.generate_uuid(),
|
||||
uuidutils.generate_uuid(), 'iamnotnanip')
|
||||
self.assertEqual(res.status_int, 400)
|
||||
self.assertEqual(400, res.status_int)
|
||||
|
||||
def test_floatingip_list_with_sort(self):
|
||||
with self.subnet(cidr="10.0.0.0/24") as s1,\
|
||||
|
@ -2360,9 +2359,9 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
|
|||
port_id = fip['floatingip']['port_id']
|
||||
res = self._list('floatingips',
|
||||
query_params="port_id=%s" % port_id)
|
||||
self.assertEqual(len(res['floatingips']), 1)
|
||||
self.assertEqual(1, len(res['floatingips']))
|
||||
res = self._list('floatingips', query_params="port_id=aaa")
|
||||
self.assertEqual(len(res['floatingips']), 0)
|
||||
self.assertEqual(0, len(res['floatingips']))
|
||||
|
||||
def test_floatingip_list_with_pagination(self):
|
||||
with self.subnet(cidr="10.0.0.0/24") as s1,\
|
||||
|
@ -2547,8 +2546,8 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
|
|||
self._set_net_external(network_id)
|
||||
fp = self._make_floatingip(self.fmt, network_id,
|
||||
floating_ip='10.0.0.10')
|
||||
self.assertEqual(fp['floatingip']['floating_ip_address'],
|
||||
'10.0.0.10')
|
||||
self.assertEqual('10.0.0.10',
|
||||
fp['floatingip']['floating_ip_address'])
|
||||
|
||||
def test_create_floatingip_with_specific_ip_out_of_allocation(self):
|
||||
with self.subnet(cidr='10.0.0.0/24',
|
||||
|
@ -2559,8 +2558,8 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
|
|||
self._set_net_external(network_id)
|
||||
fp = self._make_floatingip(self.fmt, network_id,
|
||||
floating_ip='10.0.0.30')
|
||||
self.assertEqual(fp['floatingip']['floating_ip_address'],
|
||||
'10.0.0.30')
|
||||
self.assertEqual('10.0.0.30',
|
||||
fp['floatingip']['floating_ip_address'])
|
||||
|
||||
def test_create_floatingip_with_specific_ip_non_admin(self):
|
||||
ctx = context.Context('user_id', 'tenant_id')
|
||||
|
@ -2601,7 +2600,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
|
|||
'tenant_id': 'foo',
|
||||
'admin_state_up': True}}
|
||||
result = plugin.create_router(context.Context('', 'foo'), router_req)
|
||||
self.assertEqual(result['id'], router_req['router']['id'])
|
||||
self.assertEqual(router_req['router']['id'], result['id'])
|
||||
|
||||
def test_create_floatingip_ipv6_only_network_returns_400(self):
|
||||
with self.subnet(cidr="2001:db8::/48", ip_version=6) as public_sub:
|
||||
|
@ -2609,7 +2608,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
|
|||
res = self._create_floatingip(
|
||||
self.fmt,
|
||||
public_sub['subnet']['network_id'])
|
||||
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
|
||||
self.assertEqual(exc.HTTPBadRequest.code, res.status_int)
|
||||
|
||||
def test_create_floatingip_ipv6_and_ipv4_network_creates_ipv4(self):
|
||||
with self.network() as n,\
|
||||
|
@ -2617,8 +2616,8 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
|
|||
self.subnet(cidr="192.168.1.0/24", ip_version=4, network=n):
|
||||
self._set_net_external(n['network']['id'])
|
||||
fip = self._make_floatingip(self.fmt, n['network']['id'])
|
||||
self.assertEqual(fip['floatingip']['floating_ip_address'],
|
||||
'192.168.1.2')
|
||||
self.assertEqual('192.168.1.2',
|
||||
fip['floatingip']['floating_ip_address'])
|
||||
|
||||
def test_create_floatingip_with_assoc_to_ipv6_subnet(self):
|
||||
with self.subnet() as public_sub:
|
||||
|
@ -2630,14 +2629,14 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
|
|||
self.fmt,
|
||||
public_sub['subnet']['network_id'],
|
||||
port_id=private_port['port']['id'])
|
||||
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
|
||||
self.assertEqual(exc.HTTPBadRequest.code, res.status_int)
|
||||
|
||||
def test_create_floatingip_with_assoc_to_ipv4_and_ipv6_port(self):
|
||||
with self.network() as n,\
|
||||
self.subnet(cidr='10.0.0.0/24', network=n) as s4,\
|
||||
self.subnet(cidr='2001:db8::/64', ip_version=6, network=n),\
|
||||
self.port(subnet=s4) as p:
|
||||
self.assertEqual(len(p['port']['fixed_ips']), 2)
|
||||
self.assertEqual(2, len(p['port']['fixed_ips']))
|
||||
ipv4_address = next(i['ip_address'] for i in
|
||||
p['port']['fixed_ips'] if
|
||||
netaddr.IPAddress(i['ip_address']).version == 4)
|
||||
|
@ -2646,7 +2645,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
|
|||
ipv4_address)
|
||||
floating_ip = netaddr.IPAddress(
|
||||
fip['floatingip']['floating_ip_address'])
|
||||
self.assertEqual(floating_ip.version, 4)
|
||||
self.assertEqual(4, floating_ip.version)
|
||||
|
||||
def test_update_subnet_gateway_for_external_net(self):
|
||||
"""Test to make sure notification to routers occurs when the gateway
|
||||
|
@ -2678,8 +2677,8 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
|
|||
subnet['subnet']['id'])
|
||||
res = self.deserialize(self.fmt,
|
||||
req.get_response(self.api))
|
||||
self.assertEqual(res['subnet']['gateway_ip'],
|
||||
data['subnet']['gateway_ip'])
|
||||
self.assertEqual(data['subnet']['gateway_ip'],
|
||||
res['subnet']['gateway_ip'])
|
||||
chk_method.assert_called_with(mock.ANY,
|
||||
['fake_device'], None)
|
||||
|
||||
|
@ -2913,7 +2912,7 @@ class L3NatDBIntAgentSchedulingTestCase(L3BaseForIntTests,
|
|||
service_constants.L3_ROUTER_NAT)
|
||||
agents = plugin.list_l3_agents_hosting_router(
|
||||
self.adminContext, router_id)['agents']
|
||||
self.assertEqual(len(agents), 1)
|
||||
self.assertEqual(1, len(agents))
|
||||
self.assertEqual(agents[0]['host'], agent_host)
|
||||
|
||||
def test_update_gateway_agent_exists_supporting_network(self):
|
||||
|
|
|
@ -469,9 +469,9 @@ class ExtGwModeIntTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase,
|
|||
return
|
||||
body = self._show('routers', r['router']['id'])
|
||||
res_gw_info = body['router']['external_gateway_info']
|
||||
self.assertEqual(res_gw_info['network_id'], ext_net_id)
|
||||
self.assertEqual(res_gw_info['enable_snat'],
|
||||
snat_expected_value)
|
||||
self.assertEqual(ext_net_id, res_gw_info['network_id'])
|
||||
self.assertEqual(snat_expected_value,
|
||||
res_gw_info['enable_snat'])
|
||||
finally:
|
||||
self._remove_external_gateway_from_router(
|
||||
r['router']['id'], ext_net_id)
|
||||
|
|
|
@ -61,8 +61,8 @@ class NetmtuExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2):
|
|||
self.assertEqual(1, len(res['networks']))
|
||||
self.assertEqual(res['networks'][0]['name'],
|
||||
net1['network']['name'])
|
||||
self.assertEqual(res['networks'][0].get('mtu'),
|
||||
constants.DEFAULT_NETWORK_MTU)
|
||||
self.assertEqual(constants.DEFAULT_NETWORK_MTU,
|
||||
res['networks'][0].get('mtu'))
|
||||
|
||||
def test_show_network_mtu(self):
|
||||
with self.network(name='net1') as net:
|
||||
|
@ -70,5 +70,5 @@ class NetmtuExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2):
|
|||
res = self.deserialize(self.fmt, req.get_response(self.api))
|
||||
self.assertEqual(res['network']['name'],
|
||||
net['network']['name'])
|
||||
self.assertEqual(res['network']['mtu'],
|
||||
constants.DEFAULT_NETWORK_MTU)
|
||||
self.assertEqual(constants.DEFAULT_NETWORK_MTU,
|
||||
res['network']['mtu'])
|
||||
|
|
|
@ -251,7 +251,7 @@ class TestPortSecurity(PortSecurityDBTestCase):
|
|||
'port_security_enabled'),
|
||||
security_groups=[security_group_id],
|
||||
port_security_enabled=False)
|
||||
self.assertEqual(res.status_int, 400)
|
||||
self.assertEqual(400, res.status_int)
|
||||
|
||||
def test_create_port_with_default_security_group(self):
|
||||
if self._skip_security_group:
|
||||
|
@ -261,7 +261,7 @@ class TestPortSecurity(PortSecurityDBTestCase):
|
|||
res = self._create_port('json', net['network']['id'])
|
||||
port = self.deserialize('json', res)
|
||||
self.assertTrue(port['port'][psec.PORTSECURITY])
|
||||
self.assertEqual(len(port['port'][ext_sg.SECURITYGROUPS]), 1)
|
||||
self.assertEqual(1, len(port['port'][ext_sg.SECURITYGROUPS]))
|
||||
self._delete('ports', port['port']['id'])
|
||||
|
||||
def test_create_port_with_security_group_and_net_sec_false(self):
|
||||
|
@ -312,7 +312,7 @@ class TestPortSecurity(PortSecurityDBTestCase):
|
|||
req = self.new_update_request('ports', update_port,
|
||||
port['port']['id'])
|
||||
res = req.get_response(self.api)
|
||||
self.assertEqual(res.status_int, 409)
|
||||
self.assertEqual(409, res.status_int)
|
||||
# remove security group on port
|
||||
update_port = {'port': {ext_sg.SECURITYGROUPS: None}}
|
||||
req = self.new_update_request('ports', update_port,
|
||||
|
@ -339,7 +339,7 @@ class TestPortSecurity(PortSecurityDBTestCase):
|
|||
port['port']['id'])
|
||||
port = self.deserialize('json', req.get_response(self.api))
|
||||
self.assertFalse(port['port'][psec.PORTSECURITY])
|
||||
self.assertEqual(len(port['port'][ext_sg.SECURITYGROUPS]), 0)
|
||||
self.assertEqual(0, len(port['port'][ext_sg.SECURITYGROUPS]))
|
||||
self._delete('ports', port['port']['id'])
|
||||
|
||||
def test_update_port_remove_port_security_security_group_read(self):
|
||||
|
@ -369,7 +369,7 @@ class TestPortSecurity(PortSecurityDBTestCase):
|
|||
|
||||
port = self.deserialize('json', req.get_response(self.api))
|
||||
self.assertTrue(port['port'][psec.PORTSECURITY])
|
||||
self.assertEqual(len(port['port'][ext_sg.SECURITYGROUPS]), 1)
|
||||
self.assertEqual(1, len(port['port'][ext_sg.SECURITYGROUPS]))
|
||||
self._delete('ports', port['port']['id'])
|
||||
|
||||
def test_create_port_security_off_shared_network(self):
|
||||
|
@ -381,7 +381,7 @@ class TestPortSecurity(PortSecurityDBTestCase):
|
|||
tenant_id='not_network_owner',
|
||||
set_context=True)
|
||||
self.deserialize('json', res)
|
||||
self.assertEqual(res.status_int, 403)
|
||||
self.assertEqual(403, res.status_int)
|
||||
|
||||
def test_update_port_security_off_shared_network(self):
|
||||
with self.network(shared=True) as net:
|
||||
|
@ -398,4 +398,4 @@ class TestPortSecurity(PortSecurityDBTestCase):
|
|||
req.environ['neutron.context'] = context.Context(
|
||||
'', 'not_network_owner')
|
||||
res = req.get_response(self.api)
|
||||
self.assertEqual(res.status_int, exc.HTTPForbidden.code)
|
||||
self.assertEqual(exc.HTTPForbidden.code, res.status_int)
|
||||
|
|
|
@ -134,7 +134,7 @@ class ProvidernetExtensionTestCase(testlib_api.WebTestCase):
|
|||
'shared': False})
|
||||
instance.create_network.assert_called_with(mock.ANY,
|
||||
network=exp_input)
|
||||
self.assertEqual(res.status_int, web_exc.HTTPCreated.code)
|
||||
self.assertEqual(web_exc.HTTPCreated.code, res.status_int)
|
||||
|
||||
def test_network_create_with_bad_provider_attrs_400(self):
|
||||
ctx = context.get_admin_context()
|
||||
|
@ -153,16 +153,16 @@ class ProvidernetExtensionTestCase(testlib_api.WebTestCase):
|
|||
instance.update_network.assert_called_with(mock.ANY,
|
||||
net_id,
|
||||
network=exp_input)
|
||||
self.assertEqual(res.status_int, web_exc.HTTPOk.code)
|
||||
self.assertEqual(web_exc.HTTPOk.code, res.status_int)
|
||||
|
||||
def test_network_create_with_provider_attrs_noadmin_returns_403(self):
|
||||
tenant_id = 'no_admin'
|
||||
ctx = context.Context('', tenant_id, is_admin=False)
|
||||
res, _1 = self._post_network_with_provider_attrs(ctx, True)
|
||||
self.assertEqual(res.status_int, web_exc.HTTPForbidden.code)
|
||||
self.assertEqual(web_exc.HTTPForbidden.code, res.status_int)
|
||||
|
||||
def test_network_update_with_provider_attrs_noadmin_returns_403(self):
|
||||
tenant_id = 'no_admin'
|
||||
ctx = context.Context('', tenant_id, is_admin=False)
|
||||
res, _1, _2 = self._put_network_with_provider_attrs(ctx, True)
|
||||
self.assertEqual(res.status_int, web_exc.HTTPForbidden.code)
|
||||
self.assertEqual(web_exc.HTTPForbidden.code, res.status_int)
|
||||
|
|
|
@ -75,7 +75,7 @@ class TestSubnetAllocation(testlib_api.SqlTestCase):
|
|||
prefix_set = netaddr.IPSet(iterable=prefix_list)
|
||||
allocated_set = netaddr.IPSet(iterable=[detail.subnet_cidr])
|
||||
self.assertTrue(allocated_set.issubset(prefix_set))
|
||||
self.assertEqual(detail.prefixlen, 21)
|
||||
self.assertEqual(21, detail.prefixlen)
|
||||
|
||||
def test_allocate_specific_subnet(self):
|
||||
sp = self._create_subnet_pool(self.plugin, self.ctx, 'test-sp',
|
||||
|
@ -90,8 +90,8 @@ class TestSubnetAllocation(testlib_api.SqlTestCase):
|
|||
res = sa.allocate_subnet(req)
|
||||
detail = res.get_details()
|
||||
sp = self._get_subnetpool(self.ctx, self.plugin, sp['id'])
|
||||
self.assertEqual(str(detail.subnet_cidr), '10.1.2.0/24')
|
||||
self.assertEqual(detail.prefixlen, 24)
|
||||
self.assertEqual('10.1.2.0/24', str(detail.subnet_cidr))
|
||||
self.assertEqual(24, detail.prefixlen)
|
||||
|
||||
def test_insufficient_prefix_space_for_any_allocation(self):
|
||||
sp = self._create_subnet_pool(self.plugin, self.ctx, 'test-sp',
|
||||
|
@ -146,8 +146,8 @@ class TestSubnetAllocation(testlib_api.SqlTestCase):
|
|||
gateway_ip='10.1.2.254')
|
||||
res = sa.allocate_subnet(req)
|
||||
detail = res.get_details()
|
||||
self.assertEqual(detail.gateway_ip,
|
||||
netaddr.IPAddress('10.1.2.254'))
|
||||
self.assertEqual(netaddr.IPAddress('10.1.2.254'),
|
||||
detail.gateway_ip)
|
||||
|
||||
def test_allocate_specific_ipv6_subnet_specific_gateway(self):
|
||||
# Same scenario as described in bug #1466322
|
||||
|
@ -163,8 +163,8 @@ class TestSubnetAllocation(testlib_api.SqlTestCase):
|
|||
'2210::ffff:ffff:ffff:ffff')
|
||||
res = sa.allocate_subnet(req)
|
||||
detail = res.get_details()
|
||||
self.assertEqual(detail.gateway_ip,
|
||||
netaddr.IPAddress('2210::ffff:ffff:ffff:ffff'))
|
||||
self.assertEqual(netaddr.IPAddress('2210::ffff:ffff:ffff:ffff'),
|
||||
detail.gateway_ip)
|
||||
|
||||
def test__allocation_value_for_tenant_no_allocations(self):
|
||||
sp = self._create_subnet_pool(self.plugin, self.ctx, 'test-sp',
|
||||
|
@ -172,7 +172,7 @@ class TestSubnetAllocation(testlib_api.SqlTestCase):
|
|||
21, 4)
|
||||
sa = subnet_alloc.SubnetAllocator(sp, self.ctx)
|
||||
value = sa._allocations_used_by_tenant(32)
|
||||
self.assertEqual(value, 0)
|
||||
self.assertEqual(0, value)
|
||||
|
||||
def test_subnetpool_default_quota_exceeded(self):
|
||||
sp = self._create_subnet_pool(self.plugin, self.ctx, 'test-sp',
|
||||
|
|
|
@ -649,7 +649,7 @@ class TestLinuxBridgeManager(base.BaseTestCase):
|
|||
with mock.patch.object(
|
||||
bridge_lib.BridgeDevice, 'get_interfaces') as get_ifs_fn:
|
||||
get_ifs_fn.return_value = ['tap2101', 'eth0.100', 'vxlan-1000']
|
||||
self.assertEqual(self.lbm.get_tap_devices_count('br0'), 1)
|
||||
self.assertEqual(1, self.lbm.get_tap_devices_count('br0'))
|
||||
|
||||
def test_get_interface_details(self):
|
||||
with mock.patch.object(ip_lib.IpAddrCommand, 'list') as list_fn,\
|
||||
|
@ -683,9 +683,8 @@ class TestLinuxBridgeManager(base.BaseTestCase):
|
|||
list_fn.return_value = ipdict
|
||||
with mock.patch.object(self.lbm, 'ensure_bridge') as ens:
|
||||
self.assertEqual(
|
||||
self.lbm.ensure_flat_bridge("123", None, "eth0"),
|
||||
"eth0"
|
||||
)
|
||||
"eth0",
|
||||
self.lbm.ensure_flat_bridge("123", None, "eth0"))
|
||||
self.assertTrue(list_fn.called)
|
||||
self.assertTrue(getgw_fn.called)
|
||||
ens.assert_called_once_with("brq123", "eth0",
|
||||
|
@ -707,19 +706,19 @@ class TestLinuxBridgeManager(base.BaseTestCase):
|
|||
'get_interface_details') as get_int_det_fn:
|
||||
ens_vl_fn.return_value = "eth0.1"
|
||||
get_int_det_fn.return_value = (None, None)
|
||||
self.assertEqual(self.lbm.ensure_vlan_bridge("123",
|
||||
self.assertEqual("eth0.1",
|
||||
self.lbm.ensure_vlan_bridge("123",
|
||||
None,
|
||||
"eth0",
|
||||
"1"),
|
||||
"eth0.1")
|
||||
"1"))
|
||||
ens.assert_called_with("brq123", "eth0.1", None, None)
|
||||
|
||||
get_int_det_fn.return_value = ("ips", "gateway")
|
||||
self.assertEqual(self.lbm.ensure_vlan_bridge("123",
|
||||
self.assertEqual("eth0.1",
|
||||
self.lbm.ensure_vlan_bridge("123",
|
||||
None,
|
||||
"eth0",
|
||||
"1"),
|
||||
"eth0.1")
|
||||
"1"))
|
||||
ens.assert_called_with("brq123", "eth0.1", "ips", "gateway")
|
||||
|
||||
def test_ensure_vlan_bridge_with_existed_brq(self):
|
||||
|
@ -748,16 +747,16 @@ class TestLinuxBridgeManager(base.BaseTestCase):
|
|||
def test_ensure_vlan(self):
|
||||
with mock.patch.object(ip_lib, 'device_exists') as de_fn:
|
||||
de_fn.return_value = True
|
||||
self.assertEqual(self.lbm.ensure_vlan("eth0", "1"), "eth0.1")
|
||||
self.assertEqual("eth0.1", self.lbm.ensure_vlan("eth0", "1"))
|
||||
de_fn.return_value = False
|
||||
with mock.patch.object(utils, 'execute') as exec_fn:
|
||||
exec_fn.return_value = False
|
||||
self.assertEqual(self.lbm.ensure_vlan("eth0", "1"), "eth0.1")
|
||||
self.assertEqual("eth0.1", self.lbm.ensure_vlan("eth0", "1"))
|
||||
# FIXME(kevinbenton): validate the params to the exec_fn calls
|
||||
self.assertEqual(exec_fn.call_count, 2)
|
||||
self.assertEqual(2, exec_fn.call_count)
|
||||
exec_fn.return_value = True
|
||||
self.assertIsNone(self.lbm.ensure_vlan("eth0", "1"))
|
||||
self.assertEqual(exec_fn.call_count, 3)
|
||||
self.assertEqual(3, exec_fn.call_count)
|
||||
|
||||
def test_ensure_vxlan(self):
|
||||
seg_id = "12345678"
|
||||
|
@ -765,19 +764,19 @@ class TestLinuxBridgeManager(base.BaseTestCase):
|
|||
self.lbm.vxlan_mode = lconst.VXLAN_MCAST
|
||||
with mock.patch.object(ip_lib, 'device_exists') as de_fn:
|
||||
de_fn.return_value = True
|
||||
self.assertEqual(self.lbm.ensure_vxlan(seg_id), "vxlan-" + seg_id)
|
||||
self.assertEqual("vxlan-" + seg_id, self.lbm.ensure_vxlan(seg_id))
|
||||
de_fn.return_value = False
|
||||
with mock.patch.object(self.lbm.ip,
|
||||
'add_vxlan') as add_vxlan_fn:
|
||||
add_vxlan_fn.return_value = FakeIpDevice()
|
||||
self.assertEqual(self.lbm.ensure_vxlan(seg_id),
|
||||
"vxlan-" + seg_id)
|
||||
self.assertEqual("vxlan-" + seg_id,
|
||||
self.lbm.ensure_vxlan(seg_id))
|
||||
add_vxlan_fn.assert_called_with("vxlan-" + seg_id, seg_id,
|
||||
group="224.0.0.1",
|
||||
dev=self.lbm.local_int)
|
||||
cfg.CONF.set_override('l2_population', 'True', 'VXLAN')
|
||||
self.assertEqual(self.lbm.ensure_vxlan(seg_id),
|
||||
"vxlan-" + seg_id)
|
||||
self.assertEqual("vxlan-" + seg_id,
|
||||
self.lbm.ensure_vxlan(seg_id))
|
||||
add_vxlan_fn.assert_called_with("vxlan-" + seg_id, seg_id,
|
||||
group="224.0.0.1",
|
||||
dev=self.lbm.local_int,
|
||||
|
@ -835,7 +834,7 @@ class TestLinuxBridgeManager(base.BaseTestCase):
|
|||
bridge_device.disable_stp.return_value = False
|
||||
bridge_device.disable_ipv6.return_value = False
|
||||
bridge_device.link.set_up.return_value = False
|
||||
self.assertEqual(self.lbm.ensure_bridge("br0", None), "br0")
|
||||
self.assertEqual("br0", self.lbm.ensure_bridge("br0", None))
|
||||
|
||||
bridge_device.owns_interface.return_value = False
|
||||
self.lbm.ensure_bridge("br0", "eth0")
|
||||
|
|
|
@ -1064,10 +1064,10 @@ class TestOvsNeutronAgent(object):
|
|||
'int-br-eth'),
|
||||
]
|
||||
parent.assert_has_calls(expected_calls)
|
||||
self.assertEqual(self.agent.int_ofports["physnet1"],
|
||||
"int_ofport")
|
||||
self.assertEqual(self.agent.phys_ofports["physnet1"],
|
||||
"phy_ofport")
|
||||
self.assertEqual("int_ofport",
|
||||
self.agent.int_ofports["physnet1"])
|
||||
self.assertEqual("phy_ofport",
|
||||
self.agent.phys_ofports["physnet1"])
|
||||
|
||||
def test_setup_physical_bridges_using_veth_interconnection(self):
|
||||
self.agent.use_veth_interconnection = True
|
||||
|
@ -1101,10 +1101,10 @@ class TestOvsNeutronAgent(object):
|
|||
mock.call.add_veth('int-br-eth',
|
||||
'phy-br-eth')]
|
||||
parent.assert_has_calls(expected_calls, any_order=False)
|
||||
self.assertEqual(self.agent.int_ofports["physnet1"],
|
||||
"int_veth_ofport")
|
||||
self.assertEqual(self.agent.phys_ofports["physnet1"],
|
||||
"phys_veth_ofport")
|
||||
self.assertEqual("int_veth_ofport",
|
||||
self.agent.int_ofports["physnet1"])
|
||||
self.assertEqual("phys_veth_ofport",
|
||||
self.agent.phys_ofports["physnet1"])
|
||||
|
||||
def test_setup_physical_bridges_change_from_veth_to_patch_conf(self):
|
||||
with mock.patch.object(sys, "exit"),\
|
||||
|
@ -1142,10 +1142,10 @@ class TestOvsNeutronAgent(object):
|
|||
'int-br-eth'),
|
||||
]
|
||||
parent.assert_has_calls(expected_calls)
|
||||
self.assertEqual(self.agent.int_ofports["physnet1"],
|
||||
"int_ofport")
|
||||
self.assertEqual(self.agent.phys_ofports["physnet1"],
|
||||
"phy_ofport")
|
||||
self.assertEqual("int_ofport",
|
||||
self.agent.int_ofports["physnet1"])
|
||||
self.assertEqual("phy_ofport",
|
||||
self.agent.phys_ofports["physnet1"])
|
||||
|
||||
def test_setup_tunnel_br(self):
|
||||
self.tun_br = mock.Mock()
|
||||
|
@ -1220,11 +1220,11 @@ class TestOvsNeutronAgent(object):
|
|||
|
||||
lvm.vif_ports = {}
|
||||
self.agent.port_unbound("vif1", "netuid12345")
|
||||
self.assertEqual(reclvl_fn.call_count, 2)
|
||||
self.assertEqual(2, reclvl_fn.call_count)
|
||||
|
||||
lvm.vif_ports = {"vif1": mock.Mock()}
|
||||
self.agent.port_unbound("vif3", "netuid12345")
|
||||
self.assertEqual(reclvl_fn.call_count, 2)
|
||||
self.assertEqual(2, reclvl_fn.call_count)
|
||||
|
||||
def _prepare_l2_pop_ofports(self):
|
||||
lvm1 = mock.Mock()
|
||||
|
@ -1431,7 +1431,7 @@ class TestOvsNeutronAgent(object):
|
|||
log_error_fn.assert_called_once_with(
|
||||
_("Failed to set-up %(type)s tunnel port to %(ip)s"),
|
||||
{'type': p_const.TYPE_GRE, 'ip': 'remote_ip'})
|
||||
self.assertEqual(ofport, 0)
|
||||
self.assertEqual(0, ofport)
|
||||
|
||||
def test_setup_tunnel_port_error_negative_df_disabled(self):
|
||||
with mock.patch.object(
|
||||
|
@ -1450,7 +1450,7 @@ class TestOvsNeutronAgent(object):
|
|||
log_error_fn.assert_called_once_with(
|
||||
_("Failed to set-up %(type)s tunnel port to %(ip)s"),
|
||||
{'type': p_const.TYPE_GRE, 'ip': 'remote_ip'})
|
||||
self.assertEqual(ofport, 0)
|
||||
self.assertEqual(0, ofport)
|
||||
|
||||
def test_setup_tunnel_port_error_negative_tunnel_csum(self):
|
||||
with mock.patch.object(
|
||||
|
@ -1469,7 +1469,7 @@ class TestOvsNeutronAgent(object):
|
|||
log_error_fn.assert_called_once_with(
|
||||
_("Failed to set-up %(type)s tunnel port to %(ip)s"),
|
||||
{'type': p_const.TYPE_GRE, 'ip': 'remote_ip'})
|
||||
self.assertEqual(ofport, 0)
|
||||
self.assertEqual(0, ofport)
|
||||
|
||||
def test_tunnel_sync_with_ml2_plugin(self):
|
||||
fake_tunnel_details = {'tunnels': [{'ip_address': '100.101.31.15'}]}
|
||||
|
@ -1771,7 +1771,7 @@ class TestOvsNeutronAgent(object):
|
|||
[mock.call(port=1)],
|
||||
self.agent.int_br.delete_arp_spoofing_protection.mock_calls)
|
||||
# make sure the state was updated with the new map
|
||||
self.assertEqual(self.agent.vifname_to_ofport_map, newmap)
|
||||
self.assertEqual(newmap, self.agent.vifname_to_ofport_map)
|
||||
|
||||
def test_update_stale_ofport_rules_treats_moved(self):
|
||||
self.agent.prevent_arp_spoofing = True
|
||||
|
|
|
@ -320,7 +320,7 @@ class Ml2DvrDBTestCase(testlib_api.SqlTestCase):
|
|||
self.ctx.session, port_id, 'foo_host', router.id)
|
||||
expected = (self.ctx.session.query(models.DVRPortBinding).
|
||||
filter_by(port_id=port_id).one())
|
||||
self.assertEqual(expected.port_id, port_id)
|
||||
self.assertEqual(port_id, expected.port_id)
|
||||
|
||||
def test_ensure_dvr_port_binding_multiple_bindings(self):
|
||||
network_id = 'foo_network_id'
|
||||
|
|
|
@ -60,7 +60,7 @@ class PSExtDriverTestCase(test_plugin.Ml2PluginV2TestCase,
|
|||
'port_security_enabled'),
|
||||
security_groups=[],
|
||||
port_security_enabled=False)
|
||||
self.assertEqual(res.status_int, 201)
|
||||
self.assertEqual(201, res.status_int)
|
||||
port = self.deserialize('json', res)
|
||||
self.assertFalse(port['port'][psec.PORTSECURITY])
|
||||
self.assertEqual([], port['port']['security_groups'])
|
||||
|
|
|
@ -393,7 +393,7 @@ class TestMl2NetworksWithVlanTransparencyAndMTU(TestMl2NetworksV2):
|
|||
res = network_req.get_response(self.api)
|
||||
self.assertEqual(201, res.status_int)
|
||||
network = self.deserialize(self.fmt, res)['network']
|
||||
self.assertEqual(network['mtu'], 1000)
|
||||
self.assertEqual(1000, network['mtu'])
|
||||
self.assertIn('vlan_transparent', network)
|
||||
|
||||
|
||||
|
|
|
@ -47,16 +47,16 @@ class PortBindingTestCase(test_plugin.NeutronDbPluginV2TestCase):
|
|||
self.plugin.start_rpc_listeners()
|
||||
|
||||
def _check_response(self, port, vif_type, has_port_filter, bound, status):
|
||||
self.assertEqual(port[portbindings.VIF_TYPE], vif_type)
|
||||
self.assertEqual(vif_type, port[portbindings.VIF_TYPE])
|
||||
vif_details = port[portbindings.VIF_DETAILS]
|
||||
port_status = port['status']
|
||||
if bound:
|
||||
# TODO(rkukura): Replace with new VIF security details
|
||||
self.assertEqual(vif_details[portbindings.CAP_PORT_FILTER],
|
||||
has_port_filter)
|
||||
self.assertEqual(port_status, status or 'DOWN')
|
||||
self.assertEqual(has_port_filter,
|
||||
vif_details[portbindings.CAP_PORT_FILTER])
|
||||
self.assertEqual(status or 'DOWN', port_status)
|
||||
else:
|
||||
self.assertEqual(port_status, 'DOWN')
|
||||
self.assertEqual('DOWN', port_status)
|
||||
|
||||
def _test_port_binding(self, host, vif_type, has_port_filter, bound,
|
||||
status=None, network_type='local'):
|
||||
|
@ -72,7 +72,7 @@ class PortBindingTestCase(test_plugin.NeutronDbPluginV2TestCase):
|
|||
details = self.plugin.endpoints[0].get_device_details(
|
||||
neutron_context, agent_id="theAgentId", device=port_id)
|
||||
if bound:
|
||||
self.assertEqual(details['network_type'], network_type)
|
||||
self.assertEqual(network_type, details['network_type'])
|
||||
self.assertEqual(mac_address, details['mac_address'])
|
||||
else:
|
||||
self.assertNotIn('network_type', details)
|
||||
|
@ -152,10 +152,10 @@ class PortBindingTestCase(test_plugin.NeutronDbPluginV2TestCase):
|
|||
neutron_context=neutron_context)
|
||||
port_data = updated_port['port']
|
||||
if new_host is not None:
|
||||
self.assertEqual(port_data[portbindings.HOST_ID],
|
||||
new_host)
|
||||
self.assertEqual(new_host,
|
||||
port_data[portbindings.HOST_ID])
|
||||
else:
|
||||
self.assertEqual(port_data[portbindings.HOST_ID], host)
|
||||
self.assertEqual(host, port_data[portbindings.HOST_ID])
|
||||
if new_host is not None and new_host != host:
|
||||
notify_mock.assert_called_once_with(mock.ANY)
|
||||
else:
|
||||
|
|
|
@ -608,9 +608,9 @@ class L3SchedulerTestBaseMixin(object):
|
|||
self, router, agent_list, exp_host, count=1):
|
||||
candidates = self.get_l3_agent_candidates(self.adminContext,
|
||||
router, agent_list)
|
||||
self.assertEqual(len(candidates), count)
|
||||
self.assertEqual(count, len(candidates))
|
||||
if count:
|
||||
self.assertEqual(candidates[0]['host'], exp_host)
|
||||
self.assertEqual(exp_host, candidates[0]['host'])
|
||||
|
||||
def test_get_l3_agent_candidates_legacy(self):
|
||||
self._register_l3_dvr_agents()
|
||||
|
@ -768,8 +768,8 @@ class L3AgentChanceSchedulerTestCase(L3SchedulerTestCaseMixin,
|
|||
self.adminContext, [r1['router']['id']],
|
||||
admin_state_up=True)
|
||||
|
||||
self.assertEqual(len(agents), 1)
|
||||
self.assertEqual(random_mock.call_count, 1)
|
||||
self.assertEqual(1, len(agents))
|
||||
self.assertEqual(1, random_mock.call_count)
|
||||
|
||||
with self.router_with_ext_gw(name='r2', subnet=subnet) as r2:
|
||||
agents = self.get_l3_agents_hosting_routers(
|
||||
|
@ -777,7 +777,7 @@ class L3AgentChanceSchedulerTestCase(L3SchedulerTestCaseMixin,
|
|||
admin_state_up=True)
|
||||
|
||||
self.assertEqual(len(agents), 1)
|
||||
self.assertEqual(random_mock.call_count, 2)
|
||||
self.assertEqual(2, random_mock.call_count)
|
||||
|
||||
random_patch.stop()
|
||||
|
||||
|
@ -828,7 +828,7 @@ class L3AgentLeastRoutersSchedulerTestCase(L3SchedulerTestCaseMixin,
|
|||
agents = self.get_l3_agents_hosting_routers(
|
||||
self.adminContext, [r1['router']['id']],
|
||||
admin_state_up=True)
|
||||
self.assertEqual(len(agents), 1)
|
||||
self.assertEqual(1, len(agents))
|
||||
|
||||
agent_id1 = agents[0]['id']
|
||||
|
||||
|
@ -836,7 +836,7 @@ class L3AgentLeastRoutersSchedulerTestCase(L3SchedulerTestCaseMixin,
|
|||
agents = self.get_l3_agents_hosting_routers(
|
||||
self.adminContext, [r2['router']['id']],
|
||||
admin_state_up=True)
|
||||
self.assertEqual(len(agents), 1)
|
||||
self.assertEqual(1, len(agents))
|
||||
|
||||
agent_id2 = agents[0]['id']
|
||||
|
||||
|
@ -852,7 +852,7 @@ class L3AgentLeastRoutersSchedulerTestCase(L3SchedulerTestCaseMixin,
|
|||
agents = self.get_l3_agents_hosting_routers(
|
||||
self.adminContext, [r3['router']['id']],
|
||||
admin_state_up=True)
|
||||
self.assertEqual(len(agents), 1)
|
||||
self.assertEqual(1, len(agents))
|
||||
|
||||
agent_id3 = agents[0]['id']
|
||||
|
||||
|
@ -1186,7 +1186,7 @@ class L3DvrSchedulerTestCase(testlib_api.SqlTestCase):
|
|||
'.get_ports', return_value=[dvr_port]):
|
||||
router_id = self.dut.get_dvr_routers_by_subnet_ids(
|
||||
self.adminContext, [subnet_id])
|
||||
self.assertEqual(router_id.pop(), r1['id'])
|
||||
self.assertEqual(r1['id'], router_id.pop())
|
||||
|
||||
def test_get_subnet_ids_on_router(self):
|
||||
dvr_port = {
|
||||
|
@ -1228,7 +1228,7 @@ class L3DvrSchedulerTestCase(testlib_api.SqlTestCase):
|
|||
return_value=[dvr_port]):
|
||||
sub_ids = self.dut.get_subnet_ids_on_router(self.adminContext,
|
||||
r1['id'])
|
||||
self.assertEqual(len(sub_ids), 0)
|
||||
self.assertEqual(0, len(sub_ids))
|
||||
|
||||
def _prepare_schedule_snat_tests(self):
|
||||
agent = agents_db.Agent()
|
||||
|
|
Loading…
Reference in New Issue