New experimental scheduler: the first working version

This patch delivers the first working version of a distributed
scheduler implementation based on local and persistent job
queues. The idea is inspired by the parallel computing pattern
known as "Work stealing" although it doesn't fully repeat it
due to a nature of Mistral.
See https://en.wikipedia.org/wiki/Work_stealing for details.

Advantages of this scheduler implementation:

* It doesn't have job processing delays when a cluster topology'
  is stable caused by DB polling intervals. A job gets scheduled
  in memory and also saved into the persistent storage for
  reliability. A persistent job can be picked up only after a
  configured allowed period of time so that it happens effectively
  after a node responsible for local processing crashed.
* Low DB load. DB polling still exists but it's not a primary
  scheduling mechamisn now but rather a protection from node crash
  situations. That means that a polling interval can now be made
  large like 30 seconds, instead of 1-2 seconds. Less DB load
  leads to less DB deadlocks between scheduler instances and less
  retries on MySQL.
* Since DB load is now less it gives better scalability properties.
  A bigger number of engines won't now lead to much bigger
  contention because of a big DB polling intervals.
* Protection from having jobs forever hanging in processing state.
  In the existing implementation, if a scheduler captured a job
  for processing (set its "processing" flag to True) and then
  crashed then a job will be in processing state forever in the DB.
  Instead of a boolean "processing" flag, the new implementation
  uses a timestamp showing when a job was captured. That gives us
  the opportunity to make such jobs eligible for recapturing and
  further processing after a certain configured timeout.

TODO:

* More testing
* DB migration for the new scheduled jobs table
* Benchmarks and testing under load
* Standardize the scheduler interface and write an adapter for the
  existing scheduler so that we could choose between scheduler
  implementations. It's highly desired to make transition to the
  new scheduler smooth in production: we always need to be able
  to roll back to the existing scheduler.

Partial blueprint: mistral-redesign-scheduler
Partial blueprint: mistral-eliminate-scheduler-delays

Change-Id: If7d06b64ac14d01e80d31242e1640cb93f2aa6fe
This commit is contained in:
Renat Akhmerov 2018-07-19 11:51:02 +07:00
parent 952967a019
commit 7b71f096b9
14 changed files with 831 additions and 1 deletions

View File

@ -255,6 +255,34 @@ scheduler_opts = [
'If this property equals None then there is no '
'restriction on selection.'
)
),
cfg.FloatOpt(
'captured_job_timeout',
default=30,
min=1,
help=(
'Defines how soon (in seconds) a scheduled job captured for '
'processing becomes eligible for capturing by other schedulers '
'again. This option is needed to prevent situations when a '
'scheduler instance captured a job and failed while processing '
'and so this job can never be processed again because it is '
'marked as captured.'
)
),
cfg.FloatOpt(
'pickup_job_after',
default=60,
min=1,
help='Time period given to a scheduler to process a scheduled job '
'locally before it becomes eligible for processing by other '
'scheduler instances.'
'For example, a job needs to run at 12:00:00. When a scheduler '
'starts processing it it has 60 seconds (or other configured '
'value) to complete the job. If the scheduler did not complete '
'the job within this period it most likely means that the '
'scheduler process crashed. In this case another scheduler '
'instance will pick it up from the Job Store, but not earlier '
'than 12:01:00 and try to process it.'
)
]

View File

@ -219,6 +219,7 @@ class RpcContextSerializer(messaging.Serializer):
profiler.init(**trace_info)
ctx = MistralContext.from_dict(context)
set_ctx(ctx)
return ctx

View File

@ -382,6 +382,36 @@ def delete_delayed_calls(**kwargs):
return IMPL.delete_delayed_calls(**kwargs)
# Scheduled jobs.
def get_scheduled_jobs_to_start(time, batch_size=None):
return IMPL.get_scheduled_jobs_to_start(time, batch_size)
def create_scheduled_job(values):
return IMPL.create_scheduled_job(values)
def delete_scheduled_job(id):
return IMPL.delete_scheduled_job(id)
def update_scheduled_job(id, values, query_filter=None):
return IMPL.update_scheduled_job(id, values, query_filter)
def get_scheduled_job(id):
return IMPL.get_scheduled_job(id)
def get_scheduled_jobs(**kwargs):
return IMPL.get_scheduled_jobs(**kwargs)
def delete_scheduled_jobs(**kwargs):
return IMPL.delete_scheduled_jobs(**kwargs)
# Cron triggers.
def get_cron_trigger(identifier):

View File

@ -15,9 +15,11 @@
# limitations under the License.
import contextlib
import datetime
import sys
import threading
from oslo_config import cfg
from oslo_db import exception as db_exc
from oslo_db import sqlalchemy as oslo_sqlalchemy
@ -1132,6 +1134,123 @@ def delete_delayed_calls(session=None, **kwargs):
return _delete_all(models.DelayedCall, **kwargs)
@b.session_aware()
def create_scheduled_job(values, session=None):
job = models.ScheduledJob()
job.update(values.copy())
try:
job.save(session)
except db_exc.DBDuplicateEntry as e:
raise exc.DBDuplicateEntryError(
"Duplicate entry for ScheduledJob ID: {}".format(e.value)
)
return job
@b.session_aware()
def get_scheduled_jobs_to_start(time, batch_size=None, session=None):
query = b.model_query(models.ScheduledJob)
execute_at_col = models.ScheduledJob.execute_at
captured_at_col = models.ScheduledJob.captured_at
# Filter by execution time accounting for a configured job pickup interval.
query = query.filter(
execute_at_col <
time - datetime.timedelta(seconds=CONF.scheduler.pickup_job_after)
)
# Filter by captured time accounting for a configured captured job timeout.
min_captured_at = (
datetime.datetime.now() -
datetime.timedelta(seconds=CONF.scheduler.captured_job_timeout)
)
query = query.filter(
sa.or_(
captured_at_col == sa.null(),
captured_at_col <= min_captured_at
)
)
query = query.order_by(execute_at_col)
query = query.limit(batch_size)
return query.all()
@b.session_aware()
def update_scheduled_job(id, values, query_filter=None, session=None):
if query_filter:
try:
specimen = models.ScheduledJob(id=id, **query_filter)
job = b.model_query(
models.ScheduledJob
).update_on_match(
specimen=specimen,
surrogate_key='id',
values=values
)
return job, 1
except oslo_sqlalchemy.update_match.NoRowsMatched as e:
LOG.debug(
"No rows matched for update scheduled job [id=%s, values=%s, "
"query_filter=%s,"
"exception=%s]", id, values, query_filter, e
)
return None, 0
else:
job = get_scheduled_job(id=id, session=session)
job.update(values)
return job, len(session.dirty)
@b.session_aware()
def get_scheduled_job(id, session=None):
job = _get_db_object_by_id(models.ScheduledJob, id)
if not job:
raise exc.DBEntityNotFoundError(
"Scheduled job not found [id=%s]" % id
)
return job
@b.session_aware()
def delete_scheduled_job(id, session=None):
# It's safe to use insecure query here because users can't access
# scheduled job.
count = b.model_query(models.ScheduledJob).filter(
models.ScheduledJob.id == id).delete()
if count == 0:
raise exc.DBEntityNotFoundError(
"Scheduled job not found [id=%s]" % id
)
def get_scheduled_jobs(**kwargs):
return _get_collection(model=models.ScheduledJob, **kwargs)
@b.session_aware()
def delete_scheduled_jobs(session=None, **kwargs):
return _delete_all(models.ScheduledJob, **kwargs)
# Other functions.
@b.session_aware()
def get_expired_executions(expiration_time, limit=None, columns=(),
session=None):

View File

@ -395,6 +395,39 @@ sa.Index(
)
class ScheduledJob(mb.MistralModelBase):
"""Contains info about scheduled jobs."""
__tablename__ = 'scheduled_jobs_v2'
id = mb.id_column()
run_after = sa.Column(sa.Integer)
# The full name of the factory function that returns/builds a Python
# (target) object whose method should be called. Optional.
target_factory_func_name = sa.Column(sa.String(200), nullable=True)
# May take two different forms:
# 1. Full path of a target function that should be called. For example,
# "mistral.utils.random_sleep".
# 2. Name of a method to call on a target object, if
# "target_factory_func_name" is specified.
func_name = sa.Column(sa.String(80), nullable=False)
func_args = sa.Column(st.JsonDictType())
func_arg_serializers = sa.Column(st.JsonDictType())
auth_ctx = sa.Column(st.JsonDictType())
execute_at = sa.Column(sa.DateTime, nullable=False)
captured_at = sa.Column(sa.DateTime, nullable=True)
sa.Index(
'%s_execution_time' % ScheduledJob.__tablename__,
ScheduledJob.execute_at
)
class Environment(mb.MistralSecureModelBase):
"""Contains environment variables for workflow execution."""

View File

@ -51,6 +51,7 @@ class EngineServer(service_base.MistralService):
self._scheduler = scheduler.start()
self._expiration_policy_tg = expiration_policy.setup()
action_execution_checker.setup()
if self._setup_profiler:

View File

78
mistral/scheduler/base.py Normal file
View File

@ -0,0 +1,78 @@
# Copyright 2018 - Nokia Networks.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class Scheduler(object):
"""Scheduler interface.
Responsible for scheduling jobs to be executed at some point in future.
"""
@abc.abstractmethod
def schedule(self, job, allow_redistribute=False):
"""Schedules a delayed call to be invoked at some point in future.
:param job: Scheduler Job. An instance of :class:`SchedulerJob`.
:param allow_redistribute: If True then the method is allowed to
reroute the call to other Scheduler instances available in the
cluster.
"""
raise NotImplementedError
class SchedulerJob(object):
"""Scheduler job.
Encapsulates information about a command that needs to be executed
at some point in future.
"""
def __init__(self, run_after=0, target_factory_func_name=None,
func_name=None, func_args=None,
func_arg_serializers=None):
"""Initializes a Scheduler Job.
:param run_after: Amount of seconds after which to invoke
a scheduled call.
:param target_factory_func_name: Full path of a function that returns
a target object against which a method specified with the
"func_name" should be invoked. Optional. If None, then "func_name"
must be a full path of a static function to invoke.
:param func_name: Function or method name to invoke when a job gets
triggered.
:param func_args: Dictionary containing function/method argument names
and values as key-value pairs. A function/method specified with
the "func_name" argument will be invoked with these arguments.
:param func_arg_serializers: Dictionary containing function/method
argument names and serializers for argument values as key-value
pairs. Each serializer is a full path to a subclass of
:class:'mistral_lib.serialization.Serializer' that is capable
of serializing and deserializing of a corresponding argument value.
Optional. Serializers must be specified only for those arguments
whose values can't be saved into a persistent storage as is and
they need to be converted first into a value of a primitive type.
"""
if not func_name:
raise RuntimeError("'target_method_name' must be provided.")
self.run_after = run_after
self.target_factory_func_name = target_factory_func_name
self.func_name = func_name
self.func_args = func_args or {}
self.func_arg_serializers = func_arg_serializers

View File

@ -0,0 +1,291 @@
# Copyright 2018 - Nokia Networks.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import datetime
import eventlet
import random
import sys
import threading
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import importutils
from mistral import context
from mistral.db import utils as db_utils
from mistral.db.v2 import api as db_api
from mistral import exceptions as exc
from mistral.scheduler import base
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class DefaultScheduler(base.Scheduler):
def __init__(self, fixed_delay, random_delay, batch_size):
self._fixed_delay = fixed_delay
self._random_delay = random_delay
self._batch_size = batch_size
# Dictionary containing {GreenThread: ScheduledJob} pairs that
# represent in-memory jobs.
self.memory_jobs = {}
self._job_store_checker_thread = threading.Thread(
target=self._job_store_checker
)
self._job_store_checker_thread.daemon = True
self._stopped = True
def start(self):
self._stopped = False
self._job_store_checker_thread.start()
def stop(self, graceful=False):
self._stopped = True
if graceful:
self._job_store_checker_thread.join()
def _job_store_checker(self):
while not self._stopped:
LOG.debug(
"Starting Scheduler Job Store checker [scheduler=%s]...", self
)
try:
self._process_store_jobs()
except Exception:
LOG.exception(
"Scheduler failed to process delayed calls"
" due to unexpected exception."
)
# For some mysterious reason (probably eventlet related)
# the exception is not cleared from the context automatically.
# This results in subsequent log.warning calls to show invalid
# info.
if sys.version_info < (3,):
sys.exc_clear()
eventlet.sleep(
self._fixed_delay +
random.Random().randint(0, self._random_delay * 1000) * 0.001
)
def _process_store_jobs(self):
# Select and capture eligible jobs.
with db_api.transaction():
candidate_jobs = db_api.get_scheduled_jobs_to_start(
datetime.datetime.now(),
self._batch_size
)
captured_jobs = [
job for job in candidate_jobs
if self._capture_scheduled_job(job)
]
# Invoke and delete scheduled jobs.
for job in captured_jobs:
auth_ctx, func, func_args = self._prepare_job(job)
self._invoke_job(auth_ctx, func, func_args)
self._delete_scheduled_job(job)
def schedule(self, job, allow_redistribute=False):
scheduled_job = DefaultScheduler._persist_job(job)
self._schedule_in_memory(job.run_after, scheduled_job)
@classmethod
def _persist_job(cls, job):
ctx_serializer = context.RpcContextSerializer()
ctx = (
ctx_serializer.serialize_context(context.ctx())
if context.has_ctx() else {}
)
execute_at = (datetime.datetime.now() +
datetime.timedelta(seconds=job.run_after))
args = job.func_args
arg_serializers = job.func_arg_serializers
if arg_serializers:
for arg_name, serializer_path in arg_serializers.items():
if arg_name not in args:
raise exc.MistralException(
"Serializable function argument %s"
" not found in func_args=%s"
% (arg_name, args))
try:
serializer = importutils.import_class(serializer_path)()
except ImportError as e:
raise ImportError(
"Cannot import class %s: %s" % (serializer_path, e)
)
args[arg_name] = serializer.serialize(args[arg_name])
values = {
'run_after': job.run_after,
'target_factory_func_name': job.target_factory_func_name,
'func_name': job.func_name,
'func_args': args,
'func_arg_serializers': arg_serializers,
'auth_ctx': ctx,
'execute_at': execute_at,
'captured_at': None
}
return db_api.create_scheduled_job(values)
def _schedule_in_memory(self, run_after, scheduled_job):
green_thread = eventlet.spawn_after(
run_after,
self._process_memory_job,
scheduled_job
)
self.memory_jobs[green_thread] = scheduled_job
def _process_memory_job(self, scheduled_job):
# 1. Capture the job in Job Store.
if not self._capture_scheduled_job(scheduled_job):
LOG.warning(
"Unable to capture a scheduled job [scheduled_job=%s]",
scheduled_job
)
return
# 2. Invoke the target function.
auth_ctx, func, func_args = self._prepare_job(scheduled_job)
self._invoke_job(auth_ctx, func, func_args)
self._delete_scheduled_job(scheduled_job)
# 3. Delete the job from Job Store, if success.
# TODO(rakhmerov):
# 3.1 What do we do if invocation wasn't successful?
# Delete from a local collection of in-memory jobs.
del self.memory_jobs[eventlet.getcurrent()]
@staticmethod
def _capture_scheduled_job(scheduled_job):
"""Capture a scheduled persistent job in a job store.
:param scheduled_job: Job.
:return: True if the job has been captured, False if not.
"""
# Mark this job as captured in order to prevent calling from
# parallel a transaction. We don't use query filter
# {'captured_at': None} to account for a case when the job needs
# to be recaptured after a maximum capture time has elapsed. If this
# method was called for job that has non-empty "captured_at" then
# it means that it is already eligible for recapturing and the
# Job Store selected it.
_, updated_cnt = db_api.update_scheduled_job(
id=scheduled_job.id,
values={'captured_at': datetime.datetime.now()},
query_filter={'captured_at': scheduled_job.captured_at}
)
# If updated_cnt != 1 then another scheduler
# has already updated it.
return updated_cnt == 1
@staticmethod
@db_utils.retry_on_db_error
def _delete_scheduled_job(scheduled_job):
db_api.delete_scheduled_job(scheduled_job.id)
@staticmethod
def _prepare_job(scheduled_job):
"""Prepares a scheduled job for invocation.
To make an invocation of a delayed call it needs to be prepared for
further usage, we need to reconstruct a final target func
and deserialize arguments, if needed.
:param scheduled_job: Persistent scheduled job.
:return: A tuple (auth_ctx, func, args) where all data is properly
deserialized.
"""
LOG.debug(
'Preparing a scheduled job. [ID=%s, target_factory_func_name=%s,'
' func_name=%s, func_args=%s]',
scheduled_job.id,
scheduled_job.target_factory_func_name,
scheduled_job.func_name,
scheduled_job.func_args
)
auth_ctx = copy.deepcopy(scheduled_job.auth_ctx)
if scheduled_job.target_factory_func_name:
factory = importutils.import_class(
scheduled_job.target_factory_func_name
)
func = getattr(factory(), scheduled_job.func_name)
else:
func = importutils.import_class(scheduled_job.func_name)
args = copy.deepcopy(scheduled_job.func_args)
serializers_dict = scheduled_job.func_arg_serializers
if serializers_dict:
# Deserialize arguments.
for arg_name, ser_path in serializers_dict.items():
serializer = importutils.import_class(ser_path)()
deserialized = serializer.deserialize(args[arg_name])
args[arg_name] = deserialized
return auth_ctx, func, args
@staticmethod
def _invoke_job(auth_ctx, func, args):
ctx_serializer = context.RpcContextSerializer()
try:
# Set the correct context for the function.
ctx_serializer.deserialize_context(auth_ctx)
# Invoke the function.
func(**args)
except Exception as e:
LOG.exception(
"Scheduled job failed, method: %s, exception: %s",
func,
e
)
finally:
# Remove context.
context.set_ctx(None)

View File

@ -0,0 +1,68 @@
# Copyright 2018 - Nokia Networks.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
from oslo_log import log as logging
from mistral.rpc import base as rpc
from mistral.service import base as service_base
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class SchedulerServer(service_base.MistralService):
"""Scheduler server.
Manages scheduler life-cycle and gets registered as an RPC
endpoint to process scheduler specific calls.
"""
def __init__(self, scheduler, setup_profiler=True):
super(SchedulerServer, self).__init__(
'scheduler_group',
setup_profiler
)
self.scheduler = scheduler
self._rpc_server = None
def start(self):
super(SchedulerServer, self).start()
self._rpc_server = rpc.get_rpc_server_driver()(cfg.CONF.engine)
self._rpc_server.register_endpoint(self)
self._rpc_server.run()
self._notify_started('Scheduler server started.')
def stop(self, graceful=False):
super(SchedulerServer, self).stop()
if self._rpc_server:
self._rpc_server.stop(graceful)
def schedule(self, rpc_ctx, job):
"""Receives requests over RPC to schedule delayed calls.
:param rpc_ctx: RPC request context.
:param job: Scheduler job.
"""
LOG.info("Received RPC request 'schedule'[job=%s]", job)
return self.scheduler.schedule(job, allow_redistribute=False)

View File

@ -203,7 +203,7 @@ class BaseTest(base.BaseTestCase):
If within a configured timeout predicate function hasn't evaluated
to True then an exception is raised.
:param predicate: Predication function.
:param predicate: Predicate function.
:param delay: Delay in seconds between predicate function calls.
:param timeout: Maximum amount of time to wait for predication
function to evaluate to True.
@ -302,6 +302,7 @@ class DbTestCase(BaseTest):
db_api.delete_environments()
db_api.delete_resource_members()
db_api.delete_delayed_calls()
db_api.delete_scheduled_jobs()
sqlite_lock.cleanup()

View File

View File

@ -0,0 +1,176 @@
# Copyright 2018 - Nokia Networks.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from eventlet import event
from eventlet import semaphore
from eventlet import timeout
import datetime
import mock
from mistral.db.v2 import api as db_api
from mistral.scheduler import base as scheduler_base
from mistral.scheduler import default_scheduler
from mistral.tests.unit import base
TARGET_METHOD_PATH = (
'mistral.tests.unit.scheduler.test_scheduler.target_method'
)
def target_method():
pass
class SchedulerTest(base.DbTestCase):
def setUp(self):
super(SchedulerTest, self).setUp()
# This Timeout object is needed to raise an exception if the test took
# longer than a configured number of seconds.
self.timeout = timeout.Timeout(seconds=15)
# Synchronization primitives to control when a scheduled invoked
# method is allowed to enter the method and exit from it to perform
# all needed checks.
self.target_mtd_started = event.Event()
self.target_mtd_finished = event.Event()
self.target_mtd_lock = semaphore.Semaphore(0)
self.scheduler = default_scheduler.DefaultScheduler(1, 1, 100)
self.scheduler.start()
self.addCleanup(self.scheduler.stop, True)
self.addCleanup(self.timeout.cancel)
def target_method(self, *args, **kwargs):
self.target_mtd_started.send()
self.target_mtd_lock.acquire()
# Note: Potentially we can do something else here. No-op for now.
self.target_mtd_finished.send()
def _wait_target_method_start(self):
self.target_mtd_started.wait()
def _unlock_target_method(self):
self.target_mtd_lock.release()
def _wait_target_method_end(self):
self.target_mtd_finished.wait()
@mock.patch(TARGET_METHOD_PATH)
def test_schedule_called_once(self, method):
method.side_effect = self.target_method
job = scheduler_base.SchedulerJob(
run_after=1,
func_name=TARGET_METHOD_PATH,
func_args={'name': 'task', 'id': '321'}
)
self.scheduler.schedule(job)
self._wait_target_method_start()
# Check that the persistent job has been created and captured.
scheduled_jobs = db_api.get_scheduled_jobs()
self.assertEqual(1, len(scheduled_jobs))
captured_at = scheduled_jobs[0].captured_at
self.assertIsNotNone(captured_at)
self.assertTrue(
datetime.datetime.now() - captured_at <
datetime.timedelta(seconds=3)
)
self._unlock_target_method()
self._wait_target_method_end()
method.assert_called_once_with(name='task', id='321')
# After the job is processed the persistent object must be deleted.
self._await(lambda: not db_api.get_scheduled_jobs())
@mock.patch(TARGET_METHOD_PATH)
def test_pickup_from_job_store(self, method):
method.side_effect = self.target_method
self.override_config('pickup_job_after', 1, 'scheduler')
# 1. Create a scheduled job in Job Store.
execute_at = datetime.datetime.now() + datetime.timedelta(seconds=1)
db_api.create_scheduled_job({
'run_after': 1,
'func_name': TARGET_METHOD_PATH,
'func_args': {'name': 'task', 'id': '321'},
'execute_at': execute_at,
'captured_at': None,
'auth_ctx': {}
})
self.assertTrue(len(db_api.get_scheduled_jobs()) > 0)
self._unlock_target_method()
self._wait_target_method_end()
# 2. Wait till Scheduler picks up the job and processes it.
self._await(lambda: not db_api.get_scheduled_jobs())
method.assert_called_once_with(name='task', id='321')
@mock.patch(TARGET_METHOD_PATH)
def test_recapture_job(self, method):
method.side_effect = self.target_method
self.override_config('pickup_job_after', 1, 'scheduler')
self.override_config('captured_job_timeout', 3, 'scheduler')
# 1. Create a scheduled job in Job Store marked as captured in one
# second in the future. It can be captured again only after 3
# seconds after that according to the config option.
captured_at = datetime.datetime.now() + datetime.timedelta(seconds=1)
before_ts = datetime.datetime.now()
db_api.create_scheduled_job({
'run_after': 1,
'func_name': TARGET_METHOD_PATH,
'func_args': {'name': 'task', 'id': '321'},
'execute_at': datetime.datetime.now(),
'captured_at': captured_at,
'auth_ctx': {}
})
self.assertTrue(len(db_api.get_scheduled_jobs()) > 0)
self._unlock_target_method()
self._wait_target_method_end()
# 2. Wait till Scheduler picks up the job and processes it.
self._await(lambda: not db_api.get_scheduled_jobs())
method.assert_called_once_with(name='task', id='321')
# At least 3 seconds should have passed.
self.assertTrue(
datetime.datetime.now() - before_ts >=
datetime.timedelta(seconds=3)
)

View File

@ -41,6 +41,10 @@ from mistral import exceptions as exc
# Thread local storage.
_th_loc_storage = threading.local()
# TODO(rakhmerov): these two constants are misplaced. Utility methods
# should not be Mistral specific. They should be generic enough so to
# be moved to any other project w/o changes.
ACTION_TASK_TYPE = 'ACTION'
WORKFLOW_TASK_TYPE = 'WORKFLOW'