Ensure the fetching jobs does not fetch anything when in bad state

When the underlying connection is in LOST or SUSPENDED mode do not
allow jobs to be iterated over (and clear the local cache when the
connection has been LOST).

Related-Bug: #1557107

Change-Id: Ic0a2ab2519ff8a7386d80d9092a0e24579883681
This commit is contained in:
Joshua Harlow 2016-03-15 10:50:00 -07:00 committed by Joshua Harlow
parent 1bc8dd9bca
commit 35a9305f17
1 changed files with 66 additions and 8 deletions

View File

@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
import contextlib
import functools
import sys
@ -23,6 +24,7 @@ import fasteners
import futurist
from kazoo import exceptions as k_exceptions
from kazoo.protocol import paths as k_paths
from kazoo.protocol import states as k_states
from kazoo.recipe import watchers
from oslo_serialization import jsonutils
from oslo_utils import excutils
@ -261,6 +263,19 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
#: Default znode path used for jobs (data, locks...).
DEFAULT_PATH = "/taskflow/jobs"
STATE_HISTORY_LENGTH = 2
"""
Number of prior state changes to keep a history of, mainly useful
for history tracking and debugging connectivity issues.
"""
NO_FETCH_STATES = (k_states.KazooState.LOST, k_states.KazooState.SUSPENDED)
"""
Client states underwhich we return empty lists from fetching routines,
during these states the underlying connection either is being recovered
or may be recovered (aka, it has not full disconnected).
"""
def __init__(self, name, conf,
client=None, persistence=None, emit_notifications=True):
super(ZookeeperJobBoard, self).__init__(name, conf)
@ -298,6 +313,8 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
self._worker = None
self._emit_notifications = bool(emit_notifications)
self._connected = False
self._suspended = False
self._last_states = collections.deque(maxlen=self.STATE_HISTORY_LENGTH)
def _try_emit(self, state, details):
# Submit the work to the executor to avoid blocking the kazoo threads
@ -334,10 +351,25 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
return len(self._known_jobs)
def _fetch_jobs(self, ensure_fresh=False):
if ensure_fresh:
self._force_refresh()
with self._job_cond:
return sorted(six.itervalues(self._known_jobs), reverse=True)
try:
last_state = self._last_states[0]
except IndexError:
last_state = None
if last_state in self.NO_FETCH_STATES:
# NOTE(harlowja): on lost clear out all known jobs (from the
# in-memory mapping) as we can not safely assume there are any
# jobs to continue working on in this state.
if last_state == k_states.KazooState.LOST and self._known_jobs:
# This will force the jobboard to drop all (in-memory) jobs
# that are not in this list (pretty much simulating what
# would happen if a jobboard data directory was emptied).
self._on_job_posting([], delayed=False)
return []
else:
if ensure_fresh:
self._force_refresh()
with self._job_cond:
return sorted(six.itervalues(self._known_jobs))
def _force_refresh(self):
try:
@ -364,12 +396,15 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
def _remove_job(self, path):
if path not in self._known_jobs:
return
return False
with self._job_cond:
job = self._known_jobs.pop(path, None)
if job is not None:
LOG.debug("Removed job that was at path '%s'", path)
self._try_emit(base.REMOVAL, details={'job': job})
return True
else:
return False
def _process_child(self, path, request, quiet=True):
"""Receives the result of a child data fetch request."""
@ -456,8 +491,13 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
investigate_paths.append(path)
if pending_removals:
with self._job_cond:
for path in pending_removals:
self._remove_job(path)
am_removed = 0
try:
for path in pending_removals:
am_removed += int(self._remove_job(path))
finally:
if am_removed:
self._job_cond.notify_all()
for path in investigate_paths:
# Fire off the request to populate this job.
#
@ -694,7 +734,24 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
kazoo_utils.checked_commit(txn)
def _state_change_listener(self, state):
LOG.debug("Kazoo client has changed to state: %s", state)
if self._last_states:
LOG.debug("Kazoo client has changed to"
" state '%s' from prior states '%s'", state,
self._last_states)
else:
LOG.debug("Kazoo client has changed to state '%s' (from"
" its initial/uninitialized state)", state)
self._last_states.appendleft(state)
if state == k_states.KazooState.LOST:
self._connected = False
LOG.warn("Connection to zookeeper has been lost")
elif state == k_states.KazooState.SUSPENDED:
LOG.warn("Connection to zookeeper has been suspended")
self._suspended = True
else:
# Must be CONNECTED then (as there are only 3 enums)
if self._suspended:
self._suspended = False
def wait(self, timeout=None):
# Wait until timeout expires (or forever) for jobs to appear.
@ -738,6 +795,7 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
self._known_jobs.clear()
LOG.debug("Stopped & cleared local state")
self._connected = False
self._last_states.clear()
@fasteners.locked(lock='_open_close_lock')
def connect(self, timeout=10.0):