Merge "Keystone to Keystone tests"

This commit is contained in:
Zuul 2019-10-25 21:55:49 +00:00 committed by Gerrit Code Review
commit 13a94876e4
2 changed files with 91 additions and 21 deletions

View File

@ -44,8 +44,8 @@ class Saml2Client(object):
headers=self.ECP_SP_EMPTY_REQUEST_HEADERS
)
def _prepare_sp_saml2_authn_response(self, saml2_idp_authn_response,
relay_state):
def prepare_sp_saml2_authn_response(self, saml2_idp_authn_response,
relay_state):
# Replace the header contents of the Identity Provider response with
# the relay state initially sent by the Service Provider. The response
# is a SOAP envelope with the following structure:
@ -72,10 +72,7 @@ class Saml2Client(object):
)
def send_service_provider_saml2_authn_response(
self, saml2_idp_authn_response, relay_state, idp_consumer_url):
self._prepare_sp_saml2_authn_response(
saml2_idp_authn_response, relay_state)
self, saml2_idp_authn_response, idp_consumer_url):
return self.session.post(
idp_consumer_url,

View File

@ -12,8 +12,10 @@
# License for the specific language governing permissions and limitations
# under the License.
import json
from lxml import etree
from six.moves import http_client
from six.moves import urllib
from tempest import config
from tempest.lib.common.utils import data_utils
import testtools
@ -42,16 +44,26 @@ class TestSaml2EcpFederatedAuthentication(base.BaseIdentityTest):
def _setup_settings(self):
self.idp_id = CONF.fed_scenario.idp_id
self.idp_remote_ids = CONF.fed_scenario.idp_remote_ids
self.idp_url = CONF.fed_scenario.idp_ecp_url
self.keystone_v3_endpoint = CONF.identity.uri_v3
self.password = CONF.fed_scenario.idp_password
self.protocol_id = CONF.fed_scenario.protocol_id
self.username = CONF.fed_scenario.idp_username
self.mapping_remote_type = CONF.fed_scenario.mapping_remote_type
self.mapping_user_name = CONF.fed_scenario.mapping_user_name
self.mapping_group_name = CONF.fed_scenario.mapping_group_name
self.mapping_group_domain_name = \
CONF.fed_scenario.mapping_group_domain_name
# NOTE(knikolla): Authentication endpoint for keystone. If not set,
# will be autodetected.
self.auth_url = None
def _setup_idp(self):
remote_ids = CONF.fed_scenario.idp_remote_ids
idp = self.idps_client.create_identity_provider(
self.idp_id, remote_ids=remote_ids, enabled=True)
self.idp_id, remote_ids=self.idp_remote_ids, enabled=True)
self.addCleanup(
self.keystone_manager.domains_client.delete_domain,
idp['identity_provider']['domain_id'])
@ -63,26 +75,21 @@ class TestSaml2EcpFederatedAuthentication(base.BaseIdentityTest):
def _setup_mapping(self):
self.mapping_id = data_utils.rand_uuid_hex()
mapping_remote_type = CONF.fed_scenario.mapping_remote_type
mapping_user_name = CONF.fed_scenario.mapping_user_name
mapping_group_name = CONF.fed_scenario.mapping_group_name
mapping_group_domain_name = CONF.fed_scenario.mapping_group_domain_name
rules = [{
'local': [
{
'user': {'name': mapping_user_name}
'user': {'name': self.mapping_user_name}
},
{
'group': {
'domain': {'name': mapping_group_domain_name},
'name': mapping_group_name
'domain': {'name': self.mapping_group_domain_name},
'name': self.mapping_group_name
}
}
],
'remote': [
{
'type': mapping_remote_type
'type': self.mapping_remote_type
}
]
}]
@ -116,7 +123,7 @@ class TestSaml2EcpFederatedAuthentication(base.BaseIdentityTest):
self.assertEqual(1, len(l))
return l[0]
def _request_unscoped_token(self):
def _get_sp_authn_request(self):
resp = self.saml2_client.send_service_provider_request(
self.keystone_v3_endpoint, self.idp_id, self.protocol_id)
self.assertEqual(http_client.OK, resp.status_code)
@ -140,19 +147,33 @@ class TestSaml2EcpFederatedAuthentication(base.BaseIdentityTest):
# have the same consumer URL.
self.assertEqual(sp_consumer_url, idp_consumer_url)
# Present the identity provider authn response to the service provider.
self.saml2_client.prepare_sp_saml2_authn_response(
saml2_idp_authn_response, relay_state)
return saml2_idp_authn_response, sp_consumer_url
def _request_unscoped_token(self):
assertion, sp_url = self._get_sp_authn_request()
# Present the identity provider authn response to the service provider
resp = self.saml2_client.send_service_provider_saml2_authn_response(
saml2_idp_authn_response, relay_state, idp_consumer_url)
assertion, sp_url)
# Must receive a redirect from service provider to the URL where the
# unscoped token can be retrieved.
self.assertIn(resp.status_code,
[http_client.FOUND, http_client.SEE_OTHER])
# If this is K2K, don't follow HTTP specs - after the HTTP 302/303
# response don't repeat the call directed to the Location URL. In this
# case, this is an indication that SAML2 session is now active and
# protected resource can be accessed.
# https://opendev.org/openstack/keystoneauth/src/tag/3.17.1/keystoneauth1/identity/v3/k2k.py#L152
sp_url = self.auth_url or resp.headers['location']
# We can receive multiple types of errors here, the response depends on
# the mapping and the username used to authenticate in the Identity
# Provider and also in the Identity Provider remote ID validation.
# If everything works well, we receive an unscoped token.
sp_url = resp.headers['location']
resp = (
self.saml2_client.send_service_provider_unscoped_token_request(
sp_url))
@ -180,3 +201,55 @@ class TestSaml2EcpFederatedAuthentication(base.BaseIdentityTest):
# Get a scoped token to one of the listed projects
self.tokens_client.auth(
project_id=projects[0]['id'], token=token_id)
class TestK2KFederatedAuthentication(TestSaml2EcpFederatedAuthentication):
def setUp(self):
super(TestK2KFederatedAuthentication, self).setUp()
self._setup_sp()
def _setup_settings(self):
super(TestK2KFederatedAuthentication, self)._setup_settings()
self.idp_id = 'keystone'
self.idp_remote_ids = [
'%s/OS-FEDERATION/saml2/idp' % self.keystone_v3_endpoint]
self.mapping_remote_type = 'openstack_user'
self.sp_id = 'keystone'
self.auth_url = (
'%s/OS-FEDERATION/identity_providers/%s/protocols/%s/auth'
) % (self.keystone_v3_endpoint, self.sp_id, self.protocol_id)
url = urllib.parse.urlparse(self.keystone_v3_endpoint)
self.sp_url = '%s://%s/Shibboleth.sso/SAML2/ECP' % (url.scheme,
url.netloc)
def _setup_sp(self):
self.sps_client.create_service_provider(self.sp_id,
sp_url=self.sp_url,
auth_url=self.auth_url,
enabled=True)
self.addCleanup(self.sps_client.delete_service_provider, self.sp_id)
def _get_sp_authn_request(self):
body = {
'auth': {
'identity': {
'methods': ['token'],
'token': {
'id': self.auth_client.token
}
},
'scope': {
'service_provider': {
'id': self.sp_id
}
}
}
}
resp, saml = self.auth_client.post('auth/OS-FEDERATION/saml2/ecp',
json.dumps(body))
self.auth_client.expected_success(200, resp.status)
return etree.XML(saml), self.sp_url