Fix cfntools command injection
The CommandRunner used to run commands using su command and passing the actual command to be run as argument to it. su USER -c <cmd> This is susceptible to command line injection as noted in the bug. The fix required to do two things: 1. Pass the command to be run as list instead of a string. This is to ensure that the actual arguments are passed as arguments to the program ought to be executed. And by doing so, avoids running any commands passed in the argument. On the contrary, if the command were passed as a string to the shell, the arguments could be formed in a way to execute malicious commands. 2. The CommandRunner runs the command directly and uses setuid to lower the privileges if needed. If the 'runas' user is other than root, then its UID is obtained and setuid is invoked to set the real user-id and effective user-id to the given user. Change-Id: I654117e994fd38411508dbe9b85d06c28dc0e411 Closes-Bug: #1312246
This commit is contained in:
parent
20049ea85f
commit
e424af2236
|
@ -180,7 +180,23 @@ class CommandRunner(object):
|
|||
self
|
||||
"""
|
||||
LOG.debug("Running command: %s" % self._command)
|
||||
cmd = ['su', user, '-c', self._command]
|
||||
|
||||
cmd = self._command
|
||||
if isinstance(cmd, str):
|
||||
cmd = cmd.split()
|
||||
|
||||
if user != 'root':
|
||||
# lower the privileges
|
||||
try:
|
||||
real = pwd.getpwnam(user)
|
||||
os.setuid(real.pw_uid)
|
||||
except Exception as e:
|
||||
LOG.error("Error setting privileges for user '%s': %s"
|
||||
% (user, e))
|
||||
# 126: command invoked cannot be executed
|
||||
self._status = 126
|
||||
return
|
||||
|
||||
subproc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE, cwd=cwd, env=env)
|
||||
output = subproc.communicate()
|
||||
|
@ -560,7 +576,6 @@ class PackagesHandler(object):
|
|||
* if a version array is supplied, choose the highest version from the
|
||||
array and follow same logic for version string above
|
||||
"""
|
||||
|
||||
cmd = CommandRunner("which yum").run()
|
||||
if cmd.status == 1:
|
||||
# yum not available, use DNF if available
|
||||
|
|
|
@ -29,7 +29,7 @@ from heat_cfntools.cfntools import cfn_helper
|
|||
def popen_root_calls(calls):
|
||||
kwargs = {'env': None, 'cwd': None, 'stderr': -1, 'stdout': -1}
|
||||
return [
|
||||
mock.call(['su', 'root', '-c', call], **kwargs)
|
||||
mock.call(call.split(), **kwargs)
|
||||
for call in calls
|
||||
]
|
||||
|
||||
|
@ -51,9 +51,9 @@ class TestCommandRunner(testtools.TestCase):
|
|||
|
||||
def test_command_runner(self):
|
||||
def returns(*args, **kwargs):
|
||||
if args[0][3] == '/bin/command1':
|
||||
if args[0][0] == '/bin/command1':
|
||||
return FakePOpen('All good')
|
||||
elif args[0][3] == '/bin/command2':
|
||||
elif args[0][0] == '/bin/command2':
|
||||
return FakePOpen('Doing something', 'error', -1)
|
||||
else:
|
||||
raise Exception('This should never happen')
|
||||
|
@ -73,13 +73,53 @@ class TestCommandRunner(testtools.TestCase):
|
|||
calls = popen_root_calls(['/bin/command1', '/bin/command2'])
|
||||
mock_popen.assert_has_calls(calls)
|
||||
|
||||
def test_runner_runs_command_as_list(self):
|
||||
with mock.patch('subprocess.Popen') as mock_popen:
|
||||
command = '/bin/command --option=value arg1 arg2'
|
||||
expected_cmd = ['/bin/command', '--option=value', 'arg1', 'arg2']
|
||||
cmd = cfn_helper.CommandRunner(command)
|
||||
cmd.run()
|
||||
self.assertTrue(mock_popen.called)
|
||||
actual_cmd = mock_popen.call_args[0][0]
|
||||
self.assertTrue(isinstance(actual_cmd, list))
|
||||
self.assertItemsEqual(expected_cmd, actual_cmd)
|
||||
|
||||
@mock.patch.object(cfn_helper.pwd, 'getpwnam')
|
||||
@mock.patch.object(cfn_helper.os, 'setuid')
|
||||
def test_privileges_are_lowered_for_non_root_user(self, mock_setuid,
|
||||
mock_getpwnam):
|
||||
pw_entry = mock.MagicMock()
|
||||
pw_entry.pw_uid = 1001
|
||||
mock_getpwnam.return_value = pw_entry
|
||||
with mock.patch('subprocess.Popen') as mock_popen:
|
||||
command = '/bin/command --option=value arg1 arg2'
|
||||
cmd = cfn_helper.CommandRunner(command)
|
||||
cmd.run(user='nonroot')
|
||||
mock_getpwnam.assert_called_once_with('nonroot')
|
||||
mock_setuid.assert_called_once_with(1001)
|
||||
self.assertTrue(mock_popen.called)
|
||||
|
||||
@mock.patch.object(cfn_helper.pwd, 'getpwnam')
|
||||
@mock.patch.object(cfn_helper.os, 'setuid')
|
||||
def test_run_returns_when_cannot_set_privileges(self, mock_setuid,
|
||||
mock_getpwnam):
|
||||
mock_setuid.side_effect = Exception('[Error 1] Permission Denied')
|
||||
with mock.patch('subprocess.Popen') as mock_popen:
|
||||
command = '/bin/command2'
|
||||
cmd = cfn_helper.CommandRunner(command)
|
||||
cmd.run(user='nonroot')
|
||||
self.assertTrue(mock_getpwnam.called)
|
||||
self.assertTrue(mock_setuid.called)
|
||||
self.assertFalse(mock_popen.called)
|
||||
self.assertEqual(126, cmd.status)
|
||||
|
||||
|
||||
class TestPackages(testtools.TestCase):
|
||||
|
||||
def test_yum_install(self):
|
||||
|
||||
def returns(*args, **kwargs):
|
||||
if args[0][3].startswith('rpm -q '):
|
||||
if (args[0][0] == 'rpm' and args[0][1] == '-q'):
|
||||
return FakePOpen(returncode=1)
|
||||
else:
|
||||
return FakePOpen(returncode=0)
|
||||
|
@ -106,8 +146,8 @@ class TestPackages(testtools.TestCase):
|
|||
def test_dnf_install_yum_unavailable(self):
|
||||
|
||||
def returns(*args, **kwargs):
|
||||
if args[0][3].startswith('rpm -q ') \
|
||||
or args[0][3] == 'which yum':
|
||||
if ((args[0][0] == 'rpm' and args[0][1] == '-q')
|
||||
or (args[0][0] == 'which' and args[0][1] == 'yum')):
|
||||
return FakePOpen(returncode=1)
|
||||
else:
|
||||
return FakePOpen(returncode=0)
|
||||
|
@ -134,7 +174,7 @@ class TestPackages(testtools.TestCase):
|
|||
def test_dnf_install(self):
|
||||
|
||||
def returns(*args, **kwargs):
|
||||
if args[0][3].startswith('rpm -q '):
|
||||
if (args[0][0] == 'rpm' and args[0][1] == '-q'):
|
||||
return FakePOpen(returncode=1)
|
||||
else:
|
||||
return FakePOpen(returncode=0)
|
||||
|
@ -161,7 +201,7 @@ class TestPackages(testtools.TestCase):
|
|||
def test_zypper_install(self):
|
||||
|
||||
def returns(*args, **kwargs):
|
||||
if args[0][3].startswith('rpm -q '):
|
||||
if (args[0][0] == 'rpm' and args[0][1] == '-q'):
|
||||
return FakePOpen(returncode=1)
|
||||
else:
|
||||
return FakePOpen(returncode=0)
|
||||
|
|
Loading…
Reference in New Issue