Ignore unrelated notifications sent on error queue

- Nova can send notifications directly on the error queue.
- We should handle only notifications reported as error by Almanach.

Change-Id: Ic41e7115dd7d4f99d72d3319af67b204a0b62122
This commit is contained in:
Frédéric Guillot 2017-01-18 13:40:55 -05:00
parent 0f11fff0a9
commit f66cc7ef44
2 changed files with 27 additions and 12 deletions

View File

@ -28,15 +28,18 @@ class NotificationMessage(object):
self.metadata = metadata self.metadata = metadata
def increment_retry_count(self): def increment_retry_count(self):
if self.RETRY_COUNTER not in self.context: if self.has_counter():
self.context[self.RETRY_COUNTER] = 1
else:
self.context[self.RETRY_COUNTER] += 1 self.context[self.RETRY_COUNTER] += 1
else:
self.context[self.RETRY_COUNTER] = 1
def get_retry_counter(self): def get_retry_counter(self):
if self.RETRY_COUNTER not in self.context: if self.has_counter():
return 0 return self.context[self.RETRY_COUNTER]
return self.context[self.RETRY_COUNTER] return 0
def has_counter(self):
return self.RETRY_COUNTER in self.context
class NotificationHandler(object): class NotificationHandler(object):
@ -50,7 +53,7 @@ class NotificationHandler(object):
self.handlers.append(handler) self.handlers.append(handler)
def info(self, ctxt, publisher_id, event_type, payload, metadata): def info(self, ctxt, publisher_id, event_type, payload, metadata):
LOG.info('Received event "%s" from "%s" on info queue', event_type, publisher_id) LOG.debug('Received event "%s" from "%s" on info queue', event_type, publisher_id)
notification = NotificationMessage(event_type, ctxt, payload, metadata) notification = NotificationMessage(event_type, ctxt, payload, metadata)
try: try:
@ -63,20 +66,27 @@ class NotificationHandler(object):
LOG.warning('Notification payload: %s', notification.payload) LOG.warning('Notification payload: %s', notification.payload)
LOG.warning('Notification metadata: %s', notification.metadata) LOG.warning('Notification metadata: %s', notification.metadata)
LOG.exception(e) LOG.exception(e)
notification.increment_retry_count()
self._retry_notification(notification) self._retry_notification(notification)
def error(self, ctxt, publisher_id, event_type, payload, metadata): def error(self, ctxt, publisher_id, event_type, payload, metadata):
LOG.warning('Received event "%s" from "%s" on error queue', event_type, publisher_id) LOG.warning('Received event "%s" from "%s" on error queue', event_type, publisher_id)
notification = NotificationMessage(event_type, ctxt, payload, metadata) notification = NotificationMessage(event_type, ctxt, payload, metadata)
time.sleep(self.config.collector.retry_delay)
self._retry_notification(notification) if notification.has_counter():
LOG.error('Retry notification "%s" on info queue', event_type)
time.sleep(self.config.collector.retry_delay)
self._retry_notification(notification)
else:
LOG.info('Ignoring notification "%s" sent on error queue', event_type)
def _retry_notification(self, notification): def _retry_notification(self, notification):
notification.increment_retry_count()
notifier = self.messaging.get_notifier() notifier = self.messaging.get_notifier()
if notification.get_retry_counter() > self.config.collector.max_retries: if notification.get_retry_counter() >= self.config.collector.max_retries:
LOG.critical('Send notification "%s" to critical queue', notification.metadata.get('message_id')) LOG.critical('Send notification "%s" (%s) to critical queue',
notification.metadata.get('message_id'),
notification.event_type)
notifier.critical(notification.context, notification.event_type, notification.payload) notifier.critical(notification.context, notification.event_type, notification.payload)
else: else:
notifier.error(notification.context, notification.event_type, notification.payload) notifier.error(notification.context, notification.event_type, notification.payload)

View File

@ -80,3 +80,8 @@ class TestNotification(base.BaseTestCase):
self.handler.error(context, 'compute.nova01', 'some_event', dict(), dict()) self.handler.error(context, 'compute.nova01', 'some_event', dict(), dict())
self.notifier.error.assert_not_called() self.notifier.error.assert_not_called()
self.notifier.critical.assert_called_once() self.notifier.critical.assert_called_once()
def test_unrelated_notifications_are_not_handled_in_error_queue(self):
self.handler.error(dict(), 'compute.nova01', 'some_event', dict(), dict())
self.notifier.error.assert_not_called()
self.notifier.critical.assert_not_called()