diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 68b1bda..ab259dd 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -286,6 +286,10 @@ class BaseCoordinator(object): e = Errors.GroupCoordinatorNotAvailableError(self.coordinator_id) return Future().failure(e) + elif not self._client.ready(self.coordinator_id, metadata_priority=False): + e = Errors.NodeNotReadyError(self.coordinator_id) + return Future().failure(e) + # send a join group request to the coordinator log.info("(Re-)joining group %s", self.group_id) request = JoinGroupRequest[0]( @@ -416,6 +420,13 @@ class BaseCoordinator(object): if self.coordinator_unknown(): e = Errors.GroupCoordinatorNotAvailableError(self.coordinator_id) return Future().failure(e) + + # We assume that coordinator is ready if we're sending SyncGroup + # as it typically follows a successful JoinGroup + # Also note that if client.ready() enforces a metadata priority policy, + # we can get into an infinite loop if the leader assignment process + # itself requests a metadata update + future = Future() _f = self._client.send(self.coordinator_id, request) _f.add_callback(self._handle_sync_group_response, future, time.time()) @@ -467,6 +478,10 @@ class BaseCoordinator(object): if node_id is None: return Future().failure(Errors.NoBrokersAvailable()) + elif not self._client.ready(node_id, metadata_priority=False): + e = Errors.NodeNotReadyError(node_id) + return Future().failure(e) + log.debug("Sending group coordinator request for group %s to broker %s", self.group_id, node_id) request = GroupCoordinatorRequest[0](self.group_id) @@ -553,6 +568,14 @@ class BaseCoordinator(object): def _send_heartbeat_request(self): """Send a heartbeat request""" + if self.coordinator_unknown(): + e = Errors.GroupCoordinatorNotAvailableError(self.coordinator_id) + return Future().failure(e) + + elif not self._client.ready(self.coordinator_id, metadata_priority=False): + e = Errors.NodeNotReadyError(self.coordinator_id) + return Future().failure(e) + request = HeartbeatRequest[0](self.group_id, self.generation, self.member_id) log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) # pylint: disable-msg=no-member future = Future()