From 2da390414e7e644a18ffd8680175dddb36ae6cf5 Mon Sep 17 00:00:00 2001
From: agateaaa
Date: Tue, 4 Apr 2017 17:24:00 -0700
Subject: [PATCH] Hourly aggregation: account for early arriving metrics

With this change the pre hourly processor, which does the hourly
aggregation (second stage) and writes the final aggregated metrics to
the metrics topic in Kafka, now accounts for any early arriving
metrics. Together with two previous changes to the pre hourly
processor that added 1.) configurable late metrics slack time
(https://review.openstack.org/#/c/394497/), and 2.) batch filtering
(https://review.openstack.org/#/c/363100/), this makes sure that all
late arriving or early arriving metrics for an hour are aggregated
appropriately.

Also improved the MySQL offset specs to call delete on excess
revisions only once.

Change-Id: I919cddf343821fe52ad6a1d4170362311f84c0e4
---
 .../monasca-transform/monasca-transform.conf  |   1 +
 etc/monasca-transform.conf                    |   1 +
 .../config/config_initializer.py              |   3 +-
 monasca_transform/mysql_offset_specs.py       |  14 +-
 .../processor/pre_hourly_processor.py         | 121 +++++++++++-
 .../test_pre_hourly_processor_agg.py          | 174 ++++++++++++++++++
 tests/functional/test_mysql_kafka_offsets.py  |  45 +++++
 ...t_config_with_dummy_messaging_adapter.conf |   8 +
 8 files changed, 361 insertions(+), 6 deletions(-)

diff --git a/devstack/files/monasca-transform/monasca-transform.conf b/devstack/files/monasca-transform/monasca-transform.conf
index 6ee431b..e907cf1 100644
--- a/devstack/files/monasca-transform/monasca-transform.conf
+++ b/devstack/files/monasca-transform/monasca-transform.conf
@@ -29,6 +29,7 @@ enable_instance_usage_df_cache = True
 instance_usage_df_cache_storage_level = MEMORY_ONLY_SER_2
 enable_batch_time_filtering = True
 data_provider=monasca_transform.processor.pre_hourly_processor:PreHourlyProcessorDataProvider
+effective_batch_revision=2
 
 #
 # Configurable values for the monasca-transform service
diff --git a/etc/monasca-transform.conf b/etc/monasca-transform.conf
index 53c1e0f..70a1d7d 100644
--- a/etc/monasca-transform.conf
+++ b/etc/monasca-transform.conf
@@ -27,6 +27,7 @@ enable_pre_hourly_processor = True
 enable_instance_usage_df_cache = True
 instance_usage_df_cache_storage_level = MEMORY_ONLY_SER_2
 enable_batch_time_filtering = True
+effective_batch_revision=2
 
 #
 # Configurable values for the monasca-transform service
diff --git a/monasca_transform/config/config_initializer.py b/monasca_transform/config/config_initializer.py
index 40b59ff..0269957 100644
--- a/monasca_transform/config/config_initializer.py
+++ b/monasca_transform/config/config_initializer.py
@@ -143,7 +143,8 @@ class ConfigInitializer(object):
                        'PreHourlyProcessorDataProvider'),
             cfg.BoolOpt('enable_instance_usage_df_cache'),
             cfg.StrOpt('instance_usage_df_cache_storage_level'),
-            cfg.BoolOpt('enable_batch_time_filtering')
+            cfg.BoolOpt('enable_batch_time_filtering'),
+            cfg.IntOpt('effective_batch_revision', default=2)
         ]
         app_group = cfg.OptGroup(name='pre_hourly_processor',
                                  title='pre_hourly_processor')
diff --git a/monasca_transform/mysql_offset_specs.py b/monasca_transform/mysql_offset_specs.py
index 653f076..b09486c 100644
--- a/monasca_transform/mysql_offset_specs.py
+++ b/monasca_transform/mysql_offset_specs.py
@@ -79,9 +79,10 @@ class MySQLOffsetSpecs(OffsetSpecs):
             version_spec.revision = revision
             revision = revision + 1
 
-            self.session.query(MySQLOffsetSpec).filter(
-                MySQLOffsetSpec.revision > self.MAX_REVISIONS).delete(
-                    synchronize_session="fetch")
+        # delete any revisions in excess of the required maximum
+        
self.session.query(MySQLOffsetSpec).filter( + MySQLOffsetSpec.revision > self.MAX_REVISIONS).delete( + synchronize_session="fetch") def get_kafka_offsets(self, app_name): return {'%s_%s_%s' % ( @@ -90,6 +91,13 @@ class MySQLOffsetSpecs(OffsetSpecs): MySQLOffsetSpec.app_name == app_name, MySQLOffsetSpec.revision == 1).all()} + def get_kafka_offsets_by_revision(self, app_name, revision): + return {'%s_%s_%s' % ( + offset.get_app_name(), offset.get_topic(), offset.get_partition() + ): offset for offset in self.session.query(MySQLOffsetSpec).filter( + MySQLOffsetSpec.app_name == app_name, + MySQLOffsetSpec.revision == revision).all()} + def get_most_recent_batch_time_from_offsets(self, app_name, topic): try: # get partition 0 as a representative of all others diff --git a/monasca_transform/processor/pre_hourly_processor.py b/monasca_transform/processor/pre_hourly_processor.py index 299ee5f..b01b8e2 100644 --- a/monasca_transform/processor/pre_hourly_processor.py +++ b/monasca_transform/processor/pre_hourly_processor.py @@ -288,16 +288,123 @@ class PreHourlyProcessor(Processor): """ return simport.load(cfg.CONF.repositories.offsets)() + @staticmethod + def get_effective_offset_range_list(offset_range_list): + """get effective batch offset range. + Effective batch offset range covers offsets starting + from effective batch revision (defined by effective_batch_revision + config property). By default this method will set the + pyspark Offset.fromOffset for each partition + to have value older than the latest revision + (defaults to latest -1) so that prehourly processor has access + to entire data for the hour. This will also account for and cover + any early arriving data (data that arrives before the start hour). + """ + + offset_specifications = PreHourlyProcessor.get_offset_specs() + + app_name = PreHourlyProcessor.get_app_name() + + topic = PreHourlyProcessor.get_kafka_topic() + + # start offset revision + effective_batch_revision = cfg.CONF.pre_hourly_processor.\ + effective_batch_revision + + effective_batch_spec = offset_specifications\ + .get_kafka_offsets_by_revision(app_name, + effective_batch_revision) + + # get latest revision, if penultimate is unavailable + if not effective_batch_spec: + log.debug("effective batch spec: offsets: revision %s unavailable," + " getting the latest revision instead..." 
% (
+                          effective_batch_revision))
+            # not available
+            effective_batch_spec = offset_specifications.get_kafka_offsets(
+                app_name)
+
+        effective_batch_offsets = PreHourlyProcessor._parse_saved_offsets(
+            app_name, topic,
+            effective_batch_spec)
+
+        # for debugging
+        for effective_key in effective_batch_offsets.keys():
+            effective_offset = effective_batch_offsets.get(effective_key,
+                                                           None)
+            (effect_app_name,
+             effect_topic_name,
+             effect_partition,
+             effect_from_offset,
+             effect_until_offset) = effective_offset
+            log.debug(
+                "effective batch offsets (from db):"
+                " OffSetRanges: %s %s %s %s" % (
+                    effect_topic_name, effect_partition,
+                    effect_from_offset, effect_until_offset))
+
+        # effective batch revision
+        effective_offset_range_list = []
+        for offset_range in offset_range_list:
+            part_topic_key = "_".join((offset_range.topic,
+                                       str(offset_range.partition)))
+            effective_offset = effective_batch_offsets.get(part_topic_key,
+                                                           None)
+            if effective_offset:
+                (effect_app_name,
+                 effect_topic_name,
+                 effect_partition,
+                 effect_from_offset,
+                 effect_until_offset) = effective_offset
+
+                log.debug(
+                    "Extending effective offset range:"
+                    " OffSetRanges: %s %s %s-->%s %s" % (
+                        effect_topic_name, effect_partition,
+                        offset_range.fromOffset,
+                        effect_from_offset,
+                        effect_until_offset))
+
+                effective_offset_range_list.append(
+                    OffsetRange(offset_range.topic,
+                                offset_range.partition,
+                                effect_from_offset,
+                                offset_range.untilOffset))
+            else:
+                effective_offset_range_list.append(
+                    OffsetRange(offset_range.topic,
+                                offset_range.partition,
+                                offset_range.fromOffset,
+                                offset_range.untilOffset))
+
+        # return effective offset range list
+        return effective_offset_range_list
+
     @staticmethod
     def fetch_pre_hourly_data(spark_context,
                               offset_range_list):
         """get metrics pre hourly data from offset range list."""
+        for o in offset_range_list:
+            log.debug(
+                "fetch_pre_hourly: offset_range_list:"
+                " OffSetRanges: %s %s %s %s" % (
+                    o.topic, o.partition, o.fromOffset, o.untilOffset))
+
+        effective_offset_list = PreHourlyProcessor.\
+            get_effective_offset_range_list(offset_range_list)
+
+        for o in effective_offset_list:
+            log.debug(
+                "fetch_pre_hourly: effective_offset_range_list:"
+                " OffSetRanges: %s %s %s %s" % (
+                    o.topic, o.partition, o.fromOffset, o.untilOffset))
+
         # get kafka stream over the same offsets
         pre_hourly_rdd = KafkaUtils.createRDD(spark_context,
                                               {"metadata.broker.list":
                                                cfg.CONF.messaging.brokers},
-                                              offset_range_list)
+                                              effective_offset_list)
         return pre_hourly_rdd
 
     @staticmethod
@@ -339,10 +446,17 @@
                 app_name, topic))
 
         if most_recent_batch_time:
+            # batches can fire after late metrics slack time, not necessarily
+            # at the top of the hour
+            most_recent_batch_time_truncated = most_recent_batch_time.replace(
+                minute=0, second=0, microsecond=0)
+            log.debug("filter out records before : %s" % (
+                most_recent_batch_time_truncated.strftime(
+                    '%Y-%m-%dT%H:%M:%S')))
             # filter out records before current batch
             instance_usage_df = instance_usage_df.filter(
                 instance_usage_df.lastrecord_timestamp_string >=
-                most_recent_batch_time)
+                most_recent_batch_time_truncated)
 
         # determine the timestamp of the most recent top-of-the-hour (which
         # is the end of the current batch).
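
The offset handling added above can be summarized outside of Spark and Kafka:
for every partition in the current batch, the processor looks up the offsets
saved under the configured revision and, when found, pulls fromOffset back to
that revision's starting offset while keeping the batch's untilOffset. The
sketch below is a minimal, self-contained illustration of that merge, assuming
the same "topic_partition" keying used by get_effective_offset_range_list;
SimpleRange and extend_ranges are illustrative names, not part of the patch.

    from collections import namedtuple

    # simplified stand-in for pyspark.streaming.kafka.OffsetRange
    SimpleRange = namedtuple('SimpleRange',
                             ['topic', 'partition', 'fromOffset',
                              'untilOffset'])

    def extend_ranges(offset_range_list, effective_batch_offsets):
        """Pull fromOffset back to the saved revision's starting offset."""
        extended = []
        for r in offset_range_list:
            key = "_".join((r.topic, str(r.partition)))
            saved = effective_batch_offsets.get(key)
            if saved:
                # saved mirrors the parsed offset tuple:
                # (app_name, topic, partition, from_offset, until_offset)
                _, _, _, saved_from, _ = saved
                extended.append(SimpleRange(r.topic, r.partition,
                                            saved_from, r.untilOffset))
            else:
                extended.append(r)
        return extended

    # example mirroring the functional test below: the current batch covers
    # offsets 2000-3000, the older saved revision starts at 500, so the
    # effective range becomes 500-3000 and early arriving data is re-read
    ranges = [SimpleRange('metrics_pre_hourly', 1, 2000, 3000)]
    saved = {'metrics_pre_hourly_1':
             ('pre_hourly', 'metrics_pre_hourly', 1, 500, 1000)}
    print(extend_ranges(ranges, saved))
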
@@ -351,6 +465,9 @@ class PreHourlyProcessor(Processor): minute=0, second=0, microsecond=0) # filter out records after current batch + log.debug("filter out records after : %s" % ( + truncated_timestamp_to_current_hour.strftime( + '%Y-%m-%dT%H:%M:%S'))) instance_usage_df = instance_usage_df.filter( instance_usage_df.firstrecord_timestamp_string < truncated_timestamp_to_current_hour) diff --git a/tests/functional/processor/test_pre_hourly_processor_agg.py b/tests/functional/processor/test_pre_hourly_processor_agg.py index 4b38ba3..247d412 100644 --- a/tests/functional/processor/test_pre_hourly_processor_agg.py +++ b/tests/functional/processor/test_pre_hourly_processor_agg.py @@ -11,6 +11,7 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. +import datetime import os import random import sys @@ -21,10 +22,15 @@ from collections import defaultdict import mock from oslo_config import cfg +from oslo_utils import uuidutils from pyspark.streaming.kafka import OffsetRange +from monasca_common.kafka_lib.common import OffsetResponse + from monasca_transform.config.config_initializer import ConfigInitializer +from monasca_transform.mysql_offset_specs import MySQLOffsetSpecs from monasca_transform.processor.pre_hourly_processor import PreHourlyProcessor + from tests.functional.component.insert.dummy_insert import DummyInsert from tests.functional.json_offset_specs import JSONOffsetSpecs from tests.functional.messaging.adapter import DummyAdapter @@ -49,6 +55,174 @@ class TestPreHourlyProcessorAgg(SparkContextTest): DummyAdapter.init() DummyAdapter.adapter_impl.metric_list = [] + # get mysql offset specs + self.kafka_offset_specs = MySQLOffsetSpecs() + + def add_offset_for_test(self, my_app, my_topic, my_partition, + my_from_offset, my_until_offset, + my_batch_time): + """"utility method to populate mysql db with offsets.""" + self.kafka_offset_specs.add(topic=my_topic, partition=my_partition, + app_name=my_app, + from_offset=my_from_offset, + until_offset=my_until_offset, + batch_time_info=my_batch_time) + + @mock.patch('monasca_transform.processor.pre_hourly_processor.' + 'PreHourlyProcessor.get_app_name') + @mock.patch('monasca_transform.processor.pre_hourly_processor.' + 'PreHourlyProcessor.get_kafka_topic') + @mock.patch('monasca_transform.processor.pre_hourly_processor.' 
+ 'PreHourlyProcessor._get_offsets_from_kafka') + def test_get_processing_offset_range_list(self, + kafka_get_offsets, + kafka_topic_name, + app_name): + + # setup + my_app = uuidutils.generate_uuid() + my_topic = uuidutils.generate_uuid() + + # mock app_name, topic_name, partition + app_name.return_value = my_app + kafka_topic_name.return_value = my_topic + my_partition = 1 + + ret_offset_key = "_".join((my_topic, str(my_partition))) + kafka_get_offsets.side_effect = [ + # mock latest offsets + {ret_offset_key: OffsetResponse(topic=my_topic, + partition=my_partition, + error=None, + offsets=[30])}, + # mock earliest offsets + {ret_offset_key: OffsetResponse(topic=my_topic, + partition=my_partition, + error=None, + offsets=[0])} + ] + + # add offsets + my_until_offset = 0 + my_from_offset = 10 + my_batch_time = datetime.datetime.strptime('2016-01-01 00:10:00', + '%Y-%m-%d %H:%M:%S') + self.add_offset_for_test(my_app, my_topic, + my_partition, my_until_offset, + my_from_offset, my_batch_time) + + my_until_offset_2 = 10 + my_from_offset_2 = 20 + my_batch_time_2 = datetime.datetime.strptime('2016-01-01 01:10:00', + '%Y-%m-%d %H:%M:%S') + self.add_offset_for_test(my_app, my_topic, + my_partition, my_until_offset_2, + my_from_offset_2, my_batch_time_2) + + # get latest offset spec as dict + current_batch_time = datetime.datetime.strptime('2016-01-01 02:10:00', + '%Y-%m-%d %H:%M:%S') + + # use mysql offset repositories + cfg.CONF.set_override( + 'offsets', + 'monasca_transform.mysql_offset_specs:MySQLOffsetSpecs', + group='repositories') + + # list of pyspark.streaming.kafka.OffsetRange objects + offset_range_list = PreHourlyProcessor.\ + get_processing_offset_range_list( + current_batch_time) + + self.assertEqual(my_partition, + offset_range_list[0].partition) + self.assertEqual(my_topic, + offset_range_list[0].topic) + self.assertEqual(20, + offset_range_list[0].fromOffset) + self.assertEqual(30, + offset_range_list[0].untilOffset) + + @mock.patch('monasca_transform.processor.pre_hourly_processor.' + 'PreHourlyProcessor.get_app_name') + @mock.patch('monasca_transform.processor.pre_hourly_processor.' + 'PreHourlyProcessor.get_kafka_topic') + @mock.patch('monasca_transform.processor.pre_hourly_processor.' 
+ 'PreHourlyProcessor._get_offsets_from_kafka') + def test_get_effective_offset_range_list(self, + kafka_get_offsets, + kafka_topic_name, + app_name): + # setup + my_app = uuidutils.generate_uuid() + my_topic = uuidutils.generate_uuid() + + # mock app_name, topic_name, partition + app_name.return_value = my_app + kafka_topic_name.return_value = my_topic + my_partition = 1 + + ret_offset_key = "_".join((my_topic, str(my_partition))) + kafka_get_offsets.side_effect = [ + # mock latest offsets in kafka + {ret_offset_key: OffsetResponse(topic=my_topic, + partition=my_partition, + error=None, + offsets=[3000])}, + # mock earliest offsets in kafka + {ret_offset_key: OffsetResponse(topic=my_topic, + partition=my_partition, + error=None, + offsets=[0])} + ] + + # add offsets + my_until_offset = 500 + my_from_offset = 1000 + my_batch_time = datetime.datetime.strptime('2016-01-01 00:10:00', + '%Y-%m-%d %H:%M:%S') + self.add_offset_for_test(my_app, my_topic, + my_partition, my_until_offset, + my_from_offset, my_batch_time) + + my_until_offset_2 = 1000 + my_from_offset_2 = 2000 + my_batch_time_2 = datetime.datetime.strptime('2016-01-01 01:10:00', + '%Y-%m-%d %H:%M:%S') + self.add_offset_for_test(my_app, my_topic, + my_partition, my_until_offset_2, + my_from_offset_2, my_batch_time_2) + + # get latest offset spec as dict + current_batch_time = datetime.datetime.strptime('2016-01-01 02:10:00', + '%Y-%m-%d %H:%M:%S') + + # use mysql offset repositories + cfg.CONF.set_override( + 'offsets', + 'monasca_transform.mysql_offset_specs:MySQLOffsetSpecs', + group='repositories') + + # list of pyspark.streaming.kafka.OffsetRange objects + offset_range_list = PreHourlyProcessor.\ + get_processing_offset_range_list( + current_batch_time) + + # effective batch range list + # should cover range of starting from (latest - 1) offset version to + # latest + offset_range_list = PreHourlyProcessor.get_effective_offset_range_list( + offset_range_list) + + self.assertEqual(my_partition, + offset_range_list[0].partition) + self.assertEqual(my_topic, + offset_range_list[0].topic) + self.assertEqual(500, + offset_range_list[0].fromOffset) + self.assertEqual(3000, + offset_range_list[0].untilOffset) + @mock.patch('monasca_transform.processor.pre_hourly_processor.KafkaInsert', DummyInsert) @mock.patch('monasca_transform.processor.pre_hourly_processor.' 
diff --git a/tests/functional/test_mysql_kafka_offsets.py b/tests/functional/test_mysql_kafka_offsets.py index 7b9f658..1c2f64b 100644 --- a/tests/functional/test_mysql_kafka_offsets.py +++ b/tests/functional/test_mysql_kafka_offsets.py @@ -164,3 +164,48 @@ class TestMySQLOffsetSpecs(unittest.TestCase): int(offset_value.get_from_offset())) self.assertEqual(used_value.get('app_name'), offset_value.get_app_name()) + + def test_get_offset_by_revision(self): + topic_1 = uuidutils.generate_uuid() + partition_1 = 0 + until_offset_1 = 10 + from_offset_1 = 0 + app_name_1 = uuidutils.generate_uuid() + + my_batch_time = datetime.datetime.strptime('2016-01-01 00:10:00', + '%Y-%m-%d %H:%M:%S') + + self.kafka_offset_specs.add(topic=topic_1, partition=partition_1, + app_name=app_name_1, + from_offset=from_offset_1, + until_offset=until_offset_1, + batch_time_info=my_batch_time) + + until_offset_2 = 20 + from_offset_2 = 10 + my_batch_time2 = datetime.datetime.strptime('2016-01-01 01:10:00', + '%Y-%m-%d %H:%M:%S') + + self.kafka_offset_specs.add(topic=topic_1, partition=partition_1, + app_name=app_name_1, + from_offset=from_offset_2, + until_offset=until_offset_2, + batch_time_info=my_batch_time2) + + # get penultimate revision + penultimate_revision = 2 + kafka_offset_specs = self.kafka_offset_specs\ + .get_kafka_offsets_by_revision(app_name_1, + penultimate_revision) + + offset_key_1 = "%s_%s_%s" % (app_name_1, topic_1, partition_1) + offset_value_1 = kafka_offset_specs.get(offset_key_1) + + used_values = {} + used_values[offset_key_1] = { + "topic": topic_1, "partition": partition_1, "app_name": app_name_1, + "from_offset": from_offset_1, "until_offset": until_offset_1 + } + + self.assertions_on_offset(used_value=used_values.get(offset_key_1), + offset_value=offset_value_1) diff --git a/tests/functional/test_resources/config/test_config_with_dummy_messaging_adapter.conf b/tests/functional/test_resources/config/test_config_with_dummy_messaging_adapter.conf index dfb5c38..62ac225 100644 --- a/tests/functional/test_resources/config/test_config_with_dummy_messaging_adapter.conf +++ b/tests/functional/test_resources/config/test_config_with_dummy_messaging_adapter.conf @@ -14,8 +14,16 @@ enable_pre_hourly_processor = False enable_instance_usage_df_cache = False instance_usage_df_cache_storage_level = MEMORY_ONLY_SER_2 enable_batch_time_filtering = True +effective_batch_revision = 2 [service] enable_record_store_df_cache = False record_store_df_cache_storage_level = MEMORY_ONLY_SER_2 enable_debug_log_entries = true + +[database] +server_type = mysql:thin +host = localhost +database_name = monasca_transform +username = m-transform +password = password \ No newline at end of file
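
The new effective_batch_revision option and the revision test above both rely
on the convention already used by MySQLOffsetSpecs: the most recently saved
offsets for an app, topic and partition get revision 1, previously saved rows
are bumped to higher revision numbers, and anything beyond MAX_REVISIONS is
deleted. A minimal in-memory sketch of that convention follows;
InMemoryOffsetSpecs is a toy stand-in (not the SQLAlchemy-backed class) and
the MAX_REVISIONS value of 10 is only an assumption for the example.

    class InMemoryOffsetSpecs(object):
        """Toy stand-in: newest offsets get revision 1, older rows are bumped."""

        # assumed retention; the real value lives in MySQLOffsetSpecs
        MAX_REVISIONS = 10

        def __init__(self):
            self.rows = []

        def add(self, from_offset, until_offset):
            for row in self.rows:
                row['revision'] += 1
            # drop rows past the retained history, mirroring the delete above
            self.rows = [r for r in self.rows
                         if r['revision'] <= self.MAX_REVISIONS]
            self.rows.append({'revision': 1, 'from': from_offset,
                              'until': until_offset})

        def get_by_revision(self, revision):
            return [r for r in self.rows if r['revision'] == revision]

    specs = InMemoryOffsetSpecs()
    specs.add(0, 500)      # offsets saved by the first hourly batch
    specs.add(500, 1000)   # offsets saved by the second hourly batch
    # effective_batch_revision = 2 selects the penultimate row (0-500), so the
    # pre hourly processor can re-read data written before the latest batch
    print(specs.get_by_revision(2))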