Reject REST API operations on L3 resources owned by ASR1k L3 router plugin

The plugin will now reject REST API operations (even when performed by admins)
on L3 resources that the router service itself owns and manages.

AKA: DE3496

Closes-Bug: #1694215

Change-Id: I19ccca674dfb8e84ff231ca1954e3284f1238abf
This commit is contained in:
Bob Melander 2017-05-29 11:18:35 +02:00
parent 977f5217ba
commit 00e088b830
3 changed files with 70 additions and 29 deletions

View File

@ -13,6 +13,7 @@
# under the License.
import copy
import inspect
import os
import subprocess
@ -105,6 +106,11 @@ class RouterBindingInfoError(n_exc.NeutronException):
message = _("Could not get binding information for router %(router_id)s.")
class PluginManagedRouterError(n_exc.NotAuthorized):
message = _("Router %(router_id)s is managed by the L3 router service "
"plugin and cannot be modified by users (including admins).")
class L3RouterApplianceDBMixin(extraroute_db.ExtraRoute_dbonly_mixin):
"""Mixin class implementing Neutron's routing service using appliances."""
@ -231,6 +237,7 @@ class L3RouterApplianceDBMixin(extraroute_db.ExtraRoute_dbonly_mixin):
return router_created
def update_router(self, context, id, router):
self._validate_caller(context, id)
router_type_id = self.get_router_type_id(context, id)
driver = self._get_router_type_driver(context,
router_type_id)
@ -332,6 +339,11 @@ class L3RouterApplianceDBMixin(extraroute_db.ExtraRoute_dbonly_mixin):
gw_info)
def delete_router(self, context, router_id, unschedule=True):
try:
self._validate_caller(context, router_id)
except RouterBindingInfoError:
LOG.debug('Router %s to be deleted has no binding information '
'so assuming it is a regular router')
try:
router_db = self._ensure_router_not_in_use(context, router_id)
except sa_exc.InvalidRequestError:
@ -397,8 +409,14 @@ class L3RouterApplianceDBMixin(extraroute_db.ExtraRoute_dbonly_mixin):
if is_ha:
# process any HA
self._delete_redundancy_routers(context, router_db)
super(L3RouterApplianceDBMixin, self).delete_router(context,
router_id)
try:
super(L3RouterApplianceDBMixin, self).delete_router(context,
router_id)
except l3.RouterNotFound as e:
LOG.debug('Ignorable error: %(err)s as it only indicates that '
'router was already concurrently deleted just '
'before this deletion attempt', {'err': e})
return
if driver:
driver.delete_router_postcommit(context, router_ctxt)
except n_exc.NeutronException:
@ -425,6 +443,7 @@ class L3RouterApplianceDBMixin(extraroute_db.ExtraRoute_dbonly_mixin):
{'router_interface': router_interface_info})
def add_router_interface(self, context, router_id, interface_info):
self._validate_caller(context, router_id)
router_type_id = self.get_router_type_id(context, router_id)
r_hd_binding_db = self._get_router_binding_info(context.elevated(),
router_id)
@ -484,6 +503,7 @@ class L3RouterApplianceDBMixin(extraroute_db.ExtraRoute_dbonly_mixin):
port_subnet_id)
def remove_router_interface(self, context, router_id, interface_info):
self._validate_caller(context, router_id)
remove_by_port, remove_by_subnet = self._validate_interface_info(
interface_info, for_removal=True)
e_context = context.elevated()
@ -528,6 +548,16 @@ class L3RouterApplianceDBMixin(extraroute_db.ExtraRoute_dbonly_mixin):
driver.remove_router_interface_postcommit(context, port_ctxt)
return info
def _validate_caller(self, context, router_id):
frm = inspect.stack()[2]
module_name = inspect.getmodule(frm[0]).__name__
role = self._get_router_binding_info(context.elevated(),
router_id).role
if module_name.endswith('api.v2.base') and role is not None:
# nobody, not even admins are allowed to operate directly on
# HA redundancy routers or Global routers
raise PluginManagedRouterError(router_id=router_id)
@property
def is_gbp_workflow(self):
"""Determine if Group Based Policy service plugin is used.

View File

@ -1436,21 +1436,13 @@ class HAL3RouterApplianceVMTestCase(
routes1 = [{'destination': '135.207.0.0/16', 'nexthop': '10.0.1.199'},
{'destination': '12.0.0.0/8', 'nexthop': '10.0.1.200'},
{'destination': '141.212.0.0/16', 'nexthop': '10.0.1.201'}]
routes2 = [{'destination': '155.210.0.0/28', 'nexthop': '11.0.1.199'},
{'destination': '130.238.5.0/24', 'nexthop': '11.0.1.199'}]
with self.router() as router,\
self.subnet(cidr='10.0.1.0/24') as subnet1,\
self.subnet(cidr='11.0.1.0/24') as subnet2,\
self.port(subnet=subnet1) as port1,\
self.port(subnet=subnet2) as port2:
self.port(subnet=subnet1) as port1:
r = router['router']
p1 = port1['port']
p2 = port2['port']
updated_r = self._routes_update_prepare(r['id'], None, p1['id'],
routes1)['router']
rr1_id = r[ha.DETAILS][ha.REDUNDANCY_ROUTERS][0]['id']
updated_rr1 = self._rr_routes_update_prepare(
r['id'], None, p2['id'], rr1_id, routes2)['router']
params = "id=%s" % r[ha.DETAILS][ha.REDUNDANCY_ROUTERS][1]['id']
routers = self._list('routers', query_params=params)['routers']
routers.append(updated_r)
@ -1458,10 +1450,6 @@ class HAL3RouterApplianceVMTestCase(
for router in routers:
self.assertEqual(_sort_routes(router['routes']),
correct_routes1)
routes1.extend(routes2)
self.assertEqual(_sort_routes(updated_rr1['routes']),
_sort_routes(routes1))
self._rr_routes_update_cleanup(p2['id'], None, r['id'], rr1_id, [])
self._routes_update_cleanup(p1['id'], None, r['id'], [])
def test_router_update_change_name_changes_redundancy_routers(self):
@ -1662,21 +1650,13 @@ class L3CfgAgentHARouterApplianceTestCase(
routes1 = [{'destination': '135.207.0.0/16', 'nexthop': '10.0.1.199'},
{'destination': '12.0.0.0/8', 'nexthop': '10.0.1.200'},
{'destination': '141.212.0.0/16', 'nexthop': '10.0.1.201'}]
routes2 = [{'destination': '155.210.0.0/28', 'nexthop': '11.0.1.202'},
{'destination': '130.238.5.0/24', 'nexthop': '11.0.1.202'}]
with self.router() as router,\
self.subnet(cidr='10.0.1.0/24') as subnet1,\
self.subnet(cidr='11.0.1.0/24') as subnet2,\
self.port(subnet=subnet1) as port1,\
self.port(subnet=subnet2) as port2:
self.port(subnet=subnet1) as port1:
r = router['router']
p1 = port1['port']
p2 = port2['port']
self._routes_update_prepare(r['id'], None, p1['id'], r['id'],
routes1)
rr1_id = r[ha.DETAILS][ha.REDUNDANCY_ROUTERS][0]['id']
self._routes_update_prepare(r['id'], None, p2['id'], rr1_id,
routes2)
router_ids = [r['id'],
r[ha.DETAILS][ha.REDUNDANCY_ROUTERS][1]['id']]
e_context = bc.context.get_admin_context()
@ -1686,11 +1666,6 @@ class L3CfgAgentHARouterApplianceTestCase(
for router in routers:
self.assertEqual(_sort_routes(router['routes']),
correct_routes1)
routers = self.l3_plugin.get_sync_data_ext(e_context, [rr1_id])
routes1.extend(routes2)
self.assertEqual(_sort_routes(routers[0]['routes']),
_sort_routes(routes1))
self._routes_update_cleanup(p2['id'], None, r['id'], rr1_id, [])
self._routes_update_cleanup(p1['id'], None, r['id'], r['id'], [])
def test_l3_cfg_agent_query_ha_router_with_fips(self):

View File

@ -33,6 +33,7 @@ from oslo_db import exception as db_exc
from oslo_utils import uuidutils
import six
from sqlalchemy import exc as inner_db_exc
from webob import exc
from networking_cisco._i18n import _
from networking_cisco import backwards_compatibility as bc
@ -553,6 +554,41 @@ class L3RouterApplianceNamespaceTestCase(
'test_router_update_gateway_to_empty_with_existed_floatingip', 1,
1)
def _test_rest_api_operations_denied_for_plugin_managed_router(
self, func, *args, **kwargs):
with mock.patch.object(self.l3_plugin, '_get_router_binding_info',
return_value=lambda: None) as m:
# for now operations on routers of any role != None are not
# permitted for anyone but the l3plugin itself
setattr(m.return_value, 'role', 'some_role')
func(*args, **kwargs)
def test_router_update_denied_for_plugin_managed_router(self):
with self.router() as router:
self._test_rest_api_operations_denied_for_plugin_managed_router(
self._update, 'routers', router['router']['id'],
{'router': {'name': 'new_name'}},
expected_code=exc.HTTPForbidden.code)
def test_router_delete_denied_for_plugin_managed_router(self):
with self.router() as router:
self._test_rest_api_operations_denied_for_plugin_managed_router(
self._delete, 'routers', router['router']['id'],
expected_code=exc.HTTPForbidden.code)
def test_router_interface_add_denied_for_plugin_managed_router(self):
with self.router() as router:
self._test_rest_api_operations_denied_for_plugin_managed_router(
self._router_interface_action, 'add', router['router']['id'],
'some_subnet_id', None, expected_code=exc.HTTPForbidden.code)
def router_interface_remove_denied_for_plugin_managed_router(self):
with self.router() as router:
self._test_rest_api_operations_denied_for_plugin_managed_router(
self._router_interface_action, 'remove',
router['router']['id'], 'some_subnet_id', None,
expected_code=exc.HTTPForbidden.code)
class L3RouterApplianceVMTestCase(L3RouterApplianceNamespaceTestCase):