Add mechanism that send msg again if request fail

Improve _publish method to send message again
if first request return FailedPayloadsError.

Change-Id: I2ceb4e55f30a982bfe55ae8dd280ded5eca70704
This commit is contained in:
Adrian Czarnecki 2018-08-31 12:48:01 +02:00
parent 8e84984c74
commit 735a08637a
1 changed files with 28 additions and 10 deletions

View File

@ -15,6 +15,7 @@
import falcon
from monasca_common.kafka import producer
from monasca_common.kafka_lib.common import FailedPayloadsError
from monasca_common.rest import utils as rest_utils
from oslo_log import log
@ -149,16 +150,33 @@ class EventPublisher(object):
LOG.debug('Publishing %d messages', num_of_msg)
try:
for topic in self._topics:
self._kafka_publisher.publish(
topic,
messages
)
LOG.debug('Sent %d messages to topic %s', num_of_msg, topic)
except Exception as ex:
raise falcon.HTTPServiceUnavailable('Service unavailable',
str(ex), 60)
first = True
while True:
try:
for topic in self._topics:
self._kafka_publisher.publish(
topic,
messages
)
LOG.debug('Sent %d messages to topic %s',
num_of_msg, topic)
break
except FailedPayloadsError as ex:
# FailedPayloadsError exception can be cause by connection
# problem, to make sure that is not connection issue
# message is sent again.
LOG.error('Failed to send messages %s', ex)
if first:
LOG.error('Retrying')
first = False
continue
else:
raise falcon.HTTPServiceUnavailable('Service unavailable',
str(ex), 60)
except Exception as ex:
LOG.error('Failed to send messages %s', ex)
raise falcon.HTTPServiceUnavailable('Service unavailable',
str(ex), 60)
def _check_if_all_messages_was_publish(self, send_count, to_send_count):
"""Executed after publishing to sent metrics.