Merge "Sync common db code from Oslo"

This commit is contained in:
Jenkins 2014-03-29 00:15:54 +00:00 committed by Gerrit Code Review
commit 11b1a7ffb7
7 changed files with 160 additions and 123 deletions

View File

@ -40,9 +40,12 @@ database_opts = [
cfg.DeprecatedOpt('connection',
group='sql'), ]),
cfg.StrOpt('mysql_sql_mode',
help='The SQL mode to be used for MySQL sessions '
'(default is empty, meaning do not override '
'any server-side SQL mode setting)'),
default='TRADITIONAL',
help='The SQL mode to be used for MySQL sessions. '
'This option, including the default, overrides any '
'server-set SQL mode. To use whatever SQL mode '
'is set by the server configuration, '
'set this to no value. Example: mysql_sql_mode='),
cfg.IntOpt('idle_timeout',
default=3600,
deprecated_opts=[cfg.DeprecatedOpt('sql_idle_timeout',

View File

@ -168,7 +168,7 @@ def patch_migrate():
sqlite.SQLiteConstraintGenerator)
def db_sync(engine, abs_path, version=None, init_version=0):
def db_sync(engine, abs_path, version=None, init_version=0, sanity_check=True):
"""Upgrade or downgrade a database.
Function runs the upgrade() or downgrade() functions in change scripts.
@ -179,7 +179,9 @@ def db_sync(engine, abs_path, version=None, init_version=0):
If None - database will update to the latest
available version.
:param init_version: Initial database version
:param sanity_check: Require schema sanity checking for all tables
"""
if version is not None:
try:
version = int(version)
@ -189,7 +191,8 @@ def db_sync(engine, abs_path, version=None, init_version=0):
current_version = db_version(engine, abs_path, init_version)
repository = _find_migrate_repo(abs_path)
_db_schema_sanity_check(engine)
if sanity_check:
_db_schema_sanity_check(engine)
if version is None or version > current_version:
return versioning_api.upgrade(engine, repository, version)
else:

View File

@ -29,7 +29,7 @@ from sqlalchemy.orm import object_mapper
from glance.openstack.common import timeutils
class ModelBase(object):
class ModelBase(six.Iterator):
"""Base class for models."""
__table_initialized__ = False
@ -78,10 +78,14 @@ class ModelBase(object):
self._i = iter(columns)
return self
def next(self):
# In Python 3, __next__() has replaced next().
def __next__(self):
n = six.advance_iterator(self._i)
return n, getattr(self, n)
def next(self):
return self.__next__()
def update(self, values):
"""Make the model object behave like a dict."""
for k, v in six.iteritems(values):

View File

@ -16,6 +16,7 @@
"""Provision test environment for specific DB backends"""
import argparse
import logging
import os
import random
import string
@ -26,23 +27,12 @@ import sqlalchemy
from glance.openstack.common.db import exception as exc
SQL_CONNECTION = os.getenv('OS_TEST_DBAPI_ADMIN_CONNECTION', 'sqlite://')
LOG = logging.getLogger(__name__)
def _gen_credentials(*names):
"""Generate credentials."""
auth_dict = {}
for name in names:
val = ''.join(random.choice(string.ascii_lowercase)
for i in moves.range(10))
auth_dict[name] = val
return auth_dict
def _get_engine(uri=SQL_CONNECTION):
def get_engine(uri):
"""Engine creation
By default the uri is SQL_CONNECTION which is admin credentials.
Call the function without arguments to get admin connection. Admin
connection required to create temporary user and database for each
particular test. Otherwise use existing connection to recreate connection
@ -62,50 +52,43 @@ def _execute_sql(engine, sql, driver):
except sqlalchemy.exc.OperationalError:
msg = ('%s does not match database admin '
'credentials or database does not exist.')
raise exc.DBConnectionError(msg % SQL_CONNECTION)
LOG.exception(msg % engine.url)
raise exc.DBConnectionError(msg % engine.url)
def create_database(engine):
"""Provide temporary user and database for each particular test."""
driver = engine.name
auth = _gen_credentials('database', 'user', 'passwd')
sqls = {
'mysql': [
"drop database if exists %(database)s;",
"grant all on %(database)s.* to '%(user)s'@'localhost'"
" identified by '%(passwd)s';",
"create database %(database)s;",
],
'postgresql': [
"drop database if exists %(database)s;",
"drop user if exists %(user)s;",
"create user %(user)s with password '%(passwd)s';",
"create database %(database)s owner %(user)s;",
]
auth = {
'database': ''.join(random.choice(string.ascii_lowercase)
for i in moves.range(10)),
'user': engine.url.username,
'passwd': engine.url.password,
}
sqls = [
"drop database if exists %(database)s;",
"create database %(database)s;"
]
if driver == 'sqlite':
return 'sqlite:////tmp/%s' % auth['database']
try:
sql_rows = sqls[driver]
except KeyError:
elif driver in ['mysql', 'postgresql']:
sql_query = map(lambda x: x % auth, sqls)
_execute_sql(engine, sql_query, driver)
else:
raise ValueError('Unsupported RDBMS %s' % driver)
sql_query = map(lambda x: x % auth, sql_rows)
_execute_sql(engine, sql_query, driver)
params = auth.copy()
params['backend'] = driver
return "%(backend)s://%(user)s:%(passwd)s@localhost/%(database)s" % params
def drop_database(engine, current_uri):
def drop_database(admin_engine, current_uri):
"""Drop temporary database and user after each particular test."""
engine = _get_engine(current_uri)
admin_engine = _get_engine()
engine = get_engine(current_uri)
driver = engine.name
auth = {'database': engine.url.database, 'user': engine.url.username}
@ -114,26 +97,11 @@ def drop_database(engine, current_uri):
os.remove(auth['database'])
except OSError:
pass
return
sqls = {
'mysql': [
"drop database if exists %(database)s;",
"drop user '%(user)s'@'localhost';",
],
'postgresql': [
"drop database if exists %(database)s;",
"drop user if exists %(user)s;",
]
}
try:
sql_rows = sqls[driver]
except KeyError:
elif driver in ['mysql', 'postgresql']:
sql = "drop database if exists %(database)s;"
_execute_sql(admin_engine, [sql % auth], driver)
else:
raise ValueError('Unsupported RDBMS %s' % driver)
sql_query = map(lambda x: x % auth, sql_rows)
_execute_sql(admin_engine, sql_query, driver)
def main():
@ -172,7 +140,9 @@ def main():
args = parser.parse_args()
engine = _get_engine()
connection_string = os.getenv('OS_TEST_DBAPI_ADMIN_CONNECTION',
'sqlite://')
engine = get_engine(connection_string)
which = args.which
if which == "create":

View File

@ -291,7 +291,7 @@ from sqlalchemy.pool import NullPool, StaticPool
from sqlalchemy.sql.expression import literal_column
from glance.openstack.common.db import exception
from glance.openstack.common.gettextutils import _LE, _LW, _LI
from glance.openstack.common.gettextutils import _LE, _LW
from glance.openstack.common import timeutils
@ -428,13 +428,14 @@ def _raise_if_deadlock_error(operational_error, engine_name):
def _wrap_db_error(f):
#TODO(rpodolyaka): in a subsequent commit make this a class decorator to
# ensure it can only applied to Session subclasses instances (as we use
# Session instance bind attribute below)
@functools.wraps(f)
def _wrap(self, *args, **kwargs):
try:
assert issubclass(
self.__class__, sqlalchemy.orm.session.Session
), ('_wrap_db_error() can only be applied to methods of '
'subclasses of sqlalchemy.orm.session.Session.')
return f(self, *args, **kwargs)
except UnicodeEncodeError:
raise exception.DBInvalidUnicodeParameter()
@ -504,25 +505,20 @@ def _ping_listener(engine, dbapi_conn, connection_rec, connection_proxy):
if engine.dialect.is_disconnect(ex, dbapi_conn, cursor):
msg = _LW('Database server has gone away: %s') % ex
LOG.warning(msg)
# if the database server has gone away, all connections in the pool
# have become invalid and we can safely close all of them here,
# rather than waste time on checking of every single connection
engine.dispose()
# this will be handled by SQLAlchemy and will force it to create
# a new connection and retry the original action
raise sqla_exc.DisconnectionError(msg)
else:
raise
def _set_mode_traditional(dbapi_con, connection_rec, connection_proxy):
"""Set engine mode to 'traditional'.
Required to prevent silent truncates at insert or update operations
under MySQL. By default MySQL truncates inserted string if it longer
than a declared field just with warning. That is fraught with data
corruption.
"""
_set_session_sql_mode(dbapi_con, connection_rec,
connection_proxy, 'TRADITIONAL')
def _set_session_sql_mode(dbapi_con, connection_rec,
connection_proxy, sql_mode=None):
def _set_session_sql_mode(dbapi_con, connection_rec, sql_mode=None):
"""Set the sql_mode session variable.
MySQL supports several server modes. The default is None, but sessions
@ -531,30 +527,54 @@ def _set_session_sql_mode(dbapi_con, connection_rec,
Note: passing in '' (empty string) for sql_mode clears
the SQL mode for the session, overriding a potentially set
server default. Passing in None (the default) makes this
a no-op, meaning if a server-side SQL mode is set, it still applies.
server default.
"""
cursor = dbapi_con.cursor()
if sql_mode is not None:
cursor.execute("SET SESSION sql_mode = %s", [sql_mode])
# Check against the real effective SQL mode. Even when unset by
cursor = dbapi_con.cursor()
cursor.execute("SET SESSION sql_mode = %s", [sql_mode])
def _mysql_get_effective_sql_mode(engine):
"""Returns the effective SQL mode for connections from the engine pool.
Returns ``None`` if the mode isn't available, otherwise returns the mode.
"""
# Get the real effective SQL mode. Even when unset by
# our own config, the server may still be operating in a specific
# SQL mode as set by the server configuration
cursor.execute("SHOW VARIABLES LIKE 'sql_mode'")
row = cursor.fetchone()
# SQL mode as set by the server configuration.
# Also note that the checkout listener will be called on execute to
# set the mode if it's registered.
row = engine.execute("SHOW VARIABLES LIKE 'sql_mode'").fetchone()
if row is None:
return
return row[1]
def _mysql_check_effective_sql_mode(engine):
"""Logs a message based on the effective SQL mode for MySQL connections."""
realmode = _mysql_get_effective_sql_mode(engine)
if realmode is None:
LOG.warning(_LW('Unable to detect effective SQL mode'))
return
realmode = row[1]
LOG.info(_LI('MySQL server mode set to %s') % realmode)
LOG.debug('MySQL server mode set to %s', realmode)
# 'TRADITIONAL' mode enables several other modes, so
# we need a substring match here
if not ('TRADITIONAL' in realmode.upper() or
'STRICT_ALL_TABLES' in realmode.upper()):
LOG.warning(_LW("MySQL SQL mode is '%s', "
"consider enabling TRADITIONAL or STRICT_ALL_TABLES")
% realmode)
"consider enabling TRADITIONAL or STRICT_ALL_TABLES"),
realmode)
def _mysql_set_mode_callback(engine, sql_mode):
if sql_mode is not None:
mode_callback = functools.partial(_set_session_sql_mode,
sql_mode=sql_mode)
sqlalchemy.event.listen(engine, 'connect', mode_callback)
_mysql_check_effective_sql_mode(engine)
def _is_db_connection_error(args):
@ -582,7 +602,7 @@ def _raise_if_db_connection_lost(error, engine):
def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None,
mysql_traditional_mode=False, idle_timeout=3600,
idle_timeout=3600,
connection_debug=0, max_pool_size=None, max_overflow=None,
pool_timeout=None, sqlite_synchronous=True,
connection_trace=False, max_retries=10, retry_interval=10):
@ -629,12 +649,8 @@ def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None,
ping_callback = functools.partial(_ping_listener, engine)
sqlalchemy.event.listen(engine, 'checkout', ping_callback)
if engine.name == 'mysql':
if mysql_traditional_mode:
mysql_sql_mode = 'TRADITIONAL'
if mysql_sql_mode:
mode_callback = functools.partial(_set_session_sql_mode,
sql_mode=mysql_sql_mode)
sqlalchemy.event.listen(engine, 'checkout', mode_callback)
_mysql_set_mode_callback(engine, mysql_sql_mode)
elif 'sqlite' in connection_dict.drivername:
if not sqlite_synchronous:
sqlalchemy.event.listen(engine, 'connect',
@ -765,25 +781,23 @@ class EngineFacade(object):
integrate engine/sessionmaker instances into your apps any way you like
(e.g. one might want to bind a session to a request context). Two important
things to remember:
1. An Engine instance is effectively a pool of DB connections, so it's
meant to be shared (and it's thread-safe).
2. A Session instance is not meant to be shared and represents a DB
transactional context (i.e. it's not thread-safe). sessionmaker is
a factory of sessions.
1. An Engine instance is effectively a pool of DB connections, so it's
meant to be shared (and it's thread-safe).
2. A Session instance is not meant to be shared and represents a DB
transactional context (i.e. it's not thread-safe). sessionmaker is
a factory of sessions.
"""
def __init__(self, sql_connection,
sqlite_fk=False, mysql_sql_mode=None,
autocommit=True, expire_on_commit=False, **kwargs):
sqlite_fk=False, autocommit=True,
expire_on_commit=False, **kwargs):
"""Initialize engine and sessionmaker instances.
:param sqlite_fk: enable foreign keys in SQLite
:type sqlite_fk: bool
:param mysql_sql_mode: set SQL mode in MySQL
:type mysql_sql_mode: string
:param autocommit: use autocommit mode for created Session instances
:type autocommit: bool
@ -792,6 +806,8 @@ class EngineFacade(object):
Keyword arguments:
:keyword mysql_sql_mode: the SQL mode to be used for MySQL sessions.
(defaults to TRADITIONAL)
:keyword idle_timeout: timeout before idle sql connections are reaped
(defaults to 3600)
:keyword connection_debug: verbosity of SQL debugging information.
@ -819,7 +835,7 @@ class EngineFacade(object):
self._engine = create_engine(
sql_connection=sql_connection,
sqlite_fk=sqlite_fk,
mysql_sql_mode=mysql_sql_mode,
mysql_sql_mode=kwargs.get('mysql_sql_mode', 'TRADITIONAL'),
idle_timeout=kwargs.get('idle_timeout', 3600),
connection_debug=kwargs.get('connection_debug', 0),
max_pool_size=kwargs.get('max_pool_size'),
@ -858,3 +874,31 @@ class EngineFacade(object):
del kwargs[arg]
return self._session_maker(**kwargs)
@classmethod
def from_config(cls, connection_string, conf,
sqlite_fk=False, autocommit=True, expire_on_commit=False):
"""Initialize EngineFacade using oslo.config config instance options.
:param connection_string: SQLAlchemy connection string
:type connection_string: string
:param conf: oslo.config config instance
:type conf: oslo.config.cfg.ConfigOpts
:param sqlite_fk: enable foreign keys in SQLite
:type sqlite_fk: bool
:param autocommit: use autocommit mode for created Session instances
:type autocommit: bool
:param expire_on_commit: expire session objects on commit
:type expire_on_commit: bool
"""
return cls(sql_connection=connection_string,
sqlite_fk=sqlite_fk,
autocommit=autocommit,
expire_on_commit=expire_on_commit,
**dict(conf.database.items()))

View File

@ -22,6 +22,7 @@ import six
from glance.openstack.common.db.sqlalchemy import session
from glance.openstack.common.db.sqlalchemy import utils
from glance.openstack.common.fixture import lockutils
from glance.openstack.common import test
@ -120,6 +121,9 @@ class OpportunisticTestCase(DbTestCase):
FIXTURE = abc.abstractproperty(lambda: None)
def setUp(self):
# TODO(bnemec): Remove this once infra is ready for
# https://review.openstack.org/#/c/74963/ to merge.
self.useFixture(lockutils.LockFixture('opportunistic-db'))
credentials = {
'backend': self.FIXTURE.DRIVER,
'user': self.FIXTURE.USERNAME,

View File

@ -19,7 +19,6 @@
import logging
import re
from migrate.changeset import UniqueConstraint
import sqlalchemy
from sqlalchemy import Boolean
from sqlalchemy import CheckConstraint
@ -33,7 +32,6 @@ from sqlalchemy import MetaData
from sqlalchemy import or_
from sqlalchemy.sql.expression import literal_column
from sqlalchemy.sql.expression import UpdateBase
from sqlalchemy.sql import select
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy.types import NullType
@ -218,6 +216,9 @@ def model_query(context, model, session, args=None, project_only=False,
:type read_deleted: bool
Usage:
..code:: python
result = (utils.model_query(context, models.Instance, session=session)
.filter_by(uuid=instance_uuid)
.all())
@ -226,7 +227,8 @@ def model_query(context, model, session, args=None, project_only=False,
context, Node,
session=session,
args=(func.count(Node.id), func.sum(Node.ram))
).filter_by(project_id=project_id)
).filter_by(project_id=project_id)
"""
if not read_deleted:
@ -298,6 +300,10 @@ def drop_unique_constraint(migrate_engine, table_name, uc_name, *columns,
**col_name_col_instance):
"""Drop unique constraint from table.
DEPRECATED: this function is deprecated and will be removed from glance.db
in a few releases. Please use UniqueConstraint.drop() method directly for
sqlalchemy-migrate migration scripts.
This method drops UC from table and works for mysql, postgresql and sqlite.
In mysql and postgresql we are able to use "alter table" construction.
Sqlalchemy doesn't support some sqlite column types and replaces their
@ -314,6 +320,8 @@ def drop_unique_constraint(migrate_engine, table_name, uc_name, *columns,
types by sqlite. For example BigInteger.
"""
from migrate.changeset import UniqueConstraint
meta = MetaData()
meta.bind = migrate_engine
t = Table(table_name, meta, autoload=True)
@ -353,9 +361,9 @@ def drop_old_duplicate_entries_from_table(migrate_engine, table_name,
columns_for_select = [func.max(table.c.id)]
columns_for_select.extend(columns_for_group_by)
duplicated_rows_select = select(columns_for_select,
group_by=columns_for_group_by,
having=func.count(table.c.id) > 1)
duplicated_rows_select = sqlalchemy.sql.select(
columns_for_select, group_by=columns_for_group_by,
having=func.count(table.c.id) > 1)
for row in migrate_engine.execute(duplicated_rows_select):
# NOTE(boris-42): Do not remove row that has the biggest ID.
@ -365,7 +373,8 @@ def drop_old_duplicate_entries_from_table(migrate_engine, table_name,
for name in uc_column_names:
delete_condition &= table.c[name] == row[name]
rows_to_delete_select = select([table.c.id]).where(delete_condition)
rows_to_delete_select = sqlalchemy.sql.select(
[table.c.id]).where(delete_condition)
for row in migrate_engine.execute(rows_to_delete_select).fetchall():
LOG.info(_LI("Deleting duplicated row with id: %(id)s from table: "
"%(table)s") % dict(id=row[0], table=table_name))
@ -476,7 +485,7 @@ def _change_deleted_column_type_to_boolean_sqlite(migrate_engine, table_name,
else:
c_select.append(table.c.deleted == table.c.id)
ins = InsertFromSelect(new_table, select(c_select))
ins = InsertFromSelect(new_table, sqlalchemy.sql.select(c_select))
migrate_engine.execute(ins)
table.drop()