monasca-common/monasca_common/kafka/producer.py

76 lines
2.7 KiB
Python

# (C) Copyright 2015, 2017 Hewlett Packard Enterprise Development LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
import monasca_common.kafka_lib.client as kafka_client
import monasca_common.kafka_lib.producer as kafka_producer
log = logging.getLogger(__name__)


class KafkaProducer(object):
    """Adds messages to a kafka topic.

    Holds one Kafka client and one keyed producer built on top of it. The
    producer is created with ``async`` set to ``False`` and
    ``ACK_AFTER_LOCAL_WRITE`` acknowledgements with a 2000 ack timeout.
    """

    def __init__(self, url):
        """Create the Kafka client and the keyed producer.

        url - kafka connection details
        """
        self._kafka = kafka_client.KafkaClient(url)
        # ``async`` is a reserved keyword from Python 3.7 onwards, so spelling
        # the argument literally (``async=False``) is a SyntaxError there and
        # prevents the module from being imported at all. Expanding it from a
        # dict hands the producer exactly the same keyword argument while
        # staying valid syntax on every Python version.
        self._producer = kafka_producer.KeyedProducer(
            self._kafka,
            req_acks=kafka_producer.KeyedProducer.ACK_AFTER_LOCAL_WRITE,
            ack_timeout=2000,
            **{'async': False})

    def publish(self, topic, messages, key=None):
        """Takes messages and puts them on the supplied kafka topic.

        topic - name of the topic to publish to
        messages - a single message or a list of messages; anything that is
                   not a list is treated as one message
        key - partitioning key, converted with str() before sending; when
              None, the current time in milliseconds is used

        If the first send fails, the cached metadata for the topic is cleared
        and the send is attempted exactly once more with the same key. An
        exception raised by that second attempt is logged and re-raised to
        the caller.
        """
        if not isinstance(messages, list):
            messages = [messages]

        # Resolve the key once so that the retry uses the same key as the
        # first attempt.
        if key is None:
            key = int(time.time() * 1000)
        key_str = str(key)

        try:
            self._producer.send_messages(topic, key_str, *messages)
            return
        except Exception:
            # This is a warning because of all the other warning and
            # error messages that are logged in this case. This way
            # someone looking at the log file can see the retry
            log.warning("Failed send on topic %s, clear metadata and retry",
                        topic)

            # If Kafka is running in Kubernetes, the cached metadata
            # contains the IP Address of the Kafka pod. If the Kafka
            # pod has restarted, the IP Address will have changed
            # which would have caused the first publish to fail. So,
            # clear the cached metadata and retry the publish
            self._kafka.reset_topic_metadata(topic)

        try:
            self._producer.send_messages(topic, key_str, *messages)
        except Exception:
            log.exception('Error publishing to %s topic.', topic)
            raise