DB migration 209: Clean up child rows as well

Fixes bug 1252891

In order to add missing foreign key constraints, rows that reference a
nonexistent key need to be purged. Unfortunately, DB migration 209 does
not clean up any rows in child tables that themselves have a foreign key
constraint on the tables with the missing foreign keys. This can result
in an integrity violation causing the migration to fail.

When purging the rows, make sure to purge rows in any child tables first.

Change-Id: I25bd899383c72cb0670819b030759630ac89acaa
This commit is contained in:
Johannes Erdfelt 2013-11-25 18:34:00 +00:00
parent 1092acd6b3
commit 7ea026e8d0
2 changed files with 141 additions and 95 deletions

View File

@ -20,71 +20,115 @@ from sqlalchemy.sql.expression import select
from nova.db.sqlalchemy import utils
# Tables to repair, in purge order.  Each entry is a 3-tuple:
#   (table_name,
#    (fk_column, referenced_table, referenced_column),
#    child_spec or None)
# where child_spec is (child_table, child_fk_column, parent_column) for a
# dependent table whose rows must be dumped and deleted BEFORE the parent
# table's orphans, so the parent delete cannot violate the child's own
# foreign key constraint (bug 1252891).
# compute_node_stats is listed first so its own orphans are purged before
# compute_nodes (its parent) is processed.
TABLES = [
    ("compute_node_stats",
     ('compute_node_id', 'compute_nodes', 'id'),
     None),
    ("compute_nodes",
     ('service_id', 'services', 'id'),
     ('compute_node_stats', 'compute_node_id', 'id')),
    ("instance_actions",
     ('instance_uuid', 'instances', 'uuid'),
     ('instance_actions_events', 'action_id', 'id')),
    ("migrations",
     ('instance_uuid', 'instances', 'uuid'),
     None),
    ("instance_faults",
     ('instance_uuid', 'instances', 'uuid'),
     None),
]
def dump_cleanup_rows(engine, meta, table, where):
    """Back up rows of *table* matching *where* into dump_<table>, then
    delete them.

    The dump table is created on first use (copying the source columns);
    a later call for the same table reuses the existing dump table.
    """
    backup_name = 'dump_' + table.name
    backup = meta.tables.get(backup_name)
    if backup is None:
        copied_columns = [c.copy() for c in table.columns]
        backup = Table(backup_name, meta, *copied_columns,
                       mysql_engine='InnoDB')
        backup.create()
    matching = table.select().where(where)
    engine.execute(utils.InsertFromSelect(backup, matching))
    engine.execute(table.delete().where(where))
def upgrade(migrate_engine):
    """Purge rows that break referential integrity, then add the missing
    foreign key constraints.

    For each (table, ref, child) entry in TABLES, rows whose foreign key
    column references a nonexistent parent row are dumped into a
    ``dump_<table>`` backup table and deleted.  When the table has a
    dependent child table, the child's affected rows are dumped and
    deleted first so the parent delete cannot itself raise an integrity
    error (bug 1252891).
    """
    if migrate_engine.name == 'sqlite':
        return

    meta = MetaData(bind=migrate_engine)

    for table_name, ref, child in TABLES:
        table = Table(table_name, meta, autoload=True)
        column_name, ref_table_name, ref_column_name = ref
        column = table.c[column_name]

        ref_table = Table(ref_table_name, meta, autoload=True)
        ref_column = ref_table.c[ref_column_name]

        # NOTE: '!= None' (not 'is not None') is required here so
        # SQLAlchemy renders 'IS NOT NULL'; NULL keys must be excluded
        # from the IN-set sub-query (see bug/1240325).
        subq = select([ref_column]).where(ref_column != None)

        if child:
            # Dump and clean up rows in the child table first.
            child_table_name, child_column_name, child_ref_column_name = child
            child_table = Table(child_table_name, meta, autoload=True)
            child_column = child_table.c[child_column_name]
            child_ref_column = table.c[child_ref_column_name]

            child_subq = select([child_ref_column]).where(~ column.in_(subq))
            dump_cleanup_rows(migrate_engine, meta, child_table,
                              child_column.in_(child_subq))

        dump_cleanup_rows(migrate_engine, meta, table, ~ column.in_(subq))

        params = {'columns': [column], 'refcolumns': [ref_column]}
        if migrate_engine.name == 'mysql':
            # MySQL requires an explicit constraint name.
            params['name'] = "_".join(('fk', table_name, column_name))
        fkey = ForeignKeyConstraint(**params)
        fkey.create()
def restore_rows(engine, meta, table_name):
    """Copy the rows saved in dump_<table_name> back into *table_name*,
    then drop the dump table."""
    target = Table(table_name, meta, autoload=True)
    backup = Table('dump_' + table_name, meta, autoload=True)
    restore = utils.InsertFromSelect(target, backup.select())
    engine.execute(restore)
    backup.drop()
def downgrade(migrate_engine):
    """Drop the foreign keys added by upgrade() and restore the rows
    that were dumped, including rows dumped from child tables."""
    if migrate_engine.name == 'sqlite':
        return

    meta = MetaData(bind=migrate_engine)

    for table_name, ref, child in TABLES:
        table = Table(table_name, meta, autoload=True)
        column_name, ref_table_name, ref_column_name = ref
        column = table.c[column_name]

        ref_table = Table(ref_table_name, meta, autoload=True)
        ref_column = ref_table.c[ref_column_name]

        params = {'columns': [column], 'refcolumns': [ref_column]}
        if migrate_engine.name == 'mysql':
            # MySQL requires the explicit name used at create time.
            params['name'] = "_".join(('fk', table_name, column_name))

        with migrate_engine.begin():
            fkey = ForeignKeyConstraint(**params)
            fkey.drop()

        with migrate_engine.begin():
            restore_rows(migrate_engine, meta, table_name)

        # compute_node_stats has a missing foreign key and is also a child
        # of compute_nodes.  Don't process it as a child here since it has
        # its own TABLES entry and we only want to restore the dump once.
        if child and table_name != 'compute_nodes':
            child_table_name, child_column_name, child_ref_column_name = child
            with migrate_engine.begin():
                restore_rows(migrate_engine, meta, child_table_name)

View File

@ -2885,15 +2885,22 @@ class TestNovaMigrations(BaseWalkMigrationTestCase, CommonTestsMixIn):
'supported_instance')
def _data_209(self):
ret = {"compute_nodes": {"service_id": 999, "vcpus": 1, "memory_mb": 1,
"local_gb": 1, "vcpus_used": 1,
"memory_mb_used": 1, "local_gb_used": 1,
"hypervisor_type": "fake_type",
"hypervisor_version": 1, "cpu_info": "info"},
"instance_actions": {"instance_uuid": "fake"},
"migrations": {"instance_uuid": "fake"},
"instance_faults": {"instance_uuid": "fake", "code": 1},
"compute_node_stats": {"compute_node_id": 1, "key": "fake"}}
ret = {
"compute_nodes": ({"service_id": 999, "vcpus": 1, "memory_mb": 1,
"local_gb": 1, "vcpus_used": 1,
"memory_mb_used": 1, "local_gb_used": 1,
"hypervisor_type": "fake_type",
"hypervisor_version": 1, "cpu_info": "info"},
("compute_node_stats", "compute_node_id",
{"key": "fake", "value": "bar"})),
"instance_actions": ({"instance_uuid": "fake"},
("instance_actions_events", "action_id",
{"event": "fake"})),
"migrations": ({"instance_uuid": "fake"}, None),
"instance_faults": ({"instance_uuid": "fake", "code": 1}, None),
"compute_node_stats": ({"compute_node_id": 1, "key": "fake"},
None),
}
return ret
def _constraints_209(self):
@ -2906,16 +2913,25 @@ class TestNovaMigrations(BaseWalkMigrationTestCase, CommonTestsMixIn):
def _pre_upgrade_209(self, engine):
    """Seed each table with one row that violates the FK migration 209
    will add, plus a dependent child row where applicable."""
    if engine.name == 'sqlite':
        return
    # Start from a known state: drop any pre-existing NULL-uuid instances.
    instances = db_utils.get_table(engine, 'instances')
    instances.delete().where(instances.c.uuid == None).execute()
    for table_name, (row, child) in self._data_209().iteritems():
        table = db_utils.get_table(engine, table_name)
        table.delete().execute()
        result = table.insert().values(row).execute()
        if child:
            # Point the child row at the id of the row just inserted.
            child_table_name, child_column_name, child_row = child
            child_row = child_row.copy()
            child_row[child_column_name] = result.inserted_primary_key[0]
            child_table = db_utils.get_table(engine, child_table_name)
            child_table.delete().execute()
            child_table.insert().values(child_row).execute()

    # NOTE(jhesketh): Add instance with NULL uuid to check the backups
    #                 still work correctly and avoid violating the foreign
    #                 key constraint -- this instance inserted here causes
    #                 migration 209 to fail unless the IN set sub-query is
    #                 modified appropriately. See bug/1240325
    instances.insert({'uuid': None}).execute()
def _check_209(self, engine, data):
    """After upgrade: the orphaned rows were dumped, the FK exists, and
    re-inserting an orphaned row is rejected."""
    if engine.name == 'sqlite':
        return
    for table_name, (row, child) in self._data_209().iteritems():
        table = db_utils.get_table(engine, table_name)
        # The new FK must reject rows referencing missing parents.
        self.assertRaises(sqlalchemy.exc.IntegrityError,
                          table.insert().values(row).execute)
        # Exactly one orphaned row was saved per dump table.
        dump_table = db_utils.get_table(engine, 'dump_' + table_name)
        self.assertEqual(len(dump_table.select().execute().fetchall()), 1)
        table.delete().execute()
        fks = [(f.column.table.name, f.column.name)
               for f in table.foreign_keys]
        self.assertIn(self._constraints_209().get(table_name), fks)
def _post_downgrade_209(self, engine):
    """After downgrade: dump tables are gone, rows were restored, the FK
    was dropped, and orphaned rows can be inserted again."""
    if engine.name == 'sqlite':
        return
    check_tables = engine.table_names()
    for table_name, (row, child) in self._data_209().iteritems():
        table = db_utils.get_table(engine, table_name)
        # The dump tables must have been dropped by the downgrade.
        dump_table_name = 'dump_' + table_name
        self.assertNotIn(dump_table_name, check_tables)
        # With the FK gone the orphaned row inserts cleanly; together
        # with the restored dump row that makes two rows in the table.
        table.insert().values(row).execute()
        self.assertEqual(len(table.select().execute().fetchall()), 2)
        fks = [(f.column.table.name, f.column.name)
               for f in table.foreign_keys]
        self.assertNotIn(self._constraints_209().get(table_name), fks)
def _check_210(self, engine, data):
project_user_quotas = db_utils.get_table(engine, 'project_user_quotas')