polling definition file

add support for polling specific definition file.
this splits the existing polling specific options out of
pipeline.yaml as transformations only exists on notifcation agent
and polling interval/discovery only exists on polling agents.

backward compatibility is maintained so pipeline.yaml file from
previous releases can still be passed in as polling definition file.

Change-Id: I206566349f98d6b17336cd5ea36ceb1e304dd90c
This commit is contained in:
gord chung 2016-11-30 16:53:08 +00:00 committed by gordon chung
parent f786989976
commit 8999528092
10 changed files with 328 additions and 253 deletions

View File

@ -57,6 +57,10 @@ OPTS = [
]
POLLING_OPTS = [
cfg.StrOpt('cfg_file',
default="polling.yaml",
help="Configuration file for pipeline definition."
),
cfg.StrOpt('partitioning_group_prefix',
deprecated_group='central',
help='Work-load partitioning group prefix. Use only if you '
@ -520,6 +524,7 @@ class AgentManager(service_base.PipelineBasedService):
self.polling_periodics.wait()
self.polling_periodics = None
# FIXME(gordc): refactor pipeline dependency out of polling agent.
def reload_pipeline(self):
if self.pipeline_validated:
LOG.info(_LI("Reconfiguring polling tasks."))

View File

@ -63,13 +63,24 @@ OPTS = [
LOG = log.getLogger(__name__)
class PipelineException(Exception):
def __init__(self, message, pipeline_cfg):
class ConfigException(Exception):
def __init__(self, cfg_type, message, cfg):
self.cfg_type = cfg_type
self.msg = message
self.pipeline_cfg = pipeline_cfg
self.cfg = cfg
def __str__(self):
return 'Pipeline %s: %s' % (self.pipeline_cfg, self.msg)
return '%s %s: %s' % (self.cfg_type, self.cfg, self.msg)
class PollingException(ConfigException):
def __init__(self, message, cfg):
super(PollingException, self).__init__('Polling', message, cfg)
class PipelineException(ConfigException):
def __init__(self, message, cfg):
super(PipelineException, self).__init__('Pipeline', message, cfg)
@six.add_metaclass(abc.ABCMeta)
@ -219,14 +230,12 @@ class PublishContext(object):
class Source(object):
"""Represents a source of samples or events."""
"""Represents a generic source"""
def __init__(self, cfg):
self.cfg = cfg
try:
self.name = cfg['name']
self.sinks = cfg.get('sinks')
except KeyError as err:
raise PipelineException(
"Required field %s not specified" % err.args[0], cfg)
@ -234,17 +243,6 @@ class Source(object):
def __str__(self):
return self.name
def check_sinks(self, sinks):
if not self.sinks:
raise PipelineException(
"No sink defined in source %s" % self,
self.cfg)
for sink in self.sinks:
if sink not in sinks:
raise PipelineException(
"Dangling sink %s from source %s" % (sink, self),
self.cfg)
def check_source_filtering(self, data, d_type):
"""Source data rules checking
@ -282,7 +280,30 @@ class Source(object):
return all(datapoint.startswith('!') for datapoint in dataset)
class EventSource(Source):
class PipelineSource(Source):
"""Represents a source of samples or events."""
def __init__(self, cfg):
super(PipelineSource, self).__init__(cfg)
try:
self.sinks = cfg['sinks']
except KeyError as err:
raise PipelineException(
"Required field %s not specified" % err.args[0], cfg)
def check_sinks(self, sinks):
if not self.sinks:
raise PipelineException(
"No sink defined in source %s" % self,
self.cfg)
for sink in self.sinks:
if sink not in sinks:
raise PipelineException(
"Dangling sink %s from source %s" % (sink, self),
self.cfg)
class EventSource(PipelineSource):
"""Represents a source of events.
In effect it is a set of notification handlers capturing events for a set
@ -298,13 +319,12 @@ class EventSource(Source):
return self.is_supported(self.events, event_name)
class SampleSource(Source):
class SampleSource(PipelineSource):
"""Represents a source of samples.
In effect it is a set of pollsters and/or notification handlers emitting
In effect it is a set of notification handlers processing
samples for a set of matching meters. Each source encapsulates meter name
matching, polling interval determination, optional resource enumeration or
discovery, and mapping to one or more sinks for publication.
matching and mapping to one or more sinks for publication.
"""
def __init__(self, cfg):
@ -313,10 +333,33 @@ class SampleSource(Source):
self.meters = cfg['meters']
except KeyError:
raise PipelineException("Missing meters value", cfg)
self.check_source_filtering(self.meters, 'meters')
def support_meter(self, meter_name):
return self.is_supported(self.meters, meter_name)
class PollingSource(Source):
"""Represents a source of pollsters
In effect it is a set of pollsters emitting
samples for a set of matching meters. Each source encapsulates meter name
matching, polling interval determination, optional resource enumeration or
discovery.
"""
def __init__(self, cfg):
super(PollingSource, self).__init__(cfg)
try:
self.interval = int(cfg.get('interval', 600))
self.meters = cfg['meters']
except KeyError:
raise PipelineException("Missing meters value", cfg)
try:
self.interval = int(cfg['interval'])
except ValueError:
raise PipelineException("Invalid interval value", cfg)
except KeyError:
raise PipelineException("Missing interval value", cfg)
if self.interval <= 0:
raise PipelineException("Interval value should > 0", cfg)
@ -560,17 +603,6 @@ class EventPipeline(Pipeline):
class SamplePipeline(Pipeline):
"""Represents a pipeline for Samples."""
def get_interval(self):
return self.source.interval
@property
def resources(self):
return self.source.resources
@property
def discovery(self):
return self.source.discovery
def support_meter(self, meter_name):
return self.source.support_meter(meter_name)
@ -692,9 +724,6 @@ class PipelineManager(ConfigManagerBase):
"""Pipeline Manager
Pipeline manager sets up pipelines according to config file
Usually only one pipeline manager exists in the system.
"""
def __init__(self, conf, cfg_file, transformer_manager,
@ -705,7 +734,7 @@ class PipelineManager(ConfigManagerBase):
Decoupled: the source and sink configuration are separately
specified before being linked together. This allows source-
specific configuration, such as resource discovery, to be
specific configuration, such as meter handling, to be
kept focused only on the fine-grained source while avoiding
the necessity for wide duplication of sink-related config.
@ -713,13 +742,10 @@ class PipelineManager(ConfigManagerBase):
of dictionaries defining sources and sinks, for example:
{"sources": [{"name": source_1,
"interval": interval_time,
"meters" : ["meter_1", "meter_2"],
"resources": ["resource_uri1", "resource_uri2"],
"sinks" : ["sink_1", "sink_2"]
},
{"name": source_2,
"interval": interval_time,
"meters" : ["meter_3"],
"sinks" : ["sink_2"]
},
@ -740,26 +766,15 @@ class PipelineManager(ConfigManagerBase):
]
}
The interval determines the cadence of sample injection into
the pipeline where samples are produced under the direct control
of an agent, i.e. via a polling cycle as opposed to incoming
notifications.
Valid meter format is '*', '!meter_name', or 'meter_name'.
'*' is wildcard symbol means any meters; '!meter_name' means
"meter_name" will be excluded; 'meter_name' means 'meter_name'
will be included.
The 'meter_name" is Sample name field.
Valid meters definition is all "included meter names", all
"excluded meter names", wildcard and "excluded meter names", or
only wildcard.
The resources is list of URI indicating the resources from where
the meters should be polled. It's optional and it's up to the
specific pollster to decide how to use it.
Transformer's name is plugin name in setup.cfg.
Publisher's name is plugin name in setup.cfg
@ -830,26 +845,48 @@ class PollingManager(ConfigManagerBase):
def __init__(self, conf, cfg_file):
"""Setup the polling according to config.
The configuration is the sources half of the Pipeline Config.
The configuration is supported as follows:
{"sources": [{"name": source_1,
"interval": interval_time,
"meters" : ["meter_1", "meter_2"],
"resources": ["resource_uri1", "resource_uri2"],
},
{"name": source_2,
"interval": interval_time,
"meters" : ["meter_3"],
},
]}
}
The interval determines the cadence of sample polling
Valid meter format is '*', '!meter_name', or 'meter_name'.
'*' is wildcard symbol means any meters; '!meter_name' means
"meter_name" will be excluded; 'meter_name' means 'meter_name'
will be included.
Valid meters definition is all "included meter names", all
"excluded meter names", wildcard and "excluded meter names", or
only wildcard.
The resources is list of URI indicating the resources from where
the meters should be polled. It's optional and it's up to the
specific pollster to decide how to use it.
"""
super(PollingManager, self).__init__(conf)
cfg = self.load_config(cfg_file)
try:
cfg = self.load_config(cfg_file)
except (TypeError, IOError):
LOG.warning(_LW('Unable to locate polling configuration, falling '
'back to pipeline configuration.'))
cfg = self.load_config(conf.pipeline_cfg_file)
self.sources = []
if not ('sources' in cfg and 'sinks' in cfg):
raise PipelineException("Both sources & sinks are required",
cfg)
LOG.info(_LI('detected decoupled pipeline config format'))
unique_names = set()
if 'sources' not in cfg:
raise PollingException("sources required", cfg)
for s in cfg.get('sources'):
name = s.get('name')
if name in unique_names:
raise PipelineException("Duplicated source names: %s" %
name, self)
else:
unique_names.add(name)
self.sources.append(SampleSource(s))
unique_names.clear()
self.sources.append(PollingSource(s))
def setup_event_pipeline(conf, transformer_manager=None):
@ -870,7 +907,7 @@ def setup_pipeline(conf, transformer_manager=None):
def setup_polling(conf):
"""Setup polling manager according to yaml config file."""
cfg_file = conf.pipeline_cfg_file
cfg_file = conf.polling.cfg_file
return PollingManager(conf, cfg_file)

View File

@ -19,18 +19,18 @@
import abc
import copy
import datetime
import os
import tempfile
import time
import mock
from oslo_config import fixture as fixture_config
from oslotest import mockpatch
import six
from stevedore import extension
import yaml
from ceilometer.agent import plugin_base
from ceilometer import pipeline
from ceilometer import publisher
from ceilometer.publisher import test as test_publisher
from ceilometer import sample
from ceilometer.tests import base
from ceilometer import utils
@ -195,7 +195,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
def setup_polling(self):
self.mgr.polling_manager = pipeline.PollingManager(
self.CONF, self.cfg2file(self.pipeline_cfg))
self.CONF, self.cfg2file(self.polling_cfg))
def create_extension_list(self):
return [extension.Extension('test',
@ -250,8 +250,8 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
super(BaseAgentManagerTestCase, self).setUp()
self.CONF = self.useFixture(fixture_config.Config()).conf
self.CONF.set_override(
'pipeline_cfg_file',
self.path_get('etc/ceilometer/pipeline.yaml')
'cfg_file',
self.path_get('etc/ceilometer/polling.yaml'), group='polling'
)
self.CONF.set_override('heartbeat', 1.0, group='coordination')
self.CONF(args=[])
@ -262,27 +262,14 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
p_coord = self.mgr.partition_coordinator
p_coord.extract_my_subset.side_effect = fake_subset
self.mgr.tg = mock.MagicMock()
self.pipeline_cfg = {
self.polling_cfg = {
'sources': [{
'name': 'test_pipeline',
'name': 'test_polling',
'interval': 60,
'meters': ['test'],
'resources': ['test://'],
'sinks': ['test_sink']}],
'sinks': [{
'name': 'test_sink',
'transformers': [],
'publishers': ["test"]}]
'resources': ['test://']}]
}
self.setup_polling()
self.useFixture(mockpatch.PatchObject(
publisher, 'get_publisher', side_effect=self.get_publisher))
def get_publisher(self, url, namespace=''):
fake_drivers = {'test://': test_publisher.TestPublisher,
'new://': test_publisher.TestPublisher,
'rpc://': test_publisher.TestPublisher}
return fake_drivers[url](self.CONF, url)
def tearDown(self):
self.Pollster.samples = []
@ -334,7 +321,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.mgr.join_partitioning_groups()
p_coord = self.mgr.partition_coordinator
static_group_ids = [utils.hash_of_set(p['resources'])
for p in self.pipeline_cfg['sources']
for p in self.polling_cfg['sources']
if p['resources']]
expected = [mock.call(self.mgr.construct_group_id(g))
for g in ['another_group', 'global'] + static_group_ids]
@ -348,16 +335,15 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.assertIn(60, polling_tasks.keys())
per_task_resources = polling_tasks[60].resources
self.assertEqual(1, len(per_task_resources))
self.assertEqual(set(self.pipeline_cfg['sources'][0]['resources']),
set(per_task_resources['test_pipeline-test'].get({})))
self.assertEqual(set(self.polling_cfg['sources'][0]['resources']),
set(per_task_resources['test_polling-test'].get({})))
def test_setup_polling_tasks_multiple_interval(self):
self.pipeline_cfg['sources'].append({
'name': 'test_pipeline_1',
self.polling_cfg['sources'].append({
'name': 'test_polling_1',
'interval': 10,
'meters': ['test'],
'resources': ['test://'],
'sinks': ['test_sink']
})
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
@ -366,12 +352,11 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.assertIn(10, polling_tasks.keys())
def test_setup_polling_tasks_mismatch_counter(self):
self.pipeline_cfg['sources'].append({
'name': 'test_pipeline_1',
self.polling_cfg['sources'].append({
'name': 'test_polling_1',
'interval': 10,
'meters': ['test_invalid'],
'resources': ['invalid://'],
'sinks': ['test_sink']
})
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(1, len(polling_tasks))
@ -379,12 +364,11 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.assertNotIn(10, polling_tasks.keys())
def test_setup_polling_task_same_interval(self):
self.pipeline_cfg['sources'].append({
'name': 'test_pipeline_1',
self.polling_cfg['sources'].append({
'name': 'test_polling_1',
'interval': 60,
'meters': ['testanother'],
'resources': ['testanother://'],
'sinks': ['test_sink']
})
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
@ -393,11 +377,11 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.assertEqual(2, len(pollsters))
per_task_resources = polling_tasks[60].resources
self.assertEqual(2, len(per_task_resources))
key = 'test_pipeline-test'
self.assertEqual(set(self.pipeline_cfg['sources'][0]['resources']),
key = 'test_polling-test'
self.assertEqual(set(self.polling_cfg['sources'][0]['resources']),
set(per_task_resources[key].get({})))
key = 'test_pipeline_1-testanother'
self.assertEqual(set(self.pipeline_cfg['sources'][1]['resources']),
key = 'test_polling_1-testanother'
self.assertEqual(set(self.polling_cfg['sources'][1]['resources']),
set(per_task_resources[key].get({})))
def test_agent_manager_start(self):
@ -408,12 +392,38 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.addCleanup(mgr.terminate)
mgr.create_polling_task.assert_called_once_with()
def test_agent_manager_start_fallback(self):
pipeline_cfg = {
'sources': [{
'name': 'test_pipeline',
'interval': 60,
'meters': ['test'],
'resources': ['test://'],
'sinks': ['test_sink']}],
'sinks': [{
'name': 'test_sink',
'transformers': [],
'publishers': ["test"]}]
}
tmp_cfg = tempfile.NamedTemporaryFile(mode='w', delete=False)
tmp_cfg.write(yaml.safe_dump(pipeline_cfg))
tmp_cfg.close()
self.CONF.set_override('pipeline_cfg_file', tmp_cfg.name)
self.CONF.set_override('cfg_file', None, group='polling')
mgr = self.create_manager()
mgr.extensions = self.mgr.extensions
mgr.create_polling_task = mock.MagicMock()
mgr.run()
self.addCleanup(mgr.terminate)
self.addCleanup(os.unlink, tmp_cfg.name)
mgr.create_polling_task.assert_called_once_with()
def test_manager_exception_persistency(self):
self.pipeline_cfg['sources'].append({
'name': 'test_pipeline_1',
self.polling_cfg['sources'].append({
'name': 'test_polling_1',
'interval': 60,
'meters': ['testanother'],
'sinks': ['test_sink']
})
self.setup_polling()
@ -430,13 +440,13 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.DiscoveryAnother.resources = [d[::-1]
for d in discovered_resources]
if static_resources:
# just so we can test that static + pre_pipeline amalgamated
# just so we can test that static + pre_polling amalgamated
# override per_pollster
self.pipeline_cfg['sources'][0]['discovery'] = [
self.polling_cfg['sources'][0]['discovery'] = [
'testdiscoveryanother',
'testdiscoverynonexistent',
'testdiscoveryexception']
self.pipeline_cfg['sources'][0]['resources'] = static_resources
self.polling_cfg['sources'][0]['resources'] = static_resources
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
@ -456,7 +466,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self._do_test_per_pollster_discovery(['discovered_1', 'discovered_2'],
[])
def test_per_pollster_discovery_overridden_by_per_pipeline_discovery(self):
def test_per_pollster_discovery_overridden_by_per_polling_discovery(self):
# ensure static+per_source_discovery overrides per_pollster_discovery
self._do_test_per_pollster_discovery(['discovered_1', 'discovered_2'],
['static_1', 'static_2'])
@ -477,8 +487,8 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.PollsterAnother.discovery = 'testdiscovery'
self.mgr.discoveries = self.create_discoveries()
self.Discovery.resources = discovered_resources
self.pipeline_cfg['sources'][0]['meters'].append('testanother')
self.pipeline_cfg['sources'][0]['resources'] = []
self.polling_cfg['sources'][0]['meters'].append('testanother')
self.polling_cfg['sources'][0]['resources'] = []
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
@ -486,17 +496,16 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.assertEqual(discovered_resources, self.Pollster.resources)
self.assertEqual(discovered_resources, self.PollsterAnother.resources)
def _do_test_per_pipeline_discovery(self,
discovered_resources,
static_resources):
def _do_test_per_polling_discovery(self, discovered_resources,
static_resources):
self.mgr.discoveries = self.create_discoveries()
self.Discovery.resources = discovered_resources
self.DiscoveryAnother.resources = [d[::-1]
for d in discovered_resources]
self.pipeline_cfg['sources'][0]['discovery'] = [
self.polling_cfg['sources'][0]['discovery'] = [
'testdiscovery', 'testdiscoveryanother',
'testdiscoverynonexistent', 'testdiscoveryexception']
self.pipeline_cfg['sources'][0]['resources'] = static_resources
self.polling_cfg['sources'][0]['resources'] = static_resources
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
@ -509,35 +518,33 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
for x in self.Pollster.resources:
self.assertEqual(1, self.Pollster.resources.count(x))
def test_per_pipeline_discovery_discovered_only(self):
self._do_test_per_pipeline_discovery(['discovered_1', 'discovered_2'],
[])
def test_per_polling_discovery_discovered_only(self):
self._do_test_per_polling_discovery(['discovered_1', 'discovered_2'],
[])
def test_per_pipeline_discovery_static_only(self):
self._do_test_per_pipeline_discovery([],
['static_1', 'static_2'])
def test_per_polling_discovery_static_only(self):
self._do_test_per_polling_discovery([], ['static_1', 'static_2'])
def test_per_pipeline_discovery_discovered_augmented_by_static(self):
self._do_test_per_pipeline_discovery(['discovered_1', 'discovered_2'],
['static_1', 'static_2'])
def test_per_polling_discovery_discovered_augmented_by_static(self):
self._do_test_per_polling_discovery(['discovered_1', 'discovered_2'],
['static_1', 'static_2'])
def test_per_pipeline_discovery_discovered_duplicated_static(self):
self._do_test_per_pipeline_discovery(['discovered_1', 'pud'],
['dup', 'static_1', 'dup'])
def test_per_polling_discovery_discovered_duplicated_static(self):
self._do_test_per_polling_discovery(['discovered_1', 'pud'],
['dup', 'static_1', 'dup'])
def test_multiple_pipelines_different_static_resources(self):
def test_multiple_pollings_different_static_resources(self):
# assert that the individual lists of static and discovered resources
# for each pipeline with a common interval are passed to individual
# pollsters matching each pipeline
self.pipeline_cfg['sources'][0]['resources'] = ['test://']
self.pipeline_cfg['sources'][0]['discovery'] = ['testdiscovery']
self.pipeline_cfg['sources'].append({
'name': 'another_pipeline',
# for each polling with a common interval are passed to individual
# pollsters matching each polling
self.polling_cfg['sources'][0]['resources'] = ['test://']
self.polling_cfg['sources'][0]['discovery'] = ['testdiscovery']
self.polling_cfg['sources'].append({
'name': 'another_polling',
'interval': 60,
'meters': ['test'],
'resources': ['another://'],
'discovery': ['testdiscoveryanother'],
'sinks': ['test_sink_new']
})
self.mgr.discoveries = self.create_discoveries()
self.Discovery.resources = ['discovered_1', 'discovered_2']
@ -566,20 +573,12 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
sources = [{'name': 'test_source_1',
'interval': 60,
'meters': ['test'],
'discovery': ['testdiscovery'],
'sinks': ['test_sink_1']},
'discovery': ['testdiscovery']},
{'name': 'test_source_2',
'interval': 60,
'meters': ['testanother'],
'discovery': ['testdiscoveryanother'],
'sinks': ['test_sink_2']}]
sinks = [{'name': 'test_sink_1',
'transformers': [],
'publishers': ['test://']},
{'name': 'test_sink_2',
'transformers': [],
'publishers': ['test://']}]
self.pipeline_cfg = {'sources': sources, 'sinks': sinks}
'discovery': ['testdiscoveryanother']}]
self.polling_cfg = {'sources': sources}
self.mgr.discoveries = self.create_discoveries()
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
@ -593,37 +592,13 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.assertEqual(['discovered_3', 'discovered_4'],
self.PollsterAnother.resources)
def test_multiple_sinks_same_discoverer(self):
self.Discovery.resources = ['discovered_1', 'discovered_2']
sources = [{'name': 'test_source_1',
'interval': 60,
'meters': ['test'],
'discovery': ['testdiscovery'],
'sinks': ['test_sink_1', 'test_sink_2']}]
sinks = [{'name': 'test_sink_1',
'transformers': [],
'publishers': ['test://']},
{'name': 'test_sink_2',
'transformers': [],
'publishers': ['test://']}]
self.pipeline_cfg = {'sources': sources, 'sinks': sinks}
self.mgr.discoveries = self.create_discoveries()
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(1, len(polling_tasks))
self.assertIn(60, polling_tasks.keys())
self.mgr.interval_task(polling_tasks.get(60))
self.assertEqual(1, len(self.Pollster.samples))
self.assertEqual(['discovered_1', 'discovered_2'],
self.Pollster.resources)
def test_discovery_partitioning(self):
self.mgr.discoveries = self.create_discoveries()
p_coord = self.mgr.partition_coordinator
self.pipeline_cfg['sources'][0]['discovery'] = [
self.polling_cfg['sources'][0]['discovery'] = [
'testdiscovery', 'testdiscoveryanother',
'testdiscoverynonexistent', 'testdiscoveryexception']
self.pipeline_cfg['sources'][0]['resources'] = []
self.polling_cfg['sources'][0]['resources'] = []
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
@ -640,26 +615,24 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
p_coord = self.mgr.partition_coordinator
static_resources = ['static_1', 'static_2']
static_resources2 = ['static_3', 'static_4']
self.pipeline_cfg['sources'][0]['resources'] = static_resources
self.pipeline_cfg['sources'].append({
'name': 'test_pipeline2',
self.polling_cfg['sources'][0]['resources'] = static_resources
self.polling_cfg['sources'].append({
'name': 'test_polling2',
'interval': 60,
'meters': ['test', 'test2'],
'resources': static_resources2,
'sinks': ['test_sink']
})
# have one pipeline without static resources defined
self.pipeline_cfg['sources'].append({
'name': 'test_pipeline3',
# have one polling without static resources defined
self.polling_cfg['sources'].append({
'name': 'test_polling3',
'interval': 60,
'meters': ['test', 'test2'],
'resources': [],
'sinks': ['test_sink']
})
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
# Only two groups need to be created, one for each pipeline,
# Only two groups need to be created, one for each polling,
# even though counter test is used twice
expected = [mock.call(self.mgr.construct_group_id(
utils.hash_of_set(resources)),
@ -678,14 +651,14 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
polling_task.poll_and_notify()
LOG.info.assert_called_with(
'Polling pollster %(poll)s in the context of %(src)s',
{'poll': 'test', 'src': 'test_pipeline'})
{'poll': 'test', 'src': 'test_polling'})
@mock.patch('ceilometer.agent.manager.LOG')
def test_skip_polling_and_notify_with_no_resources(self, LOG):
self.pipeline_cfg['sources'][0]['resources'] = []
self.polling_cfg['sources'][0]['resources'] = []
self.setup_polling()
polling_task = list(self.mgr.setup_polling_tasks().values())[0]
pollster = list(polling_task.pollster_matches['test_pipeline'])[0]
pollster = list(polling_task.pollster_matches['test_polling'])[0]
polling_task.poll_and_notify()
LOG.info.assert_called_with(
'Skip pollster %(name)s, no %(p_context)sresources found this '
@ -693,12 +666,11 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
@mock.patch('ceilometer.agent.manager.LOG')
def test_skip_polling_polled_resources(self, LOG):
self.pipeline_cfg['sources'].append({
'name': 'test_pipeline_1',
self.polling_cfg['sources'].append({
'name': 'test_polling_1',
'interval': 60,
'meters': ['test'],
'resources': ['test://'],
'sinks': ['test_sink']
})
self.setup_polling()
polling_task = list(self.mgr.setup_polling_tasks().values())[0]

View File

@ -213,23 +213,10 @@ class BasePipelineTestCase(base.BaseTestCase):
self._unset_pipeline_cfg('name')
self._exception_create_pipelinemanager()
def test_no_interval(self):
self._unset_pipeline_cfg('interval')
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
self.assertEqual(600, pipe.get_interval())
def test_no_publishers(self):
self._unset_pipeline_cfg('publishers')
self._exception_create_pipelinemanager()
def test_invalid_resources(self):
invalid_resource = {'invalid': 1}
self._set_pipeline_cfg('resources', invalid_resource)
self._exception_create_pipelinemanager()
def test_check_counters_include_exclude_same(self):
counter_cfg = ['a', '!a']
self._set_pipeline_cfg('meters', counter_cfg)
@ -249,10 +236,6 @@ class BasePipelineTestCase(base.BaseTestCase):
publisher_cfg = ['test_invalid']
self._set_pipeline_cfg('publishers', publisher_cfg)
def test_invalid_string_interval(self):
self._set_pipeline_cfg('interval', 'string')
self._exception_create_pipelinemanager()
def test_check_transformer_invalid_transformer(self):
transformer_cfg = [
{'name': "test_invalid",
@ -261,13 +244,6 @@ class BasePipelineTestCase(base.BaseTestCase):
self._set_pipeline_cfg('transformers', transformer_cfg)
self._exception_create_pipelinemanager()
def test_get_interval(self):
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
self.assertEqual(5, pipe.get_interval())
def test_publisher_transformer_invoked(self):
pipeline_manager = pipeline.PipelineManager(
self.CONF,
@ -1196,21 +1172,6 @@ class BasePipelineTestCase(base.BaseTestCase):
(counters[1],)
)
def test_resources(self):
resources = ['test1://', 'test2://']
self._set_pipeline_cfg('resources', resources)
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
self.assertEqual(resources,
pipeline_manager.pipelines[0].resources)
def test_no_resources(self):
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
self.assertEqual(0, len(pipeline_manager.pipelines[0].resources))
def _do_test_rate_of_change_mapping(self, pipe, meters, units):
now = timeutils.utcnow()
base = 1000

View File

@ -23,9 +23,7 @@ from ceilometer.tests.unit import pipeline_base
class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
def _setup_pipeline_cfg(self):
source = {'name': 'test_source',
'interval': 5,
'meters': ['a'],
'resources': [],
'sinks': ['test_sink']}
sink = {'name': 'test_sink',
'transformers': [{'name': 'update', 'parameters': {}}],
@ -35,9 +33,7 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
def _augment_pipeline_cfg(self):
self.pipeline_cfg['sources'].append({
'name': 'second_source',
'interval': 5,
'meters': ['b'],
'resources': [],
'sinks': ['second_sink']
})
self.pipeline_cfg['sinks'].append({
@ -55,9 +51,7 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
def _break_pipeline_cfg(self):
self.pipeline_cfg['sources'].append({
'name': 'second_source',
'interval': 5,
'meters': ['b'],
'resources': [],
'sinks': ['second_sink']
})
self.pipeline_cfg['sinks'].append({
@ -75,9 +69,7 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
def _dup_pipeline_name_cfg(self):
self.pipeline_cfg['sources'].append({
'name': 'test_source',
'interval': 5,
'meters': ['b'],
'resources': [],
'sinks': ['test_sink']
})
@ -106,9 +98,7 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
def test_source_dangling_sink(self):
self.pipeline_cfg['sources'].append({
'name': 'second_source',
'interval': 5,
'meters': ['b'],
'resources': [],
'sinks': ['second_sink']
})
self._exception_create_pipelinemanager()
@ -170,9 +160,7 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
def test_multiple_sources_with_single_sink(self):
self.pipeline_cfg['sources'].append({
'name': 'second_source',
'interval': 5,
'meters': ['b'],
'resources': [],
'sinks': ['test_sink']
})
pipeline_manager = pipeline.PipelineManager(
@ -283,9 +271,7 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
def test_duplicated_source_names(self):
self.pipeline_cfg['sources'].append({
'name': 'test_source',
'interval': 5,
'meters': ['a'],
'resources': [],
'sinks': ['test_sink']
})
self.assertRaises(pipeline.PipelineException,

View File

@ -0,0 +1,102 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import tempfile
from oslo_config import fixture as fixture_config
from oslotest import base
import yaml
from ceilometer import pipeline
class PollingTestCase(base.BaseTestCase):
def cfg2file(self, data):
self.tmp_cfg.write(yaml.safe_dump(data))
self.tmp_cfg.close()
return self.tmp_cfg.name
def setUp(self):
super(PollingTestCase, self).setUp()
self.CONF = self.useFixture(fixture_config.Config()).conf
self.tmp_cfg = tempfile.NamedTemporaryFile(mode='w', delete=False)
self.poll_cfg = {'sources': [{'name': 'test_source',
'interval': 600,
'meters': ['a']}]}
def tearDown(self):
os.unlink(self.tmp_cfg.name)
super(PollingTestCase, self).tearDown()
def test_no_name(self):
del self.poll_cfg['sources'][0]['name']
self.assertRaises(pipeline.PipelineException,
pipeline.PollingManager,
self.CONF, self.cfg2file(self.poll_cfg))
def test_no_interval(self):
del self.poll_cfg['sources'][0]['interval']
self.assertRaises(pipeline.PipelineException,
pipeline.PollingManager,
self.CONF, self.cfg2file(self.poll_cfg))
def test_invalid_string_interval(self):
self.poll_cfg['sources'][0]['interval'] = 'string'
self.assertRaises(pipeline.PipelineException,
pipeline.PollingManager,
self.CONF, self.cfg2file(self.poll_cfg))
def test_get_interval(self):
poll_manager = pipeline.PollingManager(
self.CONF, self.cfg2file(self.poll_cfg))
source = poll_manager.sources[0]
self.assertEqual(600, source.get_interval())
def test_invalid_resources(self):
self.poll_cfg['sources'][0]['resources'] = {'invalid': 1}
self.assertRaises(pipeline.PipelineException,
pipeline.PollingManager,
self.CONF, self.cfg2file(self.poll_cfg))
def test_resources(self):
resources = ['test1://', 'test2://']
self.poll_cfg['sources'][0]['resources'] = resources
poll_manager = pipeline.PollingManager(
self.CONF, self.cfg2file(self.poll_cfg))
self.assertEqual(resources, poll_manager.sources[0].resources)
def test_no_resources(self):
poll_manager = pipeline.PollingManager(
self.CONF, self.cfg2file(self.poll_cfg))
self.assertEqual(0, len(poll_manager.sources[0].resources))
def test_check_meters_include_exclude_same(self):
self.poll_cfg['sources'][0]['meters'] = ['a', '!a']
self.assertRaises(pipeline.PipelineException,
pipeline.PollingManager,
self.CONF, self.cfg2file(self.poll_cfg))
def test_check_meters_include_exclude(self):
self.poll_cfg['sources'][0]['meters'] = ['a', '!b']
self.assertRaises(pipeline.PipelineException,
pipeline.PollingManager,
self.CONF, self.cfg2file(self.poll_cfg))
def test_check_meters_wildcard_included(self):
self.poll_cfg['sources'][0]['meters'] = ['a', '*']
self.assertRaises(pipeline.PipelineException,
pipeline.PollingManager,
self.CONF, self.cfg2file(self.poll_cfg))

View File

@ -301,13 +301,13 @@ function configure_ceilometer {
# with rootwrap installation done elsewhere and also clobber
# ceilometer.conf settings that have already been made.
# Anyway, explicit is better than implicit.
for conffile in policy.json api_paste.ini pipeline.yaml \
for conffile in policy.json api_paste.ini pipeline.yaml polling.yaml \
event_definitions.yaml event_pipeline.yaml; do
cp $CEILOMETER_DIR/etc/ceilometer/$conffile $CEILOMETER_CONF_DIR
done
if [ "$CEILOMETER_PIPELINE_INTERVAL" ]; then
sed -i "s/interval:.*/interval: ${CEILOMETER_PIPELINE_INTERVAL}/" $CEILOMETER_CONF_DIR/pipeline.yaml
sed -i "s/interval:.*/interval: ${CEILOMETER_PIPELINE_INTERVAL}/" $CEILOMETER_CONF_DIR/polling.yaml
fi
if [ "$CEILOMETER_EVENT_ALARM" == "True" ]; then
if ! grep -q '^ *- notifier://?topic=alarm.all$' $CEILOMETER_CONF_DIR/event_pipeline.yaml; then

View File

@ -1,20 +1,17 @@
---
sources:
- name: meter_source
interval: 600
meters:
- "*"
sinks:
- meter_sink
- name: cpu_source
interval: 600
meters:
- "cpu"
sinks:
- cpu_sink
- cpu_delta_sink
- name: disk_source
interval: 600
meters:
- "disk.read.bytes"
- "disk.read.requests"
@ -27,7 +24,6 @@ sources:
sinks:
- disk_sink
- name: network_source
interval: 600
meters:
- "network.incoming.bytes"
- "network.incoming.packets"

View File

@ -0,0 +1,6 @@
---
sources:
- name: all_pollsters
interval: 600
meters:
- "*"

View File

@ -0,0 +1,10 @@
---
upgrade:
- Pipeline processing in polling agents was removed in Liberty cycle. A new
polling specific definition file is created to handle polling functionality
and pipeline definition file is now reserved exclusively for
transformations and routing. The polling.yaml file follows the same syntax
as the pipeline.yaml but only handles polling attributes such as interval,
discovery, resources, meter matching. It is configured by setting cfg_file
under the polling section.If no polling definition file is found, it will
fallback to reuse pipeline_cfg_file.