Added aggregation results to application log

Debug-level log entries are now written to the application log, noting
which aggregated metrics (and how many aggregated records) are
submitted during pre-hourly and hourly processing.

Change-Id: I64c6a18233614fe680aa0b084570ee7885f316e5
Author: Flint Calvin
Date: 2016-09-20 21:34:23 +00:00
Parent: 87a8960467
Commit: 0ea79c0305
7 changed files with 69 additions and 19 deletions
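The pattern introduced by this change can be summarised with a small standalone sketch (not part of the diff): a boolean config option decides the effective log level, so the per-batch debug entries described above only appear when it is enabled. The logger name and the sample metric values below are illustrative, not taken from the change.

# Standalone sketch of the behaviour described above; 'enable_debug_log_entries'
# mirrors the new [service] option, the logger name and metric values are made up.
import logging

enable_debug_log_entries = True  # stands in for cfg.CONF.service.enable_debug_log_entries

log = logging.getLogger("monasca_transform.demo")
log.addHandler(logging.StreamHandler())
log.setLevel(logging.DEBUG if enable_debug_log_entries else logging.INFO)

log.debug("Submitted pre-hourly aggregated metric: %s (%s)", "mem.total_mb_agg", 4)
log.info("info-level entries are written regardless of the setting")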

View File

@@ -43,6 +43,10 @@ coordinator_group = monasca-transform
# queries (in seconds)
election_polling_frequency = 15
+# Whether debug-level log entries should be included in the application
+# log. If this setting is false, info-level will be used for logging.
+enable_debug_log_entries = true
# The path for the monasca-transform Spark driver
spark_driver = /opt/monasca/transform/lib/driver.py

View File

@@ -43,6 +43,10 @@ coordinator_group = monasca-transform
# queries (in seconds)
election_polling_frequency = 15
+# Whether debug-level log entries should be included in the application
+# log. If this setting is false, info-level will be used for logging.
+enable_debug_log_entries = true
# The path for the setup file to be executed
setup_file = /opt/stack/monasca-transform/setup.py

View File

@@ -100,6 +100,7 @@ class ConfigInitializer(object):
            cfg.StrOpt('coordinator_address'),
            cfg.StrOpt('coordinator_group'),
            cfg.FloatOpt('election_polling_frequency'),
+            cfg.BoolOpt('enable_debug_log_entries', default=False),
            cfg.StrOpt('setup_file'),
            cfg.StrOpt('setup_target'),
            cfg.StrOpt('spark_driver'),
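For context, a minimal self-contained sketch of how an oslo.config BoolOpt like this is registered and read. The 'service' group and the option name mirror the diff; the private ConfigOpts instance is only for illustration, and a real boolean default (False) is used so that an unset option is actually falsy.

# Illustrative only: register and read the option with a private ConfigOpts
# instance instead of the global cfg.CONF used by monasca-transform.
from oslo_config import cfg

conf = cfg.ConfigOpts()
conf.register_opts(
    [cfg.BoolOpt('enable_debug_log_entries', default=False)],
    group='service')
conf([])  # parse with no command line arguments and no config files

# With nothing set in a config file, the boolean default applies.
assert conf.service.enable_debug_log_entries is False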

View File

@@ -60,12 +60,14 @@ ConfigInitializer.basic_config()
log = logging.getLogger(__name__)
_h = logging.FileHandler('%s/%s' % (
    cfg.CONF.service.service_log_path,
-    cfg.CONF.service.service_log_filename)
-)
+    cfg.CONF.service.service_log_filename))
_h.setFormatter(logging.Formatter("'%(asctime)s - %(pathname)s:"
                                  "%(lineno)s - %(levelname)s - %(message)s'"))
log.addHandler(_h)
-log.setLevel(logging.DEBUG)
+if cfg.CONF.service.enable_debug_log_entries:
+    log.setLevel(logging.DEBUG)
+else:
+    log.setLevel(logging.INFO)

class MonMetricsKafkaProcessor(object):
@@ -248,8 +250,8 @@ class MonMetricsKafkaProcessor(object):
        """
        # call processing chain
-        GenericTransformBuilder.do_transform(transform_context,
-                                             record_store_df)
+        return GenericTransformBuilder.do_transform(
+            transform_context, record_store_df)

    @staticmethod
    def process_metrics(transform_context, record_store_df):
@@ -283,8 +285,24 @@ class MonMetricsKafkaProcessor(object):
                transform_spec_df_info=transform_spec_df)

            try:
-                MonMetricsKafkaProcessor.process_metric(
-                    transform_context, source_record_store_df)
+                agg_inst_usage_df = (
+                    MonMetricsKafkaProcessor.process_metric(
+                        transform_context, source_record_store_df))
+
+                # if running in debug mode, write out the aggregated metric
+                # name just processed (along with the count of how many of
+                # these were aggregated) to the application log.
+                if log.isEnabledFor(logging.DEBUG):
+                    agg_inst_usage_collection = agg_inst_usage_df.collect()
+                    collection_len = len(agg_inst_usage_collection)
+                    if collection_len > 0:
+                        agg_inst_usage_dict = (
+                            agg_inst_usage_collection[0].asDict())
+                        log.debug("Submitted pre-hourly aggregated metric: "
+                                  "%s (%s)",
+                                  agg_inst_usage_dict[
+                                      "aggregated_metric_name"],
+                                  str(collection_len))
            except FetchQuantityException:
                raise
            except FetchQuantityUtilException:
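The isEnabledFor() check above is what keeps the extra work off the normal path: DataFrame.collect() materialises the aggregated rows on the driver, so it should only run when a debug entry will actually be emitted. A hedged sketch of that guard, factored into a helper for clarity — the helper name and the 'period' parameter are illustrative, not part of the commit.

# Hypothetical helper illustrating the guard used above; agg_df is expected to
# behave like a Spark DataFrame (collect() returning Rows that support asDict()).
import logging


def log_aggregation_result(log, agg_df, period):
    """Log the aggregated metric name and row count, at debug level only."""
    if not log.isEnabledFor(logging.DEBUG):
        return  # skip the potentially expensive collect() entirely
    rows = agg_df.collect()
    if rows:
        log.debug("Submitted %s aggregated metric: %s (%s)",
                  period,
                  rows[0].asDict()["aggregated_metric_name"],
                  len(rows))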

View File

@@ -27,6 +27,7 @@ from monasca_transform.component.insert.kafka_insert import KafkaInsert
from monasca_transform.component.setter.pre_hourly_calculate_rate import \
    PreHourlyCalculateRate
from monasca_transform.component.setter.rollup_quantity import RollupQuantity
+from monasca_transform.config.config_initializer import ConfigInitializer
from monasca_transform.data_driven_specs.data_driven_specs_repo \
    import DataDrivenSpecsRepo
from monasca_transform.data_driven_specs.data_driven_specs_repo \
@@ -38,7 +39,20 @@ from monasca_transform.transform.storage_utils import StorageUtils
from monasca_transform.transform.transform_utils import InstanceUsageUtils
from monasca_transform.transform import TransformContextUtils

-LOG = logging.getLogger(__name__)
+ConfigInitializer.basic_config()
+
+# initialize logger
+log = logging.getLogger(__name__)
+_h = logging.FileHandler('%s/%s' % (
+    cfg.CONF.service.service_log_path,
+    cfg.CONF.service.service_log_filename))
+_h.setFormatter(logging.Formatter("'%(asctime)s - %(pathname)s:"
+                                  "%(lineno)s - %(levelname)s - %(message)s'"))
+log.addHandler(_h)
+if cfg.CONF.service.enable_debug_log_entries:
+    log.setLevel(logging.DEBUG)
+else:
+    log.setLevel(logging.INFO)

class PreHourlyProcessor(Processor):
@@ -46,10 +60,6 @@ class PreHourlyProcessor(Processor):
    and publish final rolled up metrics to metrics topic in kafka.
    """

-    @staticmethod
-    def log_debug(message):
-        LOG.debug(message)
-
    @staticmethod
    def save_kafka_offsets(current_offsets,
                           batch_time_info):
@@ -60,7 +70,7 @@ class PreHourlyProcessor(Processor):
        app_name = PreHourlyProcessor.get_app_name()

        for o in current_offsets:
-            PreHourlyProcessor.log_debug(
+            log.debug(
                "saving: OffSetRanges: %s %s %s %s, "
                "batch_time_info: %s" % (
                    o.topic, o.partition, o.fromOffset, o.untilOffset,
@@ -254,10 +264,9 @@ class PreHourlyProcessor(Processor):
        # get kafka topic to fetch data
        topic = PreHourlyProcessor.get_kafka_topic()

        offset_range_list = []
        if len(saved_offset_spec) < 1:
-            PreHourlyProcessor.log_debug(
+            log.debug(
                "No saved offsets available..."
                "connecting to kafka and fetching "
                "from earliest available offset ...")
@@ -266,7 +275,7 @@
                cfg.CONF.messaging.brokers,
                topic)
        else:
-            PreHourlyProcessor.log_debug(
+            log.debug(
                "Saved offsets available..."
                "connecting to kafka and fetching from saved offset ...")
@@ -432,9 +441,21 @@ class PreHourlyProcessor(Processor):
        transform_context = TransformContextUtils.get_context(
            transform_spec_df_info=transform_spec_df)

-        PreHourlyProcessor.process_instance_usage(
+        agg_inst_usage_df = PreHourlyProcessor.process_instance_usage(
            transform_context, source_instance_usage_df)

+        # if running in debug mode, write out the aggregated metric
+        # name just processed (along with the count of how many of these
+        # were aggregated) to the application log.
+        if log.isEnabledFor(logging.DEBUG):
+            agg_inst_usage_collection = agg_inst_usage_df.collect()
+            collection_len = len(agg_inst_usage_collection)
+            if collection_len > 0:
+                agg_inst_usage_dict = agg_inst_usage_collection[0].asDict()
+                log.debug("Submitted hourly aggregated metric: %s (%s)",
+                          agg_inst_usage_dict["aggregated_metric_name"],
+                          str(collection_len))
+
    @staticmethod
    def run_processor(spark_context, processing_time):
        """process data in metrics_pre_hourly queue, starting

View File

@@ -23,4 +23,5 @@ enable_batch_time_filtering = True
[service]
enable_record_store_df_cache = False
-record_store_df_cache_storage_level = MEMORY_ONLY_SER_2
+record_store_df_cache_storage_level = MEMORY_ONLY_SER_2
+enable_debug_log_entries = true

View File

@@ -13,4 +13,5 @@ enable_batch_time_filtering = True
[service]
enable_record_store_df_cache = False
-record_store_df_cache_storage_level = MEMORY_ONLY_SER_2
+record_store_df_cache_storage_level = MEMORY_ONLY_SER_2
+enable_debug_log_entries = true