Merge "improve notification processing"

This commit is contained in:
Jenkins 2016-09-15 21:50:54 +00:00 committed by Gerrit Code Review
commit 8888abf3f2
5 changed files with 39 additions and 8 deletions

View File

@ -69,10 +69,12 @@ OPTS = [
"notifications go to rabbit-nova:5672, while all "
"cinder notifications go to rabbit-cinder:5672."),
cfg.IntOpt('batch_size',
default=1,
default=100, min=1,
help='Number of notification messages to wait before '
'publishing them'),
'publishing them. Batching is advised when transformations are'
'applied in pipeline.'),
cfg.IntOpt('batch_timeout',
default=5,
help='Number of seconds to wait before publishing samples'
'when batch_size is not reached (None means indefinitely)'),
]
@ -250,10 +252,10 @@ class NotificationService(service_base.PipelineBasedService):
urls = cfg.CONF.notification.messaging_urls or [None]
for url in urls:
transport = messaging.get_transport(url)
# NOTE(gordc): ignore batching as we want pull
# to maintain sequencing as much as possible.
listener = messaging.get_batch_notification_listener(
transport, targets, endpoints,
batch_size=cfg.CONF.notification.batch_size,
batch_timeout=cfg.CONF.notification.batch_timeout)
transport, targets, endpoints)
listener.start()
self.listeners.append(listener)
@ -299,7 +301,10 @@ class NotificationService(service_base.PipelineBasedService):
endpoints,
batch_size=cfg.CONF.notification.batch_size,
batch_timeout=cfg.CONF.notification.batch_timeout)
self.pipeline_listener.start()
# NOTE(gordc): set single thread to process data sequentially
# if batching enabled.
batch = (1 if cfg.CONF.notification.batch_size > 1 else None)
self.pipeline_listener.start(override_pool_size=batch)
def terminate(self):
self.shutdown = True

View File

@ -17,6 +17,7 @@
import abc
import hashlib
from itertools import chain
from operator import methodcaller
import os
from oslo_config import cfg
@ -105,7 +106,7 @@ class SamplePipelineEndpoint(PipelineEndpoint):
s, cfg.CONF.publisher.telemetry_secret)
]
with self.publish_context as p:
p(samples)
p(sorted(samples, key=methodcaller('get_iso_timestamp')))
class EventPipelineEndpoint(PipelineEndpoint):

View File

@ -24,7 +24,7 @@ import copy
import uuid
from oslo_config import cfg
from oslo_utils import timeutils
OPTS = [
cfg.StrOpt('sample_source',
@ -99,6 +99,10 @@ class Sample(object):
def set_timestamp(self, timestamp):
self.timestamp = timestamp
def get_iso_timestamp(self):
return timeutils.parse_isotime(self.timestamp)
TYPE_GAUGE = 'gauge'
TYPE_DELTA = 'delta'
TYPE_CUMULATIVE = 'cumulative'

View File

@ -367,6 +367,16 @@ class TestRealNotificationHA(BaseRealNotification):
fake_publisher_cls.return_value = self.publisher
self._check_notification_service()
@mock.patch.object(oslo_messaging.MessageHandlingServer, 'start')
def test_notification_threads(self, m_listener):
self.CONF.set_override('batch_size', 1, group='notification')
self.srv.run()
m_listener.assert_called_with(override_pool_size=None)
m_listener.reset_mock()
self.CONF.set_override('batch_size', 2, group='notification')
self.srv.run()
m_listener.assert_called_with(override_pool_size=1)
@mock.patch('oslo_messaging.get_batch_notification_listener')
def test_reset_listener_on_refresh(self, mock_listener):
mock_listener.side_effect = [

View File

@ -0,0 +1,11 @@
---
upgrade:
- Batching is enabled by default now when coordinated workers are enabled.
Depending on load, it is recommended to scale out the number of
`pipeline_processing_queues` to improve distribution. `batch_size` should
also be configured accordingly.
fixes:
- Fix to improve handling messages in environments heavily backed up.
Previously, notification handlers greedily grabbed messages from queues
which could cause ordering issues. A fix was applied to sequentially
process messages in a single thread to prevent ordering issues.