Introducing 'workflow' as an individual entity

* Added Workflow DB model
* Removed 'workbook_name' parameter from Engine.start_workflow signature
* Fixed unit tests
* Fixed module trusts to create individual workbook entities when
 on workbook definition update
* Fixed API tests
* Switched v1 controllers to explicitly use DB API v1

Change-Id: I6a096e1c5d8082d3d1cdd47b575b85b28224d2a6
This commit is contained in:
Renat Akhmerov 2014-08-25 21:03:02 +07:00
parent 7c44b92b22
commit 2b64cae308
25 changed files with 288 additions and 97 deletions

View File

@ -22,7 +22,7 @@ import wsmeext.pecan as wsme_pecan
from mistral.api.controllers import resource
from mistral.api.controllers.v1 import task
from mistral.db import api as db_api
from mistral.db.v1 import api as db_api
from mistral.openstack.common import log as logging
from mistral.utils import rest_utils

View File

@ -19,7 +19,7 @@ from wsme import types as wtypes
import wsmeext.pecan as wsme_pecan
from mistral.api.controllers import resource
from mistral.db import api as db_api
from mistral.db.v1 import api as db_api
from mistral.openstack.common import log as logging
from mistral.utils import rest_utils

View File

@ -22,7 +22,7 @@ from wsme import types as wtypes
import wsmeext.pecan as wsme_pecan
from mistral.api.controllers import resource
from mistral.db import api as db_api
from mistral.db.v1 import api as db_api
from mistral.openstack.common import log as logging
from mistral.utils import rest_utils

View File

@ -22,7 +22,7 @@ from mistral.api.controllers import resource
from mistral.api.controllers.v1 import execution
from mistral.api.controllers.v1 import listener
from mistral.api.controllers.v1 import workbook_definition
from mistral.db import api as db_api
from mistral.db.v1 import api as db_api
from mistral.openstack.common import log as logging
from mistral.services import workbooks
from mistral.utils import rest_utils
@ -87,7 +87,7 @@ class WorkbooksController(rest.RestController):
"""Create a new workbook."""
LOG.debug("Create workbook [workbook=%s]" % workbook)
db_model = workbooks.create_workbook(workbook.to_dict())
db_model = workbooks.create_workbook_v1(workbook.to_dict())
return Workbook.from_dict(db_model.to_dict())

View File

@ -16,9 +16,9 @@
import pecan
from mistral.db import api as db_api
from mistral.db.v1 import api as db_api
from mistral.openstack.common import log as logging
from mistral.services import scheduler
from mistral.services import workbooks
from mistral.utils import rest_utils
@ -33,7 +33,7 @@ class WorkbookDefinitionController(pecan.rest.RestController):
LOG.debug("Fetch workbook definition [workbook_name=%s]" %
workbook_name)
return db_api.workbook_definition_get(workbook_name)
return db_api.workbook_get(workbook_name).definition
@rest_utils.wrap_pecan_controller_exception
@pecan.expose(content_type="text/plain")
@ -44,8 +44,6 @@ class WorkbookDefinitionController(pecan.rest.RestController):
LOG.debug("Update workbook definition [workbook_name=%s, text=%s]" %
(workbook_name, text))
wb = db_api.workbook_definition_put(workbook_name, text)
wb = workbooks.update_workbook_v1(workbook_name, {'definition': text})
scheduler.create_associated_triggers(wb)
return wb['definition']
return wb.definition

View File

@ -75,6 +75,29 @@ def delete_workbook(name):
IMPL.delete_workbook(name)
# Workflows.
def get_workflow(name):
return IMPL.get_workflow(name)
def get_workflows():
return IMPL.get_workflows()
def create_workflow(values):
return IMPL.create_workflow(values)
def update_workflow(name, values):
return IMPL.update_workflow(name, values)
def delete_workflow(name):
IMPL.delete_workflow(name)
# Executions.
def get_execution(id):

View File

@ -147,6 +147,79 @@ def _get_workbook(name):
project_id=context.ctx().project_id).first()
# Workflows.
def get_workflow(name):
wf = _get_workflow(name)
if not wf:
raise exc.NotFoundException(
"Workflow not found [workflow_name=%s]" % name)
return wf
def get_workflows(**kwargs):
return _get_workflows(**kwargs)
@b.session_aware()
def create_workflow(values, session=None):
wf = models.Workflow()
wf.update(values.copy())
wf['project_id'] = context.ctx().project_id
try:
wf.save(session=session)
except db_exc.DBDuplicateEntry as e:
raise exc.DBDuplicateEntry("Duplicate entry for workflow: %s"
% e.columns)
return wf
@b.session_aware()
def update_workflow(name, values, session=None):
wf = _get_workflow(name)
if not wf:
raise exc.NotFoundException(
"Workflow not found [workflow_name=%s]" % name)
wf.update(values.copy())
wf['project_id'] = context.ctx().project_id
return wf
@b.session_aware()
def delete_workflow(name, session=None):
wf = _get_workflow(name)
if not wf:
raise exc.NotFoundException(
"Workflow not found [workflow_name=%s]" % name)
session.delete(wf)
def _get_workflows(**kwargs):
query = b.model_query(models.Workflow)
proj = query.filter_by(project_id=context.ctx().project_id,
**kwargs)
public = query.filter_by(scope='public', **kwargs)
return proj.union(public).all()
def _get_workflow(name):
query = b.model_query(models.Workflow)
return query.filter_by(name=name,
project_id=context.ctx().project_id).first()
# Executions.
def get_execution(id):

View File

@ -34,7 +34,25 @@ class Workbook(mb.MistralModelBase):
name = sa.Column(sa.String(80), primary_key=True)
definition = sa.Column(sa.Text(), nullable=True)
spec = sa.Column(st.JsonDictType())
description = sa.Column(sa.String(200))
tags = sa.Column(st.JsonListType())
scope = sa.Column(sa.String(80))
project_id = sa.Column(sa.String(80))
trust_id = sa.Column(sa.String(80))
class Workflow(mb.MistralModelBase):
"""Contains info about workflow (including definition in Mistral DSL)."""
__tablename__ = 'workflows_v2'
__table_args__ = (
sa.UniqueConstraint('name'),
)
id = mb._id_column()
name = sa.Column(sa.String(80), primary_key=True)
definition = sa.Column(sa.Text(), nullable=True)
spec = sa.Column(st.JsonDictType())
tags = sa.Column(st.JsonListType())
scope = sa.Column(sa.String(80))
project_id = sa.Column(sa.String(80))

View File

@ -162,7 +162,10 @@ def add_openstack_data_to_context(context, db_workbook):
context = {}
if CONF.pecan.auth_enable:
workbook_ctx = trusts.create_context(db_workbook)
workbook_ctx = trusts.create_context(
db_workbook.trust_id, db_workbook.project_id
)
if workbook_ctx:
context.update({'openstack': workbook_ctx.to_dict()})

View File

@ -24,12 +24,11 @@ class Engine(object):
"""Engine interface."""
@abc.abstractmethod
def start_workflow(self, workbook_name, workflow_name, input, **params):
def start_workflow(self, workflow_name, workflow_input, **params):
"""Starts the specified workflow.
:param workbook_name: Workbook name.
:param workflow_name: Workflow name.
:param input: Workflow input data as a dictionary.
:param workflow_input: Workflow input data as a dictionary.
:param params: Additional workflow type specific parameters.
:return: Workflow execution object.
"""

View File

@ -42,17 +42,20 @@ class DefaultEngine(base.Engine):
self._engine_client = engine_client
self._executor_client = executor_client
def start_workflow(self, workbook_name, workflow_name, input, **params):
def start_workflow(self, workflow_name, workflow_input, **params):
db_api.start_tx()
try:
wb_db = db_api.get_workbook(workbook_name)
wf_db = db_api.get_workflow(workflow_name)
wb_spec = \
spec_parser.get_workbook_spec_from_yaml(wb_db.definition)
wf_spec = wb_spec.get_workflows()[workflow_name]
wf_spec = spec_parser.get_workflow_spec(wf_db.spec)
exec_db = self._create_db_execution(wb_db, wf_spec, input, params)
exec_db = self._create_db_execution(
wf_db,
wf_spec,
workflow_input,
params
)
wf_handler = wfh_factory.create_workflow_handler(exec_db, wf_spec)
@ -170,18 +173,18 @@ class DefaultEngine(base.Engine):
return new_db_tasks
def _create_db_execution(self, wb_db, wf_spec, input, params):
def _create_db_execution(self, wf_db, wf_spec, wf_input, params):
exec_db = db_api.create_execution({
'wf_spec': wf_spec.to_dict(),
'start_params': params or {},
'state': states.RUNNING,
'input': input or {},
'input': wf_input or {},
'output': {},
'context': copy.copy(input) or {},
'context': copy.copy(wf_input) or {},
'parent_task_id': params.get('parent_task_id')
})
data_flow.add_openstack_data_to_context(wb_db, exec_db.context)
data_flow.add_openstack_data_to_context(wf_db, exec_db.context)
data_flow.add_execution_to_context(exec_db, exec_db.context)
return exec_db
@ -223,15 +226,13 @@ class DefaultEngine(base.Engine):
)
def _run_workflow(self, task_db, task_spec):
wb_name = task_spec.get_workflow_namespace()
wf_name = task_spec.get_short_workflow_name()
wf_name = task_spec.get_workflow_name()
wf_input = task_db.parameters
start_params = copy.copy(task_spec.get_workflow_parameters())
start_params.update({'parent_task_id': task_db.id})
self._engine_client.start_workflow(
wb_name,
wf_name,
wf_input,
**start_params

View File

@ -90,8 +90,7 @@ class EngineServer(object):
def __init__(self, engine):
self._engine = engine
def start_workflow(self, rpc_ctx, workbook_name, workflow_name, input,
params):
def start_workflow(self, rpc_ctx, workflow_name, workflow_input, params):
"""Receives calls over RPC to start workflows on engine.
:param rpc_ctx: RPC request context.
@ -100,14 +99,13 @@ class EngineServer(object):
LOG.info(
"Received RPC request 'start_workflow'[rpc_ctx=%s,"
" workbook_name=%s, workflow_name=%s, input=%s, params=%s]"
% (rpc_ctx, workbook_name, workflow_name, input, params)
" workflow_name=%s, workflow_input=%s, params=%s]"
% (rpc_ctx, workflow_name, workflow_input, params)
)
return self._engine.start_workflow(
workbook_name,
workflow_name,
input,
workflow_input,
**params
)
@ -187,7 +185,7 @@ class EngineClient(base.Engine):
serializer=serializer
)
def start_workflow(self, workbook_name, workflow_name, input, **params):
def start_workflow(self, workflow_name, workflow_input, **params):
"""Starts workflow sending a request to engine over RPC.
:return: Workflow execution.
@ -196,9 +194,8 @@ class EngineClient(base.Engine):
return self._client.call(
auth_ctx.ctx(),
'start_workflow',
workbook_name=workbook_name,
workflow_name=workflow_name,
input=input,
workflow_input=workflow_input,
params=params
)

View File

@ -46,7 +46,7 @@ class MistralPeriodicTasks(periodic_task.PeriodicTasks):
wb = db_api.workbook_get(trigger['workbook_name'])
context.set_ctx(trusts.create_context(wb))
context.set_ctx(trusts.create_context(wb.trust_id, wb.project_id))
try:
task = spec_parser.get_workbook_spec_from_yaml(

View File

@ -20,14 +20,13 @@
from oslo.config import cfg
from mistral import context
from mistral.db import api as db_api
from mistral.utils.openstack import keystone
CONF = cfg.CONF
def create_trust(workbook):
def create_trust():
client = keystone.client()
ctx = context.ctx()
@ -35,7 +34,7 @@ def create_trust(workbook):
trustee_id = keystone.client_for_admin(
CONF.keystone_authtoken.admin_tenant_name).user_id
trust = client.trusts.create(
return client.trusts.create(
trustor_user=client.user_id,
trustee_user=trustee_id,
impersonation=True,
@ -43,27 +42,23 @@ def create_trust(workbook):
project=ctx.project_id
)
return db_api.workbook_update(
workbook.name,
{'trust_id': trust.id, 'project_id': ctx.project_id}
)
def create_context(workbook):
def create_context(trust_id, project_id):
"""Creates Mistral security context.
:param workbook: Workbook DB model.
:param trust_id: Trust Id.
:param project_id: Project Id.
:return: Mistral security context.
"""
if not workbook.trust_id:
if not trust_id:
return
if CONF.pecan.auth_enable:
client = keystone.client_for_trusts(workbook.trust_id)
client = keystone.client_for_trusts(trust_id)
return context.MistralContext(
user_id=client.user_id,
project_id=workbook.project_id,
project_id=project_id,
auth_token=client.auth_token
)
else:

View File

@ -16,17 +16,94 @@
from oslo.config import cfg
from mistral.db import api as db_api
from mistral import context
from mistral.db.v1 import api as db_api_v1
from mistral.db.v2 import api as db_api_v2
from mistral.services import scheduler
from mistral.services import trusts
from mistral.workbook import parser as spec_parser
def create_workbook(values):
workbook = db_api.workbook_create(values)
def create_workbook_v1(values):
_add_security_info(values)
return db_api_v1.workbook_create(values)
def update_workbook_v1(workbook_name, values):
wb_db = db_api_v1.workbook_update(workbook_name, values)
if 'definition' in values:
scheduler.create_associated_triggers(wb_db)
return wb_db
def create_workbook_v2(values):
_add_security_info(values)
db_api_v2.start_tx()
try:
wb_db = db_api_v2.create_workbook(values)
_check_workbook_definition_update(wb_db, values)
db_api_v2.commit_tx()
finally:
db_api_v2.end_tx()
return wb_db
def update_workbook_v2(workbook_name, values):
db_api_v2.start_tx()
try:
wb_db = db_api_v1.workbook_update(workbook_name, values)
_check_workbook_definition_update(wb_db, values)
db_api_v2.commit_tx()
finally:
db_api_v2.end_tx()
return wb_db
def _check_workbook_definition_update(wb_db, values):
if 'definition' not in values:
return
wb_spec = spec_parser.get_workbook_spec_from_yaml(values['definition'])
_create_actions(wb_db, wb_spec.get_actions())
_create_workflows(wb_db, wb_spec.get_workflows())
def _create_actions(wb_db, actions_spec):
if actions_spec:
# TODO(rakhmerov): Complete when action DB model is added.
pass
def _create_workflows(wb_db, workflows_spec):
if workflows_spec:
for wf_spec in workflows_spec:
db_api_v2.create_workflow(
{
'name': '%s.%s' % (wb_db.name, wf_spec.get_name()),
'spec': wf_spec.to_dict(),
'scope': wb_db.scope,
'trust_id': wb_db.trust_id,
'project_id': wb_db.project_id
}
)
def _add_security_info(values):
if cfg.CONF.pecan.auth_enable:
workbook = trusts.create_trust(workbook)
# TODO(akuznetsov) filter fields
# TODO(akuznetsov) create triggers
return workbook
values.update({
'trust_id': trusts.create_trust().id,
'project_id': context.ctx().project_id
})

View File

@ -21,7 +21,7 @@ from oslo.config import cfg
import pecan
import pecan.testing
from mistral.db import api as db_api
from mistral.db.v1 import api as db_api
from mistral.db.v1.sqlalchemy import models
from mistral.openstack.common import timeutils
from mistral.tests.api import base

View File

@ -18,7 +18,7 @@ import json
import mock
from webtest import app as webtest_app
from mistral.db import api as db_api
from mistral.db.v1 import api as db_api
from mistral import engine
from mistral import exceptions as ex
from mistral.tests.api import base

View File

@ -16,7 +16,7 @@
import mock
from mistral.db import api as db_api
from mistral.db.v1 import api as db_api
from mistral import exceptions
from mistral.tests.api import base

View File

@ -17,7 +17,7 @@
import json
import mock
from mistral.db import api as db_api
from mistral.db.v1 import api as db_api
from mistral import engine
from mistral.tests.api import base

View File

@ -16,7 +16,7 @@
import mock
from mistral.db import api as db_api
from mistral.db.v1 import api as db_api
from mistral import exceptions
from mistral.tests.api import base
@ -47,8 +47,8 @@ Triggers:
class TestWorkbookDefinitionController(base.FunctionalTest):
@mock.patch.object(db_api, "workbook_definition_get",
mock.MagicMock(return_value=DEFINITION))
@mock.patch.object(db_api, "workbook_get",
base.create_mock_workbook({'definition': DEFINITION}))
def test_get(self):
resp = self.app.get('/v1/workbooks/my_workbook/definition',
headers={"Content-Type": "text/plain"})
@ -66,11 +66,10 @@ class TestWorkbookDefinitionController(base.FunctionalTest):
self.assertEqual(resp.status_int, 404)
@mock.patch.object(db_api, "workbook_definition_put",
mock.MagicMock(return_value={
@mock.patch.object(db_api, "workbook_update",
base.create_mock_workbook({
'name': 'my_workbook',
'definition': NEW_DEFINITION
}))
'definition': NEW_DEFINITION}))
def test_put(self):
resp = self.app.put('/v1/workbooks/my_workbook/definition',
NEW_DEFINITION,
@ -86,7 +85,7 @@ class TestWorkbookDefinitionController(base.FunctionalTest):
self.assertEqual(triggers[0]['pattern'], '* * * * *')
self.assertEqual(triggers[0]['workbook_name'], 'my_workbook')
@mock.patch.object(db_api, "workbook_definition_put",
@mock.patch.object(db_api, "workbook_update",
mock.MagicMock(
side_effect=exceptions.NotFoundException()))
def test_put_not_found(self):

View File

@ -16,7 +16,7 @@
import mock
from mistral.db import api as db_api
from mistral.db.v1 import api as db_api
from mistral import exceptions
from mistral.tests.api import base
@ -79,8 +79,8 @@ class TestWorkbooksController(base.FunctionalTest):
@mock.patch.object(db_api, "workbook_create",
mock.MagicMock(side_effect=exceptions.DBDuplicateEntry))
@mock.patch("mistral.services.trusts.create_trust",
mock.MagicMock(return_value=WORKBOOKS[0]))
@mock.patch("mistral.services.workbooks._add_security_info",
mock.MagicMock(return_value=None))
def test_post_dup(self):
resp = self.app.post_json('/v1/workbooks', WORKBOOKS[0],
expect_errors=True)

View File

@ -28,7 +28,6 @@ WORKBOOKS = [
{
'id': '1',
'name': 'my_workbook1',
'description': 'my description',
'definition': 'empty',
'tags': ['mc'],
'scope': 'public',
@ -160,7 +159,6 @@ class WorkbookTest(test_base.DbTestCase):
self.assertIn('Workbook ', s)
self.assertIn("'name': 'my_workbook1'", s)
self.assertIn("'description': 'my description'", s)
EXECUTIONS = [
{

View File

@ -14,18 +14,24 @@
import copy
import mock
from oslo.config import cfg
from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models
from mistral.engine1 import default_engine as d_eng
from mistral.openstack.common import log as logging
from mistral.services import workbooks as wb_service
from mistral.tests import base
from mistral.workbook import parser as spec_parser
from mistral.workflow import base as wf_base
from mistral.workflow import states
LOG = logging.getLogger(__name__)
# Use the set_default method to set value otherwise in certain test cases
# the change in value is not permanent.
cfg.CONF.set_default('auth_enable', False, group='pecan')
WORKBOOK = """
---
Version: '2.0'
@ -51,13 +57,9 @@ class DefaultEngineTest(base.DbTestCase):
def setUp(self):
super(DefaultEngineTest, self).setUp()
wb_spec = spec_parser.get_workbook_spec_from_yaml(WORKBOOK)
db_api.create_workbook({
wb_service.create_workbook_v2({
'name': 'my_wb',
'description': 'Simple workbook for testing engine.',
'definition': WORKBOOK,
'spec': wb_spec.to_dict(),
'tags': ['test']
})
@ -73,7 +75,10 @@ class DefaultEngineTest(base.DbTestCase):
# Start workflow.
exec_db = self.engine.start_workflow(
'my_wb', 'wf1', wf_input, task_name='task2')
'my_wb.wf1',
wf_input,
task_name='task2'
)
self.assertIsNotNone(exec_db)
self.assertEqual(states.RUNNING, exec_db.state)
@ -105,8 +110,7 @@ class DefaultEngineTest(base.DbTestCase):
# Start workflow.
exec_db = self.engine.start_workflow(
'my_wb',
'wf1',
'my_wb.wf1',
wf_input,
task_name='task2'
)

View File

@ -12,14 +12,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo.config import cfg
from mistral.db.v2 import api as db_api
from mistral.openstack.common import log as logging
from mistral.services import workbooks as wb_service
from mistral.tests.unit.engine1 import base
from mistral.workbook import parser as spec_parser
from mistral.workflow import states
LOG = logging.getLogger(__name__)
# Use the set_default method to set value otherwise in certain test cases
# the change in value is not permanent.
cfg.CONF.set_default('auth_enable', False, group='pecan')
WORKBOOK = """
---
Version: '2.0'
@ -65,18 +72,14 @@ class SubworkflowsTest(base.EngineTestCase):
def setUp(self):
super(SubworkflowsTest, self).setUp()
wb_spec = spec_parser.get_workbook_spec_from_yaml(WORKBOOK)
db_api.create_workbook({
wb_service.create_workbook_v2({
'name': 'my_wb',
'description': 'Simple workbook for testing engine.',
'definition': WORKBOOK,
'spec': wb_spec.to_dict(),
'tags': ['test']
})
def test_subworkflow(self):
exec1_db = self.engine.start_workflow('my_wb', 'wf2', None)
exec1_db = self.engine.start_workflow('my_wb.wf2', None)
# Execution 1.
self.assertIsNotNone(exec1_db)

View File

@ -119,12 +119,15 @@ def evaluate_outbound_context(task_db):
)
def add_openstack_data_to_context(workbook_db, context):
def add_openstack_data_to_context(workflow_db, context):
if context is None:
context = {}
if CONF.pecan.auth_enable:
workbook_ctx = trusts.create_context(workbook_db)
workbook_ctx = trusts.create_context(
workflow_db.trust_id,
workflow_db.project_id
)
if workbook_ctx:
context.update({'openstack': workbook_ctx.to_dict()})