make gnocchi event dispatcher work
The gnocchi event dispatcher have been broken by two major changes because of no tests coverage (requests->gnocchiclient and panko refactoring) To fix that: * the event dispatcher class derive the base event dispatcher class. * Use correct gnocchi client method to search resources. * Use correct gnocchi client method to update resources. * Since record_event received a raw notification now and no more the event, we change the parsing codetoo. * And obviously add a test ! Change-Id: I94b7393d79e495c5101225b4097c7073978ca7d7 Co-Authored-By: Mehdi Abaakouk <sileht@redhat.com> Closes-bug: 1627928
This commit is contained in:
parent
67bbd3f833
commit
a417a8754c
|
@ -25,6 +25,7 @@ from gnocchiclient import utils as gnocchi_utils
|
|||
from keystoneauth1 import exceptions as ka_exceptions
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_utils import fnmatch
|
||||
from oslo_utils import timeutils
|
||||
import retrying
|
||||
|
@ -95,6 +96,11 @@ class ResourcesDefinition(object):
|
|||
self._attributes[name] = declarative.Definition(name, attr_cfg,
|
||||
plugin_manager)
|
||||
|
||||
self._event_attributes = {}
|
||||
for name, attr_cfg in self.cfg.get('event_attributes', {}).items():
|
||||
self._event_attributes[name] = declarative.Definition(
|
||||
name, attr_cfg, plugin_manager)
|
||||
|
||||
self.metrics = {}
|
||||
for t in self.cfg['metrics']:
|
||||
archive_policy = self.cfg.get('archive_policy',
|
||||
|
@ -143,12 +149,11 @@ class ResourcesDefinition(object):
|
|||
return attrs
|
||||
|
||||
def event_attributes(self, event):
|
||||
attrs = {}
|
||||
traits = dict([(trait[0], trait[2]) for trait in event['traits']])
|
||||
for attr, field in self.cfg.get('event_attributes', {}).items():
|
||||
value = traits.get(field)
|
||||
attrs = {'type': self.cfg['resource_type']}
|
||||
for name, definition in self._event_attributes.items():
|
||||
value = definition.parse(event)
|
||||
if value is not None:
|
||||
attrs[attr] = value
|
||||
attrs[name] = value
|
||||
return attrs
|
||||
|
||||
|
||||
|
@ -176,7 +181,8 @@ class LockedDefaultDict(defaultdict):
|
|||
key_lock.release()
|
||||
|
||||
|
||||
class GnocchiDispatcher(dispatcher.MeterDispatcherBase):
|
||||
class GnocchiDispatcher(dispatcher.MeterDispatcherBase,
|
||||
dispatcher.EventDispatcherBase):
|
||||
"""Dispatcher class for recording metering data into database.
|
||||
|
||||
The dispatcher class records each meter into the gnocchi service
|
||||
|
@ -192,6 +198,7 @@ class GnocchiDispatcher(dispatcher.MeterDispatcherBase):
|
|||
|
||||
[DEFAULT]
|
||||
meter_dispatchers = gnocchi
|
||||
event_dispatchers = gnocchi
|
||||
"""
|
||||
def __init__(self, conf):
|
||||
super(GnocchiDispatcher, self).__init__(conf)
|
||||
|
@ -505,25 +512,42 @@ class GnocchiDispatcher(dispatcher.MeterDispatcherBase):
|
|||
continue
|
||||
|
||||
rd, operation = rd
|
||||
resource_type = rd.cfg['resource_type']
|
||||
resource = rd.event_attributes(event)
|
||||
|
||||
if operation == EVENT_DELETE:
|
||||
ended_at = timeutils.utcnow().isoformat()
|
||||
resources_to_end = [resource]
|
||||
extra_resources = cfg['event_associated_resources'].items()
|
||||
for resource_type, filters in extra_resources:
|
||||
resources_to_end.extend(self._gnocchi.search_resource(
|
||||
resource_type, filters['query'] % resource['id']))
|
||||
for resource in resources_to_end:
|
||||
try:
|
||||
self._gnocchi.update_resource(resource_type,
|
||||
resource['id'],
|
||||
{'ended_at': ended_at})
|
||||
except gnocchi_exc.NoSuchResource:
|
||||
LOG.debug(_("Delete event received on unexiting "
|
||||
"resource (%s), ignore it.") %
|
||||
resource['id'])
|
||||
except Exception:
|
||||
LOG.error(_LE("Fail to update the resource %s") %
|
||||
resource, exc_info=True)
|
||||
self._delete_event(rd, event)
|
||||
|
||||
def _delete_event(self, rd, event):
|
||||
ended_at = timeutils.utcnow().isoformat()
|
||||
|
||||
resource = rd.event_attributes(event)
|
||||
associated_resources = rd.cfg['event_associated_resources']
|
||||
|
||||
to_end = itertools.chain([resource], *[
|
||||
self._search_resource(resource_type, query % resource['id'])
|
||||
for resource_type, query in associated_resources.items()
|
||||
])
|
||||
|
||||
for resource in to_end:
|
||||
self._set_ended_at(resource, ended_at)
|
||||
|
||||
def _search_resource(self, resource_type, query):
|
||||
try:
|
||||
return self._gnocchi.resource.search(
|
||||
resource_type, jsonutils.loads(query))
|
||||
except Exception:
|
||||
LOG.error(_LE("Fail to search resource type %{resource_type}s "
|
||||
"with '%{query}s'"),
|
||||
{'resource_type': resource_type, 'query': query},
|
||||
exc_info=True)
|
||||
return []
|
||||
|
||||
def _set_ended_at(self, resource, ended_at):
|
||||
try:
|
||||
self._gnocchi.resource.update(resource['type'], resource['id'],
|
||||
{'ended_at': ended_at})
|
||||
except gnocchi_exc.ResourceNotFound:
|
||||
LOG.debug("Delete event received on unexisting resource (%s), "
|
||||
"ignore it.", resource['id'])
|
||||
except Exception:
|
||||
LOG.error(_LE("Fail to update the resource %s"), resource,
|
||||
exc_info=True)
|
||||
LOG.debug('Resource %s ended at %s' % (resource["id"], ended_at))
|
||||
|
|
|
@ -22,6 +22,8 @@ from keystoneauth1 import exceptions as ka_exceptions
|
|||
import mock
|
||||
from oslo_config import fixture as config_fixture
|
||||
from oslo_utils import fileutils
|
||||
from oslo_utils import fixture as utils_fixture
|
||||
from oslo_utils import timeutils
|
||||
from oslotest import mockpatch
|
||||
import requests
|
||||
import six
|
||||
|
@ -35,6 +37,50 @@ from ceilometer.tests import base
|
|||
|
||||
load_tests = testscenarios.load_tests_apply_scenarios
|
||||
|
||||
INSTANCE_DELETE_START = {
|
||||
u'_context_auth_token': u'3d8b13de1b7d499587dfc69b77dc09c2',
|
||||
u'_context_is_admin': True,
|
||||
u'_context_project_id': u'7c150a59fe714e6f9263774af9688f0e',
|
||||
u'_context_quota_class': None,
|
||||
u'_context_read_deleted': u'no',
|
||||
u'_context_remote_address': u'10.0.2.15',
|
||||
u'_context_request_id': u'req-fb3c4546-a2e5-49b7-9fd2-a63bd658bc39',
|
||||
u'_context_roles': [u'admin'],
|
||||
u'_context_timestamp': u'2012-05-08T20:24:14.547374',
|
||||
u'_context_user_id': u'1e3ce043029547f1a61c1996d1a531a2',
|
||||
u'event_type': u'compute.instance.delete.start',
|
||||
u'message_id': u'a15b94ee-cb8e-4c71-9abe-14aa80055fb4',
|
||||
u'payload': {u'created_at': u'2012-05-08 20:23:41',
|
||||
u'deleted_at': u'',
|
||||
u'disk_gb': 0,
|
||||
u'display_name': u'testme',
|
||||
u'image_ref_url': u'http://10.0.2.15:9292/images/UUID',
|
||||
u'instance_id': u'9f9d01b9-4a58-4271-9e27-398b21ab20d1',
|
||||
u'instance_type': u'm1.tiny',
|
||||
u'instance_type_id': 2,
|
||||
u'launched_at': u'2012-05-08 20:23:47',
|
||||
u'memory_mb': 512,
|
||||
u'state': u'active',
|
||||
u'state_description': u'deleting',
|
||||
u'tenant_id': u'7c150a59fe714e6f9263774af9688f0e',
|
||||
u'user_id': u'1e3ce043029547f1a61c1996d1a531a2',
|
||||
u'reservation_id': u'1e3ce043029547f1a61c1996d1a531a3',
|
||||
u'vcpus': 1,
|
||||
u'root_gb': 0,
|
||||
u'ephemeral_gb': 0,
|
||||
u'host': u'compute-host-name',
|
||||
u'availability_zone': u'1e3ce043029547f1a61c1996d1a531a4',
|
||||
u'os_type': u'linux?',
|
||||
u'architecture': u'x86',
|
||||
u'image_ref': u'UUID',
|
||||
u'kernel_id': u'1e3ce043029547f1a61c1996d1a531a5',
|
||||
u'ramdisk_id': u'1e3ce043029547f1a61c1996d1a531a6',
|
||||
},
|
||||
u'priority': u'INFO',
|
||||
u'publisher_id': u'compute.vagrant-precise',
|
||||
u'timestamp': u'2012-05-08 20:24:14.824743',
|
||||
}
|
||||
|
||||
|
||||
@mock.patch('gnocchiclient.v1.client.Client', mock.Mock())
|
||||
class DispatcherTest(base.BaseTestCase):
|
||||
|
@ -359,6 +405,51 @@ class DispatcherWorkflowTest(base.BaseTestCase,
|
|||
self.sample['message_signature'] = utils.compute_signature(
|
||||
self.sample, self.conf.conf.publisher.telemetry_secret)
|
||||
|
||||
@mock.patch('gnocchiclient.v1.client.Client')
|
||||
def test_event_workflow(self, fakeclient_cls):
|
||||
self.dispatcher = gnocchi.GnocchiDispatcher(self.conf.conf)
|
||||
|
||||
fakeclient = fakeclient_cls.return_value
|
||||
|
||||
fakeclient.resource.search.side_effect = [
|
||||
[{"id": "b26268d6-8bb5-11e6-baff-00224d8226cd",
|
||||
"type": "instance_disk",
|
||||
"instance_id": "9f9d01b9-4a58-4271-9e27-398b21ab20d1"}],
|
||||
[{"id": "b1c7544a-8bb5-11e6-850e-00224d8226cd",
|
||||
"type": "instance_network_interface",
|
||||
"instance_id": "9f9d01b9-4a58-4271-9e27-398b21ab20d1"}],
|
||||
]
|
||||
|
||||
search_params = {
|
||||
'=': {'instance_id': '9f9d01b9-4a58-4271-9e27-398b21ab20d1'}
|
||||
}
|
||||
|
||||
now = timeutils.utcnow()
|
||||
self.useFixture(utils_fixture.TimeFixture(now))
|
||||
|
||||
expected_calls = [
|
||||
mock.call.capabilities.list(),
|
||||
mock.call.resource.search('instance_disk', search_params),
|
||||
mock.call.resource.search('instance_network_interface',
|
||||
search_params),
|
||||
mock.call.resource.update(
|
||||
'instance', '9f9d01b9-4a58-4271-9e27-398b21ab20d1',
|
||||
{'ended_at': now.isoformat()}),
|
||||
mock.call.resource.update(
|
||||
'instance_disk',
|
||||
'b26268d6-8bb5-11e6-baff-00224d8226cd',
|
||||
{'ended_at': now.isoformat()}),
|
||||
mock.call.resource.update(
|
||||
'instance_network_interface',
|
||||
'b1c7544a-8bb5-11e6-850e-00224d8226cd',
|
||||
{'ended_at': now.isoformat()})
|
||||
]
|
||||
|
||||
self.dispatcher.record_events([INSTANCE_DELETE_START])
|
||||
self.assertEqual(6, len(fakeclient.mock_calls))
|
||||
for call in expected_calls:
|
||||
self.assertIn(call, fakeclient.mock_calls)
|
||||
|
||||
@mock.patch('ceilometer.dispatcher.gnocchi.LOG')
|
||||
@mock.patch('gnocchiclient.v1.client.Client')
|
||||
def test_workflow(self, fakeclient_cls, logger):
|
||||
|
|
|
@ -66,14 +66,10 @@ resources:
|
|||
server_group: resource_metadata.user_metadata.server_group
|
||||
event_delete: compute.instance.delete.start
|
||||
event_attributes:
|
||||
id: instance_id
|
||||
id: payload.instance_id
|
||||
event_associated_resources:
|
||||
instance_network_interface:
|
||||
query: '{"=": {"instance_id": "%s"}}'
|
||||
fields: [id]
|
||||
instance_disk:
|
||||
query: '{"=": {"instance_id": "%s"}}'
|
||||
fields: [id]
|
||||
instance_network_interface: '{"=": {"instance_id": "%s"}}'
|
||||
instance_disk: '{"=": {"instance_id": "%s"}}'
|
||||
|
||||
- resource_type: instance_network_interface
|
||||
metrics:
|
||||
|
@ -120,7 +116,7 @@ resources:
|
|||
disk_format: resource_metadata.disk_format
|
||||
event_delete: image.delete
|
||||
event_attributes:
|
||||
id: resource_id
|
||||
id: payload.resource_id
|
||||
|
||||
- resource_type: ipmi
|
||||
metrics:
|
||||
|
|
Loading…
Reference in New Issue