Implemented offset_max_lag
This commit is contained in:
parent
be67d90ecd
commit
97469fa4cc
|
@ -94,7 +94,8 @@ def main(argv=None):
|
|||
|
||||
#State Tracker - Used for tracking the progress of fully processed alarms and the zookeeper lock
|
||||
global tracker # Set to global for use in the cleanup function
|
||||
tracker = ZookeeperStateTracker(config['zookeeper']['url'], config['kafka']['alarm_topic'], finished)
|
||||
tracker = ZookeeperStateTracker(
|
||||
config['zookeeper']['url'], config['kafka']['alarm_topic'], finished, config['zookeeper']['max_offset_lag'])
|
||||
tracker.lock(clean_exit) # Only begin if we have the processing lock
|
||||
tracker_thread = threading.Thread(target=tracker.run)
|
||||
|
||||
|
|
|
@ -18,12 +18,13 @@ class ZookeeperStateTracker(object):
|
|||
The messages are not necessarily finished in order, but the committed offset includes
|
||||
all previous messages so this object tracks any gaps updating as needed.
|
||||
"""
|
||||
def __init__(self, url, topic, finished_queue):
|
||||
def __init__(self, url, topic, finished_queue, max_lag):
|
||||
"""Setup the finished_queue
|
||||
url is the zookeeper hostname:port
|
||||
topic is the kafka topic to track
|
||||
"""
|
||||
self.finished_queue = finished_queue
|
||||
self.max_lag = max_lag
|
||||
self.topic = topic
|
||||
self.has_lock = False
|
||||
self.stop = False
|
||||
|
@ -39,6 +40,10 @@ class ZookeeperStateTracker(object):
|
|||
# This is a dictionary of sets used for tracking finished offsets when there is a gap and the committed offset
|
||||
# can not yet be advanced
|
||||
self._uncommitted_offsets = collections.defaultdict(set)
|
||||
self._last_commit_time = collections.defaultdict(time.time)
|
||||
|
||||
self.zk_timer = statsd.Timer('OffsetCommitTime')
|
||||
self.offset_update_count = statsd.Counter('AlarmsOffsetUpdated')
|
||||
|
||||
def _drop_lock(self):
|
||||
"""Drop the lock file kept in zookeeper
|
||||
|
@ -67,17 +72,22 @@ class ZookeeperStateTracker(object):
|
|||
except kazoo.exceptions.KazooException:
|
||||
log.exception('Error retrieving the committed offset in zookeeper')
|
||||
|
||||
def _update_zk_offsets(self):
|
||||
"""Update zookeepers stored offset numbers to the values in self.offsets
|
||||
def _update_offset(self, partition, value):
|
||||
"""Update the object and zookeepers stored offset number for a partition to value
|
||||
"""
|
||||
self.offset_update_count += value - self._offsets[partition]
|
||||
self._offsets[partition] = value
|
||||
partition_path = '/'.join((self.topic_path, str(partition)))
|
||||
try:
|
||||
for partition, value in self._offsets.iteritems():
|
||||
partition_path = '/'.join((self.topic_path, str(partition)))
|
||||
with self.zk_timer.time():
|
||||
self.zookeeper.ensure_path(partition_path)
|
||||
self.zookeeper.set(partition_path, str(value))
|
||||
log.debug('Updated committed offsets at path %s, offsets %s' % (self.topic_path, self._offsets))
|
||||
log.debug('Updated committed offset at path %s, offsets %s' % (partition_path, value))
|
||||
except kazoo.exceptions.KazooException:
|
||||
log.exception('Error updating the committed offset in zookeeper')
|
||||
log.exception('Error updating the committed offset in zookeeper, path %s, value %s'
|
||||
% (partition_path, value))
|
||||
|
||||
self._last_commit_time[partition] = time.time()
|
||||
|
||||
@property
|
||||
def offsets(self):
|
||||
|
@ -135,15 +145,20 @@ class ZookeeperStateTracker(object):
|
|||
self._offsets = self._get_offsets()
|
||||
|
||||
finished_count = statsd.Counter('AlarmsFinished')
|
||||
offset_update_count = statsd.Counter('AlarmsOffsetUpdated')
|
||||
zk_timer = statsd.Timer('OffsetCommitTime')
|
||||
while True:
|
||||
# If self.stop is True run the queue until it is empty, don't block after that.
|
||||
# If self.stop is True run the queue until it is empty, do final commits then exit
|
||||
if self.stop and self.finished_queue.empty():
|
||||
log.debug('self.stop set and the finished_queue is empty, doing final wait')
|
||||
time.sleep(10) # Before final exit wait a bit to verify the queue is still empty
|
||||
if self.finished_queue.empty():
|
||||
# if the max_lag has been hit at this point commit the last received offset
|
||||
for partition in self._last_commit_time.iterkeys():
|
||||
if ((time.time() - self._last_commit_time[partition]) > self.max_lag) and\
|
||||
(len(self._uncommitted_offsets[partition]) > 0):
|
||||
log.error('Max Lag has been reached! Skipping offsets for partition %s' % partition)
|
||||
self._update_offset(partition, max(self._uncommitted_offsets[partition]))
|
||||
break
|
||||
|
||||
try:
|
||||
msg = self.finished_queue.get(timeout=10)
|
||||
except Queue.Empty:
|
||||
|
@ -157,8 +172,7 @@ class ZookeeperStateTracker(object):
|
|||
# Update immediately if the partition is not yet tracked or the offset is 1 above current offset
|
||||
if partition not in self._offsets:
|
||||
log.debug('Updating offset for partition %d, offset %d' % (partition, offset))
|
||||
self._offsets[partition] = offset
|
||||
self._update_zk_offsets()
|
||||
self._update_offset(partition, offset)
|
||||
elif self._offsets[partition] == offset - 1:
|
||||
|
||||
new_offset = offset
|
||||
|
@ -169,19 +183,21 @@ class ZookeeperStateTracker(object):
|
|||
else:
|
||||
break
|
||||
|
||||
offset_update_count += new_offset - self._offsets[partition]
|
||||
self._offsets[partition] = new_offset
|
||||
self._update_offset(partition, new_offset)
|
||||
if offset == new_offset:
|
||||
log.debug('Updating offset for partition %d, offset %d' % (partition, new_offset))
|
||||
else:
|
||||
log.debug('Updating offset for partition %d, offset %d covering this update and older offsets'
|
||||
% (partition, new_offset))
|
||||
with zk_timer.time():
|
||||
self._update_zk_offsets()
|
||||
elif self._offsets[partition] > offset:
|
||||
log.error('An offset was received that was lower than the committed offset, this should not happen')
|
||||
else: # This is skipping offsets so just add to the uncommitted set
|
||||
log.warn('An offset was received that was lower than the committed offset.' +
|
||||
'Possibly a result of skipping lagging notifications')
|
||||
else: # This is skipping offsets so add to the uncommitted set unless max_lag has been hit
|
||||
self._uncommitted_offsets[partition].add(offset)
|
||||
log.debug('Added partition %d, offset %d to uncommited set' % (partition, offset))
|
||||
if (time.time() - self._last_commit_time[partition]) > self.max_lag:
|
||||
log.error('Max Lag has been reached! Skipping offsets for partition %s' % partition)
|
||||
self._update_offset(partition, max(self._uncommitted_offsets[partition]))
|
||||
self._uncommitted_offsets[partition].clear()
|
||||
|
||||
self._drop_lock()
|
||||
|
|
|
@ -33,6 +33,7 @@ queues:
|
|||
|
||||
zookeeper:
|
||||
url: 192.168.10.10:2181
|
||||
max_offset_lag: 600 # In seconds
|
||||
|
||||
logging: # Used in logging.dictConfig
|
||||
version: 1
|
||||
|
|
Loading…
Reference in New Issue