Always check for request timeouts (#887)
* Check for requests that timeout without causing a socket read/write event
This commit is contained in:
parent
57ea7e81dc
commit
f71cfc4607
|
@ -578,6 +578,14 @@ class KafkaClient(object):
|
|||
if response:
|
||||
responses.append(response)
|
||||
|
||||
for conn in six.itervalues(self._conns):
|
||||
if conn.requests_timed_out():
|
||||
log.warning('%s timed out after %s ms. Closing connection.',
|
||||
conn, conn.config['request_timeout_ms'])
|
||||
conn.close(error=Errors.RequestTimedOutError(
|
||||
'Request timed out after %s ms' %
|
||||
conn.config['request_timeout_ms']))
|
||||
|
||||
if self._sensors:
|
||||
self._sensors.io_time.record((time.time() - end_select) * 1000000000)
|
||||
return responses
|
||||
|
|
|
@ -575,15 +575,15 @@ class BrokerConnection(object):
|
|||
log.warning('%s: No in-flight-requests to recv', self)
|
||||
return None
|
||||
|
||||
elif self._requests_timed_out():
|
||||
response = self._recv()
|
||||
if not response and self.requests_timed_out():
|
||||
log.warning('%s timed out after %s ms. Closing connection.',
|
||||
self, self.config['request_timeout_ms'])
|
||||
self.close(error=Errors.RequestTimedOutError(
|
||||
'Request timed out after %s ms' %
|
||||
self.config['request_timeout_ms']))
|
||||
return None
|
||||
|
||||
return self._recv()
|
||||
return response
|
||||
|
||||
def _recv(self):
|
||||
# Not receiving is the state of reading the payload header
|
||||
|
@ -719,7 +719,7 @@ class BrokerConnection(object):
|
|||
self._processing = False
|
||||
return response
|
||||
|
||||
def _requests_timed_out(self):
|
||||
def requests_timed_out(self):
|
||||
if self.in_flight_requests:
|
||||
oldest_at = self.in_flight_requests[0].timestamp
|
||||
timeout = self.config['request_timeout_ms'] / 1000.0
|
||||
|
|
Loading…
Reference in New Issue