Make sure that the field "state_info" trimmed as expected

* Before this patch, "state_info" field of execution objects
  wasn't truncated properly before saving into DB which led to
  the DB error like:
    DBDataError: (_mysql_exceptions.DataError) (1406, "Data too
    long for column 'state_info' at row 1")
  It was happening because the method utils.cut() didn't work
  accurately enough in case if we passed it with a dictionary.
  This patch doesn't fix utils.cut() method but it just saves
  space for possible method result difference with the expected
  length so that we make sure that the truncated string
  representation is always less than 65536 bytes (i.e. the size
  of MySQL TEXT type). The total difference is not critical
  anyway.

Closes-Bug: #1698125
Change-Id: I18449710ace6276224aaea564588c53a3e2c6adc
(cherry picked from commit 5a43d54a05)
This commit is contained in:
Renat Akhmerov 2017-06-14 20:10:03 +07:00
parent 58955acef6
commit c15c378011
4 changed files with 91 additions and 11 deletions

View File

@ -256,9 +256,16 @@ class TaskExecution(Execution):
for cls in utils.iter_subclasses(Execution):
event.listen(
# Catch and trim Execution.state_info to always fit allocated size.
# Note that the limit is 65500 which is less than 65535 (2^16 -1).
# The reason is that utils.cut() is not exactly accurate in case if
# the value is not a string, but, for example, a dictionary. If we
# limit it exactly to 65535 then once in a while it may go slightly
# beyond the allowed maximum size. It may depend on the order of
# keys in a string representation and other things that are hidden
# inside utils.cut_dict() method.
cls.state_info,
'set',
lambda t, v, o, i: utils.cut(v, 65532),
lambda t, v, o, i: utils.cut(v, 65500),
retval=True
)

View File

@ -1356,7 +1356,7 @@ class ActionExecutionTest(SQLAlchemyTest):
self.assertEqual('FAILED', updated.state)
state_info = db_api.load_action_execution(updated.id).state_info
self.assertEqual(
65535,
65503,
len(state_info)
)
@ -1652,7 +1652,7 @@ class WorkflowExecutionTest(SQLAlchemyTest):
self.assertEqual('FAILED', updated.state)
state_info = db_api.load_workflow_execution(updated.id).state_info
self.assertEqual(
65535,
65503,
len(state_info)
)

View File

@ -15,6 +15,7 @@
from oslo_config import cfg
from mistral.actions import base as actions_base
from mistral.db.v2 import api as db_api
from mistral import exceptions as exc
from mistral.services import workflows as wf_service
@ -36,27 +37,42 @@ wf:
input:
- workflow_input: '__WORKFLOW_INPUT__'
- action_output_length: 0
- action_output_dict: false
- action_error: false
tasks:
task1:
action: my_action
input:
action_input: '__ACTION_INPUT__'
action_output_length: <% $.action_output_length %>
input: '__ACTION_INPUT__'
output_length: <% $.action_output_length %>
output_dict: <% $.action_output_dict %>
error: <% $.action_error %>
publish:
p_var: '__TASK_PUBLISHED__'
"""
class MyAction(actions_base.Action):
def __init__(self, action_input, action_output_length):
self.action_input = action_input
self.action_output_length = action_output_length
def __init__(self, input, output_length, output_dict=False, error=False):
self.input = input
self.output_length = output_length
self.output_dict = output_dict
self.error = error
def run(self):
return wf_utils.Result(
data=''.join('A' for _ in range(self.action_output_length))
)
if not self.output_dict:
result = ''.join('A' for _ in range(self.output_length))
else:
result = {}
for i in range(self.output_length):
result[i] = 'A'
if not self.error:
return wf_utils.Result(data=result)
else:
return wf_utils.Result(error=result)
def test(self):
raise NotImplementedError
@ -229,3 +245,39 @@ class ExecutionFieldsSizeLimitTest(base.EngineTestCase):
"Size of 'params' is 1KB which exceeds the limit of 0KB",
e.message
)
def test_task_execution_state_info_trimmed(self):
# No limit on output, input and other JSON fields.
cfg.CONF.set_default(
'execution_field_size_limit_kb',
-1,
group='engine'
)
wf_service.create_workflows(WF)
# Start workflow.
wf_ex = self.engine.start_workflow(
'wf',
{
'action_output_length': 80000,
'action_output_dict': True,
'action_error': True
}
)
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = self._assert_single_item(
wf_ex.task_executions,
state=states.ERROR
)
# "state_info" must be trimmed so that it's not greater than 65535.
self.assertLess(len(task_ex.state_info), 65536)
self.assertGreater(len(task_ex.state_info), 65490)
self.assertLess(len(wf_ex.state_info), 65536)
self.assertGreater(len(wf_ex.state_info), 65490)

View File

@ -185,6 +185,27 @@ def get_file_list(directory):
def cut_dict(d, length=100):
"""Removes dictionary entries according to the given length.
This method removes a number of entries, if needed, so that a
string representation would fit into the given length.
The intention of this method is to optimize truncation of string
representation for dictionaries where the exact precision is not
critically important. Otherwise, we'd always have to convert a dict
into a string first and then shrink it to a needed size which will
increase memory footprint and reduce performance in case of large
dictionaries (i.e. tens of thousands entries).
Note that the method, due to complexity of the algorithm, has some
non-zero precision which depends on exact keys and values placed into
the dict. So for some dicts their reduced string representations will
be only approximately equal to the given value (up to around several
chars difference).
:param d: A dictionary.
:param length: A length limiting the dictionary string representation.
:return: A dictionary which is a subset of the given dictionary.
"""
if not isinstance(d, dict):
raise ValueError("A dictionary is expected, got: %s" % type(d))