Add application credentials db migration

Add an additive-only migration to create the application credentials
table and a table for roles associated with application credentials.

bp application-credentials

Change-Id: Iaaf74013a50b06d29be7f19f699b215f375bc27f
This commit is contained in:
Colleen Murphy 2017-12-02 21:57:28 +01:00
parent 93ecb490b1
commit 476d73ac20
4 changed files with 164 additions and 0 deletions

View File

@ -0,0 +1,15 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
def upgrade(migrate_engine):
pass

View File

@ -0,0 +1,15 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
def upgrade(migrate_engine):
pass

View File

@ -0,0 +1,52 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sqlalchemy as sql
from keystone.common import sql as ks_sql
def upgrade(migrate_engine):
meta = sql.MetaData()
meta.bind = migrate_engine
application_credential = sql.Table(
'application_credential', meta,
sql.Column('internal_id', sql.Integer, primary_key=True,
nullable=False),
sql.Column('id', sql.String(length=64), nullable=False),
sql.Column('name', sql.String(length=255), nullable=False),
sql.Column('secret_hash', sql.String(length=255), nullable=False),
sql.Column('description', sql.Text),
sql.Column('user_id', sql.String(length=64), nullable=False),
sql.Column('project_id', sql.String(64), nullable=False),
sql.Column('expires_at', ks_sql.DateTimeInt()),
sql.Column('allow_application_credential_creation', sql.Boolean),
sql.UniqueConstraint('user_id', 'name',
name='duplicate_app_cred_constraint'),
mysql_engine='InnoDB',
mysql_charset='utf8'
)
application_credential_role = sql.Table(
'application_credential_role', meta,
sql.Column('application_credential_id', sql.Integer,
sql.ForeignKey(application_credential.c.internal_id,
ondelete='CASCADE'),
primary_key=True, nullable=False),
sql.Column('role_id', sql.String(length=64), primary_key=True,
nullable=False),
mysql_engine='InnoDB', mysql_charset='utf8')
application_credential.create(migrate_engine, checkfirst=True)
application_credential_role.create(migrate_engine, checkfirst=True)

View File

@ -2706,6 +2706,88 @@ class FullMigration(SqlMigrateBase, unit.TestCase):
}
limit_table.insert().values(limit_without_region).execute()
def test_migration_034_adds_application_credential_table(self):
self.expand(33)
self.migrate(33)
self.contract(33)
application_credential_table_name = 'application_credential'
self.assertTableDoesNotExist(application_credential_table_name)
application_credential_role_table_name = 'application_credential_role'
self.assertTableDoesNotExist(application_credential_role_table_name)
self.expand(34)
self.migrate(34)
self.contract(34)
self.assertTableExists(application_credential_table_name)
self.assertTableColumns(
application_credential_table_name,
['internal_id', 'id', 'name', 'secret_hash',
'description', 'user_id', 'project_id', 'expires_at',
'allow_application_credential_creation']
)
if self.engine.name == 'mysql':
self.assertTrue(self.does_index_exist(
'application_credential', 'duplicate_app_cred_constraint'))
else:
self.assertTrue(self.does_constraint_exist(
'application_credential', 'duplicate_app_cred_constraint'))
self.assertTableExists(application_credential_role_table_name)
self.assertTableColumns(
application_credential_role_table_name,
['application_credential_id', 'role_id']
)
app_cred_table = sqlalchemy.Table(
application_credential_table_name, self.metadata, autoload=True
)
app_cred_role_table = sqlalchemy.Table(
application_credential_role_table_name,
self.metadata, autoload=True
)
self.assertTrue(self.does_fk_exist('application_credential_role',
'application_credential_id'))
expires_at = datetime.datetime.utcnow().replace(tzinfo=pytz.UTC)
epoch = datetime.datetime.fromtimestamp(0, tz=pytz.UTC)
expires_at_int = (expires_at - epoch).total_seconds()
app_cred = {
'internal_id': 1,
'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'secret_hash': uuid.uuid4().hex,
'description': uuid.uuid4().hex,
'user_id': uuid.uuid4().hex,
'project_id': uuid.uuid4().hex,
'expires_at': expires_at_int,
'allow_application_credential_creation': False
}
app_cred_table.insert().values(app_cred).execute()
# Exercise unique constraint
dup_app_cred = {
'internal_id': 2,
'id': uuid.uuid4().hex,
'name': app_cred['name'],
'secret_hash': uuid.uuid4().hex,
'user_id': app_cred['user_id'],
'project_id': uuid.uuid4().hex
}
insert = app_cred_table.insert().values(dup_app_cred)
self.assertRaises(db_exception.DBDuplicateEntry,
insert.execute)
role_rel = {
'application_credential_id': app_cred['internal_id'],
'role_id': uuid.uuid4().hex
}
app_cred_role_table.insert().values(role_rel).execute()
# Exercise role table primary keys
insert = app_cred_role_table.insert().values(role_rel)
self.assertRaises(db_exception.DBDuplicateEntry, insert.execute)
class MySQLOpportunisticFullMigration(FullMigration):
FIXTURE = test_base.MySQLOpportunisticFixture