From 567562faaf6ac60880eec4ab9e66154b70f2ad9e Mon Sep 17 00:00:00 2001 From: Simon Pasquier Date: Thu, 19 May 2016 15:25:43 +0200 Subject: [PATCH] Increase the number of points per InfluxDB batch This change increases the maximum number of points that are sent in a single request. InfluxDB recommends to have a batch size of 5,000 so this is now the default configuration value. Note that the InfluxDB accumulator will flush the data either when it holds 5,000 points or when it hasn't data for at least 5 seconds. Change-Id: If07b7d285d216855997254952ca6d7511cff65ec Partial-Bug: #1581369 --- .../puppet/modules/lma_collector/README.md | 4 +++ .../plugins/filters/influxdb_accumulator.lua | 4 +-- .../lma_collector/manifests/influxdb.pp | 12 ++++----- .../modules/lma_collector/manifests/params.pp | 3 +++ .../classes/lma_collector_influxdb_spec.rb | 26 ++++++++++++++----- 5 files changed, 35 insertions(+), 14 deletions(-) diff --git a/deployment_scripts/puppet/modules/lma_collector/README.md b/deployment_scripts/puppet/modules/lma_collector/README.md index 0558277c0..b931000fb 100644 --- a/deployment_scripts/puppet/modules/lma_collector/README.md +++ b/deployment_scripts/puppet/modules/lma_collector/README.md @@ -807,6 +807,10 @@ InfluxDB. options: an array. Default: `[]`. * `time_precision`: *Optional*. Time precision. Valid options: a string. Default: `ms`. +* `flush_count`: *Optional*. Maximum number of datapoints to send in a single + write request. Valid values: an integer. Default: `5000`. +* `flush_interval`: *Optional*. Maximum number of seconds to wait before + writing data to InfluxDB. Valid values: an integer. Default: `5`. #### Class: `lma_collector::notifications::input` diff --git a/deployment_scripts/puppet/modules/lma_collector/files/plugins/filters/influxdb_accumulator.lua b/deployment_scripts/puppet/modules/lma_collector/files/plugins/filters/influxdb_accumulator.lua index d9a79c529..2a5319e65 100644 --- a/deployment_scripts/puppet/modules/lma_collector/files/plugins/filters/influxdb_accumulator.lua +++ b/deployment_scripts/puppet/modules/lma_collector/files/plugins/filters/influxdb_accumulator.lua @@ -19,8 +19,8 @@ local utils = require 'lma_utils' local l = require 'lpeg' l.locale(l) -local flush_count = read_config('flush_count') or 100 -local flush_interval = read_config('flush_interval') or 5 +local flush_count = (read_config('flush_count') or 100) + 0 +local flush_interval = (read_config('flush_interval') or 5) + 0 local default_tenant_id = read_config("default_tenant_id") local default_user_id = read_config("default_user_id") local time_precision = read_config("time_precision") diff --git a/deployment_scripts/puppet/modules/lma_collector/manifests/influxdb.pp b/deployment_scripts/puppet/modules/lma_collector/manifests/influxdb.pp index 3329d06c8..094258dd2 100644 --- a/deployment_scripts/puppet/modules/lma_collector/manifests/influxdb.pp +++ b/deployment_scripts/puppet/modules/lma_collector/manifests/influxdb.pp @@ -20,16 +20,16 @@ class lma_collector::influxdb ( $port = $lma_collector::params::influxdb_port, $tag_fields = $lma_collector::params::influxdb_tag_fields, $time_precision = $lma_collector::params::influxdb_time_precision, + $flush_count = $lma_collector::params::influxdb_flush_count, + $flush_interval = $lma_collector::params::influxdb_flush_interval, ) inherits lma_collector::params { include lma_collector::service::metric $lua_modules_dir = $lma_collector::params::lua_modules_dir - validate_string($database) - validate_string($user) - validate_string($password) - validate_string($server) + validate_string($database, $user, $password, $server, $time_precision) validate_array($tag_fields) + validate_integer([$flush_count, $flush_interval]) heka::filter::sandbox { 'influxdb_accumulator': config_dir => $lma_collector::params::metric_config_dir, @@ -37,8 +37,8 @@ class lma_collector::influxdb ( message_matcher => $lma_collector::params::influxdb_message_matcher, ticker_interval => 1, config => { - flush_interval => $lma_collector::params::influxdb_flush_interval, - flush_count => $lma_collector::params::influxdb_flush_count, + flush_interval => $flush_interval, + flush_count => $flush_count, tag_fields => join(sort(concat(['hostname'], $tag_fields)), ' '), time_precision => $time_precision, # FIXME(pasquier-s): provide the default_tenant_id & default_user_id diff --git a/deployment_scripts/puppet/modules/lma_collector/manifests/params.pp b/deployment_scripts/puppet/modules/lma_collector/manifests/params.pp index 2b527a3a9..9378c24eb 100644 --- a/deployment_scripts/puppet/modules/lma_collector/manifests/params.pp +++ b/deployment_scripts/puppet/modules/lma_collector/manifests/params.pp @@ -156,6 +156,9 @@ class lma_collector::params { $influxdb_port = '8086' $influxdb_timeout = 5 + $influxdb_flush_interval = 5 + # InfluxDB recommends a batch size of 5,000 points + $influxdb_flush_count = 5000 $influxdb_tag_fields = [] $influxdb_time_precision = 'ms' $influxdb_message_matcher = join([ diff --git a/deployment_scripts/puppet/modules/lma_collector/spec/classes/lma_collector_influxdb_spec.rb b/deployment_scripts/puppet/modules/lma_collector/spec/classes/lma_collector_influxdb_spec.rb index f526d6439..0d5779c90 100644 --- a/deployment_scripts/puppet/modules/lma_collector/spec/classes/lma_collector_influxdb_spec.rb +++ b/deployment_scripts/puppet/modules/lma_collector/spec/classes/lma_collector_influxdb_spec.rb @@ -20,8 +20,8 @@ describe 'lma_collector::influxdb' do end describe 'with mandatory parameters' do - let(:params) {{ :server => 'localhost', :user => 'lma', :password => - 'lma', :database => 'lma' }} + let(:params) {{ :server => 'localhost', :user => 'lma', :password => 'lma', + :database => 'lma' }} it { is_expected.to contain_heka__output__http('influxdb') } it { is_expected.to contain_heka__encoder__payload('influxdb') } it { is_expected.to contain_heka__filter__sandbox('influxdb_accumulator') } @@ -29,13 +29,27 @@ describe 'lma_collector::influxdb' do end describe 'with tag_fields parameter' do - let(:params) {{ :server => 'localhost', :user => 'lma', :password => - 'lma', :database => 'lma', :tag_fields => ['foo', 'zzz'] }} + let(:params) {{ :server => 'localhost', :user => 'lma', :password => 'lma', + :database => 'lma', :tag_fields => ['foo', 'zzz'] }} it { is_expected.to contain_heka__output__http('influxdb') } it { is_expected.to contain_heka__encoder__payload('influxdb') } it { is_expected.to contain_heka__filter__sandbox('influxdb_accumulator').with_config({ - "tag_fields" => "foo hostname zzz", "flush_interval"=> :undef, - "flush_count"=> :undef, "time_precision" => "ms"}) } + "tag_fields" => "foo hostname zzz", "flush_interval"=> "5", + "flush_count"=> "5000", "time_precision" => "ms"}) } + it { is_expected.to contain_heka__filter__sandbox('influxdb_annotation') } + end + + describe 'with flush and precision parameters' do + let(:params) {{ :server => 'localhost', :user => 'lma', :password => 'lma', + :database => 'lma', :flush_count => 1, :flush_interval => 2, + :time_precision => 's' + }} + it { is_expected.to contain_heka__output__http('influxdb') } + it { is_expected.to contain_heka__encoder__payload('influxdb') } + it { is_expected.to contain_heka__filter__sandbox('influxdb_accumulator').with_config({ + "flush_interval"=> "2", "flush_count"=> "1", "time_precision" => "s", + "tag_fields" => "hostname" + }) } it { is_expected.to contain_heka__filter__sandbox('influxdb_annotation') } end end