From 43d23c0e254e237303c91d91fb0d1330552df610 Mon Sep 17 00:00:00 2001
From: Renat Akhmerov
Date: Fri, 21 Jun 2019 11:33:12 +0700
Subject: [PATCH] Create needed infrastructure to switch scheduler
 implementations

* After this patch we can switch scheduler implementations in the
  configuration. All functionality related to scheduling jobs is now
  expressed via the internal API classes Scheduler and SchedulerJob.
  The patch also adds a new entry point to setup.cfg where we can
  register new scheduler implementations.
* The new scheduler (now called DefaultScheduler) should still be
  considered experimental and requires more testing and optimisation.
* Fixed and refactored the "with-items" tests. Before this patch they
  broke the "black box" testing principle and relied on implementation
  details or volatile data (e.g. checks of the internal 'capacity'
  property).
* Fixed all other relevant tests.

Change-Id: I340f886615d416a1db08e4516f825d200f76860d
---
 mistral/config.py                             |  10 +
 mistral/db/v2/api.py                          |   4 +
 mistral/db/v2/sqlalchemy/api.py               |   5 +
 mistral/db/v2/sqlalchemy/models.py            |   8 +-
 mistral/engine/engine_server.py               |  10 +-
 mistral/engine/policies.py                    |  71 ++++---
 mistral/engine/task_handler.py                |  66 ++++---
 mistral/engine/workflow_handler.py            |  17 +-
 mistral/scheduler/base.py                     |  66 ++++++-
 mistral/scheduler/default_scheduler.py        |  38 +++-
 .../{scheduler.py => legacy_scheduler.py}     |  64 ++----
 .../tests/unit/engine/test_direct_workflow.py |  31 +--
 mistral/tests/unit/engine/test_with_items.py  | 185 +++++++++---------
 ...scheduler.py => test_default_scheduler.py} |  24 ++-
 ..._scheduler.py => test_legacy_scheduler.py} | 156 +++++++++------
 mistral/tests/unit/test_launcher.py           |   3 +-
 setup.cfg                                     |   4 +
 17 files changed, 464 insertions(+), 298 deletions(-)
 rename mistral/services/{scheduler.py => legacy_scheduler.py} (91%)
 rename mistral/tests/unit/scheduler/{test_scheduler.py => test_default_scheduler.py} (86%)
 rename mistral/tests/unit/services/{test_scheduler.py => test_legacy_scheduler.py} (72%)

diff --git a/mistral/config.py b/mistral/config.py
index 1fae88674..035818cb7 100644
--- a/mistral/config.py
+++ b/mistral/config.py
@@ -1,6 +1,7 @@
 # Copyright 2013 - Mirantis, Inc.
 # Copyright 2016 - Brocade Communications Systems, Inc.
 # Copyright 2018 - Extreme Networks, Inc.
+# Copyright 2019 - Nokia Networks
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -240,6 +241,13 @@ executor_opts = [ ) ] +scheduler_type_opt = cfg.StrOpt( + 'scheduler_type', + default='legacy', + choices=['legacy', 'default'], + help=_('The name of the scheduler implementation used in the system.') +) + scheduler_opts = [ cfg.FloatOpt( 'fixed_delay', @@ -643,6 +651,7 @@ KEYSTONE_GROUP = "keystone" CONF.register_opt(wf_trace_log_name_opt) CONF.register_opt(auth_type_opt) +CONF.register_opt(scheduler_type_opt) CONF.register_opt(js_impl_opt) CONF.register_opt(rpc_impl_opt) CONF.register_opt(rpc_response_timeout_opt) @@ -682,6 +691,7 @@ default_group_opts = itertools.chain( [ wf_trace_log_name_opt, auth_type_opt, + scheduler_type_opt, js_impl_opt, rpc_impl_opt, rpc_response_timeout_opt, diff --git a/mistral/db/v2/api.py b/mistral/db/v2/api.py index e41c623b1..b77789289 100644 --- a/mistral/db/v2/api.py +++ b/mistral/db/v2/api.py @@ -453,6 +453,10 @@ def delete_scheduled_jobs(**kwargs): return IMPL.delete_scheduled_jobs(**kwargs) +def get_scheduled_jobs_count(**kwargs): + return IMPL.get_scheduled_jobs_count(**kwargs) + + # Cron triggers. def get_cron_trigger(identifier): diff --git a/mistral/db/v2/sqlalchemy/api.py b/mistral/db/v2/sqlalchemy/api.py index 61b0d3e01..5f39d1236 100644 --- a/mistral/db/v2/sqlalchemy/api.py +++ b/mistral/db/v2/sqlalchemy/api.py @@ -1243,6 +1243,7 @@ def get_scheduled_jobs_to_start(time, batch_size=None, session=None): captured_at_col = models.ScheduledJob.captured_at # Filter by execution time accounting for a configured job pickup interval. + # TODO(rakhmerov): Configuration options should not be accessed here. query = query.filter( execute_at_col < time - datetime.timedelta(seconds=CONF.scheduler.pickup_job_after) @@ -1334,6 +1335,10 @@ def delete_scheduled_jobs(session=None, **kwargs): return _delete_all(models.ScheduledJob, **kwargs) +def get_scheduled_jobs_count(**kwargs): + return _get_count(model=models.ScheduledJob, **kwargs) + + # Other functions. 
@b.session_aware() diff --git a/mistral/db/v2/sqlalchemy/models.py b/mistral/db/v2/sqlalchemy/models.py index ec0db2b9a..dbc2d659f 100644 --- a/mistral/db/v2/sqlalchemy/models.py +++ b/mistral/db/v2/sqlalchemy/models.py @@ -457,11 +457,17 @@ class ScheduledJob(mb.MistralModelBase): execute_at = sa.Column(sa.DateTime, nullable=False) captured_at = sa.Column(sa.DateTime, nullable=True) + key = sa.Column(sa.String(250), nullable=True) + sa.Index( - '%s_execution_time' % ScheduledJob.__tablename__, + '%s_execute_at' % ScheduledJob.__tablename__, ScheduledJob.execute_at ) +sa.Index( + '%s_captured_at' % ScheduledJob.__tablename__, + ScheduledJob.captured_at +) class Environment(mb.MistralSecureModelBase): diff --git a/mistral/engine/engine_server.py b/mistral/engine/engine_server.py index 776469bb6..f1a9a6a9f 100644 --- a/mistral/engine/engine_server.py +++ b/mistral/engine/engine_server.py @@ -18,10 +18,10 @@ from mistral import config as cfg from mistral.db.v2 import api as db_api from mistral.engine import default_engine from mistral.rpc import base as rpc +from mistral.scheduler import base as sched_base from mistral.service import base as service_base from mistral.services import action_execution_checker from mistral.services import expiration_policy -from mistral.services import scheduler from mistral import utils from mistral.utils import profiler as profiler_utils @@ -49,7 +49,9 @@ class EngineServer(service_base.MistralService): db_api.setup_db() - self._scheduler = scheduler.start() + self._scheduler = sched_base.get_system_scheduler() + self._scheduler.start() + self._expiration_policy_tg = expiration_policy.setup() action_execution_checker.start() @@ -72,7 +74,9 @@ class EngineServer(service_base.MistralService): action_execution_checker.stop(graceful) if self._scheduler: - scheduler.stop_scheduler(self._scheduler, graceful) + self._scheduler.stop(graceful) + + sched_base.destroy_system_scheduler() if self._expiration_policy_tg: self._expiration_policy_tg.stop(graceful) diff --git a/mistral/engine/policies.py b/mistral/engine/policies.py index c99032a8c..4682f4d31 100644 --- a/mistral/engine/policies.py +++ b/mistral/engine/policies.py @@ -19,7 +19,7 @@ from mistral.engine import base from mistral.engine import post_tx_queue from mistral.engine import workflow_handler as wf_handler from mistral import expressions -from mistral.services import scheduler +from mistral.scheduler import base as sched_base from mistral.utils import wf_trace from mistral.workflow import data_flow from mistral.workflow import states @@ -210,13 +210,18 @@ class WaitBeforePolicy(base.TaskPolicy): task_ex.state = states.RUNNING_DELAYED - scheduler.schedule_call( - None, - _CONTINUE_TASK_PATH, - self.delay, - task_ex_id=task_ex.id, + sched = sched_base.get_system_scheduler() + + job = sched_base.SchedulerJob( + run_after=self.delay, + func_name=_CONTINUE_TASK_PATH, + func_args={ + 'task_ex_id': task_ex.id + } ) + sched.schedule(job) + class WaitAfterPolicy(base.TaskPolicy): _schema = { @@ -269,15 +274,20 @@ class WaitAfterPolicy(base.TaskPolicy): ) # Schedule to change task state to RUNNING again. 
- scheduler.schedule_call( - None, - _COMPLETE_TASK_PATH, - self.delay, - task_ex_id=task_ex.id, - state=end_state, - state_info=end_state_info + sched = sched_base.get_system_scheduler() + + job = sched_base.SchedulerJob( + run_after=self.delay, + func_name=_COMPLETE_TASK_PATH, + func_args={ + 'task_ex_id': task_ex.id, + 'state': end_state, + 'state_info': end_state_info + } ) + sched.schedule(job) + class RetryPolicy(base.TaskPolicy): _schema = { @@ -399,21 +409,29 @@ class RetryPolicy(base.TaskPolicy): # the correct logical state. if hasattr(task_spec, "get_join") and task_spec.get_join(): from mistral.engine import task_handler as t_h + _log_task_delay(task_ex, self.delay, states.WAITING) + task_ex.state = states.WAITING + t_h._schedule_refresh_task_state(task_ex.id, self.delay) + return _log_task_delay(task_ex, self.delay) + task_ex.state = states.RUNNING_DELAYED - scheduler.schedule_call( - None, - _CONTINUE_TASK_PATH, - self.delay, - task_ex_id=task_ex.id, + sched = sched_base.get_system_scheduler() + + job = sched_base.SchedulerJob( + run_after=self.delay, + func_name=_CONTINUE_TASK_PATH, + func_args={'task_ex_id': task_ex.id} ) + sched.schedule(job) + @staticmethod def refresh_runtime_context(task_ex): runtime_context = task_ex.runtime_context or {} @@ -444,14 +462,19 @@ class TimeoutPolicy(base.TaskPolicy): if self.delay == 0: return - scheduler.schedule_call( - None, - _FAIL_IF_INCOMPLETE_TASK_PATH, - self.delay, - task_ex_id=task_ex.id, - timeout=self.delay + sched = sched_base.get_system_scheduler() + + job = sched_base.SchedulerJob( + run_after=self.delay, + func_name=_FAIL_IF_INCOMPLETE_TASK_PATH, + func_args={ + 'task_ex_id': task_ex.id, + 'timeout': self.delay + } ) + sched.schedule(job) + wf_trace.info( task_ex, "Timeout check scheduled [task=%s, timeout(s)=%s]." % diff --git a/mistral/engine/task_handler.py b/mistral/engine/task_handler.py index 87ee439f2..920bf9b98 100644 --- a/mistral/engine/task_handler.py +++ b/mistral/engine/task_handler.py @@ -27,7 +27,7 @@ from mistral.engine import tasks from mistral.engine import workflow_handler as wf_handler from mistral import exceptions as exc from mistral.lang import parser as spec_parser -from mistral.services import scheduler +from mistral.scheduler import base as sched_base from mistral.workflow import base as wf_base from mistral.workflow import commands as wf_cmds from mistral.workflow import states @@ -292,14 +292,16 @@ def _check_affected_tasks(task): ) def _schedule_if_needed(t_ex_id): - # NOTE(rakhmerov): we need to minimize the number of delayed calls + # NOTE(rakhmerov): we need to minimize the number of scheduled jobs # that refresh state of "join" tasks. We'll check if corresponding - # calls are already scheduled. Note that we must ignore delayed calls + # jobs are already scheduled. Note that we must ignore scheduled jobs # that are currently being processed because of a possible race with - # the transaction that deletes delayed calls, i.e. the call may still + # the transaction that deletes scheduled jobs, i.e. the job may still # exist in DB (the deleting transaction didn't commit yet) but it has # already been processed and the task state hasn't changed. - cnt = db_api.get_delayed_calls_count( + sched = sched_base.get_system_scheduler() + + cnt = sched.get_scheduled_jobs_count( key=_get_refresh_state_job_key(t_ex_id), processing=False ) @@ -462,16 +464,18 @@ def _schedule_refresh_task_state(task_ex_id, delay=0): :param task_ex_id: Task execution ID. :param delay: Delay. 
""" - key = _get_refresh_state_job_key(task_ex_id) - scheduler.schedule_call( - None, - _REFRESH_TASK_STATE_PATH, - delay, - key=key, - task_ex_id=task_ex_id + sched = sched_base.get_system_scheduler() + + job = sched_base.SchedulerJob( + run_after=delay, + func_name=_REFRESH_TASK_STATE_PATH, + func_args={'task_ex_id': task_ex_id}, + key=_get_refresh_state_job_key(task_ex_id) ) + sched.schedule(job) + def _get_refresh_state_job_key(task_ex_id): return 'th_r_t_s-%s' % task_ex_id @@ -512,17 +516,20 @@ def schedule_on_action_complete(action_ex, delay=0): return - key = 'th_on_a_c-%s' % action_ex.task_execution_id + sched = sched_base.get_system_scheduler() - scheduler.schedule_call( - None, - _SCHEDULED_ON_ACTION_COMPLETE_PATH, - delay, - key=key, - action_ex_id=action_ex.id, - wf_action=isinstance(action_ex, models.WorkflowExecution) + job = sched_base.SchedulerJob( + run_after=delay, + func_name=_SCHEDULED_ON_ACTION_COMPLETE_PATH, + func_args={ + 'action_ex_id': action_ex.id, + 'wf_action': isinstance(action_ex, models.WorkflowExecution) + }, + key='th_on_a_c-%s' % action_ex.task_execution_id ) + sched.schedule(job) + @db_utils.retry_on_db_error @post_tx_queue.run @@ -559,13 +566,16 @@ def schedule_on_action_update(action_ex, delay=0): return - key = 'th_on_a_u-%s' % action_ex.task_execution_id + sched = sched_base.get_system_scheduler() - scheduler.schedule_call( - None, - _SCHEDULED_ON_ACTION_UPDATE_PATH, - delay, - key=key, - action_ex_id=action_ex.id, - wf_action=isinstance(action_ex, models.WorkflowExecution) + job = sched_base.SchedulerJob( + run_after=delay, + func_name=_SCHEDULED_ON_ACTION_UPDATE_PATH, + func_args={ + 'action_ex_id': action_ex.id, + 'wf_action': isinstance(action_ex, models.WorkflowExecution) + }, + key='th_on_a_u-%s' % action_ex.task_execution_id ) + + sched.schedule(job) diff --git a/mistral/engine/workflow_handler.py b/mistral/engine/workflow_handler.py index 4519cb57b..eedd5af99 100644 --- a/mistral/engine/workflow_handler.py +++ b/mistral/engine/workflow_handler.py @@ -23,7 +23,7 @@ from mistral.db.v2 import api as db_api from mistral.engine import post_tx_queue from mistral.engine import workflows from mistral import exceptions as exc -from mistral.services import scheduler +from mistral.scheduler import base as sched_base from mistral.workflow import states LOG = logging.getLogger(__name__) @@ -274,12 +274,13 @@ def _schedule_check_and_fix_integrity(wf_ex, delay=0): # Never check integrity if it's a negative value. return - key = _get_integrity_check_key(wf_ex) + sched = sched_base.get_system_scheduler() - scheduler.schedule_call( - None, - _CHECK_AND_FIX_INTEGRITY_PATH, - delay, - key=key, - wf_ex_id=wf_ex.id + job = sched_base.SchedulerJob( + run_after=delay, + func_name=_CHECK_AND_FIX_INTEGRITY_PATH, + func_args={'wf_ex_id': wf_ex.id}, + key=_get_integrity_check_key(wf_ex) ) + + sched.schedule(job) diff --git a/mistral/scheduler/base.py b/mistral/scheduler/base.py index ed0dc459a..42ad40bab 100644 --- a/mistral/scheduler/base.py +++ b/mistral/scheduler/base.py @@ -15,6 +15,16 @@ import abc import six +from oslo_config import cfg +from stevedore import driver + + +CONF = cfg.CONF + + +_SCHEDULER_IMPL = None +_SCHEDULER = None + @six.add_metaclass(abc.ABCMeta) class Scheduler(object): @@ -34,6 +44,28 @@ class Scheduler(object): """ raise NotImplementedError + @abc.abstractmethod + def get_scheduled_jobs_count(self, **filters): + """Returns the number of scheduled jobs. + + :param filters: Filters that define what kind of jobs + need to be counted. 
Permitted values: + * key= - a key set for a job when it was scheduled. + * processing= - if True, return only jobs that are + currently being processed. + """ + raise NotImplementedError + + @abc.abstractmethod + def start(self): + """Starts this scheduler.""" + raise NotImplementedError + + @abc.abstractmethod + def stop(self, graceful=False): + """Stops this scheduler.""" + raise NotImplementedError + class SchedulerJob(object): """Scheduler job. @@ -43,7 +75,7 @@ class SchedulerJob(object): """ def __init__(self, run_after=0, target_factory_func_name=None, func_name=None, func_args=None, - func_arg_serializers=None): + func_arg_serializers=None, key=None): """Initializes a Scheduler Job. :param run_after: Amount of seconds after which to invoke @@ -65,7 +97,7 @@ class SchedulerJob(object): Optional. Serializers must be specified only for those arguments whose values can't be saved into a persistent storage as is and they need to be converted first into a value of a primitive type. - + :param key: A value that can be used to find the job. """ if not func_name: @@ -76,3 +108,33 @@ class SchedulerJob(object): self.func_name = func_name self.func_args = func_args or {} self.func_arg_serializers = func_arg_serializers + self.key = key + + +def get_system_scheduler(): + global _SCHEDULER + + if not _SCHEDULER: + impl = _get_scheduler_implementation() + + _SCHEDULER = impl(CONF.scheduler) + + return _SCHEDULER + + +def destroy_system_scheduler(): + global _SCHEDULER + + _SCHEDULER = None + + +def _get_scheduler_implementation(): + global _SCHEDULER_IMPL + + if not _SCHEDULER_IMPL: + _SCHEDULER_IMPL = driver.DriverManager( + 'mistral.schedulers', + CONF.scheduler_type + ).driver + + return _SCHEDULER_IMPL diff --git a/mistral/scheduler/default_scheduler.py b/mistral/scheduler/default_scheduler.py index 0fd6b30e2..171a5e827 100644 --- a/mistral/scheduler/default_scheduler.py +++ b/mistral/scheduler/default_scheduler.py @@ -37,14 +37,27 @@ CONF = cfg.CONF class DefaultScheduler(base.Scheduler): - def __init__(self, fixed_delay, random_delay, batch_size): - self._fixed_delay = fixed_delay - self._random_delay = random_delay - self._batch_size = batch_size + def __init__(self, conf): + """Initializes a scheduler instance. + + # TODO(rakhmerov): Fix docstring + :param fixed_delay: A fixed part of the delay (in seconds) that + defines how often this scheduler checks the persistent job + store for the new jobs to run. + :param random_delay: A random part of the delay (in seconds) that + defines how often this scheduler checks the persistent job + store for the new jobs to run. + :param batch_size: Defines how many jobs this scheduler can pick + up from the job store at once. + """ + + self._fixed_delay = conf.fixed_delay + self._random_delay = conf.random_delay + self._batch_size = conf.batch_size # Dictionary containing {GreenThread: ScheduledJob} pairs that # represent in-memory jobs. 
- self.memory_jobs = {} + self.in_memory_jobs = {} self._job_store_checker_thread = threading.Thread( target=self._job_store_checker @@ -116,6 +129,14 @@ class DefaultScheduler(base.Scheduler): self._schedule_in_memory(job.run_after, scheduled_job) + def get_scheduled_jobs_count(self, **filters): + if filters and 'processing' in filters: + processing = filters.pop('processing') + + filters['captured_at'] = {'neq' if processing else 'eq': None} + + return db_api.get_scheduled_jobs_count(**filters) + @classmethod def _persist_job(cls, job): ctx_serializer = context.RpcContextSerializer() @@ -155,7 +176,8 @@ class DefaultScheduler(base.Scheduler): 'func_arg_serializers': arg_serializers, 'auth_ctx': ctx, 'execute_at': execute_at, - 'captured_at': None + 'captured_at': None, + 'key': job.key } return db_api.create_scheduled_job(values) @@ -167,7 +189,7 @@ class DefaultScheduler(base.Scheduler): scheduled_job ) - self.memory_jobs[green_thread] = scheduled_job + self.in_memory_jobs[green_thread] = scheduled_job def _process_memory_job(self, scheduled_job): # 1. Capture the job in Job Store. @@ -191,7 +213,7 @@ class DefaultScheduler(base.Scheduler): # 3.1 What do we do if invocation wasn't successful? # Delete from a local collection of in-memory jobs. - del self.memory_jobs[eventlet.getcurrent()] + del self.in_memory_jobs[eventlet.getcurrent()] @staticmethod def _capture_scheduled_job(scheduled_job): diff --git a/mistral/services/scheduler.py b/mistral/services/legacy_scheduler.py similarity index 91% rename from mistral/services/scheduler.py rename to mistral/services/legacy_scheduler.py index b39a60c07..c7d8bb7f8 100644 --- a/mistral/services/scheduler.py +++ b/mistral/services/legacy_scheduler.py @@ -21,7 +21,6 @@ import random import sys import threading -from oslo_config import cfg from oslo_log import log as logging from oslo_utils import importutils @@ -29,19 +28,15 @@ from mistral import context from mistral.db import utils as db_utils from mistral.db.v2 import api as db_api from mistral import exceptions as exc +from mistral.scheduler import base as sched_base from mistral import utils LOG = logging.getLogger(__name__) -CONF = cfg.CONF -# All schedulers. -_schedulers = set() - - -def schedule_call(factory_method_path, target_method_name, - run_after, serializers=None, key=None, **method_args): +def _schedule_call(factory_method_path, target_method_name, + run_after, serializers=None, key=None, **method_args): """Schedules call and lately invokes target_method. 
Add this call specification to DB, and then after run_after @@ -102,14 +97,27 @@ def schedule_call(factory_method_path, target_method_name, db_api.create_delayed_call(values) -class Scheduler(object): - def __init__(self, fixed_delay, random_delay, batch_size): +class LegacyScheduler(sched_base.Scheduler): + def __init__(self, conf): self._stopped = False self._thread = threading.Thread(target=self._loop) self._thread.daemon = True - self._fixed_delay = fixed_delay - self._random_delay = random_delay - self._batch_size = batch_size + self._fixed_delay = conf.fixed_delay + self._random_delay = conf.random_delay + self._batch_size = conf.batch_size + + def schedule(self, job, allow_redistribute=False): + _schedule_call( + job.target_factory_func_name, + job.func_name, + job.run_after, + serializers=job.func_arg_serializers, + key=None, + **job.func_args + ) + + def get_scheduled_jobs_count(self, **filters): + return db_api.get_delayed_calls_count(**filters) def start(self): self._thread.start() @@ -309,33 +317,3 @@ class Scheduler(object): raise e LOG.debug("Scheduler deleted %s delayed calls.", len(db_calls)) - - -def start(): - sched = Scheduler( - CONF.scheduler.fixed_delay, - CONF.scheduler.random_delay, - CONF.scheduler.batch_size - ) - - _schedulers.add(sched) - - sched.start() - - return sched - - -def stop_scheduler(sched, graceful=False): - if not sched: - return - - sched.stop(graceful) - - _schedulers.remove(sched) - - -def stop_all_schedulers(): - for sched in _schedulers: - sched.stop(graceful=True) - - _schedulers.clear() diff --git a/mistral/tests/unit/engine/test_direct_workflow.py b/mistral/tests/unit/engine/test_direct_workflow.py index f26891143..dee98468f 100644 --- a/mistral/tests/unit/engine/test_direct_workflow.py +++ b/mistral/tests/unit/engine/test_direct_workflow.py @@ -20,6 +20,7 @@ from oslo_config import cfg from mistral.db.v2 import api as db_api from mistral import exceptions as exc from mistral.lang import parser as spec_parser +from mistral.scheduler import base as sched_base from mistral.services import workflows as wf_service from mistral.tests.unit.engine import base from mistral.workflow import states @@ -739,7 +740,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase): self.assertIn("Task 'task3' not found", str(exception)) - def test_delete_workflow_completion_check_on_stop(self): + def test_delete_workflow_integrity_check_on_stop(self): wf_text = """--- version: '2.0' @@ -753,20 +754,13 @@ class DirectWorkflowEngineTest(base.EngineTestCase): wf_ex = self.engine.start_workflow('wf') - calls = db_api.get_delayed_calls() - - mtd_name = 'mistral.engine.workflow_handler._check_and_fix_integrity' - - self._assert_single_item(calls, target_method_name=mtd_name) - self.engine.stop_workflow(wf_ex.id, state=states.CANCELLED) - self._await( - lambda: - len(db_api.get_delayed_calls(target_method_name=mtd_name)) == 0 - ) + sched = sched_base.get_system_scheduler() - def test_delete_workflow_completion_on_execution_delete(self): + self._await(lambda: sched.get_scheduled_jobs_count() == 0) + + def test_delete_workflow_integrity_check_on_execution_delete(self): wf_text = """--- version: '2.0' @@ -780,18 +774,11 @@ class DirectWorkflowEngineTest(base.EngineTestCase): wf_ex = self.engine.start_workflow('wf') - calls = db_api.get_delayed_calls() - - mtd_name = 'mistral.engine.workflow_handler._check_and_fix_integrity' - - self._assert_single_item(calls, target_method_name=mtd_name) - db_api.delete_workflow_execution(wf_ex.id) - self._await( - lambda: - 
len(db_api.get_delayed_calls(target_method_name=mtd_name)) == 0 - ) + sched = sched_base.get_system_scheduler() + + self._await(lambda: sched.get_scheduled_jobs_count() == 0) def test_output(self): wf_text = """--- diff --git a/mistral/tests/unit/engine/test_with_items.py b/mistral/tests/unit/engine/test_with_items.py index 2a21b753d..ca92ee58f 100644 --- a/mistral/tests/unit/engine/test_with_items.py +++ b/mistral/tests/unit/engine/test_with_items.py @@ -150,21 +150,28 @@ class RandomSleepEchoAction(actions_base.Action): class WithItemsEngineTest(base.EngineTestCase): - def _assert_capacity(self, capacity, task_ex): - self.assertEqual( - capacity, - task_ex.runtime_context['with_items']['capacity'] - ) + @staticmethod + def _get_incomplete_action(task_ex_id): + with db_api.transaction(): + task_ex = db_api.get_task_execution(task_ex_id) + + return [e for e in task_ex.executions if not e.accepted][0] @staticmethod - def _get_incomplete_action(task_ex): - return [e for e in task_ex.executions if not e.accepted][0] + def _get_running_actions_count(task_ex_id): + with db_api.transaction(): + task_ex = db_api.get_task_execution(task_ex_id) + + return len( + [e for e in task_ex.executions if e.state == states.RUNNING] + ) @staticmethod - def _get_running_actions_count(task_ex): - return len( - [e for e in task_ex.executions if e.state == states.RUNNING] - ) + def _action_result_equals(action_ex_id, output): + with db_api.transaction(): + a_ex = db_api.get_action_execution(action_ex_id) + + return a_ex.output == output def test_with_items_simple(self): wb_service.create_workbook_v2(WB) @@ -626,51 +633,54 @@ class WithItemsEngineTest(base.EngineTestCase): # Also initialize lazy collections. task_ex = wf_ex.task_executions[0] - self._assert_capacity(0, task_ex) - self.assertEqual(1, self._get_running_actions_count(task_ex)) + self.assertEqual(1, self._get_running_actions_count(task_ex.id)) # 1st iteration complete. + action_ex_id = self._get_incomplete_action(task_ex.id).id + self.engine.on_action_complete( - self._get_incomplete_action(task_ex).id, + action_ex_id, actions_base.Result("John") ) # Wait till the delayed on_action_complete is processed. - # 1 is always there to periodically check WF completion. - self._await(lambda: len(db_api.get_delayed_calls()) == 1) + self._await( + lambda: + self._action_result_equals(action_ex_id, {'result': 'John'}) + ) - with db_api.transaction(): - task_ex = db_api.get_task_execution(task_ex.id) - - self._assert_capacity(0, task_ex) - self.assertEqual(1, self._get_running_actions_count(task_ex)) + self._await(lambda: self._get_running_actions_count(task_ex.id) == 1) # 2nd iteration complete. + action_ex_id = self._get_incomplete_action(task_ex.id).id + self.engine.on_action_complete( - self._get_incomplete_action(task_ex).id, + action_ex_id, actions_base.Result("Ivan") ) - self._await(lambda: len(db_api.get_delayed_calls()) == 1) + self._await( + lambda: + self._action_result_equals(action_ex_id, {'result': 'Ivan'}) + ) - with db_api.transaction(): - task_ex = db_api.get_task_execution(task_ex.id) - - self._assert_capacity(0, task_ex) - self.assertEqual(1, self._get_running_actions_count(task_ex)) + self._await(lambda: self._get_running_actions_count(task_ex.id) == 1) # 3rd iteration complete. 
+ action_ex_id = self._get_incomplete_action(task_ex.id).id + self.engine.on_action_complete( - self._get_incomplete_action(task_ex).id, + action_ex_id, actions_base.Result("Mistral") ) - self._await(lambda: len(db_api.get_delayed_calls()) in (0, 1)) + self._await( + lambda: + self._action_result_equals(action_ex_id, {'result': 'Mistral'}) + ) task_ex = db_api.get_task_execution(task_ex.id) - self._assert_capacity(1, task_ex) - self.await_workflow_success(wf_ex.id) # Since we know that we can receive results in random order, @@ -788,59 +798,53 @@ class WithItemsEngineTest(base.EngineTestCase): task_ex = wf_ex.task_executions[0] - running_cnt = self._get_running_actions_count(task_ex) - - self._assert_capacity(0, task_ex) - self.assertEqual(2, running_cnt) + self.assertEqual(2, self._get_running_actions_count(task_ex.id)) # 1st iteration complete. + action_ex_id = self._get_incomplete_action(task_ex.id).id + self.engine.on_action_complete( - self._get_incomplete_action(task_ex).id, + action_ex_id, actions_base.Result("John") ) # Wait till the delayed on_action_complete is processed. - # 1 is always there to periodically check WF completion. - self._await(lambda: len(db_api.get_delayed_calls()) == 1) + self._await( + lambda: + self._action_result_equals(action_ex_id, {'result': 'John'}) + ) - with db_api.transaction(): - task_ex = db_api.get_task_execution(task_ex.id) - - running_cnt = self._get_running_actions_count(task_ex) - - self._assert_capacity(0, task_ex) - self.assertEqual(2, running_cnt) + self._await(lambda: self._get_running_actions_count(task_ex.id) == 2) # 2nd iteration complete. + action_ex_id = self._get_incomplete_action(task_ex.id).id + self.engine.on_action_complete( - self._get_incomplete_action(task_ex).id, + action_ex_id, actions_base.Result("Ivan") ) - self._await(lambda: len(db_api.get_delayed_calls()) == 1) + self._await( + lambda: + self._action_result_equals(action_ex_id, {'result': 'Ivan'}) + ) - with db_api.transaction(): - task_ex = db_api.get_task_execution(task_ex.id) - - running_cnt = self._get_running_actions_count(task_ex) - - self._assert_capacity(0, task_ex) - self.assertEqual(2, running_cnt) + self._await(lambda: self._get_running_actions_count(task_ex.id) == 2) # 3rd iteration complete. + action_ex_id = self._get_incomplete_action(task_ex.id).id + self.engine.on_action_complete( - self._get_incomplete_action(task_ex).id, + action_ex_id, actions_base.Result("Mistral") ) - self._await(lambda: len(db_api.get_delayed_calls()) == 1) + self._await( + lambda: + self._action_result_equals(action_ex_id, {'result': 'Mistral'}) + ) - with db_api.transaction(): - task_ex = db_api.get_task_execution(task_ex.id) - - self._assert_capacity(1, task_ex) - - incomplete_action = self._get_incomplete_action(task_ex) + incomplete_action = self._get_incomplete_action(task_ex.id) # 4th iteration complete. 
self.engine.on_action_complete( @@ -848,12 +852,15 @@ class WithItemsEngineTest(base.EngineTestCase): actions_base.Result("Hello") ) - self._await(lambda: len(db_api.get_delayed_calls()) in (0, 1)) + self._await( + lambda: self._action_result_equals( + incomplete_action.id, + {'result': 'Hello'} + ) + ) task_ex = db_api.get_task_execution(task_ex.id) - self._assert_capacity(2, task_ex) - self.await_workflow_success(wf_ex.id) # Since we know that we can receive results in random order, @@ -940,27 +947,24 @@ class WithItemsEngineTest(base.EngineTestCase): wf_ex = db_api.get_workflow_execution(wf_ex.id) task_ex = wf_ex.task_executions[0] - running_cnt = self._get_running_actions_count(task_ex) - self._assert_capacity(0, task_ex) - self.assertEqual(3, running_cnt) + self.assertEqual(3, self._get_running_actions_count(task_ex.id)) # 1st iteration complete. + action_ex_id = self._get_incomplete_action(task_ex.id).id + self.engine.on_action_complete( - self._get_incomplete_action(task_ex).id, + action_ex_id, actions_base.Result("John") ) # Wait till the delayed on_action_complete is processed. - # 1 is always there to periodically check WF completion. - self._await(lambda: len(db_api.get_delayed_calls()) == 1) + self._await( + lambda: + self._action_result_equals(action_ex_id, {'result': 'John'}) + ) - with db_api.transaction(): - task_ex = db_api.get_task_execution(task_ex.id) - - self._assert_capacity(1, task_ex) - - incomplete_action = self._get_incomplete_action(task_ex) + incomplete_action = self._get_incomplete_action(task_ex.id) # 2nd iteration complete. self.engine.on_action_complete( @@ -968,14 +972,15 @@ class WithItemsEngineTest(base.EngineTestCase): actions_base.Result("Ivan") ) - self._await(lambda: len(db_api.get_delayed_calls()) == 1) + self._await( + lambda: + self._action_result_equals( + incomplete_action.id, + {'result': 'Ivan'} + ) + ) - with db_api.transaction(): - task_ex = db_api.get_task_execution(task_ex.id) - - self._assert_capacity(2, task_ex) - - incomplete_action = self._get_incomplete_action(task_ex) + incomplete_action = self._get_incomplete_action(task_ex.id) # 3rd iteration complete. 
self.engine.on_action_complete( @@ -983,12 +988,16 @@ class WithItemsEngineTest(base.EngineTestCase): actions_base.Result("Mistral") ) - self._await(lambda: len(db_api.get_delayed_calls()) in (0, 1)) + self._await( + lambda: + self._action_result_equals( + incomplete_action.id, + {'result': 'Mistral'} + ) + ) task_ex = db_api.get_task_execution(task_ex.id) - self._assert_capacity(3, task_ex) - self.await_workflow_success(wf_ex.id) with db_api.transaction(): diff --git a/mistral/tests/unit/scheduler/test_scheduler.py b/mistral/tests/unit/scheduler/test_default_scheduler.py similarity index 86% rename from mistral/tests/unit/scheduler/test_scheduler.py rename to mistral/tests/unit/scheduler/test_default_scheduler.py index 11f869a78..42a7b6263 100644 --- a/mistral/tests/unit/scheduler/test_scheduler.py +++ b/mistral/tests/unit/scheduler/test_default_scheduler.py @@ -19,14 +19,19 @@ from eventlet import timeout import datetime import mock +from oslo_config import cfg + from mistral.db.v2 import api as db_api from mistral.scheduler import base as scheduler_base from mistral.scheduler import default_scheduler from mistral.tests.unit import base +CONF = cfg.CONF + + TARGET_METHOD_PATH = ( - 'mistral.tests.unit.scheduler.test_scheduler.target_method' + 'mistral.tests.unit.scheduler.test_default_scheduler.target_method' ) @@ -34,9 +39,9 @@ def target_method(): pass -class SchedulerTest(base.DbTestCase): +class DefaultSchedulerTest(base.DbTestCase): def setUp(self): - super(SchedulerTest, self).setUp() + super(DefaultSchedulerTest, self).setUp() # This Timeout object is needed to raise an exception if the test took # longer than a configured number of seconds. @@ -49,7 +54,11 @@ class SchedulerTest(base.DbTestCase): self.target_mtd_finished = event.Event() self.target_mtd_lock = semaphore.Semaphore(0) - self.scheduler = default_scheduler.DefaultScheduler(1, 1, 100) + self.override_config('fixed_delay', 1, 'scheduler') + self.override_config('random_delay', 1, 'scheduler') + self.override_config('batch_size', 100, 'scheduler') + + self.scheduler = default_scheduler.DefaultScheduler(CONF.scheduler) self.scheduler.start() self.addCleanup(self.scheduler.stop, True) @@ -75,6 +84,7 @@ class SchedulerTest(base.DbTestCase): @mock.patch(TARGET_METHOD_PATH) def test_schedule_called_once(self, method): + # Delegate from the module function to the method of the test class. method.side_effect = self.target_method job = scheduler_base.SchedulerJob( @@ -110,6 +120,7 @@ class SchedulerTest(base.DbTestCase): @mock.patch(TARGET_METHOD_PATH) def test_pickup_from_job_store(self, method): + # Delegate from the module function to the method of the test class. method.side_effect = self.target_method self.override_config('pickup_job_after', 1, 'scheduler') @@ -126,7 +137,7 @@ class SchedulerTest(base.DbTestCase): 'auth_ctx': {} }) - self.assertTrue(len(db_api.get_scheduled_jobs()) > 0) + self.assertEqual(1, len(db_api.get_scheduled_jobs())) self._unlock_target_method() self._wait_target_method_end() @@ -138,6 +149,7 @@ class SchedulerTest(base.DbTestCase): @mock.patch(TARGET_METHOD_PATH) def test_recapture_job(self, method): + # Delegate from the module function to the method of the test class. 
method.side_effect = self.target_method self.override_config('pickup_job_after', 1, 'scheduler') @@ -161,7 +173,7 @@ class SchedulerTest(base.DbTestCase): 'auth_ctx': {} }) - self.assertTrue(len(db_api.get_scheduled_jobs()) > 0) + self.assertEqual(1, len(db_api.get_scheduled_jobs())) self._unlock_target_method() self._wait_target_method_end() diff --git a/mistral/tests/unit/services/test_scheduler.py b/mistral/tests/unit/services/test_legacy_scheduler.py similarity index 72% rename from mistral/tests/unit/services/test_scheduler.py rename to mistral/tests/unit/services/test_legacy_scheduler.py index 250f2459a..1b871c981 100644 --- a/mistral/tests/unit/services/test_scheduler.py +++ b/mistral/tests/unit/services/test_legacy_scheduler.py @@ -19,15 +19,21 @@ import mock from eventlet import queue from eventlet import timeout +from oslo_config import cfg + from mistral import context as auth_context from mistral.db.v2 import api as db_api from mistral import exceptions as exc -from mistral.services import scheduler +from mistral.scheduler import base as sched_base +from mistral.services import legacy_scheduler from mistral.tests.unit import base from mistral_lib import actions as ml_actions + +CONF = cfg.CONF + TARGET_METHOD_PATH = ( - 'mistral.tests.unit.services.test_scheduler.target_method' + 'mistral.tests.unit.services.test_legacy_scheduler.target_method' ) DELAY = 1.5 @@ -40,15 +46,18 @@ def target_method(): pass -class SchedulerServiceTest(base.DbTestCase): - +class LegacySchedulerTest(base.DbTestCase): def setUp(self): - super(SchedulerServiceTest, self).setUp() + super(LegacySchedulerTest, self).setUp() self.timeout = timeout.Timeout(seconds=10) self.queue = queue.Queue() - self.scheduler = scheduler.Scheduler(0, 1, None) + self.override_config('fixed_delay', 1, 'scheduler') + self.override_config('random_delay', 0, 'scheduler') + self.override_config('batch_size', 100, 'scheduler') + + self.scheduler = legacy_scheduler.LegacyScheduler(CONF.scheduler) self.scheduler.start() self.addCleanup(self.scheduler.stop, True) @@ -64,6 +73,7 @@ class SchedulerServiceTest(base.DbTestCase): @mock.patch(TARGET_METHOD_PATH) def test_scheduler_with_factory(self, factory): target_method_name = 'run_something' + factory.return_value = type( 'something', (object,), @@ -73,13 +83,15 @@ class SchedulerServiceTest(base.DbTestCase): } ) - scheduler.schedule_call( - TARGET_METHOD_PATH, - target_method_name, - DELAY, - **{'name': 'task', 'id': '123'} + job = sched_base.SchedulerJob( + run_after=DELAY, + target_factory_func_name=TARGET_METHOD_PATH, + func_name=target_method_name, + func_args={'name': 'task', 'id': '123'} ) + self.scheduler.schedule(job) + calls = db_api.get_delayed_calls_to_start(get_time_delay()) call = self._assert_single_item( calls, @@ -97,13 +109,14 @@ class SchedulerServiceTest(base.DbTestCase): def test_scheduler_without_factory(self, method): method.side_effect = self.target_method - scheduler.schedule_call( - None, - TARGET_METHOD_PATH, - DELAY, - **{'name': 'task', 'id': '321'} + job = sched_base.SchedulerJob( + run_after=DELAY, + func_name=TARGET_METHOD_PATH, + func_args={'name': 'task', 'id': '321'} ) + self.scheduler.schedule(job) + calls = db_api.get_delayed_calls_to_start(get_time_delay()) call = self._assert_single_item( calls, @@ -123,29 +136,29 @@ class SchedulerServiceTest(base.DbTestCase): default_context = base.get_context(default=True) auth_context.set_ctx(default_context) - default_project_id = ( - default_context.project_id + + default_project_id = 
default_context.project_id + + job = sched_base.SchedulerJob( + run_after=DELAY, + func_name=TARGET_METHOD_PATH, + func_args={'expected_project_id': default_project_id} ) - scheduler.schedule_call( - None, - TARGET_METHOD_PATH, - DELAY, - **{'expected_project_id': default_project_id} - ) + self.scheduler.schedule(job) second_context = base.get_context(default=False) auth_context.set_ctx(second_context) - second_project_id = ( - second_context.project_id + + second_project_id = second_context.project_id + + job = sched_base.SchedulerJob( + run_after=DELAY, + func_name=TARGET_METHOD_PATH, + func_args={'expected_project_id': second_project_id} ) - scheduler.schedule_call( - None, - TARGET_METHOD_PATH, - DELAY, - **{'expected_project_id': second_project_id} - ) + self.scheduler.schedule(job) self.assertNotEqual(default_project_id, second_project_id) @@ -155,6 +168,7 @@ class SchedulerServiceTest(base.DbTestCase): @mock.patch(TARGET_METHOD_PATH) def test_scheduler_with_serializer(self, factory): target_method_name = 'run_something' + factory.return_value = type( 'something', (object,), @@ -176,14 +190,16 @@ class SchedulerServiceTest(base.DbTestCase): 'result': 'mistral.workflow.utils.ResultSerializer' } - scheduler.schedule_call( - TARGET_METHOD_PATH, - target_method_name, - DELAY, - serializers=serializers, - **method_args + job = sched_base.SchedulerJob( + run_after=DELAY, + target_factory_func_name=TARGET_METHOD_PATH, + func_name=target_method_name, + func_args=method_args, + func_arg_serializers=serializers ) + self.scheduler.schedule(job) + calls = db_api.get_delayed_calls_to_start(get_time_delay()) call = self._assert_single_item( calls, @@ -206,17 +222,19 @@ class SchedulerServiceTest(base.DbTestCase): def test_scheduler_multi_instance(self, method): method.side_effect = self.target_method - second_scheduler = scheduler.Scheduler(1, 1, None) + second_scheduler = legacy_scheduler.LegacyScheduler(CONF.scheduler) second_scheduler.start() + self.addCleanup(second_scheduler.stop, True) - scheduler.schedule_call( - None, - TARGET_METHOD_PATH, - DELAY, - **{'name': 'task', 'id': '321'} + job = sched_base.SchedulerJob( + run_after=DELAY, + func_name=TARGET_METHOD_PATH, + func_args={'name': 'task', 'id': '321'}, ) + second_scheduler.schedule(job) + calls = db_api.get_delayed_calls_to_start(get_time_delay()) self._assert_single_item(calls, target_method_name=TARGET_METHOD_PATH) @@ -230,13 +248,14 @@ class SchedulerServiceTest(base.DbTestCase): def test_scheduler_delete_calls(self, method): method.side_effect = self.target_method - scheduler.schedule_call( - None, - TARGET_METHOD_PATH, - DELAY, - **{'name': 'task', 'id': '321'} + job = sched_base.SchedulerJob( + run_after=DELAY, + func_name=TARGET_METHOD_PATH, + func_args={'name': 'task', 'id': '321'}, ) + self.scheduler.schedule(job) + calls = db_api.get_delayed_calls_to_start(get_time_delay()) self._assert_single_item(calls, target_method_name=TARGET_METHOD_PATH) @@ -278,20 +297,23 @@ class SchedulerServiceTest(base.DbTestCase): update_delayed_call): def update_call_failed(id, values, query_filter): self.queue.put("item") + return None, 0 update_delayed_call.side_effect = update_call_failed - scheduler.schedule_call( - None, - TARGET_METHOD_PATH, - DELAY, - **{'name': 'task', 'id': '321'} + job = sched_base.SchedulerJob( + run_after=DELAY, + func_name=TARGET_METHOD_PATH, + func_args={'name': 'task', 'id': '321'}, ) + self.scheduler.schedule(job) + calls = db_api.get_delayed_calls_to_start(get_time_delay()) self.queue.get() + eventlet.sleep(1) 
update_delayed_call.assert_called_with( @@ -299,6 +321,7 @@ class SchedulerServiceTest(base.DbTestCase): values=mock.ANY, query_filter=mock.ANY ) + # If the scheduler does handel calls that failed on update # DBEntityNotFoundException will raise. db_api.get_delayed_call(calls[0].id) @@ -309,7 +332,8 @@ class SchedulerServiceTest(base.DbTestCase): number_delayed_calls = 5 processed_calls_at_time = [] - real_delete_calls_method = scheduler.Scheduler.delete_calls + real_delete_calls_method = \ + legacy_scheduler.LegacyScheduler.delete_calls @staticmethod def delete_calls_counter(delayed_calls): @@ -317,21 +341,25 @@ class SchedulerServiceTest(base.DbTestCase): for _ in range(len(delayed_calls)): self.queue.put("item") + processed_calls_at_time.append(len(delayed_calls)) - scheduler.Scheduler.delete_calls = delete_calls_counter + legacy_scheduler.LegacyScheduler.delete_calls = delete_calls_counter - # Create 5 delayed calls + # Create 5 delayed calls. for i in range(number_delayed_calls): - scheduler.schedule_call( - None, - TARGET_METHOD_PATH, - 0, - **{'name': 'task', 'id': i} + job = sched_base.SchedulerJob( + run_after=DELAY, + func_name=TARGET_METHOD_PATH, + func_args={'name': 'task', 'id': i}, ) - # Start scheduler which process 2 calls at a time - self.scheduler = scheduler.Scheduler(0, 1, 2) + self.scheduler.schedule(job) + + # Start scheduler which process 2 calls at a time. + self.override_config('batch_size', 2, 'scheduler') + + self.scheduler = legacy_scheduler.LegacyScheduler(CONF.scheduler) self.scheduler.start() # Wait when all of calls will be processed diff --git a/mistral/tests/unit/test_launcher.py b/mistral/tests/unit/test_launcher.py index 0d9711761..607af4332 100644 --- a/mistral/tests/unit/test_launcher.py +++ b/mistral/tests/unit/test_launcher.py @@ -17,17 +17,18 @@ import eventlet from mistral.api import service as api_service from mistral.cmd import launch +from mistral.scheduler import base as sched_base from mistral.tests.unit import base class ServiceLauncherTest(base.DbTestCase): - def setUp(self): super(ServiceLauncherTest, self).setUp() self.override_config('enabled', False, group='cron_trigger') launch.reset_server_managers() + sched_base.destroy_system_scheduler() def test_launch_all(self): eventlet.spawn(launch.launch_any, launch.LAUNCH_OPTIONS.keys()) diff --git a/setup.cfg b/setup.cfg index 55da2cf36..386028c8f 100644 --- a/setup.cfg +++ b/setup.cfg @@ -114,3 +114,7 @@ mistral.js.implementation = pyv8 = mistral.utils.javascript:PyV8Evaluator v8eval = mistral.utils.javascript:V8EvalEvaluator py_mini_racer = mistral.utils.javascript:PyMiniRacerEvaluator + +mistral.schedulers = + legacy = mistral.services.legacy_scheduler:LegacyScheduler + default = mistral.scheduler.default_scheduler:DefaultScheduler
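
A minimal usage sketch of the scheduling API introduced by this patch, for
reviewers. Switching implementations is a configuration change: set
scheduler_type = default (instead of the default 'legacy') in the [DEFAULT]
section of mistral.conf, and the class is resolved through the
'mistral.schedulers' entry points registered in setup.cfg above. The function
path and job key used below are illustrative only, not real engine constants.

    from mistral.scheduler import base as sched_base


    def schedule_example(task_ex_id, delay=0):
        # The system scheduler is a lazily created singleton. The concrete
        # class (LegacyScheduler or DefaultScheduler) is chosen via the
        # 'scheduler_type' option and the 'mistral.schedulers' entry points.
        sched = sched_base.get_system_scheduler()

        # A job describes the delayed call declaratively instead of calling
        # the old scheduler.schedule_call() function directly.
        job = sched_base.SchedulerJob(
            run_after=delay,                                # delay in seconds
            func_name='some.module.path.some_function',     # illustrative path
            func_args={'task_ex_id': task_ex_id},
            key='example_key-%s' % task_ex_id               # optional lookup key
        )

        sched.schedule(job)

        # Scheduled jobs can also be counted (e.g. to avoid scheduling
        # duplicates), filtered by key and by whether they are already
        # being processed.
        return sched.get_scheduled_jobs_count(
            key='example_key-%s' % task_ex_id,
            processing=False
        )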