db: Don't rely on autocommit behavior

Resolve the following RemovedIn20Warning:

  The current statement is being autocommitted using implicit
  autocommit, which will be removed in SQLAlchemy 2.0. Use the .begin()
  method of Engine or Connection in order to use an explicit transaction
  for DML and DDL statements.

I genuinely expected this one to be more difficult to resolve, but it
turns out we weren't relying on implicit autocommit as much as expected
(thank you, non-legacy enginefacade).

With this change, we appear to be SQLAlchemy 2.0 ready.

Change-Id: Ic43c21038ee682f9733fbde42c6d24f8088815fc
Signed-off-by: Stephen Finucane <sfinucan@redhat.com>
Author: Stephen Finucane <sfinucan@redhat.com>
Date: 2022-04-08 10:58:42 +01:00
commit 612b83ee5d (parent b02166c91f)
4 changed files with 309 additions and 215 deletions
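
For readers tracking the warning itself, the whole change reduces to one
pattern. A minimal standalone sketch (in-memory SQLite; the 'demo' table is
illustrative and not part of this change):

    import sqlalchemy as sa

    engine = sa.create_engine('sqlite://')
    metadata = sa.MetaData()
    demo = sa.Table(
        'demo', metadata, sa.Column('id', sa.Integer, primary_key=True),
    )
    metadata.create_all(engine)

    # Before: DML on a bare connection relies on implicit autocommit and
    # emits the RemovedIn20Warning quoted above when SQLAlchemy 1.4 runs
    # in 2.0-warnings mode.
    conn = engine.connect()
    conn.execute(demo.insert().values(id=1))
    conn.close()

    # After: run the statement inside an explicit transaction, which is
    # committed on success (or rolled back on error) when the block exits.
    with engine.connect() as conn, conn.begin():
        conn.execute(demo.insert().values(id=2))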


@@ -612,9 +612,9 @@ def _compute_node_select(context, filters=None, limit=None, marker=None):
 def _compute_node_fetchall(context, filters=None, limit=None, marker=None):
     select = _compute_node_select(context, filters, limit=limit, marker=marker)
     engine = get_engine(context=context)
-    conn = engine.connect()
-    results = conn.execute(select).fetchall()
+    with engine.connect() as conn, conn.begin():
+        results = conn.execute(select).fetchall()
 
     # Callers expect dict-like objects, not SQLAlchemy RowProxy objects...
     results = [dict(r._mapping) for r in results]
@@ -983,9 +983,9 @@ def compute_node_statistics(context):
         ).label('disk_available_least'),
     ]
     select = sql.select(*agg_cols).select_from(j)
-    conn = engine.connect()
-    results = conn.execute(select).fetchone()
+    with engine.connect() as conn, conn.begin():
+        results = conn.execute(select).fetchone()
 
     # Build a dict of the info--making no assumptions about result
     fields = ('count', 'vcpus', 'memory_mb', 'local_gb', 'vcpus_used',
@@ -993,7 +993,6 @@ def compute_node_statistics(context):
               'current_workload', 'running_vms', 'disk_available_least')
     results = {field: int(results[idx] or 0)
                for idx, field in enumerate(fields)}
-    conn.close()
     return results
@@ -4293,7 +4292,8 @@ def _get_fk_stmts(metadata, conn, table, column, records):
         select = sql.select(fk.column).where(
             sql.and_(fk.parent == fk.column, column.in_(records))
         )
-        rows = conn.execute(select).fetchall()
+        with conn.begin():
+            rows = conn.execute(select).fetchall()
         p_records = [r[0] for r in rows]
         # Then, select rows in the child table that correspond to the
         # parent table records that were passed in.
@@ -4308,7 +4308,8 @@ def _get_fk_stmts(metadata, conn, table, column, records):
             fk_select = sql.select(fk_column).where(
                 sql.and_(fk.parent == fk.column, fk.column.in_(p_records))
             )
-            fk_rows = conn.execute(fk_select).fetchall()
+            with conn.begin():
+                fk_rows = conn.execute(fk_select).fetchall()
             fk_records = [r[0] for r in fk_rows]
             if fk_records:
                 # If we found any records in the child table, create shadow
@@ -4395,7 +4396,8 @@ def _archive_deleted_rows_for_table(
         select = select.where(table.c.updated_at < before)
     select = select.order_by(column).limit(max_rows)
-    rows = conn.execute(select).fetchall()
+    with conn.begin():
+        rows = conn.execute(select).fetchall()
     records = [r[0] for r in rows]
 
     # We will archive deleted rows for this table and also generate insert and
@@ -4431,7 +4433,8 @@ def _archive_deleted_rows_for_table(
         query_select = sql.select(table.c.uuid).where(
             table.c.id.in_(records)
         )
-        rows = conn.execute(query_select).fetchall()
+        with conn.begin():
+            rows = conn.execute(query_select).fetchall()
         deleted_instance_uuids = [r[0] for r in rows]
 
     try:
@@ -4453,6 +4456,8 @@ def _archive_deleted_rows_for_table(
                     "%(tablename)s: %(error)s",
                     {'tablename': tablename, 'error': str(ex)})
 
+    conn.close()
+
     return rows_archived, deleted_instance_uuids, extras
@@ -4575,7 +4580,8 @@ def purge_shadow_tables(context, before_date, status_fn=None):
         else:
             delete = table.delete()
-        deleted = conn.execute(delete)
+        with conn.begin():
+            deleted = conn.execute(delete)
         if deleted.rowcount > 0:
             status_fn(_('Deleted %(rows)i rows from %(table)s based on '
                         'timestamp column %(col)s') % {
@@ -4584,6 +4590,8 @@ def purge_shadow_tables(context, before_date, status_fn=None):
                     'col': col is None and '(n/a)' or col.name})
             total_deleted += deleted.rowcount
 
+    conn.close()
+
     return total_deleted
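
Side note on the pattern used above: when a connection exists only to run a
single transaction, SQLAlchemy's Engine.begin() is an equivalent shorthand
for the connect-plus-begin pairing (a sketch, not something this change
uses):

    # Opens a connection, begins a transaction, commits on success,
    # rolls back on error, and closes the connection on exit.
    with engine.begin() as conn:
        results = conn.execute(select).fetchall()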


@@ -870,15 +870,6 @@ class WarningsFixture(fixtures.Fixture):
             category=sqla_exc.SADeprecationWarning,
         )
 
-        # ...but filter everything out until we get around to fixing them
-        # TODO(stephenfin): Fix all of these
-        warnings.filterwarnings(
-            'ignore',
-            module='nova',
-            message=r'The current statement is being autocommitted .*',
-            category=sqla_exc.SADeprecationWarning)
-
         self.addCleanup(self._reset_warning_filters)
 
     def _reset_warning_filters(self):
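
The fixture context above escalates nova's SADeprecationWarning instances
to errors, so with the 'ignore' filter removed, any remaining implicit
autocommit should now fail the test run. The same promote-to-error idea as
a minimal standalone sketch (run_test_suite is a hypothetical entry point):

    import warnings

    from sqlalchemy import exc as sqla_exc

    with warnings.catch_warnings():
        # Turn SQLAlchemy deprecation warnings raised from our own modules
        # into exceptions so regressions surface immediately.
        warnings.filterwarnings(
            'error',
            module='nova',
            category=sqla_exc.SADeprecationWarning,
        )
        run_test_suite()  # hypothetical entry point, not a real nova API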


@@ -5663,7 +5663,6 @@ class ArchiveTestCase(test.TestCase, ModelsObjectComparatorMixin):
         super(ArchiveTestCase, self).setUp()
         self.engine = db.get_engine()
         self.metadata = sa.MetaData()
-        self.conn = self.engine.connect()
         self.instance_id_mappings = models.InstanceIdMapping.__table__
         self.shadow_instance_id_mappings = sqlalchemyutils.get_table(
             self.engine, "shadow_instance_id_mappings")
@@ -5696,12 +5695,13 @@ class ArchiveTestCase(test.TestCase, ModelsObjectComparatorMixin):
         """
         metadata = sa.MetaData()
         metadata.reflect(bind=self.engine)
-        for table in metadata.tables:
-            if table.startswith("shadow_") and table not in exceptions:
-                rows = self.conn.exec_driver_sql(
-                    "SELECT * FROM %s" % table
-                ).fetchall()
-                self.assertEqual(rows, [], "Table %s not empty" % table)
+        with self.engine.connect() as conn, conn.begin():
+            for table in metadata.tables:
+                if table.startswith("shadow_") and table not in exceptions:
+                    rows = conn.exec_driver_sql(
+                        "SELECT * FROM %s" % table
+                    ).fetchall()
+                    self.assertEqual(rows, [], "Table %s not empty" % table)
 
     def test_shadow_tables(self):
         """Validate shadow table schema.
@@ -5754,57 +5754,72 @@ class ArchiveTestCase(test.TestCase, ModelsObjectComparatorMixin):
         self._assert_shadow_tables_empty_except()
 
     def test_archive_deleted_rows(self):
-        # Add 6 rows to table
-        for uuidstr in self.uuidstrs:
-            ins_stmt = self.instance_id_mappings.insert().values(uuid=uuidstr)
-            self.conn.execute(ins_stmt)
+        with self.engine.connect() as conn, conn.begin():
+            # Add 6 rows to table
+            for uuidstr in self.uuidstrs:
+                ins_stmt = self.instance_id_mappings.insert().values(
+                    uuid=uuidstr,
+                )
+                conn.execute(ins_stmt)
 
         # Set 4 to deleted
-        update_statement = self.instance_id_mappings.update().\
-            where(self.instance_id_mappings.c.uuid.in_(self.uuidstrs[:4]))\
-            .values(deleted=1, deleted_at=timeutils.utcnow())
-        self.conn.execute(update_statement)
+        with self.engine.connect() as conn, conn.begin():
+            update_statement = self.instance_id_mappings.update().where(
+                self.instance_id_mappings.c.uuid.in_(self.uuidstrs[:4])
+            ).values(deleted=1, deleted_at=timeutils.utcnow())
+            conn.execute(update_statement)
 
         qiim = sql.select(self.instance_id_mappings).where(
             self.instance_id_mappings.c.uuid.in_(self.uuidstrs)
         )
-        rows = self.conn.execute(qiim).fetchall()
-        # Verify we have 6 in main
-        self.assertEqual(len(rows), 6)
         qsiim = sql.select(self.shadow_instance_id_mappings).where(
             self.shadow_instance_id_mappings.c.uuid.in_(self.uuidstrs)
         )
-        rows = self.conn.execute(qsiim).fetchall()
-        # Verify we have 0 in shadow
-        self.assertEqual(len(rows), 0)
-        # Archive 2 rows
+        with self.engine.connect() as conn, conn.begin():
+            # Verify we have 6 in main
+            rows = conn.execute(qiim).fetchall()
+            self.assertEqual(len(rows), 6)
+            # Verify we have 0 in shadow
+            rows = conn.execute(qsiim).fetchall()
+            self.assertEqual(len(rows), 0)
 
         results = db.archive_deleted_rows(max_rows=2)
         expected = dict(instance_id_mappings=2)
         self._assertEqualObjects(expected, results[0])
-        rows = self.conn.execute(qiim).fetchall()
-        # Verify we have 4 left in main
-        self.assertEqual(len(rows), 4)
-        rows = self.conn.execute(qsiim).fetchall()
-        # Verify we have 2 in shadow
-        self.assertEqual(len(rows), 2)
+        with self.engine.connect() as conn, conn.begin():
+            # Archive 2 rows and verify we have 4 left in main
+            rows = conn.execute(qiim).fetchall()
+            self.assertEqual(len(rows), 4)
+            # Verify we have 2 in shadow
+            rows = conn.execute(qsiim).fetchall()
+            self.assertEqual(len(rows), 2)
 
         # Archive 2 more rows
         results = db.archive_deleted_rows(max_rows=2)
         expected = dict(instance_id_mappings=2)
         self._assertEqualObjects(expected, results[0])
-        rows = self.conn.execute(qiim).fetchall()
-        # Verify we have 2 left in main
-        self.assertEqual(len(rows), 2)
-        rows = self.conn.execute(qsiim).fetchall()
-        # Verify we have 4 in shadow
-        self.assertEqual(len(rows), 4)
+        with self.engine.connect() as conn, conn.begin():
+            # Verify we have 2 left in main
+            rows = conn.execute(qiim).fetchall()
+            self.assertEqual(len(rows), 2)
+            # Verify we have 4 in shadow
+            rows = conn.execute(qsiim).fetchall()
+            self.assertEqual(len(rows), 4)
 
         # Try to archive more, but there are no deleted rows left.
         results = db.archive_deleted_rows(max_rows=2)
         expected = dict()
         self._assertEqualObjects(expected, results[0])
-        rows = self.conn.execute(qiim).fetchall()
-        # Verify we still have 2 left in main
-        self.assertEqual(len(rows), 2)
-        rows = self.conn.execute(qsiim).fetchall()
-        # Verify we still have 4 in shadow
-        self.assertEqual(len(rows), 4)
+        with self.engine.connect() as conn, conn.begin():
+            # Verify we still have 2 left in main
+            rows = conn.execute(qiim).fetchall()
+            self.assertEqual(len(rows), 2)
+            # Verify we still have 4 in shadow
+            rows = conn.execute(qsiim).fetchall()
+            self.assertEqual(len(rows), 4)
 
         # Ensure only deleted rows were deleted
         self._assert_shadow_tables_empty_except(
@@ -5814,34 +5829,45 @@ class ArchiveTestCase(test.TestCase, ModelsObjectComparatorMixin):
         # Add 6 rows to table
         for uuidstr in self.uuidstrs:
             ins_stmt = self.instances.insert().values(uuid=uuidstr)
-            self.conn.execute(ins_stmt)
-            ins_stmt = self.instance_actions.insert().\
-                values(instance_uuid=uuidstr)
-            result = self.conn.execute(ins_stmt)
+            with self.engine.connect() as conn, conn.begin():
+                conn.execute(ins_stmt)
+            ins_stmt = self.instance_actions.insert().values(
+                instance_uuid=uuidstr,
+            )
+            with self.engine.connect() as conn, conn.begin():
+                result = conn.execute(ins_stmt)
             instance_action_uuid = result.inserted_primary_key[0]
-            ins_stmt = self.instance_actions_events.insert().\
-                values(action_id=instance_action_uuid)
-            self.conn.execute(ins_stmt)
+            ins_stmt = self.instance_actions_events.insert().values(
+                action_id=instance_action_uuid,
+            )
+            with self.engine.connect() as conn, conn.begin():
+                conn.execute(ins_stmt)
 
         # Set 1 to deleted before 2017-01-01
         deleted_at = timeutils.parse_strtime('2017-01-01T00:00:00.0')
-        update_statement = self.instances.update().\
-            where(self.instances.c.uuid.in_(self.uuidstrs[0:1]))\
-            .values(deleted=1, deleted_at=deleted_at)
-        self.conn.execute(update_statement)
+        update_statement = self.instances.update().where(
+            self.instances.c.uuid.in_(self.uuidstrs[0:1])
+        ).values(deleted=1, deleted_at=deleted_at)
+        with self.engine.connect() as conn, conn.begin():
+            conn.execute(update_statement)
 
         # Set 1 to deleted before 2017-01-02
         deleted_at = timeutils.parse_strtime('2017-01-02T00:00:00.0')
-        update_statement = self.instances.update().\
-            where(self.instances.c.uuid.in_(self.uuidstrs[1:2]))\
-            .values(deleted=1, deleted_at=deleted_at)
-        self.conn.execute(update_statement)
+        update_statement = self.instances.update().where(
+            self.instances.c.uuid.in_(self.uuidstrs[1:2])
+        ).values(deleted=1, deleted_at=deleted_at)
+        with self.engine.connect() as conn, conn.begin():
+            conn.execute(update_statement)
 
         # Set 2 to deleted now
-        update_statement = self.instances.update().\
-            where(self.instances.c.uuid.in_(self.uuidstrs[2:4]))\
-            .values(deleted=1, deleted_at=timeutils.utcnow())
-        self.conn.execute(update_statement)
+        update_statement = self.instances.update().where(
+            self.instances.c.uuid.in_(self.uuidstrs[2:4])
+        ).values(deleted=1, deleted_at=timeutils.utcnow())
+        with self.engine.connect() as conn, conn.begin():
+            conn.execute(update_statement)
+
         qiim = sql.select(self.instances).where(
             self.instances.c.uuid.in_(self.uuidstrs)
         )
@@ -5849,9 +5875,11 @@ class ArchiveTestCase(test.TestCase, ModelsObjectComparatorMixin):
             self.shadow_instances.c.uuid.in_(self.uuidstrs)
         )
-        # Verify we have 6 in main
-        rows = self.conn.execute(qiim).fetchall()
-        self.assertEqual(len(rows), 6)
+        with self.engine.connect() as conn, conn.begin():
+            # Verify we have 6 in main
+            rows = conn.execute(qiim).fetchall()
+            self.assertEqual(len(rows), 6)
 
         # Make sure 'before' comparison is for < not <=, nothing deleted
         before_date = dateutil_parser.parse('2017-01-01', fuzzy=True)
         _, uuids, _ = db.archive_deleted_rows(max_rows=1, before=before_date)
@@ -5885,22 +5913,25 @@ class ArchiveTestCase(test.TestCase, ModelsObjectComparatorMixin):
         expected = {}
         self._assertEqualObjects(expected, results[0])
-        # Verify we have 4 left in main
-        rows = self.conn.execute(qiim).fetchall()
-        self.assertEqual(len(rows), 4)
-        # Verify we have 2 in shadow
-        rows = self.conn.execute(qsiim).fetchall()
-        self.assertEqual(len(rows), 2)
+        with self.engine.connect() as conn, conn.begin():
+            # Verify we have 4 left in main
+            rows = conn.execute(qiim).fetchall()
+            self.assertEqual(len(rows), 4)
+            # Verify we have 2 in shadow
+            rows = conn.execute(qsiim).fetchall()
+            self.assertEqual(len(rows), 2)
 
         # Archive everything else, make sure default operation without
         # before argument didn't break
         results = db.archive_deleted_rows(max_rows=1000)
-        # Verify we have 2 left in main
-        rows = self.conn.execute(qiim).fetchall()
-        self.assertEqual(len(rows), 2)
-        # Verify we have 4 in shadow
-        rows = self.conn.execute(qsiim).fetchall()
-        self.assertEqual(len(rows), 4)
+        with self.engine.connect() as conn, conn.begin():
+            # Verify we have 2 left in main
+            rows = conn.execute(qiim).fetchall()
+            self.assertEqual(len(rows), 2)
+            # Verify we have 4 in shadow
+            rows = conn.execute(qsiim).fetchall()
+            self.assertEqual(len(rows), 4)
 
     def test_archive_deleted_rows_for_every_uuid_table(self):
         tablenames = []
@@ -5928,94 +5959,117 @@ class ArchiveTestCase(test.TestCase, ModelsObjectComparatorMixin):
         for uuidstr in self.uuidstrs:
             ins_stmt = main_table.insert().values(uuid=uuidstr)
             try:
-                self.conn.execute(ins_stmt)
+                with self.engine.connect() as conn, conn.begin():
+                    conn.execute(ins_stmt)
             except (db_exc.DBError, sqla_exc.OperationalError):
                 # This table has constraints that require a table-specific
                 # insert, so skip it.
                 return 2
 
         # Set 4 to deleted
-        update_statement = main_table.update().\
-            where(main_table.c.uuid.in_(self.uuidstrs[:4]))\
-            .values(deleted=1, deleted_at=timeutils.utcnow())
-        self.conn.execute(update_statement)
+        update_statement = main_table.update().where(
+            main_table.c.uuid.in_(self.uuidstrs[:4])
+        ).values(deleted=1, deleted_at=timeutils.utcnow())
+        with self.engine.connect() as conn, conn.begin():
+            conn.execute(update_statement)
 
         qmt = sql.select(main_table).where(
             main_table.c.uuid.in_(self.uuidstrs)
         )
-        rows = self.conn.execute(qmt).fetchall()
-        # Verify we have 6 in main
-        self.assertEqual(len(rows), 6)
         qst = sql.select(shadow_table).where(
             shadow_table.c.uuid.in_(self.uuidstrs)
         )
-        rows = self.conn.execute(qst).fetchall()
-        # Verify we have 0 in shadow
-        self.assertEqual(len(rows), 0)
+        with self.engine.connect() as conn, conn.begin():
+            # Verify we have 6 in main
+            rows = conn.execute(qmt).fetchall()
+            self.assertEqual(len(rows), 6)
+            # Verify we have 0 in shadow
+            rows = conn.execute(qst).fetchall()
+            self.assertEqual(len(rows), 0)
 
         # Archive 2 rows
         db._archive_deleted_rows_for_table(
             self.metadata, self.engine, tablename, max_rows=2, before=None,
             task_log=False,
         )
-        # Verify we have 4 left in main
-        rows = self.conn.execute(qmt).fetchall()
-        self.assertEqual(len(rows), 4)
-        # Verify we have 2 in shadow
-        rows = self.conn.execute(qst).fetchall()
-        self.assertEqual(len(rows), 2)
+        with self.engine.connect() as conn, conn.begin():
+            # Verify we have 4 left in main
+            rows = conn.execute(qmt).fetchall()
+            self.assertEqual(len(rows), 4)
+            # Verify we have 2 in shadow
+            rows = conn.execute(qst).fetchall()
+            self.assertEqual(len(rows), 2)
 
         # Archive 2 more rows
         db._archive_deleted_rows_for_table(
             self.metadata, self.engine, tablename, max_rows=2, before=None,
             task_log=False,
         )
-        # Verify we have 2 left in main
-        rows = self.conn.execute(qmt).fetchall()
-        self.assertEqual(len(rows), 2)
-        # Verify we have 4 in shadow
-        rows = self.conn.execute(qst).fetchall()
-        self.assertEqual(len(rows), 4)
+        with self.engine.connect() as conn, conn.begin():
+            # Verify we have 2 left in main
+            rows = conn.execute(qmt).fetchall()
+            self.assertEqual(len(rows), 2)
+            # Verify we have 4 in shadow
+            rows = conn.execute(qst).fetchall()
+            self.assertEqual(len(rows), 4)
 
         # Try to archive more, but there are no deleted rows left.
         db._archive_deleted_rows_for_table(
             self.metadata, self.engine, tablename, max_rows=2, before=None,
             task_log=False,
        )
-        # Verify we still have 2 left in main
-        rows = self.conn.execute(qmt).fetchall()
-        self.assertEqual(len(rows), 2)
-        # Verify we still have 4 in shadow
-        rows = self.conn.execute(qst).fetchall()
-        self.assertEqual(len(rows), 4)
+        with self.engine.connect() as conn, conn.begin():
+            # Verify we still have 2 left in main
+            rows = conn.execute(qmt).fetchall()
+            self.assertEqual(len(rows), 2)
+            # Verify we still have 4 in shadow
+            rows = conn.execute(qst).fetchall()
+            self.assertEqual(len(rows), 4)
 
         return 0
 
     def test_archive_deleted_rows_shadow_insertions_equals_deletions(self):
         # Add 2 rows to table
         for uuidstr in self.uuidstrs[:2]:
             ins_stmt = self.instance_id_mappings.insert().values(uuid=uuidstr)
-            self.conn.execute(ins_stmt)
+            with self.engine.connect() as conn, conn.begin():
+                conn.execute(ins_stmt)
 
         # Set both to deleted
-        update_statement = self.instance_id_mappings.update().\
-            where(self.instance_id_mappings.c.uuid.in_(self.uuidstrs[:2]))\
-            .values(deleted=1)
-        self.conn.execute(update_statement)
+        update_statement = self.instance_id_mappings.update().where(
+            self.instance_id_mappings.c.uuid.in_(self.uuidstrs[:2])
+        ).values(deleted=1)
+        with self.engine.connect() as conn, conn.begin():
+            conn.execute(update_statement)
 
         qiim = sql.select(self.instance_id_mappings).where(
             self.instance_id_mappings.c.uuid.in_(self.uuidstrs[:2])
         )
-        rows = self.conn.execute(qiim).fetchall()
-        # Verify we have 2 in main
-        self.assertEqual(len(rows), 2)
         qsiim = sql.select(self.shadow_instance_id_mappings).where(
             self.shadow_instance_id_mappings.c.uuid.in_(self.uuidstrs[:2])
         )
-        shadow_rows = self.conn.execute(qsiim).fetchall()
-        # Verify we have 0 in shadow
-        self.assertEqual(len(shadow_rows), 0)
+        with self.engine.connect() as conn, conn.begin():
+            # Verify we have 2 in main
+            rows = conn.execute(qiim).fetchall()
+            self.assertEqual(len(rows), 2)
+            # Verify we have 0 in shadow
+            rows = conn.execute(qsiim).fetchall()
+            self.assertEqual(len(rows), 0)
 
         # Archive the rows
         db.archive_deleted_rows(max_rows=2)
-        main_rows = self.conn.execute(qiim).fetchall()
-        shadow_rows = self.conn.execute(qsiim).fetchall()
-        # Verify the insertions into shadow is same as deletions from main
-        self.assertEqual(len(shadow_rows), len(rows) - len(main_rows))
+        with self.engine.connect() as conn, conn.begin():
+            # Verify we now have 0 in main
+            rows = conn.execute(qiim).fetchall()
+            self.assertEqual(len(rows), 0)
+            # Verify we now have 2 in shadow
+            rows = conn.execute(qsiim).fetchall()
+            self.assertEqual(len(rows), 2)
 
     def test_archive_deleted_rows_for_migrations(self):
         # migrations.instance_uuid depends on instances.uuid
@@ -6025,13 +6079,18 @@ class ArchiveTestCase(test.TestCase, ModelsObjectComparatorMixin):
         instance_uuid = uuidsentinel.instance
         ins_stmt = self.instances.insert().values(
-            uuid=instance_uuid,
-            deleted=1,
-            deleted_at=timeutils.utcnow())
-        self.conn.execute(ins_stmt)
-        ins_stmt = self.migrations.insert().values(instance_uuid=instance_uuid,
-                                                   deleted=0)
-        self.conn.execute(ins_stmt)
+            uuid=instance_uuid,
+            deleted=1,
+            deleted_at=timeutils.utcnow(),
+        )
+        with self.engine.connect() as conn, conn.begin():
+            conn.execute(ins_stmt)
+        ins_stmt = self.migrations.insert().values(
+            instance_uuid=instance_uuid, deleted=0,
+        )
+        with self.engine.connect() as conn, conn.begin():
+            conn.execute(ins_stmt)
         # Archiving instances should result in migrations related to the
         # instances also being archived.
         num = db._archive_deleted_rows_for_table(
@@ -6047,70 +6106,86 @@ class ArchiveTestCase(test.TestCase, ModelsObjectComparatorMixin):
         # Add 6 rows to each table
         for uuidstr in self.uuidstrs:
             ins_stmt = self.instance_id_mappings.insert().values(uuid=uuidstr)
-            self.conn.execute(ins_stmt)
+            with self.engine.connect() as conn, conn.begin():
+                conn.execute(ins_stmt)
             ins_stmt2 = self.instances.insert().values(uuid=uuidstr)
-            self.conn.execute(ins_stmt2)
+            with self.engine.connect() as conn, conn.begin():
+                conn.execute(ins_stmt2)
 
         # Set 4 of each to deleted
         update_statement = self.instance_id_mappings.update().\
             where(self.instance_id_mappings.c.uuid.in_(self.uuidstrs[:4]))\
             .values(deleted=1, deleted_at=timeutils.utcnow())
-        self.conn.execute(update_statement)
+        with self.engine.connect() as conn, conn.begin():
+            conn.execute(update_statement)
         update_statement2 = self.instances.update().\
             where(self.instances.c.uuid.in_(self.uuidstrs[:4]))\
             .values(deleted=1, deleted_at=timeutils.utcnow())
-        self.conn.execute(update_statement2)
-        # Verify we have 6 in each main table
+        with self.engine.connect() as conn, conn.begin():
+            conn.execute(update_statement2)
 
         qiim = sql.select(self.instance_id_mappings).where(
             self.instance_id_mappings.c.uuid.in_(self.uuidstrs)
         )
-        rows = self.conn.execute(qiim).fetchall()
-        self.assertEqual(len(rows), 6)
         qi = sql.select(self.instances).where(
             self.instances.c.uuid.in_(self.uuidstrs)
         )
-        rows = self.conn.execute(qi).fetchall()
-        self.assertEqual(len(rows), 6)
-        # Verify we have 0 in each shadow table
         qsiim = sql.select(self.shadow_instance_id_mappings).where(
             self.shadow_instance_id_mappings.c.uuid.in_(self.uuidstrs)
         )
-        rows = self.conn.execute(qsiim).fetchall()
-        self.assertEqual(len(rows), 0)
         qsi = sql.select(self.shadow_instances).where(
             self.shadow_instances.c.uuid.in_(self.uuidstrs)
         )
-        rows = self.conn.execute(qsi).fetchall()
-        self.assertEqual(len(rows), 0)
+        with self.engine.connect() as conn, conn.begin():
+            # Verify we have 6 in each main table
+            rows = conn.execute(qiim).fetchall()
+            self.assertEqual(len(rows), 6)
+            rows = conn.execute(qi).fetchall()
+            self.assertEqual(len(rows), 6)
+            # Verify we have 0 in each shadow table
+            rows = conn.execute(qsiim).fetchall()
+            self.assertEqual(len(rows), 0)
+            rows = conn.execute(qsi).fetchall()
+            self.assertEqual(len(rows), 0)
 
         # Archive 7 rows, which should be 4 in one table and 3 in the other.
         db.archive_deleted_rows(max_rows=7)
-        # Verify we have 5 left in the two main tables combined
-        iim_rows = self.conn.execute(qiim).fetchall()
-        i_rows = self.conn.execute(qi).fetchall()
-        self.assertEqual(len(iim_rows) + len(i_rows), 5)
-        # Verify we have 7 in the two shadow tables combined.
-        siim_rows = self.conn.execute(qsiim).fetchall()
-        si_rows = self.conn.execute(qsi).fetchall()
-        self.assertEqual(len(siim_rows) + len(si_rows), 7)
+        with self.engine.connect() as conn, conn.begin():
+            # Verify we have 5 left in the two main tables combined
+            iim_rows = conn.execute(qiim).fetchall()
+            i_rows = conn.execute(qi).fetchall()
+            self.assertEqual(len(iim_rows) + len(i_rows), 5)
+            # Verify we have 7 in the two shadow tables combined.
+            siim_rows = conn.execute(qsiim).fetchall()
+            si_rows = conn.execute(qsi).fetchall()
+            self.assertEqual(len(siim_rows) + len(si_rows), 7)
 
         # Archive the remaining deleted rows.
         db.archive_deleted_rows(max_rows=1)
-        # Verify we have 4 total left in both main tables.
-        iim_rows = self.conn.execute(qiim).fetchall()
-        i_rows = self.conn.execute(qi).fetchall()
-        self.assertEqual(len(iim_rows) + len(i_rows), 4)
-        # Verify we have 8 in shadow
-        siim_rows = self.conn.execute(qsiim).fetchall()
-        si_rows = self.conn.execute(qsi).fetchall()
-        self.assertEqual(len(siim_rows) + len(si_rows), 8)
+        with self.engine.connect() as conn, conn.begin():
+            # Verify we have 4 total left in both main tables.
+            iim_rows = conn.execute(qiim).fetchall()
+            i_rows = conn.execute(qi).fetchall()
+            self.assertEqual(len(iim_rows) + len(i_rows), 4)
+            # Verify we have 8 in shadow
+            siim_rows = conn.execute(qsiim).fetchall()
+            si_rows = conn.execute(qsi).fetchall()
+            self.assertEqual(len(siim_rows) + len(si_rows), 8)
 
         # Try to archive more, but there are no deleted rows left.
         db.archive_deleted_rows(max_rows=500)
-        # Verify we have 4 total left in both main tables.
-        iim_rows = self.conn.execute(qiim).fetchall()
-        i_rows = self.conn.execute(qi).fetchall()
-        self.assertEqual(len(iim_rows) + len(i_rows), 4)
-        # Verify we have 8 in shadow
-        siim_rows = self.conn.execute(qsiim).fetchall()
-        si_rows = self.conn.execute(qsi).fetchall()
-        self.assertEqual(len(siim_rows) + len(si_rows), 8)
+        with self.engine.connect() as conn, conn.begin():
+            # Verify we have 4 total left in both main tables.
+            iim_rows = conn.execute(qiim).fetchall()
+            i_rows = conn.execute(qi).fetchall()
+            self.assertEqual(len(iim_rows) + len(i_rows), 4)
+            # Verify we have 8 in shadow
+            siim_rows = conn.execute(qsiim).fetchall()
+            si_rows = conn.execute(qsi).fetchall()
+            self.assertEqual(len(siim_rows) + len(si_rows), 8)
 
         self._assert_shadow_tables_empty_except(
             'shadow_instances',
             'shadow_instance_id_mappings'
@@ -6122,34 +6197,47 @@ class ArchiveTestCase(test.TestCase, ModelsObjectComparatorMixin):
             ins_stmt = self.task_log.insert().values(
                 id=i, task_name='instance_usage_audit', state='DONE',
                 host='host', message='message')
-            self.conn.execute(ins_stmt)
+            with self.engine.connect() as conn, conn.begin():
+                conn.execute(ins_stmt)
 
         # Set 1 to updated before 2017-01-01
         updated_at = timeutils.parse_strtime('2017-01-01T00:00:00.0')
         update_statement = self.task_log.update().where(
-            self.task_log.c.id == 1).values(updated_at=updated_at)
-        self.conn.execute(update_statement)
+            self.task_log.c.id == 1
+        ).values(updated_at=updated_at)
+        with self.engine.connect() as conn, conn.begin():
+            conn.execute(update_statement)
 
         # Set 1 to updated before 2017-01-02
         updated_at = timeutils.parse_strtime('2017-01-02T00:00:00.0')
         update_statement = self.task_log.update().where(
-            self.task_log.c.id == 2).values(updated_at=updated_at)
-        self.conn.execute(update_statement)
+            self.task_log.c.id == 2
+        ).values(updated_at=updated_at)
+        with self.engine.connect() as conn, conn.begin():
+            conn.execute(update_statement)
 
         # Set 2 to updated now
         update_statement = self.task_log.update().where(
-            self.task_log.c.id.in_(range(3, 5))).values(
-                updated_at=timeutils.utcnow())
-        self.conn.execute(update_statement)
-        # Verify we have 6 in main
+            self.task_log.c.id.in_(range(3, 5))
+        ).values(updated_at=timeutils.utcnow())
+        with self.engine.connect() as conn, conn.begin():
+            conn.execute(update_statement)
 
         qtl = sql.select(self.task_log).where(
             self.task_log.c.id.in_(range(1, 7))
         )
-        rows = self.conn.execute(qtl).fetchall()
-        self.assertEqual(len(rows), 6)
-        # Verify we have 0 in shadow
         qstl = sql.select(self.shadow_task_log).where(
             self.shadow_task_log.c.id.in_(range(1, 7))
         )
-        rows = self.conn.execute(qstl).fetchall()
-        self.assertEqual(len(rows), 0)
+        with self.engine.connect() as conn, conn.begin():
+            # Verify we have 6 in main
+            rows = conn.execute(qtl).fetchall()
+            self.assertEqual(len(rows), 6)
+            # Verify we have 0 in shadow
+            rows = conn.execute(qstl).fetchall()
+            self.assertEqual(len(rows), 0)
 
         # Make sure 'before' comparison is for < not <=
         before_date = dateutil_parser.parse('2017-01-01', fuzzy=True)
         _, _, rows = db.archive_deleted_rows(
@@ -6171,22 +6259,27 @@ class ArchiveTestCase(test.TestCase, ModelsObjectComparatorMixin):
         results = db.archive_deleted_rows(max_rows=2, task_log=True)
         expected = dict(task_log=2)
         self._assertEqualObjects(expected, results[0])
-        # Verify we have 2 left in main
-        rows = self.conn.execute(qtl).fetchall()
-        self.assertEqual(len(rows), 2)
-        # Verify we have 4 in shadow
-        rows = self.conn.execute(qstl).fetchall()
-        self.assertEqual(len(rows), 4)
+        with self.engine.connect() as conn, conn.begin():
+            # Verify we have 2 left in main
+            rows = conn.execute(qtl).fetchall()
+            self.assertEqual(len(rows), 2)
+            # Verify we have 4 in shadow
+            rows = conn.execute(qstl).fetchall()
+            self.assertEqual(len(rows), 4)
 
         # Archive the rest
         results = db.archive_deleted_rows(max_rows=100, task_log=True)
         expected = dict(task_log=2)
         self._assertEqualObjects(expected, results[0])
-        # Verify we have 0 left in main
-        rows = self.conn.execute(qtl).fetchall()
-        self.assertEqual(len(rows), 0)
-        # Verify we have 6 in shadow
-        rows = self.conn.execute(qstl).fetchall()
-        self.assertEqual(len(rows), 6)
+        with self.engine.connect() as conn, conn.begin():
+            # Verify we have 0 left in main
+            rows = conn.execute(qtl).fetchall()
+            self.assertEqual(len(rows), 0)
+            # Verify we have 6 in shadow
+            rows = conn.execute(qstl).fetchall()
+            self.assertEqual(len(rows), 6)
 
 
 class PciDeviceDBApiTestCase(test.TestCase, ModelsObjectComparatorMixin):


@@ -2015,12 +2015,14 @@ class TestInstanceListObject(test_objects._LocalTest,
         # manually here.
         engine = db.get_engine()
         table = sql_models.Instance.__table__
-        with engine.connect() as conn:
-            update = table.insert().values(user_id=self.context.user_id,
-                                           project_id=self.context.project_id,
-                                           uuid=uuids.nullinst,
-                                           host='foo',
-                                           hidden=None)
+        with engine.connect() as conn, conn.begin():
+            update = table.insert().values(
+                user_id=self.context.user_id,
+                project_id=self.context.project_id,
+                uuid=uuids.nullinst,
+                host='foo',
+                hidden=None,
+            )
             conn.execute(update)
 
         insts = objects.InstanceList.get_by_filters(self.context,