Batch results per cell when doing cross-cell listing

This extends the multi_cell_list module with batching support to avoid
querying N*$limit total results when listing resources across cells.
Instead, if our total limit is over a given threshold, we should query
smaller batches in the per-cell thread until we reach the total limit
or are stopped because the sort feeder has found enough across all cells
to satisfy the requirements. In many cases, this can drop the total number
of results we load and process from N*$limit to (best case) $limit+$batch
or (usual case) $limit+(N*$batch).

Since we return a generator from our scatter-gather function, this should
mean we basically finish the scatter immediately after the first batch query
to each cell database, keeping the threads alive until they produce all the
results possible from their cell, or are terminated in the generator loop
by the master loop hitting the total_limit condition. As a result, the
checking over results that we do immediately after the scatter finishes
will no longer do anything since we start running the query code for the
first time as heapq.merge() starts hitting the generators. So, this brings
a query_wrapper() specific to the multi_cell_list code which mimics the
timeout and error-handling behavior of scatter_gather_cells(), but inline
as we process results, so that a single cell's failure does not interrupt
the merge sort.

Related-Bug: #1787977
Change-Id: Iaa4759822e70b39bd735104d03d4deec988d35a1
This commit is contained in:
Dan Smith 2018-08-16 13:52:01 -07:00
parent e6e5e48c07
commit 0a88916911
5 changed files with 393 additions and 37 deletions

View File

@ -45,9 +45,10 @@ class InstanceSortContext(multi_cell_list.RecordSortContext):
class InstanceLister(multi_cell_list.CrossCellLister):
def __init__(self, sort_keys, sort_dirs, cells=None):
def __init__(self, sort_keys, sort_dirs, cells=None, batch_size=None):
super(InstanceLister, self).__init__(
InstanceSortContext(sort_keys, sort_dirs), cells=cells)
InstanceSortContext(sort_keys, sort_dirs), cells=cells,
batch_size=batch_size)
@property
def marker_identifier(self):
@ -89,9 +90,11 @@ class InstanceLister(multi_cell_list.CrossCellLister):
# NOTE(danms): These methods are here for legacy glue reasons. We should not
# replicate these for every data type we implement.
def get_instances_sorted(ctx, filters, limit, marker, columns_to_join,
sort_keys, sort_dirs, cell_mappings=None):
sort_keys, sort_dirs, cell_mappings=None,
batch_size=None):
return InstanceLister(sort_keys, sort_dirs,
cells=cell_mappings).get_records_sorted(
cells=cell_mappings,
batch_size=batch_size).get_records_sorted(
ctx, filters, limit, marker, columns_to_join=columns_to_join)

View File

@ -13,15 +13,18 @@
import abc
import copy
import heapq
import itertools
import eventlet
import six
from oslo_log import log as logging
from nova import context
from nova import exception
LOG = logging.getLogger(__name__)
CELL_FAIL_SENTINELS = (context.did_not_respond_sentinel,
context.raised_exception_sentinel)
class RecordSortContext(object):
@ -61,12 +64,54 @@ class RecordWrapper(object):
self._db_record = db_record
def __lt__(self, other):
    """Order wrappers so that failure sentinels always sort first.

    NOTE(danms): Sentinels must bubble up in the get_records_sorted()
    feeder loop ahead of anything else, so the RecordSortContext
    implementation never sees or has to handle them. If we did not
    sort them to the top, we could return $limit results from good
    cells before noticing the failed cells, and would not properly
    report those as failed for fix-up in the higher layers.
    """
    if self._db_record in CELL_FAIL_SENTINELS:
        return True
    if other._db_record in CELL_FAIL_SENTINELS:
        return False
    # compare_records() follows cmp() semantics: -1 means self < other
    result = self._sort_ctx.compare_records(self._db_record,
                                            other._db_record)
    return result == -1
def query_wrapper(ctx, fn, *args, **kwargs):
    """This is a helper to run a query with predictable fail semantics.

    This is a generator which will mimic the scatter_gather_cells()
    behavior by honoring a timeout and catching exceptions, yielding the
    usual sentinel objects instead of raising. It wraps these in
    RecordWrapper objects, which will prioritize them to the merge sort,
    causing them to be handled by the main get_records_sorted() feeder
    loop quickly and gracefully.

    :param ctx: A cell-targeted RequestContext passed through to fn
    :param fn: A generator function called as fn(ctx, *args, **kwargs)
    :returns: Yields fn's records, or a RecordWrapper-wrapped failure
              sentinel on timeout or error.
    """
    with eventlet.timeout.Timeout(context.CELL_TIMEOUT, exception.CellTimeout):
        try:
            for record in fn(ctx, *args, **kwargs):
                yield record
        except exception.CellTimeout:
            # Here, we yield a RecordWrapper (no sort_ctx needed since
            # we won't call into the implementation's comparison routines)
            # wrapping the sentinel indicating timeout.
            yield RecordWrapper(ctx, None, context.did_not_respond_sentinel)
            # NOTE: PEP 479 -- raising StopIteration inside a generator
            # becomes RuntimeError on Python 3.7+; a plain return is the
            # correct way to end this generator.
            return
        except Exception:
            # Here, we yield a RecordWrapper (no sort_ctx needed since
            # we won't call into the implementation's comparison routines)
            # wrapping the sentinel indicating failure.
            yield RecordWrapper(ctx, None, context.raised_exception_sentinel)
            return
@six.add_metaclass(abc.ABCMeta)
class CrossCellLister(object):
"""An implementation of a cross-cell efficient lister.
@ -78,9 +123,10 @@ class CrossCellLister(object):
your data type from cell databases.
"""
def __init__(self, sort_ctx, cells=None):
def __init__(self, sort_ctx, cells=None, batch_size=None):
self.sort_ctx = sort_ctx
self.cells = cells
self.batch_size = batch_size
@property
@abc.abstractmethod
@ -238,48 +284,104 @@ class CrossCellLister(object):
# nothing. If we didn't have this clause, we'd
# pass marker=None to the query below and return a
# full unpaginated set for our cell.
return []
return
main_query_result = self.get_by_filters(
cctx, filters,
limit=limit, marker=local_marker,
**kwargs)
if local_marker_prefix:
# Per above, if we had a matching marker object, that is
# the first result we should generate.
yield RecordWrapper(cctx, self.sort_ctx,
local_marker_prefix[0])
return (RecordWrapper(cctx, self.sort_ctx, inst) for inst in
itertools.chain(local_marker_prefix, main_query_result))
# If a batch size was provided, use that as the limit per
# batch. If not, then ask for the entire $limit in a single
# batch.
batch_size = self.batch_size or limit
# NOTE(tssurya): When the below routine provides sentinels to indicate
# a timeout on a cell, we ignore that cell to avoid the crash when
# doing the merge below and continue merging the results from the 'up'
# cells.
# TODO(tssurya): Modify this to return the minimal available info from
# the down cells.
# Keep track of how many we have returned in all batches
return_count = 0
# If limit was unlimited then keep querying batches until
# we run out of results. Otherwise, query until the total count
# we have returned exceeds the limit.
while limit is None or return_count < limit:
batch_count = 0
# Do not query a full batch if it would cause our total
# to exceed the limit
if limit:
query_size = min(batch_size, limit - return_count)
else:
query_size = batch_size
# Get one batch
query_result = self.get_by_filters(
cctx, filters,
limit=query_size or None, marker=local_marker,
**kwargs)
# Yield wrapped results from the batch, counting as we go
# (to avoid traversing the list to count). Also, update our
# local_marker each time so that local_marker is the end of
# this batch in order to find the next batch.
for item in query_result:
local_marker = item[self.marker_identifier]
yield RecordWrapper(cctx, self.sort_ctx, item)
batch_count += 1
# No results means we are done for this cell
if not batch_count:
break
return_count += batch_count
LOG.debug(('Listed batch of %(batch)i results from cell '
'out of %(limit)s limit. Returned %(total)i '
'total so far.'),
{'batch': batch_count,
'total': return_count,
'limit': limit or 'no'})
# NOTE(danms): The calls to do_query() will return immediately
# with a generator. There is no point in us checking the
# results for failure or timeout since we have not actually
# run any code in do_query() until the first iteration
# below. The query_wrapper() utility handles inline
# translation of failures and timeouts to sentinels which will
# be generated and consumed just like any normal result below.
if self.cells:
results = context.scatter_gather_cells(ctx, self.cells,
context.CELL_TIMEOUT,
do_query)
query_wrapper, do_query)
else:
results = context.scatter_gather_all_cells(ctx, do_query)
for cell_uuid in list(results):
if results[cell_uuid] in (context.did_not_respond_sentinel,
context.raised_exception_sentinel):
LOG.warning("Cell %s is not responding and hence skipped "
"from the results.", cell_uuid)
results.pop(cell_uuid)
results = context.scatter_gather_all_cells(ctx,
query_wrapper, do_query)
# If a limit was provided, it was passed to the per-cell query
# routines. That means we have NUM_CELLS * limit items across
# results. So, we need to consume from that limit below and
# stop returning results.
limit = limit or 0
# stop returning results. Call that total_limit since we will
# modify it in the loop below, but do_query() above also looks
# at the original provided limit.
total_limit = limit or 0
# Generate results from heapq so we can return the inner
# instance instead of the wrapper. This is basically free
# as it works as our caller iterates the results.
for i in heapq.merge(*results.values()):
yield i._db_record
limit -= 1
if limit == 0:
feeder = heapq.merge(*results.values())
while True:
try:
item = next(feeder)
except StopIteration:
return
if item._db_record in CELL_FAIL_SENTINELS:
LOG.warning('Cell %s is not responding and hence is '
'being omitted from the results',
item.cell_uuid)
continue
yield item._db_record
total_limit -= 1
if total_limit == 0:
# We'll only hit this if limit was nonzero and we just
# generated our last one
return

View File

@ -146,6 +146,15 @@ class InstanceListTestCase(test.TestCase):
self.assertEqual(sorted(uuids), uuids)
self.assertEqual(len(self.instances), len(uuids))
def test_get_sorted_with_large_limit_batched(self):
    """A limit far larger than the data, with batching, loses nothing."""
    found = instance_list.get_instances_sorted(self.context, {},
                                               5000, None,
                                               [], ['uuid'], ['asc'],
                                               batch_size=2)
    found_uuids = [rec['uuid'] for rec in found]
    # Results must come back in sorted order, and batching must not
    # drop or duplicate any instances.
    self.assertEqual(sorted(found_uuids), found_uuids)
    self.assertEqual(len(self.instances), len(found_uuids))
def _test_get_sorted_with_limit_marker(self, sort_by, pages=2, pagesize=2,
sort_dir='asc'):
"""Get multiple pages by a sort key and validate the results.

View File

@ -145,15 +145,17 @@ class TestInstanceList(test.NoDBTestCase):
# storing the uuids of the instances from the up cell
uuid_initial = [inst['uuid'] for inst in inst_cell0]
def wrap(thing):
return multi_cell_list.RecordWrapper(ctx, self.context, thing)
ctx = nova_context.RequestContext()
instances = (multi_cell_list.RecordWrapper(ctx, self.context, inst)
for inst in inst_cell0)
instances = [wrap(inst) for inst in inst_cell0]
# creating one up cell and two down cells
ret_val = {}
ret_val[uuids.cell0] = instances
ret_val[uuids.cell1] = nova_context.raised_exception_sentinel
ret_val[uuids.cell2] = nova_context.did_not_respond_sentinel
ret_val[uuids.cell1] = [wrap(nova_context.raised_exception_sentinel)]
ret_val[uuids.cell2] = [wrap(nova_context.did_not_respond_sentinel)]
mock_sg.return_value = ret_val
res = instance_list.get_instances_sorted(self.context, {}, None, None,

View File

@ -10,10 +10,15 @@
# License for the specific language governing permissions and limitations
# under the License.
from contextlib import contextmanager
import copy
import datetime
import mock
from nova.compute import multi_cell_list
from nova import context
from nova import exception
from nova import objects
from nova import test
from nova.tests import uuidsentinel as uuids
@ -103,3 +108,238 @@ class TestUtils(test.NoDBTestCase):
# Make sure we can tell which cell a request came from
self.assertEqual(uuids.cell, iw1.cell_uuid)
def test_wrapper_sentinels(self):
    """Wrappers holding failure sentinels sort ahead of real records."""
    record = {'key0': 'foo', 'key1': 'd', 'key2': 456}
    ctx = context.RequestContext()
    ctx.cell_uuid = uuids.cell
    sort_ctx = multi_cell_list.RecordSortContext(['key0', 'key1'],
                                                 ['asc', 'asc'])
    real = multi_cell_list.RecordWrapper(ctx, sort_ctx, record)
    # Wrappers with sentinels
    timed_out = multi_cell_list.RecordWrapper(
        ctx, sort_ctx, context.did_not_respond_sentinel)
    errored = multi_cell_list.RecordWrapper(
        ctx, sort_ctx, context.raised_exception_sentinel)
    # NOTE(danms): The sentinel wrappers always win
    for sentinel in (timed_out, errored):
        self.assertTrue(sentinel < real)
        self.assertFalse(real < sentinel)
    # NOTE(danms): Comparing two wrappers with sentinels will always return
    # True for less-than because we're just naive about always favoring the
    # left hand side. This is fine for our purposes but put it here to make
    # it explicit.
    self.assertTrue(timed_out < errored)
    self.assertTrue(errored < timed_out)
def test_query_wrapper_success(self):
    """Records from a healthy query generator pass through unchanged."""
    def feeder(ctx, data):
        for item in data:
            yield item
    result = list(multi_cell_list.query_wrapper(None, feeder, [1, 2, 3]))
    self.assertEqual([1, 2, 3], result)
def test_query_wrapper_timeout(self):
    """A CellTimeout from the query yields the no-response sentinel."""
    def timing_out(ctx):
        raise exception.CellTimeout
    records = [wrapper._db_record
               for wrapper in multi_cell_list.query_wrapper(
                   mock.MagicMock(), timing_out)]
    self.assertEqual([context.did_not_respond_sentinel], records)
def test_query_wrapper_fail(self):
    """An arbitrary exception from the query yields the failure sentinel."""
    # NOTE: the inner function must NOT be named "test" -- that would
    # shadow the imported nova "test" module in the enclosing scope, so
    # "test.TestingException" would actually raise AttributeError (looked
    # up on the function object) rather than the intended exception. The
    # sentinel happens to be the same either way, but the test would be
    # passing for the wrong reason.
    def failing(ctx):
        raise test.TestingException
    records = [wrapper._db_record
               for wrapper in multi_cell_list.query_wrapper(
                   mock.MagicMock(), failing)]
    self.assertEqual([context.raised_exception_sentinel], records)
class TestListContext(multi_cell_list.RecordSortContext):
    """A degenerate sort context that always prefers the left record.

    Always answering -1 makes the merge consume records from one cell's
    generator ahead of the others, giving the batching tests a
    deterministic consumption pattern to assert against.
    """
    def compare_records(self, rec1, rec2):
        return -1
class TestLister(multi_cell_list.CrossCellLister):
    """A CrossCellLister over an in-memory list, with call accounting.

    Each get_by_filters() call is recorded per cell (with the limit that
    was requested) so tests can assert how batching distributed the
    queries across cells.
    """

    def __init__(self, data, sort_keys, sort_dirs,
                 cells=None, batch_size=None):
        self._data = data
        # Maps cell_uuid -> {method_name: [limit, limit, ...]}
        self._count_by_cell = {}
        super(TestLister, self).__init__(TestListContext(sort_keys, sort_dirs),
                                         cells=cells, batch_size=batch_size)

    @property
    def marker_identifier(self):
        return 'id'

    def _method_called(self, ctx, method, limit):
        # Record one call of $method for this cell, remembering the limit
        per_cell = self._count_by_cell.setdefault(ctx.cell_uuid, {})
        per_cell.setdefault(method, []).append(limit)

    def call_summary(self, method):
        """Summarize the recorded calls to $method across all cells.

        :returns: A dict with 'total' (number of calls overall) and three
                  sorted per-cell lists: 'count_by_cell' (calls per cell),
                  'limit_by_cell' (limits used in each cell's calls) and
                  'total_by_cell' (sum of limits requested per cell).
        """
        results = {
            'total': 0,
            'count_by_cell': [],
            'limit_by_cell': [],
            'total_by_cell': [],
        }
        for cell in self._count_by_cell:
            limits = self._count_by_cell[cell][method]
            results['total'] += len(limits)
            # Number of calls made to this cell
            results['count_by_cell'].append(len(limits))
            # Limits used in calls to this cell
            results['limit_by_cell'].append(limits)
            # Total results requested from this cell
            results['total_by_cell'].append(sum(limits))
        for key in ('count_by_cell', 'limit_by_cell', 'total_by_cell'):
            results[key].sort()
        return results

    def get_marker_record(self, ctx, marker):
        # Marker support is not exercised by these tests
        pass

    def get_marker_by_values(self, ctx, values):
        # Marker support is not exercised by these tests
        pass

    def get_by_filters(self, ctx, filters, limit, marker, **kwargs):
        self._method_called(ctx, 'get_by_filters', limit)
        # Consume up to $limit records from the shared pool; note the
        # pool is shared across cells, so earlier callers drain it first.
        batch, self._data = self._data[:limit], self._data[limit:]
        return batch
@contextmanager
def target_cell_cheater(context, target_cell):
    # In order to help us do accounting, we need to mimic the real
    # behavior where at least cell_uuid gets set on the context, which
    # doesn't happen in the simple test fixture. We deep-copy so the
    # caller's context object is left untouched.
    cheated = copy.deepcopy(context)
    cheated.cell_uuid = target_cell.uuid
    yield cheated
@mock.patch('nova.context.target_cell', new=target_cell_cheater)
class TestBatching(test.NoDBTestCase):
    """Validate how CrossCellLister batches per-cell queries."""
    def setUp(self):
        super(TestBatching, self).setUp()
        # 1000 records shared across 10 cells
        self._data = [{'id': 'foo-%i' % i}
                      for i in range(0, 1000)]
        self._cells = [objects.CellMapping(uuid=getattr(uuids, 'cell%i' % i),
                                           name='cell%i' % i)
                       for i in range(0, 10)]

    def test_batches_not_needed(self):
        """A limit below batch_size needs only one query per cell."""
        lister = TestLister(self._data, [], [],
                            cells=self._cells, batch_size=10)
        ctx = context.RequestContext()
        res = list(lister.get_records_sorted(ctx, {}, 5, None))
        self.assertEqual(5, len(res))
        summary = lister.call_summary('get_by_filters')
        # We only needed one batch per cell to hit the total,
        # so we should have the same number of calls as cells
        self.assertEqual(len(self._cells), summary['total'])
        # One call per cell, hitting all cells
        self.assertEqual(len(self._cells), len(summary['count_by_cell']))
        self.assertTrue(all([
            cell_count == 1 for cell_count in summary['count_by_cell']]))

    def test_batches(self):
        lister = TestLister(self._data, [], [],
                            cells=self._cells, batch_size=10)
        ctx = context.RequestContext()
        res = list(lister.get_records_sorted(ctx, {}, 500, None))
        self.assertEqual(500, len(res))
        summary = lister.call_summary('get_by_filters')
        # Since we got everything from one cell (due to how things are
        # sorting) we should have made 500 // 10 calls to one cell, and
        # 1 call to the rest.
        # NOTE: use floor division here; under Python 3 "500 / 10" is the
        # float 50.0, which only passed by accident (50.0 == 50). Keep
        # this consistent with the "500 // 10" below.
        calls_expected = [1 for cell in self._cells[1:]] + [500 // 10]
        self.assertEqual(calls_expected, summary['count_by_cell'])
        # Since we got everything from one cell (due to how things are
        # sorting) we should have received 500 from one cell and 10 from
        # the rest
        count_expected = [10 for cell in self._cells[1:]] + [500]
        self.assertEqual(count_expected, summary['total_by_cell'])
        # Since we got everything from one cell (due to how things are
        # sorting) we should have a bunch of calls for batches of 10, one
        # each for every cell except the one that served the bulk of the
        # requests which should have 500 // 10 batches of 10.
        limit_expected = ([[10] for cell in self._cells[1:]] +
                          [[10 for i in range(0, 500 // 10)]])
        self.assertEqual(limit_expected, summary['limit_by_cell'])

    def test_no_batches(self):
        """With no batch_size, each cell is asked for the full limit once."""
        lister = TestLister(self._data, [], [],
                            cells=self._cells)
        ctx = context.RequestContext()
        res = list(lister.get_records_sorted(ctx, {}, 50, None))
        self.assertEqual(50, len(res))
        summary = lister.call_summary('get_by_filters')
        # Since we used no batches we should have one call per cell
        calls_expected = [1 for cell in self._cells]
        self.assertEqual(calls_expected, summary['count_by_cell'])
        # Since we used no batches, each cell should have returned 50
        # results
        count_expected = [50 for cell in self._cells]
        self.assertEqual(count_expected, summary['total_by_cell'])
        # Since we used no batches, each cell call should be for $limit
        limit_expected = [[count] for count in count_expected]
        self.assertEqual(limit_expected, summary['limit_by_cell'])
class FailureLister(TestLister):
    """A TestLister whose get_by_filters() fails on a scripted schedule.

    The schedule is consumed with pop() -- i.e. from the END of the
    _fails list -- so the actual outcome order is the reverse of how
    the list reads: success, success, exception, success, timeout.
    """
    def __init__(self, *a, **k):
        super(FailureLister, self).__init__(*a, **k)
        # None entries mean "behave normally"; sentinel entries select
        # the corresponding failure mode for that call.
        self._fails = [context.did_not_respond_sentinel,
                       None,
                       context.raised_exception_sentinel,
                       None,
                       None]

    def get_by_filters(self, *a, **k):
        action = self._fails.pop()
        if action == context.did_not_respond_sentinel:
            raise exception.CellTimeout
        if action == context.raised_exception_sentinel:
            raise test.TestingException
        return super(FailureLister, self).get_by_filters(*a, **k)
@mock.patch('nova.context.target_cell', new=target_cell_cheater)
class TestBaseClass(test.NoDBTestCase):
    def test_with_failing_cells(self):
        """Good cells still satisfy the limit when some cells fail."""
        records = [{'id': 'foo-%i' % i} for i in range(0, 100)]
        cell_mappings = [
            objects.CellMapping(uuid=getattr(uuids, 'cell%i' % i),
                                name='cell%i' % i)
            for i in range(0, 5)]
        # Two of the five cells will fail, one with timeout and one
        # with an error
        lister = FailureLister(records, [], [], cells=cell_mappings)
        req_ctx = context.RequestContext()
        result = lister.get_records_sorted(req_ctx, {}, 50, None)
        # We should still have 50 results since there are enough from the
        # good cells to fill our limit.
        self.assertEqual(50, len(list(result)))