[mcollective] Some fixes in mcollective

This commit is contained in:
Vladimir Kozhukalov 2012-09-11 12:31:15 +04:00 committed by default
parent 80636789dc
commit 5e2b61ddfa
11 changed files with 103 additions and 57 deletions

View File

@ -7,11 +7,12 @@ require 'naily/server/config'
require 'naily/server/daemon'
Naily::Server::Config.define do |config|
config.driver = :amqp
config.amqp_host = "127.0.0.1"
config.amqp_port = 5672
config.amqp_username = "guest"
config.amqp_password = "guest"
config.topic_exchange_name = "nailgun.topic"
config.topic_exchange_name = "nailgun"
config.topic_queue_name = "mcollective"
config.topic_queue_routing_key = "mcollective"
end

8
mcollective/bin/run_simple.rb Executable file
View File

@ -0,0 +1,8 @@
#!/usr/bin/env ruby
$LOAD_PATH.unshift(File.expand_path(File.join(File.dirname(__FILE__), "..", "lib")))
require 'naily/mcclient/simple'
client = Naily::MCClient::Simple.new
client.run

View File

@ -7,9 +7,9 @@ module Naily
include Helpers
def initialize channel, message
logger.debug("Publish message: payload: #{message}")
logger.debug("Publish message: #{message}")
AMQP::Exchange.new(channel, :direct, message.exchange_name,
:auto_delete => true) do |exchange|
:durable => false, :auto_delete => true) do |exchange|
exchange.publish(message, :routing_key => message.routing_key) do
logger.debug("Publish message: complete")
end

View File

@ -9,28 +9,31 @@ module Naily
include Helpers
def initialize message
logger.debug("Initializing driver dependent handler: Naily::Amqp::Handler")
@message = message
@real_handler = get_real_handler
end
def get_real_handler
case message.rpc_method.to_sym
logger.debug("RPC method: #{@message.rpc_method}")
case @message.rpc_method.to_sym
when :echo
return Naily::Handler::Echo.new @message.rpc_method_args.to_hash
return Naily::Handler::Echo.new @message.rpc_method_args
when :mco
return Naily::Handler::Mco.new @message.rpc_method_args.to_hash
return Naily::Handler::Mco.new @message.rpc_method_args
else
return Naily::Handler::Null.new @message.rpc_method_args.to_hash
return Naily::Handler::Null.new @message.rpc_method_args
end
end
def handle
logger.debug("Handler request: #{@message}")
@real_handler.handle do |response|
logger.debug("Handler response: #{response}")
response ||= {}
if @message.call?
body = {
:result => handler_response,
:result => response,
:failure => nil,
:ending => false
}

View File

@ -5,57 +5,53 @@ module Naily
module Amqp
class Message
include Helpers
attr_reader :metadata
def payload= p
@payload = JSON.parse(p)
end
def payload
JSON.dump(@payload)
end
attr_accessor :header
attr_accessor :body
def valid?
return false if not @payload
return false if not @body
end
def to_s
self.payload
JSON.dump(self.body)
end
end
class Request < Message
def initialize m=nil, p=nil
@metadata = m
self.payload = p
# HERE HEADER IS NOT AMQP HEADER BUT
# AMQP HEADER ATTRIBUTES. IT IS HASH
def initialize h=nil, b=nil
@header = h
@body = b
end
def valid?
call_valid_actions = ["status"]
cast_valid_actions = ["deploy"]
return false if not @payload
return false if not @payload["action"]
return false if self.call? and not call_valid_actions.include?(self.action)
return false if not self.call? and not cast_valid_actions.include?(self.action)
return false if self.call? and not @payload["msg_id"]
return false if not @body
return false if not @body["method"]
return false if self.call? and not @body["msg_id"]
true
end
def call?
return true if @payload["msg_id"]
return true if @body["msg_id"]
false
end
def msg_id
@payload["msg_id"]
def rpc_method
return @body["method"]
end
def action
@payload["action"]
def rpc_method_args
return @body["args"]
end
def msg_id
@body["msg_id"]
end
end
@ -64,9 +60,9 @@ module Naily
attr_accessor :routing_key
attr_accessor :exchange_name
def initialize p=nil, options={}
self.payload = p
def initialize b=nil, options={}
self.body = b
self.routing_key = options[:routing_key] if options[:routing_key]
self.exchange_name = options[:exchange_name] if options[:exchange_name]
end

View File

@ -1,4 +1,5 @@
require 'amqp'
require 'json'
require 'naily/amqp/helpers'
module Naily
@ -11,19 +12,25 @@ module Naily
logger.debug("Initializing topic consumer: exchange: #{exchange_name} "\
"queue: #{queue_name} routing_key: #{routing_key}")
@parent = parent
AMQP::Exchange.new(channel, :topic, exchange_name) do |exchange|
AMQP::Exchange.new(channel, :topic, exchange_name,
:auto_delete => false, :durable => false) do |exchange|
logger.debug("Exchange has been declared: #{exchange.name}")
AMQP::Queue.new(channel, queue_name, :exclusive => true,
:auto_delete => true) do |queue|
logger.debug("Queue has been declared: #{queue.name}")
queue.bind(exchange, :routing_key => routing_key) do
queue.subscribe(:ack => true) do |metadata, payload|
message = Request.new(metadata, payload)
logger.debug("Received message: #{message}")
logger.debug("Queue: #{queue.name} has been bound "\
"to exchange: #{exchange.name}")
queue.subscribe(:ack => true) do |header, body|
logger.debug("Received message: #{body}")
message = Request.new(header.to_hash, JSON.load(body))
if message.valid?
logger.debug("Message valid. Handling.")
@parent.handle(message)
else
logger.error("Received message is not valid")
end
metadata.ack
header.ack
end
if blk
yield self

View File

@ -5,6 +5,9 @@ module Naily
def available_roles
end
def runonce params={}

View File

@ -1,10 +1,16 @@
require 'naily/framework/async'
require 'naily/mcclient/simple'
require 'naily/mcclient/blocking'
module Naily
module Handler
class Mco
def initialize args
@logger = Logger.new(STDOUT)
@logger.level = Logger::DEBUG
@logger.debug("Initializing driver independent handler: Naily::Handler::Mco")
@args = args
end
@ -19,7 +25,9 @@ module Naily
end
async = Naily::Framework::Async.new client
async.call @args["action"], @args["action_args"] do |result|
@logger.debug("Asynced call returned result: #{result}")
yield({'result' => 'Action ended: #{result}'})
end
end

View File

@ -1,3 +1,4 @@
require 'logger'
require 'mcollective'
require 'naily/framework/client'
@ -8,15 +9,21 @@ module Naily
include Naily::Framework::Client
def initialize
@logger = Logger.new(STDOUT)
@logger.level = Logger::DEBUG
@logger.debug("Initializing mco client: Naily::MCClient::Simple")
@mc = rpcclient('naily')
@mc.verbose = true
end
def run
def run *args
@logger.debug("Client action: run")
responses = []
@mc.runonce().each do |response|
responses << response
end
@logger.debug("Client action ended")
end
def disconnect

View File

@ -3,6 +3,7 @@ module Naily
module Config
extend self
attr_accessor :driver
attr_accessor :amqp_host
attr_accessor :amqp_port
attr_accessor :amqp_username

View File

@ -9,21 +9,33 @@ module Naily
@logger = Logger.new(STDOUT)
@logger.level = Logger::DEBUG
@options = {
:host => Config.amqp_host,
:port => Config.amqp_port,
:username => Config.amqp_username,
:password => Config.amqp_password,
:topic_exchange_name => Config.topic_exchange_name,
:topic_queue_name => Config.topic_queue_name,
:topic_queue_routing_key => Config.topic_queue_routing_key
}
case Config.driver
when :amqp
@logger.debug("Naily driver is Naily::Amqp::Driver")
@driver_class = Naily::Amqp::Driver
@driver_options = {
:host => Config.amqp_host,
:port => Config.amqp_port,
:username => Config.amqp_username,
:password => Config.amqp_password,
:topic_exchange_name => Config.topic_exchange_name,
:topic_queue_name => Config.topic_queue_name,
:topic_queue_routing_key => Config.topic_queue_routing_key
}
else
@logger.error("Unknown driver. Raising exception.")
raise "Unknown driver."
end
end
def run
EM.run do
driver = Naily::Amqp::Driver.new @options
driver = @driver_class.new @driver_options
Signal.trap("INT") do
@logger.debug("INT signal has been caught")