diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py index 158e2357f..dd2392e95 100644 --- a/taskflow/jobs/backends/impl_zookeeper.py +++ b/taskflow/jobs/backends/impl_zookeeper.py @@ -317,6 +317,7 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): self._emit_notifications = bool(emit_notifications) self._connected = False self._suspended = False + self._closing = False self._last_states = collections.deque(maxlen=self.STATE_HISTORY_LENGTH) def _try_emit(self, state, details): @@ -748,7 +749,11 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): self._last_states.appendleft(state) if state == k_states.KazooState.LOST: self._connected = False - LOG.warning("Connection to zookeeper has been lost") + # When the client is itself closing itself down this will be + # triggered, but in that case we expect it, so we don't need + # to emit a warning message. + if not self._closing: + LOG.warning("Connection to zookeeper has been lost") elif state == k_states.KazooState.SUSPENDED: LOG.warning("Connection to zookeeper has been suspended") self._suspended = True @@ -790,6 +795,7 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): def close(self): if self._owned: LOG.debug("Stopping client") + self._closing = True kazoo_utils.finalize_client(self._client) if self._worker is not None: LOG.debug("Shutting down the notifier") @@ -818,6 +824,7 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): if timeout is not None: timeout = float(timeout) self._client.start(timeout=timeout) + self._closing = False except (self._client.handler.timeout_exception, k_exceptions.KazooException): excp.raise_with_cause(excp.JobFailure,