Convert all internal commands to list

Make all internal commands as list to avoid any possibility of command
line injection. Commands supplied as string are susceptible to
substitution.

All the internal commands are supplied as list to CommandRunner. As a
convention, all the commands must be given as list to subprocess except
the commands read from file, like in case of cfn hooks and commands
section in metadata.

Few internal commands require shell redirects and they will be
implemented in another patch.

Change-Id: Ifabaf44e341144bc85508dc05c76b1d83e41ae44
Partial-Bug: #1312246
This commit is contained in:
Anant Patil 2015-09-10 11:27:25 +05:30
parent f427a69443
commit 2710bba2cb
3 changed files with 231 additions and 200 deletions

View File

@ -100,14 +100,19 @@ body = {
"UniqueId": unique_id,
"Data": args.data
}
data = cfn_helper.json.dumps(body)
insecure = ""
cmd = ['curl']
if args.insecure:
insecure = "--insecure"
cmd.append('--insecure')
cmd.extend([
'-X', 'PUT',
'-H', 'Content-Type:',
'--data-binary', data,
args.url
])
cmd_str = ("curl %s -X PUT -H \'Content-Type:\' --data-binary \'%s\' \"%s\"" %
(insecure, cfn_helper.json.dumps(body), args.url))
command = cfn_helper.CommandRunner(cmd_str).run()
command = cfn_helper.CommandRunner(cmd).run()
if command.status != 0:
LOG.error(command.stderr)
sys.exit(command.status)

View File

@ -140,7 +140,7 @@ class Hook(object):
def event(self, ev_name, ev_object, ev_resource):
if self.resource_name_get() == ev_resource and \
ev_name in self.triggers:
CommandRunner(self.action).run(user=self.runas)
CommandRunner(self.action, shell=True).run(user=self.runas)
else:
LOG.debug('event: {%s, %s, %s} did not match %s' %
(ev_name, ev_object, ev_resource, self.__str__()))
@ -184,8 +184,9 @@ def controlled_privileges(user):
class CommandRunner(object):
"""Helper class to run a command and store the output."""
def __init__(self, command, nextcommand=None):
def __init__(self, command, shell=False, nextcommand=None):
self._command = command
self._shell = shell
self._next = nextcommand
self._stdout = None
self._stderr = None
@ -211,16 +212,16 @@ class CommandRunner(object):
LOG.debug("Running command: %s" % self._command)
cmd = self._command
shell = self._shell
# Ensure commands are passed as strings only as we run them
# using shell
assert isinstance(cmd, six.string_types)
# Ensure commands that are given as string are run on shell
assert isinstance(cmd, six.string_types) is bool(shell)
try:
with controlled_privileges(user):
subproc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, cwd=cwd,
env=env, shell=True)
env=env, shell=shell)
output = subproc.communicate()
self._status = subproc.returncode
self._stdout = output[0]
@ -319,7 +320,8 @@ class RpmHelper(object):
e.g., httpd-2.2.22
e.g., httpd-2.2.22-1.fc16
"""
command = CommandRunner("rpm -q %s" % pkg).run()
cmd = ['rpm', '-q', pkg]
command = CommandRunner(cmd).run()
return command.status == 0
@classmethod
@ -332,8 +334,8 @@ class RpmHelper(object):
e.g., httpd-2.2.22
e.g., httpd-2.2.22-1.fc16
"""
cmd_str = "yum -y --showduplicates list available %s" % pkg
command = CommandRunner(cmd_str).run()
cmd = ['yum', '-y', '--showduplicates', 'list', 'available', pkg]
command = CommandRunner(cmd).run()
return command.status == 0
@classmethod
@ -346,8 +348,8 @@ class RpmHelper(object):
e.g., httpd-2.2.22
e.g., httpd-2.2.22-1.fc21
"""
cmd_str = "dnf -y --showduplicates list available %s" % pkg
command = CommandRunner(cmd_str).run()
cmd = ['dnf', '-y', '--showduplicates', 'list', 'available', pkg]
command = CommandRunner(cmd).run()
return command.status == 0
@classmethod
@ -360,8 +362,8 @@ class RpmHelper(object):
e.g., httpd-2.2.22
e.g., httpd-2.2.22-1.fc16
"""
cmd_str = "zypper -n --no-refresh search %s" % pkg
command = CommandRunner(cmd_str).run()
cmd = ['zypper', '-n', '--no-refresh', 'search', pkg]
command = CommandRunner(cmd).run()
return command.status == 0
@classmethod
@ -387,22 +389,16 @@ class RpmHelper(object):
* packages must be in same format as yum pkg list
"""
if rpms:
cmd = "rpm -U --force --nosignature "
cmd += " ".join(packages)
LOG.info("Installing packages: %s" % cmd)
cmd = ['rpm', '-U', '--force', '--nosignature']
elif zypper:
cmd = "zypper -n install "
cmd += " ".join(packages)
LOG.info("Installing packages: %s" % cmd)
cmd = ['zypper', '-n', 'install']
elif dnf:
# use dnf --best to upgrade outdated-but-installed packages
cmd = "dnf -y --best install "
cmd += " ".join(packages)
LOG.info("Installing packages: %s" % cmd)
cmd = ['dnf', '-y', '--best', 'install']
else:
cmd = "yum -y install "
cmd += " ".join(packages)
LOG.info("Installing packages: %s" % cmd)
cmd = ['yum', '-y', 'install']
cmd.extend(packages)
LOG.info("Installing packages: %s" % cmd)
command = CommandRunner(cmd).run()
if command.status:
LOG.warn("Failed to install packages: %s" % cmd)
@ -428,22 +424,22 @@ class RpmHelper(object):
if rpms:
cls.install(packages)
elif zypper:
cmd = "zypper -n install --oldpackage "
cmd += " ".join(packages)
cmd = ['zypper', '-n', 'install', '--oldpackage']
cmd.extend(packages)
LOG.info("Downgrading packages: %s", cmd)
command = CommandRunner(cmd).run()
if command.status:
LOG.warn("Failed to downgrade packages: %s" % cmd)
elif dnf:
cmd = "dnf -y downgrade "
cmd += " ".join(packages)
cmd = ['dnf', '-y', 'downgrade']
cmd.extend(packages)
LOG.info("Downgrading packages: %s", cmd)
command = CommandRunner(cmd).run()
if command.status:
LOG.warn("Failed to downgrade packages: %s" % cmd)
else:
cmd = "yum -y downgrade "
cmd += " ".join(packages)
cmd = ['yum', '-y', 'downgrade']
cmd.extend(packages)
LOG.info("Downgrading packages: %s" % cmd)
command = CommandRunner(cmd).run()
if command.status:
@ -477,22 +473,23 @@ class PackagesHandler(object):
# TODO(asalkeld) support versions
# -b == local & remote install
# -y == install deps
opts = '-b -y'
opts = ['-b', '-y']
for pkg_name, versions in packages.items():
if len(versions) > 0:
cmd_str = 'gem install %s --version %s %s' % (opts,
versions[0],
pkg_name)
CommandRunner(cmd_str).run()
cmd = ['gem', 'install'] + opts
cmd.extend(['--version', versions[0], pkg_name])
CommandRunner(cmd).run()
else:
CommandRunner('gem install %s %s' % (opts, pkg_name)).run()
cmd = ['gem', 'install'] + opts
cmd.append(pkg_name)
CommandRunner(cmd).run()
def _handle_python_packages(self, packages):
"""very basic support for easy_install."""
# TODO(asalkeld) support versions
for pkg_name, versions in packages.items():
cmd_str = 'easy_install %s' % (pkg_name)
CommandRunner(cmd_str).run()
cmd = ['easy_install', pkg_name]
CommandRunner(cmd).run()
def _handle_zypper_packages(self, packages):
"""Handle installation, upgrade, or downgrade of packages via yum.
@ -605,7 +602,7 @@ class PackagesHandler(object):
array and follow same logic for version string above
"""
cmd = CommandRunner("which yum").run()
cmd = CommandRunner(['which', 'yum']).run()
if cmd.status == 1:
# yum not available, use DNF if available
self._handle_dnf_packages(packages)
@ -658,11 +655,11 @@ class PackagesHandler(object):
def _handle_apt_packages(self, packages):
"""very basic support for apt."""
# TODO(asalkeld) support versions
pkg_list = ' '.join([p for p in packages])
pkg_list = list(packages)
cmd_str = 'DEBIAN_FRONTEND=noninteractive apt-get -y install %s' % \
pkg_list
CommandRunner(cmd_str).run()
env = {'DEBIAN_FRONTEND': 'noninteractive'}
cmd = ['apt-get', '-y', 'install'] + pkg_list
CommandRunner(cmd).run(env=env)
# map of function pointers to handle different package managers
_package_handlers = {"yum": _handle_yum_packages,
@ -738,7 +735,7 @@ class FilesHandler(object):
.encode('UTF-8'))
f.close()
elif 'source' in meta:
CommandRunner('curl -o %s %s' % (dest, meta['source'])).run()
CommandRunner(['curl', '-o', dest, meta['source']]).run()
else:
LOG.error('%s %s' % (dest, str(meta)))
continue
@ -839,8 +836,9 @@ class SourcesHandler(object):
def _apply_source(self, dest, url):
cmd = self._apply_source_cmd(dest, url)
#FIXME bug 1498298
if cmd != '':
runner = CommandRunner(cmd)
runner = CommandRunner(cmd, shell=True)
runner.run()
def apply_sources(self):
@ -862,47 +860,52 @@ class ServicesHandler(object):
if os.path.exists("/bin/systemctl"):
service_exe = "/bin/systemctl"
service = '%s.service' % service
service_start = '%s start %s'
service_status = '%s status %s'
service_stop = '%s stop %s'
service_start = [service_exe, 'start', service]
service_status = [service_exe, 'status', service]
service_stop = [service_exe, 'stop', service]
elif os.path.exists("/sbin/service"):
service_exe = "/sbin/service"
service_start = '%s %s start'
service_status = '%s %s status'
service_stop = '%s %s stop'
service_start = [service_exe, service, 'start']
service_status = [service_exe, service, 'status']
service_stop = [service_exe, service, 'stop']
else:
service_exe = "/usr/sbin/service"
service_start = '%s %s start'
service_status = '%s %s status'
service_stop = '%s %s stop'
service_start = [service_exe, service, 'start']
service_status = [service_exe, service, 'status']
service_stop = [service_exe, service, 'stop']
if os.path.exists("/bin/systemctl"):
enable_exe = "/bin/systemctl"
enable_on = '%s enable %s'
enable_off = '%s disable %s'
enable_on = [enable_exe, 'enable', service]
enable_off = [enable_exe, 'disable', service]
elif os.path.exists("/sbin/chkconfig"):
enable_exe = "/sbin/chkconfig"
enable_on = '%s %s on'
enable_off = '%s %s off'
enable_on = [enable_exe, service, 'on']
enable_off = [enable_exe, service, 'off']
else:
enable_exe = "/usr/sbin/update-rc.d"
enable_on = '%s %s enable'
enable_off = '%s %s disable'
enable_on = [enable_exe, service, 'enable']
enable_off = [enable_exe, service, 'disable']
cmd = ""
cmd = None
if "enable" == command:
cmd = enable_on % (enable_exe, service)
cmd = enable_on
elif "disable" == command:
cmd = enable_off % (enable_exe, service)
cmd = enable_off
elif "start" == command:
cmd = service_start % (service_exe, service)
cmd = service_start
elif "stop" == command:
cmd = service_stop % (service_exe, service)
cmd = service_stop
elif "status" == command:
cmd = service_status % (service_exe, service)
command = CommandRunner(cmd)
command.run()
return command
cmd = service_status
if cmd is not None:
command = CommandRunner(cmd)
command.run()
return command
else:
LOG.error("Unknown sysv command %s" % command)
def _initialize_service(self, handler, service, properties):
if "enabled" in properties:
@ -1085,7 +1088,7 @@ class CommandsHandler(object):
return
if "test" in properties:
test = CommandRunner(properties["test"])
test = CommandRunner(properties["test"], shell=True)
test_status = test.run('root', cwd, env).status
if test_status != 0:
LOG.info("%s test returns false, skipping command"
@ -1097,10 +1100,11 @@ class CommandsHandler(object):
if "command" in properties:
try:
command = properties["command"]
#FIXME bug 1498300
if isinstance(command, list):
escape = lambda x: '"%s"' % x.replace('"', '\\"')
command = ' '.join(map(escape, command))
command = CommandRunner(command)
command = CommandRunner(command, shell=True)
command.run('root', cwd, env)
command_status = command.status
except OSError as e:
@ -1139,14 +1143,11 @@ class GroupsHandler(object):
def _initialize_group(self, group, properties):
gid = properties.get("gid", None)
param_list = []
param_list.append(group)
cmd = ['groupadd', group]
if gid is not None:
param_list.append("--gid " + gid)
cmd.extend(['--gid', str(gid)])
command = CommandRunner("groupadd " + ' '.join(param_list))
command = CommandRunner(cmd)
command.run()
command_status = command.status
@ -1185,23 +1186,23 @@ class UsersHandler(object):
uid = properties.get("uid", None)
homeDir = properties.get("homeDir", None)
param_list = []
param_list.append(user)
cmd = ['useradd', user]
if uid is not None:
param_list.append("--uid " + uid)
cmd.extend(['--uid', six.text_type(uid)])
if homeDir is not None:
param_list.append("--home " + homeDir)
cmd.extend(['--home', six.text_type(homeDir)])
if "groups" in properties:
param_list.append("--groups " + ','.join(properties["groups"]))
groups = ','.join(properties["groups"])
cmd.extend(['--groups', groups])
#Users are created as non-interactive system users with a shell
#of /sbin/nologin. This is by design and cannot be modified.
param_list.append("--shell /sbin/nologin")
cmd.extend(['--shell', '/sbin/nologin'])
command = CommandRunner("useradd " + ' '.join(param_list))
command = CommandRunner(cmd)
command.run()
command_status = command.status
@ -1294,7 +1295,8 @@ class Metadata(object):
url = 'http://169.254.169.254/openstack/2012-08-10/meta_data.json'
if not os.path.exists(cache_path):
CommandRunner('curl -o %s %s' % (cache_path, url)).run()
cmd = ['curl', '-o', cache_path, url]
CommandRunner(cmd).run()
try:
with open(cache_path) as fd:
try:

View File

@ -26,9 +26,9 @@ import testtools.matchers as ttm
from heat_cfntools.cfntools import cfn_helper
def popen_root_calls(calls):
def popen_root_calls(calls, shell=False):
kwargs = {'env': None, 'cwd': None, 'stderr': -1, 'stdout': -1,
'shell': True}
'shell': shell}
return [
mock.call(call, **kwargs)
for call in calls
@ -55,26 +55,28 @@ class TestCommandRunner(testtools.TestCase):
def test_command_runner(self, mock_geteuid, mock_seteuid, mock_getpwnam):
def returns(*args, **kwargs):
if args[0] == '/bin/command1':
if args[0][0] == '/bin/command1':
return FakePOpen('All good')
elif args[0] == '/bin/command2':
elif args[0][0] == '/bin/command2':
return FakePOpen('Doing something', 'error', -1)
else:
raise Exception('This should never happen')
with mock.patch('subprocess.Popen') as mock_popen:
mock_popen.side_effect = returns
cmd2 = cfn_helper.CommandRunner('/bin/command2')
cmd1 = cfn_helper.CommandRunner('/bin/command1', cmd2)
cmd2 = cfn_helper.CommandRunner(['/bin/command2'])
cmd1 = cfn_helper.CommandRunner(['/bin/command1'],
nextcommand=cmd2)
cmd1.run('root')
self.assertEqual(
'CommandRunner:\n\tcommand: /bin/command1\n\tstdout: All good',
'CommandRunner:\n\tcommand: [\'/bin/command1\']\n\tstdout: '
'All good',
str(cmd1))
self.assertEqual(
'CommandRunner:\n\tcommand: /bin/command2\n\tstatus: -1\n'
'\tstdout: Doing something\n\tstderr: error',
'CommandRunner:\n\tcommand: [\'/bin/command2\']\n\tstatus: '
'-1\n\tstdout: Doing something\n\tstderr: error',
str(cmd2))
calls = popen_root_calls(['/bin/command1', '/bin/command2'])
calls = popen_root_calls([['/bin/command1'], ['/bin/command2']])
mock_popen.assert_has_calls(calls)
def test_privileges_are_lowered_for_non_root_user(self, mock_geteuid,
@ -86,7 +88,7 @@ class TestCommandRunner(testtools.TestCase):
mock_geteuid.return_value = 0
calls = [mock.call(1001), mock.call(0)]
with mock.patch('subprocess.Popen') as mock_popen:
command = '/bin/command --option=value arg1 arg2'
command = ['/bin/command', '--option=value', 'arg1', 'arg2']
cmd = cfn_helper.CommandRunner(command)
cmd.run(user='nonroot')
self.assertTrue(mock_geteuid.called)
@ -100,7 +102,7 @@ class TestCommandRunner(testtools.TestCase):
msg = '[Error 1] Permission Denied'
mock_seteuid.side_effect = Exception(msg)
with mock.patch('subprocess.Popen') as mock_popen:
command = '/bin/command2'
command = ['/bin/command2']
cmd = cfn_helper.CommandRunner(command)
cmd.run(user='nonroot')
self.assertTrue(mock_getpwnam.called)
@ -119,7 +121,7 @@ class TestCommandRunner(testtools.TestCase):
calls = [mock.call(1001), mock.call(0)]
with mock.patch('subprocess.Popen') as mock_popen:
mock_popen.side_effect = ValueError('Something wrong')
command = '/bin/command --option=value arg1 arg2'
command = ['/bin/command', '--option=value', 'arg1', 'arg2']
cmd = cfn_helper.CommandRunner(command)
self.assertRaises(ValueError, cmd.run, user='nonroot')
self.assertTrue(mock_geteuid.called)
@ -134,15 +136,16 @@ class TestPackages(testtools.TestCase):
def test_yum_install(self, mock_cp):
def returns(*args, **kwargs):
if args[0].startswith('rpm -q '):
if args[0][0] == 'rpm' and args[0][1] == '-q':
return FakePOpen(returncode=1)
else:
return FakePOpen(returncode=0)
calls = ['which yum']
calls = [['which', 'yum']]
for pack in ('httpd', 'wordpress', 'mysql-server'):
calls.append('rpm -q %s' % pack)
calls.append('yum -y --showduplicates list available %s' % pack)
calls.append(['rpm', '-q', pack])
calls.append(['yum', '-y', '--showduplicates', 'list',
'available', pack])
calls = popen_root_calls(calls)
packages = {
@ -161,16 +164,17 @@ class TestPackages(testtools.TestCase):
def test_dnf_install_yum_unavailable(self, mock_cp):
def returns(*args, **kwargs):
if args[0].startswith('rpm -q ') \
or args[0] == 'which yum':
if ((args[0][0] == 'rpm' and args[0][1] == '-q')
or (args[0][0] == 'which' and args[0][1] == 'yum')):
return FakePOpen(returncode=1)
else:
return FakePOpen(returncode=0)
calls = ['which yum']
calls = [['which', 'yum']]
for pack in ('httpd', 'wordpress', 'mysql-server'):
calls.append('rpm -q %s' % pack)
calls.append('dnf -y --showduplicates list available %s' % pack)
calls.append(['rpm', '-q', pack])
calls.append(['dnf', '-y', '--showduplicates', 'list',
'available', pack])
calls = popen_root_calls(calls)
packages = {
@ -189,15 +193,16 @@ class TestPackages(testtools.TestCase):
def test_dnf_install(self, mock_cp):
def returns(*args, **kwargs):
if args[0].startswith('rpm -q '):
if args[0][0] == 'rpm' and args[0][1] == '-q':
return FakePOpen(returncode=1)
else:
return FakePOpen(returncode=0)
calls = []
for pack in ('httpd', 'wordpress', 'mysql-server'):
calls.append('rpm -q %s' % pack)
calls.append('dnf -y --showduplicates list available %s' % pack)
calls.append(['rpm', '-q', pack])
calls.append(['dnf', '-y', '--showduplicates', 'list',
'available', pack])
calls = popen_root_calls(calls)
packages = {
@ -216,15 +221,15 @@ class TestPackages(testtools.TestCase):
def test_zypper_install(self, mock_cp):
def returns(*args, **kwargs):
if args[0].startswith('rpm -q '):
if args[0][0].startswith('rpm') and args[0][1].startswith('-q'):
return FakePOpen(returncode=1)
else:
return FakePOpen(returncode=0)
calls = []
for pack in ('httpd', 'wordpress', 'mysql-server'):
calls.append('rpm -q %s' % pack)
calls.append('zypper -n --no-refresh search %s' % pack)
calls.append(['rpm', '-q', pack])
calls.append(['zypper', '-n', '--no-refresh', 'search', pack])
calls = popen_root_calls(calls)
packages = {
@ -263,41 +268,50 @@ class TestServicesHandler(testtools.TestCase):
returns = []
# apply_services
calls.append('/bin/systemctl enable httpd.service')
calls.append(['/bin/systemctl', 'enable', 'httpd.service'])
returns.append(FakePOpen())
calls.append('/bin/systemctl status httpd.service')
calls.append(['/bin/systemctl', 'status', 'httpd.service'])
returns.append(FakePOpen(returncode=-1))
calls.append('/bin/systemctl start httpd.service')
calls.append(['/bin/systemctl', 'start', 'httpd.service'])
returns.append(FakePOpen())
calls.append('/bin/systemctl enable mysqld.service')
calls.append(['/bin/systemctl', 'enable', 'mysqld.service'])
returns.append(FakePOpen())
calls.append('/bin/systemctl status mysqld.service')
calls.append(['/bin/systemctl', 'status', 'mysqld.service'])
returns.append(FakePOpen(returncode=-1))
calls.append('/bin/systemctl start mysqld.service')
calls.append(['/bin/systemctl', 'start', 'mysqld.service'])
returns.append(FakePOpen())
# monitor_services not running
calls.append('/bin/systemctl status httpd.service')
calls.append(['/bin/systemctl', 'status', 'httpd.service'])
returns.append(FakePOpen(returncode=-1))
calls.append('/bin/systemctl start httpd.service')
returns.append(FakePOpen())
calls.append('/bin/services_restarted')
returns.append(FakePOpen())
calls.append('/bin/systemctl status mysqld.service')
returns.append(FakePOpen(returncode=-1))
calls.append('/bin/systemctl start mysqld.service')
returns.append(FakePOpen())
calls.append('/bin/services_restarted')
returns.append(FakePOpen())
# monitor_services running
calls.append('/bin/systemctl status httpd.service')
returns.append(FakePOpen())
calls.append('/bin/systemctl status mysqld.service')
calls.append(['/bin/systemctl', 'start', 'httpd.service'])
returns.append(FakePOpen())
calls = popen_root_calls(calls)
calls.extend(popen_root_calls(['/bin/services_restarted'], shell=True))
returns.append(FakePOpen())
calls.extend(popen_root_calls([['/bin/systemctl', 'status',
'mysqld.service']]))
returns.append(FakePOpen(returncode=-1))
calls.extend(popen_root_calls([['/bin/systemctl', 'start',
'mysqld.service']]))
returns.append(FakePOpen())
calls.extend(popen_root_calls(['/bin/services_restarted'], shell=True))
returns.append(FakePOpen())
# monitor_services running
calls.extend(popen_root_calls([['/bin/systemctl', 'status',
'httpd.service']]))
returns.append(FakePOpen())
calls.extend(popen_root_calls([['/bin/systemctl', 'status',
'mysqld.service']]))
returns.append(FakePOpen())
#calls = popen_root_calls(calls)
services = {
"systemd": {
"mysqld": {"enabled": "true", "ensureRunning": "true"},
@ -332,12 +346,12 @@ class TestServicesHandler(testtools.TestCase):
calls = []
# apply_services
calls.append('/bin/systemctl disable httpd.service')
calls.append('/bin/systemctl status httpd.service')
calls.append('/bin/systemctl stop httpd.service')
calls.append('/bin/systemctl disable mysqld.service')
calls.append('/bin/systemctl status mysqld.service')
calls.append('/bin/systemctl stop mysqld.service')
calls.append(['/bin/systemctl', 'disable', 'httpd.service'])
calls.append(['/bin/systemctl', 'status', 'httpd.service'])
calls.append(['/bin/systemctl', 'stop', 'httpd.service'])
calls.append(['/bin/systemctl', 'disable', 'mysqld.service'])
calls.append(['/bin/systemctl', 'status', 'mysqld.service'])
calls.append(['/bin/systemctl', 'stop', 'mysqld.service'])
calls = popen_root_calls(calls)
services = {
@ -372,27 +386,28 @@ class TestServicesHandler(testtools.TestCase):
returns = []
# apply_services
calls.append('/sbin/chkconfig httpd on')
calls.append(['/sbin/chkconfig', 'httpd', 'on'])
returns.append(FakePOpen())
calls.append('/sbin/service httpd status')
calls.append(['/sbin/service', 'httpd', 'status'])
returns.append(FakePOpen(returncode=-1))
calls.append('/sbin/service httpd start')
calls.append(['/sbin/service', 'httpd', 'start'])
returns.append(FakePOpen())
# monitor_services not running
calls.append('/sbin/service httpd status')
calls.append(['/sbin/service', 'httpd', 'status'])
returns.append(FakePOpen(returncode=-1))
calls.append('/sbin/service httpd start')
returns.append(FakePOpen())
calls.append('/bin/services_restarted')
returns.append(FakePOpen())
# monitor_services running
calls.append('/sbin/service httpd status')
calls.append(['/sbin/service', 'httpd', 'start'])
returns.append(FakePOpen())
calls = popen_root_calls(calls)
calls.extend(popen_root_calls(['/bin/services_restarted'], shell=True))
returns.append(FakePOpen())
# monitor_services running
calls.extend(popen_root_calls([['/sbin/service', 'httpd', 'status']]))
returns.append(FakePOpen())
services = {
"sysvinit": {
"httpd": {"enabled": "true", "ensureRunning": "true"}
@ -430,9 +445,9 @@ class TestServicesHandler(testtools.TestCase):
calls = []
# apply_services
calls.append('/sbin/chkconfig httpd off')
calls.append('/sbin/service httpd status')
calls.append('/sbin/service httpd stop')
calls.append(['/sbin/chkconfig', 'httpd', 'off'])
calls.append(['/sbin/service', 'httpd', 'status'])
calls.append(['/sbin/service', 'httpd', 'stop'])
calls = popen_root_calls(calls)
@ -466,26 +481,30 @@ class TestServicesHandler(testtools.TestCase):
returns = []
# apply_services
calls.append('/bin/systemctl enable httpd.service')
calls.append(['/bin/systemctl', 'enable', 'httpd.service'])
returns.append(FakePOpen())
calls.append('/bin/systemctl status httpd.service')
calls.append(['/bin/systemctl', 'status', 'httpd.service'])
returns.append(FakePOpen(returncode=-1))
calls.append('/bin/systemctl start httpd.service')
calls.append(['/bin/systemctl', 'start', 'httpd.service'])
returns.append(FakePOpen())
# monitor_services not running
calls.append('/bin/systemctl status httpd.service')
calls.append(['/bin/systemctl', 'status', 'httpd.service'])
returns.append(FakePOpen(returncode=-1))
calls.append('/bin/systemctl start httpd.service')
returns.append(FakePOpen())
calls.append('/bin/services_restarted')
calls.append(['/bin/systemctl', 'start', 'httpd.service'])
returns.append(FakePOpen())
# monitor_services running
calls.append('/bin/systemctl status httpd.service')
shell_calls = []
shell_calls.append('/bin/services_restarted')
returns.append(FakePOpen())
calls = popen_root_calls(calls)
calls.extend(popen_root_calls(shell_calls, shell=True))
# monitor_services running
calls.extend(popen_root_calls([['/bin/systemctl', 'status',
'httpd.service']]))
returns.append(FakePOpen())
services = {
"sysvinit": {
@ -519,9 +538,9 @@ class TestServicesHandler(testtools.TestCase):
calls = []
# apply_services
calls.append('/bin/systemctl disable httpd.service')
calls.append('/bin/systemctl status httpd.service')
calls.append('/bin/systemctl stop httpd.service')
calls.append(['/bin/systemctl', 'disable', 'httpd.service'])
calls.append(['/bin/systemctl', 'status', 'httpd.service'])
calls.append(['/bin/systemctl', 'stop', 'httpd.service'])
calls = popen_root_calls(calls)
@ -553,26 +572,30 @@ class TestServicesHandler(testtools.TestCase):
returns = []
# apply_services
calls.append('/usr/sbin/update-rc.d httpd enable')
calls.append(['/usr/sbin/update-rc.d', 'httpd', 'enable'])
returns.append(FakePOpen())
calls.append('/usr/sbin/service httpd status')
calls.append(['/usr/sbin/service', 'httpd', 'status'])
returns.append(FakePOpen(returncode=-1))
calls.append('/usr/sbin/service httpd start')
calls.append(['/usr/sbin/service', 'httpd', 'start'])
returns.append(FakePOpen())
# monitor_services not running
calls.append('/usr/sbin/service httpd status')
calls.append(['/usr/sbin/service', 'httpd', 'status'])
returns.append(FakePOpen(returncode=-1))
calls.append('/usr/sbin/service httpd start')
returns.append(FakePOpen())
calls.append('/bin/services_restarted')
calls.append(['/usr/sbin/service', 'httpd', 'start'])
returns.append(FakePOpen())
# monitor_services running
calls.append('/usr/sbin/service httpd status')
shell_calls = []
shell_calls.append('/bin/services_restarted')
returns.append(FakePOpen())
calls = popen_root_calls(calls)
calls.extend(popen_root_calls(shell_calls, shell=True))
# monitor_services running
calls.extend(popen_root_calls([['/usr/sbin/service', 'httpd',
'status']]))
returns.append(FakePOpen())
services = {
"sysvinit": {
@ -609,11 +632,11 @@ class TestServicesHandler(testtools.TestCase):
returns = []
# apply_services
calls.append('/usr/sbin/update-rc.d httpd disable')
calls.append(['/usr/sbin/update-rc.d', 'httpd', 'disable'])
returns.append(FakePOpen())
calls.append('/usr/sbin/service httpd status')
calls.append(['/usr/sbin/service', 'httpd', 'status'])
returns.append(FakePOpen())
calls.append('/usr/sbin/service httpd stop')
calls.append(['/usr/sbin/service', 'httpd', 'stop'])
returns.append(FakePOpen())
calls = popen_root_calls(calls)
@ -751,11 +774,11 @@ interval=120''' % fcreds.name).encode('UTF-8'))
' root, /bin/hook3}', str(hooks[3]))
calls = []
calls.append('/bin/cfn-http-restarted')
calls.append('/bin/hook1')
calls.append('/bin/hook2')
calls.append('/bin/hook3')
calls = popen_root_calls(calls)
calls.extend(popen_root_calls(['/bin/cfn-http-restarted'], shell=True))
calls.extend(popen_root_calls(['/bin/hook1'], shell=True))
calls.extend(popen_root_calls(['/bin/hook2'], shell=True))
calls.extend(popen_root_calls(['/bin/hook3'], shell=True))
#calls = popen_root_calls(calls)
with mock.patch('subprocess.Popen') as mock_popen:
mock_popen.return_value = FakePOpen('All good')
@ -1121,7 +1144,7 @@ class TestMetadataRetrieve(testtools.TestCase):
meta_out = md.get_nova_meta(cache_path=cache_path)
self.assertEqual(meta_in, meta_out)
mock_popen.assert_has_calls(
popen_root_calls(['curl -o %s %s' % (cache_path, url)]))
popen_root_calls([['curl', '-o', cache_path, url]]))
@mock.patch.object(cfn_helper, 'controlled_privileges')
def test_nova_meta_curl_corrupt(self, mock_cp):
@ -1150,7 +1173,7 @@ class TestMetadataRetrieve(testtools.TestCase):
meta_out = md.get_nova_meta(cache_path=cache_path)
self.assertEqual(None, meta_out)
mock_popen.assert_has_calls(
popen_root_calls(['curl -o %s %s' % (cache_path, url)]))
popen_root_calls([['curl', '-o', cache_path, url]]))
@mock.patch.object(cfn_helper, 'controlled_privileges')
def test_nova_meta_curl_failed(self, mock_cp):
@ -1169,7 +1192,7 @@ class TestMetadataRetrieve(testtools.TestCase):
meta_out = md.get_nova_meta(cache_path=cache_path)
self.assertEqual(None, meta_out)
mock_popen.assert_has_calls(
popen_root_calls(['curl -o %s %s' % (cache_path, url)]))
popen_root_calls([['curl', '-o', cache_path, url]]))
def test_get_tags(self):
fake_tags = {'foo': 'fee',
@ -1240,17 +1263,18 @@ class TestCfnInit(testtools.TestCase):
self.assertTrue(
md.retrieve(meta_str=md_data, last_path=self.last_file))
self.assertRaises(cfn_helper.CommandsHandlerRunError, md.cfn_init)
mock_popen.assert_has_calls(popen_root_calls(['/bin/command1']))
mock_popen.assert_has_calls(popen_root_calls(['/bin/command1'],
shell=True))
@mock.patch.object(cfn_helper, 'controlled_privileges')
def test_cfn_init_with_ignore_errors_true(self, mock_cp):
calls = []
returns = []
calls.append('/bin/command1')
calls.extend(popen_root_calls(['/bin/command1'], shell=True))
returns.append(FakePOpen('Doing something', 'error', -1))
calls.append('/bin/command2')
calls.extend(popen_root_calls(['/bin/command2'], shell=True))
returns.append(FakePOpen('All good'))
calls = popen_root_calls(calls)
#calls = popen_root_calls(calls)
md_data = {"AWS::CloudFormation::Init": {"config": {"commands": {
"00_foo": {"command": "/bin/command1",
@ -1279,7 +1303,7 @@ class TestSourcesHandler(testtools.TestCase):
sources = {dest: url}
td = os.path.dirname(end_file)
er = "mkdir -p '%s'; cd '%s'; curl -s '%s' | gunzip | tar -xvf -"
calls = popen_root_calls([er % (dest, dest, url)])
calls = popen_root_calls([er % (dest, dest, url)], shell=True)
with mock.patch.object(tempfile, 'mkdtemp') as mock_mkdtemp:
mock_mkdtemp.return_value = td
@ -1297,7 +1321,7 @@ class TestSourcesHandler(testtools.TestCase):
self.addCleanup(os.rmdir, dest)
sources = {dest: url}
er = "mkdir -p '%s'; cd '%s'; curl -s '%s' | gunzip | tar -xvf -"
calls = popen_root_calls([er % (dest, dest, url)])
calls = popen_root_calls([er % (dest, dest, url)], shell=True)
with mock.patch('subprocess.Popen') as mock_popen:
mock_popen.return_value = FakePOpen('Curl good')
sh = cfn_helper.SourcesHandler(sources)
@ -1311,7 +1335,7 @@ class TestSourcesHandler(testtools.TestCase):
self.addCleanup(os.rmdir, dest)
sources = {dest: url}
er = "mkdir -p '%s'; cd '%s'; curl -s '%s' | gunzip | tar -xvf -"
calls = popen_root_calls([er % (dest, dest, url)])
calls = popen_root_calls([er % (dest, dest, url)], shell=True)
with mock.patch('subprocess.Popen') as mock_popen:
mock_popen.return_value = FakePOpen('Curl good')
sh = cfn_helper.SourcesHandler(sources)