Changed ttl to seconds, made both ttl and max_lag optional

This commit is contained in:
Tim Kuhlman 2014-03-19 13:19:06 -06:00
parent 1887dc8a7c
commit 2dd29d0b28
3 changed files with 11 additions and 10 deletions

View File

@ -75,7 +75,7 @@ class AlarmProcessor(BaseProcessor):
% (partition, offset, alarm))
alarm_age = time.time() - alarm['timestamp'] # Should all be in seconds since epoch
if alarm_age/60 > self.alarm_ttl: # ttl is in minutes not seconds
if (self.alarm_ttl is not None) and (alarm_age > self.alarm_ttl):
no_notification_count += 1
self._add_to_queue(self.finished_queue, 'finished', (partition, offset))
log.warn('Received alarm older than the ttl, skipping. Alarm from %s' % time.ctime(alarm['timestamp']))

View File

@ -151,12 +151,13 @@ class ZookeeperStateTracker(object):
log.debug('self.stop set and the finished_queue is empty, doing final wait')
time.sleep(10) # Before final exit wait a bit to verify the queue is still empty
if self.finished_queue.empty():
# if the max_lag has been hit at this point commit the last received offset
for partition in self._last_commit_time.iterkeys():
if ((time.time() - self._last_commit_time[partition]) > self.max_lag) and\
(len(self._uncommitted_offsets[partition]) > 0):
log.error('Max Lag has been reached! Skipping offsets for partition %s' % partition)
self._update_offset(partition, max(self._uncommitted_offsets[partition]))
if self.max_lag is not None:
# if the max_lag has been hit at this point commit the last received offset
for partition in self._last_commit_time.iterkeys():
if ((time.time() - self._last_commit_time[partition]) > self.max_lag) and\
(len(self._uncommitted_offsets[partition]) > 0):
log.error('Max Lag has been reached! Skipping offsets for partition %s' % partition)
self._update_offset(partition, max(self._uncommitted_offsets[partition]))
break
try:
@ -195,7 +196,7 @@ class ZookeeperStateTracker(object):
else: # This is skipping offsets so add to the uncommitted set unless max_lag has been hit
self._uncommitted_offsets[partition].add(offset)
log.debug('Added partition %d, offset %d to uncommited set' % (partition, offset))
if (time.time() - self._last_commit_time[partition]) > self.max_lag:
if (self.max_lag is not None) and ((time.time() - self._last_commit_time[partition]) > self.max_lag):
log.error('Max Lag has been reached! Skipping offsets for partition %s' % partition)
self._update_offset(partition, max(self._uncommitted_offsets[partition]))
self._uncommitted_offsets[partition].clear()

View File

@ -21,7 +21,7 @@ email:
processors:
alarm:
number: 2
ttl: 1440 # In minutes, Alarms older than this are not processed
ttl: 14400 # In seconds, undefined for none. Alarms older than this are not processed
notification:
number: 4
@ -33,7 +33,7 @@ queues:
zookeeper:
url: 192.168.10.10:2181
max_offset_lag: 600 # In seconds
max_offset_lag: 600 # In seconds, undefined for none
logging: # Used in logging.dictConfig
version: 1