drop base polling test separation

we only have one polling agent and it's difficult to make edits
having half the code in one place and the other half in another

Change-Id: I8b1a4e840e32e1a4052351569aec12f365d39710
This commit is contained in:
gord chung 2017-12-11 17:12:37 +00:00
parent 46c84ef7f7
commit 98204af682
2 changed files with 543 additions and 589 deletions

View File

@ -1,558 +0,0 @@
#
# Copyright 2012 New Dream Network, LLC (DreamHost)
# Copyright 2013 Intel corp.
# Copyright 2013 eNovance
# Copyright 2014 Red Hat, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import copy
import datetime
import mock
import six
from stevedore import extension
from ceilometer.polling import manager as poll_manager
from ceilometer.polling import plugin_base
from ceilometer import sample
from ceilometer import service
from ceilometer.tests import base
def default_test_data(name='test'):
return sample.Sample(
name=name,
type=sample.TYPE_CUMULATIVE,
unit='',
volume=1,
user_id='test',
project_id='test',
resource_id='test_run_tasks',
timestamp=datetime.datetime.utcnow().isoformat(),
resource_metadata={'name': 'Pollster'})
class TestPollster(plugin_base.PollsterBase):
test_data = default_test_data()
discovery = None
@property
def default_discovery(self):
return self.discovery
def get_samples(self, manager, cache, resources):
resources = resources or []
self.samples.append((manager, resources))
self.resources.extend(resources)
c = copy.deepcopy(self.test_data)
c.resource_metadata['resources'] = resources
return [c]
class BatchTestPollster(TestPollster):
test_data = default_test_data()
discovery = None
@property
def default_discovery(self):
return self.discovery
def get_samples(self, manager, cache, resources):
resources = resources or []
self.samples.append((manager, resources))
self.resources.extend(resources)
for resource in resources:
c = copy.deepcopy(self.test_data)
c.timestamp = datetime.datetime.utcnow().isoformat()
c.resource_id = resource
c.resource_metadata['resource'] = resource
yield c
class TestDiscovery(plugin_base.DiscoveryBase):
def discover(self, manager, param=None):
self.params.append(param)
return self.resources
class TestDiscoveryException(plugin_base.DiscoveryBase):
def discover(self, manager, param=None):
self.params.append(param)
raise Exception()
@six.add_metaclass(abc.ABCMeta)
class BaseAgentManagerTestCase(base.BaseTestCase):
class Pollster(TestPollster):
samples = []
resources = []
test_data = default_test_data()
class BatchPollster(BatchTestPollster):
samples = []
resources = []
test_data = default_test_data()
class PollsterAnother(TestPollster):
samples = []
resources = []
test_data = default_test_data('testanother')
class Discovery(TestDiscovery):
params = []
resources = []
class DiscoveryAnother(TestDiscovery):
params = []
resources = []
@property
def group_id(self):
return 'another_group'
class DiscoveryException(TestDiscoveryException):
params = []
def setup_polling(self, poll_cfg=None):
name = self.cfg2file(poll_cfg or self.polling_cfg)
self.CONF.set_override('cfg_file', name, group='polling')
self.mgr.polling_manager = poll_manager.PollingManager(self.CONF)
def create_extension_list(self):
return [extension.Extension('test',
None,
None,
self.Pollster(self.CONF), ),
extension.Extension('testbatch',
None,
None,
self.BatchPollster(self.CONF), ),
extension.Extension('testanother',
None,
None,
self.PollsterAnother(self.CONF), ),
]
def create_discoveries(self):
return extension.ExtensionManager.make_test_instance(
[
extension.Extension(
'testdiscovery',
None,
None,
self.Discovery(self.CONF), ),
extension.Extension(
'testdiscoveryanother',
None,
None,
self.DiscoveryAnother(self.CONF), ),
extension.Extension(
'testdiscoveryexception',
None,
None,
self.DiscoveryException(self.CONF), ),
],
)
@abc.abstractmethod
def create_manager(self):
"""Return subclass specific manager."""
def setUp(self):
super(BaseAgentManagerTestCase, self).setUp()
self.CONF = service.prepare_service([], [])
self.CONF.set_override("backend_url", "zake://", "coordination")
self.CONF.set_override(
'cfg_file',
self.path_get('etc/ceilometer/polling_all.yaml'), group='polling'
)
self.mgr = self.create_manager()
self.mgr.extensions = self.create_extension_list()
self.hashring = mock.MagicMock()
self.hashring.belongs_to_self = mock.MagicMock()
self.hashring.belongs_to_self.return_value = True
self.mgr.hashrings = mock.MagicMock()
self.mgr.hashrings.__getitem__.return_value = self.hashring
self.polling_cfg = {
'sources': [{
'name': 'test_polling',
'interval': 60,
'meters': ['test'],
'resources': ['test://']}]
}
self.setup_polling()
def tearDown(self):
self.Pollster.samples = []
self.Pollster.discovery = []
self.PollsterAnother.samples = []
self.PollsterAnother.discovery = []
self.Pollster.resources = []
self.PollsterAnother.resources = []
self.Discovery.params = []
self.DiscoveryAnother.params = []
self.DiscoveryException.params = []
self.Discovery.resources = []
self.DiscoveryAnother.resources = []
super(BaseAgentManagerTestCase, self).tearDown()
@mock.patch('ceilometer.polling.manager.PollingManager')
def test_start(self, manager):
self.mgr.setup_polling_tasks = mock.MagicMock()
self.mgr.run()
manager.assert_called_once_with(self.CONF)
self.mgr.setup_polling_tasks.assert_called_once_with()
self.mgr.terminate()
def test_setup_polling_tasks(self):
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(1, len(polling_tasks))
self.assertIn(60, polling_tasks.keys())
per_task_resources = polling_tasks[60].resources
self.assertEqual(1, len(per_task_resources))
self.assertEqual(set(self.polling_cfg['sources'][0]['resources']),
set(per_task_resources['test_polling-test'].get({})))
def test_setup_polling_tasks_multiple_interval(self):
self.polling_cfg['sources'].append({
'name': 'test_polling_1',
'interval': 10,
'meters': ['test'],
'resources': ['test://'],
})
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(2, len(polling_tasks))
self.assertIn(60, polling_tasks.keys())
self.assertIn(10, polling_tasks.keys())
def test_setup_polling_tasks_mismatch_counter(self):
self.polling_cfg['sources'].append({
'name': 'test_polling_1',
'interval': 10,
'meters': ['test_invalid'],
'resources': ['invalid://'],
})
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(1, len(polling_tasks))
self.assertIn(60, polling_tasks.keys())
self.assertNotIn(10, polling_tasks.keys())
def test_setup_polling_task_same_interval(self):
self.polling_cfg['sources'].append({
'name': 'test_polling_1',
'interval': 60,
'meters': ['testanother'],
'resources': ['testanother://'],
})
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(1, len(polling_tasks))
pollsters = polling_tasks.get(60).pollster_matches
self.assertEqual(2, len(pollsters))
per_task_resources = polling_tasks[60].resources
self.assertEqual(2, len(per_task_resources))
key = 'test_polling-test'
self.assertEqual(set(self.polling_cfg['sources'][0]['resources']),
set(per_task_resources[key].get({})))
key = 'test_polling_1-testanother'
self.assertEqual(set(self.polling_cfg['sources'][1]['resources']),
set(per_task_resources[key].get({})))
def test_agent_manager_start(self):
mgr = self.create_manager()
mgr.extensions = self.mgr.extensions
mgr.create_polling_task = mock.MagicMock()
mgr.run()
self.addCleanup(mgr.terminate)
mgr.create_polling_task.assert_called_once_with()
def _verify_discovery_params(self, expected):
self.assertEqual(expected, self.Discovery.params)
self.assertEqual(expected, self.DiscoveryAnother.params)
self.assertEqual(expected, self.DiscoveryException.params)
def _do_test_per_pollster_discovery(self, discovered_resources,
static_resources):
self.Pollster.discovery = 'testdiscovery'
self.mgr.discoveries = self.create_discoveries()
self.Discovery.resources = discovered_resources
self.DiscoveryAnother.resources = [d[::-1]
for d in discovered_resources]
if static_resources:
# just so we can test that static + pre_polling amalgamated
# override per_pollster
self.polling_cfg['sources'][0]['discovery'] = [
'testdiscoveryanother',
'testdiscoverynonexistent',
'testdiscoveryexception']
self.polling_cfg['sources'][0]['resources'] = static_resources
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
if static_resources:
self.assertEqual(set(static_resources +
self.DiscoveryAnother.resources),
set(self.Pollster.resources))
else:
self.assertEqual(set(self.Discovery.resources),
set(self.Pollster.resources))
# Make sure no duplicated resource from discovery
for x in self.Pollster.resources:
self.assertEqual(1, self.Pollster.resources.count(x))
def test_per_pollster_discovery(self):
self._do_test_per_pollster_discovery(['discovered_1', 'discovered_2'],
[])
def test_per_pollster_discovery_overridden_by_per_polling_discovery(self):
# ensure static+per_source_discovery overrides per_pollster_discovery
self._do_test_per_pollster_discovery(['discovered_1', 'discovered_2'],
['static_1', 'static_2'])
def test_per_pollster_discovery_duplicated(self):
self._do_test_per_pollster_discovery(['dup', 'discovered_1', 'dup'],
[])
def test_per_pollster_discovery_overridden_by_duplicated_static(self):
self._do_test_per_pollster_discovery(['discovered_1', 'discovered_2'],
['static_1', 'dup', 'dup'])
def test_per_pollster_discovery_caching(self):
# ensure single discovery associated with multiple pollsters
# only called once per polling cycle
discovered_resources = ['discovered_1', 'discovered_2']
self.Pollster.discovery = 'testdiscovery'
self.PollsterAnother.discovery = 'testdiscovery'
self.mgr.discoveries = self.create_discoveries()
self.Discovery.resources = discovered_resources
self.polling_cfg['sources'][0]['meters'].append('testanother')
self.polling_cfg['sources'][0]['resources'] = []
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
self.assertEqual(1, len(self.Discovery.params))
self.assertEqual(discovered_resources, self.Pollster.resources)
self.assertEqual(discovered_resources, self.PollsterAnother.resources)
def _do_test_per_polling_discovery(self, discovered_resources,
static_resources):
self.mgr.discoveries = self.create_discoveries()
self.Discovery.resources = discovered_resources
self.DiscoveryAnother.resources = [d[::-1]
for d in discovered_resources]
self.polling_cfg['sources'][0]['discovery'] = [
'testdiscovery', 'testdiscoveryanother',
'testdiscoverynonexistent', 'testdiscoveryexception']
self.polling_cfg['sources'][0]['resources'] = static_resources
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
discovery = self.Discovery.resources + self.DiscoveryAnother.resources
# compare resource lists modulo ordering
self.assertEqual(set(static_resources + discovery),
set(self.Pollster.resources))
# Make sure no duplicated resource from discovery
for x in self.Pollster.resources:
self.assertEqual(1, self.Pollster.resources.count(x))
def test_per_polling_discovery_discovered_only(self):
self._do_test_per_polling_discovery(['discovered_1', 'discovered_2'],
[])
def test_per_polling_discovery_static_only(self):
self._do_test_per_polling_discovery([], ['static_1', 'static_2'])
def test_per_polling_discovery_discovered_augmented_by_static(self):
self._do_test_per_polling_discovery(['discovered_1', 'discovered_2'],
['static_1', 'static_2'])
def test_per_polling_discovery_discovered_duplicated_static(self):
self._do_test_per_polling_discovery(['discovered_1', 'pud'],
['dup', 'static_1', 'dup'])
def test_multiple_pollings_different_static_resources(self):
# assert that the individual lists of static and discovered resources
# for each polling with a common interval are passed to individual
# pollsters matching each polling
self.polling_cfg['sources'][0]['resources'] = ['test://']
self.polling_cfg['sources'][0]['discovery'] = ['testdiscovery']
self.polling_cfg['sources'].append({
'name': 'another_polling',
'interval': 60,
'meters': ['test'],
'resources': ['another://'],
'discovery': ['testdiscoveryanother'],
})
self.mgr.discoveries = self.create_discoveries()
self.Discovery.resources = ['discovered_1', 'discovered_2']
self.DiscoveryAnother.resources = ['discovered_3', 'discovered_4']
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(1, len(polling_tasks))
self.assertIn(60, polling_tasks.keys())
self.mgr.interval_task(polling_tasks.get(60))
self.assertEqual([None], self.Discovery.params)
self.assertEqual([None], self.DiscoveryAnother.params)
self.assertEqual(2, len(self.Pollster.samples))
samples = self.Pollster.samples
test_resources = ['test://', 'discovered_1', 'discovered_2']
another_resources = ['another://', 'discovered_3', 'discovered_4']
if samples[0][1] == test_resources:
self.assertEqual(another_resources, samples[1][1])
elif samples[0][1] == another_resources:
self.assertEqual(test_resources, samples[1][1])
else:
self.fail('unexpected sample resources %s' % samples)
def test_multiple_sources_different_discoverers(self):
self.Discovery.resources = ['discovered_1', 'discovered_2']
self.DiscoveryAnother.resources = ['discovered_3', 'discovered_4']
sources = [{'name': 'test_source_1',
'interval': 60,
'meters': ['test'],
'discovery': ['testdiscovery']},
{'name': 'test_source_2',
'interval': 60,
'meters': ['testanother'],
'discovery': ['testdiscoveryanother']}]
self.polling_cfg = {'sources': sources}
self.mgr.discoveries = self.create_discoveries()
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(1, len(polling_tasks))
self.assertIn(60, polling_tasks.keys())
self.mgr.interval_task(polling_tasks.get(60))
self.assertEqual(1, len(self.Pollster.samples))
self.assertEqual(['discovered_1', 'discovered_2'],
self.Pollster.resources)
self.assertEqual(1, len(self.PollsterAnother.samples))
self.assertEqual(['discovered_3', 'discovered_4'],
self.PollsterAnother.resources)
def test_discovery_partitioning(self):
discovered_resources = ['discovered_1', 'discovered_2']
self.Pollster.discovery = 'testdiscovery'
self.mgr.discoveries = self.create_discoveries()
self.Discovery.resources = discovered_resources
self.polling_cfg['sources'][0]['discovery'] = [
'testdiscovery', 'testdiscoveryanother',
'testdiscoverynonexistent', 'testdiscoveryexception']
self.polling_cfg['sources'][0]['resources'] = []
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
self.hashring.belongs_to_self.assert_has_calls(
[mock.call('discovered_1'), mock.call('discovered_2')])
def test_discovery_partitioning_unhashable(self):
discovered_resources = [{'unhashable': True}]
self.Pollster.discovery = 'testdiscovery'
self.mgr.discoveries = self.create_discoveries()
self.Discovery.resources = discovered_resources
self.polling_cfg['sources'][0]['discovery'] = [
'testdiscovery', 'testdiscoveryanother',
'testdiscoverynonexistent', 'testdiscoveryexception']
self.polling_cfg['sources'][0]['resources'] = []
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
self.hashring.belongs_to_self.assert_has_calls(
[mock.call('{\'unhashable\': True}')])
def test_static_resources_partitioning(self):
static_resources = ['static_1', 'static_2']
static_resources2 = ['static_3', 'static_4']
self.polling_cfg['sources'][0]['resources'] = static_resources
self.polling_cfg['sources'].append({
'name': 'test_polling2',
'interval': 60,
'meters': ['test', 'test2'],
'resources': static_resources2,
})
# have one polling without static resources defined
self.polling_cfg['sources'].append({
'name': 'test_polling3',
'interval': 60,
'meters': ['test', 'test2'],
'resources': [],
})
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
self.hashring.belongs_to_self.assert_has_calls([
mock.call('static_1'),
mock.call('static_2'),
mock.call('static_3'),
mock.call('static_4'),
], any_order=True)
@mock.patch('ceilometer.polling.manager.LOG')
def test_polling_and_notify_with_resources(self, LOG):
self.setup_polling()
polling_task = list(self.mgr.setup_polling_tasks().values())[0]
polling_task.poll_and_notify()
LOG.info.assert_called_with(
'Polling pollster %(poll)s in the context of %(src)s',
{'poll': 'test', 'src': 'test_polling'})
@mock.patch('ceilometer.polling.manager.LOG')
def test_skip_polling_and_notify_with_no_resources(self, LOG):
self.polling_cfg['sources'][0]['resources'] = []
self.setup_polling()
polling_task = list(self.mgr.setup_polling_tasks().values())[0]
pollster = list(polling_task.pollster_matches['test_polling'])[0]
polling_task.poll_and_notify()
LOG.debug.assert_called_with(
'Skip pollster %(name)s, no %(p_context)sresources found this '
'cycle', {'name': pollster.name, 'p_context': ''})
@mock.patch('ceilometer.polling.manager.LOG')
def test_skip_polling_polled_resources(self, LOG):
self.polling_cfg['sources'].append({
'name': 'test_polling_1',
'interval': 60,
'meters': ['test'],
'resources': ['test://'],
})
self.setup_polling()
polling_task = list(self.mgr.setup_polling_tasks().values())[0]
polling_task.poll_and_notify()
LOG.debug.assert_called_with(
'Skip pollster %(name)s, no %(p_context)sresources found this '
'cycle', {'name': 'test', 'p_context': 'new '})
@mock.patch('oslo_utils.timeutils.utcnow')
def test_polling_samples_timestamp(self, mock_utc):
polled_samples = []
timestamp = '2222-11-22T00:11:22.333333'
def fake_send_notification(samples):
polled_samples.extend(samples)
mock_utc.return_value = datetime.datetime.strptime(
timestamp, "%Y-%m-%dT%H:%M:%S.%f")
self.setup_polling()
polling_task = list(self.mgr.setup_polling_tasks().values())[0]
polling_task._send_notification = mock.Mock(
side_effect=fake_send_notification)
polling_task.poll_and_notify()
self.assertEqual(timestamp, polled_samples[0]['timestamp'])

View File

@ -1,5 +1,8 @@
#
# Copyright 2013 Intel Corp.
# Copyright 2012 New Dream Network, LLC (DreamHost)
# Copyright 2013 Intel corp.
# Copyright 2013 eNovance
# Copyright 2014 Red Hat, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -13,29 +16,62 @@
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for ceilometer agent manager"""
import copy
import datetime
import fixtures
from keystoneauth1 import exceptions as ka_exceptions
import mock
from oslotest import base
from keystoneauth1 import exceptions as ka_exceptions
from stevedore import extension
from ceilometer.compute import discovery as nova_discover
from ceilometer.hardware import discovery
from ceilometer.polling import manager
from ceilometer.polling import plugin_base
from ceilometer import sample
from ceilometer import service
from ceilometer.tests.unit.polling import agentbase
from ceilometer.tests import base
def fakedelayed(delay, target, *args, **kwargs):
return target(*args, **kwargs)
def default_test_data(name='test'):
return sample.Sample(
name=name,
type=sample.TYPE_CUMULATIVE,
unit='',
volume=1,
user_id='test',
project_id='test',
resource_id='test_run_tasks',
timestamp=datetime.datetime.utcnow().isoformat(),
resource_metadata={'name': 'Pollster'})
class TestPollster(plugin_base.PollsterBase):
test_data = default_test_data()
discovery = None
@property
def default_discovery(self):
return self.discovery
def get_samples(self, manager, cache, resources):
resources = resources or []
self.samples.append((manager, resources))
self.resources.extend(resources)
c = copy.deepcopy(self.test_data)
c.resource_metadata['resources'] = resources
return [c]
class PollingException(Exception):
pass
class TestPollsterBuilder(agentbase.TestPollster):
class TestPollsterBuilder(TestPollster):
@classmethod
def build_pollsters(cls, conf):
return [('builder1', cls(conf)), ('builder2', cls(conf))]
@ -123,8 +159,7 @@ class TestManager(base.BaseTestCase):
extension.Extension('test',
None,
None,
agentbase.TestPollster(
self.conf)),
TestPollster(self.conf)),
]
)
@ -134,16 +169,36 @@ class TestManager(base.BaseTestCase):
self.assertEqual(3, len(mgr.extensions))
for ext in mgr.extensions:
self.assertIn(ext.name, ['builder1', 'builder2', 'test'])
self.assertIsInstance(ext.obj, agentbase.TestPollster)
self.assertIsInstance(ext.obj, TestPollster)
class TestPollsterKeystone(agentbase.TestPollster):
class BatchTestPollster(TestPollster):
test_data = default_test_data()
discovery = None
@property
def default_discovery(self):
return self.discovery
def get_samples(self, manager, cache, resources):
resources = resources or []
self.samples.append((manager, resources))
self.resources.extend(resources)
for resource in resources:
c = copy.deepcopy(self.test_data)
c.timestamp = datetime.datetime.utcnow().isoformat()
c.resource_id = resource
c.resource_metadata['resource'] = resource
yield c
class TestPollsterKeystone(TestPollster):
def get_samples(self, manager, cache, resources):
# Just try to use keystone, that will raise an exception
manager.keystone.projects.list()
class TestPollsterPollingException(agentbase.TestPollster):
class TestPollsterPollingException(TestPollster):
discovery = 'test'
polling_failures = 0
@ -161,17 +216,64 @@ class TestPollsterPollingException(agentbase.TestPollster):
return sample
class TestRunTasks(agentbase.BaseAgentManagerTestCase):
class TestDiscovery(plugin_base.DiscoveryBase):
def discover(self, manager, param=None):
self.params.append(param)
return self.resources
class TestDiscoveryException(plugin_base.DiscoveryBase):
def discover(self, manager, param=None):
self.params.append(param)
raise Exception()
class TestPollingAgent(base.BaseTestCase):
class Pollster(TestPollster):
samples = []
resources = []
test_data = default_test_data()
class BatchPollster(BatchTestPollster):
samples = []
resources = []
test_data = default_test_data()
class PollsterAnother(TestPollster):
samples = []
resources = []
test_data = default_test_data('testanother')
class PollsterKeystone(TestPollsterKeystone):
samples = []
resources = []
test_data = agentbase.default_test_data('testkeystone')
test_data = default_test_data('testkeystone')
class PollsterPollingException(TestPollsterPollingException):
samples = []
resources = []
test_data = agentbase.default_test_data('testpollingexception')
test_data = default_test_data('testpollingexception')
class Discovery(TestDiscovery):
params = []
resources = []
class DiscoveryAnother(TestDiscovery):
params = []
resources = []
@property
def group_id(self):
return 'another_group'
class DiscoveryException(TestDiscoveryException):
params = []
def setup_polling(self, poll_cfg=None):
name = self.cfg2file(poll_cfg or self.polling_cfg)
self.CONF.set_override('cfg_file', name, group='polling')
self.mgr.polling_manager = manager.PollingManager(self.CONF)
def create_manager(self):
return manager.AgentManager(0, self.CONF)
@ -182,35 +284,388 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
self.notified_samples.append(m)
def setUp(self):
super(TestPollingAgent, self).setUp()
self.notified_samples = []
self.notifier = mock.Mock()
self.notifier.sample.side_effect = self.fake_notifier_sample
self.useFixture(fixtures.MockPatch('oslo_messaging.Notifier',
return_value=self.notifier))
super(TestRunTasks, self).setUp()
self.useFixture(fixtures.MockPatch(
'keystoneclient.v2_0.client.Client',
return_value=mock.Mock()))
self.useFixture(fixtures.MockPatch('keystoneclient.v2_0.client.Client',
return_value=mock.Mock()))
self.CONF = service.prepare_service([], [])
self.CONF.set_override("backend_url", "zake://", "coordination")
self.CONF.set_override(
'cfg_file',
self.path_get('etc/ceilometer/polling_all.yaml'), group='polling'
)
self.mgr = self.create_manager()
self.mgr.extensions = self.create_extension_list()
self.hashring = mock.MagicMock()
self.hashring.belongs_to_self = mock.MagicMock()
self.hashring.belongs_to_self.return_value = True
self.mgr.hashrings = mock.MagicMock()
self.mgr.hashrings.__getitem__.return_value = self.hashring
self.polling_cfg = {
'sources': [{
'name': 'test_polling',
'interval': 60,
'meters': ['test'],
'resources': ['test://']}]
}
self.setup_polling()
def tearDown(self):
self.PollsterKeystone.samples = []
self.PollsterKeystone.resources = []
self.PollsterPollingException.samples = []
self.PollsterPollingException.resources = []
super(TestRunTasks, self).tearDown()
self.Pollster.samples = []
self.Pollster.discovery = []
self.PollsterAnother.samples = []
self.PollsterAnother.discovery = []
self.Pollster.resources = []
self.PollsterAnother.resources = []
self.Discovery.params = []
self.DiscoveryAnother.params = []
self.DiscoveryException.params = []
self.Discovery.resources = []
self.DiscoveryAnother.resources = []
super(TestPollingAgent, self).tearDown()
def create_extension_list(self):
exts = super(TestRunTasks, self).create_extension_list()
exts.extend([extension.Extension('testkeystone',
None,
None,
self.PollsterKeystone(self.CONF), ),
extension.Extension('testpollingexception',
None,
None,
self.PollsterPollingException(
self.CONF), )])
return exts
return [extension.Extension('test',
None,
None,
self.Pollster(self.CONF), ),
extension.Extension('testbatch',
None,
None,
self.BatchPollster(self.CONF), ),
extension.Extension('testanother',
None,
None,
self.PollsterAnother(self.CONF), ),
extension.Extension('testkeystone',
None,
None,
self.PollsterKeystone(self.CONF), ),
extension.Extension('testpollingexception',
None,
None,
self.PollsterPollingException(self.CONF), )
]
def create_discoveries(self):
return extension.ExtensionManager.make_test_instance(
[
extension.Extension(
'testdiscovery',
None,
None,
self.Discovery(self.CONF), ),
extension.Extension(
'testdiscoveryanother',
None,
None,
self.DiscoveryAnother(self.CONF), ),
extension.Extension(
'testdiscoveryexception',
None,
None,
self.DiscoveryException(self.CONF), ),
],
)
@mock.patch('ceilometer.polling.manager.PollingManager')
def test_start(self, poll_manager):
self.mgr.setup_polling_tasks = mock.MagicMock()
self.mgr.run()
poll_manager.assert_called_once_with(self.CONF)
self.mgr.setup_polling_tasks.assert_called_once_with()
self.mgr.terminate()
def test_setup_polling_tasks(self):
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(1, len(polling_tasks))
self.assertIn(60, polling_tasks.keys())
per_task_resources = polling_tasks[60].resources
self.assertEqual(1, len(per_task_resources))
self.assertEqual(set(self.polling_cfg['sources'][0]['resources']),
set(per_task_resources['test_polling-test'].get({})))
def test_setup_polling_tasks_multiple_interval(self):
self.polling_cfg['sources'].append({
'name': 'test_polling_1',
'interval': 10,
'meters': ['test'],
'resources': ['test://'],
})
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(2, len(polling_tasks))
self.assertIn(60, polling_tasks.keys())
self.assertIn(10, polling_tasks.keys())
def test_setup_polling_tasks_mismatch_counter(self):
self.polling_cfg['sources'].append({
'name': 'test_polling_1',
'interval': 10,
'meters': ['test_invalid'],
'resources': ['invalid://'],
})
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(1, len(polling_tasks))
self.assertIn(60, polling_tasks.keys())
self.assertNotIn(10, polling_tasks.keys())
def test_setup_polling_task_same_interval(self):
self.polling_cfg['sources'].append({
'name': 'test_polling_1',
'interval': 60,
'meters': ['testanother'],
'resources': ['testanother://'],
})
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(1, len(polling_tasks))
pollsters = polling_tasks.get(60).pollster_matches
self.assertEqual(2, len(pollsters))
per_task_resources = polling_tasks[60].resources
self.assertEqual(2, len(per_task_resources))
key = 'test_polling-test'
self.assertEqual(set(self.polling_cfg['sources'][0]['resources']),
set(per_task_resources[key].get({})))
key = 'test_polling_1-testanother'
self.assertEqual(set(self.polling_cfg['sources'][1]['resources']),
set(per_task_resources[key].get({})))
def test_agent_manager_start(self):
mgr = self.create_manager()
mgr.extensions = self.mgr.extensions
mgr.create_polling_task = mock.MagicMock()
mgr.run()
self.addCleanup(mgr.terminate)
mgr.create_polling_task.assert_called_once_with()
def _verify_discovery_params(self, expected):
self.assertEqual(expected, self.Discovery.params)
self.assertEqual(expected, self.DiscoveryAnother.params)
self.assertEqual(expected, self.DiscoveryException.params)
def _do_test_per_pollster_discovery(self, discovered_resources,
static_resources):
self.Pollster.discovery = 'testdiscovery'
self.mgr.discoveries = self.create_discoveries()
self.Discovery.resources = discovered_resources
self.DiscoveryAnother.resources = [d[::-1]
for d in discovered_resources]
if static_resources:
# just so we can test that static + pre_polling amalgamated
# override per_pollster
self.polling_cfg['sources'][0]['discovery'] = [
'testdiscoveryanother',
'testdiscoverynonexistent',
'testdiscoveryexception']
self.polling_cfg['sources'][0]['resources'] = static_resources
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
if static_resources:
self.assertEqual(set(static_resources +
self.DiscoveryAnother.resources),
set(self.Pollster.resources))
else:
self.assertEqual(set(self.Discovery.resources),
set(self.Pollster.resources))
# Make sure no duplicated resource from discovery
for x in self.Pollster.resources:
self.assertEqual(1, self.Pollster.resources.count(x))
def test_per_pollster_discovery(self):
self._do_test_per_pollster_discovery(['discovered_1', 'discovered_2'],
[])
def test_per_pollster_discovery_overridden_by_per_polling_discovery(self):
# ensure static+per_source_discovery overrides per_pollster_discovery
self._do_test_per_pollster_discovery(['discovered_1', 'discovered_2'],
['static_1', 'static_2'])
def test_per_pollster_discovery_duplicated(self):
self._do_test_per_pollster_discovery(['dup', 'discovered_1', 'dup'],
[])
def test_per_pollster_discovery_overridden_by_duplicated_static(self):
self._do_test_per_pollster_discovery(['discovered_1', 'discovered_2'],
['static_1', 'dup', 'dup'])
def test_per_pollster_discovery_caching(self):
# ensure single discovery associated with multiple pollsters
# only called once per polling cycle
discovered_resources = ['discovered_1', 'discovered_2']
self.Pollster.discovery = 'testdiscovery'
self.PollsterAnother.discovery = 'testdiscovery'
self.mgr.discoveries = self.create_discoveries()
self.Discovery.resources = discovered_resources
self.polling_cfg['sources'][0]['meters'].append('testanother')
self.polling_cfg['sources'][0]['resources'] = []
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
self.assertEqual(1, len(self.Discovery.params))
self.assertEqual(discovered_resources, self.Pollster.resources)
self.assertEqual(discovered_resources, self.PollsterAnother.resources)
def _do_test_per_polling_discovery(self, discovered_resources,
static_resources):
self.mgr.discoveries = self.create_discoveries()
self.Discovery.resources = discovered_resources
self.DiscoveryAnother.resources = [d[::-1]
for d in discovered_resources]
self.polling_cfg['sources'][0]['discovery'] = [
'testdiscovery', 'testdiscoveryanother',
'testdiscoverynonexistent', 'testdiscoveryexception']
self.polling_cfg['sources'][0]['resources'] = static_resources
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
discovery = self.Discovery.resources + self.DiscoveryAnother.resources
# compare resource lists modulo ordering
self.assertEqual(set(static_resources + discovery),
set(self.Pollster.resources))
# Make sure no duplicated resource from discovery
for x in self.Pollster.resources:
self.assertEqual(1, self.Pollster.resources.count(x))
def test_per_polling_discovery_discovered_only(self):
self._do_test_per_polling_discovery(['discovered_1', 'discovered_2'],
[])
def test_per_polling_discovery_static_only(self):
self._do_test_per_polling_discovery([], ['static_1', 'static_2'])
def test_per_polling_discovery_discovered_augmented_by_static(self):
self._do_test_per_polling_discovery(['discovered_1', 'discovered_2'],
['static_1', 'static_2'])
def test_per_polling_discovery_discovered_duplicated_static(self):
self._do_test_per_polling_discovery(['discovered_1', 'pud'],
['dup', 'static_1', 'dup'])
def test_multiple_pollings_different_static_resources(self):
# assert that the individual lists of static and discovered resources
# for each polling with a common interval are passed to individual
# pollsters matching each polling
self.polling_cfg['sources'][0]['resources'] = ['test://']
self.polling_cfg['sources'][0]['discovery'] = ['testdiscovery']
self.polling_cfg['sources'].append({
'name': 'another_polling',
'interval': 60,
'meters': ['test'],
'resources': ['another://'],
'discovery': ['testdiscoveryanother'],
})
self.mgr.discoveries = self.create_discoveries()
self.Discovery.resources = ['discovered_1', 'discovered_2']
self.DiscoveryAnother.resources = ['discovered_3', 'discovered_4']
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(1, len(polling_tasks))
self.assertIn(60, polling_tasks.keys())
self.mgr.interval_task(polling_tasks.get(60))
self.assertEqual([None], self.Discovery.params)
self.assertEqual([None], self.DiscoveryAnother.params)
self.assertEqual(2, len(self.Pollster.samples))
samples = self.Pollster.samples
test_resources = ['test://', 'discovered_1', 'discovered_2']
another_resources = ['another://', 'discovered_3', 'discovered_4']
if samples[0][1] == test_resources:
self.assertEqual(another_resources, samples[1][1])
elif samples[0][1] == another_resources:
self.assertEqual(test_resources, samples[1][1])
else:
self.fail('unexpected sample resources %s' % samples)
def test_multiple_sources_different_discoverers(self):
self.Discovery.resources = ['discovered_1', 'discovered_2']
self.DiscoveryAnother.resources = ['discovered_3', 'discovered_4']
sources = [{'name': 'test_source_1',
'interval': 60,
'meters': ['test'],
'discovery': ['testdiscovery']},
{'name': 'test_source_2',
'interval': 60,
'meters': ['testanother'],
'discovery': ['testdiscoveryanother']}]
self.polling_cfg = {'sources': sources}
self.mgr.discoveries = self.create_discoveries()
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(1, len(polling_tasks))
self.assertIn(60, polling_tasks.keys())
self.mgr.interval_task(polling_tasks.get(60))
self.assertEqual(1, len(self.Pollster.samples))
self.assertEqual(['discovered_1', 'discovered_2'],
self.Pollster.resources)
self.assertEqual(1, len(self.PollsterAnother.samples))
self.assertEqual(['discovered_3', 'discovered_4'],
self.PollsterAnother.resources)
@mock.patch('ceilometer.polling.manager.LOG')
def test_polling_and_notify_with_resources(self, LOG):
self.setup_polling()
polling_task = list(self.mgr.setup_polling_tasks().values())[0]
polling_task.poll_and_notify()
LOG.info.assert_called_with(
'Polling pollster %(poll)s in the context of %(src)s',
{'poll': 'test', 'src': 'test_polling'})
@mock.patch('ceilometer.polling.manager.LOG')
def test_skip_polling_and_notify_with_no_resources(self, LOG):
self.polling_cfg['sources'][0]['resources'] = []
self.setup_polling()
polling_task = list(self.mgr.setup_polling_tasks().values())[0]
pollster = list(polling_task.pollster_matches['test_polling'])[0]
polling_task.poll_and_notify()
LOG.debug.assert_called_with(
'Skip pollster %(name)s, no %(p_context)sresources found this '
'cycle', {'name': pollster.name, 'p_context': ''})
@mock.patch('ceilometer.polling.manager.LOG')
def test_skip_polling_polled_resources(self, LOG):
self.polling_cfg['sources'].append({
'name': 'test_polling_1',
'interval': 60,
'meters': ['test'],
'resources': ['test://'],
})
self.setup_polling()
polling_task = list(self.mgr.setup_polling_tasks().values())[0]
polling_task.poll_and_notify()
LOG.debug.assert_called_with(
'Skip pollster %(name)s, no %(p_context)sresources found this '
'cycle', {'name': 'test', 'p_context': 'new '})
@mock.patch('oslo_utils.timeutils.utcnow')
def test_polling_samples_timestamp(self, mock_utc):
polled_samples = []
timestamp = '2222-11-22T00:11:22.333333'
def fake_send_notification(samples):
polled_samples.extend(samples)
mock_utc.return_value = datetime.datetime.strptime(
timestamp, "%Y-%m-%dT%H:%M:%S.%f")
self.setup_polling()
polling_task = list(self.mgr.setup_polling_tasks().values())[0]
polling_task._send_notification = mock.Mock(
side_effect=fake_send_notification)
polling_task.poll_and_notify()
self.assertEqual(timestamp, polled_samples[0]['timestamp'])
def test_get_sample_resources(self):
polling_tasks = self.mgr.setup_polling_tasks()
@ -243,10 +698,10 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
@mock.patch('ceilometer.polling.manager.LOG')
@mock.patch('ceilometer.nova_client.LOG')
def test_hardware_discover_fail_minimize_logs(self, novalog, baselog):
class PollsterHardware(agentbase.TestPollster):
class PollsterHardware(TestPollster):
discovery = 'tripleo_overcloud_nodes'
class PollsterHardwareAnother(agentbase.TestPollster):
class PollsterHardwareAnother(TestPollster):
discovery = 'tripleo_overcloud_nodes'
self.mgr.extensions.extend([
@ -378,3 +833,60 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
samples = self.notified_samples
self.assertEqual(expected_samples, len(samples))
self.assertEqual(call_count, self.notifier.sample.call_count)
def test_discovery_partitioning(self):
discovered_resources = ['discovered_1', 'discovered_2']
self.Pollster.discovery = 'testdiscovery'
self.mgr.discoveries = self.create_discoveries()
self.Discovery.resources = discovered_resources
self.polling_cfg['sources'][0]['discovery'] = [
'testdiscovery', 'testdiscoveryanother',
'testdiscoverynonexistent', 'testdiscoveryexception']
self.polling_cfg['sources'][0]['resources'] = []
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
self.hashring.belongs_to_self.assert_has_calls(
[mock.call('discovered_1'), mock.call('discovered_2')])
def test_discovery_partitioning_unhashable(self):
discovered_resources = [{'unhashable': True}]
self.Pollster.discovery = 'testdiscovery'
self.mgr.discoveries = self.create_discoveries()
self.Discovery.resources = discovered_resources
self.polling_cfg['sources'][0]['discovery'] = [
'testdiscovery', 'testdiscoveryanother',
'testdiscoverynonexistent', 'testdiscoveryexception']
self.polling_cfg['sources'][0]['resources'] = []
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
self.hashring.belongs_to_self.assert_has_calls(
[mock.call('{\'unhashable\': True}')])
def test_static_resources_partitioning(self):
static_resources = ['static_1', 'static_2']
static_resources2 = ['static_3', 'static_4']
self.polling_cfg['sources'][0]['resources'] = static_resources
self.polling_cfg['sources'].append({
'name': 'test_polling2',
'interval': 60,
'meters': ['test', 'test2'],
'resources': static_resources2,
})
# have one polling without static resources defined
self.polling_cfg['sources'].append({
'name': 'test_polling3',
'interval': 60,
'meters': ['test', 'test2'],
'resources': [],
})
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
self.hashring.belongs_to_self.assert_has_calls([
mock.call('static_1'),
mock.call('static_2'),
mock.call('static_3'),
mock.call('static_4'),
], any_order=True)