Handle more Nova and Neutron events

This introduces a simpler matching system for event types that supports
wildcard. We use this to grab all Nova instances notifications and a bit
more Neutron events.

We also change the get_event_types to a property to simplify the code a
bit.

Change-Id: Ica133ff2e9348a5d2640f068251d034bbd8a4f43
This commit is contained in:
Julien Danjou 2013-07-24 17:23:11 +02:00
parent d7fb018905
commit e49c949f39
11 changed files with 169 additions and 67 deletions

View File

@ -167,7 +167,7 @@ class CollectorService(rpc_service.Service):
handler = ext.obj
ack_on_error = cfg.CONF.collector.ack_on_event_error
LOG.debug('Event types from %s: %s (ack_on_error=%s)',
ext.name, ', '.join(handler.get_event_types()),
ext.name, ', '.join(handler.event_types),
ack_on_error)
for exchange_topic in handler.get_exchange_topics(cfg.CONF):
@ -247,12 +247,9 @@ class CollectorService(rpc_service.Service):
raise
def _process_notification_for_ext(self, ext, notification):
handler = ext.obj
if notification['event_type'] in handler.get_event_types():
ctxt = context.get_admin_context()
with self.pipeline_manager.publisher(ctxt) as p:
# FIXME(dhellmann): Spawn green thread?
p(list(handler.process_notification(notification)))
with self.pipeline_manager.publisher(context.get_admin_context()) as p:
# FIXME(dhellmann): Spawn green thread?
p(list(ext.obj.to_samples(notification)))
def collector():

View File

@ -51,9 +51,7 @@ class ComputeNotificationBase(plugin.NotificationBase):
class InstanceScheduled(ComputeNotificationBase):
@staticmethod
def get_event_types():
return ['scheduler.run_instance.scheduled']
event_types = ['scheduler.run_instance.scheduled']
def process_notification(self, message):
yield sample.Sample.from_notification(
@ -72,16 +70,7 @@ class InstanceScheduled(ComputeNotificationBase):
class ComputeInstanceNotificationBase(ComputeNotificationBase):
"""Convert compute.instance.* notifications into Counters
"""
@staticmethod
def get_event_types():
return ['compute.instance.create.start',
'compute.instance.create.end',
'compute.instance.exists',
'compute.instance.update',
'compute.instance.delete.start',
'compute.instance.delete.end',
'compute.instance.finish_resize.end',
'compute.instance.resize.revert.end']
event_types = ['compute.instance.*']
class Instance(ComputeInstanceNotificationBase):
@ -169,9 +158,7 @@ class InstanceDelete(ComputeInstanceNotificationBase):
when an instance is being deleted.
"""
@staticmethod
def get_event_types():
return ['compute.instance.delete.samples']
event_types = ['compute.instance.delete.samples']
def process_notification(self, message):
for s in message['payload'].get('samples', []):

View File

@ -51,13 +51,11 @@ class ImageBase(plugin.NotificationBase):
class ImageCRUDBase(ImageBase):
@staticmethod
def get_event_types():
return [
'image.update',
'image.upload',
'image.delete',
]
event_types = [
'image.update',
'image.upload',
'image.delete',
]
class ImageCRUD(ImageCRUDBase):
@ -101,11 +99,7 @@ class ImageSize(ImageCRUDBase):
class ImageDownload(ImageBase):
"""Emit image_download counter when an image is downloaded."""
@staticmethod
def get_event_types():
return [
'image.send',
]
event_types = ['image.send']
def process_notification(self, message):
yield sample.Sample.from_notification(
@ -121,11 +115,7 @@ class ImageDownload(ImageBase):
class ImageServe(ImageBase):
"""Emit image_serve counter when an image is served out."""
@staticmethod
def get_event_types():
return [
'image.send',
]
event_types = ['image.send']
def process_notification(self, message):
yield sample.Sample.from_notification(

View File

@ -42,10 +42,11 @@ class NetworkNotificationBase(plugin.NotificationBase):
resource_name = None
def get_event_types(self):
@property
def event_types(self):
return [
'%s.create.end' % (self.resource_name),
'%s.update.end' % (self.resource_name),
'%s.create.*' % (self.resource_name),
'%s.update.*' % (self.resource_name),
'%s.exists' % (self.resource_name),
# FIXME(dhellmann): Neutron delete notifications do
# not include the same metadata as the other messages,

View File

@ -57,12 +57,9 @@ def _load_pipeline_manager():
def _process_notification_for_ext(ext, context, notification):
handler = ext.obj
if notification['event_type'] in handler.get_event_types():
with _pipeline_manager.publisher(context) as p:
# FIXME(dhellmann): Spawn green thread?
p(list(handler.process_notification(notification)))
with _pipeline_manager.publisher(context) as p:
# FIXME(dhellmann): Spawn green thread?
p(list(ext.obj.to_samples(notification)))
def notify(context, message):

View File

@ -20,6 +20,7 @@
import abc
import collections
import fnmatch
from oslo.config import cfg
# Import this option so every Notification plugin can use it freely.
@ -48,8 +49,8 @@ class NotificationBase(PluginBase):
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def get_event_types(self):
@abc.abstractproperty
def event_types(self):
"""Return a sequence of strings defining the event types to be
given to this plugin.
"""
@ -69,6 +70,27 @@ class NotificationBase(PluginBase):
:param message: Message to process.
"""
@staticmethod
def _handle_event_type(event_type, event_type_to_handle):
"""Check whether event_type should be handled according to
event_type_to_handle.
"""
return any(map(lambda e: fnmatch.fnmatch(event_type, e),
event_type_to_handle))
def to_samples(self, notification):
"""Return samples produced by *process_notification* for the given
notification, if it's handled by this notification handler.
:param notification: The notification to process.
"""
if self._handle_event_type(notification['event_type'],
self.event_types):
return self.process_notification(notification)
return []
class PollsterBase(PluginBase):
"""Base class for plugins that support the polling API."""

View File

@ -38,6 +38,12 @@ cfg.CONF.register_opts(OPTS)
class _Base(plugin.NotificationBase):
"""Convert volume notifications into Counters."""
event_types = [
'volume.exists',
'volume.create.*',
'volume.delete.*',
]
@staticmethod
def get_exchange_topics(conf):
"""Return a sequence of ExchangeTopics defining the exchange and
@ -50,12 +56,6 @@ class _Base(plugin.NotificationBase):
for topic in conf.notification_topics)),
]
@staticmethod
def get_event_types():
return ['volume.exists',
'volume.create.end',
'volume.delete.start']
class Volume(_Base):
def process_notification(self, message):

View File

@ -85,12 +85,11 @@ Notifications
Notifications are defined as subclass of the
:class:`ceilometer.plugin.NotificationBase` meta class as defined in
the ``ceilometer/plugin.py`` file. Notifications must implement two
methods:
the ``ceilometer/plugin.py`` file. Notifications must implement:
``get_event_types(self)`` which should return a sequence of strings defining the event types to be given to the plugin and
``event_types`` which should be a sequence of strings defining the event types to be given to the plugin and
``process_notification(self, message)`` which receives an event message from the list provided to get_event_types and returns a sequence of Counter objects as defined in the ``ceilometer/counter.py`` file.
``process_notification(self, message)`` which receives an event message from the list provided to event_types and returns a sequence of Counter objects as defined in the ``ceilometer/counter.py`` file.
In the ``InstanceNotifications`` plugin, it listens to three events:

View File

@ -560,7 +560,7 @@ class TestNotifications(base.TestCase):
ic = notifications.InstanceScheduled()
self.assertIn(INSTANCE_SCHEDULED['event_type'],
ic.get_event_types())
ic.event_types)
counters = list(ic.process_notification(INSTANCE_SCHEDULED))
self.assertEqual(len(counters), 1)

View File

@ -249,21 +249,21 @@ class TestEventTypes(base.TestCase):
def test_network(self):
v = notifications.Network()
events = v.get_event_types()
events = v.event_types
assert events
def test_subnet(self):
v = notifications.Subnet()
events = v.get_event_types()
events = v.event_types
assert events
def test_port(self):
v = notifications.Port()
events = v.get_event_types()
events = v.event_types
assert events
def test_router(self):
assert notifications.Router().get_event_types()
self.assertTrue(notifications.Router().event_types)
def test_floatingip(self):
assert notifications.FloatingIP().get_event_types()
self.assertTrue(notifications.FloatingIP().event_types)

109
tests/test_plugin.py Normal file
View File

@ -0,0 +1,109 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2013 eNovance <licensing@enovance.com>
#
# Author: Julien Danjou <julien@danjou.info>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from ceilometer import plugin
from ceilometer.tests import base
TEST_NOTIFICATION = {
u'_context_auth_token': u'3d8b13de1b7d499587dfc69b77dc09c2',
u'_context_is_admin': True,
u'_context_project_id': u'7c150a59fe714e6f9263774af9688f0e',
u'_context_quota_class': None,
u'_context_read_deleted': u'no',
u'_context_remote_address': u'10.0.2.15',
u'_context_request_id': u'req-d68b36e0-9233-467f-9afb-d81435d64d66',
u'_context_roles': [u'admin'],
u'_context_timestamp': u'2012-05-08T20:23:41.425105',
u'_context_user_id': u'1e3ce043029547f1a61c1996d1a531a2',
u'event_type': u'compute.instance.create.end',
u'message_id': u'dae6f69c-00e0-41c0-b371-41ec3b7f4451',
u'payload': {u'created_at': u'2012-05-08 20:23:41',
u'deleted_at': u'',
u'disk_gb': 0,
u'display_name': u'testme',
u'fixed_ips': [{u'address': u'10.0.0.2',
u'floating_ips': [],
u'meta': {},
u'type': u'fixed',
u'version': 4}],
u'image_ref_url': u'http://10.0.2.15:9292/images/UUID',
u'instance_id': u'9f9d01b9-4a58-4271-9e27-398b21ab20d1',
u'instance_type': u'm1.tiny',
u'instance_type_id': 2,
u'launched_at': u'2012-05-08 20:23:47.985999',
u'memory_mb': 512,
u'state': u'active',
u'state_description': u'',
u'tenant_id': u'7c150a59fe714e6f9263774af9688f0e',
u'user_id': u'1e3ce043029547f1a61c1996d1a531a2',
u'reservation_id': u'1e3ce043029547f1a61c1996d1a531a3',
u'vcpus': 1,
u'root_gb': 0,
u'ephemeral_gb': 0,
u'host': u'compute-host-name',
u'availability_zone': u'1e3ce043029547f1a61c1996d1a531a4',
u'os_type': u'linux?',
u'architecture': u'x86',
u'image_ref': u'UUID',
u'kernel_id': u'1e3ce043029547f1a61c1996d1a531a5',
u'ramdisk_id': u'1e3ce043029547f1a61c1996d1a531a6',
},
u'priority': u'INFO',
u'publisher_id': u'compute.vagrant-precise',
u'timestamp': u'2012-05-08 20:23:48.028195',
}
class NotificationBaseTestCase(base.TestCase):
def test_handle_event_type(self):
self.assertFalse(plugin.NotificationBase._handle_event_type(
'compute.instance.start', ['compute']))
self.assertFalse(plugin.NotificationBase._handle_event_type(
'compute.instance.start', ['compute.*.foobar']))
self.assertFalse(plugin.NotificationBase._handle_event_type(
'compute.instance.start', ['compute.*.*.foobar']))
self.assertTrue(plugin.NotificationBase._handle_event_type(
'compute.instance.start', ['compute.*']))
self.assertTrue(plugin.NotificationBase._handle_event_type(
'compute.instance.start', ['*']))
self.assertTrue(plugin.NotificationBase._handle_event_type(
'compute.instance.start', ['compute.*.start']))
self.assertTrue(plugin.NotificationBase._handle_event_type(
'compute.instance.start', ['*.start']))
self.assertTrue(plugin.NotificationBase._handle_event_type(
'compute.instance.start', ['compute.*.*.foobar', 'compute.*']))
class FakePlugin(plugin.NotificationBase):
def get_exchange_topics(self, conf):
return
def process_notification(self, message):
return message
class FakeComputePlugin(FakePlugin):
event_types = ['compute.*']
class FakeNetworkPlugin(FakePlugin):
event_types = ['network.*']
def test_to_samples(self):
c = self.FakeComputePlugin()
n = self.FakeNetworkPlugin()
self.assertTrue(len(list(c.to_samples(TEST_NOTIFICATION))) > 0)
self.assertEqual(len(list(n.to_samples(TEST_NOTIFICATION))), 0)