Merge "Add test coverage of existing lock server policies"

This commit is contained in:
Zuul 2020-04-01 17:09:49 +00:00 committed by Gerrit Code Review
commit 5c24cdf9cb
2 changed files with 146 additions and 135 deletions

View File

@ -19,18 +19,14 @@ import six
from nova.api.openstack import api_version_request
from nova.api.openstack import common
from nova.api.openstack.compute import lock_server as lock_server_v21
from nova import context
from nova import exception
from nova import test
from nova.tests.unit.api.openstack.compute import admin_only_action_common
from nova.tests.unit.api.openstack import fakes
from nova.tests.unit import fake_instance
class LockServerTestsV21(admin_only_action_common.CommonTests):
lock_server = lock_server_v21
controller_name = 'LockServerController'
authorization_error = exception.PolicyNotAuthorized
_api_version = '2.1'
def setUp(self):
@ -52,44 +48,6 @@ class LockServerTestsV21(admin_only_action_common.CommonTests):
self._test_actions_with_non_existed_instance(['_lock', '_unlock'],
body_map=body_map)
def test_unlock_not_authorized(self):
instance = self._stub_instance_get()
body = {}
with mock.patch.object(
self.compute_api, 'unlock',
side_effect=exception.PolicyNotAuthorized(
action='unlock')) as mock_unlock:
self.assertRaises(self.authorization_error,
self.controller._unlock,
self.req, instance.uuid, body)
mock_unlock.assert_called_once_with(self.context, instance)
self.mock_get.assert_called_once_with(self.context, instance.uuid,
expected_attrs=None,
cell_down_support=False)
@mock.patch.object(common, 'get_instance')
def test_unlock_override_not_authorized_with_non_admin_user(
self, mock_get_instance):
instance = fake_instance.fake_instance_obj(self.context)
instance.locked_by = "owner"
mock_get_instance.return_value = instance
self.assertRaises(self.authorization_error,
self.controller._unlock, self.req,
instance.uuid,
{'unlock': None})
@mock.patch.object(common, 'get_instance')
def test_unlock_override_with_admin_user(self, mock_get_instance):
admin_req = fakes.HTTPRequest.blank('', use_admin_context=True)
admin_ctxt = admin_req.environ['nova.context']
instance = fake_instance.fake_instance_obj(admin_ctxt)
instance.locked_by = "owner"
mock_get_instance.return_value = instance
with mock.patch.object(self.compute_api, 'unlock') as mock_unlock:
self.controller._unlock(admin_req, instance.uuid, {'unlock': None})
mock_unlock.assert_called_once_with(admin_ctxt, instance)
@mock.patch.object(common, 'get_instance')
def test_unlock_with_any_body(self, get_instance_mock):
instance = fake_instance.fake_instance_obj(
@ -167,96 +125,3 @@ class LockServerTestsV273(LockServerTestsV21):
exp = self.assertRaises(exception.ValidationError,
self.controller._lock, self.req, instance.uuid, body=body)
self.assertIn("('blah' was unexpected)", six.text_type(exp))
class LockServerPolicyEnforcementV21(test.NoDBTestCase):
def setUp(self):
super(LockServerPolicyEnforcementV21, self).setUp()
self.controller = lock_server_v21.LockServerController()
self.req = fakes.HTTPRequest.blank('')
@mock.patch('nova.api.openstack.common.get_instance')
def test_lock_policy_failed_with_other_project(self, get_instance_mock):
get_instance_mock.return_value = fake_instance.fake_instance_obj(
self.req.environ['nova.context'],
project_id=self.req.environ['nova.context'].project_id)
rule_name = "os_compute_api:os-lock-server:lock"
self.policy.set_rules({rule_name: "project_id:%(project_id)s"})
# Change the project_id in request context.
self.req.environ['nova.context'].project_id = 'other-project'
exc = self.assertRaises(
exception.PolicyNotAuthorized,
self.controller._lock, self.req,
fakes.FAKE_UUID,
body={'lock': {}})
self.assertEqual(
"Policy doesn't allow %s to be performed." % rule_name,
exc.format_message())
@mock.patch('nova.api.openstack.common.get_instance')
def test_lock_overridden_policy_failed_with_other_user_in_same_project(
self, get_instance_mock):
get_instance_mock.return_value = (
fake_instance.fake_instance_obj(self.req.environ['nova.context']))
rule_name = "os_compute_api:os-lock-server:lock"
self.policy.set_rules({rule_name: "user_id:%(user_id)s"})
# Change the user_id in request context.
self.req.environ['nova.context'].user_id = 'other-user'
exc = self.assertRaises(exception.PolicyNotAuthorized,
self.controller._lock, self.req,
fakes.FAKE_UUID, body={'lock': {}})
self.assertEqual(
"Policy doesn't allow %s to be performed." % rule_name,
exc.format_message())
@mock.patch('nova.compute.api.API.lock')
@mock.patch('nova.api.openstack.common.get_instance')
def test_lock_overridden_policy_pass_with_same_user(self,
get_instance_mock,
lock_mock):
instance = fake_instance.fake_instance_obj(
self.req.environ['nova.context'],
user_id=self.req.environ['nova.context'].user_id)
get_instance_mock.return_value = instance
rule_name = "os_compute_api:os-lock-server:lock"
self.policy.set_rules({rule_name: "user_id:%(user_id)s"})
self.controller._lock(self.req, fakes.FAKE_UUID, body={'lock': {}})
lock_mock.assert_called_once_with(self.req.environ['nova.context'],
instance, reason=None)
@mock.patch('nova.api.openstack.common.get_instance')
def test_unlock_policy_failed(self, get_instance_mock):
instance = fake_instance.fake_instance_obj(
self.req.environ['nova.context'],
user_id=self.req.environ['nova.context'].user_id)
get_instance_mock.return_value = instance
rule_name = "os_compute_api:os-lock-server:unlock"
self.policy.set_rules({rule_name: "project:non_fake"})
exc = self.assertRaises(
exception.PolicyNotAuthorized,
self.controller._unlock, self.req,
fakes.FAKE_UUID,
body={'unlock': {}})
self.assertEqual(
"Policy doesn't allow %s to be performed." % rule_name,
exc.format_message())
@mock.patch.object(common, 'get_instance')
def test_unlock_policy_failed_with_unlock_override(self,
get_instance_mock):
ctxt = context.RequestContext('fake', 'fake')
instance = fake_instance.fake_instance_obj(ctxt)
instance.locked_by = "fake"
get_instance_mock.return_value = instance
rule_name = ("os_compute_api:os-lock-server:"
"unlock:unlock_override")
rules = {"os_compute_api:os-lock-server:unlock": "@",
rule_name: "project:non_fake"}
self.policy.set_rules(rules)
exc = self.assertRaises(
exception.PolicyNotAuthorized, self.controller._unlock,
self.req, fakes.FAKE_UUID, body={'unlock': {}})
self.assertEqual(
"Policy doesn't allow %s to be performed." % rule_name,
exc.format_message())

View File

@ -0,0 +1,146 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import fixtures
import mock
from nova.policies import lock_server as ls_policies
from oslo_utils.fixture import uuidsentinel as uuids
from oslo_utils import timeutils
from nova.api.openstack.compute import lock_server
from nova.compute import vm_states
from nova import exception
from nova.tests.unit.api.openstack import fakes
from nova.tests.unit import fake_instance
from nova.tests.unit.policies import base
class LockServerPolicyTest(base.BasePolicyTest):
"""Test Lock server APIs policies with all possible context.
This class defines the set of context with different roles
which are allowed and not allowed to pass the policy checks.
With those set of context, it will call the API operation and
verify the expected behaviour.
"""
def setUp(self):
super(LockServerPolicyTest, self).setUp()
self.controller = lock_server.LockServerController()
self.req = fakes.HTTPRequest.blank('')
user_id = self.req.environ['nova.context'].user_id
self.mock_get = self.useFixture(
fixtures.MockPatch('nova.api.openstack.common.get_instance')).mock
uuid = uuids.fake_id
self.instance = fake_instance.fake_instance_obj(
self.project_member_context,
id=1, uuid=uuid, project_id=self.project_id,
user_id=user_id, vm_state=vm_states.ACTIVE,
task_state=None, launched_at=timeutils.utcnow())
self.mock_get.return_value = self.instance
# Check that admin or and server owner is able to lock/unlock
# the sevrer
self.admin_or_owner_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context, self.project_member_context,
self.project_reader_context, self.project_foo_context]
# Check that non-admin/owner is not able to lock/unlock
# the server
self.admin_or_owner_unauthorized_contexts = [
self.system_member_context, self.system_reader_context,
self.system_foo_context,
self.other_project_member_context
]
# Check that admin is able to unlock the server which is
# locked by other
self.admin_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context]
# Check that non-admin is not able to unlock the server
# which is locked by other
self.admin_unauthorized_contexts = [
self.system_member_context, self.system_reader_context,
self.system_foo_context, self.project_member_context,
self.project_reader_context, self.project_foo_context,
self.other_project_member_context
]
@mock.patch('nova.compute.api.API.lock')
def test_lock_server_policy(self, mock_lock):
rule_name = ls_policies.POLICY_ROOT % 'lock'
self.common_policy_check(self.admin_or_owner_authorized_contexts,
self.admin_or_owner_unauthorized_contexts,
rule_name,
self.controller._lock,
self.req, self.instance.uuid,
body={'lock': {}})
@mock.patch('nova.compute.api.API.unlock')
def test_unlock_server_policy(self, mock_unlock):
rule_name = ls_policies.POLICY_ROOT % 'unlock'
self.common_policy_check(self.admin_or_owner_authorized_contexts,
self.admin_or_owner_unauthorized_contexts,
rule_name,
self.controller._unlock,
self.req, self.instance.uuid,
body={'unlock': {}})
@mock.patch('nova.compute.api.API.unlock')
@mock.patch('nova.compute.api.API.is_expected_locked_by')
def test_unlock_override_server_policy(self, mock_expected, mock_unlock):
mock_expected.return_value = False
rule = ls_policies.POLICY_ROOT % 'unlock'
self.policy.set_rules({rule: "@"}, overwrite=False)
rule_name = ls_policies.POLICY_ROOT % 'unlock:unlock_override'
self.common_policy_check(self.admin_authorized_contexts,
self.admin_unauthorized_contexts,
rule_name,
self.controller._unlock,
self.req, self.instance.uuid,
body={'unlock': {}})
def test_lock_server_policy_failed_with_other_user(self):
# Change the user_id in request context.
req = fakes.HTTPRequest.blank('')
req.environ['nova.context'].user_id = 'other-user'
rule_name = ls_policies.POLICY_ROOT % 'lock'
self.policy.set_rules({rule_name: "user_id:%(user_id)s"})
exc = self.assertRaises(
exception.PolicyNotAuthorized, self.controller._lock,
req, fakes.FAKE_UUID, body={'lock': {}})
self.assertEqual(
"Policy doesn't allow %s to be performed." % rule_name,
exc.format_message())
@mock.patch('nova.compute.api.API.lock')
def test_lock_sevrer_overridden_policy_pass_with_same_user(
self, mock_lock):
rule_name = ls_policies.POLICY_ROOT % 'lock'
self.policy.set_rules({rule_name: "user_id:%(user_id)s"})
self.controller._lock(self.req,
fakes.FAKE_UUID,
body={'lock': {}})
class LockServerScopeTypePolicyTest(LockServerPolicyTest):
"""Test Lock Server APIs policies with system scope enabled.
This class set the nova.conf [oslo_policy] enforce_scope to True
so that we can switch on the scope checking on oslo policy side.
It defines the set of context with scoped token
which are allowed and not allowed to pass the policy checks.
With those set of context, it will run the API operation and
verify the expected behaviour.
"""
def setUp(self):
super(LockServerScopeTypePolicyTest, self).setUp()
self.flags(enforce_scope=True, group="oslo_policy")