diff --git a/taskflow/conductors/backends/impl_executor.py b/taskflow/conductors/backends/impl_executor.py index 772cab2ac..f8d624e9e 100644 --- a/taskflow/conductors/backends/impl_executor.py +++ b/taskflow/conductors/backends/impl_executor.py @@ -24,6 +24,7 @@ except ImportError: from debtcollector import removals from oslo_utils import excutils +from oslo_utils import timeutils import six from taskflow.conductors import base @@ -63,6 +64,13 @@ class ExecutorConductor(base.Conductor): level logger will be used instead). """ + REFRESH_PERIODICITY = 30 + """ + Every 30 seconds the jobboard will be resynced (if for some reason + a watch or set of watches was not received) using the `ensure_fresh` + option to ensure this (for supporting jobboard backends only). + """ + #: Default timeout used to idle/wait when no jobs have been found. WAIT_TIMEOUT = 0.5 @@ -274,10 +282,20 @@ class ExecutorConductor(base.Conductor): # Don't even do any work in the first place... if max_dispatches == 0: raise StopIteration + fresh_period = timeutils.StopWatch( + duration=self.REFRESH_PERIODICITY) + fresh_period.start() while not is_stopped(): any_dispatched = False - for job in itertools.takewhile(self._can_claim_more_jobs, - self._jobboard.iterjobs()): + if fresh_period.expired(): + ensure_fresh = True + fresh_period.restart() + else: + ensure_fresh = False + job_it = itertools.takewhile( + self._can_claim_more_jobs, + self._jobboard.iterjobs(ensure_fresh=ensure_fresh)) + for job in job_it: self._log.debug("Trying to claim job: %s", job) try: self._jobboard.claim(job, self._name)