Modifications to make rate calculations work with two-stage aggregation.
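
A rough standalone sketch of the calculation the new PreHourlyCalculateRate
setter performs (plain Python, not the Spark DataFrame code in the diff). The
endpoint quantities are taken from the swiftlm.diskusage.rate_agg records
added to the pre-hourly fixture, and the result matches the value asserted in
the new test:

    def rate_percentage(oldest_quantity, latest_quantity):
        # percentage change across the hour; assumes oldest_quantity != 0
        return 100.0 * (latest_quantity - oldest_quantity) / oldest_quantity

    # oldest record carries oldest_quantity=4575.0,
    # latest record carries latest_quantity=7291.0
    print(rate_percentage(4575.0, 7291.0))  # ~59.366..., as asserted below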

Change-Id: I8c7b6112a04ba378ba1911a342cb97e8c388ebc6
Flint Calvin 2016-08-08 22:05:40 +00:00
parent a3f7d0e845
commit 615e52d5cd
6 changed files with 209 additions and 49 deletions

View File

@@ -0,0 +1,111 @@
# (c) Copyright 2016 Hewlett Packard Enterprise Development LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import json

from pyspark.sql import functions
from pyspark.sql import SQLContext

from monasca_transform.component import Component
from monasca_transform.component.setter import SetterComponent
from monasca_transform.transform.transform_utils import InstanceUsageUtils


class PreHourlyCalculateRate(SetterComponent):

    @staticmethod
    def _calculate_rate(instance_usage_df):
        instance_usage_data_json_list = []
sorted_oldest_ascending_df = instance_usage_df.sort(
functions.asc("processing_meta.oldest_timestamp_string"))
sorted_latest_descending_df = instance_usage_df.sort(
functions.desc("processing_meta.latest_timestamp_string"))
# Calculate the rate change by percentage
oldest_dict = sorted_oldest_ascending_df.collect()[0].asDict()
oldest_quantity = float(oldest_dict[
"processing_meta"]['oldest_quantity'])
latest_dict = sorted_latest_descending_df.collect()[0].asDict()
latest_quantity = float(latest_dict[
"processing_meta"]['latest_quantity'])
rate_percentage = 100 * (
(latest_quantity - oldest_quantity) / oldest_quantity)
# create a new instance usage dict
instance_usage_dict = {"tenant_id":
latest_dict.get("tenant_id", "all"),
"user_id":
latest_dict.get("user_id", "all"),
"resource_uuid":
latest_dict.get("resource_uuid", "all"),
"geolocation":
latest_dict.get("geolocation", "all"),
"region":
latest_dict.get("region", "all"),
"zone":
latest_dict.get("zone", "all"),
"host":
latest_dict.get("host", "all"),
"project_id":
latest_dict.get("project_id", "all"),
"aggregated_metric_name":
latest_dict["aggregated_metric_name"],
"quantity": rate_percentage,
"firstrecord_timestamp_unix":
oldest_dict["firstrecord_timestamp_unix"],
"firstrecord_timestamp_string":
oldest_dict["firstrecord_timestamp_string"],
"lastrecord_timestamp_unix":
latest_dict["lastrecord_timestamp_unix"],
"lastrecord_timestamp_string":
latest_dict["lastrecord_timestamp_string"],
"record_count": oldest_dict["record_count"] +
latest_dict["record_count"],
"service_group":
latest_dict.get("service_group",
Component.
DEFAULT_UNAVAILABLE_VALUE),
"service_id":
latest_dict.get("service_id",
Component.
DEFAULT_UNAVAILABLE_VALUE),
"usage_date": latest_dict["usage_date"],
"usage_hour": latest_dict["usage_hour"],
"usage_minute": latest_dict["usage_minute"],
"aggregation_period":
latest_dict["aggregation_period"]
}
instance_usage_data_json = json.dumps(instance_usage_dict)
instance_usage_data_json_list.append(instance_usage_data_json)
# convert to rdd
spark_context = instance_usage_df.rdd.context
        return spark_context.parallelize(instance_usage_data_json_list)

    @staticmethod
def do_rate_calculation(instance_usage_df):
instance_usage_json_rdd = PreHourlyCalculateRate._calculate_rate(
instance_usage_df)
sql_context = SQLContext.getOrCreate(instance_usage_df.rdd.context)
instance_usage_trans_df = InstanceUsageUtils.create_df_from_json_rdd(
sql_context,
instance_usage_json_rdd)
return instance_usage_trans_df

View File

@@ -141,7 +141,16 @@ class CalculateRate(UsageComponent):
{"event_type":
latest_dict.get("event_type",
Component.
DEFAULT_UNAVAILABLE_VALUE)}
DEFAULT_UNAVAILABLE_VALUE),
"oldest_timestamp_string":
oldest_dict[
"firstrecord_timestamp_string"],
"oldest_quantity": oldest_quantity,
"latest_timestamp_string":
latest_dict[
"lastrecord_timestamp_string"],
"latest_quantity": latest_quantity
}
}
instance_usage_data_json = json.dumps(instance_usage_dict)
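
With this change the hourly (first-stage) CalculateRate component carries the
endpoints of its own calculation forward in processing_meta, which is what the
new pre-hourly setter reads back. A hypothetical record, with field values
borrowed from the test fixture at the end of this change:

    # illustration only; not part of the committed code
    processing_meta = {
        "metric_id": "swift_usage_rate",
        "oldest_timestamp_string": "2016-06-10 20:27:01",
        "oldest_quantity": 4575.0,
        "latest_timestamp_string": "2016-06-10 20:27:02",
        "latest_quantity": 5291.0,
    }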

View File

@@ -19,7 +19,7 @@
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"vm.cpu.utilization_perc_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id", "resource_uuid"],"usage_fetch_operation": "avg","filter_by_list": [],"setter_rollup_group_by_list":["tenant_id"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"vm_cpu_util_perc_project","metric_id":"vm_cpu_util_perc_project"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"nova.vm.cpu.total_allocated_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id"],"usage_fetch_operation": "avg","filter_by_list": [],"setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"nova_vm_cpu_total_all","metric_id":"nova_vm_cpu_total_all"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"swiftlm.diskusage.val.size_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "mount"],"usage_fetch_operation": "avg","filter_by_list": [],"setter_rollup_group_by_list":[],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"swift_usage_all","metric_id":"swift_usage_all"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"swiftlm.diskusage.val.size_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "mount"],"usage_fetch_operation": "avg","filter_by_list": [],"setter_rollup_group_by_list":["host"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"swift_usage_all","metric_id":"swift_usage_host"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"calculate_rate","setters":["set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"swiftlm.diskusage.rate_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "mount"],"filter_by_list": [],"setter_rollup_group_by_list": [],"dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"swift_usage_rate","metric_id":"swift_usage_rate"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"swiftlm.diskusage.val.size_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "mount"],"usage_fetch_operation": "avg","filter_by_list": [],"setter_rollup_group_by_list":["host"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"swift_usage_host","metric_id":"swift_usage_host"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"calculate_rate","setters":["set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"swiftlm.diskusage.rate_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "mount"],"filter_by_list": [],"setter_rollup_group_by_list": [],"dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"rate","pre_hourly_group_by_list":["default"]},"metric_group":"swift_usage_rate","metric_id":"swift_usage_rate"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"swiftlm.diskusage.val.avail_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "mount"],"usage_fetch_operation": "avg","filter_by_list": [],"setter_rollup_group_by_list":[],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"swift_avail_all","metric_id":"swift_avail_all"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"swiftlm.diskusage.val.avail_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "mount"],"usage_fetch_operation": "avg","filter_by_list": [],"setter_rollup_group_by_list":["host"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"swift_avail_all","metric_id":"swift_avail_host"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"swiftlm.diskusage.val.avail_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "mount"],"usage_fetch_operation": "avg","filter_by_list": [],"setter_rollup_group_by_list":["host"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"swift_avail_host","metric_id":"swift_avail_host"}

View File

@@ -23,8 +23,9 @@ import logging
from monasca_common.simport import simport
from oslo_config import cfg
from monasca_transform.component.insert.kafka_insert import KafkaInsert
from monasca_transform.component.setter.pre_hourly_calculate_rate import \
PreHourlyCalculateRate
from monasca_transform.component.setter.rollup_quantity import RollupQuantity
from monasca_transform.data_driven_specs.data_driven_specs_repo \
import DataDrivenSpecsRepo
@@ -166,14 +167,12 @@ class PreHourlyProcessor(Processor):
# A+Guide+To+The+Kafka+Protocol#
# AGuideToTheKafkaProtocol-OffsetRequest
GET_LATEST_OFFSETS = -1
latest_dict = PreHourlyProcessor.\
_get_offsets_from_kafka(brokers, topic,
GET_LATEST_OFFSETS)
latest_dict = PreHourlyProcessor._get_offsets_from_kafka(
brokers, topic, GET_LATEST_OFFSETS)
GET_EARLIEST_OFFSETS = -2
earliest_dict = PreHourlyProcessor.\
_get_offsets_from_kafka(brokers, topic,
GET_EARLIEST_OFFSETS)
earliest_dict = PreHourlyProcessor._get_offsets_from_kafka(
brokers, topic, GET_EARLIEST_OFFSETS)
for item in latest_dict:
until_offset = latest_dict[item].offsets[0]
@@ -200,17 +199,15 @@ class PreHourlyProcessor(Processor):
# A+Guide+To+The+Kafka+Protocol#
# AGuideToTheKafkaProtocol-OffsetRequest
GET_LATEST_OFFSETS = -1
latest_dict = PreHourlyProcessor.\
_get_offsets_from_kafka(brokers, topic,
GET_LATEST_OFFSETS)
latest_dict = PreHourlyProcessor._get_offsets_from_kafka(
brokers, topic, GET_LATEST_OFFSETS)
GET_EARLIEST_OFFSETS = -2
earliest_dict = PreHourlyProcessor.\
_get_offsets_from_kafka(brokers, topic,
GET_EARLIEST_OFFSETS)
earliest_dict = PreHourlyProcessor._get_offsets_from_kafka(
brokers, topic, GET_EARLIEST_OFFSETS)
saved_dict = PreHourlyProcessor.\
_parse_saved_offsets(app_name, topic, saved_offset_spec)
saved_dict = PreHourlyProcessor._parse_saved_offsets(
app_name, topic, saved_offset_spec)
for item in latest_dict:
# saved spec
@@ -305,10 +302,8 @@ class PreHourlyProcessor(Processor):
# convert usage data rdd to instance usage df
#
sqlc = SQLContext.getOrCreate(pre_hourly_rdd.context)
instance_usage_df = \
InstanceUsageUtils.create_df_from_json_rdd(
sqlc,
instance_usage_rdd)
instance_usage_df = InstanceUsageUtils.create_df_from_json_rdd(
sqlc, instance_usage_rdd)
return instance_usage_df
@@ -323,9 +318,9 @@ class PreHourlyProcessor(Processor):
#
# do a rollup operation
#
agg_params = transform_spec_df.select(
"aggregation_params_map.pre_hourly_group_by_list")\
.collect()[0].asDict()
agg_params = (transform_spec_df.select(
"aggregation_params_map.pre_hourly_group_by_list")
.collect()[0].asDict())
pre_hourly_group_by_list = agg_params["pre_hourly_group_by_list"]
if (len(pre_hourly_group_by_list) == 1 and
@@ -343,16 +338,19 @@ class PreHourlyProcessor(Processor):
aggregation_period = agg_params["aggregation_period"]
# get 2stage operation
agg_params = transform_spec_df.select(
"aggregation_params_map.pre_hourly_operation")\
.collect()[0].asDict()
agg_params = (transform_spec_df.select(
"aggregation_params_map.pre_hourly_operation")
.collect()[0].asDict())
pre_hourly_operation = agg_params["pre_hourly_operation"]
instance_usage_df = \
RollupQuantity.do_rollup(pre_hourly_group_by_list,
aggregation_period,
pre_hourly_operation,
instance_usage_df)
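        # a percentage rate is not a plain group-by rollup, so it is
        # dispatched to the new PreHourlyCalculateRate setter instead of
        # RollupQuantity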
if pre_hourly_operation != "rate":
instance_usage_df = RollupQuantity.do_rollup(
pre_hourly_group_by_list, aggregation_period,
pre_hourly_operation, instance_usage_df)
else:
instance_usage_df = PreHourlyCalculateRate.do_rate_calculation(
instance_usage_df)
# insert metrics
instance_usage_df = KafkaInsert.insert(transform_context,
instance_usage_df)
@@ -367,11 +365,12 @@ class PreHourlyProcessor(Processor):
#
metric_ids_df = instance_usage_df.select(
"processing_meta.metric_id").distinct()
metric_ids_to_process = [row.metric_id
for row in metric_ids_df.collect()]
data_driven_specs_repo = DataDrivenSpecsRepoFactory.\
get_data_driven_specs_repo()
data_driven_specs_repo = (
DataDrivenSpecsRepoFactory.get_data_driven_specs_repo())
sqlc = SQLContext.getOrCreate(instance_usage_df.rdd.context)
transform_specs_df = data_driven_specs_repo.get_data_driven_specs(
sql_context=sqlc,
@@ -385,9 +384,8 @@ class PreHourlyProcessor(Processor):
instance_usage_df.processing_meta.metric_id == metric_id)
# set transform_spec_df in TransformContext
transform_context = \
TransformContextUtils.get_context(
transform_spec_df_info=transform_spec_df)
transform_context = TransformContextUtils.get_context(
transform_spec_df_info=transform_spec_df)
PreHourlyProcessor.process_instance_usage(
transform_context, source_instance_usage_df)
@@ -399,9 +397,9 @@ class PreHourlyProcessor(Processor):
offsets available
"""
offset_range_list = \
offset_range_list = (
PreHourlyProcessor.get_processing_offset_range_list(
processing_time)
processing_time))
# get pre hourly data
pre_hourly_rdd = PreHourlyProcessor.fetch_pre_hourly_data(
@@ -415,17 +413,17 @@ class PreHourlyProcessor(Processor):
# cache instance usage df
#
if cfg.CONF.pre_hourly_processor.enable_instance_usage_df_cache:
storage_level_prop = \
cfg.CONF.pre_hourly_processor\
.instance_usage_df_cache_storage_level
storage_level_prop = (
cfg.CONF.pre_hourly_processor
.instance_usage_df_cache_storage_level)
try:
storage_level = StorageUtils.get_storage_level(
storage_level_prop)
except InvalidCacheStorageLevelException as storage_error:
storage_error.value += \
" (as specified in " \
"pre_hourly_processor.instance_usage_df" \
"_cache_storage_level)"
storage_error.value += (" (as specified in "
"pre_hourly_processor"
".instance_usage_df_cache"
"_storage_level)")
raise
instance_usage_df.persist(storage_level)

View File

@@ -14,6 +14,7 @@
import mock
import unittest
from oslo_config import cfg
from pyspark.streaming.kafka import OffsetRange
from monasca_transform.config.config_initializer import ConfigInitializer
@@ -73,7 +74,7 @@ class TestPreHourlyProcessorAgg(SparkContextTest):
metrics = DummyAdapter.adapter_impl.metric_list
# Verify count of instance usage data
self.assertEqual(result, 6)
self.assertEqual(result, 9)
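        # was 6; the three swiftlm.diskusage.rate_agg records added to the
        # pre-hourly fixture account for the higher count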
# check aggregation result
mem_total_mb_agg_metric = [
@@ -128,6 +129,44 @@ class TestPreHourlyProcessorAgg(SparkContextTest):
.get("metric")
.get('value_meta').get('record_count'))
# check aggregation result for swiftlm.diskusage.rate_agg
swift_disk_rate_agg_metric = [
value for value in metrics
if value.get('metric').get('name') ==
'swiftlm.diskusage.rate_agg'][0]
        self.assertIsNotNone(swift_disk_rate_agg_metric)
self.assertEqual(59.36612021857923,
swift_disk_rate_agg_metric
.get('metric').get('value'))
self.assertEqual('2016-06-10 20:27:02',
swift_disk_rate_agg_metric
.get('metric')
.get('value_meta')
.get('lastrecord_timestamp_string'))
self.assertEqual('2016-06-10 20:27:01',
swift_disk_rate_agg_metric
.get('metric')
.get('value_meta')
.get('firstrecord_timestamp_string'))
self.assertEqual(68.0,
swift_disk_rate_agg_metric
.get('metric')
.get('value_meta').get('record_count'))
self.assertEqual('useast',
swift_disk_rate_agg_metric.get('meta').get('region'))
self.assertEqual(cfg.CONF.messaging.publish_kafka_tenant_id,
swift_disk_rate_agg_metric.get('meta')
.get('tenantId'))
self.assertEqual('all',
swift_disk_rate_agg_metric.get('metric')
.get('dimensions').get('host'))
self.assertEqual('all',
swift_disk_rate_agg_metric.get('metric')
.get('dimensions').get('project_id'))
self.assertEqual('hourly',
swift_disk_rate_agg_metric.get('metric')
.get('dimensions').get('aggregation_period'))
def simple_count_transform(self, rdd):
return rdd.count()

View File

@@ -4,3 +4,6 @@
('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"mem.usable_mb_agg","record_count":20.0,"user_id":"all","zone":"all","firstrecord_timestamp_string":"2016-06-20 11:24:59","tenant_id":"all","region":"all","usage_hour":"11","usage_date":"2016-06-20","processing_meta":{"metric_id":"mem_usable_all"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp_string":"2016-06-20 11:29:44","firstrecord_timestamp_unix":1466421899.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1466422184.0,"quantity":10283.1})
('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"mem.usable_mb_agg","record_count":20.0,"user_id":"all","zone":"all","firstrecord_timestamp_string":"2016-06-20 11:29:44","tenant_id":"all","region":"all","usage_hour":"11","usage_date":"2016-06-20","processing_meta":{"metric_id":"mem_usable_all"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp_string":"2016-06-20 11:39:44","firstrecord_timestamp_unix":1466421899.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1466422784.0,"quantity":10283.1})
('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"mem.usable_mb_agg","record_count":20.0,"user_id":"all","zone":"all","firstrecord_timestamp_string":"2016-06-20 11:39:44","tenant_id":"all","region":"all","usage_hour":"11","usage_date":"2016-06-20","processing_meta":{"metric_id":"mem_usable_all"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp_string":"2016-06-20 11:49:44","firstrecord_timestamp_unix":1466421899.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1466423384.0,"quantity":10283.1})
('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"swiftlm.diskusage.rate_agg","record_count":34.0,"user_id":"all","zone":"all","firstrecord_timestamp_string":"2016-06-10 20:27:01","tenant_id":"all","region":"all","usage_hour":"20","usage_date":"2016-06-10","processing_meta":{"oldest_quantity": 4575.0, "latest_timestamp_string": "2016-06-10 20:27:02", "latest_quantity": 5291.0, "metric_id":"swift_usage_rate", "oldest_timestamp_string": "2016-06-10 20:27:01"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp_string":"2016-06-10 20:27:02","firstrecord_timestamp_unix":1465590421.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1465590422.0,"quantity":15.6502})
('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"swiftlm.diskusage.rate_agg","record_count":34.0,"user_id":"all","zone":"all","firstrecord_timestamp_string":"2016-06-10 20:37:01","tenant_id":"all","region":"all","usage_hour":"20","usage_date":"2016-06-10","processing_meta":{"oldest_quantity": 5575.0, "latest_timestamp_string": "2016-06-10 20:37:02", "latest_quantity": 6291.0, "metric_id":"swift_usage_rate", "oldest_timestamp_string": "2016-06-10 20:37:01"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp_string":"2016-06-10 20:27:02","firstrecord_timestamp_unix":1465590421.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1465590422.0,"quantity":16.6502})
('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"swiftlm.diskusage.rate_agg","record_count":34.0,"user_id":"all","zone":"all","firstrecord_timestamp_string":"2016-06-10 20:47:01","tenant_id":"all","region":"all","usage_hour":"20","usage_date":"2016-06-10","processing_meta":{"oldest_quantity": 6575.0, "latest_timestamp_string": "2016-06-10 20:47:02", "latest_quantity": 7291.0, "metric_id":"swift_usage_rate", "oldest_timestamp_string": "2016-06-10 20:47:01"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp_string":"2016-06-10 20:27:02","firstrecord_timestamp_unix":1465590421.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1465590422.0,"quantity":17.6502})