From 7b71f096b9be21a7260d573e7382725855fb94cb Mon Sep 17 00:00:00 2001 From: Renat Akhmerov Date: Thu, 19 Jul 2018 11:51:02 +0700 Subject: [PATCH] New experimental scheduler: the first working version This patch delivers the first working version of a distributed scheduler implementation based on local and persistent job queues. The idea is inspired by the parallel computing pattern known as "Work stealing" although it doesn't fully repeat it due to a nature of Mistral. See https://en.wikipedia.org/wiki/Work_stealing for details. Advantages of this scheduler implementation: * It doesn't have job processing delays when a cluster topology' is stable caused by DB polling intervals. A job gets scheduled in memory and also saved into the persistent storage for reliability. A persistent job can be picked up only after a configured allowed period of time so that it happens effectively after a node responsible for local processing crashed. * Low DB load. DB polling still exists but it's not a primary scheduling mechamisn now but rather a protection from node crash situations. That means that a polling interval can now be made large like 30 seconds, instead of 1-2 seconds. Less DB load leads to less DB deadlocks between scheduler instances and less retries on MySQL. * Since DB load is now less it gives better scalability properties. A bigger number of engines won't now lead to much bigger contention because of a big DB polling intervals. * Protection from having jobs forever hanging in processing state. In the existing implementation, if a scheduler captured a job for processing (set its "processing" flag to True) and then crashed then a job will be in processing state forever in the DB. Instead of a boolean "processing" flag, the new implementation uses a timestamp showing when a job was captured. That gives us the opportunity to make such jobs eligible for recapturing and further processing after a certain configured timeout. TODO: * More testing * DB migration for the new scheduled jobs table * Benchmarks and testing under load * Standardize the scheduler interface and write an adapter for the existing scheduler so that we could choose between scheduler implementations. It's highly desired to make transition to the new scheduler smooth in production: we always need to be able to roll back to the existing scheduler. Partial blueprint: mistral-redesign-scheduler Partial blueprint: mistral-eliminate-scheduler-delays Change-Id: If7d06b64ac14d01e80d31242e1640cb93f2aa6fe --- mistral/config.py | 28 ++ mistral/context.py | 1 + mistral/db/v2/api.py | 30 ++ mistral/db/v2/sqlalchemy/api.py | 119 +++++++ mistral/db/v2/sqlalchemy/models.py | 33 ++ mistral/engine/engine_server.py | 1 + mistral/scheduler/__init__.py | 0 mistral/scheduler/base.py | 78 +++++ mistral/scheduler/default_scheduler.py | 291 ++++++++++++++++++ mistral/scheduler/scheduler_server.py | 68 ++++ mistral/tests/unit/base.py | 3 +- mistral/tests/unit/scheduler/__init__.py | 0 .../tests/unit/scheduler/test_scheduler.py | 176 +++++++++++ mistral/utils/__init__.py | 4 + 14 files changed, 831 insertions(+), 1 deletion(-) create mode 100644 mistral/scheduler/__init__.py create mode 100644 mistral/scheduler/base.py create mode 100644 mistral/scheduler/default_scheduler.py create mode 100644 mistral/scheduler/scheduler_server.py create mode 100644 mistral/tests/unit/scheduler/__init__.py create mode 100644 mistral/tests/unit/scheduler/test_scheduler.py diff --git a/mistral/config.py b/mistral/config.py index b480223df..e5785bf78 100644 --- a/mistral/config.py +++ b/mistral/config.py @@ -255,6 +255,34 @@ scheduler_opts = [ 'If this property equals None then there is no ' 'restriction on selection.' ) + ), + cfg.FloatOpt( + 'captured_job_timeout', + default=30, + min=1, + help=( + 'Defines how soon (in seconds) a scheduled job captured for ' + 'processing becomes eligible for capturing by other schedulers ' + 'again. This option is needed to prevent situations when a ' + 'scheduler instance captured a job and failed while processing ' + 'and so this job can never be processed again because it is ' + 'marked as captured.' + ) + ), + cfg.FloatOpt( + 'pickup_job_after', + default=60, + min=1, + help='Time period given to a scheduler to process a scheduled job ' + 'locally before it becomes eligible for processing by other ' + 'scheduler instances.' + 'For example, a job needs to run at 12:00:00. When a scheduler ' + 'starts processing it it has 60 seconds (or other configured ' + 'value) to complete the job. If the scheduler did not complete ' + 'the job within this period it most likely means that the ' + 'scheduler process crashed. In this case another scheduler ' + 'instance will pick it up from the Job Store, but not earlier ' + 'than 12:01:00 and try to process it.' ) ] diff --git a/mistral/context.py b/mistral/context.py index 28e2a68fb..fceb6cafd 100644 --- a/mistral/context.py +++ b/mistral/context.py @@ -219,6 +219,7 @@ class RpcContextSerializer(messaging.Serializer): profiler.init(**trace_info) ctx = MistralContext.from_dict(context) + set_ctx(ctx) return ctx diff --git a/mistral/db/v2/api.py b/mistral/db/v2/api.py index 477914003..de1d0b78c 100644 --- a/mistral/db/v2/api.py +++ b/mistral/db/v2/api.py @@ -382,6 +382,36 @@ def delete_delayed_calls(**kwargs): return IMPL.delete_delayed_calls(**kwargs) +# Scheduled jobs. + +def get_scheduled_jobs_to_start(time, batch_size=None): + return IMPL.get_scheduled_jobs_to_start(time, batch_size) + + +def create_scheduled_job(values): + return IMPL.create_scheduled_job(values) + + +def delete_scheduled_job(id): + return IMPL.delete_scheduled_job(id) + + +def update_scheduled_job(id, values, query_filter=None): + return IMPL.update_scheduled_job(id, values, query_filter) + + +def get_scheduled_job(id): + return IMPL.get_scheduled_job(id) + + +def get_scheduled_jobs(**kwargs): + return IMPL.get_scheduled_jobs(**kwargs) + + +def delete_scheduled_jobs(**kwargs): + return IMPL.delete_scheduled_jobs(**kwargs) + + # Cron triggers. def get_cron_trigger(identifier): diff --git a/mistral/db/v2/sqlalchemy/api.py b/mistral/db/v2/sqlalchemy/api.py index dddf7bb01..671d5f51f 100644 --- a/mistral/db/v2/sqlalchemy/api.py +++ b/mistral/db/v2/sqlalchemy/api.py @@ -15,9 +15,11 @@ # limitations under the License. import contextlib +import datetime import sys import threading + from oslo_config import cfg from oslo_db import exception as db_exc from oslo_db import sqlalchemy as oslo_sqlalchemy @@ -1132,6 +1134,123 @@ def delete_delayed_calls(session=None, **kwargs): return _delete_all(models.DelayedCall, **kwargs) +@b.session_aware() +def create_scheduled_job(values, session=None): + job = models.ScheduledJob() + + job.update(values.copy()) + + try: + job.save(session) + except db_exc.DBDuplicateEntry as e: + raise exc.DBDuplicateEntryError( + "Duplicate entry for ScheduledJob ID: {}".format(e.value) + ) + + return job + + +@b.session_aware() +def get_scheduled_jobs_to_start(time, batch_size=None, session=None): + query = b.model_query(models.ScheduledJob) + + execute_at_col = models.ScheduledJob.execute_at + captured_at_col = models.ScheduledJob.captured_at + + # Filter by execution time accounting for a configured job pickup interval. + query = query.filter( + execute_at_col < + time - datetime.timedelta(seconds=CONF.scheduler.pickup_job_after) + ) + + # Filter by captured time accounting for a configured captured job timeout. + min_captured_at = ( + datetime.datetime.now() - + datetime.timedelta(seconds=CONF.scheduler.captured_job_timeout) + ) + + query = query.filter( + sa.or_( + captured_at_col == sa.null(), + captured_at_col <= min_captured_at + ) + ) + + query = query.order_by(execute_at_col) + query = query.limit(batch_size) + + return query.all() + + +@b.session_aware() +def update_scheduled_job(id, values, query_filter=None, session=None): + if query_filter: + try: + specimen = models.ScheduledJob(id=id, **query_filter) + + job = b.model_query( + models.ScheduledJob + ).update_on_match( + specimen=specimen, + surrogate_key='id', + values=values + ) + + return job, 1 + + except oslo_sqlalchemy.update_match.NoRowsMatched as e: + LOG.debug( + "No rows matched for update scheduled job [id=%s, values=%s, " + "query_filter=%s," + "exception=%s]", id, values, query_filter, e + ) + + return None, 0 + + else: + job = get_scheduled_job(id=id, session=session) + + job.update(values) + + return job, len(session.dirty) + + +@b.session_aware() +def get_scheduled_job(id, session=None): + job = _get_db_object_by_id(models.ScheduledJob, id) + + if not job: + raise exc.DBEntityNotFoundError( + "Scheduled job not found [id=%s]" % id + ) + + return job + + +@b.session_aware() +def delete_scheduled_job(id, session=None): + # It's safe to use insecure query here because users can't access + # scheduled job. + count = b.model_query(models.ScheduledJob).filter( + models.ScheduledJob.id == id).delete() + + if count == 0: + raise exc.DBEntityNotFoundError( + "Scheduled job not found [id=%s]" % id + ) + + +def get_scheduled_jobs(**kwargs): + return _get_collection(model=models.ScheduledJob, **kwargs) + + +@b.session_aware() +def delete_scheduled_jobs(session=None, **kwargs): + return _delete_all(models.ScheduledJob, **kwargs) + + +# Other functions. + @b.session_aware() def get_expired_executions(expiration_time, limit=None, columns=(), session=None): diff --git a/mistral/db/v2/sqlalchemy/models.py b/mistral/db/v2/sqlalchemy/models.py index 471e339e3..b6d22512e 100644 --- a/mistral/db/v2/sqlalchemy/models.py +++ b/mistral/db/v2/sqlalchemy/models.py @@ -395,6 +395,39 @@ sa.Index( ) +class ScheduledJob(mb.MistralModelBase): + """Contains info about scheduled jobs.""" + + __tablename__ = 'scheduled_jobs_v2' + + id = mb.id_column() + + run_after = sa.Column(sa.Integer) + + # The full name of the factory function that returns/builds a Python + # (target) object whose method should be called. Optional. + target_factory_func_name = sa.Column(sa.String(200), nullable=True) + + # May take two different forms: + # 1. Full path of a target function that should be called. For example, + # "mistral.utils.random_sleep". + # 2. Name of a method to call on a target object, if + # "target_factory_func_name" is specified. + func_name = sa.Column(sa.String(80), nullable=False) + + func_args = sa.Column(st.JsonDictType()) + func_arg_serializers = sa.Column(st.JsonDictType()) + auth_ctx = sa.Column(st.JsonDictType()) + execute_at = sa.Column(sa.DateTime, nullable=False) + captured_at = sa.Column(sa.DateTime, nullable=True) + + +sa.Index( + '%s_execution_time' % ScheduledJob.__tablename__, + ScheduledJob.execute_at +) + + class Environment(mb.MistralSecureModelBase): """Contains environment variables for workflow execution.""" diff --git a/mistral/engine/engine_server.py b/mistral/engine/engine_server.py index c93836898..db8e20321 100644 --- a/mistral/engine/engine_server.py +++ b/mistral/engine/engine_server.py @@ -51,6 +51,7 @@ class EngineServer(service_base.MistralService): self._scheduler = scheduler.start() self._expiration_policy_tg = expiration_policy.setup() + action_execution_checker.setup() if self._setup_profiler: diff --git a/mistral/scheduler/__init__.py b/mistral/scheduler/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/mistral/scheduler/base.py b/mistral/scheduler/base.py new file mode 100644 index 000000000..ed0dc459a --- /dev/null +++ b/mistral/scheduler/base.py @@ -0,0 +1,78 @@ +# Copyright 2018 - Nokia Networks. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +import six + + +@six.add_metaclass(abc.ABCMeta) +class Scheduler(object): + """Scheduler interface. + + Responsible for scheduling jobs to be executed at some point in future. + """ + + @abc.abstractmethod + def schedule(self, job, allow_redistribute=False): + """Schedules a delayed call to be invoked at some point in future. + + :param job: Scheduler Job. An instance of :class:`SchedulerJob`. + :param allow_redistribute: If True then the method is allowed to + reroute the call to other Scheduler instances available in the + cluster. + """ + raise NotImplementedError + + +class SchedulerJob(object): + """Scheduler job. + + Encapsulates information about a command that needs to be executed + at some point in future. + """ + def __init__(self, run_after=0, target_factory_func_name=None, + func_name=None, func_args=None, + func_arg_serializers=None): + """Initializes a Scheduler Job. + + :param run_after: Amount of seconds after which to invoke + a scheduled call. + :param target_factory_func_name: Full path of a function that returns + a target object against which a method specified with the + "func_name" should be invoked. Optional. If None, then "func_name" + must be a full path of a static function to invoke. + :param func_name: Function or method name to invoke when a job gets + triggered. + :param func_args: Dictionary containing function/method argument names + and values as key-value pairs. A function/method specified with + the "func_name" argument will be invoked with these arguments. + :param func_arg_serializers: Dictionary containing function/method + argument names and serializers for argument values as key-value + pairs. Each serializer is a full path to a subclass of + :class:'mistral_lib.serialization.Serializer' that is capable + of serializing and deserializing of a corresponding argument value. + Optional. Serializers must be specified only for those arguments + whose values can't be saved into a persistent storage as is and + they need to be converted first into a value of a primitive type. + + """ + + if not func_name: + raise RuntimeError("'target_method_name' must be provided.") + + self.run_after = run_after + self.target_factory_func_name = target_factory_func_name + self.func_name = func_name + self.func_args = func_args or {} + self.func_arg_serializers = func_arg_serializers diff --git a/mistral/scheduler/default_scheduler.py b/mistral/scheduler/default_scheduler.py new file mode 100644 index 000000000..21db17162 --- /dev/null +++ b/mistral/scheduler/default_scheduler.py @@ -0,0 +1,291 @@ +# Copyright 2018 - Nokia Networks. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy +import datetime +import eventlet +import random +import sys +import threading + +from oslo_config import cfg +from oslo_log import log as logging +from oslo_utils import importutils + +from mistral import context +from mistral.db import utils as db_utils +from mistral.db.v2 import api as db_api +from mistral import exceptions as exc +from mistral.scheduler import base + + +LOG = logging.getLogger(__name__) + +CONF = cfg.CONF + + +class DefaultScheduler(base.Scheduler): + def __init__(self, fixed_delay, random_delay, batch_size): + self._fixed_delay = fixed_delay + self._random_delay = random_delay + self._batch_size = batch_size + + # Dictionary containing {GreenThread: ScheduledJob} pairs that + # represent in-memory jobs. + self.memory_jobs = {} + + self._job_store_checker_thread = threading.Thread( + target=self._job_store_checker + ) + self._job_store_checker_thread.daemon = True + + self._stopped = True + + def start(self): + self._stopped = False + + self._job_store_checker_thread.start() + + def stop(self, graceful=False): + self._stopped = True + + if graceful: + self._job_store_checker_thread.join() + + def _job_store_checker(self): + while not self._stopped: + LOG.debug( + "Starting Scheduler Job Store checker [scheduler=%s]...", self + ) + + try: + self._process_store_jobs() + except Exception: + LOG.exception( + "Scheduler failed to process delayed calls" + " due to unexpected exception." + ) + + # For some mysterious reason (probably eventlet related) + # the exception is not cleared from the context automatically. + # This results in subsequent log.warning calls to show invalid + # info. + if sys.version_info < (3,): + sys.exc_clear() + + eventlet.sleep( + self._fixed_delay + + random.Random().randint(0, self._random_delay * 1000) * 0.001 + ) + + def _process_store_jobs(self): + # Select and capture eligible jobs. + with db_api.transaction(): + candidate_jobs = db_api.get_scheduled_jobs_to_start( + datetime.datetime.now(), + self._batch_size + ) + + captured_jobs = [ + job for job in candidate_jobs + if self._capture_scheduled_job(job) + ] + + # Invoke and delete scheduled jobs. + for job in captured_jobs: + auth_ctx, func, func_args = self._prepare_job(job) + + self._invoke_job(auth_ctx, func, func_args) + + self._delete_scheduled_job(job) + + def schedule(self, job, allow_redistribute=False): + scheduled_job = DefaultScheduler._persist_job(job) + + self._schedule_in_memory(job.run_after, scheduled_job) + + @classmethod + def _persist_job(cls, job): + ctx_serializer = context.RpcContextSerializer() + + ctx = ( + ctx_serializer.serialize_context(context.ctx()) + if context.has_ctx() else {} + ) + + execute_at = (datetime.datetime.now() + + datetime.timedelta(seconds=job.run_after)) + + args = job.func_args + arg_serializers = job.func_arg_serializers + + if arg_serializers: + for arg_name, serializer_path in arg_serializers.items(): + if arg_name not in args: + raise exc.MistralException( + "Serializable function argument %s" + " not found in func_args=%s" + % (arg_name, args)) + try: + serializer = importutils.import_class(serializer_path)() + except ImportError as e: + raise ImportError( + "Cannot import class %s: %s" % (serializer_path, e) + ) + + args[arg_name] = serializer.serialize(args[arg_name]) + + values = { + 'run_after': job.run_after, + 'target_factory_func_name': job.target_factory_func_name, + 'func_name': job.func_name, + 'func_args': args, + 'func_arg_serializers': arg_serializers, + 'auth_ctx': ctx, + 'execute_at': execute_at, + 'captured_at': None + } + + return db_api.create_scheduled_job(values) + + def _schedule_in_memory(self, run_after, scheduled_job): + green_thread = eventlet.spawn_after( + run_after, + self._process_memory_job, + scheduled_job + ) + + self.memory_jobs[green_thread] = scheduled_job + + def _process_memory_job(self, scheduled_job): + # 1. Capture the job in Job Store. + if not self._capture_scheduled_job(scheduled_job): + LOG.warning( + "Unable to capture a scheduled job [scheduled_job=%s]", + scheduled_job + ) + + return + + # 2. Invoke the target function. + auth_ctx, func, func_args = self._prepare_job(scheduled_job) + + self._invoke_job(auth_ctx, func, func_args) + + self._delete_scheduled_job(scheduled_job) + + # 3. Delete the job from Job Store, if success. + # TODO(rakhmerov): + # 3.1 What do we do if invocation wasn't successful? + + # Delete from a local collection of in-memory jobs. + del self.memory_jobs[eventlet.getcurrent()] + + @staticmethod + def _capture_scheduled_job(scheduled_job): + """Capture a scheduled persistent job in a job store. + + :param scheduled_job: Job. + :return: True if the job has been captured, False if not. + """ + + # Mark this job as captured in order to prevent calling from + # parallel a transaction. We don't use query filter + # {'captured_at': None} to account for a case when the job needs + # to be recaptured after a maximum capture time has elapsed. If this + # method was called for job that has non-empty "captured_at" then + # it means that it is already eligible for recapturing and the + # Job Store selected it. + _, updated_cnt = db_api.update_scheduled_job( + id=scheduled_job.id, + values={'captured_at': datetime.datetime.now()}, + query_filter={'captured_at': scheduled_job.captured_at} + ) + + # If updated_cnt != 1 then another scheduler + # has already updated it. + return updated_cnt == 1 + + @staticmethod + @db_utils.retry_on_db_error + def _delete_scheduled_job(scheduled_job): + db_api.delete_scheduled_job(scheduled_job.id) + + @staticmethod + def _prepare_job(scheduled_job): + """Prepares a scheduled job for invocation. + + To make an invocation of a delayed call it needs to be prepared for + further usage, we need to reconstruct a final target func + and deserialize arguments, if needed. + + :param scheduled_job: Persistent scheduled job. + :return: A tuple (auth_ctx, func, args) where all data is properly + deserialized. + """ + + LOG.debug( + 'Preparing a scheduled job. [ID=%s, target_factory_func_name=%s,' + ' func_name=%s, func_args=%s]', + scheduled_job.id, + scheduled_job.target_factory_func_name, + scheduled_job.func_name, + scheduled_job.func_args + ) + + auth_ctx = copy.deepcopy(scheduled_job.auth_ctx) + + if scheduled_job.target_factory_func_name: + factory = importutils.import_class( + scheduled_job.target_factory_func_name + ) + + func = getattr(factory(), scheduled_job.func_name) + else: + func = importutils.import_class(scheduled_job.func_name) + + args = copy.deepcopy(scheduled_job.func_args) + + serializers_dict = scheduled_job.func_arg_serializers + + if serializers_dict: + # Deserialize arguments. + for arg_name, ser_path in serializers_dict.items(): + serializer = importutils.import_class(ser_path)() + + deserialized = serializer.deserialize(args[arg_name]) + + args[arg_name] = deserialized + + return auth_ctx, func, args + + @staticmethod + def _invoke_job(auth_ctx, func, args): + ctx_serializer = context.RpcContextSerializer() + + try: + # Set the correct context for the function. + ctx_serializer.deserialize_context(auth_ctx) + + # Invoke the function. + func(**args) + except Exception as e: + LOG.exception( + "Scheduled job failed, method: %s, exception: %s", + func, + e + ) + finally: + # Remove context. + context.set_ctx(None) diff --git a/mistral/scheduler/scheduler_server.py b/mistral/scheduler/scheduler_server.py new file mode 100644 index 000000000..64abae286 --- /dev/null +++ b/mistral/scheduler/scheduler_server.py @@ -0,0 +1,68 @@ +# Copyright 2018 - Nokia Networks. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from oslo_config import cfg +from oslo_log import log as logging + +from mistral.rpc import base as rpc +from mistral.service import base as service_base + + +LOG = logging.getLogger(__name__) + +CONF = cfg.CONF + + +class SchedulerServer(service_base.MistralService): + """Scheduler server. + + Manages scheduler life-cycle and gets registered as an RPC + endpoint to process scheduler specific calls. + """ + + def __init__(self, scheduler, setup_profiler=True): + super(SchedulerServer, self).__init__( + 'scheduler_group', + setup_profiler + ) + + self.scheduler = scheduler + self._rpc_server = None + + def start(self): + super(SchedulerServer, self).start() + + self._rpc_server = rpc.get_rpc_server_driver()(cfg.CONF.engine) + self._rpc_server.register_endpoint(self) + + self._rpc_server.run() + + self._notify_started('Scheduler server started.') + + def stop(self, graceful=False): + super(SchedulerServer, self).stop() + + if self._rpc_server: + self._rpc_server.stop(graceful) + + def schedule(self, rpc_ctx, job): + """Receives requests over RPC to schedule delayed calls. + + :param rpc_ctx: RPC request context. + :param job: Scheduler job. + """ + LOG.info("Received RPC request 'schedule'[job=%s]", job) + + return self.scheduler.schedule(job, allow_redistribute=False) diff --git a/mistral/tests/unit/base.py b/mistral/tests/unit/base.py index 9faeff286..677002033 100644 --- a/mistral/tests/unit/base.py +++ b/mistral/tests/unit/base.py @@ -203,7 +203,7 @@ class BaseTest(base.BaseTestCase): If within a configured timeout predicate function hasn't evaluated to True then an exception is raised. - :param predicate: Predication function. + :param predicate: Predicate function. :param delay: Delay in seconds between predicate function calls. :param timeout: Maximum amount of time to wait for predication function to evaluate to True. @@ -302,6 +302,7 @@ class DbTestCase(BaseTest): db_api.delete_environments() db_api.delete_resource_members() db_api.delete_delayed_calls() + db_api.delete_scheduled_jobs() sqlite_lock.cleanup() diff --git a/mistral/tests/unit/scheduler/__init__.py b/mistral/tests/unit/scheduler/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/mistral/tests/unit/scheduler/test_scheduler.py b/mistral/tests/unit/scheduler/test_scheduler.py new file mode 100644 index 000000000..07d591ac5 --- /dev/null +++ b/mistral/tests/unit/scheduler/test_scheduler.py @@ -0,0 +1,176 @@ +# Copyright 2018 - Nokia Networks. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from eventlet import event +from eventlet import semaphore +from eventlet import timeout + +import datetime +import mock + +from mistral.db.v2 import api as db_api +from mistral.scheduler import base as scheduler_base +from mistral.scheduler import default_scheduler +from mistral.tests.unit import base + + +TARGET_METHOD_PATH = ( + 'mistral.tests.unit.scheduler.test_scheduler.target_method' +) + + +def target_method(): + pass + + +class SchedulerTest(base.DbTestCase): + def setUp(self): + super(SchedulerTest, self).setUp() + + # This Timeout object is needed to raise an exception if the test took + # longer than a configured number of seconds. + self.timeout = timeout.Timeout(seconds=15) + + # Synchronization primitives to control when a scheduled invoked + # method is allowed to enter the method and exit from it to perform + # all needed checks. + self.target_mtd_started = event.Event() + self.target_mtd_finished = event.Event() + self.target_mtd_lock = semaphore.Semaphore(0) + + self.scheduler = default_scheduler.DefaultScheduler(1, 1, 100) + self.scheduler.start() + + self.addCleanup(self.scheduler.stop, True) + self.addCleanup(self.timeout.cancel) + + def target_method(self, *args, **kwargs): + self.target_mtd_started.send() + + self.target_mtd_lock.acquire() + + # Note: Potentially we can do something else here. No-op for now. + + self.target_mtd_finished.send() + + def _wait_target_method_start(self): + self.target_mtd_started.wait() + + def _unlock_target_method(self): + self.target_mtd_lock.release() + + def _wait_target_method_end(self): + self.target_mtd_finished.wait() + + @mock.patch(TARGET_METHOD_PATH) + def test_schedule_called_once(self, method): + method.side_effect = self.target_method + + job = scheduler_base.SchedulerJob( + run_after=1, + func_name=TARGET_METHOD_PATH, + func_args={'name': 'task', 'id': '321'} + ) + + self.scheduler.schedule(job) + + self._wait_target_method_start() + + # Check that the persistent job has been created and captured. + scheduled_jobs = db_api.get_scheduled_jobs() + + self.assertEqual(1, len(scheduled_jobs)) + + captured_at = scheduled_jobs[0].captured_at + + self.assertIsNotNone(captured_at) + self.assertTrue( + datetime.datetime.now() - captured_at < + datetime.timedelta(seconds=3) + ) + + self._unlock_target_method() + self._wait_target_method_end() + + method.assert_called_once_with(name='task', id='321') + + # After the job is processed the persistent object must be deleted. + self._await(lambda: not db_api.get_scheduled_jobs()) + + @mock.patch(TARGET_METHOD_PATH) + def test_pickup_from_job_store(self, method): + method.side_effect = self.target_method + + self.override_config('pickup_job_after', 1, 'scheduler') + + # 1. Create a scheduled job in Job Store. + execute_at = datetime.datetime.now() + datetime.timedelta(seconds=1) + + db_api.create_scheduled_job({ + 'run_after': 1, + 'func_name': TARGET_METHOD_PATH, + 'func_args': {'name': 'task', 'id': '321'}, + 'execute_at': execute_at, + 'captured_at': None, + 'auth_ctx': {} + }) + + self.assertTrue(len(db_api.get_scheduled_jobs()) > 0) + + self._unlock_target_method() + self._wait_target_method_end() + + # 2. Wait till Scheduler picks up the job and processes it. + self._await(lambda: not db_api.get_scheduled_jobs()) + + method.assert_called_once_with(name='task', id='321') + + @mock.patch(TARGET_METHOD_PATH) + def test_recapture_job(self, method): + method.side_effect = self.target_method + + self.override_config('pickup_job_after', 1, 'scheduler') + self.override_config('captured_job_timeout', 3, 'scheduler') + + # 1. Create a scheduled job in Job Store marked as captured in one + # second in the future. It can be captured again only after 3 + # seconds after that according to the config option. + captured_at = datetime.datetime.now() + datetime.timedelta(seconds=1) + + before_ts = datetime.datetime.now() + + db_api.create_scheduled_job({ + 'run_after': 1, + 'func_name': TARGET_METHOD_PATH, + 'func_args': {'name': 'task', 'id': '321'}, + 'execute_at': datetime.datetime.now(), + 'captured_at': captured_at, + 'auth_ctx': {} + }) + + self.assertTrue(len(db_api.get_scheduled_jobs()) > 0) + + self._unlock_target_method() + self._wait_target_method_end() + + # 2. Wait till Scheduler picks up the job and processes it. + self._await(lambda: not db_api.get_scheduled_jobs()) + + method.assert_called_once_with(name='task', id='321') + + # At least 3 seconds should have passed. + self.assertTrue( + datetime.datetime.now() - before_ts >= + datetime.timedelta(seconds=3) + ) diff --git a/mistral/utils/__init__.py b/mistral/utils/__init__.py index 6185fc31d..d336cb135 100644 --- a/mistral/utils/__init__.py +++ b/mistral/utils/__init__.py @@ -41,6 +41,10 @@ from mistral import exceptions as exc # Thread local storage. _th_loc_storage = threading.local() + +# TODO(rakhmerov): these two constants are misplaced. Utility methods +# should not be Mistral specific. They should be generic enough so to +# be moved to any other project w/o changes. ACTION_TASK_TYPE = 'ACTION' WORKFLOW_TASK_TYPE = 'WORKFLOW'