Remove class KafkaBrokerPublisher
Remove class KafkaBrokerPublisher and use NotifierPublisher instead Change-Id: I12fb8666c9af485c9bf9aace8eee08f2e4683e09
This commit is contained in:
parent
1cb713f3e2
commit
2002373017
|
@ -1,101 +0,0 @@
|
|||
#
|
||||
# Copyright 2015 Cisco Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from debtcollector import removals
|
||||
import kafka
|
||||
from oslo_log import log
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_utils import netutils
|
||||
from six.moves.urllib import parse as urlparse
|
||||
|
||||
from ceilometer.publisher import messaging
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
@removals.removed_class("KafkaBrokerPublisher",
|
||||
message="use NotifierPublisher instead",
|
||||
removal_version='10.0')
|
||||
class KafkaBrokerPublisher(messaging.MessagingPublisher):
|
||||
"""Publish metering data to kafka broker.
|
||||
|
||||
The ip address and port number of kafka broker should be configured in
|
||||
ceilometer pipeline configuration file. If an ip address is not specified,
|
||||
this kafka publisher will not publish any meters.
|
||||
|
||||
To enable this publisher, add the following section to the
|
||||
/etc/ceilometer/pipeline.yaml file or simply add it to an existing
|
||||
pipeline::
|
||||
|
||||
meter:
|
||||
- name: meter_kafka
|
||||
meters:
|
||||
- "*"
|
||||
sinks:
|
||||
- kafka_sink
|
||||
sinks:
|
||||
- name: kafka_sink
|
||||
transformers:
|
||||
publishers:
|
||||
- kafka://[kafka_broker_ip]:[kafka_broker_port]?topic=[topic]
|
||||
|
||||
Kafka topic name and broker's port are required for this publisher to work
|
||||
properly. If topic parameter is missing, this kafka publisher publish
|
||||
metering data under a topic name, 'ceilometer'. If the port number is not
|
||||
specified, this Kafka Publisher will use 9092 as the broker's port.
|
||||
This publisher has transmit options such as queue, drop, and retry. These
|
||||
options are specified using policy field of URL parameter. When queue
|
||||
option could be selected, local queue length can be determined using
|
||||
max_queue_length field as well. When the transfer fails with retry
|
||||
option, try to resend the data as many times as specified in max_retry
|
||||
field. If max_retry is not specified, default the number of retry is 100.
|
||||
"""
|
||||
|
||||
def __init__(self, conf, parsed_url):
|
||||
super(KafkaBrokerPublisher, self).__init__(conf, parsed_url)
|
||||
options = urlparse.parse_qs(parsed_url.query)
|
||||
|
||||
self._producer = None
|
||||
self._host, self._port = netutils.parse_host_port(
|
||||
parsed_url.netloc, default_port=9092)
|
||||
self._topic = options.get('topic', ['ceilometer'])[-1]
|
||||
self.max_retry = int(options.get('max_retry', [100])[-1])
|
||||
|
||||
def _ensure_connection(self):
|
||||
if self._producer:
|
||||
return
|
||||
|
||||
try:
|
||||
self._producer = kafka.KafkaProducer(
|
||||
bootstrap_servers=["%s:%s" % (self._host, self._port)])
|
||||
except kafka.errors.KafkaError as e:
|
||||
LOG.exception("Failed to connect to Kafka service: %s", e)
|
||||
raise messaging.DeliveryFailure('Kafka Client is not available, '
|
||||
'please restart Kafka client')
|
||||
except Exception as e:
|
||||
LOG.exception("Failed to connect to Kafka service: %s", e)
|
||||
raise messaging.DeliveryFailure('Kafka Client is not available, '
|
||||
'please restart Kafka client')
|
||||
|
||||
def _send(self, event_type, data):
|
||||
self._ensure_connection()
|
||||
# TODO(sileht): don't split the payload into multiple network
|
||||
# message ... but how to do that without breaking consuming
|
||||
# application...
|
||||
try:
|
||||
for d in data:
|
||||
self._producer.send(self._topic, jsonutils.dumps(d))
|
||||
except Exception as e:
|
||||
messaging.raise_delivery_failure(e)
|
|
@ -1,213 +0,0 @@
|
|||
#
|
||||
# Copyright 2015 Cisco Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
"""Tests for ceilometer/publisher/kafka_broker.py
|
||||
"""
|
||||
import datetime
|
||||
import uuid
|
||||
|
||||
import mock
|
||||
from oslo_utils import netutils
|
||||
|
||||
from ceilometer.event.storage import models as event
|
||||
from ceilometer.publisher import kafka_broker as kafka
|
||||
from ceilometer.publisher import messaging as msg_publisher
|
||||
from ceilometer import sample
|
||||
from ceilometer import service
|
||||
from ceilometer.tests import base as tests_base
|
||||
|
||||
|
||||
@mock.patch('ceilometer.publisher.kafka_broker.LOG', mock.Mock())
|
||||
class TestKafkaPublisher(tests_base.BaseTestCase):
|
||||
test_event_data = [
|
||||
event.Event(message_id=uuid.uuid4(),
|
||||
event_type='event_%d' % i,
|
||||
generated=datetime.datetime.utcnow(),
|
||||
traits=[], raw={})
|
||||
for i in range(0, 5)
|
||||
]
|
||||
|
||||
test_data = [
|
||||
sample.Sample(
|
||||
name='test',
|
||||
type=sample.TYPE_CUMULATIVE,
|
||||
unit='',
|
||||
volume=1,
|
||||
user_id='test',
|
||||
project_id='test',
|
||||
resource_id='test_run_tasks',
|
||||
timestamp=datetime.datetime.utcnow().isoformat(),
|
||||
resource_metadata={'name': 'TestPublish'},
|
||||
),
|
||||
sample.Sample(
|
||||
name='test',
|
||||
type=sample.TYPE_CUMULATIVE,
|
||||
unit='',
|
||||
volume=1,
|
||||
user_id='test',
|
||||
project_id='test',
|
||||
resource_id='test_run_tasks',
|
||||
timestamp=datetime.datetime.utcnow().isoformat(),
|
||||
resource_metadata={'name': 'TestPublish'},
|
||||
),
|
||||
sample.Sample(
|
||||
name='test2',
|
||||
type=sample.TYPE_CUMULATIVE,
|
||||
unit='',
|
||||
volume=1,
|
||||
user_id='test',
|
||||
project_id='test',
|
||||
resource_id='test_run_tasks',
|
||||
timestamp=datetime.datetime.utcnow().isoformat(),
|
||||
resource_metadata={'name': 'TestPublish'},
|
||||
),
|
||||
sample.Sample(
|
||||
name='test2',
|
||||
type=sample.TYPE_CUMULATIVE,
|
||||
unit='',
|
||||
volume=1,
|
||||
user_id='test',
|
||||
project_id='test',
|
||||
resource_id='test_run_tasks',
|
||||
timestamp=datetime.datetime.utcnow().isoformat(),
|
||||
resource_metadata={'name': 'TestPublish'},
|
||||
),
|
||||
sample.Sample(
|
||||
name='test3',
|
||||
type=sample.TYPE_CUMULATIVE,
|
||||
unit='',
|
||||
volume=1,
|
||||
user_id='test',
|
||||
project_id='test',
|
||||
resource_id='test_run_tasks',
|
||||
timestamp=datetime.datetime.utcnow().isoformat(),
|
||||
resource_metadata={'name': 'TestPublish'},
|
||||
),
|
||||
]
|
||||
|
||||
def setUp(self):
|
||||
super(TestKafkaPublisher, self).setUp()
|
||||
self.CONF = service.prepare_service([], [])
|
||||
|
||||
def test_publish(self):
|
||||
publisher = kafka.KafkaBrokerPublisher(self.CONF, netutils.urlsplit(
|
||||
'kafka://127.0.0.1:9092?topic=ceilometer'))
|
||||
|
||||
with mock.patch.object(publisher, '_producer') as fake_producer:
|
||||
publisher.publish_samples(self.test_data)
|
||||
self.assertEqual(5, len(fake_producer.send.mock_calls))
|
||||
self.assertEqual(0, len(publisher.local_queue))
|
||||
|
||||
def test_publish_without_options(self):
|
||||
publisher = kafka.KafkaBrokerPublisher(
|
||||
self.CONF, netutils.urlsplit('kafka://127.0.0.1:9092'))
|
||||
|
||||
with mock.patch.object(publisher, '_producer') as fake_producer:
|
||||
publisher.publish_samples(self.test_data)
|
||||
self.assertEqual(5, len(fake_producer.send.mock_calls))
|
||||
self.assertEqual(0, len(publisher.local_queue))
|
||||
|
||||
def test_publish_to_host_without_policy(self):
|
||||
publisher = kafka.KafkaBrokerPublisher(self.CONF, netutils.urlsplit(
|
||||
'kafka://127.0.0.1:9092?topic=ceilometer'))
|
||||
self.assertEqual('default', publisher.policy)
|
||||
|
||||
publisher = kafka.KafkaBrokerPublisher(self.CONF, netutils.urlsplit(
|
||||
'kafka://127.0.0.1:9092?topic=ceilometer&policy=test'))
|
||||
self.assertEqual('default', publisher.policy)
|
||||
|
||||
def test_publish_to_host_with_default_policy(self):
|
||||
publisher = kafka.KafkaBrokerPublisher(self.CONF, netutils.urlsplit(
|
||||
'kafka://127.0.0.1:9092?topic=ceilometer&policy=default'))
|
||||
|
||||
with mock.patch.object(publisher, '_producer') as fake_producer:
|
||||
fake_producer.send.side_effect = TypeError
|
||||
self.assertRaises(msg_publisher.DeliveryFailure,
|
||||
publisher.publish_samples,
|
||||
self.test_data)
|
||||
self.assertEqual(100, len(fake_producer.send.mock_calls))
|
||||
self.assertEqual(0, len(publisher.local_queue))
|
||||
|
||||
def test_publish_to_host_with_drop_policy(self):
|
||||
publisher = kafka.KafkaBrokerPublisher(self.CONF, netutils.urlsplit(
|
||||
'kafka://127.0.0.1:9092?topic=ceilometer&policy=drop'))
|
||||
|
||||
with mock.patch.object(publisher, '_producer') as fake_producer:
|
||||
fake_producer.send.side_effect = Exception("test")
|
||||
publisher.publish_samples(self.test_data)
|
||||
self.assertEqual(1, len(fake_producer.send.mock_calls))
|
||||
self.assertEqual(0, len(publisher.local_queue))
|
||||
|
||||
def test_publish_to_host_with_queue_policy(self):
|
||||
publisher = kafka.KafkaBrokerPublisher(self.CONF, netutils.urlsplit(
|
||||
'kafka://127.0.0.1:9092?topic=ceilometer&policy=queue'))
|
||||
|
||||
with mock.patch.object(publisher, '_producer') as fake_producer:
|
||||
fake_producer.send.side_effect = Exception("test")
|
||||
publisher.publish_samples(self.test_data)
|
||||
self.assertEqual(1, len(fake_producer.send.mock_calls))
|
||||
self.assertEqual(1, len(publisher.local_queue))
|
||||
|
||||
def test_publish_to_down_host_with_default_queue_size(self):
|
||||
publisher = kafka.KafkaBrokerPublisher(self.CONF, netutils.urlsplit(
|
||||
'kafka://127.0.0.1:9092?topic=ceilometer&policy=queue'))
|
||||
|
||||
with mock.patch.object(publisher, '_producer') as fake_producer:
|
||||
fake_producer.send.side_effect = Exception("test")
|
||||
|
||||
for i in range(0, 2000):
|
||||
for s in self.test_data:
|
||||
s.name = 'test-%d' % i
|
||||
publisher.publish_samples(self.test_data)
|
||||
|
||||
self.assertEqual(1024, len(publisher.local_queue))
|
||||
self.assertEqual('test-976',
|
||||
publisher.local_queue[0][1][0]['counter_name'])
|
||||
self.assertEqual('test-1999',
|
||||
publisher.local_queue[1023][1][0]['counter_name'])
|
||||
|
||||
def test_publish_to_host_from_down_to_up_with_queue(self):
|
||||
publisher = kafka.KafkaBrokerPublisher(self.CONF, netutils.urlsplit(
|
||||
'kafka://127.0.0.1:9092?topic=ceilometer&policy=queue'))
|
||||
|
||||
with mock.patch.object(publisher, '_producer') as fake_producer:
|
||||
fake_producer.send.side_effect = Exception("test")
|
||||
for i in range(0, 16):
|
||||
for s in self.test_data:
|
||||
s.name = 'test-%d' % i
|
||||
publisher.publish_samples(self.test_data)
|
||||
|
||||
self.assertEqual(16, len(publisher.local_queue))
|
||||
|
||||
fake_producer.send.side_effect = None
|
||||
for s in self.test_data:
|
||||
s.name = 'test-%d' % 16
|
||||
publisher.publish_samples(self.test_data)
|
||||
self.assertEqual(0, len(publisher.local_queue))
|
||||
|
||||
def test_publish_event_with_default_policy(self):
|
||||
publisher = kafka.KafkaBrokerPublisher(self.CONF, netutils.urlsplit(
|
||||
'kafka://127.0.0.1:9092?topic=ceilometer'))
|
||||
|
||||
with mock.patch.object(publisher, '_producer') as fake_producer:
|
||||
publisher.publish_events(self.test_event_data)
|
||||
self.assertEqual(5, len(fake_producer.send.mock_calls))
|
||||
|
||||
with mock.patch.object(publisher, '_producer') as fake_producer:
|
||||
fake_producer.send.side_effect = Exception("test")
|
||||
self.assertRaises(msg_publisher.DeliveryFailure,
|
||||
publisher.publish_events,
|
||||
self.test_event_data)
|
||||
self.assertEqual(100, len(fake_producer.send.mock_calls))
|
||||
self.assertEqual(0, len(publisher.local_queue))
|
|
@ -0,0 +1,4 @@
|
|||
---
|
||||
upgrade:
|
||||
- |
|
||||
The deprecated kafka publisher has been removed, use NotifierPublisher instead.
|
|
@ -245,7 +245,6 @@ ceilometer.sample.publisher =
|
|||
udp = ceilometer.publisher.udp:UDPPublisher
|
||||
file = ceilometer.publisher.file:FilePublisher
|
||||
direct = ceilometer.publisher.direct:DirectPublisher
|
||||
kafka = ceilometer.publisher.kafka_broker:KafkaBrokerPublisher
|
||||
http = ceilometer.publisher.http:HttpPublisher
|
||||
https = ceilometer.publisher.http:HttpPublisher
|
||||
gnocchi = ceilometer.publisher.direct:DirectPublisher
|
||||
|
@ -258,7 +257,6 @@ ceilometer.event.publisher =
|
|||
test = ceilometer.publisher.test:TestPublisher
|
||||
direct = ceilometer.publisher.direct:DirectPublisher
|
||||
notifier = ceilometer.publisher.messaging:EventNotifierPublisher
|
||||
kafka = ceilometer.publisher.kafka_broker:KafkaBrokerPublisher
|
||||
http = ceilometer.publisher.http:HttpPublisher
|
||||
https = ceilometer.publisher.http:HttpPublisher
|
||||
gnocchi = ceilometer.publisher.direct:DirectPublisher
|
||||
|
|
Loading…
Reference in New Issue