add test cases for first and second stage of host and vm cpu usage processing
Add more test data. Change-Id: I824823cf7b6447ddd62f6d67d0de855d70ba4713
This commit is contained in:
parent
a9775506cb
commit
6b40f661ad
|
@ -0,0 +1,8 @@
|
|||
{'usage_hour': '16', 'geolocation': 'all', 'record_count': 7.0, 'resource_uuid': 'all', 'host': 'all', 'aggregation_period': 'prehourly', 'usage_minute': 'all', 'service_group': 'all', 'lastrecord_timestamp_string': '2016-01-20 16:40:00', 'aggregated_metric_name': 'vcpus_agg', 'user_id': 'all', 'zone': 'all', 'tenant_id': 'all', 'region': 'all', 'usage_date': '2016-01-20', 'firstrecord_timestamp_string': '2016-01-20 16:40:00', 'processing_meta': {'metric_id': 'vcpus_all'}, 'firstrecord_timestamp_unix': 1453308000.0, 'service_id': 'all', 'project_id': 'all', 'lastrecord_timestamp_unix': 1453308000.0, 'quantity': 0.0}
|
||||
{'usage_hour': '16', 'geolocation': 'all', 'record_count': 7.0, 'resource_uuid': 'all', 'host': 'all', 'aggregation_period': 'prehourly', 'usage_minute': 'all', 'service_group': 'all', 'lastrecord_timestamp_string': '2016-01-20 16:40:00', 'aggregated_metric_name': 'vcpus_agg', 'user_id': 'all', 'zone': 'all', 'tenant_id': '103e4d4d14bc4fdda4a9c73d1643e1d7', 'region': 'all', 'usage_date': '2016-01-20', 'firstrecord_timestamp_string': '2016-01-20 16:40:00', 'processing_meta': {'metric_id': 'vcpus_project'}, 'firstrecord_timestamp_unix': 1453308000.0, 'service_id': 'all', 'project_id': '103e4d4d14bc4fdda4a9c73d1643e1d7', 'lastrecord_timestamp_unix': 1453308000.0, 'quantity': 0.0}
|
||||
{'usage_hour': '16', 'geolocation': 'all', 'record_count': 13.0, 'resource_uuid': 'all', 'host': 'all', 'aggregation_period': 'prehourly', 'usage_minute': 'all', 'service_group': 'all', 'lastrecord_timestamp_string': '2016-03-07 16:10:38', 'aggregated_metric_name': 'cpu.total_logical_cores_agg', 'user_id': 'all', 'zone': 'all', 'tenant_id': 'all', 'region': 'all', 'usage_date': '2016-03-07', 'firstrecord_timestamp_string': '2016-03-07 16:09:23', 'processing_meta': {'metric_id': 'cpu_total_all'}, 'firstrecord_timestamp_unix': 1457366963.0, 'service_id': 'all', 'project_id': 'all', 'lastrecord_timestamp_unix': 1457367038.0, 'quantity': 15.0}
|
||||
{'usage_hour': '16', 'geolocation': 'all', 'record_count': 7.0, 'resource_uuid': 'all', 'host': 'devstack', 'aggregation_period': 'prehourly', 'usage_minute': 'all', 'service_group': 'all', 'lastrecord_timestamp_string': '2016-03-07 16:10:38', 'aggregated_metric_name': 'cpu.total_logical_cores_agg', 'user_id': 'all', 'zone': 'all', 'tenant_id': 'all', 'region': 'all', 'usage_date': '2016-03-07', 'firstrecord_timestamp_string': '2016-03-07 16:09:23', 'processing_meta': {'metric_id': 'cpu_total_host'}, 'firstrecord_timestamp_unix': 1457366963.0, 'service_id': 'all', 'project_id': 'all', 'lastrecord_timestamp_unix': 1457367038.0, 'quantity': 6.0}
|
||||
{'usage_hour': '16', 'geolocation': 'all', 'record_count': 6.0, 'resource_uuid': 'all', 'host': 'mini-mon', 'aggregation_period': 'prehourly', 'usage_minute': 'all', 'service_group': 'all', 'lastrecord_timestamp_string': '2016-03-07 16:10:38', 'aggregated_metric_name': 'cpu.total_logical_cores_agg', 'user_id': 'all', 'zone': 'all', 'tenant_id': 'all', 'region': 'all', 'usage_date': '2016-03-07', 'firstrecord_timestamp_string': '2016-03-07 16:09:23', 'processing_meta': {'metric_id': 'cpu_total_host'}, 'firstrecord_timestamp_unix': 1457366963.0, 'service_id': 'all', 'project_id': 'all', 'lastrecord_timestamp_unix': 1457367038.0, 'quantity': 9.0}
|
||||
{'usage_hour': '16', 'geolocation': 'all', 'record_count': 7.0, 'resource_uuid': 'all', 'host': 'devstack', 'aggregation_period': 'prehourly', 'usage_minute': 'all', 'service_group': 'all', 'lastrecord_timestamp_string': '2016-03-07 16:10:38', 'aggregated_metric_name': 'cpu.utilized_logical_cores_agg', 'user_id': 'all', 'zone': 'all', 'tenant_id': 'all', 'region': 'all', 'usage_date': '2016-03-07', 'firstrecord_timestamp_string': '2016-03-07 16:09:23', 'processing_meta': {'metric_id': 'cpu_util_host'}, 'firstrecord_timestamp_unix': 1457366963.0, 'service_id': 'all', 'project_id': 'all', 'lastrecord_timestamp_unix': 1457367038.0, 'quantity': 3.0}
|
||||
{'usage_hour': '16', 'geolocation': 'all', 'record_count': 6.0, 'resource_uuid': 'all', 'host': 'mini-mon', 'aggregation_period': 'prehourly', 'usage_minute': 'all', 'service_group': 'all', 'lastrecord_timestamp_string': '2016-03-07 16:10:38', 'aggregated_metric_name': 'cpu.utilized_logical_cores_agg', 'user_id': 'all', 'zone': 'all', 'tenant_id': 'all', 'region': 'all', 'usage_date': '2016-03-07', 'firstrecord_timestamp_string': '2016-03-07 16:09:23', 'processing_meta': {'metric_id': 'cpu_util_host'}, 'firstrecord_timestamp_unix': 1457366963.0, 'service_id': 'all', 'project_id': 'all', 'lastrecord_timestamp_unix': 1457367038.0, 'quantity': 5.0}
|
||||
{'usage_hour': '16', 'geolocation': 'all', 'record_count': 13.0, 'resource_uuid': 'all', 'host': 'all', 'aggregation_period': 'prehourly', 'usage_minute': 'all', 'service_group': 'all', 'lastrecord_timestamp_string': '2016-03-07 16:10:38', 'aggregated_metric_name': 'cpu.utilized_logical_cores_agg', 'user_id': 'all', 'zone': 'all', 'tenant_id': 'all', 'region': 'all', 'usage_date': '2016-03-07', 'firstrecord_timestamp_string': '2016-03-07 16:09:23', 'processing_meta': {'metric_id': 'cpu_util_all'}, 'firstrecord_timestamp_unix': 1457366963.0, 'service_id': 'all', 'project_id': 'all', 'lastrecord_timestamp_unix': 1457367038.0, 'quantity': 8.0}
|
|
@ -0,0 +1,23 @@
|
|||
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import os
|
||||
|
||||
|
||||
class DataProvider(object):
|
||||
|
||||
_resource_path = 'tests/unit/test_resources/cpu_kafka_data_second_stage/'
|
||||
|
||||
kafka_data_path = os.path.join(_resource_path,
|
||||
"cpu_kafka_data.txt")
|
|
@ -0,0 +1,25 @@
|
|||
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import os
|
||||
|
||||
|
||||
class DataProvider(object):
|
||||
|
||||
_resource_path = 'tests/unit/test_resources/kafka_data_second_stage/'
|
||||
|
||||
kafka_data_path_by_project = os.path.join(_resource_path,
|
||||
"kafka_data_by_project.txt")
|
||||
kafka_data_path_by_all = os.path.join(_resource_path,
|
||||
"kafka_data_by_all.txt")
|
|
@ -0,0 +1 @@
|
|||
{'usage_hour': '16', 'geolocation': 'all', 'record_count': 14.0, 'resource_uuid': 'all', 'host': 'all', 'aggregation_period': 'prehourly', 'usage_minute': 'all', 'service_group': 'all', 'lastrecord_timestamp_string': '2016-01-20 16:40:46', 'aggregated_metric_name': 'vcpus_agg', 'user_id': 'all', 'zone': 'all', 'tenant_id': 'all', 'region': 'all', 'usage_date': '2016-01-20', 'firstrecord_timestamp_string': '2016-01-20 16:40:00', 'processing_meta': {'metric_id': 'vcpus_all'}, 'firstrecord_timestamp_unix': 1453308000.0, 'service_id': 'all', 'project_id': 'all', 'lastrecord_timestamp_unix': 1453308046.0, 'quantity': 7.0}
|
|
@ -0,0 +1,2 @@
|
|||
{'usage_hour': '16', 'geolocation': 'all', 'record_count': 8.0, 'resource_uuid': 'all', 'host': 'all', 'aggregation_period': 'prehourly', 'usage_minute': 'all', 'service_group': 'all', 'lastrecord_timestamp_string': '2016-01-20 16:40:46', 'aggregated_metric_name': 'vcpus_agg', 'user_id': 'all', 'zone': 'all', 'tenant_id': '9647fd5030b04a799b0411cc38c4102d', 'region': 'all', 'usage_date': '2016-01-20', 'firstrecord_timestamp_string': '2016-01-20 16:40:05', 'processing_meta': {'metric_id': 'vcpus_project'}, 'firstrecord_timestamp_unix': 1453308005.0, 'service_id': 'all', 'project_id': '9647fd5030b04a799b0411cc38c4102d', 'lastrecord_timestamp_unix': 1453308046.0, 'quantity': 6.0}
|
||||
{'usage_hour': '16', 'geolocation': 'all', 'record_count': 6.0, 'resource_uuid': 'all', 'host': 'all', 'aggregation_period': 'prehourly', 'usage_minute': 'all', 'service_group': 'all', 'lastrecord_timestamp_string': '2016-01-20 16:40:42', 'aggregated_metric_name': 'vcpus_agg', 'user_id': 'all', 'zone': 'all', 'tenant_id': '8647fd5030b04a799b0411cc38c4102d', 'region': 'all', 'usage_date': '2016-01-20', 'firstrecord_timestamp_string': '2016-01-20 16:40:00', 'processing_meta': {'metric_id': 'vcpus_project'}, 'firstrecord_timestamp_unix': 1453308000.0, 'service_id': 'all', 'project_id': '8647fd5030b04a799b0411cc38c4102d', 'lastrecord_timestamp_unix': 1453308042.0, 'quantity': 1.0}
|
|
@ -0,0 +1,629 @@
|
|||
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import mock
|
||||
import unittest
|
||||
|
||||
from oslo_config import cfg
|
||||
from pyspark.streaming.kafka import OffsetRange
|
||||
|
||||
from monasca_transform.config.config_initializer import ConfigInitializer
|
||||
from monasca_transform.driver.mon_metrics_kafka \
|
||||
import MonMetricsKafkaProcessor
|
||||
from monasca_transform.processor.pre_hourly_processor import PreHourlyProcessor
|
||||
from monasca_transform.transform import RddTransformContext
|
||||
from monasca_transform.transform import TransformContextUtils
|
||||
|
||||
from pyspark.sql import SQLContext
|
||||
from tests.unit.component.insert.dummy_insert import DummyInsert
|
||||
from tests.unit.messaging.adapter import DummyAdapter
|
||||
from tests.unit.spark_context_test import SparkContextTest
|
||||
from tests.unit.test_resources.cpu_kafka_data.data_provider import DataProvider
|
||||
from tests.unit.test_resources.cpu_kafka_data_second_stage.data_provider \
|
||||
import DataProvider as SecondStageDataProvider
|
||||
from tests.unit.test_resources.mock_component_manager \
|
||||
import MockComponentManager
|
||||
from tests.unit.usage import dump_as_ascii_string
|
||||
|
||||
|
||||
class SparkTest(SparkContextTest):
|
||||
|
||||
def setUp(self):
|
||||
super(SparkTest, self).setUp()
|
||||
# configure the system with a dummy messaging adapter
|
||||
ConfigInitializer.basic_config(
|
||||
default_config_files=[
|
||||
'tests/unit/test_resources/config/'
|
||||
'test_config_with_dummy_messaging_adapter.conf'])
|
||||
# reset metric_id list dummy adapter
|
||||
if not DummyAdapter.adapter_impl:
|
||||
DummyAdapter.init()
|
||||
DummyAdapter.adapter_impl.metric_list = []
|
||||
|
||||
@mock.patch('monasca_transform.processor.pre_hourly_processor.KafkaInsert',
|
||||
DummyInsert)
|
||||
@mock.patch('monasca_transform.transform.builder.'
|
||||
'generic_transform_builder.GenericTransformBuilder.'
|
||||
'_get_insert_component_manager')
|
||||
@mock.patch('monasca_transform.transform.builder.'
|
||||
'generic_transform_builder.GenericTransformBuilder.'
|
||||
'_get_setter_component_manager')
|
||||
@mock.patch('monasca_transform.transform.builder.'
|
||||
'generic_transform_builder.GenericTransformBuilder.'
|
||||
'_get_usage_component_manager')
|
||||
def test_rdd_to_recordstore(self,
|
||||
usage_manager,
|
||||
setter_manager,
|
||||
insert_manager):
|
||||
|
||||
usage_manager.return_value = MockComponentManager.get_usage_cmpt_mgr()
|
||||
setter_manager.return_value = \
|
||||
MockComponentManager.get_setter_cmpt_mgr()
|
||||
insert_manager.return_value = \
|
||||
MockComponentManager.get_insert_pre_hourly_cmpt_mgr()
|
||||
|
||||
# Create an RDD out of the mocked Monasca metrics
|
||||
with open(DataProvider.kafka_data_path) as f:
|
||||
raw_lines = f.read().splitlines()
|
||||
raw_tuple_list = [eval(raw_line) for raw_line in raw_lines]
|
||||
|
||||
rdd_monasca = self.spark_context.parallelize(raw_tuple_list)
|
||||
|
||||
# decorate mocked RDD with dummy kafka offsets
|
||||
myOffsetRanges = [
|
||||
OffsetRange("metrics", 1, 10, 20)] # mimic rdd.offsetRanges()
|
||||
|
||||
transform_context = TransformContextUtils.get_context(
|
||||
offset_info=myOffsetRanges,
|
||||
batch_time_info=self.get_dummy_batch_time())
|
||||
|
||||
rdd_monasca_with_offsets = rdd_monasca.map(
|
||||
lambda x: RddTransformContext(x, transform_context))
|
||||
|
||||
# Call the primary method in mon_metrics_kafka
|
||||
MonMetricsKafkaProcessor.rdd_to_recordstore(
|
||||
rdd_monasca_with_offsets)
|
||||
|
||||
host_usage_list = DummyAdapter.adapter_impl.metric_list
|
||||
host_usage_list = map(dump_as_ascii_string,
|
||||
host_usage_list)
|
||||
DummyAdapter.adapter_impl.metric_list = []
|
||||
host_usage_rdd = self.spark_context.parallelize(host_usage_list)
|
||||
sql_context = SQLContext(self.spark_context)
|
||||
host_usage_df = sql_context.read.json(host_usage_rdd)
|
||||
PreHourlyProcessor.do_transform(host_usage_df)
|
||||
|
||||
# get the metrics that have been submitted to the dummy message adapter
|
||||
metrics = DummyAdapter.adapter_impl.metric_list
|
||||
|
||||
# Verify cpu.total_logical_cores_agg for all hosts
|
||||
total_cpu_logical_agg_metric = [
|
||||
value for value in metrics
|
||||
if value.get('metric').get('name') ==
|
||||
'cpu.total_logical_cores_agg' and
|
||||
value.get('metric').get('dimensions').get('host') ==
|
||||
'all'][0]
|
||||
|
||||
self.assertEqual(15.0,
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'metric').get('value'))
|
||||
self.assertEqual('useast',
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'meta').get('region'))
|
||||
self.assertEqual(cfg.CONF.messaging.publish_kafka_tenant_id,
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'meta').get('tenantId'))
|
||||
self.assertEqual('all',
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'metric').get('dimensions')
|
||||
.get('project_id'))
|
||||
self.assertEqual('hourly',
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'metric').get('dimensions')
|
||||
.get('aggregation_period'))
|
||||
|
||||
self.assertEqual(13.0,
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'metric').get('value_meta')
|
||||
.get('record_count'))
|
||||
self.assertEqual('2016-03-07 16:09:23',
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'metric').get('value_meta')
|
||||
.get('firstrecord_timestamp_string'))
|
||||
self.assertEqual('2016-03-07 16:10:38',
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'metric').get('value_meta')
|
||||
.get('lastrecord_timestamp_string'))
|
||||
|
||||
# Verify cpu.total_logical_cores_agg for mini-mon host
|
||||
total_cpu_logical_agg_metric = [
|
||||
value for value in metrics
|
||||
if value.get('metric').get('name') ==
|
||||
'cpu.total_logical_cores_agg' and
|
||||
value.get('metric').get('dimensions').get('host') ==
|
||||
'mini-mon'][0]
|
||||
|
||||
self.assertEqual(9.0,
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'metric').get('value'))
|
||||
self.assertEqual('useast',
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'meta').get('region'))
|
||||
self.assertEqual(cfg.CONF.messaging.publish_kafka_tenant_id,
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'meta').get('tenantId'))
|
||||
self.assertEqual('all',
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'metric').get('dimensions')
|
||||
.get('project_id'))
|
||||
self.assertEqual('hourly',
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'metric').get('dimensions')
|
||||
.get('aggregation_period'))
|
||||
|
||||
self.assertEqual(6.0,
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'metric').get('value_meta')
|
||||
.get('record_count'))
|
||||
self.assertEqual('2016-03-07 16:09:23',
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'metric').get('value_meta')
|
||||
.get('firstrecord_timestamp_string'))
|
||||
self.assertEqual('2016-03-07 16:10:38',
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'metric').get('value_meta')
|
||||
.get('lastrecord_timestamp_string'))
|
||||
|
||||
# Verify cpu.total_logical_cores_agg for devstack host
|
||||
total_cpu_logical_agg_metric = [
|
||||
value for value in metrics
|
||||
if value.get('metric').get('name') ==
|
||||
'cpu.total_logical_cores_agg' and
|
||||
value.get('metric').get('dimensions').get('host') ==
|
||||
'devstack'][0]
|
||||
|
||||
self.assertEqual(6.0,
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'metric').get('value'))
|
||||
self.assertEqual('useast',
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'meta').get('region'))
|
||||
self.assertEqual(cfg.CONF.messaging.publish_kafka_tenant_id,
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'meta').get('tenantId'))
|
||||
self.assertEqual('all',
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'metric').get('dimensions')
|
||||
.get('project_id'))
|
||||
self.assertEqual('hourly',
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'metric').get('dimensions')
|
||||
.get('aggregation_period'))
|
||||
|
||||
self.assertEqual(7.0,
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'metric').get('value_meta')
|
||||
.get('record_count'))
|
||||
self.assertEqual('2016-03-07 16:09:23',
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'metric').get('value_meta')
|
||||
.get('firstrecord_timestamp_string'))
|
||||
self.assertEqual('2016-03-07 16:10:38',
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'metric').get('value_meta')
|
||||
.get('lastrecord_timestamp_string'))
|
||||
|
||||
# Verify cpu.utilized_logical_cores_agg for all hosts
|
||||
utilized_cpu_logical_agg_metric = [
|
||||
value for value in metrics
|
||||
if value.get('metric').get('name') ==
|
||||
'cpu.utilized_logical_cores_agg' and
|
||||
value.get('metric').get('dimensions').get('host') ==
|
||||
'all'][0]
|
||||
|
||||
self.assertEqual(8.0,
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'metric').get('value'))
|
||||
self.assertEqual('useast',
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'meta').get('region'))
|
||||
|
||||
self.assertEqual(cfg.CONF.messaging.publish_kafka_tenant_id,
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'meta').get('tenantId'))
|
||||
self.assertEqual('all',
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'metric').get('dimensions')
|
||||
.get('project_id'))
|
||||
self.assertEqual('hourly',
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'metric').get('dimensions')
|
||||
.get('aggregation_period'))
|
||||
|
||||
self.assertEqual(13.0,
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'metric').get('value_meta')
|
||||
.get('record_count'))
|
||||
self.assertEqual('2016-03-07 16:09:23',
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'metric').get('value_meta')
|
||||
.get('firstrecord_timestamp_string'))
|
||||
self.assertEqual('2016-03-07 16:10:38',
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'metric').get('value_meta')
|
||||
.get('lastrecord_timestamp_string'))
|
||||
|
||||
# Verify cpu.utilized_logical_cores_agg for the mini-mon host
|
||||
utilized_cpu_logical_agg_metric = [
|
||||
value for value in metrics
|
||||
if value.get('metric').get('name') ==
|
||||
'cpu.utilized_logical_cores_agg' and
|
||||
value.get('metric').get('dimensions').get('host') ==
|
||||
'mini-mon'][0]
|
||||
|
||||
self.assertEqual(5.0,
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'metric').get('value'))
|
||||
self.assertEqual('useast',
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'meta').get('region'))
|
||||
|
||||
self.assertEqual(cfg.CONF.messaging.publish_kafka_tenant_id,
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'meta').get('tenantId'))
|
||||
self.assertEqual('all',
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'metric').get('dimensions')
|
||||
.get('project_id'))
|
||||
self.assertEqual('hourly',
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'metric').get('dimensions')
|
||||
.get('aggregation_period'))
|
||||
|
||||
self.assertEqual(6.0,
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'metric').get('value_meta')
|
||||
.get('record_count'))
|
||||
self.assertEqual('2016-03-07 16:09:23',
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'metric').get('value_meta')
|
||||
.get('firstrecord_timestamp_string'))
|
||||
self.assertEqual('2016-03-07 16:10:38',
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'metric').get('value_meta')
|
||||
.get('lastrecord_timestamp_string'))
|
||||
|
||||
# Verify cpu.utilized_logical_cores_agg for the devstack host
|
||||
utilized_cpu_logical_agg_metric = [
|
||||
value for value in metrics
|
||||
if value.get('metric').get('name') ==
|
||||
'cpu.utilized_logical_cores_agg' and
|
||||
value.get('metric').get('dimensions').get('host') ==
|
||||
'devstack'][0]
|
||||
|
||||
self.assertEqual(3.0,
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'metric').get('value'))
|
||||
self.assertEqual('useast',
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'meta').get('region'))
|
||||
|
||||
self.assertEqual(cfg.CONF.messaging.publish_kafka_tenant_id,
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'meta').get('tenantId'))
|
||||
self.assertEqual('all',
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'metric').get('dimensions')
|
||||
.get('project_id'))
|
||||
self.assertEqual('hourly',
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'metric').get('dimensions')
|
||||
.get('aggregation_period'))
|
||||
|
||||
self.assertEqual(7.0,
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'metric').get('value_meta')
|
||||
.get('record_count'))
|
||||
self.assertEqual('2016-03-07 16:09:23',
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'metric').get('value_meta')
|
||||
.get('firstrecord_timestamp_string'))
|
||||
self.assertEqual('2016-03-07 16:10:38',
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'metric').get('value_meta')
|
||||
.get('lastrecord_timestamp_string'))
|
||||
|
||||
@mock.patch('monasca_transform.processor.pre_hourly_processor.KafkaInsert',
|
||||
DummyInsert)
|
||||
@mock.patch('monasca_transform.transform.builder.'
|
||||
'generic_transform_builder.GenericTransformBuilder.'
|
||||
'_get_insert_component_manager')
|
||||
@mock.patch('monasca_transform.transform.builder.'
|
||||
'generic_transform_builder.GenericTransformBuilder.'
|
||||
'_get_setter_component_manager')
|
||||
@mock.patch('monasca_transform.transform.builder.'
|
||||
'generic_transform_builder.GenericTransformBuilder.'
|
||||
'_get_usage_component_manager')
|
||||
def test_rdd_to_recordstore_second_stage(self,
|
||||
usage_manager,
|
||||
setter_manager,
|
||||
insert_manager):
|
||||
|
||||
usage_manager.return_value = MockComponentManager.get_usage_cmpt_mgr()
|
||||
setter_manager.return_value = \
|
||||
MockComponentManager.get_setter_cmpt_mgr()
|
||||
insert_manager.return_value = \
|
||||
MockComponentManager.get_insert_pre_hourly_cmpt_mgr()
|
||||
|
||||
# Create an RDD out of the mocked Monasca metrics
|
||||
with open(SecondStageDataProvider.kafka_data_path) as f:
|
||||
raw_lines = f.read().splitlines()
|
||||
raw_tuple_list = [eval(raw_line) for raw_line in raw_lines]
|
||||
host_usage_list = map(dump_as_ascii_string,
|
||||
raw_tuple_list)
|
||||
sql_context = SQLContext(self.spark_context)
|
||||
host_usage_rdd = self.spark_context.parallelize(host_usage_list)
|
||||
host_usage_df = sql_context.read.json(host_usage_rdd)
|
||||
PreHourlyProcessor.do_transform(host_usage_df)
|
||||
|
||||
# get the metrics that have been submitted to the dummy message adapter
|
||||
metrics = DummyAdapter.adapter_impl.metric_list
|
||||
|
||||
# Verify cpu.total_logical_cores_agg for all hosts
|
||||
total_cpu_logical_agg_metric = [
|
||||
value for value in metrics
|
||||
if value.get('metric').get('name') ==
|
||||
'cpu.total_logical_cores_agg' and
|
||||
value.get('metric').get('dimensions').get('host') ==
|
||||
'all'][0]
|
||||
|
||||
self.assertEqual(15.0,
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'metric').get('value'))
|
||||
self.assertEqual('useast',
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'meta').get('region'))
|
||||
self.assertEqual(cfg.CONF.messaging.publish_kafka_tenant_id,
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'meta').get('tenantId'))
|
||||
self.assertEqual('all',
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'metric').get('dimensions')
|
||||
.get('project_id'))
|
||||
self.assertEqual('prehourly',
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'metric').get('dimensions')
|
||||
.get('aggregation_period'))
|
||||
|
||||
self.assertEqual(13.0,
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'metric').get('value_meta')
|
||||
.get('record_count'))
|
||||
self.assertEqual('2016-03-07 16:09:23',
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'metric').get('value_meta')
|
||||
.get('firstrecord_timestamp_string'))
|
||||
self.assertEqual('2016-03-07 16:10:38',
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'metric').get('value_meta')
|
||||
.get('lastrecord_timestamp_string'))
|
||||
|
||||
# Verify cpu.total_logical_cores_agg for mini-mon host
|
||||
total_cpu_logical_agg_metric = [
|
||||
value for value in metrics
|
||||
if value.get('metric').get('name') ==
|
||||
'cpu.total_logical_cores_agg' and
|
||||
value.get('metric').get('dimensions').get('host') ==
|
||||
'mini-mon'][0]
|
||||
|
||||
self.assertEqual(9.0,
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'metric').get('value'))
|
||||
self.assertEqual('useast',
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'meta').get('region'))
|
||||
self.assertEqual(cfg.CONF.messaging.publish_kafka_tenant_id,
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'meta').get('tenantId'))
|
||||
self.assertEqual('all',
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'metric').get('dimensions')
|
||||
.get('project_id'))
|
||||
self.assertEqual('prehourly',
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'metric').get('dimensions')
|
||||
.get('aggregation_period'))
|
||||
|
||||
self.assertEqual(6.0,
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'metric').get('value_meta')
|
||||
.get('record_count'))
|
||||
self.assertEqual('2016-03-07 16:09:23',
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'metric').get('value_meta')
|
||||
.get('firstrecord_timestamp_string'))
|
||||
self.assertEqual('2016-03-07 16:10:38',
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'metric').get('value_meta')
|
||||
.get('lastrecord_timestamp_string'))
|
||||
|
||||
# Verify cpu.total_logical_cores_agg for devstack host
|
||||
total_cpu_logical_agg_metric = [
|
||||
value for value in metrics
|
||||
if value.get('metric').get('name') ==
|
||||
'cpu.total_logical_cores_agg' and
|
||||
value.get('metric').get('dimensions').get('host') ==
|
||||
'devstack'][0]
|
||||
|
||||
self.assertEqual(6.0,
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'metric').get('value'))
|
||||
self.assertEqual('useast',
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'meta').get('region'))
|
||||
self.assertEqual(cfg.CONF.messaging.publish_kafka_tenant_id,
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'meta').get('tenantId'))
|
||||
self.assertEqual('all',
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'metric').get('dimensions')
|
||||
.get('project_id'))
|
||||
self.assertEqual('prehourly',
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'metric').get('dimensions')
|
||||
.get('aggregation_period'))
|
||||
|
||||
self.assertEqual(7.0,
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'metric').get('value_meta')
|
||||
.get('record_count'))
|
||||
self.assertEqual('2016-03-07 16:09:23',
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'metric').get('value_meta')
|
||||
.get('firstrecord_timestamp_string'))
|
||||
self.assertEqual('2016-03-07 16:10:38',
|
||||
total_cpu_logical_agg_metric.get(
|
||||
'metric').get('value_meta')
|
||||
.get('lastrecord_timestamp_string'))
|
||||
|
||||
# Verify cpu.utilized_logical_cores_agg for all hosts
|
||||
utilized_cpu_logical_agg_metric = [
|
||||
value for value in metrics
|
||||
if value.get('metric').get('name') ==
|
||||
'cpu.utilized_logical_cores_agg' and
|
||||
value.get('metric').get('dimensions').get('host') ==
|
||||
'all'][0]
|
||||
|
||||
self.assertEqual(8.0,
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'metric').get('value'))
|
||||
self.assertEqual('useast',
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'meta').get('region'))
|
||||
|
||||
self.assertEqual(cfg.CONF.messaging.publish_kafka_tenant_id,
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'meta').get('tenantId'))
|
||||
self.assertEqual('all',
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'metric').get('dimensions')
|
||||
.get('project_id'))
|
||||
self.assertEqual('prehourly',
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'metric').get('dimensions')
|
||||
.get('aggregation_period'))
|
||||
|
||||
self.assertEqual(13.0,
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'metric').get('value_meta')
|
||||
.get('record_count'))
|
||||
self.assertEqual('2016-03-07 16:09:23',
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'metric').get('value_meta')
|
||||
.get('firstrecord_timestamp_string'))
|
||||
self.assertEqual('2016-03-07 16:10:38',
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'metric').get('value_meta')
|
||||
.get('lastrecord_timestamp_string'))
|
||||
|
||||
# Verify cpu.utilized_logical_cores_agg for the mini-mon host
|
||||
utilized_cpu_logical_agg_metric = [
|
||||
value for value in metrics
|
||||
if value.get('metric').get('name') ==
|
||||
'cpu.utilized_logical_cores_agg' and
|
||||
value.get('metric').get('dimensions').get('host') ==
|
||||
'mini-mon'][0]
|
||||
|
||||
self.assertEqual(5.0,
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'metric').get('value'))
|
||||
self.assertEqual('useast',
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'meta').get('region'))
|
||||
|
||||
self.assertEqual(cfg.CONF.messaging.publish_kafka_tenant_id,
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'meta').get('tenantId'))
|
||||
self.assertEqual('all',
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'metric').get('dimensions')
|
||||
.get('project_id'))
|
||||
self.assertEqual('prehourly',
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'metric').get('dimensions')
|
||||
.get('aggregation_period'))
|
||||
|
||||
self.assertEqual(6.0,
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'metric').get('value_meta')
|
||||
.get('record_count'))
|
||||
self.assertEqual('2016-03-07 16:09:23',
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'metric').get('value_meta')
|
||||
.get('firstrecord_timestamp_string'))
|
||||
self.assertEqual('2016-03-07 16:10:38',
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'metric').get('value_meta')
|
||||
.get('lastrecord_timestamp_string'))
|
||||
|
||||
# Verify cpu.utilized_logical_cores_agg for the devstack host
|
||||
utilized_cpu_logical_agg_metric = [
|
||||
value for value in metrics
|
||||
if value.get('metric').get('name') ==
|
||||
'cpu.utilized_logical_cores_agg' and
|
||||
value.get('metric').get('dimensions').get('host') ==
|
||||
'devstack'][0]
|
||||
|
||||
self.assertEqual(3.0,
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'metric').get('value'))
|
||||
self.assertEqual('useast',
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'meta').get('region'))
|
||||
|
||||
self.assertEqual(cfg.CONF.messaging.publish_kafka_tenant_id,
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'meta').get('tenantId'))
|
||||
self.assertEqual('all',
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'metric').get('dimensions')
|
||||
.get('project_id'))
|
||||
self.assertEqual('prehourly',
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'metric').get('dimensions')
|
||||
.get('aggregation_period'))
|
||||
|
||||
self.assertEqual(7.0,
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'metric').get('value_meta')
|
||||
.get('record_count'))
|
||||
self.assertEqual('2016-03-07 16:09:23',
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'metric').get('value_meta')
|
||||
.get('firstrecord_timestamp_string'))
|
||||
self.assertEqual('2016-03-07 16:10:38',
|
||||
utilized_cpu_logical_agg_metric.get(
|
||||
'metric').get('value_meta')
|
||||
.get('lastrecord_timestamp_string'))
|
||||
|
||||
|
||||
def simple_count_transform(rdd):
|
||||
return rdd.count()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("PATH *************************************************************")
|
||||
import sys
|
||||
print(sys.path)
|
||||
print("PATH==============================================================")
|
||||
unittest.main()
|
|
@ -0,0 +1,602 @@
|
|||
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import json
|
||||
import mock
|
||||
import unittest
|
||||
|
||||
from oslo_config import cfg
|
||||
from pyspark.streaming.kafka import OffsetRange
|
||||
|
||||
from monasca_transform.config.config_initializer import ConfigInitializer
|
||||
from monasca_transform.driver.mon_metrics_kafka \
|
||||
import MonMetricsKafkaProcessor
|
||||
|
||||
from monasca_transform.processor.pre_hourly_processor import PreHourlyProcessor
|
||||
from monasca_transform.transform import RddTransformContext
|
||||
from monasca_transform.transform import TransformContextUtils
|
||||
|
||||
from pyspark.sql import SQLContext
|
||||
from tests.unit.component.insert.dummy_insert import DummyInsert
|
||||
from tests.unit.messaging.adapter import DummyAdapter
|
||||
from tests.unit.spark_context_test import SparkContextTest
|
||||
from tests.unit.test_resources.kafka_data.data_provider import DataProvider
|
||||
from tests.unit.test_resources.kafka_data_second_stage.data_provider \
|
||||
import DataProvider as SecondStageDataProvider
|
||||
from tests.unit.test_resources.mock_component_manager \
|
||||
import MockComponentManager
|
||||
from tests.unit.test_resources.mock_data_driven_specs_repo \
|
||||
import MockDataDrivenSpecsRepo
|
||||
from tests.unit.usage import dump_as_ascii_string
|
||||
|
||||
|
||||
class TestVmCpuAllocatedAgg(SparkContextTest):
|
||||
|
||||
def setUp(self):
|
||||
super(TestVmCpuAllocatedAgg, self).setUp()
|
||||
# configure the system with a dummy messaging adapter
|
||||
ConfigInitializer.basic_config(
|
||||
default_config_files=[
|
||||
'tests/unit/test_resources/config/'
|
||||
'test_config_with_dummy_messaging_adapter.conf'])
|
||||
# reset metric_id list dummy adapter
|
||||
if not DummyAdapter.adapter_impl:
|
||||
DummyAdapter.init()
|
||||
DummyAdapter.adapter_impl.metric_list = []
|
||||
|
||||
def get_pre_transform_specs_json_by_project(self):
|
||||
"""get pre_transform_specs driver table info."""
|
||||
pre_transform_specs_json = """
|
||||
{"event_processing_params":{"set_default_zone_to":"1",
|
||||
"set_default_geolocation_to":"1",
|
||||
"set_default_region_to":"W"},
|
||||
"event_type":"vcpus",
|
||||
"metric_id_list":["vcpus_project"],
|
||||
"required_raw_fields_list":["creation_time"],
|
||||
"service_id":"host_metrics"}"""
|
||||
return [json.loads(pre_transform_specs_json)]
|
||||
|
||||
def get_transform_specs_json_by_project(self):
|
||||
"""get transform_specs driver table info."""
|
||||
transform_specs_json = """
|
||||
{"aggregation_params_map":{
|
||||
"aggregation_pipeline":{"source":"streaming",
|
||||
"usage":"fetch_quantity",
|
||||
"setters":["rollup_quantity",
|
||||
"set_aggregated_metric_name",
|
||||
"set_aggregated_period"],
|
||||
"insert":["prepare_data",
|
||||
"insert_data_pre_hourly"]},
|
||||
"aggregated_metric_name": "vcpus_agg",
|
||||
"aggregation_period": "hourly",
|
||||
"aggregation_group_by_list": ["host", "metric_id", "tenant_id"],
|
||||
"usage_fetch_operation": "latest",
|
||||
"setter_rollup_group_by_list": ["tenant_id"],
|
||||
"setter_rollup_operation": "sum",
|
||||
"pre_hourly_operation":"sum",
|
||||
"pre_hourly_group_by_list":["default"],
|
||||
|
||||
"dimension_list":["aggregation_period",
|
||||
"host",
|
||||
"project_id"]
|
||||
},
|
||||
"metric_group":"vcpus_project",
|
||||
"metric_id":"vcpus_project"}"""
|
||||
return [json.loads(transform_specs_json)]
|
||||
|
||||
@mock.patch('monasca_transform.processor.pre_hourly_processor.KafkaInsert',
|
||||
DummyInsert)
|
||||
@mock.patch('monasca_transform.data_driven_specs.data_driven_specs_repo.'
|
||||
'DataDrivenSpecsRepoFactory.get_data_driven_specs_repo')
|
||||
@mock.patch('monasca_transform.transform.builder.'
|
||||
'generic_transform_builder.GenericTransformBuilder.'
|
||||
'_get_insert_component_manager')
|
||||
@mock.patch('monasca_transform.transform.builder.'
|
||||
'generic_transform_builder.GenericTransformBuilder.'
|
||||
'_get_setter_component_manager')
|
||||
@mock.patch('monasca_transform.transform.'
|
||||
'builder.generic_transform_builder.GenericTransformBuilder.'
|
||||
'_get_usage_component_manager')
|
||||
def test_vcpus_by_project(self,
|
||||
usage_manager,
|
||||
setter_manager,
|
||||
insert_manager,
|
||||
data_driven_specs_repo):
|
||||
|
||||
# load components
|
||||
usage_manager.return_value = MockComponentManager.get_usage_cmpt_mgr()
|
||||
setter_manager.return_value = \
|
||||
MockComponentManager.get_setter_cmpt_mgr()
|
||||
insert_manager.return_value = \
|
||||
MockComponentManager.get_insert_pre_hourly_cmpt_mgr()
|
||||
|
||||
# init mock driver tables
|
||||
data_driven_specs_repo.return_value = \
|
||||
MockDataDrivenSpecsRepo(self.spark_context,
|
||||
self.
|
||||
get_pre_transform_specs_json_by_project(),
|
||||
self.get_transform_specs_json_by_project())
|
||||
|
||||
# Create an RDD out of the mocked Monasca metrics
|
||||
with open(DataProvider.kafka_data_path) as f:
|
||||
raw_lines = f.read().splitlines()
|
||||
raw_tuple_list = [eval(raw_line) for raw_line in raw_lines]
|
||||
|
||||
rdd_monasca = self.spark_context.parallelize(raw_tuple_list)
|
||||
|
||||
# decorate mocked RDD with dummy kafka offsets
|
||||
myOffsetRanges = [
|
||||
OffsetRange("metrics", 1, 10, 20)] # mimic rdd.offsetRanges()
|
||||
|
||||
transform_context = TransformContextUtils.get_context(
|
||||
offset_info=myOffsetRanges,
|
||||
batch_time_info=self.get_dummy_batch_time())
|
||||
|
||||
rdd_monasca_with_offsets = rdd_monasca.map(
|
||||
lambda x: RddTransformContext(x, transform_context))
|
||||
|
||||
# Call the primary method in mon_metrics_kafka
|
||||
MonMetricsKafkaProcessor.rdd_to_recordstore(
|
||||
rdd_monasca_with_offsets)
|
||||
|
||||
# get the metrics that have been submitted to the dummy message adapter
|
||||
vm_cpu_list = DummyAdapter.adapter_impl.metric_list
|
||||
vm_cpu_list = map(dump_as_ascii_string, vm_cpu_list)
|
||||
DummyAdapter.adapter_impl.metric_list = []
|
||||
|
||||
vm_cpu_rdd = self.spark_context.parallelize(vm_cpu_list)
|
||||
sql_context = SQLContext(self.spark_context)
|
||||
vm_cpu_df = sql_context.read.json(vm_cpu_rdd)
|
||||
PreHourlyProcessor.do_transform(vm_cpu_df)
|
||||
|
||||
metrics = DummyAdapter.adapter_impl.metric_list
|
||||
|
||||
vcpus_agg_metric = [
|
||||
value for value in metrics
|
||||
if value.get('metric').get('name') ==
|
||||
'vcpus_agg' and
|
||||
value.get('metric').get('dimensions').get('project_id') ==
|
||||
'9647fd5030b04a799b0411cc38c4102d'][0]
|
||||
|
||||
self.assertTrue(vcpus_agg_metric is not None)
|
||||
|
||||
self.assertEqual(6.0,
|
||||
vcpus_agg_metric
|
||||
.get('metric').get('value'))
|
||||
self.assertEqual('useast',
|
||||
vcpus_agg_metric
|
||||
.get('meta').get('region'))
|
||||
|
||||
self.assertEqual(cfg.CONF.messaging.publish_kafka_tenant_id,
|
||||
vcpus_agg_metric
|
||||
.get('meta').get('tenantId'))
|
||||
self.assertEqual('all',
|
||||
vcpus_agg_metric
|
||||
.get('metric').get('dimensions').get('host'))
|
||||
self.assertEqual('hourly',
|
||||
vcpus_agg_metric
|
||||
.get('metric').get('dimensions')
|
||||
.get('aggregation_period'))
|
||||
|
||||
self.assertEqual(8.0,
|
||||
vcpus_agg_metric
|
||||
.get('metric').get('value_meta').get('record_count'))
|
||||
self.assertEqual('2016-01-20 16:40:05',
|
||||
vcpus_agg_metric
|
||||
.get('metric').get('value_meta')
|
||||
.get('firstrecord_timestamp_string'))
|
||||
self.assertEqual('2016-01-20 16:40:46',
|
||||
vcpus_agg_metric
|
||||
.get('metric').get('value_meta')
|
||||
.get('lastrecord_timestamp_string'))
|
||||
|
||||
vcpus_agg_metric = [
|
||||
value for value in metrics
|
||||
if value.get('metric').get('name') ==
|
||||
'vcpus_agg' and
|
||||
value.get('metric').get('dimensions').get('project_id') ==
|
||||
'8647fd5030b04a799b0411cc38c4102d'][0]
|
||||
|
||||
self.assertTrue(vcpus_agg_metric is not None)
|
||||
|
||||
self.assertEqual(1.0,
|
||||
vcpus_agg_metric
|
||||
.get('metric').get('value'))
|
||||
self.assertEqual('useast',
|
||||
vcpus_agg_metric
|
||||
.get('meta').get('region'))
|
||||
|
||||
self.assertEqual(cfg.CONF.messaging.publish_kafka_tenant_id,
|
||||
vcpus_agg_metric
|
||||
.get('meta').get('tenantId'))
|
||||
self.assertEqual('all',
|
||||
vcpus_agg_metric
|
||||
.get('metric').get('dimensions').get('host'))
|
||||
self.assertEqual('hourly',
|
||||
vcpus_agg_metric
|
||||
.get('metric').get('dimensions')
|
||||
.get('aggregation_period'))
|
||||
|
||||
self.assertEqual(6.0,
|
||||
vcpus_agg_metric
|
||||
.get('metric').get('value_meta').get('record_count'))
|
||||
self.assertEqual('2016-01-20 16:40:00',
|
||||
vcpus_agg_metric
|
||||
.get('metric').get('value_meta')
|
||||
.get('firstrecord_timestamp_string'))
|
||||
self.assertEqual('2016-01-20 16:40:42',
|
||||
vcpus_agg_metric
|
||||
.get('metric').get('value_meta')
|
||||
.get('lastrecord_timestamp_string'))
|
||||
|
||||
@mock.patch('monasca_transform.processor.pre_hourly_processor.KafkaInsert',
|
||||
DummyInsert)
|
||||
@mock.patch('monasca_transform.data_driven_specs.data_driven_specs_repo.'
|
||||
'DataDrivenSpecsRepoFactory.get_data_driven_specs_repo')
|
||||
@mock.patch('monasca_transform.transform.builder.'
|
||||
'generic_transform_builder.GenericTransformBuilder.'
|
||||
'_get_insert_component_manager')
|
||||
@mock.patch('monasca_transform.transform.builder.'
|
||||
'generic_transform_builder.GenericTransformBuilder.'
|
||||
'_get_setter_component_manager')
|
||||
@mock.patch('monasca_transform.transform.'
|
||||
'builder.generic_transform_builder.GenericTransformBuilder.'
|
||||
'_get_usage_component_manager')
|
||||
def test_vcpus_by_project_second_stage(self,
|
||||
usage_manager,
|
||||
setter_manager,
|
||||
insert_manager,
|
||||
data_driven_specs_repo):
|
||||
|
||||
# load components
|
||||
usage_manager.return_value = MockComponentManager.get_usage_cmpt_mgr()
|
||||
setter_manager.return_value = \
|
||||
MockComponentManager.get_setter_cmpt_mgr()
|
||||
insert_manager.return_value = \
|
||||
MockComponentManager.get_insert_pre_hourly_cmpt_mgr()
|
||||
|
||||
# init mock driver tables
|
||||
data_driven_specs_repo.return_value = \
|
||||
MockDataDrivenSpecsRepo(self.spark_context,
|
||||
self.
|
||||
get_pre_transform_specs_json_by_project(),
|
||||
self.get_transform_specs_json_by_project())
|
||||
|
||||
# Create an RDD out of the mocked Monasca metrics
|
||||
with open(SecondStageDataProvider.kafka_data_path_by_project) as f:
|
||||
raw_lines = f.read().splitlines()
|
||||
raw_tuple_list = [eval(raw_line) for raw_line in raw_lines]
|
||||
|
||||
vm_cpu_rdd = self.spark_context.parallelize(raw_tuple_list)
|
||||
sql_context = SQLContext(self.spark_context)
|
||||
vm_cpu_df = sql_context.read.json(vm_cpu_rdd)
|
||||
PreHourlyProcessor.do_transform(vm_cpu_df)
|
||||
|
||||
metrics = DummyAdapter.adapter_impl.metric_list
|
||||
|
||||
vcpus_agg_metric = [
|
||||
value for value in metrics
|
||||
if value.get('metric').get('name') ==
|
||||
'vcpus_agg' and
|
||||
value.get('metric').get('dimensions').get('project_id') ==
|
||||
'9647fd5030b04a799b0411cc38c4102d'][0]
|
||||
|
||||
self.assertTrue(vcpus_agg_metric is not None)
|
||||
|
||||
self.assertEqual(6.0,
|
||||
vcpus_agg_metric
|
||||
.get('metric').get('value'))
|
||||
self.assertEqual('useast',
|
||||
vcpus_agg_metric
|
||||
.get('meta').get('region'))
|
||||
|
||||
self.assertEqual(cfg.CONF.messaging.publish_kafka_tenant_id,
|
||||
vcpus_agg_metric
|
||||
.get('meta').get('tenantId'))
|
||||
self.assertEqual('all',
|
||||
vcpus_agg_metric
|
||||
.get('metric').get('dimensions').get('host'))
|
||||
self.assertEqual('prehourly',
|
||||
vcpus_agg_metric
|
||||
.get('metric').get('dimensions')
|
||||
.get('aggregation_period'))
|
||||
|
||||
self.assertEqual(8.0,
|
||||
vcpus_agg_metric
|
||||
.get('metric').get('value_meta').get('record_count'))
|
||||
self.assertEqual('2016-01-20 16:40:05',
|
||||
vcpus_agg_metric
|
||||
.get('metric').get('value_meta')
|
||||
.get('firstrecord_timestamp_string'))
|
||||
self.assertEqual('2016-01-20 16:40:46',
|
||||
vcpus_agg_metric
|
||||
.get('metric').get('value_meta')
|
||||
.get('lastrecord_timestamp_string'))
|
||||
|
||||
vcpus_agg_metric = [
|
||||
value for value in metrics
|
||||
if value.get('metric').get('name') ==
|
||||
'vcpus_agg' and
|
||||
value.get('metric').get('dimensions').get('project_id') ==
|
||||
'8647fd5030b04a799b0411cc38c4102d'][0]
|
||||
|
||||
self.assertTrue(vcpus_agg_metric is not None)
|
||||
|
||||
self.assertEqual(1.0,
|
||||
vcpus_agg_metric
|
||||
.get('metric').get('value'))
|
||||
self.assertEqual('useast',
|
||||
vcpus_agg_metric
|
||||
.get('meta').get('region'))
|
||||
|
||||
self.assertEqual(cfg.CONF.messaging.publish_kafka_tenant_id,
|
||||
vcpus_agg_metric
|
||||
.get('meta').get('tenantId'))
|
||||
self.assertEqual('all',
|
||||
vcpus_agg_metric
|
||||
.get('metric').get('dimensions').get('host'))
|
||||
self.assertEqual('prehourly',
|
||||
vcpus_agg_metric
|
||||
.get('metric').get('dimensions')
|
||||
.get('aggregation_period'))
|
||||
|
||||
self.assertEqual(6.0,
|
||||
vcpus_agg_metric
|
||||
.get('metric').get('value_meta').get('record_count'))
|
||||
self.assertEqual('2016-01-20 16:40:00',
|
||||
vcpus_agg_metric
|
||||
.get('metric').get('value_meta')
|
||||
.get('firstrecord_timestamp_string'))
|
||||
self.assertEqual('2016-01-20 16:40:42',
|
||||
vcpus_agg_metric
|
||||
.get('metric').get('value_meta')
|
||||
.get('lastrecord_timestamp_string'))
|
||||
|
||||
def get_pre_transform_specs_json_by_all(self):
|
||||
"""get pre_transform_specs driver table info."""
|
||||
pre_transform_specs_json = """
|
||||
{"event_processing_params":{"set_default_zone_to":"1",
|
||||
"set_default_geolocation_to":"1",
|
||||
"set_default_region_to":"W"},
|
||||
"event_type":"vcpus",
|
||||
"metric_id_list":["vcpus_all"],
|
||||
"required_raw_fields_list":["creation_time"],
|
||||
"service_id":"host_metrics"}"""
|
||||
return [json.loads(pre_transform_specs_json)]
|
||||
|
||||
def get_transform_specs_json_by_all(self):
|
||||
"""get transform_specs driver table info."""
|
||||
transform_specs_json = """
|
||||
{"aggregation_params_map":{
|
||||
"aggregation_pipeline":{"source":"streaming",
|
||||
"usage":"fetch_quantity",
|
||||
"setters":["rollup_quantity",
|
||||
"set_aggregated_metric_name",
|
||||
"set_aggregated_period"],
|
||||
"insert":["prepare_data",
|
||||
"insert_data_pre_hourly"]},
|
||||
"aggregated_metric_name": "vcpus_agg",
|
||||
"aggregation_period": "hourly",
|
||||
"aggregation_group_by_list": ["host", "metric_id"],
|
||||
"usage_fetch_operation": "latest",
|
||||
"setter_rollup_group_by_list": [],
|
||||
"setter_rollup_operation": "sum",
|
||||
"pre_hourly_group_by_list":["default"],
|
||||
"pre_hourly_operation":"sum",
|
||||
|
||||
"dimension_list":["aggregation_period",
|
||||
"host",
|
||||
"project_id"]
|
||||
},
|
||||
"metric_group":"vcpus_all",
|
||||
"metric_id":"vcpus_all"}"""
|
||||
return [json.loads(transform_specs_json)]
|
||||
|
||||
@mock.patch('monasca_transform.processor.pre_hourly_processor.KafkaInsert',
|
||||
DummyInsert)
|
||||
@mock.patch('monasca_transform.data_driven_specs.data_driven_specs_repo.'
|
||||
'DataDrivenSpecsRepoFactory.get_data_driven_specs_repo')
|
||||
@mock.patch('monasca_transform.transform.builder.'
|
||||
'generic_transform_builder.GenericTransformBuilder.'
|
||||
'_get_insert_component_manager')
|
||||
@mock.patch('monasca_transform.transform.builder.'
|
||||
'generic_transform_builder.GenericTransformBuilder.'
|
||||
'_get_setter_component_manager')
|
||||
@mock.patch('monasca_transform.transform.builder.'
|
||||
'generic_transform_builder.GenericTransformBuilder.'
|
||||
'_get_usage_component_manager')
|
||||
def test_vcpus_by_all(self,
|
||||
usage_manager,
|
||||
setter_manager,
|
||||
insert_manager,
|
||||
data_driven_specs_repo):
|
||||
|
||||
# load components
|
||||
usage_manager.return_value = MockComponentManager.get_usage_cmpt_mgr()
|
||||
setter_manager.return_value = \
|
||||
MockComponentManager.get_setter_cmpt_mgr()
|
||||
insert_manager.return_value = \
|
||||
MockComponentManager.get_insert_pre_hourly_cmpt_mgr()
|
||||
|
||||
# init mock driver tables
|
||||
data_driven_specs_repo.return_value = \
|
||||
MockDataDrivenSpecsRepo(
|
||||
self.spark_context,
|
||||
self.get_pre_transform_specs_json_by_all(),
|
||||
self.get_transform_specs_json_by_all())
|
||||
|
||||
# Create an RDD out of the mocked Monasca metrics
|
||||
with open(DataProvider.kafka_data_path) as f:
|
||||
raw_lines = f.read().splitlines()
|
||||
raw_tuple_list = [eval(raw_line) for raw_line in raw_lines]
|
||||
|
||||
rdd_monasca = self.spark_context.parallelize(raw_tuple_list)
|
||||
|
||||
# decorate mocked RDD with dummy kafka offsets
|
||||
myOffsetRanges = [
|
||||
OffsetRange("metrics", 1, 10, 20)] # mimic rdd.offsetRanges()
|
||||
|
||||
transform_context = TransformContextUtils.get_context(
|
||||
offset_info=myOffsetRanges,
|
||||
batch_time_info=self.get_dummy_batch_time())
|
||||
|
||||
rdd_monasca_with_offsets = rdd_monasca.map(
|
||||
lambda x: RddTransformContext(x, transform_context))
|
||||
|
||||
# Call the primary method in mon_metrics_kafka
|
||||
MonMetricsKafkaProcessor.rdd_to_recordstore(
|
||||
rdd_monasca_with_offsets)
|
||||
|
||||
# get the metrics that have been submitted to the dummy message adapter
|
||||
metrics = DummyAdapter.adapter_impl.metric_list
|
||||
vm_cpu_list = map(dump_as_ascii_string, metrics)
|
||||
DummyAdapter.adapter_impl.metric_list = []
|
||||
|
||||
vm_cpu_rdd = self.spark_context.parallelize(vm_cpu_list)
|
||||
sql_context = SQLContext(self.spark_context)
|
||||
vm_cpu_df = sql_context.read.json(vm_cpu_rdd)
|
||||
PreHourlyProcessor.do_transform(vm_cpu_df)
|
||||
|
||||
metrics = DummyAdapter.adapter_impl.metric_list
|
||||
vcpus_agg_metric = [
|
||||
value for value in metrics
|
||||
if value.get('metric').get('name') ==
|
||||
'vcpus_agg' and
|
||||
value.get('metric').get('dimensions').get('project_id') ==
|
||||
'all'][0]
|
||||
|
||||
self.assertTrue(vcpus_agg_metric is not None)
|
||||
|
||||
self.assertEqual(7.0,
|
||||
vcpus_agg_metric
|
||||
.get('metric').get('value'))
|
||||
self.assertEqual('useast',
|
||||
vcpus_agg_metric
|
||||
.get('meta').get('region'))
|
||||
|
||||
self.assertEqual(cfg.CONF.messaging.publish_kafka_tenant_id,
|
||||
vcpus_agg_metric
|
||||
.get('meta').get('tenantId'))
|
||||
self.assertEqual('all',
|
||||
vcpus_agg_metric
|
||||
.get('metric').get('dimensions').get('host'))
|
||||
self.assertEqual('hourly',
|
||||
vcpus_agg_metric
|
||||
.get('metric').get('dimensions')
|
||||
.get('aggregation_period'))
|
||||
|
||||
self.assertEqual(14.0,
|
||||
vcpus_agg_metric
|
||||
.get('metric').get('value_meta').get('record_count'))
|
||||
self.assertEqual('2016-01-20 16:40:00',
|
||||
vcpus_agg_metric
|
||||
.get('metric').get('value_meta')
|
||||
.get('firstrecord_timestamp_string'))
|
||||
self.assertEqual('2016-01-20 16:40:46',
|
||||
vcpus_agg_metric
|
||||
.get('metric').get('value_meta')
|
||||
.get('lastrecord_timestamp_string'))
|
||||
|
||||
@mock.patch('monasca_transform.processor.pre_hourly_processor.KafkaInsert',
|
||||
DummyInsert)
|
||||
@mock.patch('monasca_transform.data_driven_specs.data_driven_specs_repo.'
|
||||
'DataDrivenSpecsRepoFactory.get_data_driven_specs_repo')
|
||||
@mock.patch('monasca_transform.transform.builder.'
|
||||
'generic_transform_builder.GenericTransformBuilder.'
|
||||
'_get_insert_component_manager')
|
||||
@mock.patch('monasca_transform.transform.builder.'
|
||||
'generic_transform_builder.GenericTransformBuilder.'
|
||||
'_get_setter_component_manager')
|
||||
@mock.patch('monasca_transform.transform.builder.'
|
||||
'generic_transform_builder.GenericTransformBuilder.'
|
||||
'_get_usage_component_manager')
|
||||
def test_vcpus_by_all_second_stage(self,
|
||||
usage_manager,
|
||||
setter_manager,
|
||||
insert_manager,
|
||||
data_driven_specs_repo):
|
||||
|
||||
# load components
|
||||
usage_manager.return_value = MockComponentManager.get_usage_cmpt_mgr()
|
||||
setter_manager.return_value = \
|
||||
MockComponentManager.get_setter_cmpt_mgr()
|
||||
insert_manager.return_value = \
|
||||
MockComponentManager.get_insert_pre_hourly_cmpt_mgr()
|
||||
|
||||
# init mock driver tables
|
||||
data_driven_specs_repo.return_value = \
|
||||
MockDataDrivenSpecsRepo(
|
||||
self.spark_context,
|
||||
self.get_pre_transform_specs_json_by_all(),
|
||||
self.get_transform_specs_json_by_all())
|
||||
|
||||
# Create an RDD out of the mocked Monasca metrics
|
||||
with open(SecondStageDataProvider.kafka_data_path_by_all) as f:
|
||||
raw_lines = f.read().splitlines()
|
||||
raw_tuple_list = [eval(raw_line) for raw_line in raw_lines]
|
||||
vm_cpu_rdd = self.spark_context.parallelize(raw_tuple_list)
|
||||
sql_context = SQLContext(self.spark_context)
|
||||
vm_cpu_df = sql_context.read.json(vm_cpu_rdd)
|
||||
PreHourlyProcessor.do_transform(vm_cpu_df)
|
||||
|
||||
metrics = DummyAdapter.adapter_impl.metric_list
|
||||
vcpus_agg_metric = [
|
||||
value for value in metrics
|
||||
if value.get('metric').get('name') ==
|
||||
'vcpus_agg' and
|
||||
value.get('metric').get('dimensions').get('project_id') ==
|
||||
'all'][0]
|
||||
|
||||
self.assertTrue(vcpus_agg_metric is not None)
|
||||
|
||||
self.assertEqual(7.0,
|
||||
vcpus_agg_metric
|
||||
.get('metric').get('value'))
|
||||
self.assertEqual('useast',
|
||||
vcpus_agg_metric
|
||||
.get('meta').get('region'))
|
||||
|
||||
self.assertEqual(cfg.CONF.messaging.publish_kafka_tenant_id,
|
||||
vcpus_agg_metric
|
||||
.get('meta').get('tenantId'))
|
||||
self.assertEqual('all',
|
||||
vcpus_agg_metric
|
||||
.get('metric').get('dimensions').get('host'))
|
||||
self.assertEqual('prehourly',
|
||||
vcpus_agg_metric
|
||||
.get('metric').get('dimensions')
|
||||
.get('aggregation_period'))
|
||||
|
||||
self.assertEqual(14.0,
|
||||
vcpus_agg_metric
|
||||
.get('metric').get('value_meta').get('record_count'))
|
||||
self.assertEqual('2016-01-20 16:40:00',
|
||||
vcpus_agg_metric
|
||||
.get('metric').get('value_meta')
|
||||
.get('firstrecord_timestamp_string'))
|
||||
self.assertEqual('2016-01-20 16:40:46',
|
||||
vcpus_agg_metric
|
||||
.get('metric').get('value_meta')
|
||||
.get('lastrecord_timestamp_string'))
|
||||
|
||||
|
||||
def simple_count_transform(rdd):
|
||||
return rdd.count()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("PATH *************************************************************")
|
||||
import sys
|
||||
print(sys.path)
|
||||
print("PATH==============================================================")
|
||||
unittest.main()
|
Loading…
Reference in New Issue