Merge "archive_deleted_instances is not atomic for insert/delete"
This commit is contained in:
commit
9e5b5425a5
|
@ -6161,43 +6161,39 @@ def _archive_deleted_rows_for_table(tablename, max_rows):
|
|||
|
||||
conn.execute(update_statement)
|
||||
|
||||
insert = shadow_table.insert(inline=True).\
|
||||
from_select(columns,
|
||||
sql.select([table],
|
||||
deleted_column != deleted_column.default.arg).
|
||||
order_by(column).limit(max_rows))
|
||||
query_delete = sql.select([column],
|
||||
deleted_column != deleted_column.default.arg).\
|
||||
order_by(column).limit(max_rows)
|
||||
select = sql.select([column],
|
||||
deleted_column != deleted_column.default.arg).\
|
||||
order_by(column).limit(max_rows)
|
||||
rows = conn.execute(select).fetchall()
|
||||
records = [r[0] for r in rows]
|
||||
|
||||
delete_statement = DeleteFromSelect(table, query_delete, column)
|
||||
if records:
|
||||
insert = shadow_table.insert(inline=True).\
|
||||
from_select(columns, sql.select([table], column.in_(records)))
|
||||
delete = table.delete().where(column.in_(records))
|
||||
# NOTE(tssurya): In order to facilitate the deletion of records from
|
||||
# instance_mappings and request_specs tables in the nova_api DB, the
|
||||
# rows of deleted instances from the instances table are stored prior
|
||||
# to their deletion. Basically the uuids of the archived instances
|
||||
# are queried and returned.
|
||||
if tablename == "instances":
|
||||
query_select = sql.select([table.c.uuid], table.c.id.in_(records))
|
||||
rows = conn.execute(query_select).fetchall()
|
||||
deleted_instance_uuids = [r[0] for r in rows]
|
||||
|
||||
# NOTE(tssurya): In order to facilitate the deletion of records from
|
||||
# instance_mappings table in the nova_api DB, the rows of deleted instances
|
||||
# from the instances table are stored prior to their deletion from
|
||||
# the instances table. Basically the uuids of the archived instances
|
||||
# are queried and returned.
|
||||
if tablename == "instances":
|
||||
query_delete = query_delete.column(table.c.uuid)
|
||||
rows = conn.execute(query_delete).fetchall()
|
||||
deleted_instance_uuids = [r[1] for r in rows]
|
||||
|
||||
try:
|
||||
# Group the insert and delete in a transaction.
|
||||
with conn.begin():
|
||||
conn.execute(insert)
|
||||
if tablename == "instances":
|
||||
delete_statement = table.delete().where(table.c.uuid.in_(
|
||||
deleted_instance_uuids))
|
||||
result_delete = conn.execute(delete_statement)
|
||||
rows_archived = result_delete.rowcount
|
||||
except db_exc.DBReferenceError as ex:
|
||||
# A foreign key constraint keeps us from deleting some of
|
||||
# these rows until we clean up a dependent table. Just
|
||||
# skip this table for now; we'll come back to it later.
|
||||
LOG.warning("IntegrityError detected when archiving table "
|
||||
"%(tablename)s: %(error)s",
|
||||
{'tablename': tablename, 'error': six.text_type(ex)})
|
||||
try:
|
||||
# Group the insert and delete in a transaction.
|
||||
with conn.begin():
|
||||
conn.execute(insert)
|
||||
result_delete = conn.execute(delete)
|
||||
rows_archived = result_delete.rowcount
|
||||
except db_exc.DBReferenceError as ex:
|
||||
# A foreign key constraint keeps us from deleting some of
|
||||
# these rows until we clean up a dependent table. Just
|
||||
# skip this table for now; we'll come back to it later.
|
||||
LOG.warning("IntegrityError detected when archiving table "
|
||||
"%(tablename)s: %(error)s",
|
||||
{'tablename': tablename, 'error': six.text_type(ex)})
|
||||
|
||||
if ((max_rows is None or rows_archived < max_rows)
|
||||
and 'instance_uuid' in columns):
|
||||
|
|
|
@ -9240,6 +9240,37 @@ class ArchiveTestCase(test.TestCase, ModelsObjectComparatorMixin):
|
|||
'shadow_dns_domains',
|
||||
)
|
||||
|
||||
def test_archive_deleted_rows_shadow_insertions_equals_deletions(self):
|
||||
# Add 2 rows to table
|
||||
for uuidstr in self.uuidstrs[:2]:
|
||||
ins_stmt = self.instance_id_mappings.insert().values(uuid=uuidstr)
|
||||
self.conn.execute(ins_stmt)
|
||||
# Set both to deleted
|
||||
update_statement = self.instance_id_mappings.update().\
|
||||
where(self.instance_id_mappings.c.uuid.in_(self.uuidstrs[:2]))\
|
||||
.values(deleted=1)
|
||||
self.conn.execute(update_statement)
|
||||
qiim = sql.select([self.instance_id_mappings]).where(self.
|
||||
instance_id_mappings.c.uuid.in_(self.uuidstrs[:2]))
|
||||
rows = self.conn.execute(qiim).fetchall()
|
||||
# Verify we have 2 in main
|
||||
self.assertEqual(len(rows), 2)
|
||||
|
||||
qsiim = sql.select([self.shadow_instance_id_mappings]).\
|
||||
where(self.shadow_instance_id_mappings.c.uuid.in_(
|
||||
self.uuidstrs[:2]))
|
||||
shadow_rows = self.conn.execute(qsiim).fetchall()
|
||||
# Verify we have 0 in shadow
|
||||
self.assertEqual(len(shadow_rows), 0)
|
||||
|
||||
# Archive the rows
|
||||
db.archive_deleted_rows(max_rows=2)
|
||||
|
||||
main_rows = self.conn.execute(qiim).fetchall()
|
||||
shadow_rows = self.conn.execute(qsiim).fetchall()
|
||||
# Verify the insertions into shadow is same as deletions from main
|
||||
self.assertEqual(len(shadow_rows), len(rows) - len(main_rows))
|
||||
|
||||
def _check_sqlite_version_less_than_3_7(self):
|
||||
# SQLite doesn't enforce foreign key constraints without a pragma.
|
||||
dialect = self.engine.url.get_dialect()
|
||||
|
|
Loading…
Reference in New Issue