Merge "Remove Certificate Orders and CAs from API"

This commit is contained in:
Zuul 2018-01-23 04:43:05 +00:00 committed by Gerrit Code Review
commit 3d3ea33e8b
14 changed files with 38 additions and 3431 deletions

View File

@ -1,499 +0,0 @@
# Copyright (c) 2014 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import versionutils
import pecan
from six.moves.urllib import parse
from barbican import api
from barbican.api import controllers
from barbican.common import exception as excep
from barbican.common import hrefs
from barbican.common import quota
from barbican.common import resources as res
from barbican.common import utils
from barbican.common import validators
from barbican import i18n as u
from barbican.model import models
from barbican.model import repositories as repo
from barbican.tasks import certificate_resources as cert_resources
LOG = utils.getLogger(__name__)
_DEPRECATION_MSG = '%s has been deprecated in the Newton release. ' \
'It will be removed in the Pike release.'
def _certificate_authority_not_found():
"""Throw exception indicating certificate authority not found."""
pecan.abort(404, u._('Not Found. CA not found.'))
def _certificate_authority_attribute_not_found():
"""Throw exception indicating CA attribute was not found."""
pecan.abort(404, u._('Not Found. CA attribute not found.'))
def _ca_not_in_project():
"""Throw exception certificate authority is not in project."""
pecan.abort(404, u._('Not Found. CA not in project.'))
def _requested_preferred_ca_not_a_project_ca():
"""Throw exception indicating that preferred CA is not a project CA."""
pecan.abort(
400,
u._('Cannot set CA as a preferred CA as it is not a project CA.')
)
def _cant_remove_preferred_ca_from_project():
pecan.abort(
409,
u._('Please change the preferred CA to a different project CA '
'before removing it.')
)
class CertificateAuthorityController(controllers.ACLMixin):
"""Handles certificate authority retrieval requests."""
def __init__(self, ca):
LOG.debug('=== Creating CertificateAuthorityController ===')
msg = _DEPRECATION_MSG % "Certificate Authorities API"
versionutils.report_deprecated_feature(LOG, msg)
self.ca = ca
self.ca_repo = repo.get_ca_repository()
self.project_ca_repo = repo.get_project_ca_repository()
self.preferred_ca_repo = repo.get_preferred_ca_repository()
self.project_repo = repo.get_project_repository()
def __getattr__(self, name):
route_table = {
'add-to-project': self.add_to_project,
'remove-from-project': self.remove_from_project,
'set-global-preferred': self.set_global_preferred,
'set-preferred': self.set_preferred,
}
if name in route_table:
return route_table[name]
raise AttributeError
@pecan.expose()
def _lookup(self, attribute, *remainder):
_certificate_authority_attribute_not_found()
@pecan.expose(generic=True)
def index(self, **kwargs):
pecan.abort(405) # HTTP 405 Method Not Allowed as default
@index.when(method='GET', template='json')
@controllers.handle_exceptions(u._('Certificate Authority retrieval'))
@controllers.enforce_rbac('certificate_authority:get')
def on_get(self, external_project_id):
LOG.debug("== Getting certificate authority for %s", self.ca.id)
return self.ca.to_dict_fields()
@pecan.expose()
@utils.allow_all_content_types
@controllers.handle_exceptions(u._('CA Signing Cert retrieval'))
@controllers.enforce_rbac('certificate_authority:get_cacert')
def cacert(self, external_project_id):
LOG.debug("== Getting signing cert for %s", self.ca.id)
cacert = self.ca.ca_meta['ca_signing_certificate'].value
return cacert
@pecan.expose()
@controllers.handle_exceptions(u._('CA Cert Chain retrieval'))
@controllers.enforce_rbac('certificate_authority:get_ca_cert_chain')
def intermediates(self, external_project_id):
LOG.debug("== Getting CA Cert Chain for %s", self.ca.id)
cert_chain = self.ca.ca_meta['intermediates'].value
return cert_chain
@pecan.expose(template='json')
@controllers.handle_exceptions(u._('CA projects retrieval'))
@controllers.enforce_rbac('certificate_authority:get_projects')
def projects(self, external_project_id):
LOG.debug("== Getting Projects for %s", self.ca.id)
project_cas = self.ca.project_cas
if not project_cas:
ca_projects_resp = {'projects': []}
else:
project_list = []
for p in project_cas:
project_list.append(p.project.external_id)
ca_projects_resp = {'projects': project_list}
return ca_projects_resp
@pecan.expose()
@utils.allow_all_content_types
@controllers.handle_exceptions(u._('Add CA to project'))
@controllers.enforce_rbac('certificate_authority:add_to_project')
def add_to_project(self, external_project_id):
if pecan.request.method != 'POST':
pecan.abort(405)
LOG.debug("== Saving CA %s to external_project_id %s",
self.ca.id, external_project_id)
project_model = res.get_or_create_project(external_project_id)
# CA must be a base CA or a subCA owned by this project
if (self.ca.project_id is not None and
self.ca.project_id != project_model.id):
raise excep.UnauthorizedSubCA()
project_cas = project_model.cas
num_cas = len(project_cas)
for project_ca in project_cas:
if project_ca.ca_id == self.ca.id:
# project already added
return
project_ca = models.ProjectCertificateAuthority(
project_model.id, self.ca.id)
self.project_ca_repo.create_from(project_ca)
if num_cas == 0:
# set first project CA to be the preferred one
preferred_ca = models.PreferredCertificateAuthority(
project_model.id, self.ca.id)
self.preferred_ca_repo.create_from(preferred_ca)
@pecan.expose()
@utils.allow_all_content_types
@controllers.handle_exceptions(u._('Remove CA from project'))
@controllers.enforce_rbac('certificate_authority:remove_from_project')
def remove_from_project(self, external_project_id):
if pecan.request.method != 'POST':
pecan.abort(405)
LOG.debug("== Removing CA %s from project_external_id %s",
self.ca.id, external_project_id)
project_model = res.get_or_create_project(external_project_id)
(project_ca, __offset, __limit, __total) = (
self.project_ca_repo.get_by_create_date(
project_id=project_model.id,
ca_id=self.ca.id,
suppress_exception=True))
if project_ca:
self._do_remove_from_project(project_ca[0])
else:
_ca_not_in_project()
def _do_remove_from_project(self, project_ca):
project_id = project_ca.project_id
ca_id = project_ca.ca_id
preferred_ca = self.preferred_ca_repo.get_project_entities(
project_id)[0]
if cert_resources.is_last_project_ca(project_id):
self.preferred_ca_repo.delete_entity_by_id(preferred_ca.id, None)
else:
self._assert_is_not_preferred_ca(preferred_ca.ca_id, ca_id)
self.project_ca_repo.delete_entity_by_id(project_ca.id, None)
def _assert_is_not_preferred_ca(self, preferred_ca_id, ca_id):
if preferred_ca_id == ca_id:
_cant_remove_preferred_ca_from_project()
@pecan.expose()
@controllers.handle_exceptions(u._('Set preferred project CA'))
@controllers.enforce_rbac('certificate_authority:set_preferred')
def set_preferred(self, external_project_id):
if pecan.request.method != 'POST':
pecan.abort(405)
LOG.debug("== Setting preferred CA %s for project %s",
self.ca.id, external_project_id)
project_model = res.get_or_create_project(external_project_id)
(project_ca, __offset, __limit, __total) = (
self.project_ca_repo.get_by_create_date(
project_id=project_model.id,
ca_id=self.ca.id,
suppress_exception=True))
if not project_ca:
_requested_preferred_ca_not_a_project_ca()
self.preferred_ca_repo.create_or_update_by_project_id(
project_model.id, self.ca.id)
@pecan.expose()
@utils.allow_all_content_types
@controllers.handle_exceptions(u._('Set global preferred CA'))
@controllers.enforce_rbac('certificate_authority:set_global_preferred')
def set_global_preferred(self, external_project_id):
if pecan.request.method != 'POST':
pecan.abort(405)
LOG.debug("== Set global preferred CA %s", self.ca.id)
project = res.get_or_create_global_preferred_project()
self.preferred_ca_repo.create_or_update_by_project_id(
project.id, self.ca.id)
@index.when(method='DELETE')
@utils.allow_all_content_types
@controllers.handle_exceptions(u._('CA deletion'))
@controllers.enforce_rbac('certificate_authority:delete')
def on_delete(self, external_project_id, **kwargs):
cert_resources.delete_subordinate_ca(external_project_id, self.ca)
LOG.info('Deleted CA for project: %s', external_project_id)
class CertificateAuthoritiesController(controllers.ACLMixin):
"""Handles certificate authority list requests."""
def __init__(self):
LOG.debug('Creating CertificateAuthoritiesController')
msg = _DEPRECATION_MSG % "Certificate Authorities API"
versionutils.report_deprecated_feature(LOG, msg)
self.ca_repo = repo.get_ca_repository()
self.project_ca_repo = repo.get_project_ca_repository()
self.preferred_ca_repo = repo.get_preferred_ca_repository()
self.project_repo = repo.get_project_repository()
self.validator = validators.NewCAValidator()
self.quota_enforcer = quota.QuotaEnforcer('cas', self.ca_repo)
# Populate the CA table at start up
cert_resources.refresh_certificate_resources()
def __getattr__(self, name):
route_table = {
'all': self.get_all,
'global-preferred': self.get_global_preferred,
'preferred': self.preferred,
'unset-global-preferred': self.unset_global_preferred,
}
if name in route_table:
return route_table[name]
raise AttributeError
@pecan.expose()
def _lookup(self, ca_id, *remainder):
ca = self.ca_repo.get(entity_id=ca_id, suppress_exception=True)
if not ca:
_certificate_authority_not_found()
return CertificateAuthorityController(ca), remainder
@pecan.expose(generic=True)
def index(self, **kwargs):
pecan.abort(405) # HTTP 405 Method Not Allowed as default
@index.when(method='GET', template='json')
@controllers.handle_exceptions(
u._('Certificate Authorities retrieval (limited)'))
@controllers.enforce_rbac('certificate_authorities:get_limited')
def on_get(self, external_project_id, **kw):
LOG.debug('Start certificate_authorities on_get (limited)')
plugin_name = kw.get('plugin_name')
if plugin_name is not None:
plugin_name = parse.unquote_plus(plugin_name)
plugin_ca_id = kw.get('plugin_ca_id', None)
if plugin_ca_id is not None:
plugin_ca_id = parse.unquote_plus(plugin_ca_id)
# refresh CA table, in case plugin entries have expired
cert_resources.refresh_certificate_resources()
project_model = res.get_or_create_project(external_project_id)
if self._project_cas_defined(project_model.id):
cas, offset, limit, total = self._get_subcas_and_project_cas(
offset=kw.get('offset', 0),
limit=kw.get('limit', None),
plugin_name=plugin_name,
plugin_ca_id=plugin_ca_id,
project_id=project_model.id)
else:
cas, offset, limit, total = self._get_subcas_and_root_cas(
offset=kw.get('offset', 0),
limit=kw.get('limit', None),
plugin_name=plugin_name,
plugin_ca_id=plugin_ca_id,
project_id=project_model.id)
return self._display_cas(cas, offset, limit, total)
@pecan.expose(generic=True, template='json')
@controllers.handle_exceptions(u._('Certificate Authorities retrieval'))
@controllers.enforce_rbac('certificate_authorities:get')
def get_all(self, external_project_id, **kw):
LOG.debug('Start certificate_authorities on_get')
plugin_name = kw.get('plugin_name')
if plugin_name is not None:
plugin_name = parse.unquote_plus(plugin_name)
plugin_ca_id = kw.get('plugin_ca_id', None)
if plugin_ca_id is not None:
plugin_ca_id = parse.unquote_plus(plugin_ca_id)
# refresh CA table, in case plugin entries have expired
cert_resources.refresh_certificate_resources()
project_model = res.get_or_create_project(external_project_id)
cas, offset, limit, total = self._get_subcas_and_root_cas(
offset=kw.get('offset', 0),
limit=kw.get('limit', None),
plugin_name=plugin_name,
plugin_ca_id=plugin_ca_id,
project_id=project_model.id)
return self._display_cas(cas, offset, limit, total)
def _get_project_cas(self, project_id, query_filters):
cas, offset, limit, total = self.project_ca_repo.get_by_create_date(
offset_arg=query_filters.get('offset', 0),
limit_arg=query_filters.get('limit', None),
project_id=project_id,
suppress_exception=True
)
return cas, offset, limit, total
def _project_cas_defined(self, project_id):
_cas, _offset, _limit, total = self._get_project_cas(project_id, {})
return total > 0
def _get_subcas_and_project_cas(self, offset, limit, plugin_name,
plugin_ca_id, project_id):
return self.ca_repo.get_by_create_date(
offset_arg=offset,
limit_arg=limit,
plugin_name=plugin_name,
plugin_ca_id=plugin_ca_id,
project_id=project_id,
restrict_to_project_cas=True,
suppress_exception=True)
def _get_subcas_and_root_cas(self, offset, limit, plugin_name,
plugin_ca_id, project_id):
return self.ca_repo.get_by_create_date(
offset_arg=offset,
limit_arg=limit,
plugin_name=plugin_name,
plugin_ca_id=plugin_ca_id,
project_id=project_id,
restrict_to_project_cas=False,
suppress_exception=True)
def _display_cas(self, cas, offset, limit, total):
if not cas:
cas_resp_overall = {'cas': [],
'total': total}
else:
cas_resp = [
hrefs.convert_certificate_authority_to_href(ca.id)
for ca in cas]
cas_resp_overall = hrefs.add_nav_hrefs('cas', offset, limit, total,
{'cas': cas_resp})
cas_resp_overall.update({'total': total})
return cas_resp_overall
@pecan.expose(generic=True, template='json')
@controllers.handle_exceptions(u._('Retrieve global preferred CA'))
@controllers.enforce_rbac(
'certificate_authorities:get_global_preferred_ca')
def get_global_preferred(self, external_project_id, **kw):
LOG.debug('Start certificate_authorities get_global_preferred CA')
pref_ca = cert_resources.get_global_preferred_ca()
if not pref_ca:
pecan.abort(404, u._("No global preferred CA defined"))
return {
'ca_ref':
hrefs.convert_certificate_authority_to_href(pref_ca.ca_id)
}
@pecan.expose()
@utils.allow_all_content_types
@controllers.handle_exceptions(u._('Unset global preferred CA'))
@controllers.enforce_rbac('certificate_authorities:unset_global_preferred')
def unset_global_preferred(self, external_project_id):
if pecan.request.method != 'POST':
pecan.abort(405)
LOG.debug("== Unsetting global preferred CA")
self._remove_global_preferred_ca(external_project_id)
def _remove_global_preferred_ca(self, external_project_id):
global_preferred_ca = cert_resources.get_global_preferred_ca()
if global_preferred_ca:
self.preferred_ca_repo.delete_entity_by_id(
global_preferred_ca.id,
external_project_id)
@pecan.expose(generic=True, template='json')
@utils.allow_all_content_types
@controllers.handle_exceptions(u._('Retrieve project preferred CA'))
@controllers.enforce_rbac('certificate_authorities:get_preferred_ca')
def preferred(self, external_project_id, **kw):
LOG.debug('Start certificate_authorities get'
' project preferred CA')
project = res.get_or_create_project(external_project_id)
pref_ca_id = cert_resources.get_project_preferred_ca_id(project.id)
if not pref_ca_id:
pecan.abort(404, u._("No preferred CA defined for this project"))
return {
'ca_ref':
hrefs.convert_certificate_authority_to_href(pref_ca_id)
}
@index.when(method='POST', template='json')
@controllers.handle_exceptions(u._('CA creation'))
@controllers.enforce_rbac('certificate_authorities:post')
@controllers.enforce_content_types(['application/json'])
def on_post(self, external_project_id, **kwargs):
LOG.debug('Start on_post for project-ID %s:...',
external_project_id)
data = api.load_body(pecan.request, validator=self.validator)
project = res.get_or_create_project(external_project_id)
ctxt = controllers._get_barbican_context(pecan.request)
if ctxt: # in authenticated pipeline case, always use auth token user
creator_id = ctxt.user
self.quota_enforcer.enforce(project)
new_ca = cert_resources.create_subordinate_ca(
project_model=project,
name=data.get('name'),
description=data.get('description'),
subject_dn=data.get('subject_dn'),
parent_ca_ref=data.get('parent_ca_ref'),
creator_id=creator_id
)
url = hrefs.convert_certificate_authority_to_href(new_ca.id)
LOG.debug('URI to sub-CA is %s', url)
pecan.response.status = 201
pecan.response.headers['Location'] = url
LOG.info('Created a sub CA for project: %s',
external_project_id)
return {'ca_ref': url}

View File

@ -10,7 +10,6 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import versionutils
import pecan
from barbican import api
@ -47,12 +46,6 @@ def _order_update_not_supported():
pecan.abort(405, u._("Order update is not supported."))
def _order_update_not_supported_for_type(order_type):
"""Throw exception that update is not supported."""
pecan.abort(400, u._("Updates are not supported for order type "
"{0}.").format(order_type))
def _order_cannot_be_updated_if_not_pending(order_status):
"""Throw exception that order cannot be updated if not PENDING."""
pecan.abort(400, u._("Only PENDING orders can be updated. Order is in the"
@ -84,41 +77,6 @@ class OrderController(controllers.ACLMixin):
def on_get(self, external_project_id):
return hrefs.convert_to_hrefs(self.order.to_dict_fields())
@index.when(method='PUT')
@controllers.handle_exceptions(u._('Order update'))
@controllers.enforce_rbac('order:put')
@controllers.enforce_content_types(['application/json'])
def on_put(self, external_project_id, **kwargs):
body = api.load_body(pecan.request,
validator=self.type_order_validator)
project = res.get_or_create_project(external_project_id)
order_type = body.get('type')
request_id = None
ctxt = controllers._get_barbican_context(pecan.request)
if ctxt and ctxt.request_id:
request_id = ctxt.request_id
if self.order.type != order_type:
order_cannot_modify_order_type()
if models.OrderType.CERTIFICATE != self.order.type:
_order_update_not_supported_for_type(order_type)
if models.States.PENDING != self.order.status:
_order_cannot_be_updated_if_not_pending(self.order.status)
updated_meta = body.get('meta')
validators.validate_ca_id(project.id, updated_meta)
# TODO(chellygel): Put 'meta' into a separate order association
# entity.
self.queue.update_order(order_id=self.order.id,
project_id=external_project_id,
updated_meta=updated_meta,
request_id=request_id)
@index.when(method='DELETE')
@utils.allow_all_content_types
@controllers.handle_exceptions(u._('Order deletion'))
@ -214,16 +172,6 @@ class OrdersController(controllers.ACLMixin):
{'order_type': order_type,
'request_type': request_type})
if order_type == models.OrderType.CERTIFICATE:
msg = _DEPRECATION_MSG % "Certificate Order Resource"
versionutils.report_deprecated_feature(LOG, msg)
validators.validate_ca_id(project.id, body.get('meta'))
if request_type == 'stored-key':
container_ref = order_meta.get('container_ref')
validators.validate_stored_key_rsa_container(
external_project_id,
container_ref, pecan.request)
self.quota_enforcer.enforce(project)
new_order = models.Order()

View File

@ -14,7 +14,6 @@ import pecan
from six.moves.urllib import parse
from barbican.api import controllers
from barbican.api.controllers import cas
from barbican.api.controllers import containers
from barbican.api.controllers import orders
from barbican.api.controllers import quotas
@ -93,7 +92,6 @@ class V1Controller(BaseVersionController):
self.orders = orders.OrdersController()
self.containers = containers.ContainersController()
self.transport_keys = transportkeys.TransportKeysController()
self.cas = cas.CertificateAuthoritiesController()
self.quotas = quotas.QuotasController()
setattr(self, 'project-quotas', quotas.ProjectsQuotasController())
setattr(self, 'secret-stores', secretstores.SecretStoresController())

View File

@ -46,15 +46,6 @@ class TaskClient(object):
project_id=project_id,
request_id=request_id)
def update_order(self, order_id, project_id, updated_meta, request_id):
"""Update Order."""
self._cast('update_order',
order_id=order_id,
project_id=project_id,
updated_meta=updated_meta,
request_id=request_id)
def check_certificate_status(self, order_id, project_id, request_id):
"""Check the status of a certificate order."""
self._cast('check_certificate_status',

View File

@ -211,19 +211,6 @@ class Tasks(object):
return resources.BeginTypeOrder().process_and_suppress_exceptions(
order_id, project_id)
@monitored
@transactional
@retryable_order
def update_order(self, context, order_id, project_id,
updated_meta, request_id):
"""Update Order."""
message = "Processing update order: order ID is '%(order)s' and " \
"request ID is '%(request)s'"
LOG.info(message, {'order': order_id, 'request': request_id})
return resources.UpdateOrder().process_and_suppress_exceptions(
order_id, project_id, updated_meta)
@monitored
@transactional
@retryable_order

View File

@ -1,556 +0,0 @@
# Copyright (c) 2015 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from six import moves
from barbican.common import exception
from barbican.common import hrefs
from barbican.common import resources as res
from barbican.model import models
from barbican.model import repositories
from barbican.tests import utils
project_repo = repositories.get_project_repository()
ca_repo = repositories.get_ca_repository()
project_ca_repo = repositories.get_project_repository()
preferred_ca_repo = repositories.get_preferred_ca_repository()
def create_ca(parsed_ca, id_ref="id"):
"""Generate a CA entity instance."""
ca = models.CertificateAuthority(parsed_ca)
ca.id = id_ref
return ca
class WhenTestingCAsResource(utils.BarbicanAPIBaseTestCase):
def test_should_get_list_certificate_authorities(self):
self.app.extra_environ = {
'barbican.context': self._build_context(self.project_id,
user="user1")
}
self.create_cas(set_project_cas=False)
resp = self.app.get('/cas/', self.params)
self.assertEqual(self.limit, len(resp.namespace['cas']))
self.assertIn('previous', resp.namespace)
self.assertIn('next', resp.namespace)
url_nav_next = self._create_url(self.project_id,
self.offset + self.limit, self.limit)
self.assertEqual(1, resp.body.decode('utf-8').count(url_nav_next))
url_nav_prev = self._create_url(self.project_id,
0, self.limit)
self.assertEqual(1, resp.body.decode('utf-8').count(url_nav_prev))
url_hrefs = self._create_url(self.project_id)
self.assertEqual((self.limit + 2),
resp.body.decode('utf-8').count(url_hrefs))
def test_response_should_list_subca_and_project_cas(self):
self.create_cas()
self.app.extra_environ = {
'barbican.context': self._build_context(self.project_id,
user="user1")
}
self.params['limit'] = 100
self.params['offset'] = 0
resp = self.app.get('/cas/', self.params)
self.assertIn('total', resp.namespace)
self.assertEqual(3, resp.namespace['total'])
ca_refs = list(resp.namespace['cas'])
for ca_ref in ca_refs:
ca_id = hrefs.get_ca_id_from_ref(ca_ref)
if not ((ca_id in self.project_ca_ids)
or (ca_id == self.subca.id)):
self.fail("Invalid CA reference returned")
def test_response_should_all_except_subca(self):
self.create_cas()
self.app.extra_environ = {
'barbican.context': self._build_context("other_project",
user="user1")
}
self.params['limit'] = 100
self.params['offset'] = 0
self.params['plugin_name'] = self.plugin_name
resp = self.app.get('/cas/', self.params)
self.assertIn('total', resp.namespace)
self.assertEqual(self.num_cas - 1, resp.namespace['total'])
ca_refs = list(resp.namespace['cas'])
for ca_ref in ca_refs:
ca_id = hrefs.get_ca_id_from_ref(ca_ref)
self.assertNotEqual(ca_id, self.subca.id)
def test_response_should_all_except_subca_from_all_subresource(self):
self.create_cas()
self.app.extra_environ = {
'barbican.context': self._build_context("other_project",
user="user1")
}
self.params['limit'] = 100
self.params['offset'] = 0
self.params['plugin_name'] = self.plugin_name
resp = self.app.get('/cas/all', self.params)
self.assertIn('total', resp.namespace)
self.assertEqual(self.num_cas - 1, resp.namespace['total'])
ca_refs = list(resp.namespace['cas'])
for ca_ref in ca_refs:
ca_id = hrefs.get_ca_id_from_ref(ca_ref)
self.assertNotEqual(ca_id, self.subca.id)
def test_response_should_all_from_all_subresource(self):
self.create_cas()
self.app.extra_environ = {
'barbican.context': self._build_context(self.project_id,
user="user1")
}
self.params['limit'] = 100
self.params['offset'] = 0
self.params['plugin_name'] = self.plugin_name
resp = self.app.get('/cas/all', self.params)
self.assertIn('total', resp.namespace)
self.assertEqual(self.num_cas, resp.namespace['total'])
def test_response_should_all_cas(self):
self.create_cas(set_project_cas=False)
self.app.extra_environ = {
'barbican.context': self._build_context(self.project_id,
user="user1")
}
self.params['limit'] = 100
self.params['offset'] = 0
self.params['plugin_name'] = self.plugin_name
resp = self.app.get('/cas/', self.params)
self.assertIn('total', resp.namespace)
self.assertEqual(self.num_cas, resp.namespace['total'])
def test_should_get_list_certificate_authorities_with_params(self):
self.create_cas(set_project_cas=False)
self.params['plugin_name'] = self.plugin_name
self.params['plugin_ca_id'] = self.plugin_ca_id + str(1)
self.params['offset'] = 0
resp = self.app.get('/cas/', self.params)
self.assertNotIn('previous', resp.namespace)
self.assertNotIn('next', resp.namespace)
self.assertEqual(1, resp.namespace['total'])
def test_should_get_with_params_on_all_resource(self):
self.create_cas(set_project_cas=False)
self.params['plugin_name'] = self.plugin_name
self.params['plugin_ca_id'] = self.plugin_ca_id + str(1)
self.params['offset'] = 0
resp = self.app.get('/cas/all', self.params)
self.assertNotIn('previous', resp.namespace)
self.assertNotIn('next', resp.namespace)
self.assertEqual(1, resp.namespace['total'])
def test_should_handle_no_cas(self):
self.params = {'offset': 0, 'limit': 2, 'plugin_name': 'dummy'}
resp = self.app.get('/cas/', self.params)
self.assertEqual([], resp.namespace.get('cas'))
self.assertEqual(0, resp.namespace.get('total'))
self.assertNotIn('previous', resp.namespace)
self.assertNotIn('next', resp.namespace)
def test_should_get_global_preferred_ca(self):
self.create_cas()
resp = self.app.get('/cas/global-preferred')
self.assertEqual(
hrefs.convert_certificate_authority_to_href(
self.global_preferred_ca.id),
resp.namespace['ca_ref'])
def test_should_get_no_global_preferred_ca(self):
resp = self.app.get('/cas/global-preferred', expect_errors=True)
self.assertEqual(404, resp.status_int)
def test_should_get_preferred_ca_not_found(self):
self.project = res.get_or_create_project(self.project_id)
project_repo.save(self.project)
resp = self.app.get('/cas/preferred', expect_errors=True)
self.assertEqual(404, resp.status_int)
def test_should_get_preferred_ca(self):
self.create_cas()
resp = self.app.get('/cas/preferred')
self.assertEqual(
hrefs.convert_certificate_authority_to_href(
self.preferred_ca.id),
resp.namespace['ca_ref'])
def test_should_get_ca(self):
self.create_cas()
resp = self.app.get('/cas/{0}'.format(self.selected_ca_id))
self.assertEqual(self.selected_ca_id,
resp.namespace['ca_id'])
self.assertEqual(self.selected_plugin_ca_id,
resp.namespace['plugin_ca_id'])
def test_should_throw_exception_for_get_when_ca_not_found(self):
self.create_cas()
resp = self.app.get('/cas/bogus_ca_id', expect_errors=True)
self.assertEqual(404, resp.status_int)
def test_should_get_signing_certificate(self):
self.create_cas()
resp = self.app.get('/cas/{0}/cacert'.format(self.selected_ca_id))
self.assertEqual(self.selected_signing_cert, resp.body.decode('utf-8'))
def test_should_raise_for_get_signing_certificate_ca_not_found(self):
resp = self.app.get('/cas/bogus_ca/cacert', expect_errors=True)
self.assertEqual(404, resp.status_int)
def test_should_get_cert_chain(self):
self.create_cas()
resp = self.app.get('/cas/{0}/intermediates'.format(
self.selected_ca_id))
self.assertEqual(self.selected_intermediates,
resp.body.decode('utf-8'))
def test_should_raise_for_get_cert_chain_ca_not_found(self):
resp = self.app.get('/cas/bogus_ca/intermediates', expect_errors=True)
self.assertEqual(404, resp.status_int)
def test_should_raise_for_ca_attribute_not_found(self):
self.create_cas()
resp = self.app.get('/cas/{0}/bogus'.format(self.selected_ca_id),
expect_errors=True)
self.assertEqual(404, resp.status_int)
def test_should_add_to_project(self):
self.create_cas()
resp = self.app.post('/cas/{0}/add-to-project'.format(
self.selected_ca_id))
self.assertEqual(204, resp.status_int)
# TODO(alee) need more detailed tests here
def test_should_add_existing_project_ca_to_project(self):
self.create_cas()
resp = self.app.post('/cas/{0}/add-to-project'.format(
self.project_ca_ids[0]))
self.assertEqual(204, resp.status_int)
# TODO(alee) need more detailed tests here
def test_should_raise_add_to_project_on_ca_not_found(self):
resp = self.app.post(
'/cas/bogus_ca/add-to-project', expect_errors=True)
self.assertEqual(404, resp.status_int)
def test_should_raise_add_to_project_on_ca_not_owned_by_project(self):
self.create_cas()
self.app.extra_environ = {
'barbican.context': self._build_context("other_project",
user="user1")
}
resp = self.app.post('/cas/{0}/add-to-project'.format(
self.subca.id), expect_errors=True)
self.assertEqual(403, resp.status_int)
def test_should_raise_add_to_project_not_post(self):
self.create_cas()
resp = self.app.get(
'/cas/{0}/add_to_project'.format(self.selected_ca_id),
expect_errors=True)
self.assertEqual(405, resp.status_int)
def test_should_remove_from_project(self):
self.create_cas()
resp = self.app.post('/cas/{0}/remove-from-project'.format(
self.project_ca_ids[0]))
self.assertEqual(204, resp.status_int)
# TODO(alee) need more detailed tests here
def test_should_raise_remove_from_project_preferred_ca(self):
self.create_cas()
resp = self.app.post('/cas/{0}/remove-from-project'.format(
self.project_ca_ids[1]),
expect_errors=True)
self.assertEqual(409, resp.status_int)
def test_should_remove_preferred_ca_if_last_project_ca(self):
self.create_cas()
resp = self.app.post('/cas/{0}/remove-from-project'.format(
self.project_ca_ids[0]))
self.assertEqual(204, resp.status_int)
resp = self.app.post('/cas/{0}/remove-from-project'.format(
self.project_ca_ids[1]))
self.assertEqual(204, resp.status_int)
def test_should_raise_remove_from_project_not_currently_set(self):
self.create_cas()
resp = self.app.post(
'/cas/{0}/remove-from-project'.format(self.selected_ca_id),
expect_errors=True)
self.assertEqual(404, resp.status_int)
def test_should_raise_remove_form_project_on_ca_not_found(self):
self.create_cas()
resp = self.app.post('/cas/bogus_ca/remove-from-project',
expect_errors=True)
self.assertEqual(404, resp.status_int)
def test_should_raise_remove_from_project_not_post(self):
self.create_cas()
resp = self.app.get(
'/cas/{0}/remove-from-project'.format(self.selected_ca_id),
expect_errors=True)
self.assertEqual(405, resp.status_int)
def test_should_set_preferred_modify_existing(self):
self.create_cas()
self.app.post(
'/cas/{0}/set-preferred'.format(self.project_ca_ids[1]))
def test_should_raise_set_preferred_ca_not_found(self):
self.create_cas()
resp = self.app.post('/cas/bogus_ca/set-preferred', expect_errors=True)
self.assertEqual(404, resp.status_int)
def test_should_raise_set_preferred_ca_not_in_project(self):
self.create_cas()
resp = self.app.post(
'/cas/{0}/set-preferred'.format(self.selected_ca_id),
expect_errors=True)
self.assertEqual(400, resp.status_int)
def test_should_raise_set_preferred_ca_not_post(self):
self.create_cas()
resp = self.app.get(
'/cas/{0}/set-preferred'.format(self.selected_ca_id),
expect_errors=True)
self.assertEqual(405, resp.status_int)
def test_should_set_global_preferred(self):
self.create_cas()
self.app.post(
'/cas/{0}/set-global-preferred'.format(self.selected_ca_id))
def test_should_raise_set_global_preferred_ca_not_found(self):
resp = self.app.post(
'/cas/bogus_ca/set-global-preferred',
expect_errors=True)
self.assertEqual(404, resp.status_int)
def test_should_raise_set_global_preferred_ca_not_post(self):
self.create_cas()
resp = self.app.get(
'/cas/{0}/set-global-preferred'.format(self.selected_ca_id),
expect_errors=True)
self.assertEqual(405, resp.status_int)
def test_should_unset_global_preferred(self):
self.create_cas()
resp = self.app.post(
'/cas/unset-global-preferred')
self.assertEqual(204, resp.status_int)
def test_should_unset_global_preferred_not_post(self):
self.create_cas()
resp = self.app.get(
'/cas/unset-global-preferred',
expect_errors=True)
self.assertEqual(405, resp.status_int)
def test_should_get_projects(self):
self.create_cas()
resp = self.app.get(
'/cas/{0}/projects'.format(self.project_ca_ids[0]))
self.assertEqual(
self.project.external_id,
resp.namespace['projects'][0])
def test_should_get_no_projects(self):
self.create_cas()
resp = self.app.get('/cas/{0}/projects'.format(self.selected_ca_id))
self.assertEqual([], resp.namespace['projects'])
def test_should_raise_get_projects_ca_not_found(self):
self.create_cas()
resp = self.app.get(
'/cas/bogus_ca/projects'.format(self.project_ca_ids[0]),
expect_errors=True)
self.assertEqual(404, resp.status_int)
@mock.patch('barbican.tasks.certificate_resources.create_subordinate_ca')
def test_should_create_subca(self, mocked_task):
self.create_cas()
self.create_subca_request(self.selected_ca_id)
mocked_task.return_value = models.CertificateAuthority(
self.parsed_subca)
resp = self.app.post_json(
'/cas',
self.subca_request,
expect_errors=False)
self.assertEqual(201, resp.status_int)
def test_should_raise_delete_subca_not_found(self):
self.create_cas()
resp = self.app.delete('/cas/foobar', expect_errors=True)
self.assertEqual(404, resp.status_int)
@mock.patch('barbican.tasks.certificate_resources.delete_subordinate_ca')
def test_should_delete_subca(self, mocked_task):
self.create_cas()
resp = self.app.delete('/cas/' + self.subca.id)
mocked_task.assert_called_once_with(self.project_id,
self.subca)
self.assertEqual(204, resp.status_int)
@mock.patch('barbican.tasks.certificate_resources.delete_subordinate_ca')
def test_should_raise_delete_not_a_subca(self, mocked_task):
self.create_cas()
mocked_task.side_effect = exception.CannotDeleteBaseCA()
resp = self.app.delete('/cas/' + self.subca.id,
expect_errors=True)
mocked_task.assert_called_once_with(self.project_id,
self.subca)
self.assertEqual(403, resp.status_int)
@mock.patch('barbican.tasks.certificate_resources.delete_subordinate_ca')
def test_should_raise_delete_not_authorized(self, mocked_task):
self.create_cas()
mocked_task.side_effect = exception.UnauthorizedSubCA()
resp = self.app.delete('/cas/' + self.subca.id,
expect_errors=True)
mocked_task.assert_called_once_with(self.project_id,
self.subca)
self.assertEqual(403, resp.status_int)
def create_subca_request(self, parent_ca_id):
self.subca_request = {
'name': "Subordinate CA",
'subject_dn': 'cn=subordinate ca signing cert, o=example.com',
'parent_ca_ref': "https://localhost:9311/cas/" + parent_ca_id
}
self.parsed_subca = {
'plugin_name': self.plugin_name,
'plugin_ca_id': self.plugin_ca_id + '_subca_id',
'name': self.plugin_name,
'description': 'Subordinate CA',
'ca_signing_certificate': 'ZZZZZ...Subordinate...',
'intermediates': 'YYYYY...subordinate...',
'parent_ca_id': parent_ca_id
}
def create_cas(self, set_project_cas=True):
self.project = res.get_or_create_project(self.project_id)
self.global_project = res.get_or_create_global_preferred_project()
project_repo.save(self.project)
self.project_ca_ids = []
self.plugin_name = 'default_plugin'
self.plugin_ca_id = 'default_plugin_ca_id_'
self.ca_id = "id1"
self.num_cas = 10
self.offset = 2
self.limit = 4
self.params = {'offset': self.offset, 'limit': self.limit}
self._do_create_cas(set_project_cas)
# create subca for DELETE testing
parsed_ca = {
'plugin_name': self.plugin_name,
'plugin_ca_id': self.plugin_ca_id + "subca 1",
'name': self.plugin_name,
'description': 'Sub CA for default plugin',
'ca_signing_certificate': 'ZZZZZ' + "sub ca1",
'intermediates': 'YYYYY' + "sub ca1",
'project_id': self.project.id,
'creator_id': 'user12345'
}
ca = models.CertificateAuthority(parsed_ca)
ca_repo.create_from(ca)
ca_repo.save(ca)
self.subca = ca
self.num_cas += 1
def _do_create_cas(self, set_project_cas):
for ca_id in moves.range(self.num_cas):
parsed_ca = {
'plugin_name': self.plugin_name,
'plugin_ca_id': self.plugin_ca_id + str(ca_id),
'name': self.plugin_name,
'description': 'Master CA for default plugin',
'ca_signing_certificate': 'ZZZZZ' + str(ca_id),
'intermediates': 'YYYYY' + str(ca_id)
}
ca = models.CertificateAuthority(parsed_ca)
ca_repo.create_from(ca)
ca_repo.save(ca)
if ca_id == 1:
# set global preferred ca
pref_ca = models.PreferredCertificateAuthority(
self.global_project.id,
ca.id)
preferred_ca_repo.create_from(pref_ca)
preferred_ca_repo.save(pref_ca)
self.global_preferred_ca = ca
if ca_id == 2 and set_project_cas:
# set project CA
project_ca = models.ProjectCertificateAuthority(
self.project.id, ca.id)
project_ca_repo.create_from(project_ca)
project_ca_repo.save(project_ca)
self.project_ca_ids.append(ca.id)
if ca_id == 3 and set_project_cas:
# set project preferred CA
project_ca = models.ProjectCertificateAuthority(
self.project.id, ca.id)
project_ca_repo.create_from(project_ca)
project_ca_repo.save(project_ca)
self.project_ca_ids.append(ca.id)
pref_ca = models.PreferredCertificateAuthority(
self.project.id, ca.id)
preferred_ca_repo.create_from(pref_ca)
preferred_ca_repo.save(pref_ca)
self.preferred_ca = ca
if ca_id == 4:
# set ca for testing GETs for a single CA
self.selected_ca_id = ca.id
self.selected_plugin_ca_id = self.plugin_ca_id + str(ca_id)
self.selected_signing_cert = 'ZZZZZ' + str(ca_id)
self.selected_intermediates = 'YYYYY' + str(ca_id)
def _create_url(self, external_project_id, offset_arg=None,
limit_arg=None):
if limit_arg:
offset = int(offset_arg)
limit = int(limit_arg)
return '/cas?limit={0}&offset={1}'.format(
limit, offset)
else:
return '/cas'

View File

@ -15,13 +15,8 @@
import os
import uuid
import mock
from barbican.common import resources
from barbican.model import models
from barbican.model import repositories
from barbican.tests.api.controllers import test_acls
from barbican.tests.api import test_resources_policy as test_policy
from barbican.tests import utils
from oslo_utils import uuidutils
@ -213,90 +208,6 @@ class WhenGettingOrDeletingOrders(utils.BarbicanAPIBaseTestCase):
self.assertEqual(404, resp.status_int)
@utils.parameterized_test_case
class WhenPuttingAnOrderWithMetadata(utils.BarbicanAPIBaseTestCase):
def setUp(self):
# Temporarily mock the queue until we can figure out a better way
# TODO(jvrbanac): Remove dependence on mocks
self.update_order_mock = mock.MagicMock()
repositories.OrderRepo.update_order = self.update_order_mock
super(WhenPuttingAnOrderWithMetadata, self).setUp()
def _create_generic_order_for_put(self):
"""Create a real order to modify and perform PUT actions on
This makes sure that a project exists for our order and that there
is an order within the database. This is a little hacky due to issues
testing certificate order types.
"""
# Create generic order
resp, order_uuid = create_order(
self.app,
order_type='key',
meta=generic_key_meta
)
self.assertEqual(202, resp.status_int)
# Modify the order in the DB to allow actions to be performed
order_model = order_repo.get(order_uuid, self.project_id)
order_model.type = 'certificate'
order_model.status = models.States.PENDING
order_model.meta = {'nope': 'nothing'}
order_model.save()
repositories.commit()
return order_uuid
def test_putting_on_a_order(self):
order_uuid = self._create_generic_order_for_put()
body = {
'type': 'certificate',
'meta': {'nope': 'thing'}
}
resp = self.app.put_json(
'/orders/{0}'.format(order_uuid),
body,
headers={'Content-Type': 'application/json'}
)
self.assertEqual(204, resp.status_int)
self.assertEqual(1, self.update_order_mock.call_count)
@utils.parameterized_dataset({
'bogus_content': ['bogus'],
'bad_order_type': ['{"type": "secret", "meta": {}}'],
})
def test_return_400_on_put_with(self, body):
order_uuid = self._create_generic_order_for_put()
resp = self.app.put(
'/orders/{0}'.format(order_uuid),
body,
headers={'Content-Type': 'application/json'},
expect_errors=True
)
self.assertEqual(400, resp.status_int)
def test_return_400_on_put_when_order_is_active(self):
order_uuid = self._create_generic_order_for_put()
# Put the order in a active state to prevent modification
order_model = order_repo.get(order_uuid, self.project_id)
order_model.status = models.States.ACTIVE
order_model.save()
repositories.commit()
resp = self.app.put_json(
'/orders/{0}'.format(order_uuid),
{'type': 'certificate', 'meta': {}},
headers={'Content-Type': 'application/json'},
expect_errors=True
)
self.assertEqual(400, resp.status_int)
class WhenCreatingOrders(utils.BarbicanAPIBaseTestCase):
def test_should_add_new_order(self):
order_meta = {
@ -328,411 +239,6 @@ class WhenCreatingOrders(utils.BarbicanAPIBaseTestCase):
self.assertEqual(415, resp.status_int)
class WhenCreatingCertificateOrders(utils.BarbicanAPIBaseTestCase):
def setUp(self):
super(WhenCreatingCertificateOrders, self).setUp()
self.certificate_meta = {
'request': 'XXXXXX'
}
# Make sure we have a project
self.project = resources.get_or_create_project(self.project_id)
# Create CA's in the db
self.available_ca_ids = []
for i in range(2):
ca_information = {
'plugin_name': 'plugin_name',
'plugin_ca_id': 'plugin_name ca_id1',
'name': 'plugin name',
'description': 'Master CA for default plugin',
'ca_signing_certificate': 'XXXXX',
'intermediates': 'YYYYY'
}
ca_model = models.CertificateAuthority(ca_information)
ca = ca_repo.create_from(ca_model)
self.available_ca_ids.append(ca.id)
foreign_project = resources.get_or_create_project('foreign_project')
foreign_ca_information = {
'project_id': foreign_project.id,
'plugin_name': 'plugin_name',
'plugin_ca_id': 'plugin_name ca_id1',
'name': 'plugin name',
'description': 'Master CA for default plugin',
'ca_signing_certificate': 'XXXXX',
'intermediates': 'YYYYY'
}
foreign_ca_model = models.CertificateAuthority(foreign_ca_information)
foreign_ca = ca_repo.create_from(foreign_ca_model)
self.foreign_ca_id = foreign_ca.id
repositories.commit()
def test_can_create_new_cert_order(self):
create_resp, order_uuid = create_order(
self.app,
order_type='certificate',
meta=self.certificate_meta
)
self.assertEqual(202, create_resp.status_int)
order = order_repo.get(order_uuid, self.project_id)
self.assertIsInstance(order, models.Order)
def test_can_add_new_cert_order_with_ca_id(self):
self.certificate_meta['ca_id'] = self.available_ca_ids[0]
create_resp, order_uuid = create_order(
self.app,
order_type='certificate',
meta=self.certificate_meta
)
self.assertEqual(202, create_resp.status_int)
order = order_repo.get(order_uuid, self.project_id)
self.assertIsInstance(order, models.Order)
def test_can_add_new_cert_order_with_ca_id_project_ca_defined(self):
# Create a Project CA and add it
project_ca_model = models.ProjectCertificateAuthority(
self.project.id,
self.available_ca_ids[0]
)
project_ca_repo.create_from(project_ca_model)
repositories.commit()
# Attempt to create an order
self.certificate_meta['ca_id'] = self.available_ca_ids[0]
create_resp, order_uuid = create_order(
self.app,
order_type='certificate',
meta=self.certificate_meta
)
self.assertEqual(202, create_resp.status_int)
order = order_repo.get(order_uuid, self.project_id)
self.assertIsInstance(order, models.Order)
def test_create_w_invalid_ca_id_should_fail(self):
self.certificate_meta['ca_id'] = 'bogus_ca_id'
create_resp, order_uuid = create_order(
self.app,
order_type='certificate',
meta=self.certificate_meta,
expect_errors=True
)
self.assertEqual(400, create_resp.status_int)
def test_create_should_fail_when_ca_not_in_defined_project_ca_ids(self):
# Create a Project CA and add it
project_ca_model = models.ProjectCertificateAuthority(
self.project.id,
self.available_ca_ids[0]
)
project_ca_repo.create_from(project_ca_model)
repositories.commit()
# Make sure we set the ca_id to an id not defined in the project
self.certificate_meta['ca_id'] = self.available_ca_ids[1]
create_resp, order_uuid = create_order(
self.app,
order_type='certificate',
meta=self.certificate_meta,
expect_errors=True
)
self.assertEqual(403, create_resp.status_int)
def test_create_with_wrong_projects_subca_should_fail(self):
self.certificate_meta['ca_id'] = self.foreign_ca_id
create_resp, order_uuid = create_order(
self.app,
order_type='certificate',
meta=self.certificate_meta,
expect_errors=True
)
self.assertEqual(403, create_resp.status_int)
self.assertIn("not owned", create_resp.json['description'])
class WhenCreatingStoredKeyOrders(utils.BarbicanAPIBaseTestCase,
test_policy.BaseTestCase):
def setUp(self):
super(WhenCreatingStoredKeyOrders, self).setUp()
# Make sure we have a project
self.project = resources.get_or_create_project(self.project_id)
self.creator_user_id = 'creatorUserId'
def test_can_create_new_stored_key_order(self):
container_name = 'rsa container name'
container_type = 'rsa'
secret_refs = []
resp, container_id = create_container(
self.app,
name=container_name,
container_type=container_type,
secret_refs=secret_refs
)
stored_key_meta = {
'request_type': 'stored-key',
'subject_dn': 'cn=barbican-server,o=example.com',
'container_ref': 'https://localhost/v1/containers/' + container_id
}
create_resp, order_uuid = create_order(
self.app,
order_type='certificate',
meta=stored_key_meta
)
self.assertEqual(202, create_resp.status_int)
order = order_repo.get(order_uuid, self.project_id)
self.assertIsInstance(order, models.Order)
def _setup_acl_order_context_and_create_order(
self, add_acls=False, read_project_access=True, order_roles=None,
order_user=None, expect_errors=False):
"""Helper method to setup acls, order context and return created order.
Create order uses actual oslo policy enforcer instead of being None.
Create ACLs for container if 'add_acls' is True.
Make container private when 'read_project_access' is False.
"""
container_name = 'rsa container name'
container_type = 'rsa'
secret_refs = []
self.app.extra_environ = {
'barbican.context': self._build_context(self.project_id,
user=self.creator_user_id)
}
_, container_id = create_container(
self.app,
name=container_name,
container_type=container_type,
secret_refs=secret_refs
)
if add_acls:
test_acls.manage_acls(
self.app, 'containers', container_id,
read_user_ids=['u1', 'u3', 'u4'],
read_project_access=read_project_access,
is_update=False)
self.app.extra_environ = {
'barbican.context': self._build_context(
self.project_id, roles=order_roles, user=order_user,
is_admin=False, policy_enforcer=self.policy_enforcer)
}
stored_key_meta = {
'request_type': 'stored-key',
'subject_dn': 'cn=barbican-server,o=example.com',
'container_ref': 'https://localhost/v1/containers/' + container_id
}
return create_order(
self.app,
order_type='certificate',
meta=stored_key_meta,
expect_errors=expect_errors
)
def test_can_create_new_stored_key_order_no_acls_and_policy_check(self):
"""Create stored key order with actual policy enforcement logic.
Order can be created as long as order project and user roles are
allowed in policy. In the test, user requesting order has container
project and has 'creator' role. Order should be created regardless
of what user id is.
"""
create_resp, order_id = self._setup_acl_order_context_and_create_order(
add_acls=False, read_project_access=True, order_roles=['creator'],
order_user='anyUserId', expect_errors=False)
self.assertEqual(202, create_resp.status_int)
order = order_repo.get(order_id, self.project_id)
self.assertIsInstance(order, models.Order)
self.assertEqual('anyUserId', order.creator_id)
def test_should_fail_for_user_observer_role_no_acls_and_policy_check(self):
"""Should not allow create order when user doesn't have necessary role.
Order can be created as long as order project and user roles are
allowed in policy. In the test, user requesting order has container
project but has 'observer' role. Create order should fail as expected
role is 'admin' or 'creator'.
"""
create_resp, _ = self._setup_acl_order_context_and_create_order(
add_acls=False, read_project_access=True, order_roles=['observer'],
order_user='anyUserId', expect_errors=True)
self.assertEqual(403, create_resp.status_int)
def test_can_create_order_with_private_container_and_creator_user(self):
"""Create order using private container with creator user.
Container has been marked private via ACLs. Still creator of container
should be able to create stored key order using that container
successfully.
"""
create_resp, order_id = self._setup_acl_order_context_and_create_order(
add_acls=True, read_project_access=False, order_roles=['creator'],
order_user=self.creator_user_id, expect_errors=False)
self.assertEqual(202, create_resp.status_int)
order = order_repo.get(order_id, self.project_id)
self.assertIsInstance(order, models.Order)
self.assertEqual(self.creator_user_id, order.creator_id)
def test_can_create_order_with_private_container_and_acl_user(self):
"""Create order using private container with acl user.
Container has been marked private via ACLs. So *generally* project user
should not be able to create stored key order using that container.
But here it can create order as that user is defined in read ACL user
list. Here project user means user which has 'creator' role in the
container project. Order project is same as container.
"""
create_resp, order_id = self._setup_acl_order_context_and_create_order(
add_acls=True, read_project_access=False, order_roles=['creator'],
order_user='u3', expect_errors=False)
self.assertEqual(202, create_resp.status_int)
order = order_repo.get(order_id, self.project_id)
self.assertIsInstance(order, models.Order)
self.assertEqual('u3', order.creator_id)
def test_should_raise_with_private_container_and_project_user(self):
"""Create order should fail using private container for project user.
Container has been marked private via ACLs. So project user should not
be able to create stored key order using that container. Here project
user means user which has 'creator' role in the container project.
Order project is same as container. If container was not marked
private, this user would have been able to create order. See next test.
"""
create_resp, _ = self._setup_acl_order_context_and_create_order(
add_acls=True, read_project_access=False, order_roles=['creator'],
order_user='anyProjectUser', expect_errors=True)
self.assertEqual(403, create_resp.status_int)
def test_can_create_order_with_non_private_acls_and_project_user(self):
"""Create order using non-private container with project user.
Container has not been marked private via ACLs. So project user should
be able to create stored key order using that container successfully.
Here project user means user which has 'creator' role in the container
project. Order project is same as container.
"""
create_resp, order_id = self._setup_acl_order_context_and_create_order(
add_acls=True, read_project_access=True, order_roles=['creator'],
order_user='anyProjectUser', expect_errors=False)
self.assertEqual(202, create_resp.status_int)
order = order_repo.get(order_id, self.project_id)
self.assertIsInstance(order, models.Order)
self.assertEqual('anyProjectUser', order.creator_id)
def test_can_create_order_with_non_private_acls_and_creator_user(self):
"""Create order using non-private container with creator user.
Container has not been marked private via ACLs. So user who created
container should be able to create stored key order using that
container successfully. Order project is same as container.
"""
create_resp, order_id = self._setup_acl_order_context_and_create_order(
add_acls=True, read_project_access=True, order_roles=['creator'],
order_user=self.creator_user_id, expect_errors=False)
self.assertEqual(202, create_resp.status_int)
order = order_repo.get(order_id, self.project_id)
self.assertIsInstance(order, models.Order)
self.assertEqual(self.creator_user_id, order.creator_id)
def test_should_raise_with_bad_container_ref(self):
stored_key_meta = {
'request_type': 'stored-key',
'subject_dn': 'cn=barbican-server,o=example.com',
'container_ref': 'bad_ref'
}
create_resp, order_uuid = create_order(
self.app,
order_type='certificate',
meta=stored_key_meta,
expect_errors=True
)
self.assertEqual(400, create_resp.status_int)
def test_should_raise_with_container_not_found(self):
stored_key_meta = {
'request_type': 'stored-key',
'subject_dn': 'cn=barbican-server,o=example.com',
'container_ref': 'https://localhost/v1/containers/not_found'
}
create_resp, order_uuid = create_order(
self.app,
order_type='certificate',
meta=stored_key_meta,
expect_errors=True
)
self.assertEqual(400, create_resp.status_int)
def test_should_raise_with_container_wrong_type(self):
container_name = 'generic container name'
container_type = 'generic'
secret_refs = []
resp, container_id = create_container(
self.app,
name=container_name,
container_type=container_type,
secret_refs=secret_refs
)
stored_key_meta = {
'request_type': 'stored-key',
'subject_dn': 'cn=barbican-server,o=example.com',
'container_ref': 'https://localhost/v1/containers/' + container_id
}
create_resp, order_uuid = create_order(
self.app,
order_type='certificate',
meta=stored_key_meta,
expect_errors=True
)
self.assertEqual(400, create_resp.status_int)
def test_should_raise_with_container_no_access(self):
stored_key_meta = {
'request_type': 'stored-key',
'subject_dn': 'cn=barbican-server,o=example.com',
'container_ref': 'https://localhost/v1/containers/no_access'
}
create_resp, order_uuid = create_order(
self.app,
order_type='certificate',
meta=stored_key_meta,
expect_errors=True
)
self.assertEqual(400, create_resp.status_int)
class WhenPerformingUnallowedOperations(utils.BarbicanAPIBaseTestCase):
def test_should_not_allow_put_orders(self):
resp = self.app.put_json('/orders/', expect_errors=True)

View File

@ -52,29 +52,6 @@ class WhenUsingAsyncTaskClient(utils.BaseTestCase):
project_id=self.external_project_id,
request_id=self.request_id)
def test_should_update_order(self):
updated_meta = {}
self.client.update_order(order_id=self.order_id,
project_id=self.external_project_id,
updated_meta=updated_meta,
request_id=self.request_id)
self.mock_client.cast.assert_called_with(
{}, 'update_order', order_id=self.order_id,
project_id=self.external_project_id,
updated_meta=updated_meta,
request_id=self.request_id)
def test_should_check_certificate_order(self):
self.client.check_certificate_status(
order_id=self.order_id,
project_id=self.external_project_id,
request_id=self.request_id)
self.mock_client.cast.assert_called_with(
{}, 'check_certificate_status',
order_id=self.order_id,
project_id=self.external_project_id,
request_id=self.request_id)
class WhenCreatingDirectTaskClient(utils.BaseTestCase):
"""Test using the synchronous task client (i.e. standalone mode)."""

View File

@ -303,26 +303,6 @@ class WhenCallingTasksMethod(utils.BaseTestCase):
mock.ANY, 'result', None, 'order1234',
'keystone1234', 'request1234')
@mock.patch('barbican.queue.server.schedule_order_retry_tasks')
@mock.patch('barbican.tasks.resources.UpdateOrder')
def test_should_process_update_order(
self, mock_update_order, mock_schedule):
method = mock_update_order.return_value.process_and_suppress_exceptions
method.return_value = 'result'
updated_meta = {'foo': 1}
self.tasks.update_order(
None, self.order_id, self.external_project_id,
updated_meta, self.request_id)
mock_process = mock_update_order.return_value
mock_process.process_and_suppress_exceptions.assert_called_with(
self.order_id, self.external_project_id, updated_meta
)
mock_schedule.assert_called_with(
mock.ANY, 'result', None,
'order1234', 'keystone1234', updated_meta, 'request1234')
@mock.patch('barbican.queue.server.schedule_order_retry_tasks')
@mock.patch('barbican.tasks.resources.CheckCertificateStatusOrder')
def test_should_check_certificate_order(
@ -438,26 +418,3 @@ class WhenUsingTaskServer(database_utils.RepositoryTestCase):
self.assertEqual(
six.u('500'),
order_result.error_status_code)
def test_process_bogus_update_type_order_should_not_rollback(self):
order_id = self.order.id
self.order.type = 'bogus-type' # Force error out of business logic.
# Invoke process, including the transactional decorator that terminates
# the session when it is done. Hence we must re-retrieve the order for
# verification afterwards.
self.server.update_order(
None, self.order.id, self.external_id, None, self.request_id)
order_repo = repositories.get_order_repository()
order_result = order_repo.get(order_id, self.external_id)
self.assertEqual(models.States.ERROR, order_result.status)
self.assertEqual(
six.u(
'Update Order failure seen - '
'please contact site administrator.'),
order_result.error_reason)
self.assertEqual(
six.u('500'),
order_result.error_status_code)

View File

@ -1,183 +0,0 @@
"""
Copyright 2015 Red Hat, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from functionaltests.api.v1.behaviors import base_behaviors
from functionaltests.api.v1.models import ca_models
class CABehaviors(base_behaviors.BaseBehaviors):
def get_ca(self, ca_ref, extra_headers=None,
use_auth=True, user_name=None):
"""Handles getting a CA
:param ca_ref: href for a CA
:param extra_headers: extra HTTP headers for the GET request
:param use_auth: Boolean for whether to send authentication headers
:param user_name: The user name used for request
:return: a request Response object
"""
return self.client.get(ca_ref,
response_model_type=ca_models.CAModel,
extra_headers=extra_headers,
use_auth=use_auth, user_name=user_name)
def get_cacert(self, ca_ref, payload_content_encoding=None,
extra_headers=None,
use_auth=True, user_name=None):
"""Retrieve the CA signing certificate. """
headers = {'Accept': 'application/octet-stream',
'Accept-Encoding': payload_content_encoding}
if extra_headers:
headers.update(extra_headers)
return self.client.get(ca_ref + '/cacert',
extra_headers=headers, use_auth=use_auth,
user_name=user_name)
def get_cas(self, limit=10, offset=0, user_name=None):
"""Handles getting a list of CAs.
:param limit: limits number of returned CAs
:param offset: represents how many records to skip before retrieving
the list
:return: the response, a list of cas, total number of cas, next and
prev references
"""
resp = self.client.get('cas', user_name=user_name,
params={'limit': limit, 'offset': offset})
# TODO(alee) refactor to use he client's get_list_of_models()
resp_json = self.get_json(resp)
cas, total, next_ref, prev_ref = [], 0, None, None
for item in resp_json:
if 'next' == item:
next_ref = resp_json.get('next')
elif 'previous' == item:
prev_ref = resp_json.get('previous')
elif 'cas' == item:
cas = resp_json.get('cas')
elif 'total' == item:
total = resp_json.get('total')
return resp, cas, total, next_ref, prev_ref
def create_ca(self, model, headers=None, use_auth=True,
user_name=None, admin=None):
"""Create a subordinate CA from the data in the model.
:param model: The metadata used to create the subCA
:param use_auth: Boolean for whether to send authentication headers
:param user_name: The user name used to create the subCA
:param admin: The user with permissions to delete the subCA
:return: A tuple containing the response from the create
and the href to the newly created subCA
"""
resp = self.client.post('cas', request_model=model,
extra_headers=headers, use_auth=use_auth,
user_name=user_name)
# handle expected JSON parsing errors for unauthenticated requests
if resp.status_code == 401 and not use_auth:
return resp, None
returned_data = self.get_json(resp)
ca_ref = returned_data.get('ca_ref')
if ca_ref:
if admin is None:
admin = user_name
self.created_entities.append((ca_ref, admin))
return resp, ca_ref
def delete_ca(self, ca_ref, extra_headers=None,
expected_fail=False, use_auth=True, user_name=None):
"""Delete a secret.
:param ca_ref: HATEOAS ref of the secret to be deleted
:param extra_headers: Optional HTTP headers to add to the request
:param expected_fail: If test is expected to fail the deletion
:param use_auth: Boolean for whether to send authentication headers
:param user_name: The user name used to delete the entity
:return: A request response object
"""
resp = self.client.delete(ca_ref, extra_headers=extra_headers,
use_auth=use_auth, user_name=user_name)
if not expected_fail:
for item in self.created_entities:
if item[0] == ca_ref:
self.created_entities.remove(item)
return resp
def delete_all_created_cas(self):
"""Delete all of the cas that we have created."""
entities = list(self.created_entities)
for (ca_ref, admin) in entities:
self.delete_ca(ca_ref, user_name=admin)
def add_ca_to_project(self, ca_ref, headers=None, use_auth=True,
user_name=None):
resp = self.client.post(ca_ref + '/add-to-project',
extra_headers=headers, use_auth=use_auth,
user_name=user_name)
return resp
def remove_ca_from_project(self, ca_ref, headers=None, use_auth=True,
user_name=None):
resp = self.client.post(ca_ref + '/remove-from-project',
extra_headers=headers, use_auth=use_auth,
user_name=user_name)
return resp
def set_preferred(self, ca_ref, headers=None, use_auth=True,
user_name=None):
resp = self.client.post(ca_ref + '/set-preferred',
extra_headers=headers, use_auth=use_auth,
user_name=user_name)
return resp
def get_preferred(self, extra_headers=None, use_auth=True,
user_name=None):
resp = self.client.get('cas/preferred',
response_model_type=ca_models.CAModel,
extra_headers=extra_headers, use_auth=use_auth,
user_name=user_name)
return resp
def set_global_preferred(self, ca_ref, headers=None,
use_auth=True, user_name=None):
resp = self.client.post(ca_ref + '/set-global-preferred',
extra_headers=headers, use_auth=use_auth,
user_name=user_name)
return resp
def unset_global_preferred(self, headers=None,
use_auth=True, user_name=None):
resp = self.client.post('cas/unset-global-preferred',
extra_headers=headers,
use_auth=use_auth, user_name=user_name)
return resp
def get_global_preferred(self, extra_headers=None,
use_auth=True, user_name=None):
resp = self.client.get('cas/global-preferred',
response_model_type=ca_models.CAModel,
extra_headers=extra_headers,
use_auth=use_auth, user_name=user_name)
return resp

View File

@ -1,717 +0,0 @@
# Copyright (c) 2015 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import copy
import datetime
import re
import testtools
import time
from OpenSSL import crypto
from barbican.common import hrefs
from barbican.plugin.interface import certificate_manager as cert_interface
from barbican.tests import certificate_utils as certutil
from functionaltests.api import base
from functionaltests.api.v1.behaviors import ca_behaviors
from functionaltests.api.v1.behaviors import container_behaviors
from functionaltests.api.v1.behaviors import order_behaviors
from functionaltests.api.v1.behaviors import secret_behaviors
from functionaltests.api.v1.models import ca_models
from functionaltests.api.v1.models import order_models
from functionaltests.common import config
CONF = config.get_config()
dogtag_subcas_enabled = False
admin_a = CONF.rbac_users.admin_a
admin_b = CONF.rbac_users.admin_b
creator_a = CONF.rbac_users.creator_a
service_admin = CONF.identity.service_admin
order_simple_cmc_request_data = {
'type': 'certificate',
'meta': {
'request_type': 'simple-cmc',
'requestor_name': 'Barbican User',
'requestor_email': 'user@example.com',
'requestor_phone': '555-1212'
}
}
BARBICAN_SRV_CONF = cert_interface.CONF
def is_plugin_enabled(plugin):
return plugin in BARBICAN_SRV_CONF.certificate.enabled_certificate_plugins
def depends_on_ca_plugins(*plugins):
def depends_on_ca_plugins_decorator(function):
def wrapper(instance, *args, **kwargs):
plugins_enabled = (is_plugin_enabled(p) for p in plugins)
if not all(plugins_enabled):
instance.skipTest("The following plugin(s) need to be "
"enabled: {}".format(plugins))
function(instance, *args, **kwargs)
return wrapper
return depends_on_ca_plugins_decorator
def convert_to_X509Name(dn):
target = crypto.X509().get_subject()
fields = dn.split(',')
for field in fields:
m = re.search(r"(\w+)\s*=\s*(.+)", field.strip())
name = m.group(1)
value = m.group(2)
if name.lower() == 'ou':
target.OU = value
elif name.lower() == 'st':
target.ST = value
elif name.lower() == 'cn':
target.CN = value
elif name.lower() == 'l':
target.L = value
elif name.lower() == 'o':
target.O = value
return target
class CATestCommon(base.TestCase):
def setUp(self):
super(CATestCommon, self).setUp()
self.order_behaviors = order_behaviors.OrderBehaviors(self.client)
self.ca_behaviors = ca_behaviors.CABehaviors(self.client)
self.container_behaviors = container_behaviors.ContainerBehaviors(
self.client)
self.secret_behaviors = secret_behaviors.SecretBehaviors(self.client)
self.simple_cmc_data = copy.deepcopy(order_simple_cmc_request_data)
def tearDown(self):
self.order_behaviors.delete_all_created_orders()
self.ca_behaviors.delete_all_created_cas()
self.container_behaviors.delete_all_created_containers()
self.secret_behaviors.delete_all_created_secrets()
super(CATestCommon, self).tearDown()
def send_test_order(self, ca_ref=None, user_name=None,
expected_return=202):
test_model = order_models.OrderModel(**self.simple_cmc_data)
test_model.meta['request_data'] = base64.b64encode(
certutil.create_good_csr())
if ca_ref is not None:
ca_id = hrefs.get_ca_id_from_ref(ca_ref)
test_model.meta['ca_id'] = ca_id
create_resp, order_ref = self.order_behaviors.create_order(
test_model, user_name=user_name)
self.assertEqual(expected_return, create_resp.status_code)
if expected_return == 202:
self.assertIsNotNone(order_ref)
return order_ref
def wait_for_order(self, order_resp, order_ref):
# Make sure we have an active order
time_count = 1
while order_resp.model.status != "ACTIVE" and time_count <= 4:
time.sleep(1)
time_count += 1
order_resp = self.behaviors.get_order(order_ref)
def get_root_ca_ref(self, ca_plugin_name, ca_plugin_id):
(resp, cas, total, next_ref, prev_ref) = self.ca_behaviors.get_cas(
limit=100)
for item in cas:
ca = self.ca_behaviors.get_ca(item)
if ca.model.plugin_name == ca_plugin_name:
if ca.model.plugin_ca_id == ca_plugin_id:
return item
return None
def get_snakeoil_root_ca_ref(self):
return self.get_root_ca_ref(
ca_plugin_name=('barbican.plugin.snakeoil_ca.'
'SnakeoilCACertificatePlugin'),
ca_plugin_id="Snakeoil CA")
def get_dogtag_root_ca_ref(self):
return self.get_root_ca_ref(
ca_plugin_name='barbican.plugin.dogtag.DogtagCAPlugin',
ca_plugin_id="Dogtag CA")
class CertificateAuthoritiesTestCase(CATestCommon):
def setUp(self):
super(CertificateAuthoritiesTestCase, self).setUp()
self.subca_name = "Subordinate CA"
self.subca_description = "Test Snake Oil Subordinate CA"
self.subca_subca_name = "Sub-Sub CA"
self.subca_subca_description = "Test Snake Oil Sub-Sub CA"
def get_signing_cert(self, ca_ref):
resp = self.ca_behaviors.get_cacert(ca_ref)
return crypto.load_certificate(crypto.FILETYPE_PEM, resp.text)
def verify_signing_cert(self, ca_ref, subject_dn, issuer_dn):
cacert = self.get_signing_cert(ca_ref)
return ((cacert.get_subject() == subject_dn) and
(cacert.get_issuer() == issuer_dn))
def get_subca_model(self, root_ref):
now = datetime.datetime.utcnow().isoformat()
subject = "CN=Subordinate CA " + now + ", O=example.com"
return ca_models.CAModel(
parent_ca_ref=root_ref,
description=self.subca_description,
name=self.subca_name,
subject_dn=subject
)
def get_sub_subca_model(self, parent_ca_ref):
now = datetime.datetime.utcnow().isoformat()
subject = "CN=sub sub CA " + now + ", O=example.com"
return ca_models.CAModel(
parent_ca_ref=parent_ca_ref,
description=self.subca_subca_description,
name=self.subca_subca_name,
subject_dn=subject
)
@depends_on_ca_plugins('snakeoil_ca')
def test_create_snakeoil_subca(self):
self._create_and_verify_subca(self.get_snakeoil_root_ca_ref())
@testtools.skipIf(not dogtag_subcas_enabled,
"dogtag subcas are deprecated")
@depends_on_ca_plugins('dogtag')
def test_create_dogtag_subca(self):
self._create_and_verify_subca(self.get_dogtag_root_ca_ref())
def _create_and_verify_subca(self, root_ca_ref):
ca_model = self.get_subca_model(root_ca_ref)
resp, ca_ref = self.ca_behaviors.create_ca(ca_model)
self.assertEqual(201, resp.status_code)
root_subject = self.get_signing_cert(root_ca_ref).get_subject()
self.verify_signing_cert(
ca_ref=ca_ref,
subject_dn=convert_to_X509Name(ca_model.subject_dn),
issuer_dn=root_subject)
resp = self.ca_behaviors.delete_ca(ca_ref=ca_ref)
self.assertEqual(204, resp.status_code)
@depends_on_ca_plugins('snakeoil_ca')
def test_create_subca_of_snakeoil_subca(self):
self._create_subca_of_subca(self.get_snakeoil_root_ca_ref())
@testtools.skipIf(not dogtag_subcas_enabled,
"dogtag subcas are deprecated")
@depends_on_ca_plugins('dogtag')
def test_create_subca_of_dogtag_subca(self):
self._create_subca_of_subca(self.get_dogtag_root_ca_ref())
def _create_subca_of_subca(self, root_ca_ref):
parent_model = self.get_subca_model(root_ca_ref)
resp, parent_ref = self.ca_behaviors.create_ca(parent_model)
self.assertEqual(201, resp.status_code)
child_model = self.get_sub_subca_model(parent_ref)
resp, child_ref = self.ca_behaviors.create_ca(child_model)
self.assertEqual(201, resp.status_code)
parent_subject = self.get_signing_cert(parent_ref).get_subject()
self.verify_signing_cert(
ca_ref=child_ref,
subject_dn=convert_to_X509Name(child_model.subject_dn),
issuer_dn=parent_subject)
resp = self.ca_behaviors.delete_ca(ca_ref=child_ref)
self.assertEqual(204, resp.status_code)
resp = self.ca_behaviors.delete_ca(ca_ref=parent_ref)
self.assertEqual(204, resp.status_code)
@depends_on_ca_plugins('snakeoil_ca')
def test_fail_to_create_subca_of_snakeoil_not_owned_subca(self):
self._fail_to_create_subca_of_not_owned_subca(
self.get_snakeoil_root_ca_ref())
@testtools.skipIf(not dogtag_subcas_enabled,
"dogtag subcas are deprecated")
@depends_on_ca_plugins('dogtag')
def test_fail_to_create_subca_of_dogtag_not_owned_subca(self):
self._fail_to_create_subca_of_not_owned_subca(
self.get_dogtag_root_ca_ref())
def _fail_to_create_subca_of_not_owned_subca(self, root_ca_ref):
parent_model = self.get_subca_model(root_ca_ref)
resp, parent_ref = self.ca_behaviors.create_ca(parent_model)
self.assertEqual(201, resp.status_code)
child_model = self.get_sub_subca_model(parent_ref)
resp, child_ref = self.ca_behaviors.create_ca(child_model,
user_name=admin_a)
self.assertEqual(403, resp.status_code)
resp = self.ca_behaviors.delete_ca(ca_ref=parent_ref)
self.assertEqual(204, resp.status_code)
def test_create_subca_with_invalid_parent_ca_id(self):
ca_model = self.get_subca_model(
'http://localhost:9311/cas/invalid_ref'
)
resp, ca_ref = self.ca_behaviors.create_ca(ca_model)
self.assertEqual(400, resp.status_code)
def test_create_subca_with_missing_parent_ca_id(self):
ca_model = self.get_subca_model(
'http://localhost:9311/cas/missing_ref'
)
del ca_model.parent_ca_ref
resp, ca_ref = self.ca_behaviors.create_ca(ca_model)
self.assertEqual(400, resp.status_code)
@depends_on_ca_plugins('snakeoil_ca')
def test_create_snakeoil_subca_with_missing_subjectdn(self):
self._create_subca_with_missing_subjectdn(
self.get_snakeoil_root_ca_ref())
@testtools.skipIf(not dogtag_subcas_enabled,
"dogtag subcas are deprecated")
@depends_on_ca_plugins('dogtag')
def test_create_dogtag_subca_with_missing_subjectdn(self):
self._create_subca_with_missing_subjectdn(
self.get_dogtag_root_ca_ref())
def _create_subca_with_missing_subjectdn(self, root_ca_ref):
ca_model = self.get_subca_model(root_ca_ref)
del ca_model.subject_dn
resp, ca_ref = self.ca_behaviors.create_ca(ca_model)
self.assertEqual(400, resp.status_code)
@depends_on_ca_plugins('snakeoil_ca')
def test_create_snakeoil_subca_and_send_cert_order(self):
self._create_subca_and_send_cert_order(
self.get_snakeoil_root_ca_ref())
@testtools.skipIf(not dogtag_subcas_enabled,
"dogtag subcas are deprecated")
@depends_on_ca_plugins('dogtag')
def test_create_dogtag_subca_and_send_cert_order(self):
self._create_subca_and_send_cert_order(
self.get_dogtag_root_ca_ref())
def _create_subca_and_send_cert_order(self, root_ca):
ca_model = self.get_subca_model(root_ca)
resp, ca_ref = self.ca_behaviors.create_ca(ca_model)
self.assertEqual(201, resp.status_code)
self.send_test_order(ca_ref)
resp = self.ca_behaviors.delete_ca(ca_ref=ca_ref)
self.assertEqual(204, resp.status_code)
@depends_on_ca_plugins('snakeoil_ca')
def test_add_snakeoil_ca__to_project_and_get_preferred(self):
self._add_ca__to_project_and_get_preferred(
self.get_snakeoil_root_ca_ref()
)
@depends_on_ca_plugins('dogtag')
def test_add_dogtag_ca__to_project_and_get_preferred(self):
self._add_ca__to_project_and_get_preferred(
self.get_dogtag_root_ca_ref()
)
def _add_ca__to_project_and_get_preferred(self, ca_ref):
resp = self.ca_behaviors.add_ca_to_project(ca_ref, user_name=admin_a)
self.assertEqual(204, resp.status_code)
resp = self.ca_behaviors.get_preferred(user_name=admin_a)
self.assertEqual(200, resp.status_code)
ca_id = hrefs.get_ca_id_from_ref(resp.model.ca_ref)
self.assertEqual(hrefs.get_ca_id_from_ref(ca_ref), ca_id)
resp = self.ca_behaviors.remove_ca_from_project(
ca_ref, user_name=admin_a)
self.assertEqual(204, resp.status_code)
resp = self.ca_behaviors.get_preferred(user_name=admin_a)
self.assertEqual(404, resp.status_code)
@depends_on_ca_plugins('snakeoil_ca')
def test_try_and_fail_to_add_to_proj_snakeoil_subca_that_is_not_mine(self):
self._try_and_fail_to_add_to_proj_subca_that_is_not_mine(
self.get_snakeoil_root_ca_ref()
)
@testtools.skipIf(not dogtag_subcas_enabled,
"dogtag subcas are deprecated")
@depends_on_ca_plugins('dogtag')
def test_try_and_fail_to_add_to_proj_dogtag_subca_that_is_not_mine(self):
self._try_and_fail_to_add_to_proj_subca_that_is_not_mine(
self.get_dogtag_root_ca_ref()
)
def _try_and_fail_to_add_to_proj_subca_that_is_not_mine(self, root_ca_ref):
ca_model = self.get_subca_model(root_ca_ref)
resp, ca_ref = self.ca_behaviors.create_ca(ca_model, user_name=admin_a)
self.assertEqual(201, resp.status_code)
resp = self.ca_behaviors.add_ca_to_project(ca_ref, user_name=admin_b)
self.assertEqual(403, resp.status_code)
resp = self.ca_behaviors.delete_ca(ca_ref=ca_ref, user_name=admin_a)
self.assertEqual(204, resp.status_code)
@depends_on_ca_plugins('snakeoil_ca')
def test_create_and_delete_snakeoil_subca(self):
self._create_and_delete_subca(
self.get_snakeoil_root_ca_ref()
)
@testtools.skipIf(not dogtag_subcas_enabled,
"dogtag subcas are deprecated")
@depends_on_ca_plugins('dogtag')
def test_create_and_delete_dogtag_subca(self):
self._create_and_delete_subca(
self.get_dogtag_root_ca_ref()
)
def _create_and_delete_subca(self, root_ca_ref):
ca_model = self.get_subca_model(root_ca_ref)
resp, ca_ref = self.ca_behaviors.create_ca(ca_model)
self.assertEqual(201, resp.status_code)
self.ca_behaviors.delete_ca(ca_ref)
resp = self.ca_behaviors.get_ca(ca_ref)
self.assertEqual(404, resp.status_code)
@depends_on_ca_plugins('snakeoil_ca')
def test_create_and_delete_snakeoil_subca_and_artifacts(self):
ca_model = self.get_subca_model(self.get_snakeoil_root_ca_ref())
resp, ca_ref = self.ca_behaviors.create_ca(ca_model, user_name=admin_a)
self.assertEqual(201, resp.status_code)
resp = self.ca_behaviors.add_ca_to_project(ca_ref, user_name=admin_a)
self.assertEqual(204, resp.status_code)
resp = self.ca_behaviors.get_preferred(user_name=admin_a)
self.assertEqual(200, resp.status_code)
self.ca_behaviors.delete_ca(ca_ref, user_name=admin_a)
resp = self.ca_behaviors.get_preferred(user_name=admin_a)
self.assertEqual(404, resp.status_code)
resp = self.ca_behaviors.get_ca(ca_ref, user_name=admin_a)
self.assertEqual(404, resp.status_code)
@depends_on_ca_plugins('snakeoil_ca')
def test_fail_to_delete_top_level_snakeoil_ca(self):
self._fail_to_delete_top_level_ca(
self.get_snakeoil_root_ca_ref()
)
@depends_on_ca_plugins('dogtag')
def test_fail_to_delete_top_level_dogtag_ca(self):
self._fail_to_delete_top_level_ca(
self.get_dogtag_root_ca_ref()
)
def _fail_to_delete_top_level_ca(self, root_ca_ref):
resp = self.ca_behaviors.delete_ca(
root_ca_ref,
expected_fail=True)
self.assertEqual(403, resp.status_code)
@depends_on_ca_plugins('snakeoil_ca')
def test_create_snakeoil_subca_and_get_cacert(self):
self._create_subca_and_get_cacert(
self.get_snakeoil_root_ca_ref()
)
@testtools.skipIf(not dogtag_subcas_enabled,
"dogtag subcas are deprecated")
@depends_on_ca_plugins('dogtag')
def test_create_dogtag_subca_and_get_cacert(self):
self._create_subca_and_get_cacert(
self.get_dogtag_root_ca_ref()
)
def _create_subca_and_get_cacert(self, root_ca_ref):
ca_model = self.get_subca_model(root_ca_ref)
resp, ca_ref = self.ca_behaviors.create_ca(ca_model, user_name=admin_a)
self.assertEqual(201, resp.status_code)
resp = self.ca_behaviors.get_cacert(ca_ref, user_name=admin_a)
self.assertEqual(200, resp.status_code)
crypto.load_certificate(crypto.FILETYPE_PEM, resp.text)
resp = self.ca_behaviors.delete_ca(ca_ref=ca_ref, user_name=admin_a)
self.assertEqual(204, resp.status_code)
@depends_on_ca_plugins('snakeoil_ca')
def test_try_and_fail_to_use_snakeoil_subca_that_is_not_mine(self):
self._try_and_fail_to_use_subca_that_is_not_mine(
self.get_snakeoil_root_ca_ref()
)
@testtools.skipIf(not dogtag_subcas_enabled,
"dogtag subcas are deprecated")
@depends_on_ca_plugins('dogtag')
def test_try_and_fail_to_use_dogtag_subca_that_is_not_mine(self):
self._try_and_fail_to_use_subca_that_is_not_mine(
self.get_dogtag_root_ca_ref()
)
def _try_and_fail_to_use_subca_that_is_not_mine(self, root_ca_ref):
ca_model = self.get_subca_model(root_ca_ref)
resp, ca_ref = self.ca_behaviors.create_ca(ca_model, user_name=admin_a)
self.assertEqual(201, resp.status_code)
self.send_test_order(ca_ref=ca_ref, user_name=admin_a)
self.send_test_order(ca_ref=ca_ref, user_name=admin_b,
expected_return=403)
resp = self.ca_behaviors.delete_ca(ca_ref=ca_ref, user_name=admin_a)
self.assertEqual(204, resp.status_code)
@depends_on_ca_plugins('snakeoil_ca')
def test_create_snakeoil_subca_and_send_cert_order_and_verify_cert(self):
ca_model = self.get_subca_model(self.get_snakeoil_root_ca_ref())
resp, ca_ref = self.ca_behaviors.create_ca(ca_model)
self.assertEqual(201, resp.status_code)
order_ref = self.send_test_order(ca_ref)
order_resp = self.order_behaviors.get_order(order_ref=order_ref)
self.assertEqual(200, order_resp.status_code)
self.wait_for_order(order_resp=order_resp, order_ref=order_ref)
container_resp = self.container_behaviors.get_container(
order_resp.model.container_ref)
self.assertEqual(200, container_resp.status_code)
secret_dict = {}
for secret in container_resp.model.secret_refs:
self.assertIsNotNone(secret.secret_ref)
secret_resp = self.secret_behaviors.get_secret(
secret.secret_ref, "application/octet-stream")
self.assertIsNotNone(secret_resp)
secret_dict[secret.name] = secret_resp.content
certificate = secret_dict['certificate']
new_cert = crypto.load_certificate(crypto.FILETYPE_PEM, certificate)
signing_cert = self.get_signing_cert(ca_ref)
issuer = new_cert.get_issuer()
expected_issuer = signing_cert.get_subject()
self.assertEqual(expected_issuer, issuer)
resp = self.ca_behaviors.delete_ca(ca_ref=ca_ref)
self.assertEqual(204, resp.status_code)
class ListingCAsTestCase(CATestCommon):
"""Tests for listing CAs.
Must be in a separate class so that we can deselect them
in the parallel CA tests, until we can deselect specific tests
using a decorator.
"""
def test_list_and_get_cas(self):
(resp, cas, total, next_ref, prev_ref) = self.ca_behaviors.get_cas()
self.assertGreater(total, 0)
for item in cas:
ca = self.ca_behaviors.get_ca(item)
self.assertIsNotNone(ca.model.plugin_name)
self.assertIsNotNone(ca.model.ca_id)
self.assertIsNotNone(ca.model.plugin_ca_id)
@depends_on_ca_plugins('snakeoil_ca', 'simple_certificate')
def test_list_snakeoil_and_simple_cert_cas(self):
"""Test if backend loads these specific CAs
Since the standard gate works with the snakeoil CA and the
simple_certificate CA. This test is just to make sure that these two
are specifically loaded.
"""
(resp, cas, total, next_ref, prev_ref) = self.ca_behaviors.get_cas()
self.assertEqual(2, total)
@depends_on_ca_plugins('dogtag')
def test_list_dogtag_cas(self):
"""Test if backend loads this specific CA"""
(resp, cas, total, next_ref, prev_ref) = self.ca_behaviors.get_cas()
self.assertGreater(total, 0)
class ProjectCATestCase(CATestCommon):
@depends_on_ca_plugins('snakeoil_ca', 'simple_certificate')
def test_addition_of_project_ca_affects_getting_ca_list(self):
# Getting list of CAs should get the total configured CAs
(resp, cas, initial_total, _, __) = self.ca_behaviors.get_cas()
self.assertEqual(2, initial_total)
# Set project CA
ca_ref = self.get_snakeoil_root_ca_ref()
resp = self.ca_behaviors.add_ca_to_project(ca_ref, user_name=admin_a)
self.assertEqual(204, resp.status_code)
# Getting list of CAs should get only the project CA for all users
(resp, cas, project_ca_total, _, __) = self.ca_behaviors.get_cas(
user_name=admin_a)
self.assertEqual(1, project_ca_total)
# Getting list of CAs should get only the project CA for all users
(resp, cas, project_ca_total, _, __) = self.ca_behaviors.get_cas(
user_name=creator_a)
self.assertEqual(1, project_ca_total)
# Remove project CA
resp = self.ca_behaviors.remove_ca_from_project(ca_ref,
user_name=admin_a)
self.assertEqual(204, resp.status_code)
# Getting list of CAs should get the total configured CAs (as seen
# before)
(resp, cas, final_total, _, __) = self.ca_behaviors.get_cas()
self.assertEqual(initial_total, final_total)
class GlobalPreferredCATestCase(CATestCommon):
def setUp(self):
super(GlobalPreferredCATestCase, self).setUp()
(_, self.cas, self.num_cas, _, _) = self.ca_behaviors.get_cas()
self.ca_ids = [hrefs.get_ca_id_from_ref(ref) for ref in self.cas]
def tearDown(self):
super(CATestCommon, self).tearDown()
def test_global_preferred_no_project_admin_access(self):
resp = self.ca_behaviors.get_global_preferred()
self.assertEqual(403, resp.status_code)
resp = self.ca_behaviors.set_global_preferred(ca_ref=self.cas[0])
self.assertEqual(403, resp.status_code)
resp = self.ca_behaviors.unset_global_preferred()
self.assertEqual(403, resp.status_code)
def test_global_preferred_update(self):
if self.num_cas < 2:
self.skipTest("At least two CAs are required for this test")
resp = self.ca_behaviors.set_global_preferred(
ca_ref=self.cas[0], user_name=service_admin)
self.assertEqual(204, resp.status_code)
resp = self.ca_behaviors.get_global_preferred(user_name=service_admin)
self.assertEqual(200, resp.status_code)
ca_id = hrefs.get_ca_id_from_ref(resp.model.ca_ref)
self.assertEqual(self.ca_ids[0], ca_id)
resp = self.ca_behaviors.set_global_preferred(
ca_ref=self.cas[1], user_name=service_admin)
self.assertEqual(204, resp.status_code)
resp = self.ca_behaviors.get_global_preferred(user_name=service_admin)
self.assertEqual(200, resp.status_code)
ca_id = hrefs.get_ca_id_from_ref(resp.model.ca_ref)
self.assertEqual(self.ca_ids[1], ca_id)
resp = self.ca_behaviors.unset_global_preferred(
user_name=service_admin)
self.assertEqual(204, resp.status_code)
def test_global_preferred_set_and_unset(self):
resp = self.ca_behaviors.set_global_preferred(
ca_ref=self.cas[0], user_name=service_admin)
self.assertEqual(204, resp.status_code)
resp = self.ca_behaviors.get_global_preferred(user_name=service_admin)
self.assertEqual(200, resp.status_code)
ca_id = hrefs.get_ca_id_from_ref(resp.model.ca_ref)
self.assertEqual(self.ca_ids[0], ca_id)
resp = self.ca_behaviors.unset_global_preferred(
user_name=service_admin)
self.assertEqual(204, resp.status_code)
resp = self.ca_behaviors.get_global_preferred(user_name=service_admin)
self.assertEqual(404, resp.status_code)
def test_global_preferred_affects_project_preferred(self):
if self.num_cas < 2:
self.skipTest("At least two CAs are required for this test")
resp = self.ca_behaviors.get_preferred(user_name=admin_a)
self.assertEqual(404, resp.status_code)
resp = self.ca_behaviors.set_global_preferred(
ca_ref=self.cas[1], user_name=service_admin)
self.assertEqual(204, resp.status_code)
resp = self.ca_behaviors.get_preferred(user_name=admin_a)
self.assertEqual(200, resp.status_code)
ca_id = hrefs.get_ca_id_from_ref(resp.model.ca_ref)
self.assertEqual(self.ca_ids[1], ca_id)
resp = self.ca_behaviors.unset_global_preferred(
user_name=service_admin)
self.assertEqual(204, resp.status_code)
resp = self.ca_behaviors.get_preferred(user_name=admin_a)
self.assertEqual(404, resp.status_code)
def test_project_preferred_overrides_global_preferred(self):
if self.num_cas < 2:
self.skipTest("At least two CAs are required for this test")
resp = self.ca_behaviors.get_preferred(user_name=admin_a)
self.assertEqual(404, resp.status_code)
resp = self.ca_behaviors.set_global_preferred(
ca_ref=self.cas[1], user_name=service_admin)
self.assertEqual(204, resp.status_code)
resp = self.ca_behaviors.get_preferred(user_name=admin_a)
self.assertEqual(200, resp.status_code)
ca_id = hrefs.get_ca_id_from_ref(resp.model.ca_ref)
self.assertEqual(self.ca_ids[1], ca_id)
resp = self.ca_behaviors.add_ca_to_project(
ca_ref=self.cas[0], user_name=admin_a)
self.assertEqual(204, resp.status_code)
resp = self.ca_behaviors.get_preferred(user_name=admin_a)
self.assertEqual(200, resp.status_code)
ca_id = hrefs.get_ca_id_from_ref(resp.model.ca_ref)
self.assertEqual(self.ca_ids[0], ca_id)
resp = self.ca_behaviors.remove_ca_from_project(
ca_ref=self.cas[0], user_name=admin_a)
self.assertEqual(204, resp.status_code)
resp = self.ca_behaviors.get_preferred(user_name=admin_a)
ca_id = hrefs.get_ca_id_from_ref(resp.model.ca_ref)
self.assertEqual(self.ca_ids[1], ca_id)
resp = self.ca_behaviors.unset_global_preferred(
user_name=service_admin)
self.assertEqual(204, resp.status_code)
resp = self.ca_behaviors.get_preferred(user_name=admin_a)
self.assertEqual(404, resp.status_code)

View File

@ -1,767 +0,0 @@
# Copyright (c) 2015 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import copy
from oslo_serialization import jsonutils
import time
from OpenSSL import crypto
import testtools
from barbican.plugin.interface import secret_store as s
from barbican.tasks import certificate_resources as cert_res
from barbican.tests import certificate_utils as certutil
from barbican.tests import keys
from functionaltests.api import base
from functionaltests.api.v1.behaviors import ca_behaviors
from functionaltests.api.v1.behaviors import container_behaviors
from functionaltests.api.v1.behaviors import order_behaviors
from functionaltests.api.v1.behaviors import secret_behaviors
from functionaltests.api.v1.models import container_models
from functionaltests.api.v1.models import order_models
from functionaltests.api.v1.models import secret_models
try:
import pki # flake8: noqa
dogtag_imports_ok = True
except ImportError:
# dogtag libraries not available, assume dogtag not installed
dogtag_imports_ok = False
NOT_FOUND_CONTAINER_REF = "http://localhost:9311/v1/containers/not_found"
INVALID_CONTAINER_REF = "invalid"
order_simple_cmc_request_data = {
'type': 'certificate',
'meta': {
'request_type': 'simple-cmc',
'requestor_name': 'Barbican User',
'requestor_email': 'user@example.com',
'requestor_phone': '555-1212'
}
}
order_full_cmc_request_data = {
'type': 'certificate',
'meta': {
'request_type': 'full-cmc',
'requestor_name': 'Barbican User',
'requestor_email': 'user@example.com',
'requestor_phone': '555-1212'
}
}
order_stored_key_request_data = {
'type': 'certificate',
'meta': {
'request_type': 'stored-key',
'subject_dn': 'cn=server.example.com,o=example.com',
'requestor_name': 'Barbican User',
'requestor_email': 'user@example.com',
'requestor_phone': '555-1212'
}
}
order_dogtag_custom_request_data = {
'type': 'certificate',
'meta': {
'request_type': 'custom',
'cert_request_type': 'pkcs10',
'profile_id': 'caServerCert'
}
}
create_container_rsa_data = {
"name": "rsacontainer",
"type": "rsa",
"secret_refs": [
{
"name": "public_key",
},
{
"name": "private_key",
},
{
"name": "private_key_passphrase"
}
]
}
def get_private_key_req():
return {'name': 'myprivatekey',
'payload_content_type': 'application/octet-stream',
'payload_content_encoding': 'base64',
'algorithm': 'rsa',
'bit_length': 2048,
'secret_type': s.SecretType.PRIVATE,
'payload': base64.b64encode(keys.get_private_key_pem())}
def get_public_key_req():
return {'name': 'mypublickey',
'payload_content_type': 'application/octet-stream',
'payload_content_encoding': 'base64',
'algorithm': 'rsa',
'bit_length': 2048,
'secret_type': s.SecretType.PUBLIC,
'payload': base64.b64encode(keys.get_public_key_pem())}
create_generic_container_data = {
"name": "containername",
"type": "generic",
"secret_refs": [
{
"name": "secret1",
},
{
"name": "secret2",
},
{
"name": "secret3"
}
]
}
class CertificatesTestCase(base.TestCase):
def setUp(self):
super(CertificatesTestCase, self).setUp()
self.behaviors = order_behaviors.OrderBehaviors(self.client)
self.ca_behaviors = ca_behaviors.CABehaviors(self.client)
self.container_behaviors = container_behaviors.ContainerBehaviors(
self.client)
self.secret_behaviors = secret_behaviors.SecretBehaviors(self.client)
self.simple_cmc_data = copy.deepcopy(order_simple_cmc_request_data)
self.full_cmc_data = copy.deepcopy(order_full_cmc_request_data)
self.stored_key_data = copy.deepcopy(order_stored_key_request_data)
self.dogtag_custom_data = copy.deepcopy(
order_dogtag_custom_request_data)
def tearDown(self):
self.behaviors.delete_all_created_orders()
self.ca_behaviors.delete_all_created_cas()
self.container_behaviors.delete_all_created_containers()
self.secret_behaviors.delete_all_created_secrets()
super(CertificatesTestCase, self).tearDown()
def wait_for_order(
self, order_ref, delay_before_check_seconds=1, max_wait_seconds=4):
time.sleep(delay_before_check_seconds)
# Make sure we have an order in a terminal state
time_count = 1
order_resp = self.behaviors.get_order(order_ref)
while ((order_resp.model.status != "ACTIVE") and
(order_resp.model.status != "ERROR") and
time_count <= max_wait_seconds):
time.sleep(1)
time_count += 1
order_resp = self.behaviors.get_order(order_ref)
return order_resp
def create_asymmetric_key_container(self):
secret_model = secret_models.SecretModel(**get_private_key_req())
secret_model.secret_type = s.SecretType.PRIVATE
resp, secret_ref_priv = self.secret_behaviors.create_secret(
secret_model)
self.assertEqual(201, resp.status_code)
secret_model = secret_models.SecretModel(**get_public_key_req())
secret_model.secret_type = s.SecretType.PUBLIC
resp, secret_ref_pub = self.secret_behaviors.create_secret(
secret_model)
self.assertEqual(201, resp.status_code)
pub_key_ref = {'name': 'public_key', 'secret_ref': secret_ref_pub}
priv_key_ref = {'name': 'private_key', 'secret_ref': secret_ref_priv}
test_model = container_models.ContainerModel(
**create_container_rsa_data)
test_model.secret_refs = [pub_key_ref, priv_key_ref]
resp, container_ref = self.container_behaviors.create_container(
test_model)
self.assertEqual(201, resp.status_code)
return container_ref
def create_generic_container(self):
secret_model = secret_models.SecretModel(**get_private_key_req())
secret_model.secret_type = s.SecretType.PRIVATE
resp, secret_ref = self.secret_behaviors.create_secret(secret_model)
self.assertEqual(201, resp.status_code)
test_model = container_models.ContainerModel(**create_generic_container_data)
test_model.secret_refs = [{
'name': 'my_secret',
'secret_ref': secret_ref
}]
resp, container_ref = self.container_behaviors.create_container(test_model)
self.assertEqual(201, resp.status_code)
return container_ref
def get_dogtag_ca_id(self):
(resp, cas, total, next_ref, prev_ref) = self.ca_behaviors.get_cas()
for item in cas:
ca = self.ca_behaviors.get_ca(item)
if ca.model.plugin_name == (
'barbican.plugin.dogtag.DogtagCAPlugin'):
return ca.model.ca_id
return None
def verify_cert_returned(self, order_resp, is_stored_key_type=False):
container_ref = order_resp.model.container_ref
self.assertIsNotNone(container_ref, "no cert container returned")
container_resp = self.container_behaviors.get_container(container_ref)
self.assertIsNotNone(container_resp, "Cert container returns None")
self.assertEqual('certificate', container_resp.model.type)
secret_refs = container_resp.model.secret_refs
self.assertIsNotNone(secret_refs, "container has no secret refs")
contains_cert = False
contains_private_key_ref = False
for secret in secret_refs:
if secret.name == 'certificate':
contains_cert = True
self.assertIsNotNone(secret.secret_ref)
self.verify_valid_cert(secret.secret_ref)
if secret.name == 'intermediates':
self.assertIsNotNone(secret.secret_ref)
self.verify_valid_intermediates(secret.secret_ref)
if is_stored_key_type:
if secret.name == 'private_key':
contains_private_key_ref = True
self.assertIsNotNone(secret.secret_ref)
self.assertTrue(contains_cert)
if is_stored_key_type:
self.assertTrue(contains_private_key_ref)
def verify_valid_cert(self, secret_ref):
secret_resp = self.secret_behaviors.get_secret(
secret_ref,
"application/octet-stream")
self.assertIsNotNone(secret_resp)
self.assertIsNotNone(secret_resp.content)
cert = secret_resp.content
crypto.load_certificate(crypto.FILETYPE_PEM, cert)
def verify_valid_intermediates(self, secret_ref):
secret_resp = self.secret_behaviors.get_secret(
secret_ref,
"application/octet-stream")
self.assertIsNotNone(secret_resp)
self.assertIsNotNone(secret_resp.content)
cert_chain = secret_resp.content
crypto.load_pkcs7_data(crypto.FILETYPE_PEM, cert_chain)
def verify_pending_waiting_for_ca(self, order_resp):
self.assertEqual('PENDING', order_resp.model.status)
self.assertEqual(cert_res.ORDER_STATUS_REQUEST_PENDING.id,
order_resp.model.sub_status)
self.assertEqual(cert_res.ORDER_STATUS_REQUEST_PENDING.message,
order_resp.model.sub_status_message)
def confirm_error_message(self, resp, message):
resp_dict = jsonutils.loads(resp.content)
self.assertEqual(message, resp_dict['description'])
@testtools.testcase.attr('positive')
@testtools.skipIf(dogtag_imports_ok, "not applicable with dogtag plugin")
def test_create_simple_cmc_order(self):
test_model = order_models.OrderModel(**self.simple_cmc_data)
test_model.meta['request_data'] = base64.b64encode(
certutil.create_good_csr())
create_resp, order_ref = self.behaviors.create_order(test_model)
self.assertEqual(202, create_resp.status_code)
self.assertIsNotNone(order_ref)
order_resp = self.behaviors.get_order(order_ref)
self.verify_pending_waiting_for_ca(order_resp)
# Wait for retry processing to handle checking for status with the
# default certificate plugin (which takes about 10 seconds +- 20%).
order_resp = self.wait_for_order(
order_ref, delay_before_check_seconds=20, max_wait_seconds=25)
self.assertEqual('ACTIVE', order_resp.model.status)
@testtools.testcase.attr('positive')
def test_create_simple_cmc_order_without_requestor_info(self):
self.simple_cmc_data.pop("requestor_name", None)
self.simple_cmc_data.pop("requestor_email", None)
self.simple_cmc_data.pop("requestor_phone", None)
test_model = order_models.OrderModel(**self.simple_cmc_data)
test_model.meta['request_data'] = base64.b64encode(
certutil.create_good_csr())
create_resp, order_ref = self.behaviors.create_order(test_model)
self.assertEqual(202, create_resp.status_code)
self.assertIsNotNone(order_ref)
order_resp = self.behaviors.get_order(order_ref)
self.verify_pending_waiting_for_ca(order_resp)
@testtools.testcase.attr('positive')
@testtools.skipIf(not dogtag_imports_ok, "Dogtag imports not available")
def test_create_simple_cmc_order_with_dogtag_profile(self):
test_model = order_models.OrderModel(**self.simple_cmc_data)
test_model.meta['request_data'] = base64.b64encode(
certutil.create_good_csr())
test_model.meta['profile'] = 'caServerCert'
test_model.meta['ca_id'] = self.get_dogtag_ca_id()
create_resp, order_ref = self.behaviors.create_order(test_model)
self.assertEqual(202, create_resp.status_code)
self.assertIsNotNone(order_ref)
order_resp = self.wait_for_order(order_ref)
self.assertEqual('ACTIVE', order_resp.model.status)
self.verify_cert_returned(order_resp)
@testtools.testcase.attr('negative')
def test_create_simple_cmc_with_profile_and_no_ca_id(self):
test_model = order_models.OrderModel(**self.simple_cmc_data)
test_model.meta['request_data'] = base64.b64encode(
certutil.create_good_csr())
test_model.meta['profile'] = 'caServerCert'
create_resp, order_ref = self.behaviors.create_order(test_model)
self.assertEqual(400, create_resp.status_code)
self.assertIsNone(order_ref)
self.confirm_error_message(
create_resp,
"Missing required metadata field for ca_id"
)
@testtools.testcase.attr('negative')
def test_create_simple_cmc_with_profile_and_incorrect_ca_id(self):
test_model = order_models.OrderModel(**self.simple_cmc_data)
test_model.meta['request_data'] = base64.b64encode(
certutil.create_good_csr())
test_model.meta['profile'] = 'caServerCert'
test_model.meta['ca_id'] = 'incorrect_ca_id'
create_resp, order_ref = self.behaviors.create_order(test_model)
self.assertEqual(400, create_resp.status_code)
self.assertIsNone(order_ref)
self.confirm_error_message(
create_resp,
"Order creation issue seen - The ca_id provided "
"in the request is invalid."
)
@testtools.testcase.attr('negative')
@testtools.skipIf(not dogtag_imports_ok, "Dogtag imports not available")
def test_create_simple_cmc_with_dogtag_and_invalid_subject_dn(self):
test_model = order_models.OrderModel(**self.simple_cmc_data)
test_model.meta['request_data'] = base64.b64encode(
certutil.create_csr_with_bad_subject_dn())
test_model.meta['profile'] = 'caServerCert'
test_model.meta['ca_id'] = self.get_dogtag_ca_id()
create_resp, order_ref = self.behaviors.create_order(test_model)
self.assertEqual(202, create_resp.status_code)
self.assertIsNotNone(order_ref)
order_resp = self.wait_for_order(order_ref)
self.assertEqual('ERROR', order_resp.model.status)
self.assertEqual('400', order_resp.model.error_status_code)
self.assertIn('Problem with data in certificate request',
order_resp.model.error_reason)
# TODO(alee) Dogtag does not currently return a error message
# when it does, check for that specific error message
@testtools.testcase.attr('negative')
def test_create_simple_cmc_order_with_no_base64(self):
test_model = order_models.OrderModel(**self.simple_cmc_data)
# do not encode with base64 to force the error
test_model.meta['request_data'] = certutil.create_bad_csr()
create_resp, order_ref = self.behaviors.create_order(test_model)
self.assertEqual(400, create_resp.status_code)
self.assertIsNone(order_ref)
self.confirm_error_message(create_resp,
"Unable to decode request data.")
@testtools.testcase.attr('negative')
def test_create_simple_cmc_order_with_invalid_pkcs10(self):
test_model = order_models.OrderModel(**self.simple_cmc_data)
test_model.meta['request_data'] = base64.b64encode(
certutil.create_bad_csr())
create_resp, order_ref = self.behaviors.create_order(test_model)
self.assertEqual(400, create_resp.status_code)
self.assertIsNone(order_ref)
self.confirm_error_message(create_resp,
"Invalid PKCS10 Data: Bad format")
@testtools.testcase.attr('negative')
def test_create_simple_csc_order_with_unsigned_pkcs10(self):
test_model = order_models.OrderModel(**self.simple_cmc_data)
test_model.meta['request_data'] = base64.b64encode(
certutil.create_csr_that_has_not_been_signed())
create_resp, order_ref = self.behaviors.create_order(test_model)
self.assertEqual(400, create_resp.status_code)
self.assertIsNone(order_ref)
error_description = jsonutils.loads(create_resp.content)['description']
self.assertIn("Invalid PKCS10 Data", error_description)
@testtools.testcase.attr('negative')
def test_create_simple_csc_order_with_pkcs10_signed_by_wrong_key(self):
test_model = order_models.OrderModel(**self.simple_cmc_data)
test_model.meta['request_data'] = base64.b64encode(
certutil.create_csr_signed_with_wrong_key())
create_resp, order_ref = self.behaviors.create_order(test_model)
self.assertEqual(400, create_resp.status_code)
self.confirm_error_message(
create_resp,
"Invalid PKCS10 Data: Signing key incorrect"
)
@testtools.testcase.attr('negative')
@testtools.skipIf(not dogtag_imports_ok, "Dogtag imports not available")
def test_create_simple_cmc_order_with_invalid_dogtag_profile(self):
test_model = order_models.OrderModel(**self.simple_cmc_data)
test_model.meta['request_data'] = base64.b64encode(
certutil.create_good_csr())
test_model.meta['profile'] = 'invalidProfileID'
test_model.meta['ca_id'] = self.get_dogtag_ca_id()
create_resp, order_ref = self.behaviors.create_order(test_model)
self.assertEqual(202, create_resp.status_code)
self.assertIsNotNone(order_ref)
order_resp = self.wait_for_order(order_ref)
self.assertEqual('ERROR', order_resp.model.status)
self.assertEqual('400', order_resp.model.error_status_code)
self.assertIn('Problem with data in certificate request',
order_resp.model.error_reason)
self.assertIn('Profile not found',
order_resp.model.error_reason)
@testtools.testcase.attr('positive')
@testtools.skipIf(not dogtag_imports_ok, "Dogtag imports not available")
def test_create_simple_cmc_order_with_non_approved_dogtag_profile(self):
test_model = order_models.OrderModel(**self.simple_cmc_data)
test_model.meta['request_data'] = base64.b64encode(
certutil.create_good_csr())
test_model.meta['profile'] = 'caTPSCert'
test_model.meta['ca_id'] = self.get_dogtag_ca_id()
create_resp, order_ref = self.behaviors.create_order(test_model)
self.assertEqual(202, create_resp.status_code)
self.assertIsNotNone(order_ref)
order_resp = self.wait_for_order(order_ref)
self.verify_pending_waiting_for_ca(order_resp)
@testtools.testcase.attr('negative')
def test_create_simple_cmc_order_with_missing_request(self):
test_model = order_models.OrderModel(**self.simple_cmc_data)
create_resp, order_ref = self.behaviors.create_order(test_model)
self.assertEqual(400, create_resp.status_code)
self.assertIsNone(order_ref)
self.confirm_error_message(
create_resp,
"Missing required metadata field for request_data"
)
@testtools.testcase.attr('negative')
def test_create_full_cmc_order(self):
test_model = order_models.OrderModel(**self.full_cmc_data)
test_model.meta['request_data'] = base64.b64encode(
certutil.create_good_csr())
create_resp, order_ref = self.behaviors.create_order(test_model)
self.assertEqual(400, create_resp.status_code)
self.assertIsNone(order_ref)
self.confirm_error_message(
create_resp,
"Full CMC Requests are not yet supported."
)
@testtools.testcase.attr('negative')
def test_create_cert_order_with_invalid_type(self):
test_model = order_models.OrderModel(**self.simple_cmc_data)
test_model.meta['request_data'] = base64.b64encode(
certutil.create_good_csr())
test_model.meta['request_type'] = "invalid_type"
create_resp, order_ref = self.behaviors.create_order(test_model)
self.assertEqual(400, create_resp.status_code)
self.confirm_error_message(
create_resp,
"Invalid Certificate Request Type"
)
@testtools.testcase.attr('positive')
def test_create_stored_key_order(self):
test_model = order_models.OrderModel(**self.stored_key_data)
test_model.meta['container_ref'] = (
self.create_asymmetric_key_container())
create_resp, order_ref = self.behaviors.create_order(test_model)
self.assertEqual(202, create_resp.status_code)
self.assertIsNotNone(order_ref)
order_resp = self.behaviors.get_order(order_ref)
self.verify_pending_waiting_for_ca(order_resp)
@testtools.testcase.attr('positive')
@testtools.skipIf(not dogtag_imports_ok, "Dogtag imports not available")
def test_create_stored_key_order_with_dogtag_profile(self):
test_model = order_models.OrderModel(**self.stored_key_data)
test_model.meta['container_ref'] = (
self.create_asymmetric_key_container())
test_model.meta['profile'] = "caServerCert"
test_model.meta['ca_id'] = self.get_dogtag_ca_id()
create_resp, order_ref = self.behaviors.create_order(test_model)
self.assertEqual(202, create_resp.status_code)
self.assertIsNotNone(order_ref)
order_resp = self.wait_for_order(order_ref)
self.assertEqual('ACTIVE', order_resp.model.status)
self.verify_cert_returned(order_resp, is_stored_key_type=True)
@testtools.testcase.attr('negative')
def test_create_stored_key_order_with_invalid_container_ref(self):
test_model = order_models.OrderModel(**self.stored_key_data)
test_model.meta['container_ref'] = INVALID_CONTAINER_REF
create_resp, order_ref = self.behaviors.create_order(test_model)
self.assertEqual(400, create_resp.status_code)
self.confirm_error_message(
create_resp,
"Order creation issue seen - "
"Invalid container: Bad Container Reference "
+ INVALID_CONTAINER_REF + "."
)
@testtools.testcase.attr('negative')
def test_create_stored_key_order_with_not_found_container_ref(self):
test_model = order_models.OrderModel(**self.stored_key_data)
test_model.meta['container_ref'] = NOT_FOUND_CONTAINER_REF
create_resp, order_ref = self.behaviors.create_order(test_model)
self.assertEqual(400, create_resp.status_code)
self.confirm_error_message(
create_resp,
"Order creation issue seen - "
"Invalid container: Container Not Found."
)
@testtools.testcase.attr('negative')
def test_create_stored_key_order_with_missing_container_ref(self):
test_model = order_models.OrderModel(**self.stored_key_data)
create_resp, order_ref = self.behaviors.create_order(test_model)
self.assertEqual(400, create_resp.status_code)
self.confirm_error_message(
create_resp,
"Missing required metadata field for container_ref"
)
@testtools.testcase.attr('negative')
def test_create_stored_key_order_with_unauthorized_container_ref(self):
# TODO(alee) - Not sure how to do this
pass
@testtools.testcase.attr('negative')
def test_create_stored_key_order_with_invalid_container_type(self):
test_model = order_models.OrderModel(**self.stored_key_data)
test_model.meta['container_ref'] = (self.create_generic_container())
create_resp, order_ref = self.behaviors.create_order(test_model)
self.assertEqual(400, create_resp.status_code)
self.confirm_error_message(
create_resp,
"Order creation issue seen - "
"Invalid container: Container Wrong Type."
)
@testtools.testcase.attr('negative')
def test_create_stored_key_order_with_container_secrets_inaccessible(self):
# TODO(alee) Not sure how to do this
pass
@testtools.testcase.attr('negative')
def test_create_stored_key_order_with_subject_dn_missing(self):
test_model = order_models.OrderModel(**self.stored_key_data)
test_model.meta['container_ref'] = (
self.create_asymmetric_key_container())
del test_model.meta['subject_dn']
create_resp, order_ref = self.behaviors.create_order(test_model)
self.assertEqual(400, create_resp.status_code)
self.confirm_error_message(
create_resp,
"Missing required metadata field for subject_dn"
)
@testtools.testcase.attr('negative')
def test_create_stored_key_order_with_subject_dn_invalid(self):
test_model = order_models.OrderModel(**self.stored_key_data)
test_model.meta['container_ref'] = (
self.create_asymmetric_key_container())
test_model.meta['subject_dn'] = "invalid_subject_dn"
create_resp, order_ref = self.behaviors.create_order(test_model)
self.assertEqual(400, create_resp.status_code)
self.confirm_error_message(
create_resp,
"Invalid subject DN: invalid_subject_dn"
)
@testtools.testcase.attr('negative')
def test_create_stored_key_order_with_extensions(self):
test_model = order_models.OrderModel(**self.stored_key_data)
test_model.meta['container_ref'] = (
self.create_asymmetric_key_container())
test_model.meta['extensions'] = "any-extensions"
create_resp, order_ref = self.behaviors.create_order(test_model)
self.assertEqual(400, create_resp.status_code)
self.confirm_error_message(
create_resp,
"Extensions are not yet supported. "
"Specify a valid profile instead."
)
@testtools.testcase.attr('positive')
@testtools.skipIf(not dogtag_imports_ok, "Dogtag imports not available")
def test_create_stored_key_order_with_non_approved_dogtag_profile(self):
test_model = order_models.OrderModel(**self.stored_key_data)
test_model.meta['container_ref'] = (
self.create_asymmetric_key_container())
test_model.meta['profile'] = "caTPSCert"
test_model.meta['ca_id'] = self.get_dogtag_ca_id()
create_resp, order_ref = self.behaviors.create_order(test_model)
self.assertEqual(202, create_resp.status_code)
self.assertIsNotNone(order_ref)
order_resp = self.wait_for_order(order_ref)
self.verify_pending_waiting_for_ca(order_resp)
@testtools.testcase.attr('negative')
@testtools.skipIf(not dogtag_imports_ok, "Dogtag imports not available")
def test_create_stored_key_order_with_invalid_dogtag_profile(self):
test_model = order_models.OrderModel(**self.stored_key_data)
test_model.meta['container_ref'] = (
self.create_asymmetric_key_container())
test_model.meta['profile'] = "invalidProfileID"
test_model.meta['ca_id'] = self.get_dogtag_ca_id()
create_resp, order_ref = self.behaviors.create_order(test_model)
self.assertEqual(202, create_resp.status_code)
self.assertIsNotNone(order_ref)
order_resp = self.wait_for_order(order_ref)
self.assertEqual('ERROR', order_resp.model.status)
self.assertIn('Problem with data in certificate request',
order_resp.model.error_reason)
self.assertIn('Profile not found',
order_resp.model.error_reason)
@testtools.testcase.attr('positive')
def test_create_cert_order_with_missing_request_type(self):
# defaults to 'custom' type
test_model = order_models.OrderModel(**self.dogtag_custom_data)
test_model.meta['cert_request'] = base64.b64encode(
certutil.create_good_csr())
test_model.meta['profile_id'] = 'caTPSCert'
create_resp, order_ref = self.behaviors.create_order(test_model)
self.assertEqual(202, create_resp.status_code)
self.assertIsNotNone(order_ref)
order_resp = self.behaviors.get_order(order_ref)
self.verify_pending_waiting_for_ca(order_resp)
@testtools.testcase.attr('positive')
@testtools.skipIf(not dogtag_imports_ok, "Dogtag imports not available")
def test_create_cert_order_with_missing_request_type_auto_enroll(self):
# defaults to 'custom' type
test_model = order_models.OrderModel(**self.dogtag_custom_data)
test_model.meta['cert_request'] = base64.b64encode(
certutil.create_good_csr())
create_resp, order_ref = self.behaviors.create_order(test_model)
self.assertEqual(202, create_resp.status_code)
self.assertIsNotNone(order_ref)
order_resp = self.wait_for_order(order_ref)
self.assertEqual('ACTIVE', order_resp.model.status)
self.verify_cert_returned(order_resp)
@testtools.testcase.attr('positive')
@testtools.skipIf(not dogtag_imports_ok, "Dogtag imports not available")
def test_create_custom_order_with_valid_dogtag_data(self):
# defaults to 'custom' type
test_model = order_models.OrderModel(**self.dogtag_custom_data)
test_model.meta['cert_request'] = base64.b64encode(
certutil.create_good_csr())
create_resp, order_ref = self.behaviors.create_order(test_model)
self.assertEqual(202, create_resp.status_code)
self.assertIsNotNone(order_ref)
order_resp = self.wait_for_order(order_ref)
self.assertEqual('ACTIVE', order_resp.model.status)
self.verify_cert_returned(order_resp)
@testtools.testcase.attr('negative')
@testtools.skipIf(not dogtag_imports_ok, "Dogtag imports not available")
def test_create_custom_order_with_invalid_dogtag_data(self):
# TODO(alee) this test is broken because Dogtag does not return the
# correct type of exception, Fix this when Dogtag is fixed.
test_model = order_models.OrderModel(**self.dogtag_custom_data)
test_model.meta['cert_request'] = "invalid_data"
create_resp, order_ref = self.behaviors.create_order(test_model)
self.assertEqual(202, create_resp.status_code)
self.assertIsNotNone(order_ref)
order_resp = self.wait_for_order(order_ref)
self.assertEqual('ERROR', order_resp.model.status)
# TODO(alee) confirm substatus - data error seen
@testtools.testcase.attr('positive')
@testtools.skipIf(dogtag_imports_ok, "Non-Dogtag test only")
def test_create_custom_order_for_generic_plugin(self):
test_model = order_models.OrderModel(**self.dogtag_custom_data)
test_model.meta['container_ref'] = (
self.create_asymmetric_key_container())
create_resp, order_ref = self.behaviors.create_order(test_model)
self.assertEqual(202, create_resp.status_code)
self.assertIsNotNone(order_ref)
order_resp = self.behaviors.get_order(order_ref)
self.assertEqual('PENDING', order_resp.model.status)

View File

@ -18,13 +18,11 @@ import testtools
from barbican.plugin.interface import certificate_manager as cert_interface
from functionaltests.api import base
from functionaltests.api.v1.behaviors import ca_behaviors
from functionaltests.api.v1.behaviors import consumer_behaviors
from functionaltests.api.v1.behaviors import container_behaviors
from functionaltests.api.v1.behaviors import order_behaviors
from functionaltests.api.v1.behaviors import quota_behaviors
from functionaltests.api.v1.behaviors import secret_behaviors
from functionaltests.api.v1.models import ca_models
from functionaltests.api.v1.models import consumer_model
from functionaltests.api.v1.models import container_models
from functionaltests.api.v1.models import order_models
@ -55,7 +53,6 @@ class QuotaEnforcementTestCase(base.TestCase):
self.order_behaviors = order_behaviors.OrderBehaviors(self.client)
self.consumer_behaviors = consumer_behaviors.ConsumerBehaviors(
self.client)
self.ca_behaviors = ca_behaviors.CABehaviors(self.client)
self.secret_data = self.get_default_secret_data()
self.quota_data = self.get_default_quota_data()
@ -70,7 +67,6 @@ class QuotaEnforcementTestCase(base.TestCase):
self.consumer_behaviors.delete_all_created_consumers()
self.container_behaviors.delete_all_created_containers()
self.secret_behaviors.delete_all_created_secrets()
self.ca_behaviors.delete_all_created_cas()
for secret_ref in self.order_secrets:
resp = self.secret_behaviors.delete_secret(
secret_ref, user_name=admin_b)
@ -150,32 +146,6 @@ class QuotaEnforcementTestCase(base.TestCase):
self.create_consumers(count=5)
self.create_consumers(expected_return=403)
@testtools.skipIf(not is_ca_backend_snakeoil(),
"This test is only usable with snakeoil")
def test_snakeoil_cas_unlimited(self):
self.set_quotas('cas', -1)
self.create_snakeoil_cas(count=5)
@testtools.skipIf(not is_ca_backend_snakeoil(),
"This test is only usable with snakeoil")
def test_snakeoil_cas_disabled(self):
self.set_quotas('cas', 0)
self.create_snakeoil_cas(expected_return=403)
@testtools.skipIf(not is_ca_backend_snakeoil(),
"This test is only usable with snakeoil")
def test_snakeoil_cas_limited_one(self):
self.set_quotas('cas', 1)
self.create_snakeoil_cas(count=1)
self.create_snakeoil_cas(expected_return=403)
@testtools.skipIf(not is_ca_backend_snakeoil(),
"This test is only usable with snakeoil")
def test_snakeoil_cas_limited_five(self):
self.set_quotas('cas', 5)
self.create_snakeoil_cas(count=5)
self.create_snakeoil_cas(expected_return=403)
# ----------------------- Helper Functions ---------------------------
def get_default_quota_data(self):
@ -269,46 +239,3 @@ class QuotaEnforcementTestCase(base.TestCase):
resp, consumer_dat = self.consumer_behaviors.create_consumer(
model, container_ref, user_name=admin_b)
self.assertEqual(expected_return, resp.status_code)
def get_order_simple_cmc_request_data(self):
return {
'type': 'certificate',
'meta': {
'request_type': 'simple-cmc',
'requestor_name': 'Barbican User',
'requestor_email': 'user@example.com',
'requestor_phone': '555-1212'
}
}
def get_root_ca_ref(self):
if self.root_ca_ref is not None:
return self.root_ca_ref
(resp, cas, total, next_ref, prev_ref) = self.ca_behaviors.get_cas()
snake_name = 'barbican.plugin.snakeoil_ca.SnakeoilCACertificatePlugin'
snake_plugin_ca_id = "Snakeoil CA"
for item in cas:
ca = self.ca_behaviors.get_ca(item)
if ca.model.plugin_name == snake_name:
if ca.model.plugin_ca_id == snake_plugin_ca_id:
return item
return None
def get_snakeoil_subca_model(self):
parent_ca_ref = self.get_root_ca_ref()
return ca_models.CAModel(
parent_ca_ref=parent_ca_ref,
description="Test Snake Oil Subordinate CA",
name="Subordinate CA",
subject_dn="CN=Subordinate CA, O=example.com"
)
def create_snakeoil_cas(self, count=1, expected_return=201):
"""Utility function to create snakeoil cas"""
for _ in range(count):
ca_model = self.get_snakeoil_subca_model()
resp, ca_ref = self.ca_behaviors.create_ca(ca_model,
user_name=admin_b)
self.assertEqual(expected_return, resp.status_code)

View File

@ -0,0 +1,38 @@
---
prelude: >
This release notify that we will remove Certificate Orders and CAs from API.
deprecations:
- Certificate Orders
- CAs
other:
- Why are we deprecating Certificate Issuance?
There are a few reasons that were considered for this decision. First,
there does not seem to be a lot of interest in the community to fully
develop the Certificate Authority integration with Barbican. We have a
few outstanding blueprints that are needed to make Certificate Issuance
fully functional, but so far no one has committed to getting the work
done. Additionally, we've had very little buy-in from public
Certificate Authorities. Both Symantec and Digicert were interested in
integration in the past, but that interest didn't materialize into
robust CA plugins like we hoped it would.
Secondly, there have been new developments in the space of Certificate
Authorities since we started Barbican. The most significant of these
was the launch of the Let's Encrypt public CA along with the definition
of the ACME protocol for certificate issuance. We believe that future
certificate authority services would do good to implement the ACME
standard, which is quite different than the API the Barbican team had
developed.
Lastly, deprecating Certificate Issuance within Barbican will simplify
both the architecture and deployment of Barbican. This will allow us to
focus on the features that Barbican does well -- the secure storage of
secret material.
- Will Barbican still be able to store Certificates?
Yes, absolutely! The only thing we're deprecating is the plugin
interface that talks to Certificate Authorites and associated APIs.
While you will not be able to use Barbican to issue a new certificate,
you will always be able to securely store any certificates in Barbican,
including those issued by public CAs or internal CAs.