Merge "Replace all "with Chunk*Timeout" by a watchdog"

This commit is contained in:
Zuul 2020-04-04 07:53:26 +00:00 committed by Gerrit Code Review
commit 7158adfb22
6 changed files with 267 additions and 36 deletions

View File

@ -62,6 +62,7 @@ import eventlet.patcher
import eventlet.semaphore
import pkg_resources
from eventlet import GreenPool, sleep, Timeout
from eventlet.event import Event
from eventlet.green import socket, threading
import eventlet.hubs
import eventlet.queue
@ -5847,3 +5848,132 @@ def systemd_notify(logger=None):
except EnvironmentError:
if logger:
logger.debug("Systemd notification failed", exc_info=True)
class Watchdog(object):
    """
    Implements a watchdog to efficiently manage concurrent timeouts.

    Compared to eventlet.timeout.Timeout, it reduces the number of context
    switches in eventlet by avoiding scheduling actions (throwing an
    exception) and then unscheduling them when the timeouts are cancelled.

    1. at T+0, request timeout(10)
        => the watchdog greenlet sleeps 10 seconds
    2. at T+1, request timeout(15)
        => the timeout will expire after the current one, no need to wake
           up the watchdog greenlet
    3. at T+2, request timeout(5)
        => the timeout will expire before the first timeout, wake up the
           watchdog greenlet to calculate a new sleep period
    4. at T+7, the 3rd timeout expires
        => the exception is raised, then the watchdog greenlet sleeps 3
           seconds to wake up for the 1st timeout expiration
    """

    def __init__(self):
        # key => (timeout, timeout_at, caller_greenthread, exception)
        self._timeouts = dict()
        # Signalled by start() to interrupt the wait done in _run()
        self._evt = Event()
        # Earliest pending deadline seen by the last _run() pass, or None
        self._next_expiration = None
        # Greenthread running run(); None until spawn() is called
        self._run_gth = None

    def start(self, timeout, exc, timeout_at=None):
        """
        Schedule a timeout action

        :param timeout: duration before the timeout expires
        :param exc: exception to throw when the timeout expires, must
                    inherit from eventlet.timeout.Timeout
        :param timeout_at: allow to force the expiration timestamp; when
                           None it is computed as now + timeout
        :return: id of the scheduled timeout, needed to cancel it
        """
        # Compare against None explicitly: a falsy but explicit timestamp
        # (e.g. 0) is a deadline chosen by the caller, not "unset".
        if timeout_at is None:
            timeout_at = time.time() + timeout
        gth = eventlet.greenthread.getcurrent()
        timeout_definition = (timeout, timeout_at, gth, exc)
        # The tuple is stored in the dict below, so its id() stays unique
        # for as long as the timeout is registered.
        key = id(timeout_definition)
        self._timeouts[key] = timeout_definition

        # Wake up the watchdog loop only when there is a new shorter timeout
        if (self._next_expiration is None
                or self._next_expiration > timeout_at):
            # There could be concurrency on .send(), so wrap it in a try
            try:
                if not self._evt.ready():
                    self._evt.send()
            except AssertionError:
                pass

        return key

    def stop(self, key):
        """
        Cancel a scheduled timeout

        Unknown or already-removed keys are ignored.

        :param key: timeout id, as returned by start()
        """
        self._timeouts.pop(key, None)

    def spawn(self):
        """
        Start the watchdog greenthread.

        Does nothing if the greenthread was already spawned.
        """
        if self._run_gth is None:
            self._run_gth = eventlet.spawn(self.run)

    def run(self):
        """
        Run watchdog passes forever; this is the target used by spawn().
        """
        while True:
            self._run()

    def _run(self):
        """
        Perform one watchdog pass.

        Schedules the exception of every timeout whose deadline has passed,
        records the earliest remaining deadline in ``_next_expiration``, and
        then waits on the event until that deadline (or with no time limit
        when nothing is pending).
        """
        now = time.time()
        self._next_expiration = None
        if self._evt.ready():
            self._evt.reset()

        # Iterate over a snapshot: expired entries are removed from the
        # dict inside the loop.
        for k, (timeout, timeout_at, gth, exc) in list(
                self._timeouts.items()):
            if timeout_at <= now:
                self._timeouts.pop(k, None)
                # The exception is built without arguments and the
                # requested duration is attached to it afterwards.
                e = exc()
                e.seconds = timeout
                eventlet.hubs.get_hub().schedule_call_global(0, gth.throw, e)
            elif (self._next_expiration is None
                    or self._next_expiration > timeout_at):
                self._next_expiration = timeout_at

        if self._next_expiration is None:
            # Nothing pending: wait until start() signals the event
            sleep_duration = None
        else:
            sleep_duration = self._next_expiration - now
        self._evt.wait(sleep_duration)
class WatchdogTimeout(object):
    """
    Context manager tying a block of code to a timeout held by a Watchdog.

    The timeout is registered when the object is constructed (not when the
    ``with`` block is entered) and cancelled when the block is left.
    """

    def __init__(self, watchdog, timeout, exc, timeout_at=None):
        """
        Register a timeout with the given Watchdog instance.

        :param watchdog: Watchdog instance
        :param timeout: duration before the timeout expires
        :param exc: exception to throw when the timeout expires, must
                    inherit from eventlet.timeout.Timeout
        :param timeout_at: allow to force the expiration timestamp
        """
        self.watchdog = watchdog
        # Keep the id handed back by the watchdog; it is what stop() needs.
        self.key = self.watchdog.start(
            timeout, exc, timeout_at=timeout_at)

    def __enter__(self):
        # Nothing to do on entry: the timeout was started in __init__.
        return None

    def __exit__(self, type, value, traceback):
        # Cancel the timeout; returning None lets any exception propagate.
        self.watchdog.stop(self.key)

View File

@ -42,7 +42,7 @@ from eventlet.timeout import Timeout
import six
from swift.common.wsgi import make_pre_authed_env, make_pre_authed_request
from swift.common.utils import Timestamp, config_true_value, \
from swift.common.utils import Timestamp, WatchdogTimeout, config_true_value, \
public, split_path, list_from_csv, GreenthreadSafeIterator, \
GreenAsyncPile, quorum_size, parse_content_type, drain_and_close, \
document_iters_to_http_response_body, ShardRange, find_shard_range
@ -1047,7 +1047,8 @@ class ResumingGetter(object):
# but just a 200 or a single-range 206, then this
# performs no IO, and either just returns source or
# raises StopIteration.
with ChunkReadTimeout(node_timeout):
with WatchdogTimeout(self.app.watchdog, node_timeout,
ChunkReadTimeout):
# if StopIteration is raised, it escapes and is
# handled elsewhere
start_byte, end_byte, length, headers, part = next(
@ -1078,7 +1079,8 @@ class ResumingGetter(object):
part_file = ByteCountEnforcer(part_file, nbytes)
while True:
try:
with ChunkReadTimeout(node_timeout):
with WatchdogTimeout(self.app.watchdog, node_timeout,
ChunkReadTimeout):
chunk = part_file.read(self.app.object_chunk_size)
nchunks += 1
# NB: this append must be *inside* the context
@ -1138,8 +1140,9 @@ class ResumingGetter(object):
if not chunk:
if buf:
with ChunkWriteTimeout(
self.app.client_timeout):
with WatchdogTimeout(self.app.watchdog,
self.app.client_timeout,
ChunkWriteTimeout):
self.bytes_used_from_backend += len(buf)
yield buf
buf = b''
@ -1149,13 +1152,16 @@ class ResumingGetter(object):
while len(buf) >= client_chunk_size:
client_chunk = buf[:client_chunk_size]
buf = buf[client_chunk_size:]
with ChunkWriteTimeout(
self.app.client_timeout):
with WatchdogTimeout(self.app.watchdog,
self.app.client_timeout,
ChunkWriteTimeout):
self.bytes_used_from_backend += \
len(client_chunk)
yield client_chunk
else:
with ChunkWriteTimeout(self.app.client_timeout):
with WatchdogTimeout(self.app.watchdog,
self.app.client_timeout,
ChunkWriteTimeout):
self.bytes_used_from_backend += len(buf)
yield buf
buf = b''

View File

@ -44,7 +44,7 @@ from eventlet.timeout import Timeout
from swift.common.utils import (
clean_content_type, config_true_value, ContextPool, csv_append,
GreenAsyncPile, GreenthreadSafeIterator, Timestamp,
GreenAsyncPile, GreenthreadSafeIterator, Timestamp, WatchdogTimeout,
normalize_delete_at_timestamp, public, get_expirer_container,
document_iters_to_http_response_body, parse_content_range,
quorum_size, reiterate, close_if_possible, safe_json_loads)
@ -869,7 +869,7 @@ class ReplicatedObjectController(BaseObjectController):
def _make_putter(self, node, part, req, headers):
if req.environ.get('swift.callback.update_footers'):
putter = MIMEPutter.connect(
node, part, req.swift_entity_path, headers,
node, part, req.swift_entity_path, headers, self.app.watchdog,
conn_timeout=self.app.conn_timeout,
node_timeout=self.app.node_timeout,
write_timeout=self.app.node_timeout,
@ -879,7 +879,7 @@ class ReplicatedObjectController(BaseObjectController):
else:
te = ',' + headers.get('Transfer-Encoding', '')
putter = Putter.connect(
node, part, req.swift_entity_path, headers,
node, part, req.swift_entity_path, headers, self.app.watchdog,
conn_timeout=self.app.conn_timeout,
node_timeout=self.app.node_timeout,
write_timeout=self.app.node_timeout,
@ -897,9 +897,10 @@ class ReplicatedObjectController(BaseObjectController):
bytes_transferred = 0
def send_chunk(chunk):
timeout_at = time.time() + self.app.node_timeout
for putter in list(putters):
if not putter.failed:
putter.send_chunk(chunk)
putter.send_chunk(chunk, timeout_at=timeout_at)
else:
putter.close()
putters.remove(putter)
@ -911,7 +912,9 @@ class ReplicatedObjectController(BaseObjectController):
min_conns = quorum_size(len(nodes))
try:
while True:
with ChunkReadTimeout(self.app.client_timeout):
with WatchdogTimeout(self.app.watchdog,
self.app.client_timeout,
ChunkReadTimeout):
try:
chunk = next(data_source)
except StopIteration:
@ -1569,13 +1572,14 @@ class Putter(object):
:param resp: an HTTPResponse instance if connect() received final response
:param path: the object path to send to the storage node
:param connect_duration: time taken to initiate the HTTPConnection
:param watchdog: a spawned Watchdog instance that will enforce timeouts
:param write_timeout: time limit to write a chunk to the connection socket
:param send_exception_handler: callback called when an exception occurred
writing to the connection socket
:param logger: a Logger instance
:param chunked: boolean indicating if the request encoding is chunked
"""
def __init__(self, conn, node, resp, path, connect_duration,
def __init__(self, conn, node, resp, path, connect_duration, watchdog,
write_timeout, send_exception_handler, logger,
chunked=False):
# Note: you probably want to call Putter.connect() instead of
@ -1585,6 +1589,7 @@ class Putter(object):
self.resp = self.final_resp = resp
self.path = path
self.connect_duration = connect_duration
self.watchdog = watchdog
self.write_timeout = write_timeout
self.send_exception_handler = send_exception_handler
# for handoff nodes node_index is None
@ -1627,7 +1632,7 @@ class Putter(object):
# Subclasses may implement custom behaviour
pass
def send_chunk(self, chunk):
def send_chunk(self, chunk, timeout_at=None):
if not chunk:
# If we're not using chunked transfer-encoding, sending a 0-byte
# chunk is just wasteful. If we *are* using chunked
@ -1641,7 +1646,7 @@ class Putter(object):
self._start_object_data()
self.state = SENDING_DATA
self._send_chunk(chunk)
self._send_chunk(chunk, timeout_at=timeout_at)
def end_of_object_data(self, **kwargs):
"""
@ -1653,14 +1658,15 @@ class Putter(object):
self._send_chunk(b'')
self.state = DATA_SENT
def _send_chunk(self, chunk):
def _send_chunk(self, chunk, timeout_at=None):
if not self.failed:
if self.chunked:
to_send = b"%x\r\n%s\r\n" % (len(chunk), chunk)
else:
to_send = chunk
try:
with ChunkWriteTimeout(self.write_timeout):
with WatchdogTimeout(self.watchdog, self.write_timeout,
ChunkWriteTimeout, timeout_at=timeout_at):
self.conn.send(to_send)
except (Exception, ChunkWriteTimeout):
self.failed = True
@ -1702,9 +1708,9 @@ class Putter(object):
return conn, resp, final_resp, connect_duration
@classmethod
def connect(cls, node, part, path, headers, conn_timeout, node_timeout,
write_timeout, send_exception_handler, logger=None,
chunked=False, **kwargs):
def connect(cls, node, part, path, headers, watchdog, conn_timeout,
node_timeout, write_timeout, send_exception_handler,
logger=None, chunked=False, **kwargs):
"""
Connect to a backend node and send the headers.
@ -1717,7 +1723,7 @@ class Putter(object):
"""
conn, expect_resp, final_resp, connect_duration = cls._make_connection(
node, part, path, headers, conn_timeout, node_timeout)
return cls(conn, node, final_resp, path, connect_duration,
return cls(conn, node, final_resp, path, connect_duration, watchdog,
write_timeout, send_exception_handler, logger,
chunked=chunked)
@ -1732,12 +1738,13 @@ class MIMEPutter(Putter):
An HTTP PUT request that supports streaming.
"""
def __init__(self, conn, node, resp, req, connect_duration,
def __init__(self, conn, node, resp, req, connect_duration, watchdog,
write_timeout, send_exception_handler, logger, mime_boundary,
multiphase=False):
super(MIMEPutter, self).__init__(conn, node, resp, req,
connect_duration, write_timeout,
send_exception_handler, logger)
connect_duration, watchdog,
write_timeout, send_exception_handler,
logger)
# Note: you probably want to call MimePutter.connect() instead of
# instantiating one of these directly.
self.chunked = True # MIME requests always send chunked body
@ -1815,9 +1822,9 @@ class MIMEPutter(Putter):
self.state = COMMIT_SENT
@classmethod
def connect(cls, node, part, req, headers, conn_timeout, node_timeout,
write_timeout, send_exception_handler, logger=None,
need_multiphase=True, **kwargs):
def connect(cls, node, part, req, headers, watchdog, conn_timeout,
node_timeout, write_timeout, send_exception_handler,
logger=None, need_multiphase=True, **kwargs):
"""
Connect to a backend node and send the headers.
@ -1869,7 +1876,7 @@ class MIMEPutter(Putter):
if need_multiphase and not can_handle_multiphase_put:
raise MultiphasePUTNotSupported()
return cls(conn, node, final_resp, req, connect_duration,
return cls(conn, node, final_resp, req, connect_duration, watchdog,
write_timeout, send_exception_handler, logger,
mime_boundary, multiphase=need_multiphase)
@ -2502,7 +2509,7 @@ class ECObjectController(BaseObjectController):
def _make_putter(self, node, part, req, headers):
return MIMEPutter.connect(
node, part, req.swift_entity_path, headers,
node, part, req.swift_entity_path, headers, self.app.watchdog,
conn_timeout=self.app.conn_timeout,
node_timeout=self.app.node_timeout,
write_timeout=self.app.node_timeout,
@ -2603,6 +2610,7 @@ class ECObjectController(BaseObjectController):
return
updated_frag_indexes = set()
timeout_at = time.time() + self.app.node_timeout
for putter in list(putters):
frag_index = putter_to_frag_index[putter]
backend_chunk = backend_chunks[frag_index]
@ -2613,7 +2621,7 @@ class ECObjectController(BaseObjectController):
if frag_index not in updated_frag_indexes:
frag_hashers[frag_index].update(backend_chunk)
updated_frag_indexes.add(frag_index)
putter.send_chunk(backend_chunk)
putter.send_chunk(backend_chunk, timeout_at=timeout_at)
else:
putter.close()
putters.remove(putter)
@ -2629,7 +2637,9 @@ class ECObjectController(BaseObjectController):
putters, policy)
while True:
with ChunkReadTimeout(self.app.client_timeout):
with WatchdogTimeout(self.app.watchdog,
self.app.client_timeout,
ChunkReadTimeout):
try:
chunk = next(data_source)
except StopIteration:

View File

@ -32,7 +32,7 @@ from swift.common import constraints
from swift.common.http import is_server_error
from swift.common.storage_policy import POLICIES
from swift.common.ring import Ring
from swift.common.utils import cache_from_env, get_logger, \
from swift.common.utils import Watchdog, cache_from_env, get_logger, \
get_remote_client, split_path, config_true_value, generate_trans_id, \
affinity_key_function, affinity_locality_predicate, list_from_csv, \
register_swift_info, readconf, config_auto_int_value
@ -317,6 +317,8 @@ class Application(object):
allow_account_management=self.allow_account_management,
account_autocreate=self.account_autocreate,
**constraints.EFFECTIVE_CONSTRAINTS)
self.watchdog = Watchdog()
self.watchdog.spawn()
def _make_policy_override(self, policy, conf, override_conf):
label_for_policy = _label_for_policy(policy)

View File

@ -8381,3 +8381,86 @@ class Test_LibcWrapper(unittest.TestCase):
# 0 is SEEK_SET
0)
self.assertEqual(tf.read(100), b"defgh")
class TestWatchdog(unittest.TestCase):
    """Tests for utils.Watchdog using a patched clock and mocked event."""

    def test_start_stop(self):
        """start() registers the timeout and signals; stop() removes it."""
        watchdog = utils.Watchdog()
        watchdog._evt.send = mock.Mock(side_effect=watchdog._evt.send)
        fake_gth = object()

        getcurrent_patch = patch('eventlet.greenthread.getcurrent',
                                 return_value=fake_gth)
        clock_patch = patch('time.time', return_value=10.0)
        with getcurrent_patch, clock_patch:
            # _next_expiration is still None on the very first start(), so
            # the greenthread that waits with no deadline must be woken up
            timeout_key = watchdog.start(1.0, Timeout)
            self.assertIn(timeout_key, watchdog._timeouts)
            self.assertEqual(watchdog._timeouts[timeout_key],
                             (1.0, 11.0, fake_gth, Timeout))
            watchdog._evt.send.assert_called_once()

            watchdog.stop(timeout_key)
            self.assertNotIn(timeout_key, watchdog._timeouts)

    def test_timeout_concurrency(self):
        """Only a timeout earlier than the pending one wakes the loop."""
        watchdog = utils.Watchdog()
        watchdog._evt.send = mock.Mock(side_effect=watchdog._evt.send)
        watchdog._evt.wait = mock.Mock()
        fake_gth = object()

        def reset_event_mocks():
            watchdog._evt.send.reset_mock()
            watchdog._evt.wait.reset_mock()

        def clock_at(now):
            return patch('time.time', return_value=now)

        # No timeout registered yet: the pass waits without a deadline
        watchdog._run()
        watchdog._evt.wait.assert_called_once_with(None)

        with patch('eventlet.greenthread.getcurrent',
                   return_value=fake_gth):
            reset_event_mocks()
            with clock_at(10.00):
                # First start(): _next_expiration is None, so the waiting
                # greenthread has to be signalled
                watchdog.start(5.0, Timeout)  # expires at 15.0
                watchdog._evt.send.assert_called_once()

            with clock_at(10.01):
                watchdog._run()
                self.assertEqual(15.0, watchdog._next_expiration)
                watchdog._evt.wait.assert_called_once_with(15.0 - 10.01)

            reset_event_mocks()
            with clock_at(12.00):
                # _next_expiration is 15.0 and this one ends later, so
                # there is no reason to signal the greenthread
                watchdog.start(5.0, Timeout)  # expires at 17.0
                watchdog._evt.send.assert_not_called()

            reset_event_mocks()
            with clock_at(14.00):
                # _next_expiration is still 15.0 but this one ends at
                # 14.5, so the greenthread must be signalled
                watchdog.start(0.5, Timeout)  # expires at 14.5
                watchdog._evt.send.assert_called_once()

            with clock_at(14.01):
                watchdog._run()
                # The next wake-up is expected at 14.5
                watchdog._evt.wait.assert_called_once_with(14.5 - 14.01)
                self.assertEqual(14.5, watchdog._next_expiration)

    def test_timeout_expire(self):
        """An expired timeout gets its exception scheduled on the hub."""
        watchdog = utils.Watchdog()
        # Both are mocked so that the event never calls get_hub() itself
        watchdog._evt.send = mock.Mock()
        watchdog._evt.wait = mock.Mock()

        with patch('eventlet.hubs.get_hub') as m_gh:
            with patch('time.time', return_value=10.0):
                watchdog.start(5.0, Timeout)  # expires at 15.0

            with patch('time.time', return_value=16.0):
                watchdog._run()
                m_gh.assert_called_once()
                schedule = m_gh.return_value.schedule_call_global
                schedule.assert_called_once()
                # Third positional argument is the exception to throw
                thrown = schedule.call_args[0][2]
                self.assertIsInstance(thrown, Timeout)
                self.assertEqual(thrown.seconds, 5.0)
                self.assertEqual(None, watchdog._next_expiration)
                watchdog._evt.wait.assert_called_once_with(None)

View File

@ -67,7 +67,7 @@ from swift.common.middleware import proxy_logging, versioned_writes, \
copy, listing_formats
from swift.common.middleware.acl import parse_acl, format_acl
from swift.common.exceptions import ChunkReadTimeout, DiskFileNotExist, \
APIVersionError, ChunkWriteTimeout, ChunkReadError
APIVersionError, ChunkReadError
from swift.common import utils, constraints
from swift.common.utils import hash_path, storage_directory, \
parse_content_type, parse_mime_headers, \
@ -7302,7 +7302,7 @@ class BaseTestECObjectController(BaseTestObjectController):
exp = b'HTTP/1.1 201'
self.assertEqual(headers[:len(exp)], exp)
class WrappedTimeout(ChunkWriteTimeout):
class WrappedTimeout(utils.WatchdogTimeout):
def __enter__(self):
timeouts[self] = traceback.extract_stack()
return super(WrappedTimeout, self).__enter__()
@ -7312,7 +7312,7 @@ class BaseTestECObjectController(BaseTestObjectController):
return super(WrappedTimeout, self).__exit__(typ, value, tb)
timeouts = {}
with mock.patch('swift.proxy.controllers.base.ChunkWriteTimeout',
with mock.patch('swift.proxy.controllers.base.WatchdogTimeout',
WrappedTimeout):
with mock.patch.object(_test_servers[0], 'client_timeout', new=5):
# get object