Make sure there are no duplicates in the spec cache w/o restarts

* In case if the same workflow definition is used multiple times
  for many executions (e.g. on-success clause has many tasks where
  each one of them calls the same subworkflow) then the corresponding
  cache entries for those executions should refer to the same
  workflow specification. Mistral shouldn't create a new spec and
  reuse the existing one that's already cached. Note this behavior
  is expected only when there aren't any workflow definition updates
  while creating executions and the engine hasn't restarted.
* Completing a docstring for one of the methods in parser.py

Change-Id: Ic420aa0a90337378206332604facdcb898bf9619
This commit is contained in:
Renat Akhmerov 2018-04-25 18:37:06 +07:00
parent 59467da7a8
commit 56adb2ed65
2 changed files with 67 additions and 3 deletions

View File

@ -33,9 +33,11 @@ V2_0 = '2.0'
ALL_VERSIONS = [V2_0]
# {workflow execution id => workflow specification}.
_WF_EX_CACHE = cachetools.LRUCache(maxsize=100)
_WF_EX_CACHE_LOCK = threading.RLock()
# {(workflow def id, workflow def updated at) => workflow specification}.
_WF_DEF_CACHE = cachetools.LRUCache(maxsize=100)
_WF_DEF_CACHE_LOCK = threading.RLock()
@ -203,9 +205,12 @@ def get_workflow_spec_by_execution_id(wf_ex_id):
The idea is that when a workflow execution is running we
must be getting the same workflow specification even if
the workflow definition has already changed. However, note
that this is true only if the current engine instance didn't
restart during the entire workflow execution run.
:param wf_ex_id: Workflow execution id.
:return: Workflow specification.
:return: Workflow specification.
"""
if not wf_ex_id:
return None
@ -230,8 +235,8 @@ def get_workflow_spec_by_definition_id(wf_def_id, wf_def_updated_at):
:param wf_def_id: Workflow definition id.
:param wf_def_updated_at: Workflow definition 'updated_at' value. It
serves only as part of cache key and is not explicitly used in the
method.
serves only as part of cache key and is not explicitly used in the
method.
:return: Workflow specification.
"""
if not wf_def_id:

View File

@ -17,6 +17,7 @@ from mistral.lang import parser as spec_parser
from mistral.services import workbooks as wb_service
from mistral.services import workflows as wf_service
from mistral.tests.unit import base
from mistral.tests.unit.engine import base as engine_base
from mistral.workflow import states
@ -236,3 +237,61 @@ class SpecificationCachingTest(base.DbTestCase):
)
self.assertEqual(2, len(wf_spec_by_exec_id.get_tasks()))
class SpecificationCachingEngineTest(engine_base.EngineTestCase):
def test_cache_workflow_spec_no_duplicates(self):
wfs_text = """
version: '2.0'
wf:
tasks:
task1:
action: std.noop
on-success:
- task2
- task3
task2:
workflow: sub_wf my_param="val1"
task3:
workflow: sub_wf my_param="val2"
sub_wf:
input:
- my_param
tasks:
task1:
action: std.echo output="Param value is <% $.my_param %>"
"""
wfs = wf_service.create_workflows(wfs_text)
self.assertEqual(2, len(wfs))
self.assertEqual(0, spec_parser.get_wf_execution_spec_cache_size())
self.assertEqual(0, spec_parser.get_wf_definition_spec_cache_size())
wf_ex = self.engine.start_workflow('wf')
self.await_workflow_success(wf_ex.id)
# We expect to have a cache entry for every workflow execution
# but two of them should refer to the same object.
self.assertEqual(3, spec_parser.get_wf_execution_spec_cache_size())
self.assertEqual(2, spec_parser.get_wf_definition_spec_cache_size())
sub_wf_execs = db_api.get_workflow_executions(name='sub_wf')
self.assertEqual(2, len(sub_wf_execs))
spec1 = spec_parser.get_workflow_spec_by_execution_id(
sub_wf_execs[0].id
)
spec2 = spec_parser.get_workflow_spec_by_execution_id(
sub_wf_execs[1].id
)
self.assertIs(spec1, spec2)