Merge "Improve new scheduler"

This commit is contained in:
Zuul 2019-08-28 20:24:07 +00:00 committed by Gerrit Code Review
commit 2cdcb5415e
6 changed files with 59 additions and 24 deletions

View File

@ -301,12 +301,12 @@ def _check_affected_tasks(task):
# already been processed and the task state hasn't changed.
sched = sched_base.get_system_scheduler()
cnt = sched.get_scheduled_jobs_count(
jobs_exist = sched.has_scheduled_jobs(
key=_get_refresh_state_job_key(t_ex_id),
processing=False
)
if cnt == 0:
if not jobs_exist:
_schedule_refresh_task_state(t_ex_id)
for t_ex in affected_task_execs:

View File

@ -45,13 +45,13 @@ class Scheduler(object):
raise NotImplementedError
@abc.abstractmethod
def get_scheduled_jobs_count(self, **filters):
"""Returns the number of scheduled jobs.
def has_scheduled_jobs(self, **filters):
"""Returns True if there are scheduled jobs matching the given filter.
:param filters: Filters that define what kind of jobs
need to be counted. Permitted values:
* key=<string> - a key set for a job when it was scheduled.
* processing=<boolean> - if True, return only jobs that are
* processing=<boolean> - if True, count only jobs that are
currently being processed.
"""
raise NotImplementedError

View File

@ -83,6 +83,11 @@ class DefaultScheduler(base.Scheduler):
"Starting Scheduler Job Store checker [scheduler=%s]...", self
)
eventlet.sleep(
self._fixed_delay +
random.Random().randint(0, self._random_delay * 1000) * 0.001
)
try:
self._process_store_jobs()
except Exception:
@ -98,11 +103,6 @@ class DefaultScheduler(base.Scheduler):
if sys.version_info < (3,):
sys.exc_clear()
eventlet.sleep(
self._fixed_delay +
random.Random().randint(0, self._random_delay * 1000) * 0.001
)
def _process_store_jobs(self):
# Select and capture eligible jobs.
with db_api.transaction():
@ -125,20 +125,31 @@ class DefaultScheduler(base.Scheduler):
self._delete_scheduled_job(job)
def schedule(self, job, allow_redistribute=False):
scheduled_job = DefaultScheduler._persist_job(job)
scheduled_job = self._persist_job(job)
self._schedule_in_memory(job.run_after, scheduled_job)
def get_scheduled_jobs_count(self, **filters):
def has_scheduled_jobs(self, **filters):
# Checking in-memory jobs first.
for j in self.in_memory_jobs.values():
if filters and 'key' in filters and filters['key'] != j.key:
continue
if filters and 'processing' in filters:
if filters['processing'] is (j.captured_at is None):
continue
return True
if filters and 'processing' in filters:
processing = filters.pop('processing')
filters['captured_at'] = {'neq' if processing else 'eq': None}
return db_api.get_scheduled_jobs_count(**filters)
return db_api.get_scheduled_jobs_count(**filters) > 0
@classmethod
def _persist_job(cls, job):
@staticmethod
def _persist_job(job):
ctx_serializer = context.RpcContextSerializer()
ctx = (
@ -223,26 +234,32 @@ class DefaultScheduler(base.Scheduler):
:return: True if the job has been captured, False if not.
"""
now_sec = utils.utc_now_sec()
# Mark this job as captured in order to prevent calling from
# a parallel transaction. We don't use query filter
# {'captured_at': None} to account for a case when the job needs
# {'captured_at': None} to account for a case when the job needs
# to be recaptured after a maximum capture time has elapsed. If this
# method was called for job that has non-empty "captured_at" then
# method was called for a job that has non-empty "captured_at" then
# it means that it is already eligible for recapturing and the
# Job Store selected it.
_, updated_cnt = db_api.update_scheduled_job(
id=scheduled_job.id,
values={'captured_at': utils.utc_now_sec()},
values={'captured_at': now_sec},
query_filter={'captured_at': scheduled_job.captured_at}
)
# We need to update "captured_at" of the initial object stored in
# memory because it's used in a few places.
if updated_cnt == 1:
scheduled_job.captured_at = now_sec
# If updated_cnt != 1 then another scheduler
# has already updated it.
return updated_cnt == 1
@staticmethod
@db_utils.retry_on_db_error
def _delete_scheduled_job(scheduled_job):
def _delete_scheduled_job(self, scheduled_job):
db_api.delete_scheduled_job(scheduled_job.id)
@staticmethod

View File

@ -116,8 +116,8 @@ class LegacyScheduler(sched_base.Scheduler):
**job.func_args
)
def get_scheduled_jobs_count(self, **filters):
return db_api.get_delayed_calls_count(**filters)
def has_scheduled_jobs(self, **filters):
return db_api.get_delayed_calls_count(**filters) > 0
def start(self):
self._thread.start()

View File

@ -758,7 +758,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
sched = sched_base.get_system_scheduler()
self._await(lambda: sched.get_scheduled_jobs_count() == 0)
self._await(lambda: not sched.has_scheduled_jobs())
def test_delete_workflow_integrity_check_on_execution_delete(self):
wf_text = """---
@ -778,7 +778,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
sched = sched_base.get_system_scheduler()
self._await(lambda: sched.get_scheduled_jobs_count() == 0)
self._await(lambda: not sched.has_scheduled_jobs())
def test_output(self):
wf_text = """---

View File

@ -102,6 +102,24 @@ class DefaultSchedulerTest(base.DbTestCase):
self.assertEqual(1, len(scheduled_jobs))
self.assertTrue(self.scheduler.has_scheduled_jobs())
self.assertTrue(self.scheduler.has_scheduled_jobs(processing=True))
self.assertFalse(self.scheduler.has_scheduled_jobs(processing=False))
self.assertTrue(
self.scheduler.has_scheduled_jobs(key=None, processing=True)
)
self.assertFalse(
self.scheduler.has_scheduled_jobs(key=None, processing=False)
)
self.assertFalse(self.scheduler.has_scheduled_jobs(key='foobar'))
self.assertFalse(
self.scheduler.has_scheduled_jobs(key='foobar', processing=True)
)
self.assertFalse(
self.scheduler.has_scheduled_jobs(key='foobar', processing=False)
)
captured_at = scheduled_jobs[0].captured_at
self.assertIsNotNone(captured_at)