From 5a842ae15582e4eedfb1b2510eaf4a8997701f58 Mon Sep 17 00:00:00 2001
From: Andy Smith
Date: Mon, 17 Sep 2018 13:21:42 -0400
Subject: [PATCH] Switch driver to confluent-kafka client library

This patch switches the kafka python client from kafka-python to
confluent-kafka due to documented threading issues with the
kafka-python consumer and the recommendation to use multiprocessing.
The confluent-kafka client leverages the high performance librdkafka C
client and is safe for multiple thread use.

This patch:
* switches to confluent-kafka library
* revises consumer and producer message operations
* utilizes eventlet.tpool method for confluent-kafka blocking calls
* updates unit tests
* adds kafka-specific timeouts for functional tests
* adds release note

Depends-On: Ice374dca539b8ed1b1965b75379bad5140121483
Change-Id: Idfb9fe3700d882c8285c6dc56b0620951178eba2
---
 .zuul.yaml | 5 +-
 bindep.txt | 2 +
 doc/requirements.txt | 2 +-
 lower-constraints.txt | 2 +-
 oslo_messaging/_drivers/impl_kafka.py | 243 ++++++++++--------
 .../tests/drivers/test_impl_kafka.py | 59 +++--
 .../tests/functional/test_functional.py | 41 ++-
 oslo_messaging/tests/functional/utils.py | 3 -
 ...lient-library-change-fe16d5a34550db7f.yaml | 13 +
 setup-test-env-kafka.sh | 2 +-
 setup.cfg | 3 +-
 test-requirements.txt | 3 +-
 12 files changed, 221 insertions(+), 157 deletions(-)
 create mode 100644 releasenotes/notes/kafka-client-library-change-fe16d5a34550db7f.yaml

diff --git a/.zuul.yaml b/.zuul.yaml
index 0f7bfad41..bf8704f99 100644
--- a/.zuul.yaml
+++ b/.zuul.yaml
@@ -77,7 +77,7 @@
       devstack_plugins:
         devstack-plugin-amqp1: git://git.openstack.org/openstack/devstack-plugin-amqp1
       zuul_copy_output:
-        '{{ devstack_base_dir }}/logs/qdrouterd.log': logs
+        '{{ devstack_log_dir }}/qdrouterd.log': logs

 - job:
@@ -102,8 +102,7 @@
       devstack_plugins:
         devstack-plugin-kafka: git://git.openstack.org/openstack/devstack-plugin-kafka
       zuul_copy_output:
-        '{{ devstack_base_dir }}/logs/qdrouterd.log': logs
-
+        '{{ devstack_log_dir }}/server.log': logs

 - job:
     name: oslo.messaging-src-dsvm-full-kafka-centos-7
diff --git a/bindep.txt b/bindep.txt
index 36f4ccd5f..c5ad6cb9e 100644
--- a/bindep.txt
+++ b/bindep.txt
@@ -32,6 +32,8 @@ swig [platform:rpm amqp1]
 # kafka dpkg
 openjdk-8-jdk [platform:dpkg kafka]
+librdkafka1 [platform:dpkg kafka]
 # kafka rpm
 java-1.8.0-openjdk [platform:rpm kafka]
+librdkafka [platform:rpm kafka]
diff --git a/doc/requirements.txt b/doc/requirements.txt
index 5d126f760..9df2d5b7d 100644
--- a/doc/requirements.txt
+++ b/doc/requirements.txt
@@ -8,6 +8,6 @@ reno>=2.5.0 # Apache-2.0
 # imported when the source code is parsed for generating documentation:
 fixtures>=3.0.0 # Apache-2.0/BSD
-kafka-python>=1.3.1 # Apache-2.0
+confluent-kafka>=0.11.6 # Apache-2.0
 pyngus>=2.2.0 # Apache-2.0
 tenacity>=3.2.1 # Apache-2.0
diff --git a/lower-constraints.txt b/lower-constraints.txt
index ef882446d..a7e932d57 100644
--- a/lower-constraints.txt
+++ b/lower-constraints.txt
@@ -7,6 +7,7 @@ cachetools==2.0.0
 cffi==1.7.0
 cliff==2.8.0
 cmd2==0.8.0
+confluent-kafka==0.11.6
 contextlib2==0.4.0
 coverage==4.0
 debtcollector==1.2.0
@@ -26,7 +27,6 @@ hacking==0.12.0
 imagesize==0.7.1
 iso8601==0.1.11
 Jinja2==2.10
-kafka-python==1.3.1
 keystoneauth1==3.4.0
 kombu==4.0.0
 linecache2==1.0.0
diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py
index 20b97ca77..1871be11f 100644
--- a/oslo_messaging/_drivers/impl_kafka.py
+++ b/oslo_messaging/_drivers/impl_kafka.py
@@ -12,54 +12,30 @@
 # License for the specific language governing
permissions and limitations # under the License. -# Following code fixes 2 issues with kafka-python and -# The current release of eventlet (0.19.0) does not actually remove -# select.poll [1]. Because of kafka-python.selectors34 selects -# PollSelector instead of SelectSelector [2]. PollSelector relies on -# select.poll, which does not work when eventlet/greenlet is used. This -# bug in evenlet is fixed in the master branch [3], but there's no -# release of eventlet that includes this fix at this point. - -import json +import logging import threading -import kafka -from kafka.client_async import selectors -import kafka.errors -from oslo_log import log as logging +import confluent_kafka +from confluent_kafka import KafkaException +from oslo_serialization import jsonutils from oslo_utils import eventletutils -import tenacity +from oslo_utils import importutils from oslo_messaging._drivers import base from oslo_messaging._drivers import common as driver_common from oslo_messaging._drivers.kafka_driver import kafka_options -from oslo_messaging._i18n import _LE -from oslo_messaging._i18n import _LW -from oslo_serialization import jsonutils -import logging as l -l.basicConfig(level=l.INFO) -l.getLogger("kafka").setLevel(l.WARN) -l.getLogger("stevedore").setLevel(l.WARN) - -if eventletutils.is_monkey_patched('select'): - # monkeypatch the vendored SelectSelector._select like eventlet does - # https://github.com/eventlet/eventlet/blob/master/eventlet/green/selectors.py#L32 - from eventlet.green import select - selectors.SelectSelector._select = staticmethod(select.select) - - # Force to use the select selectors - KAFKA_SELECTOR = selectors.SelectSelector -else: - KAFKA_SELECTOR = selectors.DefaultSelector +if eventletutils.EVENTLET_AVAILABLE: + tpool = importutils.try_import('eventlet.tpool') LOG = logging.getLogger(__name__) def unpack_message(msg): + """Unpack context and msg.""" context = {} message = None - msg = json.loads(msg) + msg = jsonutils.loads(msg) message = driver_common.deserialize_msg(msg) context = message['_context'] del message['_context'] @@ -68,7 +44,6 @@ def unpack_message(msg): def pack_message(ctxt, msg): """Pack context into msg.""" - if isinstance(ctxt, dict): context_d = ctxt else: @@ -97,25 +72,28 @@ def target_to_topic(target, priority=None, vhost=None): return concat(".", [target.topic, priority, vhost]) -def retry_on_retriable_kafka_error(exc): - return (isinstance(exc, kafka.errors.KafkaError) and exc.retriable) +class ConsumerTimeout(KafkaException): + pass -def with_reconnect(retries=None): - def decorator(func): - @tenacity.retry( - retry=tenacity.retry_if_exception(retry_on_retriable_kafka_error), - wait=tenacity.wait_fixed(1), - stop=tenacity.stop_after_attempt(retries), - reraise=True - ) - def wrapper(*args, **kwargs): - return func(*args, **kwargs) - return wrapper - return decorator +class AssignedPartition(object): + """This class is used by the ConsumerConnection to track the + assigned partitions. + """ + def __init__(self, topic, partition): + super(AssignedPartition, self).__init__() + self.topic = topic + self.partition = partition + self.skey = '%s %d' % (self.topic, self.partition) + + def to_dict(self): + return {'topic': self.topic, 'partition': self.partition} class Connection(object): + """This is the base class for consumer and producer connections for + transport attributes. 
+ """ def __init__(self, conf, url): @@ -141,7 +119,7 @@ class Connection(object): self.password = host.password else: if self.username != host.username: - LOG.warning(_LW("Different transport usernames detected")) + LOG.warning("Different transport usernames detected") if host.hostname: self.hostaddrs.append("%s:%s" % (host.hostname, host.port)) @@ -152,7 +130,8 @@ class Connection(object): class ConsumerConnection(Connection): - + """This is the class for kafka topic/assigned partition consumer + """ def __init__(self, conf, url): super(ConsumerConnection, self).__init__(conf, url) @@ -160,28 +139,59 @@ class ConsumerConnection(Connection): self.consumer_timeout = self.driver_conf.kafka_consumer_timeout self.max_fetch_bytes = self.driver_conf.kafka_max_fetch_bytes self.group_id = self.driver_conf.consumer_group - self.enable_auto_commit = self.driver_conf.enable_auto_commit + self.use_auto_commit = self.driver_conf.enable_auto_commit self.max_poll_records = self.driver_conf.max_poll_records self._consume_loop_stopped = False + self.assignment_dict = dict() + + def find_assignment(self, topic, partition): + """Find and return existing assignment based on topic and partition""" + skey = '%s %d' % (topic, partition) + return self.assignment_dict.get(skey) + + def on_assign(self, consumer, topic_partitions): + """Rebalance on_assign callback""" + assignment = [AssignedPartition(p.topic, p.partition) + for p in topic_partitions] + self.assignment_dict = {a.skey: a for a in assignment} + for t in topic_partitions: + LOG.debug("Topic %s assigned to partition %d", + t.topic, t.partition) + + def on_revoke(self, consumer, topic_partitions): + """Rebalance on_revoke callback""" + self.assignment_dict = dict() + for t in topic_partitions: + LOG.debug("Topic %s revoked from partition %d", + t.topic, t.partition) - @with_reconnect() def _poll_messages(self, timeout): - messages = self.consumer.poll(timeout * 1000.0) - messages = [record.value - for records in messages.values() if records - for record in records] - if not messages: - # NOTE(sileht): really ? you return payload but no messages... - # simulate timeout to consume message again - raise kafka.errors.ConsumerNoMoreData() + """Consume messages, callbacks and return list of messages""" + msglist = self.consumer.consume(self.max_poll_records, + timeout) - if not self.enable_auto_commit: - self.consumer.commit() + if ((len(self.assignment_dict) == 0) or (len(msglist) == 0)): + raise ConsumerTimeout() + + messages = [] + for message in msglist: + if message is None: + break + a = self.find_assignment(message.topic(), message.partition()) + if a is None: + LOG.warning(("Message for %s received on unassigned " + "partition %d"), + message.topic(), message.partition()) + else: + messages.append(message.value()) + + if not self.use_auto_commit: + self.consumer.commit(asynchronous=False) return messages def consume(self, timeout=None): - """Receive up to 'max_fetch_messages' messages. + """Receive messages. 
:param timeout: poll timeout in seconds """ @@ -199,12 +209,14 @@ class ConsumerConnection(Connection): if self._consume_loop_stopped: return try: + if eventletutils.is_monkey_patched('thread'): + return tpool.execute(self._poll_messages, poll_timeout) return self._poll_messages(poll_timeout) - except kafka.errors.ConsumerNoMoreData as exc: + except ConsumerTimeout as exc: poll_timeout = timer.check_return( _raise_timeout, exc, maximum=self.consumer_timeout) except Exception: - LOG.exception(_LE("Failed to consume messages")) + LOG.exception("Failed to consume messages") return def stop_consuming(self): @@ -215,21 +227,25 @@ class ConsumerConnection(Connection): self.consumer.close() self.consumer = None - @with_reconnect() def declare_topic_consumer(self, topics, group=None): - self.consumer = kafka.KafkaConsumer( - *topics, group_id=(group or self.group_id), - enable_auto_commit=self.enable_auto_commit, - bootstrap_servers=self.hostaddrs, - max_partition_fetch_bytes=self.max_fetch_bytes, - max_poll_records=self.max_poll_records, - security_protocol=self.security_protocol, - sasl_mechanism=self.sasl_mechanism, - sasl_plain_username=self.username, - sasl_plain_password=self.password, - ssl_cafile=self.ssl_cafile, - selector=KAFKA_SELECTOR - ) + conf = { + 'bootstrap.servers': ",".join(self.hostaddrs), + 'group.id': (group or self.group_id), + 'enable.auto.commit': self.use_auto_commit, + 'max.partition.fetch.bytes': self.max_fetch_bytes, + 'security.protocol': self.security_protocol, + 'sasl.mechanism': self.sasl_mechanism, + 'sasl.username': self.username, + 'sasl.password': self.password, + 'ssl.ca.location': self.ssl_cafile, + 'enable.partition.eof': False, + 'default.topic.config': {'auto.offset.reset': 'latest'} + } + LOG.debug("Subscribing to %s as %s", topics, (group or self.group_id)) + self.consumer = confluent_kafka.Consumer(conf) + self.consumer.subscribe(topics, + on_assign=self.on_assign, + on_revoke=self.on_revoke) class ProducerConnection(Connection): @@ -242,6 +258,20 @@ class ProducerConnection(Connection): self.producer = None self.producer_lock = threading.Lock() + def _produce_message(self, topic, message): + while True: + try: + self.producer.produce(topic, message) + except KafkaException as e: + LOG.error("Produce message failed: %s" % str(e)) + except BufferError: + LOG.debug("Produce message queue full, waiting for deliveries") + self.producer.poll(0.5) + continue + break + + self.producer.poll(0) + def notify_send(self, topic, ctxt, msg, retry): """Send messages to Kafka broker. 
@@ -254,16 +284,11 @@ class ProducerConnection(Connection): message = pack_message(ctxt, msg) message = jsonutils.dumps(message).encode('utf-8') - @with_reconnect(retries=retry) - def wrapped_with_reconnect(): - self._ensure_producer() - # NOTE(sileht): This returns a future, we can use get() - # if we want to block like other driver - future = self.producer.send(topic, message) - future.get() - try: - wrapped_with_reconnect() + self._ensure_producer() + if eventletutils.is_monkey_patched('thread'): + return tpool.execute(self._produce_message, topic, message) + return self._produce_message(topic, message) except Exception: # NOTE(sileht): if something goes wrong close the producer # connection @@ -276,7 +301,10 @@ class ProducerConnection(Connection): def _close_producer(self): with self.producer_lock: if self.producer: - self.producer.close() + try: + self.producer.flush() + except KafkaException: + LOG.error("Flush error during producer close") self.producer = None def _ensure_producer(self): @@ -285,16 +313,17 @@ class ProducerConnection(Connection): with self.producer_lock: if self.producer: return - self.producer = kafka.KafkaProducer( - bootstrap_servers=self.hostaddrs, - linger_ms=self.linger_ms, - batch_size=self.batch_size, - security_protocol=self.security_protocol, - sasl_mechanism=self.sasl_mechanism, - sasl_plain_username=self.username, - sasl_plain_password=self.password, - ssl_cafile=self.ssl_cafile, - selector=KAFKA_SELECTOR) + conf = { + 'bootstrap.servers': ",".join(self.hostaddrs), + 'linger.ms': self.linger_ms, + 'batch.num.messages': self.batch_size, + 'security.protocol': self.security_protocol, + 'sasl.mechanism': self.sasl_mechanism, + 'sasl.username': self.username, + 'sasl.password': self.password, + 'ssl.ca.location': self.ssl_cafile + } + self.producer = confluent_kafka.Producer(conf) class OsloKafkaMessage(base.RpcIncomingMessage): @@ -303,13 +332,13 @@ class OsloKafkaMessage(base.RpcIncomingMessage): super(OsloKafkaMessage, self).__init__(ctxt, message) def requeue(self): - LOG.warning(_LW("requeue is not supported")) + LOG.warning("requeue is not supported") def reply(self, reply=None, failure=None): - LOG.warning(_LW("reply is not supported")) + LOG.warning("reply is not supported") def heartbeat(self): - LOG.warning(_LW("heartbeat is not supported")) + LOG.warning("heartbeat is not supported") class KafkaListener(base.PollStyleListener): @@ -347,8 +376,9 @@ class KafkaListener(base.PollStyleListener): class KafkaDriver(base.BaseDriver): - """Note: Current implementation of this driver is experimental. - We will have functional and/or integrated testing enabled for this driver. + """Kafka Driver + + Note: Current implementation of this driver is experimental. 
""" def __init__(self, conf, url, default_exchange=None, @@ -366,6 +396,7 @@ class KafkaDriver(base.BaseDriver): for c in self.listeners: c.close() self.listeners = [] + LOG.info("Kafka messaging driver shutdown") def send(self, target, ctxt, message, wait_for_reply=None, timeout=None, call_monitor_timeout=None, retry=None): @@ -414,9 +445,9 @@ class KafkaDriver(base.BaseDriver): :type pool: string """ conn = ConsumerConnection(self.conf, self._url) - topics = set() + topics = [] for target, priority in targets_and_priorities: - topics.add(target_to_topic(target, priority)) + topics.append(target_to_topic(target, priority)) conn.declare_topic_consumer(topics, pool) diff --git a/oslo_messaging/tests/drivers/test_impl_kafka.py b/oslo_messaging/tests/drivers/test_impl_kafka.py index dff6176da..80af57651 100644 --- a/oslo_messaging/tests/drivers/test_impl_kafka.py +++ b/oslo_messaging/tests/drivers/test_impl_kafka.py @@ -11,8 +11,6 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -import kafka -import kafka.errors from six.moves import mock import testscenarios @@ -77,6 +75,7 @@ class TestKafkaTransportURL(test_utils.BaseTestCase): self.addCleanup(transport.cleanup) driver = transport._driver + self.assertIsInstance(driver, kafka_driver.KafkaDriver) self.assertEqual(self.expected['hostaddrs'], driver.pconn.hostaddrs) self.assertEqual(self.expected['username'], driver.pconn.username) self.assertEqual(self.expected['password'], driver.pconn.password) @@ -101,14 +100,20 @@ class TestKafkaDriver(test_utils.BaseTestCase): def test_send_notification(self): target = oslo_messaging.Target(topic="topic_test") - with mock.patch("kafka.KafkaProducer") as fake_producer_class: - fake_producer = fake_producer_class.return_value - fake_producer.send.side_effect = kafka.errors.NoBrokersAvailable - self.assertRaises(kafka.errors.NoBrokersAvailable, - self.driver.send_notification, - target, {}, {"payload": ["test_1"]}, - None, retry=3) - self.assertEqual(3, fake_producer.send.call_count) + with mock.patch("confluent_kafka.Producer") as producer: + self.driver.send_notification( + target, {}, {"payload": ["test_1"]}, + None, retry=3) + producer.assert_called_once_with({ + 'bootstrap.servers': '', + 'linger.ms': mock.ANY, + 'batch.num.messages': mock.ANY, + 'security.protocol': 'PLAINTEXT', + 'sasl.mechanism': 'PLAIN', + 'sasl.username': mock.ANY, + 'sasl.password': mock.ANY, + 'ssl.ca.location': '' + }) def test_listen(self): target = oslo_messaging.Target(topic="topic_test") @@ -119,23 +124,22 @@ class TestKafkaDriver(test_utils.BaseTestCase): targets_and_priorities = [ (oslo_messaging.Target(topic="topic_test_1"), "sample"), ] - expected_topics = ["topic_test_1.sample"] - with mock.patch("kafka.KafkaConsumer") as consumer: + with mock.patch("confluent_kafka.Consumer") as consumer: self.driver.listen_for_notifications( targets_and_priorities, "kafka_test", 1000, 10) - consumer.assert_called_once_with( - *expected_topics, group_id="kafka_test", - enable_auto_commit=mock.ANY, - bootstrap_servers=[], - max_partition_fetch_bytes=mock.ANY, - max_poll_records=mock.ANY, - security_protocol='PLAINTEXT', - sasl_mechanism='PLAIN', - sasl_plain_username=mock.ANY, - sasl_plain_password=mock.ANY, - ssl_cafile='', - selector=mock.ANY - ) + consumer.assert_called_once_with({ + 'bootstrap.servers': '', + 'enable.partition.eof': False, + 'group.id': 'kafka_test', + 'enable.auto.commit': mock.ANY, + 
'max.partition.fetch.bytes': mock.ANY, + 'security.protocol': 'PLAINTEXT', + 'sasl.mechanism': 'PLAIN', + 'sasl.username': mock.ANY, + 'sasl.password': mock.ANY, + 'ssl.ca.location': '', + 'default.topic.config': {'auto.offset.reset': 'latest'} + }) def test_cleanup(self): listeners = [mock.MagicMock(), mock.MagicMock()] @@ -155,10 +159,9 @@ class TestKafkaConnection(test_utils.BaseTestCase): def test_notify(self): - with mock.patch("kafka.KafkaProducer") as fake_producer_class: - fake_producer = fake_producer_class.return_value + with mock.patch("confluent_kafka.Producer") as producer: self.driver.pconn.notify_send("fake_topic", {"fake_ctxt": "fake_param"}, {"fake_text": "fake_message_1"}, 10) - self.assertEqual(2, len(fake_producer.send.mock_calls)) + assert producer.call_count == 1 diff --git a/oslo_messaging/tests/functional/test_functional.py b/oslo_messaging/tests/functional/test_functional.py index 6e1357702..0ce17ed55 100644 --- a/oslo_messaging/tests/functional/test_functional.py +++ b/oslo_messaging/tests/functional/test_functional.py @@ -328,8 +328,13 @@ class NotifyTestCase(utils.SkipIfNoTransportURL): # NOTE(sileht): Each test must not use the same topics # to be run in parallel + # NOTE(ansmith): kafka partition assignment delay requires + # longer timeouts for test completion + def test_simple(self): + get_timeout = 1 if self.url.startswith("kafka://"): + get_timeout = 5 self.conf.set_override('consumer_group', 'test_simple', group='oslo_messaging_kafka') @@ -338,14 +343,16 @@ class NotifyTestCase(utils.SkipIfNoTransportURL): notifier = listener.notifier('abc') notifier.info({}, 'test', 'Hello World!') - event = listener.events.get(timeout=1) + event = listener.events.get(timeout=get_timeout) self.assertEqual('info', event[0]) self.assertEqual('test', event[1]) self.assertEqual('Hello World!', event[2]) self.assertEqual('abc', event[3]) def test_multiple_topics(self): + get_timeout = 1 if self.url.startswith("kafka://"): + get_timeout = 5 self.conf.set_override('consumer_group', 'test_multiple_topics', group='oslo_messaging_kafka') @@ -363,7 +370,7 @@ class NotifyTestCase(utils.SkipIfNoTransportURL): received = {} while len(received) < len(sent): - e = listener.events.get(timeout=1) + e = listener.events.get(timeout=get_timeout) received[e[3]] = e for key in received: @@ -374,10 +381,15 @@ class NotifyTestCase(utils.SkipIfNoTransportURL): self.assertEqual(expected[2], actual[2]) def test_multiple_servers(self): + timeout = 0.5 if self.url.startswith("amqp:"): self.skipTest("QPID-6307") - if self.url.startswith("kafka"): - self.skipTest("Kafka: Need to be fixed") + if self.url.startswith("kafka://"): + self.skipTest("Kafka: needs to be fixed") + timeout = 5 + self.conf.set_override('consumer_group', + 'test_multiple_servers', + group='oslo_messaging_kafka') listener_a = self.useFixture( utils.NotificationFixture(self.conf, self.url, ['test-topic'])) @@ -391,15 +403,17 @@ class NotifyTestCase(utils.SkipIfNoTransportURL): for event_type, payload in events_out: n.info({}, event_type, payload) - events_in = [[(e[1], e[2]) for e in listener_a.get_events()], - [(e[1], e[2]) for e in listener_b.get_events()]] + events_in = [[(e[1], e[2]) for e in listener_a.get_events(timeout)], + [(e[1], e[2]) for e in listener_b.get_events(timeout)]] self.assertThat(events_in, utils.IsValidDistributionOf(events_out)) for stream in events_in: self.assertThat(len(stream), matchers.GreaterThan(0)) def test_independent_topics(self): + get_timeout = 0.5 if self.url.startswith("kafka://"): + get_timeout 
= 5 self.conf.set_override('consumer_group', 'test_independent_topics_a', group='oslo_messaging_kafka') @@ -425,7 +439,7 @@ class NotifyTestCase(utils.SkipIfNoTransportURL): b.info({}, event_type, payload) def check_received(listener, publisher, messages): - actuals = sorted([listener.events.get(timeout=0.5) + actuals = sorted([listener.events.get(timeout=get_timeout) for __ in range(len(a_out))]) expected = sorted([['info', m[0], m[1], publisher] for m in messages]) @@ -435,7 +449,9 @@ class NotifyTestCase(utils.SkipIfNoTransportURL): check_received(listener_b, "pub-2", b_out) def test_all_categories(self): + get_timeout = 1 if self.url.startswith("kafka://"): + get_timeout = 5 self.conf.set_override('consumer_group', 'test_all_categories', group='oslo_messaging_kafka') @@ -451,7 +467,7 @@ class NotifyTestCase(utils.SkipIfNoTransportURL): # order between events with different categories is not guaranteed received = {} for expected in events: - e = listener.events.get(timeout=1) + e = listener.events.get(timeout=get_timeout) received[e[0]] = e for expected in events: @@ -461,6 +477,8 @@ class NotifyTestCase(utils.SkipIfNoTransportURL): self.assertEqual(expected[3], actual[2]) def test_simple_batch(self): + get_timeout = 3 + batch_timeout = 2 if self.url.startswith("amqp:"): backend = os.environ.get("AMQP1_BACKEND") if backend == "qdrouterd": @@ -468,18 +486,21 @@ class NotifyTestCase(utils.SkipIfNoTransportURL): # sender pends until batch_size or timeout reached self.skipTest("qdrouterd backend") if self.url.startswith("kafka://"): + get_timeout = 10 + batch_timeout = 5 self.conf.set_override('consumer_group', 'test_simple_batch', group='oslo_messaging_kafka') listener = self.useFixture( utils.BatchNotificationFixture(self.conf, self.url, ['test_simple_batch'], - batch_size=100, batch_timeout=2)) + batch_size=100, + batch_timeout=batch_timeout)) notifier = listener.notifier('abc') for i in six.moves.range(0, 205): notifier.info({}, 'test%s' % i, 'Hello World!') - events = listener.get_events(timeout=3) + events = listener.get_events(timeout=get_timeout) self.assertEqual(3, len(events)) self.assertEqual(100, len(events[0][1])) self.assertEqual(100, len(events[1][1])) diff --git a/oslo_messaging/tests/functional/utils.py b/oslo_messaging/tests/functional/utils.py index fcebba8fd..4d403a07b 100644 --- a/oslo_messaging/tests/functional/utils.py +++ b/oslo_messaging/tests/functional/utils.py @@ -313,9 +313,6 @@ class SkipIfNoTransportURL(test_utils.BaseTestCase): kafka_options.register_opts(conf, transport_url) - self.config(producer_batch_size=0, - group='oslo_messaging_kafka') - class NotificationFixture(fixtures.Fixture): def __init__(self, conf, url, topics, batch=None): diff --git a/releasenotes/notes/kafka-client-library-change-fe16d5a34550db7f.yaml b/releasenotes/notes/kafka-client-library-change-fe16d5a34550db7f.yaml new file mode 100644 index 000000000..ed2fcae1f --- /dev/null +++ b/releasenotes/notes/kafka-client-library-change-fe16d5a34550db7f.yaml @@ -0,0 +1,13 @@ +--- +fixes: + - | + Threading issues with the kafka-python consumer client were identified + and documented. The driver has been updated to integrate the + confluent-kafka python library. The confluent-kafka client + leverages the high performance librdkafka C client and is safe + for multiple thread use. +upgrade: + - | + With the change in the client library used, projects using the + Kafka driver should use extras oslo.messaging[kafka] to pull in + dependencies for the driver. 
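[Illustrative note, not part of the patch] The consumer side of this change hinges on one pattern: librdkafka calls block the calling OS thread, so _poll_messages() is dispatched through eventlet.tpool whenever the service is monkey patched. The standalone sketch below shows that pattern with the confluent-kafka client; the broker address, group id and topic name are assumed placeholders, not values taken from this patch.

    # Illustrative sketch only; broker, group and topic are assumptions.
    import confluent_kafka
    from oslo_utils import eventletutils
    from oslo_utils import importutils

    tpool = importutils.try_import('eventlet.tpool')

    consumer = confluent_kafka.Consumer({
        'bootstrap.servers': 'localhost:9092',   # assumed local broker
        'group.id': 'example_group',
        'enable.auto.commit': True,
        'default.topic.config': {'auto.offset.reset': 'latest'},
    })
    consumer.subscribe(['notifications.info'])

    def poll_messages(timeout):
        # consume() blocks inside librdkafka; it returns up to 10
        # messages or an empty list when the timeout expires.
        records = consumer.consume(10, timeout)
        return [m.value() for m in records if m.error() is None]

    if eventletutils.is_monkey_patched('thread') and tpool:
        # Run the blocking call in a native thread so the eventlet hub
        # keeps servicing other greenthreads in the meantime.
        batch = tpool.execute(poll_messages, 1.0)
    else:
        batch = poll_messages(1.0)

    print('received %d message(s)' % len(batch))
    consumer.close()

tpool.execute() runs the blocking consume() in a real thread pool, which is what lets the driver stay responsive under eventlet without the selector workarounds the old kafka-python code needed.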
diff --git a/setup-test-env-kafka.sh b/setup-test-env-kafka.sh index 8d58cdbff..98f1d9154 100755 --- a/setup-test-env-kafka.sh +++ b/setup-test-env-kafka.sh @@ -4,7 +4,7 @@ set -e . tools/functions.sh SCALA_VERSION=${SCALA_VERSION:-"2.12"} -KAFKA_VERSION=${KAFKA_VERSION:-"1.1.0"} +KAFKA_VERSION=${KAFKA_VERSION:-"2.0.0"} if [[ -z "$(which kafka-server-start)" ]] && [[ -z $(which kafka-server-start.sh) ]]; then DATADIR=$(mktemp -d /tmp/OSLOMSG-KAFKA.XXXXX) diff --git a/setup.cfg b/setup.cfg index 3c03ffacf..c43107264 100644 --- a/setup.cfg +++ b/setup.cfg @@ -25,8 +25,7 @@ classifier = amqp1 = pyngus>=2.2.0 # Apache-2.0 kafka = - kafka-python>=1.3.1 # Apache-2.0 - tenacity>=4.4.0 # Apache-2.0 + confluent-kafka>=0.11.6 # Apache-2.0 [files] packages = diff --git a/test-requirements.txt b/test-requirements.txt index 22636cb65..25b019be2 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -14,8 +14,7 @@ oslotest>=3.2.0 # Apache-2.0 pifpaf>=0.10.0 # Apache-2.0 # for test_impl_kafka -tenacity>=4.4.0 # Apache-2.0 -kafka-python>=1.3.1 # Apache-2.0 +confluent-kafka>=0.11.6 # Apache-2.0 # when we can require tox>= 1.4, this can go into tox.ini: # [testenv:cover]
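[Illustrative note, not part of the patch] On the producer side, confluent-kafka's produce() only enqueues the message in librdkafka's local buffer: poll() services delivery callbacks, a full buffer surfaces as BufferError, and flush() blocks until the queue drains. That is why _produce_message() above retries after poll(0.5) and _close_producer() flushes before dropping the producer. A minimal sketch of that produce/poll/flush cycle, assuming a local broker and an example topic:

    # Illustrative sketch only; broker and topic names are assumptions.
    import confluent_kafka
    from confluent_kafka import KafkaException

    producer = confluent_kafka.Producer({'bootstrap.servers': 'localhost:9092'})

    def send(topic, payload):
        while True:
            try:
                # Asynchronous: the message is only queued locally here.
                producer.produce(topic, payload)
            except BufferError:
                # Local queue is full: let librdkafka deliver for a
                # while, then retry the same message.
                producer.poll(0.5)
                continue
            except KafkaException as exc:
                print('produce failed: %s' % exc)
            break
        # Serve delivery reports without blocking.
        producer.poll(0)

    send('notifications.info', b'{"event_type": "test"}')
    # Block until everything queued has been delivered (or failed).
    producer.flush()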