Merge "notification: Use oslo.messaging batch listener"

This commit is contained in:
Jenkins 2016-01-19 04:34:28 +00:00 committed by Gerrit Code Review
commit 77dc986058
9 changed files with 131 additions and 104 deletions

View File

@ -19,12 +19,16 @@ import abc
import collections
from oslo_context import context
from oslo_log import log
import oslo_messaging
import six
from stevedore import extension
from ceilometer.i18n import _LE
from ceilometer import messaging
LOG = log.getLogger(__name__)
ExchangeTopics = collections.namedtuple('ExchangeTopics',
['exchange', 'topics'])
@ -74,39 +78,35 @@ class NotificationBase(PluginBase):
:param message: Message to process.
"""
def info(self, ctxt, publisher_id, event_type, payload, metadata):
def info(self, notifications):
"""RPC endpoint for notification messages at info level
When another service sends a notification over the message
bus, this method receives it.
:param ctxt: oslo.messaging context
:param publisher_id: publisher of the notification
:param event_type: type of notification
:param payload: notification payload
:param metadata: metadata about the notification
:param notifications: list of notifications
"""
notification = messaging.convert_to_old_notification_format(
'info', ctxt, publisher_id, event_type, payload, metadata)
self.to_samples_and_publish(context.get_admin_context(), notification)
self._process_notifications('info', notifications)
def sample(self, ctxt, publisher_id, event_type, payload, metadata):
def sample(self, notifications):
"""RPC endpoint for notification messages at sample level
When another service sends a notification over the message
bus at sample priority, this method receives it.
:param ctxt: oslo.messaging context
:param publisher_id: publisher of the notification
:param event_type: type of notification
:param payload: notification payload
:param metadata: metadata about the notification
:param notifications: list of notifications
"""
notification = messaging.convert_to_old_notification_format(
'sample', ctxt, publisher_id, event_type, payload, metadata)
self.to_samples_and_publish(context.get_admin_context(), notification)
self._process_notifications('sample', notifications)
def _process_notifications(self, priority, notifications):
for notification in notifications:
try:
notification = messaging.convert_to_old_notification_format(
priority, notification)
self.to_samples_and_publish(context.get_admin_context(),
notification)
except Exception:
LOG.error(_LE('Fail to process notification'), exc_info=True)
def to_samples_and_publish(self, context, notification):
"""Return samples produced by *process_notification*.

View File

@ -20,6 +20,7 @@ import oslo_messaging
from stevedore import extension
from ceilometer.event import converter as event_converter
from ceilometer.i18n import _LE
from ceilometer import messaging
LOG = logging.getLogger(__name__)
@ -35,48 +36,35 @@ class EventsNotificationEndpoint(object):
namespace='ceilometer.event.trait_plugin'))
self.manager = manager
def info(self, ctxt, publisher_id, event_type, payload, metadata):
def info(self, notifications):
"""Convert message at info level to Ceilometer Event.
:param ctxt: oslo_messaging context
:param publisher_id: publisher of the notification
:param event_type: type of notification
:param payload: notification payload
:param metadata: metadata about the notification
:param notifications: list of notifications
"""
return self.process_notification('info', notifications)
# NOTE: the rpc layer currently rips out the notification
# delivery_info, which is critical to determining the
# source of the notification. This will have to get added back later.
notification = messaging.convert_to_old_notification_format(
'info', ctxt, publisher_id, event_type, payload, metadata)
return self.process_notification(notification)
def error(self, notifications):
"""Convert message at error level to Ceilometer Event.
def error(self, ctxt, publisher_id, event_type, payload, metadata):
"""Convert error message to Ceilometer Event.
:param ctxt: oslo_messaging context
:param publisher_id: publisher of the notification
:param event_type: type of notification
:param payload: notification payload
:param metadata: metadata about the notification
:param notifications: list of notifications
"""
return self.process_notification('error', notifications)
# NOTE: the rpc layer currently rips out the notification
# delivery_info, which is critical to determining the
# source of the notification. This will have to get added back later.
notification = messaging.convert_to_old_notification_format(
'error', ctxt, publisher_id, event_type, payload, metadata)
return self.process_notification(notification)
def process_notification(self, notification):
try:
event = self.event_converter.to_event(notification)
if event is not None:
with self.manager.publisher(self.ctxt) as p:
p(event)
except Exception:
if not cfg.CONF.notification.ack_on_event_error:
return oslo_messaging.NotificationResult.REQUEUE
raise
def process_notification(self, priority, notifications):
for notification in notifications:
# NOTE: the rpc layer currently rips out the notification
# delivery_info, which is critical to determining the
# source of the notification. This will have to get added back
# later.
notification = messaging.convert_to_old_notification_format(
priority, notification)
try:
event = self.event_converter.to_event(notification)
if event is not None:
with self.manager.publisher(self.ctxt) as p:
p(event)
except Exception:
if not cfg.CONF.notification.ack_on_event_error:
return oslo_messaging.NotificationResult.REQUEUE
LOG.error(_LE('Fail to process a notification'), exc_info=True)
return oslo_messaging.NotificationResult.HANDLED

View File

@ -105,16 +105,15 @@ def get_notifier(transport, publisher_id):
return notifier.prepare(publisher_id=publisher_id)
def convert_to_old_notification_format(priority, ctxt, publisher_id,
event_type, payload, metadata):
def convert_to_old_notification_format(priority, notification):
# FIXME(sileht): temporary convert notification to old format
# to focus on oslo_messaging migration before refactoring the code to
# use the new oslo_messaging facilities
notification = {'priority': priority,
'payload': payload,
'event_type': event_type,
'publisher_id': publisher_id}
notification.update(metadata)
for k in ctxt:
notification['_context_' + k] = ctxt[k]
notification = notification.copy()
notification['priority'] = priority
notification.update(notification["metadata"])
for k in notification['ctxt']:
notification['_context_' + k] = notification['ctxt'][k]
del notification['ctxt']
del notification['metadata']
return notification

View File

@ -66,6 +66,14 @@ OPTS = [
"Example: transport://user:pass@host1:port"
"[,hostN:portN]/virtual_host "
"(DEFAULT/transport_url is used if empty)"),
cfg.IntOpt('batch_size',
default=1,
help='Number of notification messages to wait before '
'publishing them'),
cfg.IntOpt('batch_timeout',
default=None,
help='Number of seconds to wait before publishing samples'
'when batch_size is not reached (None means indefinitely)'),
]
cfg.CONF.register_opts(exchange_control.EXCHANGE_OPTS)
@ -228,8 +236,10 @@ class NotificationService(service_base.BaseService):
urls = cfg.CONF.notification.messaging_urls or [None]
for url in urls:
transport = messaging.get_transport(url)
listener = messaging.get_notification_listener(
transport, targets, endpoints)
listener = messaging.get_batch_notification_listener(
transport, targets, endpoints,
batch_size=cfg.CONF.notification.batch_size,
batch_timeout=cfg.CONF.notification.batch_timeout)
listener.start()
self.listeners.append(listener)
@ -272,10 +282,12 @@ class NotificationService(service_base.BaseService):
pipe_endpoint = (pipeline.EventPipelineEndpoint
if isinstance(pipe, pipeline.EventPipeline)
else pipeline.SamplePipelineEndpoint)
listener = messaging.get_notification_listener(
listener = messaging.get_batch_notification_listener(
transport,
[oslo_messaging.Target(topic=topic)],
[pipe_endpoint(self.ctxt, pipe)])
[pipe_endpoint(self.ctxt, pipe)],
batch_size=cfg.CONF.notification.batch_size,
batch_timeout=cfg.CONF.notification.batch_timeout)
listener.start()
self.pipeline_listeners.append(listener)

View File

@ -19,6 +19,7 @@
import abc
import hashlib
from itertools import chain
import os
from oslo_config import cfg
@ -83,12 +84,13 @@ class PipelineEndpoint(object):
self.publish_context = PublishContext(context, [pipeline])
@abc.abstractmethod
def sample(self, ctxt, publisher_id, event_type, payload, metadata):
def sample(self, messages):
pass
class SamplePipelineEndpoint(PipelineEndpoint):
def sample(self, ctxt, publisher_id, event_type, payload, metadata):
def sample(self, messages):
samples = chain.from_iterable(m["payload"] for m in messages)
samples = [
sample_util.Sample(name=s['counter_name'],
type=s['counter_type'],
@ -100,7 +102,7 @@ class SamplePipelineEndpoint(PipelineEndpoint):
timestamp=s['timestamp'],
resource_metadata=s['resource_metadata'],
source=s.get('source'))
for s in payload if publisher_utils.verify_signature(
for s in samples if publisher_utils.verify_signature(
s, cfg.CONF.publisher.telemetry_secret)
]
with self.publish_context as p:
@ -108,7 +110,8 @@ class SamplePipelineEndpoint(PipelineEndpoint):
class EventPipelineEndpoint(PipelineEndpoint):
def sample(self, ctxt, publisher_id, event_type, payload, metadata):
def sample(self, messages):
events = chain.from_iterable(m["payload"] for m in messages)
events = [
models.Event(
message_id=ev['message_id'],
@ -119,7 +122,7 @@ class EventPipelineEndpoint(PipelineEndpoint):
models.Trait.convert_value(dtype, value))
for name, dtype, value in ev['traits']],
raw=ev.get('raw', {}))
for ev in payload if publisher_utils.verify_signature(
for ev in events if publisher_utils.verify_signature(
ev, cfg.CONF.publisher.telemetry_secret)
]
try:

View File

@ -131,9 +131,11 @@ class TestNotification(tests_base.BaseTestCase):
self._do_process_notification_manager_start()
self.srv.pipeline_manager.pipelines[0] = mock.MagicMock()
self.plugin.info(TEST_NOTICE_CTXT, 'compute.vagrant-precise',
'compute.instance.create.end',
TEST_NOTICE_PAYLOAD, TEST_NOTICE_METADATA)
self.plugin.info([{'ctxt': TEST_NOTICE_CTXT,
'publisher_id': 'compute.vagrant-precise',
'event_type': 'compute.instance.create.end',
'payload': TEST_NOTICE_PAYLOAD,
'metadata': TEST_NOTICE_METADATA}])
self.assertEqual(1, len(self.srv.listeners[0].dispatcher.endpoints))
self.assertTrue(self.srv.pipeline_manager.publisher.called)
@ -415,9 +417,12 @@ class TestRealNotificationHA(BaseRealNotification):
not endpoint.filter_rule.match(None, None, 'nonmatching.end',
None, None)):
continue
endpoint.info(TEST_NOTICE_CTXT, 'compute.vagrant-precise',
'nonmatching.end',
TEST_NOTICE_PAYLOAD, TEST_NOTICE_METADATA)
endpoint.info([{
'ctxt': TEST_NOTICE_CTXT,
'publisher_id': 'compute.vagrant-precise',
'event_type': 'nonmatching.end',
'payload': TEST_NOTICE_PAYLOAD,
'metadata': TEST_NOTICE_METADATA}])
self.assertFalse(mock_notifier.called)
for endpoint in self.srv.listeners[0].dispatcher.endpoints:
if (hasattr(endpoint, 'filter_rule') and
@ -425,9 +430,13 @@ class TestRealNotificationHA(BaseRealNotification):
'compute.instance.create.end',
None, None)):
continue
endpoint.info(TEST_NOTICE_CTXT, 'compute.vagrant-precise',
'compute.instance.create.end',
TEST_NOTICE_PAYLOAD, TEST_NOTICE_METADATA)
endpoint.info([{
'ctxt': TEST_NOTICE_CTXT,
'publisher_id': 'compute.vagrant-precise',
'event_type': 'compute.instance.create.end',
'payload': TEST_NOTICE_PAYLOAD,
'metadata': TEST_NOTICE_METADATA}])
self.assertTrue(mock_notifier.called)
self.assertEqual(3, mock_notifier.call_count)
self.assertEqual('pipeline.event',

View File

@ -37,13 +37,16 @@ class NotificationBaseTestCase(base.BaseTestCase):
def test_plugin_info(self):
plugin = self.FakePlugin(mock.Mock())
plugin.to_samples_and_publish = mock.Mock()
ctxt = {'user_id': 'fake_user_id', 'project_id': 'fake_project_id'}
publisher_id = 'fake.publisher_id'
event_type = 'fake.event'
payload = {'foo': 'bar'}
metadata = {'message_id': '3577a84f-29ec-4904-9566-12c52289c2e8',
'timestamp': '2015-06-1909:19:35.786893'}
plugin.info(ctxt, publisher_id, event_type, payload, metadata)
message = {
'ctxt': {'user_id': 'fake_user_id',
'project_id': 'fake_project_id'},
'publisher_id': 'fake.publisher_id',
'event_type': 'fake.event',
'payload': {'foo': 'bar'},
'metadata': {'message_id': '3577a84f-29ec-4904-9566-12c52289c2e8',
'timestamp': '2015-06-1909:19:35.786893'}
}
plugin.info([message])
notification = {
'priority': 'info',
'event_type': 'fake.event',

View File

@ -142,18 +142,23 @@ class TestEventEndpoint(tests_base.BaseTestCase):
def test_message_to_event(self):
self._setup_endpoint(['test://'])
self.endpoint.info(TEST_NOTICE_CTXT, 'compute.vagrant-precise',
'compute.instance.create.end',
TEST_NOTICE_PAYLOAD, TEST_NOTICE_METADATA)
self.endpoint.info([{'ctxt': TEST_NOTICE_CTXT,
'publisher_id': 'compute.vagrant-precise',
'event_type': 'compute.instance.create.end',
'payload': TEST_NOTICE_PAYLOAD,
'metadata': TEST_NOTICE_METADATA}])
def test_bad_event_non_ack_and_requeue(self):
self._setup_endpoint(['test://'])
self.fake_publisher.publish_events.side_effect = Exception
self.CONF.set_override("ack_on_event_error", False,
group="notification")
ret = self.endpoint.info(TEST_NOTICE_CTXT, 'compute.vagrant-precise',
'compute.instance.create.end',
TEST_NOTICE_PAYLOAD, TEST_NOTICE_METADATA)
ret = self.endpoint.info([{'ctxt': TEST_NOTICE_CTXT,
'publisher_id': 'compute.vagrant-precise',
'event_type': 'compute.instance.create.end',
'payload': TEST_NOTICE_PAYLOAD,
'metadata': TEST_NOTICE_METADATA}])
self.assertEqual(oslo_messaging.NotificationResult.REQUEUE, ret)
def test_message_to_event_bad_event(self):
@ -162,9 +167,13 @@ class TestEventEndpoint(tests_base.BaseTestCase):
self.CONF.set_override("ack_on_event_error", False,
group="notification")
message = {'event_type': "foo", 'message_id': "abc"}
message = {
'payload': {'event_type': "foo", 'message_id': "abc"},
'metadata': {},
'ctxt': {}
}
with mock.patch("ceilometer.pipeline.LOG") as mock_logger:
ret = self.endpoint.process_notification(message)
ret = self.endpoint.process_notification('info', [message])
self.assertEqual(oslo_messaging.NotificationResult.REQUEUE, ret)
exception_mock = mock_logger.exception
self.assertIn('Exit after error from publisher',
@ -178,10 +187,13 @@ class TestEventEndpoint(tests_base.BaseTestCase):
self.CONF.set_override("ack_on_event_error", False,
group="notification")
message = {'event_type': "foo", 'message_id': "abc"}
message = {
'payload': {'event_type': "foo", 'message_id': "abc"},
'metadata': {},
'ctxt': {}
}
with mock.patch("ceilometer.pipeline.LOG") as mock_logger:
ret = self.endpoint.process_notification(message)
ret = self.endpoint.process_notification('info', [message])
self.assertEqual(oslo_messaging.NotificationResult.HANDLED, ret)
exception_mock = mock_logger.exception
self.assertIn('Continue after error from publisher',

View File

@ -404,6 +404,7 @@ class EventPipelineTestCase(base.BaseTestCase):
mock.Mock(), pipeline_manager.pipelines[0])
fake_publisher.publish_events.side_effect = Exception
ret = event_pipeline_endpoint.sample(None, 'compute.vagrant-precise',
'a', [test_data], None)
ret = event_pipeline_endpoint.sample([
{'ctxt': {}, 'publisher_id': 'compute.vagrant-precise',
'event_type': 'a', 'payload': [test_data], 'metadata': {}}])
self.assertEqual(oslo_messaging.NotificationResult.REQUEUE, ret)