diff --git a/mistral/api/controllers/v2/execution.py b/mistral/api/controllers/v2/execution.py index 784a849f1..d69b8c0ba 100644 --- a/mistral/api/controllers/v2/execution.py +++ b/mistral/api/controllers/v2/execution.py @@ -51,17 +51,26 @@ STATE_TYPES = wtypes.Enum( ) -def _load_deferred_output_field(ex): - if ex: - # We need to refer to this lazy-load field explicitly in - # order to make sure that it is correctly loaded. - hasattr(ex, 'output') +def _load_deferred_fields(ex, fields): + if not ex: + return ex + + # We need to refer lazy-loaded fields explicitly in + # order to make sure that they are correctly loaded. + for f in fields: + hasattr(ex, f) return ex +def _get_workflow_execution_resource_with_output(wf_ex): + _load_deferred_fields(wf_ex, ['params', 'output']) + + return resources.Execution.from_db_model(wf_ex) + + def _get_workflow_execution_resource(wf_ex): - _load_deferred_output_field(wf_ex) + _load_deferred_fields(wf_ex, ['params']) return resources.Execution.from_db_model(wf_ex) @@ -75,7 +84,7 @@ def _get_workflow_execution(id, must_exist=True): else: wf_ex = db_api.load_workflow_execution(id) - return _load_deferred_output_field(wf_ex) + return _load_deferred_fields(wf_ex, ['params', 'output']) # TODO(rakhmerov): Make sure to make all needed renaming on public API. @@ -398,9 +407,9 @@ class ExecutionsController(rest.RestController): ) if include_output: - resource_function = _get_workflow_execution_resource + resource_function = _get_workflow_execution_resource_with_output else: - resource_function = None + resource_function = _get_workflow_execution_resource return rest_utils.get_all( resources.Executions, diff --git a/mistral/db/v2/sqlalchemy/models.py b/mistral/db/v2/sqlalchemy/models.py index 0b51d5889..ec0db2b9a 100644 --- a/mistral/db/v2/sqlalchemy/models.py +++ b/mistral/db/v2/sqlalchemy/models.py @@ -177,7 +177,6 @@ class Execution(mb.MistralSecureModelBase): workflow_name = sa.Column(sa.String(255)) workflow_namespace = sa.Column(sa.String(255)) workflow_id = sa.Column(sa.String(80)) - spec = sa.Column(st.JsonMediumDictType()) state = sa.Column(sa.String(20)) state_info = sa.Column(sa.Text(), nullable=True) tags = sa.Column(st.JsonListType()) @@ -199,6 +198,7 @@ class ActionExecution(Execution): ) # Main properties. + spec = sa.Column(st.JsonMediumDictType()) accepted = sa.Column(sa.Boolean(), default=False) input = sa.Column(st.JsonLongDictType(), nullable=True) output = sa.orm.deferred(sa.Column(st.JsonLongDictType(), nullable=True)) @@ -224,10 +224,11 @@ class WorkflowExecution(Execution): ) # Main properties. + spec = sa.orm.deferred(sa.Column(st.JsonMediumDictType())) accepted = sa.Column(sa.Boolean(), default=False) - input = sa.Column(st.JsonLongDictType(), nullable=True) + input = sa.orm.deferred(sa.Column(st.JsonLongDictType(), nullable=True)) output = sa.orm.deferred(sa.Column(st.JsonLongDictType(), nullable=True)) - params = sa.Column(st.JsonLongDictType()) + params = sa.orm.deferred(sa.Column(st.JsonLongDictType())) # Initial workflow context containing workflow variables, environment, # openstack security context etc. @@ -235,7 +236,7 @@ class WorkflowExecution(Execution): # * Data stored in this structure should not be copied into inbound # contexts of tasks. No need to duplicate it. # * This structure does not contain workflow input. - context = sa.Column(st.JsonLongDictType()) + context = sa.orm.deferred(sa.Column(st.JsonLongDictType())) class TaskExecution(Execution): @@ -252,6 +253,7 @@ class TaskExecution(Execution): ) # Main properties. + spec = sa.orm.deferred(sa.Column(st.JsonMediumDictType())) action_spec = sa.Column(st.JsonLongDictType()) unique_key = sa.Column(sa.String(255), nullable=True) type = sa.Column(sa.String(10)) diff --git a/mistral/tests/unit/api/v2/test_executions.py b/mistral/tests/unit/api/v2/test_executions.py index c696d1152..431dc5213 100644 --- a/mistral/tests/unit/api/v2/test_executions.py +++ b/mistral/tests/unit/api/v2/test_executions.py @@ -898,7 +898,7 @@ class TestExecutionsController(base.APITest): resource_function = kwargs['resource_function'] self.assertEqual( - execution._get_workflow_execution_resource, + execution._get_workflow_execution_resource_with_output, resource_function ) @@ -912,7 +912,10 @@ class TestExecutionsController(base.APITest): args, kwargs = mock_get_all.call_args resource_function = kwargs['resource_function'] - self.assertIsNone(resource_function) + self.assertEqual( + execution._get_workflow_execution_resource, + resource_function + ) @mock.patch('mistral.db.v2.api.get_workflow_executions') @mock.patch('mistral.context.MistralContext.from_environ') diff --git a/mistral/tests/unit/db/v2/test_sqlalchemy_db_api.py b/mistral/tests/unit/db/v2/test_sqlalchemy_db_api.py index c8b91decb..50e56f8d8 100644 --- a/mistral/tests/unit/db/v2/test_sqlalchemy_db_api.py +++ b/mistral/tests/unit/db/v2/test_sqlalchemy_db_api.py @@ -2112,17 +2112,18 @@ class TaskExecutionTest(SQLAlchemyTest): self.assertIsNone(created.updated_at) - updated = db_api.update_task_execution( - created.id, - {'workflow_name': 'new_wf'} - ) + with db_api.transaction(): + updated = db_api.update_task_execution( + created.id, + {'workflow_name': 'new_wf'} + ) - self.assertEqual('new_wf', updated.workflow_name) + self.assertEqual('new_wf', updated.workflow_name) - fetched = db_api.get_task_execution(created.id) + fetched = db_api.get_task_execution(created.id) - self.assertEqual(updated, fetched) - self.assertIsNotNone(fetched.updated_at) + self.assertEqual(updated, fetched) + self.assertIsNotNone(fetched.updated_at) def test_create_or_update_task_execution(self): id = 'not-existing-id' @@ -2139,20 +2140,21 @@ class TaskExecutionTest(SQLAlchemyTest): self.assertIsNotNone(created) self.assertIsNotNone(created.id) - updated = db_api.create_or_update_task_execution( - created.id, - {'state': 'RUNNING'} - ) + with db_api.transaction(): + updated = db_api.create_or_update_task_execution( + created.id, + {'state': 'RUNNING'} + ) - self.assertEqual('RUNNING', updated.state) - self.assertEqual( - 'RUNNING', - db_api.load_task_execution(updated.id).state - ) + self.assertEqual('RUNNING', updated.state) + self.assertEqual( + 'RUNNING', + db_api.load_task_execution(updated.id).state + ) - fetched = db_api.get_task_execution(created.id) + fetched = db_api.get_task_execution(created.id) - self.assertEqual(updated, fetched) + self.assertEqual(updated, fetched) def test_get_task_executions(self): wf_ex = db_api.create_workflow_execution(WF_EXECS[0]) @@ -2183,10 +2185,12 @@ class TaskExecutionTest(SQLAlchemyTest): created.name, 'eq' ) - fetched = db_api.get_task_executions(**_filter) - self.assertEqual(1, len(fetched)) - self.assertEqual(created, fetched[0]) + with db_api.transaction(): + fetched = db_api.get_task_executions(**_filter) + + self.assertEqual(1, len(fetched)) + self.assertEqual(created, fetched[0]) def test_filter_task_execution_by_not_equal_value(self): created0, created1 = self._create_task_executions() @@ -2197,10 +2201,11 @@ class TaskExecutionTest(SQLAlchemyTest): 'neq' ) - fetched = db_api.get_task_executions(**_filter) + with db_api.transaction(): + fetched = db_api.get_task_executions(**_filter) - self.assertEqual(1, len(fetched)) - self.assertEqual(created1, fetched[0]) + self.assertEqual(1, len(fetched)) + self.assertEqual(created1, fetched[0]) def test_filter_task_execution_by_greater_than_value(self): created0, created1 = self._create_task_executions() @@ -2210,10 +2215,12 @@ class TaskExecutionTest(SQLAlchemyTest): created0['created_at'], 'gt' ) - fetched = db_api.get_task_executions(**_filter) - self.assertEqual(1, len(fetched)) - self.assertEqual(created1, fetched[0]) + with db_api.transaction(): + fetched = db_api.get_task_executions(**_filter) + + self.assertEqual(1, len(fetched)) + self.assertEqual(created1, fetched[0]) def test_filter_task_execution_by_greater_than_equal_value(self): created0, created1 = self._create_task_executions() @@ -2223,6 +2230,7 @@ class TaskExecutionTest(SQLAlchemyTest): created0['created_at'], 'gte' ) + fetched = db_api.get_task_executions(**_filter) self.assertEqual(2, len(fetched)) @@ -2237,10 +2245,12 @@ class TaskExecutionTest(SQLAlchemyTest): created1['created_at'], 'lt' ) - fetched = db_api.get_task_executions(**_filter) - self.assertEqual(1, len(fetched)) - self.assertEqual(created0, fetched[0]) + with db_api.transaction(): + fetched = db_api.get_task_executions(**_filter) + + self.assertEqual(1, len(fetched)) + self.assertEqual(created0, fetched[0]) def test_filter_task_execution_by_less_than_equal_value(self): created0, created1 = self._create_task_executions() @@ -2250,6 +2260,7 @@ class TaskExecutionTest(SQLAlchemyTest): created1['created_at'], 'lte' ) + fetched = db_api.get_task_executions(**_filter) self.assertEqual(2, len(fetched)) @@ -2264,12 +2275,14 @@ class TaskExecutionTest(SQLAlchemyTest): [created['created_at']], 'in' ) - fetched = db_api.get_task_executions(**_filter) - self.assertEqual(1, len(fetched)) - self.assertEqual(created, fetched[0]) + with db_api.transaction(): + fetched = db_api.get_task_executions(**_filter) - def test_filter_task_execution_by_values_notin_list(self): + self.assertEqual(1, len(fetched)) + self.assertEqual(created, fetched[0]) + + def test_filter_task_execution_by_values_not_in_list(self): created0, created1 = self._create_task_executions() _filter = filter_utils.create_or_update_filter( @@ -2277,10 +2290,12 @@ class TaskExecutionTest(SQLAlchemyTest): [created0['created_at']], 'nin' ) - fetched = db_api.get_task_executions(**_filter) - self.assertEqual(1, len(fetched)) - self.assertEqual(created1, fetched[0]) + with db_api.transaction(): + fetched = db_api.get_task_executions(**_filter) + + self.assertEqual(1, len(fetched)) + self.assertEqual(created1, fetched[0]) def test_filter_task_execution_by_multiple_columns(self): created0, created1 = self._create_task_executions() @@ -2296,10 +2311,12 @@ class TaskExecutionTest(SQLAlchemyTest): 'eq', _filter ) - fetched = db_api.get_task_executions(**_filter) - self.assertEqual(1, len(fetched)) - self.assertEqual(created1, fetched[0]) + with db_api.transaction(): + fetched = db_api.get_task_executions(**_filter) + + self.assertEqual(1, len(fetched)) + self.assertEqual(created1, fetched[0]) def test_delete_task_execution(self): wf_ex = db_api.create_workflow_execution(WF_EXECS[0]) @@ -2307,13 +2324,14 @@ class TaskExecutionTest(SQLAlchemyTest): values = copy.deepcopy(TASK_EXECS[0]) values.update({'workflow_execution_id': wf_ex.id}) - created = db_api.create_task_execution(values) + with db_api.transaction(): + created = db_api.create_task_execution(values) - fetched = db_api.get_task_execution(created.id) + fetched = db_api.get_task_execution(created.id) - self.assertEqual(created, fetched) + self.assertEqual(created, fetched) - db_api.delete_task_execution(created.id) + db_api.delete_task_execution(created.id) self.assertRaises( exc.DBEntityNotFoundError, @@ -2328,43 +2346,44 @@ class TaskExecutionTest(SQLAlchemyTest): values.update({'workflow_execution_id': wf_ex.id}) values['state'] = 'RUNNING' - task_ex1 = db_api.create_task_execution(values) + with db_api.transaction(): + task_ex1 = db_api.create_task_execution(values) - task_execs = db_api.get_incomplete_task_executions( - workflow_execution_id=wf_ex.id - ) - - self.assertEqual(1, len(task_execs)) - self.assertEqual(task_ex1, task_execs[0]) - self.assertEqual( - 1, - db_api.get_incomplete_task_executions_count( + task_execs = db_api.get_incomplete_task_executions( workflow_execution_id=wf_ex.id ) - ) - # Add one more task. + self.assertEqual(1, len(task_execs)) + self.assertEqual(task_ex1, task_execs[0]) + self.assertEqual( + 1, + db_api.get_incomplete_task_executions_count( + workflow_execution_id=wf_ex.id + ) + ) - values = copy.deepcopy(TASK_EXECS[1]) - values.update({'workflow_execution_id': wf_ex.id}) - values['state'] = 'SUCCESS' + # Add one more task. - db_api.create_task_execution(values) + values = copy.deepcopy(TASK_EXECS[1]) + values.update({'workflow_execution_id': wf_ex.id}) + values['state'] = 'SUCCESS' - # It should be still one incompleted task. + db_api.create_task_execution(values) - task_execs = db_api.get_incomplete_task_executions( - workflow_execution_id=wf_ex.id - ) + # It should be still one incompleted task. - self.assertEqual(1, len(task_execs)) - self.assertEqual(task_ex1, task_execs[0]) - self.assertEqual( - 1, - db_api.get_incomplete_task_executions_count( + task_execs = db_api.get_incomplete_task_executions( workflow_execution_id=wf_ex.id ) - ) + + self.assertEqual(1, len(task_execs)) + self.assertEqual(task_ex1, task_execs[0]) + self.assertEqual( + 1, + db_api.get_incomplete_task_executions_count( + workflow_execution_id=wf_ex.id + ) + ) def test_task_execution_repr(self): wf_ex = db_api.create_workflow_execution(WF_EXECS[0]) diff --git a/mistral/tests/unit/engine/test_default_engine.py b/mistral/tests/unit/engine/test_default_engine.py index 59d5cdda6..c688e7a81 100644 --- a/mistral/tests/unit/engine/test_default_engine.py +++ b/mistral/tests/unit/engine/test_default_engine.py @@ -127,15 +127,15 @@ class DefaultEngineTest(base.DbTestCase): task_execs = wf_ex.task_executions - self.assertEqual(1, len(task_execs)) + self.assertEqual(1, len(task_execs)) - task_ex = task_execs[0] + task_ex = task_execs[0] - self.assertEqual('wb.wf', task_ex.workflow_name) - self.assertEqual('task1', task_ex.name) - self.assertEqual(states.RUNNING, task_ex.state) - self.assertIsNotNone(task_ex.spec) - self.assertDictEqual({}, task_ex.runtime_context) + self.assertEqual('wb.wf', task_ex.workflow_name) + self.assertEqual('task1', task_ex.name) + self.assertEqual(states.RUNNING, task_ex.state) + self.assertIsNotNone(task_ex.spec) + self.assertDictEqual({}, task_ex.runtime_context) # Data Flow properties. action_execs = db_api.get_action_executions( @@ -196,15 +196,15 @@ class DefaultEngineTest(base.DbTestCase): task_execs = wf_ex.task_executions - self.assertEqual(1, len(task_execs)) + self.assertEqual(1, len(task_execs)) - task_ex = task_execs[0] + task_ex = task_execs[0] - self.assertEqual('wb.wf', task_ex.workflow_name) - self.assertEqual('task1', task_ex.name) - self.assertEqual(states.RUNNING, task_ex.state) - self.assertIsNotNone(task_ex.spec) - self.assertDictEqual({}, task_ex.runtime_context) + self.assertEqual('wb.wf', task_ex.workflow_name) + self.assertEqual('task1', task_ex.name) + self.assertEqual(states.RUNNING, task_ex.state) + self.assertIsNotNone(task_ex.spec) + self.assertDictEqual({}, task_ex.runtime_context) # Data Flow properties. action_execs = db_api.get_action_executions( @@ -234,9 +234,10 @@ class DefaultEngineTest(base.DbTestCase): self.assertIsNotNone(wf_ex) - wf_ex = db_api.get_workflow_execution(wf_ex.id) + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) - self.assertDictEqual(wf_ex.params.get('env', {}), env) + self.assertDictEqual(wf_ex.params.get('env', {}), env) @mock.patch.object(db_api, "load_environment", MOCK_ENVIRONMENT) def test_start_workflow_with_saved_env(self): @@ -256,9 +257,10 @@ class DefaultEngineTest(base.DbTestCase): self.assertIsNotNone(wf_ex) - wf_ex = db_api.get_workflow_execution(wf_ex.id) + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) - self.assertDictEqual(wf_ex.params.get('env', {}), env) + self.assertDictEqual(wf_ex.params.get('env', {}), env) @mock.patch.object(db_api, "get_environment", MOCK_NOT_FOUND) def test_start_workflow_env_not_found(self): @@ -463,15 +465,15 @@ class DefaultEngineTest(base.DbTestCase): task_execs = wf_ex.task_executions - self.assertEqual(1, len(task_execs)) + self.assertEqual(1, len(task_execs)) - task1_ex = task_execs[0] + task1_ex = task_execs[0] - self.assertEqual('task1', task1_ex.name) - self.assertEqual(states.RUNNING, task1_ex.state) - self.assertIsNotNone(task1_ex.spec) - self.assertDictEqual({}, task1_ex.runtime_context) - self.assertNotIn('__execution', task1_ex.in_context) + self.assertEqual('task1', task1_ex.name) + self.assertEqual(states.RUNNING, task1_ex.state) + self.assertIsNotNone(task1_ex.spec) + self.assertDictEqual({}, task1_ex.runtime_context) + self.assertNotIn('__execution', task1_ex.in_context) action_execs = db_api.get_action_executions( task_execution_id=task1_ex.id diff --git a/mistral/tests/unit/engine/test_direct_workflow_rerun.py b/mistral/tests/unit/engine/test_direct_workflow_rerun.py index 27a5016cd..5c67ec89b 100644 --- a/mistral/tests/unit/engine/test_direct_workflow_rerun.py +++ b/mistral/tests/unit/engine/test_direct_workflow_rerun.py @@ -335,10 +335,10 @@ class DirectWorkflowRerunTest(base.EngineTestCase): wf_ex = db_api.get_workflow_execution(wf_ex.id) task_execs = wf_ex.task_executions - self.assertEqual(states.ERROR, wf_ex.state) - self.assertIsNotNone(wf_ex.state_info) - self.assertEqual(3, len(task_execs)) - self.assertDictEqual(env, wf_ex.params['env']) + self.assertEqual(states.ERROR, wf_ex.state) + self.assertIsNotNone(wf_ex.state_info) + self.assertEqual(3, len(task_execs)) + self.assertDictEqual(env, wf_ex.params['env']) task_10_ex = self._assert_single_item(task_execs, name='t10') task_21_ex = self._assert_single_item(task_execs, name='t21') diff --git a/mistral/tests/unit/engine/test_environment.py b/mistral/tests/unit/engine/test_environment.py index 128dffbb3..e384172ba 100644 --- a/mistral/tests/unit/engine/test_environment.py +++ b/mistral/tests/unit/engine/test_environment.py @@ -116,22 +116,23 @@ class EnvironmentTest(base.EngineTestCase): self._await(lambda: len(db_api.get_workflow_executions()) == 2, 0.5, 5) - wf_execs = db_api.get_workflow_executions() + with db_api.transaction(): + wf_execs = db_api.get_workflow_executions() - self.assertEqual(2, len(wf_execs)) + self.assertEqual(2, len(wf_execs)) - # Execution of 'wf1'. + # Execution of 'wf1'. - wf2_ex = self._assert_single_item(wf_execs, name='my_wb.wf2') - wf1_ex = self._assert_single_item(wf_execs, name='my_wb.wf1') + wf2_ex = self._assert_single_item(wf_execs, name='my_wb.wf2') + wf1_ex = self._assert_single_item(wf_execs, name='my_wb.wf1') - expected_wf1_input = { - 'param1': 'Bonnie', - 'param2': 'Clyde' - } + expected_wf1_input = { + 'param1': 'Bonnie', + 'param2': 'Clyde' + } - self.assertIsNotNone(wf1_ex.task_execution_id) - self.assertDictEqual(wf1_ex.input, expected_wf1_input) + self.assertIsNotNone(wf1_ex.task_execution_id) + self.assertDictEqual(wf1_ex.input, expected_wf1_input) # Wait till workflow 'wf1' is completed. self.await_workflow_success(wf1_ex.id) @@ -389,9 +390,9 @@ class EnvironmentTest(base.EngineTestCase): name='task1' ) - self.assertDictEqual({'result': 'val1'}, t.published) + self.assertDictEqual({'result': 'val1'}, t.published) - self.assertNotIn('__env', wf_ex.context) + self.assertNotIn('__env', wf_ex.context) def test_subworkflow_env_no_duplicate(self): wf_text = """--- @@ -444,8 +445,8 @@ class EnvironmentTest(base.EngineTestCase): sub_wf_ex.output ) - # The environment of the subworkflow must be empty. - # To evaluate expressions it should be taken from the - # parent workflow execution. - self.assertDictEqual({}, sub_wf_ex.params['env']) - self.assertNotIn('__env', sub_wf_ex.context) + # The environment of the subworkflow must be empty. + # To evaluate expressions it should be taken from the + # parent workflow execution. + self.assertDictEqual({}, sub_wf_ex.params['env']) + self.assertNotIn('__env', sub_wf_ex.context) diff --git a/mistral/tests/unit/engine/test_reverse_workflow_rerun.py b/mistral/tests/unit/engine/test_reverse_workflow_rerun.py index 981f5fb40..88e84a9b3 100644 --- a/mistral/tests/unit/engine/test_reverse_workflow_rerun.py +++ b/mistral/tests/unit/engine/test_reverse_workflow_rerun.py @@ -211,10 +211,10 @@ class ReverseWorkflowRerunTest(base.EngineTestCase): task_execs = wf_ex.task_executions - self.assertEqual(states.ERROR, wf_ex.state) - self.assertIsNotNone(wf_ex.state_info) - self.assertEqual(2, len(task_execs)) - self.assertDictEqual(env, wf_ex.params['env']) + self.assertEqual(states.ERROR, wf_ex.state) + self.assertIsNotNone(wf_ex.state_info) + self.assertEqual(2, len(task_execs)) + self.assertDictEqual(env, wf_ex.params['env']) task_1_ex = self._assert_single_item(task_execs, name='t1') task_2_ex = self._assert_single_item(task_execs, name='t2') @@ -232,11 +232,12 @@ class ReverseWorkflowRerunTest(base.EngineTestCase): # Resume workflow and re-run failed task. self.engine.rerun_workflow(task_2_ex.id, env=updated_env) - wf_ex = db_api.get_workflow_execution(wf_ex.id) + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) - self.assertEqual(states.RUNNING, wf_ex.state) - self.assertIsNone(wf_ex.state_info) - self.assertDictEqual(updated_env, wf_ex.params['env']) + self.assertEqual(states.RUNNING, wf_ex.state) + self.assertIsNone(wf_ex.state_info) + self.assertDictEqual(updated_env, wf_ex.params['env']) # Wait for the workflow to succeed. self.await_workflow_success(wf_ex.id) diff --git a/mistral/tests/unit/engine/test_subworkflows.py b/mistral/tests/unit/engine/test_subworkflows.py index fbcdf4d85..90bbd987a 100644 --- a/mistral/tests/unit/engine/test_subworkflows.py +++ b/mistral/tests/unit/engine/test_subworkflows.py @@ -238,30 +238,31 @@ class SubworkflowsTest(base.EngineTestCase): self._await(lambda: len(db_api.get_workflow_executions()) == 2, 0.5, 5) - wf_execs = db_api.get_workflow_executions() + with db_api.transaction(): + wf_execs = db_api.get_workflow_executions() - self.assertEqual(2, len(wf_execs)) + self.assertEqual(2, len(wf_execs)) - # Execution of 'wf2'. - wf1_ex = self._assert_single_item(wf_execs, name='wb1.wf1') - wf2_ex = self._assert_single_item(wf_execs, name='wb1.wf2') + # Execution of 'wf2'. + wf1_ex = self._assert_single_item(wf_execs, name='wb1.wf1') + wf2_ex = self._assert_single_item(wf_execs, name='wb1.wf2') - self.assertEqual(project_id, wf1_ex.project_id) - self.assertIsNotNone(wf1_ex.task_execution_id) - self.assertDictContainsSubset( - { - 'task_name': 'task2', - 'task_execution_id': wf1_ex.task_execution_id - }, - wf1_ex.params - ) - self.assertDictEqual( - { - 'param1': 'Bonnie', - 'param2': 'Clyde' - }, - wf1_ex.input - ) + self.assertEqual(project_id, wf1_ex.project_id) + self.assertIsNotNone(wf1_ex.task_execution_id) + self.assertDictContainsSubset( + { + 'task_name': 'task2', + 'task_execution_id': wf1_ex.task_execution_id + }, + wf1_ex.params + ) + self.assertDictEqual( + { + 'param1': 'Bonnie', + 'param2': 'Clyde' + }, + wf1_ex.input + ) # Wait till workflow 'wf1' is completed. self.await_workflow_success(wf1_ex.id) @@ -360,16 +361,17 @@ class SubworkflowsTest(base.EngineTestCase): self._await(lambda: len(db_api.get_workflow_executions()) == 2, 0.5, 5) - wf_execs = db_api.get_workflow_executions() + with db_api.transaction(): + wf_execs = db_api.get_workflow_executions() - self.assertEqual(2, len(wf_execs)) + self.assertEqual(2, len(wf_execs)) - # Execution of 'wf1'. - wf1_ex = self._assert_single_item(wf_execs, name='wb1.wf1') - wf2_ex = self._assert_single_item(wf_execs, name='wb1.wf2') + # Execution of 'wf1'. + wf1_ex = self._assert_single_item(wf_execs, name='wb1.wf1') + wf2_ex = self._assert_single_item(wf_execs, name='wb1.wf2') - self.assertIsNotNone(wf1_ex.task_execution_id) - self.assertDictContainsSubset({}, wf1_ex.params) + self.assertIsNotNone(wf1_ex.task_execution_id) + self.assertDictContainsSubset({}, wf1_ex.params) # Wait till workflow 'wf1' is completed. self.await_workflow_success(wf1_ex.id) diff --git a/mistral/tests/unit/engine/test_workflow_resume.py b/mistral/tests/unit/engine/test_workflow_resume.py index 0d0d88c4a..1d0ba2d89 100644 --- a/mistral/tests/unit/engine/test_workflow_resume.py +++ b/mistral/tests/unit/engine/test_workflow_resume.py @@ -421,14 +421,14 @@ class WorkflowResumeTest(base.EngineTestCase): task_execs = wf_ex.task_executions - task_1_ex = self._assert_single_item(task_execs, name='task1') - task_2_ex = self._assert_single_item(task_execs, name='task2') + task_1_ex = self._assert_single_item(task_execs, name='task1') + task_2_ex = self._assert_single_item(task_execs, name='task2') - self.assertEqual(states.PAUSED, wf_ex.state) - self.assertEqual(2, len(task_execs)) - self.assertDictEqual(env, wf_ex.params['env']) - self.assertEqual(states.SUCCESS, task_1_ex.state) - self.assertEqual(states.IDLE, task_2_ex.state) + self.assertEqual(states.PAUSED, wf_ex.state) + self.assertEqual(2, len(task_execs)) + self.assertDictEqual(env, wf_ex.params['env']) + self.assertEqual(states.SUCCESS, task_1_ex.state) + self.assertEqual(states.IDLE, task_2_ex.state) # Update env in workflow execution with the following. updated_env = { @@ -446,13 +446,13 @@ class WorkflowResumeTest(base.EngineTestCase): task_execs = wf_ex.task_executions - self.assertDictEqual(updated_env, wf_ex.params['env']) - self.assertEqual(3, len(task_execs)) + self.assertDictEqual(updated_env, wf_ex.params['env']) + self.assertEqual(3, len(task_execs)) - # Check result of task2. - task_2_ex = self._assert_single_item(task_execs, name='task2') + # Check result of task2. + task_2_ex = self._assert_single_item(task_execs, name='task2') - self.assertEqual(states.SUCCESS, task_2_ex.state) + self.assertEqual(states.SUCCESS, task_2_ex.state) # Re-read task execution, otherwise lazy loading of action executions # may not work. @@ -461,15 +461,15 @@ class WorkflowResumeTest(base.EngineTestCase): task_2_result = data_flow.get_task_execution_result(task_2_ex) - self.assertEqual(updated_env['var1'], task_2_result) + self.assertEqual(updated_env['var1'], task_2_result) - # Check result of task3. - task_3_ex = self._assert_single_item( - task_execs, - name='task3' - ) + # Check result of task3. + task_3_ex = self._assert_single_item( + task_execs, + name='task3' + ) - self.assertEqual(states.SUCCESS, task_3_ex.state) + self.assertEqual(states.SUCCESS, task_3_ex.state) # Re-read task execution, otherwise lazy loading of action executions # may not work. @@ -478,4 +478,4 @@ class WorkflowResumeTest(base.EngineTestCase): task_3_result = data_flow.get_task_execution_result(task_3_ex) - self.assertEqual(updated_env['var2'], task_3_result) + self.assertEqual(updated_env['var2'], task_3_result) diff --git a/mistral/tests/unit/lang/test_spec_caching.py b/mistral/tests/unit/lang/test_spec_caching.py index 1bcf2cce8..1e9ccc092 100644 --- a/mistral/tests/unit/lang/test_spec_caching.py +++ b/mistral/tests/unit/lang/test_spec_caching.py @@ -185,19 +185,19 @@ class SpecificationCachingTest(base.DbTestCase): self.assertEqual(0, spec_parser.get_wf_execution_spec_cache_size()) self.assertEqual(1, spec_parser.get_wf_definition_spec_cache_size()) - wf_ex = db_api.create_workflow_execution({ - 'id': '1-2-3-4', - 'name': 'wf', - 'workflow_id': wf_def.id, - 'spec': wf_spec.to_dict(), - 'state': states.RUNNING - }) + with db_api.transaction(): + wf_ex = db_api.create_workflow_execution({ + 'id': '1-2-3-4', + 'name': 'wf', + 'workflow_id': wf_def.id, + 'spec': wf_spec.to_dict(), + 'state': states.RUNNING + }) - # Check that we can get a valid spec by execution id. - - wf_spec_by_exec_id = spec_parser.get_workflow_spec_by_execution_id( - wf_ex.id - ) + # Check that we can get a valid spec by execution id. + wf_spec_by_exec_id = spec_parser.get_workflow_spec_by_execution_id( + wf_ex.id + ) self.assertEqual(1, len(wf_spec_by_exec_id.get_tasks())) diff --git a/mistral/tests/unit/services/test_workflow_service.py b/mistral/tests/unit/services/test_workflow_service.py index 544e4617e..0f0412550 100644 --- a/mistral/tests/unit/services/test_workflow_service.py +++ b/mistral/tests/unit/services/test_workflow_service.py @@ -303,17 +303,17 @@ class WorkflowServiceTest(base.DbTestCase): update_env ) - fetched = db_api.get_workflow_execution(created.id) + fetched = db_api.get_workflow_execution(created.id) - self.assertDictEqual( - wf_exec['params']['env'], - fetched.params['env'] - ) + self.assertDictEqual( + wf_exec['params']['env'], + fetched.params['env'] + ) - self.assertDictEqual( - wf_exec['context']['__env'], - fetched.context['__env'] - ) + self.assertDictEqual( + wf_exec['context']['__env'], + fetched.context['__env'] + ) def test_with_long_task_name(self): long_task_name = utils.generate_string(tasks.MAX_LENGTH_TASK_NAME + 1)