Update openstack/common/lockutils

The following commits are in this update:

79e6bc6 fix lockutils.lock() to make it thread-safe
ace5120 Add main() to lockutils that creates temp dir for locks
537d8e2 Allow lockutils to get lock_path conf from envvar
d498c42 Fix to properly log when we release a semaphore
29d387c Add LockFixture to lockutils
3e3ac0c Modify lockutils.py due to dispose of eventlet
90b6a65 Fix locking bug
27d4b41 Move synchronized body to a first-class function
15c17fb Make lock_file_prefix optional
1a2df89 Enable H302 hacking check
b41862d Use param keyword for docstrings
b873454 Added convenience APIs for lockutils
120ddef Improve Python 3.x compatibility
88d316f Locking edge case when lock_path does not exist
21925b6 Fix locking issues in Windows
547ab34 Fix Copyright Headers - Rename LLC to Foundation

Change-Id: Ie9b99ed80d603bdc7459ff7e25e3fba919429216
This commit is contained in:
Michael Still 2013-11-16 13:41:49 +11:00
parent 0b6136cdbc
commit 0f78733c2a
1 changed files with 155 additions and 83 deletions

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -16,19 +16,23 @@
# under the License.
import contextlib
import errno
import functools
import os
import shutil
import subprocess
import sys
import tempfile
import threading
import time
import weakref
from eventlet import semaphore
from oslo.config import cfg
from trove.openstack.common import fileutils
from trove.openstack.common.gettextutils import _
from trove.openstack.common.gettextutils import _ # noqa
from trove.openstack.common import local
from trove.openstack.common import log as logging
@ -39,9 +43,8 @@ util_opts = [
cfg.BoolOpt('disable_process_locking', default=False,
help='Whether to disable inter-process locks'),
cfg.StrOpt('lock_path',
default=os.path.abspath(os.path.join(os.path.dirname(__file__),
'../')),
help='Directory to use for lock files')
default=os.environ.get("TROVE_LOCK_PATH"),
help=('Directory to use for lock files.'))
]
@ -49,6 +52,10 @@ CONF = cfg.CONF
CONF.register_opts(util_opts)
def set_defaults(lock_path):
cfg.set_defaults(util_opts, lock_path=lock_path)
class _InterProcessLock(object):
"""Lock implementation which allows multiple locks, working around
issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does
@ -82,7 +89,7 @@ class _InterProcessLock(object):
# to have a laughable 10 attempts "blocking" mechanism.
self.trylock()
return self
except IOError, e:
except IOError as e:
if e.errno in (errno.EACCES, errno.EAGAIN):
# external locks synchronise things like iptables
# updates - give it some time to prevent busy spinning
@ -107,10 +114,10 @@ class _InterProcessLock(object):
class _WindowsLock(_InterProcessLock):
def trylock(self):
msvcrt.locking(self.lockfile, msvcrt.LK_NBLCK, 1)
msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_NBLCK, 1)
def unlock(self):
msvcrt.locking(self.lockfile, msvcrt.LK_UNLCK, 1)
msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_UNLCK, 1)
class _PosixLock(_InterProcessLock):
@ -129,9 +136,88 @@ else:
InterProcessLock = _PosixLock
_semaphores = weakref.WeakValueDictionary()
_semaphores_lock = threading.Lock()
def synchronized(name, lock_file_prefix, external=False, lock_path=None):
@contextlib.contextmanager
def lock(name, lock_file_prefix=None, external=False, lock_path=None):
"""Context based lock
This function yields a `threading.Semaphore` instance (if we don't use
eventlet.monkey_patch(), else `semaphore.Semaphore`) unless external is
True, in which case, it'll yield an InterProcessLock instance.
:param lock_file_prefix: The lock_file_prefix argument is used to provide
lock files on disk with a meaningful prefix.
:param external: The external keyword argument denotes whether this lock
should work across multiple processes. This means that if two different
workers both run a a method decorated with @synchronized('mylock',
external=True), only one of them will execute at a time.
:param lock_path: The lock_path keyword argument is used to specify a
special location for external lock files to live. If nothing is set, then
CONF.lock_path is used as a default.
"""
with _semaphores_lock:
try:
sem = _semaphores[name]
except KeyError:
sem = threading.Semaphore()
_semaphores[name] = sem
with sem:
LOG.debug(_('Got semaphore "%(lock)s"'), {'lock': name})
# NOTE(mikal): I know this looks odd
if not hasattr(local.strong_store, 'locks_held'):
local.strong_store.locks_held = []
local.strong_store.locks_held.append(name)
try:
if external and not CONF.disable_process_locking:
LOG.debug(_('Attempting to grab file lock "%(lock)s"'),
{'lock': name})
# We need a copy of lock_path because it is non-local
local_lock_path = lock_path or CONF.lock_path
if not local_lock_path:
raise cfg.RequiredOptError('lock_path')
if not os.path.exists(local_lock_path):
fileutils.ensure_tree(local_lock_path)
LOG.info(_('Created lock path: %s'), local_lock_path)
def add_prefix(name, prefix):
if not prefix:
return name
sep = '' if prefix.endswith('-') else '-'
return '%s%s%s' % (prefix, sep, name)
# NOTE(mikal): the lock name cannot contain directory
# separators
lock_file_name = add_prefix(name.replace(os.sep, '_'),
lock_file_prefix)
lock_file_path = os.path.join(local_lock_path, lock_file_name)
try:
lock = InterProcessLock(lock_file_path)
with lock as lock:
LOG.debug(_('Got file lock "%(lock)s" at %(path)s'),
{'lock': name, 'path': lock_file_path})
yield lock
finally:
LOG.debug(_('Released file lock "%(lock)s" at %(path)s'),
{'lock': name, 'path': lock_file_path})
else:
yield sem
finally:
local.strong_store.locks_held.remove(name)
def synchronized(name, lock_file_prefix=None, external=False, lock_path=None):
"""Synchronization decorator.
Decorating a method like so::
@ -140,7 +226,7 @@ def synchronized(name, lock_file_prefix, external=False, lock_path=None):
def foo(self, *args):
...
ensures that only one thread will execute the bar method at a time.
ensures that only one thread will execute the foo method at a time.
Different methods can share the same lock::
@ -153,81 +239,67 @@ def synchronized(name, lock_file_prefix, external=False, lock_path=None):
...
This way only one of either foo or bar can be executing at a time.
The lock_file_prefix argument is used to provide lock files on disk with a
meaningful prefix. The prefix should end with a hyphen ('-') if specified.
The external keyword argument denotes whether this lock should work across
multiple processes. This means that if two different workers both run a
a method decorated with @synchronized('mylock', external=True), only one
of them will execute at a time.
The lock_path keyword argument is used to specify a special location for
external lock files to live. If nothing is set, then CONF.lock_path is
used as a default.
"""
def wrap(f):
@functools.wraps(f)
def inner(*args, **kwargs):
# NOTE(soren): If we ever go natively threaded, this will be racy.
# See http://stackoverflow.com/questions/5390569/dyn
# amically-allocating-and-destroying-mutexes
sem = _semaphores.get(name, semaphore.Semaphore())
if name not in _semaphores:
# this check is not racy - we're already holding ref locally
# so GC won't remove the item and there was no IO switch
# (only valid in greenthreads)
_semaphores[name] = sem
with sem:
LOG.debug(_('Got semaphore "%(lock)s" for method '
'"%(method)s"...'), {'lock': name,
'method': f.__name__})
if external and not CONF.disable_process_locking:
LOG.debug(_('Attempting to grab file lock "%(lock)s" for '
'method "%(method)s"...'),
{'lock': name, 'method': f.__name__})
cleanup_dir = False
# We need a copy of lock_path because it is non-local
local_lock_path = lock_path
if not local_lock_path:
local_lock_path = CONF.lock_path
if not local_lock_path:
cleanup_dir = True
local_lock_path = tempfile.mkdtemp()
if not os.path.exists(local_lock_path):
cleanup_dir = True
fileutils.ensure_tree(local_lock_path)
# NOTE(mikal): the lock name cannot contain directory
# separators
safe_name = name.replace(os.sep, '_')
lock_file_name = '%s%s' % (lock_file_prefix, safe_name)
lock_file_path = os.path.join(local_lock_path,
lock_file_name)
try:
lock = InterProcessLock(lock_file_path)
with lock:
LOG.debug(_('Got file lock "%(lock)s" at %(path)s '
'for method "%(method)s"...'),
{'lock': name,
'path': lock_file_path,
'method': f.__name__})
retval = f(*args, **kwargs)
finally:
# NOTE(vish): This removes the tempdir if we needed
# to create one. This is used to cleanup
# the locks left behind by unit tests.
if cleanup_dir:
shutil.rmtree(local_lock_path)
else:
retval = f(*args, **kwargs)
return retval
try:
with lock(name, lock_file_prefix, external, lock_path):
LOG.debug(_('Got semaphore / lock "%(function)s"'),
{'function': f.__name__})
return f(*args, **kwargs)
finally:
LOG.debug(_('Semaphore / lock released "%(function)s"'),
{'function': f.__name__})
return inner
return wrap
def synchronized_with_prefix(lock_file_prefix):
"""Partial object generator for the synchronization decorator.
Redefine @synchronized in each project like so::
(in nova/utils.py)
from nova.openstack.common import lockutils
synchronized = lockutils.synchronized_with_prefix('nova-')
(in nova/foo.py)
from nova import utils
@utils.synchronized('mylock')
def bar(self, *args):
...
The lock_file_prefix argument is used to provide lock files on disk with a
meaningful prefix.
"""
return functools.partial(synchronized, lock_file_prefix=lock_file_prefix)
def main(argv):
"""Create a dir for locks and pass it to command from arguments
If you run this:
python -m openstack.common.lockutils python setup.py testr <etc>
a temporary directory will be created for all your locks and passed to all
your tests in an environment variable. The temporary dir will be deleted
afterwards and the return value will be preserved.
"""
lock_dir = tempfile.mkdtemp()
os.environ["TROVE_LOCK_PATH"] = lock_dir
try:
ret_val = subprocess.call(argv[1:])
finally:
shutil.rmtree(lock_dir, ignore_errors=True)
return ret_val
if __name__ == '__main__':
sys.exit(main(sys.argv))