Added aggregation results to application log
Debug-level log entries are now written to the application log, noting which aggregated metrics are submitted during pre-hourly and hourly processing.

Change-Id: I64c6a18233614fe680aa0b084570ee7885f316e5
This commit is contained in:
parent 87a8960467
commit 0ea79c0305
@@ -43,6 +43,10 @@ coordinator_group = monasca-transform
 # queries (in seconds)
 election_polling_frequency = 15
 
+# Whether debug-level log entries should be included in the application
+# log. If this setting is false, info-level will be used for logging.
+enable_debug_log_entries = true
+
 # The path for the monasca-transform Spark driver
 spark_driver = /opt/monasca/transform/lib/driver.py
 
@@ -43,6 +43,10 @@ coordinator_group = monasca-transform
 # queries (in seconds)
 election_polling_frequency = 15
 
+# Whether debug-level log entries should be included in the application
+# log. If this setting is false, info-level will be used for logging.
+enable_debug_log_entries = true
+
 # The path for the setup file to be executed
 setup_file = /opt/stack/monasca-transform/setup.py
 
@@ -100,6 +100,7 @@ class ConfigInitializer(object):
         cfg.StrOpt('coordinator_address'),
         cfg.StrOpt('coordinator_group'),
         cfg.FloatOpt('election_polling_frequency'),
+        cfg.BoolOpt('enable_debug_log_entries', default='false'),
         cfg.StrOpt('setup_file'),
         cfg.StrOpt('setup_target'),
         cfg.StrOpt('spark_driver'),
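As context for the option registered above: oslo.config type-converts values it parses from the config file, but a default is generally used as-is, so BoolOpt conventionally takes a Python bool default (default=False) rather than the string 'false' in the diff, which is truthy. A minimal sketch of registering and reading the option (group and option names come from the diff; everything else is illustrative):

from oslo_config import cfg

# Mirror of the option added in the diff, with a real bool default.
service_opts = [
    cfg.BoolOpt('enable_debug_log_entries', default=False,
                help='Write debug-level entries to the application log.'),
]
cfg.CONF.register_opts(service_opts, group='service')

# After the config file is parsed, the value reads back as a bool:
if cfg.CONF.service.enable_debug_log_entries:
    print('debug log entries enabled')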
@@ -60,12 +60,14 @@ ConfigInitializer.basic_config()
 log = logging.getLogger(__name__)
 _h = logging.FileHandler('%s/%s' % (
     cfg.CONF.service.service_log_path,
-    cfg.CONF.service.service_log_filename)
-)
+    cfg.CONF.service.service_log_filename))
 _h.setFormatter(logging.Formatter("'%(asctime)s - %(pathname)s:"
                                   "%(lineno)s - %(levelname)s - %(message)s'"))
 log.addHandler(_h)
-log.setLevel(logging.DEBUG)
+if cfg.CONF.service.enable_debug_log_entries:
+    log.setLevel(logging.DEBUG)
+else:
+    log.setLevel(logging.INFO)
 
 
 class MonMetricsKafkaProcessor(object):
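The block above wires a dedicated FileHandler onto the module logger and then picks the level from configuration. A standalone sketch of the same pattern (the log path and the flag value are hard-coded stand-ins for the cfg.CONF.service settings):

import logging

# Stand-in for cfg.CONF.service.enable_debug_log_entries.
enable_debug_log_entries = True

log = logging.getLogger(__name__)
_h = logging.FileHandler('/tmp/monasca-transform.log')  # illustrative path
_h.setFormatter(logging.Formatter("'%(asctime)s - %(pathname)s:"
                                  "%(lineno)s - %(levelname)s - %(message)s'"))
log.addHandler(_h)
log.setLevel(logging.DEBUG if enable_debug_log_entries else logging.INFO)

log.debug("written only when debug entries are enabled")
log.info("written at either level")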
@@ -248,8 +250,8 @@ class MonMetricsKafkaProcessor(object):
         """
 
         # call processing chain
-        GenericTransformBuilder.do_transform(transform_context,
-                                             record_store_df)
+        return GenericTransformBuilder.do_transform(
+            transform_context, record_store_df)
 
     @staticmethod
     def process_metrics(transform_context, record_store_df):
@@ -283,8 +285,24 @@ class MonMetricsKafkaProcessor(object):
             transform_spec_df_info=transform_spec_df)
 
         try:
-            MonMetricsKafkaProcessor.process_metric(
-                transform_context, source_record_store_df)
+            agg_inst_usage_df = (
+                MonMetricsKafkaProcessor.process_metric(
+                    transform_context, source_record_store_df))
+
+            # if running in debug mode, write out the aggregated metric
+            # name just processed (along with the count of how many of
+            # these were aggregated) to the application log.
+            if log.isEnabledFor(logging.DEBUG):
+                agg_inst_usage_collection = agg_inst_usage_df.collect()
+                collection_len = len(agg_inst_usage_collection)
+                if collection_len > 0:
+                    agg_inst_usage_dict = (
+                        agg_inst_usage_collection[0].asDict())
+                    log.debug("Submitted pre-hourly aggregated metric: "
+                              "%s (%s)",
+                              agg_inst_usage_dict[
+                                  "aggregated_metric_name"],
+                              str(collection_len))
         except FetchQuantityException:
             raise
         except FetchQuantityUtilException:
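Note the guard: log.isEnabledFor(logging.DEBUG) is checked before collect(), because collect() is a full Spark action that ships the aggregated rows back to the driver. The expensive work is only done when the entry would actually be emitted. The same pattern in isolation (the function and parameter names here are illustrative; agg_df is any PySpark DataFrame with an aggregated_metric_name column, as in the diff):

import logging

log = logging.getLogger(__name__)

def log_submitted_aggregates(agg_df, phase="pre-hourly"):
    # Skip the collect() entirely unless debug logging is on;
    # materializing the DataFrame on the driver is the costly part.
    if log.isEnabledFor(logging.DEBUG):
        rows = agg_df.collect()
        if rows:
            first = rows[0].asDict()
            log.debug("Submitted %s aggregated metric: %s (%s)",
                      phase, first["aggregated_metric_name"], len(rows))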
@@ -27,6 +27,7 @@ from monasca_transform.component.insert.kafka_insert import KafkaInsert
 from monasca_transform.component.setter.pre_hourly_calculate_rate import \
     PreHourlyCalculateRate
 from monasca_transform.component.setter.rollup_quantity import RollupQuantity
+from monasca_transform.config.config_initializer import ConfigInitializer
 from monasca_transform.data_driven_specs.data_driven_specs_repo \
     import DataDrivenSpecsRepo
 from monasca_transform.data_driven_specs.data_driven_specs_repo \
@@ -38,7 +39,20 @@ from monasca_transform.transform.storage_utils import StorageUtils
 from monasca_transform.transform.transform_utils import InstanceUsageUtils
 from monasca_transform.transform import TransformContextUtils
 
-LOG = logging.getLogger(__name__)
+ConfigInitializer.basic_config()
+
+# initialize logger
+log = logging.getLogger(__name__)
+_h = logging.FileHandler('%s/%s' % (
+    cfg.CONF.service.service_log_path,
+    cfg.CONF.service.service_log_filename))
+_h.setFormatter(logging.Formatter("'%(asctime)s - %(pathname)s:"
+                                  "%(lineno)s - %(levelname)s - %(message)s'"))
+log.addHandler(_h)
+if cfg.CONF.service.enable_debug_log_entries:
+    log.setLevel(logging.DEBUG)
+else:
+    log.setLevel(logging.INFO)
 
 
 class PreHourlyProcessor(Processor):
@@ -46,10 +60,6 @@ class PreHourlyProcessor(Processor):
     and publish final rolled up metrics to metrics topic in kafka.
     """
 
-    @staticmethod
-    def log_debug(message):
-        LOG.debug(message)
-
     @staticmethod
     def save_kafka_offsets(current_offsets,
                            batch_time_info):
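Removing the log_debug() wrapper is more than a tidy-up: the handler's format string includes %(pathname)s and %(lineno)s, which are taken from the frame that issued the logging call. Routed through the static wrapper, every debug entry would have reported the wrapper's own location; calling log.debug() directly, as the following hunks do, records the actual call site.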
@@ -60,7 +70,7 @@ class PreHourlyProcessor(Processor):
         app_name = PreHourlyProcessor.get_app_name()
 
         for o in current_offsets:
-            PreHourlyProcessor.log_debug(
+            log.debug(
                 "saving: OffSetRanges: %s %s %s %s, "
                 "batch_time_info: %s" % (
                     o.topic, o.partition, o.fromOffset, o.untilOffset,
@@ -254,10 +264,9 @@ class PreHourlyProcessor(Processor):
         # get kafka topic to fetch data
         topic = PreHourlyProcessor.get_kafka_topic()
 
         offset_range_list = []
         if len(saved_offset_spec) < 1:
-
-            PreHourlyProcessor.log_debug(
+            log.debug(
                 "No saved offsets available..."
                 "connecting to kafka and fetching "
                 "from earliest available offset ...")
@@ -266,7 +275,7 @@ class PreHourlyProcessor(Processor):
                 cfg.CONF.messaging.brokers,
                 topic)
         else:
-            PreHourlyProcessor.log_debug(
+            log.debug(
                 "Saved offsets available..."
                 "connecting to kafka and fetching from saved offset ...")
 
@@ -432,9 +441,21 @@ class PreHourlyProcessor(Processor):
         transform_context = TransformContextUtils.get_context(
             transform_spec_df_info=transform_spec_df)
 
-        PreHourlyProcessor.process_instance_usage(
+        agg_inst_usage_df = PreHourlyProcessor.process_instance_usage(
             transform_context, source_instance_usage_df)
 
+        # if running in debug mode, write out the aggregated metric
+        # name just processed (along with the count of how many of these
+        # were aggregated) to the application log.
+        if log.isEnabledFor(logging.DEBUG):
+            agg_inst_usage_collection = agg_inst_usage_df.collect()
+            collection_len = len(agg_inst_usage_collection)
+            if collection_len > 0:
+                agg_inst_usage_dict = agg_inst_usage_collection[0].asDict()
+                log.debug("Submitted hourly aggregated metric: %s (%s)",
+                          agg_inst_usage_dict["aggregated_metric_name"],
+                          str(collection_len))
+
     @staticmethod
     def run_processor(spark_context, processing_time):
         """process data in metrics_pre_hourly queue, starting
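With the formatter configured above (no space between the path and line number), an hourly entry would come out roughly like the following; the timestamp, path, line number, metric name, and count shown are hypothetical, purely to illustrate the shape:

'2016-06-28 10:00:02,417 - /opt/stack/monasca-transform/pre_hourly_processor.py:453 - DEBUG - Submitted hourly aggregated metric: mem.total_mb_agg (24)'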
@@ -23,4 +23,5 @@ enable_batch_time_filtering = True
 
 [service]
 enable_record_store_df_cache = False
-record_store_df_cache_storage_level = MEMORY_ONLY_SER_2
+record_store_df_cache_storage_level = MEMORY_ONLY_SER_2
+enable_debug_log_entries = true
@@ -13,4 +13,5 @@ enable_batch_time_filtering = True
 
 [service]
 enable_record_store_df_cache = False
-record_store_df_cache_storage_level = MEMORY_ONLY_SER_2
+record_store_df_cache_storage_level = MEMORY_ONLY_SER_2
+enable_debug_log_entries = true