diff --git a/monasca_common/kafka/producer.py b/monasca_common/kafka/producer.py index a2fd636e..a389dedb 100644 --- a/monasca_common/kafka/producer.py +++ b/monasca_common/kafka/producer.py @@ -40,22 +40,13 @@ class KafkaProducer(object): """Takes messages and puts them on the supplied kafka topic """ - # Using a key producer to make sure we can distribute messages evenly - # across all partitions. In the kafka-python library, as of version - # 0.9.2, it doesn't support sending message batches for keyed - # producers. Batching writes to kafka is important for performance so - # we have to work around this limitation. Using the _next_partition - # function allows us to get proper distribution and the speed of the - # send_messages function. - if not isinstance(messages, list): messages = [messages] try: if key is None: - key = time.time() * 1000 - partition = self._producer._next_partition(topic, key) - self._producer.send_messages(topic, partition, *messages) + key = int(time.time() * 1000) + self._producer.send_messages(topic, str(key), *messages) except Exception: log.exception('Error publishing to {} topic.'.format(topic)) raise