Validate mapping exists when creating/updating a protocol

This patch validates that a mapping exists when adding or updating
a federation protocol.

A conflict was resolved in keystone/federation/core.py since
stable/newton has deprecated support for version drivers. Which
doesn't exist in stable/ocata or master.

Change-Id: I996f94d26eb0f2c679542ba13a03bbaa4442486a
Closes-Bug: #1571878
This commit is contained in:
Ronald De Rose 2016-08-29 20:13:35 +00:00 committed by Lance Bragstad
parent 05a129e545
commit 057d585268
7 changed files with 141 additions and 8 deletions

View File

@ -182,6 +182,7 @@ Add a protocol and attribute mapping to an identity provider
Relationship: ``http://docs.openstack.org/api/openstack-identity/3/ext/OS-FEDERATION/1.0/rel/identity_provider_protocol``
Normal response codes: 201
Error response codes: 400
Request
-------
@ -283,6 +284,7 @@ Update the attribute mapping for an identity provider and protocol
Relationship: ``http://docs.openstack.org/api/openstack-identity/3/ext/OS-FEDERATION/1.0/rel/identity_provider_protocol``
Normal response codes: 200
Error response codes: 400
Request
-------

View File

@ -22,6 +22,7 @@ import keystone.conf
from keystone import exception
from keystone.federation.backends import base
from keystone.federation import utils
from keystone.i18n import _
# This is a general cache region for service providers.
@ -114,6 +115,21 @@ class Manager(manager.Manager):
mapped_properties = rule_processor.process(assertion_data)
return mapped_properties, mapping['id']
def create_protocol(self, idp_id, protocol_id, protocol):
self._validate_mapping_exists(protocol['mapping_id'])
return self.driver.create_protocol(idp_id, protocol_id, protocol)
def update_protocol(self, idp_id, protocol_id, protocol):
self._validate_mapping_exists(protocol['mapping_id'])
return self.driver.update_protocol(idp_id, protocol_id, protocol)
def _validate_mapping_exists(self, mapping_id):
try:
self.driver.get_mapping(mapping_id)
except exception.MappingNotFound:
msg = _('Invalid mapping id: %s')
raise exception.ValidationError(message=(msg % mapping_id))
@versionutils.deprecated(
versionutils.deprecated.NEWTON,

View File

@ -0,0 +1,92 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from keystone import exception
from keystone.tests import unit
from keystone.tests.unit.ksfixtures import database
from keystone.tests.unit import mapping_fixtures
class TestFederationProtocol(unit.TestCase):
def setUp(self):
super(TestFederationProtocol, self).setUp()
self.useFixture(database.Database())
self.load_backends()
self.idp = {
'id': uuid.uuid4().hex,
'enabled': True,
'description': uuid.uuid4().hex
}
self.federation_api.create_idp(self.idp['id'], self.idp)
self.mapping = mapping_fixtures.MAPPING_EPHEMERAL_USER
self.mapping['id'] = uuid.uuid4().hex
self.federation_api.create_mapping(self.mapping['id'],
self.mapping)
def test_create_protocol(self):
protocol = {
'id': uuid.uuid4().hex,
'mapping_id': self.mapping['id']
}
protocol_ret = self.federation_api.create_protocol(self.idp['id'],
protocol['id'],
protocol)
self.assertEqual(protocol['id'], protocol_ret['id'])
def test_create_protocol_with_invalid_mapping_id(self):
protocol = {
'id': uuid.uuid4().hex,
'mapping_id': uuid.uuid4().hex
}
self.assertRaises(exception.ValidationError,
self.federation_api.create_protocol,
self.idp['id'],
protocol['id'],
protocol)
def test_update_protocol(self):
protocol = {
'id': uuid.uuid4().hex,
'mapping_id': self.mapping['id']
}
protocol_ret = self.federation_api.create_protocol(self.idp['id'],
protocol['id'],
protocol)
self.assertEqual(protocol['id'], protocol_ret['id'])
new_mapping = mapping_fixtures.MAPPING_EPHEMERAL_USER
new_mapping['id'] = uuid.uuid4().hex
self.federation_api.create_mapping(new_mapping['id'], new_mapping)
protocol['mapping_id'] = new_mapping['id']
protocol_ret = self.federation_api.update_protocol(self.idp['id'],
protocol['id'],
protocol)
self.assertEqual(protocol['id'], protocol_ret['id'])
self.assertEqual(new_mapping['id'], protocol_ret['mapping_id'])
def test_update_protocol_with_invalid_mapping_id(self):
protocol = {
'id': uuid.uuid4().hex,
'mapping_id': self.mapping['id']
}
protocol_ret = self.federation_api.create_protocol(self.idp['id'],
protocol['id'],
protocol)
self.assertEqual(protocol['id'], protocol_ret['id'])
protocol['mapping_id'] = uuid.uuid4().hex
self.assertRaises(exception.ValidationError,
self.federation_api.update_protocol,
self.idp['id'],
protocol['id'],
protocol)

View File

@ -863,6 +863,7 @@ class FederatedIdentityProviderTests(test_v3.RestfulTestCase):
proto = uuid.uuid4().hex
if mapping_id is None:
mapping_id = uuid.uuid4().hex
self._create_mapping(mapping_id)
body = {'mapping_id': mapping_id}
url = url % {'idp_id': idp_id, 'protocol_id': proto}
resp = self.put(url, body={'protocol': body}, **kwargs)
@ -874,11 +875,19 @@ class FederatedIdentityProviderTests(test_v3.RestfulTestCase):
return (resp, idp_id, proto)
def _get_protocol(self, idp_id, protocol_id):
url = "%s/protocols/%s" % (idp_id, protocol_id)
url = '%s/protocols/%s' % (idp_id, protocol_id)
url = self.base_url(suffix=url)
r = self.get(url)
return r
def _create_mapping(self, mapping_id):
mapping = mapping_fixtures.MAPPING_EPHEMERAL_USER
mapping['id'] = mapping_id
url = '/OS-FEDERATION/mappings/%s' % mapping_id
self.put(url,
body={'mapping': mapping},
expected_status=http_client.CREATED)
def test_create_idp(self):
"""Create the IdentityProvider entity associated to remote_ids."""
keys_to_check = list(self.idp_keys)
@ -1397,6 +1406,7 @@ class FederatedIdentityProviderTests(test_v3.RestfulTestCase):
resp, idp_id, proto = self._assign_protocol_to_idp(
expected_status=http_client.CREATED)
new_mapping_id = uuid.uuid4().hex
self._create_mapping(mapping_id=new_mapping_id)
url = "%s/protocols/%s" % (idp_id, proto)
url = self.base_url(suffix=url)

View File

@ -14,6 +14,7 @@
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc
from keystone_tempest_plugin.tests.api.identity import base
from keystone_tempest_plugin.tests.api.identity.v3 import fixtures
@ -217,9 +218,12 @@ class IndentityProvidersTest(base.BaseIdentityTest):
# a non existent mapping ID
mapping_id = data_utils.rand_uuid_hex()
protocol_id = data_utils.rand_uuid_hex()
protocol = self._create_protocol(idp_id, protocol_id, mapping_id)
self._assert_protocol_attributes(protocol, protocol_id, mapping_id)
self.assertRaises(
lib_exc.BadRequest,
self._create_protocol,
idp_id,
protocol_id,
mapping_id)
@decorators.idempotent_id('c73311e7-c207-4c11-998f-532a91f1b0d1')
def test_update_protocol_from_identity_provider_unknown_mapping(self):
@ -232,7 +236,9 @@ class IndentityProvidersTest(base.BaseIdentityTest):
# Update the identity provider protocol using a non existent
# mapping_id
new_mapping_id = data_utils.rand_uuid_hex()
protocol = self.idps_client.update_protocol_mapping(
idp_id, protocol_id, new_mapping_id)['protocol']
self._assert_protocol_attributes(protocol, protocol_id, new_mapping_id)
self.assertRaises(
lib_exc.BadRequest,
self.idps_client.update_protocol_mapping,
idp_id,
protocol_id,
new_mapping_id)

View File

@ -0,0 +1,7 @@
---
fixes:
- >
[`bug 1571878 <https://bugs.launchpad.net/keystone/+bug/1571878>`_]
A valid ``mapping_id`` is now required when creating or updating a
federation protocol. If the ``mapping_id`` does not exist, a
``400 - Bad Request`` will be returned.