Modifications to make rate calculations work with two-stage
aggregation. Change-Id: I8c7b6112a04ba378ba1911a342cb97e8c388ebc6
This commit is contained in:
parent
a3f7d0e845
commit
615e52d5cd
|
@ -0,0 +1,111 @@
|
|||
# (c) Copyright 2016 Hewlett Packard Enterprise Development LP
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from pyspark.sql import functions
|
||||
from pyspark.sql import SQLContext
|
||||
|
||||
from monasca_transform.component import Component
|
||||
from monasca_transform.component.setter import SetterComponent
|
||||
from monasca_transform.transform.transform_utils import InstanceUsageUtils
|
||||
|
||||
import json
|
||||
|
||||
|
||||
class PreHourlyCalculateRate(SetterComponent):
|
||||
|
||||
@staticmethod
|
||||
def _calculate_rate(instance_usage_df):
|
||||
instance_usage_data_json_list = []
|
||||
|
||||
sorted_oldest_ascending_df = instance_usage_df.sort(
|
||||
functions.asc("processing_meta.oldest_timestamp_string"))
|
||||
|
||||
sorted_latest_descending_df = instance_usage_df.sort(
|
||||
functions.desc("processing_meta.latest_timestamp_string"))
|
||||
|
||||
# Calculate the rate change by percentage
|
||||
oldest_dict = sorted_oldest_ascending_df.collect()[0].asDict()
|
||||
oldest_quantity = float(oldest_dict[
|
||||
"processing_meta"]['oldest_quantity'])
|
||||
|
||||
latest_dict = sorted_latest_descending_df.collect()[0].asDict()
|
||||
latest_quantity = float(latest_dict[
|
||||
"processing_meta"]['latest_quantity'])
|
||||
|
||||
rate_percentage = 100 * (
|
||||
(latest_quantity - oldest_quantity) / oldest_quantity)
|
||||
|
||||
# create a new instance usage dict
|
||||
instance_usage_dict = {"tenant_id":
|
||||
latest_dict.get("tenant_id", "all"),
|
||||
"user_id":
|
||||
latest_dict.get("user_id", "all"),
|
||||
"resource_uuid":
|
||||
latest_dict.get("resource_uuid", "all"),
|
||||
"geolocation":
|
||||
latest_dict.get("geolocation", "all"),
|
||||
"region":
|
||||
latest_dict.get("region", "all"),
|
||||
"zone":
|
||||
latest_dict.get("zone", "all"),
|
||||
"host":
|
||||
latest_dict.get("host", "all"),
|
||||
"project_id":
|
||||
latest_dict.get("project_id", "all"),
|
||||
"aggregated_metric_name":
|
||||
latest_dict["aggregated_metric_name"],
|
||||
"quantity": rate_percentage,
|
||||
"firstrecord_timestamp_unix":
|
||||
oldest_dict["firstrecord_timestamp_unix"],
|
||||
"firstrecord_timestamp_string":
|
||||
oldest_dict["firstrecord_timestamp_string"],
|
||||
"lastrecord_timestamp_unix":
|
||||
latest_dict["lastrecord_timestamp_unix"],
|
||||
"lastrecord_timestamp_string":
|
||||
latest_dict["lastrecord_timestamp_string"],
|
||||
"record_count": oldest_dict["record_count"] +
|
||||
latest_dict["record_count"],
|
||||
"service_group":
|
||||
latest_dict.get("service_group",
|
||||
Component.
|
||||
DEFAULT_UNAVAILABLE_VALUE),
|
||||
"service_id":
|
||||
latest_dict.get("service_id",
|
||||
Component.
|
||||
DEFAULT_UNAVAILABLE_VALUE),
|
||||
"usage_date": latest_dict["usage_date"],
|
||||
"usage_hour": latest_dict["usage_hour"],
|
||||
"usage_minute": latest_dict["usage_minute"],
|
||||
"aggregation_period":
|
||||
latest_dict["aggregation_period"]
|
||||
}
|
||||
|
||||
instance_usage_data_json = json.dumps(instance_usage_dict)
|
||||
instance_usage_data_json_list.append(instance_usage_data_json)
|
||||
|
||||
# convert to rdd
|
||||
spark_context = instance_usage_df.rdd.context
|
||||
return spark_context.parallelize(instance_usage_data_json_list)
|
||||
|
||||
@staticmethod
|
||||
def do_rate_calculation(instance_usage_df):
|
||||
instance_usage_json_rdd = PreHourlyCalculateRate._calculate_rate(
|
||||
instance_usage_df)
|
||||
|
||||
sql_context = SQLContext.getOrCreate(instance_usage_df.rdd.context)
|
||||
instance_usage_trans_df = InstanceUsageUtils.create_df_from_json_rdd(
|
||||
sql_context,
|
||||
instance_usage_json_rdd)
|
||||
|
||||
return instance_usage_trans_df
|
|
@ -141,7 +141,16 @@ class CalculateRate(UsageComponent):
|
|||
{"event_type":
|
||||
latest_dict.get("event_type",
|
||||
Component.
|
||||
DEFAULT_UNAVAILABLE_VALUE)}
|
||||
DEFAULT_UNAVAILABLE_VALUE),
|
||||
"oldest_timestamp_string":
|
||||
oldest_dict[
|
||||
"firstrecord_timestamp_string"],
|
||||
"oldest_quantity": oldest_quantity,
|
||||
"latest_timestamp_string":
|
||||
latest_dict[
|
||||
"lastrecord_timestamp_string"],
|
||||
"latest_quantity": latest_quantity
|
||||
}
|
||||
}
|
||||
|
||||
instance_usage_data_json = json.dumps(instance_usage_dict)
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"vm.cpu.utilization_perc_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id", "resource_uuid"],"usage_fetch_operation": "avg","filter_by_list": [],"setter_rollup_group_by_list":["tenant_id"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"vm_cpu_util_perc_project","metric_id":"vm_cpu_util_perc_project"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"nova.vm.cpu.total_allocated_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id"],"usage_fetch_operation": "avg","filter_by_list": [],"setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"nova_vm_cpu_total_all","metric_id":"nova_vm_cpu_total_all"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"swiftlm.diskusage.val.size_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "mount"],"usage_fetch_operation": "avg","filter_by_list": [],"setter_rollup_group_by_list":[],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"swift_usage_all","metric_id":"swift_usage_all"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"swiftlm.diskusage.val.size_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "mount"],"usage_fetch_operation": "avg","filter_by_list": [],"setter_rollup_group_by_list":["host"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"swift_usage_all","metric_id":"swift_usage_host"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"calculate_rate","setters":["set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"swiftlm.diskusage.rate_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "mount"],"filter_by_list": [],"setter_rollup_group_by_list": [],"dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"swift_usage_rate","metric_id":"swift_usage_rate"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"swiftlm.diskusage.val.size_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "mount"],"usage_fetch_operation": "avg","filter_by_list": [],"setter_rollup_group_by_list":["host"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"swift_usage_host","metric_id":"swift_usage_host"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"calculate_rate","setters":["set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"swiftlm.diskusage.rate_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "mount"],"filter_by_list": [],"setter_rollup_group_by_list": [],"dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"rate","pre_hourly_group_by_list":["default"]},"metric_group":"swift_usage_rate","metric_id":"swift_usage_rate"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"swiftlm.diskusage.val.avail_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "mount"],"usage_fetch_operation": "avg","filter_by_list": [],"setter_rollup_group_by_list":[],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"swift_avail_all","metric_id":"swift_avail_all"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"swiftlm.diskusage.val.avail_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "mount"],"usage_fetch_operation": "avg","filter_by_list": [],"setter_rollup_group_by_list":["host"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"swift_avail_all","metric_id":"swift_avail_host"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"swiftlm.diskusage.val.avail_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "mount"],"usage_fetch_operation": "avg","filter_by_list": [],"setter_rollup_group_by_list":["host"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"swift_avail_host","metric_id":"swift_avail_host"}
|
||||
|
|
|
@ -23,8 +23,9 @@ import logging
|
|||
from monasca_common.simport import simport
|
||||
from oslo_config import cfg
|
||||
|
||||
|
||||
from monasca_transform.component.insert.kafka_insert import KafkaInsert
|
||||
from monasca_transform.component.setter.pre_hourly_calculate_rate import \
|
||||
PreHourlyCalculateRate
|
||||
from monasca_transform.component.setter.rollup_quantity import RollupQuantity
|
||||
from monasca_transform.data_driven_specs.data_driven_specs_repo \
|
||||
import DataDrivenSpecsRepo
|
||||
|
@ -166,14 +167,12 @@ class PreHourlyProcessor(Processor):
|
|||
# A+Guide+To+The+Kafka+Protocol#
|
||||
# AGuideToTheKafkaProtocol-OffsetRequest
|
||||
GET_LATEST_OFFSETS = -1
|
||||
latest_dict = PreHourlyProcessor.\
|
||||
_get_offsets_from_kafka(brokers, topic,
|
||||
GET_LATEST_OFFSETS)
|
||||
latest_dict = PreHourlyProcessor._get_offsets_from_kafka(
|
||||
brokers, topic, GET_LATEST_OFFSETS)
|
||||
|
||||
GET_EARLIEST_OFFSETS = -2
|
||||
earliest_dict = PreHourlyProcessor.\
|
||||
_get_offsets_from_kafka(brokers, topic,
|
||||
GET_EARLIEST_OFFSETS)
|
||||
earliest_dict = PreHourlyProcessor._get_offsets_from_kafka(
|
||||
brokers, topic, GET_EARLIEST_OFFSETS)
|
||||
|
||||
for item in latest_dict:
|
||||
until_offset = latest_dict[item].offsets[0]
|
||||
|
@ -200,17 +199,15 @@ class PreHourlyProcessor(Processor):
|
|||
# A+Guide+To+The+Kafka+Protocol#
|
||||
# AGuideToTheKafkaProtocol-OffsetRequest
|
||||
GET_LATEST_OFFSETS = -1
|
||||
latest_dict = PreHourlyProcessor.\
|
||||
_get_offsets_from_kafka(brokers, topic,
|
||||
GET_LATEST_OFFSETS)
|
||||
latest_dict = PreHourlyProcessor._get_offsets_from_kafka(
|
||||
brokers, topic, GET_LATEST_OFFSETS)
|
||||
|
||||
GET_EARLIEST_OFFSETS = -2
|
||||
earliest_dict = PreHourlyProcessor.\
|
||||
_get_offsets_from_kafka(brokers, topic,
|
||||
GET_EARLIEST_OFFSETS)
|
||||
earliest_dict = PreHourlyProcessor._get_offsets_from_kafka(
|
||||
brokers, topic, GET_EARLIEST_OFFSETS)
|
||||
|
||||
saved_dict = PreHourlyProcessor.\
|
||||
_parse_saved_offsets(app_name, topic, saved_offset_spec)
|
||||
saved_dict = PreHourlyProcessor._parse_saved_offsets(
|
||||
app_name, topic, saved_offset_spec)
|
||||
|
||||
for item in latest_dict:
|
||||
# saved spec
|
||||
|
@ -305,10 +302,8 @@ class PreHourlyProcessor(Processor):
|
|||
# convert usage data rdd to instance usage df
|
||||
#
|
||||
sqlc = SQLContext.getOrCreate(pre_hourly_rdd.context)
|
||||
instance_usage_df = \
|
||||
InstanceUsageUtils.create_df_from_json_rdd(
|
||||
sqlc,
|
||||
instance_usage_rdd)
|
||||
instance_usage_df = InstanceUsageUtils.create_df_from_json_rdd(
|
||||
sqlc, instance_usage_rdd)
|
||||
|
||||
return instance_usage_df
|
||||
|
||||
|
@ -323,9 +318,9 @@ class PreHourlyProcessor(Processor):
|
|||
#
|
||||
# do a rollup operation
|
||||
#
|
||||
agg_params = transform_spec_df.select(
|
||||
"aggregation_params_map.pre_hourly_group_by_list")\
|
||||
.collect()[0].asDict()
|
||||
agg_params = (transform_spec_df.select(
|
||||
"aggregation_params_map.pre_hourly_group_by_list")
|
||||
.collect()[0].asDict())
|
||||
pre_hourly_group_by_list = agg_params["pre_hourly_group_by_list"]
|
||||
|
||||
if (len(pre_hourly_group_by_list) == 1 and
|
||||
|
@ -343,16 +338,19 @@ class PreHourlyProcessor(Processor):
|
|||
aggregation_period = agg_params["aggregation_period"]
|
||||
|
||||
# get 2stage operation
|
||||
agg_params = transform_spec_df.select(
|
||||
"aggregation_params_map.pre_hourly_operation")\
|
||||
.collect()[0].asDict()
|
||||
agg_params = (transform_spec_df.select(
|
||||
"aggregation_params_map.pre_hourly_operation")
|
||||
.collect()[0].asDict())
|
||||
pre_hourly_operation = agg_params["pre_hourly_operation"]
|
||||
|
||||
instance_usage_df = \
|
||||
RollupQuantity.do_rollup(pre_hourly_group_by_list,
|
||||
aggregation_period,
|
||||
pre_hourly_operation,
|
||||
instance_usage_df)
|
||||
if pre_hourly_operation != "rate":
|
||||
instance_usage_df = RollupQuantity.do_rollup(
|
||||
pre_hourly_group_by_list, aggregation_period,
|
||||
pre_hourly_operation, instance_usage_df)
|
||||
else:
|
||||
instance_usage_df = PreHourlyCalculateRate.do_rate_calculation(
|
||||
instance_usage_df)
|
||||
|
||||
# insert metrics
|
||||
instance_usage_df = KafkaInsert.insert(transform_context,
|
||||
instance_usage_df)
|
||||
|
@ -367,11 +365,12 @@ class PreHourlyProcessor(Processor):
|
|||
#
|
||||
metric_ids_df = instance_usage_df.select(
|
||||
"processing_meta.metric_id").distinct()
|
||||
|
||||
metric_ids_to_process = [row.metric_id
|
||||
for row in metric_ids_df.collect()]
|
||||
|
||||
data_driven_specs_repo = DataDrivenSpecsRepoFactory.\
|
||||
get_data_driven_specs_repo()
|
||||
data_driven_specs_repo = (
|
||||
DataDrivenSpecsRepoFactory.get_data_driven_specs_repo())
|
||||
sqlc = SQLContext.getOrCreate(instance_usage_df.rdd.context)
|
||||
transform_specs_df = data_driven_specs_repo.get_data_driven_specs(
|
||||
sql_context=sqlc,
|
||||
|
@ -385,9 +384,8 @@ class PreHourlyProcessor(Processor):
|
|||
instance_usage_df.processing_meta.metric_id == metric_id)
|
||||
|
||||
# set transform_spec_df in TransformContext
|
||||
transform_context = \
|
||||
TransformContextUtils.get_context(
|
||||
transform_spec_df_info=transform_spec_df)
|
||||
transform_context = TransformContextUtils.get_context(
|
||||
transform_spec_df_info=transform_spec_df)
|
||||
|
||||
PreHourlyProcessor.process_instance_usage(
|
||||
transform_context, source_instance_usage_df)
|
||||
|
@ -399,9 +397,9 @@ class PreHourlyProcessor(Processor):
|
|||
offsets available
|
||||
"""
|
||||
|
||||
offset_range_list = \
|
||||
offset_range_list = (
|
||||
PreHourlyProcessor.get_processing_offset_range_list(
|
||||
processing_time)
|
||||
processing_time))
|
||||
|
||||
# get pre hourly data
|
||||
pre_hourly_rdd = PreHourlyProcessor.fetch_pre_hourly_data(
|
||||
|
@ -415,17 +413,17 @@ class PreHourlyProcessor(Processor):
|
|||
# cache instance usage df
|
||||
#
|
||||
if cfg.CONF.pre_hourly_processor.enable_instance_usage_df_cache:
|
||||
storage_level_prop = \
|
||||
cfg.CONF.pre_hourly_processor\
|
||||
.instance_usage_df_cache_storage_level
|
||||
storage_level_prop = (
|
||||
cfg.CONF.pre_hourly_processor
|
||||
.instance_usage_df_cache_storage_level)
|
||||
try:
|
||||
storage_level = StorageUtils.get_storage_level(
|
||||
storage_level_prop)
|
||||
except InvalidCacheStorageLevelException as storage_error:
|
||||
storage_error.value += \
|
||||
" (as specified in " \
|
||||
"pre_hourly_processor.instance_usage_df" \
|
||||
"_cache_storage_level)"
|
||||
storage_error.value += (" (as specified in "
|
||||
"pre_hourly_processor"
|
||||
".instance_usage_df_cache"
|
||||
"_storage_level)")
|
||||
raise
|
||||
instance_usage_df.persist(storage_level)
|
||||
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
import mock
|
||||
import unittest
|
||||
|
||||
from oslo_config import cfg
|
||||
from pyspark.streaming.kafka import OffsetRange
|
||||
|
||||
from monasca_transform.config.config_initializer import ConfigInitializer
|
||||
|
@ -73,7 +74,7 @@ class TestPreHourlyProcessorAgg(SparkContextTest):
|
|||
metrics = DummyAdapter.adapter_impl.metric_list
|
||||
|
||||
# Verify count of instance usage data
|
||||
self.assertEqual(result, 6)
|
||||
self.assertEqual(result, 9)
|
||||
|
||||
# check aggregation result
|
||||
mem_total_mb_agg_metric = [
|
||||
|
@ -128,6 +129,44 @@ class TestPreHourlyProcessorAgg(SparkContextTest):
|
|||
.get("metric")
|
||||
.get('value_meta').get('record_count'))
|
||||
|
||||
# check aggregation result for swiftlm.diskusage.rate_agg
|
||||
swift_disk_rate_agg_metric = [
|
||||
value for value in metrics
|
||||
if value.get('metric').get('name') ==
|
||||
'swiftlm.diskusage.rate_agg'][0]
|
||||
self.assertTrue(swift_disk_rate_agg_metric is not None)
|
||||
self.assertEqual(59.36612021857923,
|
||||
swift_disk_rate_agg_metric
|
||||
.get('metric').get('value'))
|
||||
self.assertEqual('2016-06-10 20:27:02',
|
||||
swift_disk_rate_agg_metric
|
||||
.get('metric')
|
||||
.get('value_meta')
|
||||
.get('lastrecord_timestamp_string'))
|
||||
self.assertEqual('2016-06-10 20:27:01',
|
||||
swift_disk_rate_agg_metric
|
||||
.get('metric')
|
||||
.get('value_meta')
|
||||
.get('firstrecord_timestamp_string'))
|
||||
self.assertEqual(68.0,
|
||||
swift_disk_rate_agg_metric
|
||||
.get('metric')
|
||||
.get('value_meta').get('record_count'))
|
||||
self.assertEqual('useast',
|
||||
swift_disk_rate_agg_metric.get('meta').get('region'))
|
||||
self.assertEqual(cfg.CONF.messaging.publish_kafka_tenant_id,
|
||||
swift_disk_rate_agg_metric.get('meta')
|
||||
.get('tenantId'))
|
||||
self.assertEqual('all',
|
||||
swift_disk_rate_agg_metric.get('metric')
|
||||
.get('dimensions').get('host'))
|
||||
self.assertEqual('all',
|
||||
swift_disk_rate_agg_metric.get('metric')
|
||||
.get('dimensions').get('project_id'))
|
||||
self.assertEqual('hourly',
|
||||
swift_disk_rate_agg_metric.get('metric')
|
||||
.get('dimensions').get('aggregation_period'))
|
||||
|
||||
def simple_count_transform(self, rdd):
|
||||
return rdd.count()
|
||||
|
||||
|
|
|
@ -4,3 +4,6 @@
|
|||
('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"mem.usable_mb_agg","record_count":20.0,"user_id":"all","zone":"all","firstrecord_timestamp_string":"2016-06-20 11:24:59","tenant_id":"all","region":"all","usage_hour":"11","usage_date":"2016-06-20","processing_meta":{"metric_id":"mem_usable_all"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp_string":"2016-06-20 11:29:44","firstrecord_timestamp_unix":1466421899.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1466422184.0,"quantity":10283.1})
|
||||
('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"mem.usable_mb_agg","record_count":20.0,"user_id":"all","zone":"all","firstrecord_timestamp_string":"2016-06-20 11:29:44","tenant_id":"all","region":"all","usage_hour":"11","usage_date":"2016-06-20","processing_meta":{"metric_id":"mem_usable_all"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp_string":"2016-06-20 11:39:44","firstrecord_timestamp_unix":1466421899.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1466422784.0,"quantity":10283.1})
|
||||
('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"mem.usable_mb_agg","record_count":20.0,"user_id":"all","zone":"all","firstrecord_timestamp_string":"2016-06-20 11:39:44","tenant_id":"all","region":"all","usage_hour":"11","usage_date":"2016-06-20","processing_meta":{"metric_id":"mem_usable_all"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp_string":"2016-06-20 11:49:44","firstrecord_timestamp_unix":1466421899.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1466423384.0,"quantity":10283.1})
|
||||
('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"swiftlm.diskusage.rate_agg","record_count":34.0,"user_id":"all","zone":"all","firstrecord_timestamp_string":"2016-06-10 20:27:01","tenant_id":"all","region":"all","usage_hour":"20","usage_date":"2016-06-10","processing_meta":{"oldest_quantity": 4575.0, "latest_timestamp_string": "2016-06-10 20:27:02", "latest_quantity": 5291.0, "metric_id":"swift_usage_rate", "oldest_timestamp_string": "2016-06-10 20:27:01"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp_string":"2016-06-10 20:27:02","firstrecord_timestamp_unix":1465590421.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1465590422.0,"quantity":15.6502})
|
||||
('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"swiftlm.diskusage.rate_agg","record_count":34.0,"user_id":"all","zone":"all","firstrecord_timestamp_string":"2016-06-10 20:37:01","tenant_id":"all","region":"all","usage_hour":"20","usage_date":"2016-06-10","processing_meta":{"oldest_quantity": 5575.0, "latest_timestamp_string": "2016-06-10 20:37:02", "latest_quantity": 6291.0, "metric_id":"swift_usage_rate", "oldest_timestamp_string": "2016-06-10 20:37:01"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp_string":"2016-06-10 20:27:02","firstrecord_timestamp_unix":1465590421.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1465590422.0,"quantity":16.6502})
|
||||
('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"swiftlm.diskusage.rate_agg","record_count":34.0,"user_id":"all","zone":"all","firstrecord_timestamp_string":"2016-06-10 20:47:01","tenant_id":"all","region":"all","usage_hour":"20","usage_date":"2016-06-10","processing_meta":{"oldest_quantity": 6575.0, "latest_timestamp_string": "2016-06-10 20:47:02", "latest_quantity": 7291.0, "metric_id":"swift_usage_rate", "oldest_timestamp_string": "2016-06-10 20:47:01"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp_string":"2016-06-10 20:27:02","firstrecord_timestamp_unix":1465590421.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1465590422.0,"quantity":17.6502})
|
Loading…
Reference in New Issue