diff --git a/tests/unit/component/__init__.py b/tests/unit/component/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit/component/insert/__init__.py b/tests/unit/component/insert/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/monasca_transform/component/insert/dummy_insert.py b/tests/unit/component/insert/dummy_insert.py similarity index 100% rename from monasca_transform/component/insert/dummy_insert.py rename to tests/unit/component/insert/dummy_insert.py diff --git a/tests/unit/component/insert/dummy_insert_pre_hourly.py b/tests/unit/component/insert/dummy_insert_pre_hourly.py new file mode 100644 index 0000000..b3fba96 --- /dev/null +++ b/tests/unit/component/insert/dummy_insert_pre_hourly.py @@ -0,0 +1,69 @@ +# Copyright 2016 Hewlett Packard Enterprise Development Company LP +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from monasca_transform.component.insert import InsertComponent +from oslo_config import cfg +from tests.unit.messaging.adapter import DummyAdapter + + +class DummyInsertPreHourly(InsertComponent): + """Insert component that writes metric data to + to kafka queue + """ + + @staticmethod + def insert(transform_context, instance_usage_df): + """write instance usage data to kafka""" + + transform_spec_df = transform_context.transform_spec_df_info + + agg_params = transform_spec_df.select("metric_id" + ).collect()[0].asDict() + metric_id = agg_params['metric_id'] + + cfg.CONF.set_override('adapter', + 'tests.unit.messaging.adapter:DummyAdapter', + group='messaging') + # Approach 1 + # using foreachPartition to iterate through elements in an + # RDD is the recommended approach so as to not overwhelm kafka with the + # zillion connections (but in our case the MessageAdapter does + # store the adapter_impl so we should not create many producers) + + # using foreachpartitions was causing some serialization (cpickle) + # problems where few libs like kafka.SimpleProducer and oslo_config.cfg + # were not available + # + # removing _write_metrics_from_partition for now in favor of + # Approach 2 + # + + # instance_usage_df_agg_params = instance_usage_df.rdd.map( + # lambda x: InstanceUsageDataAggParams(x, + # agg_params)) + # instance_usage_df_agg_params.foreachPartition( + # DummyInsert._write_metrics_from_partition) + + # + # Approach # 2 + # + # using collect() to fetch all elements of an RDD + # and write to kafka + # + + for instance_usage_row in instance_usage_df.collect(): + instance_usage_dict = InsertComponent\ + ._get_instance_usage_pre_hourly(instance_usage_row, metric_id) + DummyAdapter.send_metric(instance_usage_dict) + return instance_usage_df diff --git a/tests/unit/processor/test_pre_hourly_processor_agg.py b/tests/unit/processor/test_pre_hourly_processor_agg.py index 25d7126..b380e5b 100644 --- a/tests/unit/processor/test_pre_hourly_processor_agg.py +++ b/tests/unit/processor/test_pre_hourly_processor_agg.py @@ -16,10 +16,10 @@ import unittest from pyspark.streaming.kafka import OffsetRange -from monasca_transform.component.insert.dummy_insert import DummyInsert from monasca_transform.config.config_initializer import ConfigInitializer from monasca_transform.processor.pre_hourly_processor import PreHourlyProcessor +from tests.unit.component.insert.dummy_insert import DummyInsert from tests.unit.messaging.adapter import DummyAdapter from tests.unit.spark_context_test import SparkContextTest from tests.unit.test_resources.metrics_pre_hourly_data.data_provider \ diff --git a/tests/unit/test_resources/fetch_quantity_data_second_stage/__init__.py b/tests/unit/test_resources/fetch_quantity_data_second_stage/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit/test_resources/fetch_quantity_data_second_stage/data_provider.py b/tests/unit/test_resources/fetch_quantity_data_second_stage/data_provider.py new file mode 100644 index 0000000..c8b2832 --- /dev/null +++ b/tests/unit/test_resources/fetch_quantity_data_second_stage/data_provider.py @@ -0,0 +1,24 @@ +# Copyright 2016 Hewlett Packard Enterprise Development Company LP +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os + + +class DataProvider(object): + + _resource_path = 'tests/unit/test_resources/'\ + 'fetch_quantity_data_second_stage/' + + fetch_quantity_data_path = os.path.join( + _resource_path, "fetch_quantity_data_second_stage.txt") diff --git a/tests/unit/test_resources/fetch_quantity_data_second_stage/fetch_quantity_data_second_stage.txt b/tests/unit/test_resources/fetch_quantity_data_second_stage/fetch_quantity_data_second_stage.txt new file mode 100644 index 0000000..bedeca1 --- /dev/null +++ b/tests/unit/test_resources/fetch_quantity_data_second_stage/fetch_quantity_data_second_stage.txt @@ -0,0 +1,6 @@ +{'usage_hour': '16', 'geolocation': 'all', 'record_count': 4.0, 'resource_uuid': 'all', 'host': 'mini-mon', 'aggregation_period': 'prehourly', 'usage_minute': 'all', 'service_group': 'all', 'lastrecord_timestamp_string': '2016-01-20 16:50:00', 'aggregated_metric_name': 'mem.total_mb_agg', 'user_id': 'all', 'zone': 'all', 'tenant_id': 'all', 'region': 'all', 'usage_date': '2016-01-20', 'firstrecord_timestamp_string': '2016-01-20 16:40:00', 'processing_meta': {'metric_id': 'mem_total_all'}, 'firstrecord_timestamp_unix': 1453308000.0, 'service_id': 'all', 'project_id': 'all', 'lastrecord_timestamp_unix': 1453308600.0, 'quantity': 1234.0} +{'usage_hour': '16', 'geolocation': 'all', 'record_count': 7.0, 'resource_uuid': 'all', 'host': 'mini-mon', 'aggregation_period': 'prehourly', 'usage_minute': 'all', 'service_group': 'all', 'lastrecord_timestamp_string': '2016-01-20 17:00:00', 'aggregated_metric_name': 'mem.total_mb_agg', 'user_id': 'all', 'zone': 'all', 'tenant_id': 'all', 'region': 'all', 'usage_date': '2016-01-20', 'firstrecord_timestamp_string': '2016-01-20 16:50:00', 'processing_meta': {'metric_id': 'mem_total_all'}, 'firstrecord_timestamp_unix': 1453308600.0, 'service_id': 'all', 'project_id': 'all', 'lastrecord_timestamp_unix': 1453309200.0, 'quantity': 1213.0} +{'usage_hour': '16', 'geolocation': 'all', 'record_count': 8.0, 'resource_uuid': 'all', 'host': 'mini-mon', 'aggregation_period': 'prehourly', 'usage_minute': 'all', 'service_group': 'all', 'lastrecord_timestamp_string': '2016-01-20 17:10:00', 'aggregated_metric_name': 'mem.total_mb_agg', 'user_id': 'all', 'zone': 'all', 'tenant_id': 'all', 'region': 'all', 'usage_date': '2016-01-20', 'firstrecord_timestamp_string': '2016-01-20 17:00:00', 'processing_meta': {'metric_id': 'mem_total_all'}, 'firstrecord_timestamp_unix': 1453309200.0, 'service_id': 'all', 'project_id': 'all', 'lastrecord_timestamp_unix': 1453309800.0, 'quantity': 1314.0} +{'usage_hour': '16', 'geolocation': 'all', 'record_count': 5.0, 'resource_uuid': 'all', 'host': 'mini-mon', 'aggregation_period': 'prehourly', 'usage_minute': 'all', 'service_group': 'all', 'lastrecord_timestamp_string': '2016-01-20 17:20:00', 'aggregated_metric_name': 'mem.total_mb_agg', 'user_id': 'all', 'zone': 'all', 'tenant_id': 'all', 'region': 'all', 'usage_date': '2016-01-20', 'firstrecord_timestamp_string': '2016-01-20 17:10:00', 'processing_meta': {'metric_id': 'mem_total_all'}, 'firstrecord_timestamp_unix': 1453309800.0, 'service_id': 'all', 'project_id': 'all', 'lastrecord_timestamp_unix': 1453310400.0, 'quantity': 2318.0} +{'usage_hour': '16', 'geolocation': 'all', 'record_count': 9.0, 'resource_uuid': 'all', 'host': 'mini-mon', 'aggregation_period': 'prehourly', 'usage_minute': 'all', 'service_group': 'all', 'lastrecord_timestamp_string': '2016-01-20 17:30:00', 'aggregated_metric_name': 'mem.total_mb_agg', 'user_id': 'all', 'zone': 'all', 'tenant_id': 'all', 'region': 'all', 'usage_date': '2016-01-20', 'firstrecord_timestamp_string': '2016-01-20 17:20:00', 'processing_meta': {'metric_id': 'mem_total_all'}, 'firstrecord_timestamp_unix': 1453310400.0, 'service_id': 'all', 'project_id': 'all', 'lastrecord_timestamp_unix': 1453311000.0, 'quantity': 1218.0} +{'usage_hour': '16', 'geolocation': 'all', 'record_count': 6.0, 'resource_uuid': 'all', 'host': 'mini-mon', 'aggregation_period': 'prehourly', 'usage_minute': 'all', 'service_group': 'all', 'lastrecord_timestamp_string': '2016-01-20 17:40:00', 'aggregated_metric_name': 'mem.total_mb_agg', 'user_id': 'all', 'zone': 'all', 'tenant_id': 'all', 'region': 'all', 'usage_date': '2016-01-20', 'firstrecord_timestamp_string': '2016-01-20 17:30:00', 'processing_meta': {'metric_id': 'mem_total_all'}, 'firstrecord_timestamp_unix': 1453311000.0, 'service_id': 'all', 'project_id': 'all', 'lastrecord_timestamp_unix': 1453311600.0, 'quantity': 1382.0} diff --git a/tests/unit/test_resources/mock_component_manager.py b/tests/unit/test_resources/mock_component_manager.py index a9ef787..43040ad 100644 --- a/tests/unit/test_resources/mock_component_manager.py +++ b/tests/unit/test_resources/mock_component_manager.py @@ -16,9 +16,7 @@ from stevedore.extension import Extension from stevedore.extension import ExtensionManager -from monasca_transform.component.insert.dummy_insert import DummyInsert from monasca_transform.component.insert.prepare_data import PrepareData - from monasca_transform.component.setter.rollup_quantity \ import RollupQuantity from monasca_transform.component.setter.set_aggregated_metric_name \ @@ -31,6 +29,9 @@ from monasca_transform.component.usage.fetch_quantity \ import FetchQuantity from monasca_transform.component.usage.fetch_quantity_util \ import FetchQuantityUtil +from tests.unit.component.insert.dummy_insert import DummyInsert +from tests.unit.component.insert.dummy_insert_pre_hourly \ + import DummyInsertPreHourly class MockComponentManager(object): @@ -88,13 +89,33 @@ class MockComponentManager(object): PrepareData(), None), Extension('insert_data', - 'monasca_transform.component.insert.dummy_insert:' + 'tests.unit.component.insert.dummy_insert:' 'DummyInsert', DummyInsert(), None), Extension('insert_data_pre_hourly', - 'monasca_transform.component.insert.dummy_insert:' + 'tests.unit.component.insert.dummy_insert:' 'DummyInsert', DummyInsert(), None), ]) + + @staticmethod + def get_insert_pre_hourly_cmpt_mgr(): + return ExtensionManager.make_test_instance([Extension( + 'prepare_data', + 'monasca_transform.component.insert.prepare_data:PrepareData', + PrepareData(), + None), + Extension('insert_data', + 'tests.unit.component.insert.dummy_insert:' + 'DummyInsert', + DummyInsert(), + None), + Extension('insert_data_pre_hourly', + 'tests.unit.component.insert.' + 'dummy_insert_pre_hourly:' + 'DummyInsertPreHourly', + DummyInsertPreHourly(), + None), + ]) diff --git a/tests/unit/usage/__init__.py b/tests/unit/usage/__init__.py index e69de29..021ea05 100644 --- a/tests/unit/usage/__init__.py +++ b/tests/unit/usage/__init__.py @@ -0,0 +1,5 @@ +import json + + +def dump_as_ascii_string(dict_obj): + return json.dumps(dict_obj, ensure_ascii=True) diff --git a/tests/unit/usage/test_fetch_quantity_instance_usage_agg.py b/tests/unit/usage/test_fetch_quantity_instance_usage_agg.py new file mode 100644 index 0000000..954b7ab --- /dev/null +++ b/tests/unit/usage/test_fetch_quantity_instance_usage_agg.py @@ -0,0 +1,985 @@ +# Copyright 2016 Hewlett Packard Enterprise Development Company LP +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import json +import mock +import unittest + +from oslo_config import cfg +from pyspark.sql import SQLContext +from pyspark.streaming.kafka import OffsetRange + +from monasca_transform.config.config_initializer import ConfigInitializer +from monasca_transform.driver.mon_metrics_kafka \ + import MonMetricsKafkaProcessor +from monasca_transform.processor.pre_hourly_processor import PreHourlyProcessor +from monasca_transform.transform import RddTransformContext +from monasca_transform.transform import TransformContextUtils + +from tests.unit.component.insert.dummy_insert import DummyInsert +from tests.unit.messaging.adapter import DummyAdapter +from tests.unit.spark_context_test import SparkContextTest +from tests.unit.test_resources.fetch_quantity_data.data_provider \ + import DataProvider +from tests.unit.test_resources.fetch_quantity_data_second_stage.data_provider \ + import DataProvider as SecondStageDataProvider +from tests.unit.test_resources.mock_component_manager \ + import MockComponentManager +from tests.unit.test_resources.mock_data_driven_specs_repo \ + import MockDataDrivenSpecsRepo +from tests.unit.usage import dump_as_ascii_string + + +class TestFetchQuantityInstanceUsageAgg(SparkContextTest): + + def setUp(self): + super(TestFetchQuantityInstanceUsageAgg, self).setUp() + # configure the system with a dummy messaging adapter + ConfigInitializer.basic_config( + default_config_files=[ + 'tests/unit/test_resources/config/' + 'test_config_with_dummy_messaging_adapter.conf']) + # reset metric_id list dummy adapter + if not DummyAdapter.adapter_impl: + DummyAdapter.init() + DummyAdapter.adapter_impl.metric_list = [] + + def get_pre_transform_specs_json(self): + """get pre_transform_specs driver table info.""" + pre_transform_specs_json = """ + {"event_processing_params":{"set_default_zone_to":"1", + "set_default_geolocation_to":"1", + "set_default_region_to":"W"}, + "event_type":"mem.total_mb", + "metric_id_list":["mem_total_all"], + "required_raw_fields_list":["creation_time"], + "service_id":"host_metrics"}""" + return [json.loads(pre_transform_specs_json)] + + def get_transform_specs_json_by_operation(self, + usage_fetch_operation): + """get transform_specs driver table info.""" + transform_specs_json = """ + {"aggregation_params_map":{ + "aggregation_pipeline":{"source":"streaming", + "usage":"fetch_quantity", + "setters":["rollup_quantity", + "set_aggregated_metric_name", + "set_aggregated_period"], + "insert":["prepare_data", + "insert_data_pre_hourly"]}, + "aggregated_metric_name": "mem.total_mb_agg", + "aggregation_period": "hourly", + "aggregation_group_by_list": ["host", "metric_id"], + "usage_fetch_operation": "%s", + "setter_rollup_group_by_list": ["host"], + "setter_rollup_operation": "sum", + "pre_hourly_operation":"%s", + "pre_hourly_group_by_list":["default"], + "dimension_list":["aggregation_period", + "host", + "project_id"] + }, + "metric_group":"mem_total_all", + "metric_id":"mem_total_all"}""" + transform_specs_json_operation = \ + transform_specs_json % (usage_fetch_operation, + usage_fetch_operation) + return [json.loads(transform_specs_json_operation)] + + @mock.patch('monasca_transform.processor.pre_hourly_processor.KafkaInsert', + DummyInsert) + @mock.patch('monasca_transform.data_driven_specs.data_driven_specs_repo.' + 'DataDrivenSpecsRepoFactory.get_data_driven_specs_repo') + @mock.patch('monasca_transform.transform.builder.' + 'generic_transform_builder.GenericTransformBuilder.' + '_get_insert_component_manager') + @mock.patch('monasca_transform.transform.builder.' + 'generic_transform_builder.GenericTransformBuilder.' + '_get_setter_component_manager') + @mock.patch('monasca_transform.transform.' + 'builder.generic_transform_builder.GenericTransformBuilder.' + '_get_usage_component_manager') + def test_fetch_quantity_max(self, + usage_manager, + setter_manager, + insert_manager, + data_driven_specs_repo): + + # test operation + test_operation = "max" + + # load components + usage_manager.return_value = MockComponentManager.get_usage_cmpt_mgr() + setter_manager.return_value = \ + MockComponentManager.get_setter_cmpt_mgr() + insert_manager.return_value = \ + MockComponentManager.get_insert_pre_hourly_cmpt_mgr() + + # init mock driver tables + data_driven_specs_repo.return_value = \ + MockDataDrivenSpecsRepo(self.spark_context, + self.get_pre_transform_specs_json(), + self.get_transform_specs_json_by_operation( + test_operation)) + + # Create an emulated set of Kafka messages (these were gathered + # by extracting Monasca messages from the Metrics queue on mini-mon). + + # Create an RDD out of the mocked Monasca metrics + with open(DataProvider.fetch_quantity_data_path) as f: + raw_lines = f.read().splitlines() + raw_tuple_list = [eval(raw_line) for raw_line in raw_lines] + + rdd_monasca = self.spark_context.parallelize(raw_tuple_list) + + # decorate mocked RDD with dummy kafka offsets + myOffsetRanges = [ + OffsetRange("metrics", 1, 10, 20)] # mimic rdd.offsetRanges() + + transform_context = TransformContextUtils.get_context( + offset_info=myOffsetRanges, + batch_time_info=self.get_dummy_batch_time()) + rdd_monasca_with_offsets = rdd_monasca.map( + lambda x: RddTransformContext(x, transform_context)) + + # Call the primary method in mon_metrics_kafka + MonMetricsKafkaProcessor.rdd_to_recordstore( + rdd_monasca_with_offsets) + + # get the metrics that have been submitted to the dummy message adapter + instance_usage_list = DummyAdapter.adapter_impl.metric_list + instance_usage_list = map(dump_as_ascii_string, + instance_usage_list) + DummyAdapter.adapter_impl.metric_list = [] + instance_usage_rdd = self.spark_context.parallelize( + instance_usage_list) + sql_context = SQLContext(self.spark_context) + instance_usage_df = sql_context.read.json(instance_usage_rdd) + PreHourlyProcessor.do_transform(instance_usage_df) + + metrics = DummyAdapter.adapter_impl.metric_list + mem_total_mb_agg_metric = [ + value for value in metrics + if value.get('metric').get('name') == + 'mem.total_mb_agg' and + value.get('metric').get('dimensions').get('host') == + 'mini-mon'][0] + + self.assertTrue(mem_total_mb_agg_metric is not None) + + self.assertEqual('mem.total_mb_agg', + mem_total_mb_agg_metric + .get('metric').get('name')) + + self.assertEqual(8192.0, + mem_total_mb_agg_metric + .get('metric').get('value')) + self.assertEqual('useast', + mem_total_mb_agg_metric + .get('meta').get('region')) + + self.assertEqual(cfg.CONF.messaging.publish_kafka_tenant_id, + mem_total_mb_agg_metric + .get('meta').get('tenantId')) + self.assertEqual('mini-mon', + mem_total_mb_agg_metric + .get('metric').get('dimensions').get('host')) + self.assertEqual('all', + mem_total_mb_agg_metric + .get('metric').get('dimensions').get('project_id')) + self.assertEqual('hourly', + mem_total_mb_agg_metric + .get('metric').get('dimensions') + .get('aggregation_period')) + self.assertEqual(4.0, + mem_total_mb_agg_metric + .get('metric').get('value_meta').get('record_count')) + self.assertEqual('2016-01-20 16:40:00', + mem_total_mb_agg_metric + .get('metric').get('value_meta') + .get('firstrecord_timestamp_string')) + self.assertEqual('2016-01-20 16:40:46', + mem_total_mb_agg_metric + .get('metric').get('value_meta') + .get('lastrecord_timestamp_string')) + + @mock.patch('monasca_transform.processor.pre_hourly_processor.KafkaInsert', + DummyInsert) + @mock.patch('monasca_transform.data_driven_specs.data_driven_specs_repo.' + 'DataDrivenSpecsRepoFactory.get_data_driven_specs_repo') + @mock.patch('monasca_transform.transform.builder.' + 'generic_transform_builder.GenericTransformBuilder.' + '_get_insert_component_manager') + @mock.patch('monasca_transform.transform.builder.' + 'generic_transform_builder.GenericTransformBuilder.' + '_get_setter_component_manager') + @mock.patch('monasca_transform.transform.builder.' + 'generic_transform_builder.GenericTransformBuilder.' + '_get_usage_component_manager') + def test_fetch_quantity_min(self, + usage_manager, + setter_manager, + insert_manager, + data_driven_specs_repo): + + # test operation + test_operation = "min" + + # load components + usage_manager.return_value = MockComponentManager.get_usage_cmpt_mgr() + setter_manager.return_value = \ + MockComponentManager.get_setter_cmpt_mgr() + insert_manager.return_value = \ + MockComponentManager.get_insert_pre_hourly_cmpt_mgr() + + # init mock driver tables + data_driven_specs_repo.return_value = \ + MockDataDrivenSpecsRepo(self.spark_context, + self.get_pre_transform_specs_json(), + self.get_transform_specs_json_by_operation( + test_operation)) + + # Create an emulated set of Kafka messages (these were gathered + # by extracting Monasca messages from the Metrics queue on mini-mon). + + # Create an RDD out of the mocked Monasca metrics + with open(DataProvider.fetch_quantity_data_path) as f: + raw_lines = f.read().splitlines() + raw_tuple_list = [eval(raw_line) for raw_line in raw_lines] + + rdd_monasca = self.spark_context.parallelize(raw_tuple_list) + + # decorate mocked RDD with dummy kafka offsets + myOffsetRanges = [ + OffsetRange("metrics", 1, 10, 20)] # mimic rdd.offsetRanges() + + transform_context = TransformContextUtils.get_context( + offset_info=myOffsetRanges, + batch_time_info=self.get_dummy_batch_time()) + + rdd_monasca_with_offsets = rdd_monasca.map( + lambda x: RddTransformContext(x, transform_context)) + + # Call the primary method in mon_metrics_kafka + MonMetricsKafkaProcessor.rdd_to_recordstore( + rdd_monasca_with_offsets) + + # get the metrics that have been submitted to the dummy message adapter + instance_usage_list = DummyAdapter.adapter_impl.metric_list + instance_usage_list = map(dump_as_ascii_string, + instance_usage_list) + DummyAdapter.adapter_impl.metric_list = [] + instance_usage_rdd = self.spark_context.parallelize( + instance_usage_list) + sql_context = SQLContext(self.spark_context) + instance_usage_df = sql_context.read.json(instance_usage_rdd) + PreHourlyProcessor.do_transform(instance_usage_df) + + metrics = DummyAdapter.adapter_impl.metric_list + mem_total_mb_agg_metric = [ + value for value in metrics + if value.get('metric').get('name') == + 'mem.total_mb_agg' and + value.get('metric').get('dimensions').get('host') == + 'mini-mon'][0] + + self.assertTrue(mem_total_mb_agg_metric is not None) + + self.assertEqual('mem.total_mb_agg', + mem_total_mb_agg_metric + .get('metric').get('name')) + + self.assertEqual(1024.0, + mem_total_mb_agg_metric + .get('metric').get('value')) + self.assertEqual('useast', + mem_total_mb_agg_metric + .get('meta').get('region')) + + self.assertEqual(cfg.CONF.messaging.publish_kafka_tenant_id, + mem_total_mb_agg_metric + .get('meta').get('tenantId')) + self.assertEqual('mini-mon', + mem_total_mb_agg_metric + .get('metric').get('dimensions').get('host')) + self.assertEqual('all', + mem_total_mb_agg_metric + .get('metric').get('dimensions').get('project_id')) + self.assertEqual('hourly', + mem_total_mb_agg_metric + .get('metric').get('dimensions') + .get('aggregation_period')) + self.assertEqual(4.0, + mem_total_mb_agg_metric + .get('metric').get('value_meta').get('record_count')) + self.assertEqual('2016-01-20 16:40:00', + mem_total_mb_agg_metric + .get('metric').get('value_meta') + .get('firstrecord_timestamp_string')) + self.assertEqual('2016-01-20 16:40:46', + mem_total_mb_agg_metric + .get('metric').get('value_meta') + .get('lastrecord_timestamp_string')) + + @mock.patch('monasca_transform.processor.pre_hourly_processor.KafkaInsert', + DummyInsert) + @mock.patch('monasca_transform.data_driven_specs.data_driven_specs_repo.' + 'DataDrivenSpecsRepoFactory.get_data_driven_specs_repo') + @mock.patch('monasca_transform.transform.builder.' + 'generic_transform_builder.GenericTransformBuilder.' + '_get_insert_component_manager') + @mock.patch('monasca_transform.transform.builder.' + 'generic_transform_builder.GenericTransformBuilder.' + '_get_setter_component_manager') + @mock.patch('monasca_transform.transform.builder.' + 'generic_transform_builder.GenericTransformBuilder.' + '_get_usage_component_manager') + def test_fetch_quantity_avg(self, + usage_manager, + setter_manager, + insert_manager, + data_driven_specs_repo): + + # test operation + test_operation = "avg" + + # load components + usage_manager.return_value = MockComponentManager.get_usage_cmpt_mgr() + setter_manager.return_value = \ + MockComponentManager.get_setter_cmpt_mgr() + insert_manager.return_value = \ + MockComponentManager.get_insert_pre_hourly_cmpt_mgr() + + # init mock driver tables + data_driven_specs_repo.return_value = \ + MockDataDrivenSpecsRepo(self.spark_context, + self.get_pre_transform_specs_json(), + self.get_transform_specs_json_by_operation( + test_operation)) + + # Create an emulated set of Kafka messages (these were gathered + # by extracting Monasca messages from the Metrics queue on mini-mon). + + # Create an RDD out of the mocked Monasca metrics + with open(DataProvider.fetch_quantity_data_path) as f: + raw_lines = f.read().splitlines() + raw_tuple_list = [eval(raw_line) for raw_line in raw_lines] + + rdd_monasca = self.spark_context.parallelize(raw_tuple_list) + + # decorate mocked RDD with dummy kafka offsets + myOffsetRanges = [ + OffsetRange("metrics", 1, 10, 20)] # mimic rdd.offsetRanges() + + transform_context = TransformContextUtils.get_context( + offset_info=myOffsetRanges, + batch_time_info=self.get_dummy_batch_time()) + + rdd_monasca_with_offsets = rdd_monasca.map( + lambda x: RddTransformContext(x, transform_context)) + + # Call the primary method in mon_metrics_kafka + MonMetricsKafkaProcessor.rdd_to_recordstore( + rdd_monasca_with_offsets) + + # get the metrics that have been submitted to the dummy message adapter + instance_usage_list = DummyAdapter.adapter_impl.metric_list + instance_usage_list = map(dump_as_ascii_string, + instance_usage_list) + DummyAdapter.adapter_impl.metric_list = [] + instance_usage_rdd = self.spark_context.parallelize( + instance_usage_list) + sql_context = SQLContext(self.spark_context) + instance_usage_df = sql_context.read.json(instance_usage_rdd) + PreHourlyProcessor.do_transform(instance_usage_df) + + metrics = DummyAdapter.adapter_impl.metric_list + mem_total_mb_agg_metric = [ + value for value in metrics + if value.get('metric').get('name') == + 'mem.total_mb_agg' and + value.get('metric').get('dimensions').get('host') == + 'mini-mon'][0] + + self.assertTrue(mem_total_mb_agg_metric is not None) + + self.assertEqual('mem.total_mb_agg', + mem_total_mb_agg_metric + .get('metric').get('name')) + + self.assertEqual(3840.0, + mem_total_mb_agg_metric + .get('metric').get('value')) + self.assertEqual('useast', + mem_total_mb_agg_metric + .get('meta').get('region')) + + self.assertEqual(cfg.CONF.messaging.publish_kafka_tenant_id, + mem_total_mb_agg_metric + .get('meta').get('tenantId')) + self.assertEqual('mini-mon', + mem_total_mb_agg_metric + .get('metric').get('dimensions').get('host')) + self.assertEqual('all', + mem_total_mb_agg_metric + .get('metric').get('dimensions').get('project_id')) + self.assertEqual('hourly', + mem_total_mb_agg_metric + .get('metric').get('dimensions') + .get('aggregation_period')) + self.assertEqual(4.0, + mem_total_mb_agg_metric + .get('metric').get('value_meta').get('record_count')) + self.assertEqual('2016-01-20 16:40:00', + mem_total_mb_agg_metric + .get('metric').get('value_meta') + .get('firstrecord_timestamp_string')) + self.assertEqual('2016-01-20 16:40:46', + mem_total_mb_agg_metric + .get('metric').get('value_meta') + .get('lastrecord_timestamp_string')) + + @mock.patch('monasca_transform.processor.pre_hourly_processor.KafkaInsert', + DummyInsert) + @mock.patch('monasca_transform.data_driven_specs.data_driven_specs_repo.' + 'DataDrivenSpecsRepoFactory.get_data_driven_specs_repo') + @mock.patch('monasca_transform.transform.builder.' + 'generic_transform_builder.GenericTransformBuilder.' + '_get_insert_component_manager') + @mock.patch('monasca_transform.transform.builder.' + 'generic_transform_builder.GenericTransformBuilder.' + '_get_setter_component_manager') + @mock.patch('monasca_transform.transform.builder.' + 'generic_transform_builder.GenericTransformBuilder.' + '_get_usage_component_manager') + def test_fetch_quantity_sum(self, + usage_manager, + setter_manager, + insert_manager, + data_driven_specs_repo): + + # test operation + test_operation = "sum" + + # load components + usage_manager.return_value = MockComponentManager.get_usage_cmpt_mgr() + setter_manager.return_value = \ + MockComponentManager.get_setter_cmpt_mgr() + insert_manager.return_value = \ + MockComponentManager.get_insert_pre_hourly_cmpt_mgr() + + # init mock driver tables + data_driven_specs_repo.return_value = \ + MockDataDrivenSpecsRepo(self.spark_context, + self.get_pre_transform_specs_json(), + self.get_transform_specs_json_by_operation( + test_operation)) + + # Create an emulated set of Kafka messages (these were gathered + # by extracting Monasca messages from the Metrics queue on mini-mon). + + # Create an RDD out of the mocked Monasca metrics + with open(DataProvider.fetch_quantity_data_path) as f: + raw_lines = f.read().splitlines() + raw_tuple_list = [eval(raw_line) for raw_line in raw_lines] + + rdd_monasca = self.spark_context.parallelize(raw_tuple_list) + + # decorate mocked RDD with dummy kafka offsets + myOffsetRanges = [ + OffsetRange("metrics", 1, 10, 20)] # mimic rdd.offsetRanges() + + transform_context = TransformContextUtils.get_context( + offset_info=myOffsetRanges, + batch_time_info=self.get_dummy_batch_time()) + + rdd_monasca_with_offsets = rdd_monasca.map( + lambda x: RddTransformContext(x, transform_context)) + + # Call the primary method in mon_metrics_kafka + MonMetricsKafkaProcessor.rdd_to_recordstore( + rdd_monasca_with_offsets) + + # get the metrics that have been submitted to the dummy message adapter + instance_usage_list = DummyAdapter.adapter_impl.metric_list + instance_usage_list = map(dump_as_ascii_string, + instance_usage_list) + DummyAdapter.adapter_impl.metric_list = [] + instance_usage_rdd = self.spark_context.parallelize( + instance_usage_list) + sql_context = SQLContext(self.spark_context) + instance_usage_df = sql_context.read.json(instance_usage_rdd) + PreHourlyProcessor.do_transform(instance_usage_df) + + metrics = DummyAdapter.adapter_impl.metric_list + mem_total_mb_agg_metric = [ + value for value in metrics + if value.get('metric').get('name') == + 'mem.total_mb_agg' and + value.get('metric').get('dimensions').get('host') == + 'mini-mon'][0] + + self.assertTrue(mem_total_mb_agg_metric is not None) + + self.assertEqual('mem.total_mb_agg', + mem_total_mb_agg_metric + .get('metric').get('name')) + + self.assertEqual(15360.0, + mem_total_mb_agg_metric + .get('metric').get('value')) + self.assertEqual('useast', + mem_total_mb_agg_metric + .get('meta').get('region')) + + self.assertEqual(cfg.CONF.messaging.publish_kafka_tenant_id, + mem_total_mb_agg_metric + .get('meta').get('tenantId')) + self.assertEqual('mini-mon', + mem_total_mb_agg_metric + .get('metric').get('dimensions').get('host')) + self.assertEqual('all', + mem_total_mb_agg_metric + .get('metric').get('dimensions').get('project_id')) + self.assertEqual('hourly', + mem_total_mb_agg_metric + .get('metric').get('dimensions') + .get('aggregation_period')) + self.assertEqual(4.0, + mem_total_mb_agg_metric + .get('metric').get('value_meta').get('record_count')) + self.assertEqual('2016-01-20 16:40:00', + mem_total_mb_agg_metric + .get('metric').get('value_meta') + .get('firstrecord_timestamp_string')) + self.assertEqual('2016-01-20 16:40:46', + mem_total_mb_agg_metric + .get('metric').get('value_meta') + .get('lastrecord_timestamp_string')) + + @mock.patch('monasca_transform.processor.pre_hourly_processor.KafkaInsert', + DummyInsert) + @mock.patch('monasca_transform.data_driven_specs.data_driven_specs_repo.' + 'DataDrivenSpecsRepoFactory.get_data_driven_specs_repo') + @mock.patch('monasca_transform.transform.builder.' + 'generic_transform_builder.GenericTransformBuilder.' + '_get_insert_component_manager') + @mock.patch('monasca_transform.transform.builder.' + 'generic_transform_builder.GenericTransformBuilder.' + '_get_setter_component_manager') + @mock.patch('monasca_transform.transform.builder.' + 'generic_transform_builder.GenericTransformBuilder.' + '_get_usage_component_manager') + def test_fetch_quantity_max_second_stage(self, + usage_manager, + setter_manager, + insert_manager, + data_driven_specs_repo): + + # test operation + test_operation = "max" + + # load components + usage_manager.return_value = MockComponentManager.get_usage_cmpt_mgr() + setter_manager.return_value = \ + MockComponentManager.get_setter_cmpt_mgr() + insert_manager.return_value = \ + MockComponentManager.get_insert_pre_hourly_cmpt_mgr() + + # init mock driver tables + data_driven_specs_repo.return_value = \ + MockDataDrivenSpecsRepo(self.spark_context, + self.get_pre_transform_specs_json(), + self.get_transform_specs_json_by_operation( + test_operation)) + + # Create an emulated set of Kafka messages (these were gathered + # by extracting Monasca messages from the Metrics queue on mini-mon). + + # Create an RDD out of the mocked Monasca metrics + with open(SecondStageDataProvider.fetch_quantity_data_path) as f: + raw_lines = f.read().splitlines() + raw_tuple_list = [eval(raw_line) for raw_line in raw_lines] + instance_usage_list = map(dump_as_ascii_string, + raw_tuple_list) + + # get the instance usage that have been + # submitted to the dummy message adapter + # create a json RDD from instance_usage_list + instance_usage_rdd = self.spark_context.parallelize( + instance_usage_list) + + sql_context = SQLContext(self.spark_context) + instance_usage_df = sql_context.read.json( + instance_usage_rdd) + + # call pre hourly processor + PreHourlyProcessor.do_transform(instance_usage_df) + metrics = DummyAdapter.adapter_impl.metric_list + mem_total_mb_agg_metric = [ + value for value in metrics + if value.get('metric').get('name') == + 'mem.total_mb_agg' and + value.get('metric').get('dimensions').get('host') == + 'mini-mon'][0] + self.assertTrue(mem_total_mb_agg_metric is not None) + self.assertEqual('mem.total_mb_agg', + mem_total_mb_agg_metric + .get('metric').get('name')) + + self.assertEqual(2318.0, + mem_total_mb_agg_metric + .get('metric').get('value')) + self.assertEqual('useast', + mem_total_mb_agg_metric + .get('meta').get('region')) + + self.assertEqual(cfg.CONF.messaging.publish_kafka_tenant_id, + mem_total_mb_agg_metric + .get('meta').get('tenantId')) + self.assertEqual('mini-mon', + mem_total_mb_agg_metric + .get('metric').get('dimensions').get('host')) + self.assertEqual('all', + mem_total_mb_agg_metric + .get('metric').get('dimensions').get('project_id')) + self.assertEqual('prehourly', + mem_total_mb_agg_metric + .get('metric').get('dimensions') + .get('aggregation_period')) + + self.assertEqual(39.0, + mem_total_mb_agg_metric + .get('metric').get('value_meta').get('record_count')) + self.assertEqual('2016-01-20 16:40:00', + mem_total_mb_agg_metric + .get('metric').get('value_meta') + .get('firstrecord_timestamp_string')) + self.assertEqual('2016-01-20 17:40:00', + mem_total_mb_agg_metric + .get('metric').get('value_meta') + .get('lastrecord_timestamp_string')) + + @mock.patch('monasca_transform.processor.pre_hourly_processor.KafkaInsert', + DummyInsert) + @mock.patch('monasca_transform.data_driven_specs.data_driven_specs_repo.' + 'DataDrivenSpecsRepoFactory.get_data_driven_specs_repo') + @mock.patch('monasca_transform.transform.builder.' + 'generic_transform_builder.GenericTransformBuilder.' + '_get_insert_component_manager') + @mock.patch('monasca_transform.transform.builder.' + 'generic_transform_builder.GenericTransformBuilder.' + '_get_setter_component_manager') + @mock.patch('monasca_transform.transform.builder.' + 'generic_transform_builder.GenericTransformBuilder.' + '_get_usage_component_manager') + def test_fetch_quantity_min_second_stage(self, + usage_manager, + setter_manager, + insert_manager, + data_driven_specs_repo): + + # test operation + test_operation = "min" + # load components + usage_manager.return_value = MockComponentManager.get_usage_cmpt_mgr() + setter_manager.return_value = \ + MockComponentManager.get_setter_cmpt_mgr() + insert_manager.return_value = \ + MockComponentManager.get_insert_pre_hourly_cmpt_mgr() + + # init mock driver tables + data_driven_specs_repo.return_value = \ + MockDataDrivenSpecsRepo(self.spark_context, + self.get_pre_transform_specs_json(), + self.get_transform_specs_json_by_operation( + test_operation)) + + # Create an emulated set of Kafka messages (these were gathered + # by extracting Monasca messages from the Metrics queue on mini-mon). + + # Create an RDD out of the mocked Monasca metrics + with open(SecondStageDataProvider.fetch_quantity_data_path) as f: + raw_lines = f.read().splitlines() + raw_tuple_list = [eval(raw_line) for raw_line in raw_lines] + instance_usage_list = map(dump_as_ascii_string, + raw_tuple_list) + + # create a json RDD from instance_usage_list + instance_usage_rdd = self.spark_context.parallelize( + instance_usage_list) + + sql_context = SQLContext(self.spark_context) + instance_usage_df = sql_context.read.json( + instance_usage_rdd) + + PreHourlyProcessor.do_transform(instance_usage_df) + + metrics = DummyAdapter.adapter_impl.metric_list + mem_total_mb_agg_metric = [ + value for value in metrics + if value.get('metric').get('name') == + 'mem.total_mb_agg' and + value.get('metric').get('dimensions').get('host') == + 'mini-mon'][0] + self.assertTrue(mem_total_mb_agg_metric is not None) + + self.assertEqual('mem.total_mb_agg', + mem_total_mb_agg_metric + .get('metric').get('name')) + + self.assertEqual(1213.0, + mem_total_mb_agg_metric + .get('metric').get('value')) + self.assertEqual('useast', + mem_total_mb_agg_metric + .get('meta').get('region')) + + self.assertEqual(cfg.CONF.messaging.publish_kafka_tenant_id, + mem_total_mb_agg_metric + .get('meta').get('tenantId')) + self.assertEqual('mini-mon', + mem_total_mb_agg_metric + .get('metric').get('dimensions').get('host')) + self.assertEqual('all', + mem_total_mb_agg_metric + .get('metric').get('dimensions').get('project_id')) + self.assertEqual('prehourly', + mem_total_mb_agg_metric + .get('metric').get('dimensions') + .get('aggregation_period')) + + self.assertEqual(39.0, + mem_total_mb_agg_metric + .get('metric').get('value_meta').get('record_count')) + self.assertEqual('2016-01-20 16:40:00', + mem_total_mb_agg_metric + .get('metric').get('value_meta') + .get('firstrecord_timestamp_string')) + self.assertEqual('2016-01-20 17:40:00', + mem_total_mb_agg_metric + .get('metric').get('value_meta') + .get('lastrecord_timestamp_string')) + + @mock.patch('monasca_transform.processor.pre_hourly_processor.KafkaInsert', + DummyInsert) + @mock.patch('monasca_transform.data_driven_specs.data_driven_specs_repo.' + 'DataDrivenSpecsRepoFactory.get_data_driven_specs_repo') + @mock.patch('monasca_transform.transform.builder.' + 'generic_transform_builder.GenericTransformBuilder.' + '_get_insert_component_manager') + @mock.patch('monasca_transform.transform.builder.' + 'generic_transform_builder.GenericTransformBuilder.' + '_get_setter_component_manager') + @mock.patch('monasca_transform.transform.builder.' + 'generic_transform_builder.GenericTransformBuilder.' + '_get_usage_component_manager') + def test_fetch_quantity_avg_second_stage(self, + usage_manager, + setter_manager, + insert_manager, + data_driven_specs_repo): + + # test operation + test_operation = "avg" + # load components + usage_manager.return_value = MockComponentManager.get_usage_cmpt_mgr() + setter_manager.return_value = \ + MockComponentManager.get_setter_cmpt_mgr() + insert_manager.return_value = \ + MockComponentManager.get_insert_pre_hourly_cmpt_mgr() + + # init mock driver tables + data_driven_specs_repo.return_value = \ + MockDataDrivenSpecsRepo(self.spark_context, + self.get_pre_transform_specs_json(), + self.get_transform_specs_json_by_operation( + test_operation)) + + # Create an emulated set of Kafka messages (these were gathered + # by extracting Monasca messages from the Metrics queue on mini-mon). + + # Create an RDD out of the mocked Monasca metrics + with open(SecondStageDataProvider.fetch_quantity_data_path) as f: + raw_lines = f.read().splitlines() + raw_tuple_list = [eval(raw_line) for raw_line in raw_lines] + instance_usage_list = map(dump_as_ascii_string, + raw_tuple_list) + + # create a json RDD from instance_usage_list + instance_usage_rdd = self.spark_context.parallelize( + instance_usage_list) + + sql_context = SQLContext(self.spark_context) + instance_usage_df = sql_context.read.json( + instance_usage_rdd) + + # call pre hourly processor + PreHourlyProcessor.do_transform(instance_usage_df) + + metrics = DummyAdapter.adapter_impl.metric_list + mem_total_mb_agg_metric = [ + value for value in metrics + if value.get('metric').get('name') == + 'mem.total_mb_agg' and + value.get('metric').get('dimensions').get('host') == + 'mini-mon'][0] + self.assertTrue(mem_total_mb_agg_metric is not None) + + self.assertEqual('mem.total_mb_agg', + mem_total_mb_agg_metric + .get('metric').get('name')) + + self.assertEqual(1446.5, + mem_total_mb_agg_metric + .get('metric').get('value')) + self.assertEqual('useast', + mem_total_mb_agg_metric + .get('meta').get('region')) + + self.assertEqual(cfg.CONF.messaging.publish_kafka_tenant_id, + mem_total_mb_agg_metric + .get('meta').get('tenantId')) + self.assertEqual('mini-mon', + mem_total_mb_agg_metric + .get('metric').get('dimensions').get('host')) + self.assertEqual('all', + mem_total_mb_agg_metric + .get('metric').get('dimensions').get('project_id')) + self.assertEqual('prehourly', + mem_total_mb_agg_metric + .get('metric').get('dimensions') + .get('aggregation_period')) + + self.assertEqual(39.0, + mem_total_mb_agg_metric + .get('metric').get('value_meta').get('record_count')) + self.assertEqual('2016-01-20 16:40:00', + mem_total_mb_agg_metric + .get('metric').get('value_meta') + .get('firstrecord_timestamp_string')) + self.assertEqual('2016-01-20 17:40:00', + mem_total_mb_agg_metric + .get('metric').get('value_meta') + .get('lastrecord_timestamp_string')) + + @mock.patch('monasca_transform.processor.pre_hourly_processor.KafkaInsert', + DummyInsert) + @mock.patch('monasca_transform.data_driven_specs.data_driven_specs_repo.' + 'DataDrivenSpecsRepoFactory.get_data_driven_specs_repo') + @mock.patch('monasca_transform.transform.builder.' + 'generic_transform_builder.GenericTransformBuilder.' + '_get_insert_component_manager') + @mock.patch('monasca_transform.transform.builder.' + 'generic_transform_builder.GenericTransformBuilder.' + '_get_setter_component_manager') + @mock.patch('monasca_transform.transform.builder.' + 'generic_transform_builder.GenericTransformBuilder.' + '_get_usage_component_manager') + def test_fetch_quantity_sum_second_stage(self, + usage_manager, + setter_manager, + insert_manager, + data_driven_specs_repo): + + # test operation + test_operation = "sum" + # load components + usage_manager.return_value = MockComponentManager.get_usage_cmpt_mgr() + setter_manager.return_value = \ + MockComponentManager.get_setter_cmpt_mgr() + insert_manager.return_value = \ + MockComponentManager.get_insert_pre_hourly_cmpt_mgr() + + # init mock driver tables + data_driven_specs_repo.return_value = \ + MockDataDrivenSpecsRepo(self.spark_context, + self.get_pre_transform_specs_json(), + self.get_transform_specs_json_by_operation( + test_operation)) + + # Create an emulated set of Kafka messages (these were gathered + # by extracting Monasca messages from the Metrics queue on mini-mon). + + # Create an RDD out of the mocked Monasca metrics + with open(SecondStageDataProvider.fetch_quantity_data_path) as f: + raw_lines = f.read().splitlines() + raw_tuple_list = [eval(raw_line) for raw_line in raw_lines] + instance_usage_list = map(dump_as_ascii_string, + raw_tuple_list) + + # create a json RDD from instance_usage_list + instance_usage_rdd = self.spark_context.parallelize( + instance_usage_list) + + sql_context = SQLContext(self.spark_context) + instance_usage_df = sql_context.read.json( + instance_usage_rdd) + + # call pre hourly processor + PreHourlyProcessor.do_transform(instance_usage_df) + + metrics = DummyAdapter.adapter_impl.metric_list + mem_total_mb_agg_metric = [ + value for value in metrics + if value.get('metric').get('name') == + 'mem.total_mb_agg' and + value.get('metric').get('dimensions').get('host') == + 'mini-mon'][0] + self.assertTrue(mem_total_mb_agg_metric is not None) + + self.assertEqual('mem.total_mb_agg', + mem_total_mb_agg_metric + .get('metric').get('name')) + + self.assertEqual(8679.0, + mem_total_mb_agg_metric + .get('metric').get('value')) + self.assertEqual('useast', + mem_total_mb_agg_metric + .get('meta').get('region')) + + self.assertEqual(cfg.CONF.messaging.publish_kafka_tenant_id, + mem_total_mb_agg_metric + .get('meta').get('tenantId')) + self.assertEqual('mini-mon', + mem_total_mb_agg_metric + .get('metric').get('dimensions').get('host')) + self.assertEqual('all', + mem_total_mb_agg_metric + .get('metric').get('dimensions').get('project_id')) + self.assertEqual('prehourly', + mem_total_mb_agg_metric + .get('metric').get('dimensions') + .get('aggregation_period')) + + self.assertEqual(39.0, + mem_total_mb_agg_metric + .get('metric').get('value_meta').get('record_count')) + self.assertEqual('2016-01-20 16:40:00', + mem_total_mb_agg_metric + .get('metric').get('value_meta') + .get('firstrecord_timestamp_string')) + self.assertEqual('2016-01-20 17:40:00', + mem_total_mb_agg_metric + .get('metric').get('value_meta') + .get('lastrecord_timestamp_string')) + + +if __name__ == "__main__": + print("PATH *************************************************************") + import sys + print(sys.path) + print("PATH==============================================================") + unittest.main()