diff --git a/monasca_notification/processors/kafka_consumer.py b/monasca_notification/processors/kafka_consumer.py index 3589af0..19f73f6 100644 --- a/monasca_notification/processors/kafka_consumer.py +++ b/monasca_notification/processors/kafka_consumer.py @@ -18,6 +18,7 @@ import kafka.common import kafka.consumer import logging import monascastatsd +import threading from kazoo.client import KazooClient from kazoo.recipe.partitioner import SetPartitioner @@ -69,10 +70,13 @@ class KafkaConsumer(BaseProcessor): kazoo_client = KazooClient(hosts=self._zookeeper_url) kazoo_client.start() + state_change_event = threading.Event() + set_partitioner = ( SetPartitioner(kazoo_client, path=self._zookeeper_path, - set=self._consumer.fetch_offsets.keys())) + set=self._consumer.fetch_offsets.keys(), + state_change_event=state_change_event)) consumed_from_kafka = self._statsd.get_counter(name='consumed_from_kafka') @@ -95,6 +99,14 @@ class KafkaConsumer(BaseProcessor): if not partitions: partitions = [p for p in set_partitioner] + if not partitions: + log.info("Not assigned any partitions on topic {}," + " waiting for a Partitioner state change" + .format(self._kafka_topic)) + state_change_event.wait() + state_change_event.clear() + continue + log.info("Acquired locks on partition set {} " "for topic {}".format(partitions, self._kafka_topic))