Add configurable amnesty period for late metrics
Added a configuration option that allows the pre-hourly transformation to run at a specified period past the hour. This includes a check to ensure that, if processing has not yet been done for the hour and is overdue, it is done at the earliest acceptable time. Change-Id: I8882f3089ca748ce435b4e9a92196a72a0a8e63f
This commit is contained in:
parent
d76af6a0f0
commit
26e53336d4
|
@ -24,9 +24,11 @@ topic_pre_hourly = metrics_pre_hourly
|
|||
pre_hourly_processor_enabled = True
|
||||
|
||||
[pre_hourly_processor]
|
||||
late_metric_slack_time = 600
|
||||
enable_instance_usage_df_cache = True
|
||||
instance_usage_df_cache_storage_level = MEMORY_ONLY_SER_2
|
||||
enable_batch_time_filtering = True
|
||||
data_provider=monasca_transform.processor.pre_hourly_processor:PreHourlyProcessorDataProvider
|
||||
|
||||
#
|
||||
# Configurable values for the monasca-transform service
|
||||
|
|
|
@ -135,6 +135,11 @@ class ConfigInitializer(object):
|
|||
@staticmethod
|
||||
def load_pre_hourly_processor_options():
|
||||
app_opts = [
|
||||
cfg.IntOpt('late_metric_slack_time', default=600),
|
||||
cfg.StrOpt('data_provider',
|
||||
default='monasca_transform.processor.'
|
||||
'pre_hourly_processor:'
|
||||
'PreHourlyProcessorDataProvider'),
|
||||
cfg.BoolOpt('enable_instance_usage_df_cache'),
|
||||
cfg.StrOpt('instance_usage_df_cache_storage_level'),
|
||||
cfg.BoolOpt('enable_batch_time_filtering')
|
||||
|
|
|
@ -511,9 +511,8 @@ class MonMetricsKafkaProcessor(object):
|
|||
batch_time_info)
|
||||
|
||||
# call pre hourly processor, if its time to run
|
||||
if (cfg.CONF.stage_processors.pre_hourly_processor_enabled
|
||||
is True and PreHourlyProcessor.is_time_to_run(
|
||||
batch_time_info)):
|
||||
if (cfg.CONF.stage_processors.pre_hourly_processor_enabled and
|
||||
PreHourlyProcessor.is_time_to_run(batch_time_info)):
|
||||
PreHourlyProcessor.run_processor(
|
||||
record_store_df.rdd.context,
|
||||
batch_time_info)
|
||||
|
|
|
@ -34,6 +34,8 @@ from monasca_transform.data_driven_specs.data_driven_specs_repo \
|
|||
import DataDrivenSpecsRepoFactory
|
||||
from monasca_transform.log_utils import LogUtils
|
||||
from monasca_transform.processor import Processor
|
||||
from monasca_transform.processor.processor_util import PreHourlyProcessorUtil
|
||||
from monasca_transform.processor.processor_util import ProcessUtilDataProvider
|
||||
from monasca_transform.transform.storage_utils import \
|
||||
InvalidCacheStorageLevelException
|
||||
from monasca_transform.transform.storage_utils import StorageUtils
|
||||
|
@ -44,6 +46,18 @@ ConfigInitializer.basic_config()
|
|||
log = LogUtils.init_logger(__name__)
|
||||
|
||||
|
||||
class PreHourlyProcessorDataProvider(ProcessUtilDataProvider):
    """Supply run-history data for the pre-hourly processor.

    Looks up, through the processor's offset specifications, the batch
    time of the most recently processed batch for this app/topic pair.
    """

    def get_last_processed(self):
        """Return the batch time of the most recently processed batch."""
        specs = PreHourlyProcessor.get_offset_specs()
        return specs.get_most_recent_batch_time_from_offsets(
            PreHourlyProcessor.get_app_name(),
            PreHourlyProcessor.get_kafka_topic())
|
||||
|
||||
|
||||
class PreHourlyProcessor(Processor):
|
||||
"""Processor to process usage data published to metrics_pre_hourly topic a
|
||||
and publish final rolled up metrics to metrics topic in kafka.
|
||||
|
@ -90,17 +104,7 @@ class PreHourlyProcessor(Processor):
|
|||
|
||||
@staticmethod
|
||||
def is_time_to_run(check_time):
|
||||
"""return True if its time to run this processor.
|
||||
For now it just checks to see if its start of the hour
|
||||
i.e. the minute is 00.
|
||||
"""
|
||||
this_min = int(datetime.datetime.strftime(check_time, '%M'))
|
||||
|
||||
# run pre hourly processor only at top of the hour
|
||||
if this_min == 0:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
return PreHourlyProcessorUtil.is_time_to_run(check_time)
|
||||
|
||||
@staticmethod
|
||||
def _get_offsets_from_kafka(brokers,
|
||||
|
@ -220,7 +224,7 @@ class PreHourlyProcessor(Processor):
|
|||
until_offset = latest_dict[item].offsets[0]
|
||||
|
||||
# from
|
||||
if (spec_until_offset is not None and int(spec_until_offset) >= 0):
|
||||
if spec_until_offset is not None and int(spec_until_offset) >= 0:
|
||||
from_offset = spec_until_offset
|
||||
else:
|
||||
from_offset = earliest_dict[item].offsets[0]
|
||||
|
|
|
@ -0,0 +1,104 @@
|
|||
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import abc
|
||||
import datetime
|
||||
from monasca_common.simport import simport
|
||||
from oslo_config import cfg
|
||||
|
||||
from monasca_transform.log_utils import LogUtils
|
||||
|
||||
|
||||
log = LogUtils.init_logger(__name__)
|
||||
|
||||
|
||||
class PreHourlyProcessorUtil(object):
    """Decide when the pre-hourly processor should run.

    The processor is meant to run once per hour, but only after a
    configurable slack period (``late_metric_slack_time`` seconds) past
    the top of the hour, so that late-arriving metrics are included.
    If a run was missed, the next check after the slack period triggers
    an immediate run.
    """

    # Lazily-loaded provider of last-run information; shared by all
    # callers (see get_data_provider()).
    data_provider = None

    @staticmethod
    def get_last_processed():
        """Return the batch time of the last processor run, or None."""
        return PreHourlyProcessorUtil.get_data_provider().get_last_processed()

    @staticmethod
    def get_data_provider():
        """Return the configured data provider, loading it on first use.

        The provider class is named by the
        ``pre_hourly_processor.data_provider`` config option and loaded
        via simport.
        """
        if not PreHourlyProcessorUtil.data_provider:
            PreHourlyProcessorUtil.data_provider = simport.load(
                cfg.CONF.pre_hourly_processor.data_provider)()
        return PreHourlyProcessorUtil.data_provider

    @staticmethod
    def is_time_to_run(check_time):
        """Return True if it is time to run the pre-hourly processor.

        The processor runs at most once per hour-of-day, at or after
        the configured slack time past the top of the hour.  A missed
        hour is made up at the earliest acceptable time.

        :param check_time: datetime to evaluate (e.g. batch time)
        """
        this_hour = int(datetime.datetime.strftime(check_time, '%H'))
        # Midnight of the day containing check_time: used to tell apart
        # the same clock hour on different days.
        this_date = check_time.replace(minute=0, second=0,
                                       microsecond=0, hour=0)
        drift_delta = datetime.timedelta(
            seconds=cfg.CONF.pre_hourly_processor.late_metric_slack_time)

        top_of_the_hour = check_time.replace(minute=0, second=0,
                                             microsecond=0)
        earliest_acceptable_run_time = top_of_the_hour + drift_delta
        last_processed = PreHourlyProcessorUtil.get_last_processed()
        if last_processed:
            hour_last_processed = int(
                datetime.datetime.strftime(
                    last_processed, '%H'))
            date_last_processed = last_processed.replace(minute=0, second=0,
                                                         microsecond=0,
                                                         hour=0)
        else:
            date_last_processed = None
            hour_last_processed = None

        # Already ran within THIS hour of THIS day: the next acceptable
        # run is the slack time past the next top of the hour.
        # NOTE: the date must be compared too -- matching only the hour
        # wrongly deferred a run that was due a whole day (or more)
        # after the previous one.
        if (hour_last_processed is not None and
                this_hour == hour_last_processed and
                this_date == date_last_processed):
            earliest_acceptable_run_time = (
                top_of_the_hour +
                datetime.timedelta(hours=1) +
                drift_delta
            )
        # Lazy %-style args so the message is only built when debug
        # logging is enabled.
        log.debug(
            "Pre-hourly task check: Check time = %s, "
            "Last processed at %s (hour = %s), "
            "Earliest acceptable run time %s "
            "(based on configured pre hourly late metrics slack time of %s "
            "seconds)",
            check_time,
            last_processed,
            hour_last_processed,
            earliest_acceptable_run_time,
            cfg.CONF.pre_hourly_processor.late_metric_slack_time)
        # Run the pre-hourly processor only once per hour, from the
        # configured slack time after the top of the hour.
        # NOTE: compare against None explicitly -- hour 0 (midnight) is
        # falsy, and a plain truthiness test re-ran the processor within
        # the midnight hour even when it had already run.
        if (hour_last_processed is None or (
                ((not this_hour == hour_last_processed) or
                 (this_date > date_last_processed)) and
                check_time >= earliest_acceptable_run_time)):
            log.debug("Pre-hourly: Yes, it's time to process")
            return True
        log.debug("Pre-hourly: No, it's NOT time to process")
        return False
|
||||
|
||||
|
||||
class ProcessUtilDataProvider(object):
    """Abstract interface supplying run-history data to processor utils."""

    @abc.abstractmethod
    def get_last_processed(self):
        """Return data on last run of processor.

        :raises NotImplementedError: always, unless overridden.
        """
        # Fixed: the message previously named is_time_to_run(), but the
        # unimplemented method here is get_last_processed().
        raise NotImplementedError(
            "Class %s doesn't implement get_last_processed()"
            % self.__class__.__name__)
|
|
@ -17,11 +17,23 @@ ca_file = test_ca_file_path
|
|||
enable_pre_hourly_processor = False
|
||||
|
||||
[pre_hourly_processor]
|
||||
late_metric_slack_time = 600
|
||||
enable_instance_usage_df_cache = False
|
||||
instance_usage_df_cache_storage_level = MEMORY_ONLY_SER_2
|
||||
enable_batch_time_filtering = True
|
||||
data_provider=tests.unit.processor.test_is_time_to_run:TestProcessUtilDataProvider
|
||||
|
||||
[service]
|
||||
enable_record_store_df_cache = False
|
||||
record_store_df_cache_storage_level = MEMORY_ONLY_SER_2
|
||||
enable_debug_log_entries = true
|
||||
# the location for the transform-service log
|
||||
service_log_path=/home/david/
|
||||
# the filename for the transform-service log
|
||||
service_log_filename=monasca-transform.log
|
||||
|
||||
|
||||
|
||||
# Whether debug-level log entries should be included in the application
|
||||
# log. If this setting is false, info-level will be used for logging.
|
||||
enable_debug_log_entries = true
|
|
@ -0,0 +1,98 @@
|
|||
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import datetime
|
||||
import unittest
|
||||
|
||||
from monasca_transform.config.config_initializer import ConfigInitializer
|
||||
from monasca_transform.processor.processor_util import PreHourlyProcessorUtil
|
||||
from monasca_transform.processor.processor_util import ProcessUtilDataProvider
|
||||
|
||||
|
||||
class PreHourlyProcessorTest(unittest.TestCase):
    """Tests for PreHourlyProcessorUtil.is_time_to_run.

    Each test plants a "last processed" timestamp through the configured
    test data provider, then asks whether the pre-hourly processor
    should run at a given check time.
    """

    def setUp(self):
        # Unit-test config sets late_metric_slack_time = 600 seconds and
        # points data_provider at the test double.
        ConfigInitializer.basic_config(
            default_config_files=[
                'tests/unit/test_resources/config/'
                'test_config.conf']
        )

    def _is_time_to_run(self, now, last_run_offset):
        """Record now + last_run_offset as the last run, then ask
        whether it is time to run at *now*."""
        PreHourlyProcessorUtil.get_data_provider().set_last_processed(
            date_time=(now + last_run_offset))
        return PreHourlyProcessorUtil.is_time_to_run(now)

    def test_is_time_to_run_before_late_metric_slack_time(self):
        # 11:09:59 is one second before the 600-second slack expires
        now = datetime.datetime(2016, 11, 7, 11, 9, 59, 0)
        self.assertFalse(
            self._is_time_to_run(now, datetime.timedelta(hours=-1)))

    def test_is_time_to_run_after_late_metric_slack_time(self):
        # just past 11:10:00 the slack period has elapsed
        now = datetime.datetime(2016, 11, 7, 11, 10, 0, 1)
        self.assertTrue(
            self._is_time_to_run(now, datetime.timedelta(hours=-1)))

    def test_is_time_to_run_with_already_done_this_hour(self):
        # already processed at exactly this time: must not run again
        now = datetime.datetime(2016, 11, 7, 11, 30, 0, 0)
        self.assertFalse(self._is_time_to_run(now, datetime.timedelta()))

    def test_is_time_to_run_after_midnight_but_before_late_metric_slack_time(
            self):
        now = datetime.datetime(2016, 11, 7, 0, 5, 0, 0)
        self.assertFalse(
            self._is_time_to_run(now, datetime.timedelta(hours=-1)))

    def test_is_time_to_run_after_midnight_and_after_late_metric_slack_time(
            self):
        now = datetime.datetime(2016, 11, 7, 0, 10, 0, 1)
        self.assertTrue(
            self._is_time_to_run(now, datetime.timedelta(hours=-1)))

    def test_am_pm_behaviour(self):
        # a run at the same minute twelve hours ago must not suppress this one
        now = datetime.datetime(2016, 11, 7, 22, 10, 0, 1)
        self.assertTrue(
            self._is_time_to_run(now, datetime.timedelta(hours=-12)))

    def test_same_time_different_day_behaviour(self):
        # a run at the same hour yesterday must not suppress this one
        now = datetime.datetime(2016, 11, 7, 22, 10, 0, 1)
        self.assertTrue(
            self._is_time_to_run(now, datetime.timedelta(days=-1)))
|
||||
|
||||
|
||||
class TestProcessUtilDataProvider(ProcessUtilDataProvider):
    """Test double that simply stores and returns a timestamp."""

    # value recorded by the most recent set_last_processed() call;
    # starts out unset
    _last_processed_date_time = None

    def get_last_processed(self):
        """Return the timestamp recorded by set_last_processed()."""
        return self._last_processed_date_time

    def set_last_processed(self, date_time=None):
        """Remember *date_time* as the last-processed batch time."""
        self._last_processed_date_time = date_time
|
Loading…
Reference in New Issue