Merge "Retry publish once on failures"

This commit is contained in:
Zuul 2017-12-14 13:53:10 +00:00 committed by Gerrit Code Review
commit 7dd0a0a7bb
1 changed files with 27 additions and 8 deletions

View File

@ -1,4 +1,4 @@
# Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
# (C) Copyright 2015, 2017 Hewlett Packard Enterprise Development LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -47,10 +47,29 @@ class KafkaProducer(object):
if not isinstance(messages, list):
messages = [messages]
try:
if key is None:
key = int(time.time() * 1000)
self._producer.send_messages(topic, str(key), *messages)
except Exception:
log.exception('Error publishing to {} topic.'.format(topic))
raise
first = True
success = False
while not success:
try:
if key is None:
key = int(time.time() * 1000)
self._producer.send_messages(topic, str(key), *messages)
success = True
except Exception:
if first:
# This is a warning because of all the other warning and
# error messages that are logged in this case. This way
# someone looking at the log file can see the retry
log.warn("Failed send on topic {}, clear metadata and retry"
.format(topic))
# If Kafka is running in Kubernetes, the cached metadata
# contains the IP Address of the Kafka pod. If the Kafka
# pod has restarted, the IP Address will have changed
# which would have caused the first publish to fail. So,
# clear the cached metadata and retry the publish
self._kafka.reset_topic_metadata(topic)
first = False
continue
log.exception('Error publishing to {} topic.'.format(topic))
raise