diff --git a/monasca_transform/component/setter/pre_hourly_calculate_rate.py b/monasca_transform/component/setter/pre_hourly_calculate_rate.py new file mode 100644 index 0000000..7fccda6 --- /dev/null +++ b/monasca_transform/component/setter/pre_hourly_calculate_rate.py @@ -0,0 +1,111 @@ +# (c) Copyright 2016 Hewlett Packard Enterprise Development LP +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from pyspark.sql import functions +from pyspark.sql import SQLContext + +from monasca_transform.component import Component +from monasca_transform.component.setter import SetterComponent +from monasca_transform.transform.transform_utils import InstanceUsageUtils + +import json + + +class PreHourlyCalculateRate(SetterComponent): + + @staticmethod + def _calculate_rate(instance_usage_df): + instance_usage_data_json_list = [] + + sorted_oldest_ascending_df = instance_usage_df.sort( + functions.asc("processing_meta.oldest_timestamp_string")) + + sorted_latest_descending_df = instance_usage_df.sort( + functions.desc("processing_meta.latest_timestamp_string")) + + # Calculate the rate change by percentage + oldest_dict = sorted_oldest_ascending_df.collect()[0].asDict() + oldest_quantity = float(oldest_dict[ + "processing_meta"]['oldest_quantity']) + + latest_dict = sorted_latest_descending_df.collect()[0].asDict() + latest_quantity = float(latest_dict[ + "processing_meta"]['latest_quantity']) + + rate_percentage = 100 * ( + (latest_quantity - oldest_quantity) / oldest_quantity) + + # create a new instance usage dict + instance_usage_dict = {"tenant_id": + latest_dict.get("tenant_id", "all"), + "user_id": + latest_dict.get("user_id", "all"), + "resource_uuid": + latest_dict.get("resource_uuid", "all"), + "geolocation": + latest_dict.get("geolocation", "all"), + "region": + latest_dict.get("region", "all"), + "zone": + latest_dict.get("zone", "all"), + "host": + latest_dict.get("host", "all"), + "project_id": + latest_dict.get("project_id", "all"), + "aggregated_metric_name": + latest_dict["aggregated_metric_name"], + "quantity": rate_percentage, + "firstrecord_timestamp_unix": + oldest_dict["firstrecord_timestamp_unix"], + "firstrecord_timestamp_string": + oldest_dict["firstrecord_timestamp_string"], + "lastrecord_timestamp_unix": + latest_dict["lastrecord_timestamp_unix"], + "lastrecord_timestamp_string": + latest_dict["lastrecord_timestamp_string"], + "record_count": oldest_dict["record_count"] + + latest_dict["record_count"], + "service_group": + latest_dict.get("service_group", + Component. + DEFAULT_UNAVAILABLE_VALUE), + "service_id": + latest_dict.get("service_id", + Component. + DEFAULT_UNAVAILABLE_VALUE), + "usage_date": latest_dict["usage_date"], + "usage_hour": latest_dict["usage_hour"], + "usage_minute": latest_dict["usage_minute"], + "aggregation_period": + latest_dict["aggregation_period"] + } + + instance_usage_data_json = json.dumps(instance_usage_dict) + instance_usage_data_json_list.append(instance_usage_data_json) + + # convert to rdd + spark_context = instance_usage_df.rdd.context + return spark_context.parallelize(instance_usage_data_json_list) + + @staticmethod + def do_rate_calculation(instance_usage_df): + instance_usage_json_rdd = PreHourlyCalculateRate._calculate_rate( + instance_usage_df) + + sql_context = SQLContext.getOrCreate(instance_usage_df.rdd.context) + instance_usage_trans_df = InstanceUsageUtils.create_df_from_json_rdd( + sql_context, + instance_usage_json_rdd) + + return instance_usage_trans_df diff --git a/monasca_transform/component/usage/calculate_rate.py b/monasca_transform/component/usage/calculate_rate.py index c771515..f14c660 100644 --- a/monasca_transform/component/usage/calculate_rate.py +++ b/monasca_transform/component/usage/calculate_rate.py @@ -141,7 +141,16 @@ class CalculateRate(UsageComponent): {"event_type": latest_dict.get("event_type", Component. - DEFAULT_UNAVAILABLE_VALUE)} + DEFAULT_UNAVAILABLE_VALUE), + "oldest_timestamp_string": + oldest_dict[ + "firstrecord_timestamp_string"], + "oldest_quantity": oldest_quantity, + "latest_timestamp_string": + latest_dict[ + "lastrecord_timestamp_string"], + "latest_quantity": latest_quantity + } } instance_usage_data_json = json.dumps(instance_usage_dict) diff --git a/monasca_transform/data_driven_specs/transform_specs/transform_specs.json b/monasca_transform/data_driven_specs/transform_specs/transform_specs.json index 1110f3e..0d3d918 100644 --- a/monasca_transform/data_driven_specs/transform_specs/transform_specs.json +++ b/monasca_transform/data_driven_specs/transform_specs/transform_specs.json @@ -19,7 +19,7 @@ {"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"vm.cpu.utilization_perc_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id", "resource_uuid"],"usage_fetch_operation": "avg","filter_by_list": [],"setter_rollup_group_by_list":["tenant_id"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"vm_cpu_util_perc_project","metric_id":"vm_cpu_util_perc_project"} {"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"nova.vm.cpu.total_allocated_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id"],"usage_fetch_operation": "avg","filter_by_list": [],"setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"nova_vm_cpu_total_all","metric_id":"nova_vm_cpu_total_all"} {"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"swiftlm.diskusage.val.size_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "mount"],"usage_fetch_operation": "avg","filter_by_list": [],"setter_rollup_group_by_list":[],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"swift_usage_all","metric_id":"swift_usage_all"} -{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"swiftlm.diskusage.val.size_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "mount"],"usage_fetch_operation": "avg","filter_by_list": [],"setter_rollup_group_by_list":["host"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"swift_usage_all","metric_id":"swift_usage_host"} -{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"calculate_rate","setters":["set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"swiftlm.diskusage.rate_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "mount"],"filter_by_list": [],"setter_rollup_group_by_list": [],"dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"swift_usage_rate","metric_id":"swift_usage_rate"} +{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"swiftlm.diskusage.val.size_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "mount"],"usage_fetch_operation": "avg","filter_by_list": [],"setter_rollup_group_by_list":["host"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"swift_usage_host","metric_id":"swift_usage_host"} +{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"calculate_rate","setters":["set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"swiftlm.diskusage.rate_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "mount"],"filter_by_list": [],"setter_rollup_group_by_list": [],"dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"rate","pre_hourly_group_by_list":["default"]},"metric_group":"swift_usage_rate","metric_id":"swift_usage_rate"} {"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"swiftlm.diskusage.val.avail_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "mount"],"usage_fetch_operation": "avg","filter_by_list": [],"setter_rollup_group_by_list":[],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"swift_avail_all","metric_id":"swift_avail_all"} -{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"swiftlm.diskusage.val.avail_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "mount"],"usage_fetch_operation": "avg","filter_by_list": [],"setter_rollup_group_by_list":["host"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"swift_avail_all","metric_id":"swift_avail_host"} +{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"swiftlm.diskusage.val.avail_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "mount"],"usage_fetch_operation": "avg","filter_by_list": [],"setter_rollup_group_by_list":["host"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"swift_avail_host","metric_id":"swift_avail_host"} diff --git a/monasca_transform/processor/pre_hourly_processor.py b/monasca_transform/processor/pre_hourly_processor.py index fc014a9..e9fa3c8 100644 --- a/monasca_transform/processor/pre_hourly_processor.py +++ b/monasca_transform/processor/pre_hourly_processor.py @@ -23,8 +23,9 @@ import logging from monasca_common.simport import simport from oslo_config import cfg - from monasca_transform.component.insert.kafka_insert import KafkaInsert +from monasca_transform.component.setter.pre_hourly_calculate_rate import \ + PreHourlyCalculateRate from monasca_transform.component.setter.rollup_quantity import RollupQuantity from monasca_transform.data_driven_specs.data_driven_specs_repo \ import DataDrivenSpecsRepo @@ -166,14 +167,12 @@ class PreHourlyProcessor(Processor): # A+Guide+To+The+Kafka+Protocol# # AGuideToTheKafkaProtocol-OffsetRequest GET_LATEST_OFFSETS = -1 - latest_dict = PreHourlyProcessor.\ - _get_offsets_from_kafka(brokers, topic, - GET_LATEST_OFFSETS) + latest_dict = PreHourlyProcessor._get_offsets_from_kafka( + brokers, topic, GET_LATEST_OFFSETS) GET_EARLIEST_OFFSETS = -2 - earliest_dict = PreHourlyProcessor.\ - _get_offsets_from_kafka(brokers, topic, - GET_EARLIEST_OFFSETS) + earliest_dict = PreHourlyProcessor._get_offsets_from_kafka( + brokers, topic, GET_EARLIEST_OFFSETS) for item in latest_dict: until_offset = latest_dict[item].offsets[0] @@ -200,17 +199,15 @@ class PreHourlyProcessor(Processor): # A+Guide+To+The+Kafka+Protocol# # AGuideToTheKafkaProtocol-OffsetRequest GET_LATEST_OFFSETS = -1 - latest_dict = PreHourlyProcessor.\ - _get_offsets_from_kafka(brokers, topic, - GET_LATEST_OFFSETS) + latest_dict = PreHourlyProcessor._get_offsets_from_kafka( + brokers, topic, GET_LATEST_OFFSETS) GET_EARLIEST_OFFSETS = -2 - earliest_dict = PreHourlyProcessor.\ - _get_offsets_from_kafka(brokers, topic, - GET_EARLIEST_OFFSETS) + earliest_dict = PreHourlyProcessor._get_offsets_from_kafka( + brokers, topic, GET_EARLIEST_OFFSETS) - saved_dict = PreHourlyProcessor.\ - _parse_saved_offsets(app_name, topic, saved_offset_spec) + saved_dict = PreHourlyProcessor._parse_saved_offsets( + app_name, topic, saved_offset_spec) for item in latest_dict: # saved spec @@ -305,10 +302,8 @@ class PreHourlyProcessor(Processor): # convert usage data rdd to instance usage df # sqlc = SQLContext.getOrCreate(pre_hourly_rdd.context) - instance_usage_df = \ - InstanceUsageUtils.create_df_from_json_rdd( - sqlc, - instance_usage_rdd) + instance_usage_df = InstanceUsageUtils.create_df_from_json_rdd( + sqlc, instance_usage_rdd) return instance_usage_df @@ -323,9 +318,9 @@ class PreHourlyProcessor(Processor): # # do a rollup operation # - agg_params = transform_spec_df.select( - "aggregation_params_map.pre_hourly_group_by_list")\ - .collect()[0].asDict() + agg_params = (transform_spec_df.select( + "aggregation_params_map.pre_hourly_group_by_list") + .collect()[0].asDict()) pre_hourly_group_by_list = agg_params["pre_hourly_group_by_list"] if (len(pre_hourly_group_by_list) == 1 and @@ -343,16 +338,19 @@ class PreHourlyProcessor(Processor): aggregation_period = agg_params["aggregation_period"] # get 2stage operation - agg_params = transform_spec_df.select( - "aggregation_params_map.pre_hourly_operation")\ - .collect()[0].asDict() + agg_params = (transform_spec_df.select( + "aggregation_params_map.pre_hourly_operation") + .collect()[0].asDict()) pre_hourly_operation = agg_params["pre_hourly_operation"] - instance_usage_df = \ - RollupQuantity.do_rollup(pre_hourly_group_by_list, - aggregation_period, - pre_hourly_operation, - instance_usage_df) + if pre_hourly_operation != "rate": + instance_usage_df = RollupQuantity.do_rollup( + pre_hourly_group_by_list, aggregation_period, + pre_hourly_operation, instance_usage_df) + else: + instance_usage_df = PreHourlyCalculateRate.do_rate_calculation( + instance_usage_df) + # insert metrics instance_usage_df = KafkaInsert.insert(transform_context, instance_usage_df) @@ -367,11 +365,12 @@ class PreHourlyProcessor(Processor): # metric_ids_df = instance_usage_df.select( "processing_meta.metric_id").distinct() + metric_ids_to_process = [row.metric_id for row in metric_ids_df.collect()] - data_driven_specs_repo = DataDrivenSpecsRepoFactory.\ - get_data_driven_specs_repo() + data_driven_specs_repo = ( + DataDrivenSpecsRepoFactory.get_data_driven_specs_repo()) sqlc = SQLContext.getOrCreate(instance_usage_df.rdd.context) transform_specs_df = data_driven_specs_repo.get_data_driven_specs( sql_context=sqlc, @@ -385,9 +384,8 @@ class PreHourlyProcessor(Processor): instance_usage_df.processing_meta.metric_id == metric_id) # set transform_spec_df in TransformContext - transform_context = \ - TransformContextUtils.get_context( - transform_spec_df_info=transform_spec_df) + transform_context = TransformContextUtils.get_context( + transform_spec_df_info=transform_spec_df) PreHourlyProcessor.process_instance_usage( transform_context, source_instance_usage_df) @@ -399,9 +397,9 @@ class PreHourlyProcessor(Processor): offsets available """ - offset_range_list = \ + offset_range_list = ( PreHourlyProcessor.get_processing_offset_range_list( - processing_time) + processing_time)) # get pre hourly data pre_hourly_rdd = PreHourlyProcessor.fetch_pre_hourly_data( @@ -415,17 +413,17 @@ class PreHourlyProcessor(Processor): # cache instance usage df # if cfg.CONF.pre_hourly_processor.enable_instance_usage_df_cache: - storage_level_prop = \ - cfg.CONF.pre_hourly_processor\ - .instance_usage_df_cache_storage_level + storage_level_prop = ( + cfg.CONF.pre_hourly_processor + .instance_usage_df_cache_storage_level) try: storage_level = StorageUtils.get_storage_level( storage_level_prop) except InvalidCacheStorageLevelException as storage_error: - storage_error.value += \ - " (as specified in " \ - "pre_hourly_processor.instance_usage_df" \ - "_cache_storage_level)" + storage_error.value += (" (as specified in " + "pre_hourly_processor" + ".instance_usage_df_cache" + "_storage_level)") raise instance_usage_df.persist(storage_level) diff --git a/tests/unit/processor/test_pre_hourly_processor_agg.py b/tests/unit/processor/test_pre_hourly_processor_agg.py index b380e5b..0639747 100644 --- a/tests/unit/processor/test_pre_hourly_processor_agg.py +++ b/tests/unit/processor/test_pre_hourly_processor_agg.py @@ -14,6 +14,7 @@ import mock import unittest +from oslo_config import cfg from pyspark.streaming.kafka import OffsetRange from monasca_transform.config.config_initializer import ConfigInitializer @@ -73,7 +74,7 @@ class TestPreHourlyProcessorAgg(SparkContextTest): metrics = DummyAdapter.adapter_impl.metric_list # Verify count of instance usage data - self.assertEqual(result, 6) + self.assertEqual(result, 9) # check aggregation result mem_total_mb_agg_metric = [ @@ -128,6 +129,44 @@ class TestPreHourlyProcessorAgg(SparkContextTest): .get("metric") .get('value_meta').get('record_count')) + # check aggregation result for swiftlm.diskusage.rate_agg + swift_disk_rate_agg_metric = [ + value for value in metrics + if value.get('metric').get('name') == + 'swiftlm.diskusage.rate_agg'][0] + self.assertTrue(swift_disk_rate_agg_metric is not None) + self.assertEqual(59.36612021857923, + swift_disk_rate_agg_metric + .get('metric').get('value')) + self.assertEqual('2016-06-10 20:27:02', + swift_disk_rate_agg_metric + .get('metric') + .get('value_meta') + .get('lastrecord_timestamp_string')) + self.assertEqual('2016-06-10 20:27:01', + swift_disk_rate_agg_metric + .get('metric') + .get('value_meta') + .get('firstrecord_timestamp_string')) + self.assertEqual(68.0, + swift_disk_rate_agg_metric + .get('metric') + .get('value_meta').get('record_count')) + self.assertEqual('useast', + swift_disk_rate_agg_metric.get('meta').get('region')) + self.assertEqual(cfg.CONF.messaging.publish_kafka_tenant_id, + swift_disk_rate_agg_metric.get('meta') + .get('tenantId')) + self.assertEqual('all', + swift_disk_rate_agg_metric.get('metric') + .get('dimensions').get('host')) + self.assertEqual('all', + swift_disk_rate_agg_metric.get('metric') + .get('dimensions').get('project_id')) + self.assertEqual('hourly', + swift_disk_rate_agg_metric.get('metric') + .get('dimensions').get('aggregation_period')) + def simple_count_transform(self, rdd): return rdd.count() diff --git a/tests/unit/test_resources/metrics_pre_hourly_data/metrics_pre_hourly_data.txt b/tests/unit/test_resources/metrics_pre_hourly_data/metrics_pre_hourly_data.txt index ef7d82f..5d912ef 100644 --- a/tests/unit/test_resources/metrics_pre_hourly_data/metrics_pre_hourly_data.txt +++ b/tests/unit/test_resources/metrics_pre_hourly_data/metrics_pre_hourly_data.txt @@ -4,3 +4,6 @@ ('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"mem.usable_mb_agg","record_count":20.0,"user_id":"all","zone":"all","firstrecord_timestamp_string":"2016-06-20 11:24:59","tenant_id":"all","region":"all","usage_hour":"11","usage_date":"2016-06-20","processing_meta":{"metric_id":"mem_usable_all"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp_string":"2016-06-20 11:29:44","firstrecord_timestamp_unix":1466421899.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1466422184.0,"quantity":10283.1}) ('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"mem.usable_mb_agg","record_count":20.0,"user_id":"all","zone":"all","firstrecord_timestamp_string":"2016-06-20 11:29:44","tenant_id":"all","region":"all","usage_hour":"11","usage_date":"2016-06-20","processing_meta":{"metric_id":"mem_usable_all"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp_string":"2016-06-20 11:39:44","firstrecord_timestamp_unix":1466421899.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1466422784.0,"quantity":10283.1}) ('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"mem.usable_mb_agg","record_count":20.0,"user_id":"all","zone":"all","firstrecord_timestamp_string":"2016-06-20 11:39:44","tenant_id":"all","region":"all","usage_hour":"11","usage_date":"2016-06-20","processing_meta":{"metric_id":"mem_usable_all"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp_string":"2016-06-20 11:49:44","firstrecord_timestamp_unix":1466421899.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1466423384.0,"quantity":10283.1}) +('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"swiftlm.diskusage.rate_agg","record_count":34.0,"user_id":"all","zone":"all","firstrecord_timestamp_string":"2016-06-10 20:27:01","tenant_id":"all","region":"all","usage_hour":"20","usage_date":"2016-06-10","processing_meta":{"oldest_quantity": 4575.0, "latest_timestamp_string": "2016-06-10 20:27:02", "latest_quantity": 5291.0, "metric_id":"swift_usage_rate", "oldest_timestamp_string": "2016-06-10 20:27:01"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp_string":"2016-06-10 20:27:02","firstrecord_timestamp_unix":1465590421.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1465590422.0,"quantity":15.6502}) +('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"swiftlm.diskusage.rate_agg","record_count":34.0,"user_id":"all","zone":"all","firstrecord_timestamp_string":"2016-06-10 20:37:01","tenant_id":"all","region":"all","usage_hour":"20","usage_date":"2016-06-10","processing_meta":{"oldest_quantity": 5575.0, "latest_timestamp_string": "2016-06-10 20:37:02", "latest_quantity": 6291.0, "metric_id":"swift_usage_rate", "oldest_timestamp_string": "2016-06-10 20:37:01"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp_string":"2016-06-10 20:27:02","firstrecord_timestamp_unix":1465590421.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1465590422.0,"quantity":16.6502}) +('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"swiftlm.diskusage.rate_agg","record_count":34.0,"user_id":"all","zone":"all","firstrecord_timestamp_string":"2016-06-10 20:47:01","tenant_id":"all","region":"all","usage_hour":"20","usage_date":"2016-06-10","processing_meta":{"oldest_quantity": 6575.0, "latest_timestamp_string": "2016-06-10 20:47:02", "latest_quantity": 7291.0, "metric_id":"swift_usage_rate", "oldest_timestamp_string": "2016-06-10 20:47:01"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp_string":"2016-06-10 20:27:02","firstrecord_timestamp_unix":1465590421.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1465590422.0,"quantity":17.6502}) \ No newline at end of file