Add new foreign key utility function to utils

Adds two new functions:

- get_foreign_key_constraint_name() - common function
  used in subprojects which retrieves the name of a
  foreign key constraint given table name / column name
- suspend_fk_constraints_for_col_alter() - given table name and column
  name, locates matching foreign key constraints, drops them,
  yields context, then recreates them as they were.
  By default only runs for NDB thus avoiding expensive
  constraint regeneration on other backends.

Co-authored-by: Mike Bayer <mike_mp@zzzcomputing.com>
Co-authored-by: Paul Bourke <paul.bourke@oracle.com>
Change-Id: I76c0d82fddf38add0ca05be270264b6e2d043f7f
This commit is contained in:
oorgeron 2017-08-09 13:39:59 -06:00 committed by Paul Bourke (pbourke)
parent 4de33ebd50
commit 9585ca8318
2 changed files with 219 additions and 8 deletions

View File

@ -22,6 +22,8 @@ import itertools
import logging
import re
from alembic.migration import MigrationContext
from alembic.operations import Operations
from oslo_utils import timeutils
import six
import sqlalchemy
@ -47,6 +49,7 @@ from sqlalchemy.types import NullType
from oslo_db._i18n import _
from oslo_db import exception
from oslo_db.sqlalchemy import models
from oslo_db.sqlalchemy import ndb
# NOTE(ochuprykov): Add references for backwards compatibility
InvalidSortKey = exception.InvalidSortKey
@ -1067,11 +1070,10 @@ def get_non_innodb_tables(connectable, skip_tables=('migrate_version',
'alembic_version')):
"""Get a list of tables which don't use InnoDB storage engine.
:param connectable: a SQLAlchemy Engine or a Connection instance
:param skip_tables: a list of tables which might have a different
storage engine
"""
:param connectable: a SQLAlchemy Engine or a Connection instance
:param skip_tables: a list of tables which might have a different
storage engine
"""
query_str = """
SELECT table_name
FROM information_schema.tables
@ -1125,6 +1127,98 @@ def get_non_ndbcluster_tables(connectable, skip_tables=None):
return [i[0] for i in nonndbcluster]
def get_foreign_key_constraint_name(engine, table_name, column_name):
"""Find the name of foreign key in a table, given constrained column name.
:param engine: a SQLAlchemy engine (or connection)
:param table_name: name of table which contains the constraint
:param column_name: name of column that is constrained by the foreign key.
:return: the name of the first foreign key constraint which constrains
the given column in the given table.
"""
insp = inspect(engine)
for fk in insp.get_foreign_keys(table_name):
if column_name in fk['constrained_columns']:
return fk['name']
@contextlib.contextmanager
def suspend_fk_constraints_for_col_alter(
engine, table_name, column_name, referents=[]):
"""Detect foreign key constraints, drop, and recreate.
This is used to guard against a column ALTER that on some backends
cannot proceed unless foreign key constraints are not present.
e.g.::
from oslo_db.sqlalchemy.util import (
suspend_fk_constraints_for_col_alter
)
with suspend_fk_constraints_for_col_alter(
migrate_engine, "user_table",
referents=[
"local_user", "nonlocal_user", "project"
]):
user_table.c.domain_id.alter(nullable=False)
:param engine: a SQLAlchemy engine (or connection)
:param table_name: target table name. All foreign key constraints
that refer to the table_name / column_name will be dropped and recreated.
:param column_name: target column name. all foreign key constraints
which refer to this column, either partially or fully, will be dropped
and recreated.
:param referents: sequence of string table names to search for foreign
key constraints. A future version of this function may no longer
require this argument, however for the moment it is required.
"""
if (
not ndb.ndb_status(engine)
):
yield
else:
with engine.connect() as conn:
insp = inspect(conn)
fks = []
for ref_table_name in referents:
for fk in insp.get_foreign_keys(ref_table_name):
assert fk.get('name')
if fk['referred_table'] == table_name and \
column_name in fk['referred_columns']:
fk['source_table'] = ref_table_name
if 'options' not in fk:
fk['options'] = {}
fks.append(fk)
ctx = MigrationContext.configure(conn)
op = Operations(ctx)
for fk in fks:
op.drop_constraint(
fk['name'], fk['source_table'], type_="foreignkey")
yield
for fk in fks:
op.create_foreign_key(
fk['name'], fk['source_table'],
fk['referred_table'],
fk['constrained_columns'],
fk['referred_columns'],
onupdate=fk['options'].get('onupdate'),
ondelete=fk['options'].get('ondelete'),
deferrable=fk['options'].get('deferrable'),
initially=fk['options'].get('initially'),
)
class NonCommittingConnectable(object):
"""A ``Connectable`` substitute which rolls all operations back.

View File

@ -22,7 +22,8 @@ import sqlalchemy
from sqlalchemy.dialects import mysql
from sqlalchemy import Boolean, Index, Integer, DateTime, String, SmallInteger
from sqlalchemy import CheckConstraint
from sqlalchemy import MetaData, Table, Column, ForeignKey
from sqlalchemy import MetaData, Table, Column
from sqlalchemy import ForeignKey, ForeignKeyConstraint
from sqlalchemy.engine import reflection
from sqlalchemy.engine import url as sa_url
from sqlalchemy.exc import OperationalError
@ -30,6 +31,7 @@ from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import mapper
from sqlalchemy.orm import Session
from sqlalchemy import PrimaryKeyConstraint
from sqlalchemy.sql.expression import cast
from sqlalchemy.sql import select
from sqlalchemy.types import UserDefinedType, NullType
@ -770,9 +772,124 @@ class TestMigrationUtils(db_test_base.DbTestCase):
# behavior), any integer value can be inserted, otherwise only 1 or 0.
self.engine.execute(table.insert({'deleted': 10}))
def test_get_foreign_key_constraint_name(self):
table_1 = Table('table_name_1', self.meta,
Column('id', Integer, primary_key=True),
Column('deleted', Integer))
table_2 = Table('table_name_2', self.meta,
Column('id', Integer, primary_key=True),
Column('foreign_id', Integer),
ForeignKeyConstraint(['foreign_id'],
['table_name_1.id'],
name='table_name_2_fk1'),
Column('deleted', Integer))
class PostgesqlTestMigrations(TestMigrationUtils,
db_test_base.PostgreSQLOpportunisticTestCase):
self.meta.create_all(tables=[table_1, table_2])
fkc = utils.get_foreign_key_constraint_name(self.engine,
'table_name_2',
'foreign_id')
self.assertEqual(fkc, 'table_name_2_fk1')
@db_test_base.backend_specific('mysql', 'postgresql')
def test_suspend_fk_constraints_for_col_alter(self):
a = Table(
'a', self.meta,
Column('id', Integer, primary_key=True)
)
b = Table(
'b', self.meta,
Column('key', Integer),
Column('archive_id', Integer),
Column('aid', ForeignKey('a.id')),
PrimaryKeyConstraint("key", "archive_id")
)
c = Table(
'c', self.meta,
Column('id', Integer, primary_key=True),
Column('aid', ForeignKey('a.id')),
Column('key', Integer),
Column('archive_id', Integer),
ForeignKeyConstraint(
['key', 'archive_id'], ['b.key', 'b.archive_id'],
name="some_composite_fk")
)
self.meta.create_all(tables=[a, b, c])
def get_fk_entries():
inspector = sqlalchemy.inspect(self.engine)
return sorted(
inspector.get_foreign_keys('b') +
inspector.get_foreign_keys('c'),
key=lambda fk: fk['referred_table']
)
def normalize_fk_entries(fks):
return [{
'name': fk['name'],
'referred_columns': fk['referred_columns'],
'referred_table': fk['referred_table'],
} for fk in fks]
existing_foreign_keys = get_fk_entries()
self.assertEqual(
[{'name': mock.ANY,
'referred_columns': ['id'], 'referred_table': 'a'},
{'name': mock.ANY,
'referred_columns': ['id'], 'referred_table': 'a'},
{'name': 'some_composite_fk',
'referred_columns': ['key', 'archive_id'],
'referred_table': 'b'}],
normalize_fk_entries(existing_foreign_keys)
)
with mock.patch("oslo_db.sqlalchemy.ndb.ndb_status",
mock.Mock(return_value=True)):
with utils.suspend_fk_constraints_for_col_alter(
self.engine, 'a', 'id', referents=['b', 'c']):
no_a_foreign_keys = get_fk_entries()
self.assertEqual(
[{'name': 'some_composite_fk',
'referred_columns': ['key', 'archive_id'],
'referred_table': 'b'}],
normalize_fk_entries(no_a_foreign_keys)
)
self.assertEqual(existing_foreign_keys, get_fk_entries())
with mock.patch("oslo_db.sqlalchemy.ndb.ndb_status",
mock.Mock(return_value=True)):
with utils.suspend_fk_constraints_for_col_alter(
self.engine, 'b', 'archive_id', referents=['c']):
self.assertEqual(
[{'name': mock.ANY,
'referred_columns': ['id'], 'referred_table': 'a'},
{'name': mock.ANY,
'referred_columns': ['id'], 'referred_table': 'a'}],
normalize_fk_entries(get_fk_entries())
)
self.assertEqual(existing_foreign_keys, get_fk_entries())
with utils.suspend_fk_constraints_for_col_alter(
self.engine, 'a', 'id', referents=['b', 'c']):
self.assertEqual(existing_foreign_keys, get_fk_entries())
if self.engine.name == 'mysql':
self.engine.dialect._oslodb_enable_ndb_support = True
self.addCleanup(
setattr, self.engine.dialect, "_oslodb_enable_ndb_support",
False
)
with utils.suspend_fk_constraints_for_col_alter(
self.engine, 'a', 'id', referents=['b', 'c']):
self.assertEqual(no_a_foreign_keys, get_fk_entries())
class PostgresqlTestMigrations(TestMigrationUtils,
db_test_base.PostgreSQLOpportunisticTestCase):
"""Test migrations on PostgreSQL."""
pass