From eb8e6ff7e259e68ec35ce0e6e1345dd6e7cfb7a4 Mon Sep 17 00:00:00 2001 From: Witek Bedyk Date: Mon, 18 Nov 2019 11:06:12 +0100 Subject: [PATCH] Allow passing config options to Kafka producer The change allows passing supported options as dictionary to create a Kafka producer object. Change-Id: I7627a8caa2d6bb9c7789df143e72a4085060b164 Story: 2006059 Task: 37531 --- monasca_common/confluent_kafka/producer.py | 7 ++++--- monasca_common/kafka/client_factory.py | 4 ++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/monasca_common/confluent_kafka/producer.py b/monasca_common/confluent_kafka/producer.py index a3ccc935..23b74afc 100644 --- a/monasca_common/confluent_kafka/producer.py +++ b/monasca_common/confluent_kafka/producer.py @@ -22,16 +22,17 @@ log = logging.getLogger(__name__) class KafkaProducer(object): """Wrapper around asynchronous Kafka Producer""" - def __init__(self, bootstrap_servers): + def __init__(self, bootstrap_servers, **config): """ Create new Producer wrapper instance. :param str bootstrap_servers: Initial list of brokers as a CSV list of broker host or host:port. + :param config Configuration properties """ - self._producer = confluent_kafka.Producer({'bootstrap.servers': - bootstrap_servers}) + config['bootstrap.servers'] = bootstrap_servers + self._producer = confluent_kafka.Producer(config) @staticmethod def delivery_report(err, msg): diff --git a/monasca_common/kafka/client_factory.py b/monasca_common/kafka/client_factory.py index 6347172b..97b7ae57 100644 --- a/monasca_common/kafka/client_factory.py +++ b/monasca_common/kafka/client_factory.py @@ -49,8 +49,8 @@ def get_kafka_consumer(kafka_url, ) -def get_kafka_producer(kafka_url, use_legacy_client=False): +def get_kafka_producer(kafka_url, use_legacy_client=False, **config): if use_legacy_client: return legacy_kafka_producer.KafkaProducer(kafka_url) else: - return producer.KafkaProducer(",".join(kafka_url)) + return producer.KafkaProducer(",".join(kafka_url), **config)