Optimize sending result to parent workflow

* If a subworkflow completes it sends its result to a parent
  workflow by using the scheduler (delayed call) which operates
  through the database and has a delay between iterations.
  This patch optimizes this by reusing already existing
  decorator @action_queue.process to make RPC calls to convey
  subworkflow results outside of a DB transaction, similar
  way as we schedule action runs after completion of a task.
  The main reason for making this change is how Scheduler now
  works in HA mode. In fact, it doesn't scale well because
  every Scheduler instance keeps quering DB for delayed calls
  eligible for processing and hence in HA setup many Schedulers
  take same delayed calls often and clash between each other
  causing DB deadlocks in mysql. They are caused just by mysql
  locking model (it's documented in their docs) so we have
  means to handle them. However, Scheduler still remans a
  bottleneck in the system and it's better to reduce the load
  on it as much as possible.
  One more reason to make this change is that we don't solve
  the problem of eleminating the possibility to loose RPC
  messages (when a DB TX is committed and RPC calls is not made
  yet) with Scheduler anyway. If we use Scheduler for scheduling
  RPC calls we just shift the place where we can unsync DB and
  MQ to the Scheduler. So, in other words, it is a fundamental
  problem of syncing two external data sources which can't be
  naturally enrolled into one distributed transaction.
  Based on our experience or running big workflows we concluded
  that simplication of network protocols gives better results,
  meaning that the less components we use for network
  communications the better. Eventually it increases performance
  and reduces the load on the system and also reduces the
  probability of having DB and MQ out of sync.
  We used to use Scheduler for running actions on executors too by
  scheduling RPC calls but at some point we saw that it reduces
  performance on 40-50% without bringing any real benefits at
  this expense. The opposite way, Scheduler was even a worse
  bottleneck because of this. So we decided to eliminate the
  Scheduler from this chain and the system became practically
  much more performant and reliable. So now I did the same
  with delivering a subworkflow result.
  I believe when it comes to recovering from situations of
  DB and MQ being out of sync we need to come up with special
  tools that will assume some minimal human intervention
  (although I think we can recover some things automatically).
  Such a tool should just make it very obvious what's broken
  and how to fix it, and make it convenient to fix it (restart
  a task/action etc.).
* Processing action queue now happens within a new greenthread
  because otherwise Mistral engine can get into a deadlock
  by sending a request to itself while processing another one.
  It can happen if we use blocking RPC which is the only option
  for now.
* Other small fixes

Change-Id: Ic3cf6c47bba215dc6a13944b0585cce59e4e88f9
This commit is contained in:
Renat Akhmerov 2017-10-03 16:31:44 +07:00
parent ea0c07cac1
commit 14c8d807b1
10 changed files with 121 additions and 74 deletions

View File

@ -13,16 +13,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import eventlet
import functools
from oslo_config import cfg
from mistral import context
from mistral.executors import base as exe
from mistral.rpc import clients as rpc
from mistral import utils
_THREAD_LOCAL_NAME = "__action_queue_thread_local"
# Action queue operations.
_RUN_ACTION = "run_action"
_ON_ACTION_COMPLETE = "on_action_complete"
def _prepare():
utils.set_thread_local(_THREAD_LOCAL_NAME, list())
@ -45,18 +52,29 @@ def _get_queue():
return queue
def _run_actions():
def _process_queue(queue):
executor = exe.get_executor(cfg.CONF.executor.type)
for action_ex, action_def, target in _get_queue():
executor.run_action(
action_ex.id,
action_def.action_class,
action_def.attributes or {},
action_ex.input,
action_ex.runtime_context.get('safe_rerun', False),
target=target
)
for operation, args in queue:
if operation == _RUN_ACTION:
action_ex, action_def, target = args
executor.run_action(
action_ex.id,
action_def.action_class,
action_def.attributes or {},
action_ex.input,
action_ex.runtime_context.get('safe_rerun', False),
target=target
)
elif operation == _ON_ACTION_COMPLETE:
action_ex_id, result, wf_action = args
rpc.get_engine_client().on_action_complete(
action_ex_id,
result,
wf_action
)
def process(func):
@ -73,7 +91,26 @@ def process(func):
try:
res = func(*args, **kw)
_run_actions()
queue = _get_queue()
auth_ctx = context.ctx() if context.has_ctx() else None
# NOTE(rakhmerov): Since we make RPC calls to the engine itself
# we need to process the action queue asynchronously in a new
# thread. Otherwise, if we have one engine process the engine
# will may send a request to itself while already processing
# another one. In conjunction with blocking RPC it will lead
# to a deadlock (and RPC timeout).
def _within_new_thread():
old_auth_ctx = context.ctx() if context.has_ctx() else None
context.set_ctx(auth_ctx)
try:
_process_queue(queue)
finally:
context.set_ctx(old_auth_ctx)
eventlet.spawn(_within_new_thread)
finally:
_clear()
@ -82,5 +119,11 @@ def process(func):
return decorate
def schedule(action_ex, action_def, target):
_get_queue().append((action_ex, action_def, target))
def schedule_run_action(action_ex, action_def, target):
_get_queue().append((_RUN_ACTION, (action_ex, action_def, target)))
def schedule_on_action_complete(action_ex_id, result, wf_action=False):
_get_queue().append(
(_ON_ACTION_COMPLETE, (action_ex_id, result, wf_action))
)

View File

@ -248,7 +248,11 @@ class PythonAction(Action):
action_ex_id=action_ex_id
)
action_queue.schedule(self.action_ex, self.action_def, target)
action_queue.schedule_run_action(
self.action_ex,
self.action_def,
target
)
@profiler.trace('action-run', hide_args=True)
def run(self, input_dict, target, index=0, desc='', save=True,

View File

@ -133,6 +133,7 @@ class DefaultEngine(base.Engine):
return action_ex.get_clone()
@action_queue.process
def pause_workflow(self, wf_ex_id):
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex_id)
@ -161,6 +162,7 @@ class DefaultEngine(base.Engine):
return wf_ex.get_clone()
@action_queue.process
def stop_workflow(self, wf_ex_id, state, message=None):
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex_id)

View File

@ -60,7 +60,7 @@ class EngineServer(service_base.MistralService):
self._rpc_server.register_endpoint(self)
# Note(ddeja): Engine needs to be run in default (blocking) mode
# since using another mode may leads to deadlock.
# since using another mode may lead to a deadlock.
# See https://review.openstack.org/#/c/356343 for more info.
self._rpc_server.run(executor='blocking')

View File

@ -18,12 +18,12 @@ from osprofiler import profiler
import traceback as tb
from mistral.db.v2 import api as db_api
from mistral.engine import action_queue
from mistral.engine import workflows
from mistral import exceptions as exc
from mistral.services import scheduler
from mistral.workflow import states
LOG = logging.getLogger(__name__)
@ -82,6 +82,7 @@ def cancel_workflow(wf_ex, msg=None):
stop_workflow(wf_ex, states.CANCELLED, msg)
@action_queue.process
@profiler.trace('workflow-handler-check-and-complete', hide_args=True)
def _check_and_complete(wf_ex_id):
# Note: This method can only be called via scheduler.

View File

@ -21,12 +21,11 @@ import six
from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models as db_models
from mistral.engine import action_queue
from mistral.engine import dispatcher
from mistral.engine import utils as engine_utils
from mistral import exceptions as exc
from mistral.lang import parser as spec_parser
from mistral.rpc import clients as rpc
from mistral.services import scheduler
from mistral.services import triggers
from mistral.services import workflows as wf_service
from mistral import utils
@ -42,10 +41,6 @@ from mistral_lib import actions as ml_actions
LOG = logging.getLogger(__name__)
_SEND_RESULT_TO_PARENT_WORKFLOW_PATH = (
'mistral.engine.workflows._send_result_to_parent_workflow'
)
@six.add_metaclass(abc.ABCMeta)
class Workflow(object):
@ -118,12 +113,11 @@ class Workflow(object):
assert self.wf_ex
if state == states.SUCCESS:
return self._succeed_workflow(self._get_final_context(), msg)
self._succeed_workflow(self._get_final_context(), msg)
elif state == states.ERROR:
return self._fail_workflow(self._get_final_context(), msg)
self._fail_workflow(self._get_final_context(), msg)
elif state == states.CANCELLED:
return self._cancel_workflow(msg)
self._cancel_workflow(msg)
def pause(self, msg=None):
"""Pause workflow.
@ -379,6 +373,7 @@ class Workflow(object):
else:
msg = _build_fail_info_message(wf_ctrl, self.wf_ex)
final_context = wf_ctrl.evaluate_workflow_final_context()
self._fail_workflow(final_context, msg)
return 0
@ -394,13 +389,14 @@ class Workflow(object):
self.set_state(states.SUCCESS, msg)
if self.wf_ex.task_execution_id:
self._schedule_send_result_to_parent_workflow()
self._send_result_to_parent_workflow()
def _fail_workflow(self, final_context, msg):
if states.is_paused_or_completed(self.wf_ex.state):
return
output_on_error = {}
try:
output_on_error = data_flow.evaluate_workflow_output(
self.wf_ex,
@ -427,7 +423,7 @@ class Workflow(object):
self.wf_ex.output = merge_dicts({'result': msg}, output_on_error)
if self.wf_ex.task_execution_id:
self._schedule_send_result_to_parent_workflow()
self._send_result_to_parent_workflow()
def _cancel_workflow(self, msg):
if states.is_completed(self.wf_ex.state):
@ -445,14 +441,35 @@ class Workflow(object):
self.wf_ex.output = {'result': msg}
if self.wf_ex.task_execution_id:
self._schedule_send_result_to_parent_workflow()
self._send_result_to_parent_workflow()
def _schedule_send_result_to_parent_workflow(self):
scheduler.schedule_call(
None,
_SEND_RESULT_TO_PARENT_WORKFLOW_PATH,
0,
wf_ex_id=self.wf_ex.id
def _send_result_to_parent_workflow(self):
if self.wf_ex.state == states.SUCCESS:
result = ml_actions.Result(data=self.wf_ex.output)
elif self.wf_ex.state == states.ERROR:
err_msg = (
self.wf_ex.state_info or
'Failed subworkflow [execution_id=%s]' % self.wf_ex.id
)
result = ml_actions.Result(error=err_msg)
elif self.wf_ex.state == states.CANCELLED:
err_msg = (
self.wf_ex.state_info or
'Cancelled subworkflow [execution_id=%s]' % self.wf_ex.id
)
result = ml_actions.Result(error=err_msg, cancel=True)
else:
raise RuntimeError(
"Method _send_result_to_parent_workflow() must never be called"
" if a workflow is not in SUCCESS, ERROR or CANCELLED state."
)
action_queue.schedule_on_action_complete(
self.wf_ex.id,
result,
wf_action=True
)
@ -478,41 +495,6 @@ def _get_environment(params):
)
def _send_result_to_parent_workflow(wf_ex_id):
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex_id)
wf_output = wf_ex.output
if wf_ex.state == states.SUCCESS:
result = ml_actions.Result(data=wf_output)
elif wf_ex.state == states.ERROR:
err_msg = (
wf_ex.state_info or
'Failed subworkflow [execution_id=%s]' % wf_ex.id
)
result = ml_actions.Result(error=err_msg)
elif wf_ex.state == states.CANCELLED:
err_msg = (
wf_ex.state_info or
'Cancelled subworkflow [execution_id=%s]' % wf_ex.id
)
result = ml_actions.Result(error=err_msg, cancel=True)
else:
raise RuntimeError(
"Method _send_result_to_parent_workflow() must never be called"
" if a workflow is not in SUCCESS, ERROR or CANCELLED state."
)
rpc.get_engine_client().on_action_complete(
wf_ex.id,
result,
wf_action=True
)
def _build_fail_info_message(wf_ctrl, wf_ex):
# Try to find where error is exactly.
failed_tasks = sorted(

View File

@ -15,7 +15,6 @@
from oslo_config import cfg
from oslo_log import log as logging
from mistral.rpc import base as rpc_base
from mistral.rpc import clients as rpc_clients
@ -26,5 +25,4 @@ class RemoteExecutor(rpc_clients.ExecutorClient):
"""Executor that passes execution request to a remote executor."""
def __init__(self):
self.topic = cfg.CONF.executor.topic
self._client = rpc_base.get_rpc_client_driver()(cfg.CONF.executor)
super(RemoteExecutor, self).__init__(cfg.CONF.executor)

View File

@ -67,16 +67,22 @@ class EngineTestCase(base.DbTestCase):
# Start remote executor.
if cfg.CONF.executor.type == 'remote':
LOG.info("Starting remote executor threads...")
self.executor_client = rpc_clients.get_executor_client()
exe_svc = executor_server.get_oslo_service(setup_profiler=False)
self.executor = exe_svc.executor
self.threads.append(eventlet.spawn(launch_service, exe_svc))
self.addCleanup(exe_svc.stop, True)
# Start engine.
LOG.info("Starting engine threads...")
self.engine_client = rpc_clients.get_engine_client()
eng_svc = engine_server.get_oslo_service(setup_profiler=False)
self.engine = eng_svc.engine
self.threads.append(eventlet.spawn(launch_service, eng_svc))
self.addCleanup(eng_svc.stop, True)

View File

@ -404,6 +404,7 @@ class SubworkflowsTest(base.EngineTestCase):
with db_api.transaction():
ex = db_api.get_workflow_execution(ex.id)
self.assertIn('not_existing_wf', ex.state_info)
def test_dynamic_subworkflow_with_generic_input(self):
@ -417,10 +418,12 @@ class SubworkflowsTest(base.EngineTestCase):
wf_identifier='wb4.wf1',
wf_input={'wf_name': 'wf2', 'inp': 'invalid_string_input'}
)
self.await_workflow_error(ex.id)
with db_api.transaction():
ex = db_api.get_workflow_execution(ex.id)
self.assertIn('invalid_string_input', ex.state_info)
def _test_dynamic_workflow_with_dict_param(self, wf_identifier):
@ -428,17 +431,21 @@ class SubworkflowsTest(base.EngineTestCase):
wf_identifier=wf_identifier,
wf_input={'wf_name': 'wf2', 'inp': {'inp': 'abc'}}
)
self.await_workflow_success(ex.id)
with db_api.transaction():
ex = db_api.get_workflow_execution(ex.id)
self.assertEqual({'sub_wf_out': 'abc'}, ex.output)
def test_subworkflow_root_execution_id(self):
wf1_ex = self.engine.start_workflow('wb6.wf1', '', None)
self.engine.start_workflow('wb6.wf1', '', None)
self._await(lambda: len(db_api.get_workflow_executions()) == 3, 0.5, 5)
wf_execs = db_api.get_workflow_executions()
wf1_ex = self._assert_single_item(wf_execs, name='wb6.wf1')
wf2_ex = self._assert_single_item(wf_execs, name='wb6.wf2')
wf3_ex = self._assert_single_item(wf_execs, name='wb6.wf3')

View File

@ -87,7 +87,9 @@ class LocalExecutorTestCase(base.ExecutorTestCase):
"""
wb_svc.create_workbook_v2(wb_def)
wf_ex = self.engine.start_workflow('wb1.wf1', '', {})
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
@ -144,7 +146,9 @@ class LocalExecutorTestCase(base.ExecutorTestCase):
"""
wb_svc.create_workbook_v2(wb_def)
wf_ex = self.engine.start_workflow('wb1.wf1', '', {})
self.await_workflow_success(wf_ex.id)
with db_api.transaction():