Add Process class helper to manage processes with namespace

This class extends Popen class with usage of namespace and root-helper.
Because of usage of root wrapper, this class re-uses get_child_pid() for
killing the child process. get_child_pid() is taken out of AsyncProcess
as a part of this patch.

Change-Id: I856b3ec75f347ecccaf4a1c6fd17b28a33ee1a3f
Related-Bug: 1243216
This commit is contained in:
Jakub Libosvar 2014-10-03 14:02:55 +02:00
parent 98c53d5b37
commit 374a612241
9 changed files with 236 additions and 75 deletions

View File

@ -116,7 +116,8 @@ class AsyncProcess(object):
# Halt the greenthreads
self._kill_event.send()
pid = self._get_pid_to_kill()
pid = utils.get_root_helper_child_pid(
self._process.pid, self.root_helper)
if pid:
self._kill_process(pid)
@ -125,33 +126,6 @@ class AsyncProcess(object):
# explicitly started again.
self._kill_event = None
def _get_pid_to_kill(self):
pid = self._process.pid
# If root helper was used, two or more processes will be created:
#
# - a root helper process (e.g. sudo myscript)
# - possibly a rootwrap script (e.g. neutron-rootwrap)
# - a child process (e.g. myscript)
#
# Killing the root helper process will leave the child process
# running, re-parented to init, so the only way to ensure that both
# die is to target the child process directly.
if self.root_helper:
try:
pid = utils.find_child_pids(pid)[0]
except IndexError:
# Process is already dead
return None
while True:
try:
# We shouldn't have more than one child per process
# so keep getting the children of the first one
pid = utils.find_child_pids(pid)[0]
except IndexError:
# Last process in the tree, return it
break
return pid
def _kill_process(self, pid):
try:
# A process started by a root helper will be running as

View File

@ -120,12 +120,13 @@ def find_child_pids(pid):
"""Retrieve a list of the pids of child processes of the given pid."""
try:
raw_pids = execute(['ps', '--ppid', pid, '-o', 'pid='])
raw_pids = execute(['ps', '--ppid', pid, '-o', 'pid='],
log_fail_as_error=False)
except RuntimeError as e:
# Unexpected errors are the responsibility of the caller
with excutils.save_and_reraise_exception() as ctxt:
# Exception has already been logged by execute
no_children_found = 'Exit code: 1' in str(e)
no_children_found = 'Exit code: 1' in e.message
if no_children_found:
ctxt.reraise = False
return []
@ -176,3 +177,35 @@ def remove_conf_file(cfg_root, uuid, cfg_file):
conf_file = get_conf_file_name(cfg_root, uuid, cfg_file)
if os.path.exists(conf_file):
os.unlink(conf_file)
def get_root_helper_child_pid(pid, root_helper=None):
"""
Get the lowest child pid in the process hierarchy
If root helper was used, two or more processes would be created:
- a root helper process (e.g. sudo myscript)
- possibly a rootwrap script (e.g. neutron-rootwrap)
- a child process (e.g. myscript)
Killing the root helper process will leave the child process
running, re-parented to init, so the only way to ensure that both
die is to target the child process directly.
"""
pid = str(pid)
if root_helper:
try:
pid = find_child_pids(pid)[0]
except IndexError:
# Process is already dead
return None
while True:
try:
# We shouldn't have more than one child per process
# so keep getting the children of the first one
pid = find_child_pids(pid)[0]
except IndexError:
# Last process in the tree, return it
break
return pid

View File

@ -16,3 +16,5 @@ ping_filter: CommandFilter, ping, root
# enable curl from namespace
curl_filter: CommandFilter, curl, root
tee_filter: CommandFilter, tee, root
tee_kill: KillFilter, root, tee, -9

View File

@ -13,6 +13,17 @@
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
import functools
import os
import select
import shlex
import subprocess
from neutron.agent.linux import utils
CHILD_PROCESS_TIMEOUT = os.environ.get('OS_TEST_CHILD_PROCESS_TIMEOUT', 20)
CHILD_PROCESS_SLEEP = os.environ.get('OS_TEST_CHILD_PROCESS_SLEEP', 0.5)
READ_TIMEOUT = os.environ.get('OS_TEST_READ_TIMEOUT', 5)
def wait_until_true(predicate, timeout=60, sleep=1, exception=None):
@ -31,6 +42,47 @@ def wait_until_true(predicate, timeout=60, sleep=1, exception=None):
eventlet.sleep(sleep)
def remove_abs_path(cmd):
"""Remove absolute path of executable in cmd
Note: New instance of list is returned
:param cmd: parsed shlex command (e.g. ['/bin/foo', 'param1', 'param two'])
"""
if cmd and os.path.isabs(cmd[0]):
cmd = list(cmd)
cmd[0] = os.path.basename(cmd[0])
return cmd
def get_cmdline_from_pid(pid):
if pid is None or not os.path.exists('/proc/%s' % pid):
return list()
with open('/proc/%s/cmdline' % pid, 'r') as f:
return f.readline().split('\0')[:-1]
def cmdlines_are_equal(cmd1, cmd2):
"""Validate provided lists containing output of /proc/cmdline are equal
This function ignores absolute paths of executables in order to have
correct results in case one list uses absolute path and the other does not.
"""
cmd1 = remove_abs_path(cmd1)
cmd2 = remove_abs_path(cmd2)
return cmd1 == cmd2
def pid_invoked_with_cmdline(pid, expected_cmd):
"""Validate process with given pid is running with provided parameters
"""
cmdline = get_cmdline_from_pid(pid)
return cmdlines_are_equal(expected_cmd, cmdline)
class Pinger(object):
def __init__(self, testcase, timeout=1, max_attempts=1):
self.testcase = testcase
@ -57,3 +109,59 @@ class Pinger(object):
{'src_ns': src_ns.namespace, 'dst_ip': dst_ip})
except RuntimeError:
pass
class RootHelperProcess(subprocess.Popen):
def __init__(self, cmd, *args, **kwargs):
for arg in ('stdin', 'stdout', 'stderr'):
kwargs.setdefault(arg, subprocess.PIPE)
self.namespace = kwargs.pop('namespace', None)
self.root_helper = kwargs.pop('root_helper', None)
self.cmd = cmd
if self.namespace is not None:
cmd = ['ip', 'netns', 'exec', self.namespace] + cmd
if self.root_helper is not None:
cmd = shlex.split(self.root_helper) + cmd
self.child_pid = None
super(RootHelperProcess, self).__init__(cmd, *args, **kwargs)
if self.root_helper:
self._wait_for_child_process()
def kill(self):
pid = self.child_pid or str(self.pid)
utils.execute(['kill', '-9', pid],
root_helper=self.root_helper)
def read_stdout(self, timeout=None):
return self._read_stream(self.stdout, timeout)
@staticmethod
def _read_stream(stream, timeout):
if timeout:
poller = select.poll()
poller.register(stream.fileno())
poll_predicate = functools.partial(poller.poll, 1)
wait_until_true(poll_predicate, timeout, 0.1,
RuntimeError(
'No output in %.2f seconds' % timeout))
return stream.readline()
def writeline(self, data):
self.stdin.write(data + os.linesep)
self.stdin.flush()
def _wait_for_child_process(self, timeout=CHILD_PROCESS_TIMEOUT,
sleep=CHILD_PROCESS_SLEEP):
def child_is_running():
child_pid = utils.get_root_helper_child_pid(
self.pid, root_helper=self.root_helper)
if pid_invoked_with_cmdline(child_pid, self.cmd):
return True
wait_until_true(
child_is_running,
timeout,
exception=RuntimeError("Process %s hasn't been spawned "
"in %d seconds" % (self.cmd, timeout)))
self.child_pid = utils.get_root_helper_child_pid(
self.pid, root_helper=self.root_helper)

View File

@ -18,6 +18,7 @@ import fixtures
from six import moves
from neutron.agent.linux import async_process
from neutron.agent.linux import utils
from neutron.tests import base
@ -61,7 +62,8 @@ class TestAsyncProcess(base.BaseTestCase):
# Ensure that the same output is read twice
self._check_stdout(proc)
pid = proc._get_pid_to_kill()
pid = utils.get_root_helper_child_pid(proc._process.pid,
proc.root_helper)
proc._kill_process(pid)
self._check_stdout(proc)
proc.stop()

View File

@ -0,0 +1,35 @@
# Copyright 2014 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.tests.functional.agent.linux import base
from neutron.tests.functional.agent.linux import helpers
class TestRootHelperProcess(base.BaseLinuxTestCase):
def test_process_read_write(self):
proc = helpers.RootHelperProcess(['tee'], root_helper=self.root_helper)
proc.writeline('foo')
output = proc.read_stdout(helpers.READ_TIMEOUT)
self.assertEqual('foo\n', output)
def test_process_kill(self):
with self.assert_max_execution_time(100):
proc = helpers.RootHelperProcess(
['tee'], root_helper=self.root_helper)
proc.kill()
proc.wait()
# sudo returns 137 and
# rootwrap returns 247 (bug 1364822)
self.assertIn(proc.returncode, [137, 247])

View File

@ -25,6 +25,7 @@ Tests in this module will be skipped unless:
import eventlet
from neutron.agent.linux import ovsdb_monitor
from neutron.agent.linux import utils
from neutron.tests.functional.agent.linux import base as linux_base
from neutron.tests.functional import base as functional_base
@ -77,7 +78,7 @@ class TestOvsdbMonitor(BaseMonitorTest):
self.monitor.respawn_interval = 0
old_pid = self.monitor._process.pid
output1 = self.collect_initial_output()
pid = self.monitor._get_pid_to_kill()
pid = utils.get_root_helper_child_pid(old_pid, self.root_helper)
self.monitor._kill_process(pid)
self.monitor._reset_queues()
while (self.monitor._process.pid == old_pid):

View File

@ -12,6 +12,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import eventlet.event
import eventlet.queue
import eventlet.timeout
@ -23,9 +25,6 @@ from neutron.agent.linux import utils
from neutron.tests import base
_marker = ()
class TestAsyncProcess(base.BaseTestCase):
def setUp(self):
@ -156,12 +155,17 @@ class TestAsyncProcess(base.BaseTestCase):
self._test_iter_output_calls_iter_queue_on_output_queue('stderr')
def _test__kill(self, respawning, pid=None):
with mock.patch.object(self.proc, '_kill_event') as mock_kill_event:
with mock.patch.object(self.proc, '_get_pid_to_kill',
return_value=pid):
with mock.patch.object(self.proc,
'_kill_process') as mock_kill_process:
self.proc._kill(respawning)
with contextlib.nested(
mock.patch.object(self.proc, '_kill_event'),
mock.patch.object(utils, 'get_root_helper_child_pid',
return_value=pid),
mock.patch.object(self.proc, '_kill_process'),
mock.patch.object(self.proc, '_process')) as (
mock_kill_event,
mock_get_child_pid,
mock_kill_process,
mock_process):
self.proc._kill(respawning)
if respawning:
self.assertIsNotNone(self.proc._kill_event)
@ -181,40 +185,6 @@ class TestAsyncProcess(base.BaseTestCase):
def test__kill_targets_process_for_pid(self):
self._test__kill(False, pid='1')
def _test__get_pid_to_kill(self, expected=_marker,
root_helper=None, pids=None):
def _find_child_pids(x):
if not pids:
return []
pids.pop(0)
return pids
if root_helper:
self.proc.root_helper = root_helper
with mock.patch.object(self.proc, '_process') as mock_process:
with mock.patch.object(mock_process, 'pid') as mock_pid:
with mock.patch.object(utils, 'find_child_pids',
side_effect=_find_child_pids):
actual = self.proc._get_pid_to_kill()
if expected is _marker:
expected = mock_pid
self.assertEqual(expected, actual)
def test__get_pid_to_kill_returns_process_pid_without_root_helper(self):
self._test__get_pid_to_kill()
def test__get_pid_to_kill_returns_child_pid_with_root_helper(self):
self._test__get_pid_to_kill(expected='2', pids=['1', '2'],
root_helper='a')
def test__get_pid_to_kill_returns_last_child_pid_with_root_Helper(self):
self._test__get_pid_to_kill(expected='3', pids=['1', '2', '3'],
root_helper='a')
def test__get_pid_to_kill_returns_none_with_root_helper(self):
self._test__get_pid_to_kill(expected=None, root_helper='a')
def _test__kill_process(self, pid, expected, exception_message=None):
self.proc.root_helper = 'foo'
if exception_message:

View File

@ -20,6 +20,9 @@ from neutron.agent.linux import utils
from neutron.tests import base
_marker = object()
class FakeCreateProcess(object):
class FakeStdin(object):
def close(self):
@ -168,3 +171,36 @@ class TestFindChildPids(base.BaseTestCase):
with mock.patch.object(utils, 'execute',
side_effect=RuntimeError()):
utils.find_child_pids(-1)
class TestGetRoothelperChildPid(base.BaseTestCase):
def _test_get_root_helper_child_pid(self, expected=_marker,
root_helper=None, pids=None):
def _find_child_pids(x):
if not pids:
return []
pids.pop(0)
return pids
mock_pid = object()
with mock.patch.object(utils, 'find_child_pids',
side_effect=_find_child_pids):
actual = utils.get_root_helper_child_pid(mock_pid, root_helper)
if expected is _marker:
expected = str(mock_pid)
self.assertEqual(expected, actual)
def test_returns_process_pid_without_root_helper(self):
self._test_get_root_helper_child_pid()
def test_returns_child_pid_with_root_helper(self):
self._test_get_root_helper_child_pid(expected='2', pids=['1', '2'],
root_helper='a')
def test_returns_last_child_pid_with_root_helper(self):
self._test_get_root_helper_child_pid(expected='3',
pids=['1', '2', '3'],
root_helper='a')
def test_returns_none_with_root_helper(self):
self._test_get_root_helper_child_pid(expected=None, root_helper='a')