Changes after running the hacking open stack code checks, except line length.
This commit is contained in:
parent
776f18b7a7
commit
06d7913248
|
@ -6,32 +6,31 @@
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
import logging.config
|
import logging.config
|
||||||
from multiprocessing import Process, Queue
|
import multiprocessing
|
||||||
import signal
|
import signal
|
||||||
|
from state_tracker import ZookeeperStateTracker
|
||||||
import sys
|
import sys
|
||||||
import yaml
|
import yaml
|
||||||
|
|
||||||
from state_tracker import ZookeeperStateTracker
|
|
||||||
from processors.kafka_consumer import KafkaConsumer
|
|
||||||
from processors.alarm_processor import AlarmProcessor
|
from processors.alarm_processor import AlarmProcessor
|
||||||
|
from processors.kafka_consumer import KafkaConsumer
|
||||||
from processors.notification_processor import NotificationProcessor
|
from processors.notification_processor import NotificationProcessor
|
||||||
from processors.sent_notification_processor import SentNotificationProcessor
|
from processors.sent_notification_processor import SentNotificationProcessor
|
||||||
|
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
processors = [] # global list to facilitate clean signal handling
|
processors = [] # global list to facilitate clean signal handling
|
||||||
|
|
||||||
|
|
||||||
def clean_exit(signum, frame=None):
|
def clean_exit(signum, frame=None):
|
||||||
""" Exit all processes cleanly
|
"""Exit all processes cleanly
|
||||||
Can be called on an os signal or no zookeeper losing connection.
|
Can be called on an os signal or no zookeeper losing connection.
|
||||||
"""
|
"""
|
||||||
for process in processors:
|
for process in processors:
|
||||||
# Since this is set up as a handler for SIGCHLD when this kills one child it gets another signal, the result
|
# Since this is set up as a handler for SIGCHLD when this kills one child it gets another signal, the result
|
||||||
# everything comes crashing down with some exceptions thrown for already dead processes
|
# everything comes crashing down with some exceptions thrown for already dead processes
|
||||||
try:
|
try:
|
||||||
process.terminate()
|
process.terminate()
|
||||||
except:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
|
@ -43,8 +42,8 @@ def main(argv=None):
|
||||||
if len(argv) == 2:
|
if len(argv) == 2:
|
||||||
config_file = argv[1]
|
config_file = argv[1]
|
||||||
elif len(argv) > 2:
|
elif len(argv) > 2:
|
||||||
print "Usage: " + argv[0] + " <config_file>"
|
print("Usage: " + argv[0] + " <config_file>")
|
||||||
print "Config file defaults to /etc/mon/notification.yaml"
|
print("Config file defaults to /etc/mon/notification.yaml")
|
||||||
return 1
|
return 1
|
||||||
else:
|
else:
|
||||||
config_file = '/etc/mon/notification.yaml'
|
config_file = '/etc/mon/notification.yaml'
|
||||||
|
@ -55,10 +54,10 @@ def main(argv=None):
|
||||||
logging.config.dictConfig(config['logging'])
|
logging.config.dictConfig(config['logging'])
|
||||||
|
|
||||||
#Create the queues
|
#Create the queues
|
||||||
alarms = Queue(config['queues']['alarms_size'])
|
alarms = multiprocessing.Queue(config['queues']['alarms_size'])
|
||||||
notifications = Queue(config['queues']['notifications_size']) # data is a list of notification objects
|
notifications = multiprocessing.Queue(config['queues']['notifications_size']) # [notification_object, ]
|
||||||
sent_notifications = Queue(config['queues']['sent_notifications_size']) # data is a list of notification objects
|
sent_notifications = multiprocessing.Queue(config['queues']['sent_notifications_size']) # [notification_object, ]
|
||||||
finished = Queue(config['queues']['finished_size']) # Data is of the form (partition, offset)
|
finished = multiprocessing.Queue(config['queues']['finished_size']) # Data is of the form (partition, offset)
|
||||||
|
|
||||||
#State Tracker - Used for tracking the progress of fully processed alarms and the zookeeper lock
|
#State Tracker - Used for tracking the progress of fully processed alarms and the zookeeper lock
|
||||||
tracker = ZookeeperStateTracker(config['zookeeper']['url'], config['kafka']['alarm_topic'], finished)
|
tracker = ZookeeperStateTracker(config['zookeeper']['url'], config['kafka']['alarm_topic'], finished)
|
||||||
|
@ -66,7 +65,7 @@ def main(argv=None):
|
||||||
|
|
||||||
## Define processors
|
## Define processors
|
||||||
#KafkaConsumer
|
#KafkaConsumer
|
||||||
kafka = Process(
|
kafka = multiprocessing.Process(
|
||||||
target=KafkaConsumer(
|
target=KafkaConsumer(
|
||||||
alarms,
|
alarms,
|
||||||
config['kafka']['url'],
|
config['kafka']['url'],
|
||||||
|
@ -79,8 +78,8 @@ def main(argv=None):
|
||||||
|
|
||||||
#AlarmProcessors
|
#AlarmProcessors
|
||||||
alarm_processors = []
|
alarm_processors = []
|
||||||
for i in xrange(config['processors']['alarm']['number']):
|
for i in range(config['processors']['alarm']['number']):
|
||||||
alarm_processors.append(Process(
|
alarm_processors.append(multiprocessing.Process(
|
||||||
target=AlarmProcessor(
|
target=AlarmProcessor(
|
||||||
alarms,
|
alarms,
|
||||||
notifications,
|
notifications,
|
||||||
|
@ -95,8 +94,8 @@ def main(argv=None):
|
||||||
|
|
||||||
#NotificationProcessors
|
#NotificationProcessors
|
||||||
notification_processors = []
|
notification_processors = []
|
||||||
for i in xrange(config['processors']['notification']['number']):
|
for i in range(config['processors']['notification']['number']):
|
||||||
notification_processors.append(Process(
|
notification_processors.append(multiprocessing.Process(
|
||||||
target=NotificationProcessor(
|
target=NotificationProcessor(
|
||||||
notifications,
|
notifications,
|
||||||
sent_notifications,
|
sent_notifications,
|
||||||
|
@ -107,7 +106,7 @@ def main(argv=None):
|
||||||
processors.extend(notification_processors)
|
processors.extend(notification_processors)
|
||||||
|
|
||||||
#SentNotificationProcessor
|
#SentNotificationProcessor
|
||||||
sent_notification_processor = Process(
|
sent_notification_processor = multiprocessing.Process(
|
||||||
target=SentNotificationProcessor(
|
target=SentNotificationProcessor(
|
||||||
sent_notifications,
|
sent_notifications,
|
||||||
finished,
|
finished,
|
||||||
|
@ -125,7 +124,7 @@ def main(argv=None):
|
||||||
for process in processors:
|
for process in processors:
|
||||||
process.start()
|
process.start()
|
||||||
tracker.run() # Runs in the main process
|
tracker.run() # Runs in the main process
|
||||||
except:
|
except Exception:
|
||||||
log.exception('Error exiting!')
|
log.exception('Error exiting!')
|
||||||
for process in processors:
|
for process in processors:
|
||||||
process.terminate()
|
process.terminate()
|
||||||
|
|
|
@ -2,7 +2,7 @@ import json
|
||||||
|
|
||||||
|
|
||||||
class Notification(object):
|
class Notification(object):
|
||||||
""" An abstract base class used to define the notification interface and common functions
|
"""An abstract base class used to define the notification interface and common functions
|
||||||
"""
|
"""
|
||||||
__slots__ = (
|
__slots__ = (
|
||||||
'address',
|
'address',
|
||||||
|
@ -20,13 +20,13 @@ class Notification(object):
|
||||||
)
|
)
|
||||||
|
|
||||||
def __init__(self, ntype, src_partition, src_offset, name, address, alarm):
|
def __init__(self, ntype, src_partition, src_offset, name, address, alarm):
|
||||||
""" Setup the notification object
|
"""Setup the notification object
|
||||||
The src_partition and src_offset allow the notification to be linked to the alarm that it came from
|
The src_partition and src_offset allow the notification to be linked to the alarm that it came from
|
||||||
ntype - The notification type
|
ntype - The notification type
|
||||||
name - Name used in sending
|
name - Name used in sending
|
||||||
address - to send the notification to
|
address - to send the notification to
|
||||||
alarm_data - info that caused the notification
|
alarm_data - info that caused the notification
|
||||||
notifications that come after this one to remain uncommitted.
|
notifications that come after this one to remain uncommitted.
|
||||||
"""
|
"""
|
||||||
self.address = address
|
self.address = address
|
||||||
self.name = name
|
self.name = name
|
||||||
|
@ -44,7 +44,7 @@ class Notification(object):
|
||||||
self.notification_timestamp = None # to be updated on actual notification send time
|
self.notification_timestamp = None # to be updated on actual notification send time
|
||||||
|
|
||||||
def to_json(self):
|
def to_json(self):
|
||||||
""" Return json representation
|
"""Return json representation
|
||||||
"""
|
"""
|
||||||
notification_fields = [
|
notification_fields = [
|
||||||
'address',
|
'address',
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
|
|
||||||
class NotificationException(Exception):
|
class NotificationException(Exception):
|
||||||
""" Base exception class """
|
"""Base exception class.
|
||||||
|
"""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -6,7 +6,7 @@ log = logging.getLogger(__name__)
|
||||||
class BaseProcessor(object):
|
class BaseProcessor(object):
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _add_to_queue(queue, queue_name, msg):
|
def _add_to_queue(queue, queue_name, msg):
|
||||||
""" Warns on full queue then does a blocking push to the queue.
|
"""Warns on full queue then does a blocking push to the queue.
|
||||||
"""
|
"""
|
||||||
if queue.full():
|
if queue.full():
|
||||||
log.warn('Queue %s is full, publishing is blocked' % queue_name)
|
log.warn('Queue %s is full, publishing is blocked' % queue_name)
|
||||||
|
|
|
@ -2,9 +2,9 @@ import json
|
||||||
import logging
|
import logging
|
||||||
import MySQLdb
|
import MySQLdb
|
||||||
|
|
||||||
from . import BaseProcessor
|
from mon_notification.processors import BaseProcessor
|
||||||
from ..notification_exceptions import AlarmFormatError
|
from mon_notification.notification import Notification
|
||||||
from ..notification import Notification
|
from mon_notification.notification_exceptions import AlarmFormatError
|
||||||
|
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
@ -20,7 +20,7 @@ class AlarmProcessor(BaseProcessor):
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _parse_alarm(alarm_data):
|
def _parse_alarm(alarm_data):
|
||||||
""" Parse the alarm message making sure it matches the expected format.
|
"""Parse the alarm message making sure it matches the expected format.
|
||||||
"""
|
"""
|
||||||
expected_fields = [
|
expected_fields = [
|
||||||
'alarmId',
|
'alarmId',
|
||||||
|
@ -43,8 +43,8 @@ class AlarmProcessor(BaseProcessor):
|
||||||
return alarm
|
return alarm
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
""" Check the notification setting for this project in mysql then create the appropriate notification or
|
"""Check the notification setting for this project in mysql then create the appropriate notification or
|
||||||
add to the finished_queue
|
add to the finished_queue
|
||||||
"""
|
"""
|
||||||
cur = self.mysql.cursor()
|
cur = self.mysql.cursor()
|
||||||
while True:
|
while True:
|
||||||
|
@ -53,7 +53,7 @@ class AlarmProcessor(BaseProcessor):
|
||||||
offset = raw_alarm[1].offset
|
offset = raw_alarm[1].offset
|
||||||
try:
|
try:
|
||||||
alarm = self._parse_alarm(raw_alarm[1].message.value)
|
alarm = self._parse_alarm(raw_alarm[1].message.value)
|
||||||
except Exception, e: # This is general because of a lack of json exception base class
|
except Exception as e: # This is general because of a lack of json exception base class
|
||||||
log.error("Invalid Alarm format skipping partition %d, offset %d\nErrror%s" % (partition, offset, e))
|
log.error("Invalid Alarm format skipping partition %d, offset %d\nErrror%s" % (partition, offset, e))
|
||||||
self._add_to_queue(self.finished_queue, 'finished', (partition, offset))
|
self._add_to_queue(self.finished_queue, 'finished', (partition, offset))
|
||||||
continue
|
continue
|
||||||
|
@ -81,4 +81,3 @@ class AlarmProcessor(BaseProcessor):
|
||||||
self._add_to_queue(self.finished_queue, 'finished', (partition, offset))
|
self._add_to_queue(self.finished_queue, 'finished', (partition, offset))
|
||||||
else:
|
else:
|
||||||
self._add_to_queue(self.notification_queue, 'notifications', notifications)
|
self._add_to_queue(self.notification_queue, 'notifications', notifications)
|
||||||
|
|
||||||
|
|
|
@ -1,30 +1,30 @@
|
||||||
|
import kafka.client
|
||||||
|
import kafka.consumer
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
from . import BaseProcessor
|
from mon_notification.processors import BaseProcessor
|
||||||
from kafka.client import KafkaClient
|
|
||||||
from kafka.consumer import SimpleConsumer
|
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class KafkaConsumer(BaseProcessor):
|
class KafkaConsumer(BaseProcessor):
|
||||||
""" Pull from the alarm topic and place alarm objects on the sent_queue.
|
"""Pull from the alarm topic and place alarm objects on the sent_queue.
|
||||||
No commit is being done until processing is finished and as the processing can take some time it is done in
|
No commit is being done until processing is finished and as the processing can take some time it is done in
|
||||||
another step.
|
another step.
|
||||||
|
|
||||||
Unfortunately at this point the python-kafka client does not handle multiple consumers seamlessly.
|
Unfortunately at this point the python-kafka client does not handle multiple consumers seamlessly.
|
||||||
For more information see, https://github.com/mumrah/kafka-python/issues/112
|
For more information see, https://github.com/mumrah/kafka-python/issues/112
|
||||||
"""
|
"""
|
||||||
def __init__(self, queue, kafka_url, group, topic, initial_offsets=None):
|
def __init__(self, queue, kafka_url, group, topic, initial_offsets=None):
|
||||||
"""
|
"""Init
|
||||||
kafka_url, group, topic - kafka connection details
|
kafka_url, group, topic - kafka connection details
|
||||||
sent_queue - a sent_queue to publish log entries to
|
sent_queue - a sent_queue to publish log entries to
|
||||||
"""
|
"""
|
||||||
self.queue = queue
|
self.queue = queue
|
||||||
|
|
||||||
self.kafka = KafkaClient(kafka_url)
|
self.kafka = kafka.client.KafkaClient(kafka_url)
|
||||||
# No autocommit, it does not work with kafka 0.8.0 - see https://github.com/mumrah/kafka-python/issues/118
|
# No autocommit, it does not work with kafka 0.8.0 - see https://github.com/mumrah/kafka-python/issues/118
|
||||||
self.consumer = SimpleConsumer(self.kafka, group, topic, auto_commit=False)
|
self.consumer = kafka.consumer.SimpleConsumer(self.kafka, group, topic, auto_commit=False)
|
||||||
self.consumer.provide_partition_info() # Without this the partition is not provided in the response
|
self.consumer.provide_partition_info() # Without this the partition is not provided in the response
|
||||||
if initial_offsets is not None:
|
if initial_offsets is not None:
|
||||||
# Set initial offsets directly in the consumer, there is no method for this so I have to do it here
|
# Set initial offsets directly in the consumer, there is no method for this so I have to do it here
|
||||||
|
@ -34,7 +34,7 @@ class KafkaConsumer(BaseProcessor):
|
||||||
self.consumer.fetch_offsets[partition] = initial_offsets[partition] + 1
|
self.consumer.fetch_offsets[partition] = initial_offsets[partition] + 1
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
""" Consume from kafka and place alarm objects on the sent_queue
|
"""Consume from kafka and place alarm objects on the sent_queue
|
||||||
"""
|
"""
|
||||||
for message in self.consumer:
|
for message in self.consumer:
|
||||||
log.debug("Consuming message from kafka, partition %d, offset %d" % (message[0], message[1].offset))
|
log.debug("Consuming message from kafka, partition %d, offset %d" % (message[0], message[1].offset))
|
||||||
|
|
|
@ -1,9 +1,9 @@
|
||||||
from email.mime.text import MIMEText
|
import email.mime.text
|
||||||
import logging
|
import logging
|
||||||
import smtplib
|
import smtplib
|
||||||
import time
|
import time
|
||||||
|
|
||||||
from . import BaseProcessor
|
from mon_notification.processors import BaseProcessor
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -23,36 +23,37 @@ class NotificationProcessor(BaseProcessor):
|
||||||
self.notification_types = {'email': self._send_email}
|
self.notification_types = {'email': self._send_email}
|
||||||
|
|
||||||
def _send_email(self, notification):
|
def _send_email(self, notification):
|
||||||
""" Send the notification via email
|
"""Send the notification via email
|
||||||
Returns the notification upon success, None upon failure
|
Returns the notification upon success, None upon failure
|
||||||
"""
|
"""
|
||||||
|
msg = email.mime.text.MIMEText("%s\nAlarm %s transitioned to the %s state at %s\nFull Data:\n%s"
|
||||||
|
% (notification.message,
|
||||||
|
notification.alarm_name,
|
||||||
|
notification.state,
|
||||||
|
notification.alarm_timestamp,
|
||||||
|
notification.to_json()))
|
||||||
|
msg['Subject'] = '%s: %s' % (notification.state, notification.alarm_name)
|
||||||
|
msg['From'] = self.email_config['from_addr']
|
||||||
|
msg['To'] = notification.address
|
||||||
|
|
||||||
try:
|
try:
|
||||||
msg = MIMEText("%s\nAlarm %s transitioned to the %s state at %s\nFull Data:\n%s"
|
|
||||||
% (notification.message,
|
|
||||||
notification.alarm_name,
|
|
||||||
notification.state,
|
|
||||||
notification.alarm_timestamp,
|
|
||||||
notification.to_json()))
|
|
||||||
msg['Subject'] = '%s: %s' % (notification.state, notification.alarm_name)
|
|
||||||
msg['From'] = self.email_config['from_addr']
|
|
||||||
msg['To'] = notification.address
|
|
||||||
self.smtp.sendmail(self.email_config['from_addr'], notification.address, msg.as_string())
|
self.smtp.sendmail(self.email_config['from_addr'], notification.address, msg.as_string())
|
||||||
log.debug('Sent email to %s, notification %s' % (self.email_config['from_addr'], notification.to_json()))
|
log.debug('Sent email to %s, notification %s' % (notification.address, notification.to_json()))
|
||||||
except smtplib.SMTPServerDisconnected, e:
|
except smtplib.SMTPServerDisconnected as e:
|
||||||
log.debug('SMTP server disconnected. Will reconnect and retry message.')
|
log.debug('SMTP server disconnected. Will reconnect and retry message.')
|
||||||
self._smtp_connect()
|
self._smtp_connect()
|
||||||
try:
|
try:
|
||||||
self.smtp.sendmail(self.email_config['from_addr'], notification.address, msg.as_string())
|
self.smtp.sendmail(self.email_config['from_addr'], notification.address, msg.as_string())
|
||||||
log.debug('Sent email to %s, notification %s' % (self.email_config['from_addr'], notification.to_json()))
|
log.debug('Sent email to %s, notification %s' % (notification.address, notification.to_json()))
|
||||||
except smtplib.SMTPException, e:
|
except smtplib.SMTPException as e:
|
||||||
log.error("Error sending Email Notification:%s\nError:%s" % (notification.to_json(), e))
|
log.error("Error sending Email Notification:%s\nError:%s" % (notification.to_json(), e))
|
||||||
except smtplib.SMTPException, e:
|
except smtplib.SMTPException as e:
|
||||||
log.error("Error sending Email Notification:%s\nError:%s" % (notification.to_json(), e))
|
log.error("Error sending Email Notification:%s\nError:%s" % (notification.to_json(), e))
|
||||||
else:
|
else:
|
||||||
return notification
|
return notification
|
||||||
|
|
||||||
def _smtp_connect(self):
|
def _smtp_connect(self):
|
||||||
""" Connect to the smtp server
|
"""Connect to the smtp server
|
||||||
"""
|
"""
|
||||||
log.info('Connecting to Email Server %s' % self.email_config['server'])
|
log.info('Connecting to Email Server %s' % self.email_config['server'])
|
||||||
smtp = smtplib.SMTP(
|
smtp = smtplib.SMTP(
|
||||||
|
@ -64,9 +65,9 @@ class NotificationProcessor(BaseProcessor):
|
||||||
self.smtp = smtp
|
self.smtp = smtp
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
""" Send the notifications
|
"""Send the notifications
|
||||||
For each notification in a message it is sent according to its type.
|
For each notification in a message it is sent according to its type.
|
||||||
If all notifications fail the alarm partition/offset are added to the the finished queue
|
If all notifications fail the alarm partition/offset are added to the the finished queue
|
||||||
"""
|
"""
|
||||||
while True:
|
while True:
|
||||||
notifications = self.notification_queue.get()
|
notifications = self.notification_queue.get()
|
||||||
|
|
|
@ -1,40 +1,40 @@
|
||||||
|
import kafka.client
|
||||||
|
import kafka.producer
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
from . import BaseProcessor
|
from mon_notification.processors import BaseProcessor
|
||||||
from kafka.client import KafkaClient
|
|
||||||
from kafka.producer import SimpleProducer
|
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class SentNotificationProcessor(BaseProcessor):
|
class SentNotificationProcessor(BaseProcessor):
|
||||||
""" Processes notifications which have been sent
|
"""Processes notifications which have been sent
|
||||||
This involves adding them into a kafka topic for persisting by another process and adding the alarm
|
This involves adding them into a kafka topic for persisting by another process and adding the alarm
|
||||||
to the finished queue.
|
to the finished queue.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, sent_queue, finished_queue, url, topic):
|
def __init__(self, sent_queue, finished_queue, url, topic):
|
||||||
"""
|
"""Init
|
||||||
url, group - kafka connection details
|
url, group - kafka connection details
|
||||||
topic - kafka topic to publish notifications to
|
topic - kafka topic to publish notifications to
|
||||||
finished_queue - queue written to when notifications are fully finished.
|
finished_queue - queue written to when notifications are fully finished.
|
||||||
sent_queue - the sent_notifications queue notifications are read from
|
sent_queue - the sent_notifications queue notifications are read from
|
||||||
"""
|
"""
|
||||||
self.topic = topic
|
self.topic = topic
|
||||||
self.finished_queue = finished_queue
|
self.finished_queue = finished_queue
|
||||||
self.sent_queue = sent_queue
|
self.sent_queue = sent_queue
|
||||||
|
|
||||||
self.kafka = KafkaClient(url)
|
self.kafka = kafka.client.KafkaClient(url)
|
||||||
self.producer = SimpleProducer(
|
self.producer = kafka.producer.SimpleProducer(
|
||||||
self.kafka,
|
self.kafka,
|
||||||
async=False,
|
async=False,
|
||||||
req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
|
req_acks=kafka.producer.SimpleProducer.ACK_AFTER_LOCAL_WRITE,
|
||||||
ack_timeout=2000
|
ack_timeout=2000
|
||||||
)
|
)
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
""" Takes messages from the sent_queue, puts them on the kafka notification topic and then adds
|
"""Takes messages from the sent_queue, puts them on the kafka notification topic and then adds
|
||||||
partition/offset to the finished queue
|
partition/offset to the finished queue
|
||||||
"""
|
"""
|
||||||
while True:
|
while True:
|
||||||
notifications = self.sent_queue.get()
|
notifications = self.sent_queue.get()
|
||||||
|
|
|
@ -1,32 +1,31 @@
|
||||||
from collections import defaultdict
|
import collections
|
||||||
|
import kazoo.client
|
||||||
|
import kazoo.exceptions
|
||||||
import logging
|
import logging
|
||||||
import time
|
import time
|
||||||
|
|
||||||
from kazoo.client import KazooClient
|
from mon_notification import notification_exceptions
|
||||||
from kazoo.exceptions import KazooException
|
|
||||||
|
|
||||||
from notification_exceptions import NotificationException
|
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class ZookeeperStateTracker(object):
|
class ZookeeperStateTracker(object):
|
||||||
""" Tracks message offsets for a kafka topic and partitions.
|
"""Tracks message offsets for a kafka topic and partitions.
|
||||||
Uses zookeeper to keep track of the last committed offset.
|
Uses zookeeper to keep track of the last committed offset.
|
||||||
As messages are finished with processing the committed offset is updated periodically.
|
As messages are finished with processing the committed offset is updated periodically.
|
||||||
The messages are not necessarily finished in order, but the committed offset includes
|
The messages are not necessarily finished in order, but the committed offset includes
|
||||||
all previous messages so this object tracks any gaps updating as needed.
|
all previous messages so this object tracks any gaps updating as needed.
|
||||||
"""
|
"""
|
||||||
def __init__(self, url, topic, finished_queue):
|
def __init__(self, url, topic, finished_queue):
|
||||||
""" Setup the finished_queue
|
"""Setup the finished_queue
|
||||||
url is the zookeeper hostname:port
|
url is the zookeeper hostname:port
|
||||||
topic is the kafka topic to track
|
topic is the kafka topic to track
|
||||||
"""
|
"""
|
||||||
self.finished_queue = finished_queue
|
self.finished_queue = finished_queue
|
||||||
self.topic = topic
|
self.topic = topic
|
||||||
self.has_lock = False
|
self.has_lock = False
|
||||||
|
|
||||||
self.zookeeper = KazooClient(url)
|
self.zookeeper = kazoo.client.KazooClient(url)
|
||||||
self.zookeeper.start()
|
self.zookeeper.start()
|
||||||
self.topic_path = '/consumers/mon-notification/%s' % topic
|
self.topic_path = '/consumers/mon-notification/%s' % topic
|
||||||
|
|
||||||
|
@ -36,11 +35,11 @@ class ZookeeperStateTracker(object):
|
||||||
self._offsets = None
|
self._offsets = None
|
||||||
# This is a dictionary of sets used for tracking finished offsets when there is a gap and the committed offset
|
# This is a dictionary of sets used for tracking finished offsets when there is a gap and the committed offset
|
||||||
# can not yet be advanced
|
# can not yet be advanced
|
||||||
self._uncommitted_offsets = defaultdict(set)
|
self._uncommitted_offsets = collections.defaultdict(set)
|
||||||
|
|
||||||
def _get_offsets(self):
|
def _get_offsets(self):
|
||||||
""" Read the initial offsets from zookeeper or set defaults
|
"""Read the initial offsets from zookeeper or set defaults
|
||||||
The return is a dictionary with key name being partition # and value the offset
|
The return is a dictionary with key name being partition # and value the offset
|
||||||
"""
|
"""
|
||||||
if not self.has_lock:
|
if not self.has_lock:
|
||||||
log.warn('Reading offsets before the tracker has the lock, they could change')
|
log.warn('Reading offsets before the tracker has the lock, they could change')
|
||||||
|
@ -54,11 +53,11 @@ class ZookeeperStateTracker(object):
|
||||||
else:
|
else:
|
||||||
self.zookeeper.ensure_path(self.topic_path)
|
self.zookeeper.ensure_path(self.topic_path)
|
||||||
return {}
|
return {}
|
||||||
except KazooException:
|
except kazoo.exceptions.KazooException:
|
||||||
log.exception('Error retrieving the committed offset in zookeeper')
|
log.exception('Error retrieving the committed offset in zookeeper')
|
||||||
|
|
||||||
def _update_zk_offsets(self):
|
def _update_zk_offsets(self):
|
||||||
""" Update zookeepers stored offset numbers to the values in self.offsets
|
"""Update zookeepers stored offset numbers to the values in self.offsets
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
for partition, value in self._offsets.iteritems():
|
for partition, value in self._offsets.iteritems():
|
||||||
|
@ -66,12 +65,12 @@ class ZookeeperStateTracker(object):
|
||||||
self.zookeeper.ensure_path(partition_path)
|
self.zookeeper.ensure_path(partition_path)
|
||||||
self.zookeeper.set(partition_path, str(value))
|
self.zookeeper.set(partition_path, str(value))
|
||||||
log.debug('Updated committed offsets at path %s, offsets %s' % (self.topic_path, self._offsets))
|
log.debug('Updated committed offsets at path %s, offsets %s' % (self.topic_path, self._offsets))
|
||||||
except KazooException:
|
except kazoo.exceptions.KazooException:
|
||||||
log.exception('Error updating the committed offset in zookeeper')
|
log.exception('Error updating the committed offset in zookeeper')
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def offsets(self):
|
def offsets(self):
|
||||||
""" Generally only initialize the offsets after the lock has been pulled
|
"""Generally only initialize the offsets after the lock has been pulled
|
||||||
"""
|
"""
|
||||||
if self._offsets is None:
|
if self._offsets is None:
|
||||||
self._offsets = self._get_offsets()
|
self._offsets = self._get_offsets()
|
||||||
|
@ -79,8 +78,8 @@ class ZookeeperStateTracker(object):
|
||||||
return self._offsets
|
return self._offsets
|
||||||
|
|
||||||
def get_offsets(self, partitions=None):
|
def get_offsets(self, partitions=None):
|
||||||
""" Return the offsets for specified partitions or all if partitions is None
|
"""Return the offsets for specified partitions or all if partitions is None
|
||||||
The return is a dictionary with key name being partition # and value the offset
|
The return is a dictionary with key name being partition # and value the offset
|
||||||
"""
|
"""
|
||||||
if partitions is None:
|
if partitions is None:
|
||||||
return self.offsets
|
return self.offsets
|
||||||
|
@ -88,7 +87,7 @@ class ZookeeperStateTracker(object):
|
||||||
return {k: self.offsets[k] for k in partitions}
|
return {k: self.offsets[k] for k in partitions}
|
||||||
|
|
||||||
def lock(self, exit_method):
|
def lock(self, exit_method):
|
||||||
""" Grab a lock within zookeeper, if not available retry.
|
"""Grab a lock within zookeeper, if not available retry.
|
||||||
"""
|
"""
|
||||||
while True:
|
while True:
|
||||||
# The path is ephemeral so if it exists wait then cycle again
|
# The path is ephemeral so if it exists wait then cycle again
|
||||||
|
@ -99,7 +98,7 @@ class ZookeeperStateTracker(object):
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.zookeeper.create(self.lock_path, ephemeral=True, makepath=True)
|
self.zookeeper.create(self.lock_path, ephemeral=True, makepath=True)
|
||||||
except KazooException, e:
|
except kazoo.exceptions.KazooException as e:
|
||||||
# If creating the path fails something beat us to it most likely, try again
|
# If creating the path fails something beat us to it most likely, try again
|
||||||
log.warn('Error creating lock path %s\n%s' % (self.lock_path, e))
|
log.warn('Error creating lock path %s\n%s' % (self.lock_path, e))
|
||||||
continue
|
continue
|
||||||
|
@ -114,12 +113,12 @@ class ZookeeperStateTracker(object):
|
||||||
self.zookeeper.add_listener(exit_method)
|
self.zookeeper.add_listener(exit_method)
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
""" Mark a message as finished and where possible commit the new offset to zookeeper.
|
"""Mark a message as finished and where possible commit the new offset to zookeeper.
|
||||||
There is no mechanism here to deal with the situation where a single alarm is extremely slow to finish
|
There is no mechanism here to deal with the situation where a single alarm is extremely slow to finish
|
||||||
holding up all others behind it. It is assumed the notification job will time out allowing things to finish.
|
holding up all others behind it. It is assumed the notification will time out allowing things to finish.
|
||||||
"""
|
"""
|
||||||
if not self.has_lock:
|
if not self.has_lock:
|
||||||
raise NotificationException('Attempt to begin run without Zookeeper Lock')
|
raise notification_exceptions.NotificationException('Attempt to begin run without Zookeeper Lock')
|
||||||
|
|
||||||
if self._offsets is None: # Verify the offsets have been initialized
|
if self._offsets is None: # Verify the offsets have been initialized
|
||||||
self._offsets = self._get_offsets()
|
self._offsets = self._get_offsets()
|
||||||
|
@ -138,7 +137,7 @@ class ZookeeperStateTracker(object):
|
||||||
elif self._offsets[partition] == offset - 1:
|
elif self._offsets[partition] == offset - 1:
|
||||||
|
|
||||||
new_offset = offset
|
new_offset = offset
|
||||||
for x in xrange(offset + 1, offset + 1 + len(self._uncommitted_offsets[partition])):
|
for x in range(offset + 1, offset + 1 + len(self._uncommitted_offsets[partition])):
|
||||||
if x in self._uncommitted_offsets[partition]:
|
if x in self._uncommitted_offsets[partition]:
|
||||||
new_offset = x
|
new_offset = x
|
||||||
self._uncommitted_offsets[partition].remove(x)
|
self._uncommitted_offsets[partition].remove(x)
|
||||||
|
|
|
@ -0,0 +1,5 @@
|
||||||
|
[pep8]
|
||||||
|
max-line-length = 120
|
||||||
|
|
||||||
|
[flake8]
|
||||||
|
max-line-length = 120
|
6
setup.py
6
setup.py
|
@ -1,9 +1,9 @@
|
||||||
from setuptools import setup, find_packages
|
import setuptools
|
||||||
|
|
||||||
setup(
|
setuptools.setup(
|
||||||
name="mon-notification",
|
name="mon-notification",
|
||||||
version="0.1",
|
version="0.1",
|
||||||
packages=find_packages(exclude=['tests']),
|
packages=setuptools.find_packages(exclude=['tests']),
|
||||||
entry_points={
|
entry_points={
|
||||||
'console_scripts': [
|
'console_scripts': [
|
||||||
'mon-notification = mon_notification.main:main'
|
'mon-notification = mon_notification.main:main'
|
||||||
|
|
Loading…
Reference in New Issue