diff --git a/monasca_transform/component/insert/__init__.py b/monasca_transform/component/insert/__init__.py
index 291cab2..407dc7d 100644
--- a/monasca_transform/component/insert/__init__.py
+++ b/monasca_transform/component/insert/__init__.py
@@ -11,14 +11,20 @@
 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 # License for the specific language governing permissions and limitations
 # under the License.
-
 import abc
+import json
 import time
 
+from monasca_common.validation import metrics as metric_validator
 from monasca_transform.component import Component
+from monasca_transform.config.config_initializer import ConfigInitializer
+from monasca_transform.log_utils import LogUtils
 from oslo_config import cfg
 
+ConfigInitializer.basic_config()
+log = LogUtils.init_logger(__name__)
+
 
 class InsertComponent(Component):
@@ -33,6 +39,20 @@ class InsertComponent(Component):
     def get_component_type():
         return Component.INSERT_COMPONENT_TYPE
 
+    @staticmethod
+    def _validate_metric(metric):
+        """validate monasca metric.
+        """
+        try:
+            # validate metric part, without the wrapper
+            metric_validator.validate(metric["metric"])
+        except Exception as e:
+            log.info("Metric %s is invalid: Exception : %s"
+                     % (json.dumps(metric),
+                        e.message))
+            return False
+        return True
+
     @staticmethod
     def _prepare_metric(instance_usage_dict, agg_params):
         """transform instance usage rdd to a monasca metric.
diff --git a/monasca_transform/component/insert/kafka_insert.py b/monasca_transform/component/insert/kafka_insert.py
index b4019a5..819c1f5 100644
--- a/monasca_transform/component/insert/kafka_insert.py
+++ b/monasca_transform/component/insert/kafka_insert.py
@@ -57,10 +57,11 @@ class KafkaInsert(InsertComponent):
         # Approach # 2
         # using collect() to fetch all elements of an RDD and write to
         # kafka
-
         for instance_usage_row in instance_usage_df.collect():
             metric = InsertComponent._get_metric(
                 instance_usage_row, agg_params)
-            KafkaMessageAdapter.send_metric(metric)
+            # validate metric part
+            if InsertComponent._validate_metric(metric):
+                KafkaMessageAdapter.send_metric(metric)
 
         return instance_usage_df
diff --git a/monasca_transform/driver/mon_metrics_kafka.py b/monasca_transform/driver/mon_metrics_kafka.py
index 919c6a6..06e8eee 100644
--- a/monasca_transform/driver/mon_metrics_kafka.py
+++ b/monasca_transform/driver/mon_metrics_kafka.py
@@ -36,6 +36,7 @@ from monasca_transform.component.usage.fetch_quantity import \
 from monasca_transform.component.usage.fetch_quantity_util import \
     FetchQuantityUtilException
 from monasca_transform.config.config_initializer import ConfigInitializer
+from monasca_transform.log_utils import LogUtils
 
 from monasca_transform.transform.builder.generic_transform_builder \
     import GenericTransformBuilder
@@ -55,19 +56,7 @@ from monasca_transform.transform.transform_utils import MonMetricUtils
 from monasca_transform.transform import TransformContextUtils
 
 ConfigInitializer.basic_config()
-
-# initialize logger
-log = logging.getLogger(__name__)
-_h = logging.FileHandler('%s/%s' % (
-    cfg.CONF.service.service_log_path,
-    cfg.CONF.service.service_log_filename))
-_h.setFormatter(logging.Formatter("'%(asctime)s - %(pathname)s:"
-                                  "%(lineno)s - %(levelname)s - %(message)s'"))
-log.addHandler(_h)
-if cfg.CONF.service.enable_debug_log_entries:
-    log.setLevel(logging.DEBUG)
-else:
-    log.setLevel(logging.INFO)
+log = LogUtils.init_logger(__name__)
 
 
 class MonMetricsKafkaProcessor(object):
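The heart of the patch is the new `InsertComponent._validate_metric` helper above: it delegates to the validator shipped in `monasca_common` and checks only the inner `"metric"` dict, not the Kafka envelope around it. One portability note: `e.message` works on the Python 2 interpreter this project targets, but `str(e)` would be the Python 3-safe spelling. Below is a minimal sketch of how the guard behaves; the `"meta"`/`"creation_time"` envelope layout is an assumption for illustration only, since the validator never sees it.

```python
# Illustrative sketch, not part of the patch: exercise the monasca_common
# validator the same way _validate_metric does. Only the inner "metric"
# dict is validated; the envelope around it is assumed here.
from monasca_common.validation import metrics as metric_validator

good = {"metric": {"name": "mem.total_mb_agg",
                   "dimensions": {"host": "all"},
                   "timestamp": 1468948588000,
                   "value": 1024.0},
        "meta": {"region": "useast", "tenantId": "tenant1"},
        "creation_time": 1468948588}

# same envelope, but with a name the validator should reject
bad = dict(good, metric=dict(good["metric"], name="&invalidmetricname"))

for envelope in (good, bad):
    try:
        metric_validator.validate(envelope["metric"])
        print("valid: %s" % envelope["metric"]["name"])
    except Exception as e:
        # _validate_metric logs and returns False on this path
        print("invalid: %s" % e)
```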
diff --git a/monasca_transform/log_utils.py b/monasca_transform/log_utils.py
index b8c833b..c2842ca 100644
--- a/monasca_transform/log_utils.py
+++ b/monasca_transform/log_utils.py
@@ -13,6 +13,7 @@
 # under the License.
 
 import logging
+from oslo_config import cfg
 
 
 class LogUtils(object):
@@ -31,3 +32,22 @@
         debugstr = "\n".join((debugstr, "type: %s" % (type(obj))))
         debugstr = "\n".join((debugstr, "dir: %s" % (dir(obj)), sep))
         LogUtils.log_debug(debugstr)
+
+    @staticmethod
+    def init_logger(logger_name):
+
+        # initialize logger
+        log = logging.getLogger(logger_name)
+        _h = logging.FileHandler('%s/%s' % (
+            cfg.CONF.service.service_log_path,
+            cfg.CONF.service.service_log_filename))
+        _h.setFormatter(logging.Formatter("'%(asctime)s - %(pathname)s:"
+                                          "%(lineno)s - %(levelname)s"
+                                          " - %(message)s'"))
+        log.addHandler(_h)
+        if cfg.CONF.service.enable_debug_log_entries:
+            log.setLevel(logging.DEBUG)
+        else:
+            log.setLevel(logging.INFO)
+
+        return log
diff --git a/monasca_transform/processor/pre_hourly_processor.py b/monasca_transform/processor/pre_hourly_processor.py
index 5b0a54e..5f244cb 100644
--- a/monasca_transform/processor/pre_hourly_processor.py
+++ b/monasca_transform/processor/pre_hourly_processor.py
@@ -32,6 +32,7 @@ from monasca_transform.data_driven_specs.data_driven_specs_repo \
     import DataDrivenSpecsRepo
 from monasca_transform.data_driven_specs.data_driven_specs_repo \
     import DataDrivenSpecsRepoFactory
+from monasca_transform.log_utils import LogUtils
 from monasca_transform.processor import Processor
 from monasca_transform.transform.storage_utils import \
     InvalidCacheStorageLevelException
@@ -40,19 +41,7 @@ from monasca_transform.transform.transform_utils import InstanceUsageUtils
 from monasca_transform.transform import TransformContextUtils
 
 ConfigInitializer.basic_config()
-
-# initialize logger
-log = logging.getLogger(__name__)
-_h = logging.FileHandler('%s/%s' % (
-    cfg.CONF.service.service_log_path,
-    cfg.CONF.service.service_log_filename))
-_h.setFormatter(logging.Formatter("'%(asctime)s - %(pathname)s:"
-                                  "%(lineno)s - %(levelname)s - %(message)s'"))
-log.addHandler(_h)
-if cfg.CONF.service.enable_debug_log_entries:
-    log.setLevel(logging.DEBUG)
-else:
-    log.setLevel(logging.INFO)
+log = LogUtils.init_logger(__name__)
 
 
 class PreHourlyProcessor(Processor):
diff --git a/requirements.txt b/requirements.txt
index 818f1fc..836c2a2 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -8,4 +8,3 @@ oslo.config>=1.2.1
 oslo.log
 oslo.service
 tooz
-
diff --git a/test-requirements.txt b/test-requirements.txt
index 0d892db..36cc77b 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -1,6 +1,6 @@
 # mock object framework
 hacking>=0.10.2
-flake8>=2.2.4
+flake8>=2.2.4,<3.0.0
 nose==1.3.0
 mock>=1.0.1
 tox
diff --git a/tests/unit/component/insert/dummy_insert.py b/tests/unit/component/insert/dummy_insert.py
index c51dfa3..af3c8fe 100644
--- a/tests/unit/component/insert/dummy_insert.py
+++ b/tests/unit/component/insert/dummy_insert.py
@@ -65,5 +65,7 @@ class DummyInsert(InsertComponent):
         for instance_usage_row in instance_usage_df.collect():
             metric = InsertComponent._get_metric(instance_usage_row,
                                                  agg_params)
-            DummyAdapter.send_metric(metric)
+            # validate metric part
+            if InsertComponent._validate_metric(metric):
+                DummyAdapter.send_metric(metric)
         return instance_usage_df
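The new `LogUtils.init_logger` consolidates the file-handler boilerplate that `mon_metrics_kafka.py` and `pre_hourly_processor.py` previously duplicated. After this refactor every module bootstraps logging with the same two lines, taken directly from the diffs above:

```python
# The adoption pattern this patch applies in each module: run the shared
# config bootstrap once, then ask LogUtils for a configured logger.
from monasca_transform.config.config_initializer import ConfigInitializer
from monasca_transform.log_utils import LogUtils

ConfigInitializer.basic_config()      # loads the cfg.CONF.service.* options
log = LogUtils.init_logger(__name__)  # file handler + level from config

log.info("processor starting")        # written to the configured log file
```

One caveat worth noting: `init_logger` adds a new `FileHandler` on every call, so invoking it twice for the same logger name would duplicate log lines; each module here calls it exactly once at import time.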
diff --git a/tests/unit/usage/test_fetch_quantity_agg.py b/tests/unit/usage/test_fetch_quantity_agg.py
index ff2052d..1eb3150 100644
--- a/tests/unit/usage/test_fetch_quantity_agg.py
+++ b/tests/unit/usage/test_fetch_quantity_agg.py
@@ -91,6 +91,31 @@ class TestFetchQuantityAgg(SparkContextTest):
 
         return [json.loads(transform_specs_json_operation)]
 
+    def get_transform_specs_json_invalid_name(self):
+        """get transform_specs driver table info."""
+        transform_specs_json = """
+        {"aggregation_params_map":{
+               "aggregation_pipeline":{"source":"streaming",
+                                       "usage":"fetch_quantity",
+                                       "setters":["rollup_quantity",
+                                                  "set_aggregated_metric_name",
+                                                  "set_aggregated_period"],
+                                       "insert":["prepare_data",
+                                                 "insert_data"]},
+               "aggregated_metric_name": "&invalidmetricname",
+               "aggregation_period": "hourly",
+               "aggregation_group_by_list": ["host", "metric_id"],
+               "usage_fetch_operation": "sum",
+               "setter_rollup_group_by_list": ["host"],
+               "setter_rollup_operation": "sum",
+               "dimension_list":["aggregation_period",
+                                 "host",
+                                 "project_id"]
+        },
+        "metric_group":"mem_total_all",
+        "metric_id":"mem_total_all"}"""
+        return [json.loads(transform_specs_json)]
+
     def get_invalid_filter_transform_specs_json(self,
                                                 field_to_filter,
                                                 filter_expression,
@@ -1020,6 +1045,67 @@ class TestFetchQuantityAgg(SparkContextTest):
         self.assertTrue("Encountered invalid filter details:" in e.value)
         self.assertTrue("filter operation = invalid." in e.value)
 
+    @mock.patch('monasca_transform.data_driven_specs.data_driven_specs_repo.'
+                'DataDrivenSpecsRepoFactory.get_data_driven_specs_repo')
+    @mock.patch('monasca_transform.transform.builder.'
+                'generic_transform_builder.GenericTransformBuilder.'
+                '_get_insert_component_manager')
+    @mock.patch('monasca_transform.transform.builder.'
+                'generic_transform_builder.GenericTransformBuilder.'
+                '_get_setter_component_manager')
+    @mock.patch('monasca_transform.transform.builder.'
+                'generic_transform_builder.GenericTransformBuilder.'
+                '_get_usage_component_manager')
+    def test_invalid_aggregated_metric_name(self,
+                                            usage_manager,
+                                            setter_manager,
+                                            insert_manager,
+                                            data_driven_specs_repo):
+
+        # load components
+        usage_manager.return_value = MockComponentManager.get_usage_cmpt_mgr()
+        setter_manager.return_value = \
+            MockComponentManager.get_setter_cmpt_mgr()
+        insert_manager.return_value = \
+            MockComponentManager.get_insert_cmpt_mgr()
+
+        # init mock driver tables
+        data_driven_specs_repo.return_value = \
+            MockDataDrivenSpecsRepo(
+                self.spark_context,
+                self.get_pre_transform_specs_json(),
+                self.get_transform_specs_json_invalid_name())
+
+        # Create an emulated set of Kafka messages (these were gathered
+        # by extracting Monasca messages from the Metrics queue on mini-mon).
+
+        # Create an RDD out of the mocked Monasca metrics
+        with open(DataProvider.fetch_quantity_data_path) as f:
+            raw_lines = f.read().splitlines()
+        raw_tuple_list = [eval(raw_line) for raw_line in raw_lines]
+
+        rdd_monasca = self.spark_context.parallelize(raw_tuple_list)
+
+        # decorate mocked RDD with dummy kafka offsets
+        myOffsetRanges = [
+            OffsetRange("metrics", 1, 10, 20)]  # mimic rdd.offsetRanges()
+
+        transform_context = TransformContextUtils.get_context(
+            offset_info=myOffsetRanges,
+            batch_time_info=self.get_dummy_batch_time())
+
+        rdd_monasca_with_offsets = rdd_monasca.map(
+            lambda x: RddTransformContext(x, transform_context))
+
+        # Call the primary method in mon_metrics_kafka
+        MonMetricsKafkaProcessor.rdd_to_recordstore(
+            rdd_monasca_with_offsets)
+
+        # get the metrics that have been submitted to the dummy message adapter
+        metrics = DummyAdapter.adapter_impl.metric_list
+
+        # metrics should be empty
+        self.assertFalse(metrics)
 
 if __name__ == "__main__":
     print("PATH *************************************************************")
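The new test drives the whole Spark pipeline with a transform spec whose aggregated metric name ("&invalidmetricname") fails validation, then asserts that nothing reaches `DummyAdapter`. A lighter companion check could target the guard directly and skip Spark entirely; the sketch below is not part of the patch, and assumes the service config loads when the module is imported (the module runs `ConfigInitializer.basic_config()` and opens the configured log file at import time).

```python
# Hypothetical companion test (not in this patch): call
# InsertComponent._validate_metric directly instead of driving
# the full Spark pipeline through rdd_to_recordstore.
import unittest

from monasca_transform.component.insert import InsertComponent


class TestValidateMetric(unittest.TestCase):

    def test_rejects_invalid_metric_name(self):
        # envelope layout is illustrative; only "metric" is validated
        metric = {"metric": {"name": "&invalidmetricname",
                             "dimensions": {"host": "all"},
                             "timestamp": 1468948588000,
                             "value": 1024.0},
                  "meta": {"region": "useast", "tenantId": "tenant1"},
                  "creation_time": 1468948588}
        self.assertFalse(InsertComponent._validate_metric(metric))

if __name__ == "__main__":
    unittest.main()
```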