diff --git a/ceilometer/collector.py b/ceilometer/collector.py index c0010c79..c9574d12 100644 --- a/ceilometer/collector.py +++ b/ceilometer/collector.py @@ -45,16 +45,9 @@ OPTS = [ default=False, help='Requeue the event on the collector event queue ' 'when the collector fails to dispatch it.'), - cfg.BoolOpt('enable_rpc', - default=False, - help='Enable the RPC functionality of collector. This ' - 'functionality is now deprecated in favour of notifier ' - 'publisher and queues.') ] cfg.CONF.register_opts(OPTS, group="collector") -cfg.CONF.import_opt('metering_topic', 'ceilometer.publisher.messaging', - group='publisher_rpc') cfg.CONF.import_opt('metering_topic', 'ceilometer.publisher.messaging', group='publisher_notifier') cfg.CONF.import_opt('event_topic', 'ceilometer.publisher.messaging', @@ -73,7 +66,6 @@ class CollectorService(os_service.Service): # ensure dispatcher is configured before starting other services dispatcher_managers = dispatcher.load_dispatcher_manager() (self.meter_manager, self.event_manager) = dispatcher_managers - self.rpc_server = None self.sample_listener = None self.event_listener = None super(CollectorService, self).start() @@ -83,12 +75,6 @@ class CollectorService(os_service.Service): transport = messaging.get_transport(optional=True) if transport: - if cfg.CONF.collector.enable_rpc: - LOG.warning('RPC collector is deprecated in favour of queues. ' - 'Please switch to notifier publisher.') - self.rpc_server = messaging.get_rpc_server( - transport, cfg.CONF.publisher_rpc.metering_topic, self) - if list(self.meter_manager): sample_target = oslo_messaging.Target( topic=cfg.CONF.publisher_notifier.metering_topic) @@ -109,9 +95,6 @@ class CollectorService(os_service.Service): requeue_event_on_dispatcher_error)) self.event_listener.start() - if cfg.CONF.collector.enable_rpc: - self.rpc_server.start() - if not cfg.CONF.collector.udp_address: # Add a dummy thread to have wait() working self.tg.add_timer(604800, lambda: None) @@ -144,8 +127,6 @@ class CollectorService(os_service.Service): def stop(self): self.udp_run = False - if cfg.CONF.collector.enable_rpc and self.rpc_server: - self.rpc_server.stop() if self.sample_listener: utils.kill_listeners([self.sample_listener]) if self.event_listener: diff --git a/ceilometer/messaging.py b/ceilometer/messaging.py index 824ffbc8..c56ea21c 100644 --- a/ceilometer/messaging.py +++ b/ceilometer/messaging.py @@ -81,23 +81,6 @@ _SERIALIZER = RequestContextSerializer( oslo_serializer.JsonPayloadSerializer()) -def get_rpc_server(transport, topic, endpoint): - """Return a configured oslo_messaging rpc server.""" - cfg.CONF.import_opt('host', 'ceilometer.service') - target = oslo_messaging.Target(server=cfg.CONF.host, topic=topic) - return oslo_messaging.get_rpc_server(transport, target, - [endpoint], executor='threading', - serializer=_SERIALIZER) - - -def get_rpc_client(transport, retry=None, **kwargs): - """Return a configured oslo_messaging RPCClient.""" - target = oslo_messaging.Target(**kwargs) - return oslo_messaging.RPCClient(transport, target, - serializer=_SERIALIZER, - retry=retry) - - def get_notification_listener(transport, targets, endpoints, allow_requeue=False): """Return a configured oslo_messaging notification listener.""" diff --git a/ceilometer/opts.py b/ceilometer/opts.py index 384c2565..f6d6fe4b 100644 --- a/ceilometer/opts.py +++ b/ceilometer/opts.py @@ -104,7 +104,6 @@ def list_opts(): ('polling', ceilometer.agent.manager.OPTS), ('publisher', ceilometer.publisher.utils.OPTS), ('publisher_notifier', ceilometer.publisher.messaging.NOTIFIER_OPTS), - ('publisher_rpc', ceilometer.publisher.messaging.RPC_OPTS), ('rgw_admin_credentials', ceilometer.objectstore.rgw.CREDENTIAL_OPTS), # NOTE(sileht): the configuration file contains only the options # for the password plugin that handles keystone v2 and v3 API diff --git a/ceilometer/publisher/messaging.py b/ceilometer/publisher/messaging.py index 5ca95d98..b2365582 100644 --- a/ceilometer/publisher/messaging.py +++ b/ceilometer/publisher/messaging.py @@ -35,15 +35,6 @@ from ceilometer.publisher import utils LOG = log.getLogger(__name__) -RPC_OPTS = [ - cfg.StrOpt('metering_topic', - default='metering', - deprecated_for_removal=True, - help='The topic that ceilometer uses for metering messages.', - deprecated_group="DEFAULT", - ), -] - NOTIFIER_OPTS = [ cfg.StrOpt('metering_topic', default='metering', @@ -63,8 +54,6 @@ NOTIFIER_OPTS = [ ) ] -cfg.CONF.register_opts(RPC_OPTS, - group="publisher_rpc") cfg.CONF.register_opts(NOTIFIER_OPTS, group="publisher_notifier") cfg.CONF.import_opt('host', 'ceilometer.service') @@ -122,7 +111,7 @@ class MessagingPublisher(publisher.PublisherBase): sample, cfg.CONF.publisher.telemetry_secret) for sample in samples ] - topic = cfg.CONF.publisher_rpc.metering_topic + topic = cfg.CONF.publisher_notifier.metering_topic self.local_queue.append((context, topic, meters)) if self.per_meter_topic: @@ -201,26 +190,6 @@ class MessagingPublisher(publisher.PublisherBase): """Send the meters to the messaging topic.""" -class RPCPublisher(MessagingPublisher): - def __init__(self, parsed_url): - super(RPCPublisher, self).__init__(parsed_url) - - options = urlparse.parse_qs(parsed_url.query) - self.target = options.get('target', ['record_metering_data'])[0] - - self.rpc_client = messaging.get_rpc_client( - messaging.get_transport(), - retry=self.retry, version='1.0' - ) - - def _send(self, context, topic, meters): - try: - self.rpc_client.prepare(topic=topic).cast(context, self.target, - data=meters) - except oslo_messaging.MessageDeliveryFailure as e: - raise_delivery_failure(e) - - class NotifierPublisher(MessagingPublisher): def __init__(self, parsed_url, default_topic): super(NotifierPublisher, self).__init__(parsed_url) diff --git a/ceilometer/tests/functional/test_collector.py b/ceilometer/tests/functional/test_collector.py index 5e6bf585..b93f0f0e 100644 --- a/ceilometer/tests/functional/test_collector.py +++ b/ceilometer/tests/functional/test_collector.py @@ -17,7 +17,6 @@ import socket import mock import msgpack from oslo_config import fixture as fixture_config -from oslo_context import context import oslo_messaging from oslo_utils import timeutils from oslotest import mockpatch @@ -25,7 +24,6 @@ from stevedore import extension from ceilometer import collector from ceilometer import dispatcher -from ceilometer import messaging from ceilometer.publisher import utils from ceilometer import sample from ceilometer.tests import base as tests_base @@ -208,18 +206,6 @@ class TestCollector(tests_base.BaseTestCase): self.assertEqual(0, rpc_start.call_count) self.assertEqual(1, udp_start.call_count) - @mock.patch.object(oslo_messaging.MessageHandlingServer, 'start') - @mock.patch.object(collector.CollectorService, 'start_udp') - def test_only_rpc(self, udp_start, rpc_start): - """Check that only RPC is started if udp_address is empty.""" - self.CONF.set_override('enable_rpc', True, group='collector') - self.CONF.set_override('udp_address', '', group='collector') - self._setup_fake_dispatcher() - self.srv.start() - # two calls because two servers (notification and rpc) - self.assertEqual(2, rpc_start.call_count) - self.assertEqual(0, udp_start.call_count) - def test_udp_receive_valid_encoding(self): self._setup_messaging(False) mock_dispatcher = self._setup_fake_dispatcher() @@ -231,21 +217,6 @@ class TestCollector(tests_base.BaseTestCase): mock_dispatcher.method_calls[0][1][0], "not-so-secret")) - @mock.patch('ceilometer.storage.impl_log.LOG') - def test_collector_no_mock(self, mylog): - self.CONF.set_override('enable_rpc', True, group='collector') - self.CONF.set_override('udp_address', '', group='collector') - mylog.info.side_effect = lambda *args: self.srv.stop() - self.srv.start() - - client = messaging.get_rpc_client(self.transport, version='1.0') - cclient = client.prepare(topic='metering') - cclient.cast(context.RequestContext(), - 'record_metering_data', data=[self.utf8_msg]) - self.srv.rpc_server.wait() - mylog.info.assert_called_once_with( - 'metering data test for test_run_tasks: 1') - def _test_collector_requeue(self, listener): mock_dispatcher = self._setup_fake_dispatcher() diff --git a/ceilometer/tests/unit/publisher/test_messaging_publisher.py b/ceilometer/tests/unit/publisher/test_messaging_publisher.py index 10b15e2d..f2ee8432 100644 --- a/ceilometer/tests/unit/publisher/test_messaging_publisher.py +++ b/ceilometer/tests/unit/publisher/test_messaging_publisher.py @@ -19,12 +19,10 @@ import uuid import mock from oslo_config import fixture as fixture_config -from oslo_context import context from oslo_utils import netutils import testscenarios.testcase from ceilometer.event.storage import models as event -from ceilometer import messaging from ceilometer.publisher import messaging as msg_publisher from ceilometer import sample from ceilometer.tests import base as tests_base @@ -103,76 +101,6 @@ class BasePublisherTestCase(tests_base.BaseTestCase): self.setup_messaging(self.CONF) -class RpcOnlyPublisherTest(BasePublisherTestCase): - def test_published_no_mock(self): - publisher = msg_publisher.RPCPublisher( - netutils.urlsplit('rpc://')) - - endpoint = mock.MagicMock(['record_metering_data']) - collector = messaging.get_rpc_server( - self.transport, self.CONF.publisher_rpc.metering_topic, endpoint) - endpoint.record_metering_data.side_effect = (lambda *args, **kwds: - collector.stop()) - - collector.start() - publisher.publish_samples(context.RequestContext(), - self.test_sample_data) - collector.wait() - - class Matcher(object): - @staticmethod - def __eq__(data): - for i, sample_item in enumerate(data): - if (sample_item['counter_name'] != - self.test_sample_data[i].name): - return False - return True - - endpoint.record_metering_data.assert_called_once_with( - mock.ANY, data=Matcher()) - - def test_publish_target(self): - publisher = msg_publisher.RPCPublisher( - netutils.urlsplit('rpc://?target=custom_procedure_call')) - cast_context = mock.MagicMock() - with mock.patch.object(publisher.rpc_client, 'prepare') as prepare: - prepare.return_value = cast_context - publisher.publish_samples(mock.MagicMock(), - self.test_sample_data) - - prepare.assert_called_once_with( - topic=self.CONF.publisher_rpc.metering_topic) - cast_context.cast.assert_called_once_with( - mock.ANY, 'custom_procedure_call', data=mock.ANY) - - def test_published_with_per_meter_topic(self): - publisher = msg_publisher.RPCPublisher( - netutils.urlsplit('rpc://?per_meter_topic=1')) - with mock.patch.object(publisher.rpc_client, 'prepare') as prepare: - publisher.publish_samples(mock.MagicMock(), - self.test_sample_data) - - class MeterGroupMatcher(object): - def __eq__(self, meters): - return len(set(meter['counter_name'] - for meter in meters)) == 1 - - topic = self.CONF.publisher_rpc.metering_topic - expected = [mock.call(topic=topic), - mock.call().cast(mock.ANY, 'record_metering_data', - data=mock.ANY), - mock.call(topic=topic + '.test'), - mock.call().cast(mock.ANY, 'record_metering_data', - data=MeterGroupMatcher()), - mock.call(topic=topic + '.test2'), - mock.call().cast(mock.ANY, 'record_metering_data', - data=MeterGroupMatcher()), - mock.call(topic=topic + '.test3'), - mock.call().cast(mock.ANY, 'record_metering_data', - data=MeterGroupMatcher())] - self.assertEqual(expected, prepare.mock_calls) - - class NotifierOnlyPublisherTest(BasePublisherTestCase): @mock.patch('oslo_messaging.Notifier') @@ -203,17 +131,13 @@ class TestPublisher(testscenarios.testcase.WithScenarios, publisher_cls=msg_publisher.EventNotifierPublisher, test_data=BasePublisherTestCase.test_event_data, pub_func='publish_events', attr='event_type')), - ('rpc', dict(protocol="rpc", - publisher_cls=msg_publisher.RPCPublisher, - test_data=BasePublisherTestCase.test_sample_data, - pub_func='publish_samples', attr='source')), ] def setUp(self): super(TestPublisher, self).setUp() self.topic = (self.CONF.publisher_notifier.event_topic if self.pub_func == 'publish_events' else - self.CONF.publisher_rpc.metering_topic) + self.CONF.publisher_notifier.metering_topic) class TestPublisherPolicy(TestPublisher): diff --git a/setup.cfg b/setup.cfg index 6903d427..81c829f6 100644 --- a/setup.cfg +++ b/setup.cfg @@ -218,9 +218,6 @@ ceilometer.transformer = ceilometer.publisher = test = ceilometer.publisher.test:TestPublisher - meter_publisher = ceilometer.publisher.messaging:RPCPublisher - meter = ceilometer.publisher.messaging:RPCPublisher - rpc = ceilometer.publisher.messaging:RPCPublisher notifier = ceilometer.publisher.messaging:SampleNotifierPublisher udp = ceilometer.publisher.udp:UDPPublisher file = ceilometer.publisher.file:FilePublisher diff --git a/tools/send_test_data.py b/tools/send_test_data.py index ed9b9f59..a1d37a3a 100755 --- a/tools/send_test_data.py +++ b/tools/send_test_data.py @@ -30,7 +30,6 @@ import uuid import make_test_data from oslo_config import cfg -from oslo_context import context import oslo_messaging from six import moves @@ -39,11 +38,6 @@ from ceilometer.publisher import utils from ceilometer import service -def send_batch_rpc(rpc_client, topic, batch): - rpc_client.prepare(topic=topic).cast(context.RequestContext(), - 'record_metering_data', data=batch) - - def send_batch_notifier(notifier, topic, batch): notifier.sample({}, event_type=topic, payload=batch) @@ -58,13 +52,6 @@ def get_notifier(config_file): ) -def get_rpc_client(config_file): - service.prepare_service(argv=['/', '--config-file', config_file]) - transport = messaging.get_transport() - rpc_client = messaging.get_rpc_client(transport, version='1.0') - return rpc_client - - def generate_data(send_batch, make_data_args, samples_count, batch_size, resources_count, topic): make_data_args.interval = 1 @@ -104,12 +91,6 @@ def generate_data(send_batch, make_data_args, samples_count, def get_parser(): parser = argparse.ArgumentParser() - parser.add_argument( - '--notify', - dest='notify', - type=bool, - default=True - ) parser.add_argument( '--batch-size', @@ -148,12 +129,8 @@ def get_parser(): def main(): args = get_parser().parse_known_args()[0] make_data_args = make_test_data.get_parser().parse_known_args()[0] - if args.notify: - notifier = get_notifier(args.config_file) - send_batch = functools.partial(send_batch_notifier, notifier) - else: - rpc_client = get_rpc_client(args.config_file) - send_batch = functools.partial(send_batch_rpc, rpc_client) + notifier = get_notifier(args.config_file) + send_batch = functools.partial(send_batch_notifier, notifier) result_dir = args.result_dir del args.notify del args.config_file