fix lock concurrency issues with certain drivers
This change adds a test to ensure that locks acquired from different
threads/processes work as expected, and fixes the drivers where they don't.
With the file driver, when two threads acquire the lock and the first one
releases it, the second gets a 'ValueError: I/O operation on closed file'
because the first has closed the file after releasing the lock.
Now the file is opened when the lock object is created, and we let the GC
close it when we no longer need it.
With the mysql, postgres and kazoo drivers, multiple acquires inside the same
backend session do not behave like an exclusive lock, so we track the
status locally once we get the lock inside a process.
Change-Id: Ic990a79c9baebe534409c1a433e21f4a2f9a7d6a
(cherry picked from commit 6bab678e6e)
This commit is contained in:
parent
8086661f40
commit
a21a2e817b
|
@ -305,6 +305,9 @@ class CoordinationDriver(object):
|
|||
def get_lock(name):
|
||||
"""Return a distributed lock.
|
||||
|
||||
This is an exclusive lock, a second call to acquire() will block or
|
||||
return False.
|
||||
|
||||
:param name: The lock name that is used to identify it across all
|
||||
nodes.
|
||||
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import errno
|
||||
import logging
|
||||
import os
|
||||
|
||||
import tooz
|
||||
|
@ -21,6 +22,8 @@ from tooz import coordination
|
|||
from tooz.drivers import _retry
|
||||
from tooz import locking
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class FileLock(locking.Lock):
|
||||
"""A file based lock."""
|
||||
|
@ -28,9 +31,9 @@ class FileLock(locking.Lock):
|
|||
def __init__(self, path):
|
||||
super(FileLock, self).__init__(path)
|
||||
self.acquired = False
|
||||
self.lockfile = open(self.name, 'a')
|
||||
|
||||
def acquire(self, blocking=True):
|
||||
self.lockfile = open(self.name, 'a')
|
||||
|
||||
@_retry.retry(stop_max_delay=blocking)
|
||||
def _lock():
|
||||
|
@ -56,7 +59,6 @@ class FileLock(locking.Lock):
|
|||
|
||||
def release(self):
|
||||
self.unlock()
|
||||
self.lockfile.close()
|
||||
self.acquired = False
|
||||
|
||||
def lock(self):
|
||||
|
@ -65,6 +67,10 @@ class FileLock(locking.Lock):
|
|||
def unlock(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def __del__(self):
|
||||
if self.acquired:
|
||||
LOG.warn("unreleased lock %s garbage collected" % self.name)
|
||||
|
||||
|
||||
class WindowsFileLock(FileLock):
|
||||
def lock(self):
|
||||
|
|
|
@ -13,6 +13,8 @@
|
|||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import logging
|
||||
|
||||
import pymysql
|
||||
|
||||
import tooz
|
||||
|
@ -21,6 +23,8 @@ from tooz.drivers import _retry
|
|||
from tooz import locking
|
||||
from tooz import utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class MySQLLock(locking.Lock):
|
||||
"""A MySQL based lock."""
|
||||
|
@ -28,38 +32,55 @@ class MySQLLock(locking.Lock):
|
|||
def __init__(self, name, parsed_url, options):
|
||||
super(MySQLLock, self).__init__(name)
|
||||
self._conn = MySQLDriver.get_connection(parsed_url, options)
|
||||
self.acquired = False
|
||||
|
||||
def acquire(self, blocking=True):
|
||||
def _acquire(retry=False):
|
||||
|
||||
@_retry.retry(stop_max_delay=blocking)
|
||||
def _lock():
|
||||
# NOTE(sileht): mysql-server (<5.7.5) allows only one lock per
|
||||
# connection at a time:
|
||||
# select GET_LOCK("a", 0);
|
||||
# select GET_LOCK("b", 0); <-- this release lock "a" ...
|
||||
# Or
|
||||
# select GET_LOCK("a", 0);
|
||||
# select GET_LOCK("a", 0); release and lock again "a"
|
||||
#
|
||||
# So, we track locally the lock status with self.acquired
|
||||
if self.acquired is True:
|
||||
if blocking:
|
||||
raise _retry.Retry
|
||||
return False
|
||||
|
||||
try:
|
||||
with self._conn as cur:
|
||||
cur.execute("SELECT GET_LOCK(%s, 0);", self.name)
|
||||
# Can return NULL on error
|
||||
if cur.fetchone()[0] is 1:
|
||||
self.acquired = True
|
||||
return True
|
||||
except pymysql.MySQLError as e:
|
||||
raise coordination.ToozError(utils.exception_message(e))
|
||||
if retry:
|
||||
raise _retry.Retry
|
||||
else:
|
||||
return False
|
||||
|
||||
if blocking is False:
|
||||
return _acquire()
|
||||
else:
|
||||
kwargs = _retry.RETRYING_KWARGS.copy()
|
||||
if blocking is not True:
|
||||
kwargs['stop_max_delay'] = blocking
|
||||
return _retry.Retrying(**kwargs).call(_acquire, retry=True)
|
||||
if blocking:
|
||||
raise _retry.Retry
|
||||
return False
|
||||
|
||||
return _lock()
|
||||
|
||||
def release(self):
|
||||
try:
|
||||
with self._conn as cur:
|
||||
cur.execute("SELECT RELEASE_LOCK(%s);", self.name)
|
||||
return cur.fetchone()[0]
|
||||
cur.fetchone()
|
||||
self.acquired = False
|
||||
except pymysql.MySQLError as e:
|
||||
raise coordination.ToozError(utils.exception_message(e))
|
||||
|
||||
def __del__(self):
|
||||
if self.acquired:
|
||||
LOG.warn("unreleased lock %s garbage collected" % self.name)
|
||||
|
||||
|
||||
class MySQLDriver(coordination.CoordinationDriver):
|
||||
"""A mysql based driver."""
|
||||
|
@ -78,9 +99,7 @@ class MySQLDriver(coordination.CoordinationDriver):
|
|||
self._conn.close()
|
||||
|
||||
def get_lock(self, name):
|
||||
return locking.WeakLockHelper(
|
||||
self._parsed_url.geturl(),
|
||||
MySQLLock, name, self._parsed_url, self._options)
|
||||
return MySQLLock(name, self._parsed_url, self._options)
|
||||
|
||||
@staticmethod
|
||||
def watch_join_group(group_id, callback):
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
|
||||
import contextlib
|
||||
import hashlib
|
||||
import logging
|
||||
|
||||
import psycopg2
|
||||
import six
|
||||
|
@ -26,6 +27,7 @@ from tooz.drivers import _retry
|
|||
from tooz import locking
|
||||
from tooz import utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
# See: psycopg/diagnostics_type.c for what kind of fields these
|
||||
# objects may have (things like 'schema_name', 'internal_query'
|
||||
|
@ -90,6 +92,7 @@ class PostgresLock(locking.Lock):
|
|||
def __init__(self, name, parsed_url, options):
|
||||
super(PostgresLock, self).__init__(name)
|
||||
self._conn = PostgresDriver.get_connection(parsed_url, options)
|
||||
self.acquired = False
|
||||
h = hashlib.md5()
|
||||
h.update(name)
|
||||
if six.PY2:
|
||||
|
@ -98,30 +101,46 @@ class PostgresLock(locking.Lock):
|
|||
self.key = h.digest()[0:2]
|
||||
|
||||
def acquire(self, blocking=True):
|
||||
if blocking is True:
|
||||
@_retry.retry(stop_max_delay=blocking)
|
||||
def _lock():
|
||||
# NOTE(sileht): On the same session the lock is not exclusive
|
||||
# so we track it internally if the process already has the lock.
|
||||
if self.acquired is True:
|
||||
if blocking:
|
||||
raise _retry.Retry
|
||||
return False
|
||||
|
||||
with _translating_cursor(self._conn) as cur:
|
||||
cur.execute("SELECT pg_advisory_lock(%s, %s);", self.key)
|
||||
return True
|
||||
elif blocking is False:
|
||||
with _translating_cursor(self._conn) as cur:
|
||||
cur.execute("SELECT pg_try_advisory_lock(%s, %s);", self.key)
|
||||
return cur.fetchone()[0]
|
||||
else:
|
||||
def _acquire():
|
||||
with _translating_cursor(self._conn) as cur:
|
||||
if blocking is True:
|
||||
cur.execute("SELECT pg_advisory_lock(%s, %s);", self.key)
|
||||
cur.fetchone()
|
||||
self.acquired = True
|
||||
return True
|
||||
else:
|
||||
cur.execute("SELECT pg_try_advisory_lock(%s, %s);",
|
||||
self.key)
|
||||
if cur.fetchone()[0] is True:
|
||||
self.acquired = True
|
||||
return True
|
||||
raise _retry.Retry
|
||||
kwargs = _retry.RETRYING_KWARGS.copy()
|
||||
kwargs['stop_max_delay'] = blocking
|
||||
return _retry.Retrying(**kwargs).call(_acquire)
|
||||
elif blocking is False:
|
||||
return False
|
||||
else:
|
||||
raise _retry.Retry
|
||||
|
||||
kwargs = _retry.RETRYING_KWARGS.copy()
|
||||
kwargs['stop_max_delay'] = blocking
|
||||
return _lock()
|
||||
|
||||
def release(self):
|
||||
with _translating_cursor(self._conn) as cur:
|
||||
cur.execute("SELECT pg_advisory_unlock(%s, %s);", self.key)
|
||||
return cur.fetchone()[0]
|
||||
cur.execute("SELECT pg_advisory_unlock(%s, %s);",
|
||||
self.key)
|
||||
cur.fetchone()
|
||||
self.acquired = False
|
||||
|
||||
def __del__(self):
|
||||
if self.acquired:
|
||||
LOG.warn("unreleased lock %s garbage collected" % self.name)
|
||||
|
||||
|
||||
class PostgresDriver(coordination.CoordinationDriver):
|
||||
|
@ -141,9 +160,7 @@ class PostgresDriver(coordination.CoordinationDriver):
|
|||
self._conn.close()
|
||||
|
||||
def get_lock(self, name):
|
||||
return locking.WeakLockHelper(
|
||||
self._parsed_url.geturl(),
|
||||
PostgresLock, name, self._parsed_url, self._options)
|
||||
return PostgresLock(name, self._parsed_url, self._options)
|
||||
|
||||
@staticmethod
|
||||
def watch_join_group(group_id, callback):
|
||||
|
|
|
@ -23,6 +23,7 @@ from kazoo.protocol import paths
|
|||
import six
|
||||
|
||||
from tooz import coordination
|
||||
from tooz.drivers import _retry
|
||||
from tooz import locking
|
||||
from tooz import utils
|
||||
|
||||
|
@ -31,13 +32,34 @@ class ZooKeeperLock(locking.Lock):
|
|||
def __init__(self, name, lock):
|
||||
super(ZooKeeperLock, self).__init__(name)
|
||||
self._lock = lock
|
||||
self.acquired = False
|
||||
|
||||
def acquire(self, blocking=True):
|
||||
return self._lock.acquire(blocking=bool(blocking),
|
||||
timeout=blocking)
|
||||
|
||||
@_retry.retry(stop_max_delay=blocking)
|
||||
def _lock():
|
||||
# NOTE(sileht): kazoo lock looks broken:
|
||||
# https://github.com/python-zk/kazoo/issues/291
|
||||
# so we track the exclusivity internally
|
||||
if self.acquired is True:
|
||||
if blocking:
|
||||
raise _retry.Retry
|
||||
return False
|
||||
|
||||
if self._lock.acquire(blocking=bool(blocking),
|
||||
timeout=0):
|
||||
self.acquired = True
|
||||
return True
|
||||
|
||||
if blocking:
|
||||
raise _retry.Retry
|
||||
return False
|
||||
|
||||
return _lock()
|
||||
|
||||
def release(self):
|
||||
return self._lock.release()
|
||||
self._lock.release()
|
||||
self.acquired = False
|
||||
|
||||
|
||||
class BaseZooKeeperDriver(coordination.CoordinationDriver):
|
||||
|
|
|
@ -70,7 +70,6 @@ class SharedWeakLockHelper(Lock):
|
|||
|
||||
def __init__(self, namespace, lockclass, name, *args, **kwargs):
|
||||
super(SharedWeakLockHelper, self).__init__(name)
|
||||
self.acquired = False
|
||||
self._lock_key = "%s:%s" % (namespace, name)
|
||||
self._newlock = lambda: lockclass(
|
||||
self.name, *args, **kwargs)
|
||||
|
@ -93,31 +92,5 @@ class SharedWeakLockHelper(Lock):
|
|||
def release(self):
|
||||
with self.LOCKS_LOCK:
|
||||
l = self.ACQUIRED_LOCKS.pop(self._lock_key)
|
||||
l.release()
|
||||
self.RELEASED_LOCKS[self._lock_key] = l
|
||||
|
||||
|
||||
class WeakLockHelper(Lock):
|
||||
"""Helper for lock that need to rely on a state in memory and
|
||||
be a different object across each coordinator.get_lock(...)
|
||||
"""
|
||||
|
||||
LOCKS_LOCK = threading.Lock()
|
||||
ACQUIRED_LOCKS = dict()
|
||||
|
||||
def __init__(self, namespace, lockclass, name, *args, **kwargs):
|
||||
super(WeakLockHelper, self).__init__(name)
|
||||
self._lock_key = "%s:%s" % (namespace, name)
|
||||
self._lock = lockclass(self.name, *args, **kwargs)
|
||||
|
||||
def acquire(self, blocking=True):
|
||||
if self._lock.acquire(blocking):
|
||||
with self.LOCKS_LOCK:
|
||||
self.ACQUIRED_LOCKS[self._lock_key] = self._lock
|
||||
return True
|
||||
return False
|
||||
|
||||
def release(self):
|
||||
with self.LOCKS_LOCK:
|
||||
self._lock.release()
|
||||
self.ACQUIRED_LOCKS.pop(self._lock_key)
|
||||
l.release()
|
||||
|
|
|
@ -14,9 +14,11 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
|
||||
from concurrent import futures
|
||||
import testscenarios
|
||||
from testtools import matchers
|
||||
from testtools import testcase
|
||||
|
@ -25,6 +27,15 @@ import tooz.coordination
|
|||
from tooz import tests
|
||||
|
||||
|
||||
def try_to_lock_job(name, coord, url, member_id):
|
||||
if not coord:
|
||||
coord = tooz.coordination.get_coordinator(
|
||||
url, member_id)
|
||||
coord.start()
|
||||
lock2 = coord.get_lock(name)
|
||||
return lock2.acquire(blocking=False)
|
||||
|
||||
|
||||
class TestAPI(testscenarios.TestWithScenarios,
|
||||
tests.TestCaseSkipNotImplemented):
|
||||
|
||||
|
@ -577,6 +588,89 @@ class TestAPI(testscenarios.TestWithScenarios,
|
|||
with lock:
|
||||
pass
|
||||
|
||||
def test_get_lock_concurrency_locking_same_lock(self):
|
||||
lock = self._coord.get_lock(self._get_random_uuid())
|
||||
|
||||
graceful_ending = threading.Event()
|
||||
|
||||
def thread():
|
||||
self.assertTrue(lock.acquire())
|
||||
lock.release()
|
||||
graceful_ending.set()
|
||||
|
||||
t = threading.Thread(target=thread)
|
||||
t.daemon = True
|
||||
with lock:
|
||||
t.start()
|
||||
# Ensure the thread tries to get the lock
|
||||
time.sleep(.1)
|
||||
t.join()
|
||||
graceful_ending.wait(.2)
|
||||
self.assertTrue(graceful_ending.is_set())
|
||||
|
||||
def _do_test_get_lock_concurrency_locking_two_lock(self, executor,
|
||||
use_same_coord):
|
||||
name = self._get_random_uuid()
|
||||
lock1 = self._coord.get_lock(name)
|
||||
with lock1:
|
||||
with executor(max_workers=1) as e:
|
||||
coord = self._coord if use_same_coord else None
|
||||
f = e.submit(try_to_lock_job, name, coord, self.url,
|
||||
self._get_random_uuid())
|
||||
self.assertFalse(f.result())
|
||||
|
||||
def test_get_lock_concurrency_locking_two_lock_process(self):
|
||||
self._do_test_get_lock_concurrency_locking_two_lock(
|
||||
futures.ProcessPoolExecutor, False)
|
||||
|
||||
def test_get_lock_concurrency_locking_two_lock_thread1(self):
|
||||
self._do_test_get_lock_concurrency_locking_two_lock(
|
||||
futures.ThreadPoolExecutor, False)
|
||||
|
||||
def test_get_lock_concurrency_locking_two_lock_thread2(self):
|
||||
self._do_test_get_lock_concurrency_locking_two_lock(
|
||||
futures.ThreadPoolExecutor, True)
|
||||
|
||||
def test_get_lock_concurrency_locking2(self):
|
||||
# NOTE(sileht): some database-based locks can have only
|
||||
# one lock per connection, this test ensures acquiring a
|
||||
# second lock doesn't release the first one.
|
||||
lock1 = self._coord.get_lock(self._get_random_uuid())
|
||||
lock2 = self._coord.get_lock(self._get_random_uuid())
|
||||
|
||||
graceful_ending = threading.Event()
|
||||
thread_locked = threading.Event()
|
||||
|
||||
def thread():
|
||||
with lock2:
|
||||
self.assertFalse(lock1.acquire(blocking=False))
|
||||
thread_locked.set()
|
||||
graceful_ending.set()
|
||||
|
||||
t = threading.Thread(target=thread)
|
||||
t.daemon = True
|
||||
|
||||
with lock1:
|
||||
t.start()
|
||||
thread_locked.wait()
|
||||
self.assertTrue(thread_locked.is_set())
|
||||
t.join()
|
||||
graceful_ending.wait()
|
||||
self.assertTrue(graceful_ending.is_set())
|
||||
|
||||
def test_get_lock_twice_locked_twice(self):
|
||||
name = self._get_random_uuid()
|
||||
lock1 = self._coord.get_lock(name)
|
||||
lock2 = self._coord.get_lock(name)
|
||||
with lock1:
|
||||
self.assertEqual(False, lock2.acquire(blocking=False))
|
||||
|
||||
def test_get_lock_locked_twice(self):
|
||||
name = self._get_random_uuid()
|
||||
lock = self._coord.get_lock(name)
|
||||
with lock:
|
||||
self.assertEqual(False, lock.acquire(blocking=False))
|
||||
|
||||
def test_get_multiple_locks_with_same_coord(self):
|
||||
name = self._get_random_uuid()
|
||||
lock1 = self._coord.get_lock(name)
|
||||
|
@ -587,21 +681,6 @@ class TestAPI(testscenarios.TestWithScenarios,
|
|||
self._coord.get_lock(name).acquire(blocking=False))
|
||||
lock1.release()
|
||||
|
||||
def test_get_multiple_locks_with_same_coord_without_ref(self):
|
||||
# NOTE(sileht): a weird test case that wants a lock that can't be
|
||||
# released? This test is here to ensure that the
|
||||
# first acquired lock is not collected by the GC and accidentally
|
||||
# released.
|
||||
# This test ensures that the consumer application will get stuck when it
|
||||
# loses the ref of an acquired lock instead of creating a race.
|
||||
# Also, by its nature this test doesn't clean up the created
|
||||
# semaphore of the ipc:// driver, doesn't close opened files and
|
||||
# sql connections, and that is the desired behavior.
|
||||
name = self._get_random_uuid()
|
||||
self.assertEqual(True, self._coord.get_lock(name).acquire())
|
||||
self.assertEqual(False,
|
||||
self._coord.get_lock(name).acquire(blocking=False))
|
||||
|
||||
def test_get_lock_multiple_coords(self):
|
||||
member_id2 = self._get_random_uuid()
|
||||
client2 = tooz.coordination.get_coordinator(self.url,
|
||||
|
|
Loading…
Reference in New Issue