Merge "Remove use of contextlib.nested"
This commit is contained in:
commit
c363f7a2df
|
@ -13,6 +13,7 @@ Neutron Specific Commandments
|
|||
- [N321] Validate that jsonutils module is used instead of json
|
||||
- [N322] Detect common errors with assert_called_once_with
|
||||
- [N323] Enforce namespace-less imports for oslo libraries
|
||||
- [N324] Prevent use of deprecated contextlib.nested.
|
||||
|
||||
Creating Unit Tests
|
||||
-------------------
|
||||
|
|
|
@ -53,6 +53,7 @@ log_translation_hint = re.compile(
|
|||
oslo_namespace_imports_dot = re.compile(r"import[\s]+oslo[.][^\s]+")
|
||||
oslo_namespace_imports_from_dot = re.compile(r"from[\s]+oslo[.]")
|
||||
oslo_namespace_imports_from_root = re.compile(r"from[\s]+oslo[\s]+import[\s]+")
|
||||
contextlib_nested = re.compile(r"^with (contextlib\.)?nested\(")
|
||||
|
||||
|
||||
def validate_log_translations(logical_line, physical_line, filename):
|
||||
|
@ -134,9 +135,36 @@ def check_oslo_namespace_imports(logical_line):
|
|||
yield(0, msg)
|
||||
|
||||
|
||||
def check_no_contextlib_nested(logical_line, filename):
|
||||
msg = ("N324: contextlib.nested is deprecated. With Python 2.7 and later "
|
||||
"the with-statement supports multiple nested objects. See https://"
|
||||
"docs.python.org/2/library/contextlib.html#contextlib.nested for "
|
||||
"more information.")
|
||||
|
||||
# TODO(ankit): The following check is temporary.
|
||||
# A series of patches will be submitted to address
|
||||
# these issues. It should be removed completely
|
||||
# when bug 1428424 is closed.
|
||||
ignore_dirs = [
|
||||
"neutron/plugins/ml2",
|
||||
"neutron/tests/unit/agent/test_securitygroups_rpc.py",
|
||||
"neutron/tests/unit/api",
|
||||
"neutron/tests/unit/db",
|
||||
"neutron/tests/unit/extensions",
|
||||
"neutron/tests/unit/plugins",
|
||||
"neutron/tests/unit/scheduler"]
|
||||
for directory in ignore_dirs:
|
||||
if directory in filename:
|
||||
return
|
||||
|
||||
if contextlib_nested.match(logical_line):
|
||||
yield(0, msg)
|
||||
|
||||
|
||||
def factory(register):
|
||||
register(validate_log_translations)
|
||||
register(use_jsonutils)
|
||||
register(check_assert_called_once_with)
|
||||
register(no_translate_debug_logs)
|
||||
register(check_oslo_namespace_imports)
|
||||
register(check_no_contextlib_nested)
|
||||
|
|
|
@ -13,7 +13,6 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import contextlib
|
||||
import httplib
|
||||
|
||||
from oslo_config import cfg
|
||||
|
@ -80,7 +79,7 @@ class PortBindingsTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
|
|||
def test_ports_vif_details(self):
|
||||
plugin = manager.NeutronManager.get_plugin()
|
||||
cfg.CONF.set_default('allow_overlapping_ips', True)
|
||||
with contextlib.nested(self.port(), self.port()):
|
||||
with self.port(), self.port():
|
||||
ctx = context.get_admin_context()
|
||||
ports = plugin.get_ports(ctx)
|
||||
self.assertEqual(len(ports), 2)
|
||||
|
@ -214,11 +213,9 @@ class PortBindingsHostTestCaseMixin(object):
|
|||
def test_ports_vif_host(self):
|
||||
cfg.CONF.set_default('allow_overlapping_ips', True)
|
||||
host_arg = {portbindings.HOST_ID: self.hostname}
|
||||
with contextlib.nested(
|
||||
self.port(name='name1',
|
||||
arg_list=(portbindings.HOST_ID,),
|
||||
**host_arg),
|
||||
self.port(name='name2')):
|
||||
with self.port(name='name1',
|
||||
arg_list=(portbindings.HOST_ID,),
|
||||
**host_arg), self.port(name='name2'):
|
||||
ctx = context.get_admin_context()
|
||||
ports = self._list('ports', neutron_context=ctx)['ports']
|
||||
self.assertEqual(2, len(ports))
|
||||
|
@ -240,11 +237,8 @@ class PortBindingsHostTestCaseMixin(object):
|
|||
def test_ports_vif_host_update(self):
|
||||
cfg.CONF.set_default('allow_overlapping_ips', True)
|
||||
host_arg = {portbindings.HOST_ID: self.hostname}
|
||||
with contextlib.nested(
|
||||
self.port(name='name1',
|
||||
arg_list=(portbindings.HOST_ID,),
|
||||
**host_arg),
|
||||
self.port(name='name2')) as (port1, port2):
|
||||
with self.port(name='name1', arg_list=(portbindings.HOST_ID,),
|
||||
**host_arg) as port1, self.port(name='name2') as port2:
|
||||
data = {'port': {portbindings.HOST_ID: 'testhosttemp'}}
|
||||
req = self.new_update_request('ports', data, port1['port']['id'])
|
||||
req.get_response(self.api)
|
||||
|
@ -277,14 +271,13 @@ class PortBindingsHostTestCaseMixin(object):
|
|||
def test_ports_vif_host_list(self):
|
||||
cfg.CONF.set_default('allow_overlapping_ips', True)
|
||||
host_arg = {portbindings.HOST_ID: self.hostname}
|
||||
with contextlib.nested(
|
||||
self.port(name='name1',
|
||||
arg_list=(portbindings.HOST_ID,),
|
||||
**host_arg),
|
||||
self.port(name='name2'),
|
||||
self.port(name='name3',
|
||||
arg_list=(portbindings.HOST_ID,),
|
||||
**host_arg),) as (port1, _port2, port3):
|
||||
with self.port(name='name1',
|
||||
arg_list=(portbindings.HOST_ID,),
|
||||
**host_arg) as port1,\
|
||||
self.port(name='name2'),\
|
||||
self.port(name='name3',
|
||||
arg_list=(portbindings.HOST_ID,),
|
||||
**host_arg) as port3:
|
||||
self._test_list_resources(
|
||||
'port', (port1, port3),
|
||||
query_params='%s=%s' % (portbindings.HOST_ID, self.hostname))
|
||||
|
@ -335,11 +328,8 @@ class PortBindingsVnicTestCaseMixin(object):
|
|||
def test_ports_vnic_type(self):
|
||||
cfg.CONF.set_default('allow_overlapping_ips', True)
|
||||
vnic_arg = {portbindings.VNIC_TYPE: self.vnic_type}
|
||||
with contextlib.nested(
|
||||
self.port(name='name1',
|
||||
arg_list=(portbindings.VNIC_TYPE,),
|
||||
**vnic_arg),
|
||||
self.port(name='name2')):
|
||||
with self.port(name='name1', arg_list=(portbindings.VNIC_TYPE,),
|
||||
**vnic_arg), self.port(name='name2'):
|
||||
ctx = context.get_admin_context()
|
||||
ports = self._list('ports', neutron_context=ctx)['ports']
|
||||
self.assertEqual(2, len(ports))
|
||||
|
@ -362,14 +352,13 @@ class PortBindingsVnicTestCaseMixin(object):
|
|||
def test_ports_vnic_type_list(self):
|
||||
cfg.CONF.set_default('allow_overlapping_ips', True)
|
||||
vnic_arg = {portbindings.VNIC_TYPE: self.vnic_type}
|
||||
with contextlib.nested(
|
||||
self.port(name='name1',
|
||||
arg_list=(portbindings.VNIC_TYPE,),
|
||||
**vnic_arg),
|
||||
self.port(name='name2'),
|
||||
self.port(name='name3',
|
||||
arg_list=(portbindings.VNIC_TYPE,),
|
||||
**vnic_arg),) as (port1, port2, port3):
|
||||
with self.port(name='name1',
|
||||
arg_list=(portbindings.VNIC_TYPE,),
|
||||
**vnic_arg) as port1,\
|
||||
self.port(name='name2') as port2,\
|
||||
self.port(name='name3',
|
||||
arg_list=(portbindings.VNIC_TYPE,),
|
||||
**vnic_arg) as port3:
|
||||
self._test_list_resources(
|
||||
'port', (port1, port2, port3),
|
||||
query_params='%s=%s' % (portbindings.VNIC_TYPE,
|
||||
|
|
|
@ -13,7 +13,6 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import contextlib
|
||||
import copy
|
||||
import sys
|
||||
import uuid
|
||||
|
@ -983,12 +982,8 @@ class TestDhcpPluginApiProxy(base.BaseTestCase):
|
|||
proxy = dhcp_agent.DhcpPluginApi('foo', ctxt, None)
|
||||
proxy.host = 'foo'
|
||||
|
||||
with contextlib.nested(
|
||||
mock.patch.object(proxy.client, 'call'),
|
||||
mock.patch.object(proxy.client, 'prepare'),
|
||||
) as (
|
||||
rpc_mock, prepare_mock
|
||||
):
|
||||
with mock.patch.object(proxy.client, 'call') as rpc_mock,\
|
||||
mock.patch.object(proxy.client, 'prepare') as prepare_mock:
|
||||
prepare_mock.return_value = proxy.client
|
||||
rpc_mock.return_value = kwargs.pop('return_value', [])
|
||||
|
||||
|
|
|
@ -13,7 +13,6 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import contextlib
|
||||
import copy
|
||||
|
||||
import eventlet
|
||||
|
@ -1202,12 +1201,9 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
|
|||
ri.fip_ns.subscribe = mock.Mock()
|
||||
ri.fip_ns.agent_router_gateway = mock.Mock()
|
||||
|
||||
with contextlib.nested(mock.patch.object(ri,
|
||||
'get_floating_ips'),
|
||||
mock.patch.object(
|
||||
ri, 'get_floating_agent_gw_interface')
|
||||
) as (fips,
|
||||
fip_gw_port):
|
||||
with mock.patch.object(ri, 'get_floating_ips') as fips,\
|
||||
mock.patch.object(ri, 'get_floating_agent_gw_interface'
|
||||
) as fip_gw_port:
|
||||
fips.return_value = fake_floatingips
|
||||
fip_gw_port.return_value = agent_gateway_port[0]
|
||||
ri.create_dvr_fip_interfaces(ext_gw_port)
|
||||
|
@ -1254,12 +1250,9 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
|
|||
ri.rtr_fip_subnet = None
|
||||
ri.dist_fip_count = 1
|
||||
|
||||
with contextlib.nested(mock.patch.object(
|
||||
ri, 'get_floating_ips'),
|
||||
mock.patch.object(
|
||||
ri, 'get_floating_agent_gw_interface')
|
||||
) as (fips,
|
||||
fip_gw_port):
|
||||
with mock.patch.object(ri, 'get_floating_ips') as fips,\
|
||||
mock.patch.object(ri, 'get_floating_agent_gw_interface'
|
||||
) as fip_gw_port:
|
||||
fips.return_value = fake_floatingips
|
||||
fip_gw_port.return_value = agent_gateway_port[0]
|
||||
ri.create_dvr_fip_interfaces(ext_gw_port)
|
||||
|
@ -1850,18 +1843,14 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
|
|||
self.assertEqual(len(internal_ports), 1)
|
||||
internal_port = internal_ports[0]
|
||||
|
||||
with contextlib.nested(mock.patch.object(ri,
|
||||
'internal_network_removed'),
|
||||
mock.patch.object(ri,
|
||||
'internal_network_added'),
|
||||
mock.patch.object(ri,
|
||||
'external_gateway_removed'),
|
||||
mock.patch.object(ri,
|
||||
'external_gateway_added')
|
||||
) as (internal_network_removed,
|
||||
internal_network_added,
|
||||
external_gateway_removed,
|
||||
external_gateway_added):
|
||||
with mock.patch.object(ri, 'internal_network_removed'
|
||||
) as internal_network_removed,\
|
||||
mock.patch.object(ri, 'internal_network_added'
|
||||
) as internal_network_added,\
|
||||
mock.patch.object(ri, 'external_gateway_removed'
|
||||
) as external_gateway_removed,\
|
||||
mock.patch.object(ri, 'external_gateway_added'
|
||||
) as external_gateway_added:
|
||||
|
||||
ri.process(agent)
|
||||
|
||||
|
|
|
@ -12,8 +12,6 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import contextlib
|
||||
|
||||
import eventlet.event
|
||||
import eventlet.queue
|
||||
import eventlet.timeout
|
||||
|
@ -156,16 +154,13 @@ class TestAsyncProcess(base.BaseTestCase):
|
|||
self._test_iter_output_calls_iter_queue_on_output_queue('stderr')
|
||||
|
||||
def _test__kill(self, respawning, pid=None):
|
||||
with contextlib.nested(
|
||||
mock.patch.object(self.proc, '_kill_event'),
|
||||
with mock.patch.object(self.proc, '_kill_event'
|
||||
) as mock_kill_event,\
|
||||
mock.patch.object(utils, 'get_root_helper_child_pid',
|
||||
return_value=pid),
|
||||
mock.patch.object(self.proc, '_kill_process'),
|
||||
mock.patch.object(self.proc, '_process')) as (
|
||||
mock_kill_event,
|
||||
mock_get_child_pid,
|
||||
mock_kill_process,
|
||||
mock_process):
|
||||
return_value=pid),\
|
||||
mock.patch.object(self.proc, '_kill_process'
|
||||
) as mock_kill_process,\
|
||||
mock.patch.object(self.proc, '_process'):
|
||||
self.proc._kill(respawning)
|
||||
|
||||
if respawning:
|
||||
|
|
|
@ -14,7 +14,6 @@
|
|||
# under the License.
|
||||
|
||||
|
||||
import contextlib
|
||||
import os
|
||||
import sys
|
||||
|
||||
|
@ -70,33 +69,32 @@ class TestPrivileges(base.BaseTestCase):
|
|||
daemon.setgid, '321')
|
||||
log_critical.assert_once_with(mock.ANY)
|
||||
|
||||
def test_drop_no_privileges(self):
|
||||
with contextlib.nested(
|
||||
mock.patch.object(os, 'setgroups'),
|
||||
mock.patch.object(daemon, 'setgid'),
|
||||
mock.patch.object(daemon, 'setuid')) as mocks:
|
||||
daemon.drop_privileges()
|
||||
for cursor in mocks:
|
||||
self.assertFalse(cursor.called)
|
||||
@mock.patch.object(os, 'setgroups')
|
||||
@mock.patch.object(daemon, 'setgid')
|
||||
@mock.patch.object(daemon, 'setuid')
|
||||
def test_drop_no_privileges(self, mock_setuid, mock_setgid,
|
||||
mock_setgroups):
|
||||
daemon.drop_privileges()
|
||||
for cursor in (mock_setuid, mock_setgid, mock_setgroups):
|
||||
self.assertFalse(cursor.called)
|
||||
|
||||
def _test_drop_privileges(self, user=None, group=None):
|
||||
with contextlib.nested(
|
||||
mock.patch.object(os, 'geteuid', return_value=0),
|
||||
mock.patch.object(os, 'setgroups'),
|
||||
mock.patch.object(daemon, 'setgid'),
|
||||
mock.patch.object(daemon, 'setuid')) as (
|
||||
geteuid, setgroups, setgid, setuid):
|
||||
daemon.drop_privileges(user=user, group=group)
|
||||
if user:
|
||||
setuid.assert_called_once_with(user)
|
||||
else:
|
||||
self.assertFalse(setuid.called)
|
||||
if group:
|
||||
setgroups.assert_called_once_with([])
|
||||
setgid.assert_called_once_with(group)
|
||||
else:
|
||||
self.assertFalse(setgroups.called)
|
||||
self.assertFalse(setgid.called)
|
||||
@mock.patch.object(os, 'geteuid', return_value=0)
|
||||
@mock.patch.object(os, 'setgroups')
|
||||
@mock.patch.object(daemon, 'setgid')
|
||||
@mock.patch.object(daemon, 'setuid')
|
||||
def _test_drop_privileges(self, setuid, setgid, setgroups,
|
||||
geteuid, user=None, group=None):
|
||||
daemon.drop_privileges(user=user, group=group)
|
||||
if user:
|
||||
setuid.assert_called_once_with(user)
|
||||
else:
|
||||
self.assertFalse(setuid.called)
|
||||
if group:
|
||||
setgroups.assert_called_once_with([])
|
||||
setgid.assert_called_once_with(group)
|
||||
else:
|
||||
self.assertFalse(setgroups.called)
|
||||
self.assertFalse(setgid.called)
|
||||
|
||||
def test_drop_user_privileges(self):
|
||||
self._test_drop_privileges(user='user')
|
||||
|
|
|
@ -12,8 +12,6 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import contextlib
|
||||
|
||||
import mock
|
||||
import testtools
|
||||
import webob
|
||||
|
@ -204,10 +202,12 @@ class TestMetadataProxyHandlerCache(TestMetadataProxyHandlerBase):
|
|||
remote_address = 'remote-address'
|
||||
expected = ['port1']
|
||||
networks = (network_id,)
|
||||
with contextlib.nested(
|
||||
mock.patch.object(self.handler, '_get_ports_for_remote_address'),
|
||||
mock.patch.object(self.handler, '_get_router_networks')
|
||||
) as (mock_get_ip_addr, mock_get_router_networks):
|
||||
with mock.patch.object(self.handler,
|
||||
'_get_ports_for_remote_address'
|
||||
) as mock_get_ip_addr,\
|
||||
mock.patch.object(self.handler,
|
||||
'_get_router_networks'
|
||||
) as mock_get_router_networks:
|
||||
mock_get_ip_addr.return_value = expected
|
||||
ports = self.handler._get_ports(remote_address, network_id,
|
||||
router_id)
|
||||
|
@ -221,14 +221,14 @@ class TestMetadataProxyHandlerCache(TestMetadataProxyHandlerBase):
|
|||
remote_address = 'remote-address'
|
||||
expected = ['port1']
|
||||
networks = ('network1', 'network2')
|
||||
with contextlib.nested(
|
||||
mock.patch.object(self.handler,
|
||||
'_get_ports_for_remote_address',
|
||||
return_value=expected),
|
||||
mock.patch.object(self.handler,
|
||||
'_get_router_networks',
|
||||
return_value=networks)
|
||||
) as (mock_get_ip_addr, mock_get_router_networks):
|
||||
with mock.patch.object(self.handler,
|
||||
'_get_ports_for_remote_address',
|
||||
return_value=expected
|
||||
) as mock_get_ip_addr,\
|
||||
mock.patch.object(self.handler,
|
||||
'_get_router_networks',
|
||||
return_value=networks
|
||||
) as mock_get_router_networks:
|
||||
ports = self.handler._get_ports(remote_address,
|
||||
router_id=router_id)
|
||||
mock_get_router_networks.called_once_with(router_id)
|
||||
|
|
|
@ -13,8 +13,6 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import contextlib
|
||||
|
||||
import mock
|
||||
|
||||
from oslo_config import cfg
|
||||
|
@ -96,13 +94,11 @@ class TestMetadataDriverProcess(base.BaseTestCase):
|
|||
cfg.CONF.set_override('debug', True)
|
||||
|
||||
agent = l3_agent.L3NATAgent('localhost')
|
||||
with contextlib.nested(
|
||||
mock.patch('os.geteuid', return_value=self.EUID),
|
||||
mock.patch('os.getegid', return_value=self.EGID),
|
||||
with mock.patch('os.geteuid', return_value=self.EUID),\
|
||||
mock.patch('os.getegid', return_value=self.EGID),\
|
||||
mock.patch(is_effective_user,
|
||||
side_effect=fake_is_effective_user),
|
||||
mock.patch(ip_class_path)) as (
|
||||
geteuid, getegid, is_effective_user, ip_mock):
|
||||
side_effect=fake_is_effective_user),\
|
||||
mock.patch(ip_class_path) as ip_mock:
|
||||
agent.metadata_driver.spawn_monitored_metadata_proxy(
|
||||
agent.process_monitor,
|
||||
router_ns,
|
||||
|
|
|
@ -14,8 +14,6 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import contextlib
|
||||
|
||||
import mock
|
||||
|
||||
from neutron.common import constants as n_const
|
||||
|
@ -63,10 +61,9 @@ class TestL2populationRpcCallBackTunnelMixin(
|
|||
self.assertEqual(expected, results)
|
||||
|
||||
def test_fdb_add_tun(self):
|
||||
with contextlib.nested(
|
||||
mock.patch.object(self.fakeagent, 'setup_tunnel_port'),
|
||||
mock.patch.object(self.fakeagent, 'add_fdb_flow'),
|
||||
) as (mock_setup_tunnel_port, mock_add_fdb_flow):
|
||||
with mock.patch.object(self.fakeagent, 'setup_tunnel_port'),\
|
||||
mock.patch.object(self.fakeagent, 'add_fdb_flow'
|
||||
) as mock_add_fdb_flow:
|
||||
self.fakeagent.fdb_add_tun('context', self.fakebr, self.lvm1,
|
||||
self.agent_ports,
|
||||
self._tunnel_port_lookup)
|
||||
|
@ -84,11 +81,11 @@ class TestL2populationRpcCallBackTunnelMixin(
|
|||
def test_fdb_add_tun_non_existence_key_in_ofports(self):
|
||||
ofport = self.lvm1.network_type + '0a0a0a0a'
|
||||
del self.ofports[self.type_gre][self.ports[1].ip]
|
||||
with contextlib.nested(
|
||||
mock.patch.object(self.fakeagent, 'setup_tunnel_port',
|
||||
return_value=ofport),
|
||||
mock.patch.object(self.fakeagent, 'add_fdb_flow'),
|
||||
) as (mock_setup_tunnel_port, mock_add_fdb_flow):
|
||||
with mock.patch.object(self.fakeagent, 'setup_tunnel_port',
|
||||
return_value=ofport
|
||||
) as mock_setup_tunnel_port,\
|
||||
mock.patch.object(self.fakeagent, 'add_fdb_flow'
|
||||
) as mock_add_fdb_flow:
|
||||
self.fakeagent.fdb_add_tun('context', self.fakebr, self.lvm1,
|
||||
self.agent_ports,
|
||||
self._tunnel_port_lookup)
|
||||
|
@ -107,11 +104,11 @@ class TestL2populationRpcCallBackTunnelMixin(
|
|||
|
||||
def test_fdb_add_tun_unavailable_ofport(self):
|
||||
del self.ofports[self.type_gre][self.ports[1].ip]
|
||||
with contextlib.nested(
|
||||
mock.patch.object(self.fakeagent, 'setup_tunnel_port',
|
||||
return_value=0),
|
||||
mock.patch.object(self.fakeagent, 'add_fdb_flow'),
|
||||
) as (mock_setup_tunnel_port, mock_add_fdb_flow):
|
||||
with mock.patch.object(self.fakeagent, 'setup_tunnel_port',
|
||||
return_value=0
|
||||
) as mock_setup_tunnel_port,\
|
||||
mock.patch.object(self.fakeagent, 'add_fdb_flow'
|
||||
) as mock_add_fdb_flow:
|
||||
self.fakeagent.fdb_add_tun('context', self.fakebr, self.lvm1,
|
||||
self.agent_ports,
|
||||
self._tunnel_port_lookup)
|
||||
|
@ -145,10 +142,10 @@ class TestL2populationRpcCallBackTunnelMixin(
|
|||
|
||||
def test_fdb_remove_tun_flooding_entry(self):
|
||||
self.agent_ports[self.ports[1].ip] = [n_const.FLOODING_ENTRY]
|
||||
with contextlib.nested(
|
||||
mock.patch.object(self.fakeagent, 'del_fdb_flow'),
|
||||
mock.patch.object(self.fakeagent, 'cleanup_tunnel_port'),
|
||||
) as (mock_del_fdb_flow, mock_cleanup_tunnel_port):
|
||||
with mock.patch.object(self.fakeagent, 'del_fdb_flow'
|
||||
) as mock_del_fdb_flow,\
|
||||
mock.patch.object(self.fakeagent, 'cleanup_tunnel_port'
|
||||
) as mock_cleanup_tunnel_port:
|
||||
self.fakeagent.fdb_remove_tun('context', self.fakebr, self.lvm1,
|
||||
self.agent_ports,
|
||||
self._tunnel_port_lookup)
|
||||
|
|
|
@ -13,8 +13,6 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import contextlib
|
||||
|
||||
import mock
|
||||
from oslo_context import context as oslo_context
|
||||
import oslo_messaging
|
||||
|
@ -28,12 +26,8 @@ class AgentRPCPluginApi(base.BaseTestCase):
|
|||
agent = rpc.PluginApi('fake_topic')
|
||||
ctxt = oslo_context.RequestContext('fake_user', 'fake_project')
|
||||
expect_val = 'foo'
|
||||
with contextlib.nested(
|
||||
mock.patch.object(agent.client, 'call'),
|
||||
mock.patch.object(agent.client, 'prepare'),
|
||||
) as (
|
||||
mock_call, mock_prepare
|
||||
):
|
||||
with mock.patch.object(agent.client, 'call') as mock_call,\
|
||||
mock.patch.object(agent.client, 'prepare') as mock_prepare:
|
||||
mock_prepare.return_value = agent.client
|
||||
mock_call.return_value = expect_val
|
||||
func_obj = getattr(agent, method)
|
||||
|
@ -54,12 +48,8 @@ class AgentRPCPluginApi(base.BaseTestCase):
|
|||
ctxt = oslo_context.RequestContext('fake_user', 'fake_project')
|
||||
expect_val_get_device_details = 'foo'
|
||||
expect_val = [expect_val_get_device_details]
|
||||
with contextlib.nested(
|
||||
mock.patch.object(agent.client, 'call'),
|
||||
mock.patch.object(agent.client, 'prepare'),
|
||||
) as (
|
||||
mock_call, mock_prepare
|
||||
):
|
||||
with mock.patch.object(agent.client, 'call') as mock_call, \
|
||||
mock.patch.object(agent.client, 'prepare') as mock_prepare:
|
||||
mock_prepare.return_value = agent.client
|
||||
mock_call.side_effect = [oslo_messaging.UnsupportedVersion('1.2'),
|
||||
expect_val_get_device_details]
|
||||
|
@ -79,13 +69,10 @@ class AgentPluginReportState(base.BaseTestCase):
|
|||
topic = 'test'
|
||||
reportStateAPI = rpc.PluginReportStateAPI(topic)
|
||||
expected_agent_state = {'agent': 'test'}
|
||||
with contextlib.nested(
|
||||
mock.patch.object(reportStateAPI.client, 'call'),
|
||||
mock.patch.object(reportStateAPI.client, 'cast'),
|
||||
mock.patch.object(reportStateAPI.client, 'prepare'),
|
||||
) as (
|
||||
mock_call, mock_cast, mock_prepare
|
||||
):
|
||||
with mock.patch.object(reportStateAPI.client, 'call') as mock_call, \
|
||||
mock.patch.object(reportStateAPI.client, 'cast'), \
|
||||
mock.patch.object(reportStateAPI.client, 'prepare'
|
||||
) as mock_prepare:
|
||||
mock_prepare.return_value = reportStateAPI.client
|
||||
ctxt = oslo_context.RequestContext('fake_user', 'fake_project')
|
||||
reportStateAPI.report_state(ctxt, expected_agent_state,
|
||||
|
@ -100,13 +87,11 @@ class AgentPluginReportState(base.BaseTestCase):
|
|||
topic = 'test'
|
||||
reportStateAPI = rpc.PluginReportStateAPI(topic)
|
||||
expected_agent_state = {'agent': 'test'}
|
||||
with contextlib.nested(
|
||||
mock.patch.object(reportStateAPI.client, 'call'),
|
||||
mock.patch.object(reportStateAPI.client, 'cast'),
|
||||
mock.patch.object(reportStateAPI.client, 'prepare'),
|
||||
) as (
|
||||
mock_call, mock_cast, mock_prepare
|
||||
):
|
||||
with mock.patch.object(reportStateAPI.client, 'call'), \
|
||||
mock.patch.object(reportStateAPI.client, 'cast'
|
||||
) as mock_cast, \
|
||||
mock.patch.object(reportStateAPI.client, 'prepare'
|
||||
) as mock_prepare:
|
||||
mock_prepare.return_value = reportStateAPI.client
|
||||
ctxt = oslo_context.RequestContext('fake_user', 'fake_project')
|
||||
reportStateAPI.report_state(ctxt, expected_agent_state)
|
||||
|
|
|
@ -13,7 +13,6 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import contextlib
|
||||
import itertools
|
||||
import mock
|
||||
|
||||
|
@ -26,30 +25,29 @@ from neutron.tests import base
|
|||
|
||||
class TestOVSCleanup(base.BaseTestCase):
|
||||
|
||||
def test_main(self):
|
||||
@mock.patch('neutron.common.config.setup_logging')
|
||||
@mock.patch('neutron.cmd.ovs_cleanup.setup_conf')
|
||||
@mock.patch('neutron.agent.common.ovs_lib.BaseOVS.get_bridges')
|
||||
@mock.patch('neutron.agent.common.ovs_lib.OVSBridge')
|
||||
@mock.patch.object(util, 'collect_neutron_ports')
|
||||
@mock.patch.object(util, 'delete_neutron_ports')
|
||||
def test_main(self, mock_delete, mock_collect, mock_ovs,
|
||||
mock_get_bridges, mock_conf, mock_logging):
|
||||
bridges = ['br-int', 'br-ex']
|
||||
ports = ['p1', 'p2', 'p3']
|
||||
conf = mock.Mock()
|
||||
conf.ovs_all_ports = False
|
||||
conf.ovs_integration_bridge = 'br-int'
|
||||
conf.external_network_bridge = 'br-ex'
|
||||
with contextlib.nested(
|
||||
mock.patch('neutron.common.config.setup_logging'),
|
||||
mock.patch('neutron.cmd.ovs_cleanup.setup_conf',
|
||||
return_value=conf),
|
||||
mock.patch('neutron.agent.common.ovs_lib.BaseOVS.get_bridges',
|
||||
return_value=bridges),
|
||||
mock.patch('neutron.agent.common.ovs_lib.OVSBridge'),
|
||||
mock.patch.object(util, 'collect_neutron_ports',
|
||||
return_value=ports),
|
||||
mock.patch.object(util, 'delete_neutron_ports')
|
||||
) as (_log, _conf, _get, ovs, collect, delete):
|
||||
with mock.patch('neutron.common.config.setup_logging'):
|
||||
util.main()
|
||||
ovs.assert_has_calls([mock.call().delete_ports(
|
||||
all_ports=False)])
|
||||
collect.assert_called_once_with(set(bridges))
|
||||
delete.assert_called_once_with(ports)
|
||||
mock_conf.return_value = conf
|
||||
mock_get_bridges.return_value = bridges
|
||||
mock_collect.return_value = ports
|
||||
|
||||
util.main()
|
||||
mock_ovs.assert_has_calls([mock.call().delete_ports(
|
||||
all_ports=False)])
|
||||
mock_collect.assert_called_once_with(set(bridges))
|
||||
mock_delete.assert_called_once_with(ports)
|
||||
|
||||
def test_collect_neutron_ports(self):
|
||||
port1 = ovs_lib.VifPort('tap1234', 1, uuidutils.generate_uuid(),
|
||||
|
@ -66,18 +64,18 @@ class TestOVSCleanup(base.BaseTestCase):
|
|||
ret = util.collect_neutron_ports(bridges)
|
||||
self.assertEqual(ret, portnames)
|
||||
|
||||
def test_delete_neutron_ports(self):
|
||||
@mock.patch.object(ip_lib, 'IPDevice')
|
||||
def test_delete_neutron_ports(self, mock_ip):
|
||||
ports = ['tap1234', 'tap5678', 'tap09ab']
|
||||
port_found = [True, False, True]
|
||||
with contextlib.nested(
|
||||
mock.patch.object(ip_lib, 'device_exists',
|
||||
side_effect=port_found),
|
||||
mock.patch.object(ip_lib, 'IPDevice')
|
||||
) as (device_exists, ip_dev):
|
||||
|
||||
with mock.patch.object(
|
||||
ip_lib, 'device_exists',
|
||||
side_effect=port_found) as device_exists:
|
||||
util.delete_neutron_ports(ports)
|
||||
device_exists.assert_has_calls([mock.call(p) for p in ports])
|
||||
ip_dev.assert_has_calls(
|
||||
mock_ip.assert_has_calls(
|
||||
[mock.call('tap1234'),
|
||||
mock.call().link.delete(),
|
||||
mock.call('tap09ab'),
|
||||
mock.call().link.delete()])
|
||||
mock.call().link.delete(),
|
||||
mock.call('tap09ab'),
|
||||
mock.call().link.delete()])
|
||||
|
|
|
@ -15,7 +15,6 @@
|
|||
|
||||
"""Test of Policy Engine For Neutron"""
|
||||
|
||||
import contextlib
|
||||
import StringIO
|
||||
import urllib2
|
||||
|
||||
|
@ -654,11 +653,9 @@ class NeutronPolicyTestCase(base.BaseTestCase):
|
|||
'create_fake_resource:fake_resources',
|
||||
'create_fake_resource:attr:sub_attr_1'], rules)
|
||||
|
||||
def test_log_rule_list(self):
|
||||
with contextlib.nested(
|
||||
mock.patch.object(policy.LOG, 'isEnabledFor', return_value=True),
|
||||
mock.patch.object(policy.LOG, 'debug')
|
||||
) as (is_e, dbg):
|
||||
policy.log_rule_list(common_policy.RuleCheck('rule', 'create_'))
|
||||
self.assertTrue(is_e.called)
|
||||
self.assertTrue(dbg.called)
|
||||
@mock.patch.object(policy.LOG, 'isEnabledFor', return_value=True)
|
||||
@mock.patch.object(policy.LOG, 'debug')
|
||||
def test_log_rule_list(self, mock_debug, mock_is_e):
|
||||
policy.log_rule_list(common_policy.RuleCheck('rule', 'create_'))
|
||||
self.assertTrue(mock_is_e.called)
|
||||
self.assertTrue(mock_debug.called)
|
||||
|
|
Loading…
Reference in New Issue