Remove Future Parameters (write, list, delete) from ACL Validation Schema

Updated the validation schema for ACLs.
  - Removed write, list, and delete operations since they are not yet supported.
  - Added line for no additional properties other than "users" and "creator_only"
  - Added unit tests covering all of these and other conditions.
  - Updated unit tests to limit to "read" ACLs
  - Removed unit tests covering cases with more than one operation

Change-Id: Ie8a3abd22d02bb4a7bbedad373efa7435f3bdb3b
Closes-Bug: #1447872
(cherry picked from commit 1fa86f707e)
This commit is contained in:
Dave McCowan 2015-04-24 15:32:48 -04:00 committed by Douglas Mendizábal
parent 2434a52f04
commit d0a5ac2c80
3 changed files with 149 additions and 380 deletions

View File

@ -636,14 +636,12 @@ class ACLValidator(ValidatorBase):
},
"creator-only": {"type": "boolean"}
},
"additionalProperties": False
}
},
"type": "object",
"properties": {
"read": {"$ref": "#/definitions/acl_defintion"},
"write": {"$ref": "#/definitions/acl_defintion"},
"delete": {"$ref": "#/definitions/acl_defintion"},
"list": {"$ref": "#/definitions/acl_defintion"}
},
"additionalProperties": False
}

View File

@ -31,39 +31,48 @@ class WhenTestingSecretACLsResource(utils.BarbicanAPIBaseTestCase):
resp, acls = create_acls(
self.app, 'secrets', secret_uuid,
read_user_ids=['u1', 'u2'],
list_user_ids=['u1', 'u3', 'u4'])
read_user_ids=['u1', 'u2'])
self.assertEqual(resp.status_int, 201)
self.assertIsNotNone(acls)
self.assertTrue(2, len(acls))
self.assertEqual(1, len(acls))
for acl_ref in resp.json:
self.assertIn('/secrets/{0}/acls'.format(secret_uuid),
acl_ref['acl_ref'])
acl_map = _get_acl_map(secret_uuid, is_secret=True)
# Check creator_only is False when not provided
self.assertFalse(acl_map['read']['creator_only'])
self.assertFalse(acl_map['list']['creator_only'])
def test_create_new_secret_acls_with_creator_only_values(self):
def test_create_new_secret_acls_with_creator_only_true(self):
"""Should allow creating acls for a new secret with creator-only."""
secret_uuid, _ = create_secret(self.app)
resp, acls = create_acls(
self.app, 'secrets', secret_uuid,
read_creator_only=False,
list_user_ids=['u1', 'u3', 'u4'],
list_creator_only=None,
write_creator_only=True)
read_creator_only=True)
self.assertEqual(resp.status_int, 201)
self.assertIsNotNone(acls)
self.assertTrue(3, len(acls))
self.assertEqual(1, len(acls))
for acl_ref in resp.json:
self.assertIn('/secrets/{0}/acls'.format(secret_uuid),
acl_ref['acl_ref'])
acl_map = _get_acl_map(secret_uuid, is_secret=True)
self.assertTrue(acl_map['read']['creator_only'])
def test_create_new_secret_acls_with_creator_only_false(self):
"""Should allow creating acls for a new secret with creator-only."""
secret_uuid, _ = create_secret(self.app)
resp, acls = create_acls(
self.app, 'secrets', secret_uuid,
read_creator_only=False)
self.assertEqual(resp.status_int, 201)
self.assertIsNotNone(acls)
self.assertEqual(1, len(acls))
for acl_ref in resp.json:
self.assertIn('/secrets/{0}/acls'.format(secret_uuid),
acl_ref['acl_ref'])
acl_map = _get_acl_map(secret_uuid, is_secret=True)
self.assertFalse(acl_map['read']['creator_only'])
self.assertFalse(acl_map['list']['creator_only'])
self.assertTrue(acl_map['write']['creator_only'])
def test_new_secret_acls_with_invalid_creator_should_fail(self):
"""Should fail if creator-only flag is provided as string value."""
@ -119,17 +128,14 @@ class WhenTestingSecretACLsResource(utils.BarbicanAPIBaseTestCase):
secret_id, _ = create_secret(self.app)
_, _ = create_acls(
self.app, 'secrets', secret_id,
read_creator_only=False,
list_user_ids=['u1', 'u3', 'u4'],
list_creator_only=None,
write_creator_only=True)
read_creator_only=True)
resp = self.app.get(
'/secrets/{0}/acls'.format(secret_id),
expect_errors=False)
acls = resp.json
self.assertEqual(resp.status_int, 200)
self.assertTrue(3, len(acls))
self.assertEqual(1, len(acls))
for acl_ref in acls:
self.assertIn('/secrets/{0}/acls'.format(secret_id),
acl_ref['acl_ref'])
@ -140,9 +146,7 @@ class WhenTestingSecretACLsResource(utils.BarbicanAPIBaseTestCase):
_, _ = create_acls(
self.app, 'secrets', secret_id,
read_creator_only=False,
list_user_ids=['u1', 'u3', 'u4'],
list_creator_only=None,
write_creator_only=True)
read_user_ids=['u1', 'u3', 'u4'])
resp = self.app.get(
'/secrets/{0}/acls'.format(uuid.uuid4().hex),
@ -164,24 +168,21 @@ class WhenTestingSecretACLsResource(utils.BarbicanAPIBaseTestCase):
_, _ = create_acls(
self.app, 'secrets', secret_uuid,
read_user_ids=['u1', 'u2'],
list_user_ids=['u1', 'u3', 'u4'])
read_user_ids=['u1', 'u2'])
resp, acls = update_acls(
self.app, 'secrets', secret_uuid,
read_user_ids=['u1', 'u2', 'u5'],
list_user_ids=['u1', 'u3', 'u4'])
read_user_ids=['u1', 'u2', 'u5'])
self.assertEqual(resp.status_int, 200)
self.assertIsNotNone(acls)
self.assertTrue(2, len(acls))
self.assertEqual(1, len(acls))
for acl_ref in resp.json:
self.assertIn('/secrets/{0}/acls'.format(secret_uuid),
acl_ref['acl_ref'])
acl_map = _get_acl_map(secret_uuid, is_secret=True)
# Check creator_only is False when not provided
self.assertFalse(acl_map['read']['creator_only'])
self.assertFalse(acl_map['list']['creator_only'])
self.assertIn('u5', acl_map['read'].to_dict_fields()['users'])
def test_update_secret_acls_modify_creator_only_values(self):
@ -191,26 +192,21 @@ class WhenTestingSecretACLsResource(utils.BarbicanAPIBaseTestCase):
_, _ = create_acls(
self.app, 'secrets', secret_uuid,
read_user_ids=['u1', 'u2'],
list_user_ids=['u1', 'u3', 'u4'])
read_creator_only=True)
resp, acls = update_acls(
self.app, 'secrets', secret_uuid,
read_creator_only=False,
list_user_ids=['u1', 'u3'],
list_creator_only=None,
write_creator_only=True)
read_creator_only=False)
self.assertEqual(resp.status_int, 200)
self.assertIsNotNone(acls)
self.assertTrue(3, len(acls))
self.assertEqual(1, len(acls))
for acl_ref in resp.json:
self.assertIn('/secrets/{0}/acls'.format(secret_uuid),
acl_ref['acl_ref'])
acl_map = _get_acl_map(secret_uuid, is_secret=True)
self.assertFalse(acl_map['read']['creator_only'])
self.assertFalse(acl_map['list']['creator_only'])
self.assertTrue(acl_map['write']['creator_only'])
self.assertIn('u3', acl_map['list'].to_dict_fields()['users'])
self.assertNotIn('u4', acl_map['list'].to_dict_fields()['users'])
self.assertIn('u1', acl_map['read'].to_dict_fields()['users'])
self.assertIn('u2', acl_map['read'].to_dict_fields()['users'])
def test_update_secret_acls_partial_modify_read_users_only(self):
"""Acls update where specific operation acl is modified."""
@ -218,8 +214,7 @@ class WhenTestingSecretACLsResource(utils.BarbicanAPIBaseTestCase):
_, _ = create_acls(
self.app, 'secrets', secret_uuid,
read_user_ids=['u1', 'u2'],
list_user_ids=['u1', 'u3', 'u4', 'u4'])
read_user_ids=['u1', 'u2'])
resp, acls = update_acls(
self.app, 'secrets', secret_uuid,
@ -227,15 +222,13 @@ class WhenTestingSecretACLsResource(utils.BarbicanAPIBaseTestCase):
self.assertEqual(resp.status_int, 200)
self.assertIsNotNone(acls)
self.assertTrue(2, len(acls))
self.assertEqual(1, len(acls))
for acl_ref in resp.json:
self.assertIn('/secrets/{0}/acls'.format(secret_uuid),
acl_ref['acl_ref'])
acl_map = _get_acl_map(secret_uuid, is_secret=True)
list_users = acl_map['list'].to_dict_fields()['users']
self.assertEqual(set(['u1', 'u3', 'u4']), set(list_users))
# Check creator_only is False when not provided
self.assertFalse(acl_map['read']['creator_only'])
self.assertIn('u1', acl_map['read'].to_dict_fields()['users'])
self.assertIn('u3', acl_map['read'].to_dict_fields()['users'])
self.assertIn('u5', acl_map['read'].to_dict_fields()['users'])
self.assertNotIn('u2', acl_map['read'].to_dict_fields()['users'])
@ -245,8 +238,7 @@ class WhenTestingSecretACLsResource(utils.BarbicanAPIBaseTestCase):
secret_id, _ = create_secret(self.app)
_, _ = create_acls(
self.app, 'secrets', secret_id,
read_user_ids=['u1', 'u2'],
list_user_ids=['u1', 'u3', 'u4'])
read_user_ids=['u1', 'u2'])
resp, acls = update_acls(
self.app, 'secrets', uuid.uuid4().hex,
@ -271,10 +263,7 @@ class WhenTestingSecretACLsResource(utils.BarbicanAPIBaseTestCase):
secret_id, _ = create_secret(self.app)
_, _ = create_acls(
self.app, 'secrets', secret_id,
read_creator_only=False,
list_user_ids=['u1', 'u3', 'u4'],
list_creator_only=None,
write_creator_only=True)
read_creator_only=True)
resp = self.app.delete(
'/secrets/{0}/acls'.format(secret_id),
@ -290,10 +279,7 @@ class WhenTestingSecretACLsResource(utils.BarbicanAPIBaseTestCase):
secret_id, _ = create_secret(self.app)
_, _ = create_acls(
self.app, 'secrets', secret_id,
read_creator_only=False,
list_user_ids=['u1', 'u3', 'u4'],
list_creator_only=None,
write_creator_only=True)
read_creator_only=False)
resp = self.app.delete(
'/secrets/{0}/acls'.format(uuid.uuid4().hex),
@ -326,9 +312,7 @@ class WhenTestingSecretACLResource(utils.BarbicanAPIBaseTestCase):
_, _ = create_acls(
self.app, 'secrets', secret_id,
read_creator_only=False,
list_user_ids=['u1', 'u3', 'u4'],
list_creator_only=None,
write_creator_only=True)
read_user_ids=['u1', 'u3', 'u4'])
acl_map = _get_acl_map(secret_id, is_secret=True)
resp = self.app.get(
@ -339,15 +323,15 @@ class WhenTestingSecretACLResource(utils.BarbicanAPIBaseTestCase):
self.assertEqual(resp.status_int, 200)
self.assertEqual('read', acl['operation'])
self.assertFalse(acl['creator-only'])
self.assertIsNone(acl.get('users'))
self.assertEqual(set(['u1', 'u3', 'u4']), set(acl['users']))
resp = self.app.get(
'/secrets/{0}/acls/{1}'.format(secret_id,
acl_map['list']['id']),
acl_map['read']['id']),
expect_errors=False)
acl = resp.json
self.assertEqual(resp.status_int, 200)
self.assertEqual('list', acl['operation'])
self.assertEqual('read', acl['operation'])
self.assertFalse(acl['creator-only'])
self.assertEqual(set(['u1', 'u3', 'u4']), set(acl['users']))
@ -357,9 +341,7 @@ class WhenTestingSecretACLResource(utils.BarbicanAPIBaseTestCase):
_, _ = create_acls(
self.app, 'secrets', secret_id,
read_creator_only=False,
list_user_ids=['u1', 'u3', 'u4'],
list_creator_only=None,
write_creator_only=True)
read_user_ids=['u1', 'u3', 'u4'])
resp = self.app.get(
'/secrets/{0}/acls/{1}'.format(secret_id,
uuid.uuid4().hex),
@ -381,8 +363,7 @@ class WhenTestingSecretACLResource(utils.BarbicanAPIBaseTestCase):
_, _ = create_acls(
self.app, 'secrets', secret_id,
read_user_ids=['u1', 'u2'],
list_user_ids=['u1', 'u3', 'u4'])
read_user_ids=['u1', 'u2'])
acl_map = _get_acl_map(secret_id, is_secret=True)
acl_id = acl_map['read']['id']
@ -414,8 +395,7 @@ class WhenTestingSecretACLResource(utils.BarbicanAPIBaseTestCase):
_, _ = create_acls(
self.app, 'secrets', secret_id,
read_user_ids=['u1', 'u2'],
list_user_ids=['u1', 'u3', 'u4'])
read_user_ids=['u1', 'u2'])
acl_map = _get_acl_map(secret_id, is_secret=True)
acl_id = acl_map['read']['id']
@ -438,67 +418,6 @@ class WhenTestingSecretACLResource(utils.BarbicanAPIBaseTestCase):
self.assertEqual('read', acl['operation'])
self.assertEqual(set(['u1', 'u2', 'u5']), set(acl['users']))
def test_update_secret_acl_modify_only_related_operation(self):
"""Only modify the acl for matching operation and ignore others."""
secret_id, _ = create_secret(self.app)
_, _ = create_acls(
self.app, 'secrets', secret_id,
read_user_ids=['u1', 'u2'],
list_user_ids=['u1', 'u3', 'u4'])
acl_map = _get_acl_map(secret_id, is_secret=True)
acl_id = acl_map['read']['id']
# updating read, list operation and adding write operation acl
# Update should be for 'read' operation ACL only. Others are ignored.
resp = update_acl(
self.app, 'secrets', secret_id, acl_id,
read_user_ids=['u1', 'u2', 'u5'], read_creator_only=True,
list_user_ids=['u1', 'u3'], list_creator_only=True,
write_creator_only=True)
self.assertEqual(resp.status_int, 200)
resp = self.app.get(
'/secrets/{0}/acls/{1}'.format(secret_id, acl_id),
expect_errors=False)
acl = resp.json
self.assertIsNotNone(acl)
self.assertIn('/secrets/{0}/acls/{1}'.format(secret_id, acl_id),
acl['acl_ref'])
self.assertTrue(acl['creator-only']) # read operation value is changed
self.assertEqual('read', acl['operation'])
self.assertEqual(set(['u1', 'u2', 'u5']), set(acl['users']))
acl_map = _get_acl_map(secret_id, is_secret=True)
# list, write operation should not be changed
self.assertIsNone(acl_map.get('write'))
self.assertFalse(acl_map['list']['creator_only'])
list_users = acl_map['list'].to_dict_fields()['users']
self.assertEqual(set(['u1', 'u3', 'u4']), set(list_users))
def test_update_secret_acl_modify_different_operation_should_fail(self):
"""Should fail as modifying existing acl's operation is not allowed."""
secret_id, _ = create_secret(self.app)
_, _ = create_acls(
self.app, 'secrets', secret_id,
read_user_ids=['u1', 'u2'],
list_user_ids=['u1', 'u3', 'u4'])
acl_map = _get_acl_map(secret_id, is_secret=True)
acl_id = acl_map['read']['id']
# updating with list ACL should fail as originally read operation
# ACL is associated with acl_id and cannot be modified.
resp = update_acl(
self.app, 'secrets', secret_id, acl_id,
list_user_ids=['u1', 'u3'], list_creator_only=True,
expect_errors=True)
self.assertEqual(resp.status_int, 400)
def test_update_secret_acl_modify_only_users(self):
"""Modifying existing acl's user list and creator-only flag."""
secret_id, _ = create_secret(self.app)
@ -506,8 +425,7 @@ class WhenTestingSecretACLResource(utils.BarbicanAPIBaseTestCase):
_, _ = create_acls(
self.app, 'secrets', secret_id,
read_user_ids=['u1', 'u2'],
read_creator_only=True,
list_user_ids=['u1', 'u3', 'u4'])
read_creator_only=True)
acl_map = _get_acl_map(secret_id, is_secret=True)
acl_id = acl_map['read']['id']
@ -542,44 +460,12 @@ class WhenTestingSecretACLResource(utils.BarbicanAPIBaseTestCase):
self.assertIsNone(acl.get('users'))
self.assertTrue(acl['creator-only'])
def test_update_secret_acl_modify_creator_only(self):
"""Modifying only creator_only flag for existing acl by its id."""
secret_id, _ = create_secret(self.app)
_, _ = create_acls(
self.app, 'secrets', secret_id,
read_user_ids=['u1', 'u2'],
read_creator_only=True,
list_user_ids=['u1', 'u3', 'u4'])
acl_map = _get_acl_map(secret_id, is_secret=True)
acl_id = acl_map['read']['id']
# updating read, list operation and adding write operation acl
# Update should be for 'read' operation ACL only. Others are ignored.
resp = update_acl(
self.app, 'secrets', secret_id, acl_id,
read_creator_only=False)
self.assertEqual(resp.status_int, 200)
resp = self.app.get(
'/secrets/{0}/acls/{1}'.format(secret_id, acl_id),
expect_errors=False)
acl = resp.json
self.assertIsNotNone(acl)
self.assertEqual(set(['u1', 'u2']), set(acl['users']))
self.assertFalse(acl['creator-only'])
def test_update_secret_acl_invalid_acl_should_fail(self):
"""Update should fail when invalid acl id is provided."""
secret_id, _ = create_secret(self.app)
_, _ = create_acls(
self.app, 'secrets', secret_id,
read_creator_only=False,
list_user_ids=['u1', 'u3', 'u4'],
list_creator_only=None,
write_creator_only=True)
read_creator_only=False)
resp = update_acl(
self.app, 'secrets', secret_id, uuid.uuid4().hex,
@ -601,33 +487,27 @@ class WhenTestingSecretACLResource(utils.BarbicanAPIBaseTestCase):
secret_id, _ = create_secret(self.app)
_, _ = create_acls(
self.app, 'secrets', secret_id,
read_creator_only=False,
list_user_ids=['u1', 'u3', 'u4'],
list_creator_only=None,
write_creator_only=True)
read_creator_only=False)
acl_map = _get_acl_map(secret_id, is_secret=True)
list_acl_id = acl_map['list'].id
read_acl_id = acl_map['read'].id
resp = self.app.delete(
'/secrets/{0}/acls/{1}'.format(secret_id, list_acl_id),
'/secrets/{0}/acls/{1}'.format(secret_id, read_acl_id),
expect_errors=False)
content = resp.json
self.assertIsNone(content) # make sure there is no response
self.assertEqual(resp.status_int, 200)
acl_map = _get_acl_map(secret_id, is_secret=True)
self.assertIsNone(acl_map.get('list')) # list acl should be deleted
self.assertIsNone(acl_map.get('read')) # read acl should be deleted
def test_delete_secret_acls_invalid_secret_should_fail(self):
"""Delete acls should fail when invalid secret id is provided."""
secret_id, _ = create_secret(self.app)
_, _ = create_acls(
self.app, 'secrets', secret_id,
read_creator_only=False,
list_user_ids=['u1', 'u3', 'u4'],
list_creator_only=None,
write_creator_only=True)
read_creator_only=False)
resp = self.app.delete(
'/secrets/{0}/acls/{1}'.format(secret_id, uuid.uuid4().hex),
@ -651,39 +531,50 @@ class WhenTestingContainerAclsResource(utils.BarbicanAPIBaseTestCase):
resp, acls = create_acls(
self.app, 'containers', container_id,
read_user_ids=['u1', 'u2'],
list_user_ids=['u1', 'u3', 'u4'])
read_user_ids=['u1', 'u2'])
self.assertEqual(resp.status_int, 201)
self.assertIsNotNone(acls)
self.assertTrue(2, len(acls))
self.assertEqual(1, len(acls))
for acl_ref in resp.json:
self.assertIn('/containers/{0}/acls'.format(container_id),
acl_ref['acl_ref'])
acl_map = _get_acl_map(container_id, is_secret=False)
# Check creator_only is False when not provided
self.assertFalse(acl_map['read']['creator_only'])
self.assertFalse(acl_map['list']['creator_only'])
def test_create_new_container_acls_with_creator_only_values(self):
def test_create_new_container_acls_with_creator_only_false(self):
"""Should allow creating acls for a new container with creator-only."""
container_id, _ = create_container(self.app)
resp, acls = create_acls(
self.app, 'containers', container_id,
read_creator_only=False,
list_user_ids=['u1', 'u3', 'u4'],
list_creator_only=None,
write_creator_only=True)
read_user_ids=['u1', 'u3', 'u4'])
self.assertEqual(resp.status_int, 201)
self.assertIsNotNone(acls)
self.assertTrue(3, len(acls))
self.assertEqual(1, len(acls))
for acl_ref in resp.json:
self.assertIn('/containers/{0}/acls'.format(container_id),
acl_ref['acl_ref'])
acl_map = _get_acl_map(container_id, is_secret=False)
self.assertFalse(acl_map['read']['creator_only'])
self.assertFalse(acl_map['list']['creator_only'])
self.assertTrue(acl_map['write']['creator_only'])
def test_create_new_container_acls_with_creator_only_true(self):
"""Should allow creating acls for a new container with creator-only."""
container_id, _ = create_container(self.app)
resp, acls = create_acls(
self.app, 'containers', container_id,
read_creator_only=True,
read_user_ids=['u1', 'u3', 'u4'])
self.assertEqual(resp.status_int, 201)
self.assertIsNotNone(acls)
self.assertEqual(1, len(acls))
for acl_ref in resp.json:
self.assertIn('/containers/{0}/acls'.format(container_id),
acl_ref['acl_ref'])
acl_map = _get_acl_map(container_id, is_secret=False)
self.assertTrue(acl_map['read']['creator_only'])
def test_new_container_acls_with_invalid_creator_should_fail(self):
"""Should fail if creator-only flag is provided as string value."""
@ -739,17 +630,14 @@ class WhenTestingContainerAclsResource(utils.BarbicanAPIBaseTestCase):
container_id, _ = create_container(self.app)
_, _ = create_acls(
self.app, 'containers', container_id,
read_creator_only=False,
list_user_ids=['u1', 'u3', 'u4'],
list_creator_only=None,
write_creator_only=True)
read_creator_only=False)
resp = self.app.get(
'/containers/{0}/acls'.format(container_id),
expect_errors=False)
acls = resp.json
self.assertEqual(resp.status_int, 200)
self.assertTrue(3, len(acls))
self.assertEqual(1, len(acls))
for acl_ref in acls:
self.assertIn('/containers/{0}/acls'.format(container_id),
acl_ref['acl_ref'])
@ -759,10 +647,7 @@ class WhenTestingContainerAclsResource(utils.BarbicanAPIBaseTestCase):
container_id, _ = create_container(self.app)
_, _ = create_acls(
self.app, 'containers', container_id,
read_creator_only=False,
list_user_ids=['u1', 'u3', 'u4'],
list_creator_only=None,
write_creator_only=True)
read_creator_only=False)
resp = self.app.get(
'/containers/{0}/acls'.format(uuid.uuid4().hex),
@ -784,24 +669,21 @@ class WhenTestingContainerAclsResource(utils.BarbicanAPIBaseTestCase):
_, _ = create_acls(
self.app, 'containers', container_id,
read_user_ids=['u1', 'u2'],
list_user_ids=['u1', 'u3', 'u4'])
read_user_ids=['u1', 'u2'])
resp, acls = update_acls(
self.app, 'containers', container_id,
read_user_ids=['u1', 'u2', 'u5'],
list_user_ids=['u1', 'u3', 'u4'])
read_user_ids=['u1', 'u2', 'u5'])
self.assertEqual(resp.status_int, 200)
self.assertIsNotNone(acls)
self.assertTrue(2, len(acls))
self.assertEqual(1, len(acls))
for acl_ref in resp.json:
self.assertIn('/containers/{0}/acls'.format(container_id),
acl_ref['acl_ref'])
acl_map = _get_acl_map(container_id, is_secret=False)
# Check creator_only is False when not provided
self.assertFalse(acl_map['read']['creator_only'])
self.assertFalse(acl_map['list']['creator_only'])
self.assertIn('u5', acl_map['read'].to_dict_fields()['users'])
def test_update_container_acls_modify_creator_only_values(self):
@ -810,61 +692,26 @@ class WhenTestingContainerAclsResource(utils.BarbicanAPIBaseTestCase):
_, _ = create_acls(
self.app, 'containers', container_id,
read_user_ids=['u1', 'u2'],
list_user_ids=['u1', 'u3', 'u4'])
read_user_ids=['u1', 'u2'])
resp, acls = update_acls(
self.app, 'containers', container_id,
read_creator_only=False,
list_user_ids=['u1', 'u3'],
list_creator_only=None,
write_creator_only=True)
read_creator_only=True)
self.assertEqual(resp.status_int, 200)
self.assertIsNotNone(acls)
self.assertTrue(3, len(acls))
self.assertEqual(1, len(acls))
for acl_ref in resp.json:
self.assertIn('/containers/{0}/acls'.format(container_id),
acl_ref['acl_ref'])
acl_map = _get_acl_map(container_id, is_secret=False)
self.assertFalse(acl_map['read']['creator_only'])
self.assertFalse(acl_map['list']['creator_only'])
self.assertTrue(acl_map['write']['creator_only'])
self.assertIn('u3', acl_map['list'].to_dict_fields()['users'])
self.assertNotIn('u4', acl_map['list'].to_dict_fields()['users'])
def test_update_container_acls_partial_modify_read_users_only(self):
"""Acls update where specific operation acl is modified."""
container_id, _ = create_container(self.app)
_, _ = create_acls(
self.app, 'containers', container_id,
read_user_ids=['u1', 'u2'],
list_user_ids=['u1', 'u3', 'u4'])
resp, acls = update_acls(
self.app, 'containers', container_id,
read_user_ids=['u1', 'u3', 'u5'])
self.assertEqual(resp.status_int, 200)
self.assertIsNotNone(acls)
self.assertTrue(2, len(acls))
for acl_ref in resp.json:
self.assertIn('/containers/{0}/acls'.format(container_id),
acl_ref['acl_ref'])
acl_map = _get_acl_map(container_id, is_secret=False)
# Check creator_only is False when not provided
self.assertFalse(acl_map['read']['creator_only'])
self.assertIn('u3', acl_map['read'].to_dict_fields()['users'])
self.assertIn('u5', acl_map['read'].to_dict_fields()['users'])
self.assertNotIn('u2', acl_map['read'].to_dict_fields()['users'])
self.assertTrue(acl_map['read']['creator_only'])
def test_update_container_acls_invalid_secret_should_fail(self):
"""Acls update should fail when invalid container is provided."""
container_id, _ = create_container(self.app)
_, _ = create_acls(
self.app, 'containers', container_id,
read_user_ids=['u1', 'u2'],
list_user_ids=['u1', 'u3', 'u4'])
read_user_ids=['u1', 'u2'])
resp, acls = update_acls(
self.app, 'containers', uuid.uuid4().hex,
@ -889,10 +736,7 @@ class WhenTestingContainerAclsResource(utils.BarbicanAPIBaseTestCase):
container_id, _ = create_container(self.app)
_, _ = create_acls(
self.app, 'containers', container_id,
read_creator_only=False,
list_user_ids=['u1', 'u3', 'u4'],
list_creator_only=None,
write_creator_only=True)
read_creator_only=False)
resp = self.app.delete(
'/containers/{0}/acls'.format(container_id),
@ -908,10 +752,7 @@ class WhenTestingContainerAclsResource(utils.BarbicanAPIBaseTestCase):
container_id, _ = create_container(self.app)
_, _ = create_acls(
self.app, 'containers', container_id,
read_creator_only=False,
list_user_ids=['u1', 'u3', 'u4'],
list_creator_only=None,
write_creator_only=True)
read_creator_only=False)
resp = self.app.delete(
'/containers/{0}/acls'.format(uuid.uuid4().hex),
@ -942,10 +783,7 @@ class WhenTestingContainerAclResource(utils.BarbicanAPIBaseTestCase):
container_id, _ = create_container(self.app)
_, _ = create_acls(
self.app, 'containers', container_id,
read_creator_only=False,
list_user_ids=['u1', 'u3', 'u4'],
list_creator_only=None,
write_creator_only=True)
read_creator_only=False)
acl_map = _get_acl_map(container_id, is_secret=False)
resp = self.app.get(
@ -958,25 +796,12 @@ class WhenTestingContainerAclResource(utils.BarbicanAPIBaseTestCase):
self.assertFalse(acl['creator-only'])
self.assertIsNone(acl.get('users'))
resp = self.app.get(
'/containers/{0}/acls/{1}'.format(container_id,
acl_map['list']['id']),
expect_errors=False)
acl = resp.json
self.assertEqual(resp.status_int, 200)
self.assertEqual('list', acl['operation'])
self.assertFalse(acl['creator-only'])
self.assertEqual(set(['u1', 'u3', 'u4']), set(acl['users']))
def test_get_container_acl_invalid_acl_should_fail(self):
"""Get acl request should fail with invalid acl id."""
container_id, _ = create_container(self.app)
_, _ = create_acls(
self.app, 'containers', container_id,
read_creator_only=False,
list_user_ids=['u1', 'u3', 'u4'],
list_creator_only=None,
write_creator_only=True)
read_creator_only=False)
resp = self.app.get(
'/containers/{0}/acls/{1}'.format(container_id,
uuid.uuid4().hex),
@ -998,8 +823,7 @@ class WhenTestingContainerAclResource(utils.BarbicanAPIBaseTestCase):
_, _ = create_acls(
self.app, 'containers', container_id,
read_user_ids=['u1', 'u2'],
list_user_ids=['u1', 'u3', 'u4'])
read_user_ids=['u1', 'u2'])
acl_map = _get_acl_map(container_id, is_secret=False)
acl_id = acl_map['read']['id']
@ -1029,8 +853,7 @@ class WhenTestingContainerAclResource(utils.BarbicanAPIBaseTestCase):
_, _ = create_acls(
self.app, 'containers', container_id,
read_user_ids=['u1', 'u2'],
list_user_ids=['u1', 'u3', 'u4'])
read_user_ids=['u1', 'u2'])
acl_map = _get_acl_map(container_id, is_secret=False)
acl_id = acl_map['read']['id']
@ -1054,68 +877,6 @@ class WhenTestingContainerAclResource(utils.BarbicanAPIBaseTestCase):
self.assertEqual('read', acl['operation'])
self.assertEqual(set(['u1', 'u2', 'u5']), set(acl['users']))
def test_update_container_acl_modify_only_related_operation(self):
"""Only modify the acl for matching operation and ignore others."""
container_id, _ = create_container(self.app)
_, _ = create_acls(
self.app, 'containers', container_id,
read_user_ids=['u1', 'u2'],
list_user_ids=['u1', 'u3', 'u4'])
acl_map = _get_acl_map(container_id, is_secret=False)
acl_id = acl_map['read']['id']
# updating read, list operation and adding write operation acl
# Update should be for 'read' operation ACL only. Others are ignored.
resp = update_acl(
self.app, 'containers', container_id, acl_id,
read_user_ids=['u1', 'u2', 'u5'], read_creator_only=True,
list_user_ids=['u1', 'u3'], list_creator_only=True,
write_creator_only=True)
self.assertEqual(resp.status_int, 200)
resp = self.app.get(
'/containers/{0}/acls/{1}'.format(container_id,
acl_id),
expect_errors=False)
acl = resp.json
self.assertIsNotNone(acl)
self.assertIn('/containers/{0}/acls/{1}'.format(container_id, acl_id),
acl['acl_ref'])
self.assertTrue(acl['creator-only']) # read operation value is changed
self.assertEqual('read', acl['operation'])
self.assertEqual(set(['u1', 'u2', 'u5']), set(acl['users']))
acl_map = _get_acl_map(container_id, is_secret=False)
# list, write operation should not be changed
self.assertIsNone(acl_map.get('write'))
self.assertFalse(acl_map['list']['creator_only'])
list_users = acl_map['list'].to_dict_fields()['users']
self.assertEqual(set(['u1', 'u3', 'u4']), set(list_users))
def test_update_container_acl_modify_different_operation_should_fail(self):
"""Should fail as modifying existing acl's operation is not allowed."""
container_id, _ = create_container(self.app)
_, _ = create_acls(
self.app, 'containers', container_id,
read_user_ids=['u1', 'u2'],
list_user_ids=['u1', 'u3', 'u4'])
acl_map = _get_acl_map(container_id, is_secret=False)
acl_id = acl_map['read']['id']
# updating with list ACL should fail as originally read operation
# ACL is associated with acl_id and cannot be modified.
resp = update_acl(
self.app, 'containers', container_id, acl_id,
list_user_ids=['u1', 'u3'], list_creator_only=True,
expect_errors=True)
self.assertEqual(resp.status_int, 400)
def test_update_container_acl_modify_only_users(self):
"""Modifying existing acl's user list and creator-only flag."""
container_id, _ = create_container(self.app)
@ -1123,8 +884,7 @@ class WhenTestingContainerAclResource(utils.BarbicanAPIBaseTestCase):
_, _ = create_acls(
self.app, 'containers', container_id,
read_user_ids=['u1', 'u2'],
read_creator_only=True,
list_user_ids=['u1', 'u3', 'u4'])
read_creator_only=True)
acl_map = _get_acl_map(container_id, is_secret=False)
acl_id = acl_map['read']['id']
@ -1167,8 +927,7 @@ class WhenTestingContainerAclResource(utils.BarbicanAPIBaseTestCase):
_, _ = create_acls(
self.app, 'containers', container_id,
read_user_ids=['u1', 'u2'],
read_creator_only=True,
list_user_ids=['u1', 'u3', 'u4'])
read_creator_only=True)
acl_map = _get_acl_map(container_id, is_secret=False)
acl_id = acl_map['read']['id']
@ -1195,10 +954,7 @@ class WhenTestingContainerAclResource(utils.BarbicanAPIBaseTestCase):
container_id, _ = create_container(self.app)
_, _ = create_acls(
self.app, 'containers', container_id,
read_creator_only=False,
list_user_ids=['u1', 'u3', 'u4'],
list_creator_only=None,
write_creator_only=True)
read_creator_only=False)
resp = update_acl(
self.app, 'containers', container_id, uuid.uuid4().hex,
@ -1220,33 +976,27 @@ class WhenTestingContainerAclResource(utils.BarbicanAPIBaseTestCase):
container_id, _ = create_container(self.app)
_, _ = create_acls(
self.app, 'containers', container_id,
read_creator_only=False,
list_user_ids=['u1', 'u3', 'u4'],
list_creator_only=None,
write_creator_only=True)
read_creator_only=False)
acl_map = _get_acl_map(container_id, is_secret=False)
list_acl_id = acl_map['list'].id
read_acl_id = acl_map['read'].id
resp = self.app.delete(
'/containers/{0}/acls/{1}'.format(container_id, list_acl_id),
'/containers/{0}/acls/{1}'.format(container_id, read_acl_id),
expect_errors=False)
content = resp.json
self.assertIsNone(content) # make sure there is no response
self.assertEqual(resp.status_int, 200)
acl_map = _get_acl_map(container_id, is_secret=False)
self.assertIsNone(acl_map.get('list')) # list acl should be deleted
self.assertIsNone(acl_map.get('read')) # read acl should be deleted
def test_delete_secret_acls_invalid_secret_should_fail(self):
"""Delete acls should fail when invalid secret id is provided."""
container_id, _ = create_container(self.app)
_, _ = create_acls(
self.app, 'containers', container_id,
read_creator_only=False,
list_user_ids=['u1', 'u3', 'u4'],
list_creator_only=None,
write_creator_only=True)
read_creator_only=False)
resp = self.app.delete(
'/containers/{0}/acls/{1}'.format(container_id, uuid.uuid4().hex),
@ -1323,48 +1073,32 @@ def create_container(app):
def create_acls(app, entity_type, entity_id, read_user_ids=None,
write_user_ids=None, list_user_ids=None,
read_creator_only=None, write_creator_only=None,
list_creator_only=None,
read_creator_only=None,
expect_errors=False):
return manage_acls(app, entity_type, entity_id,
read_user_ids=read_user_ids,
write_user_ids=write_user_ids,
list_user_ids=list_user_ids,
read_creator_only=read_creator_only,
write_creator_only=write_creator_only,
list_creator_only=list_creator_only, is_update=False,
is_update=False,
expect_errors=expect_errors)
def update_acls(app, entity_type, entity_id, read_user_ids=None,
write_user_ids=None, list_user_ids=None,
read_creator_only=None, write_creator_only=None,
list_creator_only=None,
read_creator_only=None,
expect_errors=False):
return manage_acls(app, entity_type, entity_id,
read_user_ids=read_user_ids,
write_user_ids=write_user_ids,
list_user_ids=list_user_ids,
read_creator_only=read_creator_only,
write_creator_only=write_creator_only,
list_creator_only=list_creator_only, is_update=True,
is_update=True,
expect_errors=expect_errors)
def manage_acls(app, entity_type, entity_id, read_user_ids=None,
write_user_ids=None, list_user_ids=None,
read_creator_only=None, write_creator_only=None,
list_creator_only=None, is_update=False,
read_creator_only=None, is_update=False,
expect_errors=False):
request = {}
_append_acl_to_request(request, 'read', read_user_ids,
read_creator_only)
_append_acl_to_request(request, 'write', write_user_ids,
write_creator_only)
_append_acl_to_request(request, 'list', list_user_ids,
list_creator_only)
cleaned_request = {key: val for key, val in request.items()
if val is not None}
@ -1390,17 +1124,11 @@ def manage_acls(app, entity_type, entity_id, read_user_ids=None,
def update_acl(app, entity_type, entity_id, acl_id, read_user_ids=None,
write_user_ids=None, list_user_ids=None,
read_creator_only=None, write_creator_only=None,
list_creator_only=None, expect_errors=False):
read_creator_only=None, expect_errors=False):
request = {}
_append_acl_to_request(request, 'read', read_user_ids,
read_creator_only)
_append_acl_to_request(request, 'write', write_user_ids,
write_creator_only)
_append_acl_to_request(request, 'list', list_user_ids,
list_creator_only)
cleaned_request = {key: val for key, val in request.items()
if val is not None}

View File

@ -1299,5 +1299,48 @@ class WhenTestingStoredKeyOrderValidator(utils.BaseTestCase):
self.order_req)
@utils.parameterized_test_case
class WhenTestingAclValidator(utils.BaseTestCase):
def setUp(self):
super(WhenTestingAclValidator, self).setUp()
self.validator = validators.ACLValidator()
@utils.parameterized_dataset({
'one_reader': [{'read': {'users': ['reader'], 'creator-only': False}}],
'two_reader': [{'read': {'users': ['r1', 'r2'],
'creator-only': False}}],
'private': [{'read': {'users': [], 'creator-only': True}}],
'default_users': [{'read': {'creator-only': True}}],
'default_creator': [{'read': {'users': ['reader']}}],
'almost_empty': [{'read': {}}],
'empty': [{}],
})
def test_should_validate(self, acl_req):
self.validator.validate(acl_req)
@utils.parameterized_dataset({
'foo': ['foo'],
'bad_op': [{'bad_op': {'users': ['reader'], 'creator-only': False}}],
'bad_field': [{'read': {'bad_field': ['reader'],
'creator-only': False}}],
'bad_user': [{'read': {'users': [27], 'creator-only': False}}],
'missing_op': [{'creator-only': True}],
})
def test_should_raise(self, acl_req):
self.assertRaises(excep.InvalidObject,
self.validator.validate,
acl_req)
@utils.parameterized_dataset({
'write': [{'write': {'users': ['writer'], 'creator-only': False}}],
'list': [{'list': {'users': ['lister'], 'creator-only': False}}],
'delete': [{'delete': {'users': ['deleter'], 'creator-only': False}}],
})
def test_should_raise_future(self, acl_req):
self.assertRaises(excep.InvalidObject,
self.validator.validate,
acl_req)
if __name__ == '__main__':
unittest.main()