Two stage transformation
Breaking down the aggregation into two stages. The first stage aggregates raw metrics frequently and is implemented as a Spark Streaming job which aggregates metrics at a configurable time interval (defaults to 10 minutes) and writes the intermediate aggregated data, or instance usage data to new "metrics_pre_hourly" kafka topic. The second stage is implemented as a batch job using Spark Streaming createRDD direct stream batch API, which is triggered by the first stage only when first stage runs at the top of the hour. Also enhanced kafka offsets table to keep track of offsets from two stages along with streaming batch time, last time version row got updated and revision number. By default it should keep last 10 revisions to the offsets for each application. Change-Id: Ib2bf7df6b32ca27c89442a23283a89fea802d146
This commit is contained in:
parent
d8e73f3bde
commit
00b874a6b3
|
@ -12,10 +12,9 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from monasca_transform.driver.mon_metrics_kafka import invoke
|
||||
|
||||
|
||||
activate_this_file = "/opt/monasca/transform/venv/bin/activate_this.py"
|
||||
execfile(activate_this_file, dict(__file__=activate_this_file))
|
||||
|
||||
from monasca_transform.driver.mon_metrics_kafka import invoke
|
||||
|
||||
invoke()
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
[repositories]
|
||||
offsets = monasca_transform.mysql_offset_specs:MySQLOffsetSpecs
|
||||
data_driven_specs = monasca_transform.data_driven_specs.mysql_data_driven_specs_repo:MySQLDataDrivenSpecsRepo
|
||||
offsets_max_revisions = 10
|
||||
|
||||
[database]
|
||||
server_type = mysql
|
||||
|
@ -16,6 +17,15 @@ adapter = monasca_transform.messaging.adapter:KafkaMessageAdapter
|
|||
topic = metrics
|
||||
brokers=192.168.15.6:9092
|
||||
publish_kafka_tenant_id = d2cb21079930415a9f2a33588b9f2bb6
|
||||
adapter_pre_hourly = monasca_transform.messaging.adapter:KafkaMessageAdapterPreHourly
|
||||
topic_pre_hourly = metrics_pre_hourly
|
||||
|
||||
[stage_processors]
|
||||
pre_hourly_processor_enabled = True
|
||||
|
||||
[pre_hourly_processor]
|
||||
enable_instance_usage_df_cache = True
|
||||
instance_usage_df_cache_storage_level = MEMORY_ONLY_SER_2
|
||||
|
||||
#
|
||||
# Configurable values for the monasca-transform service
|
||||
|
@ -54,7 +64,13 @@ spark_home = /opt/spark/current
|
|||
spark_python_files = /opt/monasca/transform/lib/monasca-transform.zip
|
||||
|
||||
# How often the stream should be read (in seconds)
|
||||
stream_interval = 300
|
||||
stream_interval = 600
|
||||
|
||||
# The working directory for monasca-transform
|
||||
work_dir = /var/run/monasca/transform
|
||||
|
||||
# enable caching of record store df
|
||||
enable_record_store_df_cache = True
|
||||
|
||||
# set spark storage level for record store df cache
|
||||
record_store_df_cache_storage_level = MEMORY_ONLY_SER_2
|
|
@ -4,14 +4,19 @@ USE `monasca_transform`;
|
|||
SET foreign_key_checks = 0;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS `kafka_offsets` (
|
||||
`id` INTEGER AUTO_INCREMENT NOT NULL,
|
||||
`topic` varchar(128) NOT NULL,
|
||||
`until_offset` BIGINT NULL,
|
||||
`from_offset` BIGINT NULL,
|
||||
`app_name` varchar(128) NOT NULL,
|
||||
`partition` integer NOT NULL,
|
||||
PRIMARY KEY (`app_name`, `topic`, `partition`)
|
||||
`batch_time` varchar(20) NOT NULL,
|
||||
`last_updated` varchar(20) NOT NULL,
|
||||
`revision` integer NOT NULL,
|
||||
PRIMARY KEY (`id`, `app_name`, `topic`, `partition`, `revision`)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
|
||||
|
||||
|
||||
CREATE TABLE IF NOT EXISTS `transform_specs` (
|
||||
`metric_id` varchar(128) NOT NULL,
|
||||
`transform_spec` varchar(2048) NOT NULL,
|
||||
|
|
|
@ -14,12 +14,11 @@
|
|||
|
||||
import sys
|
||||
|
||||
from monasca_transform.service.transform_service import main_service
|
||||
|
||||
|
||||
activate_this_file = "/opt/monasca/transform/venv/bin/activate_this.py"
|
||||
execfile(activate_this_file, dict(__file__=activate_this_file))
|
||||
|
||||
from monasca_transform.service.transform_service import main_service
|
||||
|
||||
|
||||
def main():
|
||||
main_service()
|
||||
|
|
|
@ -278,6 +278,8 @@ function install_monasca_transform {
|
|||
|
||||
create_and_populate_monasca_transform_database
|
||||
|
||||
# create metrics pre hourly topic in kafka
|
||||
/opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 64 --topic metrics_pre_hourly
|
||||
}
|
||||
|
||||
|
||||
|
@ -344,7 +346,7 @@ function install_spark {
|
|||
|
||||
if [ ! -f /opt/spark/download/${SPARK_TARBALL_NAME} ]
|
||||
then
|
||||
sudo curl ${APACHE_MIRROR}/spark/spark-${SPARK_VERSION}/${SPARK_TARBALL_NAME} -o /opt/spark/download/${SPARK_TARBALL_NAME}
|
||||
sudo curl -m 600 ${APACHE_MIRROR}/spark/spark-${SPARK_VERSION}/${SPARK_TARBALL_NAME} -o /opt/spark/download/${SPARK_TARBALL_NAME}
|
||||
fi
|
||||
|
||||
sudo chown spark:spark /opt/spark/download/${SPARK_TARBALL_NAME}
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
[repositories]
|
||||
offsets = monasca_transform.mysql_offset_specs:MySQLOffsetSpecs
|
||||
data_driven_specs = monasca_transform.data_driven_specs.mysql_data_driven_specs_repo:MySQLDataDrivenSpecsRepo
|
||||
offsets_max_revisions = 10
|
||||
|
||||
[database]
|
||||
server_type = mysql
|
||||
|
@ -16,6 +17,15 @@ adapter = monasca_transform.messaging.adapter:KafkaMessageAdapter
|
|||
topic = metrics
|
||||
brokers = localhost:9092
|
||||
publish_kafka_tenant_id = d2cb21079930415a9f2a33588b9f2bb6
|
||||
adapter_pre_hourly = monasca_transform.messaging.adapter:KafkaMessageAdapterPreHourly
|
||||
topic_pre_hourly = metrics_pre_hourly
|
||||
|
||||
[stage_processors]
|
||||
enable_pre_hourly_processor = True
|
||||
|
||||
[pre_hourly_processor]
|
||||
enable_instance_usage_df_cache = True
|
||||
instance_usage_df_cache_storage_level = MEMORY_ONLY_SER_2
|
||||
|
||||
#
|
||||
# Configurable values for the monasca-transform service
|
||||
|
@ -60,7 +70,10 @@ spark_home = /opt/spark/current
|
|||
spark_python_files = /opt/stack/monasca-transform/dist/monasca_transform-0.0.1.egg
|
||||
|
||||
# How often the stream should be read (in seconds)
|
||||
stream_interval = 300
|
||||
stream_interval = 600
|
||||
|
||||
# The working directory for monasca-transform
|
||||
work_dir = /opt/stack/monasca-transform
|
||||
|
||||
enable_record_store_df_cache = True
|
||||
record_store_df_cache_storage_level = MEMORY_ONLY_SER_2
|
||||
|
|
|
@ -16,7 +16,6 @@ import abc
|
|||
import time
|
||||
|
||||
from monasca_transform.component import Component
|
||||
from monasca_transform.messaging.adapter import MessageAdapter
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
|
@ -103,7 +102,7 @@ class InsertComponent(Component):
|
|||
return metric
|
||||
|
||||
@staticmethod
|
||||
def _write_metric(row, agg_params):
|
||||
def _get_metric(row, agg_params):
|
||||
"""write data to kafka. extracts and formats
|
||||
metric data and write s the data to kafka
|
||||
"""
|
||||
|
@ -132,8 +131,46 @@ class InsertComponent(Component):
|
|||
row.aggregation_period}
|
||||
metric = InsertComponent._prepare_metric(instance_usage_dict,
|
||||
agg_params)
|
||||
return metric
|
||||
|
||||
MessageAdapter.send_metric(metric)
|
||||
@staticmethod
|
||||
def _get_instance_usage_pre_hourly(row,
|
||||
metric_id):
|
||||
"""write data to kafka. extracts and formats
|
||||
metric data and writes the data to kafka
|
||||
"""
|
||||
# add transform spec metric id to processing meta
|
||||
processing_meta = {"metric_id": metric_id}
|
||||
|
||||
instance_usage_dict = {"tenant_id": row.tenant_id,
|
||||
"user_id": row.user_id,
|
||||
"resource_uuid": row.resource_uuid,
|
||||
"geolocation": row.geolocation,
|
||||
"region": row.region,
|
||||
"zone": row.zone,
|
||||
"host": row.host,
|
||||
"project_id": row.project_id,
|
||||
"aggregated_metric_name":
|
||||
row.aggregated_metric_name,
|
||||
"quantity": row.quantity,
|
||||
"firstrecord_timestamp":
|
||||
row.firstrecord_timestamp_string,
|
||||
"lastrecord_timestamp":
|
||||
row.lastrecord_timestamp_string,
|
||||
"firstrecord_timestamp_unix":
|
||||
row.firstrecord_timestamp_unix,
|
||||
"lastrecord_timestamp_unix":
|
||||
row.lastrecord_timestamp_unix,
|
||||
"record_count": row.record_count,
|
||||
"service_group": row.service_group,
|
||||
"service_id": row.service_id,
|
||||
"usage_date": row.usage_date,
|
||||
"usage_hour": row.usage_hour,
|
||||
"usage_minute": row.usage_minute,
|
||||
"aggregation_period":
|
||||
row.aggregation_period,
|
||||
"processing_meta": processing_meta}
|
||||
return instance_usage_dict
|
||||
|
||||
@staticmethod
|
||||
def _write_metrics_from_partition(partlistiter):
|
||||
|
|
|
@ -13,12 +13,12 @@
|
|||
# under the License.
|
||||
|
||||
from monasca_transform.component.insert import InsertComponent
|
||||
|
||||
from oslo_config import cfg
|
||||
from tests.unit.messaging.adapter import DummyAdapter
|
||||
|
||||
|
||||
class DummyInsert(InsertComponent):
|
||||
"""Insert component that writes instance usage data
|
||||
"""Insert component that writes metric data to
|
||||
to kafka queue
|
||||
"""
|
||||
|
||||
|
@ -63,7 +63,7 @@ class DummyInsert(InsertComponent):
|
|||
#
|
||||
|
||||
for instance_usage_row in instance_usage_df.collect():
|
||||
InsertComponent._write_metric(instance_usage_row,
|
||||
agg_params)
|
||||
|
||||
metric = InsertComponent._get_metric(instance_usage_row,
|
||||
agg_params)
|
||||
DummyAdapter.send_metric(metric)
|
||||
return instance_usage_df
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
|
||||
from monasca_transform.component.insert import InsertComponent
|
||||
from monasca_transform.config.config_initializer import ConfigInitializer
|
||||
from monasca_transform.messaging.adapter import KafkaMessageAdapter
|
||||
|
||||
|
||||
class KafkaInsert(InsertComponent):
|
||||
|
@ -59,6 +60,7 @@ class KafkaInsert(InsertComponent):
|
|||
#
|
||||
|
||||
for instance_usage_row in instance_usage_df.collect():
|
||||
InsertComponent._write_metric(instance_usage_row, agg_params)
|
||||
|
||||
metric = InsertComponent._get_metric(
|
||||
instance_usage_row, agg_params)
|
||||
KafkaMessageAdapter.send_metric(metric)
|
||||
return instance_usage_df
|
||||
|
|
|
@ -0,0 +1,46 @@
|
|||
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from monasca_transform.component.insert import InsertComponent
|
||||
from monasca_transform.config.config_initializer import ConfigInitializer
|
||||
from monasca_transform.messaging.adapter import KafkaMessageAdapterPreHourly
|
||||
|
||||
|
||||
class KafkaInsertPreHourly(InsertComponent):
|
||||
"""Insert component that writes instance usage data
|
||||
to kafka queue
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def insert(transform_context, instance_usage_df):
|
||||
"""write instance usage data to kafka"""
|
||||
|
||||
# object to init config
|
||||
ConfigInitializer.basic_config()
|
||||
|
||||
transform_spec_df = transform_context.transform_spec_df_info
|
||||
|
||||
agg_params = transform_spec_df.select(
|
||||
"metric_id").\
|
||||
collect()[0].asDict()
|
||||
metric_id = agg_params["metric_id"]
|
||||
|
||||
for instance_usage_row in instance_usage_df.collect():
|
||||
instance_usage_dict = \
|
||||
InsertComponent._get_instance_usage_pre_hourly(
|
||||
instance_usage_row,
|
||||
metric_id)
|
||||
KafkaMessageAdapterPreHourly.send_metric(instance_usage_dict)
|
||||
|
||||
return instance_usage_df
|
|
@ -24,7 +24,7 @@ class PrepareData(InsertComponent):
|
|||
"""write instance usage data to kafka"""
|
||||
|
||||
#
|
||||
# FIXME: add instance usage data validation
|
||||
# TODO(someone) add instance usage data validation
|
||||
#
|
||||
|
||||
return instance_usage_df
|
||||
|
|
|
@ -52,7 +52,6 @@ class RollupQuantity(SetterComponent):
|
|||
|
||||
@staticmethod
|
||||
def _rollup_quantity(instance_usage_df,
|
||||
transform_spec_df,
|
||||
setter_rollup_group_by_list,
|
||||
setter_rollup_operation):
|
||||
|
||||
|
@ -202,7 +201,35 @@ class RollupQuantity(SetterComponent):
|
|||
|
||||
# perform rollup operation
|
||||
instance_usage_json_rdd = RollupQuantity._rollup_quantity(
|
||||
instance_usage_df, transform_spec_df,
|
||||
instance_usage_df,
|
||||
group_by_columns_list,
|
||||
str(setter_rollup_operation))
|
||||
|
||||
sql_context = SQLContext.getOrCreate(instance_usage_df.rdd.context)
|
||||
instance_usage_trans_df = InstanceUsageUtils.create_df_from_json_rdd(
|
||||
sql_context,
|
||||
instance_usage_json_rdd)
|
||||
|
||||
return instance_usage_trans_df
|
||||
|
||||
@staticmethod
|
||||
def do_rollup(setter_rollup_group_by_list,
|
||||
aggregation_period,
|
||||
setter_rollup_operation,
|
||||
instance_usage_df):
|
||||
|
||||
# get aggregation period
|
||||
group_by_period_list = \
|
||||
ComponentUtils._get_instance_group_by_period_list(
|
||||
aggregation_period)
|
||||
|
||||
# group by columns list
|
||||
group_by_columns_list = group_by_period_list + \
|
||||
setter_rollup_group_by_list
|
||||
|
||||
# perform rollup operation
|
||||
instance_usage_json_rdd = RollupQuantity._rollup_quantity(
|
||||
instance_usage_df,
|
||||
group_by_columns_list,
|
||||
str(setter_rollup_operation))
|
||||
|
||||
|
|
|
@ -261,15 +261,15 @@ class FetchQuantity(UsageComponent):
|
|||
Component.
|
||||
DEFAULT_UNAVAILABLE_VALUE),
|
||||
"usage_date":
|
||||
getattr(row, "usage_date",
|
||||
getattr(row, "event_date",
|
||||
Component.
|
||||
DEFAULT_UNAVAILABLE_VALUE),
|
||||
"usage_hour":
|
||||
getattr(row, "usage_hour",
|
||||
getattr(row, "event_hour",
|
||||
Component.
|
||||
DEFAULT_UNAVAILABLE_VALUE),
|
||||
"usage_minute":
|
||||
getattr(row, "usage_minute",
|
||||
getattr(row, "event_minute",
|
||||
Component.
|
||||
DEFAULT_UNAVAILABLE_VALUE),
|
||||
"aggregation_period":
|
||||
|
|
|
@ -23,6 +23,8 @@ class ConfigInitializer(object):
|
|||
ConfigInitializer.load_database_options()
|
||||
ConfigInitializer.load_messaging_options()
|
||||
ConfigInitializer.load_service_options()
|
||||
ConfigInitializer.load_stage_processors_options()
|
||||
ConfigInitializer.load_pre_hourly_processor_options()
|
||||
if not default_config_files:
|
||||
default_config_files = ['/etc/monasca-transform.conf',
|
||||
'etc/monasca-transform.conf']
|
||||
|
@ -43,7 +45,9 @@ class ConfigInitializer(object):
|
|||
default='monasca_transform.data_driven_specs.'
|
||||
'json_data_driven_specs_repo:JSONDataDrivenSpecsRepo',
|
||||
help='Repository for metric and event data_driven_specs'
|
||||
)
|
||||
),
|
||||
cfg.IntOpt('offsets_max_revisions', default=10,
|
||||
help="Max revisions of offsets for each application")
|
||||
]
|
||||
repo_group = cfg.OptGroup(name='repositories', title='repositories')
|
||||
cfg.CONF.register_group(repo_group)
|
||||
|
@ -76,7 +80,13 @@ class ConfigInitializer(object):
|
|||
help='Messaging brokers'),
|
||||
cfg.StrOpt('publish_kafka_tenant_id',
|
||||
default='111111',
|
||||
help='publish aggregated metrics tenant')
|
||||
help='publish aggregated metrics tenant'),
|
||||
cfg.StrOpt('adapter_pre_hourly',
|
||||
default='monasca_transform.messaging.adapter:'
|
||||
'KafkaMessageAdapterPreHourly',
|
||||
help='Message adapter implementation'),
|
||||
cfg.StrOpt('topic_pre_hourly', default='metrics_pre_hourly',
|
||||
help='Messaging topic pre hourly')
|
||||
]
|
||||
messaging_group = cfg.OptGroup(name='messaging', title='messaging')
|
||||
cfg.CONF.register_group(messaging_group)
|
||||
|
@ -99,8 +109,31 @@ class ConfigInitializer(object):
|
|||
cfg.StrOpt('spark_python_files'),
|
||||
cfg.IntOpt('stream_interval'),
|
||||
cfg.StrOpt('work_dir'),
|
||||
cfg.StrOpt('spark_home')
|
||||
cfg.StrOpt('spark_home'),
|
||||
cfg.BoolOpt('enable_record_store_df_cache'),
|
||||
cfg.StrOpt('record_store_df_cache_storage_level')
|
||||
]
|
||||
service_group = cfg.OptGroup(name='service', title='service')
|
||||
cfg.CONF.register_group(service_group)
|
||||
cfg.CONF.register_opts(service_opts, group=service_group)
|
||||
|
||||
@staticmethod
|
||||
def load_stage_processors_options():
|
||||
app_opts = [
|
||||
cfg.BoolOpt('pre_hourly_processor_enabled'),
|
||||
]
|
||||
app_group = cfg.OptGroup(name='stage_processors',
|
||||
title='stage_processors')
|
||||
cfg.CONF.register_group(app_group)
|
||||
cfg.CONF.register_opts(app_opts, group=app_group)
|
||||
|
||||
@staticmethod
|
||||
def load_pre_hourly_processor_options():
|
||||
app_opts = [
|
||||
cfg.BoolOpt('enable_instance_usage_df_cache'),
|
||||
cfg.StrOpt('instance_usage_df_cache_storage_level')
|
||||
]
|
||||
app_group = cfg.OptGroup(name='pre_hourly_processor',
|
||||
title='pre_hourly_processor')
|
||||
cfg.CONF.register_group(app_group)
|
||||
cfg.CONF.register_opts(app_opts, group=app_group)
|
||||
|
|
|
@ -1,23 +1,23 @@
|
|||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"mem.total_mb_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"mem_total_all","metric_id":"mem_total_all"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"mem.usable_mb_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"mem_usable_all","metric_id":"mem_usable_all"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"vm.mem.total_mb_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id", "resource_uuid"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"vm_mem_total_mb_all","metric_id":"vm_mem_total_mb_all"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"vm.mem.total_mb_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id", "resource_uuid"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": ["tenant_id"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"vm_mem_total_mb_project","metric_id":"vm_mem_total_mb_project"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"vm.mem.used_mb_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id", "resource_uuid"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"vm_mem_used_mb_all","metric_id":"vm_mem_used_mb_all"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"vm.mem.used_mb_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id", "resource_uuid"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": ["tenant_id"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"vm_mem_used_mb_project","metric_id":"vm_mem_used_mb_project"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"disk.total_space_mb_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"disk_total_all","metric_id":"disk_total_all"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"disk.total_used_space_mb_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"disk_usable_all","metric_id":"disk_usable_all"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"nova.vm.disk.total_allocated_gb_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"nova_disk_total_allocated_gb_all","metric_id":"nova_disk_total_allocated_gb_all"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"vm.disk.allocation_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id", "resource_uuid"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"vm_disk_allocation_all","metric_id":"vm_disk_allocation_all"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"vm.disk.allocation_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id", "resource_uuid"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": ["tenant_id"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"vm_disk_allocation_project","metric_id":"vm_disk_allocation_project"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"cpu.total_logical_cores_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"cpu_total_all","metric_id":"cpu_total_all"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"cpu.total_logical_cores_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": ["host"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"cpu_total_host","metric_id":"cpu_total_host"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity_util","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"cpu.utilized_logical_cores_agg","aggregation_period":"hourly","aggregation_group_by_list": ["event_type", "host"],"usage_fetch_operation": "avg","usage_fetch_util_quantity_event_type": "cpu.total_logical_cores","usage_fetch_util_idle_perc_event_type": "cpu.idle_perc","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"cpu_util_all","metric_id":"cpu_util_all"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity_util","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"cpu.utilized_logical_cores_agg","aggregation_period":"hourly","aggregation_group_by_list": ["event_type", "host"],"usage_fetch_operation": "avg","usage_fetch_util_quantity_event_type": "cpu.total_logical_cores","usage_fetch_util_idle_perc_event_type": "cpu.idle_perc","setter_rollup_group_by_list": ["host"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"cpu_util_host","metric_id":"cpu_util_host"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"vcpus_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id", "resource_uuid"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"vcpus_all","metric_id":"vcpus_all"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"vcpus_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id", "resource_uuid"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": ["tenant_id"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"vcpus_project","metric_id":"vcpus_project"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"vm.cpu.utilization_perc_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id", "resource_uuid"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": ["tenant_id"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"vm_cpu_util_perc_project","metric_id":"vm_cpu_util_perc_project"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"swiftlm.diskusage.val.size_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "mount", "device"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"swift_usage_all","metric_id":"swift_usage_all"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"swiftlm.diskusage.val.size_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "mount", "device"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": ["host"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"swift_usage_all","metric_id":"swift_usage_host"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"calculate_rate","setters":["set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"swiftlm.diskusage.rate_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "mount", "device"],"setter_rollup_group_by_list": [],"dimension_list":["aggregation_period","host","project_id"]},"metric_group":"swift_usage_rate","metric_id":"swift_usage_rate"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"swiftlm.diskusage.val.avail_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "mount", "device"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"swift_avail_all","metric_id":"swift_avail_all"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"swiftlm.diskusage.val.avail_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "mount", "device"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": ["host"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"swift_avail_all","metric_id":"swift_avail_host"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"mem.total_mb_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"mem_total_all","metric_id":"mem_total_all"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"mem.usable_mb_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"mem_usable_all","metric_id":"mem_usable_all"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"vm.mem.total_mb_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id", "resource_uuid"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"vm_mem_total_mb_all","metric_id":"vm_mem_total_mb_all"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"vm.mem.total_mb_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id", "resource_uuid"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": ["tenant_id"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"vm_mem_total_mb_project","metric_id":"vm_mem_total_mb_project"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"vm.mem.used_mb_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id", "resource_uuid"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"vm_mem_used_mb_all","metric_id":"vm_mem_used_mb_all"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"vm.mem.used_mb_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id", "resource_uuid"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": ["tenant_id"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"vm_mem_used_mb_project","metric_id":"vm_mem_used_mb_project"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"disk.total_space_mb_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"disk_total_all","metric_id":"disk_total_all"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"disk.total_used_space_mb_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"disk_usable_all","metric_id":"disk_usable_all"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"nova.vm.disk.total_allocated_gb_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"nova_disk_total_allocated_gb_all","metric_id":"nova_disk_total_allocated_gb_all"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"vm.disk.allocation_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id", "resource_uuid"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"vm_disk_allocation_all","metric_id":"vm_disk_allocation_all"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"vm.disk.allocation_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id", "resource_uuid"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": ["tenant_id"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"vm_disk_allocation_project","metric_id":"vm_disk_allocation_project"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"cpu.total_logical_cores_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"cpu_total_all","metric_id":"cpu_total_all"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"cpu.total_logical_cores_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": ["host"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"cpu_total_host","metric_id":"cpu_total_host"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity_util","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"cpu.utilized_logical_cores_agg","aggregation_period":"hourly","aggregation_group_by_list": ["event_type", "host"],"usage_fetch_operation": "avg","usage_fetch_util_quantity_event_type": "cpu.total_logical_cores","usage_fetch_util_idle_perc_event_type": "cpu.idle_perc","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"cpu_util_all","metric_id":"cpu_util_all"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity_util","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"cpu.utilized_logical_cores_agg","aggregation_period":"hourly","aggregation_group_by_list": ["event_type", "host"],"usage_fetch_operation": "avg","usage_fetch_util_quantity_event_type": "cpu.total_logical_cores","usage_fetch_util_idle_perc_event_type": "cpu.idle_perc","setter_rollup_group_by_list": ["host"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"cpu_util_host","metric_id":"cpu_util_host"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"vcpus_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id", "resource_uuid"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"vcpus_all","metric_id":"vcpus_all"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"vcpus_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id", "resource_uuid"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": ["tenant_id"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"vcpus_project","metric_id":"vcpus_project"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"vm.cpu.utilization_perc_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id", "resource_uuid"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": ["tenant_id"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"vm_cpu_util_perc_project","metric_id":"vm_cpu_util_perc_project"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"swiftlm.diskusage.val.size_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "mount", "device"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"swift_usage_all","metric_id":"swift_usage_all"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"swiftlm.diskusage.val.size_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "mount", "device"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": ["host"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"swift_usage_all","metric_id":"swift_usage_host"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"calculate_rate","setters":["set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"swiftlm.diskusage.rate_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "mount", "device"],"setter_rollup_group_by_list": [],"dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"swift_usage_rate","metric_id":"swift_usage_rate"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"swiftlm.diskusage.val.avail_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "mount", "device"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"swift_avail_all","metric_id":"swift_avail_all"}
|
||||
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"swiftlm.diskusage.val.avail_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "mount", "device"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": ["host"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"swift_avail_all","metric_id":"swift_avail_host"}
|
||||
|
|
|
@ -41,7 +41,10 @@ from monasca_transform.data_driven_specs.data_driven_specs_repo \
|
|||
from monasca_transform.data_driven_specs.data_driven_specs_repo \
|
||||
import DataDrivenSpecsRepoFactory
|
||||
|
||||
from monasca_transform.processor.pre_hourly_processor import PreHourlyProcessor
|
||||
|
||||
from monasca_transform.transform import RddTransformContext
|
||||
from monasca_transform.transform.storage_utils import StorageUtils
|
||||
from monasca_transform.transform.transform_utils import MonMetricUtils
|
||||
from monasca_transform.transform import TransformContextUtils
|
||||
|
||||
|
@ -65,7 +68,7 @@ class MonMetricsKafkaProcessor(object):
|
|||
log.debug(message)
|
||||
|
||||
@staticmethod
|
||||
def store_offset_ranges(rdd):
|
||||
def store_offset_ranges(batch_time, rdd):
|
||||
if rdd.isEmpty():
|
||||
MonMetricsKafkaProcessor.log_debug(
|
||||
"storeOffsetRanges: nothing to process...")
|
||||
|
@ -73,7 +76,9 @@ class MonMetricsKafkaProcessor(object):
|
|||
else:
|
||||
my_offset_ranges = rdd.offsetRanges()
|
||||
transform_context = \
|
||||
TransformContextUtils.get_context(offset_info=my_offset_ranges)
|
||||
TransformContextUtils.get_context(offset_info=my_offset_ranges,
|
||||
batch_time_info=batch_time
|
||||
)
|
||||
rdd_transform_context = \
|
||||
rdd.map(lambda x: RddTransformContext(x, transform_context))
|
||||
return rdd_transform_context
|
||||
|
@ -87,8 +92,8 @@ class MonMetricsKafkaProcessor(object):
|
|||
@staticmethod
|
||||
def get_kafka_stream(topic, streaming_context):
|
||||
offset_specifications = simport.load(cfg.CONF.repositories.offsets)()
|
||||
saved_offset_spec = offset_specifications.get_kafka_offsets()
|
||||
app_name = streaming_context.sparkContext.appName
|
||||
saved_offset_spec = offset_specifications.get_kafka_offsets(app_name)
|
||||
if len(saved_offset_spec) < 1:
|
||||
|
||||
MonMetricsKafkaProcessor.log_debug(
|
||||
|
@ -157,25 +162,29 @@ class MonMetricsKafkaProcessor(object):
|
|||
counts.pprint(9999)
|
||||
|
||||
@staticmethod
|
||||
def save_kafka_offsets(current_offsets, app_name):
|
||||
def save_kafka_offsets(current_offsets, app_name,
|
||||
batch_time_info):
|
||||
"""save current offsets to offset specification."""
|
||||
# get the offsets from global var
|
||||
|
||||
offset_specs = simport.load(cfg.CONF.repositories.offsets)()
|
||||
|
||||
for o in current_offsets:
|
||||
MonMetricsKafkaProcessor.log_debug(
|
||||
"adding offset: topic:{%s}, partition:{%s}, fromOffset:{%s}, "
|
||||
"untilOffset:{%s}" % (
|
||||
o.topic, o.partition, o.fromOffset, o.untilOffset))
|
||||
offset_specs.add(
|
||||
app_name, o.topic, o.partition, o.fromOffset, o.untilOffset)
|
||||
"saving: OffSetRanges: %s %s %s %s, "
|
||||
"batch_time_info: %s" % (
|
||||
o.topic, o.partition, o.fromOffset, o.untilOffset,
|
||||
str(batch_time_info)))
|
||||
# add new offsets, update revision
|
||||
offset_specs.add_all_offsets(app_name,
|
||||
current_offsets,
|
||||
batch_time_info)
|
||||
|
||||
@staticmethod
|
||||
def reset_kafka_offsets():
|
||||
def reset_kafka_offsets(app_name):
|
||||
"""delete all offsets from the offset specification."""
|
||||
# get the offsets from global var
|
||||
offset_specs = simport.load(cfg.CONF.repositories.offsets)()
|
||||
offset_specs.delete_all_kafka_offsets()
|
||||
offset_specs.delete_all_kafka_offsets(app_name)
|
||||
|
||||
@staticmethod
|
||||
def _validate_raw_mon_metrics(row):
|
||||
|
@ -437,24 +446,47 @@ class MonMetricsKafkaProcessor(object):
|
|||
rdd_transform_context = rdd_transform_context_rdd.first()
|
||||
transform_context = rdd_transform_context.transform_context_info
|
||||
|
||||
#
|
||||
# cache record store rdd
|
||||
#
|
||||
if cfg.CONF.service.enable_record_store_df_cache:
|
||||
storage_level_prop = \
|
||||
cfg.CONF.service.record_store_df_cache_storage_level
|
||||
storage_level = StorageUtils.get_storage_level(
|
||||
storage_level_prop)
|
||||
record_store_df.persist(storage_level)
|
||||
|
||||
#
|
||||
# start processing metrics available in record_store data
|
||||
#
|
||||
MonMetricsKafkaProcessor.process_metrics(transform_context,
|
||||
record_store_df)
|
||||
|
||||
#
|
||||
# extract kafka offsets stored in rdd and save
|
||||
#
|
||||
# remove df from cache
|
||||
if cfg.CONF.service.enable_record_store_df_cache:
|
||||
record_store_df.unpersist()
|
||||
|
||||
#
|
||||
# extract kafka offsets and batch processing time
|
||||
# stored in transform_context and save offsets
|
||||
#
|
||||
offsets = transform_context.offset_info
|
||||
|
||||
for o in offsets:
|
||||
MonMetricsKafkaProcessor.log_debug(
|
||||
"going to save: OffSetRanges: %s %s %s %s" % (
|
||||
o.topic, o.partition, o.fromOffset, o.untilOffset))
|
||||
# batch time
|
||||
batch_time_info = \
|
||||
transform_context.batch_time_info
|
||||
|
||||
MonMetricsKafkaProcessor.save_kafka_offsets(
|
||||
offsets, rdd_transform_context_rdd.context.appName)
|
||||
offsets, rdd_transform_context_rdd.context.appName,
|
||||
batch_time_info)
|
||||
|
||||
# call pre hourly processor, if its time to run
|
||||
if (cfg.CONF.stage_processors.pre_hourly_processor_enabled
|
||||
is True and PreHourlyProcessor.is_time_to_run(
|
||||
batch_time_info)):
|
||||
PreHourlyProcessor.run_processor(
|
||||
record_store_df.rdd.context,
|
||||
batch_time_info)
|
||||
|
||||
@staticmethod
|
||||
def transform_to_recordstore(kvs):
|
||||
|
@ -526,9 +558,13 @@ def invoke():
|
|||
# One exception that can occur here is the result of the saved
|
||||
# kafka offsets being obsolete/out of range. Delete the saved
|
||||
# offsets to improve the chance of success on the next execution.
|
||||
|
||||
# TODO(someone) prevent deleting all offsets for an application,
|
||||
# but just the latest revision
|
||||
MonMetricsKafkaProcessor.log_debug(
|
||||
"Deleting saved offsets for chance of success on next execution")
|
||||
MonMetricsKafkaProcessor.reset_kafka_offsets()
|
||||
|
||||
MonMetricsKafkaProcessor.reset_kafka_offsets(application_name)
|
||||
|
||||
if __name__ == "__main__":
|
||||
invoke()
|
||||
|
|
|
@ -22,20 +22,6 @@ import simport
|
|||
|
||||
class MessageAdapter(object):
|
||||
|
||||
adapter_impl = None
|
||||
|
||||
@staticmethod
|
||||
def init():
|
||||
# object to keep track of offsets
|
||||
MessageAdapter.adapter_impl = simport.load(
|
||||
cfg.CONF.messaging.adapter)()
|
||||
|
||||
@staticmethod
|
||||
def send_metric(metric):
|
||||
if not MessageAdapter.adapter_impl:
|
||||
MessageAdapter.init()
|
||||
MessageAdapter.adapter_impl.do_send_metric(metric)
|
||||
|
||||
@abc.abstractmethod
|
||||
def do_send_metric(self, metric):
|
||||
raise NotImplementedError(
|
||||
|
@ -45,13 +31,55 @@ class MessageAdapter(object):
|
|||
|
||||
class KafkaMessageAdapter(MessageAdapter):
|
||||
|
||||
adapter_impl = None
|
||||
|
||||
def __init__(self):
|
||||
client_for_writing = KafkaClient(cfg.CONF.messaging.brokers)
|
||||
self.producer = SimpleProducer(client_for_writing)
|
||||
self.topic = cfg.CONF.messaging.topic
|
||||
|
||||
@staticmethod
|
||||
def init():
|
||||
# object to keep track of offsets
|
||||
KafkaMessageAdapter.adapter_impl = simport.load(
|
||||
cfg.CONF.messaging.adapter)()
|
||||
|
||||
def do_send_metric(self, metric):
|
||||
self.producer.send_messages(
|
||||
self.topic,
|
||||
json.dumps(metric, separators=(',', ':')))
|
||||
return
|
||||
|
||||
@staticmethod
|
||||
def send_metric(metric):
|
||||
if not KafkaMessageAdapter.adapter_impl:
|
||||
KafkaMessageAdapter.init()
|
||||
KafkaMessageAdapter.adapter_impl.do_send_metric(metric)
|
||||
|
||||
|
||||
class KafkaMessageAdapterPreHourly(MessageAdapter):
|
||||
|
||||
adapter_impl = None
|
||||
|
||||
def __init__(self):
|
||||
client_for_writing = KafkaClient(cfg.CONF.messaging.brokers)
|
||||
self.producer = SimpleProducer(client_for_writing)
|
||||
self.topic = cfg.CONF.messaging.topic_pre_hourly
|
||||
|
||||
@staticmethod
|
||||
def init():
|
||||
# object to keep track of offsets
|
||||
KafkaMessageAdapterPreHourly.adapter_impl = simport.load(
|
||||
cfg.CONF.messaging.adapter_pre_hourly)()
|
||||
|
||||
def do_send_metric(self, metric):
|
||||
self.producer.send_messages(
|
||||
self.topic,
|
||||
json.dumps(metric, separators=(',', ':')))
|
||||
return
|
||||
|
||||
@staticmethod
|
||||
def send_metric(metric):
|
||||
if not KafkaMessageAdapterPreHourly.adapter_impl:
|
||||
KafkaMessageAdapterPreHourly.init()
|
||||
KafkaMessageAdapterPreHourly.adapter_impl.do_send_metric(metric)
|
||||
|
|
|
@ -11,11 +11,11 @@
|
|||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import datetime
|
||||
from oslo_config import cfg
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy import desc
|
||||
from sqlalchemy.ext.automap import automap_base
|
||||
from sqlalchemy.orm.exc import NoResultFound
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
|
||||
from monasca_transform.offset_specs import OffsetSpec
|
||||
|
@ -25,9 +25,18 @@ Base = automap_base()
|
|||
|
||||
|
||||
class MySQLOffsetSpec(Base, OffsetSpec):
|
||||
|
||||
__tablename__ = 'kafka_offsets'
|
||||
|
||||
def __str__(self):
|
||||
return "%s,%s,%s,%s,%s,%s,%s,%s" % (str(self.id),
|
||||
str(self.topic),
|
||||
str(self.partition),
|
||||
str(self.until_offset),
|
||||
str(self.from_offset),
|
||||
str(self.batch_time),
|
||||
str(self.last_updated),
|
||||
str(self.revision))
|
||||
|
||||
|
||||
class MySQLOffsetSpecs(OffsetSpecs):
|
||||
|
||||
|
@ -41,8 +50,7 @@ class MySQLOffsetSpecs(OffsetSpecs):
|
|||
database_uid,
|
||||
database_pwd,
|
||||
database_server,
|
||||
database_name
|
||||
), isolation_level="READ UNCOMMITTED")
|
||||
database_name), isolation_level="READ UNCOMMITTED")
|
||||
|
||||
db.echo = True
|
||||
# reflect the tables
|
||||
|
@ -51,36 +59,119 @@ class MySQLOffsetSpecs(OffsetSpecs):
|
|||
Session = sessionmaker(bind=db)
|
||||
self.session = Session()
|
||||
|
||||
def add(self, app_name, topic, partition,
|
||||
from_offset, until_offset):
|
||||
try:
|
||||
offset_spec = self.session.query(MySQLOffsetSpec).filter_by(
|
||||
app_name=app_name, topic=topic,
|
||||
partition=partition).one()
|
||||
offset_spec.from_offset = from_offset
|
||||
offset_spec.until_offset = until_offset
|
||||
self.session.commit()
|
||||
# keep these many offset versions around
|
||||
self.MAX_REVISIONS = cfg.CONF.repositories.offsets_max_revisions
|
||||
|
||||
def _manage_offset_revisions(self):
|
||||
"""manage offset versions"""
|
||||
distinct_offset_specs = self.session.query(
|
||||
MySQLOffsetSpec).group_by(MySQLOffsetSpec.app_name,
|
||||
MySQLOffsetSpec.topic,
|
||||
MySQLOffsetSpec.partition
|
||||
).all()
|
||||
|
||||
for distinct_offset_spec in distinct_offset_specs:
|
||||
ordered_versions = self.session.query(
|
||||
MySQLOffsetSpec).filter_by(
|
||||
app_name=distinct_offset_spec.app_name,
|
||||
topic=distinct_offset_spec.topic,
|
||||
partition=distinct_offset_spec.partition).order_by(
|
||||
desc(MySQLOffsetSpec.id)).all()
|
||||
|
||||
revision = 1
|
||||
for version_spec in ordered_versions:
|
||||
version_spec.revision = revision
|
||||
revision = revision + 1
|
||||
|
||||
self.session.query(MySQLOffsetSpec).filter(
|
||||
MySQLOffsetSpec.revision > self.MAX_REVISIONS).delete(
|
||||
synchronize_session="fetch")
|
||||
|
||||
def get_kafka_offsets(self, app_name):
|
||||
return {'%s_%s_%s' % (
|
||||
offset.get_app_name(), offset.get_topic(), offset.get_partition()
|
||||
): offset for offset in self.session.query(MySQLOffsetSpec).filter(
|
||||
MySQLOffsetSpec.app_name == app_name,
|
||||
MySQLOffsetSpec.revision == 1).all()}
|
||||
|
||||
def delete_all_kafka_offsets(self, app_name):
|
||||
try:
|
||||
self.session.query(MySQLOffsetSpec).filter(
|
||||
MySQLOffsetSpec.app_name == app_name).delete()
|
||||
self.session.commit()
|
||||
except Exception:
|
||||
# Seems like there isn't much that can be done in this situation
|
||||
pass
|
||||
|
||||
def add_all_offsets(self, app_name, offsets,
|
||||
batch_time_info):
|
||||
"""add offsets. """
|
||||
try:
|
||||
|
||||
# batch time
|
||||
batch_time = \
|
||||
batch_time_info.strftime(
|
||||
'%Y-%m-%d %H:%M:%S')
|
||||
|
||||
# last updated
|
||||
last_updated = \
|
||||
datetime.datetime.now().strftime(
|
||||
'%Y-%m-%d %H:%M:%S')
|
||||
|
||||
NEW_REVISION_NO = -1
|
||||
|
||||
for o in offsets:
|
||||
offset_spec = MySQLOffsetSpec(
|
||||
topic=o.topic,
|
||||
app_name=app_name,
|
||||
partition=o.partition,
|
||||
from_offset=o.fromOffset,
|
||||
until_offset=o.untilOffset,
|
||||
batch_time=batch_time,
|
||||
last_updated=last_updated,
|
||||
revision=NEW_REVISION_NO)
|
||||
self.session.add(offset_spec)
|
||||
|
||||
# manage versions
|
||||
self._manage_offset_revisions()
|
||||
|
||||
self.session.commit()
|
||||
except Exception:
|
||||
self.session.rollback()
|
||||
raise
|
||||
|
||||
def add(self, app_name, topic, partition,
|
||||
from_offset, until_offset, batch_time_info):
|
||||
"""add offset info. """
|
||||
try:
|
||||
# batch time
|
||||
batch_time = \
|
||||
batch_time_info.strftime(
|
||||
'%Y-%m-%d %H:%M:%S')
|
||||
|
||||
# last updated
|
||||
last_updated = \
|
||||
datetime.datetime.now().strftime(
|
||||
'%Y-%m-%d %H:%M:%S')
|
||||
|
||||
NEW_REVISION_NO = -1
|
||||
|
||||
except NoResultFound:
|
||||
offset_spec = MySQLOffsetSpec(
|
||||
topic=topic,
|
||||
app_name=app_name,
|
||||
partition=partition,
|
||||
from_offset=from_offset,
|
||||
until_offset=until_offset)
|
||||
until_offset=until_offset,
|
||||
batch_time=batch_time,
|
||||
last_updated=last_updated,
|
||||
revision=NEW_REVISION_NO)
|
||||
|
||||
self.session.add(offset_spec)
|
||||
|
||||
# manage versions
|
||||
self._manage_offset_revisions()
|
||||
|
||||
self.session.commit()
|
||||
|
||||
def get_kafka_offsets(self):
|
||||
return {'%s_%s_%s' % (
|
||||
offset.get_app_name(), offset.get_topic(), offset.get_partition()
|
||||
): offset for offset in self.session.query(MySQLOffsetSpec).all()}
|
||||
|
||||
def delete_all_kafka_offsets(self):
|
||||
try:
|
||||
self.session.query(MySQLOffsetSpec).delete()
|
||||
self.session.commit()
|
||||
|
||||
except Exception:
|
||||
# Seems like there isn't much that can be done in this situation
|
||||
pass
|
||||
self.session.rollback()
|
||||
raise
|
||||
|
|
|
@ -13,24 +13,30 @@
|
|||
# under the License.
|
||||
|
||||
import abc
|
||||
import datetime
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import six
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class OffsetSpec(object):
|
||||
|
||||
def __init__(self, app_name=None, topic=None, partition=None,
|
||||
from_offset=None, until_offset=None):
|
||||
from_offset=None, until_offset=None,
|
||||
batch_time=None, last_updated=None,
|
||||
revision=None):
|
||||
|
||||
self.app_name = app_name
|
||||
self.topic = topic
|
||||
self.partition = partition
|
||||
self.from_offset = from_offset
|
||||
self.until_offset = until_offset
|
||||
self.batch_time = batch_time
|
||||
self.last_updated = last_updated
|
||||
self.revision = revision
|
||||
|
||||
def get_app_name(self):
|
||||
return self.app_name
|
||||
|
@ -47,6 +53,15 @@ class OffsetSpec(object):
|
|||
def get_until_offset(self):
|
||||
return self.until_offset
|
||||
|
||||
def get_batch_time(self):
|
||||
return self.batch_time
|
||||
|
||||
def get_last_updated(self):
|
||||
return self.last_updated
|
||||
|
||||
def get_revision(self):
|
||||
return self.revision
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class OffsetSpecs(object):
|
||||
|
@ -56,20 +71,29 @@ class OffsetSpecs(object):
|
|||
|
||||
@abc.abstractmethod
|
||||
def add(self, app_name, topic, partition,
|
||||
from_offset, until_offset):
|
||||
from_offset, until_offset, batch_time_info):
|
||||
raise NotImplementedError(
|
||||
"Class %s doesn't implement add(self, app_name, topic, "
|
||||
"partition, from_offset, until_offset)"
|
||||
"partition, from_offset, until_offset, batch_time,"
|
||||
"last_updated, revision)"
|
||||
% self.__class__.__name__)
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_kafka_offsets(self):
|
||||
def add_all_offsets(self, app_name, offsets, batch_time_info):
|
||||
raise NotImplementedError(
|
||||
"Class %s doesn't implement add(self, app_name, topic, "
|
||||
"partition, from_offset, until_offset, batch_time,"
|
||||
"last_updated, revision)"
|
||||
% self.__class__.__name__)
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_kafka_offsets(self, app_name):
|
||||
raise NotImplementedError(
|
||||
"Class %s doesn't implement get_kafka_offsets()"
|
||||
% self.__class__.__name__)
|
||||
|
||||
@abc.abstractmethod
|
||||
def delete_all_kafka_offsets(self):
|
||||
def delete_all_kafka_offsets(self, app_name):
|
||||
raise NotImplementedError(
|
||||
"Class %s doesn't implement delete_all_kafka_offsets()"
|
||||
% self.__class__.__name__)
|
||||
|
@ -91,9 +115,12 @@ class JSONOffsetSpecs(OffsetSpecs):
|
|||
self._kafka_offsets[key] = OffsetSpec(
|
||||
app_name=value.get('app_name'),
|
||||
topic=value.get('topic'),
|
||||
until_offset=value.get('until_offset'),
|
||||
partition=value.get('partition'),
|
||||
from_offset=value.get('from_offset'),
|
||||
partition=value.get('partition')
|
||||
until_offset=value.get('until_offset'),
|
||||
batch_time=value.get('batch_time'),
|
||||
last_updated=value.get('last_updated'),
|
||||
revision=value.get('revision')
|
||||
)
|
||||
except Exception:
|
||||
log.info('Invalid or corrupts offsets file found at %s,'
|
||||
|
@ -122,10 +149,26 @@ class JSONOffsetSpecs(OffsetSpecs):
|
|||
"topic": offset_value.get_topic(),
|
||||
"partition": offset_value.get_partition(),
|
||||
"from_offset": offset_value.get_from_offset(),
|
||||
"until_offset": offset_value.get_until_offset()}
|
||||
"until_offset": offset_value.get_until_offset(),
|
||||
"batch_time": offset_value.get_batch_time(),
|
||||
"last_updated": offset_value.get_last_updated(),
|
||||
"revision": offset_value.get_revision()}
|
||||
|
||||
def add(self, app_name, topic, partition,
|
||||
from_offset, until_offset):
|
||||
from_offset, until_offset, batch_time_info):
|
||||
|
||||
# batch time
|
||||
batch_time = \
|
||||
batch_time_info.strftime(
|
||||
'%Y-%m-%d %H:%M:%S')
|
||||
|
||||
# last updated
|
||||
last_updated = \
|
||||
datetime.datetime.now().strftime(
|
||||
'%Y-%m-%d %H:%M:%S')
|
||||
|
||||
NEW_REVISION_NO = 1
|
||||
|
||||
key_name = "%s_%s_%s" % (
|
||||
app_name, topic, partition)
|
||||
offset = OffsetSpec(
|
||||
|
@ -133,7 +176,10 @@ class JSONOffsetSpecs(OffsetSpecs):
|
|||
topic=topic,
|
||||
partition=partition,
|
||||
from_offset=from_offset,
|
||||
until_offset=until_offset
|
||||
until_offset=until_offset,
|
||||
batch_time=batch_time,
|
||||
last_updated=last_updated,
|
||||
revision=NEW_REVISION_NO
|
||||
)
|
||||
log.info('Adding offset %s for key %s to current offsets: %s' %
|
||||
(offset, key_name, self._kafka_offsets))
|
||||
|
@ -141,9 +187,44 @@ class JSONOffsetSpecs(OffsetSpecs):
|
|||
log.info('Added so kafka offset is now %s', self._kafka_offsets)
|
||||
self._save()
|
||||
|
||||
def get_kafka_offsets(self):
|
||||
def get_kafka_offsets(self, app_name):
|
||||
return self._kafka_offsets
|
||||
|
||||
def delete_all_kafka_offsets(self):
|
||||
def delete_all_kafka_offsets(self, app_name):
|
||||
log.info("Deleting json offsets file: %s", self.kafka_offset_spec_file)
|
||||
os.remove(self.kafka_offset_spec_file)
|
||||
|
||||
def add_all_offsets(self, app_name, offsets, batch_time_info):
|
||||
|
||||
# batch time
|
||||
batch_time = \
|
||||
batch_time_info.strftime(
|
||||
'%Y-%m-%d %H:%M:%S')
|
||||
|
||||
# last updated
|
||||
last_updated = \
|
||||
datetime.datetime.now().strftime(
|
||||
'%Y-%m-%d %H:%M:%S')
|
||||
|
||||
NEW_REVISION_NO = -1
|
||||
|
||||
for o in offsets:
|
||||
|
||||
key_name = "%s_%s_%s" % (
|
||||
app_name, o.topic, o.partition)
|
||||
|
||||
offset = OffsetSpec(
|
||||
topic=o.topic,
|
||||
app_name=app_name,
|
||||
partition=o.partition,
|
||||
from_offset=o.fromOffset,
|
||||
until_offset=o.untilOffset,
|
||||
batch_time=batch_time,
|
||||
last_updated=last_updated,
|
||||
revision=NEW_REVISION_NO)
|
||||
|
||||
log.info('Adding offset %s for key %s to current offsets: %s' %
|
||||
(offset, key_name, self._kafka_offsets))
|
||||
self._kafka_offsets[key_name] = offset
|
||||
log.info('Added so kafka offset is now %s', self._kafka_offsets)
|
||||
self._save()
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import abc
|
||||
|
||||
|
||||
class Processor(object):
|
||||
"""processor object """
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_app_name(self):
|
||||
"""get name of this application. Will be used to
|
||||
store offsets in database
|
||||
"""
|
||||
raise NotImplementedError(
|
||||
"Class %s doesn't implement get_app_name()"
|
||||
% self.__class__.__name__)
|
||||
|
||||
@abc.abstractmethod
|
||||
def is_time_to_run(self, current_time):
|
||||
"""return True if its time to run this processor"""
|
||||
raise NotImplementedError(
|
||||
"Class %s doesn't implement is_time_to_run()"
|
||||
% self.__class__.__name__)
|
||||
|
||||
@abc.abstractmethod
|
||||
def run_processor(self, time):
|
||||
"""Run application"""
|
||||
raise NotImplementedError(
|
||||
"Class %s doesn't implement run_processor()"
|
||||
% self.__class__.__name__)
|
|
@ -0,0 +1,432 @@
|
|||
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
from kafka.common import OffsetRequestPayload
|
||||
from kafka import KafkaClient
|
||||
|
||||
from pyspark.sql import SQLContext
|
||||
from pyspark.streaming.kafka import KafkaUtils
|
||||
from pyspark.streaming.kafka import OffsetRange
|
||||
|
||||
import datetime
|
||||
import logging
|
||||
from oslo_config import cfg
|
||||
import simport
|
||||
|
||||
|
||||
from monasca_transform.component.insert.kafka_insert import KafkaInsert
|
||||
from monasca_transform.component.setter.rollup_quantity import RollupQuantity
|
||||
from monasca_transform.data_driven_specs.data_driven_specs_repo \
|
||||
import DataDrivenSpecsRepo
|
||||
from monasca_transform.data_driven_specs.data_driven_specs_repo \
|
||||
import DataDrivenSpecsRepoFactory
|
||||
from monasca_transform.processor import Processor
|
||||
from monasca_transform.transform.storage_utils import StorageUtils
|
||||
from monasca_transform.transform.transform_utils import InstanceUsageUtils
|
||||
from monasca_transform.transform import TransformContextUtils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class PreHourlyProcessor(Processor):
|
||||
"""Processor to process usage data published to metrics_pre_hourly topic a
|
||||
and publish final rolled up metrics to metrics topic in kafka.
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def log_debug(message):
|
||||
LOG.debug(message)
|
||||
|
||||
@staticmethod
|
||||
def save_kafka_offsets(current_offsets,
|
||||
batch_time_info):
|
||||
"""save current offsets to offset specification."""
|
||||
|
||||
offset_specs = simport.load(cfg.CONF.repositories.offsets)()
|
||||
|
||||
app_name = PreHourlyProcessor.get_app_name()
|
||||
|
||||
for o in current_offsets:
|
||||
PreHourlyProcessor.log_debug(
|
||||
"saving: OffSetRanges: %s %s %s %s, "
|
||||
"batch_time_info: %s" % (
|
||||
o.topic, o.partition, o.fromOffset, o.untilOffset,
|
||||
str(batch_time_info)))
|
||||
# add new offsets, update revision
|
||||
offset_specs.add_all_offsets(app_name,
|
||||
current_offsets,
|
||||
batch_time_info)
|
||||
|
||||
@staticmethod
|
||||
def reset_kafka_offsets(app_name):
|
||||
"""delete all offsets from the offset specification."""
|
||||
# get the offsets from global var
|
||||
offset_specs = simport.load(cfg.CONF.repositories.offsets)()
|
||||
offset_specs.delete_all_kafka_offsets(app_name)
|
||||
|
||||
@staticmethod
|
||||
def get_app_name():
|
||||
"""get name of this application. Will be used to
|
||||
store offsets in database
|
||||
"""
|
||||
return "mon_metrics_kafka_pre_hourly"
|
||||
|
||||
@staticmethod
|
||||
def get_kafka_topic():
|
||||
"""get name of kafka topic for transformation."""
|
||||
return "metrics_pre_hourly"
|
||||
|
||||
@staticmethod
|
||||
def is_time_to_run(check_time):
|
||||
"""return True if its time to run this processor.
|
||||
For now it just checks to see if its start of the hour
|
||||
i.e. the minute is 00.
|
||||
"""
|
||||
this_min = int(datetime.datetime.strftime(check_time, '%M'))
|
||||
|
||||
# run pre hourly processor only at top of the hour
|
||||
if this_min == 0:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def _get_offsets_from_kafka(brokers,
|
||||
topic,
|
||||
offset_time):
|
||||
"""get dict representing kafka
|
||||
offsets.
|
||||
"""
|
||||
# get client
|
||||
client = KafkaClient(brokers)
|
||||
|
||||
# get partitions for a topic
|
||||
partitions = client.topic_partitions[topic]
|
||||
|
||||
# https://cwiki.apache.org/confluence/display/KAFKA/
|
||||
# A+Guide+To+The+Kafka+Protocol#
|
||||
# AGuideToTheKafkaProtocol-OffsetRequest
|
||||
MAX_OFFSETS = 1
|
||||
offset_requests = [OffsetRequestPayload(topic,
|
||||
part_name,
|
||||
offset_time,
|
||||
MAX_OFFSETS) for part_name
|
||||
in partitions.keys()]
|
||||
|
||||
offsets_responses = client.send_offset_request(offset_requests)
|
||||
|
||||
offset_dict = {}
|
||||
for response in offsets_responses:
|
||||
key = "_".join((response.topic,
|
||||
str(response.partition)))
|
||||
offset_dict[key] = response
|
||||
|
||||
return offset_dict
|
||||
|
||||
@staticmethod
|
||||
def _parse_saved_offsets(app_name, topic, saved_offset_spec):
|
||||
"""get dict representing saved
|
||||
offsets.
|
||||
"""
|
||||
offset_dict = {}
|
||||
for key, value in saved_offset_spec.items():
|
||||
if key.startswith("%s_%s" % (app_name, topic)):
|
||||
spec_app_name = value.get_app_name()
|
||||
spec_topic = value.get_topic()
|
||||
spec_partition = int(value.get_partition())
|
||||
spec_from_offset = value.get_from_offset()
|
||||
spec_until_offset = value.get_until_offset()
|
||||
key = "_".join((spec_topic,
|
||||
str(spec_partition)))
|
||||
offset_dict[key] = (spec_app_name,
|
||||
spec_topic,
|
||||
spec_partition,
|
||||
spec_from_offset,
|
||||
spec_until_offset)
|
||||
return offset_dict
|
||||
|
||||
@staticmethod
|
||||
def _get_new_offset_range_list(brokers, topic):
|
||||
"""get offset range from earliest to latest."""
|
||||
offset_range_list = []
|
||||
|
||||
# https://cwiki.apache.org/confluence/display/KAFKA/
|
||||
# A+Guide+To+The+Kafka+Protocol#
|
||||
# AGuideToTheKafkaProtocol-OffsetRequest
|
||||
GET_LATEST_OFFSETS = -1
|
||||
latest_dict = PreHourlyProcessor.\
|
||||
_get_offsets_from_kafka(brokers, topic,
|
||||
GET_LATEST_OFFSETS)
|
||||
|
||||
GET_EARLIEST_OFFSETS = -2
|
||||
earliest_dict = PreHourlyProcessor.\
|
||||
_get_offsets_from_kafka(brokers, topic,
|
||||
GET_EARLIEST_OFFSETS)
|
||||
|
||||
for item in latest_dict:
|
||||
until_offset = latest_dict[item].offsets[0]
|
||||
from_offset = earliest_dict[item].offsets[0]
|
||||
partition = latest_dict[item].partition
|
||||
topic = latest_dict[item].topic
|
||||
offset_range_list.append(OffsetRange(topic,
|
||||
partition,
|
||||
from_offset,
|
||||
until_offset))
|
||||
|
||||
return offset_range_list
|
||||
|
||||
@staticmethod
|
||||
def _get_offset_range_list(brokers,
|
||||
topic,
|
||||
app_name,
|
||||
saved_offset_spec):
|
||||
"""get offset range from saved offset to latest.
|
||||
"""
|
||||
offset_range_list = []
|
||||
|
||||
# https://cwiki.apache.org/confluence/display/KAFKA/
|
||||
# A+Guide+To+The+Kafka+Protocol#
|
||||
# AGuideToTheKafkaProtocol-OffsetRequest
|
||||
GET_LATEST_OFFSETS = -1
|
||||
latest_dict = PreHourlyProcessor.\
|
||||
_get_offsets_from_kafka(brokers, topic,
|
||||
GET_LATEST_OFFSETS)
|
||||
|
||||
GET_EARLIEST_OFFSETS = -2
|
||||
earliest_dict = PreHourlyProcessor.\
|
||||
_get_offsets_from_kafka(brokers, topic,
|
||||
GET_EARLIEST_OFFSETS)
|
||||
|
||||
saved_dict = PreHourlyProcessor.\
|
||||
_parse_saved_offsets(app_name, topic, saved_offset_spec)
|
||||
|
||||
for item in latest_dict:
|
||||
# saved spec
|
||||
(spec_app_name,
|
||||
spec_topic_name,
|
||||
spec_partition,
|
||||
spec_from_offset,
|
||||
spec_until_offset) = saved_dict[item]
|
||||
|
||||
# until
|
||||
until_offset = latest_dict[item].offsets[0]
|
||||
|
||||
# from
|
||||
if (spec_until_offset is not None and int(spec_until_offset) >= 0):
|
||||
from_offset = spec_until_offset
|
||||
else:
|
||||
from_offset = earliest_dict[item].offsets[0]
|
||||
|
||||
partition = latest_dict[item].partition
|
||||
topic = latest_dict[item].topic
|
||||
offset_range_list.append(OffsetRange(topic,
|
||||
partition,
|
||||
from_offset,
|
||||
until_offset))
|
||||
|
||||
return offset_range_list
|
||||
|
||||
@staticmethod
|
||||
def get_processing_offset_range_list(processing_time):
|
||||
"""get offset range to fetch data from. The
|
||||
range will last from the last saved offsets to current offsets
|
||||
available. If there are no last saved offsets available in the
|
||||
database the starting offsets will be set to the earliest
|
||||
available in kafka.
|
||||
"""
|
||||
|
||||
offset_specifications = simport.load(cfg.CONF.repositories.offsets)()
|
||||
|
||||
# get application name, will be used to get offsets from database
|
||||
app_name = PreHourlyProcessor.get_app_name()
|
||||
|
||||
saved_offset_spec = offset_specifications.get_kafka_offsets(app_name)
|
||||
|
||||
# get kafka topic to fetch data
|
||||
topic = PreHourlyProcessor.get_kafka_topic()
|
||||
|
||||
offset_range_list = []
|
||||
if len(saved_offset_spec) < 1:
|
||||
|
||||
PreHourlyProcessor.log_debug(
|
||||
"No saved offsets available..."
|
||||
"connecting to kafka and fetching "
|
||||
"from earliest available offset ...")
|
||||
|
||||
offset_range_list = PreHourlyProcessor._get_new_offset_range_list(
|
||||
cfg.CONF.messaging.brokers,
|
||||
topic)
|
||||
else:
|
||||
PreHourlyProcessor.log_debug(
|
||||
"Saved offsets available..."
|
||||
"connecting to kafka and fetching from saved offset ...")
|
||||
|
||||
offset_range_list = PreHourlyProcessor._get_offset_range_list(
|
||||
cfg.CONF.messaging.brokers,
|
||||
topic,
|
||||
app_name,
|
||||
saved_offset_spec)
|
||||
return offset_range_list
|
||||
|
||||
@staticmethod
|
||||
def fetch_pre_hourly_data(spark_context,
|
||||
offset_range_list):
|
||||
"""get metrics pre hourly data from offset range list."""
|
||||
|
||||
# get kafka stream over the same offsets
|
||||
pre_hourly_rdd = KafkaUtils.createRDD(spark_context,
|
||||
{"metadata.broker.list":
|
||||
cfg.CONF.messaging.brokers},
|
||||
offset_range_list)
|
||||
return pre_hourly_rdd
|
||||
|
||||
@staticmethod
|
||||
def pre_hourly_to_instance_usage_df(pre_hourly_rdd):
|
||||
"""convert raw pre hourly data into instance usage dataframe."""
|
||||
#
|
||||
# extract second column containing instance usage data
|
||||
#
|
||||
instance_usage_rdd = pre_hourly_rdd.map(
|
||||
lambda iud: iud[1])
|
||||
|
||||
#
|
||||
# convert usage data rdd to instance usage df
|
||||
#
|
||||
sqlc = SQLContext.getOrCreate(pre_hourly_rdd.context)
|
||||
instance_usage_df = \
|
||||
InstanceUsageUtils.create_df_from_json_rdd(
|
||||
sqlc,
|
||||
instance_usage_rdd)
|
||||
|
||||
return instance_usage_df
|
||||
|
||||
@staticmethod
|
||||
def process_instance_usage(transform_context, instance_usage_df):
|
||||
"""second stage aggregation. Aggregate instance usage rdd
|
||||
data and write results to metrics topic in kafka.
|
||||
"""
|
||||
|
||||
transform_spec_df = transform_context.transform_spec_df_info
|
||||
|
||||
#
|
||||
# do a rollup operation
|
||||
#
|
||||
agg_params = transform_spec_df.select(
|
||||
"aggregation_params_map.pre_hourly_group_by_list")\
|
||||
.collect()[0].asDict()
|
||||
pre_hourly_group_by_list = agg_params["pre_hourly_group_by_list"]
|
||||
|
||||
if (len(pre_hourly_group_by_list) == 1 and
|
||||
pre_hourly_group_by_list[0] == "default"):
|
||||
pre_hourly_group_by_list = ["tenant_id", "user_id",
|
||||
"resource_uuid",
|
||||
"geolocation", "region", "zone",
|
||||
"host", "project_id",
|
||||
"aggregated_metric_name",
|
||||
"aggregation_period"]
|
||||
|
||||
# get aggregation period
|
||||
agg_params = transform_spec_df.select(
|
||||
"aggregation_params_map.aggregation_period").collect()[0].asDict()
|
||||
aggregation_period = agg_params["aggregation_period"]
|
||||
|
||||
# get 2stage operation
|
||||
agg_params = transform_spec_df.select(
|
||||
"aggregation_params_map.pre_hourly_operation")\
|
||||
.collect()[0].asDict()
|
||||
pre_hourly_operation = agg_params["pre_hourly_operation"]
|
||||
|
||||
instance_usage_df = \
|
||||
RollupQuantity.do_rollup(pre_hourly_group_by_list,
|
||||
aggregation_period,
|
||||
pre_hourly_operation,
|
||||
instance_usage_df)
|
||||
# insert metrics
|
||||
instance_usage_df = KafkaInsert.insert(transform_context,
|
||||
instance_usage_df)
|
||||
return instance_usage_df
|
||||
|
||||
@staticmethod
|
||||
def do_transform(instance_usage_df):
|
||||
"""start processing (aggregating) metrics
|
||||
"""
|
||||
#
|
||||
# look in instance_usage_df for list of metrics to be processed
|
||||
#
|
||||
metric_ids_df = instance_usage_df.select(
|
||||
"processing_meta.metric_id").distinct()
|
||||
metric_ids_to_process = [row.metric_id
|
||||
for row in metric_ids_df.collect()]
|
||||
|
||||
data_driven_specs_repo = DataDrivenSpecsRepoFactory.\
|
||||
get_data_driven_specs_repo()
|
||||
sqlc = SQLContext.getOrCreate(instance_usage_df.rdd.context)
|
||||
transform_specs_df = data_driven_specs_repo.get_data_driven_specs(
|
||||
sql_context=sqlc,
|
||||
data_driven_spec_type=DataDrivenSpecsRepo.transform_specs_type)
|
||||
|
||||
for metric_id in metric_ids_to_process:
|
||||
transform_spec_df = transform_specs_df.select(
|
||||
["aggregation_params_map", "metric_id"]
|
||||
).where(transform_specs_df.metric_id == metric_id)
|
||||
source_instance_usage_df = instance_usage_df.select("*").where(
|
||||
instance_usage_df.processing_meta.metric_id == metric_id)
|
||||
|
||||
# set transform_spec_df in TransformContext
|
||||
transform_context = \
|
||||
TransformContextUtils.get_context(
|
||||
transform_spec_df_info=transform_spec_df)
|
||||
|
||||
PreHourlyProcessor.process_instance_usage(
|
||||
transform_context, source_instance_usage_df)
|
||||
|
||||
@staticmethod
|
||||
def run_processor(spark_context, processing_time):
|
||||
"""process data in metrics_pre_hourly queue, starting
|
||||
from the last saved offsets, else start from earliest
|
||||
offsets available
|
||||
"""
|
||||
|
||||
offset_range_list = \
|
||||
PreHourlyProcessor.get_processing_offset_range_list(
|
||||
processing_time)
|
||||
|
||||
# get pre hourly data
|
||||
pre_hourly_rdd = PreHourlyProcessor.fetch_pre_hourly_data(
|
||||
spark_context, offset_range_list)
|
||||
|
||||
# get instance usage df
|
||||
instance_usage_df = PreHourlyProcessor.pre_hourly_to_instance_usage_df(
|
||||
pre_hourly_rdd)
|
||||
|
||||
#
|
||||
# cache instance usage df
|
||||
#
|
||||
if cfg.CONF.pre_hourly_processor.enable_instance_usage_df_cache:
|
||||
storage_level_prop = \
|
||||
cfg.CONF.pre_hourly_processor\
|
||||
.instance_usage_df_cache_storage_level
|
||||
storage_level = StorageUtils.get_storage_level(
|
||||
storage_level_prop)
|
||||
instance_usage_df.persist(storage_level)
|
||||
|
||||
# aggregate pre hourly data
|
||||
PreHourlyProcessor.do_transform(instance_usage_df)
|
||||
|
||||
# remove cache
|
||||
if cfg.CONF.pre_hourly_processor.enable_instance_usage_df_cache:
|
||||
instance_usage_df.unpersist()
|
||||
|
||||
# save latest metrics_pre_hourly offsets in the database
|
||||
PreHourlyProcessor.save_kafka_offsets(offset_range_list,
|
||||
processing_time)
|
|
@ -18,7 +18,8 @@ from collections import namedtuple
|
|||
TransformContextBase = namedtuple("TransformContext",
|
||||
["config_info",
|
||||
"offset_info",
|
||||
"transform_spec_df_info"])
|
||||
"transform_spec_df_info",
|
||||
"batch_time_info"])
|
||||
|
||||
|
||||
class TransformContext(TransformContextBase):
|
||||
|
@ -32,6 +33,7 @@ class TransformContext(TransformContextBase):
|
|||
offset_info - current kafka offset information
|
||||
transform_spec_df - processing information from
|
||||
transform_spec aggregation driver table
|
||||
batch_datetime_info - current batch processing datetime
|
||||
"""
|
||||
|
||||
RddTransformContextBase = namedtuple("RddTransformContext",
|
||||
|
@ -56,12 +58,14 @@ class TransformContextUtils(object):
|
|||
def get_context(transform_context_info=None,
|
||||
config_info=None,
|
||||
offset_info=None,
|
||||
transform_spec_df_info=None):
|
||||
transform_spec_df_info=None,
|
||||
batch_time_info=None):
|
||||
|
||||
if transform_context_info is None:
|
||||
return TransformContext(config_info,
|
||||
offset_info,
|
||||
transform_spec_df_info)
|
||||
transform_spec_df_info,
|
||||
batch_time_info)
|
||||
else:
|
||||
if config_info is None or config_info == "":
|
||||
# get from passed in transform_context
|
||||
|
@ -77,6 +81,13 @@ class TransformContextUtils(object):
|
|||
transform_spec_df_info = \
|
||||
transform_context_info.transform_spec_df_info
|
||||
|
||||
if batch_time_info is None or \
|
||||
batch_time_info == "":
|
||||
# get from passed in transform_context
|
||||
batch_time_info = \
|
||||
transform_context_info.batch_time_info
|
||||
|
||||
return TransformContext(config_info,
|
||||
offset_info,
|
||||
transform_spec_df_info)
|
||||
transform_spec_df_info,
|
||||
batch_time_info)
|
||||
|
|
|
@ -0,0 +1,49 @@
|
|||
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from pyspark import StorageLevel
|
||||
|
||||
|
||||
class StorageUtils(object):
|
||||
"""storage util functions"""
|
||||
|
||||
@staticmethod
|
||||
def get_storage_level(storage_level_str):
|
||||
"""get pyspark storage level from storage level
|
||||
string
|
||||
"""
|
||||
if (storage_level_str == "DISK_ONLY"):
|
||||
return StorageLevel.DISK_ONLY
|
||||
elif (storage_level_str == "DISK_ONLY_2"):
|
||||
return StorageLevel.DISK_ONLY_2
|
||||
elif (storage_level_str == "MEMORY_AND_DISK"):
|
||||
return StorageLevel.MEMORY_AND_DISK
|
||||
elif (storage_level_str == "MEMORY_AND_DISK_2"):
|
||||
return StorageLevel.MEMORY_AND_DISK_2
|
||||
elif (storage_level_str == "MEMORY_AND_DISK_SER"):
|
||||
return StorageLevel.MEMORY_AND_DISK_SER
|
||||
elif (storage_level_str == "MEMORY_AND_DISK_SER_2"):
|
||||
return StorageLevel.MEMORY_AND_DISK_SER_2
|
||||
elif (storage_level_str == "MEMORY_ONLY"):
|
||||
return StorageLevel.MEMORY_ONLY
|
||||
elif (storage_level_str == "MEMORY_ONLY_2"):
|
||||
return StorageLevel.MEMORY_ONLY_2
|
||||
elif (storage_level_str == "MEMORY_ONLY_SER"):
|
||||
return StorageLevel.MEMORY_ONLY_SER
|
||||
elif (storage_level_str == "MEMORY_ONLY_SER_2"):
|
||||
return StorageLevel.MEMORY_ONLY_SER_2
|
||||
elif (storage_level_str == "OFF_HEAP"):
|
||||
return StorageLevel.OFF_HEAP
|
||||
else:
|
||||
return StorageLevel.MEMORY_ONLY
|
|
@ -15,7 +15,6 @@
|
|||
import logging
|
||||
|
||||
from pyspark.sql import SQLContext
|
||||
|
||||
from pyspark.sql.types import ArrayType
|
||||
from pyspark.sql.types import DoubleType
|
||||
from pyspark.sql.types import MapType
|
||||
|
@ -178,6 +177,12 @@ class TransformSpecsUtils(TransformUtils):
|
|||
StringType(), True),
|
||||
StructField("aggregated_metric_name",
|
||||
StringType(), True),
|
||||
StructField("pre_hourly_group_by_list",
|
||||
ArrayType(StringType(),
|
||||
containsNull=False),
|
||||
True),
|
||||
StructField("pre_hourly_operation",
|
||||
StringType(), True),
|
||||
StructField("aggregation_pipeline",
|
||||
StructType([source, usage,
|
||||
setters, insert]),
|
||||
|
|
|
@ -32,6 +32,7 @@ monasca_transform.setter =
|
|||
monasca_transform.insert =
|
||||
prepare_data = monasca_transform.component.insert.prepare_data:PrepareData
|
||||
insert_data = monasca_transform.component.insert.kafka_insert:KafkaInsert
|
||||
insert_data_pre_hourly = monasca_transform.component.insert.kafka_insert_pre_hourly:KafkaInsertPreHourly
|
||||
|
||||
[global]
|
||||
setup-hooks =
|
||||
|
|
|
@ -11,7 +11,6 @@
|
|||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
from pyspark.sql import SQLContext
|
||||
|
||||
|
@ -67,7 +66,8 @@ class TransformBuilderTest(SparkContextTest):
|
|||
sql_context, metric_proc_json_path)
|
||||
|
||||
transform_context = TransformContextUtils.get_context(
|
||||
transform_spec_df_info=transform_spec_df)
|
||||
transform_spec_df_info=transform_spec_df,
|
||||
batch_time_info=self.get_dummy_batch_time())
|
||||
|
||||
# invoke the generic transformation builder
|
||||
instance_usage_df = GenericTransformBuilder.do_transform(
|
||||
|
|
|
@ -11,23 +11,21 @@
|
|||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
from mock import call
|
||||
from mock import MagicMock
|
||||
import unittest
|
||||
|
||||
from oslo_config import cfg
|
||||
from pyspark.streaming.kafka import OffsetRange
|
||||
|
||||
import mock
|
||||
from mock import call
|
||||
from mock import MagicMock
|
||||
|
||||
from monasca_transform.config.config_initializer import ConfigInitializer
|
||||
from monasca_transform.driver.mon_metrics_kafka \
|
||||
import MonMetricsKafkaProcessor
|
||||
from monasca_transform.messaging.adapter import MessageAdapter
|
||||
from monasca_transform.transform import RddTransformContext
|
||||
from monasca_transform.transform import TransformContextUtils
|
||||
|
||||
from tests.unit.messaging.adapter import DummyAdapter
|
||||
from tests.unit.spark_context_test import SparkContextTest
|
||||
from tests.unit.test_resources.kafka_data.data_provider import DataProvider
|
||||
from tests.unit.test_resources.mock_component_manager \
|
||||
|
@ -67,9 +65,9 @@ class SparkTest(SparkContextTest):
|
|||
'tests/unit/test_resources/config/'
|
||||
'test_config_with_dummy_messaging_adapter.conf'])
|
||||
# reset metric_id list dummy adapter
|
||||
if not MessageAdapter.adapter_impl:
|
||||
MessageAdapter.init()
|
||||
MessageAdapter.adapter_impl.metric_list = []
|
||||
if not DummyAdapter.adapter_impl:
|
||||
DummyAdapter.init()
|
||||
DummyAdapter.adapter_impl.metric_list = []
|
||||
|
||||
@mock.patch('monasca_transform.transform.builder.'
|
||||
'generic_transform_builder.GenericTransformBuilder.'
|
||||
|
@ -106,7 +104,9 @@ class SparkTest(SparkContextTest):
|
|||
OffsetRange("metrics", 1, 10, 20)] # mimic rdd.offsetRanges()
|
||||
|
||||
transform_context = TransformContextUtils.get_context(
|
||||
offset_info=myOffsetRanges)
|
||||
offset_info=myOffsetRanges,
|
||||
batch_time_info=self.get_dummy_batch_time())
|
||||
|
||||
rdd_monasca_with_offsets = rdd_monasca.map(
|
||||
lambda x: RddTransformContext(x, transform_context))
|
||||
|
||||
|
@ -121,7 +121,7 @@ class SparkTest(SparkContextTest):
|
|||
rdd_monasca_with_offsets)
|
||||
|
||||
# get the metrics that have been submitted to the dummy message adapter
|
||||
metrics = MessageAdapter.adapter_impl.metric_list
|
||||
metrics = DummyAdapter.adapter_impl.metric_list
|
||||
|
||||
# Verify mem.total_mb_agg metrics
|
||||
total_mb_agg_metric = [
|
||||
|
|
|
@ -11,13 +11,27 @@
|
|||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from monasca_transform.messaging.adapter import MessageAdapter
|
||||
import simport
|
||||
|
||||
|
||||
class DummyAdapter(MessageAdapter):
|
||||
|
||||
adapter_impl = None
|
||||
|
||||
metric_list = []
|
||||
|
||||
@staticmethod
|
||||
def init():
|
||||
# object to keep track of offsets
|
||||
DummyAdapter.adapter_impl = simport.load(
|
||||
"tests.unit.messaging.adapter:DummyAdapter")()
|
||||
|
||||
def do_send_metric(self, metric):
|
||||
self.metric_list.append(metric)
|
||||
|
||||
@staticmethod
|
||||
def send_metric(metric):
|
||||
if not DummyAdapter.adapter_impl:
|
||||
DummyAdapter.init()
|
||||
DummyAdapter.adapter_impl.do_send_metric(metric)
|
||||
|
|
|
@ -0,0 +1,136 @@
|
|||
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import mock
|
||||
import unittest
|
||||
|
||||
from pyspark.streaming.kafka import OffsetRange
|
||||
|
||||
from monasca_transform.component.insert.dummy_insert import DummyInsert
|
||||
from monasca_transform.config.config_initializer import ConfigInitializer
|
||||
from monasca_transform.processor.pre_hourly_processor import PreHourlyProcessor
|
||||
|
||||
from tests.unit.messaging.adapter import DummyAdapter
|
||||
from tests.unit.spark_context_test import SparkContextTest
|
||||
from tests.unit.test_resources.metrics_pre_hourly_data.data_provider \
|
||||
import DataProvider
|
||||
|
||||
|
||||
class TestPreHourlyProcessorAgg(SparkContextTest):
|
||||
|
||||
def setUp(self):
|
||||
super(TestPreHourlyProcessorAgg, self).setUp()
|
||||
# configure the system with a dummy messaging adapter
|
||||
ConfigInitializer.basic_config(
|
||||
default_config_files=[
|
||||
'tests/unit/test_resources/config/'
|
||||
'test_config_with_dummy_messaging_adapter.conf'])
|
||||
# reset metric_id list dummy adapter
|
||||
if not DummyAdapter.adapter_impl:
|
||||
DummyAdapter.init()
|
||||
DummyAdapter.adapter_impl.metric_list = []
|
||||
|
||||
@mock.patch('monasca_transform.processor.pre_hourly_processor.KafkaInsert',
|
||||
DummyInsert)
|
||||
@mock.patch('monasca_transform.processor.pre_hourly_processor.'
|
||||
'PreHourlyProcessor.fetch_pre_hourly_data')
|
||||
@mock.patch('monasca_transform.processor.pre_hourly_processor.'
|
||||
'PreHourlyProcessor.get_processing_offset_range_list')
|
||||
def test_pre_hourly_processor(self,
|
||||
offset_range_list,
|
||||
pre_hourly_data):
|
||||
|
||||
# load components
|
||||
myOffsetRanges = [
|
||||
OffsetRange("metrics_pre_hourly", 1, 10, 20)]
|
||||
offset_range_list.return_value = myOffsetRanges
|
||||
|
||||
# Create an RDD out of the mocked instance usage data
|
||||
with open(DataProvider.metrics_pre_hourly_data_path) as f:
|
||||
raw_lines = f.read().splitlines()
|
||||
raw_tuple_list = [eval(raw_line) for raw_line in raw_lines]
|
||||
pre_hourly_rdd_data = self.spark_context.parallelize(raw_tuple_list)
|
||||
pre_hourly_data.return_value = pre_hourly_rdd_data
|
||||
|
||||
# Do something simple with the RDD
|
||||
result = self.simple_count_transform(pre_hourly_rdd_data)
|
||||
|
||||
# run pre hourly processor
|
||||
PreHourlyProcessor.run_processor(
|
||||
self.spark_context, self.get_dummy_batch_time())
|
||||
|
||||
# get the metrics that have been submitted to the dummy message adapter
|
||||
metrics = DummyAdapter.adapter_impl.metric_list
|
||||
|
||||
# Verify count of instance usage data
|
||||
self.assertEqual(result, 6)
|
||||
|
||||
# check aggregation result
|
||||
mem_total_mb_agg_metric = [
|
||||
value for value in metrics
|
||||
if value.get('metric').get('name') ==
|
||||
'mem.total_mb_agg' and
|
||||
value.get('metric').get('dimensions').get('host') ==
|
||||
'all'][0]
|
||||
self.assertTrue(mem_total_mb_agg_metric is not None)
|
||||
self.assertEqual(16049.0,
|
||||
mem_total_mb_agg_metric
|
||||
.get('metric').get('value'))
|
||||
# agg meta
|
||||
self.assertEqual("2016-06-20 11:49:44",
|
||||
mem_total_mb_agg_metric
|
||||
.get("metric")
|
||||
.get('value_meta').get('lastrecord_timestamp'))
|
||||
self.assertEqual("2016-06-20 11:24:59",
|
||||
mem_total_mb_agg_metric
|
||||
.get("metric")
|
||||
.get('value_meta').get('firstrecord_timestamp'))
|
||||
self.assertEqual(60.0,
|
||||
mem_total_mb_agg_metric
|
||||
.get("metric")
|
||||
.get('value_meta').get('record_count'))
|
||||
|
||||
mem_usable_mb_agg_metric = [
|
||||
value for value in metrics
|
||||
if value.get('metric').get('name') ==
|
||||
'mem.usable_mb_agg' and
|
||||
value.get('metric').get('dimensions').get('host') ==
|
||||
'all'][0]
|
||||
self.assertTrue(mem_usable_mb_agg_metric is not None)
|
||||
self.assertEqual(10283.1,
|
||||
mem_usable_mb_agg_metric
|
||||
.get('metric').get('value'))
|
||||
# agg meta
|
||||
self.assertEqual("2016-06-20 11:49:44",
|
||||
mem_usable_mb_agg_metric
|
||||
.get("metric")
|
||||
.get('value_meta').get('lastrecord_timestamp'))
|
||||
self.assertEqual("2016-06-20 11:24:59",
|
||||
mem_usable_mb_agg_metric
|
||||
.get("metric")
|
||||
.get('value_meta').get('firstrecord_timestamp'))
|
||||
self.assertEqual(60.0,
|
||||
mem_usable_mb_agg_metric
|
||||
.get("metric")
|
||||
.get('value_meta').get('record_count'))
|
||||
|
||||
def simple_count_transform(self, rdd):
|
||||
return rdd.count()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("PATH *************************************************************")
|
||||
import sys
|
||||
print(sys.path)
|
||||
print("PATH==============================================================")
|
||||
unittest.main()
|
|
@ -11,7 +11,6 @@
|
|||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from pyspark.sql import SQLContext
|
||||
|
||||
from monasca_transform.transform.transform_utils import RecordStoreUtils
|
||||
|
@ -44,9 +43,9 @@ class SetAggregatedMetricNameTest(SparkContextTest):
|
|||
self.sql_context,
|
||||
DataProvider.transform_spec_path)
|
||||
|
||||
transform_context = \
|
||||
TransformContextUtils.get_context(
|
||||
transform_spec_df_info=transform_spec_df)
|
||||
transform_context = TransformContextUtils.get_context(
|
||||
transform_spec_df_info=transform_spec_df,
|
||||
batch_time_info=self.get_dummy_batch_time())
|
||||
|
||||
instance_usage_df = FetchQuantity.usage(
|
||||
transform_context, record_store_df)
|
||||
|
|
|
@ -11,7 +11,6 @@
|
|||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from pyspark.sql import SQLContext
|
||||
|
||||
from monasca_transform.transform.transform_utils import RecordStoreUtils
|
||||
|
@ -40,9 +39,9 @@ class UsageComponentTest(SparkContextTest):
|
|||
transform_spec_df = TransformSpecsUtils.create_df_from_json(
|
||||
self.sql_context, DataProvider.transform_spec_path)
|
||||
|
||||
transform_context = \
|
||||
TransformContextUtils.get_context(
|
||||
transform_spec_df_info=transform_spec_df)
|
||||
transform_context = TransformContextUtils.get_context(
|
||||
transform_spec_df_info=transform_spec_df,
|
||||
batch_time_info=self.get_dummy_batch_time())
|
||||
|
||||
instance_usage_df = FetchQuantity.usage(
|
||||
transform_context, record_store_df)
|
||||
|
|
|
@ -14,6 +14,8 @@
|
|||
|
||||
from pyspark.context import SparkContext
|
||||
from pyspark import SparkConf
|
||||
|
||||
import datetime
|
||||
import unittest
|
||||
|
||||
|
||||
|
@ -21,11 +23,24 @@ class SparkContextTest(unittest.TestCase):
|
|||
|
||||
def setUp(self):
|
||||
# Create a local Spark context with 4 cores
|
||||
spark_conf = SparkConf().setMaster('local[4]')
|
||||
spark_conf = SparkConf().setMaster('local[4]').\
|
||||
setAppName("monasca-transform unit tests").\
|
||||
set("spark.sql.shuffle.partitions", "10")
|
||||
self.spark_context = SparkContext.getOrCreate(conf=spark_conf)
|
||||
|
||||
# quiet logging
|
||||
logger = self.spark_context._jvm.org.apache.log4j
|
||||
logger.LogManager.getLogger("org").setLevel(logger.Level.WARN)
|
||||
logger.LogManager.getLogger("akka").setLevel(logger.Level.WARN)
|
||||
|
||||
def tearDown(self):
|
||||
# we don't stop the spark context because it doesn't work cleanly,
|
||||
# a context is left behind that cannot work. Instead we rely on the
|
||||
# context to be shutdown at the end of the tests run
|
||||
pass
|
||||
|
||||
def get_dummy_batch_time(self):
|
||||
"""get a batch time for all tests."""
|
||||
my_batch_time = datetime.datetime.strptime('2016-01-01 00:00:00',
|
||||
'%Y-%m-%d %H:%M:%S')
|
||||
return my_batch_time
|
||||
|
|
|
@ -11,7 +11,7 @@
|
|||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import datetime
|
||||
import json
|
||||
import os
|
||||
import random
|
||||
|
@ -33,11 +33,18 @@ class TestJSONOffsetSpecs(unittest.TestCase):
|
|||
def tearDown(self):
|
||||
pass
|
||||
|
||||
def get_dummy_batch_time(self):
|
||||
"""get a batch time for all tests."""
|
||||
my_batch_time = datetime.datetime.strptime('2016-01-01 00:00:00',
|
||||
'%Y-%m-%d %H:%M:%S')
|
||||
return my_batch_time
|
||||
|
||||
def test_read_offsets_on_start(self):
|
||||
json_offset_specs = JSONOffsetSpecs(
|
||||
path=self.test_resources_path,
|
||||
filename='test_read_offsets_on_start.json')
|
||||
kafka_offsets = json_offset_specs.get_kafka_offsets()
|
||||
app_name = "mon_metrics_kafka"
|
||||
kafka_offsets = json_offset_specs.get_kafka_offsets(app_name)
|
||||
self.assertEqual(1, len(kafka_offsets))
|
||||
offset_key_0 = kafka_offsets.iterkeys().next()
|
||||
self.assertEqual('mon_metrics_kafka_metrics_0', offset_key_0)
|
||||
|
@ -45,7 +52,7 @@ class TestJSONOffsetSpecs(unittest.TestCase):
|
|||
self.assertEqual('metrics', offset_value_0.get_topic())
|
||||
self.assertEqual(85081, offset_value_0.get_until_offset())
|
||||
self.assertEqual(0, offset_value_0.get_partition())
|
||||
self.assertEqual('mon_metrics_kafka', offset_value_0.get_app_name())
|
||||
self.assertEqual(app_name, offset_value_0.get_app_name())
|
||||
self.assertEqual(84790, offset_value_0.get_from_offset())
|
||||
|
||||
def test_write_offsets_each_add(self):
|
||||
|
@ -60,11 +67,13 @@ class TestJSONOffsetSpecs(unittest.TestCase):
|
|||
until_offset_1 = random.randint(0, sys.maxsize)
|
||||
from_offset_1 = random.randint(0, sys.maxsize)
|
||||
app_name_1 = str(uuid.uuid4())
|
||||
my_batch_time = self.get_dummy_batch_time()
|
||||
|
||||
used_values = {}
|
||||
json_offset_specs.add(topic=topic_1, partition=partition_1,
|
||||
app_name=app_name_1, from_offset=from_offset_1,
|
||||
until_offset=until_offset_1)
|
||||
until_offset=until_offset_1,
|
||||
batch_time_info=my_batch_time)
|
||||
|
||||
kafka_offset_dict = self.load_offset_file_as_json(file_path)
|
||||
offset_key_1 = "%s_%s_%s" % (app_name_1, topic_1, partition_1)
|
||||
|
@ -83,7 +92,8 @@ class TestJSONOffsetSpecs(unittest.TestCase):
|
|||
app_name_2 = str(uuid.uuid4())
|
||||
json_offset_specs.add(topic=topic_2, partition=partition_2,
|
||||
app_name=app_name_2, from_offset=from_offset_2,
|
||||
until_offset=until_offset_2)
|
||||
until_offset=until_offset_2,
|
||||
batch_time_info=my_batch_time)
|
||||
offset_key_2 = "%s_%s_%s" % (app_name_2, topic_2, partition_2)
|
||||
used_values[offset_key_2] = {
|
||||
"topic": topic_2, "partition": partition_2, "app_name": app_name_2,
|
||||
|
@ -127,10 +137,13 @@ class TestJSONOffsetSpecs(unittest.TestCase):
|
|||
until_offset_1 = random.randint(0, sys.maxsize)
|
||||
from_offset_1 = random.randint(0, sys.maxsize)
|
||||
app_name_1 = str(uuid.uuid4())
|
||||
my_batch_time = self.get_dummy_batch_time()
|
||||
|
||||
used_values = {}
|
||||
json_offset_specs.add(topic=topic_1, partition=partition_1,
|
||||
app_name=app_name_1, from_offset=from_offset_1,
|
||||
until_offset=until_offset_1)
|
||||
until_offset=until_offset_1,
|
||||
batch_time_info=my_batch_time)
|
||||
offset_key_1 = "%s_%s_%s" % (app_name_1, topic_1, partition_1)
|
||||
used_values[offset_key_1] = {
|
||||
"topic": topic_1, "partition": partition_1, "app_name": app_name_1,
|
||||
|
@ -143,7 +156,8 @@ class TestJSONOffsetSpecs(unittest.TestCase):
|
|||
app_name_2 = str(uuid.uuid4())
|
||||
json_offset_specs.add(topic=topic_2, partition=partition_2,
|
||||
app_name=app_name_2, from_offset=from_offset_2,
|
||||
until_offset=until_offset_2)
|
||||
until_offset=until_offset_2,
|
||||
batch_time_info=my_batch_time)
|
||||
offset_key_2 = "%s_%s_%s" % (app_name_2, topic_2, partition_2)
|
||||
used_values[offset_key_2] = {
|
||||
"topic": topic_2, "partition": partition_2, "app_name": app_name_2,
|
||||
|
@ -152,12 +166,20 @@ class TestJSONOffsetSpecs(unittest.TestCase):
|
|||
# now create a new JSONOffsetSpecs
|
||||
json_offset_specs_2 = JSONOffsetSpecs(self.test_resources_path,
|
||||
filename)
|
||||
found_offsets = json_offset_specs_2.get_kafka_offsets()
|
||||
found_offsets = json_offset_specs_2.get_kafka_offsets(app_name_2)
|
||||
json_found_offsets = {key: JSONOffsetSpecs.as_dict(value)
|
||||
for key, value in found_offsets.items()}
|
||||
for key, value in used_values.items():
|
||||
found_value = json_found_offsets.get(key)
|
||||
self.assertEqual(value, found_value)
|
||||
self.assertEqual(value.get("app_name"),
|
||||
found_value.get("app_name"))
|
||||
self.assertEqual(value.get("topic"), found_value.get("topic"))
|
||||
self.assertEqual(value.get("partition"),
|
||||
found_value.get("partition"))
|
||||
self.assertEqual(value.get("from_offset"),
|
||||
found_value.get("from_offset"))
|
||||
self.assertEqual(value.get("until_offset"),
|
||||
found_value.get("until_offset"))
|
||||
|
||||
os.remove(os.path.join(self.test_resources_path, filename))
|
||||
|
||||
|
@ -173,11 +195,13 @@ class TestJSONOffsetSpecs(unittest.TestCase):
|
|||
until_offset_1 = random.randint(0, sys.maxsize)
|
||||
from_offset_1 = random.randint(0, sys.maxsize)
|
||||
app_name_1 = str(uuid.uuid4())
|
||||
my_batch_time = self.get_dummy_batch_time()
|
||||
|
||||
used_values = {}
|
||||
json_offset_specs.add(topic=topic_1, partition=partition_1,
|
||||
app_name=app_name_1, from_offset=from_offset_1,
|
||||
until_offset=until_offset_1)
|
||||
until_offset=until_offset_1,
|
||||
batch_time_info=my_batch_time)
|
||||
|
||||
kafka_offset_dict = self.load_offset_file_as_json(file_path)
|
||||
offset_key_1 = "%s_%s_%s" % (app_name_1, topic_1, partition_1)
|
||||
|
@ -199,7 +223,8 @@ class TestJSONOffsetSpecs(unittest.TestCase):
|
|||
|
||||
json_offset_specs.add(topic=topic_1, partition=partition_1,
|
||||
app_name=app_name_1, from_offset=from_offset_2,
|
||||
until_offset=until_offset_2)
|
||||
until_offset=until_offset_2,
|
||||
batch_time_info=my_batch_time)
|
||||
|
||||
kafka_offset_dict = self.load_offset_file_as_json(file_path)
|
||||
offset_value_updated = kafka_offset_dict.get(offset_key_1)
|
||||
|
|
|
@ -12,6 +12,7 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import datetime
|
||||
import random
|
||||
import sys
|
||||
import unittest
|
||||
|
@ -30,6 +31,12 @@ class TestMySQLOffsetSpecs(unittest.TestCase):
|
|||
def tearDown(self):
|
||||
pass
|
||||
|
||||
def get_dummy_batch_time(self):
|
||||
"""get a batch time for all tests."""
|
||||
my_batch_time = datetime.datetime.strptime('2016-01-01 00:00:00',
|
||||
'%Y-%m-%d %H:%M:%S')
|
||||
return my_batch_time
|
||||
|
||||
def test_add_offset(self):
|
||||
|
||||
topic_1 = str(uuid.uuid4())
|
||||
|
@ -39,47 +46,53 @@ class TestMySQLOffsetSpecs(unittest.TestCase):
|
|||
app_name_1 = str(uuid.uuid4())
|
||||
offset_key_1 = "%s_%s_%s" % (app_name_1, topic_1, partition_1)
|
||||
|
||||
my_batch_time = self.get_dummy_batch_time()
|
||||
|
||||
used_values = {}
|
||||
self.kafka_offset_specs.add(topic=topic_1, partition=partition_1,
|
||||
app_name=app_name_1,
|
||||
from_offset=from_offset_1,
|
||||
until_offset=until_offset_1)
|
||||
until_offset=until_offset_1,
|
||||
batch_time_info=my_batch_time)
|
||||
used_values[offset_key_1] = {
|
||||
"topic": topic_1, "partition": partition_1, "app_name": app_name_1,
|
||||
"from_offset": from_offset_1, "until_offset": until_offset_1
|
||||
}
|
||||
|
||||
kafka_offset_specs = self.kafka_offset_specs.get_kafka_offsets()
|
||||
kafka_offset_specs = self.kafka_offset_specs.get_kafka_offsets(
|
||||
app_name_1)
|
||||
offset_value_1 = kafka_offset_specs.get(offset_key_1)
|
||||
self.assertions_on_offset(used_value=used_values.get(offset_key_1),
|
||||
offset_value=offset_value_1)
|
||||
|
||||
def test_add_another_offset(self):
|
||||
offset_specs_at_outset = self.kafka_offset_specs.get_kafka_offsets()
|
||||
offset_count = len(offset_specs_at_outset)
|
||||
topic_1 = str(uuid.uuid4())
|
||||
partition_1 = random.randint(0, 1024)
|
||||
until_offset_1 = random.randint(0, sys.maxsize)
|
||||
from_offset_1 = random.randint(0, sys.maxsize)
|
||||
app_name_1 = str(uuid.uuid4())
|
||||
offset_key_1 = "%s_%s_%s" % (app_name_1, topic_1, partition_1)
|
||||
my_batch_time = self.get_dummy_batch_time()
|
||||
|
||||
used_values = {}
|
||||
self.kafka_offset_specs.add(topic=topic_1, partition=partition_1,
|
||||
app_name=app_name_1,
|
||||
from_offset=from_offset_1,
|
||||
until_offset=until_offset_1)
|
||||
until_offset=until_offset_1,
|
||||
batch_time_info=my_batch_time)
|
||||
used_values[offset_key_1] = {
|
||||
"topic": topic_1, "partition": partition_1, "app_name": app_name_1,
|
||||
"from_offset": from_offset_1, "until_offset": until_offset_1
|
||||
}
|
||||
|
||||
kafka_offset_specs = self.kafka_offset_specs.get_kafka_offsets()
|
||||
kafka_offset_specs = self.kafka_offset_specs.get_kafka_offsets(
|
||||
app_name_1)
|
||||
offset_value_1 = kafka_offset_specs.get(offset_key_1)
|
||||
self.assertions_on_offset(used_value=used_values.get(offset_key_1),
|
||||
offset_value=offset_value_1)
|
||||
self.assertEqual(offset_count + 1,
|
||||
len(self.kafka_offset_specs.get_kafka_offsets()))
|
||||
self.assertEqual(1,
|
||||
len(self.kafka_offset_specs.get_kafka_offsets(
|
||||
app_name_1)))
|
||||
|
||||
def test_update_offset_values(self):
|
||||
topic_1 = str(uuid.uuid4())
|
||||
|
@ -89,10 +102,13 @@ class TestMySQLOffsetSpecs(unittest.TestCase):
|
|||
app_name_1 = str(uuid.uuid4())
|
||||
offset_key_1 = "%s_%s_%s" % (app_name_1, topic_1, partition_1)
|
||||
|
||||
my_batch_time = self.get_dummy_batch_time()
|
||||
|
||||
self.kafka_offset_specs.add(topic=topic_1, partition=partition_1,
|
||||
app_name=app_name_1,
|
||||
from_offset=from_offset_1,
|
||||
until_offset=until_offset_1)
|
||||
until_offset=until_offset_1,
|
||||
batch_time_info=my_batch_time)
|
||||
|
||||
until_offset_2 = random.randint(0, sys.maxsize)
|
||||
while until_offset_2 == until_offset_1:
|
||||
|
@ -105,9 +121,11 @@ class TestMySQLOffsetSpecs(unittest.TestCase):
|
|||
self.kafka_offset_specs.add(topic=topic_1, partition=partition_1,
|
||||
app_name=app_name_1,
|
||||
from_offset=from_offset_2,
|
||||
until_offset=until_offset_2)
|
||||
until_offset=until_offset_2,
|
||||
batch_time_info=my_batch_time)
|
||||
|
||||
kafka_offset_specs = self.kafka_offset_specs.get_kafka_offsets()
|
||||
kafka_offset_specs = self.kafka_offset_specs.get_kafka_offsets(
|
||||
app_name_1)
|
||||
updated_offset_value = kafka_offset_specs.get(offset_key_1)
|
||||
self.assertEqual(from_offset_2, updated_offset_value.get_from_offset())
|
||||
self.assertEqual(until_offset_2,
|
||||
|
|
|
@ -10,3 +10,14 @@ host = test_host_name
|
|||
database_name = test_database_name
|
||||
username = test_database_user_name
|
||||
password = test_database_password
|
||||
|
||||
[stage_processors]
|
||||
enable_pre_hourly_processor = False
|
||||
|
||||
[pre_hourly_processor]
|
||||
enable_instance_usage_df_cache = False
|
||||
instance_usage_df_cache_storage_level = MEMORY_ONLY_SER_2
|
||||
|
||||
[service]
|
||||
enable_record_store_df_cache = False
|
||||
record_store_df_cache_storage_level = MEMORY_ONLY_SER_2
|
|
@ -1,4 +1,15 @@
|
|||
[DEFAULTS]
|
||||
|
||||
[messaging]
|
||||
adapter = tests.unit.messaging.adapter:DummyAdapter
|
||||
adapter = tests.unit.messaging.adapter:DummyAdapter
|
||||
|
||||
[stage_processors]
|
||||
enable_pre_hourly_processor = False
|
||||
|
||||
[pre_hourly_processor]
|
||||
enable_instance_usage_df_cache = False
|
||||
instance_usage_df_cache_storage_level = MEMORY_ONLY_SER_2
|
||||
|
||||
[service]
|
||||
enable_record_store_df_cache = False
|
||||
record_store_df_cache_storage_level = MEMORY_ONLY_SER_2
|
|
@ -0,0 +1,23 @@
|
|||
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import os
|
||||
|
||||
|
||||
class DataProvider(object):
|
||||
|
||||
_resource_path = 'tests/unit/test_resources/metrics_pre_hourly_data/'
|
||||
|
||||
metrics_pre_hourly_data_path = os.path.join(_resource_path,
|
||||
"metrics_pre_hourly_data.txt")
|
|
@ -0,0 +1,6 @@
|
|||
('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"mem.total_mb_agg","record_count":20.0,"user_id":"all","zone":"all","firstrecord_timestamp":"2016-06-20 11:24:59","tenant_id":"all","region":"all","usage_hour":"11","usage_date":"2016-06-20","processing_meta":{"metric_id":"mem_total_all"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp":"2016-06-20 11:29:44","firstrecord_timestamp_unix":1466421899.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1466422184.0,"quantity":16049.0})
|
||||
('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"mem.total_mb_agg","record_count":20.0,"user_id":"all","zone":"all","firstrecord_timestamp":"2016-06-20 11:29:44","tenant_id":"all","region":"all","usage_hour":"11","usage_date":"2016-06-20","processing_meta":{"metric_id":"mem_total_all"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp":"2016-06-20 11:39:44","firstrecord_timestamp_unix":1466421899.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1466422784.0,"quantity":16049.0})
|
||||
('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"mem.total_mb_agg","record_count":20.0,"user_id":"all","zone":"all","firstrecord_timestamp":"2016-06-20 11:39:44","tenant_id":"all","region":"all","usage_hour":"11","usage_date":"2016-06-20","processing_meta":{"metric_id":"mem_total_all"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp":"2016-06-20 11:49:44","firstrecord_timestamp_unix":1466421899.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1466423384.0,"quantity":16049.0})
|
||||
('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"mem.usable_mb_agg","record_count":20.0,"user_id":"all","zone":"all","firstrecord_timestamp":"2016-06-20 11:24:59","tenant_id":"all","region":"all","usage_hour":"11","usage_date":"2016-06-20","processing_meta":{"metric_id":"mem_usable_all"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp":"2016-06-20 11:29:44","firstrecord_timestamp_unix":1466421899.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1466422184.0,"quantity":10283.1})
|
||||
('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"mem.usable_mb_agg","record_count":20.0,"user_id":"all","zone":"all","firstrecord_timestamp":"2016-06-20 11:29:44","tenant_id":"all","region":"all","usage_hour":"11","usage_date":"2016-06-20","processing_meta":{"metric_id":"mem_usable_all"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp":"2016-06-20 11:39:44","firstrecord_timestamp_unix":1466421899.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1466422784.0,"quantity":10283.1})
|
||||
('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"mem.usable_mb_agg","record_count":20.0,"user_id":"all","zone":"all","firstrecord_timestamp":"2016-06-20 11:39:44","tenant_id":"all","region":"all","usage_hour":"11","usage_date":"2016-06-20","processing_meta":{"metric_id":"mem_usable_all"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp":"2016-06-20 11:49:44","firstrecord_timestamp_unix":1466421899.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1466423384.0,"quantity":10283.1})
|
|
@ -92,4 +92,9 @@ class MockComponentManager(object):
|
|||
'DummyInsert',
|
||||
DummyInsert(),
|
||||
None),
|
||||
Extension('insert_data_pre_hourly',
|
||||
'monasca_transform.component.insert.dummy_insert:'
|
||||
'DummyInsert',
|
||||
DummyInsert(),
|
||||
None),
|
||||
])
|
||||
|
|
|
@ -11,23 +11,20 @@
|
|||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import json
|
||||
import mock
|
||||
import unittest
|
||||
|
||||
from oslo_config import cfg
|
||||
from pyspark.streaming.kafka import OffsetRange
|
||||
|
||||
import json
|
||||
import mock
|
||||
|
||||
from monasca_transform.config.config_initializer import ConfigInitializer
|
||||
from monasca_transform.driver.mon_metrics_kafka \
|
||||
import MonMetricsKafkaProcessor
|
||||
|
||||
from monasca_transform.messaging.adapter import MessageAdapter
|
||||
from monasca_transform.transform import RddTransformContext
|
||||
from monasca_transform.transform import TransformContextUtils
|
||||
|
||||
from tests.unit.messaging.adapter import DummyAdapter
|
||||
from tests.unit.spark_context_test import SparkContextTest
|
||||
from tests.unit.test_resources.fetch_quantity_data.data_provider \
|
||||
import DataProvider
|
||||
|
@ -47,9 +44,9 @@ class TestFetchQuantityAgg(SparkContextTest):
|
|||
'tests/unit/test_resources/config/'
|
||||
'test_config_with_dummy_messaging_adapter.conf'])
|
||||
# reset metric_id list dummy adapter
|
||||
if not MessageAdapter.adapter_impl:
|
||||
MessageAdapter.init()
|
||||
MessageAdapter.adapter_impl.metric_list = []
|
||||
if not DummyAdapter.adapter_impl:
|
||||
DummyAdapter.init()
|
||||
DummyAdapter.adapter_impl.metric_list = []
|
||||
|
||||
def get_pre_transform_specs_json(self):
|
||||
"""get pre_transform_specs driver table info."""
|
||||
|
@ -141,7 +138,9 @@ class TestFetchQuantityAgg(SparkContextTest):
|
|||
OffsetRange("metrics", 1, 10, 20)] # mimic rdd.offsetRanges()
|
||||
|
||||
transform_context = TransformContextUtils.get_context(
|
||||
offset_info=myOffsetRanges)
|
||||
offset_info=myOffsetRanges,
|
||||
batch_time_info=self.get_dummy_batch_time())
|
||||
|
||||
rdd_monasca_with_offsets = rdd_monasca.map(
|
||||
lambda x: RddTransformContext(x, transform_context))
|
||||
|
||||
|
@ -150,7 +149,7 @@ class TestFetchQuantityAgg(SparkContextTest):
|
|||
rdd_monasca_with_offsets)
|
||||
|
||||
# get the metrics that have been submitted to the dummy message adapter
|
||||
metrics = MessageAdapter.adapter_impl.metric_list
|
||||
metrics = DummyAdapter.adapter_impl.metric_list
|
||||
|
||||
mem_total_mb_agg_metric = [
|
||||
value for value in metrics
|
||||
|
@ -247,7 +246,9 @@ class TestFetchQuantityAgg(SparkContextTest):
|
|||
OffsetRange("metrics", 1, 10, 20)] # mimic rdd.offsetRanges()
|
||||
|
||||
transform_context = TransformContextUtils.get_context(
|
||||
offset_info=myOffsetRanges)
|
||||
offset_info=myOffsetRanges,
|
||||
batch_time_info=self.get_dummy_batch_time())
|
||||
|
||||
rdd_monasca_with_offsets = rdd_monasca.map(
|
||||
lambda x: RddTransformContext(x, transform_context))
|
||||
|
||||
|
@ -256,7 +257,7 @@ class TestFetchQuantityAgg(SparkContextTest):
|
|||
rdd_monasca_with_offsets)
|
||||
|
||||
# get the metrics that have been submitted to the dummy message adapter
|
||||
metrics = MessageAdapter.adapter_impl.metric_list
|
||||
metrics = DummyAdapter.adapter_impl.metric_list
|
||||
|
||||
mem_total_mb_agg_metric = [
|
||||
value for value in metrics
|
||||
|
@ -352,7 +353,8 @@ class TestFetchQuantityAgg(SparkContextTest):
|
|||
OffsetRange("metrics", 1, 10, 20)] # mimic rdd.offsetRanges()
|
||||
|
||||
transform_context = TransformContextUtils.get_context(
|
||||
offset_info=myOffsetRanges)
|
||||
offset_info=myOffsetRanges,
|
||||
batch_time_info=self.get_dummy_batch_time())
|
||||
rdd_monasca_with_offsets = rdd_monasca.map(
|
||||
lambda x: RddTransformContext(x, transform_context))
|
||||
|
||||
|
@ -361,7 +363,7 @@ class TestFetchQuantityAgg(SparkContextTest):
|
|||
rdd_monasca_with_offsets)
|
||||
|
||||
# get the metrics that have been submitted to the dummy message adapter
|
||||
metrics = MessageAdapter.adapter_impl.metric_list
|
||||
metrics = DummyAdapter.adapter_impl.metric_list
|
||||
|
||||
mem_total_mb_agg_metric = [
|
||||
value for value in metrics
|
||||
|
@ -457,7 +459,9 @@ class TestFetchQuantityAgg(SparkContextTest):
|
|||
OffsetRange("metrics", 1, 10, 20)] # mimic rdd.offsetRanges()
|
||||
|
||||
transform_context = TransformContextUtils.get_context(
|
||||
offset_info=myOffsetRanges)
|
||||
offset_info=myOffsetRanges,
|
||||
batch_time_info=self.get_dummy_batch_time())
|
||||
|
||||
rdd_monasca_with_offsets = rdd_monasca.map(
|
||||
lambda x: RddTransformContext(x, transform_context))
|
||||
|
||||
|
@ -466,7 +470,7 @@ class TestFetchQuantityAgg(SparkContextTest):
|
|||
rdd_monasca_with_offsets)
|
||||
|
||||
# get the metrics that have been submitted to the dummy message adapter
|
||||
metrics = MessageAdapter.adapter_impl.metric_list
|
||||
metrics = DummyAdapter.adapter_impl.metric_list
|
||||
|
||||
mem_total_mb_agg_metric = [
|
||||
value for value in metrics
|
||||
|
@ -562,7 +566,9 @@ class TestFetchQuantityAgg(SparkContextTest):
|
|||
OffsetRange("metrics", 1, 10, 20)] # mimic rdd.offsetRanges()
|
||||
|
||||
transform_context = TransformContextUtils.get_context(
|
||||
offset_info=myOffsetRanges)
|
||||
offset_info=myOffsetRanges,
|
||||
batch_time_info=self.get_dummy_batch_time())
|
||||
|
||||
rdd_monasca_with_offsets = rdd_monasca.map(
|
||||
lambda x: RddTransformContext(x, transform_context))
|
||||
|
||||
|
@ -571,7 +577,7 @@ class TestFetchQuantityAgg(SparkContextTest):
|
|||
rdd_monasca_with_offsets)
|
||||
|
||||
# get the metrics that have been submitted to the dummy message adapter
|
||||
metrics = MessageAdapter.adapter_impl.metric_list
|
||||
metrics = DummyAdapter.adapter_impl.metric_list
|
||||
|
||||
mem_total_mb_agg_metric = [
|
||||
value for value in metrics
|
||||
|
@ -667,7 +673,9 @@ class TestFetchQuantityAgg(SparkContextTest):
|
|||
OffsetRange("metrics", 1, 10, 20)] # mimic rdd.offsetRanges()
|
||||
|
||||
transform_context = TransformContextUtils.get_context(
|
||||
offset_info=myOffsetRanges)
|
||||
offset_info=myOffsetRanges,
|
||||
batch_time_info=self.get_dummy_batch_time())
|
||||
|
||||
rdd_monasca_with_offsets = rdd_monasca.map(
|
||||
lambda x: RddTransformContext(x, transform_context))
|
||||
|
||||
|
@ -676,7 +684,7 @@ class TestFetchQuantityAgg(SparkContextTest):
|
|||
rdd_monasca_with_offsets)
|
||||
|
||||
# get the metrics that have been submitted to the dummy message adapter
|
||||
metrics = MessageAdapter.adapter_impl.metric_list
|
||||
metrics = DummyAdapter.adapter_impl.metric_list
|
||||
|
||||
mem_total_mb_agg_metric = [
|
||||
value for value in metrics
|
||||
|
|
|
@ -11,14 +11,13 @@
|
|||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import json
|
||||
import mock
|
||||
import unittest
|
||||
|
||||
from oslo_config import cfg
|
||||
from pyspark.streaming.kafka import OffsetRange
|
||||
|
||||
import json
|
||||
import mock
|
||||
from monasca_transform.component.usage.fetch_quantity_util import \
|
||||
FetchQuantityUtilException
|
||||
|
||||
|
@ -26,10 +25,10 @@ from monasca_transform.config.config_initializer import ConfigInitializer
|
|||
from monasca_transform.driver.mon_metrics_kafka \
|
||||
import MonMetricsKafkaProcessor
|
||||
|
||||
from monasca_transform.messaging.adapter import MessageAdapter
|
||||
from monasca_transform.transform import RddTransformContext
|
||||
from monasca_transform.transform import TransformContextUtils
|
||||
|
||||
from tests.unit.messaging.adapter import DummyAdapter
|
||||
from tests.unit.spark_context_test import SparkContextTest
|
||||
from tests.unit.test_resources.cpu_kafka_data.data_provider import DataProvider
|
||||
from tests.unit.test_resources.mock_component_manager \
|
||||
|
@ -48,9 +47,9 @@ class TestFetchQuantityUtilAgg(SparkContextTest):
|
|||
'tests/unit/test_resources/config/'
|
||||
'test_config_with_dummy_messaging_adapter.conf'])
|
||||
# reset metric_id list dummy adapter
|
||||
if not MessageAdapter.adapter_impl:
|
||||
MessageAdapter.init()
|
||||
MessageAdapter.adapter_impl.metric_list = []
|
||||
if not DummyAdapter.adapter_impl:
|
||||
DummyAdapter.init()
|
||||
DummyAdapter.adapter_impl.metric_list = []
|
||||
|
||||
def get_pre_transform_specs_json(self):
|
||||
"""get pre_transform_specs driver table info."""
|
||||
|
@ -164,7 +163,9 @@ class TestFetchQuantityUtilAgg(SparkContextTest):
|
|||
OffsetRange("metrics", 1, 10, 20)] # mimic rdd.offsetRanges()
|
||||
|
||||
transform_context = TransformContextUtils.get_context(
|
||||
offset_info=myOffsetRanges)
|
||||
offset_info=myOffsetRanges,
|
||||
batch_time_info=self.get_dummy_batch_time())
|
||||
|
||||
rdd_monasca_with_offsets = rdd_monasca.map(
|
||||
lambda x: RddTransformContext(x, transform_context))
|
||||
|
||||
|
@ -173,7 +174,7 @@ class TestFetchQuantityUtilAgg(SparkContextTest):
|
|||
rdd_monasca_with_offsets)
|
||||
|
||||
# get the metrics that have been submitted to the dummy message adapter
|
||||
metrics = MessageAdapter.adapter_impl.metric_list
|
||||
metrics = DummyAdapter.adapter_impl.metric_list
|
||||
|
||||
utilized_cpu_logical_agg_metric = [
|
||||
value for value in metrics
|
||||
|
@ -265,7 +266,9 @@ class TestFetchQuantityUtilAgg(SparkContextTest):
|
|||
OffsetRange("metrics", 1, 10, 20)] # mimic rdd.offsetRanges()
|
||||
|
||||
transform_context = TransformContextUtils.get_context(
|
||||
offset_info=myOffsetRanges)
|
||||
offset_info=myOffsetRanges,
|
||||
batch_time_info=self.get_dummy_batch_time())
|
||||
|
||||
rdd_monasca_with_offsets = rdd_monasca.map(
|
||||
lambda x: RddTransformContext(x, transform_context))
|
||||
|
||||
|
@ -274,7 +277,7 @@ class TestFetchQuantityUtilAgg(SparkContextTest):
|
|||
rdd_monasca_with_offsets)
|
||||
|
||||
# get the metrics that have been submitted to the dummy message adapter
|
||||
metrics = MessageAdapter.adapter_impl.metric_list
|
||||
metrics = DummyAdapter.adapter_impl.metric_list
|
||||
|
||||
utilized_cpu_logical_agg_metric = [
|
||||
value for value in metrics
|
||||
|
@ -366,7 +369,9 @@ class TestFetchQuantityUtilAgg(SparkContextTest):
|
|||
OffsetRange("metrics", 1, 10, 20)] # mimic rdd.offsetRanges()
|
||||
|
||||
transform_context = TransformContextUtils.get_context(
|
||||
offset_info=myOffsetRanges)
|
||||
offset_info=myOffsetRanges,
|
||||
batch_time_info=self.get_dummy_batch_time())
|
||||
|
||||
rdd_monasca_with_offsets = rdd_monasca.map(
|
||||
lambda x: RddTransformContext(x, transform_context))
|
||||
|
||||
|
@ -375,7 +380,7 @@ class TestFetchQuantityUtilAgg(SparkContextTest):
|
|||
rdd_monasca_with_offsets)
|
||||
|
||||
# get the metrics that have been submitted to the dummy message adapter
|
||||
metrics = MessageAdapter.adapter_impl.metric_list
|
||||
metrics = DummyAdapter.adapter_impl.metric_list
|
||||
|
||||
utilized_cpu_logical_agg_metric = [
|
||||
value for value in metrics
|
||||
|
@ -467,7 +472,9 @@ class TestFetchQuantityUtilAgg(SparkContextTest):
|
|||
OffsetRange("metrics", 1, 10, 20)] # mimic rdd.offsetRanges()
|
||||
|
||||
transform_context = TransformContextUtils.get_context(
|
||||
offset_info=myOffsetRanges)
|
||||
offset_info=myOffsetRanges,
|
||||
batch_time_info=self.get_dummy_batch_time())
|
||||
|
||||
rdd_monasca_with_offsets = rdd_monasca.map(
|
||||
lambda x: RddTransformContext(x, transform_context))
|
||||
|
||||
|
@ -529,7 +536,9 @@ class TestFetchQuantityUtilAgg(SparkContextTest):
|
|||
OffsetRange("metrics", 1, 10, 20)] # mimic rdd.offsetRanges()
|
||||
|
||||
transform_context = TransformContextUtils.get_context(
|
||||
offset_info=myOffsetRanges)
|
||||
offset_info=myOffsetRanges,
|
||||
batch_time_info=self.get_dummy_batch_time())
|
||||
|
||||
rdd_monasca_with_offsets = rdd_monasca.map(
|
||||
lambda x: RddTransformContext(x, transform_context))
|
||||
|
||||
|
@ -591,7 +600,9 @@ class TestFetchQuantityUtilAgg(SparkContextTest):
|
|||
OffsetRange("metrics", 1, 10, 20)] # mimic rdd.offsetRanges()
|
||||
|
||||
transform_context = TransformContextUtils.get_context(
|
||||
offset_info=myOffsetRanges)
|
||||
offset_info=myOffsetRanges,
|
||||
batch_time_info=self.get_dummy_batch_time())
|
||||
|
||||
rdd_monasca_with_offsets = rdd_monasca.map(
|
||||
lambda x: RddTransformContext(x, transform_context))
|
||||
|
||||
|
|
|
@ -11,21 +11,19 @@
|
|||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
import unittest
|
||||
|
||||
from oslo_config import cfg
|
||||
from pyspark.streaming.kafka import OffsetRange
|
||||
|
||||
import mock
|
||||
|
||||
from monasca_transform.config.config_initializer import ConfigInitializer
|
||||
from monasca_transform.driver.mon_metrics_kafka \
|
||||
import MonMetricsKafkaProcessor
|
||||
from monasca_transform.messaging.adapter import MessageAdapter
|
||||
from monasca_transform.transform import RddTransformContext
|
||||
from monasca_transform.transform import TransformContextUtils
|
||||
|
||||
from tests.unit.messaging.adapter import DummyAdapter
|
||||
from tests.unit.spark_context_test import SparkContextTest
|
||||
from tests.unit.test_resources.cpu_kafka_data.data_provider import DataProvider
|
||||
from tests.unit.test_resources.mock_component_manager \
|
||||
|
@ -42,9 +40,9 @@ class SparkTest(SparkContextTest):
|
|||
'tests/unit/test_resources/config/'
|
||||
'test_config_with_dummy_messaging_adapter.conf'])
|
||||
# reset metric_id list dummy adapter
|
||||
if not MessageAdapter.adapter_impl:
|
||||
MessageAdapter.init()
|
||||
MessageAdapter.adapter_impl.metric_list = []
|
||||
if not DummyAdapter.adapter_impl:
|
||||
DummyAdapter.init()
|
||||
DummyAdapter.adapter_impl.metric_list = []
|
||||
|
||||
@mock.patch('monasca_transform.transform.builder.'
|
||||
'generic_transform_builder.GenericTransformBuilder.'
|
||||
|
@ -81,7 +79,9 @@ class SparkTest(SparkContextTest):
|
|||
OffsetRange("metrics", 1, 10, 20)] # mimic rdd.offsetRanges()
|
||||
|
||||
transform_context = TransformContextUtils.get_context(
|
||||
offset_info=myOffsetRanges)
|
||||
offset_info=myOffsetRanges,
|
||||
batch_time_info=self.get_dummy_batch_time())
|
||||
|
||||
rdd_monasca_with_offsets = rdd_monasca.map(
|
||||
lambda x: RddTransformContext(x, transform_context))
|
||||
|
||||
|
@ -90,7 +90,7 @@ class SparkTest(SparkContextTest):
|
|||
rdd_monasca_with_offsets)
|
||||
|
||||
# get the metrics that have been submitted to the dummy message adapter
|
||||
metrics = MessageAdapter.adapter_impl.metric_list
|
||||
metrics = DummyAdapter.adapter_impl.metric_list
|
||||
|
||||
# Verify cpu.total_logical_cores_agg for all hosts
|
||||
total_cpu_logical_agg_metric = [
|
||||
|
|
|
@ -11,7 +11,6 @@
|
|||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from pyspark.sql import SQLContext
|
||||
|
||||
from monasca_transform.component.usage.fetch_quantity \
|
||||
|
@ -40,9 +39,9 @@ class UsageComponentTest(SparkContextTest):
|
|||
self.sql_context,
|
||||
DataProvider.transform_spec_path)
|
||||
|
||||
transform_context = \
|
||||
TransformContextUtils.get_context(
|
||||
transform_spec_df_info=transform_spec_df)
|
||||
transform_context = TransformContextUtils.get_context(
|
||||
transform_spec_df_info=transform_spec_df,
|
||||
batch_time_info=self.get_dummy_batch_time())
|
||||
|
||||
instance_usage_df = FetchQuantity.usage(
|
||||
transform_context, record_store_df)
|
||||
|
|
|
@ -11,23 +11,21 @@
|
|||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import json
|
||||
import mock
|
||||
import unittest
|
||||
|
||||
from oslo_config import cfg
|
||||
from pyspark.streaming.kafka import OffsetRange
|
||||
|
||||
import json
|
||||
import mock
|
||||
|
||||
from monasca_transform.config.config_initializer import ConfigInitializer
|
||||
from monasca_transform.driver.mon_metrics_kafka \
|
||||
import MonMetricsKafkaProcessor
|
||||
|
||||
from monasca_transform.messaging.adapter import MessageAdapter
|
||||
from monasca_transform.transform import RddTransformContext
|
||||
from monasca_transform.transform import TransformContextUtils
|
||||
|
||||
from tests.unit.messaging.adapter import DummyAdapter
|
||||
from tests.unit.spark_context_test import SparkContextTest
|
||||
from tests.unit.test_resources.kafka_data.data_provider import DataProvider
|
||||
from tests.unit.test_resources.mock_component_manager \
|
||||
|
@ -46,9 +44,9 @@ class TestVmCpuAllocatedAgg(SparkContextTest):
|
|||
'tests/unit/test_resources/config/'
|
||||
'test_config_with_dummy_messaging_adapter.conf'])
|
||||
# reset metric_id list dummy adapter
|
||||
if not MessageAdapter.adapter_impl:
|
||||
MessageAdapter.init()
|
||||
MessageAdapter.adapter_impl.metric_list = []
|
||||
if not DummyAdapter.adapter_impl:
|
||||
DummyAdapter.init()
|
||||
DummyAdapter.adapter_impl.metric_list = []
|
||||
|
||||
def get_pre_transform_specs_json_by_project(self):
|
||||
"""get pre_transform_specs driver table info."""
|
||||
|
@ -134,7 +132,9 @@ class TestVmCpuAllocatedAgg(SparkContextTest):
|
|||
OffsetRange("metrics", 1, 10, 20)] # mimic rdd.offsetRanges()
|
||||
|
||||
transform_context = TransformContextUtils.get_context(
|
||||
offset_info=myOffsetRanges)
|
||||
offset_info=myOffsetRanges,
|
||||
batch_time_info=self.get_dummy_batch_time())
|
||||
|
||||
rdd_monasca_with_offsets = rdd_monasca.map(
|
||||
lambda x: RddTransformContext(x, transform_context))
|
||||
|
||||
|
@ -143,7 +143,7 @@ class TestVmCpuAllocatedAgg(SparkContextTest):
|
|||
rdd_monasca_with_offsets)
|
||||
|
||||
# get the metrics that have been submitted to the dummy message adapter
|
||||
metrics = MessageAdapter.adapter_impl.metric_list
|
||||
metrics = DummyAdapter.adapter_impl.metric_list
|
||||
|
||||
vcpus_agg_metric = [
|
||||
value for value in metrics
|
||||
|
@ -307,7 +307,9 @@ class TestVmCpuAllocatedAgg(SparkContextTest):
|
|||
OffsetRange("metrics", 1, 10, 20)] # mimic rdd.offsetRanges()
|
||||
|
||||
transform_context = TransformContextUtils.get_context(
|
||||
offset_info=myOffsetRanges)
|
||||
offset_info=myOffsetRanges,
|
||||
batch_time_info=self.get_dummy_batch_time())
|
||||
|
||||
rdd_monasca_with_offsets = rdd_monasca.map(
|
||||
lambda x: RddTransformContext(x, transform_context))
|
||||
|
||||
|
@ -316,7 +318,7 @@ class TestVmCpuAllocatedAgg(SparkContextTest):
|
|||
rdd_monasca_with_offsets)
|
||||
|
||||
# get the metrics that have been submitted to the dummy message adapter
|
||||
metrics = MessageAdapter.adapter_impl.metric_list
|
||||
metrics = DummyAdapter.adapter_impl.metric_list
|
||||
|
||||
vcpus_agg_metric = [
|
||||
value for value in metrics
|
||||
|
|
|
@ -28,7 +28,6 @@ Vagrant.configure(2) do |config|
|
|||
# Create a private network, which allows host-only access to the machine
|
||||
# using a specific IP.
|
||||
config.vm.network "private_network", ip: "192.168.15.6"
|
||||
|
||||
# Create a public network, which generally matched to bridged network.
|
||||
# Bridged networks make the machine appear as another physical device on
|
||||
# your network.
|
||||
|
@ -62,7 +61,7 @@ Vagrant.configure(2) do |config|
|
|||
|
||||
# # Customize the amount of memory on the VM:
|
||||
vb.memory = "16384"
|
||||
vb.cpus = "2"
|
||||
vb.cpus = "4"
|
||||
end
|
||||
if ENV['http_proxy'] && !ENV['http_proxy'].empty?
|
||||
# we need to configure a proxy for maven also
|
||||
|
@ -70,6 +69,10 @@ Vagrant.configure(2) do |config|
|
|||
destination: "settings.xml"
|
||||
config.vm.provision "shell", path: "setup_maven_proxy.sh"
|
||||
end
|
||||
config.vm.network "forwarded_port", guest: 18080, host: 18080
|
||||
config.vm.network "forwarded_port", guest: 18081, host: 18081
|
||||
config.vm.network "forwarded_port", guest: 4040, host: 4040
|
||||
|
||||
# install dependencies for our process
|
||||
config.vm.provision "shell", path: "install.sh"
|
||||
# provision the environments
|
||||
|
|
|
@ -31,7 +31,7 @@ MONASCA_METRICS_DB=${MONASCA_METRICS_DB:-influxdb}
|
|||
# This line will enable all of Monasca.
|
||||
enable_plugin monasca-api /home/vagrant/monasca-api
|
||||
# using the above, patched monasca-api we can disable some unnecessary services
|
||||
disable_service monasca-persister
|
||||
#disable_service monasca-persister
|
||||
disable_service monasca-thresh
|
||||
disable_service monasca-notification
|
||||
disable_service horizon
|
||||
|
|
4
tox.ini
4
tox.ini
|
@ -31,6 +31,7 @@ commands =
|
|||
tests/unit/usage/test_fetch_quantity_agg.py \
|
||||
tests/unit/usage/test_fetch_quantity_util_agg.py \
|
||||
tests/unit/usage/test_host_cpu_usage_component.py \
|
||||
tests/unit/processor/test_pre_hourly_processor_agg.py \
|
||||
tests/unit/usage/test_usage_component.py \
|
||||
tests/unit/usage/test_vm_cpu_allocated_agg.py -e tests_to_fix
|
||||
|
||||
|
@ -48,6 +49,7 @@ max-complexity = 30
|
|||
# H302 import only modules
|
||||
# H904 Wrap long lines in parentheses instead of a backslash (DEPRECATED)
|
||||
# H405 Multiline docstring separated by empty line
|
||||
ignore = H302,H904,H405
|
||||
# E402 module level import not at top of file FIXME remove this
|
||||
ignore = H302,H904,H405,E402
|
||||
show-source = True
|
||||
exclude=.venv,.git,.tox,dist,*egg,build,tests_to_fix
|
||||
|
|
Loading…
Reference in New Issue