diff --git a/mistral/engine/base.py b/mistral/engine/base.py index 719c43de3..9130d20dc 100644 --- a/mistral/engine/base.py +++ b/mistral/engine/base.py @@ -134,7 +134,7 @@ class Engine(object): raise NotImplementedError @abc.abstractmethod - def report_running_actions(self, action_ex_ids): + def process_action_heartbeats(self, action_ex_ids): """Receives the heartbeat about the running actions. :param action_ex_ids: The action execution ids. diff --git a/mistral/engine/default_engine.py b/mistral/engine/default_engine.py index 242898a4b..951f95051 100644 --- a/mistral/engine/default_engine.py +++ b/mistral/engine/default_engine.py @@ -223,14 +223,17 @@ class DefaultEngine(base.Engine): @db_utils.retry_on_db_error @post_tx_queue.run - def report_running_actions(self, action_ex_ids): + def process_action_heartbeats(self, action_ex_ids): with db_api.transaction(): for exec_id in action_ex_ids: try: db_api.update_action_execution_heartbeat(exec_id) except exceptions.DBEntityNotFoundError: - LOG.debug("Action execution heartbeat update failed. {}" - .format(exec_id), exc_info=True) + LOG.debug( + "Action execution heartbeat update failed. {}" + .format(exec_id), + exc_info=True + ) # Ignore this error and continue with the # remaining ids. pass diff --git a/mistral/engine/engine_server.py b/mistral/engine/engine_server.py index f2890800d..5a5b32b56 100644 --- a/mistral/engine/engine_server.py +++ b/mistral/engine/engine_server.py @@ -20,7 +20,8 @@ from mistral.engine import default_engine from mistral.rpc import base as rpc from mistral.scheduler import base as sched_base from mistral.service import base as service_base -from mistral.services import action_execution_checker +from mistral.services import action_heartbeat_checker +from mistral.services import action_heartbeat_sender from mistral.services import expiration_policy from mistral.utils import profiler as profiler_utils from mistral_lib import utils @@ -54,7 +55,17 @@ class EngineServer(service_base.MistralService): self._expiration_policy_tg = expiration_policy.setup() - action_execution_checker.start() + action_heartbeat_checker.start() + + # If the current engine instance uses a local action executor + # then we also need to initialize a heartbeat reporter for it. + # Heartbeats will be sent to the engine tier in the same way as + # with a remote executor. So if the current cluster node crashes + # in the middle of executing an action then one of the remaining + # engine instances will expire the action in a configured period + # of time. + if cfg.CONF.executor.type == 'local': + action_heartbeat_sender.start() if self._setup_profiler: profiler_utils.setup('mistral-engine', cfg.CONF.engine.host) @@ -71,7 +82,10 @@ class EngineServer(service_base.MistralService): def stop(self, graceful=False): super(EngineServer, self).stop(graceful) - action_execution_checker.stop(graceful) + action_heartbeat_checker.stop(graceful) + + if cfg.CONF.executor.type == 'local': + action_heartbeat_sender.stop(graceful) if self._scheduler: self._scheduler.stop(graceful) @@ -275,7 +289,7 @@ class EngineServer(service_base.MistralService): action_ex_ids ) - return self.engine.report_running_actions(action_ex_ids) + return self.engine.process_action_heartbeats(action_ex_ids) def get_oslo_service(setup_profiler=True): diff --git a/mistral/executors/default_executor.py b/mistral/executors/default_executor.py index 2de8dc874..2aa06f04b 100644 --- a/mistral/executors/default_executor.py +++ b/mistral/executors/default_executor.py @@ -23,9 +23,9 @@ from mistral import context from mistral import exceptions as exc from mistral.executors import base from mistral.rpc import clients as rpc +from mistral.services import action_heartbeat_sender from mistral.utils import inspect_utils as i_u - LOG = logging.getLogger(__name__) @@ -57,6 +57,25 @@ class DefaultExecutor(base.Executor): :return: Action result. """ + try: + action_heartbeat_sender.add_action(action_ex_id) + + return self._do_run_action( + action_cls_attrs, + action_cls_str, + action_ex_id, + execution_context, + params, + redelivered, + safe_rerun, + timeout + ) + finally: + action_heartbeat_sender.remove_action(action_ex_id) + + def _do_run_action(self, action_cls_attrs, action_cls_str, action_ex_id, + execution_context, params, redelivered, safe_rerun, + timeout): def send_error_back(error_msg): error_result = mistral_lib.Result(error=error_msg) diff --git a/mistral/executors/executor_server.py b/mistral/executors/executor_server.py index 9374e6be6..962e290cc 100644 --- a/mistral/executors/executor_server.py +++ b/mistral/executors/executor_server.py @@ -18,7 +18,7 @@ from mistral import config as cfg from mistral.executors import default_executor as exe from mistral.rpc import base as rpc from mistral.service import base as service_base -from mistral.services import action_execution_reporter +from mistral.services import action_heartbeat_sender from mistral.utils import profiler as profiler_utils from mistral_lib import utils @@ -43,7 +43,7 @@ class ExecutorServer(service_base.MistralService): def start(self): super(ExecutorServer, self).start() - action_execution_reporter.start() + action_heartbeat_sender.start() if self._setup_profiler: profiler_utils.setup('mistral-executor', cfg.CONF.executor.host) @@ -60,7 +60,7 @@ class ExecutorServer(service_base.MistralService): def stop(self, graceful=False): super(ExecutorServer, self).stop(graceful) - action_execution_reporter.stop() + action_heartbeat_sender.stop() if self._rpc_server: self._rpc_server.stop(graceful) @@ -96,30 +96,25 @@ class ExecutorServer(service_base.MistralService): redelivered = rpc_ctx.redelivered or False - try: - action_execution_reporter.add_action_ex_id(action_ex_id) + res = self.executor.run_action( + action_ex_id, + action_cls_str, + action_cls_attrs, + params, + safe_rerun, + execution_context, + redelivered, + timeout=timeout + ) - res = self.executor.run_action( - action_ex_id, - action_cls_str, - action_cls_attrs, - params, - safe_rerun, - execution_context, - redelivered, - timeout=timeout - ) + LOG.debug( + "Sending action result to engine" + " [action_ex_id=%s, action_cls=%s]", + action_ex_id, + action_cls_str + ) - LOG.debug( - "Sending action result to engine" - " [action_ex_id=%s, action_cls=%s]", - action_ex_id, - action_cls_str - ) - - return res - finally: - action_execution_reporter.remove_action_ex_id(action_ex_id) + return res def get_oslo_service(setup_profiler=True): diff --git a/mistral/rpc/clients.py b/mistral/rpc/clients.py index 0fbb08ccc..c9674e2f1 100644 --- a/mistral/rpc/clients.py +++ b/mistral/rpc/clients.py @@ -328,7 +328,7 @@ class EngineClient(eng.Engine): ) @base.wrap_messaging_exception - def report_running_actions(self, action_ex_ids): + def process_action_heartbeats(self, action_ex_ids): """Receives action execution heartbeats. :param action_ex_ids: Action execution ids. diff --git a/mistral/services/action_execution_checker.py b/mistral/services/action_heartbeat_checker.py similarity index 100% rename from mistral/services/action_execution_checker.py rename to mistral/services/action_heartbeat_checker.py diff --git a/mistral/services/action_execution_reporter.py b/mistral/services/action_heartbeat_sender.py similarity index 86% rename from mistral/services/action_execution_reporter.py rename to mistral/services/action_heartbeat_sender.py index 441f5df79..693a11136 100644 --- a/mistral/services/action_execution_reporter.py +++ b/mistral/services/action_heartbeat_sender.py @@ -32,32 +32,32 @@ _stopped = True _running_actions = set() -def add_action_ex_id(action_ex_id): +def add_action(action_ex_id): global _enabled # With run-action there is no actions_ex_id assigned. if action_ex_id and _enabled: - rpc.get_engine_client().report_running_actions([action_ex_id]) + rpc.get_engine_client().process_action_heartbeats([action_ex_id]) _running_actions.add(action_ex_id) -def remove_action_ex_id(action_ex_id): +def remove_action(action_ex_id): global _enabled if action_ex_id and _enabled: _running_actions.discard(action_ex_id) -def report_running_actions(): - LOG.debug("Running heartbeat reporter...") +def send_action_heartbeats(): + LOG.debug('Running heartbeat reporter...') global _running_actions if not _running_actions: return - rpc.get_engine_client().report_running_actions(_running_actions) + rpc.get_engine_client().process_action_heartbeats(_running_actions) def _loop(): @@ -76,10 +76,10 @@ def _loop(): while not _stopped: try: - report_running_actions() + send_action_heartbeats() except Exception: LOG.exception( - 'Action execution reporter iteration failed' + 'Action heartbeat sender iteration failed' ' due to an unexpected exception.' ) diff --git a/mistral/tests/unit/engine/test_action_heartbeat.py b/mistral/tests/unit/engine/test_action_heartbeat_checker.py similarity index 97% rename from mistral/tests/unit/engine/test_action_heartbeat.py rename to mistral/tests/unit/engine/test_action_heartbeat_checker.py index 8bc9a84cc..cf5316205 100644 --- a/mistral/tests/unit/engine/test_action_heartbeat.py +++ b/mistral/tests/unit/engine/test_action_heartbeat_checker.py @@ -27,14 +27,14 @@ from mistral.workflow import states cfg.CONF.set_default('auth_enable', False, group='pecan') -class ActionHeartbeatTest(base.EngineTestCase): +class ActionHeartbeatCheckerTest(base.EngineTestCase): def setUp(self): # We need to override configuration values before starting engine. self.override_config('check_interval', 1, 'action_heartbeat') self.override_config('max_missed_heartbeats', 1, 'action_heartbeat') self.override_config('first_heartbeat_timeout', 0, 'action_heartbeat') - super(ActionHeartbeatTest, self).setUp() + super(ActionHeartbeatCheckerTest, self).setUp() # Make sure actions are not sent to an executor. @mock.patch.object( diff --git a/mistral/tests/unit/engine/test_action_heartbeat_sender.py b/mistral/tests/unit/engine/test_action_heartbeat_sender.py new file mode 100644 index 000000000..56d316f88 --- /dev/null +++ b/mistral/tests/unit/engine/test_action_heartbeat_sender.py @@ -0,0 +1,148 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock + +from oslo_config import cfg + +from mistral.db.v2 import api as db_api +from mistral.rpc import clients as rpc_clients +from mistral.services import workflows as wf_service +from mistral.tests.unit.engine import base +from mistral.workflow import states + + +# Use the set_default method to set value otherwise in certain test cases +# the change in value is not permanent. +cfg.CONF.set_default('auth_enable', False, group='pecan') + + +class ActionHeartbeatSenderBaseTest(base.EngineTestCase): + def setUp(self): + # We need to set all required configuration values before starting + # an engine and an executor. + self.get_configuration() + + super(ActionHeartbeatSenderBaseTest, self).setUp() + + def get_configuration(self): + # We need to override configuration values before starting engine. + # Subclasses can override this method and add/change their own + # config options. + self.override_config('check_interval', 1, 'action_heartbeat') + self.override_config('max_missed_heartbeats', 1, 'action_heartbeat') + self.override_config('first_heartbeat_timeout', 0, 'action_heartbeat') + + def _do_long_action_success_test(self): + wf_text = """--- + version: '2.0' + + wf: + tasks: + task1: + action: std.sleep seconds=4 + """ + + wf_service.create_workflows(wf_text) + + wf_ex = self.engine.start_workflow('wf') + + self.await_workflow_success(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + t_execs = wf_ex.task_executions + + t_ex = self._assert_single_item( + t_execs, + name='task1', + state=states.SUCCESS + ) + + a_execs = db_api.get_action_executions(task_execution_id=t_ex.id) + + self._assert_single_item( + a_execs, + name='std.sleep', + state=states.SUCCESS + ) + + # Disable the ability to send action heartbeats. + @mock.patch.object( + rpc_clients.EngineClient, + 'process_action_heartbeats', + mock.MagicMock() + ) + def _do_long_action_failure_test_with_disabled_sender(self): + wf_text = """--- + version: '2.0' + + wf: + tasks: + task1: + action: std.sleep seconds=4 + """ + + wf_service.create_workflows(wf_text) + + wf_ex = self.engine.start_workflow('wf') + + self.await_workflow_error(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + t_execs = wf_ex.task_executions + + t_ex = self._assert_single_item( + t_execs, + name='task1', + state=states.ERROR + ) + + a_execs = db_api.get_action_executions(task_execution_id=t_ex.id) + + self._assert_single_item( + a_execs, + name='std.sleep', + state=states.ERROR + ) + + +class ActionHeartbeatSenderLocalExecutorTest(ActionHeartbeatSenderBaseTest): + def get_configuration(self): + super(ActionHeartbeatSenderLocalExecutorTest, self).get_configuration() + + self.override_config('type', 'local', 'executor') + + def test_long_action_success(self): + self._do_long_action_success_test() + + def test_long_action_failure_with_disabled_sender(self): + self._do_long_action_failure_test_with_disabled_sender() + + +class ActionHeartbeatSenderRemoteExecutorTest(ActionHeartbeatSenderBaseTest): + def get_configuration(self): + super( + ActionHeartbeatSenderRemoteExecutorTest, + self + ).get_configuration() + + self.override_config('type', 'remote', 'executor') + + def test_long_action_success(self): + self._do_long_action_success_test() + + def test_long_action_failure_with_disabled_sender(self): + self._do_long_action_failure_test_with_disabled_sender() diff --git a/mistral/tests/unit/engine/test_default_engine.py b/mistral/tests/unit/engine/test_default_engine.py index c688e7a81..6522907c3 100644 --- a/mistral/tests/unit/engine/test_default_engine.py +++ b/mistral/tests/unit/engine/test_default_engine.py @@ -656,9 +656,9 @@ class DefaultEngineTest(base.DbTestCase): task_action_ex = action_execs[0] - self.engine.report_running_actions([]) - self.engine.report_running_actions([None, None]) - self.engine.report_running_actions([None, task_action_ex.id]) + self.engine.process_action_heartbeats([]) + self.engine.process_action_heartbeats([None, None]) + self.engine.process_action_heartbeats([None, task_action_ex.id]) task_action_ex = db_api.get_action_execution(task_action_ex.id)