From 4fc9b6a3c94f1ea399410e6dacf0874e3266392a Mon Sep 17 00:00:00 2001
From: Tim Kuhlman
Date: Thu, 27 Feb 2014 16:55:07 -0700
Subject: [PATCH] Initial structure

---
 .gitignore                      |  1 +
 README.md                       | 35 +++++++++++++
 alarm.py                        |  3 ++
 debian/changelog                |  6 +++
 debian/compat                   |  1 +
 debian/control                  | 16 ++++++
 debian/copyright                |  4 ++
 debian/rules                    |  4 ++
 dependencies.txt                |  4 ++
 kafka_consumer.py               |  6 +++
 main.py                         | 91 +++++++++++++++++++++++++++++++++
 notification.yaml               | 12 +++++
 notifications/__init__.py       |  5 ++
 notifications/email.py          |  6 +++
 processors/__init__.py          |  0
 processors/alarm.py             |  3 ++
 processors/notification.py      |  6 +++
 processors/sent_notification.py |  4 ++
 setup.py                        | 13 +++++
 19 files changed, 220 insertions(+)
 create mode 100644 .gitignore
 create mode 100644 README.md
 create mode 100644 alarm.py
 create mode 100644 debian/changelog
 create mode 100644 debian/compat
 create mode 100644 debian/control
 create mode 100644 debian/copyright
 create mode 100755 debian/rules
 create mode 100644 dependencies.txt
 create mode 100644 kafka_consumer.py
 create mode 100644 main.py
 create mode 100644 notification.yaml
 create mode 100644 notifications/__init__.py
 create mode 100644 notifications/email.py
 create mode 100644 processors/__init__.py
 create mode 100644 processors/alarm.py
 create mode 100644 processors/notification.py
 create mode 100644 processors/sent_notification.py
 create mode 100644 setup.py

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..485dee6
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+.idea
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..9b49479
--- /dev/null
+++ b/README.md
@@ -0,0 +1,35 @@
+# Notification Engine
+
+This engine reads alarms from Kafka and then notifies the customer using their configured notification method.
+
+# Architecture
+There are four processing steps separated by queues implemented with python multiprocessing. The steps are:
+1. Read alarms from Kafka. - KafkaConsumer class
+2. Determine the notification type for an alarm by reading from mysql. - AlarmProcessor class
+3. Send the notification. - NotificationProcessor class
+4. Update Vertica and Kafka to record that the notifications were sent. - SentNotificationProcessor class
+
+There are three internal queues:
+1. alarms - alarms read from Kafka are added to this queue. Consists of Alarm objects.
+2. notifications - notifications to be sent are added to this queue. Consists of Notification objects.
+3. sent_notifications - notifications that have been sent are added here. Consists of Notification objects.
+
+Notification classes inherit from the Notification abstract class and implement their specific notification method.
+
+## High Availability
+HA is handled by utilizing multiple partitions within Kafka. When multiple notification engines are running the partitions
+are spread out among them; as engines die or restart, the partitions are reshuffled.
+
+The final step, writing back to Vertica that an alarm was sent and then updating the Kafka pointer, could fail to run during a catastrophic failure.
+This would result in duplicate notifications, which is an acceptable failure mode: it is better to send a notification twice than not at all.
+
+It is assumed the notification engine will be run by a process supervisor which will restart it in case of a failure.
+
+# Operation
+The yaml config file defaults to '/etc/mon/notification.yaml' and the process runs via an upstart script.
+
+# Future Considerations
+- How fast is the mysql db and how much load do we put on it? Initially I think it makes most sense to read notification
+  details for each alarm, but eventually I may want to cache that info.
+- I am starting with a single KafkaConsumer and a single SentNotificationProcessor; depending on load this may need
+  to scale.
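
The Architecture section above describes stages connected by multiprocessing queues, but the processor classes in this patch are still stubs. As a rough illustration of that pattern (not part of the patch; the `build_notifications` helper is a placeholder for the eventual mysql lookup and the queue sizes simply mirror notification.yaml), one middle stage could look like this:

```python
from multiprocessing import Process, Queue


def build_notifications(alarm):
    """Placeholder for the mysql lookup the AlarmProcessor will eventually do."""
    return [('email', alarm)]


def alarm_stage(alarms, notifications):
    """One pipeline stage: take items off the input queue, process them,
    and push the results onto the output queue."""
    while True:
        alarm = alarms.get()  # blocks until the upstream stage enqueues something
        for notification in build_notifications(alarm):
            notifications.put(notification)


if __name__ == '__main__':
    alarms = Queue(1024)       # sizes as in notification.yaml
    notifications = Queue(1024)
    stage = Process(target=alarm_stage, args=(alarms, notifications))
    stage.daemon = True        # let this demo script exit cleanly
    stage.start()
    alarms.put('test-alarm')
    print(notifications.get())  # ('email', 'test-alarm')
```
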
diff --git a/alarm.py b/alarm.py
new file mode 100644
index 0000000..b810287
--- /dev/null
+++ b/alarm.py
@@ -0,0 +1,3 @@
+
+class Alarm(object):
+    pass
\ No newline at end of file
diff --git a/debian/changelog b/debian/changelog
new file mode 100644
index 0000000..d13055d
--- /dev/null
+++ b/debian/changelog
@@ -0,0 +1,6 @@
+mon-notification (0.0.1) precise; urgency=low
+
+  * Initial Package creation
+
+ -- Tim Kuhlman  Thu, 27 Feb 2014 04:12:44 -0600
+
diff --git a/debian/compat b/debian/compat
new file mode 100644
index 0000000..7f8f011
--- /dev/null
+++ b/debian/compat
@@ -0,0 +1 @@
+7
diff --git a/debian/control b/debian/control
new file mode 100644
index 0000000..7e335c1
--- /dev/null
+++ b/debian/control
@@ -0,0 +1,16 @@
+Source: mon-notification
+Section: python
+Priority: optional
+Maintainer: HPCloud Monitoring
+Build-Depends: debhelper (>= 7),
+               python (>= 2.6.6-3~),
+               python-setuptools
+Standards-Version: 3.9.3
+X-Python-Version: >= 2.6
+
+Package: mon-notification
+Architecture: all
+Section: python
+Depends: ${misc:Depends}, ${python:Depends}, python-pyodbc, libpython2.7, python-pkg-resources, kafka-python
+Description: Notification engine for monitoring.
+ Consumes alarms from Kafka and sends notifications appropriately.
diff --git a/debian/copyright b/debian/copyright
new file mode 100644
index 0000000..49d743b
--- /dev/null
+++ b/debian/copyright
@@ -0,0 +1,4 @@
+Format: http://www.debian.org/doc/packaging-manuals/copyright-format/1.0/
+Files: *
+Copyright: 2014, HP
+License: Proprietary
diff --git a/debian/rules b/debian/rules
new file mode 100755
index 0000000..4647c9c
--- /dev/null
+++ b/debian/rules
@@ -0,0 +1,4 @@
+#!/usr/bin/make -f
+
+%:
+	dh $@ --with python2
diff --git a/dependencies.txt b/dependencies.txt
new file mode 100644
index 0000000..7526c5b
--- /dev/null
+++ b/dependencies.txt
@@ -0,0 +1,4 @@
+kafka-python
+pyodbc
+pkg-resources
+yaml
diff --git a/kafka_consumer.py b/kafka_consumer.py
new file mode 100644
index 0000000..67447ed
--- /dev/null
+++ b/kafka_consumer.py
@@ -0,0 +1,6 @@
+
+class KafkaConsumer(object):
+    pass
+
+# Todo I need to intelligently handle partitions so that multiple notification engines can run
+# I need to make sure to not advance the marker in kafka so that is only done by the SentNotificationProcessor
\ No newline at end of file
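
The KafkaConsumer stub above carries two todos: spreading partitions across engines and not advancing the Kafka marker here. A minimal sketch of that behaviour, assuming the current kafka-python `KafkaConsumer` API (the 2014-era library shipped a different `SimpleConsumer` interface) and illustrative broker, group, and topic names:

```python
# Modern kafka-python API; the library version packaged with this patch differed.
from kafka import KafkaConsumer as KafkaClient


class KafkaConsumer(object):
    """Read alarms from Kafka and hand them to the internal alarms queue.

    Auto-commit is disabled so the offset ("marker") is never advanced here;
    per the todo, only the SentNotificationProcessor should do that. Running
    several engines with the same group_id spreads the topic's partitions
    across them.
    """

    def __init__(self, alarm_queue, url='localhost:9092', group='mon-notification', topic='alarms'):
        self.alarm_queue = alarm_queue
        self.consumer = KafkaClient(topic,
                                    bootstrap_servers=[url],
                                    group_id=group,
                                    enable_auto_commit=False)

    def run(self):
        for message in self.consumer:
            # message.value is the raw alarm body; turning it into an Alarm object comes later
            self.alarm_queue.put(message.value)
```
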
diff --git a/main.py b/main.py
new file mode 100644
index 0000000..40af2eb
--- /dev/null
+++ b/main.py
@@ -0,0 +1,91 @@
+#!/usr/bin/env python
+#
+""" Notification Engine
+    This engine reads alarms from Kafka and then notifies the customer using their configured notification method.
+"""
+
+import logging
+import yaml
+from multiprocessing import Process, Queue
+import os
+import signal
+import sys
+
+from kafka_consumer import KafkaConsumer
+from processors.alarm import AlarmProcessor
+from processors.notification import NotificationProcessor
+from processors.sent_notification import SentNotificationProcessor
+
+log = logging.getLogger(__name__)
+processors = []  # global list to facilitate clean signal handling
+
+
+def clean_exit(signum, frame):
+    """ Exit cleanly on defined signals
+    """
+    # todo - Figure out good exiting. For most situations, make sure it all shuts down nicely, finishing up anything in the queue
+    for process in processors:
+        process.terminate()
+
+
+def main(argv=None):
+    if argv is None:
+        argv = sys.argv
+    if len(argv) == 2:
+        config_file = argv[1]
+    elif len(argv) > 2:
+        print "Usage: " + argv[0] + " <config_file>"
+        print "Config file defaults to /etc/mon/notification.yaml"
+        return 1
+    else:
+        config_file = '/etc/mon/notification.yaml'
+
+    config = yaml.load(open(config_file, 'r'))
+
+    # Setup logging
+    log_path = os.path.join(config['log_dir'], 'notification.log')
+    logging.basicConfig(format='%(asctime)s %(message)s', filename=log_path, level=logging.INFO)
+
+    # Create the queues
+    alarms = Queue(config['queues']['alarms_size'])
+    notifications = Queue(config['queues']['notifications_size'])
+    sent_notifications = Queue(config['queues']['sent_notifications_size'])
+
+    ## Define processes
+    # Start KafkaConsumer
+    kafka = Process(target=KafkaConsumer(config, alarms).run)  # todo don't pass the config object just the bits needed
+    processors.append(kafka)
+
+    # Define AlarmProcessors
+    alarm_processors = []
+    for i in xrange(config['processors']['alarm']['number']):
+        alarm_processors.append(Process(target=AlarmProcessor(config, alarms, notifications).run))  # todo don't pass the config object just the bits needed
+    processors.extend(alarm_processors)
+
+    # Define NotificationProcessors
+    notification_processors = []
+    for i in xrange(config['processors']['notification']['number']):
+        notification_processors.append(Process(target=NotificationProcessor(config, notifications, sent_notifications).run))  # todo don't pass the config object just the bits needed
+    processors.extend(notification_processors)
+
+    # Define SentNotificationProcessor
+    sent_notification_processor = Process(target=SentNotificationProcessor(config, sent_notifications).run)  # todo don't pass the config object just the bits needed
+    processors.append(sent_notification_processor)
+
+    ## Start
+    signal.signal(signal.SIGTERM, clean_exit)
+    try:
+        log.info('Starting processes')
+        for process in processors:
+            process.start()
+    except:
+        log.exception('Error exiting!')
+        for process in processors:
+            process.terminate()
+
+
+# todo - I need to make a deb for kafka-python, code currently in ~/working/kafka-python
+
+if __name__ == "__main__":
+    sys.exit(main())
+
diff --git a/notification.yaml b/notification.yaml
new file mode 100644
index 0000000..c1b5f55
--- /dev/null
+++ b/notification.yaml
@@ -0,0 +1,12 @@
+log_dir: /var/log/mon
+
+processors:
+    alarm:
+        number: 2
+    notification:
+        number: 8
+
+queues:
+    alarms_size: 1024
+    notifications_size: 1024
+    sent_notifications_size: 1024
diff --git a/notifications/__init__.py b/notifications/__init__.py
new file mode 100644
index 0000000..8630210
--- /dev/null
+++ b/notifications/__init__.py
@@ -0,0 +1,5 @@
+
+class Notification(object):
+    """ An abstract base class used to define the notification interface and common functions
+    """
+    pass
\ No newline at end of file
diff --git a/notifications/email.py b/notifications/email.py
new file mode 100644
index 0000000..0ef00f0
--- /dev/null
+++ b/notifications/email.py
@@ -0,0 +1,6 @@
+from . import Notification
+
+class EmailNotification(Notification):
+    pass
+
+# todo the smtp connection should have the ability to round robin over multiple connections which are managed and kept open
\ No newline at end of file
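
The Notification base class and EmailNotification above are still empty. A minimal sketch of the inheritance the README describes, using only the smtplib standard library; the address/subject/body fields, the send() interface, and the SMTP host and sender are assumptions, and the connection pooling mentioned in the todo is not shown:

```python
import smtplib
from email.mime.text import MIMEText


class Notification(object):
    """Sketch of the abstract base: concrete classes implement send()."""
    def __init__(self, address, subject, body):
        self.address = address
        self.subject = subject
        self.body = body

    def send(self):
        raise NotImplementedError


class EmailNotification(Notification):
    """Send the notification over SMTP; host, port, and sender are illustrative defaults."""
    def __init__(self, address, subject, body, smtp_host='localhost', smtp_port=25):
        super(EmailNotification, self).__init__(address, subject, body)
        self.smtp_host = smtp_host
        self.smtp_port = smtp_port

    def send(self):
        msg = MIMEText(self.body)
        msg['Subject'] = self.subject
        msg['To'] = self.address
        msg['From'] = 'mon-notification@localhost'
        smtp = smtplib.SMTP(self.smtp_host, self.smtp_port)
        try:
            smtp.sendmail(msg['From'], [self.address], msg.as_string())
        finally:
            smtp.quit()
```
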
diff --git a/processors/__init__.py b/processors/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/processors/alarm.py b/processors/alarm.py
new file mode 100644
index 0000000..b02342c
--- /dev/null
+++ b/processors/alarm.py
@@ -0,0 +1,3 @@
+
+class AlarmProcessor(object):
+    pass
diff --git a/processors/notification.py b/processors/notification.py
new file mode 100644
index 0000000..2dac8b9
--- /dev/null
+++ b/processors/notification.py
@@ -0,0 +1,6 @@
+
+class NotificationProcessor(object):
+    pass
+
+# todo - I can use pyodbc for both vertica and for mysql or could investigate MySQLdb for direct to mysql
+# Both have the same interface so I can hide the details the same way.
diff --git a/processors/sent_notification.py b/processors/sent_notification.py
new file mode 100644
index 0000000..89faa27
--- /dev/null
+++ b/processors/sent_notification.py
@@ -0,0 +1,4 @@
+
+class SentNotificationProcessor(object):
+    pass
+# todo review the python vertica code John was working on and implement batch writes to vertica
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..2c34dc0
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,13 @@
+from setuptools import setup, find_packages
+
+setup(
+    name="Monitoring Notification Engine",
+    version="0.1",
+    packages=find_packages(exclude=['tests']),
+    entry_points={
+        'console_scripts': [
+            'notification_engine = main:main'
+        ],
+    },
+    test_suite='nose.collector'
+)
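
Tying the processor stubs above together, here is a sketch of how the SentNotificationProcessor's batch write to Vertica could look using pyodbc, as the processors/notification.py todo suggests. The DSN, table name, batch size, and the alarm_id/sent_at fields on Notification objects are invented for illustration, and the Kafka offset commit is only indicated by a comment:

```python
import pyodbc


class SentNotificationProcessor(object):
    """Hypothetical final stage: drain the sent_notifications queue, batch the
    rows into Vertica, then advance the Kafka offset for the batch."""

    def __init__(self, sent_queue, dsn='DSN=vertica', batch_size=50):
        self.sent_queue = sent_queue
        self.connection = pyodbc.connect(dsn)  # the DSN name is illustrative
        self.batch_size = batch_size

    def run(self):
        while True:
            # Block for the first notification, then drain up to a full batch
            batch = [self.sent_queue.get()]
            while len(batch) < self.batch_size and not self.sent_queue.empty():
                batch.append(self.sent_queue.get())

            cursor = self.connection.cursor()
            cursor.executemany(
                "INSERT INTO notifications_sent (alarm_id, sent_at) VALUES (?, ?)",
                [(n.alarm_id, n.sent_at) for n in batch])  # assumed Notification fields
            self.connection.commit()
            # Only after the write succeeds would the Kafka offset be committed,
            # so a crash before this point re-sends rather than drops notifications.
```
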