Prevent multiple metrics in the same batch.
Change-Id: Iec9935c21d8b65bf79067d4a009859c898b75993
parent 424077c581
commit bf2e42b3e0
@@ -26,6 +26,7 @@ pre_hourly_processor_enabled = True
 [pre_hourly_processor]
 enable_instance_usage_df_cache = True
 instance_usage_df_cache_storage_level = MEMORY_ONLY_SER_2
+enable_batch_time_filtering = True

 #
 # Configurable values for the monasca-transform service

@@ -26,6 +26,7 @@ enable_pre_hourly_processor = True
 [pre_hourly_processor]
 enable_instance_usage_df_cache = True
 instance_usage_df_cache_storage_level = MEMORY_ONLY_SER_2
+enable_batch_time_filtering = True

 #
 # Configurable values for the monasca-transform service

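Note: the same flag is added to both sample configuration files. A deployment that needs the old behavior can switch it off in its own config; a hypothetical override, using the same ini format as above:

    [pre_hourly_processor]
    enable_batch_time_filtering = False
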
@@ -135,7 +135,8 @@ class ConfigInitializer(object):
     def load_pre_hourly_processor_options():
         app_opts = [
             cfg.BoolOpt('enable_instance_usage_df_cache'),
-            cfg.StrOpt('instance_usage_df_cache_storage_level')
+            cfg.StrOpt('instance_usage_df_cache_storage_level'),
+            cfg.BoolOpt('enable_batch_time_filtering')
         ]
         app_group = cfg.OptGroup(name='pre_hourly_processor',
                                  title='pre_hourly_processor')

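For context, a minimal sketch of how such an option is registered and read with oslo.config; the group and option names come from the diff above, while the registration calls are standard oslo.config usage and an assumption about the surrounding code, not part of this change:

    from oslo_config import cfg

    CONF = cfg.CONF

    app_opts = [
        cfg.BoolOpt('enable_instance_usage_df_cache'),
        cfg.StrOpt('instance_usage_df_cache_storage_level'),
        cfg.BoolOpt('enable_batch_time_filtering')
    ]
    app_group = cfg.OptGroup(name='pre_hourly_processor',
                             title='pre_hourly_processor')
    CONF.register_group(app_group)
    CONF.register_opts(app_opts, group=app_group)

    # once CONF has parsed a config file that sets the flag:
    if CONF.pre_hourly_processor.enable_batch_time_filtering:
        pass  # apply batch-time filtering
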
@@ -88,6 +88,22 @@ class MySQLOffsetSpecs(OffsetSpecs):
             MySQLOffsetSpec.app_name == app_name,
             MySQLOffsetSpec.revision == 1).all()}

+    def get_most_recent_batch_time_from_offsets(self, app_name, topic):
+        try:
+            # get partition 0 as a representative of all others
+            offset = self.session.query(MySQLOffsetSpec).filter(
+                MySQLOffsetSpec.app_name == app_name,
+                MySQLOffsetSpec.topic == topic,
+                MySQLOffsetSpec.partition == 0,
+                MySQLOffsetSpec.revision == 1).one()
+            most_recent_batch_time = datetime.datetime.strptime(
+                offset.get_batch_time(),
+                '%Y-%m-%d %H:%M:%S')
+        except Exception:
+            most_recent_batch_time = None
+
+        return most_recent_batch_time
+
     def delete_all_kafka_offsets(self, app_name):
         try:
             self.session.query(MySQLOffsetSpec).filter(

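The batch time is stored as a plain 'YYYY-MM-DD HH:MM:SS' string, and every partition of a batch carries the same value, which is why querying partition 0 alone is sufficient. A stand-alone sketch of the parsing step (the sample value is hypothetical):

    import datetime

    # hypothetical stored value, in the exact format used above
    batch_time_string = '2016-06-20 11:00:00'

    most_recent_batch_time = datetime.datetime.strptime(
        batch_time_string, '%Y-%m-%d %H:%M:%S')

    print(most_recent_batch_time)  # 2016-06-20 11:00:00
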
@@ -98,6 +98,13 @@ class OffsetSpecs(object):
             "Class %s doesn't implement delete_all_kafka_offsets()"
             % self.__class__.__name__)

+    @abc.abstractmethod
+    def get_most_recent_batch_time_from_offsets(self, app_name, topic):
+        raise NotImplementedError(
+            "Class %s doesn't implement "
+            "get_most_recent_batch_time_from_offsets()"
+            % self.__class__.__name__)
+

 class JSONOffsetSpecs(OffsetSpecs):

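Declaring the method as an @abc.abstractmethod that still raises NotImplementedError matches the pattern the base class already uses for delete_all_kafka_offsets(). A minimal sketch of that pattern with hypothetical class names (the real OffsetSpecs may declare its metaclass differently):

    import abc


    class Base(abc.ABC):
        @abc.abstractmethod
        def get_most_recent_batch_time_from_offsets(self, app_name, topic):
            raise NotImplementedError(
                "Class %s doesn't implement "
                "get_most_recent_batch_time_from_offsets()"
                % self.__class__.__name__)


    class Impl(Base):
        def get_most_recent_batch_time_from_offsets(self, app_name, topic):
            return None  # a real subclass looks the value up in its store


    Impl()    # fine: the abstract method is overridden
    # Base()  # would raise TypeError: can't instantiate abstract class
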
@@ -190,6 +197,19 @@ class JSONOffsetSpecs(OffsetSpecs):
     def get_kafka_offsets(self, app_name):
         return self._kafka_offsets

+    def get_most_recent_batch_time_from_offsets(self, app_name, topic):
+        try:
+            # get partition 0 as a representative of all others
+            key = "%s_%s_%s" % (app_name, topic, 0)
+            offset = self._kafka_offsets[key]
+            most_recent_batch_time = datetime.datetime.strptime(
+                offset.get_batch_time(),
+                '%Y-%m-%d %H:%M:%S')
+        except Exception:
+            most_recent_batch_time = None
+
+        return most_recent_batch_time
+
     def delete_all_kafka_offsets(self, app_name):
         log.info("Deleting json offsets file: %s", self.kafka_offset_spec_file)
         os.remove(self.kafka_offset_spec_file)

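The JSON-backed store keys offsets by "<app_name>_<topic>_<partition>", so the partition-0 lookup is a single dict access. A toy illustration of the key scheme (values are hypothetical):

    app_name = "mon_metrics_kafka_pre_hourly"
    topic = "metrics_pre_hourly"

    key = "%s_%s_%s" % (app_name, topic, 0)
    print(key)  # mon_metrics_kafka_pre_hourly_metrics_pre_hourly_0
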
@@ -244,7 +244,7 @@ class PreHourlyProcessor(Processor):
         available in kafka.
         """

-        offset_specifications = simport.load(cfg.CONF.repositories.offsets)()
+        offset_specifications = PreHourlyProcessor.get_offset_specs()

         # get application name, will be used to get offsets from database
         app_name = PreHourlyProcessor.get_app_name()

@@ -277,6 +277,12 @@ class PreHourlyProcessor(Processor):
             saved_offset_spec)
         return offset_range_list

+    @staticmethod
+    def get_offset_specs():
+        """get offset specifications.
+        """
+        return simport.load(cfg.CONF.repositories.offsets)()
+
     @staticmethod
     def fetch_pre_hourly_data(spark_context,
                               offset_range_list):

@@ -305,6 +311,45 @@ class PreHourlyProcessor(Processor):
         instance_usage_df = InstanceUsageUtils.create_df_from_json_rdd(
             sqlc, instance_usage_rdd)

+        if cfg.CONF.pre_hourly_processor.enable_batch_time_filtering:
+            instance_usage_df = (
+                PreHourlyProcessor.filter_out_records_not_in_current_batch(
+                    instance_usage_df))
+
         return instance_usage_df

+    @staticmethod
+    def filter_out_records_not_in_current_batch(instance_usage_df):
+        """Filter out any records which don't pertain to the
+        current batch (i.e., records before or after the
+        batch currently being processed).
+        """
+
+        # get the most recent batch time from the stored offsets
+        offset_specifications = PreHourlyProcessor.get_offset_specs()
+        app_name = PreHourlyProcessor.get_app_name()
+        topic = PreHourlyProcessor.get_kafka_topic()
+        most_recent_batch_time = (
+            offset_specifications.get_most_recent_batch_time_from_offsets(
+                app_name, topic))
+
+        if most_recent_batch_time:
+            # filter out records before current batch
+            instance_usage_df = instance_usage_df.filter(
+                instance_usage_df.lastrecord_timestamp_string >=
+                most_recent_batch_time)
+
+        # determine the timestamp of the most recent top-of-the-hour (which
+        # is the end of the current batch).
+        current_time = datetime.datetime.now()
+        truncated_timestamp_to_current_hour = current_time.replace(
+            minute=0, second=0, microsecond=0)
+
+        # filter out records after current batch
+        instance_usage_df = instance_usage_df.filter(
+            instance_usage_df.firstrecord_timestamp_string <
+            truncated_timestamp_to_current_hour)
+
+        return instance_usage_df
+
     @staticmethod

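The net effect is a half-open window per batch: records whose lastrecord_timestamp_string predates the most recent stored batch time were already rolled up in an earlier run, and records whose firstrecord_timestamp_string is at or past the current top-of-the-hour belong to the next batch. A plain-Python sketch of the same window logic (DataFrame-free; the field names mirror the diff, the sample records are hypothetical):

    import datetime


    def in_current_batch(record, most_recent_batch_time, now=None):
        """Return True if a record falls inside the current batch window."""
        now = now or datetime.datetime.now()
        # end of the current batch: the most recent top-of-the-hour
        batch_end = now.replace(minute=0, second=0, microsecond=0)

        if (most_recent_batch_time is not None and
                record['lastrecord_timestamp_string'] <
                most_recent_batch_time):
            return False  # before the current batch: already processed
        if record['firstrecord_timestamp_string'] >= batch_end:
            return False  # after the current batch: left for the next run
        return True


    def parse(s):
        return datetime.datetime.strptime(s, '%Y-%m-%d %H:%M:%S')


    # hypothetical records around an 11:00 batch boundary
    now = parse('2016-06-20 11:05:00')
    last_batch = parse('2016-06-20 10:00:00')
    rec = {'firstrecord_timestamp_string': parse('2016-06-20 10:12:00'),
           'lastrecord_timestamp_string': parse('2016-06-20 10:58:00')}

    print(in_current_batch(rec, last_batch, now))  # True
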
@@ -12,7 +12,11 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 import mock
+import os
+import random
+import sys
 import unittest
+import uuid

 from oslo_config import cfg
 from pyspark.streaming.kafka import OffsetRange

@@ -26,9 +30,13 @@ from tests.unit.spark_context_test import SparkContextTest
 from tests.unit.test_resources.metrics_pre_hourly_data.data_provider \
     import DataProvider

+from monasca_transform.offset_specs import JSONOffsetSpecs
+

 class TestPreHourlyProcessorAgg(SparkContextTest):

+    test_resources_path = 'tests/unit/test_resources'
+
     def setUp(self):
         super(TestPreHourlyProcessorAgg, self).setUp()
         # configure the system with a dummy messaging adapter

@@ -43,19 +51,44 @@ class TestPreHourlyProcessorAgg(SparkContextTest):

     @mock.patch('monasca_transform.processor.pre_hourly_processor.KafkaInsert',
                 DummyInsert)
+    @mock.patch('monasca_transform.processor.pre_hourly_processor.'
+                'PreHourlyProcessor.get_offset_specs')
     @mock.patch('monasca_transform.processor.pre_hourly_processor.'
                 'PreHourlyProcessor.fetch_pre_hourly_data')
     @mock.patch('monasca_transform.processor.pre_hourly_processor.'
                 'PreHourlyProcessor.get_processing_offset_range_list')
     def test_pre_hourly_processor(self,
                                   offset_range_list,
-                                  pre_hourly_data):
+                                  pre_hourly_data,
+                                  offset_specs):

         # load components
         myOffsetRanges = [
-            OffsetRange("metrics_pre_hourly", 1, 10, 20)]
+            OffsetRange("metrics_pre_hourly", 0, 10, 20)]
         offset_range_list.return_value = myOffsetRanges

+        filename = '%s.json' % str(uuid.uuid4())
+        file_path = os.path.join(self.test_resources_path, filename)
+        json_offset_specs = JSONOffsetSpecs(
+            path=self.test_resources_path,
+            filename=filename
+        )
+        app_name = "mon_metrics_kafka_pre_hourly"
+        topic = "metrics_pre_hourly"
+        partition = 0
+        until_offset = random.randint(0, sys.maxsize)
+        from_offset = random.randint(0, sys.maxsize)
+
+        my_batch_time = self.get_dummy_batch_time()
+
+        json_offset_specs.add(topic=topic, partition=partition,
+                              app_name=app_name,
+                              from_offset=from_offset,
+                              until_offset=until_offset,
+                              batch_time_info=my_batch_time)
+
+        offset_specs.return_value = json_offset_specs
+
         # Create an RDD out of the mocked instance usage data
         with open(DataProvider.metrics_pre_hourly_data_path) as f:
             raw_lines = f.read().splitlines()

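Note the mock argument order: mock.patch decorators apply bottom-up, so the decorator closest to the function provides the first mock argument, and the newly added get_offset_specs patch, sitting above the other two, arrives last. A tiny self-contained illustration with hypothetical targets:

    import mock


    class Thing(object):
        @staticmethod
        def a():
            pass

        @staticmethod
        def b():
            pass


    @mock.patch.object(Thing, 'b')  # outermost -> last mock argument
    @mock.patch.object(Thing, 'a')  # closest to the function -> first
    def check(mock_a, mock_b):
        assert mock_a is Thing.a
        assert mock_b is Thing.b


    check()
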
@@ -167,13 +200,14 @@ class TestPreHourlyProcessorAgg(SparkContextTest):
                          swift_disk_rate_agg_metric.get('metric')
                          .get('dimensions').get('aggregation_period'))

+        os.remove(file_path)

     def simple_count_transform(self, rdd):
         return rdd.count()


 if __name__ == "__main__":
     print("PATH *************************************************************")
     import sys
     print(sys.path)
     print("PATH==============================================================")
     unittest.main()

@@ -234,6 +234,36 @@ class TestJSONOffsetSpecs(unittest.TestCase):
                          offset_value_updated.get('until_offset'))
         os.remove(file_path)

+    def test_get_most_recent_batch_time(self):
+        filename = '%s.json' % str(uuid.uuid4())
+        file_path = os.path.join(self.test_resources_path, filename)
+        json_offset_specs = JSONOffsetSpecs(
+            path=self.test_resources_path,
+            filename=filename
+        )
+        app_name = "mon_metrics_kafka"
+
+        topic_1 = str(uuid.uuid4())
+        partition_1 = 0
+        until_offset_1 = random.randint(0, sys.maxsize)
+        from_offset_1 = random.randint(0, sys.maxsize)
+
+        my_batch_time = self.get_dummy_batch_time()
+
+        json_offset_specs.add(topic=topic_1, partition=partition_1,
+                              app_name=app_name,
+                              from_offset=from_offset_1,
+                              until_offset=until_offset_1,
+                              batch_time_info=my_batch_time)
+
+        most_recent_batch_time = (
+            json_offset_specs.get_most_recent_batch_time_from_offsets(
+                app_name, topic_1))
+
+        self.assertEqual(most_recent_batch_time, my_batch_time)
+
+        os.remove(file_path)
+
     def load_offset_file_as_json(self, file_path):
         with open(file_path, 'r') as f:
             json_file = json.load(f)

@@ -131,6 +131,27 @@ class TestMySQLOffsetSpecs(unittest.TestCase):
         self.assertEqual(until_offset_2,
                          updated_offset_value.get_until_offset())

+    def test_get_most_recent_batch_time(self):
+        topic_1 = str(uuid.uuid4())
+        partition_1 = 0
+        until_offset_1 = random.randint(0, sys.maxsize)
+        from_offset_1 = random.randint(0, sys.maxsize)
+        app_name_1 = str(uuid.uuid4())
+
+        my_batch_time = self.get_dummy_batch_time()
+
+        self.kafka_offset_specs.add(topic=topic_1, partition=partition_1,
+                                    app_name=app_name_1,
+                                    from_offset=from_offset_1,
+                                    until_offset=until_offset_1,
+                                    batch_time_info=my_batch_time)
+
+        most_recent_batch_time = (
+            self.kafka_offset_specs.get_most_recent_batch_time_from_offsets(
+                app_name_1, topic_1))
+
+        self.assertEqual(most_recent_batch_time, my_batch_time)
+
     def assertions_on_offset(self, used_value=None, offset_value=None):
         self.assertEqual(used_value.get('topic'),
                          offset_value.get_topic())

@@ -19,6 +19,7 @@ enable_pre_hourly_processor = False
 [pre_hourly_processor]
 enable_instance_usage_df_cache = False
 instance_usage_df_cache_storage_level = MEMORY_ONLY_SER_2
+enable_batch_time_filtering = True

 [service]
 enable_record_store_df_cache = False

@@ -9,6 +9,7 @@ enable_pre_hourly_processor = False
 [pre_hourly_processor]
 enable_instance_usage_df_cache = False
 instance_usage_df_cache_storage_level = MEMORY_ONLY_SER_2
+enable_batch_time_filtering = True

 [service]
 enable_record_store_df_cache = False