Support large datasets for execution objects

Allow the cloud provider to configure the
size limit of workflow input, action input,
action output, task publish and workflow params

This will allow users to execute workflows that
handle much bigger datasets than the ones
supported today.

The changes made to achieve this goal:
* Increase DB column sizes so they are no longer
  the barrier
* Add a configuration option to control the limit
* Add event listeners on the columns to enforce
  the size limit (see the sketch below)

Change-Id: If7c29f9325e60ce456e23d5c7b6ceb3477a028d4
Implements: blueprint support-large-datasets
Moshe Elisha 2015-07-23 17:13:37 +00:00
parent 0a6da145cd
commit 30589cfa5f
6 changed files with 300 additions and 7 deletions
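
In outline, the enforcement added by this commit works as follows: on every
assignment to one of the large execution fields, the engine measures the
stringified value and rejects it if it exceeds the configured limit. Below is
a minimal, self-contained sketch of that check; the names follow the diffs
further down, and the standalone option registration exists only so the
sketch runs outside Mistral:

    import sys

    from oslo_config import cfg

    CONF = cfg.CONF

    # Registered here only for the sketch; in Mistral the option is part of
    # engine_opts (see the config diff below).
    CONF.register_opts(
        [cfg.IntOpt('execution_field_size_limit_kb', default=1024)],
        group='engine')


    def field_too_large(value):
        """Mirror of the size check the models apply on every 'set'."""
        limit_kb = CONF.engine.execution_field_size_limit_kb

        if limit_kb < 0:
            # -1 disables the check entirely.
            return False

        return sys.getsizeof(str(value)) / 1024 > limit_kb


    print(field_too_large('A' * 2048))             # False: ~2KB < 1024KB default
    print(field_too_large('A' * 4 * 1024 * 1024))  # True: ~4096KB > 1024KB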


@@ -327,6 +327,9 @@
 # The version of the executor. (string value)
 #version = 1.0
 
+# The default maximum size in KB of large text fields of runtime
+# execution objects. Use -1 for no limit. (integer value)
+#execution_field_size_limit_kb = 1024
 
 [keystone_authtoken]
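
For deployers who prefer not to edit the config file, the same limit can also
be raised programmatically through oslo.config's standard override API. A
sketch, assuming it runs inside a process where Mistral's engine options are
already registered:

    from oslo_config import cfg

    # Raise the per-field limit to 4MB; group='engine' matches where the
    # option is registered in the diff below.
    cfg.CONF.set_override('execution_field_size_limit_kb', 4096,
                          group='engine')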


@@ -72,7 +72,10 @@ engine_opts = [
     cfg.StrOpt('topic', default='mistral_engine',
                help='The message topic that the engine listens on.'),
     cfg.StrOpt('version', default='1.0',
-               help='The version of the engine.')
+               help='The version of the engine.'),
+    cfg.IntOpt('execution_field_size_limit_kb', default=1024,
+               help='The default maximum size in KB of large text fields '
+                    'of runtime execution objects. Use -1 for no limit.'),
 ]
 
 executor_opts = [


@@ -0,0 +1,45 @@
# Copyright 2015 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Increase executions_v2 column size from JsonDictType to JsonLongDictType

Revision ID: 005
Revises: 004
Create Date: 2015-07-21 08:48:51.636094

"""

# revision identifiers, used by Alembic.
revision = '005'
down_revision = '004'

from alembic import op

from mistral.db.sqlalchemy import types as st


def upgrade():
    # Changing column types from JsonDictType to JsonLongDictType.
    op.alter_column('executions_v2', 'runtime_context',
                    type_=st.JsonLongDictType())
    op.alter_column('executions_v2', 'input',
                    type_=st.JsonLongDictType())
    op.alter_column('executions_v2', 'params',
                    type_=st.JsonLongDictType())
    op.alter_column('executions_v2', 'context',
                    type_=st.JsonLongDictType())
    op.alter_column('executions_v2', 'action_spec',
                    type_=st.JsonLongDictType())
    op.alter_column('executions_v2', 'published',
                    type_=st.JsonLongDictType())
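
Applying the new revision is plain Alembic. A sketch using Alembic's Python
API; the 'alembic.ini' path is an assumption for illustration, and real
deployments would normally run whatever migration wrapper their Mistral
packaging provides:

    from alembic import command
    from alembic.config import Config

    # Assumed path to an Alembic config pointing at Mistral's migrations.
    alembic_cfg = Config('alembic.ini')

    # Upgrade to revision 005 (or 'head' to apply everything).
    command.upgrade(alembic_cfg, '005')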


@@ -19,14 +19,21 @@ import sqlalchemy as sa
 from sqlalchemy import event
 from sqlalchemy.orm import backref
 from sqlalchemy.orm import relationship
+import sys
 
+from oslo_config import cfg
+from oslo_log import log as logging
+
 from mistral.db.sqlalchemy import model_base as mb
 from mistral.db.sqlalchemy import types as st
+from mistral import exceptions as exc
 from mistral import utils
 
 # Definition objects.
 
+LOG = logging.getLogger(__name__)
+
 
 class Definition(mb.MistralSecureModelBase):
     __abstract__ = True
@@ -106,7 +113,7 @@
     # Runtime context like iteration_no of a repeater.
     # Effectively internal engine properties which will be used to determine
     # execution of a task.
-    runtime_context = sa.Column(st.JsonDictType())
+    runtime_context = sa.Column(st.JsonLongDictType())
 
 
 class ActionExecution(Execution):
@@ -118,7 +125,7 @@ class ActionExecution(Execution):
     # Main properties.
     accepted = sa.Column(sa.Boolean(), default=False)
-    input = sa.Column(st.JsonDictType(), nullable=True)
+    input = sa.Column(st.JsonLongDictType(), nullable=True)
 
     output = sa.orm.deferred(sa.Column(st.JsonLongDictType(), nullable=True))
@@ -131,10 +138,10 @@ class WorkflowExecution(ActionExecution):
     }
 
     # Main properties.
-    params = sa.Column(st.JsonDictType())
+    params = sa.Column(st.JsonLongDictType())
 
     # TODO(rakhmerov): We need to get rid of this field at all.
-    context = sa.Column(st.JsonDictType())
+    context = sa.Column(st.JsonLongDictType())
 
 
 class TaskExecution(Execution):
@@ -145,7 +152,7 @@ class TaskExecution(Execution):
     }
 
     # Main properties.
-    action_spec = sa.Column(st.JsonDictType())
+    action_spec = sa.Column(st.JsonLongDictType())
 
     # Whether the task is fully processed (publishing and calculating commands
     # after it). It allows to simplify workflow controller implementations
@@ -154,7 +161,7 @@ class TaskExecution(Execution):
 
     # Data Flow properties.
     in_context = sa.Column(st.JsonLongDictType())
-    published = sa.Column(st.JsonDictType())
+    published = sa.Column(st.JsonLongDictType())
 
 
 for cls in utils.iter_subclasses(Execution):
@@ -166,6 +173,40 @@ for cls in utils.iter_subclasses(Execution):
         retval=True
     )
 
 
+def validate_long_type_length(cls, field_name, value):
+    """Makes sure the value does not exceed the maximum size."""
+    if value:
+        # Get the configured limit.
+        size_limit_kb = cfg.CONF.engine.execution_field_size_limit_kb
+
+        # If the size is unlimited.
+        if size_limit_kb < 0:
+            return
+
+        size_kb = sys.getsizeof(str(value)) / 1024
+
+        if size_kb > size_limit_kb:
+            LOG.error(
+                "Size limit %dKB exceeded for class [%s], "
+                "field %s of size %dKB.",
+                size_limit_kb, str(cls), field_name, size_kb
+            )
+
+            raise exc.SizeLimitExceededException(field_name, size_kb,
+                                                 size_limit_kb)
+
+
+def register_length_validator(attr_name):
+    """Register an event listener on the attribute that will
+    validate the size every time a 'set' occurs.
+    """
+    for cls in utils.iter_subclasses(Execution):
+        if hasattr(cls, attr_name):
+            event.listen(
+                getattr(cls, attr_name),
+                'set',
+                # Bind 'cls' via a default argument so each listener keeps
+                # its own class; a plain closure would late-bind to the
+                # loop's final value.
+                lambda t, v, o, i, cls=cls: validate_long_type_length(
+                    cls, attr_name, v)
+            )
+
 
 # Many-to-one for 'Execution' and 'TaskExecution'.
 Execution.task_execution_id = sa.Column(
@@ -296,3 +337,8 @@ class CronTrigger(mb.MistralSecureModelBase):
 
 # Register all hooks related to secure models.
 mb.register_secure_model_hooks()
+
+# Register event listeners to verify that the size of the long columns
+# affected by the user does not exceed the configured limit.
+for attr_name in ['input', 'output', 'params', 'published']:
+    register_length_validator(attr_name)
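
The mechanism behind register_length_validator() is SQLAlchemy's attribute
event system: a 'set' listener fires on every assignment to a mapped
attribute, before anything is flushed to the database. A minimal standalone
sketch of that mechanism (the Demo model and the hard-coded 1KB limit are
invented for illustration):

    import sqlalchemy as sa
    from sqlalchemy import event
    from sqlalchemy.ext.declarative import declarative_base

    Base = declarative_base()


    class Demo(Base):
        __tablename__ = 'demo'

        id = sa.Column(sa.Integer, primary_key=True)
        payload = sa.Column(sa.Text())


    def check_size(target, value, oldvalue, initiator):
        # Same shape as validate_long_type_length(): reject oversized
        # values at assignment time, before any DB round trip.
        if value is not None and len(value) > 1024:
            raise ValueError('payload larger than 1KB')


    event.listen(Demo.payload, 'set', check_size)

    d = Demo()
    d.payload = 'A' * 512   # accepted
    d.payload = 'A' * 2048  # raises ValueError immediately

One wrinkle worth noting: because the listeners are registered from a loop,
the lambda above must capture 'cls' as a default argument, otherwise every
listener would report the loop's last subclass in the error log.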


@@ -110,3 +110,12 @@ class InvalidModelException(DSLParsingException):
 
 class InvalidResultException(MistralException):
     http_code = 400
     message = "Unable to parse result"
+
+
+class SizeLimitExceededException(MistralException):
+    http_code = 400
+
+    def __init__(self, field_name, size_kb, size_limit_kb):
+        super(SizeLimitExceededException, self).__init__(
+            "Size of '%s' is %dKB which exceeds the limit of %dKB"
+            % (field_name, size_kb, size_limit_kb))
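
A hedged one-liner showing the message this exception renders, using the
class exactly as defined above (and assuming MistralException forwards the
message to Exception, as the other exceptions in this module do):

    from mistral import exceptions as exc

    try:
        raise exc.SizeLimitExceededException('input', 5, 1)
    except exc.MistralException as e:
        print(e)  # Size of 'input' is 5KB which exceeds the limit of 1KB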


@@ -0,0 +1,187 @@
# Copyright 2015 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from oslo_config import cfg
from oslo_log import log as logging
import testtools

from mistral.actions import base as actions_base
from mistral.db.v2 import api as db_api
from mistral import exceptions as exc
from mistral.services import workflows as wf_service
from mistral.tests import base as test_base
from mistral.tests.unit.engine import base
from mistral.workflow import utils as wf_utils

LOG = logging.getLogger(__name__)

# Use the set_default method to set value otherwise in certain test cases
# the change in value is not permanent.
cfg.CONF.set_default('auth_enable', False, group='pecan')

WF = """
---
version: '2.0'

wf:
  input:
    - workflow_input: '__WORKFLOW_INPUT__'
    - action_output_length: 0

  tasks:
    task1:
      action: my_action
      input:
        action_input: '__ACTION_INPUT__'
        action_output_length: <% $.action_output_length %>
      publish:
        p_var: '__TASK_PUBLISHED__'
"""


class MyAction(actions_base.Action):
    def __init__(self, action_input, action_output_length):
        self.action_input = action_input
        self.action_output_length = action_output_length

    def run(self):
        return wf_utils.Result(
            data=''.join('A' for _ in range(self.action_output_length))
        )

    def test(self):
        raise NotImplementedError


def expect_size_limit_exception(field_name):
    def logger(test_func):
        def wrapped(*args, **kwargs):
            with testtools.ExpectedException(exc.SizeLimitExceededException,
                                             value_re="Size of '%s' is 1KB "
                                                      "which exceeds the limit"
                                                      " of 0KB" % field_name):
                return test_func(*args, **kwargs)

        return wrapped

    return logger


def generate_workflow(tokens):
    new_wf = WF
    long_string = ''.join('A' for _ in range(1024))

    for token in tokens:
        new_wf = new_wf.replace(token, long_string)

    return new_wf


class ExecutionFieldsSizeLimitTest(base.EngineTestCase):
    def setUp(self):
        """Resets the size limit config between tests."""
        super(ExecutionFieldsSizeLimitTest, self).setUp()

        cfg.CONF.set_default('execution_field_size_limit_kb', 0,
                             group='engine')

        test_base.register_action_class('my_action', MyAction)

    def test_default_limit(self):
        cfg.CONF.set_default('execution_field_size_limit_kb', -1,
                             group='engine')

        new_wf = generate_workflow(
            ['__ACTION_INPUT__', '__WORKFLOW_INPUT__',
             '__TASK_PUBLISHED__'])

        wf_service.create_workflows(new_wf)

        # Start workflow.
        wf_ex = self.engine.start_workflow('wf', {})

        self._await(lambda: self.is_execution_success(wf_ex.id))

    @expect_size_limit_exception('input')
    def test_workflow_input_default_value_limit(self):
        new_wf = generate_workflow(['__WORKFLOW_INPUT__'])

        wf_service.create_workflows(new_wf)

        # Start workflow.
        self.engine.start_workflow('wf', {})

    @expect_size_limit_exception('input')
    def test_workflow_input_limit(self):
        wf_service.create_workflows(WF)

        # Start workflow.
        self.engine.start_workflow(
            'wf',
            {
                'workflow_input': ''.join('A' for _ in range(1024))
            }
        )

    @expect_size_limit_exception('input')
    def test_action_input_limit(self):
        new_wf = generate_workflow(['__ACTION_INPUT__'])

        wf_service.create_workflows(new_wf)

        # Start workflow.
        self.engine.start_workflow('wf', {})

    def test_action_output_limit(self):
        wf_service.create_workflows(WF)

        # Start workflow.
        wf_ex = self.engine.start_workflow('wf', {
            'action_output_length': 1024
        })

        self._await(lambda: self.is_execution_error(wf_ex.id))

        # Note: We need to reread execution to access related tasks.
        wf_ex = db_api.get_workflow_execution(wf_ex.id)

        self.assertEqual("Size of 'output' is 1KB which exceeds "
                         "the limit of 0KB",
                         wf_ex.state_info)

    def test_task_published_limit(self):
        new_wf = generate_workflow(['__TASK_PUBLISHED__'])

        wf_service.create_workflows(new_wf)

        # Start workflow.
        wf_ex = self.engine.start_workflow('wf', {})

        self._await(lambda: self.is_execution_error(wf_ex.id))

        # Note: We need to reread execution to access related tasks.
        wf_ex = db_api.get_workflow_execution(wf_ex.id)

        self.assertEqual("Size of 'published' is 1KB which exceeds "
                         "the limit of 0KB",
                         wf_ex.state_info)

    @expect_size_limit_exception('params')
    def test_workflow_params_limit(self):
        wf_service.create_workflows(WF)

        # Start workflow.
        long_string = ''.join('A' for _ in range(1024))

        self.engine.start_workflow('wf', {}, '', env={'param': long_string})

    def tearDown(self):
        """Restores the size limit config to default."""
        super(ExecutionFieldsSizeLimitTest, self).tearDown()

        cfg.CONF.set_default('execution_field_size_limit_kb', 1024,
                             group='engine')
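
Finally, the validator can be exercised without running a whole workflow. A
sketch, assuming the models module lives at mistral.db.v2.sqlalchemy.models
and that importing mistral.config registers the engine options:

    from oslo_config import cfg

    from mistral import config  # noqa: assumed to register engine_opts
    from mistral.db.v2.sqlalchemy import models
    from mistral import exceptions as exc

    cfg.CONF.set_override('execution_field_size_limit_kb', 1, group='engine')

    try:
        models.validate_long_type_length(
            models.TaskExecution, 'published', {'p_var': 'A' * 4096})
    except exc.SizeLimitExceededException as e:
        print(e)  # Size of 'published' is 4KB which exceeds the limit of 1KB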