# Copyright 2016 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Pipeline overview: read JSON log events from the "transformed-log" Kafka
# topic, keep only those whose level is warning or error, build a
# "log.<level>" metric with value 1 for each, and publish the result as JSON
# to the "metrics" Kafka topic.

input {
  # NOTE(review): the %KAFKA_SERVICE_HOST% / %KAFKA_SERVICE_PORT% tokens are
  # presumably substituted before Logstash loads this file -- confirm against
  # the deployment tooling.
  kafka {
    bootstrap_servers => "%KAFKA_SERVICE_HOST%:%KAFKA_SERVICE_PORT%"
    topics => ["transformed-log"]
    group_id => "log-metric"
    client_id => "monasca_log_metrics"
    consumer_threads => 4
    codec => json
  }
}

filter {
  # Events without a log level are discarded. For every other event the level
  # is lower-cased in place, which makes the membership test below
  # case-insensitive.
  if ![log][level] {
    drop { periodic_flush => true }
  } else {
    ruby {
      code => "
        event.set('[log][level]', event.get('[log][level]').downcase)
      "
    }
  }

  # Only warning and error events are turned into metrics; the rest are dropped.
  if [log][level] not in [warning, error] {
    drop { periodic_flush => true }
  }

  # Attach the metric to the event under the [metric] field.
  ruby {
    code => "
      level = event.get('[log][level]').downcase
      # Wall-clock time of processing, in milliseconds (not the time
      # recorded in the log event itself).
      now_ms = Time.now.to_f * 1000.0

      # One data point per log event: name is log.<level>, value is 1, and
      # the dimensions are copied from the log event unchanged.
      event.set('[metric]', {
        'name' => 'log.' + level,
        'timestamp' => now_ms,
        'value' => 1,
        'dimensions' => event.get('[log][dimensions]'),
        'value_meta' => {}
      })
    "
  }

  # Strip the original log payload and Logstash bookkeeping fields before the
  # event is written out.
  mutate {
    remove_field => ["log", "@version", "@timestamp", "log_level_original", "tags"]
  }
}

output {
  # Publish the resulting event, JSON-encoded and uncompressed, to the
  # "metrics" topic.
  kafka {
    bootstrap_servers => "%KAFKA_SERVICE_HOST%:%KAFKA_SERVICE_PORT%"
    topic_id => "metrics"
    client_id => "monasca_log_metrics"
    compression_type => "none"
    codec => json
  }
}