switch naily to master/workers model

This commit is contained in:
Alexander Pavlenko 2013-03-25 15:44:37 +04:00
parent 37d8299c25
commit 70e962abcf
5 changed files with 106 additions and 99 deletions

View File

@ -1,12 +1,12 @@
#!/usr/bin/env ruby
require 'naily'
require 'logger'
require 'ostruct'
require 'optparse'
require 'yaml'
require 'amqp'
require 'raemon'
options = OpenStruct.new
options.daemonize = false
@ -14,117 +14,62 @@ options.pidfile = '/var/run/nailyd.pid'
options.config_path = '/etc/naily/nailyd.conf'
options.log_path = nil
options.log_level = 'debug'
options.workers = 10
OptionParser.new do |opts|
opts.banner = 'Usage: nailyd [options]'
opts.separator ''
opts.separator 'Options:'
opts.on('-d', '--[no-]daemonize', 'Daemonize server') { |flag| options.daemonize = flag }
opts.on('-P', '--pidfile PATH', 'Path to pidfile') { |path| options.pidfile = path }
opts.separator "\nOptions:"
opts.on('-d', '--[no-]daemonize', 'Daemonize server') do |flag|
options.daemonize = flag
end
opts.on('-P', '--pidfile PATH', 'Path to pidfile') do |path|
options.pidfile = path
end
opts.on('-w', '--workers NUMBER', 'Number of worker processes') do |number|
options.workers = number.to_i
end
opts.on('-c', '--config PATH', 'Use custom config file') do |path|
unless File.exists?(path)
puts "Error: config file #{options.config_path} was not found"
puts "Error: config file #{path} was not found"
exit
end
options.config_path = path
end
opts.on('-l', '--logfile PATH' 'Log file path') do |path|
options.log_path = path
end
levels = %w{fatal error warn info debug}
opts.on('--loglevel LEVEL', levels, "Logging level (#{levels.join(', ')})") do |level|
options.log_level = level
end
opts.on_tail('-h', '--help', 'Show this message') do
puts opts
exit
end
opts.on_tail('-v', '--version', 'Show version') do
puts Naily::VERSION
exit
end
end.parse!
Naily.config.update(YAML.load(File.read(options.config_path))) if File.exists?(options.config_path)
if options.daemonize
# After daemonize we can't log to STDOUT, pick a default log file
options.log_path ||= "#{Dir.pwd}/naily.log"
require 'daemons'
Daemons.daemonize :app_name => 'naily'
end
Naily.config.update(YAML.load(File.read(options.config_path))) if File.exists?(options.config_path)
Naily.logger = options.log_path ? Logger.new(options.log_path) : Logger.new(STDOUT)
Naily.logger.level = Logger.const_get(options.log_level.upcase)
Naily.logger.formatter = proc {|severity, datetime, progname, msg|
severity_map = {'DEBUG' => 'debug', 'INFO' => 'info', 'WARN' => 'warning',
'ERROR' => 'err', 'FATAL' => 'crit'}
"#{datetime.strftime("%Y-%m-%dT%H:%M:%S")} #{severity_map[severity]}: #{msg}\n"
}
begin
File.open(options.pidfile, 'w') { |f| f.write($$) } if options.daemonize
rescue
Naily.logger.error "Exception: #{$!}"
end
def connection_options
{
:host => Naily.config.broker_host,
:port => Naily.config.broker_port,
:username => Naily.config.broker_username,
:password => Naily.config.broker_password,
}.reject { |k, v| v.nil? }
Naily.logger.formatter = proc do |severity, datetime, progname, msg|
severity_map = {'DEBUG' => 'debug', 'INFO' => 'info', 'WARN' => 'warning', 'ERROR' => 'err', 'FATAL' => 'crit'}
"#{Process.pid} #{datetime.strftime("%Y-%m-%dT%H:%M:%S")} #{severity_map[severity]}: #{msg}\n"
end
Naily.logger.info "Starting..."
EM.run do
AMQP.logging = true
connection = AMQP.connect(connection_options)
channel = AMQP::Channel.new(connection)
channel.on_error do |ch, error|
Naily.logger.fatal "Channel error #{error.inspect}"
stop(connection)
end
exchange = channel.topic(Naily.config.broker_exchange, :durable => true)
producer = Naily::Producer.new(channel, exchange)
delegate = Naily.config.delegate || Naily::Dispatcher.new(producer)
server = Naily::Server.new(channel, exchange, delegate, producer)
server.run
# Example
#reporter = Naily::Reporter.new(producer, "some_method")
#reporter.report("some haha")
Signal.trap('INT') do
Naily.logger.info "Got INT signal, stopping"
stop(connection)
end
Signal.trap('TERM') do
Naily.logger.info "Got TERM signal, stopping"
stop(connection)
end
def stop(connection, &block)
if connection
connection.close { EM.stop(&block) }
else
EM.stop(&block)
end
end
end
File.unlink(options.pidfile) if options.daemonize # NOTE: bug?
Raemon::Master.start(options.workers, Naily::Worker,
:detach => options.daemonize,
:name => 'naily',
:pid_file => options.pidfile,
:logger => Naily.logger
)

View File

@ -6,26 +6,18 @@ require 'logger'
require 'json'
module Naily
autoload 'Worker', 'naily/worker'
autoload 'Server', 'naily/server'
autoload 'Producer', 'naily/producer'
autoload 'Dispatcher', 'naily/dispatcher'
autoload 'Reporter', 'naily/reporter'
@logger ||= Logger.new(STDOUT)
@logger.formatter = proc {|severity, datetime, progname, msg|
severity_map = {'DEBUG' => 'debug', 'INFO' => 'info', 'WARN' => 'warning',
'ERROR' => 'err', 'FATAL' => 'crit'}
"#{datetime.strftime("%Y-%m-%dT%H:%M:%S")} #{severity_map[severity]}: #{msg}\n"
}
Astute.logger = @logger
def self.logger
@logger
end
def self.logger=(logger)
Astute.logger = logger
@logger = logger
@logger = logger
end
end

View File

@ -1,5 +1,4 @@
require 'json'
require 'thread'
module Naily
class Server
@ -14,11 +13,8 @@ module Naily
queue = @channel.queue(Naily.config.broker_queue, :durable => true)
queue.bind(@exchange, :routing_key => Naily.config.broker_queue)
Astute::MClient.semaphore = Mutex.new
queue.subscribe do |header, payload|
Thread.new do
dispatch payload
end
dispatch payload
end
Naily.logger.info "Server started"
@ -49,14 +45,13 @@ module Naily
Naily.logger.info "Processing RPC call #{data['method']}"
begin
result = @delegate.send(data['method'], data)
@delegate.send(data['method'], data)
rescue Exception => e
Naily.logger.error "Error running RPC method #{data['method']}: #{e.message}, trace: #{e.backtrace.inspect}"
if data['respond_to']
reporter = Naily::Reporter.new(@producer, data['respond_to'], data['args']['task_uuid'])
reporter.report({'status' => 'error', 'error' => "Error occurred while running method '#{data['method']}'. See logs of Orchestrator for details."})
end
return
end
end
end

75
naily/lib/naily/worker.rb Normal file
View File

@ -0,0 +1,75 @@
require 'raemon'
module Naily
class Worker
include Raemon::Worker
def start
super
start_heartbeat
end
def stop
super
begin
@connection.close{ stop_event_machine }
ensure
stop_event_machine
end
end
def run
EM.run do
initialize_server.run
end
end
private
def start_heartbeat
@heartbeat ||= Thread.new do
sleep 30
heartbeat!
end
end
def initialize_server
initialize_amqp
@server = Naily::Server.new(@channel, @exchange, @delegate, @producer)
end
def initialize_amqp
AMQP.logging = true
@connection = AMQP.connect(connection_options)
create_channel
@exchange = @channel.topic(Naily.config.broker_exchange, :durable => true)
@producer = Naily::Producer.new(@channel, @exchange)
@delegate = Naily.config.delegate || Naily::Dispatcher.new(@producer)
rescue => ex
Naily.logger.error "Exception during AMQP connection initialization: #{ex}"
sleep 15
retry
end
def create_channel
@channel = AMQP::Channel.new(@connection)
@channel.on_error do |ch, error|
Naily.logger.fatal "Channel error #{error.inspect}"
stop
end
end
def connection_options
{
:host => Naily.config.broker_host,
:port => Naily.config.broker_port,
:username => Naily.config.broker_username,
:password => Naily.config.broker_password,
}.reject{|k, v| v.nil? }
end
def stop_event_machine
EM.stop_event_loop if EM.reactor_running?
end
end
end

View File

@ -11,11 +11,11 @@ Gem::Specification.new do |s|
s.authors = ['Maxim Kulkin']
s.email = ['mkulkin@mirantis.com']
s.add_dependency 'daemons'
s.add_dependency 'amqp'
s.add_dependency 'symboltable', '>= 1.0.2'
s.add_dependency 'amqp', '0.9.10'
s.add_dependency 'astute'
s.add_dependency 'json'
s.add_dependency 'json', '1.6.1'
s.add_dependency 'raemon', '0.3.0'
s.add_dependency 'symboltable', '1.0.2'
s.files = Dir.glob("{bin,lib}/**/*")
s.executables = ['nailyd']