Invalidate workflow spec cache on workflow definition updates

* Workflow spec cache is updated on update of workbook and workflow
* Fixed scheduler docstring

Change-Id: I64042db4aa902166d2e80ea30adb7cbca1515c39
This commit is contained in:
Renat Akhmerov 2016-07-29 17:13:03 +07:00
parent 5d51dfcc5b
commit 09c08864dc
5 changed files with 148 additions and 25 deletions

View File

@ -105,15 +105,11 @@ class CallScheduler(periodic_task.PeriodicTasks):
time_filter = datetime.datetime.now() + datetime.timedelta(
seconds=1)
# Wrap delayed calls processing in transaction to
# guarantee that calls will be processed just once.
# Do delete query to DB first to force hanging up all
# parallel transactions.
# It should work on isolation level 'READ-COMMITTED',
# 'REPEATABLE-READ' and above.
#
# 'REPEATABLE-READ' is by default in MySQL and
# 'READ-COMMITTED is by default in PostgreSQL.
# Wrap delayed calls processing in transaction to guarantee that calls
# will be processed just once. Do delete query to DB first to force
# hanging up all parallel transactions.
# It should work with transactions which run at least 'READ-COMMITTED'
# mode.
delayed_calls = []
with db_api.transaction():
@ -128,7 +124,7 @@ class CallScheduler(periodic_task.PeriodicTasks):
result, number_of_updated = db_api.update_delayed_call(
id=call.id,
values={'processing': True},
query_filter={"processing": False}
query_filter={'processing': False}
)
# If number_of_updated != 1 other scheduler already

View File

@ -20,8 +20,10 @@ from mistral.workbook import parser as spec_parser
def create_workbook_v2(definition, scope='private'):
wb_spec = spec_parser.get_workbook_spec_from_yaml(definition)
wb_values = _get_workbook_values(
spec_parser.get_workbook_spec_from_yaml(definition),
wb_spec,
definition,
scope
)
@ -29,34 +31,38 @@ def create_workbook_v2(definition, scope='private'):
with db_api_v2.transaction():
wb_db = db_api_v2.create_workbook(wb_values)
_on_workbook_update(wb_db, wb_values)
_on_workbook_update(wb_db, wb_spec)
return wb_db
def update_workbook_v2(definition, scope='private'):
values = _get_workbook_values(
spec_parser.get_workbook_spec_from_yaml(definition),
definition,
scope
)
wb_spec = spec_parser.get_workbook_spec_from_yaml(definition)
values = _get_workbook_values(wb_spec, definition, scope)
with db_api_v2.transaction():
wb_db = db_api_v2.update_workbook(values['name'], values)
_on_workbook_update(wb_db, values)
_, db_wfs = _on_workbook_update(wb_db, wb_spec)
# Once transaction has committed we need to update specification cache.
for db_wf, wf_spec in zip(db_wfs, wb_spec.get_workflows()):
spec_parser.update_workflow_cache(db_wf.id, wf_spec)
return wb_db
def _on_workbook_update(wb_db, values):
wb_spec = spec_parser.get_workbook_spec(values['spec'])
def _on_workbook_update(wb_db, wb_spec):
db_actions = _create_or_update_actions(wb_db, wb_spec.get_actions())
db_wfs = _create_or_update_workflows(wb_db, wb_spec.get_workflows())
_create_or_update_actions(wb_db, wb_spec.get_actions())
_create_or_update_workflows(wb_db, wb_spec.get_workflows())
return db_actions, db_wfs
def _create_or_update_actions(wb_db, actions_spec):
db_actions = []
if actions_spec:
for action_spec in actions_spec:
action_name = '%s.%s' % (wb_db.name, action_spec.get_name())
@ -76,10 +82,19 @@ def _create_or_update_actions(wb_db, actions_spec):
'project_id': wb_db.project_id
}
db_api_v2.create_or_update_action_definition(action_name, values)
db_actions.append(
db_api_v2.create_or_update_action_definition(
action_name,
values
)
)
return db_actions
def _create_or_update_workflows(wb_db, workflows_spec):
db_wfs = []
if workflows_spec:
for wf_spec in workflows_spec:
wf_name = '%s.%s' % (wb_db.name, wf_spec.get_name())
@ -93,7 +108,11 @@ def _create_or_update_workflows(wb_db, workflows_spec):
'tags': wf_spec.get_tags()
}
db_api_v2.create_or_update_workflow_definition(wf_name, values)
db_wfs.append(
db_api_v2.create_or_update_workflow_definition(wf_name, values)
)
return db_wfs
def _get_workbook_values(wb_spec, definition, scope):

View File

@ -101,6 +101,11 @@ def update_workflows(definition, scope='private', identifier=None):
identifier=identifier
))
# Once transaction has committed we need to update specification cache.
for db_wf, wf_spec in zip(db_wfs, wf_list_spec.get_workflows()):
spec_parser.update_workflow_cache(db_wf.id, wf_spec)
return db_wfs

View File

@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral.db.v2 import api as db_api
from mistral.services import workbooks as wb_service
from mistral.services import workflows as wf_service
from mistral.tests.unit import base
from mistral.workbook import parser as spec_parser
@ -36,3 +38,95 @@ class SpecificationCachingTest(base.DbTestCase):
self.assertIsNotNone(wf_spec)
self.assertEqual(1, spec_parser.get_workflow_spec_cache_size())
def test_workflow_spec_cache_update_via_workflow_service(self):
wf_text = """
version: '2.0'
wf:
tasks:
task1:
action: std.echo output="Echo"
"""
wfs = wf_service.create_workflows(wf_text)
self.assertEqual(0, spec_parser.get_workflow_spec_cache_size())
wf_spec = spec_parser.get_workflow_spec_by_id(wfs[0].id)
self.assertEqual(1, len(wf_spec.get_tasks()))
self.assertEqual(1, spec_parser.get_workflow_spec_cache_size())
# Now update workflow definition and check that cache is updated too.
wf_text = """
version: '2.0'
wf:
tasks:
task1:
action: std.echo output="1"
task2:
action: std.echo output="2"
"""
wfs = wf_service.update_workflows(wf_text)
self.assertEqual(1, spec_parser.get_workflow_spec_cache_size())
wf_spec = spec_parser.get_workflow_spec_by_id(wfs[0].id)
self.assertEqual(2, len(wf_spec.get_tasks()))
self.assertEqual(1, spec_parser.get_workflow_spec_cache_size())
def test_workflow_spec_cache_update_via_workbook_service(self):
wb_text = """
version: '2.0'
name: wb
workflows:
wf:
tasks:
task1:
action: std.echo output="Echo"
"""
wb_service.create_workbook_v2(wb_text)
self.assertEqual(0, spec_parser.get_workflow_spec_cache_size())
wf = db_api.get_workflow_definition('wb.wf')
wf_spec = spec_parser.get_workflow_spec_by_id(wf.id)
self.assertEqual(1, len(wf_spec.get_tasks()))
self.assertEqual(1, spec_parser.get_workflow_spec_cache_size())
# Now update workflow definition and check that cache is updated too.
wb_text = """
version: '2.0'
name: wb
workflows:
wf:
tasks:
task1:
action: std.echo output="1"
task2:
action: std.echo output="2"
"""
wb_service.update_workbook_v2(wb_text)
self.assertEqual(1, spec_parser.get_workflow_spec_cache_size())
wf_spec = spec_parser.get_workflow_spec_by_id(wf.id)
self.assertEqual(2, len(wf_spec.get_tasks()))
self.assertEqual(1, spec_parser.get_workflow_spec_cache_size())

View File

@ -14,6 +14,7 @@
# limitations under the License.
from cachetools import cached
from cachetools import hashkey
from cachetools import LRUCache
from threading import RLock
import yaml
@ -35,6 +36,7 @@ ALL_VERSIONS = [V2_0]
_WF_CACHE = LRUCache(maxsize=100)
_WF_CACHE_LOCK = RLock()
def parse_yaml(text):
@ -186,7 +188,8 @@ def _parse_def_from_wb(wb_def, section_name, item_name):
# Methods for obtaining specifications in a more efficient way using
# caching techniques.
@cached(_WF_CACHE, lock=RLock())
@cached(_WF_CACHE, lock=_WF_CACHE_LOCK)
def get_workflow_spec_by_id(wf_def_id):
if not wf_def_id:
return None
@ -203,3 +206,9 @@ def get_workflow_spec_cache_size():
def clear_caches():
"""Clears all specification caches."""
_WF_CACHE.clear()
def update_workflow_cache(wf_def_id, spec):
with _WF_CACHE_LOCK:
# We have to use hashkey function because @cached uses it implicitly.
_WF_CACHE[hashkey(wf_def_id)] = spec