From 90125aa885a8a09bba71595699d937d8a3c23ba4 Mon Sep 17 00:00:00 2001 From: Gevorg Davoian Date: Fri, 21 Oct 2016 17:18:55 +0300 Subject: [PATCH] Replace retrying with tenacity This patch replaces the legacy retrying library with the newer and more convenient tenacity one, taking into account that: 1) retrying uses milliseconds for wait times, but tenacity uses seconds; 2) retrying has a lot of numeric arguments for specifying behaviour of decorated functions, while tenacity has a few of them, which are specialized objects, thus making the retry-decorator more flexible. Change-Id: Ib6ecffe5d1cf292badbb9eb6db6260f17460f343 Closes-Bug: #1635399 --- oslo_messaging/_drivers/impl_pika.py | 33 +++++++++------ .../_drivers/pika_driver/pika_message.py | 42 +++++++++++-------- .../client/zmq_publisher_manager.py | 12 +++--- .../matchmaker/zmq_matchmaker_redis.py | 32 +++++++------- .../tests/drivers/pika/test_message.py | 11 +++-- requirements.txt | 6 +-- 6 files changed, 74 insertions(+), 62 deletions(-) diff --git a/oslo_messaging/_drivers/impl_pika.py b/oslo_messaging/_drivers/impl_pika.py index d29131914..79955ef38 100644 --- a/oslo_messaging/_drivers/impl_pika.py +++ b/oslo_messaging/_drivers/impl_pika.py @@ -16,7 +16,7 @@ from oslo_config import cfg from oslo_log import log as logging from oslo_utils import timeutils import pika_pool -import retrying +import tenacity from oslo_messaging._drivers import base from oslo_messaging._drivers.pika_driver import (pika_connection_factory as @@ -201,14 +201,15 @@ class PikaDriver(base.BaseDriver): else: return False - retrier = ( - None if retry == 0 else - retrying.retry( - stop_max_attempt_number=(None if retry == -1 else retry), - retry_on_exception=on_exception, - wait_fixed=self._pika_engine.rpc_retry_delay * 1000, + if retry: + retrier = tenacity.retry( + stop=(tenacity.stop_never if retry == -1 else + tenacity.stop_after_attempt(retry)), + retry=tenacity.retry_if_exception(on_exception), + wait=tenacity.wait_fixed(self._pika_engine.rpc_retry_delay) ) - ) + else: + retrier = None if target.fanout: return self.cast_all_workers( @@ -312,11 +313,17 @@ class PikaDriver(base.BaseDriver): else: return False - retrier = retrying.retry( - stop_max_attempt_number=(None if retry == -1 else retry), - retry_on_exception=on_exception, - wait_fixed=self._pika_engine.notification_retry_delay * 1000, - ) + if retry: + retrier = tenacity.retry( + stop=(tenacity.stop_never if retry == -1 else + tenacity.stop_after_attempt(retry)), + retry=tenacity.retry_if_exception(on_exception), + wait=tenacity.wait_fixed( + self._pika_engine.notification_retry_delay + ) + ) + else: + retrier = None msg = pika_drv_msg.PikaOutgoingMessage(self._pika_engine, message, ctxt) diff --git a/oslo_messaging/_drivers/pika_driver/pika_message.py b/oslo_messaging/_drivers/pika_driver/pika_message.py index 86ede6af1..959f700a0 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_message.py +++ b/oslo_messaging/_drivers/pika_driver/pika_message.py @@ -25,8 +25,8 @@ from oslo_utils import timeutils from pika import exceptions as pika_exceptions from pika import spec as pika_spec import pika_pool -import retrying import six +import tenacity import oslo_messaging @@ -201,14 +201,22 @@ class RpcPikaIncomingMessage(PikaIncomingMessage, base.RpcIncomingMessage): else: return False - retrier = retrying.retry( - stop_max_attempt_number=( - None if self._pika_engine.rpc_reply_retry_attempts == -1 - else self._pika_engine.rpc_reply_retry_attempts - ), - retry_on_exception=on_exception, - wait_fixed=self._pika_engine.rpc_reply_retry_delay * 1000, - ) if self._pika_engine.rpc_reply_retry_attempts else None + if self._pika_engine.rpc_reply_retry_attempts: + retrier = tenacity.retry( + stop=( + tenacity.stop_never + if self._pika_engine.rpc_reply_retry_attempts == -1 else + tenacity.stop_after_attempt( + self._pika_engine.rpc_reply_retry_attempts + ) + ), + retry=tenacity.retry_if_exception(on_exception), + wait=tenacity.wait_fixed( + self._pika_engine.rpc_reply_retry_delay + ) + ) + else: + retrier = None try: timeout = (None if self.expiration_time is None else @@ -438,8 +446,8 @@ class PikaOutgoingMessage(object): for routing into durable queues :param stopwatch: StopWatch, stopwatch object for calculating allowed timeouts - :param retrier: retrying.Retrier, configured retrier object for sending - message, if None no retrying is performed + :param retrier: tenacity.Retrying, configured retrier object for + sending message, if None no retrying is performed """ msg_props.delivery_mode = 2 if persistent else 1 @@ -475,8 +483,8 @@ class PikaOutgoingMessage(object): for routing into durable queues :param stopwatch: StopWatch, stopwatch object for calculating allowed timeouts - :param retrier: retrying.Retrier, configured retrier object for sending - message, if None no retrying is performed + :param retrier: tenacity.Retrying, configured retrier object for + sending message, if None no retrying is performed """ msg_dict, msg_props = self._prepare_message_to_send() @@ -506,8 +514,8 @@ class RpcPikaOutgoingMessage(PikaOutgoingMessage): reply. If None - return immediately without reply waiting :param stopwatch: StopWatch, stopwatch object for calculating allowed timeouts - :param retrier: retrying.Retrier, configured retrier object for sending - message, if None no retrying is performed + :param retrier: tenacity.Retrying, configured retrier object for + sending message, if None no retrying is performed """ msg_dict, msg_props = self._prepare_message_to_send() @@ -595,8 +603,8 @@ class RpcReplyPikaOutgoingMessage(PikaOutgoingMessage): :param reply_q: String, queue name for sending reply :param stopwatch: StopWatch, stopwatch object for calculating allowed timeouts - :param retrier: retrying.Retrier, configured retrier object for sending - message, if None no retrying is performed + :param retrier: tenacity.Retrying, configured retrier object for + sending message, if None no retrying is performed """ msg_dict, msg_props = self._prepare_message_to_send() diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_publisher_manager.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_publisher_manager.py index 5abb58def..e3f77bdda 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_publisher_manager.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_publisher_manager.py @@ -16,8 +16,8 @@ import abc import contextlib import logging -import retrying import six +import tenacity from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_base from oslo_messaging._drivers.zmq_driver import zmq_async @@ -30,9 +30,9 @@ zmq = zmq_async.import_zmq() def _drop_message_warn(request): LOG.warning(_LW("Matchmaker contains no records for specified " - "target %(target)s. Dropping message %(msg_id)s.") - % {"target": request.target, - "msg_id": request.message_id}) + "target %(target)s. Dropping message %(msg_id)s."), + {"target": request.target, + "msg_id": request.message_id}) def target_not_found_warn(func): @@ -40,7 +40,7 @@ def target_not_found_warn(func): try: return func(self, request, *args, **kwargs) except (zmq_matchmaker_base.MatchmakerUnavailable, - retrying.RetryError): + tenacity.RetryError): _drop_message_warn(request) return _target_not_found_warn @@ -50,7 +50,7 @@ def target_not_found_timeout(func): try: return func(self, request, *args, **kwargs) except (zmq_matchmaker_base.MatchmakerUnavailable, - retrying.RetryError): + tenacity.RetryError): _drop_message_warn(request) self.publisher._raise_timeout(request) return _target_not_found_timeout diff --git a/oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_redis.py b/oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_redis.py index 1e6c172fb..640a19d72 100644 --- a/oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_redis.py +++ b/oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_redis.py @@ -20,8 +20,8 @@ import time from oslo_config import cfg from oslo_utils import importutils -from retrying import retry import six +import tenacity from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_base from oslo_messaging._drivers.zmq_driver import zmq_address @@ -30,6 +30,7 @@ from oslo_messaging._i18n import _LE, _LI, _LW redis = importutils.try_import('redis') redis_sentinel = importutils.try_import('redis.sentinel') + LOG = logging.getLogger(__name__) @@ -54,8 +55,8 @@ matchmaker_redis_opts = [ default=[], deprecated_for_removal=True, deprecated_reason="Replaced by [DEFAULT]/transport_url", - help='List of Redis Sentinel hosts (fault tolerance mode) e.g.\ - [host:port, host1:port ... ]'), + help='List of Redis Sentinel hosts (fault tolerance mode), ' + 'e.g., [host:port, host1:port ... ]'), cfg.StrOpt('sentinel_group_name', default='oslo-messaging-zeromq', help='Redis replica set name.'), @@ -67,7 +68,7 @@ matchmaker_redis_opts = [ help='Time in ms to wait before the transaction is killed.'), cfg.IntOpt('socket_timeout', default=10000, - help='Timeout in ms on blocking socket operations'), + help='Timeout in ms on blocking socket operations.'), ] _PUBLISHERS_KEY = "PUBLISHERS" @@ -132,11 +133,7 @@ def empty_list_on_error(func): return func_wrapper -def retry_if_connection_error(ex): - return isinstance(ex, zmq_matchmaker_base.MatchmakerUnavailable) - - -def retry_if_empty(hosts): +def is_empty(hosts): return not hosts @@ -239,12 +236,15 @@ class MatchmakerRedisBase(zmq_matchmaker_base.MatchmakerBase): return self._retry_method(target, listener_type, self.get_hosts_fanout) def _retry_method(self, target, listener_type, method): - @retry(retry_on_result=retry_if_empty, - wrap_exception=True, - wait_fixed=self.conf.matchmaker_redis.wait_timeout, - stop_max_delay=self.conf.matchmaker_redis.check_timeout) + wait_timeout = self.conf.matchmaker_redis.wait_timeout / 1000. + check_timeout = self.conf.matchmaker_redis.check_timeout / 1000. + + @tenacity.retry(retry=tenacity.retry_if_result(is_empty), + wait=tenacity.wait_fixed(wait_timeout), + stop=tenacity.stop_after_delay(check_timeout)) def _get_hosts_retry(target, listener_type): return method(target, listener_type) + return _get_hosts_retry(target, listener_type) @@ -362,15 +362,15 @@ class MatchmakerSentinel(MatchmakerRedisBase): def __init__(self, conf, *args, **kwargs): super(MatchmakerSentinel, self).__init__(conf, *args, **kwargs) - socket_timeout = self.conf.matchmaker_redis.socket_timeout / 1000. self._sentinel_hosts, password, master_group = \ self._extract_sentinel_hosts() self._sentinel = redis_sentinel.Sentinel( sentinels=self._sentinel_hosts, - socket_timeout=socket_timeout, - password=password) + socket_timeout=self.conf.matchmaker_redis.socket_timeout / 1000., + password=password + ) self._redis_master = self._sentinel.master_for(master_group) self._redis_slave = self._sentinel.slave_for(master_group) diff --git a/oslo_messaging/tests/drivers/pika/test_message.py b/oslo_messaging/tests/drivers/pika/test_message.py index 630fb0df9..40d2c51b7 100644 --- a/oslo_messaging/tests/drivers/pika/test_message.py +++ b/oslo_messaging/tests/drivers/pika/test_message.py @@ -11,6 +11,7 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. + import functools import unittest @@ -172,7 +173,7 @@ class RpcPikaIncomingMessageTestCase(unittest.TestCase): @patch("oslo_messaging._drivers.pika_driver.pika_message." "RpcReplyPikaOutgoingMessage") - @patch("retrying.retry") + @patch("tenacity.retry") def test_positive_reply_for_call_message(self, retry_mock, outgoing_message_mock): @@ -202,13 +203,12 @@ class RpcPikaIncomingMessageTestCase(unittest.TestCase): reply_q='reply_queue', stopwatch=mock.ANY, retrier=mock.ANY ) retry_mock.assert_called_once_with( - retry_on_exception=mock.ANY, stop_max_attempt_number=3, - wait_fixed=250.0 + stop=mock.ANY, retry=mock.ANY, wait=mock.ANY ) @patch("oslo_messaging._drivers.pika_driver.pika_message." "RpcReplyPikaOutgoingMessage") - @patch("retrying.retry") + @patch("tenacity.retry") def test_negative_reply_for_call_message(self, retry_mock, outgoing_message_mock): @@ -241,8 +241,7 @@ class RpcPikaIncomingMessageTestCase(unittest.TestCase): reply_q='reply_queue', stopwatch=mock.ANY, retrier=mock.ANY ) retry_mock.assert_called_once_with( - retry_on_exception=mock.ANY, stop_max_attempt_number=3, - wait_fixed=250.0 + stop=mock.ANY, retry=mock.ANY, wait=mock.ANY ) diff --git a/requirements.txt b/requirements.txt index 11b527150..6657d3132 100644 --- a/requirements.txt +++ b/requirements.txt @@ -20,9 +20,7 @@ monotonic>=0.6 # Apache-2.0 six>=1.9.0 # MIT cachetools>=1.1.0 # MIT License - -# FIXME(markmc): remove this when the drivers no longer -# import eventlet +# FIXME(markmc): remove this when the drivers no longer import eventlet eventlet!=0.18.3,>=0.18.2 # MIT greenlet>=0.3.2 # MIT @@ -41,7 +39,7 @@ pika-pool>=0.1.3 # BSD # used by pika and zmq drivers futures>=3.0;python_version=='2.7' or python_version=='2.6' # BSD -retrying!=1.3.0,>=1.2.3 # Apache-2.0 +tenacity>=3.2.1 # Apache-2.0 # middleware oslo.middleware>=3.0.0 # Apache-2.0