Merge "Support kafka connection timeout option"

This commit is contained in:
Zuul 2019-08-09 15:45:58 +00:00 committed by Gerrit Code Review
commit 2a6cc6cc0a
3 changed files with 13 additions and 7 deletions

View File

@ -25,7 +25,7 @@ class KafkaConsumer(object):
def __init__(self, bootstrap_servers, group_id, topic,
fetch_min_bytes=1048576, client_id="",
repartition_callback=None, commit_callback=None,
max_commit_interval=30):
max_commit_interval=30, timeout=10000):
"""
Create new high-level Consumer instance.
@ -43,10 +43,12 @@ class KafkaConsumer(object):
:param callable commit_callback: Callback function responsible for
calling the commit() method.
:param int max_commit_interval: Maximum time in seconds between commits.
:param int timeout: Client group session and failure detection timeout.
"""
consumer_config = {'bootstrap.servers': bootstrap_servers,
'group.id': group_id,
'session.timeout.ms': timeout,
'fetch.min.bytes': fetch_min_bytes,
'client.id': client_id,
'enable.auto.commit': False,

View File

@ -18,13 +18,14 @@ import logging
import threading
import time
import monasca_common.kafka_lib.client as kafka_client
import monasca_common.kafka_lib.common as kafka_common
import monasca_common.kafka_lib.consumer as kafka_consumer
from kazoo.client import KazooClient
from kazoo.recipe.partitioner import SetPartitioner
import monasca_common.kafka_lib.client as kafka_client
import monasca_common.kafka_lib.common as kafka_common
from monasca_common.kafka_lib.conn import DEFAULT_SOCKET_TIMEOUT_SECONDS
import monasca_common.kafka_lib.consumer as kafka_consumer
log = logging.getLogger(__name__)
"""Kafka consumer interface
@ -52,7 +53,8 @@ class KafkaConsumer(object):
fetch_size=1048576,
repartition_callback=None,
commit_callback=None,
commit_timeout=30):
commit_timeout=30,
timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
"""Init
kafka_url - Kafka location
@ -68,6 +70,7 @@ class KafkaConsumer(object):
commit_callback - Callback to run when the commit_timeout
has elapsed between commits.
commit_timeout - Timeout between commits.
timeout - kafka connection timeout
"""
self._kazoo_client = None
@ -89,7 +92,7 @@ class KafkaConsumer(object):
self._zookeeper_url = zookeeper_url
self._zookeeper_path = zookeeper_path
self._kafka = kafka_client.KafkaClient(kafka_url)
self._kafka = kafka_client.KafkaClient(kafka_url, timeout=timeout)
self._consumer = self._create_kafka_consumer()

View File

@ -123,6 +123,7 @@ class TestConfluentKafkaConsumer(base.BaseTestCase):
def test_kafka_consumer_init(self):
expected_config = {'group.id': 'fake_group',
'session.timeout.ms': 10000,
'bootstrap.servers': ['fake_server1',
'fake_server2'],
'fetch.min.bytes': 128,