Merge "Support workflow id in execution operations"

This commit is contained in:
Jenkins 2016-02-10 08:44:59 +00:00 committed by Gerrit Code Review
commit 5e3666aadc
11 changed files with 56 additions and 17 deletions

View File

@ -45,6 +45,9 @@ class Execution(resource.Resource):
workflow_name = wtypes.text
"reference to workflow definition"
workflow_id = wtypes.text
"reference to workflow ID"
description = wtypes.text
"description of workflow execution."
@ -74,6 +77,7 @@ class Execution(resource.Resource):
def sample(cls):
return cls(id='123e4567-e89b-12d3-a456-426655440000',
workflow_name='flow',
workflow_id='123e4567-e89b-12d3-a456-426655441111',
description='this is the first execution.',
state='SUCCESS',
input={},
@ -220,8 +224,15 @@ class ExecutionsController(rest.RestController):
engine = rpc.get_engine_client()
exec_dict = wf_ex.to_dict()
if not (exec_dict.get('workflow_id')
or exec_dict.get('workflow_name')):
raise exc.WorkflowException(
"Workflow ID or workflow name must be provided. Workflow ID is"
" recommended."
)
result = engine.start_workflow(
exec_dict['workflow_name'],
exec_dict.get('workflow_id', exec_dict['workflow_name']),
exec_dict.get('input'),
exec_dict.get('description', ''),
**exec_dict.get('params') or {}

View File

@ -43,6 +43,7 @@ class Task(resource.Resource):
name = wtypes.text
workflow_name = wtypes.text
workflow_id = wtypes.text
workflow_execution_id = wtypes.text
state = wtypes.text
@ -69,6 +70,7 @@ class Task(resource.Resource):
return cls(
id='123e4567-e89b-12d3-a456-426655440000',
workflow_name='flow',
workflow_id='123e4567-e89b-12d3-a456-426655441111',
workflow_execution_id='123e4567-e89b-12d3-a456-426655440000',
name='task',
state=states.SUCCESS,

View File

@ -121,8 +121,8 @@ class Execution(mb.MistralSecureModelBase):
id = mb.id_column()
name = sa.Column(sa.String(80))
description = sa.Column(sa.String(255), nullable=True)
workflow_name = sa.Column(sa.String(80))
workflow_id = sa.Column(sa.String(80))
spec = sa.Column(st.JsonDictType())
state = sa.Column(sa.String(20))
state_info = sa.Column(sa.Text(), nullable=True)

View File

@ -62,6 +62,7 @@ def create_action_execution(action_def, action_input, task_ex=None,
values.update({
'task_execution_id': task_ex.id,
'workflow_name': task_ex.workflow_name,
'workflow_id': task_ex.workflow_id,
'project_id': task_ex.project_id,
})
else:

View File

@ -29,10 +29,12 @@ class Engine(object):
"""Engine interface."""
@abc.abstractmethod
def start_workflow(self, wf_name, wf_input, description='', **params):
def start_workflow(self, wf_identifier, wf_input, description='',
**params):
"""Starts the specified workflow.
:param wf_name: Workflow name.
:param wf_identifier: Workflow ID or name. Workflow ID is recommended,
workflow name will be deprecated since Mitaka.
:param wf_input: Workflow input data as a dictionary.
:param description: Execution description.
:param params: Additional workflow type specific parameters.

View File

@ -50,7 +50,8 @@ class DefaultEngine(base.Engine, coordination.Service):
coordination.Service.__init__(self, 'engine_group')
@u.log_exec(LOG)
def start_workflow(self, wf_name, wf_input, description='', **params):
def start_workflow(self, wf_identifier, wf_input, description='',
**params):
wf_ex_id = None
try:
@ -58,7 +59,7 @@ class DefaultEngine(base.Engine, coordination.Service):
# The new workflow execution will be in an IDLE
# state on initial record creation.
wf_ex_id = wf_ex_service.create_workflow_execution(
wf_name,
wf_identifier,
wf_input,
description,
params
@ -87,7 +88,7 @@ class DefaultEngine(base.Engine, coordination.Service):
except Exception as e:
LOG.error(
"Failed to start workflow '%s' id=%s: %s\n%s",
wf_name, wf_ex_id, e, traceback.format_exc()
wf_identifier, wf_ex_id, e, traceback.format_exc()
)
wf_ex = self._fail_workflow(wf_ex_id, e)

View File

@ -77,7 +77,7 @@ class EngineServer(object):
def __init__(self, engine):
self._engine = engine
def start_workflow(self, rpc_ctx, workflow_name, workflow_input,
def start_workflow(self, rpc_ctx, workflow_identifier, workflow_input,
description, params):
"""Receives calls over RPC to start workflows on engine.
@ -87,12 +87,14 @@ class EngineServer(object):
LOG.info(
"Received RPC request 'start_workflow'[rpc_ctx=%s,"
" workflow_name=%s, workflow_input=%s, description=%s, params=%s]"
% (rpc_ctx, workflow_name, workflow_input, description, params)
" workflow_identifier=%s, workflow_input=%s, description=%s, "
"params=%s]"
% (rpc_ctx, workflow_identifier, workflow_input, description,
params)
)
return self._engine.start_workflow(
workflow_name,
workflow_identifier,
workflow_input,
description,
**params
@ -283,7 +285,8 @@ class EngineClient(base.Engine):
)
@wrap_messaging_exception
def start_workflow(self, wf_name, wf_input, description='', **params):
def start_workflow(self, wf_identifier, wf_input, description='',
**params):
"""Starts workflow sending a request to engine over RPC.
:return: Workflow execution.
@ -291,7 +294,7 @@ class EngineClient(base.Engine):
return self._client.call(
auth_ctx.ctx(),
'start_workflow',
workflow_name=wf_name,
workflow_identifier=wf_identifier,
workflow_input=wf_input or {},
description=description,
params=params

View File

@ -214,6 +214,7 @@ def _create_task_execution(wf_ex, task_spec, ctx, state=states.RUNNING):
'name': task_spec.get_name(),
'workflow_execution_id': wf_ex.id,
'workflow_name': wf_ex.workflow_name,
'workflow_id': wf_ex.workflow_id,
'state': state,
'spec': task_spec.to_dict(),
'in_context': ctx,

View File

@ -50,6 +50,7 @@ def _create_workflow_execution(wf_def, wf_spec, wf_input, desc, params):
'name': wf_def.name,
'description': desc,
'workflow_name': wf_def.name,
'workflow_id': wf_def.id,
'spec': wf_spec.to_dict(),
'params': params or {},
'state': states.IDLE,
@ -70,10 +71,10 @@ def _create_workflow_execution(wf_def, wf_spec, wf_input, desc, params):
return wf_ex
def create_workflow_execution(wf_name, wf_input, description, params):
def create_workflow_execution(wf_identifier, wf_input, description, params):
params = canonize_workflow_params(params)
wf_def = db_api.get_workflow_definition(wf_name)
wf_def = db_api.get_workflow_definition(wf_identifier)
wf_spec = spec_parser.get_workflow_spec(wf_def.spec)
eng_utils.validate_input(wf_def, wf_input, wf_spec)
@ -86,6 +87,6 @@ def create_workflow_execution(wf_name, wf_input, description, params):
params
)
wf_trace.info(wf_ex, "Starting workflow: '%s'" % wf_name)
wf_trace.info(wf_ex, "Starting workflow: '%s'" % wf_identifier)
return wf_ex.id

View File

@ -33,6 +33,7 @@ from mistral.workflow import states
WF_EX = models.WorkflowExecution(
id='123e4567-e89b-12d3-a456-426655440000',
workflow_name='some',
workflow_id='123e4567-e89b-12d3-a456-426655441111',
description='execution description.',
spec={'name': 'some'},
state=states.RUNNING,
@ -54,11 +55,13 @@ WF_EX_JSON = {
'created_at': '1970-01-01 00:00:00',
'updated_at': '1970-01-01 00:00:00',
'workflow_name': 'some',
'workflow_id': '123e4567-e89b-12d3-a456-426655441111'
}
SUB_WF_EX = models.WorkflowExecution(
id=str(uuid.uuid4()),
workflow_name='some',
workflow_id='123e4567-e89b-12d3-a456-426655441111',
description='foobar',
spec={'name': 'some'},
state=states.RUNNING,
@ -74,6 +77,7 @@ SUB_WF_EX = models.WorkflowExecution(
SUB_WF_EX_JSON = {
'id': SUB_WF_EX.id,
'workflow_name': 'some',
'workflow_id': '123e4567-e89b-12d3-a456-426655441111',
'description': 'foobar',
'input': '{"foo": "bar"}',
'output': '{}',
@ -389,7 +393,7 @@ class TestExecutionsController(base.FunctionalTest):
exec_dict = WF_EX_JSON_WITH_DESC
f.assert_called_once_with(
exec_dict['workflow_name'],
exec_dict['workflow_id'],
json.loads(exec_dict['input']),
exec_dict['description'],
**json.loads(exec_dict['params'])
@ -406,6 +410,16 @@ class TestExecutionsController(base.FunctionalTest):
self.assertIn('Bad response: 400', context.args[0])
def test_post_without_workflow_id_and_name(self):
context = self.assertRaises(
webtest_app.AppError,
self.app.post_json,
'/v2/executions',
{'description': 'some description here.'}
)
self.assertIn('Bad response: 400', context.args[0])
@mock.patch.object(db_api, 'delete_workflow_execution', MOCK_DELETE)
def test_delete(self):
resp = self.app.delete('/v2/executions/123')

View File

@ -49,6 +49,7 @@ TASK_EX = models.TaskExecution(
id='123',
name='task',
workflow_name='flow',
workflow_id='123e4567-e89b-12d3-a456-426655441111',
spec={
'type': 'direct',
'version': '2.0',
@ -70,6 +71,7 @@ WITH_ITEMS_TASK_EX = models.TaskExecution(
id='123',
name='task',
workflow_name='flow',
workflow_id='123e4567-e89b-12d3-a456-426655441111',
spec={
'type': 'direct',
'version': '2.0',
@ -92,6 +94,7 @@ TASK = {
'id': '123',
'name': 'task',
'workflow_name': 'flow',
'workflow_id': '123e4567-e89b-12d3-a456-426655441111',
'state': 'RUNNING',
'workflow_execution_id': WF_EX.id,
'created_at': '1970-01-01 00:00:00',