From 56adb2ed65503377a20d44ea5a8ad18014d94751 Mon Sep 17 00:00:00 2001 From: Renat Akhmerov Date: Wed, 25 Apr 2018 18:37:06 +0700 Subject: [PATCH] Make sure there are no duplicates in the spec cache w/o restarts * In case if the same workflow definition is used multiple times for many executions (e.g. on-success clause has many tasks where each one of them calls the same subworkflow) then the corresponding cache entries for those executions should refer to the same workflow specification. Mistral shouldn't create a new spec and reuse the existing one that's already cached. Note this behavior is expected only when there aren't any workflow definition updates while creating executions and the engine hasn't restarted. * Completing a docstring for one of the methods in parser.py Change-Id: Ic420aa0a90337378206332604facdcb898bf9619 --- mistral/lang/parser.py | 11 +++- mistral/tests/unit/lang/test_spec_caching.py | 59 ++++++++++++++++++++ 2 files changed, 67 insertions(+), 3 deletions(-) diff --git a/mistral/lang/parser.py b/mistral/lang/parser.py index 206435a2a..dc92c184a 100644 --- a/mistral/lang/parser.py +++ b/mistral/lang/parser.py @@ -33,9 +33,11 @@ V2_0 = '2.0' ALL_VERSIONS = [V2_0] +# {workflow execution id => workflow specification}. _WF_EX_CACHE = cachetools.LRUCache(maxsize=100) _WF_EX_CACHE_LOCK = threading.RLock() +# {(workflow def id, workflow def updated at) => workflow specification}. _WF_DEF_CACHE = cachetools.LRUCache(maxsize=100) _WF_DEF_CACHE_LOCK = threading.RLock() @@ -203,9 +205,12 @@ def get_workflow_spec_by_execution_id(wf_ex_id): The idea is that when a workflow execution is running we must be getting the same workflow specification even if + the workflow definition has already changed. However, note + that this is true only if the current engine instance didn't + restart during the entire workflow execution run. :param wf_ex_id: Workflow execution id. - :return: Workflow specification. + :return: Workflow specification. """ if not wf_ex_id: return None @@ -230,8 +235,8 @@ def get_workflow_spec_by_definition_id(wf_def_id, wf_def_updated_at): :param wf_def_id: Workflow definition id. :param wf_def_updated_at: Workflow definition 'updated_at' value. It - serves only as part of cache key and is not explicitly used in the - method. + serves only as part of cache key and is not explicitly used in the + method. :return: Workflow specification. """ if not wf_def_id: diff --git a/mistral/tests/unit/lang/test_spec_caching.py b/mistral/tests/unit/lang/test_spec_caching.py index 6c83accd7..1bcf2cce8 100644 --- a/mistral/tests/unit/lang/test_spec_caching.py +++ b/mistral/tests/unit/lang/test_spec_caching.py @@ -17,6 +17,7 @@ from mistral.lang import parser as spec_parser from mistral.services import workbooks as wb_service from mistral.services import workflows as wf_service from mistral.tests.unit import base +from mistral.tests.unit.engine import base as engine_base from mistral.workflow import states @@ -236,3 +237,61 @@ class SpecificationCachingTest(base.DbTestCase): ) self.assertEqual(2, len(wf_spec_by_exec_id.get_tasks())) + + +class SpecificationCachingEngineTest(engine_base.EngineTestCase): + def test_cache_workflow_spec_no_duplicates(self): + wfs_text = """ + version: '2.0' + + wf: + tasks: + task1: + action: std.noop + on-success: + - task2 + - task3 + + task2: + workflow: sub_wf my_param="val1" + + task3: + workflow: sub_wf my_param="val2" + + sub_wf: + input: + - my_param + + tasks: + task1: + action: std.echo output="Param value is <% $.my_param %>" + """ + + wfs = wf_service.create_workflows(wfs_text) + + self.assertEqual(2, len(wfs)) + + self.assertEqual(0, spec_parser.get_wf_execution_spec_cache_size()) + self.assertEqual(0, spec_parser.get_wf_definition_spec_cache_size()) + + wf_ex = self.engine.start_workflow('wf') + + self.await_workflow_success(wf_ex.id) + + # We expect to have a cache entry for every workflow execution + # but two of them should refer to the same object. + self.assertEqual(3, spec_parser.get_wf_execution_spec_cache_size()) + self.assertEqual(2, spec_parser.get_wf_definition_spec_cache_size()) + + sub_wf_execs = db_api.get_workflow_executions(name='sub_wf') + + self.assertEqual(2, len(sub_wf_execs)) + + spec1 = spec_parser.get_workflow_spec_by_execution_id( + sub_wf_execs[0].id + ) + spec2 = spec_parser.get_workflow_spec_by_execution_id( + sub_wf_execs[1].id + ) + + self.assertIs(spec1, spec2)