Add ManagedObjectNotFoundError

Adding this new error type will allow Castellan to distinguish between
whether an error occurred because the could not be found or some other sort
of error with communicating with Barbican.

Change-Id: Ie8fc3cf457009522349285c750adeeedd75e9a60
This commit is contained in:
Kaitlin Farr 2015-09-21 13:16:32 -04:00
parent 0be6648f6f
commit 14db1346e7
3 changed files with 54 additions and 17 deletions

View File

@ -58,3 +58,7 @@ class Forbidden(CastellanException):
class KeyManagerError(CastellanException):
message = u._("Key manager error: %(reason)s")
class ManagedObjectNotFoundError(CastellanException):
message = u._("Key not found, uuid: %(uuid)s")

View File

@ -25,6 +25,7 @@ from keystoneclient.auth import identity
from keystoneclient import session
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
from castellan.common import exception
from castellan.common.objects import key as key_base_class
@ -303,19 +304,19 @@ class BarbicanKeyManager(key_manager.KeyManager):
LOG.error(u._LE("Error storing object: %s"), e)
raise exception.KeyManagerError(reason=e)
def _create_secret_ref(self, key_id):
def _create_secret_ref(self, object_id):
"""Creates the URL required for accessing a secret.
:param key_id: the UUID of the key to copy
:param object_id: the UUID of the key to copy
:return: the URL of the requested secret
"""
if not key_id:
if not object_id:
msg = "Key ID is None"
raise exception.KeyManagerError(reason=msg)
base_url = self._base_url
if base_url[-1] != '/':
base_url += '/'
return urllib.parse.urljoin(base_url, "secrets/" + key_id)
return urllib.parse.urljoin(base_url, "secrets/" + object_id)
def _get_active_order(self, barbican_client, order_ref):
"""Returns the order when it is active.
@ -418,26 +419,35 @@ class BarbicanKeyManager(key_manager.KeyManager):
else:
return secret_type(secret_data)
def _get_secret(self, context, key_id):
def _get_secret(self, context, object_id):
"""Returns the metadata of the secret.
:param context: contains information of the user and the environment
for the request (castellan/context.py)
:param key_id: UUID of the secret
:param object_id: UUID of the secret
:return: the secret's metadata
:raises KeyManagerError: if object retrieval fails
:raises HTTPAuthError: if object retrieval fails with 401
:raises HTTPClientError: if object retrieval fails with 4xx
:raises HTTPServerError: if object retrieval fails with 5xx
"""
barbican_client = self._get_barbican_client(context)
try:
secret_ref = self._create_secret_ref(key_id)
secret_ref = self._create_secret_ref(object_id)
return barbican_client.secrets.get(secret_ref)
except (barbican_exceptions.HTTPAuthError,
barbican_exceptions.HTTPClientError,
barbican_exceptions.HTTPServerError) as e:
LOG.error(u._LE("Error getting secret metadata: %s"), e)
raise exception.KeyManagerError(reason=e)
with excutils.save_and_reraise_exception():
LOG.error(u._LE("Error getting secret metadata: %s"), e)
def _is_secret_not_found_error(self, error):
if (isinstance(error, barbican_exceptions.HTTPClientError) and
error.status_code == 404):
return True
else:
return False
def get(self, context, managed_object_id):
"""Retrieves the specified managed object.
@ -449,16 +459,20 @@ class BarbicanKeyManager(key_manager.KeyManager):
:param managed_object_id: the UUID of the object to retrieve
:return: SymmetricKey representation of the key
:raises KeyManagerError: if object retrieval fails
:raises ManagedObjectNotFoundError: if object not found
"""
try:
secret = self._get_secret(context, managed_object_id)
return self._get_castellan_object(secret)
except (barbican_exceptions.HTTPAuthError,
barbican_exceptions.HTTPClientError,
barbican_exceptions.HTTPServerError,
exception.KeyManagerError) as e:
LOG.error(u._LE("Error getting object: %s"), e)
raise exception.KeyManagerError(reason=e)
barbican_exceptions.HTTPServerError) as e:
LOG.error(u._LE("Error retrieving object: %s"), e)
if self._is_secret_not_found_error(e):
raise exception.ManagedObjectNotFoundError(
uuid=managed_object_id)
else:
raise exception.KeyManagerError(reason=e)
def delete(self, context, managed_object_id):
"""Deletes the specified managed object.
@ -467,6 +481,7 @@ class BarbicanKeyManager(key_manager.KeyManager):
for the request (castellan/context.py)
:param managed_object_id: the UUID of the object to delete
:raises KeyManagerError: if key deletion fails
:raises ManagedObjectNotFoundError: if the object could not be found
"""
barbican_client = self._get_barbican_client(context)
@ -477,4 +492,8 @@ class BarbicanKeyManager(key_manager.KeyManager):
barbican_exceptions.HTTPClientError,
barbican_exceptions.HTTPServerError) as e:
LOG.error(u._LE("Error deleting object: %s"), e)
raise exception.KeyManagerError(reason=e)
if self._is_secret_not_found_error(e):
raise exception.ManagedObjectNotFoundError(
uuid=managed_object_id)
else:
raise exception.KeyManagerError(reason=e)

View File

@ -171,10 +171,17 @@ class BarbicanKeyManagerTestCase(test_key_manager.KeyManagerTestCase):
self.key_mgr.delete(self.ctxt, self.key_id)
self.delete.assert_called_once_with(self.secret_ref)
def test_delete_unknown_key(self):
def test_delete_none_key(self):
self.assertRaises(exception.KeyManagerError,
self.key_mgr.delete, self.ctxt, None)
def test_delete_unkown_key(self):
side_effect = barbican_exceptions.HTTPClientError('key not found')
side_effect.status_code = 404
self.mock_barbican.secrets.delete = mock.Mock(side_effect=side_effect)
self.assertRaises(exception.ManagedObjectNotFoundError,
self.key_mgr.delete, self.ctxt, self.key_id)
def test_delete_with_error(self):
self.mock_barbican.secrets.delete = mock.Mock(
side_effect=barbican_exceptions.HTTPClientError('test error'))
@ -200,10 +207,17 @@ class BarbicanKeyManagerTestCase(test_key_manager.KeyManagerTestCase):
self.assertRaises(exception.Forbidden,
self.key_mgr.get, None, self.key_id)
def test_get_unknown_key(self):
def test_get_none_key(self):
self.assertRaises(exception.KeyManagerError,
self.key_mgr.get, self.ctxt, None)
def test_get_unknown_key(self):
side_effect = barbican_exceptions.HTTPClientError('key not found')
side_effect.status_code = 404
self.mock_barbican.secrets.get = mock.Mock(side_effect=side_effect)
self.assertRaises(exception.ManagedObjectNotFoundError,
self.key_mgr.get, self.ctxt, self.key_id)
def test_get_with_error(self):
self.mock_barbican.secrets.get = mock.Mock(
side_effect=barbican_exceptions.HTTPClientError('test error'))