Opportunistic migration tests

Migrations should be tested with real database backends. A number of
base test cases in the current implementation are intended for this
purpose. Previously there were two ways to run migration tests: using
opportunistic test cases (the default way we've been using on CI) and
using database connection strings, specified in test_migrations.conf,
for each particular database test case. For the sake of simplicity and
consistency we are moving to opportunistic db test cases here.

With this change we are free from locking, so we don't need `lockfile`
anymore.
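
For illustration, the opportunistic pattern boils down to mixing the
actual migration checks into the opportunistic base classes (a minimal
sketch; MyMigrationChecks is a hypothetical mixin that would hold the
real test logic):

    from oslo.db.sqlalchemy import test_base

    class MyMigrationChecksMySQL(MyMigrationChecks,
                                 test_base.MySQLOpportunisticTestCase):
        # Runs only when a MySQL test database is actually reachable;
        # the test case skips itself otherwise, so no locking is needed.
        pass

    class MyMigrationChecksPostgreSQL(
            MyMigrationChecks,
            test_base.PostgreSQLOpportunisticTestCase):
        # The same checks against PostgreSQL, when available.
        pass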

Closes-Bug: #1327397
Closes-Bug: #1328997
Change-Id: I92b1dcd830c4755f429a0f6529911e607c2c7de7
Ilya Pekelny 2014-05-13 14:44:54 +03:00
parent 44bd28704c
commit c34c32e09e
6 changed files with 423 additions and 465 deletions


@@ -1,7 +0,0 @@
[DEFAULT]
# Set up any number of migration data stores you want, one
# The "name" used in the test is the config variable key.
#sqlite=sqlite:///test_migrations.db
sqlite=sqlite://
#mysql=mysql://root:@localhost/test_migrations
#postgresql=postgresql://user:pass@localhost/test_migrations


@@ -15,21 +15,14 @@
# under the License.
import abc
import functools
import logging
import os
import pprint
import subprocess
import alembic
import alembic.autogenerate
import alembic.migration
import lockfile
from oslotest import base as test_base
import pkg_resources as pkg
import six
from six import moves
from six.moves.urllib import parse
import sqlalchemy
from sqlalchemy.engine import reflection
import sqlalchemy.exc
@@ -37,171 +30,133 @@ from sqlalchemy import schema
import sqlalchemy.sql.expression as expr
import sqlalchemy.types as types
from oslo.db import exception as exc
from oslo.db.openstack.common.gettextutils import _LE
from oslo.db.sqlalchemy import utils
LOG = logging.getLogger(__name__)
def _have_mysql(user, passwd, database):
present = os.environ.get('TEST_MYSQL_PRESENT')
if present is None:
return utils.is_backend_avail(backend='mysql',
user=user,
passwd=passwd,
database=database)
return present.lower() in ('', 'true')
def _have_postgresql(user, passwd, database):
present = os.environ.get('TEST_POSTGRESQL_PRESENT')
if present is None:
return utils.is_backend_avail(backend='postgres',
user=user,
passwd=passwd,
database=database)
return present.lower() in ('', 'true')
def _set_db_lock(lock_path=None, lock_prefix=None):
def decorator(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
try:
path = lock_path or os.environ.get("OSLO_LOCK_PATH")
lock = lockfile.FileLock(os.path.join(path, lock_prefix))
with lock:
LOG.debug('Got lock "%s"', f.__name__)
return f(*args, **kwargs)
finally:
LOG.debug('Lock released "%s"', f.__name__)
return wrapper
return decorator
class BaseMigrationTestCase(test_base.BaseTestCase):
"""Base class fort testing of migration utils."""
def __init__(self, *args, **kwargs):
super(BaseMigrationTestCase, self).__init__(*args, **kwargs)
self.DEFAULT_CONFIG_FILE = os.path.join(os.path.dirname(__file__),
'test_migrations.conf')
# Test machines can set the TEST_MIGRATIONS_CONF variable
# to override the location of the config file for migration testing
self.CONFIG_FILE_PATH = os.environ.get('TEST_MIGRATIONS_CONF',
self.DEFAULT_CONFIG_FILE)
self.test_databases = {}
self.migration_api = None
def setUp(self):
super(BaseMigrationTestCase, self).setUp()
# Load test databases from the config file. Only do this
# once. No need to re-run this on each test...
LOG.debug('config_path is %s', self.CONFIG_FILE_PATH)
if os.path.exists(self.CONFIG_FILE_PATH):
cp = moves.configparser.RawConfigParser()
try:
cp.read(self.CONFIG_FILE_PATH)
defaults = cp.defaults()
for key, value in defaults.items():
self.test_databases[key] = value
except moves.configparser.ParsingError as e:
self.fail("Failed to read test_migrations.conf config "
"file. Got error: %s" % e)
else:
self.fail("Failed to find test_migrations.conf config "
"file.")
self.engines = {}
for key, value in self.test_databases.items():
self.engines[key] = sqlalchemy.create_engine(value)
# We start each test case with a completely blank slate.
self._reset_databases()
def tearDown(self):
# We destroy the test data store between each test case,
# and recreate it, which ensures that we have no side-effects
# from the tests
self._reset_databases()
super(BaseMigrationTestCase, self).tearDown()
def execute_cmd(self, cmd=None):
process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
output = process.communicate()[0]
LOG.debug(output)
self.assertEqual(0, process.returncode,
"Failed to run: %s\n%s" % (cmd, output))
def _reset_pg(self, conn_pieces):
(user,
password,
database,
host) = utils.get_db_connection_info(conn_pieces)
os.environ['PGPASSWORD'] = password
os.environ['PGUSER'] = user
# note(boris-42): We must create and drop the database; we can't
# drop the database we are connected to, so for such
# operations there is a special database, template1.
sqlcmd = ("psql -w -U %(user)s -h %(host)s -c"
" '%(sql)s' -d template1")
sql = ("drop database if exists %s;") % database
droptable = sqlcmd % {'user': user, 'host': host, 'sql': sql}
self.execute_cmd(droptable)
sql = ("create database %s;") % database
createtable = sqlcmd % {'user': user, 'host': host, 'sql': sql}
self.execute_cmd(createtable)
os.unsetenv('PGPASSWORD')
os.unsetenv('PGUSER')
@_set_db_lock(lock_prefix='migration_tests-')
def _reset_databases(self):
for key, engine in self.engines.items():
conn_string = self.test_databases[key]
conn_pieces = parse.urlparse(conn_string)
engine.dispose()
if conn_string.startswith('sqlite'):
# We can just delete the SQLite database, which is
# the easiest and cleanest solution
db_path = conn_pieces.path.strip('/')
if os.path.exists(db_path):
os.unlink(db_path)
# No need to recreate the SQLite DB. SQLite will
# create it for us if it's not there...
elif conn_string.startswith('mysql'):
# We can execute the MySQL client to destroy and re-create
# the MYSQL database, which is easier and less error-prone
# than using SQLAlchemy to do this via MetaData...trust me.
(user, password, database, host) = \
utils.get_db_connection_info(conn_pieces)
sql = ("drop database if exists %(db)s; "
"create database %(db)s;") % {'db': database}
cmd = ("mysql -u \"%(user)s\" -p\"%(password)s\" -h %(host)s "
"-e \"%(sql)s\"") % {'user': user, 'password': password,
'host': host, 'sql': sql}
self.execute_cmd(cmd)
elif conn_string.startswith('postgresql'):
self._reset_pg(conn_pieces)
@six.add_metaclass(abc.ABCMeta)
class WalkVersionsMixin(object):
def _walk_versions(self, engine=None, snake_walk=False, downgrade=True):
# Determine latest version script from the repo, then
# upgrade from 1 through to the latest, with no data
# in the databases. This just checks that the schema itself
# upgrades successfully.
"""Test mixin to check upgrade and downgrade ability of migration.
This is only suitable for testing of migrate_ migration scripts. An
abstract class mixin. `INIT_VERSION`, `REPOSITORY` and `migration_api`
attributes must be implemented in subclasses.
.. _auxiliary-dynamic-methods:
Auxiliary methods
-----------------
`_migrate_up` and `_migrate_down` instance methods of the class can be
used with auxiliary methods named `_pre_upgrade_<revision_id>`,
`_check_<revision_id>`, `_post_downgrade_<revision_id>`. The methods
intended to check applied changes for correctness of data operations.
This methods should be implemented for every particular revision
which you want to check with data. Implementation recommendations for
`_pre_upgrade_<revision_id>`, `_check_<revision_id>`,
`_post_downgrade_<revision_id>` implementation:
* `_pre_upgrade_<revision_id>`: provide a data appropriate to a
next revision. Should be used an id of revision which going to be
applied.
* `_check_<revision_id>`: Insert, select, delete operations with
newly applied changes. The data provided by
`_pre_upgrade_<revision_id>` will be used.
*`_post_downgrade_<revision_id>`: check for absence (inability to
use) changes provided by reverted revision.
Execution order of auxiliary methods when revision is upgrading:
`_pre_upgrade_###` => `upgrade` => `_check_###`
Execution order of auxiliary methods when revision is downgrading:
`downgrade` => `_post_downgrade_###`
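A minimal sketch of such a trio of methods (illustrative only: `006`
stands for a real revision number in the repository under test, and
`my_table` is a hypothetical table touched by that revision):

    def _pre_upgrade_006(self, engine):
        # seed rows that migration 006 is expected to preserve
        data = [{'id': 1, 'name': 'foo'}]
        engine.execute(my_table.insert(), data)
        return data

    def _check_006(self, engine, data):
        # verify the seeded rows survived the upgrade
        rows = engine.execute(my_table.select()).fetchall()
        self.assertEqual(len(data), len(rows))

    def _post_downgrade_006(self, engine):
        # verify the changes introduced by revision 006 are gone
        self.assertRaises(Exception, engine.execute, my_table.select())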
.. _migrate: https://sqlalchemy-migrate.readthedocs.org/en/latest/
"""
@abc.abstractproperty
def INIT_VERSION(self):
"""Initial version of a migration repository.
Can be different from 0 if migrations were squashed.
:rtype: int
"""
pass
@abc.abstractproperty
def REPOSITORY(self):
"""Allows basic manipulation with migration repository.
:returns: `migrate.versioning.repository.Repository` subclass.
"""
pass
@abc.abstractproperty
def migration_api(self):
"""Provides API for upgrading, downgrading and version manipulations.
:returns: `migrate.api` or overloaded analog.
"""
pass
@abc.abstractproperty
def migrate_engine(self):
"""Provides engine instance.
Should be the same instance as used when migrations are applied. In
most cases, the `engine` attribute provided by the test class in a
`setUp` method will work.
Example of implementation:
def migrate_engine(self):
return self.engine
:returns: sqlalchemy engine instance
"""
pass
def _walk_versions(self, snake_walk=False, downgrade=True):
"""Check if migration upgrades and downgrades successfully.
Determine the latest version script from the repo, then
upgrade from 1 through to the latest, with no data
in the databases. This just checks that the schema itself
upgrades successfully.
`_walk_versions` calls `_migrate_up` and `_migrate_down` with
`with_data` argument to check changes with data, but these methods
can be called without any extra check outside of `_walk_versions`
method.
:param snake_walk: enables checking that each individual migration can
be upgraded/downgraded by itself.
If we have ordered migrations 123abc, 456def, 789ghi and we run
upgrading with the `snake_walk` argument set to `True`, the
migrations will be applied in the following order:
`123abc => 456def => 123abc =>
456def => 789ghi => 456def => 789ghi`
:type snake_walk: bool
:param downgrade: Check downgrade behavior if True.
:type downgrade: bool
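For example, a concrete test built on this mixin typically needs no
more than (an illustrative sketch):

    def test_walk_versions(self):
        # upgrade/downgrade around every script, then walk the whole
        # chain back down to INIT_VERSION
        self._walk_versions(snake_walk=True, downgrade=True)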
"""
# Place the database under version control
self.migration_api.version_control(engine, self.REPOSITORY,
self.migration_api.version_control(self.migrate_engine,
self.REPOSITORY,
self.INIT_VERSION)
self.assertEqual(self.INIT_VERSION,
self.migration_api.db_version(engine,
self.migration_api.db_version(self.migrate_engine,
self.REPOSITORY))
LOG.debug('latest version is %s', self.REPOSITORY.latest)
@@ -209,34 +164,44 @@ class WalkVersionsMixin(object):
for version in versions:
# upgrade -> downgrade -> upgrade
self._migrate_up(engine, version, with_data=True)
self._migrate_up(version, with_data=True)
if snake_walk:
downgraded = self._migrate_down(
engine, version - 1, with_data=True)
downgraded = self._migrate_down(version - 1, with_data=True)
if downgraded:
self._migrate_up(engine, version)
self._migrate_up(version)
if downgrade:
# Now walk it back down to 0 from the latest, testing
# the downgrade paths.
for version in reversed(versions):
# downgrade -> upgrade -> downgrade
downgraded = self._migrate_down(engine, version - 1)
downgraded = self._migrate_down(version - 1)
if snake_walk and downgraded:
self._migrate_up(engine, version)
self._migrate_down(engine, version - 1)
self._migrate_up(version)
self._migrate_down(version - 1)
def _migrate_down(self, version, with_data=False):
"""Migrate down to a previous version of the db.
:param version: id of the revision to downgrade to.
:type version: int
:keyword with_data: Whether to verify the absence of changes from
migration(s) being downgraded, see
:ref:`auxiliary-dynamic-methods`.
:type with_data: bool
"""
def _migrate_down(self, engine, version, with_data=False):
try:
self.migration_api.downgrade(engine, self.REPOSITORY, version)
self.migration_api.downgrade(self.migrate_engine,
self.REPOSITORY, version)
except NotImplementedError:
# NOTE(sirp): some migrations, namely release-level
# migrations, don't support a downgrade.
return False
self.assertEqual(
version, self.migration_api.db_version(engine, self.REPOSITORY))
self.assertEqual(version, self.migration_api.db_version(
self.migrate_engine, self.REPOSITORY))
# NOTE(sirp): `version` is what we're downgrading to (i.e. the 'target'
# version). So if we have any downgrade checks, they need to be run for
@@ -245,16 +210,18 @@ class WalkVersionsMixin(object):
post_downgrade = getattr(
self, "_post_downgrade_%03d" % (version + 1), None)
if post_downgrade:
post_downgrade(engine)
post_downgrade(self.migrate_engine)
return True
def _migrate_up(self, engine, version, with_data=False):
"""migrate up to a new version of the db.
def _migrate_up(self, version, with_data=False):
"""Migrate up to a new version of the db.
We allow for data insertion and post checks at every
migration version with special _pre_upgrade_### and
_check_### functions in the main test.
:param version: id of the revision to upgrade to.
:type version: int
:keyword with_data: Whether to verify the applied changes with data,
see :ref:`auxiliary-dynamic-methods`.
:type with_data: bool
"""
# NOTE(sdague): try block is here because it's impossible to debug
# where a failed data migration happens otherwise
@@ -264,20 +231,20 @@ class WalkVersionsMixin(object):
pre_upgrade = getattr(
self, "_pre_upgrade_%03d" % version, None)
if pre_upgrade:
data = pre_upgrade(engine)
data = pre_upgrade(self.migrate_engine)
self.migration_api.upgrade(engine, self.REPOSITORY, version)
self.migration_api.upgrade(self.migrate_engine,
self.REPOSITORY, version)
self.assertEqual(version,
self.migration_api.db_version(engine,
self.migration_api.db_version(self.migrate_engine,
self.REPOSITORY))
if with_data:
check = getattr(self, "_check_%03d" % version, None)
if check:
check(engine, data)
except Exception:
LOG.error(_LE("Failed to migrate to version %(version)s on "
"engine %(engine)s"), {'version': version,
'engine': engine})
check(self.migrate_engine, data)
except exc.DbMigrationError:
LOG.error(_LE("Failed to migrate to version %s on engine %s") %
(version, self.migrate_engine))
raise


@@ -1,7 +1,6 @@
alembic>=0.4.1
Babel>=1.3
iso8601>=0.1.9
lockfile>=0.8
oslo.config>=1.2.1
SQLAlchemy>=0.7.8,!=0.9.5,<=0.9.99
sqlalchemy-migrate>=0.9.1


@@ -14,34 +14,51 @@
# License for the specific language governing permissions and limitations
# under the License.
import fixtures
import mock
from oslotest import base as test
import six
import sqlalchemy as sa
import sqlalchemy.ext.declarative as sa_decl
from oslo.db import exception as exc
from oslo.db.sqlalchemy import test_base
from oslo.db.sqlalchemy import test_migrations as migrate
class TestWalkVersions(test.BaseTestCase, migrate.WalkVersionsMixin):
def setUp(self):
super(TestWalkVersions, self).setUp()
self.migration_api = mock.MagicMock()
self.engine = mock.MagicMock()
self.REPOSITORY = mock.MagicMock()
self.INIT_VERSION = 4
migration_api = mock.MagicMock()
REPOSITORY = mock.MagicMock()
engine = mock.MagicMock()
INIT_VERSION = 4
@property
def migrate_engine(self):
return self.engine
def test_migrate_up(self):
self.migration_api.db_version.return_value = 141
self._migrate_up(self.engine, 141)
self._migrate_up(141)
self.migration_api.upgrade.assert_called_with(
self.engine, self.REPOSITORY, 141)
self.migration_api.db_version.assert_called_with(
self.engine, self.REPOSITORY)
def test_migrate_up_fail(self):
version = 141
self.migration_api.db_version.return_value = version
expected_output = (u"Failed to migrate to version %(version)s on "
"engine %(engine)s\n" %
{'version': version, 'engine': self.engine})
with mock.patch.object(self.migration_api,
'upgrade', side_effect=exc.DbMigrationError):
log = self.useFixture(fixtures.FakeLogger())
self.assertRaises(exc.DbMigrationError, self._migrate_up, version)
self.assertEqual(expected_output, log.output)
def test_migrate_up_with_data(self):
test_value = {"a": 1, "b": 2}
self.migration_api.db_version.return_value = 141
@@ -49,7 +66,7 @@ class TestWalkVersions(test.BaseTestCase, migrate.WalkVersionsMixin):
self._pre_upgrade_141.return_value = test_value
self._check_141 = mock.MagicMock()
self._migrate_up(self.engine, 141, True)
self._migrate_up(141, True)
self._pre_upgrade_141.assert_called_with(self.engine)
self._check_141.assert_called_with(self.engine, test_value)
@@ -57,19 +74,20 @@ class TestWalkVersions(test.BaseTestCase, migrate.WalkVersionsMixin):
def test_migrate_down(self):
self.migration_api.db_version.return_value = 42
self.assertTrue(self._migrate_down(self.engine, 42))
self.assertTrue(self._migrate_down(42))
self.migration_api.db_version.assert_called_with(
self.engine, self.REPOSITORY)
def test_migrate_down_not_implemented(self):
self.migration_api.downgrade.side_effect = NotImplementedError
self.assertFalse(self._migrate_down(self.engine, 42))
with mock.patch.object(self.migration_api,
'downgrade', side_effect=NotImplementedError):
self.assertFalse(self._migrate_down(self.engine, 42))
def test_migrate_down_with_data(self):
self._post_downgrade_043 = mock.MagicMock()
self.migration_api.db_version.return_value = 42
self._migrate_down(self.engine, 42, True)
self._migrate_down(42, True)
self._post_downgrade_043.assert_called_with(self.engine)
@@ -82,15 +100,16 @@ class TestWalkVersions(test.BaseTestCase, migrate.WalkVersionsMixin):
self._walk_versions()
self.migration_api.version_control.assert_called_with(
None, self.REPOSITORY, self.INIT_VERSION)
self.engine, self.REPOSITORY, self.INIT_VERSION)
self.migration_api.db_version.assert_called_with(
None, self.REPOSITORY)
self.engine, self.REPOSITORY)
versions = range(self.INIT_VERSION + 1, self.REPOSITORY.latest + 1)
upgraded = [mock.call(None, v, with_data=True) for v in versions]
upgraded = [mock.call(v, with_data=True)
for v in versions]
self.assertEqual(self._migrate_up.call_args_list, upgraded)
downgraded = [mock.call(None, v - 1) for v in reversed(versions)]
downgraded = [mock.call(v - 1) for v in reversed(versions)]
self.assertEqual(self._migrate_down.call_args_list, downgraded)
@mock.patch.object(migrate.WalkVersionsMixin, '_migrate_up')
@@ -99,25 +118,21 @@ class TestWalkVersions(test.BaseTestCase, migrate.WalkVersionsMixin):
self.REPOSITORY.latest = 20
self.migration_api.db_version.return_value = self.INIT_VERSION
self._walk_versions(self.engine, snake_walk=True, downgrade=True)
self._walk_versions(snake_walk=True, downgrade=True)
versions = range(self.INIT_VERSION + 1, self.REPOSITORY.latest + 1)
upgraded = []
for v in versions:
upgraded.append(mock.call(self.engine, v, with_data=True))
upgraded.append(mock.call(self.engine, v))
upgraded.extend(
[mock.call(self.engine, v) for v in reversed(versions)]
)
upgraded.append(mock.call(v, with_data=True))
upgraded.append(mock.call(v))
upgraded.extend([mock.call(v) for v in reversed(versions)])
self.assertEqual(upgraded, self._migrate_up.call_args_list)
downgraded_1 = [
mock.call(self.engine, v - 1, with_data=True) for v in versions
]
downgraded_1 = [mock.call(v - 1, with_data=True) for v in versions]
downgraded_2 = []
for v in reversed(versions):
downgraded_2.append(mock.call(self.engine, v - 1))
downgraded_2.append(mock.call(self.engine, v - 1))
downgraded_2.append(mock.call(v - 1))
downgraded_2.append(mock.call(v - 1))
downgraded = downgraded_1 + downgraded_2
self.assertEqual(self._migrate_down.call_args_list, downgraded)
@@ -127,19 +142,17 @@ class TestWalkVersions(test.BaseTestCase, migrate.WalkVersionsMixin):
self.REPOSITORY.latest = 20
self.migration_api.db_version.return_value = self.INIT_VERSION
self._walk_versions(self.engine, snake_walk=True, downgrade=False)
self._walk_versions(snake_walk=True, downgrade=False)
versions = range(self.INIT_VERSION + 1, self.REPOSITORY.latest + 1)
upgraded = []
for v in versions:
upgraded.append(mock.call(self.engine, v, with_data=True))
upgraded.append(mock.call(self.engine, v))
upgraded.append(mock.call(v, with_data=True))
upgraded.append(mock.call(v))
self.assertEqual(upgraded, self._migrate_up.call_args_list)
downgraded = [
mock.call(self.engine, v - 1, with_data=True) for v in versions
]
downgraded = [mock.call(v - 1, with_data=True) for v in versions]
self.assertEqual(self._migrate_down.call_args_list, downgraded)
@mock.patch.object(migrate.WalkVersionsMixin, '_migrate_up')
@@ -148,13 +161,11 @@ class TestWalkVersions(test.BaseTestCase, migrate.WalkVersionsMixin):
self.REPOSITORY.latest = 20
self.migration_api.db_version.return_value = self.INIT_VERSION
self._walk_versions(self.engine, snake_walk=False, downgrade=False)
self._walk_versions(snake_walk=False, downgrade=False)
versions = range(self.INIT_VERSION + 1, self.REPOSITORY.latest + 1)
upgraded = [
mock.call(self.engine, v, with_data=True) for v in versions
]
upgraded = [mock.call(v, with_data=True) for v in versions]
self.assertEqual(upgraded, self._migrate_up.call_args_list)


@@ -38,7 +38,6 @@ from oslo.db.openstack.common.fixture import moxstubout
from oslo.db.sqlalchemy import models
from oslo.db.sqlalchemy import session
from oslo.db.sqlalchemy import test_base as db_test_base
from oslo.db.sqlalchemy import test_migrations
from oslo.db.sqlalchemy import utils
from tests import utils as test_utils
@@ -180,9 +179,17 @@ class TestPaginateQuery(test_base.BaseTestCase):
marker=self.marker, sort_dirs=['asc', 'mixed'])
class TestMigrationUtils(test_migrations.BaseMigrationTestCase):
class TestMigrationUtils(db_test_base.DbTestCase):
"""Class for testing utils that are used in db migrations."""
def setUp(self):
super(TestMigrationUtils, self).setUp()
self.meta = MetaData(bind=self.engine)
self.conn = self.engine.connect()
self.addCleanup(self.meta.drop_all)
self.addCleanup(self.conn.close)
def _populate_db_for_drop_duplicate_entries(self, engine, meta,
table_name):
values = [
@@ -214,29 +221,26 @@ class TestMigrationUtils(test_migrations.BaseMigrationTestCase):
def test_drop_old_duplicate_entries_from_table(self):
table_name = "__test_tmp_table__"
for engine in self.engines.values():
meta = MetaData()
meta.bind = engine
test_table, values = self._populate_db_for_drop_duplicate_entries(
engine, meta, table_name)
utils.drop_old_duplicate_entries_from_table(
engine, table_name, False, 'b', 'c')
test_table, values = self._populate_db_for_drop_duplicate_entries(
self.engine, self.meta, table_name)
utils.drop_old_duplicate_entries_from_table(
self.engine, table_name, False, 'b', 'c')
uniq_values = set()
expected_ids = []
for value in sorted(values, key=lambda x: x['id'], reverse=True):
uniq_value = (('b', value['b']), ('c', value['c']))
if uniq_value in uniq_values:
continue
uniq_values.add(uniq_value)
expected_ids.append(value['id'])
uniq_values = set()
expected_ids = []
for value in sorted(values, key=lambda x: x['id'], reverse=True):
uniq_value = (('b', value['b']), ('c', value['c']))
if uniq_value in uniq_values:
continue
uniq_values.add(uniq_value)
expected_ids.append(value['id'])
real_ids = [row[0] for row in
engine.execute(select([test_table.c.id])).fetchall()]
real_ids = [row[0] for row in
self.engine.execute(select([test_table.c.id])).fetchall()]
self.assertEqual(len(real_ids), len(expected_ids))
for id_ in expected_ids:
self.assertTrue(id_ in real_ids)
self.assertEqual(len(real_ids), len(expected_ids))
for id_ in expected_ids:
self.assertTrue(id_ in real_ids)
def test_drop_dup_entries_in_file_conn(self):
table_name = "__test_tmp_table__"
@@ -253,110 +257,98 @@ class TestMigrationUtils(test_migrations.BaseMigrationTestCase):
def test_drop_old_duplicate_entries_from_table_soft_delete(self):
table_name = "__test_tmp_table__"
for engine in self.engines.values():
meta = MetaData()
meta.bind = engine
table, values = self._populate_db_for_drop_duplicate_entries(
engine, meta, table_name)
utils.drop_old_duplicate_entries_from_table(engine, table_name,
True, 'b', 'c')
uniq_values = set()
expected_values = []
soft_deleted_values = []
table, values = self._populate_db_for_drop_duplicate_entries(
self.engine, self.meta, table_name)
utils.drop_old_duplicate_entries_from_table(self.engine, table_name,
True, 'b', 'c')
uniq_values = set()
expected_values = []
soft_deleted_values = []
for value in sorted(values, key=lambda x: x['id'], reverse=True):
uniq_value = (('b', value['b']), ('c', value['c']))
if uniq_value in uniq_values:
soft_deleted_values.append(value)
continue
uniq_values.add(uniq_value)
expected_values.append(value)
for value in sorted(values, key=lambda x: x['id'], reverse=True):
uniq_value = (('b', value['b']), ('c', value['c']))
if uniq_value in uniq_values:
soft_deleted_values.append(value)
continue
uniq_values.add(uniq_value)
expected_values.append(value)
base_select = table.select()
base_select = table.select()
rows_select = base_select.where(table.c.deleted != table.c.id)
row_ids = [row['id'] for row in
engine.execute(rows_select).fetchall()]
self.assertEqual(len(row_ids), len(expected_values))
for value in expected_values:
self.assertTrue(value['id'] in row_ids)
rows_select = base_select.where(table.c.deleted != table.c.id)
row_ids = [row['id'] for row in
self.engine.execute(rows_select).fetchall()]
self.assertEqual(len(row_ids), len(expected_values))
for value in expected_values:
self.assertTrue(value['id'] in row_ids)
deleted_rows_select = base_select.where(
table.c.deleted == table.c.id)
deleted_rows_ids = [row['id'] for row in
engine.execute(deleted_rows_select).fetchall()]
self.assertEqual(len(deleted_rows_ids),
len(values) - len(row_ids))
for value in soft_deleted_values:
self.assertTrue(value['id'] in deleted_rows_ids)
deleted_rows_select = base_select.where(
table.c.deleted == table.c.id)
deleted_rows_ids = [row['id'] for row in
self.engine.execute(
deleted_rows_select).fetchall()]
self.assertEqual(len(deleted_rows_ids),
len(values) - len(row_ids))
for value in soft_deleted_values:
self.assertTrue(value['id'] in deleted_rows_ids)
def test_change_deleted_column_type_does_not_drop_index(self):
table_name = 'abc'
for engine in self.engines.values():
meta = MetaData(bind=engine)
indexes = {
'idx_a_deleted': ['a', 'deleted'],
'idx_b_deleted': ['b', 'deleted'],
'idx_a': ['a']
}
indexes = {
'idx_a_deleted': ['a', 'deleted'],
'idx_b_deleted': ['b', 'deleted'],
'idx_a': ['a']
}
index_instances = [Index(name, *columns)
for name, columns in six.iteritems(indexes)]
index_instances = [Index(name, *columns)
for name, columns in six.iteritems(indexes)]
table = Table(table_name, meta,
Column('id', Integer, primary_key=True),
Column('a', String(255)),
Column('b', String(255)),
Column('deleted', Boolean),
*index_instances)
table.create()
utils.change_deleted_column_type_to_id_type(engine, table_name)
utils.change_deleted_column_type_to_boolean(engine, table_name)
table = Table(table_name, self.meta,
Column('id', Integer, primary_key=True),
Column('a', String(255)),
Column('b', String(255)),
Column('deleted', Boolean),
*index_instances)
table.create()
utils.change_deleted_column_type_to_id_type(self.engine, table_name)
utils.change_deleted_column_type_to_boolean(self.engine, table_name)
insp = reflection.Inspector.from_engine(engine)
real_indexes = insp.get_indexes(table_name)
self.assertEqual(len(real_indexes), 3)
for index in real_indexes:
name = index['name']
self.assertIn(name, indexes)
self.assertEqual(set(index['column_names']),
set(indexes[name]))
insp = reflection.Inspector.from_engine(self.engine)
real_indexes = insp.get_indexes(table_name)
self.assertEqual(len(real_indexes), 3)
for index in real_indexes:
name = index['name']
self.assertIn(name, indexes)
self.assertEqual(set(index['column_names']),
set(indexes[name]))
def test_change_deleted_column_type_to_id_type_integer(self):
table_name = 'abc'
for engine in self.engines.values():
meta = MetaData()
meta.bind = engine
table = Table(table_name, meta,
Column('id', Integer, primary_key=True),
Column('deleted', Boolean))
table.create()
utils.change_deleted_column_type_to_id_type(engine, table_name)
table = Table(table_name, self.meta,
Column('id', Integer, primary_key=True),
Column('deleted', Boolean))
table.create()
utils.change_deleted_column_type_to_id_type(self.engine, table_name)
table = utils.get_table(engine, table_name)
self.assertTrue(isinstance(table.c.deleted.type, Integer))
table = utils.get_table(self.engine, table_name)
self.assertTrue(isinstance(table.c.deleted.type, Integer))
def test_change_deleted_column_type_to_id_type_string(self):
table_name = 'abc'
for engine in self.engines.values():
meta = MetaData()
meta.bind = engine
table = Table(table_name, meta,
Column('id', String(255), primary_key=True),
Column('deleted', Boolean))
table.create()
utils.change_deleted_column_type_to_id_type(engine, table_name)
table = Table(table_name, self.meta,
Column('id', String(255), primary_key=True),
Column('deleted', Boolean))
table.create()
utils.change_deleted_column_type_to_id_type(self.engine, table_name)
table = utils.get_table(engine, table_name)
self.assertTrue(isinstance(table.c.deleted.type, String))
table = utils.get_table(self.engine, table_name)
self.assertTrue(isinstance(table.c.deleted.type, String))
@db_test_base.backend_specific('sqlite')
def test_change_deleted_column_type_to_id_type_custom(self):
table_name = 'abc'
engine = self.engines['sqlite']
meta = MetaData()
meta.bind = engine
table = Table(table_name, meta,
table = Table(table_name, self.meta,
Column('id', Integer, primary_key=True),
Column('foo', CustomType),
Column('deleted', Boolean))
@@ -366,13 +358,13 @@ class TestMigrationUtils(test_migrations.BaseMigrationTestCase):
if SA_VERSION < (0, 9, 0):
self.assertRaises(utils.ColumnError,
utils.change_deleted_column_type_to_id_type,
engine, table_name)
self.engine, table_name)
fooColumn = Column('foo', CustomType())
utils.change_deleted_column_type_to_id_type(engine, table_name,
utils.change_deleted_column_type_to_id_type(self.engine, table_name,
foo=fooColumn)
table = utils.get_table(engine, table_name)
table = utils.get_table(self.engine, table_name)
# NOTE(boris-42): There is no way to check has foo type CustomType.
# but sqlalchemy will set it to NullType. This has
# been fixed upstream in recent SA versions
@@ -382,51 +374,51 @@ class TestMigrationUtils(test_migrations.BaseMigrationTestCase):
def test_change_deleted_column_type_to_boolean(self):
table_name = 'abc'
for key, engine in self.engines.items():
meta = MetaData()
meta.bind = engine
table = Table(table_name, meta,
Column('id', Integer, primary_key=True),
Column('deleted', Integer))
table.create()
table = Table(table_name, self.meta,
Column('id', Integer, primary_key=True),
Column('deleted', Integer))
table.create()
utils.change_deleted_column_type_to_boolean(engine, table_name)
utils.change_deleted_column_type_to_boolean(self.engine, table_name)
table = utils.get_table(engine, table_name)
expected_type = Boolean if key != "mysql" else mysql.TINYINT
self.assertTrue(isinstance(table.c.deleted.type, expected_type))
table = utils.get_table(self.engine, table_name)
if self.engine.name != "mysql":
expected_type = Boolean
else:
expected_type = mysql.TINYINT
self.assertTrue(isinstance(table.c.deleted.type, expected_type))
def test_change_deleted_column_type_to_boolean_with_fc(self):
table_name_1 = 'abc'
table_name_2 = 'bcd'
for key, engine in self.engines.items():
meta = MetaData()
meta.bind = engine
table_1 = Table(table_name_1, meta,
Column('id', Integer, primary_key=True),
Column('deleted', Integer))
table_1.create()
table_1 = Table(table_name_1, self.meta,
Column('id', Integer, primary_key=True),
Column('deleted', Integer))
table_1.create()
table_2 = Table(table_name_2, meta,
Column('id', Integer, primary_key=True),
Column('foreign_id', Integer,
ForeignKey('%s.id' % table_name_1)),
Column('deleted', Integer))
table_2.create()
table_2 = Table(table_name_2, self.meta,
Column('id', Integer, primary_key=True),
Column('foreign_id', Integer,
ForeignKey('%s.id' % table_name_1)),
Column('deleted', Integer))
table_2.create()
utils.change_deleted_column_type_to_boolean(engine, table_name_2)
utils.change_deleted_column_type_to_boolean(self.engine, table_name_2)
table = utils.get_table(engine, table_name_2)
expected_type = Boolean if key != "mysql" else mysql.TINYINT
self.assertTrue(isinstance(table.c.deleted.type, expected_type))
table = utils.get_table(self.engine, table_name_2)
if self.engine.name != "mysql":
expected_type = Boolean
else:
expected_type = mysql.TINYINT
self.assertTrue(isinstance(table.c.deleted.type, expected_type))
@db_test_base.backend_specific('sqlite')
def test_change_deleted_column_type_to_boolean_type_custom(self):
table_name = 'abc'
engine = self.engines['sqlite']
meta = MetaData()
meta.bind = engine
table = Table(table_name, meta,
table = Table(table_name, self.meta,
Column('id', Integer, primary_key=True),
Column('foo', CustomType),
Column('deleted', Integer))
@@ -436,13 +428,13 @@ class TestMigrationUtils(test_migrations.BaseMigrationTestCase):
if SA_VERSION < (0, 9, 0):
self.assertRaises(utils.ColumnError,
utils.change_deleted_column_type_to_boolean,
engine, table_name)
self.engine, table_name)
fooColumn = Column('foo', CustomType())
utils.change_deleted_column_type_to_boolean(engine, table_name,
utils.change_deleted_column_type_to_boolean(self.engine, table_name,
foo=fooColumn)
table = utils.get_table(engine, table_name)
table = utils.get_table(self.engine, table_name)
# NOTE(boris-42): There is no way to check has foo type CustomType.
# but sqlalchemy will set it to NullType. This has
# been fixed upstream in recent SA versions
@@ -450,22 +442,20 @@ class TestMigrationUtils(test_migrations.BaseMigrationTestCase):
self.assertTrue(isinstance(table.c.foo.type, NullType))
self.assertTrue(isinstance(table.c.deleted.type, Boolean))
def test_change_deleted_column_type_drops_check_constraint(self):
@db_test_base.backend_specific('sqlite')
def test_change_deleted_column_type_sqlite_drops_check_constraint(self):
table_name = 'abc'
meta = MetaData()
engine = self.engines['sqlite']
meta.bind = engine
table = Table(table_name, meta,
table = Table(table_name, self.meta,
Column('id', Integer, primary_key=True),
Column('deleted', Boolean))
table.create()
utils._change_deleted_column_type_to_id_type_sqlite(engine,
utils._change_deleted_column_type_to_id_type_sqlite(self.engine,
table_name)
table = Table(table_name, meta, autoload=True)
table = Table(table_name, self.meta, autoload=True)
# NOTE(I159): if the CHECK constraint has been dropped (expected
# behavior), any integer value can be inserted, otherwise only 1 or 0.
engine.execute(table.insert({'deleted': 10}))
self.engine.execute(table.insert({'deleted': 10}))
def test_utils_drop_unique_constraint(self):
table_name = "__test_tmp_table__"
@@ -475,41 +465,38 @@ class TestMigrationUtils(test_migrations.BaseMigrationTestCase):
{'id': 2, 'a': 2, 'foo': 20},
{'id': 3, 'a': 1, 'foo': 30},
]
for engine in self.engines.values():
meta = MetaData()
meta.bind = engine
test_table = Table(
table_name, meta,
Column('id', Integer, primary_key=True, nullable=False),
Column('a', Integer),
Column('foo', Integer),
UniqueConstraint('a', name='uniq_a'),
UniqueConstraint('foo', name=uc_name),
)
test_table.create()
test_table = Table(
table_name, self.meta,
Column('id', Integer, primary_key=True, nullable=False),
Column('a', Integer),
Column('foo', Integer),
UniqueConstraint('a', name='uniq_a'),
UniqueConstraint('foo', name=uc_name),
)
test_table.create()
engine.execute(test_table.insert(), values)
# NOTE(boris-42): This method is generic UC dropper.
utils.drop_unique_constraint(engine, table_name, uc_name, 'foo')
self.engine.execute(test_table.insert(), values)
# NOTE(boris-42): This method is generic UC dropper.
utils.drop_unique_constraint(self.engine, table_name, uc_name, 'foo')
s = test_table.select().order_by(test_table.c.id)
rows = engine.execute(s).fetchall()
s = test_table.select().order_by(test_table.c.id)
rows = self.engine.execute(s).fetchall()
for i in moves.range(len(values)):
v = values[i]
self.assertEqual((v['id'], v['a'], v['foo']), rows[i])
for i in moves.range(len(values)):
v = values[i]
self.assertEqual((v['id'], v['a'], v['foo']), rows[i])
# NOTE(boris-42): Update data about Table from DB.
meta = MetaData()
meta.bind = engine
test_table = Table(table_name, meta, autoload=True)
constraints = [c for c in test_table.constraints
if c.name == uc_name]
self.assertEqual(len(constraints), 0)
self.assertEqual(len(test_table.constraints), 1)
# NOTE(boris-42): Update data about Table from DB.
meta = MetaData(bind=self.engine)
test_table = Table(table_name, meta, autoload=True)
constraints = [c for c in test_table.constraints
if c.name == uc_name]
self.assertEqual(len(constraints), 0)
self.assertEqual(len(test_table.constraints), 1)
test_table.drop()
test_table.drop()
@db_test_base.backend_specific('sqlite')
def test_util_drop_unique_constraint_with_not_supported_sqlite_type(self):
table_name = "__test_tmp_table__"
uc_name = 'uniq_foo'
@@ -519,11 +506,8 @@ class TestMigrationUtils(test_migrations.BaseMigrationTestCase):
{'id': 3, 'a': 1, 'foo': 30}
]
engine = self.engines['sqlite']
meta = MetaData(bind=engine)
test_table = Table(
table_name, meta,
table_name, self.meta,
Column('id', Integer, primary_key=True, nullable=False),
Column('a', Integer),
Column('foo', CustomType, default=0),
@@ -532,7 +516,7 @@ class TestMigrationUtils(test_migrations.BaseMigrationTestCase):
)
test_table.create()
engine.execute(test_table.insert(), values)
self.engine.execute(test_table.insert(), values)
warnings.simplefilter("ignore", SAWarning)
# reflection of custom types has been fixed upstream
@@ -541,56 +525,54 @@ class TestMigrationUtils(test_migrations.BaseMigrationTestCase):
# unsupported type CustomType.
self.assertRaises(utils.ColumnError,
utils.drop_unique_constraint,
engine, table_name, uc_name, 'foo')
self.engine, table_name, uc_name, 'foo')
# NOTE(boris-42): Wrong type of foo instance. it should be
# instance of sqlalchemy.Column.
self.assertRaises(utils.ColumnError,
utils.drop_unique_constraint,
engine, table_name, uc_name, 'foo',
self.engine, table_name, uc_name, 'foo',
foo=Integer())
foo = Column('foo', CustomType, default=0)
utils.drop_unique_constraint(
engine, table_name, uc_name, 'foo', foo=foo)
self.engine, table_name, uc_name, 'foo', foo=foo)
s = test_table.select().order_by(test_table.c.id)
rows = engine.execute(s).fetchall()
rows = self.engine.execute(s).fetchall()
for i in moves.range(len(values)):
v = values[i]
self.assertEqual((v['id'], v['a'], v['foo']), rows[i])
# NOTE(boris-42): Update data about Table from DB.
meta = MetaData(bind=engine)
meta = MetaData(bind=self.engine)
test_table = Table(table_name, meta, autoload=True)
constraints = [c for c in test_table.constraints if c.name == uc_name]
self.assertEqual(len(constraints), 0)
self.assertEqual(len(test_table.constraints), 1)
test_table.drop()
@db_test_base.backend_specific('sqlite')
def test_drop_unique_constraint_in_sqlite_fk_recreate(self):
engine = self.engines['sqlite']
meta = MetaData()
meta.bind = engine
parent_table = Table(
'table0', meta,
'table0', self.meta,
Column('id', Integer, primary_key=True),
Column('foo', Integer),
)
parent_table.create()
table_name = 'table1'
table = Table(
table_name, meta,
table_name, self.meta,
Column('id', Integer, primary_key=True),
Column('baz', Integer),
Column('bar', Integer, ForeignKey("table0.id")),
UniqueConstraint('baz', name='constr1')
)
table.create()
utils.drop_unique_constraint(engine, table_name, 'constr1', 'baz')
utils.drop_unique_constraint(self.engine, table_name, 'constr1', 'baz')
insp = reflection.Inspector.from_engine(engine)
insp = reflection.Inspector.from_engine(self.engine)
f_keys = insp.get_foreign_keys(table_name)
self.assertEqual(len(f_keys), 1)
f_key = f_keys[0]
@@ -604,46 +586,53 @@ class TestMigrationUtils(test_migrations.BaseMigrationTestCase):
uuidstrs = []
for unused in range(10):
uuidstrs.append(uuid.uuid4().hex)
for key, engine in self.engines.items():
meta = MetaData()
meta.bind = engine
conn = engine.connect()
insert_table = Table(
insert_table_name, meta,
Column('id', Integer, primary_key=True,
nullable=False, autoincrement=True),
Column('uuid', String(36), nullable=False))
select_table = Table(
select_table_name, meta,
Column('id', Integer, primary_key=True,
nullable=False, autoincrement=True),
Column('uuid', String(36), nullable=False))
insert_table = Table(
insert_table_name, self.meta,
Column('id', Integer, primary_key=True,
nullable=False, autoincrement=True),
Column('uuid', String(36), nullable=False))
select_table = Table(
select_table_name, self.meta,
Column('id', Integer, primary_key=True,
nullable=False, autoincrement=True),
Column('uuid', String(36), nullable=False))
insert_table.create()
select_table.create()
# Add 10 rows to select_table
for uuidstr in uuidstrs:
ins_stmt = select_table.insert().values(uuid=uuidstr)
conn.execute(ins_stmt)
insert_table.create()
select_table.create()
# Add 10 rows to select_table
for uuidstr in uuidstrs:
ins_stmt = select_table.insert().values(uuid=uuidstr)
self.conn.execute(ins_stmt)
# Select 4 rows in one chunk from select_table
column = select_table.c.id
query_insert = select([select_table],
select_table.c.id < 5).order_by(column)
insert_statement = utils.InsertFromSelect(insert_table,
query_insert)
result_insert = conn.execute(insert_statement)
# Verify we insert 4 rows
self.assertEqual(result_insert.rowcount, 4)
# Select 4 rows in one chunk from select_table
column = select_table.c.id
query_insert = select([select_table],
select_table.c.id < 5).order_by(column)
insert_statement = utils.InsertFromSelect(insert_table,
query_insert)
result_insert = self.conn.execute(insert_statement)
# Verify we insert 4 rows
self.assertEqual(result_insert.rowcount, 4)
query_all = select([insert_table]).where(
insert_table.c.uuid.in_(uuidstrs))
rows = conn.execute(query_all).fetchall()
# Verify we really have 4 rows in insert_table
self.assertEqual(len(rows), 4)
query_all = select([insert_table]).where(
insert_table.c.uuid.in_(uuidstrs))
rows = self.conn.execute(query_all).fetchall()
# Verify we really have 4 rows in insert_table
self.assertEqual(len(rows), 4)
insert_table.drop()
select_table.drop()
class PostgresqlTestMigrations(TestMigrationUtils,
db_test_base.PostgreSQLOpportunisticTestCase):
"""Test migrations on PostgreSQL."""
pass
class MySQLTestMigrations(TestMigrationUtils,
db_test_base.MySQLOpportunisticTestCase):
"""Test migrations on MySQL."""
pass
class TestConnectionUtils(test_utils.BaseTestCase):


@@ -12,7 +12,6 @@ envlist = py26,py27,pep8
install_command = pip install -U {opts} {packages}
setenv =
VIRTUAL_ENV={envdir}
OSLO_LOCK_PATH=/tmp/
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
commands = python setup.py testr --slowest --testr-args='{posargs}'