Adding keystone notification listener support

Notification listener processes only keystone project delete events.
It uses keystone project id to identify if related barbican resources are there
and deletes that project related barbican resources.

10/07/2014: Keep resolving rebase conflicts as this change is pending for a while.
Modified listener transaction logic as per related recent change in barbican.
Fixed missing patcher stop in few places which was causing failure in new tests.

Implements: blueprint consume-keystone-events

Change-Id: Iba7d50eb222edd43352ef82f629df2b3187c76ec
This commit is contained in:
Arun Kant 2014-07-07 10:56:03 -07:00
parent 2a3082ce4e
commit 758904848f
14 changed files with 1323 additions and 2 deletions

View File

@ -293,6 +293,29 @@ def clean_paging_values(offset_arg=0, limit_arg=CONF.default_limit_paging):
return offset, limit
def delete_all_project_resources(tenant_id, repos):
"""Logic to cleanup all project resources.
This cleanup uses same alchemy session to perform all db operations as a
transaction and will commit only when all db operations are performed
without error.
"""
session = get_session()
repos.container_repo.delete_project_entities(
tenant_id, suppress_exception=False, session=session)
# secret children SecretStoreMetadatum, EncryptedDatum
# and container_secrets are deleted as part of secret delete
repos.secret_repo.delete_project_entities(
tenant_id, suppress_exception=False, session=session)
repos.kek_repo.delete_project_entities(
tenant_id, suppress_exception=False, session=session)
repos.tenant_secret_repo.delete_project_entities(
tenant_id, suppress_exception=False, session=session)
repos.tenant_repo.delete_project_entities(
tenant_id, suppress_exception=False, session=session)
class Repositories(object):
"""Convenient way to pass repositories around.
@ -538,6 +561,72 @@ class BaseRepo(object):
if getattr(entity_ref, k) != values[k]:
setattr(entity_ref, k, values[k])
def _build_get_project_entities_query(self, tenant_id, session):
"""Sub-class hook: build a query to retrieve entities for a given
project.
:param tenant_id: id of barbican tenant (project) entity
:param session: existing db session reference.
:returns: A query object for getting all project related entities
This query is used by `get_project_entities` and
`delete_project_entities` functions in BaseRepo class.
This will filter deleted entities if there.
"""
msg = u._("{0} is missing query build method for get project "
"entities.").format(self._do_entity_name())
raise NotImplementedError(msg)
def get_project_entities(self, tenant_id, session=None):
"""Gets entities associated with a given project.
:param tenant_id: id of barbican tenant (project) entity
:param session: existing db session reference. If None, gets session.
:returns: list of matching entities found otherwise returns empty list
if no entity exists for a given project.
Sub-class should implement `_build_get_project_entities_query` function
to delete related entities otherwise it would raise NotImplementedError
on its usage.
"""
session = self.get_session(session)
query = self._build_get_project_entities_query(tenant_id, session)
if query:
return query.all()
else:
return []
def delete_project_entities(self, tenant_id,
suppress_exception=False,
session=None):
"""Deletes entities for a given project.
:param tenant_id: id of barbican tenant (project) entity
:param suppress_exception: Pass True if want to suppress exception
:param session: existing db session reference. If None, gets session.
Sub-class should implement `_build_get_project_entities_query` function
to delete related entities otherwise it would raise NotImplementedError
on its usage.
"""
session = self.get_session(session)
query = self._build_get_project_entities_query(tenant_id,
session=session)
try:
# query cannot be None as related repo class is expected to
# implement it otherwise error is raised in build query call
for entity in query:
# Its a soft delete so its more like entity update
entity.delete(session=session)
except sqlalchemy.exc.SQLAlchemyError:
LOG.exception('Problem finding project related entity to delete')
if not suppress_exception:
raise exception.BarbicanException('Error deleting project '
'entities for tenant_id=%s',
tenant_id)
class TenantRepo(BaseRepo):
"""Repository for the Tenant entity."""
@ -573,6 +662,12 @@ class TenantRepo(BaseRepo):
return entity
def _build_get_project_entities_query(self, tenant_id, session):
"""Builds query for retrieving project for given id.
"""
return session.query(models.Tenant).filter_by(id=tenant_id).filter_by(
deleted=False)
class SecretRepo(BaseRepo):
"""Repository for the Secret entity."""
@ -664,6 +759,18 @@ class SecretRepo(BaseRepo):
"""Sub-class hook: validate values."""
pass
def _build_get_project_entities_query(self, tenant_id, session):
"""Builds query for retrieving Secrets associated with a given
project via TenantSecret association.
:param tenant_id: id of barbican tenant (project) entity
:param session: existing db session reference.
"""
query = session.query(models.Secret).filter_by(deleted=False)
query = query.join(models.TenantSecret, models.Secret.tenant_assocs)
query = query.filter(models.TenantSecret.tenant_id == tenant_id)
return query
class EncryptedDatumRepo(BaseRepo):
"""Repository for the EncryptedDatum entity
@ -802,6 +909,16 @@ class KEKDatumRepo(BaseRepo):
"""Sub-class hook: validate values."""
pass
def _build_get_project_entities_query(self, tenant_id, session):
"""Builds query for retrieving KEK Datum instance(s) related to given
project.
:param tenant_id: id of barbican tenant (project) entity
:param session: existing db session reference.
"""
return session.query(models.KEKDatum).filter_by(
tenant_id=tenant_id).filter_by(deleted=False)
class TenantSecretRepo(BaseRepo):
"""Repository for the TenantSecret entity."""
@ -821,6 +938,15 @@ class TenantSecretRepo(BaseRepo):
"""Sub-class hook: validate values."""
pass
def _build_get_project_entities_query(self, tenant_id, session):
"""Builds query for retrieving TenantSecret related to given project.
:param tenant_id: id of barbican tenant (project) entity
:param session: existing db session reference.
"""
return session.query(models.TenantSecret).filter_by(
tenant_id=tenant_id).filter_by(deleted=False)
class OrderRepo(BaseRepo):
"""Repository for the Order entity."""
@ -891,6 +1017,15 @@ class OrderRepo(BaseRepo):
"""Sub-class hook: validate values."""
pass
def _build_get_project_entities_query(self, tenant_id, session):
"""Builds query for retrieving orders related to given project.
:param tenant_id: id of barbican tenant (project) entity
:param session: existing db session reference.
"""
return session.query(models.Order).filter_by(
tenant_id=tenant_id).filter_by(deleted=False)
class OrderPluginMetadatumRepo(BaseRepo):
"""Repository for the OrderPluginMetadatum entity
@ -1008,6 +1143,15 @@ class ContainerRepo(BaseRepo):
"""Sub-class hook: validate values."""
pass
def _build_get_project_entities_query(self, tenant_id, session):
"""Builds query for retrieving container related to given project.
:param tenant_id: id of barbican tenant (project) entity
:param session: existing db session reference.
"""
return session.query(models.Container).filter_by(
deleted=False).filter_by(tenant_id=tenant_id)
class ContainerSecretRepo(BaseRepo):
"""Repository for the ContainerSecret entity."""

View File

@ -18,6 +18,8 @@ Queue objects for Cloudkeep's Barbican
"""
from oslo.config import cfg
from oslo import messaging
from oslo.messaging.notify import dispatcher as notfiy_dispatcher
from oslo.messaging import server as msg_server
from barbican.common import exception
from barbican.common import utils
@ -43,10 +45,49 @@ queue_opts = [
help=u._('Server name for RPC task processing server')),
]
# constant at one place if this needs to be changed later
KS_NOTIFICATIONS_GRP_NAME = 'keystone_notifications'
ks_queue_opt_group = cfg.OptGroup(name=KS_NOTIFICATIONS_GRP_NAME,
title='Keystone Notification Options')
ks_queue_opts = [
cfg.BoolOpt('enable', default=False,
help=u._('True enables keystone notification listener '
' functionality.')),
cfg.StrOpt('control_exchange', default='openstack',
help=u._('The default exchange under which topics are scoped. '
'May be overridden by an exchange name specified in '
' the transport_url option.')),
cfg.StrOpt('topic', default='notifications',
help=u._("Keystone notification queue topic name. This name "
"needs to match one of values mentioned in Keystone "
"deployment\'s 'notification_topics' configuration "
"e.g."
" notification_topics=notifications, "
" barbican_notifications"
"Multiple servers may listen on a topic and messages "
" will be dispatched to one of the servers in a "
"round-robin fashion. That's why Barbican service "
" should have its own dedicated notification queue so "
" that it receives all of Keystone notifications.")),
cfg.BoolOpt('allow_requeue', default=False,
help=u._('True enables requeue feature in case of notification'
' processing error. Enable this only when underlying '
'transport supports this feature.')),
cfg.StrOpt('version', default='1.0',
help=u._('Version of tasks invoked via notifications')),
cfg.IntOpt('thread_pool_size', default=10,
help=u._('Define the number of max threads to be used for '
'notification server processing functionality.')),
]
CONF = cfg.CONF
CONF.register_group(queue_opt_group)
CONF.register_opts(queue_opts, group=queue_opt_group)
CONF.register_group(ks_queue_opt_group)
CONF.register_opts(ks_queue_opts, group=ks_queue_opt_group)
TRANSPORT = None
IS_SERVER_SIDE = True
@ -105,3 +146,30 @@ def get_server(target, endpoints, serializer=None):
endpoints,
executor='eventlet',
serializer=serializer)
def get_notification_target():
conf_opts = getattr(CONF, KS_NOTIFICATIONS_GRP_NAME)
return messaging.Target(exchange=conf_opts.control_exchange,
topic=conf_opts.topic,
version=conf_opts.version,
fanout=True)
def get_notification_server(targets, endpoints,
serializer=None):
"""Notification server uses same transport configuration as used by other
barbican functionality like async order processing.
Assumption is that messaging infrastructure is going to be shared (same)
among different barbican features.
"""
allow_requeue = getattr(getattr(CONF, KS_NOTIFICATIONS_GRP_NAME),
'allow_requeue')
TRANSPORT._require_driver_features(requeue=allow_requeue)
dispatcher = notfiy_dispatcher.NotificationDispatcher(targets, endpoints,
serializer,
allow_requeue)
# we don't want blocking executor so use eventlet as executor choice
return msg_server.MessageHandlingServer(TRANSPORT, dispatcher,
executor='eventlet')

View File

@ -0,0 +1,156 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Server-side (i.e. worker side) Keystone notification related classes and logic.
"""
from oslo import messaging
from barbican.common import utils
from barbican.openstack.common import service
from barbican import queue
from barbican.tasks import keystone_consumer
LOG = utils.getLogger(__name__)
class NotificationTask(object):
"""Notification task which exposes the API for consuming priority based
notifications.
The Oslo notification framework delivers notifications based on priority to
matching callback APIs as defined in its notification listener endpoint
list.
Currently from Keystone perspective, `info` API is sufficient as Keystone
send notifications at `info` priority ONLY. Other priority level APIs
(warn, error, critical, audit, debug) are not needed here.
"""
def __init__(self, conf):
self.conf = conf
def info(self, ctxt, publisher_id, event_type, payload, metadata):
"""Receives notification at info level."""
return self.process_event(ctxt, publisher_id, event_type, payload,
metadata)
def process_event(self, ctxt, publisher_id, event_type, payload, metadata):
"""Process Keystone Event based on event_type and payload data.
Parses notification data to identify if the event is related to delete
project or not. In case of delete project event, it passes project_id
to KeystoneEventConsumer logic for further processing. Barbican service
is not interested in other events so in that case it just returns None
as acknowledgment.
Messaging server considers message is acknowledged when either return
value is `messaging.NotificationResult.HANDLED` or None.
In case of successful processing of notification, the returned value is
`messaging.NotificationResult.HANDLED`
In case of notification processing error, the value returned is
messaging.NotificationResult.REQUEUE when transport supports this
feature otherwise `messaging.NotificationResult.HANDLED` is returned.
"""
LOG.debug("Input keystone event publisher_id = %s", publisher_id)
LOG.debug("Input keystone event payload = %s", payload)
LOG.debug("Input keystone event type = %s", event_type)
LOG.debug("Input keystone event metadata = %s", metadata)
project_id = self._parse_payload_for_project_id(payload)
resource_type, operation_type = self._parse_event_type(event_type)
LOG.debug('Keystone Event: resource type={0}, operation type={1}, '
'keystone id={2}'.format(resource_type, operation_type,
project_id))
if (project_id and resource_type == 'project' and
operation_type == 'deleted'):
task = keystone_consumer.KeystoneEventConsumer()
try:
task.process(project_id=project_id,
resource_type=resource_type,
operation_type=operation_type)
return messaging.NotificationResult.HANDLED
except Exception:
# No need to log message here as task process method has
# already logged it
if self.conf.keystone_notifications.allow_requeue:
return messaging.NotificationResult.REQUEUE
else:
return messaging.NotificationResult.HANDLED
return None # in case event is not project delete
def _parse_event_type(self, event_type):
"""Parses event type provided as part of notification to identify what
operation is performed and on which Keystone resource.
A few event type sample values are provided below::
identity.project.deleted
identity.role.created
identity.domain.updated
identity.authenticate
"""
resource_type = None
operation_type = None
if event_type:
type_list = event_type.split('.')
# 2 is min. number of dot delimiters expected in event_type value.
if len(type_list) > 2:
resource_type = type_list[-2].lower()
operation_type = type_list[-1].lower()
return resource_type, operation_type
def _parse_payload_for_project_id(self, payload_s):
"""Gets project resource identifier from payload
Sample payload is provided below::
{'resource_info': u'2b99a94ad02741978e613fb52dd1f4cd'}
"""
if payload_s:
return payload_s.get('resource_info')
class MessageServer(NotificationTask, service.Service):
"""Server to retrieve messages from queue used by Keystone to send public
notifications for openstack service consumption.
This server is an Oslo notification server that exposes set of standard
APIs for events consumption based on event priority.
Some of messaging server configuration needs to match with Keystone
deployment notification configuration e.g. exchange name, topic name
"""
def __init__(self, conf):
pool_size = conf.keystone_notifications.thread_pool_size
NotificationTask.__init__(self, conf)
service.Service.__init__(self, threads=pool_size)
self.target = queue.get_notification_target()
self._msg_server = queue.get_notification_server(targets=[self.target],
endpoints=[self])
def start(self):
self._msg_server.start()
super(MessageServer, self).start()
def stop(self):
super(MessageServer, self).stop()
self._msg_server.stop()
queue.cleanup()

View File

@ -0,0 +1,120 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Server-side Keystone notification payload processing logic.
"""
from barbican.common import utils
from barbican.model import repositories as rep
from barbican.openstack.common import gettextutils as u
from barbican.tasks import resources
LOG = utils.getLogger(__name__)
class KeystoneEventConsumer(resources.BaseTask):
"""Keystone event consumer listening for notifications sent by Keystone
deployment.
Currently this processes only Keystone project delete event.
"""
def get_name(self):
return u._('Project cleanup via Keystone notifications')
def __init__(self, tenant_repo=None, order_repo=None,
secret_repo=None, tenant_secret_repo=None,
datum_repo=None, kek_repo=None, secret_meta_repo=None,
container_repo=None):
LOG.debug('Creating KeystoneEventConsumer task processor')
self.repos = rep.Repositories(tenant_repo=tenant_repo,
order_repo=order_repo,
secret_repo=secret_repo,
tenant_secret_repo=tenant_secret_repo,
datum_repo=datum_repo,
kek_repo=kek_repo,
secret_meta_repo=secret_meta_repo,
container_repo=container_repo)
def process(self, *args, **kwargs):
try:
rep.start()
super(KeystoneEventConsumer, self).process(*args, **kwargs)
rep.commit()
except Exception as e:
"""Exceptions that reach here needs to revert the entire
transaction.
No need to log error message as its already done earlier.
"""
rep.rollback()
raise e
finally:
rep.clear()
def retrieve_entity(self, project_id, resource_type=None,
operation_type=None):
tenant_repo = self.repos.tenant_repo
return tenant_repo.find_by_keystone_id(keystone_id=project_id,
suppress_exception=True)
def handle_processing(self, barbican_project, *args, **kwargs):
self.handle_cleanup(barbican_project, *args, **kwargs)
def handle_error(self, project, status, message, exception,
project_id=None, resource_type=None, operation_type=None):
LOG.error('Error processing Keystone event, project_id={0}, event '
'resource={1}, event operation={2}, status={3}, error '
'message={4}'.format(project.tenant_id, resource_type,
operation_type, status, message))
def handle_success(self, project, project_id=None, resource_type=None,
operation_type=None):
LOG.info('Successfully handled Keystone event, project_id={0}, event '
'resource={1}, event operation={2}'.format(project_id,
resource_type,
operation_type))
def handle_cleanup(self, project, project_id=None, resource_type=None,
operation_type=None):
"""Handle Barbican resources cleanup needed as part of Keystone project
delete.
:param project: Barbican project entity which is retrieved by project
id available in Keystone notification.
:param project_id: project identifier as present in Keystone
notification.
:param resource_type: type of resource updated as part of Keystone
notification e.g. Keystone project, domain, user etc.
:param operation_type: type of operation (created, updated, deleted
etc.) performed on Keystone resource.
"""
if project is None:
LOG.info('No action is needed as there are no Barbican resources '
'present for Keystone project_id={0}'.format(project_id))
return
# barbican entities use tenants table 'id' field as foreign key. Delete
# apis are using that id to lookup related entities and not keystone
# project id which requires additional tenant table join.
tenant_id = project.id
rep.delete_all_project_resources(tenant_id, self.repos)
# reached here means there is no error so log the successful
# cleanup log entry.
LOG.info('Successfully completed Barbican resources cleanup for '
'Keystone project_id={0}'.format(project_id))

View File

@ -48,6 +48,7 @@ class WhenTestingPluginResource(testtools.TestCase):
**gen_plugin_config
)
self.gen_plugin_patcher.start()
self.addCleanup(self.gen_plugin_patcher.stop)
tenant_repo = mock.MagicMock()
secret_repo = mock.MagicMock()

View File

@ -0,0 +1,340 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
import six
import uuid
from oslo.config import cfg
from oslo import messaging
from barbican.openstack.common import service
from barbican import queue
from barbican.queue import keystone_listener
from barbican.tasks import keystone_consumer as consumer
from barbican.tests import utils
class UtilMixin(object):
def __init__(self, *args, **kwargs):
super(UtilMixin, self).__init__(*args, **kwargs)
self.conf = cfg.CONF
# dict which has item as {property: (value, group_name)}
self.overrides = {}
def revert_overrides(self):
'''Reverts configuration override values after test end.'''
for k, v in six.iteritems(self.overrides):
value, group = v
self.conf.set_override(k, value, group)
def setUp(self):
super(UtilMixin, self).setUp()
self.addCleanup(self.revert_overrides)
def opt_in_group(self, group, **kw):
for k, v in six.iteritems(kw):
# add to local overrides if its not already set
# we want to keep the original value from first override
dict_value = self.overrides.get(k)
if not dict_value:
if group:
orig_value = getattr(getattr(self.conf, group), k)
else:
orig_value = getattr(self.conf, k)
self.overrides[k] = orig_value, group
self.conf.set_override(k, v, group)
class WhenUsingNotificationTask(UtilMixin, utils.BaseTestCase):
"""Test for 'Notification' task functionality."""
def setUp(self):
super(WhenUsingNotificationTask, self).setUp()
self.task = keystone_listener.NotificationTask(self.conf)
self.payload = {'resource_info': uuid.uuid4().hex}
self.type_index = 2
self.payload_index = 3
self.task_args = ['my_context', 'publisher_id', 'event_type',
self.payload, {'metadata': 'value'}]
@mock.patch.object(keystone_listener.NotificationTask, 'process_event')
def test_info_level_notification(self, mock_process):
self.task.info(*self.task_args)
mock_process.assert_called_once_with(*self.task_args)
@mock.patch.object(consumer.KeystoneEventConsumer, 'process',
return_value=None)
def test_create_project_event_notification(self, mock_process):
self.task_args[self.type_index] = 'identity.project.created'
result = self.task.info(*self.task_args)
self.assertFalse(mock_process.called, 'Should not call event consumer '
'for project create event')
self.assertIsNone(result)
@mock.patch.object(consumer.KeystoneEventConsumer, 'process',
return_value=None)
def test_update_project_event_notification(self, mock_process):
self.task_args[self.type_index] = 'identity.project.updated'
result = self.task.info(*self.task_args)
self.assertFalse(mock_process.called, 'Should not call event consumer '
'for project update event')
self.assertIsNone(result)
@mock.patch('barbican.model.repositories.Repositories')
@mock.patch.object(consumer.KeystoneEventConsumer, 'process',
return_value=None)
def test_delete_project_event_notification_with_required_data(
self, mock_process, mock_repos):
project_id = uuid.uuid4().hex
self.task_args[self.type_index] = 'identity.project.deleted'
self.task_args[self.payload_index] = {'resource_info': project_id}
result = self.task.info(*self.task_args)
mock_process.assert_called_once_with(project_id=project_id,
operation_type='deleted',
resource_type='project')
self.assertEqual(messaging.NotificationResult.HANDLED, result)
@mock.patch('barbican.model.repositories.Repositories')
@mock.patch.object(consumer.KeystoneEventConsumer, 'process',
return_value=None)
def test_delete_project_event_with_different_service_name_in_event_type(
self, mock_process, mock_repos):
project_id = uuid.uuid4().hex
self.task_args[self.type_index] = 'aaa.project.deleted'
self.task_args[self.payload_index] = {'resource_info': project_id}
result = self.task.info(*self.task_args)
mock_process.assert_called_once_with(project_id=project_id,
operation_type='deleted',
resource_type='project')
self.assertEqual(messaging.NotificationResult.HANDLED, result)
@mock.patch('barbican.model.repositories.Repositories')
@mock.patch.object(consumer.KeystoneEventConsumer, 'process',
return_value=None)
def test_delete_project_event_with_event_type_in_different_case(
self, mock_process, mock_repos):
project_id = uuid.uuid4().hex
self.task_args[self.type_index] = 'Identity.PROJECT.DeleteD'
self.task_args[self.payload_index] = {'resource_info': project_id}
result = self.task.info(*self.task_args)
mock_process.assert_called_once_with(project_id=project_id,
operation_type='deleted',
resource_type='project')
self.assertEqual(messaging.NotificationResult.HANDLED, result)
@mock.patch('barbican.model.repositories.Repositories')
@mock.patch.object(consumer.KeystoneEventConsumer, 'process',
return_value=None)
def test_delete_project_event_with_incomplete_event_type_format(
self, mock_process, mock_repos):
project_id = uuid.uuid4().hex
self.task_args[self.type_index] = 'project.deleted'
self.task_args[self.payload_index] = {'resource_info': project_id}
result = self.task.info(*self.task_args)
self.assertFalse(mock_process.called, 'Should not call event consumer '
'for project delete event as service name is missing '
'in event_type data. Expected format is '
' <service_name>.<resource_name>.<operation_type>')
self.assertIsNone(result)
@mock.patch.object(consumer.KeystoneEventConsumer, 'process',
return_value=None)
def test_delete_project_event_notification_with_missing_resource_info(
self, mock_process):
self.task_args[self.type_index] = 'identity.project.deleted'
self.task_args[self.payload_index] = {'resource_info': None}
result = self.task.info(*self.task_args)
self.assertFalse(mock_process.called, 'Should not call event consumer '
'for project delete event when project_id is missing '
'in payload')
self.assertIsNone(result)
@mock.patch.object(consumer.KeystoneEventConsumer, 'process',
return_value=None)
def test_delete_project_event_notification_with_missing_payload(
self, mock_process):
self.task_args[self.type_index] = 'identity.project.deleted'
self.task_args[self.payload_index] = None
result = self.task.info(*self.task_args)
self.assertFalse(mock_process.called, 'Should not call event consumer '
'for project delete event when payload is missing')
self.assertIsNone(result)
@mock.patch.object(consumer.KeystoneEventConsumer, 'process',
return_value=None)
def test_delete_project_event_notification_with_blank_payload(
self, mock_process):
self.task_args[self.type_index] = 'identity.project.deleted'
self.task_args[self.payload_index] = ''
result = self.task.info(*self.task_args)
self.assertFalse(mock_process.called, 'Should not call event consumer '
'for project delete event when payload is missing')
self.assertIsNone(result)
@mock.patch.object(consumer.KeystoneEventConsumer, 'process',
return_value=None)
def test_event_notification_with_missing_event_type(self, mock_process):
project_id = uuid.uuid4().hex
self.task_args[self.type_index] = None
self.task_args[self.payload_index] = {'resource_info': project_id}
result = self.task.info(*self.task_args)
self.assertFalse(mock_process.called, 'Should not call event consumer '
'for keystone event when event_type is missing in '
'notification')
self.assertIsNone(result)
@mock.patch.object(consumer.KeystoneEventConsumer, 'process',
return_value=None)
def test_event_notification_with_blank_event_type(self, mock_process):
project_id = uuid.uuid4().hex
self.task_args[self.type_index] = ''
self.task_args[self.payload_index] = {'resource_info': project_id}
result = self.task.info(*self.task_args)
self.assertFalse(mock_process.called, 'Should not call event consumer '
'keystone event when event_type is blank in '
'notification')
self.assertIsNone(result)
@mock.patch('barbican.model.repositories.Repositories')
@mock.patch.object(consumer.KeystoneEventConsumer, 'process')
def test_event_notification_with_processing_error_requeue_disabled(
self, mock_process, mock_repos):
self.opt_in_group(queue.KS_NOTIFICATIONS_GRP_NAME, allow_requeue=False)
local_task = keystone_listener.NotificationTask(self.conf)
mock_process.side_effect = Exception('Dummy Error')
project_id = uuid.uuid4().hex
self.task_args[self.type_index] = 'identity.project.deleted'
self.task_args[self.payload_index] = {'resource_info': project_id}
result = local_task.info(*self.task_args)
self.assertTrue(mock_process.called, 'Should call event consumer for'
' project delete event')
self.assertEqual(messaging.NotificationResult.HANDLED, result)
@mock.patch('barbican.model.repositories.Repositories')
@mock.patch.object(consumer.KeystoneEventConsumer, 'process')
def test_event_notification_with_processing_error_requeue_enabled(
self, mock_process, mock_repos):
self.opt_in_group(queue.KS_NOTIFICATIONS_GRP_NAME, allow_requeue=True)
local_task = keystone_listener.NotificationTask(self.conf)
mock_process.side_effect = Exception('Dummy Error')
project_id = uuid.uuid4().hex
self.task_args[self.type_index] = 'identity.project.deleted'
self.task_args[self.payload_index] = {'resource_info': project_id}
result = local_task.info(*self.task_args)
self.assertTrue(mock_process.called, 'Should call event consumer for'
' project delete event')
self.assertEqual(messaging.NotificationResult.REQUEUE, result)
class WhenUsingMessageServer(UtilMixin, utils.BaseTestCase):
"""Test using the asynchronous task client."""
def setUp(self):
super(WhenUsingMessageServer, self).setUp()
queue.init(self.conf)
patcher = mock.patch('oslo.messaging.server.MessageHandlingServer')
mock_server_class = patcher.start()
self.addCleanup(patcher.stop)
self.msg_server_mock = mock_server_class()
self.msg_server_mock.start.return_value = None
self.msg_server_mock.stop.return_value = None
self.msg_server_mock.wait.return_value = None
@mock.patch.object(queue, 'get_notification_server')
@mock.patch.object(queue, 'get_notification_target')
def test_target_and_notification_server_invocations(self, mock_target,
mock_server):
target = 'a target value here'
mock_target.return_value = target
msg_server = keystone_listener.MessageServer(self.conf)
mock_target.assert_called_once_with()
mock_server.assert_called_once_with(
targets=[target], endpoints=[msg_server])
def test_keystone_notification_config_used(self):
topic = 'my test topic'
exchange = 'my test exchange'
version = ' my test version'
self.opt_in_group(queue.KS_NOTIFICATIONS_GRP_NAME, topic=topic)
self.opt_in_group(queue.KS_NOTIFICATIONS_GRP_NAME,
control_exchange=exchange)
self.opt_in_group(queue.KS_NOTIFICATIONS_GRP_NAME, version=version)
self.opt_in_group(queue.KS_NOTIFICATIONS_GRP_NAME, version=version)
target = queue.get_notification_target()
self.assertEqual(topic, target.topic)
self.assertEqual(exchange, target.exchange)
self.assertEqual(version, target.version)
@mock.patch.object(service.Service, '__init__')
def test_keystone_notification_pool_size_used(self, mock_service_init):
thread_pool_size = 5
self.opt_in_group(queue.KS_NOTIFICATIONS_GRP_NAME,
thread_pool_size=thread_pool_size)
msg_server = keystone_listener.MessageServer(self.conf)
mock_service_init.assert_called_once_with(msg_server,
threads=thread_pool_size)
@mock.patch.object(service.Service, 'start')
def test_should_start(self, mock_service):
msg_server = keystone_listener.MessageServer(self.conf)
msg_server.start()
self.msg_server_mock.start.assert_called_with()
@mock.patch.object(service.Service, 'stop')
def test_should_stop(self, mock_service_stop):
msg_server = keystone_listener.MessageServer(self.conf)
msg_server.stop()
self.msg_server_mock.stop.assert_called_with()
@mock.patch.object(service.Service, 'wait')
def test_should_wait(self, mock_service_wait):
msg_server = keystone_listener.MessageServer(self.conf)
msg_server.wait()
self.assertFalse(self.msg_server_mock.stop.called, 'No need to call'
'message server wait() as Service itself creates the '
' wait event')
self.assertTrue(mock_service_wait.called, 'Expected to only call '
'service.Service.wait() method')

View File

@ -0,0 +1,318 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
import sqlalchemy
import uuid
from oslo.config import cfg
from barbican.common import exception
from barbican.common import resources as c_resources
from barbican.model import models
from barbican.model import repositories as rep
from barbican.plugin import resources as plugin
from barbican.tasks import keystone_consumer as consumer
from barbican.tests.queue import test_keystone_listener as listener_test
from barbican.tests import utils
class WhenUsingKeystoneEventConsumer(listener_test.UtilMixin,
utils.BaseTestCase):
IN_MEM_DB_CONN_STRING = 'sqlite://'
def setUp(self):
super(WhenUsingKeystoneEventConsumer, self).setUp()
self.conf = cfg.CONF
self.engine = None
self.addCleanup(self._cleanup)
def _cleanup(self):
if self.engine:
self.engine.dispose()
def _init_memory_db_setup(self):
# using in-memory sqlalchemy database, sqlite, instead of simulating
# data via mocks to verify transaction behavior (like rollback when
# error occurs in middle of delete project entities logic). This also
# helps in verifying that project_entities related query is defined
# correctly.
self.opt_in_group(None, sql_connection=self.IN_MEM_DB_CONN_STRING)
self.project_id1 = uuid.uuid4().hex
self.project_id2 = uuid.uuid4().hex
rep._MAKER = None
rep._ENGINE = None
rep.configure_db()
self.repos = rep.Repositories(
tenant_repo=None, tenant_secret_repo=None, secret_repo=None,
datum_repo=None, kek_repo=None, secret_meta_repo=None,
order_repo=None, order_plugin_meta_repo=None,
transport_key_repo=None, container_repo=None,
container_secret_repo=None)
self.project1_data = c_resources.get_or_create_tenant(
self.project_id1, self.repos.tenant_repo)
self.assertIsNotNone(self.project1_data)
self.engine = rep.get_engine()
self.project2_data = c_resources.get_or_create_tenant(
self.project_id2, self.repos.tenant_repo)
self.assertIsNotNone(self.project2_data)
def _create_secret_for_project(self, project_data):
secret_info = {"name": uuid.uuid4().hex, "algorithm": "aes",
"bit_length": 256, "mode": "cbc",
"payload_content_type": "application/octet-stream"}
new_secret = plugin.generate_secret(
secret_info, secret_info.get('payload_content_type'), project_data,
self.repos)
return new_secret
def test_get_project_entities_lookup_call(self):
self._init_memory_db_setup()
secret = self._create_secret_for_project(self.project2_data)
project2_id = self.project2_data.id
self.assertIsNotNone(secret)
db_secrets = self.repos.secret_repo.get_project_entities(project2_id)
self.assertEqual(1, len(db_secrets))
self.assertEqual(secret.id, db_secrets[0].id)
db_tenant_secret = self.repos.tenant_secret_repo.get_project_entities(
project2_id)
self.assertEqual(1, len(db_tenant_secret))
db_kek = self.repos.kek_repo.get_project_entities(project2_id)
self.assertEqual(1, len(db_kek))
# secret_meta_repo does not implement function
# _build_get_project_entities_query, so it should raise error
self.assertRaises(NotImplementedError,
self.repos.secret_meta_repo.get_project_entities,
project2_id)
# transport_key_repo does not implement function
# _build_get_project_entities_query, so it should raise error
self.assertRaises(NotImplementedError,
self.repos.transport_key_repo.get_project_entities,
project2_id)
@mock.patch.object(consumer.KeystoneEventConsumer, 'handle_success')
def test_existing_project_entities_cleanup_for_plain_secret(
self, mock_handle_success):
self._init_memory_db_setup()
secret = self._create_secret_for_project(self.project1_data)
self.assertIsNotNone(secret)
secret_id = secret.id
project1_id = self.project1_data.id
db_secrets = self.repos.secret_repo.get_project_entities(project1_id)
self.assertEqual(1, len(db_secrets))
self.assertEqual(secret.id, db_secrets[0].id)
# Get secret_store_metadata for related secret
self.assertTrue(len(db_secrets[0].secret_store_metadata) > 0)
secret_metadata_id = db_secrets[0].secret_store_metadata.values()[0].id
self.assertIsNotNone(secret_metadata_id)
# Get db entry for secret_store_metadata by id to make sure its
# presence before removing via delete project task
db_secret_store_meta = self.repos.secret_meta_repo.get(
entity_id=secret_metadata_id)
self.assertIsNotNone(db_secret_store_meta)
db_tenant_secret = self.repos.tenant_secret_repo.get_project_entities(
project1_id)
self.assertEqual(1, len(db_tenant_secret))
db_kek = self.repos.kek_repo.get_project_entities(project1_id)
self.assertEqual(1, len(db_kek))
task = consumer.KeystoneEventConsumer()
result = task.process(project_id=self.project_id1,
resource_type='project',
operation_type='deleted')
self.assertIsNone(result, 'No return is expected as result')
mock_handle_success.assert_called()
_, kwargs = mock_handle_success.call_args
self.assertEqual(self.project_id1, kwargs['project_id'])
self.assertEqual('project', kwargs['resource_type'])
self.assertEqual('deleted', kwargs['operation_type'])
# After project entities delete, make sure secret is not found
ex = self.assertRaises(exception.NotFound, self.repos.secret_repo.get,
entity_id=secret_id,
keystone_id=self.project_id1)
self.assertIn(secret_id, str(ex))
# After project entities delete, make sure tenant_secret is not found
entities = self.repos.tenant_secret_repo.get_project_entities(
project1_id)
self.assertEqual(0, len(entities))
# After project entities delete, make sure kek data is not found
entities = self.repos.kek_repo.get_project_entities(project1_id)
self.assertEqual(0, len(entities))
db_tenant = self.repos.tenant_repo.get_project_entities(project1_id)
self.assertEqual(0, len(db_tenant))
# Should have deleted SecretStoreMetadatum via children delete
ex = self.assertRaises(exception.NotFound,
self.repos.secret_meta_repo.get,
entity_id=secret_metadata_id)
def test_project_entities_cleanup_for_no_matching_barbican_project(self):
self._init_memory_db_setup()
task = consumer.KeystoneEventConsumer()
result = task.process(project_id=self.project_id1,
resource_type='project',
operation_type='deleted')
self.assertIsNone(result, 'No return is expected as result')
def test_project_entities_cleanup_for_missing_barbican_project(self):
self._init_memory_db_setup()
task = consumer.KeystoneEventConsumer()
result = task.process(project_id=None,
resource_type='project',
operation_type='deleted')
self.assertIsNone(result, 'No return is expected as result')
@mock.patch.object(models.Tenant, 'delete',
side_effect=sqlalchemy.exc.SQLAlchemyError)
def test_delete_project_entities_alchemy_error_suppress_exception_true(
self, mock_entity_delete):
self._init_memory_db_setup()
secret = self._create_secret_for_project(self.project1_data)
self.assertIsNotNone(secret)
project1_id = self.project1_data.id
# sqlalchemy error is suppressed here
no_error = self.repos.tenant_repo.delete_project_entities(
project1_id, suppress_exception=True)
self.assertIsNone(no_error)
@mock.patch.object(models.Tenant, 'delete',
side_effect=sqlalchemy.exc.SQLAlchemyError)
def test_delete_project_entities_alchemy_error_suppress_exception_false(
self, mock_entity_delete):
self._init_memory_db_setup()
secret = self._create_secret_for_project(self.project1_data)
self.assertIsNotNone(secret)
project1_id = self.project1_data.id
# sqlalchemy error is not suppressed here
self.assertRaises(exception.BarbicanException,
self.repos.tenant_repo.delete_project_entities,
project1_id, suppress_exception=False)
def test_delete_project_entities_not_impl_error_suppress_exception_true(
self):
self._init_memory_db_setup()
secret = self._create_secret_for_project(self.project1_data)
self.assertIsNotNone(secret)
project1_id = self.project1_data.id
# NotImplementedError is not suppressed regardless of related flag
self.assertRaises(NotImplementedError,
self.repos.secret_meta_repo.delete_project_entities,
project1_id, suppress_exception=True)
def test_delete_project_entities_not_impl_error_suppress_exception_false(
self):
self._init_memory_db_setup()
secret = self._create_secret_for_project(self.project1_data)
self.assertIsNotNone(secret)
project1_id = self.project1_data.id
# NotImplementedError is not suppressed regardless of related flag
self.assertRaises(NotImplementedError,
self.repos.secret_meta_repo.delete_project_entities,
project1_id, suppress_exception=False)
@mock.patch.object(consumer.KeystoneEventConsumer, 'handle_error')
@mock.patch.object(rep.TenantRepo, 'delete_project_entities',
side_effect=exception.BarbicanException)
def test_rollback_with_error_during_project_cleanup(self, mock_delete,
mock_handle_error):
self._init_memory_db_setup()
rep.start()
secret = self._create_secret_for_project(self.project1_data)
self.assertIsNotNone(secret)
secret_id = secret.id
project1_id = self.project1_data.id
db_secrets = self.repos.secret_repo.get_project_entities(project1_id)
self.assertEqual(1, len(db_secrets))
self.assertEqual(secret.id, db_secrets[0].id)
db_tenant_secret = self.repos.tenant_secret_repo.get_project_entities(
project1_id)
self.assertEqual(1, len(db_tenant_secret))
db_kek = self.repos.kek_repo.get_project_entities(project1_id)
self.assertEqual(1, len(db_kek))
# rollback changes made so far before creating rollback scenario
rep.commit()
task = consumer.KeystoneEventConsumer()
handle_error_mock = mock.MagicMock()
task.handler_error = handle_error_mock
self.assertRaises(exception.BarbicanException,
task.process, project_id=self.project_id1,
resource_type='project', operation_type='deleted')
mock_handle_error.assert_called()
args, kwargs = mock_handle_error.call_args
self.assertEqual(500, args[1])
self.assertEqual(self.project_id1, kwargs['project_id'])
self.assertEqual('project', kwargs['resource_type'])
self.assertEqual('deleted', kwargs['operation_type'])
# Make sure entities are still present after rollback
db_secrets = self.repos.secret_repo.get_project_entities(project1_id)
self.assertEqual(1, len(db_secrets))
self.assertEqual(secret_id, db_secrets[0].id)
db_tenant_secret = self.repos.tenant_secret_repo.get_project_entities(
project1_id)
self.assertEqual(1, len(db_tenant_secret))
db_kek = self.repos.kek_repo.get_project_entities(project1_id)
self.assertEqual(1, len(db_kek))
db_tenant = self.repos.tenant_repo.get_project_entities(project1_id)
self.assertEqual(1, len(db_tenant))

View File

@ -0,0 +1,82 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Barbican Keystone notification listener server.
"""
import eventlet
import gettext
import os
import sys
# Oslo messaging notification server uses eventlet.
#
# To have remote debugging, thread module needs to be disabled.
# eventlet.monkey_patch(thread=False)
eventlet.monkey_patch()
# 'Borrowed' from the Glance project:
# If ../barbican/__init__.py exists, add ../ to Python search path, so that
# it will override what happens to be installed in /usr/(local/)lib/python...
possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
os.pardir,
os.pardir))
if os.path.exists(os.path.join(possible_topdir, 'barbican', '__init__.py')):
sys.path.insert(0, possible_topdir)
gettext.install('barbican', unicode=1)
from barbican.common import config
from barbican.openstack.common import log
from barbican.openstack.common import service
from barbican import queue
from barbican.queue import keystone_listener
from oslo.config import cfg
def fail(returncode, e):
sys.stderr.write("ERROR: {0}\n".format(e))
sys.exit(returncode)
if __name__ == '__main__':
try:
config.parse_args()
config.setup_remote_pydev_debug()
# Import and configure logging.
log.setup('barbican')
LOG = log.getLogger(__name__)
LOG.info("Booting up Barbican Keystone listener node...")
# Queuing initialization
CONF = cfg.CONF
queue.init(CONF)
if getattr(getattr(CONF, queue.KS_NOTIFICATIONS_GRP_NAME), 'enable'):
service.launch(
keystone_listener.MessageServer(CONF)
).wait()
else:
LOG.info("Exiting as Barbican Keystone listener is not enabled...")
except RuntimeError as e:
fail(1, e)

View File

@ -0,0 +1,5 @@
#!/bin/bash
# barbican-keystone-listener - Script to run barbican keystone notification listener.
python bin/barbican-keystone-listener.py

View File

@ -0,0 +1,10 @@
# Barbican Keystone Listener upstart script
# Used in deb build. Keep in sync with etc/init/barbican-keystone-listener.conf
description "Barbican Keystone Event Listeners"
start on runlevel [2345]
stop on runlevel [06]
script
/usr/bin/barbican-keystone-listener
end script

View File

@ -151,6 +151,43 @@ version = '1.1'
# Server name for RPC service
server_name = 'barbican.queue'
# ================= Keystone Notification Options - Application ===============
[keystone_notifications]
# Keystone notification functionality uses transport related configuration
# from barbican common configuration as defined under
# 'Queue Options - oslo.messaging' comments.
# The HA related configuration is also shared with notification server.
# True enables keystone notification listener functionality.
enable = False
# The default exchange under which topics are scoped.
# May be overridden by an exchange name specified in the transport_url option.
control_exchange = 'openstack'
# Keystone notification queue topic name.
# This name needs to match one of values mentioned in Keystone deployment's
# 'notification_topics' configuration e.g.
# notification_topics=notifications, barbican_notifications
# Multiple servers may listen on a topic and messages will be dispatched to one
# of the servers in a round-robin fashion. That's why Barbican service should
# have its own dedicated notification queue so that it receives all of Keystone
# notifications.
topic = 'notifications'
# True enables requeue feature in case of notification processing error.
# Enable this only when underlying transport supports this feature.
allow_requeue = False
# Version of tasks invoked via notifications
version = '1.0'
# Define the number of max threads to be used for notification server
# processing functionality.
thread_pool_size = 10
# ================= Secret Store Plugin ===================
[secretstore]
namespace = barbican.secretstore.plugin

View File

@ -0,0 +1,10 @@
# Barbican Keystone Listener upstart script
# Used in rpm build. Keep in sync with debian/barbican-keystone-listener.upstart
description "Barbican Keystone Event Listeners"
start on runlevel [2345]
stop on runlevel [06]
script
barbican-keystone-listener.py >> /var/log/barbican/barbican_keystone_listener.log 2>&1
end script

View File

@ -25,8 +25,9 @@ Requires: python-six, python-sqlalchemy, python-stevedore
Requires: python-webob
%description
Common files for Barbican Key Management API (barbican-api) and
Barbican Worker (barbican-worker)
Common files for Barbican Key Management API (barbican-api),
Barbican Worker (barbican-worker) and Barbican Keystone Listener
(barbican-keystone-listener)
%prep
%setup -n barbican-%{version} -q
@ -42,7 +43,9 @@ mkdir -p $RPM_BUILD_ROOT/var/l{ib,og}/barbican
install -m 644 etc/barbican/policy.json $RPM_BUILD_ROOT/etc/barbican
install -m 644 etc/init/barbican-api.conf $RPM_BUILD_ROOT/etc/init
install -m 644 etc/init/barbican-worker.conf $RPM_BUILD_ROOT/etc/init
install -m 644 etc/init/barbican-keystone-listener.conf $RPM_BUILD_ROOT/etc/init
install bin/barbican-worker.py $RPM_BUILD_ROOT/usr/bin
install bin/barbican-keystone-listener.py $RPM_BUILD_ROOT/usr/bin
install bin/barbican-db-manage.py $RPM_BUILD_ROOT/usr/bin
install -m 644 -D etc/barbican/barbican* $RPM_BUILD_ROOT/etc/barbican
install -m 644 -D etc/barbican/vassals/*.ini $RPM_BUILD_ROOT/etc/barbican/vassals
@ -121,3 +124,29 @@ if [ $1 -eq 0 ] ; then
/sbin/stop barbican-worker >/dev/null 2>&1 || :
fi
# -------------------------
# Keystone Listener package
# -------------------------
%package -n barbican-keystone-listener
Summary: Barbican Keystone Listener daemon
Requires: barbican-common
%description -n barbican-keystone-listener
Barbican Keystone Listener daemon
%files -n barbican-keystone-listener
%defattr(-,root,root)
%dir /var/lib/barbican
%verify(not md5 size mtime) %attr(0750, barbican,root) /var/log/barbican/barbican-keystone-listener.log
/etc/logrotate.d/barbican-api
%attr(0755,root,root) /usr/bin/barbican-keystone-listener.py
%attr(0755,root,root) /usr/bin/barbican-db-manage.py
%config(noreplace) /etc/init/barbican-keystone-listener.conf
%config(noreplace) /etc/barbican/*
%preun -n barbican-keystone-listener
if [ $1 -eq 0 ] ; then
# Package removal, not upgrade
/sbin/stop barbican-keystone-listener >/dev/null 2>&1 || :
fi

View File

@ -24,6 +24,7 @@ packages =
scripts =
bin/barbican.sh
bin/barbican-worker.py
bin/barbican-keystone-listener.py
bin/barbican-db-manage.py
[entry_points]