diff --git a/muranoapi/common/service.py b/muranoapi/common/service.py index 65a1eba39..4e8208d45 100644 --- a/muranoapi/common/service.py +++ b/muranoapi/common/service.py @@ -12,35 +12,37 @@ # License for the specific language governing permissions and limitations # under the License. +import socket + +from amqplib.client_0_8 import AMQPConnectionException import anyjson -from eventlet import patcher +import eventlet +from muranoapi.common.utils import retry, handle from muranoapi.db.models import Status, Session, Environment from muranoapi.db.session import get_session - -amqp = patcher.import_patched('amqplib.client_0_8') - -from muranoapi.openstack.common import service from muranoapi.openstack.common import log as logging from muranoapi.common import config +amqp = eventlet.patcher.import_patched('amqplib.client_0_8') conf = config.CONF.reports rabbitmq = config.CONF.rabbitmq log = logging.getLogger(__name__) -channel = None -class TaskResultHandlerService(service.Service): - def __init__(self, threads=1000): - super(TaskResultHandlerService, self).__init__(threads) +class TaskResultHandlerService(): + thread = None def start(self): - super(TaskResultHandlerService, self).start() - self.tg.add_thread(self._handle_results) + self.thread = eventlet.spawn(self.connect) def stop(self): - super(TaskResultHandlerService, self).stop() + pass - def _handle_results(self): + def wait(self): + self.thread.wait() + + @retry((socket.error, AMQPConnectionException), tries=-1) + def connect(self): connection = amqp.Connection('{0}:{1}'. format(rabbitmq.host, rabbitmq.port), virtual_host=rabbitmq.virtual_host, @@ -67,36 +69,15 @@ class TaskResultHandlerService(service.Service): ch.wait() -def handle_report(msg): - log.debug(_('Got report message from orchestration engine:\n{0}'. - format(msg.body))) - - params = anyjson.deserialize(msg.body) - params['entity_id'] = params['id'] - del params['id'] - - status = Status() - status.update(params) - - session = get_session() - #connect with session - conf_session = session.query(Session).filter_by( - **{'environment_id': status.environment_id, - 'state': 'deploying'}).first() - status.session_id = conf_session.id - - with session.begin(): - session.add(status) - - +@handle def handle_result(msg): log.debug(_('Got result message from ' 'orchestration engine:\n{0}'.format(msg.body))) environment_result = anyjson.deserialize(msg.body) if 'deleted' in environment_result: - log.debug(_('Result for environment {0} is dropped. ' - 'Environment is deleted'.format(environment_result['id']))) + log.debug(_('Result for environment {0} is dropped. Environment ' + 'is deleted'.format(environment_result['id']))) msg.channel.basic_ack(msg.delivery_tag) return @@ -119,3 +100,26 @@ def handle_result(msg): conf_session.save(session) msg.channel.basic_ack(msg.delivery_tag) + + +@handle +def handle_report(msg): + log.debug(_('Got report message from orchestration ' + 'engine:\n{0}'.format(msg.body))) + + params = anyjson.deserialize(msg.body) + params['entity_id'] = params['id'] + del params['id'] + + status = Status() + status.update(params) + + session = get_session() + #connect with session + conf_session = session.query(Session).filter_by( + **{'environment_id': status.environment_id, + 'state': 'deploying'}).first() + status.session_id = conf_session.id + + with session.begin(): + session.add(status) diff --git a/muranoapi/common/utils.py b/muranoapi/common/utils.py new file mode 100644 index 000000000..d2761880b --- /dev/null +++ b/muranoapi/common/utils.py @@ -0,0 +1,77 @@ +# Copyright (c) 2013 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import eventlet +from functools import wraps +from muranoapi.openstack.common import log as logging + +log = logging.getLogger(__name__) + + +def retry(ExceptionToCheck, tries=4, delay=3, backoff=2): + """Retry calling the decorated function using an exponential backoff. + + http://www.saltycrane.com/blog/2009/11/trying-out-retry-decorator-python/ + original from: http://wiki.python.org/moin/PythonDecoratorLibrary#Retry + + :param ExceptionToCheck: the exception to check. may be a tuple of + exceptions to check + :type ExceptionToCheck: Exception or tuple + :param tries: number of times to try (not retry) before giving up + :type tries: int + :param delay: initial delay between retries in seconds + :type delay: int + :param backoff: backoff multiplier e.g. value of 2 will double the delay + each retry + :type backoff: int + """ + + def deco_retry(f): + @wraps(f) + def f_retry(*args, **kwargs): + mtries, mdelay = tries, delay + forever = mtries == -1 + while forever or mtries > 1: + try: + return f(*args, **kwargs) + except ExceptionToCheck as e: + + log.exception(e) + log.info("Retrying in {0} seconds...".format(mdelay)) + + eventlet.sleep(mdelay) + + if not forever: + mtries -= 1 + + if mdelay < 60: + mdelay *= backoff + return f(*args, **kwargs) + + return f_retry + + return deco_retry + + +def handle(f): + """Handles exception in wrapped function and writes to log.""" + + @wraps(f) + def f_handle(*args, **kwargs): + try: + return f(*args, **kwargs) + except Exception as e: + log.exception(e) + + return f_handle