Merge "Remove mox in unit/network/test_neutronv2.py (7)"
This commit is contained in:
commit
d18d638d61
|
@ -836,175 +836,6 @@ class TestNeutronv2(TestNeutronv2Base):
|
|||
self.addCleanup(self.mox.UnsetStubs)
|
||||
self.addCleanup(self.stubs.UnsetAll)
|
||||
|
||||
def test_get_instance_nw_info_without_subnet(self):
|
||||
# Test get instance_nw_info for a port without subnet.
|
||||
api = neutronapi.API()
|
||||
self.mox.StubOutWithMock(api.db, 'instance_info_cache_update')
|
||||
api.db.instance_info_cache_update(
|
||||
mox.IgnoreArg(),
|
||||
self.instance['uuid'], mox.IgnoreArg()).AndReturn(fake_info_cache)
|
||||
neutronapi.get_client(mox.IgnoreArg(), admin=True).AndReturn(
|
||||
self.moxed_client)
|
||||
neutronapi.get_client(mox.IgnoreArg(), admin=True).AndReturn(
|
||||
self.moxed_client)
|
||||
self.mox.StubOutWithMock(api, '_get_physnet_tunneled_info')
|
||||
api._get_physnet_tunneled_info(
|
||||
mox.IgnoreArg(), mox.IgnoreArg(),
|
||||
self.port_data1[0]['network_id']).AndReturn((None, False))
|
||||
self.moxed_client.list_ports(
|
||||
tenant_id=self.instance['project_id'],
|
||||
device_id=self.instance['uuid']).AndReturn(
|
||||
{'ports': self.port_data3})
|
||||
self.moxed_client.list_networks(
|
||||
id=[self.port_data1[0]['network_id']]).AndReturn(
|
||||
{'networks': self.nets1})
|
||||
|
||||
net_info_cache = []
|
||||
for port in self.port_data3:
|
||||
net_info_cache.append({"network": {"id": port['network_id']},
|
||||
"id": port['id']})
|
||||
self.instance['info_cache'] = self._fake_instance_info_cache(
|
||||
net_info_cache, self.instance['uuid'])
|
||||
|
||||
self.mox.StubOutWithMock(api.db, 'instance_info_cache_get')
|
||||
api.db.instance_info_cache_get(
|
||||
mox.IgnoreArg(),
|
||||
self.instance['uuid']).AndReturn(self.instance['info_cache'])
|
||||
|
||||
self.mox.ReplayAll()
|
||||
|
||||
instance = self._fake_instance_object_with_info_cache(self.instance)
|
||||
nw_inf = api.get_instance_nw_info(self.context,
|
||||
instance)
|
||||
|
||||
id_suffix = 3
|
||||
self.assertEqual(0, len(nw_inf.fixed_ips()))
|
||||
self.assertEqual('my_netname1', nw_inf[0]['network']['label'])
|
||||
self.assertEqual(uuids.portid_3, nw_inf[0]['id'])
|
||||
self.assertEqual('my_mac%s' % id_suffix, nw_inf[0]['address'])
|
||||
self.assertEqual(0, len(nw_inf[0]['network']['subnets']))
|
||||
|
||||
def test_refresh_neutron_extensions_cache(self):
|
||||
api = neutronapi.API()
|
||||
|
||||
# Note: Don't want the default get_client from setUp()
|
||||
self.mox.ResetAll()
|
||||
neutronapi.get_client(mox.IgnoreArg()).AndReturn(
|
||||
self.moxed_client)
|
||||
self.moxed_client.list_extensions().AndReturn(
|
||||
{'extensions': [{'name': constants.QOS_QUEUE}]})
|
||||
self.mox.ReplayAll()
|
||||
api._refresh_neutron_extensions_cache(mox.IgnoreArg())
|
||||
self.assertEqual(
|
||||
{constants.QOS_QUEUE: {'name': constants.QOS_QUEUE}},
|
||||
api.extensions)
|
||||
|
||||
def test_populate_neutron_extension_values_rxtx_factor(self):
|
||||
api = neutronapi.API()
|
||||
|
||||
# Note: Don't want the default get_client from setUp()
|
||||
self.mox.ResetAll()
|
||||
neutronapi.get_client(mox.IgnoreArg()).AndReturn(
|
||||
self.moxed_client)
|
||||
self.moxed_client.list_extensions().AndReturn(
|
||||
{'extensions': [{'name': constants.QOS_QUEUE}]})
|
||||
self.mox.ReplayAll()
|
||||
flavor = flavors.get_default_flavor()
|
||||
flavor['rxtx_factor'] = 1
|
||||
instance = objects.Instance(system_metadata={})
|
||||
instance.flavor = flavor
|
||||
port_req_body = {'port': {}}
|
||||
api._populate_neutron_extension_values(self.context, instance,
|
||||
None, port_req_body)
|
||||
self.assertEqual(1, port_req_body['port']['rxtx_factor'])
|
||||
|
||||
def test_allocate_for_instance_no_networks(self):
|
||||
"""verify the exception thrown when there are no networks defined."""
|
||||
self.instance = fake_instance.fake_instance_obj(self.context,
|
||||
**self.instance)
|
||||
api = neutronapi.API()
|
||||
self.moxed_client.list_networks(
|
||||
tenant_id=self.instance.project_id,
|
||||
shared=False).AndReturn(
|
||||
{'networks': model.NetworkInfo([])})
|
||||
neutronapi.get_client(mox.IgnoreArg()).AndReturn(self.moxed_client)
|
||||
self.moxed_client.list_networks(shared=True).AndReturn(
|
||||
{'networks': model.NetworkInfo([])})
|
||||
self.mox.ReplayAll()
|
||||
nwinfo = api.allocate_for_instance(self.context, self.instance,
|
||||
False, None)
|
||||
self.assertEqual(0, len(nwinfo))
|
||||
|
||||
@mock.patch(
|
||||
'nova.network.neutronv2.api.API._populate_neutron_extension_values')
|
||||
@mock.patch('nova.network.neutronv2.api.API._create_ports_for_instance')
|
||||
@mock.patch('nova.network.neutronv2.api.API._unbind_ports')
|
||||
def test_allocate_for_instance_ex1(self, mock_unbind, mock_create_ports,
|
||||
mock_populate):
|
||||
"""verify we will delete created ports
|
||||
if we fail to allocate all net resources.
|
||||
|
||||
Mox to raise exception when creating a second port.
|
||||
In this case, the code should delete the first created port.
|
||||
"""
|
||||
self.instance = fake_instance.fake_instance_obj(self.context,
|
||||
**self.instance)
|
||||
api = neutronapi.API()
|
||||
requested_networks = objects.NetworkRequestList(
|
||||
objects=[objects.NetworkRequest(network_id=net['id'])
|
||||
for net in (self.nets2[0], self.nets2[1])])
|
||||
neutronapi.get_client(mox.IgnoreArg()).AndReturn(self.moxed_client)
|
||||
self.moxed_client.list_networks(
|
||||
id=[uuids.my_netid1, uuids.my_netid2]).AndReturn(
|
||||
{'networks': self.nets2})
|
||||
mock_create_ports.return_value = [
|
||||
(request, (getattr(uuids, 'portid_%s' % request.network_id)))
|
||||
for request in requested_networks
|
||||
]
|
||||
neutronapi.get_client(
|
||||
mox.IgnoreArg(), admin=True).AndReturn(
|
||||
self.moxed_client)
|
||||
index = 0
|
||||
for network in self.nets2:
|
||||
binding_port_req_body = {
|
||||
'port': {
|
||||
'device_id': self.instance.uuid,
|
||||
'device_owner': 'compute:nova',
|
||||
},
|
||||
}
|
||||
port_req_body = {
|
||||
'port': {
|
||||
'network_id': network['id'],
|
||||
'admin_state_up': True,
|
||||
'tenant_id': self.instance.project_id,
|
||||
},
|
||||
}
|
||||
port_req_body['port'].update(binding_port_req_body['port'])
|
||||
port_id = getattr(uuids, 'portid_%s' % network['id'])
|
||||
port = {'id': port_id, 'mac_address': 'foo'}
|
||||
|
||||
if index == 0:
|
||||
self.moxed_client.update_port(port_id,
|
||||
MyComparator(binding_port_req_body)).AndReturn(
|
||||
{'port': port})
|
||||
else:
|
||||
NeutronOverQuota = exceptions.MacAddressInUseClient()
|
||||
self.moxed_client.update_port(port_id,
|
||||
MyComparator(binding_port_req_body)).AndRaise(
|
||||
NeutronOverQuota)
|
||||
index += 1
|
||||
self.moxed_client.delete_port(
|
||||
getattr(uuids, 'portid_%s' % self.nets2[0]['id']))
|
||||
self.moxed_client.delete_port(
|
||||
getattr(uuids, 'portid_%s' % self.nets2[1]['id']))
|
||||
self.mox.ReplayAll()
|
||||
self.assertRaises(exception.PortInUse,
|
||||
api.allocate_for_instance,
|
||||
self.context, self.instance, False,
|
||||
requested_networks=requested_networks)
|
||||
mock_unbind.assert_called_once_with(self.context, [],
|
||||
self.moxed_client, mock.ANY)
|
||||
|
||||
def test_allocate_for_instance_ex2(self):
|
||||
"""verify we have no port to delete
|
||||
if we fail to allocate the first net resource.
|
||||
|
@ -3204,6 +3035,83 @@ class TestNeutronv2WithMock(TestNeutronv2Base):
|
|||
self.assertEqual(len(expected_list_ports_calls),
|
||||
mocked_client.list_ports.call_count)
|
||||
|
||||
@mock.patch.object(neutronapi.API, '_get_physnet_tunneled_info',
|
||||
return_value=(None, False))
|
||||
@mock.patch.object(db_api, 'instance_info_cache_get')
|
||||
@mock.patch.object(db_api, 'instance_info_cache_update',
|
||||
return_value=fake_info_cache)
|
||||
@mock.patch.object(neutronapi, 'get_client')
|
||||
def test_get_instance_nw_info_without_subnet(
|
||||
self, mock_get_client, mock_cache_update, mock_cache_get,
|
||||
mock_get_physnet):
|
||||
# Test get instance_nw_info for a port without subnet.
|
||||
mocked_client = mock.create_autospec(client.Client)
|
||||
mock_get_client.return_value = mocked_client
|
||||
mocked_client.list_ports.return_value = {'ports': self.port_data3}
|
||||
mocked_client.list_networks.return_value = {'networks': self.nets1}
|
||||
|
||||
net_info_cache = []
|
||||
for port in self.port_data3:
|
||||
net_info_cache.append({"network": {"id": port['network_id']},
|
||||
"id": port['id']})
|
||||
self.instance['info_cache'] = self._fake_instance_info_cache(
|
||||
net_info_cache, self.instance['uuid'])
|
||||
mock_cache_get.return_value = self.instance['info_cache']
|
||||
|
||||
instance = self._fake_instance_object_with_info_cache(self.instance)
|
||||
nw_inf = self.api.get_instance_nw_info(self.context, instance)
|
||||
|
||||
id_suffix = 3
|
||||
self.assertEqual(0, len(nw_inf.fixed_ips()))
|
||||
self.assertEqual('my_netname1', nw_inf[0]['network']['label'])
|
||||
self.assertEqual(uuids.portid_3, nw_inf[0]['id'])
|
||||
self.assertEqual('my_mac%s' % id_suffix, nw_inf[0]['address'])
|
||||
self.assertEqual(0, len(nw_inf[0]['network']['subnets']))
|
||||
|
||||
mock_get_client.assert_has_calls([mock.call(mock.ANY, admin=True)] * 2,
|
||||
any_order=True)
|
||||
mock_cache_update.assert_called_once_with(
|
||||
mock.ANY, self.instance['uuid'], mock.ANY)
|
||||
mock_cache_get.assert_called_once_with(mock.ANY, self.instance['uuid'])
|
||||
mocked_client.list_ports.assert_called_once_with(
|
||||
tenant_id=self.instance['project_id'],
|
||||
device_id=self.instance['uuid'])
|
||||
mocked_client.list_networks.assert_called_once_with(
|
||||
id=[self.port_data1[0]['network_id']])
|
||||
mock_get_physnet.assert_called_once_with(
|
||||
mock.ANY, mock.ANY, self.port_data1[0]['network_id'])
|
||||
|
||||
@mock.patch.object(neutronapi, 'get_client')
|
||||
def test_refresh_neutron_extensions_cache(self, mock_get_client):
|
||||
mocked_client = mock.create_autospec(client.Client)
|
||||
mock_get_client.return_value = mocked_client
|
||||
mocked_client.list_extensions.return_value = {
|
||||
'extensions': [{'name': constants.QOS_QUEUE}]}
|
||||
self.api._refresh_neutron_extensions_cache(self.context)
|
||||
self.assertEqual(
|
||||
{constants.QOS_QUEUE: {'name': constants.QOS_QUEUE}},
|
||||
self.api.extensions)
|
||||
mock_get_client.assert_called_once_with(self.context)
|
||||
mocked_client.list_extensions.assert_called_once_with()
|
||||
|
||||
@mock.patch.object(neutronapi, 'get_client')
|
||||
def test_populate_neutron_extension_values_rxtx_factor(
|
||||
self, mock_get_client):
|
||||
mocked_client = mock.create_autospec(client.Client)
|
||||
mock_get_client.return_value = mocked_client
|
||||
mocked_client.list_extensions.return_value = {
|
||||
'extensions': [{'name': constants.QOS_QUEUE}]}
|
||||
flavor = flavors.get_default_flavor()
|
||||
flavor['rxtx_factor'] = 1
|
||||
instance = objects.Instance(system_metadata={})
|
||||
instance.flavor = flavor
|
||||
port_req_body = {'port': {}}
|
||||
self.api._populate_neutron_extension_values(self.context, instance,
|
||||
None, port_req_body)
|
||||
self.assertEqual(1, port_req_body['port']['rxtx_factor'])
|
||||
mock_get_client.assert_called_once_with(self.context)
|
||||
mocked_client.list_extensions.assert_called_once_with()
|
||||
|
||||
def test_allocate_for_instance_1(self):
|
||||
# Allocate one port in one network env.
|
||||
self._test_allocate_for_instance_with_virtual_interface(1)
|
||||
|
@ -3368,6 +3276,98 @@ class TestNeutronv2WithMock(TestNeutronv2Base):
|
|||
self._test_allocate_for_instance_with_virtual_interface(
|
||||
net_idx=1, requested_networks=requested_networks)
|
||||
|
||||
@mock.patch.object(neutronapi, 'get_client')
|
||||
def test_allocate_for_instance_no_networks(self, mock_get_client):
|
||||
"""verify the exception thrown when there are no networks defined."""
|
||||
self.instance = fake_instance.fake_instance_obj(self.context,
|
||||
**self.instance)
|
||||
mocked_client = mock.create_autospec(client.Client)
|
||||
mock_get_client.return_value = mocked_client
|
||||
mocked_client.list_networks.return_value = {
|
||||
'networks': model.NetworkInfo([])}
|
||||
nwinfo = self.api.allocate_for_instance(self.context, self.instance,
|
||||
False, None)
|
||||
self.assertEqual(0, len(nwinfo))
|
||||
mock_get_client.assert_called_once_with(self.context)
|
||||
mocked_client.list_networks.assert_has_calls([
|
||||
mock.call(tenant_id=self.instance.project_id, shared=False),
|
||||
mock.call(shared=True)])
|
||||
self.assertEqual(2, mocked_client.list_networks.call_count)
|
||||
|
||||
@mock.patch.object(neutronapi, 'get_client')
|
||||
@mock.patch(
|
||||
'nova.network.neutronv2.api.API._populate_neutron_extension_values')
|
||||
@mock.patch('nova.network.neutronv2.api.API._create_ports_for_instance')
|
||||
@mock.patch('nova.network.neutronv2.api.API._unbind_ports')
|
||||
def test_allocate_for_instance_ex1(self, mock_unbind, mock_create_ports,
|
||||
mock_populate, mock_get_client):
|
||||
"""verify we will delete created ports
|
||||
if we fail to allocate all net resources.
|
||||
|
||||
Mox to raise exception when creating a second port.
|
||||
In this case, the code should delete the first created port.
|
||||
"""
|
||||
self.instance = fake_instance.fake_instance_obj(self.context,
|
||||
**self.instance)
|
||||
requested_networks = objects.NetworkRequestList(
|
||||
objects=[objects.NetworkRequest(network_id=net['id'])
|
||||
for net in (self.nets2[0], self.nets2[1])])
|
||||
mocked_client = mock.create_autospec(client.Client)
|
||||
mock_get_client.return_value = mocked_client
|
||||
mocked_client.list_networks.return_value = {'networks': self.nets2}
|
||||
mock_create_ports.return_value = [
|
||||
(request, (getattr(uuids, 'portid_%s' % request.network_id)))
|
||||
for request in requested_networks
|
||||
]
|
||||
index = 0
|
||||
update_port_values = []
|
||||
expected_update_port_calls = []
|
||||
for network in self.nets2:
|
||||
binding_port_req_body = {
|
||||
'port': {
|
||||
'device_id': self.instance.uuid,
|
||||
'device_owner': 'compute:nova',
|
||||
},
|
||||
}
|
||||
port_req_body = {
|
||||
'port': {
|
||||
'network_id': network['id'],
|
||||
'admin_state_up': True,
|
||||
'tenant_id': self.instance.project_id,
|
||||
},
|
||||
}
|
||||
port_req_body['port'].update(binding_port_req_body['port'])
|
||||
port_id = getattr(uuids, 'portid_%s' % network['id'])
|
||||
port = {'id': port_id, 'mac_address': 'foo'}
|
||||
|
||||
if index == 0:
|
||||
update_port_values.append({'port': port})
|
||||
else:
|
||||
update_port_values.append(exceptions.MacAddressInUseClient())
|
||||
expected_update_port_calls.append(mock.call(
|
||||
port_id, binding_port_req_body))
|
||||
index += 1
|
||||
mocked_client.update_port.side_effect = update_port_values
|
||||
|
||||
self.assertRaises(exception.PortInUse,
|
||||
self.api.allocate_for_instance,
|
||||
self.context, self.instance, False,
|
||||
requested_networks=requested_networks)
|
||||
mock_unbind.assert_called_once_with(self.context, [],
|
||||
mocked_client, mock.ANY)
|
||||
mock_get_client.assert_has_calls([
|
||||
mock.call(self.context),
|
||||
mock.call(self.context, admin=True)], any_order=True)
|
||||
mocked_client.list_networks.assert_called_once_with(
|
||||
id=[uuids.my_netid1, uuids.my_netid2])
|
||||
mocked_client.update_port.assert_has_calls(expected_update_port_calls)
|
||||
self.assertEqual(len(expected_update_port_calls),
|
||||
mocked_client.update_port.call_count)
|
||||
mocked_client.delete_port.assert_has_calls([
|
||||
mock.call(getattr(uuids, 'portid_%s' % self.nets2[0]['id'])),
|
||||
mock.call(getattr(uuids, 'portid_%s' % self.nets2[1]['id']))])
|
||||
self.assertEqual(2, mocked_client.delete_port.call_count)
|
||||
|
||||
def test_allocate_for_instance_second_time(self):
|
||||
# Make sure that allocate_for_instance only returns ports that it
|
||||
# allocated during _that_ run.
|
||||
|
|
Loading…
Reference in New Issue