make test use default.json file

This commit is contained in:
Nassim Babaci 2014-07-11 13:00:17 +02:00
parent 98437c5dc5
commit 243701ec5d
5 changed files with 108 additions and 180 deletions

36
policies/default.json Normal file
View File

@ -0,0 +1,36 @@
{
"is_anonymous": "identity:None",
"is_authenticated": "not rule:is_anonymous",
"swift_reseller": "role:reseller",
"swift_operator": "role:admin or role:swiftoperator",
"swift_owner": "rule:swift_reseller or rule:swift_operator",
"reseller_request": "rule:swift_reseller",
"same_tenant": "account:%(account)s",
"tenant_mismatch": "not rule:same_tenant",
"allowed_for_authenticated": "rule:swift_reseller or acl:check_cross_tenant or acl:check_is_public or (rule:same_tenant and rule:swift_operator) or (rule:same_tenant and acl:check_roles)",
"allowed_for_anonymous": "is_authoritative:True and acl:check_is_public",
"allowed_for_user": "(rule:is_authenticated and rule:allowed_for_authenticated) or rule:allowed_for_anonymous",
"get_account": "rule:allowed_for_user",
"post_account": "rule:allowed_for_user",
"head_account": "rule:allowed_for_user",
"delete_account": "rule:swift_reseller",
"options_account": "",
"get_container": "rule:allowed_for_user",
"put_container": "rule:allowed_for_user",
"delete_container": "rule:allowed_for_user",
"post_container": "rule:allowed_for_user",
"head_container": "rule:allowed_for_user",
"options_container": "",
"get_object": "rule:allowed_for_user",
"put_object": "rule:allowed_for_user",
"copy_object": "rule:allowed_for_user",
"delete_object": "rule:allowed_for_user",
"head_object": "rule:allowed_for_user",
"post_object": "rule:allowed_for_user",
"options_object": ""
}

View File

@ -14,14 +14,11 @@
from openstack.common import policy_parser as parser
def get_enforcer(operators_roles, reseller_role, is_admin, logger, policy_file=None):
swift_operators = [role.strip()
for role in operators_roles.split(',')]
def get_enforcer(logger, policy_file):
parser.registry.register('logger', logger)
if policy_file:
return FileBasedEnforcer(policy_file, logger)
else:
return DefaultEnforcer(swift_operators, reseller_role, is_admin, logger)
class Enforcer(object):
def __init__(self, rules=None):
@ -81,36 +78,11 @@ class Enforcer(object):
return result
def load_rules(self, force_reload=False):
#import pdb; pdb.set_trace()
policy = self._get_policy()
rules = parser.Rules.load_json(policy)
self.rules = rules
class DefaultEnforcer(Enforcer):
def __init__(self, swift_operator, swift_reseller, is_admin=False, logger=None):
super(DefaultEnforcer, self).__init__()
self.swift_operator = swift_operator
self.swift_reseller = swift_reseller
self.is_admin = is_admin
self.log = logger
def _get_policy(self):
param = {
"reseller_admin": self.swift_reseller,
"operators": " or ".join(["role:%s" % role
for role in self.swift_operator])
}
if self.is_admin:
template = default_policy_is_admin_tmpl
else:
template = default_policy_tmpl
policy = template % param
return policy
class FileBasedEnforcer(Enforcer):
def __init__(self, policy_file, logger):
super(FileBasedEnforcer, self).__init__()
@ -218,104 +190,4 @@ class AclCheck(parser.Check):
format(match=self.match))
enforcer.log.debug("Rule '%s' evaluated to %s" % (self.match, res))
return res
default_policy_tmpl = (
'{'
'"is_anonymous": "identity:None",'
'"is_authenticated": "not rule:is_anonymous",'
'"swift_reseller": "(role:%(reseller_admin)s)",'
'"swift_operator": "%(operators)s",'
'"swift_owner": "rule:swift_reseller'
' or rule:swift_operator",'
'"reseller_request": "rule:swift_reseller",'
'"same_tenant": "account:%%(account)s",'
'"tenant_mismatch": "not rule:same_tenant",'
'"allowed_for_authenticated": "rule:swift_reseller'
' or acl:check_cross_tenant'
' or acl:check_is_public'
' or (rule:same_tenant and rule:swift_operator)'
' or (rule:same_tenant and acl:check_roles)",'
'"allowed_for_anonymous": "is_authoritative:True'
' and acl:check_is_public",'
'"allowed_for_user": "(rule:is_authenticated'
' and rule:allowed_for_authenticated)'
' or rule:allowed_for_anonymous",'
'"get_account": "rule:allowed_for_user",'
'"post_account": "rule:allowed_for_user",'
'"head_account": "rule:allowed_for_user",'
'"delete_account": "rule:swift_reseller",'
'"options_account": "",'
'"get_container": "rule:allowed_for_user",'
'"put_container": "rule:allowed_for_user",'
'"delete_container": "rule:allowed_for_user",'
'"post_container": "rule:allowed_for_user",'
'"head_container": "rule:allowed_for_user",'
'"options_container": "",'
'"get_object": "rule:allowed_for_user",'
'"put_object": "rule:allowed_for_user",'
'"copy_object": "rule:allowed_for_user",'
'"delete_object": "rule:allowed_for_user",'
'"head_object": "rule:allowed_for_user",'
'"post_object": "rule:allowed_for_user",'
'"options_object": ""'
'}'
)
default_policy_is_admin_tmpl = (
'{'
'"is_anonymous": "identity:None",'
'"is_authenticated": "not rule:is_anonymous",'
'"swift_reseller": "(role:%(reseller_admin)s)",'
'"swift_operator": "%(operators)s",'
'"swift_owner": "rule:swift_reseller'
' or rule:swift_operator'
# diff: add is_admin to swift_owner
' or is_admin:True",'
'"reseller_request": "rule:swift_reseller",'
'"same_tenant": "account:%%(account)s",'
'"tenant_mismatch": "not rule:same_tenant",'
'"allowed_for_authenticated": "rule:swift_reseller'
' or acl:check_cross_tenant or acl:check_is_public'
' or (rule:same_tenant and rule:swift_operator)'
# diff: allow access if user is_admin
' or (rule:same_tenant and is_admin:True)'
' or (rule:same_tenant and is_admin:False and acl:check_roles)",'
'"allowed_for_anonymous": "is_authoritative:True'
' and acl:check_is_public",'
'"allowed_for_user": "(rule:is_authenticated'
' and rule:allowed_for_authenticated)'
' or rule:allowed_for_anonymous",'
'"get_account": "rule:allowed_for_user",'
'"post_account": "rule:allowed_for_user",'
'"head_account": "rule:allowed_for_user",'
'"delete_account": "rule:swift_reseller",'
'"options_account": "",'
'"get_container": "rule:allowed_for_user",'
'"put_container": "rule:allowed_for_user",'
'"delete_container": "rule:allowed_for_user",'
'"post_container": "rule:allowed_for_user",'
'"head_container": "rule:allowed_for_user",'
'"options_container": "",'
'"get_object": "rule:allowed_for_user",'
'"put_object": "rule:allowed_for_user",'
'"copy_object": "rule:allowed_for_user",'
'"delete_object": "rule:allowed_for_user",'
'"head_object": "rule:allowed_for_user",'
'"post_object": "rule:allowed_for_user",'
'"options_object": ""'
'}'
)
return res

View File

@ -82,15 +82,15 @@ class SwiftPolicy(object):
self.reseller_prefix = conf.get('reseller_prefix', 'AUTH_').strip()
if self.reseller_prefix and self.reseller_prefix[-1] != '_':
self.reseller_prefix += '_'
self.operator_roles = conf.get('operator_roles',
'admin, swiftoperator').lower()
self.reseller_admin_role = conf.get('reseller_admin_role',
'ResellerAdmin').lower()
config_is_admin = conf.get('is_admin', "false").lower()
self.is_admin = swift_utils.config_true_value(config_is_admin)
#self.operator_roles = conf.get('operator_roles',
# 'admin, swiftoperator').lower()
#self.reseller_admin_role = conf.get('reseller_admin_role',
# 'ResellerAdmin').lower()
#config_is_admin = conf.get('is_admin', "false").lower()
#self.is_admin = swift_utils.config_true_value(config_is_admin)
config_overrides = conf.get('allow_overrides', 't').lower()
self.allow_overrides = swift_utils.config_true_value(config_overrides)
self.policy_file = conf.get('policy', None)
self.policy_file = conf.get('policy', "default.json")
def __call__(self, environ, start_response):
identity = self._keystone_identity(environ)
@ -109,7 +109,7 @@ class SwiftPolicy(object):
environ['keystone.identity'] = identity
environ['REMOTE_USER'] = identity.get('tenant')
environ['swift.authorize'] = self.authorize
# Check reseller_request again poicy
# Check reseller_request against policy
if self.check_action('reseller_request', environ):
environ['reseller_request'] = True
else:
@ -227,11 +227,7 @@ class SwiftPolicy(object):
def check_action(self, action, environ):
creds = self.get_creds(environ)
target = self.get_target(environ)
enforcer = get_enforcer(self.operator_roles,
self.reseller_admin_role,
self.is_admin,
self.logger,
self.policy_file)
enforcer = get_enforcer(self.logger, self.policy_file)
self.logger.debug("enforce action '%s'", action)
return enforcer.enforce(action, target, creds)

View File

@ -0,0 +1,36 @@
{
"is_anonymous": "identity:None",
"is_authenticated": "not rule:is_anonymous",
"swift_reseller": "role:reseller",
"swift_operator": "role:admin or role:swiftoperator",
"swift_owner": "rule:swift_reseller or rule:swift_operator",
"reseller_request": "rule:swift_reseller",
"same_tenant": "account:%(account)s",
"tenant_mismatch": "not rule:same_tenant",
"allowed_for_authenticated": "rule:swift_reseller or acl:check_cross_tenant or acl:check_is_public or (rule:same_tenant and rule:swift_operator) or (rule:same_tenant and acl:check_roles)",
"allowed_for_anonymous": "is_authoritative:True and acl:check_is_public",
"allowed_for_user": "(rule:is_authenticated and rule:allowed_for_authenticated) or rule:allowed_for_anonymous",
"get_account": "rule:allowed_for_user",
"post_account": "rule:allowed_for_user",
"head_account": "rule:allowed_for_user",
"delete_account": "rule:swift_reseller",
"options_account": "",
"get_container": "rule:allowed_for_user",
"put_container": "rule:allowed_for_user",
"delete_container": "rule:allowed_for_user",
"post_container": "rule:allowed_for_user",
"head_container": "rule:allowed_for_user",
"options_container": "",
"get_object": "rule:allowed_for_user",
"put_object": "rule:allowed_for_user",
"copy_object": "rule:allowed_for_user",
"delete_object": "rule:allowed_for_user",
"head_object": "rule:allowed_for_user",
"post_object": "rule:allowed_for_user",
"options_object": ""
}

View File

@ -18,7 +18,7 @@ import time
import unittest
from collections import defaultdict
from swift.common.middleware import keystoneauth
from swiftpolicy import swiftpolicy
from swift.common.swob import Request, Response
from swift.common.http import HTTP_FORBIDDEN
from swiftpolicy.enforcer import AclCheck
@ -165,7 +165,9 @@ class FakeApp(object):
class SwiftAuth(unittest.TestCase):
def setUp(self):
self.test_auth = keystoneauth.filter_factory({})(FakeApp())
self.test_auth = swiftpolicy.filter_factory({'policy': 'policies/default.json'})(FakeApp())
# set in default.json
self.reseller_admin_role = "reseller"
self.test_auth.logger = FakeLogger()
def _make_request(self, path=None, headers=None, **kwargs):
@ -183,10 +185,10 @@ class SwiftAuth(unittest.TestCase):
def _get_successful_middleware(self):
response_iter = iter([('200 OK', {}, '')])
return keystoneauth.filter_factory({})(FakeApp(response_iter))
return swiftpolicy.filter_factory({'policy': 'policies/default.json'})(FakeApp(response_iter))
def test_invalid_request_authorized(self):
role = self.test_auth.reseller_admin_role
role = self.reseller_admin_role
headers = self._get_identity_headers(role=role)
req = self._make_request('/', headers=headers)
resp = req.get_response(self._get_successful_middleware())
@ -198,14 +200,14 @@ class SwiftAuth(unittest.TestCase):
self.assertEqual(resp.status_int, 404)
def test_confirmed_identity_is_authorized(self):
role = self.test_auth.reseller_admin_role
role = self.reseller_admin_role
headers = self._get_identity_headers(role=role)
req = self._make_request('/v1/AUTH_acct/c', headers)
resp = req.get_response(self._get_successful_middleware())
self.assertEqual(resp.status_int, 200)
def test_detect_reseller_request(self):
role = self.test_auth.reseller_admin_role
role = self.reseller_admin_role
headers = self._get_identity_headers(role=role)
req = self._make_request('/v1/AUTH_acct/c', headers)
req.get_response(self._get_successful_middleware())
@ -239,23 +241,23 @@ class SwiftAuth(unittest.TestCase):
def test_blank_reseller_prefix(self):
conf = {'reseller_prefix': ''}
test_auth = keystoneauth.filter_factory(conf)(FakeApp())
test_auth = swiftpolicy.filter_factory(conf)(FakeApp())
account = tenant_id = 'foo'
self.assertEqual(account, test_auth._get_account_for_tenant(tenant_id))
def test_reseller_prefix_added_underscore(self):
conf = {'reseller_prefix': 'AUTH'}
test_auth = keystoneauth.filter_factory(conf)(FakeApp())
test_auth = swiftpolicy.filter_factory(conf)(FakeApp())
self.assertEqual(test_auth.reseller_prefix, "AUTH_")
def test_reseller_prefix_not_added_double_underscores(self):
conf = {'reseller_prefix': 'AUTH_'}
test_auth = keystoneauth.filter_factory(conf)(FakeApp())
test_auth = swiftpolicy.filter_factory(conf)(FakeApp())
self.assertEqual(test_auth.reseller_prefix, "AUTH_")
def test_override_asked_for_but_not_allowed(self):
conf = {'allow_overrides': 'false'}
self.test_auth = keystoneauth.filter_factory(conf)(FakeApp())
conf = {'allow_overrides': 'false', 'policy': 'policies/default.json'}
self.test_auth = swiftpolicy.filter_factory(conf)(FakeApp())
req = self._make_request('/v1/AUTH_account',
environ={'swift.authorize_override': True})
resp = req.get_response(self.test_auth)
@ -263,7 +265,7 @@ class SwiftAuth(unittest.TestCase):
def test_override_asked_for_and_allowed(self):
conf = {'allow_overrides': 'true'}
self.test_auth = keystoneauth.filter_factory(conf)(FakeApp())
self.test_auth = swiftpolicy.filter_factory(conf)(FakeApp())
req = self._make_request('/v1/AUTH_account',
environ={'swift.authorize_override': True})
resp = req.get_response(self.test_auth)
@ -300,7 +302,10 @@ class SwiftAuth(unittest.TestCase):
class TestAuthorize(unittest.TestCase):
def setUp(self):
self.test_auth = keystoneauth.filter_factory({})(FakeApp())
self.test_auth = swiftpolicy.filter_factory({'policy': 'policies/default.json'})(FakeApp())
# set in default.json
self.reseller_admin_role = "reseller"
self.operator_roles = ["admin", "swiftoperator",]
self.test_auth.logger = FakeLogger()
def _make_request(self, path, **kwargs):
@ -356,46 +361,29 @@ class TestAuthorize(unittest.TestCase):
exception=HTTP_FORBIDDEN)
def test_authorize_succeeds_for_reseller_admin(self):
roles = [self.test_auth.reseller_admin_role]
roles = [self.reseller_admin_role]
identity = self._get_identity(roles=roles)
req = self._check_authenticate(identity=identity)
self.assertTrue(req.environ.get('swift_owner'))
def test_authorize_succeeds_for_insensitive_reseller_admin(self):
roles = [self.test_auth.reseller_admin_role.upper()]
roles = [self.reseller_admin_role.upper()]
identity = self._get_identity(roles=roles)
req = self._check_authenticate(identity=identity)
self.assertTrue(req.environ.get('swift_owner'))
def test_authorize_succeeds_as_owner_for_operator_role(self):
roles = self.test_auth.operator_roles.split(',')
roles = self.operator_roles
identity = self._get_identity(roles=roles)
req = self._check_authenticate(identity=identity)
self.assertTrue(req.environ.get('swift_owner'))
def test_authorize_succeeds_as_owner_for_insensitive_operator_role(self):
#import pdb; pdb.set_trace()
roles = [r.upper() for r in self.test_auth.operator_roles.split(',')]
roles = [r.upper() for r in self.operator_roles]
identity = self._get_identity(roles=roles)
req = self._check_authenticate(identity=identity)
self.assertTrue(req.environ.get('swift_owner'))
def _check_authorize_for_tenant_owner_match(self, exception=None):
identity = self._get_identity(user_name='same_name',
tenant_name='same_name')
req = self._check_authenticate(identity=identity, exception=exception)
expected = bool(exception is None)
self.assertEqual(bool(req.environ.get('swift_owner')), expected)
def test_authorize_succeeds_as_owner_for_tenant_owner_match(self):
self.test_auth.is_admin = True
self._check_authorize_for_tenant_owner_match()
def test_authorize_fails_as_owner_for_tenant_owner_match(self):
self.test_auth.is_admin = False
self._check_authorize_for_tenant_owner_match(
exception=HTTP_FORBIDDEN)
def test_authorize_succeeds_for_container_sync(self):
env = {'swift_sync_key': 'foo', 'REMOTE_ADDR': '127.0.0.1'}
headers = {'x-container-sync-key': 'foo', 'x-timestamp': '1'}
@ -452,7 +440,7 @@ class TestAuthorize(unittest.TestCase):
self._check_authenticate(identity=identity, acl=acl)
def test_delete_own_account_not_allowed(self):
roles = self.test_auth.operator_roles.split(',')
roles = self.operator_roles
identity = self._get_identity(roles=roles)
account = self._get_account(identity)
self._check_authenticate(account=account,
@ -462,7 +450,7 @@ class TestAuthorize(unittest.TestCase):
env={'REQUEST_METHOD': 'DELETE'})
def test_delete_own_account_when_reseller_allowed(self):
roles = [self.test_auth.reseller_admin_role]
roles = [self.reseller_admin_role]
identity = self._get_identity(roles=roles)
account = self._get_account(identity)
req = self._check_authenticate(account=account,