Allow passing config options to Kafka producer

The change allows passing supported options as dictionary to create a
Kafka producer object.

Change-Id: I7627a8caa2d6bb9c7789df143e72a4085060b164
Story: 2006059
Task: 37531
This commit is contained in:
Witek Bedyk 2019-11-18 11:06:12 +01:00
parent 66754b1a4a
commit eb8e6ff7e2
2 changed files with 6 additions and 5 deletions

View File

@ -22,16 +22,17 @@ log = logging.getLogger(__name__)
class KafkaProducer(object):
"""Wrapper around asynchronous Kafka Producer"""
def __init__(self, bootstrap_servers):
def __init__(self, bootstrap_servers, **config):
"""
Create new Producer wrapper instance.
:param str bootstrap_servers: Initial list of brokers as a CSV
list of broker host or host:port.
:param config Configuration properties
"""
self._producer = confluent_kafka.Producer({'bootstrap.servers':
bootstrap_servers})
config['bootstrap.servers'] = bootstrap_servers
self._producer = confluent_kafka.Producer(config)
@staticmethod
def delivery_report(err, msg):

View File

@ -49,8 +49,8 @@ def get_kafka_consumer(kafka_url,
)
def get_kafka_producer(kafka_url, use_legacy_client=False):
def get_kafka_producer(kafka_url, use_legacy_client=False, **config):
if use_legacy_client:
return legacy_kafka_producer.KafkaProducer(kafka_url)
else:
return producer.KafkaProducer(",".join(kafka_url))
return producer.KafkaProducer(",".join(kafka_url), **config)