From 89995280927d495042f342e8a0a520fa4775c515 Mon Sep 17 00:00:00 2001 From: gord chung Date: Wed, 30 Nov 2016 16:53:08 +0000 Subject: [PATCH] polling definition file add support for polling specific definition file. this splits the existing polling specific options out of pipeline.yaml as transformations only exists on notifcation agent and polling interval/discovery only exists on polling agents. backward compatibility is maintained so pipeline.yaml file from previous releases can still be passed in as polling definition file. Change-Id: I206566349f98d6b17336cd5ea36ceb1e304dd90c --- ceilometer/agent/manager.py | 5 + ceilometer/pipeline.py | 177 ++++++++------ ceilometer/tests/unit/agent/agentbase.py | 220 ++++++++---------- ceilometer/tests/unit/pipeline_base.py | 39 ---- .../tests/unit/test_decoupled_pipeline.py | 14 -- ceilometer/tests/unit/test_polling.py | 102 ++++++++ devstack/plugin.sh | 4 +- etc/ceilometer/pipeline.yaml | 4 - etc/ceilometer/polling.yaml | 6 + .../polling-definition-efffb92e3810e571.yaml | 10 + 10 files changed, 328 insertions(+), 253 deletions(-) create mode 100644 ceilometer/tests/unit/test_polling.py create mode 100644 etc/ceilometer/polling.yaml create mode 100644 releasenotes/notes/polling-definition-efffb92e3810e571.yaml diff --git a/ceilometer/agent/manager.py b/ceilometer/agent/manager.py index 454a2d9f..bd8505b1 100644 --- a/ceilometer/agent/manager.py +++ b/ceilometer/agent/manager.py @@ -57,6 +57,10 @@ OPTS = [ ] POLLING_OPTS = [ + cfg.StrOpt('cfg_file', + default="polling.yaml", + help="Configuration file for pipeline definition." + ), cfg.StrOpt('partitioning_group_prefix', deprecated_group='central', help='Work-load partitioning group prefix. Use only if you ' @@ -520,6 +524,7 @@ class AgentManager(service_base.PipelineBasedService): self.polling_periodics.wait() self.polling_periodics = None + # FIXME(gordc): refactor pipeline dependency out of polling agent. def reload_pipeline(self): if self.pipeline_validated: LOG.info(_LI("Reconfiguring polling tasks.")) diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py index 881815ba..d39afc82 100644 --- a/ceilometer/pipeline.py +++ b/ceilometer/pipeline.py @@ -63,13 +63,24 @@ OPTS = [ LOG = log.getLogger(__name__) -class PipelineException(Exception): - def __init__(self, message, pipeline_cfg): +class ConfigException(Exception): + def __init__(self, cfg_type, message, cfg): + self.cfg_type = cfg_type self.msg = message - self.pipeline_cfg = pipeline_cfg + self.cfg = cfg def __str__(self): - return 'Pipeline %s: %s' % (self.pipeline_cfg, self.msg) + return '%s %s: %s' % (self.cfg_type, self.cfg, self.msg) + + +class PollingException(ConfigException): + def __init__(self, message, cfg): + super(PollingException, self).__init__('Polling', message, cfg) + + +class PipelineException(ConfigException): + def __init__(self, message, cfg): + super(PipelineException, self).__init__('Pipeline', message, cfg) @six.add_metaclass(abc.ABCMeta) @@ -219,14 +230,12 @@ class PublishContext(object): class Source(object): - """Represents a source of samples or events.""" + """Represents a generic source""" def __init__(self, cfg): self.cfg = cfg - try: self.name = cfg['name'] - self.sinks = cfg.get('sinks') except KeyError as err: raise PipelineException( "Required field %s not specified" % err.args[0], cfg) @@ -234,17 +243,6 @@ class Source(object): def __str__(self): return self.name - def check_sinks(self, sinks): - if not self.sinks: - raise PipelineException( - "No sink defined in source %s" % self, - self.cfg) - for sink in self.sinks: - if sink not in sinks: - raise PipelineException( - "Dangling sink %s from source %s" % (sink, self), - self.cfg) - def check_source_filtering(self, data, d_type): """Source data rules checking @@ -282,7 +280,30 @@ class Source(object): return all(datapoint.startswith('!') for datapoint in dataset) -class EventSource(Source): +class PipelineSource(Source): + """Represents a source of samples or events.""" + + def __init__(self, cfg): + super(PipelineSource, self).__init__(cfg) + try: + self.sinks = cfg['sinks'] + except KeyError as err: + raise PipelineException( + "Required field %s not specified" % err.args[0], cfg) + + def check_sinks(self, sinks): + if not self.sinks: + raise PipelineException( + "No sink defined in source %s" % self, + self.cfg) + for sink in self.sinks: + if sink not in sinks: + raise PipelineException( + "Dangling sink %s from source %s" % (sink, self), + self.cfg) + + +class EventSource(PipelineSource): """Represents a source of events. In effect it is a set of notification handlers capturing events for a set @@ -298,13 +319,12 @@ class EventSource(Source): return self.is_supported(self.events, event_name) -class SampleSource(Source): +class SampleSource(PipelineSource): """Represents a source of samples. - In effect it is a set of pollsters and/or notification handlers emitting + In effect it is a set of notification handlers processing samples for a set of matching meters. Each source encapsulates meter name - matching, polling interval determination, optional resource enumeration or - discovery, and mapping to one or more sinks for publication. + matching and mapping to one or more sinks for publication. """ def __init__(self, cfg): @@ -313,10 +333,33 @@ class SampleSource(Source): self.meters = cfg['meters'] except KeyError: raise PipelineException("Missing meters value", cfg) + self.check_source_filtering(self.meters, 'meters') + + def support_meter(self, meter_name): + return self.is_supported(self.meters, meter_name) + + +class PollingSource(Source): + """Represents a source of pollsters + + In effect it is a set of pollsters emitting + samples for a set of matching meters. Each source encapsulates meter name + matching, polling interval determination, optional resource enumeration or + discovery. + """ + + def __init__(self, cfg): + super(PollingSource, self).__init__(cfg) try: - self.interval = int(cfg.get('interval', 600)) + self.meters = cfg['meters'] + except KeyError: + raise PipelineException("Missing meters value", cfg) + try: + self.interval = int(cfg['interval']) except ValueError: raise PipelineException("Invalid interval value", cfg) + except KeyError: + raise PipelineException("Missing interval value", cfg) if self.interval <= 0: raise PipelineException("Interval value should > 0", cfg) @@ -560,17 +603,6 @@ class EventPipeline(Pipeline): class SamplePipeline(Pipeline): """Represents a pipeline for Samples.""" - def get_interval(self): - return self.source.interval - - @property - def resources(self): - return self.source.resources - - @property - def discovery(self): - return self.source.discovery - def support_meter(self, meter_name): return self.source.support_meter(meter_name) @@ -692,9 +724,6 @@ class PipelineManager(ConfigManagerBase): """Pipeline Manager Pipeline manager sets up pipelines according to config file - - Usually only one pipeline manager exists in the system. - """ def __init__(self, conf, cfg_file, transformer_manager, @@ -705,7 +734,7 @@ class PipelineManager(ConfigManagerBase): Decoupled: the source and sink configuration are separately specified before being linked together. This allows source- - specific configuration, such as resource discovery, to be + specific configuration, such as meter handling, to be kept focused only on the fine-grained source while avoiding the necessity for wide duplication of sink-related config. @@ -713,13 +742,10 @@ class PipelineManager(ConfigManagerBase): of dictionaries defining sources and sinks, for example: {"sources": [{"name": source_1, - "interval": interval_time, "meters" : ["meter_1", "meter_2"], - "resources": ["resource_uri1", "resource_uri2"], "sinks" : ["sink_1", "sink_2"] }, {"name": source_2, - "interval": interval_time, "meters" : ["meter_3"], "sinks" : ["sink_2"] }, @@ -740,26 +766,15 @@ class PipelineManager(ConfigManagerBase): ] } - The interval determines the cadence of sample injection into - the pipeline where samples are produced under the direct control - of an agent, i.e. via a polling cycle as opposed to incoming - notifications. - Valid meter format is '*', '!meter_name', or 'meter_name'. '*' is wildcard symbol means any meters; '!meter_name' means "meter_name" will be excluded; 'meter_name' means 'meter_name' will be included. - The 'meter_name" is Sample name field. - Valid meters definition is all "included meter names", all "excluded meter names", wildcard and "excluded meter names", or only wildcard. - The resources is list of URI indicating the resources from where - the meters should be polled. It's optional and it's up to the - specific pollster to decide how to use it. - Transformer's name is plugin name in setup.cfg. Publisher's name is plugin name in setup.cfg @@ -830,26 +845,48 @@ class PollingManager(ConfigManagerBase): def __init__(self, conf, cfg_file): """Setup the polling according to config. - The configuration is the sources half of the Pipeline Config. + The configuration is supported as follows: + + {"sources": [{"name": source_1, + "interval": interval_time, + "meters" : ["meter_1", "meter_2"], + "resources": ["resource_uri1", "resource_uri2"], + }, + {"name": source_2, + "interval": interval_time, + "meters" : ["meter_3"], + }, + ]} + } + + The interval determines the cadence of sample polling + + Valid meter format is '*', '!meter_name', or 'meter_name'. + '*' is wildcard symbol means any meters; '!meter_name' means + "meter_name" will be excluded; 'meter_name' means 'meter_name' + will be included. + + Valid meters definition is all "included meter names", all + "excluded meter names", wildcard and "excluded meter names", or + only wildcard. + + The resources is list of URI indicating the resources from where + the meters should be polled. It's optional and it's up to the + specific pollster to decide how to use it. + """ super(PollingManager, self).__init__(conf) - cfg = self.load_config(cfg_file) + try: + cfg = self.load_config(cfg_file) + except (TypeError, IOError): + LOG.warning(_LW('Unable to locate polling configuration, falling ' + 'back to pipeline configuration.')) + cfg = self.load_config(conf.pipeline_cfg_file) self.sources = [] - if not ('sources' in cfg and 'sinks' in cfg): - raise PipelineException("Both sources & sinks are required", - cfg) - LOG.info(_LI('detected decoupled pipeline config format')) - - unique_names = set() + if 'sources' not in cfg: + raise PollingException("sources required", cfg) for s in cfg.get('sources'): - name = s.get('name') - if name in unique_names: - raise PipelineException("Duplicated source names: %s" % - name, self) - else: - unique_names.add(name) - self.sources.append(SampleSource(s)) - unique_names.clear() + self.sources.append(PollingSource(s)) def setup_event_pipeline(conf, transformer_manager=None): @@ -870,7 +907,7 @@ def setup_pipeline(conf, transformer_manager=None): def setup_polling(conf): """Setup polling manager according to yaml config file.""" - cfg_file = conf.pipeline_cfg_file + cfg_file = conf.polling.cfg_file return PollingManager(conf, cfg_file) diff --git a/ceilometer/tests/unit/agent/agentbase.py b/ceilometer/tests/unit/agent/agentbase.py index c88c2dd9..9e3ea470 100644 --- a/ceilometer/tests/unit/agent/agentbase.py +++ b/ceilometer/tests/unit/agent/agentbase.py @@ -19,18 +19,18 @@ import abc import copy import datetime +import os +import tempfile import time import mock from oslo_config import fixture as fixture_config -from oslotest import mockpatch import six from stevedore import extension +import yaml from ceilometer.agent import plugin_base from ceilometer import pipeline -from ceilometer import publisher -from ceilometer.publisher import test as test_publisher from ceilometer import sample from ceilometer.tests import base from ceilometer import utils @@ -195,7 +195,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase): def setup_polling(self): self.mgr.polling_manager = pipeline.PollingManager( - self.CONF, self.cfg2file(self.pipeline_cfg)) + self.CONF, self.cfg2file(self.polling_cfg)) def create_extension_list(self): return [extension.Extension('test', @@ -250,8 +250,8 @@ class BaseAgentManagerTestCase(base.BaseTestCase): super(BaseAgentManagerTestCase, self).setUp() self.CONF = self.useFixture(fixture_config.Config()).conf self.CONF.set_override( - 'pipeline_cfg_file', - self.path_get('etc/ceilometer/pipeline.yaml') + 'cfg_file', + self.path_get('etc/ceilometer/polling.yaml'), group='polling' ) self.CONF.set_override('heartbeat', 1.0, group='coordination') self.CONF(args=[]) @@ -262,27 +262,14 @@ class BaseAgentManagerTestCase(base.BaseTestCase): p_coord = self.mgr.partition_coordinator p_coord.extract_my_subset.side_effect = fake_subset self.mgr.tg = mock.MagicMock() - self.pipeline_cfg = { + self.polling_cfg = { 'sources': [{ - 'name': 'test_pipeline', + 'name': 'test_polling', 'interval': 60, 'meters': ['test'], - 'resources': ['test://'], - 'sinks': ['test_sink']}], - 'sinks': [{ - 'name': 'test_sink', - 'transformers': [], - 'publishers': ["test"]}] + 'resources': ['test://']}] } self.setup_polling() - self.useFixture(mockpatch.PatchObject( - publisher, 'get_publisher', side_effect=self.get_publisher)) - - def get_publisher(self, url, namespace=''): - fake_drivers = {'test://': test_publisher.TestPublisher, - 'new://': test_publisher.TestPublisher, - 'rpc://': test_publisher.TestPublisher} - return fake_drivers[url](self.CONF, url) def tearDown(self): self.Pollster.samples = [] @@ -334,7 +321,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase): self.mgr.join_partitioning_groups() p_coord = self.mgr.partition_coordinator static_group_ids = [utils.hash_of_set(p['resources']) - for p in self.pipeline_cfg['sources'] + for p in self.polling_cfg['sources'] if p['resources']] expected = [mock.call(self.mgr.construct_group_id(g)) for g in ['another_group', 'global'] + static_group_ids] @@ -348,16 +335,15 @@ class BaseAgentManagerTestCase(base.BaseTestCase): self.assertIn(60, polling_tasks.keys()) per_task_resources = polling_tasks[60].resources self.assertEqual(1, len(per_task_resources)) - self.assertEqual(set(self.pipeline_cfg['sources'][0]['resources']), - set(per_task_resources['test_pipeline-test'].get({}))) + self.assertEqual(set(self.polling_cfg['sources'][0]['resources']), + set(per_task_resources['test_polling-test'].get({}))) def test_setup_polling_tasks_multiple_interval(self): - self.pipeline_cfg['sources'].append({ - 'name': 'test_pipeline_1', + self.polling_cfg['sources'].append({ + 'name': 'test_polling_1', 'interval': 10, 'meters': ['test'], 'resources': ['test://'], - 'sinks': ['test_sink'] }) self.setup_polling() polling_tasks = self.mgr.setup_polling_tasks() @@ -366,12 +352,11 @@ class BaseAgentManagerTestCase(base.BaseTestCase): self.assertIn(10, polling_tasks.keys()) def test_setup_polling_tasks_mismatch_counter(self): - self.pipeline_cfg['sources'].append({ - 'name': 'test_pipeline_1', + self.polling_cfg['sources'].append({ + 'name': 'test_polling_1', 'interval': 10, 'meters': ['test_invalid'], 'resources': ['invalid://'], - 'sinks': ['test_sink'] }) polling_tasks = self.mgr.setup_polling_tasks() self.assertEqual(1, len(polling_tasks)) @@ -379,12 +364,11 @@ class BaseAgentManagerTestCase(base.BaseTestCase): self.assertNotIn(10, polling_tasks.keys()) def test_setup_polling_task_same_interval(self): - self.pipeline_cfg['sources'].append({ - 'name': 'test_pipeline_1', + self.polling_cfg['sources'].append({ + 'name': 'test_polling_1', 'interval': 60, 'meters': ['testanother'], 'resources': ['testanother://'], - 'sinks': ['test_sink'] }) self.setup_polling() polling_tasks = self.mgr.setup_polling_tasks() @@ -393,11 +377,11 @@ class BaseAgentManagerTestCase(base.BaseTestCase): self.assertEqual(2, len(pollsters)) per_task_resources = polling_tasks[60].resources self.assertEqual(2, len(per_task_resources)) - key = 'test_pipeline-test' - self.assertEqual(set(self.pipeline_cfg['sources'][0]['resources']), + key = 'test_polling-test' + self.assertEqual(set(self.polling_cfg['sources'][0]['resources']), set(per_task_resources[key].get({}))) - key = 'test_pipeline_1-testanother' - self.assertEqual(set(self.pipeline_cfg['sources'][1]['resources']), + key = 'test_polling_1-testanother' + self.assertEqual(set(self.polling_cfg['sources'][1]['resources']), set(per_task_resources[key].get({}))) def test_agent_manager_start(self): @@ -408,12 +392,38 @@ class BaseAgentManagerTestCase(base.BaseTestCase): self.addCleanup(mgr.terminate) mgr.create_polling_task.assert_called_once_with() + def test_agent_manager_start_fallback(self): + pipeline_cfg = { + 'sources': [{ + 'name': 'test_pipeline', + 'interval': 60, + 'meters': ['test'], + 'resources': ['test://'], + 'sinks': ['test_sink']}], + 'sinks': [{ + 'name': 'test_sink', + 'transformers': [], + 'publishers': ["test"]}] + } + tmp_cfg = tempfile.NamedTemporaryFile(mode='w', delete=False) + tmp_cfg.write(yaml.safe_dump(pipeline_cfg)) + tmp_cfg.close() + self.CONF.set_override('pipeline_cfg_file', tmp_cfg.name) + self.CONF.set_override('cfg_file', None, group='polling') + + mgr = self.create_manager() + mgr.extensions = self.mgr.extensions + mgr.create_polling_task = mock.MagicMock() + mgr.run() + self.addCleanup(mgr.terminate) + self.addCleanup(os.unlink, tmp_cfg.name) + mgr.create_polling_task.assert_called_once_with() + def test_manager_exception_persistency(self): - self.pipeline_cfg['sources'].append({ - 'name': 'test_pipeline_1', + self.polling_cfg['sources'].append({ + 'name': 'test_polling_1', 'interval': 60, 'meters': ['testanother'], - 'sinks': ['test_sink'] }) self.setup_polling() @@ -430,13 +440,13 @@ class BaseAgentManagerTestCase(base.BaseTestCase): self.DiscoveryAnother.resources = [d[::-1] for d in discovered_resources] if static_resources: - # just so we can test that static + pre_pipeline amalgamated + # just so we can test that static + pre_polling amalgamated # override per_pollster - self.pipeline_cfg['sources'][0]['discovery'] = [ + self.polling_cfg['sources'][0]['discovery'] = [ 'testdiscoveryanother', 'testdiscoverynonexistent', 'testdiscoveryexception'] - self.pipeline_cfg['sources'][0]['resources'] = static_resources + self.polling_cfg['sources'][0]['resources'] = static_resources self.setup_polling() polling_tasks = self.mgr.setup_polling_tasks() self.mgr.interval_task(polling_tasks.get(60)) @@ -456,7 +466,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase): self._do_test_per_pollster_discovery(['discovered_1', 'discovered_2'], []) - def test_per_pollster_discovery_overridden_by_per_pipeline_discovery(self): + def test_per_pollster_discovery_overridden_by_per_polling_discovery(self): # ensure static+per_source_discovery overrides per_pollster_discovery self._do_test_per_pollster_discovery(['discovered_1', 'discovered_2'], ['static_1', 'static_2']) @@ -477,8 +487,8 @@ class BaseAgentManagerTestCase(base.BaseTestCase): self.PollsterAnother.discovery = 'testdiscovery' self.mgr.discoveries = self.create_discoveries() self.Discovery.resources = discovered_resources - self.pipeline_cfg['sources'][0]['meters'].append('testanother') - self.pipeline_cfg['sources'][0]['resources'] = [] + self.polling_cfg['sources'][0]['meters'].append('testanother') + self.polling_cfg['sources'][0]['resources'] = [] self.setup_polling() polling_tasks = self.mgr.setup_polling_tasks() self.mgr.interval_task(polling_tasks.get(60)) @@ -486,17 +496,16 @@ class BaseAgentManagerTestCase(base.BaseTestCase): self.assertEqual(discovered_resources, self.Pollster.resources) self.assertEqual(discovered_resources, self.PollsterAnother.resources) - def _do_test_per_pipeline_discovery(self, - discovered_resources, - static_resources): + def _do_test_per_polling_discovery(self, discovered_resources, + static_resources): self.mgr.discoveries = self.create_discoveries() self.Discovery.resources = discovered_resources self.DiscoveryAnother.resources = [d[::-1] for d in discovered_resources] - self.pipeline_cfg['sources'][0]['discovery'] = [ + self.polling_cfg['sources'][0]['discovery'] = [ 'testdiscovery', 'testdiscoveryanother', 'testdiscoverynonexistent', 'testdiscoveryexception'] - self.pipeline_cfg['sources'][0]['resources'] = static_resources + self.polling_cfg['sources'][0]['resources'] = static_resources self.setup_polling() polling_tasks = self.mgr.setup_polling_tasks() self.mgr.interval_task(polling_tasks.get(60)) @@ -509,35 +518,33 @@ class BaseAgentManagerTestCase(base.BaseTestCase): for x in self.Pollster.resources: self.assertEqual(1, self.Pollster.resources.count(x)) - def test_per_pipeline_discovery_discovered_only(self): - self._do_test_per_pipeline_discovery(['discovered_1', 'discovered_2'], - []) + def test_per_polling_discovery_discovered_only(self): + self._do_test_per_polling_discovery(['discovered_1', 'discovered_2'], + []) - def test_per_pipeline_discovery_static_only(self): - self._do_test_per_pipeline_discovery([], - ['static_1', 'static_2']) + def test_per_polling_discovery_static_only(self): + self._do_test_per_polling_discovery([], ['static_1', 'static_2']) - def test_per_pipeline_discovery_discovered_augmented_by_static(self): - self._do_test_per_pipeline_discovery(['discovered_1', 'discovered_2'], - ['static_1', 'static_2']) + def test_per_polling_discovery_discovered_augmented_by_static(self): + self._do_test_per_polling_discovery(['discovered_1', 'discovered_2'], + ['static_1', 'static_2']) - def test_per_pipeline_discovery_discovered_duplicated_static(self): - self._do_test_per_pipeline_discovery(['discovered_1', 'pud'], - ['dup', 'static_1', 'dup']) + def test_per_polling_discovery_discovered_duplicated_static(self): + self._do_test_per_polling_discovery(['discovered_1', 'pud'], + ['dup', 'static_1', 'dup']) - def test_multiple_pipelines_different_static_resources(self): + def test_multiple_pollings_different_static_resources(self): # assert that the individual lists of static and discovered resources - # for each pipeline with a common interval are passed to individual - # pollsters matching each pipeline - self.pipeline_cfg['sources'][0]['resources'] = ['test://'] - self.pipeline_cfg['sources'][0]['discovery'] = ['testdiscovery'] - self.pipeline_cfg['sources'].append({ - 'name': 'another_pipeline', + # for each polling with a common interval are passed to individual + # pollsters matching each polling + self.polling_cfg['sources'][0]['resources'] = ['test://'] + self.polling_cfg['sources'][0]['discovery'] = ['testdiscovery'] + self.polling_cfg['sources'].append({ + 'name': 'another_polling', 'interval': 60, 'meters': ['test'], 'resources': ['another://'], 'discovery': ['testdiscoveryanother'], - 'sinks': ['test_sink_new'] }) self.mgr.discoveries = self.create_discoveries() self.Discovery.resources = ['discovered_1', 'discovered_2'] @@ -566,20 +573,12 @@ class BaseAgentManagerTestCase(base.BaseTestCase): sources = [{'name': 'test_source_1', 'interval': 60, 'meters': ['test'], - 'discovery': ['testdiscovery'], - 'sinks': ['test_sink_1']}, + 'discovery': ['testdiscovery']}, {'name': 'test_source_2', 'interval': 60, 'meters': ['testanother'], - 'discovery': ['testdiscoveryanother'], - 'sinks': ['test_sink_2']}] - sinks = [{'name': 'test_sink_1', - 'transformers': [], - 'publishers': ['test://']}, - {'name': 'test_sink_2', - 'transformers': [], - 'publishers': ['test://']}] - self.pipeline_cfg = {'sources': sources, 'sinks': sinks} + 'discovery': ['testdiscoveryanother']}] + self.polling_cfg = {'sources': sources} self.mgr.discoveries = self.create_discoveries() self.setup_polling() polling_tasks = self.mgr.setup_polling_tasks() @@ -593,37 +592,13 @@ class BaseAgentManagerTestCase(base.BaseTestCase): self.assertEqual(['discovered_3', 'discovered_4'], self.PollsterAnother.resources) - def test_multiple_sinks_same_discoverer(self): - self.Discovery.resources = ['discovered_1', 'discovered_2'] - sources = [{'name': 'test_source_1', - 'interval': 60, - 'meters': ['test'], - 'discovery': ['testdiscovery'], - 'sinks': ['test_sink_1', 'test_sink_2']}] - sinks = [{'name': 'test_sink_1', - 'transformers': [], - 'publishers': ['test://']}, - {'name': 'test_sink_2', - 'transformers': [], - 'publishers': ['test://']}] - self.pipeline_cfg = {'sources': sources, 'sinks': sinks} - self.mgr.discoveries = self.create_discoveries() - self.setup_polling() - polling_tasks = self.mgr.setup_polling_tasks() - self.assertEqual(1, len(polling_tasks)) - self.assertIn(60, polling_tasks.keys()) - self.mgr.interval_task(polling_tasks.get(60)) - self.assertEqual(1, len(self.Pollster.samples)) - self.assertEqual(['discovered_1', 'discovered_2'], - self.Pollster.resources) - def test_discovery_partitioning(self): self.mgr.discoveries = self.create_discoveries() p_coord = self.mgr.partition_coordinator - self.pipeline_cfg['sources'][0]['discovery'] = [ + self.polling_cfg['sources'][0]['discovery'] = [ 'testdiscovery', 'testdiscoveryanother', 'testdiscoverynonexistent', 'testdiscoveryexception'] - self.pipeline_cfg['sources'][0]['resources'] = [] + self.polling_cfg['sources'][0]['resources'] = [] self.setup_polling() polling_tasks = self.mgr.setup_polling_tasks() self.mgr.interval_task(polling_tasks.get(60)) @@ -640,26 +615,24 @@ class BaseAgentManagerTestCase(base.BaseTestCase): p_coord = self.mgr.partition_coordinator static_resources = ['static_1', 'static_2'] static_resources2 = ['static_3', 'static_4'] - self.pipeline_cfg['sources'][0]['resources'] = static_resources - self.pipeline_cfg['sources'].append({ - 'name': 'test_pipeline2', + self.polling_cfg['sources'][0]['resources'] = static_resources + self.polling_cfg['sources'].append({ + 'name': 'test_polling2', 'interval': 60, 'meters': ['test', 'test2'], 'resources': static_resources2, - 'sinks': ['test_sink'] }) - # have one pipeline without static resources defined - self.pipeline_cfg['sources'].append({ - 'name': 'test_pipeline3', + # have one polling without static resources defined + self.polling_cfg['sources'].append({ + 'name': 'test_polling3', 'interval': 60, 'meters': ['test', 'test2'], 'resources': [], - 'sinks': ['test_sink'] }) self.setup_polling() polling_tasks = self.mgr.setup_polling_tasks() self.mgr.interval_task(polling_tasks.get(60)) - # Only two groups need to be created, one for each pipeline, + # Only two groups need to be created, one for each polling, # even though counter test is used twice expected = [mock.call(self.mgr.construct_group_id( utils.hash_of_set(resources)), @@ -678,14 +651,14 @@ class BaseAgentManagerTestCase(base.BaseTestCase): polling_task.poll_and_notify() LOG.info.assert_called_with( 'Polling pollster %(poll)s in the context of %(src)s', - {'poll': 'test', 'src': 'test_pipeline'}) + {'poll': 'test', 'src': 'test_polling'}) @mock.patch('ceilometer.agent.manager.LOG') def test_skip_polling_and_notify_with_no_resources(self, LOG): - self.pipeline_cfg['sources'][0]['resources'] = [] + self.polling_cfg['sources'][0]['resources'] = [] self.setup_polling() polling_task = list(self.mgr.setup_polling_tasks().values())[0] - pollster = list(polling_task.pollster_matches['test_pipeline'])[0] + pollster = list(polling_task.pollster_matches['test_polling'])[0] polling_task.poll_and_notify() LOG.info.assert_called_with( 'Skip pollster %(name)s, no %(p_context)sresources found this ' @@ -693,12 +666,11 @@ class BaseAgentManagerTestCase(base.BaseTestCase): @mock.patch('ceilometer.agent.manager.LOG') def test_skip_polling_polled_resources(self, LOG): - self.pipeline_cfg['sources'].append({ - 'name': 'test_pipeline_1', + self.polling_cfg['sources'].append({ + 'name': 'test_polling_1', 'interval': 60, 'meters': ['test'], 'resources': ['test://'], - 'sinks': ['test_sink'] }) self.setup_polling() polling_task = list(self.mgr.setup_polling_tasks().values())[0] diff --git a/ceilometer/tests/unit/pipeline_base.py b/ceilometer/tests/unit/pipeline_base.py index a76581cf..2781af87 100644 --- a/ceilometer/tests/unit/pipeline_base.py +++ b/ceilometer/tests/unit/pipeline_base.py @@ -213,23 +213,10 @@ class BasePipelineTestCase(base.BaseTestCase): self._unset_pipeline_cfg('name') self._exception_create_pipelinemanager() - def test_no_interval(self): - self._unset_pipeline_cfg('interval') - pipeline_manager = pipeline.PipelineManager( - self.CONF, - self.cfg2file(self.pipeline_cfg), self.transformer_manager) - pipe = pipeline_manager.pipelines[0] - self.assertEqual(600, pipe.get_interval()) - def test_no_publishers(self): self._unset_pipeline_cfg('publishers') self._exception_create_pipelinemanager() - def test_invalid_resources(self): - invalid_resource = {'invalid': 1} - self._set_pipeline_cfg('resources', invalid_resource) - self._exception_create_pipelinemanager() - def test_check_counters_include_exclude_same(self): counter_cfg = ['a', '!a'] self._set_pipeline_cfg('meters', counter_cfg) @@ -249,10 +236,6 @@ class BasePipelineTestCase(base.BaseTestCase): publisher_cfg = ['test_invalid'] self._set_pipeline_cfg('publishers', publisher_cfg) - def test_invalid_string_interval(self): - self._set_pipeline_cfg('interval', 'string') - self._exception_create_pipelinemanager() - def test_check_transformer_invalid_transformer(self): transformer_cfg = [ {'name': "test_invalid", @@ -261,13 +244,6 @@ class BasePipelineTestCase(base.BaseTestCase): self._set_pipeline_cfg('transformers', transformer_cfg) self._exception_create_pipelinemanager() - def test_get_interval(self): - pipeline_manager = pipeline.PipelineManager( - self.CONF, - self.cfg2file(self.pipeline_cfg), self.transformer_manager) - pipe = pipeline_manager.pipelines[0] - self.assertEqual(5, pipe.get_interval()) - def test_publisher_transformer_invoked(self): pipeline_manager = pipeline.PipelineManager( self.CONF, @@ -1196,21 +1172,6 @@ class BasePipelineTestCase(base.BaseTestCase): (counters[1],) ) - def test_resources(self): - resources = ['test1://', 'test2://'] - self._set_pipeline_cfg('resources', resources) - pipeline_manager = pipeline.PipelineManager( - self.CONF, - self.cfg2file(self.pipeline_cfg), self.transformer_manager) - self.assertEqual(resources, - pipeline_manager.pipelines[0].resources) - - def test_no_resources(self): - pipeline_manager = pipeline.PipelineManager( - self.CONF, - self.cfg2file(self.pipeline_cfg), self.transformer_manager) - self.assertEqual(0, len(pipeline_manager.pipelines[0].resources)) - def _do_test_rate_of_change_mapping(self, pipe, meters, units): now = timeutils.utcnow() base = 1000 diff --git a/ceilometer/tests/unit/test_decoupled_pipeline.py b/ceilometer/tests/unit/test_decoupled_pipeline.py index 4ac69d0d..4c9e0641 100644 --- a/ceilometer/tests/unit/test_decoupled_pipeline.py +++ b/ceilometer/tests/unit/test_decoupled_pipeline.py @@ -23,9 +23,7 @@ from ceilometer.tests.unit import pipeline_base class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase): def _setup_pipeline_cfg(self): source = {'name': 'test_source', - 'interval': 5, 'meters': ['a'], - 'resources': [], 'sinks': ['test_sink']} sink = {'name': 'test_sink', 'transformers': [{'name': 'update', 'parameters': {}}], @@ -35,9 +33,7 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase): def _augment_pipeline_cfg(self): self.pipeline_cfg['sources'].append({ 'name': 'second_source', - 'interval': 5, 'meters': ['b'], - 'resources': [], 'sinks': ['second_sink'] }) self.pipeline_cfg['sinks'].append({ @@ -55,9 +51,7 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase): def _break_pipeline_cfg(self): self.pipeline_cfg['sources'].append({ 'name': 'second_source', - 'interval': 5, 'meters': ['b'], - 'resources': [], 'sinks': ['second_sink'] }) self.pipeline_cfg['sinks'].append({ @@ -75,9 +69,7 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase): def _dup_pipeline_name_cfg(self): self.pipeline_cfg['sources'].append({ 'name': 'test_source', - 'interval': 5, 'meters': ['b'], - 'resources': [], 'sinks': ['test_sink'] }) @@ -106,9 +98,7 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase): def test_source_dangling_sink(self): self.pipeline_cfg['sources'].append({ 'name': 'second_source', - 'interval': 5, 'meters': ['b'], - 'resources': [], 'sinks': ['second_sink'] }) self._exception_create_pipelinemanager() @@ -170,9 +160,7 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase): def test_multiple_sources_with_single_sink(self): self.pipeline_cfg['sources'].append({ 'name': 'second_source', - 'interval': 5, 'meters': ['b'], - 'resources': [], 'sinks': ['test_sink'] }) pipeline_manager = pipeline.PipelineManager( @@ -283,9 +271,7 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase): def test_duplicated_source_names(self): self.pipeline_cfg['sources'].append({ 'name': 'test_source', - 'interval': 5, 'meters': ['a'], - 'resources': [], 'sinks': ['test_sink'] }) self.assertRaises(pipeline.PipelineException, diff --git a/ceilometer/tests/unit/test_polling.py b/ceilometer/tests/unit/test_polling.py new file mode 100644 index 00000000..8b0ac90a --- /dev/null +++ b/ceilometer/tests/unit/test_polling.py @@ -0,0 +1,102 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os +import tempfile + +from oslo_config import fixture as fixture_config +from oslotest import base +import yaml + +from ceilometer import pipeline + + +class PollingTestCase(base.BaseTestCase): + + def cfg2file(self, data): + self.tmp_cfg.write(yaml.safe_dump(data)) + self.tmp_cfg.close() + return self.tmp_cfg.name + + def setUp(self): + super(PollingTestCase, self).setUp() + self.CONF = self.useFixture(fixture_config.Config()).conf + + self.tmp_cfg = tempfile.NamedTemporaryFile(mode='w', delete=False) + self.poll_cfg = {'sources': [{'name': 'test_source', + 'interval': 600, + 'meters': ['a']}]} + + def tearDown(self): + os.unlink(self.tmp_cfg.name) + super(PollingTestCase, self).tearDown() + + def test_no_name(self): + del self.poll_cfg['sources'][0]['name'] + self.assertRaises(pipeline.PipelineException, + pipeline.PollingManager, + self.CONF, self.cfg2file(self.poll_cfg)) + + def test_no_interval(self): + del self.poll_cfg['sources'][0]['interval'] + self.assertRaises(pipeline.PipelineException, + pipeline.PollingManager, + self.CONF, self.cfg2file(self.poll_cfg)) + + def test_invalid_string_interval(self): + self.poll_cfg['sources'][0]['interval'] = 'string' + self.assertRaises(pipeline.PipelineException, + pipeline.PollingManager, + self.CONF, self.cfg2file(self.poll_cfg)) + + def test_get_interval(self): + poll_manager = pipeline.PollingManager( + self.CONF, self.cfg2file(self.poll_cfg)) + source = poll_manager.sources[0] + self.assertEqual(600, source.get_interval()) + + def test_invalid_resources(self): + self.poll_cfg['sources'][0]['resources'] = {'invalid': 1} + self.assertRaises(pipeline.PipelineException, + pipeline.PollingManager, + self.CONF, self.cfg2file(self.poll_cfg)) + + def test_resources(self): + resources = ['test1://', 'test2://'] + self.poll_cfg['sources'][0]['resources'] = resources + poll_manager = pipeline.PollingManager( + self.CONF, self.cfg2file(self.poll_cfg)) + self.assertEqual(resources, poll_manager.sources[0].resources) + + def test_no_resources(self): + poll_manager = pipeline.PollingManager( + self.CONF, self.cfg2file(self.poll_cfg)) + self.assertEqual(0, len(poll_manager.sources[0].resources)) + + def test_check_meters_include_exclude_same(self): + self.poll_cfg['sources'][0]['meters'] = ['a', '!a'] + self.assertRaises(pipeline.PipelineException, + pipeline.PollingManager, + self.CONF, self.cfg2file(self.poll_cfg)) + + def test_check_meters_include_exclude(self): + self.poll_cfg['sources'][0]['meters'] = ['a', '!b'] + self.assertRaises(pipeline.PipelineException, + pipeline.PollingManager, + self.CONF, self.cfg2file(self.poll_cfg)) + + def test_check_meters_wildcard_included(self): + self.poll_cfg['sources'][0]['meters'] = ['a', '*'] + self.assertRaises(pipeline.PipelineException, + pipeline.PollingManager, + self.CONF, self.cfg2file(self.poll_cfg)) diff --git a/devstack/plugin.sh b/devstack/plugin.sh index a0b13999..dbeb60c1 100644 --- a/devstack/plugin.sh +++ b/devstack/plugin.sh @@ -301,13 +301,13 @@ function configure_ceilometer { # with rootwrap installation done elsewhere and also clobber # ceilometer.conf settings that have already been made. # Anyway, explicit is better than implicit. - for conffile in policy.json api_paste.ini pipeline.yaml \ + for conffile in policy.json api_paste.ini pipeline.yaml polling.yaml \ event_definitions.yaml event_pipeline.yaml; do cp $CEILOMETER_DIR/etc/ceilometer/$conffile $CEILOMETER_CONF_DIR done if [ "$CEILOMETER_PIPELINE_INTERVAL" ]; then - sed -i "s/interval:.*/interval: ${CEILOMETER_PIPELINE_INTERVAL}/" $CEILOMETER_CONF_DIR/pipeline.yaml + sed -i "s/interval:.*/interval: ${CEILOMETER_PIPELINE_INTERVAL}/" $CEILOMETER_CONF_DIR/polling.yaml fi if [ "$CEILOMETER_EVENT_ALARM" == "True" ]; then if ! grep -q '^ *- notifier://?topic=alarm.all$' $CEILOMETER_CONF_DIR/event_pipeline.yaml; then diff --git a/etc/ceilometer/pipeline.yaml b/etc/ceilometer/pipeline.yaml index a5bd5148..59e6f8fc 100644 --- a/etc/ceilometer/pipeline.yaml +++ b/etc/ceilometer/pipeline.yaml @@ -1,20 +1,17 @@ --- sources: - name: meter_source - interval: 600 meters: - "*" sinks: - meter_sink - name: cpu_source - interval: 600 meters: - "cpu" sinks: - cpu_sink - cpu_delta_sink - name: disk_source - interval: 600 meters: - "disk.read.bytes" - "disk.read.requests" @@ -27,7 +24,6 @@ sources: sinks: - disk_sink - name: network_source - interval: 600 meters: - "network.incoming.bytes" - "network.incoming.packets" diff --git a/etc/ceilometer/polling.yaml b/etc/ceilometer/polling.yaml new file mode 100644 index 00000000..518edbe9 --- /dev/null +++ b/etc/ceilometer/polling.yaml @@ -0,0 +1,6 @@ +--- +sources: + - name: all_pollsters + interval: 600 + meters: + - "*" diff --git a/releasenotes/notes/polling-definition-efffb92e3810e571.yaml b/releasenotes/notes/polling-definition-efffb92e3810e571.yaml new file mode 100644 index 00000000..f47e10f4 --- /dev/null +++ b/releasenotes/notes/polling-definition-efffb92e3810e571.yaml @@ -0,0 +1,10 @@ +--- +upgrade: + - Pipeline processing in polling agents was removed in Liberty cycle. A new + polling specific definition file is created to handle polling functionality + and pipeline definition file is now reserved exclusively for + transformations and routing. The polling.yaml file follows the same syntax + as the pipeline.yaml but only handles polling attributes such as interval, + discovery, resources, meter matching. It is configured by setting cfg_file + under the polling section.If no polling definition file is found, it will + fallback to reuse pipeline_cfg_file.