From 3d1deca13930d97fcedcdd57ed8f9d50566b1df6 Mon Sep 17 00:00:00 2001
From: Tim Kuhlman
Date: Mon, 18 Aug 2014 15:54:59 -0600
Subject: [PATCH] Move from using zookeeper to track partition offsets to kafka

With kafka 0.8.1 it is now possible for non-java clients to track offsets
with kafka. Previously the code had built this using zookeeper. This relies
on kafka-python >= 0.9.1.

Change-Id: Ia42e713cc5d9ca61d8f8df2adc454f1e2579a229
---
 README.md                             |  19 ++-
 monasca_notification/main.py          |  10 +-
 .../processors/kafka_consumer.py      |  60 ++++++--
 monasca_notification/state_tracker.py | 139 +++++++++---------
 notification.yaml                     |  12 +-
 requirements.txt                      |   2 +-
 tests/test_state_tracker.py           |  28 +++-
 tools/monasca_notification_offsets.py |  12 +-
 8 files changed, 168 insertions(+), 114 deletions(-)

diff --git a/README.md b/README.md
index edf9322..c9a1eb0 100644
--- a/README.md
+++ b/README.md
@@ -10,11 +10,10 @@ There are four processing steps separated by queues implemented with python mult
 3. Send Notification. - NotificationProcessor class
 4. Add sent notifications to Kafka on the notification topic. - SentNotificationProcessor class
 
-There is also a special processing step, the ZookeeperStateTracker, that runs in the main thread and keeps track of the
+There is also a special processing step, the KafkaStateTracker, that runs in the main thread and keeps track of the
 last committed message and ones available for commit, it then periodically commits all progress. This handles the
 situation where alarms that are not acted on are quickly ready for commit but others which are prior to them in the
-kafka order are still in progress. Locking is also handled by this class, so all zookeeper functionality is tracked in
-this class.
+kafka order are still in progress. Locking is also handled by this class using zookeeper.
 
 There are 4 internal queues:
 
@@ -30,7 +29,7 @@ over and continue from where it left. A zookeeper lock file is used to ensure on
 the code can be modified to use kafka partitions to have multiple active engines working on different alarms.
 
 ## Fault Tolerance
-When reading from the alarm topic no committing is done. The committing is done in sent_notification processor. This allows
+When reading from the alarm topic no committing is done. The committing is done only after processing. This allows
 the processing to continue even though some notifications can be slow. In the event of a catastrophic failure some
 notifications could be sent but the alarms not yet acknowledged. This is an acceptable failure mode, better to send a
 notification twice than not at all.
@@ -76,12 +75,12 @@ are gathered per thread, the thread number is indicated by a -# at the end of th
 
 # Future Considerations
 - Currently I lock the topic rather than the partitions. This effectively means there is only one active notification
   engine at any given time. In the future to share the load among multiple daemons we could lock by partition.
-- The ZookeeperStateTracker is a likely place to end up as a bottleneck on high throughput. Detailed investigation of
-  its speed should be done.
-- How fast is the mysql db? How much load do we put on it. Initially I think it makes most sense to read notification
-  details for each alarm but eventually I may want to cache that info.
-- I am starting with a single KafkaConsumer and a single SentNotificationProcessor depending on load this may need
-  to scale.
+- More extensive load testing is needed:
+  - How fast is the mysql db? How much load do we put on it? Initially I think it makes most sense to read notification
+    details for each alarm but eventually I may want to cache that info.
+  - I am starting with a single KafkaConsumer and a single SentNotificationProcessor; depending on load this may need
+    to scale.
+  - How fast is the state tracker? Do I need to scale or speed that up at all?
 
 # License
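The fault tolerance described in the README is the standard at-least-once pattern: consume with auto-commit disabled, act on the message, and only then record the offset. A minimal sketch of that loop using the same kafka-python 0.9.x calls this patch relies on; the broker address, topic/group names, and the process() helper are illustrative, not part of the patch:

```python
import kafka.client
import kafka.common
import kafka.consumer


def process(offset_and_message):
    """Stand-in for the real pipeline (alarm parsing, notification, etc.)."""
    print(offset_and_message.message.value)


client = kafka.client.KafkaClient('localhost:9092')  # illustrative broker
consumer = kafka.consumer.SimpleConsumer(client, 'monasca-notification',
                                         'alarm-state-transitions', auto_commit=False)
consumer.provide_partition_info()  # makes iteration yield (partition, message)

for partition, message in consumer:
    process(message)  # a crash here re-delivers the message on restart
    # Only after processing is the offset recorded, giving at-least-once delivery.
    req = kafka.common.OffsetCommitRequest('alarm-state-transitions', partition,
                                           message.offset, None)
    kafka.common.check_error(
        client.send_offset_commit_request('monasca-notification', [req])[0])
```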
diff --git a/monasca_notification/main.py b/monasca_notification/main.py
index c524521..c606af5 100644
--- a/monasca_notification/main.py
+++ b/monasca_notification/main.py
@@ -23,7 +23,7 @@ import logging.config
 import multiprocessing
 import os
 import signal
-from state_tracker import ZookeeperStateTracker
+from state_tracker import KafkaStateTracker
 import sys
 import threading
 import time
@@ -108,8 +108,9 @@ def main(argv=None):
 
     # State Tracker - Used for tracking the progress of fully processed alarms and the zookeeper lock
     global tracker  # Set to global for use in the cleanup function
-    tracker = ZookeeperStateTracker(
-        config['zookeeper']['url'], config['kafka']['alarm_topic'], finished, config['zookeeper']['max_offset_lag'])
+    tracker = KafkaStateTracker(finished, config['kafka']['url'], config['kafka']['group'],
+                                config['kafka']['alarm_topic'], config['kafka']['max_offset_lag'],
+                                config['zookeeper']['url'])
     tracker.lock(clean_exit)  # Only begin if we have the processing lock
 
     tracker_thread = threading.Thread(target=tracker.run)
@@ -120,8 +121,7 @@ def main(argv=None):
             alarms,
             config['kafka']['url'],
             config['kafka']['group'],
-            config['kafka']['alarm_topic'],
-            tracker.offsets
+            config['kafka']['alarm_topic']
         ).run
     )
     processors.append(kafka)
diff --git a/monasca_notification/processors/kafka_consumer.py b/monasca_notification/processors/kafka_consumer.py
index 653007b..6319ece 100644
--- a/monasca_notification/processors/kafka_consumer.py
+++ b/monasca_notification/processors/kafka_consumer.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 import kafka.client
+import kafka.common
 import kafka.consumer
 import logging
 import statsd
@@ -31,7 +32,7 @@ class KafkaConsumer(BaseProcessor):
     Unfortunately at this point the python-kafka client does not handle multiple consumers seamlessly.
     For more information see, https://github.com/mumrah/kafka-python/issues/112
     """
-    def __init__(self, queue, kafka_url, group, topic, initial_offsets=None):
+    def __init__(self, queue, kafka_url, group, topic):
         """Init
             kafka_url, group, topic - kafka connection details
             sent_queue - a sent_queue to publish log entries to
@@ -39,21 +40,56 @@ class KafkaConsumer(BaseProcessor):
         self.queue = queue
 
         self.kafka = kafka.client.KafkaClient(kafka_url)
-        # No autocommit, it does not work with kafka 0.8.0 - see https://github.com/mumrah/kafka-python/issues/118
+        # No auto-commit so that commits only happen after the alarm is processed.
         self.consumer = kafka.consumer.SimpleConsumer(self.kafka, group, topic, auto_commit=False)
         self.consumer.provide_partition_info()  # Without this the partition is not provided in the response
-        if initial_offsets is not None:
-            # Set initial offsets directly in the consumer, there is no method for this so I have to do it here
-            self.consumer.offsets.update(initial_offsets)
-            # fetch offsets are +1 of standard offsets
-            for partition in initial_offsets:
-                self.consumer.fetch_offsets[partition] = initial_offsets[partition] + 1
+
+        self._initialize_offsets(group, topic)
+        # After my pull request is merged I can remove _initialize_offsets and use
+        # self.consumer.offsets = self.consumer.get_offsets()
+        # self.consumer.fetch_offsets = self.consumer.offsets.copy()
+        offsets = self.consumer.offsets.copy()
+        self.consumer.seek(0, 0)
+        if offsets != self.consumer.offsets:
+            log.error('Some messages not yet processed are no longer available in kafka, skipping to first available')
+            log.debug('Initialized offsets %s\nStarting offsets %s' % (offsets, self.consumer.offsets))
+
+    def _initialize_offsets(self, group, topic):
+        """Fetch initial offsets from kafka
+        This is largely taken from what the kafka consumer itself does when auto_commit is used
+        """
+        def get_or_init_offset_callback(resp):
+            try:
+                kafka.common.check_error(resp)
+                return resp.offset
+            except kafka.common.UnknownTopicOrPartitionError:
+                return 0
+
+        for partition in self.kafka.topic_partitions[topic]:
+            req = kafka.common.OffsetFetchRequest(topic, partition)
+            (offset,) = self.consumer.client.send_offset_fetch_request(group, [req],
+                                                                       callback=get_or_init_offset_callback,
+                                                                       fail_on_error=False)
+
+            # The recorded offset is the last successfully processed; start processing at the next.
+            # If no processing has been done the offset is 0.
+            if offset == 0:
+                self.consumer.offsets[partition] = offset
+            else:
+                self.consumer.offsets[partition] = offset + 1
+
+        # fetch_offsets are used by the SimpleConsumer
+        self.consumer.fetch_offsets = self.consumer.offsets.copy()
 
     def run(self):
         """Consume from kafka and place alarm objects on the sent_queue
         """
         counter = statsd.Counter('ConsumedFromKafka')
 
-        for message in self.consumer:
-            counter += 1
-            log.debug("Consuming message from kafka, partition %d, offset %d" % (message[0], message[1].offset))
-            self._add_to_queue(self.queue, 'alarms', message)
+        try:
+            for message in self.consumer:
+                counter += 1
+                log.debug("Consuming message from kafka, partition %d, offset %d" % (message[0], message[1].offset))
+                self._add_to_queue(self.queue, 'alarms', message)
+        except Exception:
+            log.exception('Error running Kafka Consumer')
+            raise
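Restating the initialization above in isolation: Kafka stores the offset of the last message that was successfully processed, so consumption resumes one past it, and a partition that has never been committed to comes back as an error that is treated as offset 0. A sketch built from the same calls _initialize_offsets uses; the broker address and names are illustrative:

```python
import kafka.client
import kafka.common

client = kafka.client.KafkaClient('localhost:9092')  # illustrative broker
group, topic = 'monasca-notification', 'alarm-state-transitions'


def get_or_init_offset(resp):
    # A partition with no committed offset raises UnknownTopicOrPartitionError;
    # treat that as "start from the beginning".
    try:
        kafka.common.check_error(resp)
        return resp.offset
    except kafka.common.UnknownTopicOrPartitionError:
        return 0


offsets = {}
for partition in client.topic_partitions[topic]:
    req = kafka.common.OffsetFetchRequest(topic, partition)
    (offset,) = client.send_offset_fetch_request(group, [req],
                                                 callback=get_or_init_offset,
                                                 fail_on_error=False)
    # The stored offset is the last one processed, so start at the next one;
    # 0 means nothing has ever been committed for this partition.
    offsets[partition] = offset if offset == 0 else offset + 1
```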
diff --git a/monasca_notification/state_tracker.py b/monasca_notification/state_tracker.py
index 6fe80e4..5b91f17 100644
--- a/monasca_notification/state_tracker.py
+++ b/monasca_notification/state_tracker.py
@@ -14,6 +14,8 @@
 # limitations under the License.
 
 import collections
+import kafka.client
+import kafka.common
 import kazoo.client
 import kazoo.exceptions
 import logging
@@ -26,17 +28,18 @@ from monasca_notification import notification_exceptions
 log = logging.getLogger(__name__)
 
 
-class ZookeeperStateTracker(object):
+class KafkaStateTracker(object):
     """Tracks message offsets for a kafka topic and partitions.
-    Uses zookeeper to keep track of the last committed offset.
     As messages are finished with processing the committed offset is updated periodically.
     The messages are not necessarily finished in order, but the committed offset includes all previous messages so this
     object tracks any gaps updating as needed.
+    Offsets are tracked in kafka; zookeeper is used only for locking.
     """
-    def __init__(self, url, topic, finished_queue, max_lag):
+    def __init__(self, finished_queue, kafka_url, group, topic, max_lag, zookeeper_url):
         """Setup the finished_queue
-            url is the zookeeper hostname:port
-            topic is the kafka topic to track
+            finished_queue - queue containing all processed alarms
+            kafka_url, group, topic - kafka connection details
+            zookeeper_url - the zookeeper hostname:port
         """
         self.finished_queue = finished_queue
         self.max_lag = max_lag
@@ -44,14 +47,20 @@
         self.has_lock = False
         self.stop = False
 
-        self.zookeeper = kazoo.client.KazooClient(url)
+        # The kafka client is used solely for committing offsets; the consuming is done in the kafka_consumer
+        self.kafka_group = group
+        self.kafka = kafka.client.KafkaClient(kafka_url)
+        # The consumer is not needed yet, but after my pull request is merged, using it for get_offsets will simplify this code
+        # import kafka.consumer
+        # self.consumer = kafka.consumer.SimpleConsumer(self.kafka, group, topic, auto_commit=False)
+
+        self.zookeeper = kazoo.client.KazooClient(zookeeper_url)
         self.zookeeper.start()
-        self.topic_path = '/consumers/monasca-notification/%s' % topic
 
         self.lock_retry_time = 15  # number of seconds to wait for retrying for the lock
         self.lock_path = '/locks/monasca-notification/%s' % topic
 
-        self._offsets = None
+        self._offsets = None  # Initialized in the beginning of the run
         # This is a dictionary of sets used for tracking finished offsets when there is a gap and the committed offset
         # can not yet be advanced
         self._uncommitted_offsets = collections.defaultdict(set)
@@ -68,53 +77,6 @@
             self.zookeeper.delete(self.lock_path)
             self.has_lock = False
 
-    def _get_offsets(self):
-        """Read the initial offsets from zookeeper or set defaults
-            The return is a dictionary with key name being partition # and value the offset
-        """
-        if not self.has_lock:
-            log.warn('Reading offsets before the tracker has the lock, they could change')
-        try:
-            if self.zookeeper.exists(self.topic_path):
-                offsets = collections.defaultdict(int)
-                for child in self.zookeeper.get_children(self.topic_path):
-                    offsets[int(child)] = int(self.zookeeper.get('/'.join((self.topic_path, child)))[0])
-                log.info('Setting initial offsets to %s' % str(offsets))
-                return offsets
-            else:
-                self.zookeeper.ensure_path(self.topic_path)
-                return {}
-        except kazoo.exceptions.KazooException:
-            log.exception('Error retrieving the committed offset in zookeeper')
-            raise
-
-    def _update_offset(self, partition, value):
-        """Update the object and zookeepers stored offset number for a partition to value
-        """
-        self.offset_update_count += value - self._offsets[partition]
-        self._offsets[partition] = value
-        partition_path = '/'.join((self.topic_path, str(partition)))
-        try:
-            with self.zk_timer.time():
-                self.zookeeper.ensure_path(partition_path)
-                self.zookeeper.set(partition_path, str(value))
-            log.debug('Updated committed offset at path %s, offsets %s' % (partition_path, value))
-        except kazoo.exceptions.KazooException:
-            log.exception('Error updating the committed offset in zookeeper, path %s, value %s'
-                          % (partition_path, value))
-            raise
-
-        self._last_commit_time[partition] = time.time()
-
-    @property
-    def offsets(self):
-        """Generally only initialize the offsets after the lock has been pulled
-        """
-        if self._offsets is None:
-            self._offsets = self._get_offsets()
-
-        return self._offsets
-
     def lock(self, exit_method):
         """Grab a lock within zookeeper, if not available retry.
         """
@@ -141,6 +103,35 @@
         # suspended, the process should be supervised so it starts right back up again.
         self.zookeeper.add_listener(exit_method)
 
+    @property
+    def offsets(self):
+        """Return self._offsets; this is a property because generally the offsets are only initialized after the lock
+        has been pulled
+        """
+        if not self.has_lock:
+            log.warn('Reading offsets before the tracker has the lock, they could change')
+
+        if self._offsets is None:
+            # After my pull request is merged I can setup self.consumer as is done in kafka_consumer
+            # then simply use the get_offsets command as below
+            # self._offsets = self.consumer.get_offsets()
+            self._offsets = {}
+
+            def get_or_init_offset_callback(resp):
+                try:
+                    kafka.common.check_error(resp)
+                    return resp.offset
+                except kafka.common.UnknownTopicOrPartitionError:
+                    return 0
+            for partition in self.kafka.topic_partitions[self.topic]:
+                req = kafka.common.OffsetFetchRequest(self.topic, partition)
+                (offset,) = self.kafka.send_offset_fetch_request(self.kafka_group, [req],
+                                                                 callback=get_or_init_offset_callback,
+                                                                 fail_on_error=False)
+                self._offsets[partition] = offset
+
+        return self._offsets
+
     def run(self):
         """Mark a message as finished and where possible commit the new offset to zookeeper.
             There is no mechanism here to deal with the situation where a single alarm is extremely slow to finish
@@ -149,9 +140,6 @@
         if not self.has_lock:
             raise notification_exceptions.NotificationException('Attempt to begin run without Zookeeper Lock')
 
-        if self._offsets is None:  # Verify the offsets have been initialized
-            self._offsets = self._get_offsets()
-
         finished_count = statsd.Counter('AlarmsFinished')
 
         while True:  # If self.stop is True run the queue until it is empty, do final commits then exit
@@ -165,7 +153,7 @@
                 if ((time.time() - self._last_commit_time[partition]) > self.max_lag) and\
                         (len(self._uncommitted_offsets[partition]) > 0):
                     log.error('Max Lag has been reached! Skipping offsets for partition %s' % partition)
-                    self._update_offset(partition, max(self._uncommitted_offsets[partition]))
+                    self.update_offset(partition, max(self._uncommitted_offsets[partition]))
                     break
 
             try:
@@ -178,11 +166,8 @@
                 offset = int(msg[1])
                 log.debug('Received commit finish for partition %d, offset %d' % (partition, offset))
 
-                # Update immediately if the partition is not yet tracked or the offset is 1 above current offset
-                if partition not in self._offsets:
-                    log.debug('Updating offset for partition %d, offset %d' % (partition, offset))
-                    self._update_offset(partition, offset)
-                elif self._offsets[partition] == offset - 1:
+                # Update immediately if the offset is 1 above current offset
+                if self.offsets[partition] == offset - 1:
                     new_offset = offset
 
                     for x in range(offset + 1, offset + 1 + len(self._uncommitted_offsets[partition])):
                         if x in self._uncommitted_offsets[partition]:
                             new_offset = x
                             self._uncommitted_offsets[partition].discard(x)
                         else:
                             break
 
-                    self._update_offset(partition, new_offset)
+                    self.update_offset(partition, new_offset)
                     if offset == new_offset:
                         log.debug('Updating offset for partition %d, offset %d' % (partition, new_offset))
                     else:
                         log.debug('Updating offset for partition %d, offset %d covering this update and older offsets'
                                   % (partition, new_offset))
-                elif self._offsets[partition] > offset:
+                elif self.offsets[partition] > offset:
                     log.warn('An offset was received that was lower than the committed offset.' +
                              'Possibly a result of skipping lagging notifications')
                 else:  # This is skipping offsets so add to the uncommitted set unless max_lag has been hit
@@ -206,7 +191,27 @@
                     log.debug('Added partition %d, offset %d to uncommited set' % (partition, offset))
                     if (self.max_lag is not None) and ((time.time() - self._last_commit_time[partition]) > self.max_lag):
                         log.error('Max Lag has been reached! Skipping offsets for partition %s' % partition)
-                        self._update_offset(partition, max(self._uncommitted_offsets[partition]))
+                        self.update_offset(partition, max(self._uncommitted_offsets[partition]))
                         self._uncommitted_offsets[partition].clear()
 
         self._drop_lock()
+
+    def update_offset(self, partition, value):
+        """Update the object and kafka offset number for a partition to value
+        """
+        if self._offsets is None:  # Initialize offsets if needed
+            self.offsets
+
+        self.offset_update_count += value - self._offsets[partition]
+        self._offsets[partition] = value
+
+        req = kafka.common.OffsetCommitRequest(self.topic, partition, value, None)
+        try:
+            responses = self.kafka.send_offset_commit_request(self.kafka_group, [req])
+            kafka.common.check_error(responses[0])
+            log.debug('Updated committed offset for partition %s, offset %s' % (partition, value))
+        except kafka.common.KafkaError:
+            log.exception('Error updating the committed offset in kafka, partition %s, value %s' % (partition, value))
+            raise
+
+        self._last_commit_time[partition] = time.time()
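The commit logic in run() reduces to a small bookkeeping algorithm: per partition, keep the highest committed offset plus a set of offsets that finished out of order, and advance the commit point only across a contiguous run. A self-contained sketch of just that bookkeeping; the class and method names are illustrative, and the real tracker additionally commits to kafka and skips ahead when max_lag is exceeded:

```python
import collections


class OffsetGapTracker(object):
    """Track finished offsets per partition; commit only contiguous runs."""

    def __init__(self):
        self.committed = {}  # partition -> highest committed offset
        self.pending = collections.defaultdict(set)  # finished, not yet committable

    def finish(self, partition, offset):
        """Mark offset finished; return the new committed offset, or None."""
        if self.committed.get(partition, -1) + 1 != offset:
            self.pending[partition].add(offset)  # out of order, hold it back
            return None
        # Contiguous with the commit point: absorb pending offsets that now connect.
        new_offset = offset
        while new_offset + 1 in self.pending[partition]:
            new_offset += 1
            self.pending[partition].discard(new_offset)
        self.committed[partition] = new_offset
        return new_offset


tracker = OffsetGapTracker()
assert tracker.finish(0, 0) == 0
assert tracker.finish(0, 2) is None  # gap: offset 1 is still in flight
assert tracker.finish(0, 1) == 2     # gap closed, both offsets commit at once
```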
diff --git a/notification.yaml b/notification.yaml
index 273bbec..0230d49 100644
--- a/notification.yaml
+++ b/notification.yaml
@@ -1,17 +1,18 @@
 kafka:
-    url: 192.168.10.10:9092  # or comma seperated list of multiple hosts
+    url: 192.168.10.4:9092  # or comma separated list of multiple hosts
     group: monasca-notification
     alarm_topic: alarm-state-transitions
     notification_topic: alarm-notifications
+    max_offset_lag: 600  # In seconds, undefined for none
 
 mysql:
-    host: 192.168.10.6
+    host: 192.168.10.4
     user: notification
     passwd: password
     db: mon
 
 email:
-    server: smtp3.hp.com
+    server: 192.168.10.4
     port: 25
     user:
     password:
@@ -32,8 +33,7 @@ queues:
     sent_notifications_size: 50  # limiting this size reduces potential # of re-sent notifications after a failure
 
 zookeeper:
-    url: 192.168.10.10:2181  # or comma seperated list of multiple hosts
-    max_offset_lag: 600  # In seconds, undefined for none
+    url: 192.168.10.4:2181  # or comma separated list of multiple hosts
 
 logging: # Used in logging.dictConfig
     version: 1
@@ -61,4 +61,4 @@ logging: # Used in logging.dictConfig
 root:
     handlers:
         - console
-    level: INFO
+    level: DEBUG
diff --git a/requirements.txt b/requirements.txt
index 52555b6..eb55e88 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,4 @@
-kafka-python>=0.9.0
+kafka-python>=0.9.1
 kazoo>=1.3
 MySQL-python
 pbr>=0.6,<1.0
diff --git a/tests/test_state_tracker.py b/tests/test_state_tracker.py
index 4c78de4..da17aa5 100644
--- a/tests/test_state_tracker.py
+++ b/tests/test_state_tracker.py
@@ -15,6 +15,7 @@
 
 """Tests the StateTracker"""
 
+import kafka.common
 import mock
 import multiprocessing
 import threading
@@ -29,7 +30,11 @@ class TestStateTracker(unittest.TestCase):
         self.finished_queue = multiprocessing.Queue(10)
         with mock.patch('kazoo.client.KazooClient') as self.mock_zk:
             self.mock_zk.return_value = self.mock_zk
-            self.tracker = state_tracker.ZookeeperStateTracker('url', 'topic', self.finished_queue, 1)
+            with mock.patch('kafka.client.KafkaClient') as self.mock_kafka:
+                self.tracker = state_tracker.KafkaStateTracker(self.finished_queue, 'kafka_url', 'group',
+                                                               'topic', 1, 'zookeeper_url')
+                self.mock_kafka.return_value = self.mock_kafka
+
         self.tracker.has_lock = True
         self.tracker_thread = threading.Thread(target=self.tracker.run)
         self.tracker_thread.daemon = True  # needed for the thread to properly exit
@@ -50,9 +55,12 @@
 
         test_list = [(0, 1), (0, 2), (1, 1), (1, 2)]
         self._feed_queue(test_list)
 
-        expected_calls = [mock.call.set('/'.join([self.tracker.topic_path, str(partition)]), str(value))
+        expected_calls = [mock.call().send_offset_commit_request('group',
+                                                                 [kafka.common.OffsetCommitRequest('topic', partition,
+                                                                                                   value, None)])
                           for partition, value in test_list]
-        set_calls = [call for call in self.mock_zk.mock_calls if call.__str__().startswith('call.set')]
+        set_calls = [call for call in self.mock_kafka.mock_calls
+                     if call.__str__().startswith("call().send_offset_commit_request('")]
 
         self.assertTrue(expected_calls == set_calls)
 
@@ -63,9 +71,12 @@
         self._feed_queue(unordered_test_list)
 
         commit_list = [(0, 2), (1, 2)]
-        expected_calls = [mock.call.set('/'.join([self.tracker.topic_path, str(partition)]), str(value))
+        expected_calls = [mock.call().send_offset_commit_request('group',
+                                                                 [kafka.common.OffsetCommitRequest('topic', partition,
+                                                                                                   value, None)])
                           for partition, value in commit_list]
-        set_calls = [call for call in self.mock_zk.mock_calls if call.__str__().startswith('call.set')]
+        set_calls = [call for call in self.mock_kafka.mock_calls
+                     if call.__str__().startswith("call().send_offset_commit_request('")]
 
         self.assertTrue(expected_calls == set_calls)
 
@@ -83,8 +94,11 @@
         time.sleep(1)  # wait for processing
 
         commit_list = [(0, 3), (1, 3)]
-        expected_calls = [mock.call.set('/'.join([self.tracker.topic_path, str(partition)]), str(value))
+        expected_calls = [mock.call().send_offset_commit_request('group',
+                                                                 [kafka.common.OffsetCommitRequest('topic', partition,
+                                                                                                   value, None)])
                           for partition, value in commit_list]
-        set_calls = [call for call in self.mock_zk.mock_calls if call.__str__().startswith('call.set')]
+        set_calls = [call for call in self.mock_kafka.mock_calls
+                     if call.__str__().startswith("call().send_offset_commit_request('")]
 
         self.assertTrue(expected_calls == set_calls)
diff --git a/tools/monasca_notification_offsets.py b/tools/monasca_notification_offsets.py
index f659fb0..b037588 100755
--- a/tools/monasca_notification_offsets.py
+++ b/tools/monasca_notification_offsets.py
@@ -29,7 +29,7 @@ from monasca_notification import state_tracker
 
 
 def listener():
-    """Simple listener for ZookeeperStateTracker
+    """Simple listener for KafkaStateTracker
     """
     sys.exit(1)
 
@@ -53,12 +53,12 @@ def main():
 
     # Parse config and setup state tracker
     config = yaml.load(open(args.config, 'r'))
-    tracker = state_tracker.ZookeeperStateTracker(
-        config['zookeeper']['url'], config['kafka']['alarm_topic'], None, config['zookeeper']['max_offset_lag'])
+    tracker = state_tracker.KafkaStateTracker(None, config['kafka']['url'], config['kafka']['group'],
+                                              config['kafka']['alarm_topic'], config['kafka']['max_offset_lag'],
+                                              config['zookeeper']['url'])
 
-    current_offsests = tracker.offsets
     if args.list:
-        print(json.dumps(current_offsests))
+        print(json.dumps(tracker.offsets))
     else:
         offsets = json.loads(args.set_offsets)
         raw_input("Warning setting offset will affect the behavior of the next notification engine to run.\n" +
@@ -70,7 +70,7 @@ def main():
         tracker.lock(listener)
 
     for partition in offsets.iterkeys():
-        tracker._update_offset(int(partition), int(offsets[partition]))
+        tracker.update_offset(int(partition), int(offsets[partition]))
 
 
 if __name__ == "__main__":
     sys.exit(main())
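Taken together, the offsets tool is a thin wrapper around one offset fetch and one offset commit per partition. A condensed sketch of that round trip, roughly equivalent to reading tracker.offsets and then calling tracker.update_offset; the broker address, group, topic, partition, and the committed value are illustrative:

```python
import kafka.client
import kafka.common

client = kafka.client.KafkaClient('localhost:9092')  # illustrative broker
group, topic, partition = 'monasca-notification', 'alarm-state-transitions', 0

# Read the currently committed offset for the partition.
try:
    (resp,) = client.send_offset_fetch_request(
        group, [kafka.common.OffsetFetchRequest(topic, partition)])
    current = resp.offset
except kafka.common.UnknownTopicOrPartitionError:
    current = 0  # nothing has been committed yet for this partition

# Overwrite it, as update_offset() does; the next engine run resumes just past this value.
commit = kafka.common.OffsetCommitRequest(topic, partition, current, None)
kafka.common.check_error(client.send_offset_commit_request(group, [commit])[0])
```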