Fixed non-configurable Kafka consumer offset reset location

PROBLEM: The consumer offset was resetting to the latest index rather than the earliest.
SOLUTION: Modified consumer creation to pass `auto_offset_reset="smallest"`, which
	resets the offset to the earliest known index.

NOTE: This achieves exactly what the whence parameter in SimpleConsumer.seek()
	is expected to do; however, to get this behavior the
	`auto_offset_reset` parameter MUST be set to either "largest"
	or "smallest".
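The reset semantics described above can be illustrated with a toy model. This is a hypothetical sketch, not kafka-python code: the function `reset_offset` and its parameters are invented here purely to show how "smallest" and "largest" decide where a consumer lands when its offset falls outside the retained log.

```python
def reset_offset(requested, earliest, latest, auto_offset_reset):
    """Toy model of Kafka's offset-reset decision.

    Returns the offset a consumer would land on when `requested` falls
    outside the retained range [earliest, latest].
    """
    if earliest <= requested <= latest:
        # Offset is still valid; no reset occurs.
        return requested
    if auto_offset_reset == "smallest":
        # Rewind to the oldest retained message (earliest index).
        return earliest
    if auto_offset_reset == "largest":
        # Skip ahead to the newest retained message (latest index).
        return latest
    raise ValueError("auto_offset_reset must be 'smallest' or 'largest'")
```

With "largest" (the behavior being fixed here), a purged offset jumps past all retained data; with "smallest", the consumer re-reads from the earliest surviving message.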

Change-Id: I887892d80f2da9619c7f11737b3ab2e1d1dacf1e
Habeeb Mohammed 2017-09-22 18:21:08 -06:00
parent 5711485cf2
commit 94e0a059b5
1 changed file with 9 additions and 2 deletions


@@ -95,6 +95,12 @@ class KafkaConsumer(object):
def _create_kafka_consumer(self, partitions=None):
# No auto-commit so that commits only happen after the message is processed.
# auto_offset_reset controls where the consumer's offset is reset to when it
# becomes invalid (see the whence param in SimpleConsumer.seek()). It is
# imperative to set this param to either "largest" or "smallest" depending on
# where we want the reset offset to land, no matter what whence is set to.
consumer = kafka_consumer.SimpleConsumer(
self._kafka,
self._kafka_group,
@@ -104,7 +110,8 @@ class KafkaConsumer(object):
iter_timeout=5,
fetch_size_bytes=self._kafka_fetch_size,
buffer_size=self._kafka_fetch_size,
-            max_buffer_size=None)
+            max_buffer_size=None,
+            auto_offset_reset="smallest")
consumer.provide_partition_info()
consumer.fetch_last_known_offsets()
@@ -127,7 +134,7 @@ class KafkaConsumer(object):
# an OffsetOutOfRangeError. We trap this error and seek to
# the head of the current Kafka data. Because this error
# only happens when Kafka removes data we're currently
-            # pointing at we're gauranteed that we won't read any
+            # pointing at we're guaranteed that we won't read any
# duplicate data however we will lose any information
# between our current offset and the new Kafka head.
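The trap-and-seek behavior in that comment can be sketched with a toy model as well. This is not kafka-python code: `consume_after_retention_purge` and its `log` dict are hypothetical stand-ins for a consumer whose offset was purged by retention, showing why no duplicates are read but the purged messages are lost.

```python
def consume_after_retention_purge(current_offset, log):
    """Toy model of seeking to the head after an offset purge.

    `log` maps retained offsets to messages. If `current_offset` was
    purged (it precedes the earliest retained offset), jump to the head
    of the retained data, as the error handler above does: nothing is
    read twice, but messages between the old offset and the new head
    are lost.
    """
    earliest = min(log)
    start = current_offset if current_offset >= earliest else earliest
    return [log[offset] for offset in sorted(log) if offset >= start]
```

For example, a consumer parked at offset 2 while Kafka has purged everything before offset 5 resumes at offset 5, silently skipping offsets 2 through 4.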