Merge "sql: close connection for lock if not used"
This commit is contained in:
commit
ed15f7cc1c
|
@ -35,7 +35,7 @@ class MySQLLock(locking.Lock):
|
|||
def __init__(self, name, parsed_url, options):
|
||||
super(MySQLLock, self).__init__(name)
|
||||
self.acquired = False
|
||||
self._conn = MySQLDriver.get_connection(parsed_url, options)
|
||||
self._conn = MySQLDriver.get_connection(parsed_url, options, True)
|
||||
|
||||
def acquire(self, blocking=True, shared=False):
|
||||
|
||||
|
@ -59,6 +59,8 @@ class MySQLLock(locking.Lock):
|
|||
return False
|
||||
|
||||
try:
|
||||
if not self._conn.open:
|
||||
self._conn.connect()
|
||||
with self._conn as cur:
|
||||
cur.execute("SELECT GET_LOCK(%s, 0);", self.name)
|
||||
# Can return NULL on error
|
||||
|
@ -66,6 +68,7 @@ class MySQLLock(locking.Lock):
|
|||
self.acquired = True
|
||||
return True
|
||||
except pymysql.MySQLError as e:
|
||||
self._conn.close()
|
||||
utils.raise_with_cause(
|
||||
tooz.ToozError,
|
||||
encodeutils.exception_to_unicode(e),
|
||||
|
@ -73,6 +76,7 @@ class MySQLLock(locking.Lock):
|
|||
|
||||
if blocking:
|
||||
raise _retry.TryAgain
|
||||
self._conn.close()
|
||||
return False
|
||||
|
||||
return _lock()
|
||||
|
@ -84,8 +88,9 @@ class MySQLLock(locking.Lock):
|
|||
with self._conn as cur:
|
||||
cur.execute("SELECT RELEASE_LOCK(%s);", self.name)
|
||||
cur.fetchone()
|
||||
self.acquired = False
|
||||
return True
|
||||
self.acquired = False
|
||||
self._conn.close()
|
||||
return True
|
||||
except pymysql.MySQLError as e:
|
||||
utils.raise_with_cause(tooz.ToozError,
|
||||
encodeutils.exception_to_unicode(e),
|
||||
|
@ -159,7 +164,7 @@ class MySQLDriver(coordination.CoordinationDriver):
|
|||
raise tooz.NotImplemented
|
||||
|
||||
@staticmethod
|
||||
def get_connection(parsed_url, options):
|
||||
def get_connection(parsed_url, options, defer_connect=False):
|
||||
host = parsed_url.hostname
|
||||
port = parsed_url.port or MySQLLock.MYSQL_DEFAULT_PORT
|
||||
dbname = parsed_url.path[1:]
|
||||
|
@ -173,13 +178,15 @@ class MySQLDriver(coordination.CoordinationDriver):
|
|||
port=port,
|
||||
user=username,
|
||||
passwd=password,
|
||||
database=dbname)
|
||||
database=dbname,
|
||||
defer_connect=defer_connect)
|
||||
else:
|
||||
return pymysql.Connect(host=host,
|
||||
port=port,
|
||||
user=username,
|
||||
passwd=password,
|
||||
database=dbname)
|
||||
database=dbname,
|
||||
defer_connect=defer_connect)
|
||||
except (pymysql.err.OperationalError, pymysql.err.InternalError) as e:
|
||||
utils.raise_with_cause(coordination.ToozConnectionError,
|
||||
encodeutils.exception_to_unicode(e),
|
||||
|
|
|
@ -96,7 +96,9 @@ class PostgresLock(locking.Lock):
|
|||
def __init__(self, name, parsed_url, options):
|
||||
super(PostgresLock, self).__init__(name)
|
||||
self.acquired = False
|
||||
self._conn = PostgresDriver.get_connection(parsed_url, options)
|
||||
self._conn = None
|
||||
self._parsed_url = parsed_url
|
||||
self._options = options
|
||||
h = hashlib.md5()
|
||||
h.update(name)
|
||||
if six.PY2:
|
||||
|
@ -118,34 +120,46 @@ class PostgresLock(locking.Lock):
|
|||
raise _retry.TryAgain
|
||||
return False
|
||||
|
||||
with _translating_cursor(self._conn) as cur:
|
||||
if blocking is True:
|
||||
cur.execute("SELECT pg_advisory_lock(%s, %s);", self.key)
|
||||
cur.fetchone()
|
||||
self.acquired = True
|
||||
return True
|
||||
else:
|
||||
cur.execute("SELECT pg_try_advisory_lock(%s, %s);",
|
||||
self.key)
|
||||
if cur.fetchone()[0] is True:
|
||||
if not self._conn or self._conn.closed:
|
||||
self._conn = PostgresDriver.get_connection(self._parsed_url,
|
||||
self._options)
|
||||
try:
|
||||
with _translating_cursor(self._conn) as cur:
|
||||
if blocking is True:
|
||||
cur.execute("SELECT pg_advisory_lock(%s, %s);",
|
||||
self.key)
|
||||
cur.fetchone()
|
||||
self.acquired = True
|
||||
return True
|
||||
elif blocking is False:
|
||||
return False
|
||||
else:
|
||||
raise _retry.TryAgain
|
||||
cur.execute("SELECT pg_try_advisory_lock(%s, %s);",
|
||||
self.key)
|
||||
if cur.fetchone()[0] is True:
|
||||
self.acquired = True
|
||||
return True
|
||||
elif blocking is False:
|
||||
self._conn.close()
|
||||
return False
|
||||
else:
|
||||
raise _retry.TryAgain
|
||||
except _retry.TryAgain:
|
||||
pass # contine to retrieve lock on same conn
|
||||
except Exception:
|
||||
self._conn.close()
|
||||
raise
|
||||
|
||||
return _lock()
|
||||
|
||||
def release(self):
|
||||
if not self.acquired:
|
||||
return False
|
||||
|
||||
with _translating_cursor(self._conn) as cur:
|
||||
cur.execute("SELECT pg_advisory_unlock(%s, %s);",
|
||||
self.key)
|
||||
cur.execute("SELECT pg_advisory_unlock(%s, %s);", self.key)
|
||||
cur.fetchone()
|
||||
self.acquired = False
|
||||
return True
|
||||
self.acquired = False
|
||||
self._conn.close()
|
||||
return True
|
||||
|
||||
def __del__(self):
|
||||
if self.acquired:
|
||||
|
|
Loading…
Reference in New Issue