Modifications to include processing_meta in pre-hourly metrics.
Change-Id: I3464008cf8695864b75cbbbfd6570db5defa8cb5
This commit is contained in:
parent
615e52d5cd
commit
0365bfa5bb
|
@ -87,6 +87,10 @@ class InsertComponent(Component):
|
|||
"lastrecord_timestamp_string":
|
||||
instance_usage_dict.get(
|
||||
"lastrecord_timestamp_string",
|
||||
Component.DEFAULT_UNAVAILABLE_VALUE),
|
||||
"processing_meta":
|
||||
instance_usage_dict.get(
|
||||
"processing_meta",
|
||||
Component.DEFAULT_UNAVAILABLE_VALUE)}
|
||||
|
||||
metric_part = {"name": instance_usage_dict.get(
|
||||
|
@ -108,6 +112,11 @@ class InsertComponent(Component):
|
|||
"""write data to kafka. extracts and formats
|
||||
metric data and write s the data to kafka
|
||||
"""
|
||||
try:
|
||||
processing_meta = row.processing_meta
|
||||
except AttributeError:
|
||||
processing_meta = {}
|
||||
|
||||
instance_usage_dict = {"tenant_id": row.tenant_id,
|
||||
"user_id": row.user_id,
|
||||
"resource_uuid": row.resource_uuid,
|
||||
|
@ -130,7 +139,8 @@ class InsertComponent(Component):
|
|||
"usage_hour": row.usage_hour,
|
||||
"usage_minute": row.usage_minute,
|
||||
"aggregation_period":
|
||||
row.aggregation_period}
|
||||
row.aggregation_period,
|
||||
"processing_meta": processing_meta}
|
||||
metric = InsertComponent._prepare_metric(instance_usage_dict,
|
||||
agg_params)
|
||||
return metric
|
||||
|
@ -141,7 +151,12 @@ class InsertComponent(Component):
|
|||
"""write data to kafka. extracts and formats
|
||||
metric data and writes the data to kafka
|
||||
"""
|
||||
# add transform spec metric id to processing meta
|
||||
# retrieve the processing meta from the row
|
||||
processing_meta = row.processing_meta
|
||||
# add transform spec metric id to the processing meta
|
||||
if processing_meta:
|
||||
processing_meta["metric_id"] = metric_id
|
||||
else:
|
||||
processing_meta = {"metric_id": metric_id}
|
||||
|
||||
instance_usage_dict = {"tenant_id": row.tenant_id,
|
||||
|
|
|
@ -22,12 +22,26 @@ from monasca_transform.transform.transform_utils import InstanceUsageUtils
|
|||
import json
|
||||
|
||||
|
||||
class PreHourlyCalculateRateException(Exception):
|
||||
"""Exception thrown when doing pre-hourly rate calculations
|
||||
Attributes:
|
||||
value: string representing the error
|
||||
"""
|
||||
|
||||
def __init__(self, value):
|
||||
self.value = value
|
||||
|
||||
def __str__(self):
|
||||
return repr(self.value)
|
||||
|
||||
|
||||
class PreHourlyCalculateRate(SetterComponent):
|
||||
|
||||
@staticmethod
|
||||
def _calculate_rate(instance_usage_df):
|
||||
instance_usage_data_json_list = []
|
||||
|
||||
try:
|
||||
sorted_oldest_ascending_df = instance_usage_df.sort(
|
||||
functions.asc("processing_meta.oldest_timestamp_string"))
|
||||
|
||||
|
@ -37,14 +51,18 @@ class PreHourlyCalculateRate(SetterComponent):
|
|||
# Calculate the rate change by percentage
|
||||
oldest_dict = sorted_oldest_ascending_df.collect()[0].asDict()
|
||||
oldest_quantity = float(oldest_dict[
|
||||
"processing_meta"]['oldest_quantity'])
|
||||
"processing_meta"]["oldest_quantity"])
|
||||
|
||||
latest_dict = sorted_latest_descending_df.collect()[0].asDict()
|
||||
latest_quantity = float(latest_dict[
|
||||
"processing_meta"]['latest_quantity'])
|
||||
"processing_meta"]["latest_quantity"])
|
||||
|
||||
rate_percentage = 100 * (
|
||||
(latest_quantity - oldest_quantity) / oldest_quantity)
|
||||
except Exception as e:
|
||||
raise PreHourlyCalculateRateException(
|
||||
"Exception occurred in pre-hourly rate calculation. Error: %s"
|
||||
% str(e))
|
||||
|
||||
# create a new instance usage dict
|
||||
instance_usage_dict = {"tenant_id":
|
||||
|
|
|
@ -104,6 +104,11 @@ class RollupQuantity(SetterComponent):
|
|||
select_quant_str = "".join((setter_rollup_operation, "(quantity)"))
|
||||
quantity = getattr(row, select_quant_str, 0.0)
|
||||
|
||||
try:
|
||||
processing_meta = row.processing_meta
|
||||
except AttributeError:
|
||||
processing_meta = {}
|
||||
|
||||
# create a new instance usage dict
|
||||
instance_usage_dict = {"tenant_id": getattr(row, "tenant_id",
|
||||
"all"),
|
||||
|
@ -147,7 +152,8 @@ class RollupQuantity(SetterComponent):
|
|||
getattr(row, "usage_minute", "all"),
|
||||
"aggregation_period":
|
||||
getattr(row, "aggregation_period",
|
||||
"all")
|
||||
"all"),
|
||||
"processing_meta": processing_meta
|
||||
}
|
||||
|
||||
instance_usage_data_json = json.dumps(instance_usage_dict)
|
||||
|
|
|
@ -33,6 +33,11 @@ class SetAggregatedMetricName(SetterComponent):
|
|||
|
||||
agg_params = instance_usage_agg_params.agg_params
|
||||
|
||||
try:
|
||||
processing_meta = row.processing_meta
|
||||
except AttributeError:
|
||||
processing_meta = {}
|
||||
|
||||
instance_usage_dict = {"tenant_id": row.tenant_id,
|
||||
"user_id": row.user_id,
|
||||
"resource_uuid": row.resource_uuid,
|
||||
|
@ -58,7 +63,8 @@ class SetAggregatedMetricName(SetterComponent):
|
|||
"usage_date": row.usage_date,
|
||||
"usage_hour": row.usage_hour,
|
||||
"usage_minute": row.usage_minute,
|
||||
"aggregation_period": row.aggregation_period}
|
||||
"aggregation_period": row.aggregation_period,
|
||||
"processing_meta": processing_meta}
|
||||
|
||||
instance_usage_data_json = json.dumps(instance_usage_dict)
|
||||
|
||||
|
|
|
@ -33,6 +33,11 @@ class SetAggregatedPeriod(SetterComponent):
|
|||
|
||||
agg_params = instance_usage_agg_params.agg_params
|
||||
|
||||
try:
|
||||
processing_meta = row.processing_meta
|
||||
except AttributeError:
|
||||
processing_meta = {}
|
||||
|
||||
instance_usage_dict = {"tenant_id": row.tenant_id,
|
||||
"user_id": row.user_id,
|
||||
"resource_uuid": row.resource_uuid,
|
||||
|
@ -59,7 +64,8 @@ class SetAggregatedPeriod(SetterComponent):
|
|||
"usage_hour": row.usage_hour,
|
||||
"usage_minute": row.usage_minute,
|
||||
"aggregation_period":
|
||||
agg_params["aggregation_period"]}
|
||||
agg_params["aggregation_period"],
|
||||
"processing_meta": processing_meta}
|
||||
|
||||
instance_usage_data_json = json.dumps(instance_usage_dict)
|
||||
|
||||
|
|
|
@ -87,10 +87,10 @@ class CalculateRate(UsageComponent):
|
|||
|
||||
# Calculate the rate change by percentage
|
||||
oldest_dict = oldest_rolled_up_instance_usage_df.collect()[0].asDict()
|
||||
oldest_quantity = oldest_dict['quantity']
|
||||
oldest_quantity = float(oldest_dict['quantity'])
|
||||
|
||||
latest_dict = latest_rolled_up_instance_usage_df.collect()[0].asDict()
|
||||
latest_quantity = latest_dict['quantity']
|
||||
latest_quantity = float(latest_dict['quantity'])
|
||||
|
||||
rate_percentage = \
|
||||
((latest_quantity - oldest_quantity) / oldest_quantity) * 100
|
||||
|
|
|
@ -31,6 +31,10 @@ from monasca_common.simport import simport
|
|||
from oslo_config import cfg
|
||||
import time
|
||||
|
||||
from monasca_transform.component.usage.fetch_quantity import \
|
||||
FetchQuantityException
|
||||
from monasca_transform.component.usage.fetch_quantity_util import \
|
||||
FetchQuantityUtilException
|
||||
from monasca_transform.config.config_initializer import ConfigInitializer
|
||||
from monasca_transform.transform.builder.generic_transform_builder \
|
||||
import GenericTransformBuilder
|
||||
|
@ -278,8 +282,17 @@ class MonMetricsKafkaProcessor(object):
|
|||
transform_context_info=transform_context,
|
||||
transform_spec_df_info=transform_spec_df)
|
||||
|
||||
try:
|
||||
MonMetricsKafkaProcessor.process_metric(
|
||||
transform_context, source_record_store_df)
|
||||
except FetchQuantityException:
|
||||
raise
|
||||
except FetchQuantityUtilException:
|
||||
raise
|
||||
except Exception as e:
|
||||
MonMetricsKafkaProcessor.log_debug(
|
||||
"Exception raised in metric processing for metric: " +
|
||||
str(metric_id) + ". Error: " + str(e))
|
||||
|
||||
@staticmethod
|
||||
def rdd_to_recordstore(rdd_transform_context_rdd):
|
||||
|
|
|
@ -1187,6 +1187,25 @@ class SparkTest(SparkContextTest):
|
|||
.get('value_meta')
|
||||
.get('lastrecord_timestamp_string'))
|
||||
|
||||
self.assertEqual('3363.4285714285716',
|
||||
diskusage_rate_agg_metric.get('metric')
|
||||
.get('value_meta')
|
||||
.get('processing_meta').get('latest_quantity'))
|
||||
self.assertEqual('2016-06-10 20:27:01',
|
||||
diskusage_rate_agg_metric.get('metric')
|
||||
.get('value_meta')
|
||||
.get('processing_meta')
|
||||
.get('latest_timestamp_string'))
|
||||
self.assertEqual('2721.0',
|
||||
diskusage_rate_agg_metric.get('metric')
|
||||
.get('value_meta')
|
||||
.get('processing_meta').get('oldest_quantity'))
|
||||
self.assertEqual('2016-06-10 20:27:01',
|
||||
diskusage_rate_agg_metric.get('metric')
|
||||
.get('value_meta')
|
||||
.get('processing_meta')
|
||||
.get('oldest_timestamp_string'))
|
||||
|
||||
# Verify nova.vm.cpu.total_allocated_agg metrics
|
||||
nova_vm_cpu_total_alloc_agg_metric = [
|
||||
value for value in metrics
|
||||
|
|
|
@ -4,6 +4,6 @@
|
|||
('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"mem.usable_mb_agg","record_count":20.0,"user_id":"all","zone":"all","firstrecord_timestamp_string":"2016-06-20 11:24:59","tenant_id":"all","region":"all","usage_hour":"11","usage_date":"2016-06-20","processing_meta":{"metric_id":"mem_usable_all"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp_string":"2016-06-20 11:29:44","firstrecord_timestamp_unix":1466421899.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1466422184.0,"quantity":10283.1})
|
||||
('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"mem.usable_mb_agg","record_count":20.0,"user_id":"all","zone":"all","firstrecord_timestamp_string":"2016-06-20 11:29:44","tenant_id":"all","region":"all","usage_hour":"11","usage_date":"2016-06-20","processing_meta":{"metric_id":"mem_usable_all"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp_string":"2016-06-20 11:39:44","firstrecord_timestamp_unix":1466421899.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1466422784.0,"quantity":10283.1})
|
||||
('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"mem.usable_mb_agg","record_count":20.0,"user_id":"all","zone":"all","firstrecord_timestamp_string":"2016-06-20 11:39:44","tenant_id":"all","region":"all","usage_hour":"11","usage_date":"2016-06-20","processing_meta":{"metric_id":"mem_usable_all"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp_string":"2016-06-20 11:49:44","firstrecord_timestamp_unix":1466421899.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1466423384.0,"quantity":10283.1})
|
||||
('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"swiftlm.diskusage.rate_agg","record_count":34.0,"user_id":"all","zone":"all","firstrecord_timestamp_string":"2016-06-10 20:27:01","tenant_id":"all","region":"all","usage_hour":"20","usage_date":"2016-06-10","processing_meta":{"oldest_quantity": 4575.0, "latest_timestamp_string": "2016-06-10 20:27:02", "latest_quantity": 5291.0, "metric_id":"swift_usage_rate", "oldest_timestamp_string": "2016-06-10 20:27:01"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp_string":"2016-06-10 20:27:02","firstrecord_timestamp_unix":1465590421.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1465590422.0,"quantity":15.6502})
|
||||
('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"swiftlm.diskusage.rate_agg","record_count":34.0,"user_id":"all","zone":"all","firstrecord_timestamp_string":"2016-06-10 20:37:01","tenant_id":"all","region":"all","usage_hour":"20","usage_date":"2016-06-10","processing_meta":{"oldest_quantity": 5575.0, "latest_timestamp_string": "2016-06-10 20:37:02", "latest_quantity": 6291.0, "metric_id":"swift_usage_rate", "oldest_timestamp_string": "2016-06-10 20:37:01"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp_string":"2016-06-10 20:27:02","firstrecord_timestamp_unix":1465590421.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1465590422.0,"quantity":16.6502})
|
||||
('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"swiftlm.diskusage.rate_agg","record_count":34.0,"user_id":"all","zone":"all","firstrecord_timestamp_string":"2016-06-10 20:47:01","tenant_id":"all","region":"all","usage_hour":"20","usage_date":"2016-06-10","processing_meta":{"oldest_quantity": 6575.0, "latest_timestamp_string": "2016-06-10 20:47:02", "latest_quantity": 7291.0, "metric_id":"swift_usage_rate", "oldest_timestamp_string": "2016-06-10 20:47:01"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp_string":"2016-06-10 20:27:02","firstrecord_timestamp_unix":1465590421.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1465590422.0,"quantity":17.6502})
|
||||
('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"swiftlm.diskusage.rate_agg","record_count":34.0,"user_id":"all","zone":"all","firstrecord_timestamp_string":"2016-06-10 20:27:01","tenant_id":"all","region":"all","usage_hour":"20","usage_date":"2016-06-10","processing_meta":{"oldest_quantity": "4575.0", "latest_timestamp_string": "2016-06-10 20:27:02", "latest_quantity": "5291.0", "metric_id":"swift_usage_rate", "oldest_timestamp_string": "2016-06-10 20:27:01"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp_string":"2016-06-10 20:27:02","firstrecord_timestamp_unix":1465590421.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1465590422.0,"quantity":15.6502})
|
||||
('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"swiftlm.diskusage.rate_agg","record_count":34.0,"user_id":"all","zone":"all","firstrecord_timestamp_string":"2016-06-10 20:37:01","tenant_id":"all","region":"all","usage_hour":"20","usage_date":"2016-06-10","processing_meta":{"oldest_quantity": "5575.0", "latest_timestamp_string": "2016-06-10 20:37:02", "latest_quantity": "6291.0", "metric_id":"swift_usage_rate", "oldest_timestamp_string": "2016-06-10 20:37:01"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp_string":"2016-06-10 20:27:02","firstrecord_timestamp_unix":1465590421.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1465590422.0,"quantity":16.6502})
|
||||
('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"swiftlm.diskusage.rate_agg","record_count":34.0,"user_id":"all","zone":"all","firstrecord_timestamp_string":"2016-06-10 20:47:01","tenant_id":"all","region":"all","usage_hour":"20","usage_date":"2016-06-10","processing_meta":{"oldest_quantity": "6575.0", "latest_timestamp_string": "2016-06-10 20:47:02", "latest_quantity": "7291.0", "metric_id":"swift_usage_rate", "oldest_timestamp_string": "2016-06-10 20:47:01"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp_string":"2016-06-10 20:27:02","firstrecord_timestamp_unix":1465590421.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1465590422.0,"quantity":17.6502})
|
||||
|
|
Loading…
Reference in New Issue