From 20af3ceffb47392e271611473fdbf19cb8397902 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Tue, 16 Jun 2015 23:50:36 -0700 Subject: [PATCH] Add and use a small retry helper utility object --- fasteners/_utils.py | 30 +++++++++++++++++++ fasteners/process_lock.py | 62 +++++++++++++-------------------------- 2 files changed, 50 insertions(+), 42 deletions(-) diff --git a/fasteners/_utils.py b/fasteners/_utils.py index 108b156..272e84b 100644 --- a/fasteners/_utils.py +++ b/fasteners/_utils.py @@ -17,6 +17,7 @@ # under the License. import logging +import time from monotonic import monotonic as now # noqa @@ -59,6 +60,35 @@ class LockStack(object): am_left -= 1 +class RetryAgain(Exception): + """Exception to signal to retry helper to try again.""" + + +class Retry(object): + """A little retry helper object.""" + + def __init__(self, delay, max_delay, + sleep_func=time.sleep): + self.delay = delay + self.attempts = 0 + self.max_delay = max_delay + self.sleep_func = sleep_func + + def __call__(self, fn, *args, **kwargs): + while True: + self.attempts += 1 + try: + return fn(*args, **kwargs) + except RetryAgain: + maybe_delay = self.attempts * self.delay + if maybe_delay < self.max_delay: + actual_delay = maybe_delay + else: + actual_delay = self.max_delay + actual_delay = max(0.0, actual_delay) + self.sleep_func(actual_delay) + + class StopWatch(object): """A really basic stop watch.""" diff --git a/fasteners/process_lock.py b/fasteners/process_lock.py index cecfd00..d165304 100644 --- a/fasteners/process_lock.py +++ b/fasteners/process_lock.py @@ -16,8 +16,6 @@ # under the License. import errno -import functools -import itertools import logging import os import threading @@ -30,10 +28,6 @@ from fasteners import _utils LOG = logging.getLogger(__name__) -def _noop_delay(attempts): - return None - - def _ensure_tree(path): """Create a directory (and any ancestor directories required). 
@@ -92,28 +86,25 @@ class _InterProcessLock(object): self.acquired = False self.sleep_func = sleep_func - def _do_acquire(self, delay_func, blocking, watch): - attempts_iter = itertools.count(1) - while True: - attempts = six.next(attempts_iter) - try: - self.trylock() - except IOError as e: - if e.errno in (errno.EACCES, errno.EAGAIN): - if not blocking or watch.expired(): - return (False, attempts) - else: - delay_func(attempts) + def _try_acquire(self, blocking, watch): + try: + self.trylock() + except IOError as e: + if e.errno in (errno.EACCES, errno.EAGAIN): + if not blocking or watch.expired(): + return False else: - raise threading.ThreadError("Unable to acquire lock on" - " `%(path)s` due to" - " %(exception)s" % - { - 'path': self.path, - 'exception': e, - }) + raise _utils.RetryAgain() else: - return (True, attempts) + raise threading.ThreadError("Unable to acquire lock on" + " `%(path)s` due to" + " %(exception)s" % + { + 'path': self.path, + 'exception': e, + }) + else: + return True def _do_open(self): basedir = os.path.dirname(self.path) @@ -127,15 +118,6 @@ class _InterProcessLock(object): if self.lockfile is None or self.lockfile.closed: self.lockfile = open(self.path, 'a') - def _backoff_multiplier_delay(self, attempts, delay, max_delay): - maybe_delay = attempts * delay - if maybe_delay < max_delay: - actual_delay = maybe_delay - else: - actual_delay = max_delay - actual_delay = max(0.0, actual_delay) - self.sleep_func(actual_delay) - def acquire(self, blocking=True, delay=DELAY_INCREMENT, max_delay=MAX_DELAY, timeout=None): @@ -164,13 +146,9 @@ class _InterProcessLock(object): max_delay = delay self._do_open() watch = _utils.StopWatch(duration=timeout) - if blocking: - delay_func = functools.partial(self._backoff_multiplier_delay, - delay=delay, max_delay=max_delay) - else: - delay_func = _noop_delay + r = _utils.Retry(delay, max_delay, sleep_func=self.sleep_func) with watch: - gotten, attempts = self._do_acquire(delay_func, blocking, watch) + 
gotten = r(self._try_acquire, blocking, watch) if not gotten: self.acquired = False return False @@ -179,7 +157,7 @@ class _InterProcessLock(object): LOG.log(_utils.BLATHER, "Acquired file lock `%s` after waiting %0.3fs [%s" " attempts were required]", self.path, watch.elapsed(), - attempts) + r.attempts) return True def _do_close(self):