diff --git a/monasca_common/confluent_kafka/consumer.py b/monasca_common/confluent_kafka/consumer.py
new file mode 100644
index 00000000..de645635
--- /dev/null
+++ b/monasca_common/confluent_kafka/consumer.py
@@ -0,0 +1,86 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+import logging
+import time
+
+import confluent_kafka
+
+log = logging.getLogger(__name__)
+
+
+class KafkaConsumer(object):
+    """Wrapper around the high-level Kafka Consumer."""
+
+    def __init__(self, bootstrap_servers, group_id, topic,
+                 fetch_min_bytes=1048576, client_id="",
+                 repartition_callback=None, commit_callback=None,
+                 max_commit_interval=30):
+        """Create a new high-level Consumer instance.
+
+        :param list(str) bootstrap_servers: A list of host/port pairs to use
+            for establishing the initial connection to the Kafka cluster.
+        :param str group_id: A unique string that identifies the consumer
+            group this consumer belongs to.
+        :param str topic: Topic to subscribe to.
+        :param int fetch_min_bytes: The minimum amount of data the server
+            should return for a fetch request.
+        :param str client_id: An id string to pass to the server when making
+            requests.
+        :param callable repartition_callback: Callback function executed at
+            the start of a rebalance operation.
+        :param callable commit_callback: Callback function responsible for
+            calling the commit() method.
+        :param int max_commit_interval: Maximum time in seconds between
+            commits.
+        """
+        consumer_config = {'bootstrap.servers': bootstrap_servers,
+                           'group.id': group_id,
+                           'fetch.min.bytes': fetch_min_bytes,
+                           'client.id': client_id,
+                           'enable.auto.commit': False,
+                           'default.topic.config':
+                               {'auto.offset.reset': 'earliest'}}
+        self._commit_callback = commit_callback
+        self._max_commit_interval = max_commit_interval
+        self._consumer = confluent_kafka.Consumer(consumer_config)
+        self._consumer.subscribe([topic], on_revoke=repartition_callback)
+        self._last_commit = None
+
+    def __iter__(self):
+        self._last_commit = datetime.datetime.now()
+
+        while True:
+            message = self._consumer.poll(timeout=5)
+
+            if message is None:
+                time.sleep(0.1)
+                continue
+            elif not message.error():
+                yield message
+            else:
+                log.error("Kafka error: %s", message.error().str())
+                raise confluent_kafka.KafkaException(message.error())
+
+            # Committing is left to the callback; invoke it once the
+            # configured interval has elapsed since the last commit.
+            if self._commit_callback:
+                time_now = datetime.datetime.now()
+                time_delta = time_now - self._last_commit
+                if time_delta.total_seconds() > self._max_commit_interval:
+                    self._commit_callback()
+
+    def commit(self):
+        self._last_commit = datetime.datetime.now()
+        self._consumer.commit()
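For context, a minimal usage sketch of the wrapper above (not part of the
patch). The broker addresses, group, and topic are hypothetical placeholders;
note that librdkafka documents bootstrap.servers as a comma-separated string,
so this sketch passes a single string rather than a list. The commit callback
simply delegates to KafkaConsumer.commit(), which is the contract the
commit_callback parameter describes:

    from monasca_common.confluent_kafka import consumer


    def commit_offsets():
        # The wrapper never commits on its own: __iter__ invokes this
        # callback once max_commit_interval seconds have elapsed, and
        # commit() resets that timer.
        kafka_consumer.commit()


    kafka_consumer = consumer.KafkaConsumer(
        'kafka1:9092,kafka2:9092',   # hypothetical brokers
        'example_group',             # hypothetical consumer group
        'example_topic',             # hypothetical topic
        commit_callback=commit_offsets,
        max_commit_interval=30)

    # Iteration blocks indefinitely, yielding messages as they arrive and
    # raising KafkaException on a message-level error.
    for message in kafka_consumer:
        print(message.value())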
+ """ + + consumer_config = {'bootstrap.servers': bootstrap_servers, + 'group.id': group_id, + 'fetch.min.bytes': fetch_min_bytes, + 'client.id': client_id, + 'enable.auto.commit': False, + 'default.topic.config': + {'auto.offset.reset': 'earliest'} + } + self._commit_callback = commit_callback + self._max_commit_interval = max_commit_interval + self._consumer = confluent_kafka.Consumer(consumer_config) + self._consumer.subscribe([topic], on_revoke=repartition_callback) + self._last_commit = None + + def __iter__(self): + self._last_commit = datetime.datetime.now() + + while True: + message = self._consumer.poll(timeout=5) + + if message is None: + time.sleep(0.1) + continue + elif not message.error(): + yield message + else: + log.error("Kafka error: %s", message.error().str()) + raise confluent_kafka.KafkaException(message.error()) + + if self._commit_callback: + time_now = datetime.datetime.now() + time_delta = time_now - self._last_commit + if time_delta.total_seconds() > self._max_commit_interval: + self._commit_callback() + + def commit(self): + self._last_commit = datetime.datetime.now() + self._consumer.commit() diff --git a/monasca_common/tests/test_confluent_kafka.py b/monasca_common/tests/test_confluent_kafka.py index f5701cda..69d47ab1 100644 --- a/monasca_common/tests/test_confluent_kafka.py +++ b/monasca_common/tests/test_confluent_kafka.py @@ -13,6 +13,7 @@ import mock +from monasca_common.confluent_kafka import consumer from monasca_common.confluent_kafka import producer import confluent_kafka @@ -28,13 +29,13 @@ class TestConfluentKafkaProducer(base.BaseTestCase): def setUp(self, mock_confluent_producer): super(TestConfluentKafkaProducer, self).setUp() self.mock_confluent_producer = mock_confluent_producer - self.prod = producer.KafkaProducer(FAKE_KAFKA_TOPIC) + self.prod = producer.KafkaProducer(FAKE_KAFKA_URL) def tearDown(self): super(TestConfluentKafkaProducer, self).tearDown() def test_kafka_producer_init(self): - expected_config = {'bootstrap.servers': FAKE_KAFKA_TOPIC} + expected_config = {'bootstrap.servers': FAKE_KAFKA_URL} self.mock_confluent_producer.assert_called_once_with(expected_config) self.assertEqual(self.mock_confluent_producer.return_value, @@ -92,3 +93,85 @@ class TestConfluentKafkaProducer(base.BaseTestCase): def test_delivery_report(self, mock_message, mock_logger): self.prod.delivery_report(None, confluent_kafka.Message) mock_logger.debug.assert_called_once() + + +class TestConfluentKafkaConsumer(base.BaseTestCase): + + @mock.patch('confluent_kafka.Consumer') + def setUp(self, mock_confluent_consumer): + super(TestConfluentKafkaConsumer, self).setUp() + self.mock_confluent_consumer = mock_confluent_consumer + self.consumer = consumer.KafkaConsumer(['fake_server1', + 'fake_server2'], + 'fake_group', + FAKE_KAFKA_TOPIC, 128, + 'test_client', + TestConfluentKafkaConsumer.rep_callback, + TestConfluentKafkaConsumer.com_callback, + 5) + + @staticmethod + def rep_callback(consumer, partitions): + pass + + @staticmethod + def com_callback(consumer, partitions): + pass + + def tearDown(self): + super(TestConfluentKafkaConsumer, self).tearDown() + + def test_kafka_consumer_init(self): + expected_config = {'group.id': 'fake_group', + 'bootstrap.servers': ['fake_server1', + 'fake_server2'], + 'fetch.min.bytes': 128, + 'client.id': 'test_client', + 'enable.auto.commit': False, + 'default.topic.config': + {'auto.offset.reset': 'earliest'} + } + + self.mock_confluent_consumer.assert_called_once_with(expected_config) + self.assertEqual(self.consumer._consumer, + 
+
+    def test_kafka_consumer_iteration(self):
+        messages = []
+        for i in range(5):
+            message = mock.Mock()
+            message.error.return_value = None
+            message.value.return_value = 'message{}'.format(i)
+            messages.append(message)
+        self.consumer._consumer.poll.side_effect = messages
+
+        for index, message in enumerate(self.consumer):
+            self.assertEqual(message, messages[index])
+            if index == len(messages) - 1:
+                break
+
+    @mock.patch('confluent_kafka.Message')
+    @mock.patch('confluent_kafka.KafkaError')
+    def test_kafka_consumer_poll_exception(self,
+                                           mock_kafka_error,
+                                           mock_kafka_message):
+        mock_kafka_error.return_value.str.return_value = 'fake error message'
+        mock_kafka_message.return_value.error.return_value = \
+            mock_kafka_error.return_value
+        self.consumer._consumer.poll.side_effect = \
+            [mock_kafka_message.return_value]
+
+        self.assertRaises(confluent_kafka.KafkaException,
+                          list, self.consumer)
+
+    @mock.patch('datetime.datetime')
+    def test_kafka_commit(self, mock_datetime):
+        self.consumer.commit()
+        mock_datetime.now.assert_called_once()
+        self.mock_confluent_consumer.return_value.commit.assert_called_once()
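A note on the rebalance hook exercised by rep_callback above: the wrapper
forwards it verbatim to confluent-kafka's subscribe() as on_revoke, so it is
called with the underlying confluent_kafka.Consumer and the list of
TopicPartition objects about to be revoked. A hedged sketch of one plausible
callback, assuming the goal is to commit consumed offsets before the
partitions move to another group member (the wrapper itself does not require
this behaviour):

    import logging

    log = logging.getLogger(__name__)


    def repartition_callback(raw_consumer, partitions):
        # Runs at the start of a rebalance, before the listed partitions
        # are reassigned to other members of the consumer group.
        log.info('Rebalance started, %d partition(s) being revoked',
                 len(partitions))
        # Commit whatever has been consumed so far, so the next owner of
        # these partitions does not re-process it.
        raw_consumer.commit()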