Increase the number of points per InfluxDB batch

This change increases the maximum number of points that are sent in a
single request. InfluxDB recommends to have a batch size of 5,000 so
this is now the default configuration value. Note that the InfluxDB
accumulator will flush the data either when it holds 5,000 points or
when it hasn't data for at least 5 seconds.

Change-Id: If07b7d285d216855997254952ca6d7511cff65ec
Partial-Bug: #1581369
This commit is contained in:
Simon Pasquier 2016-05-19 15:25:43 +02:00
parent 56ef5c42f4
commit 567562faaf
5 changed files with 35 additions and 14 deletions

View File

@ -807,6 +807,10 @@ InfluxDB.
options: an array. Default: `[]`.
* `time_precision`: *Optional*. Time precision. Valid options: a string.
Default: `ms`.
* `flush_count`: *Optional*. Maximum number of datapoints to send in a single
write request. Valid values: an integer. Default: `5000`.
* `flush_interval`: *Optional*. Maximum number of seconds to wait before
writing data to InfluxDB. Valid values: an integer. Default: `5`.
#### Class: `lma_collector::notifications::input`

View File

@ -19,8 +19,8 @@ local utils = require 'lma_utils'
local l = require 'lpeg'
l.locale(l)
local flush_count = read_config('flush_count') or 100
local flush_interval = read_config('flush_interval') or 5
local flush_count = (read_config('flush_count') or 100) + 0
local flush_interval = (read_config('flush_interval') or 5) + 0
local default_tenant_id = read_config("default_tenant_id")
local default_user_id = read_config("default_user_id")
local time_precision = read_config("time_precision")

View File

@ -20,16 +20,16 @@ class lma_collector::influxdb (
$port = $lma_collector::params::influxdb_port,
$tag_fields = $lma_collector::params::influxdb_tag_fields,
$time_precision = $lma_collector::params::influxdb_time_precision,
$flush_count = $lma_collector::params::influxdb_flush_count,
$flush_interval = $lma_collector::params::influxdb_flush_interval,
) inherits lma_collector::params {
include lma_collector::service::metric
$lua_modules_dir = $lma_collector::params::lua_modules_dir
validate_string($database)
validate_string($user)
validate_string($password)
validate_string($server)
validate_string($database, $user, $password, $server, $time_precision)
validate_array($tag_fields)
validate_integer([$flush_count, $flush_interval])
heka::filter::sandbox { 'influxdb_accumulator':
config_dir => $lma_collector::params::metric_config_dir,
@ -37,8 +37,8 @@ class lma_collector::influxdb (
message_matcher => $lma_collector::params::influxdb_message_matcher,
ticker_interval => 1,
config => {
flush_interval => $lma_collector::params::influxdb_flush_interval,
flush_count => $lma_collector::params::influxdb_flush_count,
flush_interval => $flush_interval,
flush_count => $flush_count,
tag_fields => join(sort(concat(['hostname'], $tag_fields)), ' '),
time_precision => $time_precision,
# FIXME(pasquier-s): provide the default_tenant_id & default_user_id

View File

@ -156,6 +156,9 @@ class lma_collector::params {
$influxdb_port = '8086'
$influxdb_timeout = 5
$influxdb_flush_interval = 5
# InfluxDB recommends a batch size of 5,000 points
$influxdb_flush_count = 5000
$influxdb_tag_fields = []
$influxdb_time_precision = 'ms'
$influxdb_message_matcher = join([

View File

@ -20,8 +20,8 @@ describe 'lma_collector::influxdb' do
end
describe 'with mandatory parameters' do
let(:params) {{ :server => 'localhost', :user => 'lma', :password =>
'lma', :database => 'lma' }}
let(:params) {{ :server => 'localhost', :user => 'lma', :password => 'lma',
:database => 'lma' }}
it { is_expected.to contain_heka__output__http('influxdb') }
it { is_expected.to contain_heka__encoder__payload('influxdb') }
it { is_expected.to contain_heka__filter__sandbox('influxdb_accumulator') }
@ -29,13 +29,27 @@ describe 'lma_collector::influxdb' do
end
describe 'with tag_fields parameter' do
let(:params) {{ :server => 'localhost', :user => 'lma', :password =>
'lma', :database => 'lma', :tag_fields => ['foo', 'zzz'] }}
let(:params) {{ :server => 'localhost', :user => 'lma', :password => 'lma',
:database => 'lma', :tag_fields => ['foo', 'zzz'] }}
it { is_expected.to contain_heka__output__http('influxdb') }
it { is_expected.to contain_heka__encoder__payload('influxdb') }
it { is_expected.to contain_heka__filter__sandbox('influxdb_accumulator').with_config({
"tag_fields" => "foo hostname zzz", "flush_interval"=> :undef,
"flush_count"=> :undef, "time_precision" => "ms"}) }
"tag_fields" => "foo hostname zzz", "flush_interval"=> "5",
"flush_count"=> "5000", "time_precision" => "ms"}) }
it { is_expected.to contain_heka__filter__sandbox('influxdb_annotation') }
end
describe 'with flush and precision parameters' do
let(:params) {{ :server => 'localhost', :user => 'lma', :password => 'lma',
:database => 'lma', :flush_count => 1, :flush_interval => 2,
:time_precision => 's'
}}
it { is_expected.to contain_heka__output__http('influxdb') }
it { is_expected.to contain_heka__encoder__payload('influxdb') }
it { is_expected.to contain_heka__filter__sandbox('influxdb_accumulator').with_config({
"flush_interval"=> "2", "flush_count"=> "1", "time_precision" => "s",
"tag_fields" => "hostname"
}) }
it { is_expected.to contain_heka__filter__sandbox('influxdb_annotation') }
end
end