Add admin config sni cert endpoint

Change-Id: I32d3759c932c92fbfd73e925d2c8147e1234bf28
This commit is contained in:
Isaac Mungai 2016-09-15 11:13:01 -04:00
parent ae84ae694f
commit 24eb952e71
8 changed files with 196 additions and 12 deletions

View File

@ -314,7 +314,7 @@ class DefaultSSLCertificateController(base.SSLCertificateController):
raise ValueError(
"%s is not a valid san cert, valid san certs are: %s" %
(san_cert_name, akamai_driver.san_cert_cnames))
akamai_driver = self._driver.providers['akamai'].obj
# given the spsId, determine the most recent jobId
# and persist the jobId
if new_cert_config.get('spsId') is not None:
@ -342,6 +342,31 @@ class DefaultSSLCertificateController(base.SSLCertificateController):
return res
def get_sni_cert_configuration(self, cert_name):
if 'akamai' in self._driver.providers:
akamai_driver = self._driver.providers['akamai'].obj
self._validate_sni_cert_name(akamai_driver, cert_name)
res = akamai_driver.cert_info_storage.get_sni_cert_info(cert_name)
else:
# if not using akamai driver just return an empty list
res = {}
return res
def update_sni_cert_configuration(self, cert_name, new_cert_config):
if 'akamai' in self._driver.providers:
akamai_driver = self._driver.providers['akamai'].obj
self._validate_sni_cert_name(akamai_driver, cert_name)
res = akamai_driver.cert_info_storage.update_sni_cert_config(
cert_name,
new_cert_config
)
else:
# if not using akamai driver just return an empty list
res = {}
return res
def get_san_cert_hostname_limit(self):
if 'akamai' in self._driver.providers:
akamai_driver = self._driver.providers['akamai'].obj
@ -353,6 +378,14 @@ class DefaultSSLCertificateController(base.SSLCertificateController):
return res
@staticmethod
def _validate_sni_cert_name(provider_driver, cert_name):
if cert_name not in provider_driver.sni_cert_cnames:
raise ValueError(
"{0} is not a valid sni cert, "
"valid sni certs are: {1}".format(
cert_name, provider_driver.sni_cert_cnames))
def set_san_cert_hostname_limit(self, request_json):
if 'akamai' in self._driver.providers:
try:

View File

@ -144,6 +144,9 @@ class CassandraSanInfoStorage(base.BaseAkamaiSanInfoStorage):
def _get_akamai_san_certs_info(self):
return json.loads(self._get_akamai_provider_info()['info']['san_info'])
def _get_akamai_sni_certs_info(self):
return json.loads(self._get_akamai_provider_info()['info']['sni_info'])
def _get_akamai_san_certs_settings(self):
try:
return json.loads(
@ -212,6 +215,23 @@ class CassandraSanInfoStorage(base.BaseAkamaiSanInfoStorage):
return res
def get_sni_cert_info(self, cert_name):
cert_info = self._get_akamai_sni_certs_info().get(cert_name)
if cert_info is None:
raise ValueError('No san cert info found for %s.' % cert_name)
enrollment_id = cert_info.get("enrollmentId")
res = {
'cnameHostname': cert_name,
'enrollmentId': enrollment_id,
}
if any([i for i in [enrollment_id] if i is None]):
raise ValueError("SNI info error: {0}".format(res))
return res
def get_cert_config(self, san_cert_name):
res = self.get_cert_info(san_cert_name)
res['spsId'] = str(self.get_cert_last_spsid(san_cert_name))
@ -221,20 +241,27 @@ class CassandraSanInfoStorage(base.BaseAkamaiSanInfoStorage):
self.save_cert_config(san_cert_name, new_cert_config)
return self.get_cert_config(san_cert_name)
def save_cert_config(self, san_cert_name, new_cert_config):
san_info = self._get_akamai_san_certs_info()
the_san_cert_info = san_info.get(
san_cert_name
)
def update_sni_cert_config(self, sni_cert_name, new_cert_config):
self.save_cert_config(
sni_cert_name, new_cert_config, info_type='sni_info')
return self.get_sni_cert_info(sni_cert_name)
if the_san_cert_info is None:
raise ValueError('No san cert info found for %s.' % san_cert_name)
def save_cert_config(self, cert_name, new_cert_config,
info_type='san_info'):
if info_type == 'sni_info':
certs_info = self._get_akamai_sni_certs_info()
else:
certs_info = self._get_akamai_san_certs_info()
cert_info = certs_info.get(cert_name)
the_san_cert_info.update(new_cert_config)
san_info[san_cert_name] = the_san_cert_info
# Change the previous san info in the overall provider_info dictionary
if cert_info is None:
raise ValueError('No cert info found for %s.' % cert_name)
cert_info.update(new_cert_config)
certs_info[cert_name] = cert_info
# Change the previous info in the overall provider_info dictionary
provider_info = dict(self._get_akamai_provider_info()['info'])
provider_info['san_info'] = json.dumps(san_info)
provider_info[info_type] = json.dumps(certs_info)
stmt = query.SimpleStatement(
UPDATE_PROVIDER_INFO,

View File

@ -90,6 +90,8 @@ AKAMAI_OPTIONS = [
'Custom cert https policies'
),
cfg.ListOpt('sni_cert_cnames',
help='A list of sni certs cname host names'),
# SANCERT related configs
cfg.ListOpt('san_cert_cnames',
help='A list of san certs cnamehost names'),
@ -191,6 +193,7 @@ class CDNProvider(base.Driver):
])
self.san_cert_cnames = self.akamai_conf.san_cert_cnames
self.sni_cert_cnames = self.akamai_conf.sni_cert_cnames
self.san_cert_hostname_limit = self.akamai_conf.san_cert_hostname_limit
self.akamai_sps_api_client = self.akamai_policy_api_client

View File

@ -281,6 +281,45 @@ class AkamaiSanCertConfigController(base.Controller, hooks.HookController):
pecan.abort(400, str(e))
class AkamaiSNICertConfigController(base.Controller, hooks.HookController):
__hooks__ = [poppy_hooks.Context(), poppy_hooks.Error()]
@pecan.expose('json')
@decorators.validate(
query=rule.Rule(
helpers.is_valid_domain_by_name(),
helpers.abort_with_message))
def get_one(self, query):
try:
return (
self._driver.manager.ssl_certificate_controller.
get_sni_cert_configuration(query)
)
except Exception as e:
pecan.abort(400, str(e))
@pecan.expose('json')
@decorators.validate(
query=rule.Rule(
helpers.is_valid_domain_by_name(),
helpers.abort_with_message),
request=rule.Rule(
helpers.json_matches_service_schema(
ssl_certificate.SSLCertificateSchema.get_schema(
"sni_config", "POST")),
helpers.abort_with_message,
stoplight_helpers.pecan_getter))
def post(self, query):
request_json = json.loads(pecan.request.body.decode('utf-8'))
try:
res = (
self._driver.manager.ssl_certificate_controller.
update_sni_cert_configuration(query, request_json))
return res
except Exception as e:
pecan.abort(400, str(e))
class AkamaiSSLCertificateController(base.Controller, hooks.HookController):
__hooks__ = [poppy_hooks.Context(), poppy_hooks.Error()]
@ -288,6 +327,7 @@ class AkamaiSSLCertificateController(base.Controller, hooks.HookController):
super(AkamaiSSLCertificateController, self).__init__(driver)
self.__class__.retry_list = AkamaiRetryListController(driver)
self.__class__.config = AkamaiSanCertConfigController(driver)
self.__class__.sni_config = AkamaiSNICertConfigController(driver)
class AkamaiController(base.Controller, hooks.HookController):

View File

@ -110,6 +110,20 @@ class SSLCertificateSchema(schema_base.SchemaBase):
}
},
'sni_config': {
'POST': {
'type': 'object',
'additionalProperties': False,
'properties': {
'enrollmentId': {
'type': 'integer',
# we cannot have 0 or negative enrollmentId
'minimum': 1
}
}
}
},
'san_mapping_list': {
'PUT': {
'type': 'array',

View File

@ -54,6 +54,7 @@ class DefaultSSLCertificateControllerTests(base.TestCase):
def get_provider_by_name(name):
obj_mock = self.provider_mocks[name]
obj_mock.san_cert_cnames = ["san1", "san2"]
obj_mock.sni_cert_cnames = ["sni1", "sni2"]
obj_mock.akamai_sps_api_base_url = 'akamai_base_url/{spsId}'
provider = mock.Mock(obj=obj_mock)
@ -117,15 +118,28 @@ class DefaultSSLCertificateControllerTests(base.TestCase):
resp = self.scc.get_san_cert_configuration("san1")
self.assertIsNotNone(resp)
def test_get_sni_cert_configuration_positive(self):
resp = self.scc.get_sni_cert_configuration("sni1")
self.assertIsNotNone(resp)
def test_get_san_cert_configuration_positive_no_akamai_provider(self):
del self.provider_mocks['akamai']
resp = self.scc.get_san_cert_configuration("san1")
self.assertEqual({}, resp)
def test_get_sni_cert_configuration_positive_no_akamai_provider(self):
del self.provider_mocks['akamai']
resp = self.scc.get_sni_cert_configuration("sni1")
self.assertEqual({}, resp)
def test_get_san_cert_configuration_invalid_san_cert_cname(self):
with testtools.ExpectedException(ValueError):
self.scc.get_san_cert_configuration("non-existant")
def test_get_sni_cert_configuration_invalid_san_cert_cname(self):
with testtools.ExpectedException(ValueError):
self.scc.get_sni_cert_configuration("non-existant")
def test_set_san_cert_hostname_limit_positive(self):
resp = mock.Mock()
resp.status_code = 200
@ -184,6 +198,13 @@ class DefaultSSLCertificateControllerTests(base.TestCase):
self.scc.update_san_cert_configuration("non-existant",
{"spsId": '1234'})
def test_update_sni_cert_invalid_cert_cname(self):
with testtools.ExpectedException(ValueError):
self.scc.update_sni_cert_configuration(
"non-existant",
{"enrollmentId": '1234'}
)
def test_update_san_cert_configuration_api_failure(self):
resp = mock.Mock()
resp.status_code = 404

View File

@ -54,6 +54,8 @@ class TestCassandraCertInfoStorage(base.TestCase):
'{"ipVersion": "ipv4", "issuer": "symentec", '
'"slot_deployment_klass": "esslType", '
'"jobId": "1432", "spsId": 1423}}',
'sni_info':
'{"secured2.sni1.test-cdn.com": {"enrollmentId": "2345"}}',
'settings': '{"san_cert_hostname_limit": 80}'
}}]
@ -99,6 +101,18 @@ class TestCassandraCertInfoStorage(base.TestCase):
res == json.loads(self.get_returned_value[0]['info']['san_info'])
)
def test__get_akamai_sni_certs_info(self):
self.cassandra_storage = cassandra_storage.CassandraSanInfoStorage(
self.conf)
mock_execute = self.cassandra_storage.session.execute
mock_execute.return_value = self.get_returned_value
res = self.cassandra_storage._get_akamai_sni_certs_info()
mock_execute.assert_called()
self.assertTrue(
res == json.loads(self.get_returned_value[0]['info']['sni_info'])
)
def test_list_all_san_cert_names(self):
self.cassandra_storage = cassandra_storage.CassandraSanInfoStorage(
self.conf)
@ -179,6 +193,23 @@ class TestCassandraCertInfoStorage(base.TestCase):
)[cert_name]['spsId'])
)
def test_get_sni_cert_info(self):
self.cassandra_storage = cassandra_storage.CassandraSanInfoStorage(
self.conf)
mock_execute = self.cassandra_storage.session.execute
mock_execute.return_value = self.get_returned_value
cert_name = "secured2.sni1.test-cdn.com"
res = self.cassandra_storage.get_sni_cert_info(
cert_name
)
mock_execute.assert_called()
self.assertTrue(
res['enrollmentId'] == str(json.loads(
self.get_returned_value[0]['info']['sni_info']
)[cert_name]['enrollmentId'])
)
def test_update_cert_config(self):
self.cassandra_storage = cassandra_storage.CassandraSanInfoStorage(
self.conf)
@ -192,6 +223,19 @@ class TestCassandraCertInfoStorage(base.TestCase):
)
mock_execute.assert_called()
def test_update_sni_cert_config(self):
self.cassandra_storage = cassandra_storage.CassandraSanInfoStorage(
self.conf)
mock_execute = self.cassandra_storage.session.execute
mock_execute.return_value = self.get_returned_value
cert_name = "secured2.sni1.test-cdn.com"
new_enrollment_id = 9898
self.cassandra_storage.update_sni_cert_config(
cert_name, {'enrollmentId': new_enrollment_id}
)
mock_execute.assert_called()
def test_set_san_cert_hostname_limit(self):
self.cassandra_storage = cassandra_storage.CassandraSanInfoStorage(
self.conf

View File

@ -80,6 +80,8 @@ AKAMAI_OPTIONS = [
'Custom cert https policies'
),
cfg.ListOpt('sni_cert_cnames', default='secure.san.test.com',
help='A list of sni certs cname host names'),
# SANCERT related configs
cfg.ListOpt('san_cert_cnames', default='secure.san.test.com',
help='A list of san certs cnamehost names'),