Fix lack of device ownership enforcement for DVR routers

The enforcement rule was applied to centralized router interfaces, to avoid
a potential security vulnerabilty.

Even though DVR routers are fundamentally different from centralized routers,
there is no good reason as to why the rule should be skipped for DVR interfaces.

This patch sanitizes the insanity a bit and closes this potential loophole by
preventing the operation for DVR routers too.

Related-bug: #1243327
Closes-bug: #1410984

Change-Id: I048e6e3926e1c74cf9ecb63cfb53a0b1afb3c579
This commit is contained in:
armando-migliaccio 2015-02-05 16:27:52 -08:00
parent ca458c4f20
commit 89025a8dd9
2 changed files with 40 additions and 48 deletions

View File

@ -1321,9 +1321,9 @@ class NeutronDbPluginV2(neutron_plugin_base_v2.NeutronPluginBaseV2,
# NOTE(jkoelker) Get the tenant_id outside of the session to avoid
# unneeded db action if the operation raises
tenant_id = self._get_tenant_id_for_create(context, p)
if p.get('device_owner') == constants.DEVICE_OWNER_ROUTER_INTF:
self._enforce_device_owner_not_router_intf_or_device_id(context, p,
tenant_id)
if p.get('device_owner'):
self._enforce_device_owner_not_router_intf_or_device_id(
context, p.get('device_owner'), p.get('device_id'), tenant_id)
port_data = dict(tenant_id=tenant_id,
name=p['name'],
@ -1361,30 +1361,22 @@ class NeutronDbPluginV2(neutron_plugin_base_v2.NeutronPluginBaseV2,
p = port['port']
changed_ips = False
changed_device_id = False
with context.session.begin(subtransactions=True):
port = self._get_port(context, id)
if 'device_owner' in p:
current_device_owner = p['device_owner']
changed_device_owner = True
else:
current_device_owner = port['device_owner']
changed_device_owner = False
if p.get('device_id') != port['device_id']:
changed_device_id = True
changed_owner = 'device_owner' in p
current_owner = p.get('device_owner') or port['device_owner']
changed_device_id = p.get('device_id') != port['device_id']
current_device_id = p.get('device_id') or port['device_id']
# if the current device_owner is ROUTER_INF and the device_id or
# device_owner changed check device_id is not another tenants
# router
if ((current_device_owner == constants.DEVICE_OWNER_ROUTER_INTF)
and (changed_device_id or changed_device_owner)):
if current_owner and changed_device_id or changed_owner:
self._enforce_device_owner_not_router_intf_or_device_id(
context, p, port['tenant_id'], port)
context, current_owner, current_device_id,
port['tenant_id'])
new_mac = p.get('mac_address')
if new_mac and new_mac != port['mac_address']:
self._check_mac_addr_update(
context, port, new_mac, current_device_owner)
context, port, new_mac, current_owner)
# Check if the IPs need to be updated
network_id = port['network_id']
@ -1490,16 +1482,15 @@ class NeutronDbPluginV2(neutron_plugin_base_v2.NeutronPluginBaseV2,
return self._get_ports_query(context, filters).count()
def _enforce_device_owner_not_router_intf_or_device_id(self, context,
port_request,
tenant_id,
db_port=None):
device_owner,
device_id,
tenant_id):
"""Prevent tenants from replacing the device id of router ports with
a router uuid belonging to another tenant.
"""
if device_owner not in constants.ROUTER_INTERFACE_OWNERS:
return
if not context.is_admin:
# find the device_id. If the call was update_port and the
# device_id was not passed in we use the device_id from the
# db.
device_id = port_request.get('device_id')
if not device_id and db_port:
device_id = db_port.get('device_id')
# check to make sure device_id does not match another tenants
# router.
if device_id:

View File

@ -1255,13 +1255,14 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
with self.network(tenant_id='tenant_a',
set_context=True) as n:
with self.subnet(network=n):
self._create_port(
self.fmt, n['network']['id'],
tenant_id='tenant_a',
device_id=admin_router['router']['id'],
device_owner='network:router_interface',
set_context=True,
expected_res_status=exc.HTTPConflict.code)
for device_owner in l3_constants.ROUTER_INTERFACE_OWNERS:
self._create_port(
self.fmt, n['network']['id'],
tenant_id='tenant_a',
device_id=admin_router['router']['id'],
device_owner=device_owner,
set_context=True,
expected_res_status=exc.HTTPConflict.code)
def test_create_non_router_port_device_id_of_other_teants_router_update(
self):
@ -1272,19 +1273,19 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
with self.network(tenant_id='tenant_a',
set_context=True) as n:
with self.subnet(network=n):
port_res = self._create_port(
self.fmt, n['network']['id'],
tenant_id='tenant_a',
device_id=admin_router['router']['id'],
set_context=True)
port = self.deserialize(self.fmt, port_res)
neutron_context = context.Context('', 'tenant_a')
data = {'port': {'device_owner':
'network:router_interface'}}
self._update('ports', port['port']['id'], data,
neutron_context=neutron_context,
expected_code=exc.HTTPConflict.code)
self._delete('ports', port['port']['id'])
for device_owner in l3_constants.ROUTER_INTERFACE_OWNERS:
port_res = self._create_port(
self.fmt, n['network']['id'],
tenant_id='tenant_a',
device_id=admin_router['router']['id'],
set_context=True)
port = self.deserialize(self.fmt, port_res)
neutron_context = context.Context('', 'tenant_a')
data = {'port': {'device_owner': device_owner}}
self._update('ports', port['port']['id'], data,
neutron_context=neutron_context,
expected_code=exc.HTTPConflict.code)
self._delete('ports', port['port']['id'])
def test_update_port_device_id_to_different_tenants_router(self):
with self.router() as admin_router: