diff --git a/glance/cmd/manage.py b/glance/cmd/manage.py
index f0ecbaf30d..6213c44e88 100755
--- a/glance/cmd/manage.py
+++ b/glance/cmd/manage.py
@@ -43,12 +43,13 @@ from oslo.config import cfg
 from glance.common import config
 from glance.common import exception
 from glance.db import migration as db_migration
+from glance.db.sqlalchemy import api as db_api
 from glance.openstack.common.db.sqlalchemy import migration
 from glance.openstack.common import log
 from glance.openstack.common import strutils

 CONF = cfg.CONF
-CONF.import_group("database", "glance.openstack.common.db.sqlalchemy.session")
+CONF.import_group("database", "glance.openstack.common.db.options")


 # Decorators for actions
@@ -67,23 +68,30 @@ class DbCommands(object):
     def version(self):
         """Print database's current migration level"""
-        print(migration.db_version(db_migration.MIGRATE_REPO_PATH,
+        print(migration.db_version(db_api.get_engine(),
+                                   db_migration.MIGRATE_REPO_PATH,
                                    db_migration.INIT_VERSION))

     @args('--version', metavar='<version>', help='Database version')
     def upgrade(self, version=None):
         """Upgrade the database's migration level"""
-        migration.db_sync(db_migration.MIGRATE_REPO_PATH, version)
+        migration.db_sync(db_api.get_engine(),
+                          db_migration.MIGRATE_REPO_PATH,
+                          version)

     @args('--version', metavar='<version>', help='Database version')
     def downgrade(self, version=None):
         """Downgrade the database's migration level"""
-        migration.db_sync(db_migration.MIGRATE_REPO_PATH, version)
+        migration.db_sync(db_api.get_engine(),
+                          db_migration.MIGRATE_REPO_PATH,
+                          version)

     @args('--version', metavar='<version>', help='Database version')
     def version_control(self, version=None):
         """Place a database under migration control"""
-        migration.db_version_control(db_migration.MIGRATE_REPO_PATH, version)
+        migration.db_version_control(db_api.get_engine(),
+                                     db_migration.MIGRATE_REPO_PATH,
+                                     version)

     @args('--version', metavar='<version>', help='Database version')
     @args('--current_version', metavar='<version>',
@@ -94,9 +102,12 @@ class DbCommands(object):
         creating first if necessary.
""" if current_version is not None: - migration.db_version_control(db_migration.MIGRATE_REPO_PATH, + migration.db_version_control(db_api.get_engine(), + db_migration.MIGRATE_REPO_PATH, current_version) - migration.db_sync(db_migration.MIGRATE_REPO_PATH, version) + migration.db_sync(db_api.get_engine(), + db_migration.MIGRATE_REPO_PATH, + version) class DbLegacyCommands(object): diff --git a/glance/db/migration.py b/glance/db/migration.py index 2d30b90d7b..ba1d8bc21f 100644 --- a/glance/db/migration.py +++ b/glance/db/migration.py @@ -22,14 +22,14 @@ import os from glance.common import utils - +from glance.db.sqlalchemy import api as db_api IMPL = utils.LazyPluggable( 'backend', config_group='database', sqlalchemy='glance.openstack.common.db.sqlalchemy.migration') -INIT_VERSION = 000 +INIT_VERSION = 0 MIGRATE_REPO_PATH = os.path.join( os.path.abspath(os.path.dirname(__file__)), @@ -38,12 +38,9 @@ MIGRATE_REPO_PATH = os.path.join( ) -def db_sync(version=None): +def db_sync(version=None, init_version=0): """Migrate the database to `version` or the most recent version.""" - return IMPL.db_sync(abs_path=MIGRATE_REPO_PATH, version=version) - - -def db_version(): - """Display the current database version.""" - return IMPL.db_version(abs_path=MIGRATE_REPO_PATH, - init_version=INIT_VERSION) + return IMPL.db_sync(engine=db_api.get_engine(), + abs_path=MIGRATE_REPO_PATH, + version=version, + init_version=init_version) diff --git a/glance/db/sqlalchemy/api.py b/glance/db/sqlalchemy/api.py index 8e2b1db27c..2927a64392 100644 --- a/glance/db/sqlalchemy/api.py +++ b/glance/db/sqlalchemy/api.py @@ -44,13 +44,39 @@ STATUSES = ['active', 'saving', 'queued', 'killed', 'pending_delete', CONF = cfg.CONF CONF.import_opt('debug', 'glance.openstack.common.log') +CONF.import_opt('connection', 'glance.openstack.common.db.options', + group='database') + + +_FACADE = None + + +def _create_facade_lazily(): + global _FACADE + if _FACADE is None: + _FACADE = session.EngineFacade( + CONF.database.connection, + **dict(CONF.database.iteritems())) + return _FACADE + + +def get_engine(): + facade = _create_facade_lazily() + return facade.get_engine() + + +def get_session(autocommit=True, expire_on_commit=False): + facade = _create_facade_lazily() + return facade.get_session(autocommit=autocommit, + expire_on_commit=expire_on_commit) def clear_db_env(): """ Unset global configuration variables for database. 
""" - session.cleanup() + global _FACADE + _FACADE = None def _check_mutate_authorization(context, image_ref): @@ -65,10 +91,6 @@ def _check_mutate_authorization(context, image_ref): raise exc_class(msg) -_get_session = session.get_session -get_engine = session.get_engine - - def image_create(context, values): """Create an image from the values dictionary.""" return _image_update(context, values, None, purge_props=False) @@ -87,7 +109,7 @@ def image_update(context, image_id, values, purge_props=False, def image_destroy(context, image_id): """Destroy the image or raise if it does not exist.""" - session = _get_session() + session = get_session() with session.begin(): image_ref = _image_get(context, image_id, session=session) @@ -141,7 +163,7 @@ def _check_image_id(image_id): def _image_get(context, image_id, session=None, force_show_deleted=False): """Get an image or raise if it does not exist.""" _check_image_id(image_id) - session = session or _get_session() + session = session or get_session() try: query = session.query(models.Image)\ @@ -411,7 +433,7 @@ def _make_image_property_condition(key, value): def _select_images_query(context, image_conditions, admin_as_user, member_status, visibility): - session = _get_session() + session = get_session() img_conditional_clause = sa_sql.and_(*image_conditions) @@ -593,7 +615,7 @@ def _image_update(context, values, image_id, purge_props=False, #NOTE(jbresnah) values is altered in this so a copy is needed values = values.copy() - session = _get_session() + session = get_session() with session.begin(): # Remove the properties passed in the values mapping. We @@ -767,7 +789,7 @@ def _image_child_entry_delete_all(child_model_cls, image_id, delete_time=None, :rtype: int :return: The number of child entries got soft-deleted. """ - session = session or _get_session() + session = session or get_session() query = session.query(child_model_cls) \ .filter_by(image_id=image_id) \ @@ -801,7 +823,7 @@ def image_property_delete(context, prop_ref, image_ref, session=None): """ Used internally by image_property_create and image_property_update. 
""" - session = session or _get_session() + session = session or get_session() prop = session.query(models.ImageProperty).filter_by(image_id=image_ref, name=prop_ref).one() prop.delete(session=session) @@ -840,7 +862,7 @@ def _image_member_format(member_ref): def image_member_update(context, memb_id, values): """Update an ImageMember object.""" - session = _get_session() + session = get_session() memb_ref = _image_member_get(context, memb_id, session) _image_member_update(context, memb_ref, values, session) return _image_member_format(memb_ref) @@ -858,7 +880,7 @@ def _image_member_update(context, memb_ref, values, session=None): def image_member_delete(context, memb_id, session=None): """Delete an ImageMember object.""" - session = session or _get_session() + session = session or get_session() member_ref = _image_member_get(context, memb_id, session) _image_member_delete(context, member_ref, session) @@ -890,7 +912,7 @@ def image_member_find(context, image_id=None, member=None, status=None): :param image_id: identifier of image entity :param member: tenant to which membership has been granted """ - session = _get_session() + session = get_session() members = _image_member_find(context, session, image_id, member, status) return [_image_member_format(m) for m in members] @@ -923,7 +945,7 @@ def image_member_count(context, image_id): :param image_id: identifier of image entity """ - session = _get_session() + session = get_session() if not image_id: msg = _("Image id is required.") @@ -950,7 +972,7 @@ def _can_show_deleted(context): def image_tag_set_all(context, image_id, tags): - session = _get_session() + session = get_session() existing_tags = set(image_tag_get_all(context, image_id, session)) tags = set(tags) @@ -968,7 +990,7 @@ def image_tag_set_all(context, image_id, tags): def image_tag_create(context, image_id, value, session=None): """Create an image tag.""" - session = session or _get_session() + session = session or get_session() tag_ref = models.ImageTag(image_id=image_id, value=value) tag_ref.save(session=session) return tag_ref['value'] @@ -977,7 +999,7 @@ def image_tag_create(context, image_id, value, session=None): def image_tag_delete(context, image_id, value, session=None): """Delete an image tag.""" _check_image_id(image_id) - session = session or _get_session() + session = session or get_session() query = session.query(models.ImageTag)\ .filter_by(image_id=image_id)\ .filter_by(value=value)\ @@ -1002,7 +1024,7 @@ def _image_tag_delete_all(context, image_id, delete_time=None, session=None): def image_tag_get_all(context, image_id, session=None): """Get a list of tags for a specific image.""" _check_image_id(image_id) - session = session or _get_session() + session = session or get_session() tags = session.query(models.ImageTag)\ .filter_by(image_id=image_id)\ .filter_by(deleted=False)\ @@ -1013,7 +1035,7 @@ def image_tag_get_all(context, image_id, session=None): def user_get_storage_usage(context, owner_id, image_id=None, session=None): _check_image_id(image_id) - session = session or _get_session() + session = session or get_session() total_size = _image_get_disk_usage_by_owner( owner_id, session, image_id=image_id) return total_size @@ -1033,7 +1055,7 @@ def _task_info_format(task_info_ref): def _task_info_create(context, task_id, values, session=None): """Create an TaskInfo object""" - session = session or _get_session() + session = session or get_session() task_info_ref = models.TaskInfo() task_info_ref.task_id = task_id task_info_ref.update(values) @@ -1043,7 +1065,7 
@@ def _task_info_create(context, task_id, values, session=None): def _task_info_update(context, task_id, values, session=None): """Update an TaskInfo object""" - session = session or _get_session() + session = session or get_session() task_info_ref = _task_info_get(context, task_id, session=session) if task_info_ref: task_info_ref.update(values) @@ -1053,7 +1075,7 @@ def _task_info_update(context, task_id, values, session=None): def _task_info_get(context, task_id, session=None): """Fetch an TaskInfo entity by task_id""" - session = session or _get_session() + session = session or get_session() query = session.query(models.TaskInfo) query = query.filter_by(task_id=task_id) try: @@ -1071,7 +1093,7 @@ def task_create(context, values, session=None): """Create a task object""" values = values.copy() - session = session or _get_session() + session = session or get_session() with session.begin(): task_info_values = _pop_task_info_values(values) @@ -1099,7 +1121,7 @@ def _pop_task_info_values(values): def task_update(context, task_id, values, session=None): """Update a task object""" - session = session or _get_session() + session = session or get_session() with session.begin(): task_info_values = _pop_task_info_values(values) @@ -1129,7 +1151,7 @@ def task_get(context, task_id, session=None, force_show_deleted=False): def task_delete(context, task_id, session=None): """Delete a task""" - session = session or _get_session() + session = session or get_session() task_ref = _task_get(context, task_id, session=session) task_ref.delete(session=session) return _task_format(task_ref, task_ref.info) @@ -1152,7 +1174,7 @@ def task_get_all(context, filters=None, marker=None, limit=None, """ filters = filters or {} - session = _get_session() + session = get_session() query = session.query(models.Task) if not (context.is_admin or admin_as_user == True) and \ @@ -1215,7 +1237,7 @@ def _is_task_visible(context, task): def _task_get(context, task_id, session=None, force_show_deleted=False): """Fetch a task entity by id""" - session = session or _get_session() + session = session or get_session() query = session.query(models.Task).options( sa_orm.joinedload(models.Task.info) ).filter_by(id=task_id) diff --git a/glance/db/sqlalchemy/models.py b/glance/db/sqlalchemy/models.py index e135330d0e..232abec2de 100644 --- a/glance/db/sqlalchemy/models.py +++ b/glance/db/sqlalchemy/models.py @@ -72,6 +72,10 @@ class GlanceBase(models.ModelBase, models.TimestampMixin): __protected_attributes__ = set([ "created_at", "updated_at", "deleted_at", "deleted"]) + def save(self, session=None): + from glance.db.sqlalchemy import api as db_api + super(GlanceBase, self).save(session or db_api.get_session()) + created_at = Column(DateTime, default=timeutils.utcnow, nullable=False) # TODO(vsergeyev): Column `updated_at` have no default value in diff --git a/glance/openstack/common/__init__.py b/glance/openstack/common/__init__.py index e69de29bb2..2a00f3bc49 100644 --- a/glance/openstack/common/__init__.py +++ b/glance/openstack/common/__init__.py @@ -0,0 +1,2 @@ +import six +six.add_move(six.MovedModule('mox', 'mox', 'mox3.mox')) diff --git a/glance/openstack/common/db/api.py b/glance/openstack/common/db/api.py index c16d0075f3..0a5cef3a69 100644 --- a/glance/openstack/common/db/api.py +++ b/glance/openstack/common/db/api.py @@ -1,5 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright (c) 2013 Rackspace Hosting # All Rights Reserved. # @@ -17,90 +15,148 @@ """Multiple DB API backend support. 
-Supported configuration options:
-
-The following two parameters are in the 'database' group:
-`backend`: DB backend name or full module path to DB backend module.
-`use_tpool`: Enable thread pooling of DB API calls.
-
 A DB backend module should implement a method named 'get_backend' which
 takes no arguments. The method can return any object that implements DB
 API methods.
-
-*NOTE*: There are bugs in eventlet when using tpool combined with
-threading locks. The python logging module happens to use such locks. To
-work around this issue, be sure to specify thread=False with
-eventlet.monkey_patch().
-
-A bug for eventlet has been filed here:
-
-https://bitbucket.org/eventlet/eventlet/issue/137/
 """

+import functools
+import logging
+import threading
+import time

-from oslo.config import cfg
-
+from glance.openstack.common.db import exception
+from glance.openstack.common.gettextutils import _LE
 from glance.openstack.common import importutils
-from glance.openstack.common import lockutils


-db_opts = [
-    cfg.StrOpt('backend',
-               default='sqlalchemy',
-               deprecated_name='db_backend',
-               deprecated_group='DEFAULT',
-               help='The backend to use for db'),
-    cfg.BoolOpt('use_tpool',
-                default=False,
-                deprecated_name='dbapi_use_tpool',
-                deprecated_group='DEFAULT',
-                help='Enable the experimental use of thread pooling for '
-                     'all DB API calls')
-]
+LOG = logging.getLogger(__name__)

-CONF = cfg.CONF
-CONF.register_opts(db_opts, 'database')
+
+def safe_for_db_retry(f):
+    """Enable db-retry for decorated function, if config option enabled."""
+    f.__dict__['enable_retry'] = True
+    return f
+
+
+class wrap_db_retry(object):
+    """Retry db.api methods if DBConnectionError() is raised.
+
+    Retry decorated db.api methods. If `use_db_reconnect` is enabled in
+    config, this decorator is applied to all db.api functions marked
+    with the @safe_for_db_retry decorator.
+    The decorator catches DBConnectionError() and retries the function
+    in a loop until it succeeds, or until the maximum retry count is
+    reached.
+    """
+
+    def __init__(self, retry_interval, max_retries, inc_retry_interval,
+                 max_retry_interval):
+        super(wrap_db_retry, self).__init__()
+
+        self.retry_interval = retry_interval
+        self.max_retries = max_retries
+        self.inc_retry_interval = inc_retry_interval
+        self.max_retry_interval = max_retry_interval
+
+    def __call__(self, f):
+        @functools.wraps(f)
+        def wrapper(*args, **kwargs):
+            next_interval = self.retry_interval
+            remaining = self.max_retries
+
+            while True:
+                try:
+                    return f(*args, **kwargs)
+                except exception.DBConnectionError as e:
+                    if remaining == 0:
+                        LOG.exception(_LE('DB exceeded retry limit.'))
+                        raise exception.DBError(e)
+                    if remaining != -1:
+                        remaining -= 1
+                    LOG.exception(_LE('DB connection error.'))
+                    # NOTE(vsergeyev): We are using patched time module, so
+                    #                  this effectively yields the execution
+                    #                  context to another green thread.
+                    time.sleep(next_interval)
+                    if self.inc_retry_interval:
+                        next_interval = min(
+                            next_interval * 2,
+                            self.max_retry_interval
+                        )
+        return wrapper


 class DBAPI(object):
-    def __init__(self, backend_mapping=None):
-        if backend_mapping is None:
-            backend_mapping = {}
-        self.__backend = None
-        self.__backend_mapping = backend_mapping
+    def __init__(self, backend_name, backend_mapping=None, lazy=False,
+                 **kwargs):
+        """Initialize the chosen DB API backend.
+ + :param backend_name: name of the backend to load + :type backend_name: str + + :param backend_mapping: backend name -> module/class to load mapping + :type backend_mapping: dict + + :param lazy: load the DB backend lazily on the first DB API method call + :type lazy: bool + + Keyword arguments: + + :keyword use_db_reconnect: retry DB transactions on disconnect or not + :type use_db_reconnect: bool + + :keyword retry_interval: seconds between transaction retries + :type retry_interval: int + + :keyword inc_retry_interval: increase retry interval or not + :type inc_retry_interval: bool + + :keyword max_retry_interval: max interval value between retries + :type max_retry_interval: int + + :keyword max_retries: max number of retries before an error is raised + :type max_retries: int - @lockutils.synchronized('dbapi_backend', 'glance-') - def __get_backend(self): - """Get the actual backend. May be a module or an instance of - a class. Doesn't matter to us. We do this synchronized as it's - possible multiple greenthreads started very quickly trying to do - DB calls and eventlet can switch threads before self.__backend gets - assigned. """ - if self.__backend: - # Another thread assigned it - return self.__backend - backend_name = CONF.database.backend - self.__use_tpool = CONF.database.use_tpool - if self.__use_tpool: - from eventlet import tpool - self.__tpool = tpool - # Import the untranslated name if we don't have a - # mapping. - backend_path = self.__backend_mapping.get(backend_name, - backend_name) - backend_mod = importutils.import_module(backend_path) - self.__backend = backend_mod.get_backend() - return self.__backend + + self._backend = None + self._backend_name = backend_name + self._backend_mapping = backend_mapping or {} + self._lock = threading.Lock() + + if not lazy: + self._load_backend() + + self.use_db_reconnect = kwargs.get('use_db_reconnect', False) + self.retry_interval = kwargs.get('retry_interval', 1) + self.inc_retry_interval = kwargs.get('inc_retry_interval', True) + self.max_retry_interval = kwargs.get('max_retry_interval', 10) + self.max_retries = kwargs.get('max_retries', 20) + + def _load_backend(self): + with self._lock: + if not self._backend: + # Import the untranslated name if we don't have a mapping + backend_path = self._backend_mapping.get(self._backend_name, + self._backend_name) + backend_mod = importutils.import_module(backend_path) + self._backend = backend_mod.get_backend() def __getattr__(self, key): - backend = self.__backend or self.__get_backend() - attr = getattr(backend, key) - if not self.__use_tpool or not hasattr(attr, '__call__'): + if not self._backend: + self._load_backend() + + attr = getattr(self._backend, key) + if not hasattr(attr, '__call__'): return attr + # NOTE(vsergeyev): If `use_db_reconnect` option is set to True, retry + # DB API methods, decorated with @safe_for_db_retry + # on disconnect. 
+ if self.use_db_reconnect and hasattr(attr, 'enable_retry'): + attr = wrap_db_retry( + retry_interval=self.retry_interval, + max_retries=self.max_retries, + inc_retry_interval=self.inc_retry_interval, + max_retry_interval=self.max_retry_interval)(attr) - def tpool_wrapper(*args, **kwargs): - return self.__tpool.execute(attr, *args, **kwargs) - - functools.update_wrapper(tpool_wrapper, attr) - return tpool_wrapper + return attr diff --git a/glance/openstack/common/db/exception.py b/glance/openstack/common/db/exception.py index 0ee9c17c57..e3792bd8f3 100644 --- a/glance/openstack/common/db/exception.py +++ b/glance/openstack/common/db/exception.py @@ -1,5 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright 2010 United States Government as represented by the # Administrator of the National Aeronautics and Space Administration. # All Rights Reserved. @@ -18,14 +16,16 @@ """DB related custom exceptions.""" -from glance.openstack.common.gettextutils import _ # noqa +import six + +from glance.openstack.common.gettextutils import _ class DBError(Exception): """Wraps an implementation specific exception.""" def __init__(self, inner_exception=None): self.inner_exception = inner_exception - super(DBError, self).__init__(str(inner_exception)) + super(DBError, self).__init__(six.text_type(inner_exception)) class DBDuplicateEntry(DBError): @@ -48,4 +48,9 @@ class DBInvalidUnicodeParameter(Exception): class DbMigrationError(DBError): """Wraps migration specific exception.""" def __init__(self, message=None): - super(DbMigrationError, self).__init__(str(message)) + super(DbMigrationError, self).__init__(message) + + +class DBConnectionError(DBError): + """Wraps connection specific exception.""" + pass diff --git a/glance/openstack/common/db/options.py b/glance/openstack/common/db/options.py new file mode 100644 index 0000000000..a8cafa3740 --- /dev/null +++ b/glance/openstack/common/db/options.py @@ -0,0 +1,168 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
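
Note: the reworked DBAPI is now driven entirely by the caller rather than by CONF. A sketch of how a project would wire it up (the backend mapping string is illustrative; the `use_db_reconnect`, `db_retry_interval`, and `db_max_retries` options assume the new options module below is registered):

    from glance.openstack.common.db import api as db_api

    _BACKEND_MAPPING = {'sqlalchemy': 'glance.db.sqlalchemy.api'}

    IMPL = db_api.DBAPI(CONF.database.backend,
                        backend_mapping=_BACKEND_MAPPING,
                        lazy=True,
                        use_db_reconnect=CONF.database.use_db_reconnect,
                        retry_interval=CONF.database.db_retry_interval,
                        max_retries=CONF.database.db_max_retries)

Only backend functions decorated with @safe_for_db_retry get wrapped; with the defaults the sleep doubles on each retry (1, 2, 4, 8, 10, 10, ... seconds, capped by max_retry_interval).
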
+ +import copy + +from oslo.config import cfg + + +database_opts = [ + cfg.StrOpt('sqlite_db', + deprecated_group='DEFAULT', + default='glance.sqlite', + help='The file name to use with SQLite'), + cfg.BoolOpt('sqlite_synchronous', + deprecated_group='DEFAULT', + default=True, + help='If True, SQLite uses synchronous mode'), + cfg.StrOpt('backend', + default='sqlalchemy', + deprecated_name='db_backend', + deprecated_group='DEFAULT', + help='The backend to use for db'), + cfg.StrOpt('connection', + help='The SQLAlchemy connection string used to connect to the ' + 'database', + secret=True, + deprecated_opts=[cfg.DeprecatedOpt('sql_connection', + group='DEFAULT'), + cfg.DeprecatedOpt('sql_connection', + group='DATABASE'), + cfg.DeprecatedOpt('connection', + group='sql'), ]), + cfg.StrOpt('mysql_sql_mode', + help='The SQL mode to be used for MySQL sessions ' + '(default is empty, meaning do not override ' + 'any server-side SQL mode setting)'), + cfg.IntOpt('idle_timeout', + default=3600, + deprecated_opts=[cfg.DeprecatedOpt('sql_idle_timeout', + group='DEFAULT'), + cfg.DeprecatedOpt('sql_idle_timeout', + group='DATABASE'), + cfg.DeprecatedOpt('idle_timeout', + group='sql')], + help='Timeout before idle sql connections are reaped'), + cfg.IntOpt('min_pool_size', + default=1, + deprecated_opts=[cfg.DeprecatedOpt('sql_min_pool_size', + group='DEFAULT'), + cfg.DeprecatedOpt('sql_min_pool_size', + group='DATABASE')], + help='Minimum number of SQL connections to keep open in a ' + 'pool'), + cfg.IntOpt('max_pool_size', + default=None, + deprecated_opts=[cfg.DeprecatedOpt('sql_max_pool_size', + group='DEFAULT'), + cfg.DeprecatedOpt('sql_max_pool_size', + group='DATABASE')], + help='Maximum number of SQL connections to keep open in a ' + 'pool'), + cfg.IntOpt('max_retries', + default=10, + deprecated_opts=[cfg.DeprecatedOpt('sql_max_retries', + group='DEFAULT'), + cfg.DeprecatedOpt('sql_max_retries', + group='DATABASE')], + help='Maximum db connection retries during startup. ' + '(setting -1 implies an infinite retry count)'), + cfg.IntOpt('retry_interval', + default=10, + deprecated_opts=[cfg.DeprecatedOpt('sql_retry_interval', + group='DEFAULT'), + cfg.DeprecatedOpt('reconnect_interval', + group='DATABASE')], + help='Interval between retries of opening a sql connection'), + cfg.IntOpt('max_overflow', + default=None, + deprecated_opts=[cfg.DeprecatedOpt('sql_max_overflow', + group='DEFAULT'), + cfg.DeprecatedOpt('sqlalchemy_max_overflow', + group='DATABASE')], + help='If set, use this value for max_overflow with sqlalchemy'), + cfg.IntOpt('connection_debug', + default=0, + deprecated_opts=[cfg.DeprecatedOpt('sql_connection_debug', + group='DEFAULT')], + help='Verbosity of SQL debugging information. 
0=None, ' + '100=Everything'), + cfg.BoolOpt('connection_trace', + default=False, + deprecated_opts=[cfg.DeprecatedOpt('sql_connection_trace', + group='DEFAULT')], + help='Add python stack traces to SQL as comment strings'), + cfg.IntOpt('pool_timeout', + default=None, + deprecated_opts=[cfg.DeprecatedOpt('sqlalchemy_pool_timeout', + group='DATABASE')], + help='If set, use this value for pool_timeout with sqlalchemy'), + cfg.BoolOpt('use_db_reconnect', + default=False, + help='Enable the experimental use of database reconnect ' + 'on connection lost'), + cfg.IntOpt('db_retry_interval', + default=1, + help='seconds between db connection retries'), + cfg.BoolOpt('db_inc_retry_interval', + default=True, + help='Whether to increase interval between db connection ' + 'retries, up to db_max_retry_interval'), + cfg.IntOpt('db_max_retry_interval', + default=10, + help='max seconds between db connection retries, if ' + 'db_inc_retry_interval is enabled'), + cfg.IntOpt('db_max_retries', + default=20, + help='maximum db connection retries before error is raised. ' + '(setting -1 implies an infinite retry count)'), +] + +CONF = cfg.CONF +CONF.register_opts(database_opts, 'database') + + +def set_defaults(sql_connection, sqlite_db, max_pool_size=None, + max_overflow=None, pool_timeout=None): + """Set defaults for configuration variables.""" + cfg.set_defaults(database_opts, + connection=sql_connection, + sqlite_db=sqlite_db) + # Update the QueuePool defaults + if max_pool_size is not None: + cfg.set_defaults(database_opts, + max_pool_size=max_pool_size) + if max_overflow is not None: + cfg.set_defaults(database_opts, + max_overflow=max_overflow) + if pool_timeout is not None: + cfg.set_defaults(database_opts, + pool_timeout=pool_timeout) + + +def list_opts(): + """Returns a list of oslo.config options available in the library. + + The returned list includes all oslo.config options which may be registered + at runtime by the library. + + Each element of the list is a tuple. The first element is the name of the + group under which the list of elements in the second element will be + registered. A group name of None corresponds to the [DEFAULT] group in + config files. + + The purpose of this is to allow tools like the Oslo sample config file + generator to discover the options exposed to users by this library. + + :returns: a list of (group_name, opts) tuples + """ + return [('database', copy.deepcopy(database_opts))] diff --git a/glance/openstack/common/db/sqlalchemy/migration.py b/glance/openstack/common/db/sqlalchemy/migration.py index 4031f7b793..12bd1719d7 100644 --- a/glance/openstack/common/db/sqlalchemy/migration.py +++ b/glance/openstack/common/db/sqlalchemy/migration.py @@ -51,13 +51,9 @@ import sqlalchemy from sqlalchemy.schema import UniqueConstraint from glance.openstack.common.db import exception -from glance.openstack.common.db.sqlalchemy import session as db_session from glance.openstack.common.gettextutils import _ -get_engine = db_session.get_engine - - def _get_unique_constraints(self, table): """Retrieve information about existing unique constraints of the table @@ -172,11 +168,12 @@ def patch_migrate(): sqlite.SQLiteConstraintGenerator) -def db_sync(abs_path, version=None, init_version=0): +def db_sync(engine, abs_path, version=None, init_version=0): """Upgrade or downgrade a database. Function runs the upgrade() or downgrade() functions in change scripts. + :param engine: SQLAlchemy engine instance for a given database :param abs_path: Absolute path to migrate repository. 
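
Note on the options module above: typical startup wiring, sketched under the assumption that the service sets project defaults before CONF parses the config files (paths are placeholders):

    from glance.openstack.common.db import options

    options.set_defaults(
        sql_connection='sqlite:////var/lib/glance/glance.sqlite',
        sqlite_db='glance.sqlite',
        max_pool_size=10)
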
:param version: Database will upgrade/downgrade until this version. If None - database will update to the latest @@ -190,18 +187,23 @@ def db_sync(abs_path, version=None, init_version=0): raise exception.DbMigrationError( message=_("version should be an integer")) - current_version = db_version(abs_path, init_version) + current_version = db_version(engine, abs_path, init_version) repository = _find_migrate_repo(abs_path) - _db_schema_sanity_check() + _db_schema_sanity_check(engine) if version is None or version > current_version: - return versioning_api.upgrade(get_engine(), repository, version) + return versioning_api.upgrade(engine, repository, version) else: - return versioning_api.downgrade(get_engine(), repository, + return versioning_api.downgrade(engine, repository, version) -def _db_schema_sanity_check(): - engine = get_engine() +def _db_schema_sanity_check(engine): + """Ensure all database tables were created with required parameters. + + :param engine: SQLAlchemy engine instance for a given database + + """ + if engine.name == 'mysql': onlyutf8_sql = ('SELECT TABLE_NAME,TABLE_COLLATION ' 'from information_schema.TABLES ' @@ -216,23 +218,23 @@ def _db_schema_sanity_check(): ) % ','.join(table_names)) -def db_version(abs_path, init_version): +def db_version(engine, abs_path, init_version): """Show the current version of the repository. + :param engine: SQLAlchemy engine instance for a given database :param abs_path: Absolute path to migrate repository :param version: Initial database version """ repository = _find_migrate_repo(abs_path) try: - return versioning_api.db_version(get_engine(), repository) + return versioning_api.db_version(engine, repository) except versioning_exceptions.DatabaseNotControlledError: meta = sqlalchemy.MetaData() - engine = get_engine() meta.reflect(bind=engine) tables = meta.tables if len(tables) == 0 or 'alembic_version' in tables: - db_version_control(abs_path, init_version) - return versioning_api.db_version(get_engine(), repository) + db_version_control(engine, abs_path, version=init_version) + return versioning_api.db_version(engine, repository) else: raise exception.DbMigrationError( message=_( @@ -241,17 +243,18 @@ def db_version(abs_path, init_version): "manually.")) -def db_version_control(abs_path, version=None): +def db_version_control(engine, abs_path, version=None): """Mark a database as under this repository's version control. Once a database is under version control, schema changes should only be done via change scripts in this repository. 
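
Note: since every migration helper now takes the engine explicitly, a caller-side sync looks like the following (this mirrors the glance-manage commands above):

    engine = db_api.get_engine()
    migration.db_sync(engine, db_migration.MIGRATE_REPO_PATH)  # to latest
    current = migration.db_version(engine,
                                   db_migration.MIGRATE_REPO_PATH,
                                   db_migration.INIT_VERSION)
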
+ :param engine: SQLAlchemy engine instance for a given database :param abs_path: Absolute path to migrate repository :param version: Initial database version """ repository = _find_migrate_repo(abs_path) - versioning_api.version_control(get_engine(), repository, version) + versioning_api.version_control(engine, repository, version) return version diff --git a/glance/openstack/common/db/sqlalchemy/models.py b/glance/openstack/common/db/sqlalchemy/models.py index d68fe655f9..71dcc7178c 100644 --- a/glance/openstack/common/db/sqlalchemy/models.py +++ b/glance/openstack/common/db/sqlalchemy/models.py @@ -26,7 +26,6 @@ from sqlalchemy import Column, Integer from sqlalchemy import DateTime from sqlalchemy.orm import object_mapper -from glance.openstack.common.db.sqlalchemy import session as sa from glance.openstack.common import timeutils @@ -34,10 +33,9 @@ class ModelBase(object): """Base class for models.""" __table_initialized__ = False - def save(self, session=None): + def save(self, session): """Save this object.""" - if not session: - session = sa.get_session() + # NOTE(boris-42): This part of code should be look like: # session.add(self) # session.flush() @@ -110,7 +108,7 @@ class SoftDeleteMixin(object): deleted_at = Column(DateTime) deleted = Column(Integer, default=0) - def soft_delete(self, session=None): + def soft_delete(self, session): """Mark this object as deleted.""" self.deleted = self.id self.deleted_at = timeutils.utcnow() diff --git a/glance/openstack/common/db/sqlalchemy/session.py b/glance/openstack/common/db/sqlalchemy/session.py index ad6b7c2b75..6eecef80c5 100644 --- a/glance/openstack/common/db/sqlalchemy/session.py +++ b/glance/openstack/common/db/sqlalchemy/session.py @@ -16,19 +16,6 @@ """Session Handling for SQLAlchemy backend. -Initializing: - -* Call `set_defaults()` with the minimal of the following kwargs: - ``sql_connection``, ``sqlite_db`` - - Example: - - .. code:: python - - session.set_defaults( - sql_connection="sqlite:///var/lib/glance/sqlite.db", - sqlite_db="/var/lib/glance/sqlite.db") - Recommended ways to use sessions within this framework: * Don't use them explicitly; this is like running with ``AUTOCOMMIT=1``. @@ -87,7 +74,7 @@ Recommended ways to use sessions within this framework: .. code:: python def create_many_foo(context, foos): - session = get_session() + session = sessionmaker() with session.begin(): for foo in foos: foo_ref = models.Foo() @@ -95,7 +82,7 @@ Recommended ways to use sessions within this framework: session.add(foo_ref) def update_bar(context, foo_id, newbar): - session = get_session() + session = sessionmaker() with session.begin(): foo_ref = (model_query(context, models.Foo, session). filter_by(id=foo_id). @@ -142,7 +129,7 @@ Recommended ways to use sessions within this framework: foo1 = models.Foo() foo2 = models.Foo() foo1.id = foo2.id = 1 - session = get_session() + session = sessionmaker() try: with session.begin(): session.add(foo1) @@ -168,7 +155,7 @@ Recommended ways to use sessions within this framework: .. 
code:: python def myfunc(foo): - session = get_session() + session = sessionmaker() with session.begin(): # do some database things bar = _private_func(foo, session) @@ -176,7 +163,7 @@ Recommended ways to use sessions within this framework: def _private_func(foo, session=None): if not session: - session = get_session() + session = sessionmaker() with session.begin(subtransaction=True): # do some other database things return bar @@ -240,7 +227,7 @@ Efficient use of soft deletes: def complex_soft_delete_with_synchronization_bar(session=None): if session is None: - session = get_session() + session = sessionmaker() with session.begin(subtransactions=True): count = (model_query(BarModel). find(some_condition). @@ -257,7 +244,7 @@ Efficient use of soft deletes: .. code:: python def soft_delete_bar_model(): - session = get_session() + session = sessionmaker() with session.begin(): bar_ref = model_query(BarModel).find(some_condition).first() # Work with bar_ref @@ -269,7 +256,7 @@ Efficient use of soft deletes: .. code:: python def soft_delete_multi_models(): - session = get_session() + session = sessionmaker() with session.begin(): query = (model_query(BarModel, session=session). find(some_condition)) @@ -293,11 +280,9 @@ Efficient use of soft deletes: import functools import logging -import os.path import re import time -from oslo.config import cfg import six from sqlalchemy import exc as sqla_exc from sqlalchemy.interfaces import PoolListener @@ -306,150 +291,12 @@ from sqlalchemy.pool import NullPool, StaticPool from sqlalchemy.sql.expression import literal_column from glance.openstack.common.db import exception -from glance.openstack.common.gettextutils import _ +from glance.openstack.common.gettextutils import _LE, _LW, _LI from glance.openstack.common import timeutils -sqlite_db_opts = [ - cfg.StrOpt('sqlite_db', - default='glance.sqlite', - help='The file name to use with SQLite'), - cfg.BoolOpt('sqlite_synchronous', - default=True, - help='If True, SQLite uses synchronous mode'), -] - -database_opts = [ - cfg.StrOpt('connection', - default='sqlite:///' + - os.path.abspath(os.path.join(os.path.dirname(__file__), - '../', '$sqlite_db')), - help='The SQLAlchemy connection string used to connect to the ' - 'database', - secret=True, - deprecated_opts=[cfg.DeprecatedOpt('sql_connection', - group='DEFAULT'), - cfg.DeprecatedOpt('sql_connection', - group='DATABASE'), - cfg.DeprecatedOpt('connection', - group='sql'), ]), - cfg.StrOpt('slave_connection', - default='', - secret=True, - help='The SQLAlchemy connection string used to connect to the ' - 'slave database'), - cfg.IntOpt('idle_timeout', - default=3600, - deprecated_opts=[cfg.DeprecatedOpt('sql_idle_timeout', - group='DEFAULT'), - cfg.DeprecatedOpt('sql_idle_timeout', - group='DATABASE'), - cfg.DeprecatedOpt('idle_timeout', - group='sql')], - help='Timeout before idle sql connections are reaped'), - cfg.IntOpt('min_pool_size', - default=1, - deprecated_opts=[cfg.DeprecatedOpt('sql_min_pool_size', - group='DEFAULT'), - cfg.DeprecatedOpt('sql_min_pool_size', - group='DATABASE')], - help='Minimum number of SQL connections to keep open in a ' - 'pool'), - cfg.IntOpt('max_pool_size', - default=None, - deprecated_opts=[cfg.DeprecatedOpt('sql_max_pool_size', - group='DEFAULT'), - cfg.DeprecatedOpt('sql_max_pool_size', - group='DATABASE')], - help='Maximum number of SQL connections to keep open in a ' - 'pool'), - cfg.IntOpt('max_retries', - default=10, - deprecated_opts=[cfg.DeprecatedOpt('sql_max_retries', - group='DEFAULT'), - 
cfg.DeprecatedOpt('sql_max_retries', - group='DATABASE')], - help='Maximum db connection retries during startup. ' - '(setting -1 implies an infinite retry count)'), - cfg.IntOpt('retry_interval', - default=10, - deprecated_opts=[cfg.DeprecatedOpt('sql_retry_interval', - group='DEFAULT'), - cfg.DeprecatedOpt('reconnect_interval', - group='DATABASE')], - help='Interval between retries of opening a sql connection'), - cfg.IntOpt('max_overflow', - default=None, - deprecated_opts=[cfg.DeprecatedOpt('sql_max_overflow', - group='DEFAULT'), - cfg.DeprecatedOpt('sqlalchemy_max_overflow', - group='DATABASE')], - help='If set, use this value for max_overflow with sqlalchemy'), - cfg.IntOpt('connection_debug', - default=0, - deprecated_opts=[cfg.DeprecatedOpt('sql_connection_debug', - group='DEFAULT')], - help='Verbosity of SQL debugging information. 0=None, ' - '100=Everything'), - cfg.BoolOpt('connection_trace', - default=False, - deprecated_opts=[cfg.DeprecatedOpt('sql_connection_trace', - group='DEFAULT')], - help='Add python stack traces to SQL as comment strings'), - cfg.IntOpt('pool_timeout', - default=None, - deprecated_opts=[cfg.DeprecatedOpt('sqlalchemy_pool_timeout', - group='DATABASE')], - help='If set, use this value for pool_timeout with sqlalchemy'), -] - -CONF = cfg.CONF -CONF.register_opts(sqlite_db_opts) -CONF.register_opts(database_opts, 'database') LOG = logging.getLogger(__name__) -_ENGINE = None -_MAKER = None -_SLAVE_ENGINE = None -_SLAVE_MAKER = None - - -def set_defaults(sql_connection, sqlite_db, max_pool_size=None, - max_overflow=None, pool_timeout=None): - """Set defaults for configuration variables.""" - cfg.set_defaults(database_opts, - connection=sql_connection) - cfg.set_defaults(sqlite_db_opts, - sqlite_db=sqlite_db) - # Update the QueuePool defaults - if max_pool_size is not None: - cfg.set_defaults(database_opts, - max_pool_size=max_pool_size) - if max_overflow is not None: - cfg.set_defaults(database_opts, - max_overflow=max_overflow) - if pool_timeout is not None: - cfg.set_defaults(database_opts, - pool_timeout=pool_timeout) - - -def cleanup(): - global _ENGINE, _MAKER - global _SLAVE_ENGINE, _SLAVE_MAKER - - if _MAKER: - _MAKER.close_all() - _MAKER = None - if _ENGINE: - _ENGINE.dispose() - _ENGINE = None - if _SLAVE_MAKER: - _SLAVE_MAKER.close_all() - _SLAVE_MAKER = None - if _SLAVE_ENGINE: - _SLAVE_ENGINE.dispose() - _SLAVE_ENGINE = None - class SqliteForeignKeysListener(PoolListener): """Ensures that the foreign key constraints are enforced in SQLite. 
@@ -462,30 +309,6 @@ class SqliteForeignKeysListener(PoolListener): dbapi_con.execute('pragma foreign_keys=ON') -def get_session(autocommit=True, expire_on_commit=False, sqlite_fk=False, - slave_session=False, mysql_traditional_mode=False): - """Return a SQLAlchemy session.""" - global _MAKER - global _SLAVE_MAKER - maker = _MAKER - - if slave_session: - maker = _SLAVE_MAKER - - if maker is None: - engine = get_engine(sqlite_fk=sqlite_fk, slave_engine=slave_session, - mysql_traditional_mode=mysql_traditional_mode) - maker = get_maker(engine, autocommit, expire_on_commit) - - if slave_session: - _SLAVE_MAKER = maker - else: - _MAKER = maker - - session = maker() - return session - - # note(boris-42): In current versions of DB backends unique constraint # violation messages follow the structure: # @@ -509,11 +332,20 @@ def get_session(autocommit=True, expire_on_commit=False, sqlite_fk=False, # 'c1'") # N columns - (IntegrityError) (1062, "Duplicate entry 'values joined # with -' for key 'name_of_our_constraint'") +# +# ibm_db_sa: +# N columns - (IntegrityError) SQL0803N One or more values in the INSERT +# statement, UPDATE statement, or foreign key update caused by a +# DELETE statement are not valid because the primary key, unique +# constraint or unique index identified by "2" constrains table +# "NOVA.KEY_PAIRS" from having duplicate values for the index +# key. _DUP_KEY_RE_DB = { "sqlite": (re.compile(r"^.*columns?([^)]+)(is|are)\s+not\s+unique$"), re.compile(r"^.*UNIQUE\s+constraint\s+failed:\s+(.+)$")), "postgresql": (re.compile(r"^.*duplicate\s+key.*\"([^\"]+)\"\s*\n.*$"),), - "mysql": (re.compile(r"^.*\(1062,.*'([^\']+)'\"\)$"),) + "mysql": (re.compile(r"^.*\(1062,.*'([^\']+)'\"\)$"),), + "ibm_db_sa": (re.compile(r"^.*SQL0803N.*$"),), } @@ -535,7 +367,7 @@ def _raise_if_duplicate_entry_error(integrity_error, engine_name): return [columns] return columns[len(uniqbase):].split("0")[1:] - if engine_name not in ["mysql", "sqlite", "postgresql"]: + if engine_name not in ["ibm_db_sa", "mysql", "sqlite", "postgresql"]: return # FIXME(johannes): The usage of the .message attribute has been @@ -550,7 +382,12 @@ def _raise_if_duplicate_entry_error(integrity_error, engine_name): else: return - columns = match.group(1) + # NOTE(mriedem): The ibm_db_sa integrity error message doesn't provide the + # columns so we have to omit that from the DBDuplicateEntry error. + columns = '' + + if engine_name != 'ibm_db_sa': + columns = match.group(1) if engine_name == "sqlite": columns = [c.split('.')[-1] for c in columns.strip().split(", ")] @@ -591,57 +428,39 @@ def _raise_if_deadlock_error(operational_error, engine_name): def _wrap_db_error(f): + #TODO(rpodolyaka): in a subsequent commit make this a class decorator to + # ensure it can only applied to Session subclasses instances (as we use + # Session instance bind attribute below) + @functools.wraps(f) - def _wrap(*args, **kwargs): + def _wrap(self, *args, **kwargs): try: - return f(*args, **kwargs) + return f(self, *args, **kwargs) except UnicodeEncodeError: raise exception.DBInvalidUnicodeParameter() - # note(boris-42): We should catch unique constraint violation and - # wrap it by our own DBDuplicateEntry exception. Unique constraint - # violation is wrapped by IntegrityError. 
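
Note: a sketch of what this error parsing buys call sites — the exception class is real, the handler context is illustrative:

    from glance.openstack.common.db import exception as db_exc

    try:
        tag_ref.save(session=session)
    except db_exc.DBDuplicateEntry as e:
        # e.columns lists the violating columns parsed from the driver
        # message (left empty for ibm_db_sa, per the note below).
        LOG.debug('duplicate tag on columns: %s', e.columns)
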
except sqla_exc.OperationalError as e: - _raise_if_deadlock_error(e, get_engine().name) + _raise_if_db_connection_lost(e, self.bind) + _raise_if_deadlock_error(e, self.bind.dialect.name) # NOTE(comstud): A lot of code is checking for OperationalError # so let's not wrap it for now. raise + # note(boris-42): We should catch unique constraint violation and + # wrap it by our own DBDuplicateEntry exception. Unique constraint + # violation is wrapped by IntegrityError. except sqla_exc.IntegrityError as e: # note(boris-42): SqlAlchemy doesn't unify errors from different # DBs so we must do this. Also in some tables (for example # instance_types) there are more than one unique constraint. This # means we should get names of columns, which values violate # unique constraint, from error message. - _raise_if_duplicate_entry_error(e, get_engine().name) + _raise_if_duplicate_entry_error(e, self.bind.dialect.name) raise exception.DBError(e) except Exception as e: - LOG.exception(_('DB exception wrapped.')) + LOG.exception(_LE('DB exception wrapped.')) raise exception.DBError(e) return _wrap -def get_engine(sqlite_fk=False, slave_engine=False, - mysql_traditional_mode=False): - """Return a SQLAlchemy engine.""" - global _ENGINE - global _SLAVE_ENGINE - engine = _ENGINE - db_uri = CONF.database.connection - - if slave_engine: - engine = _SLAVE_ENGINE - db_uri = CONF.database.slave_connection - - if engine is None: - engine = create_engine(db_uri, sqlite_fk=sqlite_fk, - mysql_traditional_mode=mysql_traditional_mode) - if slave_engine: - _SLAVE_ENGINE = engine - else: - _ENGINE = engine - - return engine - - def _synchronous_switch_listener(dbapi_conn, connection_rec): """Switch sqlite connections to non-synchronous mode.""" dbapi_conn.execute("PRAGMA synchronous = OFF") @@ -683,7 +502,7 @@ def _ping_listener(engine, dbapi_conn, connection_rec, connection_proxy): cursor.execute(ping_sql) except Exception as ex: if engine.dialect.is_disconnect(ex, dbapi_conn, cursor): - msg = _('Database server has gone away: %s') % ex + msg = _LW('Database server has gone away: %s') % ex LOG.warning(msg) raise sqla_exc.DisconnectionError(msg) else: @@ -698,7 +517,44 @@ def _set_mode_traditional(dbapi_con, connection_rec, connection_proxy): than a declared field just with warning. That is fraught with data corruption. """ - dbapi_con.cursor().execute("SET SESSION sql_mode = TRADITIONAL;") + _set_session_sql_mode(dbapi_con, connection_rec, + connection_proxy, 'TRADITIONAL') + + +def _set_session_sql_mode(dbapi_con, connection_rec, + connection_proxy, sql_mode=None): + """Set the sql_mode session variable. + + MySQL supports several server modes. The default is None, but sessions + may choose to enable server modes like TRADITIONAL, ANSI, + several STRICT_* modes and others. + + Note: passing in '' (empty string) for sql_mode clears + the SQL mode for the session, overriding a potentially set + server default. Passing in None (the default) makes this + a no-op, meaning if a server-side SQL mode is set, it still applies. + """ + cursor = dbapi_con.cursor() + if sql_mode is not None: + cursor.execute("SET SESSION sql_mode = %s", [sql_mode]) + + # Check against the real effective SQL mode. 
Even when unset by + # our own config, the server may still be operating in a specific + # SQL mode as set by the server configuration + cursor.execute("SHOW VARIABLES LIKE 'sql_mode'") + row = cursor.fetchone() + if row is None: + LOG.warning(_LW('Unable to detect effective SQL mode')) + return + realmode = row[1] + LOG.info(_LI('MySQL server mode set to %s') % realmode) + # 'TRADITIONAL' mode enables several other modes, so + # we need a substring match here + if not ('TRADITIONAL' in realmode.upper() or + 'STRICT_ALL_TABLES' in realmode.upper()): + LOG.warning(_LW("MySQL SQL mode is '%s', " + "consider enabling TRADITIONAL or STRICT_ALL_TABLES") + % realmode) def _is_db_connection_error(args): @@ -713,69 +569,79 @@ def _is_db_connection_error(args): return False -def create_engine(sql_connection, sqlite_fk=False, - mysql_traditional_mode=False): +def _raise_if_db_connection_lost(error, engine): + # NOTE(vsergeyev): Function is_disconnect(e, connection, cursor) + # requires connection and cursor in incoming parameters, + # but we have no possibility to create connection if DB + # is not available, so in such case reconnect fails. + # But is_disconnect() ignores these parameters, so it + # makes sense to pass to function None as placeholder + # instead of connection and cursor. + if engine.dialect.is_disconnect(error, None, None): + raise exception.DBConnectionError(error) + + +def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None, + mysql_traditional_mode=False, idle_timeout=3600, + connection_debug=0, max_pool_size=None, max_overflow=None, + pool_timeout=None, sqlite_synchronous=True, + connection_trace=False, max_retries=10, retry_interval=10): """Return a new SQLAlchemy engine.""" - # NOTE(geekinutah): At this point we could be connecting to the normal - # db handle or the slave db handle. Things like - # _wrap_db_error aren't going to work well if their - # backends don't match. Let's check. 
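
Note: with the new keyword, strict MySQL behaviour is opted into per engine; a minimal sketch (the connection URL is a placeholder):

    engine = create_engine('mysql://glance:secret@db/glance',
                           mysql_sql_mode='TRADITIONAL')

The checkout listener then runs SHOW VARIABLES LIKE 'sql_mode' and warns if the effective server mode is neither TRADITIONAL nor STRICT_ALL_TABLES.
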
- _assert_matching_drivers() + connection_dict = sqlalchemy.engine.url.make_url(sql_connection) engine_args = { - "pool_recycle": CONF.database.idle_timeout, - "echo": False, + "pool_recycle": idle_timeout, 'convert_unicode': True, } - # Map our SQL debug level to SQLAlchemy's options - if CONF.database.connection_debug >= 100: - engine_args['echo'] = 'debug' - elif CONF.database.connection_debug >= 50: - engine_args['echo'] = True + logger = logging.getLogger('sqlalchemy.engine') + + # Map SQL debug level to Python log level + if connection_debug >= 100: + logger.setLevel(logging.DEBUG) + elif connection_debug >= 50: + logger.setLevel(logging.INFO) + else: + logger.setLevel(logging.WARNING) if "sqlite" in connection_dict.drivername: if sqlite_fk: engine_args["listeners"] = [SqliteForeignKeysListener()] engine_args["poolclass"] = NullPool - if CONF.database.connection == "sqlite://": + if sql_connection == "sqlite://": engine_args["poolclass"] = StaticPool engine_args["connect_args"] = {'check_same_thread': False} else: - if CONF.database.max_pool_size is not None: - engine_args['pool_size'] = CONF.database.max_pool_size - if CONF.database.max_overflow is not None: - engine_args['max_overflow'] = CONF.database.max_overflow - if CONF.database.pool_timeout is not None: - engine_args['pool_timeout'] = CONF.database.pool_timeout + if max_pool_size is not None: + engine_args['pool_size'] = max_pool_size + if max_overflow is not None: + engine_args['max_overflow'] = max_overflow + if pool_timeout is not None: + engine_args['pool_timeout'] = pool_timeout engine = sqlalchemy.create_engine(sql_connection, **engine_args) sqlalchemy.event.listen(engine, 'checkin', _thread_yield) if engine.name in ['mysql', 'ibm_db_sa']: - callback = functools.partial(_ping_listener, engine) - sqlalchemy.event.listen(engine, 'checkout', callback) + ping_callback = functools.partial(_ping_listener, engine) + sqlalchemy.event.listen(engine, 'checkout', ping_callback) if engine.name == 'mysql': if mysql_traditional_mode: - sqlalchemy.event.listen(engine, 'checkout', - _set_mode_traditional) - else: - LOG.warning(_("This application has not enabled MySQL " - "traditional mode, which means silent " - "data corruption may occur. " - "Please encourage the application " - "developers to enable this mode.")) + mysql_sql_mode = 'TRADITIONAL' + if mysql_sql_mode: + mode_callback = functools.partial(_set_session_sql_mode, + sql_mode=mysql_sql_mode) + sqlalchemy.event.listen(engine, 'checkout', mode_callback) elif 'sqlite' in connection_dict.drivername: - if not CONF.sqlite_synchronous: + if not sqlite_synchronous: sqlalchemy.event.listen(engine, 'connect', _synchronous_switch_listener) sqlalchemy.event.listen(engine, 'connect', _add_regexp_listener) - if (CONF.database.connection_trace and - engine.dialect.dbapi.__name__ == 'MySQLdb'): + if connection_trace and engine.dialect.dbapi.__name__ == 'MySQLdb': _patch_mysqldb_with_stacktrace_comments() try: @@ -784,15 +650,15 @@ def create_engine(sql_connection, sqlite_fk=False, if not _is_db_connection_error(e.args[0]): raise - remaining = CONF.database.max_retries + remaining = max_retries if remaining == -1: remaining = 'infinite' while True: - msg = _('SQL connection failed. %s attempts left.') + msg = _LW('SQL connection failed. 
%s attempts left.')
        LOG.warning(msg % remaining)
        if remaining != 'infinite':
            remaining -= 1
-        time.sleep(CONF.database.retry_interval)
+        time.sleep(retry_interval)
        try:
            engine.connect()
            break
@@ -879,13 +745,116 @@ def _patch_mysqldb_with_stacktrace_comments():
     setattr(MySQLdb.cursors.BaseCursor, '_do_query', _do_query)


-def _assert_matching_drivers():
-    """Make sure slave handle and normal handle have the same driver."""
-    # NOTE(geekinutah): There's no use case for writing to one backend and
-    #                   reading from another. Who knows what the future holds?
-    if CONF.database.slave_connection == '':
-        return
+class EngineFacade(object):
+    """A helper class for removing global engine instances from glance.db.

-    normal = sqlalchemy.engine.url.make_url(CONF.database.connection)
-    slave = sqlalchemy.engine.url.make_url(CONF.database.slave_connection)
-    assert normal.drivername == slave.drivername
+    As a library, glance.db can't decide where to store/when to create engine
+    and sessionmaker instances, so this must be left for a target application.
+
+    On the other hand, in order to simplify the adoption of glance.db changes,
+    we'll provide a helper class, which creates engine and sessionmaker
+    on its instantiation and provides get_engine()/get_session() methods
+    that are compatible with corresponding utility functions that currently
+    exist in target projects, e.g. in Nova.
+
+    engine/sessionmaker instances will still be global (and they are meant to
+    be global), but they will be stored in the app context, rather than in the
+    glance.db context.
+
+    Note: use of this helper is completely optional and you are encouraged to
+    integrate engine/sessionmaker instances into your apps any way you like
+    (e.g. one might want to bind a session to a request context). Two
+    important things to remember:
+    1. An Engine instance is effectively a pool of DB connections, so it's
+       meant to be shared (and it's thread-safe).
+    2. A Session instance is not meant to be shared and represents a DB
+       transactional context (i.e. it's not thread-safe). sessionmaker is
+       a factory of sessions.
+
+    """
+
+    def __init__(self, sql_connection,
+                 sqlite_fk=False, mysql_sql_mode=None,
+                 autocommit=True, expire_on_commit=False, **kwargs):
+        """Initialize engine and sessionmaker instances.
+
+        :param sqlite_fk: enable foreign keys in SQLite
+        :type sqlite_fk: bool
+
+        :param mysql_sql_mode: set SQL mode in MySQL
+        :type mysql_sql_mode: string
+
+        :param autocommit: use autocommit mode for created Session instances
+        :type autocommit: bool
+
+        :param expire_on_commit: expire session objects on commit
+        :type expire_on_commit: bool
+
+        Keyword arguments:
+
+        :keyword idle_timeout: timeout before idle sql connections are reaped
+                               (defaults to 3600)
+        :keyword connection_debug: verbosity of SQL debugging information.
+                                   0=None, 100=Everything (defaults to 0)
+        :keyword max_pool_size: maximum number of SQL connections to keep open
+                                in a pool (defaults to SQLAlchemy settings)
+        :keyword max_overflow: if set, use this value for max_overflow with
+                               sqlalchemy (defaults to SQLAlchemy settings)
+        :keyword pool_timeout: if set, use this value for pool_timeout with
+                               sqlalchemy (defaults to SQLAlchemy settings)
+        :keyword sqlite_synchronous: if True, SQLite uses synchronous mode
+                                     (defaults to True)
+        :keyword connection_trace: add python stack traces to SQL as comment
+                                   strings (defaults to False)
+        :keyword max_retries: maximum db connection retries during startup.
+                              (setting -1 implies an infinite retry count)
+                              (defaults to 10)
+        :keyword retry_interval: interval between retries of opening a sql
+                                 connection (defaults to 10)
+
+        """
+
+        super(EngineFacade, self).__init__()
+
+        self._engine = create_engine(
+            sql_connection=sql_connection,
+            sqlite_fk=sqlite_fk,
+            mysql_sql_mode=mysql_sql_mode,
+            idle_timeout=kwargs.get('idle_timeout', 3600),
+            connection_debug=kwargs.get('connection_debug', 0),
+            max_pool_size=kwargs.get('max_pool_size'),
+            max_overflow=kwargs.get('max_overflow'),
+            pool_timeout=kwargs.get('pool_timeout'),
+            sqlite_synchronous=kwargs.get('sqlite_synchronous', True),
+            connection_trace=kwargs.get('connection_trace', False),
+            max_retries=kwargs.get('max_retries', 10),
+            retry_interval=kwargs.get('retry_interval', 10))
+        self._session_maker = get_maker(
+            engine=self._engine,
+            autocommit=autocommit,
+            expire_on_commit=expire_on_commit)
+
+    def get_engine(self):
+        """Get the engine instance (note that it's shared)."""
+
+        return self._engine
+
+    def get_session(self, **kwargs):
+        """Get a Session instance.
+
+        If passed, keyword argument values override the ones used when the
+        sessionmaker instance was created.
+
+        :keyword autocommit: use autocommit mode for created Session instances
+        :type autocommit: bool
+
+        :keyword expire_on_commit: expire session objects on commit
+        :type expire_on_commit: bool
+
+        """
+
+        # NOTE: iterate over a copy of the keys; deleting from a dict
+        # while iterating it directly raises a RuntimeError.
+        for arg in list(kwargs):
+            if arg not in ('autocommit', 'expire_on_commit'):
+                del kwargs[arg]
+
+        return self._session_maker(**kwargs)
diff --git a/glance/openstack/common/db/sqlalchemy/test_base.py b/glance/openstack/common/db/sqlalchemy/test_base.py
index 08b2edc8ef..235ed5748c 100644
--- a/glance/openstack/common/db/sqlalchemy/test_base.py
+++ b/glance/openstack/common/db/sqlalchemy/test_base.py
@@ -18,7 +18,6 @@ import functools
 import os

 import fixtures
-from oslo.config import cfg
 import six

 from glance.openstack.common.db.sqlalchemy import session
@@ -38,18 +37,17 @@ class DbFixture(fixtures.Fixture):
     def _get_uri(self):
         return os.getenv('OS_TEST_DBAPI_CONNECTION', 'sqlite://')

-    def __init__(self):
+    def __init__(self, test):
         super(DbFixture, self).__init__()
-        self.conf = cfg.CONF
-        self.conf.import_opt('connection',
-                             'glance.openstack.common.db.sqlalchemy.session',
-                             group='database')
+
+        self.test = test

     def setUp(self):
         super(DbFixture, self).setUp()

-        self.conf.set_default('connection', self._get_uri(), group='database')
-        self.addCleanup(self.conf.reset)
+        self.test.engine = session.create_engine(self._get_uri())
+        self.test.sessionmaker = session.get_maker(self.test.engine)
+        self.addCleanup(self.test.engine.dispose)


 class DbTestCase(test.BaseTestCase):
@@ -64,9 +62,7 @@ class DbTestCase(test.BaseTestCase):
     def setUp(self):
         super(DbTestCase, self).setUp()

-        self.useFixture(self.FIXTURE())
-
-        self.addCleanup(session.cleanup)
+        self.useFixture(self.FIXTURE(self))


 ALLOWED_DIALECTS = ['sqlite', 'mysql', 'postgresql']
@@ -83,11 +79,10 @@ def backend_specific(*dialects):
         if not set(dialects).issubset(ALLOWED_DIALECTS):
             raise ValueError(
                 "Please use allowed dialects: %s" % ALLOWED_DIALECTS)
-        engine = session.get_engine()
-        if engine.name not in dialects:
+        if self.engine.name not in dialects:
             msg = ('The test "%s" can be run '
                    'only on %s.
Current engine is %s.') - args = (f.__name__, ' '.join(dialects), engine.name) + args = (f.__name__, ' '.join(dialects), self.engine.name) self.skip(msg % args) else: return f(self) diff --git a/glance/openstack/common/db/sqlalchemy/test_migrations.py b/glance/openstack/common/db/sqlalchemy/test_migrations.py index 4c5d84706c..a49129758c 100644 --- a/glance/openstack/common/db/sqlalchemy/test_migrations.py +++ b/glance/openstack/common/db/sqlalchemy/test_migrations.py @@ -21,12 +21,12 @@ import subprocess import lockfile from six import moves +from six.moves.urllib import parse import sqlalchemy import sqlalchemy.exc from glance.openstack.common.db.sqlalchemy import utils -from glance.openstack.common.gettextutils import _ -from glance.openstack.common.py3kcompat import urlutils +from glance.openstack.common.gettextutils import _LE from glance.openstack.common import test LOG = logging.getLogger(__name__) @@ -60,10 +60,10 @@ def _set_db_lock(lock_path=None, lock_prefix=None): path = lock_path or os.environ.get("GLANCE_LOCK_PATH") lock = lockfile.FileLock(os.path.join(path, lock_prefix)) with lock: - LOG.debug(_('Got lock "%s"') % f.__name__) + LOG.debug('Got lock "%s"' % f.__name__) return f(*args, **kwargs) finally: - LOG.debug(_('Lock released "%s"') % f.__name__) + LOG.debug('Lock released "%s"' % f.__name__) return wrapper return decorator @@ -153,7 +153,7 @@ class BaseMigrationTestCase(test.BaseTestCase): def _reset_databases(self): for key, engine in self.engines.items(): conn_string = self.test_databases[key] - conn_pieces = urlutils.urlparse(conn_string) + conn_pieces = parse.urlparse(conn_string) engine.dispose() if conn_string.startswith('sqlite'): # We can just delete the SQLite database, which is @@ -264,6 +264,6 @@ class WalkVersionsMixin(object): if check: check(engine, data) except Exception: - LOG.error("Failed to migrate to version %s on engine %s" % + LOG.error(_LE("Failed to migrate to version %s on engine %s") % (version, engine)) raise diff --git a/glance/openstack/common/db/sqlalchemy/utils.py b/glance/openstack/common/db/sqlalchemy/utils.py index b367b0a558..4d68f0c619 100644 --- a/glance/openstack/common/db/sqlalchemy/utils.py +++ b/glance/openstack/common/db/sqlalchemy/utils.py @@ -30,6 +30,7 @@ from sqlalchemy import func from sqlalchemy import Index from sqlalchemy import Integer from sqlalchemy import MetaData +from sqlalchemy import or_ from sqlalchemy.sql.expression import literal_column from sqlalchemy.sql.expression import UpdateBase from sqlalchemy.sql import select @@ -37,7 +38,9 @@ from sqlalchemy import String from sqlalchemy import Table from sqlalchemy.types import NullType -from glance.openstack.common.gettextutils import _ +from glance.openstack.common import context as request_context +from glance.openstack.common.db.sqlalchemy import models +from glance.openstack.common.gettextutils import _, _LI, _LW from glance.openstack.common import timeutils @@ -93,7 +96,7 @@ def paginate_query(query, model, limit, sort_keys, marker=None, if 'id' not in sort_keys: # TODO(justinsb): If this ever gives a false-positive, check # the actual primary key, rather than assuming its id - LOG.warning(_('Id not in sort_keys; is sort_keys unique?')) + LOG.warning(_LW('Id not in sort_keys; is sort_keys unique?')) assert(not (sort_dir and sort_dirs)) @@ -156,6 +159,94 @@ def paginate_query(query, model, limit, sort_keys, marker=None, return query +def _read_deleted_filter(query, db_model, read_deleted): + if 'deleted' not in db_model.__table__.columns: + raise 
ValueError(_("There is no `deleted` column in `%s` table. " + "Project doesn't use soft-deleted feature.") + % db_model.__name__) + + default_deleted_value = db_model.__table__.c.deleted.default.arg + if read_deleted == 'no': + query = query.filter(db_model.deleted == default_deleted_value) + elif read_deleted == 'yes': + pass # omit the filter to include deleted and active + elif read_deleted == 'only': + query = query.filter(db_model.deleted != default_deleted_value) + else: + raise ValueError(_("Unrecognized read_deleted value '%s'") + % read_deleted) + return query + + +def _project_filter(query, db_model, context, project_only): + if project_only and 'project_id' not in db_model.__table__.columns: + raise ValueError(_("There is no `project_id` column in `%s` table.") + % db_model.__name__) + + if request_context.is_user_context(context) and project_only: + if project_only == 'allow_none': + is_none = None + query = query.filter(or_(db_model.project_id == context.project_id, + db_model.project_id == is_none)) + else: + query = query.filter(db_model.project_id == context.project_id) + + return query + + +def model_query(context, model, session, args=None, project_only=False, + read_deleted=None): + """Query helper that accounts for context's `read_deleted` field. + + :param context: context to query under + + :param model: Model to query. Must be a subclass of ModelBase. + :type model: models.ModelBase + + :param session: The session to use. + :type session: sqlalchemy.orm.session.Session + + :param args: Arguments to query. If None - model is used. + :type args: tuple + + :param project_only: If present and context is user-type, then restrict + query to match the context's project_id. If set to + 'allow_none', restriction includes project_id = None. + :type project_only: bool + + :param read_deleted: If present, overrides context's read_deleted field. + :type read_deleted: bool + + Usage: + result = (utils.model_query(context, models.Instance, session=session) + .filter_by(uuid=instance_uuid) + .all()) + + query = utils.model_query( + context, Node, + session=session, + args=(func.count(Node.id), func.sum(Node.ram)) + ).filter_by(project_id=project_id) + """ + + if not read_deleted: + if hasattr(context, 'read_deleted'): + # NOTE(viktors): some projects use `read_deleted` attribute in + # their contexts instead of `show_deleted`. + read_deleted = context.read_deleted + else: + read_deleted = context.show_deleted + + if not issubclass(model, models.ModelBase): + raise TypeError(_("model should be a subclass of ModelBase")) + + query = session.query(model) if not args else session.query(*args) + query = _read_deleted_filter(query, model, read_deleted) + query = _project_filter(query, model, context, project_only) + + return query + + def get_table(engine, name): """Returns an sqlalchemy table dynamically from db. 
@@ -276,8 +367,8 @@ def drop_old_duplicate_entries_from_table(migrate_engine, table_name, rows_to_delete_select = select([table.c.id]).where(delete_condition) for row in migrate_engine.execute(rows_to_delete_select).fetchall(): - LOG.info(_("Deleting duplicated row with id: %(id)s from table: " - "%(table)s") % dict(id=row[0], table=table_name)) + LOG.info(_LI("Deleting duplicated row with id: %(id)s from table: " + "%(table)s") % dict(id=row[0], table=table_name)) if use_soft_delete: delete_statement = table.update().\ diff --git a/glance/openstack/common/gettextutils.py b/glance/openstack/common/gettextutils.py index c47c99c88c..6f39d28612 100644 --- a/glance/openstack/common/gettextutils.py +++ b/glance/openstack/common/gettextutils.py @@ -23,6 +23,7 @@ Usual usage in an openstack.common module: """ import copy +import functools import gettext import locale from logging import handlers @@ -35,6 +36,17 @@ import six _localedir = os.environ.get('glance'.upper() + '_LOCALEDIR') _t = gettext.translation('glance', localedir=_localedir, fallback=True) +# We use separate translation catalogs for each log level, so set up a +# mapping between the log level name and the translator. The domain +# for the log level is project_name + "-log-" + log_level so messages +# for each level end up in their own catalog. +_t_log_levels = dict( + (level, gettext.translation('glance' + '-log-' + level, + localedir=_localedir, + fallback=True)) + for level in ['info', 'warning', 'error', 'critical'] +) + _AVAILABLE_LANGUAGES = {} USE_LAZY = False @@ -60,6 +72,28 @@ def _(msg): return _t.ugettext(msg) +def _log_translation(msg, level): + """Build a single translation of a log message + """ + if USE_LAZY: + return Message(msg, domain='glance' + '-log-' + level) + else: + translator = _t_log_levels[level] + if six.PY3: + return translator.gettext(msg) + return translator.ugettext(msg) + +# Translators for log levels. +# +# The abbreviated names are meant to reflect the usual use of a short +# name like '_'. The "L" is for "log" and the other letter comes from +# the level. +_LI = functools.partial(_log_translation, level='info') +_LW = functools.partial(_log_translation, level='warning') +_LE = functools.partial(_log_translation, level='error') +_LC = functools.partial(_log_translation, level='critical') + + def install(domain, lazy=False): """Install a _() function using the given translation domain. diff --git a/glance/tests/functional/__init__.py b/glance/tests/functional/__init__.py index d3016d3db1..51351471b8 100644 --- a/glance/tests/functional/__init__.py +++ b/glance/tests/functional/__init__.py @@ -35,10 +35,10 @@ import time import fixtures import six.moves.urllib.parse as urlparse -from sqlalchemy import create_engine import testtools from glance.common import utils +from glance.db.sqlalchemy import api as db_api from glance.openstack.common import jsonutils from glance.openstack.common import units from glance import tests as glance_tests @@ -915,8 +915,7 @@ class FunctionalTest(test_utils.BaseTestCase): DB verification within the functional tests. The raw result set is returned. 
""" - engine = create_engine(self.registry_server.sql_connection, - pool_recycle=30) + engine = db_api.get_engine() return engine.execute(sql) def copy_data_file(self, file_name, dst_dir): diff --git a/glance/tests/unit/test_manage.py b/glance/tests/unit/test_manage.py index 5933f5647c..4336446149 100644 --- a/glance/tests/unit/test_manage.py +++ b/glance/tests/unit/test_manage.py @@ -20,29 +20,38 @@ import testtools import glance from glance.cmd import manage from glance.db import migration as db_migration +from glance.db.sqlalchemy import api as db_api from glance.openstack.common.db.sqlalchemy import migration -class TestLegacyManage(testtools.TestCase): +class TestManageBase(testtools.TestCase): def setUp(self): - super(TestLegacyManage, self).setUp() + super(TestManageBase, self).setUp() def clear_conf(): manage.CONF.reset() manage.CONF.unregister_opt(manage.command_opt) self.addCleanup(clear_conf) + self.patcher = mock.patch('glance.db.sqlalchemy.api.get_engine') + self.patcher.start() + self.addCleanup(self.patcher.stop) + def _main_test_helper(self, argv, func_name=None, *exp_args): self.useFixture(fixtures.MonkeyPatch('sys.argv', argv)) manage.main() func_name.assert_called_once_with(*exp_args) + +class TestLegacyManage(TestManageBase): + def test_legacy_db_version(self): migration.db_version = mock.Mock() self._main_test_helper(['glance.cmd.manage', 'db_version'], glance.openstack.common.db.sqlalchemy. migration.db_version, + db_api.get_engine(), db_migration.MIGRATE_REPO_PATH, 0) def test_legacy_db_sync(self): @@ -50,92 +59,92 @@ class TestLegacyManage(testtools.TestCase): self._main_test_helper(['glance.cmd.manage', 'db_sync'], glance.openstack.common.db.sqlalchemy. migration.db_sync, + db_api.get_engine(), db_migration.MIGRATE_REPO_PATH, None) def test_legacy_db_upgrade(self): migration.db_sync = mock.Mock() self._main_test_helper(['glance.cmd.manage', 'db_upgrade'], migration.db_sync, + db_api.get_engine(), db_migration.MIGRATE_REPO_PATH, None) def test_legacy_db_version_control(self): migration.db_version_control = mock.Mock() self._main_test_helper(['glance.cmd.manage', 'db_version_control'], migration.db_version_control, + db_api.get_engine(), db_migration.MIGRATE_REPO_PATH, None) def test_legacy_db_sync_version(self): migration.db_sync = mock.Mock() self._main_test_helper(['glance.cmd.manage', 'db_sync', '20'], migration.db_sync, + db_api.get_engine(), db_migration.MIGRATE_REPO_PATH, '20') def test_legacy_db_upgrade_version(self): migration.db_sync = mock.Mock() self._main_test_helper(['glance.cmd.manage', 'db_upgrade', '20'], migration.db_sync, + db_api.get_engine(), db_migration.MIGRATE_REPO_PATH, '20') def test_legacy_db_downgrade_version(self): migration.db_sync = mock.Mock() self._main_test_helper(['glance.cmd.manage', 'db_downgrade', '20'], migration.db_sync, + db_api.get_engine(), db_migration.MIGRATE_REPO_PATH, '20') -class TestManage(testtools.TestCase): - - def setUp(self): - super(TestManage, self).setUp() - - def clear_conf(): - manage.CONF.reset() - manage.CONF.unregister_opt(manage.command_opt) - self.addCleanup(clear_conf) - - def _main_test_helper(self, argv, func_name=None, *exp_args): - self.useFixture(fixtures.MonkeyPatch('sys.argv', argv)) - manage.main() - func_name.assert_called_once_with(*exp_args) +class TestManage(TestManageBase): def test_db_version(self): migration.db_version = mock.Mock() self._main_test_helper(['glance.cmd.manage', 'db', 'version'], migration.db_version, + db_api.get_engine(), db_migration.MIGRATE_REPO_PATH, 0) def 
test_db_sync(self): migration.db_sync = mock.Mock() self._main_test_helper(['glance.cmd.manage', 'db', 'sync'], migration.db_sync, + db_api.get_engine(), db_migration.MIGRATE_REPO_PATH, None) def test_db_upgrade(self): migration.db_sync = mock.Mock() self._main_test_helper(['glance.cmd.manage', 'db', 'upgrade'], migration.db_sync, + db_api.get_engine(), db_migration.MIGRATE_REPO_PATH, None) def test_db_version_control(self): migration.db_version_control = mock.Mock() self._main_test_helper(['glance.cmd.manage', 'db', 'version_control'], migration.db_version_control, + db_api.get_engine(), db_migration.MIGRATE_REPO_PATH, None) def test_db_sync_version(self): migration.db_sync = mock.Mock() self._main_test_helper(['glance.cmd.manage', 'db', 'sync', '20'], migration.db_sync, + db_api.get_engine(), db_migration.MIGRATE_REPO_PATH, u'20') def test_db_upgrade_version(self): migration.db_sync = mock.Mock() self._main_test_helper(['glance.cmd.manage', 'db', 'upgrade', '20'], migration.db_sync, + db_api.get_engine(), db_migration.MIGRATE_REPO_PATH, '20') def test_db_downgrade_version(self): migration.db_sync = mock.Mock() self._main_test_helper(['glance.cmd.manage', 'db', 'downgrade', '20'], migration.db_sync, + db_api.get_engine(), db_migration.MIGRATE_REPO_PATH, '20')
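Reviewer note: a minimal sketch of how an application is expected to hold the EngineFacade introduced in this change; the connection URL and option values are illustrative only.

    from glance.openstack.common.db.sqlalchemy import session

    # One facade per application: the engine it wraps is a shared,
    # thread-safe connection pool.
    facade = session.EngineFacade('sqlite://',
                                  max_retries=10, retry_interval=10)

    engine = facade.get_engine()   # shared engine instance
    sess = facade.get_session()    # a fresh Session; not to be shared
    with sess.begin():             # one transactional unit of work
        sess.execute('SELECT 1')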
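Reviewer note: likewise, a short sketch of how the new per-log-level translation markers from gettextutils are meant to be used; the logger and message text are illustrative.

    import logging

    from glance.openstack.common.gettextutils import _, _LI, _LW

    LOG = logging.getLogger(__name__)

    LOG.info(_LI('Scanning image cache'))        # glance-log-info catalog
    LOG.warning(_LW('Image cache nearly full'))  # glance-log-warning catalog
    # Exception text shown to users keeps using the plain _() translator.
    raise RuntimeError(_('image cache is full'))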