Merge "Hourly aggregation account for early arrving metrics"

Jenkins 2017-04-19 21:10:22 +00:00 committed by Gerrit Code Review
commit 699103e345
8 changed files with 361 additions and 6 deletions

View File

@ -29,6 +29,7 @@ enable_instance_usage_df_cache = True
instance_usage_df_cache_storage_level = MEMORY_ONLY_SER_2
enable_batch_time_filtering = True
data_provider=monasca_transform.processor.pre_hourly_processor:PreHourlyProcessorDataProvider
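# revision of previously saved offsets to start the pre-hourly batch from
# (revision 1 is the latest save, 2 the one before it); looking back one
# revision lets the hourly aggregation also cover early arriving metrics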
effective_batch_revision=2
#
# Configurable values for the monasca-transform service

View File

@ -27,6 +27,7 @@ enable_pre_hourly_processor = True
enable_instance_usage_df_cache = True
instance_usage_df_cache_storage_level = MEMORY_ONLY_SER_2
enable_batch_time_filtering = True
effective_batch_revision=2
#
# Configurable values for the monasca-transform service

View File

@ -143,7 +143,8 @@ class ConfigInitializer(object):
'PreHourlyProcessorDataProvider'),
cfg.BoolOpt('enable_instance_usage_df_cache'),
cfg.StrOpt('instance_usage_df_cache_storage_level'),
cfg.BoolOpt('enable_batch_time_filtering'),
cfg.IntOpt('effective_batch_revision', default=2)
]
app_group = cfg.OptGroup(name='pre_hourly_processor',
title='pre_hourly_processor')
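(For context, a minimal sketch of how the new option is read at runtime; it mirrors the pre_hourly_processor code later in this change.)

# defaults to 2 when not set in the [pre_hourly_processor] section
effective_batch_revision = cfg.CONF.pre_hourly_processor.effective_batch_revision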

View File

@ -79,9 +79,10 @@ class MySQLOffsetSpecs(OffsetSpecs):
version_spec.revision = revision
revision = revision + 1
# delete any revisions in excess of the required maximum (MAX_REVISIONS)
self.session.query(MySQLOffsetSpec).filter(
MySQLOffsetSpec.revision > self.MAX_REVISIONS).delete(
synchronize_session="fetch")
def get_kafka_offsets(self, app_name):
return {'%s_%s_%s' % (
@ -90,6 +91,13 @@ class MySQLOffsetSpecs(OffsetSpecs):
MySQLOffsetSpec.app_name == app_name,
MySQLOffsetSpec.revision == 1).all()}
def get_kafka_offsets_by_revision(self, app_name, revision):
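"""Return saved offsets for a given revision, keyed by '<app>_<topic>_<partition>'."""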
return {'%s_%s_%s' % (
offset.get_app_name(), offset.get_topic(), offset.get_partition()
): offset for offset in self.session.query(MySQLOffsetSpec).filter(
MySQLOffsetSpec.app_name == app_name,
MySQLOffsetSpec.revision == revision).all()}
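# note: with the revision scheme above, revision 1 is the latest set of
# saved offsets and revision 2 the one saved before it, so e.g.
# get_kafka_offsets_by_revision(app_name, 2) returns the penultimate save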
def get_most_recent_batch_time_from_offsets(self, app_name, topic):
try:
# get partition 0 as a representative of all others

View File

@ -288,16 +288,123 @@ class PreHourlyProcessor(Processor):
"""
return simport.load(cfg.CONF.repositories.offsets)()
@staticmethod
def get_effective_offset_range_list(offset_range_list):
"""get effective batch offset range.
Effective batch offset range covers offsets starting
from effective batch revision (defined by effective_batch_revision
config property). By default this method will set the
pyspark Offset.fromOffset for each partition
to have value older than the latest revision
(defaults to latest -1) so that prehourly processor has access
to entire data for the hour. This will also account for and cover
any early arriving data (data that arrives before the start hour).
"""
offset_specifications = PreHourlyProcessor.get_offset_specs()
app_name = PreHourlyProcessor.get_app_name()
topic = PreHourlyProcessor.get_kafka_topic()
# start offset revision
effective_batch_revision = cfg.CONF.pre_hourly_processor.\
effective_batch_revision
effective_batch_spec = offset_specifications\
.get_kafka_offsets_by_revision(app_name,
effective_batch_revision)
# get latest revision, if penultimate is unavailable
if not effective_batch_spec:
log.debug("effective batch spec: offsets: revision %s unavailable,"
" getting the latest revision instead..." % (
effective_batch_revision))
# not available
effective_batch_spec = offset_specifications.get_kafka_offsets(
app_name)
effective_batch_offsets = PreHourlyProcessor._parse_saved_offsets(
app_name, topic,
effective_batch_spec)
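# effective_batch_offsets maps '<topic>_<partition>' keys to
# (app_name, topic, partition, from_offset, until_offset) tuples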
# for debugging
for effective_key in effective_batch_offsets.keys():
effective_offset = effective_batch_offsets.get(effective_key,
None)
(effect_app_name,
effect_topic_name,
effect_partition,
effect_from_offset,
effect_until_offset) = effective_offset
log.debug(
"effective batch offsets (from db):"
" OffSetRanges: %s %s %s %s" % (
effect_topic_name, effect_partition,
effect_from_offset, effect_until_offset))
# effective batch revision
effective_offset_range_list = []
for offset_range in offset_range_list:
part_topic_key = "_".join((offset_range.topic,
str(offset_range.partition)))
effective_offset = effective_batch_offsets.get(part_topic_key,
None)
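# if offsets were saved for this partition at the effective revision,
# extend the range back to that save's from offset so the batch also
# picks up early arriving records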
if effective_offset:
(effect_app_name,
effect_topic_name,
effect_partition,
effect_from_offset,
effect_until_offset) = effective_offset
log.debug(
"Extending effective offset range:"
" OffSetRanges: %s %s %s-->%s %s" % (
effect_topic_name, effect_partition,
offset_range.fromOffset,
effect_from_offset,
effect_until_offset))
effective_offset_range_list.append(
OffsetRange(offset_range.topic,
offset_range.partition,
effect_from_offset,
offset_range.untilOffset))
else:
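# no saved offsets for this partition at the effective revision;
# keep the originally computed range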
effective_offset_range_list.append(
OffsetRange(offset_range.topic,
offset_range.partition,
offset_range.fromOffset,
offset_range.untilOffset))
# return effective offset range list
return effective_offset_range_list
@staticmethod
def fetch_pre_hourly_data(spark_context,
offset_range_list):
"""get metrics pre hourly data from offset range list."""
for o in offset_range_list:
log.debug(
"fetch_pre_hourly: offset_range_list:"
" OffSetRanges: %s %s %s %s" % (
o.topic, o.partition, o.fromOffset, o.untilOffset))
effective_offset_list = PreHourlyProcessor.\
get_effective_offset_range_list(offset_range_list)
for o in effective_offset_list:
log.debug(
"fetch_pre_hourly: effective_offset_range_list:"
" OffSetRanges: %s %s %s %s" % (
o.topic, o.partition, o.fromOffset, o.untilOffset))
# get kafka stream over the effective offsets
pre_hourly_rdd = KafkaUtils.createRDD(spark_context,
{"metadata.broker.list":
cfg.CONF.messaging.brokers},
effective_offset_list)
return pre_hourly_rdd
@staticmethod
@ -339,10 +446,17 @@ class PreHourlyProcessor(Processor):
app_name, topic))
if most_recent_batch_time:
# batches can fire after the late metrics slack time, not necessarily
# at the top of the hour
most_recent_batch_time_truncated = most_recent_batch_time.replace(
minute=0, second=0, microsecond=0)
log.debug("filter out records before : %s" % (
most_recent_batch_time_truncated.strftime(
'%Y-%m-%dT%H:%M:%S')))
# filter out records before current batch
instance_usage_df = instance_usage_df.filter(
instance_usage_df.lastrecord_timestamp_string >=
most_recent_batch_time_truncated)
# determine the timestamp of the most recent top-of-the-hour (which
# is the end of the current batch).
@ -351,6 +465,9 @@ class PreHourlyProcessor(Processor):
minute=0, second=0, microsecond=0)
# filter out records after current batch
log.debug("filter out records after : %s" % (
truncated_timestamp_to_current_hour.strftime(
'%Y-%m-%dT%H:%M:%S')))
instance_usage_df = instance_usage_df.filter(
instance_usage_df.firstrecord_timestamp_string <
truncated_timestamp_to_current_hour)
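(A rough sketch of how these pieces fit together in a pre-hourly run, assuming the existing driver code; names follow the methods shown above.)

# compute the offset ranges for the current batch, then fetch the data;
# fetch_pre_hourly_data internally widens the ranges via
# get_effective_offset_range_list to include early arriving metrics
offset_range_list = PreHourlyProcessor.get_processing_offset_range_list(batch_time)
pre_hourly_rdd = PreHourlyProcessor.fetch_pre_hourly_data(spark_context, offset_range_list)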

View File

@ -11,6 +11,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import os
import random
import sys
@ -21,10 +22,15 @@ from collections import defaultdict
import mock
from oslo_config import cfg
from oslo_utils import uuidutils
from pyspark.streaming.kafka import OffsetRange
from monasca_common.kafka_lib.common import OffsetResponse
from monasca_transform.config.config_initializer import ConfigInitializer
from monasca_transform.mysql_offset_specs import MySQLOffsetSpecs
from monasca_transform.processor.pre_hourly_processor import PreHourlyProcessor
from tests.functional.component.insert.dummy_insert import DummyInsert
from tests.functional.json_offset_specs import JSONOffsetSpecs
from tests.functional.messaging.adapter import DummyAdapter
@ -49,6 +55,174 @@ class TestPreHourlyProcessorAgg(SparkContextTest):
DummyAdapter.init()
DummyAdapter.adapter_impl.metric_list = []
# get mysql offset specs
self.kafka_offset_specs = MySQLOffsetSpecs()
def add_offset_for_test(self, my_app, my_topic, my_partition,
my_from_offset, my_until_offset,
my_batch_time):
""""utility method to populate mysql db with offsets."""
self.kafka_offset_specs.add(topic=my_topic, partition=my_partition,
app_name=my_app,
from_offset=my_from_offset,
until_offset=my_until_offset,
batch_time_info=my_batch_time)
@mock.patch('monasca_transform.processor.pre_hourly_processor.'
'PreHourlyProcessor.get_app_name')
@mock.patch('monasca_transform.processor.pre_hourly_processor.'
'PreHourlyProcessor.get_kafka_topic')
@mock.patch('monasca_transform.processor.pre_hourly_processor.'
'PreHourlyProcessor._get_offsets_from_kafka')
def test_get_processing_offset_range_list(self,
kafka_get_offsets,
kafka_topic_name,
app_name):
# setup
my_app = uuidutils.generate_uuid()
my_topic = uuidutils.generate_uuid()
# mock app_name, topic_name, partition
app_name.return_value = my_app
kafka_topic_name.return_value = my_topic
my_partition = 1
ret_offset_key = "_".join((my_topic, str(my_partition)))
kafka_get_offsets.side_effect = [
# mock latest offsets
{ret_offset_key: OffsetResponse(topic=my_topic,
partition=my_partition,
error=None,
offsets=[30])},
# mock earliest offsets
{ret_offset_key: OffsetResponse(topic=my_topic,
partition=my_partition,
error=None,
offsets=[0])}
]
# add offsets
my_until_offset = 0
my_from_offset = 10
my_batch_time = datetime.datetime.strptime('2016-01-01 00:10:00',
'%Y-%m-%d %H:%M:%S')
self.add_offset_for_test(my_app, my_topic,
my_partition, my_until_offset,
my_from_offset, my_batch_time)
my_until_offset_2 = 10
my_from_offset_2 = 20
my_batch_time_2 = datetime.datetime.strptime('2016-01-01 01:10:00',
'%Y-%m-%d %H:%M:%S')
self.add_offset_for_test(my_app, my_topic,
my_partition, my_until_offset_2,
my_from_offset_2, my_batch_time_2)
# get latest offset spec as dict
current_batch_time = datetime.datetime.strptime('2016-01-01 02:10:00',
'%Y-%m-%d %H:%M:%S')
# use mysql offset repositories
cfg.CONF.set_override(
'offsets',
'monasca_transform.mysql_offset_specs:MySQLOffsetSpecs',
group='repositories')
# list of pyspark.streaming.kafka.OffsetRange objects
offset_range_list = PreHourlyProcessor.\
get_processing_offset_range_list(
current_batch_time)
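# the processing range should run from the most recently saved
# until offset (20) to the latest offset reported by kafka (30)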
self.assertEqual(my_partition,
offset_range_list[0].partition)
self.assertEqual(my_topic,
offset_range_list[0].topic)
self.assertEqual(20,
offset_range_list[0].fromOffset)
self.assertEqual(30,
offset_range_list[0].untilOffset)
@mock.patch('monasca_transform.processor.pre_hourly_processor.'
'PreHourlyProcessor.get_app_name')
@mock.patch('monasca_transform.processor.pre_hourly_processor.'
'PreHourlyProcessor.get_kafka_topic')
@mock.patch('monasca_transform.processor.pre_hourly_processor.'
'PreHourlyProcessor._get_offsets_from_kafka')
def test_get_effective_offset_range_list(self,
kafka_get_offsets,
kafka_topic_name,
app_name):
# setup
my_app = uuidutils.generate_uuid()
my_topic = uuidutils.generate_uuid()
# mock app_name, topic_name, partition
app_name.return_value = my_app
kafka_topic_name.return_value = my_topic
my_partition = 1
ret_offset_key = "_".join((my_topic, str(my_partition)))
kafka_get_offsets.side_effect = [
# mock latest offsets in kafka
{ret_offset_key: OffsetResponse(topic=my_topic,
partition=my_partition,
error=None,
offsets=[3000])},
# mock earliest offsets in kafka
{ret_offset_key: OffsetResponse(topic=my_topic,
partition=my_partition,
error=None,
offsets=[0])}
]
# add offsets
my_until_offset = 500
my_from_offset = 1000
my_batch_time = datetime.datetime.strptime('2016-01-01 00:10:00',
'%Y-%m-%d %H:%M:%S')
self.add_offset_for_test(my_app, my_topic,
my_partition, my_until_offset,
my_from_offset, my_batch_time)
my_until_offset_2 = 1000
my_from_offset_2 = 2000
my_batch_time_2 = datetime.datetime.strptime('2016-01-01 01:10:00',
'%Y-%m-%d %H:%M:%S')
self.add_offset_for_test(my_app, my_topic,
my_partition, my_until_offset_2,
my_from_offset_2, my_batch_time_2)
# get latest offset spec as dict
current_batch_time = datetime.datetime.strptime('2016-01-01 02:10:00',
'%Y-%m-%d %H:%M:%S')
# use mysql offset repositories
cfg.CONF.set_override(
'offsets',
'monasca_transform.mysql_offset_specs:MySQLOffsetSpecs',
group='repositories')
# list of pyspark.streaming.kafka.OffsetRange objects
offset_range_list = PreHourlyProcessor.\
get_processing_offset_range_list(
current_batch_time)
# the effective batch range list should cover the range starting
# from the (latest - 1) offset revision through to the latest
offset_range_list = PreHourlyProcessor.get_effective_offset_range_list(
offset_range_list)
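# fromOffset should extend back to the penultimate revision's from
# offset (500) while untilOffset stays at the latest kafka offset (3000)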
self.assertEqual(my_partition,
offset_range_list[0].partition)
self.assertEqual(my_topic,
offset_range_list[0].topic)
self.assertEqual(500,
offset_range_list[0].fromOffset)
self.assertEqual(3000,
offset_range_list[0].untilOffset)
@mock.patch('monasca_transform.processor.pre_hourly_processor.KafkaInsert',
DummyInsert)
@mock.patch('monasca_transform.processor.pre_hourly_processor.'

View File

@ -164,3 +164,48 @@ class TestMySQLOffsetSpecs(unittest.TestCase):
int(offset_value.get_from_offset()))
self.assertEqual(used_value.get('app_name'),
offset_value.get_app_name())
def test_get_offset_by_revision(self):
topic_1 = uuidutils.generate_uuid()
partition_1 = 0
until_offset_1 = 10
from_offset_1 = 0
app_name_1 = uuidutils.generate_uuid()
my_batch_time = datetime.datetime.strptime('2016-01-01 00:10:00',
'%Y-%m-%d %H:%M:%S')
self.kafka_offset_specs.add(topic=topic_1, partition=partition_1,
app_name=app_name_1,
from_offset=from_offset_1,
until_offset=until_offset_1,
batch_time_info=my_batch_time)
until_offset_2 = 20
from_offset_2 = 10
my_batch_time2 = datetime.datetime.strptime('2016-01-01 01:10:00',
'%Y-%m-%d %H:%M:%S')
self.kafka_offset_specs.add(topic=topic_1, partition=partition_1,
app_name=app_name_1,
from_offset=from_offset_2,
until_offset=until_offset_2,
batch_time_info=my_batch_time2)
# get penultimate revision
penultimate_revision = 2
kafka_offset_specs = self.kafka_offset_specs\
.get_kafka_offsets_by_revision(app_name_1,
penultimate_revision)
offset_key_1 = "%s_%s_%s" % (app_name_1, topic_1, partition_1)
offset_value_1 = kafka_offset_specs.get(offset_key_1)
used_values = {}
used_values[offset_key_1] = {
"topic": topic_1, "partition": partition_1, "app_name": app_name_1,
"from_offset": from_offset_1, "until_offset": until_offset_1
}
self.assertions_on_offset(used_value=used_values.get(offset_key_1),
offset_value=offset_value_1)

View File

@ -14,8 +14,16 @@ enable_pre_hourly_processor = False
enable_instance_usage_df_cache = False
instance_usage_df_cache_storage_level = MEMORY_ONLY_SER_2
enable_batch_time_filtering = True
effective_batch_revision = 2
[service]
enable_record_store_df_cache = False
record_store_df_cache_storage_level = MEMORY_ONLY_SER_2
enable_debug_log_entries = true
[database]
server_type = mysql:thin
host = localhost
database_name = monasca_transform
username = m-transform
password = password