diff --git a/monasca_notification/main.py b/monasca_notification/main.py index 32a6e8e..83c75a0 100644 --- a/monasca_notification/main.py +++ b/monasca_notification/main.py @@ -157,7 +157,8 @@ def main(argv=None): sent_notifications, finished, config['email'], - config['webhook'] + config['webhook'], + config['pagerduty'] ).run), ) processors.extend(notification_processors) diff --git a/monasca_notification/processors/notification_processor.py b/monasca_notification/processors/notification_processor.py index 5a9b597..4dcdadd 100644 --- a/monasca_notification/processors/notification_processor.py +++ b/monasca_notification/processors/notification_processor.py @@ -14,6 +14,7 @@ # limitations under the License. import email.mime.text +import json import logging import monascastatsd import requests @@ -28,8 +29,9 @@ log = logging.getLogger(__name__) class NotificationProcessor(BaseProcessor): - def __init__(self, notification_queue, sent_notification_queue, - finished_queue, email_config, webhook_config): + def __init__(self, notification_queue, + sent_notification_queue, finished_queue, + email_config, webhook_config, pagerduty_config): self.notification_queue = notification_queue self.sent_notification_queue = sent_notification_queue self.finished_queue = finished_queue @@ -39,12 +41,18 @@ class NotificationProcessor(BaseProcessor): self.webhook_config = {'timeout': 5} self.webhook_config.update(webhook_config) + self.pagerduty_config = { + 'timeout': 5, + 'url': 'https://events.pagerduty.com/generic/2010-04-15/create_event.json'} + self.pagerduty_config.update(pagerduty_config) + self.smtp = None self._smtp_connect() # Types as key, method used to process that type as value self.notification_types = {'email': self._send_email, - 'webhook': self._post_webhook} + 'webhook': self._post_webhook, + 'pagerduty': self._post_pagerduty} self.statsd = monascastatsd.Client(name='monasca', dimensions=BaseProcessor.dimensions) @@ -156,15 +164,49 @@ class NotificationProcessor(BaseProcessor): except: log.error("Error trying to post on URL %s: %s." % (url, sys.exc_info()[0])) + def _post_pagerduty(self, notification): + """Send pagerduty notification + """ + + url = self.pagerduty_config['url'] + headers = {"content-type": "application/json"} + body = {"service_key": notification.address, + "event_type": "trigger", + "description": notification.message, + "client": "Monasca", + "client_url": "", + "details": {"alarm_id": notification.alarm_id, + "alarm_name": notification.alarm_name, + "current": notification.state, + "message": notification.message}} + + try: + result = requests.post(url=url, + data=json.dumps(body), + headers=headers, + timeout=self.pagerduty_config['timeout']) + + valid_http_codes = [200, 201, 204] + if result.status_code in valid_http_codes: + return notification + + log.error("Error with pagerduty request. key=<%s> response=%s" + % (notification.address, result.status_code)) + except: + log.error("Exception on pagerduty request. key=<%s> exception=%s" + % (notification.address, sys.exc_info()[0])) + def run(self): """Send the notifications For each notification in a message it is sent according to its type. If all notifications fail the alarm partition/offset are added to the the finished queue """ counters = {'email': self.statsd.get_counter(name='sent_smtp_count'), - 'webhook': self.statsd.get_counter(name='sent_webhook_count')} + 'webhook': self.statsd.get_counter(name='sent_webhook_count'), + 'pagerduty': self.statsd.get_counter(name='sent_pagerduty_count')} timers = {'email': self.statsd.get_timer(), - 'webhook': self.statsd.get_timer()} + 'webhook': self.statsd.get_timer(), + 'pagerduty': self.statsd.get_timer()} invalid_type_count = self.statsd.get_counter(name='invalid_type_count') sent_failed_count = self.statsd.get_counter(name='sent_failed_count') @@ -187,7 +229,6 @@ class NotificationProcessor(BaseProcessor): sent.notification_timestamp = time.time() sent_notifications.append(sent) counters[notification.type] += 1 - if len(sent_notifications) == 0: # All notifications failed self._add_to_queue( self.finished_queue, 'finished', (notifications[0].src_partition, notifications[0].src_offset)) diff --git a/notification.yaml b/notification.yaml index 3f54521..a8b2852 100644 --- a/notification.yaml +++ b/notification.yaml @@ -24,6 +24,10 @@ email: webhook: timeout: 5 +pagerduty: + timeout: 5 + url: "https://events.pagerduty.com/generic/2010-04-15/create_event.json" + processors: alarm: number: 2 diff --git a/tests/test_notification_processor.py b/tests/test_notification_processor.py index 2387983..2ddbad0 100644 --- a/tests/test_notification_processor.py +++ b/tests/test_notification_processor.py @@ -15,6 +15,7 @@ """Tests NotificationProcessor""" +import json import mock import multiprocessing import requests @@ -53,6 +54,17 @@ class TestStateTracker(unittest.TestCase): 'timeout': 60, 'from_addr': 'hpcs.mon@hp.com'} self.webhook_config = {'timeout': 50} + self.pagerduty_config = {'timeout': 50, 'key': 'foobar'} + + def tearDown(self): + self.assertTrue(self.log_queue.empty()) + self.assertTrue(self.sent_notification_queue.empty()) + self.assertTrue(self.finished_queue.empty()) + self.processor.terminate() + + # ------------------------------------------------------------------------ + # Test helper functions + # ------------------------------------------------------------------------ @mock.patch('monasca_notification.processors.notification_processor.requests') @mock.patch('monasca_notification.processors.notification_processor.smtplib') @@ -77,7 +89,8 @@ class TestStateTracker(unittest.TestCase): self.sent_notification_queue, self.finished_queue, self.email_config, - self.webhook_config)) + self.webhook_config, + self.pagerduty_config)) self.processor = multiprocessing.Process(target=nprocessor.run) self.processor.start() @@ -86,18 +99,145 @@ class TestStateTracker(unittest.TestCase): return smtpStub(self.log_queue) def _http_post_200(self, url, data, headers, **kwargs): - self.log_queue.put("%s %s %s" % (url, data, headers)) + self.log_queue.put(url) + self.log_queue.put(data) + self.log_queue.put(headers) r = requestsResponse(200) return r + def _http_post_201(self, url, data, headers, **kwargs): + self.log_queue.put(url) + self.log_queue.put(data) + self.log_queue.put(headers) + r = requestsResponse(201) + return r + + def _http_post_202(self, url, data, headers, **kwargs): + r = requestsResponse(202) + return r + + def _http_post_204(self, url, data, headers, **kwargs): + self.log_queue.put(url) + self.log_queue.put(data) + self.log_queue.put(headers) + r = requestsResponse(204) + return r + + def _http_post_400(self, url, data, headers, **kwargs): + r = requestsResponse(400) + return r + + def _http_post_403(self, url, data, headers, **kwargs): + r = requestsResponse(403) + return r + def _http_post_404(self, url, data, headers, **kwargs): r = requestsResponse(404) return r + def _http_post_500(self, url, data, headers, **kwargs): + r = requestsResponse(500) + return r + + def _http_post_504(self, url, data, headers, **kwargs): + r = requestsResponse(504) + return r + def _http_post_exception(self, url, data, headers, **kwargs): self.log_queue.put("timeout %s" % kwargs["timeout"]) raise requests.exceptions.Timeout + def assertSentNotification(self): + notification_msg = self.sent_notification_queue.get(timeout=3) + self.assertNotEqual(notification_msg, None) + + def assertSentFinished(self): + finished_msg = self.finished_queue.get(timeout=3) + self.assertNotEqual(finished_msg, None) + + def alarm(self, metrics): + return {"tenantId": "0", + "alarmId": "0", + "alarmName": "test Alarm", + "oldState": "OK", + "newState": "ALARM", + "stateChangeReason": "I am alarming!", + "timestamp": time.time(), + "metrics": metrics} + + def email_setup(self, metric): + alarm_dict = self.alarm(metric) + + notification = Notification('email', 0, 1, 'email notification', 'me@here.com', alarm_dict) + + self.notification_queue.put([notification]) + self._start_processor() + + def webhook_setup(self, http_func): + self.http_func = http_func + + metrics = [] + metric_data = {'dimensions': {'hostname': 'foo1', 'service': 'bar1'}} + metrics.append(metric_data) + metric_data = {'dimensions': {'hostname': 'foo2', 'service': 'bar2'}} + metrics.append(metric_data) + + alarm_dict = self.alarm(metrics) + + notification = Notification('webhook', 0, 1, 'email notification', 'me@here.com', alarm_dict) + + self.notification_queue.put([notification]) + self._start_processor() + + def pagerduty_setup(self, http_stub): + self.http_func = http_stub + + metrics = [] + metric_data = {'dimensions': {'hostname': 'foo1', 'service': 'bar1'}} + metrics.append(metric_data) + metric_data = {'dimensions': {'hostname': 'foo2', 'service': 'bar2'}} + metrics.append(metric_data) + + alarm_dict = self.alarm(metrics) + + notification = Notification('pagerduty', + 0, + 1, + 'pagerduty notification', + 'ABCDEF', + alarm_dict) + self._start_processor() + self.notification_queue.put([notification]) + + def valid_pagerduty_message(self, url, data, headers): + self.assertEqual( + url, 'https://events.pagerduty.com/generic/2010-04-15/create_event.json') + + headers = dict(headers) + self.assertEqual(headers['content-type'], 'application/json') + + data = dict(json.loads(data)) + self.assertEqual(data['service_key'], 'ABCDEF') + self.assertEqual(data['event_type'], 'trigger') + self.assertEqual(data['description'], 'I am alarming!') + self.assertEqual(data['client'], 'Monasca') + self.assertEqual(data['client_url'], '') + + details = dict(data['details']) + self.assertEqual(details['alarm_id'], '0') + self.assertEqual(details['alarm_name'], 'test Alarm') + self.assertEqual(details['current'], 'ALARM') + self.assertEqual(details['message'], 'I am alarming!') + + def pagerduty_http_error(self, log_msg, http_response): + self.assertRegexpMatches(log_msg, "Error with pagerduty request.") + self.assertRegexpMatches(log_msg, "key=") + self.assertRegexpMatches(log_msg, "response=%s" % http_response) + + # ------------------------------------------------------------------------ + # Unit tests + # ------------------------------------------------------------------------ + def test_invalid_notification(self): """Verify invalid notification type is rejected. """ @@ -122,21 +262,9 @@ class TestStateTracker(unittest.TestCase): metric_data = {'dimensions': {'hostname': 'foo1', 'service': 'bar1'}} metrics.append(metric_data) - alarm_dict = {"tenantId": "0", - "alarmId": "0", - "alarmName": "test Alarm", - "oldState": "OK", - "newState": "ALARM", - "stateChangeReason": "I am alarming!", - "timestamp": time.time(), - "metrics": metrics} + self.email_setup(metrics) - notification = Notification('email', 0, 1, 'email notification', 'me@here.com', alarm_dict) - - self.notification_queue.put([notification]) - self._start_processor() log_msg = self.log_queue.get(timeout=3) - self.processor.terminate() self.assertRegexpMatches(log_msg, "From: hpcs.mon@hp.com") self.assertRegexpMatches(log_msg, "To: me@here.com") @@ -144,7 +272,7 @@ class TestStateTracker(unittest.TestCase): self.assertRegexpMatches(log_msg, "Alarm .test Alarm.") self.assertRegexpMatches(log_msg, "On host .foo1.") - self.assertTrue(self.log_queue.empty()) + self.assertSentNotification() def test_email_notification_multiple_hosts(self): """Email with multiple hosts @@ -156,21 +284,9 @@ class TestStateTracker(unittest.TestCase): metric_data = {'dimensions': {'hostname': 'foo2', 'service': 'bar2'}} metrics.append(metric_data) - alarm_dict = {"tenantId": "0", - "alarmId": "0", - "alarmName": "test Alarm", - "oldState": "OK", - "newState": "ALARM", - "stateChangeReason": "I am alarming!", - "timestamp": time.time(), - "metrics": metrics} + self.email_setup(metrics) - notification = Notification('email', 0, 1, 'email notification', 'me@here.com', alarm_dict) - - self.notification_queue.put([notification]) - self._start_processor() log_msg = self.log_queue.get(timeout=3) - self.processor.terminate() self.assertRegexpMatches(log_msg, "From: hpcs.mon@hp.com") self.assertRegexpMatches(log_msg, "To: me@here.com") @@ -179,69 +295,31 @@ class TestStateTracker(unittest.TestCase): self.assertNotRegexpMatches(log_msg, "foo1") self.assertNotRegexpMatches(log_msg, "foo2") - self.assertTrue(self.log_queue.empty()) + self.assertSentNotification() def test_webhook_good_http_response(self): - """webhook good response + """webhook 200 """ - self.http_func = self._http_post_200 + self.webhook_setup(self._http_post_200) - metrics = [] - metric_data = {'dimensions': {'hostname': 'foo1', 'service': 'bar1'}} - metrics.append(metric_data) - metric_data = {'dimensions': {'hostname': 'foo2', 'service': 'bar2'}} - metrics.append(metric_data) + url = self.log_queue.get(timeout=3) + data = self.log_queue.get(timeout=3) + headers = self.log_queue.get(timeout=3) - alarm_dict = {"tenantId": "0", - "alarmId": "0", - "alarmName": "test Alarm", - "oldState": "OK", - "newState": "ALARM", - "stateChangeReason": "I am alarming!", - "timestamp": time.time(), - "metrics": metrics} + self.assertEqual(url, "me@here.com") + self.assertEqual(data, {'alarm_id': '0'}) + self.assertEqual(headers, {'content-type': 'application/json'}) - notification = Notification('webhook', 0, 1, 'email notification', 'me@here.com', alarm_dict) - - self.notification_queue.put([notification]) - self._start_processor() - log_msg = self.log_queue.get(timeout=3) - self.processor.terminate() - - self.assertRegexpMatches(log_msg, "me@here.com") - self.assertRegexpMatches(log_msg, "alarm_id.: '0'") - self.assertRegexpMatches(log_msg, "content-type.: .application/json") - - self.assertTrue(self.log_queue.empty()) + self.assertSentNotification() def test_webhook_bad_http_response(self): """webhook bad response """ - self.http_func = self._http_post_404 + self.webhook_setup(self._http_post_404) - metrics = [] - metric_data = {'dimensions': {'hostname': 'foo1', 'service': 'bar1'}} - metrics.append(metric_data) - metric_data = {'dimensions': {'hostname': 'foo2', 'service': 'bar2'}} - metrics.append(metric_data) - - alarm_dict = {"tenantId": "0", - "alarmId": "0", - "alarmName": "test Alarm", - "oldState": "OK", - "newState": "ALARM", - "stateChangeReason": "I am alarming!", - "timestamp": time.time(), - "metrics": metrics} - - notification = Notification('webhook', 0, 1, 'email notification', 'me@here.com', alarm_dict) - - self.notification_queue.put([notification]) - self._start_processor() log_msg = self.log_queue.get(timeout=3) - self.processor.terminate() self.assertNotRegexpMatches(log_msg, "alarm_id.: .test Alarm") self.assertNotRegexpMatches(log_msg, "content-type.: .application/json") @@ -249,39 +327,13 @@ class TestStateTracker(unittest.TestCase): self.assertRegexpMatches(log_msg, "HTTP code 404") self.assertRegexpMatches(log_msg, "post on URL me@here.com") - self.assertTrue(self.log_queue.empty()) + self.assertSentFinished() def test_webhook_timeout_exception_on_http_response(self): """webhook timeout exception """ - self.http_func = self._http_post_exception - - metrics = [] - metric_data = {'dimensions': {'hostname': 'foo1', 'service': 'bar1'}} - metrics.append(metric_data) - metric_data = {'dimensions': {'hostname': 'foo2', 'service': 'bar2'}} - metrics.append(metric_data) - - alarm_dict = {"tenantId": "0", - "alarmId": "0", - "alarmName": "test Alarm", - "oldState": "OK", - "newState": "ALARM", - "stateChangeReason": "I am alarming!", - "timestamp": time.time(), - "metrics": metrics} - - notification = Notification('webhook', - 0, - 1, - 'webhook notification', - 'http://localhost:21356', - alarm_dict) - - self._start_processor() - - self.notification_queue.put([notification]) + self.webhook_setup(self._http_post_exception) log_msg = self.log_queue.get(timeout=3) @@ -292,9 +344,111 @@ class TestStateTracker(unittest.TestCase): self.assertNotRegexpMatches(log_msg, "alarm_id.: .test Alarm") self.assertNotRegexpMatches(log_msg, "content-type.: .application/json") - self.assertRegexpMatches(log_msg, "Error trying to post on URL http://localhost:21356") + self.assertRegexpMatches(log_msg, "Error trying to post on URL me@here") self.assertRaises(requests.exceptions.Timeout) - self.assertTrue(self.log_queue.empty()) + self.assertSentFinished() - self.processor.terminate() + def test_pagerduty_200(self): + """pagerduty 200 + """ + + self.pagerduty_setup(self._http_post_200) + + url = self.log_queue.get(timeout=3) + data = self.log_queue.get(timeout=3) + headers = self.log_queue.get(timeout=3) + + self.valid_pagerduty_message(url, data, headers) + self.assertSentNotification() + + def test_pagerduty_201(self): + """pagerduty 201 + """ + + self.pagerduty_setup(self._http_post_201) + + url = self.log_queue.get(timeout=3) + data = self.log_queue.get(timeout=3) + headers = self.log_queue.get(timeout=3) + + self.valid_pagerduty_message(url, data, headers) + self.assertSentNotification() + + def test_pagerduty_204(self): + """pagerduty 204 + """ + + self.pagerduty_setup(self._http_post_204) + + url = self.log_queue.get(timeout=3) + data = self.log_queue.get(timeout=3) + headers = self.log_queue.get(timeout=3) + + self.valid_pagerduty_message(url, data, headers) + self.assertSentNotification() + + def test_pagerduty_202(self): + """pagerduty 202 + """ + + self.pagerduty_setup(self._http_post_202) + log_msg = self.log_queue.get(timeout=3) + self.pagerduty_http_error(log_msg, "202") + self.assertSentFinished() + + def test_pagerduty_400(self): + """pagerduty 400 + """ + + self.pagerduty_setup(self._http_post_400) + log_msg = self.log_queue.get(timeout=3) + self.pagerduty_http_error(log_msg, "400") + self.assertSentFinished() + + def test_pagerduty_403(self): + """pagerduty 403 + """ + + self.pagerduty_setup(self._http_post_403) + log_msg = self.log_queue.get(timeout=3) + self.pagerduty_http_error(log_msg, "403") + self.assertSentFinished() + + def test_pagerduty_500(self): + """pagerduty 500 + """ + + self.pagerduty_setup(self._http_post_500) + log_msg = self.log_queue.get(timeout=3) + self.pagerduty_http_error(log_msg, "500") + self.assertSentFinished() + + def test_pagerduty_504(self): + """pagerduty 504 + """ + + self.pagerduty_setup(self._http_post_504) + log_msg = self.log_queue.get(timeout=3) + self.pagerduty_http_error(log_msg, "504") + self.assertSentFinished() + + def test_pagerduty_exception(self): + """pagerduty exception + """ + + self.pagerduty_setup(self._http_post_exception) + + log_msg = self.log_queue.get(timeout=3) + + self.assertEqual(log_msg, "timeout 50") + + log_msg = self.log_queue.get(timeout=3) + + self.assertRegexpMatches(log_msg, "Exception on pagerduty request") + self.assertRegexpMatches(log_msg, "key=") + self.assertRegexpMatches( + log_msg, "exception=") + + self.assertRaises(requests.exceptions.Timeout) + self.assertSentFinished()