Merge "notification: remove workload partitioning"

This commit is contained in:
Zuul 2018-09-04 20:41:31 +00:00 committed by Gerrit Code Review
commit cb8aee3945
14 changed files with 24 additions and 689 deletions

View File

@ -1,5 +1,5 @@
#
# Copyright 2017 Red Hat, Inc.
# Copyright 2017-2018 Red Hat, Inc.
# Copyright 2012-2013 eNovance <licensing@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -13,45 +13,25 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import itertools
import threading
import time
import uuid
from concurrent import futures
import cotyledon
from futurist import periodics
from oslo_config import cfg
from oslo_log import log
import oslo_messaging
from stevedore import named
from tooz import coordination
from ceilometer.i18n import _
from ceilometer import messaging
from ceilometer import utils
LOG = log.getLogger(__name__)
OPTS = [
cfg.IntOpt('pipeline_processing_queues',
deprecated_for_removal=True,
default=10,
min=1,
help='Number of queues to parallelize workload across. This '
'value should be larger than the number of active '
'notification agents for optimal results. WARNING: '
'Once set, lowering this value may result in lost data.'),
cfg.BoolOpt('ack_on_event_error',
default=True,
help='Acknowledge message when event persistence fails.'),
cfg.BoolOpt('workload_partitioning',
deprecated_for_removal=True,
default=False,
help='Enable workload partitioning, allowing multiple '
'notification agents to be run simultaneously.'),
cfg.MultiStrOpt('messaging_urls',
default=[],
secret=True,
@ -68,10 +48,6 @@ OPTS = [
help='Number of notification messages to wait before '
'publishing them. Batching is advised when transformations are '
'applied in pipeline.'),
cfg.IntOpt('batch_timeout',
default=5,
help='Number of seconds to wait before publishing samples '
'when batch_size is not reached (None means indefinitely)'),
cfg.IntOpt('workers',
default=1,
min=1,
@ -114,25 +90,11 @@ class NotificationService(cotyledon.Service):
self.startup_delay = worker_id
self.conf = conf
self.periodic = None
self.shutdown = False
self.listeners = []
# NOTE(kbespalov): for the pipeline queues used a single amqp host
# hence only one listener is required
self.pipeline_listener = None
if self.conf.notification.workload_partitioning:
# XXX uuid4().bytes ought to work, but it requires ascii for now
coordination_id = (coordination_id or
str(uuid.uuid4()).encode('ascii'))
self.partition_coordinator = coordination.get_coordinator(
self.conf.coordination.backend_url, coordination_id)
self.partition_set = list(range(
self.conf.notification.pipeline_processing_queues))
self.group_state = None
else:
self.partition_coordinator = None
def get_targets(self):
"""Return a sequence of oslo_messaging.Target
@ -154,49 +116,22 @@ class NotificationService(cotyledon.Service):
time.sleep(self.startup_delay)
super(NotificationService, self).run()
self.coord_lock = threading.Lock()
self.managers = [ext.obj for ext in named.NamedExtensionManager(
namespace='ceilometer.notification.pipeline',
names=self.conf.notification.pipelines, invoke_on_load=True,
on_missing_entrypoints_callback=self._log_missing_pipeline,
invoke_args=(self.conf,
self.conf.notification.workload_partitioning))]
invoke_args=(self.conf,))]
self.transport = messaging.get_transport(self.conf)
if self.conf.notification.workload_partitioning:
self.partition_coordinator.start(start_heart=True)
else:
# FIXME(sileht): endpoint uses the notification_topics option
# and it should not because this is an oslo_messaging option
# not a ceilometer. Until we have something to get the
# notification_topics in another way, we must create a transport
# to ensure the option has been registered by oslo_messaging.
messaging.get_notifier(self.transport, '')
# FIXME(sileht): endpoint uses the notification_topics option
# and it should not because this is an oslo_messaging option
# not a ceilometer. Until we have something to get the
# notification_topics in another way, we must create a transport
# to ensure the option has been registered by oslo_messaging.
messaging.get_notifier(self.transport, '')
self._configure_main_queue_listeners()
if self.conf.notification.workload_partitioning:
# join group after all manager set up is configured
self.hashring = self.partition_coordinator.join_partitioned_group(
self.NOTIFICATION_NAMESPACE)
@periodics.periodic(spacing=self.conf.coordination.check_watchers,
run_immediately=True)
def run_watchers():
self.partition_coordinator.run_watchers()
if self.group_state != self.hashring.ring.nodes:
self.group_state = self.hashring.ring.nodes.copy()
self._refresh_agent()
self.periodic = periodics.PeriodicWorker.create(
[], executor_factory=lambda:
futures.ThreadPoolExecutor(max_workers=10))
self.periodic.add(run_watchers)
utils.spawn_thread(self.periodic.start)
def _configure_main_queue_listeners(self):
endpoints = []
for pipe_mgr in self.managers:
endpoints.extend(pipe_mgr.get_main_endpoints())
@ -214,41 +149,6 @@ class NotificationService(cotyledon.Service):
)
self.listeners.append(listener)
def _refresh_agent(self):
with self.coord_lock:
if self.shutdown:
# NOTE(sileht): We are going to shutdown we everything will be
# stopped, we should not restart them
return
self._configure_pipeline_listener()
def _configure_pipeline_listener(self):
partitioned = list(filter(
self.hashring.belongs_to_self, self.partition_set))
endpoints = []
for pipe_mgr in self.managers:
endpoints.extend(pipe_mgr.get_interim_endpoints())
targets = []
for mgr, hash_id in itertools.product(self.managers, partitioned):
topic = '-'.join([mgr.NOTIFICATION_IPC, mgr.pm_type, str(hash_id)])
LOG.debug('Listening to queue: %s', topic)
targets.append(oslo_messaging.Target(topic=topic))
if self.pipeline_listener:
self.kill_listeners([self.pipeline_listener])
self.pipeline_listener = messaging.get_batch_notification_listener(
self.transport, targets, endpoints, allow_requeue=True,
batch_size=self.conf.notification.batch_size,
batch_timeout=self.conf.notification.batch_timeout)
# NOTE(gordc): set single thread to process data sequentially
# if batching enabled.
batch = (1 if self.conf.notification.batch_size > 1
else self.conf.max_parallel_requests)
self.pipeline_listener.start(override_pool_size=batch)
@staticmethod
def kill_listeners(listeners):
# NOTE(gordc): correct usage of oslo.messaging listener is to stop(),
@ -259,15 +159,8 @@ class NotificationService(cotyledon.Service):
listener.wait()
def terminate(self):
self.shutdown = True
if self.periodic:
self.periodic.stop()
self.periodic.wait()
if self.partition_coordinator:
self.partition_coordinator.stop()
with self.coord_lock:
if self.pipeline_listener:
self.kill_listeners([self.pipeline_listener])
self.kill_listeners(self.listeners)
if self.pipeline_listener:
self.kill_listeners([self.pipeline_listener])
self.kill_listeners(self.listeners)
super(NotificationService, self).terminate()

View File

@ -22,7 +22,6 @@ import oslo_messaging
import six
from ceilometer import agent
from ceilometer import messaging
from ceilometer import publisher
OPTS = [
@ -45,52 +44,6 @@ class PipelineException(agent.ConfigException):
super(PipelineException, self).__init__('Pipeline', message, cfg)
class InterimPublishContext(object):
"""Publisher to hash/shard data to pipelines"""
def __init__(self, conf, mgr):
self.conf = conf
self.mgr = mgr
self.notifiers = self._get_notifiers(messaging.get_transport(conf))
def _get_notifiers(self, transport):
notifiers = []
for x in range(self.conf.notification.pipeline_processing_queues):
notifiers.append(oslo_messaging.Notifier(
transport,
driver=self.conf.publisher_notifier.telemetry_driver,
topics=['-'.join(
[self.mgr.NOTIFICATION_IPC, self.mgr.pm_type, str(x)])]))
return notifiers
@staticmethod
def hash_grouping(datapoint, grouping_keys):
# FIXME(gordc): this logic only supports a single grouping_key. we
# need to change to support pipeline with multiple transformers and
# different grouping_keys
value = ''
for key in grouping_keys or []:
value += datapoint.get(key) if datapoint.get(key) else ''
return hash(value)
def __enter__(self):
def p(data):
data = [data] if not isinstance(data, list) else data
for datapoint in data:
for pipe in self.mgr.pipelines:
if pipe.supported(datapoint):
serialized_data = pipe.serializer(datapoint)
key = (self.hash_grouping(serialized_data,
pipe.get_grouping_key())
% len(self.notifiers))
self.notifiers[key].sample({}, event_type=pipe.name,
payload=[serialized_data])
return p
def __exit__(self, exc_type, exc_value, traceback):
pass
class PublishContext(object):
def __init__(self, pipelines):
self.pipelines = pipelines or []
@ -239,24 +192,10 @@ class Pipeline(object):
def publish_data(self, data):
"""Publish data from pipeline."""
@abc.abstractproperty
def default_grouping_key(self):
"""Attribute to hash data on. Pass if no partitioning."""
@abc.abstractmethod
def supported(self, data):
"""Attribute to filter on. Pass if no partitioning."""
@abc.abstractmethod
def serializer(self, data):
"""Serialize data for interim transport. Pass if no partitioning."""
def get_grouping_key(self):
keys = []
for transformer in self.sink.transformers:
keys += transformer.grouping_keys
return list(set(keys)) or self.default_grouping_key
class PublisherManager(object):
def __init__(self, conf, purpose):
@ -281,7 +220,7 @@ class PipelineManager(agent.ConfigManagerBase):
NOTIFICATION_IPC = 'ceilometer_ipc'
def __init__(self, conf, cfg_file, transformer_manager, partition):
def __init__(self, conf, cfg_file, transformer_manager):
"""Setup the pipelines according to config.
The configuration is supported as follows:
@ -381,7 +320,6 @@ class PipelineManager(agent.ConfigManagerBase):
unique_names.add(pipe.name)
self.pipelines.append(pipe)
unique_names.clear()
self.partition = partition
@abc.abstractproperty
def pm_type(self):
@ -403,23 +341,10 @@ class PipelineManager(agent.ConfigManagerBase):
"""Build publisher for pipeline publishing."""
return PublishContext(self.pipelines)
def interim_publisher(self):
"""Build publishing context for IPC."""
return InterimPublishContext(self.conf, self)
def get_main_publisher(self):
"""Return the publishing context to use"""
return (self.interim_publisher() if self.partition else
self.publisher())
def get_main_endpoints(self):
"""Return endpoints for main queue."""
pass
def get_interim_endpoints(self):
"""Return endpoints for interim pipeline queues."""
pass
class NotificationEndpoint(object):
"""Base Endpoint for plugins that support the notification API."""

View File

@ -11,18 +11,13 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from itertools import chain
from oslo_log import log
import oslo_messaging
from oslo_utils import timeutils
from stevedore import extension
from ceilometer import agent
from ceilometer.event import converter
from ceilometer.event import models
from ceilometer.pipeline import base
from ceilometer.publisher import utils as publisher_utils
LOG = log.getLogger(__name__)
@ -67,39 +62,6 @@ class EventEndpoint(base.MainNotificationEndpoint):
return oslo_messaging.NotificationResult.HANDLED
class InterimEventEndpoint(base.NotificationEndpoint):
def __init__(self, conf, publisher, pipe_name):
self.event_types = [pipe_name]
super(InterimEventEndpoint, self).__init__(conf, publisher)
def sample(self, notifications):
return self.process_notifications('sample', notifications)
def process_notifications(self, priority, notifications):
events = chain.from_iterable(m["payload"] for m in notifications)
events = [
models.Event(
message_id=ev['message_id'],
event_type=ev['event_type'],
generated=timeutils.normalize_time(
timeutils.parse_isotime(ev['generated'])),
traits=[models.Trait(name, dtype,
models.Trait.convert_value(dtype, value))
for name, dtype, value in ev['traits']],
raw=ev.get('raw', {}))
for ev in events if publisher_utils.verify_signature(
ev, self.conf.publisher.telemetry_secret)
]
try:
with self.publisher as p:
p(events)
except Exception:
if not self.conf.notification.ack_on_event_error:
return oslo_messaging.NotificationResult.REQUEUE
raise
return oslo_messaging.NotificationResult.HANDLED
class EventSource(base.PipelineSource):
"""Represents a source of events.
@ -140,8 +102,6 @@ class EventSink(base.Sink):
class EventPipeline(base.Pipeline):
"""Represents a pipeline for Events."""
default_grouping_key = ['event_type']
def __str__(self):
# NOTE(gordc): prepend a namespace so we ensure event and sample
# pipelines do not have the same name.
@ -153,10 +113,6 @@ class EventPipeline(base.Pipeline):
supported = [e for e in events if self.supported(e)]
self.sink.publish_events(supported)
def serializer(self, event):
return publisher_utils.message_from_event(
event, self.conf.publisher.telemetry_secret)
def supported(self, event):
return self.source.support_event(event.event_type)
@ -168,17 +124,9 @@ class EventPipelineManager(base.PipelineManager):
pm_source = EventSource
pm_sink = EventSink
def __init__(self, conf, partition=False):
def __init__(self, conf):
super(EventPipelineManager, self).__init__(
conf, conf.event_pipeline_cfg_file, {}, partition)
conf, conf.event_pipeline_cfg_file, {})
def get_main_endpoints(self):
return [EventEndpoint(self.conf, self.get_main_publisher())]
def get_interim_endpoints(self):
# FIXME(gordc): change this so we shard data rather than per
# pipeline. this will allow us to use self.publisher and less
# queues.
return [InterimEventEndpoint(
self.conf, base.PublishContext([pipe]), pipe.name)
for pipe in self.pipelines]
return [EventEndpoint(self.conf, self.publisher())]

View File

@ -10,15 +10,11 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from itertools import chain
from oslo_log import log
from stevedore import extension
from ceilometer import agent
from ceilometer.pipeline import base
from ceilometer.publisher import utils as publisher_utils
from ceilometer import sample as sample_util
LOG = log.getLogger(__name__)
@ -52,37 +48,6 @@ class SampleEndpoint(base.MainNotificationEndpoint):
pass
class InterimSampleEndpoint(base.NotificationEndpoint):
def __init__(self, conf, publisher, pipe_name):
self.event_types = [pipe_name]
super(InterimSampleEndpoint, self).__init__(conf, publisher)
def sample(self, notifications):
return self.process_notifications('sample', notifications)
def process_notifications(self, priority, notifications):
samples = chain.from_iterable(m["payload"] for m in notifications)
samples = [
sample_util.Sample(name=s['counter_name'],
type=s['counter_type'],
unit=s['counter_unit'],
volume=s['counter_volume'],
user_id=s['user_id'],
project_id=s['project_id'],
resource_id=s['resource_id'],
timestamp=s['timestamp'],
resource_metadata=s['resource_metadata'],
source=s.get('source'),
# NOTE(sileht): May come from an older node,
# Put None in this case.
monotonic_time=s.get('monotonic_time'))
for s in samples if publisher_utils.verify_signature(
s, self.conf.publisher.telemetry_secret)
]
with self.publisher as p:
p(samples)
class SampleSource(base.PipelineSource):
"""Represents a source of samples.
@ -181,8 +146,6 @@ class SampleSink(base.Sink):
class SamplePipeline(base.Pipeline):
"""Represents a pipeline for Samples."""
default_grouping_key = ['resource_id']
def _validate_volume(self, s):
volume = s.volume
if volume is None:
@ -219,10 +182,6 @@ class SamplePipeline(base.Pipeline):
and self._validate_volume(s)]
self.sink.publish_samples(supported)
def serializer(self, sample):
return publisher_utils.meter_message_from_counter(
sample, self.conf.publisher.telemetry_secret)
def supported(self, sample):
return self.source.support_meter(sample.name)
@ -234,10 +193,9 @@ class SamplePipelineManager(base.PipelineManager):
pm_source = SampleSource
pm_sink = SampleSink
def __init__(self, conf, partition=False):
def __init__(self, conf):
super(SamplePipelineManager, self).__init__(
conf, conf.pipeline_cfg_file, self.get_transform_manager(),
partition)
conf, conf.pipeline_cfg_file, self.get_transform_manager())
@staticmethod
def get_transform_manager():
@ -247,13 +205,5 @@ class SamplePipelineManager(base.PipelineManager):
exts = extension.ExtensionManager(
namespace='ceilometer.sample.endpoint',
invoke_on_load=True,
invoke_args=(self.conf, self.get_main_publisher()))
invoke_args=(self.conf, self.publisher()))
return [ext.obj for ext in exts]
def get_interim_endpoints(self):
# FIXME(gordc): change this so we shard data rather than per
# pipeline. this will allow us to use self.publisher and less
# queues.
return [InterimSampleEndpoint(
self.conf, base.PublishContext([pipe]), pipe.name)
for pipe in self.pipelines]

View File

@ -75,7 +75,6 @@ class BasePipelineTestCase(base.BaseTestCase):
class TransformerClass(transformer.TransformerBase):
samples = []
grouping_keys = ['counter_name']
def __init__(self, append_name='_update'):
self.__class__.samples = []
@ -102,7 +101,6 @@ class BasePipelineTestCase(base.BaseTestCase):
class TransformerClassDrop(transformer.TransformerBase):
samples = []
grouping_keys = ['resource_id']
def __init__(self):
self.__class__.samples = []
@ -111,7 +109,6 @@ class BasePipelineTestCase(base.BaseTestCase):
self.__class__.samples.append(counter)
class TransformerClassException(object):
grouping_keys = ['resource_id']
@staticmethod
def handle_sample(counter):
@ -2171,46 +2168,3 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_unique_pipeline_names(self):
self._dup_pipeline_name_cfg()
self._exception_create_pipelinemanager()
def test_get_pipeline_grouping_key(self):
transformer_cfg = [
{
'name': 'update',
'parameters': {}
},
{
'name': 'unit_conversion',
'parameters': {
'source': {},
'target': {'name': 'cpu_mins',
'unit': 'min',
'scale': 'volume'},
}
},
{
'name': 'update',
'parameters': {}
},
]
self._set_pipeline_cfg('transformers', transformer_cfg)
self._build_and_set_new_pipeline()
pipeline_manager = pipeline.SamplePipelineManager(self.CONF)
self.assertEqual(set(['resource_id', 'counter_name']),
set(pipeline_manager.pipelines[0].get_grouping_key()))
def test_get_pipeline_duplicate_grouping_key(self):
transformer_cfg = [
{
'name': 'update',
'parameters': {}
},
{
'name': 'update',
'parameters': {}
},
]
self._set_pipeline_cfg('transformers', transformer_cfg)
self._build_and_set_new_pipeline()
pipeline_manager = pipeline.SamplePipelineManager(self.CONF)
self.assertEqual(['counter_name'],
pipeline_manager.pipelines[0].get_grouping_key())

View File

@ -16,15 +16,12 @@ import traceback
import uuid
import fixtures
import mock
import oslo_messaging
from ceilometer.event import models
from ceilometer.pipeline import base as pipeline
from ceilometer.pipeline import event
from ceilometer import publisher
from ceilometer.publisher import test as test_publisher
from ceilometer.publisher import utils
from ceilometer import service
from ceilometer.tests import base
@ -357,40 +354,3 @@ class EventPipelineTestCase(base.BaseTestCase):
def test_unique_pipeline_names(self):
self._dup_pipeline_name_cfg()
self._exception_create_pipelinemanager()
def test_event_pipeline_endpoint_requeue_on_failure(self):
self.CONF.set_override("ack_on_event_error", False,
group="notification")
self.CONF.set_override("telemetry_secret", "not-so-secret",
group="publisher")
test_data = {
'message_id': uuid.uuid4(),
'event_type': 'a',
'generated': '2013-08-08 21:06:37.803826',
'traits': [
{'name': 't_text',
'value': 1,
'dtype': 'text_trait'
}
],
'raw': {'status': 'started'}
}
message_sign = utils.compute_signature(test_data, 'not-so-secret')
test_data['message_signature'] = message_sign
fake_publisher = mock.Mock()
self.useFixture(fixtures.MockPatch(
'ceilometer.publisher.test.TestPublisher',
return_value=fake_publisher))
self._build_and_set_new_pipeline()
pipeline_manager = event.EventPipelineManager(self.CONF)
pipe = pipeline_manager.pipelines[0]
event_pipeline_endpoint = event.InterimEventEndpoint(
self.CONF, pipeline.PublishContext([pipe]), pipe.name)
fake_publisher.publish_events.side_effect = Exception
ret = event_pipeline_endpoint.sample([
{'ctxt': {}, 'publisher_id': 'compute.vagrant-precise',
'event_type': 'a', 'payload': [test_data], 'metadata': {}}])
self.assertEqual(oslo_messaging.NotificationResult.REQUEUE, ret)

View File

@ -17,7 +17,6 @@
import time
import mock
import oslo_messaging
from oslo_utils import fileutils
import six
import yaml
@ -84,14 +83,6 @@ class BaseNotificationTest(tests_base.BaseTestCase):
def run_service(self, srv):
srv.run()
self.addCleanup(srv.terminate)
if srv.conf.notification.workload_partitioning:
start = time.time()
while time.time() - start < 10:
if srv.group_state and srv.pipeline_listener:
break # ensure pipeline is set if HA
time.sleep(0.1)
else:
self.fail('Did not start pipeline queues')
class TestNotification(BaseNotificationTest):
@ -242,273 +233,3 @@ class TestRealNotification(BaseRealNotification):
if len(self.publisher.events) >= self.expected_events:
break
self.assertEqual(self.expected_events, len(self.publisher.events))
class TestRealNotificationHA(BaseRealNotification):
def setUp(self):
super(TestRealNotificationHA, self).setUp()
self.CONF.set_override('workload_partitioning', True,
group='notification')
self.CONF.set_override("backend_url", "zake://", group="coordination")
self.srv = notification.NotificationService(0, self.CONF)
@mock.patch('ceilometer.publisher.test.TestPublisher')
def test_notification_service(self, fake_publisher_cls):
fake_publisher_cls.return_value = self.publisher
self._check_notification_service()
@mock.patch.object(oslo_messaging.MessageHandlingServer, 'stop')
@mock.patch.object(oslo_messaging.MessageHandlingServer, 'wait')
@mock.patch.object(oslo_messaging.MessageHandlingServer, 'start')
def test_notification_threads(self, m_listener, m_wait, m_stop):
self.CONF.set_override('batch_size', 1, group='notification')
self.srv.run()
m_listener.assert_called_with(
override_pool_size=self.CONF.max_parallel_requests)
m_listener.reset_mock()
self.CONF.set_override('batch_size', 2, group='notification')
self.srv._refresh_agent()
m_listener.assert_called_with(override_pool_size=1)
@mock.patch('oslo_messaging.get_batch_notification_listener')
def test_reset_listener_on_refresh(self, mock_listener):
mock_listener.side_effect = [
mock.MagicMock(), # main listener
mock.MagicMock(), # pipeline listener
mock.MagicMock(), # refresh pipeline listener
]
self.run_service(self.srv)
listener = self.srv.pipeline_listener
self.srv._refresh_agent()
self.assertIsNot(listener, self.srv.pipeline_listener)
def test_hashring_targets(self):
maybe = {"maybe": 0}
def _once_over_five(item):
maybe["maybe"] += 1
return maybe["maybe"] % 5 == 0
hashring = mock.MagicMock()
hashring.belongs_to_self = _once_over_five
self.srv.partition_coordinator = pc = mock.MagicMock()
pc.join_partitioned_group.return_value = hashring
self.run_service(self.srv)
topics = [target.topic for target in
self.srv.pipeline_listener.targets]
self.assertEqual(4, len(topics))
self.assertEqual(
{'ceilometer_ipc-sample-4', 'ceilometer_ipc-sample-9',
'ceilometer_ipc-event-4', 'ceilometer_ipc-event-9'},
set(topics))
@mock.patch('oslo_messaging.get_batch_notification_listener')
def test_notify_to_relevant_endpoint(self, mock_listener):
self.run_service(self.srv)
targets = mock_listener.call_args[0][1]
self.assertIsNotEmpty(targets)
pipe_list = []
for mgr in self.srv.managers:
for pipe in mgr.pipelines:
pipe_list.append(pipe.name)
for pipe in pipe_list:
for endpoint in mock_listener.call_args[0][2]:
self.assertTrue(hasattr(endpoint, 'filter_rule'))
if endpoint.filter_rule.match(None, None, pipe, None, None):
break
else:
self.fail('%s not handled by any endpoint' % pipe)
@mock.patch('oslo_messaging.Notifier.sample')
def test_broadcast_to_relevant_pipes_only(self, mock_notifier):
self.run_service(self.srv)
for endpoint in self.srv.listeners[0].dispatcher.endpoints:
if (hasattr(endpoint, 'filter_rule') and
not endpoint.filter_rule.match(None, None, 'nonmatching.end',
None, None)):
continue
endpoint.info([{
'ctxt': TEST_NOTICE_CTXT,
'publisher_id': 'compute.vagrant-precise',
'event_type': 'nonmatching.end',
'payload': TEST_NOTICE_PAYLOAD,
'metadata': TEST_NOTICE_METADATA}])
self.assertFalse(mock_notifier.called)
for endpoint in self.srv.listeners[0].dispatcher.endpoints:
if (hasattr(endpoint, 'filter_rule') and
not endpoint.filter_rule.match(None, None,
'compute.instance.create.end',
None, None)):
continue
endpoint.info([{
'ctxt': TEST_NOTICE_CTXT,
'publisher_id': 'compute.vagrant-precise',
'event_type': 'compute.instance.create.end',
'payload': TEST_NOTICE_PAYLOAD,
'metadata': TEST_NOTICE_METADATA}])
self.assertTrue(mock_notifier.called)
self.assertEqual(3, mock_notifier.call_count)
self.assertEqual(1, len([i for i in mock_notifier.call_args_list
if 'event_type' in i[1]['payload'][0]]))
self.assertEqual(2, len([i for i in mock_notifier.call_args_list
if 'counter_name' in i[1]['payload'][0]]))
class TestRealNotificationMultipleAgents(BaseNotificationTest):
def setup_pipeline(self, transformers):
pipeline = yaml.dump({
'sources': [{
'name': 'test_pipeline',
'interval': 5,
'meters': ['vcpus', 'memory'],
'sinks': ['test_sink']
}],
'sinks': [{
'name': 'test_sink',
'transformers': transformers,
'publishers': ['test://']
}]
})
if six.PY3:
pipeline = pipeline.encode('utf-8')
pipeline_cfg_file = fileutils.write_to_tempfile(content=pipeline,
prefix="pipeline",
suffix="yaml")
return pipeline_cfg_file
def setup_event_pipeline(self):
pipeline = yaml.dump({
'sources': [],
'sinks': []
})
if six.PY3:
pipeline = pipeline.encode('utf-8')
pipeline_cfg_file = fileutils.write_to_tempfile(
content=pipeline, prefix="event_pipeline", suffix="yaml")
return pipeline_cfg_file
def setUp(self):
super(TestRealNotificationMultipleAgents, self).setUp()
self.CONF = service.prepare_service([], [])
self.setup_messaging(self.CONF, 'nova')
pipeline_cfg_file = self.setup_pipeline(['instance', 'memory'])
event_pipeline_cfg_file = self.setup_event_pipeline()
self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file)
self.CONF.set_override("event_pipeline_cfg_file",
event_pipeline_cfg_file)
self.CONF.set_override("backend_url", "zake://", group="coordination")
self.CONF.set_override('workload_partitioning', True,
group='notification')
self.CONF.set_override('pipeline_processing_queues', 2,
group='notification')
self.CONF.set_override('check_watchers', 1, group='coordination')
self.publisher = test_publisher.TestPublisher(self.CONF, "")
self.publisher2 = test_publisher.TestPublisher(self.CONF, "")
def _check_notifications(self, fake_publisher_cls):
fake_publisher_cls.side_effect = [self.publisher, self.publisher2]
maybe = {"srv": 0, "srv2": -1}
def _sometimes_srv(item):
maybe["srv"] += 1
return (maybe["srv"] % 2) == 0
self.srv = notification.NotificationService(0, self.CONF)
self.srv.partition_coordinator = pc = mock.MagicMock()
hashring_srv1 = mock.MagicMock()
hashring_srv1.belongs_to_self = _sometimes_srv
hashring_srv1.ring.nodes = {'id1': mock.Mock()}
pc.join_partitioned_group.return_value = hashring_srv1
self.run_service(self.srv)
def _sometimes_srv2(item):
maybe["srv2"] += 1
return (maybe["srv2"] % 2) == 0
self.srv2 = notification.NotificationService(0, self.CONF)
self.srv2.partition_coordinator = pc = mock.MagicMock()
hashring = mock.MagicMock()
hashring.belongs_to_self = _sometimes_srv2
hashring.ring.nodes = {'id1': mock.Mock(), 'id2': mock.Mock()}
self.srv.hashring.ring.nodes = hashring.ring.nodes.copy()
pc.join_partitioned_group.return_value = hashring
self.run_service(self.srv2)
notifier = messaging.get_notifier(self.transport,
"compute.vagrant-precise")
payload1 = TEST_NOTICE_PAYLOAD.copy()
payload1['instance_id'] = '0'
notifier.info({}, 'compute.instance.create.end', payload1)
payload2 = TEST_NOTICE_PAYLOAD.copy()
payload2['instance_id'] = '1'
notifier.info({}, 'compute.instance.create.end', payload2)
self.expected_samples = 4
with mock.patch('six.moves.builtins.hash', lambda x: int(x)):
start = time.time()
while time.time() - start < 10:
if (len(self.publisher.samples + self.publisher2.samples) >=
self.expected_samples and
len(self.srv.group_state) == 2):
break
time.sleep(0.1)
self.assertEqual(2, len(self.publisher.samples))
self.assertEqual(2, len(self.publisher2.samples))
self.assertEqual(1, len(set(
s.resource_id for s in self.publisher.samples)))
self.assertEqual(1, len(set(
s.resource_id for s in self.publisher2.samples)))
self.assertEqual(2, len(self.srv.group_state))
@mock.patch('ceilometer.publisher.test.TestPublisher')
def test_multiple_agents_no_transform(self, fake_publisher_cls):
pipeline_cfg_file = self.setup_pipeline([])
self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file)
self._check_notifications(fake_publisher_cls)
@mock.patch('ceilometer.publisher.test.TestPublisher')
def test_multiple_agents_transform(self, fake_publisher_cls):
pipeline_cfg_file = self.setup_pipeline(
[{
'name': 'unit_conversion',
'parameters': {
'source': {},
'target': {'name': 'cpu_mins',
'unit': 'min',
'scale': 'volume'},
}
}])
self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file)
self._check_notifications(fake_publisher_cls)
@mock.patch('ceilometer.publisher.test.TestPublisher')
def test_multiple_agents_multiple_transform(self, fake_publisher_cls):
pipeline_cfg_file = self.setup_pipeline(
[{
'name': 'unit_conversion',
'parameters': {
'source': {},
'target': {'name': 'cpu_mins',
'unit': 'min',
'scale': 'volume'},
}
}, {
'name': 'unit_conversion',
'parameters': {
'source': {},
'target': {'name': 'cpu_mins',
'unit': 'min',
'scale': 'volume'},
}
}])
self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file)
self._check_notifications(fake_publisher_cls)

View File

@ -42,10 +42,6 @@ class TransformerBase(object):
:param sample: A sample.
"""
@abc.abstractproperty
def grouping_keys(self):
"""Keys used to group transformer."""
@staticmethod
def flush():
"""Flush samples cached previously."""

View File

@ -22,8 +22,6 @@ class TransformerAccumulator(transformer.TransformerBase):
And then flushes them out into the wild.
"""
grouping_keys = ['resource_id']
def __init__(self, size=1, **kwargs):
if size >= 1:
self.samples = []

View File

@ -36,8 +36,6 @@ class ArithmeticTransformer(transformer.TransformerBase):
over one or more meters and/or their metadata.
"""
grouping_keys = ['resource_id']
meter_name_re = re.compile(r'\$\(([\w\.\-]+)\)')
def __init__(self, target=None, **kwargs):

View File

@ -30,8 +30,6 @@ LOG = log.getLogger(__name__)
class BaseConversionTransformer(transformer.TransformerBase):
"""Transformer to derive conversion."""
grouping_keys = ['resource_id']
def __init__(self, source=None, target=None, **kwargs):
"""Initialize transformer with configured parameters.

View File

@ -262,7 +262,6 @@ function configure_ceilometer {
if [[ -n "$CEILOMETER_COORDINATION_URL" ]]; then
iniset $CEILOMETER_CONF coordination backend_url $CEILOMETER_COORDINATION_URL
iniset $CEILOMETER_CONF notification workload_partitioning True
iniset $CEILOMETER_CONF notification workers $API_WORKERS
fi

View File

@ -94,18 +94,9 @@ Additionally, it must set ``get_main_endpoints`` which provides endpoints to be
added to the main queue listener in the notification agent. This main queue
endpoint inherits :class:`ceilometer.pipeline.base.MainNotificationEndpoint`
and defines which notification priorities to listen, normalises the data,
and redirects the data for pipeline processing or requeuing depending on
`workload_partitioning` configuration.
and redirects the data for pipeline processing.
If a pipeline is configured to support `workload_partitioning`, data from the
main queue endpoints are shared and requeued in internal queues. The
notification agent configures a second notification consumer to handle these
internal queues and pushes data to endpoints defined by
``get_interim_endpoints`` in the pipeline manager. These interim endpoints
define how to handle the shared, normalised data models for pipeline
processing
Both main queue and interim queue notification endpoints should implement:
Notification endpoints should implement:
``event_types``
A sequence of strings defining the event types the endpoint should handle

View File

@ -0,0 +1,4 @@
---
upgrade:
- |
The deprecated workload partitioning for notification agent has been removed.