Preparations for configurable serialization

This patch moves message envelope logic to driver level
and separates serialization logic from envelop logic

Change-Id: I33c52193357fe298d82b1eb36c9b95edf7e500a4
This commit is contained in:
dukhlov 2015-12-03 05:51:00 -05:00 committed by Dmitriy Ukhlov
parent 8b764c8287
commit bbf0efa29e
5 changed files with 134 additions and 35 deletions

View File

@ -15,8 +15,6 @@
from oslo_config import cfg
from oslo_log import log as logging
from oslo_messaging._drivers import common
from oslo_messaging._drivers.pika_driver import pika_engine as pika_drv_engine
from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
from oslo_messaging._drivers.pika_driver import pika_listener as pika_drv_lstnr
@ -149,7 +147,7 @@ class PikaDriver(object):
self._allowed_remote_exmods = allowed_remote_exmods
self._pika_engine = pika_drv_engine.PikaEngine(
conf, url, default_exchange
conf, url, default_exchange, allowed_remote_exmods
)
self._reply_listener = pika_drv_lstnr.RpcReplyPikaListener(
self._pika_engine
@ -192,13 +190,10 @@ class PikaDriver(object):
)
if reply is not None:
if reply.message['failure']:
ex = common.deserialize_remote_exception(
reply.message['failure'], self._allowed_remote_exmods
)
raise ex
if reply.failure is not None:
raise reply.failure
return reply.message['result']
return reply.result
def _declare_notification_queue_binding(self, target, timeout=None):
if timeout is not None and timeout < 0:

View File

@ -21,6 +21,7 @@ from pika import credentials as pika_credentials
import pika_pool
import six
import socket
import sys
@ -29,6 +30,8 @@ import time
LOG = logging.getLogger(__name__)
_EXCEPTIONS_MODULE = 'exceptions' if six.PY2 else 'builtins'
def _is_eventlet_monkey_patched(module):
"""Determines safely is eventlet patching for module enabled or not
@ -91,7 +94,8 @@ class PikaEngine(object):
# (it should be defined in 'select' module of standard library in future)
TCP_USER_TIMEOUT = 18
def __init__(self, conf, url, default_exchange=None):
def __init__(self, conf, url, default_exchange=None,
allowed_remote_exmods=None):
self.conf = conf
self._force_select_poller_use = _is_eventlet_monkey_patched('select')
@ -108,6 +112,10 @@ class PikaEngine(object):
default_exchange
)
self.allowed_remote_exmods = [_EXCEPTIONS_MODULE]
if allowed_remote_exmods:
self.allowed_remote_exmods.extend(allowed_remote_exmods)
self.rpc_listener_prefetch_count = (
conf.oslo_messaging_pika.rpc_listener_prefetch_count
)

View File

@ -59,3 +59,10 @@ class HostConnectionNotAllowedException(EstablishConnectionException):
not allowed host (because of reconnection policy for example)
"""
pass
class UnsupportedDriverVersion(exceptions.MessagingException):
"""Is raised when message is received but was sent by different,
not supported driver version
"""
pass

View File

@ -11,38 +11,66 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import socket
import time
import traceback
import uuid
from concurrent import futures
from oslo_log import log as logging
from oslo_messaging._drivers import common
from oslo_messaging import exceptions
from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
from oslo_serialization import jsonutils
from oslo_utils import importutils
from pika import exceptions as pika_exceptions
from pika import spec as pika_spec
import pika_pool
import retrying
import six
import socket
import time
import uuid
import oslo_messaging
from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
from oslo_messaging import _utils as utils
from oslo_messaging import exceptions
LOG = logging.getLogger(__name__)
_VERSION_HEADER = "version"
_VERSION = "1.0"
class RemoteExceptionMixin(object):
def __init__(self, module, clazz, message, trace):
self.module = module
self.clazz = clazz
self.message = message
self.trace = trace
self._str_msgs = message + "\n" + "\n".join(trace)
def __str__(self):
return self._str_msgs
class PikaIncomingMessage(object):
def __init__(self, pika_engine, channel, method, properties, body, no_ack):
headers = getattr(properties, "headers", {})
version = headers.get(_VERSION_HEADER, None)
if not utils.version_is_compatible(version, _VERSION):
raise pika_drv_exc.UnsupportedDriverVersion(
"Message's version: {} is not compatible with driver version: "
"{}".format(version, _VERSION))
self._pika_engine = pika_engine
self._no_ack = no_ack
self._channel = channel
self.delivery_tag = method.delivery_tag
self.version = version
self.content_type = getattr(properties, "content_type",
"application/json")
self.content_encoding = getattr(properties, "content_encoding",
@ -61,9 +89,7 @@ class PikaIncomingMessage(object):
)
)
message_dict = common.deserialize_msg(
jsonutils.loads(body, encoding=self.content_encoding)
)
message_dict = jsonutils.loads(body, encoding=self.content_encoding)
context_dict = {}
@ -101,15 +127,37 @@ class RpcPikaIncomingMessage(PikaIncomingMessage):
if not (self.msg_id and self.reply_q):
return
if failure:
failure = common.serialize_remote_exception(failure, log_failure)
msg = {
'result': reply,
'failure': failure,
'_msg_id': self.msg_id,
}
if failure is not None:
if isinstance(failure, RemoteExceptionMixin):
failure_data = {
'class': failure.clazz,
'module': failure.module,
'message': failure.message,
'tb': failure.trace
}
else:
tb = traceback.format_exception(*failure)
failure = failure[1]
cls_name = six.text_type(failure.__class__.__name__)
mod_name = six.text_type(failure.__class__.__module__)
failure_data = {
'class': cls_name,
'module': mod_name,
'message': six.text_type(failure),
'tb': tb
}
msg['_failure'] = failure_data
if reply is not None:
msg['_result'] = reply
reply_outgoing_message = PikaOutgoingMessage(
self._pika_engine, msg, self.ctxt, content_type=self.content_type,
content_encoding=self.content_encoding
@ -154,6 +202,49 @@ class RpcPikaIncomingMessage(PikaIncomingMessage):
)
class RpcReplyPikaIncomingMessage(PikaIncomingMessage):
def __init__(self, pika_engine, channel, method, properties, body, no_ack):
self.result = None
self.failure = None
super(RpcReplyPikaIncomingMessage, self).__init__(
pika_engine, channel, method, properties, body, no_ack
)
if self.failure is not None:
trace = self.failure.get('tb', [])
message = self.failure.get('message', "")
class_name = self.failure.get('class')
module_name = self.failure.get('module')
res_exc = None
if module_name in pika_engine.allowed_remote_exmods:
try:
module = importutils.import_module(module_name)
klass = getattr(module, class_name)
ex_type = type(
klass.__name__,
(RemoteExceptionMixin, klass),
{}
)
res_exc = ex_type(module_name, class_name, message, trace)
except ImportError as e:
LOG.warn(
"Can not deserialize remote exception [module:{}, "
"class:{}]. {}".format(module_name, class_name, str(e))
)
# if we have not processed failure yet, use RemoteError class
if res_exc is None:
res_exc = oslo_messaging.RemoteError(
class_name, message, trace
)
self.failure = res_exc
class PikaOutgoingMessage(object):
def __init__(self, pika_engine, message, context,
content_type="application/json", content_encoding="utf-8"):
@ -253,16 +344,14 @@ class PikaOutgoingMessage(object):
properties = pika_spec.BasicProperties(
content_encoding=self.content_encoding,
content_type=self.content_type,
headers={_VERSION_HEADER: _VERSION},
delivery_mode=2 if persistent else 1
)
pool = (self._pika_engine.connection_with_confirmation_pool
if confirm else self._pika_engine.connection_pool)
body = jsonutils.dumps(
common.serialize_msg(msg_dict),
encoding=self.content_encoding
)
body = jsonutils.dumps(msg_dict, encoding=self.content_encoding)
LOG.debug(
"Sending message:[body:{}; properties: {}] to target: "

View File

@ -255,7 +255,7 @@ class RpcReplyPikaPoller(PikaPoller):
msg = super(RpcReplyPikaPoller, self).poll(timeout)
if msg is None:
return None
return pika_drv_msg.PikaIncomingMessage(
return pika_drv_msg.RpcReplyPikaIncomingMessage(
self._pika_engine, *msg
)