Modifications to include processing_meta in pre-hourly metrics.

Change-Id: I3464008cf8695864b75cbbbfd6570db5defa8cb5
Flint Calvin 2016-08-14 01:19:47 +00:00
parent 615e52d5cd
commit 0365bfa5bb
9 changed files with 109 additions and 26 deletions

View File

@@ -87,6 +87,10 @@ class InsertComponent(Component):
                      "lastrecord_timestamp_string":
                          instance_usage_dict.get(
                              "lastrecord_timestamp_string",
-                             Component.DEFAULT_UNAVAILABLE_VALUE)}
+                             Component.DEFAULT_UNAVAILABLE_VALUE),
+                     "processing_meta":
+                         instance_usage_dict.get(
+                             "processing_meta",
+                             Component.DEFAULT_UNAVAILABLE_VALUE)}
         metric_part = {"name": instance_usage_dict.get(
@@ -108,6 +112,11 @@ class InsertComponent(Component):
         """write data to kafka. extracts and formats
         metric data and writes the data to kafka
         """
+        try:
+            processing_meta = row.processing_meta
+        except AttributeError:
+            processing_meta = {}
+
         instance_usage_dict = {"tenant_id": row.tenant_id,
                                "user_id": row.user_id,
                                "resource_uuid": row.resource_uuid,
@@ -130,7 +139,8 @@ class InsertComponent(Component):
                                "usage_hour": row.usage_hour,
                                "usage_minute": row.usage_minute,
                                "aggregation_period":
-                                   row.aggregation_period}
+                                   row.aggregation_period,
+                               "processing_meta": processing_meta}
         metric = InsertComponent._prepare_metric(instance_usage_dict,
                                                  agg_params)
         return metric
@@ -141,8 +151,13 @@ class InsertComponent(Component):
         """write data to kafka. extracts and formats
         metric data and writes the data to kafka
         """
-        # add transform spec metric id to processing meta
-        processing_meta = {"metric_id": metric_id}
+        # retrieve the processing meta from the row
+        processing_meta = row.processing_meta
+        # add transform spec metric id to the processing meta
+        if processing_meta:
+            processing_meta["metric_id"] = metric_id
+        else:
+            processing_meta = {"metric_id": metric_id}
         instance_usage_dict = {"tenant_id": row.tenant_id,
                                "user_id": row.user_id,

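The pattern in the hunks above recurs throughout this change: read processing_meta off the Spark SQL Row if present, fall back to an empty dict, and tag it with the transform spec's metric_id before it is serialized into instance_usage_dict. A minimal, self-contained sketch of that fallback-and-merge logic, with a namedtuple standing in for the Row and hypothetical field values:

# Sketch of the processing_meta fallback-and-merge pattern above.
# A namedtuple stands in for the Spark SQL Row; values are hypothetical.
from collections import namedtuple

UsageRow = namedtuple("UsageRow", ["tenant_id", "quantity"])

def merged_processing_meta(row, metric_id):
    # Rows produced before this change carry no processing_meta field,
    # so attribute access has to be guarded.
    try:
        processing_meta = row.processing_meta
    except AttributeError:
        processing_meta = {}
    # Tag the metadata with the transform spec's metric id, creating
    # the dict when the row carried none.
    if processing_meta:
        processing_meta["metric_id"] = metric_id
    else:
        processing_meta = {"metric_id": metric_id}
    return processing_meta

row = UsageRow(tenant_id="all", quantity=10283.1)
print(merged_processing_meta(row, "mem_usable_all"))
# -> {'metric_id': 'mem_usable_all'}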
View File

@@ -22,29 +22,47 @@ from monasca_transform.transform.transform_utils import InstanceUsageUtils
 import json


+class PreHourlyCalculateRateException(Exception):
+    """Exception thrown when doing pre-hourly rate calculations
+
+    Attributes:
+    value: string representing the error
+    """
+
+    def __init__(self, value):
+        self.value = value
+
+    def __str__(self):
+        return repr(self.value)
+
+
 class PreHourlyCalculateRate(SetterComponent):

     @staticmethod
     def _calculate_rate(instance_usage_df):
         instance_usage_data_json_list = []

-        sorted_oldest_ascending_df = instance_usage_df.sort(
-            functions.asc("processing_meta.oldest_timestamp_string"))
+        try:
+            sorted_oldest_ascending_df = instance_usage_df.sort(
+                functions.asc("processing_meta.oldest_timestamp_string"))

-        sorted_latest_descending_df = instance_usage_df.sort(
-            functions.desc("processing_meta.latest_timestamp_string"))
+            sorted_latest_descending_df = instance_usage_df.sort(
+                functions.desc("processing_meta.latest_timestamp_string"))

-        # Calculate the rate change by percentage
-        oldest_dict = sorted_oldest_ascending_df.collect()[0].asDict()
-        oldest_quantity = float(oldest_dict[
-            "processing_meta"]['oldest_quantity'])
+            # Calculate the rate change by percentage
+            oldest_dict = sorted_oldest_ascending_df.collect()[0].asDict()
+            oldest_quantity = float(oldest_dict[
+                "processing_meta"]["oldest_quantity"])

-        latest_dict = sorted_latest_descending_df.collect()[0].asDict()
-        latest_quantity = float(latest_dict[
-            "processing_meta"]['latest_quantity'])
+            latest_dict = sorted_latest_descending_df.collect()[0].asDict()
+            latest_quantity = float(latest_dict[
+                "processing_meta"]["latest_quantity"])

-        rate_percentage = 100 * (
-            (latest_quantity - oldest_quantity) / oldest_quantity)
+            rate_percentage = 100 * (
+                (latest_quantity - oldest_quantity) / oldest_quantity)
+        except Exception as e:
+            raise PreHourlyCalculateRateException(
+                "Exception occurred in pre-hourly rate calculation. Error: %s"
+                % str(e))

         # create a new instance usage dict
         instance_usage_dict = {"tenant_id":

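The rate itself is the percentage change between the oldest and latest quantities stashed in processing_meta, and because those quantities round-trip through Kafka as JSON strings (see the quoted fixture values at the end of this diff), the float() coercion is load-bearing; any failure is rewrapped in the new component-specific exception. A standalone sketch of the computation, using the 4575.0/5291.0 values from the fixture data below:

# Standalone sketch of the pre-hourly rate computation above.
# The input dicts mirror processing_meta; the values come from the
# swiftlm.diskusage.rate_agg fixture rows later in this diff.

class PreHourlyCalculateRateException(Exception):
    def __init__(self, value):
        self.value = value

    def __str__(self):
        return repr(self.value)

def calculate_rate_percentage(oldest_meta, latest_meta):
    try:
        # Quantities are serialized as strings, hence the float() coercion.
        oldest_quantity = float(oldest_meta["oldest_quantity"])
        latest_quantity = float(latest_meta["latest_quantity"])
        return 100 * ((latest_quantity - oldest_quantity) / oldest_quantity)
    except Exception as e:
        # Rewrap so callers see a single component-specific error type.
        raise PreHourlyCalculateRateException(
            "Exception occurred in pre-hourly rate calculation. Error: %s"
            % str(e))

print(calculate_rate_percentage({"oldest_quantity": "4575.0"},
                                {"latest_quantity": "5291.0"}))
# -> 15.650273224043715, matching the fixture's quantity of 15.6502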
View File

@@ -104,6 +104,11 @@ class RollupQuantity(SetterComponent):
             select_quant_str = "".join((setter_rollup_operation, "(quantity)"))
             quantity = getattr(row, select_quant_str, 0.0)

+            try:
+                processing_meta = row.processing_meta
+            except AttributeError:
+                processing_meta = {}
+
             # create a new instance usage dict
             instance_usage_dict = {"tenant_id": getattr(row, "tenant_id",
                                                         "all"),
@@ -147,7 +152,8 @@ class RollupQuantity(SetterComponent):
                                        getattr(row, "usage_minute", "all"),
                                    "aggregation_period":
                                        getattr(row, "aggregation_period",
-                                               "all")
+                                               "all"),
+                                   "processing_meta": processing_meta
                                    }
             instance_usage_data_json = json.dumps(instance_usage_dict)

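One subtlety above: after a Spark groupBy().agg(), the aggregated column is named after the operation, e.g. sum(quantity), which is not a valid Python identifier, so RollupQuantity assembles the name as a string and reads it with getattr, defaulting to 0.0. A small illustration of that dynamic lookup, with a plain object standing in for the grouped Row and hypothetical values:

# Sketch of the dynamic aggregated-column lookup used above. Spark names
# the result of an aggregation "<op>(<column>)", which cannot be accessed
# as a normal attribute, hence getattr with a built-up string.

class GroupedRow(object):
    """Hypothetical stand-in for a grouped Spark SQL Row."""

row = GroupedRow()
setattr(row, "sum(quantity)", 10283.1)

setter_rollup_operation = "sum"  # would come from the transform spec
select_quant_str = "".join((setter_rollup_operation, "(quantity)"))
quantity = getattr(row, select_quant_str, 0.0)  # falls back to 0.0
print(quantity)  # -> 10283.1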
View File

@@ -33,6 +33,11 @@ class SetAggregatedMetricName(SetterComponent):
         agg_params = instance_usage_agg_params.agg_params

+        try:
+            processing_meta = row.processing_meta
+        except AttributeError:
+            processing_meta = {}
+
         instance_usage_dict = {"tenant_id": row.tenant_id,
                                "user_id": row.user_id,
                                "resource_uuid": row.resource_uuid,
@@ -58,7 +63,8 @@ class SetAggregatedMetricName(SetterComponent):
                                "usage_date": row.usage_date,
                                "usage_hour": row.usage_hour,
                                "usage_minute": row.usage_minute,
-                               "aggregation_period": row.aggregation_period}
+                               "aggregation_period": row.aggregation_period,
+                               "processing_meta": processing_meta}
         instance_usage_data_json = json.dumps(instance_usage_dict)

View File

@@ -33,6 +33,11 @@ class SetAggregatedPeriod(SetterComponent):
         agg_params = instance_usage_agg_params.agg_params

+        try:
+            processing_meta = row.processing_meta
+        except AttributeError:
+            processing_meta = {}
+
         instance_usage_dict = {"tenant_id": row.tenant_id,
                                "user_id": row.user_id,
                                "resource_uuid": row.resource_uuid,
@@ -59,7 +64,8 @@ class SetAggregatedPeriod(SetterComponent):
                                "usage_hour": row.usage_hour,
                                "usage_minute": row.usage_minute,
                                "aggregation_period":
-                                   agg_params["aggregation_period"]}
+                                   agg_params["aggregation_period"],
+                               "processing_meta": processing_meta}
         instance_usage_data_json = json.dumps(instance_usage_dict)

View File

@@ -87,10 +87,10 @@ class CalculateRate(UsageComponent):
         # Calculate the rate change by percentage
         oldest_dict = oldest_rolled_up_instance_usage_df.collect()[0].asDict()
-        oldest_quantity = oldest_dict['quantity']
+        oldest_quantity = float(oldest_dict['quantity'])

         latest_dict = latest_rolled_up_instance_usage_df.collect()[0].asDict()
-        latest_quantity = latest_dict['quantity']
+        latest_quantity = float(latest_dict['quantity'])

         rate_percentage = \
             ((latest_quantity - oldest_quantity) / oldest_quantity) * 100

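The float() coercions added above appear to be defensive: if a quantity ever arrives as a JSON string (as the processing_meta quantities in the fixture below do), subtracting raw strings would raise a TypeError before the rate could be computed. A brief illustration with hypothetical string inputs:

# Why the float() coercion above matters: quantities deserialized from
# JSON may be strings, and str operands cannot be subtracted.
oldest_quantity, latest_quantity = "4575.0", "5291.0"
# latest_quantity - oldest_quantity   # would raise TypeError
rate = ((float(latest_quantity) - float(oldest_quantity))
        / float(oldest_quantity)) * 100
print(round(rate, 4))  # -> 15.6503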
View File

@@ -31,6 +31,10 @@ from monasca_common.simport import simport
 from oslo_config import cfg
 import time

+from monasca_transform.component.usage.fetch_quantity import \
+    FetchQuantityException
+from monasca_transform.component.usage.fetch_quantity_util import \
+    FetchQuantityUtilException
 from monasca_transform.config.config_initializer import ConfigInitializer
 from monasca_transform.transform.builder.generic_transform_builder \
     import GenericTransformBuilder
@@ -278,8 +282,17 @@ class MonMetricsKafkaProcessor(object):
             transform_context_info=transform_context,
             transform_spec_df_info=transform_spec_df)

-        MonMetricsKafkaProcessor.process_metric(
-            transform_context, source_record_store_df)
+        try:
+            MonMetricsKafkaProcessor.process_metric(
+                transform_context, source_record_store_df)
+        except FetchQuantityException:
+            raise
+        except FetchQuantityUtilException:
+            raise
+        except Exception as e:
+            MonMetricsKafkaProcessor.log_debug(
+                "Exception raised in metric processing for metric: " +
+                str(metric_id) + ". Error: " + str(e))

     @staticmethod
     def rdd_to_recordstore(rdd_transform_context_rdd):

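The driver change above isolates failures per metric: the two known fetch-quantity exception types still propagate, while any other error is logged at debug level so a single bad transform spec cannot abort the whole batch. A condensed sketch of that control flow; the exception names match the imports added above, but process_metric, log_debug, and the metric ids here are hypothetical stand-ins:

# Condensed sketch of the per-metric exception isolation above.

class FetchQuantityException(Exception): pass
class FetchQuantityUtilException(Exception): pass

def log_debug(msg):
    print(msg)

def process_metric(metric_id):
    if metric_id == "bad_spec":
        raise ValueError("malformed transform spec")

for metric_id in ("mem_usable_all", "bad_spec", "swift_usage_rate"):
    try:
        process_metric(metric_id)
    except (FetchQuantityException, FetchQuantityUtilException):
        # known usage-component errors should still fail loudly
        raise
    except Exception as e:
        # any other failure is logged and the next metric still runs
        log_debug("Exception raised in metric processing for metric: " +
                  str(metric_id) + ". Error: " + str(e))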
View File

@@ -1187,6 +1187,25 @@ class SparkTest(SparkContextTest):
                          .get('value_meta')
                          .get('lastrecord_timestamp_string'))
+
+        self.assertEqual('3363.4285714285716',
+                         diskusage_rate_agg_metric.get('metric')
+                         .get('value_meta')
+                         .get('processing_meta').get('latest_quantity'))
+        self.assertEqual('2016-06-10 20:27:01',
+                         diskusage_rate_agg_metric.get('metric')
+                         .get('value_meta')
+                         .get('processing_meta')
+                         .get('latest_timestamp_string'))
+        self.assertEqual('2721.0',
+                         diskusage_rate_agg_metric.get('metric')
+                         .get('value_meta')
+                         .get('processing_meta').get('oldest_quantity'))
+        self.assertEqual('2016-06-10 20:27:01',
+                         diskusage_rate_agg_metric.get('metric')
+                         .get('value_meta')
+                         .get('processing_meta')
+                         .get('oldest_timestamp_string'))

         # Verify nova.vm.cpu.total_allocated_agg metrics
         nova_vm_cpu_total_alloc_agg_metric = [
             value for value in metrics

View File

@@ -4,6 +4,6 @@
 ('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"mem.usable_mb_agg","record_count":20.0,"user_id":"all","zone":"all","firstrecord_timestamp_string":"2016-06-20 11:24:59","tenant_id":"all","region":"all","usage_hour":"11","usage_date":"2016-06-20","processing_meta":{"metric_id":"mem_usable_all"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp_string":"2016-06-20 11:29:44","firstrecord_timestamp_unix":1466421899.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1466422184.0,"quantity":10283.1})
 ('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"mem.usable_mb_agg","record_count":20.0,"user_id":"all","zone":"all","firstrecord_timestamp_string":"2016-06-20 11:29:44","tenant_id":"all","region":"all","usage_hour":"11","usage_date":"2016-06-20","processing_meta":{"metric_id":"mem_usable_all"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp_string":"2016-06-20 11:39:44","firstrecord_timestamp_unix":1466421899.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1466422784.0,"quantity":10283.1})
 ('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"mem.usable_mb_agg","record_count":20.0,"user_id":"all","zone":"all","firstrecord_timestamp_string":"2016-06-20 11:39:44","tenant_id":"all","region":"all","usage_hour":"11","usage_date":"2016-06-20","processing_meta":{"metric_id":"mem_usable_all"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp_string":"2016-06-20 11:49:44","firstrecord_timestamp_unix":1466421899.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1466423384.0,"quantity":10283.1})
-('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"swiftlm.diskusage.rate_agg","record_count":34.0,"user_id":"all","zone":"all","firstrecord_timestamp_string":"2016-06-10 20:27:01","tenant_id":"all","region":"all","usage_hour":"20","usage_date":"2016-06-10","processing_meta":{"oldest_quantity": 4575.0, "latest_timestamp_string": "2016-06-10 20:27:02", "latest_quantity": 5291.0, "metric_id":"swift_usage_rate", "oldest_timestamp_string": "2016-06-10 20:27:01"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp_string":"2016-06-10 20:27:02","firstrecord_timestamp_unix":1465590421.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1465590422.0,"quantity":15.6502})
-('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"swiftlm.diskusage.rate_agg","record_count":34.0,"user_id":"all","zone":"all","firstrecord_timestamp_string":"2016-06-10 20:37:01","tenant_id":"all","region":"all","usage_hour":"20","usage_date":"2016-06-10","processing_meta":{"oldest_quantity": 5575.0, "latest_timestamp_string": "2016-06-10 20:37:02", "latest_quantity": 6291.0, "metric_id":"swift_usage_rate", "oldest_timestamp_string": "2016-06-10 20:37:01"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp_string":"2016-06-10 20:27:02","firstrecord_timestamp_unix":1465590421.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1465590422.0,"quantity":16.6502})
-('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"swiftlm.diskusage.rate_agg","record_count":34.0,"user_id":"all","zone":"all","firstrecord_timestamp_string":"2016-06-10 20:47:01","tenant_id":"all","region":"all","usage_hour":"20","usage_date":"2016-06-10","processing_meta":{"oldest_quantity": 6575.0, "latest_timestamp_string": "2016-06-10 20:47:02", "latest_quantity": 7291.0, "metric_id":"swift_usage_rate", "oldest_timestamp_string": "2016-06-10 20:47:01"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp_string":"2016-06-10 20:27:02","firstrecord_timestamp_unix":1465590421.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1465590422.0,"quantity":17.6502})
+('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"swiftlm.diskusage.rate_agg","record_count":34.0,"user_id":"all","zone":"all","firstrecord_timestamp_string":"2016-06-10 20:27:01","tenant_id":"all","region":"all","usage_hour":"20","usage_date":"2016-06-10","processing_meta":{"oldest_quantity": "4575.0", "latest_timestamp_string": "2016-06-10 20:27:02", "latest_quantity": "5291.0", "metric_id":"swift_usage_rate", "oldest_timestamp_string": "2016-06-10 20:27:01"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp_string":"2016-06-10 20:27:02","firstrecord_timestamp_unix":1465590421.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1465590422.0,"quantity":15.6502})
+('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"swiftlm.diskusage.rate_agg","record_count":34.0,"user_id":"all","zone":"all","firstrecord_timestamp_string":"2016-06-10 20:37:01","tenant_id":"all","region":"all","usage_hour":"20","usage_date":"2016-06-10","processing_meta":{"oldest_quantity": "5575.0", "latest_timestamp_string": "2016-06-10 20:37:02", "latest_quantity": "6291.0", "metric_id":"swift_usage_rate", "oldest_timestamp_string": "2016-06-10 20:37:01"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp_string":"2016-06-10 20:27:02","firstrecord_timestamp_unix":1465590421.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1465590422.0,"quantity":16.6502})
+('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"swiftlm.diskusage.rate_agg","record_count":34.0,"user_id":"all","zone":"all","firstrecord_timestamp_string":"2016-06-10 20:47:01","tenant_id":"all","region":"all","usage_hour":"20","usage_date":"2016-06-10","processing_meta":{"oldest_quantity": "6575.0", "latest_timestamp_string": "2016-06-10 20:47:02", "latest_quantity": "7291.0", "metric_id":"swift_usage_rate", "oldest_timestamp_string": "2016-06-10 20:47:01"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp_string":"2016-06-10 20:27:02","firstrecord_timestamp_unix":1465590421.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1465590422.0,"quantity":17.6502})