Merge "A mechanism to close stuck running action executions"
This commit is contained in:
commit
e6aa24b9a7
|
@ -131,6 +131,43 @@ directory.
|
|||
For more details see `policy.json file
|
||||
<https://docs.openstack.org/oslo.policy/latest/admin/policy-json-file.html>`_.
|
||||
|
||||
#. Modify the action execution reporting configuration if needed.
|
||||
|
||||
It is possible that actions get stuck in the *"RUNNING"* state, for example if the
|
||||
assigned executor dies or the message that signals the completion of the
|
||||
action is lost. This section describes a heartbeat based solution to close
|
||||
these forgotten action executions. The related configuration options are
|
||||
``max_missed_heartbeats`` and ``check_interval``. Note that if either
|
||||
of these options is *"0"* then the feature won't be enabled.
|
||||
|
||||
The default configuration is the following::
|
||||
|
||||
[action_heartbeat]
|
||||
max_missed_heartbeats = 15
|
||||
check_interval = 20
|
||||
first_heartbeat_timeout = 3600
|
||||
|
||||
In this example *"check_interval = 20"*, so action executions are checked every
|
||||
20 seconds. When the checker runs it will transition all running action
|
||||
executions to error if the last heartbeat received is older than *"20 \*
|
||||
15"* seconds. Note that *"first_heartbeat_timeout = 3600"*, so the action
|
||||
execution won't be closed for 3600 seconds if no heartbeat was received for
|
||||
it.
|
||||
|
||||
- **max_missed_heartbeats**
|
||||
|
||||
Defines the maximum number of missed heartbeats to be allowed. If the number
|
||||
of missed heartbeats exceeds this number, then the related action execution
|
||||
will be moved to the *"ERROR"* state with the cause *"Heartbeat wasn't received."*.
|
||||
|
||||
- **check_interval**
|
||||
|
||||
The interval between checks (in seconds).
|
||||
|
||||
- **first_heartbeat_timeout**
|
||||
|
||||
The grace period for the first heartbeat (in seconds).
|
||||
|
||||
#. Finally, try to run mistral engine and verify that it is running without
|
||||
any error::
|
||||
|
||||
|
|
|
@ -337,7 +337,7 @@ execution_expiration_policy_opts = [
|
|||
'evaluation_interval',
|
||||
help=_('How often will the executions be evaluated '
|
||||
'(in minutes). For example for value 120 the interval '
|
||||
'will be 2 hours (every 2 hours).'
|
||||
'will be 2 hours (every 2 hours). '
|
||||
'Note that only final state executions will be removed: '
|
||||
'( SUCCESS / ERROR / CANCELLED ).')
|
||||
),
|
||||
|
@ -351,12 +351,12 @@ execution_expiration_policy_opts = [
|
|||
cfg.IntOpt(
|
||||
'max_finished_executions',
|
||||
default=0,
|
||||
help=_('The maximum number of finished workflow executions'
|
||||
'to be stored. For example when max_finished_executions = 100,'
|
||||
'only the 100 latest finished executions will be preserved.'
|
||||
'This means that even unexpired executions are eligible'
|
||||
'for deletion, to decrease the number of executions in the'
|
||||
'database. The default value is 0. If it is set to 0,'
|
||||
help=_('The maximum number of finished workflow executions '
|
||||
'to be stored. For example when max_finished_executions = 100, '
|
||||
'only the 100 latest finished executions will be preserved. '
|
||||
'This means that even unexpired executions are eligible '
|
||||
'for deletion, to decrease the number of executions in the '
|
||||
'database. The default value is 0. If it is set to 0, '
|
||||
'this constraint won\'t be applied.')
|
||||
),
|
||||
cfg.IntOpt(
|
||||
|
@ -364,11 +364,44 @@ execution_expiration_policy_opts = [
|
|||
default=0,
|
||||
help=_('Size of batch of expired executions to be deleted.'
|
||||
'The default value is 0. If it is set to 0, '
|
||||
'size of batch is total number of expired executions'
|
||||
'size of batch is total number of expired executions '
|
||||
'that is going to be deleted.')
|
||||
)
|
||||
]
|
||||
|
||||
# Options of the "action_heartbeat" group. Each of them is a non-negative
# integer (min=0); the help texts below state that a value of 0 for
# max_missed_heartbeats or check_interval leaves the feature disabled.
action_heartbeat_opts = [
    cfg.IntOpt(
        'max_missed_heartbeats',
        default=15,
        min=0,
        help=_(
            "The maximum amount of missed heartbeats to be allowed. "
            "If set to 0 then this feature won't be enabled. "
            "See check_interval for more details."
        )
    ),
    cfg.IntOpt(
        'check_interval',
        default=20,
        min=0,
        help=_(
            "How often the action executions are checked (in seconds). "
            "For example when check_interval = 10, check action "
            "executions every 10 seconds. When the checker runs it will "
            "transit all running action executions to error if the last "
            "heartbeat received is older than 10 * max_missed_heartbeats "
            "seconds. If set to 0 then this feature won't be enabled."
        )
    ),
    cfg.IntOpt(
        'first_heartbeat_timeout',
        default=3600,
        min=0,
        help=_(
            "The first heartbeat is handled differently, to provide a "
            "grace period in case there is no available executor to handle "
            "the action execution. For example when "
            "first_heartbeat_timeout = 3600, wait 3600 seconds before "
            "closing the action executions that never received a heartbeat."
        )
    ),
]
|
||||
|
||||
coordination_opts = [
|
||||
cfg.StrOpt(
|
||||
'backend_url',
|
||||
|
@ -514,6 +547,7 @@ NOTIFIER_GROUP = 'notifier'
|
|||
PECAN_GROUP = 'pecan'
|
||||
COORDINATION_GROUP = 'coordination'
|
||||
EXECUTION_EXPIRATION_POLICY_GROUP = 'execution_expiration_policy'
|
||||
ACTION_HEARTBEAT_GROUP = 'action_heartbeat'
|
||||
PROFILER_GROUP = profiler.list_opts()[0][0]
|
||||
KEYCLOAK_OIDC_GROUP = "keycloak_oidc"
|
||||
OPENSTACK_ACTIONS_GROUP = 'openstack_actions'
|
||||
|
@ -536,6 +570,10 @@ CONF.register_opts(
|
|||
execution_expiration_policy_opts,
|
||||
group=EXECUTION_EXPIRATION_POLICY_GROUP
|
||||
)
|
||||
CONF.register_opts(
|
||||
action_heartbeat_opts,
|
||||
group=ACTION_HEARTBEAT_GROUP
|
||||
)
|
||||
CONF.register_opts(event_engine_opts, group=EVENT_ENGINE_GROUP)
|
||||
CONF.register_opts(notifier_opts, group=NOTIFIER_GROUP)
|
||||
CONF.register_opts(pecan_opts, group=PECAN_GROUP)
|
||||
|
@ -591,6 +629,7 @@ def list_opts():
|
|||
(KEYCLOAK_OIDC_GROUP, keycloak_oidc_opts),
|
||||
(OPENSTACK_ACTIONS_GROUP, openstack_actions_opts),
|
||||
(YAQL_GROUP, yaql_opts),
|
||||
(ACTION_HEARTBEAT_GROUP, action_heartbeat_opts),
|
||||
(None, default_group_opts)
|
||||
]
|
||||
|
||||
|
|
|
@ -0,0 +1,51 @@
|
|||
# Copyright 2018 OpenStack Foundation.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""Add last_heartbeat to action execution
|
||||
|
||||
Revision ID: 027
|
||||
Revises: 026
|
||||
Create Date: 2018-09-05 16:49:50.342349
|
||||
|
||||
"""
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '027'
|
||||
down_revision = '026'
|
||||
|
||||
from alembic import op
|
||||
import datetime
|
||||
from mistral import utils
|
||||
from oslo_config import cfg
|
||||
from sqlalchemy import Column, DateTime, Boolean
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
def upgrade():
    """Add the heartbeat related columns to ``action_executions_v2``.

    Two columns are added, in this order: ``last_heartbeat`` (DateTime)
    and ``is_sync`` (nullable Boolean, default None).
    """
    table_name = 'action_executions_v2'

    def _first_expected_heartbeat():
        # The default is "now" shifted forward by the configured
        # first_heartbeat_timeout (in seconds).
        now = utils.utc_now_sec()

        return now + datetime.timedelta(
            seconds=CONF.action_heartbeat.first_heartbeat_timeout
        )

    last_heartbeat_col = Column(
        'last_heartbeat',
        DateTime,
        default=_first_expected_heartbeat
    )
    op.add_column(table_name, last_heartbeat_col)

    is_sync_col = Column('is_sync', Boolean, default=None, nullable=True)
    op.add_column(table_name, is_sync_col)
|
|
@ -211,8 +211,8 @@ def delete_action_definitions(**kwargs):
|
|||
|
||||
# Action executions.
|
||||
|
||||
def get_action_execution(id, fields=()):
|
||||
return IMPL.get_action_execution(id, fields=fields)
|
||||
def get_action_execution(id, fields=(), insecure=False):
|
||||
return IMPL.get_action_execution(id, fields=fields, insecure=insecure)
|
||||
|
||||
|
||||
def load_action_execution(name, fields=()):
|
||||
|
@ -228,8 +228,8 @@ def create_action_execution(values):
|
|||
return IMPL.create_action_execution(values)
|
||||
|
||||
|
||||
def update_action_execution(id, values):
|
||||
return IMPL.update_action_execution(id, values)
|
||||
def update_action_execution(id, values, insecure=False):
|
||||
return IMPL.update_action_execution(id, values, insecure)
|
||||
|
||||
|
||||
def create_or_update_action_execution(id, values):
|
||||
|
@ -413,6 +413,10 @@ def get_expired_executions(expiration_time, limit=None, columns=()):
|
|||
)
|
||||
|
||||
|
||||
def get_running_expired_sync_actions(expiration_time, session=None):
    """Delegate to the same-named function of the DB implementation.

    :param expiration_time: Handed over to the implementation unchanged.
    :param session: Not used by this wrapper; it is not forwarded to the
        implementation.
    :return: Whatever the implementation returns.
    """
    expired_action_exs = IMPL.get_running_expired_sync_actions(
        expiration_time
    )

    return expired_action_exs
|
||||
|
||||
|
||||
def get_superfluous_executions(max_finished_executions, limit=None,
|
||||
columns=()):
|
||||
return IMPL.get_superfluous_executions(
|
||||
|
|
|
@ -669,8 +669,9 @@ def delete_action_definitions(session=None, **kwargs):
|
|||
# Action executions.
|
||||
|
||||
@b.session_aware()
|
||||
def get_action_execution(id, fields=(), session=None):
|
||||
a_ex = _get_db_object_by_id(models.ActionExecution, id, columns=fields)
|
||||
def get_action_execution(id, insecure=False, fields=(), session=None):
|
||||
a_ex = _get_db_object_by_id(models.ActionExecution, id, insecure=insecure,
|
||||
columns=fields)
|
||||
|
||||
if not a_ex:
|
||||
raise exc.DBEntityNotFoundError(
|
||||
|
@ -707,8 +708,8 @@ def create_action_execution(values, session=None):
|
|||
|
||||
|
||||
@b.session_aware()
|
||||
def update_action_execution(id, values, session=None):
|
||||
a_ex = get_action_execution(id)
|
||||
def update_action_execution(id, values, insecure=False, session=None):
|
||||
a_ex = get_action_execution(id, insecure)
|
||||
|
||||
a_ex.update(values.copy())
|
||||
|
||||
|
@ -1098,6 +1099,18 @@ def get_expired_executions(expiration_time, limit=None, columns=(),
|
|||
return query.all()
|
||||
|
||||
|
||||
@b.session_aware()
def get_running_expired_sync_actions(expiration_time, session=None):
    """Select running, synchronous action executions with an old heartbeat.

    An action execution matches when its ``last_heartbeat`` is strictly
    less than ``expiration_time``, its ``is_sync`` is True and its state
    equals ``states.RUNNING``.

    :param expiration_time: Upper (exclusive) bound for ``last_heartbeat``.
    :param session: Not referenced in the function body.
    :return: Result of ``query.all()`` for the filtered query.
    """
    action_ex_cls = models.ActionExecution

    heartbeat_too_old = action_ex_cls.last_heartbeat < expiration_time
    still_running = action_ex_cls.state == states.RUNNING

    return (
        b.model_query(action_ex_cls)
        .filter(heartbeat_too_old)
        .filter_by(is_sync=True)
        .filter(still_running)
        .all()
    )
|
||||
|
||||
|
||||
@b.session_aware()
|
||||
def get_superfluous_executions(max_finished_executions, limit=None, columns=(),
|
||||
session=None):
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import datetime
|
||||
import hashlib
|
||||
import json
|
||||
import sys
|
||||
|
@ -33,6 +34,7 @@ from mistral import utils
|
|||
|
||||
# Definition objects.
|
||||
|
||||
CONF = cfg.CONF
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
@ -195,6 +197,13 @@ class ActionExecution(Execution):
|
|||
accepted = sa.Column(sa.Boolean(), default=False)
|
||||
input = sa.Column(st.JsonLongDictType(), nullable=True)
|
||||
output = sa.orm.deferred(sa.Column(st.JsonLongDictType(), nullable=True))
|
||||
last_heartbeat = sa.Column(
|
||||
sa.DateTime,
|
||||
default=lambda: utils.utc_now_sec() + datetime.timedelta(
|
||||
seconds=CONF.action_heartbeat.first_heartbeat_timeout
|
||||
)
|
||||
)
|
||||
is_sync = sa.Column(sa.Boolean(), default=None, nullable=True)
|
||||
|
||||
|
||||
class WorkflowExecution(Execution):
|
||||
|
|
|
@ -150,7 +150,7 @@ class Action(object):
|
|||
"""
|
||||
return True
|
||||
|
||||
def _create_action_execution(self, input_dict, runtime_ctx,
|
||||
def _create_action_execution(self, input_dict, runtime_ctx, is_sync,
|
||||
desc='', action_ex_id=None):
|
||||
action_ex_id = action_ex_id or utils.generate_unicode_uuid()
|
||||
|
||||
|
@ -161,7 +161,8 @@ class Action(object):
|
|||
'state': states.RUNNING,
|
||||
'input': input_dict,
|
||||
'runtime_context': runtime_ctx,
|
||||
'description': desc
|
||||
'description': desc,
|
||||
'is_sync': is_sync
|
||||
}
|
||||
|
||||
if self.task_ex:
|
||||
|
@ -246,6 +247,7 @@ class PythonAction(Action):
|
|||
self._create_action_execution(
|
||||
self._prepare_input(input_dict),
|
||||
self._prepare_runtime_context(index, safe_rerun),
|
||||
self.is_sync(input_dict),
|
||||
desc=desc,
|
||||
action_ex_id=action_ex_id
|
||||
)
|
||||
|
@ -278,6 +280,7 @@ class PythonAction(Action):
|
|||
self._create_action_execution(
|
||||
input_dict,
|
||||
runtime_ctx,
|
||||
self.is_sync(input_dict),
|
||||
desc=desc,
|
||||
action_ex_id=action_ex_id
|
||||
)
|
||||
|
|
|
@ -131,6 +131,14 @@ class Engine(object):
|
|||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
def report_running_actions(self, action_ex_ids):
    """Accept a heartbeat for the given running action executions.

    This is the abstract declaration: the body only raises, so concrete
    engines have to provide the behavior.

    :param action_ex_ids: The action execution ids.
    :raises NotImplementedError: Always, in this base version.
    """
    raise NotImplementedError
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class TaskPolicy(object):
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
# limitations under the License.
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from osprofiler import profiler
|
||||
|
||||
from mistral.db import utils as db_utils
|
||||
|
@ -34,6 +35,8 @@ from mistral.workflow import states
|
|||
# options required at top level of this __init__.py are not imported before
|
||||
# the submodules are referenced.
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DefaultEngine(base.Engine):
|
||||
@db_utils.retry_on_db_error
|
||||
|
@ -122,6 +125,7 @@ class DefaultEngine(base.Engine):
|
|||
'input': action_input,
|
||||
'output': output.to_dict(),
|
||||
'state': state,
|
||||
'is_sync': is_action_sync
|
||||
}
|
||||
|
||||
return db_api.create_action_execution(values)
|
||||
|
@ -201,3 +205,22 @@ class DefaultEngine(base.Engine):
|
|||
def rollback_workflow(self, wf_ex_id):
|
||||
# TODO(rakhmerov): Implement.
|
||||
raise NotImplementedError
|
||||
|
||||
@db_utils.retry_on_db_error
@action_queue.process
def report_running_actions(self, action_ex_ids):
    """Store a heartbeat for each of the given action executions.

    All updates run inside one DB transaction and use the same timestamp,
    taken once before the loop. An id for which the update raises
    ``DBEntityNotFoundError`` is logged at debug level and skipped.

    :param action_ex_ids: Iterable of action execution ids.
    """
    with db_api.transaction():
        now = u.utc_now_sec()

        for exec_id in action_ex_ids:
            try:
                db_api.update_action_execution(
                    exec_id,
                    {"last_heartbeat": now},
                    insecure=True
                )
            except exceptions.DBEntityNotFoundError:
                # Ignore this error and continue with the remaining ids.
                # The id is passed as a logger argument so the message is
                # only rendered when debug logging is enabled.
                LOG.debug(
                    "Action execution heartbeat update failed. %s",
                    exec_id,
                    exc_info=True
                )
|
||||
|
|
|
@ -19,6 +19,7 @@ from mistral.db.v2 import api as db_api
|
|||
from mistral.engine import default_engine
|
||||
from mistral.rpc import base as rpc
|
||||
from mistral.service import base as service_base
|
||||
from mistral.services import action_execution_checker
|
||||
from mistral.services import expiration_policy
|
||||
from mistral.services import scheduler
|
||||
from mistral import utils
|
||||
|
@ -50,6 +51,7 @@ class EngineServer(service_base.MistralService):
|
|||
|
||||
self._scheduler = scheduler.start()
|
||||
self._expiration_policy_tg = expiration_policy.setup()
|
||||
action_execution_checker.setup()
|
||||
|
||||
if self._setup_profiler:
|
||||
profiler_utils.setup('mistral-engine', cfg.CONF.engine.host)
|
||||
|
@ -258,6 +260,19 @@ class EngineServer(service_base.MistralService):
|
|||
|
||||
return self.engine.rollback_workflow(wf_ex_id)
|
||||
|
||||
def report_running_actions(self, rpc_ctx, action_ex_ids):
    """RPC endpoint that takes in action execution heartbeats.

    The request is logged and then handed over to the engine.

    :param rpc_ctx: RPC request context.
    :param action_ex_ids: Action execution ids.
    :return: The value returned by the engine's report_running_actions().
    """
    LOG.info(
        "Received RPC request 'report_running_actions'[action_ex_ids=%s]",
        action_ex_ids
    )

    engine_result = self.engine.report_running_actions(action_ex_ids)

    return engine_result
|
||||
|
||||
|
||||
def get_oslo_service(setup_profiler=True):
|
||||
return EngineServer(
|
||||
|
|
|
@ -18,9 +18,11 @@ from mistral import config as cfg
|
|||
from mistral.executors import default_executor as exe
|
||||
from mistral.rpc import base as rpc
|
||||
from mistral.service import base as service_base
|
||||
from mistral.services import action_execution_reporter
|
||||
from mistral import utils
|
||||
from mistral.utils import profiler as profiler_utils
|
||||
|
||||
CONF = cfg.CONF
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
@ -37,10 +39,15 @@ class ExecutorServer(service_base.MistralService):
|
|||
|
||||
self.executor = executor
|
||||
self._rpc_server = None
|
||||
self._reporter = None
|
||||
self._aer = None
|
||||
|
||||
def start(self):
|
||||
super(ExecutorServer, self).start()
|
||||
|
||||
self._aer = action_execution_reporter.ActionExecutionReporter(CONF)
|
||||
self._reporter = action_execution_reporter.setup(self._aer)
|
||||
|
||||
if self._setup_profiler:
|
||||
profiler_utils.setup('mistral-executor', cfg.CONF.executor.host)
|
||||
|
||||
|
@ -56,6 +63,9 @@ class ExecutorServer(service_base.MistralService):
|
|||
def stop(self, graceful=False):
|
||||
super(ExecutorServer, self).stop(graceful)
|
||||
|
||||
if self._reporter:
|
||||
self._reporter.stop(graceful)
|
||||
|
||||
if self._rpc_server:
|
||||
self._rpc_server.stop(graceful)
|
||||
|
||||
|
@ -90,16 +100,21 @@ class ExecutorServer(service_base.MistralService):
|
|||
|
||||
redelivered = rpc_ctx.redelivered or False
|
||||
|
||||
return self.executor.run_action(
|
||||
action_ex_id,
|
||||
action_cls_str,
|
||||
action_cls_attrs,
|
||||
params,
|
||||
safe_rerun,
|
||||
execution_context,
|
||||
redelivered,
|
||||
timeout=timeout
|
||||
)
|
||||
try:
|
||||
self._aer.add_action_ex_id(action_ex_id)
|
||||
|
||||
return self.executor.run_action(
|
||||
action_ex_id,
|
||||
action_cls_str,
|
||||
action_cls_attrs,
|
||||
params,
|
||||
safe_rerun,
|
||||
execution_context,
|
||||
redelivered,
|
||||
timeout=timeout
|
||||
)
|
||||
finally:
|
||||
self._aer.remove_action_ex_id(action_ex_id)
|
||||
|
||||
|
||||
def get_oslo_service(setup_profiler=True):
|
||||
|
|
|
@ -320,6 +320,18 @@ class EngineClient(eng.Engine):
|
|||
wf_ex_id=wf_ex_id
|
||||
)
|
||||
|
||||
@base.wrap_messaging_exception
def report_running_actions(self, action_ex_ids):
    """Send action execution heartbeats through an asynchronous RPC call.

    The call targets the 'report_running_actions' RPC method and uses
    the current auth context.

    :param action_ex_ids: Action execution ids.
    :return: The value returned by the client's async_call().
    """
    rpc_kwargs = {'action_ex_ids': action_ex_ids}

    return self._client.async_call(
        auth_ctx.ctx(),
        'report_running_actions',
        **rpc_kwargs
    )
|
||||
|
||||
|
||||
class ExecutorClient(exe.Executor):
|
||||
"""RPC Executor client."""
|
||||
|
|
|
@ -0,0 +1,83 @@
|
|||
# Copyright 2018 Nokia Networks.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import datetime
|
||||
|
||||
from mistral.db import utils as db_utils
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral.engine import action_handler
|
||||
from mistral.engine import action_queue
|
||||
from mistral.services import scheduler
|
||||
from mistral import utils
|
||||
from mistral_lib import actions as mistral_lib
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
CONF = cfg.CONF
|
||||
SCHEDULER_KEY = 'handle_expired_actions_key'
|
||||
|
||||
|
||||
@db_utils.retry_on_db_error
@action_queue.process
def handle_expired_actions():
    """Fail running action executions whose heartbeat has expired.

    The expiration date is "now" minus ``max_missed_heartbeats *
    check_interval`` seconds. Every action execution returned by
    ``db_api.get_running_expired_sync_actions()`` for that date is
    completed with an error result. Whether or not the check succeeds,
    the next run is scheduled after ``check_interval``.
    """
    LOG.debug("Running heartbeat checker...")

    # Read the configuration before entering the try block: the finally
    # clause needs 'interval', and binding it inside the try would turn any
    # earlier failure into an UnboundLocalError that hides the real error.
    interval = CONF.action_heartbeat.check_interval
    max_missed = CONF.action_heartbeat.max_missed_heartbeats

    try:
        exp_date = utils.utc_now_sec() - datetime.timedelta(
            seconds=max_missed * interval
        )

        with db_api.transaction():
            action_exs = db_api.get_running_expired_sync_actions(exp_date)

            LOG.debug(
                "Found %s running and expired actions.",
                len(action_exs)
            )

            if action_exs:
                LOG.info(
                    "Actions executions to transit to error, because "
                    "heartbeat wasn't received: %s",
                    action_exs
                )

                for action_ex in action_exs:
                    result = mistral_lib.Result(
                        error="Heartbeat wasn't received."
                    )

                    action_handler.on_action_complete(action_ex, result)
    finally:
        # Keep the checker running even if this iteration failed.
        schedule(interval)
|
||||
|
||||
|
||||
def setup():
    """Schedule the first run of the action execution checker.

    Nothing is scheduled when either ``check_interval`` or
    ``max_missed_heartbeats`` is falsy (e.g. 0). Otherwise the first run
    is scheduled after ``check_interval * max_missed_heartbeats``.
    """
    heartbeat_conf = CONF.action_heartbeat

    interval = heartbeat_conf.check_interval
    max_missed = heartbeat_conf.max_missed_heartbeats

    if not (interval and max_missed):
        LOG.info("Action heartbeat reporting disabled.")

        return

    wait_time = interval * max_missed

    message = (
        "First run of action execution checker, wait before "
        "checking to make sure executors have time to send "
        "heartbeats. ({} seconds)"
    )
    LOG.debug(message.format(wait_time))

    schedule(wait_time)
|
||||
|
||||
|
||||
def schedule(run_after):
    """Register a scheduler call of ``handle_expired_actions``.

    :param run_after: Passed to the scheduler as ``run_after``.
    """
    target = (
        'mistral.services.action_execution_checker.handle_expired_actions'
    )

    scheduler.schedule_call(
        None,
        target,
        run_after=run_after,
        key=SCHEDULER_KEY
    )
|
|
@ -0,0 +1,93 @@
|
|||
# Copyright 2018 Nokia Networks.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from oslo_service import periodic_task
|
||||
from oslo_service import threadgroup
|
||||
|
||||
from mistral import context as auth_ctx
|
||||
from mistral.rpc import clients as rpc
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
class ActionExecutionReporter(periodic_task.PeriodicTasks):
    """Tracks the running action executions and reports them."""

    def __init__(self, conf):
        """Read the heartbeat options and register the periodic report.

        :param conf: Configuration object passed on to the parent class.
        """
        super(ActionExecutionReporter, self).__init__(conf)

        self._engine_client = rpc.get_engine_client()
        # Ids of the action executions currently being tracked.
        self._running_actions = set()

        heartbeat_conf = CONF.action_heartbeat

        self.interval = heartbeat_conf.check_interval
        self.max_missed = heartbeat_conf.max_missed_heartbeats
        # Falsy as soon as one of the two options is falsy (e.g. 0).
        self.enabled = self.interval and self.max_missed

        as_periodic_task = periodic_task.periodic_task(
            spacing=self.interval,
            run_immediately=True
        )

        self.add_periodic_task(as_periodic_task(report))

    def add_action_ex_id(self, action_ex_id):
        """Report the given id to the engine and start tracking it.

        Does nothing when the id is falsy or the reporter is disabled.
        """
        # With run-action there is no action_ex_id assigned.
        if not (action_ex_id and self.enabled):
            return

        self._engine_client.report_running_actions([action_ex_id])
        self._running_actions.add(action_ex_id)

    def remove_action_ex_id(self, action_ex_id):
        """Stop tracking the given id.

        Does nothing when the id is falsy or the reporter is disabled.
        """
        if not (action_ex_id and self.enabled):
            return

        self._running_actions.discard(action_ex_id)
|
||||
|
||||
|
||||
def report(reporter, ctx):
    """Report the tracked action executions of ``reporter`` to the engine.

    Nothing is sent when the reporter tracks no action executions.

    :param reporter: Object providing ``_running_actions`` and
        ``_engine_client``.
    :param ctx: Context installed with ``auth_ctx.set_ctx()`` before the
        engine client is called.
    """
    LOG.debug("Running heartbeat reporter...")

    running_actions = reporter._running_actions

    if running_actions:
        auth_ctx.set_ctx(ctx)

        reporter._engine_client.report_running_actions(running_actions)
|
||||
|
||||
|
||||
def setup(action_execution_reporter):
    """Start the periodic tasks of the reporter in a new thread group.

    :param action_execution_reporter: Object whose ``run_periodic_tasks``
        is registered as a dynamic timer.
    :return: The created thread group, or None when either
        ``check_interval`` or ``max_missed_heartbeats`` is falsy (e.g. 0).
    """
    heartbeat_conf = CONF.action_heartbeat

    interval = heartbeat_conf.check_interval
    max_missed = heartbeat_conf.max_missed_heartbeats

    if not (interval and max_missed):
        LOG.info("Action heartbeat reporting disabled.")

        return None

    thread_group = threadgroup.ThreadGroup()

    # The context has no user, tenant or token; it is only flagged as admin.
    admin_ctx = auth_ctx.MistralContext(
        user=None,
        tenant=None,
        auth_token=None,
        is_admin=True
    )

    thread_group.add_dynamic_timer(
        action_execution_reporter.run_periodic_tasks,
        initial_delay=None,
        periodic_interval_max=1,
        context=admin_ctx
    )

    return thread_group
|
|
@ -625,6 +625,41 @@ class DefaultEngineTest(base.DbTestCase):
|
|||
# TODO(akhmerov): Implement.
|
||||
pass
|
||||
|
||||
def test_report_running_actions(self):
    """Heartbeat reports with empty and None ids must not break."""
    # Start workflow.
    started_wf_ex = self.engine.start_workflow(
        'wb.wf',
        '',
        wf_input={'param1': 'Hey', 'param2': 'Hi'},
        description='my execution',
        task_name='task2'
    )

    with db_api.transaction():
        loaded_wf_ex = db_api.get_workflow_execution(started_wf_ex.id)

        task_execs = loaded_wf_ex.task_executions

    self.assertEqual(1, len(task_execs))

    action_execs = db_api.get_action_executions(
        task_execution_id=task_execs[0].id
    )

    first_action_ex = action_execs[0]

    # Empty input, ids that are None and a mix of None and a real id.
    for reported_ids in ([], [None, None], [None, first_action_ex.id]):
        self.engine.report_running_actions(reported_ids)

    reloaded_action_ex = db_api.get_action_execution(first_action_ex.id)

    self.assertIsNotNone(reloaded_action_ex.last_heartbeat)
|
||||
|
||||
|
||||
class DefaultEngineWithTransportTest(eng_test_base.EngineTestCase):
|
||||
def test_engine_client_remote_error(self):
|
||||
|
|
|
@ -17,6 +17,7 @@ import mock
|
|||
from oslo_config import cfg
|
||||
|
||||
from mistral.actions import std_actions
|
||||
from mistral import config
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral import exceptions as exc
|
||||
from mistral.services import workbooks as wb_service
|
||||
|
@ -32,7 +33,9 @@ from mistral_lib import actions as actions_base
|
|||
|
||||
# Use the set_default method to set value otherwise in certain test cases
|
||||
# the change in value is not permanent.
|
||||
cfg.CONF.set_default('auth_enable', False, group='pecan')
|
||||
cfg.CONF.set_default('auth_enable', False, group=config.PECAN_GROUP)
|
||||
cfg.CONF.set_default('max_missed_heartbeats', 0,
|
||||
group=config.ACTION_HEARTBEAT_GROUP)
|
||||
|
||||
WB = """
|
||||
---
|
||||
|
|
|
@ -0,0 +1,5 @@
|
|||
---
|
||||
features:
|
||||
- >
|
||||
[`blueprint action-execution-reporting <https://blueprints.launchpad.net/mistral/+spec/action-execution-reporting>`_]
|
||||
Introduced a mechanism to close action executions that are stuck in the RUNNING state.
|
Loading…
Reference in New Issue