Send log_messages metric as bulk

Using bulk metrics for the log counters greatly reduces the likelihood
of blocking the Heka pipeline. Instead of injecting (x services
* y levels) individual metric messages, the filter injects only one bulk message.

This change also updates the configuration of the metric_collector
service so that it deserializes the bulk metric, which is required to keep
supporting alarms on log counters.
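
As a rough, illustrative sketch (not the exact filter code), the new flow buffers
one datapoint per (service, level) pair and flushes everything with a single
injection. The 'log_counter_filter' logger and 'log_collector' source names come
from the Puppet configuration further down; the counter table and the omitted
rate computation are simplifications:

local utils = require 'lma_utils'
local hostname = read_config('hostname')
-- e.g. logs_counters['nova']['ERROR'] = 12, filled in process_message()
local logs_counters = {}

function timer_event(ns)
    for service, counters in pairs(logs_counters) do
        for level, val in pairs(counters) do
            -- buffer one datapoint per service/level in lma_utils
            utils.add_to_bulk_metric(
                'log_messages', val,
                {hostname=hostname, service=service, level=string.lower(level)})
            counters[level] = 0
        end
    end
    -- one bulk_metric message instead of (services * levels) messages
    local ok, err = utils.inject_bulk_metric(
        ns, hostname, 'log_counter_filter', 'log_collector',
        utils.metric_type['DERIVE'])
    if ok ~= 0 then
        return -1, err
    end
    return 0
end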

Change-Id: Icb71fd6faa4191795c0470ecc24aeafd25794f42
Closes-Bug: #1643280
Simon Pasquier 2017-01-06 15:01:59 +01:00
parent 81962e63d4
commit 72fe1f64fe
10 changed files with 72 additions and 98 deletions

View File

@@ -74,7 +74,7 @@ function add_to_bulk_metric(name, value, tags)
end
-- Send the bulk metric message to the Heka pipeline
function inject_bulk_metric(ts, hostname, source)
function inject_bulk_metric(ts, hostname, logger, source, m_type)
if #bulk_datapoints == 0 then
return
end
@@ -89,20 +89,23 @@ function inject_bulk_metric(ts, hostname, source)
end
local msg = {
Logger = logger,
Hostname = hostname,
Timestamp = ts,
Payload = payload,
Type = 'bulk_metric', -- prepended with 'heka.sandbox'
Severity = label_to_severity_map.INFO,
Fields = {
source = source
hostname = hostname,
source = source,
type = m_type or metric_type['GAUGE']
}
}
-- reset the local table storing the datapoints
bulk_datapoints = {}
inject_tags(msg)
safe_inject_message(msg)
return safe_inject_message(msg)
end
-- Encode a Lua variable as JSON without raising an exception if the encoding
@@ -303,4 +306,9 @@ function get_values_from_metric()
return true, value
end
-- convert a nanosecond value to second
function convert_to_sec(ns)
return math.floor(ns/1e9)
end
return M

View File

@@ -31,19 +31,17 @@ end
-- grace_interval parameter allows to take into account log messages that are
-- received in the current interval but emitted before it.
local grace_interval = (read_config('grace_interval') or 0) + 0
local metric_logger = read_config('logger')
local metric_source = read_config('source')
local error_counters = {}
local enter_at
local start_time = os.time()
local function convert_to_sec(ns)
return math.floor(ns/1e9)
end
function process_message ()
-- timestamp values should be converted to seconds because log timestamps
-- have a precision of one second (or millisecond sometimes)
if convert_to_sec(read_message('Timestamp')) + grace_interval < math.max(convert_to_sec(enter_at or 0), start_time) then
if utils.convert_to_sec(read_message('Timestamp')) + grace_interval < math.max(utils.convert_to_sec(enter_at or 0), start_time) then
-- skip the log message if it doesn't fall into the current interval
return 0
end
@@ -78,18 +76,15 @@ function timer_event(ns)
local delta_sec = (ns - (enter_at or 0)) / 1e9
for dev, value in pairs(error_counters) do
-- Don't send values at the first ticker interval
-- Don't send values from first ticker interval
if enter_at ~= nil then
utils.add_to_bulk_metric(
"hdd_errors_rate",
value / delta_sec,
{device=dev, hostname=hostname})
utils.add_to_bulk_metric("hdd_errors_rate", value / delta_sec, {device=dev})
end
error_counters[dev] = 0
end
enter_at = ns
utils.inject_bulk_metric(ns, hostname, 'hdd_errors_filter')
utils.inject_bulk_metric(ns, hostname, metric_logger, metric_source)
return 0
end

View File

@@ -16,7 +16,7 @@ require 'string'
require 'math'
local utils = require 'lma_utils'
function process_table(typ, array, tags)
function process_table(typ, array)
-- NOTE: It has been written for "filters" and "decoders". If we need to
-- use it to collect metrics from other components of the Heka pipeline,
-- we need to ensure that JSON provides names and table with
@@ -41,7 +41,10 @@ function process_table(typ, array, tags)
-- strip off the '_decoder'/'_filter' suffix
local name = v['Name']:gsub("_" .. typ, "")
tags['name'] = name
local tags = {
['type'] = typ,
['name'] = name,
}
utils.add_to_bulk_metric('hekad_msg_count', v.ProcessMessageCount.value, tags)
utils.add_to_bulk_metric('hekad_msg_avg_duration', v.ProcessMessageAvgDuration.value, tags)
@@ -73,11 +76,10 @@ function process_message ()
for k, v in pairs(data) do
if k == "filters" or k == "decoders" then
local typ = singularize(k)
process_table(typ, v, {hostname=hostname, ['type']=typ})
process_table(singularize(k), v)
end
end
utils.inject_bulk_metric(ts, hostname, 'heka_monitoring')
utils.inject_bulk_metric(ts, hostname, 'heka_monitoring', 'internal')
return 0
end

View File

@@ -34,11 +34,12 @@ local percentile_thresh = (read_config('percentile') or 90) + 0
-- and also to compensate the delay introduced by log parsing/decoding
-- which leads to arrive too late in its interval.
local grace_time = (read_config('grace_time') or 0) + 0
local metric_logger = read_config('logger')
local metric_source = read_config('source')
local inject_reached_error = 'too many metrics to aggregate, adjust bulk_size and/or max_timer_inject parameters'
local percentile_field_name = string.format('upper_%s', percentile_thresh)
local msg_source = 'http_metric_filter'
local last_tick = os.time() * 1e9
local interval_in_ns = interval * 1e9
@@ -84,7 +85,7 @@ function process_message ()
-- keep only the first 2 tokens because some services like Neutron report
-- themselves as 'openstack.<service>.server'
local service = string.gsub(read_message("Logger"), '(%w+)%.(%w+).*', '%1_%2')
local service = string.gsub(logger, '(%w+)%.(%w+).*', '%1_%2')
if service == nil then
return -1, "Cannot match any service from " .. logger
end
@@ -166,7 +167,7 @@ function timer_event(ns)
num_metrics = num_metrics - 1
if num >= bulk_size then
if msg_injected < max_timer_inject then
utils.inject_bulk_metric(ns, hostname, msg_source)
utils.inject_bulk_metric(ns, hostname, metric_logger, metric_source)
msg_injected = msg_injected + 1
num = 0
num_metrics = 0
@@ -178,7 +179,7 @@ function timer_event(ns)
all_times[service] = nil
end
if num > 0 then
utils.inject_bulk_metric(ns, hostname, msg_source)
utils.inject_bulk_metric(ns, hostname, metric_logger, metric_source)
num = 0
num_metrics = 0
end

View File

@@ -18,41 +18,33 @@ require 'string'
local utils = require 'lma_utils'
local hostname = read_config('hostname') or error('hostname must be specified')
local interval = (read_config('interval') or error('interval must be specified')) + 0
-- Heka cannot guarantee that logs are processed in real-time so the
-- grace_interval parameter allows to take into account log messages that are
-- received in the current interval but emitted before it.
-- The filter can receive messages that should be discarded because they are
-- way too old (Heka cannot guarantee that logs are processed in real-time).
-- The 'grace_interval' parameter allows to define which log messages should be
-- kept and which should be discarded. For instance, a value of '10' means that
-- the filter will take into account log messages that are at most 10 seconds
-- older than the current time.
local grace_interval = (read_config('grace_interval') or 0) + 0
local logger_matcher = read_config('logger_matcher') or '.*'
local metric_logger = read_config('logger')
local metric_source = read_config('source')
local discovered_services = {}
local logs_counters = {}
local last_timer_events = {}
local current_service = 1
local enter_at
local interval_in_ns = interval * 1e9
local start_time = os.time()
local msg = {
Type = "metric",
Timestamp = nil,
Severity = 6,
}
function convert_to_sec(ns)
return math.floor(ns/1e9)
end
local last_timer_event = os.time() * 1e9
function process_message ()
local severity = read_message("Fields[severity_label]")
local logger = read_message("Logger")
local service = string.match(logger, "^openstack%.(%a+)$")
local service = string.match(logger, logger_matcher)
if service == nil then
return -1, "Cannot match any services from " .. logger
return -1, "Cannot match any service from " .. logger
end
-- timestamp values should be converted to seconds because log timestamps
-- have a precision of one second (or millisecond sometimes)
if convert_to_sec(read_message('Timestamp')) + grace_interval < math.max(convert_to_sec(last_timer_events[service] or 0), start_time) then
if utils.convert_to_sec(read_message('Timestamp')) + grace_interval < utils.convert_to_sec(last_timer_event) then
-- skip the log message if it doesn't fall into the current interval
return 0
end
@@ -67,67 +59,29 @@ function process_message ()
end
logs_counters[service][severity] = logs_counters[service][severity] + 1
return 0
end
function timer_event(ns)
-- We can only send a maximum of ten events per call.
-- So we send all metrics about one service and we will proceed with
-- the following services at the next ticker event.
if #discovered_services == 0 then
return 0
end
-- Initialize enter_at during the first call to timer_event
if not enter_at then
enter_at = ns
end
-- To be able to send a metric we need to check if we are within the
-- interval specified in the configuration and if we haven't already sent
-- all metrics.
if ns - enter_at < interval_in_ns and current_service <= #discovered_services then
local service_name = discovered_services[current_service]
local last_timer_event = last_timer_events[service_name] or 0
for service, counters in pairs(logs_counters) do
local delta_sec = (ns - last_timer_event) / 1e9
for level, val in pairs(logs_counters[service_name]) do
-- We don't send the first value
if last_timer_event ~= 0 and delta_sec ~= 0 then
msg.Timestamp = ns
msg.Fields = {
name = 'log_messages',
type = utils.metric_type['DERIVE'],
value = val / delta_sec,
service = service_name,
level = string.lower(level),
hostname = hostname,
tag_fields = {'service', 'level', 'hostname'},
}
utils.inject_tags(msg)
ok, err = utils.safe_inject_message(msg)
if ok ~= 0 then
return -1, err
end
end
-- reset the counter
logs_counters[service_name][level] = 0
for level, val in pairs(counters) do
utils.add_to_bulk_metric(
'log_messages',
val / delta_sec,
{hostname=hostname, service=service, level=string.lower(level)})
-- reset the counter
counters[level] = 0
end
last_timer_events[service_name] = ns
current_service = current_service + 1
end
if ns - enter_at >= interval_in_ns then
enter_at = ns
current_service = 1
last_timer_event = ns
ok, err = utils.inject_bulk_metric(ns, hostname, metric_logger, metric_source, utils.metric_type['DERIVE'])
if ok ~= 0 then
return -1, err
end
return 0

View File

@@ -65,7 +65,8 @@ define lma_collector::heka (
config_dir => $config_dir,
filename => "${lma_collector::params::plugins_dir}/decoders/metric.lua",
module_directory => $lua_modules_dir,
config => {'deserialize_bulk_metric_for_loggers' => 'aggregated_http_metrics_filter hdd_errors_counter_filter'},
config => {
'deserialize_bulk_metric_for_loggers' => 'aggregated_http_metrics_filter hdd_errors_counter_filter log_counter_filter'},
notify => Class[$service_class],
}
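
The config above just extends the list of loggers whose bulk metrics must be
unpacked on the metric_collector side. Conceptually, the decoder splits the bulk
payload back into individual metric datapoints; the following is only a
hypothetical sketch of that idea (the real decoders/metric.lua is not part of
this diff, and the payload field names are assumptions):

require 'cjson'
require 'string'

local bulk_loggers = read_config('deserialize_bulk_metric_for_loggers') or ''

function process_message()
    local logger = read_message('Logger')
    -- only unpack bulk metrics emitted by the configured loggers
    if not string.find(bulk_loggers, logger, 1, true) then
        return 0
    end
    local ok, points = pcall(cjson.decode, read_message('Payload'))
    if not ok or type(points) ~= 'table' then
        return -1, 'cannot decode bulk metric payload'
    end
    for _, point in ipairs(points) do
        -- re-inject each datapoint as a single 'metric' message
        -- ('name' and 'value' are assumed field names)
        inject_message({
            Type = 'metric',
            Timestamp = read_message('Timestamp'),
            Severity = 6,
            Fields = {name = point.name, value = point.value},
        })
    end
    return 0
end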

View File

@@ -37,6 +37,8 @@ class lma_collector::logs::aggregated_http_metrics (
bulk_size => $bulk_size,
percentile => $percentile,
grace_time => $grace_time,
logger => 'aggregated_http_metrics_filter',
source => 'log_collector',
},
module_directory => $lua_modules_dir,
notify => Class['lma_collector::service::log'],

View File

@@ -26,12 +26,15 @@ class lma_collector::logs::counter (
config_dir => $lma_collector::params::log_config_dir,
filename => "${lma_collector::params::plugins_dir}/filters/logs_counter.lua",
message_matcher => 'Type == \'log\' && Logger =~ /^openstack\\./',
ticker_interval => 1,
ticker_interval => 60,
preserve_data => true,
config => {
interval => $interval,
hostname => $hostname,
grace_interval => $grace_interval,
logger_matcher => '^openstack%.(%a+)$',
logger => 'log_counter_filter',
source => 'log_collector',
},
module_directory => $lua_modules_dir,
notify => Class['lma_collector::service::log'],

View File

@@ -30,7 +30,9 @@ class lma_collector::logs::hdd_errors_counter (
config => {
hostname => $hostname,
grace_interval => $grace_interval,
patterns => '/error%s.+([sv]d[a-z][a-z]?)%d?/ /([sv]d[a-z][a-z]?)%d?.+%serror/'
patterns => '/error%s.+([sv]d[a-z][a-z]?)%d?/ /([sv]d[a-z][a-z]?)%d?.+%serror/',
logger => 'hdd_errors_counter_filter',
source => 'log_collector',
},
module_directory => $lua_modules_dir,
notify => Class['lma_collector::service::log'],

View File

@@ -91,6 +91,12 @@ TestLmaUtils = {}
assertEquals(ret, 'foo<BR/>ba')
end
function TestLmaUtils:test_convert_to_sec()
assertEquals(lma_utils.convert_to_sec(1000000001), 1)
assertEquals(lma_utils.convert_to_sec(1999999999), 1)
assertEquals(lma_utils.convert_to_sec(2000000001), 2)
end
lu = LuaUnit
lu:setVerbosity( 1 )
os.exit( lu:run() )