From 66754b1a4aa2efee615eebae4a9da191bc6764a5 Mon Sep 17 00:00:00 2001 From: Witek Bedyk Date: Fri, 6 Sep 2019 12:44:30 +0200 Subject: [PATCH] Avoid value decoding in Kafka delivery report MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In Python 2 Kafka message value gets implicitly converted to unicode in delivery_report function. In case the message value already contains unicode characters the convertion fails with UnicodeDecodeError. This case can be reproduced when trying to run: u'goose: {}'.format('gęś') This commit fixes the debug message and avoids implicit type convertion. Change-Id: Ia9463cd4e9f92cdf18b092abd2cf676aa4966679 Story: 2006503 Task: 36481 --- monasca_common/confluent_kafka/producer.py | 4 ++-- monasca_common/tests/test_confluent_kafka.py | 12 ++++++++++++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/monasca_common/confluent_kafka/producer.py b/monasca_common/confluent_kafka/producer.py index 08a0652f..a3ccc935 100644 --- a/monasca_common/confluent_kafka/producer.py +++ b/monasca_common/confluent_kafka/producer.py @@ -48,10 +48,10 @@ class KafkaProducer(object): """ if err is not None: - log.exception(u'Message delivery failed: {}'.format(err)) + log.exception('Message delivery failed: {}'.format(err)) raise confluent_kafka.KafkaException(err) else: - log.debug(u'Message delivered to {} [{}]: {}'.format( + log.debug('Message delivered to {} [{}]: {}'.format( msg.topic(), msg.partition(), msg.value())) def publish(self, topic, messages, key=None, timeout=2): diff --git a/monasca_common/tests/test_confluent_kafka.py b/monasca_common/tests/test_confluent_kafka.py index d966cd98..8bc60dec 100644 --- a/monasca_common/tests/test_confluent_kafka.py +++ b/monasca_common/tests/test_confluent_kafka.py @@ -1,3 +1,4 @@ +# -*- coding: utf-8 -*- # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -94,6 +95,17 @@ class TestConfluentKafkaProducer(base.BaseTestCase): self.prod.delivery_report(None, confluent_kafka.Message) mock_logger.debug.assert_called_once() + @mock.patch('monasca_common.confluent_kafka.producer.log') + @mock.patch('confluent_kafka.Message') + def test_delivery_report_with_unicode(self, mock_message, mock_logger): + mock_message.topic.return_value = 'test_topic' + mock_message.partition.return_value = '1' + mock_message.value.return_value = 'gęś' + self.prod.delivery_report(None, mock_message) + mock_logger.debug.assert_called_once_with('Message delivered to ' + 'test_topic [1]: ' + 'gęś') + class TestConfluentKafkaConsumer(base.BaseTestCase):