diff --git a/mistral/config.py b/mistral/config.py index b480223df..e5785bf78 100644 --- a/mistral/config.py +++ b/mistral/config.py @@ -255,6 +255,34 @@ scheduler_opts = [ 'If this property equals None then there is no ' 'restriction on selection.' ) + ), + cfg.FloatOpt( + 'captured_job_timeout', + default=30, + min=1, + help=( + 'Defines how soon (in seconds) a scheduled job captured for ' + 'processing becomes eligible for capturing by other schedulers ' + 'again. This option is needed to prevent situations when a ' + 'scheduler instance captured a job and failed while processing ' + 'and so this job can never be processed again because it is ' + 'marked as captured.' + ) + ), + cfg.FloatOpt( + 'pickup_job_after', + default=60, + min=1, + help='Time period given to a scheduler to process a scheduled job ' + 'locally before it becomes eligible for processing by other ' + 'scheduler instances.' + 'For example, a job needs to run at 12:00:00. When a scheduler ' + 'starts processing it it has 60 seconds (or other configured ' + 'value) to complete the job. If the scheduler did not complete ' + 'the job within this period it most likely means that the ' + 'scheduler process crashed. In this case another scheduler ' + 'instance will pick it up from the Job Store, but not earlier ' + 'than 12:01:00 and try to process it.' ) ] diff --git a/mistral/context.py b/mistral/context.py index 28e2a68fb..fceb6cafd 100644 --- a/mistral/context.py +++ b/mistral/context.py @@ -219,6 +219,7 @@ class RpcContextSerializer(messaging.Serializer): profiler.init(**trace_info) ctx = MistralContext.from_dict(context) + set_ctx(ctx) return ctx diff --git a/mistral/db/v2/api.py b/mistral/db/v2/api.py index 477914003..de1d0b78c 100644 --- a/mistral/db/v2/api.py +++ b/mistral/db/v2/api.py @@ -382,6 +382,36 @@ def delete_delayed_calls(**kwargs): return IMPL.delete_delayed_calls(**kwargs) +# Scheduled jobs. + +def get_scheduled_jobs_to_start(time, batch_size=None): + return IMPL.get_scheduled_jobs_to_start(time, batch_size) + + +def create_scheduled_job(values): + return IMPL.create_scheduled_job(values) + + +def delete_scheduled_job(id): + return IMPL.delete_scheduled_job(id) + + +def update_scheduled_job(id, values, query_filter=None): + return IMPL.update_scheduled_job(id, values, query_filter) + + +def get_scheduled_job(id): + return IMPL.get_scheduled_job(id) + + +def get_scheduled_jobs(**kwargs): + return IMPL.get_scheduled_jobs(**kwargs) + + +def delete_scheduled_jobs(**kwargs): + return IMPL.delete_scheduled_jobs(**kwargs) + + # Cron triggers. def get_cron_trigger(identifier): diff --git a/mistral/db/v2/sqlalchemy/api.py b/mistral/db/v2/sqlalchemy/api.py index dddf7bb01..671d5f51f 100644 --- a/mistral/db/v2/sqlalchemy/api.py +++ b/mistral/db/v2/sqlalchemy/api.py @@ -15,9 +15,11 @@ # limitations under the License. import contextlib +import datetime import sys import threading + from oslo_config import cfg from oslo_db import exception as db_exc from oslo_db import sqlalchemy as oslo_sqlalchemy @@ -1132,6 +1134,123 @@ def delete_delayed_calls(session=None, **kwargs): return _delete_all(models.DelayedCall, **kwargs) +@b.session_aware() +def create_scheduled_job(values, session=None): + job = models.ScheduledJob() + + job.update(values.copy()) + + try: + job.save(session) + except db_exc.DBDuplicateEntry as e: + raise exc.DBDuplicateEntryError( + "Duplicate entry for ScheduledJob ID: {}".format(e.value) + ) + + return job + + +@b.session_aware() +def get_scheduled_jobs_to_start(time, batch_size=None, session=None): + query = b.model_query(models.ScheduledJob) + + execute_at_col = models.ScheduledJob.execute_at + captured_at_col = models.ScheduledJob.captured_at + + # Filter by execution time accounting for a configured job pickup interval. + query = query.filter( + execute_at_col < + time - datetime.timedelta(seconds=CONF.scheduler.pickup_job_after) + ) + + # Filter by captured time accounting for a configured captured job timeout. + min_captured_at = ( + datetime.datetime.now() - + datetime.timedelta(seconds=CONF.scheduler.captured_job_timeout) + ) + + query = query.filter( + sa.or_( + captured_at_col == sa.null(), + captured_at_col <= min_captured_at + ) + ) + + query = query.order_by(execute_at_col) + query = query.limit(batch_size) + + return query.all() + + +@b.session_aware() +def update_scheduled_job(id, values, query_filter=None, session=None): + if query_filter: + try: + specimen = models.ScheduledJob(id=id, **query_filter) + + job = b.model_query( + models.ScheduledJob + ).update_on_match( + specimen=specimen, + surrogate_key='id', + values=values + ) + + return job, 1 + + except oslo_sqlalchemy.update_match.NoRowsMatched as e: + LOG.debug( + "No rows matched for update scheduled job [id=%s, values=%s, " + "query_filter=%s," + "exception=%s]", id, values, query_filter, e + ) + + return None, 0 + + else: + job = get_scheduled_job(id=id, session=session) + + job.update(values) + + return job, len(session.dirty) + + +@b.session_aware() +def get_scheduled_job(id, session=None): + job = _get_db_object_by_id(models.ScheduledJob, id) + + if not job: + raise exc.DBEntityNotFoundError( + "Scheduled job not found [id=%s]" % id + ) + + return job + + +@b.session_aware() +def delete_scheduled_job(id, session=None): + # It's safe to use insecure query here because users can't access + # scheduled job. + count = b.model_query(models.ScheduledJob).filter( + models.ScheduledJob.id == id).delete() + + if count == 0: + raise exc.DBEntityNotFoundError( + "Scheduled job not found [id=%s]" % id + ) + + +def get_scheduled_jobs(**kwargs): + return _get_collection(model=models.ScheduledJob, **kwargs) + + +@b.session_aware() +def delete_scheduled_jobs(session=None, **kwargs): + return _delete_all(models.ScheduledJob, **kwargs) + + +# Other functions. + @b.session_aware() def get_expired_executions(expiration_time, limit=None, columns=(), session=None): diff --git a/mistral/db/v2/sqlalchemy/models.py b/mistral/db/v2/sqlalchemy/models.py index 471e339e3..b6d22512e 100644 --- a/mistral/db/v2/sqlalchemy/models.py +++ b/mistral/db/v2/sqlalchemy/models.py @@ -395,6 +395,39 @@ sa.Index( ) +class ScheduledJob(mb.MistralModelBase): + """Contains info about scheduled jobs.""" + + __tablename__ = 'scheduled_jobs_v2' + + id = mb.id_column() + + run_after = sa.Column(sa.Integer) + + # The full name of the factory function that returns/builds a Python + # (target) object whose method should be called. Optional. + target_factory_func_name = sa.Column(sa.String(200), nullable=True) + + # May take two different forms: + # 1. Full path of a target function that should be called. For example, + # "mistral.utils.random_sleep". + # 2. Name of a method to call on a target object, if + # "target_factory_func_name" is specified. + func_name = sa.Column(sa.String(80), nullable=False) + + func_args = sa.Column(st.JsonDictType()) + func_arg_serializers = sa.Column(st.JsonDictType()) + auth_ctx = sa.Column(st.JsonDictType()) + execute_at = sa.Column(sa.DateTime, nullable=False) + captured_at = sa.Column(sa.DateTime, nullable=True) + + +sa.Index( + '%s_execution_time' % ScheduledJob.__tablename__, + ScheduledJob.execute_at +) + + class Environment(mb.MistralSecureModelBase): """Contains environment variables for workflow execution.""" diff --git a/mistral/engine/engine_server.py b/mistral/engine/engine_server.py index c93836898..db8e20321 100644 --- a/mistral/engine/engine_server.py +++ b/mistral/engine/engine_server.py @@ -51,6 +51,7 @@ class EngineServer(service_base.MistralService): self._scheduler = scheduler.start() self._expiration_policy_tg = expiration_policy.setup() + action_execution_checker.setup() if self._setup_profiler: diff --git a/mistral/scheduler/__init__.py b/mistral/scheduler/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/mistral/scheduler/base.py b/mistral/scheduler/base.py new file mode 100644 index 000000000..ed0dc459a --- /dev/null +++ b/mistral/scheduler/base.py @@ -0,0 +1,78 @@ +# Copyright 2018 - Nokia Networks. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +import six + + +@six.add_metaclass(abc.ABCMeta) +class Scheduler(object): + """Scheduler interface. + + Responsible for scheduling jobs to be executed at some point in future. + """ + + @abc.abstractmethod + def schedule(self, job, allow_redistribute=False): + """Schedules a delayed call to be invoked at some point in future. + + :param job: Scheduler Job. An instance of :class:`SchedulerJob`. + :param allow_redistribute: If True then the method is allowed to + reroute the call to other Scheduler instances available in the + cluster. + """ + raise NotImplementedError + + +class SchedulerJob(object): + """Scheduler job. + + Encapsulates information about a command that needs to be executed + at some point in future. + """ + def __init__(self, run_after=0, target_factory_func_name=None, + func_name=None, func_args=None, + func_arg_serializers=None): + """Initializes a Scheduler Job. + + :param run_after: Amount of seconds after which to invoke + a scheduled call. + :param target_factory_func_name: Full path of a function that returns + a target object against which a method specified with the + "func_name" should be invoked. Optional. If None, then "func_name" + must be a full path of a static function to invoke. + :param func_name: Function or method name to invoke when a job gets + triggered. + :param func_args: Dictionary containing function/method argument names + and values as key-value pairs. A function/method specified with + the "func_name" argument will be invoked with these arguments. + :param func_arg_serializers: Dictionary containing function/method + argument names and serializers for argument values as key-value + pairs. Each serializer is a full path to a subclass of + :class:'mistral_lib.serialization.Serializer' that is capable + of serializing and deserializing of a corresponding argument value. + Optional. Serializers must be specified only for those arguments + whose values can't be saved into a persistent storage as is and + they need to be converted first into a value of a primitive type. + + """ + + if not func_name: + raise RuntimeError("'target_method_name' must be provided.") + + self.run_after = run_after + self.target_factory_func_name = target_factory_func_name + self.func_name = func_name + self.func_args = func_args or {} + self.func_arg_serializers = func_arg_serializers diff --git a/mistral/scheduler/default_scheduler.py b/mistral/scheduler/default_scheduler.py new file mode 100644 index 000000000..21db17162 --- /dev/null +++ b/mistral/scheduler/default_scheduler.py @@ -0,0 +1,291 @@ +# Copyright 2018 - Nokia Networks. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy +import datetime +import eventlet +import random +import sys +import threading + +from oslo_config import cfg +from oslo_log import log as logging +from oslo_utils import importutils + +from mistral import context +from mistral.db import utils as db_utils +from mistral.db.v2 import api as db_api +from mistral import exceptions as exc +from mistral.scheduler import base + + +LOG = logging.getLogger(__name__) + +CONF = cfg.CONF + + +class DefaultScheduler(base.Scheduler): + def __init__(self, fixed_delay, random_delay, batch_size): + self._fixed_delay = fixed_delay + self._random_delay = random_delay + self._batch_size = batch_size + + # Dictionary containing {GreenThread: ScheduledJob} pairs that + # represent in-memory jobs. + self.memory_jobs = {} + + self._job_store_checker_thread = threading.Thread( + target=self._job_store_checker + ) + self._job_store_checker_thread.daemon = True + + self._stopped = True + + def start(self): + self._stopped = False + + self._job_store_checker_thread.start() + + def stop(self, graceful=False): + self._stopped = True + + if graceful: + self._job_store_checker_thread.join() + + def _job_store_checker(self): + while not self._stopped: + LOG.debug( + "Starting Scheduler Job Store checker [scheduler=%s]...", self + ) + + try: + self._process_store_jobs() + except Exception: + LOG.exception( + "Scheduler failed to process delayed calls" + " due to unexpected exception." + ) + + # For some mysterious reason (probably eventlet related) + # the exception is not cleared from the context automatically. + # This results in subsequent log.warning calls to show invalid + # info. + if sys.version_info < (3,): + sys.exc_clear() + + eventlet.sleep( + self._fixed_delay + + random.Random().randint(0, self._random_delay * 1000) * 0.001 + ) + + def _process_store_jobs(self): + # Select and capture eligible jobs. + with db_api.transaction(): + candidate_jobs = db_api.get_scheduled_jobs_to_start( + datetime.datetime.now(), + self._batch_size + ) + + captured_jobs = [ + job for job in candidate_jobs + if self._capture_scheduled_job(job) + ] + + # Invoke and delete scheduled jobs. + for job in captured_jobs: + auth_ctx, func, func_args = self._prepare_job(job) + + self._invoke_job(auth_ctx, func, func_args) + + self._delete_scheduled_job(job) + + def schedule(self, job, allow_redistribute=False): + scheduled_job = DefaultScheduler._persist_job(job) + + self._schedule_in_memory(job.run_after, scheduled_job) + + @classmethod + def _persist_job(cls, job): + ctx_serializer = context.RpcContextSerializer() + + ctx = ( + ctx_serializer.serialize_context(context.ctx()) + if context.has_ctx() else {} + ) + + execute_at = (datetime.datetime.now() + + datetime.timedelta(seconds=job.run_after)) + + args = job.func_args + arg_serializers = job.func_arg_serializers + + if arg_serializers: + for arg_name, serializer_path in arg_serializers.items(): + if arg_name not in args: + raise exc.MistralException( + "Serializable function argument %s" + " not found in func_args=%s" + % (arg_name, args)) + try: + serializer = importutils.import_class(serializer_path)() + except ImportError as e: + raise ImportError( + "Cannot import class %s: %s" % (serializer_path, e) + ) + + args[arg_name] = serializer.serialize(args[arg_name]) + + values = { + 'run_after': job.run_after, + 'target_factory_func_name': job.target_factory_func_name, + 'func_name': job.func_name, + 'func_args': args, + 'func_arg_serializers': arg_serializers, + 'auth_ctx': ctx, + 'execute_at': execute_at, + 'captured_at': None + } + + return db_api.create_scheduled_job(values) + + def _schedule_in_memory(self, run_after, scheduled_job): + green_thread = eventlet.spawn_after( + run_after, + self._process_memory_job, + scheduled_job + ) + + self.memory_jobs[green_thread] = scheduled_job + + def _process_memory_job(self, scheduled_job): + # 1. Capture the job in Job Store. + if not self._capture_scheduled_job(scheduled_job): + LOG.warning( + "Unable to capture a scheduled job [scheduled_job=%s]", + scheduled_job + ) + + return + + # 2. Invoke the target function. + auth_ctx, func, func_args = self._prepare_job(scheduled_job) + + self._invoke_job(auth_ctx, func, func_args) + + self._delete_scheduled_job(scheduled_job) + + # 3. Delete the job from Job Store, if success. + # TODO(rakhmerov): + # 3.1 What do we do if invocation wasn't successful? + + # Delete from a local collection of in-memory jobs. + del self.memory_jobs[eventlet.getcurrent()] + + @staticmethod + def _capture_scheduled_job(scheduled_job): + """Capture a scheduled persistent job in a job store. + + :param scheduled_job: Job. + :return: True if the job has been captured, False if not. + """ + + # Mark this job as captured in order to prevent calling from + # parallel a transaction. We don't use query filter + # {'captured_at': None} to account for a case when the job needs + # to be recaptured after a maximum capture time has elapsed. If this + # method was called for job that has non-empty "captured_at" then + # it means that it is already eligible for recapturing and the + # Job Store selected it. + _, updated_cnt = db_api.update_scheduled_job( + id=scheduled_job.id, + values={'captured_at': datetime.datetime.now()}, + query_filter={'captured_at': scheduled_job.captured_at} + ) + + # If updated_cnt != 1 then another scheduler + # has already updated it. + return updated_cnt == 1 + + @staticmethod + @db_utils.retry_on_db_error + def _delete_scheduled_job(scheduled_job): + db_api.delete_scheduled_job(scheduled_job.id) + + @staticmethod + def _prepare_job(scheduled_job): + """Prepares a scheduled job for invocation. + + To make an invocation of a delayed call it needs to be prepared for + further usage, we need to reconstruct a final target func + and deserialize arguments, if needed. + + :param scheduled_job: Persistent scheduled job. + :return: A tuple (auth_ctx, func, args) where all data is properly + deserialized. + """ + + LOG.debug( + 'Preparing a scheduled job. [ID=%s, target_factory_func_name=%s,' + ' func_name=%s, func_args=%s]', + scheduled_job.id, + scheduled_job.target_factory_func_name, + scheduled_job.func_name, + scheduled_job.func_args + ) + + auth_ctx = copy.deepcopy(scheduled_job.auth_ctx) + + if scheduled_job.target_factory_func_name: + factory = importutils.import_class( + scheduled_job.target_factory_func_name + ) + + func = getattr(factory(), scheduled_job.func_name) + else: + func = importutils.import_class(scheduled_job.func_name) + + args = copy.deepcopy(scheduled_job.func_args) + + serializers_dict = scheduled_job.func_arg_serializers + + if serializers_dict: + # Deserialize arguments. + for arg_name, ser_path in serializers_dict.items(): + serializer = importutils.import_class(ser_path)() + + deserialized = serializer.deserialize(args[arg_name]) + + args[arg_name] = deserialized + + return auth_ctx, func, args + + @staticmethod + def _invoke_job(auth_ctx, func, args): + ctx_serializer = context.RpcContextSerializer() + + try: + # Set the correct context for the function. + ctx_serializer.deserialize_context(auth_ctx) + + # Invoke the function. + func(**args) + except Exception as e: + LOG.exception( + "Scheduled job failed, method: %s, exception: %s", + func, + e + ) + finally: + # Remove context. + context.set_ctx(None) diff --git a/mistral/scheduler/scheduler_server.py b/mistral/scheduler/scheduler_server.py new file mode 100644 index 000000000..64abae286 --- /dev/null +++ b/mistral/scheduler/scheduler_server.py @@ -0,0 +1,68 @@ +# Copyright 2018 - Nokia Networks. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from oslo_config import cfg +from oslo_log import log as logging + +from mistral.rpc import base as rpc +from mistral.service import base as service_base + + +LOG = logging.getLogger(__name__) + +CONF = cfg.CONF + + +class SchedulerServer(service_base.MistralService): + """Scheduler server. + + Manages scheduler life-cycle and gets registered as an RPC + endpoint to process scheduler specific calls. + """ + + def __init__(self, scheduler, setup_profiler=True): + super(SchedulerServer, self).__init__( + 'scheduler_group', + setup_profiler + ) + + self.scheduler = scheduler + self._rpc_server = None + + def start(self): + super(SchedulerServer, self).start() + + self._rpc_server = rpc.get_rpc_server_driver()(cfg.CONF.engine) + self._rpc_server.register_endpoint(self) + + self._rpc_server.run() + + self._notify_started('Scheduler server started.') + + def stop(self, graceful=False): + super(SchedulerServer, self).stop() + + if self._rpc_server: + self._rpc_server.stop(graceful) + + def schedule(self, rpc_ctx, job): + """Receives requests over RPC to schedule delayed calls. + + :param rpc_ctx: RPC request context. + :param job: Scheduler job. + """ + LOG.info("Received RPC request 'schedule'[job=%s]", job) + + return self.scheduler.schedule(job, allow_redistribute=False) diff --git a/mistral/tests/unit/base.py b/mistral/tests/unit/base.py index 9faeff286..677002033 100644 --- a/mistral/tests/unit/base.py +++ b/mistral/tests/unit/base.py @@ -203,7 +203,7 @@ class BaseTest(base.BaseTestCase): If within a configured timeout predicate function hasn't evaluated to True then an exception is raised. - :param predicate: Predication function. + :param predicate: Predicate function. :param delay: Delay in seconds between predicate function calls. :param timeout: Maximum amount of time to wait for predication function to evaluate to True. @@ -302,6 +302,7 @@ class DbTestCase(BaseTest): db_api.delete_environments() db_api.delete_resource_members() db_api.delete_delayed_calls() + db_api.delete_scheduled_jobs() sqlite_lock.cleanup() diff --git a/mistral/tests/unit/scheduler/__init__.py b/mistral/tests/unit/scheduler/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/mistral/tests/unit/scheduler/test_scheduler.py b/mistral/tests/unit/scheduler/test_scheduler.py new file mode 100644 index 000000000..07d591ac5 --- /dev/null +++ b/mistral/tests/unit/scheduler/test_scheduler.py @@ -0,0 +1,176 @@ +# Copyright 2018 - Nokia Networks. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from eventlet import event +from eventlet import semaphore +from eventlet import timeout + +import datetime +import mock + +from mistral.db.v2 import api as db_api +from mistral.scheduler import base as scheduler_base +from mistral.scheduler import default_scheduler +from mistral.tests.unit import base + + +TARGET_METHOD_PATH = ( + 'mistral.tests.unit.scheduler.test_scheduler.target_method' +) + + +def target_method(): + pass + + +class SchedulerTest(base.DbTestCase): + def setUp(self): + super(SchedulerTest, self).setUp() + + # This Timeout object is needed to raise an exception if the test took + # longer than a configured number of seconds. + self.timeout = timeout.Timeout(seconds=15) + + # Synchronization primitives to control when a scheduled invoked + # method is allowed to enter the method and exit from it to perform + # all needed checks. + self.target_mtd_started = event.Event() + self.target_mtd_finished = event.Event() + self.target_mtd_lock = semaphore.Semaphore(0) + + self.scheduler = default_scheduler.DefaultScheduler(1, 1, 100) + self.scheduler.start() + + self.addCleanup(self.scheduler.stop, True) + self.addCleanup(self.timeout.cancel) + + def target_method(self, *args, **kwargs): + self.target_mtd_started.send() + + self.target_mtd_lock.acquire() + + # Note: Potentially we can do something else here. No-op for now. + + self.target_mtd_finished.send() + + def _wait_target_method_start(self): + self.target_mtd_started.wait() + + def _unlock_target_method(self): + self.target_mtd_lock.release() + + def _wait_target_method_end(self): + self.target_mtd_finished.wait() + + @mock.patch(TARGET_METHOD_PATH) + def test_schedule_called_once(self, method): + method.side_effect = self.target_method + + job = scheduler_base.SchedulerJob( + run_after=1, + func_name=TARGET_METHOD_PATH, + func_args={'name': 'task', 'id': '321'} + ) + + self.scheduler.schedule(job) + + self._wait_target_method_start() + + # Check that the persistent job has been created and captured. + scheduled_jobs = db_api.get_scheduled_jobs() + + self.assertEqual(1, len(scheduled_jobs)) + + captured_at = scheduled_jobs[0].captured_at + + self.assertIsNotNone(captured_at) + self.assertTrue( + datetime.datetime.now() - captured_at < + datetime.timedelta(seconds=3) + ) + + self._unlock_target_method() + self._wait_target_method_end() + + method.assert_called_once_with(name='task', id='321') + + # After the job is processed the persistent object must be deleted. + self._await(lambda: not db_api.get_scheduled_jobs()) + + @mock.patch(TARGET_METHOD_PATH) + def test_pickup_from_job_store(self, method): + method.side_effect = self.target_method + + self.override_config('pickup_job_after', 1, 'scheduler') + + # 1. Create a scheduled job in Job Store. + execute_at = datetime.datetime.now() + datetime.timedelta(seconds=1) + + db_api.create_scheduled_job({ + 'run_after': 1, + 'func_name': TARGET_METHOD_PATH, + 'func_args': {'name': 'task', 'id': '321'}, + 'execute_at': execute_at, + 'captured_at': None, + 'auth_ctx': {} + }) + + self.assertTrue(len(db_api.get_scheduled_jobs()) > 0) + + self._unlock_target_method() + self._wait_target_method_end() + + # 2. Wait till Scheduler picks up the job and processes it. + self._await(lambda: not db_api.get_scheduled_jobs()) + + method.assert_called_once_with(name='task', id='321') + + @mock.patch(TARGET_METHOD_PATH) + def test_recapture_job(self, method): + method.side_effect = self.target_method + + self.override_config('pickup_job_after', 1, 'scheduler') + self.override_config('captured_job_timeout', 3, 'scheduler') + + # 1. Create a scheduled job in Job Store marked as captured in one + # second in the future. It can be captured again only after 3 + # seconds after that according to the config option. + captured_at = datetime.datetime.now() + datetime.timedelta(seconds=1) + + before_ts = datetime.datetime.now() + + db_api.create_scheduled_job({ + 'run_after': 1, + 'func_name': TARGET_METHOD_PATH, + 'func_args': {'name': 'task', 'id': '321'}, + 'execute_at': datetime.datetime.now(), + 'captured_at': captured_at, + 'auth_ctx': {} + }) + + self.assertTrue(len(db_api.get_scheduled_jobs()) > 0) + + self._unlock_target_method() + self._wait_target_method_end() + + # 2. Wait till Scheduler picks up the job and processes it. + self._await(lambda: not db_api.get_scheduled_jobs()) + + method.assert_called_once_with(name='task', id='321') + + # At least 3 seconds should have passed. + self.assertTrue( + datetime.datetime.now() - before_ts >= + datetime.timedelta(seconds=3) + ) diff --git a/mistral/utils/__init__.py b/mistral/utils/__init__.py index 6185fc31d..d336cb135 100644 --- a/mistral/utils/__init__.py +++ b/mistral/utils/__init__.py @@ -41,6 +41,10 @@ from mistral import exceptions as exc # Thread local storage. _th_loc_storage = threading.local() + +# TODO(rakhmerov): these two constants are misplaced. Utility methods +# should not be Mistral specific. They should be generic enough so to +# be moved to any other project w/o changes. ACTION_TASK_TYPE = 'ACTION' WORKFLOW_TASK_TYPE = 'WORKFLOW'