User Metadata API and tests

This patch adds the api functions for user metadata and tests for
those functions. It is the second of several patches which will
complete the "User Defined Metadata for Barbican Secrets" blueprint.

Other Patches will include:
1.) Documentation

Implements: blueprint add-user-metadata
Change-Id: Iaf83bf5a3b9ed2ea22d4cd3e83ba3a0d5d087adf
This commit is contained in:
Fernando Diaz 2016-02-03 11:58:01 -06:00
parent 52b0479fcc
commit ba3b7093ac
12 changed files with 1363 additions and 30 deletions

View File

@ -0,0 +1,180 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import pecan
from barbican import api
from barbican.api import controllers
from barbican.common import hrefs
from barbican.common import utils
from barbican.common import validators
from barbican import i18n as u
from barbican.model import repositories as repo
LOG = utils.getLogger(__name__)
def _secret_metadata_not_found():
"""Throw exception indicating secret metadata not found."""
pecan.abort(404, u._('Not Found. Sorry but your secret metadata is in '
'another castle.'))
class SecretMetadataController(controllers.ACLMixin):
"""Handles SecretMetadata requests by a given secret id."""
def __init__(self, secret):
LOG.debug('=== Creating SecretMetadataController ===')
self.secret = secret
self.secret_project_id = self.secret.project.external_id
self.secret_repo = repo.get_secret_repository()
self.user_meta_repo = repo.get_secret_user_meta_repository()
self.metadata_validator = validators.NewSecretMetadataValidator()
self.metadatum_validator = validators.NewSecretMetadatumValidator()
@pecan.expose(generic=True)
def index(self, **kwargs):
pecan.abort(405) # HTTP 405 Method Not Allowed as default
@index.when(method='GET', template='json')
@utils.allow_all_content_types
@controllers.handle_exceptions(u._('Secret metadata retrieval'))
@controllers.enforce_rbac('secret_meta:get')
def on_get(self, external_project_id, **kwargs):
"""Handles retrieval of existing secret metadata requests."""
LOG.debug('Start secret metadata on_get '
'for secret-ID %s:', self.secret.id)
resp = self.user_meta_repo.get_metadata_for_secret(self.secret.id)
pecan.response.status = 200
return {"metadata": resp}
@index.when(method='PUT', template='json')
@controllers.handle_exceptions(u._('Secret metadata creation'))
@controllers.enforce_rbac('secret_meta:put')
@controllers.enforce_content_types(['application/json'])
def on_put(self, external_project_id, **kwargs):
"""Handles creation/update of secret metadata."""
data = api.load_body(pecan.request, validator=self.metadata_validator)
LOG.debug('Start secret metadata on_put...%s', data)
self.user_meta_repo.create_replace_user_metadata(self.secret.id,
data)
url = hrefs.convert_user_meta_to_href(self.secret.id)
LOG.debug('URI to secret metadata is %s', url)
pecan.response.status = 201
return {'metadata_ref': url}
@index.when(method='POST', template='json')
@controllers.handle_exceptions(u._('Secret metadatum creation'))
@controllers.enforce_rbac('secret_meta:post')
@controllers.enforce_content_types(['application/json'])
def on_post(self, external_project_id, **kwargs):
"""Handles creation of secret metadatum."""
data = api.load_body(pecan.request, validator=self.metadatum_validator)
key = data.get('key')
value = data.get('value')
metadata = self.user_meta_repo.get_metadata_for_secret(self.secret.id)
if key in metadata:
pecan.abort(409, u._('Conflict. Key in request is already in the '
'secret metadata'))
LOG.debug('Start secret metadatum on_post...%s', metadata)
self.user_meta_repo.create_replace_user_metadatum(self.secret.id,
key, value)
url = hrefs.convert_user_meta_to_href(self.secret.id)
LOG.debug('URI to secret metadata is %s', url)
pecan.response.status = 201
return {'metadata_ref': url + "/%s {key: %s, value:%s}" % (key,
key,
value)}
class SecretMetadatumController(controllers.ACLMixin):
def __init__(self, secret):
LOG.debug('=== Creating SecretMetadatumController ===')
self.user_meta_repo = repo.get_secret_user_meta_repository()
self.secret = secret
self.metadatum_validator = validators.NewSecretMetadatumValidator()
@pecan.expose(generic=True)
def index(self, **kwargs):
pecan.abort(405) # HTTP 405 Method Not Allowed as default
@index.when(method='GET', template='json')
@controllers.handle_exceptions(u._('Secret metadatum retrieval'))
@controllers.enforce_rbac('secret_meta:get')
def on_get(self, external_project_id, remainder, **kwargs):
"""Handles retrieval of existing secret metadatum."""
LOG.debug('Start secret metadatum on_get '
'for secret-ID %s:', self.secret.id)
metadata = self.user_meta_repo.get_metadata_for_secret(self.secret.id)
if remainder in metadata:
pecan.response.status = 200
pair = {'key': remainder, 'value': metadata[remainder]}
return collections.OrderedDict(sorted(pair.items()))
else:
_secret_metadata_not_found()
@index.when(method='PUT', template='json')
@utils.allow_all_content_types
@controllers.handle_exceptions(u._('Secret metadatum update'))
@controllers.enforce_rbac('secret_meta:put')
@controllers.enforce_content_types(['application/json'])
def on_put(self, external_project_id, remainder, **kwargs):
"""Handles update of existing secret metadatum."""
metadata = self.user_meta_repo.get_metadata_for_secret(self.secret.id)
data = api.load_body(pecan.request, validator=self.metadatum_validator)
key = data.get('key')
value = data.get('value')
if remainder not in metadata:
_secret_metadata_not_found()
elif remainder != key:
msg = 'Key in request data does not match key in the '
'request url.'
pecan.abort(409, msg)
else:
LOG.debug('Start secret metadatum on_put...%s', metadata)
self.user_meta_repo.create_replace_user_metadatum(self.secret.id,
key, value)
pecan.response.status = 200
pair = {'key': key, 'value': value}
return collections.OrderedDict(sorted(pair.items()))
@index.when(method='DELETE', template='json')
@controllers.handle_exceptions(u._('Secret metadatum removal'))
@controllers.enforce_rbac('secret_meta:delete')
def on_delete(self, external_project_id, remainder, **kwargs):
"""Handles removal of existing secret metadatum."""
self.user_meta_repo.delete_metadatum(self.secret.id,
remainder)
msg = 'Deleted secret metadatum: %s for secret %s' % (remainder,
self.secret.id)
pecan.response.status = 204
LOG.info(msg)

View File

@ -16,6 +16,7 @@ from six.moves.urllib import parse
from barbican import api
from barbican.api import controllers
from barbican.api.controllers import acls
from barbican.api.controllers import secretmeta
from barbican.common import exception
from barbican.common import hrefs
from barbican.common import quota
@ -72,8 +73,16 @@ class SecretController(controllers.ACLMixin):
def _lookup(self, sub_resource, *remainder):
if sub_resource == 'acl':
return acls.SecretACLsController(self.secret), remainder
elif sub_resource == 'metadata':
if len(remainder) == 0 or remainder == ('',):
return secretmeta.SecretMetadataController(self.secret), \
remainder
else:
return secretmeta.SecretMetadatumController(self.secret), \
remainder
else:
pecan.abort(405) # only 'acl' as sub-resource is supported
# only 'acl' and 'metadata' as sub-resource is supported
pecan.abort(405)
@pecan.expose(generic=True)
def index(self, **kwargs):

View File

@ -89,6 +89,18 @@ class MissingMetadataField(BarbicanHTTPException):
status_code = 400
class InvalidMetadataRequest(BarbicanHTTPException):
message = u._("Invalid Metadata. Keys and Values must be Strings.")
client_message = message
status_code = 400
class InvalidMetadataKey(BarbicanHTTPException):
message = u._("Invalid Key. Key must be URL safe.")
client_message = message
status_code = 400
class InvalidSubjectDN(BarbicanHTTPException):
message = u._("Invalid subject DN: %(subject_dn)s")
client_message = message

View File

@ -46,6 +46,11 @@ def convert_consumer_to_href(consumer_id):
return convert_resource_id_to_href('consumers', consumer_id) + '/consumers'
def convert_user_meta_to_href(secret_id):
"""Convert the consumer ID to a HATEOAS-style href."""
return convert_resource_id_to_href('secrets', secret_id) + '/metadata'
def convert_certificate_authority_to_href(ca_id):
"""Convert the ca ID to a HATEOAS-style href."""
return convert_resource_id_to_href('cas', ca_id)

View File

@ -15,6 +15,7 @@ API JSON validators.
import abc
import base64
import re
import jsonschema as schema
from ldap3.core import exceptions as ldap_exceptions
@ -353,6 +354,104 @@ class NewSecretValidator(ValidatorBase):
return payload.strip()
class NewSecretMetadataValidator(ValidatorBase):
"""Validate new secret metadata."""
def __init__(self):
self.name = 'SecretMetadata'
self.schema = {
"type": "object",
"$schema": "http://json-schema.org/draft-03/schema",
"properties": {
"metadata": {"type": "object", "required": True},
}
}
def validate(self, json_data, parent_schema=None):
"""Validate the input JSON for the schema for secret metadata."""
schema_name = self._full_name(parent_schema)
self._assert_schema_is_valid(json_data, schema_name)
return self._extract_metadata(json_data)
def _extract_metadata(self, json_data):
"""Extracts and returns the metadata from the JSON data."""
metadata = json_data['metadata']
for key in metadata:
# make sure key is a string and url-safe.
if not isinstance(key, six.string_types):
raise exception.InvalidMetadataRequest()
self._check_string_url_safe(key)
# make sure value is a string.
value = metadata[key]
if not isinstance(value, six.string_types):
raise exception.InvalidMetadataRequest()
# If key is not lowercase, then change it
if not key.islower():
del metadata[key]
metadata[key.lower()] = value
return metadata
def _check_string_url_safe(self, string):
"""Checks if string can be part of a URL."""
if not re.match("^[A-Za-z0-9_-]*$", string):
raise exception.InvalidMetadataKey()
class NewSecretMetadatumValidator(ValidatorBase):
"""Validate new secret metadatum."""
def __init__(self):
self.name = 'SecretMetadatum'
self.schema = {
"type": "object",
"$schema": "http://json-schema.org/draft-03/schema",
"properties": {
"key": {
"type": "string",
"maxLength": 255,
"required": True
},
"value": {
"type": "string",
"maxLength": 255,
"required": True
},
},
"additionalProperties": False
}
def validate(self, json_data, parent_schema=None):
"""Validate the input JSON for the schema for secret metadata."""
schema_name = self._full_name(parent_schema)
self._assert_schema_is_valid(json_data, schema_name)
key = self._extract_key(json_data)
value = self._extract_value(json_data)
return {"key": key, "value": value}
def _extract_key(self, json_data):
"""Extracts and returns the metadata from the JSON data."""
key = json_data['key']
self._check_string_url_safe(key)
key = key.lower()
return key
def _extract_value(self, json_data):
"""Extracts and returns the metadata from the JSON data."""
value = json_data['value']
return value
def _check_string_url_safe(self, string):
"""Checks if string can be part of a URL."""
if not re.match("^[A-Za-z0-9_-]*$", string):
raise exception.InvalidMetadataKey()
class CACommonHelpersMixin(object):
def _validate_subject_dn_data(self, subject_dn):
"""Confirm that the subject_dn contains valid data

View File

@ -755,18 +755,13 @@ class SecretStoreMetadatumRepo(BaseRepo):
session = get_session()
try:
query = session.query(models.SecretStoreMetadatum)
query = query.filter_by(deleted=False)
query = session.query(models.SecretStoreMetadatum)
query = query.filter_by(deleted=False)
query = query.filter(
models.SecretStoreMetadatum.secret_id == secret_id)
metadata = query.all()
except sa_orm.exc.NoResultFound:
metadata = {}
query = query.filter(
models.SecretStoreMetadatum.secret_id == secret_id)
metadata = query.all()
return {m.key: m.value for m in metadata}
def _do_entity_name(self):
@ -789,38 +784,57 @@ class SecretUserMetadatumRepo(BaseRepo):
Stores key/value information on behalf of a Secret.
"""
def save(self, metadata, secret_model):
"""Saves the the specified metadata for the secret.
:raises NotFound if entity does not exist.
"""
def create_replace_user_metadata(self, secret_id, metadata):
"""Creates or replaces the the specified metadata for the secret."""
now = timeutils.utcnow()
session = get_session()
query = session.query(models.SecretUserMetadatum)
query = query.filter_by(secret_id=secret_id)
query.delete()
for k, v in metadata.items():
meta_model = models.SecretUserMetadatum(k, v)
meta_model.secret_id = secret_id
meta_model.updated_at = now
meta_model.secret = secret_model
meta_model.save()
meta_model.save(session=session)
def get_metadata_for_secret(self, secret_id):
"""Returns a dict of SecretUserMetadatum instances."""
session = get_session()
try:
query = session.query(models.SecretUserMetadatum)
query = query.filter_by(deleted=False)
query = session.query(models.SecretUserMetadatum)
query = query.filter_by(deleted=False)
query = query.filter(
models.SecretUserMetadatum.secret_id == secret_id)
metadata = query.all()
except sa_orm.exc.NoResultFound:
metadata = {}
query = query.filter(
models.SecretUserMetadatum.secret_id == secret_id)
metadata = query.all()
return {m.key: m.value for m in metadata}
def create_replace_user_metadatum(self, secret_id, key, value):
now = timeutils.utcnow()
session = get_session()
query = session.query(models.SecretUserMetadatum)
query = query.filter_by(secret_id=secret_id)
query = query.filter_by(key=key)
query.delete()
meta_model = models.SecretUserMetadatum(key, value)
meta_model.secret_id = secret_id
meta_model.updated_at = now
meta_model.save(session=session)
def delete_metadatum(self, secret_id, key):
"""Removes a key from a SecretUserMetadatum instances."""
session = get_session()
query = session.query(models.SecretUserMetadatum)
query = query.filter_by(secret_id=secret_id)
query = query.filter_by(key=key)
query.delete()
def _do_entity_name(self):
"""Sub-class hook: return entity name, such as for debugging."""
return "SecretUserMetadatum"

View File

@ -0,0 +1,341 @@
# Copyright (c) 2016 IBM
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import mock
import os
import uuid
from barbican.tests import utils
@utils.parameterized_test_case
class WhenTestingSecretMetadataResource(utils.BarbicanAPIBaseTestCase):
def setUp(self):
super(WhenTestingSecretMetadataResource, self).setUp()
self.valid_metadata = {
"metadata": {
"latitude": "30.393805",
"longitude": "-97.724077"
}
}
def test_create_secret_metadata(self):
secret_resp, secret_uuid = create_secret(self.app)
meta_resp = create_secret_metadata(self.app,
self.valid_metadata,
secret_resp)
self.assertEqual(201, meta_resp.status_int)
self.assertIsNotNone(meta_resp.json)
def test_can_get_secret_metadata(self):
secret_resp, secret_uuid = create_secret(self.app)
meta_resp = create_secret_metadata(self.app,
self.valid_metadata,
secret_resp)
self.assertEqual(201, meta_resp.status_int)
get_resp = self.app.get('/secrets/%s/metadata' % secret_resp)
self.assertEqual(200, get_resp.status_int)
self.assertEqual(self.valid_metadata, get_resp.json)
def test_get_secret_metadata_invalid_secret_should_fail(self):
secret_resp, secret_uuid = create_secret(self.app)
create_secret_metadata(self.app,
self.valid_metadata,
secret_resp)
get_resp = self.app.get('/secrets/%s/metadata' % uuid.uuid4().hex,
expect_errors=True)
self.assertEqual(404, get_resp.status_int)
@utils.parameterized_test_case
class WhenTestingSecretMetadatumResource(utils.BarbicanAPIBaseTestCase):
def setUp(self):
super(WhenTestingSecretMetadatumResource, self).setUp()
self.valid_metadata = {
"metadata": {
"latitude": "30.393805",
"longitude": "-97.724077"
}
}
self.updated_valid_metadata = {
"metadata": {
"latitude": "30.393805",
"longitude": "-97.724077",
"access-limit": "2"
}
}
self.valid_metadatum = {
'key': 'access-limit',
'value': '2'
}
@mock.patch('barbican.model.repositories.SecretUserMetadatumRepo.'
'get_metadata_for_secret')
def test_can_create_secret_metadatum(self, mocked_get):
secret_resp, secret_uuid = create_secret(self.app)
mocked_get.return_value = self.valid_metadata['metadata']
meta_resp = create_secret_metadatum(self.app,
self.valid_metadatum,
secret_resp)
self.assertEqual(201, meta_resp.status_int)
self.assertIsNotNone(meta_resp.json)
@mock.patch('barbican.model.repositories.SecretUserMetadatumRepo.'
'get_metadata_for_secret')
def test_conflict_create_same_key_secret_metadatum(self, mocked_get):
secret_resp, secret_uuid = create_secret(self.app)
mocked_get.return_value = self.valid_metadata['metadata']
latitude_metadatum = {
"key": "latitude",
"value": "30.393805"
}
meta_resp = create_secret_metadatum(self.app,
latitude_metadatum,
secret_resp,
expect_errors=True)
self.assertEqual(409, meta_resp.status_int)
self.assertIsNotNone(meta_resp.json)
@mock.patch('barbican.model.repositories.SecretUserMetadatumRepo.'
'get_metadata_for_secret')
def test_can_delete_secret_metadatum(self, mocked_get):
secret_resp, secret_uuid = create_secret(self.app)
mocked_get.return_value = self.valid_metadata['metadata']
meta_resp = create_secret_metadatum(self.app,
self.valid_metadatum,
secret_resp)
self.assertEqual(201, meta_resp.status_int)
delete_resp = self.app.delete('/secrets/%s/metadata/access-limit' %
secret_resp)
self.assertEqual(204, delete_resp.status_int)
@mock.patch('barbican.model.repositories.SecretUserMetadatumRepo.'
'get_metadata_for_secret')
def test_can_get_secret_metadatum(self, mocked_get):
secret_resp, secret_uuid = create_secret(self.app)
mocked_get.return_value = self.valid_metadata['metadata']
meta_resp = create_secret_metadatum(self.app,
self.valid_metadatum,
secret_resp)
self.assertEqual(201, meta_resp.status_int)
mocked_get.return_value = self.updated_valid_metadata['metadata']
get_resp = self.app.get('/secrets/%s/metadata/access-limit' %
secret_resp)
self.assertEqual(200, get_resp.status_int)
self.assertEqual(self.valid_metadatum, get_resp.json)
@mock.patch('barbican.model.repositories.SecretUserMetadatumRepo.'
'get_metadata_for_secret')
def test_get_secret_metadatum_not_found(self, mocked_get):
secret_resp, secret_uuid = create_secret(self.app)
mocked_get.return_value = self.valid_metadata['metadata']
meta_resp = create_secret_metadatum(self.app,
self.valid_metadatum,
secret_resp)
self.assertEqual(201, meta_resp.status_int)
mocked_get.return_value = self.updated_valid_metadata['metadata']
get_resp = self.app.get('/secrets/%s/metadata/nothere' %
secret_resp,
expect_errors=True)
self.assertEqual(404, get_resp.status_int)
@mock.patch('barbican.model.repositories.SecretUserMetadatumRepo.'
'get_metadata_for_secret')
def test_can_update_secret_metadatum(self, mocked_get):
secret_resp, secret_uuid = create_secret(self.app)
mocked_get.return_value = self.valid_metadata['metadata']
meta_resp = create_secret_metadatum(self.app,
self.valid_metadatum,
secret_resp)
self.assertEqual(201, meta_resp.status_int)
new_metadatum = {
'key': 'access-limit',
'value': '5'
}
new_metadatum_json = json.dumps(new_metadatum)
mocked_get.return_value = self.updated_valid_metadata['metadata']
put_resp = self.app.put('/secrets/%s/metadata/access-limit' %
secret_resp,
new_metadatum_json,
headers={'Content-Type': 'application/json'})
self.assertEqual(200, put_resp.status_int)
self.assertEqual(new_metadatum, put_resp.json)
@mock.patch('barbican.model.repositories.SecretUserMetadatumRepo.'
'get_metadata_for_secret')
def test_can_update_secret_metadatum_not_found(self, mocked_get):
secret_resp, secret_uuid = create_secret(self.app)
mocked_get.return_value = self.valid_metadata['metadata']
meta_resp = create_secret_metadatum(self.app,
self.valid_metadatum,
secret_resp)
self.assertEqual(201, meta_resp.status_int)
new_metadatum = {
'key': 'newwwww',
'value': '5'
}
new_metadatum_json = json.dumps(new_metadatum)
mocked_get.return_value = self.updated_valid_metadata['metadata']
put_resp = self.app.put('/secrets/%s/metadata/newwwww' %
secret_resp,
new_metadatum_json,
headers={'Content-Type': 'application/json'},
expect_errors=True)
self.assertEqual(404, put_resp.status_int)
@mock.patch('barbican.model.repositories.SecretUserMetadatumRepo.'
'get_metadata_for_secret')
def test_conflict_update_secret_metadatum(self, mocked_get):
secret_resp, secret_uuid = create_secret(self.app)
mocked_get.return_value = self.valid_metadata['metadata']
meta_resp = create_secret_metadatum(self.app,
self.valid_metadatum,
secret_resp)
self.assertEqual(201, meta_resp.status_int)
new_metadatum = {
'key': 'snoop',
'value': '5'
}
new_metadatum_json = json.dumps(new_metadatum)
mocked_get.return_value = self.updated_valid_metadata['metadata']
put_resp = self.app.put('/secrets/%s/metadata/access-limit' %
secret_resp,
new_metadatum_json,
headers={'Content-Type': 'application/json'},
expect_errors=True)
self.assertEqual(409, put_resp.status_int)
def test_returns_405_for_delete_on_metadata(self):
secret_id, secret_resp = create_secret(self.app)
resp = self.app.delete('/secrets/{0}/metadata/'.format(secret_id),
expect_errors=True)
self.assertEqual(405, resp.status_int)
# ----------------------- Helper Functions ---------------------------
def create_secret(app, name=None, algorithm=None, bit_length=None, mode=None,
expiration=None, payload='not-encrypted',
content_type='text/plain',
content_encoding=None, transport_key_id=None,
transport_key_needed=None, expect_errors=False):
request = {
'name': name,
'algorithm': algorithm,
'bit_length': bit_length,
'mode': mode,
'expiration': expiration,
'payload': payload,
'payload_content_type': content_type,
'payload_content_encoding': content_encoding,
'transport_key_id': transport_key_id,
'transport_key_needed': transport_key_needed
}
cleaned_request = {key: val for key, val in request.items()
if val is not None}
resp = app.post_json(
'/secrets/',
cleaned_request,
expect_errors=expect_errors
)
created_uuid = None
if resp.status_int == 201:
secret_ref = resp.json.get('secret_ref', '')
_, created_uuid = os.path.split(secret_ref)
return created_uuid, resp
def create_secret_metadata(app, metadata, secret_uuid,
expect_errors=False):
request = {}
for metadatum in metadata:
request[metadatum] = metadata.get(metadatum)
cleaned_request = {key: val for key, val in request.items()
if val is not None}
url = '/secrets/%s/metadata/' % secret_uuid
resp = app.put_json(
url,
cleaned_request,
expect_errors=expect_errors
)
return resp
def create_secret_metadatum(app, metadata, secret_uuid, remainder=None,
update=False, expect_errors=False):
request = {}
for metadatum in metadata:
request[metadatum] = metadata.get(metadatum)
cleaned_request = {key: val for key, val in request.items()
if val is not None}
url = '/secrets/%s/metadata/' % secret_uuid
if remainder:
url = url + remainder
if update:
resp = app.put_json(
url,
cleaned_request,
expect_errors=expect_errors
)
else:
resp = app.post_json(
url,
cleaned_request,
expect_errors=expect_errors
)
return resp

View File

@ -1741,5 +1741,219 @@ class WhenTestingNewCAValidator(utils.BaseTestCase):
)
@utils.parameterized_test_case
class WhenTestingSecretMetadataValidator(utils.BaseTestCase):
def setUp(self):
super(WhenTestingSecretMetadataValidator, self).setUp()
self.top_key = 'metadata'
self.key1 = 'city'
self.value1 = 'Austin'
self.key2 = 'state'
self.value2 = 'Texas'
self.key3 = 'country'
self.value3 = 'USA'
self.metadata_req = {
self.top_key: {
self.key1: self.value1,
self.key2: self.value2,
self.key3: self.value3
}
}
self.validator = validators.NewSecretMetadataValidator()
def test_should_validate_all_fields(self):
self.validator.validate(self.metadata_req)
def test_should_validate_all_fields_and_make_key_lowercase(self):
self.key1 = "DOgg"
self.value1 = "poodle"
self.metadata_req = {
self.top_key: {
self.key1: self.value1,
self.key2: self.value2,
self.key3: self.value3
}
}
metadata = self.validator.validate(self.metadata_req)
self.assertNotIn("DOgg", metadata.keys())
self.assertIn("dogg", metadata.keys())
def test_should_validate_no_keys(self):
del self.metadata_req[self.top_key][self.key1]
del self.metadata_req[self.top_key][self.key2]
del self.metadata_req[self.top_key][self.key3]
self.validator.validate(self.metadata_req)
def test_should_raise_invalid_key_no_metadata(self):
del self.metadata_req[self.top_key]
exception = self.assertRaises(excep.InvalidObject,
self.validator.validate,
self.metadata_req)
self.assertIn("metadata' is a required property",
six.text_type(exception))
def test_should_raise_invalid_key_non_string(self):
self.key1 = 0
metadata_req = {
self.top_key: {
self.key1: self.value1
}
}
exception = self.assertRaises(excep.InvalidMetadataRequest,
self.validator.validate,
metadata_req)
self.assertIn("Invalid Metadata. Keys and Values must be Strings.",
six.text_type(exception))
def test_should_raise_invalid_key_non_url_safe_string(self):
self.key1 = "key/01"
metadata_req = {
self.top_key: {
self.key1: self.value1
}
}
exception = self.assertRaises(excep.InvalidMetadataKey,
self.validator.validate,
metadata_req)
self.assertIn("Invalid Key. Key must be URL safe.",
six.text_type(exception))
def test_should_raise_invalid_value_non_string(self):
self.value1 = 0
metadata_req = {
self.top_key: {
self.key1: self.value1
}
}
exception = self.assertRaises(excep.InvalidMetadataRequest,
self.validator.validate,
metadata_req)
self.assertIn("Invalid Metadata. Keys and Values must be Strings.",
six.text_type(exception))
@utils.parameterized_test_case
class WhenTestingSecretMetadatumValidator(utils.BaseTestCase):
def setUp(self):
super(WhenTestingSecretMetadatumValidator, self).setUp()
self.key1 = 'key'
self.value1 = 'city'
self.key2 = 'value'
self.value2 = 'Austin'
self.metadata_req = {
self.key1: self.value1,
self.key2: self.value2
}
self.validator = validators.NewSecretMetadatumValidator()
def test_should_validate_all_fields(self):
self.validator.validate(self.metadata_req)
def test_should_validate_all_fields_and_make_key_lowercase(self):
self.value1 = "DOgg"
self.value2 = "poodle"
self.metadata_req = {
self.key1: self.value1,
self.key2: self.value2
}
metadata = self.validator.validate(self.metadata_req)
self.assertEqual("dogg", metadata['key'])
def test_should_raise_invalid_empty(self):
del self.metadata_req[self.key1]
del self.metadata_req[self.key2]
exception = self.assertRaises(excep.InvalidObject,
self.validator.validate,
self.metadata_req)
self.assertIn("Provided object does not match schema "
"'SecretMetadatum'",
six.text_type(exception))
def test_should_raise_invalid_key_no_key(self):
del self.metadata_req[self.key2]
exception = self.assertRaises(excep.InvalidObject,
self.validator.validate,
self.metadata_req)
self.assertIn("Provided object does not match schema "
"'SecretMetadatum'",
six.text_type(exception))
def test_should_raise_invalid_key_no_value(self):
del self.metadata_req[self.key1]
exception = self.assertRaises(excep.InvalidObject,
self.validator.validate,
self.metadata_req)
self.assertIn("Provided object does not match schema "
"'SecretMetadatum'",
six.text_type(exception))
def test_should_raise_invalid_key_non_string(self):
self.value1 = 0
metadata_req = {
self.key1: self.value1,
self.key2: self.value2
}
exception = self.assertRaises(excep.InvalidObject,
self.validator.validate,
metadata_req)
self.assertIn("Provided object does not match schema "
"'SecretMetadatum'",
six.text_type(exception))
def test_should_raise_invalid_key_non_url_safe_string(self):
self.value1 = "key/01"
metadata_req = {
self.key1: self.value1,
self.key2: self.value2
}
exception = self.assertRaises(excep.InvalidMetadataKey,
self.validator.validate,
metadata_req)
self.assertIn("Invalid Key. Key must be URL safe.",
six.text_type(exception))
def test_should_raise_invalid_value_non_string(self):
self.value2 = 0
metadata_req = {
self.key1: self.value1,
self.key2: self.value2
}
exception = self.assertRaises(excep.InvalidObject,
self.validator.validate,
metadata_req)
self.assertIn("Provided object does not match schema "
"'SecretMetadatum'",
six.text_type(exception))
def test_should_raise_invalid_extra_sent_key(self):
self.value2 = 0
metadata_req = {
self.key1: self.value1,
self.key2: self.value2,
"extra_key": "extra_value"
}
exception = self.assertRaises(excep.InvalidObject,
self.validator.validate,
metadata_req)
self.assertIn("Provided object does not match schema "
"'SecretMetadatum'",
six.text_type(exception))
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,121 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from barbican.model import models
from barbican.model import repositories
from barbican.tests import database_utils
from barbican.tests import utils
@utils.parameterized_test_case
class WhenTestingSecretMetadataRepository(database_utils.RepositoryTestCase):
def setUp(self):
super(WhenTestingSecretMetadataRepository, self).setUp()
self.repo = repositories.SecretUserMetadatumRepo()
self.test_metadata = {
"dog": "poodle",
"cat": "siamese"
}
def _create_base_secret(self, project_id=None):
# Setup the secret and needed base relationship
secret_repo = repositories.get_secret_repository()
session = secret_repo.get_session()
if project_id is None: # don't re-create project if it created earlier
project = models.Project()
project.external_id = "keystone_project_id"
project.save(session=session)
project_id = project.id
secret_model = models.Secret()
secret_model.project_id = project_id
secret = secret_repo.create_from(secret_model, session=session)
secret.save(session=session)
session.commit()
return secret
def test_create_and_get_metadata_for_secret(self):
secret = self._create_base_secret()
self.repo.create_replace_user_metadata(secret.id,
self.test_metadata)
metadata = self.repo.get_metadata_for_secret(secret.id)
self.assertEqual(self.test_metadata, metadata)
def test_get_metadata_invalid_secret(self):
metadata = self.repo.get_metadata_for_secret("invalid_id")
self.assertEqual({}, metadata)
def test_create_user_metadatum(self):
secret = self._create_base_secret()
self.repo.create_replace_user_metadata(secret.id,
self.test_metadata)
# adds a new key
self.repo.create_replace_user_metadatum(secret.id,
'lizard',
'green anole')
self.test_metadata['lizard'] = 'green anole'
metadata = self.repo.get_metadata_for_secret(secret.id)
self.assertEqual(self.test_metadata, metadata)
def test_replace_user_metadatum(self):
secret = self._create_base_secret()
self.repo.create_replace_user_metadata(secret.id,
self.test_metadata)
# updates existing key
self.repo.create_replace_user_metadatum(secret.id,
'dog',
'rat terrier')
self.test_metadata['dog'] = 'rat terrier'
metadata = self.repo.get_metadata_for_secret(secret.id)
self.assertEqual(self.test_metadata, metadata)
def test_delete_user_metadatum(self):
secret = self._create_base_secret()
self.repo.create_replace_user_metadata(secret.id,
self.test_metadata)
# deletes existing key
self.repo.delete_metadatum(secret.id,
'cat')
del self.test_metadata['cat']
metadata = self.repo.get_metadata_for_secret(secret.id)
self.assertEqual(self.test_metadata, metadata)
def test_delete_secret_deletes_secret_metadata(self):
secret = self._create_base_secret()
self.repo.create_replace_user_metadata(secret.id,
self.test_metadata)
metadata = self.repo.get_metadata_for_secret(secret.id)
self.assertEqual(self.test_metadata, metadata)
# deletes existing secret
secret.delete()
metadata = self.repo.get_metadata_for_secret(secret.id)
self.assertEqual({}, metadata)

View File

@ -74,5 +74,9 @@
"quotas:get": "rule:all_users",
"project_quotas:get": "rule:service_admin",
"project_quotas:put": "rule:service_admin",
"project_quotas:delete": "rule:service_admin"
"project_quotas:delete": "rule:service_admin",
"secret_meta:get": "rule:all_but_audit",
"secret_meta:post": "rule:admin_or_creator",
"secret_meta:put": "rule:admin_or_creator",
"secret_meta:delete": "rule:admin_or_creator"
}

View File

@ -0,0 +1,127 @@
"""
Copyright 2016 IBM
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import json
from functionaltests.api.v1.behaviors import base_behaviors
class SecretMetadataBehaviors(base_behaviors.BaseBehaviors):
def create_or_update_metadata(self, secret_ref, data, extra_headers=None,
use_auth=True, user_name=None, admin=None):
meta_ref = '%s/metadata' % secret_ref
data = json.dumps(data)
resp = self.client.put(meta_ref, data=data,
extra_headers=extra_headers, use_auth=use_auth,
user_name=user_name)
# handle expected JSON parsing errors for unauthenticated requests
if resp.status_code == 401 and not use_auth:
return resp, None
returned_data = self.get_json(resp)
metadata_ref = returned_data.get('metadata_ref')
if metadata_ref:
if admin is None:
admin = user_name
self.created_entities.append((metadata_ref, admin))
return resp, metadata_ref
def get_metadata(self, secret_ref, extra_headers=None, use_auth=True,
user_name=None, admin=None):
meta_ref = '%s/metadata' % secret_ref
resp = self.client.get(meta_ref, extra_headers=extra_headers,
user_name=user_name, use_auth=use_auth)
# handle expected JSON parsing errors for unauthenticated requests
if resp.status_code == 401 and not use_auth:
return resp, None
returned_data = self.get_json(resp)
metadata_ref = returned_data.get('metadata_ref')
if metadata_ref:
if admin is None:
admin = user_name
self.created_entities.append((metadata_ref, admin))
return resp
def create_metadatum(self, secret_ref, data, extra_headers=None,
use_auth=True, user_name=None, admin=None):
meta_key_ref = '%s/%s' % (secret_ref, 'metadata')
data = json.dumps(data)
resp = self.client.post(meta_key_ref, data=data,
extra_headers=extra_headers, use_auth=use_auth,
user_name=user_name)
# handle expected JSON parsing errors for unauthenticated requests
if resp.status_code == 401 and not use_auth:
return resp, None
returned_data = self.get_json(resp)
metadata_ref = returned_data.get('metadata_ref')
if metadata_ref:
if admin is None:
admin = user_name
self.created_entities.append((metadata_ref, admin))
return resp, metadata_ref
def update_metadatum(self, secret_ref, metadata_key, data,
extra_headers=None, use_auth=True, user_name=None,
admin=None):
meta_key_ref = '%s/%s/%s' % (secret_ref, 'metadata', metadata_key)
data = json.dumps(data)
resp = self.client.put(meta_key_ref, data=data,
extra_headers=extra_headers, use_auth=use_auth,
user_name=user_name)
# handle expected JSON parsing errors for unauthenticated requests
if resp.status_code == 401 and not use_auth:
return resp, None
return resp
def get_metadatum(self, secret_ref, metadata_key, extra_headers=None,
use_auth=True, user_name=None, admin=None):
meta_key_ref = '%s/%s/%s' % (secret_ref, 'metadata', metadata_key)
resp = self.client.get(meta_key_ref, extra_headers=extra_headers,
user_name=user_name, use_auth=use_auth)
# handle expected JSON parsing errors for unauthenticated requests
if resp.status_code == 401 and not use_auth:
return resp, None
returned_data = self.get_json(resp)
metadata_ref = returned_data.get('metadata_ref')
if metadata_ref:
if admin is None:
admin = user_name
self.created_entities.append((metadata_ref, admin))
return resp
def delete_metadatum(self, secret_ref, metadata_key, extra_headers=None,
use_auth=True, user_name=None, admin=None):
meta_key_ref = '%s/%s/%s' % (secret_ref, 'metadata', metadata_key)
resp = self.client.delete(meta_key_ref, extra_headers=extra_headers,
user_name=user_name, use_auth=use_auth)
return resp

View File

@ -0,0 +1,207 @@
# Copyright (c) 2016 IBM
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from testtools import testcase
import uuid
from barbican.tests import utils
from functionaltests.api import base
from functionaltests.api.v1.behaviors import secret_behaviors
from functionaltests.api.v1.behaviors import secretmeta_behaviors
from functionaltests.api.v1.models import secret_models
@utils.parameterized_test_case
class SecretMetadataTestCase(base.TestCase):
def setUp(self):
super(SecretMetadataTestCase, self).setUp()
self.secret_behaviors = secret_behaviors.SecretBehaviors(self.client)
self.behaviors = secretmeta_behaviors.SecretMetadataBehaviors(
self.client)
self.default_secret_create_all_none_data = {
"name": None,
"expiration": None,
"algorithm": None,
"bit_length": None,
"mode": None,
"payload": None,
"payload_content_type": None,
"payload_content_encoding": None,
}
self.valid_metadata = {
"metadata": {
"latitude": "30.393805",
"longitude": "-97.724077"
}
}
self.invalid_metadata = {
"metadataaaaaaaa": {
"latitude": "30.393805",
"longitude": "-97.724077"
}
}
self.valid_metadatum_key = 'access-limit'
self.valid_metadatum = {
'key': self.valid_metadatum_key,
'value': '2'
}
def tearDown(self):
self.secret_behaviors.delete_all_created_secrets()
super(SecretMetadataTestCase, self).tearDown()
@testcase.attr('positive')
def test_secret_metadata_create(self):
test_model = secret_models.SecretModel(
**self.default_secret_create_all_none_data)
resp, secret_ref = self.secret_behaviors.create_secret(test_model)
self.assertEqual(resp.status_code, 201)
meta_resp, metadata_ref = self.behaviors.create_or_update_metadata(
secret_ref, self.valid_metadata)
self.assertEqual(meta_resp.status_code, 201)
self.assertEqual(secret_ref + '/metadata', metadata_ref)
@testcase.attr('negative')
def test_secret_metadata_create_no_secret(self):
secret_ref = 'http://localhost:9311/secrets/%s' % uuid.uuid4().hex
meta_resp, metadata_ref = self.behaviors.create_or_update_metadata(
secret_ref, self.invalid_metadata)
self.assertEqual(meta_resp.status_code, 404)
@testcase.attr('positive')
def test_secret_metadata_get(self):
test_model = secret_models.SecretModel(
**self.default_secret_create_all_none_data)
resp, secret_ref = self.secret_behaviors.create_secret(test_model)
self.assertEqual(resp.status_code, 201)
meta_resp, metadata_ref = self.behaviors.create_or_update_metadata(
secret_ref, self.valid_metadata)
self.assertEqual(meta_resp.status_code, 201)
self.assertEqual(secret_ref + '/metadata', metadata_ref)
get_resp = self.behaviors.get_metadata(secret_ref)
self.assertEqual(get_resp.status_code, 200)
self.assertEqual(get_resp.content, json.dumps(self.valid_metadata))
@testcase.attr('negative')
def test_secret_metadata_get_no_secret(self):
secret_ref = 'http://localhost:9311/secrets/%s' % uuid.uuid4().hex
get_resp = self.behaviors.get_metadata(secret_ref)
self.assertEqual(get_resp.status_code, 404)
@testcase.attr('positive')
def test_secret_metadatum_create(self):
test_model = secret_models.SecretModel(
**self.default_secret_create_all_none_data)
resp, secret_ref = self.secret_behaviors.create_secret(test_model)
self.assertEqual(resp.status_code, 201)
meta_resp, metadata_ref = self.behaviors.create_metadatum(
secret_ref, self.valid_metadatum)
self.assertEqual(meta_resp.status_code, 201)
@testcase.attr('positive')
def test_secret_metadatum_update(self):
test_model = secret_models.SecretModel(
**self.default_secret_create_all_none_data)
resp, secret_ref = self.secret_behaviors.create_secret(test_model)
self.assertEqual(resp.status_code, 201)
meta_resp, metadata_ref = self.behaviors.create_metadatum(
secret_ref, self.valid_metadatum)
self.assertEqual(meta_resp.status_code, 201)
updated_meta = {
'key': self.valid_metadatum_key,
'value': '10'
}
put_resp = self.behaviors.update_metadatum(
secret_ref, self.valid_metadatum_key, updated_meta)
self.assertEqual(put_resp.status_code, 200)
@testcase.attr('positive')
def test_secret_metadatum_get(self):
test_model = secret_models.SecretModel(
**self.default_secret_create_all_none_data)
resp, secret_ref = self.secret_behaviors.create_secret(test_model)
self.assertEqual(resp.status_code, 201)
meta_resp, metadata_ref = self.behaviors.create_metadatum(
secret_ref, self.valid_metadatum)
self.assertEqual(meta_resp.status_code, 201)
get_resp = self.behaviors.get_metadatum(secret_ref,
self.valid_metadatum_key)
self.assertEqual(get_resp.status_code, 200)
self.assertEqual(get_resp.content, json.dumps(self.valid_metadatum,
sort_keys=True))
@testcase.attr('negative')
def test_secret_metadatum_get_wrong_key(self):
test_model = secret_models.SecretModel(
**self.default_secret_create_all_none_data)
resp, secret_ref = self.secret_behaviors.create_secret(test_model)
self.assertEqual(resp.status_code, 201)
meta_resp, metadata_ref = self.behaviors.create_metadatum(
secret_ref, self.valid_metadatum)
self.assertEqual(meta_resp.status_code, 201)
get_resp = self.behaviors.get_metadatum(secret_ref,
'other_key')
self.assertEqual(get_resp.status_code, 404)
@testcase.attr('positive')
def test_secret_metadatum_delete(self):
test_model = secret_models.SecretModel(
**self.default_secret_create_all_none_data)
resp, secret_ref = self.secret_behaviors.create_secret(test_model)
self.assertEqual(resp.status_code, 201)
meta_resp, metadata_ref = self.behaviors.create_metadatum(
secret_ref, self.valid_metadatum)
self.assertEqual(meta_resp.status_code, 201)
get_resp = self.behaviors.get_metadatum(secret_ref,
self.valid_metadatum_key)
self.assertEqual(get_resp.status_code, 200)
delete_resp = self.behaviors.delete_metadatum(secret_ref,
self.valid_metadatum_key)
self.assertEqual(delete_resp.status_code, 204)