diff --git a/monasca_common/confluent_kafka/producer.py b/monasca_common/confluent_kafka/producer.py index a3ccc935..23b74afc 100644 --- a/monasca_common/confluent_kafka/producer.py +++ b/monasca_common/confluent_kafka/producer.py @@ -22,16 +22,17 @@ log = logging.getLogger(__name__) class KafkaProducer(object): """Wrapper around asynchronous Kafka Producer""" - def __init__(self, bootstrap_servers): + def __init__(self, bootstrap_servers, **config): """ Create new Producer wrapper instance. :param str bootstrap_servers: Initial list of brokers as a CSV list of broker host or host:port. + :param config Configuration properties """ - self._producer = confluent_kafka.Producer({'bootstrap.servers': - bootstrap_servers}) + config['bootstrap.servers'] = bootstrap_servers + self._producer = confluent_kafka.Producer(config) @staticmethod def delivery_report(err, msg): diff --git a/monasca_common/kafka/client_factory.py b/monasca_common/kafka/client_factory.py index 6347172b..97b7ae57 100644 --- a/monasca_common/kafka/client_factory.py +++ b/monasca_common/kafka/client_factory.py @@ -49,8 +49,8 @@ def get_kafka_consumer(kafka_url, ) -def get_kafka_producer(kafka_url, use_legacy_client=False): +def get_kafka_producer(kafka_url, use_legacy_client=False, **config): if use_legacy_client: return legacy_kafka_producer.KafkaProducer(kafka_url) else: - return producer.KafkaProducer(",".join(kafka_url)) + return producer.KafkaProducer(",".join(kafka_url), **config)