Merge "Add Secret Consumer Controllers and their tests"

This commit is contained in:
Zuul 2020-01-17 15:12:34 +00:00 committed by Gerrit Code Review
commit a699da6408
8 changed files with 1221 additions and 61 deletions

View File

@ -44,7 +44,7 @@ def _invalid_consumer_id():
class ContainerConsumerController(controllers.ACLMixin):
"""Handles Consumer entity retrieval and deletion requests."""
"""Handles Container Consumer entity retrieval and deletion requests"""
def __init__(self, consumer_id):
self.consumer_id = consumer_id
@ -76,7 +76,7 @@ class ContainerConsumerController(controllers.ACLMixin):
class ContainerConsumersController(controllers.ACLMixin):
"""Handles Consumer creation requests."""
"""Handles Container Consumer creation requests"""
def __init__(self, container_id):
self.container_id = container_id
@ -131,7 +131,7 @@ class ContainerConsumersController(controllers.ACLMixin):
)
resp_ctrs_overall.update({'total': total})
LOG.info('Retrieved a consumer list for project: %s',
LOG.info('Retrieved a container consumer list for project: %s',
external_project_id)
return resp_ctrs_overall
@ -157,7 +157,7 @@ class ContainerConsumersController(controllers.ACLMixin):
url = hrefs.convert_consumer_to_href(new_consumer.container_id)
pecan.response.headers['Location'] = url
LOG.info('Created a consumer for project: %s',
LOG.info('Created a container consumer for project: %s',
external_project_id)
return self._return_container_data(self.container_id)
@ -182,7 +182,7 @@ class ContainerConsumersController(controllers.ACLMixin):
)
if not consumer:
_consumer_not_found()
LOG.debug("Found consumer: %s", consumer)
LOG.debug("Found container consumer: %s", consumer)
container = self._get_container(self.container_id)
owner_of_consumer = consumer.project_id == project.id
@ -195,11 +195,11 @@ class ContainerConsumersController(controllers.ACLMixin):
self.consumer_repo.delete_entity_by_id(consumer.id,
external_project_id)
except exception.NotFound:
LOG.exception('Problem deleting consumer')
LOG.exception('Problem deleting container consumer')
_consumer_not_found()
ret_data = self._return_container_data(self.container_id)
LOG.info('Deleted a consumer for project: %s',
LOG.info('Deleted a container consumer for project: %s',
external_project_id)
return ret_data
@ -222,3 +222,183 @@ class ContainerConsumersController(controllers.ACLMixin):
return hrefs.convert_to_hrefs(
hrefs.convert_to_hrefs(dict_fields)
)
class SecretConsumerController(controllers.ACLMixin):
"""Handles Secret Consumer entity retrieval and deletion requests"""
def __init__(self, consumer_id):
self.consumer_id = consumer_id
self.consumer_repo = repo.get_secret_consumer_repository()
self.validator = validators.SecretConsumerValidator()
@pecan.expose(generic=True)
def index(self):
pecan.abort(405) # HTTP 405 Method Not Allowed as default
@index.when(method='GET', template='json')
@controllers.handle_exceptions(u._('SecretConsumer retrieval'))
@controllers.enforce_rbac('consumer:get')
def on_get(self, external_project_id):
consumer = self.consumer_repo.get(
entity_id=self.consumer_id,
suppress_exception=True)
if not consumer:
_consumer_not_found()
dict_fields = consumer.to_dict_fields()
LOG.info('Retrieved a secret consumer for project: %s',
external_project_id)
return hrefs.convert_to_hrefs(
hrefs.convert_to_hrefs(dict_fields)
)
class SecretConsumersController(controllers.ACLMixin):
"""Handles Secret Consumer creation requests"""
def __init__(self, secret_id):
self.secret_id = secret_id
self.consumer_repo = repo.get_secret_consumer_repository()
self.secret_repo = repo.get_secret_repository()
self.project_repo = repo.get_project_repository()
self.validator = validators.SecretConsumerValidator()
self.quota_enforcer = quota.QuotaEnforcer('consumers',
self.consumer_repo)
@pecan.expose()
def _lookup(self, consumer_id, *remainder):
if not utils.validate_id_is_uuid(consumer_id):
_invalid_consumer_id()()
return SecretConsumerController(consumer_id), remainder
@pecan.expose(generic=True)
def index(self, **kwargs):
pecan.abort(405) # HTTP 405 Method Not Allowed as default
@index.when(method='GET', template='json')
@controllers.handle_exceptions(u._('SecretConsumers(s) retrieval'))
@controllers.enforce_rbac('consumers:get')
def on_get(self, external_project_id, **kw):
LOG.debug('Start consumers on_get '
'for secret-ID %s:', self.secret_id)
result = self.consumer_repo.get_by_secret_id(
self.secret_id,
offset_arg=kw.get('offset', 0),
limit_arg=kw.get('limit'),
suppress_exception=True
)
consumers, offset, limit, total = result
if not consumers:
resp_ctrs_overall = {'consumers': [], 'total': total}
else:
resp_ctrs = [
hrefs.convert_to_hrefs(c.to_dict_fields())
for c in consumers
]
consumer_path = "secrets/{secret_id}/consumers".format(
secret_id=self.secret_id)
resp_ctrs_overall = hrefs.add_nav_hrefs(
consumer_path,
offset,
limit,
total,
{'consumers': resp_ctrs}
)
resp_ctrs_overall.update({'total': total})
LOG.info('Retrieved a consumer list for project: %s',
external_project_id)
return resp_ctrs_overall
@index.when(method='POST', template='json')
@controllers.handle_exceptions(u._('SecretConsumer creation'))
@controllers.enforce_rbac('consumers:post')
@controllers.enforce_content_types(['application/json'])
def on_post(self, external_project_id, **kwargs):
project = res.get_or_create_project(external_project_id)
data = api.load_body(pecan.request, validator=self.validator)
LOG.debug('Start on_post...%s', data)
secret = self._get_secret(self.secret_id)
self.quota_enforcer.enforce(project)
new_consumer = models.SecretConsumerMetadatum(
self.secret_id,
project.id,
data["service"],
data["resource_type"],
data["resource_id"],
)
self.consumer_repo.create_or_update_from(new_consumer, secret)
url = hrefs.convert_consumer_to_href(new_consumer.secret_id)
pecan.response.headers['Location'] = url
LOG.info('Created a consumer for project: %s',
external_project_id)
return self._return_secret_data(self.secret_id)
@index.when(method='DELETE', template='json')
@controllers.handle_exceptions(u._('SecretConsumer deletion'))
@controllers.enforce_rbac('consumers:delete')
@controllers.enforce_content_types(['application/json'])
def on_delete(self, external_project_id, **kwargs):
data = api.load_body(pecan.request, validator=self.validator)
LOG.debug('Start on_delete...%s', data)
project = self.project_repo.find_by_external_project_id(
external_project_id, suppress_exception=True)
if not project:
_consumer_not_found()
consumer = self.consumer_repo.get_by_values(
self.secret_id,
data["resource_id"],
suppress_exception=True
)
if not consumer:
_consumer_not_found()
LOG.debug("Found consumer: %s", consumer)
secret = self._get_secret(self.secret_id)
owner_of_consumer = consumer.project_id == project.id
owner_of_secret = secret.project.external_id \
== external_project_id
if not owner_of_consumer and not owner_of_secret:
_consumer_ownership_mismatch()
try:
self.consumer_repo.delete_entity_by_id(consumer.id,
external_project_id)
except exception.NotFound:
LOG.exception('Problem deleting consumer')
_consumer_not_found()
ret_data = self._return_secret_data(self.secret_id)
LOG.info('Deleted a consumer for project: %s',
external_project_id)
return ret_data
def _get_secret(self, secret_id):
secret = self.secret_repo.get_secret_by_id(
secret_id, suppress_exception=True)
if not secret:
controllers.secrets.secret_not_found()
return secret
def _return_secret_data(self, secret_id):
secret = self._get_secret(secret_id)
dict_fields = secret.to_dict_fields()
return hrefs.convert_to_hrefs(
hrefs.convert_to_hrefs(dict_fields)
)

View File

@ -17,6 +17,7 @@ from six.moves.urllib import parse
from barbican import api
from barbican.api import controllers
from barbican.api.controllers import acls
from barbican.api.controllers import consumers
from barbican.api.controllers import secretmeta
from barbican.common import accept
from barbican.common import exception
@ -76,6 +77,8 @@ class SecretController(controllers.ACLMixin):
def __init__(self, secret):
LOG.debug('=== Creating SecretController ===')
self.secret = secret
self.consumers = consumers.SecretConsumersController(secret.id)
self.consumer_repo = repo.get_secret_consumer_repository()
self.transport_key_repo = repo.get_transport_key_repository()
def get_acl_tuple(self, req, **kwargs):
@ -253,9 +256,20 @@ class SecretController(controllers.ACLMixin):
@controllers.handle_exceptions(u._('Secret deletion'))
@controllers.enforce_rbac('secret:delete')
def on_delete(self, external_project_id, **kwargs):
secret_consumers = self.consumer_repo.get_by_secret_id(
self.secret.id,
suppress_exception=True
)
plugin.delete_secret(self.secret, external_project_id)
LOG.info('Deleted secret for project: %s', external_project_id)
for consumer in secret_consumers[0]:
try:
self.consumer_repo.delete_entity_by_id(
consumer.id, external_project_id)
except exception.NotFound: # nosec
pass
class SecretsController(controllers.ACLMixin):
"""Handles Secret creation requests."""

View File

@ -374,6 +374,13 @@ class Secret(BASE, SoftDeleteMixIn, ModelBase):
'bit_length': self.bit_length,
'mode': self.mode,
'creator_id': self.creator_id,
"consumers": [
{
"service": consumer.service,
"resource_type": consumer.resource_type,
"resource_id": consumer.resource_id,
} for consumer in self.consumers if not consumer.deleted
],
}
@ -576,16 +583,16 @@ class Order(BASE, SoftDeleteMixIn, ModelBase):
cascade="all, delete-orphan")
def __init__(self, parsed_request=None, check_exc=True):
"""Creates a Order entity from a dict."""
super(Order, self).__init__()
if parsed_request:
self.type = parsed_request.get('type')
self.meta = parsed_request.get('meta')
self.status = States.ACTIVE
self.sub_status = parsed_request.get('sub_status')
self.sub_status_message = parsed_request.get(
'sub_status_message')
self.creator_id = parsed_request.get('creator_id')
"""Creates a Order entity from a dict."""
super(Order, self).__init__()
if parsed_request:
self.type = parsed_request.get('type')
self.meta = parsed_request.get('meta')
self.status = States.ACTIVE
self.sub_status = parsed_request.get('sub_status')
self.sub_status_message = parsed_request.get(
'sub_status_message')
self.creator_id = parsed_request.get('creator_id')
def set_error_reason_safely(self, error_reason_raw):
"""Ensure error reason does not raise database attribute exceptions."""

View File

@ -17,10 +17,10 @@ import os
from barbican.tests import utils
class WhenTestingConsumersResource(utils.BarbicanAPIBaseTestCase):
class WhenTestingContainerConsumersResource(utils.BarbicanAPIBaseTestCase):
def setUp(self):
super(WhenTestingConsumersResource, self).setUp()
super(WhenTestingContainerConsumersResource, self).setUp()
self.container_name = "Im_a_container"
self.container_type = "generic"
@ -48,7 +48,7 @@ class WhenTestingConsumersResource(utils.BarbicanAPIBaseTestCase):
)
self.assertEqual(201, resp.status_int)
consumer_resp, consumer = create_consumer(
consumer_resp, consumer = create_container_consumer(
self.app,
container_id=container_uuid,
name=self.consumer_a["name"],
@ -66,7 +66,7 @@ class WhenTestingConsumersResource(utils.BarbicanAPIBaseTestCase):
)
self.assertEqual(201, resp.status_int)
consumer_resp, consumers = create_consumer(
consumer_resp, consumers = create_container_consumer(
self.app,
container_id=container_uuid,
name=self.consumer_a["name"],
@ -74,7 +74,7 @@ class WhenTestingConsumersResource(utils.BarbicanAPIBaseTestCase):
)
self.assertEqual(200, consumer_resp.status_int)
consumer_resp, consumers = create_consumer(
consumer_resp, consumers = create_container_consumer(
self.app,
container_id=container_uuid,
name=self.consumer_b["name"],
@ -82,7 +82,7 @@ class WhenTestingConsumersResource(utils.BarbicanAPIBaseTestCase):
)
self.assertEqual(200, consumer_resp.status_int)
consumer_resp, consumers = create_consumer(
consumer_resp, consumers = create_container_consumer(
self.app,
container_id=container_uuid,
name=self.consumer_c["name"],
@ -116,7 +116,7 @@ class WhenTestingConsumersResource(utils.BarbicanAPIBaseTestCase):
)
self.assertEqual(201, resp.status_int)
consumer_resp, consumers = create_consumer(
consumer_resp, consumers = create_container_consumer(
self.app,
container_id=container_uuid,
name=self.consumer_a["name"],
@ -124,7 +124,7 @@ class WhenTestingConsumersResource(utils.BarbicanAPIBaseTestCase):
)
self.assertEqual(200, consumer_resp.status_int)
consumer_resp, consumers = create_consumer(
consumer_resp, consumers = create_container_consumer(
self.app,
container_id=container_uuid,
name=self.consumer_b["name"],
@ -132,7 +132,7 @@ class WhenTestingConsumersResource(utils.BarbicanAPIBaseTestCase):
)
self.assertEqual(200, consumer_resp.status_int)
consumer_resp, consumers = create_consumer(
consumer_resp, consumers = create_container_consumer(
self.app,
container_id=container_uuid,
name=self.consumer_c["name"],
@ -170,7 +170,7 @@ class WhenTestingConsumersResource(utils.BarbicanAPIBaseTestCase):
)
self.assertEqual(201, resp.status_int)
consumer_resp, consumers = create_consumer(
consumer_resp, consumers = create_container_consumer(
self.app,
container_id=container_uuid,
name=self.consumer_a["name"],
@ -208,7 +208,7 @@ class WhenTestingConsumersResource(utils.BarbicanAPIBaseTestCase):
self.assertEqual([], consumer_get_resp.json["consumers"])
def test_fail_create_container_not_found(self):
consumer_resp, consumers = create_consumer(
consumer_resp, consumers = create_container_consumer(
self.app,
container_id="bad_container_id",
name=self.consumer_a["name"],
@ -271,7 +271,7 @@ class WhenTestingConsumersResource(utils.BarbicanAPIBaseTestCase):
)
self.assertEqual(201, resp.status_int)
consumer_resp, consumer = create_consumer(
consumer_resp, consumer = create_container_consumer(
self.app,
container_id=container_uuid,
url="http://theurl",
@ -287,7 +287,7 @@ class WhenTestingConsumersResource(utils.BarbicanAPIBaseTestCase):
)
self.assertEqual(201, resp.status_int)
consumer_resp, consumer = create_consumer(
consumer_resp, consumer = create_container_consumer(
self.app,
container_id=container_uuid,
name="thename",
@ -303,7 +303,7 @@ class WhenTestingConsumersResource(utils.BarbicanAPIBaseTestCase):
)
self.assertEqual(201, resp.status_int)
consumer_resp, consumer = create_consumer(
consumer_resp, consumer = create_container_consumer(
self.app,
container_id=container_uuid,
name="",
@ -320,7 +320,7 @@ class WhenTestingConsumersResource(utils.BarbicanAPIBaseTestCase):
)
self.assertEqual(201, resp.status_int)
consumer_resp, consumer = create_consumer(
consumer_resp, consumer = create_container_consumer(
self.app,
container_id=container_uuid,
name="thename",
@ -330,6 +330,336 @@ class WhenTestingConsumersResource(utils.BarbicanAPIBaseTestCase):
self.assertEqual(400, consumer_resp.status_int)
class WhenTestingSecretConsumersResource(utils.BarbicanAPIBaseTestCase):
def setUp(self):
super(WhenTestingSecretConsumersResource, self).setUp()
self.consumer_a = {
"service": "service_a",
"resource_type": "resource_type_a",
"resource_id": "resource_id_a",
}
self.consumer_b = {
"service": "service_b",
"resource_type": "resource_type_b",
"resource_id": "resource_id_b",
}
self.consumer_c = {
"service": "service_c",
"resource_type": "resource_type_c",
"resource_id": "resource_id_c",
}
def test_can_create_new_consumer(self):
resp, secret_id = create_secret(self.app)
self.assertEqual(201, resp.status_int)
consumer_resp, consumer = create_secret_consumer(
self.app,
secret_id=secret_id,
service=self.consumer_a["service"],
resource_type=self.consumer_a["resource_type"],
resource_id=self.consumer_a["resource_id"],
)
self.assertEqual(200, consumer_resp.status_int)
self.assertEqual([self.consumer_a], consumer)
def test_can_get_consumers(self):
resp, secret_id = create_secret(self.app)
self.assertEqual(201, resp.status_int)
consumer_resp, consumers = create_secret_consumer(
self.app,
secret_id=secret_id,
service=self.consumer_a["service"],
resource_type=self.consumer_a["resource_type"],
resource_id=self.consumer_a["resource_id"],
)
self.assertEqual(200, consumer_resp.status_int)
consumer_resp, consumers = create_secret_consumer(
self.app,
secret_id=secret_id,
service=self.consumer_b["service"],
resource_type=self.consumer_b["resource_type"],
resource_id=self.consumer_b["resource_id"],
)
self.assertEqual(200, consumer_resp.status_int)
consumer_resp, consumers = create_secret_consumer(
self.app,
secret_id=secret_id,
service=self.consumer_c["service"],
resource_type=self.consumer_c["resource_type"],
resource_id=self.consumer_c["resource_id"],
)
self.assertEqual(200, consumer_resp.status_int)
consumer_get_resp = self.app.get(
'/secrets/{secret_id}/consumers/'.format(
secret_id=secret_id))
self.assertEqual(200, consumer_get_resp.status_int)
self.assertIn(consumers[0]["service"],
consumer_get_resp.json["consumers"][0]["service"])
self.assertIn(consumers[0]["resource_type"],
consumer_get_resp.json["consumers"][0]["resource_type"])
self.assertIn(consumers[0]["resource_id"],
consumer_get_resp.json["consumers"][0]["resource_id"])
self.assertIn(consumers[1]["service"],
consumer_get_resp.json["consumers"][1]["service"])
self.assertIn(consumers[1]["resource_type"],
consumer_get_resp.json["consumers"][1]["resource_type"])
self.assertIn(consumers[1]["resource_id"],
consumer_get_resp.json["consumers"][1]["resource_id"])
self.assertIn(consumers[2]["service"],
consumer_get_resp.json["consumers"][2]["service"])
self.assertIn(consumers[2]["resource_type"],
consumer_get_resp.json["consumers"][2]["resource_type"])
self.assertIn(consumers[2]["resource_id"],
consumer_get_resp.json["consumers"][2]["resource_id"])
def test_can_get_consumers_with_limit_and_offset(self):
resp, secret_id = create_secret(self.app)
self.assertEqual(201, resp.status_int)
consumer_resp, consumers = create_secret_consumer(
self.app,
secret_id=secret_id,
service=self.consumer_a["service"],
resource_type=self.consumer_a["resource_type"],
resource_id=self.consumer_a["resource_id"],
)
self.assertEqual(200, consumer_resp.status_int)
consumer_resp, consumers = create_secret_consumer(
self.app,
secret_id=secret_id,
service=self.consumer_b["service"],
resource_type=self.consumer_b["resource_type"],
resource_id=self.consumer_b["resource_id"],
)
self.assertEqual(200, consumer_resp.status_int)
consumer_resp, consumers = create_secret_consumer(
self.app,
secret_id=secret_id,
service=self.consumer_c["service"],
resource_type=self.consumer_c["resource_type"],
resource_id=self.consumer_c["resource_id"],
)
self.assertEqual(200, consumer_resp.status_int)
consumer_get_resp = self.app.get(
'/secrets/{secret_id}/consumers/?limit=1&offset=1'.format(
secret_id=secret_id))
self.assertEqual(200, consumer_get_resp.status_int)
secret_url = resp.json["secret_ref"]
prev_cons = u"{secret_url}/consumers?limit=1&offset=0".format(
secret_url=secret_url)
self.assertEqual(prev_cons, consumer_get_resp.json["previous"])
next_cons = u"{secret_url}/consumers?limit=1&offset=2".format(
secret_url=secret_url)
self.assertEqual(next_cons, consumer_get_resp.json["next"])
self.assertEqual(
self.consumer_b["service"],
consumer_get_resp.json["consumers"][0]["service"]
)
self.assertEqual(
self.consumer_b["resource_type"],
consumer_get_resp.json["consumers"][0]["resource_type"]
)
self.assertEqual(
self.consumer_b["resource_id"],
consumer_get_resp.json["consumers"][0]["resource_id"]
)
self.assertEqual(3, consumer_get_resp.json["total"])
def test_can_delete_consumer(self):
resp, secret_id = create_secret(self.app)
self.assertEqual(201, resp.status_int)
consumer_resp, consumers = create_secret_consumer(
self.app,
secret_id=secret_id,
service=self.consumer_a["service"],
resource_type=self.consumer_a["resource_type"],
resource_id=self.consumer_a["resource_id"],
)
self.assertEqual(200, consumer_resp.status_int)
request = {
"service": self.consumer_a["service"],
"resource_type": self.consumer_a["resource_type"],
"resource_id": self.consumer_a["resource_id"],
}
cleaned_request = {key: val for key, val in request.items()
if val is not None}
consumer_del_resp = self.app.delete_json(
'/secrets/{secret_id}/consumers/'.format(
secret_id=secret_id
), cleaned_request, headers={'Content-Type': 'application/json'})
self.assertEqual(200, consumer_del_resp.status_int)
def test_can_get_no_consumers(self):
resp, secret_id = create_secret(self.app)
self.assertEqual(201, resp.status_int)
consumer_get_resp = self.app.get(
'/secrets/{secret_id}/consumers/'.format(
secret_id=secret_id))
self.assertEqual(200, consumer_get_resp.status_int)
self.assertEqual([], consumer_get_resp.json["consumers"])
def test_fail_create_secret_not_found(self):
consumer_resp, consumers = create_secret_consumer(
self.app,
secret_id="bad_secret_id",
service=self.consumer_a["service"],
resource_type=self.consumer_a["resource_type"],
resource_id=self.consumer_a["resource_id"],
expect_errors=True
)
self.assertEqual(404, consumer_resp.status_int)
def test_fail_get_secret_not_found(self):
consumer_get_resp = self.app.get(
'/secrets/{secret_id}/consumers/'.format(
secret_id="bad_secret_id"), expect_errors=True)
self.assertEqual(404, consumer_get_resp.status_int)
def test_fail_delete_secret_not_found(self):
request = {
"service": self.consumer_a["service"],
"resource_type": self.consumer_a["resource_type"],
"resource_id": self.consumer_a["resource_id"],
}
cleaned_request = {key: val for key, val in request.items()
if val is not None}
consumer_del_resp = self.app.delete_json(
'/secrets/{secret_id}/consumers/'.format(
secret_id="bad_secret_id"
), cleaned_request, headers={'Content-Type': 'application/json'},
expect_errors=True)
self.assertEqual(404, consumer_del_resp.status_int)
def test_fail_delete_consumer_not_found(self):
resp, secret_id = create_secret(self.app)
self.assertEqual(201, resp.status_int)
request = {
"service": self.consumer_a["service"],
"resource_type": self.consumer_a["resource_type"],
"resource_id": self.consumer_a["resource_id"],
}
cleaned_request = {key: val for key, val in request.items()
if val is not None}
consumer_del_resp = self.app.delete_json(
'/secrets/{secret_id}/consumers/'.format(
secret_id=secret_id
), cleaned_request, headers={'Content-Type': 'application/json'},
expect_errors=True)
self.assertEqual(404, consumer_del_resp.status_int)
def test_fail_create_no_service(self):
resp, secret_id = create_secret(self.app)
self.assertEqual(201, resp.status_int)
consumer_resp, consumer = create_secret_consumer(
self.app,
secret_id=secret_id,
resource_type="resource_type",
resource_id="resource_id",
expect_errors=True
)
self.assertEqual(400, consumer_resp.status_int)
def test_fail_create_no_resource_type(self):
resp, secret_id = create_secret(self.app)
self.assertEqual(201, resp.status_int)
consumer_resp, consumer = create_secret_consumer(
self.app,
secret_id=secret_id,
service="service",
resource_id="resource_id",
expect_errors=True
)
self.assertEqual(400, consumer_resp.status_int)
def test_fail_create_no_resource_id(self):
resp, secret_id = create_secret(self.app)
self.assertEqual(201, resp.status_int)
consumer_resp, consumer = create_secret_consumer(
self.app,
secret_id=secret_id,
service="service",
resource_type="resource_type",
expect_errors=True
)
self.assertEqual(400, consumer_resp.status_int)
def test_fail_create_empty_service(self):
resp, secret_id = create_secret(self.app)
self.assertEqual(201, resp.status_int)
consumer_resp, consumer = create_secret_consumer(
self.app,
secret_id=secret_id,
service="",
resource_type="resource_type",
resource_id="resource_id",
expect_errors=True
)
self.assertEqual(400, consumer_resp.status_int)
def test_fail_create_empty_resource_type(self):
resp, secret_id = create_secret(self.app)
self.assertEqual(201, resp.status_int)
consumer_resp, consumer = create_secret_consumer(
self.app,
secret_id=secret_id,
service="service",
resource_type="",
resource_id="resource_id",
expect_errors=True
)
self.assertEqual(400, consumer_resp.status_int)
def test_fail_create_empty_resource_id(self):
resp, secret_id = create_secret(self.app)
self.assertEqual(201, resp.status_int)
consumer_resp, consumer = create_secret_consumer(
self.app,
secret_id=secret_id,
service="service",
resource_type="resource_type",
resource_id="",
expect_errors=True
)
self.assertEqual(400, consumer_resp.status_int)
# ----------------------- Helper Functions ---------------------------
def create_container(app, name=None, container_type=None, expect_errors=False,
headers=None):
@ -355,8 +685,8 @@ def create_container(app, name=None, container_type=None, expect_errors=False,
return resp, created_uuid
def create_consumer(app, container_id=None, name=None, url=None,
expect_errors=False, headers=None):
def create_container_consumer(app, container_id=None, name=None, url=None,
expect_errors=False, headers=None):
request = {
'name': name,
'URL': url
@ -377,3 +707,38 @@ def create_consumer(app, container_id=None, name=None, url=None,
consumers = resp.json.get('consumers', '')
return resp, consumers
def create_secret(app, expect_errors=False):
resp = app.post_json('/secrets/', {}, expect_errors=expect_errors)
secret_id = None
if resp.status_int == 201:
secret_ref = resp.json.get('secret_ref', '')
_, secret_id = os.path.split(secret_ref)
return resp, secret_id
def create_secret_consumer(app, secret_id=None, service=None,
resource_type=None, resource_id=None,
expect_errors=False, headers=None):
request = {
"service": service,
"resource_type": resource_type,
"resource_id": resource_id,
}
request = {k: v for k, v in request.items() if v is not None}
resp = app.post_json(
"/secrets/{}/consumers/".format(secret_id),
request,
expect_errors=expect_errors,
headers=headers
)
consumers = None
if resp.status_int == 200:
consumers = resp.json.get('consumers', '')
return resp, consumers

View File

@ -52,14 +52,17 @@ def get_barbican_env(external_project_id):
def create_secret(id_ref="id", name="name",
algorithm=None, bit_length=None,
mode=None, encrypted_datum=None, content_type=None):
algorithm=None, bit_length=None, mode=None,
encrypted_datum=None, content_type=None, project_id=None):
"""Generate a Secret entity instance."""
info = {'id': id_ref,
'name': name,
'algorithm': algorithm,
'bit_length': bit_length,
'mode': mode}
info = {
'id': id_ref,
'name': name,
'algorithm': algorithm,
'bit_length': bit_length,
'mode': mode,
'project_id': project_id,
}
secret = models.Secret(info)
secret.id = id_ref
if encrypted_datum:
@ -109,7 +112,7 @@ def create_container(id_ref, project_id=None, external_project_id=None):
return container
def create_consumer(container_id, project_id, id_ref):
def create_container_consumer(container_id, project_id, id_ref):
"""Generate a ContainerConsumerMetadatum entity instance."""
data = {
'name': 'test name',
@ -122,6 +125,19 @@ def create_consumer(container_id, project_id, id_ref):
return consumer
def create_secret_consumer(secret_id, project_id, id_ref):
"""Generate a SecretConsumerMetadatum entity instance."""
consumer = models.SecretConsumerMetadatum(
secret_id,
project_id,
"service",
"resource_type",
"resource_id",
)
consumer.id = id_ref
return consumer
class SecretAllowAllMimeTypesDecoratorTest(utils.BaseTestCase):
def setUp(self):
@ -719,10 +735,10 @@ class TestingJsonSanitization(utils.BaseTestCase):
.endswith(' '), "whitespace should be gone")
class WhenCreatingConsumersUsingConsumersResource(FunctionalTest):
class WhenCreatingContainerConsumersUsingResource(FunctionalTest):
def setUp(self):
super(
WhenCreatingConsumersUsingConsumersResource, self
WhenCreatingContainerConsumersUsingResource, self
).setUp()
self.app = webtest.TestApp(app.build_wsgi_app(self.root))
self.app.extra_environ = get_barbican_env(self.external_project_id)
@ -832,11 +848,11 @@ class WhenCreatingConsumersUsingConsumersResource(FunctionalTest):
self.assertEqual(404, resp.status_int)
class WhenGettingOrDeletingConsumersUsingConsumerResource(FunctionalTest):
class WhenGettingOrDeletingContainerConsumersUsingResource(FunctionalTest):
def setUp(self):
super(
WhenGettingOrDeletingConsumersUsingConsumerResource, self
WhenGettingOrDeletingContainerConsumersUsingResource, self
).setUp()
self.app = webtest.TestApp(app.build_wsgi_app(self.root))
self.app.extra_environ = get_barbican_env(self.external_project_id)
@ -871,10 +887,10 @@ class WhenGettingOrDeletingConsumersUsingConsumerResource(FunctionalTest):
external_project_id=self.external_project_id)
# Set up mocked consumers
self.consumer = create_consumer(
self.consumer = create_container_consumer(
self.container.id, self.project_internal_id,
id_ref=utils.generate_test_valid_uuid())
self.consumer2 = create_consumer(
self.consumer2 = create_container_consumer(
self.container.id, self.project_internal_id,
id_ref=utils.generate_test_valid_uuid())
@ -1019,10 +1035,10 @@ class WhenGettingOrDeletingConsumersUsingConsumerResource(FunctionalTest):
)
class WhenPerformingUnallowedOperationsOnConsumers(FunctionalTest):
class WhenPerformingUnallowedOperationsOnContainerConsumers(FunctionalTest):
def setUp(self):
super(
WhenPerformingUnallowedOperationsOnConsumers, self
WhenPerformingUnallowedOperationsOnContainerConsumers, self
).setUp()
self.app = webtest.TestApp(app.build_wsgi_app(self.root))
self.app.extra_environ = get_barbican_env(self.external_project_id)
@ -1078,10 +1094,10 @@ class WhenPerformingUnallowedOperationsOnConsumers(FunctionalTest):
external_project_id=self.external_project_id)
# Set up mocked container consumers
self.consumer = create_consumer(
self.consumer = create_container_consumer(
self.container.id, self.project_internal_id,
id_ref=utils.generate_test_valid_uuid())
self.consumer2 = create_consumer(
self.consumer2 = create_container_consumer(
self.container.id, self.project_internal_id,
id_ref=utils.generate_test_valid_uuid())
@ -1145,11 +1161,11 @@ class WhenPerformingUnallowedOperationsOnConsumers(FunctionalTest):
self.assertEqual(405, resp.status_int)
class WhenOwnershipMismatch(FunctionalTest):
class WhenOwnershipMismatchForContainerConsumer(FunctionalTest):
def setUp(self):
super(
WhenOwnershipMismatch, self
WhenOwnershipMismatchForContainerConsumer, self
).setUp()
self.app = webtest.TestApp(app.build_wsgi_app(self.root))
self.app.extra_environ = get_barbican_env(self.external_project_id)
@ -1183,12 +1199,12 @@ class WhenOwnershipMismatch(FunctionalTest):
external_project_id='differentProjectId')
# Set up mocked consumers
self.consumer = create_consumer(self.container.id,
self.project_internal_id,
id_ref='id2')
self.consumer2 = create_consumer(self.container.id,
self.project_internal_id,
id_ref='id3')
self.consumer = create_container_consumer(self.container.id,
self.project_internal_id,
id_ref='id2')
self.consumer2 = create_container_consumer(self.container.id,
self.project_internal_id,
id_ref='id3')
self.consumer_ref = {
'name': self.consumer.name,
@ -1215,3 +1231,462 @@ class WhenOwnershipMismatch(FunctionalTest):
'/containers/{0}/consumers/'.format(self.container.id),
self.consumer_ref, expect_errors=True)
self.assertEqual(403, resp.status_int)
class WhenCreatingSecretConsumersUsingResource(FunctionalTest):
def setUp(self):
super(
WhenCreatingSecretConsumersUsingResource, self
).setUp()
self.app = webtest.TestApp(app.build_wsgi_app(self.root))
self.app.extra_environ = get_barbican_env(self.external_project_id)
@property
def root(self):
self._init()
class RootController(object):
secrets = controllers.secrets.SecretsController()
return RootController()
def _init(self):
self.external_project_id = 'keystoneid1234'
self.project_internal_id = 'projectid1234'
# Set up mocked project
self.project = models.Project()
self.project.id = self.project_internal_id
self.project.external_id = self.external_project_id
# Set up mocked secret
self.secret = models.Secret()
self.secret.id = utils.generate_test_valid_uuid()
self.secret.project = self.project
self.secret.project_id = self.project_internal_id
# Set up consumer ref
self.consumer_ref = {
"service": "service",
"resource_type": "resource_type",
"resource_id": "resource_id",
}
# Set up mocked project repo
self.project_repo = mock.MagicMock()
self.project_repo.get.return_value = self.project
self.setup_project_repository_mock(self.project_repo)
# Set up mocked quota enforcer
self.quota_patch = mock.patch(
'barbican.common.quota.QuotaEnforcer.enforce', return_value=None)
self.quota_patch.start()
self.addCleanup(self.quota_patch.stop)
# Set up mocked secret repo
self.secret_repo = mock.MagicMock()
self.secret_repo.get_secret_by_id.return_value = self.secret
self.setup_secret_repository_mock(self.secret_repo)
# Set up mocked secret meta repo
self.secret_meta_repo = mock.MagicMock()
self.secret_meta_repo.get_metadata_for_secret.return_value = None
self.setup_secret_meta_repository_mock(self.secret_meta_repo)
# Set up mocked secret consumer repo
self.consumer_repo = mock.MagicMock()
self.consumer_repo.create_from.return_value = None
self.setup_secret_consumer_repository_mock(self.consumer_repo)
def test_should_add_new_consumer(self):
resp = self.app.post_json(
'/secrets/{0}/consumers/'.format(self.secret.id),
self.consumer_ref
)
self.assertEqual(200, resp.status_int)
self.assertNotIn(self.external_project_id, resp.headers['Location'])
args, kwargs = self.consumer_repo.create_or_update_from.call_args
consumer = args[0]
self.assertIsInstance(consumer, models.SecretConsumerMetadatum)
def test_should_fail_consumer_bad_json(self):
resp = self.app.post(
'/secrets/{0}/consumers/'.format(self.secret.id),
'',
expect_errors=True
)
self.assertEqual(415, resp.status_int)
def test_should_404_when_secret_ref_doesnt_exist(self):
self.secret_repo.get_secret_by_id.return_value = None
resp = self.app.post_json(
'/secrets/{0}/consumers/'.format('bad_id'),
self.consumer_ref, expect_errors=True
)
self.assertEqual(404, resp.status_int)
class WhenGettingOrDeletingSecretConsumersUsingResource(FunctionalTest):
def setUp(self):
super(
WhenGettingOrDeletingSecretConsumersUsingResource, self
).setUp()
self.app = webtest.TestApp(app.build_wsgi_app(self.root))
self.app.extra_environ = get_barbican_env(self.external_project_id)
@property
def root(self):
self._init()
class RootController(object):
secrets = controllers.secrets.SecretsController()
return RootController()
def _init(self):
self.external_project_id = 'keystoneid1234'
self.project_internal_id = 'projectid1234'
# Set up mocked project
self.project = models.Project()
self.project.id = self.project_internal_id
self.project.external_id = self.external_project_id
# Set up mocked secret
self.secret = models.Secret()
self.secret.id = utils.generate_test_valid_uuid()
self.secret.project = self.project
self.secret.project_id = self.project_internal_id
# Set up mocked consumers
self.consumer = create_secret_consumer(
self.secret.id, self.project_internal_id,
id_ref=utils.generate_test_valid_uuid())
self.consumer2 = create_secret_consumer(
self.secret.id, self.project_internal_id,
id_ref=utils.generate_test_valid_uuid())
self.consumer_ref = {
"service": self.consumer.service,
"resource_type": self.consumer.resource_type,
"resource_id": self.consumer.resource_type,
}
# Set up mocked project repo
self.project_repo = mock.MagicMock()
self.project_repo.get.return_value = self.project
self.setup_project_repository_mock(self.project_repo)
# Set up mocked secret repo
self.secret_repo = mock.MagicMock()
self.secret_repo.get_secret_by_id.return_value = self.secret
self.setup_secret_repository_mock(self.secret_repo)
# Set up mocked secret meta repo
self.secret_meta_repo = mock.MagicMock()
self.secret_meta_repo.get_metadata_for_secret.return_value = None
self.setup_secret_meta_repository_mock(self.secret_meta_repo)
# Set up mocked secret consumer repo
self.consumer_repo = mock.MagicMock()
self.consumer_repo.get_by_values.return_value = self.consumer
self.consumer_repo.delete_entity_by_id.return_value = None
self.setup_secret_consumer_repository_mock(self.consumer_repo)
def test_should_get_consumer(self):
ret_val = ([self.consumer], 0, 0, 1)
self.consumer_repo.get_by_secret_id.return_value = ret_val
resp = self.app.get('/secrets/{0}/consumers/'.format(
self.secret.id
))
self.assertEqual(200, resp.status_int)
self.consumer_repo.get_by_secret_id.assert_called_once_with(
self.secret.id,
limit_arg=None,
offset_arg=0,
suppress_exception=True
)
self.assertEqual(
self.consumer.service,
resp.json["consumers"][0]["service"],
)
self.assertEqual(
self.consumer.resource_type,
resp.json["consumers"][0]["resource_type"]
)
self.assertEqual(
self.consumer.resource_id,
resp.json["consumers"][0]["resource_id"]
)
def test_should_404_when_secret_ref_doesnt_exist(self):
self.secret_repo.get_secret_by_id.return_value = None
resp = self.app.get('/secrets/{0}/consumers/'.format(
'bad_id'
), expect_errors=True)
self.assertEqual(404, resp.status_int)
def test_should_get_consumer_by_id(self):
self.consumer_repo.get.return_value = self.consumer
resp = self.app.get('/secrets/{0}/consumers/{1}/'.format(
self.secret.id, self.consumer.id
))
self.assertEqual(200, resp.status_int)
def test_should_404_with_bad_consumer_id(self):
self.consumer_repo.get.return_value = None
resp = self.app.get('/secrets/{0}/consumers/{1}/'.format(
self.secret.id, 'bad_id'
), expect_errors=True)
self.assertEqual(404, resp.status_int)
def test_should_get_no_consumers(self):
self.consumer_repo.get_by_secret_id.return_value = ([], 0, 0, 0)
resp = self.app.get('/secrets/{0}/consumers/'.format(
self.secret.id
))
self.assertEqual(200, resp.status_int)
def test_should_delete_consumer(self):
self.app.delete_json('/secrets/{0}/consumers/'.format(
self.secret.id
), self.consumer_ref)
self.consumer_repo.delete_entity_by_id.assert_called_once_with(
self.consumer.id, self.external_project_id)
def test_should_fail_deleting_consumer_bad_json(self):
resp = self.app.delete(
'/secrets/{0}/consumers/'.format(self.secret.id),
'',
expect_errors=True
)
self.assertEqual(415, resp.status_int)
def test_should_404_on_delete_when_consumer_not_found(self):
old_return = self.consumer_repo.get_by_values.return_value
self.consumer_repo.get_by_values.return_value = None
resp = self.app.delete_json('/secrets/{0}/consumers/'.format(
self.secret.id
), self.consumer_ref, expect_errors=True)
self.consumer_repo.get_by_values.return_value = old_return
self.assertEqual(404, resp.status_int)
# Error response should have json content type
self.assertEqual("application/json", resp.content_type)
def test_should_404_on_delete_when_consumer_not_found_later(self):
self.consumer_repo.delete_entity_by_id.side_effect = excep.NotFound()
resp = self.app.delete_json('/secrets/{0}/consumers/'.format(
self.secret.id
), self.consumer_ref, expect_errors=True)
self.consumer_repo.delete_entity_by_id.side_effect = None
self.assertEqual(404, resp.status_int)
# Error response should have json content type
self.assertEqual("application/json", resp.content_type)
def test_should_delete_consumers_on_secret_delete(self):
consumers = [self.consumer, self.consumer2]
ret_val = (consumers, 0, 0, 1)
self.consumer_repo.get_by_secret_id.return_value = ret_val
resp = self.app.delete(
'/secrets/{0}/'.format(self.secret.id)
)
self.assertEqual(204, resp.status_int)
# Verify consumers were deleted
calls = []
for consumer in consumers:
calls.append(mock.call(consumer.id, self.external_project_id))
self.consumer_repo.delete_entity_by_id.assert_has_calls(
calls, any_order=True
)
def test_should_pass_on_secret_delete_with_missing_consumers(self):
consumers = [self.consumer, self.consumer2]
ret_val = (consumers, 0, 0, 1)
self.consumer_repo.get_by_secret_id.return_value = ret_val
self.consumer_repo.delete_entity_by_id.side_effect = excep.NotFound
resp = self.app.delete(
'/secrets/{0}/'.format(self.secret.id)
)
self.assertEqual(204, resp.status_int)
# Verify consumers were deleted
calls = []
for consumer in consumers:
calls.append(mock.call(consumer.id, self.external_project_id))
self.consumer_repo.delete_entity_by_id.assert_has_calls(
calls, any_order=True
)
class WhenPerformingUnallowedOperationsOnSecretConsumers(FunctionalTest):
def setUp(self):
super(
WhenPerformingUnallowedOperationsOnSecretConsumers, self
).setUp()
self.app = webtest.TestApp(app.build_wsgi_app(self.root))
self.app.extra_environ = get_barbican_env(self.external_project_id)
@property
def root(self):
self._init()
class RootController(object):
secrets = controllers.secrets.SecretsController()
return RootController()
def _init(self):
self.external_project_id = 'keystoneid1234'
self.project_internal_id = 'projectid1234'
# Set up mocked project
self.project = models.Project()
self.project.id = self.project_internal_id
self.project.external_id = self.external_project_id
# Set up mocked secret
self.secret = models.Secret()
self.secret.id = utils.generate_test_valid_uuid()
self.secret.project_id = self.project_internal_id
# Set up mocked secret consumers
self.consumer = create_secret_consumer(
self.secret.id, self.project_internal_id,
id_ref=utils.generate_test_valid_uuid())
self.consumer_ref = {
"service": self.consumer.service,
"resource_type": self.consumer.resource_type,
"resource_id": self.consumer.resource_type,
}
# Set up mocked project repo
self.project_repo = mock.MagicMock()
self.project_repo.get.return_value = self.project
self.setup_project_repository_mock(self.project_repo)
# Set up secret repo
self.secret_repo = mock.MagicMock()
self.secret_repo.get_secret_by_id.return_value = self.secret
self.setup_secret_repository_mock(self.secret_repo)
# Set up secret consumer repo
self.consumer_repo = mock.MagicMock()
self.consumer_repo.get_by_values.return_value = self.consumer
self.consumer_repo.delete_entity_by_id.return_value = None
self.setup_secret_consumer_repository_mock(self.consumer_repo)
def test_should_not_allow_put_on_consumers(self):
ret_val = ([self.consumer], 0, 0, 1)
self.consumer_repo.get_by_secret_id.return_value = ret_val
resp = self.app.put_json(
'/secrets/{0}/consumers/'.format(self.secret.id),
self.consumer_ref,
expect_errors=True
)
self.assertEqual(405, resp.status_int)
def test_should_not_allow_post_on_consumer_by_id(self):
self.consumer_repo.get.return_value = self.consumer
resp = self.app.post_json(
'/secrets/{0}/consumers/{1}/'.format(self.secret.id,
self.consumer.id),
self.consumer_ref,
expect_errors=True
)
self.assertEqual(405, resp.status_int)
def test_should_not_allow_put_on_consumer_by_id(self):
self.consumer_repo.get.return_value = self.consumer
resp = self.app.put_json(
'/secrets/{0}/consumers/{1}/'.format(self.secret.id,
self.consumer.id),
self.consumer_ref,
expect_errors=True
)
self.assertEqual(405, resp.status_int)
def test_should_not_allow_delete_on_consumer_by_id(self):
self.consumer_repo.get.return_value = self.consumer
resp = self.app.delete(
'/secrets/{0}/consumers/{1}/'.format(self.secret.id,
self.consumer.id),
expect_errors=True
)
self.assertEqual(405, resp.status_int)
class WhenOwnershipMismatchForSecretConsumer(FunctionalTest):
def setUp(self):
super(
WhenOwnershipMismatchForSecretConsumer, self
).setUp()
self.app = webtest.TestApp(app.build_wsgi_app(self.root))
self.app.extra_environ = get_barbican_env(self.external_project_id)
@property
def root(self):
self._init()
class RootController(object):
secrets = controllers.secrets.SecretsController()
return RootController()
def _init(self):
self.external_project_id = 'keystoneid1234'
self.project_internal_id = 'projectid1234'
# Set up mocked project
self.project = models.Project()
self.project.id = self.project_internal_id
self.project.external_id = self.external_project_id
# Set up mocked secret
self.secret = models.Secret()
self.secret.id = utils.generate_test_valid_uuid()
self.secret.project = models.Project()
self.secret.project.external_id = "differentProjectId"
# Set up mocked consumer
self.consumer = create_secret_consumer(self.secret.id,
self.project_internal_id,
id_ref='consumerid1234')
self.consumer_ref = {
"service": self.consumer.service,
"resource_type": self.consumer.resource_type,
"resource_id": self.consumer.resource_type,
}
# Set up mocked project repo
self.project_repo = mock.MagicMock()
self.project_repo.get.return_value = self.project
self.setup_project_repository_mock(self.project_repo)
# Set up mocked secret repo
self.secret_repo = mock.MagicMock()
self.secret_repo.get.return_value = self.secret
self.secret_repo.get_secret_by_id.return_value = self.secret
self.setup_secret_repository_mock(self.secret_repo)
# Set up mocked secret consumer repo
self.consumer_repo = mock.MagicMock()
self.consumer_repo.get_by_values.return_value = self.consumer
self.consumer_repo.delete_entity_by_id.return_value = None
self.setup_secret_consumer_repository_mock(self.consumer_repo)
def test_consumer_check_ownership_mismatch(self):
resp = self.app.delete_json(
'/secrets/{0}/consumers/'.format(self.secret.id),
self.consumer_ref, expect_errors=True)
self.assertEqual(403, resp.status_int)

View File

@ -116,6 +116,14 @@ class PreferredSecretStoreResource(TestableResource):
controller_cls = secretstores.PreferredSecretStoreController
class SecretConsumersResource(TestableResource):
controller_cls = consumers.SecretConsumersController
class SecretConsumerResource(TestableResource):
controller_cls = consumers.SecretConsumerController
class BaseTestCase(utils.BaseTestCase, utils.MockModelRepositoryMixin):
def setUp(self):
@ -1275,3 +1283,99 @@ class WhenTestingPreferredSecretStoreResource(BaseTestCase):
def _invoke_on_post(self):
self.resource.on_post(self.req, self.resp)
class WhenTestingSecretConsumersResource(BaseTestCase):
"""RBAC tests for barbican.api.resources.SecretConsumersResource"""
def setUp(self):
super(WhenTestingSecretConsumersResource, self).setUp()
self.external_project_id = '12345project'
self.secret_id = '12345secret'
# Force an error on GET calls that pass RBAC, as we are not testing
# such flows in this test module.
self.consumer_repo = mock.MagicMock()
get_by_secret_id = mock.MagicMock(return_value=None,
side_effect=self
._generate_get_error())
self.consumer_repo.get_by_secret_id = get_by_secret_id
self.setup_project_repository_mock()
self.setup_secret_consumer_repository_mock(self.consumer_repo)
self.setup_secret_repository_mock()
self.resource = SecretConsumersResource(secret_id=self.secret_id)
def test_rules_should_be_loaded(self):
self.assertIsNotNone(self.policy_enforcer.rules)
def test_should_pass_create_consumer(self):
self._assert_pass_rbac(['admin'], self._invoke_on_post,
content_type='application/json')
def test_should_raise_create_consumer(self):
self._assert_fail_rbac([None, 'audit', 'observer', 'creator', 'bogus'],
self._invoke_on_post,
content_type='application/json')
def test_should_pass_delete_consumer(self):
self._assert_pass_rbac(['admin'], self._invoke_on_delete,
content_type='application/json')
def test_should_raise_delete_consumer(self):
self._assert_fail_rbac([None, 'audit', 'observer', 'creator', 'bogus'],
self._invoke_on_delete)
def test_should_pass_get_consumers(self):
self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit'],
self._invoke_on_get,
content_type='application/json')
def test_should_raise_get_consumers(self):
self._assert_fail_rbac([None, 'bogus'],
self._invoke_on_get,
content_type='application/json')
def _invoke_on_post(self):
self.resource.on_post(self.req, self.resp)
def _invoke_on_delete(self):
self.resource.on_delete(self.req, self.resp)
def _invoke_on_get(self):
self.resource.on_get(self.req, self.resp)
class WhenTestingSecretConsumerResource(BaseTestCase):
"""RBAC tests for barbican.api.resources.SecretConsumerResource"""
def setUp(self):
super(WhenTestingSecretConsumerResource, self).setUp()
self.external_project_id = '12345project'
self.consumer_id = '12345consumer'
# Force an error on GET calls that pass RBAC, as we are not testing
# such flows in this test module.
self.consumer_repo = mock.MagicMock()
fail_method = mock.MagicMock(return_value=None,
side_effect=self._generate_get_error())
self.consumer_repo.get = fail_method
self.setup_project_repository_mock()
self.setup_secret_consumer_repository_mock(self.consumer_repo)
self.resource = SecretConsumerResource(consumer_id=self.consumer_id)
def test_rules_should_be_loaded(self):
self.assertIsNotNone(self.policy_enforcer.rules)
def test_should_pass_get_consumer(self):
self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit'],
self._invoke_on_get)
def test_should_raise_get_consumer(self):
self._assert_fail_rbac([None, 'bogus'],
self._invoke_on_get)
def _invoke_on_get(self):
self.resource.on_get(self.req, self.resp)

View File

@ -141,6 +141,20 @@ class MockModelRepositoryMixin(object):
mock_repo_obj=mock_container_consumer_repo,
patcher_obj=self.mock_container_consumer_repo_patcher)
def setup_secret_consumer_repository_mock(
self, mock_secret_consumer_repo=mock.MagicMock()):
"""Mocks the secret consumer repository factory function
:param mock_secret_consumer_repo: The pre-configured mock
secret consumer repo to be
returned.
"""
self.mock_secret_consumer_repo_patcher = None
self._setup_repository_mock(
repo_factory='get_secret_consumer_repository',
mock_repo_obj=mock_secret_consumer_repo,
patcher_obj=self.mock_secret_consumer_repo_patcher)
def setup_container_repository_mock(self,
mock_container_repo=mock.MagicMock()):
"""Mocks the container repository factory function

View File

@ -23,7 +23,7 @@ class SecretModel(base_models.BaseModel):
secret_ref=None, bit_length=None, mode=None, secret_type=None,
payload_content_type=None, payload=None, content_types=None,
payload_content_encoding=None, status=None, updated=None,
created=None, creator_id=None, metadata=None):
created=None, creator_id=None, metadata=None, consumers=None):
super(SecretModel, self).__init__()
self.name = name
@ -42,3 +42,4 @@ class SecretModel(base_models.BaseModel):
self.created = created
self.creator_id = creator_id
self.metadata = metadata
self.consumers = consumers