Two-stage transformation

Break down the aggregation into two stages.

The first stage aggregates raw metrics frequently. It is
implemented as a Spark Streaming job that aggregates
metrics at a configurable time interval (default:
10 minutes) and writes the intermediate aggregated
data, i.e. instance usage data, to a new
"metrics_pre_hourly" Kafka topic.
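
For illustration only, a minimal Spark 1.x sketch of the
first-stage flow, assuming a local broker at localhost:9092
and with the aggregation pipeline reduced to a pass-through
stub (the real driver lives in
monasca_transform.driver.mon_metrics_kafka):

import json

from kafka import KafkaClient
from kafka import SimpleProducer
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


def aggregate_batch(batch_time, rdd):
    # placeholder for the record-store/aggregation pipeline: in this
    # sketch every raw message value is passed through unchanged
    return [json.loads(value) for _, value in rdd.collect()]


def publish_pre_hourly(instance_usage_dicts):
    # write the intermediate (10 minute) aggregates to the pre-hourly topic
    producer = SimpleProducer(KafkaClient("localhost:9092"))
    for usage in instance_usage_dicts:
        producer.send_messages("metrics_pre_hourly",
                               json.dumps(usage, separators=(',', ':')))


sc = SparkContext(appName="mon_metrics_kafka")
ssc = StreamingContext(sc, 600)  # stream_interval = 600 seconds

# receiver-less direct stream over the raw metrics topic
raw_stream = KafkaUtils.createDirectStream(
    ssc, ["metrics"], {"metadata.broker.list": "localhost:9092"})

raw_stream.foreachRDD(
    lambda batch_time, rdd: publish_pre_hourly(
        aggregate_batch(batch_time, rdd)))

ssc.start()
ssc.awaitTermination()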

The second stage is implemented
as a batch job using the Spark Streaming createRDD
direct-stream batch API. It is triggered by the
first stage, and only when the first stage runs at
the top of the hour.
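
As a sketch of the second-stage read (not the PreHourlyProcessor
implementation in this change), the offsets recorded by the first
stage can be replayed in a single batch with the Spark 1.x
KafkaUtils.createRDD API; the tuple shape of saved_offsets and the
broker address are assumptions for the example:

from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming.kafka import OffsetRange


def is_time_to_run(batch_time):
    # the hourly stage only fires for the streaming batch that lands
    # on the top of the hour
    return batch_time.minute == 0


def run_pre_hourly_batch(spark_context, saved_offsets):
    # saved_offsets: iterable of (topic, partition, from_offset,
    # until_offset) tuples, e.g. recovered from the kafka_offsets table
    offset_ranges = [OffsetRange(topic, partition, from_offset, until_offset)
                     for (topic, partition,
                          from_offset, until_offset) in saved_offsets]
    rdd = KafkaUtils.createRDD(
        spark_context,
        {"metadata.broker.list": "localhost:9092"},
        offset_ranges)
    # each message value is an intermediate instance usage JSON document
    return rdd.map(lambda kv: kv[1])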

Also enhanced the kafka offsets table to keep track
of offsets from both stages, along with the streaming
batch time, the time the version row was last updated,
and a revision number. By default the last 10 revisions
of the offsets are kept for each application.
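
The revision scheme can be pictured with a small standalone sketch
(the change itself implements this with SQLAlchemy against the
kafka_offsets table): each save inserts fresh rows, rows for the same
application/topic/partition are renumbered newest-first, and anything
past the configured maximum is dropped:

from collections import defaultdict

MAX_REVISIONS = 10  # offsets_max_revisions


def prune_offset_revisions(rows):
    # rows: dicts with app_name, topic, partition and an auto-increment id;
    # returns the rows renumbered and trimmed to the last MAX_REVISIONS
    # revisions per (app_name, topic, partition)
    groups = defaultdict(list)
    for row in rows:
        groups[(row["app_name"], row["topic"], row["partition"])].append(row)
    kept = []
    for group in groups.values():
        # newest save first, so revision 1 is always the latest offsets
        group.sort(key=lambda r: r["id"], reverse=True)
        for revision, row in enumerate(group, start=1):
            if revision <= MAX_REVISIONS:
                row["revision"] = revision
                kept.append(row)
    return kept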

Change-Id: Ib2bf7df6b32ca27c89442a23283a89fea802d146
Ashwin Agate 2016-06-03 02:40:43 +00:00
parent d8e73f3bde
commit 00b874a6b3
49 changed files with 1487 additions and 246 deletions

View File

@ -12,10 +12,9 @@
# License for the specific language governing permissions and limitations
# under the License.
from monasca_transform.driver.mon_metrics_kafka import invoke
activate_this_file = "/opt/monasca/transform/venv/bin/activate_this.py"
execfile(activate_this_file, dict(__file__=activate_this_file))
from monasca_transform.driver.mon_metrics_kafka import invoke
invoke()

View File

@ -3,6 +3,7 @@
[repositories]
offsets = monasca_transform.mysql_offset_specs:MySQLOffsetSpecs
data_driven_specs = monasca_transform.data_driven_specs.mysql_data_driven_specs_repo:MySQLDataDrivenSpecsRepo
offsets_max_revisions = 10
[database]
server_type = mysql
@ -16,6 +17,15 @@ adapter = monasca_transform.messaging.adapter:KafkaMessageAdapter
topic = metrics
brokers=192.168.15.6:9092
publish_kafka_tenant_id = d2cb21079930415a9f2a33588b9f2bb6
adapter_pre_hourly = monasca_transform.messaging.adapter:KafkaMessageAdapterPreHourly
topic_pre_hourly = metrics_pre_hourly
[stage_processors]
pre_hourly_processor_enabled = True
[pre_hourly_processor]
enable_instance_usage_df_cache = True
instance_usage_df_cache_storage_level = MEMORY_ONLY_SER_2
#
# Configurable values for the monasca-transform service
@ -54,7 +64,13 @@ spark_home = /opt/spark/current
spark_python_files = /opt/monasca/transform/lib/monasca-transform.zip
# How often the stream should be read (in seconds)
stream_interval = 300
stream_interval = 600
# The working directory for monasca-transform
work_dir = /var/run/monasca/transform
# enable caching of record store df
enable_record_store_df_cache = True
# set spark storage level for record store df cache
record_store_df_cache_storage_level = MEMORY_ONLY_SER_2

View File

@ -4,14 +4,19 @@ USE `monasca_transform`;
SET foreign_key_checks = 0;
CREATE TABLE IF NOT EXISTS `kafka_offsets` (
`id` INTEGER AUTO_INCREMENT NOT NULL,
`topic` varchar(128) NOT NULL,
`until_offset` BIGINT NULL,
`from_offset` BIGINT NULL,
`app_name` varchar(128) NOT NULL,
`partition` integer NOT NULL,
PRIMARY KEY (`app_name`, `topic`, `partition`)
`batch_time` varchar(20) NOT NULL,
`last_updated` varchar(20) NOT NULL,
`revision` integer NOT NULL,
PRIMARY KEY (`id`, `app_name`, `topic`, `partition`, `revision`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
CREATE TABLE IF NOT EXISTS `transform_specs` (
`metric_id` varchar(128) NOT NULL,
`transform_spec` varchar(2048) NOT NULL,

View File

@ -14,12 +14,11 @@
import sys
from monasca_transform.service.transform_service import main_service
activate_this_file = "/opt/monasca/transform/venv/bin/activate_this.py"
execfile(activate_this_file, dict(__file__=activate_this_file))
from monasca_transform.service.transform_service import main_service
def main():
main_service()

View File

@ -278,6 +278,8 @@ function install_monasca_transform {
create_and_populate_monasca_transform_database
# create metrics pre hourly topic in kafka
/opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 64 --topic metrics_pre_hourly
}
@ -344,7 +346,7 @@ function install_spark {
if [ ! -f /opt/spark/download/${SPARK_TARBALL_NAME} ]
then
sudo curl ${APACHE_MIRROR}/spark/spark-${SPARK_VERSION}/${SPARK_TARBALL_NAME} -o /opt/spark/download/${SPARK_TARBALL_NAME}
sudo curl -m 600 ${APACHE_MIRROR}/spark/spark-${SPARK_VERSION}/${SPARK_TARBALL_NAME} -o /opt/spark/download/${SPARK_TARBALL_NAME}
fi
sudo chown spark:spark /opt/spark/download/${SPARK_TARBALL_NAME}

View File

@ -3,6 +3,7 @@
[repositories]
offsets = monasca_transform.mysql_offset_specs:MySQLOffsetSpecs
data_driven_specs = monasca_transform.data_driven_specs.mysql_data_driven_specs_repo:MySQLDataDrivenSpecsRepo
offsets_max_revisions = 10
[database]
server_type = mysql
@ -16,6 +17,15 @@ adapter = monasca_transform.messaging.adapter:KafkaMessageAdapter
topic = metrics
brokers = localhost:9092
publish_kafka_tenant_id = d2cb21079930415a9f2a33588b9f2bb6
adapter_pre_hourly = monasca_transform.messaging.adapter:KafkaMessageAdapterPreHourly
topic_pre_hourly = metrics_pre_hourly
[stage_processors]
enable_pre_hourly_processor = True
[pre_hourly_processor]
enable_instance_usage_df_cache = True
instance_usage_df_cache_storage_level = MEMORY_ONLY_SER_2
#
# Configurable values for the monasca-transform service
@ -60,7 +70,10 @@ spark_home = /opt/spark/current
spark_python_files = /opt/stack/monasca-transform/dist/monasca_transform-0.0.1.egg
# How often the stream should be read (in seconds)
stream_interval = 300
stream_interval = 600
# The working directory for monasca-transform
work_dir = /opt/stack/monasca-transform
enable_record_store_df_cache = True
record_store_df_cache_storage_level = MEMORY_ONLY_SER_2

View File

@ -16,7 +16,6 @@ import abc
import time
from monasca_transform.component import Component
from monasca_transform.messaging.adapter import MessageAdapter
from oslo_config import cfg
@ -103,7 +102,7 @@ class InsertComponent(Component):
return metric
@staticmethod
def _write_metric(row, agg_params):
def _get_metric(row, agg_params):
"""write data to kafka. extracts and formats
metric data and writes the data to kafka
"""
@ -132,8 +131,46 @@ class InsertComponent(Component):
row.aggregation_period}
metric = InsertComponent._prepare_metric(instance_usage_dict,
agg_params)
return metric
MessageAdapter.send_metric(metric)
@staticmethod
def _get_instance_usage_pre_hourly(row,
metric_id):
"""write data to kafka. extracts and formats
metric data and writes the data to kafka
"""
# add transform spec metric id to processing meta
processing_meta = {"metric_id": metric_id}
instance_usage_dict = {"tenant_id": row.tenant_id,
"user_id": row.user_id,
"resource_uuid": row.resource_uuid,
"geolocation": row.geolocation,
"region": row.region,
"zone": row.zone,
"host": row.host,
"project_id": row.project_id,
"aggregated_metric_name":
row.aggregated_metric_name,
"quantity": row.quantity,
"firstrecord_timestamp":
row.firstrecord_timestamp_string,
"lastrecord_timestamp":
row.lastrecord_timestamp_string,
"firstrecord_timestamp_unix":
row.firstrecord_timestamp_unix,
"lastrecord_timestamp_unix":
row.lastrecord_timestamp_unix,
"record_count": row.record_count,
"service_group": row.service_group,
"service_id": row.service_id,
"usage_date": row.usage_date,
"usage_hour": row.usage_hour,
"usage_minute": row.usage_minute,
"aggregation_period":
row.aggregation_period,
"processing_meta": processing_meta}
return instance_usage_dict
@staticmethod
def _write_metrics_from_partition(partlistiter):

View File

@ -13,12 +13,12 @@
# under the License.
from monasca_transform.component.insert import InsertComponent
from oslo_config import cfg
from tests.unit.messaging.adapter import DummyAdapter
class DummyInsert(InsertComponent):
"""Insert component that writes instance usage data
"""Insert component that writes metric data to
to kafka queue
"""
@ -63,7 +63,7 @@ class DummyInsert(InsertComponent):
#
for instance_usage_row in instance_usage_df.collect():
InsertComponent._write_metric(instance_usage_row,
agg_params)
metric = InsertComponent._get_metric(instance_usage_row,
agg_params)
DummyAdapter.send_metric(metric)
return instance_usage_df

View File

@ -14,6 +14,7 @@
from monasca_transform.component.insert import InsertComponent
from monasca_transform.config.config_initializer import ConfigInitializer
from monasca_transform.messaging.adapter import KafkaMessageAdapter
class KafkaInsert(InsertComponent):
@ -59,6 +60,7 @@ class KafkaInsert(InsertComponent):
#
for instance_usage_row in instance_usage_df.collect():
InsertComponent._write_metric(instance_usage_row, agg_params)
metric = InsertComponent._get_metric(
instance_usage_row, agg_params)
KafkaMessageAdapter.send_metric(metric)
return instance_usage_df

View File

@ -0,0 +1,46 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from monasca_transform.component.insert import InsertComponent
from monasca_transform.config.config_initializer import ConfigInitializer
from monasca_transform.messaging.adapter import KafkaMessageAdapterPreHourly
class KafkaInsertPreHourly(InsertComponent):
"""Insert component that writes instance usage data
to kafka queue
"""
@staticmethod
def insert(transform_context, instance_usage_df):
"""write instance usage data to kafka"""
# object to init config
ConfigInitializer.basic_config()
transform_spec_df = transform_context.transform_spec_df_info
agg_params = transform_spec_df.select(
"metric_id").\
collect()[0].asDict()
metric_id = agg_params["metric_id"]
for instance_usage_row in instance_usage_df.collect():
instance_usage_dict = \
InsertComponent._get_instance_usage_pre_hourly(
instance_usage_row,
metric_id)
KafkaMessageAdapterPreHourly.send_metric(instance_usage_dict)
return instance_usage_df

View File

@ -24,7 +24,7 @@ class PrepareData(InsertComponent):
"""write instance usage data to kafka"""
#
# FIXME: add instance usage data validation
# TODO(someone) add instance usage data validation
#
return instance_usage_df

View File

@ -52,7 +52,6 @@ class RollupQuantity(SetterComponent):
@staticmethod
def _rollup_quantity(instance_usage_df,
transform_spec_df,
setter_rollup_group_by_list,
setter_rollup_operation):
@ -202,7 +201,35 @@ class RollupQuantity(SetterComponent):
# perform rollup operation
instance_usage_json_rdd = RollupQuantity._rollup_quantity(
instance_usage_df, transform_spec_df,
instance_usage_df,
group_by_columns_list,
str(setter_rollup_operation))
sql_context = SQLContext.getOrCreate(instance_usage_df.rdd.context)
instance_usage_trans_df = InstanceUsageUtils.create_df_from_json_rdd(
sql_context,
instance_usage_json_rdd)
return instance_usage_trans_df
@staticmethod
def do_rollup(setter_rollup_group_by_list,
aggregation_period,
setter_rollup_operation,
instance_usage_df):
# get aggregation period
group_by_period_list = \
ComponentUtils._get_instance_group_by_period_list(
aggregation_period)
# group by columns list
group_by_columns_list = group_by_period_list + \
setter_rollup_group_by_list
# perform rollup operation
instance_usage_json_rdd = RollupQuantity._rollup_quantity(
instance_usage_df,
group_by_columns_list,
str(setter_rollup_operation))

View File

@ -261,15 +261,15 @@ class FetchQuantity(UsageComponent):
Component.
DEFAULT_UNAVAILABLE_VALUE),
"usage_date":
getattr(row, "usage_date",
getattr(row, "event_date",
Component.
DEFAULT_UNAVAILABLE_VALUE),
"usage_hour":
getattr(row, "usage_hour",
getattr(row, "event_hour",
Component.
DEFAULT_UNAVAILABLE_VALUE),
"usage_minute":
getattr(row, "usage_minute",
getattr(row, "event_minute",
Component.
DEFAULT_UNAVAILABLE_VALUE),
"aggregation_period":

View File

@ -23,6 +23,8 @@ class ConfigInitializer(object):
ConfigInitializer.load_database_options()
ConfigInitializer.load_messaging_options()
ConfigInitializer.load_service_options()
ConfigInitializer.load_stage_processors_options()
ConfigInitializer.load_pre_hourly_processor_options()
if not default_config_files:
default_config_files = ['/etc/monasca-transform.conf',
'etc/monasca-transform.conf']
@ -43,7 +45,9 @@ class ConfigInitializer(object):
default='monasca_transform.data_driven_specs.'
'json_data_driven_specs_repo:JSONDataDrivenSpecsRepo',
help='Repository for metric and event data_driven_specs'
)
),
cfg.IntOpt('offsets_max_revisions', default=10,
help="Max revisions of offsets for each application")
]
repo_group = cfg.OptGroup(name='repositories', title='repositories')
cfg.CONF.register_group(repo_group)
@ -76,7 +80,13 @@ class ConfigInitializer(object):
help='Messaging brokers'),
cfg.StrOpt('publish_kafka_tenant_id',
default='111111',
help='publish aggregated metrics tenant')
help='publish aggregated metrics tenant'),
cfg.StrOpt('adapter_pre_hourly',
default='monasca_transform.messaging.adapter:'
'KafkaMessageAdapterPreHourly',
help='Message adapter implementation'),
cfg.StrOpt('topic_pre_hourly', default='metrics_pre_hourly',
help='Messaging topic pre hourly')
]
messaging_group = cfg.OptGroup(name='messaging', title='messaging')
cfg.CONF.register_group(messaging_group)
@ -99,8 +109,31 @@ class ConfigInitializer(object):
cfg.StrOpt('spark_python_files'),
cfg.IntOpt('stream_interval'),
cfg.StrOpt('work_dir'),
cfg.StrOpt('spark_home')
cfg.StrOpt('spark_home'),
cfg.BoolOpt('enable_record_store_df_cache'),
cfg.StrOpt('record_store_df_cache_storage_level')
]
service_group = cfg.OptGroup(name='service', title='service')
cfg.CONF.register_group(service_group)
cfg.CONF.register_opts(service_opts, group=service_group)
@staticmethod
def load_stage_processors_options():
app_opts = [
cfg.BoolOpt('pre_hourly_processor_enabled'),
]
app_group = cfg.OptGroup(name='stage_processors',
title='stage_processors')
cfg.CONF.register_group(app_group)
cfg.CONF.register_opts(app_opts, group=app_group)
@staticmethod
def load_pre_hourly_processor_options():
app_opts = [
cfg.BoolOpt('enable_instance_usage_df_cache'),
cfg.StrOpt('instance_usage_df_cache_storage_level')
]
app_group = cfg.OptGroup(name='pre_hourly_processor',
title='pre_hourly_processor')
cfg.CONF.register_group(app_group)
cfg.CONF.register_opts(app_opts, group=app_group)

View File

@ -1,23 +1,23 @@
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"mem.total_mb_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"mem_total_all","metric_id":"mem_total_all"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"mem.usable_mb_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"mem_usable_all","metric_id":"mem_usable_all"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"vm.mem.total_mb_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id", "resource_uuid"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"vm_mem_total_mb_all","metric_id":"vm_mem_total_mb_all"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"vm.mem.total_mb_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id", "resource_uuid"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": ["tenant_id"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"vm_mem_total_mb_project","metric_id":"vm_mem_total_mb_project"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"vm.mem.used_mb_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id", "resource_uuid"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"vm_mem_used_mb_all","metric_id":"vm_mem_used_mb_all"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"vm.mem.used_mb_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id", "resource_uuid"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": ["tenant_id"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"vm_mem_used_mb_project","metric_id":"vm_mem_used_mb_project"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"disk.total_space_mb_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"disk_total_all","metric_id":"disk_total_all"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"disk.total_used_space_mb_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"disk_usable_all","metric_id":"disk_usable_all"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"nova.vm.disk.total_allocated_gb_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"nova_disk_total_allocated_gb_all","metric_id":"nova_disk_total_allocated_gb_all"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"vm.disk.allocation_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id", "resource_uuid"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"vm_disk_allocation_all","metric_id":"vm_disk_allocation_all"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"vm.disk.allocation_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id", "resource_uuid"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": ["tenant_id"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"vm_disk_allocation_project","metric_id":"vm_disk_allocation_project"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"cpu.total_logical_cores_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"cpu_total_all","metric_id":"cpu_total_all"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"cpu.total_logical_cores_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": ["host"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"cpu_total_host","metric_id":"cpu_total_host"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity_util","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"cpu.utilized_logical_cores_agg","aggregation_period":"hourly","aggregation_group_by_list": ["event_type", "host"],"usage_fetch_operation": "avg","usage_fetch_util_quantity_event_type": "cpu.total_logical_cores","usage_fetch_util_idle_perc_event_type": "cpu.idle_perc","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"cpu_util_all","metric_id":"cpu_util_all"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity_util","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"cpu.utilized_logical_cores_agg","aggregation_period":"hourly","aggregation_group_by_list": ["event_type", "host"],"usage_fetch_operation": "avg","usage_fetch_util_quantity_event_type": "cpu.total_logical_cores","usage_fetch_util_idle_perc_event_type": "cpu.idle_perc","setter_rollup_group_by_list": ["host"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"cpu_util_host","metric_id":"cpu_util_host"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"vcpus_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id", "resource_uuid"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"vcpus_all","metric_id":"vcpus_all"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"vcpus_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id", "resource_uuid"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": ["tenant_id"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"vcpus_project","metric_id":"vcpus_project"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"vm.cpu.utilization_perc_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id", "resource_uuid"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": ["tenant_id"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"vm_cpu_util_perc_project","metric_id":"vm_cpu_util_perc_project"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"swiftlm.diskusage.val.size_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "mount", "device"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"swift_usage_all","metric_id":"swift_usage_all"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"swiftlm.diskusage.val.size_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "mount", "device"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": ["host"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"swift_usage_all","metric_id":"swift_usage_host"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"calculate_rate","setters":["set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"swiftlm.diskusage.rate_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "mount", "device"],"setter_rollup_group_by_list": [],"dimension_list":["aggregation_period","host","project_id"]},"metric_group":"swift_usage_rate","metric_id":"swift_usage_rate"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"swiftlm.diskusage.val.avail_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "mount", "device"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"swift_avail_all","metric_id":"swift_avail_all"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"swiftlm.diskusage.val.avail_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "mount", "device"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": ["host"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"swift_avail_all","metric_id":"swift_avail_host"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"mem.total_mb_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"mem_total_all","metric_id":"mem_total_all"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"mem.usable_mb_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"mem_usable_all","metric_id":"mem_usable_all"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"vm.mem.total_mb_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id", "resource_uuid"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"vm_mem_total_mb_all","metric_id":"vm_mem_total_mb_all"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"vm.mem.total_mb_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id", "resource_uuid"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": ["tenant_id"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"vm_mem_total_mb_project","metric_id":"vm_mem_total_mb_project"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"vm.mem.used_mb_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id", "resource_uuid"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"vm_mem_used_mb_all","metric_id":"vm_mem_used_mb_all"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"vm.mem.used_mb_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id", "resource_uuid"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": ["tenant_id"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"vm_mem_used_mb_project","metric_id":"vm_mem_used_mb_project"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"disk.total_space_mb_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"disk_total_all","metric_id":"disk_total_all"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"disk.total_used_space_mb_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"disk_usable_all","metric_id":"disk_usable_all"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"nova.vm.disk.total_allocated_gb_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"nova_disk_total_allocated_gb_all","metric_id":"nova_disk_total_allocated_gb_all"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"vm.disk.allocation_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id", "resource_uuid"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"vm_disk_allocation_all","metric_id":"vm_disk_allocation_all"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"vm.disk.allocation_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id", "resource_uuid"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": ["tenant_id"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"vm_disk_allocation_project","metric_id":"vm_disk_allocation_project"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"cpu.total_logical_cores_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"cpu_total_all","metric_id":"cpu_total_all"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"cpu.total_logical_cores_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": ["host"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"cpu_total_host","metric_id":"cpu_total_host"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity_util","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"cpu.utilized_logical_cores_agg","aggregation_period":"hourly","aggregation_group_by_list": ["event_type", "host"],"usage_fetch_operation": "avg","usage_fetch_util_quantity_event_type": "cpu.total_logical_cores","usage_fetch_util_idle_perc_event_type": "cpu.idle_perc","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"cpu_util_all","metric_id":"cpu_util_all"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity_util","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"cpu.utilized_logical_cores_agg","aggregation_period":"hourly","aggregation_group_by_list": ["event_type", "host"],"usage_fetch_operation": "avg","usage_fetch_util_quantity_event_type": "cpu.total_logical_cores","usage_fetch_util_idle_perc_event_type": "cpu.idle_perc","setter_rollup_group_by_list": ["host"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"cpu_util_host","metric_id":"cpu_util_host"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"vcpus_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id", "resource_uuid"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"vcpus_all","metric_id":"vcpus_all"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"vcpus_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id", "resource_uuid"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": ["tenant_id"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"vcpus_project","metric_id":"vcpus_project"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"vm.cpu.utilization_perc_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id", "resource_uuid"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": ["tenant_id"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"vm_cpu_util_perc_project","metric_id":"vm_cpu_util_perc_project"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"swiftlm.diskusage.val.size_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "mount", "device"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"swift_usage_all","metric_id":"swift_usage_all"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"swiftlm.diskusage.val.size_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "mount", "device"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": ["host"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"swift_usage_all","metric_id":"swift_usage_host"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"calculate_rate","setters":["set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"swiftlm.diskusage.rate_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "mount", "device"],"setter_rollup_group_by_list": [],"dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"swift_usage_rate","metric_id":"swift_usage_rate"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"swiftlm.diskusage.val.avail_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "mount", "device"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"swift_avail_all","metric_id":"swift_avail_all"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data_pre_hourly"]},"aggregated_metric_name":"swiftlm.diskusage.val.avail_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "mount", "device"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": ["host"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"],"pre_hourly_operation":"avg","pre_hourly_group_by_list":["default"]},"metric_group":"swift_avail_all","metric_id":"swift_avail_host"}

View File

@ -41,7 +41,10 @@ from monasca_transform.data_driven_specs.data_driven_specs_repo \
from monasca_transform.data_driven_specs.data_driven_specs_repo \
import DataDrivenSpecsRepoFactory
from monasca_transform.processor.pre_hourly_processor import PreHourlyProcessor
from monasca_transform.transform import RddTransformContext
from monasca_transform.transform.storage_utils import StorageUtils
from monasca_transform.transform.transform_utils import MonMetricUtils
from monasca_transform.transform import TransformContextUtils
@ -65,7 +68,7 @@ class MonMetricsKafkaProcessor(object):
log.debug(message)
@staticmethod
def store_offset_ranges(rdd):
def store_offset_ranges(batch_time, rdd):
if rdd.isEmpty():
MonMetricsKafkaProcessor.log_debug(
"storeOffsetRanges: nothing to process...")
@ -73,7 +76,9 @@ class MonMetricsKafkaProcessor(object):
else:
my_offset_ranges = rdd.offsetRanges()
transform_context = \
TransformContextUtils.get_context(offset_info=my_offset_ranges)
TransformContextUtils.get_context(offset_info=my_offset_ranges,
batch_time_info=batch_time
)
rdd_transform_context = \
rdd.map(lambda x: RddTransformContext(x, transform_context))
return rdd_transform_context
@ -87,8 +92,8 @@ class MonMetricsKafkaProcessor(object):
@staticmethod
def get_kafka_stream(topic, streaming_context):
offset_specifications = simport.load(cfg.CONF.repositories.offsets)()
saved_offset_spec = offset_specifications.get_kafka_offsets()
app_name = streaming_context.sparkContext.appName
saved_offset_spec = offset_specifications.get_kafka_offsets(app_name)
if len(saved_offset_spec) < 1:
MonMetricsKafkaProcessor.log_debug(
@ -157,25 +162,29 @@ class MonMetricsKafkaProcessor(object):
counts.pprint(9999)
@staticmethod
def save_kafka_offsets(current_offsets, app_name):
def save_kafka_offsets(current_offsets, app_name,
batch_time_info):
"""save current offsets to offset specification."""
# get the offsets from global var
offset_specs = simport.load(cfg.CONF.repositories.offsets)()
for o in current_offsets:
MonMetricsKafkaProcessor.log_debug(
"adding offset: topic:{%s}, partition:{%s}, fromOffset:{%s}, "
"untilOffset:{%s}" % (
o.topic, o.partition, o.fromOffset, o.untilOffset))
offset_specs.add(
app_name, o.topic, o.partition, o.fromOffset, o.untilOffset)
"saving: OffSetRanges: %s %s %s %s, "
"batch_time_info: %s" % (
o.topic, o.partition, o.fromOffset, o.untilOffset,
str(batch_time_info)))
# add new offsets, update revision
offset_specs.add_all_offsets(app_name,
current_offsets,
batch_time_info)
@staticmethod
def reset_kafka_offsets():
def reset_kafka_offsets(app_name):
"""delete all offsets from the offset specification."""
# get the offsets from global var
offset_specs = simport.load(cfg.CONF.repositories.offsets)()
offset_specs.delete_all_kafka_offsets()
offset_specs.delete_all_kafka_offsets(app_name)
@staticmethod
def _validate_raw_mon_metrics(row):
@ -437,24 +446,47 @@ class MonMetricsKafkaProcessor(object):
rdd_transform_context = rdd_transform_context_rdd.first()
transform_context = rdd_transform_context.transform_context_info
#
# cache record store rdd
#
if cfg.CONF.service.enable_record_store_df_cache:
storage_level_prop = \
cfg.CONF.service.record_store_df_cache_storage_level
storage_level = StorageUtils.get_storage_level(
storage_level_prop)
record_store_df.persist(storage_level)
#
# start processing metrics available in record_store data
#
MonMetricsKafkaProcessor.process_metrics(transform_context,
record_store_df)
#
# extract kafka offsets stored in rdd and save
#
# remove df from cache
if cfg.CONF.service.enable_record_store_df_cache:
record_store_df.unpersist()
#
# extract kafka offsets and batch processing time
# stored in transform_context and save offsets
#
offsets = transform_context.offset_info
for o in offsets:
MonMetricsKafkaProcessor.log_debug(
"going to save: OffSetRanges: %s %s %s %s" % (
o.topic, o.partition, o.fromOffset, o.untilOffset))
# batch time
batch_time_info = \
transform_context.batch_time_info
MonMetricsKafkaProcessor.save_kafka_offsets(
offsets, rdd_transform_context_rdd.context.appName)
offsets, rdd_transform_context_rdd.context.appName,
batch_time_info)
# call pre hourly processor, if its time to run
if (cfg.CONF.stage_processors.pre_hourly_processor_enabled
is True and PreHourlyProcessor.is_time_to_run(
batch_time_info)):
PreHourlyProcessor.run_processor(
record_store_df.rdd.context,
batch_time_info)
@staticmethod
def transform_to_recordstore(kvs):
@ -526,9 +558,13 @@ def invoke():
# One exception that can occur here is the result of the saved
# kafka offsets being obsolete/out of range. Delete the saved
# offsets to improve the chance of success on the next execution.
# TODO(someone) prevent deleting all offsets for an application,
# but just the latest revision
MonMetricsKafkaProcessor.log_debug(
"Deleting saved offsets for chance of success on next execution")
MonMetricsKafkaProcessor.reset_kafka_offsets()
MonMetricsKafkaProcessor.reset_kafka_offsets(application_name)
if __name__ == "__main__":
invoke()

View File

@ -22,20 +22,6 @@ import simport
class MessageAdapter(object):
adapter_impl = None
@staticmethod
def init():
# object to keep track of offsets
MessageAdapter.adapter_impl = simport.load(
cfg.CONF.messaging.adapter)()
@staticmethod
def send_metric(metric):
if not MessageAdapter.adapter_impl:
MessageAdapter.init()
MessageAdapter.adapter_impl.do_send_metric(metric)
@abc.abstractmethod
def do_send_metric(self, metric):
raise NotImplementedError(
@ -45,13 +31,55 @@ class MessageAdapter(object):
class KafkaMessageAdapter(MessageAdapter):
adapter_impl = None
def __init__(self):
client_for_writing = KafkaClient(cfg.CONF.messaging.brokers)
self.producer = SimpleProducer(client_for_writing)
self.topic = cfg.CONF.messaging.topic
@staticmethod
def init():
# object to keep track of offsets
KafkaMessageAdapter.adapter_impl = simport.load(
cfg.CONF.messaging.adapter)()
def do_send_metric(self, metric):
self.producer.send_messages(
self.topic,
json.dumps(metric, separators=(',', ':')))
return
@staticmethod
def send_metric(metric):
if not KafkaMessageAdapter.adapter_impl:
KafkaMessageAdapter.init()
KafkaMessageAdapter.adapter_impl.do_send_metric(metric)
class KafkaMessageAdapterPreHourly(MessageAdapter):
adapter_impl = None
def __init__(self):
client_for_writing = KafkaClient(cfg.CONF.messaging.brokers)
self.producer = SimpleProducer(client_for_writing)
self.topic = cfg.CONF.messaging.topic_pre_hourly
@staticmethod
def init():
# object to keep track of offsets
KafkaMessageAdapterPreHourly.adapter_impl = simport.load(
cfg.CONF.messaging.adapter_pre_hourly)()
def do_send_metric(self, metric):
self.producer.send_messages(
self.topic,
json.dumps(metric, separators=(',', ':')))
return
@staticmethod
def send_metric(metric):
if not KafkaMessageAdapterPreHourly.adapter_impl:
KafkaMessageAdapterPreHourly.init()
KafkaMessageAdapterPreHourly.adapter_impl.do_send_metric(metric)

View File

@ -11,11 +11,11 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
from oslo_config import cfg
from sqlalchemy import create_engine
from sqlalchemy import desc
from sqlalchemy.ext.automap import automap_base
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.orm import sessionmaker
from monasca_transform.offset_specs import OffsetSpec
@ -25,9 +25,18 @@ Base = automap_base()
class MySQLOffsetSpec(Base, OffsetSpec):
__tablename__ = 'kafka_offsets'
def __str__(self):
return "%s,%s,%s,%s,%s,%s,%s,%s" % (str(self.id),
str(self.topic),
str(self.partition),
str(self.until_offset),
str(self.from_offset),
str(self.batch_time),
str(self.last_updated),
str(self.revision))
class MySQLOffsetSpecs(OffsetSpecs):
@ -41,8 +50,7 @@ class MySQLOffsetSpecs(OffsetSpecs):
database_uid,
database_pwd,
database_server,
database_name
), isolation_level="READ UNCOMMITTED")
database_name), isolation_level="READ UNCOMMITTED")
db.echo = True
# reflect the tables
@ -51,36 +59,119 @@ class MySQLOffsetSpecs(OffsetSpecs):
Session = sessionmaker(bind=db)
self.session = Session()
def add(self, app_name, topic, partition,
from_offset, until_offset):
try:
offset_spec = self.session.query(MySQLOffsetSpec).filter_by(
app_name=app_name, topic=topic,
partition=partition).one()
offset_spec.from_offset = from_offset
offset_spec.until_offset = until_offset
self.session.commit()
# keep these many offset versions around
self.MAX_REVISIONS = cfg.CONF.repositories.offsets_max_revisions
def _manage_offset_revisions(self):
"""manage offset versions"""
distinct_offset_specs = self.session.query(
MySQLOffsetSpec).group_by(MySQLOffsetSpec.app_name,
MySQLOffsetSpec.topic,
MySQLOffsetSpec.partition
).all()
for distinct_offset_spec in distinct_offset_specs:
ordered_versions = self.session.query(
MySQLOffsetSpec).filter_by(
app_name=distinct_offset_spec.app_name,
topic=distinct_offset_spec.topic,
partition=distinct_offset_spec.partition).order_by(
desc(MySQLOffsetSpec.id)).all()
revision = 1
for version_spec in ordered_versions:
version_spec.revision = revision
revision = revision + 1
self.session.query(MySQLOffsetSpec).filter(
MySQLOffsetSpec.revision > self.MAX_REVISIONS).delete(
synchronize_session="fetch")
def get_kafka_offsets(self, app_name):
return {'%s_%s_%s' % (
offset.get_app_name(), offset.get_topic(), offset.get_partition()
): offset for offset in self.session.query(MySQLOffsetSpec).filter(
MySQLOffsetSpec.app_name == app_name,
MySQLOffsetSpec.revision == 1).all()}
def delete_all_kafka_offsets(self, app_name):
try:
self.session.query(MySQLOffsetSpec).filter(
MySQLOffsetSpec.app_name == app_name).delete()
self.session.commit()
except Exception:
# Seems like there isn't much that can be done in this situation
pass
def add_all_offsets(self, app_name, offsets,
batch_time_info):
"""add offsets. """
try:
# batch time
batch_time = \
batch_time_info.strftime(
'%Y-%m-%d %H:%M:%S')
# last updated
last_updated = \
datetime.datetime.now().strftime(
'%Y-%m-%d %H:%M:%S')
NEW_REVISION_NO = -1
for o in offsets:
offset_spec = MySQLOffsetSpec(
topic=o.topic,
app_name=app_name,
partition=o.partition,
from_offset=o.fromOffset,
until_offset=o.untilOffset,
batch_time=batch_time,
last_updated=last_updated,
revision=NEW_REVISION_NO)
self.session.add(offset_spec)
# manage versions
self._manage_offset_revisions()
self.session.commit()
except Exception:
self.session.rollback()
raise
def add(self, app_name, topic, partition,
from_offset, until_offset, batch_time_info):
"""add offset info. """
try:
# batch time
batch_time = \
batch_time_info.strftime(
'%Y-%m-%d %H:%M:%S')
# last updated
last_updated = \
datetime.datetime.now().strftime(
'%Y-%m-%d %H:%M:%S')
NEW_REVISION_NO = -1
except NoResultFound:
offset_spec = MySQLOffsetSpec(
topic=topic,
app_name=app_name,
partition=partition,
from_offset=from_offset,
until_offset=until_offset)
until_offset=until_offset,
batch_time=batch_time,
last_updated=last_updated,
revision=NEW_REVISION_NO)
self.session.add(offset_spec)
# manage versions
self._manage_offset_revisions()
self.session.commit()
def get_kafka_offsets(self):
return {'%s_%s_%s' % (
offset.get_app_name(), offset.get_topic(), offset.get_partition()
): offset for offset in self.session.query(MySQLOffsetSpec).all()}
def delete_all_kafka_offsets(self):
try:
self.session.query(MySQLOffsetSpec).delete()
self.session.commit()
except Exception:
# Seems like there isn't much that can be done in this situation
pass
self.session.rollback()
raise

View File

@ -13,24 +13,30 @@
# under the License.
import abc
import datetime
import json
import logging
import os
import six
log = logging.getLogger(__name__)
class OffsetSpec(object):
def __init__(self, app_name=None, topic=None, partition=None,
from_offset=None, until_offset=None):
from_offset=None, until_offset=None,
batch_time=None, last_updated=None,
revision=None):
self.app_name = app_name
self.topic = topic
self.partition = partition
self.from_offset = from_offset
self.until_offset = until_offset
self.batch_time = batch_time
self.last_updated = last_updated
self.revision = revision
def get_app_name(self):
return self.app_name
@ -47,6 +53,15 @@ class OffsetSpec(object):
def get_until_offset(self):
return self.until_offset
def get_batch_time(self):
return self.batch_time
def get_last_updated(self):
return self.last_updated
def get_revision(self):
return self.revision
@six.add_metaclass(abc.ABCMeta)
class OffsetSpecs(object):
@ -56,20 +71,29 @@ class OffsetSpecs(object):
@abc.abstractmethod
def add(self, app_name, topic, partition,
from_offset, until_offset):
from_offset, until_offset, batch_time_info):
raise NotImplementedError(
"Class %s doesn't implement add(self, app_name, topic, "
"partition, from_offset, until_offset)"
"partition, from_offset, until_offset, batch_time,"
"last_updated, revision)"
% self.__class__.__name__)
@abc.abstractmethod
def get_kafka_offsets(self):
def add_all_offsets(self, app_name, offsets, batch_time_info):
raise NotImplementedError(
"Class %s doesn't implement add(self, app_name, topic, "
"partition, from_offset, until_offset, batch_time,"
"last_updated, revision)"
% self.__class__.__name__)
@abc.abstractmethod
def get_kafka_offsets(self, app_name):
raise NotImplementedError(
"Class %s doesn't implement get_kafka_offsets()"
% self.__class__.__name__)
@abc.abstractmethod
def delete_all_kafka_offsets(self):
def delete_all_kafka_offsets(self, app_name):
raise NotImplementedError(
"Class %s doesn't implement delete_all_kafka_offsets()"
% self.__class__.__name__)
@ -91,9 +115,12 @@ class JSONOffsetSpecs(OffsetSpecs):
self._kafka_offsets[key] = OffsetSpec(
app_name=value.get('app_name'),
topic=value.get('topic'),
until_offset=value.get('until_offset'),
partition=value.get('partition'),
from_offset=value.get('from_offset'),
partition=value.get('partition')
until_offset=value.get('until_offset'),
batch_time=value.get('batch_time'),
last_updated=value.get('last_updated'),
revision=value.get('revision')
)
except Exception:
log.info('Invalid or corrupt offsets file found at %s,'
@ -122,10 +149,26 @@ class JSONOffsetSpecs(OffsetSpecs):
"topic": offset_value.get_topic(),
"partition": offset_value.get_partition(),
"from_offset": offset_value.get_from_offset(),
"until_offset": offset_value.get_until_offset()}
"until_offset": offset_value.get_until_offset(),
"batch_time": offset_value.get_batch_time(),
"last_updated": offset_value.get_last_updated(),
"revision": offset_value.get_revision()}
def add(self, app_name, topic, partition,
from_offset, until_offset):
from_offset, until_offset, batch_time_info):
# batch time
batch_time = \
batch_time_info.strftime(
'%Y-%m-%d %H:%M:%S')
# last updated
last_updated = \
datetime.datetime.now().strftime(
'%Y-%m-%d %H:%M:%S')
NEW_REVISION_NO = 1
key_name = "%s_%s_%s" % (
app_name, topic, partition)
offset = OffsetSpec(
@ -133,7 +176,10 @@ class JSONOffsetSpecs(OffsetSpecs):
topic=topic,
partition=partition,
from_offset=from_offset,
until_offset=until_offset
until_offset=until_offset,
batch_time=batch_time,
last_updated=last_updated,
revision=NEW_REVISION_NO
)
log.info('Adding offset %s for key %s to current offsets: %s' %
(offset, key_name, self._kafka_offsets))
@ -141,9 +187,44 @@ class JSONOffsetSpecs(OffsetSpecs):
log.info('Added so kafka offset is now %s', self._kafka_offsets)
self._save()
def get_kafka_offsets(self):
def get_kafka_offsets(self, app_name):
return self._kafka_offsets
def delete_all_kafka_offsets(self):
def delete_all_kafka_offsets(self, app_name):
log.info("Deleting json offsets file: %s", self.kafka_offset_spec_file)
os.remove(self.kafka_offset_spec_file)
def add_all_offsets(self, app_name, offsets, batch_time_info):
# batch time
batch_time = \
batch_time_info.strftime(
'%Y-%m-%d %H:%M:%S')
# last updated
last_updated = \
datetime.datetime.now().strftime(
'%Y-%m-%d %H:%M:%S')
NEW_REVISION_NO = -1
for o in offsets:
key_name = "%s_%s_%s" % (
app_name, o.topic, o.partition)
offset = OffsetSpec(
topic=o.topic,
app_name=app_name,
partition=o.partition,
from_offset=o.fromOffset,
until_offset=o.untilOffset,
batch_time=batch_time,
last_updated=last_updated,
revision=NEW_REVISION_NO)
log.info('Adding offset %s for key %s to current offsets: %s' %
(offset, key_name, self._kafka_offsets))
self._kafka_offsets[key_name] = offset
log.info('Added so kafka offset is now %s', self._kafka_offsets)
self._save()
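As a usage sketch (paths and values here are made up, not part of the change), the new add_all_offsets() call saves a whole batch of offset ranges at once; the objects passed in only need the topic/partition/fromOffset/untilOffset attributes that pyspark's OffsetRange provides:

import datetime
from pyspark.streaming.kafka import OffsetRange

# hypothetical path and filename; any writable location works for the JSON store
offset_specs = JSONOffsetSpecs(path="/tmp", filename="offsets.json")
batch_time = datetime.datetime(2016, 6, 20, 11, 0)
offsets = [OffsetRange("metrics_pre_hourly", 0, 10, 20),
           OffsetRange("metrics_pre_hourly", 1, 15, 30)]
offset_specs.add_all_offsets("mon_metrics_kafka_pre_hourly", offsets,
                             batch_time)
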


@ -0,0 +1,41 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
class Processor(object):
"""processor object """
@abc.abstractmethod
def get_app_name(self):
"""get name of this application. Will be used to
store offsets in database
"""
raise NotImplementedError(
"Class %s doesn't implement get_app_name()"
% self.__class__.__name__)
@abc.abstractmethod
def is_time_to_run(self, current_time):
"""return True if its time to run this processor"""
raise NotImplementedError(
"Class %s doesn't implement is_time_to_run()"
% self.__class__.__name__)
@abc.abstractmethod
def run_processor(self, time):
"""Run application"""
raise NotImplementedError(
"Class %s doesn't implement run_processor()"
% self.__class__.__name__)
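A minimal sketch of a concrete Processor subclass (hypothetical names, shown only to illustrate the contract the abstract methods define):

import datetime

class ExampleProcessor(Processor):

    def get_app_name(self):
        # used as the key when saving offsets to the database
        return "example_processor"

    def is_time_to_run(self, current_time):
        # e.g. run only at the top of the hour
        return int(datetime.datetime.strftime(current_time, '%M')) == 0

    def run_processor(self, time):
        # real processors fetch data and aggregate it here
        pass
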


@ -0,0 +1,432 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from kafka.common import OffsetRequestPayload
from kafka import KafkaClient
from pyspark.sql import SQLContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming.kafka import OffsetRange
import datetime
import logging
from oslo_config import cfg
import simport
from monasca_transform.component.insert.kafka_insert import KafkaInsert
from monasca_transform.component.setter.rollup_quantity import RollupQuantity
from monasca_transform.data_driven_specs.data_driven_specs_repo \
import DataDrivenSpecsRepo
from monasca_transform.data_driven_specs.data_driven_specs_repo \
import DataDrivenSpecsRepoFactory
from monasca_transform.processor import Processor
from monasca_transform.transform.storage_utils import StorageUtils
from monasca_transform.transform.transform_utils import InstanceUsageUtils
from monasca_transform.transform import TransformContextUtils
LOG = logging.getLogger(__name__)
class PreHourlyProcessor(Processor):
"""Processor to process usage data published to metrics_pre_hourly topic a
and publish final rolled up metrics to metrics topic in kafka.
"""
@staticmethod
def log_debug(message):
LOG.debug(message)
@staticmethod
def save_kafka_offsets(current_offsets,
batch_time_info):
"""save current offsets to offset specification."""
offset_specs = simport.load(cfg.CONF.repositories.offsets)()
app_name = PreHourlyProcessor.get_app_name()
for o in current_offsets:
PreHourlyProcessor.log_debug(
"saving: OffSetRanges: %s %s %s %s, "
"batch_time_info: %s" % (
o.topic, o.partition, o.fromOffset, o.untilOffset,
str(batch_time_info)))
# add new offsets, update revision
offset_specs.add_all_offsets(app_name,
current_offsets,
batch_time_info)
@staticmethod
def reset_kafka_offsets(app_name):
"""delete all offsets from the offset specification."""
# get the offsets from global var
offset_specs = simport.load(cfg.CONF.repositories.offsets)()
offset_specs.delete_all_kafka_offsets(app_name)
@staticmethod
def get_app_name():
"""get name of this application. Will be used to
store offsets in database
"""
return "mon_metrics_kafka_pre_hourly"
@staticmethod
def get_kafka_topic():
"""get name of kafka topic for transformation."""
return "metrics_pre_hourly"
@staticmethod
def is_time_to_run(check_time):
"""return True if its time to run this processor.
For now it just checks to see if its start of the hour
i.e. the minute is 00.
"""
this_min = int(datetime.datetime.strftime(check_time, '%M'))
# run pre hourly processor only at top of the hour
if this_min == 0:
return True
else:
return False
@staticmethod
def _get_offsets_from_kafka(brokers,
topic,
offset_time):
"""get dict representing kafka
offsets.
"""
# get client
client = KafkaClient(brokers)
# get partitions for a topic
partitions = client.topic_partitions[topic]
# https://cwiki.apache.org/confluence/display/KAFKA/
# A+Guide+To+The+Kafka+Protocol#
# AGuideToTheKafkaProtocol-OffsetRequest
MAX_OFFSETS = 1
offset_requests = [OffsetRequestPayload(topic,
part_name,
offset_time,
MAX_OFFSETS) for part_name
in partitions.keys()]
offsets_responses = client.send_offset_request(offset_requests)
offset_dict = {}
for response in offsets_responses:
key = "_".join((response.topic,
str(response.partition)))
offset_dict[key] = response
return offset_dict
@staticmethod
def _parse_saved_offsets(app_name, topic, saved_offset_spec):
"""get dict representing saved
offsets.
"""
offset_dict = {}
for key, value in saved_offset_spec.items():
if key.startswith("%s_%s" % (app_name, topic)):
spec_app_name = value.get_app_name()
spec_topic = value.get_topic()
spec_partition = int(value.get_partition())
spec_from_offset = value.get_from_offset()
spec_until_offset = value.get_until_offset()
key = "_".join((spec_topic,
str(spec_partition)))
offset_dict[key] = (spec_app_name,
spec_topic,
spec_partition,
spec_from_offset,
spec_until_offset)
return offset_dict
@staticmethod
def _get_new_offset_range_list(brokers, topic):
"""get offset range from earliest to latest."""
offset_range_list = []
# https://cwiki.apache.org/confluence/display/KAFKA/
# A+Guide+To+The+Kafka+Protocol#
# AGuideToTheKafkaProtocol-OffsetRequest
GET_LATEST_OFFSETS = -1
latest_dict = PreHourlyProcessor.\
_get_offsets_from_kafka(brokers, topic,
GET_LATEST_OFFSETS)
GET_EARLIEST_OFFSETS = -2
earliest_dict = PreHourlyProcessor.\
_get_offsets_from_kafka(brokers, topic,
GET_EARLIEST_OFFSETS)
for item in latest_dict:
until_offset = latest_dict[item].offsets[0]
from_offset = earliest_dict[item].offsets[0]
partition = latest_dict[item].partition
topic = latest_dict[item].topic
offset_range_list.append(OffsetRange(topic,
partition,
from_offset,
until_offset))
return offset_range_list
@staticmethod
def _get_offset_range_list(brokers,
topic,
app_name,
saved_offset_spec):
"""get offset range from saved offset to latest.
"""
offset_range_list = []
# https://cwiki.apache.org/confluence/display/KAFKA/
# A+Guide+To+The+Kafka+Protocol#
# AGuideToTheKafkaProtocol-OffsetRequest
GET_LATEST_OFFSETS = -1
latest_dict = PreHourlyProcessor.\
_get_offsets_from_kafka(brokers, topic,
GET_LATEST_OFFSETS)
GET_EARLIEST_OFFSETS = -2
earliest_dict = PreHourlyProcessor.\
_get_offsets_from_kafka(brokers, topic,
GET_EARLIEST_OFFSETS)
saved_dict = PreHourlyProcessor.\
_parse_saved_offsets(app_name, topic, saved_offset_spec)
for item in latest_dict:
# saved spec
(spec_app_name,
spec_topic_name,
spec_partition,
spec_from_offset,
spec_until_offset) = saved_dict[item]
# until
until_offset = latest_dict[item].offsets[0]
# from
if (spec_until_offset is not None and int(spec_until_offset) >= 0):
from_offset = spec_until_offset
else:
from_offset = earliest_dict[item].offsets[0]
partition = latest_dict[item].partition
topic = latest_dict[item].topic
offset_range_list.append(OffsetRange(topic,
partition,
from_offset,
until_offset))
return offset_range_list
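A standalone illustration (made-up numbers) of how the from offset is chosen above: reuse the saved until_offset when one exists and is non-negative, otherwise fall back to the earliest offset Kafka still retains:

saved_until_offset = 1200   # until_offset from the last saved offset spec
earliest_in_kafka = 900     # earliest offset kafka still retains
latest_in_kafka = 1500      # latest offset available

if saved_until_offset is not None and int(saved_until_offset) >= 0:
    from_offset = saved_until_offset
else:
    from_offset = earliest_in_kafka

print(from_offset, latest_in_kafka)   # next batch reads offsets 1200 to 1500
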
@staticmethod
def get_processing_offset_range_list(processing_time):
"""get offset range to fetch data from. The
range will last from the last saved offsets to current offsets
available. If there are no last saved offsets available in the
database the starting offsets will be set to the earliest
available in kafka.
"""
offset_specifications = simport.load(cfg.CONF.repositories.offsets)()
# get application name, will be used to get offsets from database
app_name = PreHourlyProcessor.get_app_name()
saved_offset_spec = offset_specifications.get_kafka_offsets(app_name)
# get kafka topic to fetch data
topic = PreHourlyProcessor.get_kafka_topic()
offset_range_list = []
if len(saved_offset_spec) < 1:
PreHourlyProcessor.log_debug(
"No saved offsets available..."
"connecting to kafka and fetching "
"from earliest available offset ...")
offset_range_list = PreHourlyProcessor._get_new_offset_range_list(
cfg.CONF.messaging.brokers,
topic)
else:
PreHourlyProcessor.log_debug(
"Saved offsets available..."
"connecting to kafka and fetching from saved offset ...")
offset_range_list = PreHourlyProcessor._get_offset_range_list(
cfg.CONF.messaging.brokers,
topic,
app_name,
saved_offset_spec)
return offset_range_list
@staticmethod
def fetch_pre_hourly_data(spark_context,
offset_range_list):
"""get metrics pre hourly data from offset range list."""
# get kafka stream over the same offsets
pre_hourly_rdd = KafkaUtils.createRDD(spark_context,
{"metadata.broker.list":
cfg.CONF.messaging.brokers},
offset_range_list)
return pre_hourly_rdd
@staticmethod
def pre_hourly_to_instance_usage_df(pre_hourly_rdd):
"""convert raw pre hourly data into instance usage dataframe."""
#
# extract second column containing instance usage data
#
instance_usage_rdd = pre_hourly_rdd.map(
lambda iud: iud[1])
#
# convert usage data rdd to instance usage df
#
sqlc = SQLContext.getOrCreate(pre_hourly_rdd.context)
instance_usage_df = \
InstanceUsageUtils.create_df_from_json_rdd(
sqlc,
instance_usage_rdd)
return instance_usage_df
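For reference, each element read from metrics_pre_hourly is a (key, value) pair in which only the second column carries the instance usage JSON, which is why the map above selects iud[1]; a trimmed example record (values shortened) looks like:

sample = ('', '{"aggregated_metric_name": "mem.total_mb_agg",'
              ' "aggregation_period": "hourly", "quantity": 16049.0}')
instance_usage_json = sample[1]   # the JSON document used to build the dataframe
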
@staticmethod
def process_instance_usage(transform_context, instance_usage_df):
"""second stage aggregation. Aggregate instance usage rdd
data and write results to metrics topic in kafka.
"""
transform_spec_df = transform_context.transform_spec_df_info
#
# do a rollup operation
#
agg_params = transform_spec_df.select(
"aggregation_params_map.pre_hourly_group_by_list")\
.collect()[0].asDict()
pre_hourly_group_by_list = agg_params["pre_hourly_group_by_list"]
if (len(pre_hourly_group_by_list) == 1 and
pre_hourly_group_by_list[0] == "default"):
pre_hourly_group_by_list = ["tenant_id", "user_id",
"resource_uuid",
"geolocation", "region", "zone",
"host", "project_id",
"aggregated_metric_name",
"aggregation_period"]
# get aggregation period
agg_params = transform_spec_df.select(
"aggregation_params_map.aggregation_period").collect()[0].asDict()
aggregation_period = agg_params["aggregation_period"]
# get second stage operation
agg_params = transform_spec_df.select(
"aggregation_params_map.pre_hourly_operation")\
.collect()[0].asDict()
pre_hourly_operation = agg_params["pre_hourly_operation"]
instance_usage_df = \
RollupQuantity.do_rollup(pre_hourly_group_by_list,
aggregation_period,
pre_hourly_operation,
instance_usage_df)
# insert metrics
instance_usage_df = KafkaInsert.insert(transform_context,
instance_usage_df)
return instance_usage_df
@staticmethod
def do_transform(instance_usage_df):
"""start processing (aggregating) metrics
"""
#
# look in instance_usage_df for list of metrics to be processed
#
metric_ids_df = instance_usage_df.select(
"processing_meta.metric_id").distinct()
metric_ids_to_process = [row.metric_id
for row in metric_ids_df.collect()]
data_driven_specs_repo = DataDrivenSpecsRepoFactory.\
get_data_driven_specs_repo()
sqlc = SQLContext.getOrCreate(instance_usage_df.rdd.context)
transform_specs_df = data_driven_specs_repo.get_data_driven_specs(
sql_context=sqlc,
data_driven_spec_type=DataDrivenSpecsRepo.transform_specs_type)
for metric_id in metric_ids_to_process:
transform_spec_df = transform_specs_df.select(
["aggregation_params_map", "metric_id"]
).where(transform_specs_df.metric_id == metric_id)
source_instance_usage_df = instance_usage_df.select("*").where(
instance_usage_df.processing_meta.metric_id == metric_id)
# set transform_spec_df in TransformContext
transform_context = \
TransformContextUtils.get_context(
transform_spec_df_info=transform_spec_df)
PreHourlyProcessor.process_instance_usage(
transform_context, source_instance_usage_df)
@staticmethod
def run_processor(spark_context, processing_time):
"""process data in metrics_pre_hourly queue, starting
from the last saved offsets, else start from earliest
offsets available
"""
offset_range_list = \
PreHourlyProcessor.get_processing_offset_range_list(
processing_time)
# get pre hourly data
pre_hourly_rdd = PreHourlyProcessor.fetch_pre_hourly_data(
spark_context, offset_range_list)
# get instance usage df
instance_usage_df = PreHourlyProcessor.pre_hourly_to_instance_usage_df(
pre_hourly_rdd)
#
# cache instance usage df
#
if cfg.CONF.pre_hourly_processor.enable_instance_usage_df_cache:
storage_level_prop = \
cfg.CONF.pre_hourly_processor\
.instance_usage_df_cache_storage_level
storage_level = StorageUtils.get_storage_level(
storage_level_prop)
instance_usage_df.persist(storage_level)
# aggregate pre hourly data
PreHourlyProcessor.do_transform(instance_usage_df)
# remove cache
if cfg.CONF.pre_hourly_processor.enable_instance_usage_df_cache:
instance_usage_df.unpersist()
# save latest metrics_pre_hourly offsets in the database
PreHourlyProcessor.save_kafka_offsets(offset_range_list,
processing_time)
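A hedged sketch of how a caller might gate and invoke this processor (the real call site lives elsewhere in the tree): the rollup only fires when the batch time lands at the top of the hour, as is_time_to_run above decides.

import datetime

def maybe_run_pre_hourly(spark_context, batch_time):
    # second stage rollup runs only at the top of the hour
    if PreHourlyProcessor.is_time_to_run(batch_time):
        PreHourlyProcessor.run_processor(spark_context, batch_time)

print(PreHourlyProcessor.is_time_to_run(
    datetime.datetime(2016, 6, 20, 11, 0)))    # True  -> rollup runs
print(PreHourlyProcessor.is_time_to_run(
    datetime.datetime(2016, 6, 20, 11, 10)))   # False -> skipped this batch
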


@ -18,7 +18,8 @@ from collections import namedtuple
TransformContextBase = namedtuple("TransformContext",
["config_info",
"offset_info",
"transform_spec_df_info"])
"transform_spec_df_info",
"batch_time_info"])
class TransformContext(TransformContextBase):
@ -32,6 +33,7 @@ class TransformContext(TransformContextBase):
offset_info - current kafka offset information
transform_spec_df - processing information from
transform_spec aggregation driver table
batch_datetime_info - current batch processing datetime
"""
RddTransformContextBase = namedtuple("RddTransformContext",
@ -56,12 +58,14 @@ class TransformContextUtils(object):
def get_context(transform_context_info=None,
config_info=None,
offset_info=None,
transform_spec_df_info=None):
transform_spec_df_info=None,
batch_time_info=None):
if transform_context_info is None:
return TransformContext(config_info,
offset_info,
transform_spec_df_info)
transform_spec_df_info,
batch_time_info)
else:
if config_info is None or config_info == "":
# get from passed in transform_context
@ -77,6 +81,13 @@ class TransformContextUtils(object):
transform_spec_df_info = \
transform_context_info.transform_spec_df_info
if batch_time_info is None or \
batch_time_info == "":
# get from passed in transform_context
batch_time_info = \
transform_context_info.batch_time_info
return TransformContext(config_info,
offset_info,
transform_spec_df_info)
transform_spec_df_info,
batch_time_info)
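With the new field, callers build the context much as the unit tests below do; transform_spec_df here is assumed to be a transform spec dataframe that has already been loaded:

import datetime

batch_time = datetime.datetime.strptime('2016-01-01 00:00:00',
                                        '%Y-%m-%d %H:%M:%S')
transform_context = TransformContextUtils.get_context(
    transform_spec_df_info=transform_spec_df,
    batch_time_info=batch_time)
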


@ -0,0 +1,49 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from pyspark import StorageLevel
class StorageUtils(object):
"""storage util functions"""
@staticmethod
def get_storage_level(storage_level_str):
"""get pyspark storage level from storage level
string
"""
if (storage_level_str == "DISK_ONLY"):
return StorageLevel.DISK_ONLY
elif (storage_level_str == "DISK_ONLY_2"):
return StorageLevel.DISK_ONLY_2
elif (storage_level_str == "MEMORY_AND_DISK"):
return StorageLevel.MEMORY_AND_DISK
elif (storage_level_str == "MEMORY_AND_DISK_2"):
return StorageLevel.MEMORY_AND_DISK_2
elif (storage_level_str == "MEMORY_AND_DISK_SER"):
return StorageLevel.MEMORY_AND_DISK_SER
elif (storage_level_str == "MEMORY_AND_DISK_SER_2"):
return StorageLevel.MEMORY_AND_DISK_SER_2
elif (storage_level_str == "MEMORY_ONLY"):
return StorageLevel.MEMORY_ONLY
elif (storage_level_str == "MEMORY_ONLY_2"):
return StorageLevel.MEMORY_ONLY_2
elif (storage_level_str == "MEMORY_ONLY_SER"):
return StorageLevel.MEMORY_ONLY_SER
elif (storage_level_str == "MEMORY_ONLY_SER_2"):
return StorageLevel.MEMORY_ONLY_SER_2
elif (storage_level_str == "OFF_HEAP"):
return StorageLevel.OFF_HEAP
else:
return StorageLevel.MEMORY_ONLY
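A quick usage sketch, mirroring how PreHourlyProcessor.run_processor above maps the configured storage level string before caching the dataframe:

from pyspark import StorageLevel

level = StorageUtils.get_storage_level("MEMORY_ONLY_SER_2")
assert level == StorageLevel.MEMORY_ONLY_SER_2
# a dataframe would then be cached with df.persist(level) and released
# later with df.unpersist(), as run_processor does
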


@ -15,7 +15,6 @@
import logging
from pyspark.sql import SQLContext
from pyspark.sql.types import ArrayType
from pyspark.sql.types import DoubleType
from pyspark.sql.types import MapType
@ -178,6 +177,12 @@ class TransformSpecsUtils(TransformUtils):
StringType(), True),
StructField("aggregated_metric_name",
StringType(), True),
StructField("pre_hourly_group_by_list",
ArrayType(StringType(),
containsNull=False),
True),
StructField("pre_hourly_operation",
StringType(), True),
StructField("aggregation_pipeline",
StructType([source, usage,
setters, insert]),


@ -32,6 +32,7 @@ monasca_transform.setter =
monasca_transform.insert =
prepare_data = monasca_transform.component.insert.prepare_data:PrepareData
insert_data = monasca_transform.component.insert.kafka_insert:KafkaInsert
insert_data_pre_hourly = monasca_transform.component.insert.kafka_insert_pre_hourly:KafkaInsertPreHourly
[global]
setup-hooks =


@ -11,7 +11,6 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from pyspark.sql import SQLContext
@ -67,7 +66,8 @@ class TransformBuilderTest(SparkContextTest):
sql_context, metric_proc_json_path)
transform_context = TransformContextUtils.get_context(
transform_spec_df_info=transform_spec_df)
transform_spec_df_info=transform_spec_df,
batch_time_info=self.get_dummy_batch_time())
# invoke the generic transformation builder
instance_usage_df = GenericTransformBuilder.do_transform(


@ -11,23 +11,21 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from mock import call
from mock import MagicMock
import unittest
from oslo_config import cfg
from pyspark.streaming.kafka import OffsetRange
import mock
from mock import call
from mock import MagicMock
from monasca_transform.config.config_initializer import ConfigInitializer
from monasca_transform.driver.mon_metrics_kafka \
import MonMetricsKafkaProcessor
from monasca_transform.messaging.adapter import MessageAdapter
from monasca_transform.transform import RddTransformContext
from monasca_transform.transform import TransformContextUtils
from tests.unit.messaging.adapter import DummyAdapter
from tests.unit.spark_context_test import SparkContextTest
from tests.unit.test_resources.kafka_data.data_provider import DataProvider
from tests.unit.test_resources.mock_component_manager \
@ -67,9 +65,9 @@ class SparkTest(SparkContextTest):
'tests/unit/test_resources/config/'
'test_config_with_dummy_messaging_adapter.conf'])
# reset metric_id list dummy adapter
if not MessageAdapter.adapter_impl:
MessageAdapter.init()
MessageAdapter.adapter_impl.metric_list = []
if not DummyAdapter.adapter_impl:
DummyAdapter.init()
DummyAdapter.adapter_impl.metric_list = []
@mock.patch('monasca_transform.transform.builder.'
'generic_transform_builder.GenericTransformBuilder.'
@ -106,7 +104,9 @@ class SparkTest(SparkContextTest):
OffsetRange("metrics", 1, 10, 20)] # mimic rdd.offsetRanges()
transform_context = TransformContextUtils.get_context(
offset_info=myOffsetRanges)
offset_info=myOffsetRanges,
batch_time_info=self.get_dummy_batch_time())
rdd_monasca_with_offsets = rdd_monasca.map(
lambda x: RddTransformContext(x, transform_context))
@ -121,7 +121,7 @@ class SparkTest(SparkContextTest):
rdd_monasca_with_offsets)
# get the metrics that have been submitted to the dummy message adapter
metrics = MessageAdapter.adapter_impl.metric_list
metrics = DummyAdapter.adapter_impl.metric_list
# Verify mem.total_mb_agg metrics
total_mb_agg_metric = [


@ -11,13 +11,27 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from monasca_transform.messaging.adapter import MessageAdapter
import simport
class DummyAdapter(MessageAdapter):
adapter_impl = None
metric_list = []
@staticmethod
def init():
# object to keep track of offsets
DummyAdapter.adapter_impl = simport.load(
"tests.unit.messaging.adapter:DummyAdapter")()
def do_send_metric(self, metric):
self.metric_list.append(metric)
@staticmethod
def send_metric(metric):
if not DummyAdapter.adapter_impl:
DummyAdapter.init()
DummyAdapter.adapter_impl.do_send_metric(metric)


@ -0,0 +1,136 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import unittest
from pyspark.streaming.kafka import OffsetRange
from monasca_transform.component.insert.dummy_insert import DummyInsert
from monasca_transform.config.config_initializer import ConfigInitializer
from monasca_transform.processor.pre_hourly_processor import PreHourlyProcessor
from tests.unit.messaging.adapter import DummyAdapter
from tests.unit.spark_context_test import SparkContextTest
from tests.unit.test_resources.metrics_pre_hourly_data.data_provider \
import DataProvider
class TestPreHourlyProcessorAgg(SparkContextTest):
def setUp(self):
super(TestPreHourlyProcessorAgg, self).setUp()
# configure the system with a dummy messaging adapter
ConfigInitializer.basic_config(
default_config_files=[
'tests/unit/test_resources/config/'
'test_config_with_dummy_messaging_adapter.conf'])
# reset metric_id list dummy adapter
if not DummyAdapter.adapter_impl:
DummyAdapter.init()
DummyAdapter.adapter_impl.metric_list = []
@mock.patch('monasca_transform.processor.pre_hourly_processor.KafkaInsert',
DummyInsert)
@mock.patch('monasca_transform.processor.pre_hourly_processor.'
'PreHourlyProcessor.fetch_pre_hourly_data')
@mock.patch('monasca_transform.processor.pre_hourly_processor.'
'PreHourlyProcessor.get_processing_offset_range_list')
def test_pre_hourly_processor(self,
offset_range_list,
pre_hourly_data):
# load components
myOffsetRanges = [
OffsetRange("metrics_pre_hourly", 1, 10, 20)]
offset_range_list.return_value = myOffsetRanges
# Create an RDD out of the mocked instance usage data
with open(DataProvider.metrics_pre_hourly_data_path) as f:
raw_lines = f.read().splitlines()
raw_tuple_list = [eval(raw_line) for raw_line in raw_lines]
pre_hourly_rdd_data = self.spark_context.parallelize(raw_tuple_list)
pre_hourly_data.return_value = pre_hourly_rdd_data
# Do something simple with the RDD
result = self.simple_count_transform(pre_hourly_rdd_data)
# run pre hourly processor
PreHourlyProcessor.run_processor(
self.spark_context, self.get_dummy_batch_time())
# get the metrics that have been submitted to the dummy message adapter
metrics = DummyAdapter.adapter_impl.metric_list
# Verify count of instance usage data
self.assertEqual(result, 6)
# check aggregation result
mem_total_mb_agg_metric = [
value for value in metrics
if value.get('metric').get('name') ==
'mem.total_mb_agg' and
value.get('metric').get('dimensions').get('host') ==
'all'][0]
self.assertTrue(mem_total_mb_agg_metric is not None)
self.assertEqual(16049.0,
mem_total_mb_agg_metric
.get('metric').get('value'))
# agg meta
self.assertEqual("2016-06-20 11:49:44",
mem_total_mb_agg_metric
.get("metric")
.get('value_meta').get('lastrecord_timestamp'))
self.assertEqual("2016-06-20 11:24:59",
mem_total_mb_agg_metric
.get("metric")
.get('value_meta').get('firstrecord_timestamp'))
self.assertEqual(60.0,
mem_total_mb_agg_metric
.get("metric")
.get('value_meta').get('record_count'))
mem_usable_mb_agg_metric = [
value for value in metrics
if value.get('metric').get('name') ==
'mem.usable_mb_agg' and
value.get('metric').get('dimensions').get('host') ==
'all'][0]
self.assertTrue(mem_usable_mb_agg_metric is not None)
self.assertEqual(10283.1,
mem_usable_mb_agg_metric
.get('metric').get('value'))
# agg meta
self.assertEqual("2016-06-20 11:49:44",
mem_usable_mb_agg_metric
.get("metric")
.get('value_meta').get('lastrecord_timestamp'))
self.assertEqual("2016-06-20 11:24:59",
mem_usable_mb_agg_metric
.get("metric")
.get('value_meta').get('firstrecord_timestamp'))
self.assertEqual(60.0,
mem_usable_mb_agg_metric
.get("metric")
.get('value_meta').get('record_count'))
def simple_count_transform(self, rdd):
return rdd.count()
if __name__ == "__main__":
print("PATH *************************************************************")
import sys
print(sys.path)
print("PATH==============================================================")
unittest.main()


@ -11,7 +11,6 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from pyspark.sql import SQLContext
from monasca_transform.transform.transform_utils import RecordStoreUtils
@ -44,9 +43,9 @@ class SetAggregatedMetricNameTest(SparkContextTest):
self.sql_context,
DataProvider.transform_spec_path)
transform_context = \
TransformContextUtils.get_context(
transform_spec_df_info=transform_spec_df)
transform_context = TransformContextUtils.get_context(
transform_spec_df_info=transform_spec_df,
batch_time_info=self.get_dummy_batch_time())
instance_usage_df = FetchQuantity.usage(
transform_context, record_store_df)


@ -11,7 +11,6 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from pyspark.sql import SQLContext
from monasca_transform.transform.transform_utils import RecordStoreUtils
@ -40,9 +39,9 @@ class UsageComponentTest(SparkContextTest):
transform_spec_df = TransformSpecsUtils.create_df_from_json(
self.sql_context, DataProvider.transform_spec_path)
transform_context = \
TransformContextUtils.get_context(
transform_spec_df_info=transform_spec_df)
transform_context = TransformContextUtils.get_context(
transform_spec_df_info=transform_spec_df,
batch_time_info=self.get_dummy_batch_time())
instance_usage_df = FetchQuantity.usage(
transform_context, record_store_df)


@ -14,6 +14,8 @@
from pyspark.context import SparkContext
from pyspark import SparkConf
import datetime
import unittest
@ -21,11 +23,24 @@ class SparkContextTest(unittest.TestCase):
def setUp(self):
# Create a local Spark context with 4 cores
spark_conf = SparkConf().setMaster('local[4]')
spark_conf = SparkConf().setMaster('local[4]').\
setAppName("monasca-transform unit tests").\
set("spark.sql.shuffle.partitions", "10")
self.spark_context = SparkContext.getOrCreate(conf=spark_conf)
# quiet logging
logger = self.spark_context._jvm.org.apache.log4j
logger.LogManager.getLogger("org").setLevel(logger.Level.WARN)
logger.LogManager.getLogger("akka").setLevel(logger.Level.WARN)
def tearDown(self):
# we don't stop the spark context because it doesn't shut down cleanly;
# a context is left behind that cannot be reused. Instead we rely on the
# context being shut down at the end of the test run
pass
def get_dummy_batch_time(self):
"""get a batch time for all tests."""
my_batch_time = datetime.datetime.strptime('2016-01-01 00:00:00',
'%Y-%m-%d %H:%M:%S')
return my_batch_time


@ -11,7 +11,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import json
import os
import random
@ -33,11 +33,18 @@ class TestJSONOffsetSpecs(unittest.TestCase):
def tearDown(self):
pass
def get_dummy_batch_time(self):
"""get a batch time for all tests."""
my_batch_time = datetime.datetime.strptime('2016-01-01 00:00:00',
'%Y-%m-%d %H:%M:%S')
return my_batch_time
def test_read_offsets_on_start(self):
json_offset_specs = JSONOffsetSpecs(
path=self.test_resources_path,
filename='test_read_offsets_on_start.json')
kafka_offsets = json_offset_specs.get_kafka_offsets()
app_name = "mon_metrics_kafka"
kafka_offsets = json_offset_specs.get_kafka_offsets(app_name)
self.assertEqual(1, len(kafka_offsets))
offset_key_0 = kafka_offsets.iterkeys().next()
self.assertEqual('mon_metrics_kafka_metrics_0', offset_key_0)
@ -45,7 +52,7 @@ class TestJSONOffsetSpecs(unittest.TestCase):
self.assertEqual('metrics', offset_value_0.get_topic())
self.assertEqual(85081, offset_value_0.get_until_offset())
self.assertEqual(0, offset_value_0.get_partition())
self.assertEqual('mon_metrics_kafka', offset_value_0.get_app_name())
self.assertEqual(app_name, offset_value_0.get_app_name())
self.assertEqual(84790, offset_value_0.get_from_offset())
def test_write_offsets_each_add(self):
@ -60,11 +67,13 @@ class TestJSONOffsetSpecs(unittest.TestCase):
until_offset_1 = random.randint(0, sys.maxsize)
from_offset_1 = random.randint(0, sys.maxsize)
app_name_1 = str(uuid.uuid4())
my_batch_time = self.get_dummy_batch_time()
used_values = {}
json_offset_specs.add(topic=topic_1, partition=partition_1,
app_name=app_name_1, from_offset=from_offset_1,
until_offset=until_offset_1)
until_offset=until_offset_1,
batch_time_info=my_batch_time)
kafka_offset_dict = self.load_offset_file_as_json(file_path)
offset_key_1 = "%s_%s_%s" % (app_name_1, topic_1, partition_1)
@ -83,7 +92,8 @@ class TestJSONOffsetSpecs(unittest.TestCase):
app_name_2 = str(uuid.uuid4())
json_offset_specs.add(topic=topic_2, partition=partition_2,
app_name=app_name_2, from_offset=from_offset_2,
until_offset=until_offset_2)
until_offset=until_offset_2,
batch_time_info=my_batch_time)
offset_key_2 = "%s_%s_%s" % (app_name_2, topic_2, partition_2)
used_values[offset_key_2] = {
"topic": topic_2, "partition": partition_2, "app_name": app_name_2,
@ -127,10 +137,13 @@ class TestJSONOffsetSpecs(unittest.TestCase):
until_offset_1 = random.randint(0, sys.maxsize)
from_offset_1 = random.randint(0, sys.maxsize)
app_name_1 = str(uuid.uuid4())
my_batch_time = self.get_dummy_batch_time()
used_values = {}
json_offset_specs.add(topic=topic_1, partition=partition_1,
app_name=app_name_1, from_offset=from_offset_1,
until_offset=until_offset_1)
until_offset=until_offset_1,
batch_time_info=my_batch_time)
offset_key_1 = "%s_%s_%s" % (app_name_1, topic_1, partition_1)
used_values[offset_key_1] = {
"topic": topic_1, "partition": partition_1, "app_name": app_name_1,
@ -143,7 +156,8 @@ class TestJSONOffsetSpecs(unittest.TestCase):
app_name_2 = str(uuid.uuid4())
json_offset_specs.add(topic=topic_2, partition=partition_2,
app_name=app_name_2, from_offset=from_offset_2,
until_offset=until_offset_2)
until_offset=until_offset_2,
batch_time_info=my_batch_time)
offset_key_2 = "%s_%s_%s" % (app_name_2, topic_2, partition_2)
used_values[offset_key_2] = {
"topic": topic_2, "partition": partition_2, "app_name": app_name_2,
@ -152,12 +166,20 @@ class TestJSONOffsetSpecs(unittest.TestCase):
# now create a new JSONOffsetSpecs
json_offset_specs_2 = JSONOffsetSpecs(self.test_resources_path,
filename)
found_offsets = json_offset_specs_2.get_kafka_offsets()
found_offsets = json_offset_specs_2.get_kafka_offsets(app_name_2)
json_found_offsets = {key: JSONOffsetSpecs.as_dict(value)
for key, value in found_offsets.items()}
for key, value in used_values.items():
found_value = json_found_offsets.get(key)
self.assertEqual(value, found_value)
self.assertEqual(value.get("app_name"),
found_value.get("app_name"))
self.assertEqual(value.get("topic"), found_value.get("topic"))
self.assertEqual(value.get("partition"),
found_value.get("partition"))
self.assertEqual(value.get("from_offset"),
found_value.get("from_offset"))
self.assertEqual(value.get("until_offset"),
found_value.get("until_offset"))
os.remove(os.path.join(self.test_resources_path, filename))
@ -173,11 +195,13 @@ class TestJSONOffsetSpecs(unittest.TestCase):
until_offset_1 = random.randint(0, sys.maxsize)
from_offset_1 = random.randint(0, sys.maxsize)
app_name_1 = str(uuid.uuid4())
my_batch_time = self.get_dummy_batch_time()
used_values = {}
json_offset_specs.add(topic=topic_1, partition=partition_1,
app_name=app_name_1, from_offset=from_offset_1,
until_offset=until_offset_1)
until_offset=until_offset_1,
batch_time_info=my_batch_time)
kafka_offset_dict = self.load_offset_file_as_json(file_path)
offset_key_1 = "%s_%s_%s" % (app_name_1, topic_1, partition_1)
@ -199,7 +223,8 @@ class TestJSONOffsetSpecs(unittest.TestCase):
json_offset_specs.add(topic=topic_1, partition=partition_1,
app_name=app_name_1, from_offset=from_offset_2,
until_offset=until_offset_2)
until_offset=until_offset_2,
batch_time_info=my_batch_time)
kafka_offset_dict = self.load_offset_file_as_json(file_path)
offset_value_updated = kafka_offset_dict.get(offset_key_1)


@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import random
import sys
import unittest
@ -30,6 +31,12 @@ class TestMySQLOffsetSpecs(unittest.TestCase):
def tearDown(self):
pass
def get_dummy_batch_time(self):
"""get a batch time for all tests."""
my_batch_time = datetime.datetime.strptime('2016-01-01 00:00:00',
'%Y-%m-%d %H:%M:%S')
return my_batch_time
def test_add_offset(self):
topic_1 = str(uuid.uuid4())
@ -39,47 +46,53 @@ class TestMySQLOffsetSpecs(unittest.TestCase):
app_name_1 = str(uuid.uuid4())
offset_key_1 = "%s_%s_%s" % (app_name_1, topic_1, partition_1)
my_batch_time = self.get_dummy_batch_time()
used_values = {}
self.kafka_offset_specs.add(topic=topic_1, partition=partition_1,
app_name=app_name_1,
from_offset=from_offset_1,
until_offset=until_offset_1)
until_offset=until_offset_1,
batch_time_info=my_batch_time)
used_values[offset_key_1] = {
"topic": topic_1, "partition": partition_1, "app_name": app_name_1,
"from_offset": from_offset_1, "until_offset": until_offset_1
}
kafka_offset_specs = self.kafka_offset_specs.get_kafka_offsets()
kafka_offset_specs = self.kafka_offset_specs.get_kafka_offsets(
app_name_1)
offset_value_1 = kafka_offset_specs.get(offset_key_1)
self.assertions_on_offset(used_value=used_values.get(offset_key_1),
offset_value=offset_value_1)
def test_add_another_offset(self):
offset_specs_at_outset = self.kafka_offset_specs.get_kafka_offsets()
offset_count = len(offset_specs_at_outset)
topic_1 = str(uuid.uuid4())
partition_1 = random.randint(0, 1024)
until_offset_1 = random.randint(0, sys.maxsize)
from_offset_1 = random.randint(0, sys.maxsize)
app_name_1 = str(uuid.uuid4())
offset_key_1 = "%s_%s_%s" % (app_name_1, topic_1, partition_1)
my_batch_time = self.get_dummy_batch_time()
used_values = {}
self.kafka_offset_specs.add(topic=topic_1, partition=partition_1,
app_name=app_name_1,
from_offset=from_offset_1,
until_offset=until_offset_1)
until_offset=until_offset_1,
batch_time_info=my_batch_time)
used_values[offset_key_1] = {
"topic": topic_1, "partition": partition_1, "app_name": app_name_1,
"from_offset": from_offset_1, "until_offset": until_offset_1
}
kafka_offset_specs = self.kafka_offset_specs.get_kafka_offsets()
kafka_offset_specs = self.kafka_offset_specs.get_kafka_offsets(
app_name_1)
offset_value_1 = kafka_offset_specs.get(offset_key_1)
self.assertions_on_offset(used_value=used_values.get(offset_key_1),
offset_value=offset_value_1)
self.assertEqual(offset_count + 1,
len(self.kafka_offset_specs.get_kafka_offsets()))
self.assertEqual(1,
len(self.kafka_offset_specs.get_kafka_offsets(
app_name_1)))
def test_update_offset_values(self):
topic_1 = str(uuid.uuid4())
@ -89,10 +102,13 @@ class TestMySQLOffsetSpecs(unittest.TestCase):
app_name_1 = str(uuid.uuid4())
offset_key_1 = "%s_%s_%s" % (app_name_1, topic_1, partition_1)
my_batch_time = self.get_dummy_batch_time()
self.kafka_offset_specs.add(topic=topic_1, partition=partition_1,
app_name=app_name_1,
from_offset=from_offset_1,
until_offset=until_offset_1)
until_offset=until_offset_1,
batch_time_info=my_batch_time)
until_offset_2 = random.randint(0, sys.maxsize)
while until_offset_2 == until_offset_1:
@ -105,9 +121,11 @@ class TestMySQLOffsetSpecs(unittest.TestCase):
self.kafka_offset_specs.add(topic=topic_1, partition=partition_1,
app_name=app_name_1,
from_offset=from_offset_2,
until_offset=until_offset_2)
until_offset=until_offset_2,
batch_time_info=my_batch_time)
kafka_offset_specs = self.kafka_offset_specs.get_kafka_offsets()
kafka_offset_specs = self.kafka_offset_specs.get_kafka_offsets(
app_name_1)
updated_offset_value = kafka_offset_specs.get(offset_key_1)
self.assertEqual(from_offset_2, updated_offset_value.get_from_offset())
self.assertEqual(until_offset_2,


@ -10,3 +10,14 @@ host = test_host_name
database_name = test_database_name
username = test_database_user_name
password = test_database_password
[stage_processors]
enable_pre_hourly_processor = False
[pre_hourly_processor]
enable_instance_usage_df_cache = False
instance_usage_df_cache_storage_level = MEMORY_ONLY_SER_2
[service]
enable_record_store_df_cache = False
record_store_df_cache_storage_level = MEMORY_ONLY_SER_2


@ -1,4 +1,15 @@
[DEFAULTS]
[messaging]
adapter = tests.unit.messaging.adapter:DummyAdapter
adapter = tests.unit.messaging.adapter:DummyAdapter
[stage_processors]
enable_pre_hourly_processor = False
[pre_hourly_processor]
enable_instance_usage_df_cache = False
instance_usage_df_cache_storage_level = MEMORY_ONLY_SER_2
[service]
enable_record_store_df_cache = False
record_store_df_cache_storage_level = MEMORY_ONLY_SER_2


@ -0,0 +1,23 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
class DataProvider(object):
_resource_path = 'tests/unit/test_resources/metrics_pre_hourly_data/'
metrics_pre_hourly_data_path = os.path.join(_resource_path,
"metrics_pre_hourly_data.txt")


@ -0,0 +1,6 @@
('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"mem.total_mb_agg","record_count":20.0,"user_id":"all","zone":"all","firstrecord_timestamp":"2016-06-20 11:24:59","tenant_id":"all","region":"all","usage_hour":"11","usage_date":"2016-06-20","processing_meta":{"metric_id":"mem_total_all"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp":"2016-06-20 11:29:44","firstrecord_timestamp_unix":1466421899.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1466422184.0,"quantity":16049.0})
('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"mem.total_mb_agg","record_count":20.0,"user_id":"all","zone":"all","firstrecord_timestamp":"2016-06-20 11:29:44","tenant_id":"all","region":"all","usage_hour":"11","usage_date":"2016-06-20","processing_meta":{"metric_id":"mem_total_all"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp":"2016-06-20 11:39:44","firstrecord_timestamp_unix":1466421899.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1466422784.0,"quantity":16049.0})
('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"mem.total_mb_agg","record_count":20.0,"user_id":"all","zone":"all","firstrecord_timestamp":"2016-06-20 11:39:44","tenant_id":"all","region":"all","usage_hour":"11","usage_date":"2016-06-20","processing_meta":{"metric_id":"mem_total_all"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp":"2016-06-20 11:49:44","firstrecord_timestamp_unix":1466421899.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1466423384.0,"quantity":16049.0})
('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"mem.usable_mb_agg","record_count":20.0,"user_id":"all","zone":"all","firstrecord_timestamp":"2016-06-20 11:24:59","tenant_id":"all","region":"all","usage_hour":"11","usage_date":"2016-06-20","processing_meta":{"metric_id":"mem_usable_all"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp":"2016-06-20 11:29:44","firstrecord_timestamp_unix":1466421899.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1466422184.0,"quantity":10283.1})
('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"mem.usable_mb_agg","record_count":20.0,"user_id":"all","zone":"all","firstrecord_timestamp":"2016-06-20 11:29:44","tenant_id":"all","region":"all","usage_hour":"11","usage_date":"2016-06-20","processing_meta":{"metric_id":"mem_usable_all"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp":"2016-06-20 11:39:44","firstrecord_timestamp_unix":1466421899.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1466422784.0,"quantity":10283.1})
('',{"service_group":"all","geolocation":"all","aggregated_metric_name":"mem.usable_mb_agg","record_count":20.0,"user_id":"all","zone":"all","firstrecord_timestamp":"2016-06-20 11:39:44","tenant_id":"all","region":"all","usage_hour":"11","usage_date":"2016-06-20","processing_meta":{"metric_id":"mem_usable_all"},"resource_uuid":"all","aggregation_period":"hourly","host":"all","lastrecord_timestamp":"2016-06-20 11:49:44","firstrecord_timestamp_unix":1466421899.0,"service_id":"all","project_id":"all","usage_minute":"all","lastrecord_timestamp_unix":1466423384.0,"quantity":10283.1})


@ -92,4 +92,9 @@ class MockComponentManager(object):
'DummyInsert',
DummyInsert(),
None),
Extension('insert_data_pre_hourly',
'monasca_transform.component.insert.dummy_insert:'
'DummyInsert',
DummyInsert(),
None),
])


@ -11,23 +11,20 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import mock
import unittest
from oslo_config import cfg
from pyspark.streaming.kafka import OffsetRange
import json
import mock
from monasca_transform.config.config_initializer import ConfigInitializer
from monasca_transform.driver.mon_metrics_kafka \
import MonMetricsKafkaProcessor
from monasca_transform.messaging.adapter import MessageAdapter
from monasca_transform.transform import RddTransformContext
from monasca_transform.transform import TransformContextUtils
from tests.unit.messaging.adapter import DummyAdapter
from tests.unit.spark_context_test import SparkContextTest
from tests.unit.test_resources.fetch_quantity_data.data_provider \
import DataProvider
@ -47,9 +44,9 @@ class TestFetchQuantityAgg(SparkContextTest):
'tests/unit/test_resources/config/'
'test_config_with_dummy_messaging_adapter.conf'])
# reset metric_id list dummy adapter
if not MessageAdapter.adapter_impl:
MessageAdapter.init()
MessageAdapter.adapter_impl.metric_list = []
if not DummyAdapter.adapter_impl:
DummyAdapter.init()
DummyAdapter.adapter_impl.metric_list = []
def get_pre_transform_specs_json(self):
"""get pre_transform_specs driver table info."""
@ -141,7 +138,9 @@ class TestFetchQuantityAgg(SparkContextTest):
OffsetRange("metrics", 1, 10, 20)] # mimic rdd.offsetRanges()
transform_context = TransformContextUtils.get_context(
offset_info=myOffsetRanges)
offset_info=myOffsetRanges,
batch_time_info=self.get_dummy_batch_time())
rdd_monasca_with_offsets = rdd_monasca.map(
lambda x: RddTransformContext(x, transform_context))
@ -150,7 +149,7 @@ class TestFetchQuantityAgg(SparkContextTest):
rdd_monasca_with_offsets)
# get the metrics that have been submitted to the dummy message adapter
metrics = MessageAdapter.adapter_impl.metric_list
metrics = DummyAdapter.adapter_impl.metric_list
mem_total_mb_agg_metric = [
value for value in metrics
@ -247,7 +246,9 @@ class TestFetchQuantityAgg(SparkContextTest):
OffsetRange("metrics", 1, 10, 20)] # mimic rdd.offsetRanges()
transform_context = TransformContextUtils.get_context(
offset_info=myOffsetRanges)
offset_info=myOffsetRanges,
batch_time_info=self.get_dummy_batch_time())
rdd_monasca_with_offsets = rdd_monasca.map(
lambda x: RddTransformContext(x, transform_context))
@ -256,7 +257,7 @@ class TestFetchQuantityAgg(SparkContextTest):
rdd_monasca_with_offsets)
# get the metrics that have been submitted to the dummy message adapter
metrics = MessageAdapter.adapter_impl.metric_list
metrics = DummyAdapter.adapter_impl.metric_list
mem_total_mb_agg_metric = [
value for value in metrics
@ -352,7 +353,8 @@ class TestFetchQuantityAgg(SparkContextTest):
OffsetRange("metrics", 1, 10, 20)] # mimic rdd.offsetRanges()
transform_context = TransformContextUtils.get_context(
offset_info=myOffsetRanges)
offset_info=myOffsetRanges,
batch_time_info=self.get_dummy_batch_time())
rdd_monasca_with_offsets = rdd_monasca.map(
lambda x: RddTransformContext(x, transform_context))
@ -361,7 +363,7 @@ class TestFetchQuantityAgg(SparkContextTest):
rdd_monasca_with_offsets)
# get the metrics that have been submitted to the dummy message adapter
metrics = MessageAdapter.adapter_impl.metric_list
metrics = DummyAdapter.adapter_impl.metric_list
mem_total_mb_agg_metric = [
value for value in metrics
@ -457,7 +459,9 @@ class TestFetchQuantityAgg(SparkContextTest):
OffsetRange("metrics", 1, 10, 20)] # mimic rdd.offsetRanges()
transform_context = TransformContextUtils.get_context(
offset_info=myOffsetRanges)
offset_info=myOffsetRanges,
batch_time_info=self.get_dummy_batch_time())
rdd_monasca_with_offsets = rdd_monasca.map(
lambda x: RddTransformContext(x, transform_context))
@ -466,7 +470,7 @@ class TestFetchQuantityAgg(SparkContextTest):
rdd_monasca_with_offsets)
# get the metrics that have been submitted to the dummy message adapter
metrics = MessageAdapter.adapter_impl.metric_list
metrics = DummyAdapter.adapter_impl.metric_list
mem_total_mb_agg_metric = [
value for value in metrics
@ -562,7 +566,9 @@ class TestFetchQuantityAgg(SparkContextTest):
OffsetRange("metrics", 1, 10, 20)] # mimic rdd.offsetRanges()
transform_context = TransformContextUtils.get_context(
offset_info=myOffsetRanges)
offset_info=myOffsetRanges,
batch_time_info=self.get_dummy_batch_time())
rdd_monasca_with_offsets = rdd_monasca.map(
lambda x: RddTransformContext(x, transform_context))
@ -571,7 +577,7 @@ class TestFetchQuantityAgg(SparkContextTest):
rdd_monasca_with_offsets)
# get the metrics that have been submitted to the dummy message adapter
metrics = MessageAdapter.adapter_impl.metric_list
metrics = DummyAdapter.adapter_impl.metric_list
mem_total_mb_agg_metric = [
value for value in metrics
@ -667,7 +673,9 @@ class TestFetchQuantityAgg(SparkContextTest):
OffsetRange("metrics", 1, 10, 20)] # mimic rdd.offsetRanges()
transform_context = TransformContextUtils.get_context(
offset_info=myOffsetRanges)
offset_info=myOffsetRanges,
batch_time_info=self.get_dummy_batch_time())
rdd_monasca_with_offsets = rdd_monasca.map(
lambda x: RddTransformContext(x, transform_context))
@ -676,7 +684,7 @@ class TestFetchQuantityAgg(SparkContextTest):
rdd_monasca_with_offsets)
# get the metrics that have been submitted to the dummy message adapter
metrics = MessageAdapter.adapter_impl.metric_list
metrics = DummyAdapter.adapter_impl.metric_list
mem_total_mb_agg_metric = [
value for value in metrics


@ -11,14 +11,13 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import mock
import unittest
from oslo_config import cfg
from pyspark.streaming.kafka import OffsetRange
import json
import mock
from monasca_transform.component.usage.fetch_quantity_util import \
FetchQuantityUtilException
@ -26,10 +25,10 @@ from monasca_transform.config.config_initializer import ConfigInitializer
from monasca_transform.driver.mon_metrics_kafka \
import MonMetricsKafkaProcessor
from monasca_transform.messaging.adapter import MessageAdapter
from monasca_transform.transform import RddTransformContext
from monasca_transform.transform import TransformContextUtils
from tests.unit.messaging.adapter import DummyAdapter
from tests.unit.spark_context_test import SparkContextTest
from tests.unit.test_resources.cpu_kafka_data.data_provider import DataProvider
from tests.unit.test_resources.mock_component_manager \
@ -48,9 +47,9 @@ class TestFetchQuantityUtilAgg(SparkContextTest):
'tests/unit/test_resources/config/'
'test_config_with_dummy_messaging_adapter.conf'])
# reset metric_id list dummy adapter
if not MessageAdapter.adapter_impl:
MessageAdapter.init()
MessageAdapter.adapter_impl.metric_list = []
if not DummyAdapter.adapter_impl:
DummyAdapter.init()
DummyAdapter.adapter_impl.metric_list = []
def get_pre_transform_specs_json(self):
"""get pre_transform_specs driver table info."""
@ -164,7 +163,9 @@ class TestFetchQuantityUtilAgg(SparkContextTest):
OffsetRange("metrics", 1, 10, 20)] # mimic rdd.offsetRanges()
transform_context = TransformContextUtils.get_context(
offset_info=myOffsetRanges)
offset_info=myOffsetRanges,
batch_time_info=self.get_dummy_batch_time())
rdd_monasca_with_offsets = rdd_monasca.map(
lambda x: RddTransformContext(x, transform_context))
@ -173,7 +174,7 @@ class TestFetchQuantityUtilAgg(SparkContextTest):
rdd_monasca_with_offsets)
# get the metrics that have been submitted to the dummy message adapter
metrics = MessageAdapter.adapter_impl.metric_list
metrics = DummyAdapter.adapter_impl.metric_list
utilized_cpu_logical_agg_metric = [
value for value in metrics
@ -265,7 +266,9 @@ class TestFetchQuantityUtilAgg(SparkContextTest):
OffsetRange("metrics", 1, 10, 20)] # mimic rdd.offsetRanges()
transform_context = TransformContextUtils.get_context(
offset_info=myOffsetRanges)
offset_info=myOffsetRanges,
batch_time_info=self.get_dummy_batch_time())
rdd_monasca_with_offsets = rdd_monasca.map(
lambda x: RddTransformContext(x, transform_context))
@ -274,7 +277,7 @@ class TestFetchQuantityUtilAgg(SparkContextTest):
rdd_monasca_with_offsets)
# get the metrics that have been submitted to the dummy message adapter
metrics = MessageAdapter.adapter_impl.metric_list
metrics = DummyAdapter.adapter_impl.metric_list
utilized_cpu_logical_agg_metric = [
value for value in metrics
@ -366,7 +369,9 @@ class TestFetchQuantityUtilAgg(SparkContextTest):
OffsetRange("metrics", 1, 10, 20)] # mimic rdd.offsetRanges()
transform_context = TransformContextUtils.get_context(
offset_info=myOffsetRanges)
offset_info=myOffsetRanges,
batch_time_info=self.get_dummy_batch_time())
rdd_monasca_with_offsets = rdd_monasca.map(
lambda x: RddTransformContext(x, transform_context))
@ -375,7 +380,7 @@ class TestFetchQuantityUtilAgg(SparkContextTest):
rdd_monasca_with_offsets)
# get the metrics that have been submitted to the dummy message adapter
metrics = MessageAdapter.adapter_impl.metric_list
metrics = DummyAdapter.adapter_impl.metric_list
utilized_cpu_logical_agg_metric = [
value for value in metrics
@ -467,7 +472,9 @@ class TestFetchQuantityUtilAgg(SparkContextTest):
OffsetRange("metrics", 1, 10, 20)] # mimic rdd.offsetRanges()
transform_context = TransformContextUtils.get_context(
offset_info=myOffsetRanges)
offset_info=myOffsetRanges,
batch_time_info=self.get_dummy_batch_time())
rdd_monasca_with_offsets = rdd_monasca.map(
lambda x: RddTransformContext(x, transform_context))
@ -529,7 +536,9 @@ class TestFetchQuantityUtilAgg(SparkContextTest):
OffsetRange("metrics", 1, 10, 20)] # mimic rdd.offsetRanges()
transform_context = TransformContextUtils.get_context(
offset_info=myOffsetRanges)
offset_info=myOffsetRanges,
batch_time_info=self.get_dummy_batch_time())
rdd_monasca_with_offsets = rdd_monasca.map(
lambda x: RddTransformContext(x, transform_context))
@ -591,7 +600,9 @@ class TestFetchQuantityUtilAgg(SparkContextTest):
OffsetRange("metrics", 1, 10, 20)] # mimic rdd.offsetRanges()
transform_context = TransformContextUtils.get_context(
offset_info=myOffsetRanges)
offset_info=myOffsetRanges,
batch_time_info=self.get_dummy_batch_time())
rdd_monasca_with_offsets = rdd_monasca.map(
lambda x: RddTransformContext(x, transform_context))
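
Each of the hunks above adds a batch_time_info argument to TransformContextUtils.get_context(), supplied by a get_dummy_batch_time() helper on the shared test base class. A minimal sketch of what such a helper could look like is shown below; hanging it off SparkContextTest and the particular fixed timestamp are assumptions for illustration, not taken from this change.

import datetime
import unittest


class SparkContextTest(unittest.TestCase):
    """Shared base for the Spark-backed unit tests (sketch only)."""

    def get_dummy_batch_time(self):
        # Return a fixed, known batch time so aggregated metric timestamps
        # and kafka offset bookkeeping stay deterministic in unit tests.
        return datetime.datetime(2016, 6, 10, 0, 0, 0)

Passing a fixed batch time here presumably mirrors what the driver does in production, where the Spark Streaming callback hands the current batch time to the processor.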

View File

@ -11,21 +11,19 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import unittest
from oslo_config import cfg
from pyspark.streaming.kafka import OffsetRange
import mock
from monasca_transform.config.config_initializer import ConfigInitializer
from monasca_transform.driver.mon_metrics_kafka \
import MonMetricsKafkaProcessor
from monasca_transform.messaging.adapter import MessageAdapter
from monasca_transform.transform import RddTransformContext
from monasca_transform.transform import TransformContextUtils
from tests.unit.messaging.adapter import DummyAdapter
from tests.unit.spark_context_test import SparkContextTest
from tests.unit.test_resources.cpu_kafka_data.data_provider import DataProvider
from tests.unit.test_resources.mock_component_manager \
@ -42,9 +40,9 @@ class SparkTest(SparkContextTest):
'tests/unit/test_resources/config/'
'test_config_with_dummy_messaging_adapter.conf'])
# reset metric_id list dummy adapter
if not MessageAdapter.adapter_impl:
MessageAdapter.init()
MessageAdapter.adapter_impl.metric_list = []
if not DummyAdapter.adapter_impl:
DummyAdapter.init()
DummyAdapter.adapter_impl.metric_list = []
@mock.patch('monasca_transform.transform.builder.'
'generic_transform_builder.GenericTransformBuilder.'
@ -81,7 +79,9 @@ class SparkTest(SparkContextTest):
OffsetRange("metrics", 1, 10, 20)] # mimic rdd.offsetRanges()
transform_context = TransformContextUtils.get_context(
offset_info=myOffsetRanges)
offset_info=myOffsetRanges,
batch_time_info=self.get_dummy_batch_time())
rdd_monasca_with_offsets = rdd_monasca.map(
lambda x: RddTransformContext(x, transform_context))
@ -90,7 +90,7 @@ class SparkTest(SparkContextTest):
rdd_monasca_with_offsets)
# get the metrics that have been submitted to the dummy message adapter
metrics = MessageAdapter.adapter_impl.metric_list
metrics = DummyAdapter.adapter_impl.metric_list
# Verify cpu.total_logical_cores_agg for all hosts
total_cpu_logical_agg_metric = [
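
The same pattern repeats across these test modules: the production MessageAdapter is replaced by a DummyAdapter imported from tests.unit.messaging.adapter, and its metric_list is cleared before each test so assertions only see metrics from the current run. A rough sketch of such a capture-only adapter is below; everything beyond the adapter_impl attribute, init() and metric_list visible in the diff is an assumption about how the real test double is written.

class DummyAdapter(object):
    """Capture-only stand-in for the Kafka message adapter (sketch)."""

    adapter_impl = None

    @staticmethod
    def init():
        # one shared instance, mirroring the singleton-style access the
        # tests rely on (DummyAdapter.adapter_impl.metric_list)
        if not DummyAdapter.adapter_impl:
            DummyAdapter.adapter_impl = DummyAdapter()

    def __init__(self):
        self.metric_list = []

    def do_send_metric(self, metric):
        # remember the metric instead of publishing it to Kafka; the
        # method name here is assumed, not taken from this change
        self.metric_list.append(metric)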

View File

@ -11,7 +11,6 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from pyspark.sql import SQLContext
from monasca_transform.component.usage.fetch_quantity \
@ -40,9 +39,9 @@ class UsageComponentTest(SparkContextTest):
self.sql_context,
DataProvider.transform_spec_path)
transform_context = \
TransformContextUtils.get_context(
transform_spec_df_info=transform_spec_df)
transform_context = TransformContextUtils.get_context(
transform_spec_df_info=transform_spec_df,
batch_time_info=self.get_dummy_batch_time())
instance_usage_df = FetchQuantity.usage(
transform_context, record_store_df)
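
With the transform spec DataFrame and the dummy batch time both supplied to get_context(), FetchQuantity.usage() returns an instance usage DataFrame that the test can inspect. A hypothetical follow-up assertion (not part of this change) might simply collect it and check that some usage rows were produced:

# hypothetical continuation of the test body above
results = instance_usage_df.collect()
self.assertTrue(len(results) > 0,
                "expected at least one aggregated instance usage row")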

View File

@ -11,23 +11,21 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import mock
import unittest
from oslo_config import cfg
from pyspark.streaming.kafka import OffsetRange
import json
import mock
from monasca_transform.config.config_initializer import ConfigInitializer
from monasca_transform.driver.mon_metrics_kafka \
import MonMetricsKafkaProcessor
from monasca_transform.messaging.adapter import MessageAdapter
from monasca_transform.transform import RddTransformContext
from monasca_transform.transform import TransformContextUtils
from tests.unit.messaging.adapter import DummyAdapter
from tests.unit.spark_context_test import SparkContextTest
from tests.unit.test_resources.kafka_data.data_provider import DataProvider
from tests.unit.test_resources.mock_component_manager \
@ -46,9 +44,9 @@ class TestVmCpuAllocatedAgg(SparkContextTest):
'tests/unit/test_resources/config/'
'test_config_with_dummy_messaging_adapter.conf'])
# reset metric_id list dummy adapter
if not MessageAdapter.adapter_impl:
MessageAdapter.init()
MessageAdapter.adapter_impl.metric_list = []
if not DummyAdapter.adapter_impl:
DummyAdapter.init()
DummyAdapter.adapter_impl.metric_list = []
def get_pre_transform_specs_json_by_project(self):
"""get pre_transform_specs driver table info."""
@ -134,7 +132,9 @@ class TestVmCpuAllocatedAgg(SparkContextTest):
OffsetRange("metrics", 1, 10, 20)] # mimic rdd.offsetRanges()
transform_context = TransformContextUtils.get_context(
offset_info=myOffsetRanges)
offset_info=myOffsetRanges,
batch_time_info=self.get_dummy_batch_time())
rdd_monasca_with_offsets = rdd_monasca.map(
lambda x: RddTransformContext(x, transform_context))
@ -143,7 +143,7 @@ class TestVmCpuAllocatedAgg(SparkContextTest):
rdd_monasca_with_offsets)
# get the metrics that have been submitted to the dummy message adapter
metrics = MessageAdapter.adapter_impl.metric_list
metrics = DummyAdapter.adapter_impl.metric_list
vcpus_agg_metric = [
value for value in metrics
@ -307,7 +307,9 @@ class TestVmCpuAllocatedAgg(SparkContextTest):
OffsetRange("metrics", 1, 10, 20)] # mimic rdd.offsetRanges()
transform_context = TransformContextUtils.get_context(
offset_info=myOffsetRanges)
offset_info=myOffsetRanges,
batch_time_info=self.get_dummy_batch_time())
rdd_monasca_with_offsets = rdd_monasca.map(
lambda x: RddTransformContext(x, transform_context))
@ -316,7 +318,7 @@ class TestVmCpuAllocatedAgg(SparkContextTest):
rdd_monasca_with_offsets)
# get the metrics that have been submitted to the dummy message adapter
metrics = MessageAdapter.adapter_impl.metric_list
metrics = DummyAdapter.adapter_impl.metric_list
vcpus_agg_metric = [
value for value in metrics

View File

@ -28,7 +28,6 @@ Vagrant.configure(2) do |config|
# Create a private network, which allows host-only access to the machine
# using a specific IP.
config.vm.network "private_network", ip: "192.168.15.6"
# Create a public network, which generally matched to bridged network.
# Bridged networks make the machine appear as another physical device on
# your network.
@ -62,7 +61,7 @@ Vagrant.configure(2) do |config|
# # Customize the amount of memory on the VM:
vb.memory = "16384"
vb.cpus = "2"
vb.cpus = "4"
end
if ENV['http_proxy'] && !ENV['http_proxy'].empty?
# we need to configure a proxy for maven also
@ -70,6 +69,10 @@ Vagrant.configure(2) do |config|
destination: "settings.xml"
config.vm.provision "shell", path: "setup_maven_proxy.sh"
end
config.vm.network "forwarded_port", guest: 18080, host: 18080
config.vm.network "forwarded_port", guest: 18081, host: 18081
config.vm.network "forwarded_port", guest: 4040, host: 4040
# install dependencies for our process
config.vm.provision "shell", path: "install.sh"
# provision the environments

View File

@ -31,7 +31,7 @@ MONASCA_METRICS_DB=${MONASCA_METRICS_DB:-influxdb}
# This line will enable all of Monasca.
enable_plugin monasca-api /home/vagrant/monasca-api
# using the above, patched monasca-api we can disable some unnecessary services
disable_service monasca-persister
#disable_service monasca-persister
disable_service monasca-thresh
disable_service monasca-notification
disable_service horizon

View File

@ -31,6 +31,7 @@ commands =
tests/unit/usage/test_fetch_quantity_agg.py \
tests/unit/usage/test_fetch_quantity_util_agg.py \
tests/unit/usage/test_host_cpu_usage_component.py \
tests/unit/processor/test_pre_hourly_processor_agg.py \
tests/unit/usage/test_usage_component.py \
tests/unit/usage/test_vm_cpu_allocated_agg.py -e tests_to_fix
@ -48,6 +49,7 @@ max-complexity = 30
# H302 import only modules
# H904 Wrap long lines in parentheses instead of a backslash (DEPRECATED)
# H405 Multiline docstring separated by empty line
ignore = H302,H904,H405
# E402 module level import not at top of file FIXME remove this
ignore = H302,H904,H405,E402
show-source = True
exclude=.venv,.git,.tox,dist,*egg,build,tests_to_fix