Use randomized exponential backoff policy for BrokerConnection

This commit is contained in:
Dana Powers 2016-06-19 22:27:10 -07:00
parent bbbac3dc36
commit c12e04bc0c
4 changed files with 49 additions and 22 deletions

View File

@ -67,6 +67,10 @@ class KafkaClient(object):
reconnect_backoff_ms (int): The amount of time in milliseconds to reconnect_backoff_ms (int): The amount of time in milliseconds to
wait before attempting to reconnect to a given host. wait before attempting to reconnect to a given host.
Default: 50. Default: 50.
reconnect_backoff_max (int): If higher than reconnect_backoff_ms,
node reconnect backoff will increase on each consecutive failure
up to this maximum. The actual backoff is chosen randomly from
an exponentially increasing range. Default: 60000.
request_timeout_ms (int): Client request timeout in milliseconds. request_timeout_ms (int): Client request timeout in milliseconds.
Default: 40000. Default: 40000.
retry_backoff_ms (int): Milliseconds to backoff when retrying on retry_backoff_ms (int): Milliseconds to backoff when retrying on
@ -137,6 +141,7 @@ class KafkaClient(object):
'request_timeout_ms': 40000, 'request_timeout_ms': 40000,
'connections_max_idle_ms': 9 * 60 * 1000, 'connections_max_idle_ms': 9 * 60 * 1000,
'reconnect_backoff_ms': 50, 'reconnect_backoff_ms': 50,
'reconnect_backoff_max': 60000,
'max_in_flight_requests_per_connection': 5, 'max_in_flight_requests_per_connection': 5,
'receive_buffer_bytes': None, 'receive_buffer_bytes': None,
'send_buffer_bytes': None, 'send_buffer_bytes': None,
@ -432,15 +437,7 @@ class KafkaClient(object):
""" """
if node_id not in self._conns: if node_id not in self._conns:
return 0 return 0
return self._conns[node_id].connection_delay()
conn = self._conns[node_id]
time_waited_ms = time.time() - (conn.last_attempt or 0)
if conn.disconnected():
return max(self.config['reconnect_backoff_ms'] - time_waited_ms, 0)
elif conn.connecting():
return 0
else:
return 999999999
def is_ready(self, node_id, metadata_priority=True): def is_ready(self, node_id, metadata_priority=True):
"""Check whether a node is ready to send more requests. """Check whether a node is ready to send more requests.
@ -655,12 +652,10 @@ class KafkaClient(object):
def least_loaded_node(self): def least_loaded_node(self):
"""Choose the node with fewest outstanding requests, with fallbacks. """Choose the node with fewest outstanding requests, with fallbacks.
This method will prefer a node with an existing connection, but will This method will prefer a node with an existing connection and no
potentially choose a node for which we don't yet have a connection if in-flight-requests. If no such node is found, a node will be chosen
all existing connections are in use. This method will never choose a randomly from disconnected nodes that are not "blacked out" (i.e.,
node that was disconnected within the reconnect backoff period. are not subject to a reconnect backoff).
If all else fails, the method will attempt to bootstrap again using the
bootstrap_servers list.
Returns: Returns:
node_id or None if no suitable node was found node_id or None if no suitable node was found
@ -695,10 +690,6 @@ class KafkaClient(object):
elif 'bootstrap' in self._conns: elif 'bootstrap' in self._conns:
return 'bootstrap' return 'bootstrap'
# Last option: try to bootstrap again
# this should only happen if no prior bootstrap has been successful
log.error('No nodes found in metadata -- retrying bootstrap')
self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
return None return None
def set_topics(self, topics): def set_topics(self, topics):

View File

@ -5,7 +5,7 @@ import copy
import errno import errno
import logging import logging
import io import io
from random import shuffle from random import randint, shuffle
import socket import socket
import time import time
import traceback import traceback
@ -140,6 +140,7 @@ class BrokerConnection(object):
'node_id': 0, 'node_id': 0,
'request_timeout_ms': 40000, 'request_timeout_ms': 40000,
'reconnect_backoff_ms': 50, 'reconnect_backoff_ms': 50,
'reconnect_backoff_max': 60000,
'max_in_flight_requests_per_connection': 5, 'max_in_flight_requests_per_connection': 5,
'receive_buffer_bytes': None, 'receive_buffer_bytes': None,
'send_buffer_bytes': None, 'send_buffer_bytes': None,
@ -199,6 +200,7 @@ class BrokerConnection(object):
assert self.config['sasl_plain_password'] is not None, 'sasl_plain_password required for PLAIN sasl' assert self.config['sasl_plain_password'] is not None, 'sasl_plain_password required for PLAIN sasl'
self.state = ConnectionStates.DISCONNECTED self.state = ConnectionStates.DISCONNECTED
self._reset_reconnect_backoff()
self._sock = None self._sock = None
self._ssl_context = None self._ssl_context = None
if self.config['ssl_context'] is not None: if self.config['ssl_context'] is not None:
@ -305,6 +307,7 @@ class BrokerConnection(object):
else: else:
log.debug('%s: Connection complete.', self) log.debug('%s: Connection complete.', self)
self.state = ConnectionStates.CONNECTED self.state = ConnectionStates.CONNECTED
self._reset_reconnect_backoff()
self.config['state_change_callback'](self) self.config['state_change_callback'](self)
# Connection failed # Connection failed
@ -340,6 +343,7 @@ class BrokerConnection(object):
log.info('%s: Authenticated as %s', self, self.config['sasl_plain_username']) log.info('%s: Authenticated as %s', self, self.config['sasl_plain_username'])
log.debug('%s: Connection complete.', self) log.debug('%s: Connection complete.', self)
self.state = ConnectionStates.CONNECTED self.state = ConnectionStates.CONNECTED
self._reset_reconnect_backoff()
self.config['state_change_callback'](self) self.config['state_change_callback'](self)
return self.state return self.state
@ -475,11 +479,19 @@ class BrokerConnection(object):
re-establish a connection yet re-establish a connection yet
""" """
if self.state is ConnectionStates.DISCONNECTED: if self.state is ConnectionStates.DISCONNECTED:
backoff = self.config['reconnect_backoff_ms'] / 1000.0 if time.time() < self.last_attempt + self._reconnect_backoff:
if time.time() < self.last_attempt + backoff:
return True return True
return False return False
def connection_delay(self):
time_waited_ms = time.time() - (self.last_attempt or 0)
if conn.state is ConnectionStates.DISCONNECTED:
return max(self._reconnect_backoff - time_waited_ms, 0)
elif conn.connecting():
return 0
else:
return 999999999
def connected(self): def connected(self):
"""Return True iff socket is connected.""" """Return True iff socket is connected."""
return self.state is ConnectionStates.CONNECTED return self.state is ConnectionStates.CONNECTED
@ -495,6 +507,19 @@ class BrokerConnection(object):
"""Return True iff socket is closed""" """Return True iff socket is closed"""
return self.state is ConnectionStates.DISCONNECTED return self.state is ConnectionStates.DISCONNECTED
def _reset_reconnect_backoff(self):
self._failures = 0
self._reconnect_backoff = self.config['reconnect_backoff_ms'] / 1000.0
def _update_reconnect_backoff(self):
if self.config['reconnect_backoff_max'] > self.config['reconnect_backoff_ms']:
self._failures += 1
self._reconnect_backoff = self.config['reconnect_backoff_ms'] * 2 ** (self._failures - 1)
self._reconnect_backoff = min(self._reconnect_backoff, self.config['reconnect_backoff_max'])
self._reconnect_backoff = randint(self.config['reconnect_backoff_ms'], self._reconnect_backoff)
self._reconnect_backoff /= 1000.0
log.debug('%s: reconnect backoff %s after %s failures', self, self._reconnect_backoff, self._failures)
def close(self, error=None): def close(self, error=None):
"""Close socket and fail all in-flight-requests. """Close socket and fail all in-flight-requests.
@ -512,6 +537,7 @@ class BrokerConnection(object):
log.info('%s: Closing connection. %s', self, error or '') log.info('%s: Closing connection. %s', self, error or '')
self.state = ConnectionStates.DISCONNECTING self.state = ConnectionStates.DISCONNECTING
self.config['state_change_callback'](self) self.config['state_change_callback'](self)
self._update_reconnect_backoff()
if self._sock: if self._sock:
self._sock.close() self._sock.close()
self._sock = None self._sock = None

View File

@ -91,6 +91,10 @@ class KafkaConsumer(six.Iterator):
reconnect_backoff_ms (int): The amount of time in milliseconds to reconnect_backoff_ms (int): The amount of time in milliseconds to
wait before attempting to reconnect to a given host. wait before attempting to reconnect to a given host.
Default: 50. Default: 50.
reconnect_backoff_max (int): If higher than reconnect_backoff_ms,
node reconnect backoff will increase on each consecutive failure
up to this maximum. The actual backoff is chosen randomly from
an exponentially increasing range. Default: 60000.
max_in_flight_requests_per_connection (int): Requests are pipelined max_in_flight_requests_per_connection (int): Requests are pipelined
to kafka brokers up to this number of maximum requests per to kafka brokers up to this number of maximum requests per
broker connection. Default: 5. broker connection. Default: 5.
@ -230,6 +234,7 @@ class KafkaConsumer(six.Iterator):
'request_timeout_ms': 40 * 1000, 'request_timeout_ms': 40 * 1000,
'retry_backoff_ms': 100, 'retry_backoff_ms': 100,
'reconnect_backoff_ms': 50, 'reconnect_backoff_ms': 50,
'reconnect_backoff_max': 60000,
'max_in_flight_requests_per_connection': 5, 'max_in_flight_requests_per_connection': 5,
'auto_offset_reset': 'latest', 'auto_offset_reset': 'latest',
'enable_auto_commit': True, 'enable_auto_commit': True,

View File

@ -199,6 +199,10 @@ class KafkaProducer(object):
reconnect_backoff_ms (int): The amount of time in milliseconds to reconnect_backoff_ms (int): The amount of time in milliseconds to
wait before attempting to reconnect to a given host. wait before attempting to reconnect to a given host.
Default: 50. Default: 50.
reconnect_backoff_max (int): If higher than reconnect_backoff_ms,
node reconnect backoff will increase on each consecutive failure
up to this maximum. The actual backoff is chosen randomly from
an exponentially increasing range. Default: 60000.
max_in_flight_requests_per_connection (int): Requests are pipelined max_in_flight_requests_per_connection (int): Requests are pipelined
to kafka brokers up to this number of maximum requests per to kafka brokers up to this number of maximum requests per
broker connection. Default: 5. broker connection. Default: 5.
@ -276,6 +280,7 @@ class KafkaProducer(object):
'send_buffer_bytes': None, 'send_buffer_bytes': None,
'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)], 'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
'reconnect_backoff_ms': 50, 'reconnect_backoff_ms': 50,
'reconnect_backoff_max': 60000,
'max_in_flight_requests_per_connection': 5, 'max_in_flight_requests_per_connection': 5,
'security_protocol': 'PLAINTEXT', 'security_protocol': 'PLAINTEXT',
'ssl_context': None, 'ssl_context': None,