pollster/api now publish to sample queue
Pollsters and Post Sample implementation will publish to the notifications.sample queue with this change. Change-Id: Iccb9261ee805d41ed057bd0ac72306cd6a2ddaff Closes-Bug: #1484695 Closes-Bug: #1489341
This commit is contained in:
parent
0c279d885b
commit
4e8e3c0feb
|
@ -210,7 +210,7 @@ class PollingTask(object):
|
||||||
exc_info=True)
|
exc_info=True)
|
||||||
|
|
||||||
def _send_notification(self, samples):
|
def _send_notification(self, samples):
|
||||||
self.manager.notifier.info(
|
self.manager.notifier.sample(
|
||||||
self.manager.context.to_dict(),
|
self.manager.context.to_dict(),
|
||||||
'telemetry.polling',
|
'telemetry.polling',
|
||||||
{'samples': samples}
|
{'samples': samples}
|
||||||
|
|
|
@ -123,7 +123,7 @@ class NotificationBase(PluginBase):
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def info(self, ctxt, publisher_id, event_type, payload, metadata):
|
def info(self, ctxt, publisher_id, event_type, payload, metadata):
|
||||||
"""RPC endpoint for notification messages
|
"""RPC endpoint for notification messages at info level
|
||||||
|
|
||||||
When another service sends a notification over the message
|
When another service sends a notification over the message
|
||||||
bus, this method receives it.
|
bus, this method receives it.
|
||||||
|
@ -139,6 +139,23 @@ class NotificationBase(PluginBase):
|
||||||
'info', ctxt, publisher_id, event_type, payload, metadata)
|
'info', ctxt, publisher_id, event_type, payload, metadata)
|
||||||
self.to_samples_and_publish(context.get_admin_context(), notification)
|
self.to_samples_and_publish(context.get_admin_context(), notification)
|
||||||
|
|
||||||
|
def sample(self, ctxt, publisher_id, event_type, payload, metadata):
|
||||||
|
"""RPC endpoint for notification messages at sample level
|
||||||
|
|
||||||
|
When another service sends a notification over the message
|
||||||
|
bus at sample priority, this method receives it.
|
||||||
|
|
||||||
|
:param ctxt: oslo.messaging context
|
||||||
|
:param publisher_id: publisher of the notification
|
||||||
|
:param event_type: type of notification
|
||||||
|
:param payload: notification payload
|
||||||
|
:param metadata: metadata about the notification
|
||||||
|
|
||||||
|
"""
|
||||||
|
notification = messaging.convert_to_old_notification_format(
|
||||||
|
'sample', ctxt, publisher_id, event_type, payload, metadata)
|
||||||
|
self.to_samples_and_publish(context.get_admin_context(), notification)
|
||||||
|
|
||||||
def to_samples_and_publish(self, context, notification):
|
def to_samples_and_publish(self, context, notification):
|
||||||
"""Return samples produced by *process_notification*.
|
"""Return samples produced by *process_notification*.
|
||||||
|
|
||||||
|
|
|
@ -367,8 +367,8 @@ class MeterController(rest.RestController):
|
||||||
tenant=def_project_id,
|
tenant=def_project_id,
|
||||||
is_admin=True)
|
is_admin=True)
|
||||||
notifier = pecan.request.notifier
|
notifier = pecan.request.notifier
|
||||||
notifier.info(ctxt.to_dict(), 'telemetry.api',
|
notifier.sample(ctxt.to_dict(), 'telemetry.api',
|
||||||
{'samples': published_samples})
|
{'samples': published_samples})
|
||||||
|
|
||||||
return samples
|
return samples
|
||||||
|
|
||||||
|
|
|
@ -36,7 +36,7 @@ class EventsNotificationEndpoint(object):
|
||||||
self.manager = manager
|
self.manager = manager
|
||||||
|
|
||||||
def info(self, ctxt, publisher_id, event_type, payload, metadata):
|
def info(self, ctxt, publisher_id, event_type, payload, metadata):
|
||||||
"""Convert message to Ceilometer Event.
|
"""Convert message at info level to Ceilometer Event.
|
||||||
|
|
||||||
:param ctxt: oslo_messaging context
|
:param ctxt: oslo_messaging context
|
||||||
:param publisher_id: publisher of the notification
|
:param publisher_id: publisher of the notification
|
||||||
|
|
|
@ -42,8 +42,11 @@ class TelemetryBase(plugin_base.NotificationBase):
|
||||||
for topic in conf.notification_topics]
|
for topic in conf.notification_topics]
|
||||||
|
|
||||||
|
|
||||||
class TelemetryApiPost(TelemetryBase):
|
class TelemetryIpc(TelemetryBase):
|
||||||
"""Handle sample from notification bus, which is posted via API."""
|
"""Handle sample from notification bus
|
||||||
|
|
||||||
|
Telemetry samples can be posted via API or polled by Polling agent.
|
||||||
|
"""
|
||||||
|
|
||||||
event_types = ['telemetry.api', 'telemetry.polling']
|
event_types = ['telemetry.api', 'telemetry.polling']
|
||||||
|
|
||||||
|
|
|
@ -37,7 +37,7 @@ class TestPostSamples(v2.FunctionalTest,
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
self.published = []
|
self.published = []
|
||||||
notifier = mock.Mock()
|
notifier = mock.Mock()
|
||||||
notifier.info.side_effect = self.fake_notifier_sample
|
notifier.sample.side_effect = self.fake_notifier_sample
|
||||||
self.useFixture(mockpatch.Patch('oslo_messaging.Notifier',
|
self.useFixture(mockpatch.Patch('oslo_messaging.Notifier',
|
||||||
return_value=notifier))
|
return_value=notifier))
|
||||||
super(TestPostSamples, self).setUp()
|
super(TestPostSamples, self).setUp()
|
||||||
|
|
|
@ -246,7 +246,7 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
self.notified_samples = []
|
self.notified_samples = []
|
||||||
self.notifier = mock.Mock()
|
self.notifier = mock.Mock()
|
||||||
self.notifier.info.side_effect = self.fake_notifier_sample
|
self.notifier.sample.side_effect = self.fake_notifier_sample
|
||||||
self.useFixture(mockpatch.Patch('oslo_messaging.Notifier',
|
self.useFixture(mockpatch.Patch('oslo_messaging.Notifier',
|
||||||
return_value=self.notifier))
|
return_value=self.notifier))
|
||||||
self.source_resources = True
|
self.source_resources = True
|
||||||
|
@ -418,7 +418,7 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
|
||||||
|
|
||||||
samples = self.notified_samples
|
samples = self.notified_samples
|
||||||
self.assertEqual(expected_samples, len(samples))
|
self.assertEqual(expected_samples, len(samples))
|
||||||
self.assertEqual(call_count, self.notifier.info.call_count)
|
self.assertEqual(call_count, self.notifier.sample.call_count)
|
||||||
|
|
||||||
def test_start_with_reloadable_pipeline(self):
|
def test_start_with_reloadable_pipeline(self):
|
||||||
|
|
||||||
|
|
|
@ -60,10 +60,10 @@ NOTIFICATION = {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
class TelemetryApiPostTestCase(base.BaseTestCase):
|
class TelemetryIpcTestCase(base.BaseTestCase):
|
||||||
|
|
||||||
def test_process_notification(self):
|
def test_process_notification(self):
|
||||||
sample_creation = notifications.TelemetryApiPost(None)
|
sample_creation = notifications.TelemetryIpc(None)
|
||||||
samples = list(sample_creation.process_notification(NOTIFICATION))
|
samples = list(sample_creation.process_notification(NOTIFICATION))
|
||||||
self.assertEqual(2, len(samples))
|
self.assertEqual(2, len(samples))
|
||||||
payload = NOTIFICATION["payload"]['samples']
|
payload = NOTIFICATION["payload"]['samples']
|
||||||
|
|
|
@ -52,7 +52,7 @@ ceilometer.notification =
|
||||||
network.services.vpn.ipsecpolicy = ceilometer.network.notifications:IPSecPolicy
|
network.services.vpn.ipsecpolicy = ceilometer.network.notifications:IPSecPolicy
|
||||||
network.services.vpn.ikepolicy = ceilometer.network.notifications:IKEPolicy
|
network.services.vpn.ikepolicy = ceilometer.network.notifications:IKEPolicy
|
||||||
network.services.vpn.connections = ceilometer.network.notifications:IPSecSiteConnection
|
network.services.vpn.connections = ceilometer.network.notifications:IPSecSiteConnection
|
||||||
_sample = ceilometer.telemetry.notifications:TelemetryApiPost
|
_sample = ceilometer.telemetry.notifications:TelemetryIpc
|
||||||
trove.instance.exists = ceilometer.database.notifications:InstanceExists
|
trove.instance.exists = ceilometer.database.notifications:InstanceExists
|
||||||
dns.domain.exists = ceilometer.dns.notifications:DomainExists
|
dns.domain.exists = ceilometer.dns.notifications:DomainExists
|
||||||
meter = ceilometer.meter.notifications:ProcessMeterNotifications
|
meter = ceilometer.meter.notifications:ProcessMeterNotifications
|
||||||
|
|
Loading…
Reference in New Issue