Merge "Add support for synchronous commit"

This commit is contained in:
Zuul 2018-01-29 16:07:32 +00:00 committed by Gerrit Code Review
commit 66c056dd56
3 changed files with 18 additions and 5 deletions

View File

@ -153,6 +153,8 @@ class ConsumerConnection(Connection):
self.consumer_timeout = driver_conf.kafka_consumer_timeout
self.max_fetch_bytes = driver_conf.kafka_max_fetch_bytes
self.group_id = driver_conf.consumer_group
self.enable_auto_commit = driver_conf.enable_auto_commit
self.max_poll_records = driver_conf.max_poll_records
self._consume_loop_stopped = False
@with_reconnect()
@ -165,6 +167,10 @@ class ConsumerConnection(Connection):
# NOTE(sileht): really ? you return payload but no messages...
# simulate timeout to consume message again
raise kafka.errors.ConsumerTimeout()
if not self.enable_auto_commit:
self.consumer.commit()
return messages
def consume(self, timeout=None):
@ -204,14 +210,12 @@ class ConsumerConnection(Connection):
@with_reconnect()
def declare_topic_consumer(self, topics, group=None):
# TODO(Support for manual/auto_commit functionality)
# When auto_commit is False, consumer can manually notify
# the completion of the subscription.
# Currently we don't support for non auto commit option
self.consumer = kafka.KafkaConsumer(
*topics, group_id=(group or self.group_id),
enable_auto_commit=self.enable_auto_commit,
bootstrap_servers=self.hostaddrs,
max_partition_fetch_bytes=self.max_fetch_bytes,
max_poll_records=self.max_poll_records,
selector=KAFKA_SELECTOR
)

View File

@ -56,7 +56,14 @@ KAFKA_OPTS = [
"in seconds"),
cfg.IntOpt('producer_batch_size', default=16384,
help='Size of batch for the producer async send')
help='Size of batch for the producer async send'),
cfg.BoolOpt('enable_auto_commit',
default=False,
help='Enable asynchronous consumer commits'),
cfg.IntOpt('max_poll_records', default=500,
help='The maximum number of records returned in a poll call')
]

View File

@ -115,8 +115,10 @@ class TestKafkaDriver(test_utils.BaseTestCase):
targets_and_priorities, "kafka_test", 1000, 10)
consumer.assert_called_once_with(
*expected_topics, group_id="kafka_test",
enable_auto_commit=mock.ANY,
bootstrap_servers=['localhost:9092'],
max_partition_fetch_bytes=mock.ANY,
max_poll_records=mock.ANY,
selector=mock.ANY
)