Ensure only one WF execution for every CT cycle

Make sure that only one engine starts a workflow for every cron
trigger cycle by checking the row count of the DML statements used
to update the cron trigger in the DB:
* Modify the cron trigger UPDATE to include a WHERE clause that
  verifies that no other engine has already updated the cron trigger.
* Change the SQLAlchemy method used for deleting the cron trigger so
  that the row count of the deleted rows can be retrieved.

Start the workflow execution only if the current engine was able to
update the cron trigger DB row

Change-Id: I9d3bec51844de218a46cf6ff53bedf9a00069e30
Closes-Bug: #1513548
(cherry picked from commit 7f9c0c6241)
This commit is contained in:
Moshe Elisha 2015-11-09 17:20:35 +00:00 committed by Renat Akhmerov
parent 72deb1df04
commit b2aa9810ff
5 changed files with 187 additions and 32 deletions

View File

@ -402,8 +402,8 @@ def create_cron_trigger(values):
return IMPL.create_cron_trigger(values)
def update_cron_trigger(name, values, query_filter=None):
    """Update the cron trigger called ``name`` via the DB implementation.

    Thin delegation to ``IMPL.update_cron_trigger``; ``query_filter`` is
    forwarded unchanged as a keyword argument and defaults to ``None`` so
    two-argument callers keep working.

    :param name: Name of the cron trigger to update.
    :param values: Dict of values to apply to the cron trigger.
    :param query_filter: Optional extra filter passed to the implementation.
    :return: Whatever ``IMPL.update_cron_trigger`` returns.
    """
    return IMPL.update_cron_trigger(name, values, query_filter=query_filter)
def create_or_update_cron_trigger(name, values):

View File

@ -984,15 +984,35 @@ def create_cron_trigger(values, session=None):
@b.session_aware()
def update_cron_trigger(name, values, session=None, query_filter=None):
    """Update a cron trigger and report how many rows were modified.

    When ``query_filter`` is given, the update is issued through
    ``update_on_match`` using a specimen built from the trigger id plus the
    filter values, so the row is only changed if it still matches the filter.
    Without a filter the loaded object is updated in place.

    :param name: Name of the cron trigger to update.
    :param values: Dict of values to apply.
    :param session: DB session (presumably injected by ``b.session_aware``;
        confirm against the decorator).
    :param query_filter: Optional dict of column values the row must still
        have for the update to be applied.
    :return: Tuple ``(cron_trigger, count)``. ``count`` is 1 when the
        filtered update matched, 0 when it did not, and
        ``len(session.dirty)`` for an unfiltered update.
    :raises exc.NotFoundException: If no cron trigger has this name.
    """
    cron_trigger = _get_cron_trigger(name)

    if not cron_trigger:
        raise exc.NotFoundException("Cron trigger not found [name=%s]" % name)

    if query_filter:
        try:
            # Execute the UPDATE statement with the query_filter as the WHERE.
            specimen = models.CronTrigger(id=cron_trigger.id, **query_filter)
            cron_trigger = b.model_query(
                models.CronTrigger
            ).update_on_match(
                specimen=specimen,
                surrogate_key='id',
                values=values
            )

            return cron_trigger, 1
        except oslo_sqlalchemy.update_match.NoRowsMatched:
            # The assignment above never happened, so cron_trigger is still
            # the object loaded by name; log its real id rather than the
            # ``id`` builtin.
            LOG.debug(
                "No rows matched for cron update call "
                "[id=%s, values=%s, query_filter=%s]",
                cron_trigger.id, values, query_filter
            )

            return cron_trigger, 0
    else:
        cron_trigger.update(values.copy())

        return cron_trigger, len(session.dirty)
@b.session_aware()
@ -1002,7 +1022,8 @@ def create_or_update_cron_trigger(name, values, session=None):
if not cron_trigger:
return create_cron_trigger(values)
else:
return update_cron_trigger(name, values)
updated, _ = update_cron_trigger(name, values)
return updated
@b.session_aware()
@ -1012,7 +1033,13 @@ def delete_cron_trigger(name, session=None):
if not cron_trigger:
raise exc.NotFoundException("Cron trigger not found [name=%s]" % name)
session.delete(cron_trigger)
# Delete the cron trigger by ID and get the affected row count.
table = models.CronTrigger.__table__
result = session.execute(
table.delete().where(table.c.id == cron_trigger.id)
)
return result.rowcount
@b.session_aware()

View File

@ -14,6 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import traceback
from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import periodic_task
@ -22,6 +24,7 @@ from oslo_service import threadgroup
from mistral import context as auth_ctx
from mistral.db.v2 import api as db_api_v2
from mistral.engine import rpc
from mistral import exceptions as exc
from mistral.services import security
from mistral.services import triggers
@ -29,6 +32,9 @@ LOG = logging.getLogger(__name__)
CONF = cfg.CONF
# {periodic_task: thread_group}
_periodic_tasks = {}
class MistralPeriodicTasks(periodic_task.PeriodicTasks):
@periodic_task.periodic_task(spacing=1, run_immediately=True)
@ -44,33 +50,74 @@ class MistralPeriodicTasks(periodic_task.PeriodicTasks):
LOG.debug("Cron trigger security context: %s" % ctx)
try:
rpc.get_engine_client().start_workflow(
t.workflow.name,
t.workflow_input,
description="Workflow execution created by cron trigger.",
**t.workflow_params
# Try to advance the cron trigger next_execution_time and
# remaining_executions if relevant.
modified = advance_cron_trigger(t)
# If cron trigger was not already modified by another engine.
if modified:
LOG.debug(
"Starting workflow %s triggered by cron %s",
t.workflow.name, t.name
)
rpc.get_engine_client().start_workflow(
t.workflow.name,
t.workflow_input,
description="Workflow execution created "
"by cron trigger.",
**t.workflow_params
)
except Exception as e:
# Log and continue to next cron trigger
msg = (
"Failed to process cron trigger %s\n%s"
% (str(t), traceback.format_exc(e))
)
LOG.error(msg)
finally:
if (t.remaining_executions is not None and
t.remaining_executions > 0):
t.remaining_executions -= 1
if t.remaining_executions == 0:
db_api_v2.delete_cron_trigger(t.name)
else: # if remaining execution = None or > 0
next_time = triggers.get_next_execution_time(
t.pattern,
t.next_execution_time
)
auth_ctx.set_ctx(None)
db_api_v2.update_cron_trigger(
t.name,
{
'next_execution_time': next_time,
'remaining_executions': t.remaining_executions
}
)
auth_ctx.set_ctx(None)
def advance_cron_trigger(ct):
    """Move a cron trigger to its next cycle and say whether we did it.

    Decrements ``ct.remaining_executions`` in place when it is a positive
    number. If it is then 0 the trigger is deleted; otherwise the trigger is
    updated with its next execution time, filtered on the current
    ``next_execution_time`` so the update is skipped if that value has
    already changed in the DB.

    :param ct: Cron trigger object to advance.
    :return: True if the delete/update call reported at least one modified
        row, False otherwise (including when the trigger no longer exists).
    """
    rows_changed = 0

    try:
        # Only triggers with a limited, not yet exhausted execution count
        # are decremented.
        is_limited = ct.remaining_executions is not None
        if is_limited and ct.remaining_executions > 0:
            ct.remaining_executions -= 1

        if ct.remaining_executions == 0:
            # That was the last execution.
            rows_changed = db_api_v2.delete_cron_trigger(ct.name)
        else:
            # Remaining executions is None or still > 0.
            upcoming = triggers.get_next_execution_time(
                ct.pattern,
                ct.next_execution_time
            )
            new_values = {
                'next_execution_time': upcoming,
                'remaining_executions': ct.remaining_executions
            }
            unchanged_guard = {
                'next_execution_time': ct.next_execution_time
            }

            # Update the cron trigger with next execution details
            # only if it wasn't already updated by a different process.
            _, rows_changed = db_api_v2.update_cron_trigger(
                ct.name,
                new_values,
                query_filter=unchanged_guard
            )
    except exc.NotFoundException as e:
        # Cron trigger was probably already deleted by a different process.
        LOG.debug(
            "Cron trigger named '%s' does not exist anymore: %s",
            ct.name, str(e)
        )

    # True only if this engine was able to modify the cron trigger in DB.
    return rows_changed > 0
def setup():
@ -90,3 +137,13 @@ def setup():
periodic_interval_max=1,
context=ctx
)
_periodic_tasks[pt] = tg
return tg
def stop_all_periodic_tasks():
    """Stop every registered thread group and remove it from the registry.

    Entries are deleted from ``_periodic_tasks`` as they are stopped, so the
    loop walks a snapshot of the items: deleting from a dict while iterating
    its live ``items()`` view raises ``RuntimeError`` on Python 3.
    """
    for pt, tg in list(_periodic_tasks.items()):
        tg.stop()
        del _periodic_tasks[pt]

View File

@ -1113,18 +1113,39 @@ class CronTriggerTest(SQLAlchemyTest):
self.assertIsNone(created.updated_at)
updated = db_api.update_cron_trigger(
updated, updated_count = db_api.update_cron_trigger(
created.name,
{'pattern': '*/1 * * * *'}
)
self.assertEqual('*/1 * * * *', updated.pattern)
self.assertEqual(1, updated_count)
fetched = db_api.get_cron_trigger(created.name)
self.assertEqual(updated, fetched)
self.assertIsNotNone(fetched.updated_at)
# Test update_cron_trigger and query_filter with results
updated, updated_count = db_api.update_cron_trigger(
created.name,
{'pattern': '*/1 * * * *'},
query_filter={'name': created.name}
)
self.assertEqual(updated, fetched)
self.assertEqual(1, updated_count)
# Test update_cron_trigger and query_filter without results
updated, updated_count = db_api.update_cron_trigger(
created.name,
{'pattern': '*/1 * * * *'},
query_filter={'name': 'not-existing-id'}
)
self.assertEqual(updated, updated)
self.assertEqual(0, updated_count)
def test_create_or_update_cron_trigger(self):
name = 'not-existing-id'
@ -1163,8 +1184,9 @@ class CronTriggerTest(SQLAlchemyTest):
self.assertEqual(created, fetched)
db_api.delete_cron_trigger(created.name)
rowcount = db_api.delete_cron_trigger(created.name)
self.assertEqual(1, rowcount)
self.assertRaises(
exc.NotFoundException,
db_api.get_cron_trigger,

View File

@ -13,10 +13,13 @@
# limitations under the License.
import datetime
import eventlet
import mock
from oslo_config import cfg
from mistral.engine import rpc
from mistral import exceptions as exc
from mistral.services import periodic
from mistral.services import security
from mistral.services import triggers as t_s
from mistral.services import workflows
@ -39,6 +42,23 @@ my_wf:
action: std.echo output='Hi!'
"""
advance_cron_trigger_orig = periodic.advance_cron_trigger
def new_advance_cron_trigger(ct):
    """Call the original advance_cron_trigger, yielding before and after.

    Control is handed to other coroutines with ``eventlet.sleep()`` on both
    sides of the real call. Without these explicit yields,
    process_cron_triggers_v2 would loop over all the cron triggers in a
    single coroutine and the others would never get to interleave with it.
    """
    # Yield before touching the cron trigger.
    eventlet.sleep()

    was_modified = advance_cron_trigger_orig(ct)

    # Yield again before reporting the outcome to the caller.
    eventlet.sleep()

    return was_modified
class TriggerServiceV2Test(base.DbTestCase):
def setUp(self):
@ -236,3 +256,32 @@ class TriggerServiceV2Test(base.DbTestCase):
trigger_names = [t.name for t in t_s.get_next_cron_triggers()]
self.assertEqual(['test2', 'test1', 'test3'], trigger_names)
@mock.patch(
    'mistral.services.periodic.advance_cron_trigger',
    mock.MagicMock(side_effect=new_advance_cron_trigger)
)
@mock.patch.object(rpc.EngineClient, 'start_workflow')
def test_single_execution_with_multiple_processes(self, start_wf_mock):
    """Three periodic task groups must start each cron cycle only once.

    Creates one every-second cron trigger limited to ``trigger_count``
    executions, lets three ``periodic.setup()`` thread groups run against
    it, and checks that ``start_workflow`` was called exactly
    ``trigger_count`` times.
    """
    def stop_thread_groups():
        # Registered as cleanup so the groups are stopped after the test.
        for group in self.tgs:
            group.stop()

    self.tgs = [periodic.setup(), periodic.setup(), periodic.setup()]
    self.addCleanup(stop_thread_groups)

    trigger_count = 5

    t_s.create_cron_trigger(
        'ct1',
        self.wf.name,
        {},
        {},
        '* * * * * */1',  # Every second
        None,
        trigger_count,
        datetime.datetime(2010, 8, 25)
    )

    # Give the thread groups time to work through the trigger's cycles.
    eventlet.sleep(10)

    self.assertTrue(start_wf_mock.called)
    self.assertEqual(trigger_count, start_wf_mock.call_count)