Merge "Convert partition key to str"

This commit is contained in:
Jenkins 2016-01-27 00:20:22 +00:00 committed by Gerrit Code Review
commit c7a064c899
1 changed files with 2 additions and 11 deletions

View File

@ -40,22 +40,13 @@ class KafkaProducer(object):
"""Takes messages and puts them on the supplied kafka topic
"""
# Using a key producer to make sure we can distribute messages evenly
# across all partitions. In the kafka-python library, as of version
# 0.9.2, it doesn't support sending message batches for keyed
# producers. Batching writes to kafka is important for performance so
# we have to work around this limitation. Using the _next_partition
# function allows us to get proper distribution and the speed of the
# send_messages function.
if not isinstance(messages, list):
messages = [messages]
try:
if key is None:
key = time.time() * 1000
partition = self._producer._next_partition(topic, key)
self._producer.send_messages(topic, partition, *messages)
key = int(time.time() * 1000)
self._producer.send_messages(topic, str(key), *messages)
except Exception:
log.exception('Error publishing to {} topic.'.format(topic))
raise