Add configurable amnesty period for late metrics

Added a configuration option (late_metric_slack_time) so that the
pre-hourly transformation is done a specified period past the hour
rather than exactly at the top of the hour.  This includes a check to
ensure that, if processing has not yet been done for the hour and is
overdue, it is done at the earliest opportunity.

Change-Id: I8882f3089ca748ce435b4e9a92196a72a0a8e63f
This commit is contained in:
David C Kennedy 2016-11-07 15:23:32 +00:00
parent d76af6a0f0
commit 26e53336d4
7 changed files with 239 additions and 15 deletions

View File

@ -24,9 +24,11 @@ topic_pre_hourly = metrics_pre_hourly
pre_hourly_processor_enabled = True
[pre_hourly_processor]
late_metric_slack_time = 600
enable_instance_usage_df_cache = True
instance_usage_df_cache_storage_level = MEMORY_ONLY_SER_2
enable_batch_time_filtering = True
data_provider=monasca_transform.processor.pre_hourly_processor:PreHourlyProcessorDataProvider
#
# Configurable values for the monasca-transform service

View File

@ -135,6 +135,11 @@ class ConfigInitializer(object):
@staticmethod
def load_pre_hourly_processor_options():
app_opts = [
cfg.IntOpt('late_metric_slack_time', default=600),
cfg.StrOpt('data_provider',
default='monasca_transform.processor.'
'pre_hourly_processor:'
'PreHourlyProcessorDataProvider'),
cfg.BoolOpt('enable_instance_usage_df_cache'),
cfg.StrOpt('instance_usage_df_cache_storage_level'),
cfg.BoolOpt('enable_batch_time_filtering')

View File

@ -511,9 +511,8 @@ class MonMetricsKafkaProcessor(object):
batch_time_info)
# call pre hourly processor, if its time to run
if (cfg.CONF.stage_processors.pre_hourly_processor_enabled
is True and PreHourlyProcessor.is_time_to_run(
batch_time_info)):
if (cfg.CONF.stage_processors.pre_hourly_processor_enabled and
PreHourlyProcessor.is_time_to_run(batch_time_info)):
PreHourlyProcessor.run_processor(
record_store_df.rdd.context,
batch_time_info)

View File

@ -34,6 +34,8 @@ from monasca_transform.data_driven_specs.data_driven_specs_repo \
import DataDrivenSpecsRepoFactory
from monasca_transform.log_utils import LogUtils
from monasca_transform.processor import Processor
from monasca_transform.processor.processor_util import PreHourlyProcessorUtil
from monasca_transform.processor.processor_util import ProcessUtilDataProvider
from monasca_transform.transform.storage_utils import \
InvalidCacheStorageLevelException
from monasca_transform.transform.storage_utils import StorageUtils
@ -44,6 +46,18 @@ ConfigInitializer.basic_config()
log = LogUtils.init_logger(__name__)
class PreHourlyProcessorDataProvider(ProcessUtilDataProvider):
    """Data provider that reads the last run time from the offset specs."""

    def get_last_processed(self):
        """Return the most recent batch time recorded in the offset specs.

        The lookup is keyed by the pre-hourly processor's application name
        and kafka topic.
        """
        specs = PreHourlyProcessor.get_offset_specs()
        lookup_key = (PreHourlyProcessor.get_app_name(),
                      PreHourlyProcessor.get_kafka_topic())
        return specs.get_most_recent_batch_time_from_offsets(*lookup_key)
class PreHourlyProcessor(Processor):
"""Processor to process usage data published to metrics_pre_hourly topic a
and publish final rolled up metrics to metrics topic in kafka.
@ -90,17 +104,7 @@ class PreHourlyProcessor(Processor):
@staticmethod
def is_time_to_run(check_time):
"""return True if its time to run this processor.
For now it just checks to see if its start of the hour
i.e. the minute is 00.
"""
this_min = int(datetime.datetime.strftime(check_time, '%M'))
# run pre hourly processor only at top of the hour
if this_min == 0:
return True
else:
return False
return PreHourlyProcessorUtil.is_time_to_run(check_time)
@staticmethod
def _get_offsets_from_kafka(brokers,
@ -220,7 +224,7 @@ class PreHourlyProcessor(Processor):
until_offset = latest_dict[item].offsets[0]
# from
if (spec_until_offset is not None and int(spec_until_offset) >= 0):
if spec_until_offset is not None and int(spec_until_offset) >= 0:
from_offset = spec_until_offset
else:
from_offset = earliest_dict[item].offsets[0]

View File

@ -0,0 +1,104 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import datetime
from monasca_common.simport import simport
from oslo_config import cfg
from monasca_transform.log_utils import LogUtils
log = LogUtils.init_logger(__name__)
class PreHourlyProcessorUtil(object):
    """Decide whether the pre-hourly processor is due to run.

    The time of the last run is obtained from a pluggable data provider
    whose class is named by the ``data_provider`` option of the
    ``pre_hourly_processor`` configuration group.
    """

    # Provider instance, created on first use and then reused.
    data_provider = None

    @staticmethod
    def get_last_processed():
        """Return what the data provider reports as the last run time."""
        return PreHourlyProcessorUtil.get_data_provider().get_last_processed()

    @staticmethod
    def get_data_provider():
        """Return the data provider, loading and instantiating it if needed."""
        if not PreHourlyProcessorUtil.data_provider:
            PreHourlyProcessorUtil.data_provider = simport.load(
                cfg.CONF.pre_hourly_processor.data_provider)()
        return PreHourlyProcessorUtil.data_provider

    @staticmethod
    def is_time_to_run(check_time):
        """Return True if it is time to run this processor.

        It is time to run when either:

        * the data provider reports no previous run, or
        * the processor has not yet run for the hour of ``check_time`` and
          ``check_time`` is at least ``late_metric_slack_time`` seconds
          past the top of that hour (the slack allows late metrics to
          arrive before the hour is rolled up).

        :param check_time: datetime of the batch being checked
        :returns: True if the processor should run now, False otherwise
        """
        this_hour = check_time.hour
        this_date = check_time.replace(minute=0, second=0,
                                       microsecond=0, hour=0)
        drift_delta = datetime.timedelta(
            seconds=cfg.CONF.pre_hourly_processor.late_metric_slack_time)

        top_of_the_hour = check_time.replace(minute=0, second=0,
                                             microsecond=0)
        earliest_acceptable_run_time = top_of_the_hour + drift_delta

        last_processed = PreHourlyProcessorUtil.get_last_processed()
        if last_processed:
            hour_last_processed = last_processed.hour
            date_last_processed = last_processed.replace(minute=0, second=0,
                                                         microsecond=0,
                                                         hour=0)
        else:
            date_last_processed = None
            hour_last_processed = None

        # Only push the next run into the following hour when this very
        # hour (same hour AND same date) has already been processed; a run
        # at the same hour on another day must not delay today's run.
        if (this_hour == hour_last_processed and
                this_date == date_last_processed):
            earliest_acceptable_run_time = (
                top_of_the_hour +
                datetime.timedelta(hours=1) +
                drift_delta
            )
        log.debug(
            "Pre-hourly task check: Check time = %s, "
            "Last processed at %s (hour = %s), "
            "Earliest acceptable run time %s "
            "(based on configured pre hourly late metrics slack time of %s "
            "seconds)" % (
                check_time,
                last_processed,
                hour_last_processed,
                earliest_acceptable_run_time,
                cfg.CONF.pre_hourly_processor.late_metric_slack_time
            ))

        # run pre hourly processor only once from the
        # configured time after the top of the hour.
        # "Never processed" is tested on last_processed itself: testing
        # the hour would treat a run during hour 0 (midnight) as missing.
        if (not last_processed or (
                (this_hour != hour_last_processed or
                 this_date > date_last_processed) and
                check_time >= earliest_acceptable_run_time)):
            log.debug("Pre-hourly: Yes, it's time to process")
            return True
        log.debug("Pre-hourly: No, it's NOT time to process")
        return False
class ProcessUtilDataProvider(object):
    """Interface for objects that report when a processor last ran."""

    # NOTE: abc.abstractmethod is only enforced for classes whose metaclass
    # is ABCMeta; this class does not use it, so the NotImplementedError
    # below is what signals a missing implementation.
    @abc.abstractmethod
    def get_last_processed(self):
        """return data on last run of processor

        :raises NotImplementedError: when a subclass does not override
            this method
        """
        raise NotImplementedError(
            "Class %s doesn't implement get_last_processed()"
            % self.__class__.__name__)

View File

@ -17,11 +17,23 @@ ca_file = test_ca_file_path
enable_pre_hourly_processor = False
[pre_hourly_processor]
late_metric_slack_time = 600
enable_instance_usage_df_cache = False
instance_usage_df_cache_storage_level = MEMORY_ONLY_SER_2
enable_batch_time_filtering = True
data_provider=tests.unit.processor.test_is_time_to_run:TestProcessUtilDataProvider
[service]
enable_record_store_df_cache = False
record_store_df_cache_storage_level = MEMORY_ONLY_SER_2
enable_debug_log_entries = true
# the location for the transform-service log
service_log_path=/home/david/
# the filename for the transform-service log
service_log_filename=monasca-transform.log
# Whether debug-level log entries should be included in the application
# log. If this setting is false, info-level will be used for logging.
enable_debug_log_entries = true

View File

@ -0,0 +1,98 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import unittest
from monasca_transform.config.config_initializer import ConfigInitializer
from monasca_transform.processor.processor_util import PreHourlyProcessorUtil
from monasca_transform.processor.processor_util import ProcessUtilDataProvider
class PreHourlyProcessorTest(unittest.TestCase):
    """Checks of PreHourlyProcessorUtil.is_time_to_run()."""

    def setUp(self):
        config_file = ('tests/unit/test_resources/config/'
                       'test_config.conf')
        ConfigInitializer.basic_config(default_config_files=[config_file])

    @staticmethod
    def _due(check_time, last_processed):
        """Record last_processed in the provider, then ask if a run is due."""
        PreHourlyProcessorUtil.get_data_provider().set_last_processed(
            date_time=last_processed)
        return PreHourlyProcessorUtil.is_time_to_run(check_time)

    def test_is_time_to_run_before_late_metric_slack_time(self):
        # one second short of the slack period, last run an hour earlier
        now = datetime.datetime(2016, 11, 7, 11, 9, 59)
        previous = now - datetime.timedelta(hours=1)
        self.assertFalse(self._due(now, previous))

    def test_is_time_to_run_after_late_metric_slack_time(self):
        # just past the slack period, last run an hour earlier
        now = datetime.datetime(2016, 11, 7, 11, 10, 0, 1)
        previous = now - datetime.timedelta(hours=1)
        self.assertTrue(self._due(now, previous))

    def test_is_time_to_run_with_already_done_this_hour(self):
        # last run is the check time itself
        now = datetime.datetime(2016, 11, 7, 11, 30)
        self.assertFalse(self._due(now, now))

    def test_is_time_to_run_after_midnight_but_before_late_metric_slack_time(
            self):
        # last run falls on the previous day
        now = datetime.datetime(2016, 11, 7, 0, 5)
        previous = now - datetime.timedelta(hours=1)
        self.assertFalse(self._due(now, previous))

    def test_is_time_to_run_after_midnight_and_after_late_metric_slack_time(
            self):
        # last run falls on the previous day
        now = datetime.datetime(2016, 11, 7, 0, 10, 0, 1)
        previous = now - datetime.timedelta(hours=1)
        self.assertTrue(self._due(now, previous))

    def test_am_pm_behaviour(self):
        # 22:10 versus 10:10 on the same day
        now = datetime.datetime(2016, 11, 7, 22, 10, 0, 1)
        previous = now - datetime.timedelta(hours=12)
        self.assertTrue(self._due(now, previous))

    def test_same_time_different_day_behaviour(self):
        # same time of day, one day apart
        now = datetime.datetime(2016, 11, 7, 22, 10, 0, 1)
        previous = now - datetime.timedelta(days=1)
        self.assertTrue(self._due(now, previous))
class TestProcessUtilDataProvider(ProcessUtilDataProvider):
    """Data provider for tests: returns whatever time was last set on it."""

    # Value returned by get_last_processed(); None until one is set.
    _last_processed_date_time = None

    def set_last_processed(self, date_time=None):
        """Store date_time as the value to report as the last run."""
        self._last_processed_date_time = date_time

    def get_last_processed(self):
        """Return the stored last-run value (None if none was set)."""
        return self._last_processed_date_time