Merge "Make sure that the field "state_info" trimmed as expected"
This commit is contained in:
commit
a412bb9626
|
@ -256,9 +256,16 @@ class TaskExecution(Execution):
|
|||
for cls in utils.iter_subclasses(Execution):
|
||||
event.listen(
|
||||
# Catch and trim Execution.state_info to always fit allocated size.
|
||||
# Note that the limit is 65500 which is less than 65535 (2^16 -1).
|
||||
# The reason is that utils.cut() is not exactly accurate in case if
|
||||
# the value is not a string, but, for example, a dictionary. If we
|
||||
# limit it exactly to 65535 then once in a while it may go slightly
|
||||
# beyond the allowed maximum size. It may depend on the order of
|
||||
# keys in a string representation and other things that are hidden
|
||||
# inside utils.cut_dict() method.
|
||||
cls.state_info,
|
||||
'set',
|
||||
lambda t, v, o, i: utils.cut(v, 65532),
|
||||
lambda t, v, o, i: utils.cut(v, 65500),
|
||||
retval=True
|
||||
)
|
||||
|
||||
|
|
|
@ -1385,7 +1385,7 @@ class ActionExecutionTest(SQLAlchemyTest):
|
|||
self.assertEqual('FAILED', updated.state)
|
||||
state_info = db_api.load_action_execution(updated.id).state_info
|
||||
self.assertEqual(
|
||||
65535,
|
||||
65503,
|
||||
len(state_info)
|
||||
)
|
||||
|
||||
|
@ -1745,7 +1745,7 @@ class WorkflowExecutionTest(SQLAlchemyTest):
|
|||
self.assertEqual('FAILED', updated.state)
|
||||
state_info = db_api.load_workflow_execution(updated.id).state_info
|
||||
self.assertEqual(
|
||||
65535,
|
||||
65503,
|
||||
len(state_info)
|
||||
)
|
||||
|
||||
|
|
|
@ -14,7 +14,8 @@
|
|||
|
||||
from oslo_config import cfg
|
||||
|
||||
from mistral.actions import base as actions_base
|
||||
from mistral_lib.actions import base as actions_base
|
||||
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral import exceptions as exc
|
||||
from mistral.services import workflows as wf_service
|
||||
|
@ -36,27 +37,42 @@ wf:
|
|||
input:
|
||||
- workflow_input: '__WORKFLOW_INPUT__'
|
||||
- action_output_length: 0
|
||||
- action_output_dict: false
|
||||
- action_error: false
|
||||
|
||||
tasks:
|
||||
task1:
|
||||
action: my_action
|
||||
input:
|
||||
action_input: '__ACTION_INPUT__'
|
||||
action_output_length: <% $.action_output_length %>
|
||||
input: '__ACTION_INPUT__'
|
||||
output_length: <% $.action_output_length %>
|
||||
output_dict: <% $.action_output_dict %>
|
||||
error: <% $.action_error %>
|
||||
publish:
|
||||
p_var: '__TASK_PUBLISHED__'
|
||||
"""
|
||||
|
||||
|
||||
class MyAction(actions_base.Action):
|
||||
def __init__(self, action_input, action_output_length):
|
||||
self.action_input = action_input
|
||||
self.action_output_length = action_output_length
|
||||
def __init__(self, input, output_length, output_dict=False, error=False):
|
||||
self.input = input
|
||||
self.output_length = output_length
|
||||
self.output_dict = output_dict
|
||||
self.error = error
|
||||
|
||||
def run(self):
|
||||
return wf_utils.Result(
|
||||
data=''.join('A' for _ in range(self.action_output_length))
|
||||
)
|
||||
def run(self, context):
|
||||
if not self.output_dict:
|
||||
result = ''.join('A' for _ in range(self.output_length))
|
||||
else:
|
||||
result = {}
|
||||
|
||||
for i in range(self.output_length):
|
||||
result[i] = 'A'
|
||||
|
||||
if not self.error:
|
||||
return wf_utils.Result(data=result)
|
||||
else:
|
||||
return wf_utils.Result(error=result)
|
||||
|
||||
def test(self):
|
||||
raise NotImplementedError
|
||||
|
@ -229,3 +245,39 @@ class ExecutionFieldsSizeLimitTest(base.EngineTestCase):
|
|||
"Size of 'params' is 1KB which exceeds the limit of 0KB",
|
||||
e.message
|
||||
)
|
||||
|
||||
def test_task_execution_state_info_trimmed(self):
|
||||
# No limit on output, input and other JSON fields.
|
||||
cfg.CONF.set_default(
|
||||
'execution_field_size_limit_kb',
|
||||
-1,
|
||||
group='engine'
|
||||
)
|
||||
|
||||
wf_service.create_workflows(WF)
|
||||
|
||||
# Start workflow.
|
||||
wf_ex = self.engine.start_workflow(
|
||||
'wf',
|
||||
{
|
||||
'action_output_length': 80000,
|
||||
'action_output_dict': True,
|
||||
'action_error': True
|
||||
}
|
||||
)
|
||||
|
||||
self.await_workflow_error(wf_ex.id)
|
||||
|
||||
with db_api.transaction():
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
task_ex = self._assert_single_item(
|
||||
wf_ex.task_executions,
|
||||
state=states.ERROR
|
||||
)
|
||||
|
||||
# "state_info" must be trimmed so that it's not greater than 65535.
|
||||
self.assertLess(len(task_ex.state_info), 65536)
|
||||
self.assertGreater(len(task_ex.state_info), 65490)
|
||||
self.assertLess(len(wf_ex.state_info), 65536)
|
||||
self.assertGreater(len(wf_ex.state_info), 65490)
|
||||
|
|
|
@ -186,6 +186,27 @@ def get_file_list(directory):
|
|||
|
||||
|
||||
def cut_dict(d, length=100):
|
||||
"""Removes dictionary entries according to the given length.
|
||||
|
||||
This method removes a number of entries, if needed, so that a
|
||||
string representation would fit into the given length.
|
||||
The intention of this method is to optimize truncation of string
|
||||
representation for dictionaries where the exact precision is not
|
||||
critically important. Otherwise, we'd always have to convert a dict
|
||||
into a string first and then shrink it to a needed size which will
|
||||
increase memory footprint and reduce performance in case of large
|
||||
dictionaries (i.e. tens of thousands entries).
|
||||
Note that the method, due to complexity of the algorithm, has some
|
||||
non-zero precision which depends on exact keys and values placed into
|
||||
the dict. So for some dicts their reduced string representations will
|
||||
be only approximately equal to the given value (up to around several
|
||||
chars difference).
|
||||
|
||||
:param d: A dictionary.
|
||||
:param length: A length limiting the dictionary string representation.
|
||||
:return: A dictionary which is a subset of the given dictionary.
|
||||
"""
|
||||
|
||||
if not isinstance(d, dict):
|
||||
raise ValueError("A dictionary is expected, got: %s" % type(d))
|
||||
|
||||
|
|
Loading…
Reference in New Issue