diff --git a/devstack/files/monasca-transform/monasca-transform.conf b/devstack/files/monasca-transform/monasca-transform.conf
index 6ee431b..e907cf1 100644
--- a/devstack/files/monasca-transform/monasca-transform.conf
+++ b/devstack/files/monasca-transform/monasca-transform.conf
@@ -29,6 +29,7 @@ enable_instance_usage_df_cache = True
 instance_usage_df_cache_storage_level = MEMORY_ONLY_SER_2
 enable_batch_time_filtering = True
 data_provider=monasca_transform.processor.pre_hourly_processor:PreHourlyProcessorDataProvider
+effective_batch_revision=2
 
 #
 # Configurable values for the monasca-transform service
diff --git a/etc/monasca-transform.conf b/etc/monasca-transform.conf
index 53c1e0f..70a1d7d 100644
--- a/etc/monasca-transform.conf
+++ b/etc/monasca-transform.conf
@@ -27,6 +27,7 @@ enable_pre_hourly_processor = True
 enable_instance_usage_df_cache = True
 instance_usage_df_cache_storage_level = MEMORY_ONLY_SER_2
 enable_batch_time_filtering = True
+effective_batch_revision=2
 
 #
 # Configurable values for the monasca-transform service
diff --git a/monasca_transform/config/config_initializer.py b/monasca_transform/config/config_initializer.py
index 40b59ff..0269957 100644
--- a/monasca_transform/config/config_initializer.py
+++ b/monasca_transform/config/config_initializer.py
@@ -143,7 +143,8 @@ class ConfigInitializer(object):
                        'PreHourlyProcessorDataProvider'),
             cfg.BoolOpt('enable_instance_usage_df_cache'),
             cfg.StrOpt('instance_usage_df_cache_storage_level'),
-            cfg.BoolOpt('enable_batch_time_filtering')
+            cfg.BoolOpt('enable_batch_time_filtering'),
+            cfg.IntOpt('effective_batch_revision', default=2)
         ]
         app_group = cfg.OptGroup(name='pre_hourly_processor',
                                  title='pre_hourly_processor')
diff --git a/monasca_transform/mysql_offset_specs.py b/monasca_transform/mysql_offset_specs.py
index 653f076..b09486c 100644
--- a/monasca_transform/mysql_offset_specs.py
+++ b/monasca_transform/mysql_offset_specs.py
@@ -79,9 +79,10 @@ class MySQLOffsetSpecs(OffsetSpecs):
             version_spec.revision = revision
             revision = revision + 1
 
-        self.session.query(MySQLOffsetSpec).filter(
-            MySQLOffsetSpec.revision > self.MAX_REVISIONS).delete(
-            synchronize_session="fetch")
+        # delete any revisions in excess of the required MAX_REVISIONS
+        self.session.query(MySQLOffsetSpec).filter(
+            MySQLOffsetSpec.revision > self.MAX_REVISIONS).delete(
+            synchronize_session="fetch")
 
     def get_kafka_offsets(self, app_name):
         return {'%s_%s_%s' % (
@@ -90,6 +91,13 @@ class MySQLOffsetSpecs(OffsetSpecs):
             MySQLOffsetSpec.app_name == app_name,
             MySQLOffsetSpec.revision == 1).all()}
 
+    def get_kafka_offsets_by_revision(self, app_name, revision):
+        return {'%s_%s_%s' % (
+            offset.get_app_name(), offset.get_topic(), offset.get_partition()
+        ): offset for offset in self.session.query(MySQLOffsetSpec).filter(
+            MySQLOffsetSpec.app_name == app_name,
+            MySQLOffsetSpec.revision == revision).all()}
+
     def get_most_recent_batch_time_from_offsets(self, app_name, topic):
         try:
             # get partition 0 as a representative of all others
diff --git a/monasca_transform/processor/pre_hourly_processor.py b/monasca_transform/processor/pre_hourly_processor.py
index 299ee5f..b01b8e2 100644
--- a/monasca_transform/processor/pre_hourly_processor.py
+++ b/monasca_transform/processor/pre_hourly_processor.py
@@ -288,16 +288,123 @@ class PreHourlyProcessor(Processor):
         """
         return simport.load(cfg.CONF.repositories.offsets)()
 
+    @staticmethod
+    def get_effective_offset_range_list(offset_range_list):
+        """get effective batch offset range.
+        Effective batch offset range covers offsets starting
+        from the effective batch revision (defined by the
+        effective_batch_revision config property). By default this method
+        sets the pyspark OffsetRange.fromOffset for each partition
+        to a value older than the latest revision (defaults to latest - 1)
+        so that the pre-hourly processor has access to the entire data for
+        the hour. This also accounts for and covers any early arriving
+        data (data that arrives before the start of the hour).
+        """
+
+        offset_specifications = PreHourlyProcessor.get_offset_specs()
+
+        app_name = PreHourlyProcessor.get_app_name()
+
+        topic = PreHourlyProcessor.get_kafka_topic()
+
+        # start offset revision
+        effective_batch_revision = cfg.CONF.pre_hourly_processor.\
+            effective_batch_revision
+
+        effective_batch_spec = offset_specifications\
+            .get_kafka_offsets_by_revision(app_name,
+                                           effective_batch_revision)
+
+        # get latest revision, if penultimate is unavailable
+        if not effective_batch_spec:
+            log.debug("effective batch spec: offsets: revision %s unavailable,"
+                      " getting the latest revision instead..." % (
+                          effective_batch_revision))
+            # not available
+            effective_batch_spec = offset_specifications.get_kafka_offsets(
+                app_name)
+
+        effective_batch_offsets = PreHourlyProcessor._parse_saved_offsets(
+            app_name, topic,
+            effective_batch_spec)
+
+        # for debugging
+        for effective_key in effective_batch_offsets.keys():
+            effective_offset = effective_batch_offsets.get(effective_key,
+                                                           None)
+            (effect_app_name,
+             effect_topic_name,
+             effect_partition,
+             effect_from_offset,
+             effect_until_offset) = effective_offset
+            log.debug(
+                "effective batch offsets (from db):"
+                " OffSetRanges: %s %s %s %s" % (
+                    effect_topic_name, effect_partition,
+                    effect_from_offset, effect_until_offset))
+
+        # build the effective batch offset range list
+        effective_offset_range_list = []
+        for offset_range in offset_range_list:
+            part_topic_key = "_".join((offset_range.topic,
+                                       str(offset_range.partition)))
+            effective_offset = effective_batch_offsets.get(part_topic_key,
+                                                           None)
+            if effective_offset:
+                (effect_app_name,
+                 effect_topic_name,
+                 effect_partition,
+                 effect_from_offset,
+                 effect_until_offset) = effective_offset
+
+                log.debug(
+                    "Extending effective offset range:"
+                    " OffSetRanges: %s %s %s-->%s %s" % (
+                        effect_topic_name, effect_partition,
+                        offset_range.fromOffset,
+                        effect_from_offset,
+                        effect_until_offset))
+
+                effective_offset_range_list.append(
+                    OffsetRange(offset_range.topic,
+                                offset_range.partition,
+                                effect_from_offset,
+                                offset_range.untilOffset))
+            else:
+                effective_offset_range_list.append(
+                    OffsetRange(offset_range.topic,
+                                offset_range.partition,
+                                offset_range.fromOffset,
+                                offset_range.untilOffset))
+
+        # return effective offset range list
+        return effective_offset_range_list
+
     @staticmethod
     def fetch_pre_hourly_data(spark_context, offset_range_list):
         """get metrics pre hourly data from offset range list."""
+        for o in offset_range_list:
+            log.debug(
+                "fetch_pre_hourly: offset_range_list:"
+                " OffSetRanges: %s %s %s %s" % (
+                    o.topic, o.partition, o.fromOffset, o.untilOffset))
+
+        effective_offset_list = PreHourlyProcessor.\
+            get_effective_offset_range_list(offset_range_list)
+
+        for o in effective_offset_list:
+            log.debug(
+                "fetch_pre_hourly: effective_offset_range_list:"
+                " OffSetRanges: %s %s %s %s" % (
+                    o.topic, o.partition, o.fromOffset, o.untilOffset))
+
         # get kafka stream over the same offsets
         pre_hourly_rdd = KafkaUtils.createRDD(spark_context,
                                               {"metadata.broker.list": cfg.CONF.messaging.brokers},
-                                              offset_range_list)
+                                              effective_offset_list)
         return pre_hourly_rdd
 
     @staticmethod
@@ -339,10 +446,17 @@ class PreHourlyProcessor(Processor):
                 app_name, topic))
 
         if most_recent_batch_time:
+            # batches can fire after the late metrics slack time, not
+            # necessarily at the top of the hour
+            most_recent_batch_time_truncated = most_recent_batch_time.replace(
+                minute=0, second=0, microsecond=0)
+            log.debug("filter out records before: %s" % (
+                most_recent_batch_time_truncated.strftime(
+                    '%Y-%m-%dT%H:%M:%S')))
             # filter out records before current batch
             instance_usage_df = instance_usage_df.filter(
                 instance_usage_df.lastrecord_timestamp_string >=
-                most_recent_batch_time)
+                most_recent_batch_time_truncated)
 
         # determine the timestamp of the most recent top-of-the-hour (which
         # is the end of the current batch).
@@ -351,6 +465,9 @@ class PreHourlyProcessor(Processor):
             minute=0, second=0, microsecond=0)
 
         # filter out records after current batch
+        log.debug("filter out records after: %s" % (
+            truncated_timestamp_to_current_hour.strftime(
+                '%Y-%m-%dT%H:%M:%S')))
         instance_usage_df = instance_usage_df.filter(
             instance_usage_df.firstrecord_timestamp_string <
             truncated_timestamp_to_current_hour)
diff --git a/tests/functional/processor/test_pre_hourly_processor_agg.py b/tests/functional/processor/test_pre_hourly_processor_agg.py
index 4b38ba3..247d412 100644
--- a/tests/functional/processor/test_pre_hourly_processor_agg.py
+++ b/tests/functional/processor/test_pre_hourly_processor_agg.py
@@ -11,6 +11,7 @@
 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 # License for the specific language governing permissions and limitations
 # under the License.
+import datetime
 import os
 import random
 import sys
@@ -21,10 +22,15 @@ from collections import defaultdict
 
 import mock
 from oslo_config import cfg
+from oslo_utils import uuidutils
 from pyspark.streaming.kafka import OffsetRange
 
+from monasca_common.kafka_lib.common import OffsetResponse
+
 from monasca_transform.config.config_initializer import ConfigInitializer
+from monasca_transform.mysql_offset_specs import MySQLOffsetSpecs
 from monasca_transform.processor.pre_hourly_processor import PreHourlyProcessor
+
 from tests.functional.component.insert.dummy_insert import DummyInsert
 from tests.functional.json_offset_specs import JSONOffsetSpecs
 from tests.functional.messaging.adapter import DummyAdapter
@@ -49,6 +55,174 @@ class TestPreHourlyProcessorAgg(SparkContextTest):
         DummyAdapter.init()
         DummyAdapter.adapter_impl.metric_list = []
 
+        # get mysql offset specs
+        self.kafka_offset_specs = MySQLOffsetSpecs()
+
+    def add_offset_for_test(self, my_app, my_topic, my_partition,
+                            my_from_offset, my_until_offset,
+                            my_batch_time):
+        """utility method to populate the mysql db with offsets."""
+        self.kafka_offset_specs.add(topic=my_topic, partition=my_partition,
+                                    app_name=my_app,
+                                    from_offset=my_from_offset,
+                                    until_offset=my_until_offset,
+                                    batch_time_info=my_batch_time)
+
+    @mock.patch('monasca_transform.processor.pre_hourly_processor.'
+                'PreHourlyProcessor.get_app_name')
+    @mock.patch('monasca_transform.processor.pre_hourly_processor.'
+                'PreHourlyProcessor.get_kafka_topic')
+    @mock.patch('monasca_transform.processor.pre_hourly_processor.'
+                'PreHourlyProcessor._get_offsets_from_kafka')
+    def test_get_processing_offset_range_list(self,
+                                              kafka_get_offsets,
+                                              kafka_topic_name,
+                                              app_name):
+
+        # setup
+        my_app = uuidutils.generate_uuid()
+        my_topic = uuidutils.generate_uuid()
+
+        # mock app_name, topic_name, partition
+        app_name.return_value = my_app
+        kafka_topic_name.return_value = my_topic
+        my_partition = 1
+
+        ret_offset_key = "_".join((my_topic, str(my_partition)))
+        kafka_get_offsets.side_effect = [
+            # mock latest offsets
+            {ret_offset_key: OffsetResponse(topic=my_topic,
+                                            partition=my_partition,
+                                            error=None,
+                                            offsets=[30])},
+            # mock earliest offsets
+            {ret_offset_key: OffsetResponse(topic=my_topic,
+                                            partition=my_partition,
+                                            error=None,
+                                            offsets=[0])}
+        ]
+
+        # add offsets
+        my_until_offset = 0
+        my_from_offset = 10
+        my_batch_time = datetime.datetime.strptime('2016-01-01 00:10:00',
+                                                   '%Y-%m-%d %H:%M:%S')
+        self.add_offset_for_test(my_app, my_topic,
+                                 my_partition, my_until_offset,
+                                 my_from_offset, my_batch_time)
+
+        my_until_offset_2 = 10
+        my_from_offset_2 = 20
+        my_batch_time_2 = datetime.datetime.strptime('2016-01-01 01:10:00',
+                                                     '%Y-%m-%d %H:%M:%S')
+        self.add_offset_for_test(my_app, my_topic,
+                                 my_partition, my_until_offset_2,
+                                 my_from_offset_2, my_batch_time_2)
+
+        # get latest offset spec as dict
+        current_batch_time = datetime.datetime.strptime('2016-01-01 02:10:00',
+                                                        '%Y-%m-%d %H:%M:%S')
+
+        # use mysql offset repositories
+        cfg.CONF.set_override(
+            'offsets',
+            'monasca_transform.mysql_offset_specs:MySQLOffsetSpecs',
+            group='repositories')
+
+        # list of pyspark.streaming.kafka.OffsetRange objects
+        offset_range_list = PreHourlyProcessor.\
+            get_processing_offset_range_list(
+                current_batch_time)
+
+        self.assertEqual(my_partition,
+                         offset_range_list[0].partition)
+        self.assertEqual(my_topic,
+                         offset_range_list[0].topic)
+        self.assertEqual(20,
+                         offset_range_list[0].fromOffset)
+        self.assertEqual(30,
+                         offset_range_list[0].untilOffset)
+
+    @mock.patch('monasca_transform.processor.pre_hourly_processor.'
+                'PreHourlyProcessor.get_app_name')
+    @mock.patch('monasca_transform.processor.pre_hourly_processor.'
+                'PreHourlyProcessor.get_kafka_topic')
+    @mock.patch('monasca_transform.processor.pre_hourly_processor.'
+                'PreHourlyProcessor._get_offsets_from_kafka')
+    def test_get_effective_offset_range_list(self,
+                                             kafka_get_offsets,
+                                             kafka_topic_name,
+                                             app_name):
+        # setup
+        my_app = uuidutils.generate_uuid()
+        my_topic = uuidutils.generate_uuid()
+
+        # mock app_name, topic_name, partition
+        app_name.return_value = my_app
+        kafka_topic_name.return_value = my_topic
+        my_partition = 1
+
+        ret_offset_key = "_".join((my_topic, str(my_partition)))
+        kafka_get_offsets.side_effect = [
+            # mock latest offsets in kafka
+            {ret_offset_key: OffsetResponse(topic=my_topic,
+                                            partition=my_partition,
+                                            error=None,
+                                            offsets=[3000])},
+            # mock earliest offsets in kafka
+            {ret_offset_key: OffsetResponse(topic=my_topic,
+                                            partition=my_partition,
+                                            error=None,
+                                            offsets=[0])}
+        ]
+
+        # add offsets
+        my_until_offset = 500
+        my_from_offset = 1000
+        my_batch_time = datetime.datetime.strptime('2016-01-01 00:10:00',
+                                                   '%Y-%m-%d %H:%M:%S')
+        self.add_offset_for_test(my_app, my_topic,
+                                 my_partition, my_until_offset,
+                                 my_from_offset, my_batch_time)
+
+        my_until_offset_2 = 1000
+        my_from_offset_2 = 2000
+        my_batch_time_2 = datetime.datetime.strptime('2016-01-01 01:10:00',
+                                                     '%Y-%m-%d %H:%M:%S')
+        self.add_offset_for_test(my_app, my_topic,
+                                 my_partition, my_until_offset_2,
+                                 my_from_offset_2, my_batch_time_2)
+
+        # get latest offset spec as dict
+        current_batch_time = datetime.datetime.strptime('2016-01-01 02:10:00',
+                                                        '%Y-%m-%d %H:%M:%S')
+
+        # use mysql offset repositories
+        cfg.CONF.set_override(
+            'offsets',
+            'monasca_transform.mysql_offset_specs:MySQLOffsetSpecs',
+            group='repositories')
+
+        # list of pyspark.streaming.kafka.OffsetRange objects
+        offset_range_list = PreHourlyProcessor.\
+            get_processing_offset_range_list(
+                current_batch_time)
+
+        # the effective batch offset range list should cover the range
+        # starting from the (latest - 1) offset revision through to the
+        # latest revision
+        offset_range_list = PreHourlyProcessor.get_effective_offset_range_list(
+            offset_range_list)
+
+        self.assertEqual(my_partition,
+                         offset_range_list[0].partition)
+        self.assertEqual(my_topic,
+                         offset_range_list[0].topic)
+        self.assertEqual(500,
+                         offset_range_list[0].fromOffset)
+        self.assertEqual(3000,
+                         offset_range_list[0].untilOffset)
+
     @mock.patch('monasca_transform.processor.pre_hourly_processor.KafkaInsert',
                 DummyInsert)
     @mock.patch('monasca_transform.processor.pre_hourly_processor.'
diff --git a/tests/functional/test_mysql_kafka_offsets.py b/tests/functional/test_mysql_kafka_offsets.py
index 7b9f658..1c2f64b 100644
--- a/tests/functional/test_mysql_kafka_offsets.py
+++ b/tests/functional/test_mysql_kafka_offsets.py
@@ -164,3 +164,48 @@ class TestMySQLOffsetSpecs(unittest.TestCase):
                          int(offset_value.get_from_offset()))
         self.assertEqual(used_value.get('app_name'),
                          offset_value.get_app_name())
+
+    def test_get_offset_by_revision(self):
+        topic_1 = uuidutils.generate_uuid()
+        partition_1 = 0
+        until_offset_1 = 10
+        from_offset_1 = 0
+        app_name_1 = uuidutils.generate_uuid()
+
+        my_batch_time = datetime.datetime.strptime('2016-01-01 00:10:00',
+                                                   '%Y-%m-%d %H:%M:%S')
+
+        self.kafka_offset_specs.add(topic=topic_1, partition=partition_1,
+                                    app_name=app_name_1,
+                                    from_offset=from_offset_1,
+                                    until_offset=until_offset_1,
+                                    batch_time_info=my_batch_time)
+
+        until_offset_2 = 20
+        from_offset_2 = 10
+        my_batch_time2 = datetime.datetime.strptime('2016-01-01 01:10:00',
+                                                    '%Y-%m-%d %H:%M:%S')
+
+        self.kafka_offset_specs.add(topic=topic_1, partition=partition_1,
+                                    app_name=app_name_1,
+                                    from_offset=from_offset_2,
+                                    until_offset=until_offset_2,
+                                    batch_time_info=my_batch_time2)
+
+        # get penultimate revision
+        penultimate_revision = 2
+        kafka_offset_specs = self.kafka_offset_specs\
+            .get_kafka_offsets_by_revision(app_name_1,
+                                           penultimate_revision)
+
+        offset_key_1 = "%s_%s_%s" % (app_name_1, topic_1, partition_1)
+        offset_value_1 = kafka_offset_specs.get(offset_key_1)
+
+        used_values = {}
+        used_values[offset_key_1] = {
+            "topic": topic_1, "partition": partition_1, "app_name": app_name_1,
+            "from_offset": from_offset_1, "until_offset": until_offset_1
+        }
+
+        self.assertions_on_offset(used_value=used_values.get(offset_key_1),
+                                  offset_value=offset_value_1)
diff --git a/tests/functional/test_resources/config/test_config_with_dummy_messaging_adapter.conf b/tests/functional/test_resources/config/test_config_with_dummy_messaging_adapter.conf
index dfb5c38..62ac225 100644
--- a/tests/functional/test_resources/config/test_config_with_dummy_messaging_adapter.conf
+++ b/tests/functional/test_resources/config/test_config_with_dummy_messaging_adapter.conf
@@ -14,8 +14,16 @@ enable_pre_hourly_processor = False
 enable_instance_usage_df_cache = False
 instance_usage_df_cache_storage_level = MEMORY_ONLY_SER_2
 enable_batch_time_filtering = True
+effective_batch_revision = 2
 
 [service]
 enable_record_store_df_cache = False
 record_store_df_cache_storage_level = MEMORY_ONLY_SER_2
 enable_debug_log_entries = true
+
+[database]
+server_type = mysql:thin
+host = localhost
+database_name = monasca_transform
+username = m-transform
+password = password
\ No newline at end of file
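
As a rough illustration of what this patch does, the following minimal sketch shows the two behaviours it introduces: extending each Kafka offset range back to the offsets saved under an older revision (the effective_batch_revision option, default 2, i.e. the revision just before the latest), and truncating the most recent batch time to the top of the hour before filtering records. This is a hedged sketch, not monasca-transform code: OffsetRange is a namedtuple stand-in for pyspark.streaming.kafka.OffsetRange, and extend_offset_ranges, truncate_to_top_of_hour and older_revision_offsets are hypothetical names used only for illustration; the sample values mirror those in test_get_effective_offset_range_list.

    # Illustrative sketch only; OffsetRange is a stand-in for
    # pyspark.streaming.kafka.OffsetRange and the helper names are hypothetical.
    import datetime
    from collections import namedtuple

    OffsetRange = namedtuple('OffsetRange',
                             ['topic', 'partition', 'fromOffset', 'untilOffset'])


    def extend_offset_ranges(current_ranges, older_revision_offsets):
        # older_revision_offsets maps "<topic>_<partition>" to the
        # (from_offset, until_offset) pair saved for the configured
        # effective_batch_revision, mirroring get_kafka_offsets_by_revision()
        effective = []
        for r in current_ranges:
            key = "_".join((r.topic, str(r.partition)))
            saved = older_revision_offsets.get(key)
            if saved:
                # read from the older revision's from_offset so the pre-hourly
                # batch covers the whole hour, including early arriving records
                effective.append(OffsetRange(r.topic, r.partition,
                                             saved[0], r.untilOffset))
            else:
                effective.append(r)
        return effective


    def truncate_to_top_of_hour(batch_time):
        # batches can fire after the late-metrics slack time, so filtering
        # starts at the top of the hour rather than at the raw batch time
        return batch_time.replace(minute=0, second=0, microsecond=0)


    if __name__ == '__main__':
        current = [OffsetRange('metrics_pre_hourly', 1, 2000, 3000)]
        older = {'metrics_pre_hourly_1': (500, 1000)}  # saved for revision 2
        print(extend_offset_ranges(current, older))
        print(truncate_to_top_of_hour(datetime.datetime(2016, 1, 1, 2, 10)))

Run as a script, the sketch prints an offset range with fromOffset 500 and untilOffset 3000 — the same range the new test asserts — and the truncated batch time 2016-01-01 02:00:00.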