From 20a52de7ea2b01e89d01743cf84895ac8bbf3b8c Mon Sep 17 00:00:00 2001 From: southeast02 Date: Sun, 26 Mar 2017 20:52:51 +0800 Subject: [PATCH] Synchronized patch: Support policy control on Admin-API request 1.What is the problem Originally this patch had been committed to the Tricircle and provided policy control support. After splitting Trio2o needs these functions, too. So we plan to synchronize this patch from Gerrit to Trio2o. You can find the old patch on Gerrit here[1]. Currently Admin-API is to manage pod and pod-binding, the Admin-API access is hard coded, and only admin role is allowed. OpenStack usually use policy.json based authorization to control the API-request. Policy feature is missing in the Trio2o. 2. What is the solution to the problem Remove hard coded Admin-API request authorization, use policy instead. For Nova API-GW and Cinder API-GW, the API access control should be done at bottom OpenStack as far as possible if the API request will be forwarded to bottom OpenStack directly for further processing; only these APIs which only interact with database for example flavor and volume type, because these APIs processing will be terminated at the Trio2o layer, so policy control should be done in Nova API-GW or Cinder API-GW. No work needs to do in Trio2o Neutron Plugin for Neutron API server is there, Neutron API server will be responsible for policy control. 3. What the features need to be implemented to the Trio2o to realize the solution In this patch, default policy option and rule, and policy control in Admin-API were added. Using the default option and value to generate the policy.json will be implemented in next patch. No policy.json is mandatory required after this patch is merged, if no policy.json is configured or provided, the policy control will use the default rule automatically. [1] https://review.openstack.org/#/c/356262/ Change-Id: I61cab299d1286dcc2729dd943f4134c427d79bb1 --- trio2o/api/controllers/pod.py | 35 ++-- trio2o/common/config.py | 28 ++- trio2o/common/context.py | 7 +- trio2o/common/exceptions.py | 4 + trio2o/common/policy.py | 186 ++++++++++++++++++ .../functional/api/controllers/test_pod.py | 126 +++++++++--- trio2o/tests/unit/api/controllers/test_pod.py | 3 + trio2o/tests/unit/common/test_policy.py | 123 ++++++++++++ 8 files changed, 454 insertions(+), 58 deletions(-) create mode 100644 trio2o/common/policy.py create mode 100644 trio2o/tests/unit/common/test_policy.py diff --git a/trio2o/api/controllers/pod.py b/trio2o/api/controllers/pod.py index 6814bfa..bb22442 100644 --- a/trio2o/api/controllers/pod.py +++ b/trio2o/api/controllers/pod.py @@ -27,6 +27,7 @@ import trio2o.common.context as t_context import trio2o.common.exceptions as t_exc from trio2o.common.i18n import _ from trio2o.common.i18n import _LE +from trio2o.common import policy from trio2o.common import utils from trio2o.db import api as db_api @@ -45,8 +46,8 @@ class PodsController(rest.RestController): def post(self, **kw): context = t_context.extract_context_from_environ() - if not t_context.is_admin_context(context): - pecan.abort(400, _('Admin role required to create pods')) + if not policy.enforce(context, policy.ADMIN_API_PODS_CREATE): + pecan.abort(401, _('Unauthorized to create pods')) return if 'pod' not in kw: @@ -129,8 +130,8 @@ class PodsController(rest.RestController): def get_one(self, _id): context = t_context.extract_context_from_environ() - if not t_context.is_admin_context(context): - pecan.abort(400, _('Admin role required to show pods')) + if not policy.enforce(context, policy.ADMIN_API_PODS_SHOW): + pecan.abort(401, _('Unauthorized to show pods')) return try: @@ -143,8 +144,8 @@ class PodsController(rest.RestController): def get_all(self): context = t_context.extract_context_from_environ() - if not t_context.is_admin_context(context): - pecan.abort(400, _('Admin role required to list pods')) + if not policy.enforce(context, policy.ADMIN_API_PODS_LIST): + pecan.abort(401, _('Unauthorized to list pods')) return try: @@ -160,8 +161,8 @@ class PodsController(rest.RestController): def delete(self, _id): context = t_context.extract_context_from_environ() - if not t_context.is_admin_context(context): - pecan.abort(400, _('Admin role required to delete pods')) + if not policy.enforce(context, policy.ADMIN_API_PODS_DELETE): + pecan.abort(401, _('Unauthorized to delete pods')) return try: @@ -174,7 +175,6 @@ class PodsController(rest.RestController): az_ag.delete_ag(context, ag['id']) core.delete_resource(context, models.Pod, _id) pecan.response.status = 200 - return {} except t_exc.ResourceNotFound: return Response(_('Pod not found'), 404) except Exception as e: @@ -212,8 +212,8 @@ class BindingsController(rest.RestController): def post(self, **kw): context = t_context.extract_context_from_environ() - if not t_context.is_admin_context(context): - pecan.abort(400, _('Admin role required to create bindings')) + if not policy.enforce(context, policy.ADMIN_API_BINDINGS_CREATE): + pecan.abort(401, _('Unauthorized to create bindings')) return if 'pod_binding' not in kw: @@ -272,8 +272,8 @@ class BindingsController(rest.RestController): def get_one(self, _id): context = t_context.extract_context_from_environ() - if not t_context.is_admin_context(context): - pecan.abort(400, _('Admin role required to show bindings')) + if not policy.enforce(context, policy.ADMIN_API_BINDINGS_SHOW): + pecan.abort(401, _('Unauthorized to show bindings')) return try: @@ -290,8 +290,8 @@ class BindingsController(rest.RestController): def get_all(self): context = t_context.extract_context_from_environ() - if not t_context.is_admin_context(context): - pecan.abort(400, _('Admin role required to list bindings')) + if not policy.enforce(context, policy.ADMIN_API_BINDINGS_LIST): + pecan.abort(401, _('Unauthorized to list bindings')) return try: @@ -309,15 +309,14 @@ class BindingsController(rest.RestController): def delete(self, _id): context = t_context.extract_context_from_environ() - if not t_context.is_admin_context(context): - pecan.abort(400, _('Admin role required to delete bindings')) + if not policy.enforce(context, policy.ADMIN_API_BINDINGS_DELETE): + pecan.abort(401, _('Unauthorized to delete bindings')) return try: with context.session.begin(): core.delete_resource(context, models.PodBinding, _id) pecan.response.status = 200 - return {} except t_exc.ResourceNotFound: pecan.abort(404, _('Pod binding not found')) return diff --git a/trio2o/common/config.py b/trio2o/common/config.py index 2d1622e..79873e5 100644 --- a/trio2o/common/config.py +++ b/trio2o/common/config.py @@ -16,35 +16,35 @@ """ Routines for configuring trio2o, largely copy from Neutron """ - import sys from oslo_config import cfg import oslo_log.log as logging +from oslo_policy import opts as policy_opts from trio2o.common.i18n import _LI -# from trio2o import policy +from trio2o.common import policy from trio2o.common import rpc from trio2o.common import version +logging.register_options(cfg.CONF) LOG = logging.getLogger(__name__) +policy_opts.set_defaults(cfg.CONF, 'policy.json') + def init(opts, args, **kwargs): # Register the configuration options cfg.CONF.register_opts(opts) - # ks_session.Session.register_conf_options(cfg.CONF) - # auth.register_conf_options(cfg.CONF) - logging.register_options(cfg.CONF) - cfg.CONF(args=args, project='trio2o', version=version.version_info, **kwargs) _setup_logging() + _setup_policy() rpc.init(cfg.CONF) @@ -60,11 +60,23 @@ def _setup_logging(): LOG.debug("command line: %s", " ".join(sys.argv)) +def _setup_policy(): + + # if there is valid policy file, use policy file by oslo_policy + # otherwise, use the default policy value in policy.py + policy_file = cfg.CONF.oslo_policy.policy_file + if policy_file and cfg.CONF.find_file(policy_file): + # just return here, oslo_policy lib will use policy file by itself + return + + policy.populate_default_rules() + + def reset_service(): # Reset worker in case SIGHUP is called. # Note that this is called only in case a service is running in # daemon mode. _setup_logging() - # TODO(zhiyuan) enforce policy later - # policy.refresh() + policy.reset() + _setup_policy() diff --git a/trio2o/common/context.py b/trio2o/common/context.py index 88882e3..2dbbf7e 100644 --- a/trio2o/common/context.py +++ b/trio2o/common/context.py @@ -76,7 +76,7 @@ class ContextBase(oslo_ctx.RequestContext): def __init__(self, auth_token=None, user_id=None, tenant_id=None, is_admin=False, read_deleted="no", request_id=None, overwrite=True, user_name=None, tenant_name=None, - quota_class=None, **kwargs): + quota_class=None, roles=None, **kwargs): """Initialize RequestContext. :param read_deleted: 'no' indicates deleted records are hidden, 'yes' @@ -105,6 +105,7 @@ class ContextBase(oslo_ctx.RequestContext): self.read_deleted = read_deleted self.nova_micro_version = kwargs.get('nova_micro_version', constants.NOVA_APIGW_MIN_VERSION) + self.roles = roles or [] def _get_read_deleted(self): return self._read_deleted @@ -128,7 +129,8 @@ class ContextBase(oslo_ctx.RequestContext): 'tenant_name': self.tenant_name, 'tenant_id': self.tenant_id, 'project_id': self.project_id, - 'quota_class': self.quota_class + 'quota_class': self.quota_class, + 'roles': self.roles }) return ctx_dict @@ -175,6 +177,7 @@ class Context(ContextBase): def elevated(self, read_deleted=None, overwrite=False): """Return a version of this context with admin flag set.""" ctx = copy.copy(self) + ctx.roles = copy.deepcopy(self.roles) ctx.is_admin = True if read_deleted is not None: diff --git a/trio2o/common/exceptions.py b/trio2o/common/exceptions.py index 979a5af..94096bf 100644 --- a/trio2o/common/exceptions.py +++ b/trio2o/common/exceptions.py @@ -123,6 +123,10 @@ class AdminRequired(NotAuthorized): message = _("User does not have admin privileges") +class PolicyNotAuthorized(NotAuthorized): + message = _("Policy doesn't allow this operation to be performed.") + + class InUse(Trio2oException): message = _("The resource is inuse") diff --git a/trio2o/common/policy.py b/trio2o/common/policy.py new file mode 100644 index 0000000..7cbb438 --- /dev/null +++ b/trio2o/common/policy.py @@ -0,0 +1,186 @@ +# Copyright (c) Huawei Technologies Co., Ltd. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Policy Engine For Trio2o.""" + +# Policy controlled API access mainly for the Trio2o Admin API. Regarding +# to Nova API-GW and Cinder API-GW, the API access control should be done at +# bottom OpenStack as far as possible if the API request will be forwarded +# to bottom OpenStack directly for further processing; only these APIs which +# only can interact with database for example flavor and volume type, because +# these APIs processing will be terminated at the Trio2o layer, so policy +# control should be done by Nova API-GW or Cinder API-GW. No work is required +# to do in the Trio2o Neutron Plugin for Neutron API server is there, +# Neutron API server will be responsible for policy control. + + +from oslo_config import cfg +import oslo_log.log as logging +from oslo_policy import policy + +from trio2o.common import exceptions as t_exec +from trio2o.common.i18n import _LE + +_ENFORCER = None +CONF = cfg.CONF +LOG = logging.getLogger(__name__) + +default_policies = [ + policy.RuleDefault('context_is_admin', 'role:admin'), + policy.RuleDefault('admin_api', 'is_admin:True', + description='cloud admin allowed'), + policy.RuleDefault('admin_or_owner', + 'is_admin:True or project_id:%(project_id)s', + description='cloud admin or project owner allowed'), + policy.RuleDefault('default', 'rule:admin_or_owner'), +] + +ADMIN_API_PODS_CREATE = 'admin_api:pods:create' +ADMIN_API_PODS_DELETE = 'admin_api:pods:delete' +ADMIN_API_PODS_SHOW = 'admin_api:pods:show' +ADMIN_API_PODS_LIST = 'admin_api:pods:list' + +ADMIN_API_BINDINGS_CREATE = 'admin_api:bindings:create' +ADMIN_API_BINDINGS_DELETE = 'admin_api:bindings:delete' +ADMIN_API_BINDINGS_SHOW = 'admin_api:bindings:show' +ADMIN_API_BINDINGS_LIST = 'admin_api:bindings:list' + +trio2o_admin_api_policies = [ + policy.RuleDefault(ADMIN_API_PODS_CREATE, + 'rule:admin_api', + description='Create pod'), + policy.RuleDefault(ADMIN_API_PODS_DELETE, + 'rule:admin_api', + description='Delete pod'), + policy.RuleDefault(ADMIN_API_PODS_SHOW, + 'rule:admin_api', + description='Show pod detail'), + policy.RuleDefault(ADMIN_API_PODS_LIST, + 'rule:admin_api', + description='List pods'), + + policy.RuleDefault(ADMIN_API_BINDINGS_CREATE, + 'rule:admin_api', + description='Create pod binding'), + policy.RuleDefault(ADMIN_API_BINDINGS_DELETE, + 'rule:admin_api', + description='Delete pod binding'), + policy.RuleDefault(ADMIN_API_BINDINGS_SHOW, + 'rule:admin_api', + description='Show pod binding detail'), + policy.RuleDefault(ADMIN_API_BINDINGS_LIST, + 'rule:admin_api', + description='List pod bindings'), +] + + +def list_policies(): + policies = (default_policies + + trio2o_admin_api_policies) + return policies + + +# we can get a policy enforcer by this init. +# oslo policy supports change policy rule dynamically. +# at present, policy.enforce will reload the policy rules when it checks +# the policy file has been touched. +def init(policy_file=None, rules=None, + default_rule=None, use_conf=True, overwrite=True): + """Init an Enforcer class. + + :param policy_file: Custom policy file to use, if none is + specified, ``conf.policy_file`` will be + used. + :param rules: Default dictionary / Rules to use. It will be + considered just in the first instantiation. If + :meth:`load_rules` with ``force_reload=True``, + :meth:`clear` or :meth:`set_rules` with + ``overwrite=True`` is called this will be overwritten. + :param default_rule: Default rule to use, conf.default_rule will + be used if none is specified. + :param use_conf: Whether to load rules from cache or config file. + :param overwrite: Whether to overwrite existing rules when reload rules + from config file. + """ + global _ENFORCER + if not _ENFORCER: + # http://docs.openstack.org/developer/oslo.policy/usage.html + _ENFORCER = policy.Enforcer(CONF, + policy_file=policy_file, + rules=rules, + default_rule=default_rule, + use_conf=use_conf, + overwrite=overwrite) + _ENFORCER.register_defaults(list_policies()) + return _ENFORCER + + +def set_rules(rules, overwrite=True, use_conf=False): + """Set rules based on the provided dict of rules. + + :param rules: New rules to use. It should be an instance of dict. + :param overwrite: Whether to overwrite current rules or update them + with the new rules. + :param use_conf: Whether to reload rules from config file. + """ + init(use_conf=False) + _ENFORCER.set_rules(rules, overwrite, use_conf) + + +def populate_default_rules(): + reset() + init(use_conf=False) + dict_rules = {} + for default in list_policies(): + dict_rules[default.name] = default.check_str + rules = policy.Rules.from_dict(dict_rules) + set_rules(rules) + + +def reset(): + global _ENFORCER + if _ENFORCER: + _ENFORCER.clear() + _ENFORCER = None + + +def enforce(context, rule=None, target=None, *args, **kwargs): + """Check authorization of a rule against the target and credentials. + + :param dict context: As much information about the user performing the + action as possible. + :param rule: The rule to evaluate. + :param dict target: As much information about the object being operated + on as possible. + :return: ``True`` if the policy allows the action. + ``False`` if the policy does not allow the action. + """ + enforcer = init() + credentials = context.to_dict() + if target is None: + target = {'project_id': context.project_id, + 'user_id': context.user_id} + + exc = t_exec.PolicyNotAuthorized + + try: + result = enforcer.enforce(rule, target, credentials, + do_raise=True, exc=exc, *args, **kwargs) + + except t_exec.PolicyNotAuthorized as e: + result = False + LOG.exception(_LE("%(msg)s, %(rule)s, %(target)s"), + {'msg': str(e), 'rule': rule, 'target': target}) + return result diff --git a/trio2o/tests/functional/api/controllers/test_pod.py b/trio2o/tests/functional/api/controllers/test_pod.py index 344fc00..d97bf83 100644 --- a/trio2o/tests/functional/api/controllers/test_pod.py +++ b/trio2o/tests/functional/api/controllers/test_pod.py @@ -24,6 +24,7 @@ import oslo_db.exception as db_exc from trio2o.api import app from trio2o.common import az_ag from trio2o.common import context +from trio2o.common import policy from trio2o.common import utils from trio2o.db import core from trio2o.tests import base @@ -33,8 +34,14 @@ OPT_GROUP_NAME = 'keystone_authtoken' cfg.CONF.import_group(OPT_GROUP_NAME, "keystonemiddleware.auth_token") -def fake_is_admin(ctx): - return True +def fake_admin_context(): + context_paras = {'is_admin': True} + return context.Context(**context_paras) + + +def fake_non_admin_context(): + context_paras = {} + return context.Context(**context_paras) class API_FunctionalTest(base.TestCase): @@ -44,6 +51,7 @@ class API_FunctionalTest(base.TestCase): self.addCleanup(set_config, {}, overwrite=True) + cfg.CONF.clear() cfg.CONF.register_opts(app.common_opts) self.CONF = self.useFixture(fixture_config.Config()).conf @@ -56,6 +64,8 @@ class API_FunctionalTest(base.TestCase): self.context = context.get_admin_context() + policy.populate_default_rules() + self.app = self._make_app() def _make_app(self, enable_acl=False): @@ -78,13 +88,14 @@ class API_FunctionalTest(base.TestCase): cfg.CONF.unregister_opts(app.common_opts) pecan.set_config({}, overwrite=True) core.ModelBase.metadata.drop_all(core.get_engine()) + policy.reset() class TestPodController(API_FunctionalTest): """Test version listing on root URI.""" - @patch.object(context, 'is_admin_context', - new=fake_is_admin) + @patch.object(context, 'extract_context_from_environ', + new=fake_admin_context) def test_post_no_input(self): pods = [ # missing pod @@ -109,8 +120,8 @@ class TestPodController(API_FunctionalTest): def fake_create_ag_az(context, ag_name, az_name): raise db_exc.DBDuplicateEntry - @patch.object(context, 'is_admin_context', - new=fake_is_admin) + @patch.object(context, 'extract_context_from_environ', + new=fake_admin_context) @patch.object(az_ag, 'create_ag_az', new=fake_create_ag_az) def test_post_dup_db_exception(self): @@ -132,8 +143,8 @@ class TestPodController(API_FunctionalTest): def fake_create_ag_az_exp(context, ag_name, az_name): raise Exception - @patch.object(context, 'is_admin_context', - new=fake_is_admin) + @patch.object(context, 'extract_context_from_environ', + new=fake_admin_context) @patch.object(core, 'create_resource', new=fake_create_ag_az_exp) def test_post_exception(self): @@ -152,8 +163,8 @@ class TestPodController(API_FunctionalTest): self._test_and_check(pods) - @patch.object(context, 'is_admin_context', - new=fake_is_admin) + @patch.object(context, 'extract_context_from_environ', + new=fake_admin_context) def test_post_invalid_input(self): pods = [ @@ -230,8 +241,8 @@ class TestPodController(API_FunctionalTest): self._test_and_check(pods) - @patch.object(context, 'is_admin_context', - new=fake_is_admin) + @patch.object(context, 'extract_context_from_environ', + new=fake_admin_context) def test_post_duplicate_top_region(self): pods = [ @@ -261,8 +272,8 @@ class TestPodController(API_FunctionalTest): self._test_and_check(pods) - @patch.object(context, 'is_admin_context', - new=fake_is_admin) + @patch.object(context, 'extract_context_from_environ', + new=fake_admin_context) def test_post_duplicate_pod(self): pods = [ @@ -293,8 +304,8 @@ class TestPodController(API_FunctionalTest): self._test_and_check(pods) - @patch.object(context, 'is_admin_context', - new=fake_is_admin) + @patch.object(context, 'extract_context_from_environ', + new=fake_admin_context) def test_post_pod_duplicate_top_region(self): pods = [ @@ -336,8 +347,8 @@ class TestPodController(API_FunctionalTest): self.assertEqual(response.status_int, test_pod['expected_error']) - @patch.object(context, 'is_admin_context', - new=fake_is_admin) + @patch.object(context, 'extract_context_from_environ', + new=fake_admin_context) def test_get_all(self): pods = [ @@ -387,12 +398,9 @@ class TestPodController(API_FunctionalTest): self.assertIn('Pod1', response) self.assertIn('Pod2', response) - @patch.object(context, 'is_admin_context', - new=fake_is_admin) - @patch.object(context, 'extract_context_from_environ') - def test_get_delete_one(self, mock_context): - - mock_context.return_value = self.context + @patch.object(context, 'extract_context_from_environ', + new=fake_admin_context) + def test_get_delete_one(self): pods = [ @@ -480,12 +488,42 @@ class TestPodController(API_FunctionalTest): ag = az_ag.get_ag_by_name(self.context, ag_name) self.assertIsNone(ag) + @patch.object(context, 'extract_context_from_environ', + new=fake_non_admin_context) + def test_non_admin_action(self): + + pods = [ + { + "pod": + { + "pod_name": "Pod1", + "pod_az_name": "az1", + "dc_name": "dc2", + "az_name": "AZ1" + }, + "expected_error": 401, + }, + ] + self._test_and_check(pods) + + response = self.app.get('/v1.0/pods/1234567890', + expect_errors=True) + self.assertEqual(response.status_int, 401) + + response = self.app.get('/v1.0/pods', + expect_errors=True) + self.assertEqual(response.status_int, 401) + + response = self.app.delete('/v1.0/pods/1234567890', + expect_errors=True) + self.assertEqual(response.status_int, 401) + class TestBindingController(API_FunctionalTest): """Test version listing on root URI.""" - @patch.object(context, 'is_admin_context', - new=fake_is_admin) + @patch.object(context, 'extract_context_from_environ', + new=fake_admin_context) def test_post_no_input(self): pod_bindings = [ # missing pod_binding @@ -507,8 +545,8 @@ class TestBindingController(API_FunctionalTest): self.assertEqual(response.status_int, test_pod['expected_error']) - @patch.object(context, 'is_admin_context', - new=fake_is_admin) + @patch.object(context, 'extract_context_from_environ', + new=fake_admin_context) def test_post_invalid_input(self): pod_bindings = [ @@ -552,8 +590,8 @@ class TestBindingController(API_FunctionalTest): self._test_and_check(pod_bindings) - @patch.object(context, 'is_admin_context', - new=fake_is_admin) + @patch.object(context, 'extract_context_from_environ', + new=fake_admin_context) def test_bindings(self): pods = [ @@ -665,3 +703,31 @@ class TestBindingController(API_FunctionalTest): self.assertEqual(response.status_int, test_pod['expected_error']) + + @patch.object(context, 'extract_context_from_environ', + new=fake_non_admin_context) + def test_non_admin_action(self): + pod_bindings = [ + { + "pod_binding": + { + "tenant_id": "dddddd", + "pod_id": "0ace0db2-ef33-43a6-a150-42703ffda643" + }, + "expected_error": 401 + }, + ] + + self._test_and_check(pod_bindings) + + response = self.app.get('/v1.0/bindings/1234567890', + expect_errors=True) + self.assertEqual(response.status_int, 401) + + response = self.app.get('/v1.0/bindings', + expect_errors=True) + self.assertEqual(response.status_int, 401) + + response = self.app.delete('/v1.0/bindings/1234567890', + expect_errors=True) + self.assertEqual(response.status_int, 401) diff --git a/trio2o/tests/unit/api/controllers/test_pod.py b/trio2o/tests/unit/api/controllers/test_pod.py index 2c7daed..8c74b49 100644 --- a/trio2o/tests/unit/api/controllers/test_pod.py +++ b/trio2o/tests/unit/api/controllers/test_pod.py @@ -21,6 +21,7 @@ import pecan from trio2o.api.controllers import pod from trio2o.common import context +from trio2o.common import policy from trio2o.common import utils from trio2o.db import core from trio2o.db import models @@ -32,6 +33,7 @@ class PodsControllerTest(unittest.TestCase): core.ModelBase.metadata.create_all(core.get_engine()) self.controller = pod.PodsController() self.context = context.get_admin_context() + policy.populate_default_rules() @patch.object(context, 'extract_context_from_environ') def test_post_top_pod(self, mock_context): @@ -133,3 +135,4 @@ class PodsControllerTest(unittest.TestCase): def tearDown(self): core.ModelBase.metadata.drop_all(core.get_engine()) + policy.reset() diff --git a/trio2o/tests/unit/common/test_policy.py b/trio2o/tests/unit/common/test_policy.py new file mode 100644 index 0000000..6e34ccf --- /dev/null +++ b/trio2o/tests/unit/common/test_policy.py @@ -0,0 +1,123 @@ +# Copyright 2016 Huawei Technologies Co., Ltd. +# All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import unittest + +from oslo_policy import policy as oslo_policy + +from trio2o.common import context +from trio2o.common import policy + + +class PolicyTestCase(unittest.TestCase): + def setUp(self): + super(PolicyTestCase, self).setUp() + rules = oslo_policy.Rules.from_dict({ + "true": '@', + "example:allowed": '@', + "example:denied": "!", + "example:my_file": "role:admin or " + "project_id:%(project_id)s", + "example:early_and_fail": "! and @", + "example:early_or_success": "@ or !", + "example:lowercase_admin": "role:admin or role:sysadmin", + "example:uppercase_admin": "role:ADMIN or role:sysadmin", + }) + policy.reset() + policy.init() + policy.set_rules(rules) + self.context = context.Context(user_id='fake', + tenant_id='fake', + roles=['member']) + self.target = None + + def test_enforce_nonexistent_action_throws(self): + action = "example:non_exist" + result = policy.enforce(self.context, action, self.target) + self.assertEqual(result, False) + + def test_enforce_bad_action_throws(self): + action = "example:denied" + result = policy.enforce(self.context, action, self.target) + self.assertEqual(result, False) + + def test_enforce_good_action(self): + action = "example:allowed" + result = policy.enforce(self.context, action, self.target) + self.assertEqual(result, True) + + def test_templatized_enforcement(self): + target_mine = {'project_id': 'fake'} + target_not_mine = {'project_id': 'another'} + action = "example:my_file" + result = policy.enforce(self.context, action, target_mine) + self.assertEqual(result, True) + result = policy.enforce(self.context, action, target_not_mine) + self.assertEqual(result, False) + + def test_early_AND_enforcement(self): + action = "example:early_and_fail" + result = policy.enforce(self.context, action, self.target) + self.assertEqual(result, False) + + def test_early_OR_enforcement(self): + action = "example:early_or_success" + result = policy.enforce(self.context, action, self.target) + self.assertEqual(result, True) + + def test_ignore_case_role_check(self): + lowercase_action = "example:lowercase_admin" + uppercase_action = "example:uppercase_admin" + admin_context = context.Context(user_id='fake', + tenant_id='fake', + roles=['AdMiN']) + result = policy.enforce(admin_context, lowercase_action, self.target) + self.assertEqual(result, True) + result = policy.enforce(admin_context, uppercase_action, self.target) + self.assertEqual(result, True) + + +class DefaultPolicyTestCase(unittest.TestCase): + + def setUp(self): + super(DefaultPolicyTestCase, self).setUp() + + self.rules = oslo_policy.Rules.from_dict({ + "default": '', + "example:exist": "!", + }) + + self._set_rules('default') + + self.context = context.Context(user_id='fake', + tenant_id='fake') + + def _set_rules(self, default_rule): + policy.reset() + policy.init(rules=self.rules, default_rule=default_rule, + use_conf=False) + + def test_policy_called(self): + result = policy.enforce(self.context, "example:exist", {}) + self.assertEqual(result, False) + + def test_not_found_policy_calls_default(self): + result = policy.enforce(self.context, "example:noexist", {}) + self.assertEqual(result, True) + + def test_default_not_found(self): + self._set_rules("default_noexist") + result = policy.enforce(self.context, "example:noexist", {}) + self.assertEqual(result, False)