Merge pull request #23 from johntyree/bug/multiplatform-tests
Fix process_lock and tests on Win32
This commit is contained in:
commit
fc9cd6cbbe
|
@ -214,30 +214,44 @@ class _InterProcessLock(object):
|
|||
return os.path.exists(self.path)
|
||||
|
||||
def trylock(self):
|
||||
raise NotImplementedError()
|
||||
self._trylock(self.lockfile)
|
||||
|
||||
def unlock(self):
|
||||
self._unlock(self.lockfile)
|
||||
|
||||
@staticmethod
|
||||
def _trylock():
|
||||
raise NotImplementedError()
|
||||
|
||||
@staticmethod
|
||||
def _unlock():
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
class _WindowsLock(_InterProcessLock):
|
||||
"""Interprocess lock implementation that works on windows systems."""
|
||||
|
||||
def trylock(self):
|
||||
msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_NBLCK, 1)
|
||||
@staticmethod
|
||||
def _trylock(lockfile):
|
||||
fileno = lockfile.fileno()
|
||||
msvcrt.locking(fileno, msvcrt.LK_NBLCK, 1)
|
||||
|
||||
def unlock(self):
|
||||
msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_UNLCK, 1)
|
||||
@staticmethod
|
||||
def _unlock(lockfile):
|
||||
fileno = lockfile.fileno()
|
||||
msvcrt.locking(fileno, msvcrt.LK_UNLCK, 1)
|
||||
|
||||
|
||||
class _FcntlLock(_InterProcessLock):
|
||||
"""Interprocess lock implementation that works on posix systems."""
|
||||
|
||||
def trylock(self):
|
||||
fcntl.lockf(self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||
@staticmethod
|
||||
def _trylock(lockfile):
|
||||
fcntl.lockf(lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||
|
||||
def unlock(self):
|
||||
fcntl.lockf(self.lockfile, fcntl.LOCK_UN)
|
||||
@staticmethod
|
||||
def _unlock(lockfile):
|
||||
fcntl.lockf(lockfile, fcntl.LOCK_UN)
|
||||
|
||||
|
||||
if os.name == 'nt':
|
||||
|
|
|
@ -15,12 +15,12 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import contextlib
|
||||
import errno
|
||||
import fcntl
|
||||
import multiprocessing
|
||||
import os
|
||||
import shutil
|
||||
import signal
|
||||
import sys
|
||||
import tempfile
|
||||
import threading
|
||||
import time
|
||||
|
@ -28,6 +28,8 @@ import time
|
|||
from fasteners import process_lock as pl
|
||||
from fasteners import test
|
||||
|
||||
WIN32 = os.name == 'nt'
|
||||
|
||||
|
||||
class BrokenLock(pl.InterProcessLock):
|
||||
def __init__(self, name, errno_code):
|
||||
|
@ -43,6 +45,87 @@ class BrokenLock(pl.InterProcessLock):
|
|||
raise err
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def scoped_child_processes(children, timeout=0.1, exitcode=0):
|
||||
for child in children:
|
||||
child.daemon = True
|
||||
child.start()
|
||||
yield
|
||||
start = time.time()
|
||||
timed_out = 0
|
||||
|
||||
for child in children:
|
||||
child.join(max(timeout - (time.time() - start), 0))
|
||||
if child.is_alive():
|
||||
timed_out += 1
|
||||
child.terminate()
|
||||
|
||||
if timed_out:
|
||||
msg = "{} child processes killed due to timeout\n".format(timed_out)
|
||||
sys.stderr.write(msg)
|
||||
|
||||
if exitcode is not None:
|
||||
for child in children:
|
||||
c_code = child.exitcode
|
||||
msg = "Child exitcode {} != {}"
|
||||
assert c_code == exitcode, msg.format(c_code, exitcode)
|
||||
|
||||
|
||||
def try_lock(lock_file):
|
||||
try:
|
||||
my_lock = pl.InterProcessLock(lock_file)
|
||||
my_lock.lockfile = open(lock_file, 'w')
|
||||
my_lock.trylock()
|
||||
my_lock.unlock()
|
||||
os._exit(1)
|
||||
except IOError:
|
||||
os._exit(0)
|
||||
|
||||
|
||||
def lock_files(lock_path, handles_dir, num_handles=50):
|
||||
with pl.InterProcessLock(lock_path):
|
||||
|
||||
# Open some files we can use for locking
|
||||
handles = []
|
||||
for n in range(num_handles):
|
||||
path = os.path.join(handles_dir, ('file-%s' % n))
|
||||
handles.append(open(path, 'w'))
|
||||
|
||||
# Loop over all the handles and try locking the file
|
||||
# without blocking, keep a count of how many files we
|
||||
# were able to lock and then unlock. If the lock fails
|
||||
# we get an IOError and bail out with bad exit code
|
||||
count = 0
|
||||
for handle in handles:
|
||||
try:
|
||||
pl.InterProcessLock._trylock(handle)
|
||||
count += 1
|
||||
pl.InterProcessLock._unlock(handle)
|
||||
except IOError:
|
||||
os._exit(2)
|
||||
finally:
|
||||
handle.close()
|
||||
|
||||
# Check if we were able to open all files
|
||||
if count != num_handles:
|
||||
raise AssertionError("Unable to open all handles")
|
||||
|
||||
|
||||
def inter_processlock_helper(lockname, lock_filename, pipe):
|
||||
lock2 = pl.InterProcessLock(lockname)
|
||||
lock2.lockfile = open(lock_filename, 'w')
|
||||
have_lock = False
|
||||
while not have_lock:
|
||||
try:
|
||||
lock2.trylock()
|
||||
have_lock = True
|
||||
except IOError:
|
||||
pass
|
||||
# Hold the lock and wait for the parent
|
||||
pipe.send(None)
|
||||
pipe.recv()
|
||||
|
||||
|
||||
class ProcessLockTest(test.TestCase):
|
||||
def setUp(self):
|
||||
super(ProcessLockTest, self).setUp()
|
||||
|
@ -59,27 +142,13 @@ class ProcessLockTest(test.TestCase):
|
|||
lock_file = os.path.join(self.lock_dir, 'lock')
|
||||
lock = pl.InterProcessLock(lock_file)
|
||||
|
||||
def try_lock():
|
||||
try:
|
||||
my_lock = pl.InterProcessLock(lock_file)
|
||||
my_lock.lockfile = open(lock_file, 'w')
|
||||
my_lock.trylock()
|
||||
my_lock.unlock()
|
||||
os._exit(1)
|
||||
except IOError:
|
||||
os._exit(0)
|
||||
|
||||
def attempt_acquire(count):
|
||||
children = []
|
||||
for i in range(count):
|
||||
child = multiprocessing.Process(target=try_lock)
|
||||
child.start()
|
||||
children.append(child)
|
||||
exit_codes = []
|
||||
for child in children:
|
||||
child.join()
|
||||
exit_codes.append(child.exitcode)
|
||||
return sum(exit_codes)
|
||||
children = [
|
||||
multiprocessing.Process(target=try_lock, args=(lock_file,))
|
||||
for i in range(count)]
|
||||
with scoped_child_processes(children, timeout=10, exitcode=None):
|
||||
pass
|
||||
return sum(c.exitcode for c in children)
|
||||
|
||||
self.assertTrue(lock.acquire())
|
||||
try:
|
||||
|
@ -108,49 +177,17 @@ class ProcessLockTest(test.TestCase):
|
|||
def _do_test_lock_externally(self, lock_dir):
|
||||
lock_path = os.path.join(lock_dir, "lock")
|
||||
|
||||
def lock_files(handles_dir):
|
||||
with pl.InterProcessLock(lock_path):
|
||||
|
||||
# Open some files we can use for locking
|
||||
handles = []
|
||||
for n in range(50):
|
||||
path = os.path.join(handles_dir, ('file-%s' % n))
|
||||
handles.append(open(path, 'w'))
|
||||
|
||||
# Loop over all the handles and try locking the file
|
||||
# without blocking, keep a count of how many files we
|
||||
# were able to lock and then unlock. If the lock fails
|
||||
# we get an IOError and bail out with bad exit code
|
||||
count = 0
|
||||
for handle in handles:
|
||||
try:
|
||||
fcntl.flock(handle, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||
count += 1
|
||||
fcntl.flock(handle, fcntl.LOCK_UN)
|
||||
except IOError:
|
||||
os._exit(2)
|
||||
finally:
|
||||
handle.close()
|
||||
|
||||
# Check if we were able to open all files
|
||||
self.assertEqual(50, count)
|
||||
|
||||
handles_dir = tempfile.mkdtemp()
|
||||
self.tmp_dirs.append(handles_dir)
|
||||
children = []
|
||||
for n in range(50):
|
||||
pid = os.fork()
|
||||
if pid:
|
||||
children.append(pid)
|
||||
else:
|
||||
try:
|
||||
lock_files(handles_dir)
|
||||
finally:
|
||||
os._exit(0)
|
||||
for child in children:
|
||||
(pid, status) = os.waitpid(child, 0)
|
||||
if pid:
|
||||
self.assertEqual(0, status)
|
||||
|
||||
num_handles = 50
|
||||
num_processes = 50
|
||||
args = [lock_path, handles_dir, num_handles]
|
||||
children = [multiprocessing.Process(target=lock_files, args=args)
|
||||
for _ in range(num_processes)]
|
||||
|
||||
with scoped_child_processes(children, timeout=30, exitcode=0):
|
||||
pass
|
||||
|
||||
def test_lock_externally(self):
|
||||
self._do_test_lock_externally(self.lock_dir)
|
||||
|
@ -180,16 +217,20 @@ class ProcessLockTest(test.TestCase):
|
|||
|
||||
def test_interprocess_lock(self):
|
||||
lock_file = os.path.join(self.lock_dir, 'lock')
|
||||
lock_name = 'foo'
|
||||
|
||||
child_pipe, them = multiprocessing.Pipe()
|
||||
child = multiprocessing.Process(
|
||||
target=inter_processlock_helper, args=(lock_name, lock_file, them))
|
||||
|
||||
with scoped_child_processes((child,)):
|
||||
|
||||
pid = os.fork()
|
||||
if pid:
|
||||
# Make sure the child grabs the lock first
|
||||
if not child_pipe.poll(5):
|
||||
self.fail('Timed out waiting for child to grab lock')
|
||||
|
||||
start = time.time()
|
||||
while not os.path.exists(lock_file):
|
||||
if time.time() - start > 5:
|
||||
self.fail('Timed out waiting for child to grab lock')
|
||||
time.sleep(0)
|
||||
lock1 = pl.InterProcessLock('foo')
|
||||
lock1 = pl.InterProcessLock(lock_name)
|
||||
lock1.lockfile = open(lock_file, 'w')
|
||||
# NOTE(bnemec): There is a brief window between when the lock file
|
||||
# is created and when it actually becomes locked. If we happen to
|
||||
|
@ -206,26 +247,10 @@ class ProcessLockTest(test.TestCase):
|
|||
break
|
||||
else:
|
||||
self.fail('Never caught expected lock exception')
|
||||
# We don't need to wait for the full sleep in the child here
|
||||
os.kill(pid, signal.SIGKILL)
|
||||
else:
|
||||
try:
|
||||
lock2 = pl.InterProcessLock('foo')
|
||||
lock2.lockfile = open(lock_file, 'w')
|
||||
have_lock = False
|
||||
while not have_lock:
|
||||
try:
|
||||
lock2.trylock()
|
||||
have_lock = True
|
||||
except IOError:
|
||||
pass
|
||||
finally:
|
||||
# NOTE(bnemec): This is racy, but I don't want to add any
|
||||
# synchronization primitives that might mask a problem
|
||||
# with the one we're trying to test here.
|
||||
time.sleep(.5)
|
||||
os._exit(0)
|
||||
|
||||
child_pipe.send(None)
|
||||
|
||||
@test.testtools.skipIf(WIN32, "Windows cannot open file handles twice")
|
||||
def test_non_destructive(self):
|
||||
lock_file = os.path.join(self.lock_dir, 'not-destroyed')
|
||||
with open(lock_file, 'w') as f:
|
||||
|
|
Loading…
Reference in New Issue