Add new default roles in vnf-lcm API policies

This adds new defaults roles in vnf-lcm API policies.

Backward compatibility:
- Old Rules and Defaults will keep working as it is because they
  are added as deprecated rules and not removed. They are enabled
  by default. This means existing deployment will continue working
  in same way till deprecated rules are there and enabled by default.
- Legacy/current admin stays same and no change in their access permission
- Deprecation warning is added for old defaults so that operators will
  know that new defaults are available to opt-in.

New defaults(project personas):
- Add new defaults but they are disabled by defaults and operators can adopt them
  by enabling the oslo.policy config option. Basically add below in tacker.conf

  [oslo_policy]
  enforce_new_defaults=True

- All GET (read only) APIs are default to PROJECT_READER_OR_ADMIN
- Rest other APIs (write operations) are default to PROJECT_MEMBER_OR_ADMIN

Adding tests also to check permissions of new defaults.

Partial implement blueprint implement-project-personas

Change-Id: Id4b4b9f2ed4029352ccd6564f757ec7f6a69419d
This commit is contained in:
Ghanshyam Mann 2024-01-28 17:57:23 -08:00 committed by Ghanshyam
parent f421f25d65
commit 2de67c771b
5 changed files with 231 additions and 31 deletions

View File

@ -930,13 +930,18 @@ class VnfLcmController(wsgi.Controller):
@wsgi.expected_errors((http_client.FORBIDDEN, http_client.NOT_FOUND))
def show_lcm_op_occs(self, request, id):
context = request.environ['tacker.context']
context.can(vnf_lcm_policies.VNFLCM % 'show_lcm_op_occs')
try:
vnf_lcm_op_occs = objects.VnfLcmOpOcc.get_by_id(context, id)
vnf_instance = self._get_vnf_instance(
context, vnf_lcm_op_occs.vnf_instance_id)
context.can(vnf_lcm_policies.VNFLCM % 'show_lcm_op_occs',
target={'project_id': vnf_instance.tenant_id})
except exceptions.NotFound as occ_e:
return self._make_problem_detail(str(occ_e),
404, title='VnfLcmOpOcc NOT FOUND')
except exceptions.PolicyNotAuthorized:
raise
except Exception as e:
LOG.error(traceback.format_exc())
return self._make_problem_detail(str(e),

View File

@ -36,7 +36,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name=VNFLCM % 'create',
check_str=base.RULE_ADMIN_OR_OWNER,
check_str=base.RULE_PROJECT_MEMBER_OR_ADMIN,
description="Creates vnf instance.",
operations=[
{
@ -48,7 +48,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name=VNFLCM % 'instantiate',
check_str=base.RULE_ADMIN_OR_OWNER,
check_str=base.RULE_PROJECT_MEMBER_OR_ADMIN,
description="Instantiate vnf instance.",
operations=[
{
@ -60,7 +60,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name=VNFLCM % 'show',
check_str=base.RULE_ADMIN_OR_OWNER,
check_str=base.RULE_PROJECT_READER_OR_ADMIN,
description="Query an Individual VNF instance.",
operations=[
{
@ -72,7 +72,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name=VNFLCM % 'terminate',
check_str=base.RULE_ADMIN_OR_OWNER,
check_str=base.RULE_PROJECT_MEMBER_OR_ADMIN,
description="Terminate a VNF instance.",
operations=[
{
@ -84,7 +84,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name=VNFLCM % 'heal',
check_str=base.RULE_ADMIN_OR_OWNER,
check_str=base.RULE_PROJECT_MEMBER_OR_ADMIN,
description="Heal a VNF instance.",
operations=[
{
@ -96,7 +96,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name=VNFLCM % 'scale',
check_str=base.RULE_ADMIN_OR_OWNER,
check_str=base.RULE_PROJECT_MEMBER_OR_ADMIN,
description="Scale a VNF instance.",
operations=[
{
@ -108,7 +108,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name=VNFLCM % 'show_lcm_op_occs',
check_str=base.RULE_ADMIN_OR_OWNER,
check_str=base.RULE_PROJECT_READER_OR_ADMIN,
description="Query an Individual VNF LCM operation occurrence",
operations=[
{
@ -120,7 +120,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name=VNFLCM % 'list_lcm_op_occs',
check_str=base.RULE_ADMIN_OR_OWNER,
check_str=base.RULE_PROJECT_READER_OR_ADMIN,
description="Query VNF LCM operation occurrence",
operations=[
{
@ -132,7 +132,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name=VNFLCM % 'index',
check_str=base.RULE_ADMIN_OR_OWNER,
check_str=base.RULE_PROJECT_READER_OR_ADMIN,
description="Query VNF instances.",
operations=[
{
@ -144,7 +144,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name=VNFLCM % 'delete',
check_str=base.RULE_ADMIN_OR_OWNER,
check_str=base.RULE_PROJECT_MEMBER_OR_ADMIN,
description="Delete an Individual VNF instance.",
operations=[
{
@ -156,7 +156,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name=VNFLCM % 'update_vnf',
check_str=base.RULE_ADMIN_OR_OWNER,
check_str=base.RULE_PROJECT_MEMBER_OR_ADMIN,
description="Update an Individual VNF instance.",
operations=[
{
@ -168,7 +168,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name=VNFLCM % 'rollback',
check_str=base.RULE_ADMIN_OR_OWNER,
check_str=base.RULE_PROJECT_MEMBER_OR_ADMIN,
description="Rollback a VNF instance.",
operations=[
{
@ -180,7 +180,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name=VNFLCM % 'cancel',
check_str=base.RULE_ADMIN_OR_OWNER,
check_str=base.RULE_PROJECT_MEMBER_OR_ADMIN,
description="Cancel a VNF instance.",
operations=[
{
@ -192,7 +192,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name=VNFLCM % 'fail',
check_str=base.RULE_ADMIN_OR_OWNER,
check_str=base.RULE_PROJECT_MEMBER_OR_ADMIN,
description="Fail a VNF instance.",
operations=[
{
@ -204,7 +204,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name=VNFLCM % 'retry',
check_str=base.RULE_ADMIN_OR_OWNER,
check_str=base.RULE_PROJECT_MEMBER_OR_ADMIN,
description="Retry a VNF instance.",
operations=[
{
@ -216,7 +216,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name=VNFLCM % 'change_ext_conn',
check_str=base.RULE_ADMIN_OR_OWNER,
check_str=base.RULE_PROJECT_MEMBER_OR_ADMIN,
description="Change external VNF connectivity.",
operations=[
{

View File

@ -15,11 +15,13 @@
import copy
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils.fixture import uuidsentinel as uuids
from tacker.common import exceptions
from tacker import context
from tacker import policy
from tacker.tests.unit import base
@ -27,9 +29,26 @@ LOG = logging.getLogger(__name__)
class BasePolicyTest(base.TestCase):
# NOTE(gmann): Set this flag to True if you would like to tests the
# new behaviour of policy without deprecated rules.
# This means you can simulate the phase when policies completely
# switch to new behaviour by removing the support of old rules.
enforce_new_defaults = False
def setUp(self):
super(BasePolicyTest, self).setUp()
if self.enforce_new_defaults:
cfg.CONF.set_override('enforce_new_defaults', True,
group='oslo_policy')
# NOTE(gmann): oslo policy config option enforce_new_defaults
# is changed here which is used while loading the rule in
# oslo_policy.init() method that is why we need to reset the
# policy and initialize again so that rule will be re-loaded
# considering the enforce_new_defaults new value.
policy.reset()
policy.init()
self.addCleanup(policy.reset)
self.admin_project_id = uuids.admin_project_id
self.project_id = uuids.project_id
self.other_project_id = uuids.project_id_other

View File

@ -70,20 +70,39 @@ class VNFLCMPolicyTest(base_test.BasePolicyTest):
]
self.project_unauthorized_contexts = []
# Admin or any user in same project will be allowed to get,
# instantiate, terminate etc operations of VNF of their project.
# Admin or any user in same project will be allowed to instantiate,
# terminate etc write operations of VNF of their project.
self.project_member_authorized_contexts = [
self.legacy_admin_context, self.project_admin_context,
self.project_member_context, self.project_reader_context,
self.project_foo_context
]
# User from other project will not be allowed to get or perform
# the other project's VNF operations.
# User from other project will not be allowed to perform write
# operation on the other project's VNF operations.
self.project_member_unauthorized_contexts = [
self.other_project_member_context,
self.other_project_reader_context
]
# Admin or any user in same project will be allowed to get,
# VNF of their project.
self.project_reader_authorized_contexts = (
self.project_member_authorized_contexts)
# User from other project will not be allowed to get
# the other project's VNF.
self.project_reader_unauthorized_contexts = (
self.project_member_unauthorized_contexts)
# Below user's context will be allowed to list VNF or
# get VNF LCM operation occurrence.
self.get_authorized_contexts = [
self.legacy_admin_context, self.project_admin_context,
self.project_member_context, self.project_reader_context,
self.project_foo_context, self.other_project_member_context,
self.other_project_reader_context
]
self.get_unauthorized_contexts = []
@mock.patch.object(vim_client.VimClient, "get_vim")
@mock.patch.object(objects.VnfPackage, 'get_by_id')
@mock.patch('tacker.api.vnflcm.v1.controller.'
@ -136,8 +155,8 @@ class VNFLCMPolicyTest(base_test.BasePolicyTest):
fields.VnfInstanceState.INSTANTIATED,
tenant_id=self.project_id)
rule_name = policies.VNFLCM % 'show'
self.common_policy_check(self.project_member_authorized_contexts,
self.project_member_unauthorized_contexts,
self.common_policy_check(self.project_reader_authorized_contexts,
self.project_reader_unauthorized_contexts,
rule_name,
self.controller.show,
req, uuidsentinel.instance_id)
@ -149,8 +168,8 @@ class VNFLCMPolicyTest(base_test.BasePolicyTest):
vnf_instance_2 = fakes.return_vnf_instance()
mock_vnf_list.return_value = [vnf_instance_1, vnf_instance_2]
rule_name = policies.VNFLCM % 'index'
self.common_policy_check(self.project_authorized_contexts,
self.project_unauthorized_contexts,
self.common_policy_check(self.get_authorized_contexts,
self.get_unauthorized_contexts,
rule_name,
self.controller.index,
req)
@ -386,13 +405,17 @@ class VNFLCMPolicyTest(base_test.BasePolicyTest):
req, uuidsentinel.instance_id)
@mock.patch.object(objects.VnfLcmOpOcc, "get_by_id")
def test_show_lcm_op_occs(self, mock_lcm_by_id):
@mock.patch.object(objects.VnfInstance, "get_by_id")
def test_show_lcm_op_occs(self, mock_vnf_by_id, mock_lcm_by_id):
req = fake_request.HTTPRequest.blank(
'/vnf_lcm_op_occs/%s' % uuidsentinel.instance_id)
mock_lcm_by_id.return_value = fakes.return_vnf_lcm_opoccs_obj()
mock_vnf_by_id.return_value = fakes.return_vnf_instance(
fields.VnfInstanceState.INSTANTIATED,
tenant_id=self.project_id)
rule_name = policies.VNFLCM % 'show_lcm_op_occs'
self.common_policy_check(self.project_authorized_contexts,
self.project_unauthorized_contexts,
self.common_policy_check(self.project_reader_authorized_contexts,
self.project_reader_unauthorized_contexts,
rule_name,
self.controller.show_lcm_op_occs,
req, uuidsentinel.instance_id)
@ -402,8 +425,8 @@ class VNFLCMPolicyTest(base_test.BasePolicyTest):
req = fake_request.HTTPRequest.blank(
'/vnflcm/v1/vnf_lcm_op_occs')
rule_name = policies.VNFLCM % 'list_lcm_op_occs'
self.common_policy_check(self.project_authorized_contexts,
self.project_unauthorized_contexts,
self.common_policy_check(self.get_authorized_contexts,
self.get_unauthorized_contexts,
rule_name,
self.controller.list_lcm_op_occs,
req)
@ -470,3 +493,149 @@ class VNFLCMScopeTypePolicyTest(VNFLCMPolicyTest):
self.system_reader_context, self.system_foo_context,
self.other_project_member_context,
self.other_project_reader_context]
self.get_authorized_contexts = [
self.legacy_admin_context, self.project_admin_context,
self.project_member_context, self.project_reader_context,
self.project_foo_context, self.other_project_member_context,
self.other_project_reader_context
]
# With scope enabled, system scoped users will not be allowed
# to list VNF and get VNF LCM operation occurrence.
self.get_unauthorized_contexts = [
self.system_admin_context, self.system_member_context,
self.system_reader_context, self.system_foo_context,
]
class VNFLCMNewDefaultsPolicyTest(VNFLCMPolicyTest):
"""Test VNF LCM APIs policies with new defaults enabled
This test class enable the new defaults means no legacy old rules
and check how permission level looks like.
"""
enforce_new_defaults = True
def setUp(self):
super(VNFLCMNewDefaultsPolicyTest, self).setUp()
# In new defaults, admin or member roles users will be allowed
# to create VNF or a few of the VNF operations in their project.
# Project reader will not be able to create VNF.
self.project_authorized_contexts = [
self.legacy_admin_context, self.project_admin_context,
self.project_member_context, self.other_project_member_context,
]
# In new defaults, non admin or non member role (Project reader)
# user will not be able to create VNF.
self.project_unauthorized_contexts = [
self.project_reader_context, self.project_foo_context,
self.other_project_reader_context]
# In new defaults, all admin, project members will be allowed to
# instantiate, terminate etc write operations of VNF of their project.
self.project_member_authorized_contexts = [
self.legacy_admin_context, self.project_admin_context,
self.project_member_context
]
# In new defaults, Project reader or any other non admin|member
# role (say foo role) will not be allowed to perform any write
# operation on VNF.
self.project_member_unauthorized_contexts = [
self.project_reader_context, self.project_foo_context,
self.other_project_member_context,
self.other_project_reader_context
]
# In new defaults, Project reader also can get VNF.
self.project_reader_authorized_contexts = [
self.legacy_admin_context, self.project_admin_context,
self.project_member_context,
self.project_reader_context
]
# In new defaults, non admin|member|reader role (say foo role)
# will not be able to get VNF.
self.project_reader_unauthorized_contexts = [
self.project_foo_context,
self.other_project_member_context,
self.other_project_reader_context
]
# In new defaults, project random role like foo will not
# be allowed.
self.get_authorized_contexts = [
self.legacy_admin_context, self.project_admin_context,
self.project_member_context, self.project_reader_context,
self.other_project_member_context,
self.other_project_reader_context
]
self.get_unauthorized_contexts = [
self.project_foo_context,
]
class VNFLCMNewDefaultsWithScopePolicyTest(VNFLCMNewDefaultsPolicyTest):
"""Test VNF LCM APIs policies with new defaults rules and scope enabled
This means scope enabled and no legacy old rules. This is the end goal
when operators will enable scope and new defaults.
"""
def setUp(self):
super(VNFLCMNewDefaultsWithScopePolicyTest, self).setUp()
cfg.CONF.set_override('enforce_scope', True,
group='oslo_policy')
# With scope enable and no legacy rule, only project admin/member
# will be able to create VNF in their project.
self.project_authorized_contexts = [
self.legacy_admin_context, self.project_admin_context,
self.project_member_context, self.other_project_member_context
]
# System scoped users will not be allowed.
self.project_unauthorized_contexts = [
self.system_admin_context, self.system_member_context,
self.system_reader_context, self.system_foo_context,
self.project_reader_context, self.project_foo_context,
self.other_project_reader_context]
self.project_member_authorized_contexts = [
self.legacy_admin_context, self.project_admin_context,
self.project_member_context,
]
# System scoped users will not be allowed.
self.project_member_unauthorized_contexts = [
self.system_admin_context, self.system_member_context,
self.system_reader_context, self.system_foo_context,
self.project_reader_context, self.project_foo_context,
self.other_project_member_context,
self.other_project_reader_context]
self.project_reader_authorized_contexts = [
self.legacy_admin_context, self.project_admin_context,
self.project_member_context,
self.project_reader_context
]
# System scoped users will not be allowed.
self.project_reader_unauthorized_contexts = [
self.system_admin_context, self.system_member_context,
self.system_reader_context, self.system_foo_context,
self.project_foo_context,
self.other_project_member_context,
self.other_project_reader_context
]
self.get_authorized_contexts = [
self.legacy_admin_context, self.project_admin_context,
self.project_member_context, self.project_reader_context,
self.other_project_member_context,
self.other_project_reader_context
]
# With scope enabled, system scoped users will not be allowed
# to list VNF and get VNF LCM operation occurrence.
self.get_unauthorized_contexts = [
self.project_foo_context, self.system_admin_context,
self.system_member_context, self.system_reader_context,
self.system_foo_context,
]

View File

@ -2196,12 +2196,19 @@ class TestController(base.TestCase):
@mock.patch.object(TackerManager, 'get_service_plugins',
return_value={'VNFM': FakeVNFMPlugin()})
@mock.patch.object(objects.VnfInstance, "get_by_id")
@mock.patch.object(objects.VnfLcmOpOcc, "get_by_id")
def test_show_lcm_op_occs(self, mock_get_by_id,
mock_get_service_plugins):
mock_vnf_instance, mock_get_service_plugins):
req = fake_request.HTTPRequest.blank(
'/vnf_lcm_op_occs/%s' % constants.UUID)
mock_get_by_id.return_value = fakes.return_vnf_lcm_opoccs_obj()
vnf_instance = fakes.return_vnf_instance(
fields.VnfInstanceState.NOT_INSTANTIATED,
task_state=fields.VnfInstanceTaskState.ERROR,
tenant_id=req.environ['tacker.context'].project_id)
mock_vnf_instance.return_value = vnf_instance
expected_result = fakes.VNFLCMOPOCC_RESPONSE
res_dict = self.controller.show_lcm_op_occs(req, constants.UUID)
self.assertEqual(expected_result, res_dict)