Merge "A mechanism to close stuck running action executions"

This commit is contained in:
Zuul 2018-06-17 08:42:56 +00:00 committed by Gerrit Code Review
commit e6aa24b9a7
17 changed files with 477 additions and 29 deletions

View File

@ -131,6 +131,43 @@ directory.
For more details see `policy.json file
<https://docs.openstack.org/oslo.policy/latest/admin/policy-json-file.html>`_.
#. Modify the action execution reporting configuration if needed.
It is possible that actions stuck in *"RUNNING"* state, for example if the
assigned executor dies or the message that signals the completion of the
action is lost. This section describes a heartbeat based solution to close
these forgotten action executions. The related configuration options are
``max_missed_heartbeats`` and ``check_interval``. Note that if either
of these options are *"0"* then the feature won't be enabled.
The default configuration is the following::
[action_heartbeat]
max_missed_heartbeats = 15
check_interval = 20
first_heartbeat_timeout = 3600
*"check_interval = 20"*, so check action executions every
20 seconds. When the checker runs it will transit all running action
executions to error if the last heartbeat received is older than *"20 \*
15"* seconds. Note that *"first_heartbeat_timeout = 3600"*, so the action
execution won't be closed for 3600 seconds if no heartbeat was received for
it.
- **max_missed_heartbeats**
Defines the maximum amount of missed heartbeats to be allowed. If the number
of missed heartbeats exceeds this number, then the related action execution
will be transited to *"ERROR"* state with cause *"Heartbeat wasn't received."*.
- **check_interval**
The interval between checks (in seconds).
- **first_heartbeat_timeout**
The grace period for the first heartbeat (in seconds).
#. Finally, try to run mistral engine and verify that it is running without
any error::

View File

@ -337,7 +337,7 @@ execution_expiration_policy_opts = [
'evaluation_interval',
help=_('How often will the executions be evaluated '
'(in minutes). For example for value 120 the interval '
'will be 2 hours (every 2 hours).'
'will be 2 hours (every 2 hours). '
'Note that only final state executions will be removed: '
'( SUCCESS / ERROR / CANCELLED ).')
),
@ -351,12 +351,12 @@ execution_expiration_policy_opts = [
cfg.IntOpt(
'max_finished_executions',
default=0,
help=_('The maximum number of finished workflow executions'
'to be stored. For example when max_finished_executions = 100,'
'only the 100 latest finished executions will be preserved.'
'This means that even unexpired executions are eligible'
'for deletion, to decrease the number of executions in the'
'database. The default value is 0. If it is set to 0,'
help=_('The maximum number of finished workflow executions '
'to be stored. For example when max_finished_executions = 100, '
'only the 100 latest finished executions will be preserved. '
'This means that even unexpired executions are eligible '
'for deletion, to decrease the number of executions in the '
'database. The default value is 0. If it is set to 0, '
'this constraint won\'t be applied.')
),
cfg.IntOpt(
@ -364,11 +364,44 @@ execution_expiration_policy_opts = [
default=0,
help=_('Size of batch of expired executions to be deleted.'
'The default value is 0. If it is set to 0, '
'size of batch is total number of expired executions'
'size of batch is total number of expired executions '
'that is going to be deleted.')
)
]
action_heartbeat_opts = [
cfg.IntOpt(
'max_missed_heartbeats',
min=0,
default=15,
help=_('The maximum amount of missed heartbeats to be allowed. '
'If set to 0 then this feature won\'t be enabled. '
'See check_interval for more details.')
),
cfg.IntOpt(
'check_interval',
min=0,
default=20,
help=_('How often the action executions are checked (in seconds). '
'For example when check_interval = 10, check action '
'executions every 10 seconds. When the checker runs it will '
'transit all running action executions to error if the last '
'heartbeat received is older than 10 * max_missed_heartbeats '
'seconds. If set to 0 then this feature won\'t be enabled.')
),
cfg.IntOpt(
'first_heartbeat_timeout',
min=0,
default=3600,
help=_('The first heartbeat is handled differently, to provide a '
'grace period in case there is no available executor to handle '
'the action execution. For example when '
'first_heartbeat_timeout = 3600, wait 3600 seconds before '
'closing the action executions that never received a heartbeat.'
)
)
]
coordination_opts = [
cfg.StrOpt(
'backend_url',
@ -514,6 +547,7 @@ NOTIFIER_GROUP = 'notifier'
PECAN_GROUP = 'pecan'
COORDINATION_GROUP = 'coordination'
EXECUTION_EXPIRATION_POLICY_GROUP = 'execution_expiration_policy'
ACTION_HEARTBEAT_GROUP = 'action_heartbeat'
PROFILER_GROUP = profiler.list_opts()[0][0]
KEYCLOAK_OIDC_GROUP = "keycloak_oidc"
OPENSTACK_ACTIONS_GROUP = 'openstack_actions'
@ -536,6 +570,10 @@ CONF.register_opts(
execution_expiration_policy_opts,
group=EXECUTION_EXPIRATION_POLICY_GROUP
)
CONF.register_opts(
action_heartbeat_opts,
group=ACTION_HEARTBEAT_GROUP
)
CONF.register_opts(event_engine_opts, group=EVENT_ENGINE_GROUP)
CONF.register_opts(notifier_opts, group=NOTIFIER_GROUP)
CONF.register_opts(pecan_opts, group=PECAN_GROUP)
@ -591,6 +629,7 @@ def list_opts():
(KEYCLOAK_OIDC_GROUP, keycloak_oidc_opts),
(OPENSTACK_ACTIONS_GROUP, openstack_actions_opts),
(YAQL_GROUP, yaql_opts),
(ACTION_HEARTBEAT_GROUP, action_heartbeat_opts),
(None, default_group_opts)
]

View File

@ -0,0 +1,51 @@
# Copyright 2018 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Add last_heartbeat to action execution
Revision ID: 027
Revises: 026
Create Date: 2018-09-05 16:49:50.342349
"""
# revision identifiers, used by Alembic.
revision = '027'
down_revision = '026'
from alembic import op
import datetime
from mistral import utils
from oslo_config import cfg
from sqlalchemy import Column, DateTime, Boolean
CONF = cfg.CONF
def upgrade():
op.add_column(
'action_executions_v2',
Column(
'last_heartbeat',
DateTime,
default=lambda: utils.utc_now_sec() + datetime.timedelta(
seconds=CONF.action_heartbeat.first_heartbeat_timeout
)
)
)
op.add_column(
'action_executions_v2',
Column('is_sync', Boolean, default=None, nullable=True)
)

View File

@ -211,8 +211,8 @@ def delete_action_definitions(**kwargs):
# Action executions.
def get_action_execution(id, fields=()):
return IMPL.get_action_execution(id, fields=fields)
def get_action_execution(id, fields=(), insecure=False):
return IMPL.get_action_execution(id, fields=fields, insecure=insecure)
def load_action_execution(name, fields=()):
@ -228,8 +228,8 @@ def create_action_execution(values):
return IMPL.create_action_execution(values)
def update_action_execution(id, values):
return IMPL.update_action_execution(id, values)
def update_action_execution(id, values, insecure=False):
return IMPL.update_action_execution(id, values, insecure)
def create_or_update_action_execution(id, values):
@ -413,6 +413,10 @@ def get_expired_executions(expiration_time, limit=None, columns=()):
)
def get_running_expired_sync_actions(expiration_time, session=None):
return IMPL.get_running_expired_sync_actions(expiration_time)
def get_superfluous_executions(max_finished_executions, limit=None,
columns=()):
return IMPL.get_superfluous_executions(

View File

@ -669,8 +669,9 @@ def delete_action_definitions(session=None, **kwargs):
# Action executions.
@b.session_aware()
def get_action_execution(id, fields=(), session=None):
a_ex = _get_db_object_by_id(models.ActionExecution, id, columns=fields)
def get_action_execution(id, insecure=False, fields=(), session=None):
a_ex = _get_db_object_by_id(models.ActionExecution, id, insecure=insecure,
columns=fields)
if not a_ex:
raise exc.DBEntityNotFoundError(
@ -707,8 +708,8 @@ def create_action_execution(values, session=None):
@b.session_aware()
def update_action_execution(id, values, session=None):
a_ex = get_action_execution(id)
def update_action_execution(id, values, insecure=False, session=None):
a_ex = get_action_execution(id, insecure)
a_ex.update(values.copy())
@ -1098,6 +1099,18 @@ def get_expired_executions(expiration_time, limit=None, columns=(),
return query.all()
@b.session_aware()
def get_running_expired_sync_actions(expiration_time, session=None):
query = b.model_query(models.ActionExecution)
query = query.filter(
models.ActionExecution.last_heartbeat < expiration_time
)
query = query.filter_by(is_sync=True)
query = query.filter(models.ActionExecution.state == states.RUNNING)
return query.all()
@b.session_aware()
def get_superfluous_executions(max_finished_executions, limit=None, columns=(),
session=None):

View File

@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import hashlib
import json
import sys
@ -33,6 +34,7 @@ from mistral import utils
# Definition objects.
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
@ -195,6 +197,13 @@ class ActionExecution(Execution):
accepted = sa.Column(sa.Boolean(), default=False)
input = sa.Column(st.JsonLongDictType(), nullable=True)
output = sa.orm.deferred(sa.Column(st.JsonLongDictType(), nullable=True))
last_heartbeat = sa.Column(
sa.DateTime,
default=lambda: utils.utc_now_sec() + datetime.timedelta(
seconds=CONF.action_heartbeat.first_heartbeat_timeout
)
)
is_sync = sa.Column(sa.Boolean(), default=None, nullable=True)
class WorkflowExecution(Execution):

View File

@ -150,7 +150,7 @@ class Action(object):
"""
return True
def _create_action_execution(self, input_dict, runtime_ctx,
def _create_action_execution(self, input_dict, runtime_ctx, is_sync,
desc='', action_ex_id=None):
action_ex_id = action_ex_id or utils.generate_unicode_uuid()
@ -161,7 +161,8 @@ class Action(object):
'state': states.RUNNING,
'input': input_dict,
'runtime_context': runtime_ctx,
'description': desc
'description': desc,
'is_sync': is_sync
}
if self.task_ex:
@ -246,6 +247,7 @@ class PythonAction(Action):
self._create_action_execution(
self._prepare_input(input_dict),
self._prepare_runtime_context(index, safe_rerun),
self.is_sync(input_dict),
desc=desc,
action_ex_id=action_ex_id
)
@ -278,6 +280,7 @@ class PythonAction(Action):
self._create_action_execution(
input_dict,
runtime_ctx,
self.is_sync(input_dict),
desc=desc,
action_ex_id=action_ex_id
)

View File

@ -131,6 +131,14 @@ class Engine(object):
"""
raise NotImplementedError
@abc.abstractmethod
def report_running_actions(self, action_ex_ids):
"""Receives the heartbeat about the running actions.
:param action_ex_ids: The action execution ids.
"""
raise NotImplementedError
@six.add_metaclass(abc.ABCMeta)
class TaskPolicy(object):

View File

@ -16,6 +16,7 @@
# limitations under the License.
from oslo_config import cfg
from oslo_log import log as logging
from osprofiler import profiler
from mistral.db import utils as db_utils
@ -34,6 +35,8 @@ from mistral.workflow import states
# options required at top level of this __init__.py are not imported before
# the submodules are referenced.
LOG = logging.getLogger(__name__)
class DefaultEngine(base.Engine):
@db_utils.retry_on_db_error
@ -122,6 +125,7 @@ class DefaultEngine(base.Engine):
'input': action_input,
'output': output.to_dict(),
'state': state,
'is_sync': is_action_sync
}
return db_api.create_action_execution(values)
@ -201,3 +205,22 @@ class DefaultEngine(base.Engine):
def rollback_workflow(self, wf_ex_id):
# TODO(rakhmerov): Implement.
raise NotImplementedError
@db_utils.retry_on_db_error
@action_queue.process
def report_running_actions(self, action_ex_ids):
with db_api.transaction():
now = u.utc_now_sec()
for exec_id in action_ex_ids:
try:
db_api.update_action_execution(
exec_id,
{"last_heartbeat": now},
insecure=True
)
except exceptions.DBEntityNotFoundError:
LOG.debug("Action execution heartbeat update failed. {}"
.format(exec_id), exc_info=True)
# Ignore this error and continue with the
# remaining ids.
pass

View File

@ -19,6 +19,7 @@ from mistral.db.v2 import api as db_api
from mistral.engine import default_engine
from mistral.rpc import base as rpc
from mistral.service import base as service_base
from mistral.services import action_execution_checker
from mistral.services import expiration_policy
from mistral.services import scheduler
from mistral import utils
@ -50,6 +51,7 @@ class EngineServer(service_base.MistralService):
self._scheduler = scheduler.start()
self._expiration_policy_tg = expiration_policy.setup()
action_execution_checker.setup()
if self._setup_profiler:
profiler_utils.setup('mistral-engine', cfg.CONF.engine.host)
@ -258,6 +260,19 @@ class EngineServer(service_base.MistralService):
return self.engine.rollback_workflow(wf_ex_id)
def report_running_actions(self, rpc_ctx, action_ex_ids):
"""Receives calls over RPC to receive action execution heartbeats.
:param rpc_ctx: RPC request context.
:param action_ex_ids: Action execution ids.
"""
LOG.info(
"Received RPC request 'report_running_actions'[action_ex_ids=%s]",
action_ex_ids
)
return self.engine.report_running_actions(action_ex_ids)
def get_oslo_service(setup_profiler=True):
return EngineServer(

View File

@ -18,9 +18,11 @@ from mistral import config as cfg
from mistral.executors import default_executor as exe
from mistral.rpc import base as rpc
from mistral.service import base as service_base
from mistral.services import action_execution_reporter
from mistral import utils
from mistral.utils import profiler as profiler_utils
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
@ -37,10 +39,15 @@ class ExecutorServer(service_base.MistralService):
self.executor = executor
self._rpc_server = None
self._reporter = None
self._aer = None
def start(self):
super(ExecutorServer, self).start()
self._aer = action_execution_reporter.ActionExecutionReporter(CONF)
self._reporter = action_execution_reporter.setup(self._aer)
if self._setup_profiler:
profiler_utils.setup('mistral-executor', cfg.CONF.executor.host)
@ -56,6 +63,9 @@ class ExecutorServer(service_base.MistralService):
def stop(self, graceful=False):
super(ExecutorServer, self).stop(graceful)
if self._reporter:
self._reporter.stop(graceful)
if self._rpc_server:
self._rpc_server.stop(graceful)
@ -90,16 +100,21 @@ class ExecutorServer(service_base.MistralService):
redelivered = rpc_ctx.redelivered or False
return self.executor.run_action(
action_ex_id,
action_cls_str,
action_cls_attrs,
params,
safe_rerun,
execution_context,
redelivered,
timeout=timeout
)
try:
self._aer.add_action_ex_id(action_ex_id)
return self.executor.run_action(
action_ex_id,
action_cls_str,
action_cls_attrs,
params,
safe_rerun,
execution_context,
redelivered,
timeout=timeout
)
finally:
self._aer.remove_action_ex_id(action_ex_id)
def get_oslo_service(setup_profiler=True):

View File

@ -320,6 +320,18 @@ class EngineClient(eng.Engine):
wf_ex_id=wf_ex_id
)
@base.wrap_messaging_exception
def report_running_actions(self, action_ex_ids):
"""Receives action execution heartbeats.
:param action_ex_ids: Action execution ids.
"""
return self._client.async_call(
auth_ctx.ctx(),
'report_running_actions',
action_ex_ids=action_ex_ids
)
class ExecutorClient(exe.Executor):
"""RPC Executor client."""

View File

@ -0,0 +1,83 @@
# Copyright 2018 Nokia Networks.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
from mistral.db import utils as db_utils
from mistral.db.v2 import api as db_api
from mistral.engine import action_handler
from mistral.engine import action_queue
from mistral.services import scheduler
from mistral import utils
from mistral_lib import actions as mistral_lib
from oslo_config import cfg
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
SCHEDULER_KEY = 'handle_expired_actions_key'
@db_utils.retry_on_db_error
@action_queue.process
def handle_expired_actions():
LOG.debug("Running heartbeat checker...")
try:
interval = CONF.action_heartbeat.check_interval
max_missed = CONF.action_heartbeat.max_missed_heartbeats
exp_date = utils.utc_now_sec() - datetime.timedelta(
seconds=max_missed * interval
)
with db_api.transaction():
action_exs = db_api.get_running_expired_sync_actions(exp_date)
LOG.debug("Found {} running and expired actions.".format(
len(action_exs))
)
if action_exs:
LOG.info("Actions executions to transit to error, because "
"heartbeat wasn't received: {}".format(action_exs))
for action_ex in action_exs:
result = mistral_lib.Result(
error="Heartbeat wasn't received."
)
action_handler.on_action_complete(action_ex, result)
finally:
schedule(interval)
def setup():
interval = CONF.action_heartbeat.check_interval
max_missed = CONF.action_heartbeat.max_missed_heartbeats
enabled = interval and max_missed
if not enabled:
LOG.info("Action heartbeat reporting disabled.")
return
wait_time = interval * max_missed
LOG.debug("First run of action execution checker, wait before "
"checking to make sure executors have time to send "
"heartbeats. ({} seconds)".format(wait_time))
schedule(wait_time)
def schedule(run_after):
scheduler.schedule_call(
None,
'mistral.services.action_execution_checker.handle_expired_actions',
run_after=run_after,
key=SCHEDULER_KEY
)

View File

@ -0,0 +1,93 @@
# Copyright 2018 Nokia Networks.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import periodic_task
from oslo_service import threadgroup
from mistral import context as auth_ctx
from mistral.rpc import clients as rpc
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class ActionExecutionReporter(periodic_task.PeriodicTasks):
"""The reporter that reports the running action executions."""
def __init__(self, conf):
super(ActionExecutionReporter, self).__init__(conf)
self._engine_client = rpc.get_engine_client()
self._running_actions = set()
self.interval = CONF.action_heartbeat.check_interval
self.max_missed = CONF.action_heartbeat.max_missed_heartbeats
self.enabled = self.interval and self.max_missed
_periodic_task = periodic_task.periodic_task(
spacing=self.interval,
run_immediately=True
)
self.add_periodic_task(
_periodic_task(report)
)
def add_action_ex_id(self, action_ex_id):
# With run-action there is no actions_ex_id assigned
if action_ex_id and self.enabled:
self._engine_client.report_running_actions([action_ex_id])
self._running_actions.add(action_ex_id)
def remove_action_ex_id(self, action_ex_id):
if action_ex_id and self.enabled:
self._running_actions.discard(action_ex_id)
def report(reporter, ctx):
LOG.debug("Running heartbeat reporter...")
if not reporter._running_actions:
return
auth_ctx.set_ctx(ctx)
reporter._engine_client.report_running_actions(reporter._running_actions)
def setup(action_execution_reporter):
interval = CONF.action_heartbeat.check_interval
max_missed = CONF.action_heartbeat.max_missed_heartbeats
enabled = interval and max_missed
if not enabled:
LOG.info("Action heartbeat reporting disabled.")
return None
tg = threadgroup.ThreadGroup()
ctx = auth_ctx.MistralContext(
user=None,
tenant=None,
auth_token=None,
is_admin=True
)
tg.add_dynamic_timer(
action_execution_reporter.run_periodic_tasks,
initial_delay=None,
periodic_interval_max=1,
context=ctx
)
return tg

View File

@ -625,6 +625,41 @@ class DefaultEngineTest(base.DbTestCase):
# TODO(akhmerov): Implement.
pass
def test_report_running_actions(self):
wf_input = {'param1': 'Hey', 'param2': 'Hi'}
# Start workflow.
wf_ex = self.engine.start_workflow(
'wb.wf',
'',
wf_input=wf_input,
description='my execution',
task_name='task2'
)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(1, len(task_execs))
task_ex = task_execs[0]
action_execs = db_api.get_action_executions(
task_execution_id=task_ex.id
)
task_action_ex = action_execs[0]
self.engine.report_running_actions([])
self.engine.report_running_actions([None, None])
self.engine.report_running_actions([None, task_action_ex.id])
task_action_ex = db_api.get_action_execution(task_action_ex.id)
self.assertIsNotNone(task_action_ex.last_heartbeat)
class DefaultEngineWithTransportTest(eng_test_base.EngineTestCase):
def test_engine_client_remote_error(self):

View File

@ -17,6 +17,7 @@ import mock
from oslo_config import cfg
from mistral.actions import std_actions
from mistral import config
from mistral.db.v2 import api as db_api
from mistral import exceptions as exc
from mistral.services import workbooks as wb_service
@ -32,7 +33,9 @@ from mistral_lib import actions as actions_base
# Use the set_default method to set value otherwise in certain test cases
# the change in value is not permanent.
cfg.CONF.set_default('auth_enable', False, group='pecan')
cfg.CONF.set_default('auth_enable', False, group=config.PECAN_GROUP)
cfg.CONF.set_default('max_missed_heartbeats', 0,
group=config.ACTION_HEARTBEAT_GROUP)
WB = """
---

View File

@ -0,0 +1,5 @@
---
features:
- >
[`blueprint action-execution-reporting <https://blueprints.launchpad.net/mistral/+spec/action-execution-reporting>`_]
Introduced a mechanism to close action executions that stuck in RUNNING state.