Increase the Elasticsearch bulk size when required

In some environments (especially using slow HDD drives), the
Elasticsearch backends may fail to ingest logs fast enough. As a result
the log_collector service running on the controller nodes is blocked.

To alleviate this issue, this change increases the bulk size for nodes
that generate lots of logs:
- controllers which run OpenStack API services in addition to Pacemaker.
- all nodes when the environment's log level is set to debug.

In such cases, the flush_count parameter is increased to 100 (instead of
the default value of 10).

Change-Id: Ifdfbcb8ff0292f695dee4deab45560f126bde242
Closes-Bug: #1617211
(cherry picked from commit 83db24f549)
This commit is contained in:
Swann Croiset 2016-08-17 11:38:10 +02:00 committed by Simon Pasquier
parent c3106130a5
commit 55b1c6daa4
5 changed files with 41 additions and 4 deletions

View File

@ -300,9 +300,11 @@ if hiera('lma::collector::elasticsearch::server', false) {
}
class { 'lma_collector::elasticsearch':
server => hiera('lma::collector::elasticsearch::server'),
port => hiera('lma::collector::elasticsearch::rest_port'),
require => Class['lma_collector'],
server => hiera('lma::collector::elasticsearch::server'),
port => hiera('lma::collector::elasticsearch::rest_port'),
flush_interval => hiera('lma::collector::elasticsearch::flush_interval'),
flush_count => hiera('lma::collector::elasticsearch::flush_count'),
require => Class['lma_collector'],
}
if $is_mysql_server {

View File

@ -73,6 +73,16 @@ if ($plugin_data) {
$es_is_deployed = false
}
# Interval (in seconds) between bulk indexing requests to Elasticsearch.
$es_flush_interval = 5
if $is_controller_node or hiera('debug', false) {
# Increase the flush count when debug-level logging is enabled or for
# controllers, because the OpenStack APIs plus Pacemaker can generate many
# log messages.
$es_flush_count = 100
} else {
# Default bulk size for nodes that produce a moderate volume of logs.
$es_flush_count = 10
}
# InfluxDB
$is_influxdb_node = roles_include(['influxdb_grafana', 'primary-influxdb_grafana'])
$influxdb_listen_address = get_network_role_property('influxdb_vip', 'ipaddr')
@ -179,6 +189,8 @@ lma::collector::monitor::mysql_password: <%= @mysql_password %>
<% if @es_is_deployed -%>
lma::collector::elasticsearch::server: <%= @es_server %>
lma::collector::elasticsearch::rest_port: 9200
lma::collector::elasticsearch::flush_interval: <%= @es_flush_interval %>
lma::collector::elasticsearch::flush_count: <%= @es_flush_count %>
<% if @is_elasticsearch_node -%>
lma::collector::elasticsearch::listen_address: <%= @es_listen_address %>
<% end -%>

View File

@ -488,6 +488,10 @@ Elasticsearch for indexing.
* `server`: *Required*. Elasticsearch server name. Valid options: a string.
* `port`: *Required*. Elasticsearch service port. Valid options: an integer.
* `flush_interval`: *Optional*. Interval at which accumulated messages should
be bulk indexed into Elasticsearch, in seconds. Default: `5`.
* `flush_count`: *Optional*. Number of messages that, if processed, will
trigger them to be bulk indexed into Elasticsearch. Default: `10`.
#### Class: `lma_collector::logs::keystone_wsgi`

View File

@ -15,7 +15,10 @@
class lma_collector::elasticsearch (
$server,
$port,
) inherits lma_collector::params {
$flush_interval = 5,
$flush_count = 10,
) {
include lma_collector::params
include lma_collector::service::log
validate_string($server)
@ -38,6 +41,8 @@ class lma_collector::elasticsearch (
max_buffer_size => $lma_collector::params::buffering_max_buffer_size_for_log,
max_file_size => $lma_collector::params::buffering_max_file_size_for_log,
queue_full_action => $lma_collector::params::queue_full_action_for_log,
flush_interval => $flush_interval,
flush_count => $flush_count,
require => Heka::Encoder::Es_json['elasticsearch'],
notify => Class['lma_collector::service::log'],
}

View File

@ -24,4 +24,18 @@ describe 'lma_collector::elasticsearch' do
it { is_expected.to contain_heka__output__elasticsearch('elasticsearch') }
it { is_expected.to contain_heka__encoder__es_json('elasticsearch') }
end
# Verifies that explicitly supplied flush_interval/flush_count parameters
# are forwarded to the heka::output::elasticsearch resource (rather than
# the class defaults being used).
describe 'with localhost server and flush_* parameters' do
let(:params) {{ :server => 'localhost', :port => 9200,
:flush_interval => 10, :flush_count => 100,
}}
it {
is_expected.to contain_heka__output__elasticsearch('elasticsearch').with(
:flush_interval => 10,
:flush_count => 100,
:server => 'localhost',
:port => 9200,
)
}
end
end