diff --git a/monasca_common/kafka/producer.py b/monasca_common/kafka/producer.py index 236ced95..e38f79e9 100644 --- a/monasca_common/kafka/producer.py +++ b/monasca_common/kafka/producer.py @@ -16,6 +16,8 @@ import logging import time +from six import PY2 + import monasca_common.kafka_lib.client as kafka_client import monasca_common.kafka_lib.producer as kafka_producer @@ -53,7 +55,8 @@ class KafkaProducer(object): try: if key is None: key = int(time.time() * 1000) - self._producer.send_messages(topic, str(key), *messages) + key = str(key) if PY2 else bytes(str(key), 'utf-8') + self._producer.send_messages(topic, key, *messages) success = True except Exception: if first: diff --git a/monasca_common/tests/test_kafka.py b/monasca_common/tests/test_kafka.py index 5f7443b8..f28ec139 100644 --- a/monasca_common/tests/test_kafka.py +++ b/monasca_common/tests/test_kafka.py @@ -51,19 +51,20 @@ class TestKafkaProducer(base.BaseTestCase): def test_kafka_producer_publish(self): topic = FAKE_KAFKA_TOPIC messages = ['message'] - key = 'key' + key = "key" + expected_key = b"key" self.monasca_kafka_producer.publish(topic, messages, key) self.producer.send_messages.assert_called_once_with( - topic, key, *messages) + topic, expected_key, *messages) @mock.patch('monasca_common.kafka.producer.time') def test_kafka_producer_publish_one_message_without_key(self, mock_time): topic = FAKE_KAFKA_TOPIC message = 'not_a_list' mock_time.time.return_value = 1 - expected_key = '1000' + expected_key = b'1000' self.monasca_kafka_producer.publish(topic, message)