Use Confluent Kafka producer for logs

As already implemented for metrics, logs should also be published to
Apache Kafka using the new Kafka client.

Change-Id: Ie909b13c7692267e787d481f4de658db3b07a1c4
Story: 2003705
Task: 36866
Witek Bedyk 2019-10-08 13:30:53 +02:00 committed by Witold Bedyk
parent c3993bf48a
commit 394be3f330
3 changed files with 19 additions and 21 deletions
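
The change replaces the direct producer.KafkaProducer instantiation with the
client_factory helper from monasca-common, so the concrete producer (legacy
kafka-python client or the new Confluent-based client) is selected by
configuration. A minimal sketch of the resulting call site, assuming only the
factory signature visible in this diff and a publish(topic, messages) method
on the returned producer:

    from monasca_common.kafka import client_factory
    from oslo_config import cfg

    CONF = cfg.CONF

    # The second argument decides whether the legacy client or the new
    # Confluent-based client is returned (option name taken from this diff).
    publisher = client_factory.get_kafka_producer(
        CONF.kafka.uri, CONF.kafka.legacy_kafka_client_enabled)

    # Publishing is unchanged; publish() taking a topic and a list of
    # UTF-8 encoded messages is an assumption based on the tests below.
    publisher.publish(CONF.kafka.logs_topics[0],
                      [b'{"log": {"message": "hello"}}'])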


@@ -16,7 +16,7 @@
 import time
 import falcon
-from monasca_common.kafka import producer
+from monasca_common.kafka import client_factory
 from monasca_common.rest import utils as rest_utils
 from oslo_log import log
 from oslo_utils import encodeutils
@@ -63,9 +63,8 @@ class LogPublisher(object):
         self._topics = CONF.kafka.logs_topics
         self.max_message_size = CONF.log_publisher.max_message_size
-        self._kafka_publisher = producer.KafkaProducer(
-            url=CONF.kafka.uri
-        )
+        self._kafka_publisher = client_factory.get_kafka_producer(
+            CONF.kafka.uri, CONF.kafka.legacy_kafka_client_enabled)
         LOG.info('Initializing LogPublisher <%s>', self)


@@ -35,8 +35,8 @@ EPOCH_START = datetime.datetime(1970, 1, 1)
 class TestSendMessage(base.BaseTestCase):
-    @mock.patch('monasca_api.api.core.log.log_publisher.producer'
-                '.KafkaProducer')
+    @mock.patch('monasca_api.api.core.log.log_publisher.client_factory'
+                '.get_kafka_producer')
     def test_should_not_send_empty_message(self, _):
         instance = log_publisher.LogPublisher()
@@ -51,8 +51,8 @@ class TestSendMessage(base.BaseTestCase):
         not_dict_value = 123
         instance.send_message(not_dict_value)
-    @mock.patch('monasca_api.api.core.log.log_publisher.producer'
-                '.KafkaProducer')
+    @mock.patch('monasca_api.api.core.log.log_publisher.client_factory'
+                '.get_kafka_producer')
     def test_should_not_send_message_missing_keys(self, _):
         # checks every combination of missing keys
         # test does not rely on those keys having a value or not,
@@ -75,8 +75,8 @@ class TestSendMessage(base.BaseTestCase):
                           instance.send_message,
                           message)
-    @mock.patch('monasca_api.api.core.log.log_publisher.producer'
-                '.KafkaProducer')
+    @mock.patch('monasca_api.api.core.log.log_publisher.client_factory'
+                '.get_kafka_producer')
     def test_should_not_send_message_missing_values(self, _):
         # original message assumes that every property has value
         # test modify each property one by one by removing that value
@@ -99,8 +99,8 @@ class TestSendMessage(base.BaseTestCase):
                           instance.send_message,
                           tmp_message)
-    @mock.patch('monasca_api.api.core.log.log_publisher.producer'
-                '.KafkaProducer')
+    @mock.patch('monasca_api.api.core.log.log_publisher.client_factory'
+                '.get_kafka_producer')
     def test_should_send_message(self, kafka_producer):
         instance = log_publisher.LogPublisher()
         instance._kafka_publisher = kafka_producer
@@ -134,8 +134,8 @@ class TestSendMessage(base.BaseTestCase):
             cfg.CONF.kafka.logs_topics[0],
             [ujson.dumps(msg, ensure_ascii=False).encode('utf-8')])
-    @mock.patch('monasca_api.api.core.log.log_publisher.producer'
-                '.KafkaProducer')
+    @mock.patch('monasca_api.api.core.log.log_publisher.client_factory'
+                '.get_kafka_producer')
     def test_should_send_message_multiple_topics(self, _):
         topics = ['logs_topics', 'analyzer', 'tester']
         self.conf_override(logs_topics=topics,
@@ -179,8 +179,8 @@ class TestSendMessage(base.BaseTestCase):
                 topic,
                 [json_msg.encode('utf-8')])
-    @mock.patch('monasca_api.api.core.log.log_publisher.producer'
-                '.KafkaProducer')
+    @mock.patch('monasca_api.api.core.log.log_publisher.client_factory'
+                '.get_kafka_producer')
     def test_should_send_unicode_message(self, kp):
         instance = log_publisher.LogPublisher()
         instance._kafka_publisher = kp
@@ -217,9 +217,8 @@ class TestSendMessage(base.BaseTestCase):
             raise
-@mock.patch(
-    'monasca_api.api.core.log.log_publisher.producer'
-    '.KafkaProducer')
+@mock.patch('monasca_api.api.core.log.log_publisher.client_factory'
+            '.get_kafka_producer')
 class TestTruncation(base.BaseTestCase):
     EXTRA_CHARS_SIZE = len(bytearray(ujson.dumps({
         'log': {
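
Every test change in this file follows one pattern: the @mock.patch target
moves from producer.KafkaProducer to client_factory.get_kafka_producer, so
LogPublisher can still be constructed without a running Kafka. A self-contained
sketch of that pattern; the test name is illustrative and the base test case
import path is assumed:

    import mock

    from monasca_api.api.core.log import log_publisher
    from monasca_api.tests import base  # assumed import path


    class TestFactoryPatching(base.BaseTestCase):

        @mock.patch('monasca_api.api.core.log.log_publisher.client_factory'
                    '.get_kafka_producer')
        def test_uses_producer_returned_by_factory(self, get_producer):
            # With the factory patched, LogPublisher stores the mock
            # producer returned by get_kafka_producer().
            instance = log_publisher.LogPublisher()
            self.assertIs(get_producer.return_value,
                          instance._kafka_publisher)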


@@ -157,8 +157,8 @@ class TestApiLogs(base.BaseApiTestCase):
 class TestUnicodeLogs(base.BaseApiTestCase):
-    @mock.patch('monasca_api.api.core.log.log_publisher.producer.'
-                'KafkaProducer')
+    @mock.patch('monasca_api.api.core.log.log_publisher.client_factory'
+                '.get_kafka_producer')
     def test_should_send_unicode_messages(self, _):
         _init_resource(self)
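
Which client the factory returns is controlled by the existing
legacy_kafka_client_enabled option in the [kafka] group; in tests either path
can be forced with the conf_override helper already used above, for example:

    # Force the new Confluent client path; option and group names are taken
    # from this diff, the value False for the new client is an assumption.
    self.conf_override(legacy_kafka_client_enabled=False, group='kafka')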