diff --git a/keystone/common/sql/migrate_repo/versions/086_add_duplicate_constraint_trusts.py b/keystone/common/sql/migrate_repo/versions/086_add_duplicate_constraint_trusts.py new file mode 100644 index 0000000000..2b115ea423 --- /dev/null +++ b/keystone/common/sql/migrate_repo/versions/086_add_duplicate_constraint_trusts.py @@ -0,0 +1,26 @@ +# Copyright 2015 Intel Corporation +# All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from migrate import UniqueConstraint +from sqlalchemy import MetaData, Table + + +def upgrade(migrate_engine): + meta = MetaData(bind=migrate_engine) + trusts = Table('trust', meta, autoload=True) + + UniqueConstraint('trustor_user_id', 'trustee_user_id', 'project_id', + 'impersonation', 'expires_at', table=trusts, + name='duplicate_trust_constraint').create() diff --git a/keystone/tests/unit/test_backend.py b/keystone/tests/unit/test_backend.py index 273bd87f24..541efc9599 100644 --- a/keystone/tests/unit/test_backend.py +++ b/keystone/tests/unit/test_backend.py @@ -4786,13 +4786,13 @@ class TrustTests(object): def create_sample_trust(self, new_id, remaining_uses=None): self.trustor = self.user_foo self.trustee = self.user_two + expires_at = datetime.datetime.utcnow().replace(year=2031) trust_data = (self.trust_api.create_trust (new_id, {'trustor_user_id': self.trustor['id'], 'trustee_user_id': self.user_two['id'], 'project_id': self.tenant_bar['id'], - 'expires_at': timeutils. - parse_isotime('2031-02-18T18:10:00Z'), + 'expires_at': expires_at, 'impersonation': True, 'remaining_uses': remaining_uses}, roles=[{"id": "member"}, @@ -4914,6 +4914,26 @@ class TrustTests(object): self.trust_api.get_trust, trust_data['id']) + def test_duplicate_trusts_not_allowed(self): + self.trustor = self.user_foo + self.trustee = self.user_two + trust_data = {'trustor_user_id': self.trustor['id'], + 'trustee_user_id': self.user_two['id'], + 'project_id': self.tenant_bar['id'], + 'expires_at': timeutils.parse_isotime( + '2031-02-18T18:10:00Z'), + 'impersonation': True, + 'remaining_uses': None} + roles = [{"id": "member"}, + {"id": "other"}, + {"id": "browser"}] + self.trust_api.create_trust(uuid.uuid4().hex, trust_data, roles) + self.assertRaises(exception.Conflict, + self.trust_api.create_trust, + uuid.uuid4().hex, + trust_data, + roles) + class CatalogTests(object): diff --git a/keystone/tests/unit/test_sql_upgrade.py b/keystone/tests/unit/test_sql_upgrade.py index 23572edb20..7c6ce24ad1 100644 --- a/keystone/tests/unit/test_sql_upgrade.py +++ b/keystone/tests/unit/test_sql_upgrade.py @@ -808,6 +808,13 @@ class SqlUpgradeTests(SqlMigrateBase): self.assertTableDoesNotExist('endpoint_group') self.assertTableDoesNotExist('project_endpoint_group') + def test_add_trust_unique_constraint_upgrade(self): + self.upgrade(86) + inspector = reflection.Inspector.from_engine(self.engine) + constraints = inspector.get_unique_constraints('trust') + constraint_names = [constraint['name'] for constraint in constraints] + self.assertIn('duplicate_trust_constraint', constraint_names) + def populate_user_table(self, with_pass_enab=False, with_pass_enab_domain=False): # Populate the appropriate fields in the user diff --git a/keystone/tests/unit/test_v3_auth.py b/keystone/tests/unit/test_v3_auth.py index e22511e01c..210a354f90 100644 --- a/keystone/tests/unit/test_v3_auth.py +++ b/keystone/tests/unit/test_v3_auth.py @@ -2847,6 +2847,8 @@ class TestTrustRedelegation(test_v3.RestfulTestCase): # Create first trust with extended set of roles ref = self.redelegated_trust_ref + ref['expires_at'] = datetime.datetime.utcnow().replace( + year=2031).strftime(unit.TIME_FORMAT) ref['roles'].append({'id': role['id']}) r = self.post('/OS-TRUST/trusts', body={'trust': ref}) @@ -2859,6 +2861,9 @@ class TestTrustRedelegation(test_v3.RestfulTestCase): trust_token = self._get_trust_token(trust) # Chain second trust with roles subset + self.chained_trust_ref['expires_at'] = ( + datetime.datetime.utcnow().replace(year=2030).strftime( + unit.TIME_FORMAT)) r = self.post('/OS-TRUST/trusts', body={'trust': self.chained_trust_ref}, token=trust_token) @@ -2879,6 +2884,8 @@ class TestTrustRedelegation(test_v3.RestfulTestCase): expires=dict(minutes=1), role_names=[self.role['name']], allow_redelegation=True) + ref['expires_at'] = datetime.datetime.utcnow().replace( + year=2031).strftime(unit.TIME_FORMAT) r = self.post('/OS-TRUST/trusts', body={'trust': ref}) trust = self.assertValidTrustResponse(r) @@ -2892,6 +2899,8 @@ class TestTrustRedelegation(test_v3.RestfulTestCase): impersonation=True, role_names=[self.role['name']], allow_redelegation=True) + ref['expires_at'] = datetime.datetime.utcnow().replace( + year=2030).strftime(unit.TIME_FORMAT) r = self.post('/OS-TRUST/trusts', body={'trust': ref}, token=trust_token) @@ -2924,12 +2933,18 @@ class TestTrustRedelegation(test_v3.RestfulTestCase): expected_status=http_client.FORBIDDEN) def test_redelegation_terminator(self): + self.redelegated_trust_ref['expires_at'] = ( + datetime.datetime.utcnow().replace(year=2031).strftime( + unit.TIME_FORMAT)) r = self.post('/OS-TRUST/trusts', body={'trust': self.redelegated_trust_ref}) trust = self.assertValidTrustResponse(r) trust_token = self._get_trust_token(trust) # Build second trust - the terminator + self.chained_trust_ref['expires_at'] = ( + datetime.datetime.utcnow().replace(year=2030).strftime( + unit.TIME_FORMAT)) ref = dict(self.chained_trust_ref, redelegation_count=1, allow_redelegation=False) @@ -3834,6 +3849,8 @@ class TestTrustAuth(test_v3.RestfulTestCase): role_ids=[self.role_id]) for i in range(3): + ref['expires_at'] = datetime.datetime.utcnow().replace( + year=2031).strftime(unit.TIME_FORMAT) r = self.post('/OS-TRUST/trusts', body={'trust': ref}) self.assertValidTrustResponse(r, ref) diff --git a/keystone/trust/backends/sql.py b/keystone/trust/backends/sql.py index a017056bf7..29daf5b645 100644 --- a/keystone/trust/backends/sql.py +++ b/keystone/trust/backends/sql.py @@ -45,6 +45,10 @@ class TrustModel(sql.ModelBase, sql.DictBase): expires_at = sql.Column(sql.DateTime) remaining_uses = sql.Column(sql.Integer, nullable=True) extra = sql.Column(sql.JsonBlob()) + __table_args__ = (sql.UniqueConstraint( + 'trustor_user_id', 'trustee_user_id', 'project_id', + 'impersonation', 'expires_at', + name='duplicate_trust_constraint'),) class TrustRole(sql.ModelBase):