Make more JSON fields in execution objects lazy-loaded

* Some fields of execution objects can be large and, what's even more
  important, their size is defined by user input. Making these fields
  lazy-loaded reduced the amount of data loaded from DB in those
  methods where ther are not needed. For example, "spec" field of the
  WorkflowExecution class is rarely needed because the corresponding
  specification Python object gets cached. It's proven to reduce
  the execution time in some cases by ~5-10%.
* Fixed failing test

Change-Id: Ica0ba2ffa312891483745d276d04c95985c7f4c2
This commit is contained in:
Renat Akhmerov 2019-06-13 16:19:03 +07:00
parent 5c5e948d37
commit 0c3b630609
12 changed files with 249 additions and 210 deletions

View File

@ -51,17 +51,26 @@ STATE_TYPES = wtypes.Enum(
)
def _load_deferred_output_field(ex):
if ex:
# We need to refer to this lazy-load field explicitly in
# order to make sure that it is correctly loaded.
hasattr(ex, 'output')
def _load_deferred_fields(ex, fields):
if not ex:
return ex
# We need to refer lazy-loaded fields explicitly in
# order to make sure that they are correctly loaded.
for f in fields:
hasattr(ex, f)
return ex
def _get_workflow_execution_resource_with_output(wf_ex):
_load_deferred_fields(wf_ex, ['params', 'output'])
return resources.Execution.from_db_model(wf_ex)
def _get_workflow_execution_resource(wf_ex):
_load_deferred_output_field(wf_ex)
_load_deferred_fields(wf_ex, ['params'])
return resources.Execution.from_db_model(wf_ex)
@ -75,7 +84,7 @@ def _get_workflow_execution(id, must_exist=True):
else:
wf_ex = db_api.load_workflow_execution(id)
return _load_deferred_output_field(wf_ex)
return _load_deferred_fields(wf_ex, ['params', 'output'])
# TODO(rakhmerov): Make sure to make all needed renaming on public API.
@ -398,9 +407,9 @@ class ExecutionsController(rest.RestController):
)
if include_output:
resource_function = _get_workflow_execution_resource
resource_function = _get_workflow_execution_resource_with_output
else:
resource_function = None
resource_function = _get_workflow_execution_resource
return rest_utils.get_all(
resources.Executions,

View File

@ -177,7 +177,6 @@ class Execution(mb.MistralSecureModelBase):
workflow_name = sa.Column(sa.String(255))
workflow_namespace = sa.Column(sa.String(255))
workflow_id = sa.Column(sa.String(80))
spec = sa.Column(st.JsonMediumDictType())
state = sa.Column(sa.String(20))
state_info = sa.Column(sa.Text(), nullable=True)
tags = sa.Column(st.JsonListType())
@ -199,6 +198,7 @@ class ActionExecution(Execution):
)
# Main properties.
spec = sa.Column(st.JsonMediumDictType())
accepted = sa.Column(sa.Boolean(), default=False)
input = sa.Column(st.JsonLongDictType(), nullable=True)
output = sa.orm.deferred(sa.Column(st.JsonLongDictType(), nullable=True))
@ -224,10 +224,11 @@ class WorkflowExecution(Execution):
)
# Main properties.
spec = sa.orm.deferred(sa.Column(st.JsonMediumDictType()))
accepted = sa.Column(sa.Boolean(), default=False)
input = sa.Column(st.JsonLongDictType(), nullable=True)
input = sa.orm.deferred(sa.Column(st.JsonLongDictType(), nullable=True))
output = sa.orm.deferred(sa.Column(st.JsonLongDictType(), nullable=True))
params = sa.Column(st.JsonLongDictType())
params = sa.orm.deferred(sa.Column(st.JsonLongDictType()))
# Initial workflow context containing workflow variables, environment,
# openstack security context etc.
@ -235,7 +236,7 @@ class WorkflowExecution(Execution):
# * Data stored in this structure should not be copied into inbound
# contexts of tasks. No need to duplicate it.
# * This structure does not contain workflow input.
context = sa.Column(st.JsonLongDictType())
context = sa.orm.deferred(sa.Column(st.JsonLongDictType()))
class TaskExecution(Execution):
@ -252,6 +253,7 @@ class TaskExecution(Execution):
)
# Main properties.
spec = sa.orm.deferred(sa.Column(st.JsonMediumDictType()))
action_spec = sa.Column(st.JsonLongDictType())
unique_key = sa.Column(sa.String(255), nullable=True)
type = sa.Column(sa.String(10))

View File

@ -898,7 +898,7 @@ class TestExecutionsController(base.APITest):
resource_function = kwargs['resource_function']
self.assertEqual(
execution._get_workflow_execution_resource,
execution._get_workflow_execution_resource_with_output,
resource_function
)
@ -912,7 +912,10 @@ class TestExecutionsController(base.APITest):
args, kwargs = mock_get_all.call_args
resource_function = kwargs['resource_function']
self.assertIsNone(resource_function)
self.assertEqual(
execution._get_workflow_execution_resource,
resource_function
)
@mock.patch('mistral.db.v2.api.get_workflow_executions')
@mock.patch('mistral.context.MistralContext.from_environ')

View File

@ -2112,17 +2112,18 @@ class TaskExecutionTest(SQLAlchemyTest):
self.assertIsNone(created.updated_at)
updated = db_api.update_task_execution(
created.id,
{'workflow_name': 'new_wf'}
)
with db_api.transaction():
updated = db_api.update_task_execution(
created.id,
{'workflow_name': 'new_wf'}
)
self.assertEqual('new_wf', updated.workflow_name)
self.assertEqual('new_wf', updated.workflow_name)
fetched = db_api.get_task_execution(created.id)
fetched = db_api.get_task_execution(created.id)
self.assertEqual(updated, fetched)
self.assertIsNotNone(fetched.updated_at)
self.assertEqual(updated, fetched)
self.assertIsNotNone(fetched.updated_at)
def test_create_or_update_task_execution(self):
id = 'not-existing-id'
@ -2139,20 +2140,21 @@ class TaskExecutionTest(SQLAlchemyTest):
self.assertIsNotNone(created)
self.assertIsNotNone(created.id)
updated = db_api.create_or_update_task_execution(
created.id,
{'state': 'RUNNING'}
)
with db_api.transaction():
updated = db_api.create_or_update_task_execution(
created.id,
{'state': 'RUNNING'}
)
self.assertEqual('RUNNING', updated.state)
self.assertEqual(
'RUNNING',
db_api.load_task_execution(updated.id).state
)
self.assertEqual('RUNNING', updated.state)
self.assertEqual(
'RUNNING',
db_api.load_task_execution(updated.id).state
)
fetched = db_api.get_task_execution(created.id)
fetched = db_api.get_task_execution(created.id)
self.assertEqual(updated, fetched)
self.assertEqual(updated, fetched)
def test_get_task_executions(self):
wf_ex = db_api.create_workflow_execution(WF_EXECS[0])
@ -2183,10 +2185,12 @@ class TaskExecutionTest(SQLAlchemyTest):
created.name,
'eq'
)
fetched = db_api.get_task_executions(**_filter)
self.assertEqual(1, len(fetched))
self.assertEqual(created, fetched[0])
with db_api.transaction():
fetched = db_api.get_task_executions(**_filter)
self.assertEqual(1, len(fetched))
self.assertEqual(created, fetched[0])
def test_filter_task_execution_by_not_equal_value(self):
created0, created1 = self._create_task_executions()
@ -2197,10 +2201,11 @@ class TaskExecutionTest(SQLAlchemyTest):
'neq'
)
fetched = db_api.get_task_executions(**_filter)
with db_api.transaction():
fetched = db_api.get_task_executions(**_filter)
self.assertEqual(1, len(fetched))
self.assertEqual(created1, fetched[0])
self.assertEqual(1, len(fetched))
self.assertEqual(created1, fetched[0])
def test_filter_task_execution_by_greater_than_value(self):
created0, created1 = self._create_task_executions()
@ -2210,10 +2215,12 @@ class TaskExecutionTest(SQLAlchemyTest):
created0['created_at'],
'gt'
)
fetched = db_api.get_task_executions(**_filter)
self.assertEqual(1, len(fetched))
self.assertEqual(created1, fetched[0])
with db_api.transaction():
fetched = db_api.get_task_executions(**_filter)
self.assertEqual(1, len(fetched))
self.assertEqual(created1, fetched[0])
def test_filter_task_execution_by_greater_than_equal_value(self):
created0, created1 = self._create_task_executions()
@ -2223,6 +2230,7 @@ class TaskExecutionTest(SQLAlchemyTest):
created0['created_at'],
'gte'
)
fetched = db_api.get_task_executions(**_filter)
self.assertEqual(2, len(fetched))
@ -2237,10 +2245,12 @@ class TaskExecutionTest(SQLAlchemyTest):
created1['created_at'],
'lt'
)
fetched = db_api.get_task_executions(**_filter)
self.assertEqual(1, len(fetched))
self.assertEqual(created0, fetched[0])
with db_api.transaction():
fetched = db_api.get_task_executions(**_filter)
self.assertEqual(1, len(fetched))
self.assertEqual(created0, fetched[0])
def test_filter_task_execution_by_less_than_equal_value(self):
created0, created1 = self._create_task_executions()
@ -2250,6 +2260,7 @@ class TaskExecutionTest(SQLAlchemyTest):
created1['created_at'],
'lte'
)
fetched = db_api.get_task_executions(**_filter)
self.assertEqual(2, len(fetched))
@ -2264,12 +2275,14 @@ class TaskExecutionTest(SQLAlchemyTest):
[created['created_at']],
'in'
)
fetched = db_api.get_task_executions(**_filter)
self.assertEqual(1, len(fetched))
self.assertEqual(created, fetched[0])
with db_api.transaction():
fetched = db_api.get_task_executions(**_filter)
def test_filter_task_execution_by_values_notin_list(self):
self.assertEqual(1, len(fetched))
self.assertEqual(created, fetched[0])
def test_filter_task_execution_by_values_not_in_list(self):
created0, created1 = self._create_task_executions()
_filter = filter_utils.create_or_update_filter(
@ -2277,10 +2290,12 @@ class TaskExecutionTest(SQLAlchemyTest):
[created0['created_at']],
'nin'
)
fetched = db_api.get_task_executions(**_filter)
self.assertEqual(1, len(fetched))
self.assertEqual(created1, fetched[0])
with db_api.transaction():
fetched = db_api.get_task_executions(**_filter)
self.assertEqual(1, len(fetched))
self.assertEqual(created1, fetched[0])
def test_filter_task_execution_by_multiple_columns(self):
created0, created1 = self._create_task_executions()
@ -2296,10 +2311,12 @@ class TaskExecutionTest(SQLAlchemyTest):
'eq',
_filter
)
fetched = db_api.get_task_executions(**_filter)
self.assertEqual(1, len(fetched))
self.assertEqual(created1, fetched[0])
with db_api.transaction():
fetched = db_api.get_task_executions(**_filter)
self.assertEqual(1, len(fetched))
self.assertEqual(created1, fetched[0])
def test_delete_task_execution(self):
wf_ex = db_api.create_workflow_execution(WF_EXECS[0])
@ -2307,13 +2324,14 @@ class TaskExecutionTest(SQLAlchemyTest):
values = copy.deepcopy(TASK_EXECS[0])
values.update({'workflow_execution_id': wf_ex.id})
created = db_api.create_task_execution(values)
with db_api.transaction():
created = db_api.create_task_execution(values)
fetched = db_api.get_task_execution(created.id)
fetched = db_api.get_task_execution(created.id)
self.assertEqual(created, fetched)
self.assertEqual(created, fetched)
db_api.delete_task_execution(created.id)
db_api.delete_task_execution(created.id)
self.assertRaises(
exc.DBEntityNotFoundError,
@ -2328,43 +2346,44 @@ class TaskExecutionTest(SQLAlchemyTest):
values.update({'workflow_execution_id': wf_ex.id})
values['state'] = 'RUNNING'
task_ex1 = db_api.create_task_execution(values)
with db_api.transaction():
task_ex1 = db_api.create_task_execution(values)
task_execs = db_api.get_incomplete_task_executions(
workflow_execution_id=wf_ex.id
)
self.assertEqual(1, len(task_execs))
self.assertEqual(task_ex1, task_execs[0])
self.assertEqual(
1,
db_api.get_incomplete_task_executions_count(
task_execs = db_api.get_incomplete_task_executions(
workflow_execution_id=wf_ex.id
)
)
# Add one more task.
self.assertEqual(1, len(task_execs))
self.assertEqual(task_ex1, task_execs[0])
self.assertEqual(
1,
db_api.get_incomplete_task_executions_count(
workflow_execution_id=wf_ex.id
)
)
values = copy.deepcopy(TASK_EXECS[1])
values.update({'workflow_execution_id': wf_ex.id})
values['state'] = 'SUCCESS'
# Add one more task.
db_api.create_task_execution(values)
values = copy.deepcopy(TASK_EXECS[1])
values.update({'workflow_execution_id': wf_ex.id})
values['state'] = 'SUCCESS'
# It should be still one incompleted task.
db_api.create_task_execution(values)
task_execs = db_api.get_incomplete_task_executions(
workflow_execution_id=wf_ex.id
)
# It should be still one incompleted task.
self.assertEqual(1, len(task_execs))
self.assertEqual(task_ex1, task_execs[0])
self.assertEqual(
1,
db_api.get_incomplete_task_executions_count(
task_execs = db_api.get_incomplete_task_executions(
workflow_execution_id=wf_ex.id
)
)
self.assertEqual(1, len(task_execs))
self.assertEqual(task_ex1, task_execs[0])
self.assertEqual(
1,
db_api.get_incomplete_task_executions_count(
workflow_execution_id=wf_ex.id
)
)
def test_task_execution_repr(self):
wf_ex = db_api.create_workflow_execution(WF_EXECS[0])

View File

@ -127,15 +127,15 @@ class DefaultEngineTest(base.DbTestCase):
task_execs = wf_ex.task_executions
self.assertEqual(1, len(task_execs))
self.assertEqual(1, len(task_execs))
task_ex = task_execs[0]
task_ex = task_execs[0]
self.assertEqual('wb.wf', task_ex.workflow_name)
self.assertEqual('task1', task_ex.name)
self.assertEqual(states.RUNNING, task_ex.state)
self.assertIsNotNone(task_ex.spec)
self.assertDictEqual({}, task_ex.runtime_context)
self.assertEqual('wb.wf', task_ex.workflow_name)
self.assertEqual('task1', task_ex.name)
self.assertEqual(states.RUNNING, task_ex.state)
self.assertIsNotNone(task_ex.spec)
self.assertDictEqual({}, task_ex.runtime_context)
# Data Flow properties.
action_execs = db_api.get_action_executions(
@ -196,15 +196,15 @@ class DefaultEngineTest(base.DbTestCase):
task_execs = wf_ex.task_executions
self.assertEqual(1, len(task_execs))
self.assertEqual(1, len(task_execs))
task_ex = task_execs[0]
task_ex = task_execs[0]
self.assertEqual('wb.wf', task_ex.workflow_name)
self.assertEqual('task1', task_ex.name)
self.assertEqual(states.RUNNING, task_ex.state)
self.assertIsNotNone(task_ex.spec)
self.assertDictEqual({}, task_ex.runtime_context)
self.assertEqual('wb.wf', task_ex.workflow_name)
self.assertEqual('task1', task_ex.name)
self.assertEqual(states.RUNNING, task_ex.state)
self.assertIsNotNone(task_ex.spec)
self.assertDictEqual({}, task_ex.runtime_context)
# Data Flow properties.
action_execs = db_api.get_action_executions(
@ -234,9 +234,10 @@ class DefaultEngineTest(base.DbTestCase):
self.assertIsNotNone(wf_ex)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertDictEqual(wf_ex.params.get('env', {}), env)
self.assertDictEqual(wf_ex.params.get('env', {}), env)
@mock.patch.object(db_api, "load_environment", MOCK_ENVIRONMENT)
def test_start_workflow_with_saved_env(self):
@ -256,9 +257,10 @@ class DefaultEngineTest(base.DbTestCase):
self.assertIsNotNone(wf_ex)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertDictEqual(wf_ex.params.get('env', {}), env)
self.assertDictEqual(wf_ex.params.get('env', {}), env)
@mock.patch.object(db_api, "get_environment", MOCK_NOT_FOUND)
def test_start_workflow_env_not_found(self):
@ -463,15 +465,15 @@ class DefaultEngineTest(base.DbTestCase):
task_execs = wf_ex.task_executions
self.assertEqual(1, len(task_execs))
self.assertEqual(1, len(task_execs))
task1_ex = task_execs[0]
task1_ex = task_execs[0]
self.assertEqual('task1', task1_ex.name)
self.assertEqual(states.RUNNING, task1_ex.state)
self.assertIsNotNone(task1_ex.spec)
self.assertDictEqual({}, task1_ex.runtime_context)
self.assertNotIn('__execution', task1_ex.in_context)
self.assertEqual('task1', task1_ex.name)
self.assertEqual(states.RUNNING, task1_ex.state)
self.assertIsNotNone(task1_ex.spec)
self.assertDictEqual({}, task1_ex.runtime_context)
self.assertNotIn('__execution', task1_ex.in_context)
action_execs = db_api.get_action_executions(
task_execution_id=task1_ex.id

View File

@ -335,10 +335,10 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(states.ERROR, wf_ex.state)
self.assertIsNotNone(wf_ex.state_info)
self.assertEqual(3, len(task_execs))
self.assertDictEqual(env, wf_ex.params['env'])
self.assertEqual(states.ERROR, wf_ex.state)
self.assertIsNotNone(wf_ex.state_info)
self.assertEqual(3, len(task_execs))
self.assertDictEqual(env, wf_ex.params['env'])
task_10_ex = self._assert_single_item(task_execs, name='t10')
task_21_ex = self._assert_single_item(task_execs, name='t21')

View File

@ -116,22 +116,23 @@ class EnvironmentTest(base.EngineTestCase):
self._await(lambda: len(db_api.get_workflow_executions()) == 2, 0.5, 5)
wf_execs = db_api.get_workflow_executions()
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()
self.assertEqual(2, len(wf_execs))
self.assertEqual(2, len(wf_execs))
# Execution of 'wf1'.
# Execution of 'wf1'.
wf2_ex = self._assert_single_item(wf_execs, name='my_wb.wf2')
wf1_ex = self._assert_single_item(wf_execs, name='my_wb.wf1')
wf2_ex = self._assert_single_item(wf_execs, name='my_wb.wf2')
wf1_ex = self._assert_single_item(wf_execs, name='my_wb.wf1')
expected_wf1_input = {
'param1': 'Bonnie',
'param2': 'Clyde'
}
expected_wf1_input = {
'param1': 'Bonnie',
'param2': 'Clyde'
}
self.assertIsNotNone(wf1_ex.task_execution_id)
self.assertDictEqual(wf1_ex.input, expected_wf1_input)
self.assertIsNotNone(wf1_ex.task_execution_id)
self.assertDictEqual(wf1_ex.input, expected_wf1_input)
# Wait till workflow 'wf1' is completed.
self.await_workflow_success(wf1_ex.id)
@ -389,9 +390,9 @@ class EnvironmentTest(base.EngineTestCase):
name='task1'
)
self.assertDictEqual({'result': 'val1'}, t.published)
self.assertDictEqual({'result': 'val1'}, t.published)
self.assertNotIn('__env', wf_ex.context)
self.assertNotIn('__env', wf_ex.context)
def test_subworkflow_env_no_duplicate(self):
wf_text = """---
@ -444,8 +445,8 @@ class EnvironmentTest(base.EngineTestCase):
sub_wf_ex.output
)
# The environment of the subworkflow must be empty.
# To evaluate expressions it should be taken from the
# parent workflow execution.
self.assertDictEqual({}, sub_wf_ex.params['env'])
self.assertNotIn('__env', sub_wf_ex.context)
# The environment of the subworkflow must be empty.
# To evaluate expressions it should be taken from the
# parent workflow execution.
self.assertDictEqual({}, sub_wf_ex.params['env'])
self.assertNotIn('__env', sub_wf_ex.context)

View File

@ -211,10 +211,10 @@ class ReverseWorkflowRerunTest(base.EngineTestCase):
task_execs = wf_ex.task_executions
self.assertEqual(states.ERROR, wf_ex.state)
self.assertIsNotNone(wf_ex.state_info)
self.assertEqual(2, len(task_execs))
self.assertDictEqual(env, wf_ex.params['env'])
self.assertEqual(states.ERROR, wf_ex.state)
self.assertIsNotNone(wf_ex.state_info)
self.assertEqual(2, len(task_execs))
self.assertDictEqual(env, wf_ex.params['env'])
task_1_ex = self._assert_single_item(task_execs, name='t1')
task_2_ex = self._assert_single_item(task_execs, name='t2')
@ -232,11 +232,12 @@ class ReverseWorkflowRerunTest(base.EngineTestCase):
# Resume workflow and re-run failed task.
self.engine.rerun_workflow(task_2_ex.id, env=updated_env)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.RUNNING, wf_ex.state)
self.assertIsNone(wf_ex.state_info)
self.assertDictEqual(updated_env, wf_ex.params['env'])
self.assertEqual(states.RUNNING, wf_ex.state)
self.assertIsNone(wf_ex.state_info)
self.assertDictEqual(updated_env, wf_ex.params['env'])
# Wait for the workflow to succeed.
self.await_workflow_success(wf_ex.id)

View File

@ -238,30 +238,31 @@ class SubworkflowsTest(base.EngineTestCase):
self._await(lambda: len(db_api.get_workflow_executions()) == 2, 0.5, 5)
wf_execs = db_api.get_workflow_executions()
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()
self.assertEqual(2, len(wf_execs))
self.assertEqual(2, len(wf_execs))
# Execution of 'wf2'.
wf1_ex = self._assert_single_item(wf_execs, name='wb1.wf1')
wf2_ex = self._assert_single_item(wf_execs, name='wb1.wf2')
# Execution of 'wf2'.
wf1_ex = self._assert_single_item(wf_execs, name='wb1.wf1')
wf2_ex = self._assert_single_item(wf_execs, name='wb1.wf2')
self.assertEqual(project_id, wf1_ex.project_id)
self.assertIsNotNone(wf1_ex.task_execution_id)
self.assertDictContainsSubset(
{
'task_name': 'task2',
'task_execution_id': wf1_ex.task_execution_id
},
wf1_ex.params
)
self.assertDictEqual(
{
'param1': 'Bonnie',
'param2': 'Clyde'
},
wf1_ex.input
)
self.assertEqual(project_id, wf1_ex.project_id)
self.assertIsNotNone(wf1_ex.task_execution_id)
self.assertDictContainsSubset(
{
'task_name': 'task2',
'task_execution_id': wf1_ex.task_execution_id
},
wf1_ex.params
)
self.assertDictEqual(
{
'param1': 'Bonnie',
'param2': 'Clyde'
},
wf1_ex.input
)
# Wait till workflow 'wf1' is completed.
self.await_workflow_success(wf1_ex.id)
@ -360,16 +361,17 @@ class SubworkflowsTest(base.EngineTestCase):
self._await(lambda: len(db_api.get_workflow_executions()) == 2, 0.5, 5)
wf_execs = db_api.get_workflow_executions()
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()
self.assertEqual(2, len(wf_execs))
self.assertEqual(2, len(wf_execs))
# Execution of 'wf1'.
wf1_ex = self._assert_single_item(wf_execs, name='wb1.wf1')
wf2_ex = self._assert_single_item(wf_execs, name='wb1.wf2')
# Execution of 'wf1'.
wf1_ex = self._assert_single_item(wf_execs, name='wb1.wf1')
wf2_ex = self._assert_single_item(wf_execs, name='wb1.wf2')
self.assertIsNotNone(wf1_ex.task_execution_id)
self.assertDictContainsSubset({}, wf1_ex.params)
self.assertIsNotNone(wf1_ex.task_execution_id)
self.assertDictContainsSubset({}, wf1_ex.params)
# Wait till workflow 'wf1' is completed.
self.await_workflow_success(wf1_ex.id)

View File

@ -421,14 +421,14 @@ class WorkflowResumeTest(base.EngineTestCase):
task_execs = wf_ex.task_executions
task_1_ex = self._assert_single_item(task_execs, name='task1')
task_2_ex = self._assert_single_item(task_execs, name='task2')
task_1_ex = self._assert_single_item(task_execs, name='task1')
task_2_ex = self._assert_single_item(task_execs, name='task2')
self.assertEqual(states.PAUSED, wf_ex.state)
self.assertEqual(2, len(task_execs))
self.assertDictEqual(env, wf_ex.params['env'])
self.assertEqual(states.SUCCESS, task_1_ex.state)
self.assertEqual(states.IDLE, task_2_ex.state)
self.assertEqual(states.PAUSED, wf_ex.state)
self.assertEqual(2, len(task_execs))
self.assertDictEqual(env, wf_ex.params['env'])
self.assertEqual(states.SUCCESS, task_1_ex.state)
self.assertEqual(states.IDLE, task_2_ex.state)
# Update env in workflow execution with the following.
updated_env = {
@ -446,13 +446,13 @@ class WorkflowResumeTest(base.EngineTestCase):
task_execs = wf_ex.task_executions
self.assertDictEqual(updated_env, wf_ex.params['env'])
self.assertEqual(3, len(task_execs))
self.assertDictEqual(updated_env, wf_ex.params['env'])
self.assertEqual(3, len(task_execs))
# Check result of task2.
task_2_ex = self._assert_single_item(task_execs, name='task2')
# Check result of task2.
task_2_ex = self._assert_single_item(task_execs, name='task2')
self.assertEqual(states.SUCCESS, task_2_ex.state)
self.assertEqual(states.SUCCESS, task_2_ex.state)
# Re-read task execution, otherwise lazy loading of action executions
# may not work.
@ -461,15 +461,15 @@ class WorkflowResumeTest(base.EngineTestCase):
task_2_result = data_flow.get_task_execution_result(task_2_ex)
self.assertEqual(updated_env['var1'], task_2_result)
self.assertEqual(updated_env['var1'], task_2_result)
# Check result of task3.
task_3_ex = self._assert_single_item(
task_execs,
name='task3'
)
# Check result of task3.
task_3_ex = self._assert_single_item(
task_execs,
name='task3'
)
self.assertEqual(states.SUCCESS, task_3_ex.state)
self.assertEqual(states.SUCCESS, task_3_ex.state)
# Re-read task execution, otherwise lazy loading of action executions
# may not work.
@ -478,4 +478,4 @@ class WorkflowResumeTest(base.EngineTestCase):
task_3_result = data_flow.get_task_execution_result(task_3_ex)
self.assertEqual(updated_env['var2'], task_3_result)
self.assertEqual(updated_env['var2'], task_3_result)

View File

@ -185,19 +185,19 @@ class SpecificationCachingTest(base.DbTestCase):
self.assertEqual(0, spec_parser.get_wf_execution_spec_cache_size())
self.assertEqual(1, spec_parser.get_wf_definition_spec_cache_size())
wf_ex = db_api.create_workflow_execution({
'id': '1-2-3-4',
'name': 'wf',
'workflow_id': wf_def.id,
'spec': wf_spec.to_dict(),
'state': states.RUNNING
})
with db_api.transaction():
wf_ex = db_api.create_workflow_execution({
'id': '1-2-3-4',
'name': 'wf',
'workflow_id': wf_def.id,
'spec': wf_spec.to_dict(),
'state': states.RUNNING
})
# Check that we can get a valid spec by execution id.
wf_spec_by_exec_id = spec_parser.get_workflow_spec_by_execution_id(
wf_ex.id
)
# Check that we can get a valid spec by execution id.
wf_spec_by_exec_id = spec_parser.get_workflow_spec_by_execution_id(
wf_ex.id
)
self.assertEqual(1, len(wf_spec_by_exec_id.get_tasks()))

View File

@ -303,17 +303,17 @@ class WorkflowServiceTest(base.DbTestCase):
update_env
)
fetched = db_api.get_workflow_execution(created.id)
fetched = db_api.get_workflow_execution(created.id)
self.assertDictEqual(
wf_exec['params']['env'],
fetched.params['env']
)
self.assertDictEqual(
wf_exec['params']['env'],
fetched.params['env']
)
self.assertDictEqual(
wf_exec['context']['__env'],
fetched.context['__env']
)
self.assertDictEqual(
wf_exec['context']['__env'],
fetched.context['__env']
)
def test_with_long_task_name(self):
long_task_name = utils.generate_string(tasks.MAX_LENGTH_TASK_NAME + 1)