Ensure that fetching jobs does not fetch anything when in a bad state
When the underlying connection is in the LOST or SUSPENDED state, do not allow jobs to be iterated over (and clear the local cache when the connection has been LOST). Related-Bug: #1557107 Change-Id: Ic0a2ab2519ff8a7386d80d9092a0e24579883681
This commit is contained in:
parent
1bc8dd9bca
commit
35a9305f17
|
@ -14,6 +14,7 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
import contextlib
|
||||
import functools
|
||||
import sys
|
||||
|
@ -23,6 +24,7 @@ import fasteners
|
|||
import futurist
|
||||
from kazoo import exceptions as k_exceptions
|
||||
from kazoo.protocol import paths as k_paths
|
||||
from kazoo.protocol import states as k_states
|
||||
from kazoo.recipe import watchers
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_utils import excutils
|
||||
|
@ -261,6 +263,19 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
|
|||
#: Default znode path used for jobs (data, locks...).
|
||||
DEFAULT_PATH = "/taskflow/jobs"
|
||||
|
||||
STATE_HISTORY_LENGTH = 2
|
||||
"""
|
||||
Number of prior state changes to keep a history of, mainly useful
|
||||
for history tracking and debugging connectivity issues.
|
||||
"""
|
||||
|
||||
NO_FETCH_STATES = (k_states.KazooState.LOST, k_states.KazooState.SUSPENDED)
|
||||
"""
|
||||
Client states under which we return empty lists from fetching routines;
|
||||
during these states the underlying connection either is being recovered
|
||||
or may be recovered (aka, it has not fully disconnected).
|
||||
"""
|
||||
|
||||
def __init__(self, name, conf,
|
||||
client=None, persistence=None, emit_notifications=True):
|
||||
super(ZookeeperJobBoard, self).__init__(name, conf)
|
||||
|
@ -298,6 +313,8 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
|
|||
self._worker = None
|
||||
self._emit_notifications = bool(emit_notifications)
|
||||
self._connected = False
|
||||
self._suspended = False
|
||||
self._last_states = collections.deque(maxlen=self.STATE_HISTORY_LENGTH)
|
||||
|
||||
def _try_emit(self, state, details):
|
||||
# Submit the work to the executor to avoid blocking the kazoo threads
|
||||
|
@ -334,10 +351,25 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
|
|||
return len(self._known_jobs)
|
||||
|
||||
def _fetch_jobs(self, ensure_fresh=False):
|
||||
if ensure_fresh:
|
||||
self._force_refresh()
|
||||
with self._job_cond:
|
||||
return sorted(six.itervalues(self._known_jobs), reverse=True)
|
||||
try:
|
||||
last_state = self._last_states[0]
|
||||
except IndexError:
|
||||
last_state = None
|
||||
if last_state in self.NO_FETCH_STATES:
|
||||
# NOTE(harlowja): on lost clear out all known jobs (from the
|
||||
# in-memory mapping) as we can not safely assume there are any
|
||||
# jobs to continue working on in this state.
|
||||
if last_state == k_states.KazooState.LOST and self._known_jobs:
|
||||
# This will force the jobboard to drop all (in-memory) jobs
|
||||
# that are not in this list (pretty much simulating what
|
||||
# would happen if a jobboard data directory was emptied).
|
||||
self._on_job_posting([], delayed=False)
|
||||
return []
|
||||
else:
|
||||
if ensure_fresh:
|
||||
self._force_refresh()
|
||||
with self._job_cond:
|
||||
return sorted(six.itervalues(self._known_jobs))
|
||||
|
||||
def _force_refresh(self):
|
||||
try:
|
||||
|
@ -364,12 +396,15 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
|
|||
|
||||
def _remove_job(self, path):
|
||||
if path not in self._known_jobs:
|
||||
return
|
||||
return False
|
||||
with self._job_cond:
|
||||
job = self._known_jobs.pop(path, None)
|
||||
if job is not None:
|
||||
LOG.debug("Removed job that was at path '%s'", path)
|
||||
self._try_emit(base.REMOVAL, details={'job': job})
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def _process_child(self, path, request, quiet=True):
|
||||
"""Receives the result of a child data fetch request."""
|
||||
|
@ -456,8 +491,13 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
|
|||
investigate_paths.append(path)
|
||||
if pending_removals:
|
||||
with self._job_cond:
|
||||
for path in pending_removals:
|
||||
self._remove_job(path)
|
||||
am_removed = 0
|
||||
try:
|
||||
for path in pending_removals:
|
||||
am_removed += int(self._remove_job(path))
|
||||
finally:
|
||||
if am_removed:
|
||||
self._job_cond.notify_all()
|
||||
for path in investigate_paths:
|
||||
# Fire off the request to populate this job.
|
||||
#
|
||||
|
@ -694,7 +734,24 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
|
|||
kazoo_utils.checked_commit(txn)
|
||||
|
||||
def _state_change_listener(self, state):
|
||||
LOG.debug("Kazoo client has changed to state: %s", state)
|
||||
if self._last_states:
|
||||
LOG.debug("Kazoo client has changed to"
|
||||
" state '%s' from prior states '%s'", state,
|
||||
self._last_states)
|
||||
else:
|
||||
LOG.debug("Kazoo client has changed to state '%s' (from"
|
||||
" its initial/uninitialized state)", state)
|
||||
self._last_states.appendleft(state)
|
||||
if state == k_states.KazooState.LOST:
|
||||
self._connected = False
|
||||
LOG.warn("Connection to zookeeper has been lost")
|
||||
elif state == k_states.KazooState.SUSPENDED:
|
||||
LOG.warn("Connection to zookeeper has been suspended")
|
||||
self._suspended = True
|
||||
else:
|
||||
# Must be CONNECTED then (as there are only 3 enums)
|
||||
if self._suspended:
|
||||
self._suspended = False
|
||||
|
||||
def wait(self, timeout=None):
|
||||
# Wait until timeout expires (or forever) for jobs to appear.
|
||||
|
@ -738,6 +795,7 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
|
|||
self._known_jobs.clear()
|
||||
LOG.debug("Stopped & cleared local state")
|
||||
self._connected = False
|
||||
self._last_states.clear()
|
||||
|
||||
@fasteners.locked(lock='_open_close_lock')
|
||||
def connect(self, timeout=10.0):
|
||||
|
|
Loading…
Reference in New Issue