Merge "Sync oslo lockutils to nova"

This commit is contained in:
Jenkins 2014-06-30 04:39:19 +00:00 committed by Gerrit Code Review
commit 4e35f5f2ee
1 changed files with 155 additions and 80 deletions

View File

@ -13,9 +13,9 @@
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import errno
import fcntl
import functools
import os
import shutil
@ -29,8 +29,7 @@ import weakref
from oslo.config import cfg
from nova.openstack.common import fileutils
from nova.openstack.common.gettextutils import _ # noqa
from nova.openstack.common import local
from nova.openstack.common.gettextutils import _, _LE, _LI
from nova.openstack.common import log as logging
@ -39,10 +38,10 @@ LOG = logging.getLogger(__name__)
util_opts = [
cfg.BoolOpt('disable_process_locking', default=False,
help='Whether to disable inter-process locks'),
help='Enables or disables inter-process locks.'),
cfg.StrOpt('lock_path',
default=os.environ.get("NOVA_LOCK_PATH"),
help=('Directory to use for lock files.'))
help='Directory to use for lock files.')
]
@ -54,7 +53,7 @@ def set_defaults(lock_path):
cfg.set_defaults(util_opts, lock_path=lock_path)
class _InterProcessLock(object):
class _FileLock(object):
"""Lock implementation which allows multiple locks, working around
issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does
not require any cleanup. Since the lock is always held on a file
@ -76,7 +75,13 @@ class _InterProcessLock(object):
self.lockfile = None
self.fname = name
def __enter__(self):
def acquire(self):
basedir = os.path.dirname(self.fname)
if not os.path.exists(basedir):
fileutils.ensure_tree(basedir)
LOG.info(_LI('Created lock path: %s'), basedir)
self.lockfile = open(self.fname, 'w')
while True:
@ -86,23 +91,41 @@ class _InterProcessLock(object):
# Also upon reading the MSDN docs for locking(), it seems
# to have a laughable 10 attempts "blocking" mechanism.
self.trylock()
return self
LOG.debug('Got file lock "%s"', self.fname)
return True
except IOError as e:
if e.errno in (errno.EACCES, errno.EAGAIN):
# external locks synchronise things like iptables
# updates - give it some time to prevent busy spinning
time.sleep(0.01)
else:
raise
raise threading.ThreadError(_("Unable to acquire lock on"
" `%(filename)s` due to"
" %(exception)s") %
{
'filename': self.fname,
'exception': e,
})
def __exit__(self, exc_type, exc_val, exc_tb):
def __enter__(self):
self.acquire()
return self
def release(self):
try:
self.unlock()
self.lockfile.close()
LOG.debug('Released file lock "%s"', self.fname)
except IOError:
LOG.exception(_("Could not release the acquired lock `%s`"),
LOG.exception(_LE("Could not release the acquired lock `%s`"),
self.fname)
def __exit__(self, exc_type, exc_val, exc_tb):
self.release()
def exists(self):
return os.path.exists(self.fname)
def trylock(self):
raise NotImplementedError()
@ -110,7 +133,7 @@ class _InterProcessLock(object):
raise NotImplementedError()
class _WindowsLock(_InterProcessLock):
class _WindowsLock(_FileLock):
def trylock(self):
msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_NBLCK, 1)
@ -118,7 +141,7 @@ class _WindowsLock(_InterProcessLock):
msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_UNLCK, 1)
class _PosixLock(_InterProcessLock):
class _FcntlLock(_FileLock):
def trylock(self):
fcntl.lockf(self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
@ -126,17 +149,120 @@ class _PosixLock(_InterProcessLock):
fcntl.lockf(self.lockfile, fcntl.LOCK_UN)
class _PosixLock(object):
def __init__(self, name):
# Hash the name because it's not valid to have POSIX semaphore
# names with things like / in them. Then use base64 to encode
# the digest() instead taking the hexdigest() because the
# result is shorter and most systems can't have shm sempahore
# names longer than 31 characters.
h = hashlib.sha1()
h.update(name.encode('ascii'))
self.name = str((b'/' + base64.urlsafe_b64encode(
h.digest())).decode('ascii'))
def acquire(self, timeout=None):
self.semaphore = posix_ipc.Semaphore(self.name,
flags=posix_ipc.O_CREAT,
initial_value=1)
self.semaphore.acquire(timeout)
return self
def __enter__(self):
self.acquire()
return self
def release(self):
self.semaphore.release()
self.semaphore.close()
def __exit__(self, exc_type, exc_val, exc_tb):
self.release()
def exists(self):
try:
semaphore = posix_ipc.Semaphore(self.name)
except posix_ipc.ExistentialError:
return False
else:
semaphore.close()
return True
if os.name == 'nt':
import msvcrt
InterProcessLock = _WindowsLock
FileLock = _WindowsLock
else:
import fcntl
import base64
import hashlib
import posix_ipc
InterProcessLock = _PosixLock
FileLock = _FcntlLock
_semaphores = weakref.WeakValueDictionary()
_semaphores_lock = threading.Lock()
def _get_lock_path(name, lock_file_prefix, lock_path=None):
# NOTE(mikal): the lock name cannot contain directory
# separators
name = name.replace(os.sep, '_')
if lock_file_prefix:
sep = '' if lock_file_prefix.endswith('-') else '-'
name = '%s%s%s' % (lock_file_prefix, sep, name)
local_lock_path = lock_path or CONF.lock_path
if not local_lock_path:
# NOTE(bnemec): Create a fake lock path for posix locks so we don't
# unnecessarily raise the RequiredOptError below.
if InterProcessLock is not _PosixLock:
raise cfg.RequiredOptError('lock_path')
local_lock_path = 'posixlock:/'
return os.path.join(local_lock_path, name)
def external_lock(name, lock_file_prefix=None, lock_path=None):
LOG.debug('Attempting to grab external lock "%(lock)s"',
{'lock': name})
lock_file_path = _get_lock_path(name, lock_file_prefix, lock_path)
# NOTE(bnemec): If an explicit lock_path was passed to us then it
# means the caller is relying on file-based locking behavior, so
# we can't use posix locks for those calls.
if lock_path:
return FileLock(lock_file_path)
return InterProcessLock(lock_file_path)
def remove_external_lock_file(name, lock_file_prefix=None):
"""Remove a external lock file when it's not used anymore
This will be helpful when we have a lot of lock files
"""
with internal_lock(name):
lock_file_path = _get_lock_path(name, lock_file_prefix)
try:
os.remove(lock_file_path)
except OSError:
LOG.info(_LI('Failed to remove file %(file)s'),
{'file': lock_file_path})
def internal_lock(name):
with _semaphores_lock:
try:
sem = _semaphores[name]
except KeyError:
sem = threading.Semaphore()
_semaphores[name] = sem
LOG.debug('Got semaphore "%(lock)s"', {'lock': name})
return sem
@contextlib.contextmanager
def lock(name, lock_file_prefix=None, external=False, lock_path=None):
"""Context based lock
@ -146,73 +272,22 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None):
True, in which case, it'll yield an InterProcessLock instance.
:param lock_file_prefix: The lock_file_prefix argument is used to provide
lock files on disk with a meaningful prefix.
lock files on disk with a meaningful prefix.
:param external: The external keyword argument denotes whether this lock
should work across multiple processes. This means that if two different
workers both run a a method decorated with @synchronized('mylock',
external=True), only one of them will execute at a time.
:param lock_path: The lock_path keyword argument is used to specify a
special location for external lock files to live. If nothing is set, then
CONF.lock_path is used as a default.
should work across multiple processes. This means that if two different
workers both run a method decorated with @synchronized('mylock',
external=True), only one of them will execute at a time.
"""
with _semaphores_lock:
try:
sem = _semaphores[name]
except KeyError:
sem = threading.Semaphore()
_semaphores[name] = sem
with sem:
LOG.debug(_('Got semaphore "%(lock)s"'), {'lock': name})
# NOTE(mikal): I know this looks odd
if not hasattr(local.strong_store, 'locks_held'):
local.strong_store.locks_held = []
local.strong_store.locks_held.append(name)
try:
if external and not CONF.disable_process_locking:
LOG.debug(_('Attempting to grab file lock "%(lock)s"'),
{'lock': name})
# We need a copy of lock_path because it is non-local
local_lock_path = lock_path or CONF.lock_path
if not local_lock_path:
raise cfg.RequiredOptError('lock_path')
if not os.path.exists(local_lock_path):
fileutils.ensure_tree(local_lock_path)
LOG.info(_('Created lock path: %s'), local_lock_path)
def add_prefix(name, prefix):
if not prefix:
return name
sep = '' if prefix.endswith('-') else '-'
return '%s%s%s' % (prefix, sep, name)
# NOTE(mikal): the lock name cannot contain directory
# separators
lock_file_name = add_prefix(name.replace(os.sep, '_'),
lock_file_prefix)
lock_file_path = os.path.join(local_lock_path, lock_file_name)
try:
lock = InterProcessLock(lock_file_path)
with lock as lock:
LOG.debug(_('Got file lock "%(lock)s" at %(path)s'),
{'lock': name, 'path': lock_file_path})
yield lock
finally:
LOG.debug(_('Released file lock "%(lock)s" at %(path)s'),
{'lock': name, 'path': lock_file_path})
else:
yield sem
finally:
local.strong_store.locks_held.remove(name)
int_lock = internal_lock(name)
with int_lock:
if external and not CONF.disable_process_locking:
ext_lock = external_lock(name, lock_file_prefix, lock_path)
with ext_lock:
yield ext_lock
else:
yield int_lock
LOG.debug('Released semaphore "%(lock)s"', {'lock': name})
def synchronized(name, lock_file_prefix=None, external=False, lock_path=None):
@ -244,11 +319,11 @@ def synchronized(name, lock_file_prefix=None, external=False, lock_path=None):
def inner(*args, **kwargs):
try:
with lock(name, lock_file_prefix, external, lock_path):
LOG.debug(_('Got semaphore / lock "%(function)s"'),
LOG.debug('Got semaphore / lock "%(function)s"',
{'function': f.__name__})
return f(*args, **kwargs)
finally:
LOG.debug(_('Semaphore / lock released "%(function)s"'),
LOG.debug('Semaphore / lock released "%(function)s"',
{'function': f.__name__})
return inner
return wrap