Add tests for domain users for policy association

This commit introduces some tests that show how domain users are
expected to behave with the policy assocation API. A subsequent
patches will -

 - project user test coverage for policy association
 - Removing obsolete policies in policy.v3cloudsample.json file
Partial-Bug: #1805409

Change-Id: If5953e3322c5f65deef0843c1be78ccf0df5b1ce
This commit is contained in:
Vishakha Agarwal 2019-08-26 12:03:52 +05:30
parent 2d185a5a91
commit 2af630f06a
1 changed files with 233 additions and 0 deletions

View File

@ -226,6 +226,204 @@ class _SystemReaderAndMemberPoliciesAssociationTests(object):
)
class _DomainAndProjectUserPolicyAssociationsTests(object):
def test_user_cannot_check_policy_association_for_endpoint(self):
policy = unit.new_policy_ref()
policy = PROVIDERS.policy_api.create_policy(policy['id'], policy)
service = PROVIDERS.catalog_api.create_service(
uuid.uuid4().hex, unit.new_service_ref()
)
endpoint = unit.new_endpoint_ref(service['id'], region_id=None)
endpoint = PROVIDERS.catalog_api.create_endpoint(
endpoint['id'], endpoint
)
PROVIDERS.endpoint_policy_api.create_policy_association(
policy['id'], endpoint['id'])
with self.test_client() as c:
c.get('/v3/policies/%s/OS-ENDPOINT-POLICY/endpoints/%s'
% (policy['id'], endpoint['id']),
headers=self.headers,
expected_status_code=http_client.FORBIDDEN)
def test_user_cannot_check_policy_association_for_service(self):
policy = unit.new_policy_ref()
policy = PROVIDERS.policy_api.create_policy(policy['id'], policy)
service = PROVIDERS.catalog_api.create_service(
uuid.uuid4().hex, unit.new_service_ref()
)
PROVIDERS.endpoint_policy_api.create_policy_association(
policy['id'], service_id=service['id'])
with self.test_client() as c:
c.get('/v3/policies/%s/OS-ENDPOINT-POLICY/services/%s'
% (policy['id'], service['id']),
headers=self.headers,
expected_status_code=http_client.FORBIDDEN)
def test_user_cannot_check_policy_association_for_region_and_service(self):
policy = unit.new_policy_ref()
policy = PROVIDERS.policy_api.create_policy(policy['id'], policy)
service = PROVIDERS.catalog_api.create_service(
uuid.uuid4().hex, unit.new_service_ref()
)
region = PROVIDERS.catalog_api.create_region(unit.new_region_ref())
PROVIDERS.endpoint_policy_api.create_policy_association(
policy['id'], service_id=service['id'], region_id=region['id']
)
with self.test_client() as c:
c.get('/v3/policies/%s/OS-ENDPOINT-POLICY/services/%s/regions/%s'
% (policy['id'], service['id'], region['id']),
headers=self.headers,
expected_status_code=http_client.FORBIDDEN)
def test_user_cannot_get_policy_for_endpoint(self):
policy = unit.new_policy_ref()
policy = PROVIDERS.policy_api.create_policy(policy['id'], policy)
service = PROVIDERS.catalog_api.create_service(
uuid.uuid4().hex, unit.new_service_ref()
)
endpoint = unit.new_endpoint_ref(service['id'], region_id=None)
endpoint = PROVIDERS.catalog_api.create_endpoint(
endpoint['id'], endpoint
)
PROVIDERS.endpoint_policy_api.create_policy_association(
policy['id'], endpoint['id']
)
with self.test_client() as c:
c.get('/v3/endpoints/%s/OS-ENDPOINT-POLICY/policy'
% (endpoint['id']),
headers=self.headers,
expected_status_code=http_client.FORBIDDEN)
def test_user_cannot_list_endpoints_for_policy(self):
policy = unit.new_policy_ref()
policy = PROVIDERS.policy_api.create_policy(policy['id'], policy)
service = PROVIDERS.catalog_api.create_service(
uuid.uuid4().hex, unit.new_service_ref()
)
endpoint = unit.new_endpoint_ref(service['id'], region_id=None)
endpoint = PROVIDERS.catalog_api.create_endpoint(
endpoint['id'], endpoint
)
PROVIDERS.endpoint_policy_api.create_policy_association(
policy['id'], endpoint['id']
)
with self.test_client() as c:
c.get('/v3/policies/%s/OS-ENDPOINT-POLICY/endpoints'
% (policy['id']), headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_create_policy_association_for_endpoint(self):
policy = unit.new_policy_ref()
policy = PROVIDERS.policy_api.create_policy(policy['id'], policy)
service = PROVIDERS.catalog_api.create_service(
uuid.uuid4().hex, unit.new_service_ref()
)
endpoint = unit.new_endpoint_ref(service['id'], region_id=None)
endpoint = PROVIDERS.catalog_api.create_endpoint(
endpoint['id'], endpoint
)
with self.test_client() as c:
c.put(
'/v3/policies/%s/OS-ENDPOINT-POLICY/endpoints/%s'
% (policy['id'], endpoint['id']),
headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_delete_policy_association_for_endpoint(self):
policy = unit.new_policy_ref()
policy = PROVIDERS.policy_api.create_policy(policy['id'], policy)
service = PROVIDERS.catalog_api.create_service(
uuid.uuid4().hex, unit.new_service_ref()
)
endpoint = unit.new_endpoint_ref(service['id'], region_id=None)
endpoint = PROVIDERS.catalog_api.create_endpoint(
endpoint['id'], endpoint
)
with self.test_client() as c:
c.delete(
'/v3/policies/%s/OS-ENDPOINT-POLICY/endpoints/%s'
% (policy['id'], endpoint['id']),
headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_create_policy_association_for_service(self):
policy = unit.new_policy_ref()
policy = PROVIDERS.policy_api.create_policy(policy['id'], policy)
service = PROVIDERS.catalog_api.create_service(
uuid.uuid4().hex, unit.new_service_ref()
)
with self.test_client() as c:
c.put(
'/v3/policies/%s/OS-ENDPOINT-POLICY/services/%s'
% (policy['id'], service['id']),
headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_delete_policy_association_for_service(self):
policy = unit.new_policy_ref()
policy = PROVIDERS.policy_api.create_policy(policy['id'], policy)
service = PROVIDERS.catalog_api.create_service(
uuid.uuid4().hex, unit.new_service_ref()
)
with self.test_client() as c:
c.delete(
'/v3/policies/%s/OS-ENDPOINT-POLICY/services/%s'
% (policy['id'], service['id']),
headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_create_policy_association_for_region_and_service(self):
policy = unit.new_policy_ref()
policy = PROVIDERS.policy_api.create_policy(policy['id'], policy)
service = PROVIDERS.catalog_api.create_service(
uuid.uuid4().hex, unit.new_service_ref()
)
region = PROVIDERS.catalog_api.create_region(unit.new_region_ref())
with self.test_client() as c:
c.put(
'/v3/policies/%s/OS-ENDPOINT-POLICY/services/%s/regions/%s'
% (policy['id'], service['id'], region['id']),
headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_delete_policy_association_for_region_and_service(self):
policy = unit.new_policy_ref()
policy = PROVIDERS.policy_api.create_policy(policy['id'], policy)
service = PROVIDERS.catalog_api.create_service(
uuid.uuid4().hex, unit.new_service_ref()
)
region = PROVIDERS.catalog_api.create_region(unit.new_region_ref())
with self.test_client() as c:
c.delete(
'/v3/policies/%s/OS-ENDPOINT-POLICY/services/%s/regions/%s'
% (policy['id'], service['id'], region['id']),
headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
class SystemReaderTests(base_classes.TestCaseWithBootstrap,
common_auth.AuthTestMixin,
_SystemUserPoliciesAssociationTests,
@ -418,3 +616,38 @@ class SystemAdminTests(base_classes.TestCaseWithBootstrap,
headers=self.headers,
expected_status_code=http_client.NO_CONTENT
)
class DomainUserTests(base_classes.TestCaseWithBootstrap,
common_auth.AuthTestMixin,
_DomainAndProjectUserPolicyAssociationsTests):
def setUp(self):
super(DomainUserTests, self).setUp()
self.loadapp()
self.useFixture(ksfixtures.Policy(self.config_fixture))
self.config_fixture.config(group='oslo_policy', enforce_scope=True)
domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, unit.new_domain_ref()
)
self.domain_id = domain['id']
domain_admin = unit.new_user_ref(domain_id=self.domain_id)
self.user_id = PROVIDERS.identity_api.create_user(domain_admin)['id']
PROVIDERS.assignment_api.create_grant(
self.bootstrapper.admin_role_id, user_id=self.user_id,
domain_id=self.domain_id
)
auth = self.build_authentication_request(
user_id=self.user_id,
password=domain_admin['password'],
domain_id=self.domain_id
)
# Grab a token using the persona we're testing and prepare headers
# for requests we'll be making in the tests.
with self.test_client() as c:
r = c.post('/v3/auth/tokens', json=auth)
self.token_id = r.headers['X-Subject-Token']
self.headers = {'X-Auth-Token': self.token_id}