Merge "Use partial loading of workflow execution in the controller"

This commit is contained in:
Zuul 2018-05-16 11:21:27 +00:00 committed by Gerrit Code Review
commit d703e15a5f
5 changed files with 52 additions and 16 deletions

View File

@ -27,6 +27,7 @@ from mistral.api.controllers.v2 import task
from mistral.api.controllers.v2 import types
from mistral import context
from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models as db_models
from mistral import exceptions as exc
from mistral.rpc import clients as rpc
from mistral.services import workflows as wf_service
@ -117,7 +118,10 @@ class ExecutionsController(rest.RestController):
def _compute_delta(wf_ex):
with db_api.transaction():
# ensure that workflow execution exists
db_api.get_workflow_execution(id)
db_api.get_workflow_execution(
id,
fields=(db_models.WorkflowExecution.id,)
)
delta = {}
@ -283,12 +287,15 @@ class ExecutionsController(rest.RestController):
LOG.debug("Delete execution [id=%s]", id)
if not force:
wf_ex = db_api.get_workflow_execution(id)
state = db_api.get_workflow_execution(
id,
fields=(db_models.WorkflowExecution.state,)
)[0]
if not states.is_completed(wf_ex.state):
if not states.is_completed(state):
raise exc.NotAllowedException(
"Only completed executions can be deleted."
" Execution {} is in {} state".format(id, wf_ex.state)
" Execution {} is in {} state".format(id, state)
)
return rest_utils.rest_retry_on_db_error(

View File

@ -242,8 +242,9 @@ def delete_action_executions(**kwargs):
# Workflow executions.
def get_workflow_execution(id):
return IMPL.get_workflow_execution(id)
# TODO(rakhmerov): Add 'fields' parameter to all 'get' methods.
def get_workflow_execution(id, fields=()):
return IMPL.get_workflow_execution(id, fields=fields)
def load_workflow_execution(name):

View File

@ -293,8 +293,12 @@ def _get_db_object_by_name(model, name, filter_=None, order_by=None):
return query.filter(final_filter).first()
def _get_db_object_by_id(model, id, insecure=False):
query = b.model_query(model) if insecure else _secure_query(model)
def _get_db_object_by_id(model, id, insecure=False, columns=()):
query = (
b.model_query(model, columns=columns)
if insecure
else _secure_query(model, *columns)
)
return query.filter_by(id=id).first()
@ -310,6 +314,7 @@ def _get_db_object_by_name_and_namespace_or_id(model, identifier,
match_name = sa.and_(match_name, model.namespace == namespace)
match_id = model.id == identifier
query = query.filter(
sa.or_(
match_id,
@ -747,13 +752,14 @@ def _get_action_executions(**kwargs):
# Workflow executions.
@b.session_aware()
def get_workflow_execution(id, session=None):
def get_workflow_execution(id, fields=(), session=None):
ctx = context.ctx()
wf_ex = _get_db_object_by_id(
models.WorkflowExecution,
id,
insecure=ctx.is_admin
insecure=ctx.is_admin,
columns=fields
)
if not wf_ex:
@ -1116,9 +1122,12 @@ def get_superfluous_executions(max_finished_executions, limit=None, columns=(),
def _get_completed_root_executions_query(columns):
query = b.model_query(models.WorkflowExecution, columns=columns)
# Only WorkflowExecution that are not a child of other WorkflowExecution.
query = query.filter(models.WorkflowExecution.
task_execution_id == sa.null())
query = query.filter(
models.WorkflowExecution.task_execution_id == sa.null()
)
query = query.filter(
models.WorkflowExecution.state.in_(
[states.SUCCESS,
@ -1126,6 +1135,7 @@ def _get_completed_root_executions_query(columns):
states.CANCELLED]
)
)
return query

View File

@ -372,7 +372,11 @@ class TestExecutionsController(base.APITest):
resp = self.app.put_json('/v2/executions/123', update_params)
self.assertEqual(200, resp.status_int)
mock_ensure.assert_called_once_with('123')
mock_ensure.assert_called_once_with(
'123',
fields=(models.WorkflowExecution.id,)
)
mock_update.assert_called_once_with('123', update_params)
@mock.patch.object(
@ -704,7 +708,7 @@ class TestExecutionsController(base.APITest):
@mock.patch.object(
db_api,
'get_workflow_execution',
MOCK_WF_EX
mock.MagicMock(return_value=(states.RUNNING,))
)
def test_delete_unfished_execution(self):
resp = self.app.delete('/v2/executions/123', expect_errors=True)
@ -718,7 +722,7 @@ class TestExecutionsController(base.APITest):
@mock.patch.object(db_api,
'get_workflow_execution',
MOCK_ERROR_WF_EX)
mock.MagicMock(return_value=(states.ERROR,)))
@mock.patch.object(db_api,
'delete_workflow_execution',
MOCK_DELETE)
@ -729,7 +733,7 @@ class TestExecutionsController(base.APITest):
@mock.patch.object(db_api,
'get_workflow_execution',
MOCK_SUCCESS_WF_EX)
mock.MagicMock(return_value=(states.SUCCESS,)))
@mock.patch.object(db_api,
'delete_workflow_execution',
MOCK_DELETE)

View File

@ -1504,6 +1504,20 @@ class WorkflowExecutionTest(SQLAlchemyTest):
db_api.load_workflow_execution("not-existing-id")
)
def test_get_workflow_execution_with_columns(self):
with db_api.transaction():
created = db_api.create_workflow_execution(WF_EXECS[0])
fetched = db_api.get_workflow_execution(
created.id,
fields=(db_models.WorkflowExecution.state,)
)
self.assertNotEqual(created, fetched)
self.assertIsInstance(fetched, tuple)
self.assertEqual(1, len(fetched))
self.assertEqual(created.state, fetched.state)
def test_update_workflow_execution(self):
with db_api.transaction():
created = db_api.create_workflow_execution(WF_EXECS[0])