TST: make _do_test_lock_externally work on Windows

We need to access the platform-specific file handle locking
mechanisms, but they are hidden away in the class implementations.

This pushes them into static methods to make it easy to access them
while still allowing sub-classing.
This commit is contained in:
John Tyree 2016-02-15 14:58:19 -06:00
parent 526cccebec
commit 6d9d4ed012
2 changed files with 68 additions and 50 deletions

View File

@ -214,32 +214,44 @@ class _InterProcessLock(object):
return os.path.exists(self.path)
def trylock(self):
raise NotImplementedError()
self._trylock(self.lockfile)
def unlock(self):
self._unlock(self.lockfile)
@staticmethod
def _trylock():
raise NotImplementedError()
@staticmethod
def _unlock():
raise NotImplementedError()
class _WindowsLock(_InterProcessLock):
"""Interprocess lock implementation that works on windows systems."""
def trylock(self, lockfile=None):
fileno = (lockfile or self.lockfile).fileno()
@staticmethod
def _trylock(lockfile):
fileno = lockfile.fileno()
msvcrt.locking(fileno, msvcrt.LK_NBLCK, 1)
def unlock(self, lockfile=None):
fileno = (lockfile or self.lockfile).fileno()
@staticmethod
def _unlock(lockfile):
fileno = lockfile.fileno()
msvcrt.locking(fileno, msvcrt.LK_UNLCK, 1)
class _FcntlLock(_InterProcessLock):
"""Interprocess lock implementation that works on posix systems."""
def trylock(self, lockfile=None):
fcntl.lockf(lockfile or self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
@staticmethod
def _trylock(lockfile):
fcntl.lockf(lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
def unlock(self, lockfile=None):
fcntl.lockf(lockfile or self.lockfile, fcntl.LOCK_UN)
@staticmethod
def _unlock(lockfile):
fcntl.lockf(lockfile, fcntl.LOCK_UN)
if os.name == 'nt':

View File

@ -55,6 +55,36 @@ def try_lock(lock_file):
os._exit(0)
def lock_files(lock_path, handles_dir, num_handles=50):
with pl.InterProcessLock(lock_path):
# Open some files we can use for locking
handles = []
for n in range(num_handles):
path = os.path.join(handles_dir, ('file-%s' % n))
handles.append(open(path, 'w'))
# Loop over all the handles and try locking the file
# without blocking, keep a count of how many files we
# were able to lock and then unlock. If the lock fails
# we get an IOError and bail out with bad exit code
count = 0
for handle in handles:
try:
pl.InterProcessLock._trylock(handle)
count += 1
pl.InterProcessLock._unlock(handle)
except IOError:
print(os.getpid())
os._exit(2)
finally:
handle.close()
# Check if we were able to open all files
if count != num_handles:
raise AssertionError("Unable to open all handles")
class ProcessLockTest(test.TestCase):
def setUp(self):
super(ProcessLockTest, self).setUp()
@ -111,49 +141,25 @@ class ProcessLockTest(test.TestCase):
def _do_test_lock_externally(self, lock_dir):
lock_path = os.path.join(lock_dir, "lock")
def lock_files(handles_dir):
with pl.InterProcessLock(lock_path):
# Open some files we can use for locking
handles = []
for n in range(50):
path = os.path.join(handles_dir, ('file-%s' % n))
handles.append(open(path, 'w'))
# Loop over all the handles and try locking the file
# without blocking, keep a count of how many files we
# were able to lock and then unlock. If the lock fails
# we get an IOError and bail out with bad exit code
count = 0
for handle in handles:
try:
pl.InterProcessLock.trylock(handle)
count += 1
pl.InterProcessLock.unlock(handle)
except IOError:
os._exit(2)
finally:
handle.close()
# Check if we were able to open all files
self.assertEqual(50, count)
handles_dir = tempfile.mkdtemp()
self.tmp_dirs.append(handles_dir)
children = []
for n in range(50):
pid = os.fork()
if pid:
children.append(pid)
else:
try:
lock_files(handles_dir)
finally:
os._exit(0)
for child in children:
(pid, status) = os.waitpid(child, 0)
if pid:
self.assertEqual(0, status)
num_handles = 50
num_processes = 50
args = [lock_path, handles_dir, num_handles]
children = [multiprocessing.Process(target=lock_files, args=args)
for _ in range(num_processes)]
# We do this in three loops in an attempt to get all processes up and
# running at the same time
for c in children:
# Just a precaution to avoid hung processes
c.daemon = True
c.start()
for c in children:
c.join(10)
for c in children:
self.assertEqual(0, c.exitcode)
def test_lock_externally(self):
self._do_test_lock_externally(self.lock_dir)