From 7c14e2e868de64237eacd2349b4d77059a7f53fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20Guillot?= Date: Tue, 15 Nov 2016 17:45:15 -0500 Subject: [PATCH] Migrate to oslo_messaging and oslo_service This modification introduce a breaking change about AMQP queue handling. Now, Almanach use the same terminology as other OpenStack projects. - By default, Almanach listen on the "almanach" topic - The "info" priority is used to receive notifications - The "error" level is used as retry queue - The "critical" level for the dead queue To be compatible with olso_messaging drivers, transport_url must start by "rabbit://" to use the driver kombu. Change-Id: Ia331d68e94c8ce4196b2d5f3b974a8dbdd6016ef --- almanach/api/v1/routes.py | 2 +- almanach/collector/bus_adapter.py | 66 ----- almanach/collector/handlers/base_handler.py | 24 ++ .../collector/handlers/instance_handler.py | 79 +++--- almanach/collector/handlers/volume_handler.py | 110 ++++---- .../collector/handlers/volume_type_handler.py | 32 +++ almanach/collector/main.py | 39 +-- almanach/collector/messaging.py | 35 +++ almanach/collector/notification.py | 82 ++++++ almanach/collector/retry_adapter.py | 133 ---------- almanach/collector/service.py | 39 +++ almanach/core/opts.py | 40 +-- doc/source/index.rst | 7 + etc/almanach/almanach.docker.conf | 31 +-- integration_tests/builders/messages.py | 110 +------- requirements.txt | 6 +- tests/builders/__init__.py | 0 tests/builders/notification.py | 98 +++++++ .../handlers/test_instance_handler.py | 81 ++++-- .../collector/handlers/test_volume_handler.py | 186 ++++++------- .../handlers/test_volume_type_handler.py | 38 +++ tests/collector/test_bus_adapter.py | 251 ------------------ tests/collector/test_messaging.py | 32 +++ tests/collector/test_notification.py | 82 ++++++ tests/collector/test_retry_adapter.py | 49 ---- 25 files changed, 759 insertions(+), 893 deletions(-) delete mode 100644 almanach/collector/bus_adapter.py create mode 100644 almanach/collector/handlers/base_handler.py create mode 100644 almanach/collector/handlers/volume_type_handler.py create mode 100644 almanach/collector/messaging.py create mode 100644 almanach/collector/notification.py delete mode 100644 almanach/collector/retry_adapter.py create mode 100644 almanach/collector/service.py create mode 100644 tests/builders/__init__.py create mode 100644 tests/builders/notification.py create mode 100644 tests/collector/handlers/test_volume_type_handler.py delete mode 100644 tests/collector/test_bus_adapter.py create mode 100644 tests/collector/test_messaging.py create mode 100644 tests/collector/test_notification.py delete mode 100644 tests/collector/test_retry_adapter.py diff --git a/almanach/api/v1/routes.py b/almanach/api/v1/routes.py index 9a76b6e..163ffb0 100644 --- a/almanach/api/v1/routes.py +++ b/almanach/api/v1/routes.py @@ -55,7 +55,7 @@ def to_json(api_call): except exception.InvalidAttributeException as e: LOG.warning(e.get_error_message()) return send_response({"error": e.get_error_message()}, 400) - except exception.AlmanachEntityNotFoundException as e: + except (exception.AlmanachEntityNotFoundException, exception.VolumeTypeNotFoundException) as e: LOG.warning(e.message) return send_response({"error": e.message}, 404) except exception.AlmanachException as e: diff --git a/almanach/collector/bus_adapter.py b/almanach/collector/bus_adapter.py deleted file mode 100644 index 547df11..0000000 --- a/almanach/collector/bus_adapter.py +++ /dev/null @@ -1,66 +0,0 @@ -# Copyright 2016 Internap. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import kombu -import six - -from kombu import mixins -from oslo_log import log -from oslo_serialization import jsonutils - -from almanach.collector.handlers import instance_handler -from almanach.collector.handlers import volume_handler - -LOG = log.getLogger(__name__) - - -class BusAdapter(mixins.ConsumerMixin): - def __init__(self, config, controller, connection, retry_adapter): - super(BusAdapter, self).__init__() - self.config = config - self.controller = controller - self.connection = connection - self.retry_adapter = retry_adapter - - def on_message(self, notification, message): - try: - self._process_notification(notification) - except Exception as e: - LOG.warning('Sending notification to retry letter exchange %s', jsonutils.dumps(notification)) - LOG.exception(e) - self.retry_adapter.publish_to_dead_letter(message) - message.ack() - - def _process_notification(self, notification): - if isinstance(notification, six.string_types): - notification = jsonutils.loads(notification) - - event_type = notification.get('event_type') - LOG.info('Received event: %s', event_type) - - instance_handler.InstanceHandler(self.controller).handle_events(event_type, notification) - volume_handler.VolumeHandler(self.controller).handle_events(event_type, notification) - - def get_consumers(self, consumer, channel): - queue = kombu.Queue(self.config.collector.queue, routing_key=self.config.collector.routing_key) - return [consumer( - [queue], - callbacks=[self.on_message], - auto_declare=False)] - - def run(self, _tokens=1): - try: - super(BusAdapter, self).run(_tokens) - except KeyboardInterrupt: - pass diff --git a/almanach/collector/handlers/base_handler.py b/almanach/collector/handlers/base_handler.py new file mode 100644 index 0000000..4571319 --- /dev/null +++ b/almanach/collector/handlers/base_handler.py @@ -0,0 +1,24 @@ +# Copyright 2016 Internap. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +import six + + +@six.add_metaclass(abc.ABCMeta) +class BaseHandler(object): + + @abc.abstractmethod + def handle_events(self, notification): + pass diff --git a/almanach/collector/handlers/instance_handler.py b/almanach/collector/handlers/instance_handler.py index ec5b96c..1c32b87 100644 --- a/almanach/collector/handlers/instance_handler.py +++ b/almanach/collector/handlers/instance_handler.py @@ -12,58 +12,53 @@ # See the License for the specific language governing permissions and # limitations under the License. +from almanach.collector.handlers import base_handler + + +class InstanceHandler(base_handler.BaseHandler): -class InstanceHandler(object): def __init__(self, controller): + super(InstanceHandler, self).__init__() self.controller = controller - def handle_events(self, event_type, notification): - if event_type == "compute.instance.create.end": - self.on_instance_created(notification) - elif event_type == "compute.instance.delete.end": - self.on_instance_deleted(notification) - elif event_type == "compute.instance.resize.confirm.end": - self.on_instance_resized(notification) - elif event_type == "compute.instance.rebuild.end": - self.on_instance_rebuilt(notification) - - def on_instance_created(self, notification): - payload = notification.get("payload") - metadata = payload.get("metadata") - - if isinstance(metadata, list): - metadata = {} + def handle_events(self, notification): + if notification.event_type == "compute.instance.create.end": + self._on_instance_created(notification) + elif notification.event_type == "compute.instance.delete.end": + self._on_instance_deleted(notification) + elif notification.event_type == "compute.instance.resize.confirm.end": + self._on_instance_resized(notification) + elif notification.event_type == "compute.instance.rebuild.end": + self._on_instance_rebuilt(notification) + def _on_instance_created(self, notification): self.controller.create_instance( - payload.get("instance_id"), - payload.get("tenant_id"), - payload.get("created_at"), - payload.get("instance_type"), - payload.get("image_meta").get("os_type"), - payload.get("image_meta").get("distro"), - payload.get("image_meta").get("version"), - payload.get("hostname"), - metadata + notification.payload.get("instance_id"), + notification.payload.get("tenant_id"), + notification.payload.get("created_at"), + notification.payload.get("instance_type"), + notification.payload.get("image_meta").get("os_type"), + notification.payload.get("image_meta").get("distro"), + notification.payload.get("image_meta").get("version"), + notification.payload.get("hostname"), + notification.payload.get("metadata", {}) ) - def on_instance_deleted(self, notification): - payload = notification.get("payload") - date = payload.get("terminated_at") - instance_id = payload.get("instance_id") + def _on_instance_deleted(self, notification): + date = notification.payload.get("terminated_at") + instance_id = notification.payload.get("instance_id") self.controller.delete_instance(instance_id, date) - def on_instance_resized(self, notification): - payload = notification.get("payload") - date = notification.get("timestamp") - flavor = payload.get("instance_type") - instance_id = payload.get("instance_id") + def _on_instance_resized(self, notification): + date = notification.context.get("timestamp") + flavor = notification.payload.get("instance_type") + instance_id = notification.payload.get("instance_id") self.controller.resize_instance(instance_id, flavor, date) - def on_instance_rebuilt(self, notification): - payload = notification.get("payload") - date = notification.get("timestamp") - instance_id = payload.get("instance_id") - distro = payload.get("image_meta").get("distro") - version = payload.get("image_meta").get("version") - os_type = payload.get("image_meta").get("os_type") + def _on_instance_rebuilt(self, notification): + date = notification.context.get("timestamp") + instance_id = notification.payload.get("instance_id") + distro = notification.payload.get("image_meta").get("distro") + version = notification.payload.get("image_meta").get("version") + os_type = notification.payload.get("image_meta").get("os_type") self.controller.rebuild_instance(instance_id, distro, version, os_type, date) diff --git a/almanach/collector/handlers/volume_handler.py b/almanach/collector/handlers/volume_handler.py index 638b2eb..3140dc0 100644 --- a/almanach/collector/handlers/volume_handler.py +++ b/almanach/collector/handlers/volume_handler.py @@ -12,82 +12,72 @@ # See the License for the specific language governing permissions and # limitations under the License. +from almanach.collector.handlers import base_handler + + +class VolumeHandler(base_handler.BaseHandler): -class VolumeHandler(object): def __init__(self, controller): + super(VolumeHandler, self).__init__() self.controller = controller - def handle_events(self, event_type, notification): - if event_type == "volume.create.end": - self.on_volume_created(notification) - elif event_type == "volume.delete.end": - self.on_volume_deleted(notification) - elif event_type == "volume.resize.end": - self.on_volume_resized(notification) - elif event_type == "volume.attach.end": - self.on_volume_attached(notification) - elif event_type == "volume.detach.end": - self.on_volume_detached(notification) - elif event_type == "volume.update.end": - self.on_volume_renamed(notification) - elif event_type == "volume.exists": - self.on_volume_renamed(notification) - elif event_type == "volume_type.create": - self.on_volume_type_create(notification) + def handle_events(self, notification): + if notification.event_type == "volume.create.end": + self._on_volume_created(notification) + elif notification.event_type == "volume.delete.end": + self._on_volume_deleted(notification) + elif notification.event_type == "volume.resize.end": + self._on_volume_resized(notification) + elif notification.event_type == "volume.attach.end": + self._on_volume_attached(notification) + elif notification.event_type == "volume.detach.end": + self._on_volume_detached(notification) + elif notification.event_type == "volume.update.end": + self._on_volume_renamed(notification) + elif notification.event_type == "volume.exists": + self._on_volume_renamed(notification) - def on_volume_created(self, notification): - payload = notification.get("payload") - date = payload.get("created_at") - project_id = payload.get("tenant_id") - volume_id = payload.get("volume_id") - volume_name = payload.get("display_name") - volume_type = payload.get("volume_type") - volume_size = payload.get("size") + def _on_volume_created(self, notification): + date = notification.payload.get("created_at") + project_id = notification.payload.get("tenant_id") + volume_id = notification.payload.get("volume_id") + volume_name = notification.payload.get("display_name") + volume_type = notification.payload.get("volume_type") + volume_size = notification.payload.get("size") self.controller.create_volume(volume_id, project_id, date, volume_type, volume_size, volume_name) - def on_volume_deleted(self, notification): - payload = notification.get("payload") - volume_id = payload.get("volume_id") - end_date = notification.get("timestamp") + def _on_volume_deleted(self, notification): + volume_id = notification.payload.get("volume_id") + end_date = notification.context.get("timestamp") self.controller.delete_volume(volume_id, end_date) - def on_volume_renamed(self, notification): - payload = notification.get("payload") - volume_id = payload.get("volume_id") - volume_name = payload.get("display_name") + def _on_volume_renamed(self, notification): + volume_id = notification.payload.get("volume_id") + volume_name = notification.payload.get("display_name") self.controller.rename_volume(volume_id, volume_name) - def on_volume_resized(self, notification): - payload = notification.get("payload") - date = notification.get("timestamp") - volume_id = payload.get("volume_id") - volume_size = payload.get("size") + def _on_volume_resized(self, notification): + date = notification.context.get("timestamp") + volume_id = notification.payload.get("volume_id") + volume_size = notification.payload.get("size") self.controller.resize_volume(volume_id, volume_size, date) - def on_volume_attached(self, notification): - payload = notification.get("payload") - volume_id = payload.get("volume_id") - event_date = notification.get("timestamp") - self.controller.attach_volume(volume_id, event_date, self._get_attached_instances(payload)) + def _on_volume_attached(self, notification): + volume_id = notification.payload.get("volume_id") + event_date = notification.context.get("timestamp") + self.controller.attach_volume(volume_id, event_date, self._get_attached_instances(notification)) - def on_volume_detached(self, notification): - payload = notification.get("payload") - volume_id = payload.get("volume_id") - event_date = notification.get("timestamp") - self.controller.detach_volume(volume_id, event_date, self._get_attached_instances(payload)) - - def on_volume_type_create(self, notification): - volume_types = notification.get("payload").get("volume_types") - volume_type_id = volume_types.get("id") - volume_type_name = volume_types.get("name") - self.controller.create_volume_type(volume_type_id, volume_type_name) + def _on_volume_detached(self, notification): + volume_id = notification.payload.get("volume_id") + event_date = notification.context.get("timestamp") + self.controller.detach_volume(volume_id, event_date, self._get_attached_instances(notification)) @staticmethod - def _get_attached_instances(payload): + def _get_attached_instances(notification): instances_ids = [] - if "volume_attachment" in payload: - for instance in payload["volume_attachment"]: + if "volume_attachment" in notification.payload: + for instance in notification.payload["volume_attachment"]: instances_ids.append(instance.get("instance_uuid")) - elif payload.get("instance_uuid") is not None: - instances_ids.append(payload.get("instance_uuid")) + elif notification.payload.get("instance_uuid") is not None: + instances_ids.append(notification.payload.get("instance_uuid")) return instances_ids diff --git a/almanach/collector/handlers/volume_type_handler.py b/almanach/collector/handlers/volume_type_handler.py new file mode 100644 index 0000000..57edd37 --- /dev/null +++ b/almanach/collector/handlers/volume_type_handler.py @@ -0,0 +1,32 @@ +# Copyright 2016 Internap. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from almanach.collector.handlers import base_handler + + +class VolumeTypeHandler(base_handler.BaseHandler): + + def __init__(self, controller): + super(VolumeTypeHandler, self).__init__() + self.controller = controller + + def handle_events(self, notification): + if notification.event_type == "volume_type.create": + self._on_volume_type_created(notification) + + def _on_volume_type_created(self, notification): + volume_types = notification.payload.get("volume_types") + volume_type_id = volume_types.get("id") + volume_type_name = volume_types.get("name") + self.controller.create_volume_type(volume_type_id, volume_type_name) diff --git a/almanach/collector/main.py b/almanach/collector/main.py index b2b9971..79c7fb2 100644 --- a/almanach/collector/main.py +++ b/almanach/collector/main.py @@ -12,12 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. -import kombu from oslo_log import log +from oslo_service import service import sys -from almanach.collector import bus_adapter -from almanach.collector import retry_adapter +from almanach.collector.handlers import instance_handler +from almanach.collector.handlers import volume_handler +from almanach.collector.handlers import volume_type_handler +from almanach.collector import messaging +from almanach.collector import notification +from almanach.collector import service as collector_service from almanach.core import controller from almanach.core import opts from almanach.storage import storage_driver @@ -26,25 +30,24 @@ LOG = log.getLogger(__name__) def main(): - try: - opts.CONF(sys.argv[1:]) - config = opts.CONF + opts.CONF(sys.argv[1:]) + config = opts.CONF - database_driver = storage_driver.StorageDriver(config).get_database_driver() - database_driver.connect() + database_driver = storage_driver.StorageDriver(config).get_database_driver() + database_driver.connect() - application_controller = controller.Controller(config, database_driver) + messaging_factory = messaging.MessagingFactory(config) + app_controller = controller.Controller(config, database_driver) - connection = kombu.Connection(config.collector.url, heartbeat=config.collector.heartbeat) - retry_listener = retry_adapter.RetryAdapter(config, connection) - bus_listener = bus_adapter.BusAdapter(config, application_controller, - connection, retry_listener) + notification_handler = notification.NotificationHandler(config, messaging_factory) + notification_handler.add_event_handler(instance_handler.InstanceHandler(app_controller)) + notification_handler.add_event_handler(volume_handler.VolumeHandler(app_controller)) + notification_handler.add_event_handler(volume_type_handler.VolumeTypeHandler(app_controller)) - LOG.info('Listening for incoming events') - bus_listener.run() - except Exception as e: - LOG.exception(e) - sys.exit(100) + listener = messaging_factory.get_listener(notification_handler) + + launcher = service.launch(config, collector_service.CollectorService(listener)) + launcher.wait() if __name__ == '__main__': main() diff --git a/almanach/collector/messaging.py b/almanach/collector/messaging.py new file mode 100644 index 0000000..eb4f562 --- /dev/null +++ b/almanach/collector/messaging.py @@ -0,0 +1,35 @@ +# Copyright 2016 Internap. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import oslo_messaging + + +class MessagingFactory(object): + def __init__(self, config): + self.config = config + + def _get_transport(self): + return oslo_messaging.get_notification_transport(self.config, url=self.config.collector.transport_url) + + def get_listener(self, handler): + targets = [ + oslo_messaging.Target(topic=self.config.collector.topic), + ] + + return oslo_messaging.get_notification_listener(self._get_transport(), targets, + endpoints=[handler], executor='threading') + + def get_notifier(self): + return oslo_messaging.Notifier(self._get_transport(), publisher_id='almanach.collector', + topic=self.config.collector.topic, driver='messagingv2') diff --git a/almanach/collector/notification.py b/almanach/collector/notification.py new file mode 100644 index 0000000..ae1667f --- /dev/null +++ b/almanach/collector/notification.py @@ -0,0 +1,82 @@ +# Copyright 2016 Internap. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_log import log as logging +import time + +LOG = logging.getLogger(__name__) + + +class NotificationMessage(object): + RETRY_COUNTER = 'retry_count' + + def __init__(self, event_type, context, payload, metadata): + self.event_type = event_type + self.context = context + self.payload = payload + self.metadata = metadata + + def increment_retry_count(self): + if self.RETRY_COUNTER not in self.context: + self.context[self.RETRY_COUNTER] = 1 + else: + self.context[self.RETRY_COUNTER] += 1 + + def get_retry_counter(self): + if self.RETRY_COUNTER not in self.context: + return 0 + return self.context[self.RETRY_COUNTER] + + +class NotificationHandler(object): + + def __init__(self, config, messaging): + self.config = config + self.messaging = messaging + self.handlers = [] + + def add_event_handler(self, handler): + self.handlers.append(handler) + + def info(self, ctxt, publisher_id, event_type, payload, metadata): + LOG.info('Received event "%s" from "%s" on info queue', event_type, publisher_id) + notification = NotificationMessage(event_type, ctxt, payload, metadata) + + try: + for handler in self.handlers: + handler.handle_events(notification) + except Exception as e: + LOG.warning('Send notification "%s" to error queue', notification.metadata.get('message_id')) + LOG.warning('Notification event_type: %s', notification.event_type) + LOG.warning('Notification context: %s', notification.context) + LOG.warning('Notification payload: %s', notification.payload) + LOG.warning('Notification metadata: %s', notification.metadata) + LOG.exception(e) + self._retry_notification(notification) + + def error(self, ctxt, publisher_id, event_type, payload, metadata): + LOG.warning('Received event "%s" from "%s" on error queue', event_type, publisher_id) + notification = NotificationMessage(event_type, ctxt, payload, metadata) + time.sleep(self.config.collector.retry_delay) + self._retry_notification(notification) + + def _retry_notification(self, notification): + notification.increment_retry_count() + notifier = self.messaging.get_notifier() + + if notification.get_retry_counter() > self.config.collector.max_retries: + LOG.critical('Send notification "%s" to critical queue', notification.metadata.get('message_id')) + notifier.critical(notification.context, notification.event_type, notification.payload) + else: + notifier.error(notification.context, notification.event_type, notification.payload) diff --git a/almanach/collector/retry_adapter.py b/almanach/collector/retry_adapter.py deleted file mode 100644 index 3c6fe3b..0000000 --- a/almanach/collector/retry_adapter.py +++ /dev/null @@ -1,133 +0,0 @@ -# Copyright 2016 Internap. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import kombu - -from oslo_log import log - -LOG = log.getLogger(__name__) - - -class RetryAdapter(object): - def __init__(self, config, connection, retry_producer=None, dead_producer=None): - self.config = config - self.connection = connection - - retry_exchange = self._configure_retry_exchanges(self.connection) - dead_exchange = self._configure_dead_exchange(self.connection) - - self._retry_producer = retry_producer or kombu.Producer(self.connection, exchange=retry_exchange) - self._dead_producer = dead_producer or kombu.Producer(self.connection, exchange=dead_exchange) - - def publish_to_dead_letter(self, message): - death_count = self._get_rejected_count(message) - LOG.info('Message die %d times', death_count) - - if death_count < self.config.collector.max_retries: - LOG.info('Publishing message to retry queue') - self._publish_message(self._retry_producer, message) - else: - LOG.info('Publishing message to dead letter queue') - self._publish_message(self._dead_producer, message) - - def _configure_retry_exchanges(self, connection): - def declare_queues(): - channel = connection.channel() - - retry_exchange = kombu.Exchange( - name=self.config.collector.retry_exchange, - type='direct', - channel=channel) - - retry_queue = kombu.Queue( - name=self.config.collector.retry_queue, - exchange=retry_exchange, - routing_key=self.config.collector.routing_key, - queue_arguments=self._get_queue_arguments(), - channel=channel) - - main_exchange = kombu.Exchange( - name=self.config.collector.retry_return_exchange, - type='direct', - channel=channel) - - main_queue = kombu.Queue( - name=self.config.collector.queue, - exchange=main_exchange, - durable=False, - routing_key=self.config.collector.routing_key, - channel=channel) - - retry_queue.declare() - main_queue.declare() - - return retry_exchange - - def error_callback(exception, interval): - LOG.error('Failed to declare queues and exchanges, retrying in %d seconds. %r', interval, exception) - - declare_queues = connection.ensure(connection, declare_queues, errback=error_callback, - interval_start=0, interval_step=5, interval_max=30) - return declare_queues() - - def _configure_dead_exchange(self, connection): - def declare_dead_queue(): - channel = connection.channel() - dead_exchange = kombu.Exchange( - name=self.config.collector.dead_exchange, - type='direct', - channel=channel) - - dead_queue = kombu.Queue( - name=self.config.collector.dead_queue, - routing_key=self.config.collector.routing_key, - exchange=dead_exchange, - channel=channel) - - dead_queue.declare() - - return dead_exchange - - def error_callback(exception, interval): - LOG.error('Failed to declare dead queue and exchange, retrying in %d seconds. %r', - interval, exception) - - declare_dead_queue = connection.ensure(connection, declare_dead_queue, errback=error_callback, - interval_start=0, interval_step=5, interval_max=30) - return declare_dead_queue() - - def _get_queue_arguments(self): - return { - "x-message-ttl": self.config.collector.retry_ttl * 1000, - "x-dead-letter-exchange": self.config.collector.retry_return_exchange, - "x-dead-letter-routing-key": self.config.collector.routing_key, - } - - def _get_rejected_count(self, message): - if 'x-death' in message.headers: - return len(message.headers['x-death']) - return 0 - - def _publish_message(self, producer, message): - publish = self.connection.ensure(producer, producer.publish, errback=self._error_callback, - interval_start=0, interval_step=5, interval_max=30) - publish(message.body, - routing_key=message.delivery_info['routing_key'], - headers=message.headers, - content_type=message.content_type, - content_encoding=message.content_encoding) - - def _error_callback(self, exception, interval): - LOG.error('Failed to publish message to dead letter queue, retrying in %d seconds. %r', - interval, exception) diff --git a/almanach/collector/service.py b/almanach/collector/service.py new file mode 100644 index 0000000..af795f2 --- /dev/null +++ b/almanach/collector/service.py @@ -0,0 +1,39 @@ +# Copyright 2016 Internap. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_log import log as logging +from oslo_service import service + +LOG = logging.getLogger(__name__) + + +class CollectorService(service.ServiceBase): + + def __init__(self, listener): + super(CollectorService, self).__init__() + self.listener = listener + + def start(self): + LOG.info('Starting collector service...') + self.listener.start() + + def wait(self): + LOG.info('Waiting...') + + def stop(self): + LOG.info('Graceful shutdown of the collector service...') + self.listener.stop() + + def reset(self): + pass diff --git a/almanach/core/opts.py b/almanach/core/opts.py index bd09aa1..dbff423 100644 --- a/almanach/core/opts.py +++ b/almanach/core/opts.py @@ -39,43 +39,19 @@ api_opts = [ ] collector_opts = [ - cfg.StrOpt('url', + cfg.StrOpt('transport_url', secret=True, - default='amqp://guest:guest@localhost:5672', - help='RabbitMQ connection URL'), - cfg.IntOpt('heartbeat', - default=540, - help='RabbitMQ connection heartbeat'), - cfg.StrOpt('queue', - default='almanach.info', - help='Default queue name'), - cfg.StrOpt('exchange', - default='almanach.info', - help='Default exchange name'), - cfg.StrOpt('routing_key', - default='almanach.info', - help='Default queue routing key'), - cfg.StrOpt('retry_queue', - default='almanach.retry', - help='Retry queue name'), - cfg.StrOpt('retry_exchange', - default='almanach.retry', - help='Retry exchange name'), - cfg.StrOpt('retry_return_exchange', + default='rabbit://guest:guest@localhost:5672', + help='AMQP connection URL'), + cfg.StrOpt('topic', default='almanach', - help='Retry return exchange name'), - cfg.IntOpt('retry_ttl', - default=10, - help='Time to live value of messages sent on the retry queue'), + help='AMQP topic used for OpenStack notifications'), cfg.IntOpt('max_retries', default=3, help='Maximal number of message retries'), - cfg.StrOpt('dead_queue', - default='almanach.dead', - help='Dead queue name'), - cfg.StrOpt('dead_exchange', - default='almanach.dead', - help='Dead exchange name'), + cfg.IntOpt('retry_delay', + default=10, + help='Delay in seconds between retries'), ] auth_opts = [ diff --git a/doc/source/index.rst b/doc/source/index.rst index 3e5d032..acd2238 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -60,6 +60,13 @@ Start the collector: almanach-collector --config-file /etc/almanach/almanach.conf +Signal Handling +--------------- + +- :code:`SIGINT`: force instantaneous termination +- :code:`SIGTERM`: graceful termination of the service +- :code:`SIGHUP`: reload service + Authentication -------------- diff --git a/etc/almanach/almanach.docker.conf b/etc/almanach/almanach.docker.conf index 176b9e6..e06c005 100644 --- a/etc/almanach/almanach.docker.conf +++ b/etc/almanach/almanach.docker.conf @@ -48,36 +48,13 @@ bind_port = 8000 # # RabbitMQ connection URL (string value) -url = amqp://guest:guest@messaging:5672 - -# Default queue name (string value) -#default_queue = almanach.info - -# Default exchange name (string value) -#default_exchange = almanach.info - -# Default queue routing key (string value) -#default_routing_key = almanach.info - -# Retry queue name (string value) -#retry_queue = almanach.retry - -# Retry exchange name (string value) -#retry_exchange = almanach.retry - -# Time to live value of messages sent on the retry queue (integer -# value) -#retry_ttl = 10 +transport_url = rabbit://guest:guest@messaging:5672 # Maximal number of message retries (integer value) -#max_retries = 3 - -# Dead queue name (string value) -#dead_queue = almanach.dead - -# Dead exchange name (string value) -#dead_exchange = almanach.dead +max_retries = 5 +# Delay in seconds between retries +retry_delay = 10 [database] diff --git a/integration_tests/builders/messages.py b/integration_tests/builders/messages.py index b3e4636..d695dbf 100644 --- a/integration_tests/builders/messages.py +++ b/integration_tests/builders/messages.py @@ -102,25 +102,6 @@ def get_volume_delete_end_sample(volume_id=None, tenant_id=None, volume_type=Non return _get_volume_icehouse_payload("volume.delete.end", **kwargs) -def get_volume_attach_icehouse_end_sample(volume_id=None, tenant_id=None, volume_type=None, volume_size=None, - creation_timestamp=None, name=None, attached_to=None): - kwargs = { - "volume_id": volume_id or "64a0ca7f-5f5a-4dc5-a1e1-e04e89eb95ed", - "tenant_id": tenant_id or "46eeb8e44298460899cf4b3554bfe11f", - "display_name": name or "mytenant-0001-myvolume", - "volume_type": volume_type or DEFAULT_VOLUME_TYPE, - "volume_size": volume_size or 50, - "attached_to": attached_to or "e7d44dea-21c1-452c-b50c-cbab0d07d7d3", - "created_at": creation_timestamp if creation_timestamp else datetime(2014, 2, 14, 17, 18, 35, tzinfo=pytz.utc), - "launched_at": creation_timestamp + timedelta(seconds=1) if creation_timestamp else datetime(2014, 2, 14, 17, - 18, 40, - tzinfo=pytz.utc), - "timestamp": creation_timestamp + timedelta(seconds=1) if creation_timestamp else datetime(2014, 2, 14, 17, 18, - 40, tzinfo=pytz.utc), - } - return _get_volume_icehouse_payload("volume.attach.end", **kwargs) - - def get_volume_attach_kilo_end_sample(volume_id=None, tenant_id=None, volume_type=None, volume_size=None, timestamp=None, name=None, attached_to=None): kwargs = { @@ -166,63 +147,6 @@ def get_volume_resize_end_sample(volume_id=None, tenant_id=None, volume_type=Non return _get_volume_kilo_payload("volume.resize.end", **kwargs) -def get_volume_detach_end_sample(volume_id=None, tenant_id=None, volume_type=None, volume_size=None, - creation_timestamp=None, deletion_timestamp=None, name=None): - kwargs = { - "volume_id": volume_id or "64a0ca7f-5f5a-4dc5-a1e1-e04e89eb95ed", - "tenant_id": tenant_id or "46eeb8e44298460899cf4b3554bfe11f", - "display_name": name or "mytenant-0001-myvolume", - "volume_type": volume_type or DEFAULT_VOLUME_TYPE, - "volume_size": volume_size or 50, - "attached_to": None, - "created_at": creation_timestamp if creation_timestamp else datetime(2014, 2, 14, 17, 18, 35, tzinfo=pytz.utc), - "launched_at": creation_timestamp + timedelta(seconds=1) if creation_timestamp else datetime(2014, 2, 14, 17, - 18, 40, - tzinfo=pytz.utc), - "timestamp": deletion_timestamp if deletion_timestamp else datetime(2014, 2, 23, 8, 1, 58, tzinfo=pytz.utc), - "status": "detach" - } - return _get_volume_icehouse_payload("volume.detach.end", **kwargs) - - -def get_volume_rename_end_sample(volume_id=None, tenant_id=None, volume_type=None, volume_size=None, - creation_timestamp=None, deletion_timestamp=None, name=None): - kwargs = { - "volume_id": volume_id or "64a0ca7f-5f5a-4dc5-a1e1-e04e89eb95ed", - "tenant_id": tenant_id or "46eeb8e44298460899cf4b3554bfe11f", - "display_name": name or "mytenant-0001-mysnapshot01", - "volume_type": volume_type or DEFAULT_VOLUME_TYPE, - "volume_size": volume_size or 50, - "attached_to": None, - "created_at": creation_timestamp if creation_timestamp else datetime(2014, 2, 14, 17, 18, 35, tzinfo=pytz.utc), - "launched_at": creation_timestamp + timedelta(seconds=1) if creation_timestamp else datetime(2014, 2, 14, 17, - 18, 40, - tzinfo=pytz.utc), - "timestamp": deletion_timestamp if deletion_timestamp else datetime(2014, 2, 23, 8, 1, 58, tzinfo=pytz.utc), - "status": "detach" - } - return _get_volume_icehouse_payload("volume.update.end", **kwargs) - - -def get_volume_exists_sample(volume_id=None, tenant_id=None, volume_type=None, volume_size=None, - creation_timestamp=None, deletion_timestamp=None, name=None): - kwargs = { - "volume_id": volume_id or "64a0ca7f-5f5a-4dc5-a1e1-e04e89eb95ed", - "tenant_id": tenant_id or "46eeb8e44298460899cf4b3554bfe11f", - "display_name": name or "mytenant-0001-mysnapshot", - "volume_type": volume_type or DEFAULT_VOLUME_TYPE, - "volume_size": volume_size or 50, - "attached_to": None, - "created_at": creation_timestamp if creation_timestamp else datetime(2014, 2, 14, 17, 18, 35, tzinfo=pytz.utc), - "launched_at": creation_timestamp + timedelta(seconds=1) if creation_timestamp else datetime(2014, 2, 14, 17, - 18, 40, - tzinfo=pytz.utc), - "timestamp": deletion_timestamp if deletion_timestamp else datetime(2014, 2, 23, 8, 1, 58, tzinfo=pytz.utc), - "status": "detach" - } - return _get_volume_icehouse_payload("volume.exists", **kwargs) - - def _format_date(datetime_obj): return datetime_obj.strftime("%Y-%m-%dT%H:%M:%S.%fZ") @@ -247,7 +171,6 @@ def _get_instance_payload(event_type, instance_id=None, tenant_id=None, hostname deleted_at = _format_date(deleted_at) if deleted_at else "" terminated_at = _format_date(terminated_at) if terminated_at else "" state = state or "active" - metadata = metadata if not isinstance(timestamp, datetime): timestamp = dateutil.parser.parse(timestamp) @@ -301,8 +224,9 @@ def _get_instance_payload(event_type, instance_id=None, tenant_id=None, hostname "instance_flavor_id": instance_flavor_id, "metadata": metadata }, - "timestamp": timestamp.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + "_context_timestamp": timestamp.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), "updated_at": _format_date(timestamp - timedelta(seconds=10)), + "priority": "INFO", } @@ -325,7 +249,7 @@ def _get_volume_icehouse_payload(event_type, volume_id=None, tenant_id=None, dis return { "event_type": event_type, - "timestamp": launched_at, + "_context_timestamp": launched_at, "publisher_id": "volume.cinder01", "payload": { "instance_uuid": attached_to, @@ -343,7 +267,6 @@ def _get_volume_icehouse_payload(event_type, volume_id=None, tenant_id=None, dis }, "priority": "INFO", "updated_at": _format_date(timestamp - timedelta(seconds=10)), - } @@ -380,7 +303,7 @@ def _get_volume_kilo_payload(event_type, volume_id=None, tenant_id=None, display return { "event_type": event_type, - "timestamp": _format_date(timestamp), + "_context_timestamp": _format_date(timestamp), "publisher_id": "volume.cinder01", "payload": { "status": "in-use", @@ -403,32 +326,9 @@ def _get_volume_kilo_payload(event_type, volume_id=None, tenant_id=None, display } -def get_instance_rebuild_end_sample(): - return _get_instance_payload("compute.instance.rebuild.end") - - -def get_instance_resized_end_sample(): - return _get_instance_payload("compute.instance.resize.confirm.end") - - -def get_volume_update_end_sample(volume_id=None, tenant_id=None, volume_type=None, volume_size=None, - creation_timestamp=None, deletion_timestamp=None, name=None): - kwargs = { - "volume_id": volume_id or "64a0ca7f-5f5a-4dc5-a1e1-e04e89eb95ed", - "tenant_id": tenant_id or "46eeb8e44298460899cf4b3554bfe11f", - "display_name": name or "mytenant-0001-myvolume", - "volume_type": volume_type or DEFAULT_VOLUME_TYPE, - "volume_size": volume_size or 50, - "created_at": creation_timestamp if creation_timestamp else datetime(2014, 2, 14, 17, 18, 35, tzinfo=pytz.utc), - "launched_at": deletion_timestamp if deletion_timestamp else datetime(2014, 2, 23, 8, 1, 58, tzinfo=pytz.utc), - "timestamp": deletion_timestamp if deletion_timestamp else datetime(2014, 2, 23, 8, 1, 58, tzinfo=pytz.utc), - "status": "deleting" - } - return _get_volume_icehouse_payload("volume.resize.end", **kwargs) - - def get_volume_type_create_sample(volume_type_id, volume_type_name): return { + "priority": "INFO", "event_type": "volume_type.create", "publisher_id": "volume.cinder01", "payload": { diff --git a/requirements.txt b/requirements.txt index d045ec8..082434f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,13 +1,13 @@ pbr>=1.10.0 Flask==0.10.1 -PyYAML==3.11 jsonpickle==0.7.1 pymongo>=3.0.2,!=3.1 # Apache-2.0 -kombu>=3.0.25 # BSD pytz>=2014.10 voluptuous==0.8.11 python-keystoneclient>=1.6.0 six>=1.9.0 # MIT oslo.serialization>=1.10.0 # Apache-2.0 oslo.config>=3.14.0 # Apache-2.0 -oslo.log>=3.11.0 # Apache-2.0 \ No newline at end of file +oslo.log>=3.11.0 # Apache-2.0 +oslo.messaging>=5.2.0 # Apache-2.0 +oslo.service>=1.10.0 # Apache-2.0 \ No newline at end of file diff --git a/tests/builders/__init__.py b/tests/builders/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/builders/notification.py b/tests/builders/notification.py new file mode 100644 index 0000000..9ac7760 --- /dev/null +++ b/tests/builders/notification.py @@ -0,0 +1,98 @@ +# Copyright 2016 Internap. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import datetime +import pytz + +from almanach.collector import notification + + +class NotificationMessageBuilder(object): + + def __init__(self): + self.event_type = None + self.context = dict() + self.payload = dict() + self.metadata = dict() + + def with_event_type(self, event_type): + self.event_type = event_type + return self + + def with_context_value(self, key, value): + self.context[key] = value + return self + + def with_payload_value(self, key, value): + self.payload[key] = value + return self + + def build(self): + return notification.NotificationMessage(self.event_type, self.context, self.payload, self.metadata) + + +class InstanceNotificationBuilder(NotificationMessageBuilder): + + def __init__(self): + super(InstanceNotificationBuilder, self).__init__() + self.payload = { + 'instance_id': 'my_instance_id', + 'tenant_id': 'my_tenant_id', + 'created_at': datetime(2014, 2, 14, 16, 29, 58, tzinfo=pytz.utc), + 'terminated_at': None, + 'instance_type': 'my_flavor_name', + 'image_meta': {}, + 'hostname': 'my_instance_name', + 'metadata': {}, + } + + def with_image_meta(self, key, value): + self.payload['image_meta'][key] = value + return self + + +class VolumeTypeNotificationBuilder(NotificationMessageBuilder): + + def __init__(self): + super(VolumeTypeNotificationBuilder, self).__init__() + self.payload = { + 'volume_types': { + 'name': 'my_volume_type_name', + "qos_specs_id": None, + "deleted": False, + "created_at": "2014-02-14T17:18:35.036186Z", + "extra_specs": {}, + "deleted_at": None, + "id": 'my_volume_type_id', + } + } + + +class VolumeNotificationBuilder(NotificationMessageBuilder): + + def __init__(self): + super(VolumeNotificationBuilder, self).__init__() + self.payload = { + 'created_at': '2015-07-27T16:11:07Z', + 'tenant_id': 'my_tenant_id', + 'volume_id': 'my_volume_id', + 'display_name': 'my_volume_name', + 'volume_type': 'my_volume_type', + 'size': 1, + 'volume_attachment': [], + } + + def with_instance_attached(self, instance_id): + self.payload['volume_attachment'].append({'instance_uuid': instance_id}) + return self diff --git a/tests/collector/handlers/test_instance_handler.py b/tests/collector/handlers/test_instance_handler.py index a3494da..22cabef 100644 --- a/tests/collector/handlers/test_instance_handler.py +++ b/tests/collector/handlers/test_instance_handler.py @@ -12,37 +12,84 @@ # See the License for the specific language governing permissions and # limitations under the License. -from flexmock import flexmock +import mock from almanach.collector.handlers import instance_handler -from integration_tests.builders import messages from tests import base +from tests.builders import notification as builder class InstanceHandlerTest(base.BaseTestCase): def setUp(self): super(InstanceHandlerTest, self).setUp() + self.controller = mock.Mock() + self.instance_handler = instance_handler.InstanceHandler(self.controller) - self.controller = flexmock() - self.retry = flexmock() - self.instance_bus_adapter = instance_handler.InstanceHandler(self.controller) + def test_instance_created(self): + notification = builder.InstanceNotificationBuilder()\ + .with_event_type('compute.instance.create.end')\ + .with_image_meta('os_type', 'linux')\ + .with_image_meta('distro', 'ubuntu')\ + .with_image_meta('version', '16.04')\ + .build() - def test_deleted_instance(self): - notification = messages.get_instance_delete_end_sample() + self.instance_handler.handle_events(notification) - self.controller.should_receive('delete_instance').once() - self.instance_bus_adapter.on_instance_deleted(notification) + self.controller.create_instance.assert_called_once_with( + notification.payload['instance_id'], + notification.payload['tenant_id'], + notification.payload['created_at'], + notification.payload['instance_type'], + notification.payload['image_meta']['os_type'], + notification.payload['image_meta']['distro'], + notification.payload['image_meta']['version'], + notification.payload['hostname'], + notification.payload['metadata'], + ) + + def test_instance_deleted(self): + notification = builder.InstanceNotificationBuilder() \ + .with_event_type('compute.instance.delete.end') \ + .with_payload_value('terminated_at', 'a_date') \ + .build() + + self.instance_handler.handle_events(notification) + + self.controller.delete_instance.assert_called_once_with( + notification.payload['instance_id'], + notification.payload['terminated_at'] + ) def test_instance_resized(self): - notification = messages.get_instance_rebuild_end_sample() + notification = builder.InstanceNotificationBuilder() \ + .with_event_type('compute.instance.resize.confirm.end') \ + .with_context_value('timestamp', 'a_date') \ + .build() - self.controller.should_receive('resize_instance').once() - self.instance_bus_adapter.on_instance_resized(notification) + self.instance_handler.handle_events(notification) + + self.controller.resize_instance.assert_called_once_with( + notification.payload['instance_id'], + notification.payload['instance_type'], + notification.context.get("timestamp") + ) def test_instance_rebuild(self): - notification = messages.get_instance_rebuild_end_sample() - self.controller \ - .should_receive("rebuild_instance") \ - .once() - self.instance_bus_adapter.on_instance_rebuilt(notification) + notification = builder.InstanceNotificationBuilder() \ + .with_event_type('compute.instance.rebuild.end') \ + .with_context_value('timestamp', 'a_date') \ + .with_image_meta('os_type', 'linux') \ + .with_image_meta('distro', 'ubuntu') \ + .with_image_meta('version', '16.04') \ + .build() + + self.instance_handler.handle_events(notification) + + self.controller.rebuild_instance.assert_called_once_with( + notification.payload['instance_id'], + notification.payload['image_meta']['distro'], + notification.payload['image_meta']['version'], + notification.payload['image_meta']['os_type'], + notification.context.get("timestamp") + ) diff --git a/tests/collector/handlers/test_volume_handler.py b/tests/collector/handlers/test_volume_handler.py index 1c57f0e..4befdae 100644 --- a/tests/collector/handlers/test_volume_handler.py +++ b/tests/collector/handlers/test_volume_handler.py @@ -12,119 +12,127 @@ # See the License for the specific language governing permissions and # limitations under the License. -from datetime import datetime -from flexmock import flexmock -import pytz +import mock from almanach.collector.handlers import volume_handler -from integration_tests.builders import messages from tests import base +from tests.builders import notification as builder class VolumeHandlerTest(base.BaseTestCase): def setUp(self): super(VolumeHandlerTest, self).setUp() - - self.controller = flexmock() - self.retry = flexmock() + self.controller = mock.Mock() self.volume_handler = volume_handler.VolumeHandler(self.controller) - def test_deleted_volume(self): - notification = messages.get_volume_delete_end_sample() + def test_volume_created(self): + notification = builder.VolumeNotificationBuilder() \ + .with_event_type('volume.create.end') \ + .build() - self.controller.should_receive('delete_volume').once() - self.volume_handler.on_volume_deleted(notification) + self.volume_handler.handle_events(notification) - def test_resize_volume(self): - notification = messages.get_volume_update_end_sample() - - self.controller.should_receive('resize_volume').once() - self.volume_handler.on_volume_resized(notification) - - def test_updated_volume(self): - notification = messages.get_volume_update_end_sample() - - self.controller.should_receive('resize_volume').once() - self.volume_handler.on_volume_resized(notification) - - def test_attach_volume_with_kilo_payload_and_empty_attachments(self): - notification = messages.get_volume_attach_kilo_end_sample( - volume_id="my-volume-id", - timestamp=datetime(2014, 2, 14, 17, 18, 35, tzinfo=pytz.utc), - attached_to=[] + self.controller.create_volume.assert_called_once_with( + notification.payload['volume_id'], + notification.payload['tenant_id'], + notification.payload['created_at'], + notification.payload['volume_type'], + notification.payload['size'], + notification.payload['display_name'], ) - self.controller \ - .should_receive('attach_volume') \ - .with_args("my-volume-id", "2014-02-14T17:18:36.000000Z", []) \ - .once() + def test_volume_deleted(self): + notification = builder.VolumeNotificationBuilder() \ + .with_event_type('volume.delete.end') \ + .with_context_value('timestamp', 'a_date') \ + .build() - self.volume_handler.on_volume_attached(notification) + self.volume_handler.handle_events(notification) - def test_detached_volume(self): - notification = messages.get_volume_detach_end_sample() - - (self.controller - .should_receive('detach_volume') - .once()) - - self.volume_handler.on_volume_detached(notification) - - def test_renamed_volume_with_volume_update_end(self): - notification = messages.get_volume_update_end_sample() - - self.controller \ - .should_receive('rename_volume') \ - .once() - - self.volume_handler.on_volume_renamed(notification) - - def test_renamed_volume_with_volume_exists(self): - notification = messages.get_volume_exists_sample() - - self.controller.should_receive('rename_volume').once() - self.volume_handler.on_volume_renamed(notification) - - def test_attach_volume_with_icehouse_payload(self): - notification = messages.get_volume_attach_icehouse_end_sample( - volume_id="my-volume-id", - creation_timestamp=datetime(2014, 2, 14, 17, 18, 35, tzinfo=pytz.utc), attached_to="my-instance-id" + self.controller.delete_volume.assert_called_once_with( + notification.payload['volume_id'], + notification.context['timestamp'], ) - self.controller \ - .should_receive('attach_volume') \ - .with_args("my-volume-id", "2014-02-14T17:18:36.000000Z", ["my-instance-id"]) \ - .once() + def test_volume_renamed(self): + notification = builder.VolumeNotificationBuilder() \ + .with_event_type('volume.update.end') \ + .build() - self.volume_handler.on_volume_attached(notification) + self.volume_handler.handle_events(notification) - def test_attach_volume_with_kilo_payload(self): - notification = messages.get_volume_attach_kilo_end_sample( - volume_id="my-volume-id", - timestamp=datetime(2014, 2, 14, 17, 18, 35, tzinfo=pytz.utc), - attached_to=["I1"] + self.controller.rename_volume.assert_called_once_with( + notification.payload['volume_id'], + notification.payload['display_name'], ) - self.controller \ - .should_receive('attach_volume') \ - .with_args("my-volume-id", "2014-02-14T17:18:36.000000Z", ["I1"]) \ - .once() + def test_volume_renamed_with_exists_event(self): + notification = builder.VolumeNotificationBuilder() \ + .with_event_type('volume.exists') \ + .build() - self.volume_handler.on_volume_attached(notification) + self.volume_handler.handle_events(notification) - def test_get_attached_instances(self): - self.assertEqual(["truc"], self.volume_handler._get_attached_instances({"instance_uuid": "truc"})) - self.assertEqual([], self.volume_handler._get_attached_instances({"instance_uuid": None})) - self.assertEqual([], self.volume_handler._get_attached_instances({})) - self.assertEqual( - ["a", "b"], - self.volume_handler._get_attached_instances( - {"volume_attachment": [{"instance_uuid": "a"}, {"instance_uuid": "b"}]} - ) + self.controller.rename_volume.assert_called_once_with( + notification.payload['volume_id'], + notification.payload['display_name'], ) - self.assertEqual( - ["a"], - self.volume_handler._get_attached_instances({"volume_attachment": [{"instance_uuid": "a"}]}) + + def test_volume_resized(self): + notification = builder.VolumeNotificationBuilder() \ + .with_event_type('volume.resize.end') \ + .with_context_value('timestamp', 'a_date') \ + .build() + + self.volume_handler.handle_events(notification) + + self.controller.resize_volume.assert_called_once_with( + notification.payload['volume_id'], + notification.payload['size'], + notification.context['timestamp'], + ) + + def test_volume_attach_empty(self): + notification = builder.VolumeNotificationBuilder() \ + .with_event_type('volume.attach.end') \ + .with_context_value('timestamp', 'a_date') \ + .build() + + self.volume_handler.handle_events(notification) + + self.controller.attach_volume.assert_called_once_with( + notification.payload['volume_id'], + notification.context['timestamp'], + [] + ) + + def test_volume_attach_with_instances(self): + notification = builder.VolumeNotificationBuilder() \ + .with_event_type('volume.attach.end') \ + .with_context_value('timestamp', 'a_date') \ + .with_instance_attached('instance_id1') \ + .with_instance_attached('instance_id2') \ + .build() + + self.volume_handler.handle_events(notification) + + self.controller.attach_volume.assert_called_once_with( + notification.payload['volume_id'], + notification.context['timestamp'], + ['instance_id1', 'instance_id2'] + ) + + def test_volume_detached(self): + notification = builder.VolumeNotificationBuilder() \ + .with_event_type('volume.detach.end') \ + .with_context_value('timestamp', 'a_date') \ + .build() + + self.volume_handler.handle_events(notification) + + self.controller.detach_volume.assert_called_once_with( + notification.payload['volume_id'], + notification.context['timestamp'], + [] ) - self.assertEqual([], self.volume_handler._get_attached_instances({"volume_attachment": []})) diff --git a/tests/collector/handlers/test_volume_type_handler.py b/tests/collector/handlers/test_volume_type_handler.py new file mode 100644 index 0000000..29221d1 --- /dev/null +++ b/tests/collector/handlers/test_volume_type_handler.py @@ -0,0 +1,38 @@ +# Copyright 2016 Internap. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock + +from almanach.collector.handlers import volume_type_handler +from tests import base +from tests.builders import notification as builder + + +class VolumeTypeHandlerTest(base.BaseTestCase): + + def setUp(self): + super(VolumeTypeHandlerTest, self).setUp() + self.controller = mock.Mock() + self.volume_type_handler = volume_type_handler.VolumeTypeHandler(self.controller) + + def test_volume_type_created(self): + notification = builder.VolumeTypeNotificationBuilder()\ + .with_event_type('volume_type.create')\ + .build() + + self.volume_type_handler.handle_events(notification) + self.controller.create_volume_type.assert_called_once_with( + notification.payload['volume_types']['id'], + notification.payload['volume_types']['name'] + ) diff --git a/tests/collector/test_bus_adapter.py b/tests/collector/test_bus_adapter.py deleted file mode 100644 index bae7994..0000000 --- a/tests/collector/test_bus_adapter.py +++ /dev/null @@ -1,251 +0,0 @@ -# Copyright 2016 Internap. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from datetime import datetime -from flexmock import flexmock - -import pytz - -from almanach.collector import bus_adapter -from almanach.core import exception -from integration_tests.builders import messages -from tests import base - - -class BusAdapterTest(base.BaseTestCase): - - def setUp(self): - super(BusAdapterTest, self).setUp() - self.controller = flexmock() - self.retry = flexmock() - self.bus_adapter = bus_adapter.BusAdapter(self.config, self.controller, None, retry_adapter=self.retry) - - def test_on_message(self): - instance_id = "e7d44dea-21c1-452c-b50c-cbab0d07d7d3" - tenant_id = "0be9215b503b43279ae585d50a33aed8" - instance_type = "myflavor" - timestamp = datetime(2014, 2, 14, 16, 30, 10, tzinfo=pytz.utc) - hostname = "some hostname" - metadata = {"a_metadata.to_filter": "filtered_value", } - - notification = messages.get_instance_create_end_sample(instance_id=instance_id, tenant_id=tenant_id, - flavor_name=instance_type, creation_timestamp=timestamp, - name=hostname, metadata=metadata) - os_type = notification.get("payload").get("image_meta").get("os_type") - distro = notification.get("payload").get("image_meta").get("distro") - version = notification.get("payload").get("image_meta").get("version") - metadata = notification.get("payload").get("metadata") - - self.controller \ - .should_receive("create_instance") \ - .with_args( - instance_id, - tenant_id, - timestamp.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), - instance_type, - os_type, - distro, - version, - hostname, - metadata - ) \ - .once() - - message = flexmock() - message.should_receive("ack") - - self.bus_adapter.on_message(notification, message) - - def test_on_message_with_empty_metadata(self): - instance_id = "e7d44dea-21c1-452c-b50c-cbab0d07d7d3" - tenant_id = "0be9215b503b43279ae585d50a33aed8" - instance_type = "myflavor" - timestamp = datetime(2014, 2, 14, 16, 30, 10, tzinfo=pytz.utc) - hostname = "some hostname" - - notification = messages.get_instance_create_end_sample(instance_id=instance_id, tenant_id=tenant_id, - flavor_name=instance_type, creation_timestamp=timestamp, - name=hostname, metadata={}) - os_type = notification.get("payload").get("image_meta").get("os_type") - distro = notification.get("payload").get("image_meta").get("distro") - version = notification.get("payload").get("image_meta").get("version") - metadata = notification.get("payload").get("metadata") - - self.controller \ - .should_receive("create_instance") \ - .with_args( - instance_id, tenant_id, timestamp.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), instance_type, os_type, - distro, version, hostname, metadata - ) \ - .once() - - message = flexmock() - (flexmock(message) - .should_receive("ack")) - - self.bus_adapter.on_message(notification, message) - - def test_on_message_with_delete_instance(self): - notification = messages.get_instance_delete_end_sample() - - self.controller \ - .should_receive("delete_instance") \ - .with_args( - notification['payload']['instance_id'], - notification['payload']['terminated_at'] - ) \ - .once() - - message = flexmock() - (flexmock(message) - .should_receive("ack")) - - self.bus_adapter.on_message(notification, message) - - def test_on_message_with_rebuild_instance(self): - notification = messages.get_instance_rebuild_end_sample() - - self.controller \ - .should_receive("rebuild_instance") \ - .with_args( - notification['payload']['instance_id'], - notification['payload']['image_meta']['distro'], - notification['payload']['image_meta']['version'], - notification['payload']['image_meta']['os_type'], - notification['timestamp'], - ) \ - .once() - - message = flexmock() - (flexmock(message) - .should_receive("ack")) - - self.bus_adapter.on_message(notification, message) - - def test_on_message_with_resize_instance(self): - notification = messages.get_instance_resized_end_sample() - - self.controller \ - .should_receive("resize_instance") \ - .with_args( - notification['payload']['instance_id'], - notification['payload']['instance_type'], - notification['timestamp'], - )\ - .once() - - message = flexmock() - (flexmock(message) - .should_receive("ack")) - - self.bus_adapter.on_message(notification, message) - - def test_on_message_with_resize_volume(self): - notification = messages.get_volume_update_end_sample() - - self.controller \ - .should_receive("resize_volume") \ - .with_args( - notification['payload']['volume_id'], - notification['payload']['size'], - notification['timestamp'], - ) \ - .once() - - message = flexmock() - (flexmock(message) - .should_receive("ack")) - - self.bus_adapter.on_message(notification, message) - - def test_on_message_with_volume(self): - volume_id = "vol_id" - tenant_id = "tenant_id" - timestamp_datetime = datetime(2014, 2, 14, 16, 30, 10, tzinfo=pytz.utc) - volume_type = "SF400" - volume_size = 100000 - some_volume = "volume_name" - - notification = messages.get_volume_create_end_sample(volume_id=volume_id, tenant_id=tenant_id, - volume_type=volume_type, volume_size=volume_size, - creation_timestamp=timestamp_datetime, name=some_volume) - self.controller \ - .should_receive("create_volume") \ - .with_args( - volume_id, tenant_id, timestamp_datetime.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), volume_type, - volume_size, some_volume - ) \ - .once() - - message = flexmock() - (flexmock(message) - .should_receive("ack")) - - self.bus_adapter.on_message(notification, message) - - def test_on_message_with_volume_type(self): - volume_type_id = "an_id" - volume_type_name = "a_name" - - notification = messages.get_volume_type_create_sample(volume_type_id=volume_type_id, - volume_type_name=volume_type_name) - - self.controller \ - .should_receive("create_volume_type") \ - .with_args(volume_type_id, volume_type_name) \ - .once() - - message = flexmock() - (flexmock(message) - .should_receive("ack")) - - self.bus_adapter.on_message(notification, message) - - def test_on_message_with_delete_volume(self): - notification = messages.get_volume_delete_end_sample() - - self.controller \ - .should_receive("delete_volume") \ - .with_args( - notification['payload']['volume_id'], - notification['timestamp'], - ) \ - .once() - - message = flexmock() - (flexmock(message) - .should_receive("ack")) - - self.bus_adapter.on_message(notification, message) - - def test_failing_notification_get_retry(self): - notification = messages.get_instance_rebuild_end_sample() - message = flexmock() - - (flexmock(message).should_receive("ack")) - self.controller.should_receive('instance_rebuilded').and_raise(Exception("Foobar")) - self.retry.should_receive('publish_to_dead_letter').with_args(message).once() - - self.bus_adapter.on_message(notification, message) - - def test_that_entity_not_found_exceptions_goes_to_retry_queue(self): - notification = messages.get_instance_delete_end_sample(instance_id="My instance id") - message = flexmock() - - (flexmock(message).should_receive("ack")) - self.controller.should_receive('delete_instance')\ - .and_raise(exception.AlmanachEntityNotFoundException("Entity not found")) - self.retry.should_receive('publish_to_dead_letter').with_args(message).once() - - self.bus_adapter.on_message(notification, message) diff --git a/tests/collector/test_messaging.py b/tests/collector/test_messaging.py new file mode 100644 index 0000000..915a87b --- /dev/null +++ b/tests/collector/test_messaging.py @@ -0,0 +1,32 @@ +# Copyright 2016 Internap. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock +import oslo_messaging + +from almanach.collector import messaging +from tests import base + + +class MessagingFactoryTest(base.BaseTestCase): + + def setUp(self): + super(MessagingFactoryTest, self).setUp() + self.factory = messaging.MessagingFactory(self.config) + + def test_get_listener(self): + self.assertIsNotNone(self.factory.get_listener(mock.Mock())) + + def test_get_notifier(self): + self.assertIsInstance(self.factory.get_notifier(), oslo_messaging.Notifier) diff --git a/tests/collector/test_notification.py b/tests/collector/test_notification.py new file mode 100644 index 0000000..099db8d --- /dev/null +++ b/tests/collector/test_notification.py @@ -0,0 +1,82 @@ +# Copyright 2016 Internap. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock + +from almanach.collector import notification +from tests import base + + +class NotificationTest(base.BaseTestCase): + + def setUp(self): + super(NotificationTest, self).setUp() + + self.config_fixture.config(retry_delay=0, group='collector') + self.config_fixture.config(max_retries=3, group='collector') + + self.messaging = mock.Mock() + self.notifier = mock.Mock() + self.messaging.get_notifier.return_value = self.notifier + self.handler = notification.NotificationHandler(self.config, self.messaging) + + def test_retry_counter(self): + message = notification.NotificationMessage('event_type', dict(), dict(), dict()) + + self.assertEqual(0, message.get_retry_counter()) + + message.increment_retry_count() + self.assertEqual(1, message.get_retry_counter()) + + message.increment_retry_count() + self.assertEqual(2, message.get_retry_counter()) + + def test_incoming_notifications_are_processed(self): + event_handler1 = mock.Mock() + event_handler2 = mock.Mock() + + self.handler.add_event_handler(event_handler1) + self.handler.add_event_handler(event_handler2) + + self.handler.info(dict(), 'compute.nova01', 'some_event', dict(), dict()) + + event_handler1.handle_events.assert_called_once() + event_handler2.handle_events.assert_called_once() + + def test_failed_notification_are_sent_to_error_queue(self): + event_handler = mock.Mock() + event_handler.handle_events.side_effect = Exception() + + self.handler.add_event_handler(event_handler) + self.handler.info(dict(), 'compute.nova01', 'some_event', dict(), dict()) + + self.notifier.error.assert_called_once() + + def test_notifications_are_sent_again_to_error_queue_if_under_threshold(self): + context = { + notification.NotificationMessage.RETRY_COUNTER: 2 + } + + self.handler.error(context, 'compute.nova01', 'some_event', dict(), dict()) + self.notifier.error.assert_called_once() + self.notifier.critical.assert_not_called() + + def test_notifications_are_sent_to_critical_queue_if_above_threshold(self): + context = { + notification.NotificationMessage.RETRY_COUNTER: 3 + } + + self.handler.error(context, 'compute.nova01', 'some_event', dict(), dict()) + self.notifier.error.assert_not_called() + self.notifier.critical.assert_called_once() diff --git a/tests/collector/test_retry_adapter.py b/tests/collector/test_retry_adapter.py deleted file mode 100644 index b8104d2..0000000 --- a/tests/collector/test_retry_adapter.py +++ /dev/null @@ -1,49 +0,0 @@ -# Copyright 2016 Internap. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import mock - -from almanach.collector import retry_adapter -from tests import base - - -class BusAdapterTest(base.BaseTestCase): - - def setUp(self): - super(BusAdapterTest, self).setUp() - self.connection = mock.Mock() - self.retry_producer = mock.Mock() - self.dead_producer = mock.Mock() - self.retry_adapter = retry_adapter.RetryAdapter(self.config, self.connection, - self.retry_producer, self.dead_producer) - - def test_message_is_published_to_retry_queue(self): - message = mock.Mock(headers=dict()) - message.delivery_info = dict(routing_key='test') - - self.retry_adapter.publish_to_dead_letter(message) - self.connection.ensure.assert_called_with(self.retry_producer, self.retry_producer.publish, - errback=self.retry_adapter._error_callback, - interval_max=30, interval_start=0, interval_step=5) - - def test_message_is_published_to_dead_queue(self): - message = mock.Mock(headers={'x-death': [0, 1, 2, 3]}) - message.delivery_info = dict(routing_key='test') - - self.retry_adapter.publish_to_dead_letter(message) - self.assertEqual(self.connection.ensure.call_count, 3) - - self.connection.ensure.assert_called_with(self.dead_producer, self.dead_producer.publish, - errback=self.retry_adapter._error_callback, - interval_max=30, interval_start=0, interval_step=5)