Merge "Add mechanism that send msg again if request fail"

This commit is contained in:
Zuul 2018-09-13 12:23:31 +00:00 committed by Gerrit Code Review
commit 29b2d3036a
1 changed files with 28 additions and 10 deletions

View File

@ -15,6 +15,7 @@
import falcon
from monasca_common.kafka import producer
from monasca_common.kafka_lib.common import FailedPayloadsError
from monasca_common.rest import utils as rest_utils
from oslo_log import log
@ -149,16 +150,33 @@ class EventPublisher(object):
LOG.debug('Publishing %d messages', num_of_msg)
try:
for topic in self._topics:
self._kafka_publisher.publish(
topic,
messages
)
LOG.debug('Sent %d messages to topic %s', num_of_msg, topic)
except Exception as ex:
raise falcon.HTTPServiceUnavailable('Service unavailable',
str(ex), 60)
first = True
while True:
try:
for topic in self._topics:
self._kafka_publisher.publish(
topic,
messages
)
LOG.debug('Sent %d messages to topic %s',
num_of_msg, topic)
break
except FailedPayloadsError as ex:
# FailedPayloadsError exception can be cause by connection
# problem, to make sure that is not connection issue
# message is sent again.
LOG.error('Failed to send messages %s', ex)
if first:
LOG.error('Retrying')
first = False
continue
else:
raise falcon.HTTPServiceUnavailable('Service unavailable',
str(ex), 60)
except Exception as ex:
LOG.error('Failed to send messages %s', ex)
raise falcon.HTTPServiceUnavailable('Service unavailable',
str(ex), 60)
def _check_if_all_messages_was_publish(self, send_count, to_send_count):
"""Executed after publishing to sent metrics.