Added pagerduty support to notification_processor
Refactored the tests to make pagerduty functionality more clear Change-Id: Ie9616d8629112fc5f1f1576951a3cbe3b99218b0
This commit is contained in:
parent
9b1df54682
commit
45a2411ebb
|
@ -157,7 +157,8 @@ def main(argv=None):
|
|||
sent_notifications,
|
||||
finished,
|
||||
config['email'],
|
||||
config['webhook']
|
||||
config['webhook'],
|
||||
config['pagerduty']
|
||||
).run),
|
||||
)
|
||||
processors.extend(notification_processors)
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
# limitations under the License.
|
||||
|
||||
import email.mime.text
|
||||
import json
|
||||
import logging
|
||||
import monascastatsd
|
||||
import requests
|
||||
|
@ -28,8 +29,9 @@ log = logging.getLogger(__name__)
|
|||
|
||||
class NotificationProcessor(BaseProcessor):
|
||||
|
||||
def __init__(self, notification_queue, sent_notification_queue,
|
||||
finished_queue, email_config, webhook_config):
|
||||
def __init__(self, notification_queue,
|
||||
sent_notification_queue, finished_queue,
|
||||
email_config, webhook_config, pagerduty_config):
|
||||
self.notification_queue = notification_queue
|
||||
self.sent_notification_queue = sent_notification_queue
|
||||
self.finished_queue = finished_queue
|
||||
|
@ -39,12 +41,18 @@ class NotificationProcessor(BaseProcessor):
|
|||
self.webhook_config = {'timeout': 5}
|
||||
self.webhook_config.update(webhook_config)
|
||||
|
||||
self.pagerduty_config = {
|
||||
'timeout': 5,
|
||||
'url': 'https://events.pagerduty.com/generic/2010-04-15/create_event.json'}
|
||||
self.pagerduty_config.update(pagerduty_config)
|
||||
|
||||
self.smtp = None
|
||||
self._smtp_connect()
|
||||
|
||||
# Types as key, method used to process that type as value
|
||||
self.notification_types = {'email': self._send_email,
|
||||
'webhook': self._post_webhook}
|
||||
'webhook': self._post_webhook,
|
||||
'pagerduty': self._post_pagerduty}
|
||||
|
||||
self.statsd = monascastatsd.Client(name='monasca', dimensions=BaseProcessor.dimensions)
|
||||
|
||||
|
@ -156,15 +164,49 @@ class NotificationProcessor(BaseProcessor):
|
|||
except:
|
||||
log.error("Error trying to post on URL %s: %s." % (url, sys.exc_info()[0]))
|
||||
|
||||
def _post_pagerduty(self, notification):
|
||||
"""Send pagerduty notification
|
||||
"""
|
||||
|
||||
url = self.pagerduty_config['url']
|
||||
headers = {"content-type": "application/json"}
|
||||
body = {"service_key": notification.address,
|
||||
"event_type": "trigger",
|
||||
"description": notification.message,
|
||||
"client": "Monasca",
|
||||
"client_url": "",
|
||||
"details": {"alarm_id": notification.alarm_id,
|
||||
"alarm_name": notification.alarm_name,
|
||||
"current": notification.state,
|
||||
"message": notification.message}}
|
||||
|
||||
try:
|
||||
result = requests.post(url=url,
|
||||
data=json.dumps(body),
|
||||
headers=headers,
|
||||
timeout=self.pagerduty_config['timeout'])
|
||||
|
||||
valid_http_codes = [200, 201, 204]
|
||||
if result.status_code in valid_http_codes:
|
||||
return notification
|
||||
|
||||
log.error("Error with pagerduty request. key=<%s> response=%s"
|
||||
% (notification.address, result.status_code))
|
||||
except:
|
||||
log.error("Exception on pagerduty request. key=<%s> exception=%s"
|
||||
% (notification.address, sys.exc_info()[0]))
|
||||
|
||||
def run(self):
|
||||
"""Send the notifications
|
||||
For each notification in a message it is sent according to its type.
|
||||
If all notifications fail the alarm partition/offset are added to the the finished queue
|
||||
"""
|
||||
counters = {'email': self.statsd.get_counter(name='sent_smtp_count'),
|
||||
'webhook': self.statsd.get_counter(name='sent_webhook_count')}
|
||||
'webhook': self.statsd.get_counter(name='sent_webhook_count'),
|
||||
'pagerduty': self.statsd.get_counter(name='sent_pagerduty_count')}
|
||||
timers = {'email': self.statsd.get_timer(),
|
||||
'webhook': self.statsd.get_timer()}
|
||||
'webhook': self.statsd.get_timer(),
|
||||
'pagerduty': self.statsd.get_timer()}
|
||||
invalid_type_count = self.statsd.get_counter(name='invalid_type_count')
|
||||
sent_failed_count = self.statsd.get_counter(name='sent_failed_count')
|
||||
|
||||
|
@ -187,7 +229,6 @@ class NotificationProcessor(BaseProcessor):
|
|||
sent.notification_timestamp = time.time()
|
||||
sent_notifications.append(sent)
|
||||
counters[notification.type] += 1
|
||||
|
||||
if len(sent_notifications) == 0: # All notifications failed
|
||||
self._add_to_queue(
|
||||
self.finished_queue, 'finished', (notifications[0].src_partition, notifications[0].src_offset))
|
||||
|
|
|
@ -24,6 +24,10 @@ email:
|
|||
webhook:
|
||||
timeout: 5
|
||||
|
||||
pagerduty:
|
||||
timeout: 5
|
||||
url: "https://events.pagerduty.com/generic/2010-04-15/create_event.json"
|
||||
|
||||
processors:
|
||||
alarm:
|
||||
number: 2
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
|
||||
"""Tests NotificationProcessor"""
|
||||
|
||||
import json
|
||||
import mock
|
||||
import multiprocessing
|
||||
import requests
|
||||
|
@ -53,6 +54,17 @@ class TestStateTracker(unittest.TestCase):
|
|||
'timeout': 60,
|
||||
'from_addr': 'hpcs.mon@hp.com'}
|
||||
self.webhook_config = {'timeout': 50}
|
||||
self.pagerduty_config = {'timeout': 50, 'key': 'foobar'}
|
||||
|
||||
def tearDown(self):
|
||||
self.assertTrue(self.log_queue.empty())
|
||||
self.assertTrue(self.sent_notification_queue.empty())
|
||||
self.assertTrue(self.finished_queue.empty())
|
||||
self.processor.terminate()
|
||||
|
||||
# ------------------------------------------------------------------------
|
||||
# Test helper functions
|
||||
# ------------------------------------------------------------------------
|
||||
|
||||
@mock.patch('monasca_notification.processors.notification_processor.requests')
|
||||
@mock.patch('monasca_notification.processors.notification_processor.smtplib')
|
||||
|
@ -77,7 +89,8 @@ class TestStateTracker(unittest.TestCase):
|
|||
self.sent_notification_queue,
|
||||
self.finished_queue,
|
||||
self.email_config,
|
||||
self.webhook_config))
|
||||
self.webhook_config,
|
||||
self.pagerduty_config))
|
||||
|
||||
self.processor = multiprocessing.Process(target=nprocessor.run)
|
||||
self.processor.start()
|
||||
|
@ -86,18 +99,145 @@ class TestStateTracker(unittest.TestCase):
|
|||
return smtpStub(self.log_queue)
|
||||
|
||||
def _http_post_200(self, url, data, headers, **kwargs):
|
||||
self.log_queue.put("%s %s %s" % (url, data, headers))
|
||||
self.log_queue.put(url)
|
||||
self.log_queue.put(data)
|
||||
self.log_queue.put(headers)
|
||||
r = requestsResponse(200)
|
||||
return r
|
||||
|
||||
def _http_post_201(self, url, data, headers, **kwargs):
|
||||
self.log_queue.put(url)
|
||||
self.log_queue.put(data)
|
||||
self.log_queue.put(headers)
|
||||
r = requestsResponse(201)
|
||||
return r
|
||||
|
||||
def _http_post_202(self, url, data, headers, **kwargs):
|
||||
r = requestsResponse(202)
|
||||
return r
|
||||
|
||||
def _http_post_204(self, url, data, headers, **kwargs):
|
||||
self.log_queue.put(url)
|
||||
self.log_queue.put(data)
|
||||
self.log_queue.put(headers)
|
||||
r = requestsResponse(204)
|
||||
return r
|
||||
|
||||
def _http_post_400(self, url, data, headers, **kwargs):
|
||||
r = requestsResponse(400)
|
||||
return r
|
||||
|
||||
def _http_post_403(self, url, data, headers, **kwargs):
|
||||
r = requestsResponse(403)
|
||||
return r
|
||||
|
||||
def _http_post_404(self, url, data, headers, **kwargs):
|
||||
r = requestsResponse(404)
|
||||
return r
|
||||
|
||||
def _http_post_500(self, url, data, headers, **kwargs):
|
||||
r = requestsResponse(500)
|
||||
return r
|
||||
|
||||
def _http_post_504(self, url, data, headers, **kwargs):
|
||||
r = requestsResponse(504)
|
||||
return r
|
||||
|
||||
def _http_post_exception(self, url, data, headers, **kwargs):
|
||||
self.log_queue.put("timeout %s" % kwargs["timeout"])
|
||||
raise requests.exceptions.Timeout
|
||||
|
||||
def assertSentNotification(self):
|
||||
notification_msg = self.sent_notification_queue.get(timeout=3)
|
||||
self.assertNotEqual(notification_msg, None)
|
||||
|
||||
def assertSentFinished(self):
|
||||
finished_msg = self.finished_queue.get(timeout=3)
|
||||
self.assertNotEqual(finished_msg, None)
|
||||
|
||||
def alarm(self, metrics):
|
||||
return {"tenantId": "0",
|
||||
"alarmId": "0",
|
||||
"alarmName": "test Alarm",
|
||||
"oldState": "OK",
|
||||
"newState": "ALARM",
|
||||
"stateChangeReason": "I am alarming!",
|
||||
"timestamp": time.time(),
|
||||
"metrics": metrics}
|
||||
|
||||
def email_setup(self, metric):
|
||||
alarm_dict = self.alarm(metric)
|
||||
|
||||
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', alarm_dict)
|
||||
|
||||
self.notification_queue.put([notification])
|
||||
self._start_processor()
|
||||
|
||||
def webhook_setup(self, http_func):
|
||||
self.http_func = http_func
|
||||
|
||||
metrics = []
|
||||
metric_data = {'dimensions': {'hostname': 'foo1', 'service': 'bar1'}}
|
||||
metrics.append(metric_data)
|
||||
metric_data = {'dimensions': {'hostname': 'foo2', 'service': 'bar2'}}
|
||||
metrics.append(metric_data)
|
||||
|
||||
alarm_dict = self.alarm(metrics)
|
||||
|
||||
notification = Notification('webhook', 0, 1, 'email notification', 'me@here.com', alarm_dict)
|
||||
|
||||
self.notification_queue.put([notification])
|
||||
self._start_processor()
|
||||
|
||||
def pagerduty_setup(self, http_stub):
|
||||
self.http_func = http_stub
|
||||
|
||||
metrics = []
|
||||
metric_data = {'dimensions': {'hostname': 'foo1', 'service': 'bar1'}}
|
||||
metrics.append(metric_data)
|
||||
metric_data = {'dimensions': {'hostname': 'foo2', 'service': 'bar2'}}
|
||||
metrics.append(metric_data)
|
||||
|
||||
alarm_dict = self.alarm(metrics)
|
||||
|
||||
notification = Notification('pagerduty',
|
||||
0,
|
||||
1,
|
||||
'pagerduty notification',
|
||||
'ABCDEF',
|
||||
alarm_dict)
|
||||
self._start_processor()
|
||||
self.notification_queue.put([notification])
|
||||
|
||||
def valid_pagerduty_message(self, url, data, headers):
|
||||
self.assertEqual(
|
||||
url, 'https://events.pagerduty.com/generic/2010-04-15/create_event.json')
|
||||
|
||||
headers = dict(headers)
|
||||
self.assertEqual(headers['content-type'], 'application/json')
|
||||
|
||||
data = dict(json.loads(data))
|
||||
self.assertEqual(data['service_key'], 'ABCDEF')
|
||||
self.assertEqual(data['event_type'], 'trigger')
|
||||
self.assertEqual(data['description'], 'I am alarming!')
|
||||
self.assertEqual(data['client'], 'Monasca')
|
||||
self.assertEqual(data['client_url'], '')
|
||||
|
||||
details = dict(data['details'])
|
||||
self.assertEqual(details['alarm_id'], '0')
|
||||
self.assertEqual(details['alarm_name'], 'test Alarm')
|
||||
self.assertEqual(details['current'], 'ALARM')
|
||||
self.assertEqual(details['message'], 'I am alarming!')
|
||||
|
||||
def pagerduty_http_error(self, log_msg, http_response):
|
||||
self.assertRegexpMatches(log_msg, "Error with pagerduty request.")
|
||||
self.assertRegexpMatches(log_msg, "key=<ABCDEF>")
|
||||
self.assertRegexpMatches(log_msg, "response=%s" % http_response)
|
||||
|
||||
# ------------------------------------------------------------------------
|
||||
# Unit tests
|
||||
# ------------------------------------------------------------------------
|
||||
|
||||
def test_invalid_notification(self):
|
||||
"""Verify invalid notification type is rejected.
|
||||
"""
|
||||
|
@ -122,21 +262,9 @@ class TestStateTracker(unittest.TestCase):
|
|||
metric_data = {'dimensions': {'hostname': 'foo1', 'service': 'bar1'}}
|
||||
metrics.append(metric_data)
|
||||
|
||||
alarm_dict = {"tenantId": "0",
|
||||
"alarmId": "0",
|
||||
"alarmName": "test Alarm",
|
||||
"oldState": "OK",
|
||||
"newState": "ALARM",
|
||||
"stateChangeReason": "I am alarming!",
|
||||
"timestamp": time.time(),
|
||||
"metrics": metrics}
|
||||
self.email_setup(metrics)
|
||||
|
||||
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', alarm_dict)
|
||||
|
||||
self.notification_queue.put([notification])
|
||||
self._start_processor()
|
||||
log_msg = self.log_queue.get(timeout=3)
|
||||
self.processor.terminate()
|
||||
|
||||
self.assertRegexpMatches(log_msg, "From: hpcs.mon@hp.com")
|
||||
self.assertRegexpMatches(log_msg, "To: me@here.com")
|
||||
|
@ -144,7 +272,7 @@ class TestStateTracker(unittest.TestCase):
|
|||
self.assertRegexpMatches(log_msg, "Alarm .test Alarm.")
|
||||
self.assertRegexpMatches(log_msg, "On host .foo1.")
|
||||
|
||||
self.assertTrue(self.log_queue.empty())
|
||||
self.assertSentNotification()
|
||||
|
||||
def test_email_notification_multiple_hosts(self):
|
||||
"""Email with multiple hosts
|
||||
|
@ -156,21 +284,9 @@ class TestStateTracker(unittest.TestCase):
|
|||
metric_data = {'dimensions': {'hostname': 'foo2', 'service': 'bar2'}}
|
||||
metrics.append(metric_data)
|
||||
|
||||
alarm_dict = {"tenantId": "0",
|
||||
"alarmId": "0",
|
||||
"alarmName": "test Alarm",
|
||||
"oldState": "OK",
|
||||
"newState": "ALARM",
|
||||
"stateChangeReason": "I am alarming!",
|
||||
"timestamp": time.time(),
|
||||
"metrics": metrics}
|
||||
self.email_setup(metrics)
|
||||
|
||||
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', alarm_dict)
|
||||
|
||||
self.notification_queue.put([notification])
|
||||
self._start_processor()
|
||||
log_msg = self.log_queue.get(timeout=3)
|
||||
self.processor.terminate()
|
||||
|
||||
self.assertRegexpMatches(log_msg, "From: hpcs.mon@hp.com")
|
||||
self.assertRegexpMatches(log_msg, "To: me@here.com")
|
||||
|
@ -179,69 +295,31 @@ class TestStateTracker(unittest.TestCase):
|
|||
self.assertNotRegexpMatches(log_msg, "foo1")
|
||||
self.assertNotRegexpMatches(log_msg, "foo2")
|
||||
|
||||
self.assertTrue(self.log_queue.empty())
|
||||
self.assertSentNotification()
|
||||
|
||||
def test_webhook_good_http_response(self):
|
||||
"""webhook good response
|
||||
"""webhook 200
|
||||
"""
|
||||
|
||||
self.http_func = self._http_post_200
|
||||
self.webhook_setup(self._http_post_200)
|
||||
|
||||
metrics = []
|
||||
metric_data = {'dimensions': {'hostname': 'foo1', 'service': 'bar1'}}
|
||||
metrics.append(metric_data)
|
||||
metric_data = {'dimensions': {'hostname': 'foo2', 'service': 'bar2'}}
|
||||
metrics.append(metric_data)
|
||||
url = self.log_queue.get(timeout=3)
|
||||
data = self.log_queue.get(timeout=3)
|
||||
headers = self.log_queue.get(timeout=3)
|
||||
|
||||
alarm_dict = {"tenantId": "0",
|
||||
"alarmId": "0",
|
||||
"alarmName": "test Alarm",
|
||||
"oldState": "OK",
|
||||
"newState": "ALARM",
|
||||
"stateChangeReason": "I am alarming!",
|
||||
"timestamp": time.time(),
|
||||
"metrics": metrics}
|
||||
self.assertEqual(url, "me@here.com")
|
||||
self.assertEqual(data, {'alarm_id': '0'})
|
||||
self.assertEqual(headers, {'content-type': 'application/json'})
|
||||
|
||||
notification = Notification('webhook', 0, 1, 'email notification', 'me@here.com', alarm_dict)
|
||||
|
||||
self.notification_queue.put([notification])
|
||||
self._start_processor()
|
||||
log_msg = self.log_queue.get(timeout=3)
|
||||
self.processor.terminate()
|
||||
|
||||
self.assertRegexpMatches(log_msg, "me@here.com")
|
||||
self.assertRegexpMatches(log_msg, "alarm_id.: '0'")
|
||||
self.assertRegexpMatches(log_msg, "content-type.: .application/json")
|
||||
|
||||
self.assertTrue(self.log_queue.empty())
|
||||
self.assertSentNotification()
|
||||
|
||||
def test_webhook_bad_http_response(self):
|
||||
"""webhook bad response
|
||||
"""
|
||||
|
||||
self.http_func = self._http_post_404
|
||||
self.webhook_setup(self._http_post_404)
|
||||
|
||||
metrics = []
|
||||
metric_data = {'dimensions': {'hostname': 'foo1', 'service': 'bar1'}}
|
||||
metrics.append(metric_data)
|
||||
metric_data = {'dimensions': {'hostname': 'foo2', 'service': 'bar2'}}
|
||||
metrics.append(metric_data)
|
||||
|
||||
alarm_dict = {"tenantId": "0",
|
||||
"alarmId": "0",
|
||||
"alarmName": "test Alarm",
|
||||
"oldState": "OK",
|
||||
"newState": "ALARM",
|
||||
"stateChangeReason": "I am alarming!",
|
||||
"timestamp": time.time(),
|
||||
"metrics": metrics}
|
||||
|
||||
notification = Notification('webhook', 0, 1, 'email notification', 'me@here.com', alarm_dict)
|
||||
|
||||
self.notification_queue.put([notification])
|
||||
self._start_processor()
|
||||
log_msg = self.log_queue.get(timeout=3)
|
||||
self.processor.terminate()
|
||||
|
||||
self.assertNotRegexpMatches(log_msg, "alarm_id.: .test Alarm")
|
||||
self.assertNotRegexpMatches(log_msg, "content-type.: .application/json")
|
||||
|
@ -249,39 +327,13 @@ class TestStateTracker(unittest.TestCase):
|
|||
self.assertRegexpMatches(log_msg, "HTTP code 404")
|
||||
self.assertRegexpMatches(log_msg, "post on URL me@here.com")
|
||||
|
||||
self.assertTrue(self.log_queue.empty())
|
||||
self.assertSentFinished()
|
||||
|
||||
def test_webhook_timeout_exception_on_http_response(self):
|
||||
"""webhook timeout exception
|
||||
"""
|
||||
|
||||
self.http_func = self._http_post_exception
|
||||
|
||||
metrics = []
|
||||
metric_data = {'dimensions': {'hostname': 'foo1', 'service': 'bar1'}}
|
||||
metrics.append(metric_data)
|
||||
metric_data = {'dimensions': {'hostname': 'foo2', 'service': 'bar2'}}
|
||||
metrics.append(metric_data)
|
||||
|
||||
alarm_dict = {"tenantId": "0",
|
||||
"alarmId": "0",
|
||||
"alarmName": "test Alarm",
|
||||
"oldState": "OK",
|
||||
"newState": "ALARM",
|
||||
"stateChangeReason": "I am alarming!",
|
||||
"timestamp": time.time(),
|
||||
"metrics": metrics}
|
||||
|
||||
notification = Notification('webhook',
|
||||
0,
|
||||
1,
|
||||
'webhook notification',
|
||||
'http://localhost:21356',
|
||||
alarm_dict)
|
||||
|
||||
self._start_processor()
|
||||
|
||||
self.notification_queue.put([notification])
|
||||
self.webhook_setup(self._http_post_exception)
|
||||
|
||||
log_msg = self.log_queue.get(timeout=3)
|
||||
|
||||
|
@ -292,9 +344,111 @@ class TestStateTracker(unittest.TestCase):
|
|||
self.assertNotRegexpMatches(log_msg, "alarm_id.: .test Alarm")
|
||||
self.assertNotRegexpMatches(log_msg, "content-type.: .application/json")
|
||||
|
||||
self.assertRegexpMatches(log_msg, "Error trying to post on URL http://localhost:21356")
|
||||
self.assertRegexpMatches(log_msg, "Error trying to post on URL me@here")
|
||||
self.assertRaises(requests.exceptions.Timeout)
|
||||
|
||||
self.assertTrue(self.log_queue.empty())
|
||||
self.assertSentFinished()
|
||||
|
||||
self.processor.terminate()
|
||||
def test_pagerduty_200(self):
|
||||
"""pagerduty 200
|
||||
"""
|
||||
|
||||
self.pagerduty_setup(self._http_post_200)
|
||||
|
||||
url = self.log_queue.get(timeout=3)
|
||||
data = self.log_queue.get(timeout=3)
|
||||
headers = self.log_queue.get(timeout=3)
|
||||
|
||||
self.valid_pagerduty_message(url, data, headers)
|
||||
self.assertSentNotification()
|
||||
|
||||
def test_pagerduty_201(self):
|
||||
"""pagerduty 201
|
||||
"""
|
||||
|
||||
self.pagerduty_setup(self._http_post_201)
|
||||
|
||||
url = self.log_queue.get(timeout=3)
|
||||
data = self.log_queue.get(timeout=3)
|
||||
headers = self.log_queue.get(timeout=3)
|
||||
|
||||
self.valid_pagerduty_message(url, data, headers)
|
||||
self.assertSentNotification()
|
||||
|
||||
def test_pagerduty_204(self):
|
||||
"""pagerduty 204
|
||||
"""
|
||||
|
||||
self.pagerduty_setup(self._http_post_204)
|
||||
|
||||
url = self.log_queue.get(timeout=3)
|
||||
data = self.log_queue.get(timeout=3)
|
||||
headers = self.log_queue.get(timeout=3)
|
||||
|
||||
self.valid_pagerduty_message(url, data, headers)
|
||||
self.assertSentNotification()
|
||||
|
||||
def test_pagerduty_202(self):
|
||||
"""pagerduty 202
|
||||
"""
|
||||
|
||||
self.pagerduty_setup(self._http_post_202)
|
||||
log_msg = self.log_queue.get(timeout=3)
|
||||
self.pagerduty_http_error(log_msg, "202")
|
||||
self.assertSentFinished()
|
||||
|
||||
def test_pagerduty_400(self):
|
||||
"""pagerduty 400
|
||||
"""
|
||||
|
||||
self.pagerduty_setup(self._http_post_400)
|
||||
log_msg = self.log_queue.get(timeout=3)
|
||||
self.pagerduty_http_error(log_msg, "400")
|
||||
self.assertSentFinished()
|
||||
|
||||
def test_pagerduty_403(self):
|
||||
"""pagerduty 403
|
||||
"""
|
||||
|
||||
self.pagerduty_setup(self._http_post_403)
|
||||
log_msg = self.log_queue.get(timeout=3)
|
||||
self.pagerduty_http_error(log_msg, "403")
|
||||
self.assertSentFinished()
|
||||
|
||||
def test_pagerduty_500(self):
|
||||
"""pagerduty 500
|
||||
"""
|
||||
|
||||
self.pagerduty_setup(self._http_post_500)
|
||||
log_msg = self.log_queue.get(timeout=3)
|
||||
self.pagerduty_http_error(log_msg, "500")
|
||||
self.assertSentFinished()
|
||||
|
||||
def test_pagerduty_504(self):
|
||||
"""pagerduty 504
|
||||
"""
|
||||
|
||||
self.pagerduty_setup(self._http_post_504)
|
||||
log_msg = self.log_queue.get(timeout=3)
|
||||
self.pagerduty_http_error(log_msg, "504")
|
||||
self.assertSentFinished()
|
||||
|
||||
def test_pagerduty_exception(self):
|
||||
"""pagerduty exception
|
||||
"""
|
||||
|
||||
self.pagerduty_setup(self._http_post_exception)
|
||||
|
||||
log_msg = self.log_queue.get(timeout=3)
|
||||
|
||||
self.assertEqual(log_msg, "timeout 50")
|
||||
|
||||
log_msg = self.log_queue.get(timeout=3)
|
||||
|
||||
self.assertRegexpMatches(log_msg, "Exception on pagerduty request")
|
||||
self.assertRegexpMatches(log_msg, "key=<ABCDEF>")
|
||||
self.assertRegexpMatches(
|
||||
log_msg, "exception=<class 'requests.exceptions.Timeout'>")
|
||||
|
||||
self.assertRaises(requests.exceptions.Timeout)
|
||||
self.assertSentFinished()
|
||||
|
|
Loading…
Reference in New Issue