Merge "Move synchronized body to a first-class function"

This commit is contained in:
Jenkins 2013-07-18 13:11:19 +00:00 committed by Gerrit Code Review
commit 0ae9ebb507
2 changed files with 188 additions and 127 deletions

View File

@ -16,6 +16,7 @@
# under the License.
import contextlib
import errno
import functools
import os
@ -135,6 +136,96 @@ else:
_semaphores = weakref.WeakValueDictionary()
@contextlib.contextmanager
def lock(name, lock_file_prefix=None, external=False, lock_path=None):
"""Context based lock
This function yields a `semaphore.Semaphore` instance unless external is
True, in which case, it'll yield an InterProcessLock instance.
:param lock_file_prefix: The lock_file_prefix argument is used to provide
lock files on disk with a meaningful prefix.
:param external: The external keyword argument denotes whether this lock
should work across multiple processes. This means that if two different
workers both run a a method decorated with @synchronized('mylock',
external=True), only one of them will execute at a time.
:param lock_path: The lock_path keyword argument is used to specify a
special location for external lock files to live. If nothing is set, then
CONF.lock_path is used as a default.
"""
# NOTE(soren): If we ever go natively threaded, this will be racy.
# See http://stackoverflow.com/questions/5390569/dyn
# amically-allocating-and-destroying-mutexes
sem = _semaphores.get(name, semaphore.Semaphore())
if name not in _semaphores:
# this check is not racy - we're already holding ref locally
# so GC won't remove the item and there was no IO switch
# (only valid in greenthreads)
_semaphores[name] = sem
with sem:
LOG.debug(_('Got semaphore "%(lock)s"'), {'lock': name})
# NOTE(mikal): I know this looks odd
if not hasattr(local.strong_store, 'locks_held'):
local.strong_store.locks_held = []
local.strong_store.locks_held.append(name)
try:
if external and not CONF.disable_process_locking:
LOG.debug(_('Attempting to grab file lock "%(lock)s"'),
{'lock': name})
cleanup_dir = False
# We need a copy of lock_path because it is non-local
local_lock_path = lock_path
if not local_lock_path:
local_lock_path = CONF.lock_path
if not local_lock_path:
cleanup_dir = True
local_lock_path = tempfile.mkdtemp()
if not os.path.exists(local_lock_path):
fileutils.ensure_tree(local_lock_path)
def add_prefix(name, prefix):
if not prefix:
return name
sep = '' if prefix.endswith('-') else '-'
return '%s%s%s' % (prefix, sep, name)
# NOTE(mikal): the lock name cannot contain directory
# separators
lock_file_name = add_prefix(name.replace(os.sep, '_'),
lock_file_prefix)
lock_file_path = os.path.join(local_lock_path, lock_file_name)
try:
lock = InterProcessLock(lock_file_path)
with lock as lock:
LOG.debug(_('Got file lock "%(lock)s" at %(path)s'),
{'lock': name, 'path': lock_file_path})
yield lock
finally:
LOG.debug(_('Released file lock "%(lock)s" at %(path)s'),
{'lock': name, 'path': lock_file_path})
# NOTE(vish): This removes the tempdir if we needed
# to create one. This is used to
# cleanup the locks left behind by unit
# tests.
if cleanup_dir:
shutil.rmtree(local_lock_path)
else:
yield sem
finally:
local.strong_store.locks_held.remove(name)
def synchronized(name, lock_file_prefix=None, external=False, lock_path=None):
"""Synchronization decorator.
@ -157,105 +248,18 @@ def synchronized(name, lock_file_prefix=None, external=False, lock_path=None):
...
This way only one of either foo or bar can be executing at a time.
:param lock_file_prefix: The lock_file_prefix argument is used to provide
lock files on disk with a meaningful prefix.
:param external: The external keyword argument denotes whether this lock
should work across multiple processes. This means that if two different
workers both run a a method decorated with @synchronized('mylock',
external=True), only one of them will execute at a time.
:param lock_path: The lock_path keyword argument is used to specify a
special location for external lock files to live. If nothing is set, then
CONF.lock_path is used as a default.
"""
def wrap(f):
@functools.wraps(f)
def inner(*args, **kwargs):
# NOTE(soren): If we ever go natively threaded, this will be racy.
# See http://stackoverflow.com/questions/5390569/dyn
# amically-allocating-and-destroying-mutexes
sem = _semaphores.get(name, semaphore.Semaphore())
if name not in _semaphores:
# this check is not racy - we're already holding ref locally
# so GC won't remove the item and there was no IO switch
# (only valid in greenthreads)
_semaphores[name] = sem
with lock(name, lock_file_prefix, external, lock_path):
LOG.debug(_('Got semaphore / lock "%(function)s"'),
{'function': f.__name__})
return f(*args, **kwargs)
with sem:
LOG.debug(_('Got semaphore "%(lock)s" for method '
'"%(method)s"...'), {'lock': name,
'method': f.__name__})
# NOTE(mikal): I know this looks odd
if not hasattr(local.strong_store, 'locks_held'):
local.strong_store.locks_held = []
local.strong_store.locks_held.append(name)
try:
if external and not CONF.disable_process_locking:
LOG.debug(_('Attempting to grab file lock "%(lock)s" '
'for method "%(method)s"...'),
{'lock': name, 'method': f.__name__})
cleanup_dir = False
# We need a copy of lock_path because it is non-local
local_lock_path = lock_path
if not local_lock_path:
local_lock_path = CONF.lock_path
if not local_lock_path:
cleanup_dir = True
local_lock_path = tempfile.mkdtemp()
if not os.path.exists(local_lock_path):
fileutils.ensure_tree(local_lock_path)
def add_prefix(name, prefix):
if not prefix:
return name
sep = '' if prefix.endswith('-') else '-'
return '%s%s%s' % (prefix, sep, name)
# NOTE(mikal): the lock name cannot contain directory
# separators
lock_file_name = add_prefix(name.replace(os.sep, '_'),
lock_file_prefix)
lock_file_path = os.path.join(local_lock_path,
lock_file_name)
try:
lock = InterProcessLock(lock_file_path)
with lock:
LOG.debug(_('Got file lock "%(lock)s" at '
'%(path)s for method '
'"%(method)s"...'),
{'lock': name,
'path': lock_file_path,
'method': f.__name__})
retval = f(*args, **kwargs)
finally:
LOG.debug(_('Released file lock "%(lock)s" at '
'%(path)s for method "%(method)s"...'),
{'lock': name,
'path': lock_file_path,
'method': f.__name__})
# NOTE(vish): This removes the tempdir if we needed
# to create one. This is used to
# cleanup the locks left behind by unit
# tests.
if cleanup_dir:
shutil.rmtree(local_lock_path)
else:
retval = f(*args, **kwargs)
finally:
local.strong_store.locks_held.remove(name)
return retval
LOG.debug(_('Semaphore / lock released "%(function)s"'),
{'function': f.__name__})
return inner
return wrap

View File

@ -22,12 +22,14 @@ import tempfile
import eventlet
from eventlet import greenpool
from eventlet import greenthread
from eventlet import semaphore
from openstack.common import lockutils
from tests import utils
class TestFileLocks(utils.BaseTestCase):
def test_concurrent_green_lock_succeeds(self):
"""Verify spawn_n greenthreads with two locks run concurrently."""
tmpdir = tempfile.mkdtemp()
@ -63,6 +65,7 @@ class TestFileLocks(utils.BaseTestCase):
class LockTestCase(utils.BaseTestCase):
def test_synchronized_wrapped_function_metadata(self):
@lockutils.synchronized('whatever', 'test-')
def foo():
@ -74,16 +77,16 @@ class LockTestCase(utils.BaseTestCase):
self.assertEquals(foo.__name__, 'foo', "Wrapped function's name "
"got mangled")
def test_synchronized_internally(self):
def test_lock_internally(self):
"""We can lock across multiple green threads."""
saved_sem_num = len(lockutils._semaphores)
seen_threads = list()
@lockutils.synchronized('testlock2', 'test-', external=False)
def f(id):
for x in range(10):
seen_threads.append(id)
greenthread.sleep(0)
def f(_id):
with lockutils.lock('testlock2', 'test-', external=False):
for x in range(10):
seen_threads.append(_id)
greenthread.sleep(0)
threads = []
pool = greenpool.GreenPool(10)
@ -104,7 +107,7 @@ class LockTestCase(utils.BaseTestCase):
self.assertEqual(saved_sem_num, len(lockutils._semaphores),
"Semaphore leak detected")
def test_nested_external_works(self):
def test_nested_synchronized_external_works(self):
"""We can nest external syncs."""
tempdir = tempfile.mkdtemp()
try:
@ -125,34 +128,35 @@ class LockTestCase(utils.BaseTestCase):
if os.path.exists(tempdir):
shutil.rmtree(tempdir)
def _do_test_synchronized_externally(self):
def _do_test_lock_externally(self):
"""We can lock across multiple processes."""
@lockutils.synchronized('external', 'test-', external=True)
def lock_files(handles_dir):
# Open some files we can use for locking
handles = []
for n in range(50):
path = os.path.join(handles_dir, ('file-%s' % n))
handles.append(open(path, 'w'))
# Loop over all the handles and try locking the file
# without blocking, keep a count of how many files we
# were able to lock and then unlock. If the lock fails
# we get an IOError and bail out with bad exit code
count = 0
for handle in handles:
try:
fcntl.flock(handle, fcntl.LOCK_EX | fcntl.LOCK_NB)
count += 1
fcntl.flock(handle, fcntl.LOCK_UN)
except IOError:
os._exit(2)
finally:
handle.close()
with lockutils.lock('external', 'test-', external=True):
# Open some files we can use for locking
handles = []
for n in range(50):
path = os.path.join(handles_dir, ('file-%s' % n))
handles.append(open(path, 'w'))
# Check if we were able to open all files
self.assertEqual(50, count)
# Loop over all the handles and try locking the file
# without blocking, keep a count of how many files we
# were able to lock and then unlock. If the lock fails
# we get an IOError and bail out with bad exit code
count = 0
for handle in handles:
try:
fcntl.flock(handle, fcntl.LOCK_EX | fcntl.LOCK_NB)
count += 1
fcntl.flock(handle, fcntl.LOCK_UN)
except IOError:
os._exit(2)
finally:
handle.close()
# Check if we were able to open all files
self.assertEqual(50, count)
handles_dir = tempfile.mkdtemp()
try:
@ -175,23 +179,23 @@ class LockTestCase(utils.BaseTestCase):
if os.path.exists(handles_dir):
shutil.rmtree(handles_dir, ignore_errors=True)
def test_synchronized_externally(self):
def test_lock_externally(self):
lock_dir = tempfile.mkdtemp()
self.config(lock_path=lock_dir)
try:
self._do_test_synchronized_externally()
self._do_test_lock_externally()
finally:
if os.path.exists(lock_dir):
shutil.rmtree(lock_dir, ignore_errors=True)
def test_synchronized_externally_lock_dir_not_exist(self):
def test_lock_externally_lock_dir_not_exist(self):
lock_dir = tempfile.mkdtemp()
os.rmdir(lock_dir)
self.config(lock_path=lock_dir)
try:
self._do_test_synchronized_externally()
self._do_test_lock_externally()
finally:
if os.path.exists(lock_dir):
shutil.rmtree(lock_dir, ignore_errors=True)
@ -239,3 +243,56 @@ class LockTestCase(utils.BaseTestCase):
finally:
if os.path.exists(lock_dir):
shutil.rmtree(lock_dir, ignore_errors=True)
def test_contextlock(self):
lock_dir = tempfile.mkdtemp()
try:
# Note(flaper87): Lock is not external, which means
# a semaphore will be yielded
with lockutils.lock("test") as sem:
self.assertTrue(isinstance(sem, semaphore.Semaphore))
# NOTE(flaper87): Lock is external so an InterProcessLock
# will be yielded.
with lockutils.lock("test2", external=True,
lock_path=lock_dir):
path = os.path.join(lock_dir, "test2")
self.assertTrue(os.path.exists(path))
with lockutils.lock("test1",
external=True,
lock_path=lock_dir) as lock1:
self.assertTrue(isinstance(lock1,
lockutils.InterProcessLock))
finally:
if os.path.exists(lock_dir):
shutil.rmtree(lock_dir, ignore_errors=True)
def test_contextlock_unlocks(self):
lock_dir = tempfile.mkdtemp()
sem = None
try:
with lockutils.lock("test") as sem:
self.assertTrue(isinstance(sem, semaphore.Semaphore))
with lockutils.lock("test2", external=True,
lock_path=lock_dir):
path = os.path.join(lock_dir, "test2")
self.assertTrue(os.path.exists(path))
# NOTE(flaper87): Lock should be free
with lockutils.lock("test2", external=True,
lock_path=lock_dir):
path = os.path.join(lock_dir, "test2")
self.assertTrue(os.path.exists(path))
# NOTE(flaper87): Lock should be free
# but semaphore should already exist.
with lockutils.lock("test") as sem2:
self.assertEqual(sem, sem2)
finally:
if os.path.exists(lock_dir):
shutil.rmtree(lock_dir, ignore_errors=True)