Send log_messages metric as bulk
Using bulk metrics for the log counters largely reduces the likelihood of blocking the Heka pipeline. Instead of injecting (x services * y levels) metric messages, the filter injects only one big message. This change also updates the configuration of the metric_collector service to deserialize the bulk metric to support alarms on log counters. Change-Id: Icb71fd6faa4191795c0470ecc24aeafd25794f42 Closes-Bug: #1643280
This commit is contained in:
parent
81962e63d4
commit
72fe1f64fe
|
@ -74,7 +74,7 @@ function add_to_bulk_metric(name, value, tags)
|
|||
end
|
||||
|
||||
-- Send the bulk metric message to the Heka pipeline
|
||||
function inject_bulk_metric(ts, hostname, source)
|
||||
function inject_bulk_metric(ts, hostname, logger, source, m_type)
|
||||
if #bulk_datapoints == 0 then
|
||||
return
|
||||
end
|
||||
|
@ -89,20 +89,23 @@ function inject_bulk_metric(ts, hostname, source)
|
|||
end
|
||||
|
||||
local msg = {
|
||||
Logger = logger,
|
||||
Hostname = hostname,
|
||||
Timestamp = ts,
|
||||
Payload = payload,
|
||||
Type = 'bulk_metric', -- prepended with 'heka.sandbox'
|
||||
Severity = label_to_severity_map.INFO,
|
||||
Fields = {
|
||||
source = source
|
||||
hostname = hostname,
|
||||
source = source,
|
||||
type = m_type or metric_type['GAUGE']
|
||||
}
|
||||
}
|
||||
-- reset the local table storing the datapoints
|
||||
bulk_datapoints = {}
|
||||
|
||||
inject_tags(msg)
|
||||
safe_inject_message(msg)
|
||||
return safe_inject_message(msg)
|
||||
end
|
||||
|
||||
-- Encode a Lua variable as JSON without raising an exception if the encoding
|
||||
|
@ -303,4 +306,9 @@ function get_values_from_metric()
|
|||
return true, value
|
||||
end
|
||||
|
||||
-- convert a nanosecond value to second
|
||||
function convert_to_sec(ns)
|
||||
return math.floor(ns/1e9)
|
||||
end
|
||||
|
||||
return M
|
||||
|
|
|
@ -31,19 +31,17 @@ end
|
|||
-- grace_interval parameter allows taking into account log messages that are
|
||||
-- received in the current interval but emitted before it.
|
||||
local grace_interval = (read_config('grace_interval') or 0) + 0
|
||||
local metric_logger = read_config('logger')
|
||||
local metric_source = read_config('source')
|
||||
|
||||
local error_counters = {}
|
||||
local enter_at
|
||||
local start_time = os.time()
|
||||
|
||||
local function convert_to_sec(ns)
|
||||
return math.floor(ns/1e9)
|
||||
end
|
||||
|
||||
function process_message ()
|
||||
-- timestamp values should be converted to seconds because log timestamps
|
||||
-- have a precision of one second (or millisecond sometimes)
|
||||
if convert_to_sec(read_message('Timestamp')) + grace_interval < math.max(convert_to_sec(enter_at or 0), start_time) then
|
||||
if utils.convert_to_sec(read_message('Timestamp')) + grace_interval < math.max(utils.convert_to_sec(enter_at or 0), start_time) then
|
||||
-- skip the log message if it doesn't fall into the current interval
|
||||
return 0
|
||||
end
|
||||
|
@ -78,18 +76,15 @@ function timer_event(ns)
|
|||
|
||||
local delta_sec = (ns - (enter_at or 0)) / 1e9
|
||||
for dev, value in pairs(error_counters) do
|
||||
-- Don't send values at the first ticker interval
|
||||
-- Don't send values from first ticker interval
|
||||
if enter_at ~= nil then
|
||||
utils.add_to_bulk_metric(
|
||||
"hdd_errors_rate",
|
||||
value / delta_sec,
|
||||
{device=dev, hostname=hostname})
|
||||
utils.add_to_bulk_metric("hdd_errors_rate", value / delta_sec, {device=dev})
|
||||
end
|
||||
error_counters[dev] = 0
|
||||
end
|
||||
|
||||
enter_at = ns
|
||||
utils.inject_bulk_metric(ns, hostname, 'hdd_errors_filter')
|
||||
utils.inject_bulk_metric(ns, hostname, metric_logger, metric_source)
|
||||
|
||||
return 0
|
||||
end
|
||||
|
|
|
@ -16,7 +16,7 @@ require 'string'
|
|||
require 'math'
|
||||
local utils = require 'lma_utils'
|
||||
|
||||
function process_table(typ, array, tags)
|
||||
function process_table(typ, array)
|
||||
-- NOTE: It has been written for "filters" and "decoders". If we need to
|
||||
-- use it to collect metrics from other components of the Heka pipeline,
|
||||
-- we need to ensure that JSON provides names and table with
|
||||
|
@ -41,7 +41,10 @@ function process_table(typ, array, tags)
|
|||
-- strip off the '_decoder'/'_filter' suffix
|
||||
local name = v['Name']:gsub("_" .. typ, "")
|
||||
|
||||
tags['name'] = name
|
||||
local tags = {
|
||||
['type'] = typ,
|
||||
['name'] = name,
|
||||
}
|
||||
|
||||
utils.add_to_bulk_metric('hekad_msg_count', v.ProcessMessageCount.value, tags)
|
||||
utils.add_to_bulk_metric('hekad_msg_avg_duration', v.ProcessMessageAvgDuration.value, tags)
|
||||
|
@ -73,11 +76,10 @@ function process_message ()
|
|||
|
||||
for k, v in pairs(data) do
|
||||
if k == "filters" or k == "decoders" then
|
||||
local typ = singularize(k)
|
||||
process_table(typ, v, {hostname=hostname, ['type']=typ})
|
||||
process_table(singularize(k), v)
|
||||
end
|
||||
end
|
||||
|
||||
utils.inject_bulk_metric(ts, hostname, 'heka_monitoring')
|
||||
utils.inject_bulk_metric(ts, hostname, 'heka_monitoring', 'internal')
|
||||
return 0
|
||||
end
|
||||
|
|
|
@ -34,11 +34,12 @@ local percentile_thresh = (read_config('percentile') or 90) + 0
|
|||
-- and also to compensate the delay introduced by log parsing/decoding
|
||||
-- which leads to arrive too late in its interval.
|
||||
local grace_time = (read_config('grace_time') or 0) + 0
|
||||
local metric_logger = read_config('logger')
|
||||
local metric_source = read_config('source')
|
||||
|
||||
local inject_reached_error = 'too many metrics to aggregate, adjust bulk_size and/or max_timer_inject parameters'
|
||||
|
||||
local percentile_field_name = string.format('upper_%s', percentile_thresh)
|
||||
local msg_source = 'http_metric_filter'
|
||||
local last_tick = os.time() * 1e9
|
||||
local interval_in_ns = interval * 1e9
|
||||
|
||||
|
@ -84,7 +85,7 @@ function process_message ()
|
|||
|
||||
-- keep only the first 2 tokens because some services like Neutron report
|
||||
-- themselves as 'openstack.<service>.server'
|
||||
local service = string.gsub(read_message("Logger"), '(%w+)%.(%w+).*', '%1_%2')
|
||||
local service = string.gsub(logger, '(%w+)%.(%w+).*', '%1_%2')
|
||||
if service == nil then
|
||||
return -1, "Cannot match any service from " .. logger
|
||||
end
|
||||
|
@ -166,7 +167,7 @@ function timer_event(ns)
|
|||
num_metrics = num_metrics - 1
|
||||
if num >= bulk_size then
|
||||
if msg_injected < max_timer_inject then
|
||||
utils.inject_bulk_metric(ns, hostname, msg_source)
|
||||
utils.inject_bulk_metric(ns, hostname, metric_logger, metric_source)
|
||||
msg_injected = msg_injected + 1
|
||||
num = 0
|
||||
num_metrics = 0
|
||||
|
@ -178,7 +179,7 @@ function timer_event(ns)
|
|||
all_times[service] = nil
|
||||
end
|
||||
if num > 0 then
|
||||
utils.inject_bulk_metric(ns, hostname, msg_source)
|
||||
utils.inject_bulk_metric(ns, hostname, metric_logger, metric_source)
|
||||
num = 0
|
||||
num_metrics = 0
|
||||
end
|
||||
|
|
|
@ -18,41 +18,33 @@ require 'string'
|
|||
local utils = require 'lma_utils'
|
||||
|
||||
local hostname = read_config('hostname') or error('hostname must be specified')
|
||||
local interval = (read_config('interval') or error('interval must be specified')) + 0
|
||||
-- Heka cannot guarantee that logs are processed in real-time so the
|
||||
-- grace_interval parameter allows taking into account log messages that are
|
||||
-- received in the current interval but emitted before it.
|
||||
-- The filter can receive messages that should be discarded because they are
|
||||
-- way too old (Heka cannot guarantee that logs are processed in real-time).
|
||||
-- The 'grace_interval' parameter allows defining which log messages should be
|
||||
-- kept and which should be discarded. For instance, a value of '10' means that
|
||||
-- the filter will take into account log messages that are at most 10 seconds
|
||||
-- older than the current time.
|
||||
local grace_interval = (read_config('grace_interval') or 0) + 0
|
||||
local logger_matcher = read_config('logger_matcher') or '.*'
|
||||
local metric_logger = read_config('logger')
|
||||
local metric_source = read_config('source')
|
||||
|
||||
local discovered_services = {}
|
||||
local logs_counters = {}
|
||||
local last_timer_events = {}
|
||||
local current_service = 1
|
||||
local enter_at
|
||||
local interval_in_ns = interval * 1e9
|
||||
local start_time = os.time()
|
||||
local msg = {
|
||||
Type = "metric",
|
||||
Timestamp = nil,
|
||||
Severity = 6,
|
||||
}
|
||||
|
||||
function convert_to_sec(ns)
|
||||
return math.floor(ns/1e9)
|
||||
end
|
||||
local last_timer_event = os.time() * 1e9
|
||||
|
||||
function process_message ()
|
||||
local severity = read_message("Fields[severity_label]")
|
||||
local logger = read_message("Logger")
|
||||
|
||||
local service = string.match(logger, "^openstack%.(%a+)$")
|
||||
local service = string.match(logger, logger_matcher)
|
||||
if service == nil then
|
||||
return -1, "Cannot match any services from " .. logger
|
||||
return -1, "Cannot match any service from " .. logger
|
||||
end
|
||||
|
||||
-- timestamp values should be converted to seconds because log timestamps
|
||||
-- have a precision of one second (or millisecond sometimes)
|
||||
if convert_to_sec(read_message('Timestamp')) + grace_interval < math.max(convert_to_sec(last_timer_events[service] or 0), start_time) then
|
||||
if utils.convert_to_sec(read_message('Timestamp')) + grace_interval < utils.convert_to_sec(last_timer_event) then
|
||||
-- skip the log message if it doesn't fall into the current interval
|
||||
return 0
|
||||
end
|
||||
|
@ -67,67 +59,29 @@ function process_message ()
|
|||
end
|
||||
|
||||
logs_counters[service][severity] = logs_counters[service][severity] + 1
|
||||
|
||||
return 0
|
||||
end
|
||||
|
||||
function timer_event(ns)
|
||||
|
||||
-- We can only send a maximum of ten events per call.
|
||||
-- So we send all metrics about one service and we will proceed with
|
||||
-- the following services at the next ticker event.
|
||||
|
||||
if #discovered_services == 0 then
|
||||
return 0
|
||||
end
|
||||
|
||||
-- Initialize enter_at during the first call to timer_event
|
||||
if not enter_at then
|
||||
enter_at = ns
|
||||
end
|
||||
|
||||
-- To be able to send a metric we need to check if we are within the
|
||||
-- interval specified in the configuration and if we haven't already sent
|
||||
-- all metrics.
|
||||
if ns - enter_at < interval_in_ns and current_service <= #discovered_services then
|
||||
local service_name = discovered_services[current_service]
|
||||
local last_timer_event = last_timer_events[service_name] or 0
|
||||
for service, counters in pairs(logs_counters) do
|
||||
local delta_sec = (ns - last_timer_event) / 1e9
|
||||
|
||||
for level, val in pairs(logs_counters[service_name]) do
|
||||
|
||||
-- We don't send the first value
|
||||
if last_timer_event ~= 0 and delta_sec ~= 0 then
|
||||
msg.Timestamp = ns
|
||||
msg.Fields = {
|
||||
name = 'log_messages',
|
||||
type = utils.metric_type['DERIVE'],
|
||||
value = val / delta_sec,
|
||||
service = service_name,
|
||||
level = string.lower(level),
|
||||
hostname = hostname,
|
||||
tag_fields = {'service', 'level', 'hostname'},
|
||||
}
|
||||
|
||||
utils.inject_tags(msg)
|
||||
ok, err = utils.safe_inject_message(msg)
|
||||
if ok ~= 0 then
|
||||
return -1, err
|
||||
end
|
||||
end
|
||||
|
||||
-- reset the counter
|
||||
logs_counters[service_name][level] = 0
|
||||
for level, val in pairs(counters) do
|
||||
utils.add_to_bulk_metric(
|
||||
'log_messages',
|
||||
val / delta_sec,
|
||||
{hostname=hostname, service=service, level=string.lower(level)})
|
||||
|
||||
-- reset the counter
|
||||
counters[level] = 0
|
||||
end
|
||||
|
||||
last_timer_events[service_name] = ns
|
||||
current_service = current_service + 1
|
||||
end
|
||||
|
||||
if ns - enter_at >= interval_in_ns then
|
||||
enter_at = ns
|
||||
current_service = 1
|
||||
last_timer_event = ns
|
||||
|
||||
ok, err = utils.inject_bulk_metric(ns, hostname, metric_logger, metric_source, utils.metric_type['DERIVE'])
|
||||
if ok ~= 0 then
|
||||
return -1, err
|
||||
end
|
||||
|
||||
return 0
|
||||
|
|
|
@ -65,7 +65,8 @@ define lma_collector::heka (
|
|||
config_dir => $config_dir,
|
||||
filename => "${lma_collector::params::plugins_dir}/decoders/metric.lua",
|
||||
module_directory => $lua_modules_dir,
|
||||
config => {'deserialize_bulk_metric_for_loggers' => 'aggregated_http_metrics_filter hdd_errors_counter_filter'},
|
||||
config => {
|
||||
'deserialize_bulk_metric_for_loggers' => 'aggregated_http_metrics_filter hdd_errors_counter_filter log_counter_filter'},
|
||||
notify => Class[$service_class],
|
||||
}
|
||||
|
||||
|
|
|
@ -37,6 +37,8 @@ class lma_collector::logs::aggregated_http_metrics (
|
|||
bulk_size => $bulk_size,
|
||||
percentile => $percentile,
|
||||
grace_time => $grace_time,
|
||||
logger => 'aggregated_http_metrics_filter',
|
||||
source => 'log_collector',
|
||||
},
|
||||
module_directory => $lua_modules_dir,
|
||||
notify => Class['lma_collector::service::log'],
|
||||
|
|
|
@ -26,12 +26,15 @@ class lma_collector::logs::counter (
|
|||
config_dir => $lma_collector::params::log_config_dir,
|
||||
filename => "${lma_collector::params::plugins_dir}/filters/logs_counter.lua",
|
||||
message_matcher => 'Type == \'log\' && Logger =~ /^openstack\\./',
|
||||
ticker_interval => 1,
|
||||
ticker_interval => 60,
|
||||
preserve_data => true,
|
||||
config => {
|
||||
interval => $interval,
|
||||
hostname => $hostname,
|
||||
grace_interval => $grace_interval,
|
||||
logger_matcher => '^openstack%.(%a+)$',
|
||||
logger => 'log_counter_filter',
|
||||
source => 'log_collector',
|
||||
},
|
||||
module_directory => $lua_modules_dir,
|
||||
notify => Class['lma_collector::service::log'],
|
||||
|
|
|
@ -30,7 +30,9 @@ class lma_collector::logs::hdd_errors_counter (
|
|||
config => {
|
||||
hostname => $hostname,
|
||||
grace_interval => $grace_interval,
|
||||
patterns => '/error%s.+([sv]d[a-z][a-z]?)%d?/ /([sv]d[a-z][a-z]?)%d?.+%serror/'
|
||||
patterns => '/error%s.+([sv]d[a-z][a-z]?)%d?/ /([sv]d[a-z][a-z]?)%d?.+%serror/',
|
||||
logger => 'hdd_errors_counter_filter',
|
||||
source => 'log_collector',
|
||||
},
|
||||
module_directory => $lua_modules_dir,
|
||||
notify => Class['lma_collector::service::log'],
|
||||
|
|
|
@ -91,6 +91,12 @@ TestLmaUtils = {}
|
|||
assertEquals(ret, 'foo<BR/>ba')
|
||||
end
|
||||
|
||||
function TestLmaUtils:test_convert_to_sec()
|
||||
assertEquals(lma_utils.convert_to_sec(1000000001), 1)
|
||||
assertEquals(lma_utils.convert_to_sec(1999999999), 1)
|
||||
assertEquals(lma_utils.convert_to_sec(2000000001), 2)
|
||||
end
|
||||
|
||||
lu = LuaUnit
|
||||
lu:setVerbosity( 1 )
|
||||
os.exit( lu:run() )
|
||||
|
|
Loading…
Reference in New Issue