Pass the new ActionContext to mistral-lib

Partial-Bug: #1718353
Depends-On: I6057d0ce3fe4ae23468be8fb06cb85dc5f467f6b
Change-Id: Ife653558bfcda794e7f37086832f70b0ad7c28a4
This commit is contained in:
Dougal Matthews 2017-09-21 13:41:25 +01:00
parent 4820523e7c
commit dd4a4bd440
10 changed files with 88 additions and 20 deletions

View File

@ -15,6 +15,7 @@
import base64
from mistral_lib.actions import context as lib_ctx
from oslo_config import cfg
from oslo_context import context as oslo_context
import oslo_messaging as messaging
@ -254,3 +255,27 @@ class ContextHook(hooks.PecanHook):
def after(self, state):
set_ctx(None)
def create_action_context(execution_ctx):
context = ctx()
security_ctx = lib_ctx.SecurityContext(
auth_cacert=context.auth_cacert,
auth_token=context.auth_token,
auth_uri=context.auth_uri,
expires_at=context.expires_at,
insecure=context.insecure,
is_target=context.is_target,
is_trust_scoped=context.is_trust_scoped,
project_id=context.project_id,
redelivered=context.redelivered,
region_name=context.region_name,
service_catalog=context.service_catalog,
trust_id=context.trust_id,
)
ex_ctx = lib_ctx.ExecutionContext(**execution_ctx)
return lib_ctx.ActionContext(security_ctx, ex_ctx)

View File

@ -57,7 +57,7 @@ def _process_queue(queue):
for operation, args in queue:
if operation == _RUN_ACTION:
action_ex, action_def, target = args
action_ex, action_def, target, execution_context = args
executor.run_action(
action_ex.id,
@ -65,7 +65,8 @@ def _process_queue(queue):
action_def.attributes or {},
action_ex.input,
action_ex.runtime_context.get('safe_rerun', False),
target=target
execution_context,
target=target,
)
elif operation == _ON_ACTION_COMPLETE:
action_ex_id, result, wf_action = args
@ -119,8 +120,9 @@ def process(func):
return decorate
def schedule_run_action(action_ex, action_def, target):
_get_queue().append((_RUN_ACTION, (action_ex, action_def, target)))
def schedule_run_action(action_ex, action_def, target, execution_context):
args = (action_ex, action_def, target, execution_context)
_get_queue().append((_RUN_ACTION, args))
def schedule_on_action_complete(action_ex_id, result, wf_action=False):

View File

@ -248,10 +248,13 @@ class PythonAction(Action):
action_ex_id=action_ex_id
)
execution_context = self._prepare_execution_context()
action_queue.schedule_run_action(
self.action_ex,
self.action_def,
target
target,
execution_context,
)
@profiler.trace('action-run', hide_args=True)
@ -280,12 +283,15 @@ class PythonAction(Action):
executor = exe.get_executor(cfg.CONF.executor.type)
execution_context = self._prepare_execution_context()
result = executor.run_action(
self.action_ex.id if self.action_ex else None,
self.action_def.action_class,
self.action_def.attributes or {},
input_dict,
safe_rerun=safe_rerun,
safe_rerun,
execution_context,
target=target,
async_=False
)
@ -317,6 +323,23 @@ class PythonAction(Action):
self.action_def.action_class
)
def _prepare_execution_context(self):
exc_ctx = {}
if self.task_ex:
wf_ex = self.task_ex.workflow_execution
exc_ctx['workflow_execution_id'] = wf_ex.id
exc_ctx['task_id'] = self.task_ex.id
exc_ctx['workflow_name'] = wf_ex.name
if self.action_ex:
exc_ctx['action_execution_id'] = self.action_ex.id
callback_url = '/v2/action_executions/%s' % self.action_ex.id
exc_ctx['callback_url'] = callback_url
return exc_ctx
def _prepare_input(self, input_dict):
"""Template method to do manipulations with input parameters.

View File

@ -83,7 +83,7 @@ class DefaultEngine(base.Engine):
if not save:
# Action execution is not created but we need to return similar
# object to a client anyway.
# object to the client anyway.
return db_models.ActionExecution(
name=action_name,
description=description,

View File

@ -49,7 +49,7 @@ class Executor(object):
@abc.abstractmethod
def run_action(self, action_ex_id, action_cls_str, action_cls_attrs,
params, safe_rerun, redelivered=False,
params, safe_rerun, execution_context, redelivered=False,
target=None, async_=True):
"""Runs action.
@ -59,6 +59,8 @@ class Executor(object):
will be set to.
:param params: Action parameters.
:param safe_rerun: Tells if given action can be safely rerun.
:param execution_context: A dict of values providing information about
the current execution.
:param redelivered: Tells if given action was run before on another
executor.
:param target: Target (group of action executors).

View File

@ -13,11 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral_lib import actions as mistral_lib
from oslo_log import log as logging
from osprofiler import profiler
from mistral_lib import actions as mistral_lib
from mistral.actions import action_factory as a_f
from mistral import context
from mistral import exceptions as exc
@ -35,7 +34,7 @@ class DefaultExecutor(base.Executor):
@profiler.trace('default-executor-run-action', hide_args=True)
def run_action(self, action_ex_id, action_cls_str, action_cls_attrs,
params, safe_rerun, redelivered=False,
params, safe_rerun, execution_context, redelivered=False,
target=None, async_=True):
"""Runs action.
@ -45,6 +44,8 @@ class DefaultExecutor(base.Executor):
will be set to.
:param params: Action parameters.
:param safe_rerun: Tells if given action can be safely rerun.
:param execution_context: A dict of values providing information about
the current execution.
:param redelivered: Tells if given action was run before on another
executor.
:param target: Target (group of action executors).
@ -103,10 +104,10 @@ class DefaultExecutor(base.Executor):
try:
# NOTE(d0ugal): If the action is a subclass of mistral-lib we know
# that it expects to be passed the context. We should deprecate
# the builtin action class in Mistral.
# that it expects to be passed the context.
if isinstance(action, mistral_lib.Action):
result = action.run(context.ctx())
action_ctx = context.create_action_context(execution_context)
result = action.run(action_ctx)
else:
result = action.run()

View File

@ -60,7 +60,7 @@ class ExecutorServer(service_base.MistralService):
self._rpc_server.stop(graceful)
def run_action(self, rpc_ctx, action_ex_id, action_cls_str,
action_cls_attrs, params, safe_rerun):
action_cls_attrs, params, safe_rerun, execution_context):
"""Receives calls over RPC to run action on executor.
:param rpc_ctx: RPC request context dictionary.
@ -89,6 +89,7 @@ class ExecutorServer(service_base.MistralService):
action_cls_attrs,
params,
safe_rerun,
execution_context,
redelivered
)

View File

@ -299,7 +299,7 @@ class ExecutorClient(exe.Executor):
@profiler.trace('executor-client-run-action')
def run_action(self, action_ex_id, action_cls_str, action_cls_attrs,
params, safe_rerun, redelivered=False,
params, safe_rerun, execution_context, redelivered=False,
target=None, async_=True):
"""Sends a request to run action to executor.
@ -322,7 +322,8 @@ class ExecutorClient(exe.Executor):
'action_cls_str': action_cls_str,
'action_cls_attrs': action_cls_attrs,
'params': params,
'safe_rerun': safe_rerun
'safe_rerun': safe_rerun,
'execution_context': execution_context,
}
rpc_client_method = (self._client.async_call

View File

@ -79,7 +79,8 @@ workflows:
def _run_at_target(action_ex_id, action_cls_str, action_cls_attrs,
params, safe_rerun, target=None, async_=True):
params, safe_rerun, execution_context, target=None,
async_=True):
# We'll just call executor directly for testing purposes.
executor = d_exe.DefaultExecutor()
@ -89,7 +90,8 @@ def _run_at_target(action_ex_id, action_cls_str, action_cls_attrs,
action_cls_str,
action_cls_attrs,
params,
safe_rerun
safe_rerun,
execution_context=execution_context
)
@ -171,12 +173,21 @@ class EnvironmentTest(base.EngineTestCase):
for t_ex in wf1_task_execs:
a_ex = t_ex.action_executions[0]
callback_url = '/v2/action_executions/%s' % a_ex.id
r_exe.RemoteExecutor.run_action.assert_any_call(
a_ex.id,
'mistral.actions.std_actions.EchoAction',
{},
a_ex.input,
False,
{
'task_id': t_ex.id,
'callback_url': callback_url,
'workflow_execution_id': wf1_ex.id,
'workflow_name': wf1_ex.name,
'action_execution_id': a_ex.id,
},
target=TARGET
)

View File

@ -25,7 +25,8 @@ from mistral.workflow import states
def _run_at_target(action_ex_id, action_class_str, attributes,
action_params, safe_rerun, target=None, async_=True):
action_params, safe_rerun, execution_context, target=None,
async_=True):
# We'll just call executor directly for testing purposes.
executor = d_exe.DefaultExecutor()
@ -35,6 +36,7 @@ def _run_at_target(action_ex_id, action_class_str, attributes,
attributes,
action_params,
safe_rerun,
execution_context,
redelivered=True
)