diff --git a/monasca_common/kafka/consumer.py b/monasca_common/kafka/consumer.py
index 7b9a08ce..670ac1f1 100644
--- a/monasca_common/kafka/consumer.py
+++ b/monasca_common/kafka/consumer.py
@@ -95,6 +95,12 @@ class KafkaConsumer(object):
 
     def _create_kafka_consumer(self, partitions=None):
         # No auto-commit so that commits only happen after the message is processed.
+
+        # auto_offset_reset determines where the consumer's offset is reset
+        # to when it falls out of range (see the whence param in
+        # SimpleConsumer.seek()). Set it to "largest" or "smallest" depending
+        # on which end of the partition the offset should be reset to.
+
         consumer = kafka_consumer.SimpleConsumer(
             self._kafka,
             self._kafka_group,
@@ -104,7 +110,8 @@ class KafkaConsumer(object):
             iter_timeout=5,
             fetch_size_bytes=self._kafka_fetch_size,
             buffer_size=self._kafka_fetch_size,
-            max_buffer_size=None)
+            max_buffer_size=None,
+            auto_offset_reset="smallest")
 
         consumer.provide_partition_info()
         consumer.fetch_last_known_offsets()
@@ -127,7 +134,7 @@ class KafkaConsumer(object):
             # an OffsetOutOfRangeError. We trap this error and seek to
             # the head of the current Kafka data. Because this error
             # only happens when Kafka removes data we're currently
-            # pointing at we're gauranteed that we won't read any
+            # pointing at we're guaranteed that we won't read any
             # duplicate data however we will lose any information
             # between our current offset and the new Kafka head.
 