Deserialize aggregated HTTP in the metric collector

This patch adds a custom decoder to deserialize bulk metrics.
The deserialization of bulk_metric message is configurable per Logger and
is disabled by default.
Currently, only the HTTP metrics are deserialized (Logger: aggregated_http_metrics_filter)
To deserialize several Loggers, a list of Loggers separated by space can
be configured for the sandbox.

blueprint: keystone-performance-monitoring

Change-Id: I08e8c816d889f4b0917d766b000fb25330eff174
This commit is contained in:
Swann Croiset 2016-04-21 14:34:33 +02:00
parent 5d7eff0d8c
commit c7dcbdcaf9
3 changed files with 102 additions and 2 deletions

View File

@ -0,0 +1,90 @@
-- Copyright 2016 Mirantis, Inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
require "cjson"
require "string"
local l = require 'lpeg'
l.locale(l)
local loggers_pattern = l.Ct( (l.C((l.P(1) - l.space)^1) * l.space^0)^1 * -1)
local loggers_list = loggers_pattern:match(read_config('deserialize_bulk_metric_for_loggers') or '')
local loggers = {}
for _, logger in ipairs(loggers_list) do
loggers[logger] = true
end
local utils = require 'lma_utils'
function process_message ()
local msg = decode_message(read_message("raw"))
if string.match(msg.Type, 'bulk_metric$') and loggers[msg.Logger] ~= nil then
local ok, metrics = pcall(cjson.decode, msg.Payload)
if not ok then
return -1, metrics
end
local new_msg = {
Timestamp = msg.Timestamp,
Hostname = msg.Hostname,
Severity = msg.Severity,
Logger = msg.Logger,
Type = nil,
Payload = '',
Fields = {},
}
for _, metric in ipairs(metrics) do
local fields = {}
local metric_type
if metric.value then
metric_type = 'metric'
fields['value'] = metric.value
else
metric_type = 'multivalue_metric'
local value_fields = {}
for k, v in pairs(metric.values) do
fields[k] = v
table.insert(value_fields, k)
end
fields['value_fields'] = value_fields
end
local tag_fields = {}
for t, v in pairs(metric.tags or {}) do
fields[t] = v
table.insert(tag_fields, t)
end
fields['tag_fields'] = tag_fields
fields['name'] = metric.name
fields['hostname'] = msg.Hostname
new_msg.Type = metric_type
new_msg.Fields = fields
utils.inject_tags(new_msg)
ok, err = utils.safe_inject_message(new_msg)
if ok ~= 0 then
return -1, err
end
end
else -- simple metric
utils.inject_tags(msg)
ok, err = utils.safe_inject_message(msg)
if ok ~= 0 then
return -1, err
end
end
return 0
end

View File

@ -61,12 +61,20 @@ define lma_collector::heka (
$service_class = 'lma_collector::service::metric'
$dashboard_port = $lma_collector::params::metric_dashboard_port
heka::decoder::sandbox { 'metric':
config_dir => $config_dir,
filename => "${lma_collector::params::plugins_dir}/decoders/metric.lua",
module_directory => $lua_modules_dir,
config => {'deserialize_bulk_metric_for_loggers' => 'aggregated_http_metrics_filter'},
notify => Class[$service_class],
}
heka::input::tcp { 'metric':
config_dir => $config_dir,
address => $lma_collector::params::metric_input_address,
port => $lma_collector::params::metric_input_port,
decoder => 'ProtobufDecoder',
require => ::Heka[$title],
decoder => 'metric',
require => [::Heka[$title], Heka::Decoder::Sandbox['metric']],
notify => Class[$service_class],
}

View File

@ -39,6 +39,7 @@ describe 'lma_collector::heka' do
'poolsize' => 100,
)
should contain_heka__input__tcp('metric')
should contain_heka__decoder__sandbox('metric' )
should contain_heka__filter__sandbox('heka_monitoring_metric_collector')
should contain_heka__output__dashboard('dashboard_metric_collector' )
}
@ -65,6 +66,7 @@ describe 'lma_collector::heka' do
'poolsize' => 42,
)
should contain_heka__input__tcp('metric')
should contain_heka__decoder__sandbox('metric' )
is_expected.to_not contain_heka__filter__sandbox('heka_monitoring_metric_collector')
is_expected.to_not contain_heka__output__dashboard('dashboard_metric_collector' )
}