Made changes to prevent multiple metrics in the same batch.

Change-Id: Iec9935c21d8b65bf79067d4a009859c898b75993
This commit is contained in:
Flint Calvin 2016-08-30 18:13:01 +00:00
parent 424077c581
commit bf2e42b3e0
11 changed files with 176 additions and 5 deletions

View File

@ -26,6 +26,7 @@ pre_hourly_processor_enabled = True
[pre_hourly_processor]
enable_instance_usage_df_cache = True
instance_usage_df_cache_storage_level = MEMORY_ONLY_SER_2
enable_batch_time_filtering = True
#
# Configurable values for the monasca-transform service

View File

@ -26,6 +26,7 @@ enable_pre_hourly_processor = True
[pre_hourly_processor]
enable_instance_usage_df_cache = True
instance_usage_df_cache_storage_level = MEMORY_ONLY_SER_2
enable_batch_time_filtering = True
#
# Configurable values for the monasca-transform service

View File

@ -135,7 +135,8 @@ class ConfigInitializer(object):
def load_pre_hourly_processor_options():
app_opts = [
cfg.BoolOpt('enable_instance_usage_df_cache'),
cfg.StrOpt('instance_usage_df_cache_storage_level')
cfg.StrOpt('instance_usage_df_cache_storage_level'),
cfg.BoolOpt('enable_batch_time_filtering')
]
app_group = cfg.OptGroup(name='pre_hourly_processor',
title='pre_hourly_processor')

View File

@ -88,6 +88,22 @@ class MySQLOffsetSpecs(OffsetSpecs):
MySQLOffsetSpec.app_name == app_name,
MySQLOffsetSpec.revision == 1).all()}
def get_most_recent_batch_time_from_offsets(self, app_name, topic):
try:
# get partition 0 as a representative of all others
offset = self.session.query(MySQLOffsetSpec).filter(
MySQLOffsetSpec.app_name == app_name,
MySQLOffsetSpec.topic == topic,
MySQLOffsetSpec.partition == 0,
MySQLOffsetSpec.revision == 1).one()
most_recent_batch_time = datetime.datetime.strptime(
offset.get_batch_time(),
'%Y-%m-%d %H:%M:%S')
except Exception:
most_recent_batch_time = None
return most_recent_batch_time
def delete_all_kafka_offsets(self, app_name):
try:
self.session.query(MySQLOffsetSpec).filter(

View File

@ -98,6 +98,13 @@ class OffsetSpecs(object):
"Class %s doesn't implement delete_all_kafka_offsets()"
% self.__class__.__name__)
@abc.abstractmethod
def get_most_recent_batch_time_from_offsets(self, app_name, topic):
raise NotImplementedError(
"Class %s doesn't implement "
"get_most_recent_batch_time_from_offsets()"
% self.__class__.__name__)
class JSONOffsetSpecs(OffsetSpecs):
@ -190,6 +197,19 @@ class JSONOffsetSpecs(OffsetSpecs):
def get_kafka_offsets(self, app_name):
return self._kafka_offsets
def get_most_recent_batch_time_from_offsets(self, app_name, topic):
try:
# get partition 0 as a representative of all others
key = "%s_%s_%s" % (app_name, topic, 0)
offset = self._kafka_offsets[key]
most_recent_batch_time = datetime.datetime.strptime(
offset.get_batch_time(),
'%Y-%m-%d %H:%M:%S')
except Exception:
most_recent_batch_time = None
return most_recent_batch_time
def delete_all_kafka_offsets(self, app_name):
log.info("Deleting json offsets file: %s", self.kafka_offset_spec_file)
os.remove(self.kafka_offset_spec_file)

View File

@ -244,7 +244,7 @@ class PreHourlyProcessor(Processor):
available in kafka.
"""
offset_specifications = simport.load(cfg.CONF.repositories.offsets)()
offset_specifications = PreHourlyProcessor.get_offset_specs()
# get application name, will be used to get offsets from database
app_name = PreHourlyProcessor.get_app_name()
@ -277,6 +277,12 @@ class PreHourlyProcessor(Processor):
saved_offset_spec)
return offset_range_list
@staticmethod
def get_offset_specs():
"""get offset specifications.
"""
return simport.load(cfg.CONF.repositories.offsets)()
@staticmethod
def fetch_pre_hourly_data(spark_context,
offset_range_list):
@ -305,6 +311,45 @@ class PreHourlyProcessor(Processor):
instance_usage_df = InstanceUsageUtils.create_df_from_json_rdd(
sqlc, instance_usage_rdd)
if cfg.CONF.pre_hourly_processor.enable_batch_time_filtering:
instance_usage_df = (
PreHourlyProcessor.filter_out_records_not_in_current_batch(
instance_usage_df))
return instance_usage_df
@staticmethod
def filter_out_records_not_in_current_batch(instance_usage_df):
"""Filter out any records which don't pertain to the
current batch (i.e., records before or after the
batch currently being processed).
"""
# get the most recent batch time from the stored offsets
offset_specifications = PreHourlyProcessor.get_offset_specs()
app_name = PreHourlyProcessor.get_app_name()
topic = PreHourlyProcessor.get_kafka_topic()
most_recent_batch_time = (
offset_specifications.get_most_recent_batch_time_from_offsets(
app_name, topic))
if most_recent_batch_time:
# filter out records before current batch
instance_usage_df = instance_usage_df.filter(
instance_usage_df.lastrecord_timestamp_string >=
most_recent_batch_time)
# determine the timestamp of the most recent top-of-the-hour (which
# is the end of the current batch).
current_time = datetime.datetime.now()
truncated_timestamp_to_current_hour = current_time.replace(
minute=0, second=0, microsecond=0)
# filter out records after current batch
instance_usage_df = instance_usage_df.filter(
instance_usage_df.firstrecord_timestamp_string <
truncated_timestamp_to_current_hour)
return instance_usage_df
@staticmethod

View File

@ -12,7 +12,11 @@
# License for the specific language governing permissions and limitations
# under the License.
import mock
import os
import random
import sys
import unittest
import uuid
from oslo_config import cfg
from pyspark.streaming.kafka import OffsetRange
@ -26,9 +30,13 @@ from tests.unit.spark_context_test import SparkContextTest
from tests.unit.test_resources.metrics_pre_hourly_data.data_provider \
import DataProvider
from monasca_transform.offset_specs import JSONOffsetSpecs
class TestPreHourlyProcessorAgg(SparkContextTest):
test_resources_path = 'tests/unit/test_resources'
def setUp(self):
super(TestPreHourlyProcessorAgg, self).setUp()
# configure the system with a dummy messaging adapter
@ -43,19 +51,44 @@ class TestPreHourlyProcessorAgg(SparkContextTest):
@mock.patch('monasca_transform.processor.pre_hourly_processor.KafkaInsert',
DummyInsert)
@mock.patch('monasca_transform.processor.pre_hourly_processor.'
'PreHourlyProcessor.get_offset_specs')
@mock.patch('monasca_transform.processor.pre_hourly_processor.'
'PreHourlyProcessor.fetch_pre_hourly_data')
@mock.patch('monasca_transform.processor.pre_hourly_processor.'
'PreHourlyProcessor.get_processing_offset_range_list')
def test_pre_hourly_processor(self,
offset_range_list,
pre_hourly_data):
pre_hourly_data,
offset_specs):
# load components
myOffsetRanges = [
OffsetRange("metrics_pre_hourly", 1, 10, 20)]
OffsetRange("metrics_pre_hourly", 0, 10, 20)]
offset_range_list.return_value = myOffsetRanges
filename = '%s.json' % str(uuid.uuid4())
file_path = os.path.join(self.test_resources_path, filename)
json_offset_specs = JSONOffsetSpecs(
path=self.test_resources_path,
filename=filename
)
app_name = "mon_metrics_kafka_pre_hourly"
topic = "metrics_pre_hourly"
partition = 0
until_offset = random.randint(0, sys.maxsize)
from_offset = random.randint(0, sys.maxsize)
my_batch_time = self.get_dummy_batch_time()
json_offset_specs.add(topic=topic, partition=partition,
app_name=app_name,
from_offset=from_offset,
until_offset=until_offset,
batch_time_info=my_batch_time)
offset_specs.return_value = json_offset_specs
# Create an RDD out of the mocked instance usage data
with open(DataProvider.metrics_pre_hourly_data_path) as f:
raw_lines = f.read().splitlines()
@ -167,13 +200,14 @@ class TestPreHourlyProcessorAgg(SparkContextTest):
swift_disk_rate_agg_metric.get('metric')
.get('dimensions').get('aggregation_period'))
os.remove(file_path)
def simple_count_transform(self, rdd):
return rdd.count()
if __name__ == "__main__":
print("PATH *************************************************************")
import sys
print(sys.path)
print("PATH==============================================================")
unittest.main()

View File

@ -234,6 +234,36 @@ class TestJSONOffsetSpecs(unittest.TestCase):
offset_value_updated.get('until_offset'))
os.remove(file_path)
def test_get_most_recent_batch_time(self):
filename = '%s.json' % str(uuid.uuid4())
file_path = os.path.join(self.test_resources_path, filename)
json_offset_specs = JSONOffsetSpecs(
path=self.test_resources_path,
filename=filename
)
app_name = "mon_metrics_kafka"
topic_1 = str(uuid.uuid4())
partition_1 = 0
until_offset_1 = random.randint(0, sys.maxsize)
from_offset_1 = random.randint(0, sys.maxsize)
my_batch_time = self.get_dummy_batch_time()
json_offset_specs.add(topic=topic_1, partition=partition_1,
app_name=app_name,
from_offset=from_offset_1,
until_offset=until_offset_1,
batch_time_info=my_batch_time)
most_recent_batch_time = (
json_offset_specs.get_most_recent_batch_time_from_offsets(
app_name, topic_1))
self.assertEqual(most_recent_batch_time, my_batch_time)
os.remove(file_path)
def load_offset_file_as_json(self, file_path):
with open(file_path, 'r') as f:
json_file = json.load(f)

View File

@ -131,6 +131,27 @@ class TestMySQLOffsetSpecs(unittest.TestCase):
self.assertEqual(until_offset_2,
updated_offset_value.get_until_offset())
def test_get_most_recent_batch_time(self):
topic_1 = str(uuid.uuid4())
partition_1 = 0
until_offset_1 = random.randint(0, sys.maxsize)
from_offset_1 = random.randint(0, sys.maxsize)
app_name_1 = str(uuid.uuid4())
my_batch_time = self.get_dummy_batch_time()
self.kafka_offset_specs.add(topic=topic_1, partition=partition_1,
app_name=app_name_1,
from_offset=from_offset_1,
until_offset=until_offset_1,
batch_time_info=my_batch_time)
most_recent_batch_time = (
self.kafka_offset_specs.get_most_recent_batch_time_from_offsets(
app_name_1, topic_1))
self.assertEqual(most_recent_batch_time, my_batch_time)
def assertions_on_offset(self, used_value=None, offset_value=None):
self.assertEqual(used_value.get('topic'),
offset_value.get_topic())

View File

@ -19,6 +19,7 @@ enable_pre_hourly_processor = False
[pre_hourly_processor]
enable_instance_usage_df_cache = False
instance_usage_df_cache_storage_level = MEMORY_ONLY_SER_2
enable_batch_time_filtering = True
[service]
enable_record_store_df_cache = False

View File

@ -9,6 +9,7 @@ enable_pre_hourly_processor = False
[pre_hourly_processor]
enable_instance_usage_df_cache = False
instance_usage_df_cache_storage_level = MEMORY_ONLY_SER_2
enable_batch_time_filtering = True
[service]
enable_record_store_df_cache = False