Increase the number of points per InfluxDB batch

This change improves InfluxDB write performance by increasing the
maximum number of points sent per InfluxDB request to 500.
InfluxDB recommends a batch size of 5,000 but this cannot be the default
configuration value because of the fixed size of Heka messages (256K currently):
exceeding that size causes metrics to be silently discarded.
Note that the InfluxDB accumulator will flush the data either when
it holds 500 points or when it hasn't flushed data for at least 5 seconds.

Co-Authored-By: Swann Croiset <scroiset@mirantis.com>

Change-Id: I7d238375dc0c231782983fc4901c9a32936fb08a
Partial-Bug: #1581369
This commit is contained in:
Simon Pasquier 2016-05-19 15:25:43 +02:00 committed by Swann Croiset
parent 76987240a9
commit 2cc44ddba0
4 changed files with 31 additions and 4 deletions

View File

@ -815,6 +815,10 @@ InfluxDB.
options: an array. Default: `[]`.
* `time_precision`: *Optional*. Time precision. Valid options: a string.
Default: `ms`.
* `flush_count`: *Optional*. Maximum number of datapoints to send in a single
write request. Valid values: an integer. Default: `500`.
* `flush_interval`: *Optional*. Maximum number of seconds to wait before
writing data to InfluxDB. Valid values: an integer. Default: `5`.
#### Class: `lma_collector::notifications::input`

View File

@ -20,6 +20,8 @@ class lma_collector::influxdb (
$port,
$tag_fields = $lma_collector::params::influxdb_tag_fields,
$time_precision = $lma_collector::params::influxdb_time_precision,
$flush_count = $lma_collector::params::influxdb_flush_count,
$flush_interval = $lma_collector::params::influxdb_flush_interval,
) inherits lma_collector::params {
include lma_collector::service::metric
@ -27,7 +29,9 @@ class lma_collector::influxdb (
$lua_modules_dir = $lma_collector::params::lua_modules_dir
validate_string($database, $user, $password, $server, $time_precision)
validate_array($tag_fields)
validate_integer([$flush_count, $flush_interval])
heka::filter::sandbox { 'influxdb_accumulator':
config_dir => $lma_collector::params::metric_config_dir,
@ -35,8 +39,8 @@ class lma_collector::influxdb (
message_matcher => $lma_collector::params::influxdb_message_matcher,
ticker_interval => 1,
config => {
flush_interval => $lma_collector::params::influxdb_flush_interval,
flush_count => $lma_collector::params::influxdb_flush_count,
flush_interval => $flush_interval,
flush_count => $flush_count,
tag_fields => join(sort($tag_fields), ' '),
time_precision => $time_precision,
# FIXME(pasquier-s): provide the default_tenant_id & default_user_id

View File

@ -157,6 +157,11 @@ class lma_collector::params {
$elasticsearch_fields = ['Timestamp', 'Type', 'Logger', 'Severity', 'Payload', 'Pid', 'Hostname', 'DynamicFields']
$influxdb_timeout = 5
$influxdb_flush_interval = 5
# InfluxDB recommends a batch size of 5,000 points but we are limited to 500
# due to the hekad_max_message_size. The limit is reached when the influxdb
# accumulator injects data points.
$influxdb_flush_count = 500
$influxdb_tag_fields = []
$influxdb_time_precision = 'ms'
$influxdb_message_matcher = join([

View File

@ -35,8 +35,22 @@ describe 'lma_collector::influxdb' do
it { is_expected.to contain_heka__output__http('influxdb') }
it { is_expected.to contain_heka__encoder__payload('influxdb') }
it { is_expected.to contain_heka__filter__sandbox('influxdb_accumulator').with_config({
"tag_fields" => "foo zzz", "flush_interval"=> :undef,
"flush_count"=> :undef, "time_precision" => "ms"}) }
"tag_fields" => "foo zzz", "flush_interval"=> 5,
"flush_count"=> 500, "time_precision" => "ms"}) }
it { is_expected.to contain_heka__filter__sandbox('influxdb_annotation') }
end
# Checks that explicitly supplied flush_count, flush_interval and
# time_precision parameters end up in the config of the
# 'influxdb_accumulator' sandbox filter instead of the defaults.
describe 'with flush and precision parameters' do
# No :tag_fields is passed here; the expectation below checks for an empty
# "tag_fields" string.
let(:params) {{ :server => 'localhost',:port => 8086, :user => 'lma', :password => 'lma',
:database => 'lma', :flush_count => 1, :flush_interval => 2,
:time_precision => 's'
}}
it { is_expected.to contain_heka__output__http('influxdb') }
it { is_expected.to contain_heka__encoder__payload('influxdb') }
# The flush values are passed as integers in params but are matched as the
# strings "2" and "1" in the resulting config.
it { is_expected.to contain_heka__filter__sandbox('influxdb_accumulator').with_config({
"flush_interval"=> "2", "flush_count"=> "1", "time_precision" => "s",
"tag_fields" => ""
}) }
it { is_expected.to contain_heka__filter__sandbox('influxdb_annotation') }
end
end