diff --git a/monasca_notification/main.py b/monasca_notification/main.py index 02499e0..fbec277 100644 --- a/monasca_notification/main.py +++ b/monasca_notification/main.py @@ -28,6 +28,7 @@ import time import yaml from notification_engine import NotificationEngine +from retry_engine import RetryEngine log = logging.getLogger(__name__) processors = [] # global list to facilitate clean signal handling @@ -67,6 +68,12 @@ def clean_exit(signum, frame=None): sys.exit(0) +def start_process(proccess_type, config): + log.info("start process: {}".format(proccess_type)) + p = proccess_type(config) + p.run() + + def main(argv=None): if argv is None: argv = sys.argv @@ -84,8 +91,12 @@ def main(argv=None): # Setup logging logging.config.dictConfig(config['logging']) - notifier = multiprocessing.Process(target=NotificationEngine(config).run) - processors.append(notifier) + for proc in range(0, config['processors']['notification']['number']): + processors.append(multiprocessing.Process( + target=start_process, args=(NotificationEngine, config))) + + processors.append(multiprocessing.Process( + target=start_process, args=(RetryEngine, config))) # Start try: diff --git a/monasca_notification/notification.py b/monasca_notification/notification.py index e2f4b95..16d1b15 100644 --- a/monasca_notification/notification.py +++ b/monasca_notification/notification.py @@ -33,17 +33,21 @@ class Notification(object): 'state', 'tenant_id', 'type', - 'metrics' + 'metrics', + 'retry_count', + 'raw_alarm' ) - def __init__(self, ntype, src_partition, src_offset, name, address, alarm): + def __init__(self, ntype, src_partition, src_offset, name, address, + retry_count, alarm): """Setup the notification object The src_partition and src_offset allow the notification to be linked to the alarm that it came from. ntype - The notification type name - Name used in sending address - to send the notification to - alarm_data - info that caused the notification + retry_count - number of times we've tried to send + alarm - info that caused the notification notifications that come after this one to remain uncommitted. """ self.address = address @@ -51,6 +55,9 @@ class Notification(object): self.src_partition = src_partition self.src_offset = src_offset self.type = ntype + self.retry_count = retry_count + + self.raw_alarm = alarm self.alarm_id = alarm['alarmId'] self.alarm_name = alarm['alarmName'] @@ -77,8 +84,11 @@ class Notification(object): """Return json representation """ notification_fields = [ - 'address', + 'type', 'name', + 'address', + 'retry_count', + 'raw_alarm', 'alarm_id', 'alarm_name', 'alarm_timestamp', diff --git a/monasca_notification/notification_engine.py b/monasca_notification/notification_engine.py index 7ebf5bb..219647f 100644 --- a/monasca_notification/notification_engine.py +++ b/monasca_notification/notification_engine.py @@ -26,13 +26,14 @@ class NotificationEngine(object): def __init__(self, config): self._topics = {} self._topics['notification_topic'] = config['kafka']['notification_topic'] + self._topics['retry_topic'] = config['kafka']['notification_retry_topic'] self._statsd = monascastatsd.Client(name='monasca', dimensions=BaseProcessor.dimensions) self._consumer = KafkaConsumer(config['kafka']['url'], config['zookeeper']['url'], - config['zookeeper']['path'], + config['zookeeper']['notification_path'], config['kafka']['group'], config['kafka']['alarm_topic']) @@ -62,6 +63,7 @@ class NotificationEngine(object): if notifications: sent, failed = self._notifier.send(notifications) self._producer.publish(self._topics['notification_topic'], sent) + self._producer.publish(self._topics['retry_topic'], failed) self._consumer.commit([partition]) diff --git a/monasca_notification/processors/alarm_processor.py b/monasca_notification/processors/alarm_processor.py index 1225e9e..aa6ee00 100644 --- a/monasca_notification/processors/alarm_processor.py +++ b/monasca_notification/processors/alarm_processor.py @@ -125,6 +125,7 @@ class AlarmProcessor(BaseProcessor): offset, row[0], row[2], + 0, alarm) for row in cur] if len(notifications) == 0: diff --git a/monasca_notification/processors/kafka_producer.py b/monasca_notification/processors/kafka_producer.py index dc1e2fa..06daceb 100644 --- a/monasca_notification/processors/kafka_producer.py +++ b/monasca_notification/processors/kafka_producer.py @@ -17,6 +17,7 @@ import kafka.client import kafka.producer import logging import monascastatsd +import time from monasca_notification.processors.base import BaseProcessor @@ -34,10 +35,10 @@ class KafkaProducer(BaseProcessor): self._statsd = monascastatsd.Client(name='monasca', dimensions=BaseProcessor.dimensions) self._kafka = kafka.client.KafkaClient(url) - self._producer = kafka.producer.SimpleProducer( + self._producer = kafka.producer.KeyedProducer( self._kafka, async=False, - req_acks=kafka.producer.SimpleProducer.ACK_AFTER_LOCAL_WRITE, + req_acks=kafka.producer.KeyedProducer.ACK_AFTER_LOCAL_WRITE, ack_timeout=2000) def publish(self, topic, messages): @@ -46,7 +47,13 @@ class KafkaProducer(BaseProcessor): published_to_kafka = self._statsd.get_counter(name='published_to_kafka') for message in messages: - responses = self._producer.send_messages(topic, message.to_json()) + key = time.time() * 1000 + try: + responses = self._producer.send(topic, key, message.to_json()) + except Exception: + log.exception("error publishing message to kafka") + continue + published_to_kafka += 1 log.debug('Published to topic {}, message {}'.format(topic, message.to_json())) diff --git a/monasca_notification/processors/notification_processor.py b/monasca_notification/processors/notification_processor.py index 75abcfe..a0b9a1b 100644 --- a/monasca_notification/processors/notification_processor.py +++ b/monasca_notification/processors/notification_processor.py @@ -40,10 +40,10 @@ class NotificationProcessor(BaseProcessor): sent, failed, invalid = notifiers.send_notifications(notifications) - if failed > 0: - sent_failed_count.increment(failed) + if failed: + sent_failed_count.increment(len(failed)) - if invalid > 0: - invalid_type_count.increment(invalid) + if invalid: + invalid_type_count.increment(len(invalid)) return sent, failed diff --git a/monasca_notification/retry_engine.py b/monasca_notification/retry_engine.py new file mode 100644 index 0000000..d5e5f01 --- /dev/null +++ b/monasca_notification/retry_engine.py @@ -0,0 +1,98 @@ +# Copyright (c) 2015 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import logging +import monascastatsd +import time + +from notification import Notification +from processors.base import BaseProcessor +from processors.kafka_consumer import KafkaConsumer +from processors.kafka_producer import KafkaProducer +from processors.notification_processor import NotificationProcessor + +log = logging.getLogger(__name__) + + +class RetryEngine(object): + def __init__(self, config): + self._retry_interval = config['retry']['interval'] + self._retry_max = config['retry']['max_attempts'] + + self._topics = {} + self._topics['notification_topic'] = config['kafka']['notification_topic'] + self._topics['retry_topic'] = config['kafka']['notification_retry_topic'] + + self._statsd = monascastatsd.Client(name='monasca', + dimensions=BaseProcessor.dimensions) + + self._consumer = KafkaConsumer(config['kafka']['url'], + config['zookeeper']['url'], + config['zookeeper']['notification_retry_path'], + config['kafka']['group'], + config['kafka']['notification_retry_topic']) + + self._producer = KafkaProducer(config['kafka']['url']) + + self._notifier = NotificationProcessor(config['notification_types']) + + def run(self): + for raw_notification in self._consumer: + partition = raw_notification[0] + offset = raw_notification[1].offset + message = raw_notification[1].message.value + + notification_data = json.loads(message) + + ntype = notification_data['type'] + name = notification_data['name'] + addr = notification_data['address'] + + notification = Notification(ntype, + partition, + offset, + name, + addr, + notification_data['retry_count'], + notification_data['raw_alarm']) + + wait_duration = self._retry_interval - ( + time.time() - notification_data['notification_timestamp']) + + if wait_duration > 0: + time.sleep(wait_duration) + + sent, failed = self._notifier.send([notification]) + + if sent: + self._producer.publish(self._topics['notification_topic'], sent) + + if failed: + notification.retry_count += 1 + notification.notification_timestamp = time.time() + if notification.retry_count < self._retry_max: + log.error("retry failed for {} with name {} " + "at {}. " + "Saving for later retry.".format(ntype, name, addr)) + self._producer.publish(self._topics['retry_topic'], + [notification]) + else: + log.error("retry failed for {} with name {} " + "at {} after {} retries. " + "Giving up on retry." + .format(ntype, name, addr, self._retry_max)) + + self._consumer.commit([partition]) diff --git a/monasca_notification/types/email_notifier.py b/monasca_notification/types/email_notifier.py index 5f3a494..a69e3c6 100644 --- a/monasca_notification/types/email_notifier.py +++ b/monasca_notification/types/email_notifier.py @@ -15,7 +15,6 @@ import email.mime.text import smtplib -import sys import time from abstract_notifier import AbstractNotifier @@ -96,8 +95,7 @@ class EmailNotifier(AbstractNotifier): self._smtp = smtp except Exception: - self._log.exception("Unable to connect to email server. Exiting.") - sys.exit(1) + self._log.exception("Unable to connect to email server.") def _create_msg(self, hostname, notification): """Create two kind of messages: diff --git a/monasca_notification/types/notifiers.py b/monasca_notification/types/notifiers.py index 23475d1..440265a 100644 --- a/monasca_notification/types/notifiers.py +++ b/monasca_notification/types/notifiers.py @@ -64,27 +64,28 @@ def config(config): def send_notifications(notifications): sent = [] - failed_count = 0 - invalid_count = 0 + failed = [] + invalid = [] for notification in notifications: ntype = notification.type if ntype not in configured_notifiers: log.warn("attempting to send unconfigured notification: {}".format(ntype)) - invalid_count += 1 + invalid.append(notification) continue + notification.notification_timestamp = time.time() + with statsd_timer.time(ntype + '_time'): result = send_single_notification(notification) if result: - notification.notification_timestamp = time.time() sent.append(notification) statsd_counter[ntype].increment(1) else: - failed_count += 1 + failed.append(notification) - return (sent, failed_count, invalid_count) + return (sent, failed, invalid) def send_single_notification(notification): diff --git a/notification.yaml b/notification.yaml index 8dcf398..7d086eb 100644 --- a/notification.yaml +++ b/notification.yaml @@ -3,6 +3,7 @@ kafka: group: monasca-notification alarm_topic: alarm-state-transitions notification_topic: alarm-notifications + notification_retry_topic: retry-notifications max_offset_lag: 600 # In seconds, undefined for none mysql: @@ -36,6 +37,10 @@ processors: notification: number: 4 +retry: + interval: 30 + max_attempts: 5 + queues: alarms_size: 256 finished_size: 256 @@ -44,6 +49,8 @@ queues: zookeeper: url: 192.168.10.4:2181 # or comma seperated list of multiple hosts + notification_path: /notification/alarms + notification_retry_path: /notification/retry logging: # Used in logging.dictConfig version: 1 diff --git a/tests/test_alarm_processor.py b/tests/test_alarm_processor.py index 51d322e..a64904a 100755 --- a/tests/test_alarm_processor.py +++ b/tests/test_alarm_processor.py @@ -116,7 +116,7 @@ class TestAlarmProcessor(unittest.TestCase): sql_response = [['test notification', 'EMAIL', 'me@here.com']] notifications, partition, offset = self._run_alarm_processor(alarm, sql_response) - test_notification = Notification('email', 0, 4, 'test notification', 'me@here.com', alarm_dict) + test_notification = Notification('email', 0, 4, 'test notification', 'me@here.com', 0, alarm_dict) self.assertEqual(notifications, [test_notification]) self.assertEqual(partition, 0) @@ -132,8 +132,8 @@ class TestAlarmProcessor(unittest.TestCase): sql_response = [['test notification', 'EMAIL', 'me@here.com'], ['test notification2', 'EMAIL', 'me@here.com']] notifications, partition, offset = self._run_alarm_processor(alarm, sql_response) - test_notification = Notification('email', 0, 5, 'test notification', 'me@here.com', alarm_dict) - test_notification2 = Notification('email', 0, 5, 'test notification2', 'me@here.com', alarm_dict) + test_notification = Notification('email', 0, 5, 'test notification', 'me@here.com', 0, alarm_dict) + test_notification2 = Notification('email', 0, 5, 'test notification2', 'me@here.com', 0, alarm_dict) self.assertEqual(notifications, [test_notification, test_notification2]) self.assertEqual(partition, 0) diff --git a/tests/test_email_notification.py b/tests/test_email_notification.py index 839a33a..9716993 100644 --- a/tests/test_email_notification.py +++ b/tests/test_email_notification.py @@ -84,7 +84,7 @@ class TestEmail(unittest.TestCase): alarm_dict = alarm(metric) - notification = Notification('email', 0, 1, 'email notification', 'me@here.com', alarm_dict) + notification = Notification('email', 0, 1, 'email notification', 'me@here.com', 0, alarm_dict) self.trap.append(email.send_notification(notification)) @@ -133,9 +133,8 @@ class TestEmail(unittest.TestCase): return_value = self.trap.pop(0) self.assertTrue(return_value) - @mock.patch('monasca_notification.types.email_notifier.sys') @mock.patch('monasca_notification.types.email_notifier.smtplib') - def test_smtp_sendmail_failed_connection_twice(self, mock_smtp, mock_sys): + def test_smtp_sendmail_failed_connection_twice(self, mock_smtp): """Email that fails on smtp_connect twice """ @@ -171,17 +170,15 @@ class TestEmail(unittest.TestCase): alarm_dict = alarm(metrics) - notification = Notification('email', 0, 1, 'email notification', 'me@here.com', alarm_dict) + notification = Notification('email', 0, 1, 'email notification', 'me@here.com', 0, alarm_dict) self.trap.append(email.send_notification(notification)) self.assertIn("SMTP server disconnected. Will reconnect and retry message.", self.trap) - self.assertIn("Unable to connect to email server. Exiting.", self.trap) - self.assertTrue(mock_sys.exit.called) + self.assertIn("Unable to connect to email server.", self.trap) - @mock.patch('monasca_notification.types.email_notifier.sys') @mock.patch('monasca_notification.types.email_notifier.smtplib') - def test_smtp_sendmail_failed_connection_once_then_email(self, mock_smtp, mock_sys): + def test_smtp_sendmail_failed_connection_once_then_email(self, mock_smtp): """Email that fails on smtp_connect once then email """ @@ -214,13 +211,13 @@ class TestEmail(unittest.TestCase): alarm_dict = alarm(metrics) - notification = Notification('email', 0, 1, 'email notification', 'me@here.com', alarm_dict) + notification = Notification('email', 0, 1, 'email notification', 'me@here.com', 0, alarm_dict) self.trap.append(email.send_notification(notification)) self.assertIn("SMTP server disconnected. Will reconnect and retry message.", self.trap) self.assertIn("Error sending Email Notification", self.trap) - self.assertFalse(mock_sys.exit.called) + self.assertNotIn("Unable to connect to email server.", self.trap) @mock.patch('monasca_notification.types.email_notifier.smtplib') def test_smtp_sendmail_failed_connection_once(self, mock_smtp): @@ -254,7 +251,7 @@ class TestEmail(unittest.TestCase): alarm_dict = alarm(metrics) - notification = Notification('email', 0, 1, 'email notification', 'me@here.com', alarm_dict) + notification = Notification('email', 0, 1, 'email notification', 'me@here.com', 0, alarm_dict) self.trap.append(email.send_notification(notification)) @@ -294,7 +291,7 @@ class TestEmail(unittest.TestCase): alarm_dict = alarm(metrics) - notification = Notification('email', 0, 1, 'email notification', 'me@here.com', alarm_dict) + notification = Notification('email', 0, 1, 'email notification', 'me@here.com', 0, alarm_dict) self.trap.append(email.send_notification(notification)) diff --git a/tests/test_notification.py b/tests/test_notification.py index 514d4a5..b0ce08d 100644 --- a/tests/test_notification.py +++ b/tests/test_notification.py @@ -29,9 +29,12 @@ def test_json(): 'newState': 'newState', 'tenantId': 'tenantId', 'metrics': 'cpu_util'} - test_notification = notification.Notification('ntype', 'src_partition', 'src_offset', 'name', 'address', alarm) + test_notification = notification.Notification('ntype', 'src_partition', + 'src_offset', 'name', + 'address', 0, alarm) expected_dict = {u'name': u'name', + u'type': u'ntype', u'notification_timestamp': None, u'tenant_id': u'tenantId', u'alarm_name': u'alarmName', @@ -39,7 +42,17 @@ def test_json(): u'state': u'newState', u'alarm_timestamp': u'timestamp', u'address': u'address', - u'message': u'stateChangeReason'} + u'message': u'stateChangeReason', + u'retry_count': 0, + u'raw_alarm': { + u'alarmId': u'alarmId', + u'alarmName': u'alarmName', + u'timestamp': u'timestamp', + u'stateChangeReason': u'stateChangeReason', + u'newState': u'newState', + u'tenantId': u'tenantId', + u'metrics': u'cpu_util'}} + # Compare as dicts so ordering is not an issue assert json.loads(test_notification.to_json()) == expected_dict @@ -52,8 +65,12 @@ def test_equal(): 'newState': 'newState', 'tenantId': 'tenantId', 'metrics': 'cpu_util'} - test_notification = notification.Notification('ntype', 'src_partition', 'src_offset', 'name', 'address', alarm) - test_notification2 = notification.Notification('ntype', 'src_partition', 'src_offset', 'name', 'address', alarm) + test_notification = notification.Notification('ntype', 'src_partition', + 'src_offset', 'name', + 'address', 0, alarm) + test_notification2 = notification.Notification('ntype', 'src_partition', + 'src_offset', 'name', + 'address', 0, alarm) assert(test_notification == test_notification2) @@ -66,8 +83,12 @@ def test_unequal(): 'newState': 'newState', 'tenantId': 'tenantId', 'metrics': 'cpu_util'} - test_notification = notification.Notification('ntype', 'src_partition', 'src_offset', 'name', 'address', alarm) - test_notification2 = notification.Notification('ntype', 'src_partition', 'src_offset', 'name', 'address', alarm) + test_notification = notification.Notification('ntype', 'src_partition', + 'src_offset', 'name', + 'address', 0, alarm) + test_notification2 = notification.Notification('ntype', 'src_partition', + 'src_offset', 'name', + 'address', 0, alarm) test_notification2.alarm_id = None assert(test_notification != test_notification2) diff --git a/tests/test_notification_processor.py b/tests/test_notification_processor.py index 8292429..99225ae 100644 --- a/tests/test_notification_processor.py +++ b/tests/test_notification_processor.py @@ -85,7 +85,7 @@ class TestNotificationProcessor(unittest.TestCase): "timestamp": time.time(), "metrics": metric} - notification = Notification('email', 0, 1, 'email notification', 'me@here.com', alarm_dict) + notification = Notification('email', 0, 1, 'email notification', 'me@here.com', 0, alarm_dict) self._start_processor([notification]) @@ -98,7 +98,7 @@ class TestNotificationProcessor(unittest.TestCase): """ alarm_dict = {"tenantId": "0", "alarmId": "0", "alarmName": "test Alarm", "oldState": "OK", "newState": "ALARM", "stateChangeReason": "I am alarming!", "timestamp": time.time(), "metrics": "cpu_util"} - invalid_notification = Notification('invalid', 0, 1, 'test notification', 'me@here.com', alarm_dict) + invalid_notification = Notification('invalid', 0, 1, 'test notification', 'me@here.com', 0, alarm_dict) self._start_processor([invalid_notification]) diff --git a/tests/test_notifiers.py b/tests/test_notifiers.py index 8841e3f..6df8d29 100644 --- a/tests/test_notifiers.py +++ b/tests/test_notifiers.py @@ -224,7 +224,7 @@ class TestInterface(unittest.TestCase): notifications = [] notifications.append(Notification('email', 0, 1, 'email notification', - 'me@here.com', alarm({}))) + 'me@here.com', 0, alarm({}))) notifiers.send_notifications(notifications) @@ -250,13 +250,13 @@ class TestInterface(unittest.TestCase): notifications = [] notifications.append(Notification('email', 0, 1, 'email notification', - 'me@here.com', alarm({}))) + 'me@here.com', 0, alarm({}))) sent, failed, invalid = notifiers.send_notifications(notifications) self.assertEqual(sent, []) - self.assertEqual(failed, 1) - self.assertEqual(invalid, 0) + self.assertEqual(len(failed), 1) + self.assertEqual(invalid, []) @mock.patch('monasca_notification.types.notifiers.email_notifier') @mock.patch('monasca_notification.types.notifiers.email_notifier.smtplib') @@ -279,13 +279,13 @@ class TestInterface(unittest.TestCase): notifications = [] notifications.append(Notification('pagerduty', 0, 1, 'pagerduty notification', - 'me@here.com', alarm({}))) + 'me@here.com', 0, alarm({}))) sent, failed, invalid = notifiers.send_notifications(notifications) self.assertEqual(sent, []) - self.assertEqual(failed, 0) - self.assertEqual(invalid, 1) + self.assertEqual(failed, []) + self.assertEqual(len(invalid), 1) self.assertIn("attempting to send unconfigured notification: pagerduty", self.trap) @@ -312,19 +312,19 @@ class TestInterface(unittest.TestCase): notifications = [] notifications.append(Notification('email', 0, 1, 'email notification', - 'me@here.com', alarm({}))) + 'me@here.com', 0, alarm({}))) notifications.append(Notification('email', 0, 1, 'email notification', - 'foo@here.com', alarm({}))) + 'foo@here.com', 0, alarm({}))) notifications.append(Notification('email', 0, 1, 'email notification', - 'bar@here.com', alarm({}))) + 'bar@here.com', 0, alarm({}))) sent, failed, invalid = notifiers.send_notifications(notifications) self.assertEqual(len(sent), 3) - self.assertEqual(failed, 0) - self.assertEqual(invalid, 0) + self.assertEqual(failed, []) + self.assertEqual(invalid, []) for n in sent: self.assertEqual(n.notification_timestamp, 42) @@ -348,13 +348,13 @@ class TestInterface(unittest.TestCase): notifications = [] notifications.append(Notification('email', 0, 1, 'email notification', - 'me@here.com', alarm({}))) + 'me@here.com', 0, alarm({}))) notifications.append(Notification('email', 0, 1, 'email notification', - 'foo@here.com', alarm({}))) + 'foo@here.com', 0, alarm({}))) notifications.append(Notification('email', 0, 1, 'email notification', - 'bar@here.com', alarm({}))) + 'bar@here.com', 0, alarm({}))) notifiers.send_notifications(notifications) diff --git a/tests/test_pagerduty_notification.py b/tests/test_pagerduty_notification.py index d4f3ea6..e686e8c 100644 --- a/tests/test_pagerduty_notification.py +++ b/tests/test_pagerduty_notification.py @@ -146,6 +146,7 @@ class TestWebhook(unittest.TestCase): 1, 'pagerduty notification', 'ABCDEF', + 0, alarm_dict) self.trap.put(pagerduty.send_notification(notification)) diff --git a/tests/test_webhook_notification.py b/tests/test_webhook_notification.py index 6bdbafc..630ce27 100644 --- a/tests/test_webhook_notification.py +++ b/tests/test_webhook_notification.py @@ -81,7 +81,7 @@ class TestWebhook(unittest.TestCase): alarm_dict = alarm(metric) - notification = Notification('webhook', 0, 1, 'webhook notification', 'me@here.com', alarm_dict) + notification = Notification('webhook', 0, 1, 'webhook notification', 'me@here.com', 0, alarm_dict) self.trap.put(webhook.send_notification(notification))