Increase the number of points per InfluxDB batch
This change improves the InfluxDB write performances by increasing to 500 the maximum number of points that are sent per InfluxDB request. InfluxDB recommends to have a batch size of 5,000 but it cannot be the default configuration value due to the fixed sized of Heka messages (256K currently) which leads to silently discard metrics. Note that the InfluxDB accumulator will flush the data either when it holds 500 points or when it hasn't data for at least 5 seconds. Co-Authored-By: Swann Croiset <scroiset@mirantis.com> Change-Id: I7d238375dc0c231782983fc4901c9a32936fb08a Partial-Bug: #1581369
This commit is contained in:
parent
76987240a9
commit
2cc44ddba0
|
@ -815,6 +815,10 @@ InfluxDB.
|
|||
options: an array. Default: `[]`.
|
||||
* `time_precision`: *Optional*. Time precision. Valid options: a string.
|
||||
Default: `ms`.
|
||||
* `flush_count`: *Optional*. Maximum number of datapoints to send in a single
|
||||
write request. Valid values: an integer. Default: `5000`.
|
||||
* `flush_interval`: *Optional*. Maximum number of seconds to wait before
|
||||
writing data to InfluxDB. Valid values: an integer. Default: `5`.
|
||||
|
||||
#### Class: `lma_collector::notifications::input`
|
||||
|
||||
|
|
|
@ -20,6 +20,8 @@ class lma_collector::influxdb (
|
|||
$port,
|
||||
$tag_fields = $lma_collector::params::influxdb_tag_fields,
|
||||
$time_precision = $lma_collector::params::influxdb_time_precision,
|
||||
$flush_count = $lma_collector::params::influxdb_flush_count,
|
||||
$flush_interval = $lma_collector::params::influxdb_flush_interval,
|
||||
) inherits lma_collector::params {
|
||||
include lma_collector::service::metric
|
||||
|
||||
|
@ -27,7 +29,9 @@ class lma_collector::influxdb (
|
|||
|
||||
$lua_modules_dir = $lma_collector::params::lua_modules_dir
|
||||
|
||||
validate_string($database, $user, $password, $server, $time_precision)
|
||||
validate_array($tag_fields)
|
||||
validate_integer([$flush_count, $flush_interval])
|
||||
|
||||
heka::filter::sandbox { 'influxdb_accumulator':
|
||||
config_dir => $lma_collector::params::metric_config_dir,
|
||||
|
@ -35,8 +39,8 @@ class lma_collector::influxdb (
|
|||
message_matcher => $lma_collector::params::influxdb_message_matcher,
|
||||
ticker_interval => 1,
|
||||
config => {
|
||||
flush_interval => $lma_collector::params::influxdb_flush_interval,
|
||||
flush_count => $lma_collector::params::influxdb_flush_count,
|
||||
flush_interval => $flush_interval,
|
||||
flush_count => $flush_count,
|
||||
tag_fields => join(sort($tag_fields), ' '),
|
||||
time_precision => $time_precision,
|
||||
# FIXME(pasquier-s): provide the default_tenant_id & default_user_id
|
||||
|
|
|
@ -157,6 +157,11 @@ class lma_collector::params {
|
|||
$elasticsearch_fields = ['Timestamp', 'Type', 'Logger', 'Severity', 'Payload', 'Pid', 'Hostname', 'DynamicFields']
|
||||
|
||||
$influxdb_timeout = 5
|
||||
$influxdb_flush_interval = 5
|
||||
# InfluxDB recommends a batch size of 5,000 points but we are limited to 500
|
||||
# due to the hekad_max_message_size. The limit is reached when the influxdb
|
||||
# accumulator inject data points.
|
||||
$influxdb_flush_count = 500
|
||||
$influxdb_tag_fields = []
|
||||
$influxdb_time_precision = 'ms'
|
||||
$influxdb_message_matcher = join([
|
||||
|
|
|
@ -35,8 +35,22 @@ describe 'lma_collector::influxdb' do
|
|||
it { is_expected.to contain_heka__output__http('influxdb') }
|
||||
it { is_expected.to contain_heka__encoder__payload('influxdb') }
|
||||
it { is_expected.to contain_heka__filter__sandbox('influxdb_accumulator').with_config({
|
||||
"tag_fields" => "foo zzz", "flush_interval"=> :undef,
|
||||
"flush_count"=> :undef, "time_precision" => "ms"}) }
|
||||
"tag_fields" => "foo zzz", "flush_interval"=> 5,
|
||||
"flush_count"=> 500, "time_precision" => "ms"}) }
|
||||
it { is_expected.to contain_heka__filter__sandbox('influxdb_annotation') }
|
||||
end
|
||||
|
||||
describe 'with flush and precision parameters' do
|
||||
let(:params) {{ :server => 'localhost',:port => 8086, :user => 'lma', :password => 'lma',
|
||||
:database => 'lma', :flush_count => 1, :flush_interval => 2,
|
||||
:time_precision => 's'
|
||||
}}
|
||||
it { is_expected.to contain_heka__output__http('influxdb') }
|
||||
it { is_expected.to contain_heka__encoder__payload('influxdb') }
|
||||
it { is_expected.to contain_heka__filter__sandbox('influxdb_accumulator').with_config({
|
||||
"flush_interval"=> "2", "flush_count"=> "1", "time_precision" => "s",
|
||||
"tag_fields" => ""
|
||||
}) }
|
||||
it { is_expected.to contain_heka__filter__sandbox('influxdb_annotation') }
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in New Issue