diff --git a/deployment_scripts/puppet/modules/lma_collector/files/plugins/decoders/collectd.lua b/deployment_scripts/puppet/modules/lma_collector/files/plugins/decoders/collectd.lua
index 6427110a8..952788bf6 100644
--- a/deployment_scripts/puppet/modules/lma_collector/files/plugins/decoders/collectd.lua
+++ b/deployment_scripts/puppet/modules/lma_collector/files/plugins/decoders/collectd.lua
@@ -31,6 +31,28 @@ local processes_map = {
     ps_vm = 'memory_virtual',
 }
 
+-- The following table keeps a list of metrics from plugin where the
+-- hostname is not relevant.
+local hostname_free = {
+    -- Add "metric_source = true" to skip the hostname for all metrics
+    -- from the metric_source
+    -- Add "metric_source = { list of metrics } to skip hostname for a
+    -- subset of metrics. The list of metrics is referenced through the
+    -- field 'type_instance'.
+    hypervisor_stats = {
+        total_free_disk_GB = true,
+        total_free_ram_MB = true,
+        total_free_vcpus = true,
+        total_used_disk_GB = true,
+        total_used_ram_MB = true,
+        total_used_vcpus = true,
+        total_running_instances = true,
+        total_running_tasks = true,
+    },
+    check_openstack_api = true,
+    http_check = true,
+}
+
 -- this is needed for the libvirt metrics because in that case, collectd sends
 -- the instance's ID instead of the hostname in the 'host' attribute
 local hostname = read_config('hostname') or error('hostname must be specified')
@@ -70,11 +92,11 @@ function process_message ()
                 Severity = 6,
                 Type = "metric",
                 Fields = {
-                    hostname = sample['host'],
                     interval = sample['interval'],
                     source = metric_source,
                     type = sample['dstypes'][i],
                     value = value,
+                    tag_fields = {},
                 }
             }
 
@@ -82,6 +104,20 @@ function process_message ()
             -- always consistent on metric namespaces so we need a few if/else
             -- statements to cover all cases.
 
+            -- Check if hostname is needed or not
+            local add_hostname = true
+            if hostname_free[metric_source] == true then
+                add_hostname = false
+            elseif hostname_free[metric_source] and
+                   hostname_free[metric_source][sample['type_instance']] then
+                add_hostname = false
+            end
+
+            if add_hostname then
+                msg['Fields']['hostname'] = sample['host']
+                table.insert(msg['Fields']['tag_fields'], 'hostname')
+            end
+
             if sample['meta'] and sample['meta']['service_check'] then
                 msg['Fields']['name'] = sample['meta']['service_check'] .. sep .. 'check'
                 msg['Fields']['details'] = sample['meta']['failure']
@@ -106,18 +142,17 @@ function process_message ()
                 msg['Fields']['name'] = 'fs' .. sep .. entity .. sep ..
                     sample['type_instance']
                 msg['Fields']['fs'] = mount
-                msg['Fields']['tag_fields'] = { 'fs' }
+                table.insert(msg['Fields']['tag_fields'], 'fs')
             elseif metric_source == 'disk' then
                 msg['Fields']['name'] = metric_name
                 msg['Fields']['device'] = sample['plugin_instance']
-                msg['Fields']['tag_fields'] = { 'device' }
+                table.insert(msg['Fields']['tag_fields'], 'device')
             elseif metric_source == 'cpu' then
                 msg['Fields']['name'] = 'cpu' .. sep .. sample['type_instance']
                 msg['Fields']['cpu_number'] = sample['plugin_instance']
-                msg['Fields']['tag_fields'] = { 'cpu_number' }
+                table.insert(msg['Fields']['tag_fields'], 'cpu_number')
             elseif metric_source == 'netlink' then
                 local netlink_metric = sample['type']
-
                 if netlink_metric == 'if_rx_errors' then
                     netlink_metric = 'if_errors_rx'
                 elseif netlink_metric == 'if_tx_errors' then
@@ -134,12 +169,12 @@ function process_message ()
                 end
                 msg['Fields']['name'] = netlink_metric
                 msg['Fields']['interface'] = sample['plugin_instance']
-                msg['Fields']['tag_fields'] = { 'interface' }
+                table.insert(msg['Fields']['tag_fields'], 'interface')
             elseif metric_source == 'processes' then
                 if processes_map[sample['type']] then
                     -- metrics related to a specific process
                     msg['Fields']['service'] = sample['plugin_instance']
-                    msg['Fields']['tag_fields'] = { 'service' }
+                    table.insert(msg['Fields']['tag_fields'], 'service')
                     msg['Fields']['name'] = 'lma_components'
                     if processes_map[sample['type']] ~= '' then
                         msg['Fields']['name'] = msg['Fields']['name'] .. sep .. processes_map[sample['type']]
@@ -160,7 +195,7 @@ function process_message ()
                     if sample['type'] == 'ps_state' then
                         msg['Fields']['name'] = msg['Fields']['name'] .. sep .. 'count'
                         msg['Fields']['state'] = sample['type_instance']
-                        msg['Fields']['tag_fields'] = { 'state' }
+                        table.insert(msg['Fields']['tag_fields'], 'state')
                     else
                         msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['type']
                     end
@@ -173,11 +208,11 @@ function process_message ()
                 elseif sample['type'] == 'mysql_commands' then
                     msg['Fields']['name'] = sample['type']
                     msg['Fields']['statement'] = sample['type_instance']
-                    msg['Fields']['tag_fields'] = { 'statement' }
+                    table.insert(msg['Fields']['tag_fields'], 'statement')
                 elseif sample['type'] == 'mysql_handler' then
                     msg['Fields']['name'] = sample['type']
                     msg['Fields']['handler'] = sample['type_instance']
-                    msg['Fields']['tag_fields'] = { 'handler' }
+                    table.insert(msg['Fields']['tag_fields'], 'handler')
                 else
                     msg['Fields']['name'] = metric_name
                 end
@@ -185,7 +220,7 @@ function process_message ()
                 -- For OpenStack API metrics, plugin_instance =
                 msg['Fields']['name'] = 'openstack_check_api'
                 msg['Fields']['service'] = sample['plugin_instance']
-                msg['Fields']['tag_fields'] = { 'service' }
+                table.insert(msg['Fields']['tag_fields'], 'service')
                 if sample['meta'] then
                     msg['Fields']['os_region'] = sample['meta']['region']
                 end
@@ -205,49 +240,52 @@ function process_message ()
                 msg['Fields']['name'] = 'rabbitmq' .. sep .. sample['type_instance']
                 if sample['meta'] and sample['meta']['queue'] then
                     msg['Fields']['queue'] = sample['meta']['queue']
-                    msg['Fields']['tag_fields'] = { 'queue' }
+                    table.insert(msg['Fields']['tag_fields'], 'queue')
                 end
             elseif metric_source == 'nova' then
                 if sample['plugin_instance'] == 'nova_services' or sample['plugin_instance'] == 'nova_service' then
                     msg['Fields']['name'] = 'openstack_' .. sample['plugin_instance']
-                    msg['Fields']['tag_fields'] = { 'service', 'state' }
                     msg['Fields']['service'] = sample['meta']['service']
                     msg['Fields']['state'] = sample['meta']['state']
+                    table.insert(msg['Fields']['tag_fields'], 'service')
+                    table.insert(msg['Fields']['tag_fields'], 'state')
                     if sample['plugin_instance'] == 'nova_service' then
                         msg['Fields']['hostname'] = sample['meta']['host']
                     end
                 else
                     msg['Fields']['name'] = 'openstack' .. sep .. 'nova' .. sep ..
                         replace_dot_by_sep(sample['plugin_instance'])
-                    msg['Fields']['tag_fields'] = { 'state' }
                     msg['Fields']['state'] = sample['type_instance']
+                    table.insert(msg['Fields']['tag_fields'], 'state')
                 end
             elseif metric_source == 'cinder' then
                 if sample['plugin_instance'] == 'cinder_services' or sample['plugin_instance'] == 'cinder_service' then
                     msg['Fields']['name'] = 'openstack_' .. sample['plugin_instance']
-                    msg['Fields']['tag_fields'] = { 'service', 'state' }
                     msg['Fields']['service'] = sample['meta']['service']
                     msg['Fields']['state'] = sample['meta']['state']
+                    table.insert(msg['Fields']['tag_fields'], 'service')
+                    table.insert(msg['Fields']['tag_fields'], 'state')
                     if sample['plugin_instance'] == 'cinder_service' then
                         msg['Fields']['hostname'] = sample['meta']['host']
                     end
                 else
                     msg['Fields']['name'] = 'openstack' .. sep .. 'cinder' .. sep ..
                         replace_dot_by_sep(sample['plugin_instance'])
-                    msg['Fields']['tag_fields'] = { 'state' }
                     msg['Fields']['state'] = sample['type_instance']
+                    table.insert(msg['Fields']['tag_fields'], 'state')
                 end
             elseif metric_source == 'glance' then
                 msg['Fields']['name'] = 'openstack' .. sep .. 'glance' .. sep .. sample['type_instance']
-                msg['Fields']['tag_fields'] = { 'state', 'visibility' }
                 msg['Fields']['state'] = sample['meta']['status']
                 msg['Fields']['visibility'] = sample['meta']['visibility']
+                table.insert(msg['Fields']['tag_fields'], 'state')
+                table.insert(msg['Fields']['tag_fields'], 'visibility')
             elseif metric_source == 'keystone' then
-                msg['Fields']['name'] = 'openstack' .. sep .. 'keystone' .. sep .. sample['type_instance']
-                if sample['meta']['state'] then
-                    msg['Fields']['tag_fields'] = { 'state' }
-                    msg['Fields']['state'] = sample['meta']['state']
-                end
+                msg['Fields']['name'] = 'openstack' .. sep .. 'keystone' .. sep .. sample['type_instance']
+                if sample['meta']['state'] then
+                    msg['Fields']['state'] = sample['meta']['state']
+                    table.insert(msg['Fields']['tag_fields'], 'state')
+                end
             elseif metric_source == 'neutron' then
                 if sample['type_instance'] == 'networks' or sample['type_instance'] == 'ports' or sample['type_instance'] == 'routers' or sample['type_instance'] == 'floatingips' then
                     skip_it = true
@@ -256,23 +294,25 @@ function process_message ()
                 elseif sample['type_instance'] == 'neutron_agents' or
                        sample['type_instance'] == 'neutron_agent' then
                     msg['Fields']['name'] = 'openstack_' .. sample['type_instance']
-                    msg['Fields']['tag_fields'] = { 'service', 'state' }
                     msg['Fields']['service'] = sample['meta']['service']
                     msg['Fields']['state'] = sample['meta']['state']
+                    table.insert(msg['Fields']['tag_fields'], 'service')
+                    table.insert(msg['Fields']['tag_fields'], 'state')
                     if sample['type_instance'] == 'neutron_agent' then
                         msg['Fields']['hostname'] = sample['meta']['host']
                     end
                 elseif string.match(sample['type_instance'], '^ports') then
                     local resource, owner, state = string.match(sample['type_instance'], '^([^.]+)%.([^.]+)%.(.+)$')
                     msg['Fields']['name'] = 'openstack' .. sep .. 'neutron' .. sep .. replace_dot_by_sep(resource)
-                    msg['Fields']['tag_fields'] = { 'owner', 'state' }
                     msg['Fields']['owner'] = owner
                     msg['Fields']['state'] = state
+                    table.insert(msg['Fields']['tag_fields'], 'owner')
+                    table.insert(msg['Fields']['tag_fields'], 'state')
                 else
                     local resource, state = string.match(sample['type_instance'], '^([^.]+)%.(.+)$')
                     msg['Fields']['name'] = 'openstack' .. sep .. 'neutron' .. sep .. replace_dot_by_sep(resource)
-                    msg['Fields']['tag_fields'] = { 'state' }
                     msg['Fields']['state'] = state
+                    table.insert(msg['Fields']['tag_fields'], 'state')
                 end
             elseif metric_source == 'memcached' then
                 msg['Fields']['name'] = 'memcached' .. sep .. string.gsub(metric_name, 'memcached_', '')
@@ -280,8 +320,8 @@ function process_message ()
                 msg['Fields']['name'] = 'haproxy' .. sep .. sample['type_instance']
                 if sample['meta'] then
                     if sample['meta']['backend'] then
-                        msg['Fields']['tag_fields'] = { 'backend' }
                         msg['Fields']['backend'] = sample['meta']['backend']
+                        table.insert(msg['Fields']['tag_fields'], 'backend')
                         if sample['meta']['state'] then
                             msg['Fields']['state'] = sample['meta']['state']
                             table.insert(msg['Fields']['tag_fields'], 'state')
@@ -291,8 +331,8 @@ function process_message ()
                             table.insert(msg['Fields']['tag_fields'], 'server')
                         end
                     elseif sample['meta']['frontend'] then
-                        msg['Fields']['tag_fields'] = { 'frontend' }
                         msg['Fields']['frontend'] = sample['meta']['frontend']
+                        table.insert(msg['Fields']['tag_fields'], 'frontend')
                     end
                 end
             elseif metric_source == 'apache' then
@@ -301,17 +341,18 @@ function process_message ()
             elseif metric_source == 'ceph_osd_perf' then
                 msg['Fields']['name'] = 'ceph_perf' .. sep .. sample['type']
 
-                msg['Fields']['tag_fields'] = { 'cluster', 'osd' }
                 msg['Fields']['cluster'] = sample['plugin_instance']
                 msg['Fields']['osd'] = sample['type_instance']
+                table.insert(msg['Fields']['tag_fields'], 'cluster')
+                table.insert(msg['Fields']['tag_fields'], 'osd')
             elseif metric_source:match('^ceph') then
                 msg['Fields']['name'] = 'ceph' .. sep .. sample['type']
                 if sample['dsnames'][i] ~= 'value' then
                     msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['dsnames'][i]
                 end
 
-                msg['Fields']['tag_fields'] = { 'cluster' }
                 msg['Fields']['cluster'] = sample['plugin_instance']
+                table.insert(msg['Fields']['tag_fields'], 'cluster')
 
                 if sample['type_instance'] ~= '' then
                     local additional_tag
@@ -323,51 +364,48 @@ function process_message ()
                         additional_tag = 'osd'
                     end
                     if additional_tag then
-                        msg['Fields']['tag_fields'][2] = additional_tag
                         msg['Fields'][additional_tag] = sample['type_instance']
+                        table.insert(msg['Fields']['tag_fields'], additional_tag)
                     end
                 end
             elseif metric_source == 'pacemaker' then
+                if sample['meta'] and sample['meta']['host'] then
+                    msg['Fields']['hostname'] = sample['meta']['host']
+                end
+
                 msg['Fields']['name'] = metric_source .. sep ..
                     sample['type_instance']
 
                 -- add dimension fields
-                local t = {}
                 for _, v in ipairs({'status', 'resource'}) do
                     if sample['meta'] and sample['meta'][v] then
-                        t[#t+1] = v
                         msg['Fields'][v] = sample['meta'][v]
+                        table.insert(msg['Fields']['tag_fields'], v)
                     end
                 end
-                if #t > 0 then
-                    msg['Fields']['tag_fields'] = t
-                end
-
-                if sample['meta'] and sample['meta']['host'] then
-                    msg['Fields']['hostname'] = sample['meta']['host']
-                end
             elseif metric_source == 'users' then
                 -- 'users' is a reserved name for InfluxDB v0.9
                 msg['Fields']['name'] = 'logged_users'
             elseif metric_source == 'libvirt' then
                 -- collectd sends the instance's ID in the 'host' field
                 msg['Fields']['instance_id'] = sample['host']
-                msg['Fields']['tag_fields'] = { 'instance_id' }
+                table.insert(msg['Fields']['tag_fields'], 'instance_id')
                 msg['Fields']['hostname'] = hostname
                 msg['Hostname'] = hostname
+
                 if string.match(sample['type'], '^disk_') then
                     msg['Fields']['name'] = 'virt' .. sep .. sample['type'] .. sep ..
                         sample['dsnames'][i]
                     msg['Fields']['device'] = sample['type_instance']
-                    msg['Fields']['tag_fields'][2] = 'device'
+                    table.insert(msg['Fields']['tag_fields'], 'device')
                 elseif string.match(sample['type'], '^if_') then
                     msg['Fields']['name'] = 'virt' .. sep .. sample['type'] .. sep ..
                         sample['dsnames'][i]
                     msg['Fields']['interface'] = sample['type_instance']
-                    msg['Fields']['tag_fields'][2] = 'interface'
+                    table.insert(msg['Fields']['tag_fields'], 'interface')
                 elseif sample['type'] == 'virt_cpu_total' then
                     msg['Fields']['name'] = 'virt_cpu_time'
                 elseif sample['type'] == 'virt_vcpu' then
                     msg['Fields']['name'] = 'virt_vcpu_time'
                     msg['Fields']['vcpu_number'] = sample['type_instance']
-                    msg['Fields']['tag_fields'][2] = 'vcpu_number'
+                    table.insert(msg['Fields']['tag_fields'], 'vcpu_number')
                 else
                     msg['Fields']['name'] = 'virt' .. sep .. metric_name
                 end
@@ -376,13 +414,18 @@ function process_message ()
             elseif metric_source == 'http_check' then
                 msg['Fields']['name'] = metric_source
                 msg['Fields']['service'] = sample['type_instance']
-                msg['Fields']['tag_fields'] = { 'service' }
+                table.insert(msg['Fields']['tag_fields'], 'service')
             else
                 msg['Fields']['name'] = replace_dot_by_sep(metric_name)
             end
 
             if not skip_it then
                 utils.inject_tags(msg)
+                -- Before injecting the message we need to check that tag_fields is not an
+                -- empty table otherwise the protobuf encoder fails to encode the table.
+                if #msg['Fields']['tag_fields'] == 0 then
+                    msg['Fields']['tag_fields'] = nil
+                end
                 utils.safe_inject_message(msg)
                 if metric_source == 'swap' and metric_name == 'swap_used' and swap_size > 0 then
                     -- collectd 5.4.0 doesn't report the used swap in
diff --git a/deployment_scripts/puppet/modules/lma_collector/files/plugins/filters/http_metrics_aggregator.lua b/deployment_scripts/puppet/modules/lma_collector/files/plugins/filters/http_metrics_aggregator.lua
index af28f2aac..1756a5998 100644
--- a/deployment_scripts/puppet/modules/lma_collector/files/plugins/filters/http_metrics_aggregator.lua
+++ b/deployment_scripts/puppet/modules/lma_collector/files/plugins/filters/http_metrics_aggregator.lua
@@ -160,7 +160,7 @@ function timer_event(ns)
                     end
                 end
                 bucket.times = nil
-                utils.add_to_bulk_metric(metric_name, bucket, {http_method=method, http_status=status})
+                utils.add_to_bulk_metric(metric_name, bucket, {hostname=hostname, http_method=method, http_status=status})
                 all_times[service][method][status] = nil
                 num = num + 1
                 num_metrics = num_metrics - 1
diff --git a/deployment_scripts/puppet/modules/lma_collector/files/plugins/filters/instance_state.lua b/deployment_scripts/puppet/modules/lma_collector/files/plugins/filters/instance_state.lua
index 49cb8f9fe..9354267da 100644
--- a/deployment_scripts/puppet/modules/lma_collector/files/plugins/filters/instance_state.lua
+++ b/deployment_scripts/puppet/modules/lma_collector/files/plugins/filters/instance_state.lua
@@ -40,7 +40,7 @@ function process_message ()
         tenant_id = read_message("Fields[tenant_id]"),
         user_id = read_message("Fields[user_id]"),
         state = state,
-        tag_fields = { 'state' },
+        tag_fields = { 'state', 'hostname' },
     }
 
     utils.inject_tags(msg)
diff --git a/deployment_scripts/puppet/modules/lma_collector/files/plugins/filters/logs_counter.lua b/deployment_scripts/puppet/modules/lma_collector/files/plugins/filters/logs_counter.lua
index 64c2ce80d..7da74e913 100644
--- a/deployment_scripts/puppet/modules/lma_collector/files/plugins/filters/logs_counter.lua
+++ b/deployment_scripts/puppet/modules/lma_collector/files/plugins/filters/logs_counter.lua
@@ -106,7 +106,7 @@ function timer_event(ns)
            service = service_name,
            level = string.lower(level),
            hostname = hostname,
-           tag_fields = {'service', 'level'},
+           tag_fields = {'service', 'level', 'hostname'},
        }
 
        utils.inject_tags(msg)
diff --git a/deployment_scripts/puppet/modules/lma_collector/files/plugins/filters/resource_creation_time.lua b/deployment_scripts/puppet/modules/lma_collector/files/plugins/filters/resource_creation_time.lua
index 8200782c8..0047ed3ee 100644
--- a/deployment_scripts/puppet/modules/lma_collector/files/plugins/filters/resource_creation_time.lua
+++ b/deployment_scripts/puppet/modules/lma_collector/files/plugins/filters/resource_creation_time.lua
@@ -74,6 +74,7 @@ function process_message ()
         value = {value = math.floor((completed_at - started_at)/1e6 + 0.5) / 1e3, representation = 's'},
         tenant_id = read_message("Fields[tenant_id]"),
         user_id = read_message("Fields[user_id]"),
+        tag_fields = {'hostname'},
     }
 
     utils.inject_tags(msg)
diff --git a/deployment_scripts/puppet/modules/lma_collector/manifests/influxdb.pp b/deployment_scripts/puppet/modules/lma_collector/manifests/influxdb.pp
index 399ea1001..37a3dab07 100644
--- a/deployment_scripts/puppet/modules/lma_collector/manifests/influxdb.pp
+++ b/deployment_scripts/puppet/modules/lma_collector/manifests/influxdb.pp
@@ -37,7 +37,7 @@ class lma_collector::influxdb (
     config => {
       flush_interval => $lma_collector::params::influxdb_flush_interval,
       flush_count => $lma_collector::params::influxdb_flush_count,
-      tag_fields => join(sort(concat(['hostname'], $tag_fields)), ' '),
+      tag_fields => join(sort($tag_fields), ' '),
       time_precision => $time_precision,
       # FIXME(pasquier-s): provide the default_tenant_id & default_user_id
       # parameters but this requires to request Keystone since we only have
diff --git a/deployment_scripts/puppet/modules/lma_collector/spec/classes/lma_collector_influxdb_spec.rb b/deployment_scripts/puppet/modules/lma_collector/spec/classes/lma_collector_influxdb_spec.rb
index 0b3006d76..f197a2936 100644
--- a/deployment_scripts/puppet/modules/lma_collector/spec/classes/lma_collector_influxdb_spec.rb
+++ b/deployment_scripts/puppet/modules/lma_collector/spec/classes/lma_collector_influxdb_spec.rb
@@ -35,7 +35,7 @@ describe 'lma_collector::influxdb' do
    it { is_expected.to contain_heka__output__http('influxdb') }
    it { is_expected.to contain_heka__encoder__payload('influxdb') }
    it { is_expected.to contain_heka__filter__sandbox('influxdb_accumulator').with_config({
-        "tag_fields" => "foo hostname zzz", "flush_interval"=> :undef,
+        "tag_fields" => "foo zzz", "flush_interval"=> :undef,
         "flush_count"=> :undef, "time_precision" => "ms"}) }
    it { is_expected.to contain_heka__filter__sandbox('influxdb_annotation') }
  end
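Taken together, these changes turn the hostname from an implicit tag that the Puppet manifest always prepended (the removed concat(['hostname'], $tag_fields)) into an ordinary dimension that each decoder and filter declares in tag_fields itself, and that the collectd decoder skips entirely for the sources listed in hostname_free. Below is a minimal standalone sketch of that decision logic; it runs outside Heka, the helper name should_add_hostname and the sample values are illustrative only, and the hostname_free table is abridged from the patch above.

    -- Sketch only: reproduces the hostname decision and the empty tag_fields
    -- handling from the collectd.lua hunks above. Not part of the patch.
    local hostname_free = {
        -- abridged copy of the table added in collectd.lua
        hypervisor_stats = { total_free_vcpus = true, total_running_instances = true },
        check_openstack_api = true,
        http_check = true,
    }

    -- Returns false when the whole metric source, or this particular
    -- type_instance, is marked as hostname-free; true otherwise.
    local function should_add_hostname(metric_source, type_instance)
        if hostname_free[metric_source] == true then
            return false
        elseif hostname_free[metric_source] and
               hostname_free[metric_source][type_instance] then
            return false
        end
        return true
    end

    local fields = { tag_fields = {} }
    if should_add_hostname('hypervisor_stats', 'total_free_vcpus') then
        fields.hostname = 'node-1'
        table.insert(fields.tag_fields, 'hostname')
    end

    -- An empty tag_fields table cannot be protobuf-encoded, so drop it,
    -- mirroring the check done before utils.safe_inject_message() above.
    if #fields.tag_fields == 0 then
        fields.tag_fields = nil
    end

    print(fields.hostname, fields.tag_fields)  -- prints "nil nil": hostname skipped for this metric

With metrics declaring their own dimensions this way, influxdb.pp only needs join(sort($tag_fields), ' ') and the spec expectation drops the implicit "hostname" entry, as shown in the last two hunks.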