Pollsters now send notifications without doing transforms

This makes the polling agents not use pipelines. Instead it simply sends
notifications for the notification agent to pick up and transform if
the pipeline.yaml says it should.

Inside the AgentManager and the PollingTask the data representation
is adjusted somewhat. Rather than making a single task for any given
interval, we make a single task for any name in the "sources" list.
This ought to mean (given that the interval is the same across
various sources in the default config) that we will get some I/Ox
interleaving.

At the moment all samples gathered by one pollng task are sent as an
individual notification. This is being done to minimize the apparent
surface area of this change. The expected long term change is for
single samples to be sent so as to increase granularity and
I/O interleaving.

The unit tests have been updated to reflect the new data
representation. The agent tests are fairly strongly oriented towards
testing that discovery and resource handling behave correctly. Some
additions have been made to make sure that samples traverse a fake
messaging bus as expected. Coverage of the ceilometer/agent/base has
increased from 98 to 99%. Additional functional testing should be
implemented when we have established the infrastructure for such
things.

Implements blueprint pollsters-no-transform

DocImpact

Change-Id: I25c22077e80509799713571dfd79c87fe21c8677
This commit is contained in:
Chris Dent 2015-07-06 19:17:55 +00:00
parent a2e7fce7a7
commit 5b9e5d8e7a
5 changed files with 325 additions and 346 deletions

View File

@ -26,7 +26,7 @@ import random
from oslo_config import cfg
from oslo_context import context
from oslo_log import log
import six
import oslo_messaging
from six import moves
from six.moves.urllib import parse as urlparse
from stevedore import extension
@ -34,7 +34,9 @@ from stevedore import extension
from ceilometer.agent import plugin_base
from ceilometer import coordination
from ceilometer.i18n import _, _LI
from ceilometer import pipeline as publish_pipeline
from ceilometer import messaging
from ceilometer import pipeline
from ceilometer.publisher import utils as publisher_utils
from ceilometer import service_base
from ceilometer import utils
@ -49,6 +51,8 @@ OPTS = [
]
cfg.CONF.register_opts(OPTS)
cfg.CONF.import_opt('telemetry_driver', 'ceilometer.publisher.messaging',
group='publisher_notifier')
class PollsterListForbidden(Exception):
@ -68,9 +72,9 @@ class Resources(object):
self.blacklist = []
self.last_dup = []
def setup(self, pipeline):
self._resources = pipeline.resources
self._discovery = pipeline.discovery
def setup(self, source):
self._resources = source.resources
self._discovery = source.discovery
def get(self, discovery_cache=None):
source_discovery = (self.agent_manager.discover(self._discovery,
@ -91,7 +95,7 @@ class Resources(object):
class PollingTask(object):
"""Polling task for polling samples and inject into pipeline.
"""Polling task for polling samples and notifying.
A polling task can be invoked periodically or only once.
"""
@ -103,92 +107,94 @@ class PollingTask(object):
# with a common interval
self.pollster_matches = collections.defaultdict(set)
# per-sink publisher contexts associated with each source
self.publishers = {}
# we relate the static resources and per-source discovery to
# each combination of pollster and matching source
resource_factory = lambda: Resources(agent_manager)
self.resources = collections.defaultdict(resource_factory)
def add(self, pollster, pipeline):
if pipeline.source.name not in self.publishers:
publish_context = publish_pipeline.PublishContext(
self.manager.context)
self.publishers[pipeline.source.name] = publish_context
self.publishers[pipeline.source.name].add_pipelines([pipeline])
self.pollster_matches[pipeline.source.name].add(pollster)
key = Resources.key(pipeline.source.name, pollster)
self.resources[key].setup(pipeline)
def add(self, pollster, source):
self.pollster_matches[source.name].add(pollster)
key = Resources.key(source.name, pollster)
self.resources[key].setup(source)
def poll_and_publish(self):
"""Polling sample and publish into pipeline."""
def poll_and_notify(self):
"""Polling sample and notify."""
cache = {}
discovery_cache = {}
for source_name in self.pollster_matches:
with self.publishers[source_name] as publisher:
for pollster in self.pollster_matches[source_name]:
LOG.info(_("Polling pollster %(poll)s in the context of "
"%(src)s"),
dict(poll=pollster.name, src=source_name))
key = Resources.key(source_name, pollster)
candidate_res = list(
self.resources[key].get(discovery_cache))
if not candidate_res and pollster.obj.default_discovery:
candidate_res = self.manager.discover(
[pollster.obj.default_discovery], discovery_cache)
for pollster in self.pollster_matches[source_name]:
LOG.info(_("Polling pollster %(poll)s in the context of "
"%(src)s"),
dict(poll=pollster.name, src=source_name))
key = Resources.key(source_name, pollster)
candidate_res = list(
self.resources[key].get(discovery_cache))
if not candidate_res and pollster.obj.default_discovery:
candidate_res = self.manager.discover(
[pollster.obj.default_discovery], discovery_cache)
# Remove duplicated resources and black resources. Using
# set() requires well defined __hash__ for each resource.
# Since __eq__ is defined, 'not in' is safe here.
seen = []
duplicated = []
polling_resources = []
black_res = self.resources[key].blacklist
for x in candidate_res:
if x not in seen:
seen.append(x)
if x not in black_res:
polling_resources.append(x)
else:
duplicated.append(x)
# Remove duplicated resources and black resources. Using
# set() requires well defined __hash__ for each resource.
# Since __eq__ is defined, 'not in' is safe here.
seen = []
duplicated = []
polling_resources = []
black_res = self.resources[key].blacklist
for x in candidate_res:
if x not in seen:
seen.append(x)
if x not in black_res:
polling_resources.append(x)
else:
duplicated.append(x)
# Warn duplicated resources for the 1st time
if self.resources[key].last_dup != duplicated:
self.resources[key].last_dup = duplicated
LOG.warning(_(
'Found following duplicated resoures for '
'%(name)s in context of %(source)s:%(list)s. '
'Check pipeline configuration.')
% ({'name': pollster.name,
'source': source_name,
'list': duplicated
}))
# Warn duplicated resources for the 1st time
if self.resources[key].last_dup != duplicated:
self.resources[key].last_dup = duplicated
LOG.warning(_(
'Found following duplicated resoures for '
'%(name)s in context of %(source)s:%(list)s. '
'Check pipeline configuration.')
% ({'name': pollster.name,
'source': source_name,
'list': duplicated
}))
# If no resources, skip for this pollster
if not polling_resources:
LOG.info(_("Skip polling pollster %s, no resources"
" found"), pollster.name)
continue
# If no resources, skip for this pollster
if not polling_resources:
LOG.info(_("Skip polling pollster %s, no resources"
" found"), pollster.name)
continue
try:
samples = list(pollster.obj.get_samples(
manager=self.manager,
cache=cache,
resources=polling_resources
))
publisher(samples)
except plugin_base.PollsterPermanentError as err:
LOG.error(_(
'Prevent pollster %(name)s for '
'polling source %(source)s anymore!')
% ({'name': pollster.name, 'source': source_name}))
self.resources[key].blacklist.append(err.fail_res)
except Exception as err:
LOG.warning(_(
'Continue after error from %(name)s: %(error)s')
% ({'name': pollster.name, 'error': err}),
exc_info=True)
try:
samples = pollster.obj.get_samples(
manager=self.manager,
cache=cache,
resources=polling_resources
)
sample_messages = []
for sample in samples:
sample_dict = (
publisher_utils.meter_message_from_counter(
sample, cfg.CONF.publisher.telemetry_secret
))
sample_messages.append(sample_dict)
self.manager.notifier.info(
self.manager.context.to_dict(),
'telemetry.api',
sample_messages
)
except plugin_base.PollsterPermanentError as err:
LOG.error(_(
'Prevent pollster %(name)s for '
'polling source %(source)s anymore!')
% ({'name': pollster.name, 'source': source_name}))
self.resources[key].blacklist.append(err.fail_res)
except Exception as err:
LOG.warning(_(
'Continue after error from %(name)s: %(error)s')
% ({'name': pollster.name, 'error': err}),
exc_info=True)
class AgentManager(service_base.BaseService):
@ -230,6 +236,11 @@ class AgentManager(service_base.BaseService):
self.group_prefix = ('%s-%s' % (namespace_prefix, group_prefix)
if group_prefix else namespace_prefix)
self.notifier = oslo_messaging.Notifier(
messaging.get_transport(),
driver=cfg.CONF.publisher_notifier.telemetry_driver,
publisher_id="ceilometer.api")
@staticmethod
def _extensions(category, agent_ns=None):
namespace = ('ceilometer.%s.%s' % (category, agent_ns) if agent_ns
@ -261,7 +272,7 @@ class AgentManager(service_base.BaseService):
# let each set of statically-defined resources have its own group
static_resource_groups = set([
self.construct_group_id(utils.hash_of_set(p.resources))
for p in self.pipeline_manager.pipelines
for p in self.polling_manager.sources
if p.resources
])
self.groups.update(static_resource_groups)
@ -274,14 +285,18 @@ class AgentManager(service_base.BaseService):
def setup_polling_tasks(self):
polling_tasks = {}
for pipeline in self.pipeline_manager.pipelines:
for source in self.polling_manager.sources:
polling_task = None
for pollster in self.extensions:
if pipeline.support_meter(pollster.name):
polling_task = polling_tasks.get(pipeline.get_interval())
if source.support_meter(pollster.name):
if not polling_task:
polling_task = self.create_polling_task()
polling_tasks[pipeline.get_interval()] = polling_task
polling_task.add(pollster, pipeline)
polling_task.add(pollster, source)
if polling_task:
polling_tasks[source.name] = {
'task': polling_task,
'interval': source.get_interval()
}
return polling_tasks
@ -299,7 +314,10 @@ class AgentManager(service_base.BaseService):
0, cfg.CONF.shuffle_time_before_polling_task)
pollster_timers = []
for interval, task in six.iteritems(self.setup_polling_tasks()):
data = self.setup_polling_tasks()
for name, polling_task in data.items():
interval = polling_task['interval']
task = polling_task['task']
delay_time = (interval + delay_polling_time if delay_start
else delay_polling_time)
pollster_timers.append(self.tg.add_timer(interval,
@ -312,7 +330,7 @@ class AgentManager(service_base.BaseService):
return pollster_timers
def start(self):
self.pipeline_manager = publish_pipeline.setup_pipeline()
self.polling_manager = pipeline.setup_polling()
self.partition_coordinator.start()
self.join_partitioning_groups()
@ -328,7 +346,7 @@ class AgentManager(service_base.BaseService):
@staticmethod
def interval_task(task):
task.poll_and_publish()
task.poll_and_notify()
@staticmethod
def _parse_discoverer(url):

View File

@ -311,6 +311,9 @@ class SampleSource(Source):
raise PipelineException("Discovery should be a list", cfg)
self.check_source_filtering(self.meters, 'meters')
def get_interval(self):
return self.interval
# (yjiang5) To support meters like instance:m1.tiny,
# which include variable part at the end starting with ':'.
# Hope we will not add such meters in future.
@ -704,6 +707,35 @@ class PipelineManager(object):
return PublishContext(context, self.pipelines)
class PollingManager(object):
"""Polling Manager
Polling manager sets up polling according to config file.
"""
def __init__(self, cfg):
"""Setup the polling according to config.
The configuration is the sources half of the Pipeline Config.
"""
self.sources = []
if not ('sources' in cfg and 'sinks' in cfg):
raise PipelineException("Both sources & sinks are required",
cfg)
LOG.info(_('detected decoupled pipeline config format'))
unique_names = set()
for s in cfg.get('sources', []):
name = s.get('name')
if name in unique_names:
raise PipelineException("Duplicated source names: %s" %
name, self)
else:
unique_names.add(name)
self.sources.append(SampleSource(s))
unique_names.clear()
def _setup_pipeline_manager(cfg_file, transformer_manager, p_type=SAMPLE_TYPE):
if not os.path.exists(cfg_file):
cfg_file = cfg.CONF.find_file(cfg_file)
@ -723,6 +755,21 @@ def _setup_pipeline_manager(cfg_file, transformer_manager, p_type=SAMPLE_TYPE):
), p_type)
def _setup_polling_manager(cfg_file):
if not os.path.exists(cfg_file):
cfg_file = cfg.CONF.find_file(cfg_file)
LOG.debug(_("Polling config file: %s"), cfg_file)
with open(cfg_file) as fap:
data = fap.read()
pipeline_cfg = yaml.safe_load(data)
LOG.info(_("Pipeline config: %s"), pipeline_cfg)
return PollingManager(pipeline_cfg)
def setup_event_pipeline(transformer_manager=None):
"""Setup event pipeline manager according to yaml config file."""
cfg_file = cfg.CONF.event_pipeline_cfg_file
@ -762,3 +809,9 @@ def get_pipeline_hash(p_type=SAMPLE_TYPE):
file_hash = hashlib.md5(data).hexdigest()
return file_hash
def setup_polling():
"""Setup polling manager according to yaml config file."""
cfg_file = cfg.CONF.pipeline_cfg_file
return _setup_polling_manager(cfg_file)

View File

@ -54,7 +54,12 @@ class BaseService(os_service.Service):
LOG.info(_LI("Detected change in pipeline configuration."))
try:
self.pipeline_manager = pipeline.setup_pipeline()
# Pipeline in the notification agent.
if hasattr(self, 'pipeline_manager'):
self.pipeline_manager = pipeline.setup_pipeline()
# Polling in the polling agent.
elif hasattr(self, 'polling_manager'):
self.polling_manager = pipeline.setup_polling()
LOG.debug(_("Pipeline has been refreshed. "
"old hash: %(old)s, new hash: %(new)s") %
({'old': self.pipeline_hash,

View File

@ -24,18 +24,12 @@
import abc
import copy
import datetime
import shutil
import eventlet
import mock
from oslo_config import fixture as fixture_config
from oslo_service import service as os_service
from oslo_utils import fileutils
from oslo_utils import timeutils
from oslotest import mockpatch
import six
from stevedore import extension
import yaml
from ceilometer.agent import plugin_base
from ceilometer import pipeline
@ -177,13 +171,8 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
class DiscoveryException(TestDiscoveryException):
params = []
def setup_pipeline(self):
self.transformer_manager = extension.ExtensionManager(
'ceilometer.transformer',
)
self.mgr.pipeline_manager = pipeline.PipelineManager(
self.pipeline_cfg,
self.transformer_manager)
def setup_polling(self):
self.mgr.polling_manager = pipeline.PollingManager(self.pipeline_cfg)
def create_extension_list(self):
return [extension.Extension('test',
@ -228,7 +217,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
def create_manager(self):
"""Return subclass specific manager."""
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
@mock.patch('ceilometer.pipeline.setup_polling', mock.MagicMock())
def setUp(self):
super(BaseAgentManagerTestCase, self).setUp()
self.mgr = self.create_manager()
@ -250,7 +239,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
'transformers': [],
'publishers': ["test"]}]
}
self.setup_pipeline()
self.setup_polling()
self.CONF = self.useFixture(fixture_config.Config()).conf
self.CONF.set_override(
'pipeline_cfg_file',
@ -286,21 +275,23 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.DiscoveryAnother.resources = []
super(BaseAgentManagerTestCase, self).tearDown()
@mock.patch('ceilometer.pipeline.setup_pipeline')
def test_start(self, setup_pipeline):
@mock.patch('ceilometer.pipeline.setup_polling')
def test_start(self, setup_polling):
self.mgr.join_partitioning_groups = mock.MagicMock()
self.mgr.setup_polling_tasks = mock.MagicMock()
self.CONF.set_override('heartbeat', 1.0, group='coordination')
self.mgr.start()
setup_pipeline.assert_called_once_with()
setup_polling.assert_called_once_with()
self.mgr.partition_coordinator.start.assert_called_once_with()
self.mgr.join_partitioning_groups.assert_called_once_with()
self.mgr.setup_polling_tasks.assert_called_once_with()
timer_call = mock.call(1.0, self.mgr.partition_coordinator.heartbeat)
self.assertEqual([timer_call], self.mgr.tg.add_timer.call_args_list)
self.mgr.stop()
self.mgr.partition_coordinator.stop.assert_called_once_with()
@mock.patch('ceilometer.pipeline.setup_pipeline')
def test_start_with_pipeline_poller(self, setup_pipeline):
@mock.patch('ceilometer.pipeline.setup_polling')
def test_start_with_pipeline_poller(self, setup_polling):
self.mgr.join_partitioning_groups = mock.MagicMock()
self.mgr.setup_polling_tasks = mock.MagicMock()
@ -308,7 +299,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.CONF.set_override('refresh_pipeline_cfg', True)
self.CONF.set_override('pipeline_polling_interval', 5)
self.mgr.start()
setup_pipeline.assert_called_once_with()
setup_polling.assert_called_once_with()
self.mgr.partition_coordinator.start.assert_called_once_with()
self.mgr.join_partitioning_groups.assert_called_once_with()
self.mgr.setup_polling_tasks.assert_called_once_with()
@ -317,84 +308,6 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.assertEqual([timer_call, pipeline_poller_call],
self.mgr.tg.add_timer.call_args_list)
def test_start_with_reloadable_pipeline(self):
def setup_pipeline_file(pipeline):
if six.PY3:
pipeline = pipeline.encode('utf-8')
pipeline_cfg_file = fileutils.write_to_tempfile(content=pipeline,
prefix="pipeline",
suffix="yaml")
return pipeline_cfg_file
self.CONF.set_override('heartbeat', 1.0, group='coordination')
self.CONF.set_override('refresh_pipeline_cfg', True)
self.CONF.set_override('pipeline_polling_interval', 2)
pipeline = yaml.dump({
'sources': [{
'name': 'test_pipeline',
'interval': 1,
'meters': ['test'],
'resources': ['test://'] if self.source_resources else [],
'sinks': ['test_sink']}],
'sinks': [{
'name': 'test_sink',
'transformers': [],
'publishers': ["test"]}]
})
pipeline_cfg_file = setup_pipeline_file(pipeline)
self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file)
self.mgr.tg = os_service.threadgroup.ThreadGroup(1000)
self.mgr.start()
pub = self.mgr.pipeline_manager.pipelines[0].publishers[0]
self.expected_samples = 1
start = timeutils.utcnow()
while timeutils.delta_seconds(start, timeutils.utcnow()) < 600:
if len(pub.samples) >= self.expected_samples:
break
eventlet.sleep(0)
del pub.samples[0].resource_metadata['resources']
self.assertEqual(self.Pollster.test_data, pub.samples[0])
# Flush publisher samples to test reloading
pub.samples = []
# Modify the collection targets
pipeline = yaml.dump({
'sources': [{
'name': 'test_pipeline',
'interval': 1,
'meters': ['testanother'],
'resources': ['test://'] if self.source_resources else [],
'sinks': ['test_sink']}],
'sinks': [{
'name': 'test_sink',
'transformers': [],
'publishers': ["test"]}]
})
updated_pipeline_cfg_file = setup_pipeline_file(pipeline)
# Move/re-name the updated pipeline file to the original pipeline
# file path as recorded in oslo config
shutil.move(updated_pipeline_cfg_file, pipeline_cfg_file)
# Random sleep to let the pipeline poller complete the reloading
eventlet.sleep(3)
pub = self.mgr.pipeline_manager.pipelines[0].publishers[0]
self.expected_samples = 1
start = timeutils.utcnow()
while timeutils.delta_seconds(start, timeutils.utcnow()) < 600:
if len(pub.samples) >= self.expected_samples:
break
eventlet.sleep(0)
del pub.samples[0].resource_metadata['resources']
self.assertEqual(self.PollsterAnother.test_data, pub.samples[0])
def test_join_partitioning_groups(self):
self.mgr.discovery_manager = self.create_discovery_manager()
self.mgr.join_partitioning_groups()
@ -411,16 +324,11 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
def test_setup_polling_tasks(self):
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(1, len(polling_tasks))
self.assertTrue(60 in polling_tasks.keys())
per_task_resources = polling_tasks[60].resources
self.assertTrue('test_pipeline' in polling_tasks.keys())
per_task_resources = polling_tasks['test_pipeline']['task'].resources
self.assertEqual(1, len(per_task_resources))
self.assertEqual(set(self.pipeline_cfg['sources'][0]['resources']),
set(per_task_resources['test_pipeline-test'].get({})))
task = list(polling_tasks.values())[0]
self.mgr.interval_task(task)
pub = self.mgr.pipeline_manager.pipelines[0].publishers[0]
del pub.samples[0].resource_metadata['resources']
self.assertEqual(self.Pollster.test_data, pub.samples[0])
def test_setup_polling_tasks_multiple_interval(self):
self.pipeline_cfg['sources'].append({
@ -430,11 +338,11 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
'resources': ['test://'] if self.source_resources else [],
'sinks': ['test_sink']
})
self.setup_pipeline()
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(2, len(polling_tasks))
self.assertTrue(60 in polling_tasks.keys())
self.assertTrue(10 in polling_tasks.keys())
self.assertTrue('test_pipeline' in polling_tasks.keys())
self.assertTrue('test_pipeline_1' in polling_tasks.keys())
def test_setup_polling_tasks_mismatch_counter(self):
self.pipeline_cfg['sources'].append({
@ -446,58 +354,8 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
})
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(1, len(polling_tasks))
self.assertTrue(60 in polling_tasks.keys())
def test_setup_polling_task_same_interval(self):
self.pipeline_cfg['sources'].append({
'name': 'test_pipeline_1',
'interval': 60,
'meters': ['testanother'],
'resources': ['testanother://'] if self.source_resources else [],
'sinks': ['test_sink']
})
self.setup_pipeline()
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(1, len(polling_tasks))
pollsters = polling_tasks.get(60).pollster_matches
self.assertEqual(2, len(pollsters))
per_task_resources = polling_tasks[60].resources
self.assertEqual(2, len(per_task_resources))
key = 'test_pipeline-test'
self.assertEqual(set(self.pipeline_cfg['sources'][0]['resources']),
set(per_task_resources[key].get({})))
key = 'test_pipeline_1-testanother'
self.assertEqual(set(self.pipeline_cfg['sources'][1]['resources']),
set(per_task_resources[key].get({})))
def test_interval_exception_isolation(self):
self.pipeline_cfg = {
'sources': [{
'name': 'test_pipeline_1',
'interval': 10,
'meters': ['testexceptionanother'],
'resources': ['test://'] if self.source_resources else [],
'sinks': ['test_sink']},
{'name': 'test_pipeline_2',
'interval': 10,
'meters': ['testexception'],
'resources': ['test://'] if self.source_resources else [],
'sinks': ['test_sink']}],
'sinks': [{
'name': 'test_sink',
'transformers': [],
'publishers': ["test"]}]
}
self.mgr.pipeline_manager = pipeline.PipelineManager(
self.pipeline_cfg,
self.transformer_manager)
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(1, len(polling_tasks.keys()))
polling_tasks.get(10)
self.mgr.interval_task(polling_tasks.get(10))
pub = self.mgr.pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(0, len(pub.samples))
self.assertTrue('test_pipeline' in polling_tasks.keys())
self.assertFalse('test_pipeline_1' in polling_tasks.keys())
def test_agent_manager_start(self):
mgr = self.create_manager()
@ -514,7 +372,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
'meters': ['testanother'],
'sinks': ['test_sink']
})
self.setup_pipeline()
self.setup_polling()
def _verify_discovery_params(self, expected):
self.assertEqual(expected, self.Discovery.params)
@ -536,9 +394,9 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
'testdiscoverynonexistent',
'testdiscoveryexception']
self.pipeline_cfg['sources'][0]['resources'] = static_resources
self.setup_pipeline()
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
self.mgr.interval_task(polling_tasks['test_pipeline']['task'])
if static_resources:
self.assertEqual(set(static_resources +
self.DiscoveryAnother.resources),
@ -578,9 +436,9 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.Discovery.resources = discovered_resources
self.pipeline_cfg['sources'][0]['meters'].append('testanother')
self.pipeline_cfg['sources'][0]['resources'] = []
self.setup_pipeline()
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
self.mgr.interval_task(polling_tasks['test_pipeline']['task'])
self.assertEqual(1, len(self.Discovery.params))
self.assertEqual(discovered_resources, self.Pollster.resources)
self.assertEqual(discovered_resources, self.PollsterAnother.resources)
@ -596,9 +454,9 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
'testdiscovery', 'testdiscoveryanother',
'testdiscoverynonexistent', 'testdiscoveryexception']
self.pipeline_cfg['sources'][0]['resources'] = static_resources
self.setup_pipeline()
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
self.mgr.interval_task(polling_tasks['test_pipeline']['task'])
discovery = self.Discovery.resources + self.DiscoveryAnother.resources
# compare resource lists modulo ordering
self.assertEqual(set(static_resources + discovery),
@ -638,19 +496,16 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
'discovery': ['testdiscoveryanother'],
'sinks': ['test_sink_new']
})
self.pipeline_cfg['sinks'].append({
'name': "test_sink_new",
'transformers': [],
'publishers': ["new"],
})
self.mgr.discovery_manager = self.create_discovery_manager()
self.Discovery.resources = ['discovered_1', 'discovered_2']
self.DiscoveryAnother.resources = ['discovered_3', 'discovered_4']
self.setup_pipeline()
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(1, len(polling_tasks))
self.assertTrue(60 in polling_tasks.keys())
self.mgr.interval_task(polling_tasks.get(60))
self.assertEqual(2, len(polling_tasks))
self.assertTrue('another_pipeline' in polling_tasks.keys())
self.assertTrue('test_pipeline' in polling_tasks.keys())
self.mgr.interval_task(polling_tasks['another_pipeline']['task'])
self.mgr.interval_task(polling_tasks['test_pipeline']['task'])
self.assertEqual([None], self.Discovery.params)
self.assertEqual([None], self.DiscoveryAnother.params)
self.assertEqual(2, len(self.Pollster.samples))
@ -663,23 +518,6 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.assertEqual(test_resources, samples[1][1])
else:
self.fail('unexpected sample resources %s' % samples)
all_resources = set(test_resources)
all_resources.update(another_resources)
expected_pipelines = {'test://': 'test_pipeline:test_sink',
'another://': 'another_pipeline:test_sink_new'}
sunk_resources = []
for pipe_line in self.mgr.pipeline_manager.pipelines:
self.assertEqual(1, len(pipe_line.publishers[0].samples))
published = pipe_line.publishers[0].samples[0]
published_resources = published.resource_metadata['resources']
self.assertEqual(3, len(published_resources))
self.assertTrue(published_resources[0] in expected_pipelines)
self.assertEqual(expected_pipelines[published_resources[0]],
pipe_line.name)
for published_resource in published_resources:
self.assertTrue(published_resource in all_resources)
sunk_resources.extend(published_resources)
self.assertEqual(all_resources, set(sunk_resources))
def test_multiple_sources_different_discoverers(self):
self.Discovery.resources = ['discovered_1', 'discovered_2']
@ -702,11 +540,13 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
'publishers': ['test://']}]
self.pipeline_cfg = {'sources': sources, 'sinks': sinks}
self.mgr.discovery_manager = self.create_discovery_manager()
self.setup_pipeline()
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(1, len(polling_tasks))
self.assertTrue(60 in polling_tasks.keys())
self.mgr.interval_task(polling_tasks.get(60))
self.assertEqual(2, len(polling_tasks))
self.assertTrue('test_source_1' in polling_tasks.keys())
self.assertTrue('test_source_2' in polling_tasks.keys())
self.mgr.interval_task(polling_tasks['test_source_1']['task'])
self.mgr.interval_task(polling_tasks['test_source_2']['task'])
self.assertEqual(1, len(self.Pollster.samples))
self.assertEqual(['discovered_1', 'discovered_2'],
self.Pollster.resources)
@ -729,11 +569,11 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
'publishers': ['test://']}]
self.pipeline_cfg = {'sources': sources, 'sinks': sinks}
self.mgr.discovery_manager = self.create_discovery_manager()
self.setup_pipeline()
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(1, len(polling_tasks))
self.assertTrue(60 in polling_tasks.keys())
self.mgr.interval_task(polling_tasks.get(60))
self.assertTrue('test_source_1' in polling_tasks.keys())
self.mgr.interval_task(polling_tasks['test_source_1']['task'])
self.assertEqual(1, len(self.Pollster.samples))
self.assertEqual(['discovered_1', 'discovered_2'],
self.Pollster.resources)
@ -745,9 +585,9 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
'testdiscovery', 'testdiscoveryanother',
'testdiscoverynonexistent', 'testdiscoveryexception']
self.pipeline_cfg['sources'][0]['resources'] = []
self.setup_pipeline()
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
self.mgr.interval_task(polling_tasks['test_pipeline']['task'])
expected = [mock.call(self.mgr.construct_group_id(d.obj.group_id),
d.obj.resources)
for d in self.mgr.discovery_manager
@ -777,9 +617,10 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
'resources': [],
'sinks': ['test_sink']
})
self.setup_pipeline()
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
for meter_name in polling_tasks:
self.mgr.interval_task(polling_tasks[meter_name]['task'])
# Only two groups need to be created, one for each pipeline,
# even though counter test is used twice
expected = [mock.call(self.mgr.construct_group_id(
@ -791,36 +632,3 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
len(p_coord.extract_my_subset.call_args_list))
for c in expected:
self.assertIn(c, p_coord.extract_my_subset.call_args_list)
def test_arithmetic_transformer(self):
self.pipeline_cfg['sources'][0]['meters'] = ['test', 'testanother']
self.pipeline_cfg['sinks'][0]['transformers'] = [
{'name': 'arithmetic',
'parameters': {
'target': {'name': 'test_sum',
'unit': default_test_data.unit,
'type': default_test_data.type,
'expr': '$(test) * 10 + $(testanother)'
}
}}
]
self.setup_pipeline()
self.mgr.setup_polling_tasks()[60].poll_and_publish()
samples = self.mgr.pipeline_manager.pipelines[0].publishers[0].samples
self.assertEqual(1, len(samples))
self.assertEqual('test_sum', samples[0].name)
self.assertEqual(11, samples[0].volume)
@mock.patch('ceilometer.agent.base.LOG')
@mock.patch('ceilometer.tests.agent.agentbase.TestPollster.get_samples')
def test_skip_polling_and_publish_with_no_resources(
self, get_samples, LOG):
self.pipeline_cfg['sources'][0]['resources'] = []
self.setup_pipeline()
polling_task = list(self.mgr.setup_polling_tasks().values())[0]
pollster = list(polling_task.pollster_matches['test_pipeline'])[0]
polling_task.poll_and_publish()
LOG.info.assert_called_with(
'Skip polling pollster %s, no resources found', pollster.name)
self.assertEqual(0, get_samples._mock_call_count)

View File

@ -15,10 +15,18 @@
"""Tests for ceilometer/central/manager.py
"""
import shutil
import eventlet
import mock
from oslo_service import service as os_service
from oslo_utils import fileutils
from oslo_utils import timeutils
from oslotest import base
from oslotest import mockpatch
import six
from stevedore import extension
import yaml
from ceilometer.agent import base as agent_base
from ceilometer.agent import manager
@ -33,7 +41,7 @@ class PollingException(Exception):
class TestManager(base.BaseTestCase):
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
@mock.patch('ceilometer.pipeline.setup_polling', mock.MagicMock())
def test_load_plugins(self):
mgr = manager.AgentManager()
self.assertIsNotNone(list(mgr.extensions))
@ -176,7 +184,17 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
def create_manager():
return manager.AgentManager()
def fake_notifier_sample(self, ctxt, event_type, payload):
for m in payload:
del m['message_signature']
self.notified_samples.append(m)
def setUp(self):
self.notified_samples = []
notifier = mock.Mock()
notifier.info.side_effect = self.fake_notifier_sample
self.useFixture(mockpatch.Patch('oslo_messaging.Notifier',
return_value=notifier))
self.source_resources = True
super(TestRunTasks, self).setUp()
self.useFixture(mockpatch.Patch(
@ -204,8 +222,7 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
def test_get_sample_resources(self):
polling_tasks = self.mgr.setup_polling_tasks()
task = list(polling_tasks.values())[0]
self.mgr.interval_task(task)
self.mgr.interval_task(polling_tasks['test_pipeline']['task'])
self.assertTrue(self.Pollster.resources)
def test_when_keystone_fail(self):
@ -225,18 +242,12 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
'transformers': [],
'publishers': ["test"]}]
}
self.mgr.pipeline_manager = pipeline.PipelineManager(
self.pipeline_cfg,
self.transformer_manager)
self.mgr.polling_manager = pipeline.PollingManager(self.pipeline_cfg)
polling_tasks = self.mgr.setup_polling_tasks()
task = list(polling_tasks.values())[0]
task = polling_tasks['test_keystone']['task']
self.mgr.interval_task(task)
self.assertFalse(self.PollsterKeystone.samples)
def test_interval_exception_isolation(self):
super(TestRunTasks, self).test_interval_exception_isolation()
self.assertEqual(1, len(self.PollsterException.samples))
self.assertEqual(1, len(self.PollsterExceptionAnother.samples))
self.assertFalse(self.notified_samples)
@mock.patch('ceilometer.agent.base.LOG')
def test_polling_exception(self, LOG):
@ -253,18 +264,102 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
'transformers': [],
'publishers': ["test"]}]
}
self.mgr.pipeline_manager = pipeline.PipelineManager(
self.pipeline_cfg,
self.transformer_manager)
polling_task = list(self.mgr.setup_polling_tasks().values())[0]
self.mgr.polling_manager = pipeline.PollingManager(self.pipeline_cfg)
polling_task = self.mgr.setup_polling_tasks()[source_name]['task']
pollster = list(polling_task.pollster_matches[source_name])[0]
# 2 samples after 4 pollings, as pollster got disabled unpon exception
# 2 samples after 4 pollings, as pollster got disabled upon exception
for x in range(0, 4):
self.mgr.interval_task(polling_task)
pub = self.mgr.pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(2, len(pub.samples))
samples = self.notified_samples
self.assertEqual(2, len(samples))
LOG.error.assert_called_once_with((
'Prevent pollster %(name)s for '
'polling source %(source)s anymore!')
% ({'name': pollster.name, 'source': source_name}))
def test_start_with_reloadable_pipeline(self):
def setup_pipeline_file(pipeline):
if six.PY3:
pipeline = pipeline.encode('utf-8')
pipeline_cfg_file = fileutils.write_to_tempfile(content=pipeline,
prefix="pipeline",
suffix="yaml")
return pipeline_cfg_file
self.CONF.set_override('heartbeat', 1.0, group='coordination')
self.CONF.set_override('refresh_pipeline_cfg', True)
self.CONF.set_override('pipeline_polling_interval', 2)
pipeline = yaml.dump({
'sources': [{
'name': 'test_pipeline',
'interval': 1,
'meters': ['test'],
'resources': ['test://'] if self.source_resources else [],
'sinks': ['test_sink']}],
'sinks': [{
'name': 'test_sink',
'transformers': [],
'publishers': ["test"]}]
})
pipeline_cfg_file = setup_pipeline_file(pipeline)
self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file)
self.mgr.tg = os_service.threadgroup.ThreadGroup(1000)
self.mgr.start()
expected_samples = 1
start = timeutils.utcnow()
while timeutils.delta_seconds(start, timeutils.utcnow()) < 600:
if len(self.notified_samples) >= expected_samples:
break
eventlet.sleep(0)
# we only got the old name of meters
for sample in self.notified_samples:
self.assertEqual('test', sample['counter_name'])
self.assertEqual(1, sample['counter_volume'])
self.assertEqual('test_run_tasks', sample['resource_id'])
# Modify the collection targets
pipeline = yaml.dump({
'sources': [{
'name': 'test_pipeline',
'interval': 1,
'meters': ['testanother'],
'resources': ['test://'] if self.source_resources else [],
'sinks': ['test_sink']}],
'sinks': [{
'name': 'test_sink',
'transformers': [],
'publishers': ["test"]}]
})
updated_pipeline_cfg_file = setup_pipeline_file(pipeline)
# Move/re-name the updated pipeline file to the original pipeline
# file path as recorded in oslo config
shutil.move(updated_pipeline_cfg_file, pipeline_cfg_file)
# Random sleep to let the pipeline poller complete the reloading
eventlet.sleep(3)
# Flush notified samples to test only new, nothing latent on
# fake message bus.
self.notified_samples = []
expected_samples = 1
start = timeutils.utcnow()
while timeutils.delta_seconds(start, timeutils.utcnow()) < 600:
if len(self.notified_samples) >= expected_samples:
break
eventlet.sleep(0)
# we only got the new name of meters
for sample in self.notified_samples:
self.assertEqual('testanother', sample['counter_name'])
self.assertEqual(1, sample['counter_volume'])
self.assertEqual('test_run_tasks', sample['resource_id'])