diff --git a/ceilometer/publisher/kafka_broker.py b/ceilometer/publisher/kafka_broker.py index 0dd488a6..3018841e 100644 --- a/ceilometer/publisher/kafka_broker.py +++ b/ceilometer/publisher/kafka_broker.py @@ -13,24 +13,19 @@ # License for the specific language governing permissions and limitations # under the License. -import json - import kafka -from oslo_config import cfg from oslo_log import log +from oslo_serialization import jsonutils from oslo_utils import netutils from six.moves.urllib import parse as urlparse from ceilometer.i18n import _LE -from ceilometer.i18n import _LI -from ceilometer.i18n import _LW -from ceilometer import publisher -from ceilometer.publisher import utils +from ceilometer.publisher import messaging LOG = log.getLogger(__name__) -class KafkaBrokerPublisher(publisher.PublisherBase): +class KafkaBrokerPublisher(messaging.MessagingPublisher): """Publish metering data to kafka broker. The ip address and port number of kafka broker should be configured in @@ -68,132 +63,34 @@ class KafkaBrokerPublisher(publisher.PublisherBase): """ def __init__(self, parsed_url): - self.kafka_client = None + super(KafkaBrokerPublisher, self).__init__(parsed_url) + options = urlparse.parse_qs(parsed_url.query) - self.host, self.port = netutils.parse_host_port( + self._producer = None + self._host, self._port = netutils.parse_host_port( parsed_url.netloc, default_port=9092) + self._topic = options.get('topic', ['ceilometer'])[-1] + self.max_retry = int(options.get('max_retry', [100])[-1]) - self.local_queue = [] - - params = urlparse.parse_qs(parsed_url.query) - self.topic = params.get('topic', ['ceilometer'])[-1] - self.policy = params.get('policy', ['default'])[-1] - self.max_queue_length = int(params.get( - 'max_queue_length', [1024])[-1]) - self.max_retry = int(params.get('max_retry', [100])[-1]) - - if self.policy in ['default', 'drop', 'queue']: - LOG.info(_LI('Publishing policy set to %s') % self.policy) - else: - LOG.warn(_LW('Publishing policy is unknown (%s) force to default') - % self.policy) - self.policy = 'default' + def _ensure_connection(self): + if self._producer: + return try: - self._get_client() + client = kafka.KafkaClient("%s:%s" % (self._host, self._port)) + self._producer = kafka.SimpleProducer(client) except Exception as e: LOG.exception(_LE("Failed to connect to Kafka service: %s"), e) + raise messaging.DeliveryFailure('Kafka Client is not available, ' + 'please restart Kafka client') - def publish_samples(self, context, samples): - """Send a metering message for kafka broker. - - :param context: Execution context from the service or RPC call - :param samples: Samples from pipeline after transformation - """ - samples_list = [ - utils.meter_message_from_counter( - sample, cfg.CONF.publisher.telemetry_secret) - for sample in samples - ] - - self.local_queue.append(samples_list) - + def _send(self, context, event_type, data): + self._ensure_connection() + # TODO(sileht): don't split the payload into multiple network + # message ... but how to do that without breaking consuming + # application... try: - self._check_kafka_connection() + for d in data: + self._producer.send_messages(self._topic, jsonutils.dumps(d)) except Exception as e: - raise e - - self.flush() - - def flush(self): - queue = self.local_queue - self.local_queue = self._process_queue(queue) - if self.policy == 'queue': - self._check_queue_length() - - def publish_events(self, context, events): - """Send an event message for kafka broker. - - :param context: Execution context from the service or RPC call - :param events: events from pipeline after transformation - """ - events_list = [utils.message_from_event( - event, cfg.CONF.publisher.telemetry_secret) for event in events] - - self.local_queue.append(events_list) - - try: - self._check_kafka_connection() - except Exception as e: - raise e - - self.flush() - - def _process_queue(self, queue): - current_retry = 0 - while queue: - data = queue[0] - try: - self._send(data) - except Exception: - LOG.warn(_LW("Failed to publish %d datum"), - sum([len(d) for d in queue])) - if self.policy == 'queue': - return queue - elif self.policy == 'drop': - return [] - current_retry += 1 - if current_retry >= self.max_retry: - self.local_queue = [] - LOG.exception(_LE("Failed to retry to send sample data " - "with max_retry times")) - raise - else: - queue.pop(0) - return [] - - def _check_queue_length(self): - queue_length = len(self.local_queue) - if queue_length > self.max_queue_length > 0: - diff = queue_length - self.max_queue_length - self.local_queue = self.local_queue[diff:] - LOG.warn(_LW("Kafka Publisher max local queue length is exceeded, " - "dropping %d oldest data") % diff) - - def _check_kafka_connection(self): - try: - self._get_client() - except Exception as e: - LOG.exception(_LE("Failed to connect to Kafka service: %s"), e) - - if self.policy == 'queue': - self._check_queue_length() - else: - self.local_queue = [] - raise Exception('Kafka Client is not available, ' - 'please restart Kafka client') - - def _get_client(self): - if not self.kafka_client: - self.kafka_client = kafka.KafkaClient( - "%s:%s" % (self.host, self.port)) - self.kafka_producer = kafka.SimpleProducer(self.kafka_client) - - def _send(self, data): - for d in data: - try: - self.kafka_producer.send_messages( - self.topic, json.dumps(d)) - except Exception as e: - LOG.exception(_LE("Failed to send sample data: %s"), e) - raise + messaging.raise_delivery_failure(e) diff --git a/ceilometer/publisher/messaging.py b/ceilometer/publisher/messaging.py index 34cda432..66822814 100644 --- a/ceilometer/publisher/messaging.py +++ b/ceilometer/publisher/messaging.py @@ -22,10 +22,12 @@ import operator from oslo_config import cfg from oslo_log import log import oslo_messaging +from oslo_utils import encodeutils +from oslo_utils import excutils import six import six.moves.urllib.parse as urlparse -from ceilometer.i18n import _ +from ceilometer.i18n import _, _LE from ceilometer import messaging from ceilometer import publisher from ceilometer.publisher import utils @@ -67,6 +69,18 @@ cfg.CONF.register_opts(NOTIFIER_OPTS, cfg.CONF.import_opt('host', 'ceilometer.service') +class DeliveryFailure(Exception): + def __init__(self, message=None, cause=None): + super(DeliveryFailure, self).__init__(message) + self.cause = cause + + +def raise_delivery_failure(exc): + excutils.raise_with_cause(DeliveryFailure, + encodeutils.exception_to_unicode(exc), + cause=exc) + + @six.add_metaclass(abc.ABCMeta) class MessagingPublisher(publisher.PublisherBase): @@ -81,6 +95,7 @@ class MessagingPublisher(publisher.PublisherBase): self.policy = options.get('policy', ['default'])[-1] self.max_queue_length = int(options.get( 'max_queue_length', [1024])[-1]) + self.max_retry = 0 self.local_queue = [] @@ -144,11 +159,12 @@ class MessagingPublisher(publisher.PublisherBase): "dropping %d oldest samples") % count) def _process_queue(self, queue, policy): + current_retry = 0 while queue: context, topic, data = queue[0] try: self._send(context, topic, data) - except oslo_messaging.MessageDeliveryFailure: + except DeliveryFailure: data = sum([len(m) for __, __, m in queue]) if policy == 'queue': LOG.warn(_("Failed to publish %d datapoints, queue them"), @@ -158,8 +174,11 @@ class MessagingPublisher(publisher.PublisherBase): LOG.warn(_("Failed to publish %d datapoints, " "dropping them"), data) return [] - # default, occur only if rabbit_max_retries > 0 - raise + current_retry += 1 + if current_retry >= self.max_retry: + LOG.exception(_LE("Failed to retry to send sample data " + "with max_retry times")) + raise else: queue.pop(0) return [] @@ -195,8 +214,11 @@ class RPCPublisher(MessagingPublisher): ) def _send(self, context, topic, meters): - self.rpc_client.prepare(topic=topic).cast(context, self.target, - data=meters) + try: + self.rpc_client.prepare(topic=topic).cast(context, self.target, + data=meters) + except oslo_messaging.MessageDeliveryFailure as e: + raise_delivery_failure(e) class NotifierPublisher(MessagingPublisher): @@ -213,8 +235,11 @@ class NotifierPublisher(MessagingPublisher): ) def _send(self, context, event_type, data): - self.notifier.sample(context.to_dict(), event_type=event_type, - payload=data) + try: + self.notifier.sample(context.to_dict(), event_type=event_type, + payload=data) + except oslo_messaging.MessageDeliveryFailure as e: + raise_delivery_failure(e) class SampleNotifierPublisher(NotifierPublisher): diff --git a/ceilometer/tests/unit/publisher/test_kafka_broker_publisher.py b/ceilometer/tests/unit/publisher/test_kafka_broker_publisher.py index d7ea0ba9..9daaaef1 100644 --- a/ceilometer/tests/unit/publisher/test_kafka_broker_publisher.py +++ b/ceilometer/tests/unit/publisher/test_kafka_broker_publisher.py @@ -22,12 +22,14 @@ from oslo_utils import netutils from ceilometer.event.storage import models as event from ceilometer.publisher import kafka_broker as kafka +from ceilometer.publisher import messaging as msg_publisher from ceilometer import sample from ceilometer.tests import base as tests_base @mock.patch('ceilometer.publisher.kafka_broker.LOG', mock.Mock()) -@mock.patch.object(kafka.KafkaBrokerPublisher, '_get_client', mock.Mock()) +@mock.patch('ceilometer.publisher.kafka_broker.kafka.KafkaClient', + mock.Mock()) class TestKafkaPublisher(tests_base.BaseTestCase): test_event_data = [ event.Event(message_id=uuid.uuid4(), @@ -95,25 +97,22 @@ class TestKafkaPublisher(tests_base.BaseTestCase): ), ] - def setUp(self): - super(TestKafkaPublisher, self).setUp() - def test_publish(self): publisher = kafka.KafkaBrokerPublisher(netutils.urlsplit( 'kafka://127.0.0.1:9092?topic=ceilometer')) - with mock.patch.object(publisher, '_send') as fake_send: + with mock.patch.object(publisher, '_producer') as fake_producer: publisher.publish_samples(mock.MagicMock(), self.test_data) - self.assertEqual(1, len(fake_send.mock_calls)) + self.assertEqual(5, len(fake_producer.send_messages.mock_calls)) self.assertEqual(0, len(publisher.local_queue)) def test_publish_without_options(self): publisher = kafka.KafkaBrokerPublisher( netutils.urlsplit('kafka://127.0.0.1:9092')) - with mock.patch.object(publisher, '_send') as fake_send: + with mock.patch.object(publisher, '_producer') as fake_producer: publisher.publish_samples(mock.MagicMock(), self.test_data) - self.assertEqual(1, len(fake_send.mock_calls)) + self.assertEqual(5, len(fake_producer.send_messages.mock_calls)) self.assertEqual(0, len(publisher.local_queue)) def test_publish_to_host_without_policy(self): @@ -129,39 +128,40 @@ class TestKafkaPublisher(tests_base.BaseTestCase): publisher = kafka.KafkaBrokerPublisher(netutils.urlsplit( 'kafka://127.0.0.1:9092?topic=ceilometer&policy=default')) - with mock.patch.object(publisher, '_send') as fake_send: - fake_send.side_effect = TypeError - self.assertRaises(TypeError, publisher.publish_samples, + with mock.patch.object(publisher, '_producer') as fake_producer: + fake_producer.send_messages.side_effect = TypeError + self.assertRaises(msg_publisher.DeliveryFailure, + publisher.publish_samples, mock.MagicMock(), self.test_data) - self.assertEqual(100, len(fake_send.mock_calls)) + self.assertEqual(100, len(fake_producer.send_messages.mock_calls)) self.assertEqual(0, len(publisher.local_queue)) def test_publish_to_host_with_drop_policy(self): publisher = kafka.KafkaBrokerPublisher(netutils.urlsplit( 'kafka://127.0.0.1:9092?topic=ceilometer&policy=drop')) - with mock.patch.object(publisher, '_send') as fake_send: - fake_send.side_effect = Exception("test") + with mock.patch.object(publisher, '_producer') as fake_producer: + fake_producer.send_messages.side_effect = Exception("test") publisher.publish_samples(mock.MagicMock(), self.test_data) - self.assertEqual(1, len(fake_send.mock_calls)) + self.assertEqual(1, len(fake_producer.send_messages.mock_calls)) self.assertEqual(0, len(publisher.local_queue)) def test_publish_to_host_with_queue_policy(self): publisher = kafka.KafkaBrokerPublisher(netutils.urlsplit( 'kafka://127.0.0.1:9092?topic=ceilometer&policy=queue')) - with mock.patch.object(publisher, '_send') as fake_send: - fake_send.side_effect = Exception("test") + with mock.patch.object(publisher, '_producer') as fake_producer: + fake_producer.send_messages.side_effect = Exception("test") publisher.publish_samples(mock.MagicMock(), self.test_data) - self.assertEqual(1, len(fake_send.mock_calls)) + self.assertEqual(1, len(fake_producer.send_messages.mock_calls)) self.assertEqual(1, len(publisher.local_queue)) def test_publish_to_down_host_with_default_queue_size(self): publisher = kafka.KafkaBrokerPublisher(netutils.urlsplit( 'kafka://127.0.0.1:9092?topic=ceilometer&policy=queue')) - with mock.patch.object(publisher, '_send') as fake_send: - fake_send.side_effect = Exception('No Connection') + with mock.patch.object(publisher, '_producer') as fake_producer: + fake_producer.send_messages.side_effect = Exception("test") for i in range(0, 2000): for s in self.test_data: @@ -170,16 +170,16 @@ class TestKafkaPublisher(tests_base.BaseTestCase): self.assertEqual(1024, len(publisher.local_queue)) self.assertEqual('test-976', - publisher.local_queue[0][0]['counter_name']) + publisher.local_queue[0][2][0]['counter_name']) self.assertEqual('test-1999', - publisher.local_queue[1023][0]['counter_name']) + publisher.local_queue[1023][2][0]['counter_name']) def test_publish_to_host_from_down_to_up_with_queue(self): publisher = kafka.KafkaBrokerPublisher(netutils.urlsplit( 'kafka://127.0.0.1:9092?topic=ceilometer&policy=queue')) - with mock.patch.object(publisher, '_send') as fake_send: - fake_send.side_effect = Exception('No Connection') + with mock.patch.object(publisher, '_producer') as fake_producer: + fake_producer.send_messages.side_effect = Exception("test") for i in range(0, 16): for s in self.test_data: s.name = 'test-%d' % i @@ -187,7 +187,7 @@ class TestKafkaPublisher(tests_base.BaseTestCase): self.assertEqual(16, len(publisher.local_queue)) - fake_send.side_effect = None + fake_producer.send_messages.side_effect = None for s in self.test_data: s.name = 'test-%d' % 16 publisher.publish_samples(mock.MagicMock(), self.test_data) @@ -197,13 +197,14 @@ class TestKafkaPublisher(tests_base.BaseTestCase): publisher = kafka.KafkaBrokerPublisher( netutils.urlsplit('kafka://127.0.0.1:9092?topic=ceilometer')) - with mock.patch.object(publisher, '_send') as fake_send: + with mock.patch.object(publisher, '_producer') as fake_producer: publisher.publish_events(mock.MagicMock(), self.test_event_data) - self.assertEqual(1, len(fake_send.mock_calls)) + self.assertEqual(5, len(fake_producer.send_messages.mock_calls)) - with mock.patch.object(publisher, '_send') as fake_send: - fake_send.side_effect = TypeError - self.assertRaises(TypeError, publisher.publish_events, + with mock.patch.object(publisher, '_producer') as fake_producer: + fake_producer.send_messages.side_effect = Exception("test") + self.assertRaises(msg_publisher.DeliveryFailure, + publisher.publish_events, mock.MagicMock(), self.test_event_data) - self.assertEqual(100, len(fake_send.mock_calls)) + self.assertEqual(100, len(fake_producer.send_messages.mock_calls)) self.assertEqual(0, len(publisher.local_queue)) diff --git a/ceilometer/tests/unit/publisher/test_messaging_publisher.py b/ceilometer/tests/unit/publisher/test_messaging_publisher.py index 23420307..3bd26487 100644 --- a/ceilometer/tests/unit/publisher/test_messaging_publisher.py +++ b/ceilometer/tests/unit/publisher/test_messaging_publisher.py @@ -21,7 +21,6 @@ import eventlet import mock from oslo_config import fixture as fixture_config from oslo_context import context -import oslo_messaging from oslo_utils import netutils import testscenarios.testcase @@ -250,11 +249,11 @@ class TestPublisherPolicy(TestPublisher): def test_published_with_no_policy(self, mylog): publisher = self.publisher_cls( netutils.urlsplit('%s://' % self.protocol)) - side_effect = oslo_messaging.MessageDeliveryFailure() + side_effect = msg_publisher.DeliveryFailure() with mock.patch.object(publisher, '_send') as fake_send: fake_send.side_effect = side_effect self.assertRaises( - oslo_messaging.MessageDeliveryFailure, + msg_publisher.DeliveryFailure, getattr(publisher, self.pub_func), mock.MagicMock(), self.test_data) self.assertTrue(mylog.info.called) @@ -267,11 +266,11 @@ class TestPublisherPolicy(TestPublisher): def test_published_with_policy_block(self, mylog): publisher = self.publisher_cls( netutils.urlsplit('%s://?policy=default' % self.protocol)) - side_effect = oslo_messaging.MessageDeliveryFailure() + side_effect = msg_publisher.DeliveryFailure() with mock.patch.object(publisher, '_send') as fake_send: fake_send.side_effect = side_effect self.assertRaises( - oslo_messaging.MessageDeliveryFailure, + msg_publisher.DeliveryFailure, getattr(publisher, self.pub_func), mock.MagicMock(), self.test_data) self.assertTrue(mylog.info.called) @@ -283,11 +282,11 @@ class TestPublisherPolicy(TestPublisher): def test_published_with_policy_incorrect(self, mylog): publisher = self.publisher_cls( netutils.urlsplit('%s://?policy=notexist' % self.protocol)) - side_effect = oslo_messaging.MessageDeliveryFailure() + side_effect = msg_publisher.DeliveryFailure() with mock.patch.object(publisher, '_send') as fake_send: fake_send.side_effect = side_effect self.assertRaises( - oslo_messaging.MessageDeliveryFailure, + msg_publisher.DeliveryFailure, getattr(publisher, self.pub_func), mock.MagicMock(), self.test_data) self.assertTrue(mylog.warn.called) @@ -303,7 +302,7 @@ class TestPublisherPolicyReactions(TestPublisher): def test_published_with_policy_drop_and_rpc_down(self): publisher = self.publisher_cls( netutils.urlsplit('%s://?policy=drop' % self.protocol)) - side_effect = oslo_messaging.MessageDeliveryFailure() + side_effect = msg_publisher.DeliveryFailure() with mock.patch.object(publisher, '_send') as fake_send: fake_send.side_effect = side_effect getattr(publisher, self.pub_func)(mock.MagicMock(), @@ -315,7 +314,7 @@ class TestPublisherPolicyReactions(TestPublisher): def test_published_with_policy_queue_and_rpc_down(self): publisher = self.publisher_cls( netutils.urlsplit('%s://?policy=queue' % self.protocol)) - side_effect = oslo_messaging.MessageDeliveryFailure() + side_effect = msg_publisher.DeliveryFailure() with mock.patch.object(publisher, '_send') as fake_send: fake_send.side_effect = side_effect @@ -330,7 +329,7 @@ class TestPublisherPolicyReactions(TestPublisher): publisher = self.publisher_cls( netutils.urlsplit('%s://?policy=queue' % self.protocol)) - side_effect = oslo_messaging.MessageDeliveryFailure() + side_effect = msg_publisher.DeliveryFailure() with mock.patch.object(publisher, '_send') as fake_send: fake_send.side_effect = side_effect getattr(publisher, self.pub_func)(mock.MagicMock(), @@ -354,7 +353,7 @@ class TestPublisherPolicyReactions(TestPublisher): publisher = self.publisher_cls(netutils.urlsplit( '%s://?policy=queue&max_queue_length=3' % self.protocol)) - side_effect = oslo_messaging.MessageDeliveryFailure() + side_effect = msg_publisher.DeliveryFailure() with mock.patch.object(publisher, '_send') as fake_send: fake_send.side_effect = side_effect for i in range(0, 5): @@ -381,7 +380,7 @@ class TestPublisherPolicyReactions(TestPublisher): publisher = self.publisher_cls( netutils.urlsplit('%s://?policy=queue' % self.protocol)) - side_effect = oslo_messaging.MessageDeliveryFailure() + side_effect = msg_publisher.DeliveryFailure() with mock.patch.object(publisher, '_send') as fake_send: fake_send.side_effect = side_effect for i in range(0, 2000):