Refactor RPC serialization: add polymophic serializer

* The initial driver of this effort was that, as it turned out,
  it was impossible to have custom serialization for a custom
  type. For example, we had class Result and even had
  ResultSerializer but it wasn't used. So after taking a closer
  look it became clear that our serialization subsystem did not
  allow to register any alternative serializers by design.
  Because we had to use one serializer implementation registered
  in RPC. The solution is to make it more flexible by adding
  a special router-like serializer that knows to which specific
  serializer it needs to switch depending on an object type.
* Refactored serialization subsystem so that it's more flexible
  and manageble
* Added polymorphic entity serializer that can work as a router
  and switch between different serializers depending on their type
* Used Result and ResultSerializer in RPC instead of decomposing
  result object into primitive fields explicitly

Co-Authored-By: Dawid Deja <dawid.deja@intel.com>
Change-Id: I29d40a0b1b68a5410f3db2f7280c9c6244d55a84
(cherry picked from commit 93ed2099d1)
This commit is contained in:
Renat Akhmerov 2017-02-06 17:52:23 +07:00
parent 1db04b75f6
commit 8749353013
12 changed files with 364 additions and 108 deletions

View File

@ -19,7 +19,6 @@ from keystoneclient.v3 import client as keystone_client
import logging
from oslo_config import cfg
import oslo_messaging as messaging
from oslo_messaging import serializer as messaging_serializer
from oslo_serialization import jsonutils
from osprofiler import profiler
import pecan
@ -27,6 +26,7 @@ from pecan import hooks
from mistral import auth
from mistral import exceptions as exc
from mistral import serialization
from mistral import utils
LOG = logging.getLogger(__name__)
@ -227,20 +227,22 @@ def context_from_config():
class RpcContextSerializer(messaging.Serializer):
def __init__(self, base=None):
self._base = base or messaging_serializer.JsonPayloadSerializer()
def __init__(self, entity_serializer=None):
self.entity_serializer = (
entity_serializer or serialization.get_polymorphic_serializer()
)
def serialize_entity(self, context, entity):
if not self._base:
if not self.entity_serializer:
return entity
return self._base.serialize_entity(context, entity)
return self.entity_serializer.serialize(entity)
def deserialize_entity(self, context, entity):
if not self._base:
if not self.entity_serializer:
return entity
return self._base.deserialize_entity(context, entity)
return self.entity_serializer.deserialize(entity)
def serialize_context(self, context):
ctx = context.to_dict()

View File

@ -23,7 +23,6 @@ from mistral.services import expiration_policy
from mistral.services import scheduler
from mistral import utils
from mistral.utils import profiler as profiler_utils
from mistral.workflow import utils as wf_utils
LOG = logging.getLogger(__name__)
@ -131,20 +130,16 @@ class EngineServer(service_base.MistralService):
**params
)
def on_action_complete(self, rpc_ctx, action_ex_id, result_data,
result_error, wf_action):
def on_action_complete(self, rpc_ctx, action_ex_id, result, wf_action):
"""Receives RPC calls to communicate action result to engine.
:param rpc_ctx: RPC request context.
:param action_ex_id: Action execution id.
:param result_data: Action result data.
:param result_error: Action result error.
:param result: Action result data.
:param wf_action: True if given id points to a workflow execution.
:return: Action execution.
"""
result = wf_utils.Result(result_data, result_error)
LOG.info(
"Received RPC request 'on_action_complete'[rpc_ctx=%s,"
" action_ex_id=%s, result=%s]" %

View File

@ -13,11 +13,10 @@
# limitations under the License.
import kombu
from kombu import serialization
from mistral import exceptions as exc
from mistral import serialization as mistral_serialization
from mistral.utils import rpc_utils
from mistral.utils import serializers
IS_RECEIVED = 'kombu_rpc_is_received'
RESULT = 'kombu_rpc_result'
@ -29,6 +28,7 @@ class Base(object):
"""Base class for Client and Server."""
def __init__(self):
self._transport_url = None
self.serializer = None
@staticmethod
def _make_connection(amqp_host, amqp_port, amqp_user, amqp_password,
@ -110,21 +110,31 @@ class Base(object):
**kwargs
)
@staticmethod
def _register_mistral_serialization():
def _register_mistral_serialization(self):
"""Adds mistral serializer to available serializers in kombu.
:return: None
"""
serialization.register(
'mistral_serialization',
encoder=serializers.KombuSerializer.serialize,
decoder=serializers.KombuSerializer.deserialize,
content_type='application/json'
)
self.serializer = mistral_serialization.get_polymorphic_serializer()
def _check_backend(self):
backend = rpc_utils.get_rpc_backend(self._transport_url)
if backend not in ['rabbit', 'kombu']:
raise exc.MistralException("Unsupported backend: %s" % backend)
def _serialize_message(self, kwargs):
result = {}
for argname, arg in kwargs.items():
result[argname] = self.serializer.serialize(arg)
return result
def _deserialize_message(self, kwargs):
result = {}
for argname, arg in kwargs.items():
result[argname] = self.serializer.deserialize(arg)
return result

View File

@ -125,7 +125,7 @@ class KombuRPCClient(rpc_base.RPCClient, kombu_base.Base):
body = {
'rpc_ctx': ctx.to_dict(),
'rpc_method': method,
'arguments': kwargs,
'arguments': self._serialize_message(kwargs),
'async': async_
}
@ -143,7 +143,6 @@ class KombuRPCClient(rpc_base.RPCClient, kombu_base.Base):
routing_key=self.topic,
reply_to=self.queue_name,
correlation_id=correlation_id,
serializer='mistral_serialization',
delivery_mode=2
)
@ -157,6 +156,9 @@ class KombuRPCClient(rpc_base.RPCClient, kombu_base.Base):
if res_type == 'error':
raise res_object
else:
res_object = self._deserialize_message(res_object)['body']
finally:
if not async_:
self._listener.remove_listener(correlation_id)

View File

@ -184,14 +184,16 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
return context
def publish_message(self, body, reply_to, corr_id, res_type='response'):
if res_type != 'error':
body = self._serialize_message({'body': body})
with kombu.producers[self.conn].acquire(block=True) as producer:
producer.publish(
body=body,
exchange=self.exchange,
routing_key=reply_to,
correlation_id=corr_id,
serializer='pickle' if res_type == 'error'
else 'mistral_serialization',
serializer='pickle' if res_type == 'error' else 'json',
type=res_type
)
@ -199,6 +201,12 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
try:
return self._on_message(request, message)
except Exception as e:
LOG.warning(
"Got exception while consuming message. Exception would be "
"send back to the caller."
)
LOG.debug("Exceptions: %s" % str(e))
# Wrap exception into another exception for compability with oslo.
self.publish_message(
exc.KombuException(e),
@ -217,7 +225,7 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
rpc_ctx = request.get('rpc_ctx')
redelivered = message.delivery_info.get('redelivered', None)
rpc_method_name = request.get('rpc_method')
arguments = request.get('arguments')
arguments = self._deserialize_message(request.get('arguments'))
correlation_id = message.properties['correlation_id']
reply_to = message.properties['reply_to']

View File

@ -23,7 +23,6 @@ from stevedore import driver
from mistral import context as auth_ctx
from mistral.engine import base
from mistral import exceptions as exc
from mistral.workflow import utils as wf_utils
LOG = logging.getLogger(__name__)
@ -224,8 +223,7 @@ class EngineClient(base.Engine):
auth_ctx.ctx(),
'on_action_complete',
action_ex_id=action_ex_id,
result_data=result.data,
result_error=result.error,
result=result,
wf_action=wf_action
)
@ -359,18 +357,7 @@ class ExecutorClient(base.Executor):
rpc_client_method = (self._client.async_call
if async_ else self._client.sync_call)
res = rpc_client_method(auth_ctx.ctx(), 'run_action', **kwargs)
# TODO(rakhmerov): It doesn't seem a good approach since we have
# a serializer for Result class. A better solution would be to
# use a composite serializer that dispatches serialization and
# deserialization to concrete serializers depending on object
# type.
return (
wf_utils.Result(data=res['data'], error=res['error'])
if res else None
)
return rpc_client_method(auth_ctx.ctx(), 'run_action', **kwargs)
class EventEngineClient(base.EventEngine):

192
mistral/serialization.py Normal file
View File

@ -0,0 +1,192 @@
# Copyright 2017 Nokia Networks.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
from oslo_serialization import jsonutils
_SERIALIZER = None
class Serializer(object):
"""Base interface for entity serializers.
A particular serializer knows how to convert a certain object
into a string and back from that string into an object whose
state is equivalent to the initial object.
"""
@abc.abstractmethod
def serialize(self, entity):
"""Converts the given object into a string.
:param entity: A object to be serialized.
:return String containing the state of the object in serialized form.
"""
raise NotImplementedError
@abc.abstractmethod
def deserialize(self, data_str):
"""Converts the given string into an object.
:param data_str: String containing the state of the object in
serialized form.
:return: An object.
"""
raise NotImplementedError
class DictBasedSerializer(Serializer):
"""Dictionary-based serializer.
It slightly simplifies implementing custom serializers by introducing
a contract based on dictionary. A serializer class extending this class
just needs to implement conversion from object into dict and from dict
to object. It doesn't need to convert into string and back as required
bye the base serializer contract. Conversion into string is implemented
once with regard to possible problems that may occur for collection and
primitive types as circular dependencies, correct date format etc.
"""
def serialize(self, entity):
if entity is None:
return None
entity_dict = self.serialize_to_dict(entity)
return jsonutils.dumps(
jsonutils.to_primitive(entity_dict, convert_instances=True)
)
def deserialize(self, data_str):
if data_str is None:
return None
entity_dict = jsonutils.loads(data_str)
return self.deserialize_from_dict(entity_dict)
@abc.abstractmethod
def serialize_to_dict(self, entity):
raise NotImplementedError
@abc.abstractmethod
def deserialize_from_dict(self, entity_dict):
raise NotImplementedError
class MistralSerializable(object):
"""A mixin to generate a serialization key for a custom object."""
@classmethod
def get_serialization_key(cls):
return "%s.%s" % (cls.__module__, cls.__name__)
class PolymorphicSerializer(Serializer):
"""Polymorphic serializer.
The purpose of this class is to server as a serialization router
between serializers that can work with entities of particular type.
All concrete serializers associated with concrete entity classes
should be registered via method 'register', after that an instance
of polymorphic serializer can be used as a universal serializer
for an RPC system or something else.
When converting an object into a string this serializer also writes
a special key into the result string sequence so that it's possible
to find a proper serializer when deserializing this object.
If a primitive value is given as an entity this serializer doesn't
do anything special and simply converts a value into a string using
jsonutils. Similar when it converts a string into a primitive value.
"""
def __init__(self):
# {serialization key: serializer}
self.serializers = {}
@staticmethod
def _get_serialization_key(entity_cls):
if issubclass(entity_cls, MistralSerializable):
return entity_cls.get_serialization_key()
return None
def register(self, entity_cls, serializer):
key = self._get_serialization_key(entity_cls)
if not key:
return
if key in self.serializers:
raise RuntimeError(
"A serializer for the entity class has already been"
" registered: %s" % entity_cls
)
self.serializers[key] = serializer
def cleanup(self):
self.serializers.clear()
def serialize(self, entity):
if entity is None:
return None
key = self._get_serialization_key(type(entity))
# Primitive or not registered type.
if not key:
return jsonutils.dumps(
jsonutils.to_primitive(entity, convert_instances=True)
)
serializer = self.serializers.get(key)
result = {
'__serial_key': key,
'__serial_data': serializer.serialize(entity)
}
return jsonutils.dumps(result)
def deserialize(self, data_str):
if data_str is None:
return None
data = jsonutils.loads(data_str)
if isinstance(data, dict) and '__serial_key' in data:
serializer = self.serializers.get(data['__serial_key'])
return serializer.deserialize(data['__serial_data'])
return data
def get_polymorphic_serializer():
global _SERIALIZER
if _SERIALIZER is None:
_SERIALIZER = PolymorphicSerializer()
return _SERIALIZER
def register_serializer(entity_cls, serializer):
get_polymorphic_serializer().register(entity_cls, serializer)
def cleanup():
get_polymorphic_serializer().cleanup()

View File

@ -738,5 +738,5 @@ class ErrorHandlingEngineTest(base.EngineTestCase):
task_ex = wf_ex.task_executions[0]
self.assertIn('UnicodeDecodeError: utf', wf_ex.state_info)
self.assertIn('UnicodeDecodeError: utf', task_ex.state_info)
self.assertIn("UnicodeDecodeError: utf", wf_ex.state_info)
self.assertIn("UnicodeDecodeError: utf", task_ex.state_info)

View File

@ -1300,6 +1300,7 @@ class WithItemsEngineTest(base.EngineTestCase):
@mock.patch.object(std_actions.HTTPAction, 'run')
def test_with_items_and_adhoc_action(self, mock_http_action):
mock_http_action.return_value = ''
wb_text = """---
version: "2.0"

View File

@ -0,0 +1,108 @@
# Copyright 2017 - Nokia Networks.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral import serialization
from mistral.tests.unit import base
class MyClass(serialization.MistralSerializable):
def __init__(self, a, b):
self.a = a
self.b = b
def __eq__(self, other):
if not isinstance(other, MyClass):
return False
return other.a == self.a and other.b == self.b
class MyClassSerializer(serialization.DictBasedSerializer):
def serialize_to_dict(self, entity):
return {'a': entity.a, 'b': entity.b}
def deserialize_from_dict(self, entity_dict):
return MyClass(entity_dict['a'], entity_dict['b'])
class SerializationTest(base.BaseTest):
def setUp(self):
super(SerializationTest, self).setUp()
serialization.register_serializer(MyClass, MyClassSerializer())
self.addCleanup(serialization.cleanup)
def test_dict_based_serializer(self):
obj = MyClass('a', 'b')
serializer = MyClassSerializer()
s = serializer.serialize(obj)
self.assertEqual(obj, serializer.deserialize(s))
self.assertIsNone(serializer.serialize(None))
self.assertIsNone(serializer.deserialize(None))
def test_polymorphic_serializer_primitive_types(self):
serializer = serialization.get_polymorphic_serializer()
self.assertEqual(17, serializer.deserialize(serializer.serialize(17)))
self.assertEqual(
0.34,
serializer.deserialize(serializer.serialize(0.34))
)
self.assertEqual(-5, serializer.deserialize(serializer.serialize(-5)))
self.assertEqual(
-6.3,
serializer.deserialize(serializer.serialize(-6.3))
)
self.assertFalse(serializer.deserialize(serializer.serialize(False)))
self.assertTrue(serializer.deserialize(serializer.serialize(True)))
self.assertEqual(
'abc',
serializer.deserialize(serializer.serialize('abc'))
)
self.assertEqual(
{'a': 'b', 'c': 'd'},
serializer.deserialize(serializer.serialize({'a': 'b', 'c': 'd'}))
)
self.assertEqual(
['a', 'b', 'c'],
serializer.deserialize(serializer.serialize(['a', 'b', 'c']))
)
def test_polymorphic_serializer_custom_object(self):
serializer = serialization.get_polymorphic_serializer()
obj = MyClass('a', 'b')
s = serializer.serialize(obj)
self.assertIn('__serial_key', s)
self.assertIn('__serial_data', s)
self.assertEqual(obj, serializer.deserialize(s))
self.assertIsNone(serializer.serialize(None))
self.assertIsNone(serializer.deserialize(None))
def test_register_twice(self):
self.assertRaises(
RuntimeError,
serialization.register_serializer,
MyClass,
MyClassSerializer()
)

View File

@ -1,41 +0,0 @@
# Copyright 2014 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
from oslo_serialization import jsonutils
class Serializer(object):
@staticmethod
@abc.abstractmethod
def serialize(entity):
pass
@staticmethod
@abc.abstractmethod
def deserialize(entity):
pass
class KombuSerializer(Serializer):
@staticmethod
def deserialize(entity):
return jsonutils.loads(entity)
@staticmethod
def serialize(entity):
return jsonutils.dumps(
jsonutils.to_primitive(entity, convert_instances=True)
)

View File

@ -14,12 +14,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral import serialization
from mistral import utils
from mistral.utils import serializers
from osprofiler import profiler
class Result(object):
class Result(serialization.MistralSerializable):
"""Explicit data structure containing a result of task execution."""
def __init__(self, data=None, error=None, cancel=False):
@ -28,16 +27,9 @@ class Result(object):
self.cancel = cancel
def __repr__(self):
try:
profiler.start("action-result-repr")
import traceback
traceback.print_stack()
return 'Result [data=%s, error=%s, cancel=%s]' % (
repr(self.data), repr(self.error), str(self.cancel)
)
finally:
profiler.stop()
return 'Result [data=%s, error=%s, cancel=%s]' % (
repr(self.data), repr(self.error), str(self.cancel)
)
def cut_repr(self):
return 'Result [data=%s, error=%s, cancel=%s]' % (
@ -68,19 +60,19 @@ class Result(object):
if self.is_success() else {'result': self.error})
class ResultSerializer(serializers.Serializer):
@staticmethod
def serialize(entity):
class ResultSerializer(serialization.DictBasedSerializer):
def serialize_to_dict(self, entity):
return {
'data': entity.data,
'error': entity.error,
'cancel': entity.cancel
}
@staticmethod
def deserialize(entity):
def deserialize_from_dict(self, entity_dict):
return Result(
entity['data'],
entity['error'],
entity.get('cancel', False)
entity_dict['data'],
entity_dict['error'],
entity_dict.get('cancel', False)
)
serialization.register_serializer(Result, ResultSerializer())