Merge "Use partial loading of workflow execution in the controller"
This commit is contained in:
commit
d703e15a5f
|
@ -27,6 +27,7 @@ from mistral.api.controllers.v2 import task
|
|||
from mistral.api.controllers.v2 import types
|
||||
from mistral import context
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral.db.v2.sqlalchemy import models as db_models
|
||||
from mistral import exceptions as exc
|
||||
from mistral.rpc import clients as rpc
|
||||
from mistral.services import workflows as wf_service
|
||||
|
@ -117,7 +118,10 @@ class ExecutionsController(rest.RestController):
|
|||
def _compute_delta(wf_ex):
|
||||
with db_api.transaction():
|
||||
# ensure that workflow execution exists
|
||||
db_api.get_workflow_execution(id)
|
||||
db_api.get_workflow_execution(
|
||||
id,
|
||||
fields=(db_models.WorkflowExecution.id,)
|
||||
)
|
||||
|
||||
delta = {}
|
||||
|
||||
|
@ -283,12 +287,15 @@ class ExecutionsController(rest.RestController):
|
|||
LOG.debug("Delete execution [id=%s]", id)
|
||||
|
||||
if not force:
|
||||
wf_ex = db_api.get_workflow_execution(id)
|
||||
state = db_api.get_workflow_execution(
|
||||
id,
|
||||
fields=(db_models.WorkflowExecution.state,)
|
||||
)[0]
|
||||
|
||||
if not states.is_completed(wf_ex.state):
|
||||
if not states.is_completed(state):
|
||||
raise exc.NotAllowedException(
|
||||
"Only completed executions can be deleted."
|
||||
" Execution {} is in {} state".format(id, wf_ex.state)
|
||||
" Execution {} is in {} state".format(id, state)
|
||||
)
|
||||
|
||||
return rest_utils.rest_retry_on_db_error(
|
||||
|
|
|
@ -242,8 +242,9 @@ def delete_action_executions(**kwargs):
|
|||
|
||||
# Workflow executions.
|
||||
|
||||
def get_workflow_execution(id):
|
||||
return IMPL.get_workflow_execution(id)
|
||||
# TODO(rakhmerov): Add 'fields' parameter to all 'get' methods.
|
||||
def get_workflow_execution(id, fields=()):
|
||||
return IMPL.get_workflow_execution(id, fields=fields)
|
||||
|
||||
|
||||
def load_workflow_execution(name):
|
||||
|
|
|
@ -293,8 +293,12 @@ def _get_db_object_by_name(model, name, filter_=None, order_by=None):
|
|||
return query.filter(final_filter).first()
|
||||
|
||||
|
||||
def _get_db_object_by_id(model, id, insecure=False):
|
||||
query = b.model_query(model) if insecure else _secure_query(model)
|
||||
def _get_db_object_by_id(model, id, insecure=False, columns=()):
|
||||
query = (
|
||||
b.model_query(model, columns=columns)
|
||||
if insecure
|
||||
else _secure_query(model, *columns)
|
||||
)
|
||||
|
||||
return query.filter_by(id=id).first()
|
||||
|
||||
|
@ -310,6 +314,7 @@ def _get_db_object_by_name_and_namespace_or_id(model, identifier,
|
|||
match_name = sa.and_(match_name, model.namespace == namespace)
|
||||
|
||||
match_id = model.id == identifier
|
||||
|
||||
query = query.filter(
|
||||
sa.or_(
|
||||
match_id,
|
||||
|
@ -747,13 +752,14 @@ def _get_action_executions(**kwargs):
|
|||
# Workflow executions.
|
||||
|
||||
@b.session_aware()
|
||||
def get_workflow_execution(id, session=None):
|
||||
def get_workflow_execution(id, fields=(), session=None):
|
||||
ctx = context.ctx()
|
||||
|
||||
wf_ex = _get_db_object_by_id(
|
||||
models.WorkflowExecution,
|
||||
id,
|
||||
insecure=ctx.is_admin
|
||||
insecure=ctx.is_admin,
|
||||
columns=fields
|
||||
)
|
||||
|
||||
if not wf_ex:
|
||||
|
@ -1116,9 +1122,12 @@ def get_superfluous_executions(max_finished_executions, limit=None, columns=(),
|
|||
|
||||
def _get_completed_root_executions_query(columns):
|
||||
query = b.model_query(models.WorkflowExecution, columns=columns)
|
||||
|
||||
# Only WorkflowExecution that are not a child of other WorkflowExecution.
|
||||
query = query.filter(models.WorkflowExecution.
|
||||
task_execution_id == sa.null())
|
||||
query = query.filter(
|
||||
models.WorkflowExecution.task_execution_id == sa.null()
|
||||
)
|
||||
|
||||
query = query.filter(
|
||||
models.WorkflowExecution.state.in_(
|
||||
[states.SUCCESS,
|
||||
|
@ -1126,6 +1135,7 @@ def _get_completed_root_executions_query(columns):
|
|||
states.CANCELLED]
|
||||
)
|
||||
)
|
||||
|
||||
return query
|
||||
|
||||
|
||||
|
|
|
@ -372,7 +372,11 @@ class TestExecutionsController(base.APITest):
|
|||
resp = self.app.put_json('/v2/executions/123', update_params)
|
||||
|
||||
self.assertEqual(200, resp.status_int)
|
||||
mock_ensure.assert_called_once_with('123')
|
||||
|
||||
mock_ensure.assert_called_once_with(
|
||||
'123',
|
||||
fields=(models.WorkflowExecution.id,)
|
||||
)
|
||||
mock_update.assert_called_once_with('123', update_params)
|
||||
|
||||
@mock.patch.object(
|
||||
|
@ -704,7 +708,7 @@ class TestExecutionsController(base.APITest):
|
|||
@mock.patch.object(
|
||||
db_api,
|
||||
'get_workflow_execution',
|
||||
MOCK_WF_EX
|
||||
mock.MagicMock(return_value=(states.RUNNING,))
|
||||
)
|
||||
def test_delete_unfished_execution(self):
|
||||
resp = self.app.delete('/v2/executions/123', expect_errors=True)
|
||||
|
@ -718,7 +722,7 @@ class TestExecutionsController(base.APITest):
|
|||
|
||||
@mock.patch.object(db_api,
|
||||
'get_workflow_execution',
|
||||
MOCK_ERROR_WF_EX)
|
||||
mock.MagicMock(return_value=(states.ERROR,)))
|
||||
@mock.patch.object(db_api,
|
||||
'delete_workflow_execution',
|
||||
MOCK_DELETE)
|
||||
|
@ -729,7 +733,7 @@ class TestExecutionsController(base.APITest):
|
|||
|
||||
@mock.patch.object(db_api,
|
||||
'get_workflow_execution',
|
||||
MOCK_SUCCESS_WF_EX)
|
||||
mock.MagicMock(return_value=(states.SUCCESS,)))
|
||||
@mock.patch.object(db_api,
|
||||
'delete_workflow_execution',
|
||||
MOCK_DELETE)
|
||||
|
|
|
@ -1504,6 +1504,20 @@ class WorkflowExecutionTest(SQLAlchemyTest):
|
|||
db_api.load_workflow_execution("not-existing-id")
|
||||
)
|
||||
|
||||
def test_get_workflow_execution_with_columns(self):
|
||||
with db_api.transaction():
|
||||
created = db_api.create_workflow_execution(WF_EXECS[0])
|
||||
|
||||
fetched = db_api.get_workflow_execution(
|
||||
created.id,
|
||||
fields=(db_models.WorkflowExecution.state,)
|
||||
)
|
||||
|
||||
self.assertNotEqual(created, fetched)
|
||||
self.assertIsInstance(fetched, tuple)
|
||||
self.assertEqual(1, len(fetched))
|
||||
self.assertEqual(created.state, fetched.state)
|
||||
|
||||
def test_update_workflow_execution(self):
|
||||
with db_api.transaction():
|
||||
created = db_api.create_workflow_execution(WF_EXECS[0])
|
||||
|
|
Loading…
Reference in New Issue