Merge "Add support for fair locks"

This commit is contained in:
Zuul 2018-11-05 23:22:41 +00:00 committed by Gerrit Code Review
commit 0767ddf4c2
4 changed files with 140 additions and 6 deletions

View File

@ -47,6 +47,32 @@ sure that the names of the locks used are carefully chosen (typically by
namespacing them to your app so that other apps will not chose the same
names).
Enabling fair locking
=====================
By default there is no requirement that the lock is ``fair``. That is, it's
possible for a thread to block waiting for the lock, then have another thread
block waiting for the lock, and when the lock is released by the current owner
the second waiter could acquire the lock before the first. In an extreme case
you could have a whole string of other threads acquire the lock before the
first waiter acquires it, resulting in unpredictable amounts of latency.
For cases where this is a problem, it's possible to specify the use of fair
locks::
@lockutils.synchronized('not_thread_process_safe', fair=True)
def not_thread_process_safe():
pass
When using fair locks the lock itself is slightly more expensive (which
shouldn't matter in most cases), but it will ensure that all threads that
block waiting for the lock will acquire it in the order that they blocked.
The exception to this is when specifying both ``external`` and ``fair``
locks. In this case, the ordering *within* a given process will be fair, but
the ordering *between* processes will be determined by the behaviour of the
underlying OS.
Common ways to prefix/namespace the synchronized decorator
==========================================================

View File

@ -87,6 +87,49 @@ ReaderWriterLock = fasteners.ReaderWriterLock
"""
class FairLocks(object):
"""A garbage collected container of fair locks.
With a fair lock, contending lockers will get the lock in the order in
which they tried to acquire it.
This collection internally uses a weak value dictionary so that when a
lock is no longer in use (by any threads) it will automatically be
removed from this container by the garbage collector.
"""
def __init__(self):
self._locks = weakref.WeakValueDictionary()
self._lock = threading.Lock()
def get(self, name):
"""Gets (or creates) a lock with a given name.
:param name: The lock name to get/create (used to associate
previously created names with the same lock).
Returns an newly constructed lock (or an existing one if it was
already created for the given name).
"""
with self._lock:
try:
return self._locks[name]
except KeyError:
# The fasteners module specifies that
# ReaderWriterLock.write_lock() will give FIFO behaviour,
# so we don't need to do anything special ourselves.
rwlock = ReaderWriterLock()
self._locks[name] = rwlock
return rwlock
_fair_locks = FairLocks()
def internal_fair_lock(name):
return _fair_locks.get(name)
class Semaphores(object):
"""A garbage collected container of semaphores.
@ -170,7 +213,7 @@ def internal_lock(name, semaphores=None):
@contextlib.contextmanager
def lock(name, lock_file_prefix=None, external=False, lock_path=None,
do_log=True, semaphores=None, delay=0.01):
do_log=True, semaphores=None, delay=0.01, fair=False):
"""Context based lock
This function yields a `threading.Semaphore` instance (if we don't use
@ -200,16 +243,26 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None,
:param delay: Delay between acquisition attempts (in seconds).
:param fair: Whether or not we want a "fair" lock where contending lockers
will get the lock in the order in which they tried to acquire it.
.. versionchanged:: 0.2
Added *do_log* optional parameter.
.. versionchanged:: 0.3
Added *delay* and *semaphores* optional parameters.
"""
int_lock = internal_lock(name, semaphores=semaphores)
if fair:
if semaphores is not None:
raise NotImplementedError(_('Specifying semaphores is not '
'supported when using fair locks.'))
# The fastners module specifies that write_lock() provides fairness.
int_lock = internal_fair_lock(name).write_lock()
else:
int_lock = internal_lock(name, semaphores=semaphores)
with int_lock:
if do_log:
LOG.debug('Acquired semaphore "%(lock)s"', {'lock': name})
LOG.debug('Acquired lock "%(lock)s"', {'lock': name})
try:
if external and not CONF.oslo_concurrency.disable_process_locking:
ext_lock = external_lock(name, lock_file_prefix, lock_path)
@ -225,11 +278,11 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None,
yield int_lock
finally:
if do_log:
LOG.debug('Releasing semaphore "%(lock)s"', {'lock': name})
LOG.debug('Releasing lock "%(lock)s"', {'lock': name})
def synchronized(name, lock_file_prefix=None, external=False, lock_path=None,
semaphores=None, delay=0.01):
semaphores=None, delay=0.01, fair=False):
"""Synchronization decorator.
Decorating a method like so::
@ -264,7 +317,8 @@ def synchronized(name, lock_file_prefix=None, external=False, lock_path=None,
t2 = None
try:
with lock(name, lock_file_prefix, external, lock_path,
do_log=False, semaphores=semaphores, delay=delay):
do_log=False, semaphores=semaphores, delay=delay,
fair=fair):
t2 = timeutils.now()
LOG.debug('Lock "%(name)s" acquired by "%(function)s" :: '
'waited %(wait_secs)0.3fs',

View File

@ -147,6 +147,45 @@ class LockTestCase(test_base.BaseTestCase):
self.assertEqual(saved_sem_num, len(lockutils._semaphores),
"Semaphore leak detected")
def test_lock_internal_fair(self):
"""Check that we're actually fair."""
def f(_id):
with lockutils.lock('testlock', 'test-',
external=False, fair=True):
lock_holder.append(_id)
lock_holder = []
threads = []
# While holding the fair lock, spawn a bunch of threads that all try
# to acquire the lock. They will all block. Then release the lock
# and see what happens.
with lockutils.lock('testlock', 'test-', external=False, fair=True):
for i in range(10):
thread = threading.Thread(target=f, args=(i,))
threads.append(thread)
thread.start()
# Allow some time for the new thread to get queued onto the
# list of pending writers before continuing. This is gross
# but there's no way around it without using knowledge of
# fasteners internals.
time.sleep(0.5)
# Wait for all threads.
for thread in threads:
thread.join()
self.assertEqual(10, len(lock_holder))
# Check that the threads each got the lock in fair order.
for i in range(10):
self.assertEqual(i, lock_holder[i])
def test_fair_lock_with_semaphore(self):
def do_test():
s = lockutils.Semaphores()
with lockutils.lock('testlock', 'test-', semaphores=s, fair=True):
pass
self.assertRaises(NotImplementedError, do_test)
def test_nested_synchronized_external_works(self):
"""We can nest external syncs."""
tempdir = tempfile.mkdtemp()

View File

@ -0,0 +1,15 @@
---
prelude: >
This release includes optional support for fair locks. When fair locks
are specified, blocking waiters will acquire the lock in the order that
they blocked.
features:
- |
We now have optional support for ``fair`` locks. When fair locks are
specified, blocking waiters will acquire the lock in the order that they
blocked. This can be useful to ensure that existing blocked waiters do
not wait indefinitely in the face of large numbers of new attempts to
acquire the lock. When specifying locks as both ``external`` and ``fair``,
the ordering *within* a given process will be fair, but the ordering
*between* processes will be determined by the behaviour of the underlying
OS.