Make sure that the field "state_info" trimmed as expected
* Before this patch, "state_info" field of execution objects
wasn't truncated properly before saving into DB which led to
the DB error like:
DBDataError: (_mysql_exceptions.DataError) (1406, "Data too
long for column 'state_info' at row 1")
It was happening because the method utils.cut() didn't work
accurately enough in case if we passed it with a dictionary.
This patch doesn't fix utils.cut() method but it just saves
space for possible method result difference with the expected
length so that we make sure that the truncated string
representation is always less than 65536 bytes (i.e. the size
of MySQL TEXT type). The total difference is not critical
anyway.
Closes-Bug: #1698125
Change-Id: I18449710ace6276224aaea564588c53a3e2c6adc
(cherry picked from commit 5a43d54a05
)
This commit is contained in:
parent
58955acef6
commit
c15c378011
|
@ -256,9 +256,16 @@ class TaskExecution(Execution):
|
|||
for cls in utils.iter_subclasses(Execution):
|
||||
event.listen(
|
||||
# Catch and trim Execution.state_info to always fit allocated size.
|
||||
# Note that the limit is 65500 which is less than 65535 (2^16 -1).
|
||||
# The reason is that utils.cut() is not exactly accurate in case if
|
||||
# the value is not a string, but, for example, a dictionary. If we
|
||||
# limit it exactly to 65535 then once in a while it may go slightly
|
||||
# beyond the allowed maximum size. It may depend on the order of
|
||||
# keys in a string representation and other things that are hidden
|
||||
# inside utils.cut_dict() method.
|
||||
cls.state_info,
|
||||
'set',
|
||||
lambda t, v, o, i: utils.cut(v, 65532),
|
||||
lambda t, v, o, i: utils.cut(v, 65500),
|
||||
retval=True
|
||||
)
|
||||
|
||||
|
|
|
@ -1356,7 +1356,7 @@ class ActionExecutionTest(SQLAlchemyTest):
|
|||
self.assertEqual('FAILED', updated.state)
|
||||
state_info = db_api.load_action_execution(updated.id).state_info
|
||||
self.assertEqual(
|
||||
65535,
|
||||
65503,
|
||||
len(state_info)
|
||||
)
|
||||
|
||||
|
@ -1652,7 +1652,7 @@ class WorkflowExecutionTest(SQLAlchemyTest):
|
|||
self.assertEqual('FAILED', updated.state)
|
||||
state_info = db_api.load_workflow_execution(updated.id).state_info
|
||||
self.assertEqual(
|
||||
65535,
|
||||
65503,
|
||||
len(state_info)
|
||||
)
|
||||
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
from oslo_config import cfg
|
||||
|
||||
from mistral.actions import base as actions_base
|
||||
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral import exceptions as exc
|
||||
from mistral.services import workflows as wf_service
|
||||
|
@ -36,27 +37,42 @@ wf:
|
|||
input:
|
||||
- workflow_input: '__WORKFLOW_INPUT__'
|
||||
- action_output_length: 0
|
||||
- action_output_dict: false
|
||||
- action_error: false
|
||||
|
||||
tasks:
|
||||
task1:
|
||||
action: my_action
|
||||
input:
|
||||
action_input: '__ACTION_INPUT__'
|
||||
action_output_length: <% $.action_output_length %>
|
||||
input: '__ACTION_INPUT__'
|
||||
output_length: <% $.action_output_length %>
|
||||
output_dict: <% $.action_output_dict %>
|
||||
error: <% $.action_error %>
|
||||
publish:
|
||||
p_var: '__TASK_PUBLISHED__'
|
||||
"""
|
||||
|
||||
|
||||
class MyAction(actions_base.Action):
|
||||
def __init__(self, action_input, action_output_length):
|
||||
self.action_input = action_input
|
||||
self.action_output_length = action_output_length
|
||||
def __init__(self, input, output_length, output_dict=False, error=False):
|
||||
self.input = input
|
||||
self.output_length = output_length
|
||||
self.output_dict = output_dict
|
||||
self.error = error
|
||||
|
||||
def run(self):
|
||||
return wf_utils.Result(
|
||||
data=''.join('A' for _ in range(self.action_output_length))
|
||||
)
|
||||
if not self.output_dict:
|
||||
result = ''.join('A' for _ in range(self.output_length))
|
||||
else:
|
||||
result = {}
|
||||
|
||||
for i in range(self.output_length):
|
||||
result[i] = 'A'
|
||||
|
||||
if not self.error:
|
||||
return wf_utils.Result(data=result)
|
||||
else:
|
||||
return wf_utils.Result(error=result)
|
||||
|
||||
def test(self):
|
||||
raise NotImplementedError
|
||||
|
@ -229,3 +245,39 @@ class ExecutionFieldsSizeLimitTest(base.EngineTestCase):
|
|||
"Size of 'params' is 1KB which exceeds the limit of 0KB",
|
||||
e.message
|
||||
)
|
||||
|
||||
def test_task_execution_state_info_trimmed(self):
|
||||
# No limit on output, input and other JSON fields.
|
||||
cfg.CONF.set_default(
|
||||
'execution_field_size_limit_kb',
|
||||
-1,
|
||||
group='engine'
|
||||
)
|
||||
|
||||
wf_service.create_workflows(WF)
|
||||
|
||||
# Start workflow.
|
||||
wf_ex = self.engine.start_workflow(
|
||||
'wf',
|
||||
{
|
||||
'action_output_length': 80000,
|
||||
'action_output_dict': True,
|
||||
'action_error': True
|
||||
}
|
||||
)
|
||||
|
||||
self.await_workflow_error(wf_ex.id)
|
||||
|
||||
with db_api.transaction():
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
task_ex = self._assert_single_item(
|
||||
wf_ex.task_executions,
|
||||
state=states.ERROR
|
||||
)
|
||||
|
||||
# "state_info" must be trimmed so that it's not greater than 65535.
|
||||
self.assertLess(len(task_ex.state_info), 65536)
|
||||
self.assertGreater(len(task_ex.state_info), 65490)
|
||||
self.assertLess(len(wf_ex.state_info), 65536)
|
||||
self.assertGreater(len(wf_ex.state_info), 65490)
|
||||
|
|
|
@ -185,6 +185,27 @@ def get_file_list(directory):
|
|||
|
||||
|
||||
def cut_dict(d, length=100):
|
||||
"""Removes dictionary entries according to the given length.
|
||||
|
||||
This method removes a number of entries, if needed, so that a
|
||||
string representation would fit into the given length.
|
||||
The intention of this method is to optimize truncation of string
|
||||
representation for dictionaries where the exact precision is not
|
||||
critically important. Otherwise, we'd always have to convert a dict
|
||||
into a string first and then shrink it to a needed size which will
|
||||
increase memory footprint and reduce performance in case of large
|
||||
dictionaries (i.e. tens of thousands entries).
|
||||
Note that the method, due to complexity of the algorithm, has some
|
||||
non-zero precision which depends on exact keys and values placed into
|
||||
the dict. So for some dicts their reduced string representations will
|
||||
be only approximately equal to the given value (up to around several
|
||||
chars difference).
|
||||
|
||||
:param d: A dictionary.
|
||||
:param length: A length limiting the dictionary string representation.
|
||||
:return: A dictionary which is a subset of the given dictionary.
|
||||
"""
|
||||
|
||||
if not isinstance(d, dict):
|
||||
raise ValueError("A dictionary is expected, got: %s" % type(d))
|
||||
|
||||
|
|
Loading…
Reference in New Issue