Fix deprecation warning for multiple context managers part 2

Fixes the following deprecation warning in driver and service unit
tests:

        DeprecationWarning: With-statements now directly support
        multiple context managers

Removes nested helper from neutron_lbaas.tests.

Change-Id: I15834dc99a88c0e3c528caacfc630a93112c70a7
This commit is contained in:
Dustin Lundquist 2016-07-13 19:00:47 -07:00
parent ebaf75b063
commit f3e9f0ae51
15 changed files with 416 additions and 576 deletions

View File

@ -1,26 +0,0 @@
# Copyright 2016 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import six
if six.PY3:
@contextlib.contextmanager
def nested(*contexts):
with contextlib.ExitStack() as stack:
yield [stack.enter_context(c) for c in contexts]
else:
nested = contextlib.nested

View File

@ -18,7 +18,6 @@ from oslo_config import cfg
from neutron_lbaas.agent import agent
from neutron_lbaas.tests import base
from neutron_lbaas.tests import nested
class TestLbaasService(base.BaseTestCase):
@ -36,13 +35,11 @@ class TestLbaasService(base.BaseTestCase):
def test_main(self):
logging_str = 'neutron.agent.common.config.setup_logging'
with nested(
mock.patch(logging_str),
mock.patch.object(agent.service, 'launch'),
mock.patch('sys.argv'),
mock.patch.object(agent.manager, 'LbaasAgentManager'),
mock.patch.object(cfg.CONF, 'register_opts')
) as (mock_logging, mock_launch, sys_argv, mgr_cls, ro):
with mock.patch(logging_str), \
mock.patch.object(agent.service, 'launch') as mock_launch, \
mock.patch('sys.argv'), \
mock.patch.object(agent.manager, 'LbaasAgentManager'), \
mock.patch.object(cfg.CONF, 'register_opts'):
agent.main()
mock_launch.assert_called_once_with(mock.ANY, mock.ANY)

View File

@ -23,7 +23,6 @@ from neutron_lbaas.db.loadbalancer import models
from neutron_lbaas.drivers.common import agent_driver_base
from neutron_lbaas.extensions import loadbalancerv2
from neutron_lbaas.tests import base
from neutron_lbaas.tests import nested
from neutron_lbaas.tests.unit.db.loadbalancer import test_db_loadbalancerv2
@ -61,12 +60,8 @@ class TestLoadBalancerAgentApi(base.BaseTestCase):
self.assertEqual('topic', self.api.client.target.topic)
def _call_test_helper(self, method_name, method_args):
with nested(
mock.patch.object(self.api.client, 'cast'),
mock.patch.object(self.api.client, 'prepare'),
) as (
rpc_mock, prepare_mock
):
with mock.patch.object(self.api.client, 'cast') as rpc_mock, \
mock.patch.object(self.api.client, 'prepare') as prepare_mock:
prepare_mock.return_value = self.api.client
getattr(self.api, method_name)(mock.sentinel.context,
host='host',

View File

@ -23,7 +23,6 @@ from neutron_lib import exceptions
from neutron_lbaas.drivers.haproxy import namespace_driver
from neutron_lbaas.services.loadbalancer import data_models
from neutron_lbaas.tests import base
from neutron_lbaas.tests import nested
class TestHaproxyNSDriver(base.BaseTestCase):
@ -159,11 +158,9 @@ class TestHaproxyNSDriver(base.BaseTestCase):
'hrsp_3xx,hrsp_4xx,hrsp_5xx,hrsp_other,hanafail,'
'req_rate,req_rate_max,req_tot,cli_abrt,srv_abrt,'
'\n')
with nested(
mock.patch.object(self.driver, '_get_state_file_path'),
mock.patch('socket.socket'),
mock.patch('os.path.exists'),
) as (gsp, mocket, path_exists):
with mock.patch.object(self.driver, '_get_state_file_path') as gsp, \
mock.patch('socket.socket') as mocket, \
mock.patch('os.path.exists') as path_exists:
gsp.side_effect = lambda x, y, z: '/pool/' + y
path_exists.return_value = True
mocket.return_value = mocket

View File

@ -29,7 +29,6 @@ from neutron_lbaas.drivers.radware import exceptions as r_exc
from neutron_lbaas.drivers.radware import v2_driver
from neutron_lbaas.extensions import loadbalancerv2
from neutron_lbaas.services.loadbalancer import constants as lb_con
from neutron_lbaas.tests import nested
from neutron_lbaas.tests.unit.db.loadbalancer import test_db_loadbalancerv2
GET_200 = ('/api/workflow/', '/api/workflowTemplate')
@ -711,13 +710,12 @@ class TestLBaaSDriver(TestLBaaSDriverBase):
vip_address=WF_APPLY_PARAMS['parameters']['vip_address']
) as lb:
lb_id = lb['loadbalancer']['id']
with nested(
mock.patch('neutron_lbaas.services.loadbalancer.plugin.'
'cert_parser', autospec=True),
mock.patch('neutron_lbaas.services.loadbalancer.plugin.'
'CERT_MANAGER_PLUGIN.CertManager',
autospec=True)
) as (cert_parser_mock, cert_manager_mock):
with mock.patch('neutron_lbaas.services.loadbalancer.plugin.'
'cert_parser',
autospec=True) as cert_parser_mock, \
mock.patch('neutron_lbaas.services.loadbalancer.'
'plugin.CERT_MANAGER_PLUGIN.CertManager',
autospec=True) as cert_manager_mock:
cert_mock = mock.Mock(spec=cert_manager.Cert)
cert_mock.get_certificate.return_value = 'certificate'
cert_mock.get_intermediates.return_value = 'intermediates'
@ -751,13 +749,10 @@ class TestLBaaSDriver(TestLBaaSDriverBase):
protocol=lb_con.PROTOCOL_HTTP,
loadbalancer_id=lb_id) as listener:
listener_id = listener['listener']['id']
with nested(
self.pool(
protocol=lb_con.PROTOCOL_HTTP,
listener_id=listener_id),
self.pool(
protocol=lb_con.PROTOCOL_HTTP,
loadbalancer_id=lb_id)) as (def_pool, pol_pool):
with self.pool(protocol=lb_con.PROTOCOL_HTTP,
listener_id=listener_id) as def_pool, \
self.pool(protocol=lb_con.PROTOCOL_HTTP,
loadbalancer_id=lb_id) as pol_pool:
def_pool_id = def_pool['pool']['id']
pol_pool_id = pol_pool['pool']['id']
with self.l7policy(
@ -767,20 +762,15 @@ class TestLBaaSDriver(TestLBaaSDriverBase):
policy_id = policy['l7policy']['id']
self.driver_rest_call_mock.reset_mock()
with nested(
self.l7policy_rule(
l7policy_id=policy_id,
key=u'key1', value=u'val1'),
self.l7policy_rule(
l7policy_id=policy_id,
key=u'key2', value=u'val2'),
self.member(
pool_id=def_pool_id,
subnet=vip_sub,
address=u'10.0.1.10')) as (
rule1, rule2,
def_m):
with self.l7policy_rule(l7policy_id=policy_id,
key=u'key1',
value=u'val1'), \
self.l7policy_rule(l7policy_id=policy_id,
key=u'key2',
value=u'val2'), \
self.member(pool_id=def_pool_id,
subnet=vip_sub,
address=u'10.0.1.10'):
self.driver_rest_call_mock.reset_mock()
rest_call_function_mock.__dict__.update(
{'WORKFLOW_MISSING': False})
@ -804,12 +794,12 @@ class TestLBaaSDriver(TestLBaaSDriverBase):
with self.pool(
protocol=lb_con.PROTOCOL_HTTP,
listener_id=listener_id) as pool:
with nested(
self.member(pool_id=pool['pool']['id'],
subnet=vip_sub, address='10.0.1.10'),
self.member(pool_id=pool['pool']['id'],
subnet=vip_sub, address='10.0.1.20')):
with self.member(pool_id=pool['pool']['id'],
subnet=vip_sub,
address='10.0.1.10'), \
self.member(pool_id=pool['pool']['id'],
subnet=vip_sub,
address='10.0.1.20'):
self.driver_rest_call_mock.reset_mock()
rest_call_function_mock.__dict__.update(
{'WORKFLOW_MISSING': False})
@ -831,19 +821,18 @@ class TestLBaaSDriver(TestLBaaSDriverBase):
with self.pool(
protocol='HTTP',
listener_id=listener_id) as pool:
with nested(
self.member(pool_id=pool['pool']['id'],
subnet=vip_sub, address='10.0.1.10'),
self.member(pool_id=pool['pool']['id'],
subnet=vip_sub, address='10.0.1.20')):
with self.member(pool_id=pool['pool']['id'],
subnet=vip_sub,
address='10.0.1.10'), \
self.member(pool_id=pool['pool']['id'],
subnet=vip_sub,
address='10.0.1.20'):
self.compare_apply_call()
def test_build_objects_graph_two_legs_full(self):
with nested(
self.subnet(cidr='10.0.0.0/24'),
self.subnet(cidr='20.0.0.0/24'),
self.subnet(cidr='30.0.0.0/24')
) as (vip_sub, member_sub1, member_sub2):
with self.subnet(cidr='10.0.0.0/24') as vip_sub, \
self.subnet(cidr='20.0.0.0/24') as member_sub1, \
self.subnet(cidr='30.0.0.0/24'):
with self.loadbalancer(
subnet=vip_sub,
vip_address=WF_APPLY_PARAMS['parameters']['vip_address']

View File

@ -17,7 +17,6 @@ from oslo_config import cfg
from neutron_lbaas.services.loadbalancer.agent import agent
from neutron_lbaas.tests import base
from neutron_lbaas.tests import nested
class TestLbaasService(base.BaseTestCase):
@ -35,13 +34,11 @@ class TestLbaasService(base.BaseTestCase):
def test_main(self):
logging_str = 'neutron.agent.common.config.setup_logging'
with nested(
mock.patch(logging_str),
mock.patch.object(agent.service, 'launch'),
mock.patch('sys.argv'),
mock.patch.object(agent.manager, 'LbaasAgentManager'),
mock.patch.object(cfg.CONF, 'register_opts')
) as (mock_logging, mock_launch, sys_argv, mgr_cls, ro):
with mock.patch(logging_str), \
mock.patch.object(agent.service, 'launch') as mock_launch, \
mock.patch('sys.argv'), \
mock.patch.object(agent.manager, 'LbaasAgentManager'), \
mock.patch.object(cfg.CONF, 'register_opts'):
agent.main()
mock_launch.assert_called_once_with(mock.ANY, mock.ANY)

View File

@ -19,7 +19,6 @@ from neutron.plugins.common import constants
from neutron_lbaas.services.loadbalancer.agent import agent_manager as manager
from neutron_lbaas.services.loadbalancer import constants as l_const
from neutron_lbaas.tests import base
from neutron_lbaas.tests import nested
class TestManager(base.BaseTestCase):
@ -82,11 +81,8 @@ class TestManager(base.BaseTestCase):
self.assertTrue(self.log.exception.called)
def _sync_state_helper(self, ready, reloaded, destroyed):
with nested(
mock.patch.object(self.mgr, '_reload_pool'),
mock.patch.object(self.mgr, '_destroy_pool')
) as (reload, destroy):
with mock.patch.object(self.mgr, '_reload_pool') as reload, \
mock.patch.object(self.mgr, '_destroy_pool') as destroy:
self.rpc_mock.get_ready_devices.return_value = ready
self.mgr.sync_state()

View File

@ -17,7 +17,6 @@ import mock
from neutron_lbaas.services.loadbalancer.agent import agent_api as api
from neutron_lbaas.tests import base
from neutron_lbaas.tests import nested
class TestApiCache(base.BaseTestCase):
@ -37,12 +36,8 @@ class TestApiCache(base.BaseTestCase):
if method in add_host:
expected_kwargs['host'] = self.api.host
with nested(
mock.patch.object(self.api.client, 'call'),
mock.patch.object(self.api.client, 'prepare'),
) as (
rpc_mock, prepare_mock
):
with mock.patch.object(self.api.client, 'call') as rpc_mock, \
mock.patch.object(self.api.client, 'prepare') as prepare_mock:
prepare_mock.return_value = self.api.client
rpc_mock.return_value = 'foo'
rv = getattr(self.api, method)(**kwargs)

View File

@ -17,22 +17,19 @@ import mock
from neutron_lbaas.services.loadbalancer.drivers.haproxy import cfg
from neutron_lbaas.tests import base
from neutron_lbaas.tests import nested
class TestHaproxyCfg(base.BaseTestCase):
def test_save_config(self):
with nested(
with mock.patch('neutron_lbaas.services.loadbalancer.'
'drivers.haproxy.cfg._build_global') as b_g, \
mock.patch('neutron_lbaas.services.loadbalancer.'
'drivers.haproxy.cfg._build_global'),
'drivers.haproxy.cfg._build_defaults') as b_d, \
mock.patch('neutron_lbaas.services.loadbalancer.'
'drivers.haproxy.cfg._build_defaults'),
'drivers.haproxy.cfg._build_frontend') as b_f, \
mock.patch('neutron_lbaas.services.loadbalancer.'
'drivers.haproxy.cfg._build_frontend'),
mock.patch('neutron_lbaas.services.loadbalancer.'
'drivers.haproxy.cfg._build_backend'),
mock.patch('neutron.common.utils.replace_file')
) as (b_g, b_d, b_f, b_b, replace):
'drivers.haproxy.cfg._build_backend') as b_b, \
mock.patch('neutron.common.utils.replace_file') as replace:
test_config = ['globals', 'defaults', 'frontend', 'backend']
b_g.return_value = [test_config[0]]
b_d.return_value = [test_config[1]]

View File

@ -21,18 +21,15 @@ from neutron_lbaas.common.cert_manager import cert_manager
from neutron_lbaas.common.tls_utils import cert_parser
from neutron_lbaas.services.loadbalancer import data_models
from neutron_lbaas.services.loadbalancer.drivers.haproxy import jinja_cfg
from neutron_lbaas.tests import nested
from neutron_lbaas.tests.unit.services.loadbalancer.drivers.haproxy.\
sample_configs import sample_configs
class TestHaproxyCfg(base.BaseTestCase):
def test_save_config(self):
with nested(
mock.patch('neutron_lbaas.services.loadbalancer.'
'drivers.haproxy.jinja_cfg.render_loadbalancer_obj'),
mock.patch('neutron.common.utils.replace_file')
) as (r_t, replace):
with mock.patch('neutron_lbaas.services.loadbalancer.drivers.haproxy.'
'jinja_cfg.render_loadbalancer_obj') as r_t, \
mock.patch('neutron.common.utils.replace_file') as replace:
r_t.return_value = 'fake_rendered_template'
lb = mock.Mock()
jinja_cfg.save_config('test_conf_path', lb, 'test_sock_path',
@ -336,12 +333,13 @@ class TestHaproxyCfg(base.BaseTestCase):
cert.get_certificate.return_value = tls.certificate
cert.get_intermediates.return_value = tls.intermediates
with nested(
mock.patch.object(jinja_cfg, '_map_cert_tls_container'),
mock.patch.object(jinja_cfg, '_store_listener_crt'),
mock.patch.object(cert_parser, 'get_host_names'),
mock.patch.object(jinja_cfg, 'CERT_MANAGER_PLUGIN')
) as (map, store_cert, get_host_names, cert_mgr):
with mock.patch.object(jinja_cfg, '_map_cert_tls_container') as map, \
mock.patch.object(jinja_cfg,
'_store_listener_crt') as store_cert, \
mock.patch.object(cert_parser,
'get_host_names') as get_host_names, \
mock.patch.object(jinja_cfg,
'CERT_MANAGER_PLUGIN') as cert_mgr:
map.return_value = tls
cert_mgr_mock = mock.Mock(spec=cert_manager.CertManager)
cert_mgr_mock.get_cert.return_value = cert

View File

@ -19,7 +19,6 @@ import six
from neutron_lbaas.services.loadbalancer.drivers.haproxy \
import namespace_driver
from neutron_lbaas.tests import base
from neutron_lbaas.tests import nested
class TestHaproxyNSDriver(base.BaseTestCase):
@ -70,11 +69,9 @@ class TestHaproxyNSDriver(base.BaseTestCase):
spawn.assert_called_once_with(self.fake_config)
def test_update(self):
with nested(
mock.patch.object(self.driver, '_get_state_file_path'),
mock.patch.object(self.driver, '_spawn'),
mock.patch.object(six.moves.builtins, 'open')
) as (gsp, spawn, mock_open):
with mock.patch.object(self.driver, '_get_state_file_path') as gsp, \
mock.patch.object(self.driver, '_spawn') as spawn, \
mock.patch.object(six.moves.builtins, 'open') as mock_open:
mock_open.return_value = ['5']
self.driver.update(self.fake_config)
@ -83,11 +80,11 @@ class TestHaproxyNSDriver(base.BaseTestCase):
spawn.assert_called_once_with(self.fake_config, ['-sf', '5'])
def test_spawn(self):
with nested(
mock.patch.object(namespace_driver.hacfg, 'save_config'),
mock.patch.object(self.driver, '_get_state_file_path'),
mock.patch('neutron.agent.linux.ip_lib.IPWrapper')
) as (mock_save, gsp, ip_wrap):
with mock.patch.object(namespace_driver.hacfg,
'save_config') as mock_save, \
mock.patch.object(self.driver,
'_get_state_file_path') as gsp, \
mock.patch('neutron.agent.linux.ip_lib.IPWrapper') as ip_wrap:
gsp.side_effect = lambda x, y: y
self.driver._spawn(self.fake_config)
@ -101,14 +98,14 @@ class TestHaproxyNSDriver(base.BaseTestCase):
])
def test_undeploy_instance(self):
with nested(
mock.patch.object(self.driver, '_get_state_file_path'),
mock.patch.object(namespace_driver, 'kill_pids_in_file'),
mock.patch.object(self.driver, '_unplug'),
mock.patch('neutron.agent.linux.ip_lib.IPWrapper'),
mock.patch('os.path.isdir'),
mock.patch('shutil.rmtree')
) as (gsp, kill, unplug, ip_wrap, isdir, rmtree):
with mock.patch.object(self.driver, '_get_state_file_path') as gsp, \
mock.patch.object(namespace_driver,
'kill_pids_in_file') as kill, \
mock.patch.object(self.driver, '_unplug') as unplug, \
mock.patch('neutron.agent.linux.ip_lib.'
'IPWrapper') as ip_wrap, \
mock.patch('os.path.isdir') as isdir, \
mock.patch('shutil.rmtree') as rmtree:
gsp.side_effect = lambda x, y: '/pool/' + y
self.driver.pool_to_port_id['pool_id'] = 'port_id'
@ -126,14 +123,13 @@ class TestHaproxyNSDriver(base.BaseTestCase):
])
def test_undeploy_instance_with_ns_cleanup(self):
with nested(
mock.patch.object(self.driver, '_get_state_file_path'),
mock.patch.object(self.driver, 'vif_driver'),
mock.patch.object(namespace_driver, 'kill_pids_in_file'),
mock.patch('neutron.agent.linux.ip_lib.IPWrapper'),
mock.patch('os.path.isdir'),
mock.patch('shutil.rmtree')
) as (gsp, vif, kill, ip_wrap, isdir, rmtree):
with mock.patch.object(self.driver, '_get_state_file_path'), \
mock.patch.object(self.driver, 'vif_driver') as vif, \
mock.patch.object(namespace_driver, 'kill_pids_in_file'), \
mock.patch('neutron.agent.linux.ip_lib.'
'IPWrapper') as ip_wrap, \
mock.patch('os.path.isdir'), \
mock.patch('shutil.rmtree'):
device = mock.Mock()
device_name = 'port_device'
device.name = device_name
@ -144,12 +140,11 @@ class TestHaproxyNSDriver(base.BaseTestCase):
namespace='qlbaas-pool_id')
def test_remove_orphans(self):
with nested(
mock.patch.object(self.driver, 'exists'),
mock.patch.object(self.driver, 'undeploy_instance'),
mock.patch('os.listdir'),
mock.patch('os.path.exists')
) as (exists, undeploy, listdir, path_exists):
with mock.patch.object(self.driver, 'exists') as exists, \
mock.patch.object(self.driver,
'undeploy_instance') as undeploy, \
mock.patch('os.listdir') as listdir, \
mock.patch('os.path.exists'):
known = ['known1', 'known2']
unknown = ['unknown1', 'unknown2']
listdir.return_value = known + unknown
@ -161,12 +156,10 @@ class TestHaproxyNSDriver(base.BaseTestCase):
cleanup_namespace=True)
def test_exists(self):
with nested(
mock.patch.object(self.driver, '_get_state_file_path'),
mock.patch('neutron.agent.linux.ip_lib.IPWrapper'),
mock.patch('socket.socket'),
mock.patch('os.path.exists'),
) as (gsp, ip_wrap, socket, path_exists):
with mock.patch.object(self.driver, '_get_state_file_path') as gsp, \
mock.patch('neutron.agent.linux.ip_lib.IPWrapper') as ip_wrap, \
mock.patch('socket.socket'), \
mock.patch('os.path.exists') as path_exists:
gsp.side_effect = lambda x, y, z: '/pool/' + y
ip_wrap.return_value.netns.exists.return_value = True
@ -209,11 +202,9 @@ class TestHaproxyNSDriver(base.BaseTestCase):
'hrsp_3xx,hrsp_4xx,hrsp_5xx,hrsp_other,hanafail,'
'req_rate,req_rate_max,req_tot,cli_abrt,srv_abrt,'
'\n')
with nested(
mock.patch.object(self.driver, '_get_state_file_path'),
mock.patch('socket.socket'),
mock.patch('os.path.exists'),
) as (gsp, socket, path_exists):
with mock.patch.object(self.driver, '_get_state_file_path') as gsp, \
mock.patch('socket.socket') as socket, \
mock.patch('os.path.exists') as path_exists:
gsp.side_effect = lambda x, y, z: '/pool/' + y
path_exists.return_value = True
socket.return_value = socket
@ -261,11 +252,9 @@ class TestHaproxyNSDriver(base.BaseTestCase):
'subnet': {'cidr': '10.0.0.0/24',
'gateway_ip': '10.0.0.1'}}]}
test_address = '10.0.0.2'
with nested(
mock.patch('neutron.agent.linux.ip_lib.device_exists'),
mock.patch('netaddr.IPNetwork'),
mock.patch('neutron.agent.linux.ip_lib.IPWrapper'),
) as (dev_exists, ip_net, ip_wrap):
with mock.patch('neutron.agent.linux.ip_lib.device_exists') as dev_exists, \
mock.patch('netaddr.IPNetwork') as ip_net, \
mock.patch('neutron.agent.linux.ip_lib.IPWrapper') as ip_wrap:
self.vif_driver.get_device_name.return_value = 'test_interface'
dev_exists.return_value = False
ip_net.return_value = ip_net
@ -308,11 +297,9 @@ class TestHaproxyNSDriver(base.BaseTestCase):
'subnet': {'cidr': '10.0.0.0/24',
'gateway_ip': '10.0.0.1'}}]}
test_address = '10.0.0.2'
with nested(
mock.patch('neutron.agent.linux.ip_lib.device_exists'),
mock.patch('netaddr.IPNetwork'),
mock.patch('neutron.agent.linux.ip_lib.IPWrapper'),
) as (dev_exists, ip_net, ip_wrap):
with mock.patch('neutron.agent.linux.ip_lib.device_exists') as dev_exists, \
mock.patch('netaddr.IPNetwork') as ip_net, \
mock.patch('neutron.agent.linux.ip_lib.IPWrapper') as ip_wrap:
self.vif_driver.get_device_name.return_value = 'test_interface'
dev_exists.return_value = False
ip_net.return_value = ip_net
@ -332,11 +319,9 @@ class TestHaproxyNSDriver(base.BaseTestCase):
'fixed_ips': [{'ip_address': '10.0.0.2',
'subnet': {'cidr': '10.0.0.0/24'}}]}
test_address = '10.0.0.2'
with nested(
mock.patch('neutron.agent.linux.ip_lib.device_exists'),
mock.patch('netaddr.IPNetwork'),
mock.patch('neutron.agent.linux.ip_lib.IPWrapper'),
) as (dev_exists, ip_net, ip_wrap):
with mock.patch('neutron.agent.linux.ip_lib.device_exists') as dev_exists, \
mock.patch('netaddr.IPNetwork') as ip_net, \
mock.patch('neutron.agent.linux.ip_lib.IPWrapper') as ip_wrap:
self.vif_driver.get_device_name.return_value = 'test_interface'
dev_exists.return_value = False
ip_net.return_value = ip_net
@ -371,11 +356,9 @@ class TestHaproxyNSDriver(base.BaseTestCase):
[{'destination': '0.0.0.0/0',
'nexthop': '10.0.0.1'}]}}]}
test_address = '10.0.0.2'
with nested(
mock.patch('neutron.agent.linux.ip_lib.device_exists'),
mock.patch('netaddr.IPNetwork'),
mock.patch('neutron.agent.linux.ip_lib.IPWrapper'),
) as (dev_exists, ip_net, ip_wrap):
with mock.patch('neutron.agent.linux.ip_lib.device_exists') as dev_exists, \
mock.patch('netaddr.IPNetwork') as ip_net, \
mock.patch('neutron.agent.linux.ip_lib.IPWrapper') as ip_wrap:
self.vif_driver.get_device_name.return_value = 'test_interface'
dev_exists.return_value = False
ip_net.return_value = ip_net
@ -408,12 +391,12 @@ class TestHaproxyNSDriver(base.BaseTestCase):
self.vif_driver.unplug('test_interface', namespace='test_ns')
def test_kill_pids_in_file(self):
with nested(
mock.patch('os.path.exists'),
mock.patch.object(six.moves.builtins, 'open'),
mock.patch('neutron.agent.linux.utils.execute'),
mock.patch.object(namespace_driver.LOG, 'exception'),
) as (path_exists, mock_open, mock_execute, mock_log):
with mock.patch('os.path.exists') as path_exists, \
mock.patch.object(six.moves.builtins, 'open') as mock_open, \
mock.patch('neutron.agent.linux.utils.'
'execute') as mock_execute, \
mock.patch.object(namespace_driver.LOG,
'exception') as mock_log:
file_mock = mock.MagicMock()
mock_open.return_value = file_mock
file_mock.__enter__.return_value = file_mock
@ -486,10 +469,9 @@ class TestHaproxyNSDriver(base.BaseTestCase):
update.assert_called_once_with(self.fake_config)
def test_refresh_device(self):
with nested(
mock.patch.object(self.driver, 'deploy_instance'),
mock.patch.object(self.driver, 'undeploy_instance')
) as (deploy, undeploy):
with mock.patch.object(self.driver, 'deploy_instance') as deploy, \
mock.patch.object(self.driver,
'undeploy_instance') as undeploy:
pool_id = 'pool_id1'
self.driver._refresh_device(pool_id)
self.rpc_mock.get_logical_device.assert_called_once_with(pool_id)
@ -498,11 +480,10 @@ class TestHaproxyNSDriver(base.BaseTestCase):
self.assertFalse(undeploy.called)
def test_refresh_device_not_deployed(self):
with nested(
mock.patch.object(self.driver, 'deploy_instance'),
mock.patch.object(self.driver, 'exists'),
mock.patch.object(self.driver, 'undeploy_instance')
) as (deploy, exists, undeploy):
with mock.patch.object(self.driver, 'deploy_instance') as deploy, \
mock.patch.object(self.driver, 'exists') as exists, \
mock.patch.object(self.driver,
'undeploy_instance') as undeploy:
pool_id = 'pool_id1'
deploy.return_value = False
exists.return_value = True
@ -510,11 +491,10 @@ class TestHaproxyNSDriver(base.BaseTestCase):
undeploy.assert_called_once_with(pool_id)
def test_refresh_device_non_existing(self):
with nested(
mock.patch.object(self.driver, 'deploy_instance'),
mock.patch.object(self.driver, 'exists'),
mock.patch.object(self.driver, 'undeploy_instance')
) as (deploy, exists, undeploy):
with mock.patch.object(self.driver, 'deploy_instance') as deploy, \
mock.patch.object(self.driver, 'exists') as exists, \
mock.patch.object(self.driver,
'undeploy_instance') as undeploy:
pool_id = 'pool_id1'
deploy.return_value = False
exists.return_value = False
@ -547,19 +527,15 @@ class TestHaproxyNSDriver(base.BaseTestCase):
refresh.assert_called_once_with('1')
def test_delete_pool_existing(self):
with nested(
mock.patch.object(self.driver, 'undeploy_instance'),
mock.patch.object(self.driver, 'exists'),
) as (undeploy, exists):
with mock.patch.object(self.driver, 'undeploy_instance') as undeploy, \
mock.patch.object(self.driver, 'exists') as exists:
exists.return_value = True
self.driver.delete_pool({'id': '1'})
undeploy.assert_called_once_with('1', delete_namespace=True)
def test_delete_pool_non_existing(self):
with nested(
mock.patch.object(self.driver, 'undeploy_instance'),
mock.patch.object(self.driver, 'exists'),
) as (undeploy, exists):
with mock.patch.object(self.driver, 'undeploy_instance') as undeploy, \
mock.patch.object(self.driver, 'exists') as exists:
exists.return_value = False
self.driver.delete_pool({'id': '1'})
self.assertFalse(undeploy.called)

View File

@ -22,7 +22,6 @@ from neutron_lbaas.db.loadbalancer import loadbalancer_db
from neutron_lbaas.services.loadbalancer.drivers.netscaler import ncc_client
from neutron_lbaas.services.loadbalancer.drivers.netscaler \
import netscaler_driver
from neutron_lbaas.tests import nested
from neutron_lbaas.tests.unit.db.loadbalancer import test_db_loadbalancer
@ -93,10 +92,9 @@ class TestNetScalerPluginDriver(TestLoadBalancerPluginBase):
self.context = context.get_admin_context()
def test_create_vip(self):
with nested(
self.subnet(),
mock.patch.object(self.driver.plugin._core_plugin, 'get_subnet')
) as (subnet, mock_get_subnet):
with self.subnet() as subnet, \
mock.patch.object(self.driver.plugin._core_plugin,
'get_subnet') as mock_get_subnet:
mock_get_subnet.return_value = subnet['subnet']
with self.pool(provider=LBAAS_PROVIDER_NAME) as pool:
testvip = self._build_testvip_contents(subnet['subnet'],
@ -125,10 +123,9 @@ class TestNetScalerPluginDriver(TestLoadBalancerPluginBase):
constants.ACTIVE)
def test_create_vip_without_connection(self):
with nested(
self.subnet(),
mock.patch.object(self.driver.plugin._core_plugin, 'get_subnet')
) as (subnet, mock_get_subnet):
with self.subnet() as subnet, \
mock.patch.object(self.driver.plugin._core_plugin,
'get_subnet') as mock_get_subnet:
mock_get_subnet.return_value = subnet['subnet']
with self.pool(provider=LBAAS_PROVIDER_NAME) as pool:
testvip = self._build_testvip_contents(subnet['subnet'],
@ -160,10 +157,9 @@ class TestNetScalerPluginDriver(TestLoadBalancerPluginBase):
constants.ERROR)
def test_update_vip(self):
with nested(
self.subnet(),
mock.patch.object(self.driver.plugin._core_plugin, 'get_subnet')
) as (subnet, mock_get_subnet):
with self.subnet() as subnet, \
mock.patch.object(self.driver.plugin._core_plugin,
'get_subnet') as mock_get_subnet:
mock_get_subnet.return_value = subnet['subnet']
with self.pool(provider=LBAAS_PROVIDER_NAME) as pool:
with self.vip(pool=pool, subnet=subnet) as vip:
@ -201,16 +197,14 @@ class TestNetScalerPluginDriver(TestLoadBalancerPluginBase):
constants.ACTIVE)
def test_delete_vip(self):
with nested(
self.subnet(),
mock.patch.object(self.driver.plugin._core_plugin, 'get_subnet')
) as (subnet, mock_get_subnet):
with self.subnet() as subnet, \
mock.patch.object(self.driver.plugin._core_plugin,
'get_subnet') as mock_get_subnet:
mock_get_subnet.return_value = subnet['subnet']
with self.pool(provider=LBAAS_PROVIDER_NAME) as pool:
with nested(
self.vip(pool=pool, subnet=subnet),
mock.patch.object(self.driver.plugin, '_delete_db_vip')
) as (vip, mock_delete_db_vip):
with self.vip(pool=pool, subnet=subnet) as vip, \
mock.patch.object(self.driver.plugin,
'_delete_db_vip') as mock_delete_db_vip:
mock_delete_db_vip.return_value = None
#reset the remove_resource() mock
self.remove_resource_mock.reset_mock()
@ -225,12 +219,13 @@ class TestNetScalerPluginDriver(TestLoadBalancerPluginBase):
.assert_called_once_with(None, vip_resource_path))
def test_create_pool(self):
with nested(
self.subnet(),
mock.patch.object(self.driver.plugin._core_plugin, 'get_subnet'),
mock.patch.object(self.driver.plugin._core_plugin, 'get_ports'),
mock.patch.object(self.driver.plugin._core_plugin, 'create_port')
) as (subnet, mock_get_subnet, mock_get_ports, mock_create_port):
with self.subnet() as subnet, \
mock.patch.object(self.driver.plugin._core_plugin,
'get_subnet') as mock_get_subnet, \
mock.patch.object(self.driver.plugin._core_plugin,
'get_ports') as mock_get_ports, \
mock.patch.object(self.driver.plugin._core_plugin,
'create_port') as mock_create_port:
mock_get_subnet.return_value = subnet['subnet']
mock_get_ports.return_value = None
mock_create_port.return_value = TESTPOOL_SNAT_PORT
@ -258,12 +253,13 @@ class TestNetScalerPluginDriver(TestLoadBalancerPluginBase):
constants.ACTIVE)
def test_create_pool_with_error(self):
with nested(
self.subnet(),
mock.patch.object(self.driver.plugin._core_plugin, 'get_subnet'),
mock.patch.object(self.driver.plugin._core_plugin, 'get_ports'),
mock.patch.object(self.driver.plugin._core_plugin, 'create_port')
) as (subnet, mock_get_subnet, mock_get_ports, mock_create_port):
with self.subnet() as subnet, \
mock.patch.object(self.driver.plugin._core_plugin,
'get_subnet') as mock_get_subnet, \
mock.patch.object(self.driver.plugin._core_plugin,
'get_ports') as mock_get_ports, \
mock.patch.object(self.driver.plugin._core_plugin,
'create_port') as mock_create_port:
mock_get_subnet.return_value = subnet['subnet']
mock_get_ports.return_value = None
mock_create_port.return_value = TESTPOOL_SNAT_PORT
@ -294,12 +290,13 @@ class TestNetScalerPluginDriver(TestLoadBalancerPluginBase):
constants.ERROR)
def test_create_pool_with_snatportcreate_failure(self):
with nested(
self.subnet(),
mock.patch.object(self.driver.plugin._core_plugin, 'get_subnet'),
mock.patch.object(self.driver.plugin._core_plugin, 'get_ports'),
mock.patch.object(self.driver.plugin._core_plugin, 'create_port')
) as (subnet, mock_get_subnet, mock_get_ports, mock_create_port):
with self.subnet() as subnet, \
mock.patch.object(self.driver.plugin._core_plugin,
'get_subnet') as mock_get_subnet, \
mock.patch.object(self.driver.plugin._core_plugin,
'get_ports') as mock_get_ports, \
mock.patch.object(self.driver.plugin._core_plugin,
'create_port') as mock_create_port:
mock_get_subnet.return_value = subnet['subnet']
mock_get_ports.return_value = None
mock_create_port.side_effect = exceptions.NeutronException()
@ -312,10 +309,9 @@ class TestNetScalerPluginDriver(TestLoadBalancerPluginBase):
self.context, testpool)
def test_update_pool(self):
with nested(
self.subnet(),
mock.patch.object(self.driver.plugin._core_plugin, 'get_subnet')
) as (subnet, mock_get_subnet):
with self.subnet() as subnet, \
mock.patch.object(self.driver.plugin._core_plugin,
'get_subnet') as mock_get_subnet:
mock_get_subnet.return_value = subnet['subnet']
with self.pool(provider=LBAAS_PROVIDER_NAME) as pool:
updated_pool = self._build_updated_testpool_contents(
@ -349,23 +345,19 @@ class TestNetScalerPluginDriver(TestLoadBalancerPluginBase):
constants.ACTIVE)
def test_delete_pool(self):
with nested(
self.subnet(),
mock.patch.object(self.driver.plugin._core_plugin, 'get_subnet')
) as (subnet, mock_get_subnet):
with self.subnet() as subnet, \
mock.patch.object(self.driver.plugin._core_plugin,
'get_subnet') as mock_get_subnet:
mock_get_subnet.return_value = subnet['subnet']
with nested(
self.pool(provider=LBAAS_PROVIDER_NAME),
mock.patch.object(self.driver.plugin._core_plugin,
'delete_port'),
mock.patch.object(self.driver.plugin._core_plugin,
'get_ports'),
mock.patch.object(self.driver.plugin,
'get_pools'),
mock.patch.object(self.driver.plugin,
'_delete_db_pool')
) as (pool, mock_delete_port, mock_get_ports, mock_get_pools,
mock_delete_db_pool):
with self.pool(provider=LBAAS_PROVIDER_NAME) as pool, \
mock.patch.object(self.driver.plugin._core_plugin,
'delete_port') as mock_delete_port, \
mock.patch.object(self.driver.plugin._core_plugin,
'get_ports') as mock_get_ports, \
mock.patch.object(self.driver.plugin,
'get_pools') as mock_get_pools, \
mock.patch.object(self.driver.plugin,
'_delete_db_pool') as mock_delete_db_pool:
mock_delete_port.return_value = None
mock_get_ports.return_value = [{'id': TESTPOOL_PORT_ID}]
mock_get_pools.return_value = []
@ -383,11 +375,9 @@ class TestNetScalerPluginDriver(TestLoadBalancerPluginBase):
.assert_called_once_with(None, pool_resource_path))
def test_create_member(self):
with nested(
self.subnet(),
mock.patch.object(self.driver.plugin._core_plugin,
'get_subnet')
) as (subnet, mock_get_subnet):
with self.subnet() as subnet, \
mock.patch.object(self.driver.plugin._core_plugin,
'get_subnet') as mock_get_subnet:
mock_get_subnet.return_value = subnet['subnet']
with self.pool(provider=LBAAS_PROVIDER_NAME) as pool:
testmember = self._build_testmember_contents(pool['pool'])
@ -415,10 +405,9 @@ class TestNetScalerPluginDriver(TestLoadBalancerPluginBase):
constants.ACTIVE)
def test_update_member(self):
with nested(
self.subnet(),
mock.patch.object(self.driver.plugin._core_plugin, 'get_subnet')
) as (subnet, mock_get_subnet):
with self.subnet() as subnet, \
mock.patch.object(self.driver.plugin._core_plugin,
'get_subnet') as mock_get_subnet:
mock_get_subnet.return_value = subnet['subnet']
with self.pool(provider=LBAAS_PROVIDER_NAME) as pool:
with self.member(pool_id=pool['pool']['id']) as member:
@ -454,16 +443,14 @@ class TestNetScalerPluginDriver(TestLoadBalancerPluginBase):
constants.ACTIVE)
def test_delete_member(self):
with nested(
self.subnet(),
mock.patch.object(self.driver.plugin._core_plugin, 'get_subnet')
) as (subnet, mock_get_subnet):
with self.subnet() as subnet, \
mock.patch.object(self.driver.plugin._core_plugin,
'get_subnet') as mock_get_subnet:
mock_get_subnet.return_value = subnet['subnet']
with self.pool(provider=LBAAS_PROVIDER_NAME) as pool:
with nested(
self.member(pool_id=pool['pool']['id']),
mock.patch.object(self.driver.plugin, '_delete_db_member')
) as (member, mock_delete_db_member):
with self.member(pool_id=pool['pool']['id']) as member, \
mock.patch.object(self.driver.plugin,
'_delete_db_member') as mock_delete_db_member:
mock_delete_db_member.return_value = None
# reset the remove_resource() mock
self.remove_resource_mock.reset_mock()
@ -479,10 +466,9 @@ class TestNetScalerPluginDriver(TestLoadBalancerPluginBase):
.assert_called_once_with(None, member_resource_path))
def test_create_pool_health_monitor(self):
with nested(
self.subnet(),
mock.patch.object(self.driver.plugin._core_plugin, 'get_subnet')
) as (subnet, mock_get_subnet):
with self.subnet() as subnet, \
mock.patch.object(self.driver.plugin._core_plugin,
'get_subnet') as mock_get_subnet:
mock_get_subnet.return_value = subnet['subnet']
with self.pool(provider=LBAAS_PROVIDER_NAME) as pool:
testhealthmonitor = self._build_testhealthmonitor_contents(
@ -519,10 +505,9 @@ class TestNetScalerPluginDriver(TestLoadBalancerPluginBase):
constants.ACTIVE, ""))
def test_update_pool_health_monitor(self):
with nested(
self.subnet(),
mock.patch.object(self.driver.plugin._core_plugin, 'get_subnet')
) as (subnet, mock_get_subnet):
with self.subnet() as subnet, \
mock.patch.object(self.driver.plugin._core_plugin,
'get_subnet') as mock_get_subnet:
mock_get_subnet.return_value = subnet['subnet']
with self.pool(provider=LBAAS_PROVIDER_NAME) as pool:
with self.health_monitor(
@ -562,17 +547,16 @@ class TestNetScalerPluginDriver(TestLoadBalancerPluginBase):
constants.ACTIVE, ""))
def test_delete_pool_health_monitor(self):
with nested(
self.subnet(),
mock.patch.object(self.driver.plugin._core_plugin, 'get_subnet')
) as (subnet, mock_get_subnet):
with self.subnet() as subnet, \
mock.patch.object(self.driver.plugin._core_plugin,
'get_subnet') as mock_get_subnet:
mock_get_subnet.return_value = subnet['subnet']
with self.pool(provider=LBAAS_PROVIDER_NAME) as pool:
with nested(
self.health_monitor(pool_id=pool['pool']['id']),
mock.patch.object(self.driver.plugin,
'_delete_db_pool_health_monitor')
) as (health_monitor, mock_delete_db_monitor):
with self.health_monitor(
pool_id=pool['pool']['id']) as health_monitor, \
mock.patch.object(self.driver.plugin,
'_delete_db_pool_health_monitor') as \
mock_delete_db_monitor:
mock_delete_db_monitor.return_value = None
# reset the remove_resource() mock
self.remove_resource_mock.reset_mock()

View File

@ -27,7 +27,6 @@ from neutron_lbaas.extensions import loadbalancer
from neutron_lbaas.services.loadbalancer.drivers.radware import driver
from neutron_lbaas.services.loadbalancer.drivers.radware \
import exceptions as r_exc
from neutron_lbaas.tests import nested
from neutron_lbaas.tests.unit.db.loadbalancer import test_db_loadbalancer
GET_200 = ('/api/workflow/', '/api/service/', '/api/workflowTemplate')
@ -515,15 +514,14 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
with self.pool(do_delete=False,
provider='radware',
subnet_id=subnet['subnet']['id']) as pool:
with nested(
self.member(pool_id=pool['pool']['id'],
do_delete=False),
self.member(pool_id=pool['pool']['id'],
address='192.168.1.101',
do_delete=False),
self.health_monitor(do_delete=False),
self.vip(pool=pool, subnet=subnet, do_delete=False)
) as (mem1, mem2, hm, vip):
with self.member(pool_id=pool['pool']['id'],
do_delete=False) as mem1, \
self.member(pool_id=pool['pool']['id'],
address='192.168.1.101',
do_delete=False) as mem2, \
self.health_monitor(do_delete=False) as hm, \
self.vip(pool=pool, subnet=subnet,
do_delete=False) as vip:
plugin.create_pool_health_monitor(
context.get_admin_context(), hm, pool['pool']['id']
@ -682,136 +680,122 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
calls, any_order=True)
def test_create_member_on_different_subnets(self):
with nested(
self.subnet(),
self.subnet(cidr='20.0.0.0/24'),
self.subnet(cidr='30.0.0.0/24')
) as (vip_sub, pool_sub, member_sub):
with self.pool(provider='radware',
subnet_id=pool_sub['subnet']['id']) as pool:
with nested(
self.port(subnet=vip_sub,
fixed_ips=[{'ip_address': '10.0.0.2'}]),
self.port(subnet=pool_sub,
fixed_ips=[{'ip_address': '20.0.0.2'}]),
self.port(subnet=member_sub,
fixed_ips=[{'ip_address': '30.0.0.2'}])
):
with nested(
self.member(pool_id=pool['pool']['id'],
address='10.0.0.2'),
self.member(pool_id=pool['pool']['id'],
address='20.0.0.2'),
self.member(pool_id=pool['pool']['id'],
address='30.0.0.2')
) as (member_vip, member_pool, member_out):
with self.vip(pool=pool, subnet=vip_sub):
calls = [
mock.call(
'POST', '/api/workflow/' +
pool['pool']['id'] +
'/action/BaseCreate',
mock.ANY, driver.TEMPLATE_HEADER
)
]
self.driver_rest_call_mock.assert_has_calls(
calls, any_order=True)
with self.subnet() as vip_sub, \
self.subnet(cidr='20.0.0.0/24') as pool_sub, \
self.subnet(cidr='30.0.0.0/24') as member_sub, \
self.pool(provider='radware',
subnet_id=pool_sub['subnet']['id']) as pool, \
self.port(subnet=vip_sub,
fixed_ips=[{'ip_address': '10.0.0.2'}]), \
self.port(subnet=pool_sub,
fixed_ips=[{'ip_address': '20.0.0.2'}]), \
self.port(subnet=member_sub,
fixed_ips=[{'ip_address': '30.0.0.2'}]), \
self.member(pool_id=pool['pool']['id'], address='10.0.0.2'), \
self.member(pool_id=pool['pool']['id'], address='20.0.0.2'), \
self.member(pool_id=pool['pool']['id'], address='30.0.0.2'), \
self.vip(pool=pool, subnet=vip_sub):
calls = [
mock.call(
'POST', '/api/workflow/' +
pool['pool']['id'] +
'/action/BaseCreate',
mock.ANY, driver.TEMPLATE_HEADER
)
]
self.driver_rest_call_mock.assert_has_calls(
calls, any_order=True)
mock_calls = self.driver_rest_call_mock.mock_calls
params = mock_calls[-2][1][2]['parameters']
member_subnet_array = params['member_subnet_array']
member_mask_array = params['member_mask_array']
member_gw_array = params['member_gw_array']
self.assertEqual(['10.0.0.0',
'255.255.255.255',
'30.0.0.0'],
member_subnet_array)
self.assertEqual(['255.255.255.0',
'255.255.255.255',
'255.255.255.0'],
member_mask_array)
self.assertEqual(
[pool_sub['subnet']['gateway_ip'],
'255.255.255.255',
pool_sub['subnet']['gateway_ip']],
member_gw_array)
mock_calls = self.driver_rest_call_mock.mock_calls
params = mock_calls[-2][1][2]['parameters']
member_subnet_array = params['member_subnet_array']
member_mask_array = params['member_mask_array']
member_gw_array = params['member_gw_array']
self.assertEqual(['10.0.0.0',
'255.255.255.255',
'30.0.0.0'],
member_subnet_array)
self.assertEqual(['255.255.255.0',
'255.255.255.255',
'255.255.255.0'],
member_mask_array)
self.assertEqual(
[pool_sub['subnet']['gateway_ip'],
'255.255.255.255',
pool_sub['subnet']['gateway_ip']],
member_gw_array)
def test_create_member_on_different_subnet_no_port(self):
with nested(
self.subnet(),
self.subnet(cidr='20.0.0.0/24'),
self.subnet(cidr='30.0.0.0/24')
) as (vip_sub, pool_sub, member_sub):
with self.pool(provider='radware',
subnet_id=pool_sub['subnet']['id']) as pool:
with self.member(pool_id=pool['pool']['id'],
address='30.0.0.2'):
with self.vip(pool=pool, subnet=vip_sub):
calls = [
mock.call(
'POST', '/api/workflow/' +
pool['pool']['id'] +
'/action/BaseCreate',
mock.ANY, driver.TEMPLATE_HEADER
)
]
self.driver_rest_call_mock.assert_has_calls(
calls, any_order=True)
with self.subnet() as vip_sub, \
self.subnet(cidr='20.0.0.0/24') as pool_sub, \
self.subnet(cidr='30.0.0.0/24'), \
self.pool(provider='radware',
subnet_id=pool_sub['subnet']['id']) as pool, \
self.member(pool_id=pool['pool']['id'],
address='30.0.0.2'), \
self.vip(pool=pool, subnet=vip_sub):
calls = [
mock.call(
'POST', '/api/workflow/' +
pool['pool']['id'] +
'/action/BaseCreate',
mock.ANY, driver.TEMPLATE_HEADER
)
]
self.driver_rest_call_mock.assert_has_calls(
calls, any_order=True)
mock_calls = self.driver_rest_call_mock.mock_calls
params = mock_calls[-2][1][2]['parameters']
member_subnet_array = params['member_subnet_array']
member_mask_array = params['member_mask_array']
member_gw_array = params['member_gw_array']
self.assertEqual(['30.0.0.2'],
member_subnet_array)
self.assertEqual(['255.255.255.255'],
member_mask_array)
self.assertEqual([pool_sub['subnet']['gateway_ip']],
member_gw_array)
mock_calls = self.driver_rest_call_mock.mock_calls
params = mock_calls[-2][1][2]['parameters']
member_subnet_array = params['member_subnet_array']
member_mask_array = params['member_mask_array']
member_gw_array = params['member_gw_array']
self.assertEqual(['30.0.0.2'],
member_subnet_array)
self.assertEqual(['255.255.255.255'],
member_mask_array)
self.assertEqual([pool_sub['subnet']['gateway_ip']],
member_gw_array)
def test_create_member_on_different_subnet_multiple_ports(self):
cfg.CONF.set_override("allow_overlapping_ips", 'true')
with self.network() as other_net:
with nested(
self.subnet(),
self.subnet(cidr='20.0.0.0/24'),
self.subnet(cidr='30.0.0.0/24'),
self.subnet(network=other_net, cidr='30.0.0.0/24')
) as (vip_sub, pool_sub, member_sub1, member_sub2):
with self.pool(provider='radware',
subnet_id=pool_sub['subnet']['id']) as pool:
with nested(
self.port(subnet=member_sub1,
fixed_ips=[{'ip_address': '30.0.0.2'}]),
self.port(subnet=member_sub2,
fixed_ips=[{'ip_address': '30.0.0.2'}])):
with self.member(pool_id=pool['pool']['id'],
address='30.0.0.2'):
with self.vip(pool=pool, subnet=vip_sub):
calls = [
mock.call(
'POST', '/api/workflow/' +
pool['pool']['id'] +
'/action/BaseCreate',
mock.ANY, driver.TEMPLATE_HEADER
)
]
self.driver_rest_call_mock.assert_has_calls(
calls, any_order=True)
with self.network() as other_net, \
self.subnet() as vip_sub, \
self.subnet(cidr='20.0.0.0/24') as pool_sub, \
self.subnet(cidr='30.0.0.0/24') as member_sub1, \
self.subnet(network=other_net, cidr='30.0.0.0/24') as member_sub2, \
self.pool(provider='radware',
subnet_id=pool_sub['subnet']['id']) as pool, \
self.port(subnet=member_sub1,
fixed_ips=[{'ip_address': '30.0.0.2'}]), \
self.port(subnet=member_sub2,
fixed_ips=[{'ip_address': '30.0.0.2'}]), \
self.member(pool_id=pool['pool']['id'],
address='30.0.0.2'), \
self.vip(pool=pool, subnet=vip_sub):
calls = [
mock.call(
'POST', '/api/workflow/' +
pool['pool']['id'] +
'/action/BaseCreate',
mock.ANY, driver.TEMPLATE_HEADER
)
]
self.driver_rest_call_mock.assert_has_calls(
calls, any_order=True)
calls = self.driver_rest_call_mock.mock_calls
params = calls[-2][1][2]['parameters']
m_sub_array = params['member_subnet_array']
m_mask_array = params['member_mask_array']
m_gw_array = params['member_gw_array']
self.assertEqual(['30.0.0.2'],
m_sub_array)
self.assertEqual(['255.255.255.255'],
m_mask_array)
self.assertEqual(
[pool_sub['subnet']['gateway_ip']],
m_gw_array)
calls = self.driver_rest_call_mock.mock_calls
params = calls[-2][1][2]['parameters']
m_sub_array = params['member_subnet_array']
m_mask_array = params['member_mask_array']
m_gw_array = params['member_gw_array']
self.assertEqual(['30.0.0.2'],
m_sub_array)
self.assertEqual(['255.255.255.255'],
m_mask_array)
self.assertEqual(
[pool_sub['subnet']['gateway_ip']],
m_gw_array)
def test_update_member_with_vip(self):
with self.subnet() as subnet:

View File

@ -29,7 +29,6 @@ from neutron_lbaas.extensions import loadbalancer
from neutron_lbaas.services.loadbalancer.drivers.common \
import agent_driver_base
from neutron_lbaas.tests import base
from neutron_lbaas.tests import nested
from neutron_lbaas.tests.unit.db.loadbalancer import test_db_loadbalancer
@ -409,10 +408,8 @@ class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase):
self.assertTrue(mock_log.warning.called)
def test_update_status_health_monitor(self):
with nested(
self.health_monitor(),
self.pool()
) as (hm, pool):
with self.health_monitor() as hm, \
self.pool() as pool:
pool_id = pool['pool']['id']
ctx = context.get_admin_context()
self.plugin_instance.create_pool_health_monitor(ctx, hm, pool_id)
@ -438,12 +435,8 @@ class TestLoadBalancerAgentApi(base.BaseTestCase):
self.assertEqual('topic', self.api.client.target.topic)
def _call_test_helper(self, method_name, method_args):
with nested(
mock.patch.object(self.api.client, 'cast'),
mock.patch.object(self.api.client, 'prepare'),
) as (
rpc_mock, prepare_mock
):
with mock.patch.object(self.api.client, 'cast') as rpc_mock, \
mock.patch.object(self.api.client, 'prepare') as prepare_mock:
prepare_mock.return_value = self.api.client
getattr(self.api, method_name)(mock.sentinel.context,
host='host',
@ -669,10 +662,8 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
mock.ANY, member['member'], 'host')
def test_create_pool_health_monitor(self):
with nested(
self.health_monitor(),
self.pool(),
) as (hm, pool):
with self.health_monitor() as hm, \
self.pool() as pool:
pool_id = pool['pool']['id']
ctx = context.get_admin_context()
self.plugin_instance.create_pool_health_monitor(ctx, hm, pool_id)
@ -683,10 +674,8 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
mock.ANY, hm, pool_id, 'host')
def test_delete_pool_health_monitor(self):
with nested(
self.pool(),
self.health_monitor()
) as (pool, hm):
with self.pool() as pool, \
self.health_monitor() as hm:
pool_id = pool['pool']['id']
ctx = context.get_admin_context()
self.plugin_instance.create_pool_health_monitor(ctx, hm, pool_id)
@ -700,10 +689,8 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
mock.ANY, hm, pool_id, 'host')
def test_update_health_monitor_associated_with_pool(self):
with nested(
self.health_monitor(type='HTTP'),
self.pool()
) as (monitor, pool):
with self.health_monitor(type='HTTP') as monitor, \
self.pool() as pool:
data = {
'health_monitor': {
'id': monitor['health_monitor']['id'],

View File

@ -21,7 +21,6 @@ from neutron.plugins.common import constants
from neutron import manager
from neutron_lbaas.db.loadbalancer import loadbalancer_db as lb_db
from neutron_lbaas.services.loadbalancer.drivers.vmware import db
from neutron_lbaas.tests import nested
from neutron_lbaas.tests.unit.db.loadbalancer import test_db_loadbalancer
@ -65,10 +64,10 @@ class TestEdgeLoadBalancerPlugin(TestLoadBalancerPluginBase):
def test_create_pool_successful(self):
pool = {'id': POOL_ID}
with nested(
mock.patch.object(db, 'add_nsxv_edge_pool_mapping'),
mock.patch.object(self.edge_driver, 'pool_successful')
) as (mock_add_pool, mock_pool_successful):
with mock.patch.object(db,
'add_nsxv_edge_pool_mapping') as mock_add_pool, \
mock.patch.object(self.edge_driver,
'pool_successful') as mock_pool_successful:
self.edge_driver.create_pool_successful(self.context,
pool,
EDGE_ID, EDGE_POOL_ID)
@ -79,10 +78,10 @@ class TestEdgeLoadBalancerPlugin(TestLoadBalancerPluginBase):
def test_delete_pool_successful(self):
pool = {'id': POOL_ID}
with nested(
mock.patch.object(self.service_plugin, '_delete_db_pool'),
mock.patch.object(db, 'delete_nsxv_edge_pool_mapping')
) as (mock_del_db_pool, mock_del_mapping):
with mock.patch.object(self.service_plugin,
'_delete_db_pool') as mock_del_db_pool, \
mock.patch.object(db, 'delete_nsxv_edge_pool_mapping') as \
mock_del_mapping:
self.edge_driver.delete_pool_successful(self.context, pool)
mock_del_db_pool.assert_called_with(self.context, POOL_ID)
mock_del_mapping.assert_called_with(self.context, POOL_ID)
@ -139,12 +138,10 @@ class TestEdgeLoadBalancerPlugin(TestLoadBalancerPluginBase):
mapping = {'edge_id': EDGE_ID, 'edge_pool_id': EDGE_POOL_ID}
with nested(
mock.patch.object(db, 'get_nsxv_edge_pool_mapping'),
mock.patch.object(self.service_plugin._core_plugin.nsx_v,
'update_pool')
) as (mock_get_mapping, mock_update_pool):
with mock.patch.object(db, 'get_nsxv_edge_pool_'
'mapping') as mock_get_mapping, \
mock.patch.object(self.service_plugin._core_plugin.nsx_v,
'update_pool') as mock_update_pool:
mock_get_mapping.return_value = mapping
self.edge_driver.update_pool(self.context, from_pool, to_pool)
mock_update_pool.assert_called_with(self.context, from_pool,
@ -160,14 +157,12 @@ class TestEdgeLoadBalancerPlugin(TestLoadBalancerPluginBase):
'health_monitors_status': [], 'provider': 'vmwareedge'}
mapping = {'edge_id': EDGE_ID, 'edge_pool_id': EDGE_POOL_ID}
with nested(
mock.patch.object(db, 'get_nsxv_edge_pool_mapping'),
mock.patch.object(self.service_plugin, 'get_pool',
return_value={}),
mock.patch.object(self.service_plugin._core_plugin.nsx_v,
'delete_pool')
) as (mock_get_mapping, mock_get_pool, mock_delete_pool):
with mock.patch.object(db, 'get_nsxv_edge_pool_'
'mapping') as mock_get_mapping, \
mock.patch.object(self.service_plugin, 'get_pool',
return_value={}), \
mock.patch.object(self.service_plugin._core_plugin.nsx_v,
'delete_pool') as mock_delete_pool:
mock_get_mapping.return_value = mapping
self.edge_driver.delete_pool(self.context, lbaas_pool)
mock_delete_pool.assert_called_with(self.context, lbaas_pool,
@ -175,11 +170,10 @@ class TestEdgeLoadBalancerPlugin(TestLoadBalancerPluginBase):
def test_create_vip_successful(self):
vip = {'pool_id': POOL_ID}
with nested(
mock.patch.object(db, 'add_nsxv_edge_vip_mapping'),
mock.patch.object(self.edge_driver, 'vip_successful')
) as (mock_add_vip_mapping, mock_vip_successful):
with mock.patch.object(db, 'add_nsxv_edge_vip_'
'mapping') as mock_add_vip_mapping, \
mock.patch.object(self.edge_driver,
'vip_successful') as mock_vip_successful:
self.edge_driver.create_vip_successful(
self.context, vip, EDGE_ID, APP_PROFILE_ID, EDGE_VSE_ID,
EDGE_FW_RULE_ID)
@ -191,11 +185,10 @@ class TestEdgeLoadBalancerPlugin(TestLoadBalancerPluginBase):
def test_delete_vip_successful(self):
vip = {'pool_id': POOL_ID, 'id': VIP_ID}
with nested(
mock.patch.object(db, 'delete_nsxv_edge_vip_mapping'),
mock.patch.object(self.service_plugin, '_delete_db_vip')
) as (mock_del_vip_mapping, mock_del_vip):
with mock.patch.object(db, 'delete_nsxv_edge_vip_'
'mapping') as mock_del_vip_mapping, \
mock.patch.object(self.service_plugin,
'_delete_db_vip') as mock_del_vip:
self.edge_driver.delete_vip_successful(self.context, vip)
mock_del_vip_mapping.assert_called_with(self.context, POOL_ID)
mock_del_vip.assert_called_with(self.context, VIP_ID)
@ -227,11 +220,9 @@ class TestEdgeLoadBalancerPlugin(TestLoadBalancerPluginBase):
'session_persistence': {'type': 'SOURCE_IP'}}
mapping = {'edge_id': EDGE_ID, 'edge_pool_id': EDGE_POOL_ID}
with nested(
mock.patch.object(db, 'get_nsxv_edge_pool_mapping'),
mock.patch.object(self.service_plugin._core_plugin.nsx_v,
'create_vip')
) as (mock_get_mapping, mock_create_vip):
with mock.patch.object(db, 'get_nsxv_edge_pool_mapping') as mock_get_mapping, \
mock.patch.object(self.service_plugin._core_plugin.nsx_v,
'create_vip') as mock_create_vip:
mock_get_mapping.return_value = mapping
self.edge_driver.create_vip(self.context, lbaas_vip)
mock_create_vip.assert_called_with(self.context, lbaas_vip,
@ -258,13 +249,10 @@ class TestEdgeLoadBalancerPlugin(TestLoadBalancerPluginBase):
vip_mapping = {'edge_id': EDGE_ID, 'edge_vse_id': EDGE_VSE_ID,
'edge_app_profile_id': APP_PROFILE_ID}
with nested(
mock.patch.object(db, 'get_nsxv_edge_pool_mapping'),
mock.patch.object(db, 'get_nsxv_edge_vip_mapping'),
mock.patch.object(self.service_plugin._core_plugin.nsx_v,
'update_vip')
) as (mock_get_pool_mapping, mock_get_vip_mapping, mock_upd_vip):
with mock.patch.object(db, 'get_nsxv_edge_pool_mapping') as mock_get_pool_mapping, \
mock.patch.object(db, 'get_nsxv_edge_vip_mapping') as mock_get_vip_mapping, \
mock.patch.object(self.service_plugin._core_plugin.nsx_v,
'update_vip') as mock_upd_vip:
mock_get_pool_mapping.return_value = pool_mapping
mock_get_vip_mapping.return_value = vip_mapping
self.edge_driver.update_vip(self.context, vip_from, vip_to)
@ -283,12 +271,9 @@ class TestEdgeLoadBalancerPlugin(TestLoadBalancerPluginBase):
'edge_app_profile_id': APP_PROFILE_ID,
'edge_fw_rule_id': EDGE_FW_RULE_ID}
with nested(
mock.patch.object(db, 'get_nsxv_edge_vip_mapping'),
mock.patch.object(self.service_plugin._core_plugin.nsx_v,
'delete_vip')
) as (mock_get_mapping, mock_del_vip):
with mock.patch.object(db, 'get_nsxv_edge_vip_mapping') as mock_get_mapping, \
mock.patch.object(self.service_plugin._core_plugin.nsx_v,
'delete_vip') as mock_del_vip:
mock_get_mapping.return_value = mapping
self.edge_driver.delete_vip(self.context, lbaas_vip)
mock_del_vip.assert_called_with(self.context, lbaas_vip, mapping)
@ -317,12 +302,9 @@ class TestEdgeLoadBalancerPlugin(TestLoadBalancerPluginBase):
'pool_id': POOL_ID}
mapping = {'edge_id': EDGE_ID, 'edge_pool_id': EDGE_POOL_ID}
with nested(
mock.patch.object(db, 'get_nsxv_edge_pool_mapping'),
mock.patch.object(self.service_plugin._core_plugin.nsx_v,
'create_member')
) as (mock_get_mapping, mock_create_member):
with mock.patch.object(db, 'get_nsxv_edge_pool_mapping') as mock_get_mapping, \
mock.patch.object(self.service_plugin._core_plugin.nsx_v,
'create_member') as mock_create_member:
mock_get_mapping.return_value = mapping
self.edge_driver.create_member(self.context, lbaas_member)
mock_create_member.assert_called_with(self.context, lbaas_member,
@ -341,12 +323,9 @@ class TestEdgeLoadBalancerPlugin(TestLoadBalancerPluginBase):
'pool_id': POOL_ID}
mapping = {'edge_id': EDGE_ID, 'edge_pool_id': EDGE_POOL_ID}
with nested(
mock.patch.object(db, 'get_nsxv_edge_pool_mapping'),
mock.patch.object(self.service_plugin._core_plugin.nsx_v,
'update_member')
) as (mock_get_mapping, mock_update_member):
with mock.patch.object(db, 'get_nsxv_edge_pool_mapping') as mock_get_mapping, \
mock.patch.object(self.service_plugin._core_plugin.nsx_v,
'update_member') as mock_update_member:
mock_get_mapping.return_value = mapping
self.edge_driver.update_member(self.context, member_from,
member_to)
@ -361,12 +340,9 @@ class TestEdgeLoadBalancerPlugin(TestLoadBalancerPluginBase):
'pool_id': POOL_ID}
mapping = {'edge_id': EDGE_ID, 'edge_pool_id': EDGE_POOL_ID}
with nested(
mock.patch.object(db, 'get_nsxv_edge_pool_mapping'),
mock.patch.object(self.service_plugin._core_plugin.nsx_v,
'delete_member')
) as (mock_get_mapping, mock_delete_member):
with mock.patch.object(db, 'get_nsxv_edge_pool_mapping') as mock_get_mapping, \
mock.patch.object(self.service_plugin._core_plugin.nsx_v,
'delete_member') as mock_delete_member:
mock_get_mapping.return_value = mapping
self.edge_driver.delete_member(self.context, lbaas_member)
mock_delete_member.assert_called_with(self.context, lbaas_member,
@ -374,11 +350,11 @@ class TestEdgeLoadBalancerPlugin(TestLoadBalancerPluginBase):
def test_create_pool_health_monitor_successful(self):
hmon = {'id': HEALTHMON_ID}
with nested(
mock.patch.object(db, 'add_nsxv_edge_monitor_mapping'),
mock.patch.object(self.edge_driver,
'pool_health_monitor_successful')
) as (mock_add_pool_mon_mapping, mock_pool_hmon_successful):
with mock.patch.object(db, 'add_nsxv_edge_monitor_'
'mapping') as mock_add_pool_mon_mapping, \
mock.patch.object(self.edge_driver,
'pool_health_monitor_'
'successful') as mock_pool_hmon_successful:
self.edge_driver.create_pool_health_monitor_successful(
self.context, hmon, POOL_ID, EDGE_ID, EDGE_MON_ID)
mock_add_pool_mon_mapping.assert_called_with(
@ -389,12 +365,11 @@ class TestEdgeLoadBalancerPlugin(TestLoadBalancerPluginBase):
def test_delete_pool_health_monitor_successful(self):
hmon = {'id': HEALTHMON_ID, 'pool_id': POOL_ID}
hmon_mapping = {'edge_id': EDGE_ID}
with nested(
mock.patch.object(db, 'delete_nsxv_edge_monitor_mapping'),
mock.patch.object(self.service_plugin,
'_delete_db_pool_health_monitor')
) as (mock_del_pool_hmon_mapping, mock_del_db_pool_hmon):
with mock.patch.object(db, 'delete_nsxv_edge_monitor_'
'mapping') as mock_del_pool_hmon_mapping, \
mock.patch.object(self.service_plugin,
'_delete_db_pool_health_'
'monitor') as mock_del_db_pool_hmon:
self.edge_driver.delete_pool_health_monitor_successful(
self.context, hmon, POOL_ID, hmon_mapping)
mock_del_pool_hmon_mapping.assert_called_with(
@ -431,14 +406,13 @@ class TestEdgeLoadBalancerPlugin(TestLoadBalancerPluginBase):
'type': 'PING', 'id': HEALTHMON_ID}
pool_mapping = {'edge_id': EDGE_ID, 'edge_pool_id': EDGE_POOL_ID}
with nested(
mock.patch.object(db, 'get_nsxv_edge_pool_mapping'),
mock.patch.object(db, 'get_nsxv_edge_monitor_mapping'),
mock.patch.object(self.service_plugin._core_plugin.nsx_v,
'create_pool_health_monitor')
) as (mock_get_pool_mapping, mock_get_mon_mapping,
mock_create_pool_hm):
with mock.patch.object(db, 'get_nsxv_edge_pool_'
'mapping') as mock_get_pool_mapping, \
mock.patch.object(db, 'get_nsxv_edge_monitor_'
'mapping') as mock_get_mon_mapping, \
mock.patch.object(self.service_plugin._core_plugin.nsx_v,
'create_pool_health_'
'monitor') as mock_create_pool_hm:
mock_get_pool_mapping.return_value = pool_mapping
mock_get_mon_mapping.return_value = None
self.edge_driver.create_pool_health_monitor(self.context,
@ -462,13 +436,13 @@ class TestEdgeLoadBalancerPlugin(TestLoadBalancerPluginBase):
pool_mapping = {'edge_id': EDGE_ID, 'edge_pool_id': EDGE_POOL_ID}
mon_mapping = {'edge_id': EDGE_ID, 'edge_monitor_id': EDGE_MON_ID}
with nested(
mock.patch.object(db, 'get_nsxv_edge_pool_mapping'),
mock.patch.object(db, 'get_nsxv_edge_monitor_mapping'),
mock.patch.object(self.service_plugin._core_plugin.nsx_v,
'update_pool_health_monitor')
) as (mock_get_pool_mapping, mock_get_mon_mapping, mock_upd_pool_hm):
with mock.patch.object(db, 'get_nsxv_edge_pool_'
'mapping') as mock_get_pool_mapping, \
mock.patch.object(db, 'get_nsxv_edge_monitor_'
'mapping') as mock_get_mon_mapping, \
mock.patch.object(self.service_plugin._core_plugin.nsx_v,
'update_pool_health_'
'monitor') as mock_upd_pool_hm:
mock_get_pool_mapping.return_value = pool_mapping
mock_get_mon_mapping.return_value = mon_mapping
self.edge_driver.update_pool_health_monitor(
@ -487,13 +461,13 @@ class TestEdgeLoadBalancerPlugin(TestLoadBalancerPluginBase):
pool_mapping = {'edge_id': EDGE_ID, 'edge_pool_id': EDGE_POOL_ID}
mon_mapping = {'edge_id': EDGE_ID, 'edge_monitor_id': EDGE_MON_ID}
with nested(
mock.patch.object(db, 'get_nsxv_edge_pool_mapping'),
mock.patch.object(db, 'get_nsxv_edge_monitor_mapping'),
mock.patch.object(self.service_plugin._core_plugin.nsx_v,
'delete_pool_health_monitor')
) as (mock_get_pool_mapping, mock_get_mon_mapping, mock_del_pool_hm):
with mock.patch.object(db, 'get_nsxv_edge_pool_'
'mapping') as mock_get_pool_mapping, \
mock.patch.object(db, 'get_nsxv_edge_monitor_'
'mapping') as mock_get_mon_mapping, \
mock.patch.object(self.service_plugin._core_plugin.nsx_v,
'delete_pool_health_'
'monitor') as mock_del_pool_hm:
mock_get_pool_mapping.return_value = pool_mapping
mock_get_mon_mapping.return_value = mon_mapping
self.edge_driver.delete_pool_health_monitor(self.context, hmon,