From b60624ee2c5f1d6d805619b6c27965a973508da1 Mon Sep 17 00:00:00 2001 From: "Vladimir Sharshov (warpc)" Date: Mon, 12 Oct 2015 19:25:00 +0300 Subject: [PATCH] Move from amqp-gem to bunny Differents: - separate independent chanel for outgoing report; - solid way to redeclare already existed queues; - auto recovery mode in case of network problem by default; - more solid, modern and simple library for AMQP. Also: - implement asynchronous logger for event callbacks. Short words from both gems authors: amqp gem brings in a fair share of EventMachine complexity which cannot be fully eliminated. Event loop blocking, writes that happen at the end of loop tick, uncaught exceptions in event loop silently killing it: it's not worth the pain unless you've already deeply invested in EventMachine and understand how it works. Closes-Bug: #1498847 Closes-Bug: #1487397 Closes-Bug: #1461562 Related-Bug: #1485895 Related-Bug: #1483182 Change-Id: I52d005498ccb978ada158bfa64b1c7de1a24e9b0 --- astute.gemspec | 2 +- bin/astuted | 1 - lib/astute.rb | 3 + lib/astute/server/async_logger.rb | 79 +++++++++++++++ lib/astute/server/dispatcher.rb | 141 +++++++++++++++++---------- lib/astute/server/producer.rb | 49 +++++++--- lib/astute/server/server.rb | 113 +++++++++++++-------- lib/astute/server/worker.rb | 157 +++++++++++++++++------------- specs/astute.spec | 2 +- 9 files changed, 375 insertions(+), 172 deletions(-) create mode 100644 lib/astute/server/async_logger.rb diff --git a/astute.gemspec b/astute.gemspec index 7e9a90b8..3e015360 100644 --- a/astute.gemspec +++ b/astute.gemspec @@ -17,7 +17,7 @@ Gem::Specification.new do |s| s.add_dependency 'net-ssh-multi', '~> 1.1' # Astute as service - s.add_dependency 'amqp', '1.4.1' + s.add_dependency 'bunny', ">= 2.0" s.add_dependency 'raemon', '0.3.0' s.add_development_dependency 'facter' diff --git a/bin/astuted b/bin/astuted index 0b659cbe..a99d73d9 100755 --- a/bin/astuted +++ b/bin/astuted @@ -19,7 +19,6 @@ require 'logger' require 'ostruct' require 'optparse' require 'yaml' -require 'amqp' require 'raemon' options = OpenStruct.new diff --git a/lib/astute.rb b/lib/astute.rb index 3cb2fe68..6b337cc8 100644 --- a/lib/astute.rb +++ b/lib/astute.rb @@ -20,6 +20,7 @@ require 'logger' require 'shellwords' require 'active_support/all' require 'pp' +require 'bunny' require 'astute/ext/exception' require 'astute/ext/deep_copy' @@ -43,6 +44,8 @@ require 'astute/puppet_task' require 'astute/task_manager' require 'astute/pre_delete' require 'astute/version' +require 'astute/server/async_logger' + ['/astute/pre_deployment_actions/*.rb', '/astute/pre_deploy_actions/*.rb', diff --git a/lib/astute/server/async_logger.rb b/lib/astute/server/async_logger.rb new file mode 100644 index 00000000..8011fa55 --- /dev/null +++ b/lib/astute/server/async_logger.rb @@ -0,0 +1,79 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +require 'thread' + +module Astute + module Server + + # Asynchronous singleton logger, which should be used + # in event callbacks of event machine, it doesn't block + # callbacks because writing a message to log takes some time. + # Also synchronous logger, potentially could lead to deadlocks. + # See: + # https://bugs.launchpad.net/fuel/+bug/1453573 + # https://bugs.launchpad.net/fuel/+bug/1487397 + module AsyncLogger + def self.start_up(logger=Logger.new(STDOUT)) + @queue ||= Queue.new + @log = logger + @thread = Thread.new { flush_messages } + end + + def self.shutdown + @thread.kill + end + + def self.add(severity, msg=nil) + return if @shutdown + + @queue.push([severity, msg]) + end + + def self.debug(msg=nil) + add(Logger::Severity::DEBUG, msg) + end + + def self.info(msg=nil) + add(Logger::Severity::INFO, msg) + end + + def self.warn(msg=nil) + add(Logger::Severity::WARN, msg) + end + + def self.error(msg=nil) + add(Logger::Severity::ERROR, msg) + end + + def self.fatal(msg=nil) + add(Logger::Severity::FATAL, msg) + end + + def self.unknown(msg=nil) + add(Logger::Severity::UNKNOWN, msg) + end + + private + + def self.flush_messages + loop do + severity, msg = @queue.pop + @log.add(severity, msg) + end + end + + end + end +end \ No newline at end of file diff --git a/lib/astute/server/dispatcher.rb b/lib/astute/server/dispatcher.rb index 01172d6e..c6c84997 100644 --- a/lib/astute/server/dispatcher.rb +++ b/lib/astute/server/dispatcher.rb @@ -25,7 +25,7 @@ module Astute end def echo(args) - Astute.logger.info 'Running echo command' + Astute.logger.info('Running echo command') args end @@ -43,9 +43,10 @@ module Astute def provision(data, provision_method) - Astute.logger.info("'provision' method called with data:\n#{data.pretty_inspect}") + Astute.logger.info("'provision' method called with data:\n"\ + "#{data.pretty_inspect}") - reporter = Astute::Server::Reporter.new(@producer, data['respond_to'], data['args']['task_uuid']) + reporter = create_reporter(data) begin result = @orchestrator.provision( reporter, @@ -53,20 +54,19 @@ module Astute data['args']['provisioning_info'], provision_method ) - - #TODO(vsharshov): Refactoring the deployment aborting messages (StopIteration) rescue => e - Astute.logger.error "Error running provisioning: #{e.message}, trace: #{e.format_backtrace}" + Astute.logger.error("Error running provisioning: #{e.message}, "\ + "trace: #{e.format_backtrace}") raise StopIteration end raise StopIteration if result && result['status'] == 'error' end def deploy(data) - Astute.logger.info("'deploy' method called with data:\n#{data.pretty_inspect}") - - reporter = Astute::Server::Reporter.new(@producer, data['respond_to'], data['args']['task_uuid']) + Astute.logger.info("'deploy' method called with data:\n"\ + "#{data.pretty_inspect}") + reporter = create_reporter(data) begin @orchestrator.deploy( reporter, @@ -78,15 +78,16 @@ module Astute reporter.report('status' => 'ready', 'progress' => 100) rescue Timeout::Error msg = "Timeout of deployment is exceeded." - Astute.logger.error msg + Astute.logger.error(msg) reporter.report('status' => 'error', 'error' => msg) end end def granular_deploy(data) - Astute.logger.info("'granular_deploy' method called with data:\n#{data.pretty_inspect}") + Astute.logger.info("'granular_deploy' method called with data:\n"\ + "#{data.pretty_inspect}") - reporter = Astute::Server::Reporter.new(@producer, data['respond_to'], data['args']['task_uuid']) + reporter = create_reporter(data) begin @orchestrator.granular_deploy( reporter, @@ -98,7 +99,7 @@ module Astute reporter.report('status' => 'ready', 'progress' => 100) rescue Timeout::Error msg = "Timeout of deployment is exceeded." - Astute.logger.error msg + Astute.logger.error(msg) reporter.report('status' => 'error', 'error' => msg) end end @@ -111,60 +112,80 @@ module Astute Astute.logger.warn("No method for #{subtask}") end end - reporter = Astute::Server::Reporter.new(@producer, data['respond_to'], data['args']['task_uuid']) - result = @orchestrator.verify_networks(reporter, data['args']['task_uuid'], data['args']['nodes']) + reporter = create_reporter(data) + result = @orchestrator.verify_networks( + reporter, + data['args']['task_uuid'], + data['args']['nodes'] + ) report_result(result, reporter) end def check_dhcp(data) - reporter = Astute::Server::Reporter.new(@producer, data['respond_to'], data['args']['task_uuid']) - result = @orchestrator.check_dhcp(reporter, data['args']['task_uuid'], data['args']['nodes']) + reporter = create_reporter(data) + result = @orchestrator.check_dhcp( + reporter, + data['args']['task_uuid'], + data['args']['nodes'] + ) report_result(result, reporter) end def multicast_verification(data) - reporter = Astute::Server::Reporter.new(@producer, data['respond_to'], data['args']['task_uuid']) - result = @orchestrator.multicast_verification(reporter, data['args']['task_uuid'], data['args']['nodes']) + reporter = create_reporter(data) + result = @orchestrator.multicast_verification( + reporter, + data['args']['task_uuid'], + data['args']['nodes'] + ) report_result(result, reporter) end def check_repositories(data) - reporter = Astute::Server::Reporter.new(@producer, data['respond_to'], data['args']['task_uuid']) - result = @orchestrator.check_repositories(reporter, data['args']['task_uuid'], data['args']['nodes'], data['args']['urls']) + reporter = create_reporter(data) + result = @orchestrator.check_repositories( + reporter, + data['args']['task_uuid'], + data['args']['nodes'], + data['args']['urls'] + ) report_result(result, reporter) end def check_repositories_with_setup(data) - reporter = Astute::Server::Reporter.new(@producer, data['respond_to'], data['args']['task_uuid']) - result = @orchestrator.check_repositories_with_setup(reporter, data['args']['task_uuid'], data['args']['nodes']) + reporter = create_reporter(data) + result = @orchestrator.check_repositories_with_setup( + reporter, + data['args']['task_uuid'], + data['args']['nodes'] + ) report_result(result, reporter) end def dump_environment(data) - task_id = data['args']['task_uuid'] - reporter = Astute::Server::Reporter.new(@producer, data['respond_to'], task_id) - @orchestrator.dump_environment(reporter, task_id, data['args']['settings']) + @orchestrator.dump_environment( + create_reporter(data), + data['args']['task_uuid'], + data['args']['settings'] + ) end def remove_nodes(data, reset=false) task_uuid = data['args']['task_uuid'] - reporter = Astute::Server::Reporter.new(@producer, data['respond_to'], task_uuid) - nodes = data['args']['nodes'] - engine = data['args']['engine'] - check_ceph = data['args']['check_ceph'] + reporter = create_reporter(data) - result = if nodes.empty? + result = if data['args']['nodes'].empty? Astute.logger.debug("#{task_uuid} Node list is empty") nil else @orchestrator.remove_nodes( reporter, task_uuid, - engine, - nodes, + data['args']['engine'], + data['args']['nodes'], { :reboot => true, - :check_ceph => check_ceph, + :check_ceph => data['args']['check_ceph'], :reset => reset } ) @@ -178,16 +199,9 @@ module Astute end def execute_tasks(data) - task_uuid = data['args']['task_uuid'] - reporter = Astute::Server::Reporter.new( - @producer, - data['respond_to'], - task_uuid - ) - @orchestrator.execute_tasks( - reporter, - task_uuid, + create_reporter(data), + data['args']['task_uuid'], data['args']['tasks'] ) end @@ -197,15 +211,17 @@ module Astute # def stop_deploy_task(data, service_data) - Astute.logger.debug("'stop_deploy_task' service method called with data:\n#{data.pretty_inspect}") + Astute.logger.debug("'stop_deploy_task' service method called with"\ + "data:\n#{data.pretty_inspect}") target_task_uuid = data['args']['stop_task_uuid'] task_uuid = data['args']['task_uuid'] - return unless task_in_queue?(target_task_uuid, service_data[:tasks_queue]) + return unless task_in_queue?(target_task_uuid, + service_data[:tasks_queue]) Astute.logger.debug("Cancel task #{target_task_uuid}. Start") if target_task_uuid == service_data[:tasks_queue].current_task_id - reporter = Astute::Server::Reporter.new(@producer, data['respond_to'], task_uuid) + reporter = create_reporter(data) result = stop_current_task(data, service_data, reporter) report_result(result, reporter) else @@ -215,6 +231,14 @@ module Astute private + def create_reporter(data) + Astute::Server::Reporter.new( + @producer, + data['respond_to'], + data['args']['task_uuid'] + ) + end + def task_in_queue?(task_uuid, tasks_queue) tasks_queue.task_in_queue?(task_uuid) end @@ -224,8 +248,13 @@ module Astute task_uuid = data['args']['task_uuid'] new_task_data = data_for_rm_nodes(data) - Astute.logger.info("Replace running task #{target_task_uuid} to new #{task_uuid} with data:\n#{new_task_data.pretty_inspect}") - service_data[:tasks_queue].replace_task(target_task_uuid, new_task_data) + Astute.logger.info("Replace running task #{target_task_uuid} to "\ + "new #{task_uuid} with data:\n"\ + "#{new_task_data.pretty_inspect}") + service_data[:tasks_queue].replace_task( + target_task_uuid, + new_task_data + ) end def stop_current_task(data, service_data, reporter) @@ -233,15 +262,25 @@ module Astute task_uuid = data['args']['task_uuid'] nodes = data['args']['nodes'] - Astute.logger.info "Try to kill running task #{target_task_uuid}" + Astute.logger.info("Try to kill running task #{target_task_uuid}") service_data[:main_work_thread].kill result = if ['deploy', 'task_deployment', 'granular_deploy'].include? ( service_data[:tasks_queue].current_task_method) @orchestrator.stop_puppet_deploy(reporter, task_uuid, nodes) - @orchestrator.remove_nodes(reporter, task_uuid, data['args']['engine'], nodes) + @orchestrator.remove_nodes( + reporter, + task_uuid, + data['args']['engine'], + nodes + ) else - @orchestrator.stop_provision(reporter, task_uuid, data['args']['engine'], nodes) + @orchestrator.stop_provision( + reporter, + task_uuid, + data['args']['engine'], + nodes + ) end end diff --git a/lib/astute/server/producer.rb b/lib/astute/server/producer.rb index e1fc97a2..74b4800c 100644 --- a/lib/astute/server/producer.rb +++ b/lib/astute/server/producer.rb @@ -14,27 +14,48 @@ module Astute module Server - class Producer def initialize(exchange) @exchange = exchange + @publish_queue = Queue.new + @publish_consumer = Thread.new do + loop do + msg = @publish_queue.pop + publish_from_queue msg + end + end + end + + def publish_from_queue(message) + Astute.logger.info "Casting message to Nailgun:\n"\ + "#{message[:message].pretty_inspect}" + @exchange.publish(message[:message].to_json, message[:options]) + rescue => e + Astute.logger.error "Error publishing message: #{e.message}" end def publish(message, options={}) - default_options = {:routing_key => Astute.config.broker_publisher_queue, - :content_type => 'application/json'} - options = default_options.merge(options) - - EM.next_tick { - begin - Astute.logger.info "Casting message to Nailgun:\n#{message.pretty_inspect}" - @exchange.publish(message.to_json, options) - rescue - Astute.logger.error "Error publishing message: #{$!}" - end + default_options = { + :routing_key => Astute.config.broker_publisher_queue, + :content_type => 'application/json' } - end - end + # Status message manage task status in Nailgun. If we miss some of them, + # user need manually delete them or change it status using DB. + # Persistent option tell RabbitMQ to save message in case of + # unexpected/expected restart. + if message.respond_to?(:keys) && message.keys.map(&:to_s).include?('status') + default_options.merge!({:persistent => true}) + end + + options = default_options.merge(options) + @publish_queue << {:message => message, :options => options} + end + + def stop + @publish_consumer.kill + end + + end # Producer end #Server end #Astute diff --git a/lib/astute/server/server.rb b/lib/astute/server/server.rb index bfdf7d77..6e104489 100644 --- a/lib/astute/server/server.rb +++ b/lib/astute/server/server.rb @@ -20,63 +20,82 @@ module Astute module Server class Server - def initialize(channel, exchange, delegate, producer, service_channel, service_exchange) - @channel = channel - @exchange = exchange + def initialize(channels_and_exchanges, delegate, producer) + @channel = channels_and_exchanges[:channel] + @exchange = channels_and_exchanges[:exchange] @delegate = delegate @producer = producer - @service_channel = service_channel - @service_exchange = service_exchange + @service_channel = channels_and_exchanges[:service_channel] + @service_exchange = channels_and_exchanges[:service_exchange] # NOTE(eli): Generate unique name for service queue # See bug: https://bugs.launchpad.net/fuel/+bug/1485895 @service_queue_name = "naily_service_#{SecureRandom.uuid}" + @watch_thread = nil end def run - @queue = @channel.queue(Astute.config.broker_queue, :durable => true).bind(@exchange) - @service_queue = @service_channel.queue(@service_queue_name, :exclusive => true, :auto_delete => true).bind(@service_exchange) + @queue = @channel.queue( + Astute.config.broker_queue, + :durable => true + ) + @queue.bind(@exchange) + + @service_queue = @service_channel.queue( + @service_queue_name, + :exclusive => true, + :auto_delete => true + ) + @service_queue.bind(@service_exchange) @main_work_thread = nil @tasks_queue = TaskQueue.new - Thread.new(&method(:register_callbacks)) - self + register_callbacks + + run_infinite_loop + @watch_thread.join + end + + def stop + @watch_thread.wakeup end private + def run_infinite_loop + @watch_thread = Thread.new do + Thread.stop + Astute.logger.debug "Stop main thread" + end + end + def register_callbacks main_worker service_worker end def main_worker - @consumer = AMQP::Consumer.new(@channel, @queue, consumer_tag=nil, exclusive=false) - @consumer.on_cancel do |basic_cancel| - Astute.logger.debug("Received cancel notification from in main worker.") - @exchange.auto_recover - @service_exchange.auto_recover - @queue.auto_recover - @service_queue.auto_recover - end - @consumer.on_delivery do |metadata, payload| + @queue.subscribe(:manual_ack => true) do |delivery_info, _, payload| if @main_work_thread.nil? || !@main_work_thread.alive? - Astute.logger.debug "Process message from worker queue:\n#{payload.pretty_inspect}" - metadata.ack - perform_main_job(metadata, payload) + Astute.logger.debug "Process message from worker queue:\n"\ + "#{payload.pretty_inspect}" + @channel.acknowledge(delivery_info.delivery_tag, false) + perform_main_job(payload) else - Astute.logger.debug "Requeue message because worker is busy:\n#{payload.pretty_inspect}" - # Avoid throttle by consume/reject cycle if only one worker is running - EM.add_timer(2) { metadata.reject(:requeue => true) } + Astute.logger.debug "Requeue message because worker is busy:"\ + "\n#{payload.pretty_inspect}" + # Avoid throttle by consume/reject cycle + # if only one worker is running + @channel.reject(delivery_info.delivery_tag, true) end end - @consumer.consume end def service_worker - @service_queue.subscribe do |_, payload| - Astute.logger.debug "Process message from service queue:\n#{payload.pretty_inspect}" - perform_service_job(nil, payload) + @service_queue.subscribe do |_delivery_info, _properties, payload| + Astute.logger.debug "Process message from service queue:\n"\ + "#{payload.pretty_inspect}" + perform_service_job(payload) end end @@ -94,7 +113,7 @@ module Astute end end - def perform_main_job(metadata, payload) + def perform_main_job(payload) @main_work_thread = Thread.new do data = parse_data(payload) @tasks_queue = Astute::Server::TaskQueue.new @@ -109,9 +128,12 @@ module Astute end end - def perform_service_job(metadata, payload) + def perform_service_job(payload) Thread.new do - service_data = {:main_work_thread => @main_work_thread, :tasks_queue => @tasks_queue} + service_data = { + :main_work_thread => @main_work_thread, + :tasks_queue => @tasks_queue + } data = parse_data(payload) send_message_task_in_orchestrator(data) dispatch(data, service_data) @@ -127,8 +149,9 @@ module Astute abort_messages data[(i + 1)..-1] break rescue => ex - Astute.logger.error "Error running RPC method #{message['method']}: #{ex.message}, " \ - "trace: #{ex.format_backtrace}" + Astute.logger.error "Error running RPC method "\ + "#{message['method']}: #{ex.message}, "\ + "trace: #{ex.format_backtrace}" return_results message, { 'status' => 'error', 'error' => "Method #{message['method']}. #{ex.message}.\n" \ @@ -156,7 +179,10 @@ module Astute return end - Astute.logger.debug "Main worker task id is #{@tasks_queue.current_task_id}" if service_data.nil? + if service_data.nil? + Astute.logger.debug "Main worker task id is "\ + "#{@tasks_queue.current_task_id}" + end Astute.logger.info "Processing RPC call '#{data['method']}'" if !service_data @@ -168,7 +194,11 @@ module Astute def return_results(message, results={}) if results.is_a?(Hash) && message['respond_to'] - reporter = Astute::Server::Reporter.new(@producer, message['respond_to'], message['args']['task_uuid']) + reporter = Astute::Server::Reporter.new( + @producer, + message['respond_to'], + message['args']['task_uuid'] + ) reporter.report results end end @@ -179,7 +209,8 @@ module Astute begin messages = JSON.load(data) rescue => e - Astute.logger.error "Error deserializing payload: #{e.message}, trace:\n#{e.backtrace.pretty_inspect}" + Astute.logger.error "Error deserializing payload: #{e.message},"\ + " trace:\n#{e.backtrace.pretty_inspect}" end messages.is_a?(Array) ? messages : [messages] end @@ -197,7 +228,12 @@ module Astute if message['args']['nodes'].instance_of?(Array) err_nodes = message['args']['nodes'].map do |node| - {'uid' => node['uid'], 'status' => 'error', 'error_type' => 'provision', 'progress' => 0} + { + 'uid' => node['uid'], + 'status' => 'error', + 'error_type' => 'provision', + 'progress' => 0 + } end err_msg.merge!('nodes' => err_nodes) @@ -205,7 +241,8 @@ module Astute return_results(message, err_msg) rescue => ex - Astute.logger.debug "Failed to abort '#{message['method']}':\n#{ex.pretty_inspect}" + Astute.logger.debug "Failed to abort '#{message['method']}':\n"\ + "#{ex.pretty_inspect}" end end end diff --git a/lib/astute/server/worker.rb b/lib/astute/server/worker.rb index e16f51b0..d816d5bb 100644 --- a/lib/astute/server/worker.rb +++ b/lib/astute/server/worker.rb @@ -14,6 +14,7 @@ require 'raemon' require 'net/http' +require 'bunny' module Astute module Server @@ -26,39 +27,42 @@ module Astute def start super start_heartbeat + Astute::Server::AsyncLogger.start_up(Astute.logger) + Astute.logger = Astute::Server::AsyncLogger end def stop super - begin - @connection.close{ stop_event_machine } if @connection - ensure - stop_event_machine - end + @connection.stop if defined?(@connection) && @connection.present? + @producer.stop if defined?(@producer) && @producer.present? + @server.stop if defined?(@server) && @server.present? + Astute::Server::AsyncLogger.shutdown end def run Astute.logger.info "Worker initialization" - EM.run do - run_server - end - rescue AMQP::TCPConnectionFailed => e - Astute.logger.warn "TCP connection to AMQP failed: #{e.message}. Retry #{DELAY_SEC} sec later..." + run_server + rescue Bunny::TCPConnectionFailed => e + Astute.logger.warn "TCP connection to AMQP failed: #{e.message}. "\ + "Retry #{DELAY_SEC} sec later..." sleep DELAY_SEC retry - rescue AMQP::PossibleAuthenticationFailureError => e - Astute.logger.warn "If problem repeated more than 5 minutes, please check " \ - "authentication parameters. #{e.message}. Retry #{DELAY_SEC} sec later..." + rescue Bunny::PossibleAuthenticationFailureError => e + Astute.logger.warn "If problem repeated more than 5 minutes, "\ + "please check "\ + "authentication parameters. #{e.message}. "\ + "Retry #{DELAY_SEC} sec later..." sleep DELAY_SEC retry rescue => e - Astute.logger.error "Exception during worker initialization: #{e.message}, trace: #{e.format_backtrace}" + Astute.logger.error "Exception during worker initialization:"\ + " #{e.message}, trace: #{e.format_backtrace}" Astute.logger.warn "Retry #{DELAY_SEC} sec later..." sleep DELAY_SEC retry end - private + private def start_heartbeat @heartbeat ||= Thread.new do @@ -68,76 +72,97 @@ module Astute end def run_server - AMQP.logging = true - AMQP.connect(connection_options) do |connection| - @connection = configure_connection(connection) + @connection = Bunny.new(connection_options) + @connection.start + channels_and_exchanges = declare_channels_and_exchanges(@connection) - @channel = create_channel(@connection) - @exchange = @channel.topic(Astute.config.broker_exchange, :durable => true) - @service_channel = create_channel(@connection, prefetch=false) - @service_exchange = @service_channel.fanout(Astute.config.broker_service_exchange, :auto_delete => true) + @producer = Astute::Server::Producer.new( + channels_and_exchanges[:report_exchange] + ) + delegate = Astute::Server::Dispatcher.new(@producer) + @server = Astute::Server::Server.new( + channels_and_exchanges, + delegate, + @producer + ) - @producer = Astute::Server::Producer.new(@exchange) - @delegate = Astute.config.delegate || Astute::Server::Dispatcher.new(@producer) - @server = Astute::Server::Server.new(@channel, @exchange, @delegate, @producer, @service_channel, @service_exchange) - - @server.run - end + @server.run end - def configure_connection(connection) - connection.on_tcp_connection_loss do |conn, settings| - Astute.logger.warn "Trying to reconnect to message broker. Retry #{DELAY_SEC} sec later..." - EM.add_timer(DELAY_SEC) { conn.reconnect } - end - connection - end + def declare_channels_and_exchanges(connection) + # WARN: Bunny::Channel are designed to assume they are + # not shared between threads. + channel = @connection.create_channel + exchange = channel.topic( + Astute.config.broker_exchange, + :durable => true + ) - def create_channel(connection, prefetch=true) - prefetch_opts = ( prefetch ? {:prefetch => 1} : {} ) - channel = AMQP::Channel.new(connection, connection.next_channel_id, prefetch_opts) - channel.auto_recovery = true - channel.on_error do |ch, error| - if error.reply_code == 406 #PRECONDITION_FAILED - cleanup_rabbitmq_stuff - else - Astute.logger.fatal "Channel error\n#{error.pretty_inspect}" - end - sleep DELAY_SEC # avoid race condition - stop + report_channel = @connection.create_channel + report_exchange = report_channel.topic( + Astute.config.broker_exchange, + :durable => true + ) + + service_channel = @connection.create_channel + service_channel.prefetch(0) + + service_exchange = service_channel.fanout( + Astute.config.broker_service_exchange, + :auto_delete => true + ) + + return { + :exchange => exchange, + :service_exchange => service_exchange, + :channel => channel, + :service_channel => service_channel, + :report_channel => report_channel, + :report_exchange => report_exchange + } + rescue Bunny::PreconditionFailed => e + Astute.logger.warn "Try to remove problem exchanges and queues" + if connection.queue_exists? Astute.config.broker_queue + channel.queue_delete Astute.config.broker_queue end - channel + if connection.queue_exists? Astute.config.broker_publisher_queue + channel.queue_delete Astute.config.broker_publisher_queue + end + + cleanup_rabbitmq_stuff + raise e end def connection_options { :host => Astute.config.broker_host, :port => Astute.config.broker_port, - :username => Astute.config.broker_username, - :password => Astute.config.broker_password, + :user => Astute.config.broker_username, + :pass => Astute.config.broker_password, + :heartbeat => :server }.reject{|k, v| v.nil? } end - def stop_event_machine - EM.stop_event_loop if EM.reactor_running? - end - def cleanup_rabbitmq_stuff Astute.logger.warn "Try to remove problem exchanges and queues" - [Astute.config.broker_exchange, Astute.config.broker_service_exchange].each do |exchange| - rest_delete("/api/exchanges/%2F/#{exchange}") - end - [Astute.config.broker_queue, Astute.config.broker_publisher_queue].each do |queue| - rest_delete("/api/queues/%2F/#{queue}") + [Astute.config.broker_exchange, + Astute.config.broker_service_exchange].each do |exchange| + rest_delete("/api/exchanges/%2F/#{exchange}") end end def rest_delete(url) - http = Net::HTTP.new(Astute.config.broker_host, Astute.config.broker_rest_api_port) + http = Net::HTTP.new( + Astute.config.broker_host, + Astute.config.broker_rest_api_port + ) request = Net::HTTP::Delete.new(url) - request.basic_auth(Astute.config.broker_username, Astute.config.broker_password) + request.basic_auth( + Astute.config.broker_username, + Astute.config.broker_password + ) response = http.request(request) @@ -145,13 +170,13 @@ module Astute when 204 then Astute.logger.debug "Successfully delete object at #{url}" when 404 then else - Astute.logger.error "Failed to perform delete request. Debug information: "\ - "http code: #{response.code}, message: #{response.message},"\ - "body #{response.body}" + Astute.logger.error "Failed to perform delete request. Debug"\ + " information: http code: #{response.code},"\ + " message: #{response.message},"\ + " body #{response.body}" end end - end - + end # Worker end #Server end #Astute diff --git a/specs/astute.spec b/specs/astute.spec index 1feaf070..5a6d5cd3 100644 --- a/specs/astute.spec +++ b/specs/astute.spec @@ -20,7 +20,7 @@ Requires: ruby21-rubygem-activesupport = 3.0.10 Requires: ruby21-rubygem-mcollective-client = 2.4.1 Requires: ruby21-rubygem-symboltable = 1.0.2 Requires: ruby21-rubygem-rest-client = 1.6.7 -Requires: ruby21-rubygem-amqp = 1.4.1 +Requires: ruby21-rubygem-bunny Requires: ruby21-rubygem-raemon = 0.3.0 Requires: ruby21-rubygem-net-ssh = 2.8.0 Requires: ruby21-rubygem-net-ssh-gateway = 1.2.0