Make rpc_backend not engine specific

The rpc_backend with kombu and oslo are being used by the executor
and event engine as well. This patch move the rpc_backend up one
level so it's not engine specific. Also Event engine has its own module
and the EventEngine class is defined in the engine module. This patch
moves the EventEngine to it's own base file in the event_engine module.

Implements: blueprint mistral-actions-run-by-engine

Change-Id: Ie814a26e05f5ca6bfba10f20a7d5921836aa7602
This commit is contained in:
Winson Chan 2017-04-14 00:10:52 +00:00
parent 62d8c5edaf
commit eb09a90749
46 changed files with 253 additions and 197 deletions

View File

@ -22,7 +22,7 @@ from mistral.api import access_control
from mistral import config as m_config
from mistral import context as ctx
from mistral.db.v2 import api as db_api_v2
from mistral.engine.rpc_backend import rpc
from mistral.rpc import base as rpc
from mistral.service import coordination
from mistral.services import periodic

View File

@ -24,8 +24,8 @@ from mistral.api.controllers.v2 import resources
from mistral.api.controllers.v2 import types
from mistral import context
from mistral.db.v2 import api as db_api
from mistral.engine.rpc_backend import rpc
from mistral import exceptions as exc
from mistral.rpc import clients as rpc
from mistral.utils import filter_utils
from mistral.utils import rest_utils
from mistral.workflow import states

View File

@ -26,8 +26,8 @@ from mistral.api.controllers.v2 import task
from mistral.api.controllers.v2 import types
from mistral import context
from mistral.db.v2 import api as db_api
from mistral.engine.rpc_backend import rpc
from mistral import exceptions as exc
from mistral.rpc import clients as rpc
from mistral.services import workflows as wf_service
from mistral.utils import filter_utils
from mistral.utils import rest_utils

View File

@ -26,9 +26,9 @@ from mistral.api.controllers.v2 import resources
from mistral.api.controllers.v2 import types
from mistral import context
from mistral.db.v2 import api as db_api
from mistral.engine.rpc_backend import rpc
from mistral import exceptions as exc
from mistral.lang import parser as spec_parser
from mistral.rpc import clients as rpc
from mistral.utils import filter_utils
from mistral.utils import rest_utils
from mistral.workflow import data_flow

View File

@ -44,9 +44,9 @@ from oslo_service import service
from mistral.api import service as api_service
from mistral import config
from mistral.engine import engine_server
from mistral.engine.rpc_backend import rpc
from mistral.event_engine import event_engine_server
from mistral.executors import executor_server
from mistral.rpc import base as rpc
from mistral import version

View File

@ -1,4 +1,5 @@
# Copyright 2014 - Mirantis, Inc.
# Copyright 2017 - Brocade Communications Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -128,19 +129,6 @@ class Engine(object):
raise NotImplementedError
@six.add_metaclass(abc.ABCMeta)
class EventEngine(object):
"""Action event trigger interface."""
@abc.abstractmethod
def create_event_trigger(self, trigger, events):
raise NotImplementedError()
@abc.abstractmethod
def delete_event_trigger(self, trigger, events):
raise NotImplementedError()
@six.add_metaclass(abc.ABCMeta)
class TaskPolicy(object):
"""Task policy.

View File

@ -17,7 +17,7 @@ from oslo_log import log as logging
from mistral import config as cfg
from mistral.db.v2 import api as db_api
from mistral.engine import default_engine
from mistral.engine.rpc_backend import rpc
from mistral.rpc import base as rpc
from mistral.service import base as service_base
from mistral.services import expiration_policy
from mistral.services import scheduler

View File

@ -22,10 +22,10 @@ import six
from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models as db_models
from mistral.engine import dispatcher
from mistral.engine.rpc_backend import rpc
from mistral.engine import utils as engine_utils
from mistral import exceptions as exc
from mistral.lang import parser as spec_parser
from mistral.rpc import clients as rpc
from mistral.services import scheduler
from mistral.services import workflows as wf_service
from mistral import utils

View File

@ -0,0 +1,34 @@
# Copyright 2014 - Mirantis, Inc.
# Copyright 2017 - Brocade Communications Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class EventEngine(object):
"""Action event trigger interface."""
@abc.abstractmethod
def create_event_trigger(self, trigger, events):
raise NotImplementedError()
@abc.abstractmethod
def update_event_trigger(self, trigger):
raise NotImplementedError()
@abc.abstractmethod
def delete_event_trigger(self, trigger, events):
raise NotImplementedError()

View File

@ -1,4 +1,5 @@
# Copyright 2016 Catalyst IT Ltd
# Copyright 2017 - Brocade Communications Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -25,10 +26,11 @@ import yaml
from mistral import context as auth_ctx
from mistral.db.v2 import api as db_api
from mistral.engine.rpc_backend import rpc
from mistral.event_engine import base
from mistral import exceptions
from mistral import expressions
from mistral import messaging as mistral_messaging
from mistral.rpc import clients as rpc
from mistral.services import security
@ -126,7 +128,7 @@ class NotificationsConverter(object):
return edef.convert(event)
class EventEngine(object):
class DefaultEventEngine(base.EventEngine):
"""Event engine server.
A separate service that is responsible for listening event notification

View File

@ -1,4 +1,5 @@
# Copyright 2016 - Nokia Networks
# Copyright 2017 - Brocade Communications Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -15,8 +16,8 @@
from oslo_log import log as logging
from mistral import config as cfg
from mistral.engine.rpc_backend import rpc
from mistral.event_engine import event_engine
from mistral.event_engine import default_event_engine as evt_eng
from mistral.rpc import base as rpc
from mistral.service import base as service_base
from mistral.utils import profiler as profiler_utils
@ -88,4 +89,4 @@ class EventEngineServer(service_base.MistralService):
def get_oslo_service():
return EventEngineServer(event_engine.EventEngine())
return EventEngineServer(evt_eng.DefaultEventEngine())

View File

@ -17,9 +17,9 @@ from oslo_log import log as logging
from osprofiler import profiler
from mistral.actions import action_factory as a_f
from mistral.engine.rpc_backend import rpc
from mistral import exceptions as exc
from mistral.executors import base
from mistral.rpc import clients as rpc
from mistral.utils import inspect_utils as i_u
from mistral.workflow import utils as wf_utils

View File

@ -15,8 +15,8 @@
from oslo_log import log as logging
from mistral import config as cfg
from mistral.engine.rpc_backend import rpc
from mistral.executors import default_executor as exe
from mistral.rpc import base as rpc
from mistral.service import base as service_base
from mistral import utils
from mistral.utils import profiler as profiler_utils

View File

@ -15,15 +15,16 @@
from oslo_config import cfg
from oslo_log import log as logging
from mistral.engine.rpc_backend import rpc
from mistral.rpc import base as rpc_base
from mistral.rpc import clients as rpc_clients
LOG = logging.getLogger(__name__)
class RemoteExecutor(rpc.ExecutorClient):
class RemoteExecutor(rpc_clients.ExecutorClient):
"""Executor that passes execution request to a remote executor."""
def __init__(self):
self.topic = cfg.CONF.executor.topic
self._client = rpc.get_rpc_client_driver()(cfg.CONF.executor)
self._client = rpc_base.get_rpc_client_driver()(cfg.CONF.executor)

View File

@ -1,4 +1,5 @@
# Copyright 2015 - Mirantis, Inc.
# Copyright 2017 - Brocade Communications Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -14,6 +15,101 @@
import abc
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_messaging.rpc import client
from stevedore import driver
from mistral import exceptions as exc
LOG = logging.getLogger(__name__)
_IMPL_CLIENT = None
_IMPL_SERVER = None
_TRANSPORT = None
def cleanup():
"""Intended to be used by tests to recreate all RPC related objects."""
global _TRANSPORT
_TRANSPORT = None
# TODO(rakhmerov): This method seems misplaced. Now we have different kind
# of transports (oslo, kombu) and this module should not have any oslo
# specific things anymore.
def get_transport():
global _TRANSPORT
if not _TRANSPORT:
_TRANSPORT = messaging.get_transport(cfg.CONF)
return _TRANSPORT
def get_rpc_server_driver():
rpc_impl = cfg.CONF.rpc_implementation
global _IMPL_SERVER
if not _IMPL_SERVER:
_IMPL_SERVER = driver.DriverManager(
'mistral.rpc.backends',
'%s_server' % rpc_impl
).driver
return _IMPL_SERVER
def get_rpc_client_driver():
rpc_impl = cfg.CONF.rpc_implementation
global _IMPL_CLIENT
if not _IMPL_CLIENT:
_IMPL_CLIENT = driver.DriverManager(
'mistral.rpc.backends',
'%s_client' % rpc_impl
).driver
return _IMPL_CLIENT
def _wrap_exception_and_reraise(exception):
message = "%s: %s" % (exception.__class__.__name__, exception.args[0])
raise exc.MistralException(message)
def wrap_messaging_exception(method):
"""This decorator unwrap remote error in one of MistralException.
oslo.messaging has different behavior on raising exceptions
when fake or rabbit transports are used. In case of rabbit
transport it raises wrapped RemoteError which forwards directly
to API. Wrapped RemoteError contains one of MistralException raised
remotely on Engine and for correct exception interpretation we
need to unwrap and raise given exception and manually send it to
API layer.
"""
def decorator(*args, **kwargs):
try:
return method(*args, **kwargs)
except exc.MistralException:
raise
except (client.RemoteError, exc.KombuException, Exception) as e:
if hasattr(e, 'exc_type') and hasattr(exc, e.exc_type):
exc_cls = getattr(exc, e.exc_type)
raise exc_cls(e.value)
_wrap_exception_and_reraise(e)
return decorator
class RPCClient(object):
def __init__(self, conf):

View File

@ -16,24 +16,18 @@
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_messaging.rpc import client
from osprofiler import profiler
from stevedore import driver
from mistral import context as auth_ctx
from mistral.engine import base as eng
from mistral import exceptions as exc
from mistral.event_engine import base as evt_eng
from mistral.executors import base as exe
from mistral.rpc import base
LOG = logging.getLogger(__name__)
_IMPL_CLIENT = None
_IMPL_SERVER = None
_TRANSPORT = None
_ENGINE_CLIENT = None
_EXECUTOR_CLIENT = None
_EVENT_ENGINE_CLIENT = None
@ -42,29 +36,15 @@ _EVENT_ENGINE_CLIENT = None
def cleanup():
"""Intended to be used by tests to recreate all RPC related objects."""
global _TRANSPORT
global _ENGINE_CLIENT
global _EXECUTOR_CLIENT
global _EVENT_ENGINE_CLIENT
_TRANSPORT = None
_ENGINE_CLIENT = None
_EXECUTOR_CLIENT = None
_EVENT_ENGINE_CLIENT = None
# TODO(rakhmerov): This method seems misplaced. Now we have different kind
# of transports (oslo, kombu) and this module should not have any oslo
# specific things anymore.
def get_transport():
global _TRANSPORT
if not _TRANSPORT:
_TRANSPORT = messaging.get_transport(cfg.CONF)
return _TRANSPORT
def get_engine_client():
global _ENGINE_CLIENT
@ -92,65 +72,6 @@ def get_event_engine_client():
return _EVENT_ENGINE_CLIENT
def get_rpc_server_driver():
rpc_impl = cfg.CONF.rpc_implementation
global _IMPL_SERVER
if not _IMPL_SERVER:
_IMPL_SERVER = driver.DriverManager(
'mistral.engine.rpc_backend',
'%s_server' % rpc_impl
).driver
return _IMPL_SERVER
def get_rpc_client_driver():
rpc_impl = cfg.CONF.rpc_implementation
global _IMPL_CLIENT
if not _IMPL_CLIENT:
_IMPL_CLIENT = driver.DriverManager(
'mistral.engine.rpc_backend',
'%s_client' % rpc_impl
).driver
return _IMPL_CLIENT
def _wrap_exception_and_reraise(exception):
message = "%s: %s" % (exception.__class__.__name__, exception.args[0])
raise exc.MistralException(message)
def wrap_messaging_exception(method):
"""This decorator unwrap remote error in one of MistralException.
oslo.messaging has different behavior on raising exceptions
when fake or rabbit transports are used. In case of rabbit
transport it raises wrapped RemoteError which forwards directly
to API. Wrapped RemoteError contains one of MistralException raised
remotely on Engine and for correct exception interpretation we
need to unwrap and raise given exception and manually send it to
API layer.
"""
def decorator(*args, **kwargs):
try:
return method(*args, **kwargs)
except exc.MistralException:
raise
except (client.RemoteError, exc.KombuException, Exception) as e:
if hasattr(e, 'exc_type') and hasattr(exc, e.exc_type):
exc_cls = getattr(exc, e.exc_type)
raise exc_cls(e.value)
_wrap_exception_and_reraise(e)
return decorator
class EngineClient(eng.Engine):
"""RPC Engine client."""
@ -159,9 +80,9 @@ class EngineClient(eng.Engine):
:param rpc_conf_dict: Dict containing RPC configuration.
"""
self._client = get_rpc_client_driver()(rpc_conf_dict)
self._client = base.get_rpc_client_driver()(rpc_conf_dict)
@wrap_messaging_exception
@base.wrap_messaging_exception
def start_workflow(self, wf_identifier, wf_input, description='',
**params):
"""Starts workflow sending a request to engine over RPC.
@ -177,7 +98,7 @@ class EngineClient(eng.Engine):
params=params
)
@wrap_messaging_exception
@base.wrap_messaging_exception
def start_action(self, action_name, action_input,
description=None, **params):
"""Starts action sending a request to engine over RPC.
@ -193,7 +114,7 @@ class EngineClient(eng.Engine):
params=params
)
@wrap_messaging_exception
@base.wrap_messaging_exception
@profiler.trace('engine-client-on-action-complete', hide_args=True)
def on_action_complete(self, action_ex_id, result, wf_action=False,
async_=False):
@ -229,7 +150,7 @@ class EngineClient(eng.Engine):
wf_action=wf_action
)
@wrap_messaging_exception
@base.wrap_messaging_exception
def pause_workflow(self, wf_ex_id):
"""Stops the workflow with the given execution id.
@ -243,7 +164,7 @@ class EngineClient(eng.Engine):
execution_id=wf_ex_id
)
@wrap_messaging_exception
@base.wrap_messaging_exception
def rerun_workflow(self, task_ex_id, reset=True, env=None):
"""Rerun the workflow.
@ -265,7 +186,7 @@ class EngineClient(eng.Engine):
env=env
)
@wrap_messaging_exception
@base.wrap_messaging_exception
def resume_workflow(self, wf_ex_id, env=None):
"""Resumes the workflow with the given execution id.
@ -281,7 +202,7 @@ class EngineClient(eng.Engine):
env=env
)
@wrap_messaging_exception
@base.wrap_messaging_exception
def stop_workflow(self, wf_ex_id, state, message=None):
"""Stops workflow execution with given status.
@ -303,7 +224,7 @@ class EngineClient(eng.Engine):
message=message
)
@wrap_messaging_exception
@base.wrap_messaging_exception
def rollback_workflow(self, wf_ex_id):
"""Rolls back the workflow with the given execution id.
@ -326,7 +247,7 @@ class ExecutorClient(exe.Executor):
"""Constructs an RPC client for the Executor."""
self.topic = cfg.CONF.executor.topic
self._client = get_rpc_client_driver()(rpc_conf_dict)
self._client = base.get_rpc_client_driver()(rpc_conf_dict)
@profiler.trace('executor-client-run-action')
def run_action(self, action_ex_id, action_cls_str, action_cls_attrs,
@ -362,12 +283,12 @@ class ExecutorClient(exe.Executor):
return rpc_client_method(auth_ctx.ctx(), 'run_action', **rpc_kwargs)
class EventEngineClient(eng.EventEngine):
class EventEngineClient(evt_eng.EventEngine):
"""RPC EventEngine client."""
def __init__(self, rpc_conf_dict):
"""Constructs an RPC client for the EventEngine service."""
self._client = get_rpc_client_driver()(rpc_conf_dict)
self._client = base.get_rpc_client_driver()(rpc_conf_dict)
def create_event_trigger(self, trigger, events):
return self._client.sync_call(

View File

@ -14,7 +14,7 @@
import sys
from mistral.engine.rpc_backend.kombu import kombu_client
from mistral.rpc.kombu import kombu_client
# Example of using Kombu based RPC client.

View File

@ -14,7 +14,7 @@
import sys
from mistral.engine.rpc_backend.kombu import kombu_server
from mistral.rpc.kombu import kombu_server
# Simple example of endpoint of RPC server, which just

View File

@ -19,11 +19,11 @@ from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from mistral.engine.rpc_backend import base as rpc_base
from mistral.engine.rpc_backend.kombu import base as kombu_base
from mistral.engine.rpc_backend.kombu import kombu_hosts
from mistral.engine.rpc_backend.kombu import kombu_listener
from mistral import exceptions as exc
from mistral.rpc import base as rpc_base
from mistral.rpc.kombu import base as kombu_base
from mistral.rpc.kombu import kombu_hosts
from mistral.rpc.kombu import kombu_listener
from mistral import utils

View File

@ -20,7 +20,7 @@ import threading
from oslo_log import log as logging
from mistral.engine.rpc_backend.kombu import base as kombu_base
from mistral.rpc.kombu import base as kombu_base
LOG = logging.getLogger(__name__)

View File

@ -24,10 +24,10 @@ import oslo_messaging as messaging
from stevedore import driver
from mistral import context as auth_ctx
from mistral.engine.rpc_backend import base as rpc_base
from mistral.engine.rpc_backend.kombu import base as kombu_base
from mistral.engine.rpc_backend.kombu import kombu_hosts
from mistral import exceptions as exc
from mistral.rpc import base as rpc_base
from mistral.rpc.kombu import base as kombu_base
from mistral.rpc.kombu import kombu_hosts
LOG = logging.getLogger(__name__)

View File

@ -15,11 +15,10 @@
import oslo_messaging as messaging
from mistral import context as auth_ctx
from mistral.engine.rpc_backend import base as rpc_base
from mistral.engine.rpc_backend import rpc
from mistral.rpc import base as rpc
class OsloRPCClient(rpc_base.RPCClient):
class OsloRPCClient(rpc.RPCClient):
def __init__(self, conf):
super(OsloRPCClient, self).__init__(conf)
self.topic = conf.topic

View File

@ -16,14 +16,13 @@ from oslo_log import log as logging
import oslo_messaging as messaging
from mistral import context as ctx
from mistral.engine.rpc_backend import base as rpc_base
from mistral.engine.rpc_backend import rpc
from mistral.rpc import base as rpc
LOG = logging.getLogger(__name__)
class OsloRPCServer(rpc_base.RPCServer):
class OsloRPCServer(rpc.RPCServer):
def __init__(self, conf):
super(OsloRPCServer, self).__init__(conf)

View File

@ -19,8 +19,8 @@ from oslo_service import threadgroup
from mistral import context as auth_ctx
from mistral.db.v2 import api as db_api_v2
from mistral.engine.rpc_backend import rpc
from mistral import exceptions as exc
from mistral.rpc import clients as rpc
from mistral.services import security
from mistral.services import triggers

View File

@ -18,10 +18,10 @@ import six
import time
from mistral.db.v2 import api as db_api
from mistral.engine.rpc_backend import rpc
from mistral.engine import utils as eng_utils
from mistral import exceptions as exc
from mistral.lang import parser
from mistral.rpc import clients as rpc
from mistral.services import security

View File

@ -25,8 +25,9 @@ import oslo_messaging
from mistral.api.controllers.v2 import action_execution
from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models
from mistral.engine.rpc_backend import rpc
from mistral import exceptions as exc
from mistral.rpc import base as rpc_base
from mistral.rpc import clients as rpc_clients
from mistral.tests.unit.api import base
from mistral.utils import rest_utils
from mistral.workflow import states
@ -194,7 +195,7 @@ MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundError())
MOCK_DELETE = mock.MagicMock(return_value=None)
@mock.patch.object(rpc, '_IMPL_CLIENT', mock.Mock())
@mock.patch.object(rpc_base, '_IMPL_CLIENT', mock.Mock())
class TestActionExecutionsController(base.APITest):
def setUp(self):
super(TestActionExecutionsController, self).setUp()
@ -223,7 +224,7 @@ class TestActionExecutionsController(base.APITest):
self.assertEqual(404, resp.status_int)
@mock.patch.object(rpc.EngineClient, 'start_action')
@mock.patch.object(rpc_clients.EngineClient, 'start_action')
def test_post(self, f):
f.return_value = ACTION_EX_DB.to_dict()
@ -251,7 +252,7 @@ class TestActionExecutionsController(base.APITest):
run_sync=True
)
@mock.patch.object(rpc.EngineClient, 'start_action')
@mock.patch.object(rpc_clients.EngineClient, 'start_action')
def test_post_json(self, f):
f.return_value = ACTION_EX_DB.to_dict()
@ -278,7 +279,7 @@ class TestActionExecutionsController(base.APITest):
save_result=True
)
@mock.patch.object(rpc.EngineClient, 'start_action')
@mock.patch.object(rpc_clients.EngineClient, 'start_action')
def test_post_without_input(self, f):
f.return_value = ACTION_EX_DB.to_dict()
f.return_value['output'] = {'result': '123'}
@ -320,7 +321,7 @@ class TestActionExecutionsController(base.APITest):
self.assertEqual(400, resp.status_int)
@mock.patch.object(rpc.EngineClient, 'on_action_complete')
@mock.patch.object(rpc_clients.EngineClient, 'on_action_complete')
def test_put(self, f):
f.return_value = UPDATED_ACTION_EX_DB
@ -334,7 +335,7 @@ class TestActionExecutionsController(base.APITest):
wf_utils.Result(data=ACTION_EX_DB.output)
)
@mock.patch.object(rpc.EngineClient, 'on_action_complete')
@mock.patch.object(rpc_clients.EngineClient, 'on_action_complete')
def test_put_error_with_output(self, f):
f.return_value = ERROR_ACTION_EX_WITH_OUTPUT
@ -351,7 +352,7 @@ class TestActionExecutionsController(base.APITest):
wf_utils.Result(error=ERROR_ACTION_RES_WITH_OUTPUT)
)
@mock.patch.object(rpc.EngineClient, 'on_action_complete')
@mock.patch.object(rpc_clients.EngineClient, 'on_action_complete')
def test_put_error_with_unknown_reason(self, f):
f.return_value = ERROR_ACTION_EX_FOR_EMPTY_OUTPUT
resp = self.app.put_json('/v2/action_executions/123', ERROR_ACTION)
@ -364,7 +365,7 @@ class TestActionExecutionsController(base.APITest):
wf_utils.Result(error=DEFAULT_ERROR_OUTPUT)
)
@mock.patch.object(rpc.EngineClient, 'on_action_complete')
@mock.patch.object(rpc_clients.EngineClient, 'on_action_complete')
def test_put_error_with_unknown_reason_output_none(self, f):
f.return_value = ERROR_ACTION_EX_FOR_EMPTY_OUTPUT
resp = self.app.put_json(
@ -380,7 +381,7 @@ class TestActionExecutionsController(base.APITest):
wf_utils.Result(error=DEFAULT_ERROR_OUTPUT)
)
@mock.patch.object(rpc.EngineClient, 'on_action_complete')
@mock.patch.object(rpc_clients.EngineClient, 'on_action_complete')
def test_put_cancelled(self, on_action_complete_mock_func):
on_action_complete_mock_func.return_value = CANCELLED_ACTION_EX_DB
@ -395,7 +396,7 @@ class TestActionExecutionsController(base.APITest):
)
@mock.patch.object(
rpc.EngineClient,
rpc_clients.EngineClient,
'on_action_complete',
MOCK_NOT_FOUND
)
@ -430,7 +431,7 @@ class TestActionExecutionsController(base.APITest):
self.assertEqual(400, resp.status_int)
@mock.patch.object(rpc.EngineClient, 'on_action_complete')
@mock.patch.object(rpc_clients.EngineClient, 'on_action_complete')
def test_put_without_result(self, f):
action_ex = copy.deepcopy(UPDATED_ACTION)
del action_ex['output']

View File

@ -85,7 +85,7 @@ class TestEventTriggerController(base.APITest):
@mock.patch.object(db_api, "get_workflow_definition", MOCK_WF)
@mock.patch.object(db_api, "create_event_trigger", MOCK_TRIGGER)
@mock.patch.object(db_api, "get_event_triggers", MOCK_TRIGGERS)
@mock.patch('mistral.engine.rpc_backend.rpc.get_event_engine_client')
@mock.patch('mistral.rpc.clients.get_event_engine_client')
def test_post(self, mock_rpc_client):
client = mock.Mock()
mock_rpc_client.return_value = client
@ -134,7 +134,7 @@ class TestEventTriggerController(base.APITest):
self.assertEqual(404, resp.status_int)
@mock.patch.object(db_api, 'ensure_event_trigger_exists', MOCK_NONE)
@mock.patch('mistral.engine.rpc_backend.rpc.get_event_engine_client')
@mock.patch('mistral.rpc.clients.get_event_engine_client')
@mock.patch('mistral.db.v2.api.update_event_trigger')
def test_put(self, mock_update, mock_rpc_client):
client = mock.Mock()
@ -167,7 +167,7 @@ class TestEventTriggerController(base.APITest):
self.assertEqual(400, resp.status_int)
@mock.patch('mistral.engine.rpc_backend.rpc.get_event_engine_client')
@mock.patch('mistral.rpc.clients.get_event_engine_client')
@mock.patch.object(db_api, "get_event_trigger", MOCK_TRIGGER)
@mock.patch.object(db_api, "get_event_triggers",
mock.MagicMock(return_value=[]))

View File

@ -29,8 +29,9 @@ from mistral.api.controllers.v2 import execution
from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import api as sql_db_api
from mistral.db.v2.sqlalchemy import models
from mistral.engine.rpc_backend import rpc
from mistral import exceptions as exc
from mistral.rpc import base as rpc_base
from mistral.rpc import clients as rpc_clients
from mistral.tests.unit.api import base
from mistral.tests.unit import base as unit_base
from mistral import utils
@ -131,7 +132,7 @@ MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundError())
MOCK_ACTION_EXC = mock.MagicMock(side_effect=exc.ActionException())
@mock.patch.object(rpc, '_IMPL_CLIENT', mock.Mock())
@mock.patch.object(rpc_base, '_IMPL_CLIENT', mock.Mock())
class TestExecutionsController(base.APITest):
@mock.patch.object(db_api, 'get_workflow_execution', MOCK_WF_EX)
def test_get(self):
@ -159,7 +160,7 @@ class TestExecutionsController(base.APITest):
mock.MagicMock(return_value=None)
)
@mock.patch.object(
rpc.EngineClient,
rpc_clients.EngineClient,
'pause_workflow',
MOCK_UPDATED_WF_EX
)
@ -182,7 +183,7 @@ class TestExecutionsController(base.APITest):
'ensure_workflow_execution_exists',
mock.MagicMock(return_value=None)
)
@mock.patch.object(rpc.EngineClient, 'stop_workflow')
@mock.patch.object(rpc_clients.EngineClient, 'stop_workflow')
def test_put_state_error(self, mock_stop_wf):
update_exec = {
'id': WF_EX['id'],
@ -210,7 +211,7 @@ class TestExecutionsController(base.APITest):
'ensure_workflow_execution_exists',
mock.MagicMock(return_value=None)
)
@mock.patch.object(rpc.EngineClient, 'stop_workflow')
@mock.patch.object(rpc_clients.EngineClient, 'stop_workflow')
def test_put_state_cancelled(self, mock_stop_wf):
update_exec = {
'id': WF_EX['id'],
@ -243,7 +244,7 @@ class TestExecutionsController(base.APITest):
'ensure_workflow_execution_exists',
mock.MagicMock(return_value=None)
)
@mock.patch.object(rpc.EngineClient, 'resume_workflow')
@mock.patch.object(rpc_clients.EngineClient, 'resume_workflow')
def test_put_state_resume(self, mock_resume_wf):
update_exec = {
'id': WF_EX['id'],
@ -297,7 +298,7 @@ class TestExecutionsController(base.APITest):
'ensure_workflow_execution_exists',
mock.MagicMock(return_value=None)
)
@mock.patch.object(rpc.EngineClient, 'stop_workflow')
@mock.patch.object(rpc_clients.EngineClient, 'stop_workflow')
def test_put_state_info_unset(self, mock_stop_wf):
update_exec = {
'id': WF_EX['id'],
@ -454,7 +455,7 @@ class TestExecutionsController(base.APITest):
self.assertIn(expected_fault, resp.json['faultstring'])
@mock.patch.object(rpc.EngineClient, 'start_workflow')
@mock.patch.object(rpc_clients.EngineClient, 'start_workflow')
def test_post(self, f):
f.return_value = WF_EX.to_dict()
@ -472,7 +473,11 @@ class TestExecutionsController(base.APITest):
**json.loads(exec_dict['params'])
)
@mock.patch.object(rpc.EngineClient, 'start_workflow', MOCK_ACTION_EXC)
@mock.patch.object(
rpc_clients.EngineClient,
'start_workflow',
MOCK_ACTION_EXC
)
def test_post_throws_exception(self):
context = self.assertRaises(
webtest_app.AppError,

View File

@ -20,8 +20,8 @@ import mock
from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models
from mistral.engine.rpc_backend import rpc
from mistral import exceptions as exc
from mistral.rpc import clients as rpc
from mistral.tests.unit.api import base
from mistral.workflow import data_flow
from mistral.workflow import states

View File

@ -22,9 +22,10 @@ from oslo_service import service
from mistral.db.v2 import api as db_api
from mistral.engine import engine_server
from mistral.engine.rpc_backend import rpc
from mistral.executors import base as exe
from mistral.executors import executor_server
from mistral.rpc import base as rpc_base
from mistral.rpc import clients as rpc_clients
from mistral.tests.unit import base
from mistral.workflow import states
@ -57,7 +58,8 @@ class EngineTestCase(base.DbTestCase):
cfg.CONF.set_default('rpc_backend', 'fake')
# Drop all RPC objects (transport, clients).
rpc.cleanup()
rpc_base.cleanup()
rpc_clients.cleanup()
exe.cleanup()
self.threads = []
@ -65,7 +67,7 @@ class EngineTestCase(base.DbTestCase):
# Start remote executor.
if cfg.CONF.executor.type == 'remote':
LOG.info("Starting remote executor threads...")
self.executor_client = rpc.get_executor_client()
self.executor_client = rpc_clients.get_executor_client()
exe_svc = executor_server.get_oslo_service(setup_profiler=False)
self.executor = exe_svc.executor
self.threads.append(eventlet.spawn(launch_service, exe_svc))
@ -73,7 +75,7 @@ class EngineTestCase(base.DbTestCase):
# Start engine.
LOG.info("Starting engine threads...")
self.engine_client = rpc.get_engine_client()
self.engine_client = rpc_clients.get_engine_client()
eng_svc = engine_server.get_oslo_service(setup_profiler=False)
self.engine = eng_svc.engine
self.threads.append(eventlet.spawn(launch_service, eng_svc))

View File

@ -13,8 +13,14 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from mistral.tests.unit import base
class KombuTestCase(base.BaseTest):
pass
def setUp(self):
super(KombuTestCase, self).setUp()
cfg.CONF.set_default('rpc_backend', 'kombu')

View File

@ -14,15 +14,15 @@
# under the License.
from mistral import exceptions as exc
from mistral.tests.unit.engine.rpc_backend.kombu import base
from mistral.tests.unit.engine.rpc_backend.kombu import fake_kombu
from mistral.tests.unit.rpc.kombu import base
from mistral.tests.unit.rpc.kombu import fake_kombu
import mock
from six import moves
with mock.patch.dict('sys.modules', kombu=fake_kombu):
from mistral.engine.rpc_backend.kombu import base as kombu_base
from mistral.engine.rpc_backend.kombu import kombu_client
from mistral.rpc.kombu import base as kombu_base
from mistral.rpc.kombu import kombu_client
class TestException(exc.MistralException):

View File

@ -14,16 +14,16 @@
# under the License.
from mistral import exceptions as exc
from mistral.tests.unit.engine.rpc_backend.kombu import base
from mistral.tests.unit.engine.rpc_backend.kombu import fake_kombu
from mistral.tests.unit.rpc.kombu import base
from mistral.tests.unit.rpc.kombu import fake_kombu
from mistral import utils
import mock
from six import moves
with mock.patch.dict('sys.modules', kombu=fake_kombu):
from mistral.engine.rpc_backend.kombu import base as kombu_base
from mistral.engine.rpc_backend.kombu import kombu_listener
from mistral.rpc.kombu import base as kombu_base
from mistral.rpc.kombu import kombu_listener
class TestException(exc.MistralException):

View File

@ -14,15 +14,15 @@
# under the License.
from mistral import exceptions as exc
from mistral.tests.unit.engine.rpc_backend.kombu import base
from mistral.tests.unit.engine.rpc_backend.kombu import fake_kombu
from mistral.tests.unit.rpc.kombu import base
from mistral.tests.unit.rpc.kombu import fake_kombu
import mock
import socket
from stevedore import driver
with mock.patch.dict('sys.modules', kombu=fake_kombu):
from mistral.engine.rpc_backend.kombu import kombu_server
from mistral.rpc.kombu import kombu_server
class TestException(exc.MistralError):

View File

@ -1,4 +1,5 @@
# Copyright 2016 Catalyst IT Ltd
# Copyright 2017 Brocade Communications Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -18,8 +19,8 @@ import mock
from oslo_config import cfg
from mistral.db.v2.sqlalchemy import api as db_api
from mistral.engine.rpc_backend import rpc
from mistral.event_engine import event_engine
from mistral.event_engine import default_event_engine as evt_eng
from mistral.rpc import clients as rpc
from mistral.services import workflows
from mistral.tests.unit import base
@ -61,7 +62,7 @@ class EventEngineTest(base.DbTestCase):
@mock.patch.object(rpc, 'get_engine_client', mock.Mock())
def test_event_engine_start_with_no_triggers(self):
e_engine = event_engine.EventEngine()
e_engine = evt_eng.DefaultEventEngine()
self.addCleanup(e_engine.handler_tg.stop)
@ -74,7 +75,7 @@ class EventEngineTest(base.DbTestCase):
def test_event_engine_start_with_triggers(self, mock_start):
trigger = db_api.create_event_trigger(EVENT_TRIGGER)
e_engine = event_engine.EventEngine()
e_engine = evt_eng.DefaultEventEngine()
self.addCleanup(e_engine.handler_tg.stop)
@ -96,7 +97,7 @@ class EventEngineTest(base.DbTestCase):
def test_process_event_queue(self, mock_start):
db_api.create_event_trigger(EVENT_TRIGGER)
e_engine = event_engine.EventEngine()
e_engine = evt_eng.DefaultEventEngine()
self.addCleanup(e_engine.handler_tg.stop)
@ -138,8 +139,8 @@ class NotificationsConverterTest(base.BaseTest):
}
]
converter = event_engine.NotificationsConverter()
converter.definitions = [event_engine.EventDefinition(event_def)
converter = evt_eng.NotificationsConverter()
converter.definitions = [evt_eng.EventDefinition(event_def)
for event_def in reversed(definition_cfg)]
notification = {
@ -165,8 +166,8 @@ class NotificationsConverterTest(base.BaseTest):
}
]
converter = event_engine.NotificationsConverter()
converter.definitions = [event_engine.EventDefinition(event_def)
converter = evt_eng.NotificationsConverter()
converter.definitions = [evt_eng.EventDefinition(event_def)
for event_def in reversed(definition_cfg)]
notification = {

View File

@ -18,8 +18,8 @@ import mock
from oslo_config import cfg
from mistral.engine.rpc_backend import rpc
from mistral import exceptions as exc
from mistral.rpc import clients as rpc
from mistral.services import periodic
from mistral.services import security
from mistral.services import triggers as t_s

View File

@ -40,11 +40,11 @@ console_scripts =
wsgi_scripts =
mistral-wsgi-api = mistral.api.wsgi:init_application
mistral.engine.rpc_backend =
oslo_client = mistral.engine.rpc_backend.oslo.oslo_client:OsloRPCClient
oslo_server = mistral.engine.rpc_backend.oslo.oslo_server:OsloRPCServer
kombu_client = mistral.engine.rpc_backend.kombu.kombu_client:KombuRPCClient
kombu_server = mistral.engine.rpc_backend.kombu.kombu_server:KombuRPCServer
mistral.rpc.backends =
oslo_client = mistral.rpc.oslo.oslo_client:OsloRPCClient
oslo_server = mistral.rpc.oslo.oslo_server:OsloRPCServer
kombu_client = mistral.rpc.kombu.kombu_client:KombuRPCClient
kombu_server = mistral.rpc.kombu.kombu_server:KombuRPCServer
oslo.config.opts =
mistral.config = mistral.config:list_opts