# Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import copy import fixtures import uuid from keystoneauth1 import exceptions from keystoneauth1 import fixture from keystoneauth1.identity import v3 from keystoneauth1 import session from keystoneauth1.tests.unit import k2k_fixtures from testtools import matchers from keystoneclient import access from keystoneclient.tests.unit.v3 import utils from keystoneclient.v3 import client from keystoneclient.v3.contrib.federation import base from keystoneclient.v3.contrib.federation import identity_providers from keystoneclient.v3.contrib.federation import mappings from keystoneclient.v3.contrib.federation import protocols from keystoneclient.v3.contrib.federation import service_providers from keystoneclient.v3 import domains from keystoneclient.v3 import projects class IdentityProviderTests(utils.ClientTestCase, utils.CrudTests): def setUp(self): super(IdentityProviderTests, self).setUp() self.key = 'identity_provider' self.collection_key = 'identity_providers' self.model = identity_providers.IdentityProvider self.manager = self.client.federation.identity_providers self.path_prefix = 'OS-FEDERATION' def new_ref(self, **kwargs): kwargs.setdefault('id', uuid.uuid4().hex) kwargs.setdefault('description', uuid.uuid4().hex) kwargs.setdefault('enabled', True) return kwargs def test_positional_parameters_expect_fail(self): """Ensure CrudManager raises TypeError exceptions. After passing wrong number of positional arguments an exception should be raised. Operations to be tested: * create() * get() * list() * delete() * update() """ POS_PARAM_1 = uuid.uuid4().hex POS_PARAM_2 = uuid.uuid4().hex POS_PARAM_3 = uuid.uuid4().hex PARAMETERS = { 'create': (POS_PARAM_1, POS_PARAM_2), 'get': (POS_PARAM_1, POS_PARAM_2), 'list': (POS_PARAM_1, POS_PARAM_2), 'update': (POS_PARAM_1, POS_PARAM_2, POS_PARAM_3), 'delete': (POS_PARAM_1, POS_PARAM_2) } for f_name, args in PARAMETERS.items(): self.assertRaises(TypeError, getattr(self.manager, f_name), *args) def test_create(self): ref = self.new_ref() req_ref = ref.copy() req_ref.pop('id') self.stub_entity('PUT', entity=ref, id=ref['id'], status_code=201) returned = self.manager.create(**ref) self.assertIsInstance(returned, self.model) for attr in req_ref: self.assertEqual( getattr(returned, attr), req_ref[attr], 'Expected different %s' % attr) self.assertEntityRequestBodyIs(req_ref) class MappingTests(utils.ClientTestCase, utils.CrudTests): def setUp(self): super(MappingTests, self).setUp() self.key = 'mapping' self.collection_key = 'mappings' self.model = mappings.Mapping self.manager = self.client.federation.mappings self.path_prefix = 'OS-FEDERATION' def new_ref(self, **kwargs): kwargs.setdefault('id', uuid.uuid4().hex) kwargs.setdefault('rules', [uuid.uuid4().hex, uuid.uuid4().hex]) return kwargs def test_create(self): ref = self.new_ref() manager_ref = ref.copy() mapping_id = manager_ref.pop('id') req_ref = ref.copy() self.stub_entity('PUT', entity=req_ref, id=mapping_id, status_code=201) returned = self.manager.create(mapping_id=mapping_id, **manager_ref) self.assertIsInstance(returned, self.model) for attr in req_ref: self.assertEqual( getattr(returned, attr), req_ref[attr], 'Expected different %s' % attr) self.assertEntityRequestBodyIs(manager_ref) class ProtocolTests(utils.ClientTestCase, utils.CrudTests): def setUp(self): super(ProtocolTests, self).setUp() self.key = 'protocol' self.collection_key = 'protocols' self.model = protocols.Protocol self.manager = self.client.federation.protocols self.path_prefix = 'OS-FEDERATION/identity_providers' def _transform_to_response(self, ref): """Construct a response body from a dictionary.""" response = copy.deepcopy(ref) del response['identity_provider'] return response def new_ref(self, **kwargs): kwargs.setdefault('id', uuid.uuid4().hex) kwargs.setdefault('mapping_id', uuid.uuid4().hex) kwargs.setdefault('identity_provider', uuid.uuid4().hex) return kwargs def build_parts(self, idp_id, protocol_id=None): """Build array used to construct mocking URL. Construct and return array with URL parts later used by methods like utils.TestCase.stub_entity(). Example of URL: ``OS-FEDERATION/identity_providers/{idp_id}/ protocols/{protocol_id}`` """ parts = ['OS-FEDERATION', 'identity_providers', idp_id, 'protocols'] if protocol_id: parts.append(protocol_id) return parts def test_build_url_provide_base_url(self): base_url = uuid.uuid4().hex parameters = {'base_url': base_url} url = self.manager.build_url(dict_args_in_out=parameters) self.assertEqual('/'.join([base_url, self.collection_key]), url) def test_build_url_w_idp_id(self): """Test whether kwargs ``base_url`` discards object's base_url. This test shows, that when ``base_url`` is specified in the dict_args_in_out dictionary, values like ``identity_provider_id`` are not taken into consideration while building the url. """ base_url, identity_provider_id = uuid.uuid4().hex, uuid.uuid4().hex parameters = { 'base_url': base_url, 'identity_provider_id': identity_provider_id } url = self.manager.build_url(dict_args_in_out=parameters) self.assertEqual('/'.join([base_url, self.collection_key]), url) def test_build_url_default_base_url(self): identity_provider_id = uuid.uuid4().hex parameters = { 'identity_provider_id': identity_provider_id } url = self.manager.build_url(dict_args_in_out=parameters) self.assertEqual( '/'.join([self.manager.base_url, identity_provider_id, self.manager.collection_key]), url) def test_create(self): """Test creating federation protocol tied to an Identity Provider. URL to be tested: PUT /OS-FEDERATION/identity_providers/ $identity_provider/protocols/$protocol """ ref = self.new_ref() expected = self._transform_to_response(ref) parts = self.build_parts( idp_id=ref['identity_provider'], protocol_id=ref['id']) self.stub_entity('PUT', entity=expected, parts=parts, status_code=201) returned = self.manager.create( protocol_id=ref['id'], identity_provider=ref['identity_provider'], mapping=ref['mapping_id']) self.assertEqual(expected, returned.to_dict()) request_body = {'mapping_id': ref['mapping_id']} self.assertEntityRequestBodyIs(request_body) def test_get(self): """Fetch federation protocol object. URL to be tested: GET /OS-FEDERATION/identity_providers/ $identity_provider/protocols/$protocol """ ref = self.new_ref() expected = self._transform_to_response(ref) parts = self.build_parts( idp_id=ref['identity_provider'], protocol_id=ref['id']) self.stub_entity('GET', entity=expected, parts=parts, status_code=201) returned = self.manager.get(ref['identity_provider'], ref['id']) self.assertIsInstance(returned, self.model) self.assertEqual(expected, returned.to_dict()) def test_delete(self): """Delete federation protocol object. URL to be tested: DELETE /OS-FEDERATION/identity_providers/ $identity_provider/protocols/$protocol """ ref = self.new_ref() parts = self.build_parts( idp_id=ref['identity_provider'], protocol_id=ref['id']) self.stub_entity('DELETE', parts=parts, status_code=204) self.manager.delete(ref['identity_provider'], ref['id']) def test_list(self): """Test listing all federation protocols tied to the Identity Provider. URL to be tested: GET /OS-FEDERATION/identity_providers/ $identity_provider/protocols """ def _ref_protocols(): return { 'id': uuid.uuid4().hex, 'mapping_id': uuid.uuid4().hex } ref = self.new_ref() expected = [_ref_protocols() for _ in range(3)] parts = self.build_parts(idp_id=ref['identity_provider']) self.stub_entity('GET', parts=parts, entity=expected, status_code=200) returned = self.manager.list(ref['identity_provider']) for obj, ref_obj in zip(returned, expected): self.assertEqual(obj.to_dict(), ref_obj) def test_list_by_id(self): # The test in the parent class needs to be overridden because it # assumes globally unique IDs, which is not the case with protocol IDs # (which are contextualized per identity provider). ref = self.new_ref() super(ProtocolTests, self).test_list_by_id( ref=ref, identity_provider=ref['identity_provider'], id=ref['id']) def test_list_params(self): request_args = self.new_ref() filter_kwargs = {uuid.uuid4().hex: uuid.uuid4().hex} parts = self.build_parts(request_args['identity_provider']) # Return HTTP 401 as we don't accept such requests. self.stub_entity('GET', parts=parts, status_code=401) self.assertRaises(exceptions.Unauthorized, self.manager.list, request_args['identity_provider'], **filter_kwargs) self.assertQueryStringContains(**filter_kwargs) def test_update(self): """Test updating federation protocol. URL to be tested: PATCH /OS-FEDERATION/identity_providers/ $identity_provider/protocols/$protocol """ ref = self.new_ref() expected = self._transform_to_response(ref) parts = self.build_parts( idp_id=ref['identity_provider'], protocol_id=ref['id']) self.stub_entity('PATCH', parts=parts, entity=expected, status_code=200) returned = self.manager.update(ref['identity_provider'], ref['id'], mapping=ref['mapping_id']) self.assertIsInstance(returned, self.model) self.assertEqual(expected, returned.to_dict()) request_body = {'mapping_id': ref['mapping_id']} self.assertEntityRequestBodyIs(request_body) class EntityManagerTests(utils.ClientTestCase): def test_create_object_expect_fail(self): self.assertRaises(TypeError, base.EntityManager, self.client) class FederationProjectTests(utils.ClientTestCase): def setUp(self): super(FederationProjectTests, self).setUp() self.key = 'project' self.collection_key = 'projects' self.model = projects.Project self.manager = self.client.federation.projects self.URL = "%s%s" % (self.TEST_URL, '/auth/projects') def new_ref(self, **kwargs): kwargs.setdefault('id', uuid.uuid4().hex) kwargs.setdefault('domain_id', uuid.uuid4().hex) kwargs.setdefault('enabled', True) kwargs.setdefault('name', uuid.uuid4().hex) return kwargs def test_list_accessible_projects(self): projects_ref = [self.new_ref(), self.new_ref()] projects_json = { self.collection_key: [self.new_ref(), self.new_ref()] } self.requests_mock.get(self.URL, json=projects_json) returned_list = self.manager.list() self.assertEqual(len(projects_ref), len(returned_list)) for project in returned_list: self.assertIsInstance(project, self.model) class K2KFederatedProjectTests(utils.TestCase): TEST_ROOT_URL = 'http://127.0.0.1:5000/' TEST_URL = '%s%s' % (TEST_ROOT_URL, 'v3') TEST_PASS = 'password' REQUEST_ECP_URL = TEST_URL + '/auth/OS-FEDERATION/saml2/ecp' SP_ID = 'sp1' SP_ROOT_URL = 'https://example.com/v3' SP_URL = 'https://example.com/Shibboleth.sso/SAML2/ECP' SP_AUTH_URL = (SP_ROOT_URL + '/OS-FEDERATION/identity_providers' '/testidp/protocols/saml2/auth') def setUp(self): super(K2KFederatedProjectTests, self).setUp() self.token_v3 = fixture.V3Token() self.token_v3.add_service_provider( self.SP_ID, self.SP_AUTH_URL, self.SP_URL) self.session = session.Session() self.collection_key = 'projects' self.model = projects.Project self.URL = '%s%s' % (self.SP_ROOT_URL, '/auth/projects') self.k2kplugin = self.get_plugin() self._mock_k2k_flow_urls() def new_ref(self, **kwargs): kwargs.setdefault('id', uuid.uuid4().hex) kwargs.setdefault('domain_id', uuid.uuid4().hex) kwargs.setdefault('enabled', True) kwargs.setdefault('name', uuid.uuid4().hex) return kwargs def _get_base_plugin(self): self.stub_url('POST', ['auth', 'tokens'], headers={'X-Subject-Token': uuid.uuid4().hex}, json=self.token_v3) return v3.Password(self.TEST_URL, username=self.TEST_USER, password=self.TEST_PASS) def _mock_k2k_flow_urls(self): # We need to check the auth versions available self.requests_mock.get( self.TEST_URL, json={'version': fixture.V3Discovery(self.TEST_URL)}, headers={'Content-Type': 'application/json'}) # The identity provider receives a request for an ECP wrapped # assertion. This assertion contains the user authentication info # and will be presented to the service provider self.requests_mock.register_uri( 'POST', self.REQUEST_ECP_URL, content=k2k_fixtures.ECP_ENVELOPE.encode(), headers={'Content-Type': 'application/vnd.paos+xml'}, status_code=200) # The service provider is presented with the ECP wrapped assertion # generated by the identity provider and should return a redirect # (302 or 303) upon successful authentication self.requests_mock.register_uri( 'POST', self.SP_URL, content=k2k_fixtures.TOKEN_BASED_ECP.encode(), headers={'Content-Type': 'application/vnd.paos+xml'}, status_code=302) # Should not follow the redirect URL, but use the auth_url attribute self.requests_mock.register_uri( 'GET', self.SP_AUTH_URL, json=k2k_fixtures.UNSCOPED_TOKEN, headers={'X-Subject-Token': k2k_fixtures.UNSCOPED_TOKEN_HEADER}) def get_plugin(self, **kwargs): kwargs.setdefault('base_plugin', self._get_base_plugin()) kwargs.setdefault('service_provider', self.SP_ID) return v3.Keystone2Keystone(**kwargs) def test_list_projects(self): k2k_client = client.Client(session=self.session, auth=self.k2kplugin) self.requests_mock.get(self.URL, json={ self.collection_key: [self.new_ref(), self.new_ref()] }) self.requests_mock.get(self.SP_ROOT_URL, json={ 'version': fixture.discovery.V3Discovery(self.SP_ROOT_URL) }) returned_list = k2k_client.federation.projects.list() self.assertThat(returned_list, matchers.HasLength(2)) for project in returned_list: self.assertIsInstance(project, self.model) class FederationDomainTests(utils.ClientTestCase): def setUp(self): super(FederationDomainTests, self).setUp() self.key = 'domain' self.collection_key = 'domains' self.model = domains.Domain self.manager = self.client.federation.domains self.URL = "%s%s" % (self.TEST_URL, '/auth/domains') def new_ref(self, **kwargs): kwargs.setdefault('id', uuid.uuid4().hex) kwargs.setdefault('enabled', True) kwargs.setdefault('name', uuid.uuid4().hex) kwargs.setdefault('description', uuid.uuid4().hex) return kwargs def test_list_accessible_domains(self): domains_ref = [self.new_ref(), self.new_ref()] domains_json = { self.collection_key: domains_ref } self.requests_mock.get(self.URL, json=domains_json) returned_list = self.manager.list() self.assertEqual(len(domains_ref), len(returned_list)) for domain in returned_list: self.assertIsInstance(domain, self.model) class FederatedTokenTests(utils.ClientTestCase): def setUp(self): super(FederatedTokenTests, self).setUp() token = fixture.V3FederationToken() token.set_project_scope() token.add_role() self.federated_token = access.AccessInfo.factory(body=token) def test_federated_property_federated_token(self): """Check if is_federated property returns expected value.""" self.assertTrue(self.federated_token.is_federated) def test_get_user_domain_name(self): """Ensure a federated user's domain name does not exist.""" self.assertIsNone(self.federated_token.user_domain_name) def test_get_user_domain_id(self): """Ensure a federated user's domain ID does not exist.""" self.assertEqual('Federated', self.federated_token.user_domain_id) class ServiceProviderTests(utils.ClientTestCase, utils.CrudTests): def setUp(self): super(ServiceProviderTests, self).setUp() self.key = 'service_provider' self.collection_key = 'service_providers' self.model = service_providers.ServiceProvider self.manager = self.client.federation.service_providers self.path_prefix = 'OS-FEDERATION' def new_ref(self, **kwargs): kwargs.setdefault('auth_url', uuid.uuid4().hex) kwargs.setdefault('description', uuid.uuid4().hex) kwargs.setdefault('enabled', True) kwargs.setdefault('id', uuid.uuid4().hex) kwargs.setdefault('sp_url', uuid.uuid4().hex) return kwargs def test_positional_parameters_expect_fail(self): """Ensure CrudManager raises TypeError exceptions. After passing wrong number of positional arguments an exception should be raised. Operations to be tested: * create() * get() * list() * delete() * update() """ POS_PARAM_1 = uuid.uuid4().hex POS_PARAM_2 = uuid.uuid4().hex POS_PARAM_3 = uuid.uuid4().hex PARAMETERS = { 'create': (POS_PARAM_1, POS_PARAM_2), 'get': (POS_PARAM_1, POS_PARAM_2), 'list': (POS_PARAM_1, POS_PARAM_2), 'update': (POS_PARAM_1, POS_PARAM_2, POS_PARAM_3), 'delete': (POS_PARAM_1, POS_PARAM_2) } for f_name, args in PARAMETERS.items(): self.assertRaises(TypeError, getattr(self.manager, f_name), *args) def test_create(self): ref = self.new_ref() # req_ref argument allows you to specify a different # signature for the request when the manager does some # conversion before doing the request (e.g. converting # from datetime object to timestamp string) req_ref = ref.copy() req_ref.pop('id') self.stub_entity('PUT', entity=ref, id=ref['id'], status_code=201) returned = self.manager.create(**ref) self.assertIsInstance(returned, self.model) for attr in req_ref: self.assertEqual( getattr(returned, attr), req_ref[attr], 'Expected different %s' % attr) self.assertEntityRequestBodyIs(req_ref) class IdentityProviderRequestIdTests(utils.TestRequestId): def setUp(self): super(IdentityProviderRequestIdTests, self).setUp() self.mgr = identity_providers.IdentityProviderManager(self.client) def _mock_request_method(self, method=None, body=None): return self.useFixture(fixtures.MockPatchObject( self.client, method, autospec=True, return_value=(self.resp, body)) ).mock def test_get_identity_provider(self): body = {"identity_provider": {"name": "admin"}} get_mock = self._mock_request_method(method='get', body=body) response = self.mgr.get("admin") self.assertEqual(response.request_ids[0], self.TEST_REQUEST_ID) get_mock.assert_called_once_with( 'OS-FEDERATION/identity_providers/admin') def test_list_identity_provider(self): body = {"identity_providers": [{"name": "admin"}]} get_mock = self._mock_request_method(method='get', body=body) response = self.mgr.list() self.assertEqual(response.request_ids[0], self.TEST_REQUEST_ID) get_mock.assert_called_once_with('OS-FEDERATION/identity_providers?') def test_create_identity_provider(self): body = {"identity_provider": {"name": "admin"}} self._mock_request_method(method='post', body=body) put_mock = self._mock_request_method(method='put', body=body) response = self.mgr.create(id="admin", description='fake') self.assertEqual(response.request_ids[0], self.TEST_REQUEST_ID) put_mock.assert_called_once_with( 'OS-FEDERATION/identity_providers/admin', body={'identity_provider': {'description': 'fake'}}) def test_update_identity_provider(self): body = {"identity_provider": {"name": "admin"}} patch_mock = self._mock_request_method(method='patch', body=body) self._mock_request_method(method='post', body=body) response = self.mgr.update("admin") self.assertEqual(response.request_ids[0], self.TEST_REQUEST_ID) patch_mock.assert_called_once_with( 'OS-FEDERATION/identity_providers/admin', body={ 'identity_provider': {}}) def test_delete_identity_provider(self): get_mock = self._mock_request_method(method='delete') _, resp = self.mgr.delete("admin") self.assertEqual(resp.request_ids[0], self.TEST_REQUEST_ID) get_mock.assert_called_once_with( 'OS-FEDERATION/identity_providers/admin') class MappingRequestIdTests(utils.TestRequestId): def setUp(self): super(MappingRequestIdTests, self).setUp() self.mgr = mappings.MappingManager(self.client) def _mock_request_method(self, method=None, body=None): return self.useFixture(fixtures.MockPatchObject( self.client, method, autospec=True, return_value=(self.resp, body)) ).mock def test_get_mapping(self): body = {"mapping": {"name": "admin"}} get_mock = self._mock_request_method(method='get', body=body) response = self.mgr.get("admin") self.assertEqual(response.request_ids[0], self.TEST_REQUEST_ID) get_mock.assert_called_once_with('OS-FEDERATION/mappings/admin') def test_list_mapping(self): body = {"mappings": [{"name": "admin"}]} get_mock = self._mock_request_method(method='get', body=body) response = self.mgr.list() self.assertEqual(response.request_ids[0], self.TEST_REQUEST_ID) get_mock.assert_called_once_with('OS-FEDERATION/mappings?') def test_create_mapping(self): body = {"mapping": {"name": "admin"}} self._mock_request_method(method='post', body=body) put_mock = self._mock_request_method(method='put', body=body) response = self.mgr.create(mapping_id="admin", description='fake') self.assertEqual(response.request_ids[0], self.TEST_REQUEST_ID) put_mock.assert_called_once_with( 'OS-FEDERATION/mappings/admin', body={ 'mapping': {'description': 'fake'}}) def test_update_mapping(self): body = {"mapping": {"name": "admin"}} patch_mock = self._mock_request_method(method='patch', body=body) self._mock_request_method(method='post', body=body) response = self.mgr.update("admin") self.assertEqual(response.request_ids[0], self.TEST_REQUEST_ID) patch_mock.assert_called_once_with( 'OS-FEDERATION/mappings/admin', body={'mapping': {}}) def test_delete_mapping(self): get_mock = self._mock_request_method(method='delete') _, resp = self.mgr.delete("admin") self.assertEqual(resp.request_ids[0], self.TEST_REQUEST_ID) get_mock.assert_called_once_with('OS-FEDERATION/mappings/admin') class ProtocolRequestIdTests(utils.TestRequestId): def setUp(self): super(ProtocolRequestIdTests, self).setUp() self.mgr = protocols.ProtocolManager(self.client) def _mock_request_method(self, method=None, body=None): return self.useFixture(fixtures.MockPatchObject( self.client, method, autospec=True, return_value=(self.resp, body)) ).mock def test_get_protocol(self): body = {"protocol": {"name": "admin"}} get_mock = self._mock_request_method(method='get', body=body) response = self.mgr.get("admin", "protocol") self.assertEqual(response.request_ids[0], self.TEST_REQUEST_ID) get_mock.assert_called_once_with( 'OS-FEDERATION/identity_providers/admin/protocols/protocol') def test_list_protocol(self): body = {"protocols": [{"name": "admin"}]} get_mock = self._mock_request_method(method='get', body=body) response = self.mgr.list("identity_provider") self.assertEqual(response.request_ids[0], self.TEST_REQUEST_ID) get_mock.assert_called_once_with( 'OS-FEDERATION/identity_providers/identity_provider/protocols?') def test_create_protocol(self): body = {"protocol": {"name": "admin"}} self._mock_request_method(method='post', body=body) put_mock = self._mock_request_method(method='put', body=body) response = self.mgr.create( protocol_id="admin", identity_provider='fake', mapping='fake') self.assertEqual(response.request_ids[0], self.TEST_REQUEST_ID) put_mock.assert_called_once_with( 'OS-FEDERATION/identity_providers/fake/protocols/admin', body={ 'protocol': {'mapping_id': 'fake'}}) def test_update_protocol(self): body = {"protocol": {"name": "admin"}} patch_mock = self._mock_request_method(method='patch', body=body) self._mock_request_method(method='post', body=body) response = self.mgr.update(protocol="admin", identity_provider='fake', mapping='fake') self.assertEqual(response.request_ids[0], self.TEST_REQUEST_ID) patch_mock.assert_called_once_with( 'OS-FEDERATION/identity_providers/fake/protocols/admin', body={ 'protocol': {'mapping_id': 'fake'}}) def test_delete_protocol(self): get_mock = self._mock_request_method(method='delete') _, resp = self.mgr.delete("identity_provider", "protocol") self.assertEqual(resp.request_ids[0], self.TEST_REQUEST_ID) get_mock.assert_called_once_with( 'OS-FEDERATION/identity_providers/' 'identity_provider/protocols/protocol') class ServiceProviderRequestIdTests(utils.TestRequestId): def setUp(self): super(ServiceProviderRequestIdTests, self).setUp() self.mgr = service_providers.ServiceProviderManager(self.client) def _mock_request_method(self, method=None, body=None): return self.useFixture(fixtures.MockPatchObject( self.client, method, autospec=True, return_value=(self.resp, body)) ).mock def test_get_service_provider(self): body = {"service_provider": {"name": "admin"}} get_mock = self._mock_request_method(method='get', body=body) response = self.mgr.get("provider") self.assertEqual(response.request_ids[0], self.TEST_REQUEST_ID) get_mock.assert_called_once_with( 'OS-FEDERATION/service_providers/provider') def test_list_service_provider(self): body = {"service_providers": [{"name": "admin"}]} get_mock = self._mock_request_method(method='get', body=body) response = self.mgr.list() self.assertEqual(response.request_ids[0], self.TEST_REQUEST_ID) get_mock.assert_called_once_with('OS-FEDERATION/service_providers?') def test_create_service_provider(self): body = {"service_provider": {"name": "admin"}} self._mock_request_method(method='post', body=body) put_mock = self._mock_request_method(method='put', body=body) response = self.mgr.create(id='provider') self.assertEqual(response.request_ids[0], self.TEST_REQUEST_ID) put_mock.assert_called_once_with( 'OS-FEDERATION/service_providers/provider', body={ 'service_provider': {}}) def test_update_service_provider(self): body = {"service_provider": {"name": "admin"}} patch_mock = self._mock_request_method(method='patch', body=body) self._mock_request_method(method='post', body=body) response = self.mgr.update("provider") self.assertEqual(response.request_ids[0], self.TEST_REQUEST_ID) patch_mock.assert_called_once_with( 'OS-FEDERATION/service_providers/provider', body={ 'service_provider': {}}) def test_delete_service_provider(self): get_mock = self._mock_request_method(method='delete') _, resp = self.mgr.delete("provider") self.assertEqual(resp.request_ids[0], self.TEST_REQUEST_ID) get_mock.assert_called_once_with( 'OS-FEDERATION/service_providers/provider')