From 0ea79c0305a9835b155114f553202742fa4b9bae Mon Sep 17 00:00:00 2001
From: Flint Calvin
Date: Tue, 20 Sep 2016 21:34:23 +0000
Subject: [PATCH] Added aggregation results to application log

Debug-level log entries are now written to the application log,
noting which aggregated metrics are submitted during pre-hourly
and hourly processing.

Change-Id: I64c6a18233614fe680aa0b084570ee7885f316e5
---
 .../monasca-transform/monasca-transform.conf  |  4 ++
 etc/monasca-transform.conf                    |  4 ++
 .../config/config_initializer.py              |  1 +
 monasca_transform/driver/mon_metrics_kafka.py | 32 +++++++++++----
 .../processor/pre_hourly_processor.py         | 41 ++++++++++++++-----
 .../test_resources/config/test_config.conf    |  3 +-
 ...t_config_with_dummy_messaging_adapter.conf |  3 +-
 7 files changed, 69 insertions(+), 19 deletions(-)

diff --git a/devstack/files/monasca-transform/monasca-transform.conf b/devstack/files/monasca-transform/monasca-transform.conf
index 749d1b5..c17e613 100644
--- a/devstack/files/monasca-transform/monasca-transform.conf
+++ b/devstack/files/monasca-transform/monasca-transform.conf
@@ -43,6 +43,10 @@ coordinator_group = monasca-transform
 # queries (in seconds)
 election_polling_frequency = 15
 
+# Whether debug-level log entries should be included in the application
+# log. If this setting is false, info-level will be used for logging.
+enable_debug_log_entries = true
+
 # The path for the monasca-transform Spark driver
 spark_driver = /opt/monasca/transform/lib/driver.py
 
diff --git a/etc/monasca-transform.conf b/etc/monasca-transform.conf
index 72a2084..de13e3e 100644
--- a/etc/monasca-transform.conf
+++ b/etc/monasca-transform.conf
@@ -43,6 +43,10 @@ coordinator_group = monasca-transform
 # queries (in seconds)
 election_polling_frequency = 15
 
+# Whether debug-level log entries should be included in the application
+# log. If this setting is false, info-level will be used for logging.
+enable_debug_log_entries = true
+
 # The path for the setup file to be executed
 setup_file = /opt/stack/monasca-transform/setup.py
 
diff --git a/monasca_transform/config/config_initializer.py b/monasca_transform/config/config_initializer.py
index 328c1ab..b181bd9 100644
--- a/monasca_transform/config/config_initializer.py
+++ b/monasca_transform/config/config_initializer.py
@@ -100,6 +100,7 @@ class ConfigInitializer(object):
             cfg.StrOpt('coordinator_address'),
             cfg.StrOpt('coordinator_group'),
             cfg.FloatOpt('election_polling_frequency'),
+            cfg.BoolOpt('enable_debug_log_entries', default=False),
             cfg.StrOpt('setup_file'),
             cfg.StrOpt('setup_target'),
             cfg.StrOpt('spark_driver'),
diff --git a/monasca_transform/driver/mon_metrics_kafka.py b/monasca_transform/driver/mon_metrics_kafka.py
index 916b7c1..919c6a6 100644
--- a/monasca_transform/driver/mon_metrics_kafka.py
+++ b/monasca_transform/driver/mon_metrics_kafka.py
@@ -60,12 +60,14 @@ ConfigInitializer.basic_config()
 log = logging.getLogger(__name__)
 _h = logging.FileHandler('%s/%s' % (
     cfg.CONF.service.service_log_path,
-    cfg.CONF.service.service_log_filename)
-)
+    cfg.CONF.service.service_log_filename))
 _h.setFormatter(logging.Formatter("'%(asctime)s - %(pathname)s:"
                                   "%(lineno)s - %(levelname)s - %(message)s'"))
 log.addHandler(_h)
-log.setLevel(logging.DEBUG)
+if cfg.CONF.service.enable_debug_log_entries:
+    log.setLevel(logging.DEBUG)
+else:
+    log.setLevel(logging.INFO)
 
 
 class MonMetricsKafkaProcessor(object):
@@ -248,8 +250,8 @@ class MonMetricsKafkaProcessor(object):
         """
 
         # call processing chain
-        GenericTransformBuilder.do_transform(transform_context,
-                                             record_store_df)
+        return GenericTransformBuilder.do_transform(
+            transform_context, record_store_df)
 
     @staticmethod
     def process_metrics(transform_context, record_store_df):
@@ -283,8 +285,24 @@ class MonMetricsKafkaProcessor(object):
                 transform_spec_df_info=transform_spec_df)
 
             try:
-                MonMetricsKafkaProcessor.process_metric(
-                    transform_context, source_record_store_df)
+                agg_inst_usage_df = (
+                    MonMetricsKafkaProcessor.process_metric(
+                        transform_context, source_record_store_df))
+
+                # if running in debug mode, write out the aggregated metric
+                # name just processed (along with the count of how many of
+                # these were aggregated) to the application log.
+ if log.isEnabledFor(logging.DEBUG): + agg_inst_usage_collection = agg_inst_usage_df.collect() + collection_len = len(agg_inst_usage_collection) + if collection_len > 0: + agg_inst_usage_dict = ( + agg_inst_usage_collection[0].asDict()) + log.debug("Submitted pre-hourly aggregated metric: " + "%s (%s)", + agg_inst_usage_dict[ + "aggregated_metric_name"], + str(collection_len)) except FetchQuantityException: raise except FetchQuantityUtilException: diff --git a/monasca_transform/processor/pre_hourly_processor.py b/monasca_transform/processor/pre_hourly_processor.py index 60a3d1b..5b0a54e 100644 --- a/monasca_transform/processor/pre_hourly_processor.py +++ b/monasca_transform/processor/pre_hourly_processor.py @@ -27,6 +27,7 @@ from monasca_transform.component.insert.kafka_insert import KafkaInsert from monasca_transform.component.setter.pre_hourly_calculate_rate import \ PreHourlyCalculateRate from monasca_transform.component.setter.rollup_quantity import RollupQuantity +from monasca_transform.config.config_initializer import ConfigInitializer from monasca_transform.data_driven_specs.data_driven_specs_repo \ import DataDrivenSpecsRepo from monasca_transform.data_driven_specs.data_driven_specs_repo \ @@ -38,7 +39,20 @@ from monasca_transform.transform.storage_utils import StorageUtils from monasca_transform.transform.transform_utils import InstanceUsageUtils from monasca_transform.transform import TransformContextUtils -LOG = logging.getLogger(__name__) +ConfigInitializer.basic_config() + +# initialize logger +log = logging.getLogger(__name__) +_h = logging.FileHandler('%s/%s' % ( + cfg.CONF.service.service_log_path, + cfg.CONF.service.service_log_filename)) +_h.setFormatter(logging.Formatter("'%(asctime)s - %(pathname)s:" + "%(lineno)s - %(levelname)s - %(message)s'")) +log.addHandler(_h) +if cfg.CONF.service.enable_debug_log_entries: + log.setLevel(logging.DEBUG) +else: + log.setLevel(logging.INFO) class PreHourlyProcessor(Processor): @@ -46,10 +60,6 @@ class PreHourlyProcessor(Processor): and publish final rolled up metrics to metrics topic in kafka. """ - @staticmethod - def log_debug(message): - LOG.debug(message) - @staticmethod def save_kafka_offsets(current_offsets, batch_time_info): @@ -60,7 +70,7 @@ class PreHourlyProcessor(Processor): app_name = PreHourlyProcessor.get_app_name() for o in current_offsets: - PreHourlyProcessor.log_debug( + log.debug( "saving: OffSetRanges: %s %s %s %s, " "batch_time_info: %s" % ( o.topic, o.partition, o.fromOffset, o.untilOffset, @@ -254,10 +264,9 @@ class PreHourlyProcessor(Processor): # get kafka topic to fetch data topic = PreHourlyProcessor.get_kafka_topic() - offset_range_list = [] if len(saved_offset_spec) < 1: - PreHourlyProcessor.log_debug( + log.debug( "No saved offsets available..." "connecting to kafka and fetching " "from earliest available offset ...") @@ -266,7 +275,7 @@ class PreHourlyProcessor(Processor): cfg.CONF.messaging.brokers, topic) else: - PreHourlyProcessor.log_debug( + log.debug( "Saved offsets available..." 
"connecting to kafka and fetching from saved offset ...") @@ -432,9 +441,21 @@ class PreHourlyProcessor(Processor): transform_context = TransformContextUtils.get_context( transform_spec_df_info=transform_spec_df) - PreHourlyProcessor.process_instance_usage( + agg_inst_usage_df = PreHourlyProcessor.process_instance_usage( transform_context, source_instance_usage_df) + # if running in debug mode, write out the aggregated metric + # name just processed (along with the count of how many of these + # were aggregated) to the application log. + if log.isEnabledFor(logging.DEBUG): + agg_inst_usage_collection = agg_inst_usage_df.collect() + collection_len = len(agg_inst_usage_collection) + if collection_len > 0: + agg_inst_usage_dict = agg_inst_usage_collection[0].asDict() + log.debug("Submitted hourly aggregated metric: %s (%s)", + agg_inst_usage_dict["aggregated_metric_name"], + str(collection_len)) + @staticmethod def run_processor(spark_context, processing_time): """process data in metrics_pre_hourly queue, starting diff --git a/tests/unit/test_resources/config/test_config.conf b/tests/unit/test_resources/config/test_config.conf index f1c0f37..9b91f3f 100644 --- a/tests/unit/test_resources/config/test_config.conf +++ b/tests/unit/test_resources/config/test_config.conf @@ -23,4 +23,5 @@ enable_batch_time_filtering = True [service] enable_record_store_df_cache = False -record_store_df_cache_storage_level = MEMORY_ONLY_SER_2 \ No newline at end of file +record_store_df_cache_storage_level = MEMORY_ONLY_SER_2 +enable_debug_log_entries = true diff --git a/tests/unit/test_resources/config/test_config_with_dummy_messaging_adapter.conf b/tests/unit/test_resources/config/test_config_with_dummy_messaging_adapter.conf index 610eca6..74b19b9 100644 --- a/tests/unit/test_resources/config/test_config_with_dummy_messaging_adapter.conf +++ b/tests/unit/test_resources/config/test_config_with_dummy_messaging_adapter.conf @@ -13,4 +13,5 @@ enable_batch_time_filtering = True [service] enable_record_store_df_cache = False -record_store_df_cache_storage_level = MEMORY_ONLY_SER_2 \ No newline at end of file +record_store_df_cache_storage_level = MEMORY_ONLY_SER_2 +enable_debug_log_entries = true