From 35a9305f172ed8970caf1ff5cec261df7d3fe9ce Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Tue, 15 Mar 2016 10:50:00 -0700 Subject: [PATCH] Ensure the fetching jobs does not fetch anything when in bad state When the underlying connection is in LOST or SUSPENDED mode do not allow jobs to be iterated over (and clear the local cache when the connection has been LOST). Related-Bug: #1557107 Change-Id: Ic0a2ab2519ff8a7386d80d9092a0e24579883681 --- taskflow/jobs/backends/impl_zookeeper.py | 74 +++++++++++++++++++++--- 1 file changed, 66 insertions(+), 8 deletions(-) diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py index 098108e06..b3b72bfd2 100644 --- a/taskflow/jobs/backends/impl_zookeeper.py +++ b/taskflow/jobs/backends/impl_zookeeper.py @@ -14,6 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. +import collections import contextlib import functools import sys @@ -23,6 +24,7 @@ import fasteners import futurist from kazoo import exceptions as k_exceptions from kazoo.protocol import paths as k_paths +from kazoo.protocol import states as k_states from kazoo.recipe import watchers from oslo_serialization import jsonutils from oslo_utils import excutils @@ -261,6 +263,19 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): #: Default znode path used for jobs (data, locks...). DEFAULT_PATH = "/taskflow/jobs" + STATE_HISTORY_LENGTH = 2 + """ + Number of prior state changes to keep a history of, mainly useful + for history tracking and debugging connectivity issues. + """ + + NO_FETCH_STATES = (k_states.KazooState.LOST, k_states.KazooState.SUSPENDED) + """ + Client states under which we return empty lists from fetching routines, + during these states the underlying connection either is being recovered + or may be recovered (aka, it has not fully disconnected). 
+ """ + def __init__(self, name, conf, client=None, persistence=None, emit_notifications=True): super(ZookeeperJobBoard, self).__init__(name, conf) @@ -298,6 +313,8 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): self._worker = None self._emit_notifications = bool(emit_notifications) self._connected = False + self._suspended = False + self._last_states = collections.deque(maxlen=self.STATE_HISTORY_LENGTH) def _try_emit(self, state, details): # Submit the work to the executor to avoid blocking the kazoo threads @@ -334,10 +351,25 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): return len(self._known_jobs) def _fetch_jobs(self, ensure_fresh=False): - if ensure_fresh: - self._force_refresh() - with self._job_cond: - return sorted(six.itervalues(self._known_jobs), reverse=True) + try: + last_state = self._last_states[0] + except IndexError: + last_state = None + if last_state in self.NO_FETCH_STATES: + # NOTE(harlowja): on lost clear out all known jobs (from the + # in-memory mapping) as we can not safely assume there are any + # jobs to continue working on in this state. + if last_state == k_states.KazooState.LOST and self._known_jobs: + # This will force the jobboard to drop all (in-memory) jobs + # that are not in this list (pretty much simulating what + # would happen if a jobboard data directory was emptied). 
+ self._on_job_posting([], delayed=False) + return [] + else: + if ensure_fresh: + self._force_refresh() + with self._job_cond: + return sorted(six.itervalues(self._known_jobs)) def _force_refresh(self): try: @@ -364,12 +396,15 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): def _remove_job(self, path): if path not in self._known_jobs: - return + return False with self._job_cond: job = self._known_jobs.pop(path, None) if job is not None: LOG.debug("Removed job that was at path '%s'", path) self._try_emit(base.REMOVAL, details={'job': job}) + return True + else: + return False def _process_child(self, path, request, quiet=True): """Receives the result of a child data fetch request.""" @@ -456,8 +491,13 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): investigate_paths.append(path) if pending_removals: with self._job_cond: - for path in pending_removals: - self._remove_job(path) + am_removed = 0 + try: + for path in pending_removals: + am_removed += int(self._remove_job(path)) + finally: + if am_removed: + self._job_cond.notify_all() for path in investigate_paths: # Fire off the request to populate this job. 
# @@ -694,7 +734,24 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): kazoo_utils.checked_commit(txn) def _state_change_listener(self, state): - LOG.debug("Kazoo client has changed to state: %s", state) + if self._last_states: + LOG.debug("Kazoo client has changed to" + " state '%s' from prior states '%s'", state, + self._last_states) + else: + LOG.debug("Kazoo client has changed to state '%s' (from" + " its initial/uninitialized state)", state) + self._last_states.appendleft(state) + if state == k_states.KazooState.LOST: + self._connected = False + LOG.warn("Connection to zookeeper has been lost") + elif state == k_states.KazooState.SUSPENDED: + LOG.warn("Connection to zookeeper has been suspended") + self._suspended = True + else: + # Must be CONNECTED then (as there are only 3 enums) + if self._suspended: + self._suspended = False def wait(self, timeout=None): # Wait until timeout expires (or forever) for jobs to appear. @@ -738,6 +795,7 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): self._known_jobs.clear() LOG.debug("Stopped & cleared local state") self._connected = False + self._last_states.clear() @fasteners.locked(lock='_open_close_lock') def connect(self, timeout=10.0):