OVS agent support on Hyper-V

This patch abstracts away platform specific differences in
agent/linux/utils.py and agent/linux/polling.py in order for
OVS neutron agent to work on Hyper-V.

agent.linux.utils uses fcntl that is not available on Windows and
also uses rootwrap which is no necessary on Windows.

ovsdb_monitor.SimpleInterfaceMonitor works only on GNU/Linux because
agent.linux.async_process uses platfom specific components like the
kill command.

Unit tests have been updated accordingly

Implements blueprint: hyper-v-ovs-agent

Change-Id: I3326414335467d9dc5da03e6d1016d0e32330dd0
This commit is contained in:
Adelina Tuvenie 2015-03-24 11:29:17 -07:00
parent 1a089e6059
commit f0d9410a82
22 changed files with 317 additions and 130 deletions

View File

@ -0,0 +1,62 @@
# Copyright 2015 Cloudbase Solutions.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class BasePollingManager(object):
def __init__(self):
self._force_polling = False
self._polling_completed = True
def force_polling(self):
self._force_polling = True
def polling_completed(self):
self._polling_completed = True
def _is_polling_required(self):
raise NotImplementedError()
@property
def is_polling_required(self):
# Always consume the updates to minimize polling.
polling_required = self._is_polling_required()
# Polling is required regardless of whether updates have been
# detected.
if self._force_polling:
self._force_polling = False
polling_required = True
# Polling is required if not yet done for previously detected
# updates.
if not self._polling_completed:
polling_required = True
if polling_required:
# Track whether polling has been completed to ensure that
# polling can be required until the caller indicates via a
# call to polling_completed() that polling has been
# successfully performed.
self._polling_completed = False
return polling_required
class AlwaysPoll(BasePollingManager):
@property
def is_polling_required(self):
return True

View File

@ -23,8 +23,8 @@ from oslo_utils import excutils
import retrying import retrying
import six import six
from neutron.agent.common import utils
from neutron.agent.linux import ip_lib from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils
from neutron.agent.ovsdb import api as ovsdb from neutron.agent.ovsdb import api as ovsdb
from neutron.common import exceptions from neutron.common import exceptions
from neutron.i18n import _LE, _LI, _LW from neutron.i18n import _LE, _LI, _LW

View File

@ -0,0 +1,24 @@
# Copyright 2015 Cloudbase Solutions.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
if os.name == 'nt':
from neutron.agent.windows import polling
else:
from neutron.agent.linux import polling
get_polling_manager = polling.get_polling_manager

View File

@ -0,0 +1,24 @@
# Copyright 2015 Cloudbase Solutions.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
if os.name == 'nt':
from neutron.agent.windows import utils
else:
from neutron.agent.linux import utils
execute = utils.execute

View File

@ -19,7 +19,7 @@ import os
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log as logging from oslo_log import log as logging
from neutron.agent.linux import utils from neutron.agent.common import utils
from neutron.common import exceptions from neutron.common import exceptions
from neutron.i18n import _LE from neutron.i18n import _LE

View File

@ -16,6 +16,7 @@ import contextlib
import eventlet import eventlet
from neutron.agent.common import base_polling
from neutron.agent.linux import ovsdb_monitor from neutron.agent.linux import ovsdb_monitor
from neutron.plugins.openvswitch.common import constants from neutron.plugins.openvswitch.common import constants
@ -29,7 +30,7 @@ def get_polling_manager(minimize_polling=False,
ovsdb_monitor_respawn_interval=ovsdb_monitor_respawn_interval) ovsdb_monitor_respawn_interval=ovsdb_monitor_respawn_interval)
pm.start() pm.start()
else: else:
pm = AlwaysPoll() pm = base_polling.AlwaysPoll()
try: try:
yield pm yield pm
finally: finally:
@ -37,55 +38,7 @@ def get_polling_manager(minimize_polling=False,
pm.stop() pm.stop()
class BasePollingManager(object): class InterfacePollingMinimizer(base_polling.BasePollingManager):
def __init__(self):
self._force_polling = False
self._polling_completed = True
def force_polling(self):
self._force_polling = True
def polling_completed(self):
self._polling_completed = True
def _is_polling_required(self):
raise NotImplementedError()
@property
def is_polling_required(self):
# Always consume the updates to minimize polling.
polling_required = self._is_polling_required()
# Polling is required regardless of whether updates have been
# detected.
if self._force_polling:
self._force_polling = False
polling_required = True
# Polling is required if not yet done for previously detected
# updates.
if not self._polling_completed:
polling_required = True
if polling_required:
# Track whether polling has been completed to ensure that
# polling can be required until the caller indicates via a
# call to polling_completed() that polling has been
# successfully performed.
self._polling_completed = False
return polling_required
class AlwaysPoll(BasePollingManager):
@property
def is_polling_required(self):
return True
class InterfacePollingMinimizer(BasePollingManager):
"""Monitors ovsdb to determine when polling is required.""" """Monitors ovsdb to determine when polling is required."""
def __init__( def __init__(

View File

@ -20,7 +20,7 @@ from oslo_log import log as logging
from oslo_serialization import jsonutils from oslo_serialization import jsonutils
from oslo_utils import excutils from oslo_utils import excutils
from neutron.agent.linux import utils from neutron.agent.common import utils
from neutron.agent.ovsdb import api as ovsdb from neutron.agent.ovsdb import api as ovsdb
from neutron.i18n import _LE from neutron.i18n import _LE

View File

View File

@ -0,0 +1,24 @@
# Copyright 2015 Cloudbase Solutions.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
from neutron.agent.common import base_polling
@contextlib.contextmanager
def get_polling_manager(minimize_polling, ovsdb_monitor_respawn_interval):
pm = base_polling.AlwaysPoll()
yield pm

View File

@ -0,0 +1,79 @@
# Copyright 2015 Cloudbase Solutions.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from eventlet.green import subprocess
from eventlet import greenthread
from oslo_log import log as logging
from neutron.common import utils
LOG = logging.getLogger(__name__)
def create_process(cmd, addl_env=None):
cmd = map(str, cmd)
LOG.debug("Running command: %s", cmd)
env = os.environ.copy()
if addl_env:
env.update(addl_env)
obj = utils.subprocess_popen(cmd, shell=False,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env,
preexec_fn=None,
close_fds=False)
return obj, cmd
def execute(cmd, process_input=None, addl_env=None,
check_exit_code=True, return_stderr=False, log_fail_as_error=True,
extra_ok_codes=None, run_as_root=False):
try:
obj, cmd = create_process(cmd, addl_env=addl_env)
_stdout, _stderr = obj.communicate(process_input)
obj.stdin.close()
m = _("\nCommand: %(cmd)s\nExit code: %(code)s\nStdin: %(stdin)s\n"
"Stdout: %(stdout)s\nStderr: %(stderr)s") % \
{'cmd': cmd,
'code': obj.returncode,
'stdin': process_input or '',
'stdout': _stdout,
'stderr': _stderr}
extra_ok_codes = extra_ok_codes or []
if obj.returncode and obj.returncode in extra_ok_codes:
obj.returncode = None
if obj.returncode and log_fail_as_error:
LOG.error(m)
else:
LOG.debug(m)
if obj.returncode and check_exit_code:
raise RuntimeError(m)
finally:
# NOTE(termie): this appears to be necessary to let the subprocess
# call clean something up in between calls, without
# it two execute calls in a row hangs the second one
greenthread.sleep(0)
return (_stdout, _stderr) if return_stderr else _stdout

View File

@ -179,10 +179,11 @@ def _subprocess_setup():
def subprocess_popen(args, stdin=None, stdout=None, stderr=None, shell=False, def subprocess_popen(args, stdin=None, stdout=None, stderr=None, shell=False,
env=None): env=None, preexec_fn=_subprocess_setup, close_fds=True):
return subprocess.Popen(args, shell=shell, stdin=stdin, stdout=stdout, return subprocess.Popen(args, shell=shell, stdin=stdin, stdout=stdout,
stderr=stderr, preexec_fn=_subprocess_setup, stderr=stderr, preexec_fn=preexec_fn,
close_fds=True, env=env) close_fds=close_fds, env=env)
def parse_mappings(mapping_list, unique_values=True): def parse_mappings(mapping_list, unique_values=True):

View File

@ -27,10 +27,10 @@ from six import moves
from neutron.agent.common import config from neutron.agent.common import config
from neutron.agent.common import ovs_lib from neutron.agent.common import ovs_lib
from neutron.agent.common import polling
from neutron.agent.common import utils
from neutron.agent import l2population_rpc from neutron.agent import l2population_rpc
from neutron.agent.linux import ip_lib from neutron.agent.linux import ip_lib
from neutron.agent.linux import polling
from neutron.agent.linux import utils
from neutron.agent import rpc as agent_rpc from neutron.agent import rpc as agent_rpc
from neutron.agent import securitygroups_rpc as sg_rpc from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api.rpc.handlers import dvr_rpc from neutron.api.rpc.handlers import dvr_rpc
@ -1410,7 +1410,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
def rpc_loop(self, polling_manager=None): def rpc_loop(self, polling_manager=None):
if not polling_manager: if not polling_manager:
polling_manager = polling.AlwaysPoll() polling_manager = polling.get_polling_manager(
minimize_polling=False)
sync = True sync = True
ports = set() ports = set()

View File

@ -18,7 +18,7 @@ from oslo_serialization import jsonutils
import testtools import testtools
from neutron.agent.common import ovs_lib from neutron.agent.common import ovs_lib
from neutron.agent.linux import utils from neutron.agent.common import utils
from neutron.common import exceptions from neutron.common import exceptions
from neutron.openstack.common import uuidutils from neutron.openstack.common import uuidutils
from neutron.plugins.common import constants from neutron.plugins.common import constants

View File

@ -0,0 +1,69 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron.agent.common import base_polling as polling
from neutron.tests import base
class TestBasePollingManager(base.BaseTestCase):
def setUp(self):
super(TestBasePollingManager, self).setUp()
self.pm = polling.BasePollingManager()
def test__is_polling_required_should_not_be_implemented(self):
self.assertRaises(NotImplementedError, self.pm._is_polling_required)
def test_force_polling_sets_interval_attribute(self):
self.assertFalse(self.pm._force_polling)
self.pm.force_polling()
self.assertTrue(self.pm._force_polling)
def test_polling_completed_sets_interval_attribute(self):
self.pm._polling_completed = False
self.pm.polling_completed()
self.assertTrue(self.pm._polling_completed)
def mock_is_polling_required(self, return_value):
return mock.patch.object(self.pm, '_is_polling_required',
return_value=return_value)
def test_is_polling_required_returns_true_when_forced(self):
with self.mock_is_polling_required(False):
self.pm.force_polling()
self.assertTrue(self.pm.is_polling_required)
self.assertFalse(self.pm._force_polling)
def test_is_polling_required_returns_true_when_polling_not_completed(self):
with self.mock_is_polling_required(False):
self.pm._polling_completed = False
self.assertTrue(self.pm.is_polling_required)
def test_is_polling_required_returns_true_when_updates_are_present(self):
with self.mock_is_polling_required(True):
self.assertTrue(self.pm.is_polling_required)
self.assertFalse(self.pm._polling_completed)
def test_is_polling_required_returns_false_for_no_updates(self):
with self.mock_is_polling_required(False):
self.assertFalse(self.pm.is_polling_required)
class TestAlwaysPoll(base.BaseTestCase):
def test_is_polling_required_always_returns_true(self):
pm = polling.AlwaysPoll()
self.assertTrue(pm.is_polling_required)

View File

@ -14,6 +14,7 @@
import mock import mock
from neutron.agent.common import base_polling
from neutron.agent.linux import polling from neutron.agent.linux import polling
from neutron.tests import base from neutron.tests import base
@ -22,7 +23,7 @@ class TestGetPollingManager(base.BaseTestCase):
def test_return_always_poll_by_default(self): def test_return_always_poll_by_default(self):
with polling.get_polling_manager() as pm: with polling.get_polling_manager() as pm:
self.assertEqual(pm.__class__, polling.AlwaysPoll) self.assertEqual(pm.__class__, base_polling.AlwaysPoll)
def test_manage_polling_minimizer(self): def test_manage_polling_minimizer(self):
mock_target = 'neutron.agent.linux.polling.InterfacePollingMinimizer' mock_target = 'neutron.agent.linux.polling.InterfacePollingMinimizer'
@ -35,57 +36,6 @@ class TestGetPollingManager(base.BaseTestCase):
mock_start.assert_has_calls(mock.call()) mock_start.assert_has_calls(mock.call())
class TestBasePollingManager(base.BaseTestCase):
def setUp(self):
super(TestBasePollingManager, self).setUp()
self.pm = polling.BasePollingManager()
def test__is_polling_required_should_not_be_implemented(self):
self.assertRaises(NotImplementedError, self.pm._is_polling_required)
def test_force_polling_sets_interval_attribute(self):
self.assertFalse(self.pm._force_polling)
self.pm.force_polling()
self.assertTrue(self.pm._force_polling)
def test_polling_completed_sets_interval_attribute(self):
self.pm._polling_completed = False
self.pm.polling_completed()
self.assertTrue(self.pm._polling_completed)
def mock_is_polling_required(self, return_value):
return mock.patch.object(self.pm, '_is_polling_required',
return_value=return_value)
def test_is_polling_required_returns_true_when_forced(self):
with self.mock_is_polling_required(False):
self.pm.force_polling()
self.assertTrue(self.pm.is_polling_required)
self.assertFalse(self.pm._force_polling)
def test_is_polling_required_returns_true_when_polling_not_completed(self):
with self.mock_is_polling_required(False):
self.pm._polling_completed = False
self.assertTrue(self.pm.is_polling_required)
def test_is_polling_required_returns_true_when_updates_are_present(self):
with self.mock_is_polling_required(True):
self.assertTrue(self.pm.is_polling_required)
self.assertFalse(self.pm._polling_completed)
def test_is_polling_required_returns_false_for_no_updates(self):
with self.mock_is_polling_required(False):
self.assertFalse(self.pm.is_polling_required)
class TestAlwaysPoll(base.BaseTestCase):
def test_is_polling_required_always_returns_true(self):
pm = polling.AlwaysPoll()
self.assertTrue(pm.is_polling_required)
class TestInterfacePollingMinimizer(base.BaseTestCase): class TestInterfacePollingMinimizer(base.BaseTestCase):
def setUp(self): def setUp(self):

View File

@ -18,7 +18,7 @@ import mock
from oslo_config import cfg from oslo_config import cfg
import oslo_messaging import oslo_messaging
from neutron.agent.linux import utils from neutron.agent.common import utils
from neutron.common import constants as n_const from neutron.common import constants as n_const
from neutron.plugins.common import constants as p_const from neutron.plugins.common import constants as p_const
from neutron.plugins.openvswitch.agent import ovs_neutron_agent from neutron.plugins.openvswitch.agent import ovs_neutron_agent

View File

@ -23,9 +23,9 @@ from oslo_log import log
import testtools import testtools
from neutron.agent.common import ovs_lib from neutron.agent.common import ovs_lib
from neutron.agent.common import utils
from neutron.agent.linux import async_process from neutron.agent.linux import async_process
from neutron.agent.linux import ip_lib from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils
from neutron.common import constants as n_const from neutron.common import constants as n_const
from neutron.plugins.common import constants as p_const from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2.drivers.l2pop import rpc as l2pop_rpc from neutron.plugins.ml2.drivers.l2pop import rpc as l2pop_rpc
@ -925,7 +925,7 @@ class TestOvsNeutronAgent(base.BaseTestCase):
def test_daemon_loop_uses_polling_manager(self): def test_daemon_loop_uses_polling_manager(self):
with mock.patch( with mock.patch(
'neutron.agent.linux.polling.get_polling_manager') as mock_get_pm: 'neutron.agent.common.polling.get_polling_manager') as mock_get_pm:
with mock.patch.object(self.agent, 'rpc_loop') as mock_loop: with mock.patch.object(self.agent, 'rpc_loop') as mock_loop:
self.agent.daemon_loop() self.agent.daemon_loop()
mock_get_pm.assert_called_with(True, mock_get_pm.assert_called_with(True,

View File

@ -133,7 +133,7 @@ class TunnelTest(base.BaseTestCase):
self.TUN_BRIDGE, self.TUN_BRIDGE,
self.MAP_TUN_BRIDGE] self.MAP_TUN_BRIDGE]
self.execute = mock.patch('neutron.agent.linux.utils.execute').start() self.execute = mock.patch('neutron.agent.common.utils.execute').start()
self._define_expected_calls() self._define_expected_calls()

View File

@ -1882,7 +1882,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
get_conf_file_name = 'neutron.agent.linux.utils.get_conf_file_name' get_conf_file_name = 'neutron.agent.linux.utils.get_conf_file_name'
get_pid_file_name = ('neutron.agent.linux.external_process.' get_pid_file_name = ('neutron.agent.linux.external_process.'
'ProcessManager.get_pid_file_name') 'ProcessManager.get_pid_file_name')
utils_execute = 'neutron.agent.linux.utils.execute' utils_execute = 'neutron.agent.common.utils.execute'
mock.patch(get_conf_file_name).start().return_value = conffile mock.patch(get_conf_file_name).start().return_value = conffile
mock.patch(get_pid_file_name).start().return_value = pidfile mock.patch(get_pid_file_name).start().return_value = pidfile

View File

@ -616,7 +616,7 @@ class TestBase(base.BaseTestCase):
self.conf.set_override('state_path', '') self.conf.set_override('state_path', '')
self.replace_p = mock.patch('neutron.agent.linux.utils.replace_file') self.replace_p = mock.patch('neutron.agent.linux.utils.replace_file')
self.execute_p = mock.patch('neutron.agent.linux.utils.execute') self.execute_p = mock.patch('neutron.agent.common.utils.execute')
self.safe = self.replace_p.start() self.safe = self.replace_p.start()
self.execute = self.execute_p.start() self.execute = self.execute_p.start()

View File

@ -23,7 +23,7 @@ from neutron.tests import base
class TestProcessManager(base.BaseTestCase): class TestProcessManager(base.BaseTestCase):
def setUp(self): def setUp(self):
super(TestProcessManager, self).setUp() super(TestProcessManager, self).setUp()
self.execute_p = mock.patch('neutron.agent.linux.utils.execute') self.execute_p = mock.patch('neutron.agent.common.utils.execute')
self.execute = self.execute_p.start() self.execute = self.execute_p.start()
self.delete_if_exists = mock.patch( self.delete_if_exists = mock.patch(
'neutron.openstack.common.fileutils.delete_if_exists').start() 'neutron.openstack.common.fileutils.delete_if_exists').start()

View File

@ -16,8 +16,8 @@
import mock import mock
import netaddr import netaddr
from neutron.agent.common import utils # noqa
from neutron.agent.linux import ip_lib from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils # noqa
from neutron.common import exceptions from neutron.common import exceptions
from neutron.tests import base from neutron.tests import base
@ -181,7 +181,7 @@ RULE_V6_SAMPLE = ("""
class TestSubProcessBase(base.BaseTestCase): class TestSubProcessBase(base.BaseTestCase):
def setUp(self): def setUp(self):
super(TestSubProcessBase, self).setUp() super(TestSubProcessBase, self).setUp()
self.execute_p = mock.patch('neutron.agent.linux.utils.execute') self.execute_p = mock.patch('neutron.agent.common.utils.execute')
self.execute = self.execute_p.start() self.execute = self.execute_p.start()
def test_execute_wrapper(self): def test_execute_wrapper(self):
@ -243,7 +243,7 @@ class TestIpWrapper(base.BaseTestCase):
mocked_islink.assert_called_once_with('/sys/class/net/lo') mocked_islink.assert_called_once_with('/sys/class/net/lo')
self.assertEqual(retval, [ip_lib.IPDevice('lo')]) self.assertEqual(retval, [ip_lib.IPDevice('lo')])
@mock.patch('neutron.agent.linux.utils.execute') @mock.patch('neutron.agent.common.utils.execute')
def test_get_devices_namespaces(self, mocked_execute): def test_get_devices_namespaces(self, mocked_execute):
fake_str = mock.Mock() fake_str = mock.Mock()
fake_str.split.return_value = ['lo'] fake_str.split.return_value = ['lo']
@ -309,7 +309,7 @@ class TestIpWrapper(base.BaseTestCase):
with mock.patch.object(ip_lib, 'IPDevice') as ip_dev: with mock.patch.object(ip_lib, 'IPDevice') as ip_dev:
ip = ip_lib.IPWrapper() ip = ip_lib.IPWrapper()
with mock.patch.object(ip.netns, 'exists') as ns_exists: with mock.patch.object(ip.netns, 'exists') as ns_exists:
with mock.patch('neutron.agent.linux.utils.execute'): with mock.patch('neutron.agent.common.utils.execute'):
ns_exists.return_value = False ns_exists.return_value = False
ip.ensure_namespace('ns') ip.ensure_namespace('ns')
self.execute.assert_has_calls( self.execute.assert_has_calls(
@ -860,7 +860,7 @@ class TestIpNetnsCommand(TestIPCmdBase):
self.netns_cmd = ip_lib.IpNetnsCommand(self.parent) self.netns_cmd = ip_lib.IpNetnsCommand(self.parent)
def test_add_namespace(self): def test_add_namespace(self):
with mock.patch('neutron.agent.linux.utils.execute') as execute: with mock.patch('neutron.agent.common.utils.execute') as execute:
ns = self.netns_cmd.add('ns') ns = self.netns_cmd.add('ns')
self._assert_sudo([], ('add', 'ns'), use_root_namespace=True) self._assert_sudo([], ('add', 'ns'), use_root_namespace=True)
self.assertEqual(ns.namespace, 'ns') self.assertEqual(ns.namespace, 'ns')
@ -870,7 +870,7 @@ class TestIpNetnsCommand(TestIPCmdBase):
run_as_root=True, check_exit_code=True, extra_ok_codes=None) run_as_root=True, check_exit_code=True, extra_ok_codes=None)
def test_delete_namespace(self): def test_delete_namespace(self):
with mock.patch('neutron.agent.linux.utils.execute'): with mock.patch('neutron.agent.common.utils.execute'):
self.netns_cmd.delete('ns') self.netns_cmd.delete('ns')
self._assert_sudo([], ('delete', 'ns'), use_root_namespace=True) self._assert_sudo([], ('delete', 'ns'), use_root_namespace=True)
@ -879,7 +879,7 @@ class TestIpNetnsCommand(TestIPCmdBase):
retval = '\n'.join(NETNS_SAMPLE) retval = '\n'.join(NETNS_SAMPLE)
# need another instance to avoid mocking # need another instance to avoid mocking
netns_cmd = ip_lib.IpNetnsCommand(ip_lib.SubProcessBase()) netns_cmd = ip_lib.IpNetnsCommand(ip_lib.SubProcessBase())
with mock.patch('neutron.agent.linux.utils.execute') as execute: with mock.patch('neutron.agent.common.utils.execute') as execute:
execute.return_value = retval execute.return_value = retval
self.assertTrue( self.assertTrue(
netns_cmd.exists('bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb')) netns_cmd.exists('bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb'))
@ -892,7 +892,7 @@ class TestIpNetnsCommand(TestIPCmdBase):
retval = '\n'.join(NETNS_SAMPLE) retval = '\n'.join(NETNS_SAMPLE)
# need another instance to avoid mocking # need another instance to avoid mocking
netns_cmd = ip_lib.IpNetnsCommand(ip_lib.SubProcessBase()) netns_cmd = ip_lib.IpNetnsCommand(ip_lib.SubProcessBase())
with mock.patch('neutron.agent.linux.utils.execute') as execute: with mock.patch('neutron.agent.common.utils.execute') as execute:
execute.return_value = retval execute.return_value = retval
self.assertFalse( self.assertFalse(
netns_cmd.exists('bbbbbbbb-1111-2222-3333-bbbbbbbbbbbb')) netns_cmd.exists('bbbbbbbb-1111-2222-3333-bbbbbbbbbbbb'))
@ -902,7 +902,7 @@ class TestIpNetnsCommand(TestIPCmdBase):
def test_execute(self): def test_execute(self):
self.parent.namespace = 'ns' self.parent.namespace = 'ns'
with mock.patch('neutron.agent.linux.utils.execute') as execute: with mock.patch('neutron.agent.common.utils.execute') as execute:
self.netns_cmd.execute(['ip', 'link', 'list']) self.netns_cmd.execute(['ip', 'link', 'list'])
execute.assert_called_once_with(['ip', 'netns', 'exec', 'ns', 'ip', execute.assert_called_once_with(['ip', 'netns', 'exec', 'ns', 'ip',
'link', 'list'], 'link', 'list'],
@ -912,7 +912,7 @@ class TestIpNetnsCommand(TestIPCmdBase):
def test_execute_env_var_prepend(self): def test_execute_env_var_prepend(self):
self.parent.namespace = 'ns' self.parent.namespace = 'ns'
with mock.patch('neutron.agent.linux.utils.execute') as execute: with mock.patch('neutron.agent.common.utils.execute') as execute:
env = dict(FOO=1, BAR=2) env = dict(FOO=1, BAR=2)
self.netns_cmd.execute(['ip', 'link', 'list'], env) self.netns_cmd.execute(['ip', 'link', 'list'], env)
execute.assert_called_once_with( execute.assert_called_once_with(
@ -922,7 +922,7 @@ class TestIpNetnsCommand(TestIPCmdBase):
run_as_root=True, check_exit_code=True, extra_ok_codes=None) run_as_root=True, check_exit_code=True, extra_ok_codes=None)
def test_execute_nosudo_with_no_namespace(self): def test_execute_nosudo_with_no_namespace(self):
with mock.patch('neutron.agent.linux.utils.execute') as execute: with mock.patch('neutron.agent.common.utils.execute') as execute:
self.parent.namespace = None self.parent.namespace = None
self.netns_cmd.execute(['test']) self.netns_cmd.execute(['test'])
execute.assert_called_once_with(['test'], execute.assert_called_once_with(['test'],