diff --git a/almanach/collector/notification.py b/almanach/collector/notification.py index ae1667f..221c2fc 100644 --- a/almanach/collector/notification.py +++ b/almanach/collector/notification.py @@ -28,15 +28,18 @@ class NotificationMessage(object): self.metadata = metadata def increment_retry_count(self): - if self.RETRY_COUNTER not in self.context: - self.context[self.RETRY_COUNTER] = 1 - else: + if self.has_counter(): self.context[self.RETRY_COUNTER] += 1 + else: + self.context[self.RETRY_COUNTER] = 1 def get_retry_counter(self): - if self.RETRY_COUNTER not in self.context: - return 0 - return self.context[self.RETRY_COUNTER] + if self.has_counter(): + return self.context[self.RETRY_COUNTER] + return 0 + + def has_counter(self): + return self.RETRY_COUNTER in self.context class NotificationHandler(object): @@ -50,7 +53,7 @@ class NotificationHandler(object): self.handlers.append(handler) def info(self, ctxt, publisher_id, event_type, payload, metadata): - LOG.info('Received event "%s" from "%s" on info queue', event_type, publisher_id) + LOG.debug('Received event "%s" from "%s" on info queue', event_type, publisher_id) notification = NotificationMessage(event_type, ctxt, payload, metadata) try: @@ -63,20 +66,27 @@ class NotificationHandler(object): LOG.warning('Notification payload: %s', notification.payload) LOG.warning('Notification metadata: %s', notification.metadata) LOG.exception(e) + notification.increment_retry_count() self._retry_notification(notification) def error(self, ctxt, publisher_id, event_type, payload, metadata): LOG.warning('Received event "%s" from "%s" on error queue', event_type, publisher_id) notification = NotificationMessage(event_type, ctxt, payload, metadata) - time.sleep(self.config.collector.retry_delay) - self._retry_notification(notification) + + if notification.has_counter(): + LOG.error('Retry notification "%s" on info queue', event_type) + time.sleep(self.config.collector.retry_delay) + self._retry_notification(notification) + else: + LOG.info('Ignoring notification "%s" sent on error queue', event_type) def _retry_notification(self, notification): - notification.increment_retry_count() notifier = self.messaging.get_notifier() - if notification.get_retry_counter() > self.config.collector.max_retries: - LOG.critical('Send notification "%s" to critical queue', notification.metadata.get('message_id')) + if notification.get_retry_counter() >= self.config.collector.max_retries: + LOG.critical('Send notification "%s" (%s) to critical queue', + notification.metadata.get('message_id'), + notification.event_type) notifier.critical(notification.context, notification.event_type, notification.payload) else: notifier.error(notification.context, notification.event_type, notification.payload) diff --git a/almanach/tests/unit/collector/test_notification.py b/almanach/tests/unit/collector/test_notification.py index f3c3530..1ead0d3 100644 --- a/almanach/tests/unit/collector/test_notification.py +++ b/almanach/tests/unit/collector/test_notification.py @@ -80,3 +80,8 @@ class TestNotification(base.BaseTestCase): self.handler.error(context, 'compute.nova01', 'some_event', dict(), dict()) self.notifier.error.assert_not_called() self.notifier.critical.assert_called_once() + + def test_unrelated_notifications_are_not_handled_in_error_queue(self): + self.handler.error(dict(), 'compute.nova01', 'some_event', dict(), dict()) + self.notifier.error.assert_not_called() + self.notifier.critical.assert_not_called()