Supporting pagination in api v2.0

Implements bp support-pagination-in-api-v2.0

Add sort and pagination feature for api v2.0.
*Add sort_key, sort_dir for sorting feature.
*Add limit, marker, page_reverse for pagination feature.
*Add emulated sorting and pagination

Native pagination and native sorting implemented for:
*plugins: ovs, linuxbridge
*extensions: l3 securitygroup

Emulated pagination and sorting enabled for:
extensions: lbaas

Change-Id: I28ec6ab7bcae9839cb42c6d60bbfff7250de13ed
This commit is contained in:
He Jie Xu 2012-11-01 16:05:44 +08:00
parent 9cb412eb88
commit 4522a80b6e
23 changed files with 2098 additions and 217 deletions

View File

@ -69,6 +69,10 @@ api_paste_config = api-paste.ini
# Enable or disable bulk create/update/delete operations
# allow_bulk = True
# Enable or disable pagination
# allow_pagination = False
# Enable or disable sorting
# allow_sorting = False
# Enable or disable overlapping IPs for subnets
# Attention: the following parameter MUST be set to False if Quantum is
# being used in conjunction with nova security groups and/or metadata service.
@ -174,6 +178,13 @@ default_notification_level = INFO
# The actual topic names will be %s.%(default_notification_level)s
notification_topics = notifications
# Default maximum number of items returned in a single response,
# value == infinite and value < 0 means no max limit, and value must
# greater than 0. If the number of items requested is greater than
# pagination_max_limit, server will just return pagination_max_limit
# of number of items.
# pagination_max_limit = -1
[QUOTAS]
# resource name(s) that are supported in quota features
# quota_items = network,subnet,port

View File

@ -15,14 +15,282 @@
# License for the specific language governing permissions and limitations
# under the License.
import urllib
from webob import exc
from quantum.common import constants
from quantum.common import exceptions
from quantum.openstack.common import cfg
from quantum.openstack.common import log as logging
LOG = logging.getLogger(__name__)
def get_filters(request, attr_info, skips=[]):
"""
Extracts the filters from the request string
Returns a dict of lists for the filters:
check=a&check=b&name=Bob&
becomes:
{'check': [u'a', u'b'], 'name': [u'Bob']}
"""
res = {}
for key, values in request.GET.dict_of_lists().iteritems():
if key in skips:
continue
values = [v for v in values if v]
key_attr_info = attr_info.get(key, {})
if 'convert_list_to' in key_attr_info:
values = key_attr_info['convert_list_to'](values)
elif 'convert_to' in key_attr_info:
convert_to = key_attr_info['convert_to']
values = [convert_to(v) for v in values]
if values:
res[key] = values
return res
def get_previous_link(request, items, id_key):
params = request.GET.copy()
params.pop('marker', None)
if items:
marker = items[0][id_key]
params['marker'] = marker
params['page_reverse'] = True
return "%s?%s" % (request.path_url, urllib.urlencode(params))
def get_next_link(request, items, id_key):
params = request.GET.copy()
params.pop('marker', None)
if items:
marker = items[-1][id_key]
params['marker'] = marker
params.pop('page_reverse', None)
return "%s?%s" % (request.path_url, urllib.urlencode(params))
def get_limit_and_marker(request):
"""Return marker, limit tuple from request.
:param request: `wsgi.Request` possibly containing 'marker' and 'limit'
GET variables. 'marker' is the id of the last element
the client has seen, and 'limit' is the maximum number
of items to return. If limit == 0, it means we needn't
pagination, then return None.
"""
max_limit = _get_pagination_max_limit()
limit = _get_limit_param(request, max_limit)
if max_limit > 0:
limit = min(max_limit, limit) or max_limit
if not limit:
return None, None
marker = request.GET.get('marker', None)
return limit, marker
def _get_pagination_max_limit():
max_limit = -1
if (cfg.CONF.pagination_max_limit.lower() !=
constants.PAGINATION_INFINITE):
try:
max_limit = int(cfg.CONF.pagination_max_limit)
if max_limit == 0:
raise ValueError()
except ValueError:
LOG.warn(_("Invalid value for pagination_max_limit: %s. It "
"should be an integer greater to 0"),
cfg.CONF.pagination_max_limit)
return max_limit
def _get_limit_param(request, max_limit):
"""Extract integer limit from request or fail."""
try:
limit = int(request.GET.get('limit', 0))
if limit >= 0:
return limit
except ValueError:
pass
msg = _("Limit must be an integer 0 or greater and not '%d'")
raise exceptions.BadRequest(resource='limit', msg=msg)
def list_args(request, arg):
"""Extracts the list of arg from request"""
return [v for v in request.GET.getall(arg) if v]
def get_sorts(request, attr_info):
"""Extract sort_key and sort_dir from request, return as:
[(key1, value1), (key2, value2)]
"""
sort_keys = list_args(request, "sort_key")
sort_dirs = list_args(request, "sort_dir")
if len(sort_keys) != len(sort_dirs):
msg = _("The number of sort_keys and sort_dirs must be same")
raise exc.HTTPBadRequest(explanation=msg)
valid_dirs = [constants.SORT_DIRECTION_ASC, constants.SORT_DIRECTION_DESC]
absent_keys = [x for x in sort_keys if x not in attr_info]
if absent_keys:
msg = _("%s is invalid attribute for sort_keys") % absent_keys
raise exc.HTTPBadRequest(explanation=msg)
invalid_dirs = [x for x in sort_dirs if x not in valid_dirs]
if invalid_dirs:
msg = (_("%(invalid_dirs)s is invalid value for sort_dirs, "
"valid value is '%(asc)s' and '%(desc)s'") %
{'invalid_dirs': invalid_dirs,
'asc': constants.SORT_DIRECTION_ASC,
'desc': constants.SORT_DIRECTION_DESC})
raise exc.HTTPBadRequest(explanation=msg)
return zip(sort_keys,
[x == constants.SORT_DIRECTION_ASC for x in sort_dirs])
def get_page_reverse(request):
data = request.GET.get('page_reverse', 'False')
return data.lower() == "true"
def get_pagination_links(request, items, limit,
marker, page_reverse, key="id"):
key = key if key else 'id'
links = []
if not limit:
return links
if not (len(items) < limit and not page_reverse):
links.append({"rel": "next",
"href": get_next_link(request, items,
key)})
if not (len(items) < limit and page_reverse):
links.append({"rel": "previous",
"href": get_previous_link(request, items,
key)})
return links
class PaginationHelper(object):
def __init__(self, request, primary_key='id'):
self.request = request
self.primary_key = primary_key
def update_fields(self, original_fields, fields_to_add):
pass
def update_args(self, args):
pass
def paginate(self, items):
return items
def get_links(self, items):
return {}
class PaginationEmulatedHelper(PaginationHelper):
def __init__(self, request, primary_key='id'):
super(PaginationEmulatedHelper, self).__init__(request, primary_key)
self.limit, self.marker = get_limit_and_marker(request)
self.page_reverse = get_page_reverse(request)
def update_fields(self, original_fields, fields_to_add):
if not original_fields:
return
if self.primary_key not in original_fields:
original_fields.append(self.primary_key)
fields_to_add.append(self.primary_key)
def paginate(self, items):
if not self.limit:
return items
i = -1
if self.marker:
for item in items:
i = i + 1
if item[self.primary_key] == self.marker:
break
if self.page_reverse:
return items[i - self.limit:i]
return items[i + 1:i + self.limit + 1]
def get_links(self, items):
return get_pagination_links(
self.request, items, self.limit, self.marker,
self.page_reverse, self.primary_key)
class PaginationNativeHelper(PaginationEmulatedHelper):
def update_args(self, args):
if self.primary_key not in dict(args.get('sorts', [])).keys():
args.setdefault('sorts', []).append((self.primary_key, True))
args.update({'limit': self.limit, 'marker': self.marker,
'page_reverse': self.page_reverse})
def paginate(self, items):
return items
class NoPaginationHelper(PaginationHelper):
pass
class SortingHelper(object):
def __init__(self, request, attr_info):
pass
def update_args(self, args):
pass
def update_fields(self, original_fields, fields_to_add):
pass
def sort(self, items):
return items
class SortingEmulatedHelper(SortingHelper):
def __init__(self, request, attr_info):
super(SortingEmulatedHelper, self).__init__(request, attr_info)
self.sort_dict = get_sorts(request, attr_info)
def update_fields(self, original_fields, fields_to_add):
if not original_fields:
return
for key in dict(self.sort_dict).keys():
if key not in original_fields:
original_fields.append(key)
fields_to_add.append(key)
def sort(self, items):
def cmp_func(obj1, obj2):
for key, direction in self.sort_dict:
ret = cmp(obj1[key], obj2[key])
if ret:
return ret * (1 if direction else -1)
return 0
return sorted(items, cmp=cmp_func)
class SortingNativeHelper(SortingHelper):
def __init__(self, request, attr_info):
self.sort_dict = get_sorts(request, attr_info)
def update_args(self, args):
args['sorts'] = self.sort_dict
class NoSortingHelper(SortingHelper):
pass
class QuantumController(object):
""" Base controller class for Quantum API """
# _resource_name will be redefined in sub concrete controller

View File

@ -476,7 +476,8 @@ RESOURCE_ATTRIBUTE_MAP = {
'networks': {
'id': {'allow_post': False, 'allow_put': False,
'validate': {'type:uuid': None},
'is_visible': True},
'is_visible': True,
'primary_key': True},
'name': {'allow_post': True, 'allow_put': True,
'validate': {'type:string': None},
'default': '', 'is_visible': True},
@ -504,7 +505,8 @@ RESOURCE_ATTRIBUTE_MAP = {
'ports': {
'id': {'allow_post': False, 'allow_put': False,
'validate': {'type:uuid': None},
'is_visible': True},
'is_visible': True,
'primary_key': True},
'name': {'allow_post': True, 'allow_put': True, 'default': '',
'validate': {'type:string': None},
'is_visible': True},
@ -545,7 +547,8 @@ RESOURCE_ATTRIBUTE_MAP = {
'subnets': {
'id': {'allow_post': False, 'allow_put': False,
'validate': {'type:uuid': None},
'is_visible': True},
'is_visible': True,
'primary_key': True},
'name': {'allow_post': True, 'allow_put': True, 'default': '',
'validate': {'type:string': None},
'is_visible': True},

View File

@ -20,6 +20,7 @@ import webob.exc
from oslo.config import cfg
from quantum.api import api_common
from quantum.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from quantum.api.v2 import attributes
from quantum.api.v2 import resource as wsgi_resource
@ -42,41 +43,6 @@ FAULT_MAP = {exceptions.NotFound: webob.exc.HTTPNotFound,
}
def _fields(request):
"""
Extracts the list of fields to return
"""
return [v for v in request.GET.getall('fields') if v]
def _filters(request, attr_info):
"""
Extracts the filters from the request string
Returns a dict of lists for the filters:
check=a&check=b&name=Bob&
becomes
{'check': [u'a', u'b'], 'name': [u'Bob']}
"""
res = {}
for key, values in request.GET.dict_of_lists().iteritems():
if key == 'fields':
continue
values = [v for v in values if v]
key_attr_info = attr_info.get(key, {})
if 'convert_list_to' in key_attr_info:
values = key_attr_info['convert_list_to'](values)
elif 'convert_to' in key_attr_info:
convert_to = key_attr_info['convert_to']
values = [convert_to(v) for v in values]
if values:
res[key] = values
return res
class Controller(object):
LIST = 'list'
SHOW = 'show'
@ -85,7 +51,8 @@ class Controller(object):
DELETE = 'delete'
def __init__(self, plugin, collection, resource, attr_info,
allow_bulk=False, member_actions=None, parent=None):
allow_bulk=False, member_actions=None, parent=None,
allow_pagination=False, allow_sorting=False):
if member_actions is None:
member_actions = []
self._plugin = plugin
@ -93,12 +60,26 @@ class Controller(object):
self._resource = resource.replace('-', '_')
self._attr_info = attr_info
self._allow_bulk = allow_bulk
self._allow_pagination = allow_pagination
self._allow_sorting = allow_sorting
self._native_bulk = self._is_native_bulk_supported()
self._native_pagination = self._is_native_pagination_supported()
self._native_sorting = self._is_native_sorting_supported()
self._policy_attrs = [name for (name, info) in self._attr_info.items()
if info.get('required_by_policy')]
self._publisher_id = notifier_api.publisher_id('network')
self._dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
self._member_actions = member_actions
self._primary_key = self._get_primary_key()
if self._allow_pagination and self._native_pagination:
# Native pagination need native sorting support
if not self._native_sorting:
raise Exception(_("Native pagination depend on native "
"sorting"))
if not self._allow_sorting:
LOG.info(_("Allow sorting is enabled because native "
"pagination requires native sorting"))
self._allow_sorting = True
if parent:
self._parent_id_name = '%s_id' % parent['member_name']
@ -114,11 +95,27 @@ class Controller(object):
self._plugin_handlers[action] = '%s%s_%s' % (action, parent_part,
self._resource)
def _get_primary_key(self, default_primary_key='id'):
for key, value in self._attr_info.iteritems():
if value.get('primary_key', False):
return key
return default_primary_key
def _is_native_bulk_supported(self):
native_bulk_attr_name = ("_%s__native_bulk_support"
% self._plugin.__class__.__name__)
return getattr(self._plugin, native_bulk_attr_name, False)
def _is_native_pagination_supported(self):
native_pagination_attr_name = ("_%s__native_pagination_support"
% self._plugin.__class__.__name__)
return getattr(self._plugin, native_pagination_attr_name, False)
def _is_native_sorting_supported(self):
native_sorting_attr_name = ("_%s__native_sorting_support"
% self._plugin.__class__.__name__)
return getattr(self._plugin, native_sorting_attr_name, False)
def _is_visible(self, attr):
attr_val = self._attr_info.get(attr)
return attr_val and attr_val['is_visible']
@ -155,18 +152,47 @@ class Controller(object):
else:
raise AttributeError
def _get_pagination_helper(self, request):
if self._allow_pagination and self._native_pagination:
return api_common.PaginationNativeHelper(request,
self._primary_key)
elif self._allow_pagination:
return api_common.PaginationEmulatedHelper(request,
self._primary_key)
return api_common.NoPaginationHelper(request, self._primary_key)
def _get_sorting_helper(self, request):
if self._allow_sorting and self._native_sorting:
return api_common.SortingNativeHelper(request, self._attr_info)
elif self._allow_sorting:
return api_common.SortingEmulatedHelper(request, self._attr_info)
return api_common.NoSortingHelper(request, self._attr_info)
def _items(self, request, do_authz=False, parent_id=None):
"""Retrieves and formats a list of elements of the requested entity"""
# NOTE(salvatore-orlando): The following ensures that fields which
# are needed for authZ policy validation are not stripped away by the
# plugin before returning.
original_fields, fields_to_add = self._do_field_list(_fields(request))
kwargs = {'filters': _filters(request, self._attr_info),
original_fields, fields_to_add = self._do_field_list(
api_common.list_args(request, 'fields'))
filters = api_common.get_filters(request, self._attr_info,
['fields', 'sort_key', 'sort_dir',
'limit', 'marker', 'page_reverse'])
kwargs = {'filters': filters,
'fields': original_fields}
sorting_helper = self._get_sorting_helper(request)
pagination_helper = self._get_pagination_helper(request)
sorting_helper.update_args(kwargs)
sorting_helper.update_fields(original_fields, fields_to_add)
pagination_helper.update_args(kwargs)
pagination_helper.update_fields(original_fields, fields_to_add)
if parent_id:
kwargs[self._parent_id_name] = parent_id
obj_getter = getattr(self._plugin, self._plugin_handlers[self.LIST])
obj_list = obj_getter(request.context, **kwargs)
obj_list = sorting_helper.sort(obj_list)
obj_list = pagination_helper.paginate(obj_list)
# Check authz
if do_authz:
# FIXME(salvatore-orlando): obj_getter might return references to
@ -177,9 +203,15 @@ class Controller(object):
self._plugin_handlers[self.SHOW],
obj,
plugin=self._plugin)]
return {self._collection: [self._view(obj,
fields_to_strip=fields_to_add)
for obj in obj_list]}
collection = {self._collection:
[self._view(obj,
fields_to_strip=fields_to_add)
for obj in obj_list]}
pagination_links = pagination_helper.get_links(obj_list)
if pagination_links:
collection[self._collection + "_links"] = pagination_links
return collection
def _item(self, request, id, do_authz=False, field_list=None,
parent_id=None):
@ -212,7 +244,8 @@ class Controller(object):
# NOTE(salvatore-orlando): The following ensures that fields
# which are needed for authZ policy validation are not stripped
# away by the plugin before returning.
field_list, added_fields = self._do_field_list(_fields(request))
field_list, added_fields = self._do_field_list(
api_common.list_args(request, "fields"))
parent_id = kwargs.get(self._parent_id_name)
return {self._resource:
self._view(self._item(request,
@ -546,8 +579,11 @@ class Controller(object):
def create_resource(collection, resource, plugin, params, allow_bulk=False,
member_actions=None, parent=None):
member_actions=None, parent=None, allow_pagination=False,
allow_sorting=False):
controller = Controller(plugin, collection, resource, params, allow_bulk,
member_actions=member_actions, parent=parent)
member_actions=member_actions, parent=parent,
allow_pagination=allow_pagination,
allow_sorting=allow_sorting)
return wsgi_resource.Resource(controller, FAULT_MAP)

View File

@ -82,10 +82,12 @@ class APIRouter(wsgi.Router):
def _map_resource(collection, resource, params, parent=None):
allow_bulk = cfg.CONF.allow_bulk
controller = base.create_resource(collection, resource,
plugin, params,
allow_bulk=allow_bulk,
parent=parent)
allow_pagination = cfg.CONF.allow_pagination
allow_sorting = cfg.CONF.allow_sorting
controller = base.create_resource(
collection, resource, plugin, params, allow_bulk=allow_bulk,
parent=parent, allow_pagination=allow_pagination,
allow_sorting=allow_sorting)
path_prefix = None
if parent:
path_prefix = "/%s/{%s_id}/%s" % (parent['collection_name'],

View File

@ -25,6 +25,7 @@ from oslo.config import cfg
from paste import deploy
from quantum.api.v2 import attributes
from quantum.common import constants
from quantum.common import utils
from quantum.openstack.common import log as logging
from quantum.openstack.common import rpc
@ -56,6 +57,14 @@ core_opts = [
help=_("How many times Quantum will retry MAC generation")),
cfg.BoolOpt('allow_bulk', default=True,
help=_("Allow the usage of the bulk API")),
cfg.BoolOpt('allow_pagination', default=False,
help=_("Allow the usage of the pagination")),
cfg.BoolOpt('allow_sorting', default=False,
help=_("Allow the usage of the sorting")),
cfg.StrOpt('pagination_max_limit', default="-1",
help=_("The maximum number of items returned in a single "
"response, value was 'infinite' or negative integer "
"means no limit")),
cfg.IntOpt('max_dns_nameservers', default=5,
help=_("Maximum number of DNS nameservers")),
cfg.IntOpt('max_subnet_host_routes', default=20,

View File

@ -58,3 +58,8 @@ AGENT_TYPE_OVS = 'Open vSwitch agent'
AGENT_TYPE_LINUXBRIDGE = 'Linux bridge agent'
AGENT_TYPE_L3 = 'L3 agent'
L2_AGENT_TOPIC = 'N/A'
PAGINATION_INFINITE = 'infinite'
SORT_DIRECTION_ASC = 'asc'
SORT_DIRECTION_DESC = 'desc'

View File

@ -28,6 +28,7 @@ from quantum.common import constants
from quantum.common import exceptions as q_exc
from quantum.db import api as db
from quantum.db import models_v2
from quantum.db import sqlalchemyutils
from quantum.openstack.common import log as logging
from quantum.openstack.common import timeutils
from quantum.openstack.common import uuidutils
@ -58,9 +59,11 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
"""
# This attribute specifies whether the plugin supports or not
# bulk operations. Name mangling is used in order to ensure it
# is qualified by class
# bulk/pagination/sorting operations. Name mangling is used in
# order to ensure it is qualified by class
__native_bulk_support = True
__native_pagination_support = True
__native_sorting_support = True
# Plugins, mixin classes implementing extension will register
# hooks into the dict below for "augmenting" the "core way" of
# building a query for retrieving objects from a model class.
@ -207,15 +210,30 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
query = query.filter(column.in_(value))
return query
def _get_collection_query(self, context, model, filters=None):
def _get_collection_query(self, context, model, filters=None,
sorts=None, limit=None, marker_obj=None,
page_reverse=False):
collection = self._model_query(context, model)
collection = self._apply_filters_to_query(collection, model, filters)
if limit and page_reverse and sorts:
sorts = [(s[0], not s[1]) for s in sorts]
collection = sqlalchemyutils.paginate_query(collection, model, limit,
sorts,
marker_obj=marker_obj)
return collection
def _get_collection(self, context, model, dict_func, filters=None,
fields=None):
query = self._get_collection_query(context, model, filters)
return [dict_func(c, fields) for c in query.all()]
fields=None, sorts=None, limit=None, marker_obj=None,
page_reverse=False):
query = self._get_collection_query(context, model, filters=filters,
sorts=sorts,
limit=limit,
marker_obj=marker_obj,
page_reverse=page_reverse)
items = [dict_func(c, fields) for c in query.all()]
if limit and page_reverse:
items.reverse()
return items
def _get_collection_count(self, context, model, filters=None):
return self._get_collection_query(context, model, filters).count()
@ -903,6 +921,11 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
raise e
return objects
def _get_marker_obj(self, context, resource, limit, marker):
if limit and marker:
return getattr(self, '_get_%s' % resource)(context, marker)
return None
def create_network_bulk(self, context, networks):
return self._create_bulk('network', context, networks)
@ -965,10 +988,17 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
network = self._get_network(context, id)
return self._make_network_dict(network, fields)
def get_networks(self, context, filters=None, fields=None):
def get_networks(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
marker_obj = self._get_marker_obj(context, 'network', limit, marker)
return self._get_collection(context, models_v2.Network,
self._make_network_dict,
filters=filters, fields=fields)
filters=filters, fields=fields,
sorts=sorts,
limit=limit,
marker_obj=marker_obj,
page_reverse=page_reverse)
def get_networks_count(self, context, filters=None):
return self._get_collection_count(context, models_v2.Network,
@ -1192,10 +1222,17 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
subnet = self._get_subnet(context, id)
return self._make_subnet_dict(subnet, fields)
def get_subnets(self, context, filters=None, fields=None):
def get_subnets(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
marker_obj = self._get_marker_obj(context, 'subnet', limit, marker)
return self._get_collection(context, models_v2.Subnet,
self._make_subnet_dict,
filters=filters, fields=fields)
filters=filters, fields=fields,
sorts=sorts,
limit=limit,
marker_obj=marker_obj,
page_reverse=page_reverse)
def get_subnets_count(self, context, filters=None):
return self._get_collection_count(context, models_v2.Subnet,
@ -1338,7 +1375,8 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
port = self._get_port(context, id)
return self._make_port_dict(port, fields)
def _get_ports_query(self, context, filters=None):
def _get_ports_query(self, context, filters=None, sorts=None, limit=None,
marker_obj=None, page_reverse=False):
Port = models_v2.Port
IPAllocation = models_v2.IPAllocation
@ -1358,11 +1396,24 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
query = query.filter(IPAllocation.subnet_id.in_(subnet_ids))
query = self._apply_filters_to_query(query, Port, filters)
if limit and page_reverse and sorts:
sorts = [(s[0], not s[1]) for s in sorts]
query = sqlalchemyutils.paginate_query(query, Port, limit,
sorts, marker_obj)
return query
def get_ports(self, context, filters=None, fields=None):
query = self._get_ports_query(context, filters)
return [self._make_port_dict(c, fields) for c in query.all()]
def get_ports(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
marker_obj = self._get_marker_obj(context, 'port', limit, marker)
query = self._get_ports_query(context, filters=filters,
sorts=sorts, limit=limit,
marker_obj=marker_obj,
page_reverse=page_reverse)
items = [self._make_port_dict(c, fields) for c in query.all()]
if limit and page_reverse:
items.reverse()
return items
def get_ports_count(self, context, filters=None):
return self._get_ports_query(context, filters).count()

View File

@ -255,10 +255,17 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
router = self._get_router(context, id)
return self._make_router_dict(router, fields)
def get_routers(self, context, filters=None, fields=None):
def get_routers(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
marker_obj = self._get_marker_obj(context, 'router', limit, marker)
return self._get_collection(context, Router,
self._make_router_dict,
filters=filters, fields=fields)
filters=filters, fields=fields,
sorts=sorts,
limit=limit,
marker_obj=marker_obj,
page_reverse=page_reverse)
def get_routers_count(self, context, filters=None):
return self._get_collection_count(context, Router,
@ -683,10 +690,18 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
floatingip = self._get_floatingip(context, id)
return self._make_floatingip_dict(floatingip, fields)
def get_floatingips(self, context, filters=None, fields=None):
def get_floatingips(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
marker_obj = self._get_marker_obj(context, 'floatingip', limit,
marker)
return self._get_collection(context, FloatingIP,
self._make_floatingip_dict,
filters=filters, fields=fields)
filters=filters, fields=fields,
sorts=sorts,
limit=limit,
marker_obj=marker_obj,
page_reverse=page_reverse)
def get_floatingips_count(self, context, filters=None):
return self._get_collection_count(context, FloatingIP,

View File

@ -187,7 +187,8 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase):
return collection
def _get_collection(self, context, model, dict_func, filters=None,
fields=None):
fields=None, sorts=None, limit=None, marker_obj=None,
page_reverse=False):
query = self._get_collection_query(context, model, filters)
return [dict_func(c, fields) for c in query.all()]

View File

@ -138,10 +138,18 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
return self._make_security_group_dict(security_group_db)
def get_security_groups(self, context, filters=None, fields=None):
return self._get_collection(context, SecurityGroup,
def get_security_groups(self, context, filters=None, fields=None,
sorts=None, limit=None,
marker=None, page_reverse=False):
marker_obj = self._get_marker_obj(context, 'security_group', limit,
marker)
return self._get_collection(context,
SecurityGroup,
self._make_security_group_dict,
filters=filters, fields=fields)
filters=filters, fields=fields,
sorts=sorts,
limit=limit, marker_obj=marker_obj,
page_reverse=page_reverse)
def get_security_groups_count(self, context, filters=None):
return self._get_collection_count(context, SecurityGroup,
@ -220,7 +228,8 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
def _get_port_security_group_bindings(self, context,
filters=None, fields=None):
return self._get_collection(context, SecurityGroupPortBinding,
return self._get_collection(context,
SecurityGroupPortBinding,
self._make_security_group_binding_dict,
filters=filters, fields=fields)
@ -373,10 +382,18 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
if rules:
raise ext_sg.SecurityGroupRuleExists(id=str(rules[0]['id']))
def get_security_group_rules(self, context, filters=None, fields=None):
return self._get_collection(context, SecurityGroupRule,
def get_security_group_rules(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
marker_obj = self._get_marker_obj(context, 'security_group_rule',
limit, marker)
return self._get_collection(context,
SecurityGroupRule,
self._make_security_group_rule_dict,
filters=filters, fields=fields)
filters=filters, fields=fields,
sorts=sorts,
limit=limit, marker_obj=marker_obj,
page_reverse=page_reverse)
def get_security_group_rules_count(self, context, filters=None):
return self._get_collection_count(context, SecurityGroupRule,

View File

@ -0,0 +1,108 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sqlalchemy
from sqlalchemy.orm.properties import RelationshipProperty
from quantum.common import exceptions as q_exc
from quantum.openstack.common import log as logging
LOG = logging.getLogger(__name__)
def paginate_query(query, model, limit, sorts, marker_obj=None):
"""Returns a query with sorting / pagination criteria added.
Pagination works by requiring a unique sort key, specified by sorts.
(If sort keys is not unique, then we risk looping through values.)
We use the last row in the previous page as the 'marker' for pagination.
So we must return values that follow the passed marker in the order.
With a single-valued sort key, this would be easy: sort_key > X.
With a compound-values sort key, (k1, k2, k3) we must do this to repeat
the lexicographical ordering:
(k1 > X1) or (k1 == X1 && k2 > X2) or (k1 == X1 && k2 == X2 && k3 > X3)
The reason of didn't use OFFSET clause was it don't scale, please refer
discussion at https://lists.launchpad.net/openstack/msg02547.html
We also have to cope with different sort directions.
Typically, the id of the last row is used as the client-facing pagination
marker, then the actual marker object must be fetched from the db and
passed in to us as marker.
:param query: the query object to which we should add paging/sorting
:param model: the ORM model class
:param limit: maximum number of items to return
:param sorts: array of attributes and direction by which results should
be sorted
:param marker: the last item of the previous page; we returns the next
results after this value.
:rtype: sqlalchemy.orm.query.Query
:return: The query with sorting/pagination added.
"""
if not sorts:
return query
# A primary key must be specified in sort keys
assert not (limit and
len(set(dict(sorts).keys()) &
set(model.__table__.primary_key.columns.keys())) == 0)
# Add sorting
for sort_key, sort_direction in sorts:
sort_dir_func = sqlalchemy.asc if sort_direction else sqlalchemy.desc
try:
sort_key_attr = getattr(model, sort_key)
except AttributeError:
# Extension attribute doesn't support for sorting. Because it
# existed in attr_info, it will be catched at here
msg = _("%s is invalid attribute for sort_key") % sort_key
raise q_exc.BadRequest(resource=model.__tablename__, msg=msg)
if isinstance(sort_key_attr.property, RelationshipProperty):
msg = _("The attribute '%(attr)s' is reference to other "
"resource, can't used by sort "
"'%(resource)s'") % {'attr': sort_key,
'resource': model.__tablename__}
raise q_exc.BadRequest(resource=model.__tablename__, msg=msg)
query = query.order_by(sort_dir_func(sort_key_attr))
# Add pagination
if marker_obj:
marker_values = [getattr(marker_obj, sort[0]) for sort in sorts]
# Build up an array of sort criteria as in the docstring
criteria_list = []
for i, sort in enumerate(sorts):
crit_attrs = [(getattr(model, sorts[j][0]) == marker_values[j])
for j in xrange(i)]
model_attr = getattr(model, sort[0])
if sort[1]:
crit_attrs.append((model_attr > marker_values[i]))
else:
crit_attrs.append((model_attr < marker_values[i]))
criteria = sqlalchemy.sql.and_(*crit_attrs)
criteria_list.append(criteria)
f = sqlalchemy.sql.or_(*criteria_list)
query = query.filter(f)
if limit:
query = query.limit(limit)
return query

View File

@ -93,7 +93,8 @@ RESOURCE_ATTRIBUTE_MAP = {
'routers': {
'id': {'allow_post': False, 'allow_put': False,
'validate': {'type:uuid': None},
'is_visible': True},
'is_visible': True,
'primary_key': True},
'name': {'allow_post': True, 'allow_put': True,
'validate': {'type:string': None},
'is_visible': True, 'default': ''},
@ -113,7 +114,8 @@ RESOURCE_ATTRIBUTE_MAP = {
'floatingips': {
'id': {'allow_post': False, 'allow_put': False,
'validate': {'type:uuid': None},
'is_visible': True},
'is_visible': True,
'primary_key': True},
'floating_ip_address': {'allow_post': False, 'allow_put': False,
'validate': {'type:ip_address_or_none': None},
'is_visible': True},
@ -201,10 +203,11 @@ class L3(extensions.ExtensionDescriptor):
quota.QUOTAS.register_resource_by_name(resource_name)
controller = base.create_resource(collection_name,
resource_name,
plugin, params,
member_actions=member_actions)
controller = base.create_resource(
collection_name, resource_name, plugin, params,
member_actions=member_actions,
allow_pagination=cfg.CONF.allow_pagination,
allow_sorting=cfg.CONF.allow_sorting)
ex = extensions.ResourceExtension(collection_name,
controller,
@ -245,7 +248,8 @@ class RouterPluginBase(object):
pass
@abstractmethod
def get_routers(self, context, filters=None, fields=None):
def get_routers(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None, page_reverse=False):
pass
@abstractmethod
@ -273,7 +277,9 @@ class RouterPluginBase(object):
pass
@abstractmethod
def get_floatingips(self, context, filters=None, fields=None):
def get_floatingips(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
pass
def get_routers_count(self, context, filters=None):

View File

@ -22,6 +22,7 @@ from quantum.api.v2 import attributes as attr
from quantum.api.v2 import base
from quantum.common import exceptions as qexception
from quantum import manager
from quantum.openstack.common import cfg
from quantum.plugins.common import constants
from quantum.plugins.services.service_base import ServicePluginBase
@ -59,7 +60,8 @@ RESOURCE_ATTRIBUTE_MAP = {
'vips': {
'id': {'allow_post': False, 'allow_put': False,
'validate': {'type:uuid': None},
'is_visible': True},
'is_visible': True,
'primary_key': True},
'tenant_id': {'allow_post': True, 'allow_put': False,
'validate': {'type:string': None},
'required_by_policy': True,
@ -113,7 +115,8 @@ RESOURCE_ATTRIBUTE_MAP = {
'pools': {
'id': {'allow_post': False, 'allow_put': False,
'validate': {'type:uuid': None},
'is_visible': True},
'is_visible': True,
'primary_key': True},
'tenant_id': {'allow_post': True, 'allow_put': False,
'validate': {'type:string': None},
'required_by_policy': True,
@ -152,7 +155,8 @@ RESOURCE_ATTRIBUTE_MAP = {
'members': {
'id': {'allow_post': False, 'allow_put': False,
'validate': {'type:uuid': None},
'is_visible': True},
'is_visible': True,
'primary_key': True},
'tenant_id': {'allow_post': True, 'allow_put': False,
'validate': {'type:string': None},
'required_by_policy': True,
@ -182,7 +186,8 @@ RESOURCE_ATTRIBUTE_MAP = {
'health_monitors': {
'id': {'allow_post': False, 'allow_put': False,
'validate': {'type:uuid': None},
'is_visible': True},
'is_visible': True,
'primary_key': True},
'tenant_id': {'allow_post': True, 'allow_put': False,
'validate': {'type:string': None},
'required_by_policy': True,
@ -279,10 +284,11 @@ class Loadbalancer(extensions.ExtensionDescriptor):
if resource_name == 'pool':
member_actions = {'stats': 'GET'}
controller = base.create_resource(collection_name,
resource_name,
plugin, params,
member_actions=member_actions)
controller = base.create_resource(
collection_name, resource_name, plugin, params,
member_actions=member_actions,
allow_pagination=cfg.CONF.allow_pagination,
allow_sorting=cfg.CONF.allow_sorting)
resource = extensions.ResourceExtension(
collection_name,

View File

@ -186,7 +186,8 @@ RESOURCE_ATTRIBUTE_MAP = {
'security_groups': {
'id': {'allow_post': False, 'allow_put': False,
'validate': {'type:uuid': None},
'is_visible': True},
'is_visible': True,
'primary_key': True},
'name': {'allow_post': True, 'allow_put': False,
'is_visible': True, 'default': '',
'validate': {'type:name_not_default': None}},
@ -204,7 +205,8 @@ RESOURCE_ATTRIBUTE_MAP = {
'security_group_rules': {
'id': {'allow_post': False, 'allow_put': False,
'validate': {'type:uuid': None},
'is_visible': True},
'is_visible': True,
'primary_key': True},
# external_id can be used to be backwards compatible with nova
'external_id': {'allow_post': True, 'allow_put': False,
'is_visible': True, 'default': None,
@ -301,7 +303,9 @@ class Securitygroup(extensions.ExtensionDescriptor):
quota.QUOTAS.register_resource_by_name(resource_name)
controller = base.create_resource(collection_name,
resource_name,
plugin, params, allow_bulk=True)
plugin, params, allow_bulk=True,
allow_pagination=True,
allow_sorting=True)
ex = extensions.ResourceExtension(collection_name,
controller,
@ -329,7 +333,9 @@ class SecurityGroupPluginBase(object):
pass
@abstractmethod
def get_security_groups(self, context, filters=None, fields=None):
def get_security_groups(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
pass
@abstractmethod
@ -345,7 +351,9 @@ class SecurityGroupPluginBase(object):
pass
@abstractmethod
def get_security_group_rules(self, context, filters=None, fields=None):
def get_security_group_rules(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
pass
@abstractmethod

View File

@ -192,9 +192,11 @@ class LinuxBridgePluginV2(db_base_plugin_v2.QuantumDbPluginV2,
"""
# This attribute specifies whether the plugin supports or not
# bulk operations. Name mangling is used in order to ensure it
# is qualified by class
# bulk/pagination/sorting operations. Name mangling is used in
# order to ensure it is qualified by class
__native_bulk_support = True
__native_pagination_support = True
__native_sorting_support = True
supported_extension_aliases = ["provider", "router", "binding", "quotas",
"security-group", "agent"]
@ -436,12 +438,13 @@ class LinuxBridgePluginV2(db_base_plugin_v2.QuantumDbPluginV2,
self._extend_network_dict_l3(context, net)
return self._fields(net, fields)
def get_networks(self, context, filters=None, fields=None):
def get_networks(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None, page_reverse=False):
session = context.session
with session.begin(subtransactions=True):
nets = super(LinuxBridgePluginV2, self).get_networks(context,
filters,
None)
nets = super(LinuxBridgePluginV2,
self).get_networks(context, filters, None, sorts,
limit, marker, page_reverse)
for net in nets:
self._extend_network_dict_provider(context, net)
self._extend_network_dict_l3(context, net)
@ -468,12 +471,13 @@ class LinuxBridgePluginV2(db_base_plugin_v2.QuantumDbPluginV2,
self._extend_port_dict_binding(context, port),
return self._fields(port, fields)
def get_ports(self, context, filters=None, fields=None):
def get_ports(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None, page_reverse=False):
res_ports = []
with context.session.begin(subtransactions=True):
ports = super(LinuxBridgePluginV2, self).get_ports(context,
filters,
fields)
ports = super(LinuxBridgePluginV2,
self).get_ports(context, filters, fields, sorts,
limit, marker, page_reverse)
#TODO(nati) filter by security group
for port in ports:
self._extend_port_dict_security_group(context, port)

View File

@ -231,9 +231,12 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
"""
# This attribute specifies whether the plugin supports or not
# bulk operations. Name mangling is used in order to ensure it
# is qualified by class
# bulk/pagination/sorting operations. Name mangling is used in
# order to ensure it is qualified by class
__native_bulk_support = True
__native_pagination_support = True
__native_sorting_support = True
supported_extension_aliases = ["provider", "router",
"binding", "quotas", "security-group",
"agent"]
@ -523,12 +526,14 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
self._extend_network_dict_l3(context, net)
return self._fields(net, fields)
def get_networks(self, context, filters=None, fields=None):
def get_networks(self, context, filters=None, fields=None,
sorts=None,
limit=None, marker=None, page_reverse=False):
session = context.session
with session.begin(subtransactions=True):
nets = super(OVSQuantumPluginV2, self).get_networks(context,
filters,
None)
nets = super(OVSQuantumPluginV2,
self).get_networks(context, filters, None, sorts,
limit, marker, page_reverse)
for net in nets:
self._extend_network_dict_provider(context, net)
self._extend_network_dict_l3(context, net)
@ -574,10 +579,13 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
self._extend_port_dict_binding(context, port)
return self._fields(port, fields)
def get_ports(self, context, filters=None, fields=None):
def get_ports(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
with context.session.begin(subtransactions=True):
ports = super(OVSQuantumPluginV2, self).get_ports(
context, filters, fields)
context, filters, fields, sorts, limit, marker,
page_reverse)
#TODO(nati) filter by security group
for port in ports:
self._extend_port_dict_security_group(context, port)

View File

@ -69,7 +69,8 @@ class QuantumPluginBaseV2(object):
pass
@abstractmethod
def get_subnets(self, context, filters=None, fields=None):
def get_subnets(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None, page_reverse=False):
"""
Retrieve a list of subnets. The contents of the list depends on
the identity of the user making the request (as indicated by the
@ -156,7 +157,8 @@ class QuantumPluginBaseV2(object):
pass
@abstractmethod
def get_networks(self, context, filters=None, fields=None):
def get_networks(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None, page_reverse=False):
"""
Retrieve a list of networks. The contents of the list depends on
the identity of the user making the request (as indicated by the
@ -243,7 +245,8 @@ class QuantumPluginBaseV2(object):
pass
@abstractmethod
def get_ports(self, context, filters=None, fields=None):
def get_ports(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None, page_reverse=False):
"""
Retrieve a list of ports. The contents of the list depends on
the identity of the user making the request (as indicated by the

View File

@ -83,6 +83,8 @@ class LoadBalancerPluginDbTestCase(testlib_api.WebTestCase):
cfg.CONF.set_override('core_plugin', core_plugin)
cfg.CONF.set_override('service_plugins', service_plugins)
cfg.CONF.set_override('base_mac', "12:34:56:78:90:ab")
cfg.CONF.set_override('allow_pagination', True)
cfg.CONF.set_override('allow_sorting', True)
self.api = APIRouter()
plugin = loadbalancerPlugin.LoadBalancerPlugin()
@ -269,6 +271,86 @@ class LoadBalancerPluginDbTestCase(testlib_api.WebTestCase):
self.assertEqual(res.status_int, webob.exc.HTTPOk.code)
return self.deserialize(res)
def _test_list_with_sort(self, collection, items, sorts, query_params=''):
query_str = query_params
for key, direction in sorts:
query_str = query_str + "&sort_key=%s&sort_dir=%s" % (key,
direction)
req = self.new_list_request('%ss' % collection,
params=query_str)
api = self._api_for_resource('%ss' % collection)
res = self.deserialize(req.get_response(api))
collection = collection.replace('-', '_')
expected_res = [item[collection]['id'] for item in items]
self.assertListEqual([n['id'] for n in res["%ss" % collection]],
expected_res)
def _test_list_with_pagination(self, collection, items, sort,
limit, expected_page_num, query_params=''):
if self.fmt == 'xml':
self.skipTest("Skip xml test for pagination")
query_str = query_params + '&' if query_params else ''
query_str = query_str + ("limit=%s&sort_key=%s&"
"sort_dir=%s") % (limit, sort[0], sort[1])
req = self.new_list_request("%ss" % collection, params=query_str)
items_res = []
page_num = 0
api = self._api_for_resource('%ss' % collection)
collection = collection.replace('-', '_')
while req:
page_num = page_num + 1
res = self.deserialize(req.get_response(api))
self.assertLessEqual(len(res["%ss" % collection]), limit)
items_res = items_res + res["%ss" % collection]
req = None
if '%ss_links' % collection in res:
for link in res['%ss_links' % collection]:
if link['rel'] == 'next':
req = create_request(link['href'],
'', 'application/json')
self.assertEqual(len(res["%ss" % collection]),
limit)
self.assertEqual(page_num, expected_page_num)
self.assertListEqual([n['id'] for n in items_res],
[item[collection]['id'] for item in items])
def _test_list_with_pagination_reverse(self, collection, items, sort,
limit, expected_page_num,
query_params=''):
if self.fmt == 'xml':
self.skipTest("Skip xml test for pagination")
resources = '%ss' % collection
collection = collection.replace('-', '_')
api = self._api_for_resource(resources)
marker = items[-1][collection]['id']
query_str = query_params + '&' if query_params else ''
query_str = query_str + ("limit=%s&page_reverse=True&"
"sort_key=%s&sort_dir=%s&"
"marker=%s") % (limit, sort[0], sort[1],
marker)
req = self.new_list_request(resources, params=query_str)
item_res = [items[-1][collection]]
page_num = 0
while req:
page_num = page_num + 1
res = self.deserialize(req.get_response(api))
self.assertLessEqual(len(res["%ss" % collection]), limit)
res["%ss" % collection].reverse()
item_res = item_res + res["%ss" % collection]
req = None
if '%ss_links' % collection in res:
for link in res['%ss_links' % collection]:
if link['rel'] == 'previous':
req = create_request(link['href'],
'', 'application/json')
self.assertEqual(len(res["%ss" % collection]),
limit)
self.assertEqual(page_num, expected_page_num)
expected_res = [item[collection]['id'] for item in items]
expected_res.reverse()
self.assertListEqual([n['id'] for n in item_res],
expected_res)
@contextlib.contextmanager
def vip(self, fmt=None, name='vip1', pool=None,
protocol='HTTP', port=80, admin_state_up=True, no_delete=False,
@ -552,6 +634,32 @@ class TestLoadBalancer(LoadBalancerPluginDbTestCase):
for k, v in keys:
self.assertEqual(res['vips'][0][k], v)
def test_list_vips_with_sort_emulated(self):
with contextlib.nested(self.vip(name='vip1', port=81),
self.vip(name='vip2', port=82),
self.vip(name='vip3', port=82)
) as (vip1, vip2, vip3):
self._test_list_with_sort('vip', (vip1, vip3, vip2),
[('port', 'asc'), ('name', 'desc')])
def test_list_vips_with_pagination_emulated(self):
with contextlib.nested(self.vip(name='vip1'),
self.vip(name='vip2'),
self.vip(name='vip3')
) as (vip1, vip2, vip3):
self._test_list_with_pagination('vip',
(vip1, vip2, vip3),
('name', 'asc'), 2, 2)
def test_list_vips_with_pagination_reverse_emulated(self):
with contextlib.nested(self.vip(name='vip1'),
self.vip(name='vip2'),
self.vip(name='vip3')
) as (vip1, vip2, vip3):
self._test_list_with_pagination_reverse('vip',
(vip1, vip2, vip3),
('name', 'asc'), 2, 2)
def test_create_pool_with_invalid_values(self):
name = 'pool3'
@ -631,6 +739,32 @@ class TestLoadBalancer(LoadBalancerPluginDbTestCase):
for k, v in keys:
self.assertEqual(res['pool'][k], v)
def test_list_pools_with_sort_emulated(self):
with contextlib.nested(self.pool(name='p1'),
self.pool(name='p2'),
self.pool(name='p3')
) as (p1, p2, p3):
self._test_list_with_sort('pool', (p3, p2, p1),
[('name', 'desc')])
def test_list_pools_with_pagination_emulated(self):
with contextlib.nested(self.pool(name='p1'),
self.pool(name='p2'),
self.pool(name='p3')
) as (p1, p2, p3):
self._test_list_with_pagination('pool',
(p1, p2, p3),
('name', 'asc'), 2, 2)
def test_list_pools_with_pagination_reverse_emulated(self):
with contextlib.nested(self.pool(name='p1'),
self.pool(name='p2'),
self.pool(name='p3')
) as (p1, p2, p3):
self._test_list_with_pagination_reverse('pool',
(p1, p2, p3),
('name', 'asc'), 2, 2)
def test_create_member(self):
with self.pool() as pool:
pool_id = pool['pool']['id']
@ -735,6 +869,44 @@ class TestLoadBalancer(LoadBalancerPluginDbTestCase):
for k, v in keys:
self.assertEqual(res['member'][k], v)
def test_list_members_with_sort_emulated(self):
with self.pool() as pool:
with contextlib.nested(self.member(pool_id=pool['pool']['id'],
port=81),
self.member(pool_id=pool['pool']['id'],
port=82),
self.member(pool_id=pool['pool']['id'],
port=83)
) as (m1, m2, m3):
self._test_list_with_sort('member', (m3, m2, m1),
[('port', 'desc')])
def test_list_members_with_pagination_emulated(self):
with self.pool() as pool:
with contextlib.nested(self.member(pool_id=pool['pool']['id'],
port=81),
self.member(pool_id=pool['pool']['id'],
port=82),
self.member(pool_id=pool['pool']['id'],
port=83)
) as (m1, m2, m3):
self._test_list_with_pagination('member',
(m1, m2, m3),
('port', 'asc'), 2, 2)
def test_list_members_with_pagination_reverse_emulated(self):
with self.pool() as pool:
with contextlib.nested(self.member(pool_id=pool['pool']['id'],
port=81),
self.member(pool_id=pool['pool']['id'],
port=82),
self.member(pool_id=pool['pool']['id'],
port=83)
) as (m1, m2, m3):
self._test_list_with_pagination_reverse('member',
(m1, m2, m3),
('port', 'asc'), 2, 2)
def test_create_healthmonitor(self):
keys = [('type', "TCP"),
('tenant_id', self._tenant_id),
@ -790,6 +962,32 @@ class TestLoadBalancer(LoadBalancerPluginDbTestCase):
for k, v in keys:
self.assertEqual(res['health_monitor'][k], v)
def test_list_healthmonitors_with_sort_emulated(self):
with contextlib.nested(self.health_monitor(delay=30),
self.health_monitor(delay=31),
self.health_monitor(delay=32)
) as (m1, m2, m3):
self._test_list_with_sort('health_monitor', (m3, m2, m1),
[('delay', 'desc')])
def test_list_healthmonitors_with_pagination_emulated(self):
with contextlib.nested(self.health_monitor(delay=30),
self.health_monitor(delay=31),
self.health_monitor(delay=32)
) as (m1, m2, m3):
self._test_list_with_pagination('health_monitor',
(m1, m2, m3),
('delay', 'asc'), 2, 2)
def test_list_healthmonitors_with_pagination_reverse_emulated(self):
with contextlib.nested(self.health_monitor(delay=30),
self.health_monitor(delay=31),
self.health_monitor(delay=32)
) as (m1, m2, m3):
self._test_list_with_pagination_reverse('health_monitor',
(m1, m2, m3),
('delay', 'asc'), 2, 2)
def test_get_pool_stats(self):
keys = [("bytes_in", 0),
("bytes_out", 0),

View File

@ -16,6 +16,7 @@
# under the License.
import os
import urlparse
import mock
from oslo.config import cfg
@ -25,6 +26,7 @@ from webob import exc
import webtest
from quantum.api.extensions import PluginAwareExtensionManager
from quantum.api import api_common
from quantum.api.v2 import attributes
from quantum.api.v2 import base
from quantum.api.v2 import router
@ -102,10 +104,13 @@ class APIv2TestBase(unittest.TestCase):
config.parse(args=args)
# Update the plugin
cfg.CONF.set_override('core_plugin', plugin)
cfg.CONF.set_override('allow_pagination', True)
cfg.CONF.set_override('allow_sorting', True)
self._plugin_patcher = mock.patch(plugin, autospec=True)
self.plugin = self._plugin_patcher.start()
instance = self.plugin.return_value
instance._QuantumPluginBaseV2__native_pagination_support = True
instance._QuantumPluginBaseV2__native_sorting_support = True
api = router.APIRouter()
self.api = webtest.TestApp(api)
super(APIv2TestBase, self).setUp()
@ -117,6 +122,21 @@ class APIv2TestBase(unittest.TestCase):
cfg.CONF.reset()
class _ArgMatcher(object):
""" An adapter to assist mock assertions, used to custom compare """
def __init__(self, cmp, obj):
self.cmp = cmp
self.obj = obj
def __eq__(self, other):
return self.cmp(self.obj, other)
def _list_cmp(l1, l2):
return set(l1) == set(l2)
class APIv2TestCase(APIv2TestBase):
# NOTE(jkoelker) This potentially leaks the mock object if the setUp
# raises without being caught. Using unittest2
@ -125,20 +145,28 @@ class APIv2TestCase(APIv2TestBase):
def _do_field_list(self, resource, base_fields):
attr_info = attributes.RESOURCE_ATTRIBUTE_MAP[resource]
policy_attrs = [name for (name, info) in attr_info.items()
if info.get('required_by_policy')]
if info.get('required_by_policy') or
info.get('primary_key')]
fields = base_fields
fields.extend(policy_attrs)
return fields
def _get_collection_kwargs(self, skipargs=[], **kwargs):
args_list = ['filters', 'fields', 'sorts', 'limit', 'marker',
'page_reverse']
args_dict = dict((arg, mock.ANY)
for arg in set(args_list) - set(skipargs))
args_dict.update(kwargs)
return args_dict
def test_fields(self):
instance = self.plugin.return_value
instance.get_networks.return_value = []
self.api.get(_get_path('networks'), {'fields': 'foo'})
fields = self._do_field_list('networks', ['foo'])
instance.get_networks.assert_called_once_with(mock.ANY,
filters=mock.ANY,
fields=fields)
kwargs = self._get_collection_kwargs(fields=fields)
instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
def test_fields_multiple(self):
instance = self.plugin.return_value
@ -146,9 +174,8 @@ class APIv2TestCase(APIv2TestBase):
fields = self._do_field_list('networks', ['foo', 'bar'])
self.api.get(_get_path('networks'), {'fields': ['foo', 'bar']})
instance.get_networks.assert_called_once_with(mock.ANY,
filters=mock.ANY,
fields=fields)
kwargs = self._get_collection_kwargs(fields=fields)
instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
def test_fields_multiple_with_empty(self):
instance = self.plugin.return_value
@ -156,99 +183,89 @@ class APIv2TestCase(APIv2TestBase):
fields = self._do_field_list('networks', ['foo'])
self.api.get(_get_path('networks'), {'fields': ['foo', '']})
instance.get_networks.assert_called_once_with(mock.ANY,
filters=mock.ANY,
fields=fields)
kwargs = self._get_collection_kwargs(fields=fields)
instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
def test_fields_empty(self):
instance = self.plugin.return_value
instance.get_networks.return_value = []
self.api.get(_get_path('networks'), {'fields': ''})
instance.get_networks.assert_called_once_with(mock.ANY,
filters=mock.ANY,
fields=[])
kwargs = self._get_collection_kwargs(fields=[])
instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
def test_fields_multiple_empty(self):
instance = self.plugin.return_value
instance.get_networks.return_value = []
self.api.get(_get_path('networks'), {'fields': ['', '']})
instance.get_networks.assert_called_once_with(mock.ANY,
filters=mock.ANY,
fields=[])
kwargs = self._get_collection_kwargs(fields=[])
instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
def test_filters(self):
instance = self.plugin.return_value
instance.get_networks.return_value = []
self.api.get(_get_path('networks'), {'foo': 'bar'})
filters = {'foo': ['bar']}
instance.get_networks.assert_called_once_with(mock.ANY,
filters=filters,
fields=mock.ANY)
self.api.get(_get_path('networks'), {'name': 'bar'})
filters = {'name': ['bar']}
kwargs = self._get_collection_kwargs(filters=filters)
instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
def test_filters_empty(self):
instance = self.plugin.return_value
instance.get_networks.return_value = []
self.api.get(_get_path('networks'), {'foo': ''})
self.api.get(_get_path('networks'), {'name': ''})
filters = {}
instance.get_networks.assert_called_once_with(mock.ANY,
filters=filters,
fields=mock.ANY)
kwargs = self._get_collection_kwargs(filters=filters)
instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
def test_filters_multiple_empty(self):
instance = self.plugin.return_value
instance.get_networks.return_value = []
self.api.get(_get_path('networks'), {'foo': ['', '']})
self.api.get(_get_path('networks'), {'name': ['', '']})
filters = {}
instance.get_networks.assert_called_once_with(mock.ANY,
filters=filters,
fields=mock.ANY)
kwargs = self._get_collection_kwargs(filters=filters)
instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
def test_filters_multiple_with_empty(self):
instance = self.plugin.return_value
instance.get_networks.return_value = []
self.api.get(_get_path('networks'), {'foo': ['bar', '']})
filters = {'foo': ['bar']}
instance.get_networks.assert_called_once_with(mock.ANY,
filters=filters,
fields=mock.ANY)
self.api.get(_get_path('networks'), {'name': ['bar', '']})
filters = {'name': ['bar']}
kwargs = self._get_collection_kwargs(filters=filters)
instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
def test_filters_multiple_values(self):
instance = self.plugin.return_value
instance.get_networks.return_value = []
self.api.get(_get_path('networks'), {'foo': ['bar', 'bar2']})
filters = {'foo': ['bar', 'bar2']}
instance.get_networks.assert_called_once_with(mock.ANY,
filters=filters,
fields=mock.ANY)
self.api.get(_get_path('networks'), {'name': ['bar', 'bar2']})
filters = {'name': ['bar', 'bar2']}
kwargs = self._get_collection_kwargs(filters=filters)
instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
def test_filters_multiple(self):
instance = self.plugin.return_value
instance.get_networks.return_value = []
self.api.get(_get_path('networks'), {'foo': 'bar',
'foo2': 'bar2'})
filters = {'foo': ['bar'], 'foo2': ['bar2']}
instance.get_networks.assert_called_once_with(mock.ANY,
filters=filters,
fields=mock.ANY)
self.api.get(_get_path('networks'), {'name': 'bar',
'tenant_id': 'bar2'})
filters = {'name': ['bar'], 'tenant_id': ['bar2']}
kwargs = self._get_collection_kwargs(filters=filters)
instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
def test_filters_with_fields(self):
instance = self.plugin.return_value
instance.get_networks.return_value = []
self.api.get(_get_path('networks'), {'foo': 'bar', 'fields': 'foo'})
filters = {'foo': ['bar']}
self.api.get(_get_path('networks'), {'name': 'bar', 'fields': 'foo'})
filters = {'name': ['bar']}
fields = self._do_field_list('networks', ['foo'])
instance.get_networks.assert_called_once_with(mock.ANY,
filters=filters,
fields=fields)
kwargs = self._get_collection_kwargs(filters=filters, fields=fields)
instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
def test_filters_with_convert_to(self):
instance = self.plugin.return_value
@ -256,9 +273,8 @@ class APIv2TestCase(APIv2TestBase):
self.api.get(_get_path('ports'), {'admin_state_up': 'true'})
filters = {'admin_state_up': [True]}
instance.get_ports.assert_called_once_with(mock.ANY,
filters=filters,
fields=mock.ANY)
kwargs = self._get_collection_kwargs(filters=filters)
instance.get_ports.assert_called_once_with(mock.ANY, **kwargs)
def test_filters_with_convert_list_to(self):
instance = self.plugin.return_value
@ -267,9 +283,244 @@ class APIv2TestCase(APIv2TestBase):
self.api.get(_get_path('ports'),
{'fixed_ips': ['ip_address=foo', 'subnet_id=bar']})
filters = {'fixed_ips': {'ip_address': ['foo'], 'subnet_id': ['bar']}}
instance.get_ports.assert_called_once_with(mock.ANY,
filters=filters,
fields=mock.ANY)
kwargs = self._get_collection_kwargs(filters=filters)
instance.get_ports.assert_called_once_with(mock.ANY, **kwargs)
def test_limit(self):
instance = self.plugin.return_value
instance.get_networks.return_value = []
self.api.get(_get_path('networks'),
{'limit': '10'})
kwargs = self._get_collection_kwargs(limit=10)
instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
def test_limit_with_great_than_max_limit(self):
cfg.CONF.set_default('pagination_max_limit', '1000')
instance = self.plugin.return_value
instance.get_networks.return_value = []
self.api.get(_get_path('networks'),
{'limit': '1001'})
kwargs = self._get_collection_kwargs(limit=1000)
instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
def test_limit_with_zero(self):
cfg.CONF.set_default('pagination_max_limit', '1000')
instance = self.plugin.return_value
instance.get_networks.return_value = []
self.api.get(_get_path('networks'), {'limit': '0'})
kwargs = self._get_collection_kwargs(limit=1000)
instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
def test_limit_with_unspecific(self):
cfg.CONF.set_default('pagination_max_limit', '1000')
instance = self.plugin.return_value
instance.get_networks.return_value = []
self.api.get(_get_path('networks'))
kwargs = self._get_collection_kwargs(limit=1000)
instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
def test_limit_with_negative_value(self):
cfg.CONF.set_default('pagination_max_limit', '1000')
instance = self.plugin.return_value
instance.get_networks.return_value = []
res = self.api.get(_get_path('networks'), {'limit': -1},
expect_errors=True)
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
def test_limit_with_non_integer(self):
instance = self.plugin.return_value
instance.get_networks.return_value = []
res = self.api.get(_get_path('networks'),
{'limit': 'abc'}, expect_errors=True)
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
def test_limit_with_infinite_pagination_max_limit(self):
instance = self.plugin.return_value
instance.get_networks.return_value = []
cfg.CONF.set_override('pagination_max_limit', 'Infinite')
self.api.get(_get_path('networks'))
kwargs = self._get_collection_kwargs(limit=None)
instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
def test_limit_with_negative_pagination_max_limit(self):
instance = self.plugin.return_value
instance.get_networks.return_value = []
cfg.CONF.set_default('pagination_max_limit', '-1')
self.api.get(_get_path('networks'))
kwargs = self._get_collection_kwargs(limit=None)
instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
def test_limit_with_non_integer_pagination_max_limit(self):
instance = self.plugin.return_value
instance.get_networks.return_value = []
cfg.CONF.set_default('pagination_max_limit', 'abc')
self.api.get(_get_path('networks'))
kwargs = self._get_collection_kwargs(limit=None)
instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
def test_marker(self):
cfg.CONF.set_override('pagination_max_limit', '1000')
instance = self.plugin.return_value
instance.get_networks.return_value = []
marker = _uuid()
self.api.get(_get_path('networks'),
{'marker': marker})
kwargs = self._get_collection_kwargs(limit=1000, marker=marker)
instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
def test_page_reverse(self):
calls = []
instance = self.plugin.return_value
instance.get_networks.return_value = []
self.api.get(_get_path('networks'),
{'page_reverse': 'True'})
kwargs = self._get_collection_kwargs(page_reverse=True)
calls.append(mock.call.get_networks(mock.ANY, **kwargs))
instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
instance = self.plugin.return_value
instance.get_networks.return_value = []
self.api.get(_get_path('networks'),
{'page_reverse': 'False'})
kwargs = self._get_collection_kwargs(page_reverse=False)
calls.append(mock.call.get_networks(mock.ANY, **kwargs))
def test_page_reverse_with_non_bool(self):
instance = self.plugin.return_value
instance.get_networks.return_value = []
self.api.get(_get_path('networks'),
{'page_reverse': 'abc'})
kwargs = self._get_collection_kwargs(page_reverse=False)
instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
def test_page_reverse_with_unspecific(self):
instance = self.plugin.return_value
instance.get_networks.return_value = []
self.api.get(_get_path('networks'))
kwargs = self._get_collection_kwargs(page_reverse=False)
instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
def test_sort(self):
instance = self.plugin.return_value
instance.get_networks.return_value = []
self.api.get(_get_path('networks'),
{'sort_key': ['name', 'admin_state_up'],
'sort_dir': ['desc', 'asc']})
kwargs = self._get_collection_kwargs(sorts=[('name', False),
('admin_state_up', True),
('id', True)])
instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
def test_sort_with_primary_key(self):
instance = self.plugin.return_value
instance.get_networks.return_value = []
self.api.get(_get_path('networks'),
{'sort_key': ['name', 'admin_state_up', 'id'],
'sort_dir': ['desc', 'asc', 'desc']})
kwargs = self._get_collection_kwargs(sorts=[('name', False),
('admin_state_up', True),
('id', False)])
instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
def test_sort_without_direction(self):
instance = self.plugin.return_value
instance.get_networks.return_value = []
res = self.api.get(_get_path('networks'), {'sort_key': ['name']},
expect_errors=True)
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
def test_sort_with_invalid_attribute(self):
instance = self.plugin.return_value
instance.get_networks.return_value = []
res = self.api.get(_get_path('networks'),
{'sort_key': 'abc',
'sort_dir': 'asc'},
expect_errors=True)
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
def test_sort_with_invalid_dirs(self):
instance = self.plugin.return_value
instance.get_networks.return_value = []
res = self.api.get(_get_path('networks'),
{'sort_key': 'name',
'sort_dir': 'abc'},
expect_errors=True)
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
def test_emulated_sort(self):
instance = self.plugin.return_value
instance._QuantumPluginBaseV2__native_pagination_support = False
instance._QuantumPluginBaseV2__native_sorting_support = False
instance.get_networks.return_value = []
api = webtest.TestApp(router.APIRouter())
api.get(_get_path('networks'), {'sort_key': ['name', 'status'],
'sort_dir': ['desc', 'asc']})
kwargs = self._get_collection_kwargs(
skipargs=['sorts', 'limit', 'marker', 'page_reverse'])
instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
def test_emulated_sort_without_sort_field(self):
instance = self.plugin.return_value
instance._QuantumPluginBaseV2__native_pagination_support = False
instance._QuantumPluginBaseV2__native_sorting_support = False
instance.get_networks.return_value = []
api = webtest.TestApp(router.APIRouter())
api.get(_get_path('networks'), {'sort_key': ['name', 'status'],
'sort_dir': ['desc', 'asc'],
'fields': ['subnets']})
kwargs = self._get_collection_kwargs(
skipargs=['sorts', 'limit', 'marker', 'page_reverse'],
fields=_ArgMatcher(_list_cmp, ['name',
'status',
'id',
'subnets',
'shared',
'tenant_id']))
instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
def test_emulated_pagination(self):
instance = self.plugin.return_value
instance._QuantumPluginBaseV2__native_pagination_support = False
instance.get_networks.return_value = []
api = webtest.TestApp(router.APIRouter())
api.get(_get_path('networks'), {'limit': 10,
'marker': 'foo',
'page_reverse': False})
kwargs = self._get_collection_kwargs(skipargs=['limit',
'marker',
'page_reverse'])
instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
def test_native_pagination_without_native_sorting(self):
instance = self.plugin.return_value
instance._QuantumPluginBaseV2__native_sorting_support = False
self.assertRaises(Exception, router.APIRouter)
def test_native_pagination_without_allow_sorting(self):
cfg.CONF.set_override('allow_sorting', False)
instance = self.plugin.return_value
instance.get_networks.return_value = []
api = webtest.TestApp(router.APIRouter())
api.get(_get_path('networks'),
{'sort_key': ['name', 'admin_state_up'],
'sort_dir': ['desc', 'asc']})
kwargs = self._get_collection_kwargs(sorts=[('name', False),
('admin_state_up', True),
('id', True)])
instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
# Note: since all resources use the same controller and validation
@ -320,6 +571,184 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
tenant_id = _uuid()
self._test_list(tenant_id + "bad", tenant_id)
def test_list_pagination(self):
id1 = str(_uuid())
id2 = str(_uuid())
input_dict1 = {'id': id1,
'name': 'net1',
'admin_state_up': True,
'status': "ACTIVE",
'tenant_id': '',
'shared': False,
'subnets': []}
input_dict2 = {'id': id2,
'name': 'net2',
'admin_state_up': True,
'status': "ACTIVE",
'tenant_id': '',
'shared': False,
'subnets': []}
return_value = [input_dict1, input_dict2]
instance = self.plugin.return_value
instance.get_networks.return_value = return_value
params = {'limit': ['2'],
'marker': [str(_uuid())],
'sort_key': ['name'],
'sort_dir': ['asc']}
res = self.api.get(_get_path('networks'),
params=params).json
self.assertEqual(len(res['networks']), 2)
self.assertItemsEqual([id1, id2],
[res['networks'][0]['id'],
res['networks'][1]['id']])
self.assertIn('networks_links', res)
next_links = []
previous_links = []
for r in res['networks_links']:
if r['rel'] == 'next':
next_links.append(r)
if r['rel'] == 'previous':
previous_links.append(r)
self.assertEqual(len(next_links), 1)
self.assertEqual(len(previous_links), 1)
url = urlparse.urlparse(next_links[0]['href'])
self.assertEqual(url.path, _get_path('networks'))
params['marker'] = [id2]
self.assertEqual(urlparse.parse_qs(url.query), params)
url = urlparse.urlparse(previous_links[0]['href'])
self.assertEqual(url.path, _get_path('networks'))
params['marker'] = [id1]
params['page_reverse'] = ['True']
self.assertEqual(urlparse.parse_qs(url.query), params)
def test_list_pagination_with_last_page(self):
id = str(_uuid())
input_dict = {'id': id,
'name': 'net1',
'admin_state_up': True,
'status': "ACTIVE",
'tenant_id': '',
'shared': False,
'subnets': []}
return_value = [input_dict]
instance = self.plugin.return_value
instance.get_networks.return_value = return_value
params = {'limit': ['2'],
'marker': str(_uuid())}
res = self.api.get(_get_path('networks'),
params=params).json
self.assertEqual(len(res['networks']), 1)
self.assertEqual(id, res['networks'][0]['id'])
self.assertIn('networks_links', res)
previous_links = []
for r in res['networks_links']:
self.assertNotEqual(r['rel'], 'next')
if r['rel'] == 'previous':
previous_links.append(r)
self.assertEqual(len(previous_links), 1)
url = urlparse.urlparse(previous_links[0]['href'])
self.assertEqual(url.path, _get_path('networks'))
expect_params = params.copy()
expect_params['marker'] = [id]
expect_params['page_reverse'] = ['True']
self.assertEqual(urlparse.parse_qs(url.query), expect_params)
def test_list_pagination_with_empty_page(self):
return_value = []
instance = self.plugin.return_value
instance.get_networks.return_value = return_value
params = {'limit': ['2'],
'marker': str(_uuid())}
res = self.api.get(_get_path('networks'),
params=params).json
self.assertEqual(res['networks'], [])
previous_links = []
if 'networks_links' in res:
for r in res['networks_links']:
self.assertNotEqual(r['rel'], 'next')
if r['rel'] == 'previous':
previous_links.append(r)
self.assertEqual(len(previous_links), 1)
url = urlparse.urlparse(previous_links[0]['href'])
self.assertEqual(url.path, _get_path('networks'))
expect_params = params.copy()
del expect_params['marker']
expect_params['page_reverse'] = ['True']
self.assertEqual(urlparse.parse_qs(url.query), expect_params)
def test_list_pagination_reverse_with_last_page(self):
id = str(_uuid())
input_dict = {'id': id,
'name': 'net1',
'admin_state_up': True,
'status': "ACTIVE",
'tenant_id': '',
'shared': False,
'subnets': []}
return_value = [input_dict]
instance = self.plugin.return_value
instance.get_networks.return_value = return_value
params = {'limit': ['2'],
'marker': [str(_uuid())],
'page_reverse': ['True']}
res = self.api.get(_get_path('networks'),
params=params).json
self.assertEqual(len(res['networks']), 1)
self.assertEqual(id, res['networks'][0]['id'])
self.assertIn('networks_links', res)
next_links = []
for r in res['networks_links']:
self.assertNotEqual(r['rel'], 'previous')
if r['rel'] == 'next':
next_links.append(r)
self.assertEqual(len(next_links), 1)
url = urlparse.urlparse(next_links[0]['href'])
self.assertEqual(url.path, _get_path('networks'))
expected_params = params.copy()
del expected_params['page_reverse']
expected_params['marker'] = [id]
self.assertEqual(urlparse.parse_qs(url.query),
expected_params)
def test_list_pagination_reverse_with_empty_page(self):
return_value = []
instance = self.plugin.return_value
instance.get_networks.return_value = return_value
params = {'limit': ['2'],
'marker': [str(_uuid())],
'page_reverse': ['True']}
res = self.api.get(_get_path('networks'),
params=params).json
self.assertEqual(res['networks'], [])
next_links = []
if 'networks_links' in res:
for r in res['networks_links']:
self.assertNotEqual(r['rel'], 'previous')
if r['rel'] == 'next':
next_links.append(r)
self.assertEqual(len(next_links), 1)
url = urlparse.urlparse(next_links[0]['href'])
self.assertEqual(url.path, _get_path('networks'))
expect_params = params.copy()
del expect_params['marker']
del expect_params['page_reverse']
self.assertEqual(urlparse.parse_qs(url.query), expect_params)
def test_create(self):
net_id = _uuid()
data = {'network': {'name': 'net1', 'admin_state_up': True,
@ -698,6 +1127,7 @@ class SubresourceTest(unittest.TestCase):
self._plugin_patcher.stop()
self.api = None
self.plugin = None
router.SUB_RESOURCES = {}
cfg.CONF.reset()
def test_index_sub_resource(self):
@ -970,45 +1400,46 @@ class TestSubresourcePlugin():
return
class FieldsTestCase(unittest.TestCase):
def test_with_fields(self):
class ListArgsTestCase(unittest.TestCase):
def test_list_args(self):
path = '/?fields=4&foo=3&fields=2&bar=1'
request = webob.Request.blank(path)
expect_val = ['2', '4']
actual_val = base._fields(request)
actual_val = api_common.list_args(request, 'fields')
self.assertItemsEqual(actual_val, expect_val)
def test_without_fields(self):
def test_list_args_with_empty(self):
path = '/?foo=4&bar=3&baz=2&qux=1'
request = webob.Request.blank(path)
self.assertListEqual([], base._fields(request))
self.assertEqual([], api_common.list_args(request, 'fields'))
class FiltersTestCase(unittest.TestCase):
def test_all_fields(self):
def test_all_skip_args(self):
path = '/?fields=4&fields=3&fields=2&fields=1'
request = webob.Request.blank(path)
self.assertDictEqual({}, base._filters(request, None))
self.assertEqual({}, api_common.get_filters(request, None,
["fields"]))
def test_blank_values(self):
path = '/?foo=&bar=&baz=&qux='
request = webob.Request.blank(path)
self.assertDictEqual({}, base._filters(request, {}))
self.assertEqual({}, api_common.get_filters(request, {}))
def test_no_attr_info(self):
path = '/?foo=4&bar=3&baz=2&qux=1'
request = webob.Request.blank(path)
expect_val = {'foo': ['4'], 'bar': ['3'], 'baz': ['2'], 'qux': ['1']}
actual_val = base._filters(request, {})
self.assertDictEqual(actual_val, expect_val)
actual_val = api_common.get_filters(request, {})
self.assertEqual(actual_val, expect_val)
def test_attr_info_without_conversion(self):
path = '/?foo=4&bar=3&baz=2&qux=1'
request = webob.Request.blank(path)
attr_info = {'foo': {'key': 'val'}}
expect_val = {'foo': ['4'], 'bar': ['3'], 'baz': ['2'], 'qux': ['1']}
actual_val = base._filters(request, attr_info)
self.assertDictEqual(actual_val, expect_val)
actual_val = api_common.get_filters(request, attr_info)
self.assertEqual(actual_val, expect_val)
def test_attr_info_with_convert_list_to(self):
path = '/?foo=key=4&bar=3&foo=key=2&qux=1'
@ -1019,16 +1450,16 @@ class FiltersTestCase(unittest.TestCase):
}
}
expect_val = {'foo': {'key': ['2', '4']}, 'bar': ['3'], 'qux': ['1']}
actual_val = base._filters(request, attr_info)
self.assertDictEqual(actual_val, expect_val)
actual_val = api_common.get_filters(request, attr_info)
self.assertEqual(actual_val, expect_val)
def test_attr_info_with_convert_to(self):
path = '/?foo=4&bar=3&baz=2&qux=1'
request = webob.Request.blank(path)
attr_info = {'foo': {'convert_to': attributes.convert_to_int}}
expect_val = {'foo': [4], 'bar': ['3'], 'baz': ['2'], 'qux': ['1']}
actual_val = base._filters(request, attr_info)
self.assertDictEqual(actual_val, expect_val)
actual_val = api_common.get_filters(request, attr_info)
self.assertEqual(actual_val, expect_val)
class CreateResourceTestCase(unittest.TestCase):

View File

@ -28,6 +28,7 @@ import unittest2
import webob.exc
import quantum
from quantum.api import api_common
from quantum.api.extensions import PluginAwareExtensionManager
from quantum.api.v2 import attributes
from quantum.api.v2.attributes import ATTR_NOT_SPECIFIED
@ -59,6 +60,14 @@ def etcdir(*p):
return os.path.join(ETCDIR, *p)
def _fake_get_pagination_helper(self, request):
return api_common.PaginationEmulatedHelper(request, self._primary_key)
def _fake_get_sorting_helper(self, request):
return api_common.SortingEmulatedHelper(request, self._attr_info)
class QuantumDbPluginV2TestCase(testlib_api.WebTestCase):
fmt = 'json'
@ -97,6 +106,8 @@ class QuantumDbPluginV2TestCase(testlib_api.WebTestCase):
cfg.CONF.set_override('base_mac', "12:34:56:78:90:ab")
cfg.CONF.set_override('max_dns_nameservers', 2)
cfg.CONF.set_override('max_subnet_host_routes', 2)
cfg.CONF.set_override('allow_pagination', True)
cfg.CONF.set_override('allow_sorting', True)
self.api = APIRouter()
# Set the defualt port status
self.port_create_status = 'ACTIVE'
@ -110,6 +121,26 @@ class QuantumDbPluginV2TestCase(testlib_api.WebTestCase):
self._skip_native_bulk = not _is_native_bulk_supported()
def _is_native_pagination_support():
native_pagination_attr_name = (
"_%s__native_pagination_support" %
QuantumManager.get_plugin().__class__.__name__)
return (cfg.CONF.allow_pagination and
getattr(QuantumManager.get_plugin(),
native_pagination_attr_name, False))
self._skip_native_pagination = not _is_native_pagination_support()
def _is_native_sorting_support():
native_sorting_attr_name = (
"_%s__native_sorting_support" %
QuantumManager.get_plugin().__class__.__name__)
return (cfg.CONF.allow_sorting and
getattr(QuantumManager.get_plugin(),
native_sorting_attr_name, False))
self._skip_native_sorting = not _is_native_sorting_support()
ext_mgr = test_config.get('extension_manager', None)
if ext_mgr:
self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
@ -119,6 +150,8 @@ class QuantumDbPluginV2TestCase(testlib_api.WebTestCase):
self.api = None
self._deserializers = None
self._skip_native_bulk = None
self._skip_native_pagination = None
self._skip_native_sortin = None
self.ext_api = None
# NOTE(jkoelker) for a 'pluggable' framework, Quantum sure
# doesn't like when the plugin changes ;)
@ -436,6 +469,7 @@ class QuantumDbPluginV2TestCase(testlib_api.WebTestCase):
res = self._list('%ss' % resource,
quantum_context=quantum_context,
query_params=query_params)
resource = resource.replace('-', '_')
self.assertItemsEqual([i['id'] for i in res['%ss' % resource]],
[i[resource]['id'] for i in items])
@ -504,6 +538,89 @@ class QuantumDbPluginV2TestCase(testlib_api.WebTestCase):
if not no_delete:
self._delete('ports', port['port']['id'])
def _test_list_with_sort(self, collection, items, sorts, query_params=''):
query_str = query_params
for key, direction in sorts:
query_str = query_str + "&sort_key=%s&sort_dir=%s" % (key,
direction)
req = self.new_list_request('%ss' % collection,
params=query_str)
api = self._api_for_resource('%ss' % collection)
res = self.deserialize(self.fmt, req.get_response(api))
collection = collection.replace('-', '_')
expected_res = [item[collection]['id'] for item in items]
self.assertListEqual([n['id'] for n in res["%ss" % collection]],
expected_res)
def _test_list_with_pagination(self, collection, items, sort,
limit, expected_page_num, query_params='',
verify_key='id'):
if self.fmt == 'xml':
self.skipTest("Skip xml test for pagination")
query_str = query_params + '&' if query_params else ''
query_str = query_str + ("limit=%s&sort_key=%s&"
"sort_dir=%s") % (limit, sort[0], sort[1])
req = self.new_list_request("%ss" % collection, params=query_str)
items_res = []
page_num = 0
api = self._api_for_resource('%ss' % collection)
collection = collection.replace('-', '_')
while req:
page_num = page_num + 1
res = self.deserialize(self.fmt, req.get_response(api))
self.assertLessEqual(len(res["%ss" % collection]), limit)
items_res = items_res + res["%ss" % collection]
req = None
if '%ss_links' % collection in res:
for link in res['%ss_links' % collection]:
if link['rel'] == 'next':
content_type = 'application/%s' % self.fmt
req = testlib_api.create_request(link['href'],
'', content_type)
self.assertEqual(len(res["%ss" % collection]),
limit)
self.assertEqual(page_num, expected_page_num)
self.assertListEqual([n[verify_key] for n in items_res],
[item[collection][verify_key] for item in items])
def _test_list_with_pagination_reverse(self, collection, items, sort,
limit, expected_page_num,
query_params=''):
if self.fmt == 'xml':
self.skipTest("Skip xml test for pagination")
resources = '%ss' % collection
collection = collection.replace('-', '_')
api = self._api_for_resource(resources)
marker = items[-1][collection]['id']
query_str = query_params + '&' if query_params else ''
query_str = query_str + ("limit=%s&page_reverse=True&"
"sort_key=%s&sort_dir=%s&"
"marker=%s") % (limit, sort[0], sort[1],
marker)
req = self.new_list_request(resources, params=query_str)
item_res = [items[-1][collection]]
page_num = 0
while req:
page_num = page_num + 1
res = self.deserialize(self.fmt, req.get_response(api))
self.assertLessEqual(len(res["%ss" % collection]), limit)
res["%ss" % collection].reverse()
item_res = item_res + res["%ss" % collection]
req = None
if '%ss_links' % collection in res:
for link in res['%ss_links' % collection]:
if link['rel'] == 'previous':
content_type = 'application/%s' % self.fmt
req = testlib_api.create_request(link['href'],
'', content_type)
self.assertEqual(len(res["%ss" % collection]),
limit)
self.assertEqual(page_num, expected_page_num)
expected_res = [item[collection]['id'] for item in items]
expected_res.reverse()
self.assertListEqual([n['id'] for n in item_res],
expected_res)
class TestBasicGet(QuantumDbPluginV2TestCase):
@ -799,6 +916,101 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
self._test_list_resources('port', [port2],
quantum_context=q_context)
def test_list_ports_with_sort_native(self):
if self._skip_native_sorting:
self.skipTest("Skip test for not implemented sorting feature")
cfg.CONF.set_default('allow_overlapping_ips', True)
with contextlib.nested(self.port(admin_state_up='True',
mac_address='00:00:00:00:00:01'),
self.port(admin_state_up='False',
mac_address='00:00:00:00:00:02'),
self.port(admin_state_up='False',
mac_address='00:00:00:00:00:03')
) as (port1, port2, port3):
self._test_list_with_sort('port', (port3, port2, port1),
[('admin_state_up', 'asc'),
('mac_address', 'desc')])
def test_list_ports_with_sort_emulated(self):
helper_patcher = mock.patch(
'quantum.api.v2.base.Controller._get_sorting_helper',
new=_fake_get_sorting_helper)
helper_patcher.start()
try:
cfg.CONF.set_default('allow_overlapping_ips', True)
with contextlib.nested(self.port(admin_state_up='True',
mac_address='00:00:00:00:00:01'),
self.port(admin_state_up='False',
mac_address='00:00:00:00:00:02'),
self.port(admin_state_up='False',
mac_address='00:00:00:00:00:03')
) as (port1, port2, port3):
self._test_list_with_sort('port', (port3, port2, port1),
[('admin_state_up', 'asc'),
('mac_address', 'desc')])
finally:
helper_patcher.stop()
def test_list_ports_with_pagination_native(self):
if self._skip_native_pagination:
self.skipTest("Skip test for not implemented pagination feature")
cfg.CONF.set_default('allow_overlapping_ips', True)
with contextlib.nested(self.port(mac_address='00:00:00:00:00:01'),
self.port(mac_address='00:00:00:00:00:02'),
self.port(mac_address='00:00:00:00:00:03')
) as (port1, port2, port3):
self._test_list_with_pagination('port',
(port1, port2, port3),
('mac_address', 'asc'), 2, 2)
def test_list_ports_with_pagination_emulated(self):
helper_patcher = mock.patch(
'quantum.api.v2.base.Controller._get_pagination_helper',
new=_fake_get_pagination_helper)
helper_patcher.start()
try:
cfg.CONF.set_default('allow_overlapping_ips', True)
with contextlib.nested(self.port(mac_address='00:00:00:00:00:01'),
self.port(mac_address='00:00:00:00:00:02'),
self.port(mac_address='00:00:00:00:00:03')
) as (port1, port2, port3):
self._test_list_with_pagination('port',
(port1, port2, port3),
('mac_address', 'asc'), 2, 2)
finally:
helper_patcher.stop()
def test_list_ports_with_pagination_reverse_native(self):
if self._skip_native_pagination:
self.skipTest("Skip test for not implemented pagination feature")
cfg.CONF.set_default('allow_overlapping_ips', True)
with contextlib.nested(self.port(mac_address='00:00:00:00:00:01'),
self.port(mac_address='00:00:00:00:00:02'),
self.port(mac_address='00:00:00:00:00:03')
) as (port1, port2, port3):
self._test_list_with_pagination_reverse('port',
(port1, port2, port3),
('mac_address', 'asc'),
2, 2)
def test_list_ports_with_pagination_reverse_emulated(self):
helper_patcher = mock.patch(
'quantum.api.v2.base.Controller._get_pagination_helper',
new=_fake_get_pagination_helper)
helper_patcher.start()
try:
cfg.CONF.set_default('allow_overlapping_ips', True)
with contextlib.nested(self.port(mac_address='00:00:00:00:00:01'),
self.port(mac_address='00:00:00:00:00:02'),
self.port(mac_address='00:00:00:00:00:03')
) as (port1, port2, port3):
self._test_list_with_pagination_reverse('port',
(port1, port2, port3),
('mac_address', 'asc'),
2, 2)
finally:
helper_patcher.stop()
def test_show_port(self):
with self.port() as port:
req = self.new_show_request('ports', port['port']['id'], self.fmt)
@ -1769,6 +1981,158 @@ class TestNetworksV2(QuantumDbPluginV2TestCase):
self.network()) as networks:
self._test_list_resources('network', networks)
def test_list_networks_with_sort_native(self):
if self._skip_native_sorting:
self.skipTest("Skip test for not implemented sorting feature")
with contextlib.nested(self.network(admin_status_up=True,
name='net1'),
self.network(admin_status_up=False,
name='net2'),
self.network(admin_status_up=False,
name='net3')
) as (net1, net2, net3):
self._test_list_with_sort('network', (net3, net2, net1),
[('admin_state_up', 'asc'),
('name', 'desc')])
def test_list_networks_with_sort_extended_attr_native_returns_400(self):
if self._skip_native_sorting:
self.skipTest("Skip test for not implemented sorting feature")
with contextlib.nested(self.network(admin_status_up=True,
name='net1'),
self.network(admin_status_up=False,
name='net2'),
self.network(admin_status_up=False,
name='net3')
):
req = self.new_list_request(
'networks',
params='sort_key=provider:segmentation_id&sort_dir=asc')
res = req.get_response(self.api)
self.assertEqual(400, res.status_int)
def test_list_networks_with_sort_remote_key_native_returns_400(self):
if self._skip_native_sorting:
self.skipTest("Skip test for not implemented sorting feature")
with contextlib.nested(self.network(admin_status_up=True,
name='net1'),
self.network(admin_status_up=False,
name='net2'),
self.network(admin_status_up=False,
name='net3')
):
req = self.new_list_request(
'networks', params='sort_key=subnets&sort_dir=asc')
res = req.get_response(self.api)
self.assertEqual(400, res.status_int)
def test_list_networks_with_sort_emulated(self):
helper_patcher = mock.patch(
'quantum.api.v2.base.Controller._get_sorting_helper',
new=_fake_get_sorting_helper)
helper_patcher.start()
try:
with contextlib.nested(self.network(admin_status_up=True,
name='net1'),
self.network(admin_status_up=False,
name='net2'),
self.network(admin_status_up=False,
name='net3')
) as (net1, net2, net3):
self._test_list_with_sort('network', (net3, net2, net1),
[('admin_state_up', 'asc'),
('name', 'desc')])
finally:
helper_patcher.stop()
def test_list_networks_with_pagination_native(self):
if self._skip_native_pagination:
self.skipTest("Skip test for not implemented pagination feature")
with contextlib.nested(self.network(name='net1'),
self.network(name='net2'),
self.network(name='net3')
) as (net1, net2, net3):
self._test_list_with_pagination('network',
(net1, net2, net3),
('name', 'asc'), 2, 2)
def test_list_networks_with_pagination_emulated(self):
helper_patcher = mock.patch(
'quantum.api.v2.base.Controller._get_pagination_helper',
new=_fake_get_pagination_helper)
helper_patcher.start()
try:
with contextlib.nested(self.network(name='net1'),
self.network(name='net2'),
self.network(name='net3')
) as (net1, net2, net3):
self._test_list_with_pagination('network',
(net1, net2, net3),
('name', 'asc'), 2, 2)
finally:
helper_patcher.stop()
def test_list_networks_without_pk_in_fields_pagination_emulated(self):
helper_patcher = mock.patch(
'quantum.api.v2.base.Controller._get_pagination_helper',
new=_fake_get_pagination_helper)
helper_patcher.start()
try:
with contextlib.nested(self.network(name='net1',
shared=True),
self.network(name='net2',
shared=False),
self.network(name='net3',
shared=True)
) as (net1, net2, net3):
self._test_list_with_pagination('network',
(net1, net2, net3),
('name', 'asc'), 2, 2,
query_params="fields=name",
verify_key='name')
finally:
helper_patcher.stop()
def test_list_networks_without_pk_in_fields_pagination_native(self):
if self._skip_native_pagination:
self.skipTest("Skip test for not implemented pagination feature")
with contextlib.nested(self.network(name='net1'),
self.network(name='net2'),
self.network(name='net3')
) as (net1, net2, net3):
self._test_list_with_pagination('network',
(net1, net2, net3),
('name', 'asc'), 2, 2,
query_params="fields=shared",
verify_key='shared')
def test_list_networks_with_pagination_reverse_native(self):
if self._skip_native_pagination:
self.skipTest("Skip test for not implemented pagination feature")
with contextlib.nested(self.network(name='net1'),
self.network(name='net2'),
self.network(name='net3')
) as (net1, net2, net3):
self._test_list_with_pagination_reverse('network',
(net1, net2, net3),
('name', 'asc'), 2, 2)
def test_list_networks_with_pagination_reverse_emulated(self):
helper_patcher = mock.patch(
'quantum.api.v2.base.Controller._get_pagination_helper',
new=_fake_get_pagination_helper)
helper_patcher.start()
try:
with contextlib.nested(self.network(name='net1'),
self.network(name='net2'),
self.network(name='net3')
) as (net1, net2, net3):
self._test_list_with_pagination_reverse('network',
(net1, net2, net3),
('name', 'asc'), 2, 2)
finally:
helper_patcher.stop()
def test_list_networks_with_parameters(self):
with contextlib.nested(self.network(name='net1',
admin_state_up=False),
@ -2628,6 +2992,97 @@ class TestSubnetsV2(QuantumDbPluginV2TestCase):
self._test_list_resources('subnet', [],
query_params=query_params)
def test_list_subnets_with_sort_native(self):
if self._skip_native_sorting:
self.skipTest("Skip test for not implemented sorting feature")
with contextlib.nested(self.subnet(enable_dhcp=True,
cidr='10.0.0.0/24'),
self.subnet(enable_dhcp=False,
cidr='11.0.0.0/24'),
self.subnet(enable_dhcp=False,
cidr='12.0.0.0/24')
) as (subnet1, subnet2, subnet3):
self._test_list_with_sort('subnet', (subnet3, subnet2, subnet1),
[('enable_dhcp', 'asc'),
('cidr', 'desc')])
def test_list_subnets_with_sort_emulated(self):
helper_patcher = mock.patch(
'quantum.api.v2.base.Controller._get_sorting_helper',
new=_fake_get_sorting_helper)
helper_patcher.start()
try:
with contextlib.nested(self.subnet(enable_dhcp=True,
cidr='10.0.0.0/24'),
self.subnet(enable_dhcp=False,
cidr='11.0.0.0/24'),
self.subnet(enable_dhcp=False,
cidr='12.0.0.0/24')
) as (subnet1, subnet2, subnet3):
self._test_list_with_sort('subnet', (subnet3,
subnet2,
subnet1),
[('enable_dhcp', 'asc'),
('cidr', 'desc')])
finally:
helper_patcher.stop()
def test_list_subnets_with_pagination_native(self):
if self._skip_native_pagination:
self.skipTest("Skip test for not implemented sorting feature")
with contextlib.nested(self.subnet(cidr='10.0.0.0/24'),
self.subnet(cidr='11.0.0.0/24'),
self.subnet(cidr='12.0.0.0/24')
) as (subnet1, subnet2, subnet3):
self._test_list_with_pagination('subnet',
(subnet1, subnet2, subnet3),
('cidr', 'asc'), 2, 2)
def test_list_subnets_with_pagination_emulated(self):
helper_patcher = mock.patch(
'quantum.api.v2.base.Controller._get_pagination_helper',
new=_fake_get_pagination_helper)
helper_patcher.start()
try:
with contextlib.nested(self.subnet(cidr='10.0.0.0/24'),
self.subnet(cidr='11.0.0.0/24'),
self.subnet(cidr='12.0.0.0/24')
) as (subnet1, subnet2, subnet3):
self._test_list_with_pagination('subnet',
(subnet1, subnet2, subnet3),
('cidr', 'asc'), 2, 2)
finally:
helper_patcher.stop()
def test_list_subnets_with_pagination_reverse_native(self):
if self._skip_native_sorting:
self.skipTest("Skip test for not implemented sorting feature")
with contextlib.nested(self.subnet(cidr='10.0.0.0/24'),
self.subnet(cidr='11.0.0.0/24'),
self.subnet(cidr='12.0.0.0/24')
) as (subnet1, subnet2, subnet3):
self._test_list_with_pagination_reverse('subnet',
(subnet1, subnet2,
subnet3),
('cidr', 'asc'), 2, 2)
def test_list_subnets_with_pagination_reverse_emulated(self):
helper_patcher = mock.patch(
'quantum.api.v2.base.Controller._get_pagination_helper',
new=_fake_get_pagination_helper)
helper_patcher.start()
try:
with contextlib.nested(self.subnet(cidr='10.0.0.0/24'),
self.subnet(cidr='11.0.0.0/24'),
self.subnet(cidr='12.0.0.0/24')
) as (subnet1, subnet2, subnet3):
self._test_list_with_pagination_reverse('subnet',
(subnet1, subnet2,
subnet3),
('cidr', 'asc'), 2, 2)
finally:
helper_patcher.stop()
def test_invalid_ip_version(self):
with self.network() as network:
data = {'subnet': {'network_id': network['network']['id'],

View File

@ -170,6 +170,9 @@ class SecurityGroupTestPlugin(db_base_plugin_v2.QuantumDbPluginV2,
associating ports with security groups.
"""
__native_pagination_support = True
__native_sorting_support = True
supported_extension_aliases = ["security-group"]
def create_port(self, context, port):
@ -208,9 +211,12 @@ class SecurityGroupTestPlugin(db_base_plugin_v2.QuantumDbPluginV2,
return super(SecurityGroupTestPlugin, self).create_network(context,
network)
def get_ports(self, context, filters=None, fields=None):
def get_ports(self, context, filters=None, fields=None,
sorts=[], limit=None, marker=None,
page_reverse=False):
quantum_lports = super(SecurityGroupTestPlugin, self).get_ports(
context, filters)
context, filters, sorts=sorts, limit=limit, marker=marker,
page_reverse=page_reverse)
for quantum_lport in quantum_lports:
self._extend_port_dict_security_group(context, quantum_lport)
return quantum_lports
@ -295,17 +301,54 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
self.assertEqual(res.status_int, 409)
def test_list_security_groups(self):
name = 'webservers'
description = 'my webservers'
with self.security_group(name, description):
res = self.new_list_request('security-groups')
groups = self.deserialize(self.fmt, res.get_response(self.ext_api))
self.assertEqual(len(groups['security_groups']), 2)
for group in groups['security_groups']:
if group['name'] == 'default':
self.assertEquals(len(group['security_group_rules']), 2)
else:
self.assertEquals(len(group['security_group_rules']), 0)
with contextlib.nested(self.security_group(name='sg1',
description='sg'),
self.security_group(name='sg2',
description='sg'),
self.security_group(name='sg3',
description='sg')
) as security_groups:
self._test_list_resources('security-group',
security_groups,
query_params='description=sg')
def test_list_security_groups_with_sort(self):
with contextlib.nested(self.security_group(name='sg1',
description='sg'),
self.security_group(name='sg2',
description='sg'),
self.security_group(name='sg3',
description='sg')
) as (sg1, sg2, sg3):
self._test_list_with_sort('security-group',
(sg3, sg2, sg1),
[('name', 'desc')],
query_params='description=sg')
def test_list_security_groups_with_pagination(self):
with contextlib.nested(self.security_group(name='sg1',
description='sg'),
self.security_group(name='sg2',
description='sg'),
self.security_group(name='sg3',
description='sg')
) as (sg1, sg2, sg3):
self._test_list_with_pagination('security-group',
(sg1, sg2, sg3),
('name', 'asc'), 2, 2,
query_params='description=sg')
def test_list_security_groups_with_pagination_reverse(self):
with contextlib.nested(self.security_group(name='sg1',
description='sg'),
self.security_group(name='sg2',
description='sg'),
self.security_group(name='sg3',
description='sg')
) as (sg1, sg2, sg3):
self._test_list_with_pagination_reverse(
'security-group', (sg1, sg2, sg3), ('name', 'asc'), 2, 2,
query_params='description=sg')
def test_create_security_group_rule_ethertype_invalid_as_number(self):
name = 'webservers'
@ -673,6 +716,89 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
self.assertEquals(len(port[ext_sg.SECURITYGROUPS]), 1)
self._delete('ports', port['id'])
def test_list_security_group_rules(self):
with self.security_group(name='sg') as sg:
security_group_id = sg['security_group']['id']
with contextlib.nested(self.security_group_rule(security_group_id,
direction='egress',
port_range_min=22,
port_range_max=22),
self.security_group_rule(security_group_id,
direction='egress',
port_range_min=23,
port_range_max=23),
self.security_group_rule(security_group_id,
direction='egress',
port_range_min=24,
port_range_max=24)
) as (sgr1, sgr2, sgr3):
self._test_list_resources('security-group-rule',
[sgr1, sgr2, sgr3],
query_params="direction=egress")
def test_list_security_group_rules_with_sort(self):
with self.security_group(name='sg') as sg:
security_group_id = sg['security_group']['id']
with contextlib.nested(self.security_group_rule(security_group_id,
direction='egress',
port_range_min=22,
port_range_max=22),
self.security_group_rule(security_group_id,
direction='egress',
port_range_min=23,
port_range_max=23),
self.security_group_rule(security_group_id,
direction='egress',
port_range_min=24,
port_range_max=24)
) as (sgr1, sgr2, sgr3):
self._test_list_with_sort('security-group-rule',
(sgr3, sgr2, sgr1),
[('port_range_max', 'desc')],
query_params='direction=egress')
def test_list_security_group_rules_with_pagination(self):
with self.security_group(name='sg') as sg:
security_group_id = sg['security_group']['id']
with contextlib.nested(self.security_group_rule(security_group_id,
direction='egress',
port_range_min=22,
port_range_max=22),
self.security_group_rule(security_group_id,
direction='egress',
port_range_min=23,
port_range_max=23),
self.security_group_rule(security_group_id,
direction='egress',
port_range_min=24,
port_range_max=24)
) as (sgr1, sgr2, sgr3):
self._test_list_with_pagination(
'security-group-rule', (sgr3, sgr2, sgr1),
('port_range_max', 'desc'), 2, 2,
query_params='direction=egress')
def test_list_security_group_rules_with_pagination_reverse(self):
with self.security_group(name='sg') as sg:
security_group_id = sg['security_group']['id']
with contextlib.nested(self.security_group_rule(security_group_id,
direction='egress',
port_range_min=22,
port_range_max=22),
self.security_group_rule(security_group_id,
direction='egress',
port_range_min=23,
port_range_max=23),
self.security_group_rule(security_group_id,
direction='egress',
port_range_min=24,
port_range_max=24)
) as (sgr1, sgr2, sgr3):
self._test_list_with_pagination_reverse(
'security-group-rule', (sgr3, sgr2, sgr1),
('port_range_max', 'desc'), 2, 2,
query_params='direction=egress')
def test_update_port_with_security_group(self):
with self.network() as n:
with self.subnet(n):

View File

@ -39,7 +39,7 @@ from quantum.db import l3_db
from quantum.db import l3_rpc_agent_api
from quantum.db import models_v2
from quantum.extensions import l3
from quantum import manager
from quantum.manager import QuantumManager
from quantum.openstack.common import log as logging
from quantum.openstack.common.notifier import api as notifier_api
from quantum.openstack.common.notifier import test_notifier
@ -75,7 +75,7 @@ class L3NatExtensionTestCase(testlib_api.WebTestCase):
plugin = 'quantum.extensions.l3.RouterPluginBase'
# Ensure 'stale' patched copies of the plugin are never returned
manager.QuantumManager._instance = None
QuantumManager._instance = None
# Ensure existing ExtensionManager is not used
extensions.PluginAwareExtensionManager._instance = None
@ -91,13 +91,17 @@ class L3NatExtensionTestCase(testlib_api.WebTestCase):
# Update the plugin and extensions path
cfg.CONF.set_override('core_plugin', plugin)
cfg.CONF.set_override('allow_pagination', True)
cfg.CONF.set_override('allow_sorting', True)
self._plugin_patcher = mock.patch(plugin, autospec=True)
self.plugin = self._plugin_patcher.start()
instances = self.plugin.return_value
instances._RouterPluginBase__native_pagination_support = True
instances._RouterPluginBase__native_sorting_support = True
# Instantiate mock plugin and enable the 'router' extension
manager.QuantumManager.get_plugin().supported_extension_aliases = (
QuantumManager.get_plugin().supported_extension_aliases = (
["router"])
ext_mgr = L3TestExtensionManager()
self.ext_mdw = test_extensions.setup_extensions_middleware(ext_mgr)
self.api = webtest.TestApp(self.ext_mdw)
@ -147,7 +151,11 @@ class L3NatExtensionTestCase(testlib_api.WebTestCase):
res = self.api.get(_get_path('routers', fmt=self.fmt))
instance.get_routers.assert_called_with(mock.ANY, fields=mock.ANY,
filters=mock.ANY)
filters=mock.ANY,
sorts=mock.ANY,
limit=mock.ANY,
marker=mock.ANY,
page_reverse=mock.ANY)
self.assertEqual(res.status_int, exc.HTTPOk.code)
res = self.deserialize(res)
self.assertTrue('routers' in res)
@ -242,6 +250,10 @@ class L3NatExtensionTestCaseXML(L3NatExtensionTestCase):
# This plugin class is just for testing
class TestL3NatPlugin(db_base_plugin_v2.QuantumDbPluginV2,
l3_db.L3_NAT_db_mixin):
__native_pagination_support = True
__native_sorting_support = True
supported_extension_aliases = ["router"]
def create_network(self, context, network):
@ -273,9 +285,12 @@ class TestL3NatPlugin(db_base_plugin_v2.QuantumDbPluginV2,
self._extend_network_dict_l3(context, net)
return self._fields(net, fields)
def get_networks(self, context, filters=None, fields=None):
nets = super(TestL3NatPlugin, self).get_networks(context, filters,
None)
def get_networks(self, context, filters=None, fields=None,
sorts=[], limit=None, marker=None,
page_reverse=False):
nets = super(TestL3NatPlugin, self).get_networks(
context, filters=filters, fields=fields, sorts=sorts, limit=limit,
marker=marker, page_reverse=page_reverse)
for net in nets:
self._extend_network_dict_l3(context, net)
nets = self._filter_nets_l3(context, nets, filters)
@ -536,6 +551,33 @@ class L3NatDBTestCase(L3NatTestCaseBase):
self._test_list_resources('router', [],
query_params=query_params)
def test_router_list_with_sort(self):
with contextlib.nested(self.router(name='router1'),
self.router(name='router2'),
self.router(name='router3')
) as (router1, router2, router3):
self._test_list_with_sort('router', (router3, router2, router1),
[('name', 'desc')])
def test_router_list_with_pagination(self):
with contextlib.nested(self.router(name='router1'),
self.router(name='router2'),
self.router(name='router3')
) as (router1, router2, router3):
self._test_list_with_pagination('router',
(router1, router2, router3),
('name', 'asc'), 2, 2)
def test_router_list_with_pagination_reverse(self):
with contextlib.nested(self.router(name='router1'),
self.router(name='router2'),
self.router(name='router3')
) as (router1, router2, router3):
self._test_list_with_pagination_reverse('router',
(router1, router2,
router3),
('name', 'asc'), 2, 2)
def test_router_update(self):
rname1 = "yourrouter"
rname2 = "nachorouter"
@ -1248,6 +1290,74 @@ class L3NatDBTestCase(L3NatTestCaseBase):
uuidutils.generate_uuid(), 'iamnotnanip')
self.assertEqual(res.status_int, 400)
def test_floatingip_list_with_sort(self):
with contextlib.nested(self.subnet(cidr="10.0.0.0/24"),
self.subnet(cidr="11.0.0.0/24"),
self.subnet(cidr="12.0.0.0/24")
) as (s1, s2, s3):
network_id1 = s1['subnet']['network_id']
network_id2 = s2['subnet']['network_id']
network_id3 = s3['subnet']['network_id']
self._set_net_external(network_id1)
self._set_net_external(network_id2)
self._set_net_external(network_id3)
fp1 = self._make_floatingip(self.fmt, network_id1)
fp2 = self._make_floatingip(self.fmt, network_id2)
fp3 = self._make_floatingip(self.fmt, network_id3)
try:
self._test_list_with_sort('floatingip', (fp3, fp2, fp1),
[('floating_ip_address', 'desc')])
finally:
self._delete('floatingips', fp1['floatingip']['id'])
self._delete('floatingips', fp2['floatingip']['id'])
self._delete('floatingips', fp3['floatingip']['id'])
def test_floatingip_list_with_pagination(self):
with contextlib.nested(self.subnet(cidr="10.0.0.0/24"),
self.subnet(cidr="11.0.0.0/24"),
self.subnet(cidr="12.0.0.0/24")
) as (s1, s2, s3):
network_id1 = s1['subnet']['network_id']
network_id2 = s2['subnet']['network_id']
network_id3 = s3['subnet']['network_id']
self._set_net_external(network_id1)
self._set_net_external(network_id2)
self._set_net_external(network_id3)
fp1 = self._make_floatingip(self.fmt, network_id1)
fp2 = self._make_floatingip(self.fmt, network_id2)
fp3 = self._make_floatingip(self.fmt, network_id3)
try:
self._test_list_with_pagination(
'floatingip', (fp1, fp2, fp3),
('floating_ip_address', 'asc'), 2, 2)
finally:
self._delete('floatingips', fp1['floatingip']['id'])
self._delete('floatingips', fp2['floatingip']['id'])
self._delete('floatingips', fp3['floatingip']['id'])
def test_floatingip_list_with_pagination_reverse(self):
with contextlib.nested(self.subnet(cidr="10.0.0.0/24"),
self.subnet(cidr="11.0.0.0/24"),
self.subnet(cidr="12.0.0.0/24")
) as (s1, s2, s3):
network_id1 = s1['subnet']['network_id']
network_id2 = s2['subnet']['network_id']
network_id3 = s3['subnet']['network_id']
self._set_net_external(network_id1)
self._set_net_external(network_id2)
self._set_net_external(network_id3)
fp1 = self._make_floatingip(self.fmt, network_id1)
fp2 = self._make_floatingip(self.fmt, network_id2)
fp3 = self._make_floatingip(self.fmt, network_id3)
try:
self._test_list_with_pagination_reverse(
'floatingip', (fp1, fp2, fp3),
('floating_ip_address', 'asc'), 2, 2)
finally:
self._delete('floatingips', fp1['floatingip']['id'])
self._delete('floatingips', fp2['floatingip']['id'])
self._delete('floatingips', fp3['floatingip']['id'])
def test_floatingip_delete_router_intf_with_subnet_id_returns_409(self):
found = False
with self.floatingip_with_assoc():
@ -1291,20 +1401,20 @@ class L3NatDBTestCase(L3NatTestCaseBase):
self.assertEqual(len(body['networks']), 1)
def test_get_network_succeeds_without_filter(self):
plugin = manager.QuantumManager.get_plugin()
plugin = QuantumManager.get_plugin()
ctx = context.Context(None, None, is_admin=True)
result = plugin.get_networks(ctx, filters=None)
self.assertEqual(result, [])
def test_network_filter_hook_admin_context(self):
plugin = manager.QuantumManager.get_plugin()
plugin = QuantumManager.get_plugin()
ctx = context.Context(None, None, is_admin=True)
model = models_v2.Network
conditions = plugin._network_filter_hook(ctx, model, [])
self.assertEqual(conditions, [])
def test_network_filter_hook_nonadmin_context(self):
plugin = manager.QuantumManager.get_plugin()
plugin = QuantumManager.get_plugin()
ctx = context.Context('edinson', 'cavani')
model = models_v2.Network
txt = "externalnetworks.network_id IS NOT NULL"