Added retry logic for RabbitMQ connection

Change-Id: I7707cd4c161f070fc95a47596902bbbc62906a1c
This commit is contained in:
Serg Melikyan 2013-06-17 18:40:14 +04:00
parent 62600772c0
commit cbb0f68318
2 changed files with 118 additions and 37 deletions

View File

@ -12,35 +12,37 @@
# License for the specific language governing permissions and limitations
# under the License.
import socket
from amqplib.client_0_8 import AMQPConnectionException
import anyjson
from eventlet import patcher
import eventlet
from muranoapi.common.utils import retry, handle
from muranoapi.db.models import Status, Session, Environment
from muranoapi.db.session import get_session
amqp = patcher.import_patched('amqplib.client_0_8')
from muranoapi.openstack.common import service
from muranoapi.openstack.common import log as logging
from muranoapi.common import config
amqp = eventlet.patcher.import_patched('amqplib.client_0_8')
conf = config.CONF.reports
rabbitmq = config.CONF.rabbitmq
log = logging.getLogger(__name__)
channel = None
class TaskResultHandlerService(service.Service):
def __init__(self, threads=1000):
super(TaskResultHandlerService, self).__init__(threads)
class TaskResultHandlerService():
thread = None
def start(self):
super(TaskResultHandlerService, self).start()
self.tg.add_thread(self._handle_results)
self.thread = eventlet.spawn(self.connect)
def stop(self):
super(TaskResultHandlerService, self).stop()
pass
def _handle_results(self):
def wait(self):
self.thread.wait()
@retry((socket.error, AMQPConnectionException), tries=-1)
def connect(self):
connection = amqp.Connection('{0}:{1}'.
format(rabbitmq.host, rabbitmq.port),
virtual_host=rabbitmq.virtual_host,
@ -67,36 +69,15 @@ class TaskResultHandlerService(service.Service):
ch.wait()
def handle_report(msg):
log.debug(_('Got report message from orchestration engine:\n{0}'.
format(msg.body)))
params = anyjson.deserialize(msg.body)
params['entity_id'] = params['id']
del params['id']
status = Status()
status.update(params)
session = get_session()
#connect with session
conf_session = session.query(Session).filter_by(
**{'environment_id': status.environment_id,
'state': 'deploying'}).first()
status.session_id = conf_session.id
with session.begin():
session.add(status)
@handle
def handle_result(msg):
log.debug(_('Got result message from '
'orchestration engine:\n{0}'.format(msg.body)))
environment_result = anyjson.deserialize(msg.body)
if 'deleted' in environment_result:
log.debug(_('Result for environment {0} is dropped. '
'Environment is deleted'.format(environment_result['id'])))
log.debug(_('Result for environment {0} is dropped. Environment '
'is deleted'.format(environment_result['id'])))
msg.channel.basic_ack(msg.delivery_tag)
return
@ -119,3 +100,26 @@ def handle_result(msg):
conf_session.save(session)
msg.channel.basic_ack(msg.delivery_tag)
@handle
def handle_report(msg):
log.debug(_('Got report message from orchestration '
'engine:\n{0}'.format(msg.body)))
params = anyjson.deserialize(msg.body)
params['entity_id'] = params['id']
del params['id']
status = Status()
status.update(params)
session = get_session()
#connect with session
conf_session = session.query(Session).filter_by(
**{'environment_id': status.environment_id,
'state': 'deploying'}).first()
status.session_id = conf_session.id
with session.begin():
session.add(status)

77
muranoapi/common/utils.py Normal file
View File

@ -0,0 +1,77 @@
# Copyright (c) 2013 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
from functools import wraps
from muranoapi.openstack.common import log as logging
log = logging.getLogger(__name__)
def retry(ExceptionToCheck, tries=4, delay=3, backoff=2):
"""Retry calling the decorated function using an exponential backoff.
http://www.saltycrane.com/blog/2009/11/trying-out-retry-decorator-python/
original from: http://wiki.python.org/moin/PythonDecoratorLibrary#Retry
:param ExceptionToCheck: the exception to check. may be a tuple of
exceptions to check
:type ExceptionToCheck: Exception or tuple
:param tries: number of times to try (not retry) before giving up
:type tries: int
:param delay: initial delay between retries in seconds
:type delay: int
:param backoff: backoff multiplier e.g. value of 2 will double the delay
each retry
:type backoff: int
"""
def deco_retry(f):
@wraps(f)
def f_retry(*args, **kwargs):
mtries, mdelay = tries, delay
forever = mtries == -1
while forever or mtries > 1:
try:
return f(*args, **kwargs)
except ExceptionToCheck as e:
log.exception(e)
log.info("Retrying in {0} seconds...".format(mdelay))
eventlet.sleep(mdelay)
if not forever:
mtries -= 1
if mdelay < 60:
mdelay *= backoff
return f(*args, **kwargs)
return f_retry
return deco_retry
def handle(f):
"""Handles exception in wrapped function and writes to log."""
@wraps(f)
def f_handle(*args, **kwargs):
try:
return f(*args, **kwargs)
except Exception as e:
log.exception(e)
return f_handle