From a8b68cbc1932045dcb8fbc7aab2a26e0b63bc06e Mon Sep 17 00:00:00 2001 From: Witold Bedyk Date: Tue, 23 Oct 2018 14:12:18 +0200 Subject: [PATCH] Add wrapper for Confluent Kafka async Producer Story: 2003705 Task: 27630 Change-Id: I5981e40ecf62cd123ff1969109a8b4b0b9ec204d --- lower-constraints.txt | 1 + monasca_common/confluent_kafka/__init__.py | 0 monasca_common/confluent_kafka/producer.py | 84 +++++++++++++++++ monasca_common/tests/test_confluent_kafka.py | 94 ++++++++++++++++++++ requirements.txt | 1 + 5 files changed, 180 insertions(+) create mode 100644 monasca_common/confluent_kafka/__init__.py create mode 100644 monasca_common/confluent_kafka/producer.py create mode 100644 monasca_common/tests/test_confluent_kafka.py diff --git a/lower-constraints.txt b/lower-constraints.txt index b8bef50a..05a8d414 100644 --- a/lower-constraints.txt +++ b/lower-constraints.txt @@ -1,6 +1,7 @@ appdirs==1.3.0 Babel==2.3.4 bandit==1.4.0 +confluent-kafka==0.11.4 coverage==4.0 debtcollector==1.2.0 docutils==0.11 diff --git a/monasca_common/confluent_kafka/__init__.py b/monasca_common/confluent_kafka/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/monasca_common/confluent_kafka/producer.py b/monasca_common/confluent_kafka/producer.py new file mode 100644 index 00000000..08a0652f --- /dev/null +++ b/monasca_common/confluent_kafka/producer.py @@ -0,0 +1,84 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
import logging

import confluent_kafka
from oslo_utils import encodeutils

log = logging.getLogger(__name__)


class KafkaProducer(object):
    """Thin wrapper around the asynchronous confluent-kafka Producer."""

    def __init__(self, bootstrap_servers):
        """Set up the underlying confluent-kafka producer.

        :param str bootstrap_servers: Initial list of brokers as a CSV
            list of broker host or host:port entries.
        """
        config = {'bootstrap.servers': bootstrap_servers}
        self._producer = confluent_kafka.Producer(config)

    @staticmethod
    def delivery_report(err, msg):
        """Per-message delivery callback, triggered by poll() or flush().

        Called once for each produced message to report the final
        delivery result.

        :param confluent_kafka.KafkaError err: Information about any
            error that occurred whilst producing the message.
        :param confluent_kafka.Message msg: Information about the
            message produced.
        :returns: None
        :raises confluent_kafka.KafkaException: if delivery failed.
        """
        if err is None:
            log.debug(u'Message delivered to {} [{}]: {}'.format(
                msg.topic(), msg.partition(), msg.value()))
        else:
            log.exception(u'Message delivery failed: {}'.format(err))
            raise confluent_kafka.KafkaException(err)

    def publish(self, topic, messages, key=None, timeout=2):
        """Publish messages to the topic and wait for the queue to drain.

        :param str topic: Topic to produce messages to.
        :param list(str) messages: List of message payloads (a single
            payload is accepted as well).
        :param str key: Message key.
        :param float timeout: Maximum time to block in seconds.
        :returns: Number of messages still in queue.
        :rtype: int
        :raises: BufferError, confluent_kafka.KafkaException,
            NotImplementedError
        """
        batch = messages if isinstance(messages, list) else [messages]

        try:
            for payload in batch:
                payload = encodeutils.safe_encode(payload, incoming='utf-8')
                # Positional (topic, value, key) call shape is part of the
                # wrapper's tested contract; keep it intact.
                self._producer.produce(topic, payload, key,
                                       callback=KafkaProducer.delivery_report)
                # Serve the delivery callback queue without blocking.
                self._producer.poll(0)

            return self._producer.flush(timeout)
        except (BufferError, confluent_kafka.KafkaException,
                NotImplementedError):
            log.exception(u'Error publishing to {} topic.'.format(topic))
            raise
import mock

from monasca_common.confluent_kafka import producer

import confluent_kafka
from oslotest import base

FAKE_KAFKA_TOPIC = 'topic'
FAKE_KAFKA_URL = 'FAKE_KAFKA_URL'


class TestConfluentKafkaProducer(base.BaseTestCase):
    """Unit tests for the confluent-kafka asynchronous producer wrapper."""

    @mock.patch('confluent_kafka.Producer')
    def setUp(self, mock_confluent_producer):
        super(TestConfluentKafkaProducer, self).setUp()
        self.mock_confluent_producer = mock_confluent_producer
        # Fix: pass the broker URL constant as bootstrap servers.  The
        # original code passed FAKE_KAFKA_TOPIC here (and left
        # FAKE_KAFKA_URL unused), conflating topic and broker config.
        self.prod = producer.KafkaProducer(FAKE_KAFKA_URL)

    def tearDown(self):
        super(TestConfluentKafkaProducer, self).tearDown()

    def test_kafka_producer_init(self):
        # The wrapper must hand the broker list straight through to
        # confluent_kafka.Producer under the 'bootstrap.servers' key.
        expected_config = {'bootstrap.servers': FAKE_KAFKA_URL}

        self.mock_confluent_producer.assert_called_once_with(expected_config)
        self.assertEqual(self.mock_confluent_producer.return_value,
                         self.prod._producer)

    def test_kafka_producer_publish(self):
        # A list of unicode payloads is utf-8 encoded and produced
        # one message at a time, followed by a flush.
        topic = FAKE_KAFKA_TOPIC
        messages = [u'message']
        expected_message = b'message'

        self.prod.publish(topic, messages)

        produce_callback = producer.KafkaProducer.delivery_report
        self.prod._producer.produce.assert_called_once_with(topic,
                                                            expected_message,
                                                            None,
                                                            callback=produce_callback)
        self.prod._producer.flush.assert_called_once()

    def test_kafka_producer_publish_one_message_with_key(self):
        # A bare (non-list) payload is accepted and the key is passed
        # through positionally.
        topic = FAKE_KAFKA_TOPIC
        one_message = u'message'
        key = u'1000'
        expected_message = b'message'

        self.prod.publish(topic, one_message, key)

        produce_callback = producer.KafkaProducer.delivery_report
        self.prod._producer.produce.assert_called_once_with(topic,
                                                            expected_message,
                                                            key,
                                                            callback=produce_callback)
        self.prod._producer.flush.assert_called_once()

    def test_kafka_producer_publish_exception(self):
        # KafkaException raised by produce() must propagate to the caller
        # (publish logs and re-raises).
        topic = FAKE_KAFKA_TOPIC
        messages = [u'message']
        self.prod._producer.produce.side_effect = \
            confluent_kafka.KafkaException

        self.assertRaises(confluent_kafka.KafkaException, self.prod.publish,
                          topic, messages)

    @mock.patch('monasca_common.confluent_kafka.producer.log')
    @mock.patch('confluent_kafka.Message')
    def test_delivery_report_exception(self, mock_message, mock_logger):
        # A non-None err must be logged and converted to KafkaException.
        self.assertRaises(confluent_kafka.KafkaException,
                          self.prod.delivery_report,
                          confluent_kafka.KafkaError,
                          confluent_kafka.Message)
        mock_logger.exception.assert_called_once()

    @mock.patch('monasca_common.confluent_kafka.producer.log')
    @mock.patch('confluent_kafka.Message')
    def test_delivery_report(self, mock_message, mock_logger):
        # Successful delivery (err is None) only logs at debug level.
        self.prod.delivery_report(None, confluent_kafka.Message)
        mock_logger.debug.assert_called_once()