diff --git a/mistral/api/controllers/v2/resources.py b/mistral/api/controllers/v2/resources.py index 171c7fbf6..d19e62a0f 100644 --- a/mistral/api/controllers/v2/resources.py +++ b/mistral/api/controllers/v2/resources.py @@ -404,6 +404,9 @@ class Task(resource.Resource): created_at = wtypes.text updated_at = wtypes.text + started_at = wtypes.text + finished_at = wtypes.text + # Add this param to make Mistral API work with WSME 0.8.0 or higher version reset = wsme.wsattr(bool, mandatory=True) diff --git a/mistral/db/sqlalchemy/migration/alembic_migrations/versions/031_add_started_at_and_finished_at_to_task_execution.py b/mistral/db/sqlalchemy/migration/alembic_migrations/versions/031_add_started_at_and_finished_at_to_task_execution.py new file mode 100644 index 000000000..e493919ab --- /dev/null +++ b/mistral/db/sqlalchemy/migration/alembic_migrations/versions/031_add_started_at_and_finished_at_to_task_execution.py @@ -0,0 +1,40 @@ +# Copyright 2018 OpenStack Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Add started_at and finished_at to task execution + +Revision ID: 031 +Revises: 030 +Create Date: 2018-10-03 20:09:45.582597 + +""" + +# revision identifiers, used by Alembic. +revision = '031' +down_revision = '030' + +from alembic import op +import sqlalchemy as sa + + +def upgrade(): + op.add_column( + 'task_executions_v2', + sa.Column('started_at', sa.DateTime(), nullable=True) + ) + op.add_column( + 'task_executions_v2', + sa.Column('finished_at', sa.DateTime(), nullable=True) + ) diff --git a/mistral/db/v2/sqlalchemy/models.py b/mistral/db/v2/sqlalchemy/models.py index e97fbfb0a..a8f483267 100644 --- a/mistral/db/v2/sqlalchemy/models.py +++ b/mistral/db/v2/sqlalchemy/models.py @@ -255,6 +255,8 @@ class TaskExecution(Execution): action_spec = sa.Column(st.JsonLongDictType()) unique_key = sa.Column(sa.String(255), nullable=True) type = sa.Column(sa.String(10)) + started_at = sa.Column(sa.DateTime, nullable=True) + finished_at = sa.Column(sa.DateTime, nullable=True) # Whether the task is fully processed (publishing and calculating commands # after it). It allows to simplify workflow controller implementations @@ -273,6 +275,14 @@ class TaskExecution(Execution): else self.workflow_executions ) + def to_dict(self): + d = super(TaskExecution, self).to_dict() + + utils.datetime_to_str_in_dict(d, 'started_at') + utils.datetime_to_str_in_dict(d, 'finished_at') + + return d + for cls in utils.iter_subclasses(Execution): event.listen( diff --git a/mistral/engine/task_handler.py b/mistral/engine/task_handler.py index b180f2088..d87208809 100644 --- a/mistral/engine/task_handler.py +++ b/mistral/engine/task_handler.py @@ -73,6 +73,7 @@ def run_task(wf_cmd): LOG.error(msg) task.set_state(states.ERROR, msg) + task.save_finished_time() wf_handler.force_fail_workflow(wf_ex, msg) @@ -125,6 +126,7 @@ def _on_action_complete(action_ex): LOG.error(msg) task.set_state(states.ERROR, msg) + task.save_finished_time() wf_handler.force_fail_workflow(wf_ex, msg) @@ -184,6 +186,7 @@ def _on_action_update(action_ex): LOG.error(msg) task.set_state(states.ERROR, msg) + task.save_finished_time() wf_handler.force_fail_workflow(wf_ex, msg) @@ -212,6 +215,7 @@ def force_fail_task(task_ex, msg): task = _build_task_from_execution(wf_spec, task_ex) task.set_state(states.ERROR, msg) + task.save_finished_time() wf_handler.force_fail_workflow(task_ex.workflow_execution, msg) @@ -238,6 +242,7 @@ def continue_task(task_ex): LOG.error(msg) task.set_state(states.ERROR, msg) + task.save_finished_time() wf_handler.force_fail_workflow(wf_ex, msg) @@ -266,6 +271,7 @@ def complete_task(task_ex, state, state_info): LOG.error(msg) task.set_state(states.ERROR, msg) + task.save_finished_time() wf_handler.force_fail_workflow(wf_ex, msg) @@ -341,7 +347,8 @@ def _build_task_from_command(cmd): task_ex=cmd.task_ex, unique_key=cmd.task_ex.unique_key, waiting=cmd.task_ex.state == states.WAITING, - triggered_by=cmd.triggered_by + triggered_by=cmd.triggered_by, + rerun=cmd.rerun ) if cmd.reset: @@ -366,7 +373,8 @@ def _build_task_from_command(cmd): def _create_task(wf_ex, wf_spec, task_spec, ctx, task_ex=None, - unique_key=None, waiting=False, triggered_by=None): + unique_key=None, waiting=False, triggered_by=None, + rerun=False): if task_spec.get_with_items(): cls = tasks.WithItemsTask else: @@ -380,7 +388,8 @@ def _create_task(wf_ex, wf_spec, task_spec, ctx, task_ex=None, task_ex=task_ex, unique_key=unique_key, waiting=waiting, - triggered_by=triggered_by + triggered_by=triggered_by, + rerun=rerun ) diff --git a/mistral/engine/tasks.py b/mistral/engine/tasks.py index 7f66b29b4..b1cc9f6b5 100644 --- a/mistral/engine/tasks.py +++ b/mistral/engine/tasks.py @@ -51,7 +51,8 @@ class Task(object): """ def __init__(self, wf_ex, wf_spec, task_spec, ctx, task_ex=None, - unique_key=None, waiting=False, triggered_by=None): + unique_key=None, waiting=False, triggered_by=None, + rerun=False): self.wf_ex = wf_ex self.task_spec = task_spec self.ctx = ctx @@ -60,6 +61,7 @@ class Task(object): self.unique_key = unique_key self.waiting = waiting self.triggered_by = triggered_by + self.rerun = rerun self.reset_flag = False self.created = False self.state_changed = False @@ -174,6 +176,12 @@ class Task(object): cur_state = self.task_ex.state + # Set initial started_at in case of waiting => running. + # We can't set this just in run_existing, because task retries + # will update started_at, which is incorrect. + if cur_state == states.WAITING and state == states.RUNNING: + self.save_started_time() + if cur_state != state or self.task_ex.state_info != state_info: task_ex = db_api.update_task_execution_state( id=self.task_ex.id, @@ -270,6 +278,8 @@ class Task(object): self.register_workflow_completion_check() + self.save_finished_time() + # Publish task event. self.notify(old_task_state, self.task_ex.state) @@ -398,6 +408,18 @@ class Task(object): return env.get('__actions', {}).get(action_name, {}) + def save_started_time(self, value='default'): + if not self.task_ex: + return + time = value if value is not 'default' else utils.utc_now_sec() + self.task_ex.started_at = time + + def save_finished_time(self, value='default'): + if not self.task_ex: + return + time = value if value is not 'default' else utils.utc_now_sec() + self.task_ex.finished_at = time + class RegularTask(Task): """Regular task. @@ -440,6 +462,7 @@ class RegularTask(Task): return self._create_task_execution() + self.save_started_time() # Publish event. self.notify(None, self.task_ex.state) @@ -481,6 +504,15 @@ class RegularTask(Task): # Publish event. self.notify(old_task_state, self.task_ex.state) + if self.rerun: + self.save_started_time() + self.save_finished_time(value=None) + self._before_task_start() + + # Policies could possibly change task state. + if self.task_ex.state != states.RUNNING: + return + self._update_inbound_context() self._update_triggered_by() self._reset_actions() diff --git a/mistral/tests/unit/api/v2/test_tasks.py b/mistral/tests/unit/api/v2/test_tasks.py index eb70a2be9..496900503 100644 --- a/mistral/tests/unit/api/v2/test_tasks.py +++ b/mistral/tests/unit/api/v2/test_tasks.py @@ -73,6 +73,8 @@ TASK_EX = models.TaskExecution( workflow_execution_id=WF_EX.id, created_at=datetime.datetime(1970, 1, 1), updated_at=datetime.datetime(1970, 1, 1), + started_at=datetime.datetime(1970, 1, 1), + finished_at=datetime.datetime(1970, 1, 1), published=PUBLISHED, processed=True ) @@ -96,6 +98,8 @@ WITH_ITEMS_TASK_EX = models.TaskExecution( workflow_execution_id=WF_EX.id, created_at=datetime.datetime(1970, 1, 1), updated_at=datetime.datetime(1970, 1, 1), + started_at=datetime.datetime(1970, 1, 1), + finished_at=datetime.datetime(1970, 1, 1), published=PUBLISHED, processed=True ) @@ -109,6 +113,8 @@ TASK = { 'workflow_execution_id': WF_EX.id, 'created_at': '1970-01-01 00:00:00', 'updated_at': '1970-01-01 00:00:00', + 'started_at': '1970-01-01 00:00:00', + 'finished_at': '1970-01-01 00:00:00', 'result': json.dumps(RESULT), 'published': json.dumps(PUBLISHED), 'runtime_context': json.dumps(RUNTIME_CONTEXT), diff --git a/mistral/tests/unit/engine/test_task_started_finished_at.py b/mistral/tests/unit/engine/test_task_started_finished_at.py new file mode 100644 index 000000000..ae4d7656d --- /dev/null +++ b/mistral/tests/unit/engine/test_task_started_finished_at.py @@ -0,0 +1,163 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from mistral.tests.unit.engine import base + +from mistral.db.v2 import api as db_api +from mistral.services import workflows as wf_service + +WF_FAIL_WITH_WAIT = """ +version: "2.0" + +wf_fail: + tasks: + task1: + action: std.fail + wait-before: 2 +""" + +WF_JOIN_ALL = """ +version: "2.0" + +wf_join: + tasks: + wait_2: + action: std.sleep seconds=2 + on-success: finish_wf + wait_4: + action: std.sleep seconds=4 + on-success: finish_wf + wait_6: + action: std.sleep seconds=6 + on-success: finish_wf + finish_wf: + join: all + action: std.sleep seconds=2 +""" + +WF_WITH_RETRIES = """ +version: "2.0" + +wf_retry: + tasks: + task1: + action: std.fail + retry: + delay: 1 + count: 5 +""" + +WF_WAIT_BEFORE_AFTER = """ +version: "2.0" + +wf_wait: + tasks: + task1: + action: std.noop + wait-before: 3 + wait-after: 3 +""" + + +class TaskStartedFinishedAtTest(base.EngineTestCase): + + def setUp(self): + super(TaskStartedFinishedAtTest, self).setUp() + + def test_started_finished_fields_updated_after_rerun(self): + wf_service.create_workflows(WF_FAIL_WITH_WAIT) + wf_ex = self.engine.start_workflow('wf_fail') + self.await_workflow_error(wf_ex.id) + + task_ex = self._get_task_from_wf(wf_ex.id) + started_1st, finished_1st = self._get_started_finished(task_ex) + + wf_ex = self.engine.rerun_workflow(task_ex.id) + + task_ex = self._get_task_from_wf(wf_ex.id) + self.assertIsNone(task_ex.finished_at) + self.await_workflow_error(wf_ex.id) + + task_ex = self._get_task_from_wf(wf_ex.id) + started_2nd, finished_2nd = self._get_started_finished(task_ex) + + self.assertNotEqual(started_1st, started_2nd) + self.assertNotEqual(finished_1st, finished_2nd) + + def test_correct_duration_in_case_of_join_all(self): + wf_service.create_workflows(WF_JOIN_ALL) + wf_ex = self.engine.start_workflow('wf_join') + self.await_workflow_success(wf_ex.id) + + wait_2 = self._get_task_from_wf(wf_ex.id, 'wait_2') + wait_4 = self._get_task_from_wf(wf_ex.id, 'wait_4') + wait_6 = self._get_task_from_wf(wf_ex.id, 'wait_6') + finish_wf = self._get_task_from_wf(wf_ex.id, 'finish_wf') + + self._check_was_started_after(finish_wf, wait_2) + self._check_was_started_after(finish_wf, wait_4) + self._check_was_started_after(finish_wf, wait_6) + + def test_retries_do_not_update_created_at(self): + wf_service.create_workflows(WF_WITH_RETRIES) + wf_ex = self.engine.start_workflow('wf_retry') + self.await_workflow_error(wf_ex.id) + + task_ex = self._get_task_from_wf(wf_ex.id) + created_at = task_ex.created_at + started_at = self._get_started_finished(task_ex)[0] + + self.assertEqual(created_at, started_at) + + def test_wait_before_after_are_included_to_duration(self): + wf_service.create_workflows(WF_WAIT_BEFORE_AFTER) + wf_ex = self.engine.start_workflow('wf_wait') + self.await_workflow_success(wf_ex.id) + + task_ex = self._get_task_from_wf(wf_ex.id) + started, finished = self._get_started_finished(task_ex) + duration = self._get_task_duration(started, finished) + + self._check_duration_more_than(duration, 1) + + def _get_task_from_wf(self, wf_ex_id, name='task1'): + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex_id) + task_execs = wf_ex.task_executions + return self._assert_single_item(task_execs, name=name) + + def _get_started_finished(self, task_ex): + started_at = task_ex.started_at + finished_at = task_ex.finished_at + self.assertIsNotNone(started_at) + self.assertIsNotNone(finished_at) + return started_at, finished_at + + def _get_task_duration(self, start_time, finish_time): + return (finish_time - start_time).total_seconds() + + def _check_was_started_after(self, task_started, task_finished): + first_finished = self._get_started_finished(task_finished)[1] + second_started = self._get_started_finished(task_started)[0] + delta = self._get_task_duration(first_finished, second_started) + + self.assertTrue( + delta >= 0, + "Expected {} was started after {} was finished".format( + task_started.name, task_finished.name) + ) + + def _check_duration_more_than(self, duration, time): + self.assertTrue( + time < duration, + "Expected duration {} was more than {}".format(duration, time) + ) diff --git a/mistral/workflow/base.py b/mistral/workflow/base.py index c8831039e..4e302f3b2 100644 --- a/mistral/workflow/base.py +++ b/mistral/workflow/base.py @@ -144,7 +144,8 @@ class WorkflowController(object): return [] cmds = [ - commands.RunExistingTask(self.wf_ex, self.wf_spec, t_e, reset) + commands.RunExistingTask(self.wf_ex, self.wf_spec, t_e, reset, + rerun=True) for t_e in task_execs ] diff --git a/mistral/workflow/commands.py b/mistral/workflow/commands.py index 84b4d3bc3..5182cd95a 100644 --- a/mistral/workflow/commands.py +++ b/mistral/workflow/commands.py @@ -101,7 +101,8 @@ class RunTask(WorkflowCommand): class RunExistingTask(WorkflowCommand): """Command to run an existing workflow task.""" - def __init__(self, wf_ex, wf_spec, task_ex, reset=True, triggered_by=None): + def __init__(self, wf_ex, wf_spec, task_ex, reset=True, triggered_by=None, + rerun=False): super(RunExistingTask, self).__init__( wf_ex, wf_spec, @@ -113,6 +114,7 @@ class RunExistingTask(WorkflowCommand): self.task_ex = task_ex self.reset = reset self.unique_key = task_ex.unique_key + self.rerun = rerun def to_dict(self): d = super(RunExistingTask, self).to_dict()