Py3:Fix messages encoding for kafka producer

Story: 2000975
Task: 23056

Change-Id: Id0273c6c6f54342286d02e326a392e1479a15fe8
This commit is contained in:
Adrian Czarnecki 2018-07-18 10:30:35 +02:00
parent 0a11428eaf
commit 1cce6824ed
2 changed files with 12 additions and 6 deletions

View File

@ -16,7 +16,7 @@
import logging
import time
from six import PY2
from six import PY3
import monasca_common.kafka_lib.client as kafka_client
import monasca_common.kafka_lib.producer as kafka_producer
@ -51,11 +51,15 @@ class KafkaProducer(object):
first = True
success = False
if key is None:
key = int(time.time() * 1000)
if PY3:
key = bytes(str(key), 'utf-8')
messages = [m.encode("utf-8") for m in messages]
else:
key = str(key)
while not success:
try:
if key is None:
key = int(time.time() * 1000)
key = str(key) if PY2 else bytes(str(key), 'utf-8')
self._producer.send_messages(topic, key, *messages)
success = True
except Exception:

View File

@ -53,11 +53,12 @@ class TestKafkaProducer(base.BaseTestCase):
messages = ['message']
key = "key"
expected_key = b"key"
expected_messages = [b'message']
self.monasca_kafka_producer.publish(topic, messages, key)
self.producer.send_messages.assert_called_once_with(
topic, expected_key, *messages)
topic, expected_key, *expected_messages)
@mock.patch('monasca_common.kafka.producer.time')
def test_kafka_producer_publish_one_message_without_key(self, mock_time):
@ -65,12 +66,13 @@ class TestKafkaProducer(base.BaseTestCase):
message = 'not_a_list'
mock_time.time.return_value = 1
expected_key = b'1000'
expected_message = b'not_a_list'
self.monasca_kafka_producer.publish(topic, message)
self.assertTrue(mock_time.time.called)
self.producer.send_messages.assert_called_once_with(
topic, expected_key, message)
topic, expected_key, expected_message)
@mock.patch('monasca_common.kafka.producer.log')
def test_kafka_producer_publish_exception(self, mock_logger):