Add service providers integration tests

This patch adds the tests for the Service Provider API (part of
the Federated Identity API).

To run the tests install keystone and run (in tempest):

    $ tox -e all-plugin -- keystone

Change-Id: I6d6f44736e4187dd2a500c7c0b6715e52296a9b3
This commit is contained in:
Rodrigo Duarte 2016-04-08 11:53:43 -03:00
parent 45f5fc96b0
commit d96e29cbe9
7 changed files with 348 additions and 17 deletions

View File

@ -14,6 +14,8 @@
from keystone_tempest_plugin.services.identity.v3 import (
identity_providers_client)
from keystone_tempest_plugin.services.identity.v3 import (
service_providers_client)
from tempest import clients
@ -26,3 +28,6 @@ class Manager(clients.Manager):
self.identity_providers_client = (
identity_providers_client.IdentityProvidersClient(
self.auth_provider))
self.service_providers_client = (
service_providers_client.ServiceProvidersClient(
self.auth_provider))

View File

@ -34,3 +34,30 @@ class Identity(rest_client.RestClient):
SERVICE_TYPE,
CONF.identity.region,
endpoint_type='adminURL')
class Federation(Identity):
"""Tempest REST client for keystone's Federated Identity API."""
subpath_prefix = 'OS-FEDERATION'
subpath_suffix = None
def _build_path(self, entity_id=None):
subpath = '%s/%s' % (self.subpath_prefix, self.subpath_suffix)
return '%s/%s' % (subpath, entity_id) if entity_id else subpath
def _delete(self, entity_id, **kwargs):
url = self._build_path(entity_id)
return super(Federation, self).delete(url, **kwargs)
def _get(self, entity_id=None, **kwargs):
url = self._build_path(entity_id)
return super(Federation, self).get(url, **kwargs)
def _patch(self, entity_id, body, **kwargs):
url = self._build_path(entity_id)
return super(Federation, self).patch(url, body, **kwargs)
def _put(self, entity_id, body, **kwargs):
url = self._build_path(entity_id)
return super(Federation, self).put(url, body, **kwargs)

View File

@ -19,12 +19,9 @@ from tempest.lib.common import rest_client
from keystone_tempest_plugin.services.identity import clients
class IdentityProvidersClient(clients.Identity):
class IdentityProvidersClient(clients.Federation):
subpath = 'OS-FEDERATION/identity_providers'
def _build_path(self, idp_id=None):
return '%s/%s' % (self.subpath, idp_id) if idp_id else self.subpath
subpath_suffix = 'identity_providers'
def create_identity_provider(self, idp_id, **kwargs):
"""Create an identity provider.
@ -34,33 +31,28 @@ class IdentityProvidersClient(clients.Identity):
(boolean) and remote_ids (list).
"""
put_body = json.dumps({'identity_provider': kwargs})
url = self._build_path(idp_id)
resp, body = self.put(url, put_body)
resp, body = self._put(idp_id, put_body)
self.expected_success(201, resp.status)
body = json.loads(body)
idp = rest_client.ResponseBody(resp, body)
return idp
return rest_client.ResponseBody(resp, body)
def list_identity_providers(self):
"""List the identity providers."""
url = self._build_path()
resp, body = self.get(url)
resp, body = self._get()
self.expected_success(200, resp.status)
body = json.loads(body)
return rest_client.ResponseBody(resp, body)
def show_identity_provider(self, idp_id):
"""Get an identity provider."""
url = self._build_path(idp_id)
resp, body = self.get(url)
resp, body = self._get(idp_id)
self.expected_success(200, resp.status)
body = json.loads(body)
return rest_client.ResponseBody(resp, body)
def delete_identity_provider(self, idp_id):
"""Delete an identity provider."""
url = self._build_path(idp_id)
resp, body = self.delete(url)
resp, body = self._delete(idp_id)
self.expected_success(204, resp.status)
return rest_client.ResponseBody(resp, body)
@ -72,8 +64,7 @@ class IdentityProvidersClient(clients.Identity):
enabled (boolean) and remote_ids (list).
"""
patch_body = json.dumps({'identity_provider': kwargs})
url = self._build_path(idp_id)
resp, body = self.patch(url, patch_body)
resp, body = self._patch(idp_id, patch_body)
self.expected_success(200, resp.status)
body = json.loads(body)
return rest_client.ResponseBody(resp, body)

View File

@ -0,0 +1,89 @@
# Copyright 2016 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_serialization import jsonutils
from tempest.lib.common import rest_client
from keystone_tempest_plugin.services.identity import clients
class ServiceProvidersClient(clients.Federation):
subpath_suffix = 'service_providers'
def create_service_provider(self, sp_id, **kwargs):
"""Create a service provider.
:param str sp_id: The ID to be used to create the Service Provider.
:param kwargs: Extra attributes. Mandatory: auth_url (str) and sp_url
(str). Optional: description (str), enabled (boolean)
and relay_state_prefix (str).
"""
put_body = jsonutils.dumps({'service_provider': kwargs})
resp, body = self._put(sp_id, put_body)
self.expected_success(201, resp.status)
body = jsonutils.loads(body)
return rest_client.ResponseBody(resp, body)
def list_service_providers(self):
"""List the service providers."""
resp, body = self._get()
self.expected_success(200, resp.status)
body = jsonutils.loads(body)
return rest_client.ResponseBody(resp, body)
def show_service_provider(self, sp_id):
"""Get a service provider."""
resp, body = self._get(sp_id)
self.expected_success(200, resp.status)
body = jsonutils.loads(body)
return rest_client.ResponseBody(resp, body)
def delete_service_provider(self, sp_id):
"""Delete a service provider."""
resp, body = self._delete(sp_id)
self.expected_success(204, resp.status)
return rest_client.ResponseBody(resp, body)
def update_service_provider(self, sp_id, **kwargs):
"""Update a service provider.
:param str sp_id: The ID of the Service Provider to be updated.
:param kwargs: All attributes to be updated: auth_url (str) and sp_url
(str), description (str), enabled (boolean) and
relay_state_prefix (str).
"""
patch_body = jsonutils.dumps({'service_provider': kwargs})
resp, body = self._patch(sp_id, patch_body)
self.expected_success(200, resp.status)
body = jsonutils.loads(body)
return rest_client.ResponseBody(resp, body)
def get_service_providers_in_token(self):
"""Get the service providers list present in the token.
Only enabled service providers are displayed in the token.
"""
# First we force the auth_data update via the set_auth() command
# in the auth_provider
self.auth_provider.set_auth()
# Now we can retrieve the updated auth_data
auth_data = self.auth_provider.get_auth()[1]
try:
return auth_data['service_providers']
except KeyError:
# no service providers in token
return []

View File

@ -34,3 +34,4 @@ class BaseIdentityTest(test.BaseTestCase):
cls.credential_type, identity_version=cls.identity_version)
cls.keystone_manager = clients.Manager(credentials=credentials)
cls.idps_client = cls.keystone_manager.identity_providers_client
cls.sps_client = cls.keystone_manager.service_providers_client

View File

@ -26,3 +26,18 @@ def idp_ref(enabled=None, remote_ids=None):
ref['remote_ids'] = remote_ids
return ref
def sp_ref(enabled=None, relay_state_prefix=None):
ref = {
'auth_url': data_utils.rand_url(),
'description': data_utils.rand_uuid_hex(),
'sp_url': data_utils.rand_url(),
}
if enabled:
ref['enabled'] = enabled
if relay_state_prefix:
ref['relay_state_prefix'] = relay_state_prefix
return ref

View File

@ -0,0 +1,203 @@
# Copyright 2016 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc
from tempest import test
from keystone_tempest_plugin.tests.api.identity import base
from keystone_tempest_plugin.tests.api.identity.v3 import fixtures
DEFAULT_RELAY_STATE_PREFIX = 'ss:mem:'
class ServiceProvidersTest(base.BaseIdentityTest):
def _assert_service_provider_attributes(self, sp, sp_id, sp_ref=None):
self.assertIn('id', sp)
self.assertEqual(sp_id, sp['id'])
self.assertIn('auth_url', sp)
self.assertIn('sp_url', sp)
# Check the optional attributes have been set
self.assertIn('description', sp)
self.assertIn('enabled', sp)
self.assertIn('relay_state_prefix', sp)
if sp_ref:
self.assertEqual(sp_ref['auth_url'], sp['auth_url'])
self.assertEqual(sp_ref['sp_url'], sp['sp_url'])
self.assertEqual(sp_ref['description'], sp['description'])
if 'enabled' in sp_ref:
self.assertEqual(sp_ref['enabled'], sp['enabled'])
if 'relay_state_prefix' in sp_ref:
self.assertEqual(
sp_ref['relay_state_prefix'], sp['relay_state_prefix'])
def _add_cleanup(self, sp_id):
self.addCleanup(
self.sps_client.delete_service_provider, sp_id)
def _create_sp(self, sp_id, sp_ref):
sp = self.sps_client.create_service_provider(
sp_id, **sp_ref)['service_provider']
self.addCleanup(self.sps_client.delete_service_provider, sp_id)
return sp
@decorators.idempotent_id('6fae0971-5acb-4559-ba25-96f1fd7e5385')
def test_service_provider_create(self):
sp_id = data_utils.rand_uuid_hex()
sp_ref = fixtures.sp_ref()
sp = self._create_sp(sp_id, sp_ref)
# The service provider is disabled by default
sp_ref['enabled'] = False
# The relay_state_prefix should have been set to the default value
sp_ref['relay_state_prefix'] = DEFAULT_RELAY_STATE_PREFIX
self._assert_service_provider_attributes(sp, sp_id, sp_ref)
@test.attr(type=['negative'])
@decorators.idempotent_id('d9d7454c-50b7-4966-aedb-b9d520a41409')
def test_service_provider_create_without_mandatory_attributes(self):
sp_id = data_utils.rand_uuid_hex()
self.assertRaises(
lib_exc.BadRequest,
self.sps_client.create_service_provider,
sp_id)
@test.attr(type=['negative'])
@decorators.idempotent_id('f77ed1c0-c428-44a7-9364-e8e4362c360a')
def test_service_provider_create_with_bad_attributes(self):
sp_id = data_utils.rand_uuid_hex()
sp_ref = fixtures.sp_ref()
# The auth_url must follow a URL regex
sp_ref['auth_url'] = data_utils.rand_uuid_hex()
self.assertRaises(
lib_exc.BadRequest,
self.sps_client.create_service_provider,
sp_id,
**sp_ref)
sp_ref = fixtures.sp_ref()
# The sp_url must follow a URL regex
sp_ref['sp_url'] = data_utils.rand_uuid_hex()
self.assertRaises(
lib_exc.BadRequest,
self.sps_client.create_service_provider,
sp_id,
**sp_ref)
@decorators.idempotent_id('8550b419-f212-4e34-a8fa-7ff64f8a7fd3')
def test_service_provider_create_with_enabled_true(self):
sp_id = data_utils.rand_uuid_hex()
sp_ref = fixtures.sp_ref(enabled=True)
sp = self._create_sp(sp_id, sp_ref)
self._assert_service_provider_attributes(sp, sp_id, sp_ref)
@decorators.idempotent_id('0e319a14-1548-474e-a406-273c6b1c1f2d')
def test_service_provider_create_with_relay_state_prefix(self):
sp_id = data_utils.rand_uuid_hex()
sp_ref = fixtures.sp_ref(
enabled=True, relay_state_prefix=data_utils.rand_uuid_hex())
sp = self._create_sp(sp_id, sp_ref)
self._assert_service_provider_attributes(sp, sp_id, sp_ref)
@decorators.idempotent_id('7df78c7a-9265-4b4f-9630-193b7f07d9eb')
def test_service_provider_get(self):
sp_id = data_utils.rand_uuid_hex()
sp_create = self._create_sp(sp_id, fixtures.sp_ref())
sp_get = self.sps_client.show_service_provider(sp_id)[
'service_provider']
self._assert_service_provider_attributes(sp_get, sp_id, sp_create)
@decorators.idempotent_id('9237cea0-fbeb-4d64-8347-46c567e1d78f')
def test_service_provider_list(self):
sp_ids = []
for _ in range(3):
sp_id = data_utils.rand_uuid_hex()
self._create_sp(sp_id, fixtures.sp_ref())
sp_ids.append(sp_id)
sp_list = self.sps_client.list_service_providers()['service_providers']
fetched_ids = [fetched_sp['id'] for fetched_sp in sp_list]
for sp_id in sp_ids:
self.assertIn(sp_id, fetched_ids)
@decorators.idempotent_id('bb68653f-fbba-4f20-ac1b-7b318a557366')
def test_service_provider_update(self):
sp_id = data_utils.rand_uuid_hex()
sp = self._create_sp(sp_id, fixtures.sp_ref(enabled=True))
# The service provider should be enabled
self.assertTrue(sp['enabled'])
sp = self.sps_client.update_service_provider(
sp_id, enabled=False)['service_provider']
# The service provider should be now disabled
self.assertFalse(sp['enabled'])
@test.attr(type=['negative'])
@decorators.idempotent_id('91ce1183-1a15-4598-ae5f-85cfa98a1c77')
def test_service_provider_update_with_bad_attributes(self):
sp_id = data_utils.rand_uuid_hex()
self._create_sp(sp_id, fixtures.sp_ref())
# The auth_url must follow a URL regex
self.assertRaises(
lib_exc.BadRequest,
self.sps_client.update_service_provider,
sp_id,
auth_url=data_utils.rand_uuid_hex())
# The sp_url must follow a URL regex
self.assertRaises(
lib_exc.BadRequest,
self.sps_client.update_service_provider,
sp_id,
auth_url=data_utils.rand_uuid_hex())
@decorators.idempotent_id('7553579b-9a9e-45dd-9ada-70d906b516c0')
def test_service_providers_in_token(self):
# Create some enabled service providers
enabled_sps = []
for _ in range(2):
sp_id = data_utils.rand_uuid_hex()
self._create_sp(sp_id, fixtures.sp_ref(enabled=True))
enabled_sps.append(sp_id)
# Create some disabled service providers
for _ in range(2):
sp_id = data_utils.rand_uuid_hex()
self._create_sp(sp_id, fixtures.sp_ref(enabled=False))
sps_in_token_ids = [
sp['id'] for sp in
self.sps_client.get_service_providers_in_token()]
# Should be equal to the enabled_sps list
self.assertItemsEqual(enabled_sps, sps_in_token_ids)