collector: never allow data to be lost

The current default makes it very easy to lose data: if the dispatcher
fails to send data to the backend (e.g. Gnocchi is down), the dispatcher
raises an exception and the data are lost forever. This is completely
unacceptable, and nobody should be able to configure Ceilometer that
way.

This patch removes those options entirely and switches the behavior to
something sane.

Change-Id: I45cb3da84eb2a785f46b3ec676c1a052ce999206
commit 40684dafae
parent 09aec58d7e
Author: Julien Danjou
Date: 2016-03-22 17:03:29 +01:00
3 changed files with 21 additions and 65 deletions
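
For context, a minimal standalone sketch of the oslo.messaging contract this
change relies on: an endpoint that returns NotificationResult.REQUEUE hands
the message back to the queue for redelivery instead of dropping it, provided
the listener was created with allow_requeue=True. The 'metering' topic and
the FlakyBackend class are illustrative placeholders, not Ceilometer code.

    import oslo_messaging
    from oslo_config import cfg


    class FlakyBackend(object):
        """Stand-in for a dispatcher backend that may be down (e.g. Gnocchi)."""

        def record(self, payload):
            raise IOError("backend unavailable")


    class Endpoint(object):
        def __init__(self, backend):
            self.backend = backend

        def info(self, ctxt, publisher_id, event_type, payload, metadata):
            try:
                self.backend.record(payload)
            except Exception:
                # Raising here would lose the notification; returning REQUEUE
                # asks the broker to redeliver it once the backend recovers.
                return oslo_messaging.NotificationResult.REQUEUE


    transport = oslo_messaging.get_notification_transport(cfg.CONF)
    listener = oslo_messaging.get_notification_listener(
        transport, [oslo_messaging.Target(topic='metering')],
        [Endpoint(FlakyBackend())],
        allow_requeue=True)  # REQUEUE is only honored when this is True
    listener.start()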


@@ -37,15 +37,6 @@ OPTS = [
     cfg.PortOpt('udp_port',
                 default=4952,
                 help='Port to which the UDP socket is bound.'),
-    cfg.BoolOpt('requeue_sample_on_dispatcher_error',
-                default=False,
-                help='Requeue the sample on the collector sample queue '
-                'when the collector fails to dispatch it. This is only valid '
-                'if the sample come from the notifier publisher.'),
-    cfg.BoolOpt('requeue_event_on_dispatcher_error',
-                default=False,
-                help='Requeue the event on the collector event queue '
-                'when the collector fails to dispatch it.'),
     cfg.IntOpt('batch_size',
                default=1,
                help='Number of notification messages to wait before '
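
Before this patch, requeuing was opt-in: deployments (and the tests further
down) had to flip these options explicitly, for example via the override
calls that this commit also deletes:

    from oslo_config import cfg

    # Pre-patch, opt-in requeuing; these overrides target the options
    # removed in the hunk above and disappear along with them.
    cfg.CONF.set_override('requeue_sample_on_dispatcher_error', True,
                          group='collector')
    cfg.CONF.set_override('requeue_event_on_dispatcher_error', True,
                          group='collector')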
@@ -91,8 +82,7 @@ class CollectorService(os_service.Service):
                 messaging.get_batch_notification_listener(
                     transport, [sample_target],
                     [SampleEndpoint(self.meter_manager)],
-                    allow_requeue=(cfg.CONF.collector.
-                                   requeue_sample_on_dispatcher_error),
+                    allow_requeue=True,
                     batch_size=cfg.CONF.collector.batch_size,
                     batch_timeout=cfg.CONF.collector.batch_timeout))
             self.sample_listener.start()
@@ -104,8 +94,7 @@ class CollectorService(os_service.Service):
                 messaging.get_batch_notification_listener(
                     transport, [event_target],
                     [EventEndpoint(self.event_manager)],
-                    allow_requeue=(cfg.CONF.collector.
-                                   requeue_event_on_dispatcher_error),
+                    allow_requeue=True,
                     batch_size=cfg.CONF.collector.batch_size,
                     batch_timeout=cfg.CONF.collector.batch_timeout))
             self.event_listener.start()
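
Both listeners above are batch notification listeners. A hedged sketch of
that contract (the 'metering' topic and the batch numbers are illustrative,
and I assume a REQUEUE return applies to the whole batch): each endpoint
method is named after the notification priority ('sample' here) and receives
a list of message dicts rather than a single message.

    import oslo_messaging
    from oslo_config import cfg


    class BatchEndpoint(object):
        def sample(self, messages):
            # messages is a list of dicts shaped like:
            # {'ctxt': ..., 'publisher_id': ..., 'event_type': ...,
            #  'payload': ..., 'metadata': ...}
            for message in messages:
                print(message['payload'])


    transport = oslo_messaging.get_notification_transport(cfg.CONF)
    listener = oslo_messaging.get_batch_notification_listener(
        transport, [oslo_messaging.Target(topic='metering')],
        [BatchEndpoint()],
        allow_requeue=True,  # hard-coded by this patch
        batch_size=10, batch_timeout=5)
    listener.start()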
@@ -158,9 +147,8 @@ class CollectorService(os_service.Service):
 
 
 class CollectorEndpoint(object):
-    def __init__(self, dispatcher_manager, requeue_on_error):
+    def __init__(self, dispatcher_manager):
         self.dispatcher_manager = dispatcher_manager
-        self.requeue_on_error = requeue_on_error
 
     def sample(self, messages):
         """RPC endpoint for notification messages
@@ -172,28 +160,16 @@ class CollectorEndpoint(object):
         try:
             self.dispatcher_manager.map_method(self.method, samples)
         except Exception:
-            if self.requeue_on_error:
-                LOG.exception(_LE("Dispatcher failed to handle the %s, "
-                                  "requeue it."), self.ep_type)
-                return oslo_messaging.NotificationResult.REQUEUE
-            raise
+            LOG.exception(_LE("Dispatcher failed to handle the %s, "
+                              "requeue it."), self.ep_type)
+            return oslo_messaging.NotificationResult.REQUEUE
 
 
 class SampleEndpoint(CollectorEndpoint):
     method = 'record_metering_data'
     ep_type = 'sample'
 
-    def __init__(self, dispatcher_manager):
-        super(SampleEndpoint, self).__init__(
-            dispatcher_manager,
-            cfg.CONF.collector.requeue_sample_on_dispatcher_error)
-
 
 class EventEndpoint(CollectorEndpoint):
     method = 'record_events'
     ep_type = 'event'
-
-    def __init__(self, dispatcher_manager):
-        super(EventEndpoint, self).__init__(
-            dispatcher_manager,
-            cfg.CONF.collector.requeue_event_on_dispatcher_error)


@@ -236,46 +236,11 @@ class TestCollector(tests_base.BaseTestCase):
                             mock.Mock())
     @mock.patch.object(collector.CollectorService, 'start_udp', mock.Mock())
     def test_collector_sample_requeue(self):
-        self.CONF.set_override('requeue_sample_on_dispatcher_error', True,
-                               group='collector')
         self._test_collector_requeue('sample_listener')
 
     @mock.patch.object(oslo_messaging.MessageHandlingServer, 'start',
                        mock.Mock())
     @mock.patch.object(collector.CollectorService, 'start_udp', mock.Mock())
     def test_collector_event_requeue(self):
-        self.CONF.set_override('requeue_event_on_dispatcher_error', True,
-                               group='collector')
         self.CONF.set_override('store_events', True, group='notification')
         self._test_collector_requeue('event_listener')
-
-    def _test_collector_no_requeue(self, listener):
-        mock_dispatcher = self._setup_fake_dispatcher()
-        self.srv.dispatcher_manager = dispatcher.load_dispatcher_manager()
-        mock_dispatcher.record_metering_data.side_effect = (FakeException
-                                                            ('boom'))
-        mock_dispatcher.record_events.side_effect = (FakeException
-                                                     ('boom'))
-
-        self.srv.start()
-        endp = getattr(self.srv, listener).dispatcher.endpoints[0]
-        self.assertRaises(FakeException, endp.sample, [
-            {'ctxt': {}, 'publisher_id': 'pub_id', 'event_type': 'event',
-             'payload': {}, 'metadata': {}}])
-
-    @mock.patch.object(oslo_messaging.MessageHandlingServer, 'start',
-                       mock.Mock())
-    @mock.patch.object(collector.CollectorService, 'start_udp', mock.Mock())
-    def test_collector_sample_no_requeue(self):
-        self.CONF.set_override('requeue_sample_on_dispatcher_error', False,
-                               group='collector')
-        self._test_collector_no_requeue('sample_listener')
-
-    @mock.patch.object(oslo_messaging.MessageHandlingServer, 'start',
-                       mock.Mock())
-    @mock.patch.object(collector.CollectorService, 'start_udp', mock.Mock())
-    def test_collector_event_no_requeue(self):
-        self.CONF.set_override('requeue_event_on_dispatcher_error', False,
-                               group='collector')
-        self.CONF.set_override('store_events', True, group='notification')
-        self._test_collector_no_requeue('event_listener')
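
The surviving tests still call _test_collector_requeue, which this diff does
not touch. By analogy with the deleted _test_collector_no_requeue above, it
presumably reads roughly like the reconstruction below (an assumption, not
the actual file contents): same fixtures, but asserting that a dispatcher
failure now yields REQUEUE instead of an exception.

    def _test_collector_requeue(self, listener):
        mock_dispatcher = self._setup_fake_dispatcher()
        self.srv.dispatcher_manager = dispatcher.load_dispatcher_manager()
        mock_dispatcher.record_metering_data.side_effect = FakeException('boom')
        mock_dispatcher.record_events.side_effect = FakeException('boom')

        self.srv.start()
        endp = getattr(self.srv, listener).dispatcher.endpoints[0]
        ret = endp.sample([
            {'ctxt': {}, 'publisher_id': 'pub_id', 'event_type': 'event',
             'payload': {}, 'metadata': {}}])
        # Post-patch, a dispatcher failure must requeue rather than raise.
        self.assertEqual(oslo_messaging.NotificationResult.REQUEUE, ret)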


@@ -0,0 +1,15 @@
+---
+critical:
+  - >
+    The previous defaults for the configuration options
+    `requeue_sample_on_dispatcher_error' and
+    `requeue_event_on_dispatcher_error' made it very easy to lose data: if
+    the dispatcher failed to send data to the backend (e.g. Gnocchi being
+    down), the dispatcher raised an exception and the data were lost
+    forever. This was completely unacceptable, and nobody should be able
+    to configure Ceilometer in that way.
+upgrade:
+  - >
+    The options `requeue_event_on_dispatcher_error' and
+    `requeue_sample_on_dispatcher_error' have been removed; the behavior
+    they enabled is now always on.
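
A quick way to confirm the removal after upgrading (assuming
ceilometer.collector has been imported so the remaining [collector] options
are registered): overriding one of the old options now raises, since
oslo.config refuses to touch unregistered options.

    from oslo_config import cfg

    try:
        cfg.CONF.set_override('requeue_sample_on_dispatcher_error', True,
                              group='collector')
    except cfg.NoSuchOptError:
        # Expected once this patch is applied: the option no longer exists.
        pass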