db: Migrate to alembic

This is easier than Cinder, Nova etc. but more difficult than Heat.
Masakari hasn't had many database migrations in recent cycles but it did
have one in Antelope. This means we need to handle people doing
skip-level upgrades and validate which version of the legacy migrations
they are currently on. We support users coming from both Zed and
Antelope and anyone else will need to go through an older version of
Masakari to update their database migrations first. Other than this
difference, the logic is pretty similar: as with other projects, we
simply determine if we're upgrading a deployment that was previously
using sqlalchemy-migrate, upgrading a deployment that has already
migrated to alembic, or deploying a new deployment, and adjust
accordingly.

In addition, we also have to consider Taskflow's migrations. These were
previously being run once as part of the legacy
006_add_persistence_tables migrations. Since Taskflow uses Alembic under
the hood, it's safe to run every time. The presence of Taskflow does
force us to use a different table name in Masakari though.

Note that one curious side-effect of this is that the order in which
table rows are purged changes. This appears to be happening because the
notification table is now being created in the initial Alembic
migration, which alters the return value of 'MetaData.sorted_tables'.
In any case, the fix is simply a case of adjusting this order in the
tests.

Change-Id: I5285d7cc3c6da0059c0908cedc195b2262cb1fce
Signed-off-by: Stephen Finucane <stephenfin@redhat.com>
This commit is contained in:
Stephen Finucane 2023-07-13 13:26:28 +01:00
parent 85fd89726c
commit 6823ea5ed1
6 changed files with 291 additions and 292 deletions

View File

@ -15,16 +15,14 @@
# under the License.
"""
CLI interface for masakari management.
CLI interface for masakari management.
"""
import logging as python_logging
import sys
import time
from oslo_config import cfg
from oslo_db.sqlalchemy import migration
from oslo_log import log as logging
import masakari.conf
@ -76,9 +74,7 @@ class DbCommands(object):
def version(self):
"""Print the current database version."""
print(migration.db_version(db_api.get_engine(),
db_migration.MIGRATE_REPO_PATH,
db_migration.INIT_VERSION))
print(db_migration.db_version())
@args('--age_in_days', type=int, default=30,
help='Purge deleted rows older than age in days (default: '

View File

@ -14,57 +14,166 @@
# under the License.
import os
import threading
from alembic import command as alembic_api
from alembic import config as alembic_config
from alembic import migration as alembic_migration
from oslo_config import cfg
from oslo_db import exception as oslo_exception
from oslo_db import options
from oslo_db.sqlalchemy import migration
from oslo_log import log as logging
import sqlalchemy as sa
from masakari import db
import masakari.conf
from masakari.db import api as db_api
from masakari.engine import driver
from masakari import exception
from masakari.i18n import _
INIT_VERSION = 0
_IMPL = None
_LOCK = threading.Lock()
options.set_defaults(cfg.CONF)
MIGRATE_REPO_PATH = os.path.join(
os.path.abspath(os.path.dirname(__file__)),
'migrate_repo',
)
LOG = logging.getLogger(__name__)
CONF = masakari.conf.CONF
def db_sync(version=None, init_version=INIT_VERSION, engine=None):
def _migrate_legacy_database(engine, connection, config):
    """Check if database is a legacy sqlalchemy-migrate-managed database.

    If it is, migrate it by "stamping" the initial alembic schema.

    :param engine: SQLAlchemy engine bound to the masakari database.
    :param connection: An open connection on that engine, shared with
        alembic so the stamp happens in the caller's transaction.
    :param config: An ``alembic.config.Config`` instance for this project.
    :raises exception.InvalidInput: If the legacy schema version is too old
        to upgrade from, or is unrecognised and the upgrade is not forced.
    """
    # If the database doesn't have the sqlalchemy-migrate legacy migration
    # table, we don't have anything to do
    if not sa.inspect(engine).has_table('migrate_version'):
        return

    # Likewise, if we've already migrated to alembic, we don't have anything to
    # do
    context = alembic_migration.MigrationContext.configure(connection)
    if context.get_current_revision():
        return

    # We have legacy migrations but no alembic migration. Stamp (dummy apply)
    # the initial alembic migration(s). There may be one or two to apply
    # depending on what's already applied.

    # Get the currently applied version of the legacy migrations using table
    # reflection to avoid a dependency on sqlalchemy-migrate
    # https://opendev.org/x/sqlalchemy-migrate/src/commit/5d1f322542cd8eb42381612765be4ed9ca8105ec/migrate/versioning/schema.py#L175-L179
    meta = sa.MetaData()
    table = sa.Table('migrate_version', meta, autoload_with=engine)
    with engine.connect() as conn:
        version = conn.execute(sa.select(table.c.version)).scalar()

    # If the user is requesting a skip-level upgrade from a very old version,
    # we can't help them since we don't have alembic-versions of those old
    # migrations :(
    if version < 7:
        reason = _(
            'Your database is at version %03d; we only support upgrading '
            'from version 007 or later. Please upgrade your database using '
            'an earlier release of Masakari and then return here.'
        )
        raise exception.InvalidInput(reason % version)
    elif version > 8:
        # Versions newer than our last legacy migration must be out-of-tree;
        # only proceed if the operator explicitly accepts the risk.
        if os.getenv('FORCE_MASAKARI_DB_SYNC') is None:
            reason = _(
                'Your database is at version %03d; we do not recognise this '
                'version and it is likely you are carrying out-of-tree '
                'migrations. You can still upgrade but we cannot guarantee '
                'things will work as expected. '
                'If you wish to continue, set the FORCE_MASAKARI_DB_SYNC '
                'environment variable to any value and retry.'
            )
            raise exception.InvalidInput(reason % version)
        else:
            msg = _(
                'Your database is at version %03d; we do not recognise this '
                'version but the FORCE_MASAKARI_DB_SYNC environment variable '
                'is set so we are continuing. Things may break. '
                'You have been warned!',
            )
            LOG.warning(msg, version)

    # Map the legacy version onto the matching initial alembic revision so
    # alembic's view of the schema agrees with what is actually applied.
    if version == 7:
        alembic_init_version = '8f848eb45d03'
    else:  # 8 or greater (out-of-tree)
        alembic_init_version = '8bdf5929c5a6'

    LOG.info(
        'The database is still under sqlalchemy-migrate control; '
        'fake applying the initial alembic migration'
    )
    alembic_api.stamp(config, alembic_init_version)
def _find_alembic_conf():
    """Locate and load the project's alembic configuration.

    The ``alembic.ini`` file is expected to live alongside this module.

    :returns: An instance of ``alembic.config.Config``
    """
    ini_path = os.path.join(
        os.path.abspath(os.path.dirname(__file__)),
        'alembic.ini',
    )
    conf = alembic_config.Config(os.path.abspath(ini_path))
    # Skip the logging configuration embedded in the file; it is intended
    # for the alembic CLI and would clobber the application's logging setup
    # https://stackoverflow.com/a/42691781/613428
    conf.attributes['configure_logger'] = False
    return conf
def _upgrade_alembic(engine, config, version):
    """Apply alembic migrations, migrating from sqlalchemy-migrate if needed.

    :param engine: SQLAlchemy engine bound to the masakari database.
    :param config: An ``alembic.config.Config`` instance for this project.
    :param version: The target alembic revision, or ``None`` for 'head'.
    """
    # re-use the connection rather than creating a new one
    with engine.begin() as connection:
        config.attributes['connection'] = connection
        _migrate_legacy_database(engine, connection, config)
        alembic_api.upgrade(config, version or 'head')
def db_sync(version=None, engine=None):
    """Migrate the database to `version` or the most recent version.

    :param version: The target alembic revision identifier, or ``None`` to
        upgrade to the latest ('head') revision.
    :param engine: SQLAlchemy engine to migrate; defaults to the globally
        configured engine.
    :raises ValueError: If an integer (sqlalchemy-migrate style) version is
        requested.
    """
    # If the user requested a specific version, check if it's an integer: if
    # so, we're almost certainly in sqlalchemy-migrate land and won't support
    # that
    if version is not None and version.isdigit():
        raise ValueError(
            'You requested an sqlalchemy-migrate database version; this is '
            'no longer supported'
        )

    if engine is None:
        engine = db_api.get_engine()

    config = _find_alembic_conf()

    # Discard the URL encoded in alembic.ini in favour of the URL configured
    # for the engine by the database fixtures, casting from
    # 'sqlalchemy.engine.url.URL' to str in the process. This returns a
    # RFC-1738 quoted URL, which means that a password like "foo@" will be
    # turned into "foo%40". This in turns causes a problem for
    # set_main_option() because that uses ConfigParser.set, which (by design)
    # uses *python* interpolation to write the string out ... where "%" is the
    # special python interpolation character! Avoid this mismatch by quoting
    # all %'s for the set below.
    engine_url = str(engine.url).replace('%', '%%')
    config.set_main_option('sqlalchemy.url', str(engine_url))

    # First upgrade ourselves, followed by Taskflow
    LOG.info('Applying migration(s)')

    _upgrade_alembic(engine, config, version)

    # Get the taskflow driver configured, default is 'taskflow_driver',
    # to load persistence tables to store progress details.
    taskflow_driver = driver.load_masakari_driver(CONF.notification_driver)
    if CONF.taskflow.connection:
        # Taskflow manages its own (alembic-based) migrations, so this is
        # safe to run on every sync.
        taskflow_driver.upgrade_backend(CONF.taskflow.connection)

    LOG.info('Migration(s) applied')
def db_version():
    """Get database version.

    :returns: The current alembic revision identifier, or ``None`` if the
        database has not been stamped or migrated yet.
    """
    db_engine = db_api.get_engine()
    with db_engine.connect() as conn:
        migration_ctx = alembic_migration.MigrationContext.configure(conn)
        return migration_ctx.get_current_revision()

View File

@ -24,7 +24,7 @@ config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
if config.attributes.get('configure_logger', True):
fileConfig(config.config_file_name)
# this is the MetaData object for the various models in the database
@ -49,6 +49,8 @@ def run_migrations_offline() -> None:
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
# use a unique version table name to avoid conflicts with taskflow
version_table='masakari_alembic_version',
)
with context.begin_transaction():
@ -61,16 +63,40 @@ def run_migrations_online() -> None:
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
This is modified from the default based on the below, since we want to
share an engine when unit testing so in-memory database testing actually
works.
with connectable.connect() as connection:
https://alembic.sqlalchemy.org/en/latest/cookbook.html#connection-sharing
"""
connectable = config.attributes.get('connection', None)
if connectable is None:
# only create Engine if we don't have a Connection from the outside
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata,
render_as_batch=True,
# use a unique version table name to avoid conflicts with
# taskflow
version_table='masakari_alembic_version',
)
with context.begin_transaction():
context.run_migrations()
else:
context.configure(
connection=connection, target_metadata=target_metadata
connection=connectable,
target_metadata=target_metadata,
render_as_batch=True,
# use a unique version table name to avoid conflicts with taskflow
version_table='masakari_alembic_version',
)
with context.begin_transaction():

View File

@ -12,268 +12,114 @@
# License for the specific language governing permissions and limitations
# under the License.
import logging
import os
"""Tests for database migrations."""
from migrate.versioning import api as versioning_api
from migrate.versioning import repository
from alembic import command as alembic_api
from alembic import script as alembic_script
from oslo_db.sqlalchemy import enginefacade
from oslo_db.sqlalchemy import test_fixtures
from oslo_db.sqlalchemy import test_migrations
from oslo_db.sqlalchemy import utils as oslodbutils
from oslotest import base as test_base
import sqlalchemy
from sqlalchemy.engine import reflection
import sqlalchemy.exc
import masakari.conf
from masakari.db.sqlalchemy import migrate_repo
from masakari.db.sqlalchemy import migration as sa_migration
from masakari.db.sqlalchemy import models
from masakari.tests import fixtures as masakari_fixtures
from masakari.db.sqlalchemy import migration
CONF = masakari.conf.CONF
class MasakariMigrationsCheckers(test_migrations.WalkVersionsMixin):
"""Test sqlalchemy-migrate migrations."""
TIMEOUT_SCALING_FACTOR = 2
@property
def INIT_VERSION(self):
return sa_migration.INIT_VERSION
@property
def REPOSITORY(self):
return repository.Repository(
os.path.abspath(os.path.dirname(migrate_repo.__file__)))
@property
def migration_api(self):
return versioning_api
@property
def migrate_engine(self):
return self.engine
def setUp(self):
super(MasakariMigrationsCheckers, self).setUp()
migrate_log = logging.getLogger('migrate')
old_level = migrate_log.level
migrate_log.setLevel(logging.WARN)
self.addCleanup(migrate_log.setLevel, old_level)
self.useFixture(masakari_fixtures.Timeout(
os.environ.get('OS_TEST_TIMEOUT', 0),
self.TIMEOUT_SCALING_FACTOR))
self.engine = enginefacade.writer.get_engine()
CONF.set_override('connection', str(self.migrate_engine.url),
group='taskflow')
def assertColumnExists(self, engine, table_name, column):
self.assertTrue(oslodbutils.column_exists(engine, table_name, column),
'Column %s.%s does not exist' % (table_name, column))
def assertColumnNotExists(self, engine, table_name, column):
self.assertFalse(oslodbutils.column_exists(engine, table_name, column),
'Column %s.%s should not exist' % (table_name, column))
def assertTableNotExists(self, engine, table):
self.assertRaises(sqlalchemy.exc.NoSuchTableError,
oslodbutils.get_table, engine, table)
def assertIndexExists(self, engine, table_name, index):
self.assertTrue(oslodbutils.index_exists(engine, table_name, index),
'Index %s on table %s does not exist' %
(index, table_name))
def assertIndexNotExists(self, engine, table_name, index):
self.assertFalse(oslodbutils.index_exists(engine, table_name, index),
'Index %s on table %s should not exist' %
(index, table_name))
def assertIndexMembers(self, engine, table, index, members):
self.assertIndexExists(engine, table, index)
t = oslodbutils.get_table(engine, table)
index_columns = None
for idx in t.indexes:
if idx.name == index:
index_columns = [c.name for c in idx.columns]
break
self.assertEqual(members, index_columns)
def include_object(self, object_, name, type_, reflected, compare_to):
if type_ == 'table':
# migrate_version is a sqlalchemy-migrate control table and
# isn't included in the model. shadow_* are generated from
# the model and have their own tests to ensure they don't
# drift.
if name == 'migrate_version' or name.startswith('shadow_'):
return False
return True
# Implementations for ModelsMigrationsSync
def db_sync(self, engine):
sa_migration.db_sync(engine=self.migrate_engine)
def get_engine(self, context=None):
return self.migrate_engine
def get_metadata(self):
return models.BASE.metadata
def migrate_up(self, version, with_data=False):
banned = None
if with_data:
check = getattr(self, "_check_%03d" % version, None)
self.assertIsNotNone(check, ('DB Migration %i does not have a '
'test. Please add one!') % version)
with masakari_fixtures.BannedDBSchemaOperations(banned):
super(MasakariMigrationsCheckers, self).migrate_up(version,
with_data)
def test_walk_versions(self):
self.walk_versions(snake_walk=False, downgrade=False)
def _check_001(self, engine, data):
self.assertColumnExists(engine, 'failover_segments', 'uuid')
self.assertColumnExists(engine, 'failover_segments', 'name')
self.assertColumnExists(engine, 'failover_segments', 'service_type')
self.assertColumnExists(engine, 'failover_segments', 'description')
self.assertColumnExists(engine, 'failover_segments',
'recovery_method')
self.assertIndexMembers(engine, 'failover_segments',
'segments_service_type_idx', ['service_type'])
def _check_002(self, engine, data):
self.assertColumnExists(engine, 'hosts', 'uuid')
self.assertColumnExists(engine, 'hosts', 'name')
self.assertColumnExists(engine, 'hosts', 'reserved')
self.assertColumnExists(engine, 'hosts', 'type')
self.assertColumnExists(engine, 'hosts', 'control_attributes')
self.assertColumnExists(engine, 'hosts', 'failover_segment_id')
self.assertColumnExists(engine, 'hosts', 'on_maintenance')
self.assertColumnExists(engine, 'hosts', 'type')
self.assertIndexMembers(engine, 'hosts', 'hosts_type_idx', ['type'])
def _check_003(self, engine, data):
inspector = reflection.Inspector.from_engine(engine)
constraints = inspector.get_unique_constraints('hosts')
constraint_names = [constraint['name'] for constraint in constraints]
self.assertIn('uniq_host0name0deleted',
constraint_names)
def _check_004(self, engine, data):
self.assertColumnExists(engine, 'notifications', 'notification_uuid')
self.assertColumnExists(engine, 'notifications', 'generated_time')
self.assertColumnExists(engine, 'notifications', 'source_host_uuid')
self.assertColumnExists(engine, 'notifications', 'type')
self.assertColumnExists(engine, 'notifications', 'payload')
self.assertColumnExists(engine, 'notifications', 'status')
def _check_005(self, engine, data):
failover_segments = oslodbutils.get_table(engine, 'failover_segments')
hosts = oslodbutils.get_table(engine, 'hosts')
for table in [failover_segments, hosts]:
self.assertTrue(table.c.created_at.nullable)
def _check_006(self, engine, data):
self.assertColumnExists(engine, 'logbooks', 'created_at')
self.assertColumnExists(engine, 'logbooks', 'updated_at')
self.assertColumnExists(engine, 'logbooks', 'meta')
self.assertColumnExists(engine, 'logbooks', 'name')
self.assertColumnExists(engine, 'logbooks', 'uuid')
self.assertColumnExists(engine, 'flowdetails', 'created_at')
self.assertColumnExists(engine, 'flowdetails', 'updated_at')
self.assertColumnExists(engine, 'flowdetails', 'parent_uuid')
self.assertColumnExists(engine, 'flowdetails', 'meta')
self.assertColumnExists(engine, 'flowdetails', 'name')
self.assertColumnExists(engine, 'flowdetails', 'state')
self.assertColumnExists(engine, 'flowdetails', 'uuid')
self.assertColumnExists(engine, 'atomdetails', 'created_at')
self.assertColumnExists(engine, 'atomdetails', 'updated_at')
self.assertColumnExists(engine, 'atomdetails', 'parent_uuid')
self.assertColumnExists(engine, 'atomdetails', 'meta')
self.assertColumnExists(engine, 'atomdetails', 'name')
self.assertColumnExists(engine, 'atomdetails', 'results')
self.assertColumnExists(engine, 'atomdetails', 'version')
self.assertColumnExists(engine, 'atomdetails', 'state')
self.assertColumnExists(engine, 'atomdetails', 'uuid')
self.assertColumnExists(engine, 'atomdetails', 'failure')
self.assertColumnExists(engine, 'atomdetails', 'atom_type')
self.assertColumnExists(engine, 'atomdetails', 'intention')
self.assertColumnExists(engine, 'atomdetails', 'revert_results')
self.assertColumnExists(engine, 'atomdetails', 'revert_failure')
def _check_007(self, engine, data):
self.assertColumnExists(engine, 'failover_segments', 'enabled')
def _check_008(self, engine, data):
self.assertColumnExists(engine, 'vmoves', 'uuid')
self.assertColumnExists(engine, 'vmoves', 'notification_uuid')
self.assertColumnExists(engine, 'vmoves', 'instance_uuid')
self.assertColumnExists(engine, 'vmoves', 'instance_name')
self.assertColumnExists(engine, 'vmoves', 'source_host')
self.assertColumnExists(engine, 'vmoves', 'dest_host')
self.assertColumnExists(engine, 'vmoves', 'start_time')
self.assertColumnExists(engine, 'vmoves', 'end_time')
self.assertColumnExists(engine, 'vmoves', 'type')
self.assertColumnExists(engine, 'vmoves', 'status')
self.assertColumnExists(engine, 'vmoves', 'message')
class TestMasakariMigrationsSQLite(
MasakariMigrationsCheckers,
class DatabaseSanityChecks(
test_fixtures.OpportunisticDBTestMixin,
test_base.BaseTestCase,
):
def setUp(self):
super().setUp()
self.engine = enginefacade.writer.get_engine()
self.config = migration._find_alembic_conf()
def _check_006(self, engine, data):
# NOTE(ShilpaSD): DB script '006_add_persistence_tables.py' adds db
# tables required for taskflow which doesn't support Sqlite using
# alembic migration.
pass
    def test_single_base_revision(self):
        """Ensure we only have a single base revision.

        There's no good reason for us to have diverging history, so validate
        that only one base revision exists. This will prevent simple errors
        where people forget to specify the base revision. If this fails for
        your change, look for migrations that do not have a 'revises' line in
        them.
        """
        script = alembic_script.ScriptDirectory.from_config(self.config)
        self.assertEqual(1, len(script.get_bases()))
    def test_single_head_revision(self):
        """Ensure we only have a single head revision.

        There's no good reason for us to have diverging history, so validate
        that only one head revision exists. This will prevent merge conflicts
        adding additional head revision points. If this fails for your change,
        look for migrations with the same 'revises' line in them.
        """
        script = alembic_script.ScriptDirectory.from_config(self.config)
        self.assertEqual(1, len(script.get_heads()))
class TestMasakariMigrationsMySQL(
MasakariMigrationsCheckers,
class MigrationsWalk(
test_fixtures.OpportunisticDBTestMixin,
test_base.BaseTestCase,
):
# Migrations can take a long time, particularly on underpowered CI nodes.
# Give them some breathing room.
TIMEOUT_SCALING_FACTOR = 4
def setUp(self):
super().setUp()
self.engine = enginefacade.writer.get_engine()
self.config = migration._find_alembic_conf()
self.init_versions = {'8f848eb45d03', '8bdf5929c5a6'}
    def _migrate_up(self, revision, connection):
        """Upgrade to a single alembic revision and run its check method.

        :param revision: The alembic revision identifier to upgrade to.
        :param connection: The shared DB connection placed in the alembic
            config by the caller.
        """
        check_method = getattr(self, f'_check_{revision}', None)
        # no tests for the initial revisions
        if revision not in self.init_versions:
            self.assertIsNotNone(
                check_method,
                f"DB Migration {revision} doesn't have a test; add one"
            )

        # Optional per-revision setup hook, run before the upgrade
        pre_upgrade = getattr(self, f'_pre_upgrade_{revision}', None)
        if pre_upgrade:
            pre_upgrade(connection)

        alembic_api.upgrade(self.config, revision)

        if check_method:
            check_method(connection)
    def test_walk_versions(self):
        """Walk all alembic revisions from oldest to newest, checking each."""
        with self.engine.begin() as connection:
            # Share the connection with alembic so in-memory databases work
            self.config.attributes['connection'] = connection
            script = alembic_script.ScriptDirectory.from_config(self.config)
            revisions = list(script.walk_revisions())

            # Need revisions from older to newer so the walk works as intended
            revisions.reverse()
            for revision_script in revisions:
                self._migrate_up(revision_script.revision, connection)
class TestMigrationsWalkSQLite(
MigrationsWalk,
test_fixtures.OpportunisticDBTestMixin,
test_base.BaseTestCase,
):
pass
class TestMigrationsWalkMySQL(
MigrationsWalk,
test_fixtures.OpportunisticDBTestMixin,
test_base.BaseTestCase,
):
FIXTURE = test_fixtures.MySQLOpportunisticFixture
def test_innodb_tables(self):
sa_migration.db_sync(engine=self.migrate_engine)
total = self.migrate_engine.execute(
"SELECT count(*) "
"FROM information_schema.TABLES "
"WHERE TABLE_SCHEMA = '%(database)s'" %
{'database': self.migrate_engine.url.database})
self.assertGreater(total.scalar(), 0, "No tables found. Wrong schema?")
noninnodb = self.migrate_engine.execute(
"SELECT count(*) "
"FROM information_schema.TABLES "
"WHERE TABLE_SCHEMA='%(database)s' "
"AND ENGINE != 'InnoDB' "
"AND TABLE_NAME != 'migrate_version'" %
{'database': self.migrate_engine.url.database})
count = noninnodb.scalar()
self.assertEqual(count, 0, "%d non InnoDB tables created" % count)
class TestMasakariMigrationsPostgreSQL(
MasakariMigrationsCheckers,
class TestMigrationsWalkPostgreSQL(
MigrationsWalk,
test_fixtures.OpportunisticDBTestMixin,
test_base.BaseTestCase,
):

View File

@ -159,6 +159,6 @@ class PurgeDeletedTest(test.TestCase):
hosts_rows = self._count(self.hosts)
# Verify that we have deleted 3 rows only
self.assertEqual(4, notifications_rows)
self.assertEqual(5, hosts_rows)
self.assertEqual(5, notifications_rows)
self.assertEqual(4, hosts_rows)
self.assertEqual(6, failover_segments_rows)

View File

@ -0,0 +1,22 @@
---
upgrade:
- |
The database migration engine has changed from `sqlalchemy-migrate`__ to
`alembic`__. For most deployments, this should have minimal to no impact
and the switch should be mostly transparent. The main user-facing impact is
the change in schema versioning. While sqlalchemy-migrate used a linear,
integer-based versioning scheme, which required placeholder migrations to
allow for potential migration backports, alembic uses a distributed version
control-like schema where a migration's ancestor is encoded in the file and
branches are possible. The alembic migration files therefore use an
arbitrary UUID-like naming scheme and the ``masakari-manage db_sync``
command now expects such a version when manually specifying the version
that should be applied. For example::
$ masakari-manage db sync c6214ca60943
Attempting to specify an sqlalchemy-migrate-based version will result in an
error.
.. __: https://sqlalchemy-migrate.readthedocs.io/en/latest/
.. __: https://alembic.sqlalchemy.org/en/latest/