Add and use a small retry helper utility object

This commit is contained in:
Joshua Harlow 2015-06-16 23:50:36 -07:00
parent 2bd3b364b1
commit 20af3ceffb
2 changed files with 50 additions and 42 deletions

View File

@ -17,6 +17,7 @@
# under the License.
import logging
import time
from monotonic import monotonic as now # noqa
@ -59,6 +60,35 @@ class LockStack(object):
am_left -= 1
class RetryAgain(Exception):
"""Exception to signal to retry helper to try again."""
class Retry(object):
"""A little retry helper object."""
def __init__(self, delay, max_delay,
sleep_func=time.sleep):
self.delay = delay,
self.attempts = 0
self.max_delay = max_delay
self.sleep_func = sleep_func
def __call__(self, fn, *args, **kwargs):
while True:
self.attempts += 1
try:
return fn(*args, **kwargs)
except RetryAgain:
maybe_delay = self.attempts * self.delay
if maybe_delay < self.max_delay:
actual_delay = maybe_delay
else:
actual_delay = self.max_delay
actual_delay = max(0.0, actual_delay)
self.sleep_func(actual_delay)
class StopWatch(object):
"""A really basic stop watch."""

View File

@ -16,8 +16,6 @@
# under the License.
import errno
import functools
import itertools
import logging
import os
import threading
@ -30,10 +28,6 @@ from fasteners import _utils
LOG = logging.getLogger(__name__)
def _noop_delay(attempts):
return None
def _ensure_tree(path):
"""Create a directory (and any ancestor directories required).
@ -92,28 +86,25 @@ class _InterProcessLock(object):
self.acquired = False
self.sleep_func = sleep_func
def _do_acquire(self, delay_func, blocking, watch):
attempts_iter = itertools.count(1)
while True:
attempts = six.next(attempts_iter)
try:
self.trylock()
except IOError as e:
if e.errno in (errno.EACCES, errno.EAGAIN):
if not blocking or watch.expired():
return (False, attempts)
else:
delay_func(attempts)
def _try_acquire(self, blocking, watch):
try:
self.trylock()
except IOError as e:
if e.errno in (errno.EACCES, errno.EAGAIN):
if not blocking or watch.expired():
return False
else:
raise threading.ThreadError("Unable to acquire lock on"
" `%(path)s` due to"
" %(exception)s" %
{
'path': self.path,
'exception': e,
})
raise _utils.RetryAgain()
else:
return (True, attempts)
raise threading.ThreadError("Unable to acquire lock on"
" `%(path)s` due to"
" %(exception)s" %
{
'path': self.path,
'exception': e,
})
else:
return True
def _do_open(self):
basedir = os.path.dirname(self.path)
@ -127,15 +118,6 @@ class _InterProcessLock(object):
if self.lockfile is None or self.lockfile.closed:
self.lockfile = open(self.path, 'a')
def _backoff_multiplier_delay(self, attempts, delay, max_delay):
maybe_delay = attempts * delay
if maybe_delay < max_delay:
actual_delay = maybe_delay
else:
actual_delay = max_delay
actual_delay = max(0.0, actual_delay)
self.sleep_func(actual_delay)
def acquire(self, blocking=True,
delay=DELAY_INCREMENT, max_delay=MAX_DELAY,
timeout=None):
@ -164,13 +146,9 @@ class _InterProcessLock(object):
max_delay = delay
self._do_open()
watch = _utils.StopWatch(duration=timeout)
if blocking:
delay_func = functools.partial(self._backoff_multiplier_delay,
delay=delay, max_delay=max_delay)
else:
delay_func = _noop_delay
r = _utils.Retry(delay, max_delay, sleep_func=self.sleep_func)
with watch:
gotten, attempts = self._do_acquire(delay_func, blocking, watch)
gotten = r(self._try_acquire, blocking, watch)
if not gotten:
self.acquired = False
return False
@ -179,7 +157,7 @@ class _InterProcessLock(object):
LOG.log(_utils.BLATHER,
"Acquired file lock `%s` after waiting %0.3fs [%s"
" attempts were required]", self.path, watch.elapsed(),
attempts)
r.attempts)
return True
def _do_close(self):