Fixed Kafka read error

If Kafka resizes the partitions while the notification_engine is down, our
stored offset may no longer be valid when the notification_engine restarts,
and the kafka python library raises an OffsetOutOfRangeError exception on the
next read.  We now trap the exception, seek to the head of the currently
available data, and resume processing from there.

Change-Id: I37b805265d042adddc2367fda8267f319b1acd2d
Joe Keen 2015-03-03 17:00:00 -07:00
parent 89d495e2d9
commit 19cee45612
1 changed file with 24 additions and 8 deletions

@@ -113,14 +113,30 @@ class KafkaConsumer(BaseProcessor):
                 self._consumer.fetch_offsets = partitioned_fetch_offsets
-                for message in self._consumer:
-                    if not set_partitioner.acquired:
-                        break
-                    consumed_from_kafka += 1
-                    log.debug("Consuming message from kafka, "
-                              "partition {}, offset {}".format(message[0],
-                                                               message[1].offset))
-                    yield message
+                # When Kafka resizes the partitions it's possible that it
+                # will remove data at our current offset.  When this
+                # happens the next attempt to read from Kafka will generate
+                # an OffsetOutOfRangeError.  We trap this error and seek to
+                # the head of the current Kafka data.  Because this error
+                # only happens when Kafka removes data we're currently
+                # pointing at, we're guaranteed that we won't read any
+                # duplicate data; however, we will lose any information
+                # between our current offset and the new Kafka head.
+                try:
+                    for message in self._consumer:
+                        if not set_partitioner.acquired:
+                            break
+                        consumed_from_kafka += 1
+                        log.debug("Consuming message from kafka, "
+                                  "partition {}, offset {}".
+                                  format(message[0], message[1].offset))
+                        yield message
+                except kafka.common.OffsetOutOfRangeError:
+                    log.error("Kafka OffsetOutOfRange. Jumping to head.")
+                    self._consumer.seek(0, 0)
             elif set_partitioner.allocating:
                 log.info("Waiting to acquire locks on partition set")
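The trap-and-seek pattern in this diff can be sketched without a running Kafka cluster. The sketch below is a simplified model, not the committed code: FakeConsumer and the local OffsetOutOfRangeError class are hypothetical stand-ins for the kafka-python consumer and kafka.common.OffsetOutOfRangeError, modeling only the behaviour relevant here (a stored offset that falls before the oldest retained message, and a seek(0, 0) that jumps to the head of the available data).

```python
class OffsetOutOfRangeError(Exception):
    """Stand-in for kafka.common.OffsetOutOfRangeError."""


class FakeConsumer:
    """Minimal model of a consumer whose stored offset may be stale."""

    def __init__(self, messages, head):
        self._messages = messages   # offset -> payload still held by Kafka
        self._offset = 0            # stored (possibly stale) consumer offset
        self._head = head           # oldest offset still available

    def seek(self, offset, whence):
        # whence=0 means relative to the head of the available data,
        # mirroring the seek(0, 0) call in the commit.
        if whence == 0:
            self._offset = self._head + offset

    def __iter__(self):
        while self._offset < self._head or self._offset in self._messages:
            if self._offset < self._head:
                # Our offset was truncated away by a partition resize.
                raise OffsetOutOfRangeError(self._offset)
            yield self._messages[self._offset]
            self._offset += 1


def consume(consumer):
    """Yield messages, restarting from the head on a stale offset."""
    while True:
        try:
            for message in consumer:
                yield message
            return
        except OffsetOutOfRangeError:
            # Trap the error and jump to the oldest available data.
            consumer.seek(0, 0)


# Offsets 0-2 were removed; only 3-5 survive.  The stored offset (0) is
# stale, so the first read raises and we resume from offset 3.  Messages
# between the stale offset and the new head are lost, never duplicated.
consumer = FakeConsumer({3: "c", 4: "d", 5: "e"}, head=3)
print(list(consume(consumer)))
```

As the commit message notes, this recovery is loss-tolerant by construction: the error only fires when the pointed-at data is already gone, so resuming from the head can skip messages but cannot replay ones already consumed.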