Merge "Add testing framework for RBAC"

This commit is contained in:
Zuul 2024-03-06 17:04:42 +00:00 committed by Gerrit Code Review
commit f280f5a4d0
3 changed files with 254 additions and 0 deletions

View File

View File

@ -0,0 +1,149 @@
# Copyright (C) 2024 NEC, Corp.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
from oslo_log import log as logging
from oslo_utils.fixture import uuidsentinel as uuids
from tacker.common import exceptions
from tacker import context
from tacker.tests.unit import base
LOG = logging.getLogger(__name__)
class BasePolicyTest(base.TestCase):
def setUp(self):
super(BasePolicyTest, self).setUp()
self.admin_project_id = uuids.admin_project_id
self.project_id = uuids.project_id
self.other_project_id = uuids.project_id_other
# Create the user context with implied roles so that we can test
# each user's context for RBAC permission.
#
# Legacy admin user
self.legacy_admin_context = context.Context(
user_id="legacy_admin", project_id=self.admin_project_id,
roles=['admin', 'member', 'reader'])
# project scoped users
self.project_admin_context = context.Context(
user_id="project_admin", project_id=self.project_id,
roles=['admin', 'member', 'reader'])
self.project_member_context = context.Context(
user_id="project_member", project_id=self.project_id,
roles=['member', 'reader'])
self.project_reader_context = context.Context(
user_id="project_reader", project_id=self.project_id,
roles=['reader'])
self.project_foo_context = context.Context(
user_id="project_foo", project_id=self.project_id,
roles=['foo'])
self.other_project_member_context = context.Context(
user_id="other_project_member",
project_id=self.other_project_id,
roles=['member', 'reader'])
self.other_project_reader_context = context.Context(
user_id="other_project_member",
project_id=self.other_project_id,
roles=['reader'])
self.all_contexts = [
self.legacy_admin_context, self.project_admin_context,
self.project_member_context, self.project_reader_context,
self.project_foo_context, self.other_project_member_context,
self.other_project_reader_context
]
def common_policy_check(self, authorized_contexts,
unauthorized_contexts, rule_name,
func, req, *arg, **kwarg):
# NOTE(gmann): When fatal=False is passed as a parameter
# then this function does not raise error instead return
# the responses for all contexts.
fatal = kwarg.pop('fatal', True)
authorized_response = []
unauthorize_response = []
def ensure_return(req, *args, **kwargs):
return func(req, *arg, **kwargs)
def ensure_raises(req, *args, **kwargs):
exc = self.assertRaises(
exceptions.PolicyNotAuthorized, func, req, *arg, **kwarg)
# NOTE(gmann): In case of multi-policy APIs, PolicyNotAuthorized
# exception can be raised from either of the policy so checking
# the error message, which includes the rule name, can mismatch.
# Tests verifying the multi policy can pass rule_name as None
# to skip the error message assert.
if rule_name is not None:
self.assertEqual(
"Policy doesn't allow %s to be performed." %
rule_name, exc.format_message())
# Verify all the context having allowed scope and roles pass
# the policy check.
for auth_context in authorized_contexts:
LOG.info("Testing authorized user: %s", auth_context.user_id)
req.environ['tacker.context'] = auth_context
_args = copy.deepcopy(arg)
_kwargs = copy.deepcopy(kwarg)
if not fatal:
authorized_response.append(
ensure_return(req, *_args, **_kwargs))
else:
func(req, *_args, **_kwargs)
# Verify all the context not having allowed scope or roles fail
# the policy check.
for unauth_context in unauthorized_contexts:
LOG.info("Testing unauthorized user: %s", unauth_context.user_id)
req.environ['tacker.context'] = unauth_context
_args = copy.deepcopy(arg)
_kwargs = copy.deepcopy(kwarg)
if not fatal:
try:
unauthorize_response.append(
ensure_return(req, *_args, **_kwargs))
# NOTE(gmann): We need to ignore the PolicyNotAuthorized
# exception here so that we can add the correct response
# in unauthorize_response for the case of fatal=False.
# This handle the case of multi policy checks where tests
# are verifying the second policy via the response of
# fatal-False and ignoring the response checks where the
# first policy itself fail to pass (even test override the
# first policy to allow for everyone but still, scope
# checks can leads to PolicyNotAuthorized error).
# For example: flavor extra specs policy for GET flavor
# API. In that case, flavor extra spec policy is checked
# after the GET flavor policy. So any context failing on
# GET flavor will raise the PolicyNotAuthorized and for
# that case we do not have any way to verify the flavor
# extra specs so skip that context to check in test.
except exceptions.PolicyNotAuthorized:
continue
else:
ensure_raises(req, *_args, **_kwargs)
return authorized_response, unauthorize_response

View File

@ -0,0 +1,105 @@
# Copyright (C) 2024 NEC, Corp.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from tacker.api.vnflcm.v1 import controller
from tacker import objects
from tacker.policies import vnf_lcm as policies
from tacker.tests.unit import fake_request
import tacker.tests.unit.nfvo.test_nfvo_plugin as test_nfvo_plugin
from tacker.tests.unit.policies import base as base_test
from tacker.tests.unit.vnflcm import fakes
from tacker.tests import uuidsentinel
from tacker.vnfm import vim_client
class VNFLCMPolicyTest(base_test.BasePolicyTest):
"""Test VNF LCM APIs policies with all possible context.
This class defines the set of context with different roles
which are allowed and not allowed to pass the policy checks.
With those set of context, it will call the API operation and
verify the expected behaviour.
"""
def setUp(self):
super(VNFLCMPolicyTest, self).setUp()
self.patcher = mock.patch(
'tacker.manager.TackerManager.get_service_plugins',
return_value={'VNFM': test_nfvo_plugin.FakeVNFMPlugin()})
self.mock_manager = self.patcher.start()
self.controller = controller.VnfLcmController()
self.vim_info = {
'vim_id': uuidsentinel.vnfd_id,
'vim_type': 'test',
'vim_auth': {'username': 'test', 'password': 'test'},
'placement_attr': {'region': 'TestRegionOne'},
'tenant': 'test',
'extra': {}
}
# Below user's context will be allowed to create the VNF
# in their project.
self.create_authorized_contexts = [
self.legacy_admin_context, self.project_admin_context,
self.project_member_context, self.project_reader_context,
self.project_foo_context, self.other_project_member_context,
self.other_project_reader_context
]
self.create_unauthorized_contexts = []
@mock.patch.object(vim_client.VimClient, "get_vim")
@mock.patch.object(objects.VnfPackage, 'get_by_id')
@mock.patch('tacker.api.vnflcm.v1.controller.'
'VnfLcmController._create_vnf')
@mock.patch.object(objects.vnf_package.VnfPackage, 'save')
@mock.patch.object(objects.vnf_instance, '_vnf_instance_update')
@mock.patch.object(objects.vnf_instance, '_vnf_instance_create')
@mock.patch.object(objects.vnf_package_vnfd.VnfPackageVnfd, 'get_by_id')
def test_create_vnf(
self, mock_get_by_id,
mock_vnf_instance_create,
mock_vnf_instance_update,
mock_package_save,
mock_private_create_vnf,
mock_vnf_package_get_by_id,
mock_get_vim):
mock_get_vim.return_value = self.vim_info
mock_get_by_id.return_value = fakes.return_vnf_package_vnfd()
mock_vnf_package_get_by_id.return_value = \
fakes.return_vnf_package_with_deployment_flavour()
updates = {'vnfd_id': uuidsentinel.vnfd_id,
'vnf_instance_description': 'SampleVnf Description',
'vnf_instance_name': 'SampleVnf',
'vnf_pkg_id': uuidsentinel.vnf_pkg_id,
'vnf_metadata': {'key': 'value'}}
mock_vnf_instance_create.return_value = (
fakes.return_vnf_instance_model(**updates))
mock_vnf_instance_update.return_value = (
fakes.return_vnf_instance_model(**updates))
body = {'vnfdId': uuidsentinel.vnfd_id,
'vnfInstanceName': 'SampleVnf',
'vnfInstanceDescription': 'SampleVnf Description',
'metadata': {'key': 'value'}}
req = fake_request.HTTPRequest.blank('/vnf_instances')
rule_name = policies.VNFLCM % 'create'
self.common_policy_check(self.create_authorized_contexts,
self.create_unauthorized_contexts,
rule_name,
self.controller.create,
req, body=body)