From bbbac3dc3678df069ef72ecfea62d435bc519a07 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 18 Jun 2017 23:18:41 -0700 Subject: [PATCH] Fixup for #1085 -- only check for changed metadata on disconnected nodes --- kafka/client_async.py | 40 +++++++++++++++++++++++----------------- 1 file changed, 23 insertions(+), 17 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index 0b08415..d8c2389 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -325,31 +325,37 @@ class KafkaClient(object): def _maybe_connect(self, node_id): """Idempotent non-blocking connection attempt to the given node id.""" broker = self.cluster.broker_metadata(node_id) + conn = self._conns.get(node_id) - # If broker metadata indicates that a node's host/port has changed, remove it - if node_id in self._conns and broker is not None: - conn = self._conns[node_id] - host, _, __ = get_ip_port_afi(broker.host) - if conn.host != host or conn.port != broker.port: - log.debug("Closing connection to decommissioned node %s at %s:%s", - node_id, conn.host, conn.port) - conn.close() - self._conns.pop(node_id) - - if node_id not in self._conns: + if conn is None: assert broker, 'Broker id %s not in current metadata' % node_id log.debug("Initiating connection to node %s at %s:%s", node_id, broker.host, broker.port) host, port, afi = get_ip_port_afi(broker.host) cb = functools.partial(self._conn_state_change, node_id) - self._conns[node_id] = BrokerConnection(host, broker.port, afi, - state_change_callback=cb, - node_id=node_id, - **self.config) - conn = self._conns[node_id] - if conn.connected(): + conn = BrokerConnection(host, broker.port, afi, + state_change_callback=cb, + node_id=node_id, + **self.config) + self._conns[node_id] = conn + + # Check if existing connection should be recreated because host/port changed + elif conn.disconnected() and broker is not None: + host, _, __ = get_ip_port_afi(broker.host) + if conn.host != host or conn.port != broker.port: + log.info("Broker metadata change detected for node %s" + " from %s:%s to %s:%s", node_id, conn.host, conn.port, + broker.host, broker.port) + + # Drop old connection object. + # It will be recreated on next _maybe_connect + self._conns.pop(node_id) + return False + + elif conn.connected(): return True + conn.connect() return conn.connected()