Make unit tests compatible with SQLAlchemy 1.4

The .count() method is gone from the Table class.
Let's use func.count in its stead.

Closes-Bug: #1926399
Change-Id: Ica81271fe85478fdffae4ea0b6f4ed3b168f4552
This commit is contained in:
Radosław Piliszek 2021-05-07 10:12:52 +00:00
parent d312d23c82
commit e5246e282f
1 changed files with 14 additions and 19 deletions

View File

@ -20,6 +20,7 @@ import uuid
from oslo_db.sqlalchemy import utils as sqlalchemyutils from oslo_db.sqlalchemy import utils as sqlalchemyutils
from oslo_utils import timeutils from oslo_utils import timeutils
from sqlalchemy.dialects import sqlite from sqlalchemy.dialects import sqlite
from sqlalchemy.sql import func, select
from masakari import context from masakari import context
from masakari import db from masakari import db
@ -112,15 +113,17 @@ class PurgeDeletedTest(test.TestCase):
if dialect == sqlite.dialect: if dialect == sqlite.dialect:
self.conn.execute("PRAGMA foreign_keys = ON") self.conn.execute("PRAGMA foreign_keys = ON")
def _count(self, table):
return self.conn.execute(
select([func.count()]).select_from(table)).scalar()
def test_purge_deleted_rows_old(self): def test_purge_deleted_rows_old(self):
# Purge at 30 days old, should only delete 2 rows # Purge at 30 days old, should only delete 2 rows
db.purge_deleted_rows(self.context, age_in_days=30, max_rows=10) db.purge_deleted_rows(self.context, age_in_days=30, max_rows=10)
notifications_rows = self.conn.execute( notifications_rows = self._count(self.notifications)
self.notifications.count()).scalar() failover_segments_rows = self._count(self.failover_segments)
failover_segments_rows = self.conn.execute( hosts_rows = self._count(self.hosts)
self.failover_segments.count()).scalar()
hosts_rows = self.conn.execute(self.hosts.count()).scalar()
# Verify that we only deleted 2 # Verify that we only deleted 2
self.assertEqual(4, notifications_rows) self.assertEqual(4, notifications_rows)
@ -130,13 +133,9 @@ class PurgeDeletedTest(test.TestCase):
def test_purge_all_deleted_rows(self): def test_purge_all_deleted_rows(self):
db.purge_deleted_rows(self.context, age_in_days=20, max_rows=-1) db.purge_deleted_rows(self.context, age_in_days=20, max_rows=-1)
notifications_rows = self.conn.execute( notifications_rows = self._count(self.notifications)
self.notifications.count()).scalar() failover_segments_rows = self._count(self.failover_segments)
hosts_rows = self._count(self.hosts)
failover_segments_rows = self.conn.execute(
self.failover_segments.count()).scalar()
hosts_rows = self.conn.execute(self.hosts.count()).scalar()
# Verify that we have purged all deleted rows # Verify that we have purged all deleted rows
self.assertEqual(2, notifications_rows) self.assertEqual(2, notifications_rows)
@ -146,13 +145,9 @@ class PurgeDeletedTest(test.TestCase):
def test_purge_maximum_rows_partial_deleted_records(self): def test_purge_maximum_rows_partial_deleted_records(self):
db.purge_deleted_rows(self.context, age_in_days=60, max_rows=3) db.purge_deleted_rows(self.context, age_in_days=60, max_rows=3)
notifications_rows = self.conn.execute( notifications_rows = self._count(self.notifications)
self.notifications.count()).scalar() failover_segments_rows = self._count(self.failover_segments)
hosts_rows = self._count(self.hosts)
failover_segments_rows = self.conn.execute(
self.failover_segments.count()).scalar()
hosts_rows = self.conn.execute(self.hosts.count()).scalar()
# Verify that we have deleted 3 rows only # Verify that we have deleted 3 rows only
self.assertEqual(4, notifications_rows) self.assertEqual(4, notifications_rows)