From c12e04bc0c30c627d849d0057e4a460edc5ffafe Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 19 Jun 2016 22:27:10 -0700 Subject: [PATCH] Use randomized exponential backoff policy for BrokerConnection --- kafka/client_async.py | 29 ++++++++++------------------- kafka/conn.py | 32 +++++++++++++++++++++++++++++--- kafka/consumer/group.py | 5 +++++ kafka/producer/kafka.py | 5 +++++ 4 files changed, 49 insertions(+), 22 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index d8c2389..44bc9af 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -67,6 +67,10 @@ class KafkaClient(object): reconnect_backoff_ms (int): The amount of time in milliseconds to wait before attempting to reconnect to a given host. Default: 50. + reconnect_backoff_max (int): If higher than reconnect_backoff_ms, + node reconnect backoff will increase on each consecutive failure + up to this maximum. The actual backoff is chosen randomly from + an exponentially increasing range. Default: 60000. request_timeout_ms (int): Client request timeout in milliseconds. Default: 40000. retry_backoff_ms (int): Milliseconds to backoff when retrying on @@ -137,6 +141,7 @@ class KafkaClient(object): 'request_timeout_ms': 40000, 'connections_max_idle_ms': 9 * 60 * 1000, 'reconnect_backoff_ms': 50, + 'reconnect_backoff_max': 60000, 'max_in_flight_requests_per_connection': 5, 'receive_buffer_bytes': None, 'send_buffer_bytes': None, @@ -432,15 +437,7 @@ class KafkaClient(object): """ if node_id not in self._conns: return 0 - - conn = self._conns[node_id] - time_waited_ms = time.time() - (conn.last_attempt or 0) - if conn.disconnected(): - return max(self.config['reconnect_backoff_ms'] - time_waited_ms, 0) - elif conn.connecting(): - return 0 - else: - return 999999999 + return self._conns[node_id].connection_delay() def is_ready(self, node_id, metadata_priority=True): """Check whether a node is ready to send more requests. @@ -655,12 +652,10 @@ class KafkaClient(object): def least_loaded_node(self): """Choose the node with fewest outstanding requests, with fallbacks. - This method will prefer a node with an existing connection, but will - potentially choose a node for which we don't yet have a connection if - all existing connections are in use. This method will never choose a - node that was disconnected within the reconnect backoff period. - If all else fails, the method will attempt to bootstrap again using the - bootstrap_servers list. + This method will prefer a node with an existing connection and no + in-flight-requests. If no such node is found, a node will be chosen + randomly from disconnected nodes that are not "blacked out" (i.e., + are not subject to a reconnect backoff). Returns: node_id or None if no suitable node was found @@ -695,10 +690,6 @@ class KafkaClient(object): elif 'bootstrap' in self._conns: return 'bootstrap' - # Last option: try to bootstrap again - # this should only happen if no prior bootstrap has been successful - log.error('No nodes found in metadata -- retrying bootstrap') - self._bootstrap(collect_hosts(self.config['bootstrap_servers'])) return None def set_topics(self, topics): diff --git a/kafka/conn.py b/kafka/conn.py index 12bd08d..687b748 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -5,7 +5,7 @@ import copy import errno import logging import io -from random import shuffle +from random import randint, shuffle import socket import time import traceback @@ -140,6 +140,7 @@ class BrokerConnection(object): 'node_id': 0, 'request_timeout_ms': 40000, 'reconnect_backoff_ms': 50, + 'reconnect_backoff_max': 60000, 'max_in_flight_requests_per_connection': 5, 'receive_buffer_bytes': None, 'send_buffer_bytes': None, @@ -199,6 +200,7 @@ class BrokerConnection(object): assert self.config['sasl_plain_password'] is not None, 'sasl_plain_password required for PLAIN sasl' self.state = ConnectionStates.DISCONNECTED + self._reset_reconnect_backoff() self._sock = None self._ssl_context = None if self.config['ssl_context'] is not None: @@ -305,6 +307,7 @@ class BrokerConnection(object): else: log.debug('%s: Connection complete.', self) self.state = ConnectionStates.CONNECTED + self._reset_reconnect_backoff() self.config['state_change_callback'](self) # Connection failed @@ -340,6 +343,7 @@ class BrokerConnection(object): log.info('%s: Authenticated as %s', self, self.config['sasl_plain_username']) log.debug('%s: Connection complete.', self) self.state = ConnectionStates.CONNECTED + self._reset_reconnect_backoff() self.config['state_change_callback'](self) return self.state @@ -475,11 +479,19 @@ class BrokerConnection(object): re-establish a connection yet """ if self.state is ConnectionStates.DISCONNECTED: - backoff = self.config['reconnect_backoff_ms'] / 1000.0 - if time.time() < self.last_attempt + backoff: + if time.time() < self.last_attempt + self._reconnect_backoff: return True return False + def connection_delay(self): + time_waited_ms = time.time() - (self.last_attempt or 0) + if conn.state is ConnectionStates.DISCONNECTED: + return max(self._reconnect_backoff - time_waited_ms, 0) + elif conn.connecting(): + return 0 + else: + return 999999999 + def connected(self): """Return True iff socket is connected.""" return self.state is ConnectionStates.CONNECTED @@ -495,6 +507,19 @@ class BrokerConnection(object): """Return True iff socket is closed""" return self.state is ConnectionStates.DISCONNECTED + def _reset_reconnect_backoff(self): + self._failures = 0 + self._reconnect_backoff = self.config['reconnect_backoff_ms'] / 1000.0 + + def _update_reconnect_backoff(self): + if self.config['reconnect_backoff_max'] > self.config['reconnect_backoff_ms']: + self._failures += 1 + self._reconnect_backoff = self.config['reconnect_backoff_ms'] * 2 ** (self._failures - 1) + self._reconnect_backoff = min(self._reconnect_backoff, self.config['reconnect_backoff_max']) + self._reconnect_backoff = randint(self.config['reconnect_backoff_ms'], self._reconnect_backoff) + self._reconnect_backoff /= 1000.0 + log.debug('%s: reconnect backoff %s after %s failures', self, self._reconnect_backoff, self._failures) + def close(self, error=None): """Close socket and fail all in-flight-requests. @@ -512,6 +537,7 @@ class BrokerConnection(object): log.info('%s: Closing connection. %s', self, error or '') self.state = ConnectionStates.DISCONNECTING self.config['state_change_callback'](self) + self._update_reconnect_backoff() if self._sock: self._sock.close() self._sock = None diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 15a8947..2512139 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -91,6 +91,10 @@ class KafkaConsumer(six.Iterator): reconnect_backoff_ms (int): The amount of time in milliseconds to wait before attempting to reconnect to a given host. Default: 50. + reconnect_backoff_max (int): If higher than reconnect_backoff_ms, + node reconnect backoff will increase on each consecutive failure + up to this maximum. The actual backoff is chosen randomly from + an exponentially increasing range. Default: 60000. max_in_flight_requests_per_connection (int): Requests are pipelined to kafka brokers up to this number of maximum requests per broker connection. Default: 5. @@ -230,6 +234,7 @@ class KafkaConsumer(six.Iterator): 'request_timeout_ms': 40 * 1000, 'retry_backoff_ms': 100, 'reconnect_backoff_ms': 50, + 'reconnect_backoff_max': 60000, 'max_in_flight_requests_per_connection': 5, 'auto_offset_reset': 'latest', 'enable_auto_commit': True, diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 51c2182..482628d 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -199,6 +199,10 @@ class KafkaProducer(object): reconnect_backoff_ms (int): The amount of time in milliseconds to wait before attempting to reconnect to a given host. Default: 50. + reconnect_backoff_max (int): If higher than reconnect_backoff_ms, + node reconnect backoff will increase on each consecutive failure + up to this maximum. The actual backoff is chosen randomly from + an exponentially increasing range. Default: 60000. max_in_flight_requests_per_connection (int): Requests are pipelined to kafka brokers up to this number of maximum requests per broker connection. Default: 5. @@ -276,6 +280,7 @@ class KafkaProducer(object): 'send_buffer_bytes': None, 'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)], 'reconnect_backoff_ms': 50, + 'reconnect_backoff_max': 60000, 'max_in_flight_requests_per_connection': 5, 'security_protocol': 'PLAINTEXT', 'ssl_context': None,