From 1cce6824ed48e9016dfd5bb6dd82cb88e2a44cba Mon Sep 17 00:00:00 2001 From: Adrian Czarnecki Date: Wed, 18 Jul 2018 10:30:35 +0200 Subject: [PATCH] Py3:Fix messages encoding for kafka producer Story: 2000975 Task: 23056 Change-Id: Id0273c6c6f54342286d02e326a392e1479a15fe8 --- monasca_common/kafka/producer.py | 12 ++++++++---- monasca_common/tests/test_kafka.py | 6 ++++-- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/monasca_common/kafka/producer.py b/monasca_common/kafka/producer.py index e38f79e9..55ad0d79 100644 --- a/monasca_common/kafka/producer.py +++ b/monasca_common/kafka/producer.py @@ -16,7 +16,7 @@ import logging import time -from six import PY2 +from six import PY3 import monasca_common.kafka_lib.client as kafka_client import monasca_common.kafka_lib.producer as kafka_producer @@ -51,11 +51,15 @@ class KafkaProducer(object): first = True success = False + if key is None: + key = int(time.time() * 1000) + if PY3: + key = bytes(str(key), 'utf-8') + messages = [m.encode("utf-8") for m in messages] + else: + key = str(key) while not success: try: - if key is None: - key = int(time.time() * 1000) - key = str(key) if PY2 else bytes(str(key), 'utf-8') self._producer.send_messages(topic, key, *messages) success = True except Exception: diff --git a/monasca_common/tests/test_kafka.py b/monasca_common/tests/test_kafka.py index f28ec139..8589c523 100644 --- a/monasca_common/tests/test_kafka.py +++ b/monasca_common/tests/test_kafka.py @@ -53,11 +53,12 @@ class TestKafkaProducer(base.BaseTestCase): messages = ['message'] key = "key" expected_key = b"key" + expected_messages = [b'message'] self.monasca_kafka_producer.publish(topic, messages, key) self.producer.send_messages.assert_called_once_with( - topic, expected_key, *messages) + topic, expected_key, *expected_messages) @mock.patch('monasca_common.kafka.producer.time') def test_kafka_producer_publish_one_message_without_key(self, mock_time): @@ -65,12 +66,13 @@ class TestKafkaProducer(base.BaseTestCase): message = 'not_a_list' mock_time.time.return_value = 1 expected_key = b'1000' + expected_message = b'not_a_list' self.monasca_kafka_producer.publish(topic, message) self.assertTrue(mock_time.time.called) self.producer.send_messages.assert_called_once_with( - topic, expected_key, message) + topic, expected_key, expected_message) @mock.patch('monasca_common.kafka.producer.log') def test_kafka_producer_publish_exception(self, mock_logger):