Add Castellan client

Adds the client that uses Castellan to communicate with the
key manager.

Change-Id: Id63fe6bec0c3180382dc8017a4deacbf87ebd890
This commit is contained in:
Kaitlin Farr 2017-12-12 11:16:49 -05:00
parent 2a90199ded
commit d1e8f7c93b
4 changed files with 381 additions and 0 deletions

140
castellan_ui/api/client.py Normal file
View File

@ -0,0 +1,140 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import base64
import logging
from oslo_context import context
from castellan.common import exception as castellan_exception
from castellan.common.objects import key as key_type
from castellan.common.objects import opaque_data
from castellan.common.objects import passphrase
from castellan.common.objects import x_509
from castellan import key_manager as key_manager_api
from horizon import exceptions
from horizon.utils.memoized import memoized_with_request
LOG = logging.getLogger(__name__)
GENERATE_ATTRIBUTES = ['algorithm', 'length', 'name']
IMPORT_KEY_ATTRIBUTES = ['algorithm', 'bit_length', 'name',
'key']
IMPORT_CERT_ATTRIBUTES = ['name', 'data']
IMPORT_PASSPHRASE_ATTRIBUTES = ['name', 'passphrase']
IMPORT_DATA_ATTRIBUTES = ['name', 'data']
def key_manager():
return key_manager_api.API()
def get_auth_params_from_request(request):
return(
request.user.token.id,
request.user.tenant_id,
)
@memoized_with_request(get_auth_params_from_request)
def get_context(request_auth_params):
token_id, tenant_id = request_auth_params
return context.RequestContext(auth_token=token_id,
tenant=tenant_id)
def import_object(request, **kwargs):
args = {}
try:
object_type = kwargs.pop('object_type')
except TypeError:
raise exceptions.BadRequest("Object type must be included in kwargs")
for (key, value) in kwargs.items():
if key in ['data', 'key']:
# the data was passed in b64 encoded because some of the bytes
# were changed when the raw bytes were passed from the form
value = base64.b64decode(value)
if (issubclass(object_type, key_type.Key) and
key in IMPORT_KEY_ATTRIBUTES):
args[str(key)] = value
elif object_type == x_509.X509 and key in IMPORT_CERT_ATTRIBUTES:
args[str(key)] = value
elif (object_type == passphrase.Passphrase and
key in IMPORT_PASSPHRASE_ATTRIBUTES):
args[str(key)] = value
elif (object_type == opaque_data.OpaqueData and
key in IMPORT_DATA_ATTRIBUTES):
args[str(key)] = value
else:
raise exceptions.BadRequest(
"Attribute must be in %s" % ",".join(IMPORT_KEY_ATTRIBUTES))
key = object_type(**args)
created_uuid = key_manager().store(get_context(request), key)
return created_uuid
def generate_symmetric_key(request, **kwargs):
args = {}
for (key, value) in kwargs.items():
if key in GENERATE_ATTRIBUTES:
args[str(key)] = value
else:
raise exceptions.BadRequest(
"Key must be in %s" % ",".join(GENERATE_ATTRIBUTES))
created_uuid = key_manager().create_key(get_context(request),
**args)
return created_uuid
def generate_key_pair(request, **kwargs):
args = {}
for (key, value) in kwargs.items():
if key in GENERATE_ATTRIBUTES:
args[str(key)] = value
else:
raise exceptions.BadRequest(
"Key must be in %s" % ",".join(GENERATE_ATTRIBUTES))
private_uuid, public_uuid = key_manager().create_key_pair(
get_context(request),
**args)
return "{priv} + {pub}".format(priv=private_uuid, pub=public_uuid)
def delete(request, id):
deleted = key_manager().delete(get_context(request), id)
return deleted
def list(request, object_type=None):
try:
list = key_manager().list(
get_context(request), object_type=object_type, metadata_only=True)
except castellan_exception.KeyManagerError as e:
raise exceptions.BadRequest("Could not list objects: %s" % e.message)
return list
def get(request, id):
show = key_manager().get(get_context(request), id)
return show

View File

View File

@ -0,0 +1,188 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import base64
import mock
import uuid
from castellan.common.objects import symmetric_key
from castellan_ui.api import client as api
from castellan_ui.test import helpers as base
from horizon import exceptions
class ClientApiTests(base.APITestCase):
def setUp(self):
super(self.__class__, self).setUp()
self.ctxt = api.get_context(self.request)
def test_get_auth_params_from_request(self):
token, tenant = api.get_auth_params_from_request(self.request)
self.assertEqual(self.token.id, token)
self.assertEqual(self.tenant.id, tenant)
def test_get_context(self):
ctxt = api.get_context(self.request)
self.assertEqual(ctxt.auth_token, self.token.id)
self.assertEqual(ctxt.tenant, self.tenant.id)
self.assertEqual(ctxt, self.ctxt)
def test_import_object(self):
algorithm = "AES"
bit_length = 48
name = None
key = b'deadbeef'
key_b64 = base64.b64encode(key)
actual_uuid = api.import_object(
self.request,
algorithm=algorithm,
bit_length=bit_length,
name=name,
key=key_b64,
object_type=symmetric_key.SymmetricKey)
self.key_manager.store.assert_called_once()
args, kwargs = self.key_manager.store.call_args
actual_ctxt, actual_key = args
self.assertEqual(actual_uuid, self.mock_uuid1)
self.assertEqual(actual_ctxt, self.ctxt)
self.assertEqual(actual_key.algorithm, algorithm)
self.assertEqual(actual_key.bit_length, bit_length)
self.assertEqual(actual_key.name, name)
self.assertEqual(actual_key.get_encoded(), key)
def test_import_object_includes_invalid_param(self):
algorithm = "AES"
bit_length = 256
name = None
key = b'deadbeef'
other_value = "other_value"
with self.assertRaises(exceptions.BadRequest):
api.import_object(
self.request,
algorithm=algorithm,
bit_length=bit_length,
name=name,
key=key,
object_type=symmetric_key.SymmetricKey,
other_value=other_value)
def test_generate_symmetric_key(self):
algorithm = "AES"
length = 256
name = None
actual_uuid = api.generate_symmetric_key(
self.request,
algorithm=algorithm,
length=length,
name=name)
self.key_manager.create_key.assert_called_once()
args, kwargs = self.key_manager.create_key.call_args
(actual_ctxt,) = args
actual_algorithm = kwargs.get("algorithm")
actual_length = kwargs.get("length")
actual_name = kwargs.get("name")
self.assertEqual(actual_uuid, self.mock_uuid1)
self.assertEqual(actual_ctxt, self.ctxt)
self.assertEqual(actual_algorithm, algorithm)
self.assertEqual(actual_length, length)
self.assertEqual(actual_name, name)
def test_generate_symmetric_key_includes_invalid_param(self):
algorithm = "AES"
length = 256
name = None
other_value = "other_value"
with self.assertRaises(exceptions.BadRequest):
api.generate_symmetric_key(
self.request,
algorithm=algorithm,
length=length,
name=name,
other_value=other_value)
def test_generate_key_pair(self):
algorithm = "RSA"
length = 2048
name = None
actual_result = api.generate_key_pair(
self.request,
algorithm=algorithm,
length=length,
name=name)
self.key_manager.create_key_pair.assert_called_once()
args, kwargs = self.key_manager.create_key_pair.call_args
(actual_ctxt,) = args
actual_algorithm = kwargs.get("algorithm")
actual_length = kwargs.get("length")
actual_name = kwargs.get("name")
self.assertTrue(self.mock_uuid1 in actual_result)
self.assertTrue(self.mock_uuid2 in actual_result)
self.assertEqual(actual_ctxt, self.ctxt)
self.assertEqual(actual_algorithm, algorithm)
self.assertEqual(actual_length, length)
self.assertEqual(actual_name, name)
def test_generate_key_pair_invalid_param(self):
algorithm = "RSA"
length = 2048
name = None
other_value = "other_value"
with self.assertRaises(exceptions.BadRequest):
api.generate_key_pair(
self.request,
algorithm=algorithm,
length=length,
name=name,
other_value=other_value)
def test_delete(self):
object_id = str(uuid.uuid4())
api.delete(self.request, object_id)
self.key_manager.delete.assert_called_once()
args, kwargs = self.key_manager.delete.call_args
actual_ctxt, actual_id = args
self.assertEqual(actual_ctxt, self.ctxt)
self.assertEqual(actual_id, object_id)
def test_list(self):
api.list(self.request)
self.key_manager.list.assert_called_once()
args, kwargs = self.key_manager.list.call_args
(actual_ctxt,) = args
actual_object_type = kwargs.get("object_type")
actual_metadata_only = kwargs.get("metadata_only")
self.assertEqual(actual_ctxt, self.ctxt)
self.assertIsNone(actual_object_type)
self.assertTrue(actual_metadata_only)
def test_list_with_type(self):
object_type = mock.Mock()
api.list(self.request, object_type=object_type)
self.key_manager.list.assert_called_once()
args, kwargs = self.key_manager.list.call_args
(actual_ctxt,) = args
actual_object_type = kwargs.get("object_type")
actual_metadata_only = kwargs.get("metadata_only")
self.assertEqual(actual_ctxt, self.ctxt)
self.assertEqual(actual_object_type, object_type)
self.assertTrue(actual_metadata_only)
def test_get(self):
object_id = str(uuid.uuid4())
api.get(self.request, object_id)
self.key_manager.get.assert_called_once()
args, kwargs = self.key_manager.get.call_args
actual_ctxt, actual_id = args
self.assertEqual(actual_ctxt, self.ctxt)
self.assertEqual(actual_id, object_id)

View File

@ -0,0 +1,53 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import uuid
from castellan_ui import api
from openstack_dashboard.test import helpers
class CastellanTestsMixin(object):
def _setup_test_data(self):
super(CastellanTestsMixin, self)._setup_test_data()
def mock_object(self, obj, attr_name, new_attr=None, **kwargs):
"""Use python mock to mock an object attribute
Mocks the specified objects attribute with the given value.
Automatically performs 'addCleanup' for the mock.
"""
if not new_attr:
new_attr = mock.Mock()
patcher = mock.patch.object(obj, attr_name, new_attr, **kwargs)
patcher.start()
return new_attr
class APITestCase(CastellanTestsMixin, helpers.APITestCase):
def setUp(self):
super(APITestCase, self).setUp()
self.mock_uuid1 = str(uuid.uuid4())
self.mock_uuid2 = str(uuid.uuid4())
self.mock_managed_object = mock.Mock()
self._key_manager = self.mock_object(api.client,
"key_manager")
self.key_manager = self._key_manager.return_value
self.key_manager.store.return_value = self.mock_uuid1
self.key_manager.create_key.return_value = self.mock_uuid1
self.key_manager.create_key_pair.return_value = (
self.mock_uuid1, self.mock_uuid2)
self.key_manager.get.return_value = self.mock_object
self.key_manager.delete.return_value = True
self.key_manager.list.return_value = [self.mock_object]