From 4b123f43270cdb7167c1c9ccfd1049799adec05b Mon Sep 17 00:00:00 2001 From: Ben Motz Date: Thu, 1 Oct 2015 16:24:22 +0100 Subject: [PATCH] Do not spin when more processors than partitions If monasca-notification is configured with more notification processors than there are partitions in the corresponding topic then one or more of the processor instances will not be allocated any partitions. At present this results in the while loop spinning and wasting CPU cycles. Instead, this change will block until the SetPartitioner indicates that the state has changed, at which point the assigned partitions will be re-evaluated. Change-Id: I55e8b1b52b704618cb35593f7bdfc1415d8b584d --- monasca_notification/processors/kafka_consumer.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/monasca_notification/processors/kafka_consumer.py b/monasca_notification/processors/kafka_consumer.py index 7c51c6f..784ad18 100644 --- a/monasca_notification/processors/kafka_consumer.py +++ b/monasca_notification/processors/kafka_consumer.py @@ -18,6 +18,7 @@ import kafka.common import kafka.consumer import logging import monascastatsd +import threading from kazoo.client import KazooClient from kazoo.recipe.partitioner import SetPartitioner @@ -68,10 +69,13 @@ class KafkaConsumer(BaseProcessor): kazoo_client = KazooClient(hosts=self._zookeeper_url) kazoo_client.start() + state_change_event = threading.Event() + set_partitioner = ( SetPartitioner(kazoo_client, path=self._zookeeper_path, - set=self._consumer.fetch_offsets.keys())) + set=self._consumer.fetch_offsets.keys(), + state_change_event=state_change_event)) consumed_from_kafka = self._statsd.get_counter(name='consumed_from_kafka') @@ -94,6 +98,14 @@ class KafkaConsumer(BaseProcessor): if not partitions: partitions = [p for p in set_partitioner] + if not partitions: + log.info("Not assigned any partitions on topic {}," + " waiting for a Partitioner state change" + .format(self._kafka_topic)) + state_change_event.wait() + state_change_event.clear() + continue + log.info("Acquired locks on partition set {} " "for topic {}".format(partitions, self._kafka_topic))