Prevent duplicate metrics from being published for the same batch.

The pre-hourly processor can now filter out instance usage records that
fall outside the current batch window: records older than the most recent
batch time recorded with the Kafka offsets, and records newer than the most
recent top of the hour. The behavior is controlled by the new
[pre_hourly_processor] enable_batch_time_filtering option.
Change-Id: Iec9935c21d8b65bf79067d4a009859c898b75993
commit bf2e42b3e0
parent 424077c581
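Note: the change closes a window where the pre-hourly processor could pick up
instance-usage records belonging to an earlier or later batch and publish
duplicate metrics. A minimal sketch of the batch window the new filtering
enforces, simplified to a single timestamp per record (the real code in the
hunks below compares each row's firstrecord/lastrecord timestamp strings):

    import datetime

    def in_current_batch(t, most_recent_batch_time):
        # keep a record when most_recent_batch_time <= t < top of this hour
        top_of_hour = datetime.datetime.now().replace(
            minute=0, second=0, microsecond=0)
        after_start = (most_recent_batch_time is None
                       or t >= most_recent_batch_time)
        return after_start and t < top_of_hour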
@@ -26,6 +26,7 @@ pre_hourly_processor_enabled = True
 [pre_hourly_processor]
 enable_instance_usage_df_cache = True
 instance_usage_df_cache_storage_level = MEMORY_ONLY_SER_2
+enable_batch_time_filtering = True
 
 #
 # Configurable values for the monasca-transform service
@@ -26,6 +26,7 @@ enable_pre_hourly_processor = True
 [pre_hourly_processor]
 enable_instance_usage_df_cache = True
 instance_usage_df_cache_storage_level = MEMORY_ONLY_SER_2
+enable_batch_time_filtering = True
 
 #
 # Configurable values for the monasca-transform service
@@ -135,7 +135,8 @@ class ConfigInitializer(object):
     def load_pre_hourly_processor_options():
         app_opts = [
            cfg.BoolOpt('enable_instance_usage_df_cache'),
-           cfg.StrOpt('instance_usage_df_cache_storage_level')
+           cfg.StrOpt('instance_usage_df_cache_storage_level'),
+           cfg.BoolOpt('enable_batch_time_filtering')
         ]
         app_group = cfg.OptGroup(name='pre_hourly_processor',
                                  title='pre_hourly_processor')
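For reference, a self-contained sketch of how the new flag surfaces through
oslo.config once load_pre_hourly_processor_options() has run. The diff
registers cfg.BoolOpt('enable_batch_time_filtering') with no default (so it
is None until a conf file sets it); the default=False below is only to make
the standalone example deterministic:

    from oslo_config import cfg

    conf = cfg.ConfigOpts()
    group = cfg.OptGroup(name='pre_hourly_processor',
                         title='pre_hourly_processor')
    conf.register_group(group)
    conf.register_opts(
        [cfg.BoolOpt('enable_batch_time_filtering', default=False)],
        group=group)
    conf(args=[])  # the service would also load its .conf files here

    print(conf.pre_hourly_processor.enable_batch_time_filtering)  # False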
@@ -88,6 +88,22 @@ class MySQLOffsetSpecs(OffsetSpecs):
             MySQLOffsetSpec.app_name == app_name,
             MySQLOffsetSpec.revision == 1).all()}
 
+    def get_most_recent_batch_time_from_offsets(self, app_name, topic):
+        try:
+            # get partition 0 as a representative of all others
+            offset = self.session.query(MySQLOffsetSpec).filter(
+                MySQLOffsetSpec.app_name == app_name,
+                MySQLOffsetSpec.topic == topic,
+                MySQLOffsetSpec.partition == 0,
+                MySQLOffsetSpec.revision == 1).one()
+            most_recent_batch_time = datetime.datetime.strptime(
+                offset.get_batch_time(),
+                '%Y-%m-%d %H:%M:%S')
+        except Exception:
+            most_recent_batch_time = None
+
+        return most_recent_batch_time
+
     def delete_all_kafka_offsets(self, app_name):
         try:
             self.session.query(MySQLOffsetSpec).filter(
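The batch time is stored alongside each offset as a string, so the lookup
above parses it back with a fixed format. A quick round-trip with a
hypothetical stored value:

    import datetime

    stored = '2016-06-20 11:00:00'  # hypothetical get_batch_time() result
    parsed = datetime.datetime.strptime(stored, '%Y-%m-%d %H:%M:%S')
    assert parsed.strftime('%Y-%m-%d %H:%M:%S') == stored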
@@ -98,6 +98,13 @@ class OffsetSpecs(object):
             "Class %s doesn't implement delete_all_kafka_offsets()"
             % self.__class__.__name__)
 
+    @abc.abstractmethod
+    def get_most_recent_batch_time_from_offsets(self, app_name, topic):
+        raise NotImplementedError(
+            "Class %s doesn't implement "
+            "get_most_recent_batch_time_from_offsets()"
+            % self.__class__.__name__)
+
 
 class JSONOffsetSpecs(OffsetSpecs):
 
@@ -190,6 +197,19 @@ class JSONOffsetSpecs(OffsetSpecs):
     def get_kafka_offsets(self, app_name):
         return self._kafka_offsets
 
+    def get_most_recent_batch_time_from_offsets(self, app_name, topic):
+        try:
+            # get partition 0 as a representative of all others
+            key = "%s_%s_%s" % (app_name, topic, 0)
+            offset = self._kafka_offsets[key]
+            most_recent_batch_time = datetime.datetime.strptime(
+                offset.get_batch_time(),
+                '%Y-%m-%d %H:%M:%S')
+        except Exception:
+            most_recent_batch_time = None
+
+        return most_recent_batch_time
+
     def delete_all_kafka_offsets(self, app_name):
         log.info("Deleting json offsets file: %s", self.kafka_offset_spec_file)
         os.remove(self.kafka_offset_spec_file)
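The JSON-backed store keys offsets by app name, topic, and partition, with
partition 0 standing in for all partitions. The key format, shown with the
app name and topic used by the tests further down:

    app_name = 'mon_metrics_kafka_pre_hourly'
    topic = 'metrics_pre_hourly'
    key = '%s_%s_%s' % (app_name, topic, 0)
    assert key == 'mon_metrics_kafka_pre_hourly_metrics_pre_hourly_0'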
@@ -244,7 +244,7 @@ class PreHourlyProcessor(Processor):
         available in kafka.
         """
 
-        offset_specifications = simport.load(cfg.CONF.repositories.offsets)()
+        offset_specifications = PreHourlyProcessor.get_offset_specs()
 
         # get application name, will be used to get offsets from database
         app_name = PreHourlyProcessor.get_app_name()
@@ -277,6 +277,12 @@ class PreHourlyProcessor(Processor):
             saved_offset_spec)
         return offset_range_list
 
+    @staticmethod
+    def get_offset_specs():
+        """get offset specifications.
+        """
+        return simport.load(cfg.CONF.repositories.offsets)()
+
     @staticmethod
     def fetch_pre_hourly_data(spark_context,
                               offset_range_list):
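Hiding the simport call behind get_offset_specs() gives tests a single seam
to patch. A sketch, assuming monasca-transform is importable (it mirrors the
patch target used by the unit test below):

    import mock

    fake_specs = object()  # stand-in for a JSONOffsetSpecs instance
    with mock.patch('monasca_transform.processor.pre_hourly_processor.'
                    'PreHourlyProcessor.get_offset_specs',
                    return_value=fake_specs):
        pass  # code under test now receives fake_specs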
@@ -305,6 +311,45 @@ class PreHourlyProcessor(Processor):
         instance_usage_df = InstanceUsageUtils.create_df_from_json_rdd(
             sqlc, instance_usage_rdd)
 
+        if cfg.CONF.pre_hourly_processor.enable_batch_time_filtering:
+            instance_usage_df = (
+                PreHourlyProcessor.filter_out_records_not_in_current_batch(
+                    instance_usage_df))
+
+        return instance_usage_df
+
+    @staticmethod
+    def filter_out_records_not_in_current_batch(instance_usage_df):
+        """Filter out any records which don't pertain to the
+        current batch (i.e., records before or after the
+        batch currently being processed).
+        """
+        # get the most recent batch time from the stored offsets
+
+        offset_specifications = PreHourlyProcessor.get_offset_specs()
+        app_name = PreHourlyProcessor.get_app_name()
+        topic = PreHourlyProcessor.get_kafka_topic()
+        most_recent_batch_time = (
+            offset_specifications.get_most_recent_batch_time_from_offsets(
+                app_name, topic))
+
+        if most_recent_batch_time:
+            # filter out records before current batch
+            instance_usage_df = instance_usage_df.filter(
+                instance_usage_df.lastrecord_timestamp_string >=
+                most_recent_batch_time)
+
+        # determine the timestamp of the most recent top-of-the-hour (which
+        # is the end of the current batch).
+        current_time = datetime.datetime.now()
+        truncated_timestamp_to_current_hour = current_time.replace(
+            minute=0, second=0, microsecond=0)
+
+        # filter out records after current batch
+        instance_usage_df = instance_usage_df.filter(
+            instance_usage_df.firstrecord_timestamp_string <
+            truncated_timestamp_to_current_hour)
+
         return instance_usage_df
 
     @staticmethod
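To make the two filters concrete, a toy run against a local DataFrame
(PySpark 1.x style, matching the SQLContext use above). Timestamps in
'%Y-%m-%d %H:%M:%S' form compare correctly as strings, which is why the
column comparisons work:

    from pyspark import SparkContext
    from pyspark.sql import Row, SQLContext

    sc = SparkContext('local[1]', 'batch-window-sketch')
    sqlc = SQLContext(sc)
    df = sqlc.createDataFrame([
        Row(firstrecord_timestamp_string='2016-06-20 10:05:00',
            lastrecord_timestamp_string='2016-06-20 10:55:00'),
        Row(firstrecord_timestamp_string='2016-06-20 08:05:00',
            lastrecord_timestamp_string='2016-06-20 08:55:00')])

    most_recent_batch_time = '2016-06-20 10:00:00'
    top_of_current_hour = '2016-06-20 11:00:00'

    df = df.filter(df.lastrecord_timestamp_string >= most_recent_batch_time)
    df = df.filter(df.firstrecord_timestamp_string < top_of_current_hour)
    print(df.count())  # 1: only the 10:05-10:55 record falls in the window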
@@ -12,7 +12,11 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 import mock
+import os
+import random
+import sys
 import unittest
+import uuid
 
 from oslo_config import cfg
 from pyspark.streaming.kafka import OffsetRange
@@ -26,9 +30,13 @@ from tests.unit.spark_context_test import SparkContextTest
 from tests.unit.test_resources.metrics_pre_hourly_data.data_provider \
     import DataProvider
 
+from monasca_transform.offset_specs import JSONOffsetSpecs
+
 
 class TestPreHourlyProcessorAgg(SparkContextTest):
 
+    test_resources_path = 'tests/unit/test_resources'
+
     def setUp(self):
         super(TestPreHourlyProcessorAgg, self).setUp()
         # configure the system with a dummy messaging adapter
@@ -43,19 +51,44 @@ class TestPreHourlyProcessorAgg(SparkContextTest):
 
     @mock.patch('monasca_transform.processor.pre_hourly_processor.KafkaInsert',
                 DummyInsert)
+    @mock.patch('monasca_transform.processor.pre_hourly_processor.'
+                'PreHourlyProcessor.get_offset_specs')
     @mock.patch('monasca_transform.processor.pre_hourly_processor.'
                 'PreHourlyProcessor.fetch_pre_hourly_data')
     @mock.patch('monasca_transform.processor.pre_hourly_processor.'
                 'PreHourlyProcessor.get_processing_offset_range_list')
     def test_pre_hourly_processor(self,
                                   offset_range_list,
-                                  pre_hourly_data):
+                                  pre_hourly_data,
+                                  offset_specs):
 
         # load components
         myOffsetRanges = [
-            OffsetRange("metrics_pre_hourly", 1, 10, 20)]
+            OffsetRange("metrics_pre_hourly", 0, 10, 20)]
         offset_range_list.return_value = myOffsetRanges
 
+        filename = '%s.json' % str(uuid.uuid4())
+        file_path = os.path.join(self.test_resources_path, filename)
+        json_offset_specs = JSONOffsetSpecs(
+            path=self.test_resources_path,
+            filename=filename
+        )
+        app_name = "mon_metrics_kafka_pre_hourly"
+        topic = "metrics_pre_hourly"
+        partition = 0
+        until_offset = random.randint(0, sys.maxsize)
+        from_offset = random.randint(0, sys.maxsize)
+
+        my_batch_time = self.get_dummy_batch_time()
+
+        json_offset_specs.add(topic=topic, partition=partition,
+                              app_name=app_name,
+                              from_offset=from_offset,
+                              until_offset=until_offset,
+                              batch_time_info=my_batch_time)
+
+        offset_specs.return_value = json_offset_specs
+
         # Create an RDD out of the mocked instance usage data
         with open(DataProvider.metrics_pre_hourly_data_path) as f:
             raw_lines = f.read().splitlines()
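Note the partition fix folded into this test: the mocked OffsetRange used
partition 1, but the new batch-time lookup reads partition 0 as the
representative partition, so the seeded offset and the mocked range must
agree. For reference, OffsetRange's argument order in pyspark.streaming.kafka:

    from pyspark.streaming.kafka import OffsetRange

    # topic, partition, fromOffset, untilOffset
    my_range = OffsetRange('metrics_pre_hourly', 0, 10, 20)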
@@ -167,13 +200,14 @@ class TestPreHourlyProcessorAgg(SparkContextTest):
                          swift_disk_rate_agg_metric.get('metric')
                          .get('dimensions').get('aggregation_period'))
 
+        os.remove(file_path)
+
     def simple_count_transform(self, rdd):
         return rdd.count()
 
 
 if __name__ == "__main__":
     print("PATH *************************************************************")
-    import sys
     print(sys.path)
     print("PATH==============================================================")
     unittest.main()
@@ -234,6 +234,36 @@ class TestJSONOffsetSpecs(unittest.TestCase):
             offset_value_updated.get('until_offset'))
         os.remove(file_path)
 
+    def test_get_most_recent_batch_time(self):
+        filename = '%s.json' % str(uuid.uuid4())
+        file_path = os.path.join(self.test_resources_path, filename)
+        json_offset_specs = JSONOffsetSpecs(
+            path=self.test_resources_path,
+            filename=filename
+        )
+        app_name = "mon_metrics_kafka"
+
+        topic_1 = str(uuid.uuid4())
+        partition_1 = 0
+        until_offset_1 = random.randint(0, sys.maxsize)
+        from_offset_1 = random.randint(0, sys.maxsize)
+
+        my_batch_time = self.get_dummy_batch_time()
+
+        json_offset_specs.add(topic=topic_1, partition=partition_1,
+                              app_name=app_name,
+                              from_offset=from_offset_1,
+                              until_offset=until_offset_1,
+                              batch_time_info=my_batch_time)
+
+        most_recent_batch_time = (
+            json_offset_specs.get_most_recent_batch_time_from_offsets(
+                app_name, topic_1))
+
+        self.assertEqual(most_recent_batch_time, my_batch_time)
+
+        os.remove(file_path)
+
     def load_offset_file_as_json(self, file_path):
         with open(file_path, 'r') as f:
             json_file = json.load(f)
@@ -131,6 +131,27 @@ class TestMySQLOffsetSpecs(unittest.TestCase):
         self.assertEqual(until_offset_2,
                          updated_offset_value.get_until_offset())
 
+    def test_get_most_recent_batch_time(self):
+        topic_1 = str(uuid.uuid4())
+        partition_1 = 0
+        until_offset_1 = random.randint(0, sys.maxsize)
+        from_offset_1 = random.randint(0, sys.maxsize)
+        app_name_1 = str(uuid.uuid4())
+
+        my_batch_time = self.get_dummy_batch_time()
+
+        self.kafka_offset_specs.add(topic=topic_1, partition=partition_1,
+                                    app_name=app_name_1,
+                                    from_offset=from_offset_1,
+                                    until_offset=until_offset_1,
+                                    batch_time_info=my_batch_time)
+
+        most_recent_batch_time = (
+            self.kafka_offset_specs.get_most_recent_batch_time_from_offsets(
+                app_name_1, topic_1))
+
+        self.assertEqual(most_recent_batch_time, my_batch_time)
+
     def assertions_on_offset(self, used_value=None, offset_value=None):
         self.assertEqual(used_value.get('topic'),
                          offset_value.get_topic())
@@ -19,6 +19,7 @@ enable_pre_hourly_processor = False
 [pre_hourly_processor]
 enable_instance_usage_df_cache = False
 instance_usage_df_cache_storage_level = MEMORY_ONLY_SER_2
+enable_batch_time_filtering = True
 
 [service]
 enable_record_store_df_cache = False
@@ -9,6 +9,7 @@ enable_pre_hourly_processor = False
 [pre_hourly_processor]
 enable_instance_usage_df_cache = False
 instance_usage_df_cache_storage_level = MEMORY_ONLY_SER_2
+enable_batch_time_filtering = True
 
 [service]
 enable_record_store_df_cache = False