Merge "Remove mox in unit/network/test_neutronv2.py (7)"

This commit is contained in:
Zuul 2019-04-09 03:22:08 +00:00 committed by Gerrit Code Review
commit d18d638d61
1 changed files with 169 additions and 169 deletions

View File

@ -836,175 +836,6 @@ class TestNeutronv2(TestNeutronv2Base):
self.addCleanup(self.mox.UnsetStubs)
self.addCleanup(self.stubs.UnsetAll)
def test_get_instance_nw_info_without_subnet(self):
# Test get instance_nw_info for a port without subnet.
api = neutronapi.API()
self.mox.StubOutWithMock(api.db, 'instance_info_cache_update')
api.db.instance_info_cache_update(
mox.IgnoreArg(),
self.instance['uuid'], mox.IgnoreArg()).AndReturn(fake_info_cache)
neutronapi.get_client(mox.IgnoreArg(), admin=True).AndReturn(
self.moxed_client)
neutronapi.get_client(mox.IgnoreArg(), admin=True).AndReturn(
self.moxed_client)
self.mox.StubOutWithMock(api, '_get_physnet_tunneled_info')
api._get_physnet_tunneled_info(
mox.IgnoreArg(), mox.IgnoreArg(),
self.port_data1[0]['network_id']).AndReturn((None, False))
self.moxed_client.list_ports(
tenant_id=self.instance['project_id'],
device_id=self.instance['uuid']).AndReturn(
{'ports': self.port_data3})
self.moxed_client.list_networks(
id=[self.port_data1[0]['network_id']]).AndReturn(
{'networks': self.nets1})
net_info_cache = []
for port in self.port_data3:
net_info_cache.append({"network": {"id": port['network_id']},
"id": port['id']})
self.instance['info_cache'] = self._fake_instance_info_cache(
net_info_cache, self.instance['uuid'])
self.mox.StubOutWithMock(api.db, 'instance_info_cache_get')
api.db.instance_info_cache_get(
mox.IgnoreArg(),
self.instance['uuid']).AndReturn(self.instance['info_cache'])
self.mox.ReplayAll()
instance = self._fake_instance_object_with_info_cache(self.instance)
nw_inf = api.get_instance_nw_info(self.context,
instance)
id_suffix = 3
self.assertEqual(0, len(nw_inf.fixed_ips()))
self.assertEqual('my_netname1', nw_inf[0]['network']['label'])
self.assertEqual(uuids.portid_3, nw_inf[0]['id'])
self.assertEqual('my_mac%s' % id_suffix, nw_inf[0]['address'])
self.assertEqual(0, len(nw_inf[0]['network']['subnets']))
def test_refresh_neutron_extensions_cache(self):
api = neutronapi.API()
# Note: Don't want the default get_client from setUp()
self.mox.ResetAll()
neutronapi.get_client(mox.IgnoreArg()).AndReturn(
self.moxed_client)
self.moxed_client.list_extensions().AndReturn(
{'extensions': [{'name': constants.QOS_QUEUE}]})
self.mox.ReplayAll()
api._refresh_neutron_extensions_cache(mox.IgnoreArg())
self.assertEqual(
{constants.QOS_QUEUE: {'name': constants.QOS_QUEUE}},
api.extensions)
def test_populate_neutron_extension_values_rxtx_factor(self):
api = neutronapi.API()
# Note: Don't want the default get_client from setUp()
self.mox.ResetAll()
neutronapi.get_client(mox.IgnoreArg()).AndReturn(
self.moxed_client)
self.moxed_client.list_extensions().AndReturn(
{'extensions': [{'name': constants.QOS_QUEUE}]})
self.mox.ReplayAll()
flavor = flavors.get_default_flavor()
flavor['rxtx_factor'] = 1
instance = objects.Instance(system_metadata={})
instance.flavor = flavor
port_req_body = {'port': {}}
api._populate_neutron_extension_values(self.context, instance,
None, port_req_body)
self.assertEqual(1, port_req_body['port']['rxtx_factor'])
def test_allocate_for_instance_no_networks(self):
"""verify the exception thrown when there are no networks defined."""
self.instance = fake_instance.fake_instance_obj(self.context,
**self.instance)
api = neutronapi.API()
self.moxed_client.list_networks(
tenant_id=self.instance.project_id,
shared=False).AndReturn(
{'networks': model.NetworkInfo([])})
neutronapi.get_client(mox.IgnoreArg()).AndReturn(self.moxed_client)
self.moxed_client.list_networks(shared=True).AndReturn(
{'networks': model.NetworkInfo([])})
self.mox.ReplayAll()
nwinfo = api.allocate_for_instance(self.context, self.instance,
False, None)
self.assertEqual(0, len(nwinfo))
@mock.patch(
'nova.network.neutronv2.api.API._populate_neutron_extension_values')
@mock.patch('nova.network.neutronv2.api.API._create_ports_for_instance')
@mock.patch('nova.network.neutronv2.api.API._unbind_ports')
def test_allocate_for_instance_ex1(self, mock_unbind, mock_create_ports,
mock_populate):
"""verify we will delete created ports
if we fail to allocate all net resources.
Mox to raise exception when creating a second port.
In this case, the code should delete the first created port.
"""
self.instance = fake_instance.fake_instance_obj(self.context,
**self.instance)
api = neutronapi.API()
requested_networks = objects.NetworkRequestList(
objects=[objects.NetworkRequest(network_id=net['id'])
for net in (self.nets2[0], self.nets2[1])])
neutronapi.get_client(mox.IgnoreArg()).AndReturn(self.moxed_client)
self.moxed_client.list_networks(
id=[uuids.my_netid1, uuids.my_netid2]).AndReturn(
{'networks': self.nets2})
mock_create_ports.return_value = [
(request, (getattr(uuids, 'portid_%s' % request.network_id)))
for request in requested_networks
]
neutronapi.get_client(
mox.IgnoreArg(), admin=True).AndReturn(
self.moxed_client)
index = 0
for network in self.nets2:
binding_port_req_body = {
'port': {
'device_id': self.instance.uuid,
'device_owner': 'compute:nova',
},
}
port_req_body = {
'port': {
'network_id': network['id'],
'admin_state_up': True,
'tenant_id': self.instance.project_id,
},
}
port_req_body['port'].update(binding_port_req_body['port'])
port_id = getattr(uuids, 'portid_%s' % network['id'])
port = {'id': port_id, 'mac_address': 'foo'}
if index == 0:
self.moxed_client.update_port(port_id,
MyComparator(binding_port_req_body)).AndReturn(
{'port': port})
else:
NeutronOverQuota = exceptions.MacAddressInUseClient()
self.moxed_client.update_port(port_id,
MyComparator(binding_port_req_body)).AndRaise(
NeutronOverQuota)
index += 1
self.moxed_client.delete_port(
getattr(uuids, 'portid_%s' % self.nets2[0]['id']))
self.moxed_client.delete_port(
getattr(uuids, 'portid_%s' % self.nets2[1]['id']))
self.mox.ReplayAll()
self.assertRaises(exception.PortInUse,
api.allocate_for_instance,
self.context, self.instance, False,
requested_networks=requested_networks)
mock_unbind.assert_called_once_with(self.context, [],
self.moxed_client, mock.ANY)
def test_allocate_for_instance_ex2(self):
"""verify we have no port to delete
if we fail to allocate the first net resource.
@ -3204,6 +3035,83 @@ class TestNeutronv2WithMock(TestNeutronv2Base):
self.assertEqual(len(expected_list_ports_calls),
mocked_client.list_ports.call_count)
@mock.patch.object(neutronapi.API, '_get_physnet_tunneled_info',
return_value=(None, False))
@mock.patch.object(db_api, 'instance_info_cache_get')
@mock.patch.object(db_api, 'instance_info_cache_update',
return_value=fake_info_cache)
@mock.patch.object(neutronapi, 'get_client')
def test_get_instance_nw_info_without_subnet(
self, mock_get_client, mock_cache_update, mock_cache_get,
mock_get_physnet):
# Test get instance_nw_info for a port without subnet.
mocked_client = mock.create_autospec(client.Client)
mock_get_client.return_value = mocked_client
mocked_client.list_ports.return_value = {'ports': self.port_data3}
mocked_client.list_networks.return_value = {'networks': self.nets1}
net_info_cache = []
for port in self.port_data3:
net_info_cache.append({"network": {"id": port['network_id']},
"id": port['id']})
self.instance['info_cache'] = self._fake_instance_info_cache(
net_info_cache, self.instance['uuid'])
mock_cache_get.return_value = self.instance['info_cache']
instance = self._fake_instance_object_with_info_cache(self.instance)
nw_inf = self.api.get_instance_nw_info(self.context, instance)
id_suffix = 3
self.assertEqual(0, len(nw_inf.fixed_ips()))
self.assertEqual('my_netname1', nw_inf[0]['network']['label'])
self.assertEqual(uuids.portid_3, nw_inf[0]['id'])
self.assertEqual('my_mac%s' % id_suffix, nw_inf[0]['address'])
self.assertEqual(0, len(nw_inf[0]['network']['subnets']))
mock_get_client.assert_has_calls([mock.call(mock.ANY, admin=True)] * 2,
any_order=True)
mock_cache_update.assert_called_once_with(
mock.ANY, self.instance['uuid'], mock.ANY)
mock_cache_get.assert_called_once_with(mock.ANY, self.instance['uuid'])
mocked_client.list_ports.assert_called_once_with(
tenant_id=self.instance['project_id'],
device_id=self.instance['uuid'])
mocked_client.list_networks.assert_called_once_with(
id=[self.port_data1[0]['network_id']])
mock_get_physnet.assert_called_once_with(
mock.ANY, mock.ANY, self.port_data1[0]['network_id'])
@mock.patch.object(neutronapi, 'get_client')
def test_refresh_neutron_extensions_cache(self, mock_get_client):
mocked_client = mock.create_autospec(client.Client)
mock_get_client.return_value = mocked_client
mocked_client.list_extensions.return_value = {
'extensions': [{'name': constants.QOS_QUEUE}]}
self.api._refresh_neutron_extensions_cache(self.context)
self.assertEqual(
{constants.QOS_QUEUE: {'name': constants.QOS_QUEUE}},
self.api.extensions)
mock_get_client.assert_called_once_with(self.context)
mocked_client.list_extensions.assert_called_once_with()
@mock.patch.object(neutronapi, 'get_client')
def test_populate_neutron_extension_values_rxtx_factor(
self, mock_get_client):
mocked_client = mock.create_autospec(client.Client)
mock_get_client.return_value = mocked_client
mocked_client.list_extensions.return_value = {
'extensions': [{'name': constants.QOS_QUEUE}]}
flavor = flavors.get_default_flavor()
flavor['rxtx_factor'] = 1
instance = objects.Instance(system_metadata={})
instance.flavor = flavor
port_req_body = {'port': {}}
self.api._populate_neutron_extension_values(self.context, instance,
None, port_req_body)
self.assertEqual(1, port_req_body['port']['rxtx_factor'])
mock_get_client.assert_called_once_with(self.context)
mocked_client.list_extensions.assert_called_once_with()
def test_allocate_for_instance_1(self):
# Allocate one port in one network env.
self._test_allocate_for_instance_with_virtual_interface(1)
@ -3368,6 +3276,98 @@ class TestNeutronv2WithMock(TestNeutronv2Base):
self._test_allocate_for_instance_with_virtual_interface(
net_idx=1, requested_networks=requested_networks)
@mock.patch.object(neutronapi, 'get_client')
def test_allocate_for_instance_no_networks(self, mock_get_client):
"""verify the exception thrown when there are no networks defined."""
self.instance = fake_instance.fake_instance_obj(self.context,
**self.instance)
mocked_client = mock.create_autospec(client.Client)
mock_get_client.return_value = mocked_client
mocked_client.list_networks.return_value = {
'networks': model.NetworkInfo([])}
nwinfo = self.api.allocate_for_instance(self.context, self.instance,
False, None)
self.assertEqual(0, len(nwinfo))
mock_get_client.assert_called_once_with(self.context)
mocked_client.list_networks.assert_has_calls([
mock.call(tenant_id=self.instance.project_id, shared=False),
mock.call(shared=True)])
self.assertEqual(2, mocked_client.list_networks.call_count)
@mock.patch.object(neutronapi, 'get_client')
@mock.patch(
'nova.network.neutronv2.api.API._populate_neutron_extension_values')
@mock.patch('nova.network.neutronv2.api.API._create_ports_for_instance')
@mock.patch('nova.network.neutronv2.api.API._unbind_ports')
def test_allocate_for_instance_ex1(self, mock_unbind, mock_create_ports,
mock_populate, mock_get_client):
"""verify we will delete created ports
if we fail to allocate all net resources.
Mox to raise exception when creating a second port.
In this case, the code should delete the first created port.
"""
self.instance = fake_instance.fake_instance_obj(self.context,
**self.instance)
requested_networks = objects.NetworkRequestList(
objects=[objects.NetworkRequest(network_id=net['id'])
for net in (self.nets2[0], self.nets2[1])])
mocked_client = mock.create_autospec(client.Client)
mock_get_client.return_value = mocked_client
mocked_client.list_networks.return_value = {'networks': self.nets2}
mock_create_ports.return_value = [
(request, (getattr(uuids, 'portid_%s' % request.network_id)))
for request in requested_networks
]
index = 0
update_port_values = []
expected_update_port_calls = []
for network in self.nets2:
binding_port_req_body = {
'port': {
'device_id': self.instance.uuid,
'device_owner': 'compute:nova',
},
}
port_req_body = {
'port': {
'network_id': network['id'],
'admin_state_up': True,
'tenant_id': self.instance.project_id,
},
}
port_req_body['port'].update(binding_port_req_body['port'])
port_id = getattr(uuids, 'portid_%s' % network['id'])
port = {'id': port_id, 'mac_address': 'foo'}
if index == 0:
update_port_values.append({'port': port})
else:
update_port_values.append(exceptions.MacAddressInUseClient())
expected_update_port_calls.append(mock.call(
port_id, binding_port_req_body))
index += 1
mocked_client.update_port.side_effect = update_port_values
self.assertRaises(exception.PortInUse,
self.api.allocate_for_instance,
self.context, self.instance, False,
requested_networks=requested_networks)
mock_unbind.assert_called_once_with(self.context, [],
mocked_client, mock.ANY)
mock_get_client.assert_has_calls([
mock.call(self.context),
mock.call(self.context, admin=True)], any_order=True)
mocked_client.list_networks.assert_called_once_with(
id=[uuids.my_netid1, uuids.my_netid2])
mocked_client.update_port.assert_has_calls(expected_update_port_calls)
self.assertEqual(len(expected_update_port_calls),
mocked_client.update_port.call_count)
mocked_client.delete_port.assert_has_calls([
mock.call(getattr(uuids, 'portid_%s' % self.nets2[0]['id'])),
mock.call(getattr(uuids, 'portid_%s' % self.nets2[1]['id']))])
self.assertEqual(2, mocked_client.delete_port.call_count)
def test_allocate_for_instance_second_time(self):
# Make sure that allocate_for_instance only returns ports that it
# allocated during _that_ run.