Hourly aggregation accounts for early arriving metrics
With this change, the pre-hourly processor — which performs the hourly aggregation (second stage) and writes the final aggregated metrics to the metrics topic in Kafka — now accounts for any early arriving metrics. This change, along with two previous changes to the pre-hourly processor that added 1.) a configurable late-metrics slack time (https://review.openstack.org/#/c/394497/), and 2.) batch filtering (https://review.openstack.org/#/c/363100/), will make sure all late-arriving or early-arriving metrics for an hour are aggregated appropriately. Also improved the MySQL offset handling to call the deletion of excess revisions only once. Change-Id: I919cddf343821fe52ad6a1d4170362311f84c0e4
This commit is contained in:
parent
f99a3faf68
commit
2da390414e
|
@ -29,6 +29,7 @@ enable_instance_usage_df_cache = True
|
|||
instance_usage_df_cache_storage_level = MEMORY_ONLY_SER_2
|
||||
enable_batch_time_filtering = True
|
||||
data_provider=monasca_transform.processor.pre_hourly_processor:PreHourlyProcessorDataProvider
|
||||
effective_batch_revision=2
|
||||
|
||||
#
|
||||
# Configurable values for the monasca-transform service
|
||||
|
|
|
@ -27,6 +27,7 @@ enable_pre_hourly_processor = True
|
|||
enable_instance_usage_df_cache = True
|
||||
instance_usage_df_cache_storage_level = MEMORY_ONLY_SER_2
|
||||
enable_batch_time_filtering = True
|
||||
effective_batch_revision=2
|
||||
|
||||
#
|
||||
# Configurable values for the monasca-transform service
|
||||
|
|
|
@ -143,7 +143,8 @@ class ConfigInitializer(object):
|
|||
'PreHourlyProcessorDataProvider'),
|
||||
cfg.BoolOpt('enable_instance_usage_df_cache'),
|
||||
cfg.StrOpt('instance_usage_df_cache_storage_level'),
|
||||
cfg.BoolOpt('enable_batch_time_filtering')
|
||||
cfg.BoolOpt('enable_batch_time_filtering'),
|
||||
cfg.IntOpt('effective_batch_revision', default=2)
|
||||
]
|
||||
app_group = cfg.OptGroup(name='pre_hourly_processor',
|
||||
title='pre_hourly_processor')
|
||||
|
|
|
@ -79,9 +79,10 @@ class MySQLOffsetSpecs(OffsetSpecs):
|
|||
version_spec.revision = revision
|
||||
revision = revision + 1
|
||||
|
||||
self.session.query(MySQLOffsetSpec).filter(
|
||||
MySQLOffsetSpec.revision > self.MAX_REVISIONS).delete(
|
||||
synchronize_session="fetch")
|
||||
# delete any revisions excess than required
|
||||
self.session.query(MySQLOffsetSpec).filter(
|
||||
MySQLOffsetSpec.revision > self.MAX_REVISIONS).delete(
|
||||
synchronize_session="fetch")
|
||||
|
||||
def get_kafka_offsets(self, app_name):
|
||||
return {'%s_%s_%s' % (
|
||||
|
@ -90,6 +91,13 @@ class MySQLOffsetSpecs(OffsetSpecs):
|
|||
MySQLOffsetSpec.app_name == app_name,
|
||||
MySQLOffsetSpec.revision == 1).all()}
|
||||
|
||||
def get_kafka_offsets_by_revision(self, app_name, revision):
|
||||
return {'%s_%s_%s' % (
|
||||
offset.get_app_name(), offset.get_topic(), offset.get_partition()
|
||||
): offset for offset in self.session.query(MySQLOffsetSpec).filter(
|
||||
MySQLOffsetSpec.app_name == app_name,
|
||||
MySQLOffsetSpec.revision == revision).all()}
|
||||
|
||||
def get_most_recent_batch_time_from_offsets(self, app_name, topic):
|
||||
try:
|
||||
# get partition 0 as a representative of all others
|
||||
|
|
|
@ -288,16 +288,123 @@ class PreHourlyProcessor(Processor):
|
|||
"""
|
||||
return simport.load(cfg.CONF.repositories.offsets)()
|
||||
|
||||
@staticmethod
|
||||
def get_effective_offset_range_list(offset_range_list):
|
||||
"""get effective batch offset range.
|
||||
Effective batch offset range covers offsets starting
|
||||
from effective batch revision (defined by effective_batch_revision
|
||||
config property). By default this method will set the
|
||||
pyspark Offset.fromOffset for each partition
|
||||
to have value older than the latest revision
|
||||
(defaults to latest -1) so that prehourly processor has access
|
||||
to entire data for the hour. This will also account for and cover
|
||||
any early arriving data (data that arrives before the start hour).
|
||||
"""
|
||||
|
||||
offset_specifications = PreHourlyProcessor.get_offset_specs()
|
||||
|
||||
app_name = PreHourlyProcessor.get_app_name()
|
||||
|
||||
topic = PreHourlyProcessor.get_kafka_topic()
|
||||
|
||||
# start offset revision
|
||||
effective_batch_revision = cfg.CONF.pre_hourly_processor.\
|
||||
effective_batch_revision
|
||||
|
||||
effective_batch_spec = offset_specifications\
|
||||
.get_kafka_offsets_by_revision(app_name,
|
||||
effective_batch_revision)
|
||||
|
||||
# get latest revision, if penultimate is unavailable
|
||||
if not effective_batch_spec:
|
||||
log.debug("effective batch spec: offsets: revision %s unavailable,"
|
||||
" getting the latest revision instead..." % (
|
||||
effective_batch_revision))
|
||||
# not available
|
||||
effective_batch_spec = offset_specifications.get_kafka_offsets(
|
||||
app_name)
|
||||
|
||||
effective_batch_offsets = PreHourlyProcessor._parse_saved_offsets(
|
||||
app_name, topic,
|
||||
effective_batch_spec)
|
||||
|
||||
# for debugging
|
||||
for effective_key in effective_batch_offsets.keys():
|
||||
effective_offset = effective_batch_offsets.get(effective_key,
|
||||
None)
|
||||
(effect_app_name,
|
||||
effect_topic_name,
|
||||
effect_partition,
|
||||
effect_from_offset,
|
||||
effect_until_offset) = effective_offset
|
||||
log.debug(
|
||||
"effective batch offsets (from db):"
|
||||
" OffSetRanges: %s %s %s %s" % (
|
||||
effect_topic_name, effect_partition,
|
||||
effect_from_offset, effect_until_offset))
|
||||
|
||||
# effective batch revision
|
||||
effective_offset_range_list = []
|
||||
for offset_range in offset_range_list:
|
||||
part_topic_key = "_".join((offset_range.topic,
|
||||
str(offset_range.partition)))
|
||||
effective_offset = effective_batch_offsets.get(part_topic_key,
|
||||
None)
|
||||
if effective_offset:
|
||||
(effect_app_name,
|
||||
effect_topic_name,
|
||||
effect_partition,
|
||||
effect_from_offset,
|
||||
effect_until_offset) = effective_offset
|
||||
|
||||
log.debug(
|
||||
"Extending effective offset range:"
|
||||
" OffSetRanges: %s %s %s-->%s %s" % (
|
||||
effect_topic_name, effect_partition,
|
||||
offset_range.fromOffset,
|
||||
effect_from_offset,
|
||||
effect_until_offset))
|
||||
|
||||
effective_offset_range_list.append(
|
||||
OffsetRange(offset_range.topic,
|
||||
offset_range.partition,
|
||||
effect_from_offset,
|
||||
offset_range.untilOffset))
|
||||
else:
|
||||
effective_offset_range_list.append(
|
||||
OffsetRange(offset_range.topic,
|
||||
offset_range.partition,
|
||||
offset_range.fromOffset,
|
||||
offset_range.untilOffset))
|
||||
|
||||
# return effective offset range list
|
||||
return effective_offset_range_list
|
||||
|
||||
@staticmethod
|
||||
def fetch_pre_hourly_data(spark_context,
|
||||
offset_range_list):
|
||||
"""get metrics pre hourly data from offset range list."""
|
||||
|
||||
for o in offset_range_list:
|
||||
log.debug(
|
||||
"fetch_pre_hourly: offset_range_list:"
|
||||
" OffSetRanges: %s %s %s %s" % (
|
||||
o.topic, o.partition, o.fromOffset, o.untilOffset))
|
||||
|
||||
effective_offset_list = PreHourlyProcessor.\
|
||||
get_effective_offset_range_list(offset_range_list)
|
||||
|
||||
for o in effective_offset_list:
|
||||
log.debug(
|
||||
"fetch_pre_hourly: effective_offset_range_list:"
|
||||
" OffSetRanges: %s %s %s %s" % (
|
||||
o.topic, o.partition, o.fromOffset, o.untilOffset))
|
||||
|
||||
# get kafka stream over the same offsets
|
||||
pre_hourly_rdd = KafkaUtils.createRDD(spark_context,
|
||||
{"metadata.broker.list":
|
||||
cfg.CONF.messaging.brokers},
|
||||
offset_range_list)
|
||||
effective_offset_list)
|
||||
return pre_hourly_rdd
|
||||
|
||||
@staticmethod
|
||||
|
@ -339,10 +446,17 @@ class PreHourlyProcessor(Processor):
|
|||
app_name, topic))
|
||||
|
||||
if most_recent_batch_time:
|
||||
# batches can fire after late metrics slack time, not neccessarily
|
||||
# at the top of the hour
|
||||
most_recent_batch_time_truncated = most_recent_batch_time.replace(
|
||||
minute=0, second=0, microsecond=0)
|
||||
log.debug("filter out records before : %s" % (
|
||||
most_recent_batch_time_truncated.strftime(
|
||||
'%Y-%m-%dT%H:%M:%S')))
|
||||
# filter out records before current batch
|
||||
instance_usage_df = instance_usage_df.filter(
|
||||
instance_usage_df.lastrecord_timestamp_string >=
|
||||
most_recent_batch_time)
|
||||
most_recent_batch_time_truncated)
|
||||
|
||||
# determine the timestamp of the most recent top-of-the-hour (which
|
||||
# is the end of the current batch).
|
||||
|
@ -351,6 +465,9 @@ class PreHourlyProcessor(Processor):
|
|||
minute=0, second=0, microsecond=0)
|
||||
|
||||
# filter out records after current batch
|
||||
log.debug("filter out records after : %s" % (
|
||||
truncated_timestamp_to_current_hour.strftime(
|
||||
'%Y-%m-%dT%H:%M:%S')))
|
||||
instance_usage_df = instance_usage_df.filter(
|
||||
instance_usage_df.firstrecord_timestamp_string <
|
||||
truncated_timestamp_to_current_hour)
|
||||
|
|
|
@ -11,6 +11,7 @@
|
|||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import datetime
|
||||
import os
|
||||
import random
|
||||
import sys
|
||||
|
@ -21,10 +22,15 @@ from collections import defaultdict
|
|||
|
||||
import mock
|
||||
from oslo_config import cfg
|
||||
from oslo_utils import uuidutils
|
||||
from pyspark.streaming.kafka import OffsetRange
|
||||
|
||||
from monasca_common.kafka_lib.common import OffsetResponse
|
||||
|
||||
from monasca_transform.config.config_initializer import ConfigInitializer
|
||||
from monasca_transform.mysql_offset_specs import MySQLOffsetSpecs
|
||||
from monasca_transform.processor.pre_hourly_processor import PreHourlyProcessor
|
||||
|
||||
from tests.functional.component.insert.dummy_insert import DummyInsert
|
||||
from tests.functional.json_offset_specs import JSONOffsetSpecs
|
||||
from tests.functional.messaging.adapter import DummyAdapter
|
||||
|
@ -49,6 +55,174 @@ class TestPreHourlyProcessorAgg(SparkContextTest):
|
|||
DummyAdapter.init()
|
||||
DummyAdapter.adapter_impl.metric_list = []
|
||||
|
||||
# get mysql offset specs
|
||||
self.kafka_offset_specs = MySQLOffsetSpecs()
|
||||
|
||||
def add_offset_for_test(self, my_app, my_topic, my_partition,
|
||||
my_from_offset, my_until_offset,
|
||||
my_batch_time):
|
||||
""""utility method to populate mysql db with offsets."""
|
||||
self.kafka_offset_specs.add(topic=my_topic, partition=my_partition,
|
||||
app_name=my_app,
|
||||
from_offset=my_from_offset,
|
||||
until_offset=my_until_offset,
|
||||
batch_time_info=my_batch_time)
|
||||
|
||||
@mock.patch('monasca_transform.processor.pre_hourly_processor.'
|
||||
'PreHourlyProcessor.get_app_name')
|
||||
@mock.patch('monasca_transform.processor.pre_hourly_processor.'
|
||||
'PreHourlyProcessor.get_kafka_topic')
|
||||
@mock.patch('monasca_transform.processor.pre_hourly_processor.'
|
||||
'PreHourlyProcessor._get_offsets_from_kafka')
|
||||
def test_get_processing_offset_range_list(self,
|
||||
kafka_get_offsets,
|
||||
kafka_topic_name,
|
||||
app_name):
|
||||
|
||||
# setup
|
||||
my_app = uuidutils.generate_uuid()
|
||||
my_topic = uuidutils.generate_uuid()
|
||||
|
||||
# mock app_name, topic_name, partition
|
||||
app_name.return_value = my_app
|
||||
kafka_topic_name.return_value = my_topic
|
||||
my_partition = 1
|
||||
|
||||
ret_offset_key = "_".join((my_topic, str(my_partition)))
|
||||
kafka_get_offsets.side_effect = [
|
||||
# mock latest offsets
|
||||
{ret_offset_key: OffsetResponse(topic=my_topic,
|
||||
partition=my_partition,
|
||||
error=None,
|
||||
offsets=[30])},
|
||||
# mock earliest offsets
|
||||
{ret_offset_key: OffsetResponse(topic=my_topic,
|
||||
partition=my_partition,
|
||||
error=None,
|
||||
offsets=[0])}
|
||||
]
|
||||
|
||||
# add offsets
|
||||
my_until_offset = 0
|
||||
my_from_offset = 10
|
||||
my_batch_time = datetime.datetime.strptime('2016-01-01 00:10:00',
|
||||
'%Y-%m-%d %H:%M:%S')
|
||||
self.add_offset_for_test(my_app, my_topic,
|
||||
my_partition, my_until_offset,
|
||||
my_from_offset, my_batch_time)
|
||||
|
||||
my_until_offset_2 = 10
|
||||
my_from_offset_2 = 20
|
||||
my_batch_time_2 = datetime.datetime.strptime('2016-01-01 01:10:00',
|
||||
'%Y-%m-%d %H:%M:%S')
|
||||
self.add_offset_for_test(my_app, my_topic,
|
||||
my_partition, my_until_offset_2,
|
||||
my_from_offset_2, my_batch_time_2)
|
||||
|
||||
# get latest offset spec as dict
|
||||
current_batch_time = datetime.datetime.strptime('2016-01-01 02:10:00',
|
||||
'%Y-%m-%d %H:%M:%S')
|
||||
|
||||
# use mysql offset repositories
|
||||
cfg.CONF.set_override(
|
||||
'offsets',
|
||||
'monasca_transform.mysql_offset_specs:MySQLOffsetSpecs',
|
||||
group='repositories')
|
||||
|
||||
# list of pyspark.streaming.kafka.OffsetRange objects
|
||||
offset_range_list = PreHourlyProcessor.\
|
||||
get_processing_offset_range_list(
|
||||
current_batch_time)
|
||||
|
||||
self.assertEqual(my_partition,
|
||||
offset_range_list[0].partition)
|
||||
self.assertEqual(my_topic,
|
||||
offset_range_list[0].topic)
|
||||
self.assertEqual(20,
|
||||
offset_range_list[0].fromOffset)
|
||||
self.assertEqual(30,
|
||||
offset_range_list[0].untilOffset)
|
||||
|
||||
@mock.patch('monasca_transform.processor.pre_hourly_processor.'
|
||||
'PreHourlyProcessor.get_app_name')
|
||||
@mock.patch('monasca_transform.processor.pre_hourly_processor.'
|
||||
'PreHourlyProcessor.get_kafka_topic')
|
||||
@mock.patch('monasca_transform.processor.pre_hourly_processor.'
|
||||
'PreHourlyProcessor._get_offsets_from_kafka')
|
||||
def test_get_effective_offset_range_list(self,
|
||||
kafka_get_offsets,
|
||||
kafka_topic_name,
|
||||
app_name):
|
||||
# setup
|
||||
my_app = uuidutils.generate_uuid()
|
||||
my_topic = uuidutils.generate_uuid()
|
||||
|
||||
# mock app_name, topic_name, partition
|
||||
app_name.return_value = my_app
|
||||
kafka_topic_name.return_value = my_topic
|
||||
my_partition = 1
|
||||
|
||||
ret_offset_key = "_".join((my_topic, str(my_partition)))
|
||||
kafka_get_offsets.side_effect = [
|
||||
# mock latest offsets in kafka
|
||||
{ret_offset_key: OffsetResponse(topic=my_topic,
|
||||
partition=my_partition,
|
||||
error=None,
|
||||
offsets=[3000])},
|
||||
# mock earliest offsets in kafka
|
||||
{ret_offset_key: OffsetResponse(topic=my_topic,
|
||||
partition=my_partition,
|
||||
error=None,
|
||||
offsets=[0])}
|
||||
]
|
||||
|
||||
# add offsets
|
||||
my_until_offset = 500
|
||||
my_from_offset = 1000
|
||||
my_batch_time = datetime.datetime.strptime('2016-01-01 00:10:00',
|
||||
'%Y-%m-%d %H:%M:%S')
|
||||
self.add_offset_for_test(my_app, my_topic,
|
||||
my_partition, my_until_offset,
|
||||
my_from_offset, my_batch_time)
|
||||
|
||||
my_until_offset_2 = 1000
|
||||
my_from_offset_2 = 2000
|
||||
my_batch_time_2 = datetime.datetime.strptime('2016-01-01 01:10:00',
|
||||
'%Y-%m-%d %H:%M:%S')
|
||||
self.add_offset_for_test(my_app, my_topic,
|
||||
my_partition, my_until_offset_2,
|
||||
my_from_offset_2, my_batch_time_2)
|
||||
|
||||
# get latest offset spec as dict
|
||||
current_batch_time = datetime.datetime.strptime('2016-01-01 02:10:00',
|
||||
'%Y-%m-%d %H:%M:%S')
|
||||
|
||||
# use mysql offset repositories
|
||||
cfg.CONF.set_override(
|
||||
'offsets',
|
||||
'monasca_transform.mysql_offset_specs:MySQLOffsetSpecs',
|
||||
group='repositories')
|
||||
|
||||
# list of pyspark.streaming.kafka.OffsetRange objects
|
||||
offset_range_list = PreHourlyProcessor.\
|
||||
get_processing_offset_range_list(
|
||||
current_batch_time)
|
||||
|
||||
# effective batch range list
|
||||
# should cover range of starting from (latest - 1) offset version to
|
||||
# latest
|
||||
offset_range_list = PreHourlyProcessor.get_effective_offset_range_list(
|
||||
offset_range_list)
|
||||
|
||||
self.assertEqual(my_partition,
|
||||
offset_range_list[0].partition)
|
||||
self.assertEqual(my_topic,
|
||||
offset_range_list[0].topic)
|
||||
self.assertEqual(500,
|
||||
offset_range_list[0].fromOffset)
|
||||
self.assertEqual(3000,
|
||||
offset_range_list[0].untilOffset)
|
||||
|
||||
@mock.patch('monasca_transform.processor.pre_hourly_processor.KafkaInsert',
|
||||
DummyInsert)
|
||||
@mock.patch('monasca_transform.processor.pre_hourly_processor.'
|
||||
|
|
|
@ -164,3 +164,48 @@ class TestMySQLOffsetSpecs(unittest.TestCase):
|
|||
int(offset_value.get_from_offset()))
|
||||
self.assertEqual(used_value.get('app_name'),
|
||||
offset_value.get_app_name())
|
||||
|
||||
def test_get_offset_by_revision(self):
|
||||
topic_1 = uuidutils.generate_uuid()
|
||||
partition_1 = 0
|
||||
until_offset_1 = 10
|
||||
from_offset_1 = 0
|
||||
app_name_1 = uuidutils.generate_uuid()
|
||||
|
||||
my_batch_time = datetime.datetime.strptime('2016-01-01 00:10:00',
|
||||
'%Y-%m-%d %H:%M:%S')
|
||||
|
||||
self.kafka_offset_specs.add(topic=topic_1, partition=partition_1,
|
||||
app_name=app_name_1,
|
||||
from_offset=from_offset_1,
|
||||
until_offset=until_offset_1,
|
||||
batch_time_info=my_batch_time)
|
||||
|
||||
until_offset_2 = 20
|
||||
from_offset_2 = 10
|
||||
my_batch_time2 = datetime.datetime.strptime('2016-01-01 01:10:00',
|
||||
'%Y-%m-%d %H:%M:%S')
|
||||
|
||||
self.kafka_offset_specs.add(topic=topic_1, partition=partition_1,
|
||||
app_name=app_name_1,
|
||||
from_offset=from_offset_2,
|
||||
until_offset=until_offset_2,
|
||||
batch_time_info=my_batch_time2)
|
||||
|
||||
# get penultimate revision
|
||||
penultimate_revision = 2
|
||||
kafka_offset_specs = self.kafka_offset_specs\
|
||||
.get_kafka_offsets_by_revision(app_name_1,
|
||||
penultimate_revision)
|
||||
|
||||
offset_key_1 = "%s_%s_%s" % (app_name_1, topic_1, partition_1)
|
||||
offset_value_1 = kafka_offset_specs.get(offset_key_1)
|
||||
|
||||
used_values = {}
|
||||
used_values[offset_key_1] = {
|
||||
"topic": topic_1, "partition": partition_1, "app_name": app_name_1,
|
||||
"from_offset": from_offset_1, "until_offset": until_offset_1
|
||||
}
|
||||
|
||||
self.assertions_on_offset(used_value=used_values.get(offset_key_1),
|
||||
offset_value=offset_value_1)
|
||||
|
|
|
@ -14,8 +14,16 @@ enable_pre_hourly_processor = False
|
|||
enable_instance_usage_df_cache = False
|
||||
instance_usage_df_cache_storage_level = MEMORY_ONLY_SER_2
|
||||
enable_batch_time_filtering = True
|
||||
effective_batch_revision = 2
|
||||
|
||||
[service]
|
||||
enable_record_store_df_cache = False
|
||||
record_store_df_cache_storage_level = MEMORY_ONLY_SER_2
|
||||
enable_debug_log_entries = true
|
||||
|
||||
[database]
|
||||
server_type = mysql:thin
|
||||
host = localhost
|
||||
database_name = monasca_transform
|
||||
username = m-transform
|
||||
password = password
|
Loading…
Reference in New Issue