Revert "Implement asynchronous logger for event callbacks"
This reverts commit e63709d16b
.
AsyncLogger's worker thread takes almost 100% CPU because
it infinitely iterates over a logger queue. A simple
"sleep 0.2" will solve this problem, but I have doubts
about that AsyncLogger helps us anyway. It's better to
revert this fix and investigate a real cause of our problems.
Closes-Bug: 1491794
Related-Bug: 1487397
Change-Id: I95d57de9ef92c717b8b07f7832ab258c0b35b6a5
This commit is contained in:
parent
e63709d16b
commit
e790f66091
|
@ -65,7 +65,6 @@ require 'astute/server/server'
|
|||
require 'astute/server/producer'
|
||||
require 'astute/server/dispatcher'
|
||||
require 'astute/server/reporter'
|
||||
require 'astute/server/async_logger'
|
||||
|
||||
module Astute
|
||||
# Library
|
||||
|
|
|
@ -1,85 +0,0 @@
|
|||
# Copyright 2015 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
require 'thread'
|
||||
|
||||
module Astute
|
||||
module Server
|
||||
|
||||
# Asynchronous singleton logger, which should be used
|
||||
# in event callbacks of event machine, it doesn't block
|
||||
# callbacks because writing a message to log takes some time.
|
||||
# Also synchronous logger, potentially could lead to deadlocks.
|
||||
# See:
|
||||
# https://bugs.launchpad.net/fuel/+bug/1453573
|
||||
# https://bugs.launchpad.net/fuel/+bug/1487397
|
||||
module AsyncLogger
|
||||
def self.start_up(logger=Logger.new(STDOUT))
|
||||
@queue ||= Queue.new
|
||||
@shutdown = false
|
||||
@log = logger
|
||||
@thread = Thread.new { flush_messages }
|
||||
end
|
||||
|
||||
def self.shutdown
|
||||
# Shutdown logger gracefully, wait until all messages
|
||||
# are written into the log
|
||||
@shutdown = true
|
||||
@thread.join
|
||||
end
|
||||
|
||||
def self.add(severity, msg=nil)
|
||||
return if @shutdown
|
||||
|
||||
@queue.push([severity, msg])
|
||||
end
|
||||
|
||||
def self.debug(msg=nil)
|
||||
add(Logger::Severity::DEBUG, msg)
|
||||
end
|
||||
|
||||
def self.info(msg=nil)
|
||||
add(Logger::Severity::INFO, msg)
|
||||
end
|
||||
|
||||
def self.warn(msg=nil)
|
||||
add(Logger::Severity::WARN, msg)
|
||||
end
|
||||
|
||||
def self.error(msg=nil)
|
||||
add(Logger::Severity::ERROR, msg)
|
||||
end
|
||||
|
||||
def self.fatal(msg=nil)
|
||||
add(Logger::Severity::FATAL, msg)
|
||||
end
|
||||
|
||||
def self.unknown(msg=nil)
|
||||
add(Logger::Severity::UNKNOWN, msg)
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def self.flush_messages
|
||||
until @shutdown do
|
||||
until @queue.empty?
|
||||
severity, msg = @queue.pop
|
||||
@log.add(severity, msg)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
end
|
||||
end
|
|
@ -53,7 +53,7 @@ module Astute
|
|||
def main_worker
|
||||
@consumer = AMQP::Consumer.new(@channel, @queue, consumer_tag=nil, exclusive=false)
|
||||
@consumer.on_cancel do |basic_cancel|
|
||||
Astute::Server::AsyncLogger.debug("Received cancel notification from in main worker.")
|
||||
Astute.logger.debug("Received cancel notification from in main worker.")
|
||||
@exchange.auto_recover
|
||||
@service_exchange.auto_recover
|
||||
@queue.auto_recover
|
||||
|
@ -61,11 +61,11 @@ module Astute
|
|||
end
|
||||
@consumer.on_delivery do |metadata, payload|
|
||||
if @main_work_thread.nil? || !@main_work_thread.alive?
|
||||
Astute::Server::AsyncLogger.debug "Process message from worker queue: #{payload.inspect}"
|
||||
Astute.logger.debug "Process message from worker queue: #{payload.inspect}"
|
||||
metadata.ack
|
||||
perform_main_job(metadata, payload)
|
||||
else
|
||||
Astute::Server::AsyncLogger.debug "Requeue message because worker is busy: #{payload.inspect}"
|
||||
Astute.logger.debug "Requeue message because worker is busy: #{payload.inspect}"
|
||||
# Avoid throttle by consume/reject cycle if only one worker is running
|
||||
EM.add_timer(2) { metadata.reject(:requeue => true) }
|
||||
end
|
||||
|
@ -75,7 +75,7 @@ module Astute
|
|||
|
||||
def service_worker
|
||||
@service_queue.subscribe do |_, payload|
|
||||
Astute::Server::AsyncLogger.debug "Process message from service queue: #{payload.inspect}"
|
||||
Astute.logger.debug "Process message from service queue: #{payload.inspect}"
|
||||
perform_service_job(nil, payload)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -26,7 +26,6 @@ module Astute
|
|||
def start
|
||||
super
|
||||
start_heartbeat
|
||||
Astute::Server::AsyncLogger.start_up(Astute.logger)
|
||||
end
|
||||
|
||||
def stop
|
||||
|
@ -35,7 +34,6 @@ module Astute
|
|||
@connection.close{ stop_event_machine } if @connection
|
||||
ensure
|
||||
stop_event_machine
|
||||
Astute::Server::AsyncLogger.shutdown
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -89,7 +87,7 @@ module Astute
|
|||
|
||||
def configure_connection(connection)
|
||||
connection.on_tcp_connection_loss do |conn, settings|
|
||||
Astute::Server::AsyncLogger.warn "Trying to reconnect to message broker. Retry #{DELAY_SEC} sec later..."
|
||||
Astute.logger.warn "Trying to reconnect to message broker. Retry #{DELAY_SEC} sec later..."
|
||||
EM.add_timer(DELAY_SEC) { conn.reconnect }
|
||||
end
|
||||
connection
|
||||
|
@ -103,7 +101,7 @@ module Astute
|
|||
if error.reply_code == 406 #PRECONDITION_FAILED
|
||||
cleanup_rabbitmq_stuff
|
||||
else
|
||||
Astute::Server::AsyncLogger.fatal "Channel error #{error.inspect}"
|
||||
Astute.logger.fatal "Channel error #{error.inspect}"
|
||||
end
|
||||
sleep DELAY_SEC # avoid race condition
|
||||
stop
|
||||
|
|
Loading…
Reference in New Issue