Pecan: Streamline request body processing
This patch simplifies the process for validating the request body for POST and PUT requests and preparing it for dispatching to plugins. In particular it removes the need for parsing the URL to find the resource identifier, and instead leverages Pecan's routing engine to extract it. As a result the attribute population hook now simply deals with request body validation, and has been renamed accordingly. The logic for loading the current state of the object from the plugin in order to enforce authZ policy on PUT requests has been moved to the appropriate hook, PolicyEnforcementHook. The logic for managing plural/singular names for resource has also been improved, and two helper functions to retrieve a resource's attributes have been added to neutron.api.v2.attributes The logic for aborting requests with unsupported HTTP methods has also been moved to the REST controllers. It used to be in PolicyEnforcementHook, which was probably not the right thing to do. This patch also unskips a functional test concerning ownership checks, and add functional tests for verifying correct request processing. Unit tests for the newly added helper functions are also provided. Related blueprint wsgi-pecan-switch Change-Id: Ib26998b37bdeec8af7a97f77b66d421b8cd271da
This commit is contained in:
parent
5def2dcd74
commit
0f9a607cf6
|
@ -893,6 +893,7 @@ RESOURCE_FOREIGN_KEYS = {
|
|||
NETWORKS: 'network_id'
|
||||
}
|
||||
|
||||
# Store plural/singular mappings
|
||||
PLURALS = {NETWORKS: NETWORK,
|
||||
PORTS: PORT,
|
||||
SUBNETS: SUBNET,
|
||||
|
@ -902,6 +903,31 @@ PLURALS = {NETWORKS: NETWORK,
|
|||
'allocation_pools': 'allocation_pool',
|
||||
'fixed_ips': 'fixed_ip',
|
||||
'extensions': 'extension'}
|
||||
# Store singular/plural mappings. This dictionary is populated by
|
||||
# get_resource_info
|
||||
REVERSED_PLURALS = {}
|
||||
|
||||
|
||||
def get_collection_info(collection):
|
||||
"""Helper function to retrieve attribute info.
|
||||
|
||||
:param collection: Collection or plural name of the resource
|
||||
"""
|
||||
return RESOURCE_ATTRIBUTE_MAP.get(collection)
|
||||
|
||||
|
||||
def get_resource_info(resource):
|
||||
"""Helper function to retrive attribute info
|
||||
|
||||
:param resource: resource name
|
||||
"""
|
||||
plural_name = REVERSED_PLURALS.get(resource)
|
||||
if not plural_name:
|
||||
for (plural, singular) in PLURALS.items():
|
||||
if singular == resource:
|
||||
plural_name = plural
|
||||
REVERSED_PLURALS[resource] = plural_name
|
||||
return RESOURCE_ATTRIBUTE_MAP.get(plural_name)
|
||||
|
||||
|
||||
def fill_default_value(attr_info, res_dict,
|
||||
|
|
|
@ -46,7 +46,7 @@ def setup_app(*args, **kwargs):
|
|||
hooks.ExceptionTranslationHook(), # priority 100
|
||||
hooks.ContextHook(), # priority 95
|
||||
hooks.MemberActionHook(), # piority 95
|
||||
hooks.AttributePopulationHook(), # priority 120
|
||||
hooks.BodyValidationHook(), # priority 120
|
||||
hooks.OwnershipValidationHook(), # priority 125
|
||||
hooks.QuotaEnforcementHook(), # priority 130
|
||||
hooks.PolicyHook(), # priority 135
|
||||
|
|
|
@ -58,7 +58,9 @@ class RootController(object):
|
|||
versions = [builder.build(version) for version in _get_version_info()]
|
||||
return dict(versions=versions)
|
||||
|
||||
@when(index, method='HEAD')
|
||||
@when(index, method='POST')
|
||||
@when(index, method='PATCH')
|
||||
@when(index, method='PUT')
|
||||
@when(index, method='DELETE')
|
||||
def not_supported(self):
|
||||
|
@ -96,7 +98,9 @@ class V2Controller(object):
|
|||
builder = versions_view.get_view_builder(pecan.request)
|
||||
return dict(version=builder.build(self.version_info))
|
||||
|
||||
@when(index, method='HEAD')
|
||||
@when(index, method='POST')
|
||||
@when(index, method='PATCH')
|
||||
@when(index, method='PUT')
|
||||
@when(index, method='DELETE')
|
||||
def not_supported(self):
|
||||
|
@ -110,9 +114,10 @@ class V2Controller(object):
|
|||
LOG.warn(_LW("No controller found for: %s - returning response "
|
||||
"code 404"), collection)
|
||||
pecan.abort(404)
|
||||
# Store resource name in pecan request context so that hooks can
|
||||
# leverage it if necessary
|
||||
# Store resource and collection names in pecan request context so that
|
||||
# hooks can leverage them if necessary
|
||||
request.context['resource'] = controller.resource
|
||||
request.context['collection'] = collection
|
||||
return controller, remainder
|
||||
|
||||
|
||||
|
@ -150,6 +155,8 @@ class CollectionsController(NeutronPecanController):
|
|||
|
||||
@expose()
|
||||
def _lookup(self, item, *remainder):
|
||||
# Store resource identifier in request context
|
||||
request.context['resource_id'] = item
|
||||
return ItemController(self.resource, item), remainder
|
||||
|
||||
@expose(generic=True)
|
||||
|
@ -165,21 +172,36 @@ class CollectionsController(NeutronPecanController):
|
|||
filters = {k: _listify(v) for k, v in kwargs.items()}
|
||||
# TODO(kevinbenton): convert these using api_common.get_filters
|
||||
lister = getattr(self.plugin, 'get_%s' % self.collection)
|
||||
neutron_context = request.context.get('neutron_context')
|
||||
neutron_context = request.context['neutron_context']
|
||||
return {self.collection: lister(neutron_context, filters=filters)}
|
||||
|
||||
@when(index, method='HEAD')
|
||||
@when(index, method='PATCH')
|
||||
@when(index, method='PUT')
|
||||
@when(index, method='DELETE')
|
||||
def not_supported(self):
|
||||
pecan.abort(405)
|
||||
|
||||
@when(index, method='POST')
|
||||
def post(self, *args, **kwargs):
|
||||
# TODO(kevinbenton): emulated bulk!
|
||||
resources = request.context['resources']
|
||||
pecan.response.status = 201
|
||||
if request.bulk:
|
||||
return self.create(resources)
|
||||
|
||||
def create(self, resources):
|
||||
if len(resources) > 1:
|
||||
# Bulk!
|
||||
method = 'create_%s_bulk' % self.resource
|
||||
key = self.collection
|
||||
data = {key: [{self.resource: res} for res in resources]}
|
||||
else:
|
||||
method = 'create_%s' % self.resource
|
||||
key = self.resource
|
||||
data = {key: resources[0]}
|
||||
creator = getattr(self.plugin, method)
|
||||
key = self.collection if request.bulk else self.resource
|
||||
neutron_context = request.context.get('neutron_context')
|
||||
return {key: creator(neutron_context, request.prepared_data)}
|
||||
neutron_context = request.context['neutron_context']
|
||||
return {key: creator(neutron_context, data)}
|
||||
|
||||
|
||||
class ItemController(NeutronPecanController):
|
||||
|
@ -194,25 +216,35 @@ class ItemController(NeutronPecanController):
|
|||
|
||||
def get(self, *args, **kwargs):
|
||||
getter = getattr(self.plugin, 'get_%s' % self.resource)
|
||||
neutron_context = request.context.get('neutron_context')
|
||||
neutron_context = request.context['neutron_context']
|
||||
return {self.resource: getter(neutron_context, self.item)}
|
||||
|
||||
@when(index, method='HEAD')
|
||||
@when(index, method='POST')
|
||||
@when(index, method='PATCH')
|
||||
def not_supported(self):
|
||||
pecan.abort(405)
|
||||
|
||||
@when(index, method='PUT')
|
||||
def put(self, *args, **kwargs):
|
||||
neutron_context = request.context.get('neutron_context')
|
||||
neutron_context = request.context['neutron_context']
|
||||
resources = request.context['resources']
|
||||
if request.member_action:
|
||||
member_action_method = getattr(self.plugin,
|
||||
request.member_action)
|
||||
return member_action_method(neutron_context, self.item,
|
||||
request.prepared_data)
|
||||
resources[0])
|
||||
# TODO(kevinbenton): bulk?
|
||||
updater = getattr(self.plugin, 'update_%s' % self.resource)
|
||||
return updater(neutron_context, self.item, request.prepared_data)
|
||||
# Bulk update is not supported, 'resources' always contains a single
|
||||
# elemenet
|
||||
data = {self.resource: resources[0]}
|
||||
return updater(neutron_context, self.item, data)
|
||||
|
||||
@when(index, method='DELETE')
|
||||
def delete(self):
|
||||
# TODO(kevinbenton): setting code could be in a decorator
|
||||
pecan.response.status = 204
|
||||
neutron_context = request.context.get('neutron_context')
|
||||
neutron_context = request.context['neutron_context']
|
||||
deleter = getattr(self.plugin, 'delete_%s' % self.resource)
|
||||
return deleter(neutron_context, self.item)
|
||||
|
|
|
@ -13,7 +13,7 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from neutron.pecan_wsgi.hooks import attribute_population
|
||||
from neutron.pecan_wsgi.hooks import body_validation
|
||||
from neutron.pecan_wsgi.hooks import context
|
||||
from neutron.pecan_wsgi.hooks import member_action
|
||||
from neutron.pecan_wsgi.hooks import notifier
|
||||
|
@ -26,7 +26,7 @@ from neutron.pecan_wsgi.hooks import translation
|
|||
ExceptionTranslationHook = translation.ExceptionTranslationHook
|
||||
ContextHook = context.ContextHook
|
||||
MemberActionHook = member_action.MemberActionHook
|
||||
AttributePopulationHook = attribute_population.AttributePopulationHook
|
||||
BodyValidationHook = body_validation.BodyValidationHook
|
||||
OwnershipValidationHook = ownership_validation.OwnershipValidationHook
|
||||
PolicyHook = policy_enforcement.PolicyHook
|
||||
QuotaEnforcementHook = quota_enforcement.QuotaEnforcementHook
|
||||
|
|
|
@ -1,104 +0,0 @@
|
|||
# Copyright (c) 2015 Mirantis, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from pecan import hooks
|
||||
|
||||
from neutron.api.v2 import attributes
|
||||
from neutron.api.v2 import base as v2base
|
||||
from neutron import manager
|
||||
|
||||
|
||||
class AttributePopulationHook(hooks.PecanHook):
|
||||
|
||||
priority = 120
|
||||
|
||||
def before(self, state):
|
||||
state.request.prepared_data = {}
|
||||
state.request.resources = []
|
||||
if state.request.method not in ('POST', 'PUT'):
|
||||
return
|
||||
is_create = state.request.method == 'POST'
|
||||
resource = state.request.context.get('resource')
|
||||
neutron_context = state.request.context['neutron_context']
|
||||
if not resource:
|
||||
return
|
||||
if state.request.member_action:
|
||||
# Neutron currently does not describe request bodies for member
|
||||
# actions in meh. prepare_request_body should not be called for
|
||||
# member actions, and the body should be passed as it is. The
|
||||
# plugin will do the validation (yuck).
|
||||
state.request.prepared_data = state.request.json
|
||||
else:
|
||||
state.request.prepared_data = (
|
||||
v2base.Controller.prepare_request_body(
|
||||
neutron_context, state.request.json, is_create,
|
||||
resource, _attributes_for_resource(resource),
|
||||
allow_bulk=True))
|
||||
# TODO(kevinbenton): conditional allow_bulk
|
||||
|
||||
state.request.resources = _extract_resources_from_state(state)
|
||||
# make the original object available:
|
||||
if not is_create and not state.request.member_action:
|
||||
obj_id = _pull_id_from_request(state.request, resource)
|
||||
attrs = _attributes_for_resource(resource)
|
||||
field_list = [name for (name, value) in attrs.items()
|
||||
if (value.get('required_by_policy') or
|
||||
value.get('primary_key') or
|
||||
'default' not in value)]
|
||||
plugin = manager.NeutronManager.get_plugin_for_resource(resource)
|
||||
getter = getattr(plugin, 'get_%s' % resource)
|
||||
# TODO(kevinbenton): the parent_id logic currently in base.py
|
||||
obj = getter(neutron_context, obj_id, fields=field_list)
|
||||
state.request.original_object = obj
|
||||
|
||||
|
||||
def _attributes_for_resource(resource):
|
||||
if resource not in attributes.PLURALS.values():
|
||||
return {}
|
||||
return attributes.RESOURCE_ATTRIBUTE_MAP.get(
|
||||
_plural(resource), {})
|
||||
|
||||
|
||||
def _pull_id_from_request(request, resource):
|
||||
# NOTE(kevinbenton): this sucks
|
||||
# Converting /v2.0/ports/dbbdae29-82f6-49cf-b05e-3365bcc95b7a.json
|
||||
# into dbbdae29-82f6-49cf-b05e-3365bcc95b7a
|
||||
resources = _plural(resource)
|
||||
jsontrail = request.path_info.replace('/v2.0/%s/' % resources, '')
|
||||
obj_id = jsontrail.replace('.json', '')
|
||||
return obj_id
|
||||
|
||||
|
||||
def _plural(rtype):
|
||||
for plural, single in attributes.PLURALS.items():
|
||||
if rtype == single:
|
||||
return plural
|
||||
|
||||
|
||||
def _extract_resources_from_state(state):
|
||||
resource = state.request.context['resource']
|
||||
if not resource:
|
||||
return []
|
||||
data = state.request.prepared_data
|
||||
# single item
|
||||
if resource in data:
|
||||
state.request.bulk = False
|
||||
return [data[resource]]
|
||||
# multiple items
|
||||
if _plural(resource) in data:
|
||||
state.request.bulk = True
|
||||
return data[_plural(resource)]
|
||||
|
||||
return []
|
|
@ -0,0 +1,47 @@
|
|||
# Copyright (c) 2015 Mirantis, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from pecan import hooks
|
||||
|
||||
from neutron.api.v2 import attributes as v2_attributes
|
||||
from neutron.api.v2 import base as v2_base
|
||||
|
||||
|
||||
class BodyValidationHook(hooks.PecanHook):
|
||||
|
||||
priority = 120
|
||||
|
||||
def before(self, state):
|
||||
if state.request.method not in ('POST', 'PUT'):
|
||||
return
|
||||
resource = state.request.context.get('resource')
|
||||
collection = state.request.context.get('collection')
|
||||
neutron_context = state.request.context['neutron_context']
|
||||
is_create = state.request.method == 'POST'
|
||||
if not resource:
|
||||
return
|
||||
# Prepare data to be passed to the plugin from request body
|
||||
data = v2_base.Controller.prepare_request_body(
|
||||
neutron_context,
|
||||
state.request.json,
|
||||
is_create,
|
||||
resource,
|
||||
v2_attributes.get_collection_info(collection),
|
||||
allow_bulk=is_create)
|
||||
if collection in data:
|
||||
state.request.context['resources'] = [item[resource] for item in
|
||||
data[collection]]
|
||||
else:
|
||||
state.request.context['resources'] = [data[resource]]
|
|
@ -27,8 +27,7 @@ class OwnershipValidationHook(hooks.PecanHook):
|
|||
def before(self, state):
|
||||
if state.request.method != 'POST':
|
||||
return
|
||||
items = state.request.resources
|
||||
for item in items:
|
||||
for item in state.request.context.get('resources', []):
|
||||
self._validate_network_tenant_ownership(state, item)
|
||||
|
||||
def _validate_network_tenant_ownership(self, state, resource_item):
|
||||
|
|
|
@ -18,14 +18,13 @@ import simplejson
|
|||
|
||||
from oslo_policy import policy as oslo_policy
|
||||
from oslo_utils import excutils
|
||||
import pecan
|
||||
from pecan import hooks
|
||||
import webob
|
||||
|
||||
from neutron._i18n import _
|
||||
from neutron.api.v2 import attributes as v2_attributes
|
||||
from neutron.common import constants as const
|
||||
from neutron import manager
|
||||
from neutron.pecan_wsgi.hooks import attribute_population
|
||||
from neutron import policy
|
||||
|
||||
|
||||
|
@ -34,25 +33,47 @@ class PolicyHook(hooks.PecanHook):
|
|||
ACTION_MAP = {'POST': 'create', 'PUT': 'update', 'GET': 'get',
|
||||
'DELETE': 'delete'}
|
||||
|
||||
def _fetch_resource(self, neutron_context, resource, resource_id):
|
||||
attrs = v2_attributes.get_resource_info(resource)
|
||||
field_list = [name for (name, value) in attrs.items()
|
||||
if (value.get('required_by_policy') or
|
||||
value.get('primary_key') or 'default' not in value)]
|
||||
plugin = manager.NeutronManager.get_plugin_for_resource(resource)
|
||||
getter = getattr(plugin, 'get_%s' % resource)
|
||||
# TODO(kevinbenton): the parent_id logic currently in base.py
|
||||
return getter(neutron_context, resource_id, fields=field_list)
|
||||
|
||||
def before(self, state):
|
||||
if state.request.method not in self.ACTION_MAP:
|
||||
pecan.abort(405)
|
||||
# This hook should be run only for PUT,POST and DELETE methods and for
|
||||
# requests targeting a neutron resource
|
||||
# FIXME(salv-orlando): DELETE support. It does not work with the
|
||||
# current logic. See LP Bug #1520180
|
||||
items = state.request.context.get('resources')
|
||||
if state.request.method not in ('POST', 'PUT') or not items:
|
||||
return
|
||||
neutron_context = state.request.context.get('neutron_context')
|
||||
resource = state.request.context.get('resource')
|
||||
collection = state.request.context.get('collection')
|
||||
is_update = (state.request.method == 'PUT')
|
||||
items = state.request.resources
|
||||
policy.init()
|
||||
action = '%s_%s' % (self.ACTION_MAP[state.request.method], resource)
|
||||
|
||||
# NOTE(salv-orlando): As bulk updates are not supported, in case of PUT
|
||||
# requests there will be only a single item to process, and its
|
||||
# identifier would have been already retrieved by the lookup process
|
||||
for item in items:
|
||||
if is_update:
|
||||
obj = copy.copy(state.request.original_object)
|
||||
resource_id = state.request.context.get('resource_id')
|
||||
obj = copy.copy(self._fetch_resource(neutron_context,
|
||||
resource,
|
||||
resource_id))
|
||||
obj.update(item)
|
||||
obj[const.ATTRIBUTES_TO_UPDATE] = item.keys()
|
||||
item = obj
|
||||
try:
|
||||
policy.enforce(
|
||||
neutron_context, action, item,
|
||||
pluralized=attribute_population._plural(resource))
|
||||
pluralized=collection)
|
||||
except oslo_policy.PolicyNotAuthorized:
|
||||
with excutils.save_and_reraise_exception() as ctxt:
|
||||
# If a tenant is modifying it's own object, it's safe to
|
||||
|
@ -67,6 +88,7 @@ class PolicyHook(hooks.PecanHook):
|
|||
def after(self, state):
|
||||
neutron_context = state.request.context.get('neutron_context')
|
||||
resource = state.request.context.get('resource')
|
||||
collection = state.request.context.get('collection')
|
||||
if not resource:
|
||||
# can't filter a resource we don't recognize
|
||||
return
|
||||
|
@ -79,31 +101,31 @@ class PolicyHook(hooks.PecanHook):
|
|||
return
|
||||
action = '%s_%s' % (self.ACTION_MAP[state.request.method],
|
||||
resource)
|
||||
plural = attribute_population._plural(resource)
|
||||
if not data or (resource not in data and plural not in data):
|
||||
if not data or (resource not in data and collection not in data):
|
||||
return
|
||||
is_single = resource in data
|
||||
key = resource if is_single else plural
|
||||
to_process = [data[resource]] if is_single else data[plural]
|
||||
key = resource if is_single else collection
|
||||
to_process = [data[resource]] if is_single else data[collection]
|
||||
# in the single case, we enforce which raises on violation
|
||||
# in the plural case, we just check so violating items are hidden
|
||||
policy_method = policy.enforce if is_single else policy.check
|
||||
plugin = manager.NeutronManager.get_plugin_for_resource(resource)
|
||||
resp = [self._get_filtered_item(state.request, resource, item)
|
||||
resp = [self._get_filtered_item(state.request, resource,
|
||||
collection, item)
|
||||
for item in to_process
|
||||
if (state.request.method != 'GET' or
|
||||
policy_method(neutron_context, action, item,
|
||||
plugin=plugin,
|
||||
pluralized=plural))]
|
||||
pluralized=collection))]
|
||||
if is_single:
|
||||
resp = resp[0]
|
||||
data[key] = resp
|
||||
state.response.json = data
|
||||
|
||||
def _get_filtered_item(self, request, resource, data):
|
||||
def _get_filtered_item(self, request, resource, collection, data):
|
||||
neutron_context = request.context.get('neutron_context')
|
||||
to_exclude = self._exclude_attributes_by_policy(
|
||||
neutron_context, resource, data)
|
||||
neutron_context, resource, collection, data)
|
||||
return self._filter_attributes(request, data, to_exclude)
|
||||
|
||||
def _filter_attributes(self, request, data, fields_to_strip):
|
||||
|
@ -115,7 +137,8 @@ class PolicyHook(hooks.PecanHook):
|
|||
if (item[0] not in fields_to_strip and
|
||||
(not user_fields or item[0] in user_fields)))
|
||||
|
||||
def _exclude_attributes_by_policy(self, context, resource, data):
|
||||
def _exclude_attributes_by_policy(self, context, resource,
|
||||
collection, data):
|
||||
"""Identifies attributes to exclude according to authZ policies.
|
||||
|
||||
Return a list of attribute names which should be stripped from the
|
||||
|
@ -124,7 +147,7 @@ class PolicyHook(hooks.PecanHook):
|
|||
"""
|
||||
attributes_to_exclude = []
|
||||
for attr_name in data.keys():
|
||||
attr_data = attribute_population._attributes_for_resource(
|
||||
attr_data = v2_attributes.get_resource_info(
|
||||
resource).get(attr_name)
|
||||
if attr_data and attr_data['is_visible']:
|
||||
if policy.check(
|
||||
|
@ -134,7 +157,7 @@ class PolicyHook(hooks.PecanHook):
|
|||
'get_%s:%s' % (resource, attr_name),
|
||||
data,
|
||||
might_not_exist=True,
|
||||
pluralized=attribute_population._plural(resource)):
|
||||
pluralized=collection):
|
||||
# this attribute is visible, check next one
|
||||
continue
|
||||
# if the code reaches this point then either the policy check
|
||||
|
|
|
@ -31,11 +31,11 @@ class QuotaEnforcementHook(hooks.PecanHook):
|
|||
def before(self, state):
|
||||
# TODO(salv-orlando): This hook must go when adapting the pecan code to
|
||||
# use reservations.
|
||||
if state.request.method != 'POST':
|
||||
return
|
||||
resource = state.request.context.get('resource')
|
||||
if state.request.method != 'POST' or not resource:
|
||||
return
|
||||
plugin = manager.NeutronManager.get_plugin_for_resource(resource)
|
||||
items = state.request.resources
|
||||
items = state.request.context.get('resources')
|
||||
deltas = {}
|
||||
for item in items:
|
||||
tenant_id = item['tenant_id']
|
||||
|
|
|
@ -172,35 +172,101 @@ class TestExceptionTranslationHook(PecanFunctionalTest):
|
|||
self.assertEqual(response.status_int, 500)
|
||||
|
||||
|
||||
class TestRequestPopulatingHooks(PecanFunctionalTest):
|
||||
class TestRequestProcessing(PecanFunctionalTest):
|
||||
|
||||
def setUp(self):
|
||||
super(TestRequestPopulatingHooks, self).setUp()
|
||||
super(TestRequestProcessing, self).setUp()
|
||||
|
||||
# request.context is thread-local storage so it has to be accessed by
|
||||
# the controller. We can capture it into a list here to assert on after
|
||||
# the request finishes.
|
||||
|
||||
def capture_request_details(*args, **kwargs):
|
||||
self.req_stash = {
|
||||
'context': request.context['neutron_context'],
|
||||
'resource_type': request.context['resource'],
|
||||
}
|
||||
mock.patch(
|
||||
'neutron.pecan_wsgi.controllers.root.CollectionsController.get',
|
||||
side_effect=capture_request_details
|
||||
).start()
|
||||
self.captured_context = request.context
|
||||
|
||||
mock.patch('neutron.pecan_wsgi.controllers.root.'
|
||||
'CollectionsController.get',
|
||||
side_effect=capture_request_details).start()
|
||||
mock.patch('neutron.pecan_wsgi.controllers.root.'
|
||||
'CollectionsController.create',
|
||||
side_effect=capture_request_details).start()
|
||||
mock.patch('neutron.pecan_wsgi.controllers.root.ItemController.get',
|
||||
side_effect=capture_request_details).start()
|
||||
# TODO(kevinbenton): add context tests for X-Roles etc
|
||||
|
||||
def test_context_set_in_request(self):
|
||||
self.app.get('/v2.0/ports.json',
|
||||
headers={'X-Project-Id': 'tenant_id'})
|
||||
self.assertEqual('tenant_id', self.req_stash['context'].tenant_id)
|
||||
self.assertEqual('tenant_id',
|
||||
self.captured_context['neutron_context'].tenant_id)
|
||||
|
||||
def test_core_resource_identified(self):
|
||||
self.app.get('/v2.0/ports.json')
|
||||
self.assertEqual('port', self.req_stash['resource_type'])
|
||||
self.assertEqual('port', self.captured_context['resource'])
|
||||
self.assertEqual('ports', self.captured_context['collection'])
|
||||
|
||||
def test_lookup_identifies_resource_id(self):
|
||||
# We now this will return a 404 but that's not the point as it is
|
||||
# mocked
|
||||
self.app.get('/v2.0/ports/reina.json')
|
||||
self.assertEqual('port', self.captured_context['resource'])
|
||||
self.assertEqual('ports', self.captured_context['collection'])
|
||||
self.assertEqual('reina', self.captured_context['resource_id'])
|
||||
|
||||
def test_resource_processing_post(self):
|
||||
self.app.post_json(
|
||||
'/v2.0/ports.json',
|
||||
params={'port': {'network_id': self.port['network_id'],
|
||||
'name': 'the_port',
|
||||
'admin_state_up': True}},
|
||||
headers={'X-Project-Id': 'tenid'})
|
||||
self.assertEqual('port', self.captured_context['resource'])
|
||||
self.assertEqual('ports', self.captured_context['collection'])
|
||||
resources = self.captured_context['resources']
|
||||
self.assertEqual(1, len(resources))
|
||||
self.assertEqual(self.port['network_id'],
|
||||
resources[0]['network_id'])
|
||||
self.assertEqual('the_port', resources[0]['name'])
|
||||
|
||||
def test_resource_processing_post_bulk(self):
|
||||
self.app.post_json(
|
||||
'/v2.0/ports.json',
|
||||
params={'ports': [{'network_id': self.port['network_id'],
|
||||
'name': 'the_port_1',
|
||||
'admin_state_up': True},
|
||||
{'network_id': self.port['network_id'],
|
||||
'name': 'the_port_2',
|
||||
'admin_state_up': True}]},
|
||||
headers={'X-Project-Id': 'tenid'})
|
||||
resources = self.captured_context['resources']
|
||||
self.assertEqual(2, len(resources))
|
||||
self.assertEqual(self.port['network_id'],
|
||||
resources[0]['network_id'])
|
||||
self.assertEqual('the_port_1', resources[0]['name'])
|
||||
self.assertEqual(self.port['network_id'],
|
||||
resources[1]['network_id'])
|
||||
self.assertEqual('the_port_2', resources[1]['name'])
|
||||
|
||||
def test_resource_processing_post_unknown_attribute_returns_400(self):
|
||||
response = self.app.post_json(
|
||||
'/v2.0/ports.json',
|
||||
params={'port': {'network_id': self.port['network_id'],
|
||||
'name': 'the_port',
|
||||
'alien': 'E.T.',
|
||||
'admin_state_up': True}},
|
||||
headers={'X-Project-Id': 'tenid'},
|
||||
expect_errors=True)
|
||||
self.assertEqual(400, response.status_int)
|
||||
|
||||
def test_resource_processing_post_validation_errori_returns_400(self):
|
||||
response = self.app.post_json(
|
||||
'/v2.0/ports.json',
|
||||
params={'port': {'network_id': self.port['network_id'],
|
||||
'name': 'the_port',
|
||||
'admin_state_up': 'invalid_value'}},
|
||||
headers={'X-Project-Id': 'tenid'},
|
||||
expect_errors=True)
|
||||
self.assertEqual(400, response.status_int)
|
||||
|
||||
def test_service_plugin_identified(self):
|
||||
# TODO(kevinbenton): fix the unit test setup to include an l3 plugin
|
||||
|
@ -216,14 +282,12 @@ class TestRequestPopulatingHooks(PecanFunctionalTest):
|
|||
class TestEnforcementHooks(PecanFunctionalTest):
|
||||
|
||||
def test_network_ownership_check(self):
|
||||
# TODO(kevinbenton): get a scenario that passes attribute population
|
||||
self.skipTest("Attribute population blocks this test as-is")
|
||||
response = self.app.post_json('/v2.0/ports.json',
|
||||
response = self.app.post_json(
|
||||
'/v2.0/ports.json',
|
||||
params={'port': {'network_id': self.port['network_id'],
|
||||
'admin_state_up': True,
|
||||
'tenant_id': 'tenid2'}},
|
||||
'admin_state_up': True}},
|
||||
headers={'X-Project-Id': 'tenid'})
|
||||
self.assertEqual(response.status_int, 200)
|
||||
self.assertEqual(201, response.status_int)
|
||||
|
||||
def test_quota_enforcement(self):
|
||||
# TODO(kevinbenton): this test should do something
|
||||
|
|
|
@ -1042,3 +1042,35 @@ class TestResDict(base.BaseTestCase):
|
|||
self.assertRaises(webob.exc.HTTPBadRequest,
|
||||
attributes.populate_tenant_id,
|
||||
ctx, res_dict3, attr_info, True)
|
||||
|
||||
|
||||
class TestHelpers(base.DietTestCase):
|
||||
|
||||
def _verify_port_attributes(self, attrs):
|
||||
for test_attribute in ('id', 'name', 'mac_address', 'network_id',
|
||||
'tenant_id', 'fixed_ips', 'status'):
|
||||
self.assertIn(test_attribute, attrs)
|
||||
|
||||
def test_get_collection_info(self):
|
||||
attrs = attributes.get_collection_info('ports')
|
||||
self._verify_port_attributes(attrs)
|
||||
|
||||
def test_get_collection_info_missing(self):
|
||||
self.assertFalse(attributes.get_collection_info('meh'))
|
||||
|
||||
def test_get_resource_info(self):
|
||||
attributes.REVERSED_PLURALS.pop('port', None)
|
||||
attrs = attributes.get_resource_info('port')
|
||||
self._verify_port_attributes(attrs)
|
||||
# verify side effect
|
||||
self.assertIn('port', attributes.REVERSED_PLURALS)
|
||||
|
||||
def test_get_resource_info_missing(self):
|
||||
self.assertFalse(attributes.get_resource_info('meh'))
|
||||
|
||||
def test_get_resource_info_cached(self):
|
||||
with mock.patch('neutron.api.v2.attributes.PLURALS') as mock_plurals:
|
||||
attributes.REVERSED_PLURALS['port'] = 'ports'
|
||||
attrs = attributes.get_resource_info('port')
|
||||
self._verify_port_attributes(attrs)
|
||||
self.assertEqual(0, mock_plurals.items.call_count)
|
||||
|
|
Loading…
Reference in New Issue