policy: clean-up

Registers in-code the last remaining policy rules.
Adds missing 'discoverable' rules. Without them,
the extension_info API can fail, as it tries to check the
os_compute_api:os_server_tags:discoverable rule. As it wasn't
previously registered, when listing the available extensions,
an exception of type PolicyNotRegistered is encountered.
In order to validate this, functional/api_sample_tests/test_extension_info.py
now runs without mocking policy.authorize.

Switches extension_info to context.can.
Switches nova.cells.filters to context.can.
Switches network.neutronv2.api to context.can.

Removes the rest of the entries in etc/policy.json.
Removes DefaultPolicyTestCase, as it tests the default
policy rule, which is not registered.
Removes rules from fake_policy.py that brings no value,
that are the same as the default values.
Removes extensions authorizer factories.
Removes nova.policy.enforce.

Change-Id: Ie7771768f4f3efe0edc787c12f297aa93d533d7e
Partially-Implements: bp policy-in-code
This commit is contained in:
Claudiu Belu 2016-06-24 17:14:27 +03:00
parent bc22a15e4e
commit 7d01bceaa0
20 changed files with 266 additions and 259 deletions

View File

@ -1,11 +1,2 @@
{
"context_is_admin": "role:admin",
"admin_or_owner": "is_admin:True or project_id:%(project_id)s",
"default": "rule:admin_or_owner",
"cells_scheduler_filter:TargetCellFilter": "is_admin:True",
"admin_api": "is_admin:True",
"network:attach_external_network": "is_admin:True"
}

View File

@ -22,6 +22,7 @@ from nova.api.openstack import extensions
from nova.api.openstack import wsgi
from nova import exception
from nova.i18n import _LE
from nova.policies import base as base_policies
from nova.policies import extensions as ext_policies
ALIAS = 'extensions'
@ -175,8 +176,9 @@ class ExtensionInfoController(wsgi.Controller):
)
for alias, ext in six.iteritems(self.extension_info.get_extensions()):
authorize = extensions.os_compute_soft_authorizer(alias)
if authorize(context, action='discoverable'):
action = ':'.join([
base_policies.COMPUTE_API, alias, 'discoverable'])
if context.can(action, fatal=False):
discoverable_extensions[alias] = ext
else:
LOG.debug("Filter out extension %s from discover list",

View File

@ -23,13 +23,11 @@ import six
import webob.dec
import webob.exc
import nova.api.openstack
from nova.api.openstack import wsgi
from nova import exception
from nova.i18n import _
from nova.i18n import _LE
from nova.i18n import _LW
import nova.policy
LOG = logging.getLogger(__name__)
@ -265,50 +263,6 @@ class ResourceExtension(object):
self.member_name = member_name
# This will be deprecated after policy cleanup finished
def core_authorizer(api_name, extension_name):
def authorize(context, target=None, action=None):
if target is None:
target = {'project_id': context.project_id,
'user_id': context.user_id}
if action is None:
act = '%s:%s' % (api_name, extension_name)
else:
act = '%s:%s:%s' % (api_name, extension_name, action)
nova.policy.enforce(context, act, target)
return authorize
def _soft_authorizer(hard_authorizer, api_name, extension_name):
hard_authorize = hard_authorizer(api_name, extension_name)
def authorize(context, target=None, action=None):
try:
hard_authorize(context, target=target, action=action)
return True
except exception.Forbidden:
return False
return authorize
# This will be deprecated after policy cleanup finished
def soft_core_authorizer(api_name, extension_name):
return _soft_authorizer(core_authorizer, api_name, extension_name)
# NOTE(alex_xu): The functions os_compute_authorizer and
# os_compute_soft_authorizer are used to policy enforcement for OpenStack
# Compute API, now Nova V2.1 REST API will invoke it.
#
def os_compute_authorizer(extension_name):
return core_authorizer('os_compute_api', extension_name)
def os_compute_soft_authorizer(extension_name):
return soft_core_authorizer('os_compute_api', extension_name)
@six.add_metaclass(abc.ABCMeta)
class V21APIExtensionBase(object):
"""Abstract base class for all v2.1 API extensions.

View File

@ -18,7 +18,6 @@ Cell scheduler filters
"""
from nova import filters
from nova import policy
class BaseCellFilter(filters.BaseFilter):
@ -31,9 +30,7 @@ class BaseCellFilter(filters.BaseFilter):
is the name of the filter class.
"""
name = 'cells_scheduler_filter:' + self.__class__.__name__
target = {'project_id': ctxt.project_id,
'user_id': ctxt.user_id}
return policy.enforce(ctxt, name, target, do_raise=False)
return ctxt.can(name, fatal=False)
def _filter_one(self, cell, filter_properties):
return self.cell_passes(cell, filter_properties)

View File

@ -26,7 +26,6 @@ from oslo_utils import excutils
from oslo_utils import uuidutils
import six
from nova.api.openstack import extensions
from nova.compute import utils as compute_utils
import nova.conf
from nova import exception
@ -40,14 +39,12 @@ from nova.pci import manager as pci_manager
from nova.pci import request as pci_request
from nova.pci import utils as pci_utils
from nova.pci import whitelist as pci_whitelist
from nova.policies import base as base_policies
CONF = nova.conf.CONF
LOG = logging.getLogger(__name__)
soft_external_network_attach_authorize = extensions.soft_core_authorizer(
'network', 'attach_external_network')
_SESSION = None
_ADMIN_AUTH = None
@ -287,7 +284,8 @@ class API(base_api.NetworkAPI):
def _check_external_network_attach(self, context, nets):
"""Check if attaching to external network is permitted."""
if not soft_external_network_attach_authorize(context):
if not context.can(base_policies.NETWORK_ATTACH_EXTERNAL,
fatal=False):
for net in nets:
# Perform this check here rather than in validate_networks to
# ensure the check is performed every time

View File

@ -22,8 +22,10 @@ from nova.policies import attach_interfaces
from nova.policies import availability_zone
from nova.policies import baremetal_nodes
from nova.policies import base
from nova.policies import block_device_mapping
from nova.policies import block_device_mapping_v1
from nova.policies import cells
from nova.policies import cells_scheduler
from nova.policies import certificates
from nova.policies import cloudpipe
from nova.policies import config_drive
@ -53,6 +55,7 @@ from nova.policies import fping
from nova.policies import hide_server_addresses
from nova.policies import hosts
from nova.policies import hypervisors
from nova.policies import image_metadata
from nova.policies import image_size
from nova.policies import images
from nova.policies import instance_actions
@ -64,6 +67,7 @@ from nova.policies import lock_server
from nova.policies import migrate_server
from nova.policies import migrations
from nova.policies import multinic
from nova.policies import multiple_create
from nova.policies import networks
from nova.policies import networks_associate
from nova.policies import pause_server
@ -91,6 +95,7 @@ from nova.policies import suspend_server
from nova.policies import tenant_networks
from nova.policies import used_limits
from nova.policies import user_data
from nova.policies import versions
from nova.policies import virtual_interfaces
from nova.policies import volumes
from nova.policies import volumes_attachments
@ -107,8 +112,10 @@ def list_rules():
availability_zone.list_rules(),
baremetal_nodes.list_rules(),
base.list_rules(),
block_device_mapping.list_rules(),
block_device_mapping_v1.list_rules(),
cells.list_rules(),
cells_scheduler.list_rules(),
certificates.list_rules(),
cloudpipe.list_rules(),
config_drive.list_rules(),
@ -138,6 +145,7 @@ def list_rules():
hide_server_addresses.list_rules(),
hosts.list_rules(),
hypervisors.list_rules(),
image_metadata.list_rules(),
image_size.list_rules(),
images.list_rules(),
instance_actions.list_rules(),
@ -149,6 +157,7 @@ def list_rules():
migrate_server.list_rules(),
migrations.list_rules(),
multinic.list_rules(),
multiple_create.list_rules(),
networks.list_rules(),
networks_associate.list_rules(),
pause_server.list_rules(),
@ -176,6 +185,7 @@ def list_rules():
tenant_networks.list_rules(),
used_limits.list_rules(),
user_data.list_rules(),
versions.list_rules(),
virtual_interfaces.list_rules(),
volumes.list_rules(),
volumes_attachments.list_rules()

View File

@ -12,6 +12,9 @@
from oslo_policy import policy
COMPUTE_API = 'os_compute_api'
NETWORK_ATTACH_EXTERNAL = 'network:attach_external_network'
RULE_ADMIN_OR_OWNER = 'rule:admin_or_owner'
RULE_ADMIN_API = 'rule:admin_api'
RULE_ANY = '@'
@ -21,6 +24,7 @@ rules = [
policy.RuleDefault('admin_or_owner',
'is_admin:True or project_id:%(project_id)s'),
policy.RuleDefault('admin_api', 'is_admin:True'),
policy.RuleDefault(NETWORK_ATTACH_EXTERNAL, 'is_admin:True'),
]

View File

@ -0,0 +1,32 @@
# Copyright 2016 Cloudbase Solutions Srl
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_policy import policy
from nova.policies import base
POLICY_ROOT = 'os_compute_api:os-block-device-mapping:%s'
block_device_mapping_policies = [
policy.RuleDefault(
name=POLICY_ROOT % 'discoverable',
check_str=base.RULE_ANY),
]
def list_rules():
return block_device_mapping_policies

View File

@ -0,0 +1,33 @@
# Copyright 2016 Cloudbase Solutions Srl
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_policy import policy
POLICY_ROOT = 'cells_scheduler_filter:%s'
cells_scheduler_policies = [
policy.RuleDefault(
name=POLICY_ROOT % 'DifferentCellFilter',
check_str='is_admin:True'),
policy.RuleDefault(
name=POLICY_ROOT % 'TargetCellFilter',
check_str='is_admin:True'),
]
def list_rules():
return cells_scheduler_policies

View File

@ -0,0 +1,32 @@
# Copyright 2016 Cloudbase Solutions Srl
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_policy import policy
from nova.policies import base
POLICY_ROOT = 'os_compute_api:image-metadata:%s'
image_metadata_policies = [
policy.RuleDefault(
name=POLICY_ROOT % 'discoverable',
check_str=base.RULE_ANY),
]
def list_rules():
return image_metadata_policies

View File

@ -0,0 +1,32 @@
# Copyright 2016 Cloudbase Solutions Srl
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_policy import policy
from nova.policies import base
POLICY_ROOT = 'os_compute_api:os-multiple-create:%s'
multiple_create_policies = [
policy.RuleDefault(
name=POLICY_ROOT % 'discoverable',
check_str=base.RULE_ANY),
]
def list_rules():
return multiple_create_policies

View File

@ -40,6 +40,9 @@ server_tags_policies = [
policy.RuleDefault(
name=POLICY_ROOT % 'show',
check_str=base.RULE_ANY),
policy.RuleDefault(
name=POLICY_ROOT % 'discoverable',
check_str=base.RULE_ANY),
]

View File

@ -34,6 +34,9 @@ servers_migrations_policies = [
policy.RuleDefault(
name=POLICY_ROOT % 'index',
check_str=base.RULE_ADMIN_API),
policy.RuleDefault(
name='os_compute_api:server-migrations:discoverable',
check_str=base.RULE_ANY),
]

View File

@ -32,7 +32,7 @@ shelve_policies = [
name=POLICY_ROOT % 'shelve_offload',
check_str=base.RULE_ADMIN_API),
policy.RuleDefault(
name=POLICY_ROOT % 'shelve:discoverable',
name=POLICY_ROOT % 'discoverable',
check_str=base.RULE_ANY),
]

32
nova/policies/versions.py Normal file
View File

@ -0,0 +1,32 @@
# Copyright 2016 Cloudbase Solutions Srl
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_policy import policy
from nova.policies import base
POLICY_ROOT = 'os_compute_api:versions:%s'
versions_policies = [
policy.RuleDefault(
name=POLICY_ROOT % 'discoverable',
check_str=base.RULE_ANY),
]
def list_rules():
return versions_policies

View File

@ -73,46 +73,6 @@ def set_rules(rules, overwrite=True, use_conf=False):
_ENFORCER.set_rules(rules, overwrite, use_conf)
# TODO(alaski): All users of this method should move over to authorize() as
# policies are registered and ultimately this should be removed.
def enforce(context, action, target, do_raise=True, exc=None):
"""Verifies that the action is valid on the target in this context.
:param context: nova context
:param action: string representing the action to be checked
this should be colon separated for clarity.
i.e. ``compute:create_instance``,
``compute:attach_volume``,
``volume:attach_volume``
:param target: dictionary representing the object of the action
for object creation this should be a dictionary representing the
location of the object e.g. ``{'project_id': context.project_id}``
:param do_raise: if True (the default), raises PolicyNotAuthorized;
if False, returns False
:raises nova.exception.PolicyNotAuthorized: if verification fails
and do_raise is True.
:return: returns a non-False value (not necessarily "True") if
authorized, and the exact value False if not authorized and
do_raise is False.
"""
init()
credentials = context.to_dict()
if not exc:
exc = exception.PolicyNotAuthorized
try:
result = _ENFORCER.enforce(action, target, credentials,
do_raise=do_raise, exc=exc, action=action)
except Exception:
credentials.pop('auth_token', None)
with excutils.save_and_reraise_exception():
LOG.debug('Policy check for %(action)s failed with credentials '
'%(credentials)s',
{'action': action, 'credentials': credentials})
return result
def authorize(context, action, target, do_raise=True, exc=None):
"""Verifies that the action is valid on the target in this context.
@ -128,7 +88,7 @@ def authorize(context, action, target, do_raise=True, exc=None):
:param do_raise: if True (the default), raises PolicyNotAuthorized;
if False, returns False
:param exc: Class of the exception to raise if the check fails.
Any remaining arguments passed to :meth:`enforce` (both
Any remaining arguments passed to :meth:`authorize` (both
positional and keyword arguments) will be passed to
the exception class. If not specified,
:class:`PolicyNotAuthorized` will be used.

View File

@ -13,24 +13,13 @@
# License for the specific language governing permissions and limitations
# under the License.
import mock
from nova.api.openstack import extensions as api_extensions
from nova.tests.functional.api_sample_tests import api_sample_base
def fake_soft_extension_authorizer(extension_name, core=False):
def authorize(context, action=None):
return True
return authorize
class ExtensionInfoAllSamplesJsonTest(api_sample_base.ApiSampleTestBaseV21):
sample_dir = "extension-info"
@mock.patch.object(api_extensions, 'os_compute_soft_authorizer')
def test_list_extensions(self, soft_auth):
soft_auth.side_effect = fake_soft_extension_authorizer
def test_list_extensions(self):
response = self._do_get('extensions')
# The full extension list is one of the places that things are
# different between the API versions and the legacy vs. new
@ -45,8 +34,6 @@ class ExtensionInfoAllSamplesJsonTest(api_sample_base.ApiSampleTestBaseV21):
class ExtensionInfoSamplesJsonTest(api_sample_base.ApiSampleTestBaseV21):
sample_dir = "extension-info"
@mock.patch.object(api_extensions, 'os_compute_soft_authorizer')
def test_get_extensions(self, soft_auth):
soft_auth.side_effect = fake_soft_extension_authorizer
def test_get_extensions(self):
response = self._do_get('extensions/os-agents')
self._verify_response('extensions-get-resp', {}, response, 200)

View File

@ -14,6 +14,7 @@
import copy
import mock
import webob
from nova.api.openstack.compute import extension_info
@ -57,15 +58,8 @@ simulated_extension_list = {
}
def fake_policy_enforce(context, action, target, do_raise=True):
return True
def fake_policy_enforce_selective(context, action, target, do_raise=True):
if action == 'os_compute_api:ext1-alias:discoverable':
raise exception.Forbidden
else:
return True
def fake_policy_authorize_selective(context, action, target):
return action != 'os_compute_api:ext1-alias:discoverable'
class ExtensionInfoTest(test.NoDBTestCase):
@ -76,8 +70,8 @@ class ExtensionInfoTest(test.NoDBTestCase):
ext_info.extensions = fake_extensions
self.controller = extension_info.ExtensionInfoController(ext_info)
@mock.patch.object(policy, 'authorize', mock.Mock(return_value=True))
def test_extension_info_list(self):
self.stubs.Set(policy, 'enforce', fake_policy_enforce)
req = fakes.HTTPRequestV21.blank('/extensions')
res_dict = self.controller.index(req)
# NOTE(sdague): because of hardcoded extensions the count is
@ -98,8 +92,8 @@ class ExtensionInfoTest(test.NoDBTestCase):
self.assertEqual(e['links'], [])
self.assertEqual(6, len(e))
@mock.patch.object(policy, 'authorize', mock.Mock(return_value=True))
def test_extension_info_show(self):
self.stubs.Set(policy, 'enforce', fake_policy_enforce)
req = fakes.HTTPRequestV21.blank('/extensions/ext1-alias')
res_dict = self.controller.show(req, 'ext1-alias')
self.assertEqual(1, len(res_dict))
@ -113,8 +107,9 @@ class ExtensionInfoTest(test.NoDBTestCase):
self.assertEqual(res_dict['extension']['links'], [])
self.assertEqual(6, len(res_dict['extension']))
def test_extension_info_list_not_all_discoverable(self):
self.stubs.Set(policy, 'enforce', fake_policy_enforce_selective)
@mock.patch.object(policy, 'authorize')
def test_extension_info_list_not_all_discoverable(self, mock_authorize):
mock_authorize.side_effect = fake_policy_authorize_selective
req = fakes.HTTPRequestV21.blank('/extensions')
res_dict = self.controller.index(req)
# NOTE(sdague): because of hardcoded extensions the count is
@ -144,7 +139,9 @@ class ExtensionInfoV21Test(test.NoDBTestCase):
ext_info = extension_info.LoadedExtensionInfo()
ext_info.extensions = simulated_extension_list
self.controller = extension_info.ExtensionInfoController(ext_info)
self.stubs.Set(policy, 'enforce', fake_policy_enforce)
patcher = mock.patch.object(policy, 'authorize', return_value=True)
patcher.start()
self.addCleanup(patcher.stop)
def test_extension_info_list(self):
req = fakes.HTTPRequest.blank('/extensions')
@ -210,7 +207,6 @@ class ExtensionInfoV21Test(test.NoDBTestCase):
class ExtensionInfoPolicyEnforcementV21(test.NoDBTestCase):
def setUp(self):
super(ExtensionInfoPolicyEnforcementV21, self).setUp()
ext_info = extension_info.LoadedExtensionInfo()

View File

@ -15,37 +15,18 @@
policy_data = """
{
"admin_api": "is_admin:True",
"cells_scheduler_filter:TargetCellFilter": "is_admin:True",
"context_is_admin": "role:admin or role:administrator",
"os_compute_api:servers:show:host_status": "",
"os_compute_api:servers:migrations:delete": "rule:admin_api",
"os_compute_api:servers:migrations:force_complete": "",
"os_compute_api:servers:migrations:index": "rule:admin_api",
"os_compute_api:servers:migrations:show": "rule:admin_api",
"os_compute_api:os-admin-actions:inject_network_info": "",
"os_compute_api:os-admin-actions:reset_network": "",
"os_compute_api:os-admin-actions:reset_state": "",
"os_compute_api:os-admin-password": "",
"os_compute_api:os-aggregates:index": "rule:admin_api",
"os_compute_api:os-aggregates:create": "rule:admin_api",
"os_compute_api:os-aggregates:show": "rule:admin_api",
"os_compute_api:os-aggregates:update": "rule:admin_api",
"os_compute_api:os-aggregates:delete": "rule:admin_api",
"os_compute_api:os-aggregates:add_host": "rule:admin_api",
"os_compute_api:os-aggregates:remove_host": "rule:admin_api",
"os_compute_api:os-aggregates:set_metadata": "rule:admin_api",
"os_compute_api:os-agents": "",
"os_compute_api:os-attach-interfaces": "",
"os_compute_api:os-baremetal-nodes": "",
"os_compute_api:os-cells": "",
"os_compute_api:os-cells:create": "rule:admin_api",
"os_compute_api:os-cells:delete": "rule:admin_api",
"os_compute_api:os-cells:update": "rule:admin_api",
"os_compute_api:os-cells:sync_instances": "rule:admin_api",
"os_compute_api:os-certificates:create": "",
"os_compute_api:os-certificates:show": "",
"os_compute_api:os-cloudpipe": "",
@ -58,7 +39,6 @@ policy_data = """
"os_compute_api:os-consoles:show": "",
"os_compute_api:os-create-backup": "",
"os_compute_api:os-deferred-delete": "",
"os_compute_api:os-evacuate": "is_admin:True",
"os_compute_api:os-extended-server-attributes": "",
"os_compute_api:os-extended-status": "",
"os_compute_api:os-extended-availability-zone": "",
@ -73,9 +53,6 @@ policy_data = """
"os_compute_api:os-flavor-rxtx": "",
"os_compute_api:os-flavor-extra-specs:index": "",
"os_compute_api:os-flavor-extra-specs:show": "",
"os_compute_api:os-flavor-extra-specs:create": "is_admin:True",
"os_compute_api:os-flavor-extra-specs:update": "is_admin:True",
"os_compute_api:os-flavor-extra-specs:delete": "is_admin:True",
"os_compute_api:os-flavor-manage": "",
"os_compute_api:os-floating-ip-dns": "",
"os_compute_api:os-floating-ip-dns:domain:update": "",
@ -84,27 +61,14 @@ policy_data = """
"os_compute_api:os-floating-ips": "",
"os_compute_api:os-floating-ips-bulk": "",
"os_compute_api:os-fping": "",
"os_compute_api:os-fping:all_tenants": "is_admin:True",
"os_compute_api:os-hide-server-addresses": "",
"os_compute_api:os-hosts": "rule:admin_api",
"os_compute_api:os-hypervisors": "rule:admin_api",
"os_compute_api:image-size": "",
"os_compute_api:os-instance-actions": "",
"os_compute_api:os-instance-actions:events": "is_admin:True",
"os_compute_api:os-instance-usage-audit-log": "",
"os_compute_api:os-keypairs": "",
"os_compute_api:os-keypairs:index":
"rule:admin_api or user_id:%(user_id)s",
"os_compute_api:os-keypairs:show":
"rule:admin_api or user_id:%(user_id)s",
"os_compute_api:os-keypairs:create":
"rule:admin_api or user_id:%(user_id)s",
"os_compute_api:os-keypairs:delete":
"rule:admin_api or user_id:%(user_id)s",
"os_compute_api:os-lock-server:lock": "",
"os_compute_api:os-lock-server:unlock": "",
"os_compute_api:os-lock-server:unlock:unlock_override": "rule:admin_api",
"os_compute_api:os-migrate-server:migrate": "",
"os_compute_api:os-migrate-server:migrate_live": "",
"os_compute_api:os-multinic": "",
@ -155,19 +119,14 @@ policy_data = """
"os_compute_api:os-volumes-attachments:delete": "",
"os_compute_api:os-availability-zone:list": "",
"os_compute_api:os-availability-zone:detail": "",
"os_compute_api:os-used-limits": "is_admin:True",
"os_compute_api:limits": "",
"os_compute_api:os-migrations:index": "is_admin:True",
"os_compute_api:os-assisted-volume-snapshots:create": "",
"os_compute_api:os-assisted-volume-snapshots:delete": "",
"os_compute_api:os-console-auth-tokens": "is_admin:True",
"os_compute_api:os-server-external-events:create": "rule:admin_api",
"os_compute_api:server-metadata:create": "",
"os_compute_api:server-metadata:update": "",
"os_compute_api:server-metadata:update_all": "",
"os_compute_api:server-metadata:delete": "",
"os_compute_api:server-metadata:show": "",
"os_compute_api:server-metadata:index": "",
"network:attach_external_network": "rule:admin_api"
"os_compute_api:server-metadata:index": ""
}
"""

View File

@ -49,93 +49,101 @@ class PolicyFileTestCase(test.NoDBTestCase):
# is_admin or not. As a side-effect, policy reset is needed here
# to flush existing policy cache.
policy.reset()
policy.init()
rule = oslo_policy.RuleDefault('example:test', "")
policy._ENFORCER.register_defaults([rule])
action = "example:test"
with open(tmpfilename, "w") as policyfile:
policyfile.write('{"example:test": ""}')
policy.enforce(self.context, action, self.target)
policy.authorize(self.context, action, self.target)
with open(tmpfilename, "w") as policyfile:
policyfile.write('{"example:test": "!"}')
policy._ENFORCER.load_rules(True)
self.assertRaises(exception.PolicyNotAuthorized, policy.enforce,
self.assertRaises(exception.PolicyNotAuthorized, policy.authorize,
self.context, action, self.target)
class PolicyTestCase(test.NoDBTestCase):
def setUp(self):
super(PolicyTestCase, self).setUp()
rules = {
"true": '@',
"example:allowed": '@',
"example:denied": "!",
"example:get_http": "http://www.example.com",
"example:my_file": "role:compute_admin or "
"project_id:%(project_id)s",
"example:early_and_fail": "! and @",
"example:early_or_success": "@ or !",
"example:lowercase_admin": "role:admin or role:sysadmin",
"example:uppercase_admin": "role:ADMIN or role:sysadmin",
}
rules = [
oslo_policy.RuleDefault("true", '@'),
oslo_policy.RuleDefault("example:allowed", '@'),
oslo_policy.RuleDefault("example:denied", "!"),
oslo_policy.RuleDefault("example:get_http",
"http://www.example.com"),
oslo_policy.RuleDefault("example:my_file",
"role:compute_admin or "
"project_id:%(project_id)s"),
oslo_policy.RuleDefault("example:early_and_fail", "! and @"),
oslo_policy.RuleDefault("example:early_or_success", "@ or !"),
oslo_policy.RuleDefault("example:lowercase_admin",
"role:admin or role:sysadmin"),
oslo_policy.RuleDefault("example:uppercase_admin",
"role:ADMIN or role:sysadmin"),
]
policy.reset()
policy.init()
policy.set_rules(oslo_policy.Rules.from_dict(rules))
# before a policy rule can be used, its default has to be registered.
policy._ENFORCER.register_defaults(rules)
self.context = context.RequestContext('fake', 'fake', roles=['member'])
self.target = {}
def test_enforce_nonexistent_action_throws(self):
def test_authorize_nonexistent_action_throws(self):
action = "example:noexist"
self.assertRaises(exception.PolicyNotAuthorized, policy.enforce,
self.assertRaises(oslo_policy.PolicyNotRegistered, policy.authorize,
self.context, action, self.target)
def test_enforce_bad_action_throws(self):
def test_authorize_bad_action_throws(self):
action = "example:denied"
self.assertRaises(exception.PolicyNotAuthorized, policy.enforce,
self.assertRaises(exception.PolicyNotAuthorized, policy.authorize,
self.context, action, self.target)
def test_enforce_bad_action_noraise(self):
def test_authorize_bad_action_noraise(self):
action = "example:denied"
result = policy.enforce(self.context, action, self.target, False)
result = policy.authorize(self.context, action, self.target, False)
self.assertFalse(result)
def test_enforce_good_action(self):
def test_authorize_good_action(self):
action = "example:allowed"
result = policy.enforce(self.context, action, self.target)
result = policy.authorize(self.context, action, self.target)
self.assertTrue(result)
@requests_mock.mock()
def test_enforce_http_true(self, req_mock):
def test_authorize_http_true(self, req_mock):
req_mock.post('http://www.example.com/',
text='True')
action = "example:get_http"
target = {}
result = policy.enforce(self.context, action, target)
result = policy.authorize(self.context, action, target)
self.assertTrue(result)
@requests_mock.mock()
def test_enforce_http_false(self, req_mock):
def test_authorize_http_false(self, req_mock):
req_mock.post('http://www.example.com/',
text='False')
action = "example:get_http"
target = {}
self.assertRaises(exception.PolicyNotAuthorized, policy.enforce,
self.assertRaises(exception.PolicyNotAuthorized, policy.authorize,
self.context, action, target)
def test_templatized_enforcement(self):
def test_templatized_authorization(self):
target_mine = {'project_id': 'fake'}
target_not_mine = {'project_id': 'another'}
action = "example:my_file"
policy.enforce(self.context, action, target_mine)
self.assertRaises(exception.PolicyNotAuthorized, policy.enforce,
policy.authorize(self.context, action, target_mine)
self.assertRaises(exception.PolicyNotAuthorized, policy.authorize,
self.context, action, target_not_mine)
def test_early_AND_enforcement(self):
def test_early_AND_authorization(self):
action = "example:early_and_fail"
self.assertRaises(exception.PolicyNotAuthorized, policy.enforce,
self.assertRaises(exception.PolicyNotAuthorized, policy.authorize,
self.context, action, self.target)
def test_early_OR_enforcement(self):
def test_early_OR_authorization(self):
action = "example:early_or_success"
policy.enforce(self.context, action, self.target)
policy.authorize(self.context, action, self.target)
def test_ignore_case_role_check(self):
lowercase_action = "example:lowercase_admin"
@ -145,40 +153,8 @@ class PolicyTestCase(test.NoDBTestCase):
admin_context = context.RequestContext('admin',
'fake',
roles=['AdMiN'])
policy.enforce(admin_context, lowercase_action, self.target)
policy.enforce(admin_context, uppercase_action, self.target)
class DefaultPolicyTestCase(test.NoDBTestCase):
def setUp(self):
super(DefaultPolicyTestCase, self).setUp()
self.rules = {
"default": '',
"example:exist": "!",
}
self._set_rules('default')
self.context = context.RequestContext('fake', 'fake')
def _set_rules(self, default_rule):
policy.reset()
rules = oslo_policy.Rules.from_dict(self.rules)
policy.init(rules=rules, default_rule=default_rule, use_conf=False)
def test_policy_called(self):
self.assertRaises(exception.PolicyNotAuthorized, policy.enforce,
self.context, "example:exist", {})
def test_not_found_policy_calls_default(self):
policy.enforce(self.context, "example:noexist", {})
def test_default_not_found(self):
self._set_rules("default_noexist")
self.assertRaises(exception.PolicyNotAuthorized, policy.enforce,
self.context, "example:noexist", {})
policy.authorize(admin_context, lowercase_action, self.target)
policy.authorize(admin_context, uppercase_action, self.target)
class IsAdminCheckTestCase(test.NoDBTestCase):
@ -225,12 +201,12 @@ class AdminRolePolicyTestCase(test.NoDBTestCase):
self.actions = policy.get_rules().keys()
self.target = {}
def test_enforce_admin_actions_with_nonadmin_context_throws(self):
def test_authorize_admin_actions_with_nonadmin_context_throws(self):
"""Check if non-admin context passed to admin actions throws
Policy not authorized exception
"""
for action in self.actions:
self.assertRaises(exception.PolicyNotAuthorized, policy.enforce,
self.assertRaises(exception.PolicyNotAuthorized, policy.authorize,
self.context, action, self.target)
@ -246,6 +222,7 @@ class RealRolePolicyTestCase(test.NoDBTestCase):
self.fake_policy = jsonutils.loads(fake_policy.policy_data)
self.admin_only_rules = (
"cells_scheduler_filter:DifferentCellFilter",
"cells_scheduler_filter:TargetCellFilter",
"network:attach_external_network",
"os_compute_api:servers:create:forced_host",
@ -321,7 +298,6 @@ class RealRolePolicyTestCase(test.NoDBTestCase):
)
self.admin_or_owner_rules = (
"default",
"os_compute_api:servers:start",
"os_compute_api:servers:stop",
"os_compute_api:servers:trigger_crash_dump",
@ -422,6 +398,7 @@ class RealRolePolicyTestCase(test.NoDBTestCase):
"os_compute_api:os-agents:discoverable",
"os_compute_api:os-attach-interfaces:discoverable",
"os_compute_api:os-baremetal-nodes:discoverable",
"os_compute_api:os-block-device-mapping:discoverable",
"os_compute_api:os-block-device-mapping-v1:discoverable",
"os_compute_api:os-cells:discoverable",
"os_compute_api:os-certificates:discoverable",
@ -453,6 +430,7 @@ class RealRolePolicyTestCase(test.NoDBTestCase):
"os_compute_api:os-hosts:discoverable",
"os_compute_api:os-hypervisors:discoverable",
"os_compute_api:images:discoverable",
"os_compute_api:image-metadata:discoverable",
"os_compute_api:image-size:discoverable",
"os_compute_api:os-instance-actions:discoverable",
"os_compute_api:os-instance-usage-audit-log:discoverable",
@ -462,6 +440,7 @@ class RealRolePolicyTestCase(test.NoDBTestCase):
"os_compute_api:os-lock-server:discoverable",
"os_compute_api:os-migrate-server:discoverable",
"os_compute_api:os-multinic:discoverable",
"os_compute_api:os-multiple-create:discoverable",
"os_compute_api:os-networks:discoverable",
"os_compute_api:os-networks-associate:discoverable",
"os_compute_api:os-pause-server:discoverable",
@ -478,14 +457,16 @@ class RealRolePolicyTestCase(test.NoDBTestCase):
"os_compute_api:os-server-groups:discoverable",
"os_compute_api:os-server-tags:delete",
"os_compute_api:os-server-tags:delete_all",
"os_compute_api:os-server-tags:discoverable",
"os_compute_api:os-server-tags:index",
"os_compute_api:os-server-tags:show",
"os_compute_api:os-server-tags:update",
"os_compute_api:os-server-tags:update_all",
"os_compute_api:os-services:discoverable",
"os_compute_api:server-metadata:discoverable",
"os_compute_api:server-migrations:discoverable",
"os_compute_api:servers:discoverable",
"os_compute_api:os-shelve:shelve:discoverable",
"os_compute_api:os-shelve:discoverable",
"os_compute_api:os-simple-tenant-usage:discoverable",
"os_compute_api:os-suspend-server:discoverable",
"os_compute_api:os-tenant-networks:discoverable",
@ -499,6 +480,7 @@ class RealRolePolicyTestCase(test.NoDBTestCase):
"os_compute_api:os-assisted-volume-snapshots:discoverable",
"os_compute_api:os-console-auth-tokens:discoverable",
"os_compute_api:os-server-external-events:discoverable",
"os_compute_api:versions:discoverable",
)
def test_all_rules_in_sample_file(self):
@ -510,21 +492,21 @@ class RealRolePolicyTestCase(test.NoDBTestCase):
def test_admin_only_rules(self):
for rule in self.admin_only_rules:
self.assertRaises(exception.PolicyNotAuthorized, policy.enforce,
self.assertRaises(exception.PolicyNotAuthorized, policy.authorize,
self.non_admin_context, rule, self.target)
policy.enforce(self.admin_context, rule, self.target)
policy.authorize(self.admin_context, rule, self.target)
def test_non_admin_only_rules(self):
for rule in self.non_admin_only_rules:
self.assertRaises(exception.PolicyNotAuthorized, policy.enforce,
self.assertRaises(exception.PolicyNotAuthorized, policy.authorize,
self.admin_context, rule, self.target)
policy.enforce(self.non_admin_context, rule, self.target)
policy.authorize(self.non_admin_context, rule, self.target)
def test_admin_or_owner_rules(self):
for rule in self.admin_or_owner_rules:
self.assertRaises(exception.PolicyNotAuthorized, policy.enforce,
self.assertRaises(exception.PolicyNotAuthorized, policy.authorize,
self.non_admin_context, rule, self.target)
policy.enforce(self.non_admin_context, rule,
policy.authorize(self.non_admin_context, rule,
{'project_id': 'fake', 'user_id': 'fake'})
def test_no_empty_rules(self):
@ -542,7 +524,7 @@ class RealRolePolicyTestCase(test.NoDBTestCase):
def test_allow_all_rules(self):
for rule in self.allow_all_rules:
policy.enforce(self.non_admin_context, rule, self.target)
policy.authorize(self.non_admin_context, rule, self.target)
def test_rule_missing(self):
rules = policy.get_rules()