diff --git a/mistral/context.py b/mistral/context.py index 9235811f3..4887f15f9 100644 --- a/mistral/context.py +++ b/mistral/context.py @@ -15,6 +15,7 @@ import base64 +from mistral_lib.actions import context as lib_ctx from oslo_config import cfg from oslo_context import context as oslo_context import oslo_messaging as messaging @@ -254,3 +255,27 @@ class ContextHook(hooks.PecanHook): def after(self, state): set_ctx(None) + + +def create_action_context(execution_ctx): + + context = ctx() + + security_ctx = lib_ctx.SecurityContext( + auth_cacert=context.auth_cacert, + auth_token=context.auth_token, + auth_uri=context.auth_uri, + expires_at=context.expires_at, + insecure=context.insecure, + is_target=context.is_target, + is_trust_scoped=context.is_trust_scoped, + project_id=context.project_id, + redelivered=context.redelivered, + region_name=context.region_name, + service_catalog=context.service_catalog, + trust_id=context.trust_id, + ) + + ex_ctx = lib_ctx.ExecutionContext(**execution_ctx) + + return lib_ctx.ActionContext(security_ctx, ex_ctx) diff --git a/mistral/engine/action_queue.py b/mistral/engine/action_queue.py index c6b586683..3806ec567 100644 --- a/mistral/engine/action_queue.py +++ b/mistral/engine/action_queue.py @@ -57,7 +57,7 @@ def _process_queue(queue): for operation, args in queue: if operation == _RUN_ACTION: - action_ex, action_def, target = args + action_ex, action_def, target, execution_context = args executor.run_action( action_ex.id, @@ -65,7 +65,8 @@ def _process_queue(queue): action_def.attributes or {}, action_ex.input, action_ex.runtime_context.get('safe_rerun', False), - target=target + execution_context, + target=target, ) elif operation == _ON_ACTION_COMPLETE: action_ex_id, result, wf_action = args @@ -119,8 +120,9 @@ def process(func): return decorate -def schedule_run_action(action_ex, action_def, target): - _get_queue().append((_RUN_ACTION, (action_ex, action_def, target))) +def schedule_run_action(action_ex, action_def, target, execution_context): + args = (action_ex, action_def, target, execution_context) + _get_queue().append((_RUN_ACTION, args)) def schedule_on_action_complete(action_ex_id, result, wf_action=False): diff --git a/mistral/engine/actions.py b/mistral/engine/actions.py index 469e66d20..067ad944a 100644 --- a/mistral/engine/actions.py +++ b/mistral/engine/actions.py @@ -248,10 +248,13 @@ class PythonAction(Action): action_ex_id=action_ex_id ) + execution_context = self._prepare_execution_context() + action_queue.schedule_run_action( self.action_ex, self.action_def, - target + target, + execution_context, ) @profiler.trace('action-run', hide_args=True) @@ -280,12 +283,15 @@ class PythonAction(Action): executor = exe.get_executor(cfg.CONF.executor.type) + execution_context = self._prepare_execution_context() + result = executor.run_action( self.action_ex.id if self.action_ex else None, self.action_def.action_class, self.action_def.attributes or {}, input_dict, - safe_rerun=safe_rerun, + safe_rerun, + execution_context, target=target, async_=False ) @@ -317,6 +323,23 @@ class PythonAction(Action): self.action_def.action_class ) + def _prepare_execution_context(self): + + exc_ctx = {} + + if self.task_ex: + wf_ex = self.task_ex.workflow_execution + exc_ctx['workflow_execution_id'] = wf_ex.id + exc_ctx['task_id'] = self.task_ex.id + exc_ctx['workflow_name'] = wf_ex.name + + if self.action_ex: + exc_ctx['action_execution_id'] = self.action_ex.id + callback_url = '/v2/action_executions/%s' % self.action_ex.id + exc_ctx['callback_url'] = callback_url + + return exc_ctx + def _prepare_input(self, input_dict): """Template method to do manipulations with input parameters. diff --git a/mistral/engine/default_engine.py b/mistral/engine/default_engine.py index 18a85b3c3..18b7a6d88 100644 --- a/mistral/engine/default_engine.py +++ b/mistral/engine/default_engine.py @@ -83,7 +83,7 @@ class DefaultEngine(base.Engine): if not save: # Action execution is not created but we need to return similar - # object to a client anyway. + # object to the client anyway. return db_models.ActionExecution( name=action_name, description=description, diff --git a/mistral/executors/base.py b/mistral/executors/base.py index 9789a8d46..5941824c0 100644 --- a/mistral/executors/base.py +++ b/mistral/executors/base.py @@ -49,7 +49,7 @@ class Executor(object): @abc.abstractmethod def run_action(self, action_ex_id, action_cls_str, action_cls_attrs, - params, safe_rerun, redelivered=False, + params, safe_rerun, execution_context, redelivered=False, target=None, async_=True): """Runs action. @@ -59,6 +59,8 @@ class Executor(object): will be set to. :param params: Action parameters. :param safe_rerun: Tells if given action can be safely rerun. + :param execution_context: A dict of values providing information about + the current execution. :param redelivered: Tells if given action was run before on another executor. :param target: Target (group of action executors). diff --git a/mistral/executors/default_executor.py b/mistral/executors/default_executor.py index 8b39f219c..c1fdebe04 100644 --- a/mistral/executors/default_executor.py +++ b/mistral/executors/default_executor.py @@ -13,11 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +from mistral_lib import actions as mistral_lib from oslo_log import log as logging from osprofiler import profiler -from mistral_lib import actions as mistral_lib - from mistral.actions import action_factory as a_f from mistral import context from mistral import exceptions as exc @@ -35,7 +34,7 @@ class DefaultExecutor(base.Executor): @profiler.trace('default-executor-run-action', hide_args=True) def run_action(self, action_ex_id, action_cls_str, action_cls_attrs, - params, safe_rerun, redelivered=False, + params, safe_rerun, execution_context, redelivered=False, target=None, async_=True): """Runs action. @@ -45,6 +44,8 @@ class DefaultExecutor(base.Executor): will be set to. :param params: Action parameters. :param safe_rerun: Tells if given action can be safely rerun. + :param execution_context: A dict of values providing information about + the current execution. :param redelivered: Tells if given action was run before on another executor. :param target: Target (group of action executors). @@ -103,10 +104,10 @@ class DefaultExecutor(base.Executor): try: # NOTE(d0ugal): If the action is a subclass of mistral-lib we know - # that it expects to be passed the context. We should deprecate - # the builtin action class in Mistral. + # that it expects to be passed the context. if isinstance(action, mistral_lib.Action): - result = action.run(context.ctx()) + action_ctx = context.create_action_context(execution_context) + result = action.run(action_ctx) else: result = action.run() diff --git a/mistral/executors/executor_server.py b/mistral/executors/executor_server.py index 6fde1ecbc..a0dfdf4fd 100644 --- a/mistral/executors/executor_server.py +++ b/mistral/executors/executor_server.py @@ -60,7 +60,7 @@ class ExecutorServer(service_base.MistralService): self._rpc_server.stop(graceful) def run_action(self, rpc_ctx, action_ex_id, action_cls_str, - action_cls_attrs, params, safe_rerun): + action_cls_attrs, params, safe_rerun, execution_context): """Receives calls over RPC to run action on executor. :param rpc_ctx: RPC request context dictionary. @@ -89,6 +89,7 @@ class ExecutorServer(service_base.MistralService): action_cls_attrs, params, safe_rerun, + execution_context, redelivered ) diff --git a/mistral/rpc/clients.py b/mistral/rpc/clients.py index 89cf573c0..b3fdc9f19 100644 --- a/mistral/rpc/clients.py +++ b/mistral/rpc/clients.py @@ -299,7 +299,7 @@ class ExecutorClient(exe.Executor): @profiler.trace('executor-client-run-action') def run_action(self, action_ex_id, action_cls_str, action_cls_attrs, - params, safe_rerun, redelivered=False, + params, safe_rerun, execution_context, redelivered=False, target=None, async_=True): """Sends a request to run action to executor. @@ -322,7 +322,8 @@ class ExecutorClient(exe.Executor): 'action_cls_str': action_cls_str, 'action_cls_attrs': action_cls_attrs, 'params': params, - 'safe_rerun': safe_rerun + 'safe_rerun': safe_rerun, + 'execution_context': execution_context, } rpc_client_method = (self._client.async_call diff --git a/mistral/tests/unit/engine/test_environment.py b/mistral/tests/unit/engine/test_environment.py index f4bb1640b..ee177d3b9 100644 --- a/mistral/tests/unit/engine/test_environment.py +++ b/mistral/tests/unit/engine/test_environment.py @@ -79,7 +79,8 @@ workflows: def _run_at_target(action_ex_id, action_cls_str, action_cls_attrs, - params, safe_rerun, target=None, async_=True): + params, safe_rerun, execution_context, target=None, + async_=True): # We'll just call executor directly for testing purposes. executor = d_exe.DefaultExecutor() @@ -89,7 +90,8 @@ def _run_at_target(action_ex_id, action_cls_str, action_cls_attrs, action_cls_str, action_cls_attrs, params, - safe_rerun + safe_rerun, + execution_context=execution_context ) @@ -171,12 +173,21 @@ class EnvironmentTest(base.EngineTestCase): for t_ex in wf1_task_execs: a_ex = t_ex.action_executions[0] + callback_url = '/v2/action_executions/%s' % a_ex.id + r_exe.RemoteExecutor.run_action.assert_any_call( a_ex.id, 'mistral.actions.std_actions.EchoAction', {}, a_ex.input, False, + { + 'task_id': t_ex.id, + 'callback_url': callback_url, + 'workflow_execution_id': wf1_ex.id, + 'workflow_name': wf1_ex.name, + 'action_execution_id': a_ex.id, + }, target=TARGET ) diff --git a/mistral/tests/unit/engine/test_safe_rerun.py b/mistral/tests/unit/engine/test_safe_rerun.py index 57baa5b67..58d77e4dd 100644 --- a/mistral/tests/unit/engine/test_safe_rerun.py +++ b/mistral/tests/unit/engine/test_safe_rerun.py @@ -25,7 +25,8 @@ from mistral.workflow import states def _run_at_target(action_ex_id, action_class_str, attributes, - action_params, safe_rerun, target=None, async_=True): + action_params, safe_rerun, execution_context, target=None, + async_=True): # We'll just call executor directly for testing purposes. executor = d_exe.DefaultExecutor() @@ -35,6 +36,7 @@ def _run_at_target(action_ex_id, action_class_str, attributes, attributes, action_params, safe_rerun, + execution_context, redelivered=True )