From f139eb258db12c10d921560db132263a7cf25b61 Mon Sep 17 00:00:00 2001 From: Ilya Tyaptin Date: Tue, 21 Jun 2016 14:57:17 +0300 Subject: [PATCH] Moving driver to new kafka-python version Currently Kafka driver for an oslo.messaging uses kafka-python==0.9.5 and mostly broken. This package version supports only low level Kafka producer and consumer API which are marked as deprecated now [1]. Using of these interfaces bring a big concern to the message processing, because current KafkaConsumer has not any consuming coordination. This fact causes a message duplication for the several consumers of one topic. This behavior is specific to Ceilometer and services which read and process notifications from other services. New version of kafka-python allows to use async thread safe message producers and coordinated consumers [1]. [1] http://kafka-python.readthedocs.io/en/master/changelog.html#feb-15-2016 The driver is currently experimental, python-kafka<1.0.0 API have major issue described above that can't make the oslo.messaging driver works, so we prefer having a working driver with a non-synced dependencies, that the reverse. Co-Authored-By: Mehdi Abaakouk Change-Id: I29862ed7bf56b9d8878fa8e9fb1cbd9d643908a4 --- oslo_messaging/_drivers/impl_kafka.py | 275 ++++++++++-------- oslo_messaging/_drivers/kafka_options.py | 52 ++++ oslo_messaging/opts.py | 2 + .../tests/drivers/test_impl_kafka.py | 170 +++-------- oslo_messaging/tests/functional/test_kafka.py | 72 ----- oslo_messaging/tests/test_opts.py | 3 +- test-requirements.txt | 7 +- tools/tox_install.sh | 7 + tox.ini | 1 + 9 files changed, 263 insertions(+), 326 deletions(-) create mode 100644 oslo_messaging/_drivers/kafka_options.py delete mode 100644 oslo_messaging/tests/functional/test_kafka.py diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py index aa060eedc..52e793327 100644 --- a/oslo_messaging/_drivers/impl_kafka.py +++ b/oslo_messaging/_drivers/impl_kafka.py @@ -11,64 +11,77 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. + +# Following code fixes 2 issues with kafka-python and +# The current release of eventlet (0.19.0) does not actually remove +# select.poll [1]. Because of kafka-python.selectors34 selects +# PollSelector instead of SelectSelector [2]. PollSelector relies on +# select.poll, which does not work when eventlet/greenlet is used. This +# bug in evenlet is fixed in the master branch [3], but there's no +# release of eventlet that includes this fix at this point. + +import json import threading +import kafka +from kafka.client_async import selectors +import kafka.errors +from oslo_config import cfg +from oslo_log import log as logging +from oslo_utils import eventletutils +import tenacity + from oslo_messaging._drivers import base from oslo_messaging._drivers import common as driver_common +from oslo_messaging._drivers import kafka_options from oslo_messaging._drivers import pool as driver_pool from oslo_messaging._i18n import _LE from oslo_messaging._i18n import _LW from oslo_serialization import jsonutils -import kafka -from kafka.common import KafkaError -from oslo_config import cfg -from oslo_log import log as logging +if eventletutils.is_monkey_patched('select'): + # monkeypatch the vendored SelectSelector._select like eventlet does + # https://github.com/eventlet/eventlet/blob/master/eventlet/green/selectors.py#L32 + from eventlet.green import select + selectors.SelectSelector._select = staticmethod(select.select) + # Force to use the select selectors + KAFKA_SELECTOR = selectors.SelectSelector +else: + KAFKA_SELECTOR = selectors.DefaultSelector LOG = logging.getLogger(__name__) -PURPOSE_SEND = 'send' -PURPOSE_LISTEN = 'listen' -kafka_opts = [ - cfg.StrOpt('kafka_default_host', default='localhost', - deprecated_for_removal=True, - deprecated_reason="Replaced by [DEFAULT]/transport_url", - help='Default Kafka broker Host'), - - cfg.PortOpt('kafka_default_port', default=9092, - deprecated_for_removal=True, - deprecated_reason="Replaced by [DEFAULT]/transport_url", - help='Default Kafka broker Port'), - - cfg.IntOpt('kafka_max_fetch_bytes', default=1024 * 1024, - help='Max fetch bytes of Kafka consumer'), - - cfg.IntOpt('kafka_consumer_timeout', default=1.0, - help='Default timeout(s) for Kafka consumers'), - - cfg.IntOpt('pool_size', default=10, - help='Pool Size for Kafka Consumers'), - - cfg.IntOpt('conn_pool_min_size', default=2, - help='The pool size limit for connections expiration policy'), - - cfg.IntOpt('conn_pool_ttl', default=1200, - help='The time-to-live in sec of idle connections in the pool') -] - -CONF = cfg.CONF +def unpack_message(msg): + context = {} + message = None + try: + if msg: + msg = json.loads(msg) + message = driver_common.deserialize_msg(msg) + if 'context' in message: + context = message['context'] + del message['context'] + except ValueError as e: + LOG.info("Invalid format of consumed message: %s" % e) + except Exception: + LOG.warning(_LW("Exception during message unpacking")) + return message, context -def pack_context_with_message(ctxt, msg): +def pack_message(ctxt, msg): """Pack context into msg.""" + if isinstance(ctxt, dict): context_d = ctxt else: context_d = ctxt.to_dict() + msg['context'] = context_d - return {'message': msg, 'context': context_d} + msg = driver_common.serialize_msg(msg) + + return msg def target_to_topic(target, priority=None): @@ -84,18 +97,67 @@ def target_to_topic(target, priority=None): return target.topic + '.' + priority +def retry_on_retriable_kafka_error(exc): + return (isinstance(exc, kafka.errors.KafkaError) and exc.retriable) + + +def with_reconnect(retries=None): + def decorator(func): + @tenacity.retry( + retry=tenacity.retry_if_exception(retry_on_retriable_kafka_error), + wait=tenacity.wait_fixed(1), + stop=tenacity.stop_after_attempt(retries), + reraise=True + ) + def wrapper(*args, **kwargs): + return func(*args, **kwargs) + return wrapper + return decorator + + +class Producer(object): + _producer = None + _servers = None + _lock = threading.Lock() + + @staticmethod + @with_reconnect() + def connect(servers, **kwargs): + return kafka.KafkaProducer( + bootstrap_servers=servers, + selector=KAFKA_SELECTOR, + **kwargs) + + @classmethod + def producer(cls, servers, **kwargs): + with cls._lock: + if not cls._producer or cls._servers != servers: + cls._servers = servers + cls._producer = cls.connect(servers, **kwargs) + return cls._producer + + @classmethod + def cleanup(cls): + with cls._lock: + if cls._producer: + cls._producer.close() + cls._producer = None + + class Connection(object): def __init__(self, conf, url, purpose): + self.client = None driver_conf = conf.oslo_messaging_kafka - + self.batch_size = driver_conf.producer_batch_size + self.linger_ms = driver_conf.producer_batch_timeout * 1000 self.conf = conf - self.kafka_client = None self.producer = None self.consumer = None - self.fetch_messages_max_bytes = driver_conf.kafka_max_fetch_bytes self.consumer_timeout = float(driver_conf.kafka_consumer_timeout) + self.max_fetch_bytes = driver_conf.kafka_max_fetch_bytes + self.group_id = driver_conf.consumer_group self.url = url self._parse_url() # TODO(Support for manual/auto_commit functionality) @@ -107,7 +169,6 @@ class Connection(object): def _parse_url(self): driver_conf = self.conf.oslo_messaging_kafka - self.hostaddrs = [] for host in self.url.hosts: @@ -128,82 +189,61 @@ class Connection(object): :param msg: messages for publishing :param retry: the number of retry """ - message = pack_context_with_message(ctxt, msg) + + message = pack_message(ctxt, msg) self._ensure_connection() self._send_and_retry(message, topic, retry) def _send_and_retry(self, message, topic, retry): - current_retry = 0 if not isinstance(message, str): message = jsonutils.dumps(message) - while message is not None: - try: - self._send(message, topic) - message = None - except Exception: - LOG.warning(_LW("Failed to publish a message of topic %s"), - topic) - current_retry += 1 - if retry is not None and current_retry >= retry: - LOG.exception(_LE("Failed to retry to send data " - "with max retry times")) - message = None + retry = retry if retry >= 0 else None - def _send(self, message, topic): - self.producer.send_messages(topic, message) + @with_reconnect(retries=retry) + def _send(topic, message): + self.producer.send(topic, message) + + try: + _send(topic, message) + except Exception: + Producer.cleanup() + LOG.exception(_LE("Failed to send message")) + + @with_reconnect() + def _poll_messages(self, timeout): + return self.consumer.poll(timeout) def consume(self, timeout=None): """Receive up to 'max_fetch_messages' messages. :param timeout: poll timeout in seconds """ - duration = (self.consumer_timeout if timeout is None else timeout) - timer = driver_common.DecayingTimer(duration=duration) - timer.start() + if self._consume_loop_stopped: + return None - def _raise_timeout(): - LOG.debug('Timed out waiting for Kafka response') - raise driver_common.Timeout() - - poll_timeout = (self.consumer_timeout if timeout is None - else min(timeout, self.consumer_timeout)) - - while True: - if self._consume_loop_stopped: - return - try: - next_timeout = poll_timeout * 1000.0 - # TODO(use configure() method instead) - # Currently KafkaConsumer does not support for - # the case of updating only fetch_max_wait_ms parameter - self.consumer._config['fetch_max_wait_ms'] = next_timeout - messages = list(self.consumer.fetch_messages()) - except Exception as e: - LOG.exception(_LE("Failed to consume messages: %s"), e) - messages = None - - if not messages: - poll_timeout = timer.check_return( - _raise_timeout, maximum=self.consumer_timeout) - continue - - return messages + timeout = timeout if timeout >= 0 else self.consumer_timeout + try: + messages = self._poll_messages(timeout) + except kafka.errors.ConsumerTimeout as e: + raise driver_common.Timeout(e.message) + except Exception: + LOG.exception(_LE("Failed to consume messages")) + messages = None + return messages def stop_consuming(self): self._consume_loop_stopped = True def reset(self): """Reset a connection so it can be used again.""" - if self.consumer: - self.consumer.close() - self.consumer = None + pass def close(self): - if self.kafka_client: - self.kafka_client.close() - self.kafka_client = None if self.producer: - self.producer.stop() + self.producer.close() + self.producer = None + if self.consumer: + self.consumer.close() self.consumer = None def commit(self): @@ -218,25 +258,22 @@ class Connection(object): self.consumer.commit() def _ensure_connection(self): - if self.kafka_client: - return try: - self.kafka_client = kafka.KafkaClient( - self.hostaddrs) - self.producer = kafka.SimpleProducer(self.kafka_client) - except KafkaError as e: - LOG.exception(_LE("Kafka Connection is not available: %s"), e) - self.kafka_client = None + self.producer = Producer.producer(self.hostaddrs, + linger_ms=self.linger_ms, + batch_size=self.batch_size) + except kafka.errors.KafkaError as e: + LOG.exception(_LE("KafkaProducer could not be initialized: %s"), e) + raise + @with_reconnect() def declare_topic_consumer(self, topics, group=None): - self._ensure_connection() - for topic in topics: - self.kafka_client.ensure_topic_exists(topic) self.consumer = kafka.KafkaConsumer( - *topics, group_id=group, + *topics, group_id=(group or self.group_id), bootstrap_servers=self.hostaddrs, - fetch_message_max_bytes=self.fetch_messages_max_bytes) - self._consume_loop_stopped = False + max_partition_fetch_bytes=self.max_fetch_bytes, + selector=KAFKA_SELECTOR + ) class OsloKafkaMessage(base.RpcIncomingMessage): @@ -261,20 +298,26 @@ class KafkaListener(base.PollStyleListener): @base.batch_poll_helper def poll(self, timeout=None): + # TODO(sileht): use batch capability of kafka while not self._stopped.is_set(): if self.incoming_queue: return self.incoming_queue.pop(0) try: messages = self.conn.consume(timeout=timeout) - for msg in messages: - message = msg.value - LOG.debug('poll got message : %s', message) - message = jsonutils.loads(message) - self.incoming_queue.append(OsloKafkaMessage( - ctxt=message['context'], message=message['message'])) + if messages: + self._put_messages_to_queue(messages) except driver_common.Timeout: return None + def _put_messages_to_queue(self, messages): + for topic, records in messages.items(): + if records: + for record in records: + message, context = unpack_message(record.value) + if message: + self.incoming_queue.append( + OsloKafkaMessage(ctxt=context, message=message)) + def stop(self): self._stopped.set() self.conn.stop_consuming() @@ -302,7 +345,7 @@ class KafkaDriver(base.BaseDriver): opt_group = cfg.OptGroup(name='oslo_messaging_kafka', title='Kafka driver options') conf.register_group(opt_group) - conf.register_opts(kafka_opts, group=opt_group) + conf.register_opts(kafka_options.KAFKA_OPTS, group=opt_group) super(KafkaDriver, self).__init__( conf, url, default_exchange, allowed_remote_exmods) @@ -344,7 +387,7 @@ class KafkaDriver(base.BaseDriver): N means N retries :type retry: int """ - with self._get_connection(purpose=PURPOSE_SEND) as conn: + with self._get_connection(purpose=driver_common.PURPOSE_SEND) as conn: conn.notify_send(target_to_topic(target), ctxt, message, retry) def listen(self, target, batch_size, batch_timeout): @@ -363,7 +406,7 @@ class KafkaDriver(base.BaseDriver): :param pool: consumer group of Kafka consumers :type pool: string """ - conn = self._get_connection(purpose=PURPOSE_LISTEN) + conn = self._get_connection(purpose=driver_common.PURPOSE_LISTEN) topics = set() for target, priority in targets_and_priorities: topics.add(target_to_topic(target, priority)) diff --git a/oslo_messaging/_drivers/kafka_options.py b/oslo_messaging/_drivers/kafka_options.py new file mode 100644 index 000000000..c733a0a42 --- /dev/null +++ b/oslo_messaging/_drivers/kafka_options.py @@ -0,0 +1,52 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_config import cfg + +KAFKA_OPTS = [ + cfg.StrOpt('kafka_default_host', default='localhost', + deprecated_for_removal=True, + deprecated_reason="Replaced by [DEFAULT]/transport_url", + help='Default Kafka broker Host'), + + cfg.PortOpt('kafka_default_port', default=9092, + deprecated_for_removal=True, + deprecated_reason="Replaced by [DEFAULT]/transport_url", + help='Default Kafka broker Port'), + + cfg.IntOpt('kafka_max_fetch_bytes', default=1024 * 1024, + help='Max fetch bytes of Kafka consumer'), + + cfg.IntOpt('kafka_consumer_timeout', default=1.0, + help='Default timeout(s) for Kafka consumers'), + + cfg.IntOpt('pool_size', default=10, + help='Pool Size for Kafka Consumers'), + + cfg.IntOpt('conn_pool_min_size', default=2, + help='The pool size limit for connections expiration policy'), + + cfg.IntOpt('conn_pool_ttl', default=1200, + help='The time-to-live in sec of idle connections in the pool'), + + cfg.StrOpt('consumer_group', default="oslo_messaging_consumer", + help='Group id for Kafka consumer. Consumers in one group ' + 'will coordinate message consumption'), + + cfg.FloatOpt('producer_batch_timeout', default=0., + help="Upper bound on the delay for KafkaProducer batching " + "in seconds"), + + cfg.IntOpt('producer_batch_size', default=16384, + help='Size of batch for the producer async send') +] diff --git a/oslo_messaging/opts.py b/oslo_messaging/opts.py index 7c373c353..61bb692f8 100644 --- a/oslo_messaging/opts.py +++ b/oslo_messaging/opts.py @@ -26,6 +26,7 @@ from oslo_messaging._drivers import base as drivers_base from oslo_messaging._drivers import impl_pika from oslo_messaging._drivers import impl_rabbit from oslo_messaging._drivers.impl_zmq import zmq_options +from oslo_messaging._drivers import kafka_options from oslo_messaging._drivers.pika_driver import pika_connection_factory from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_redis from oslo_messaging.notify import notifier @@ -53,6 +54,7 @@ _opts = [ pika_connection_factory.pika_opts, impl_pika.pika_pool_opts, impl_pika.message_opts, impl_pika.notification_opts, impl_pika.rpc_opts))), + ('oslo_messaging_kafka', kafka_options.KAFKA_OPTS), ] diff --git a/oslo_messaging/tests/drivers/test_impl_kafka.py b/oslo_messaging/tests/drivers/test_impl_kafka.py index cc241c213..6262aab4f 100644 --- a/oslo_messaging/tests/drivers/test_impl_kafka.py +++ b/oslo_messaging/tests/drivers/test_impl_kafka.py @@ -12,14 +12,12 @@ # License for the specific language governing permissions and limitations # under the License. import kafka -from kafka.common import KafkaError +import kafka.errors import mock -from oslo_serialization import jsonutils import testscenarios -import time import oslo_messaging -from oslo_messaging._drivers import common as driver_common +from oslo_messaging._drivers import common as common_driver from oslo_messaging._drivers import impl_kafka as kafka_driver from oslo_messaging.tests import utils as test_utils @@ -63,7 +61,7 @@ class TestKafkaTransportURL(test_utils.BaseTestCase): self.addCleanup(transport.cleanup) driver = transport._driver - conn = driver._get_connection(kafka_driver.PURPOSE_SEND) + conn = driver._get_connection(common_driver.PURPOSE_SEND) self.assertEqual(self.expected['hostaddrs'], conn.hostaddrs) @@ -76,6 +74,7 @@ class TestKafkaDriver(test_utils.BaseTestCase): self.messaging_conf.transport_driver = 'kafka' transport = oslo_messaging.get_transport(self.conf) self.driver = transport._driver + self.addCleanup(kafka_driver.Producer.cleanup) def test_send(self): target = oslo_messaging.Target(topic="topic_test") @@ -85,16 +84,40 @@ class TestKafkaDriver(test_utils.BaseTestCase): def test_send_notification(self): target = oslo_messaging.Target(topic="topic_test") - with mock.patch.object( - kafka_driver.Connection, 'notify_send') as fake_send: - self.driver.send_notification(target, {}, {}, None) - self.assertEqual(1, len(fake_send.mock_calls)) + with mock.patch("kafka.KafkaProducer") as fake_producer_class: + fake_producer = fake_producer_class.return_value + fake_producer.send.side_effect = kafka.errors.NoBrokersAvailable + self.driver.send_notification(target, {}, {"payload": ["test_1"]}, + None, retry=3) + self.assertEqual(3, fake_producer.send.call_count) def test_listen(self): target = oslo_messaging.Target(topic="topic_test") self.assertRaises(NotImplementedError, self.driver.listen, target, None, None) + def test_listen_for_notifications(self): + targets_and_priorities = [ + (oslo_messaging.Target(topic="topic_test_1"), "sample"), + ] + expected_topics = ["topic_test_1.sample"] + with mock.patch("kafka.KafkaConsumer") as consumer: + self.driver.listen_for_notifications( + targets_and_priorities, "kafka_test", 1000, 10) + consumer.assert_called_once_with( + *expected_topics, group_id="kafka_test", + bootstrap_servers=['localhost:9092'], + max_partition_fetch_bytes=mock.ANY, + selector=mock.ANY + ) + + def test_cleanup(self): + listeners = [mock.MagicMock(), mock.MagicMock()] + self.driver.listeners.extend(listeners) + self.driver.cleanup() + for listener in listeners: + listener.close.assert_called_once() + class TestKafkaConnection(test_utils.BaseTestCase): @@ -105,134 +128,9 @@ class TestKafkaConnection(test_utils.BaseTestCase): self.driver = transport._driver @mock.patch.object(kafka_driver.Connection, '_ensure_connection') - @mock.patch.object(kafka_driver.Connection, '_send') + @mock.patch.object(kafka_driver.Connection, '_send_and_retry') def test_notify(self, fake_send, fake_ensure_connection): - conn = self.driver._get_connection(kafka_driver.PURPOSE_SEND) + conn = self.driver._get_connection(common_driver.PURPOSE_SEND) conn.notify_send("fake_topic", {"fake_ctxt": "fake_param"}, {"fake_text": "fake_message_1"}, 10) self.assertEqual(1, len(fake_send.mock_calls)) - - @mock.patch.object(kafka_driver.Connection, '_ensure_connection') - @mock.patch.object(kafka_driver.Connection, '_send') - def test_notify_with_retry(self, fake_send, fake_ensure_connection): - conn = self.driver._get_connection(kafka_driver.PURPOSE_SEND) - fake_send.side_effect = KafkaError("fake_exception") - conn.notify_send("fake_topic", {"fake_ctxt": "fake_param"}, - {"fake_text": "fake_message_2"}, 10) - self.assertEqual(10, len(fake_send.mock_calls)) - - @mock.patch.object(kafka_driver.Connection, '_ensure_connection') - @mock.patch.object(kafka_driver.Connection, '_parse_url') - def test_consume(self, fake_parse_url, fake_ensure_connection): - fake_message = { - "context": {"fake": "fake_context_1"}, - "message": {"fake": "fake_message_1"}} - - conn = kafka_driver.Connection( - self.conf, '', kafka_driver.PURPOSE_LISTEN) - - conn.consumer = mock.MagicMock() - conn.consumer.fetch_messages = mock.MagicMock( - return_value=iter([jsonutils.dumps(fake_message)])) - - self.assertEqual(fake_message, jsonutils.loads(conn.consume()[0])) - self.assertEqual(1, len(conn.consumer.fetch_messages.mock_calls)) - - @mock.patch.object(kafka_driver.Connection, '_ensure_connection') - @mock.patch.object(kafka_driver.Connection, '_parse_url') - def test_consume_timeout(self, fake_parse_url, fake_ensure_connection): - deadline = time.time() + 3 - conn = kafka_driver.Connection( - self.conf, '', kafka_driver.PURPOSE_LISTEN) - - conn.consumer = mock.MagicMock() - conn.consumer.fetch_messages = mock.MagicMock(return_value=iter([])) - - self.assertRaises(driver_common.Timeout, conn.consume, timeout=3) - self.assertEqual(0, int(deadline - time.time())) - - @mock.patch.object(kafka_driver.Connection, '_ensure_connection') - @mock.patch.object(kafka_driver.Connection, '_parse_url') - def test_consume_with_default_timeout( - self, fake_parse_url, fake_ensure_connection): - deadline = time.time() + 1 - conn = kafka_driver.Connection( - self.conf, '', kafka_driver.PURPOSE_LISTEN) - - conn.consumer = mock.MagicMock() - conn.consumer.fetch_messages = mock.MagicMock(return_value=iter([])) - - self.assertRaises(driver_common.Timeout, conn.consume) - self.assertEqual(0, int(deadline - time.time())) - - @mock.patch.object(kafka_driver.Connection, '_ensure_connection') - @mock.patch.object(kafka_driver.Connection, '_parse_url') - def test_consume_timeout_without_consumers( - self, fake_parse_url, fake_ensure_connection): - deadline = time.time() + 3 - conn = kafka_driver.Connection( - self.conf, '', kafka_driver.PURPOSE_LISTEN) - conn.consumer = mock.MagicMock(return_value=None) - - self.assertRaises(driver_common.Timeout, conn.consume, timeout=3) - self.assertEqual(0, int(deadline - time.time())) - - -class TestKafkaListener(test_utils.BaseTestCase): - - def setUp(self): - super(TestKafkaListener, self).setUp() - self.messaging_conf.transport_driver = 'kafka' - transport = oslo_messaging.get_transport(self.conf) - self.driver = transport._driver - - @mock.patch.object(kafka_driver.Connection, '_ensure_connection') - @mock.patch.object(kafka_driver.Connection, 'declare_topic_consumer') - def test_create_listener(self, fake_consumer, fake_ensure_connection): - fake_target = oslo_messaging.Target(topic='fake_topic') - fake_targets_and_priorities = [(fake_target, 'info')] - self.driver.listen_for_notifications(fake_targets_and_priorities, None, - None, None) - self.assertEqual(1, len(fake_consumer.mock_calls)) - - @mock.patch.object(kafka_driver.Connection, '_ensure_connection') - @mock.patch.object(kafka_driver.Connection, 'declare_topic_consumer') - def test_converting_targets_to_topics(self, fake_consumer, - fake_ensure_connection): - fake_targets_and_priorities = [ - (oslo_messaging.Target(topic="fake_topic", - exchange="test1"), 'info'), - (oslo_messaging.Target(topic="fake_topic", - exchange="test2"), 'info'), - (oslo_messaging.Target(topic="fake_topic", - exchange="test1"), 'error'), - (oslo_messaging.Target(topic="fake_topic", - exchange="test3"), 'error'), - ] - self.driver.listen_for_notifications(fake_targets_and_priorities, None, - None, None) - self.assertEqual(1, len(fake_consumer.mock_calls)) - fake_consumer.assert_called_once_with(set(['fake_topic.error', - 'fake_topic.info']), - None) - - @mock.patch.object(kafka_driver.Connection, '_ensure_connection') - @mock.patch.object(kafka_driver.Connection, 'declare_topic_consumer') - def test_stop_listener(self, fake_consumer, fake_client): - fake_target = oslo_messaging.Target(topic='fake_topic') - fake_targets_and_priorities = [(fake_target, 'info')] - listener = self.driver.listen_for_notifications( - fake_targets_and_priorities, None, None, None)._poll_style_listener - listener.conn.consume = mock.MagicMock() - listener.conn.consume.return_value = ( - iter([kafka.common.KafkaMessage( - topic='fake_topic', partition=0, offset=0, - key=None, value='{"message": {"fake": "fake_message_1"},' - '"context": {"fake": "fake_context_1"}}')])) - listener.poll() - self.assertEqual(1, len(listener.conn.consume.mock_calls)) - listener.conn.stop_consuming = mock.MagicMock() - listener.stop() - fake_response = listener.poll() - self.assertEqual(1, len(listener.conn.consume.mock_calls)) - self.assertEqual([], fake_response) diff --git a/oslo_messaging/tests/functional/test_kafka.py b/oslo_messaging/tests/functional/test_kafka.py deleted file mode 100644 index 705f99932..000000000 --- a/oslo_messaging/tests/functional/test_kafka.py +++ /dev/null @@ -1,72 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import time - -from oslo_config import cfg - -import oslo_messaging -from oslo_messaging.tests.functional import utils - - -class TestWithRealKafkaBroker(utils.SkipIfNoTransportURL): - def setUp(self): - super(TestWithRealKafkaBroker, self).setUp(conf=cfg.ConfigOpts()) - if not self.url.startswith('kafka://'): - self.skipTest("TRANSPORT_URL is not set to kafka driver") - transport = oslo_messaging.get_transport(self.conf, self.url) - self.driver = transport._driver - - def test_send_and_receive_message(self): - target = oslo_messaging.Target( - topic="fake_topic", exchange='fake_exchange') - targets_and_priorities = [(target, 'fake_info')] - - listener = self.driver.listen_for_notifications( - targets_and_priorities, None, None, None)._poll_style_listener - fake_context = {"fake_context_key": "fake_context_value"} - fake_message = {"fake_message_key": "fake_message_value"} - self.driver.send_notification( - target, fake_context, fake_message, None) - - received_message = listener.poll()[0] - self.assertEqual(fake_context, received_message.ctxt) - self.assertEqual(fake_message, received_message.message) - - def test_send_and_receive_message_without_exchange(self): - target = oslo_messaging.Target(topic="fake_no_exchange_topic") - targets_and_priorities = [(target, 'fake_info')] - - listener = self.driver.listen_for_notifications( - targets_and_priorities, None, None, None)._poll_style_listener - fake_context = {"fake_context_key": "fake_context_value"} - fake_message = {"fake_message_key": "fake_message_value"} - self.driver.send_notification( - target, fake_context, fake_message, None) - - received_message = listener.poll()[0] - self.assertEqual(fake_context, received_message.ctxt) - self.assertEqual(fake_message, received_message.message) - - def test_receive_message_from_empty_topic_with_timeout(self): - target = oslo_messaging.Target( - topic="fake_empty_topic", exchange='fake_empty_exchange') - targets_and_priorities = [(target, 'fake_info')] - - listener = self.driver.listen_for_notifications( - targets_and_priorities, None, None, None)._poll_style_listener - - deadline = time.time() + 3 - received_message = listener.poll(batch_timeout=3) - self.assertEqual(0, int(deadline - time.time())) - self.assertEqual([], received_message) diff --git a/oslo_messaging/tests/test_opts.py b/oslo_messaging/tests/test_opts.py index 0e4b1f89e..f7137388c 100644 --- a/oslo_messaging/tests/test_opts.py +++ b/oslo_messaging/tests/test_opts.py @@ -32,7 +32,7 @@ class OptsTestCase(test_utils.BaseTestCase): super(OptsTestCase, self).setUp() def _test_list_opts(self, result): - self.assertEqual(6, len(result)) + self.assertEqual(7, len(result)) groups = [g for (g, l) in result] self.assertIn(None, groups) @@ -41,6 +41,7 @@ class OptsTestCase(test_utils.BaseTestCase): self.assertIn('oslo_messaging_amqp', groups) self.assertIn('oslo_messaging_notifications', groups) self.assertIn('oslo_messaging_rabbit', groups) + self.assertIn('oslo_messaging_kafka', groups) opt_names = [o.name for (g, l) in result for o in l] self.assertIn('rpc_backend', opt_names) diff --git a/test-requirements.txt b/test-requirements.txt index b283a255f..e96097fe3 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -21,7 +21,12 @@ redis>=2.10.0 # MIT pyzmq>=14.3.1 # LGPL+BSD # for test_impl_kafka -kafka-python<1.0.0,>=0.9.5 # Apache-2.0 +# NOTE(sileht) temporary commented since requirements repo cap it to <1.0.0 +# due to monasca project that have some concern with newer version. +# The driver is currently experimental, python-kafka<1.0.0 API have major issue +# that can't make the oslo.messaging driver works, so we prefer having a working +# driver with a non-synced dep, that the reverse +# kafka-python>=1.3.1 # Apache-2.0 # when we can require tox>= 1.4, this can go into tox.ini: # [testenv:cover] diff --git a/tools/tox_install.sh b/tools/tox_install.sh index 43468e450..8572755db 100755 --- a/tools/tox_install.sh +++ b/tools/tox_install.sh @@ -27,4 +27,11 @@ pip install -c$localfile openstack-requirements edit-constraints $localfile -- $CLIENT_NAME pip install -c$localfile -U $* +# NOTE(sileht) temporary overrided since requirements repo cap it to <1.0.0 +# due to monasca project that have some concern with newer version. +# The driver is currently experimental, python-kafka<1.0.0 API have major issue +# that can't make the oslo.messaging driver works, so we prefer having a working +# driver with a non-synced dep, that the reverse +pip install -U 'kafka-python>=1.3.1' + exit $? diff --git a/tox.ini b/tox.ini index 613bb5d8d..1cad49a4c 100644 --- a/tox.ini +++ b/tox.ini @@ -58,6 +58,7 @@ commands = pifpaf run rabbitmq -- python setup.py testr --slowest --testr-args= setenv = {[testenv]setenv} TRANSPORT_DRIVER=kafka + kafka-python>=1.3.1 commands = {toxinidir}/setup-test-env-kafka.sh python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}' [testenv:py27-func-amqp1]