Send log_messages metric as bulk
Using bulk metrics for the log counters largely reduces the likelihood of blocking the Heka pipeline. Instead of injecting (x services * y levels) metric messages, the filter injects only one big message. This change also updates the configuration of the metric_collector service to deserialize the bulk metric to support alarms on log counters. Change-Id: Icb71fd6faa4191795c0470ecc24aeafd25794f42 Closes-Bug: #1643280
This commit is contained in:
parent
81962e63d4
commit
72fe1f64fe
|
@ -74,7 +74,7 @@ function add_to_bulk_metric(name, value, tags)
|
||||||
end
|
end
|
||||||
|
|
||||||
-- Send the bulk metric message to the Heka pipeline
|
-- Send the bulk metric message to the Heka pipeline
|
||||||
function inject_bulk_metric(ts, hostname, source)
|
function inject_bulk_metric(ts, hostname, logger, source, m_type)
|
||||||
if #bulk_datapoints == 0 then
|
if #bulk_datapoints == 0 then
|
||||||
return
|
return
|
||||||
end
|
end
|
||||||
|
@ -89,20 +89,23 @@ function inject_bulk_metric(ts, hostname, source)
|
||||||
end
|
end
|
||||||
|
|
||||||
local msg = {
|
local msg = {
|
||||||
|
Logger = logger,
|
||||||
Hostname = hostname,
|
Hostname = hostname,
|
||||||
Timestamp = ts,
|
Timestamp = ts,
|
||||||
Payload = payload,
|
Payload = payload,
|
||||||
Type = 'bulk_metric', -- prepended with 'heka.sandbox'
|
Type = 'bulk_metric', -- prepended with 'heka.sandbox'
|
||||||
Severity = label_to_severity_map.INFO,
|
Severity = label_to_severity_map.INFO,
|
||||||
Fields = {
|
Fields = {
|
||||||
source = source
|
hostname = hostname,
|
||||||
|
source = source,
|
||||||
|
type = m_type or metric_type['GAUGE']
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
-- reset the local table storing the datapoints
|
-- reset the local table storing the datapoints
|
||||||
bulk_datapoints = {}
|
bulk_datapoints = {}
|
||||||
|
|
||||||
inject_tags(msg)
|
inject_tags(msg)
|
||||||
safe_inject_message(msg)
|
return safe_inject_message(msg)
|
||||||
end
|
end
|
||||||
|
|
||||||
-- Encode a Lua variable as JSON without raising an exception if the encoding
|
-- Encode a Lua variable as JSON without raising an exception if the encoding
|
||||||
|
@ -303,4 +306,9 @@ function get_values_from_metric()
|
||||||
return true, value
|
return true, value
|
||||||
end
|
end
|
||||||
|
|
||||||
|
-- convert a nanosecond value to second
|
||||||
|
function convert_to_sec(ns)
|
||||||
|
return math.floor(ns/1e9)
|
||||||
|
end
|
||||||
|
|
||||||
return M
|
return M
|
||||||
|
|
|
@ -31,19 +31,17 @@ end
|
||||||
-- grace_interval parameter allows to take into account log messages that are
|
-- grace_interval parameter allows to take into account log messages that are
|
||||||
-- received in the current interval but emitted before it.
|
-- received in the current interval but emitted before it.
|
||||||
local grace_interval = (read_config('grace_interval') or 0) + 0
|
local grace_interval = (read_config('grace_interval') or 0) + 0
|
||||||
|
local metric_logger = read_config('logger')
|
||||||
|
local metric_source = read_config('source')
|
||||||
|
|
||||||
local error_counters = {}
|
local error_counters = {}
|
||||||
local enter_at
|
local enter_at
|
||||||
local start_time = os.time()
|
local start_time = os.time()
|
||||||
|
|
||||||
local function convert_to_sec(ns)
|
|
||||||
return math.floor(ns/1e9)
|
|
||||||
end
|
|
||||||
|
|
||||||
function process_message ()
|
function process_message ()
|
||||||
-- timestamp values should be converted to seconds because log timestamps
|
-- timestamp values should be converted to seconds because log timestamps
|
||||||
-- have a precision of one second (or millisecond sometimes)
|
-- have a precision of one second (or millisecond sometimes)
|
||||||
if convert_to_sec(read_message('Timestamp')) + grace_interval < math.max(convert_to_sec(enter_at or 0), start_time) then
|
if utils.convert_to_sec(read_message('Timestamp')) + grace_interval < math.max(utils.convert_to_sec(enter_at or 0), start_time) then
|
||||||
-- skip the log message if it doesn't fall into the current interval
|
-- skip the log message if it doesn't fall into the current interval
|
||||||
return 0
|
return 0
|
||||||
end
|
end
|
||||||
|
@ -78,18 +76,15 @@ function timer_event(ns)
|
||||||
|
|
||||||
local delta_sec = (ns - (enter_at or 0)) / 1e9
|
local delta_sec = (ns - (enter_at or 0)) / 1e9
|
||||||
for dev, value in pairs(error_counters) do
|
for dev, value in pairs(error_counters) do
|
||||||
-- Don`t send values at the first ticker interval
|
-- Don`t send values from first ticker interval
|
||||||
if enter_at ~= nil then
|
if enter_at ~= nil then
|
||||||
utils.add_to_bulk_metric(
|
utils.add_to_bulk_metric("hdd_errors_rate", value / delta_sec, {device=dev})
|
||||||
"hdd_errors_rate",
|
|
||||||
value / delta_sec,
|
|
||||||
{device=dev, hostname=hostname})
|
|
||||||
end
|
end
|
||||||
error_counters[dev] = 0
|
error_counters[dev] = 0
|
||||||
end
|
end
|
||||||
|
|
||||||
enter_at = ns
|
enter_at = ns
|
||||||
utils.inject_bulk_metric(ns, hostname, 'hdd_errors_filter')
|
utils.inject_bulk_metric(ns, hostname, metric_logger, metric_source)
|
||||||
|
|
||||||
return 0
|
return 0
|
||||||
end
|
end
|
||||||
|
|
|
@ -16,7 +16,7 @@ require 'string'
|
||||||
require 'math'
|
require 'math'
|
||||||
local utils = require 'lma_utils'
|
local utils = require 'lma_utils'
|
||||||
|
|
||||||
function process_table(typ, array, tags)
|
function process_table(typ, array)
|
||||||
-- NOTE: It has been written for "filters" and "decoders". If we need to
|
-- NOTE: It has been written for "filters" and "decoders". If we need to
|
||||||
-- use it to collect metrics from other components of the Heka pipeline,
|
-- use it to collect metrics from other components of the Heka pipeline,
|
||||||
-- we need to ensure that JSON provides names and table with
|
-- we need to ensure that JSON provides names and table with
|
||||||
|
@ -41,7 +41,10 @@ function process_table(typ, array, tags)
|
||||||
-- strip off the '_decoder'/'_filter' suffix
|
-- strip off the '_decoder'/'_filter' suffix
|
||||||
local name = v['Name']:gsub("_" .. typ, "")
|
local name = v['Name']:gsub("_" .. typ, "")
|
||||||
|
|
||||||
tags['name'] = name
|
local tags = {
|
||||||
|
['type'] = typ,
|
||||||
|
['name'] = name,
|
||||||
|
}
|
||||||
|
|
||||||
utils.add_to_bulk_metric('hekad_msg_count', v.ProcessMessageCount.value, tags)
|
utils.add_to_bulk_metric('hekad_msg_count', v.ProcessMessageCount.value, tags)
|
||||||
utils.add_to_bulk_metric('hekad_msg_avg_duration', v.ProcessMessageAvgDuration.value, tags)
|
utils.add_to_bulk_metric('hekad_msg_avg_duration', v.ProcessMessageAvgDuration.value, tags)
|
||||||
|
@ -73,11 +76,10 @@ function process_message ()
|
||||||
|
|
||||||
for k, v in pairs(data) do
|
for k, v in pairs(data) do
|
||||||
if k == "filters" or k == "decoders" then
|
if k == "filters" or k == "decoders" then
|
||||||
local typ = singularize(k)
|
process_table(singularize(k), v)
|
||||||
process_table(typ, v, {hostname=hostname, ['type']=typ})
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
utils.inject_bulk_metric(ts, hostname, 'heka_monitoring')
|
utils.inject_bulk_metric(ts, hostname, 'heka_monitoring', 'internal')
|
||||||
return 0
|
return 0
|
||||||
end
|
end
|
||||||
|
|
|
@ -34,11 +34,12 @@ local percentile_thresh = (read_config('percentile') or 90) + 0
|
||||||
-- and also to compensate the delay introduced by log parsing/decoding
|
-- and also to compensate the delay introduced by log parsing/decoding
|
||||||
-- which leads to arrive too late in its interval.
|
-- which leads to arrive too late in its interval.
|
||||||
local grace_time = (read_config('grace_time') or 0) + 0
|
local grace_time = (read_config('grace_time') or 0) + 0
|
||||||
|
local metric_logger = read_config('logger')
|
||||||
|
local metric_source = read_config('source')
|
||||||
|
|
||||||
local inject_reached_error = 'too many metrics to aggregate, adjust bulk_size and/or max_timer_inject parameters'
|
local inject_reached_error = 'too many metrics to aggregate, adjust bulk_size and/or max_timer_inject parameters'
|
||||||
|
|
||||||
local percentile_field_name = string.format('upper_%s', percentile_thresh)
|
local percentile_field_name = string.format('upper_%s', percentile_thresh)
|
||||||
local msg_source = 'http_metric_filter'
|
|
||||||
local last_tick = os.time() * 1e9
|
local last_tick = os.time() * 1e9
|
||||||
local interval_in_ns = interval * 1e9
|
local interval_in_ns = interval * 1e9
|
||||||
|
|
||||||
|
@ -84,7 +85,7 @@ function process_message ()
|
||||||
|
|
||||||
-- keep only the first 2 tokens because some services like Neutron report
|
-- keep only the first 2 tokens because some services like Neutron report
|
||||||
-- themselves as 'openstack.<service>.server'
|
-- themselves as 'openstack.<service>.server'
|
||||||
local service = string.gsub(read_message("Logger"), '(%w+)%.(%w+).*', '%1_%2')
|
local service = string.gsub(logger, '(%w+)%.(%w+).*', '%1_%2')
|
||||||
if service == nil then
|
if service == nil then
|
||||||
return -1, "Cannot match any service from " .. logger
|
return -1, "Cannot match any service from " .. logger
|
||||||
end
|
end
|
||||||
|
@ -166,7 +167,7 @@ function timer_event(ns)
|
||||||
num_metrics = num_metrics - 1
|
num_metrics = num_metrics - 1
|
||||||
if num >= bulk_size then
|
if num >= bulk_size then
|
||||||
if msg_injected < max_timer_inject then
|
if msg_injected < max_timer_inject then
|
||||||
utils.inject_bulk_metric(ns, hostname, msg_source)
|
utils.inject_bulk_metric(ns, hostname, metric_logger, metric_source)
|
||||||
msg_injected = msg_injected + 1
|
msg_injected = msg_injected + 1
|
||||||
num = 0
|
num = 0
|
||||||
num_metrics = 0
|
num_metrics = 0
|
||||||
|
@ -178,7 +179,7 @@ function timer_event(ns)
|
||||||
all_times[service] = nil
|
all_times[service] = nil
|
||||||
end
|
end
|
||||||
if num > 0 then
|
if num > 0 then
|
||||||
utils.inject_bulk_metric(ns, hostname, msg_source)
|
utils.inject_bulk_metric(ns, hostname, metric_logger, metric_source)
|
||||||
num = 0
|
num = 0
|
||||||
num_metrics = 0
|
num_metrics = 0
|
||||||
end
|
end
|
||||||
|
|
|
@ -18,41 +18,33 @@ require 'string'
|
||||||
local utils = require 'lma_utils'
|
local utils = require 'lma_utils'
|
||||||
|
|
||||||
local hostname = read_config('hostname') or error('hostname must be specified')
|
local hostname = read_config('hostname') or error('hostname must be specified')
|
||||||
local interval = (read_config('interval') or error('interval must be specified')) + 0
|
-- The filter can receive messages that should be discarded because they are
|
||||||
-- Heka cannot guarantee that logs are processed in real-time so the
|
-- way too old (Heka cannot guarantee that logs are processed in real-time).
|
||||||
-- grace_interval parameter allows to take into account log messages that are
|
-- The 'grace_interval' parameter allows to define which log messages should be
|
||||||
-- received in the current interval but emitted before it.
|
-- kept and which should be discarded. For instance, a value of '10' means that
|
||||||
|
-- the filter will take into account log messages that are at most 10 seconds
|
||||||
|
-- older than the current time.
|
||||||
local grace_interval = (read_config('grace_interval') or 0) + 0
|
local grace_interval = (read_config('grace_interval') or 0) + 0
|
||||||
|
local logger_matcher = read_config('logger_matcher') or '.*'
|
||||||
|
local metric_logger = read_config('logger')
|
||||||
|
local metric_source = read_config('source')
|
||||||
|
|
||||||
local discovered_services = {}
|
local discovered_services = {}
|
||||||
local logs_counters = {}
|
local logs_counters = {}
|
||||||
local last_timer_events = {}
|
local last_timer_event = os.time() * 1e9
|
||||||
local current_service = 1
|
|
||||||
local enter_at
|
|
||||||
local interval_in_ns = interval * 1e9
|
|
||||||
local start_time = os.time()
|
|
||||||
local msg = {
|
|
||||||
Type = "metric",
|
|
||||||
Timestamp = nil,
|
|
||||||
Severity = 6,
|
|
||||||
}
|
|
||||||
|
|
||||||
function convert_to_sec(ns)
|
|
||||||
return math.floor(ns/1e9)
|
|
||||||
end
|
|
||||||
|
|
||||||
function process_message ()
|
function process_message ()
|
||||||
local severity = read_message("Fields[severity_label]")
|
local severity = read_message("Fields[severity_label]")
|
||||||
local logger = read_message("Logger")
|
local logger = read_message("Logger")
|
||||||
|
|
||||||
local service = string.match(logger, "^openstack%.(%a+)$")
|
local service = string.match(logger, logger_matcher)
|
||||||
if service == nil then
|
if service == nil then
|
||||||
return -1, "Cannot match any services from " .. logger
|
return -1, "Cannot match any service from " .. logger
|
||||||
end
|
end
|
||||||
|
|
||||||
-- timestamp values should be converted to seconds because log timestamps
|
-- timestamp values should be converted to seconds because log timestamps
|
||||||
-- have a precision of one second (or millisecond sometimes)
|
-- have a precision of one second (or millisecond sometimes)
|
||||||
if convert_to_sec(read_message('Timestamp')) + grace_interval < math.max(convert_to_sec(last_timer_events[service] or 0), start_time) then
|
if utils.convert_to_sec(read_message('Timestamp')) + grace_interval < utils.convert_to_sec(last_timer_event) then
|
||||||
-- skip the the log message if it doesn't fall into the current interval
|
-- skip the the log message if it doesn't fall into the current interval
|
||||||
return 0
|
return 0
|
||||||
end
|
end
|
||||||
|
@ -67,67 +59,29 @@ function process_message ()
|
||||||
end
|
end
|
||||||
|
|
||||||
logs_counters[service][severity] = logs_counters[service][severity] + 1
|
logs_counters[service][severity] = logs_counters[service][severity] + 1
|
||||||
|
|
||||||
return 0
|
return 0
|
||||||
end
|
end
|
||||||
|
|
||||||
function timer_event(ns)
|
function timer_event(ns)
|
||||||
|
for service, counters in pairs(logs_counters) do
|
||||||
-- We can only send a maximum of ten events per call.
|
|
||||||
-- So we send all metrics about one service and we will proceed with
|
|
||||||
-- the following services at the next ticker event.
|
|
||||||
|
|
||||||
if #discovered_services == 0 then
|
|
||||||
return 0
|
|
||||||
end
|
|
||||||
|
|
||||||
-- Initialize enter_at during the first call to timer_event
|
|
||||||
if not enter_at then
|
|
||||||
enter_at = ns
|
|
||||||
end
|
|
||||||
|
|
||||||
-- To be able to send a metric we need to check if we are within the
|
|
||||||
-- interval specified in the configuration and if we haven't already sent
|
|
||||||
-- all metrics.
|
|
||||||
if ns - enter_at < interval_in_ns and current_service <= #discovered_services then
|
|
||||||
local service_name = discovered_services[current_service]
|
|
||||||
local last_timer_event = last_timer_events[service_name] or 0
|
|
||||||
local delta_sec = (ns - last_timer_event) / 1e9
|
local delta_sec = (ns - last_timer_event) / 1e9
|
||||||
|
|
||||||
for level, val in pairs(logs_counters[service_name]) do
|
for level, val in pairs(counters) do
|
||||||
|
utils.add_to_bulk_metric(
|
||||||
-- We don't send the first value
|
'log_messages',
|
||||||
if last_timer_event ~= 0 and delta_sec ~= 0 then
|
val / delta_sec,
|
||||||
msg.Timestamp = ns
|
{hostname=hostname, service=service, level=string.lower(level)})
|
||||||
msg.Fields = {
|
|
||||||
name = 'log_messages',
|
|
||||||
type = utils.metric_type['DERIVE'],
|
|
||||||
value = val / delta_sec,
|
|
||||||
service = service_name,
|
|
||||||
level = string.lower(level),
|
|
||||||
hostname = hostname,
|
|
||||||
tag_fields = {'service', 'level', 'hostname'},
|
|
||||||
}
|
|
||||||
|
|
||||||
utils.inject_tags(msg)
|
|
||||||
ok, err = utils.safe_inject_message(msg)
|
|
||||||
if ok ~= 0 then
|
|
||||||
return -1, err
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
-- reset the counter
|
|
||||||
logs_counters[service_name][level] = 0
|
|
||||||
|
|
||||||
|
-- reset the counter
|
||||||
|
counters[level] = 0
|
||||||
end
|
end
|
||||||
|
|
||||||
last_timer_events[service_name] = ns
|
|
||||||
current_service = current_service + 1
|
|
||||||
end
|
end
|
||||||
|
|
||||||
if ns - enter_at >= interval_in_ns then
|
last_timer_event = ns
|
||||||
enter_at = ns
|
|
||||||
current_service = 1
|
ok, err = utils.inject_bulk_metric(ns, hostname, metric_logger, metric_source, utils.metric_type['DERIVE'])
|
||||||
|
if ok ~= 0 then
|
||||||
|
return -1, err
|
||||||
end
|
end
|
||||||
|
|
||||||
return 0
|
return 0
|
||||||
|
|
|
@ -65,7 +65,8 @@ define lma_collector::heka (
|
||||||
config_dir => $config_dir,
|
config_dir => $config_dir,
|
||||||
filename => "${lma_collector::params::plugins_dir}/decoders/metric.lua",
|
filename => "${lma_collector::params::plugins_dir}/decoders/metric.lua",
|
||||||
module_directory => $lua_modules_dir,
|
module_directory => $lua_modules_dir,
|
||||||
config => {'deserialize_bulk_metric_for_loggers' => 'aggregated_http_metrics_filter hdd_errors_counter_filter'},
|
config => {
|
||||||
|
'deserialize_bulk_metric_for_loggers' => 'aggregated_http_metrics_filter hdd_errors_counter_filter log_counter_filter'},
|
||||||
notify => Class[$service_class],
|
notify => Class[$service_class],
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -37,6 +37,8 @@ class lma_collector::logs::aggregated_http_metrics (
|
||||||
bulk_size => $bulk_size,
|
bulk_size => $bulk_size,
|
||||||
percentile => $percentile,
|
percentile => $percentile,
|
||||||
grace_time => $grace_time,
|
grace_time => $grace_time,
|
||||||
|
logger => 'aggregated_http_metrics_filter',
|
||||||
|
source => 'log_collector',
|
||||||
},
|
},
|
||||||
module_directory => $lua_modules_dir,
|
module_directory => $lua_modules_dir,
|
||||||
notify => Class['lma_collector::service::log'],
|
notify => Class['lma_collector::service::log'],
|
||||||
|
|
|
@ -26,12 +26,15 @@ class lma_collector::logs::counter (
|
||||||
config_dir => $lma_collector::params::log_config_dir,
|
config_dir => $lma_collector::params::log_config_dir,
|
||||||
filename => "${lma_collector::params::plugins_dir}/filters/logs_counter.lua",
|
filename => "${lma_collector::params::plugins_dir}/filters/logs_counter.lua",
|
||||||
message_matcher => 'Type == \'log\' && Logger =~ /^openstack\\./',
|
message_matcher => 'Type == \'log\' && Logger =~ /^openstack\\./',
|
||||||
ticker_interval => 1,
|
ticker_interval => 60,
|
||||||
preserve_data => true,
|
preserve_data => true,
|
||||||
config => {
|
config => {
|
||||||
interval => $interval,
|
interval => $interval,
|
||||||
hostname => $hostname,
|
hostname => $hostname,
|
||||||
grace_interval => $grace_interval,
|
grace_interval => $grace_interval,
|
||||||
|
logger_matcher => '^openstack%.(%a+)$',
|
||||||
|
logger => 'log_counter_filter',
|
||||||
|
source => 'log_collector',
|
||||||
},
|
},
|
||||||
module_directory => $lua_modules_dir,
|
module_directory => $lua_modules_dir,
|
||||||
notify => Class['lma_collector::service::log'],
|
notify => Class['lma_collector::service::log'],
|
||||||
|
|
|
@ -30,7 +30,9 @@ class lma_collector::logs::hdd_errors_counter (
|
||||||
config => {
|
config => {
|
||||||
hostname => $hostname,
|
hostname => $hostname,
|
||||||
grace_interval => $grace_interval,
|
grace_interval => $grace_interval,
|
||||||
patterns => '/error%s.+([sv]d[a-z][a-z]?)%d?/ /([sv]d[a-z][a-z]?)%d?.+%serror/'
|
patterns => '/error%s.+([sv]d[a-z][a-z]?)%d?/ /([sv]d[a-z][a-z]?)%d?.+%serror/',
|
||||||
|
logger => 'hdd_errors_counter_filter',
|
||||||
|
source => 'log_collector',
|
||||||
},
|
},
|
||||||
module_directory => $lua_modules_dir,
|
module_directory => $lua_modules_dir,
|
||||||
notify => Class['lma_collector::service::log'],
|
notify => Class['lma_collector::service::log'],
|
||||||
|
|
|
@ -91,6 +91,12 @@ TestLmaUtils = {}
|
||||||
assertEquals(ret, 'foo<BR/>ba')
|
assertEquals(ret, 'foo<BR/>ba')
|
||||||
end
|
end
|
||||||
|
|
||||||
|
function TestLmaUtils:test_convert_to_sec()
|
||||||
|
assertEquals(lma_utils.convert_to_sec(1000000001), 1)
|
||||||
|
assertEquals(lma_utils.convert_to_sec(1999999999), 1)
|
||||||
|
assertEquals(lma_utils.convert_to_sec(2000000001), 2)
|
||||||
|
end
|
||||||
|
|
||||||
lu = LuaUnit
|
lu = LuaUnit
|
||||||
lu:setVerbosity( 1 )
|
lu:setVerbosity( 1 )
|
||||||
os.exit( lu:run() )
|
os.exit( lu:run() )
|
||||||
|
|
Loading…
Reference in New Issue