Added pagerduty support to notification_processor

Refactored the tests to make pagerduty functionality more clear

Change-Id: Ie9616d8629112fc5f1f1576951a3cbe3b99218b0
This commit is contained in:
Joe Keen 2014-12-19 10:46:15 -07:00
parent 9b1df54682
commit 45a2411ebb
4 changed files with 316 additions and 116 deletions

View File

@ -157,7 +157,8 @@ def main(argv=None):
sent_notifications,
finished,
config['email'],
config['webhook']
config['webhook'],
config['pagerduty']
).run),
)
processors.extend(notification_processors)

View File

@ -14,6 +14,7 @@
# limitations under the License.
import email.mime.text
import json
import logging
import monascastatsd
import requests
@ -28,8 +29,9 @@ log = logging.getLogger(__name__)
class NotificationProcessor(BaseProcessor):
def __init__(self, notification_queue, sent_notification_queue,
finished_queue, email_config, webhook_config):
def __init__(self, notification_queue,
sent_notification_queue, finished_queue,
email_config, webhook_config, pagerduty_config):
self.notification_queue = notification_queue
self.sent_notification_queue = sent_notification_queue
self.finished_queue = finished_queue
@ -39,12 +41,18 @@ class NotificationProcessor(BaseProcessor):
self.webhook_config = {'timeout': 5}
self.webhook_config.update(webhook_config)
self.pagerduty_config = {
'timeout': 5,
'url': 'https://events.pagerduty.com/generic/2010-04-15/create_event.json'}
self.pagerduty_config.update(pagerduty_config)
self.smtp = None
self._smtp_connect()
# Types as key, method used to process that type as value
self.notification_types = {'email': self._send_email,
'webhook': self._post_webhook}
'webhook': self._post_webhook,
'pagerduty': self._post_pagerduty}
self.statsd = monascastatsd.Client(name='monasca', dimensions=BaseProcessor.dimensions)
@ -156,15 +164,49 @@ class NotificationProcessor(BaseProcessor):
except:
log.error("Error trying to post on URL %s: %s." % (url, sys.exc_info()[0]))
def _post_pagerduty(self, notification):
"""Send pagerduty notification
"""
url = self.pagerduty_config['url']
headers = {"content-type": "application/json"}
body = {"service_key": notification.address,
"event_type": "trigger",
"description": notification.message,
"client": "Monasca",
"client_url": "",
"details": {"alarm_id": notification.alarm_id,
"alarm_name": notification.alarm_name,
"current": notification.state,
"message": notification.message}}
try:
result = requests.post(url=url,
data=json.dumps(body),
headers=headers,
timeout=self.pagerduty_config['timeout'])
valid_http_codes = [200, 201, 204]
if result.status_code in valid_http_codes:
return notification
log.error("Error with pagerduty request. key=<%s> response=%s"
% (notification.address, result.status_code))
except:
log.error("Exception on pagerduty request. key=<%s> exception=%s"
% (notification.address, sys.exc_info()[0]))
def run(self):
"""Send the notifications
For each notification in a message it is sent according to its type.
If all notifications fail the alarm partition/offset are added to the the finished queue
"""
counters = {'email': self.statsd.get_counter(name='sent_smtp_count'),
'webhook': self.statsd.get_counter(name='sent_webhook_count')}
'webhook': self.statsd.get_counter(name='sent_webhook_count'),
'pagerduty': self.statsd.get_counter(name='sent_pagerduty_count')}
timers = {'email': self.statsd.get_timer(),
'webhook': self.statsd.get_timer()}
'webhook': self.statsd.get_timer(),
'pagerduty': self.statsd.get_timer()}
invalid_type_count = self.statsd.get_counter(name='invalid_type_count')
sent_failed_count = self.statsd.get_counter(name='sent_failed_count')
@ -187,7 +229,6 @@ class NotificationProcessor(BaseProcessor):
sent.notification_timestamp = time.time()
sent_notifications.append(sent)
counters[notification.type] += 1
if len(sent_notifications) == 0: # All notifications failed
self._add_to_queue(
self.finished_queue, 'finished', (notifications[0].src_partition, notifications[0].src_offset))

View File

@ -24,6 +24,10 @@ email:
webhook:
timeout: 5
pagerduty:
timeout: 5
url: "https://events.pagerduty.com/generic/2010-04-15/create_event.json"
processors:
alarm:
number: 2

View File

@ -15,6 +15,7 @@
"""Tests NotificationProcessor"""
import json
import mock
import multiprocessing
import requests
@ -53,6 +54,17 @@ class TestStateTracker(unittest.TestCase):
'timeout': 60,
'from_addr': 'hpcs.mon@hp.com'}
self.webhook_config = {'timeout': 50}
self.pagerduty_config = {'timeout': 50, 'key': 'foobar'}
def tearDown(self):
self.assertTrue(self.log_queue.empty())
self.assertTrue(self.sent_notification_queue.empty())
self.assertTrue(self.finished_queue.empty())
self.processor.terminate()
# ------------------------------------------------------------------------
# Test helper functions
# ------------------------------------------------------------------------
@mock.patch('monasca_notification.processors.notification_processor.requests')
@mock.patch('monasca_notification.processors.notification_processor.smtplib')
@ -77,7 +89,8 @@ class TestStateTracker(unittest.TestCase):
self.sent_notification_queue,
self.finished_queue,
self.email_config,
self.webhook_config))
self.webhook_config,
self.pagerduty_config))
self.processor = multiprocessing.Process(target=nprocessor.run)
self.processor.start()
@ -86,18 +99,145 @@ class TestStateTracker(unittest.TestCase):
return smtpStub(self.log_queue)
def _http_post_200(self, url, data, headers, **kwargs):
self.log_queue.put("%s %s %s" % (url, data, headers))
self.log_queue.put(url)
self.log_queue.put(data)
self.log_queue.put(headers)
r = requestsResponse(200)
return r
def _http_post_201(self, url, data, headers, **kwargs):
self.log_queue.put(url)
self.log_queue.put(data)
self.log_queue.put(headers)
r = requestsResponse(201)
return r
def _http_post_202(self, url, data, headers, **kwargs):
r = requestsResponse(202)
return r
def _http_post_204(self, url, data, headers, **kwargs):
self.log_queue.put(url)
self.log_queue.put(data)
self.log_queue.put(headers)
r = requestsResponse(204)
return r
def _http_post_400(self, url, data, headers, **kwargs):
r = requestsResponse(400)
return r
def _http_post_403(self, url, data, headers, **kwargs):
r = requestsResponse(403)
return r
def _http_post_404(self, url, data, headers, **kwargs):
r = requestsResponse(404)
return r
def _http_post_500(self, url, data, headers, **kwargs):
r = requestsResponse(500)
return r
def _http_post_504(self, url, data, headers, **kwargs):
r = requestsResponse(504)
return r
def _http_post_exception(self, url, data, headers, **kwargs):
self.log_queue.put("timeout %s" % kwargs["timeout"])
raise requests.exceptions.Timeout
def assertSentNotification(self):
notification_msg = self.sent_notification_queue.get(timeout=3)
self.assertNotEqual(notification_msg, None)
def assertSentFinished(self):
finished_msg = self.finished_queue.get(timeout=3)
self.assertNotEqual(finished_msg, None)
def alarm(self, metrics):
return {"tenantId": "0",
"alarmId": "0",
"alarmName": "test Alarm",
"oldState": "OK",
"newState": "ALARM",
"stateChangeReason": "I am alarming!",
"timestamp": time.time(),
"metrics": metrics}
def email_setup(self, metric):
alarm_dict = self.alarm(metric)
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', alarm_dict)
self.notification_queue.put([notification])
self._start_processor()
def webhook_setup(self, http_func):
self.http_func = http_func
metrics = []
metric_data = {'dimensions': {'hostname': 'foo1', 'service': 'bar1'}}
metrics.append(metric_data)
metric_data = {'dimensions': {'hostname': 'foo2', 'service': 'bar2'}}
metrics.append(metric_data)
alarm_dict = self.alarm(metrics)
notification = Notification('webhook', 0, 1, 'email notification', 'me@here.com', alarm_dict)
self.notification_queue.put([notification])
self._start_processor()
def pagerduty_setup(self, http_stub):
self.http_func = http_stub
metrics = []
metric_data = {'dimensions': {'hostname': 'foo1', 'service': 'bar1'}}
metrics.append(metric_data)
metric_data = {'dimensions': {'hostname': 'foo2', 'service': 'bar2'}}
metrics.append(metric_data)
alarm_dict = self.alarm(metrics)
notification = Notification('pagerduty',
0,
1,
'pagerduty notification',
'ABCDEF',
alarm_dict)
self._start_processor()
self.notification_queue.put([notification])
def valid_pagerduty_message(self, url, data, headers):
self.assertEqual(
url, 'https://events.pagerduty.com/generic/2010-04-15/create_event.json')
headers = dict(headers)
self.assertEqual(headers['content-type'], 'application/json')
data = dict(json.loads(data))
self.assertEqual(data['service_key'], 'ABCDEF')
self.assertEqual(data['event_type'], 'trigger')
self.assertEqual(data['description'], 'I am alarming!')
self.assertEqual(data['client'], 'Monasca')
self.assertEqual(data['client_url'], '')
details = dict(data['details'])
self.assertEqual(details['alarm_id'], '0')
self.assertEqual(details['alarm_name'], 'test Alarm')
self.assertEqual(details['current'], 'ALARM')
self.assertEqual(details['message'], 'I am alarming!')
def pagerduty_http_error(self, log_msg, http_response):
self.assertRegexpMatches(log_msg, "Error with pagerduty request.")
self.assertRegexpMatches(log_msg, "key=<ABCDEF>")
self.assertRegexpMatches(log_msg, "response=%s" % http_response)
# ------------------------------------------------------------------------
# Unit tests
# ------------------------------------------------------------------------
def test_invalid_notification(self):
"""Verify invalid notification type is rejected.
"""
@ -122,21 +262,9 @@ class TestStateTracker(unittest.TestCase):
metric_data = {'dimensions': {'hostname': 'foo1', 'service': 'bar1'}}
metrics.append(metric_data)
alarm_dict = {"tenantId": "0",
"alarmId": "0",
"alarmName": "test Alarm",
"oldState": "OK",
"newState": "ALARM",
"stateChangeReason": "I am alarming!",
"timestamp": time.time(),
"metrics": metrics}
self.email_setup(metrics)
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', alarm_dict)
self.notification_queue.put([notification])
self._start_processor()
log_msg = self.log_queue.get(timeout=3)
self.processor.terminate()
self.assertRegexpMatches(log_msg, "From: hpcs.mon@hp.com")
self.assertRegexpMatches(log_msg, "To: me@here.com")
@ -144,7 +272,7 @@ class TestStateTracker(unittest.TestCase):
self.assertRegexpMatches(log_msg, "Alarm .test Alarm.")
self.assertRegexpMatches(log_msg, "On host .foo1.")
self.assertTrue(self.log_queue.empty())
self.assertSentNotification()
def test_email_notification_multiple_hosts(self):
"""Email with multiple hosts
@ -156,21 +284,9 @@ class TestStateTracker(unittest.TestCase):
metric_data = {'dimensions': {'hostname': 'foo2', 'service': 'bar2'}}
metrics.append(metric_data)
alarm_dict = {"tenantId": "0",
"alarmId": "0",
"alarmName": "test Alarm",
"oldState": "OK",
"newState": "ALARM",
"stateChangeReason": "I am alarming!",
"timestamp": time.time(),
"metrics": metrics}
self.email_setup(metrics)
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', alarm_dict)
self.notification_queue.put([notification])
self._start_processor()
log_msg = self.log_queue.get(timeout=3)
self.processor.terminate()
self.assertRegexpMatches(log_msg, "From: hpcs.mon@hp.com")
self.assertRegexpMatches(log_msg, "To: me@here.com")
@ -179,69 +295,31 @@ class TestStateTracker(unittest.TestCase):
self.assertNotRegexpMatches(log_msg, "foo1")
self.assertNotRegexpMatches(log_msg, "foo2")
self.assertTrue(self.log_queue.empty())
self.assertSentNotification()
def test_webhook_good_http_response(self):
"""webhook good response
"""webhook 200
"""
self.http_func = self._http_post_200
self.webhook_setup(self._http_post_200)
metrics = []
metric_data = {'dimensions': {'hostname': 'foo1', 'service': 'bar1'}}
metrics.append(metric_data)
metric_data = {'dimensions': {'hostname': 'foo2', 'service': 'bar2'}}
metrics.append(metric_data)
url = self.log_queue.get(timeout=3)
data = self.log_queue.get(timeout=3)
headers = self.log_queue.get(timeout=3)
alarm_dict = {"tenantId": "0",
"alarmId": "0",
"alarmName": "test Alarm",
"oldState": "OK",
"newState": "ALARM",
"stateChangeReason": "I am alarming!",
"timestamp": time.time(),
"metrics": metrics}
self.assertEqual(url, "me@here.com")
self.assertEqual(data, {'alarm_id': '0'})
self.assertEqual(headers, {'content-type': 'application/json'})
notification = Notification('webhook', 0, 1, 'email notification', 'me@here.com', alarm_dict)
self.notification_queue.put([notification])
self._start_processor()
log_msg = self.log_queue.get(timeout=3)
self.processor.terminate()
self.assertRegexpMatches(log_msg, "me@here.com")
self.assertRegexpMatches(log_msg, "alarm_id.: '0'")
self.assertRegexpMatches(log_msg, "content-type.: .application/json")
self.assertTrue(self.log_queue.empty())
self.assertSentNotification()
def test_webhook_bad_http_response(self):
"""webhook bad response
"""
self.http_func = self._http_post_404
self.webhook_setup(self._http_post_404)
metrics = []
metric_data = {'dimensions': {'hostname': 'foo1', 'service': 'bar1'}}
metrics.append(metric_data)
metric_data = {'dimensions': {'hostname': 'foo2', 'service': 'bar2'}}
metrics.append(metric_data)
alarm_dict = {"tenantId": "0",
"alarmId": "0",
"alarmName": "test Alarm",
"oldState": "OK",
"newState": "ALARM",
"stateChangeReason": "I am alarming!",
"timestamp": time.time(),
"metrics": metrics}
notification = Notification('webhook', 0, 1, 'email notification', 'me@here.com', alarm_dict)
self.notification_queue.put([notification])
self._start_processor()
log_msg = self.log_queue.get(timeout=3)
self.processor.terminate()
self.assertNotRegexpMatches(log_msg, "alarm_id.: .test Alarm")
self.assertNotRegexpMatches(log_msg, "content-type.: .application/json")
@ -249,39 +327,13 @@ class TestStateTracker(unittest.TestCase):
self.assertRegexpMatches(log_msg, "HTTP code 404")
self.assertRegexpMatches(log_msg, "post on URL me@here.com")
self.assertTrue(self.log_queue.empty())
self.assertSentFinished()
def test_webhook_timeout_exception_on_http_response(self):
"""webhook timeout exception
"""
self.http_func = self._http_post_exception
metrics = []
metric_data = {'dimensions': {'hostname': 'foo1', 'service': 'bar1'}}
metrics.append(metric_data)
metric_data = {'dimensions': {'hostname': 'foo2', 'service': 'bar2'}}
metrics.append(metric_data)
alarm_dict = {"tenantId": "0",
"alarmId": "0",
"alarmName": "test Alarm",
"oldState": "OK",
"newState": "ALARM",
"stateChangeReason": "I am alarming!",
"timestamp": time.time(),
"metrics": metrics}
notification = Notification('webhook',
0,
1,
'webhook notification',
'http://localhost:21356',
alarm_dict)
self._start_processor()
self.notification_queue.put([notification])
self.webhook_setup(self._http_post_exception)
log_msg = self.log_queue.get(timeout=3)
@ -292,9 +344,111 @@ class TestStateTracker(unittest.TestCase):
self.assertNotRegexpMatches(log_msg, "alarm_id.: .test Alarm")
self.assertNotRegexpMatches(log_msg, "content-type.: .application/json")
self.assertRegexpMatches(log_msg, "Error trying to post on URL http://localhost:21356")
self.assertRegexpMatches(log_msg, "Error trying to post on URL me@here")
self.assertRaises(requests.exceptions.Timeout)
self.assertTrue(self.log_queue.empty())
self.assertSentFinished()
self.processor.terminate()
def test_pagerduty_200(self):
"""pagerduty 200
"""
self.pagerduty_setup(self._http_post_200)
url = self.log_queue.get(timeout=3)
data = self.log_queue.get(timeout=3)
headers = self.log_queue.get(timeout=3)
self.valid_pagerduty_message(url, data, headers)
self.assertSentNotification()
def test_pagerduty_201(self):
"""pagerduty 201
"""
self.pagerduty_setup(self._http_post_201)
url = self.log_queue.get(timeout=3)
data = self.log_queue.get(timeout=3)
headers = self.log_queue.get(timeout=3)
self.valid_pagerduty_message(url, data, headers)
self.assertSentNotification()
def test_pagerduty_204(self):
"""pagerduty 204
"""
self.pagerduty_setup(self._http_post_204)
url = self.log_queue.get(timeout=3)
data = self.log_queue.get(timeout=3)
headers = self.log_queue.get(timeout=3)
self.valid_pagerduty_message(url, data, headers)
self.assertSentNotification()
def test_pagerduty_202(self):
"""pagerduty 202
"""
self.pagerduty_setup(self._http_post_202)
log_msg = self.log_queue.get(timeout=3)
self.pagerduty_http_error(log_msg, "202")
self.assertSentFinished()
def test_pagerduty_400(self):
"""pagerduty 400
"""
self.pagerduty_setup(self._http_post_400)
log_msg = self.log_queue.get(timeout=3)
self.pagerduty_http_error(log_msg, "400")
self.assertSentFinished()
def test_pagerduty_403(self):
"""pagerduty 403
"""
self.pagerduty_setup(self._http_post_403)
log_msg = self.log_queue.get(timeout=3)
self.pagerduty_http_error(log_msg, "403")
self.assertSentFinished()
def test_pagerduty_500(self):
"""pagerduty 500
"""
self.pagerduty_setup(self._http_post_500)
log_msg = self.log_queue.get(timeout=3)
self.pagerduty_http_error(log_msg, "500")
self.assertSentFinished()
def test_pagerduty_504(self):
"""pagerduty 504
"""
self.pagerduty_setup(self._http_post_504)
log_msg = self.log_queue.get(timeout=3)
self.pagerduty_http_error(log_msg, "504")
self.assertSentFinished()
def test_pagerduty_exception(self):
"""pagerduty exception
"""
self.pagerduty_setup(self._http_post_exception)
log_msg = self.log_queue.get(timeout=3)
self.assertEqual(log_msg, "timeout 50")
log_msg = self.log_queue.get(timeout=3)
self.assertRegexpMatches(log_msg, "Exception on pagerduty request")
self.assertRegexpMatches(log_msg, "key=<ABCDEF>")
self.assertRegexpMatches(
log_msg, "exception=<class 'requests.exceptions.Timeout'>")
self.assertRaises(requests.exceptions.Timeout)
self.assertSentFinished()