Enhance _program() and _program_path()

* _program_path() now uses shutil.which() if available
  (Python 3.3 and newer)
* Convert _program_path() to static method (self is not needed)
* Explain when os.readlink("/proc/pid/exe") result can contain NUL
  byte followed by junk data
* PATH environment variable: use os.defpath if PATH is not set
  (as shutil.which())
* Update test_KillFilter_renamed_exe() for Python 3 using
  shutil.which(): mock also os.path.exists() and os.access()
* test_KillFilter_upgraded_exe(), test_KillFilter_renamed_exe(): use
  @mock.patch() decorator
* Cleanup the code

Change-Id: I91a7a8505380d4aeae7b2c0b3199e25b17b96888
This commit is contained in:
Victor Stinner 2016-08-19 12:21:30 +02:00
parent a46b731c8a
commit 04e2cd00d7
2 changed files with 64 additions and 40 deletions

View File

@ -16,6 +16,7 @@
import os
import pwd
import re
import shutil
def _getuid(user):
@ -156,15 +157,26 @@ class KillFilter(CommandFilter):
def __init__(self, *args):
super(KillFilter, self).__init__("/bin/kill", *args)
def _program_path(self, command):
"""Determine the full path for command"""
@staticmethod
def _program_path(command):
"""Try to determine the full path for command.
Return command if the full path cannot be found.
"""
# shutil.which() was added to Python 3.3
if hasattr(shutil, 'which'):
return shutil.which(command)
if os.path.isabs(command):
return command
else:
for path in os.environ.get('PATH', '').split(os.pathsep):
program = os.path.join(path, command)
if os.path.isfile(program):
return program
path = os.environ.get('PATH', os.defpath).split(os.pathsep)
for dir in path:
program = os.path.join(dir, command)
if os.path.isfile(program):
return program
return command
def _program(self, pid):
@ -177,8 +189,8 @@ class KillFilter(CommandFilter):
return None
# NOTE(yufang521247): /proc/PID/exe may have '\0' on the
# end, because python doesn't stop at '\0' when read the
# target path.
# end (ex: if an executable is updated or deleted), because python
# doesn't stop at '\0' when read the target path.
command = command.partition('\0')[0]
# NOTE(dprince): /proc/PID/exe may have ' (deleted)' on
@ -186,23 +198,26 @@ class KillFilter(CommandFilter):
if command.endswith(" (deleted)"):
command = command[:-len(" (deleted)")]
if os.path.isfile(command):
return command
# /proc/PID/exe may have been renamed with
# a ';......' or '.#prelink#......' suffix etc.
# So defer to /proc/PID/cmdline in that case.
if not os.path.isfile(command):
try:
with open("/proc/%d/cmdline" % int(pid)) as pfile:
cmdline = pfile.read().partition('\0')[0]
cmdline = self._program_path(cmdline)
if os.path.isfile(cmdline):
command = cmdline
# Note we don't return None if cmdline doesn't exist
# as that will allow killing a process where the exe
# has been removed from the system rather than updated.
except EnvironmentError:
return None
try:
with open("/proc/%d/cmdline" % int(pid)) as pfile:
cmdline = pfile.read().partition('\0')[0]
return command
cmdline = self._program_path(cmdline)
if os.path.isfile(cmdline):
command = cmdline
# Note we don't return None if cmdline doesn't exist
# as that will allow killing a process where the exe
# has been removed from the system rather than updated.
return command
except EnvironmentError:
return None
def match(self, userargs):
if not userargs or userargs[0] != "kill":

View File

@ -254,33 +254,42 @@ class RootwrapTestCase(testtools.TestCase):
exists.side_effect = fake_exists
self.assertTrue(f.match(usercmd))
def test_KillFilter_upgraded_exe(self):
@mock.patch('os.readlink')
@mock.patch('os.path.isfile')
def test_KillFilter_upgraded_exe(self, mock_isfile, mock_readlink):
"""Makes sure upgraded exe's are killed correctly."""
f = filters.KillFilter("root", "/bin/commandddddd")
command = "/bin/commandddddd"
usercmd = ['kill', 1234]
with mock.patch('os.readlink') as readlink:
readlink.return_value = command + '\0\05190bfb2 (deleted)'
with mock.patch('os.path.isfile') as exists:
def fake_exists(path):
return path == command
exists.side_effect = fake_exists
self.assertTrue(f.match(usercmd))
def test_KillFilter_renamed_exe(self):
def fake_exists(path):
return path == command
mock_readlink.return_value = command + '\0\05190bfb2 (deleted)'
mock_isfile.side_effect = fake_exists
self.assertTrue(f.match(usercmd))
@mock.patch('os.readlink')
@mock.patch('os.path.isfile')
@mock.patch('os.path.exists')
@mock.patch('os.access')
def test_KillFilter_renamed_exe(self, mock_access, mock_exists,
mock_isfile, mock_readlink):
"""Makes sure renamed exe's are killed correctly."""
command = "/bin/commandddddd"
f = filters.KillFilter("root", command)
usercmd = ['kill', 1234]
with mock.patch('os.readlink') as readlink:
readlink.return_value = command + ';90bfb2 (deleted)'
m = mock.mock_open(read_data=command)
with mock.patch("six.moves.builtins.open", m, create=True):
with mock.patch('os.path.isfile') as exists:
def fake_exists(path):
return path == command
exists.side_effect = fake_exists
self.assertTrue(f.match(usercmd))
def fake_os_func(path, *args):
return path == command
mock_readlink.return_value = command + ';90bfb2 (deleted)'
m = mock.mock_open(read_data=command)
with mock.patch("six.moves.builtins.open", m, create=True):
mock_isfile.side_effect = fake_os_func
mock_exists.side_effect = fake_os_func
mock_access.side_effect = fake_os_func
self.assertTrue(f.match(usercmd))
def test_ReadFileFilter(self):
goodfn = '/good/file.name'