diff --git a/monasca_events_api/app/common/events_publisher.py b/monasca_events_api/app/common/events_publisher.py index c151657..7e49df8 100644 --- a/monasca_events_api/app/common/events_publisher.py +++ b/monasca_events_api/app/common/events_publisher.py @@ -15,6 +15,7 @@ import falcon from monasca_common.kafka import producer +from monasca_common.kafka_lib.common import FailedPayloadsError from monasca_common.rest import utils as rest_utils from oslo_log import log @@ -149,16 +150,33 @@ class EventPublisher(object): LOG.debug('Publishing %d messages', num_of_msg) - try: - for topic in self._topics: - self._kafka_publisher.publish( - topic, - messages - ) - LOG.debug('Sent %d messages to topic %s', num_of_msg, topic) - except Exception as ex: - raise falcon.HTTPServiceUnavailable('Service unavailable', - str(ex), 60) + first = True + while True: + try: + for topic in self._topics: + self._kafka_publisher.publish( + topic, + messages + ) + LOG.debug('Sent %d messages to topic %s', + num_of_msg, topic) + break + except FailedPayloadsError as ex: + # FailedPayloadsError exception can be cause by connection + # problem, to make sure that is not connection issue + # message is sent again. + LOG.error('Failed to send messages %s', ex) + if first: + LOG.error('Retrying') + first = False + continue + else: + raise falcon.HTTPServiceUnavailable('Service unavailable', + str(ex), 60) + except Exception as ex: + LOG.error('Failed to send messages %s', ex) + raise falcon.HTTPServiceUnavailable('Service unavailable', + str(ex), 60) def _check_if_all_messages_was_publish(self, send_count, to_send_count): """Executed after publishing to sent metrics.