From 54a8e304069cf5be46b57e1119a20fbb030e8e6e Mon Sep 17 00:00:00 2001 From: Lingxian Kong Date: Wed, 3 Feb 2016 00:08:45 +1300 Subject: [PATCH] Support workflow id in execution operations Workflow UUID will be used in Mistral instead of workflow name, which is not unique in the system. So, we need to support using workflow UUID in execution operations. Partially implements: blueprint use-workflow-id-in-rest-api Change-Id: I1b83eb75aa89484235e2150ab4f111da4ea766b7 --- mistral/api/controllers/v2/execution.py | 13 ++++++++++++- mistral/api/controllers/v2/task.py | 2 ++ mistral/db/v2/sqlalchemy/models.py | 2 +- mistral/engine/action_handler.py | 1 + mistral/engine/base.py | 6 ++++-- mistral/engine/default_engine.py | 7 ++++--- mistral/engine/rpc.py | 15 +++++++++------ mistral/engine/task_handler.py | 1 + mistral/services/executions.py | 7 ++++--- mistral/tests/unit/api/v2/test_executions.py | 16 +++++++++++++++- mistral/tests/unit/api/v2/test_tasks.py | 3 +++ 11 files changed, 56 insertions(+), 17 deletions(-) diff --git a/mistral/api/controllers/v2/execution.py b/mistral/api/controllers/v2/execution.py index eb9f46b2b..83654b99c 100644 --- a/mistral/api/controllers/v2/execution.py +++ b/mistral/api/controllers/v2/execution.py @@ -45,6 +45,9 @@ class Execution(resource.Resource): workflow_name = wtypes.text "reference to workflow definition" + workflow_id = wtypes.text + "reference to workflow ID" + description = wtypes.text "description of workflow execution." @@ -74,6 +77,7 @@ class Execution(resource.Resource): def sample(cls): return cls(id='123e4567-e89b-12d3-a456-426655440000', workflow_name='flow', + workflow_id='123e4567-e89b-12d3-a456-426655441111', description='this is the first execution.', state='SUCCESS', input={}, @@ -220,8 +224,15 @@ class ExecutionsController(rest.RestController): engine = rpc.get_engine_client() exec_dict = wf_ex.to_dict() + if not (exec_dict.get('workflow_id') + or exec_dict.get('workflow_name')): + raise exc.WorkflowException( + "Workflow ID or workflow name must be provided. Workflow ID is" + " recommended." + ) + result = engine.start_workflow( - exec_dict['workflow_name'], + exec_dict.get('workflow_id', exec_dict['workflow_name']), exec_dict.get('input'), exec_dict.get('description', ''), **exec_dict.get('params') or {} diff --git a/mistral/api/controllers/v2/task.py b/mistral/api/controllers/v2/task.py index 3a63e5f02..827013409 100644 --- a/mistral/api/controllers/v2/task.py +++ b/mistral/api/controllers/v2/task.py @@ -43,6 +43,7 @@ class Task(resource.Resource): name = wtypes.text workflow_name = wtypes.text + workflow_id = wtypes.text workflow_execution_id = wtypes.text state = wtypes.text @@ -69,6 +70,7 @@ class Task(resource.Resource): return cls( id='123e4567-e89b-12d3-a456-426655440000', workflow_name='flow', + workflow_id='123e4567-e89b-12d3-a456-426655441111', workflow_execution_id='123e4567-e89b-12d3-a456-426655440000', name='task', state=states.SUCCESS, diff --git a/mistral/db/v2/sqlalchemy/models.py b/mistral/db/v2/sqlalchemy/models.py index ae66baac3..78a42e5cd 100644 --- a/mistral/db/v2/sqlalchemy/models.py +++ b/mistral/db/v2/sqlalchemy/models.py @@ -121,8 +121,8 @@ class Execution(mb.MistralSecureModelBase): id = mb.id_column() name = sa.Column(sa.String(80)) description = sa.Column(sa.String(255), nullable=True) - workflow_name = sa.Column(sa.String(80)) + workflow_id = sa.Column(sa.String(80)) spec = sa.Column(st.JsonDictType()) state = sa.Column(sa.String(20)) state_info = sa.Column(sa.Text(), nullable=True) diff --git a/mistral/engine/action_handler.py b/mistral/engine/action_handler.py index 4eb4fb1d6..5761564ba 100644 --- a/mistral/engine/action_handler.py +++ b/mistral/engine/action_handler.py @@ -62,6 +62,7 @@ def create_action_execution(action_def, action_input, task_ex=None, values.update({ 'task_execution_id': task_ex.id, 'workflow_name': task_ex.workflow_name, + 'workflow_id': task_ex.workflow_id, 'project_id': task_ex.project_id, }) else: diff --git a/mistral/engine/base.py b/mistral/engine/base.py index dc4288d33..d3d22195c 100644 --- a/mistral/engine/base.py +++ b/mistral/engine/base.py @@ -29,10 +29,12 @@ class Engine(object): """Engine interface.""" @abc.abstractmethod - def start_workflow(self, wf_name, wf_input, description='', **params): + def start_workflow(self, wf_identifier, wf_input, description='', + **params): """Starts the specified workflow. - :param wf_name: Workflow name. + :param wf_identifier: Workflow ID or name. Workflow ID is recommended, + workflow name will be deprecated since Mitaka. :param wf_input: Workflow input data as a dictionary. :param description: Execution description. :param params: Additional workflow type specific parameters. diff --git a/mistral/engine/default_engine.py b/mistral/engine/default_engine.py index 8f7cb3444..c388d3797 100644 --- a/mistral/engine/default_engine.py +++ b/mistral/engine/default_engine.py @@ -50,7 +50,8 @@ class DefaultEngine(base.Engine, coordination.Service): coordination.Service.__init__(self, 'engine_group') @u.log_exec(LOG) - def start_workflow(self, wf_name, wf_input, description='', **params): + def start_workflow(self, wf_identifier, wf_input, description='', + **params): wf_ex_id = None try: @@ -58,7 +59,7 @@ class DefaultEngine(base.Engine, coordination.Service): # The new workflow execution will be in an IDLE # state on initial record creation. wf_ex_id = wf_ex_service.create_workflow_execution( - wf_name, + wf_identifier, wf_input, description, params @@ -87,7 +88,7 @@ class DefaultEngine(base.Engine, coordination.Service): except Exception as e: LOG.error( "Failed to start workflow '%s' id=%s: %s\n%s", - wf_name, wf_ex_id, e, traceback.format_exc() + wf_identifier, wf_ex_id, e, traceback.format_exc() ) wf_ex = self._fail_workflow(wf_ex_id, e) diff --git a/mistral/engine/rpc.py b/mistral/engine/rpc.py index 0ce69f76d..103ed1cfe 100644 --- a/mistral/engine/rpc.py +++ b/mistral/engine/rpc.py @@ -77,7 +77,7 @@ class EngineServer(object): def __init__(self, engine): self._engine = engine - def start_workflow(self, rpc_ctx, workflow_name, workflow_input, + def start_workflow(self, rpc_ctx, workflow_identifier, workflow_input, description, params): """Receives calls over RPC to start workflows on engine. @@ -87,12 +87,14 @@ class EngineServer(object): LOG.info( "Received RPC request 'start_workflow'[rpc_ctx=%s," - " workflow_name=%s, workflow_input=%s, description=%s, params=%s]" - % (rpc_ctx, workflow_name, workflow_input, description, params) + " workflow_identifier=%s, workflow_input=%s, description=%s, " + "params=%s]" + % (rpc_ctx, workflow_identifier, workflow_input, description, + params) ) return self._engine.start_workflow( - workflow_name, + workflow_identifier, workflow_input, description, **params @@ -283,7 +285,8 @@ class EngineClient(base.Engine): ) @wrap_messaging_exception - def start_workflow(self, wf_name, wf_input, description='', **params): + def start_workflow(self, wf_identifier, wf_input, description='', + **params): """Starts workflow sending a request to engine over RPC. :return: Workflow execution. @@ -291,7 +294,7 @@ class EngineClient(base.Engine): return self._client.call( auth_ctx.ctx(), 'start_workflow', - workflow_name=wf_name, + workflow_identifier=wf_identifier, workflow_input=wf_input or {}, description=description, params=params diff --git a/mistral/engine/task_handler.py b/mistral/engine/task_handler.py index 854d7bd44..56f869863 100644 --- a/mistral/engine/task_handler.py +++ b/mistral/engine/task_handler.py @@ -208,6 +208,7 @@ def _create_task_execution(wf_ex, task_spec, ctx, state=states.RUNNING): 'name': task_spec.get_name(), 'workflow_execution_id': wf_ex.id, 'workflow_name': wf_ex.workflow_name, + 'workflow_id': wf_ex.workflow_id, 'state': state, 'spec': task_spec.to_dict(), 'in_context': ctx, diff --git a/mistral/services/executions.py b/mistral/services/executions.py index 9a15863f6..9b8e26d92 100644 --- a/mistral/services/executions.py +++ b/mistral/services/executions.py @@ -50,6 +50,7 @@ def _create_workflow_execution(wf_def, wf_spec, wf_input, desc, params): 'name': wf_def.name, 'description': desc, 'workflow_name': wf_def.name, + 'workflow_id': wf_def.id, 'spec': wf_spec.to_dict(), 'params': params or {}, 'state': states.IDLE, @@ -70,10 +71,10 @@ def _create_workflow_execution(wf_def, wf_spec, wf_input, desc, params): return wf_ex -def create_workflow_execution(wf_name, wf_input, description, params): +def create_workflow_execution(wf_identifier, wf_input, description, params): params = canonize_workflow_params(params) - wf_def = db_api.get_workflow_definition(wf_name) + wf_def = db_api.get_workflow_definition(wf_identifier) wf_spec = spec_parser.get_workflow_spec(wf_def.spec) eng_utils.validate_input(wf_def, wf_input, wf_spec) @@ -86,6 +87,6 @@ def create_workflow_execution(wf_name, wf_input, description, params): params ) - wf_trace.info(wf_ex, "Starting workflow: '%s'" % wf_name) + wf_trace.info(wf_ex, "Starting workflow: '%s'" % wf_identifier) return wf_ex.id diff --git a/mistral/tests/unit/api/v2/test_executions.py b/mistral/tests/unit/api/v2/test_executions.py index 4e306a67d..53c4338ed 100644 --- a/mistral/tests/unit/api/v2/test_executions.py +++ b/mistral/tests/unit/api/v2/test_executions.py @@ -33,6 +33,7 @@ from mistral.workflow import states WF_EX = models.WorkflowExecution( id='123e4567-e89b-12d3-a456-426655440000', workflow_name='some', + workflow_id='123e4567-e89b-12d3-a456-426655441111', description='execution description.', spec={'name': 'some'}, state=states.RUNNING, @@ -54,11 +55,13 @@ WF_EX_JSON = { 'created_at': '1970-01-01 00:00:00', 'updated_at': '1970-01-01 00:00:00', 'workflow_name': 'some', + 'workflow_id': '123e4567-e89b-12d3-a456-426655441111' } SUB_WF_EX = models.WorkflowExecution( id=str(uuid.uuid4()), workflow_name='some', + workflow_id='123e4567-e89b-12d3-a456-426655441111', description='foobar', spec={'name': 'some'}, state=states.RUNNING, @@ -74,6 +77,7 @@ SUB_WF_EX = models.WorkflowExecution( SUB_WF_EX_JSON = { 'id': SUB_WF_EX.id, 'workflow_name': 'some', + 'workflow_id': '123e4567-e89b-12d3-a456-426655441111', 'description': 'foobar', 'input': '{"foo": "bar"}', 'output': '{}', @@ -389,7 +393,7 @@ class TestExecutionsController(base.FunctionalTest): exec_dict = WF_EX_JSON_WITH_DESC f.assert_called_once_with( - exec_dict['workflow_name'], + exec_dict['workflow_id'], json.loads(exec_dict['input']), exec_dict['description'], **json.loads(exec_dict['params']) @@ -406,6 +410,16 @@ class TestExecutionsController(base.FunctionalTest): self.assertIn('Bad response: 400', context.args[0]) + def test_post_without_workflow_id_and_name(self): + context = self.assertRaises( + webtest_app.AppError, + self.app.post_json, + '/v2/executions', + {'description': 'some description here.'} + ) + + self.assertIn('Bad response: 400', context.args[0]) + @mock.patch.object(db_api, 'delete_workflow_execution', MOCK_DELETE) def test_delete(self): resp = self.app.delete('/v2/executions/123') diff --git a/mistral/tests/unit/api/v2/test_tasks.py b/mistral/tests/unit/api/v2/test_tasks.py index fea7c326e..f54e9e9df 100644 --- a/mistral/tests/unit/api/v2/test_tasks.py +++ b/mistral/tests/unit/api/v2/test_tasks.py @@ -49,6 +49,7 @@ TASK_EX = models.TaskExecution( id='123', name='task', workflow_name='flow', + workflow_id='123e4567-e89b-12d3-a456-426655441111', spec={ 'type': 'direct', 'version': '2.0', @@ -70,6 +71,7 @@ WITH_ITEMS_TASK_EX = models.TaskExecution( id='123', name='task', workflow_name='flow', + workflow_id='123e4567-e89b-12d3-a456-426655441111', spec={ 'type': 'direct', 'version': '2.0', @@ -92,6 +94,7 @@ TASK = { 'id': '123', 'name': 'task', 'workflow_name': 'flow', + 'workflow_id': '123e4567-e89b-12d3-a456-426655441111', 'state': 'RUNNING', 'workflow_execution_id': WF_EX.id, 'created_at': '1970-01-01 00:00:00',