From 2e9f5a7a3156ad91c901099f7577179f0c588a9c Mon Sep 17 00:00:00 2001 From: Maru Newby Date: Mon, 9 Sep 2013 01:29:54 -0700 Subject: [PATCH] Add support for managing async processes Interacting with a long-running asynchronous process requires the use of non-blocking io. This change adds a helper class that can launch a long-running process and read stdout and stderr in a non-blocking fashion via eventlet. This functionality is intended to support monitoring ovsdb via a long-running and root-privileged invocation of ovsdb-client. The complexity of the system interaction in this patch suggested the addition of a functional test that validated actual behaviour. The test was added under the neutron/tests/functional path which is now included in the testr search path. Partial-Bug: #1177973 Change-Id: I9969e556acecf7a9e77d873371cc2ec2647be011 (cherry picked from commit acf0209b28e21eed60158967fab77468eb195e7c) --- .testr.conf | 2 +- TESTING | 26 +- neutron/agent/linux/async_process.py | 214 ++++++++++++++++ neutron/agent/linux/utils.py | 41 ++- neutron/tests/functional/__init__.py | 15 ++ neutron/tests/functional/agent/__init__.py | 15 ++ .../tests/functional/agent/linux/__init__.py | 15 ++ .../agent/linux/test_async_process.py | 79 ++++++ neutron/tests/unit/agent/__init__.py | 15 ++ neutron/tests/unit/agent/linux/__init__.py | 15 ++ .../unit/agent/linux/test_async_process.py | 239 ++++++++++++++++++ neutron/tests/unit/test_agent_linux_utils.py | 23 ++ 12 files changed, 681 insertions(+), 18 deletions(-) create mode 100644 neutron/agent/linux/async_process.py create mode 100644 neutron/tests/functional/__init__.py create mode 100644 neutron/tests/functional/agent/__init__.py create mode 100644 neutron/tests/functional/agent/linux/__init__.py create mode 100644 neutron/tests/functional/agent/linux/test_async_process.py create mode 100644 neutron/tests/unit/agent/__init__.py create mode 100644 neutron/tests/unit/agent/linux/__init__.py create mode 100644 neutron/tests/unit/agent/linux/test_async_process.py diff --git a/.testr.conf b/.testr.conf index 01d160ec50c..b63d965523c 100644 --- a/.testr.conf +++ b/.testr.conf @@ -1,4 +1,4 @@ [DEFAULT] -test_command=OS_STDOUT_CAPTURE=1 OS_STDERR_CAPTURE=1 OS_LOG_CAPTURE=1 ${PYTHON:-python} -m subunit.run discover -t ./ neutron/tests/unit $LISTOPT $IDOPTION +test_command=OS_STDOUT_CAPTURE=1 OS_STDERR_CAPTURE=1 OS_LOG_CAPTURE=1 ${PYTHON:-python} -m subunit.run discover -t ./ neutron/tests $LISTOPT $IDOPTION test_id_option=--load-list $IDFILE test_list_option=--list diff --git a/TESTING b/TESTING index 8162b2fd662..cd5ae4dab15 100644 --- a/TESTING +++ b/TESTING @@ -8,12 +8,18 @@ Overview the various pieces of the neutron tree to make sure any new changes don't break existing functionality. + The functional tests are intended to validate actual system + interaction. Mocks should be used sparingly, if at all. Care + should be taken to ensure that existing system resources are not + modified and that resources created in tests are properly cleaned + up. + Running tests There are two mechanisms for running tests: run_tests.sh and tox. - Before submitting a patch for review you should always ensure all unit - test pass; a tox run is triggered by the jenkins gate executed on gerrit - for each patch pushed for review. + Before submitting a patch for review you should always ensure all + test pass; a tox run is triggered by the jenkins gate executed on + gerrit for each patch pushed for review. With both mechanisms you can either run the tests in the standard environment or create a virtual environment to run them in. @@ -41,18 +47,18 @@ Running individual tests Adding more tests Neutron has a fast growing code base and there is plenty of areas that - need to be covered by unit tests. + need to be covered by unit and functional tests. - To get a grasp of the areas where unit tests are needed, you can check + To get a grasp of the areas where tests are needed, you can check current coverage by running: $ ./run_tests.sh -c Development process - It is expected that any new changes that are proposed for merge come with - unit tests for that feature or code area. Ideally any bugs fixes that are - submitted also have unit tests to prove that they stay fixed! - In addition, before proposing for merge, all of the current unit tests - should be passing. + It is expected that any new changes that are proposed for merge + come with tests for that feature or code area. Ideally any bugs + fixes that are submitted also have tests to prove that they stay + fixed! In addition, before proposing for merge, all of the + current tests should be passing. diff --git a/neutron/agent/linux/async_process.py b/neutron/agent/linux/async_process.py new file mode 100644 index 00000000000..aa41c9cc0e8 --- /dev/null +++ b/neutron/agent/linux/async_process.py @@ -0,0 +1,214 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import eventlet +import eventlet.event +import eventlet.queue +import eventlet.timeout + +from neutron.agent.linux import utils +from neutron.openstack.common import log as logging + + +LOG = logging.getLogger(__name__) + + +class AsyncProcessException(Exception): + pass + + +class AsyncProcess(object): + """Manages an asynchronous process. + + This class spawns a new process via subprocess and uses + greenthreads to read stderr and stdout asynchronously into queues + that can be read via repeatedly calling iter_stdout() and + iter_stderr(). + + If respawn_interval is non-zero, any error in communicating with + the managed process will result in the process and greenthreads + being cleaned up and the process restarted after the specified + interval. + + Example usage: + + >>> import time + >>> proc = AsyncProcess(['ping']) + >>> proc.start() + >>> time.sleep(5) + >>> proc.stop() + >>> for line in proc.iter_stdout(): + ... print line + """ + + def __init__(self, cmd, root_helper=None, respawn_interval=None): + """Constructor. + + :param cmd: The list of command arguments to invoke. + :param root_helper: Optional, utility to use when running shell cmds. + :param respawn_interval: Optional, the interval in seconds to wait + to respawn after unexpected process death. Respawn will + only be attempted if a value of 0 or greater is provided. + """ + self.cmd = cmd + self.root_helper = root_helper + if respawn_interval is not None and respawn_interval < 0: + raise ValueError(_('respawn_interval must be >= 0 if provided.')) + self.respawn_interval = respawn_interval + self._process = None + self._kill_event = None + self._stdout_lines = eventlet.queue.LightQueue() + self._stderr_lines = eventlet.queue.LightQueue() + self._watchers = [] + + def start(self): + """Launch a process and monitor it asynchronously.""" + if self._kill_event: + raise AsyncProcessException(_('Process is already started')) + else: + LOG.debug(_('Launching async process [%s].'), self.cmd) + self._spawn() + + def stop(self): + """Halt the process and watcher threads.""" + if self._kill_event: + LOG.debug(_('Halting async process [%s].'), self.cmd) + self._kill() + else: + raise AsyncProcessException(_('Process is not running.')) + + def _spawn(self): + """Spawn a process and its watchers.""" + self._kill_event = eventlet.event.Event() + self._process, cmd = utils.create_process(self.cmd, + root_helper=self.root_helper) + self._watchers = [] + for reader in (self._read_stdout, self._read_stderr): + # Pass the stop event directly to the greenthread to + # ensure that assignment of a new event to the instance + # attribute does not prevent the greenthread from using + # the original event. + watcher = eventlet.spawn(self._watch_process, + reader, + self._kill_event) + self._watchers.append(watcher) + + def _kill(self, respawning=False): + """Kill the process and the associated watcher greenthreads. + + :param respawning: Optional, whether respawn will be subsequently + attempted. + """ + # Halt the greenthreads + self._kill_event.send() + + pid = self._get_pid_to_kill() + if pid: + self._kill_process(pid) + + if not respawning: + # Clear the kill event to ensure the process can be + # explicitly started again. + self._kill_event = None + + def _get_pid_to_kill(self): + pid = self._process.pid + # If root helper was used, two processes will be created: + # + # - a root helper process (e.g. sudo myscript) + # - a child process (e.g. myscript) + # + # Killing the root helper process will leave the child process + # as a zombie, so the only way to ensure that both die is to + # target the child process directly. + if self.root_helper: + pids = utils.find_child_pids(pid) + if pids: + # The root helper will only ever launch a single child. + pid = pids[0] + else: + # Process is already dead. + pid = None + return pid + + def _kill_process(self, pid): + try: + # A process started by a root helper will be running as + # root and need to be killed via the same helper. + utils.execute(['kill', '-9', pid], root_helper=self.root_helper) + except Exception as ex: + stale_pid = (isinstance(ex, RuntimeError) and + 'No such process' in str(ex)) + if not stale_pid: + LOG.exception(_('An error occurred while killing [%s].'), + self.cmd) + return False + return True + + def _handle_process_error(self): + """Kill the async process and respawn if necessary.""" + LOG.debug(_('Halting async process [%s] in response to an error.'), + self.cmd) + respawning = self.respawn_interval >= 0 + self._kill(respawning=respawning) + if respawning: + eventlet.sleep(self.respawn_interval) + LOG.debug(_('Respawning async process [%s].'), self.cmd) + self._spawn() + + def _watch_process(self, callback, kill_event): + while not kill_event.ready(): + try: + if not callback(): + break + except Exception: + LOG.exception(_('An error occured while communicating ' + 'with async process [%s].'), self.cmd) + break + # Ensure that watching a process with lots of output does + # not block execution of other greenthreads. + eventlet.sleep() + # The kill event not being ready indicates that the loop was + # broken out of due to an error in the watched process rather + # than the loop condition being satisfied. + if not kill_event.ready(): + self._handle_process_error() + + def _read(self, stream, queue): + data = stream.readline() + if data: + data = data.strip() + queue.put(data) + return data + + def _read_stdout(self): + return self._read(self._process.stdout, self._stdout_lines) + + def _read_stderr(self): + return self._read(self._process.stderr, self._stderr_lines) + + def _iter_queue(self, queue): + while True: + try: + yield queue.get_nowait() + except eventlet.queue.Empty: + break + + def iter_stdout(self): + return self._iter_queue(self._stdout_lines) + + def iter_stderr(self): + return self._iter_queue(self._stderr_lines) diff --git a/neutron/agent/linux/utils.py b/neutron/agent/linux/utils.py index 6e0aae41efb..f292906fd90 100644 --- a/neutron/agent/linux/utils.py +++ b/neutron/agent/linux/utils.py @@ -34,8 +34,12 @@ from neutron.openstack.common import log as logging LOG = logging.getLogger(__name__) -def execute(cmd, root_helper=None, process_input=None, addl_env=None, - check_exit_code=True, return_stderr=False): +def create_process(cmd, root_helper=None, addl_env=None): + """Create a process object for the given command. + + The return value will be a tuple of the process object and the + list of command arguments used to create it. + """ if root_helper: cmd = shlex.split(root_helper) + cmd cmd = map(str, cmd) @@ -44,12 +48,21 @@ def execute(cmd, root_helper=None, process_input=None, addl_env=None, env = os.environ.copy() if addl_env: env.update(addl_env) + + obj = utils.subprocess_popen(cmd, shell=False, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env=env) + + return obj, cmd + + +def execute(cmd, root_helper=None, process_input=None, addl_env=None, + check_exit_code=True, return_stderr=False): try: - obj = utils.subprocess_popen(cmd, shell=False, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - env=env) + obj, cmd = create_process(cmd, root_helper=root_helper, + addl_env=addl_env) _stdout, _stderr = (process_input and obj.communicate(process_input) or obj.communicate()) @@ -95,3 +108,17 @@ def replace_file(file_name, data): tmp_file.close() os.chmod(tmp_file.name, 0o644) os.rename(tmp_file.name, file_name) + + +def find_child_pids(pid): + """Retrieve a list of the pids of child processes of the given pid.""" + try: + raw_pids = execute(['ps', '--ppid', pid, '-o', 'pid=']) + except RuntimeError as e: + # Exception has already been logged by execute + no_children_found = 'Exit code: 1' in str(e) + if no_children_found: + return [] + # Unexpected errors are the responsibility of the caller + raise + return [x.strip() for x in raw_pids.split('\n') if x.strip()] diff --git a/neutron/tests/functional/__init__.py b/neutron/tests/functional/__init__.py new file mode 100644 index 00000000000..ac4d6cbf69f --- /dev/null +++ b/neutron/tests/functional/__init__.py @@ -0,0 +1,15 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/neutron/tests/functional/agent/__init__.py b/neutron/tests/functional/agent/__init__.py new file mode 100644 index 00000000000..ac4d6cbf69f --- /dev/null +++ b/neutron/tests/functional/agent/__init__.py @@ -0,0 +1,15 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/neutron/tests/functional/agent/linux/__init__.py b/neutron/tests/functional/agent/linux/__init__.py new file mode 100644 index 00000000000..ac4d6cbf69f --- /dev/null +++ b/neutron/tests/functional/agent/linux/__init__.py @@ -0,0 +1,15 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/neutron/tests/functional/agent/linux/test_async_process.py b/neutron/tests/functional/agent/linux/test_async_process.py new file mode 100644 index 00000000000..f7fa3309381 --- /dev/null +++ b/neutron/tests/functional/agent/linux/test_async_process.py @@ -0,0 +1,79 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import contextlib + +import eventlet +import eventlet.timeout +import fixtures + +from neutron.agent.linux import async_process +from neutron.tests import base + + +class TestAsyncProcess(base.BaseTestCase): + + def setUp(self): + super(TestAsyncProcess, self).setUp() + self.test_file_path = self.useFixture( + fixtures.TempDir()).join("test_async_process.tmp") + self.data = [str(x) for x in xrange(4)] + with file(self.test_file_path, 'w') as f: + f.writelines('%s\n' % item for item in self.data) + + def _check_stdout(self, proc): + # Ensure that all the output from the file is read + output = [] + while output != self.data: + new_output = list(proc.iter_stdout()) + if new_output: + output += new_output + eventlet.sleep(0.01) + + @contextlib.contextmanager + def assert_max_execution_time(self, max_execution_time=5): + with eventlet.timeout.Timeout(max_execution_time, False): + yield + return + self.fail('Execution of this test timed out') + + def test_stopping_async_process_lifecycle(self): + with self.assert_max_execution_time(): + proc = async_process.AsyncProcess(['tail', '-f', + self.test_file_path]) + proc.start() + self._check_stdout(proc) + proc.stop() + + # Ensure that the process and greenthreads have stopped + proc._process.wait() + self.assertEqual(proc._process.returncode, -9) + for watcher in proc._watchers: + watcher.wait() + + def test_async_process_respawns(self): + with self.assert_max_execution_time(): + proc = async_process.AsyncProcess(['tail', '-f', + self.test_file_path], + respawn_interval=0) + proc.start() + + # Ensure that the same output is read twice + self._check_stdout(proc) + pid = proc._get_pid_to_kill() + proc._kill_process(pid) + self._check_stdout(proc) + proc.stop() diff --git a/neutron/tests/unit/agent/__init__.py b/neutron/tests/unit/agent/__init__.py new file mode 100644 index 00000000000..ac4d6cbf69f --- /dev/null +++ b/neutron/tests/unit/agent/__init__.py @@ -0,0 +1,15 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/neutron/tests/unit/agent/linux/__init__.py b/neutron/tests/unit/agent/linux/__init__.py new file mode 100644 index 00000000000..ac4d6cbf69f --- /dev/null +++ b/neutron/tests/unit/agent/linux/__init__.py @@ -0,0 +1,15 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/neutron/tests/unit/agent/linux/test_async_process.py b/neutron/tests/unit/agent/linux/test_async_process.py new file mode 100644 index 00000000000..d57a3015ed3 --- /dev/null +++ b/neutron/tests/unit/agent/linux/test_async_process.py @@ -0,0 +1,239 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import eventlet.event +import eventlet.queue +import eventlet.timeout +import mock +import testtools + +from neutron.agent.linux import async_process +from neutron.agent.linux import utils +from neutron.tests import base + + +_marker = () + + +class TestAsyncProcess(base.BaseTestCase): + + def setUp(self): + super(TestAsyncProcess, self).setUp() + self.proc = async_process.AsyncProcess(['fake']) + + def test_construtor_raises_exception_for_negative_respawn_interval(self): + with testtools.ExpectedException(ValueError): + async_process.AsyncProcess(['fake'], respawn_interval=-1) + + def test__spawn(self): + expected_process = 'Foo' + proc = self.proc + with mock.patch.object(utils, 'create_process') as mock_create_process: + mock_create_process.return_value = [expected_process, None] + with mock.patch('eventlet.spawn') as mock_spawn: + proc._spawn() + + self.assertIsInstance(proc._kill_event, eventlet.event.Event) + self.assertEqual(proc._process, expected_process) + mock_spawn.assert_has_calls([ + mock.call(proc._watch_process, + proc._read_stdout, + proc._kill_event), + mock.call(proc._watch_process, + proc._read_stderr, + proc._kill_event), + ]) + self.assertEqual(len(proc._watchers), 2) + + def test__handle_process_error_kills_with_respawn(self): + with mock.patch.object(self.proc, '_kill') as kill: + self.proc._handle_process_error() + + kill.assert_has_calls(mock.call(respawning=False)) + + def test__handle_process_error_kills_without_respawn(self): + self.proc.respawn_interval = 1 + with mock.patch.object(self.proc, '_kill') as kill: + with mock.patch.object(self.proc, '_spawn') as spawn: + with mock.patch('eventlet.sleep') as sleep: + self.proc._handle_process_error() + + kill.assert_has_calls(mock.call(respawning=True)) + sleep.assert_has_calls(mock.call(self.proc.respawn_interval)) + spawn.assert_called_once() + + def _test__watch_process(self, callback, kill_event): + self.proc._kill_event = kill_event + # Ensure the test times out eventually if the watcher loops endlessly + with eventlet.timeout.Timeout(5): + with mock.patch.object(self.proc, + '_handle_process_error') as func: + self.proc._watch_process(callback, kill_event) + + if not kill_event.ready(): + func.assert_called_once() + + def test__watch_process_exits_on_callback_failure(self): + self._test__watch_process(lambda: False, eventlet.event.Event()) + + def test__watch_process_exits_on_exception(self): + def foo(): + raise Exception('Error!') + self._test__watch_process(foo, eventlet.event.Event()) + + def test__watch_process_exits_on_sent_kill_event(self): + kill_event = eventlet.event.Event() + kill_event.send() + self._test__watch_process(None, kill_event) + + def _test_read_output_queues_and_returns_result(self, output): + queue = eventlet.queue.LightQueue() + mock_stream = mock.Mock() + with mock.patch.object(mock_stream, 'readline') as mock_readline: + mock_readline.return_value = output + result = self.proc._read(mock_stream, queue) + + if output: + self.assertEqual(output, result) + self.assertEqual(output, queue.get_nowait()) + else: + self.assertFalse(result) + self.assertTrue(queue.empty()) + + def test__read_queues_and_returns_output(self): + self._test_read_output_queues_and_returns_result('foo') + + def test__read_returns_none_for_missing_output(self): + self._test_read_output_queues_and_returns_result('') + + def test_start_raises_exception_if_process_already_started(self): + self.proc._kill_event = True + with testtools.ExpectedException(async_process.AsyncProcessException): + self.proc.start() + + def test_start_invokes__spawn(self): + with mock.patch.object(self.proc, '_spawn') as mock_start: + self.proc.start() + + mock_start.assert_called_once() + + def test__iter_queue_returns_empty_list_for_empty_queue(self): + result = list(self.proc._iter_queue(eventlet.queue.LightQueue())) + self.assertEqual(result, []) + + def test__iter_queue_returns_queued_data(self): + queue = eventlet.queue.LightQueue() + queue.put('foo') + result = list(self.proc._iter_queue(queue)) + self.assertEqual(result, ['foo']) + + def _test_iter_output_calls_iter_queue_on_output_queue(self, output_type): + expected_value = 'foo' + with mock.patch.object(self.proc, '_iter_queue') as mock_iter_queue: + mock_iter_queue.return_value = expected_value + target_func = getattr(self.proc, 'iter_%s' % output_type, None) + value = target_func() + + self.assertEqual(value, expected_value) + queue = getattr(self.proc, '_%s_lines' % output_type, None) + mock_iter_queue.assert_called_with(queue) + + def test_iter_stdout(self): + self._test_iter_output_calls_iter_queue_on_output_queue('stdout') + + def test_iter_stderr(self): + self._test_iter_output_calls_iter_queue_on_output_queue('stderr') + + def _test__kill(self, respawning, pid=None): + with mock.patch.object(self.proc, '_kill_event') as mock_kill_event: + with mock.patch.object(self.proc, '_get_pid_to_kill', + return_value=pid): + with mock.patch.object(self.proc, + '_kill_process') as mock_kill_process: + self.proc._kill(respawning) + + if respawning: + self.assertIsNotNone(self.proc._kill_event) + else: + self.assertIsNone(self.proc._kill_event) + + mock_kill_event.send.assert_called_once() + if pid: + mock_kill_process.assert_called_once(pid) + + def test__kill_when_respawning_does_not_clear_kill_event(self): + self._test__kill(True) + + def test__kill_when_not_respawning_clears_kill_event(self): + self._test__kill(False) + + def test__kill_targets_process_for_pid(self): + self._test__kill(False, pid='1') + + def _test__get_pid_to_kill(self, expected=_marker, + root_helper=None, pids=None): + if root_helper: + self.proc.root_helper = root_helper + with mock.patch.object(self.proc, '_process') as mock_process: + with mock.patch.object(mock_process, 'pid') as mock_pid: + with mock.patch.object(utils, 'find_child_pids', + return_value=pids): + actual = self.proc._get_pid_to_kill() + if expected is _marker: + expected = mock_pid + self.assertEqual(expected, actual) + + def test__get_pid_to_kill_returns_process_pid_without_root_helper(self): + self._test__get_pid_to_kill() + + def test__get_pid_to_kill_returns_child_pid_with_root_helper(self): + self._test__get_pid_to_kill(expected='1', pids=['1'], root_helper='a') + + def test__get_pid_to_kill_returns_none_with_root_helper(self): + self._test__get_pid_to_kill(expected=None, root_helper='a') + + def _test__kill_process(self, pid, expected, exception_message=None): + self.proc.root_helper = 'foo' + if exception_message: + exc = RuntimeError(exception_message) + else: + exc = None + with mock.patch.object(utils, 'execute', + side_effect=exc) as mock_execute: + actual = self.proc._kill_process(pid) + + self.assertEqual(expected, actual) + mock_execute.assert_called_with(['kill', '-9', pid], + root_helper=self.proc.root_helper) + + def test__kill_process_returns_true_for_valid_pid(self): + self._test__kill_process('1', True) + + def test__kill_process_returns_true_for_stale_pid(self): + self._test__kill_process('1', True, 'No such process') + + def test__kill_process_returns_false_for_execute_exception(self): + self._test__kill_process('1', False, 'Invalid') + + def test_stop_calls_kill(self): + self.proc._kill_event = True + with mock.patch.object(self.proc, '_kill') as mock_kill: + self.proc.stop() + mock_kill.called_once() + + def test_stop_raises_exception_if_already_started(self): + with testtools.ExpectedException(async_process.AsyncProcessException): + self.proc.stop() diff --git a/neutron/tests/unit/test_agent_linux_utils.py b/neutron/tests/unit/test_agent_linux_utils.py index cccbf2024f6..6b7fbbfd00d 100644 --- a/neutron/tests/unit/test_agent_linux_utils.py +++ b/neutron/tests/unit/test_agent_linux_utils.py @@ -17,6 +17,7 @@ import fixtures import mock +import testtools from neutron.agent.linux import utils from neutron.tests import base @@ -106,3 +107,25 @@ class AgentUtilsReplaceFile(base.BaseTestCase): ntf.assert_has_calls(expected) chmod.assert_called_once_with('/baz', 0o644) rename.assert_called_once_with('/baz', '/foo') + + +class TestFindChildPids(base.BaseTestCase): + + def test_returns_empty_list_for_exit_code_1(self): + with mock.patch.object(utils, 'execute', + side_effect=RuntimeError('Exit code: 1')): + self.assertEqual(utils.find_child_pids(-1), []) + + def test_returns_empty_list_for_no_output(self): + with mock.patch.object(utils, 'execute', return_value=''): + self.assertEqual(utils.find_child_pids(-1), []) + + def test_returns_list_of_child_process_ids_for_good_ouput(self): + with mock.patch.object(utils, 'execute', return_value=' 123 \n 185\n'): + self.assertEqual(utils.find_child_pids(-1), ['123', '185']) + + def test_raises_unknown_exception(self): + with testtools.ExpectedException(RuntimeError): + with mock.patch.object(utils, 'execute', + side_effect=RuntimeError()): + utils.find_child_pids(-1)