cinder-manage db purge has issues with foreign keys

When executing purge on a database with snapshots created from volumes
with glance metadata, it fails due to a foreign key exception.
This patch adds the table at the end of the purge and forces the use of
foreign key checks on the tests

Change-Id: I67436a8319155bb47dfa67544ccae25f337ed246
Closes-Bug: #1489523
This commit is contained in:
Jose Castro Leon 2016-07-06 14:25:31 +02:00
parent afb81cd022
commit b6d5948988
2 changed files with 91 additions and 1 deletions

View File

@ -4457,7 +4457,7 @@ def purge_deleted_rows(context, age_in_days):
# Reorder the list so the volumes and volume_types tables are last
# to avoid FK constraints
for table in ("volume_types", "volumes"):
for table in ("volume_types", "snapshots", "volumes"):
tables.remove(table)
tables.append(table)

View File

@ -51,6 +51,15 @@ class PurgeDeletedTest(test.TestCase):
self.vol_type_proj = sqlalchemyutils.get_table(
self.engine, "volume_type_projects")
self.snapshots = sqlalchemyutils.get_table(
self.engine, "snapshots")
self.sm = sqlalchemyutils.get_table(
self.engine, "snapshot_metadata")
self.vgm = sqlalchemyutils.get_table(
self.engine, "volume_glance_metadata")
self.uuidstrs = []
for unused in range(6):
self.uuidstrs.append(uuid.uuid4().hex)
@ -60,6 +69,9 @@ class PurgeDeletedTest(test.TestCase):
self.conn.execute(ins_stmt)
ins_stmt = self.vm.insert().values(volume_id=uuidstr)
self.conn.execute(ins_stmt)
ins_stmt = self.vgm.insert().values(
volume_id=uuidstr, key='image_name', value='test')
self.conn.execute(ins_stmt)
ins_stmt = self.vol_types.insert().values(id=uuidstr)
self.conn.execute(ins_stmt)
@ -67,6 +79,16 @@ class PurgeDeletedTest(test.TestCase):
values(volume_type_id=uuidstr)
self.conn.execute(ins_stmt)
ins_stmt = self.snapshots.insert().values(
id=uuidstr, volume_id=uuidstr)
self.conn.execute(ins_stmt)
ins_stmt = self.sm.insert().values(snapshot_id=uuidstr)
self.conn.execute(ins_stmt)
ins_stmt = self.vgm.insert().values(
snapshot_id=uuidstr, key='image_name', value='test')
self.conn.execute(ins_stmt)
# Set 4 of them deleted, 2 are 60 days ago, 2 are 20 days ago
old = timeutils.utcnow() - datetime.timedelta(days=20)
older = timeutils.utcnow() - datetime.timedelta(days=60)
@ -97,6 +119,32 @@ class PurgeDeletedTest(test.TestCase):
where(self.vol_type_proj.c.volume_type_id.in_(self.uuidstrs[4:6]))\
.values(deleted_at=older)
make_snap_old = self.snapshots.update().\
where(self.snapshots.c.id.in_(self.uuidstrs[1:3]))\
.values(deleted_at=old)
make_snap_older = self.snapshots.update().\
where(self.snapshots.c.id.in_(self.uuidstrs[4:6]))\
.values(deleted_at=older)
make_snap_meta_old = self.sm.update().\
where(self.sm.c.snapshot_id.in_(self.uuidstrs[1:3]))\
.values(deleted_at=old)
make_snap_meta_older = self.sm.update().\
where(self.sm.c.snapshot_id.in_(self.uuidstrs[4:6]))\
.values(deleted_at=older)
make_vol_glance_meta_old = self.vgm.update().\
where(self.vgm.c.volume_id.in_(self.uuidstrs[1:3]))\
.values(deleted_at=old)
make_vol_glance_meta_older = self.vgm.update().\
where(self.vgm.c.volume_id.in_(self.uuidstrs[4:6]))\
.values(deleted_at=older)
make_snap_glance_meta_old = self.vgm.update().\
where(self.vgm.c.snapshot_id.in_(self.uuidstrs[1:3]))\
.values(deleted_at=old)
make_snap_glance_meta_older = self.vgm.update().\
where(self.vgm.c.snapshot_id.in_(self.uuidstrs[4:6]))\
.values(deleted_at=older)
self.conn.execute(make_vol_old)
self.conn.execute(make_vol_older)
self.conn.execute(make_vol_meta_old)
@ -107,7 +155,27 @@ class PurgeDeletedTest(test.TestCase):
self.conn.execute(make_vol_type_proj_old)
self.conn.execute(make_vol_type_proj_older)
self.conn.execute(make_snap_old)
self.conn.execute(make_snap_older)
self.conn.execute(make_snap_meta_old)
self.conn.execute(make_snap_meta_older)
self.conn.execute(make_vol_glance_meta_old)
self.conn.execute(make_vol_glance_meta_older)
self.conn.execute(make_snap_glance_meta_old)
self.conn.execute(make_snap_glance_meta_older)
def test_purge_deleted_rows_old(self):
dialect = self.engine.url.get_dialect()
if dialect == sqlite.dialect:
# We're seeing issues with foreign key support in SQLite 3.6.20
# SQLAlchemy doesn't support it at all with < SQLite 3.6.19
# It works fine in SQLite 3.7.
# Force foreign_key checking if running SQLite >= 3.7
import sqlite3
tup = sqlite3.sqlite_version_info
if tup[0] > 3 or (tup[0] == 3 and tup[1] >= 7):
self.conn.execute("PRAGMA foreign_keys = ON")
# Purge at 30 days old, should only delete 2 rows
db.purge_deleted_rows(self.context, age_in_days=30)
@ -115,14 +183,30 @@ class PurgeDeletedTest(test.TestCase):
vol_meta_rows = self.session.query(self.vm).count()
vol_type_rows = self.session.query(self.vol_types).count()
vol_type_proj_rows = self.session.query(self.vol_type_proj).count()
snap_rows = self.session.query(self.snapshots).count()
snap_meta_rows = self.session.query(self.sm).count()
vol_glance_meta_rows = self.session.query(self.vgm).count()
# Verify that we only deleted 2
self.assertEqual(4, vol_rows)
self.assertEqual(4, vol_meta_rows)
self.assertEqual(4, vol_type_rows)
self.assertEqual(4, vol_type_proj_rows)
self.assertEqual(4, snap_rows)
self.assertEqual(4, snap_meta_rows)
self.assertEqual(8, vol_glance_meta_rows)
def test_purge_deleted_rows_older(self):
dialect = self.engine.url.get_dialect()
if dialect == sqlite.dialect:
# We're seeing issues with foreign key support in SQLite 3.6.20
# SQLAlchemy doesn't support it at all with < SQLite 3.6.19
# It works fine in SQLite 3.7.
# Force foreign_key checking if running SQLite >= 3.7
import sqlite3
tup = sqlite3.sqlite_version_info
if tup[0] > 3 or (tup[0] == 3 and tup[1] >= 7):
self.conn.execute("PRAGMA foreign_keys = ON")
# Purge at 10 days old now, should delete 2 more rows
db.purge_deleted_rows(self.context, age_in_days=10)
@ -130,12 +214,18 @@ class PurgeDeletedTest(test.TestCase):
vol_meta_rows = self.session.query(self.vm).count()
vol_type_rows = self.session.query(self.vol_types).count()
vol_type_proj_rows = self.session.query(self.vol_type_proj).count()
snap_rows = self.session.query(self.snapshots).count()
snap_meta_rows = self.session.query(self.sm).count()
vol_glance_meta_rows = self.session.query(self.vgm).count()
# Verify that we only have 2 rows now
self.assertEqual(2, vol_rows)
self.assertEqual(2, vol_meta_rows)
self.assertEqual(2, vol_type_rows)
self.assertEqual(2, vol_type_proj_rows)
self.assertEqual(2, snap_rows)
self.assertEqual(2, snap_meta_rows)
self.assertEqual(4, vol_glance_meta_rows)
def test_purge_deleted_rows_bad_args(self):
# Test with no age argument