Merge "Refactor self.*_api out of tests"

This commit is contained in:
Zuul 2018-02-05 22:34:16 +00:00 committed by Gerrit Code Review
commit 7fec65fa9a
19 changed files with 1835 additions and 1445 deletions

File diff suppressed because it is too large Load Diff

View File

@ -14,12 +14,15 @@ import uuid
from six.moves import range
from keystone.common import provider_api
from keystone.credential.providers import fernet as credential_provider
from keystone.tests import unit
from keystone.tests.unit import default_fixtures
from keystone.tests.unit import ksfixtures
from keystone.tests.unit.ksfixtures import database
PROVIDERS = provider_api.ProviderAPIs
class SqlTests(unit.SQLDriverOverrides, unit.TestCase):
@ -46,7 +49,9 @@ class SqlCredential(SqlTests):
credential = unit.new_credential_ref(user_id=user_id,
extra=uuid.uuid4().hex,
type=uuid.uuid4().hex)
self.credential_api.create_credential(credential['id'], credential)
PROVIDERS.credential_api.create_credential(
credential['id'], credential
)
return credential
def _validate_credential_list(self, retrieved_credentials,
@ -79,9 +84,9 @@ class SqlCredential(SqlTests):
self.credentials.append(cred)
def test_backend_credential_sql_hints_none(self):
credentials = self.credential_api.list_credentials(hints=None)
credentials = PROVIDERS.credential_api.list_credentials(hints=None)
self._validate_credential_list(credentials, self.user_credentials)
def test_backend_credential_sql_no_hints(self):
credentials = self.credential_api.list_credentials()
credentials = PROVIDERS.credential_api.list_credentials()
self._validate_credential_list(credentials, self.user_credentials)

View File

@ -18,9 +18,12 @@ import uuid
from six.moves import http_client
from testtools import matchers
from keystone.common import provider_api
from keystone.tests import unit
from keystone.tests.unit import test_v3
PROVIDERS = provider_api.ProviderAPIs
class EndpointFilterTestCase(test_v3.RestfulTestCase):
@ -296,14 +299,14 @@ class EndpointFilterCRUDTestCase(EndpointFilterTestCase):
region_id=self.region_id,
interface='public',
id=endpoint_id2)
self.catalog_api.create_endpoint(endpoint_id2, endpoint2.copy())
PROVIDERS.catalog_api.create_endpoint(endpoint_id2, endpoint2.copy())
# create endpoint project association.
self.put(self.default_request_url)
# should get back only one endpoint that was just created.
user_id = uuid.uuid4().hex
catalog = self.catalog_api.get_v3_catalog(
catalog = PROVIDERS.catalog_api.get_v3_catalog(
user_id,
self.default_domain_project_id)
@ -313,13 +316,13 @@ class EndpointFilterCRUDTestCase(EndpointFilterTestCase):
# add the second endpoint to default project, bypassing
# catalog_api API manager.
self.catalog_api.driver.add_endpoint_to_project(
PROVIDERS.catalog_api.driver.add_endpoint_to_project(
endpoint_id2,
self.default_domain_project_id)
# but, we can just get back one endpoint from the cache, since the
# catalog is pulled out from cache and its haven't been invalidated.
catalog = self.catalog_api.get_v3_catalog(
catalog = PROVIDERS.catalog_api.get_v3_catalog(
user_id,
self.default_domain_project_id)
@ -327,7 +330,7 @@ class EndpointFilterCRUDTestCase(EndpointFilterTestCase):
# remove the endpoint2 from the default project, and add it again via
# catalog_api API manager.
self.catalog_api.driver.remove_endpoint_from_project(
PROVIDERS.catalog_api.driver.remove_endpoint_from_project(
endpoint_id2,
self.default_domain_project_id)
@ -358,7 +361,7 @@ class EndpointFilterCRUDTestCase(EndpointFilterTestCase):
region_id=self.region_id,
interface='public',
id=endpoint_id2)
self.catalog_api.create_endpoint(endpoint_id2, endpoint2.copy())
PROVIDERS.catalog_api.create_endpoint(endpoint_id2, endpoint2.copy())
# create endpoint project association.
self.put(self.default_request_url)
@ -370,7 +373,7 @@ class EndpointFilterCRUDTestCase(EndpointFilterTestCase):
# should get back only one endpoint that was just created.
user_id = uuid.uuid4().hex
catalog = self.catalog_api.get_v3_catalog(
catalog = PROVIDERS.catalog_api.get_v3_catalog(
user_id,
self.default_domain_project_id)
@ -382,14 +385,14 @@ class EndpointFilterCRUDTestCase(EndpointFilterTestCase):
# remove the endpoint2 from the default project, bypassing
# catalog_api API manager.
self.catalog_api.driver.remove_endpoint_from_project(
PROVIDERS.catalog_api.driver.remove_endpoint_from_project(
endpoint_id2,
self.default_domain_project_id)
# but, we can just still get back two endpoints from the cache,
# since the catalog is pulled out from cache and its haven't
# been invalidated.
catalog = self.catalog_api.get_v3_catalog(
catalog = PROVIDERS.catalog_api.get_v3_catalog(
user_id,
self.default_domain_project_id)
@ -397,7 +400,7 @@ class EndpointFilterCRUDTestCase(EndpointFilterTestCase):
# add back the endpoint2 to the default project, and remove it by
# catalog_api API manage.
self.catalog_api.driver.add_endpoint_to_project(
PROVIDERS.catalog_api.driver.add_endpoint_to_project(
endpoint_id2,
self.default_domain_project_id)
@ -411,7 +414,7 @@ class EndpointFilterCRUDTestCase(EndpointFilterTestCase):
# should only get back one endpoint since the cache has been
# invalidated after the endpoint project association was removed.
catalog = self.catalog_api.get_v3_catalog(
catalog = PROVIDERS.catalog_api.get_v3_catalog(
user_id,
self.default_domain_project_id)
@ -522,7 +525,7 @@ class EndpointFilterTokenRequestTestCase(EndpointFilterTestCase):
region_id=self.region_id,
interface='public',
id=endpoint_id2)
self.catalog_api.create_endpoint(endpoint_id2, endpoint2.copy())
PROVIDERS.catalog_api.create_endpoint(endpoint_id2, endpoint2.copy())
# add second endpoint to default project
self.put('/OS-EP-FILTER/projects/%(project_id)s'
@ -533,7 +536,7 @@ class EndpointFilterTokenRequestTestCase(EndpointFilterTestCase):
# remove the temporary reference
# this will create inconsistency in the endpoint filter table
# which is fixed during the catalog creation for token request
self.catalog_api.delete_endpoint(endpoint_id2)
PROVIDERS.catalog_api.delete_endpoint(endpoint_id2)
auth_data = self.build_authentication_request(
user_id=self.user['id'],
@ -566,8 +569,9 @@ class EndpointFilterTokenRequestTestCase(EndpointFilterTestCase):
'enabled': False,
'interface': 'internal'
})
self.catalog_api.create_endpoint(disabled_endpoint_id,
disabled_endpoint_ref)
PROVIDERS.catalog_api.create_endpoint(
disabled_endpoint_id, disabled_endpoint_ref
)
self.put('/OS-EP-FILTER/projects/%(project_id)s'
'/endpoints/%(endpoint_id)s' % {
@ -1144,7 +1148,7 @@ class EndpointGroupCRUDTestCase(EndpointFilterTestCase):
# association, this is needed when a project scoped token is issued
# and "endpoint_filter.sql" backend driver is in place.
user_id = uuid.uuid4().hex
catalog_list = self.catalog_api.get_v3_catalog(
catalog_list = PROVIDERS.catalog_api.get_v3_catalog(
user_id,
self.default_domain_project_id)
self.assertEqual(2, len(catalog_list))
@ -1163,7 +1167,7 @@ class EndpointGroupCRUDTestCase(EndpointFilterTestCase):
endpoints = self.assertValidEndpointListResponse(r)
self.assertEqual(1, len(endpoints))
catalog_list = self.catalog_api.get_v3_catalog(
catalog_list = PROVIDERS.catalog_api.get_v3_catalog(
user_id,
self.default_domain_project_id)
self.assertEqual(1, len(catalog_list))
@ -1259,14 +1263,14 @@ class EndpointGroupCRUDTestCase(EndpointFilterTestCase):
region_id=self.region_id,
interface='admin',
id=endpoint_id2)
self.catalog_api.create_endpoint(endpoint_id2, endpoint2)
PROVIDERS.catalog_api.create_endpoint(endpoint_id2, endpoint2)
# create a project and endpoint association.
self.put(self.default_request_url)
# there is only one endpoint associated with the default project.
user_id = uuid.uuid4().hex
catalog = self.catalog_api.get_v3_catalog(
catalog = PROVIDERS.catalog_api.get_v3_catalog(
user_id,
self.default_domain_project_id)
@ -1278,13 +1282,13 @@ class EndpointGroupCRUDTestCase(EndpointFilterTestCase):
# add the endpoint group to default project, bypassing
# catalog_api API manager.
self.catalog_api.driver.add_endpoint_group_to_project(
PROVIDERS.catalog_api.driver.add_endpoint_group_to_project(
endpoint_group_id,
self.default_domain_project_id)
# can get back only one endpoint from the cache, since the catalog
# is pulled out from cache.
invalid_catalog = self.catalog_api.get_v3_catalog(
invalid_catalog = PROVIDERS.catalog_api.get_v3_catalog(
user_id,
self.default_domain_project_id)
@ -1294,16 +1298,16 @@ class EndpointGroupCRUDTestCase(EndpointFilterTestCase):
# remove the endpoint group from default project, and add it again via
# catalog_api API manager.
self.catalog_api.driver.remove_endpoint_group_from_project(
PROVIDERS.catalog_api.driver.remove_endpoint_group_from_project(
endpoint_group_id,
self.default_domain_project_id)
# add the endpoint group to default project.
self.catalog_api.add_endpoint_group_to_project(
PROVIDERS.catalog_api.add_endpoint_group_to_project(
endpoint_group_id,
self.default_domain_project_id)
catalog = self.catalog_api.get_v3_catalog(
catalog = PROVIDERS.catalog_api.get_v3_catalog(
user_id,
self.default_domain_project_id)
@ -1329,7 +1333,7 @@ class EndpointGroupCRUDTestCase(EndpointFilterTestCase):
region_id=self.region_id,
interface='admin',
id=endpoint_id2)
self.catalog_api.create_endpoint(endpoint_id2, endpoint2)
PROVIDERS.catalog_api.create_endpoint(endpoint_id2, endpoint2)
# create project and endpoint association.
self.put(self.default_request_url)
@ -1339,7 +1343,7 @@ class EndpointGroupCRUDTestCase(EndpointFilterTestCase):
self.DEFAULT_ENDPOINT_GROUP_URL, self.DEFAULT_ENDPOINT_GROUP_BODY)
# add the endpoint group to default project.
self.catalog_api.add_endpoint_group_to_project(
PROVIDERS.catalog_api.add_endpoint_group_to_project(
endpoint_group_id,
self.default_domain_project_id)
@ -1347,7 +1351,7 @@ class EndpointGroupCRUDTestCase(EndpointFilterTestCase):
# association, the other one is from endpoint_group project
# association.
user_id = uuid.uuid4().hex
catalog = self.catalog_api.get_v3_catalog(
catalog = PROVIDERS.catalog_api.get_v3_catalog(
user_id,
self.default_domain_project_id)
@ -1359,13 +1363,13 @@ class EndpointGroupCRUDTestCase(EndpointFilterTestCase):
# remove endpoint_group project association, bypassing
# catalog_api API manager.
self.catalog_api.driver.remove_endpoint_group_from_project(
PROVIDERS.catalog_api.driver.remove_endpoint_group_from_project(
endpoint_group_id,
self.default_domain_project_id)
# still get back two endpoints, since the catalog is pulled out
# from cache and the cache haven't been invalidated.
invalid_catalog = self.catalog_api.get_v3_catalog(
invalid_catalog = PROVIDERS.catalog_api.get_v3_catalog(
user_id,
self.default_domain_project_id)
@ -1375,18 +1379,18 @@ class EndpointGroupCRUDTestCase(EndpointFilterTestCase):
# add back the endpoint_group project association and remove it from
# manager.
self.catalog_api.driver.add_endpoint_group_to_project(
PROVIDERS.catalog_api.driver.add_endpoint_group_to_project(
endpoint_group_id,
self.default_domain_project_id)
self.catalog_api.remove_endpoint_group_from_project(
PROVIDERS.catalog_api.remove_endpoint_group_from_project(
endpoint_group_id,
self.default_domain_project_id)
# should only get back one endpoint since the cache has been
# invalidated after the endpoint_group project association was
# removed.
catalog = self.catalog_api.get_v3_catalog(
catalog = PROVIDERS.catalog_api.get_v3_catalog(
user_id,
self.default_domain_project_id)

View File

@ -17,6 +17,7 @@ import fixtures
import ldappool
import mock
from keystone.common import provider_api
import keystone.conf
from keystone.identity.backends import ldap
from keystone.identity.backends.ldap import common as common_ldap
@ -26,6 +27,7 @@ from keystone.tests.unit import test_backend_ldap
CONF = keystone.conf.CONF
PROVIDERS = provider_api.ProviderAPIs
class LdapPoolCommonTestMixin(object):
@ -36,7 +38,7 @@ class LdapPoolCommonTestMixin(object):
def test_handler_with_use_pool_enabled(self):
# by default use_pool and use_auth_pool is enabled in test pool config
user_ref = self.identity_api.get_user(self.user_foo['id'])
user_ref = PROVIDERS.identity_api.get_user(self.user_foo['id'])
self.user_foo.pop('password')
self.assertDictEqual(self.user_foo, user_ref)
@ -90,7 +92,7 @@ class LdapPoolCommonTestMixin(object):
def test_pool_retry_delay_set(self):
# just make one identity call to initiate ldap connection if not there
self.identity_api.get_user(self.user_foo['id'])
PROVIDERS.identity_api.get_user(self.user_foo['id'])
# get related connection manager instance
ldappool_cm = self.conn_pools[CONF.ldap.url]
@ -174,7 +176,7 @@ class LdapPoolCommonTestMixin(object):
# authenticate so that connection is added to pool before password
# change
user_ref = self.identity_api.authenticate(
user_ref = PROVIDERS.identity_api.authenticate(
self.make_request(),
user_id=self.user_sna['id'],
password=self.user_sna['password'])
@ -185,11 +187,11 @@ class LdapPoolCommonTestMixin(object):
new_password = 'new_password'
user_ref['password'] = new_password
self.identity_api.update_user(user_ref['id'], user_ref)
PROVIDERS.identity_api.update_user(user_ref['id'], user_ref)
# now authenticate again to make sure new password works with
# connection pool
user_ref2 = self.identity_api.authenticate(
user_ref2 = PROVIDERS.identity_api.authenticate(
self.make_request(),
user_id=self.user_sna['id'],
password=new_password)
@ -201,7 +203,7 @@ class LdapPoolCommonTestMixin(object):
# is only one connection in pool which get bind again with updated
# password..so no old bind is maintained in this case.
self.assertRaises(AssertionError,
self.identity_api.authenticate,
PROVIDERS.identity_api.authenticate,
self.make_request(),
user_id=self.user_sna['id'],
password=old_password)
@ -223,7 +225,7 @@ class LDAPIdentity(LdapPoolCommonTestMixin,
# super class loads db fixtures which establishes ldap connection
# so adding dummy call to highlight connection pool initialization
# as its not that obvious though its not needed here
self.identity_api.get_user(self.user_foo['id'])
PROVIDERS.identity_api.get_user(self.user_foo['id'])
def config_files(self):
config_files = super(LDAPIdentity, self).config_files()
@ -236,8 +238,8 @@ class LDAPIdentity(LdapPoolCommonTestMixin,
return arg
mocked_method.side_effect = side_effect
# invalidate the cache to get utf8_encode function called.
self.identity_api.get_user.invalidate(self.identity_api,
self.user_foo['id'])
self.identity_api.get_user(self.user_foo['id'])
PROVIDERS.identity_api.get_user.invalidate(PROVIDERS.identity_api,
self.user_foo['id'])
PROVIDERS.identity_api.get_user(self.user_foo['id'])
mocked_method.assert_any_call(CONF.ldap.user)
mocked_method.assert_any_call(CONF.ldap.password)

View File

@ -242,8 +242,9 @@ class SqlIdentity(SqlTests,
resource_tests.ResourceTests):
def test_password_hashed(self):
with sql.session_for_read() as session:
user_ref = self.identity_api._get_user(session,
self.user_foo['id'])
user_ref = PROVIDERS.identity_api._get_user(
session, self.user_foo['id']
)
self.assertNotEqual(self.user_foo['password'],
user_ref['password'])
@ -251,46 +252,49 @@ class SqlIdentity(SqlTests,
user_dict = unit.new_user_ref(
domain_id=CONF.identity.default_domain_id)
user_dict["password"] = None
new_user_dict = self.identity_api.create_user(user_dict)
new_user_dict = PROVIDERS.identity_api.create_user(user_dict)
with sql.session_for_read() as session:
new_user_ref = self.identity_api._get_user(session,
new_user_dict['id'])
new_user_ref = PROVIDERS.identity_api._get_user(
session, new_user_dict['id']
)
self.assertIsNone(new_user_ref.password)
def test_update_user_with_null_password(self):
user_dict = unit.new_user_ref(
domain_id=CONF.identity.default_domain_id)
self.assertTrue(user_dict['password'])
new_user_dict = self.identity_api.create_user(user_dict)
new_user_dict = PROVIDERS.identity_api.create_user(user_dict)
new_user_dict["password"] = None
new_user_dict = self.identity_api.update_user(new_user_dict['id'],
new_user_dict)
new_user_dict = PROVIDERS.identity_api.update_user(
new_user_dict['id'], new_user_dict
)
with sql.session_for_read() as session:
new_user_ref = self.identity_api._get_user(session,
new_user_dict['id'])
new_user_ref = PROVIDERS.identity_api._get_user(
session, new_user_dict['id']
)
self.assertIsNone(new_user_ref.password)
def test_delete_user_with_project_association(self):
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
user = self.identity_api.create_user(user)
user = PROVIDERS.identity_api.create_user(user)
role_member = unit.new_role_ref()
self.role_api.create_role(role_member['id'], role_member)
self.assignment_api.add_role_to_user_and_project(user['id'],
self.tenant_bar['id'],
role_member['id'])
self.identity_api.delete_user(user['id'])
PROVIDERS.role_api.create_role(role_member['id'], role_member)
PROVIDERS.assignment_api.add_role_to_user_and_project(
user['id'], self.tenant_bar['id'], role_member['id']
)
PROVIDERS.identity_api.delete_user(user['id'])
self.assertRaises(exception.UserNotFound,
self.assignment_api.list_projects_for_user,
PROVIDERS.assignment_api.list_projects_for_user,
user['id'])
def test_create_null_user_name(self):
user = unit.new_user_ref(name=None,
domain_id=CONF.identity.default_domain_id)
self.assertRaises(exception.ValidationError,
self.identity_api.create_user,
PROVIDERS.identity_api.create_user,
user)
self.assertRaises(exception.UserNotFound,
self.identity_api.get_user_by_name,
PROVIDERS.identity_api.get_user_by_name,
user['name'],
CONF.identity.default_domain_id)
@ -302,11 +306,11 @@ class SqlIdentity(SqlTests,
# create a ref with a lowercase name
ref = unit.new_user_ref(name=uuid.uuid4().hex.lower(),
domain_id=CONF.identity.default_domain_id)
ref = self.identity_api.create_user(ref)
ref = PROVIDERS.identity_api.create_user(ref)
# assign a new ID with the same name, but this time in uppercase
ref['name'] = ref['name'].upper()
self.identity_api.create_user(ref)
PROVIDERS.identity_api.create_user(ref)
def test_create_project_case_sensitivity(self):
# project name case sensitivity is down to the fact that it is marked
@ -315,23 +319,23 @@ class SqlIdentity(SqlTests,
# create a ref with a lowercase name
ref = unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
self.resource_api.create_project(ref['id'], ref)
PROVIDERS.resource_api.create_project(ref['id'], ref)
# assign a new ID with the same name, but this time in uppercase
ref['id'] = uuid.uuid4().hex
ref['name'] = ref['name'].upper()
self.resource_api.create_project(ref['id'], ref)
PROVIDERS.resource_api.create_project(ref['id'], ref)
def test_delete_project_with_user_association(self):
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
user = self.identity_api.create_user(user)
user = PROVIDERS.identity_api.create_user(user)
role_member = unit.new_role_ref()
self.role_api.create_role(role_member['id'], role_member)
self.assignment_api.add_role_to_user_and_project(user['id'],
self.tenant_bar['id'],
role_member['id'])
self.resource_api.delete_project(self.tenant_bar['id'])
tenants = self.assignment_api.list_projects_for_user(user['id'])
PROVIDERS.role_api.create_role(role_member['id'], role_member)
PROVIDERS.assignment_api.add_role_to_user_and_project(
user['id'], self.tenant_bar['id'], role_member['id']
)
PROVIDERS.resource_api.delete_project(self.tenant_bar['id'])
tenants = PROVIDERS.assignment_api.list_projects_for_user(user['id'])
self.assertEqual([], tenants)
def test_update_project_returns_extra(self):
@ -349,12 +353,12 @@ class SqlIdentity(SqlTests,
project = unit.new_project_ref(
domain_id=CONF.identity.default_domain_id)
project[arbitrary_key] = arbitrary_value
ref = self.resource_api.create_project(project['id'], project)
ref = PROVIDERS.resource_api.create_project(project['id'], project)
self.assertEqual(arbitrary_value, ref[arbitrary_key])
self.assertIsNone(ref.get('extra'))
ref['name'] = uuid.uuid4().hex
ref = self.resource_api.update_project(ref['id'], ref)
ref = PROVIDERS.resource_api.update_project(ref['id'], ref)
self.assertEqual(arbitrary_value, ref[arbitrary_key])
self.assertEqual(arbitrary_value, ref['extra'][arbitrary_key])
@ -373,14 +377,14 @@ class SqlIdentity(SqlTests,
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
user[arbitrary_key] = arbitrary_value
del user["id"]
ref = self.identity_api.create_user(user)
ref = PROVIDERS.identity_api.create_user(user)
self.assertEqual(arbitrary_value, ref[arbitrary_key])
self.assertIsNone(ref.get('password'))
self.assertIsNone(ref.get('extra'))
user['name'] = uuid.uuid4().hex
user['password'] = uuid.uuid4().hex
ref = self.identity_api.update_user(ref['id'], user)
ref = PROVIDERS.identity_api.update_user(ref['id'], user)
self.assertIsNone(ref.get('password'))
self.assertIsNone(ref['extra'].get('password'))
self.assertEqual(arbitrary_value, ref[arbitrary_key])
@ -388,7 +392,7 @@ class SqlIdentity(SqlTests,
def test_sql_user_to_dict_null_default_project_id(self):
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
user = self.identity_api.create_user(user)
user = PROVIDERS.identity_api.create_user(user)
with sql.session_for_read() as session:
query = session.query(identity_sql.User)
query = query.filter_by(id=user['id'])
@ -400,24 +404,30 @@ class SqlIdentity(SqlTests,
def test_list_domains_for_user(self):
domain = unit.new_domain_ref()
self.resource_api.create_domain(domain['id'], domain)
PROVIDERS.resource_api.create_domain(domain['id'], domain)
user = unit.new_user_ref(domain_id=domain['id'])
test_domain1 = unit.new_domain_ref()
self.resource_api.create_domain(test_domain1['id'], test_domain1)
PROVIDERS.resource_api.create_domain(test_domain1['id'], test_domain1)
test_domain2 = unit.new_domain_ref()
self.resource_api.create_domain(test_domain2['id'], test_domain2)
PROVIDERS.resource_api.create_domain(test_domain2['id'], test_domain2)
user = self.identity_api.create_user(user)
user_domains = self.assignment_api.list_domains_for_user(user['id'])
user = PROVIDERS.identity_api.create_user(user)
user_domains = PROVIDERS.assignment_api.list_domains_for_user(
user['id']
)
self.assertEqual(0, len(user_domains))
self.assignment_api.create_grant(user_id=user['id'],
domain_id=test_domain1['id'],
role_id=self.role_member['id'])
self.assignment_api.create_grant(user_id=user['id'],
domain_id=test_domain2['id'],
role_id=self.role_member['id'])
user_domains = self.assignment_api.list_domains_for_user(user['id'])
PROVIDERS.assignment_api.create_grant(
user_id=user['id'], domain_id=test_domain1['id'],
role_id=self.role_member['id']
)
PROVIDERS.assignment_api.create_grant(
user_id=user['id'], domain_id=test_domain2['id'],
role_id=self.role_member['id']
)
user_domains = PROVIDERS.assignment_api.list_domains_for_user(
user['id']
)
self.assertThat(user_domains, matchers.HasLength(2))
def test_list_domains_for_user_with_grants(self):
@ -425,35 +435,40 @@ class SqlIdentity(SqlTests,
# make user1 a member of both groups. Both these new domains
# should now be included, along with any direct user grants.
domain = unit.new_domain_ref()
self.resource_api.create_domain(domain['id'], domain)
PROVIDERS.resource_api.create_domain(domain['id'], domain)
user = unit.new_user_ref(domain_id=domain['id'])
user = self.identity_api.create_user(user)
user = PROVIDERS.identity_api.create_user(user)
group1 = unit.new_group_ref(domain_id=domain['id'])
group1 = self.identity_api.create_group(group1)
group1 = PROVIDERS.identity_api.create_group(group1)
group2 = unit.new_group_ref(domain_id=domain['id'])
group2 = self.identity_api.create_group(group2)
group2 = PROVIDERS.identity_api.create_group(group2)
test_domain1 = unit.new_domain_ref()
self.resource_api.create_domain(test_domain1['id'], test_domain1)
PROVIDERS.resource_api.create_domain(test_domain1['id'], test_domain1)
test_domain2 = unit.new_domain_ref()
self.resource_api.create_domain(test_domain2['id'], test_domain2)
PROVIDERS.resource_api.create_domain(test_domain2['id'], test_domain2)
test_domain3 = unit.new_domain_ref()
self.resource_api.create_domain(test_domain3['id'], test_domain3)
PROVIDERS.resource_api.create_domain(test_domain3['id'], test_domain3)
self.identity_api.add_user_to_group(user['id'], group1['id'])
self.identity_api.add_user_to_group(user['id'], group2['id'])
PROVIDERS.identity_api.add_user_to_group(user['id'], group1['id'])
PROVIDERS.identity_api.add_user_to_group(user['id'], group2['id'])
# Create 3 grants, one user grant, the other two as group grants
self.assignment_api.create_grant(user_id=user['id'],
domain_id=test_domain1['id'],
role_id=self.role_member['id'])
self.assignment_api.create_grant(group_id=group1['id'],
domain_id=test_domain2['id'],
role_id=self.role_admin['id'])
self.assignment_api.create_grant(group_id=group2['id'],
domain_id=test_domain3['id'],
role_id=self.role_admin['id'])
user_domains = self.assignment_api.list_domains_for_user(user['id'])
PROVIDERS.assignment_api.create_grant(
user_id=user['id'], domain_id=test_domain1['id'],
role_id=self.role_member['id']
)
PROVIDERS.assignment_api.create_grant(
group_id=group1['id'], domain_id=test_domain2['id'],
role_id=self.role_admin['id']
)
PROVIDERS.assignment_api.create_grant(
group_id=group2['id'], domain_id=test_domain3['id'],
role_id=self.role_admin['id']
)
user_domains = PROVIDERS.assignment_api.list_domains_for_user(
user['id']
)
self.assertThat(user_domains, matchers.HasLength(3))
def test_list_domains_for_user_with_inherited_grants(self):
@ -468,29 +483,31 @@ class SqlIdentity(SqlTests,
"""
domain1 = unit.new_domain_ref()
domain1 = self.resource_api.create_domain(domain1['id'], domain1)
domain1 = PROVIDERS.resource_api.create_domain(domain1['id'], domain1)
domain2 = unit.new_domain_ref()
domain2 = self.resource_api.create_domain(domain2['id'], domain2)
domain2 = PROVIDERS.resource_api.create_domain(domain2['id'], domain2)
user = unit.new_user_ref(domain_id=domain1['id'])
user = self.identity_api.create_user(user)
user = PROVIDERS.identity_api.create_user(user)
group = unit.new_group_ref(domain_id=domain1['id'])
group = self.identity_api.create_group(group)
self.identity_api.add_user_to_group(user['id'], group['id'])
group = PROVIDERS.identity_api.create_group(group)
PROVIDERS.identity_api.add_user_to_group(user['id'], group['id'])
role = unit.new_role_ref()
self.role_api.create_role(role['id'], role)
PROVIDERS.role_api.create_role(role['id'], role)
# Create a grant on each domain, one user grant, one group grant,
# both inherited.
self.assignment_api.create_grant(user_id=user['id'],
domain_id=domain1['id'],
role_id=role['id'],
inherited_to_projects=True)
self.assignment_api.create_grant(group_id=group['id'],
domain_id=domain2['id'],
role_id=role['id'],
inherited_to_projects=True)
PROVIDERS.assignment_api.create_grant(
user_id=user['id'], domain_id=domain1['id'], role_id=role['id'],
inherited_to_projects=True
)
PROVIDERS.assignment_api.create_grant(
group_id=group['id'], domain_id=domain2['id'], role_id=role['id'],
inherited_to_projects=True
)
user_domains = self.assignment_api.list_domains_for_user(user['id'])
user_domains = PROVIDERS.assignment_api.list_domains_for_user(
user['id']
)
# No domains should be returned since both domains have only inherited
# roles assignments.
self.assertThat(user_domains, matchers.HasLength(0))
@ -504,13 +521,13 @@ class SqlIdentity(SqlTests,
for x in range(0, USER_COUNT):
new_user = unit.new_user_ref(domain_id=domain['id'])
new_user = self.identity_api.create_user(new_user)
new_user = PROVIDERS.identity_api.create_user(new_user)
test_users.append(new_user)
positive_user = test_users[0]
negative_user = test_users[1]
for x in range(0, USER_COUNT):
group_refs = self.identity_api.list_groups_for_user(
group_refs = PROVIDERS.identity_api.list_groups_for_user(
test_users[x]['id'])
self.assertEqual(0, len(group_refs))
@ -518,23 +535,23 @@ class SqlIdentity(SqlTests,
before_count = x
after_count = x + 1
new_group = unit.new_group_ref(domain_id=domain['id'])
new_group = self.identity_api.create_group(new_group)
new_group = PROVIDERS.identity_api.create_group(new_group)
test_groups.append(new_group)
# add the user to the group and ensure that the
# group count increases by one for each
group_refs = self.identity_api.list_groups_for_user(
group_refs = PROVIDERS.identity_api.list_groups_for_user(
positive_user['id'])
self.assertEqual(before_count, len(group_refs))
self.identity_api.add_user_to_group(
PROVIDERS.identity_api.add_user_to_group(
positive_user['id'],
new_group['id'])
group_refs = self.identity_api.list_groups_for_user(
group_refs = PROVIDERS.identity_api.list_groups_for_user(
positive_user['id'])
self.assertEqual(after_count, len(group_refs))
# Make sure the group count for the unrelated user did not change
group_refs = self.identity_api.list_groups_for_user(
group_refs = PROVIDERS.identity_api.list_groups_for_user(
negative_user['id'])
self.assertEqual(0, len(group_refs))
@ -543,18 +560,18 @@ class SqlIdentity(SqlTests,
for x in range(0, 3):
before_count = GROUP_COUNT - x
after_count = GROUP_COUNT - x - 1
group_refs = self.identity_api.list_groups_for_user(
group_refs = PROVIDERS.identity_api.list_groups_for_user(
positive_user['id'])
self.assertEqual(before_count, len(group_refs))
self.identity_api.remove_user_from_group(
PROVIDERS.identity_api.remove_user_from_group(
positive_user['id'],
test_groups[x]['id'])
group_refs = self.identity_api.list_groups_for_user(
group_refs = PROVIDERS.identity_api.list_groups_for_user(
positive_user['id'])
self.assertEqual(after_count, len(group_refs))
# Make sure the group count for the unrelated user
# did not change
group_refs = self.identity_api.list_groups_for_user(
group_refs = PROVIDERS.identity_api.list_groups_for_user(
negative_user['id'])
self.assertEqual(0, len(group_refs))
@ -570,43 +587,46 @@ class SqlIdentity(SqlTests,
"""
spoiler_project = unit.new_project_ref(
domain_id=CONF.identity.default_domain_id)
self.resource_api.create_project(spoiler_project['id'],
spoiler_project)
PROVIDERS.resource_api.create_project(
spoiler_project['id'], spoiler_project
)
# First let's create a project with a None domain_id and make sure we
# can read it back.
project = unit.new_project_ref(domain_id=None, is_domain=True)
project = self.resource_api.create_project(project['id'], project)
ref = self.resource_api.get_project(project['id'])
project = PROVIDERS.resource_api.create_project(project['id'], project)
ref = PROVIDERS.resource_api.get_project(project['id'])
self.assertDictEqual(project, ref)
# Can we get it by name?
ref = self.resource_api.get_project_by_name(project['name'], None)
ref = PROVIDERS.resource_api.get_project_by_name(project['name'], None)
self.assertDictEqual(project, ref)
# Can we filter for them - create a second domain to ensure we are
# testing the receipt of more than one.
project2 = unit.new_project_ref(domain_id=None, is_domain=True)
project2 = self.resource_api.create_project(project2['id'], project2)
project2 = PROVIDERS.resource_api.create_project(
project2['id'], project2
)
hints = driver_hints.Hints()
hints.add_filter('domain_id', None)
refs = self.resource_api.list_projects(hints)
refs = PROVIDERS.resource_api.list_projects(hints)
self.assertThat(refs, matchers.HasLength(2 + self.domain_count))
self.assertIn(project, refs)
self.assertIn(project2, refs)
# Can we update it?
project['name'] = uuid.uuid4().hex
self.resource_api.update_project(project['id'], project)
ref = self.resource_api.get_project(project['id'])
PROVIDERS.resource_api.update_project(project['id'], project)
ref = PROVIDERS.resource_api.get_project(project['id'])
self.assertDictEqual(project, ref)
# Finally, make sure we can delete it
project['enabled'] = False
self.resource_api.update_project(project['id'], project)
self.resource_api.delete_project(project['id'])
PROVIDERS.resource_api.update_project(project['id'], project)
PROVIDERS.resource_api.delete_project(project['id'])
self.assertRaises(exception.ProjectNotFound,
self.resource_api.get_project,
PROVIDERS.resource_api.get_project,
project['id'])
def test_hidden_project_domain_root_is_really_hidden(self):
@ -619,7 +639,7 @@ class SqlIdentity(SqlTests,
"""
def _exercise_project_api(ref_id):
driver = self.resource_api.driver
driver = PROVIDERS.resource_api.driver
self.assertRaises(exception.ProjectNotFound,
driver.get_project,
ref_id)
@ -680,7 +700,7 @@ class SqlIdentity(SqlTests,
# create 10 users. 10 is just a random number
for i in range(10):
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
self.identity_api.create_user(user)
PROVIDERS.identity_api.create_user(user)
# sqlalchemy emits various events and allows to listen to them. Here
# bound method `query_counter` will be called each time when a query
@ -699,14 +719,14 @@ class SqlIdentity(SqlTests,
sqlalchemy.event.listen(sqlalchemy.orm.query.Query, 'before_compile',
counter.query_counter)
first_call_users = self.identity_api.list_users()
first_call_users = PROVIDERS.identity_api.list_users()
first_call_counter = counter.calls
# add 10 more users
for i in range(10):
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
self.identity_api.create_user(user)
PROVIDERS.identity_api.create_user(user)
counter.reset()
second_call_users = self.identity_api.list_users()
second_call_users = PROVIDERS.identity_api.list_users()
# ensure that the number of calls does not depend on the number of
# users fetched.
self.assertNotEqual(len(first_call_users), len(second_call_users))
@ -858,27 +878,27 @@ class SqlCatalog(SqlTests, catalog_tests.CatalogTests):
def test_catalog_ignored_malformed_urls(self):
service = unit.new_service_ref()
self.catalog_api.create_service(service['id'], service)
PROVIDERS.catalog_api.create_service(service['id'], service)
malformed_url = "http://192.168.1.104:8774/v2/$(tenant)s"
endpoint = unit.new_endpoint_ref(service_id=service['id'],
url=malformed_url,
region_id=None)
self.catalog_api.create_endpoint(endpoint['id'], endpoint.copy())
PROVIDERS.catalog_api.create_endpoint(endpoint['id'], endpoint.copy())
# NOTE(dstanek): there are no valid URLs, so nothing is in the catalog
catalog = self.catalog_api.get_catalog('fake-user', 'fake-tenant')
catalog = PROVIDERS.catalog_api.get_catalog('fake-user', 'fake-tenant')
self.assertEqual({}, catalog)
def test_get_catalog_with_empty_public_url(self):
service = unit.new_service_ref()
self.catalog_api.create_service(service['id'], service)
PROVIDERS.catalog_api.create_service(service['id'], service)
endpoint = unit.new_endpoint_ref(url='', service_id=service['id'],
region_id=None)
self.catalog_api.create_endpoint(endpoint['id'], endpoint.copy())
PROVIDERS.catalog_api.create_endpoint(endpoint['id'], endpoint.copy())
catalog = self.catalog_api.get_catalog('user', 'tenant')
catalog = PROVIDERS.catalog_api.get_catalog('user', 'tenant')
catalog_endpoint = catalog[endpoint['region_id']][service['type']]
self.assertEqual(service['name'], catalog_endpoint['name'])
self.assertEqual(endpoint['id'], catalog_endpoint['id'])
@ -888,13 +908,13 @@ class SqlCatalog(SqlTests, catalog_tests.CatalogTests):
def test_create_endpoint_region_returns_not_found(self):
service = unit.new_service_ref()
self.catalog_api.create_service(service['id'], service)
PROVIDERS.catalog_api.create_service(service['id'], service)
endpoint = unit.new_endpoint_ref(region_id=uuid.uuid4().hex,
service_id=service['id'])
self.assertRaises(exception.ValidationError,
self.catalog_api.create_endpoint,
PROVIDERS.catalog_api.create_endpoint,
endpoint['id'],
endpoint.copy())
@ -902,85 +922,92 @@ class SqlCatalog(SqlTests, catalog_tests.CatalogTests):
region = unit.new_region_ref(id='0' * 256)
self.assertRaises(exception.StringLengthExceeded,
self.catalog_api.create_region,
PROVIDERS.catalog_api.create_region,
region)
def test_create_region_invalid_parent_id(self):
region = unit.new_region_ref(parent_region_id='0' * 256)
self.assertRaises(exception.RegionNotFound,
self.catalog_api.create_region,
PROVIDERS.catalog_api.create_region,
region)
def test_delete_region_with_endpoint(self):
# create a region
region = unit.new_region_ref()
self.catalog_api.create_region(region)
PROVIDERS.catalog_api.create_region(region)
# create a child region
child_region = unit.new_region_ref(parent_region_id=region['id'])
self.catalog_api.create_region(child_region)
PROVIDERS.catalog_api.create_region(child_region)
# create a service
service = unit.new_service_ref()
self.catalog_api.create_service(service['id'], service)
PROVIDERS.catalog_api.create_service(service['id'], service)
# create an endpoint attached to the service and child region
child_endpoint = unit.new_endpoint_ref(region_id=child_region['id'],
service_id=service['id'])
self.catalog_api.create_endpoint(child_endpoint['id'], child_endpoint)
PROVIDERS.catalog_api.create_endpoint(
child_endpoint['id'], child_endpoint
)
self.assertRaises(exception.RegionDeletionError,
self.catalog_api.delete_region,
PROVIDERS.catalog_api.delete_region,
child_region['id'])
# create an endpoint attached to the service and parent region
endpoint = unit.new_endpoint_ref(region_id=region['id'],
service_id=service['id'])
self.catalog_api.create_endpoint(endpoint['id'], endpoint)
PROVIDERS.catalog_api.create_endpoint(endpoint['id'], endpoint)
self.assertRaises(exception.RegionDeletionError,
self.catalog_api.delete_region,
PROVIDERS.catalog_api.delete_region,
region['id'])
def test_v3_catalog_domain_scoped_token(self):
# test the case that tenant_id is None.
srv_1 = unit.new_service_ref()
self.catalog_api.create_service(srv_1['id'], srv_1)
PROVIDERS.catalog_api.create_service(srv_1['id'], srv_1)
endpoint_1 = unit.new_endpoint_ref(service_id=srv_1['id'],
region_id=None)
self.catalog_api.create_endpoint(endpoint_1['id'], endpoint_1)
PROVIDERS.catalog_api.create_endpoint(endpoint_1['id'], endpoint_1)
srv_2 = unit.new_service_ref()
self.catalog_api.create_service(srv_2['id'], srv_2)
PROVIDERS.catalog_api.create_service(srv_2['id'], srv_2)
endpoint_2 = unit.new_endpoint_ref(service_id=srv_2['id'],
region_id=None)
self.catalog_api.create_endpoint(endpoint_2['id'], endpoint_2)
PROVIDERS.catalog_api.create_endpoint(endpoint_2['id'], endpoint_2)
self.config_fixture.config(group='endpoint_filter',
return_all_endpoints_if_no_filter=True)
catalog_ref = self.catalog_api.get_v3_catalog(uuid.uuid4().hex, None)
catalog_ref = PROVIDERS.catalog_api.get_v3_catalog(
uuid.uuid4().hex, None
)
self.assertThat(catalog_ref, matchers.HasLength(2))
self.config_fixture.config(group='endpoint_filter',
return_all_endpoints_if_no_filter=False)
catalog_ref = self.catalog_api.get_v3_catalog(uuid.uuid4().hex, None)
catalog_ref = PROVIDERS.catalog_api.get_v3_catalog(
uuid.uuid4().hex, None
)
self.assertThat(catalog_ref, matchers.HasLength(0))
def test_v3_catalog_endpoint_filter_enabled(self):
srv_1 = unit.new_service_ref()
self.catalog_api.create_service(srv_1['id'], srv_1)
PROVIDERS.catalog_api.create_service(srv_1['id'], srv_1)
endpoint_1 = unit.new_endpoint_ref(service_id=srv_1['id'],
region_id=None)
self.catalog_api.create_endpoint(endpoint_1['id'], endpoint_1)
PROVIDERS.catalog_api.create_endpoint(endpoint_1['id'], endpoint_1)
endpoint_2 = unit.new_endpoint_ref(service_id=srv_1['id'],
region_id=None)
self.catalog_api.create_endpoint(endpoint_2['id'], endpoint_2)
PROVIDERS.catalog_api.create_endpoint(endpoint_2['id'], endpoint_2)
# create endpoint-project association.
self.catalog_api.add_endpoint_to_project(
PROVIDERS.catalog_api.add_endpoint_to_project(
endpoint_1['id'],
self.tenant_bar['id'])
catalog_ref = self.catalog_api.get_v3_catalog(uuid.uuid4().hex,
self.tenant_bar['id'])
catalog_ref = PROVIDERS.catalog_api.get_v3_catalog(
uuid.uuid4().hex, self.tenant_bar['id']
)
self.assertThat(catalog_ref, matchers.HasLength(1))
self.assertThat(catalog_ref[0]['endpoints'], matchers.HasLength(1))
# the endpoint is that defined in the endpoint-project association.
@ -992,16 +1019,17 @@ class SqlCatalog(SqlTests, catalog_tests.CatalogTests):
self.config_fixture.config(group='endpoint_filter',
return_all_endpoints_if_no_filter=True)
srv_1 = unit.new_service_ref()
self.catalog_api.create_service(srv_1['id'], srv_1)
PROVIDERS.catalog_api.create_service(srv_1['id'], srv_1)
endpoint_1 = unit.new_endpoint_ref(service_id=srv_1['id'],
region_id=None)
self.catalog_api.create_endpoint(endpoint_1['id'], endpoint_1)
PROVIDERS.catalog_api.create_endpoint(endpoint_1['id'], endpoint_1)
srv_2 = unit.new_service_ref()
self.catalog_api.create_service(srv_2['id'], srv_2)
PROVIDERS.catalog_api.create_service(srv_2['id'], srv_2)
catalog_ref = self.catalog_api.get_v3_catalog(uuid.uuid4().hex,
self.tenant_bar['id'])
catalog_ref = PROVIDERS.catalog_api.get_v3_catalog(
uuid.uuid4().hex, self.tenant_bar['id']
)
self.assertThat(catalog_ref, matchers.HasLength(2))
srv_id_list = [catalog_ref[0]['id'], catalog_ref[1]['id']]
self.assertItemsEqual([srv_1['id'], srv_2['id']], srv_id_list)
@ -1047,8 +1075,8 @@ class SqlFilterTests(SqlTests, identity_tests.FilterTests):
del self.entity_list
del self.domain1_entity_list
self.domain1['enabled'] = False
self.resource_api.update_domain(self.domain1['id'], self.domain1)
self.resource_api.delete_domain(self.domain1['id'])
PROVIDERS.resource_api.update_domain(self.domain1['id'], self.domain1)
PROVIDERS.resource_api.delete_domain(self.domain1['id'])
del self.domain1
def test_list_entities_filtered_by_domain(self):
@ -1058,7 +1086,7 @@ class SqlFilterTests(SqlTests, identity_tests.FilterTests):
# the driver level.
self.addCleanup(self.clean_up_entities)
self.domain1 = unit.new_domain_ref()
self.resource_api.create_domain(self.domain1['id'], self.domain1)
PROVIDERS.resource_api.create_domain(self.domain1['id'], self.domain1)
self.entity_list = {}
self.domain1_entity_list = {}
@ -1087,25 +1115,25 @@ class SqlFilterTests(SqlTests, identity_tests.FilterTests):
"""
# Check we have some users
users = self.identity_api.list_users()
users = PROVIDERS.identity_api.list_users()
self.assertGreater(len(users), 0)
hints = driver_hints.Hints()
hints.add_filter('name', "anything' or 'x'='x")
users = self.identity_api.list_users(hints=hints)
users = PROVIDERS.identity_api.list_users(hints=hints)
self.assertEqual(0, len(users))
# See if we can add a SQL command...use the group table instead of the
# user table since 'user' is reserved word for SQLAlchemy.
group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
group = self.identity_api.create_group(group)
group = PROVIDERS.identity_api.create_group(group)
hints = driver_hints.Hints()
hints.add_filter('name', "x'; drop table group")
groups = self.identity_api.list_groups(hints=hints)
groups = PROVIDERS.identity_api.list_groups(hints=hints)
self.assertEqual(0, len(groups))
groups = self.identity_api.list_groups()
groups = PROVIDERS.identity_api.list_groups()
self.assertGreater(len(groups), 0)
@ -1167,7 +1195,9 @@ class SqlCredential(SqlTests):
credential = unit.new_credential_ref(user_id=user_id,
extra=uuid.uuid4().hex,
type=uuid.uuid4().hex)
self.credential_api.create_credential(credential['id'], credential)
PROVIDERS.credential_api.create_credential(
credential['id'], credential
)
return credential
def _validateCredentialList(self, retrieved_credentials,
@ -1199,29 +1229,29 @@ class SqlCredential(SqlTests):
self.credentials.append(cred)
def test_list_credentials(self):
credentials = self.credential_api.list_credentials()
credentials = PROVIDERS.credential_api.list_credentials()
self._validateCredentialList(credentials, self.credentials)
# test filtering using hints
hints = driver_hints.Hints()
hints.add_filter('user_id', self.user_foo['id'])
credentials = self.credential_api.list_credentials(hints)
credentials = PROVIDERS.credential_api.list_credentials(hints)
self._validateCredentialList(credentials, self.user_credentials)
def test_list_credentials_for_user(self):
credentials = self.credential_api.list_credentials_for_user(
credentials = PROVIDERS.credential_api.list_credentials_for_user(
self.user_foo['id'])
self._validateCredentialList(credentials, self.user_credentials)
def test_list_credentials_for_user_and_type(self):
cred = self.user_credentials[0]
credentials = self.credential_api.list_credentials_for_user(
credentials = PROVIDERS.credential_api.list_credentials_for_user(
self.user_foo['id'], type=cred['type'])
self._validateCredentialList(credentials, [cred])
def test_create_credential_is_encrypted_when_stored(self):
credential = unit.new_credential_ref(user_id=uuid.uuid4().hex)
credential_id = credential['id']
returned_credential = self.credential_api.create_credential(
returned_credential = PROVIDERS.credential_api.create_credential(
credential_id,
credential
)
@ -1230,8 +1260,8 @@ class SqlCredential(SqlTests):
# credential API.
self.assertEqual(returned_credential['blob'], credential['blob'])
credential_from_backend = self.credential_api.driver.get_credential(
credential_id
credential_from_backend = (
PROVIDERS.credential_api.driver.get_credential(credential_id)
)
# Pull the credential directly from the backend, the `blob` should be
@ -1245,15 +1275,15 @@ class SqlCredential(SqlTests):
credential = unit.new_credential_ref(user_id=uuid.uuid4().hex)
credential_id = credential['id']
created_credential = self.credential_api.create_credential(
created_credential = PROVIDERS.credential_api.create_credential(
credential_id,
credential
)
# Pull the credential directly from the backend, the `blob` should be
# encrypted.
credential_from_backend = self.credential_api.driver.get_credential(
credential_id
credential_from_backend = (
PROVIDERS.credential_api.driver.get_credential(credential_id)
)
self.assertNotEqual(
credential_from_backend['encrypted_blob'],
@ -1261,7 +1291,7 @@ class SqlCredential(SqlTests):
)
# Make sure the `blob` values listed from the API are not encrypted.
listed_credentials = self.credential_api.list_credentials()
listed_credentials = PROVIDERS.credential_api.list_credentials()
self.assertIn(created_credential, listed_credentials)

View File

@ -50,6 +50,7 @@ from keystone.tests.unit.ksfixtures import ldapdb
CONF = keystone.conf.CONF
PROVIDERS = provider_api.ProviderAPIs
class CliTestCase(unit.SQLDriverOverrides, unit.TestCase):
@ -396,13 +397,15 @@ class CliDomainConfigAllTestCase(unit.SQLDriverOverrides, unit.TestCase):
if domain == 'domain_default':
# Not allowed to delete the default domain, but should at least
# delete any domain-specific config for it.
self.domain_config_api.delete_config(
PROVIDERS.domain_config_api.delete_config(
CONF.identity.default_domain_id)
continue
this_domain = self.domains[domain]
this_domain['enabled'] = False
self.resource_api.update_domain(this_domain['id'], this_domain)
self.resource_api.delete_domain(this_domain['id'])
PROVIDERS.resource_api.update_domain(
this_domain['id'], this_domain
)
PROVIDERS.resource_api.delete_domain(this_domain['id'])
self.domains = {}
def config(self, config_files):
@ -412,7 +415,7 @@ class CliDomainConfigAllTestCase(unit.SQLDriverOverrides, unit.TestCase):
def setup_initial_domains(self):
def create_domain(domain):
return self.resource_api.create_domain(domain['id'], domain)
return PROVIDERS.resource_api.create_domain(domain['id'], domain)
self.domains = {}
self.addCleanup(self.cleanup_domains)
@ -458,13 +461,13 @@ class CliDomainConfigAllTestCase(unit.SQLDriverOverrides, unit.TestCase):
provider_api.ProviderAPIs._clear_registry_instances()
cli.DomainConfigUpload.main()
res = self.domain_config_api.get_config_with_sensitive_info(
res = PROVIDERS.domain_config_api.get_config_with_sensitive_info(
CONF.identity.default_domain_id)
self.assertEqual(default_config, res)
res = self.domain_config_api.get_config_with_sensitive_info(
res = PROVIDERS.domain_config_api.get_config_with_sensitive_info(
self.domains['domain1']['id'])
self.assertEqual(domain1_config, res)
res = self.domain_config_api.get_config_with_sensitive_info(
res = PROVIDERS.domain_config_api.get_config_with_sensitive_info(
self.domains['domain2']['id'])
self.assertEqual(domain2_config, res)
@ -490,13 +493,13 @@ class CliDomainConfigSingleDomainTestCase(CliDomainConfigAllTestCase):
provider_api.ProviderAPIs._clear_registry_instances()
cli.DomainConfigUpload.main()
res = self.domain_config_api.get_config_with_sensitive_info(
res = PROVIDERS.domain_config_api.get_config_with_sensitive_info(
CONF.identity.default_domain_id)
self.assertEqual(default_config, res)
res = self.domain_config_api.get_config_with_sensitive_info(
res = PROVIDERS.domain_config_api.get_config_with_sensitive_info(
self.domains['domain1']['id'])
self.assertEqual({}, res)
res = self.domain_config_api.get_config_with_sensitive_info(
res = PROVIDERS.domain_config_api.get_config_with_sensitive_info(
self.domains['domain2']['id'])
self.assertEqual({}, res)
@ -506,7 +509,7 @@ class CliDomainConfigSingleDomainTestCase(CliDomainConfigAllTestCase):
'ldap': {'url': uuid.uuid4().hex},
'identity': {'driver': 'ldap'}
}
self.domain_config_api.create_config(
PROVIDERS.domain_config_api.create_config(
CONF.identity.default_domain_id, default_config)
# Now try and upload the settings in the configuration file for the
@ -523,7 +526,7 @@ class CliDomainConfigSingleDomainTestCase(CliDomainConfigAllTestCase):
file_name)}
mock_print.assert_has_calls([mock.call(error_msg)])
res = self.domain_config_api.get_config(
res = PROVIDERS.domain_config_api.get_config(
CONF.identity.default_domain_id)
# The initial config should not have been overwritten
self.assertEqual(default_config, res)
@ -724,15 +727,17 @@ class TestMappingPopulate(unit.SQLDriverOverrides, unit.TestCase):
# 3. Execute mapping_populate. It should create id mappings
# 4. For the same users verify that they have public_id now
purge_filter = {}
self.id_mapping_api.purge_mappings(purge_filter)
PROVIDERS.id_mapping_api.purge_mappings(purge_filter)
hints = None
users = self.identity_api.driver.list_users(hints)
users = PROVIDERS.identity_api.driver.list_users(hints)
for user in users:
local_entity = {
'domain_id': CONF.identity.default_domain_id,
'local_id': user['id'],
'entity_type': identity_mapping.EntityType.USER}
self.assertIsNone(self.id_mapping_api.get_public_id(local_entity))
self.assertIsNone(
PROVIDERS.id_mapping_api.get_public_id(local_entity)
)
# backends are loaded again in the command handler
provider_api.ProviderAPIs._clear_registry_instances()
@ -744,7 +749,7 @@ class TestMappingPopulate(unit.SQLDriverOverrides, unit.TestCase):
'local_id': user['id'],
'entity_type': identity_mapping.EntityType.USER}
self.assertIsNotNone(
self.id_mapping_api.get_public_id(local_entity))
PROVIDERS.id_mapping_api.get_public_id(local_entity))
def test_bad_domain_name(self):
CONF(args=['mapping_populate', '--domain-name', uuid.uuid4().hex],

View File

@ -17,6 +17,7 @@ import subprocess
import ldap.modlist
from six.moves import range
from keystone.common import provider_api
import keystone.conf
from keystone import exception
from keystone.identity.backends import ldap as identity_ldap
@ -25,6 +26,7 @@ from keystone.tests.unit import test_backend_ldap
CONF = keystone.conf.CONF
PROVIDERS = provider_api.ProviderAPIs
def create_object(dn, attrs):
@ -100,20 +102,20 @@ class LiveLDAPIdentity(test_backend_ldap.LDAPIdentity):
self.config_fixture.config(group='ldap',
query_scope='sub',
alias_dereferencing='never')
self.identity_api = identity_ldap.Identity()
PROVIDERS.identity_api = identity_ldap.Identity()
self.assertRaises(exception.UserNotFound,
self.identity_api.get_user,
PROVIDERS.identity_api.get_user,
'alt_fake1')
self.config_fixture.config(group='ldap',
alias_dereferencing='searching')
self.identity_api = identity_ldap.Identity()
user_ref = self.identity_api.get_user('alt_fake1')
PROVIDERS.identity_api = identity_ldap.Identity()
user_ref = PROVIDERS.identity_api.get_user('alt_fake1')
self.assertEqual('alt_fake1', user_ref['id'])
self.config_fixture.config(group='ldap', alias_dereferencing='always')
self.identity_api = identity_ldap.Identity()
user_ref = self.identity_api.get_user('alt_fake1')
PROVIDERS.identity_api = identity_ldap.Identity()
user_ref = PROVIDERS.identity_api.get_user('alt_fake1')
self.assertEqual('alt_fake1', user_ref['id'])
# FakeLDAP does not correctly process filters, so this test can only be
@ -125,51 +127,51 @@ class LiveLDAPIdentity(test_backend_ldap.LDAPIdentity):
GROUP_COUNT = 3
USER_COUNT = 2
positive_user = unit.create_user(self.identity_api, domain['id'])
negative_user = unit.create_user(self.identity_api, domain['id'])
positive_user = unit.create_user(PROVIDERS.identity_api, domain['id'])
negative_user = unit.create_user(PROVIDERS.identity_api, domain['id'])
for x in range(0, USER_COUNT):
group_refs = self.identity_api.list_groups_for_user(
group_refs = PROVIDERS.identity_api.list_groups_for_user(
test_users[x]['id'])
self.assertEqual(0, len(group_refs))
for x in range(0, GROUP_COUNT):
new_group = unit.new_group_ref(domain_id=domain['id'])
new_group = self.identity_api.create_group(new_group)
new_group = PROVIDERS.identity_api.create_group(new_group)
test_groups.append(new_group)
group_refs = self.identity_api.list_groups_for_user(
group_refs = PROVIDERS.identity_api.list_groups_for_user(
positive_user['id'])
self.assertEqual(x, len(group_refs))
self.identity_api.add_user_to_group(
PROVIDERS.identity_api.add_user_to_group(
positive_user['id'],
new_group['id'])
group_refs = self.identity_api.list_groups_for_user(
group_refs = PROVIDERS.identity_api.list_groups_for_user(
positive_user['id'])
self.assertEqual(x + 1, len(group_refs))
group_refs = self.identity_api.list_groups_for_user(
group_refs = PROVIDERS.identity_api.list_groups_for_user(
negative_user['id'])
self.assertEqual(0, len(group_refs))
driver = self.identity_api._select_identity_driver(
driver = PROVIDERS.identity_api._select_identity_driver(
CONF.identity.default_domain_id)
driver.group.ldap_filter = '(dn=xx)'
group_refs = self.identity_api.list_groups_for_user(
group_refs = PROVIDERS.identity_api.list_groups_for_user(
positive_user['id'])
self.assertEqual(0, len(group_refs))
group_refs = self.identity_api.list_groups_for_user(
group_refs = PROVIDERS.identity_api.list_groups_for_user(
negative_user['id'])
self.assertEqual(0, len(group_refs))
driver.group.ldap_filter = '(objectclass=*)'
group_refs = self.identity_api.list_groups_for_user(
group_refs = PROVIDERS.identity_api.list_groups_for_user(
positive_user['id'])
self.assertEqual(GROUP_COUNT, len(group_refs))
group_refs = self.identity_api.list_groups_for_user(
group_refs = PROVIDERS.identity_api.list_groups_for_user(
negative_user['id'])
self.assertEqual(0, len(group_refs))

View File

@ -15,6 +15,7 @@
import ldap.modlist
from keystone.common import provider_api
import keystone.conf
from keystone import exception
from keystone import identity
@ -23,6 +24,7 @@ from keystone.tests.unit import test_ldap_livetest
CONF = keystone.conf.CONF
PROVIDERS = provider_api.ProviderAPIs
def create_object(dn, attrs):
@ -48,39 +50,45 @@ class LiveTLSLDAPIdentity(test_ldap_livetest.LiveLDAPIdentity):
use_tls=True,
tls_cacertdir=None,
tls_req_cert='demand')
self.identity_api = identity.backends.ldap.Identity()
PROVIDERS.identity_api = identity.backends.ldap.Identity()
user = unit.create_user(self.identity_api, 'default',
user = unit.create_user(PROVIDERS.identity_api, 'default',
name='fake1', password='fakepass1')
user_ref = self.identity_api.get_user(user['id'])
user_ref = PROVIDERS.identity_api.get_user(user['id'])
self.assertEqual(user['id'], user_ref['id'])
user['password'] = 'fakepass2'
self.identity_api.update_user(user['id'], user)
PROVIDERS.identity_api.update_user(user['id'], user)
self.identity_api.delete_user(user['id'])
self.assertRaises(exception.UserNotFound, self.identity_api.get_user,
user['id'])
PROVIDERS.identity_api.delete_user(user['id'])
self.assertRaises(
exception.UserNotFound,
PROVIDERS.identity_api.get_user,
user['id']
)
def test_tls_certdir_demand_option(self):
self.config_fixture.config(group='ldap',
use_tls=True,
tls_cacertdir=None,
tls_req_cert='demand')
self.identity_api = identity.backends.ldap.Identity()
PROVIDERS.identity_api = identity.backends.ldap.Identity()
user = unit.create_user(self.identity_api, 'default',
user = unit.create_user(PROVIDERS.identity_api, 'default',
id='fake1', name='fake1',
password='fakepass1')
user_ref = self.identity_api.get_user('fake1')
user_ref = PROVIDERS.identity_api.get_user('fake1')
self.assertEqual('fake1', user_ref['id'])
user['password'] = 'fakepass2'
self.identity_api.update_user('fake1', user)
PROVIDERS.identity_api.update_user('fake1', user)
self.identity_api.delete_user('fake1')
self.assertRaises(exception.UserNotFound, self.identity_api.get_user,
'fake1')
PROVIDERS.identity_api.delete_user('fake1')
self.assertRaises(
exception.UserNotFound,
PROVIDERS.identity_api.get_user,
'fake1'
)
def test_tls_bad_certfile(self):
self.config_fixture.config(
@ -89,10 +97,10 @@ class LiveTLSLDAPIdentity(test_ldap_livetest.LiveLDAPIdentity):
tls_req_cert='demand',
tls_cacertfile='/etc/keystone/ssl/certs/mythicalcert.pem',
tls_cacertdir=None)
self.identity_api = identity.backends.ldap.Identity()
PROVIDERS.identity_api = identity.backends.ldap.Identity()
user = unit.new_user_ref('default')
self.assertRaises(IOError, self.identity_api.create_user, user)
self.assertRaises(IOError, PROVIDERS.identity_api.create_user, user)
def test_tls_bad_certdir(self):
self.config_fixture.config(
@ -101,7 +109,7 @@ class LiveTLSLDAPIdentity(test_ldap_livetest.LiveLDAPIdentity):
tls_cacertfile=None,
tls_req_cert='demand',
tls_cacertdir='/etc/keystone/ssl/mythicalcertdir')
self.identity_api = identity.backends.ldap.Identity()
PROVIDERS.identity_api = identity.backends.ldap.Identity()
user = unit.new_user_ref('default')
self.assertRaises(IOError, self.identity_api.create_user, user)
self.assertRaises(IOError, PROVIDERS.identity_api.create_user, user)

View File

@ -18,6 +18,7 @@ import mock
from oslo_utils import timeutils
from testtools import matchers
from keystone.common import provider_api
from keystone.common import utils
import keystone.conf
from keystone import exception
@ -30,6 +31,7 @@ from keystone.token.providers import common
CONF = keystone.conf.CONF
PROVIDERS = provider_api.ProviderAPIs
def _future_time():
@ -49,27 +51,30 @@ class RevokeTests(object):
def _assertTokenRevoked(self, token_data):
self.assertRaises(exception.TokenNotFound,
self.revoke_api.check_token,
PROVIDERS.revoke_api.check_token,
token=token_data)
def _assertTokenNotRevoked(self, token_data):
self.assertIsNone(self.revoke_api.check_token(token_data))
self.assertIsNone(PROVIDERS.revoke_api.check_token(token_data))
def test_list(self):
self.revoke_api.revoke_by_user(user_id=1)
self.assertEqual(1, len(self.revoke_api.list_events()))
PROVIDERS.revoke_api.revoke_by_user(user_id=1)
self.assertEqual(1, len(PROVIDERS.revoke_api.list_events()))
self.revoke_api.revoke_by_user(user_id=2)
self.assertEqual(2, len(self.revoke_api.list_events()))
PROVIDERS.revoke_api.revoke_by_user(user_id=2)
self.assertEqual(2, len(PROVIDERS.revoke_api.list_events()))
def test_list_since(self):
self.revoke_api.revoke_by_user(user_id=1)
self.revoke_api.revoke_by_user(user_id=2)
PROVIDERS.revoke_api.revoke_by_user(user_id=1)
PROVIDERS.revoke_api.revoke_by_user(user_id=2)
past = timeutils.utcnow() - datetime.timedelta(seconds=1000)
self.assertEqual(2, len(self.revoke_api.list_events(last_fetch=past)))
self.assertEqual(
2, len(PROVIDERS.revoke_api.list_events(last_fetch=past))
)
future = timeutils.utcnow() + datetime.timedelta(seconds=1000)
self.assertEqual(0,
len(self.revoke_api.list_events(last_fetch=future)))
self.assertEqual(
0, len(PROVIDERS.revoke_api.list_events(last_fetch=future))
)
def test_list_revoked_user(self):
revocation_backend = sql.Revoke()
@ -80,7 +85,7 @@ class RevokeTests(object):
# event in the backend.
first_token = _sample_blank_token()
first_token['user_id'] = uuid.uuid4().hex
self.revoke_api.revoke_by_user(user_id=first_token['user_id'])
PROVIDERS.revoke_api.revoke_by_user(user_id=first_token['user_id'])
self._assertTokenRevoked(first_token)
self.assertEqual(
1, len(revocation_backend.list_events(token=first_token))
@ -92,7 +97,7 @@ class RevokeTests(object):
# one should match the values of the second token.
second_token = _sample_blank_token()
second_token['user_id'] = uuid.uuid4().hex
self.revoke_api.revoke_by_user(user_id=second_token['user_id'])
PROVIDERS.revoke_api.revoke_by_user(user_id=second_token['user_id'])
self._assertTokenRevoked(second_token)
self.assertEqual(
1, len(revocation_backend.list_events(token=second_token))
@ -164,7 +169,7 @@ class RevokeTests(object):
# just revoked.
first_token = _sample_blank_token()
first_token['audit_id'] = common.random_urlsafe_str()
self.revoke_api.revoke_by_audit_id(
PROVIDERS.revoke_api.revoke_by_audit_id(
audit_id=first_token['audit_id'])
self._assertTokenRevoked(first_token)
self.assertEqual(
@ -175,7 +180,7 @@ class RevokeTests(object):
# dont both have different populated audit_id fields
second_token = _sample_blank_token()
second_token['audit_id'] = common.random_urlsafe_str()
self.revoke_api.revoke_by_audit_id(
PROVIDERS.revoke_api.revoke_by_audit_id(
audit_id=second_token['audit_id'])
self._assertTokenRevoked(second_token)
self.assertEqual(
@ -193,8 +198,8 @@ class RevokeTests(object):
def test_list_revoked_since(self):
revocation_backend = sql.Revoke()
token = _sample_blank_token()
self.revoke_api.revoke_by_user(user_id=None)
self.revoke_api.revoke_by_user(user_id=None)
PROVIDERS.revoke_api.revoke_by_user(user_id=None)
PROVIDERS.revoke_api.revoke_by_user(user_id=None)
self.assertEqual(2, len(revocation_backend.list_events(token=token)))
future = timeutils.utcnow() + datetime.timedelta(seconds=1000)
token['issued_at'] = future
@ -210,7 +215,7 @@ class RevokeTests(object):
first_token['audit_id'] = common.random_urlsafe_str()
# revoke event and then verify that there is only one revocation
# and verify the only revoked event is the token
self.revoke_api.revoke(revoke_model.RevokeEvent(
PROVIDERS.revoke_api.revoke(revoke_model.RevokeEvent(
user_id=first_token['user_id'],
project_id=first_token['project_id'],
audit_id=first_token['audit_id']))
@ -237,7 +242,7 @@ class RevokeTests(object):
fourth_token['user_id'] = uuid.uuid4().hex
fourth_token['project_id'] = uuid.uuid4().hex
fourth_token['audit_id'] = common.random_urlsafe_str()
self.revoke_api.revoke(revoke_model.RevokeEvent(
PROVIDERS.revoke_api.revoke(revoke_model.RevokeEvent(
project_id=fourth_token['project_id'],
audit_id=fourth_token['audit_id']))
self._assertTokenRevoked(fourth_token)
@ -247,7 +252,7 @@ class RevokeTests(object):
def _user_field_test(self, field_name):
token = _sample_blank_token()
token[field_name] = uuid.uuid4().hex
self.revoke_api.revoke_by_user(user_id=token[field_name])
PROVIDERS.revoke_api.revoke_by_user(user_id=token[field_name])
self._assertTokenRevoked(token)
token2 = _sample_blank_token()
token2[field_name] = uuid.uuid4().hex
@ -276,7 +281,9 @@ class RevokeTests(object):
self.assertEqual(
0, len(revocation_backend.list_events(token=token_data)))
self.revoke_api.revoke(revoke_model.RevokeEvent(domain_id=domain_id))
PROVIDERS.revoke_api.revoke(
revoke_model.RevokeEvent(domain_id=domain_id)
)
self._assertTokenRevoked(token_data)
self.assertEqual(
@ -297,7 +304,7 @@ class RevokeTests(object):
# If revoke a domain, then a token scoped to a project in the domain
# is revoked.
self.revoke_api.revoke(revoke_model.RevokeEvent(
PROVIDERS.revoke_api.revoke(revoke_model.RevokeEvent(
domain_id=token_data['assignment_domain_id']))
self._assertTokenRevoked(token_data)
@ -317,7 +324,7 @@ class RevokeTests(object):
0, len(revocation_backend.list_events(token=token_data)))
# If revoke a domain, then a token scoped to the domain is revoked.
self.revoke_api.revoke(revoke_model.RevokeEvent(
PROVIDERS.revoke_api.revoke(revoke_model.RevokeEvent(
domain_id=token_data['assignment_domain_id']))
self._assertTokenRevoked(token_data)
@ -344,7 +351,7 @@ class RevokeTests(object):
0, len(revocation_backend.list_events(token=second_token)))
# Revoke first_token using user_id and project_id
self.revoke_api.revoke_by_user_and_project(
PROVIDERS.revoke_api.revoke_by_user_and_project(
first_token['user_id'], first_token['project_id'])
# Check that only first_token has been revoked.
@ -361,7 +368,7 @@ class RevokeTests(object):
# if the token is an original token
token['audit_id'] = uuid.uuid4().hex
token['audit_chain_id'] = token['audit_id']
self.revoke_api.revoke_by_audit_id(audit_id=token['audit_id'])
PROVIDERS.revoke_api.revoke_by_audit_id(audit_id=token['audit_id'])
self._assertTokenRevoked(token)
token2 = _sample_blank_token()
@ -384,7 +391,7 @@ class RevokeTests(object):
self.assertEqual(0, len(revocation_backend.list_events(token=token)))
# Revoked token by audit chain id using the audit_id
self.revoke_api.revoke_by_audit_chain_id(audit_id)
PROVIDERS.revoke_api.revoke_by_audit_chain_id(audit_id)
# Check that the token is now revoked
self._assertTokenRevoked(token)
self.assertEqual(1, len(revocation_backend.list_events(token=token)))
@ -406,67 +413,73 @@ class RevokeTests(object):
token_values = _sample_token_values()
audit_chain_id = uuid.uuid4().hex
self.revoke_api.revoke_by_audit_chain_id(audit_chain_id)
PROVIDERS.revoke_api.revoke_by_audit_chain_id(audit_chain_id)
token_values['audit_chain_id'] = audit_chain_id
self.assertRaises(exception.TokenNotFound,
self.revoke_api.check_token,
PROVIDERS.revoke_api.check_token,
token_values)
# Move our clock forward by 2h, build a new token and validate it.
# 'synchronize' should now be exercised and remove old expired events
mock_utcnow.return_value = now_plus_2h
self.revoke_api.revoke_by_audit_chain_id(audit_chain_id)
PROVIDERS.revoke_api.revoke_by_audit_chain_id(audit_chain_id)
# two hours later, it should still be not found
self.assertRaises(exception.TokenNotFound,
self.revoke_api.check_token,
PROVIDERS.revoke_api.check_token,
token_values)
def test_delete_group_without_role_does_not_revoke_users(self):
revocation_backend = sql.Revoke()
domain = unit.new_domain_ref()
self.resource_api.create_domain(domain['id'], domain)
PROVIDERS.resource_api.create_domain(domain['id'], domain)
# Create two groups. Group1 will be used to test deleting a group,
# without role assignments and users in the group, doesn't create
# revoked events. Group2 will show that deleting a group with role
# assignment and users in the group does create revoked events
group1 = unit.new_group_ref(domain_id=domain['id'])
group1 = self.identity_api.create_group(group1)
group1 = PROVIDERS.identity_api.create_group(group1)
group2 = unit.new_group_ref(domain_id=domain['id'])
group2 = self.identity_api.create_group(group2)
group2 = PROVIDERS.identity_api.create_group(group2)
role = unit.new_role_ref()
self.role_api.create_role(role['id'], role)
PROVIDERS.role_api.create_role(role['id'], role)
user1 = unit.new_user_ref(domain_id=domain['id'])
user1 = self.identity_api.create_user(user1)
user1 = PROVIDERS.identity_api.create_user(user1)
user2 = unit.new_user_ref(domain_id=domain['id'])
user2 = self.identity_api.create_user(user2)
user2 = PROVIDERS.identity_api.create_user(user2)
# Add two users to the group, verify they are added, delete group, and
# check that the revocaiton events have not been created
self.identity_api.add_user_to_group(user_id=user1['id'],
group_id=group1['id'])
self.identity_api.add_user_to_group(user_id=user2['id'],
group_id=group1['id'])
PROVIDERS.identity_api.add_user_to_group(
user_id=user1['id'], group_id=group1['id']
)
PROVIDERS.identity_api.add_user_to_group(
user_id=user2['id'], group_id=group1['id']
)
self.assertEqual(
2, len(self.identity_api.list_users_in_group(group1['id'])))
self.identity_api.delete_group(group1['id'])
2, len(PROVIDERS.identity_api.list_users_in_group(group1['id'])))
PROVIDERS.identity_api.delete_group(group1['id'])
self.assertEqual(0, len(revocation_backend.list_events()))
# Assign a role to the group, add two users to the group, verify that
# the role has been assigned to the group, verify the users have been
# added to the group, delete the group, check that the revocation
# events have been created
self.assignment_api.create_grant(group_id=group2['id'],
domain_id=domain['id'],
role_id=role['id'])
grants = self.assignment_api.list_role_assignments(role_id=role['id'])
PROVIDERS.assignment_api.create_grant(
group_id=group2['id'], domain_id=domain['id'], role_id=role['id']
)
grants = PROVIDERS.assignment_api.list_role_assignments(
role_id=role['id']
)
self.assertThat(grants, matchers.HasLength(1))
self.identity_api.add_user_to_group(user_id=user1['id'],
group_id=group2['id'])
self.identity_api.add_user_to_group(user_id=user2['id'],
group_id=group2['id'])
PROVIDERS.identity_api.add_user_to_group(
user_id=user1['id'], group_id=group2['id']
)
PROVIDERS.identity_api.add_user_to_group(
user_id=user2['id'], group_id=group2['id']
)
self.assertEqual(
2, len(self.identity_api.list_users_in_group(group2['id'])))
self.identity_api.delete_group(group2['id'])
2, len(PROVIDERS.identity_api.list_users_in_group(group2['id'])))
PROVIDERS.identity_api.delete_group(group2['id'])
self.assertEqual(2, len(revocation_backend.list_events()))

View File

@ -12,11 +12,14 @@
import uuid
from keystone.common import provider_api
from keystone.tests import unit
from keystone.tests.unit.identity.shadow_users import test_backend
from keystone.tests.unit.identity.shadow_users import test_core
from keystone.tests.unit.ksfixtures import database
PROVIDERS = provider_api.ProviderAPIs
class ShadowUsersTests(unit.TestCase,
test_backend.ShadowUsersBackendTests,
@ -44,9 +47,11 @@ class ShadowUsersTests(unit.TestCase,
'unique_id': uuid.uuid4().hex,
'display_name': uuid.uuid4().hex
}
self.federation_api.create_idp(self.idp['id'], self.idp)
self.federation_api.create_mapping(self.mapping['id'], self.mapping)
self.federation_api.create_protocol(
PROVIDERS.federation_api.create_idp(self.idp['id'], self.idp)
PROVIDERS.federation_api.create_mapping(
self.mapping['id'], self.mapping
)
PROVIDERS.federation_api.create_protocol(
self.idp['id'], self.protocol['id'], self.protocol)
self.domain_id = (
self.federation_api.get_idp(self.idp['id'])['domain_id'])
PROVIDERS.federation_api.get_idp(self.idp['id'])['domain_id'])

View File

@ -18,10 +18,13 @@ import uuid
from six.moves import http_client
from testtools import matchers
from keystone.common import provider_api
from keystone.tests import unit
from keystone.tests.unit.ksfixtures import database
from keystone.tests.unit import test_v3
PROVIDERS = provider_api.ProviderAPIs
class CatalogTestCase(test_v3.RestfulTestCase):
"""Test service & endpoint CRUD."""
@ -690,7 +693,7 @@ class CatalogTestCase(test_v3.RestfulTestCase):
url=url_with_space)
# add the endpoint to the database
self.catalog_api.create_endpoint(ref['id'], ref)
PROVIDERS.catalog_api.create_endpoint(ref['id'], ref)
# delete the endpoint
self.delete('/endpoints/%s' % ref['id'])
@ -825,7 +828,7 @@ class TestCatalogAPISQL(unit.TestCase):
service = unit.new_service_ref()
self.service_id = service['id']
self.catalog_api.create_service(self.service_id, service)
PROVIDERS.catalog_api.create_service(self.service_id, service)
self.create_endpoint(service_id=self.service_id)
@ -833,7 +836,7 @@ class TestCatalogAPISQL(unit.TestCase):
endpoint = unit.new_endpoint_ref(service_id=service_id,
region_id=None, **kwargs)
self.catalog_api.create_endpoint(endpoint['id'], endpoint)
PROVIDERS.catalog_api.create_endpoint(endpoint['id'], endpoint)
return endpoint
def config_overrides(self):
@ -847,15 +850,15 @@ class TestCatalogAPISQL(unit.TestCase):
# filter the catalog by the project or replace the url with a
# valid project id.
domain = unit.new_domain_ref()
self.resource_api.create_domain(domain['id'], domain)
PROVIDERS.resource_api.create_domain(domain['id'], domain)
project = unit.new_project_ref(domain_id=domain['id'])
self.resource_api.create_project(project['id'], project)
PROVIDERS.resource_api.create_project(project['id'], project)
# the only endpoint in the catalog is the one created in setUp
catalog = self.catalog_api.get_v3_catalog(user_id, project['id'])
catalog = PROVIDERS.catalog_api.get_v3_catalog(user_id, project['id'])
self.assertEqual(1, len(catalog[0]['endpoints']))
# it's also the only endpoint in the backend
self.assertEqual(1, len(self.catalog_api.list_endpoints()))
self.assertEqual(1, len(PROVIDERS.catalog_api.list_endpoints()))
# create a new, invalid endpoint - malformed type declaration
self.create_endpoint(self.service_id,
@ -866,23 +869,23 @@ class TestCatalogAPISQL(unit.TestCase):
url='http://keystone/%(you_wont_find_me)s')
# verify that the invalid endpoints don't appear in the catalog
catalog = self.catalog_api.get_v3_catalog(user_id, project['id'])
catalog = PROVIDERS.catalog_api.get_v3_catalog(user_id, project['id'])
self.assertEqual(1, len(catalog[0]['endpoints']))
# all three appear in the backend
self.assertEqual(3, len(self.catalog_api.list_endpoints()))
self.assertEqual(3, len(PROVIDERS.catalog_api.list_endpoints()))
# create another valid endpoint - project_id will be replaced
self.create_endpoint(self.service_id,
url='http://keystone/%(project_id)s')
# there are two valid endpoints, positive check
catalog = self.catalog_api.get_v3_catalog(user_id, project['id'])
catalog = PROVIDERS.catalog_api.get_v3_catalog(user_id, project['id'])
self.assertThat(catalog[0]['endpoints'], matchers.HasLength(2))
# If the URL has no 'project_id' to substitute, we will skip the
# endpoint which contains this kind of URL, negative check.
project_id = None
catalog = self.catalog_api.get_v3_catalog(user_id, project_id)
catalog = PROVIDERS.catalog_api.get_v3_catalog(user_id, project_id)
self.assertThat(catalog[0]['endpoints'], matchers.HasLength(1))
def test_get_catalog_always_returns_service_name(self):
@ -891,22 +894,22 @@ class TestCatalogAPISQL(unit.TestCase):
# filter the catalog by the project or replace the url with a
# valid project id.
domain = unit.new_domain_ref()
self.resource_api.create_domain(domain['id'], domain)
PROVIDERS.resource_api.create_domain(domain['id'], domain)
project = unit.new_project_ref(domain_id=domain['id'])
self.resource_api.create_project(project['id'], project)
PROVIDERS.resource_api.create_project(project['id'], project)
# create a service, with a name
named_svc = unit.new_service_ref()
self.catalog_api.create_service(named_svc['id'], named_svc)
PROVIDERS.catalog_api.create_service(named_svc['id'], named_svc)
self.create_endpoint(service_id=named_svc['id'])
# create a service, with no name
unnamed_svc = unit.new_service_ref(name=None)
del unnamed_svc['name']
self.catalog_api.create_service(unnamed_svc['id'], unnamed_svc)
PROVIDERS.catalog_api.create_service(unnamed_svc['id'], unnamed_svc)
self.create_endpoint(service_id=unnamed_svc['id'])
catalog = self.catalog_api.get_v3_catalog(user_id, project['id'])
catalog = PROVIDERS.catalog_api.get_v3_catalog(user_id, project['id'])
named_endpoint = [ep for ep in catalog
if ep['type'] == named_svc['type']][0]
@ -934,46 +937,46 @@ class TestCatalogAPISQLRegions(unit.TestCase):
def test_get_catalog_returns_proper_endpoints_with_no_region(self):
service = unit.new_service_ref()
service_id = service['id']
self.catalog_api.create_service(service_id, service)
PROVIDERS.catalog_api.create_service(service_id, service)
endpoint = unit.new_endpoint_ref(service_id=service_id,
region_id=None)
del endpoint['region_id']
self.catalog_api.create_endpoint(endpoint['id'], endpoint)
PROVIDERS.catalog_api.create_endpoint(endpoint['id'], endpoint)
# create a project since the project should exist if we want to
# filter the catalog by the project or replace the url with a
# valid project id.
domain = unit.new_domain_ref()
self.resource_api.create_domain(domain['id'], domain)
PROVIDERS.resource_api.create_domain(domain['id'], domain)
project = unit.new_project_ref(domain_id=domain['id'])
self.resource_api.create_project(project['id'], project)
PROVIDERS.resource_api.create_project(project['id'], project)
user_id = uuid.uuid4().hex
catalog = self.catalog_api.get_v3_catalog(user_id, project['id'])
catalog = PROVIDERS.catalog_api.get_v3_catalog(user_id, project['id'])
self.assertValidCatalogEndpoint(
catalog[0]['endpoints'][0], ref=endpoint)
def test_get_catalog_returns_proper_endpoints_with_region(self):
service = unit.new_service_ref()
service_id = service['id']
self.catalog_api.create_service(service_id, service)
PROVIDERS.catalog_api.create_service(service_id, service)
endpoint = unit.new_endpoint_ref(service_id=service_id)
region = unit.new_region_ref(id=endpoint['region_id'])
self.catalog_api.create_region(region)
self.catalog_api.create_endpoint(endpoint['id'], endpoint)
PROVIDERS.catalog_api.create_region(region)
PROVIDERS.catalog_api.create_endpoint(endpoint['id'], endpoint)
endpoint = self.catalog_api.get_endpoint(endpoint['id'])
endpoint = PROVIDERS.catalog_api.get_endpoint(endpoint['id'])
user_id = uuid.uuid4().hex
# create a project since the project should exist if we want to
# filter the catalog by the project or replace the url with a
# valid project id.
domain = unit.new_domain_ref()
self.resource_api.create_domain(domain['id'], domain)
PROVIDERS.resource_api.create_domain(domain['id'], domain)
project = unit.new_project_ref(domain_id=domain['id'])
self.resource_api.create_project(project['id'], project)
PROVIDERS.resource_api.create_project(project['id'], project)
catalog = self.catalog_api.get_v3_catalog(user_id, project['id'])
catalog = PROVIDERS.catalog_api.get_v3_catalog(user_id, project['id'])
self.assertValidCatalogEndpoint(
catalog[0]['endpoints'][0], ref=endpoint)

View File

@ -15,9 +15,12 @@
from six.moves import http_client
from testtools import matchers
from keystone.common import provider_api
from keystone.tests import unit
from keystone.tests.unit import test_v3
PROVIDERS = provider_api.ProviderAPIs
class EndpointPolicyTestCase(test_v3.RestfulTestCase):
"""Test endpoint policy CRUD.
@ -33,15 +36,17 @@ class EndpointPolicyTestCase(test_v3.RestfulTestCase):
def setUp(self):
super(EndpointPolicyTestCase, self).setUp()
self.policy = unit.new_policy_ref()
self.policy_api.create_policy(self.policy['id'], self.policy)
PROVIDERS.policy_api.create_policy(self.policy['id'], self.policy)
self.service = unit.new_service_ref()
self.catalog_api.create_service(self.service['id'], self.service)
PROVIDERS.catalog_api.create_service(self.service['id'], self.service)
self.endpoint = unit.new_endpoint_ref(self.service['id'], enabled=True,
interface='public',
region_id=self.region_id)
self.catalog_api.create_endpoint(self.endpoint['id'], self.endpoint)
PROVIDERS.catalog_api.create_endpoint(
self.endpoint['id'], self.endpoint
)
self.region = unit.new_region_ref()
self.catalog_api.create_region(self.region)
PROVIDERS.catalog_api.create_region(self.region)
def assert_head_and_get_return_same_response(self, url, expected_status):
self.get(url, expected_status=expected_status)

View File

@ -21,6 +21,7 @@ from oslo_serialization import jsonutils
from six.moves import http_client
from six.moves import range
from keystone.common import provider_api
import keystone.conf
from keystone.tests import unit
from keystone.tests.unit import filtering
@ -30,6 +31,7 @@ from keystone.tests.unit import test_v3
CONF = keystone.conf.CONF
PROVIDERS = provider_api.ProviderAPIs
class IdentityTestFilteredCase(filtering.FilterTests,
@ -63,26 +65,27 @@ class IdentityTestFilteredCase(filtering.FilterTests,
# Start by creating a few domains
self._populate_default_domain()
self.domainA = unit.new_domain_ref()
self.resource_api.create_domain(self.domainA['id'], self.domainA)
PROVIDERS.resource_api.create_domain(self.domainA['id'], self.domainA)
self.domainB = unit.new_domain_ref()
self.resource_api.create_domain(self.domainB['id'], self.domainB)
PROVIDERS.resource_api.create_domain(self.domainB['id'], self.domainB)
self.domainC = unit.new_domain_ref()
self.domainC['enabled'] = False
self.resource_api.create_domain(self.domainC['id'], self.domainC)
PROVIDERS.resource_api.create_domain(self.domainC['id'], self.domainC)
# Now create some users, one in domainA and two of them in domainB
self.user1 = unit.create_user(self.identity_api,
self.user1 = unit.create_user(PROVIDERS.identity_api,
domain_id=self.domainA['id'])
self.user2 = unit.create_user(self.identity_api,
self.user2 = unit.create_user(PROVIDERS.identity_api,
domain_id=self.domainB['id'])
self.user3 = unit.create_user(self.identity_api,
self.user3 = unit.create_user(PROVIDERS.identity_api,
domain_id=self.domainB['id'])
self.role = unit.new_role_ref()
self.role_api.create_role(self.role['id'], self.role)
self.assignment_api.create_grant(self.role['id'],
user_id=self.user1['id'],
domain_id=self.domainA['id'])
PROVIDERS.role_api.create_role(self.role['id'], self.role)
PROVIDERS.assignment_api.create_grant(
self.role['id'], user_id=self.user1['id'],
domain_id=self.domainA['id']
)
# A default auth request we can use - un-scoped user token
self.auth = self.build_authentication_request(
@ -224,7 +227,7 @@ class IdentityTestFilteredCase(filtering.FilterTests,
self._set_policy({"identity:list_users": []})
user = self.user1
user['name'] = '%my%name%'
self.identity_api.update_user(user['id'], user)
PROVIDERS.identity_api.update_user(user['id'], user)
frozen_datetime.tick(delta=datetime.timedelta(seconds=1))
@ -240,23 +243,23 @@ class IdentityTestFilteredCase(filtering.FilterTests,
# Set up some names that we can filter on
user = user_list[5]
user['name'] = 'The'
self.identity_api.update_user(user['id'], user)
PROVIDERS.identity_api.update_user(user['id'], user)
user = user_list[6]
user['name'] = 'The Ministry'
self.identity_api.update_user(user['id'], user)
PROVIDERS.identity_api.update_user(user['id'], user)
user = user_list[7]
user['name'] = 'The Ministry of'
self.identity_api.update_user(user['id'], user)
PROVIDERS.identity_api.update_user(user['id'], user)
user = user_list[8]
user['name'] = 'The Ministry of Silly'
self.identity_api.update_user(user['id'], user)
PROVIDERS.identity_api.update_user(user['id'], user)
user = user_list[9]
user['name'] = 'The Ministry of Silly Walks'
self.identity_api.update_user(user['id'], user)
PROVIDERS.identity_api.update_user(user['id'], user)
# ...and one for useful case insensitivity testing
user = user_list[10]
user['name'] = 'the ministry of silly walks OF'
self.identity_api.update_user(user['id'], user)
PROVIDERS.identity_api.update_user(user['id'], user)
self._set_policy({"identity:list_users": []})
@ -318,7 +321,7 @@ class IdentityTestFilteredCase(filtering.FilterTests,
# See if we can add a SQL command...use the group table instead of the
# user table since 'user' is reserved word for SQLAlchemy.
group = unit.new_group_ref(domain_id=self.domainB['id'])
group = self.identity_api.create_group(group)
group = PROVIDERS.identity_api.create_group(group)
url_by_name = "/users?name=x'; drop table group"
r = self.get(url_by_name, auth=self.auth)
@ -351,14 +354,15 @@ class IdentityPasswordExpiryFilteredTestCase(filtering.FilterTests,
"""
self._populate_default_domain()
self.domain = unit.new_domain_ref()
self.resource_api.create_domain(self.domain['id'], self.domain)
PROVIDERS.resource_api.create_domain(self.domain['id'], self.domain)
self.domain_id = self.domain['id']
self.project = unit.new_project_ref(domain_id=self.domain_id)
self.project_id = self.project['id']
self.project = self.resource_api.create_project(self.project_id,
self.project)
self.project = PROVIDERS.resource_api.create_project(
self.project_id, self.project
)
self.group = unit.new_group_ref(domain_id=self.domain_id)
self.group = self.identity_api.create_group(self.group)
self.group = PROVIDERS.identity_api.create_group(self.group)
self.group_id = self.group['id']
# Creates three users each with password expiration offset
# by one day, starting with the current time frozen.
@ -366,43 +370,45 @@ class IdentityPasswordExpiryFilteredTestCase(filtering.FilterTests,
with freezegun.freeze_time(self.starttime):
self.config_fixture.config(group='security_compliance',
password_expires_days=1)
self.user = unit.create_user(self.identity_api,
self.user = unit.create_user(PROVIDERS.identity_api,
domain_id=self.domain_id)
self.config_fixture.config(group='security_compliance',
password_expires_days=2)
self.user2 = unit.create_user(self.identity_api,
self.user2 = unit.create_user(PROVIDERS.identity_api,
domain_id=self.domain_id)
self.config_fixture.config(group='security_compliance',
password_expires_days=3)
self.user3 = unit.create_user(self.identity_api,
self.user3 = unit.create_user(PROVIDERS.identity_api,
domain_id=self.domain_id)
self.role = unit.new_role_ref(name='admin')
self.role_api.create_role(self.role['id'], self.role)
PROVIDERS.role_api.create_role(self.role['id'], self.role)
self.role_id = self.role['id']
# Grant admin role to the users created.
self.assignment_api.create_grant(self.role_id,
user_id=self.user['id'],
domain_id=self.domain_id)
self.assignment_api.create_grant(self.role_id,
user_id=self.user2['id'],
domain_id=self.domain_id)
self.assignment_api.create_grant(self.role_id,
user_id=self.user3['id'],
domain_id=self.domain_id)
self.assignment_api.create_grant(self.role_id,
user_id=self.user['id'],
project_id=self.project_id)
self.assignment_api.create_grant(self.role_id,
user_id=self.user2['id'],
project_id=self.project_id)
self.assignment_api.create_grant(self.role_id,
user_id=self.user3['id'],
project_id=self.project_id)
PROVIDERS.assignment_api.create_grant(
self.role_id, user_id=self.user['id'], domain_id=self.domain_id
)
PROVIDERS.assignment_api.create_grant(
self.role_id, user_id=self.user2['id'], domain_id=self.domain_id
)
PROVIDERS.assignment_api.create_grant(
self.role_id, user_id=self.user3['id'], domain_id=self.domain_id
)
PROVIDERS.assignment_api.create_grant(
self.role_id, user_id=self.user['id'], project_id=self.project_id
)
PROVIDERS.assignment_api.create_grant(
self.role_id, user_id=self.user2['id'], project_id=self.project_id
)
PROVIDERS.assignment_api.create_grant(
self.role_id, user_id=self.user3['id'], project_id=self.project_id
)
# Add the last two users to the group.
self.identity_api.add_user_to_group(self.user2['id'],
self.group_id)
self.identity_api.add_user_to_group(self.user3['id'],
self.group_id)
PROVIDERS.identity_api.add_user_to_group(
self.user2['id'], self.group_id
)
PROVIDERS.identity_api.add_user_to_group(
self.user3['id'], self.group_id
)
def _list_users_by_password_expires_at(self, time, operator=None):
"""Call `list_users` with `password_expires_at` filter.
@ -741,16 +747,18 @@ class IdentityTestListLimitCase(IdentityTestFilteredCase):
self.addCleanup(self.clean_up_service)
for _ in range(10):
new_entity = unit.new_service_ref()
service = self.catalog_api.create_service(new_entity['id'],
new_entity)
service = PROVIDERS.catalog_api.create_service(
new_entity['id'], new_entity
)
self.service_list.append(service)
self.policy_list = []
self.addCleanup(self.clean_up_policy)
for _ in range(10):
new_entity = unit.new_policy_ref()
policy = self.policy_api.create_policy(new_entity['id'],
new_entity)
policy = PROVIDERS.policy_api.create_policy(
new_entity['id'], new_entity
)
self.policy_list.append(policy)
def clean_up_entity(self, entity):
@ -760,12 +768,12 @@ class IdentityTestListLimitCase(IdentityTestFilteredCase):
def clean_up_service(self):
"""Clean up service test data from Identity Limit Test Cases."""
for service in self.service_list:
self.catalog_api.delete_service(service['id'])
PROVIDERS.catalog_api.delete_service(service['id'])
def clean_up_policy(self):
"""Clean up policy test data from Identity Limit Test Cases."""
for policy in self.policy_list:
self.policy_api.delete_policy(policy['id'])
PROVIDERS.policy_api.delete_policy(policy['id'])
def _test_entity_list_limit(self, entity, driver):
"""GET /<entities> (limited).

View File

@ -25,6 +25,7 @@ from six.moves import http_client
from six.moves import urllib
from six.moves.urllib import parse as urlparse
from keystone.common import provider_api
import keystone.conf
from keystone import exception
from keystone import oauth1
@ -38,6 +39,7 @@ from keystone.tests.unit import test_v3
CONF = keystone.conf.CONF
PROVIDERS = provider_api.ProviderAPIs
def _urllib_parse_qs_text_keys(content):
@ -535,14 +537,14 @@ class AuthTokenTests(object):
def test_delete_keystone_tokens_by_consumer_id(self):
self.test_oauth_flow()
self.token_provider_api._persistence.get_token(
PROVIDERS.token_provider_api._persistence.get_token(
self.keystone_token_id)
self.token_provider_api._persistence.delete_tokens(
PROVIDERS.token_provider_api._persistence.delete_tokens(
self.user_id,
consumer_id=self.consumer['key'])
self.assertRaises(
exception.TokenNotFound,
self.token_provider_api._persistence.get_token,
PROVIDERS.token_provider_api._persistence.get_token,
self.keystone_token_id)
def _create_trust_get_token(self):
@ -882,7 +884,7 @@ class MaliciousOAuth1Tests(OAuth1Tests):
credentials = _urllib_parse_qs_text_keys(content.result)
request_key = credentials['oauth_token'][0]
self.assignment_api.remove_role_from_user_and_project(
PROVIDERS.assignment_api.remove_role_from_user_and_project(
self.user_id, self.project_id, self.role_id)
url = self._authorize_request_token(request_key)
body = {'roles': [{'id': self.role_id}]}
@ -927,14 +929,14 @@ class MaliciousOAuth1Tests(OAuth1Tests):
resp = self.put(url, body=body, expected_status=http_client.OK)
verifier = resp.result['token']['oauth_verifier']
request_token.set_verifier(verifier)
request_token_created = self.oauth_api.get_request_token(
request_token_created = PROVIDERS.oauth_api.get_request_token(
request_key.decode('utf-8'))
request_token_created.update({'authorizing_user_id': ''})
# Update the request token that is created instead of mocking
# the whole token object to focus on what's we want to test
# here and avoid any other factors that will result in the same
# exception.
with mock.patch.object(self.oauth_api,
with mock.patch.object(PROVIDERS.oauth_api,
'get_request_token') as mock_token:
mock_token.return_value = request_token_created
url, headers = self._create_access_token(consumer, request_token)
@ -1069,7 +1071,7 @@ class OAuthNotificationTests(OAuth1Tests,
def test_update_consumer(self):
consumer_ref = self._create_single_consumer()
update_ref = {'consumer': {'description': uuid.uuid4().hex}}
self.oauth_api.update_consumer(consumer_ref['id'], update_ref)
PROVIDERS.oauth_api.update_consumer(consumer_ref['id'], update_ref)
self._assert_notify_sent(consumer_ref['id'],
test_notifications.UPDATED_OPERATION,
'OS-OAUTH1:consumer')
@ -1080,7 +1082,7 @@ class OAuthNotificationTests(OAuth1Tests,
def test_delete_consumer(self):
consumer_ref = self._create_single_consumer()
self.oauth_api.delete_consumer(consumer_ref['id'])
PROVIDERS.oauth_api.delete_consumer(consumer_ref['id'])
self._assert_notify_sent(consumer_ref['id'],
test_notifications.DELETED_OPERATION,
'OS-OAUTH1:consumer')

View File

@ -21,10 +21,13 @@ import six
from six.moves import http_client
from testtools import matchers
from keystone.common import provider_api
from keystone.common import utils
from keystone.models import revoke_model
from keystone.tests.unit import test_v3
PROVIDERS = provider_api.ProviderAPIs
def _future_time_string():
expire_delta = datetime.timedelta(seconds=1000)
@ -79,7 +82,7 @@ class OSRevokeTests(test_v3.RestfulTestCase, test_v3.JsonHomeTestMixin):
sample = self._blank_event()
sample['audit_id'] = six.text_type(audit_id)
before_time = timeutils.utcnow().replace(microsecond=0)
self.revoke_api.revoke_by_audit_id(audit_id)
PROVIDERS.revoke_api.revoke_by_audit_id(audit_id)
resp = self.get('/OS-REVOKE/events')
events = resp.json_body['events']
self.assertEqual(1, len(events))
@ -90,7 +93,7 @@ class OSRevokeTests(test_v3.RestfulTestCase, test_v3.JsonHomeTestMixin):
sample = dict()
sample['project_id'] = six.text_type(project_id)
before_time = timeutils.utcnow().replace(microsecond=0)
self.revoke_api.revoke(
PROVIDERS.revoke_api.revoke(
revoke_model.RevokeEvent(project_id=project_id))
resp = self.get('/OS-REVOKE/events')
@ -103,7 +106,7 @@ class OSRevokeTests(test_v3.RestfulTestCase, test_v3.JsonHomeTestMixin):
sample = dict()
sample['domain_id'] = six.text_type(domain_id)
before_time = timeutils.utcnow().replace(microsecond=0)
self.revoke_api.revoke(
PROVIDERS.revoke_api.revoke(
revoke_model.RevokeEvent(domain_id=domain_id))
resp = self.get('/OS-REVOKE/events')
@ -125,7 +128,7 @@ class OSRevokeTests(test_v3.RestfulTestCase, test_v3.JsonHomeTestMixin):
sample = dict()
sample['domain_id'] = six.text_type(domain_id)
self.revoke_api.revoke(
PROVIDERS.revoke_api.revoke(
revoke_model.RevokeEvent(domain_id=domain_id))
resp = self.get('/OS-REVOKE/events')
@ -141,7 +144,7 @@ class OSRevokeTests(test_v3.RestfulTestCase, test_v3.JsonHomeTestMixin):
with freezegun.freeze_time(time) as frozen_datetime:
revoked_at = timeutils.utcnow()
# Given or not, `revoked_at` will always be set in the backend.
self.revoke_api.revoke(
PROVIDERS.revoke_api.revoke(
revoke_model.RevokeEvent(revoked_at=revoked_at))
frozen_datetime.tick(delta=datetime.timedelta(seconds=1))
@ -157,7 +160,7 @@ class OSRevokeTests(test_v3.RestfulTestCase, test_v3.JsonHomeTestMixin):
ref = {'description': uuid.uuid4().hex}
resp = self.post('/OS-OAUTH1/consumers', body={'consumer': ref})
consumer_id = resp.result['consumer']['id']
self.oauth_api.delete_consumer(consumer_id)
PROVIDERS.oauth_api.delete_consumer(consumer_id)
resp = self.get('/OS-REVOKE/events')
events = resp.json_body['events']
@ -194,7 +197,7 @@ class OSRevokeTests(test_v3.RestfulTestCase, test_v3.JsonHomeTestMixin):
sql_delete_mock.side_effect = side_effect
try:
self.revoke_api.revoke(revoke_model.RevokeEvent(
PROVIDERS.revoke_api.revoke(revoke_model.RevokeEvent(
user_id=uuid.uuid4().hex))
finally:
if side_effect.patched:

View File

@ -17,9 +17,12 @@ import uuid
from six.moves import http_client
from keystone.common import provider_api
from keystone.tests import unit
from keystone.tests.unit import test_v3
PROVIDERS = provider_api.ProviderAPIs
class PolicyTestCase(test_v3.RestfulTestCase):
"""Test policy CRUD."""
@ -28,7 +31,7 @@ class PolicyTestCase(test_v3.RestfulTestCase):
super(PolicyTestCase, self).setUp()
self.policy = unit.new_policy_ref()
self.policy_id = self.policy['id']
self.policy_api.create_policy(
PROVIDERS.policy_api.create_policy(
self.policy_id,
self.policy.copy())

View File

@ -18,6 +18,7 @@ import uuid
from oslo_serialization import jsonutils
from six.moves import http_client
from keystone.common import provider_api
import keystone.conf
from keystone.credential.providers import fernet as credential_fernet
from keystone import exception
@ -29,6 +30,7 @@ from keystone.tests.unit import utils
CONF = keystone.conf.CONF
PROVIDERS = provider_api.ProviderAPIs
class IdentityTestProtectedCase(test_v3.RestfulTestCase):
@ -67,55 +69,64 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase):
# Start by creating a couple of domains
self.domainA = unit.new_domain_ref()
self.resource_api.create_domain(self.domainA['id'], self.domainA)
PROVIDERS.resource_api.create_domain(self.domainA['id'], self.domainA)
self.domainB = unit.new_domain_ref()
self.resource_api.create_domain(self.domainB['id'], self.domainB)
PROVIDERS.resource_api.create_domain(self.domainB['id'], self.domainB)
self.domainC = unit.new_domain_ref(enabled=False)
self.resource_api.create_domain(self.domainC['id'], self.domainC)
PROVIDERS.resource_api.create_domain(self.domainC['id'], self.domainC)
# Some projects in the domains
self.projectA = unit.new_project_ref(domain_id=self.domainA['id'])
self.resource_api.create_project(self.projectA['id'], self.projectA)
PROVIDERS.resource_api.create_project(
self.projectA['id'], self.projectA
)
self.projectB = unit.new_project_ref(domain_id=self.domainB['id'])
self.resource_api.create_project(self.projectB['id'], self.projectB)
PROVIDERS.resource_api.create_project(
self.projectB['id'], self.projectB
)
# Now create some users, one in domainA and two of them in domainB
self.user1 = unit.create_user(self.identity_api,
self.user1 = unit.create_user(PROVIDERS.identity_api,
domain_id=self.domainA['id'])
self.user2 = unit.create_user(self.identity_api,
self.user2 = unit.create_user(PROVIDERS.identity_api,
domain_id=self.domainB['id'])
self.user3 = unit.create_user(self.identity_api,
self.user3 = unit.create_user(PROVIDERS.identity_api,
domain_id=self.domainB['id'])
self.group1 = unit.new_group_ref(domain_id=self.domainA['id'])
self.group1 = self.identity_api.create_group(self.group1)
self.group1 = PROVIDERS.identity_api.create_group(self.group1)
self.group2 = unit.new_group_ref(domain_id=self.domainA['id'])
self.group2 = self.identity_api.create_group(self.group2)
self.group2 = PROVIDERS.identity_api.create_group(self.group2)
self.group3 = unit.new_group_ref(domain_id=self.domainB['id'])
self.group3 = self.identity_api.create_group(self.group3)
self.group3 = PROVIDERS.identity_api.create_group(self.group3)
self.role = unit.new_role_ref()
self.role_api.create_role(self.role['id'], self.role)
PROVIDERS.role_api.create_role(self.role['id'], self.role)
self.role1 = unit.new_role_ref()
self.role_api.create_role(self.role1['id'], self.role1)
PROVIDERS.role_api.create_role(self.role1['id'], self.role1)
self.assignment_api.create_grant(self.role['id'],
user_id=self.user1['id'],
domain_id=self.domainA['id'])
self.assignment_api.create_grant(self.role['id'],
user_id=self.user2['id'],
domain_id=self.domainA['id'])
self.assignment_api.create_grant(self.role1['id'],
user_id=self.user1['id'],
domain_id=self.domainA['id'])
self.assignment_api.create_grant(self.role['id'],
user_id=self.user1['id'],
project_id=self.projectA['id'])
self.assignment_api.create_grant(self.role['id'],
user_id=self.user2['id'],
project_id=self.projectB['id'])
PROVIDERS.assignment_api.create_grant(
self.role['id'], user_id=self.user1['id'],
domain_id=self.domainA['id']
)
PROVIDERS.assignment_api.create_grant(
self.role['id'], user_id=self.user2['id'],
domain_id=self.domainA['id']
)
PROVIDERS.assignment_api.create_grant(
self.role1['id'], user_id=self.user1['id'],
domain_id=self.domainA['id']
)
PROVIDERS.assignment_api.create_grant(
self.role['id'], user_id=self.user1['id'],
project_id=self.projectA['id']
)
PROVIDERS.assignment_api.create_grant(
self.role['id'], user_id=self.user2['id'],
project_id=self.projectB['id']
)
def _get_id_list_from_ref_list(self, ref_list):
result_list = []
@ -388,33 +399,36 @@ class IdentityTestPolicySample(test_v3.RestfulTestCase):
self._populate_default_domain()
self.just_a_user = unit.create_user(
self.identity_api,
PROVIDERS.identity_api,
domain_id=CONF.identity.default_domain_id)
self.another_user = unit.create_user(
self.identity_api,
PROVIDERS.identity_api,
domain_id=CONF.identity.default_domain_id)
self.admin_user = unit.create_user(
self.identity_api,
PROVIDERS.identity_api,
domain_id=CONF.identity.default_domain_id)
self.role = unit.new_role_ref()
self.role_api.create_role(self.role['id'], self.role)
PROVIDERS.role_api.create_role(self.role['id'], self.role)
self.admin_role = unit.new_role_ref(name='admin')
self.role_api.create_role(self.admin_role['id'], self.admin_role)
PROVIDERS.role_api.create_role(self.admin_role['id'], self.admin_role)
# Create and assign roles to the project
self.project = unit.new_project_ref(
domain_id=CONF.identity.default_domain_id)
self.resource_api.create_project(self.project['id'], self.project)
self.assignment_api.create_grant(self.role['id'],
user_id=self.just_a_user['id'],
project_id=self.project['id'])
self.assignment_api.create_grant(self.role['id'],
user_id=self.another_user['id'],
project_id=self.project['id'])
self.assignment_api.create_grant(self.admin_role['id'],
user_id=self.admin_user['id'],
project_id=self.project['id'])
PROVIDERS.resource_api.create_project(self.project['id'], self.project)
PROVIDERS.assignment_api.create_grant(
self.role['id'], user_id=self.just_a_user['id'],
project_id=self.project['id']
)
PROVIDERS.assignment_api.create_grant(
self.role['id'], user_id=self.another_user['id'],
project_id=self.project['id']
)
PROVIDERS.assignment_api.create_grant(
self.admin_role['id'], user_id=self.admin_user['id'],
project_id=self.project['id']
)
def test_user_validate_same_token(self):
# Given a non-admin user token, the token can be used to validate
@ -677,85 +691,95 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
# Start by creating a couple of domains
self._populate_default_domain()
self.domainA = unit.new_domain_ref()
self.resource_api.create_domain(self.domainA['id'], self.domainA)
PROVIDERS.resource_api.create_domain(self.domainA['id'], self.domainA)
self.domainB = unit.new_domain_ref()
self.resource_api.create_domain(self.domainB['id'], self.domainB)
PROVIDERS.resource_api.create_domain(self.domainB['id'], self.domainB)
self.admin_domain = unit.new_domain_ref()
self.resource_api.create_domain(self.admin_domain['id'],
self.admin_domain)
PROVIDERS.resource_api.create_domain(
self.admin_domain['id'], self.admin_domain
)
self.admin_project = unit.new_project_ref(
domain_id=self.admin_domain['id'])
self.resource_api.create_project(self.admin_project['id'],
self.admin_project)
PROVIDERS.resource_api.create_project(
self.admin_project['id'], self.admin_project
)
# And our users
self.cloud_admin_user = unit.create_user(
self.identity_api,
PROVIDERS.identity_api,
domain_id=self.admin_domain['id'])
self.just_a_user = unit.create_user(
self.identity_api,
PROVIDERS.identity_api,
domain_id=self.domainA['id'])
self.domain_admin_user = unit.create_user(
self.identity_api,
PROVIDERS.identity_api,
domain_id=self.domainA['id'])
self.domainB_admin_user = unit.create_user(
self.identity_api,
PROVIDERS.identity_api,
domain_id=self.domainB['id'])
self.project_admin_user = unit.create_user(
self.identity_api,
PROVIDERS.identity_api,
domain_id=self.domainA['id'])
self.project_adminB_user = unit.create_user(
self.identity_api,
PROVIDERS.identity_api,
domain_id=self.domainB['id'])
# The admin role, a domain specific role and another plain role
self.admin_role = unit.new_role_ref(name='admin')
self.role_api.create_role(self.admin_role['id'], self.admin_role)
PROVIDERS.role_api.create_role(self.admin_role['id'], self.admin_role)
self.roleA = unit.new_role_ref(domain_id=self.domainA['id'])
self.role_api.create_role(self.roleA['id'], self.roleA)
PROVIDERS.role_api.create_role(self.roleA['id'], self.roleA)
self.role = unit.new_role_ref()
self.role_api.create_role(self.role['id'], self.role)
PROVIDERS.role_api.create_role(self.role['id'], self.role)
# The cloud admin just gets the admin role on the special admin project
self.assignment_api.create_grant(self.admin_role['id'],
user_id=self.cloud_admin_user['id'],
project_id=self.admin_project['id'])
PROVIDERS.assignment_api.create_grant(
self.admin_role['id'], user_id=self.cloud_admin_user['id'],
project_id=self.admin_project['id']
)
# Assign roles to the domain
self.assignment_api.create_grant(self.admin_role['id'],
user_id=self.domain_admin_user['id'],
domain_id=self.domainA['id'])
self.assignment_api.create_grant(self.role['id'],
user_id=self.just_a_user['id'],
domain_id=self.domainA['id'])
self.assignment_api.create_grant(self.admin_role['id'],
user_id=self.domainB_admin_user['id'],
domain_id=self.domainB['id'])
PROVIDERS.assignment_api.create_grant(
self.admin_role['id'], user_id=self.domain_admin_user['id'],
domain_id=self.domainA['id']
)
PROVIDERS.assignment_api.create_grant(
self.role['id'], user_id=self.just_a_user['id'],
domain_id=self.domainA['id']
)
PROVIDERS.assignment_api.create_grant(
self.admin_role['id'], user_id=self.domainB_admin_user['id'],
domain_id=self.domainB['id']
)
# Create and assign roles to the project
self.project = unit.new_project_ref(domain_id=self.domainA['id'])
self.resource_api.create_project(self.project['id'], self.project)
PROVIDERS.resource_api.create_project(self.project['id'], self.project)
self.projectB = unit.new_project_ref(domain_id=self.domainB['id'])
self.resource_api.create_project(self.projectB['id'], self.projectB)
self.assignment_api.create_grant(self.admin_role['id'],
user_id=self.project_admin_user['id'],
project_id=self.project['id'])
self.assignment_api.create_grant(
PROVIDERS.resource_api.create_project(
self.projectB['id'], self.projectB
)
PROVIDERS.assignment_api.create_grant(
self.admin_role['id'], user_id=self.project_admin_user['id'],
project_id=self.project['id']
)
PROVIDERS.assignment_api.create_grant(
self.admin_role['id'], user_id=self.project_adminB_user['id'],
project_id=self.projectB['id'])
self.assignment_api.create_grant(self.role['id'],
user_id=self.just_a_user['id'],
project_id=self.project['id'])
PROVIDERS.assignment_api.create_grant(
self.role['id'], user_id=self.just_a_user['id'],
project_id=self.project['id']
)
self.group1 = unit.new_group_ref(domain_id=self.domainA['id'])
self.group1 = self.identity_api.create_group(self.group1)
self.group1 = PROVIDERS.identity_api.create_group(self.group1)
self.group2 = unit.new_group_ref(domain_id=self.domainA['id'])
self.group2 = self.identity_api.create_group(self.group2)
self.group2 = PROVIDERS.identity_api.create_group(self.group2)
self.group3 = unit.new_group_ref(domain_id=self.domainB['id'])
self.group3 = self.identity_api.create_group(self.group3)
self.group3 = PROVIDERS.identity_api.create_group(self.group3)
def _stati(self, expected_status):
# Return the expected return codes for APIs with and without data
@ -854,7 +878,7 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
list_status_OK=False, expected=None):
status_OK, status_created, status_no_data = self._stati(expected)
a_role = unit.new_role_ref(domain_id=role_domain_id)
self.role_api.create_role(a_role['id'], a_role)
PROVIDERS.role_api.create_role(a_role['id'], a_role)
collection_url = (
'/%(target)s/%(target_id)s/users/%(user_id)s/roles' % {
@ -1397,7 +1421,7 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
def test_project_admin_list_assignments_of_another_project_failed(self):
projectB = unit.new_project_ref(domain_id=self.domainA['id'])
self.resource_api.create_project(projectB['id'], projectB)
PROVIDERS.resource_api.create_project(projectB['id'], projectB)
admin_auth = self.build_authentication_request(
user_id=self.project_admin_user['id'],
password=self.project_admin_user['password'],
@ -1439,10 +1463,11 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
# Add a child project to the standard test data
sub_project = unit.new_project_ref(domain_id=self.domainA['id'],
parent_id=self.project['id'])
self.resource_api.create_project(sub_project['id'], sub_project)
self.assignment_api.create_grant(self.role['id'],
user_id=self.just_a_user['id'],
project_id=sub_project['id'])
PROVIDERS.resource_api.create_project(sub_project['id'], sub_project)
PROVIDERS.assignment_api.create_grant(
self.role['id'], user_id=self.just_a_user['id'],
project_id=sub_project['id']
)
collection_url = self.build_role_assignment_query_url(
project_id=self.project['id'])
@ -1469,11 +1494,12 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
# A neither should a domain admin from a different domain
domainB_admin_user = unit.create_user(
self.identity_api,
PROVIDERS.identity_api,
domain_id=self.domainB['id'])
self.assignment_api.create_grant(self.admin_role['id'],
user_id=domainB_admin_user['id'],
domain_id=self.domainB['id'])
PROVIDERS.assignment_api.create_grant(
self.admin_role['id'], user_id=domainB_admin_user['id'],
domain_id=self.domainB['id']
)
auth = self.build_authentication_request(
user_id=domainB_admin_user['id'],
password=domainB_admin_user['password'],
@ -1535,11 +1561,13 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
def test_list_user_credentials(self):
credential_user = unit.new_credential_ref(self.just_a_user['id'])
self.credential_api.create_credential(credential_user['id'],
credential_user)
PROVIDERS.credential_api.create_credential(
credential_user['id'], credential_user
)
credential_admin = unit.new_credential_ref(self.cloud_admin_user['id'])
self.credential_api.create_credential(credential_admin['id'],
credential_admin)
PROVIDERS.credential_api.create_credential(
credential_admin['id'], credential_admin
)
self.auth = self.build_authentication_request(
user_id=self.just_a_user['id'],
@ -1555,7 +1583,7 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
def test_get_and_delete_ec2_credentials(self):
"""Test getting and deleting ec2 credentials through the ec2 API."""
another_user = unit.create_user(self.identity_api,
another_user = unit.create_user(PROVIDERS.identity_api,
domain_id=self.domainA['id'])
# create a credential for just_a_user
@ -1826,7 +1854,7 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
# Test user can not get project for one they don't have a role in,
# even if they have a role on another project
project2 = unit.new_project_ref(domain_id=self.domainA['id'])
self.resource_api.create_project(project2['id'], project2)
PROVIDERS.resource_api.create_project(project2['id'], project2)
self.get('/projects/%s' % project2['id'], auth=user_auth,
expected_status=exception.ForbiddenAction.code)
@ -1974,14 +2002,19 @@ class IdentityTestImpliedDomainSpecificRoles(IdentityTestv3CloudPolicySample):
self.admin_token = self.get_requested_token(domain_admin_auth)
self.appdev_role = unit.new_role_ref(domain_id=self.domainA['id'])
self.role_api.create_role(self.appdev_role['id'], self.appdev_role)
PROVIDERS.role_api.create_role(
self.appdev_role['id'], self.appdev_role
)
self.appadmin_role = unit.new_role_ref(domain_id=self.domainA['id'])
self.role_api.create_role(self.appadmin_role['id'], self.appadmin_role)
PROVIDERS.role_api.create_role(
self.appadmin_role['id'], self.appadmin_role
)
def _create_implied_role(self):
self.role_api.create_implied_role(self.appadmin_role['id'],
self.appdev_role['id'])
PROVIDERS.role_api.create_implied_role(
self.appadmin_role['id'], self.appdev_role['id']
)
def test_get(self):
# A domain admin should be able to get an existing implied role
@ -2028,10 +2061,10 @@ class IdentityTestImpliedDomainSpecificRoles(IdentityTestv3CloudPolicySample):
def test_forbidden_role_implication_from_different_domain(self):
domain2 = unit.new_domain_ref(domain_id=uuid.uuid4().hex)
self.resource_api.create_domain(domain2['id'], domain2)
PROVIDERS.resource_api.create_domain(domain2['id'], domain2)
role2 = unit.new_role_ref(domain_id=domain2['id'])
implied = self.role_api.create_role(role2['id'], role2)
implied = PROVIDERS.role_api.create_role(role2['id'], role2)
self.put('/roles/%s/implies/%s'
% (self.appdev_role['id'], implied['id']),
@ -2045,10 +2078,10 @@ class IdentityTestImpliedDomainSpecificRoles(IdentityTestv3CloudPolicySample):
project_id=self.admin_project['id'])
domain2 = unit.new_domain_ref(domain_id=uuid.uuid4().hex)
self.resource_api.create_domain(domain2['id'], domain2)
PROVIDERS.resource_api.create_domain(domain2['id'], domain2)
role2 = unit.new_role_ref(domain_id=domain2['id'])
implied = self.role_api.create_role(role2['id'], role2)
implied = PROVIDERS.role_api.create_role(role2['id'], role2)
self.put('/roles/%s/implies/%s'
% (self.appdev_role['id'], implied['id']),

View File

@ -19,12 +19,14 @@ from oslo_utils import timeutils
import six
from six.moves import range
from keystone.common import provider_api
from keystone import exception
from keystone.tests import unit
from keystone.token import provider
NULL_OBJECT = object()
PROVIDERS = provider_api.ProviderAPIs
class TokenTests(object):
@ -37,7 +39,7 @@ class TokenTests(object):
# token persistence service
persistence_list = [
x['id']
for x in self.token_provider_api.list_revoked_tokens()
for x in PROVIDERS.token_provider_api.list_revoked_tokens()
]
self.assertEqual(persistence_list, revoked_token_id_list)
@ -48,8 +50,9 @@ class TokenTests(object):
'user': {'id': 'testuserid'},
'token_data': {'access': {'token': {
'audit_ids': [uuid.uuid4().hex]}}}}
data_ref = self.token_provider_api._persistence.create_token(token_id,
data)
data_ref = PROVIDERS.token_provider_api._persistence.create_token(
token_id, data
)
expires = data_ref.pop('expires')
data_ref.pop('user_id')
self.assertIsInstance(expires, datetime.datetime)
@ -57,7 +60,9 @@ class TokenTests(object):
data.pop('id')
self.assertDictEqual(data, data_ref)
new_data_ref = self.token_provider_api._persistence.get_token(token_id)
new_data_ref = PROVIDERS.token_provider_api._persistence.get_token(
token_id
)
expires = new_data_ref.pop('expires')
self.assertIsInstance(expires, datetime.datetime)
new_data_ref.pop('user_id')
@ -65,13 +70,13 @@ class TokenTests(object):
self.assertEqual(data, new_data_ref)
self.token_provider_api._persistence.delete_token(token_id)
PROVIDERS.token_provider_api._persistence.delete_token(token_id)
self.assertRaises(
exception.TokenNotFound,
self.token_provider_api._persistence.get_token, token_id)
PROVIDERS.token_provider_api._persistence.get_token, token_id)
self.assertRaises(
exception.TokenNotFound,
self.token_provider_api._persistence.delete_token, token_id)
PROVIDERS.token_provider_api._persistence.delete_token, token_id)
def create_token_sample_data(self, token_id=None, tenant_id=None,
trust_id=None, user_id=None, expires=None):
@ -101,12 +106,13 @@ class TokenTests(object):
# Issue token stores a copy of all token data at token['token_data'].
# This emulates that assumption as part of the test.
data['token_data'] = copy.deepcopy(data)
new_token = self.token_provider_api._persistence.create_token(token_id,
data)
new_token = PROVIDERS.token_provider_api._persistence.create_token(
token_id, data
)
return new_token['id'], data
def test_delete_tokens(self):
tokens = self.token_provider_api._persistence._list_tokens(
tokens = PROVIDERS.token_provider_api._persistence._list_tokens(
'testuserid')
self.assertEqual(0, len(tokens))
token_id1, data = self.create_token_sample_data(
@ -116,28 +122,28 @@ class TokenTests(object):
token_id3, data = self.create_token_sample_data(
tenant_id='testtenantid',
user_id='testuserid1')
tokens = self.token_provider_api._persistence._list_tokens(
tokens = PROVIDERS.token_provider_api._persistence._list_tokens(
'testuserid')
self.assertEqual(2, len(tokens))
self.assertIn(token_id2, tokens)
self.assertIn(token_id1, tokens)
self.token_provider_api._persistence.delete_tokens(
PROVIDERS.token_provider_api._persistence.delete_tokens(
user_id='testuserid',
tenant_id='testtenantid')
tokens = self.token_provider_api._persistence._list_tokens(
tokens = PROVIDERS.token_provider_api._persistence._list_tokens(
'testuserid')
self.assertEqual(0, len(tokens))
self.assertRaises(exception.TokenNotFound,
self.token_provider_api._persistence.get_token,
PROVIDERS.token_provider_api._persistence.get_token,
token_id1)
self.assertRaises(exception.TokenNotFound,
self.token_provider_api._persistence.get_token,
PROVIDERS.token_provider_api._persistence.get_token,
token_id2)
self.token_provider_api._persistence.get_token(token_id3)
PROVIDERS.token_provider_api._persistence.get_token(token_id3)
def test_delete_tokens_trust(self):
tokens = self.token_provider_api._persistence._list_tokens(
tokens = PROVIDERS.token_provider_api._persistence._list_tokens(
user_id='testuserid')
self.assertEqual(0, len(tokens))
token_id1, data = self.create_token_sample_data(
@ -147,18 +153,18 @@ class TokenTests(object):
tenant_id='testtenantid',
user_id='testuserid1',
trust_id='testtrustid1')
tokens = self.token_provider_api._persistence._list_tokens(
tokens = PROVIDERS.token_provider_api._persistence._list_tokens(
'testuserid')
self.assertEqual(1, len(tokens))
self.assertIn(token_id1, tokens)
self.token_provider_api._persistence.delete_tokens(
PROVIDERS.token_provider_api._persistence.delete_tokens(
user_id='testuserid',
tenant_id='testtenantid',
trust_id='testtrustid')
self.assertRaises(exception.TokenNotFound,
self.token_provider_api._persistence.get_token,
PROVIDERS.token_provider_api._persistence.get_token,
token_id1)
self.token_provider_api._persistence.get_token(token_id2)
PROVIDERS.token_provider_api._persistence.get_token(token_id2)
def _test_token_list(self, token_list_fn):
tokens = token_list_fn('testuserid')
@ -172,11 +178,11 @@ class TokenTests(object):
self.assertEqual(2, len(tokens))
self.assertIn(token_id2, tokens)
self.assertIn(token_id1, tokens)
self.token_provider_api._persistence.delete_token(token_id1)
PROVIDERS.token_provider_api._persistence.delete_token(token_id1)
tokens = token_list_fn('testuserid')
self.assertIn(token_id2, tokens)
self.assertNotIn(token_id1, tokens)
self.token_provider_api._persistence.delete_token(token_id2)
PROVIDERS.token_provider_api._persistence.delete_token(token_id2)
tokens = token_list_fn('testuserid')
self.assertNotIn(token_id2, tokens)
self.assertNotIn(token_id1, tokens)
@ -204,34 +210,39 @@ class TokenTests(object):
def test_token_list(self):
self._test_token_list(
self.token_provider_api._persistence._list_tokens)
PROVIDERS.token_provider_api._persistence._list_tokens)
def test_token_list_trust(self):
trust_id = uuid.uuid4().hex
token_id5, data = self.create_token_sample_data(trust_id=trust_id)
tokens = self.token_provider_api._persistence._list_tokens(
tokens = PROVIDERS.token_provider_api._persistence._list_tokens(
'testuserid', trust_id=trust_id)
self.assertEqual(1, len(tokens))
self.assertIn(token_id5, tokens)
def test_get_token_returns_not_found(self):
self.assertRaises(exception.TokenNotFound,
self.token_provider_api._persistence.get_token,
PROVIDERS.token_provider_api._persistence.get_token,
uuid.uuid4().hex)
def test_delete_token_returns_not_found(self):
self.assertRaises(exception.TokenNotFound,
self.token_provider_api._persistence.delete_token,
uuid.uuid4().hex)
self.assertRaises(
exception.TokenNotFound,
PROVIDERS.token_provider_api._persistence.delete_token,
uuid.uuid4().hex
)
def test_null_expires_token(self):
token_id = uuid.uuid4().hex
data = {'id': token_id, 'id_hash': token_id, 'a': 'b', 'expires': None,
'user': {'id': 'testuserid'}}
data_ref = self.token_provider_api._persistence.create_token(token_id,
data)
data_ref = PROVIDERS.token_provider_api._persistence.create_token(
token_id, data
)
self.assertIsNotNone(data_ref['expires'])
new_data_ref = self.token_provider_api._persistence.get_token(token_id)
new_data_ref = PROVIDERS.token_provider_api._persistence.get_token(
token_id
)
# MySQL doesn't store microseconds, so discard them before testing
data_ref['expires'] = data_ref['expires'].replace(microsecond=0)
@ -241,7 +252,7 @@ class TokenTests(object):
self.assertEqual(data_ref, new_data_ref)
def check_list_revoked_tokens(self, token_infos):
revocation_list = self.token_provider_api.list_revoked_tokens()
revocation_list = PROVIDERS.token_provider_api.list_revoked_tokens()
revoked_ids = [x['id'] for x in revocation_list]
revoked_audit_ids = [x['audit_id'] for x in revocation_list]
self._assert_revoked_token_list_matches_token_persistence(revoked_ids)
@ -255,22 +266,24 @@ class TokenTests(object):
data = {'id_hash': token_id, 'id': token_id, 'a': 'b',
'user': {'id': 'testuserid'},
'token_data': {'token': {'audit_ids': [audit_id]}}}
data_ref = self.token_provider_api._persistence.create_token(token_id,
data)
self.token_provider_api._persistence.delete_token(token_id)
data_ref = PROVIDERS.token_provider_api._persistence.create_token(
token_id, data
)
PROVIDERS.token_provider_api._persistence.delete_token(token_id)
self.assertRaises(
exception.TokenNotFound,
self.token_provider_api._persistence.get_token,
PROVIDERS.token_provider_api._persistence.get_token,
data_ref['id'])
self.assertRaises(
exception.TokenNotFound,
self.token_provider_api._persistence.delete_token,
PROVIDERS.token_provider_api._persistence.delete_token,
data_ref['id'])
return (token_id, audit_id)
def test_list_revoked_tokens_returns_empty_list(self):
revoked_ids = [x['id']
for x in self.token_provider_api.list_revoked_tokens()]
revoked_ids = [
x['id'] for x in PROVIDERS.token_provider_api.list_revoked_tokens()
]
self._assert_revoked_token_list_matches_token_persistence(revoked_ids)
self.assertEqual([], revoked_ids)
@ -289,8 +302,9 @@ class TokenTests(object):
'expires': expire_time,
'trust_id': None,
'user': {'id': 'testuserid'}}
data_ref = self.token_provider_api._persistence.create_token(token_id,
data)
data_ref = PROVIDERS.token_provider_api._persistence.create_token(
token_id, data
)
data_ref.pop('user_id')
self.assertDictEqual(data, data_ref)
@ -300,13 +314,14 @@ class TokenTests(object):
'expires': expire_time,
'trust_id': None,
'user': {'id': 'testuserid'}}
data_ref = self.token_provider_api._persistence.create_token(token_id,
data)
data_ref = PROVIDERS.token_provider_api._persistence.create_token(
token_id, data
)
data_ref.pop('user_id')
self.assertDictEqual(data, data_ref)
self.token_provider_api._persistence.flush_expired_tokens()
tokens = self.token_provider_api._persistence._list_tokens(
PROVIDERS.token_provider_api._persistence.flush_expired_tokens()
tokens = PROVIDERS.token_provider_api._persistence._list_tokens(
'testuserid')
self.assertEqual(1, len(tokens))
self.assertIn(token_id, tokens)
@ -329,31 +344,41 @@ class TokenTests(object):
'token_data': {'token': {
'audit_ids': [uuid.uuid4().hex]}}}
# Create 2 Tokens.
self.token_provider_api._persistence.create_token(token_id,
token_data)
self.token_provider_api._persistence.create_token(token2_id,
token2_data)
PROVIDERS.token_provider_api._persistence.create_token(
token_id, token_data
)
PROVIDERS.token_provider_api._persistence.create_token(
token2_id, token2_data
)
# Verify the revocation list is empty.
self.assertEqual(
[], self.token_provider_api._persistence.list_revoked_tokens())
self.assertEqual([], self.token_provider_api.list_revoked_tokens())
[], PROVIDERS.token_provider_api._persistence.list_revoked_tokens()
)
self.assertEqual(
[], PROVIDERS.token_provider_api.list_revoked_tokens()
)
# Delete a token directly, bypassing the manager.
self.token_provider_api._persistence.driver.delete_token(token_id)
PROVIDERS.token_provider_api._persistence.driver.delete_token(token_id)
# Verify the revocation list is still empty.
self.assertEqual(
[], self.token_provider_api._persistence.list_revoked_tokens())
self.assertEqual([], self.token_provider_api.list_revoked_tokens())
[], PROVIDERS.token_provider_api._persistence.list_revoked_tokens()
)
self.assertEqual(
[], PROVIDERS.token_provider_api.list_revoked_tokens()
)
# Invalidate the revocation list.
self.token_provider_api._persistence.invalidate_revocation_list()
PROVIDERS.token_provider_api._persistence.invalidate_revocation_list()
# Verify the deleted token is in the revocation list.
revoked_ids = [x['id']
for x in self.token_provider_api.list_revoked_tokens()]
revoked_ids = [
x['id'] for x in PROVIDERS.token_provider_api.list_revoked_tokens()
]
self._assert_revoked_token_list_matches_token_persistence(revoked_ids)
self.assertIn(token_id, revoked_ids)
# Delete the second token, through the manager
self.token_provider_api._persistence.delete_token(token2_id)
revoked_ids = [x['id']
for x in self.token_provider_api.list_revoked_tokens()]
PROVIDERS.token_provider_api._persistence.delete_token(token2_id)
revoked_ids = [
x['id'] for x in PROVIDERS.token_provider_api.list_revoked_tokens()
]
self._assert_revoked_token_list_matches_token_persistence(revoked_ids)
# Verify both tokens are in the revocation list.
self.assertIn(token_id, revoked_ids)
@ -364,10 +389,10 @@ class TokenTests(object):
token = {'user': {'id': uuid.uuid4().hex},
'token_data': {'token': {'audit_ids': [uuid.uuid4().hex]}}}
self.token_provider_api._persistence.create_token(token_id, token)
self.token_provider_api._persistence.delete_token(token_id)
PROVIDERS.token_provider_api._persistence.create_token(token_id, token)
PROVIDERS.token_provider_api._persistence.delete_token(token_id)
revoked_tokens = self.token_provider_api.list_revoked_tokens()
revoked_tokens = PROVIDERS.token_provider_api.list_revoked_tokens()
revoked_ids = [x['id'] for x in revoked_tokens]
self._assert_revoked_token_list_matches_token_persistence(revoked_ids)
self.assertIn(token_id, revoked_ids)
@ -377,12 +402,12 @@ class TokenTests(object):
def test_create_unicode_token_id(self):
token_id = six.text_type(self._create_token_id())
self.create_token_sample_data(token_id=token_id)
self.token_provider_api._persistence.get_token(token_id)
PROVIDERS.token_provider_api._persistence.get_token(token_id)
def test_create_unicode_user_id(self):
user_id = six.text_type(uuid.uuid4().hex)
token_id, data = self.create_token_sample_data(user_id=user_id)
self.token_provider_api._persistence.get_token(token_id)
PROVIDERS.token_provider_api._persistence.get_token(token_id)
class TokenCacheInvalidation(object):
@ -390,7 +415,7 @@ class TokenCacheInvalidation(object):
time = datetime.datetime.utcnow()
with freezegun.freeze_time(time) as frozen_datetime:
# Create an equivalent of a scoped token
token_id, data = self.token_provider_api.issue_token(
token_id, data = PROVIDERS.token_provider_api.issue_token(
self.user_foo['id'],
['password'],
project_id=self.tenant_bar['id']
@ -398,7 +423,7 @@ class TokenCacheInvalidation(object):
self.scoped_token_id = token_id
# ..and an un-scoped one
token_id, data = self.token_provider_api.issue_token(
token_id, data = PROVIDERS.token_provider_api.issue_token(
self.user_foo['id'],
['password']
)
@ -407,28 +432,28 @@ class TokenCacheInvalidation(object):
# Validate them, in the various ways possible - this will load the
# responses into the token cache.
self.token_provider_api.validate_token(self.scoped_token_id)
self.token_provider_api.validate_token(self.unscoped_token_id)
PROVIDERS.token_provider_api.validate_token(self.scoped_token_id)
PROVIDERS.token_provider_api.validate_token(self.unscoped_token_id)
def test_delete_unscoped_token(self):
time = datetime.datetime.utcnow()
with freezegun.freeze_time(time) as frozen_datetime:
self.token_provider_api._persistence.delete_token(
PROVIDERS.token_provider_api._persistence.delete_token(
self.unscoped_token_id)
frozen_datetime.tick(delta=datetime.timedelta(seconds=1))
# Ensure the unscoped token is invalid
self.assertRaises(
exception.TokenNotFound,
self.token_provider_api.validate_token,
PROVIDERS.token_provider_api.validate_token,
self.unscoped_token_id)
# Ensure the scoped token is still valid
self.token_provider_api.validate_token(self.scoped_token_id)
PROVIDERS.token_provider_api.validate_token(self.scoped_token_id)
def test_delete_scoped_token_by_id(self):
time = datetime.datetime.utcnow()
with freezegun.freeze_time(time) as frozen_datetime:
self.token_provider_api._persistence.delete_token(
PROVIDERS.token_provider_api._persistence.delete_token(
self.scoped_token_id
)
frozen_datetime.tick(delta=datetime.timedelta(seconds=1))
@ -436,15 +461,15 @@ class TokenCacheInvalidation(object):
# Ensure the project token is invalid
self.assertRaises(
exception.TokenNotFound,
self.token_provider_api.validate_token,
PROVIDERS.token_provider_api.validate_token,
self.scoped_token_id)
# Ensure the unscoped token is still valid
self.token_provider_api.validate_token(self.unscoped_token_id)
PROVIDERS.token_provider_api.validate_token(self.unscoped_token_id)
def test_delete_scoped_token_by_user(self):
time = datetime.datetime.utcnow()
with freezegun.freeze_time(time) as frozen_datetime:
self.token_provider_api._persistence.delete_tokens(
PROVIDERS.token_provider_api._persistence.delete_tokens(
self.user_foo['id']
)
frozen_datetime.tick(delta=datetime.timedelta(seconds=1))
@ -453,17 +478,17 @@ class TokenCacheInvalidation(object):
# now be invalid.
self.assertRaises(
exception.TokenNotFound,
self.token_provider_api.validate_token,
PROVIDERS.token_provider_api.validate_token,
self.scoped_token_id)
self.assertRaises(
exception.TokenNotFound,
self.token_provider_api.validate_token,
PROVIDERS.token_provider_api.validate_token,
self.unscoped_token_id)
def test_delete_scoped_token_by_user_and_tenant(self):
time = datetime.datetime.utcnow()
with freezegun.freeze_time(time) as frozen_datetime:
self.token_provider_api._persistence.delete_tokens(
PROVIDERS.token_provider_api._persistence.delete_tokens(
self.user_foo['id'],
tenant_id=self.tenant_bar['id'])
frozen_datetime.tick(delta=datetime.timedelta(seconds=1))
@ -471,7 +496,7 @@ class TokenCacheInvalidation(object):
# Ensure the scoped token is invalid
self.assertRaises(
exception.TokenNotFound,
self.token_provider_api.validate_token,
PROVIDERS.token_provider_api.validate_token,
self.scoped_token_id)
# Ensure the unscoped token is still valid
self.token_provider_api.validate_token(self.unscoped_token_id)
PROVIDERS.token_provider_api.validate_token(self.unscoped_token_id)

View File

@ -21,6 +21,7 @@ from oslo_utils import timeutils
import six
from keystone import auth
from keystone.common import provider_api
from keystone.common import token_utils
from keystone.common import utils
import keystone.conf
@ -35,6 +36,7 @@ from keystone.token import token_formatters
CONF = keystone.conf.CONF
PROVIDERS = provider_api.ProviderAPIs
class TestFernetTokenProvider(unit.TestCase):
@ -94,17 +96,18 @@ class TestValidate(unit.TestCase):
# with a simple token.
domain_ref = unit.new_domain_ref()
domain_ref = self.resource_api.create_domain(domain_ref['id'],
domain_ref)
domain_ref = PROVIDERS.resource_api.create_domain(
domain_ref['id'], domain_ref
)
user_ref = unit.new_user_ref(domain_ref['id'])
user_ref = self.identity_api.create_user(user_ref)
user_ref = PROVIDERS.identity_api.create_user(user_ref)
method_names = ['password']
token_id, token_data_ = self.token_provider_api.issue_token(
token_id, token_data_ = PROVIDERS.token_provider_api.issue_token(
user_ref['id'], method_names)
token_data = self.token_provider_api.validate_token(token_id)
token_data = PROVIDERS.token_provider_api.validate_token(token_id)
token = token_data['token']
self.assertIsInstance(token['audit_ids'], list)
self.assertIsInstance(token['expires_at'], str)
@ -126,11 +129,12 @@ class TestValidate(unit.TestCase):
# when the token has federated info.
domain_ref = unit.new_domain_ref()
domain_ref = self.resource_api.create_domain(domain_ref['id'],
domain_ref)
domain_ref = PROVIDERS.resource_api.create_domain(
domain_ref['id'], domain_ref
)
user_ref = unit.new_user_ref(domain_ref['id'])
user_ref = self.identity_api.create_user(user_ref)
user_ref = PROVIDERS.identity_api.create_user(user_ref)
method_names = ['mapped']
@ -145,10 +149,10 @@ class TestValidate(unit.TestCase):
federation_constants.PROTOCOL: protocol,
}
auth_context = auth.core.AuthContext(**auth_context_params)
token_id, token_data_ = self.token_provider_api.issue_token(
token_id, token_data_ = PROVIDERS.token_provider_api.issue_token(
user_ref['id'], method_names, auth_context=auth_context)
token_data = self.token_provider_api.validate_token(token_id)
token_data = PROVIDERS.token_provider_api.validate_token(token_id)
token = token_data['token']
exp_user_info = {
'id': user_ref['id'],
@ -168,27 +172,29 @@ class TestValidate(unit.TestCase):
# when the token has trust info.
domain_ref = unit.new_domain_ref()
domain_ref = self.resource_api.create_domain(domain_ref['id'],
domain_ref)
domain_ref = PROVIDERS.resource_api.create_domain(
domain_ref['id'], domain_ref
)
user_ref = unit.new_user_ref(domain_ref['id'])
user_ref = self.identity_api.create_user(user_ref)
user_ref = PROVIDERS.identity_api.create_user(user_ref)
trustor_user_ref = unit.new_user_ref(domain_ref['id'])
trustor_user_ref = self.identity_api.create_user(trustor_user_ref)
trustor_user_ref = PROVIDERS.identity_api.create_user(trustor_user_ref)
project_ref = unit.new_project_ref(domain_id=domain_ref['id'])
project_ref = self.resource_api.create_project(project_ref['id'],
project_ref)
project_ref = PROVIDERS.resource_api.create_project(
project_ref['id'], project_ref
)
role_ref = unit.new_role_ref()
role_ref = self.role_api.create_role(role_ref['id'], role_ref)
role_ref = PROVIDERS.role_api.create_role(role_ref['id'], role_ref)
self.assignment_api.create_grant(
PROVIDERS.assignment_api.create_grant(
role_ref['id'], user_id=user_ref['id'],
project_id=project_ref['id'])
self.assignment_api.create_grant(
PROVIDERS.assignment_api.create_grant(
role_ref['id'], user_id=trustor_user_ref['id'],
project_id=project_ref['id'])
@ -197,16 +203,17 @@ class TestValidate(unit.TestCase):
trust_ref = unit.new_trust_ref(
trustor_user_id, trustee_user_id, project_id=project_ref['id'],
role_ids=[role_ref['id'], ])
trust_ref = self.trust_api.create_trust(trust_ref['id'], trust_ref,
trust_ref['roles'])
trust_ref = PROVIDERS.trust_api.create_trust(
trust_ref['id'], trust_ref, trust_ref['roles']
)
method_names = ['password']
token_id, token_data_ = self.token_provider_api.issue_token(
token_id, token_data_ = PROVIDERS.token_provider_api.issue_token(
user_ref['id'], method_names, project_id=project_ref['id'],
trust=trust_ref)
token_data = self.token_provider_api.validate_token(token_id)
token_data = PROVIDERS.token_provider_api.validate_token(token_id)
token = token_data['token']
exp_trust_info = {
'id': trust_ref['id'],
@ -221,8 +228,11 @@ class TestValidate(unit.TestCase):
# A uuid string isn't a valid Fernet token.
token_id = uuid.uuid4().hex
self.assertRaises(exception.TokenNotFound,
self.token_provider_api.validate_token, token_id)
self.assertRaises(
exception.TokenNotFound,
PROVIDERS.token_provider_api.validate_token,
token_id
)
class TestTokenFormatter(unit.TestCase):