Added in statsd!

This commit is contained in:
Tim Kuhlman 2014-03-18 11:09:47 -06:00
parent 93635b6571
commit c8e2e5c29f
9 changed files with 75 additions and 11 deletions

View File

@ -38,6 +38,25 @@ It is assumed the notification engine will be run by a process supervisor which
# Operation
Yaml config file by default is in '/etc/mon/notification.yaml'; a sample is in this project.
## Monitoring
statsd is incorporated into the daemon and will send all stats to localhost on udp port 8125. In many cases the stats
are gathered per thread; the thread number is indicated by a '-#' suffix at the end of the stat name.
- Counters
- ConsumedFromKafka
- AlarmsFailedParse
- AlarmsFinished
- AlarmsNoNotification
- AlarmsOffsetUpdated
- NotificationsCreated
- NotificationsSentSMTP
- NotificationsSentFailed
- NotificationsInvalidType
- PublishedToKafka
- Timers
- ConfigDBTime
- SMTPTime
# Future Considerations
- Currently I lock the topic rather than the partitions. This effectively means there is only one active notification
engine at any given time. In the future to share the load among multiple daemons we could lock by partition.

2
debian/control vendored
View File

@ -11,6 +11,6 @@ X-Python-Version: >= 2.6
Package: mon-notification
Architecture: all
Section: python
Depends: ${misc:Depends}, ${python:Depends}, libpython2.7, python-pkg-resources, kafka-python, python-yaml, python-mysqldb, python-kazoo
Depends: ${misc:Depends}, ${python:Depends}, libpython2.7, python-pkg-resources, kafka-python, python-yaml, python-mysqldb, python-kazoo, python-statsd
Description: Notification engine for monitoring.
Consumes alarms from Kafka and sends notifications appropriately.

View File

@ -88,7 +88,7 @@ def main(argv=None):
config['mysql']['user'],
config['mysql']['passwd'],
config['mysql']['db']
).run)
).run),
)
processors.extend(alarm_processors)
@ -101,7 +101,7 @@ def main(argv=None):
sent_notifications,
finished,
config['email']
).run)
).run),
)
processors.extend(notification_processors)

View File

@ -1,6 +1,8 @@
import json
import logging
import MySQLdb
import multiprocessing
import statsd
from mon_notification.processors import BaseProcessor
from mon_notification.notification import Notification
@ -47,6 +49,12 @@ class AlarmProcessor(BaseProcessor):
add to the finished_queue
"""
cur = self.mysql.cursor()
pname = multiprocessing.current_process().name
failed_parse_count = statsd.Counter('AlarmsFailedParse-%s' % pname)
no_notification_count = statsd.Counter('AlarmsNoNotification-%s' % pname)
notification_count = statsd.Counter('NotificationsCreated-%s' % pname)
db_time = statsd.Timer('ConfigDBTime-%s' % pname)
while True:
raw_alarm = self.alarm_queue.get()
partition = raw_alarm[0]
@ -54,6 +62,7 @@ class AlarmProcessor(BaseProcessor):
try:
alarm = self._parse_alarm(raw_alarm[1].message.value)
except Exception as e: # This is general because of a lack of json exception base class
failed_parse_count += 1
log.error("Invalid Alarm format skipping partition %d, offset %d\nErrror%s" % (partition, offset, e))
self._add_to_queue(self.finished_queue, 'finished', (partition, offset))
continue
@ -62,12 +71,13 @@ class AlarmProcessor(BaseProcessor):
% (partition, offset, alarm))
try:
cur.execute("SELECT notification_method_id FROM alarm_action WHERE alarm_id = %s", alarm['alarmId'])
ids = [row[0] for row in cur]
if len(ids) == 1:
cur.execute("SELECT name, type, address FROM notification_method WHERE id = %s", ids[0])
elif len(ids) > 1:
cur.execute("SELECT name, type, address FROM notification_method WHERE id in (%s)", ','.join(ids))
with db_time.time():
cur.execute("SELECT notification_method_id FROM alarm_action WHERE alarm_id = %s", alarm['alarmId'])
ids = [row[0] for row in cur]
if len(ids) == 1:
cur.execute("SELECT name, type, address FROM notification_method WHERE id = %s", ids[0])
elif len(ids) > 1:
cur.execute("SELECT name, type, address FROM notification_method WHERE id in (%s)", ','.join(ids))
except MySQLdb.Error:
log.exception('Error reading from mysql')
@ -76,8 +86,10 @@ class AlarmProcessor(BaseProcessor):
Notification(row[1].lower(), partition, offset, row[0], row[2], alarm) for row in cur]
if len(notifications) == 0:
no_notification_count += 1
log.debug('No notifications found for this alarm, partition %d, offset %d, alarm data %s'
% (partition, offset, alarm))
self._add_to_queue(self.finished_queue, 'finished', (partition, offset))
else:
notification_count += len(notifications)
self._add_to_queue(self.notification_queue, 'notifications', notifications)

View File

@ -1,6 +1,7 @@
import kafka.client
import kafka.consumer
import logging
import statsd
from mon_notification.processors import BaseProcessor
@ -36,6 +37,8 @@ class KafkaConsumer(BaseProcessor):
def run(self):
    """Consume from kafka and place alarm objects on the sent_queue
    """
    # Daemon-wide counter (no per-thread suffix here, unlike the processor
    # stats); statsd sends to localhost udp/8125 per the README.
    counter = statsd.Counter('ConsumedFromKafka')
    for message in self.consumer:
        counter += 1  # one increment per message pulled from kafka
        # message[0] is the partition, message[1] the raw kafka message
        # carrying .offset (as used in the log line below).
        log.debug("Consuming message from kafka, partition %d, offset %d" % (message[0], message[1].offset))
        self._add_to_queue(self.queue, 'alarms', message)

View File

@ -1,6 +1,8 @@
import email.mime.text
import logging
import multiprocessing
import smtplib
import statsd
import time
from mon_notification.processors import BaseProcessor
@ -69,18 +71,33 @@ class NotificationProcessor(BaseProcessor):
For each notification in a message it is sent according to its type.
If all notifications fail the alarm partition/offset are added to the the finished queue
"""
pname = multiprocessing.current_process().name
invalid_count = statsd.Counter('NotificationsInvalidType-%s' % pname)
failed_count = statsd.Counter('NotificationsSentFailed-%s' % pname)
smtp_sent_count = statsd.Counter('NotificationsSentSMTP-%s' % pname)
counters = {'email': smtp_sent_count}
smtp_time = statsd.Timer('SMTPTime-%s' % pname)
timers = {'email': smtp_time}
while True:
notifications = self.notification_queue.get()
sent_notifications = []
for notification in notifications:
if notification.type not in self.notification_types:
log.warn('Notification type %s is not a valid type' % notification.type)
invalid_count += 1
continue
else:
sent = self.notification_types[notification.type](notification)
if sent is not None:
with timers[notification.type].time():
sent = self.notification_types[notification.type](notification)
if sent is None:
failed_count += 1
else:
sent.notification_timestamp = time.time()
sent_notifications.append(sent)
counters[notification.type] += 1
if len(sent_notifications) == 0: # All notifications failed
self._add_to_queue(

View File

@ -1,6 +1,7 @@
import kafka.client
import kafka.producer
import logging
import statsd
from mon_notification.processors import BaseProcessor
@ -36,10 +37,12 @@ class SentNotificationProcessor(BaseProcessor):
"""Takes messages from the sent_queue, puts them on the kafka notification topic and then adds
partition/offset to the finished queue
"""
published_count = statsd.Counter('PublishedToKafka')
while True:
notifications = self.sent_queue.get()
for notification in notifications:
responses = self.producer.send_messages(self.topic, notification.to_json())
published_count += 1
log.debug('Published to topic %s, message %s' % (self.topic, notification.to_json()))
for resp in responses:
if resp.error != 0:

View File

@ -3,6 +3,7 @@ import kazoo.client
import kazoo.exceptions
import logging
import time
import statsd
from mon_notification import notification_exceptions
@ -123,8 +124,11 @@ class ZookeeperStateTracker(object):
if self._offsets is None: # Verify the offsets have been initialized
self._offsets = self._get_offsets()
finished_count = statsd.Counter('AlarmsFinished')
offset_update_count = statsd.Counter('AlarmsOffsetUpdated')
while True:
msg = self.finished_queue.get()
finished_count += 1
partition = int(msg[0])
offset = int(msg[1])
@ -144,6 +148,7 @@ class ZookeeperStateTracker(object):
else:
break
offset_update_count += new_offset - self._offsets[partition]
self._offsets[partition] = new_offset
if offset == new_offset:
log.debug('Updating offset for partition %d, offset %d' % (partition, new_offset))

View File

@ -40,6 +40,9 @@ logging: # Used in logging.dictConfig
default:
format: "%(asctime)s %(levelname)s %(name)s %(message)s"
handlers:
console:
class: logging.StreamHandler
formatter: default
file:
class : logging.handlers.RotatingFileHandler
filename: /var/log/mon-notification/notification.log
@ -51,6 +54,8 @@ logging: # Used in logging.dictConfig
level: WARN
kafka:
level: WARN
statsd:
level: WARN
root:
handlers:
- file