From 5aaf3df47ad94d3ce007add77c1849aaa24e1841 Mon Sep 17 00:00:00 2001
From: Joe Keen
Date: Mon, 21 Mar 2016 16:00:03 -0600
Subject: [PATCH] Partition rebalance broken in kafka-python 0.9.5

Our partition rebalance mechanism broke on the upgrade from kafka-python
0.9.2 to 0.9.5. Rather than fiddling with the internals of the kafka
consumer object, we're now reconstructing the consumer object after each
rebalance and handing it the specific partitions it needs to worry about.

Closes-bug: #1560178
Change-Id: I469ceb28538db1f36918f211eaea4fcfdaa17649
---
 monasca_common/kafka/consumer.py | 49 ++++++++++++++++----------------
 1 file changed, 24 insertions(+), 25 deletions(-)

diff --git a/monasca_common/kafka/consumer.py b/monasca_common/kafka/consumer.py
index 01a013a1..208732da 100644
--- a/monasca_common/kafka/consumer.py
+++ b/monasca_common/kafka/consumer.py
@@ -81,25 +81,33 @@ class KafkaConsumer(object):
 
         self._partitions = []
 
+        self._kafka_group = group
         self._kafka_topic = topic
+        self._kafka_fetch_size = fetch_size
 
         self._zookeeper_url = zookeeper_url
         self._zookeeper_path = zookeeper_path
 
         self._kafka = kafka.client.KafkaClient(kafka_url)
 
-        # No auto-commit so that commits only happen after the message is processed.
-        self._consumer = kafka.consumer.SimpleConsumer(self._kafka,
-                                                       group,
-                                                       self._kafka_topic,
-                                                       auto_commit=False,
-                                                       iter_timeout=5,
-                                                       fetch_size_bytes=fetch_size,
-                                                       buffer_size=fetch_size,
-                                                       max_buffer_size=None)
+        self._consumer = self._create_kafka_consumer()
 
-        self._consumer.provide_partition_info()
-        self._consumer.fetch_last_known_offsets()
+    def _create_kafka_consumer(self, partitions=None):
+        # No auto-commit so that commits only happen after the message is processed.
+        consumer = kafka.consumer.SimpleConsumer(
+            self._kafka,
+            self._kafka_group,
+            self._kafka_topic,
+            auto_commit=False,
+            partitions=partitions,
+            iter_timeout=5,
+            fetch_size_bytes=self._kafka_fetch_size,
+            buffer_size=self._kafka_fetch_size,
+            max_buffer_size=None)
+
+        consumer.provide_partition_info()
+        consumer.fetch_last_known_offsets()
+        return consumer
 
     def __iter__(self):
         self._partition()
@@ -194,20 +202,11 @@ class KafkaConsumer(object):
                     log.info("Acquired locks on partition set {} "
                              "for topic {}".format(self._partitions, self._kafka_topic))
-                    # Refresh the last known offsets again to make sure
-                    # that they are the latest after having acquired the
-                    # lock. Updates self._consumer.fetch_offsets.
-                    self._consumer.fetch_last_known_offsets()
-
-                    # Modify self._consumer.fetch_offsets to hold only the
-                    # offsets for the set of Kafka partitions acquired
-                    # by this instance.
-                    partitioned_fetch_offsets = {}
-                    for p in self._partitions:
-                        partitioned_fetch_offsets[p] = (
-                            self._consumer.fetch_offsets[p])
-
-                    self._consumer.fetch_offsets = partitioned_fetch_offsets
+                    # Reconstruct the kafka consumer object because the
+                    # consumer has no API that allows the set of partitions
+                    # to be updated outside of construction.
+                    self._consumer.stop()
+                    self._consumer = self._create_kafka_consumer(self._partitions)
 
                     return
 
                 elif self._set_partitioner.allocating:
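
Reviewer note (not part of the patch): the sketch below illustrates the
stop-and-recreate pattern the commit message describes, using only the
kafka-python 0.9.x SimpleConsumer calls that appear in the diff above
(SimpleConsumer, provide_partition_info, fetch_last_known_offsets, stop).
The helper name, broker address, group/topic names, fetch size, and the
hard-coded partition list are illustrative assumptions, not values taken
from monasca-common.

    import kafka.client
    import kafka.consumer


    def build_consumer(client, group, topic, fetch_size, partitions=None):
        # No auto-commit, mirroring the patch, so offsets are only
        # committed after a message has been processed. partitions=None
        # means the consumer reads every partition of the topic.
        consumer = kafka.consumer.SimpleConsumer(client,
                                                 group,
                                                 topic,
                                                 auto_commit=False,
                                                 partitions=partitions,
                                                 iter_timeout=5,
                                                 fetch_size_bytes=fetch_size,
                                                 buffer_size=fetch_size,
                                                 max_buffer_size=None)
        consumer.provide_partition_info()
        consumer.fetch_last_known_offsets()
        return consumer


    # Initial consumer over all partitions (hypothetical broker/topic names).
    client = kafka.client.KafkaClient('localhost:9092')
    consumer = build_consumer(client, 'example_group', 'example_topic', 1048576)

    # After a rebalance assigns this instance a subset of partitions
    # (assumed here to be [0, 1]), discard the old consumer and build a
    # new one scoped to just those partitions instead of editing its
    # internal fetch_offsets dict.
    acquired_partitions = [0, 1]
    consumer.stop()
    consumer = build_consumer(client, 'example_group', 'example_topic',
                              1048576, partitions=acquired_partitions)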