db: use new EngineFacade feature of oslo.db
The new EngineFacade system is the recommended way of working with engine and sessions in oslo.db since 1.12.0. Most notable features are: - transparent thread-safe initialization of EngineFacade instance - transaction scope defined by developer-specified context (no need to pass Session instances to `private` DB API methods as long as some kind of context is established: e.g. a Web-request context or a module thread-local variable) - declarative reader/writer transactions separation (e.g. allows to perform read-only transactions on asynchronous DB replicas) More details on EngineFacade: http://specs.openstack.org/openstack/oslo-specs/specs/kilo/make-enginefacade-a-facade.html Co-Authored-By: Victor Sergeyev <vsergeyev@mirantis.com> Change-Id: I288e1e8e8c242520f12862f8ed6f3579687c5120
This commit is contained in:
parent
9106a1694e
commit
a3082b07a0
|
@ -13,6 +13,7 @@
|
|||
from logging import config as log_config
|
||||
|
||||
from alembic import context
|
||||
from oslo_db.sqlalchemy import enginefacade
|
||||
|
||||
try:
|
||||
# NOTE(whaom): This is to register the DB2 alembic code which
|
||||
|
@ -21,7 +22,6 @@ try:
|
|||
except ImportError:
|
||||
pass
|
||||
|
||||
from ironic.db.sqlalchemy import api as sqla_api
|
||||
from ironic.db.sqlalchemy import models
|
||||
|
||||
# this is the Alembic Config object, which provides
|
||||
|
@ -50,7 +50,7 @@ def run_migrations_online():
|
|||
and associate a connection with the context.
|
||||
|
||||
"""
|
||||
engine = sqla_api.get_engine()
|
||||
engine = enginefacade.get_legacy_facade().get_engine()
|
||||
with engine.connect() as connection:
|
||||
context.configure(connection=connection,
|
||||
target_metadata=target_metadata)
|
||||
|
|
|
@ -18,10 +18,11 @@
|
|||
|
||||
import collections
|
||||
import datetime
|
||||
import threading
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_db import exception as db_exc
|
||||
from oslo_db.sqlalchemy import session as db_session
|
||||
from oslo_db.sqlalchemy import enginefacade
|
||||
from oslo_db.sqlalchemy import utils as db_utils
|
||||
from oslo_log import log
|
||||
from oslo_utils import strutils
|
||||
|
@ -45,24 +46,7 @@ CONF.import_opt('heartbeat_timeout',
|
|||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
_FACADE = None
|
||||
|
||||
|
||||
def _create_facade_lazily():
|
||||
global _FACADE
|
||||
if _FACADE is None:
|
||||
_FACADE = db_session.EngineFacade.from_config(CONF)
|
||||
return _FACADE
|
||||
|
||||
|
||||
def get_engine():
|
||||
facade = _create_facade_lazily()
|
||||
return facade.get_engine()
|
||||
|
||||
|
||||
def get_session(**kwargs):
|
||||
facade = _create_facade_lazily()
|
||||
return facade.get_session(**kwargs)
|
||||
_CONTEXT = threading.local()
|
||||
|
||||
|
||||
def get_backend():
|
||||
|
@ -70,15 +54,23 @@ def get_backend():
|
|||
return Connection()
|
||||
|
||||
|
||||
def _session_for_read():
|
||||
return enginefacade.reader.using(_CONTEXT)
|
||||
|
||||
|
||||
def _session_for_write():
|
||||
return enginefacade.writer.using(_CONTEXT)
|
||||
|
||||
|
||||
def model_query(model, *args, **kwargs):
|
||||
"""Query helper for simpler session usage.
|
||||
|
||||
:param session: if present, the session to use
|
||||
"""
|
||||
|
||||
session = kwargs.get('session') or get_session()
|
||||
query = session.query(model, *args)
|
||||
return query
|
||||
with _session_for_read() as session:
|
||||
query = session.query(model, *args)
|
||||
return query
|
||||
|
||||
|
||||
def add_identity_filter(query, value):
|
||||
|
@ -215,9 +207,8 @@ class Connection(api.Connection):
|
|||
sort_key, sort_dir, query)
|
||||
|
||||
def reserve_node(self, tag, node_id):
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
query = model_query(models.Node, session=session)
|
||||
with _session_for_write():
|
||||
query = model_query(models.Node)
|
||||
query = add_identity_filter(query, node_id)
|
||||
# be optimistic and assume we usually create a reservation
|
||||
count = query.filter_by(reservation=None).update(
|
||||
|
@ -234,9 +225,8 @@ class Connection(api.Connection):
|
|||
raise exception.NodeNotFound(node_id)
|
||||
|
||||
def release_node(self, tag, node_id):
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
query = model_query(models.Node, session=session)
|
||||
with _session_for_write():
|
||||
query = model_query(models.Node)
|
||||
query = add_identity_filter(query, node_id)
|
||||
# be optimistic and assume we usually release a reservation
|
||||
count = query.filter_by(reservation=tag).update(
|
||||
|
@ -263,17 +253,19 @@ class Connection(api.Connection):
|
|||
|
||||
node = models.Node()
|
||||
node.update(values)
|
||||
try:
|
||||
node.save()
|
||||
except db_exc.DBDuplicateEntry as exc:
|
||||
if 'name' in exc.columns:
|
||||
raise exception.DuplicateName(name=values['name'])
|
||||
elif 'instance_uuid' in exc.columns:
|
||||
raise exception.InstanceAssociated(
|
||||
instance_uuid=values['instance_uuid'],
|
||||
node=values['uuid'])
|
||||
raise exception.NodeAlreadyExists(uuid=values['uuid'])
|
||||
return node
|
||||
with _session_for_write() as session:
|
||||
try:
|
||||
session.add(node)
|
||||
session.flush()
|
||||
except db_exc.DBDuplicateEntry as exc:
|
||||
if 'name' in exc.columns:
|
||||
raise exception.DuplicateName(name=values['name'])
|
||||
elif 'instance_uuid' in exc.columns:
|
||||
raise exception.InstanceAssociated(
|
||||
instance_uuid=values['instance_uuid'],
|
||||
node=values['uuid'])
|
||||
raise exception.NodeAlreadyExists(uuid=values['uuid'])
|
||||
return node
|
||||
|
||||
def get_node_by_id(self, node_id):
|
||||
query = model_query(models.Node).filter_by(id=node_id)
|
||||
|
@ -311,9 +303,8 @@ class Connection(api.Connection):
|
|||
return result
|
||||
|
||||
def destroy_node(self, node_id):
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
query = model_query(models.Node, session=session)
|
||||
with _session_for_write():
|
||||
query = model_query(models.Node)
|
||||
query = add_identity_filter(query, node_id)
|
||||
|
||||
try:
|
||||
|
@ -326,7 +317,7 @@ class Connection(api.Connection):
|
|||
if uuidutils.is_uuid_like(node_id):
|
||||
node_id = node_ref['id']
|
||||
|
||||
port_query = model_query(models.Port, session=session)
|
||||
port_query = model_query(models.Port)
|
||||
port_query = add_port_filter_by_node(port_query, node_id)
|
||||
port_query.delete()
|
||||
|
||||
|
@ -353,9 +344,8 @@ class Connection(api.Connection):
|
|||
raise e
|
||||
|
||||
def _do_update_node(self, node_id, values):
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
query = model_query(models.Node, session=session)
|
||||
with _session_for_write():
|
||||
query = model_query(models.Node)
|
||||
query = add_identity_filter(query, node_id)
|
||||
try:
|
||||
ref = query.with_lockmode('update').one()
|
||||
|
@ -419,15 +409,18 @@ class Connection(api.Connection):
|
|||
def create_port(self, values):
|
||||
if not values.get('uuid'):
|
||||
values['uuid'] = uuidutils.generate_uuid()
|
||||
|
||||
port = models.Port()
|
||||
port.update(values)
|
||||
try:
|
||||
port.save()
|
||||
except db_exc.DBDuplicateEntry as exc:
|
||||
if 'address' in exc.columns:
|
||||
raise exception.MACAlreadyExists(mac=values['address'])
|
||||
raise exception.PortAlreadyExists(uuid=values['uuid'])
|
||||
return port
|
||||
with _session_for_write() as session:
|
||||
try:
|
||||
session.add(port)
|
||||
session.flush()
|
||||
except db_exc.DBDuplicateEntry as exc:
|
||||
if 'address' in exc.columns:
|
||||
raise exception.MACAlreadyExists(mac=values['address'])
|
||||
raise exception.PortAlreadyExists(uuid=values['uuid'])
|
||||
return port
|
||||
|
||||
def update_port(self, port_id, values):
|
||||
# NOTE(dtantsur): this can lead to very strange errors
|
||||
|
@ -435,13 +428,13 @@ class Connection(api.Connection):
|
|||
msg = _("Cannot overwrite UUID for an existing Port.")
|
||||
raise exception.InvalidParameterValue(err=msg)
|
||||
|
||||
session = get_session()
|
||||
try:
|
||||
with session.begin():
|
||||
query = model_query(models.Port, session=session)
|
||||
with _session_for_write() as session:
|
||||
query = model_query(models.Port)
|
||||
query = add_port_filter(query, port_id)
|
||||
ref = query.one()
|
||||
ref.update(values)
|
||||
session.flush()
|
||||
except NoResultFound:
|
||||
raise exception.PortNotFound(port=port_id)
|
||||
except db_exc.DBDuplicateEntry:
|
||||
|
@ -449,9 +442,8 @@ class Connection(api.Connection):
|
|||
return ref
|
||||
|
||||
def destroy_port(self, port_id):
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
query = model_query(models.Port, session=session)
|
||||
with _session_for_write():
|
||||
query = model_query(models.Port)
|
||||
query = add_port_filter(query, port_id)
|
||||
count = query.delete()
|
||||
if count == 0:
|
||||
|
@ -479,13 +471,16 @@ class Connection(api.Connection):
|
|||
def create_chassis(self, values):
|
||||
if not values.get('uuid'):
|
||||
values['uuid'] = uuidutils.generate_uuid()
|
||||
|
||||
chassis = models.Chassis()
|
||||
chassis.update(values)
|
||||
try:
|
||||
chassis.save()
|
||||
except db_exc.DBDuplicateEntry:
|
||||
raise exception.ChassisAlreadyExists(uuid=values['uuid'])
|
||||
return chassis
|
||||
with _session_for_write() as session:
|
||||
try:
|
||||
session.add(chassis)
|
||||
session.flush()
|
||||
except db_exc.DBDuplicateEntry:
|
||||
raise exception.ChassisAlreadyExists(uuid=values['uuid'])
|
||||
return chassis
|
||||
|
||||
def update_chassis(self, chassis_id, values):
|
||||
# NOTE(dtantsur): this can lead to very strange errors
|
||||
|
@ -493,9 +488,8 @@ class Connection(api.Connection):
|
|||
msg = _("Cannot overwrite UUID for an existing Chassis.")
|
||||
raise exception.InvalidParameterValue(err=msg)
|
||||
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
query = model_query(models.Chassis, session=session)
|
||||
with _session_for_write():
|
||||
query = model_query(models.Chassis)
|
||||
query = add_identity_filter(query, chassis_id)
|
||||
|
||||
count = query.update(values)
|
||||
|
@ -505,20 +499,19 @@ class Connection(api.Connection):
|
|||
return ref
|
||||
|
||||
def destroy_chassis(self, chassis_id):
|
||||
def chassis_not_empty(session):
|
||||
def chassis_not_empty():
|
||||
"""Checks whether the chassis does not have nodes."""
|
||||
|
||||
query = model_query(models.Node, session=session)
|
||||
query = model_query(models.Node)
|
||||
query = add_node_filter_by_chassis(query, chassis_id)
|
||||
|
||||
return query.count() != 0
|
||||
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
if chassis_not_empty(session):
|
||||
with _session_for_write():
|
||||
if chassis_not_empty():
|
||||
raise exception.ChassisNotEmpty(chassis=chassis_id)
|
||||
|
||||
query = model_query(models.Chassis, session=session)
|
||||
query = model_query(models.Chassis)
|
||||
query = add_identity_filter(query, chassis_id)
|
||||
|
||||
count = query.delete()
|
||||
|
@ -526,9 +519,8 @@ class Connection(api.Connection):
|
|||
raise exception.ChassisNotFound(chassis=chassis_id)
|
||||
|
||||
def register_conductor(self, values, update_existing=False):
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
query = (model_query(models.Conductor, session=session)
|
||||
with _session_for_write() as session:
|
||||
query = (model_query(models.Conductor)
|
||||
.filter_by(hostname=values['hostname']))
|
||||
try:
|
||||
ref = query.one()
|
||||
|
@ -537,12 +529,12 @@ class Connection(api.Connection):
|
|||
conductor=values['hostname'])
|
||||
except NoResultFound:
|
||||
ref = models.Conductor()
|
||||
session.add(ref)
|
||||
ref.update(values)
|
||||
# always set online and updated_at fields when registering
|
||||
# a conductor, especially when updating an existing one
|
||||
ref.update({'updated_at': timeutils.utcnow(),
|
||||
'online': True})
|
||||
ref.save(session)
|
||||
return ref
|
||||
|
||||
def get_conductor(self, hostname):
|
||||
|
@ -554,18 +546,16 @@ class Connection(api.Connection):
|
|||
raise exception.ConductorNotFound(conductor=hostname)
|
||||
|
||||
def unregister_conductor(self, hostname):
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
query = (model_query(models.Conductor, session=session)
|
||||
with _session_for_write():
|
||||
query = (model_query(models.Conductor)
|
||||
.filter_by(hostname=hostname, online=True))
|
||||
count = query.update({'online': False})
|
||||
if count == 0:
|
||||
raise exception.ConductorNotFound(conductor=hostname)
|
||||
|
||||
def touch_conductor(self, hostname):
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
query = (model_query(models.Conductor, session=session)
|
||||
with _session_for_write():
|
||||
query = (model_query(models.Conductor)
|
||||
.filter_by(hostname=hostname))
|
||||
# since we're not changing any other field, manually set updated_at
|
||||
# and since we're heartbeating, make sure that online=True
|
||||
|
@ -575,10 +565,9 @@ class Connection(api.Connection):
|
|||
raise exception.ConductorNotFound(conductor=hostname)
|
||||
|
||||
def clear_node_reservations_for_conductor(self, hostname):
|
||||
session = get_session()
|
||||
nodes = []
|
||||
with session.begin():
|
||||
query = (model_query(models.Node, session=session)
|
||||
with _session_for_write():
|
||||
query = (model_query(models.Node)
|
||||
.filter_by(reservation=hostname))
|
||||
nodes = [node['uuid'] for node in query]
|
||||
query.update({'reservation': None})
|
||||
|
|
|
@ -20,8 +20,8 @@ import alembic
|
|||
from alembic import config as alembic_config
|
||||
import alembic.migration as alembic_migration
|
||||
from oslo_db import exception as db_exc
|
||||
from oslo_db.sqlalchemy import enginefacade
|
||||
|
||||
from ironic.db.sqlalchemy import api as sqla_api
|
||||
from ironic.db.sqlalchemy import models
|
||||
|
||||
|
||||
|
@ -38,7 +38,7 @@ def version(config=None, engine=None):
|
|||
:rtype: string
|
||||
"""
|
||||
if engine is None:
|
||||
engine = sqla_api.get_engine()
|
||||
engine = enginefacade.get_legacy_facade().get_engine()
|
||||
with engine.connect() as conn:
|
||||
context = alembic_migration.MigrationContext.configure(conn)
|
||||
return context.get_current_revision()
|
||||
|
@ -62,7 +62,7 @@ def create_schema(config=None, engine=None):
|
|||
Can be used for initial installation instead of upgrade('head').
|
||||
"""
|
||||
if engine is None:
|
||||
engine = sqla_api.get_engine()
|
||||
engine = enginefacade.get_legacy_facade().get_engine()
|
||||
|
||||
# NOTE(viktors): If we will use metadata.create_all() for non empty db
|
||||
# schema, it will only add the new tables, but leave
|
||||
|
|
|
@ -100,13 +100,6 @@ class IronicBase(models.TimestampMixin,
|
|||
d[c.name] = self[c.name]
|
||||
return d
|
||||
|
||||
def save(self, session=None):
|
||||
import ironic.db.sqlalchemy.api as db_api
|
||||
|
||||
if session is None:
|
||||
session = db_api.get_session()
|
||||
|
||||
super(IronicBase, self).save(session)
|
||||
|
||||
Base = declarative_base(cls=IronicBase)
|
||||
|
||||
|
|
|
@ -20,10 +20,10 @@ import shutil
|
|||
|
||||
import fixtures
|
||||
from oslo_config import cfg
|
||||
from oslo_db.sqlalchemy import enginefacade
|
||||
|
||||
from ironic.common import paths
|
||||
from ironic.db import api as dbapi
|
||||
from ironic.db.sqlalchemy import api as sqla_api
|
||||
from ironic.db.sqlalchemy import migration
|
||||
from ironic.db.sqlalchemy import models
|
||||
from ironic.tests import base
|
||||
|
@ -36,13 +36,13 @@ _DB_CACHE = None
|
|||
|
||||
class Database(fixtures.Fixture):
|
||||
|
||||
def __init__(self, db_api, db_migrate, sql_connection,
|
||||
def __init__(self, engine, db_migrate, sql_connection,
|
||||
sqlite_db, sqlite_clean_db):
|
||||
self.sql_connection = sql_connection
|
||||
self.sqlite_db = sqlite_db
|
||||
self.sqlite_clean_db = sqlite_clean_db
|
||||
|
||||
self.engine = db_api.get_engine()
|
||||
self.engine = engine
|
||||
self.engine.dispose()
|
||||
conn = self.engine.connect()
|
||||
if sql_connection == "sqlite://":
|
||||
|
@ -94,7 +94,8 @@ class DbTestCase(base.TestCase):
|
|||
|
||||
global _DB_CACHE
|
||||
if not _DB_CACHE:
|
||||
_DB_CACHE = Database(sqla_api, migration,
|
||||
engine = enginefacade.get_legacy_facade().get_engine()
|
||||
_DB_CACHE = Database(engine, migration,
|
||||
sql_connection=CONF.database.connection,
|
||||
sqlite_db=CONF.database.sqlite_db,
|
||||
sqlite_clean_db='clean.sqlite')
|
||||
|
|
|
@ -39,6 +39,7 @@ import contextlib
|
|||
from alembic import script
|
||||
import mock
|
||||
from oslo_db import exception as db_exc
|
||||
from oslo_db.sqlalchemy import enginefacade
|
||||
from oslo_db.sqlalchemy import test_base
|
||||
from oslo_db.sqlalchemy import test_migrations
|
||||
from oslo_db.sqlalchemy import utils as db_utils
|
||||
|
@ -90,9 +91,9 @@ def _is_backend_avail(backend, user, passwd, database):
|
|||
|
||||
@contextlib.contextmanager
|
||||
def patch_with_engine(engine):
|
||||
with mock.patch(('ironic.db'
|
||||
'.sqlalchemy.api.get_engine')) as patch_migration:
|
||||
patch_migration.return_value = engine
|
||||
with mock.patch.object(enginefacade.get_legacy_facade(),
|
||||
'get_engine') as patch_engine:
|
||||
patch_engine.return_value = engine
|
||||
yield
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue