Added timeout to webhook

Added a timeout keyword to requests.post that we use to send webhooks.  Updated
the tests to verify that the webhook will properly time out.

Change-Id: I27140a8c13afcd8f2e30ea1367f0bd9f4944ba36
This commit is contained in:
Joe Keen 2014-12-12 11:46:05 -07:00
parent a2271ba7c8
commit f0b4f91960
4 changed files with 50 additions and 19 deletions

View File

@ -156,7 +156,8 @@ def main(argv=None):
notifications,
sent_notifications,
finished,
config['email']
config['email'],
config['webhook']
).run),
)
processors.extend(notification_processors)

View File

@ -28,18 +28,24 @@ log = logging.getLogger(__name__)
class NotificationProcessor(BaseProcessor):
def __init__(self, notification_queue, sent_notification_queue, finished_queue, email_config):
def __init__(self, notification_queue, sent_notification_queue,
finished_queue, email_config, webhook_config):
self.notification_queue = notification_queue
self.sent_notification_queue = sent_notification_queue
self.finished_queue = finished_queue
self.email_config = email_config
self.webhook_config = {'timeout': 5}
self.webhook_config.update(webhook_config)
self.smtp = None
self._smtp_connect()
# Types as key, method used to process that type as value
self.notification_types = {'email': self._send_email,
'webhook': self._post_webhook}
self.monascastatsd = mstatsd.Client(name='monasca',
dimensions=BaseProcessor.dimensions)
@ -139,7 +145,10 @@ class NotificationProcessor(BaseProcessor):
try:
# Posting on the given URL
result = requests.post(url=url, data=body, headers=headers)
result = requests.post(url=url,
data=body,
headers=headers,
timeout=self.webhook_config['timeout'])
if result.status_code in range(200, 300):
log.info("Notification successfully posted.")
return notification

View File

@ -21,6 +21,9 @@ email:
timeout: 60
from_addr: hpcs.mon@hp.com
webhook:
timeout: 5
processors:
alarm:
number: 2

View File

@ -17,6 +17,7 @@
import mock
import multiprocessing
import requests
import time
import unittest
@ -51,6 +52,7 @@ class TestStateTracker(unittest.TestCase):
'password': None,
'timeout': 60,
'from_addr': 'hpcs.mon@hp.com'}
self.webhook_config = {'timeout': 50}
@mock.patch('monasca_notification.processors.notification_processor.requests')
@mock.patch('monasca_notification.processors.notification_processor.smtplib')
@ -70,27 +72,31 @@ class TestStateTracker(unittest.TestCase):
self.mock_log = mock_log
self.mock_smtp = mock_smtp
nprocessor = notification_processor.NotificationProcessor(
self.notification_queue, self.sent_notification_queue, self.finished_queue, self.email_config
)
nprocessor = (notification_processor.
NotificationProcessor(self.notification_queue,
self.sent_notification_queue,
self.finished_queue,
self.email_config,
self.webhook_config))
self.processor = multiprocessing.Process(target=nprocessor.run)
self.processor.start()
def _smtpStub(self, *arg, **kwargs):
return smtpStub(self.log_queue)
def _http_post_200(self, url, data, headers):
def _http_post_200(self, url, data, headers, **kwargs):
self.log_queue.put("%s %s %s" % (url, data, headers))
r = requestsResponse(200)
return r
def _http_post_404(self, url, data, headers):
def _http_post_404(self, url, data, headers, **kwargs):
r = requestsResponse(404)
return r
def _http_post_exception(self, url, data, headers):
# generate error
foobar
def _http_post_exception(self, url, data, headers, **kwargs):
self.log_queue.put("timeout %s" % kwargs["timeout"])
raise requests.exceptions.Timeout
def test_invalid_notification(self):
"""Verify invalid notification type is rejected.
@ -176,7 +182,7 @@ class TestStateTracker(unittest.TestCase):
self.assertTrue(self.log_queue.empty())
def test_webhook_good_http_response(self):
"""webhook
"""webhook good response
"""
self.http_func = self._http_post_200
@ -210,7 +216,7 @@ class TestStateTracker(unittest.TestCase):
self.assertTrue(self.log_queue.empty())
def test_webhook_bad_http_response(self):
"""webhook
"""webhook bad response
"""
self.http_func = self._http_post_404
@ -245,8 +251,8 @@ class TestStateTracker(unittest.TestCase):
self.assertTrue(self.log_queue.empty())
def test_webhook_exception_on_http_response(self):
"""webhook
def test_webhook_timeout_exception_on_http_response(self):
"""webhook timeout exception
"""
self.http_func = self._http_post_exception
@ -266,17 +272,29 @@ class TestStateTracker(unittest.TestCase):
"timestamp": time.time(),
"metrics": metrics}
notification = Notification('webhook', 0, 1, 'email notification', 'me@here.com', alarm_dict)
notification = Notification('webhook',
0,
1,
'webhook notification',
'http://localhost:21356',
alarm_dict)
self.notification_queue.put([notification])
self._start_processor()
self.notification_queue.put([notification])
log_msg = self.log_queue.get(timeout=3)
self.assertEqual(log_msg, "timeout 50")
log_msg = self.log_queue.get(timeout=3)
self.processor.terminate()
self.assertNotRegexpMatches(log_msg, "alarm_id.: .test Alarm")
self.assertNotRegexpMatches(log_msg, "content-type.: .application/json")
self.assertRegexpMatches(log_msg, "Error trying to post on URL me@here.com")
self.assertRegexpMatches(log_msg, "Error trying to post on URL http://localhost:21356")
self.assertRaises(requests.exceptions.Timeout)
self.assertTrue(self.log_queue.empty())
self.processor.terminate()