Add downgrade migration tests

Change-Id: Idc79102273d93cd38bc1282aa003ece00ac88d63
Partial-Bug: #1391553
This commit is contained in:
Nikita Zubkov 2016-06-08 15:31:43 +03:00
parent f723e7afc4
commit 55c095b20b
2 changed files with 177 additions and 0 deletions

View File

@ -0,0 +1,103 @@
# coding: utf-8
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import alembic
import sqlalchemy as sa
from nailgun.db import db
from nailgun.db import dropdb
from nailgun.db.migration import ALEMBIC_CONFIG
from nailgun.test import base
_prepare_revision = 'c6edea552f1e'
_test_revision = '675105097a69'
def setup_module():
dropdb()
alembic.command.upgrade(ALEMBIC_CONFIG, _prepare_revision)
prepare()
db.commit()
alembic.command.downgrade(ALEMBIC_CONFIG, _test_revision)
def prepare():
meta = base.reflect_db_metadata()
result = db.execute(
meta.tables['releases'].insert(),
[{
'name': 'test_name',
'version': '2015.1-10.0',
'operating_system': 'ubuntu',
'state': 'available',
'deployment_tasks': '{}',
'roles': '{}',
'roles_metadata': '{}',
'is_deployable': True,
}]
)
release_id = result.inserted_primary_key[0]
result = db.execute(
meta.tables['clusters'].insert(),
[{
'name': 'test',
'release_id': release_id,
'mode': 'ha_compact',
'status': 'new',
'net_provider': 'neutron',
'grouping': 'roles',
'fuel_version': '10.0',
}]
)
cluster_id = result.inserted_primary_key[0]
TestPluginLinksConstraints.prepare(meta, cluster_id)
class TestPluginLinksConstraints(base.BaseAlembicMigrationTest):
test_link = {
'title': 'title',
'url': 'http://www.zzz.com',
'description': 'description',
'hidden': False
}
@classmethod
def prepare(cls, meta, cluster_id):
cls.test_link['cluster_id'] = cluster_id
db.execute(
meta.tables['cluster_plugin_links'].insert(),
[cls.test_link],
)
def test_duplicate_cluster_link(self):
db.execute(
self.meta.tables['cluster_plugin_links'].insert(),
[self.test_link]
)
links_count = db.execute(
sa.select(
[sa.func.count(self.meta.tables['cluster_plugin_links'].c.id)]
)).fetchone()[0]
self.assertEqual(links_count, 2)

View File

@ -0,0 +1,74 @@
# coding: utf-8
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import alembic
from nailgun.db import db
from nailgun.db import dropdb
from nailgun.db.migration import ALEMBIC_CONFIG
from nailgun.test import base
import sqlalchemy as sa
_prepare_revision = '675105097a69'
_test_revision = '11a9adc6d36a'
def setup_module():
dropdb()
alembic.command.upgrade(ALEMBIC_CONFIG, _prepare_revision)
prepare()
db.commit()
alembic.command.downgrade(ALEMBIC_CONFIG, _test_revision)
def prepare():
meta = base.reflect_db_metadata()
TestNodeErrorMessageDowngrade.prepare(meta)
class TestNodeErrorMessageDowngrade(base.BaseAlembicMigrationTest):
node_uuid = '26b508d0-0d76-4159-bce9-f67ec2765480'
long_error_msg = 'a' * 500
@classmethod
def prepare(cls, meta):
nodes = meta.tables['nodes']
db.execute(
nodes.insert(),
[{
'uuid': cls.node_uuid,
'cluster_id': None,
'group_id': None,
'status': 'discover',
'meta': '{}',
'mac': 'aa:aa:aa:aa:aa:aa',
'error_msg': cls.long_error_msg,
'timestamp': datetime.datetime.utcnow(),
}]
)
def test_downgrade_node_error_msg(self):
nodes = self.meta.tables['nodes']
self.assertIsInstance(nodes.columns['error_msg'].type, sa.String)
node = db.query(nodes).filter_by(uuid=self.node_uuid).first()
self.assertEqual(node.error_msg, self.long_error_msg[:255])