Increase the number of points per InfluxDB batch
This change increases the maximum number of points that are sent in a single request. InfluxDB recommends to have a batch size of 5,000 so this is now the default configuration value. Note that the InfluxDB accumulator will flush the data either when it holds 5,000 points or when it hasn't data for at least 5 seconds. Change-Id: If07b7d285d216855997254952ca6d7511cff65ec Partial-Bug: #1581369
This commit is contained in:
parent
56ef5c42f4
commit
567562faaf
|
@ -807,6 +807,10 @@ InfluxDB.
|
|||
options: an array. Default: `[]`.
|
||||
* `time_precision`: *Optional*. Time precision. Valid options: a string.
|
||||
Default: `ms`.
|
||||
* `flush_count`: *Optional*. Maximum number of datapoints to send in a single
|
||||
write request. Valid values: an integer. Default: `5000`.
|
||||
* `flush_interval`: *Optional*. Maximum number of seconds to wait before
|
||||
writing data to InfluxDB. Valid values: an integer. Default: `5`.
|
||||
|
||||
#### Class: `lma_collector::notifications::input`
|
||||
|
||||
|
|
|
@ -19,8 +19,8 @@ local utils = require 'lma_utils'
|
|||
local l = require 'lpeg'
|
||||
l.locale(l)
|
||||
|
||||
local flush_count = read_config('flush_count') or 100
|
||||
local flush_interval = read_config('flush_interval') or 5
|
||||
local flush_count = (read_config('flush_count') or 100) + 0
|
||||
local flush_interval = (read_config('flush_interval') or 5) + 0
|
||||
local default_tenant_id = read_config("default_tenant_id")
|
||||
local default_user_id = read_config("default_user_id")
|
||||
local time_precision = read_config("time_precision")
|
||||
|
|
|
@ -20,16 +20,16 @@ class lma_collector::influxdb (
|
|||
$port = $lma_collector::params::influxdb_port,
|
||||
$tag_fields = $lma_collector::params::influxdb_tag_fields,
|
||||
$time_precision = $lma_collector::params::influxdb_time_precision,
|
||||
$flush_count = $lma_collector::params::influxdb_flush_count,
|
||||
$flush_interval = $lma_collector::params::influxdb_flush_interval,
|
||||
) inherits lma_collector::params {
|
||||
include lma_collector::service::metric
|
||||
|
||||
$lua_modules_dir = $lma_collector::params::lua_modules_dir
|
||||
|
||||
validate_string($database)
|
||||
validate_string($user)
|
||||
validate_string($password)
|
||||
validate_string($server)
|
||||
validate_string($database, $user, $password, $server, $time_precision)
|
||||
validate_array($tag_fields)
|
||||
validate_integer([$flush_count, $flush_interval])
|
||||
|
||||
heka::filter::sandbox { 'influxdb_accumulator':
|
||||
config_dir => $lma_collector::params::metric_config_dir,
|
||||
|
@ -37,8 +37,8 @@ class lma_collector::influxdb (
|
|||
message_matcher => $lma_collector::params::influxdb_message_matcher,
|
||||
ticker_interval => 1,
|
||||
config => {
|
||||
flush_interval => $lma_collector::params::influxdb_flush_interval,
|
||||
flush_count => $lma_collector::params::influxdb_flush_count,
|
||||
flush_interval => $flush_interval,
|
||||
flush_count => $flush_count,
|
||||
tag_fields => join(sort(concat(['hostname'], $tag_fields)), ' '),
|
||||
time_precision => $time_precision,
|
||||
# FIXME(pasquier-s): provide the default_tenant_id & default_user_id
|
||||
|
|
|
@ -156,6 +156,9 @@ class lma_collector::params {
|
|||
|
||||
$influxdb_port = '8086'
|
||||
$influxdb_timeout = 5
|
||||
$influxdb_flush_interval = 5
|
||||
# InfluxDB recommends a batch size of 5,000 points
|
||||
$influxdb_flush_count = 5000
|
||||
$influxdb_tag_fields = []
|
||||
$influxdb_time_precision = 'ms'
|
||||
$influxdb_message_matcher = join([
|
||||
|
|
|
@ -20,8 +20,8 @@ describe 'lma_collector::influxdb' do
|
|||
end
|
||||
|
||||
describe 'with mandatory parameters' do
|
||||
let(:params) {{ :server => 'localhost', :user => 'lma', :password =>
|
||||
'lma', :database => 'lma' }}
|
||||
let(:params) {{ :server => 'localhost', :user => 'lma', :password => 'lma',
|
||||
:database => 'lma' }}
|
||||
it { is_expected.to contain_heka__output__http('influxdb') }
|
||||
it { is_expected.to contain_heka__encoder__payload('influxdb') }
|
||||
it { is_expected.to contain_heka__filter__sandbox('influxdb_accumulator') }
|
||||
|
@ -29,13 +29,27 @@ describe 'lma_collector::influxdb' do
|
|||
end
|
||||
|
||||
describe 'with tag_fields parameter' do
|
||||
let(:params) {{ :server => 'localhost', :user => 'lma', :password =>
|
||||
'lma', :database => 'lma', :tag_fields => ['foo', 'zzz'] }}
|
||||
let(:params) {{ :server => 'localhost', :user => 'lma', :password => 'lma',
|
||||
:database => 'lma', :tag_fields => ['foo', 'zzz'] }}
|
||||
it { is_expected.to contain_heka__output__http('influxdb') }
|
||||
it { is_expected.to contain_heka__encoder__payload('influxdb') }
|
||||
it { is_expected.to contain_heka__filter__sandbox('influxdb_accumulator').with_config({
|
||||
"tag_fields" => "foo hostname zzz", "flush_interval"=> :undef,
|
||||
"flush_count"=> :undef, "time_precision" => "ms"}) }
|
||||
"tag_fields" => "foo hostname zzz", "flush_interval"=> "5",
|
||||
"flush_count"=> "5000", "time_precision" => "ms"}) }
|
||||
it { is_expected.to contain_heka__filter__sandbox('influxdb_annotation') }
|
||||
end
|
||||
|
||||
describe 'with flush and precision parameters' do
|
||||
let(:params) {{ :server => 'localhost', :user => 'lma', :password => 'lma',
|
||||
:database => 'lma', :flush_count => 1, :flush_interval => 2,
|
||||
:time_precision => 's'
|
||||
}}
|
||||
it { is_expected.to contain_heka__output__http('influxdb') }
|
||||
it { is_expected.to contain_heka__encoder__payload('influxdb') }
|
||||
it { is_expected.to contain_heka__filter__sandbox('influxdb_accumulator').with_config({
|
||||
"flush_interval"=> "2", "flush_count"=> "1", "time_precision" => "s",
|
||||
"tag_fields" => "hostname"
|
||||
}) }
|
||||
it { is_expected.to contain_heka__filter__sandbox('influxdb_annotation') }
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in New Issue