diff --git a/barbican/common/validators.py b/barbican/common/validators.py index 88605047b..0dd1d47ae 100644 --- a/barbican/common/validators.py +++ b/barbican/common/validators.py @@ -636,14 +636,12 @@ class ACLValidator(ValidatorBase): }, "creator-only": {"type": "boolean"} }, + "additionalProperties": False } }, "type": "object", "properties": { "read": {"$ref": "#/definitions/acl_defintion"}, - "write": {"$ref": "#/definitions/acl_defintion"}, - "delete": {"$ref": "#/definitions/acl_defintion"}, - "list": {"$ref": "#/definitions/acl_defintion"} }, "additionalProperties": False } diff --git a/barbican/tests/api/controllers/test_acls.py b/barbican/tests/api/controllers/test_acls.py index de0f00c4a..d93264507 100644 --- a/barbican/tests/api/controllers/test_acls.py +++ b/barbican/tests/api/controllers/test_acls.py @@ -31,39 +31,48 @@ class WhenTestingSecretACLsResource(utils.BarbicanAPIBaseTestCase): resp, acls = create_acls( self.app, 'secrets', secret_uuid, - read_user_ids=['u1', 'u2'], - list_user_ids=['u1', 'u3', 'u4']) + read_user_ids=['u1', 'u2']) self.assertEqual(resp.status_int, 201) self.assertIsNotNone(acls) - self.assertTrue(2, len(acls)) + self.assertEqual(1, len(acls)) for acl_ref in resp.json: self.assertIn('/secrets/{0}/acls'.format(secret_uuid), acl_ref['acl_ref']) acl_map = _get_acl_map(secret_uuid, is_secret=True) # Check creator_only is False when not provided self.assertFalse(acl_map['read']['creator_only']) - self.assertFalse(acl_map['list']['creator_only']) - def test_create_new_secret_acls_with_creator_only_values(self): + def test_create_new_secret_acls_with_creator_only_true(self): """Should allow creating acls for a new secret with creator-only.""" secret_uuid, _ = create_secret(self.app) resp, acls = create_acls( self.app, 'secrets', secret_uuid, - read_creator_only=False, - list_user_ids=['u1', 'u3', 'u4'], - list_creator_only=None, - write_creator_only=True) + read_creator_only=True) self.assertEqual(resp.status_int, 201) self.assertIsNotNone(acls) - self.assertTrue(3, len(acls)) + self.assertEqual(1, len(acls)) + for acl_ref in resp.json: + self.assertIn('/secrets/{0}/acls'.format(secret_uuid), + acl_ref['acl_ref']) + acl_map = _get_acl_map(secret_uuid, is_secret=True) + self.assertTrue(acl_map['read']['creator_only']) + + def test_create_new_secret_acls_with_creator_only_false(self): + """Should allow creating acls for a new secret with creator-only.""" + secret_uuid, _ = create_secret(self.app) + + resp, acls = create_acls( + self.app, 'secrets', secret_uuid, + read_creator_only=False) + self.assertEqual(resp.status_int, 201) + self.assertIsNotNone(acls) + self.assertEqual(1, len(acls)) for acl_ref in resp.json: self.assertIn('/secrets/{0}/acls'.format(secret_uuid), acl_ref['acl_ref']) acl_map = _get_acl_map(secret_uuid, is_secret=True) self.assertFalse(acl_map['read']['creator_only']) - self.assertFalse(acl_map['list']['creator_only']) - self.assertTrue(acl_map['write']['creator_only']) def test_new_secret_acls_with_invalid_creator_should_fail(self): """Should fail if creator-only flag is provided as string value.""" @@ -119,17 +128,14 @@ class WhenTestingSecretACLsResource(utils.BarbicanAPIBaseTestCase): secret_id, _ = create_secret(self.app) _, _ = create_acls( self.app, 'secrets', secret_id, - read_creator_only=False, - list_user_ids=['u1', 'u3', 'u4'], - list_creator_only=None, - write_creator_only=True) + read_creator_only=True) resp = self.app.get( '/secrets/{0}/acls'.format(secret_id), expect_errors=False) acls = resp.json self.assertEqual(resp.status_int, 200) - self.assertTrue(3, len(acls)) + self.assertEqual(1, len(acls)) for acl_ref in acls: self.assertIn('/secrets/{0}/acls'.format(secret_id), acl_ref['acl_ref']) @@ -140,9 +146,7 @@ class WhenTestingSecretACLsResource(utils.BarbicanAPIBaseTestCase): _, _ = create_acls( self.app, 'secrets', secret_id, read_creator_only=False, - list_user_ids=['u1', 'u3', 'u4'], - list_creator_only=None, - write_creator_only=True) + read_user_ids=['u1', 'u3', 'u4']) resp = self.app.get( '/secrets/{0}/acls'.format(uuid.uuid4().hex), @@ -164,24 +168,21 @@ class WhenTestingSecretACLsResource(utils.BarbicanAPIBaseTestCase): _, _ = create_acls( self.app, 'secrets', secret_uuid, - read_user_ids=['u1', 'u2'], - list_user_ids=['u1', 'u3', 'u4']) + read_user_ids=['u1', 'u2']) resp, acls = update_acls( self.app, 'secrets', secret_uuid, - read_user_ids=['u1', 'u2', 'u5'], - list_user_ids=['u1', 'u3', 'u4']) + read_user_ids=['u1', 'u2', 'u5']) self.assertEqual(resp.status_int, 200) self.assertIsNotNone(acls) - self.assertTrue(2, len(acls)) + self.assertEqual(1, len(acls)) for acl_ref in resp.json: self.assertIn('/secrets/{0}/acls'.format(secret_uuid), acl_ref['acl_ref']) acl_map = _get_acl_map(secret_uuid, is_secret=True) # Check creator_only is False when not provided self.assertFalse(acl_map['read']['creator_only']) - self.assertFalse(acl_map['list']['creator_only']) self.assertIn('u5', acl_map['read'].to_dict_fields()['users']) def test_update_secret_acls_modify_creator_only_values(self): @@ -191,26 +192,21 @@ class WhenTestingSecretACLsResource(utils.BarbicanAPIBaseTestCase): _, _ = create_acls( self.app, 'secrets', secret_uuid, read_user_ids=['u1', 'u2'], - list_user_ids=['u1', 'u3', 'u4']) + read_creator_only=True) resp, acls = update_acls( self.app, 'secrets', secret_uuid, - read_creator_only=False, - list_user_ids=['u1', 'u3'], - list_creator_only=None, - write_creator_only=True) + read_creator_only=False) self.assertEqual(resp.status_int, 200) self.assertIsNotNone(acls) - self.assertTrue(3, len(acls)) + self.assertEqual(1, len(acls)) for acl_ref in resp.json: self.assertIn('/secrets/{0}/acls'.format(secret_uuid), acl_ref['acl_ref']) acl_map = _get_acl_map(secret_uuid, is_secret=True) self.assertFalse(acl_map['read']['creator_only']) - self.assertFalse(acl_map['list']['creator_only']) - self.assertTrue(acl_map['write']['creator_only']) - self.assertIn('u3', acl_map['list'].to_dict_fields()['users']) - self.assertNotIn('u4', acl_map['list'].to_dict_fields()['users']) + self.assertIn('u1', acl_map['read'].to_dict_fields()['users']) + self.assertIn('u2', acl_map['read'].to_dict_fields()['users']) def test_update_secret_acls_partial_modify_read_users_only(self): """Acls update where specific operation acl is modified.""" @@ -218,8 +214,7 @@ class WhenTestingSecretACLsResource(utils.BarbicanAPIBaseTestCase): _, _ = create_acls( self.app, 'secrets', secret_uuid, - read_user_ids=['u1', 'u2'], - list_user_ids=['u1', 'u3', 'u4', 'u4']) + read_user_ids=['u1', 'u2']) resp, acls = update_acls( self.app, 'secrets', secret_uuid, @@ -227,15 +222,13 @@ class WhenTestingSecretACLsResource(utils.BarbicanAPIBaseTestCase): self.assertEqual(resp.status_int, 200) self.assertIsNotNone(acls) - self.assertTrue(2, len(acls)) + self.assertEqual(1, len(acls)) for acl_ref in resp.json: self.assertIn('/secrets/{0}/acls'.format(secret_uuid), acl_ref['acl_ref']) acl_map = _get_acl_map(secret_uuid, is_secret=True) - list_users = acl_map['list'].to_dict_fields()['users'] - self.assertEqual(set(['u1', 'u3', 'u4']), set(list_users)) - # Check creator_only is False when not provided self.assertFalse(acl_map['read']['creator_only']) + self.assertIn('u1', acl_map['read'].to_dict_fields()['users']) self.assertIn('u3', acl_map['read'].to_dict_fields()['users']) self.assertIn('u5', acl_map['read'].to_dict_fields()['users']) self.assertNotIn('u2', acl_map['read'].to_dict_fields()['users']) @@ -245,8 +238,7 @@ class WhenTestingSecretACLsResource(utils.BarbicanAPIBaseTestCase): secret_id, _ = create_secret(self.app) _, _ = create_acls( self.app, 'secrets', secret_id, - read_user_ids=['u1', 'u2'], - list_user_ids=['u1', 'u3', 'u4']) + read_user_ids=['u1', 'u2']) resp, acls = update_acls( self.app, 'secrets', uuid.uuid4().hex, @@ -271,10 +263,7 @@ class WhenTestingSecretACLsResource(utils.BarbicanAPIBaseTestCase): secret_id, _ = create_secret(self.app) _, _ = create_acls( self.app, 'secrets', secret_id, - read_creator_only=False, - list_user_ids=['u1', 'u3', 'u4'], - list_creator_only=None, - write_creator_only=True) + read_creator_only=True) resp = self.app.delete( '/secrets/{0}/acls'.format(secret_id), @@ -290,10 +279,7 @@ class WhenTestingSecretACLsResource(utils.BarbicanAPIBaseTestCase): secret_id, _ = create_secret(self.app) _, _ = create_acls( self.app, 'secrets', secret_id, - read_creator_only=False, - list_user_ids=['u1', 'u3', 'u4'], - list_creator_only=None, - write_creator_only=True) + read_creator_only=False) resp = self.app.delete( '/secrets/{0}/acls'.format(uuid.uuid4().hex), @@ -326,9 +312,7 @@ class WhenTestingSecretACLResource(utils.BarbicanAPIBaseTestCase): _, _ = create_acls( self.app, 'secrets', secret_id, read_creator_only=False, - list_user_ids=['u1', 'u3', 'u4'], - list_creator_only=None, - write_creator_only=True) + read_user_ids=['u1', 'u3', 'u4']) acl_map = _get_acl_map(secret_id, is_secret=True) resp = self.app.get( @@ -339,15 +323,15 @@ class WhenTestingSecretACLResource(utils.BarbicanAPIBaseTestCase): self.assertEqual(resp.status_int, 200) self.assertEqual('read', acl['operation']) self.assertFalse(acl['creator-only']) - self.assertIsNone(acl.get('users')) + self.assertEqual(set(['u1', 'u3', 'u4']), set(acl['users'])) resp = self.app.get( '/secrets/{0}/acls/{1}'.format(secret_id, - acl_map['list']['id']), + acl_map['read']['id']), expect_errors=False) acl = resp.json self.assertEqual(resp.status_int, 200) - self.assertEqual('list', acl['operation']) + self.assertEqual('read', acl['operation']) self.assertFalse(acl['creator-only']) self.assertEqual(set(['u1', 'u3', 'u4']), set(acl['users'])) @@ -357,9 +341,7 @@ class WhenTestingSecretACLResource(utils.BarbicanAPIBaseTestCase): _, _ = create_acls( self.app, 'secrets', secret_id, read_creator_only=False, - list_user_ids=['u1', 'u3', 'u4'], - list_creator_only=None, - write_creator_only=True) + read_user_ids=['u1', 'u3', 'u4']) resp = self.app.get( '/secrets/{0}/acls/{1}'.format(secret_id, uuid.uuid4().hex), @@ -381,8 +363,7 @@ class WhenTestingSecretACLResource(utils.BarbicanAPIBaseTestCase): _, _ = create_acls( self.app, 'secrets', secret_id, - read_user_ids=['u1', 'u2'], - list_user_ids=['u1', 'u3', 'u4']) + read_user_ids=['u1', 'u2']) acl_map = _get_acl_map(secret_id, is_secret=True) acl_id = acl_map['read']['id'] @@ -414,8 +395,7 @@ class WhenTestingSecretACLResource(utils.BarbicanAPIBaseTestCase): _, _ = create_acls( self.app, 'secrets', secret_id, - read_user_ids=['u1', 'u2'], - list_user_ids=['u1', 'u3', 'u4']) + read_user_ids=['u1', 'u2']) acl_map = _get_acl_map(secret_id, is_secret=True) acl_id = acl_map['read']['id'] @@ -438,67 +418,6 @@ class WhenTestingSecretACLResource(utils.BarbicanAPIBaseTestCase): self.assertEqual('read', acl['operation']) self.assertEqual(set(['u1', 'u2', 'u5']), set(acl['users'])) - def test_update_secret_acl_modify_only_related_operation(self): - """Only modify the acl for matching operation and ignore others.""" - secret_id, _ = create_secret(self.app) - - _, _ = create_acls( - self.app, 'secrets', secret_id, - read_user_ids=['u1', 'u2'], - list_user_ids=['u1', 'u3', 'u4']) - - acl_map = _get_acl_map(secret_id, is_secret=True) - acl_id = acl_map['read']['id'] - - # updating read, list operation and adding write operation acl - # Update should be for 'read' operation ACL only. Others are ignored. - resp = update_acl( - self.app, 'secrets', secret_id, acl_id, - read_user_ids=['u1', 'u2', 'u5'], read_creator_only=True, - list_user_ids=['u1', 'u3'], list_creator_only=True, - write_creator_only=True) - - self.assertEqual(resp.status_int, 200) - - resp = self.app.get( - '/secrets/{0}/acls/{1}'.format(secret_id, acl_id), - expect_errors=False) - acl = resp.json - self.assertIsNotNone(acl) - self.assertIn('/secrets/{0}/acls/{1}'.format(secret_id, acl_id), - acl['acl_ref']) - self.assertTrue(acl['creator-only']) # read operation value is changed - self.assertEqual('read', acl['operation']) - self.assertEqual(set(['u1', 'u2', 'u5']), set(acl['users'])) - - acl_map = _get_acl_map(secret_id, is_secret=True) - - # list, write operation should not be changed - self.assertIsNone(acl_map.get('write')) - self.assertFalse(acl_map['list']['creator_only']) - list_users = acl_map['list'].to_dict_fields()['users'] - self.assertEqual(set(['u1', 'u3', 'u4']), set(list_users)) - - def test_update_secret_acl_modify_different_operation_should_fail(self): - """Should fail as modifying existing acl's operation is not allowed.""" - secret_id, _ = create_secret(self.app) - - _, _ = create_acls( - self.app, 'secrets', secret_id, - read_user_ids=['u1', 'u2'], - list_user_ids=['u1', 'u3', 'u4']) - - acl_map = _get_acl_map(secret_id, is_secret=True) - acl_id = acl_map['read']['id'] - - # updating with list ACL should fail as originally read operation - # ACL is associated with acl_id and cannot be modified. - resp = update_acl( - self.app, 'secrets', secret_id, acl_id, - list_user_ids=['u1', 'u3'], list_creator_only=True, - expect_errors=True) - self.assertEqual(resp.status_int, 400) - def test_update_secret_acl_modify_only_users(self): """Modifying existing acl's user list and creator-only flag.""" secret_id, _ = create_secret(self.app) @@ -506,8 +425,7 @@ class WhenTestingSecretACLResource(utils.BarbicanAPIBaseTestCase): _, _ = create_acls( self.app, 'secrets', secret_id, read_user_ids=['u1', 'u2'], - read_creator_only=True, - list_user_ids=['u1', 'u3', 'u4']) + read_creator_only=True) acl_map = _get_acl_map(secret_id, is_secret=True) acl_id = acl_map['read']['id'] @@ -542,44 +460,12 @@ class WhenTestingSecretACLResource(utils.BarbicanAPIBaseTestCase): self.assertIsNone(acl.get('users')) self.assertTrue(acl['creator-only']) - def test_update_secret_acl_modify_creator_only(self): - """Modifying only creator_only flag for existing acl by its id.""" - secret_id, _ = create_secret(self.app) - - _, _ = create_acls( - self.app, 'secrets', secret_id, - read_user_ids=['u1', 'u2'], - read_creator_only=True, - list_user_ids=['u1', 'u3', 'u4']) - - acl_map = _get_acl_map(secret_id, is_secret=True) - acl_id = acl_map['read']['id'] - - # updating read, list operation and adding write operation acl - # Update should be for 'read' operation ACL only. Others are ignored. - resp = update_acl( - self.app, 'secrets', secret_id, acl_id, - read_creator_only=False) - - self.assertEqual(resp.status_int, 200) - - resp = self.app.get( - '/secrets/{0}/acls/{1}'.format(secret_id, acl_id), - expect_errors=False) - acl = resp.json - self.assertIsNotNone(acl) - self.assertEqual(set(['u1', 'u2']), set(acl['users'])) - self.assertFalse(acl['creator-only']) - def test_update_secret_acl_invalid_acl_should_fail(self): """Update should fail when invalid acl id is provided.""" secret_id, _ = create_secret(self.app) _, _ = create_acls( self.app, 'secrets', secret_id, - read_creator_only=False, - list_user_ids=['u1', 'u3', 'u4'], - list_creator_only=None, - write_creator_only=True) + read_creator_only=False) resp = update_acl( self.app, 'secrets', secret_id, uuid.uuid4().hex, @@ -601,33 +487,27 @@ class WhenTestingSecretACLResource(utils.BarbicanAPIBaseTestCase): secret_id, _ = create_secret(self.app) _, _ = create_acls( self.app, 'secrets', secret_id, - read_creator_only=False, - list_user_ids=['u1', 'u3', 'u4'], - list_creator_only=None, - write_creator_only=True) + read_creator_only=False) acl_map = _get_acl_map(secret_id, is_secret=True) - list_acl_id = acl_map['list'].id + read_acl_id = acl_map['read'].id resp = self.app.delete( - '/secrets/{0}/acls/{1}'.format(secret_id, list_acl_id), + '/secrets/{0}/acls/{1}'.format(secret_id, read_acl_id), expect_errors=False) content = resp.json self.assertIsNone(content) # make sure there is no response self.assertEqual(resp.status_int, 200) acl_map = _get_acl_map(secret_id, is_secret=True) - self.assertIsNone(acl_map.get('list')) # list acl should be deleted + self.assertIsNone(acl_map.get('read')) # read acl should be deleted def test_delete_secret_acls_invalid_secret_should_fail(self): """Delete acls should fail when invalid secret id is provided.""" secret_id, _ = create_secret(self.app) _, _ = create_acls( self.app, 'secrets', secret_id, - read_creator_only=False, - list_user_ids=['u1', 'u3', 'u4'], - list_creator_only=None, - write_creator_only=True) + read_creator_only=False) resp = self.app.delete( '/secrets/{0}/acls/{1}'.format(secret_id, uuid.uuid4().hex), @@ -651,39 +531,50 @@ class WhenTestingContainerAclsResource(utils.BarbicanAPIBaseTestCase): resp, acls = create_acls( self.app, 'containers', container_id, - read_user_ids=['u1', 'u2'], - list_user_ids=['u1', 'u3', 'u4']) + read_user_ids=['u1', 'u2']) self.assertEqual(resp.status_int, 201) self.assertIsNotNone(acls) - self.assertTrue(2, len(acls)) + self.assertEqual(1, len(acls)) for acl_ref in resp.json: self.assertIn('/containers/{0}/acls'.format(container_id), acl_ref['acl_ref']) acl_map = _get_acl_map(container_id, is_secret=False) # Check creator_only is False when not provided self.assertFalse(acl_map['read']['creator_only']) - self.assertFalse(acl_map['list']['creator_only']) - def test_create_new_container_acls_with_creator_only_values(self): + def test_create_new_container_acls_with_creator_only_false(self): """Should allow creating acls for a new container with creator-only.""" container_id, _ = create_container(self.app) resp, acls = create_acls( self.app, 'containers', container_id, read_creator_only=False, - list_user_ids=['u1', 'u3', 'u4'], - list_creator_only=None, - write_creator_only=True) + read_user_ids=['u1', 'u3', 'u4']) self.assertEqual(resp.status_int, 201) self.assertIsNotNone(acls) - self.assertTrue(3, len(acls)) + self.assertEqual(1, len(acls)) for acl_ref in resp.json: self.assertIn('/containers/{0}/acls'.format(container_id), acl_ref['acl_ref']) acl_map = _get_acl_map(container_id, is_secret=False) self.assertFalse(acl_map['read']['creator_only']) - self.assertFalse(acl_map['list']['creator_only']) - self.assertTrue(acl_map['write']['creator_only']) + + def test_create_new_container_acls_with_creator_only_true(self): + """Should allow creating acls for a new container with creator-only.""" + container_id, _ = create_container(self.app) + + resp, acls = create_acls( + self.app, 'containers', container_id, + read_creator_only=True, + read_user_ids=['u1', 'u3', 'u4']) + self.assertEqual(resp.status_int, 201) + self.assertIsNotNone(acls) + self.assertEqual(1, len(acls)) + for acl_ref in resp.json: + self.assertIn('/containers/{0}/acls'.format(container_id), + acl_ref['acl_ref']) + acl_map = _get_acl_map(container_id, is_secret=False) + self.assertTrue(acl_map['read']['creator_only']) def test_new_container_acls_with_invalid_creator_should_fail(self): """Should fail if creator-only flag is provided as string value.""" @@ -739,17 +630,14 @@ class WhenTestingContainerAclsResource(utils.BarbicanAPIBaseTestCase): container_id, _ = create_container(self.app) _, _ = create_acls( self.app, 'containers', container_id, - read_creator_only=False, - list_user_ids=['u1', 'u3', 'u4'], - list_creator_only=None, - write_creator_only=True) + read_creator_only=False) resp = self.app.get( '/containers/{0}/acls'.format(container_id), expect_errors=False) acls = resp.json self.assertEqual(resp.status_int, 200) - self.assertTrue(3, len(acls)) + self.assertEqual(1, len(acls)) for acl_ref in acls: self.assertIn('/containers/{0}/acls'.format(container_id), acl_ref['acl_ref']) @@ -759,10 +647,7 @@ class WhenTestingContainerAclsResource(utils.BarbicanAPIBaseTestCase): container_id, _ = create_container(self.app) _, _ = create_acls( self.app, 'containers', container_id, - read_creator_only=False, - list_user_ids=['u1', 'u3', 'u4'], - list_creator_only=None, - write_creator_only=True) + read_creator_only=False) resp = self.app.get( '/containers/{0}/acls'.format(uuid.uuid4().hex), @@ -784,24 +669,21 @@ class WhenTestingContainerAclsResource(utils.BarbicanAPIBaseTestCase): _, _ = create_acls( self.app, 'containers', container_id, - read_user_ids=['u1', 'u2'], - list_user_ids=['u1', 'u3', 'u4']) + read_user_ids=['u1', 'u2']) resp, acls = update_acls( self.app, 'containers', container_id, - read_user_ids=['u1', 'u2', 'u5'], - list_user_ids=['u1', 'u3', 'u4']) + read_user_ids=['u1', 'u2', 'u5']) self.assertEqual(resp.status_int, 200) self.assertIsNotNone(acls) - self.assertTrue(2, len(acls)) + self.assertEqual(1, len(acls)) for acl_ref in resp.json: self.assertIn('/containers/{0}/acls'.format(container_id), acl_ref['acl_ref']) acl_map = _get_acl_map(container_id, is_secret=False) # Check creator_only is False when not provided self.assertFalse(acl_map['read']['creator_only']) - self.assertFalse(acl_map['list']['creator_only']) self.assertIn('u5', acl_map['read'].to_dict_fields()['users']) def test_update_container_acls_modify_creator_only_values(self): @@ -810,61 +692,26 @@ class WhenTestingContainerAclsResource(utils.BarbicanAPIBaseTestCase): _, _ = create_acls( self.app, 'containers', container_id, - read_user_ids=['u1', 'u2'], - list_user_ids=['u1', 'u3', 'u4']) + read_user_ids=['u1', 'u2']) resp, acls = update_acls( self.app, 'containers', container_id, - read_creator_only=False, - list_user_ids=['u1', 'u3'], - list_creator_only=None, - write_creator_only=True) + read_creator_only=True) self.assertEqual(resp.status_int, 200) self.assertIsNotNone(acls) - self.assertTrue(3, len(acls)) + self.assertEqual(1, len(acls)) for acl_ref in resp.json: self.assertIn('/containers/{0}/acls'.format(container_id), acl_ref['acl_ref']) acl_map = _get_acl_map(container_id, is_secret=False) - self.assertFalse(acl_map['read']['creator_only']) - self.assertFalse(acl_map['list']['creator_only']) - self.assertTrue(acl_map['write']['creator_only']) - self.assertIn('u3', acl_map['list'].to_dict_fields()['users']) - self.assertNotIn('u4', acl_map['list'].to_dict_fields()['users']) - - def test_update_container_acls_partial_modify_read_users_only(self): - """Acls update where specific operation acl is modified.""" - container_id, _ = create_container(self.app) - - _, _ = create_acls( - self.app, 'containers', container_id, - read_user_ids=['u1', 'u2'], - list_user_ids=['u1', 'u3', 'u4']) - - resp, acls = update_acls( - self.app, 'containers', container_id, - read_user_ids=['u1', 'u3', 'u5']) - - self.assertEqual(resp.status_int, 200) - self.assertIsNotNone(acls) - self.assertTrue(2, len(acls)) - for acl_ref in resp.json: - self.assertIn('/containers/{0}/acls'.format(container_id), - acl_ref['acl_ref']) - acl_map = _get_acl_map(container_id, is_secret=False) - # Check creator_only is False when not provided - self.assertFalse(acl_map['read']['creator_only']) - self.assertIn('u3', acl_map['read'].to_dict_fields()['users']) - self.assertIn('u5', acl_map['read'].to_dict_fields()['users']) - self.assertNotIn('u2', acl_map['read'].to_dict_fields()['users']) + self.assertTrue(acl_map['read']['creator_only']) def test_update_container_acls_invalid_secret_should_fail(self): """Acls update should fail when invalid container is provided.""" container_id, _ = create_container(self.app) _, _ = create_acls( self.app, 'containers', container_id, - read_user_ids=['u1', 'u2'], - list_user_ids=['u1', 'u3', 'u4']) + read_user_ids=['u1', 'u2']) resp, acls = update_acls( self.app, 'containers', uuid.uuid4().hex, @@ -889,10 +736,7 @@ class WhenTestingContainerAclsResource(utils.BarbicanAPIBaseTestCase): container_id, _ = create_container(self.app) _, _ = create_acls( self.app, 'containers', container_id, - read_creator_only=False, - list_user_ids=['u1', 'u3', 'u4'], - list_creator_only=None, - write_creator_only=True) + read_creator_only=False) resp = self.app.delete( '/containers/{0}/acls'.format(container_id), @@ -908,10 +752,7 @@ class WhenTestingContainerAclsResource(utils.BarbicanAPIBaseTestCase): container_id, _ = create_container(self.app) _, _ = create_acls( self.app, 'containers', container_id, - read_creator_only=False, - list_user_ids=['u1', 'u3', 'u4'], - list_creator_only=None, - write_creator_only=True) + read_creator_only=False) resp = self.app.delete( '/containers/{0}/acls'.format(uuid.uuid4().hex), @@ -942,10 +783,7 @@ class WhenTestingContainerAclResource(utils.BarbicanAPIBaseTestCase): container_id, _ = create_container(self.app) _, _ = create_acls( self.app, 'containers', container_id, - read_creator_only=False, - list_user_ids=['u1', 'u3', 'u4'], - list_creator_only=None, - write_creator_only=True) + read_creator_only=False) acl_map = _get_acl_map(container_id, is_secret=False) resp = self.app.get( @@ -958,25 +796,12 @@ class WhenTestingContainerAclResource(utils.BarbicanAPIBaseTestCase): self.assertFalse(acl['creator-only']) self.assertIsNone(acl.get('users')) - resp = self.app.get( - '/containers/{0}/acls/{1}'.format(container_id, - acl_map['list']['id']), - expect_errors=False) - acl = resp.json - self.assertEqual(resp.status_int, 200) - self.assertEqual('list', acl['operation']) - self.assertFalse(acl['creator-only']) - self.assertEqual(set(['u1', 'u3', 'u4']), set(acl['users'])) - def test_get_container_acl_invalid_acl_should_fail(self): """Get acl request should fail with invalid acl id.""" container_id, _ = create_container(self.app) _, _ = create_acls( self.app, 'containers', container_id, - read_creator_only=False, - list_user_ids=['u1', 'u3', 'u4'], - list_creator_only=None, - write_creator_only=True) + read_creator_only=False) resp = self.app.get( '/containers/{0}/acls/{1}'.format(container_id, uuid.uuid4().hex), @@ -998,8 +823,7 @@ class WhenTestingContainerAclResource(utils.BarbicanAPIBaseTestCase): _, _ = create_acls( self.app, 'containers', container_id, - read_user_ids=['u1', 'u2'], - list_user_ids=['u1', 'u3', 'u4']) + read_user_ids=['u1', 'u2']) acl_map = _get_acl_map(container_id, is_secret=False) acl_id = acl_map['read']['id'] @@ -1029,8 +853,7 @@ class WhenTestingContainerAclResource(utils.BarbicanAPIBaseTestCase): _, _ = create_acls( self.app, 'containers', container_id, - read_user_ids=['u1', 'u2'], - list_user_ids=['u1', 'u3', 'u4']) + read_user_ids=['u1', 'u2']) acl_map = _get_acl_map(container_id, is_secret=False) acl_id = acl_map['read']['id'] @@ -1054,68 +877,6 @@ class WhenTestingContainerAclResource(utils.BarbicanAPIBaseTestCase): self.assertEqual('read', acl['operation']) self.assertEqual(set(['u1', 'u2', 'u5']), set(acl['users'])) - def test_update_container_acl_modify_only_related_operation(self): - """Only modify the acl for matching operation and ignore others.""" - container_id, _ = create_container(self.app) - - _, _ = create_acls( - self.app, 'containers', container_id, - read_user_ids=['u1', 'u2'], - list_user_ids=['u1', 'u3', 'u4']) - - acl_map = _get_acl_map(container_id, is_secret=False) - acl_id = acl_map['read']['id'] - - # updating read, list operation and adding write operation acl - # Update should be for 'read' operation ACL only. Others are ignored. - resp = update_acl( - self.app, 'containers', container_id, acl_id, - read_user_ids=['u1', 'u2', 'u5'], read_creator_only=True, - list_user_ids=['u1', 'u3'], list_creator_only=True, - write_creator_only=True) - - self.assertEqual(resp.status_int, 200) - - resp = self.app.get( - '/containers/{0}/acls/{1}'.format(container_id, - acl_id), - expect_errors=False) - acl = resp.json - self.assertIsNotNone(acl) - self.assertIn('/containers/{0}/acls/{1}'.format(container_id, acl_id), - acl['acl_ref']) - self.assertTrue(acl['creator-only']) # read operation value is changed - self.assertEqual('read', acl['operation']) - self.assertEqual(set(['u1', 'u2', 'u5']), set(acl['users'])) - - acl_map = _get_acl_map(container_id, is_secret=False) - - # list, write operation should not be changed - self.assertIsNone(acl_map.get('write')) - self.assertFalse(acl_map['list']['creator_only']) - list_users = acl_map['list'].to_dict_fields()['users'] - self.assertEqual(set(['u1', 'u3', 'u4']), set(list_users)) - - def test_update_container_acl_modify_different_operation_should_fail(self): - """Should fail as modifying existing acl's operation is not allowed.""" - container_id, _ = create_container(self.app) - - _, _ = create_acls( - self.app, 'containers', container_id, - read_user_ids=['u1', 'u2'], - list_user_ids=['u1', 'u3', 'u4']) - - acl_map = _get_acl_map(container_id, is_secret=False) - acl_id = acl_map['read']['id'] - - # updating with list ACL should fail as originally read operation - # ACL is associated with acl_id and cannot be modified. - resp = update_acl( - self.app, 'containers', container_id, acl_id, - list_user_ids=['u1', 'u3'], list_creator_only=True, - expect_errors=True) - self.assertEqual(resp.status_int, 400) - def test_update_container_acl_modify_only_users(self): """Modifying existing acl's user list and creator-only flag.""" container_id, _ = create_container(self.app) @@ -1123,8 +884,7 @@ class WhenTestingContainerAclResource(utils.BarbicanAPIBaseTestCase): _, _ = create_acls( self.app, 'containers', container_id, read_user_ids=['u1', 'u2'], - read_creator_only=True, - list_user_ids=['u1', 'u3', 'u4']) + read_creator_only=True) acl_map = _get_acl_map(container_id, is_secret=False) acl_id = acl_map['read']['id'] @@ -1167,8 +927,7 @@ class WhenTestingContainerAclResource(utils.BarbicanAPIBaseTestCase): _, _ = create_acls( self.app, 'containers', container_id, read_user_ids=['u1', 'u2'], - read_creator_only=True, - list_user_ids=['u1', 'u3', 'u4']) + read_creator_only=True) acl_map = _get_acl_map(container_id, is_secret=False) acl_id = acl_map['read']['id'] @@ -1195,10 +954,7 @@ class WhenTestingContainerAclResource(utils.BarbicanAPIBaseTestCase): container_id, _ = create_container(self.app) _, _ = create_acls( self.app, 'containers', container_id, - read_creator_only=False, - list_user_ids=['u1', 'u3', 'u4'], - list_creator_only=None, - write_creator_only=True) + read_creator_only=False) resp = update_acl( self.app, 'containers', container_id, uuid.uuid4().hex, @@ -1220,33 +976,27 @@ class WhenTestingContainerAclResource(utils.BarbicanAPIBaseTestCase): container_id, _ = create_container(self.app) _, _ = create_acls( self.app, 'containers', container_id, - read_creator_only=False, - list_user_ids=['u1', 'u3', 'u4'], - list_creator_only=None, - write_creator_only=True) + read_creator_only=False) acl_map = _get_acl_map(container_id, is_secret=False) - list_acl_id = acl_map['list'].id + read_acl_id = acl_map['read'].id resp = self.app.delete( - '/containers/{0}/acls/{1}'.format(container_id, list_acl_id), + '/containers/{0}/acls/{1}'.format(container_id, read_acl_id), expect_errors=False) content = resp.json self.assertIsNone(content) # make sure there is no response self.assertEqual(resp.status_int, 200) acl_map = _get_acl_map(container_id, is_secret=False) - self.assertIsNone(acl_map.get('list')) # list acl should be deleted + self.assertIsNone(acl_map.get('read')) # read acl should be deleted def test_delete_secret_acls_invalid_secret_should_fail(self): """Delete acls should fail when invalid secret id is provided.""" container_id, _ = create_container(self.app) _, _ = create_acls( self.app, 'containers', container_id, - read_creator_only=False, - list_user_ids=['u1', 'u3', 'u4'], - list_creator_only=None, - write_creator_only=True) + read_creator_only=False) resp = self.app.delete( '/containers/{0}/acls/{1}'.format(container_id, uuid.uuid4().hex), @@ -1323,48 +1073,32 @@ def create_container(app): def create_acls(app, entity_type, entity_id, read_user_ids=None, - write_user_ids=None, list_user_ids=None, - read_creator_only=None, write_creator_only=None, - list_creator_only=None, + read_creator_only=None, expect_errors=False): return manage_acls(app, entity_type, entity_id, read_user_ids=read_user_ids, - write_user_ids=write_user_ids, - list_user_ids=list_user_ids, read_creator_only=read_creator_only, - write_creator_only=write_creator_only, - list_creator_only=list_creator_only, is_update=False, + is_update=False, expect_errors=expect_errors) def update_acls(app, entity_type, entity_id, read_user_ids=None, - write_user_ids=None, list_user_ids=None, - read_creator_only=None, write_creator_only=None, - list_creator_only=None, + read_creator_only=None, expect_errors=False): return manage_acls(app, entity_type, entity_id, read_user_ids=read_user_ids, - write_user_ids=write_user_ids, - list_user_ids=list_user_ids, read_creator_only=read_creator_only, - write_creator_only=write_creator_only, - list_creator_only=list_creator_only, is_update=True, + is_update=True, expect_errors=expect_errors) def manage_acls(app, entity_type, entity_id, read_user_ids=None, - write_user_ids=None, list_user_ids=None, - read_creator_only=None, write_creator_only=None, - list_creator_only=None, is_update=False, + read_creator_only=None, is_update=False, expect_errors=False): request = {} _append_acl_to_request(request, 'read', read_user_ids, read_creator_only) - _append_acl_to_request(request, 'write', write_user_ids, - write_creator_only) - _append_acl_to_request(request, 'list', list_user_ids, - list_creator_only) cleaned_request = {key: val for key, val in request.items() if val is not None} @@ -1390,17 +1124,11 @@ def manage_acls(app, entity_type, entity_id, read_user_ids=None, def update_acl(app, entity_type, entity_id, acl_id, read_user_ids=None, - write_user_ids=None, list_user_ids=None, - read_creator_only=None, write_creator_only=None, - list_creator_only=None, expect_errors=False): + read_creator_only=None, expect_errors=False): request = {} _append_acl_to_request(request, 'read', read_user_ids, read_creator_only) - _append_acl_to_request(request, 'write', write_user_ids, - write_creator_only) - _append_acl_to_request(request, 'list', list_user_ids, - list_creator_only) cleaned_request = {key: val for key, val in request.items() if val is not None} diff --git a/barbican/tests/common/test_validators.py b/barbican/tests/common/test_validators.py index 0ae34f481..c548d8347 100644 --- a/barbican/tests/common/test_validators.py +++ b/barbican/tests/common/test_validators.py @@ -1299,5 +1299,48 @@ class WhenTestingStoredKeyOrderValidator(utils.BaseTestCase): self.order_req) +@utils.parameterized_test_case +class WhenTestingAclValidator(utils.BaseTestCase): + def setUp(self): + super(WhenTestingAclValidator, self).setUp() + self.validator = validators.ACLValidator() + + @utils.parameterized_dataset({ + 'one_reader': [{'read': {'users': ['reader'], 'creator-only': False}}], + 'two_reader': [{'read': {'users': ['r1', 'r2'], + 'creator-only': False}}], + 'private': [{'read': {'users': [], 'creator-only': True}}], + 'default_users': [{'read': {'creator-only': True}}], + 'default_creator': [{'read': {'users': ['reader']}}], + 'almost_empty': [{'read': {}}], + 'empty': [{}], + }) + def test_should_validate(self, acl_req): + self.validator.validate(acl_req) + + @utils.parameterized_dataset({ + 'foo': ['foo'], + 'bad_op': [{'bad_op': {'users': ['reader'], 'creator-only': False}}], + 'bad_field': [{'read': {'bad_field': ['reader'], + 'creator-only': False}}], + 'bad_user': [{'read': {'users': [27], 'creator-only': False}}], + 'missing_op': [{'creator-only': True}], + }) + def test_should_raise(self, acl_req): + self.assertRaises(excep.InvalidObject, + self.validator.validate, + acl_req) + + @utils.parameterized_dataset({ + 'write': [{'write': {'users': ['writer'], 'creator-only': False}}], + 'list': [{'list': {'users': ['lister'], 'creator-only': False}}], + 'delete': [{'delete': {'users': ['deleter'], 'creator-only': False}}], + }) + def test_should_raise_future(self, acl_req): + self.assertRaises(excep.InvalidObject, + self.validator.validate, + acl_req) + + if __name__ == '__main__': unittest.main()