RBAC: Fix port query and deletion for network owner

Network owner should be able to get all ports and delete ports on
network as policy allowed. But current code fails to support this.

Current model query for Port is still based on tenant_id, it forgets
to check for network owner when context tenant_id is not port owner.

For port_delete action, policy will generate checking rules for port
attributes, such as:
    rule:delete_port:binding:vif_details
    rule:delete_port:binding:vif_type
This doesn't make sense, only single policy rule "rule:delete_port"
is enough to check.

This patch fixes this issue.

Co-Authored-By: Kevin Benton <kevinbenton@buttewifi.com>
Change-Id: I55328cb43207654b9bb4cfb732923982d020ab0a
Closes-Bug: #1498790
This commit is contained in:
lzklibj 2015-12-09 21:52:05 +08:00
parent cab6c7c67a
commit 67abf5f9f0
7 changed files with 60 additions and 18 deletions

View File

@ -707,12 +707,10 @@ class Controller(object):
network_owner = network['tenant_id']
if network_owner != resource_item['tenant_id']:
msg = _("Tenant %(tenant_id)s not allowed to "
"create %(resource)s on this network")
raise webob.exc.HTTPForbidden(msg % {
"tenant_id": resource_item['tenant_id'],
"resource": self._resource,
})
# NOTE(kevinbenton): we raise a 404 to hide the existence of the
# network from the tenant since they don't have access to it.
msg = _('The resource could not be found.')
raise webob.exc.HTTPNotFound(msg)
def create_resource(collection, resource, plugin, params, allow_bulk=False,

View File

@ -312,3 +312,11 @@ class DbBasePluginCommon(common_db_mixin.CommonDbMixin):
return [{'subnet_id': ip["subnet_id"],
'ip_address': ip["ip_address"]}
for ip in ips]
def _port_filter_hook(self, context, original_model, conditions):
# Apply the port filter only in non-admin and non-advsvc context
if self.model_query_scope(context, original_model):
conditions |= (
(context.tenant_id == models_v2.Network.tenant_id) &
(models_v2.Network.id == models_v2.Port.network_id))
return conditions

View File

@ -1424,3 +1424,10 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
device_id=device_id)
if tenant_id != router['tenant_id']:
raise n_exc.DeviceIDNotOwnedByTenant(device_id=device_id)
db_base_plugin_common.DbBasePluginCommon.register_model_query_hook(
models_v2.Port,
"port",
None,
'_port_filter_hook',
None)

View File

@ -417,8 +417,6 @@ class IpamBackendMixin(db_base_plugin_common.DbBasePluginCommon):
def delete_port(self, context, port_id):
query = (context.session.query(models_v2.Port).
enable_eagerloads(False).filter_by(id=port_id))
if not context.is_admin:
query = query.filter_by(tenant_id=context.tenant_id)
# Use of the ORM mapper is needed for ensuring appropriate resource
# tracking; otherwise SQL Alchemy events won't be triggered.
# For more info check 'caveats' in doc/source/devref/quota.rst

View File

@ -60,10 +60,13 @@ def refresh(policy_file=None):
def get_resource_and_action(action, pluralized=None):
"""Extract resource and action (write, read) from api operation."""
"""Return resource and enforce_attr_based_check(boolean) per
resource and action extracted from api operation.
"""
data = action.split(':', 1)[0].split('_', 1)
resource = pluralized or ("%ss" % data[-1])
return (resource, data[0] != 'get')
enforce_attr_based_check = data[0] not in ('get', 'delete')
return (resource, enforce_attr_based_check)
def set_rules(policies, overwrite=True):
@ -150,9 +153,9 @@ def _build_match_rule(action, target, pluralized):
(e.g.: create_router:external_gateway_info:network_id)
"""
match_rule = policy.RuleCheck('rule', action)
resource, is_write = get_resource_and_action(action, pluralized)
# Attribute-based checks shall not be enforced on GETs
if is_write:
resource, enforce_attr_based_check = get_resource_and_action(
action, pluralized)
if enforce_attr_based_check:
# assigning to variable with short name for improving readability
res_map = attributes.RESOURCE_ATTRIBUTE_MAP
if resource in res_map:

View File

@ -276,11 +276,6 @@ class RBACSharedNetworksTest(base.BaseAdminNetworkTest):
@test.attr(type='smoke')
@test.idempotent_id('86c3529b-1231-40de-803c-beefbeefbeef')
def test_tenant_can_delete_port_on_own_network(self):
# TODO(kevinbenton): make adjustments to the db lookup to
# make this work.
msg = "Non-admin cannot currently delete other's ports."
raise self.skipException(msg)
# pylint: disable=unreachable
net = self.create_network() # owned by self.client
self.client.create_rbac_policy(
object_type='network', object_id=net['id'],

View File

@ -1121,6 +1121,29 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
self._test_list_resources('port', [port2],
neutron_context=n_context)
def test_list_ports_for_network_owner(self):
with self.network(tenant_id='tenant_1') as network:
with self.subnet(network) as subnet:
with self.port(subnet, tenant_id='tenant_1') as port1,\
self.port(subnet, tenant_id='tenant_2') as port2:
# network owner request, should return all ports
port_res = self._list_ports(
'json', set_context=True, tenant_id='tenant_1')
port_list = self.deserialize('json', port_res)['ports']
port_ids = [p['id'] for p in port_list]
self.assertEqual(2, len(port_list))
self.assertIn(port1['port']['id'], port_ids)
self.assertIn(port2['port']['id'], port_ids)
# another tenant request, only return ports belong to it
port_res = self._list_ports(
'json', set_context=True, tenant_id='tenant_2')
port_list = self.deserialize('json', port_res)['ports']
port_ids = [p['id'] for p in port_list]
self.assertEqual(1, len(port_list))
self.assertNotIn(port1['port']['id'], port_ids)
self.assertIn(port2['port']['id'], port_ids)
def test_list_ports_with_sort_native(self):
if self._skip_native_sorting:
self.skipTest("Skip test for not implemented sorting feature")
@ -1226,6 +1249,16 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
self._show('ports', port['port']['id'],
expected_code=webob.exc.HTTPNotFound.code)
def test_delete_port_by_network_owner(self):
with self.network(tenant_id='tenant_1') as network:
with self.subnet(network) as subnet:
with self.port(subnet, tenant_id='tenant_2') as port:
self._delete(
'ports', port['port']['id'],
neutron_context=context.Context('', 'tenant_1'))
self._show('ports', port['port']['id'],
expected_code=webob.exc.HTTPNotFound.code)
def test_update_port(self):
with self.port() as port:
data = {'port': {'admin_state_up': False}}