From 1ae082794a94cb0746fbbdde757c605816ce2c1d Mon Sep 17 00:00:00 2001 From: Mike Fedosin Date: Thu, 16 Nov 2017 00:44:04 +0300 Subject: [PATCH] Cache action definitions Now to perform some action mistral gets its definition from the database first. It's not really optimal, because if there are a lot of similar action calls, mistral will reread the same data from db. It increases the whole execution time and the load on the database. To improve the performance it's suggested to cache read definitions and take them from the cache instead of the database in the subsequent times. Cache ttl can be configured with ``action_definition_cache_time`` option from [engine] group. The default value is 60 seconds. Change-Id: I330b7cde982821d4f0a06cdd2954499ac0b7be37 --- mistral/config.py | 6 ++ mistral/engine/actions.py | 19 +++-- .../tests/unit/engine/test_lookup_utils.py | 82 +++++++++++++++++++ mistral/workflow/lookup_utils.py | 50 +++++++++-- ...n_definition_caching-78d4446d61c6d739.yaml | 9 ++ 5 files changed, 153 insertions(+), 13 deletions(-) create mode 100644 releasenotes/notes/add_action_definition_caching-78d4446d61c6d739.yaml diff --git a/mistral/config.py b/mistral/config.py index de3f5de05..0c1108684 100644 --- a/mistral/config.py +++ b/mistral/config.py @@ -172,6 +172,12 @@ engine_opts = [ ' will be restored automatically. If this property is' ' set to a negative value Mistral will never be doing ' ' this check.') + ), + cfg.IntOpt( + 'action_definition_cache_time', + default=60, + help=_('A number of seconds that indicates how long action ' + 'definitions should be stored in the local cache.') ) ] diff --git a/mistral/engine/actions.py b/mistral/engine/actions.py index a0b0a4a46..ad4bd55bd 100644 --- a/mistral/engine/actions.py +++ b/mistral/engine/actions.py @@ -32,6 +32,7 @@ from mistral.services import security from mistral import utils from mistral.utils import wf_trace from mistral.workflow import data_flow +from mistral.workflow import lookup_utils from mistral.workflow import states from mistral_lib import actions as ml_actions @@ -367,11 +368,11 @@ class AdHocAction(PythonAction): wf_ctx=None): self.action_spec = spec_parser.get_action_spec(action_def.spec) - try: - base_action_def = db_api.get_action_definition( - self.action_spec.get_base() - ) - except exc.DBEntityNotFoundError: + base_action_def = lookup_utils.find_action_definition_by_name( + self.action_spec.get_base() + ) + + if not base_action_def: raise exc.InvalidActionException( "Failed to find action [action_name=%s]" % self.action_spec.get_base() @@ -607,10 +608,14 @@ def resolve_action_definition(action_spec_name, wf_name=None, action_full_name = "%s.%s" % (wb_name, action_spec_name) - action_db = db_api.load_action_definition(action_full_name) + action_db = lookup_utils.find_action_definition_by_name( + action_full_name + ) if not action_db: - action_db = db_api.load_action_definition(action_spec_name) + action_db = lookup_utils.find_action_definition_by_name( + action_spec_name + ) if not action_db: raise exc.InvalidActionException( diff --git a/mistral/tests/unit/engine/test_lookup_utils.py b/mistral/tests/unit/engine/test_lookup_utils.py index 738410259..91db57503 100644 --- a/mistral/tests/unit/engine/test_lookup_utils.py +++ b/mistral/tests/unit/engine/test_lookup_utils.py @@ -12,9 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. +import time + +import cachetools from oslo_config import cfg from mistral.db.v2 import api as db_api +from mistral.services import actions as action_service from mistral.services import workflows as wf_service from mistral.tests.unit.engine import base from mistral.workflow import lookup_utils @@ -80,3 +84,81 @@ class LookupUtilsTest(base.EngineTestCase): # Expecting that the cache size is 0 because the workflow has # finished and invalidated corresponding cache entry. self.assertEqual(0, lookup_utils.get_task_execution_cache_size()) + + def test_action_definition_cache_ttl(self): + action = """--- + version: '2.0' + + action1: + base: std.echo output='Hi' + output: + result: $ + """ + + wf_text = """--- + version: '2.0' + + wf: + tasks: + task1: + action: action1 + on-success: join_task + + task2: + action: action1 + on-success: join_task + + join_task: + join: all + on-success: task4 + + task4: + action: action1 + pause-before: true + """ + + wf_service.create_workflows(wf_text) + + # Create an action. + db_actions = action_service.create_actions(action) + + self.assertEqual(1, len(db_actions)) + self._assert_single_item(db_actions, name='action1') + + # Explicitly mark the action to be deleted after the test execution. + self.addCleanup(db_api.delete_action_definitions, name='action1') + + # Reinitialise the cache with reduced action_definition_cache_time + # to make the test faster. + # Save the existing cache into a temporary variable and restore + # the value when the test passed. + old_cache = lookup_utils._ACTION_DEF_CACHE + lookup_utils._ACTION_DEF_CACHE = cachetools.TTLCache( + maxsize=1000, + ttl=5 # 5 seconds + ) + self.addCleanup(setattr, lookup_utils, '_ACTION_DEF_CACHE', old_cache) + + # Start workflow. + wf_ex = self.engine.start_workflow('wf') + + self.await_workflow_paused(wf_ex.id) + + # Check that 'action1' 'echo' and 'noop' are cached. + self.assertEqual(3, lookup_utils.get_action_definition_cache_size()) + self.assertIn('action1', lookup_utils._ACTION_DEF_CACHE) + self.assertIn('std.noop', lookup_utils._ACTION_DEF_CACHE) + self.assertIn('std.echo', lookup_utils._ACTION_DEF_CACHE) + + # Wait some time until cache expires + time.sleep(7) + self.assertEqual(0, lookup_utils.get_action_definition_cache_size()) + + self.engine.resume_workflow(wf_ex.id) + + self.await_workflow_success(wf_ex.id) + + # Check all actions are cached again. + self.assertEqual(2, lookup_utils.get_action_definition_cache_size()) + self.assertIn('action1', lookup_utils._ACTION_DEF_CACHE) + self.assertIn('std.echo', lookup_utils._ACTION_DEF_CACHE) diff --git a/mistral/workflow/lookup_utils.py b/mistral/workflow/lookup_utils.py index 901dcb2b3..68de2c10b 100644 --- a/mistral/workflow/lookup_utils.py +++ b/mistral/workflow/lookup_utils.py @@ -29,13 +29,18 @@ Mostly, they are useful for doing any kind of fast lookups with in order to make some decision based on their state. """ -import cachetools import threading +import cachetools +from oslo_config import cfg + from mistral.db.v2 import api as db_api from mistral.workflow import states +CONF = cfg.CONF + + def _create_lru_cache_for_workflow_execution(wf_ex_id): return cachetools.LRUCache(maxsize=500) @@ -49,7 +54,33 @@ _TASK_EX_CACHE = cachetools.LRUCache( missing=_create_lru_cache_for_workflow_execution ) -_CACHE_LOCK = threading.RLock() +_ACTION_DEF_CACHE = cachetools.TTLCache( + maxsize=1000, + ttl=CONF.engine.action_definition_cache_time # 60 seconds by default +) + +_TASK_EX_CACHE_LOCK = threading.RLock() +_ACTION_DEF_CACHE_LOCK = threading.RLock() + + +def find_action_definition_by_name(action_name): + """Find action definition name. + + :param action_name: Action name. + :return: Action definition (possibly a cached value). + """ + with _ACTION_DEF_CACHE_LOCK: + action_definition = _ACTION_DEF_CACHE.get(action_name) + + if action_definition: + return action_definition + + action_definition = db_api.load_action_definition(action_name) + + with _ACTION_DEF_CACHE_LOCK: + _ACTION_DEF_CACHE[action_name] = action_definition + + return action_definition def find_task_executions_by_name(wf_ex_id, task_name): @@ -59,7 +90,7 @@ def find_task_executions_by_name(wf_ex_id, task_name): :param task_name: Task name. :return: Task executions (possibly a cached value). """ - with _CACHE_LOCK: + with _TASK_EX_CACHE_LOCK: t_execs = _TASK_EX_CACHE[wf_ex_id].get(task_name) if t_execs: @@ -78,7 +109,7 @@ def find_task_executions_by_name(wf_ex_id, task_name): ) if all_finished: - with _CACHE_LOCK: + with _TASK_EX_CACHE_LOCK: _TASK_EX_CACHE[wf_ex_id][task_name] = t_execs return t_execs @@ -124,12 +155,19 @@ def get_task_execution_cache_size(): return len(_TASK_EX_CACHE) +def get_action_definition_cache_size(): + return len(_ACTION_DEF_CACHE) + + def invalidate_cached_task_executions(wf_ex_id): - with _CACHE_LOCK: + with _TASK_EX_CACHE_LOCK: if wf_ex_id in _TASK_EX_CACHE: del _TASK_EX_CACHE[wf_ex_id] def clear_caches(): - with _CACHE_LOCK: + with _TASK_EX_CACHE_LOCK: _TASK_EX_CACHE.clear() + + with _ACTION_DEF_CACHE_LOCK: + _ACTION_DEF_CACHE.clear() diff --git a/releasenotes/notes/add_action_definition_caching-78d4446d61c6d739.yaml b/releasenotes/notes/add_action_definition_caching-78d4446d61c6d739.yaml new file mode 100644 index 000000000..faf9cc5b0 --- /dev/null +++ b/releasenotes/notes/add_action_definition_caching-78d4446d61c6d739.yaml @@ -0,0 +1,9 @@ +--- +features: + - | + Enable caching of action definitions in local memory. Now, instead of + downloading the definitions from the database every time, mistral engine + will store them in a local cache. This should reduce the number of + database requests and improve the whole performance of the the system. + Cache ttl can be configured with ``action_definition_cache_time`` option + from [engine] group. The default value is 60 seconds.