Kafka support

- get kafka IPs
- configure ceilometer.conf
- configure hindsight configs
- deploy hindsight on Kafka nodes
- disable heka when Kafka detected

Change-Id: I3df403affab6f8427141c19a07a3312af7e92630
This commit is contained in:
Volodymyr Kornylyuk 2016-09-13 00:19:20 +03:00
parent 621d57e8a3
commit a14b61fcb1
6 changed files with 190 additions and 123 deletions

View File

@ -70,9 +70,27 @@ create_resources(package, $packages)
# Stop not needed any more service # Stop not needed any more service
service { 'ceilometer-collector': service { 'ceilometer-collector':
ensure => stopped, ensure => stopped,
enable => false, enable => false,
hasstatus => true, hasstatus => true,
}
# Kafka integration
if hiera('telemetry::kafka::enabled') {
ceilometer_config { 'oslo_messaging_kafka/consumer_group': value => 'ceilometer' }
$kafka_ips = hiera('telemetry::kafka::broker_list')
$kafka_url = "kafka://${kafka_ips}"
$rabbit_url = hiera('telemetry::rabbit::url')
ceilometer_config { 'DEFAULT/transport_url': value => $kafka_url }
ceilometer_config { 'notification/messaging_urls': value => [$kafka_url,$rabbit_url] }
ceilometer_config { 'oslo_messaging_notifications/transport_url': value => $kafka_url }
ceilometer_config { 'compute/resource_update_interval': value => 600 }
} }
# TODO validate values before proceed # TODO validate values before proceed
@ -85,13 +103,13 @@ ceilometer_config { 'notification/store_events': value => false }
service { 'ceilometer-service': service { 'ceilometer-service':
ensure => $service_ensure, ensure => $service_ensure,
name => $::ceilometer::params::api_service_name, name => $::ceilometer::params::api_service_name,
enable => $enabled, enable => $enabled,
hasstatus => true, hasstatus => true,
hasrestart => true, hasrestart => true,
tag => 'ceilometer-service', tag => 'ceilometer-service',
} }
Ceilometer_config<||> ~> Service['ceilometer-service'] Ceilometer_config<||> ~> Service['ceilometer-service']

View File

@ -1,118 +1,125 @@
## Get values notice('MODULAR: fuel-plugin-telemetry: heka.pp')
$config_dir = hiera('telemetry::heka::config_dir') if !hiera('telemetry::kafka::enabled') {
$amqp_url = hiera('telemetry::rabbit::url')
$metadata_fields = hiera('telemetry::metadata_fields')
if hiera('telemetry::elasticsearch::server',false) { ## Get values
$ip = hiera('telemetry::elasticsearch::server')
$port = hiera('telemetry::elasticsearch::server')
$elasticsearch_url = "http://${ip}:${port}"
} else {
#no Elasticsearch
#heka failed to start if url schemma is not valid, so we set http here
$elasticsearch_url = 'http://'
}
$influxdb_address = hiera('telemetry::influxdb::address') $config_dir = hiera('telemetry::heka::config_dir')
$influxdb_port = hiera('telemetry::influxdb::port') $amqp_url = hiera('telemetry::rabbit::url')
$influxdb_database = hiera('telemetry::influxdb::database') $metadata_fields = hiera('telemetry::metadata_fields')
$influxdb_user = hiera('telemetry::influxdb::user')
$influxdb_password = hiera('telemetry::influxdb::password')
$modules_dir = hiera('telemetry::lua::modules_dir')
### Heka configuration if hiera('telemetry::elasticsearch::server',false) {
$ip = hiera('telemetry::elasticsearch::server')
$port = hiera('telemetry::elasticsearch::server')
$elasticsearch_url = "http://${ip}:${port}"
} else {
#no Elasticsearch
#heka failed to start if url schemma is not valid, so we set http here
$elasticsearch_url = 'http://'
}
File { $influxdb_address = hiera('telemetry::influxdb::address')
before => Service['telemetry-collector'] $influxdb_port = hiera('telemetry::influxdb::port')
} $influxdb_database = hiera('telemetry::influxdb::database')
$influxdb_user = hiera('telemetry::influxdb::user')
$influxdb_password = hiera('telemetry::influxdb::password')
file { $modules_dir = hiera('telemetry::lua::modules_dir')
"${config_dir}/amqp-openstack_sample.toml": content => template( 'telemetry/heka/amqp-openstack_sample.toml.erb' );
"${config_dir}/decoder-sample.toml": content => template( 'telemetry/heka/decoder-sample.toml.erb' );
"${config_dir}/encoder-influxdb.toml": content => template( 'telemetry/heka/encoder-influxdb.toml.erb' );
"${config_dir}/encoder-resource-elasticsearch.toml": content => template( 'telemetry/heka/encoder-resource-elasticsearch.toml.erb' );
"${config_dir}/file-output-resource.toml": content => template( 'telemetry/heka/file-output-resource.toml.erb' );
"${config_dir}/file-output.toml": content => template( 'telemetry/heka/file-output.toml.erb' );
"${config_dir}/filter-influxdb_accumulator_sample.toml": content => template( 'telemetry/heka/filter-influxdb_accumulator_sample.toml.erb' );
# TODO disable config when Elasticsearch not in use
"${config_dir}/output-elasticsearch-resource2.toml": content => template( 'telemetry/heka/output-elasticsearch-resource2.toml.erb' );
"${config_dir}/output-influxdb-samples.toml": content => template( 'telemetry/heka/output-influxdb-samples.toml.erb' );
}
# Heka Installation ### Heka configuration
$version = hiera('telemetry::heka::version') File {
$max_message_size = hiera('telemetry::heka::max_message_size') before => Service['telemetry-collector']
$max_process_inject = hiera('telemetry::heka::max_process_inject') }
$max_timer_inject = hiera('telemetry::heka::max_timer_inject')
$poolsize = hiera('telemetry::heka::poolsize')
# TODO we dont't need them on controller file {
$install_init_script = false "${config_dir}/amqp-openstack_sample.toml": content => template( 'telemetry/heka/amqp-openstack_sample.toml.erb' );
"${config_dir}/decoder-sample.toml": content => template( 'telemetry/heka/decoder-sample.toml.erb' );
"${config_dir}/encoder-influxdb.toml": content => template( 'telemetry/heka/encoder-influxdb.toml.erb' );
"${config_dir}/encoder-resource-elasticsearch.toml": content => template( 'telemetry/heka/encoder-resource-elasticsearch.toml.erb' );
"${config_dir}/file-output-resource.toml": content => template( 'telemetry/heka/file-output-resource.toml.erb' );
"${config_dir}/file-output.toml": content => template( 'telemetry/heka/file-output.toml.erb' );
"${config_dir}/filter-influxdb_accumulator_sample.toml": content => template( 'telemetry/heka/filter-influxdb_accumulator_sample.toml.erb' );
# TODO disable config when Elasticsearch not in use
"${config_dir}/output-elasticsearch-resource2.toml": content => template( 'telemetry/heka/output-elasticsearch-resource2.toml.erb' );
"${config_dir}/output-influxdb-samples.toml": content => template( 'telemetry/heka/output-influxdb-samples.toml.erb' );
}
# Workaround for heka module # Heka Installation
# to prevent changing home of root user
$user='root'
user { $user: }
::heka { 'telemetry-collector': $version = hiera('telemetry::heka::version')
config_dir => '/etc/telemetry-collector', $max_message_size = hiera('telemetry::heka::max_message_size')
user => $user, $max_process_inject = hiera('telemetry::heka::max_process_inject')
#additional_groups => $additional_groups, $max_timer_inject = hiera('telemetry::heka::max_timer_inject')
hostname => $::hostname, $poolsize = hiera('telemetry::heka::poolsize')
max_message_size => $max_message_size,
max_process_inject => $max_process_inject,
max_timer_inject => $max_timer_inject,
poolsize => $poolsize,
install_init_script => $install_init_script,
version => $version,
}
# Heka pacemaker config # TODO we dont't need them on controller
$install_init_script = false
pacemaker::service { 'telemetry-collector': # Workaround for heka module
ensure => present, # to prevent changing home of root user
prefix => false, $user='root'
primitive_class => 'ocf', user { $user: }
primitive_type => 'ocf-telemetry',
use_handler => false, ::heka { 'telemetry-collector':
complex_type => 'clone', config_dir => '/etc/telemetry-collector',
complex_metadata => { user => $user,
# the resource should start as soon as the dependent resources #additional_groups => $additional_groups,
# (eg RabbitMQ) are running *locally* hostname => $::hostname,
'interleave' => true, max_message_size => $max_message_size,
}, max_process_inject => $max_process_inject,
metadata => { max_timer_inject => $max_timer_inject,
# Make sure that Pacemaker tries to restart the resource if it fails poolsize => $poolsize,
# too many times install_init_script => $install_init_script,
'failure-timeout' => '120s', version => $version,
'migration-threshold' => '3', }
},
parameters => { # Heka pacemaker config
'service_name' => 'telemetry-collector',
'config' => '/etc/telemetry-collector', pacemaker::service { 'telemetry-collector':
'log_file' => '/var/log/telemetry-collector.log', ensure => present,
'user' => $user, prefix => false,
}, primitive_class => 'ocf',
operations => { primitive_type => 'ocf-telemetry',
'monitor' => { use_handler => false,
'interval' => '20', complex_type => 'clone',
'timeout' => '10', complex_metadata => {
# the resource should start as soon as the dependent resources
# (eg RabbitMQ) are running *locally*
'interleave' => true,
}, },
'start' => { metadata => {
'timeout' => '30', # Make sure that Pacemaker tries to restart the resource if it fails
# too many times
'failure-timeout' => '120s',
'migration-threshold' => '3',
}, },
'stop' => { parameters => {
'timeout' => '30', 'service_name' => 'telemetry-collector',
'config' => '/etc/telemetry-collector',
'log_file' => '/var/log/telemetry-collector.log',
'user' => $user,
}, },
}, operations => {
} 'monitor' => {
'interval' => '20',
'timeout' => '10',
},
'start' => {
'timeout' => '30',
},
'stop' => {
'timeout' => '30',
},
},
}
service { 'telemetry-collector':
ensure => 'running',
enable => true,
provider => 'pacemaker',
}
service { 'telemetry-collector':
ensure => 'running',
enable => true,
provider => 'pacemaker',
} }

View File

@ -98,7 +98,8 @@ if $telemetry['influxdb_address'] {
])) ]))
} }
# TODO test for multiple inxlixdb nodes !!! # TODO test for multiple influxdb nodes !!!
# TODO use vip?
$influxdb_address = $nodes_array[0]['network_roles']['management'] $influxdb_address = $nodes_array[0]['network_roles']['management']
$retention_period = $influxdb_grafana['retention_period'] $retention_period = $influxdb_grafana['retention_period']
@ -112,17 +113,34 @@ if $telemetry['influxdb_address'] {
# Rabbit # Rabbit
$rabbit_info = hiera('rabbit') $rabbit_info = hiera('rabbit')
$rabbit_password = $rabbit_info['password'] $rabbit_password = $rabbit_info['password']
$rabbit_user = $rabbit_info['user'] $rabbit_user = $rabbit_info['user']
# TODO take one? # TODO take one?
$amqp_host = hiera('amqp_hosts') $amqp_host = hiera('amqp_hosts')
$amqp_url = "amqp://${rabbit_user}:${rabbit_password}@${amqp_host}/" $amqp_url = "amqp://${rabbit_user}:${rabbit_password}@${amqp_host}/"
$metadata_fields = join(['status deleted container_format min_ram updated_at ', $metadata_fields = join(['status deleted container_format min_ram updated_at ',
'min_disk is_public size checksum created_at disk_format protected instance_host ', 'min_disk is_public size checksum created_at disk_format protected instance_host ',
'host display_name instance_id instance_type status state']) 'host display_name instance_id instance_type status state'])
# Kafka
$kafka_port = 9092
$kafka_nodes = get_nodes_hash_by_roles($network_metadata, ['kafka', 'primary-kafka'])
$kafka_ip_map = get_node_to_ipaddr_map_by_network_role($kafka_nodes, 'management')
if count($kafka_ip_map)>0 {
notice('Kafka nodes found')
$kafka_enabled = true
$kafka_ips = sort(values($kafka_ip_map))
# Format: host:port,host:port for ceiolmeter.conf
$tmp_list = join($kafka_ips,":${kafka_port},")
$broker_list = join([$tmp_list,":${kafka_port}"])
} else {
notice('No Kafka nodes found')
$kafka_enabled = false
}
$calculated_content = inline_template(' $calculated_content = inline_template('
--- ---
ceilometer: ceilometer:
@ -161,6 +179,16 @@ telemetry::heka::max_timer_inject: 10
telemetry::heka::poolsize: 100 telemetry::heka::poolsize: 100
telemetry::heka::config_dir: "/etc/telemetry-collector" telemetry::heka::config_dir: "/etc/telemetry-collector"
<% if @kafka_enabled -%>
telemetry::kafka::broker_list: "<%= @broker_list %>"
telemetry::kafka::nodes_list:
<% @kafka_ips.each do |s| -%>
- "<%= s %>"
<% end -%>
<% end -%>
telemetry::kafka::enabled: <%= @kafka_enabled %>
telemetry::kafka::port: <%= @kafka_port %>
telemetry::rabbit::url: "<%= @amqp_url %>" telemetry::rabbit::url: "<%= @amqp_url %>"
telemetry::metadata_fields: "<%= @metadata_fields %>" telemetry::metadata_fields: "<%= @metadata_fields %>"

View File

@ -12,8 +12,9 @@ $metadata_fields = hiera('telemetry::metadata_fields')
# TODO settings/hiera # TODO settings/hiera
$topics = 'metering.sample' $topics = 'metering.sample'
#TODO kafka integration # Kafka integration
$brokerlist = '"broker1:9092"' $brokerlist = hiera('telemetry::kafka::nodes_list')
$kafka_port = hiera('telemetry::kafka::port')
# Install packages # Install packages
@ -23,6 +24,12 @@ package { 'hindsight': }
package { 'librdkafka1': } package { 'librdkafka1': }
package { 'lua-sandbox-extensions': } package { 'lua-sandbox-extensions': }
package { 'python-oslo.messaging': } package { 'python-oslo.messaging': }
package { 'python-pip': }
package { 'kafka-python':
ensure => '1.2.5',
provider => 'pip'
}
# User/group # User/group
@ -81,12 +88,13 @@ file { '/etc/telemetry_hindsight/hindsight.cfg':
} }
# Templates # Templates
# TODO unhardkode kafka port
$configs = { $configs = {
"${run_dir}/output/influxdb_ceilometer.cfg" => { "${run_dir}/output/influxdb_ceilometer.cfg" => {
content => template( "${templates}/output/influxdb_ceilometer.cfg.erb"), content => template( "${templates}/output/influxdb_ceilometer.cfg.erb"),
}, },
"${run_dir}/input/ceilometer_kafka.cfg" => { "${run_dir}/input/kafka_input.cfg" => {
content => template( "${templates}/input/kafka_input.cfg.erb"), content => template( "${templates}/input/kafka_input.cfg.erb"),
} }
} }

View File

@ -1,9 +1,13 @@
filename = "kafka_input.lua" filename = "kafka_input.lua"
brokerlist = <%= @brokerlist %> <%
brokers = @brokerlist.join(":#{@kafka_port}\", \"")
brokers = '"'+brokers+":#{@kafka_port}\""
-%>
brokerlist = {<%= brokers %>}
topics = {"<%= @topics %>"} topics = {"<%= @topics %>"}
consumer_conf = { consumer_conf = {
["group.id"] = "telemetry_collector", ["group.id"] = "telemetry_collector",
["message.max.bytes"] = 16024, ["message.max.bytes"] = 16024,
} }
topic_conf = {} topic_conf = {}
metadata_fields = "<%= @metadata_fields %>" metadata_fields = "<%= @metadata_fields %>"

View File

@ -197,7 +197,7 @@
type: puppet type: puppet
version: 2.1.0 version: 2.1.0
groups: [primary-controller, controller] groups: [primary-controller, controller]
requires: [post_deployment_start,telemetry-ceilometer-controller] requires: [post_deployment_start,telemetry-hiera,telemetry-ceilometer-controller]
required_for: [post_deployment_end] required_for: [post_deployment_end]
# required_for: [deploy_end] # required_for: [deploy_end]
# requires: [deploy_start, telemetry-ceilometer-controller] # requires: [deploy_start, telemetry-ceilometer-controller]
@ -233,8 +233,9 @@
type: puppet type: puppet
version: 2.1.0 version: 2.1.0
groups: [primary-controller, controller] groups: [primary-controller, controller]
role: [primary-controller, controller, kafka, primary-kafka]
required_for: [post_deployment_end] required_for: [post_deployment_end]
requires: [telemetry-influxdb-create-db] requires: [telemetry-hiera, telemetry-influxdb-create-db]
parameters: parameters:
puppet_manifest: puppet/manifests/lua-scripts.pp puppet_manifest: puppet/manifests/lua-scripts.pp
puppet_modules: puppet/modules:/etc/puppet/modules puppet_modules: puppet/modules:/etc/puppet/modules
@ -245,7 +246,7 @@
version: 2.1.0 version: 2.1.0
groups: [primary-controller, controller] groups: [primary-controller, controller]
required_for: [post_deployment_end] required_for: [post_deployment_end]
requires: [telemetry-lua-scripts] requires: [telemetry-influxdb-create-db]
parameters: parameters:
puppet_manifest: puppet/manifests/heka.pp puppet_manifest: puppet/manifests/heka.pp
puppet_modules: puppet/modules:/etc/puppet/modules puppet_modules: puppet/modules:/etc/puppet/modules
@ -256,7 +257,8 @@
version: 2.1.0 version: 2.1.0
groups: [primary-controller, controller] groups: [primary-controller, controller]
required_for: [post_deployment_end] required_for: [post_deployment_end]
requires: [telemetry-heka] role: [primary-kafka, kafka]
requires: [telemetry-influxdb-create-db, telemetry-hiera]
parameters: parameters:
puppet_manifest: puppet/manifests/hindsight.pp puppet_manifest: puppet/manifests/hindsight.pp
puppet_modules: puppet/modules:/etc/puppet/modules puppet_modules: puppet/modules:/etc/puppet/modules