diff --git a/devstack/files/monasca-transform/monasca-transform.conf b/devstack/files/monasca-transform/monasca-transform.conf index 792c140..cf8d540 100644 --- a/devstack/files/monasca-transform/monasca-transform.conf +++ b/devstack/files/monasca-transform/monasca-transform.conf @@ -24,9 +24,11 @@ topic_pre_hourly = metrics_pre_hourly pre_hourly_processor_enabled = True [pre_hourly_processor] +late_metric_slack_time = 600 enable_instance_usage_df_cache = True instance_usage_df_cache_storage_level = MEMORY_ONLY_SER_2 enable_batch_time_filtering = True +data_provider=monasca_transform.processor.pre_hourly_processor:PreHourlyProcessorDataProvider # # Configurable values for the monasca-transform service diff --git a/monasca_transform/config/config_initializer.py b/monasca_transform/config/config_initializer.py index e1f76f4..6a2fd3c 100644 --- a/monasca_transform/config/config_initializer.py +++ b/monasca_transform/config/config_initializer.py @@ -135,6 +135,11 @@ class ConfigInitializer(object): @staticmethod def load_pre_hourly_processor_options(): app_opts = [ + cfg.IntOpt('late_metric_slack_time', default=600), + cfg.StrOpt('data_provider', + default='monasca_transform.processor.' + 'pre_hourly_processor:' + 'PreHourlyProcessorDataProvider'), cfg.BoolOpt('enable_instance_usage_df_cache'), cfg.StrOpt('instance_usage_df_cache_storage_level'), cfg.BoolOpt('enable_batch_time_filtering') diff --git a/monasca_transform/driver/mon_metrics_kafka.py b/monasca_transform/driver/mon_metrics_kafka.py index 7aa4114..1d5db43 100644 --- a/monasca_transform/driver/mon_metrics_kafka.py +++ b/monasca_transform/driver/mon_metrics_kafka.py @@ -498,9 +498,8 @@ class MonMetricsKafkaProcessor(object): batch_time_info) # call pre hourly processor, if its time to run - if (cfg.CONF.stage_processors.pre_hourly_processor_enabled - is True and PreHourlyProcessor.is_time_to_run( - batch_time_info)): + if (cfg.CONF.stage_processors.pre_hourly_processor_enabled and + PreHourlyProcessor.is_time_to_run(batch_time_info)): PreHourlyProcessor.run_processor( record_store_df.rdd.context, batch_time_info) diff --git a/monasca_transform/processor/pre_hourly_processor.py b/monasca_transform/processor/pre_hourly_processor.py index 5f244cb..c5cc152 100644 --- a/monasca_transform/processor/pre_hourly_processor.py +++ b/monasca_transform/processor/pre_hourly_processor.py @@ -34,6 +34,8 @@ from monasca_transform.data_driven_specs.data_driven_specs_repo \ import DataDrivenSpecsRepoFactory from monasca_transform.log_utils import LogUtils from monasca_transform.processor import Processor +from monasca_transform.processor.processor_util import PreHourlyProcessorUtil +from monasca_transform.processor.processor_util import ProcessUtilDataProvider from monasca_transform.transform.storage_utils import \ InvalidCacheStorageLevelException from monasca_transform.transform.storage_utils import StorageUtils @@ -44,6 +46,18 @@ ConfigInitializer.basic_config() log = LogUtils.init_logger(__name__) +class PreHourlyProcessorDataProvider(ProcessUtilDataProvider): + + def get_last_processed(self): + offset_specifications = PreHourlyProcessor.get_offset_specs() + app_name = PreHourlyProcessor.get_app_name() + topic = PreHourlyProcessor.get_kafka_topic() + most_recent_batch_time = ( + offset_specifications.get_most_recent_batch_time_from_offsets( + app_name, topic)) + return most_recent_batch_time + + class PreHourlyProcessor(Processor): """Processor to process usage data published to metrics_pre_hourly topic a and publish final rolled up metrics to metrics topic in kafka. @@ -90,17 +104,7 @@ class PreHourlyProcessor(Processor): @staticmethod def is_time_to_run(check_time): - """return True if its time to run this processor. - For now it just checks to see if its start of the hour - i.e. the minute is 00. - """ - this_min = int(datetime.datetime.strftime(check_time, '%M')) - - # run pre hourly processor only at top of the hour - if this_min == 0: - return True - else: - return False + return PreHourlyProcessorUtil.is_time_to_run(check_time) @staticmethod def _get_offsets_from_kafka(brokers, @@ -220,7 +224,7 @@ class PreHourlyProcessor(Processor): until_offset = latest_dict[item].offsets[0] # from - if (spec_until_offset is not None and int(spec_until_offset) >= 0): + if spec_until_offset is not None and int(spec_until_offset) >= 0: from_offset = spec_until_offset else: from_offset = earliest_dict[item].offsets[0] diff --git a/monasca_transform/processor/processor_util.py b/monasca_transform/processor/processor_util.py new file mode 100644 index 0000000..579a453 --- /dev/null +++ b/monasca_transform/processor/processor_util.py @@ -0,0 +1,104 @@ +# Copyright 2016 Hewlett Packard Enterprise Development Company LP +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import abc +import datetime +from monasca_common.simport import simport +from oslo_config import cfg + +from monasca_transform.log_utils import LogUtils + + +log = LogUtils.init_logger(__name__) + + +class PreHourlyProcessorUtil(object): + + data_provider = None + + @staticmethod + def get_last_processed(): + return PreHourlyProcessorUtil.get_data_provider().get_last_processed() + + @staticmethod + def get_data_provider(): + if not PreHourlyProcessorUtil.data_provider: + PreHourlyProcessorUtil.data_provider = simport.load( + cfg.CONF.pre_hourly_processor.data_provider)() + return PreHourlyProcessorUtil.data_provider + + @staticmethod + def is_time_to_run(check_time): + """return True if its time to run this processor. + For now it just checks to see if its start of the hour + i.e. the minute is 00. + """ + this_hour = int(datetime.datetime.strftime(check_time, '%H')) + this_date = check_time.replace(minute=0, second=0, + microsecond=0, hour=0) + drift_delta = datetime.timedelta( + seconds=cfg.CONF.pre_hourly_processor.late_metric_slack_time) + + top_of_the_hour = check_time.replace(minute=0, second=0, + microsecond=0) + earliest_acceptable_run_time = top_of_the_hour + drift_delta + last_processed = PreHourlyProcessorUtil.get_last_processed() + if last_processed: + hour_last_processed = int( + datetime.datetime.strftime( + last_processed, '%H')) + date_last_processed = last_processed.replace(minute=0, second=0, + microsecond=0, + hour=0) + else: + date_last_processed = None + hour_last_processed = None + + if this_hour == hour_last_processed: + earliest_acceptable_run_time = ( + top_of_the_hour + + datetime.timedelta(hours=1) + + drift_delta + ) + log.debug( + "Pre-hourly task check: Check time = %s, " + "Last processed at %s (hour = %s), " + "Earliest acceptable run time %s " + "(based on configured pre hourly late metrics slack time of %s " + "seconds)" % ( + check_time, + last_processed, + hour_last_processed, + earliest_acceptable_run_time, + cfg.CONF.pre_hourly_processor.late_metric_slack_time + )) + # run pre hourly processor only once from the + # configured time after the top of the hour + if (not hour_last_processed or ( + ((not this_hour == hour_last_processed) or + (this_date > date_last_processed)) and + check_time >= earliest_acceptable_run_time)): + log.debug("Pre-hourly: Yes, it's time to process") + return True + log.debug("Pre-hourly: No, it's NOT time to process") + return False + + +class ProcessUtilDataProvider(object): + + @abc.abstractmethod + def get_last_processed(self): + """return data on last run of processor""" + raise NotImplementedError( + "Class %s doesn't implement is_time_to_run()" + % self.__class__.__name__) diff --git a/tests/functional/test_resources/config/test_config.conf b/tests/functional/test_resources/config/test_config.conf index 9b91f3f..9d57c7f 100644 --- a/tests/functional/test_resources/config/test_config.conf +++ b/tests/functional/test_resources/config/test_config.conf @@ -17,11 +17,23 @@ ca_file = test_ca_file_path enable_pre_hourly_processor = False [pre_hourly_processor] +late_metric_slack_time = 600 enable_instance_usage_df_cache = False instance_usage_df_cache_storage_level = MEMORY_ONLY_SER_2 enable_batch_time_filtering = True +data_provider=tests.unit.processor.test_is_time_to_run:TestProcessUtilDataProvider [service] enable_record_store_df_cache = False record_store_df_cache_storage_level = MEMORY_ONLY_SER_2 enable_debug_log_entries = true +# the location for the transform-service log +service_log_path=/home/david/ +# the filename for the transform-service log +service_log_filename=monasca-transform.log + + + +# Whether debug-level log entries should be included in the application +# log. If this setting is false, info-level will be used for logging. +enable_debug_log_entries = true \ No newline at end of file diff --git a/tests/unit/processor/test_is_time_to_run.py b/tests/unit/processor/test_is_time_to_run.py new file mode 100644 index 0000000..40ec711 --- /dev/null +++ b/tests/unit/processor/test_is_time_to_run.py @@ -0,0 +1,98 @@ +# Copyright 2016 Hewlett Packard Enterprise Development Company LP +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import datetime +import unittest + +from monasca_transform.config.config_initializer import ConfigInitializer +from monasca_transform.processor.processor_util import PreHourlyProcessorUtil +from monasca_transform.processor.processor_util import ProcessUtilDataProvider + + +class PreHourlyProcessorTest(unittest.TestCase): + + def setUp(self): + ConfigInitializer.basic_config( + default_config_files=[ + 'tests/unit/test_resources/config/' + 'test_config.conf'] + ) + + def test_is_time_to_run_before_late_metric_slack_time(self): + check_time = datetime.datetime( + year=2016, month=11, day=7, hour=11, + minute=9, second=59, microsecond=0) + PreHourlyProcessorUtil.get_data_provider().set_last_processed( + date_time=(check_time + datetime.timedelta(hours=-1))) + self.assertFalse(PreHourlyProcessorUtil.is_time_to_run(check_time)) + + def test_is_time_to_run_after_late_metric_slack_time(self): + check_time = datetime.datetime( + year=2016, month=11, day=7, hour=11, + minute=10, second=0, microsecond=1) + PreHourlyProcessorUtil.get_data_provider().set_last_processed( + date_time=(check_time + datetime.timedelta(hours=-1))) + self.assertTrue(PreHourlyProcessorUtil.is_time_to_run(check_time)) + + def test_is_time_to_run_with_already_done_this_hour(self): + check_time = datetime.datetime( + year=2016, month=11, day=7, hour=11, + minute=30, second=0, microsecond=0) + PreHourlyProcessorUtil.get_data_provider().set_last_processed( + date_time=check_time) + self.assertFalse(PreHourlyProcessorUtil.is_time_to_run(check_time)) + + def test_is_time_to_run_after_midnight_but_before_late_metric_slack_time( + self): + check_time = datetime.datetime( + year=2016, month=11, day=7, hour=0, + minute=5, second=0, microsecond=0) + PreHourlyProcessorUtil.get_data_provider().set_last_processed( + date_time=(check_time + datetime.timedelta(hours=-1))) + self.assertFalse(PreHourlyProcessorUtil.is_time_to_run(check_time)) + + def test_is_time_to_run_after_midnight_and_after_late_metric_slack_time( + self): + check_time = datetime.datetime( + year=2016, month=11, day=7, hour=0, + minute=10, second=0, microsecond=1) + PreHourlyProcessorUtil.get_data_provider().set_last_processed( + date_time=(check_time + datetime.timedelta(hours=-1))) + self.assertTrue(PreHourlyProcessorUtil.is_time_to_run(check_time)) + + def test_am_pm_behaviour(self): + check_time = datetime.datetime( + year=2016, month=11, day=7, hour=22, + minute=10, second=0, microsecond=1) + PreHourlyProcessorUtil.get_data_provider().set_last_processed( + date_time=(check_time + datetime.timedelta(hours=-12))) + self.assertTrue(PreHourlyProcessorUtil.is_time_to_run(check_time)) + + def test_same_time_different_day_behaviour(self): + check_time = datetime.datetime( + year=2016, month=11, day=7, hour=22, + minute=10, second=0, microsecond=1) + PreHourlyProcessorUtil.get_data_provider().set_last_processed( + date_time=(check_time + datetime.timedelta(days=-1))) + self.assertTrue(PreHourlyProcessorUtil.is_time_to_run(check_time)) + + +class TestProcessUtilDataProvider(ProcessUtilDataProvider): + + _last_processed_date_time = None + + def get_last_processed(self): + return self._last_processed_date_time + + def set_last_processed(self, date_time=None): + self._last_processed_date_time = date_time