heat-cfntools/heat_cfntools/tests/test_cfn_helper.py

1309 lines
48 KiB
Python

#
# Copyright 2013 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import boto.cloudformation as cfn
import fixtures
import json
import mock
import os
import tempfile
import testtools
import testtools.matchers as ttm
from heat_cfntools.cfntools import cfn_helper
def popen_root_calls(calls):
kwargs = {'env': None, 'cwd': None, 'stderr': -1, 'stdout': -1}
return [
mock.call(['su', 'root', '-c', call], **kwargs)
for call in calls
]
class FakePOpen():
def __init__(self, stdout='', stderr='', returncode=0):
self.returncode = returncode
self.stdout = stdout
self.stderr = stderr
def communicate(self):
return (self.stdout, self.stderr)
def wait(self):
pass
class TestCommandRunner(testtools.TestCase):
def test_command_runner(self):
def returns(*args, **kwargs):
if args[0][3] == '/bin/command1':
return FakePOpen('All good')
elif args[0][3] == '/bin/command2':
return FakePOpen('Doing something', 'error', -1)
else:
raise Exception('This should never happen')
with mock.patch('subprocess.Popen') as mock_popen:
mock_popen.side_effect = returns
cmd2 = cfn_helper.CommandRunner('/bin/command2')
cmd1 = cfn_helper.CommandRunner('/bin/command1', cmd2)
cmd1.run('root')
self.assertEqual(
'CommandRunner:\n\tcommand: /bin/command1\n\tstdout: All good',
str(cmd1))
self.assertEqual(
'CommandRunner:\n\tcommand: /bin/command2\n\tstatus: -1\n'
'\tstdout: Doing something\n\tstderr: error',
str(cmd2))
calls = popen_root_calls(['/bin/command1', '/bin/command2'])
mock_popen.assert_has_calls(calls)
class TestPackages(testtools.TestCase):
def test_yum_install(self):
def returns(*args, **kwargs):
if args[0][3].startswith('rpm -q '):
return FakePOpen(returncode=1)
else:
return FakePOpen(returncode=0)
calls = ['which yum']
for pack in ('httpd', 'wordpress', 'mysql-server'):
calls.append('rpm -q %s' % pack)
calls.append('yum -y --showduplicates list available %s' % pack)
calls = popen_root_calls(calls)
packages = {
"yum": {
"mysql-server": [],
"httpd": [],
"wordpress": []
}
}
with mock.patch('subprocess.Popen') as mock_popen:
mock_popen.side_effect = returns
cfn_helper.PackagesHandler(packages).apply_packages()
mock_popen.assert_has_calls(calls, any_order=True)
def test_dnf_install_yum_unavailable(self):
def returns(*args, **kwargs):
if args[0][3].startswith('rpm -q ') \
or args[0][3] == 'which yum':
return FakePOpen(returncode=1)
else:
return FakePOpen(returncode=0)
calls = ['which yum']
for pack in ('httpd', 'wordpress', 'mysql-server'):
calls.append('rpm -q %s' % pack)
calls.append('dnf -y --showduplicates list available %s' % pack)
calls = popen_root_calls(calls)
packages = {
"yum": {
"mysql-server": [],
"httpd": [],
"wordpress": []
}
}
with mock.patch('subprocess.Popen') as mock_popen:
mock_popen.side_effect = returns
cfn_helper.PackagesHandler(packages).apply_packages()
mock_popen.assert_has_calls(calls, any_order=True)
def test_dnf_install(self):
def returns(*args, **kwargs):
if args[0][3].startswith('rpm -q '):
return FakePOpen(returncode=1)
else:
return FakePOpen(returncode=0)
calls = []
for pack in ('httpd', 'wordpress', 'mysql-server'):
calls.append('rpm -q %s' % pack)
calls.append('dnf -y --showduplicates list available %s' % pack)
calls = popen_root_calls(calls)
packages = {
"dnf": {
"mysql-server": [],
"httpd": [],
"wordpress": []
}
}
with mock.patch('subprocess.Popen') as mock_popen:
mock_popen.side_effect = returns
cfn_helper.PackagesHandler(packages).apply_packages()
mock_popen.assert_has_calls(calls, any_order=True)
def test_zypper_install(self):
def returns(*args, **kwargs):
if args[0][3].startswith('rpm -q '):
return FakePOpen(returncode=1)
else:
return FakePOpen(returncode=0)
calls = []
for pack in ('httpd', 'wordpress', 'mysql-server'):
calls.append('rpm -q %s' % pack)
calls.append('zypper -n --no-refresh search %s' % pack)
calls = popen_root_calls(calls)
packages = {
"zypper": {
"mysql-server": [],
"httpd": [],
"wordpress": []
}
}
with mock.patch('subprocess.Popen') as mock_popen:
mock_popen.side_effect = returns
cfn_helper.PackagesHandler(packages).apply_packages()
mock_popen.assert_has_calls(calls, any_order=True)
def test_apt_install(self):
packages = {
"apt": {
"mysql-server": [],
"httpd": [],
"wordpress": []
}
}
with mock.patch('subprocess.Popen') as mock_popen:
mock_popen.return_value = FakePOpen(returncode=0)
cfn_helper.PackagesHandler(packages).apply_packages()
self.assertTrue(mock_popen.called)
class TestServicesHandler(testtools.TestCase):
def test_services_handler_systemd(self):
calls = []
returns = []
# apply_services
calls.append('/bin/systemctl enable httpd.service')
returns.append(FakePOpen())
calls.append('/bin/systemctl status httpd.service')
returns.append(FakePOpen(returncode=-1))
calls.append('/bin/systemctl start httpd.service')
returns.append(FakePOpen())
calls.append('/bin/systemctl enable mysqld.service')
returns.append(FakePOpen())
calls.append('/bin/systemctl status mysqld.service')
returns.append(FakePOpen(returncode=-1))
calls.append('/bin/systemctl start mysqld.service')
returns.append(FakePOpen())
# monitor_services not running
calls.append('/bin/systemctl status httpd.service')
returns.append(FakePOpen(returncode=-1))
calls.append('/bin/systemctl start httpd.service')
returns.append(FakePOpen())
calls.append('/bin/services_restarted')
returns.append(FakePOpen())
calls.append('/bin/systemctl status mysqld.service')
returns.append(FakePOpen(returncode=-1))
calls.append('/bin/systemctl start mysqld.service')
returns.append(FakePOpen())
calls.append('/bin/services_restarted')
returns.append(FakePOpen())
# monitor_services running
calls.append('/bin/systemctl status httpd.service')
returns.append(FakePOpen())
calls.append('/bin/systemctl status mysqld.service')
returns.append(FakePOpen())
calls = popen_root_calls(calls)
services = {
"systemd": {
"mysqld": {"enabled": "true", "ensureRunning": "true"},
"httpd": {"enabled": "true", "ensureRunning": "true"}
}
}
hooks = [
cfn_helper.Hook(
'hook1',
'service.restarted',
'Resources.resource1.Metadata',
'root',
'/bin/services_restarted')
]
with mock.patch('os.path.exists') as mock_exists:
mock_exists.return_value = True
with mock.patch('subprocess.Popen') as mock_popen:
mock_popen.side_effect = returns
sh = cfn_helper.ServicesHandler(services, 'resource1', hooks)
sh.apply_services()
# services not running
sh.monitor_services()
# services running
sh.monitor_services()
mock_popen.assert_has_calls(calls, any_order=True)
mock_exists.assert_called_with('/bin/systemctl')
def test_services_handler_systemd_disabled(self):
calls = []
# apply_services
calls.append('/bin/systemctl disable httpd.service')
calls.append('/bin/systemctl status httpd.service')
calls.append('/bin/systemctl stop httpd.service')
calls.append('/bin/systemctl disable mysqld.service')
calls.append('/bin/systemctl status mysqld.service')
calls.append('/bin/systemctl stop mysqld.service')
calls = popen_root_calls(calls)
services = {
"systemd": {
"mysqld": {"enabled": "false", "ensureRunning": "false"},
"httpd": {"enabled": "false", "ensureRunning": "false"}
}
}
hooks = [
cfn_helper.Hook(
'hook1',
'service.restarted',
'Resources.resource1.Metadata',
'root',
'/bin/services_restarted')
]
with mock.patch('os.path.exists') as mock_exists:
mock_exists.return_value = True
with mock.patch('subprocess.Popen') as mock_popen:
mock_popen.return_value = FakePOpen()
sh = cfn_helper.ServicesHandler(services, 'resource1', hooks)
sh.apply_services()
mock_popen.assert_has_calls(calls, any_order=True)
mock_exists.assert_called_with('/bin/systemctl')
def test_services_handler_sysv_service_chkconfig(self):
def exists(*args, **kwargs):
return args[0] != '/bin/systemctl'
calls = []
returns = []
# apply_services
calls.append('/sbin/chkconfig httpd on')
returns.append(FakePOpen())
calls.append('/sbin/service httpd status')
returns.append(FakePOpen(returncode=-1))
calls.append('/sbin/service httpd start')
returns.append(FakePOpen())
# monitor_services not running
calls.append('/sbin/service httpd status')
returns.append(FakePOpen(returncode=-1))
calls.append('/sbin/service httpd start')
returns.append(FakePOpen())
calls.append('/bin/services_restarted')
returns.append(FakePOpen())
# monitor_services running
calls.append('/sbin/service httpd status')
returns.append(FakePOpen())
calls = popen_root_calls(calls)
services = {
"sysvinit": {
"httpd": {"enabled": "true", "ensureRunning": "true"}
}
}
hooks = [
cfn_helper.Hook(
'hook1',
'service.restarted',
'Resources.resource1.Metadata',
'root',
'/bin/services_restarted')
]
with mock.patch('os.path.exists') as mock_exists:
mock_exists.side_effect = exists
with mock.patch('subprocess.Popen') as mock_popen:
mock_popen.side_effect = returns
sh = cfn_helper.ServicesHandler(services, 'resource1', hooks)
sh.apply_services()
# services not running
sh.monitor_services()
# services running
sh.monitor_services()
mock_popen.assert_has_calls(calls)
mock_exists.assert_any_call('/bin/systemctl')
mock_exists.assert_any_call('/sbin/service')
mock_exists.assert_any_call('/sbin/chkconfig')
def test_services_handler_sysv_disabled_service_chkconfig(self):
def exists(*args, **kwargs):
return args[0] != '/bin/systemctl'
calls = []
# apply_services
calls.append('/sbin/chkconfig httpd off')
calls.append('/sbin/service httpd status')
calls.append('/sbin/service httpd stop')
calls = popen_root_calls(calls)
services = {
"sysvinit": {
"httpd": {"enabled": "false", "ensureRunning": "false"}
}
}
hooks = [
cfn_helper.Hook(
'hook1',
'service.restarted',
'Resources.resource1.Metadata',
'root',
'/bin/services_restarted')
]
with mock.patch('os.path.exists') as mock_exists:
mock_exists.side_effect = exists
with mock.patch('subprocess.Popen') as mock_popen:
mock_popen.return_value = FakePOpen()
sh = cfn_helper.ServicesHandler(services, 'resource1', hooks)
sh.apply_services()
mock_popen.assert_has_calls(calls)
mock_exists.assert_any_call('/bin/systemctl')
mock_exists.assert_any_call('/sbin/service')
mock_exists.assert_any_call('/sbin/chkconfig')
def test_services_handler_sysv_systemctl(self):
calls = []
returns = []
# apply_services
calls.append('/bin/systemctl enable httpd.service')
returns.append(FakePOpen())
calls.append('/bin/systemctl status httpd.service')
returns.append(FakePOpen(returncode=-1))
calls.append('/bin/systemctl start httpd.service')
returns.append(FakePOpen())
# monitor_services not running
calls.append('/bin/systemctl status httpd.service')
returns.append(FakePOpen(returncode=-1))
calls.append('/bin/systemctl start httpd.service')
returns.append(FakePOpen())
calls.append('/bin/services_restarted')
returns.append(FakePOpen())
# monitor_services running
calls.append('/bin/systemctl status httpd.service')
returns.append(FakePOpen())
calls = popen_root_calls(calls)
services = {
"sysvinit": {
"httpd": {"enabled": "true", "ensureRunning": "true"}
}
}
hooks = [
cfn_helper.Hook(
'hook1',
'service.restarted',
'Resources.resource1.Metadata',
'root',
'/bin/services_restarted')
]
with mock.patch('os.path.exists') as mock_exists:
mock_exists.return_value = True
with mock.patch('subprocess.Popen') as mock_popen:
mock_popen.side_effect = returns
sh = cfn_helper.ServicesHandler(services, 'resource1', hooks)
sh.apply_services()
# services not running
sh.monitor_services()
# services running
sh.monitor_services()
mock_popen.assert_has_calls(calls)
mock_exists.assert_called_with('/bin/systemctl')
def test_services_handler_sysv_disabled_systemctl(self):
calls = []
# apply_services
calls.append('/bin/systemctl disable httpd.service')
calls.append('/bin/systemctl status httpd.service')
calls.append('/bin/systemctl stop httpd.service')
calls = popen_root_calls(calls)
services = {
"sysvinit": {
"httpd": {"enabled": "false", "ensureRunning": "false"}
}
}
hooks = [
cfn_helper.Hook(
'hook1',
'service.restarted',
'Resources.resource1.Metadata',
'root',
'/bin/services_restarted')
]
with mock.patch('os.path.exists') as mock_exists:
mock_exists.return_value = True
with mock.patch('subprocess.Popen') as mock_popen:
mock_popen.return_value = FakePOpen()
sh = cfn_helper.ServicesHandler(services, 'resource1', hooks)
sh.apply_services()
mock_popen.assert_has_calls(calls)
mock_exists.assert_called_with('/bin/systemctl')
def test_services_handler_sysv_service_updaterc(self):
calls = []
returns = []
# apply_services
calls.append('/usr/sbin/update-rc.d httpd enable')
returns.append(FakePOpen())
calls.append('/usr/sbin/service httpd status')
returns.append(FakePOpen(returncode=-1))
calls.append('/usr/sbin/service httpd start')
returns.append(FakePOpen())
# monitor_services not running
calls.append('/usr/sbin/service httpd status')
returns.append(FakePOpen(returncode=-1))
calls.append('/usr/sbin/service httpd start')
returns.append(FakePOpen())
calls.append('/bin/services_restarted')
returns.append(FakePOpen())
# monitor_services running
calls.append('/usr/sbin/service httpd status')
returns.append(FakePOpen())
calls = popen_root_calls(calls)
services = {
"sysvinit": {
"httpd": {"enabled": "true", "ensureRunning": "true"}
}
}
hooks = [
cfn_helper.Hook(
'hook1',
'service.restarted',
'Resources.resource1.Metadata',
'root',
'/bin/services_restarted')
]
with mock.patch('os.path.exists') as mock_exists:
mock_exists.return_value = False
with mock.patch('subprocess.Popen') as mock_popen:
mock_popen.side_effect = returns
sh = cfn_helper.ServicesHandler(services, 'resource1', hooks)
sh.apply_services()
# services not running
sh.monitor_services()
# services running
sh.monitor_services()
mock_popen.assert_has_calls(calls)
mock_exists.assert_any_call('/bin/systemctl')
mock_exists.assert_any_call('/sbin/service')
mock_exists.assert_any_call('/sbin/chkconfig')
def test_services_handler_sysv_disabled_service_updaterc(self):
calls = []
returns = []
# apply_services
calls.append('/usr/sbin/update-rc.d httpd disable')
returns.append(FakePOpen())
calls.append('/usr/sbin/service httpd status')
returns.append(FakePOpen())
calls.append('/usr/sbin/service httpd stop')
returns.append(FakePOpen())
calls = popen_root_calls(calls)
services = {
"sysvinit": {
"httpd": {"enabled": "false", "ensureRunning": "false"}
}
}
hooks = [
cfn_helper.Hook(
'hook1',
'service.restarted',
'Resources.resource1.Metadata',
'root',
'/bin/services_restarted')
]
with mock.patch('os.path.exists') as mock_exists:
mock_exists.return_value = False
with mock.patch('subprocess.Popen') as mock_popen:
mock_popen.side_effect = returns
sh = cfn_helper.ServicesHandler(services, 'resource1', hooks)
sh.apply_services()
mock_popen.assert_has_calls(calls)
mock_exists.assert_any_call('/bin/systemctl')
mock_exists.assert_any_call('/sbin/service')
mock_exists.assert_any_call('/sbin/chkconfig')
class TestHupConfig(testtools.TestCase):
def test_load_main_section(self):
fcreds = tempfile.NamedTemporaryFile()
fcreds.write('AWSAccessKeyId=foo\nAWSSecretKey=bar\n'.encode('UTF-8'))
fcreds.flush()
main_conf = tempfile.NamedTemporaryFile()
main_conf.write(('''[main]
stack=teststack
credential-file=%s''' % fcreds.name).encode('UTF-8'))
main_conf.flush()
mainconfig = cfn_helper.HupConfig([open(main_conf.name)])
self.assertEqual(
'{stack: teststack, credential_file: %s, '
'region: nova, interval:10}' % fcreds.name,
str(mainconfig))
main_conf.close()
main_conf = tempfile.NamedTemporaryFile()
main_conf.write(('''[main]
stack=teststack
region=region1
credential-file=%s-invalid
interval=120''' % fcreds.name).encode('UTF-8'))
main_conf.flush()
e = self.assertRaises(Exception, cfn_helper.HupConfig,
[open(main_conf.name)])
self.assertIn('invalid credentials file', str(e))
fcreds.close()
def test_hup_config(self):
hooks_conf = tempfile.NamedTemporaryFile()
def write_hook_conf(f, name, triggers, path, action):
f.write((
'[%s]\ntriggers=%s\npath=%s\naction=%s\nrunas=root\n\n' % (
name, triggers, path, action)).encode('UTF-8'))
write_hook_conf(
hooks_conf,
'hook2',
'service2.restarted',
'Resources.resource2.Metadata',
'/bin/hook2')
write_hook_conf(
hooks_conf,
'hook1',
'service1.restarted',
'Resources.resource1.Metadata',
'/bin/hook1')
write_hook_conf(
hooks_conf,
'hook3',
'service3.restarted',
'Resources.resource3.Metadata',
'/bin/hook3')
write_hook_conf(
hooks_conf,
'cfn-http-restarted',
'service.restarted',
'Resources.resource.Metadata',
'/bin/cfn-http-restarted')
hooks_conf.flush()
fcreds = tempfile.NamedTemporaryFile()
fcreds.write('AWSAccessKeyId=foo\nAWSSecretKey=bar\n'.encode('UTF-8'))
fcreds.flush()
main_conf = tempfile.NamedTemporaryFile()
main_conf.write(('''[main]
stack=teststack
credential-file=%s
region=region1
interval=120''' % fcreds.name).encode('UTF-8'))
main_conf.flush()
mainconfig = cfn_helper.HupConfig([
open(main_conf.name),
open(hooks_conf.name)])
unique_resources = mainconfig.unique_resources_get()
self.assertThat([
'resource',
'resource1',
'resource2',
'resource3',
], ttm.Equals(sorted(unique_resources)))
hooks = sorted(mainconfig.hooks,
key=lambda hook: hook.resource_name_get())
self.assertEqual(len(hooks), 4)
self.assertEqual(
'{cfn-http-restarted, service.restarted,'
' Resources.resource.Metadata, root, /bin/cfn-http-restarted}',
str(hooks[0]))
self.assertEqual(
'{hook1, service1.restarted, Resources.resource1.Metadata,'
' root, /bin/hook1}', str(hooks[1]))
self.assertEqual(
'{hook2, service2.restarted, Resources.resource2.Metadata,'
' root, /bin/hook2}', str(hooks[2]))
self.assertEqual(
'{hook3, service3.restarted, Resources.resource3.Metadata,'
' root, /bin/hook3}', str(hooks[3]))
calls = []
calls.append('/bin/cfn-http-restarted')
calls.append('/bin/hook1')
calls.append('/bin/hook2')
calls.append('/bin/hook3')
calls = popen_root_calls(calls)
with mock.patch('subprocess.Popen') as mock_popen:
mock_popen.return_value = FakePOpen('All good')
for hook in hooks:
hook.event(hook.triggers, None, hook.resource_name_get())
hooks_conf.close()
fcreds.close()
main_conf.close()
mock_popen.assert_has_calls(calls)
class TestCfnHelper(testtools.TestCase):
def _check_metadata_content(self, content, value):
with tempfile.NamedTemporaryFile() as metadata_info:
metadata_info.write(content.encode('UTF-8'))
metadata_info.flush()
port = cfn_helper.metadata_server_port(metadata_info.name)
self.assertEqual(value, port)
def test_metadata_server_port(self):
self._check_metadata_content("http://172.20.42.42:8000\n", 8000)
def test_metadata_server_port_https(self):
self._check_metadata_content("https://abc.foo.bar:6969\n", 6969)
def test_metadata_server_port_noport(self):
self._check_metadata_content("http://172.20.42.42\n", None)
def test_metadata_server_port_justip(self):
self._check_metadata_content("172.20.42.42", None)
def test_metadata_server_port_weird(self):
self._check_metadata_content("::::", None)
self._check_metadata_content("beforecolons:aftercolons", None)
def test_metadata_server_port_emptyfile(self):
self._check_metadata_content("\n", None)
self._check_metadata_content("", None)
def test_metadata_server_nofile(self):
random_filename = self.getUniqueString()
self.assertEqual(None,
cfn_helper.metadata_server_port(random_filename))
def test_to_boolean(self):
self.assertTrue(cfn_helper.to_boolean(True))
self.assertTrue(cfn_helper.to_boolean('true'))
self.assertTrue(cfn_helper.to_boolean('yes'))
self.assertTrue(cfn_helper.to_boolean('1'))
self.assertTrue(cfn_helper.to_boolean(1))
self.assertFalse(cfn_helper.to_boolean(False))
self.assertFalse(cfn_helper.to_boolean('false'))
self.assertFalse(cfn_helper.to_boolean('no'))
self.assertFalse(cfn_helper.to_boolean('0'))
self.assertFalse(cfn_helper.to_boolean(0))
self.assertFalse(cfn_helper.to_boolean(None))
self.assertFalse(cfn_helper.to_boolean('fingle'))
def test_parse_creds_file(self):
def parse_creds_test(file_contents, creds_match):
with tempfile.NamedTemporaryFile(mode='w') as fcreds:
fcreds.write(file_contents)
fcreds.flush()
creds = cfn_helper.parse_creds_file(fcreds.name)
self.assertThat(creds_match, ttm.Equals(creds))
parse_creds_test(
'AWSAccessKeyId=foo\nAWSSecretKey=bar\n',
{'AWSAccessKeyId': 'foo', 'AWSSecretKey': 'bar'}
)
parse_creds_test(
'AWSAccessKeyId =foo\nAWSSecretKey= bar\n',
{'AWSAccessKeyId': 'foo', 'AWSSecretKey': 'bar'}
)
parse_creds_test(
'AWSAccessKeyId = foo\nAWSSecretKey = bar\n',
{'AWSAccessKeyId': 'foo', 'AWSSecretKey': 'bar'}
)
class TestMetadataRetrieve(testtools.TestCase):
def setUp(self):
super(TestMetadataRetrieve, self).setUp()
self.tdir = self.useFixture(fixtures.TempDir())
self.last_file = os.path.join(self.tdir.path, 'last_metadata')
def test_metadata_retrieve_files(self):
md_data = {"AWS::CloudFormation::Init": {"config": {"files": {
"/tmp/foo": {"content": "bar"}}}}}
md_str = json.dumps(md_data)
md = cfn_helper.Metadata('teststack', None)
with tempfile.NamedTemporaryFile(mode='w+') as default_file:
default_file.write(md_str)
default_file.flush()
self.assertThat(default_file.name, ttm.FileContains(md_str))
self.assertTrue(
md.retrieve(default_path=default_file.name,
last_path=self.last_file))
self.assertThat(self.last_file, ttm.FileContains(md_str))
self.assertThat(md_data, ttm.Equals(md._metadata))
md = cfn_helper.Metadata('teststack', None)
self.assertTrue(md.retrieve(default_path=default_file.name,
last_path=self.last_file))
self.assertThat(md_data, ttm.Equals(md._metadata))
def test_metadata_retrieve_none(self):
md = cfn_helper.Metadata('teststack', None)
default_file = os.path.join(self.tdir.path, 'default_file')
self.assertFalse(md.retrieve(default_path=default_file,
last_path=self.last_file))
self.assertIsNone(md._metadata)
displayed = self.useFixture(fixtures.StringStream('stdout'))
fake_stdout = displayed.stream
self.useFixture(fixtures.MonkeyPatch('sys.stdout', fake_stdout))
md.display()
fake_stdout.flush()
self.assertEqual(displayed.getDetails()['stdout'].as_text(), "")
def test_metadata_retrieve_passed(self):
md_data = {"AWS::CloudFormation::Init": {"config": {"files": {
"/tmp/foo": {"content": "bar"}}}}}
md_str = json.dumps(md_data)
md = cfn_helper.Metadata('teststack', None)
self.assertTrue(md.retrieve(meta_str=md_data,
last_path=self.last_file))
self.assertThat(md_data, ttm.Equals(md._metadata))
self.assertEqual(md_str, str(md))
displayed = self.useFixture(fixtures.StringStream('stdout'))
fake_stdout = displayed.stream
self.useFixture(fixtures.MonkeyPatch('sys.stdout', fake_stdout))
md.display()
fake_stdout.flush()
self.assertEqual(displayed.getDetails()['stdout'].as_text(),
"{\"AWS::CloudFormation::Init\": {\"config\": {"
"\"files\": {\"/tmp/foo\": {\"content\": \"bar\"}"
"}}}}\n")
def test_metadata_retrieve_by_key_passed(self):
md_data = {"foo": {"bar": {"fred.1": "abcd"}}}
md_str = json.dumps(md_data)
md = cfn_helper.Metadata('teststack', None)
self.assertTrue(md.retrieve(meta_str=md_data,
last_path=self.last_file))
self.assertThat(md_data, ttm.Equals(md._metadata))
self.assertEqual(md_str, str(md))
displayed = self.useFixture(fixtures.StringStream('stdout'))
fake_stdout = displayed.stream
self.useFixture(fixtures.MonkeyPatch('sys.stdout', fake_stdout))
md.display("foo")
fake_stdout.flush()
self.assertEqual(displayed.getDetails()['stdout'].as_text(),
"{\"bar\": {\"fred.1\": \"abcd\"}}\n")
def test_metadata_retrieve_by_nested_key_passed(self):
md_data = {"foo": {"bar": {"fred.1": "abcd"}}}
md_str = json.dumps(md_data)
md = cfn_helper.Metadata('teststack', None)
self.assertTrue(md.retrieve(meta_str=md_data,
last_path=self.last_file))
self.assertThat(md_data, ttm.Equals(md._metadata))
self.assertEqual(md_str, str(md))
displayed = self.useFixture(fixtures.StringStream('stdout'))
fake_stdout = displayed.stream
self.useFixture(fixtures.MonkeyPatch('sys.stdout', fake_stdout))
md.display("foo.bar.'fred.1'")
fake_stdout.flush()
self.assertEqual(displayed.getDetails()['stdout'].as_text(),
'"abcd"\n')
def test_metadata_retrieve_key_none(self):
md_data = {"AWS::CloudFormation::Init": {"config": {"files": {
"/tmp/foo": {"content": "bar"}}}}}
md_str = json.dumps(md_data)
md = cfn_helper.Metadata('teststack', None)
self.assertTrue(md.retrieve(meta_str=md_data,
last_path=self.last_file))
self.assertThat(md_data, ttm.Equals(md._metadata))
self.assertEqual(md_str, str(md))
displayed = self.useFixture(fixtures.StringStream('stdout'))
fake_stdout = displayed.stream
self.useFixture(fixtures.MonkeyPatch('sys.stdout', fake_stdout))
md.display("no_key")
fake_stdout.flush()
self.assertEqual(displayed.getDetails()['stdout'].as_text(), "")
def test_metadata_retrieve_by_nested_key_none(self):
md_data = {"foo": {"bar": {"fred.1": "abcd"}}}
md_str = json.dumps(md_data)
md = cfn_helper.Metadata('teststack', None)
self.assertTrue(md.retrieve(meta_str=md_data,
last_path=self.last_file))
self.assertThat(md_data, ttm.Equals(md._metadata))
self.assertEqual(md_str, str(md))
displayed = self.useFixture(fixtures.StringStream('stdout'))
fake_stdout = displayed.stream
self.useFixture(fixtures.MonkeyPatch('sys.stdout', fake_stdout))
md.display("foo.fred")
fake_stdout.flush()
self.assertEqual(displayed.getDetails()['stdout'].as_text(), "")
def test_metadata_retrieve_by_nested_key_none_with_matching_string(self):
md_data = {"foo": "bar"}
md_str = json.dumps(md_data)
md = cfn_helper.Metadata('teststack', None)
self.assertTrue(md.retrieve(meta_str=md_data,
last_path=self.last_file))
self.assertThat(md_data, ttm.Equals(md._metadata))
self.assertEqual(md_str, str(md))
displayed = self.useFixture(fixtures.StringStream('stdout'))
fake_stdout = displayed.stream
self.useFixture(fixtures.MonkeyPatch('sys.stdout', fake_stdout))
md.display("foo.bar")
fake_stdout.flush()
self.assertEqual(displayed.getDetails()['stdout'].as_text(), "")
def test_metadata_creates_cache(self):
temp_home = tempfile.mkdtemp()
def cleanup_temp_home(thome):
os.unlink(os.path.join(thome, 'cache', 'last_metadata'))
os.rmdir(os.path.join(thome, 'cache'))
os.rmdir(os.path.join(thome))
self.addCleanup(cleanup_temp_home, temp_home)
last_path = os.path.join(temp_home, 'cache', 'last_metadata')
md_data = {"AWS::CloudFormation::Init": {"config": {"files": {
"/tmp/foo": {"content": "bar"}}}}}
md_str = json.dumps(md_data)
md = cfn_helper.Metadata('teststack', None)
self.assertFalse(os.path.exists(last_path),
"last_metadata file already exists")
self.assertTrue(md.retrieve(meta_str=md_str, last_path=last_path))
self.assertTrue(os.path.exists(last_path),
"last_metadata file should exist")
# Ensure created dirs and file have right perms
self.assertTrue(os.stat(last_path).st_mode & 0o600 == 0o600)
self.assertTrue(
os.stat(os.path.dirname(last_path)).st_mode & 0o700 == 0o700)
def test_is_valid_metadata(self):
md_data = {"AWS::CloudFormation::Init": {"config": {"files": {
"/tmp/foo": {"content": "bar"}}}}}
md = cfn_helper.Metadata('teststack', None)
self.assertTrue(
md.retrieve(meta_str=md_data, last_path=self.last_file))
self.assertThat(md_data, ttm.Equals(md._metadata))
self.assertTrue(md._is_valid_metadata())
self.assertThat(
md_data['AWS::CloudFormation::Init'], ttm.Equals(md._metadata))
def test_remote_metadata(self):
md_data = {"AWS::CloudFormation::Init": {"config": {"files": {
"/tmp/foo": {"content": "bar"}}}}}
with mock.patch.object(
cfn.CloudFormationConnection, 'describe_stack_resource'
) as mock_dsr:
mock_dsr.return_value = {
'DescribeStackResourceResponse': {
'DescribeStackResourceResult': {
'StackResourceDetail': {'Metadata': md_data}}}}
md = cfn_helper.Metadata(
'teststack',
None,
access_key='foo',
secret_key='bar')
self.assertTrue(md.retrieve(last_path=self.last_file))
self.assertThat(md_data, ttm.Equals(md._metadata))
with tempfile.NamedTemporaryFile(mode='w') as fcreds:
fcreds.write('AWSAccessKeyId=foo\nAWSSecretKey=bar\n')
fcreds.flush()
md = cfn_helper.Metadata(
'teststack', None, credentials_file=fcreds.name)
self.assertTrue(md.retrieve(last_path=self.last_file))
self.assertThat(md_data, ttm.Equals(md._metadata))
def test_nova_meta_with_cache(self):
meta_in = {"uuid": "f9431d18-d971-434d-9044-5b38f5b4646f",
"availability_zone": "nova",
"hostname": "as-wikidatabase-4ykioj3lgi57.novalocal",
"launch_index": 0,
"meta": {},
"public_keys": {"heat_key": "ssh-rsa etc...\n"},
"name": "as-WikiDatabase-4ykioj3lgi57"}
md_str = json.dumps(meta_in)
md = cfn_helper.Metadata('teststack', None)
with tempfile.NamedTemporaryFile(mode='w+') as default_file:
default_file.write(md_str)
default_file.flush()
self.assertThat(default_file.name, ttm.FileContains(md_str))
meta_out = md.get_nova_meta(cache_path=default_file.name)
self.assertEqual(meta_in, meta_out)
def test_nova_meta_curl(self):
url = 'http://169.254.169.254/openstack/2012-08-10/meta_data.json'
temp_home = tempfile.mkdtemp()
cache_path = os.path.join(temp_home, 'meta_data.json')
def cleanup_temp_home(thome):
os.unlink(cache_path)
os.rmdir(thome)
self.addCleanup(cleanup_temp_home, temp_home)
meta_in = {"uuid": "f9431d18-d971-434d-9044-5b38f5b4646f",
"availability_zone": "nova",
"hostname": "as-wikidatabase-4ykioj3lgi57.novalocal",
"launch_index": 0,
"meta": {"freddy": "is hungry"},
"public_keys": {"heat_key": "ssh-rsa etc...\n"},
"name": "as-WikiDatabase-4ykioj3lgi57"}
md_str = json.dumps(meta_in)
def write_cache_file(*params, **kwargs):
with open(cache_path, 'w+') as cache_file:
cache_file.write(md_str)
cache_file.flush()
self.assertThat(cache_file.name, ttm.FileContains(md_str))
return FakePOpen('Downloaded', '', 0)
with mock.patch('subprocess.Popen') as mock_popen:
mock_popen.side_effect = write_cache_file
md = cfn_helper.Metadata('teststack', None)
meta_out = md.get_nova_meta(cache_path=cache_path)
self.assertEqual(meta_in, meta_out)
mock_popen.assert_has_calls(
popen_root_calls(['curl -o %s %s' % (cache_path, url)]))
def test_nova_meta_curl_corrupt(self):
url = 'http://169.254.169.254/openstack/2012-08-10/meta_data.json'
temp_home = tempfile.mkdtemp()
cache_path = os.path.join(temp_home, 'meta_data.json')
def cleanup_temp_home(thome):
os.unlink(cache_path)
os.rmdir(thome)
self.addCleanup(cleanup_temp_home, temp_home)
md_str = "this { is not really json"
def write_cache_file(*params, **kwargs):
with open(cache_path, 'w+') as cache_file:
cache_file.write(md_str)
cache_file.flush()
self.assertThat(cache_file.name, ttm.FileContains(md_str))
return FakePOpen('Downloaded', '', 0)
with mock.patch('subprocess.Popen') as mock_popen:
mock_popen.side_effect = write_cache_file
md = cfn_helper.Metadata('teststack', None)
meta_out = md.get_nova_meta(cache_path=cache_path)
self.assertEqual(None, meta_out)
mock_popen.assert_has_calls(
popen_root_calls(['curl -o %s %s' % (cache_path, url)]))
def test_nova_meta_curl_failed(self):
url = 'http://169.254.169.254/openstack/2012-08-10/meta_data.json'
temp_home = tempfile.mkdtemp()
cache_path = os.path.join(temp_home, 'meta_data.json')
def cleanup_temp_home(thome):
os.rmdir(thome)
self.addCleanup(cleanup_temp_home, temp_home)
with mock.patch('subprocess.Popen') as mock_popen:
mock_popen.return_value = FakePOpen('Failed', '', 1)
md = cfn_helper.Metadata('teststack', None)
meta_out = md.get_nova_meta(cache_path=cache_path)
self.assertEqual(None, meta_out)
mock_popen.assert_has_calls(
popen_root_calls(['curl -o %s %s' % (cache_path, url)]))
def test_get_tags(self):
fake_tags = {'foo': 'fee',
'apple': 'red'}
md_data = {"uuid": "f9431d18-d971-434d-9044-5b38f5b4646f",
"availability_zone": "nova",
"hostname": "as-wikidatabase-4ykioj3lgi57.novalocal",
"launch_index": 0,
"meta": fake_tags,
"public_keys": {"heat_key": "ssh-rsa etc...\n"},
"name": "as-WikiDatabase-4ykioj3lgi57"}
tags_expect = fake_tags
tags_expect['InstanceId'] = md_data['uuid']
md = cfn_helper.Metadata('teststack', None)
with mock.patch.object(md, 'get_nova_meta') as mock_method:
mock_method.return_value = md_data
tags = md.get_tags()
mock_method.assert_called_once_with()
self.assertEqual(tags_expect, tags)
def test_get_instance_id(self):
uuid = "f9431d18-d971-434d-9044-5b38f5b4646f"
md_data = {"uuid": uuid,
"availability_zone": "nova",
"hostname": "as-wikidatabase-4ykioj3lgi57.novalocal",
"launch_index": 0,
"public_keys": {"heat_key": "ssh-rsa etc...\n"},
"name": "as-WikiDatabase-4ykioj3lgi57"}
md = cfn_helper.Metadata('teststack', None)
with mock.patch.object(md, 'get_nova_meta') as mock_method:
mock_method.return_value = md_data
self.assertEqual(md.get_instance_id(), uuid)
mock_method.assert_called_once_with()
class TestCfnInit(testtools.TestCase):
def setUp(self):
super(TestCfnInit, self).setUp()
self.tdir = self.useFixture(fixtures.TempDir())
self.last_file = os.path.join(self.tdir.path, 'last_metadata')
def test_cfn_init(self):
with tempfile.NamedTemporaryFile(mode='w+') as foo_file:
md_data = {"AWS::CloudFormation::Init": {"config": {"files": {
foo_file.name: {"content": "bar"}}}}}
md = cfn_helper.Metadata('teststack', None)
self.assertTrue(
md.retrieve(meta_str=md_data, last_path=self.last_file))
md.cfn_init()
self.assertThat(foo_file.name, ttm.FileContains('bar'))
def test_cfn_init_with_ignore_errors_false(self):
md_data = {"AWS::CloudFormation::Init": {"config": {"commands": {
"00_foo": {"command": "/bin/command1",
"ignoreErrors": "false"}}}}}
with mock.patch('subprocess.Popen') as mock_popen:
mock_popen.return_value = FakePOpen('Doing something', 'error', -1)
md = cfn_helper.Metadata('teststack', None)
self.assertTrue(
md.retrieve(meta_str=md_data, last_path=self.last_file))
self.assertRaises(cfn_helper.CommandsHandlerRunError, md.cfn_init)
mock_popen.assert_has_calls(popen_root_calls(['/bin/command1']))
def test_cfn_init_with_ignore_errors_true(self):
calls = []
returns = []
calls.append('/bin/command1')
returns.append(FakePOpen('Doing something', 'error', -1))
calls.append('/bin/command2')
returns.append(FakePOpen('All good'))
calls = popen_root_calls(calls)
md_data = {"AWS::CloudFormation::Init": {"config": {"commands": {
"00_foo": {"command": "/bin/command1",
"ignoreErrors": "true"},
"01_bar": {"command": "/bin/command2",
"ignoreErrors": "false"}
}}}}
with mock.patch('subprocess.Popen') as mock_popen:
mock_popen.side_effect = returns
md = cfn_helper.Metadata('teststack', None)
self.assertTrue(
md.retrieve(meta_str=md_data, last_path=self.last_file))
md.cfn_init()
mock_popen.assert_has_calls(calls)
class TestSourcesHandler(testtools.TestCase):
def test_apply_sources_empty(self):
sh = cfn_helper.SourcesHandler({})
sh.apply_sources()
def _test_apply_sources(self, url, end_file):
dest = tempfile.mkdtemp()
self.addCleanup(os.rmdir, dest)
sources = {dest: url}
td = os.path.dirname(end_file)
er = "mkdir -p '%s'; cd '%s'; curl -s '%s' | gunzip | tar -xvf -"
calls = popen_root_calls([er % (dest, dest, url)])
with mock.patch.object(tempfile, 'mkdtemp') as mock_mkdtemp:
mock_mkdtemp.return_value = td
with mock.patch('subprocess.Popen') as mock_popen:
mock_popen.return_value = FakePOpen('Curl good')
sh = cfn_helper.SourcesHandler(sources)
sh.apply_sources()
mock_popen.assert_has_calls(calls)
mock_mkdtemp.assert_called_with()
def test_apply_sources_github(self):
url = "https://github.com/NoSuchProject/tarball/NoSuchTarball"
dest = tempfile.mkdtemp()
self.addCleanup(os.rmdir, dest)
sources = {dest: url}
er = "mkdir -p '%s'; cd '%s'; curl -s '%s' | gunzip | tar -xvf -"
calls = popen_root_calls([er % (dest, dest, url)])
with mock.patch('subprocess.Popen') as mock_popen:
mock_popen.return_value = FakePOpen('Curl good')
sh = cfn_helper.SourcesHandler(sources)
sh.apply_sources()
mock_popen.assert_has_calls(calls)
def test_apply_sources_general(self):
url = "https://website.no.existe/a/b/c/file.tar.gz"
dest = tempfile.mkdtemp()
self.addCleanup(os.rmdir, dest)
sources = {dest: url}
er = "mkdir -p '%s'; cd '%s'; curl -s '%s' | gunzip | tar -xvf -"
calls = popen_root_calls([er % (dest, dest, url)])
with mock.patch('subprocess.Popen') as mock_popen:
mock_popen.return_value = FakePOpen('Curl good')
sh = cfn_helper.SourcesHandler(sources)
sh.apply_sources()
mock_popen.assert_has_calls(calls)
def test_apply_source_cmd(self):
sh = cfn_helper.SourcesHandler({})
er = "mkdir -p '%s'; cd '%s'; curl -s '%s' | %s | tar -xvf -"
dest = '/tmp'
# test tgz
url = 'http://www.example.com/a.tgz'
cmd = sh._apply_source_cmd(dest, url)
self.assertEqual(er % (dest, dest, url, "gunzip"), cmd)
# test tar.gz
url = 'http://www.example.com/a.tar.gz'
cmd = sh._apply_source_cmd(dest, url)
self.assertEqual(er % (dest, dest, url, "gunzip"), cmd)
# test github - tarball 1
url = 'https://github.com/openstack/heat-cfntools/tarball/master'
cmd = sh._apply_source_cmd(dest, url)
self.assertEqual(er % (dest, dest, url, "gunzip"), cmd)
# test github - tarball 2
url = 'https://github.com/openstack/heat-cfntools/tarball/master/'
cmd = sh._apply_source_cmd(dest, url)
self.assertEqual(er % (dest, dest, url, "gunzip"), cmd)
# test tbz2
url = 'http://www.example.com/a.tbz2'
cmd = sh._apply_source_cmd(dest, url)
self.assertEqual(er % (dest, dest, url, "bunzip2"), cmd)
# test tar.bz2
url = 'http://www.example.com/a.tar.bz2'
cmd = sh._apply_source_cmd(dest, url)
self.assertEqual(er % (dest, dest, url, "bunzip2"), cmd)
# test zip
er = "mkdir -p '%s'; cd '%s'; curl -s -o '%s' '%s' && unzip -o '%s'"
url = 'http://www.example.com/a.zip'
d = "/tmp/tmp2I0yNK"
tmp = "%s/a.zip" % d
with mock.patch.object(tempfile, 'mkdtemp') as mock_mkdtemp:
mock_mkdtemp.return_value = d
cmd = sh._apply_source_cmd(dest, url)
self.assertEqual(er % (dest, dest, tmp, url, tmp), cmd)
# test gz
er = "mkdir -p '%s'; cd '%s'; curl -s '%s' | %s > '%s'"
url = 'http://www.example.com/a.sh.gz'
cmd = sh._apply_source_cmd(dest, url)
self.assertEqual(er % (dest, dest, url, "gunzip", "a.sh"), cmd)
# test bz2
url = 'http://www.example.com/a.sh.bz2'
cmd = sh._apply_source_cmd(dest, url)
self.assertEqual(er % (dest, dest, url, "bunzip2", "a.sh"), cmd)
# test other
url = 'http://www.example.com/a.sh'
cmd = sh._apply_source_cmd(dest, url)
self.assertEqual("", cmd)
mock_mkdtemp.assert_called_with()