From c189feeb8ba3cea2887c76233eb0fd6d0bec1707 Mon Sep 17 00:00:00 2001 From: Ashwin Agate Date: Fri, 6 Jan 2017 22:14:42 +0000 Subject: [PATCH] Delete hourly offsets from offsets table Pre Hourly processor fails if offsets recorded in kafka_offsets table no longer exist in kafka. This change deletes the offsets from kafka_offsets table, so that the pre hourly processor can resume processing with the next run. Change-Id: I017c271e630fdf6de05a73b3bfcb14f5ed18615f --- monasca_transform/driver/mon_metrics_kafka.py | 4 ++++ monasca_transform/processor/pre_hourly_processor.py | 5 ++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/monasca_transform/driver/mon_metrics_kafka.py b/monasca_transform/driver/mon_metrics_kafka.py index 1d5db43..8f3542f 100644 --- a/monasca_transform/driver/mon_metrics_kafka.py +++ b/monasca_transform/driver/mon_metrics_kafka.py @@ -579,5 +579,9 @@ def invoke(): MonMetricsKafkaProcessor.reset_kafka_offsets(application_name) + # delete pre hourly processor offsets + if cfg.CONF.stage_processors.pre_hourly_processor_enabled: + PreHourlyProcessor.reset_kafka_offsets() + if __name__ == "__main__": invoke() diff --git a/monasca_transform/processor/pre_hourly_processor.py b/monasca_transform/processor/pre_hourly_processor.py index c5cc152..024121b 100644 --- a/monasca_transform/processor/pre_hourly_processor.py +++ b/monasca_transform/processor/pre_hourly_processor.py @@ -84,8 +84,11 @@ class PreHourlyProcessor(Processor): batch_time_info) @staticmethod - def reset_kafka_offsets(app_name): + def reset_kafka_offsets(): """delete all offsets from the offset specification.""" + + app_name = PreHourlyProcessor.get_app_name() + # get the offsets from global var offset_specs = simport.load(cfg.CONF.repositories.offsets)() offset_specs.delete_all_kafka_offsets(app_name)