Merge "Added retry engine to notification system"

This commit is contained in:
Jenkins 2015-02-11 18:12:18 +00:00 committed by Gerrit Code Review
commit e5ede47918
17 changed files with 216 additions and 62 deletions

View File

@ -28,6 +28,7 @@ import time
import yaml
from notification_engine import NotificationEngine
from retry_engine import RetryEngine
log = logging.getLogger(__name__)
processors = [] # global list to facilitate clean signal handling
@ -67,6 +68,12 @@ def clean_exit(signum, frame=None):
sys.exit(0)
def start_process(proccess_type, config):
    """Build an engine of the requested type and run it.

    proccess_type - callable invoked with config; the object it returns
                    must provide a run() method
    config - configuration passed unchanged to proccess_type
    """
    log.info("start process: {}".format(proccess_type))
    engine = proccess_type(config)
    engine.run()
def main(argv=None):
if argv is None:
argv = sys.argv
@ -84,8 +91,12 @@ def main(argv=None):
# Setup logging
logging.config.dictConfig(config['logging'])
notifier = multiprocessing.Process(target=NotificationEngine(config).run)
processors.append(notifier)
for proc in range(0, config['processors']['notification']['number']):
processors.append(multiprocessing.Process(
target=start_process, args=(NotificationEngine, config)))
processors.append(multiprocessing.Process(
target=start_process, args=(RetryEngine, config)))
# Start
try:

View File

@ -33,17 +33,21 @@ class Notification(object):
'state',
'tenant_id',
'type',
'metrics'
'metrics',
'retry_count',
'raw_alarm'
)
def __init__(self, ntype, src_partition, src_offset, name, address, alarm):
def __init__(self, ntype, src_partition, src_offset, name, address,
retry_count, alarm):
"""Setup the notification object
The src_partition and src_offset allow the notification
to be linked to the alarm that it came from.
ntype - The notification type
name - Name used in sending
address - to send the notification to
alarm_data - info that caused the notification
retry_count - number of times we've tried to send
alarm - info that caused the notification
notifications that come after this one to remain uncommitted.
"""
self.address = address
@ -51,6 +55,9 @@ class Notification(object):
self.src_partition = src_partition
self.src_offset = src_offset
self.type = ntype
self.retry_count = retry_count
self.raw_alarm = alarm
self.alarm_id = alarm['alarmId']
self.alarm_name = alarm['alarmName']
@ -77,8 +84,11 @@ class Notification(object):
"""Return json representation
"""
notification_fields = [
'address',
'type',
'name',
'address',
'retry_count',
'raw_alarm',
'alarm_id',
'alarm_name',
'alarm_timestamp',

View File

@ -26,13 +26,14 @@ class NotificationEngine(object):
def __init__(self, config):
self._topics = {}
self._topics['notification_topic'] = config['kafka']['notification_topic']
self._topics['retry_topic'] = config['kafka']['notification_retry_topic']
self._statsd = monascastatsd.Client(name='monasca',
dimensions=BaseProcessor.dimensions)
self._consumer = KafkaConsumer(config['kafka']['url'],
config['zookeeper']['url'],
config['zookeeper']['path'],
config['zookeeper']['notification_path'],
config['kafka']['group'],
config['kafka']['alarm_topic'])
@ -62,6 +63,7 @@ class NotificationEngine(object):
if notifications:
sent, failed = self._notifier.send(notifications)
self._producer.publish(self._topics['notification_topic'], sent)
self._producer.publish(self._topics['retry_topic'], failed)
self._consumer.commit([partition])

View File

@ -125,6 +125,7 @@ class AlarmProcessor(BaseProcessor):
offset,
row[0],
row[2],
0,
alarm) for row in cur]
if len(notifications) == 0:

View File

@ -17,6 +17,7 @@ import kafka.client
import kafka.producer
import logging
import monascastatsd
import time
from monasca_notification.processors.base import BaseProcessor
@ -34,10 +35,10 @@ class KafkaProducer(BaseProcessor):
self._statsd = monascastatsd.Client(name='monasca', dimensions=BaseProcessor.dimensions)
self._kafka = kafka.client.KafkaClient(url)
self._producer = kafka.producer.SimpleProducer(
self._producer = kafka.producer.KeyedProducer(
self._kafka,
async=False,
req_acks=kafka.producer.SimpleProducer.ACK_AFTER_LOCAL_WRITE,
req_acks=kafka.producer.KeyedProducer.ACK_AFTER_LOCAL_WRITE,
ack_timeout=2000)
def publish(self, topic, messages):
@ -46,7 +47,13 @@ class KafkaProducer(BaseProcessor):
published_to_kafka = self._statsd.get_counter(name='published_to_kafka')
for message in messages:
responses = self._producer.send_messages(topic, message.to_json())
key = time.time() * 1000
try:
responses = self._producer.send(topic, key, message.to_json())
except Exception:
log.exception("error publishing message to kafka")
continue
published_to_kafka += 1
log.debug('Published to topic {}, message {}'.format(topic, message.to_json()))

View File

@ -40,10 +40,10 @@ class NotificationProcessor(BaseProcessor):
sent, failed, invalid = notifiers.send_notifications(notifications)
if failed > 0:
sent_failed_count.increment(failed)
if failed:
sent_failed_count.increment(len(failed))
if invalid > 0:
invalid_type_count.increment(invalid)
if invalid:
invalid_type_count.increment(len(invalid))
return sent, failed

View File

@ -0,0 +1,98 @@
# Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import monascastatsd
import time
from notification import Notification
from processors.base import BaseProcessor
from processors.kafka_consumer import KafkaConsumer
from processors.kafka_producer import KafkaProducer
from processors.notification_processor import NotificationProcessor
log = logging.getLogger(__name__)
class RetryEngine(object):
    """Re-send notifications that failed an earlier delivery attempt.

    Serialized notifications are consumed from the Kafka retry topic. Each
    one is held until the configured retry interval has elapsed since its
    recorded ``notification_timestamp`` and is then sent again. Notifications
    that go out are published to the notification topic; ones that fail again
    are put back on the retry topic until the configured maximum number of
    attempts is reached.
    """

    def __init__(self, config):
        """Set up the retry consumer, the producer and the notifier.

        config - configuration dict; the 'retry', 'kafka', 'zookeeper' and
                 'notification_types' sections are read here
        """
        self._retry_interval = config['retry']['interval']
        self._retry_max = config['retry']['max_attempts']

        self._topics = {}
        self._topics['notification_topic'] = config['kafka']['notification_topic']
        self._topics['retry_topic'] = config['kafka']['notification_retry_topic']

        self._statsd = monascastatsd.Client(name='monasca',
                                            dimensions=BaseProcessor.dimensions)

        self._consumer = KafkaConsumer(config['kafka']['url'],
                                       config['zookeeper']['url'],
                                       config['zookeeper']['notification_retry_path'],
                                       config['kafka']['group'],
                                       config['kafka']['notification_retry_topic'])

        self._producer = KafkaProducer(config['kafka']['url'])

        self._notifier = NotificationProcessor(config['notification_types'])

    def run(self):
        """Consume the retry topic and re-send each notification.

        Runs for as long as the consumer yields messages. A message that
        cannot be turned back into a Notification is logged and committed
        instead of raising out of the loop.
        """
        for raw_notification in self._consumer:
            # Items are indexed as (partition, message); the message carries
            # its offset and the serialized notification.
            partition = raw_notification[0]
            offset = raw_notification[1].offset
            message = raw_notification[1].message.value

            try:
                notification_data = json.loads(message)

                ntype = notification_data['type']
                name = notification_data['name']
                addr = notification_data['address']

                notification = Notification(ntype,
                                            partition,
                                            offset,
                                            name,
                                            addr,
                                            notification_data['retry_count'],
                                            notification_data['raw_alarm'])

                # A notification serialized before any send attempt carries a
                # null timestamp; there is nothing to wait for in that case.
                last_attempt = notification_data.get('notification_timestamp')
                if last_attempt is None:
                    wait_duration = 0
                else:
                    wait_duration = self._retry_interval - (
                        time.time() - last_attempt)
            except (KeyError, TypeError, ValueError):
                # The payload itself is bad, so retrying it cannot succeed.
                # Commit past it; otherwise the same message would presumably
                # be redelivered and stop the engine again.
                log.exception("Discarding malformed retry message at "
                              "partition {} offset {}".format(partition, offset))
                self._consumer.commit([partition])
                continue

            if wait_duration > 0:
                time.sleep(wait_duration)

            sent, failed = self._notifier.send([notification])

            if sent:
                self._producer.publish(self._topics['notification_topic'], sent)

            if failed:
                notification.retry_count += 1
                notification.notification_timestamp = time.time()
                if notification.retry_count < self._retry_max:
                    log.error("retry failed for {} with name {} "
                              "at {}. "
                              "Saving for later retry.".format(ntype, name, addr))
                    self._producer.publish(self._topics['retry_topic'],
                                           [notification])
                else:
                    log.error("retry failed for {} with name {} "
                              "at {} after {} retries. "
                              "Giving up on retry."
                              .format(ntype, name, addr, self._retry_max))

            self._consumer.commit([partition])

View File

@ -15,7 +15,6 @@
import email.mime.text
import smtplib
import sys
import time
from abstract_notifier import AbstractNotifier
@ -96,8 +95,7 @@ class EmailNotifier(AbstractNotifier):
self._smtp = smtp
except Exception:
self._log.exception("Unable to connect to email server. Exiting.")
sys.exit(1)
self._log.exception("Unable to connect to email server.")
def _create_msg(self, hostname, notification):
"""Create two kind of messages:

View File

@ -64,27 +64,28 @@ def config(config):
def send_notifications(notifications):
    """Send each notification through the notifier configured for its type.

    notifications - iterable of notification objects; each must expose a
                    ``type`` attribute

    Returns a (sent, failed, invalid) tuple of lists:
    sent - notifications whose send reported success
    failed - notifications whose send reported failure
    invalid - notifications whose type has no configured notifier
    """
    sent = []
    failed = []
    invalid = []

    for notification in notifications:
        ntype = notification.type
        if ntype not in configured_notifiers:
            log.warn("attempting to send unconfigured notification: {}".format(ntype))
            invalid.append(notification)
            continue

        # Stamp before sending so that failed notifications also record when
        # the attempt was made; the retry engine reads this value.
        notification.notification_timestamp = time.time()

        with statsd_timer.time(ntype + '_time'):
            result = send_single_notification(notification)

        if result:
            sent.append(notification)
            statsd_counter[ntype].increment(1)
        else:
            failed.append(notification)

    return (sent, failed, invalid)
def send_single_notification(notification):

View File

@ -3,6 +3,7 @@ kafka:
group: monasca-notification
alarm_topic: alarm-state-transitions
notification_topic: alarm-notifications
notification_retry_topic: retry-notifications
max_offset_lag: 600 # In seconds, undefined for none
mysql:
@ -36,6 +37,10 @@ processors:
notification:
number: 4
retry:
interval: 30
max_attempts: 5
queues:
alarms_size: 256
finished_size: 256
@ -44,6 +49,8 @@ queues:
zookeeper:
url: 192.168.10.4:2181 # or comma separated list of multiple hosts
notification_path: /notification/alarms
notification_retry_path: /notification/retry
logging: # Used in logging.dictConfig
version: 1

View File

@ -116,7 +116,7 @@ class TestAlarmProcessor(unittest.TestCase):
sql_response = [['test notification', 'EMAIL', 'me@here.com']]
notifications, partition, offset = self._run_alarm_processor(alarm, sql_response)
test_notification = Notification('email', 0, 4, 'test notification', 'me@here.com', alarm_dict)
test_notification = Notification('email', 0, 4, 'test notification', 'me@here.com', 0, alarm_dict)
self.assertEqual(notifications, [test_notification])
self.assertEqual(partition, 0)
@ -132,8 +132,8 @@ class TestAlarmProcessor(unittest.TestCase):
sql_response = [['test notification', 'EMAIL', 'me@here.com'], ['test notification2', 'EMAIL', 'me@here.com']]
notifications, partition, offset = self._run_alarm_processor(alarm, sql_response)
test_notification = Notification('email', 0, 5, 'test notification', 'me@here.com', alarm_dict)
test_notification2 = Notification('email', 0, 5, 'test notification2', 'me@here.com', alarm_dict)
test_notification = Notification('email', 0, 5, 'test notification', 'me@here.com', 0, alarm_dict)
test_notification2 = Notification('email', 0, 5, 'test notification2', 'me@here.com', 0, alarm_dict)
self.assertEqual(notifications, [test_notification, test_notification2])
self.assertEqual(partition, 0)

View File

@ -84,7 +84,7 @@ class TestEmail(unittest.TestCase):
alarm_dict = alarm(metric)
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', alarm_dict)
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', 0, alarm_dict)
self.trap.append(email.send_notification(notification))
@ -133,9 +133,8 @@ class TestEmail(unittest.TestCase):
return_value = self.trap.pop(0)
self.assertTrue(return_value)
@mock.patch('monasca_notification.types.email_notifier.sys')
@mock.patch('monasca_notification.types.email_notifier.smtplib')
def test_smtp_sendmail_failed_connection_twice(self, mock_smtp, mock_sys):
def test_smtp_sendmail_failed_connection_twice(self, mock_smtp):
"""Email that fails on smtp_connect twice
"""
@ -171,17 +170,15 @@ class TestEmail(unittest.TestCase):
alarm_dict = alarm(metrics)
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', alarm_dict)
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', 0, alarm_dict)
self.trap.append(email.send_notification(notification))
self.assertIn("SMTP server disconnected. Will reconnect and retry message.", self.trap)
self.assertIn("Unable to connect to email server. Exiting.", self.trap)
self.assertTrue(mock_sys.exit.called)
self.assertIn("Unable to connect to email server.", self.trap)
@mock.patch('monasca_notification.types.email_notifier.sys')
@mock.patch('monasca_notification.types.email_notifier.smtplib')
def test_smtp_sendmail_failed_connection_once_then_email(self, mock_smtp, mock_sys):
def test_smtp_sendmail_failed_connection_once_then_email(self, mock_smtp):
"""Email that fails on smtp_connect once then email
"""
@ -214,13 +211,13 @@ class TestEmail(unittest.TestCase):
alarm_dict = alarm(metrics)
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', alarm_dict)
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', 0, alarm_dict)
self.trap.append(email.send_notification(notification))
self.assertIn("SMTP server disconnected. Will reconnect and retry message.", self.trap)
self.assertIn("Error sending Email Notification", self.trap)
self.assertFalse(mock_sys.exit.called)
self.assertNotIn("Unable to connect to email server.", self.trap)
@mock.patch('monasca_notification.types.email_notifier.smtplib')
def test_smtp_sendmail_failed_connection_once(self, mock_smtp):
@ -254,7 +251,7 @@ class TestEmail(unittest.TestCase):
alarm_dict = alarm(metrics)
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', alarm_dict)
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', 0, alarm_dict)
self.trap.append(email.send_notification(notification))
@ -294,7 +291,7 @@ class TestEmail(unittest.TestCase):
alarm_dict = alarm(metrics)
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', alarm_dict)
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', 0, alarm_dict)
self.trap.append(email.send_notification(notification))

View File

@ -29,9 +29,12 @@ def test_json():
'newState': 'newState',
'tenantId': 'tenantId',
'metrics': 'cpu_util'}
test_notification = notification.Notification('ntype', 'src_partition', 'src_offset', 'name', 'address', alarm)
test_notification = notification.Notification('ntype', 'src_partition',
'src_offset', 'name',
'address', 0, alarm)
expected_dict = {u'name': u'name',
u'type': u'ntype',
u'notification_timestamp': None,
u'tenant_id': u'tenantId',
u'alarm_name': u'alarmName',
@ -39,7 +42,17 @@ def test_json():
u'state': u'newState',
u'alarm_timestamp': u'timestamp',
u'address': u'address',
u'message': u'stateChangeReason'}
u'message': u'stateChangeReason',
u'retry_count': 0,
u'raw_alarm': {
u'alarmId': u'alarmId',
u'alarmName': u'alarmName',
u'timestamp': u'timestamp',
u'stateChangeReason': u'stateChangeReason',
u'newState': u'newState',
u'tenantId': u'tenantId',
u'metrics': u'cpu_util'}}
# Compare as dicts so ordering is not an issue
assert json.loads(test_notification.to_json()) == expected_dict
@ -52,8 +65,12 @@ def test_equal():
'newState': 'newState',
'tenantId': 'tenantId',
'metrics': 'cpu_util'}
test_notification = notification.Notification('ntype', 'src_partition', 'src_offset', 'name', 'address', alarm)
test_notification2 = notification.Notification('ntype', 'src_partition', 'src_offset', 'name', 'address', alarm)
test_notification = notification.Notification('ntype', 'src_partition',
'src_offset', 'name',
'address', 0, alarm)
test_notification2 = notification.Notification('ntype', 'src_partition',
'src_offset', 'name',
'address', 0, alarm)
assert(test_notification == test_notification2)
@ -66,8 +83,12 @@ def test_unequal():
'newState': 'newState',
'tenantId': 'tenantId',
'metrics': 'cpu_util'}
test_notification = notification.Notification('ntype', 'src_partition', 'src_offset', 'name', 'address', alarm)
test_notification2 = notification.Notification('ntype', 'src_partition', 'src_offset', 'name', 'address', alarm)
test_notification = notification.Notification('ntype', 'src_partition',
'src_offset', 'name',
'address', 0, alarm)
test_notification2 = notification.Notification('ntype', 'src_partition',
'src_offset', 'name',
'address', 0, alarm)
test_notification2.alarm_id = None
assert(test_notification != test_notification2)

View File

@ -85,7 +85,7 @@ class TestNotificationProcessor(unittest.TestCase):
"timestamp": time.time(),
"metrics": metric}
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', alarm_dict)
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', 0, alarm_dict)
self._start_processor([notification])
@ -98,7 +98,7 @@ class TestNotificationProcessor(unittest.TestCase):
"""
alarm_dict = {"tenantId": "0", "alarmId": "0", "alarmName": "test Alarm", "oldState": "OK", "newState": "ALARM",
"stateChangeReason": "I am alarming!", "timestamp": time.time(), "metrics": "cpu_util"}
invalid_notification = Notification('invalid', 0, 1, 'test notification', 'me@here.com', alarm_dict)
invalid_notification = Notification('invalid', 0, 1, 'test notification', 'me@here.com', 0, alarm_dict)
self._start_processor([invalid_notification])

View File

@ -224,7 +224,7 @@ class TestInterface(unittest.TestCase):
notifications = []
notifications.append(Notification('email', 0, 1,
'email notification',
'me@here.com', alarm({})))
'me@here.com', 0, alarm({})))
notifiers.send_notifications(notifications)
@ -250,13 +250,13 @@ class TestInterface(unittest.TestCase):
notifications = []
notifications.append(Notification('email', 0, 1,
'email notification',
'me@here.com', alarm({})))
'me@here.com', 0, alarm({})))
sent, failed, invalid = notifiers.send_notifications(notifications)
self.assertEqual(sent, [])
self.assertEqual(failed, 1)
self.assertEqual(invalid, 0)
self.assertEqual(len(failed), 1)
self.assertEqual(invalid, [])
@mock.patch('monasca_notification.types.notifiers.email_notifier')
@mock.patch('monasca_notification.types.notifiers.email_notifier.smtplib')
@ -279,13 +279,13 @@ class TestInterface(unittest.TestCase):
notifications = []
notifications.append(Notification('pagerduty', 0, 1,
'pagerduty notification',
'me@here.com', alarm({})))
'me@here.com', 0, alarm({})))
sent, failed, invalid = notifiers.send_notifications(notifications)
self.assertEqual(sent, [])
self.assertEqual(failed, 0)
self.assertEqual(invalid, 1)
self.assertEqual(failed, [])
self.assertEqual(len(invalid), 1)
self.assertIn("attempting to send unconfigured notification: pagerduty", self.trap)
@ -312,19 +312,19 @@ class TestInterface(unittest.TestCase):
notifications = []
notifications.append(Notification('email', 0, 1,
'email notification',
'me@here.com', alarm({})))
'me@here.com', 0, alarm({})))
notifications.append(Notification('email', 0, 1,
'email notification',
'foo@here.com', alarm({})))
'foo@here.com', 0, alarm({})))
notifications.append(Notification('email', 0, 1,
'email notification',
'bar@here.com', alarm({})))
'bar@here.com', 0, alarm({})))
sent, failed, invalid = notifiers.send_notifications(notifications)
self.assertEqual(len(sent), 3)
self.assertEqual(failed, 0)
self.assertEqual(invalid, 0)
self.assertEqual(failed, [])
self.assertEqual(invalid, [])
for n in sent:
self.assertEqual(n.notification_timestamp, 42)
@ -348,13 +348,13 @@ class TestInterface(unittest.TestCase):
notifications = []
notifications.append(Notification('email', 0, 1,
'email notification',
'me@here.com', alarm({})))
'me@here.com', 0, alarm({})))
notifications.append(Notification('email', 0, 1,
'email notification',
'foo@here.com', alarm({})))
'foo@here.com', 0, alarm({})))
notifications.append(Notification('email', 0, 1,
'email notification',
'bar@here.com', alarm({})))
'bar@here.com', 0, alarm({})))
notifiers.send_notifications(notifications)

View File

@ -146,6 +146,7 @@ class TestWebhook(unittest.TestCase):
1,
'pagerduty notification',
'ABCDEF',
0,
alarm_dict)
self.trap.put(pagerduty.send_notification(notification))

View File

@ -81,7 +81,7 @@ class TestWebhook(unittest.TestCase):
alarm_dict = alarm(metric)
notification = Notification('webhook', 0, 1, 'webhook notification', 'me@here.com', alarm_dict)
notification = Notification('webhook', 0, 1, 'webhook notification', 'me@here.com', 0, alarm_dict)
self.trap.put(webhook.send_notification(notification))