Merge "Cache action definitions"

This commit is contained in:
Zuul 2018-02-15 10:14:19 +00:00 committed by Gerrit Code Review
commit 1f83e5d5a2
5 changed files with 153 additions and 13 deletions

View File

@ -172,6 +172,12 @@ engine_opts = [
' will be restored automatically. If this property is'
' set to a negative value Mistral will never be doing '
' this check.')
),
cfg.IntOpt(
'action_definition_cache_time',
default=60,
help=_('A number of seconds that indicates how long action '
'definitions should be stored in the local cache.')
)
]

View File

@ -32,6 +32,7 @@ from mistral.services import security
from mistral import utils
from mistral.utils import wf_trace
from mistral.workflow import data_flow
from mistral.workflow import lookup_utils
from mistral.workflow import states
from mistral_lib import actions as ml_actions
@ -367,11 +368,11 @@ class AdHocAction(PythonAction):
wf_ctx=None):
self.action_spec = spec_parser.get_action_spec(action_def.spec)
try:
base_action_def = db_api.get_action_definition(
self.action_spec.get_base()
)
except exc.DBEntityNotFoundError:
base_action_def = lookup_utils.find_action_definition_by_name(
self.action_spec.get_base()
)
if not base_action_def:
raise exc.InvalidActionException(
"Failed to find action [action_name=%s]" %
self.action_spec.get_base()
@ -607,10 +608,14 @@ def resolve_action_definition(action_spec_name, wf_name=None,
action_full_name = "%s.%s" % (wb_name, action_spec_name)
action_db = db_api.load_action_definition(action_full_name)
action_db = lookup_utils.find_action_definition_by_name(
action_full_name
)
if not action_db:
action_db = db_api.load_action_definition(action_spec_name)
action_db = lookup_utils.find_action_definition_by_name(
action_spec_name
)
if not action_db:
raise exc.InvalidActionException(

View File

@ -12,9 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import cachetools
from oslo_config import cfg
from mistral.db.v2 import api as db_api
from mistral.services import actions as action_service
from mistral.services import workflows as wf_service
from mistral.tests.unit.engine import base
from mistral.workflow import lookup_utils
@ -80,3 +84,81 @@ class LookupUtilsTest(base.EngineTestCase):
# Expecting that the cache size is 0 because the workflow has
# finished and invalidated corresponding cache entry.
self.assertEqual(0, lookup_utils.get_task_execution_cache_size())
def test_action_definition_cache_ttl(self):
action = """---
version: '2.0'
action1:
base: std.echo output='Hi'
output:
result: $
"""
wf_text = """---
version: '2.0'
wf:
tasks:
task1:
action: action1
on-success: join_task
task2:
action: action1
on-success: join_task
join_task:
join: all
on-success: task4
task4:
action: action1
pause-before: true
"""
wf_service.create_workflows(wf_text)
# Create an action.
db_actions = action_service.create_actions(action)
self.assertEqual(1, len(db_actions))
self._assert_single_item(db_actions, name='action1')
# Explicitly mark the action to be deleted after the test execution.
self.addCleanup(db_api.delete_action_definitions, name='action1')
# Reinitialise the cache with reduced action_definition_cache_time
# to make the test faster.
# Save the existing cache into a temporary variable and restore
# the value when the test passed.
old_cache = lookup_utils._ACTION_DEF_CACHE
lookup_utils._ACTION_DEF_CACHE = cachetools.TTLCache(
maxsize=1000,
ttl=5 # 5 seconds
)
self.addCleanup(setattr, lookup_utils, '_ACTION_DEF_CACHE', old_cache)
# Start workflow.
wf_ex = self.engine.start_workflow('wf')
self.await_workflow_paused(wf_ex.id)
# Check that 'action1' 'echo' and 'noop' are cached.
self.assertEqual(3, lookup_utils.get_action_definition_cache_size())
self.assertIn('action1', lookup_utils._ACTION_DEF_CACHE)
self.assertIn('std.noop', lookup_utils._ACTION_DEF_CACHE)
self.assertIn('std.echo', lookup_utils._ACTION_DEF_CACHE)
# Wait some time until cache expires
time.sleep(7)
self.assertEqual(0, lookup_utils.get_action_definition_cache_size())
self.engine.resume_workflow(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Check all actions are cached again.
self.assertEqual(2, lookup_utils.get_action_definition_cache_size())
self.assertIn('action1', lookup_utils._ACTION_DEF_CACHE)
self.assertIn('std.echo', lookup_utils._ACTION_DEF_CACHE)

View File

@ -29,13 +29,18 @@ Mostly, they are useful for doing any kind of fast lookups with in order
to make some decision based on their state.
"""
import cachetools
import threading
import cachetools
from oslo_config import cfg
from mistral.db.v2 import api as db_api
from mistral.workflow import states
CONF = cfg.CONF
def _create_lru_cache_for_workflow_execution(wf_ex_id):
return cachetools.LRUCache(maxsize=500)
@ -49,7 +54,33 @@ _TASK_EX_CACHE = cachetools.LRUCache(
missing=_create_lru_cache_for_workflow_execution
)
_CACHE_LOCK = threading.RLock()
_ACTION_DEF_CACHE = cachetools.TTLCache(
maxsize=1000,
ttl=CONF.engine.action_definition_cache_time # 60 seconds by default
)
_TASK_EX_CACHE_LOCK = threading.RLock()
_ACTION_DEF_CACHE_LOCK = threading.RLock()
def find_action_definition_by_name(action_name):
"""Find action definition name.
:param action_name: Action name.
:return: Action definition (possibly a cached value).
"""
with _ACTION_DEF_CACHE_LOCK:
action_definition = _ACTION_DEF_CACHE.get(action_name)
if action_definition:
return action_definition
action_definition = db_api.load_action_definition(action_name)
with _ACTION_DEF_CACHE_LOCK:
_ACTION_DEF_CACHE[action_name] = action_definition
return action_definition
def find_task_executions_by_name(wf_ex_id, task_name):
@ -59,7 +90,7 @@ def find_task_executions_by_name(wf_ex_id, task_name):
:param task_name: Task name.
:return: Task executions (possibly a cached value).
"""
with _CACHE_LOCK:
with _TASK_EX_CACHE_LOCK:
t_execs = _TASK_EX_CACHE[wf_ex_id].get(task_name)
if t_execs:
@ -78,7 +109,7 @@ def find_task_executions_by_name(wf_ex_id, task_name):
)
if all_finished:
with _CACHE_LOCK:
with _TASK_EX_CACHE_LOCK:
_TASK_EX_CACHE[wf_ex_id][task_name] = t_execs
return t_execs
@ -124,12 +155,19 @@ def get_task_execution_cache_size():
return len(_TASK_EX_CACHE)
def get_action_definition_cache_size():
return len(_ACTION_DEF_CACHE)
def invalidate_cached_task_executions(wf_ex_id):
with _CACHE_LOCK:
with _TASK_EX_CACHE_LOCK:
if wf_ex_id in _TASK_EX_CACHE:
del _TASK_EX_CACHE[wf_ex_id]
def clear_caches():
with _CACHE_LOCK:
with _TASK_EX_CACHE_LOCK:
_TASK_EX_CACHE.clear()
with _ACTION_DEF_CACHE_LOCK:
_ACTION_DEF_CACHE.clear()

View File

@ -0,0 +1,9 @@
---
features:
- |
Enable caching of action definitions in local memory. Now, instead of
downloading the definitions from the database every time, mistral engine
will store them in a local cache. This should reduce the number of
database requests and improve the whole performance of the the system.
Cache ttl can be configured with ``action_definition_cache_time`` option
from [engine] group. The default value is 60 seconds.