Add ca_certificate option for SSL'd api

Change-Id: I12eb9dbbb8bee24e50ae342ffbc7356d4583a973
This commit is contained in:
Endre Karlson 2014-12-04 14:44:05 +01:00
parent de6d420f40
commit c917665af9
2 changed files with 14 additions and 3 deletions

View File

@ -35,6 +35,7 @@ opts = [
default='/var/lib/heat-cfntools/cfn-metadata-server',
help='Local file to read for metadata url if not explicitly '
' specified'),
cfg.StrOpt('ca_certificate', help='CA Certificate path'),
cfg.StrOpt('stack-name',
help='Stack name to describe'),
cfg.MultiStrOpt('path',
@ -105,7 +106,8 @@ class Collector(object):
params['Signature'] = signer.generate(credentials)
try:
content = self._session.get(
url, params=params, headers=headers)
url, params=params, headers=headers,
verify=CONF.cfn.ca_certificate)
content.raise_for_status()
except self._requests_impl.exceptions.RequestException as e:
logger.warn(e)

View File

@ -123,8 +123,9 @@ class FakeReqSession(object):
def __init__(self, testcase, expected_netloc):
self._test = testcase
self._expected_netloc = expected_netloc
self.verify = False
def get(self, url, params, headers):
def get(self, url, params, headers, verify=None):
self._test.addDetail('url', test_content.text_content(url))
url = urlparse.urlparse(url)
self._test.assertEqual(self._expected_netloc, url.netloc)
@ -144,6 +145,8 @@ class FakeReqSession(object):
detail = etree.SubElement(result, 'StackResourceDetail')
metadata = etree.SubElement(detail, 'Metadata')
metadata.text = json.dumps(self.SESSION_META_DATA)
if verify is not None:
self.verify = True
return FakeResponse(etree.tostring(root))
@ -186,7 +189,7 @@ class FakeFailRequests(object):
exceptions = requests.exceptions
class Session(object):
def get(self, url, params, headers):
def get(self, url, params, headers, verify=None):
raise requests.exceptions.HTTPError(403, 'Forbidden')
@ -220,6 +223,12 @@ class TestCfn(TestCfnBase):
self.assertEqual('', self.log.output)
def test_collect_with_ca_cert(self):
cfn.CONF.cfn.ca_certificate = "foo"
collector = cfn.Collector(requests_impl=FakeRequests(self))
collector.collect()
self.assertTrue(collector._session.verify)
def test_collect_cfn_fail(self):
cfn_collect = cfn.Collector(requests_impl=FakeFailRequests)
self.assertRaises(exc.CfnMetadataNotAvailable, cfn_collect.collect)