openstacksdk/openstack/tests/unit/cloud/test_identity_roles.py

309 lines
12 KiB
Python

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import testtools
import openstack.cloud
from openstack.tests.unit import base
from testtools import matchers
RAW_ROLE_ASSIGNMENTS = [
{
"links": {"assignment": "http://example"},
"role": {"id": "123456"},
"scope": {"domain": {"id": "161718"}},
"user": {"id": "313233"}
},
{
"links": {"assignment": "http://example"},
"group": {"id": "101112"},
"role": {"id": "123456"},
"scope": {"project": {"id": "456789"}}
}
]
class TestIdentityRoles(base.RequestsMockTestCase):
def get_mock_url(self, service_type='identity', interface='admin',
resource='roles', append=None, base_url_append='v3',
qs_elements=None):
return super(TestIdentityRoles, self).get_mock_url(
service_type, interface, resource, append, base_url_append,
qs_elements)
def test_list_roles(self):
role_data = self._get_role_data()
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(),
status_code=200,
json={'roles': [role_data.json_response['role']]})
])
self.op_cloud.list_roles()
self.assert_calls()
def test_get_role_by_name(self):
role_data = self._get_role_data()
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(),
status_code=200,
json={'roles': [role_data.json_response['role']]})
])
role = self.op_cloud.get_role(role_data.role_name)
self.assertIsNotNone(role)
self.assertThat(role.id, matchers.Equals(role_data.role_id))
self.assertThat(role.name, matchers.Equals(role_data.role_name))
self.assert_calls()
def test_get_role_by_id(self):
role_data = self._get_role_data()
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(),
status_code=200,
json={'roles': [role_data.json_response['role']]})
])
role = self.op_cloud.get_role(role_data.role_id)
self.assertIsNotNone(role)
self.assertThat(role.id, matchers.Equals(role_data.role_id))
self.assertThat(role.name, matchers.Equals(role_data.role_name))
self.assert_calls()
def test_create_role(self):
role_data = self._get_role_data()
self.register_uris([
dict(method='POST',
uri=self.get_mock_url(),
status_code=200,
json=role_data.json_response,
validate=dict(json=role_data.json_request))
])
role = self.op_cloud.create_role(role_data.role_name)
self.assertIsNotNone(role)
self.assertThat(role.name, matchers.Equals(role_data.role_name))
self.assertThat(role.id, matchers.Equals(role_data.role_id))
self.assert_calls()
def test_update_role(self):
role_data = self._get_role_data()
req = {'role_id': role_data.role_id,
'role': {'name': role_data.role_name}}
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(),
status_code=200,
json={'roles': [role_data.json_response['role']]}),
dict(method='PATCH',
uri=self.get_mock_url(),
status_code=200,
json=role_data.json_response,
validate=dict(json=req))
])
role = self.op_cloud.update_role(role_data.role_id,
role_data.role_name)
self.assertIsNotNone(role)
self.assertThat(role.name, matchers.Equals(role_data.role_name))
self.assertThat(role.id, matchers.Equals(role_data.role_id))
self.assert_calls()
def test_delete_role_by_id(self):
role_data = self._get_role_data()
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(),
status_code=200,
json={'roles': [role_data.json_response['role']]}),
dict(method='DELETE',
uri=self.get_mock_url(append=[role_data.role_id]),
status_code=204)
])
role = self.op_cloud.delete_role(role_data.role_id)
self.assertThat(role, matchers.Equals(True))
self.assert_calls()
def test_delete_role_by_name(self):
role_data = self._get_role_data()
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(),
status_code=200,
json={'roles': [role_data.json_response['role']]}),
dict(method='DELETE',
uri=self.get_mock_url(append=[role_data.role_id]),
status_code=204)
])
role = self.op_cloud.delete_role(role_data.role_name)
self.assertThat(role, matchers.Equals(True))
self.assert_calls()
def test_list_role_assignments(self):
domain_data = self._get_domain_data()
user_data = self._get_user_data(domain_id=domain_data.domain_id)
group_data = self._get_group_data(domain_id=domain_data.domain_id)
project_data = self._get_project_data(domain_id=domain_data.domain_id)
role_data = self._get_role_data()
response = [
{'links': 'https://example.com',
'role': {'id': role_data.role_id},
'scope': {'domain': {'id': domain_data.domain_id}},
'user': {'id': user_data.user_id}},
{'links': 'https://example.com',
'role': {'id': role_data.role_id},
'scope': {'project': {'id': project_data.project_id}},
'group': {'id': group_data.group_id}},
]
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
resource='role_assignments'),
status_code=200,
json={'role_assignments': response},
complete_qs=True)
])
ret = self.op_cloud.list_role_assignments()
self.assertThat(len(ret), matchers.Equals(2))
self.assertThat(ret[0].user, matchers.Equals(user_data.user_id))
self.assertThat(ret[0].id, matchers.Equals(role_data.role_id))
self.assertThat(ret[0].domain, matchers.Equals(domain_data.domain_id))
self.assertThat(ret[1].group, matchers.Equals(group_data.group_id))
self.assertThat(ret[1].id, matchers.Equals(role_data.role_id))
self.assertThat(ret[1].project,
matchers.Equals(project_data.project_id))
def test_list_role_assignments_filters(self):
domain_data = self._get_domain_data()
user_data = self._get_user_data(domain_id=domain_data.domain_id)
role_data = self._get_role_data()
response = [
{'links': 'https://example.com',
'role': {'id': role_data.role_id},
'scope': {'domain': {'id': domain_data.domain_id}},
'user': {'id': user_data.user_id}}
]
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
resource='role_assignments',
qs_elements=['scope.domain.id=%s' % domain_data.domain_id,
'user.id=%s' % user_data.user_id,
'effective=True']),
status_code=200,
json={'role_assignments': response},
complete_qs=True)
])
params = dict(user=user_data.user_id, domain=domain_data.domain_id,
effective=True)
ret = self.op_cloud.list_role_assignments(filters=params)
self.assertThat(len(ret), matchers.Equals(1))
self.assertThat(ret[0].user, matchers.Equals(user_data.user_id))
self.assertThat(ret[0].id, matchers.Equals(role_data.role_id))
self.assertThat(ret[0].domain, matchers.Equals(domain_data.domain_id))
def test_list_role_assignments_exception(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(resource='role_assignments'),
status_code=403)
])
with testtools.ExpectedException(
openstack.cloud.exc.OpenStackCloudHTTPError,
"Failed to list role assignments"
):
self.op_cloud.list_role_assignments()
self.assert_calls()
def test_list_role_assignments_keystone_v2(self):
self.use_keystone_v2()
role_data = self._get_role_data()
user_data = self._get_user_data()
project_data = self._get_project_data(v3=False)
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
resource='tenants',
append=[project_data.project_id,
'users',
user_data.user_id,
'roles'],
base_url_append=None),
status_code=200,
json={'roles': [role_data.json_response['role']]})
])
ret = self.op_cloud.list_role_assignments(
filters={
'user': user_data.user_id,
'project': project_data.project_id})
self.assertThat(len(ret), matchers.Equals(1))
self.assertThat(ret[0].project,
matchers.Equals(project_data.project_id))
self.assertThat(ret[0].id, matchers.Equals(role_data.role_id))
self.assertThat(ret[0].user, matchers.Equals(user_data.user_id))
self.assert_calls()
def test_list_role_assignments_keystone_v2_with_role(self):
self.use_keystone_v2()
roles_data = [self._get_role_data() for r in range(0, 2)]
user_data = self._get_user_data()
project_data = self._get_project_data(v3=False)
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
resource='tenants',
append=[project_data.project_id,
'users',
user_data.user_id,
'roles'],
base_url_append=None),
status_code=200,
json={'roles': [r.json_response['role'] for r in roles_data]})
])
ret = self.op_cloud.list_role_assignments(
filters={
'role': roles_data[0].role_id,
'user': user_data.user_id,
'project': project_data.project_id})
self.assertThat(len(ret), matchers.Equals(1))
self.assertThat(ret[0].project,
matchers.Equals(project_data.project_id))
self.assertThat(ret[0].id, matchers.Equals(roles_data[0].role_id))
self.assertThat(ret[0].user, matchers.Equals(user_data.user_id))
self.assert_calls()
def test_list_role_assignments_exception_v2(self):
self.use_keystone_v2()
with testtools.ExpectedException(
openstack.cloud.OpenStackCloudException,
"Must provide project and user for keystone v2"
):
self.op_cloud.list_role_assignments()
self.assert_calls()
def test_list_role_assignments_exception_v2_no_project(self):
self.use_keystone_v2()
with testtools.ExpectedException(
openstack.cloud.OpenStackCloudException,
"Must provide project and user for keystone v2"
):
self.op_cloud.list_role_assignments(filters={'user': '12345'})
self.assert_calls()