Implement -k option for cfn-get-metadata

The -k argument can be a top-level key or a nested key in which case
the keys are separated by dots (eg "foo.bar"). In case a key contains a
dot character, it needs to be surrounded by single quotes (eg
"foo.'bar.1'.fred).
If the -k option is not provided, the command prints out the full
metadata structure as before.

Change-Id: Ib05d39672086001b83e8d7f56bc42cc4ba75751c
Fixes: bug #1183299
This commit is contained in:
Simon Pasquier 2013-09-09 15:26:47 +02:00
parent ed5365dffe
commit 63d70cd0ec
3 changed files with 134 additions and 8 deletions

View File

@ -82,4 +82,4 @@ metadata = cfn_helper.Metadata(args.stack_name,
credentials_file=args.credential_file)
metadata.retrieve()
LOG.debug(str(metadata))
metadata.display()
metadata.display(args.key)

View File

@ -1182,9 +1182,47 @@ class Metadata(object):
def __str__(self):
return json.dumps(self._metadata)
def display(self):
if self._metadata is not None:
def display(self, key=None):
"""Print the metadata to the standard output stream. By default the
full metadata is displayed but the ouptut can be limited to a specific
with the <key> argument.
Arguments:
key -- the metadata's key to display, nested keys can be specified
separating them by the dot character.
e.g., "foo.bar"
If the key contains a dot, it should be surrounded by single
quotes
e.g., "foo.'bar.1'"
"""
if self._metadata is None:
return
if key is None:
print(str(self))
return
value = None
md = self._metadata
while True:
key_match = re.match(r'^(?:(?:\'([^\']+)\')|([^\.]+))(?:\.|$)',
key)
if not key_match:
break
k = key_match.group(1) or key_match.group(2)
if isinstance(md, dict) and k in md:
key = key.replace(key_match.group(), '')
value = md = md[k]
else:
break
if key != '':
value = None
if value is not None:
print(json.dumps(value))
return
def _is_valid_metadata(self):

View File

@ -562,11 +562,6 @@ class TestMetadataRetrieve(testtools.TestCase):
"/tmp/foo": {"content": "bar"}}}}}
md_str = json.dumps(md_data)
md = cfn_helper.Metadata('teststack', None)
self.assertTrue(md.retrieve(meta_str=md_str,
last_path=self.last_file))
self.assertThat(md_data, ttm.Equals(md._metadata))
md = cfn_helper.Metadata('teststack', None)
self.assertTrue(md.retrieve(meta_str=md_data,
last_path=self.last_file))
@ -583,6 +578,99 @@ class TestMetadataRetrieve(testtools.TestCase):
"\"files\": {\"/tmp/foo\": {\"content\": \"bar\"}"
"}}}}\n")
def test_metadata_retrieve_by_key_passed(self):
md_data = {"foo": {"bar": {"fred.1": "abcd"}}}
md_str = json.dumps(md_data)
md = cfn_helper.Metadata('teststack', None)
self.assertTrue(md.retrieve(meta_str=md_data,
last_path=self.last_file))
self.assertThat(md_data, ttm.Equals(md._metadata))
self.assertEqual(md_str, str(md))
displayed = self.useFixture(fixtures.StringStream('stdout'))
fake_stdout = displayed.stream
self.useFixture(fixtures.MonkeyPatch('sys.stdout', fake_stdout))
md.display("foo")
fake_stdout.flush()
self.assertEqual(displayed.getDetails()['stdout'].as_text(),
"{\"bar\": {\"fred.1\": \"abcd\"}}\n")
def test_metadata_retrieve_by_nested_key_passed(self):
md_data = {"foo": {"bar": {"fred.1": "abcd"}}}
md_str = json.dumps(md_data)
md = cfn_helper.Metadata('teststack', None)
self.assertTrue(md.retrieve(meta_str=md_data,
last_path=self.last_file))
self.assertThat(md_data, ttm.Equals(md._metadata))
self.assertEqual(md_str, str(md))
displayed = self.useFixture(fixtures.StringStream('stdout'))
fake_stdout = displayed.stream
self.useFixture(fixtures.MonkeyPatch('sys.stdout', fake_stdout))
md.display("foo.bar.'fred.1'")
fake_stdout.flush()
self.assertEqual(displayed.getDetails()['stdout'].as_text(),
'"abcd"\n')
def test_metadata_retrieve_key_none(self):
md_data = {"AWS::CloudFormation::Init": {"config": {"files": {
"/tmp/foo": {"content": "bar"}}}}}
md_str = json.dumps(md_data)
md = cfn_helper.Metadata('teststack', None)
self.assertTrue(md.retrieve(meta_str=md_data,
last_path=self.last_file))
self.assertThat(md_data, ttm.Equals(md._metadata))
self.assertEqual(md_str, str(md))
displayed = self.useFixture(fixtures.StringStream('stdout'))
fake_stdout = displayed.stream
self.useFixture(fixtures.MonkeyPatch('sys.stdout', fake_stdout))
md.display("no_key")
fake_stdout.flush()
self.assertEqual(displayed.getDetails()['stdout'].as_text(), "")
def test_metadata_retrieve_by_nested_key_none(self):
md_data = {"foo": {"bar": {"fred.1": "abcd"}}}
md_str = json.dumps(md_data)
md = cfn_helper.Metadata('teststack', None)
self.assertTrue(md.retrieve(meta_str=md_data,
last_path=self.last_file))
self.assertThat(md_data, ttm.Equals(md._metadata))
self.assertEqual(md_str, str(md))
displayed = self.useFixture(fixtures.StringStream('stdout'))
fake_stdout = displayed.stream
self.useFixture(fixtures.MonkeyPatch('sys.stdout', fake_stdout))
md.display("foo.fred")
fake_stdout.flush()
self.assertEqual(displayed.getDetails()['stdout'].as_text(), "")
def test_metadata_retrieve_by_nested_key_none_with_matching_string(self):
md_data = {"foo": "bar"}
md_str = json.dumps(md_data)
md = cfn_helper.Metadata('teststack', None)
self.assertTrue(md.retrieve(meta_str=md_data,
last_path=self.last_file))
self.assertThat(md_data, ttm.Equals(md._metadata))
self.assertEqual(md_str, str(md))
displayed = self.useFixture(fixtures.StringStream('stdout'))
fake_stdout = displayed.stream
self.useFixture(fixtures.MonkeyPatch('sys.stdout', fake_stdout))
md.display("foo.bar")
fake_stdout.flush()
self.assertEqual(displayed.getDetails()['stdout'].as_text(), "")
def test_metadata_creates_cache(self):
temp_home = tempfile.mkdtemp()