Fix issues in Lua modules
Fixed: - Using table for brokerlist instead of string - Importing elasticsearch module in encoder - Importing influx module instead of influxdb Change-Id: Iea0aa51f9c7ac6a72e045676618475db81a0310b
This commit is contained in:
parent
fca0513f0b
commit
93695c158c
|
@ -14,7 +14,6 @@
|
|||
require "string"
|
||||
require "cjson"
|
||||
local utils = require "lma_utils"
|
||||
local elasticsearch = require "elasticsearch"
|
||||
|
||||
local index = read_config("index") or "index"
|
||||
local type_name = read_config("type_name") or "source"
|
||||
|
|
|
@ -14,7 +14,6 @@
|
|||
require "string"
|
||||
require "cjson"
|
||||
local utils = require "lma_utils"
|
||||
local elasticsearch = require "elasticsearch"
|
||||
local encoder_module = read_config("encoder") or error("Encoder should be defined")
|
||||
|
||||
local encode = require(encoder_module).encode
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
-- file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
|
||||
require "kafka"
|
||||
require "table"
|
||||
local util = require "lma_utils"
|
||||
|
||||
local brokerlist = read_config("brokerlist") or error("brokerlist must be set")
|
||||
|
@ -20,6 +21,10 @@ if decoder_module then
|
|||
end
|
||||
end
|
||||
|
||||
if type(brokerlist) == "table" then
|
||||
brokerlist = table.concat(brokerlist, ",")
|
||||
end
|
||||
|
||||
local consumer = kafka.consumer(brokerlist, topics, consumer_conf, topic_conf)
|
||||
|
||||
local err_msg = {
|
||||
|
|
|
@ -19,15 +19,6 @@ flush_count = 50000
|
|||
flush_on_shutdown = false
|
||||
preserve_data = not flush_on_shutdown --in most cases this should be the inverse of flush_on_shutdown
|
||||
discard_on_error = false
|
||||
|
||||
-- See the elasticsearch module directory for the various encoders and configuration documentation.
|
||||
encoder_module = "heka.elasticsearch.moz_telemetry"
|
||||
encoder_cfg = {
|
||||
es_index_from_timestamp = true,
|
||||
index = "%{Logger}-%{%Y.%m.%d}",
|
||||
type_name = "%{Type}-%{Hostname}",
|
||||
fields = {"Fields[request]", "Fields[http_user_agent]"},
|
||||
}
|
||||
```
|
||||
--]]
|
||||
|
||||
|
@ -51,7 +42,11 @@ local flush_count = read_config("flush_count") or 5000
|
|||
local last_flush = time()
|
||||
|
||||
local encoder_module = read_config("encoder_module") or error("Encoder should be defined")
|
||||
local encoder = require(encoder_module)
|
||||
local encode = require(encoder_module).encode
|
||||
if not encode then
|
||||
error("Encode function should be implemented by ".. encoder_module)
|
||||
end
|
||||
|
||||
|
||||
local client
|
||||
local function create_client()
|
||||
|
@ -146,7 +141,7 @@ retry = false
|
|||
|
||||
function process_message()
|
||||
if not retry then
|
||||
local code, msg = encoder.encode()
|
||||
local code, msg = encode()
|
||||
if code == 0 and string.len(msg) > 0 then
|
||||
_, lines = string.gsub(msg, '\n', '\n')
|
||||
batch_count = batch_count + lines
|
||||
|
|
|
@ -6,7 +6,7 @@
|
|||
local os = require 'os'
|
||||
local http = require 'socket.http'
|
||||
|
||||
local influxdb = require 'influx'
|
||||
local influxdb = require 'influxdb'
|
||||
|
||||
local influxdb_host = read_config('host') or error('influxdb host is required')
|
||||
local influxdb_port = read_config('port') or error('influxdb port is required')
|
||||
|
|
Loading…
Reference in New Issue