summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2013-06-17 15:06:32 +0000
committerGerrit Code Review <review@openstack.org>2013-06-17 15:06:32 +0000
commit9ae78d0121bb7dc3e2f04bd8fdc8ca4314b7cfc5 (patch)
tree5a53000c1a7fe646b5e9ee83e72c79316a253a75
parent47c6f9c2f8d3af1d72dc7ec1bf79da4d1918da6a (diff)
parentcbb0f683187f49a257e0007b6fc47a2dda50177a (diff)
Merge "Added retry logic for RabbitMQ connection" into release-0.10.1release-0.1
-rw-r--r--muranoapi/common/service.py78
-rw-r--r--muranoapi/common/utils.py77
2 files changed, 118 insertions, 37 deletions
diff --git a/muranoapi/common/service.py b/muranoapi/common/service.py
index 65a1eba..4e8208d 100644
--- a/muranoapi/common/service.py
+++ b/muranoapi/common/service.py
@@ -12,35 +12,37 @@
12# License for the specific language governing permissions and limitations 12# License for the specific language governing permissions and limitations
13# under the License. 13# under the License.
14 14
15import socket
16
17from amqplib.client_0_8 import AMQPConnectionException
15import anyjson 18import anyjson
16from eventlet import patcher 19import eventlet
20from muranoapi.common.utils import retry, handle
17from muranoapi.db.models import Status, Session, Environment 21from muranoapi.db.models import Status, Session, Environment
18from muranoapi.db.session import get_session 22from muranoapi.db.session import get_session
19
20amqp = patcher.import_patched('amqplib.client_0_8')
21
22from muranoapi.openstack.common import service
23from muranoapi.openstack.common import log as logging 23from muranoapi.openstack.common import log as logging
24from muranoapi.common import config 24from muranoapi.common import config
25 25
26amqp = eventlet.patcher.import_patched('amqplib.client_0_8')
26conf = config.CONF.reports 27conf = config.CONF.reports
27rabbitmq = config.CONF.rabbitmq 28rabbitmq = config.CONF.rabbitmq
28log = logging.getLogger(__name__) 29log = logging.getLogger(__name__)
29channel = None
30 30
31 31
32class TaskResultHandlerService(service.Service): 32class TaskResultHandlerService():
33 def __init__(self, threads=1000): 33 thread = None
34 super(TaskResultHandlerService, self).__init__(threads)
35 34
36 def start(self): 35 def start(self):
37 super(TaskResultHandlerService, self).start() 36 self.thread = eventlet.spawn(self.connect)
38 self.tg.add_thread(self._handle_results)
39 37
40 def stop(self): 38 def stop(self):
41 super(TaskResultHandlerService, self).stop() 39 pass
42 40
43 def _handle_results(self): 41 def wait(self):
42 self.thread.wait()
43
44 @retry((socket.error, AMQPConnectionException), tries=-1)
45 def connect(self):
44 connection = amqp.Connection('{0}:{1}'. 46 connection = amqp.Connection('{0}:{1}'.
45 format(rabbitmq.host, rabbitmq.port), 47 format(rabbitmq.host, rabbitmq.port),
46 virtual_host=rabbitmq.virtual_host, 48 virtual_host=rabbitmq.virtual_host,
@@ -67,36 +69,15 @@ class TaskResultHandlerService(service.Service):
67 ch.wait() 69 ch.wait()
68 70
69 71
70def handle_report(msg): 72@handle
71 log.debug(_('Got report message from orchestration engine:\n{0}'.
72 format(msg.body)))
73
74 params = anyjson.deserialize(msg.body)
75 params['entity_id'] = params['id']
76 del params['id']
77
78 status = Status()
79 status.update(params)
80
81 session = get_session()
82 #connect with session
83 conf_session = session.query(Session).filter_by(
84 **{'environment_id': status.environment_id,
85 'state': 'deploying'}).first()
86 status.session_id = conf_session.id
87
88 with session.begin():
89 session.add(status)
90
91
92def handle_result(msg): 73def handle_result(msg):
93 log.debug(_('Got result message from ' 74 log.debug(_('Got result message from '
94 'orchestration engine:\n{0}'.format(msg.body))) 75 'orchestration engine:\n{0}'.format(msg.body)))
95 76
96 environment_result = anyjson.deserialize(msg.body) 77 environment_result = anyjson.deserialize(msg.body)
97 if 'deleted' in environment_result: 78 if 'deleted' in environment_result:
98 log.debug(_('Result for environment {0} is dropped. ' 79 log.debug(_('Result for environment {0} is dropped. Environment '
99 'Environment is deleted'.format(environment_result['id']))) 80 'is deleted'.format(environment_result['id'])))
100 81
101 msg.channel.basic_ack(msg.delivery_tag) 82 msg.channel.basic_ack(msg.delivery_tag)
102 return 83 return
@@ -119,3 +100,26 @@ def handle_result(msg):
119 conf_session.save(session) 100 conf_session.save(session)
120 101
121 msg.channel.basic_ack(msg.delivery_tag) 102 msg.channel.basic_ack(msg.delivery_tag)
103
104
105@handle
106def handle_report(msg):
107 log.debug(_('Got report message from orchestration '
108 'engine:\n{0}'.format(msg.body)))
109
110 params = anyjson.deserialize(msg.body)
111 params['entity_id'] = params['id']
112 del params['id']
113
114 status = Status()
115 status.update(params)
116
117 session = get_session()
118 #connect with session
119 conf_session = session.query(Session).filter_by(
120 **{'environment_id': status.environment_id,
121 'state': 'deploying'}).first()
122 status.session_id = conf_session.id
123
124 with session.begin():
125 session.add(status)
diff --git a/muranoapi/common/utils.py b/muranoapi/common/utils.py
new file mode 100644
index 0000000..d276188
--- /dev/null
+++ b/muranoapi/common/utils.py
@@ -0,0 +1,77 @@
1# Copyright (c) 2013 Mirantis, Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
14
15import eventlet
16from functools import wraps
17from muranoapi.openstack.common import log as logging
18
19log = logging.getLogger(__name__)
20
21
22def retry(ExceptionToCheck, tries=4, delay=3, backoff=2):
23 """Retry calling the decorated function using an exponential backoff.
24
25 http://www.saltycrane.com/blog/2009/11/trying-out-retry-decorator-python/
26 original from: http://wiki.python.org/moin/PythonDecoratorLibrary#Retry
27
28 :param ExceptionToCheck: the exception to check. may be a tuple of
29 exceptions to check
30 :type ExceptionToCheck: Exception or tuple
31 :param tries: number of times to try (not retry) before giving up
32 :type tries: int
33 :param delay: initial delay between retries in seconds
34 :type delay: int
35 :param backoff: backoff multiplier e.g. value of 2 will double the delay
36 each retry
37 :type backoff: int
38 """
39
40 def deco_retry(f):
41 @wraps(f)
42 def f_retry(*args, **kwargs):
43 mtries, mdelay = tries, delay
44 forever = mtries == -1
45 while forever or mtries > 1:
46 try:
47 return f(*args, **kwargs)
48 except ExceptionToCheck as e:
49
50 log.exception(e)
51 log.info("Retrying in {0} seconds...".format(mdelay))
52
53 eventlet.sleep(mdelay)
54
55 if not forever:
56 mtries -= 1
57
58 if mdelay < 60:
59 mdelay *= backoff
60 return f(*args, **kwargs)
61
62 return f_retry
63
64 return deco_retry
65
66
67def handle(f):
68 """Handles exception in wrapped function and writes to log."""
69
70 @wraps(f)
71 def f_handle(*args, **kwargs):
72 try:
73 return f(*args, **kwargs)
74 except Exception as e:
75 log.exception(e)
76
77 return f_handle