Enables db2 server disconnects to be handled pessimistically

A checkout listener can be added to sqlalchemy to be able to handle
database disconnect pessimistically and avoid an error in the event of
a DB restart or simply a temp connection issue due to network
glitches. At the moment only mysql is enabled with this feature. This
patch enables db2 as well.

Sync oslo db.sqlalchemy from commit:
bb4d7a2534f6087d828e22784dc263522d309592

Partial-Bug: #1231657

Change-Id: Ib89de07f004975528baffdfd19aae689b089a324
This commit is contained in:
JUN JIE NAN 2013-12-12 15:23:46 +08:00
parent 582cf8f047
commit cb0a2a0591
2 changed files with 22 additions and 14 deletions

View File

@ -601,18 +601,24 @@ def _thread_yield(dbapi_con, con_record):
time.sleep(0)
def _ping_listener(dbapi_conn, connection_rec, connection_proxy):
"""Ensures that MySQL connections checked out of the pool are alive.
def _ping_listener(engine, dbapi_conn, connection_rec, connection_proxy):
"""Ensures that MySQL and DB2 connections are alive.
Borrowed from:
http://groups.google.com/group/sqlalchemy/msg/a4ce563d802c929f
"""
cursor = dbapi_conn.cursor()
try:
dbapi_conn.cursor().execute('select 1')
except dbapi_conn.OperationalError as ex:
if ex.args[0] in (2006, 2013, 2014, 2045, 2055):
LOG.warning(_('Got mysql server has gone away: %s'), ex)
raise sqla_exc.DisconnectionError("Database server went away")
ping_sql = 'select 1'
if engine.name == 'ibm_db_sa':
# DB2 requires a table expression
ping_sql = 'select 1 from (values (1)) AS t1'
cursor.execute(ping_sql)
except Exception as ex:
if engine.dialect.is_disconnect(ex, dbapi_conn, cursor):
msg = _('Database server has gone away: %s') % ex
LOG.warning(msg)
raise sqla_exc.DisconnectionError(msg)
else:
raise
@ -670,8 +676,9 @@ def create_engine(sql_connection, sqlite_fk=False):
sqlalchemy.event.listen(engine, 'checkin', _thread_yield)
if 'mysql' in connection_dict.drivername:
sqlalchemy.event.listen(engine, 'checkout', _ping_listener)
if engine.name in ['mysql', 'ibm_db_sa']:
callback = functools.partial(_ping_listener, engine)
sqlalchemy.event.listen(engine, 'checkout', callback)
elif 'sqlite' in connection_dict.drivername:
if not CONF.sqlite_synchronous:
sqlalchemy.event.listen(engine, 'connect',

View File

@ -17,6 +17,7 @@
import ConfigParser
import functools
import os
import subprocess
import lockfile
import sqlalchemy
@ -24,7 +25,6 @@ import sqlalchemy.exc
from heat.openstack.common.gettextutils import _
from heat.openstack.common import log as logging
from heat.openstack.common import processutils
from heat.openstack.common.py3kcompat import urlutils
from heat.openstack.common import test
@ -158,13 +158,13 @@ class BaseMigrationTestCase(test.BaseTestCase):
super(BaseMigrationTestCase, self).tearDown()
def execute_cmd(self, cmd=None):
out, err = processutils.trycmd(cmd, shell=True, discard_warnings=True)
output = out or err
process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
output = process.communicate()[0]
LOG.debug(output)
self.assertEqual('', err,
self.assertEqual(0, process.returncode,
"Failed to run: %s\n%s" % (cmd, output))
@_set_db_lock('pgadmin', 'tests-')
def _reset_pg(self, conn_pieces):
(user, password, database, host) = get_db_connection_info(conn_pieces)
os.environ['PGPASSWORD'] = password
@ -186,6 +186,7 @@ class BaseMigrationTestCase(test.BaseTestCase):
os.unsetenv('PGPASSWORD')
os.unsetenv('PGUSER')
@_set_db_lock(lock_prefix='migration_tests-')
def _reset_databases(self):
for key, engine in self.engines.items():
conn_string = self.test_databases[key]