terracotta/terracotta/rpc.py

290 lines
8.0 KiB
Python

# Copyright 2015 Huawei Technologies Co. Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo.config import cfg
from oslo import messaging
from oslo_messaging.rpc import client
from mistral import context as auth_ctx
from mistral.engine import base
from mistral import exceptions as exc
from mistral.openstack.common import log as logging
from mistral.workflow import utils as wf_utils
LOG = logging.getLogger(__name__)
_TRANSPORT = None
_ENGINE_CLIENT = None
_EXECUTOR_CLIENT = None
def cleanup():
"""Intended to be used by tests to recreate all RPC related objects."""
global _TRANSPORT
global _ENGINE_CLIENT
global _EXECUTOR_CLIENT
_TRANSPORT = None
_ENGINE_CLIENT = None
_EXECUTOR_CLIENT = None
def get_transport():
global _TRANSPORT
if not _TRANSPORT:
_TRANSPORT = messaging.get_transport(cfg.CONF)
return _TRANSPORT
def get_engine_client():
global _ENGINE_CLIENT
if not _ENGINE_CLIENT:
_ENGINE_CLIENT = EngineClient(get_transport())
return _ENGINE_CLIENT
def get_executor_client():
global _EXECUTOR_CLIENT
if not _EXECUTOR_CLIENT:
_EXECUTOR_CLIENT = ExecutorClient(get_transport())
return _EXECUTOR_CLIENT
class GlobalManagerServer(object):
"""RPC Engine server."""
def __init__(self, manager):
self._manager = manager
def wrap_messaging_exception(method):
"""This decorator unwrap remote error in one of MistralException.
oslo.messaging has different behavior on raising exceptions
when fake or rabbit transports are used. In case of rabbit
transport it raises wrapped RemoteError which forwards directly
to API. Wrapped RemoteError contains one of MistralException raised
remotely on Engine and for correct exception interpretation we
need to unwrap and raise given exception and manually send it to
API layer.
"""
def decorator(*args, **kwargs):
try:
return method(*args, **kwargs)
except client.RemoteError as e:
exc_cls = getattr(exc, e.exc_type)
raise exc_cls(e.value)
return decorator
class EngineClient(base.Engine):
"""RPC Engine client."""
def __init__(self, transport):
"""Constructs an RPC client for engine.
:param transport: Messaging transport.
"""
serializer = auth_ctx.RpcContextSerializer(
auth_ctx.JsonPayloadSerializer())
self._client = messaging.RPCClient(
transport,
messaging.Target(topic=cfg.CONF.engine.topic),
serializer=serializer
)
@wrap_messaging_exception
def start_workflow(self, wf_name, wf_input, **params):
"""Starts workflow sending a request to engine over RPC.
:return: Workflow execution.
"""
return self._client.call(
auth_ctx.ctx(),
'start_workflow',
workflow_name=wf_name,
workflow_input=wf_input or {},
params=params
)
def on_task_state_change(self, task_ex_id, state):
return self._client.call(
auth_ctx.ctx(),
'on_task_state_change',
task_ex_id=task_ex_id,
state=state
)
@wrap_messaging_exception
def on_action_complete(self, action_ex_id, result):
"""Conveys action result to Mistral Engine.
This method should be used by clients of Mistral Engine to update
state of a action execution once action has executed. One of the
clients of this method is Mistral REST API server that receives
action result from the outside action handlers.
Note: calling this method serves an event notifying Mistral that
it possibly needs to move the workflow on, i.e. run other workflow
tasks for which all dependencies are satisfied.
:return: Task.
"""
return self._client.call(
auth_ctx.ctx(),
'on_action_complete',
action_ex_id=action_ex_id,
result_data=result.data,
result_error=result.error
)
@wrap_messaging_exception
def pause_workflow(self, execution_id):
"""Stops the workflow with the given execution id.
:return: Workflow execution.
"""
return self._client.call(
auth_ctx.ctx(),
'pause_workflow',
execution_id=execution_id
)
@wrap_messaging_exception
def resume_workflow(self, execution_id):
"""Resumes the workflow with the given execution id.
:return: Workflow execution.
"""
return self._client.call(
auth_ctx.ctx(),
'resume_workflow',
execution_id=execution_id
)
@wrap_messaging_exception
def stop_workflow(self, execution_id, state, message=None):
"""Stops workflow execution with given status.
Once stopped, the workflow is complete with SUCCESS or ERROR,
and can not be resumed.
:param execution_id: Workflow execution id
:param state: State assigned to the workflow: SUCCESS or ERROR
:param message: Optional information string
:return: Workflow execution, model.Execution
"""
return self._client.call(
auth_ctx.ctx(),
'stop_workflow',
execution_id=execution_id,
state=state,
message=message
)
@wrap_messaging_exception
def rollback_workflow(self, execution_id):
"""Rolls back the workflow with the given execution id.
:return: Workflow execution.
"""
return self._client.call(
auth_ctx.ctx(),
'rollback_workflow',
execution_id=execution_id
)
class LocalManagerServer(object):
"""RPC Executor server."""
def __init__(self, manager):
self._executor = manager
def run_action(self, rpc_ctx, action_ex_id, action_class_str,
attributes, params):
"""Receives calls over RPC to run action on executor.
:param rpc_ctx: RPC request context dictionary.
"""
LOG.info(
"Received RPC request 'run_action'[rpc_ctx=%s,"
" action_ex_id=%s, action_class=%s, attributes=%s, params=%s]"
% (rpc_ctx, action_ex_id, action_class_str, attributes, params)
)
self._executor.run_action(
action_ex_id,
action_class_str,
attributes,
params
)
class ExecutorClient(base.Executor):
"""RPC Executor client."""
def __init__(self, transport):
"""Constructs an RPC client for the Executor.
:param transport: Messaging transport.
:type transport: Transport.
"""
serializer = auth_ctx.RpcContextSerializer(
auth_ctx.JsonPayloadSerializer()
)
self.topic = cfg.CONF.executor.topic
self._client = messaging.RPCClient(
transport,
messaging.Target(),
serializer=serializer
)
def run_action(self, action_ex_id, action_class_str, attributes,
action_params, target=None):
"""Sends a request to run action to executor."""
kwargs = {
'action_ex_id': action_ex_id,
'action_class_str': action_class_str,
'attributes': attributes,
'params': action_params
}
self._client.prepare(topic=self.topic, server=target).cast(
auth_ctx.ctx(),
'run_action',
**kwargs
)