Move nova's util.synchronized decorator to openstack common.
In the end I needed to port utils.ensure_tree as well. Resolves bug 1063230. Change-Id: I6e6fa8201de2cac3f17e6c60d7b16f7df7c64116
This commit is contained in:
commit
3aabd9f0ae
|
@ -0,0 +1,232 @@
|
|||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 OpenStack LLC.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
import errno
|
||||
import functools
|
||||
import os
|
||||
import shutil
|
||||
import time
|
||||
import weakref
|
||||
|
||||
from eventlet import greenthread
|
||||
from eventlet import semaphore
|
||||
|
||||
from openstack.common import cfg
|
||||
from openstack.common import fileutils
|
||||
from openstack.common import log as logging
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
util_opts = [
|
||||
cfg.BoolOpt('disable_process_locking', default=False,
|
||||
help='Whether to disable inter-process locks'),
|
||||
cfg.StrOpt('lock_path',
|
||||
default=os.path.abspath(os.path.join(os.path.dirname(__file__),
|
||||
'../')),
|
||||
help='Directory to use for lock files')
|
||||
]
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.register_opts(util_opts)
|
||||
|
||||
|
||||
class _InterProcessLock(object):
|
||||
"""Lock implementation which allows multiple locks, working around
|
||||
issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does
|
||||
not require any cleanup. Since the lock is always held on a file
|
||||
descriptor rather than outside of the process, the lock gets dropped
|
||||
automatically if the process crashes, even if __exit__ is not executed.
|
||||
|
||||
There are no guarantees regarding usage by multiple green threads in a
|
||||
single process here. This lock works only between processes. Exclusive
|
||||
access between local threads should be achieved using the semaphores
|
||||
in the @synchronized decorator.
|
||||
|
||||
Note these locks are released when the descriptor is closed, so it's not
|
||||
safe to close the file descriptor while another green thread holds the
|
||||
lock. Just opening and closing the lock file can break synchronisation,
|
||||
so lock files must be accessed only using this abstraction.
|
||||
"""
|
||||
|
||||
def __init__(self, name):
|
||||
self.lockfile = None
|
||||
self.fname = name
|
||||
|
||||
def __enter__(self):
|
||||
self.lockfile = open(self.fname, 'w')
|
||||
|
||||
while True:
|
||||
try:
|
||||
# Using non-blocking locks since green threads are not
|
||||
# patched to deal with blocking locking calls.
|
||||
# Also upon reading the MSDN docs for locking(), it seems
|
||||
# to have a laughable 10 attempts "blocking" mechanism.
|
||||
self.trylock()
|
||||
return self
|
||||
except IOError, e:
|
||||
if e.errno in (errno.EACCES, errno.EAGAIN):
|
||||
# external locks synchronise things like iptables
|
||||
# updates - give it some time to prevent busy spinning
|
||||
time.sleep(0.01)
|
||||
else:
|
||||
raise
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
try:
|
||||
self.unlock()
|
||||
self.lockfile.close()
|
||||
except IOError:
|
||||
LOG.exception(_("Could not release the acquired lock `%s`"),
|
||||
self.fname)
|
||||
|
||||
def trylock(self):
|
||||
raise NotImplementedError()
|
||||
|
||||
def unlock(self):
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
class _WindowsLock(_InterProcessLock):
|
||||
def trylock(self):
|
||||
msvcrt.locking(self.lockfile, msvcrt.LK_NBLCK, 1)
|
||||
|
||||
def unlock(self):
|
||||
msvcrt.locking(self.lockfile, msvcrt.LK_UNLCK, 1)
|
||||
|
||||
|
||||
class _PosixLock(_InterProcessLock):
|
||||
def trylock(self):
|
||||
fcntl.lockf(self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||
|
||||
def unlock(self):
|
||||
fcntl.lockf(self.lockfile, fcntl.LOCK_UN)
|
||||
|
||||
|
||||
if os.name == 'nt':
|
||||
import msvcrt
|
||||
InterProcessLock = _WindowsLock
|
||||
else:
|
||||
import fcntl
|
||||
InterProcessLock = _PosixLock
|
||||
|
||||
_semaphores = weakref.WeakValueDictionary()
|
||||
|
||||
|
||||
def synchronized(name, lock_file_prefix, external=False, lock_path=None):
|
||||
"""Synchronization decorator.
|
||||
|
||||
Decorating a method like so::
|
||||
|
||||
@synchronized('mylock')
|
||||
def foo(self, *args):
|
||||
...
|
||||
|
||||
ensures that only one thread will execute the bar method at a time.
|
||||
|
||||
Different methods can share the same lock::
|
||||
|
||||
@synchronized('mylock')
|
||||
def foo(self, *args):
|
||||
...
|
||||
|
||||
@synchronized('mylock')
|
||||
def bar(self, *args):
|
||||
...
|
||||
|
||||
This way only one of either foo or bar can be executing at a time.
|
||||
|
||||
The lock_file_prefix argument is used to provide lock files on disk with a
|
||||
meaningful prefix. The prefix should end with a hyphen ('-') if specified.
|
||||
|
||||
The external keyword argument denotes whether this lock should work across
|
||||
multiple processes. This means that if two different workers both run a
|
||||
a method decorated with @synchronized('mylock', external=True), only one
|
||||
of them will execute at a time.
|
||||
|
||||
The lock_path keyword argument is used to specify a special location for
|
||||
external lock files to live. If nothing is set, then CONF.lock_path is
|
||||
used as a default.
|
||||
"""
|
||||
|
||||
def wrap(f):
|
||||
@functools.wraps(f)
|
||||
def inner(*args, **kwargs):
|
||||
# NOTE(soren): If we ever go natively threaded, this will be racy.
|
||||
# See http://stackoverflow.com/questions/5390569/dyn
|
||||
# amically-allocating-and-destroying-mutexes
|
||||
sem = _semaphores.get(name, semaphore.Semaphore())
|
||||
if name not in _semaphores:
|
||||
# this check is not racy - we're already holding ref locally
|
||||
# so GC won't remove the item and there was no IO switch
|
||||
# (only valid in greenthreads)
|
||||
_semaphores[name] = sem
|
||||
|
||||
with sem:
|
||||
LOG.debug(_('Got semaphore "%(lock)s" for method '
|
||||
'"%(method)s"...'), {'lock': name,
|
||||
'method': f.__name__})
|
||||
if external and not CONF.disable_process_locking:
|
||||
LOG.debug(_('Attempting to grab file lock "%(lock)s" for '
|
||||
'method "%(method)s"...'),
|
||||
{'lock': name, 'method': f.__name__})
|
||||
cleanup_dir = False
|
||||
|
||||
# We need a copy of lock_path because it is non-local
|
||||
local_lock_path = lock_path
|
||||
if not local_lock_path:
|
||||
local_lock_path = CONF.lock_path
|
||||
|
||||
if not local_lock_path:
|
||||
cleanup_dir = True
|
||||
local_lock_path = tempfile.mkdtemp()
|
||||
|
||||
if not os.path.exists(local_lock_path):
|
||||
cleanup_dir = True
|
||||
fileutils.ensure_tree(local_lock_path)
|
||||
|
||||
# NOTE(mikal): the lock name cannot contain directory
|
||||
# separators
|
||||
safe_name = name.replace(os.sep, '_')
|
||||
lock_file_name = '%s%s' % (lock_file_prefix, safe_name)
|
||||
lock_file_path = os.path.join(local_lock_path,
|
||||
lock_file_name)
|
||||
|
||||
try:
|
||||
lock = InterProcessLock(lock_file_path)
|
||||
with lock:
|
||||
LOG.debug(_('Got file lock "%(lock)s" at %(path)s '
|
||||
'for method "%(method)s"...'),
|
||||
{'lock': name,
|
||||
'path': lock_file_path,
|
||||
'method': f.__name__})
|
||||
retval = f(*args, **kwargs)
|
||||
finally:
|
||||
# NOTE(vish): This removes the tempdir if we needed
|
||||
# to create one. This is used to cleanup
|
||||
# the locks left behind by unit tests.
|
||||
if cleanup_dir:
|
||||
shutil.rmtree(local_lock_path)
|
||||
else:
|
||||
retval = f(*args, **kwargs)
|
||||
|
||||
return retval
|
||||
return inner
|
||||
return wrap
|
|
@ -0,0 +1,165 @@
|
|||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 Justin Santa Barbara
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import errno
|
||||
import os
|
||||
import select
|
||||
import shutil
|
||||
import tempfile
|
||||
import time
|
||||
import unittest
|
||||
|
||||
import eventlet
|
||||
from eventlet import greenthread
|
||||
from eventlet import greenpool
|
||||
|
||||
from openstack.common import lockutils
|
||||
from tests import utils as test_utils
|
||||
|
||||
|
||||
class TestFileLocks(test_utils.BaseTestCase):
|
||||
def test_concurrent_green_lock_succeeds(self):
|
||||
"""Verify spawn_n greenthreads with two locks run concurrently."""
|
||||
tmpdir = tempfile.mkdtemp()
|
||||
try:
|
||||
self.completed = False
|
||||
|
||||
def locka(wait):
|
||||
a = lockutils.InterProcessLock(os.path.join(tmpdir, 'a'))
|
||||
with a:
|
||||
wait.wait()
|
||||
self.completed = True
|
||||
|
||||
def lockb(wait):
|
||||
b = lockutils.InterProcessLock(os.path.join(tmpdir, 'b'))
|
||||
with b:
|
||||
wait.wait()
|
||||
|
||||
wait1 = eventlet.event.Event()
|
||||
wait2 = eventlet.event.Event()
|
||||
pool = greenpool.GreenPool()
|
||||
pool.spawn_n(locka, wait1)
|
||||
pool.spawn_n(lockb, wait2)
|
||||
wait2.send()
|
||||
eventlet.sleep(0)
|
||||
wait1.send()
|
||||
pool.waitall()
|
||||
|
||||
self.assertTrue(self.completed)
|
||||
|
||||
finally:
|
||||
if os.path.exists(tmpdir):
|
||||
shutil.rmtree(tmpdir)
|
||||
|
||||
|
||||
class LockTestCase(test_utils.BaseTestCase):
|
||||
def test_synchronized_wrapped_function_metadata(self):
|
||||
@lockutils.synchronized('whatever', 'test-')
|
||||
def foo():
|
||||
"""Bar"""
|
||||
pass
|
||||
|
||||
self.assertEquals(foo.__doc__, 'Bar', "Wrapped function's docstring "
|
||||
"got lost")
|
||||
self.assertEquals(foo.__name__, 'foo', "Wrapped function's name "
|
||||
"got mangled")
|
||||
|
||||
def test_synchronized_internally(self):
|
||||
"""We can lock across multiple green threads"""
|
||||
saved_sem_num = len(lockutils._semaphores)
|
||||
seen_threads = list()
|
||||
|
||||
@lockutils.synchronized('testlock2', 'test-', external=False)
|
||||
def f(id):
|
||||
for x in range(10):
|
||||
seen_threads.append(id)
|
||||
greenthread.sleep(0)
|
||||
|
||||
threads = []
|
||||
pool = greenpool.GreenPool(10)
|
||||
for i in range(10):
|
||||
threads.append(pool.spawn(f, i))
|
||||
|
||||
for thread in threads:
|
||||
thread.wait()
|
||||
|
||||
self.assertEquals(len(seen_threads), 100)
|
||||
# Looking at the seen threads, split it into chunks of 10, and verify
|
||||
# that the last 9 match the first in each chunk.
|
||||
for i in range(10):
|
||||
for j in range(9):
|
||||
self.assertEquals(seen_threads[i * 10],
|
||||
seen_threads[i * 10 + 1 + j])
|
||||
|
||||
self.assertEqual(saved_sem_num, len(lockutils._semaphores),
|
||||
"Semaphore leak detected")
|
||||
|
||||
def test_nested_external_works(self):
|
||||
"""We can nest external syncs"""
|
||||
tempdir = tempfile.mkdtemp()
|
||||
try:
|
||||
self.config(lock_path=tempdir)
|
||||
sentinel = object()
|
||||
|
||||
@lockutils.synchronized('testlock1', 'test-', external=True)
|
||||
def outer_lock():
|
||||
|
||||
@lockutils.synchronized('testlock2', 'test-', external=True)
|
||||
def inner_lock():
|
||||
return sentinel
|
||||
return inner_lock()
|
||||
|
||||
self.assertEqual(sentinel, outer_lock())
|
||||
|
||||
finally:
|
||||
if os.path.exists(tempdir):
|
||||
shutil.rmtree(tempdir)
|
||||
|
||||
def test_synchronized_externally(self):
|
||||
"""We can lock across multiple processes"""
|
||||
tempdir = tempfile.mkdtemp()
|
||||
self.config(lock_path=tempdir)
|
||||
rpipe1, wpipe1 = os.pipe()
|
||||
rpipe2, wpipe2 = os.pipe()
|
||||
|
||||
@lockutils.synchronized('testlock1', 'test-', external=True)
|
||||
def f(rpipe, wpipe):
|
||||
try:
|
||||
os.write(wpipe, "foo")
|
||||
except OSError, e:
|
||||
self.assertEquals(e.errno, errno.EPIPE)
|
||||
return
|
||||
|
||||
rfds, _wfds, _efds = select.select([rpipe], [], [], 1)
|
||||
self.assertEquals(len(rfds), 0, "The other process, which was "
|
||||
"supposed to be locked, "
|
||||
"wrote on its end of the "
|
||||
"pipe")
|
||||
os.close(rpipe)
|
||||
|
||||
pid = os.fork()
|
||||
if pid > 0:
|
||||
os.close(wpipe1)
|
||||
os.close(rpipe2)
|
||||
|
||||
f(rpipe1, wpipe2)
|
||||
else:
|
||||
os.close(rpipe1)
|
||||
os.close(wpipe2)
|
||||
|
||||
time.sleep(0.1)
|
||||
f(rpipe2, wpipe1)
|
||||
os._exit(0)
|
Loading…
Reference in New Issue