Merge "Corrected catch up aggregation logic"

This commit is contained in:
Jenkins 2017-03-03 19:12:24 +00:00 committed by Gerrit Code Review
commit e6f7057786
2 changed files with 55 additions and 37 deletions

View File

@ -38,56 +38,63 @@ class PreHourlyProcessorUtil(object):
return PreHourlyProcessorUtil.data_provider
@staticmethod
def is_time_to_run(check_time):
def is_time_to_run(check_date_time):
"""return True if its time to run this processor.
For now it just checks to see if its start of the hour
i.e. the minute is 00.
It is time to run the processor if:
The processor has no previous recorded run time.
It is more than the configured 'late_metric_slack_time' (to allow
for the arrival of tardy metrics) past the hour and the processor
has not yet run for this hour
"""
this_hour = int(datetime.datetime.strftime(check_time, '%H'))
this_date = check_time.replace(minute=0, second=0,
microsecond=0, hour=0)
drift_delta = datetime.timedelta(
check_hour = int(datetime.datetime.strftime(check_date_time, '%H'))
check_date = check_date_time.replace(minute=0, second=0,
microsecond=0, hour=0)
slack = datetime.timedelta(
seconds=cfg.CONF.pre_hourly_processor.late_metric_slack_time)
top_of_the_hour = check_time.replace(minute=0, second=0,
microsecond=0)
earliest_acceptable_run_time = top_of_the_hour + drift_delta
last_processed = PreHourlyProcessorUtil.get_last_processed()
if last_processed:
hour_last_processed = int(
top_of_the_hour_date_time = check_date_time.replace(
minute=0, second=0, microsecond=0)
earliest_acceptable_run_date_time = top_of_the_hour_date_time + slack
last_processed_date_time = PreHourlyProcessorUtil.get_last_processed()
if last_processed_date_time:
last_processed_hour = int(
datetime.datetime.strftime(
last_processed, '%H'))
date_last_processed = last_processed.replace(minute=0, second=0,
microsecond=0,
hour=0)
last_processed_date_time, '%H'))
last_processed_date = last_processed_date_time.replace(
minute=0, second=0, microsecond=0, hour=0)
else:
date_last_processed = None
hour_last_processed = None
last_processed_date = None
last_processed_hour = None
if this_hour == hour_last_processed:
earliest_acceptable_run_time = (
top_of_the_hour +
if (check_hour == last_processed_hour
and last_processed_date == check_date):
earliest_acceptable_run_date_time = (
top_of_the_hour_date_time +
datetime.timedelta(hours=1) +
drift_delta
slack
)
log.debug(
"Pre-hourly task check: Check time = %s, "
"Pre-hourly task check: Now date: %s, "
"Date last processed: %s, Check time = %s, "
"Last processed at %s (hour = %s), "
"Earliest acceptable run time %s "
"(based on configured pre hourly late metrics slack time of %s "
"seconds)" % (
check_time,
last_processed,
hour_last_processed,
earliest_acceptable_run_time,
check_date,
last_processed_date,
check_date_time,
last_processed_date_time,
last_processed_hour,
earliest_acceptable_run_date_time,
cfg.CONF.pre_hourly_processor.late_metric_slack_time
))
# run pre hourly processor only once from the
# configured time after the top of the hour
if (not hour_last_processed or (
((not this_hour == hour_last_processed) or
(this_date > date_last_processed)) and
check_time >= earliest_acceptable_run_time)):
if (not last_processed_date_time or (
((not check_hour == last_processed_hour) or
(check_date > last_processed_date)) and
check_date_time >= earliest_acceptable_run_date_time)):
log.debug("Pre-hourly: Yes, it's time to process")
return True
log.debug("Pre-hourly: No, it's NOT time to process")

View File

@ -15,6 +15,12 @@ import datetime
import unittest
from monasca_transform.config.config_initializer import ConfigInitializer
ConfigInitializer.basic_config(
default_config_files=[
'tests/unit/test_resources/config/'
'test_config.conf']
)
from monasca_transform.processor.processor_util import PreHourlyProcessorUtil
from monasca_transform.processor.processor_util import ProcessUtilDataProvider
@ -22,11 +28,7 @@ from monasca_transform.processor.processor_util import ProcessUtilDataProvider
class PreHourlyProcessorTest(unittest.TestCase):
def setUp(self):
ConfigInitializer.basic_config(
default_config_files=[
'tests/unit/test_resources/config/'
'test_config.conf']
)
pass
def test_is_time_to_run_before_late_metric_slack_time(self):
check_time = datetime.datetime(
@ -70,6 +72,15 @@ class PreHourlyProcessorTest(unittest.TestCase):
date_time=(check_time + datetime.timedelta(hours=-1)))
self.assertTrue(PreHourlyProcessorUtil.is_time_to_run(check_time))
def test_after_midnight_having_already_run(
self):
check_time = datetime.datetime(
year=2016, month=11, day=7, hour=0,
minute=20, second=0, microsecond=1)
PreHourlyProcessorUtil.get_data_provider().set_last_processed(
date_time=(check_time + datetime.timedelta(minutes=-10)))
self.assertFalse(PreHourlyProcessorUtil.is_time_to_run(check_time))
def test_am_pm_behaviour(self):
check_time = datetime.datetime(
year=2016, month=11, day=7, hour=22,