Move synchronized body to a first-class function

synchronized is being widely used throughout OpenStack for managing
locks by decorating methods / functions. However, it's functionality is
trapped within that decorator definition and makes it difficult to
manage locks by using it in other cases (like w/o decorating).

This patch moves decorator's body into a first-class function called
lock which is intended to be used as a context manager type (with
statement).

Some examples:

    with lockutils.lock("test") as sem:
        print('Inside the lock')

Things I'm not 100% convinced:

    * The `lock` function yields a Semaphore when external is False and
    an InterProcessLock instance otherwise. Although it is not 'good' to
    yield different values depending on the input parameters, it's
    pretty explicit (by reading the documentation) that external locks
    are handled differently. Other options for this case are:

        1. Always yield a Semaphore instance
        2. Don't yield anything

The patch doesn't break backward compatibility so no change is needed to
existing projects.

Implements blueprint cache-backend-abstraction

Change-Id: I96b2ee84da5a2dcd7f0bd469f8a1e52b74e50c75
This commit is contained in:
Flaper Fesp 2013-06-10 14:57:42 +02:00
parent 82433a46dc
commit 4a5687b0b5
2 changed files with 188 additions and 127 deletions

View File

@ -16,6 +16,7 @@
# under the License.
import contextlib
import errno
import functools
import os
@ -135,6 +136,96 @@ else:
_semaphores = weakref.WeakValueDictionary()
@contextlib.contextmanager
def lock(name, lock_file_prefix=None, external=False, lock_path=None):
"""Context based lock
This function yields a `semaphore.Semaphore` instance unless external is
True, in which case, it'll yield an InterProcessLock instance.
:param lock_file_prefix: The lock_file_prefix argument is used to provide
lock files on disk with a meaningful prefix.
:param external: The external keyword argument denotes whether this lock
should work across multiple processes. This means that if two different
workers both run a a method decorated with @synchronized('mylock',
external=True), only one of them will execute at a time.
:param lock_path: The lock_path keyword argument is used to specify a
special location for external lock files to live. If nothing is set, then
CONF.lock_path is used as a default.
"""
# NOTE(soren): If we ever go natively threaded, this will be racy.
# See http://stackoverflow.com/questions/5390569/dyn
# amically-allocating-and-destroying-mutexes
sem = _semaphores.get(name, semaphore.Semaphore())
if name not in _semaphores:
# this check is not racy - we're already holding ref locally
# so GC won't remove the item and there was no IO switch
# (only valid in greenthreads)
_semaphores[name] = sem
with sem:
LOG.debug(_('Got semaphore "%(lock)s"'), {'lock': name})
# NOTE(mikal): I know this looks odd
if not hasattr(local.strong_store, 'locks_held'):
local.strong_store.locks_held = []
local.strong_store.locks_held.append(name)
try:
if external and not CONF.disable_process_locking:
LOG.debug(_('Attempting to grab file lock "%(lock)s"'),
{'lock': name})
cleanup_dir = False
# We need a copy of lock_path because it is non-local
local_lock_path = lock_path
if not local_lock_path:
local_lock_path = CONF.lock_path
if not local_lock_path:
cleanup_dir = True
local_lock_path = tempfile.mkdtemp()
if not os.path.exists(local_lock_path):
fileutils.ensure_tree(local_lock_path)
def add_prefix(name, prefix):
if not prefix:
return name
sep = '' if prefix.endswith('-') else '-'
return '%s%s%s' % (prefix, sep, name)
# NOTE(mikal): the lock name cannot contain directory
# separators
lock_file_name = add_prefix(name.replace(os.sep, '_'),
lock_file_prefix)
lock_file_path = os.path.join(local_lock_path, lock_file_name)
try:
lock = InterProcessLock(lock_file_path)
with lock as lock:
LOG.debug(_('Got file lock "%(lock)s" at %(path)s'),
{'lock': name, 'path': lock_file_path})
yield lock
finally:
LOG.debug(_('Released file lock "%(lock)s" at %(path)s'),
{'lock': name, 'path': lock_file_path})
# NOTE(vish): This removes the tempdir if we needed
# to create one. This is used to
# cleanup the locks left behind by unit
# tests.
if cleanup_dir:
shutil.rmtree(local_lock_path)
else:
yield sem
finally:
local.strong_store.locks_held.remove(name)
def synchronized(name, lock_file_prefix=None, external=False, lock_path=None):
"""Synchronization decorator.
@ -157,105 +248,18 @@ def synchronized(name, lock_file_prefix=None, external=False, lock_path=None):
...
This way only one of either foo or bar can be executing at a time.
:param lock_file_prefix: The lock_file_prefix argument is used to provide
lock files on disk with a meaningful prefix.
:param external: The external keyword argument denotes whether this lock
should work across multiple processes. This means that if two different
workers both run a a method decorated with @synchronized('mylock',
external=True), only one of them will execute at a time.
:param lock_path: The lock_path keyword argument is used to specify a
special location for external lock files to live. If nothing is set, then
CONF.lock_path is used as a default.
"""
def wrap(f):
@functools.wraps(f)
def inner(*args, **kwargs):
# NOTE(soren): If we ever go natively threaded, this will be racy.
# See http://stackoverflow.com/questions/5390569/dyn
# amically-allocating-and-destroying-mutexes
sem = _semaphores.get(name, semaphore.Semaphore())
if name not in _semaphores:
# this check is not racy - we're already holding ref locally
# so GC won't remove the item and there was no IO switch
# (only valid in greenthreads)
_semaphores[name] = sem
with lock(name, lock_file_prefix, external, lock_path):
LOG.debug(_('Got semaphore / lock "%(function)s"'),
{'function': f.__name__})
return f(*args, **kwargs)
with sem:
LOG.debug(_('Got semaphore "%(lock)s" for method '
'"%(method)s"...'), {'lock': name,
'method': f.__name__})
# NOTE(mikal): I know this looks odd
if not hasattr(local.strong_store, 'locks_held'):
local.strong_store.locks_held = []
local.strong_store.locks_held.append(name)
try:
if external and not CONF.disable_process_locking:
LOG.debug(_('Attempting to grab file lock "%(lock)s" '
'for method "%(method)s"...'),
{'lock': name, 'method': f.__name__})
cleanup_dir = False
# We need a copy of lock_path because it is non-local
local_lock_path = lock_path
if not local_lock_path:
local_lock_path = CONF.lock_path
if not local_lock_path:
cleanup_dir = True
local_lock_path = tempfile.mkdtemp()
if not os.path.exists(local_lock_path):
fileutils.ensure_tree(local_lock_path)
def add_prefix(name, prefix):
if not prefix:
return name
sep = '' if prefix.endswith('-') else '-'
return '%s%s%s' % (prefix, sep, name)
# NOTE(mikal): the lock name cannot contain directory
# separators
lock_file_name = add_prefix(name.replace(os.sep, '_'),
lock_file_prefix)
lock_file_path = os.path.join(local_lock_path,
lock_file_name)
try:
lock = InterProcessLock(lock_file_path)
with lock:
LOG.debug(_('Got file lock "%(lock)s" at '
'%(path)s for method '
'"%(method)s"...'),
{'lock': name,
'path': lock_file_path,
'method': f.__name__})
retval = f(*args, **kwargs)
finally:
LOG.debug(_('Released file lock "%(lock)s" at '
'%(path)s for method "%(method)s"...'),
{'lock': name,
'path': lock_file_path,
'method': f.__name__})
# NOTE(vish): This removes the tempdir if we needed
# to create one. This is used to
# cleanup the locks left behind by unit
# tests.
if cleanup_dir:
shutil.rmtree(local_lock_path)
else:
retval = f(*args, **kwargs)
finally:
local.strong_store.locks_held.remove(name)
return retval
LOG.debug(_('Semaphore / lock released "%(function)s"'),
{'function': f.__name__})
return inner
return wrap

View File

@ -22,12 +22,14 @@ import tempfile
import eventlet
from eventlet import greenpool
from eventlet import greenthread
from eventlet import semaphore
from openstack.common import lockutils
from tests import utils
class TestFileLocks(utils.BaseTestCase):
def test_concurrent_green_lock_succeeds(self):
"""Verify spawn_n greenthreads with two locks run concurrently."""
tmpdir = tempfile.mkdtemp()
@ -63,6 +65,7 @@ class TestFileLocks(utils.BaseTestCase):
class LockTestCase(utils.BaseTestCase):
def test_synchronized_wrapped_function_metadata(self):
@lockutils.synchronized('whatever', 'test-')
def foo():
@ -74,16 +77,16 @@ class LockTestCase(utils.BaseTestCase):
self.assertEquals(foo.__name__, 'foo', "Wrapped function's name "
"got mangled")
def test_synchronized_internally(self):
def test_lock_internally(self):
"""We can lock across multiple green threads."""
saved_sem_num = len(lockutils._semaphores)
seen_threads = list()
@lockutils.synchronized('testlock2', 'test-', external=False)
def f(id):
for x in range(10):
seen_threads.append(id)
greenthread.sleep(0)
def f(_id):
with lockutils.lock('testlock2', 'test-', external=False):
for x in range(10):
seen_threads.append(_id)
greenthread.sleep(0)
threads = []
pool = greenpool.GreenPool(10)
@ -104,7 +107,7 @@ class LockTestCase(utils.BaseTestCase):
self.assertEqual(saved_sem_num, len(lockutils._semaphores),
"Semaphore leak detected")
def test_nested_external_works(self):
def test_nested_synchronized_external_works(self):
"""We can nest external syncs."""
tempdir = tempfile.mkdtemp()
try:
@ -125,34 +128,35 @@ class LockTestCase(utils.BaseTestCase):
if os.path.exists(tempdir):
shutil.rmtree(tempdir)
def _do_test_synchronized_externally(self):
def _do_test_lock_externally(self):
"""We can lock across multiple processes."""
@lockutils.synchronized('external', 'test-', external=True)
def lock_files(handles_dir):
# Open some files we can use for locking
handles = []
for n in range(50):
path = os.path.join(handles_dir, ('file-%s' % n))
handles.append(open(path, 'w'))
# Loop over all the handles and try locking the file
# without blocking, keep a count of how many files we
# were able to lock and then unlock. If the lock fails
# we get an IOError and bail out with bad exit code
count = 0
for handle in handles:
try:
fcntl.flock(handle, fcntl.LOCK_EX | fcntl.LOCK_NB)
count += 1
fcntl.flock(handle, fcntl.LOCK_UN)
except IOError:
os._exit(2)
finally:
handle.close()
with lockutils.lock('external', 'test-', external=True):
# Open some files we can use for locking
handles = []
for n in range(50):
path = os.path.join(handles_dir, ('file-%s' % n))
handles.append(open(path, 'w'))
# Check if we were able to open all files
self.assertEqual(50, count)
# Loop over all the handles and try locking the file
# without blocking, keep a count of how many files we
# were able to lock and then unlock. If the lock fails
# we get an IOError and bail out with bad exit code
count = 0
for handle in handles:
try:
fcntl.flock(handle, fcntl.LOCK_EX | fcntl.LOCK_NB)
count += 1
fcntl.flock(handle, fcntl.LOCK_UN)
except IOError:
os._exit(2)
finally:
handle.close()
# Check if we were able to open all files
self.assertEqual(50, count)
handles_dir = tempfile.mkdtemp()
try:
@ -175,23 +179,23 @@ class LockTestCase(utils.BaseTestCase):
if os.path.exists(handles_dir):
shutil.rmtree(handles_dir, ignore_errors=True)
def test_synchronized_externally(self):
def test_lock_externally(self):
lock_dir = tempfile.mkdtemp()
self.config(lock_path=lock_dir)
try:
self._do_test_synchronized_externally()
self._do_test_lock_externally()
finally:
if os.path.exists(lock_dir):
shutil.rmtree(lock_dir, ignore_errors=True)
def test_synchronized_externally_lock_dir_not_exist(self):
def test_lock_externally_lock_dir_not_exist(self):
lock_dir = tempfile.mkdtemp()
os.rmdir(lock_dir)
self.config(lock_path=lock_dir)
try:
self._do_test_synchronized_externally()
self._do_test_lock_externally()
finally:
if os.path.exists(lock_dir):
shutil.rmtree(lock_dir, ignore_errors=True)
@ -239,3 +243,56 @@ class LockTestCase(utils.BaseTestCase):
finally:
if os.path.exists(lock_dir):
shutil.rmtree(lock_dir, ignore_errors=True)
def test_contextlock(self):
lock_dir = tempfile.mkdtemp()
try:
# Note(flaper87): Lock is not external, which means
# a semaphore will be yielded
with lockutils.lock("test") as sem:
self.assertTrue(isinstance(sem, semaphore.Semaphore))
# NOTE(flaper87): Lock is external so an InterProcessLock
# will be yielded.
with lockutils.lock("test2", external=True,
lock_path=lock_dir):
path = os.path.join(lock_dir, "test2")
self.assertTrue(os.path.exists(path))
with lockutils.lock("test1",
external=True,
lock_path=lock_dir) as lock1:
self.assertTrue(isinstance(lock1,
lockutils.InterProcessLock))
finally:
if os.path.exists(lock_dir):
shutil.rmtree(lock_dir, ignore_errors=True)
def test_contextlock_unlocks(self):
lock_dir = tempfile.mkdtemp()
sem = None
try:
with lockutils.lock("test") as sem:
self.assertTrue(isinstance(sem, semaphore.Semaphore))
with lockutils.lock("test2", external=True,
lock_path=lock_dir):
path = os.path.join(lock_dir, "test2")
self.assertTrue(os.path.exists(path))
# NOTE(flaper87): Lock should be free
with lockutils.lock("test2", external=True,
lock_path=lock_dir):
path = os.path.join(lock_dir, "test2")
self.assertTrue(os.path.exists(path))
# NOTE(flaper87): Lock should be free
# but semaphore should already exist.
with lockutils.lock("test") as sem2:
self.assertEqual(sem, sem2)
finally:
if os.path.exists(lock_dir):
shutil.rmtree(lock_dir, ignore_errors=True)