Implement project personas for metadef objects
This commit updates the policies for metadef objects to use default roles available from keystone. Implements: blueprint secure-rbac Change-Id: If2001ef652e21f54048db0fc67893337681a2b6f
This commit is contained in:
parent
9d9d7abe21
commit
035e714f83
|
@ -85,16 +85,69 @@ metadef_policies = [
|
|||
],
|
||||
),
|
||||
|
||||
policy.RuleDefault(name="get_metadef_object",
|
||||
check_str="rule:metadef_default"),
|
||||
policy.RuleDefault(name="get_metadef_objects",
|
||||
check_str="rule:metadef_default"),
|
||||
policy.RuleDefault(name="modify_metadef_object",
|
||||
check_str="rule:metadef_admin"),
|
||||
policy.RuleDefault(name="add_metadef_object",
|
||||
check_str="rule:metadef_admin"),
|
||||
policy.RuleDefault(name="delete_metadef_object",
|
||||
check_str="rule:metadef_admin"),
|
||||
policy.DocumentedRuleDefault(
|
||||
name="get_metadef_object",
|
||||
check_str=base.ADMIN_OR_PROJECT_READER_GET_NAMESPACE,
|
||||
scope_types=['system', 'project'],
|
||||
description="Get a specific object from a namespace.",
|
||||
operations=[
|
||||
{'path': '/v2/metadefs/namespaces/{namespace_name}/objects'
|
||||
'/{object_name}',
|
||||
'method': 'GET'}
|
||||
],
|
||||
deprecated_rule=policy.DeprecatedRule(
|
||||
name="get_metadef_object", check_str="rule:metadef_default",
|
||||
deprecated_reason=DEPRECATED_REASON,
|
||||
deprecated_since=versionutils.deprecated.XENA
|
||||
),
|
||||
),
|
||||
policy.DocumentedRuleDefault(
|
||||
name="get_metadef_objects",
|
||||
check_str=base.ADMIN_OR_PROJECT_READER_GET_NAMESPACE,
|
||||
scope_types=['system', 'project'],
|
||||
description="Get objects from a namespace.",
|
||||
operations=[
|
||||
{'path': '/v2/metadefs/namespaces/{namespace_name}/objects',
|
||||
'method': 'GET'}
|
||||
],
|
||||
deprecated_rule=policy.DeprecatedRule(
|
||||
name="get_metadef_objects", check_str="rule:metadef_default",
|
||||
deprecated_reason=DEPRECATED_REASON,
|
||||
deprecated_since=versionutils.deprecated.XENA
|
||||
),
|
||||
),
|
||||
policy.DocumentedRuleDefault(
|
||||
name="modify_metadef_object",
|
||||
check_str="rule:metadef_admin",
|
||||
scope_types=['system', 'project'],
|
||||
description="Update an object within a namespace.",
|
||||
operations=[
|
||||
{'path': '/v2/metadefs/namespaces/{namespace_name}/objects'
|
||||
'/{object_name}',
|
||||
'method': 'PUT'}
|
||||
],
|
||||
),
|
||||
policy.DocumentedRuleDefault(
|
||||
name="add_metadef_object",
|
||||
check_str="rule:metadef_admin",
|
||||
scope_types=['system', 'project'],
|
||||
description="Create an object within a namespace.",
|
||||
operations=[
|
||||
{'path': '/v2/metadefs/namespaces/{namespace_name}/objects',
|
||||
'method': 'POST'}
|
||||
],
|
||||
),
|
||||
policy.DocumentedRuleDefault(
|
||||
name="delete_metadef_object",
|
||||
check_str="rule:metadef_admin",
|
||||
scope_types=['system', 'project'],
|
||||
description="Delete an object within a namespace.",
|
||||
operations=[
|
||||
{'path': '/v2/metadefs/namespaces/{namespace_name}/objects'
|
||||
'/{object_name}',
|
||||
'method': 'DELETE'}
|
||||
],
|
||||
),
|
||||
|
||||
policy.RuleDefault(name="list_metadef_resource_types",
|
||||
check_str="rule:metadef_default"),
|
||||
|
|
|
@ -13,15 +13,15 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import uuid
|
||||
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_utils.fixture import uuidsentinel as uuids
|
||||
import requests
|
||||
from six.moves import http_client as http
|
||||
|
||||
from glance.tests import functional
|
||||
|
||||
TENANT1 = str(uuid.uuid4())
|
||||
TENANT1 = uuids.owner1
|
||||
TENANT2 = uuids.owner2
|
||||
|
||||
|
||||
class TestMetadefObjects(functional.FunctionalTest):
|
||||
|
@ -271,3 +271,180 @@ class TestMetadefObjects(functional.FunctionalTest):
|
|||
(namespace_name, metadata_object_name))
|
||||
response = requests.get(path, headers=self._headers())
|
||||
self.assertEqual(http.NOT_FOUND, response.status_code)
|
||||
|
||||
def _create_namespace(self, path, headers, data):
|
||||
response = requests.post(path, headers=headers, json=data)
|
||||
self.assertEqual(http.CREATED, response.status_code)
|
||||
|
||||
return response.json()
|
||||
|
||||
def _create_object(self, namespaces):
|
||||
objects = []
|
||||
for namespace in namespaces:
|
||||
headers = self._headers({'X-Tenant-Id': namespace['owner']})
|
||||
data = {
|
||||
"name": "object_of_%s" % (namespace['namespace']),
|
||||
"description": "object description.",
|
||||
"required": [
|
||||
"property1"
|
||||
],
|
||||
"properties": {
|
||||
"property1": {
|
||||
"type": "integer",
|
||||
"title": "property1",
|
||||
"description": "property1 description",
|
||||
},
|
||||
}
|
||||
}
|
||||
path = self._url('/v2/metadefs/namespaces/%s/objects' %
|
||||
namespace['namespace'])
|
||||
response = requests.post(path, headers=headers, json=data)
|
||||
self.assertEqual(http.CREATED, response.status_code)
|
||||
obj_metadata = response.json()
|
||||
metadef_objects = dict()
|
||||
metadef_objects[namespace['namespace']] = obj_metadata['name']
|
||||
objects.append(metadef_objects)
|
||||
|
||||
return objects
|
||||
|
||||
def _update_object(self, path, headers, data, namespace):
|
||||
response = requests.put(path, headers=headers, json=data)
|
||||
self.assertEqual(http.OK, response.status_code, response.text)
|
||||
|
||||
expected_object = {
|
||||
'description': data['description'],
|
||||
'name': data['name'],
|
||||
'properties': data['properties'],
|
||||
'required': data['required'],
|
||||
'schema': '/v2/schemas/metadefs/object',
|
||||
'self': '/v2/metadefs/namespaces/%s/objects/%s' % (namespace,
|
||||
data['name'])
|
||||
}
|
||||
# Returned metadata_object should reflect the changes
|
||||
metadata_object = response.json()
|
||||
metadata_object.pop('created_at')
|
||||
metadata_object.pop('updated_at')
|
||||
self.assertEqual(metadata_object, expected_object)
|
||||
|
||||
# Updates should persist across requests
|
||||
response = requests.get(path, headers=self._headers())
|
||||
metadata_object = response.json()
|
||||
metadata_object.pop('created_at')
|
||||
metadata_object.pop('updated_at')
|
||||
self.assertEqual(metadata_object, expected_object)
|
||||
|
||||
def test_role_base_metadata_objects_lifecycle(self):
|
||||
# Create public and private namespaces for tenant1 and tenant2
|
||||
path = self._url('/v2/metadefs/namespaces')
|
||||
headers = self._headers({'content-type': 'application/json'})
|
||||
tenant1_namespaces = []
|
||||
tenant2_namespaces = []
|
||||
for tenant in [TENANT1, TENANT2]:
|
||||
headers['X-Tenant-Id'] = tenant
|
||||
for visibility in ['public', 'private']:
|
||||
data = {
|
||||
"namespace": "%s_%s_namespace" % (tenant, visibility),
|
||||
"display_name": "My User Friendly Namespace",
|
||||
"description": "My description",
|
||||
"visibility": visibility,
|
||||
"owner": tenant
|
||||
}
|
||||
namespace = self._create_namespace(path, headers, data)
|
||||
if tenant == TENANT1:
|
||||
tenant1_namespaces.append(namespace)
|
||||
else:
|
||||
tenant2_namespaces.append(namespace)
|
||||
|
||||
# Create a metadef object for each namespace created above
|
||||
tenant1_objects = self._create_object(tenant1_namespaces)
|
||||
tenant2_objects = self._create_object(tenant2_namespaces)
|
||||
|
||||
def _check_object_access(objects, tenant):
|
||||
headers = self._headers({'content-type': 'application/json',
|
||||
'X-Tenant-Id': tenant,
|
||||
'X-Roles': 'reader,member'})
|
||||
for obj in objects:
|
||||
for namespace, object_name in obj.items():
|
||||
path = self._url('/v2/metadefs/namespaces/%s/objects/%s' %
|
||||
(namespace, object_name))
|
||||
headers = headers
|
||||
response = requests.get(path, headers=headers)
|
||||
if namespace.split('_')[1] == 'public':
|
||||
expected = http.OK
|
||||
else:
|
||||
expected = http.NOT_FOUND
|
||||
|
||||
self.assertEqual(expected, response.status_code)
|
||||
|
||||
path = self._url(
|
||||
'/v2/metadefs/namespaces/%s/objects' % namespace)
|
||||
response = requests.get(path, headers=headers)
|
||||
self.assertEqual(expected, response.status_code)
|
||||
if expected == http.OK:
|
||||
resp_objs = response.json()['objects']
|
||||
self.assertEqual(
|
||||
sorted(obj.values()),
|
||||
sorted([x['name'] for x in resp_objs]))
|
||||
|
||||
# Check Tenant 1 can access objects of all public namespace
|
||||
# and cannot access object of private namespace of Tenant 2
|
||||
_check_object_access(tenant2_objects, TENANT1)
|
||||
|
||||
# Check Tenant 2 can access objects of public namespace and
|
||||
# cannot access objects of private namespace of Tenant 1
|
||||
_check_object_access(tenant1_objects, TENANT2)
|
||||
|
||||
# Update objects with admin and non admin role
|
||||
total_objects = tenant1_objects + tenant2_objects
|
||||
for obj in total_objects:
|
||||
for namespace, object_name in obj.items():
|
||||
data = {
|
||||
"name": object_name,
|
||||
"description": "desc-UPDATED",
|
||||
"required": [
|
||||
"property1"
|
||||
],
|
||||
"properties": {
|
||||
'property1': {
|
||||
'type': 'integer',
|
||||
"title": "property1",
|
||||
'description': 'p1 desc-UPDATED',
|
||||
}
|
||||
}
|
||||
}
|
||||
# Update object should fail with non admin role
|
||||
path = self._url('/v2/metadefs/namespaces/%s/objects/%s' %
|
||||
(namespace, object_name))
|
||||
headers['X-Roles'] = "reader,member"
|
||||
response = requests.put(path, headers=headers, json=data)
|
||||
self.assertEqual(http.FORBIDDEN, response.status_code)
|
||||
|
||||
# Should work with admin role
|
||||
headers = self._headers({
|
||||
'X-Tenant-Id': namespace.split('_')[0]})
|
||||
self._update_object(path, headers, data, namespace)
|
||||
|
||||
# Delete object should not be allowed to non admin role
|
||||
for obj in total_objects:
|
||||
for namespace, object_name in obj.items():
|
||||
path = self._url('/v2/metadefs/namespaces/%s/objects/%s' %
|
||||
(namespace, object_name))
|
||||
response = requests.delete(
|
||||
path, headers=self._headers({
|
||||
'X-Roles': 'reader,member',
|
||||
'X-Tenant-Id': namespace.split('_')[0]
|
||||
}))
|
||||
self.assertEqual(http.FORBIDDEN, response.status_code)
|
||||
|
||||
# Delete all metadef objects
|
||||
headers = self._headers()
|
||||
for obj in total_objects:
|
||||
for namespace, object_name in obj.items():
|
||||
path = self._url('/v2/metadefs/namespaces/%s/objects/%s' %
|
||||
(namespace, object_name))
|
||||
response = requests.delete(path, headers=headers)
|
||||
self.assertEqual(http.NO_CONTENT, response.status_code)
|
||||
|
||||
# Deleted objects should not be exist
|
||||
response = requests.get(path, headers=headers)
|
||||
self.assertEqual(http.NOT_FOUND, response.status_code)
|
||||
|
|
Loading…
Reference in New Issue