Use bigger GreenPool for concurrent EC
We're getting some blockage trying to feed backup requests in waterfall EC because the pool_size was limited to the initial batch of requests. This was (un?)fortunately working out in practice because there were lots of initial primary fragment requests and some would inevitably be quick enough to make room for the pending feeder requests. But when enough of the initial requests were slow (network issue at the proxy?) we wouldn't have the expected number of pending backup requests in-flight. Since concurrent EC should never make extra requests to non-primaries (at least not until an existing primary request completes), ec_n_unique_fragments makes a reasonable cap for the pool.

Drive-bys:

* Don't make concurrent_ec_extra_requests unless you have enabled concurrent_gets.
* Improved mock_http_connect extra-requests tracking formatting.
* FakeStatus __repr__'s with the status code in AssertionErrors.

Change-Id: Iec579ed874ef097c659dc80fff1ba326b6da05e9
This commit is contained in:
parent
004052dc65
commit
5f95e1bece
|
@ -2907,9 +2907,10 @@ class ECObjectController(BaseObjectController):
|
|||
safe_iter = GreenthreadSafeIterator(node_iter)
|
||||
|
||||
policy_options = self.app.get_policy_options(policy)
|
||||
ec_request_count = policy.ec_ndata + \
|
||||
policy_options.concurrent_ec_extra_requests
|
||||
with ContextPool(ec_request_count) as pool:
|
||||
ec_request_count = policy.ec_ndata
|
||||
if policy_options.concurrent_gets:
|
||||
ec_request_count += policy_options.concurrent_ec_extra_requests
|
||||
with ContextPool(policy.ec_n_unique_fragments) as pool:
|
||||
pile = GreenAsyncPile(pool)
|
||||
buckets = ECGetResponseCollection(policy)
|
||||
node_iter.set_node_provider(buckets.provide_alternate_node)
|
||||
|
@ -2921,7 +2922,7 @@ class ECObjectController(BaseObjectController):
|
|||
self.app.logger.thread_locals)
|
||||
|
||||
feeder_q = None
|
||||
if self.app.get_policy_options(policy).concurrent_gets:
|
||||
if policy_options.concurrent_gets:
|
||||
feeder_q = Queue()
|
||||
pool.spawn(self.feed_remaining_primaries, safe_iter, pile, req,
|
||||
partition, policy, buckets, feeder_q,
|
||||
|
|
|
@ -838,6 +838,11 @@ class FakeStatus(object):
|
|||
self.expect_sleep_list.append(None)
|
||||
self.response_sleep = response_sleep
|
||||
|
||||
def __repr__(self):
|
||||
return '%s(%s, expect_status=%r, response_sleep=%s)' % (
|
||||
self.__class__.__name__, self.status,
|
||||
self.expect_status, self.response_sleep)
|
||||
|
||||
def get_response_status(self):
|
||||
if self.response_sleep is not None:
|
||||
eventlet.sleep(self.response_sleep)
|
||||
|
@ -1078,7 +1083,7 @@ def fake_http_connect(*code_iter, **kwargs):
|
|||
# the code under test may swallow the StopIteration, so by logging
|
||||
# unexpected requests here we allow the test framework to check for
|
||||
# them after the connect function has been used.
|
||||
unexpected_requests.append((args, kwargs))
|
||||
unexpected_requests.append((args, ckwargs))
|
||||
raise
|
||||
|
||||
if 'give_connect' in kwargs:
|
||||
|
@ -1142,8 +1147,8 @@ def mocked_http_conn(*args, **kwargs):
|
|||
if left_over_status:
|
||||
raise AssertionError('left over status %r' % left_over_status)
|
||||
if fake_conn.unexpected_requests:
|
||||
raise AssertionError('unexpected requests %r' %
|
||||
fake_conn.unexpected_requests)
|
||||
raise AssertionError('unexpected requests:\n%s' % '\n '.join(
|
||||
'%r' % (req,) for req in fake_conn.unexpected_requests))
|
||||
|
||||
|
||||
def make_timestamp_iter(offset=0):
|
||||
|
|
|
@ -2602,6 +2602,50 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
|
|||
self.assertEqual(len(log.requests),
|
||||
self.policy.ec_n_unique_fragments)
|
||||
|
||||
def test_ec_concurrent_GET_with_slow_leaders(self):
|
||||
segment_size = self.policy.ec_segment_size
|
||||
test_data = (b'test' * segment_size)[:-289]
|
||||
etag = md5(test_data).hexdigest()
|
||||
ec_archive_bodies = self._make_ec_archive_bodies(test_data)
|
||||
ts = self.ts()
|
||||
headers = []
|
||||
for i, body in enumerate(ec_archive_bodies):
|
||||
headers.append({
|
||||
'X-Object-Sysmeta-Ec-Etag': etag,
|
||||
'X-Object-Sysmeta-Ec-Content-Length': len(body),
|
||||
'X-Object-Sysmeta-Ec-Frag-Index':
|
||||
self.policy.get_backend_index(i),
|
||||
'X-Backend-Timestamp': ts.internal,
|
||||
'X-Timestamp': ts.normal,
|
||||
'X-Backend-Durable-Timestamp': ts.internal,
|
||||
'X-Backend-Data-Timestamp': ts.internal,
|
||||
})
|
||||
|
||||
req = swift.common.swob.Request.blank('/v1/a/c/o')
|
||||
|
||||
policy_opts = self.app.get_policy_options(self.policy)
|
||||
policy_opts.concurrent_gets = True
|
||||
policy_opts.concurrency_timeout = 0.0
|
||||
|
||||
slow_count = 4
|
||||
status_codes = ([
|
||||
FakeStatus(200, response_sleep=0.2),
|
||||
] * slow_count) + ([
|
||||
FakeStatus(200, response_sleep=0.1),
|
||||
] * (self.policy.ec_n_unique_fragments - slow_count))
|
||||
for i in range(slow_count):
|
||||
# poison the super slow requests
|
||||
ec_archive_bodies[i] = ''
|
||||
with mocked_http_conn(*status_codes, body_iter=ec_archive_bodies,
|
||||
headers=headers) as log:
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertEqual(resp.body, test_data, '%r != %r' % (
|
||||
resp.body if len(resp.body) < 60 else '%s...' % resp.body[:60],
|
||||
test_data if len(test_data) < 60 else '%s...' % test_data[:60],
|
||||
))
|
||||
self.assertEqual(len(log.requests), self.policy.ec_n_unique_fragments)
|
||||
|
||||
def test_GET_with_slow_nodes_and_failures(self):
|
||||
segment_size = self.policy.ec_segment_size
|
||||
test_data = (b'test' * segment_size)[:-289]
|
||||
|
@ -2722,6 +2766,16 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
|
|||
policy_opts = self.app.get_policy_options(self.policy)
|
||||
policy_opts.concurrent_ec_extra_requests = self.policy.ec_nparity - 1
|
||||
req = swift.common.swob.Request.blank('/v1/a/c/o')
|
||||
# w/o concurrent_gets ec_extra_requests has no effect
|
||||
status_codes = [200] * self.policy.ec_ndata
|
||||
with mocked_http_conn(*status_codes, body_iter=ec_archive_bodies,
|
||||
headers=headers) as log:
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertEqual(len(log.requests), self.policy.ec_ndata)
|
||||
self.assertEqual(resp.body, test_data)
|
||||
|
||||
policy_opts.concurrent_gets = True
|
||||
status_codes = [200] * (self.policy.object_ring.replicas - 1)
|
||||
with mocked_http_conn(*status_codes, body_iter=ec_archive_bodies,
|
||||
headers=headers) as log:
|
||||
|
|
Loading…
Reference in New Issue