Implement new oslo.db.sqlalchemy.enginefacade module

This module presents a replacement for the EngineFacade
system.  At the center is the oslo.db.sqlalchemy.enginefacade
module, which, when imported, provides decorators and context
managers that perform all database and ORM connectivity
functions transparently.  The docstrings as well
as the blueprint provide an introduction.

The patch includes a refactoring of sqlalchemy/session.py
into three dependent modules: engines.py, orm.py, and
enginefacade.py.  This maintains a non-cyclical import
structure as well as the overall import behavior of
oslo.db, as some projects such as glance currently
have dependencies on this structure.

The patch also slims down and modernizes some very old
documentation in session.py.  The enginefacade system
should be preferred moving forward.

Implements: blueprint make-enginefacade-a-facade

Change-Id: I9a3d0c26bb727eb2c0bd823b9a12fde57cc7c9c3
Mike Bayer 2014-12-01 19:39:15 -05:00
parent 42dc93608f
commit fdbd928b1f
12 changed files with 3395 additions and 835 deletions


@@ -7,26 +7,91 @@ To use oslo.db in a project:
Session Handling
================
Session handling is achieved using the :mod:`oslo_db.sqlalchemy.enginefacade`
system.  This module presents a function decorator as well as a
context manager approach to delivering :class:`.Session` and
:class:`.Connection` objects to a function or block.

Both calling styles require the use of a context object.  This object may
be of any class, though when used with the decorator form, it requires
special instrumentation.
The context manager form is as follows:

.. code:: python

    from oslo_db.sqlalchemy import enginefacade


    class MyContext(object):
        "User-defined context class."


    def some_reader_api_function(context):
        with enginefacade.reader.using(context) as session:
            return session.query(SomeClass).all()


    def some_writer_api_function(context, x, y):
        with enginefacade.writer.using(context) as session:
            session.add(SomeClass(x, y))


    def run_some_database_calls():
        context = MyContext()

        results = some_reader_api_function(context)
        some_writer_api_function(context, 5, 10)
The decorator form accesses attributes off the user-defined context
directly; the context must be decorated with the
:func:`oslo_db.sqlalchemy.enginefacade.transaction_context_provider`
decorator. Each function must receive the context as the first
positional argument:
.. code:: python

    from oslo_db.sqlalchemy import enginefacade


    @enginefacade.transaction_context_provider
    class MyContext(object):
        "User-defined context class."


    @enginefacade.reader
    def some_reader_api_function(context):
        return context.session.query(SomeClass).all()


    @enginefacade.writer
    def some_writer_api_function(context, x, y):
        context.session.add(SomeClass(x, y))


    def run_some_database_calls():
        context = MyContext()

        results = some_reader_api_function(context)
        some_writer_api_function(context, 5, 10)
The scope of transaction and connectivity for both approaches is managed
transparently. The configuration for the connection comes from the standard
:obj:`oslo_config.cfg.CONF` collection. Additional configurations can be
established for the enginefacade using the
:func:`oslo_db.sqlalchemy.enginefacade.configure` function, before any use of
the database begins:
.. code:: python

    from oslo_db.sqlalchemy import enginefacade

    enginefacade.configure(
        sqlite_fk=True,
        max_retries=5,
        mysql_sql_mode='ANSI'
    )
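
A transaction context local to a particular subsystem, separate from the
global one, can also be constructed; the following is a sketch of
hypothetical usage, based on the
:func:`oslo_db.sqlalchemy.enginefacade.transaction_context` function
added in this patch:

.. code:: python

    from oslo_db.sqlalchemy import enginefacade

    context_manager = enginefacade.transaction_context()
    context_manager.configure(connection='sqlite:///other.db')

    @context_manager.reader
    def some_other_reader_api_function(context):
        return context.session.query(SomeOtherClass).all()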
Base class for models usage


@@ -205,3 +205,46 @@ class RetryRequest(Exception):
"""
def __init__(self, inner_exc):
self.inner_exc = inner_exc
class NoEngineContextEstablished(AttributeError):
"""Error raised for non-present enginefacade attribute access.
This applies to the ``session`` and ``connection`` attributes
of a user-defined context and/or RequestContext object, when they
are accessed outside of the scope of an enginefacade decorator
or context manager.
The exception is a subclass of AttributeError so that
normal Python missing attribute behaviors are maintained, such
as support for ``getattr(context, 'session', None)``.
"""
class NotSupportedWarning(Warning):
"""Warn that an argument or call that was passed is not supported.
This subclasses Warning so that it can be filtered as a distinct
category.
.. seealso::
https://docs.python.org/2/library/warnings.html
"""
class OsloDBDeprecationWarning(DeprecationWarning):
"""Issued per usage of a deprecated API.
This subclasses DeprecationWarning so that it can be filtered as a distinct
category.
.. seealso::
https://docs.python.org/2/library/warnings.html
"""


oslo_db/sqlalchemy/enginefacade.py (new file)

@@ -0,0 +1,995 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import functools
import operator
import threading
import warnings
from oslo_config import cfg
from oslo_context import context as oslo_context
from oslo_db import exception
from oslo_db import options
from oslo_db.sqlalchemy import engines
from oslo_db.sqlalchemy import orm
class _symbol(object):
"""represent a fixed symbol."""
__slots__ = 'name',
def __init__(self, name):
self.name = name
def __repr__(self):
return "symbol(%r)" % self.name
_ASYNC_READER = _symbol('ASYNC_READER')
"""Represent the transaction state of "async reader".
This state indicates that the transaction is read-only and is
safe to use on an asynchronously updated slave database.
"""
_READER = _symbol('READER')
"""Represent the transaction state of "reader".
This state indicates that the transaction is read-only and is
only safe to use on a synchronously updated slave database; otherwise
the master database should be used.
"""
_WRITER = _symbol('WRITER')
"""Represent the transaction state of "writer".
This state indicates that the transaction writes data and
should be directed at the master database.
"""
class _Default(object):
"""Mark a value as a default value.
A value in the local configuration dictionary wrapped with
_Default() will not take precedence over a value that is specified
in cfg.CONF. Values that are set after the fact using configure()
will supersede those in cfg.CONF.
"""
__slots__ = 'value',
_notset = _symbol("NOTSET")
def __init__(self, value=_notset):
self.value = value
@classmethod
def resolve(cls, value):
if isinstance(value, _Default):
v = value.value
if v is cls._notset:
return None
else:
return v
else:
return value
@classmethod
def resolve_w_conf(cls, value, conf_namespace, key):
if isinstance(value, _Default):
v = getattr(conf_namespace, key, value.value)
if v is cls._notset:
return None
else:
return v
else:
return value
@classmethod
def is_set(cls, value):
return not isinstance(value, _Default) or \
value.value is not cls._notset
@classmethod
def is_set_w_conf(cls, value, conf_namespace, key):
return not isinstance(value, _Default) or \
value.value is not cls._notset or \
hasattr(conf_namespace, key)
class _TransactionFactory(object):
"""A factory for :class:`._TransactionContext` objects.
By default, there is just one of these, set up
based on CONF, however instance-level :class:`._TransactionFactory`
objects can be made, as is the case with the
:class:`._TestTransactionFactory` subclass used by the oslo.db test suite.
"""
def __init__(self):
self._url_cfg = {
'connection': _Default(),
'slave_connection': _Default(),
}
self._engine_cfg = {
'sqlite_fk': _Default(False),
'mysql_sql_mode': _Default('TRADITIONAL'),
'idle_timeout': _Default(3600),
'connection_debug': _Default(0),
'max_pool_size': _Default(),
'max_overflow': _Default(),
'pool_timeout': _Default(),
'sqlite_synchronous': _Default(True),
'connection_trace': _Default(False),
'max_retries': _Default(10),
'retry_interval': _Default(10),
'thread_checkin': _Default(True)
}
self._maker_cfg = {
'expire_on_commit': _Default(False),
'__autocommit': True
}
self._transaction_ctx_cfg = {
'rollback_reader_sessions': False,
}
self._facade_cfg = {
'synchronous_reader': True
}
# other options that are defined in oslo.db.options.database_opts
# but do not apply to the standard enginefacade arguments
# (most seem to apply to api.DBAPI).
self._ignored_cfg = dict(
(k, _Default(None)) for k in [
'db_max_retries', 'db_inc_retry_interval',
'use_db_reconnect',
'db_retry_interval', 'min_pool_size',
'db_max_retry_interval',
'sqlite_db', 'backend'])
self._started = False
self._legacy_facade = None
self._start_lock = threading.Lock()
def configure_defaults(self, **kw):
"""Apply default configurational options.
This method can only be called before any specific
transaction-beginning methods have been called.
Configurational options are within a fixed set of keys, and fall
under three categories: URL configuration, engine configuration,
and session configuration. Each key given will be tested against
these three configuration sets to see which one is applicable; if
it is not applicable to any set, a :class:`.NotSupportedWarning` is emitted.
The configurational options given here act as **defaults**
when the :class:`._TransactionFactory` is configured using
a :class:`.oslo.config.cfg.ConfigOpts` object; the options
present within the :class:`.oslo.config.cfg.ConfigOpts` **take
precedence** versus the arguments passed here. By default,
the :class:`._TransactionFactory` loads in the configuration from
:data:`oslo.config.cfg.CONF`, after applying the
:data:`oslo.db.options.database_opts` configurational defaults to it.
.. seealso::
:meth:`._TransactionFactory.configure`
"""
self._configure(True, kw)
def configure(self, **kw):
"""Apply configurational options.
This method can only be called before any specific
transaction-beginning methods have been called.
Behavior here is the same as that of
:meth:`._TransactionFactory.configure_defaults`,
with the exception that values specified here will **supersede** those
set up in the :class:`.oslo.config.cfg.ConfigOpts` options.
.. seealso::
:meth:`._TransactionFactory.configure_defaults`
"""
self._configure(False, kw)
def _configure(self, as_defaults, kw):
if self._started:
raise TypeError("this TransactionFactory is already started")
not_supported = []
for k, v in kw.items():
for dict_ in (
self._url_cfg, self._engine_cfg,
self._maker_cfg, self._ignored_cfg,
self._facade_cfg, self._transaction_ctx_cfg):
if k in dict_:
dict_[k] = _Default(v) if as_defaults else v
break
else:
not_supported.append(k)
if not_supported:
# would like to raise ValueError here, but there are just
# too many unrecognized (obsolete?) configuration options
# coming in from projects
warnings.warn(
"Configuration option(s) %r not supported" %
sorted(not_supported),
exception.NotSupportedWarning
)
def get_legacy_facade(self):
"""Return a :class:`.LegacyEngineFacade` for this factory.
This facade will make use of the same engine and sessionmaker
as this factory, however will not share the same transaction context;
the legacy facade continues to work the old way of returning
a new Session each time get_session() is called.
"""
if not self._legacy_facade:
self._legacy_facade = LegacyEngineFacade(None, _factory=self)
if not self._started:
self._start()
return self._legacy_facade
def _create_connection(self, mode):
if not self._started:
self._start()
if mode is _WRITER:
return self._writer_engine.connect()
elif self.synchronous_reader or mode is _ASYNC_READER:
return self._reader_engine.connect()
else:
return self._writer_engine.connect()
def _create_session(self, mode, bind=None):
if not self._started:
self._start()
kw = {}
# don't pass 'bind' if bind is None; the sessionmaker
# already has a bind to the engine.
if bind:
kw['bind'] = bind
if mode is _WRITER:
return self._writer_maker(**kw)
elif self.synchronous_reader or mode is _ASYNC_READER:
return self._reader_maker(**kw)
else:
return self._writer_maker(**kw)
def _args_for_conf(self, default_cfg, conf):
if conf is None:
return dict(
(key, _Default.resolve(value))
for key, value in default_cfg.items()
if _Default.is_set(value)
)
else:
return dict(
(key, _Default.resolve_w_conf(value, conf.database, key))
for key, value in default_cfg.items()
if _Default.is_set_w_conf(value, conf.database, key)
)
def _url_args_for_conf(self, conf):
return self._args_for_conf(self._url_cfg, conf)
def _engine_args_for_conf(self, conf):
return self._args_for_conf(self._engine_cfg, conf)
def _maker_args_for_conf(self, conf):
return self._args_for_conf(self._maker_cfg, conf)
def _start(self, conf=False, connection=None, slave_connection=None):
with self._start_lock:
# self._started has been checked on the outside
# when _start() was called. Within the lock,
# check the flag once more to detect the case where
# the start process proceeded while this thread was waiting
# for the lock.
if self._started:
return
self._started = True
if conf is False:
conf = cfg.CONF
# perform register_opts() local to actually using
# the cfg.CONF to maintain exact compatibility with
# the EngineFacade design. This can be changed if needed.
if conf is not None:
conf.register_opts(options.database_opts, 'database')
url_args = self._url_args_for_conf(conf)
if connection:
url_args['connection'] = connection
if slave_connection:
url_args['slave_connection'] = slave_connection
engine_args = self._engine_args_for_conf(conf)
maker_args = self._maker_args_for_conf(conf)
maker_args['autocommit'] = maker_args.pop('__autocommit')
self._writer_engine, self._writer_maker = \
self._setup_for_connection(
url_args['connection'],
engine_args, maker_args)
if url_args.get('slave_connection'):
self._reader_engine, self._reader_maker = \
self._setup_for_connection(
url_args['slave_connection'],
engine_args, maker_args)
else:
self._reader_engine, self._reader_maker = \
self._writer_engine, self._writer_maker
self.synchronous_reader = self._facade_cfg['synchronous_reader']
def _setup_for_connection(
self, sql_connection, engine_kwargs, maker_kwargs):
engine = engines.create_engine(
sql_connection=sql_connection, **engine_kwargs)
sessionmaker = orm.get_maker(engine=engine, **maker_kwargs)
return engine, sessionmaker
class _TestTransactionFactory(_TransactionFactory):
"""A :class:`._TransactionFactory` used by test suites.
This is a :class:`._TransactionFactory` that can be directly injected
with an existing engine and sessionmaker.
Note that while this is used by oslo.db's own tests of
the enginefacade system, it is also exported for use by
the test suites of other projects, first as an element of the
oslo_db.sqlalchemy.test_base module, and secondly may be used by
external test suites directly.
Includes a feature to inject itself temporarily as the factory
within the global :class:`._TransactionContextManager`.
"""
def __init__(self, engine, maker, apply_global, synchronous_reader):
self._reader_engine = self._writer_engine = engine
self._reader_maker = self._writer_maker = maker
self._started = True
self._legacy_facade = None
self.synchronous_reader = synchronous_reader
self._facade_cfg = _context_manager._factory._facade_cfg
self._transaction_ctx_cfg = \
_context_manager._factory._transaction_ctx_cfg
if apply_global:
self.existing_factory = _context_manager._factory
_context_manager._root_factory = self
def dispose_global(self):
_context_manager._root_factory = self.existing_factory
class _TransactionContext(object):
"""Represent a single database transaction in progress."""
def __init__(
self, factory, global_factory=None,
rollback_reader_sessions=False):
"""Construct a new :class:`.TransactionContext`.
:param factory: the :class:`.TransactionFactory` which will
serve as a source of connectivity.
:param global_factory: the "global" factory which will be used
by the global ``_context_manager`` for new ``_TransactionContext``
objects created under this one. When left as None the actual
"global" factory is used.
:param rollback_reader_sessions: if True, a :class:`.Session` object
will have its :meth:`.Session.rollback` method invoked at the end
of a ``@reader`` block, actively rolling back the transaction and
expiring the objects within, before the :class:`.Session` moves
on to be closed, which has the effect of releasing connection
resources back to the connection pool and detaching all objects.
If False, the :class:`.Session` is
not affected at the end of a ``@reader`` block; the underlying
connection referred to by this :class:`.Session` will still
be released in the enclosing context via the :meth:`.Session.close`
method, which still ensures that the DBAPI connection is rolled
back, however the objects associated with the :class:`.Session`
retain their database-persisted contents after they are detached.
.. seealso::
http://docs.sqlalchemy.org/en/rel_0_9/glossary.html#term-released\
SQLAlchemy documentation on what "releasing resources" means.
"""
self.factory = factory
self.global_factory = global_factory
self.mode = None
self.session = None
self.connection = None
self.transaction = None
kw = self.factory._transaction_ctx_cfg
self.rollback_reader_sessions = kw['rollback_reader_sessions']
@contextlib.contextmanager
def _connection(self, savepoint=False):
if self.connection is None:
try:
if self.session is not None:
# use existing session, which is outer to us
self.connection = self.session.connection()
if savepoint:
with self.connection.begin_nested():
yield self.connection
else:
yield self.connection
else:
# is outermost
self.connection = self.factory._create_connection(
mode=self.mode)
self.transaction = self.connection.begin()
try:
yield self.connection
self._end_connection_transaction(self.transaction)
except Exception:
self.transaction.rollback()
# TODO(zzzeek) do we need save_and_reraise() here,
# or do newer eventlets not have issues? we are using
# raw "raise" in many other places in oslo.db already
# (and one six.reraise()).
raise
finally:
self.transaction = None
self.connection.close()
finally:
self.connection = None
else:
# use existing connection, which is outer to us
if savepoint:
with self.connection.begin_nested():
yield self.connection
else:
yield self.connection
@contextlib.contextmanager
def _session(self, savepoint=False):
if self.session is None:
self.session = self.factory._create_session(
bind=self.connection, mode=self.mode)
try:
self.session.begin()
yield self.session
self._end_session_transaction(self.session)
except Exception:
self.session.rollback()
# TODO(zzzeek) do we need save_and_reraise() here,
# or do newer eventlets not have issues? we are using
# raw "raise" in many other places in oslo.db already
# (and one six.reraise()).
raise
finally:
self.session.close()
self.session = None
else:
# use existing session, which is outer to us
if savepoint:
with self.session.begin_nested():
yield self.session
else:
yield self.session
def _end_session_transaction(self, session):
if self.mode is _WRITER:
session.commit()
elif self.rollback_reader_sessions:
session.rollback()
# In the absence of calling session.rollback(),
# the next call is session.close(). This releases all
# objects from the session into the detached state, and
# releases the connection as well; the connection when returned
# to the pool is either rolled back in any case, or closed fully.
def _end_connection_transaction(self, transaction):
if self.mode is _WRITER:
transaction.commit()
else:
transaction.rollback()
def _produce_block(self, mode, connection, savepoint):
if mode is _WRITER:
self._writer()
elif mode is _ASYNC_READER:
self._async_reader()
else:
self._reader()
if connection:
return self._connection(savepoint)
else:
return self._session(savepoint)
def _writer(self):
if self.mode is None:
self.mode = _WRITER
elif self.mode is _READER:
raise TypeError(
"Can't upgrade a READER transaction "
"to a WRITER mid-transaction")
elif self.mode is _ASYNC_READER:
raise TypeError(
"Can't upgrade an ASYNC_READER transaction "
"to a WRITER mid-transaction")
def _reader(self):
if self.mode is None:
self.mode = _READER
elif self.mode is _ASYNC_READER:
raise TypeError(
"Can't upgrade an ASYNC_READER transaction "
"to a READER mid-transaction")
def _async_reader(self):
if self.mode is None:
self.mode = _ASYNC_READER
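# A sketch of the mode-upgrade rule enforced above (hypothetical usage):
# entering a writer block while a reader block for the same context is in
# progress raises TypeError, since a READER transaction can't be upgraded.
#
#     @enginefacade.reader
#     def caller(context):
#         nested(context)  # raises TypeError inside
#
#     @enginefacade.writer
#     def nested(context):
#         ...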
class _TransactionContextTLocal(threading.local):
def __deepcopy__(self, memo):
return self
class _TransactionContextManager(object):
"""Provide context-management and decorator patterns for transactions.
This object integrates user-defined "context" objects with the
:class:`._TransactionContext` class, on behalf of a
contained :class:`._TransactionFactory`.
"""
def __init__(
self, root=None,
mode=None,
independent=False,
savepoint=False,
connection=False,
replace_global_factory=None,
_is_global_manager=False):
if root is None:
self._root = self
self._root_factory = _TransactionFactory()
else:
self._root = root
self._replace_global_factory = replace_global_factory
self._is_global_manager = _is_global_manager
self._mode = mode
self._independent = independent
self._savepoint = savepoint
if self._savepoint and self._independent:
raise TypeError(
"setting savepoint and independent makes no sense.")
self._connection = connection
@property
def _factory(self):
"""The :class:`._TransactionFactory` associated with this context."""
return self._root._root_factory
def configure(self, **kw):
"""Apply configurational options to the factory.
This method can only be called before any specific
transaction-beginning methods have been called.
"""
self._factory.configure(**kw)
@property
def replace(self):
"""Modifier to replace the global transaction factory with this one."""
return self._clone(replace_global_factory=self._factory)
@property
def writer(self):
"""Modifier to set the transaction to WRITER."""
return self._clone(mode=_WRITER)
@property
def reader(self):
"""Modifier to set the transaction to READER."""
return self._clone(mode=_READER)
@property
def independent(self):
"""Modifier to start a transaction independent from any enclosing."""
return self._clone(independent=True)
@property
def savepoint(self):
"""Modifier to start a SAVEPOINT if a transaction already exists."""
return self._clone(savepoint=True)
@property
def connection(self):
"""Modifier to return a core Connection object instead of Session."""
return self._clone(connection=True)
@property
def async(self):
"""Modifier to set a READER operation to ASYNC_READER."""
if self._mode is _WRITER:
raise TypeError("Setting async on a WRITER makes no sense")
return self._clone(mode=_ASYNC_READER)
def using(self, context):
"""Provide a context manager block that will use the given context."""
return self._transaction_scope(context)
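# A sketch of how the modifier properties above compose (hypothetical
# usage): each property returns a clone of the manager, so modifiers can
# be chained before using() or before decorating.
#
#     with enginefacade.reader.connection.using(context) as connection:
#         rows = connection.execute(some_table.select())
#
#     @enginefacade.writer.savepoint
#     def some_api_method(context):
#         ...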
def __call__(self, fn):
"""Decorate a function."""
@functools.wraps(fn)
def wrapper(*args, **kwargs):
context = args[0]
with self._transaction_scope(context):
return fn(*args, **kwargs)
return wrapper
def _clone(self, **kw):
default_kw = {
"independent": self._independent,
"mode": self._mode,
"connection": self._connection
}
default_kw.update(kw)
return _TransactionContextManager(root=self._root, **default_kw)
@contextlib.contextmanager
def _transaction_scope(self, context):
new_transaction = self._independent
transaction_contexts_by_thread = \
_transaction_contexts_by_thread(context)
current = restore = getattr(
transaction_contexts_by_thread, "current", None)
use_factory = self._factory
global_factory = None
if self._replace_global_factory:
use_factory = global_factory = self._replace_global_factory
elif current is not None and current.global_factory:
global_factory = current.global_factory
if self._root._is_global_manager:
use_factory = global_factory
if current is not None and (
new_transaction or current.factory is not use_factory
):
current = None
if current is None:
current = transaction_contexts_by_thread.current = \
_TransactionContext(
use_factory, global_factory=global_factory,
**use_factory._transaction_ctx_cfg)
try:
if self._mode is not None:
with current._produce_block(
mode=self._mode,
connection=self._connection,
savepoint=self._savepoint) as resource:
yield resource
else:
yield
finally:
if restore is None:
del transaction_contexts_by_thread.current
elif current is not restore:
transaction_contexts_by_thread.current = restore
def _context_descriptor(attr=None):
getter = operator.attrgetter(attr)
def _property_for_context(context):
try:
transaction_context = context.transaction_ctx
except exception.NoEngineContextEstablished:
raise exception.NoEngineContextEstablished(
"No TransactionContext is established for "
"this %s object within the current thread; "
"the %r attribute is unavailable."
% (context, attr)
)
else:
return getter(transaction_context)
return property(_property_for_context)
def _transaction_ctx_for_context(context):
by_thread = _transaction_contexts_by_thread(context)
try:
return by_thread.current
except AttributeError:
raise exception.NoEngineContextEstablished(
"No TransactionContext is established for "
"this %s object within the current thread. "
% context
)
def _transaction_contexts_by_thread(context):
transaction_contexts_by_thread = getattr(
context, '_enginefacade_context', None)
if transaction_contexts_by_thread is None:
transaction_contexts_by_thread = \
context._enginefacade_context = _TransactionContextTLocal()
return transaction_contexts_by_thread
def transaction_context_provider(klass):
"""Decorate a class with ``session`` and ``connection`` attributes."""
setattr(
klass,
'transaction_ctx',
property(_transaction_ctx_for_context))
# Graft transaction context attributes as context properties
for attr in ('session', 'connection', 'transaction'):
setattr(klass, attr, _context_descriptor(attr))
return klass
# apply the context descriptors to oslo.context.RequestContext
transaction_context_provider(oslo_context.RequestContext)
_context_manager = _TransactionContextManager(_is_global_manager=True)
"""default context manager."""
def transaction_context():
"""Construct a local transaction context.
"""
return _TransactionContextManager()
def configure(**kw):
"""Apply configurational options to the global factory.
This method can only be called before any specific transaction-beginning
methods have been called.
.. seealso::
:meth:`._TransactionFactory.configure`
"""
_context_manager._factory.configure(**kw)
def get_legacy_facade():
"""Return a :class:`.LegacyEngineFacade` for the global factory.
This facade will make use of the same engine and sessionmaker
as this factory, however will not share the same transaction context;
the legacy facade continues to work the old way of returning
a new Session each time get_session() is called.
"""
return _context_manager._factory.get_legacy_facade()
reader = _context_manager.reader
"""The global 'reader' starting point."""
writer = _context_manager.writer
"""The global 'writer' starting point."""
class LegacyEngineFacade(object):
"""A helper class for removing of global engine instances from oslo.db.
.. deprecated::
EngineFacade is deprecated. Please use
oslo.db.sqlalchemy.enginefacade for new development.
As a library, oslo.db can't decide where to store/when to create engine
and sessionmaker instances, so this must be left for a target application.
On the other hand, in order to simplify the adoption of oslo.db changes,
we'll provide a helper class, which creates engine and sessionmaker
on its instantiation and provides get_engine()/get_session() methods
that are compatible with corresponding utility functions that currently
exist in target projects, e.g. in Nova.
engine/sessionmaker instances will still be global (and they are meant to
be global), but they will be stored in the app context, rather than in the
oslo.db context.
Two important things to remember:
1. An Engine instance is effectively a pool of DB connections, so it's
meant to be shared (and it's thread-safe).
2. A Session instance is not meant to be shared and represents a DB
transactional context (i.e. it's not thread-safe). sessionmaker is
a factory of sessions.
"""
def __init__(self, sql_connection, slave_connection=None,
sqlite_fk=False, autocommit=True,
expire_on_commit=False, _conf=None, _factory=None, **kwargs):
"""Initialize engine and sessionmaker instances.
:param sql_connection: the connection string for the database to use
:type sql_connection: string
:param slave_connection: the connection string for the 'slave' database
to use. If not provided, the master database
will be used for all operations. Note: this
is meant to be used for offloading of read
operations to asynchronously replicated slaves
to reduce the load on the master database.
:type slave_connection: string
:param sqlite_fk: enable foreign keys in SQLite
:type sqlite_fk: bool
:param autocommit: use autocommit mode for created Session instances
:type autocommit: bool
:param expire_on_commit: expire session objects on commit
:type expire_on_commit: bool
Keyword arguments:
:keyword mysql_sql_mode: the SQL mode to be used for MySQL sessions.
(defaults to TRADITIONAL)
:keyword idle_timeout: timeout before idle sql connections are reaped
(defaults to 3600)
:keyword connection_debug: verbosity of SQL debugging information.
-1=Off, 0=None, 100=Everything (defaults
to 0)
:keyword max_pool_size: maximum number of SQL connections to keep open
in a pool (defaults to SQLAlchemy settings)
:keyword max_overflow: if set, use this value for max_overflow with
sqlalchemy (defaults to SQLAlchemy settings)
:keyword pool_timeout: if set, use this value for pool_timeout with
sqlalchemy (defaults to SQLAlchemy settings)
:keyword sqlite_synchronous: if True, SQLite uses synchronous mode
(defaults to True)
:keyword connection_trace: add python stack traces to SQL as comment
strings (defaults to False)
:keyword max_retries: maximum db connection retries during startup.
(setting -1 implies an infinite retry count)
(defaults to 10)
:keyword retry_interval: interval between retries of opening a sql
connection (defaults to 10)
:keyword thread_checkin: boolean that indicates that between each
engine checkin event a sleep(0) will occur to
allow other greenthreads to run (defaults to
True)
"""
warnings.warn(
"EngineFacade is deprecated; please use "
"oslo.db.sqlalchemy.enginefacade",
exception.OsloDBDeprecationWarning,
stacklevel=2)
if _factory:
self._factory = _factory
else:
self._factory = _TransactionFactory()
self._factory.configure(
sqlite_fk=sqlite_fk,
__autocommit=autocommit,
expire_on_commit=expire_on_commit,
**kwargs
)
# make sure passed-in urls are favored over that
# of config
self._factory._start(
_conf, connection=sql_connection,
slave_connection=slave_connection)
def get_engine(self, use_slave=False):
"""Get the engine instance (note, that it's shared).
:param use_slave: if possible, use 'slave' database for this engine.
If the connection string for the slave database
wasn't provided, 'master' engine will be returned.
(defaults to False)
:type use_slave: bool
"""
if use_slave:
return self._factory._reader_engine
else:
return self._factory._writer_engine
def get_session(self, use_slave=False, **kwargs):
"""Get a Session instance.
:param use_slave: if possible, use 'slave' database connection for
this session. If the connection string for the
slave database wasn't provided, a session bound
to the 'master' engine will be returned.
(defaults to False)
:type use_slave: bool
Keyword arguments will be passed to a sessionmaker instance as is (if
passed, they will override the ones used when the sessionmaker instance
was created). See SQLAlchemy Session docs for details.
"""
if use_slave:
return self._factory._reader_maker(**kwargs)
else:
return self._factory._writer_maker(**kwargs)
@classmethod
def from_config(cls, conf,
sqlite_fk=False, autocommit=True, expire_on_commit=False):
"""Initialize EngineFacade using oslo.config config instance options.
:param conf: oslo.config config instance
:type conf: oslo.config.cfg.ConfigOpts
:param sqlite_fk: enable foreign keys in SQLite
:type sqlite_fk: bool
:param autocommit: use autocommit mode for created Session instances
:type autocommit: bool
:param expire_on_commit: expire session objects on commit
:type expire_on_commit: bool
"""
return cls(
None,
sqlite_fk=sqlite_fk,
autocommit=autocommit,
expire_on_commit=expire_on_commit, _conf=conf)
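# A sketch of hypothetical usage of the legacy facade above, for code that
# has not yet moved to the enginefacade decorators:
#
#     from oslo_db.sqlalchemy import enginefacade
#
#     facade = enginefacade.get_legacy_facade()
#     engine = facade.get_engine()
#     session = facade.get_session()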


oslo_db/sqlalchemy/engines.py (new file)

@@ -0,0 +1,413 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Core SQLAlchemy connectivity routines.
"""
import itertools
import logging
import os
import re
import time
import six
import sqlalchemy
from sqlalchemy import event
from sqlalchemy import exc
from sqlalchemy import pool
from sqlalchemy.sql.expression import select
from oslo_db._i18n import _LW
from oslo_db import exception
from oslo_db.sqlalchemy import exc_filters
from oslo_db.sqlalchemy import utils
LOG = logging.getLogger(__name__)
def _thread_yield(dbapi_con, con_record):
"""Ensure other greenthreads get a chance to be executed.
If we use eventlet.monkey_patch(), eventlet.greenthread.sleep(0) will
execute instead of time.sleep(0).
Force a context switch. With common database backends (eg MySQLdb and
sqlite), there is no implicit yield caused by network I/O since they are
implemented by C libraries that eventlet cannot monkey patch.
"""
time.sleep(0)
def _connect_ping_listener(connection, branch):
"""Ping the server at connection startup.
Ping the server at transaction begin and transparently reconnect
if a disconnect exception occurs.
"""
if branch:
return
# turn off "close with result". This can also be accomplished
# by branching the connection, however just setting the flag is
# more performant and also doesn't get involved with some
# connection-invalidation awkwardness that occurs (see
# https://bitbucket.org/zzzeek/sqlalchemy/issue/3215/)
save_should_close_with_result = connection.should_close_with_result
connection.should_close_with_result = False
try:
# run a SELECT 1. use a core select() so that
# any details like that needed by Oracle, DB2 etc. are handled.
connection.scalar(select([1]))
except exception.DBConnectionError:
# catch DBConnectionError, which is raised by the filter
# system.
# disconnect detected. The connection is now
# "invalid", but the pool should be ready to return
# new connections assuming they are good now.
# run the select again to re-validate the Connection.
connection.scalar(select([1]))
finally:
connection.should_close_with_result = save_should_close_with_result
def _setup_logging(connection_debug=0):
"""setup_logging function maps SQL debug level to Python log level.
Connection_debug is a verbosity of SQL debugging information.
0=None(default value),
1=Processed only messages with WARNING level or higher
50=Processed only messages with INFO level or higher
100=Processed only messages with DEBUG level
"""
if connection_debug >= 0:
logger = logging.getLogger('sqlalchemy.engine')
if connection_debug >= 100:
logger.setLevel(logging.DEBUG)
elif connection_debug >= 50:
logger.setLevel(logging.INFO)
else:
logger.setLevel(logging.WARNING)
def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None,
idle_timeout=3600,
connection_debug=0, max_pool_size=None, max_overflow=None,
pool_timeout=None, sqlite_synchronous=True,
connection_trace=False, max_retries=10, retry_interval=10,
thread_checkin=True, logging_name=None):
"""Return a new SQLAlchemy engine."""
url = sqlalchemy.engine.url.make_url(sql_connection)
engine_args = {
"pool_recycle": idle_timeout,
'convert_unicode': True,
'connect_args': {},
'logging_name': logging_name
}
_setup_logging(connection_debug)
_init_connection_args(
url, engine_args,
sqlite_fk=sqlite_fk,
max_pool_size=max_pool_size,
max_overflow=max_overflow,
pool_timeout=pool_timeout
)
engine = sqlalchemy.create_engine(url, **engine_args)
_init_events(
engine,
mysql_sql_mode=mysql_sql_mode,
sqlite_synchronous=sqlite_synchronous,
sqlite_fk=sqlite_fk,
thread_checkin=thread_checkin,
connection_trace=connection_trace
)
# register alternate exception handler
exc_filters.register_engine(engine)
# register engine connect handler
event.listen(engine, "engine_connect", _connect_ping_listener)
# initial connect + test
# NOTE(viktors): the current implementation of _test_connection()
# does nothing, if max_retries == 0, so we can skip it
if max_retries:
test_conn = _test_connection(engine, max_retries, retry_interval)
test_conn.close()
return engine
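# A sketch of hypothetical direct usage of create_engine() above; projects
# normally reach it through the enginefacade system instead:
#
#     engine = create_engine(
#         "mysql+mysqldb://user:password@localhost/test",
#         mysql_sql_mode='TRADITIONAL',
#         max_retries=3,
#         retry_interval=5,
#     )
#     with engine.connect() as connection:
#         connection.scalar(select([1]))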
@utils.dispatch_for_dialect('*', multiple=True)
def _init_connection_args(
url, engine_args,
max_pool_size=None, max_overflow=None, pool_timeout=None, **kw):
pool_class = url.get_dialect().get_pool_class(url)
if issubclass(pool_class, pool.QueuePool):
if max_pool_size is not None:
engine_args['pool_size'] = max_pool_size
if max_overflow is not None:
engine_args['max_overflow'] = max_overflow
if pool_timeout is not None:
engine_args['pool_timeout'] = pool_timeout
@_init_connection_args.dispatch_for("sqlite")
def _init_connection_args(url, engine_args, **kw):
pool_class = url.get_dialect().get_pool_class(url)
# singletonthreadpool is used for :memory: connections;
# replace it with StaticPool.
if issubclass(pool_class, pool.SingletonThreadPool):
engine_args["poolclass"] = pool.StaticPool
engine_args['connect_args']['check_same_thread'] = False
@_init_connection_args.dispatch_for("postgresql")
def _init_connection_args(url, engine_args, **kw):
if 'client_encoding' not in url.query:
# Set encoding using engine_args instead of connect_args since
# it's supported for PostgreSQL 8.*. More details at:
# http://docs.sqlalchemy.org/en/rel_0_9/dialects/postgresql.html
engine_args['client_encoding'] = 'utf8'
@_init_connection_args.dispatch_for("mysql")
def _init_connection_args(url, engine_args, **kw):
if 'charset' not in url.query:
engine_args['connect_args']['charset'] = 'utf8'
@_init_connection_args.dispatch_for("mysql+mysqlconnector")
def _init_connection_args(url, engine_args, **kw):
# mysqlconnector engine (<1.0) incorrectly defaults to
# raise_on_warnings=True
# https://bitbucket.org/zzzeek/sqlalchemy/issue/2515
if 'raise_on_warnings' not in url.query:
engine_args['connect_args']['raise_on_warnings'] = False
@_init_connection_args.dispatch_for("mysql+mysqldb")
@_init_connection_args.dispatch_for("mysql+oursql")
def _init_connection_args(url, engine_args, **kw):
# Those drivers require use_unicode=0 to avoid performance drop due
# to internal usage of Python unicode objects in the driver
# http://docs.sqlalchemy.org/en/rel_0_9/dialects/mysql.html
if 'use_unicode' not in url.query:
engine_args['connect_args']['use_unicode'] = 0
@utils.dispatch_for_dialect('*', multiple=True)
def _init_events(engine, thread_checkin=True, connection_trace=False, **kw):
"""Set up event listeners for all database backends."""
_add_process_guards(engine)
if connection_trace:
_add_trace_comments(engine)
if thread_checkin:
sqlalchemy.event.listen(engine, 'checkin', _thread_yield)
@_init_events.dispatch_for("mysql")
def _init_events(engine, mysql_sql_mode=None, **kw):
"""Set up event listeners for MySQL."""
if mysql_sql_mode is not None:
@sqlalchemy.event.listens_for(engine, "connect")
def _set_session_sql_mode(dbapi_con, connection_rec):
cursor = dbapi_con.cursor()
cursor.execute("SET SESSION sql_mode = %s", [mysql_sql_mode])
@sqlalchemy.event.listens_for(engine, "first_connect")
def _check_effective_sql_mode(dbapi_con, connection_rec):
if mysql_sql_mode is not None:
_set_session_sql_mode(dbapi_con, connection_rec)
cursor = dbapi_con.cursor()
cursor.execute("SHOW VARIABLES LIKE 'sql_mode'")
realmode = cursor.fetchone()
if realmode is None:
LOG.warning(_LW('Unable to detect effective SQL mode'))
else:
realmode = realmode[1]
LOG.debug('MySQL server mode set to %s', realmode)
if 'TRADITIONAL' not in realmode.upper() and \
'STRICT_ALL_TABLES' not in realmode.upper():
LOG.warning(
_LW(
"MySQL SQL mode is '%s', "
"consider enabling TRADITIONAL or STRICT_ALL_TABLES"),
realmode)
@_init_events.dispatch_for("sqlite")
def _init_events(engine, sqlite_synchronous=True, sqlite_fk=False, **kw):
"""Set up event listeners for SQLite.
This includes several settings made on connections as they are
created, as well as transactional control extensions.
"""
def regexp(expr, item):
reg = re.compile(expr)
return reg.search(six.text_type(item)) is not None
@sqlalchemy.event.listens_for(engine, "connect")
def _sqlite_connect_events(dbapi_con, con_record):
# Add REGEXP functionality on SQLite connections
dbapi_con.create_function('regexp', 2, regexp)
if not sqlite_synchronous:
# Switch sqlite connections to non-synchronous mode
dbapi_con.execute("PRAGMA synchronous = OFF")
# Disable pysqlite's emitting of the BEGIN statement entirely.
# Also stops it from emitting COMMIT before any DDL.
# below, we emit BEGIN ourselves.
# see http://docs.sqlalchemy.org/en/rel_0_9/dialects/\
# sqlite.html#serializable-isolation-savepoints-transactional-ddl
dbapi_con.isolation_level = None
if sqlite_fk:
# Ensures that the foreign key constraints are enforced in SQLite.
dbapi_con.execute('pragma foreign_keys=ON')
@sqlalchemy.event.listens_for(engine, "begin")
def _sqlite_emit_begin(conn):
# emit our own BEGIN, checking for existing
# transactional state
if 'in_transaction' not in conn.info:
conn.execute("BEGIN")
conn.info['in_transaction'] = True
@sqlalchemy.event.listens_for(engine, "rollback")
@sqlalchemy.event.listens_for(engine, "commit")
def _sqlite_end_transaction(conn):
# remove transactional marker
conn.info.pop('in_transaction', None)
def _test_connection(engine, max_retries, retry_interval):
if max_retries == -1:
attempts = itertools.count()
else:
attempts = six.moves.range(max_retries)
# See: http://legacy.python.org/dev/peps/pep-3110/#semantic-changes for
# why we are not using 'de' directly (it can be removed from the local
# scope).
de_ref = None
for attempt in attempts:
try:
return engine.connect()
except exception.DBConnectionError as de:
msg = _LW('SQL connection failed. %s attempts left.')
LOG.warning(msg, max_retries - attempt)
time.sleep(retry_interval)
de_ref = de
else:
if de_ref is not None:
six.reraise(type(de_ref), de_ref)
def _add_process_guards(engine):
"""Add multiprocessing guards.
Forces a connection to be reconnected if it is detected
as having been shared to a sub-process.
"""
@sqlalchemy.event.listens_for(engine, "connect")
def connect(dbapi_connection, connection_record):
connection_record.info['pid'] = os.getpid()
@sqlalchemy.event.listens_for(engine, "checkout")
def checkout(dbapi_connection, connection_record, connection_proxy):
pid = os.getpid()
if connection_record.info['pid'] != pid:
LOG.debug(_LW(
"Parent process %(orig)s forked (%(newproc)s) with an open "
"database connection, "
"which is being discarded and recreated."),
{"newproc": pid, "orig": connection_record.info['pid']})
connection_record.connection = connection_proxy.connection = None
raise exc.DisconnectionError(
"Connection record belongs to pid %s, "
"attempting to check out in pid %s" %
(connection_record.info['pid'], pid)
)
def _add_trace_comments(engine):
"""Add trace comments.
Augment statements with a trace of the immediate calling code
for a given statement.
"""
import os
import sys
import traceback
target_paths = set([
os.path.dirname(sys.modules['oslo_db'].__file__),
os.path.dirname(sys.modules['sqlalchemy'].__file__)
])
skip_paths = set([
os.path.dirname(sys.modules['oslo_db.tests'].__file__),
])
@sqlalchemy.event.listens_for(engine, "before_cursor_execute", retval=True)
def before_cursor_execute(conn, cursor, statement, parameters, context,
executemany):
# NOTE(zzzeek) - if different steps per DB dialect are desirable
# here, switch out on engine.name for now.
stack = traceback.extract_stack()
our_line = None
for idx, (filename, line, method, function) in enumerate(stack):
for tgt in skip_paths:
if filename.startswith(tgt):
break
else:
for tgt in target_paths:
if filename.startswith(tgt):
our_line = idx
break
if our_line:
break
if our_line:
trace = "; ".join(
"File: %s (%s) %s" % (
line[0], line[1], line[2]
)
# include three lines of context.
for line in stack[our_line - 3:our_line]
)
statement = "%s -- %s" % (statement, trace)
return statement, parameters

oslo_db/sqlalchemy/orm.py (new file)

@@ -0,0 +1,66 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""SQLAlchemy ORM connectivity and query structures.
"""
from oslo_utils import timeutils
import sqlalchemy.orm
from sqlalchemy.sql.expression import literal_column
from oslo_db.sqlalchemy import update_match
class Query(sqlalchemy.orm.query.Query):
"""Subclass of sqlalchemy.query with soft_delete() method."""
def soft_delete(self, synchronize_session='evaluate'):
return self.update({'deleted': literal_column('id'),
'updated_at': literal_column('updated_at'),
'deleted_at': timeutils.utcnow()},
synchronize_session=synchronize_session)
def update_returning_pk(self, values, surrogate_key):
"""Perform an UPDATE, returning the primary key of the matched row.
This is a method-version of
oslo_db.sqlalchemy.update_match.update_returning_pk(); see that
function for usage details.
"""
return update_match.update_returning_pk(self, values, surrogate_key)
def update_on_match(self, specimen, surrogate_key, values, **kw):
"""Emit an UPDATE statement matching the given specimen.
This is a method-version of
oslo_db.sqlalchemy.update_match.update_on_match(); see that function
for usage details.
"""
return update_match.update_on_match(
self, specimen, surrogate_key, values, **kw)
class Session(sqlalchemy.orm.session.Session):
"""oslo.db-specific Session subclass."""
def get_maker(engine, autocommit=True, expire_on_commit=False):
"""Return a SQLAlchemy sessionmaker using the given engine."""
return sqlalchemy.orm.sessionmaker(bind=engine,
class_=Session,
autocommit=autocommit,
expire_on_commit=expire_on_commit,
query_cls=Query)
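# A sketch of hypothetical direct usage of get_maker(); most applications
# obtain sessions through the enginefacade system instead:
#
#     from oslo_db.sqlalchemy import engines
#
#     engine = engines.create_engine("sqlite://")
#     maker = get_maker(engine)
#     session = maker()
#     # the session is the oslo.db Session subclass; its queries use the
#     # Query subclass with soft_delete(), update_on_match(), etc.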


oslo_db/sqlalchemy/session.py

@@ -18,96 +18,71 @@
Recommended ways to use sessions within this framework:
It is generally fine to issue several queries in a row like this. Even though
they may be run in separate transactions and/or separate sessions, each one
will see the data from the prior calls. If needed, undo- or rollback-like
functionality should be handled at a logical level. For an example, look at
the code around quotas and `reservation_rollback()`.
Examples:
* Use the ``enginefacade`` system for connectivity, session and
  transaction management:

  .. code-block:: python

    from oslo.db.sqlalchemy import enginefacade

    @enginefacade.reader
    def get_foo(context, foo):
        return (model_query(models.Foo, context.session).
                filter_by(foo=foo).
                first())

    @enginefacade.writer
    def update_foo(context, id, newfoo):
        (model_query(models.Foo, context.session).
         filter_by(id=id).
         update({'foo': newfoo}))

    @enginefacade.writer
    def create_foo(context, values):
        foo_ref = models.Foo()
        foo_ref.update(values)
        foo_ref.save(context.session)
        return foo_ref
  In the above system, transactions are committed automatically, and
  are shared among all dependent database methods.  Ensure
  that methods which "write" data are enclosed within @writer blocks.

  .. note:: Statements in the session scope will not be automatically
     retried.
* If you create models within the session, they need to be added, but you
  do not need to call `model.save()`:

  .. code-block:: python

    @enginefacade.writer
    def create_many_foo(context, foos):
        for foo in foos:
            foo_ref = models.Foo()
            foo_ref.update(foo)
            context.session.add(foo_ref)
    @enginefacade.writer
    def update_bar(context, foo_id, newbar):
        foo_ref = (model_query(models.Foo, context.session).
                   filter_by(id=foo_id).
                   first())
        (model_query(models.Bar, context.session).
         filter_by(id=foo_ref['bar_id']).
         update({'bar': newbar}))
.. note:: `update_bar` is a trivially simple example.  Whereas
   `create_many_foo` is a good example of when a transaction is needed,
   it is always best to use as few queries as possible.

The two queries in `update_bar` can alternatively be expressed using
a single query, which may be more efficient depending on scenario:
.. code-block:: python

    @enginefacade.writer
    def update_bar(context, foo_id, newbar):
        subq = (model_query(models.Foo.id, context.session).
                filter_by(id=foo_id).
                limit(1).
                subquery())
        (model_query(models.Bar, context.session).
         filter_by(id=subq.as_scalar()).
         update({'bar': newbar}))
@@ -119,87 +94,54 @@ Recommended ways to use sessions within this framework:
WHERE id=(SELECT bar_id FROM foo WHERE id = ${foo_id} LIMIT 1);
.. note:: `create_duplicate_foo` is a trivially simple example of catching an
   exception while using a savepoint.  Here we create two duplicate
   instances with the same primary key, and must catch the exception
   outside of the savepoint block:

.. code-block:: python

    @enginefacade.writer
    def create_duplicate_foo(context):
        foo1 = models.Foo()
        foo2 = models.Foo()
        foo1.id = foo2.id = 1
        try:
            with context.session.begin_nested():
                context.session.add(foo1)
                context.session.add(foo2)
        except exception.DBDuplicateEntry as e:
            handle_error(e)
* The enginefacade system eliminates the need to decide when sessions need
  to be passed between methods.  All methods should instead share a common
  context object; the enginefacade system will maintain the transaction
  across method calls.

  .. code-block:: python

    @enginefacade.writer
    def myfunc(context, foo):
        # do some database things
        bar = _private_func(context, foo)
        return bar

    def _private_func(context, foo):
        with enginefacade.writer.using(context) as session:
            # do some other database things
            session.add(SomeObject())
        return bar
There are some things which it is best to avoid:

* Don't keep a transaction open any longer than necessary.

  This means that your ``@writer`` block should be as short as possible,
  while still containing all the related calls for that transaction.

* Avoid ``with_lockmode('UPDATE')`` when possible.

  FOR UPDATE is not compatible with MySQL/Galera.  Instead, an
  "opportunistic" approach should be used, such that if an UPDATE fails,
  the entire transaction should be retried.  The @wrap_db_retry decorator
  is one such system that can be used to achieve this.
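
  A sketch of hypothetical usage, assuming the ``wrap_db_retry`` decorator
  from ``oslo_db.api`` with these options:

  .. code-block:: python

    from oslo_db import api as oslo_db_api

    @oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
    @enginefacade.writer
    def update_bar_opportunistically(context, foo_id, newbar):
        (model_query(models.Bar, context.session).
         filter_by(id=foo_id).
         update({'bar': newbar}))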
Enabling soft deletes:

* To use/enable soft-deletes, `SoftDeleteMixin` may be used. For example:
.. code-block:: python
@@ -209,698 +151,46 @@ Enabling soft deletes:
Efficient use of soft deletes:

* While there is a ``model.soft_delete()`` method, prefer
  ``query.soft_delete()``.  Some examples:
.. code-block:: python

    @enginefacade.writer
    def soft_delete_bar(context):
        # synchronize_session=False will prevent the ORM from attempting
        # to search the Session for instances matching the DELETE;
        # this is typically not necessary for small operations.
        count = model_query(BarModel, context.session).\
            find(some_condition).soft_delete(synchronize_session=False)
        if count == 0:
            raise Exception("0 entries were soft deleted")

    @enginefacade.writer
    def complex_soft_delete_with_synchronization_bar(context):
        # use synchronize_session='evaluate' when you'd like to attempt
        # to update the state of the Session to match that of the DELETE.
        # This is potentially helpful if the operation is complex and
        # continues to work with instances that were loaded, though
        # not usually needed.
        count = (model_query(BarModel, context.session).
                 find(some_condition).
                 soft_delete(synchronize_session='evaluate'))
        if count == 0:
            raise Exception("0 entries were soft deleted")
* There is only one situation where `model.soft_delete()` is appropriate:
  when you fetch a single record, work with it, and mark it as deleted in
  the same transaction.

  .. code-block:: python

      @enginefacade.writer
      def soft_delete_bar_model(context):
          session = context.session
          bar_ref = model_query(BarModel, session).find(some_condition).first()
          # Work with bar_ref
          bar_ref.soft_delete(session=session)
  However, if you need to work with all entries that correspond to a query
  and then soft delete them, you should use the `query.soft_delete()`
  method:

  .. code-block:: python

      @enginefacade.writer
      def soft_delete_multi_models(context):
          query = (model_query(BarModel, context.session).
                   find(some_condition))
          model_refs = query.all()
          # Work with model_refs
          query.soft_delete(synchronize_session=False)
          # synchronize_session=False should be set if these entries are
          # not used again within the same transaction after this point.
  When working with many rows, it is very important to use
  ``query.soft_delete()``, which issues a single query. Using
  ``model.soft_delete()``, as in the following example, is very
  inefficient:

  .. code-block:: python

      for bar_ref in bar_refs:
          bar_ref.soft_delete(session=session)
      # This will produce count(bar_refs) db requests.
"""
import itertools
import logging
import os
import re
import time
from oslo_db.sqlalchemy import enginefacade
from oslo_db.sqlalchemy import engines
from oslo_db.sqlalchemy import orm
from oslo_utils import timeutils
import six
from sqlalchemy import event
from sqlalchemy import exc
import sqlalchemy.orm
from sqlalchemy import pool
from sqlalchemy.sql.expression import literal_column
from sqlalchemy.sql.expression import select
EngineFacade = enginefacade.LegacyEngineFacade
create_engine = engines.create_engine
get_maker = orm.get_maker
Query = orm.Query
Session = orm.Session
from oslo_db._i18n import _LW
from oslo_db import exception
from oslo_db import options
from oslo_db.sqlalchemy import exc_filters
from oslo_db.sqlalchemy import update_match
from oslo_db.sqlalchemy import utils
LOG = logging.getLogger(__name__)
def _thread_yield(dbapi_con, con_record):
"""Ensure other greenthreads get a chance to be executed.
If we use eventlet.monkey_patch(), eventlet.greenthread.sleep(0) will
execute instead of time.sleep(0).
Force a context switch. With common database backends (eg MySQLdb and
sqlite), there is no implicit yield caused by network I/O since they are
implemented by C libraries that eventlet cannot monkey patch.
"""
time.sleep(0)
def _connect_ping_listener(connection, branch):
"""Ping the server at connection startup.
Ping the server at transaction begin and transparently reconnect
if a disconnect exception occurs.
"""
if branch:
return
# turn off "close with result". This can also be accomplished
# by branching the connection, however just setting the flag is
# more performant and also doesn't get involved with some
# connection-invalidation awkwardness that occurs (see
# https://bitbucket.org/zzzeek/sqlalchemy/issue/3215/)
save_should_close_with_result = connection.should_close_with_result
connection.should_close_with_result = False
try:
# run a SELECT 1. use a core select() so that
# any details like that needed by Oracle, DB2 etc. are handled.
connection.scalar(select([1]))
except exception.DBConnectionError:
# catch DBConnectionError, which is raised by the filter
# system.
# disconnect detected. The connection is now
# "invalid", but the pool should be ready to return
# new connections assuming they are good now.
# run the select again to re-validate the Connection.
connection.scalar(select([1]))
finally:
connection.should_close_with_result = save_should_close_with_result
def _setup_logging(connection_debug=0):
"""Map the given SQL debug level to a Python logging level.
connection_debug is the verbosity of SQL debugging information:
-1=Off, 0=None (default; warnings only),
50=messages at INFO level and higher,
100=everything (DEBUG).
"""
if connection_debug >= 0:
logger = logging.getLogger('sqlalchemy.engine')
if connection_debug >= 100:
logger.setLevel(logging.DEBUG)
elif connection_debug >= 50:
logger.setLevel(logging.INFO)
else:
logger.setLevel(logging.WARNING)
def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None,
idle_timeout=3600,
connection_debug=0, max_pool_size=None, max_overflow=None,
pool_timeout=None, sqlite_synchronous=True,
connection_trace=False, max_retries=10, retry_interval=10,
thread_checkin=True, logging_name=None):
"""Return a new SQLAlchemy engine."""
url = sqlalchemy.engine.url.make_url(sql_connection)
engine_args = {
"pool_recycle": idle_timeout,
'convert_unicode': True,
'connect_args': {},
'logging_name': logging_name
}
_setup_logging(connection_debug)
_init_connection_args(
url, engine_args,
sqlite_fk=sqlite_fk,
max_pool_size=max_pool_size,
max_overflow=max_overflow,
pool_timeout=pool_timeout
)
engine = sqlalchemy.create_engine(url, **engine_args)
_init_events(
engine,
mysql_sql_mode=mysql_sql_mode,
sqlite_synchronous=sqlite_synchronous,
sqlite_fk=sqlite_fk,
thread_checkin=thread_checkin,
connection_trace=connection_trace
)
# register alternate exception handler
exc_filters.register_engine(engine)
# register engine connect handler
event.listen(engine, "engine_connect", _connect_ping_listener)
# initial connect + test
# NOTE(viktors): the current implementation of _test_connection()
# does nothing, if max_retries == 0, so we can skip it
if max_retries:
test_conn = _test_connection(engine, max_retries, retry_interval)
test_conn.close()
return engine
@utils.dispatch_for_dialect('*', multiple=True)
def _init_connection_args(
url, engine_args,
max_pool_size=None, max_overflow=None, pool_timeout=None, **kw):
pool_class = url.get_dialect().get_pool_class(url)
if issubclass(pool_class, pool.QueuePool):
if max_pool_size is not None:
engine_args['pool_size'] = max_pool_size
if max_overflow is not None:
engine_args['max_overflow'] = max_overflow
if pool_timeout is not None:
engine_args['pool_timeout'] = pool_timeout
@_init_connection_args.dispatch_for("sqlite")
def _init_connection_args(url, engine_args, **kw):
pool_class = url.get_dialect().get_pool_class(url)
# singletonthreadpool is used for :memory: connections;
# replace it with StaticPool.
if issubclass(pool_class, pool.SingletonThreadPool):
engine_args["poolclass"] = pool.StaticPool
engine_args['connect_args']['check_same_thread'] = False
@_init_connection_args.dispatch_for("postgresql")
def _init_connection_args(url, engine_args, **kw):
if 'client_encoding' not in url.query:
# Set encoding using engine_args instead of connect_args since
# it's supported for PostgreSQL 8.*. More details at:
# http://docs.sqlalchemy.org/en/rel_0_9/dialects/postgresql.html
engine_args['client_encoding'] = 'utf8'
@_init_connection_args.dispatch_for("mysql")
def _init_connection_args(url, engine_args, **kw):
if 'charset' not in url.query:
engine_args['connect_args']['charset'] = 'utf8'
@_init_connection_args.dispatch_for("mysql+mysqlconnector")
def _init_connection_args(url, engine_args, **kw):
# mysqlconnector engine (<1.0) incorrectly defaults to
# raise_on_warnings=True
# https://bitbucket.org/zzzeek/sqlalchemy/issue/2515
if 'raise_on_warnings' not in url.query:
engine_args['connect_args']['raise_on_warnings'] = False
@_init_connection_args.dispatch_for("mysql+mysqldb")
@_init_connection_args.dispatch_for("mysql+oursql")
def _init_connection_args(url, engine_args, **kw):
# Those drivers require use_unicode=0 to avoid performance drop due
# to internal usage of Python unicode objects in the driver
# http://docs.sqlalchemy.org/en/rel_0_9/dialects/mysql.html
if 'use_unicode' not in url.query:
engine_args['connect_args']['use_unicode'] = 0
@utils.dispatch_for_dialect('*', multiple=True)
def _init_events(engine, thread_checkin=True, connection_trace=False, **kw):
"""Set up event listeners for all database backends."""
_add_process_guards(engine)
if connection_trace:
_add_trace_comments(engine)
if thread_checkin:
sqlalchemy.event.listen(engine, 'checkin', _thread_yield)
@_init_events.dispatch_for("mysql")
def _init_events(engine, mysql_sql_mode=None, **kw):
"""Set up event listeners for MySQL."""
if mysql_sql_mode is not None:
@sqlalchemy.event.listens_for(engine, "connect")
def _set_session_sql_mode(dbapi_con, connection_rec):
cursor = dbapi_con.cursor()
cursor.execute("SET SESSION sql_mode = %s", [mysql_sql_mode])
@sqlalchemy.event.listens_for(engine, "first_connect")
def _check_effective_sql_mode(dbapi_con, connection_rec):
if mysql_sql_mode is not None:
_set_session_sql_mode(dbapi_con, connection_rec)
cursor = dbapi_con.cursor()
cursor.execute("SHOW VARIABLES LIKE 'sql_mode'")
realmode = cursor.fetchone()
if realmode is None:
LOG.warning(_LW('Unable to detect effective SQL mode'))
else:
realmode = realmode[1]
LOG.debug('MySQL server mode set to %s', realmode)
if 'TRADITIONAL' not in realmode.upper() and \
'STRICT_ALL_TABLES' not in realmode.upper():
LOG.warning(
_LW(
"MySQL SQL mode is '%s', "
"consider enabling TRADITIONAL or STRICT_ALL_TABLES"),
realmode)
@_init_events.dispatch_for("sqlite")
def _init_events(engine, sqlite_synchronous=True, sqlite_fk=False, **kw):
"""Set up event listeners for SQLite.
This includes several settings made on connections as they are
created, as well as transactional control extensions.
"""
def regexp(expr, item):
reg = re.compile(expr)
return reg.search(six.text_type(item)) is not None
@sqlalchemy.event.listens_for(engine, "connect")
def _sqlite_connect_events(dbapi_con, con_record):
# Add REGEXP functionality on SQLite connections
dbapi_con.create_function('regexp', 2, regexp)
if not sqlite_synchronous:
# Switch sqlite connections to non-synchronous mode
dbapi_con.execute("PRAGMA synchronous = OFF")
# Disable pysqlite's emitting of the BEGIN statement entirely.
# Also stops it from emitting COMMIT before any DDL.
# below, we emit BEGIN ourselves.
# see http://docs.sqlalchemy.org/en/rel_0_9/dialects/\
# sqlite.html#serializable-isolation-savepoints-transactional-ddl
dbapi_con.isolation_level = None
if sqlite_fk:
# Ensures that the foreign key constraints are enforced in SQLite.
dbapi_con.execute('pragma foreign_keys=ON')
@sqlalchemy.event.listens_for(engine, "begin")
def _sqlite_emit_begin(conn):
# emit our own BEGIN, checking for existing
# transactional state
if 'in_transaction' not in conn.info:
conn.execute("BEGIN")
conn.info['in_transaction'] = True
@sqlalchemy.event.listens_for(engine, "rollback")
@sqlalchemy.event.listens_for(engine, "commit")
def _sqlite_end_transaction(conn):
# remove transactional marker
conn.info.pop('in_transaction', None)
def _test_connection(engine, max_retries, retry_interval):
if max_retries == -1:
attempts = itertools.count()
else:
attempts = six.moves.range(max_retries)
# See: http://legacy.python.org/dev/peps/pep-3110/#semantic-changes for
# why we are not using 'de' directly (it can be removed from the local
# scope).
de_ref = None
for attempt in attempts:
try:
return engine.connect()
except exception.DBConnectionError as de:
msg = _LW('SQL connection failed. %s attempts left.')
LOG.warning(msg, max_retries - attempt)
time.sleep(retry_interval)
de_ref = de
else:
if de_ref is not None:
six.reraise(type(de_ref), de_ref)
class Query(sqlalchemy.orm.query.Query):
"""Subclass of sqlalchemy.query with soft_delete() method."""
def soft_delete(self, synchronize_session='evaluate'):
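# following oslo.db's soft-delete convention, 'deleted' is set to the
# row's own id rather than a boolean flag, so that UNIQUE constraints
# that include the 'deleted' column keep working for rows that remain
# live.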
return self.update({'deleted': literal_column('id'),
'updated_at': literal_column('updated_at'),
'deleted_at': timeutils.utcnow()},
synchronize_session=synchronize_session)
def update_returning_pk(self, values, surrogate_key):
"""Perform an UPDATE, returning the primary key of the matched row.
This is a method-version of
oslo_db.sqlalchemy.update_match.update_returning_pk(); see that
function for usage details.
"""
return update_match.update_returning_pk(self, values, surrogate_key)
def update_on_match(self, specimen, surrogate_key, values, **kw):
"""Emit an UPDATE statement matching the given specimen.
This is a method-version of
oslo_db.sqlalchemy.update_match.update_on_match(); see that function
for usage details.
"""
return update_match.update_on_match(
self, specimen, surrogate_key, values, **kw)
class Session(sqlalchemy.orm.session.Session):
"""Custom Session class to avoid SqlAlchemy Session monkey patching."""
def get_maker(engine, autocommit=True, expire_on_commit=False):
"""Return a SQLAlchemy sessionmaker using the given engine."""
return sqlalchemy.orm.sessionmaker(bind=engine,
class_=Session,
autocommit=autocommit,
expire_on_commit=expire_on_commit,
query_cls=Query)
def _add_process_guards(engine):
"""Add multiprocessing guards.
Forces a connection to be reconnected if it is detected
as having been shared to a sub-process.
"""
@sqlalchemy.event.listens_for(engine, "connect")
def connect(dbapi_connection, connection_record):
connection_record.info['pid'] = os.getpid()
@sqlalchemy.event.listens_for(engine, "checkout")
def checkout(dbapi_connection, connection_record, connection_proxy):
pid = os.getpid()
if connection_record.info['pid'] != pid:
LOG.debug(_LW(
"Parent process %(orig)s forked (%(newproc)s) with an open "
"database connection, "
"which is being discarded and recreated."),
{"newproc": pid, "orig": connection_record.info['pid']})
connection_record.connection = connection_proxy.connection = None
raise exc.DisconnectionError(
"Connection record belongs to pid %s, "
"attempting to check out in pid %s" %
(connection_record.info['pid'], pid)
)
def _add_trace_comments(engine):
"""Add trace comments.
Augment statements with a trace of the immediate calling code
for a given statement.
"""
import os
import sys
import traceback
target_paths = set([
os.path.dirname(sys.modules['oslo_db'].__file__),
os.path.dirname(sys.modules['sqlalchemy'].__file__)
])
@sqlalchemy.event.listens_for(engine, "before_cursor_execute", retval=True)
def before_cursor_execute(conn, cursor, statement, parameters, context,
executemany):
# NOTE(zzzeek) - if different steps per DB dialect are desirable
# here, switch out on engine.name for now.
stack = traceback.extract_stack()
our_line = None
for idx, (filename, line, method, function) in enumerate(stack):
for tgt in target_paths:
if filename.startswith(tgt):
our_line = idx
break
if our_line:
break
if our_line:
trace = "; ".join(
"File: %s (%s) %s" % (
line[0], line[1], line[2]
)
# include three lines of context.
for line in stack[our_line - 3:our_line]
)
statement = "%s -- %s" % (statement, trace)
return statement, parameters
class EngineFacade(object):
"""A helper class for removing of global engine instances from oslo.db.
As a library, oslo.db can't decide where to store/when to create engine
and sessionmaker instances, so this must be left for a target application.
On the other hand, in order to simplify the adoption of oslo.db changes,
we'll provide a helper class, which creates engine and sessionmaker
on its instantiation and provides get_engine()/get_session() methods
that are compatible with corresponding utility functions that currently
exist in target projects, e.g. in Nova.
engine/sessionmaker instances will still be global (and they are meant to
be global), but they will be stored in the app context, rather than in the
oslo.db context.
Note: use of this helper is completely optional and you are encouraged to
integrate engine/sessionmaker instances into your apps any way you like
(e.g. one might want to bind a session to a request context). Two important
things to remember:
1. An Engine instance is effectively a pool of DB connections, so it's
meant to be shared (and it's thread-safe).
2. A Session instance is not meant to be shared and represents a DB
transactional context (i.e. it's not thread-safe). sessionmaker is
a factory of sessions.
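A brief usage sketch, assuming ``conf`` is an oslo.config ``ConfigOpts``
instance with the database connection options set::

    facade = EngineFacade.from_config(conf)
    engine = facade.get_engine()
    session = facade.get_session()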
"""
def __init__(self, sql_connection, slave_connection=None,
sqlite_fk=False, autocommit=True,
expire_on_commit=False, **kwargs):
"""Initialize engine and sessionmaker instances.
:param sql_connection: the connection string for the database to use
:type sql_connection: string
:param slave_connection: the connection string for the 'slave' database
to use. If not provided, the master database
will be used for all operations. Note: this
is meant to be used for offloading of read
operations to asynchronously replicated slaves
to reduce the load on the master database.
:type slave_connection: string
:param sqlite_fk: enable foreign keys in SQLite
:type sqlite_fk: bool
:param autocommit: use autocommit mode for created Session instances
:type autocommit: bool
:param expire_on_commit: expire session objects on commit
:type expire_on_commit: bool
Keyword arguments:
:keyword mysql_sql_mode: the SQL mode to be used for MySQL sessions.
(defaults to TRADITIONAL)
:keyword idle_timeout: timeout before idle sql connections are reaped
(defaults to 3600)
:keyword connection_debug: verbosity of SQL debugging information.
-1=Off, 0=None, 100=Everything (defaults
to 0)
:keyword max_pool_size: maximum number of SQL connections to keep open
in a pool (defaults to SQLAlchemy settings)
:keyword max_overflow: if set, use this value for max_overflow with
sqlalchemy (defaults to SQLAlchemy settings)
:keyword pool_timeout: if set, use this value for pool_timeout with
sqlalchemy (defaults to SQLAlchemy settings)
:keyword sqlite_synchronous: if True, SQLite uses synchronous mode
(defaults to True)
:keyword connection_trace: add python stack traces to SQL as comment
strings (defaults to False)
:keyword max_retries: maximum db connection retries during startup.
(setting -1 implies an infinite retry count)
(defaults to 10)
:keyword retry_interval: interval between retries of opening a sql
connection (defaults to 10)
:keyword thread_checkin: boolean that indicates that between each
engine checkin event a sleep(0) will occur to
allow other greenthreads to run (defaults to
True)
"""
super(EngineFacade, self).__init__()
engine_kwargs = {
'sqlite_fk': sqlite_fk,
'mysql_sql_mode': kwargs.get('mysql_sql_mode', 'TRADITIONAL'),
'idle_timeout': kwargs.get('idle_timeout', 3600),
'connection_debug': kwargs.get('connection_debug', 0),
'max_pool_size': kwargs.get('max_pool_size'),
'max_overflow': kwargs.get('max_overflow'),
'pool_timeout': kwargs.get('pool_timeout'),
'sqlite_synchronous': kwargs.get('sqlite_synchronous', True),
'connection_trace': kwargs.get('connection_trace', False),
'max_retries': kwargs.get('max_retries', 10),
'retry_interval': kwargs.get('retry_interval', 10),
'thread_checkin': kwargs.get('thread_checkin', True)
}
maker_kwargs = {
'autocommit': autocommit,
'expire_on_commit': expire_on_commit
}
self._engine = create_engine(sql_connection=sql_connection,
**engine_kwargs)
self._session_maker = get_maker(engine=self._engine,
**maker_kwargs)
if slave_connection:
self._slave_engine = create_engine(sql_connection=slave_connection,
**engine_kwargs)
self._slave_session_maker = get_maker(engine=self._slave_engine,
**maker_kwargs)
else:
self._slave_engine = None
self._slave_session_maker = None
def get_engine(self, use_slave=False):
"""Get the engine instance (note, that it's shared).
:param use_slave: if possible, use 'slave' database for this engine.
If the connection string for the slave database
wasn't provided, 'master' engine will be returned.
(defaults to False)
:type use_slave: bool
"""
if use_slave and self._slave_engine:
return self._slave_engine
return self._engine
def get_session(self, use_slave=False, **kwargs):
"""Get a Session instance.
:param use_slave: if possible, use 'slave' database connection for
this session. If the connection string for the
slave database wasn't provided, a session bound
to the 'master' engine will be returned.
(defaults to False)
:type use_slave: bool
Keyword arguments will be passed to a sessionmaker instance as is (if
passed, they will override the ones used when the sessionmaker instance
was created). See SQLAlchemy Session docs for details.
"""
if use_slave and self._slave_session_maker:
return self._slave_session_maker(**kwargs)
return self._session_maker(**kwargs)
@classmethod
def from_config(cls, conf,
sqlite_fk=False, autocommit=True, expire_on_commit=False):
"""Initialize EngineFacade using oslo.config config instance options.
:param conf: oslo.config config instance
:type conf: oslo.config.cfg.ConfigOpts
:param sqlite_fk: enable foreign keys in SQLite
:type sqlite_fk: bool
:param autocommit: use autocommit mode for created Session instances
:type autocommit: bool
:param expire_on_commit: expire session objects on commit
:type expire_on_commit: bool
"""
conf.register_opts(options.database_opts, 'database')
return cls(sql_connection=conf.database.connection,
slave_connection=conf.database.slave_connection,
sqlite_fk=sqlite_fk,
autocommit=autocommit,
expire_on_commit=expire_on_commit,
mysql_sql_mode=conf.database.mysql_sql_mode,
idle_timeout=conf.database.idle_timeout,
connection_debug=conf.database.connection_debug,
max_pool_size=conf.database.max_pool_size,
max_overflow=conf.database.max_overflow,
pool_timeout=conf.database.pool_timeout,
sqlite_synchronous=conf.database.sqlite_synchronous,
connection_trace=conf.database.connection_trace,
max_retries=conf.database.max_retries,
retry_interval=conf.database.retry_interval)
__all__ = ["EngineFacade", "create_engine", "get_maker", "Query", "Session"]

@@ -28,6 +28,7 @@ import os
import six
from oslo_db import exception
from oslo_db.sqlalchemy import enginefacade
from oslo_db.sqlalchemy import provision
from oslo_db.sqlalchemy import session
from oslo_db.sqlalchemy import utils
@@ -72,9 +73,15 @@ class DbFixture(fixtures.Fixture):
else:
self.test.engine = self.test.db.engine
self.test.sessionmaker = session.get_maker(self.test.engine)
self.addCleanup(setattr, self.test, 'sessionmaker', None)
self.addCleanup(setattr, self.test, 'engine', None)
self.test.enginefacade = enginefacade._TestTransactionFactory(
self.test.engine, self.test.sessionmaker, apply_global=True,
synchronous_reader=True)
self.addCleanup(self.test.enginefacade.dispose_global)
class DbTestCase(test_base.BaseTestCase):
"""Base class for testing of DB code.

@ -25,8 +25,8 @@ from sqlalchemy.orm import mapper
from oslo.db import exception
from oslo.db.sqlalchemy import exc_filters
from oslo.db.sqlalchemy import test_base
from oslo_db.sqlalchemy import session as private_session
from oslo_db.tests.old_import_api import utils as test_utils
from oslo_db.sqlalchemy import engines
from oslo_db.tests import utils as test_utils
_TABLE_NAME = '__tmp__test__tmp__'
@@ -720,7 +720,7 @@ class TestDBDisconnected(TestsExceptionFilter):
engine = self.engine
event.listen(
engine, "engine_connect", private_session._connect_ping_listener)
engine, "engine_connect", engines._connect_ping_listener)
real_do_execute = engine.dialect.do_execute
counter = itertools.count(1)
@@ -816,7 +816,7 @@ class TestDBConnectRetry(TestsExceptionFilter):
with self._dbapi_fixture(dialect_name):
with mock.patch.object(engine.dialect, "connect", cant_connect):
return private_session._test_connection(engine, retries, .01)
return engines._test_connection(engine, retries, .01)
def test_connect_no_retries(self):
conn = self._run_test(

@@ -29,11 +29,11 @@ from sqlalchemy import Integer, String
from sqlalchemy.ext.declarative import declarative_base
from oslo.db import exception
from oslo.db import options as db_options
from oslo.db.sqlalchemy import models
from oslo.db.sqlalchemy import session
from oslo.db.sqlalchemy import test_base
from oslo_db import options as db_options
from oslo_db.sqlalchemy import session as private_session
from oslo_db.sqlalchemy import engines
BASE = declarative_base()
@@ -300,8 +300,8 @@ class EngineFacadeTestCase(oslo_test.BaseTestCase):
self.assertFalse(ses.autocommit)
self.assertTrue(ses.expire_on_commit)
@mock.patch('oslo_db.sqlalchemy.session.get_maker')
@mock.patch('oslo_db.sqlalchemy.session.create_engine')
@mock.patch('oslo_db.sqlalchemy.orm.get_maker')
@mock.patch('oslo_db.sqlalchemy.engines.create_engine')
def test_creation_from_config(self, create_engine, get_maker):
conf = cfg.ConfigOpts()
conf.register_opts(db_options.database_opts, group='database')
@@ -339,6 +339,42 @@ class EngineFacadeTestCase(oslo_test.BaseTestCase):
autocommit=False,
expire_on_commit=True)
@mock.patch('oslo_db.sqlalchemy.orm.get_maker')
@mock.patch('oslo_db.sqlalchemy.engines.create_engine')
def test_passed_in_url_overrides_conf(self, create_engine, get_maker):
conf = cfg.ConfigOpts()
conf.register_opts(db_options.database_opts, group='database')
overrides = {
'connection': 'sqlite:///conf_db_setting',
'connection_debug': 100,
'max_pool_size': 10,
'mysql_sql_mode': 'TRADITIONAL',
}
for optname, optvalue in overrides.items():
conf.set_override(optname, optvalue, group='database')
session.EngineFacade(
"sqlite:///override_sql",
**dict(conf.database.items())
)
create_engine.assert_called_once_with(
sql_connection='sqlite:///override_sql',
connection_debug=100,
max_pool_size=10,
mysql_sql_mode='TRADITIONAL',
sqlite_fk=False,
idle_timeout=mock.ANY,
retry_interval=mock.ANY,
max_retries=mock.ANY,
max_overflow=mock.ANY,
connection_trace=mock.ANY,
sqlite_synchronous=mock.ANY,
pool_timeout=mock.ANY,
thread_checkin=mock.ANY,
)
def test_slave_connection(self):
paths = self.create_tempfiles([('db.master', ''), ('db.slave', '')],
ext='')
@@ -483,9 +519,7 @@ class MysqlConnectTest(test_base.MySQLOpportunisticTestCase):
)
)
):
private_session._init_events.dispatch_on_drivername("mysql")(
test_engine
)
engines._init_events.dispatch_on_drivername("mysql")(test_engine)
test_engine.raw_connection()
self.assertIn('Unable to detect effective SQL mode',
@@ -553,7 +587,7 @@ class PatchStacktraceTest(test_base.DbTestCase):
with mock.patch("traceback.extract_stack", side_effect=extract_stack):
private_session._add_trace_comments(engine)
engines._add_trace_comments(engine)
conn = engine.connect()
with mock.patch.object(engine.dialect, "do_execute") as mock_exec:

File diff suppressed because it is too large
@@ -23,8 +23,8 @@ from sqlalchemy import event
from sqlalchemy.orm import mapper
from oslo_db import exception
from oslo_db.sqlalchemy import engines
from oslo_db.sqlalchemy import exc_filters
from oslo_db.sqlalchemy import session
from oslo_db.sqlalchemy import test_base
from oslo_db.tests import utils as test_utils
@@ -784,7 +784,7 @@ class TestDBDisconnected(TestsExceptionFilter):
dialect_name, exception, num_disconnects, is_disconnect=True):
engine = self.engine
event.listen(engine, "engine_connect", session._connect_ping_listener)
event.listen(engine, "engine_connect", engines._connect_ping_listener)
real_do_execute = engine.dialect.do_execute
counter = itertools.count(1)
@@ -895,7 +895,7 @@ class TestDBConnectRetry(TestsExceptionFilter):
with self._dbapi_fixture(dialect_name):
with mock.patch.object(engine.dialect, "connect", cant_connect):
return session._test_connection(engine, retries, .01)
return engines._test_connection(engine, retries, .01)
def test_connect_no_retries(self):
conn = self._run_test(
@@ -967,7 +967,7 @@ class TestDBConnectPingWrapping(TestsExceptionFilter):
def setUp(self):
super(TestDBConnectPingWrapping, self).setUp()
event.listen(
self.engine, "engine_connect", session._connect_ping_listener)
self.engine, "engine_connect", engines._connect_ping_listener)
@contextlib.contextmanager
def _fixture(

@@ -31,6 +31,7 @@ from sqlalchemy.ext.declarative import declarative_base
from oslo_db import exception
from oslo_db import options as db_options
from oslo_db.sqlalchemy import engines
from oslo_db.sqlalchemy import models
from oslo_db.sqlalchemy import session
from oslo_db.sqlalchemy import test_base
@@ -312,8 +313,8 @@ class EngineFacadeTestCase(oslo_test.BaseTestCase):
self.assertFalse(ses.autocommit)
self.assertTrue(ses.expire_on_commit)
@mock.patch('oslo_db.sqlalchemy.session.get_maker')
@mock.patch('oslo_db.sqlalchemy.session.create_engine')
@mock.patch('oslo_db.sqlalchemy.orm.get_maker')
@mock.patch('oslo_db.sqlalchemy.engines.create_engine')
def test_creation_from_config(self, create_engine, get_maker):
conf = cfg.ConfigOpts()
conf.register_opts(db_options.database_opts, group='database')
@@ -495,7 +496,7 @@ class MysqlConnectTest(test_base.MySQLOpportunisticTestCase):
)
)
):
session._init_events.dispatch_on_drivername("mysql")(test_engine)
engines._init_events.dispatch_on_drivername("mysql")(test_engine)
test_engine.raw_connection()
self.assertIn('Unable to detect effective SQL mode',
@@ -556,7 +557,7 @@ class CreateEngineTest(oslo_test.BaseTestCase):
self.args = {'connect_args': {}}
def test_queuepool_args(self):
session._init_connection_args(
engines._init_connection_args(
url.make_url("mysql://u:p@host/test"), self.args,
max_pool_size=10, max_overflow=10)
self.assertEqual(self.args['pool_size'], 10)
@@ -564,7 +565,7 @@ class CreateEngineTest(oslo_test.BaseTestCase):
def test_sqlite_memory_pool_args(self):
for _url in ("sqlite://", "sqlite:///:memory:"):
session._init_connection_args(
engines._init_connection_args(
url.make_url(_url), self.args,
max_pool_size=10, max_overflow=10)
@@ -581,7 +582,7 @@ class CreateEngineTest(oslo_test.BaseTestCase):
self.assertTrue('poolclass' in self.args)
def test_sqlite_file_pool_args(self):
session._init_connection_args(
engines._init_connection_args(
url.make_url("sqlite:///somefile.db"), self.args,
max_pool_size=10, max_overflow=10)
@@ -597,37 +598,37 @@ class CreateEngineTest(oslo_test.BaseTestCase):
self.assertTrue('poolclass' not in self.args)
def test_mysql_connect_args_default(self):
session._init_connection_args(
engines._init_connection_args(
url.make_url("mysql://u:p@host/test"), self.args)
self.assertEqual(self.args['connect_args'],
{'charset': 'utf8', 'use_unicode': 0})
def test_mysql_oursql_connect_args_default(self):
session._init_connection_args(
engines._init_connection_args(
url.make_url("mysql+oursql://u:p@host/test"), self.args)
self.assertEqual(self.args['connect_args'],
{'charset': 'utf8', 'use_unicode': 0})
def test_mysql_mysqldb_connect_args_default(self):
session._init_connection_args(
engines._init_connection_args(
url.make_url("mysql+mysqldb://u:p@host/test"), self.args)
self.assertEqual(self.args['connect_args'],
{'charset': 'utf8', 'use_unicode': 0})
def test_postgresql_connect_args_default(self):
session._init_connection_args(
engines._init_connection_args(
url.make_url("postgresql://u:p@host/test"), self.args)
self.assertEqual(self.args['client_encoding'], 'utf8')
self.assertFalse(self.args['connect_args'])
def test_mysqlconnector_raise_on_warnings_default(self):
session._init_connection_args(
engines._init_connection_args(
url.make_url("mysql+mysqlconnector://u:p@host/test"),
self.args)
self.assertEqual(self.args['connect_args']['raise_on_warnings'], False)
def test_mysqlconnector_raise_on_warnings_override(self):
session._init_connection_args(
engines._init_connection_args(
url.make_url(
"mysql+mysqlconnector://u:p@host/test"
"?raise_on_warnings=true"),
@@ -639,12 +640,12 @@ class CreateEngineTest(oslo_test.BaseTestCase):
def test_thread_checkin(self):
with mock.patch("sqlalchemy.event.listens_for"):
with mock.patch("sqlalchemy.event.listen") as listen_evt:
session._init_events.dispatch_on_drivername(
engines._init_events.dispatch_on_drivername(
"sqlite")(mock.Mock())
self.assertEqual(
listen_evt.mock_calls[0][1][-1],
session._thread_yield
engines._thread_yield
)
@@ -693,7 +694,7 @@ class PatchStacktraceTest(test_base.DbTestCase):
with mock.patch("traceback.extract_stack", side_effect=extract_stack):
session._add_trace_comments(engine)
engines._add_trace_comments(engine)
conn = engine.connect()
orig_do_exec = engine.dialect.do_execute
with mock.patch.object(engine.dialect, "do_execute") as mock_exec: