diff --git a/neutron/common/exceptions.py b/neutron/common/exceptions.py index f5fd9317779..bc92c6d9ccd 100644 --- a/neutron/common/exceptions.py +++ b/neutron/common/exceptions.py @@ -52,6 +52,11 @@ class PolicyCheckError(e.NeutronException): message = _("Failed to check policy %(policy)s because %(reason)s.") +class PolicyRemoveAuthorizationError(e.NotAuthorized): + message = _("Failed to remove provided policy %(policy_id)s " + "because you are not authorized.") + + class StateInvalid(e.BadRequest): message = _("Unsupported port state: %(port_state)s.") diff --git a/neutron/core_extensions/qos.py b/neutron/core_extensions/qos.py index 0ee323ff703..6b9f1365c14 100644 --- a/neutron/core_extensions/qos.py +++ b/neutron/core_extensions/qos.py @@ -37,10 +37,23 @@ class QosCoreResourceExtension(base.CoreResourceExtension): raise n_exc.QosPolicyNotFound(policy_id=policy_id) return obj + def _check_policy_change_permission(self, context, old_policy): + """An existing policy can be modified only if one of the following is + true: + + the policy's tenant is the context's tenant + the policy is shared with the tenant + + Using is_accessible expresses these conditions. + """ + if not (policy_object.QosPolicy.is_accessible(context, old_policy)): + raise n_exc.PolicyRemoveAuthorizationError(policy_id=old_policy.id) + def _update_port_policy(self, context, port, port_changes): old_policy = policy_object.QosPolicy.get_port_policy( - context, port['id']) + context.elevated(), port['id']) if old_policy: + self._check_policy_change_permission(context, old_policy) old_policy.detach_port(port['id']) qos_policy_id = port_changes.get(qos_consts.QOS_POLICY_ID) @@ -51,8 +64,9 @@ class QosCoreResourceExtension(base.CoreResourceExtension): def _update_network_policy(self, context, network, network_changes): old_policy = policy_object.QosPolicy.get_network_policy( - context, network['id']) + context.elevated(), network['id']) if old_policy: + self._check_policy_change_permission(context, old_policy) old_policy.detach_network(network['id']) qos_policy_id = network_changes.get(qos_consts.QOS_POLICY_ID) diff --git a/neutron/tests/unit/core_extensions/test_qos.py b/neutron/tests/unit/core_extensions/test_qos.py index 88aa6b86663..1070ba14fcb 100644 --- a/neutron/tests/unit/core_extensions/test_qos.py +++ b/neutron/tests/unit/core_extensions/test_qos.py @@ -15,6 +15,7 @@ import mock +from neutron.common import exceptions as n_exc from neutron import context from neutron.core_extensions import base as base_core from neutron.core_extensions import qos as qos_core @@ -36,6 +37,7 @@ class QosCoreResourceExtensionTestCase(base.BaseTestCase): policy_p = mock.patch('neutron.objects.qos.policy.QosPolicy') self.policy_m = policy_p.start() self.context = context.get_admin_context() + self.non_admin_context = context.Context('user_id', 'tenant_id') def test_process_fields_no_qos_policy_id(self): self.core_extension.process_fields( @@ -110,6 +112,51 @@ class QosCoreResourceExtensionTestCase(base.BaseTestCase): old_qos_policy.detach_port.assert_called_once_with(port_id) self.assertIsNone(actual_port['qos_policy_id']) + def _process_port_updated_policy(self, context, shared, + policy_tenant_id): + with self._mock_plugin_loaded(True): + port_id = mock.sentinel.port_id + qos_policy_id = mock.sentinel.policy_id + actual_port = {'id': port_id, + qos_consts.QOS_POLICY_ID: qos_policy_id} + old_qos_policy = mock.MagicMock() + old_qos_policy.shared = shared + old_qos_policy.tenant_id = policy_tenant_id + self.policy_m.get_port_policy = mock.Mock( + return_value=old_qos_policy) + self.core_extension.process_fields( + context, base_core.PORT, + {qos_consts.QOS_POLICY_ID: None}, + actual_port) + + old_qos_policy.detach_port.assert_called_once_with(port_id) + + def test_process_resource_port_updated_remove_own_policy(self): + self._process_port_updated_policy( + context=self.non_admin_context, + shared=False, + policy_tenant_id=self.non_admin_context.tenant_id) + + def test_process_resource_port_updated_admin_remove_provided_policy(self): + self._process_port_updated_policy( + context=self.context, + shared=False, + policy_tenant_id=self.non_admin_context.tenant_id) + + def test_process_resource_port_updated_remove_shared_policy(self): + self._process_port_updated_policy( + context=self.non_admin_context, + shared=True, + policy_tenant_id=self.context.tenant_id) + + def test_process_resource_port_updated_remove_provided_policy(self): + self.policy_m.is_accessible.return_value = False + self.assertRaises(n_exc.PolicyRemoveAuthorizationError, + self._process_port_updated_policy, + context=self.non_admin_context, + shared=False, + policy_tenant_id=self.context.tenant_id) + def test_process_resource_network_updated_no_policy(self): with self._mock_plugin_loaded(True): network_id = mock.Mock() @@ -161,6 +208,49 @@ class QosCoreResourceExtensionTestCase(base.BaseTestCase): old_qos_policy.detach_network.assert_called_once_with(network_id) new_qos_policy.attach_network.assert_called_once_with(network_id) + def _process_network_updated_policy(self, context, shared, + policy_tenant_id): + with self._mock_plugin_loaded(True): + qos_policy_id = mock.sentinel.policy_id + network_id = mock.sentinel.net_id + actual_network = {'id': network_id, + qos_consts.QOS_POLICY_ID: qos_policy_id} + old_qos_policy = mock.MagicMock() + old_qos_policy.shared = shared + old_qos_policy.tenant_id = policy_tenant_id + self.policy_m.get_network_policy.return_value = old_qos_policy + self.core_extension.process_fields( + context, base_core.NETWORK, + {qos_consts.QOS_POLICY_ID: None}, actual_network) + + old_qos_policy.detach_network.assert_called_once_with(network_id) + + def test_process_fields_network_updated_remove_shared_policy(self): + self._process_network_updated_policy( + context=self.non_admin_context, + shared=True, + policy_tenant_id=self.context.tenant_id) + + def test_process_fields_network_updated_remove_own_policy(self): + self._process_network_updated_policy( + context=self.non_admin_context, + shared=True, + policy_tenant_id=self.non_admin_context.tenant_id) + + def test_process_fields_network_updated_admin_remove_provided_policy(self): + self._process_network_updated_policy( + context=self.context, + shared=True, + policy_tenant_id=self.non_admin_context.tenant_id) + + def test_process_fields_network_updated_remove_provided_policy(self): + self.policy_m.is_accessible.return_value = False + self.assertRaises(n_exc.PolicyRemoveAuthorizationError, + self._process_network_updated_policy, + context=self.non_admin_context, + shared=False, + policy_tenant_id=self.context.tenant_id) + def test_extract_fields_plugin_not_loaded(self): with self._mock_plugin_loaded(False): fields = self.core_extension.extract_fields(None, None)