Partition rebalance broken in kafka-python 0.9.5

Our partition rebalance mechanism broke on the upgrade from kafka-python 0.9.2
to 0.9.5.  Rather than fiddling with the internals of the kafka consumer
object, we now reconstruct the consumer object after each rebalance and hand
it the specific partitions it needs to worry about.

Closes-bug: #1560178
Change-Id: I469ceb28538db1f36918f211eaea4fcfdaa17649
Joe Keen 2016-03-21 16:00:03 -06:00
parent 8ed12871f8
commit 5aaf3df47a
1 changed file with 24 additions and 25 deletions
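As an illustration of the approach described in the commit message, here is a
minimal standalone sketch of the rebuild-on-rebalance pattern. It assumes the
kafka-python 0.9.x SimpleConsumer API already used in this module; the broker
address, group, topic, partition ids, and the build_consumer() helper are
placeholders, not values from this change.

import kafka.client
import kafka.consumer

kafka_client = kafka.client.KafkaClient('localhost:9092')  # placeholder broker


def build_consumer(partitions=None):
    # No auto-commit so that commits only happen after a message is processed.
    consumer = kafka.consumer.SimpleConsumer(kafka_client,
                                             'example-group',
                                             'example-topic',
                                             auto_commit=False,
                                             partitions=partitions,
                                             iter_timeout=5)
    consumer.provide_partition_info()
    consumer.fetch_last_known_offsets()
    return consumer


# Initial consumer over whatever partitions the group assigns.
consumer = build_consumer()

# After a rebalance, discard the old consumer and build a new one scoped to
# the partitions this instance now owns, instead of rewriting its
# fetch_offsets dict in place.
acquired_partitions = [0, 3, 5]  # placeholder partition ids
consumer.stop()
consumer = build_consumer(acquired_partitions)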

@@ -81,25 +81,33 @@ class KafkaConsumer(object):
         self._partitions = []
 
+        self._kafka_group = group
         self._kafka_topic = topic
+        self._kafka_fetch_size = fetch_size
 
         self._zookeeper_url = zookeeper_url
         self._zookeeper_path = zookeeper_path
 
         self._kafka = kafka.client.KafkaClient(kafka_url)
 
-        # No auto-commit so that commits only happen after the message is processed.
-        self._consumer = kafka.consumer.SimpleConsumer(self._kafka,
-                                                       group,
-                                                       self._kafka_topic,
-                                                       auto_commit=False,
-                                                       iter_timeout=5,
-                                                       fetch_size_bytes=fetch_size,
-                                                       buffer_size=fetch_size,
-                                                       max_buffer_size=None)
+        self._consumer = self._create_kafka_consumer()
 
-        self._consumer.provide_partition_info()
-        self._consumer.fetch_last_known_offsets()
+    def _create_kafka_consumer(self, partitions=None):
+        # No auto-commit so that commits only happen after the message is processed.
+        consumer = kafka.consumer.SimpleConsumer(
+            self._kafka,
+            self._kafka_group,
+            self._kafka_topic,
+            auto_commit=False,
+            partitions=partitions,
+            iter_timeout=5,
+            fetch_size_bytes=self._kafka_fetch_size,
+            buffer_size=self._kafka_fetch_size,
+            max_buffer_size=None)
+
+        consumer.provide_partition_info()
+        consumer.fetch_last_known_offsets()
+        return consumer
 
     def __iter__(self):
         self._partition()
 
@@ -194,20 +202,11 @@ class KafkaConsumer(object):
                 log.info("Acquired locks on partition set {} "
                          "for topic {}".format(self._partitions, self._kafka_topic))
 
-                # Refresh the last known offsets again to make sure
-                # that they are the latest after having acquired the
-                # lock. Updates self._consumer.fetch_offsets.
-                self._consumer.fetch_last_known_offsets()
-
-                # Modify self._consumer.fetch_offsets to hold only the
-                # offsets for the set of Kafka partitions acquired
-                # by this instance.
-                partitioned_fetch_offsets = {}
-                for p in self._partitions:
-                    partitioned_fetch_offsets[p] = (
-                        self._consumer.fetch_offsets[p])
-
-                self._consumer.fetch_offsets = partitioned_fetch_offsets
+                # Reconstruct the kafka consumer object because the
+                # consumer has no API that allows the set of partitions
+                # to be updated outside of construction.
+                self._consumer.stop()
+                self._consumer = self._create_kafka_consumer(self._partitions)
                 return
 
             elif self._set_partitioner.allocating:
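The _set_partitioner states checked in this method (failed, release, acquired,
allocating) match kazoo's SetPartitioner recipe. Assuming that recipe, the
surrounding rebalance loop looks roughly like the sketch below; it is an
illustrative reconstruction, not code from this change, and the ZooKeeper
address, partition set, identifier, and build_consumer() helper (standing in
for _create_kafka_consumer()) are assumptions.

import kafka.client
import kafka.consumer
import kazoo.client

zk = kazoo.client.KazooClient(hosts='localhost:2181')  # placeholder ZooKeeper
zk.start()

kafka_client = kafka.client.KafkaClient('localhost:9092')  # placeholder broker


def build_consumer(partitions):
    # Same shape as _create_kafka_consumer() in the diff above: a
    # SimpleConsumer scoped to an explicit partition list, no auto-commit.
    consumer = kafka.consumer.SimpleConsumer(kafka_client,
                                             'example-group',
                                             'example-topic',
                                             auto_commit=False,
                                             partitions=partitions,
                                             iter_timeout=5)
    consumer.provide_partition_info()
    consumer.fetch_last_known_offsets()
    return consumer


# Divide the topic's partition ids among all workers registered under the
# same ZooKeeper path.
set_partitioner = zk.SetPartitioner(path='/example/partitions',
                                    set=list(range(8)),
                                    identifier='worker-1')
consumer = None

while True:
    if set_partitioner.failed:
        raise RuntimeError('failed to acquire a partition set')
    elif set_partitioner.release:
        # Group membership changed; release our locks so the set can be
        # re-divided among the current workers.
        set_partitioner.release_set()
    elif set_partitioner.acquired:
        acquired = sorted(set_partitioner)  # partitions this worker now owns
        if consumer is not None:
            consumer.stop()
        # Rebuild rather than mutate: hand the new consumer exactly the
        # acquired partitions.
        consumer = build_consumer(acquired)
        break
    elif set_partitioner.allocating:
        set_partitioner.wait_for_acquire()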