Move partition offset tracking from zookeeper to kafka

With kafka 0.8.1 it is now possible for non-Java clients to track
offsets in kafka itself. Previously the code did this using zookeeper.
This relies on kafka-python >= 0.9.1.

Change-Id: Ia42e713cc5d9ca61d8f8df2adc454f1e2579a229
Tim Kuhlman 2014-08-18 15:54:59 -06:00
parent 224e654f4c
commit 3d1deca139
8 changed files with 168 additions and 114 deletions

View File

@ -10,11 +10,10 @@ There are four processing steps separated by queues implemented with python mult
3. Send Notification. - NotificationProcessor class
4. Add sent notifications to Kafka on the notification topic. - SentNotificationProcessor class
There is also a special processing step, the ZookeeperStateTracker, that runs in the main thread and keeps track of the
There is also a special processing step, the KafkaStateTracker, that runs in the main thread and keeps track of the
last committed message and the ones available for commit; it then periodically commits all progress. This handles the
situation where alarms that are not acted on are quickly ready for commit while others that precede them in the
kafka order are still in progress. Locking is also handled by this class, so all zookeeper functionality is tracked in
this class.
kafka order are still in progress. Locking is also handled by this class using zookeeper.
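For example (toy offsets, not code from this repo): if offset 5 is committed and offsets 7 and 8 finish while 6 is
still in progress, the committed offset stays at 5; once 6 finishes the commit can advance straight to 8.
```python
# Toy illustration of the gap-tracking rule (hypothetical offsets).
committed = 5
uncommitted = {7, 8}              # finished out of order; 6 is still in progress
uncommitted.add(6)                # 6 finally finishes
while committed + 1 in uncommitted:
    uncommitted.discard(committed + 1)
    committed += 1
print(committed)                  # 8 -- the commit jumps over the whole contiguous run
```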
There are 4 internal queues:
@ -30,7 +29,7 @@ over and continue from where it left. A zookeeper lock file is used to ensure on
the code can be modified to use kafka partitions to have multiple active engines working on different alarms.
## Fault Tolerance
When reading from the alarm topic no committing is done. The committing is done in sent_notification processor. This allows
When reading from the alarm topic no committing is done. The committing is done only after processing. This allows
the processing to continue even though some notifications can be slow. In the event of a catastrophic failure some
notifications could be sent but the alarms not yet acknowledged. This is an acceptable failure mode; it is better to
send a notification twice than not at all.
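This is at-least-once delivery: the send happens before the offset becomes eligible for commit. A minimal sketch of
the ordering (the send function and values are stand-ins, not the real processor code):
```python
import multiprocessing

def send_notification(notification):       # stand-in for the real send step
    print('sent %s' % notification)

finished = multiprocessing.Queue()
partition, offset = 0, 42                  # hypothetical position of the alarm
send_notification('alarm-notification')    # 1. send first
finished.put((partition, offset))          # 2. only then signal the tracker to commit
# A crash between steps 1 and 2 re-sends the notification on restart.
```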
@ -76,12 +75,12 @@ are gathered per thread, the thread number is indicated by a -# at the end of th
# Future Considerations
- Currently I lock the topic rather than the partitions. This effectively means there is only one active notification
engine at any given time. In the future to share the load among multiple daemons we could lock by partition.
- The ZookeeperStateTracker is a likely place to end up as a bottleneck on high throughput. Detailed investigation of
its speed should be done.
- How fast is the mysql db? How much load do we put on it? Initially I think it makes most sense to read notification
details for each alarm but eventually I may want to cache that info.
- I am starting with a single KafkaConsumer and a single SentNotificationProcessor; depending on load these may need
to scale.
- More extensive load testing is needed.
- How fast is the mysql db? How much load do we put on it? Initially I think it makes most sense to read notification
details for each alarm but eventually I may want to cache that info.
- I am starting with a single KafkaConsumer and a single SentNotificationProcessor; depending on load these may need
to scale.
- How fast is the state tracker? Do I need to scale or speed that up at all?
# License

View File

@ -23,7 +23,7 @@ import logging.config
import multiprocessing
import os
import signal
from state_tracker import ZookeeperStateTracker
from state_tracker import KafkaStateTracker
import sys
import threading
import time
@ -108,8 +108,9 @@ def main(argv=None):
# State Tracker - Used for tracking the progress of fully processed alarms and the zookeeper lock
global tracker # Set to global for use in the cleanup function
tracker = ZookeeperStateTracker(
config['zookeeper']['url'], config['kafka']['alarm_topic'], finished, config['zookeeper']['max_offset_lag'])
tracker = KafkaStateTracker(finished, config['kafka']['url'], config['kafka']['group'],
config['kafka']['alarm_topic'], config['kafka']['max_offset_lag'],
config['zookeeper']['url'])
tracker.lock(clean_exit) # Only begin if we have the processing lock
tracker_thread = threading.Thread(target=tracker.run)
@ -120,8 +121,7 @@ def main(argv=None):
alarms,
config['kafka']['url'],
config['kafka']['group'],
config['kafka']['alarm_topic'],
tracker.offsets
config['kafka']['alarm_topic']
).run
)
processors.append(kafka)

View File

@ -14,6 +14,7 @@
# limitations under the License.
import kafka.client
import kafka.common
import kafka.consumer
import logging
import statsd
@ -31,7 +32,7 @@ class KafkaConsumer(BaseProcessor):
Unfortunately at this point the kafka-python client does not handle multiple consumers seamlessly.
For more information see, https://github.com/mumrah/kafka-python/issues/112
"""
def __init__(self, queue, kafka_url, group, topic, initial_offsets=None):
def __init__(self, queue, kafka_url, group, topic):
"""Init
kafka_url, group, topic - kafka connection details
queue - the queue to publish consumed alarms to
@ -39,21 +40,56 @@ class KafkaConsumer(BaseProcessor):
self.queue = queue
self.kafka = kafka.client.KafkaClient(kafka_url)
# No autocommit, it does not work with kafka 0.8.0 - see https://github.com/mumrah/kafka-python/issues/118
# No auto-commit so that commits only happen after the alarm is processed.
self.consumer = kafka.consumer.SimpleConsumer(self.kafka, group, topic, auto_commit=False)
self.consumer.provide_partition_info() # Without this the partition is not provided in the response
if initial_offsets is not None:
# Set initial offsets directly in the consumer; there is no method for this so I have to do it here
self.consumer.offsets.update(initial_offsets)
# fetch offsets are +1 of standard offsets
for partition in initial_offsets:
self.consumer.fetch_offsets[partition] = initial_offsets[partition] + 1
self._initialize_offsets(group, topic)
# After my pull request is merged I can remove _initialize_offsets and use
# self.consumer.offsets = self.consumer.get_offsets()
# self.consumer.fetch_offsets = self.consumer.offsets.copy()
offsets = self.consumer.offsets.copy()
self.consumer.seek(0, 0)
if offsets != self.consumer.offsets:
log.error('Some messages not yet processed are no longer available in kafka, skipping to first available')
log.debug('Initialized offsets %s\nStarting offsets %s' % (offsets, self.consumer.offsets))
def _initialize_offsets(self, group, topic):
"""Fetch initial offsets from kafka
This is largely taken from what the kafka consumer itself does when auto_commit is used
"""
def get_or_init_offset_callback(resp):
try:
kafka.common.check_error(resp)
return resp.offset
except kafka.common.UnknownTopicOrPartitionError:
return 0
for partition in self.kafka.topic_partitions[topic]:
req = kafka.common.OffsetFetchRequest(topic, partition)
(offset,) = self.consumer.client.send_offset_fetch_request(group, [req],
callback=get_or_init_offset_callback,
fail_on_error=False)
# The recorded offset is the last successfully processed, so start processing at the next;
# if no processing has been done the offset is 0
if offset == 0:
self.consumer.offsets[partition] = offset
else:
self.consumer.offsets[partition] = offset + 1
# fetch_offsets are used by the SimpleConsumer
self.consumer.fetch_offsets = self.consumer.offsets.copy()
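The +1 resume rule above, in a self-contained form (toy values, not part of this module):
```python
# Stored offsets in kafka record the last message processed, so consumption
# resumes one past them; an untouched partition (offset 0) starts at 0.
def resume_position(stored_offset):
    if stored_offset == 0:
        return 0
    return stored_offset + 1

print(resume_position(41))  # 42
print(resume_position(0))   # 0
```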
def run(self):
"""Consume from kafka and place alarm objects on the sent_queue
"""
counter = statsd.Counter('ConsumedFromKafka')
for message in self.consumer:
counter += 1
log.debug("Consuming message from kafka, partition %d, offset %d" % (message[0], message[1].offset))
self._add_to_queue(self.queue, 'alarms', message)
try:
for message in self.consumer:
counter += 1
log.debug("Consuming message from kafka, partition %d, offset %d" % (message[0], message[1].offset))
self._add_to_queue(self.queue, 'alarms', message)
except Exception:
log.exception('Error running Kafka Consumer')
raise
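For reference, main() wires this up roughly as below (a sketch using the sample config values; it assumes a reachable
broker and that KafkaConsumer is imported from this module):
```python
import multiprocessing
import threading

alarms = multiprocessing.Queue(10)
consumer = KafkaConsumer(alarms, '192.168.10.4:9092', 'monasca-notification',
                         'alarm-state-transitions')
threading.Thread(target=consumer.run).start()
# provide_partition_info() means each queue entry is (partition, OffsetAndMessage)
partition, raw_message = alarms.get()
```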

View File

@ -14,6 +14,8 @@
# limitations under the License.
import collections
import kafka.client
import kafka.common
import kazoo.client
import kazoo.exceptions
import logging
@ -26,17 +28,18 @@ from monasca_notification import notification_exceptions
log = logging.getLogger(__name__)
class ZookeeperStateTracker(object):
class KafkaStateTracker(object):
"""Tracks message offsets for a kafka topic and partitions.
Uses zookeeper to keep track of the last committed offset.
As messages finish processing, the committed offset is updated periodically.
The messages are not necessarily finished in order, but the committed offset includes
all previous messages, so this object tracks any gaps, updating as needed.
Uses kafka to keep track of the last committed offset and zookeeper for locking.
"""
def __init__(self, url, topic, finished_queue, max_lag):
def __init__(self, finished_queue, kafka_url, group, topic, max_lag, zookeeper_url):
"""Setup the finished_queue
url is the zookeeper hostname:port
topic is the kafka topic to track
finished_queue - queue containing all processed alarms
kafka_url, group, topic - kafka connection details
zookeeper_url is the zookeeper hostname:port
"""
self.finished_queue = finished_queue
self.max_lag = max_lag
@ -44,14 +47,20 @@ class ZookeeperStateTracker(object):
self.has_lock = False
self.stop = False
self.zookeeper = kazoo.client.KazooClient(url)
# The kafka client is used solely for committing offsets; the consuming is done in the kafka_consumer
self.kafka_group = group
self.kafka = kafka.client.KafkaClient(kafka_url)
# The consumer is not needed but after my pull request is merged using it for get_offsets simplifies this code
# import kafka.consumer
# self.consumer = kafka.consumer.SimpleConsumer(self.kafka, group, topic, auto_commit=False)
self.zookeeper = kazoo.client.KazooClient(zookeeper_url)
self.zookeeper.start()
self.topic_path = '/consumers/monasca-notification/%s' % topic
self.lock_retry_time = 15 # number of seconds to wait between retries for the lock
self.lock_path = '/locks/monasca-notification/%s' % topic
self._offsets = None
self._offsets = None # Initialized in the beginning of the run
# This is a dictionary of sets used for tracking finished offsets when there is a gap and the committed offset
# can not yet be advanced
self._uncommitted_offsets = collections.defaultdict(set)
@ -68,53 +77,6 @@ class ZookeeperStateTracker(object):
self.zookeeper.delete(self.lock_path)
self.has_lock = False
def _get_offsets(self):
"""Read the initial offsets from zookeeper or set defaults
The return is a dictionary keyed by partition number, with the offset as the value
"""
if not self.has_lock:
log.warn('Reading offsets before the tracker has the lock, they could change')
try:
if self.zookeeper.exists(self.topic_path):
offsets = collections.defaultdict(int)
for child in self.zookeeper.get_children(self.topic_path):
offsets[int(child)] = int(self.zookeeper.get('/'.join((self.topic_path, child)))[0])
log.info('Setting initial offsets to %s' % str(offsets))
return offsets
else:
self.zookeeper.ensure_path(self.topic_path)
return {}
except kazoo.exceptions.KazooException:
log.exception('Error retrieving the committed offset in zookeeper')
raise
def _update_offset(self, partition, value):
"""Update the object and zookeepers stored offset number for a partition to value
"""
self.offset_update_count += value - self._offsets[partition]
self._offsets[partition] = value
partition_path = '/'.join((self.topic_path, str(partition)))
try:
with self.zk_timer.time():
self.zookeeper.ensure_path(partition_path)
self.zookeeper.set(partition_path, str(value))
log.debug('Updated committed offset at path %s, offsets %s' % (partition_path, value))
except kazoo.exceptions.KazooException:
log.exception('Error updating the committed offset in zookeeper, path %s, value %s'
% (partition_path, value))
raise
self._last_commit_time[partition] = time.time()
@property
def offsets(self):
"""Generally only initialize the offsets after the lock has been pulled
"""
if self._offsets is None:
self._offsets = self._get_offsets()
return self._offsets
def lock(self, exit_method):
"""Grab a lock within zookeeper, if not available retry.
"""
@ -141,6 +103,35 @@ class ZookeeperStateTracker(object):
# suspended, the process should be supervised so it starts right back up again.
self.zookeeper.add_listener(exit_method)
@property
def offsets(self):
"""Return self._offsets, this is a property because generally only initialize the offsets after the lock has
been pulled
"""
if not self.has_lock:
log.warn('Reading offsets before the tracker has the lock, they could change')
if self._offsets is None:
# After my pull request is merged I can setup self.consumer as is done in kafka_consumer
# then simply use the get_offsets command as below
# self._offsets = self.consumer.get_offsets()
self._offsets = {}
def get_or_init_offset_callback(resp):
try:
kafka.common.check_error(resp)
return resp.offset
except kafka.common.UnknownTopicOrPartitionError:
return 0
for partition in self.kafka.topic_partitions[self.topic]:
req = kafka.common.OffsetFetchRequest(self.topic, partition)
(offset,) = self.kafka.send_offset_fetch_request(self.kafka_group, [req],
callback=get_or_init_offset_callback,
fail_on_error=False)
self._offsets[partition] = offset
return self._offsets
def run(self):
"""Mark a message as finished and where possible commit the new offset to zookeeper.
There is no mechanism here to deal with the situation where a single alarm is extremely slow to finish
@ -149,9 +140,6 @@ class ZookeeperStateTracker(object):
if not self.has_lock:
raise notification_exceptions.NotificationException('Attempt to begin run without Zookeeper Lock')
if self._offsets is None: # Verify the offsets have been initialized
self._offsets = self._get_offsets()
finished_count = statsd.Counter('AlarmsFinished')
while True:
# If self.stop is True run the queue until it is empty, do final commits then exit
@ -165,7 +153,7 @@ class ZookeeperStateTracker(object):
if ((time.time() - self._last_commit_time[partition]) > self.max_lag) and\
(len(self._uncommitted_offsets[partition]) > 0):
log.error('Max Lag has been reached! Skipping offsets for partition %s' % partition)
self._update_offset(partition, max(self._uncommitted_offsets[partition]))
self.update_offset(partition, max(self._uncommitted_offsets[partition]))
break
try:
@ -178,11 +166,8 @@ class ZookeeperStateTracker(object):
offset = int(msg[1])
log.debug('Received commit finish for partition %d, offset %d' % (partition, offset))
# Update immediately if the partition is not yet tracked or the offset is 1 above current offset
if partition not in self._offsets:
log.debug('Updating offset for partition %d, offset %d' % (partition, offset))
self._update_offset(partition, offset)
elif self._offsets[partition] == offset - 1:
# Update immediately if the offset is 1 above current offset
if self.offsets[partition] == offset - 1:
new_offset = offset
for x in range(offset + 1, offset + 1 + len(self._uncommitted_offsets[partition])):
@ -192,13 +177,13 @@ class ZookeeperStateTracker(object):
else:
break
self._update_offset(partition, new_offset)
self.update_offset(partition, new_offset)
if offset == new_offset:
log.debug('Updating offset for partition %d, offset %d' % (partition, new_offset))
else:
log.debug('Updating offset for partition %d, offset %d covering this update and older offsets'
% (partition, new_offset))
elif self._offsets[partition] > offset:
elif self.offsets[partition] > offset:
log.warn('An offset was received that was lower than the committed offset. ' +
'Possibly a result of skipping lagging notifications')
else: # This is skipping offsets so add to the uncommitted set unless max_lag has been hit
@ -206,7 +191,27 @@ class ZookeeperStateTracker(object):
log.debug('Added partition %d, offset %d to uncommitted set' % (partition, offset))
if (self.max_lag is not None) and ((time.time() - self._last_commit_time[partition]) > self.max_lag):
log.error('Max Lag has been reached! Skipping offsets for partition %s' % partition)
self._update_offset(partition, max(self._uncommitted_offsets[partition]))
self.update_offset(partition, max(self._uncommitted_offsets[partition]))
self._uncommitted_offsets[partition].clear()
self._drop_lock()
def update_offset(self, partition, value):
"""Update the object and kafka offset number for a partition to value
"""
if self._offsets is None: # Initialize offsets if needed
self.offsets
self.offset_update_count += value - self._offsets[partition]
self._offsets[partition] = value
req = kafka.common.OffsetCommitRequest(self.topic, partition, value, None)
try:
responses = self.kafka.send_offset_commit_request(self.kafka_group, [req])
kafka.common.check_error(responses[0])
log.debug('Updated committed offset for partition %s, offset %s' % (partition, value))
except kafka.common.KafkaError:
log.exception('Error updating the committed offset in kafka, partition %s, value %s' % (partition, value))
raise
self._last_commit_time[partition] = time.time()
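Putting the tracker pieces together as main() does (a condensed sketch with the sample config values; it assumes
KafkaStateTracker is imported and that kafka and zookeeper are reachable):
```python
import multiprocessing
import sys
import threading

def clean_exit(*args):                            # stand-in for the real cleanup handler
    sys.exit(1)

finished = multiprocessing.Queue(10)
tracker = KafkaStateTracker(finished, '192.168.10.4:9092', 'monasca-notification',
                            'alarm-state-transitions', 600, '192.168.10.4:2181')
tracker.lock(clean_exit)                          # blocks until the zookeeper lock is held
threading.Thread(target=tracker.run).start()      # commits progress in the background
finished.put((0, 42))                             # partition 0, offset 42 is finished
```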

View File

@ -1,17 +1,18 @@
kafka:
url: 192.168.10.10:9092 # or comma separated list of multiple hosts
url: 192.168.10.4:9092 # or comma separated list of multiple hosts
group: monasca-notification
alarm_topic: alarm-state-transitions
notification_topic: alarm-notifications
max_offset_lag: 600 # In seconds, undefined for none
mysql:
host: 192.168.10.6
host: 192.168.10.4
user: notification
passwd: password
db: mon
email:
server: smtp3.hp.com
server: 192.168.10.4
port: 25
user:
password:
@ -32,8 +33,7 @@ queues:
sent_notifications_size: 50 # limiting this size reduces potential # of re-sent notifications after a failure
zookeeper:
url: 192.168.10.10:2181 # or comma separated list of multiple hosts
max_offset_lag: 600 # In seconds, undefined for none
url: 192.168.10.4:2181 # or comma separated list of multiple hosts
logging: # Used in logging.dictConfig
version: 1
@ -61,4 +61,4 @@ logging: # Used in logging.dictConfig
root:
handlers:
- console
level: INFO
level: DEBUG
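The daemon reads this file with PyYAML, as in main() above; note that max_offset_lag now lives under the kafka
section rather than zookeeper. A minimal sketch (the file name is illustrative):
```python
import yaml

config = yaml.load(open('notification.yaml', 'r'))   # file name is hypothetical
print(config['kafka']['max_offset_lag'])             # 600, moved from the zookeeper section
print(config['zookeeper']['url'])                    # zookeeper is now only used for locking
```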

View File

@ -1,4 +1,4 @@
kafka-python>=0.9.0
kafka-python>=0.9.1
kazoo>=1.3
MySQL-python
pbr>=0.6,<1.0

View File

@ -15,6 +15,7 @@
"""Tests the StateTracker"""
import kafka.common
import mock
import multiprocessing
import threading
@ -29,7 +30,11 @@ class TestStateTracker(unittest.TestCase):
self.finished_queue = multiprocessing.Queue(10)
with mock.patch('kazoo.client.KazooClient') as self.mock_zk:
self.mock_zk.return_value = self.mock_zk
self.tracker = state_tracker.ZookeeperStateTracker('url', 'topic', self.finished_queue, 1)
with mock.patch('kafka.client.KafkaClient') as self.mock_kafka:
self.tracker = state_tracker.KafkaStateTracker(self.finished_queue, 'kafka_url', 'group',
'topic', 1, 'zookeeper_url')
self.mock_kafka.return_value = self.mock_kafka
self.tracker.has_lock = True
self.tracker_thread = threading.Thread(target=self.tracker.run)
self.tracker_thread.daemon = True # needed for the thread to properly exit
@ -50,9 +55,12 @@ class TestStateTracker(unittest.TestCase):
test_list = [(0, 1), (0, 2), (1, 1), (1, 2)]
self._feed_queue(test_list)
expected_calls = [mock.call.set('/'.join([self.tracker.topic_path, str(partition)]), str(value))
expected_calls = [mock.call().send_offset_commit_request('group',
[kafka.common.OffsetCommitRequest('topic', partition,
value, None)])
for partition, value in test_list]
set_calls = [call for call in self.mock_zk.mock_calls if call.__str__().startswith('call.set')]
set_calls = [call for call in self.mock_kafka.mock_calls
if call.__str__().startswith("call().send_offset_commit_request('")]
self.assertTrue(expected_calls == set_calls)
@ -63,9 +71,12 @@ class TestStateTracker(unittest.TestCase):
self._feed_queue(unordered_test_list)
commit_list = [(0, 2), (1, 2)]
expected_calls = [mock.call.set('/'.join([self.tracker.topic_path, str(partition)]), str(value))
expected_calls = [mock.call().send_offset_commit_request('group',
[kafka.common.OffsetCommitRequest('topic', partition,
value, None)])
for partition, value in commit_list]
set_calls = [call for call in self.mock_zk.mock_calls if call.__str__().startswith('call.set')]
set_calls = [call for call in self.mock_kafka.mock_calls
if call.__str__().startswith("call().send_offset_commit_request('")]
self.assertTrue(expected_calls == set_calls)
@ -83,8 +94,11 @@ class TestStateTracker(unittest.TestCase):
time.sleep(1) # wait for processing
commit_list = [(0, 3), (1, 3)]
expected_calls = [mock.call.set('/'.join([self.tracker.topic_path, str(partition)]), str(value))
expected_calls = [mock.call().send_offset_commit_request('group',
[kafka.common.OffsetCommitRequest('topic', partition,
value, None)])
for partition, value in commit_list]
set_calls = [call for call in self.mock_zk.mock_calls if call.__str__().startswith('call.set')]
set_calls = [call for call in self.mock_kafka.mock_calls
if call.__str__().startswith("call().send_offset_commit_request('")]
self.assertTrue(expected_calls == set_calls)
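The call().send_offset_commit_request(...) expectations match calls made on the instance returned by the patched
class; a standalone illustration of that mock pattern (not from this repo):
```python
import mock
import kafka.client

with mock.patch('kafka.client.KafkaClient') as mock_kafka:
    client = kafka.client.KafkaClient('url')   # the patched class returns a mock instance
    client.send_offset_commit_request('group', [])

print(mock_kafka.mock_calls)
# [call('url'), call().send_offset_commit_request('group', [])]
```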

View File

@ -29,7 +29,7 @@ from monasca_notification import state_tracker
def listener():
"""Simple listener for ZookeeperStateTracker
"""Simple listener for KafkaStateTracker
"""
sys.exit(1)
@ -53,12 +53,12 @@ def main():
# Parse config and setup state tracker
config = yaml.load(open(args.config, 'r'))
tracker = state_tracker.ZookeeperStateTracker(
config['zookeeper']['url'], config['kafka']['alarm_topic'], None, config['zookeeper']['max_offset_lag'])
tracker = state_tracker.KafkaStateTracker(None, config['kafka']['url'], config['kafka']['group'],
config['kafka']['alarm_topic'], config['kafka']['max_offset_lag'],
config['zookeeper']['url'])
current_offsests = tracker.offsets
if args.list:
print(json.dumps(current_offsests))
print(json.dumps(tracker.offsets))
else:
offsets = json.loads(args.set_offsets)
raw_input("Warning setting offset will affect the behavior of the next notification engine to run.\n" +
@ -70,7 +70,7 @@ def main():
tracker.lock(listener)
for partition in offsets.iterkeys():
tracker._update_offset(int(partition), int(offsets[partition]))
tracker.update_offset(int(partition), int(offsets[partition]))
if __name__ == "__main__":
sys.exit(main())