Prepare engine and client for object parameters

We are moving away from passing JSON objects to passing versioned
objects at the RPC interface. This means the RPC client and RPC server
will need to understand (marshaling) this new data format. This patch
adds the basic serializers from oslo.versionedobject. Hope it will work
so we don't have to derive a subclass from it.

Change-Id: I1f2a6af38327d4e90727f22eead3689b7bbc3131
This commit is contained in:
tengqm 2016-10-12 10:14:59 -04:00
parent 4c3d19bb60
commit 374c80612e
11 changed files with 179 additions and 33 deletions

View File

@ -16,11 +16,13 @@ RPC_ATTRS = (
ENGINE_TOPIC,
ENGINE_DISPATCHER_TOPIC,
ENGINE_HEALTH_MGR_TOPIC,
RPC_API_VERSION_BASE,
RPC_API_VERSION,
) = (
'senlin-engine',
'engine-dispatcher',
'engine-health-mgr',
'1.0',
'1.1',
)

View File

@ -14,6 +14,7 @@ import eventlet
from oslo_config import cfg
import oslo_messaging as messaging
from senlin.common import consts
from senlin.common import context
# An alias for the default serializer
@ -83,18 +84,23 @@ def cleanup():
NOTIFIER = None
def get_rpc_server(target, endpoint):
def get_rpc_server(target, endpoint, serializer=None):
"""Return a configured oslo_messaging rpc server."""
serializer = RequestContextSerializer(JsonPayloadSerializer())
if serializer is None:
serializer = JsonPayloadSerializer()
serializer = RequestContextSerializer(serializer)
return messaging.get_rpc_server(TRANSPORT, target, [endpoint],
executor='eventlet',
serializer=serializer)
def get_rpc_client(**kwargs):
def get_rpc_client(topic, server, serializer=None):
"""Return a configured oslo_messaging RPCClient."""
target = messaging.Target(**kwargs)
serializer = RequestContextSerializer(JsonPayloadSerializer())
target = messaging.Target(topic=topic, server=server,
version=consts.RPC_API_VERSION_BASE)
if serializer is None:
serializer = JsonPayloadSerializer()
serializer = RequestContextSerializer(serializer)
return messaging.RPCClient(TRANSPORT, target, serializer=serializer)

View File

@ -10,6 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from oslo_context import context as oslo_context
from oslo_log import log as logging
import oslo_messaging
@ -83,8 +84,10 @@ def notify(method, engine_id=None, **kwargs):
:param method: remote method to call
:param engine_id: dispatcher to notify; None implies broadcast
'''
client = rpc_messaging.get_rpc_client(version=consts.RPC_API_VERSION)
# TODO(Qiming): Check if we need ovo serializer here
serializer = None
client = rpc_messaging.get_rpc_client(consts.ENGINE_TOPIC, cfg.CONF.host,
serializer=serializer)
if engine_id:
# Notify specific dispatcher identified by engine_id

View File

@ -297,21 +297,14 @@ def notify(engine_id, method, **kwargs):
:param method: remote method to call
"""
timeout = cfg.CONF.engine_life_check_timeout
client = rpc.get_rpc_client(version=consts.RPC_API_VERSION)
client = rpc.get_rpc_client(consts.ENGINE_HEALTH_MGR_TOPIC, None)
if engine_id:
# Notify specific dispatcher identified by engine_id
call_context = client.prepare(
version=consts.RPC_API_VERSION,
timeout=timeout,
topic=consts.ENGINE_HEALTH_MGR_TOPIC,
server=engine_id)
call_context = client.prepare(timeout=timeout, server=engine_id)
else:
# Broadcast to all disptachers
call_context = client.prepare(
version=consts.RPC_API_VERSION,
timeout=timeout,
topic=consts.ENGINE_HEALTH_MGR_TOPIC)
call_context = client.prepare(timeout=timeout)
ctx = context.get_admin_context()

View File

@ -90,7 +90,6 @@ class EngineService(service.Service):
# which happens after the fork when spawning multiple worker processes
self.engine_id = None
self.TG = None
self.target = None
self._rpc_server = None
self.cleanup_timer = None
self.cleanup_count = 0
@ -116,8 +115,12 @@ class EngineService(service.Service):
target = oslo_messaging.Target(version=consts.RPC_API_VERSION,
server=self.host,
topic=self.topic)
self.target = target
self._rpc_server = rpc_messaging.get_rpc_server(target, self)
# TODO(Qiming): uncomment the following line to enable
# new RPC server
# serializer = obj_base.VersionedObjectSerializer()
serializer = None
self._rpc_server = rpc_messaging.get_rpc_server(
target, self, serializer=serializer)
self._rpc_server.start()
# create a health manager RPC service for this engine.

View File

@ -19,6 +19,7 @@ from senlin import objects
from senlin.objects import fields
VersionedObjectDictCompat = base.VersionedObjectDictCompat
VersionedObjectSerializer = base.VersionedObjectSerializer
class SenlinObject(base.VersionedObject):

View File

@ -19,7 +19,6 @@ import six
from senlin.common.i18n import _
CONF = cfg.CONF
CONF.import_opt("max_nodes_per_cluster", "senlin.common.config")
# Field alias for code readability
BooleanField = fields.BooleanField
@ -217,6 +216,7 @@ class Capacity(fields.Integer):
def __init__(self, minimum=0, maximum=None):
super(Capacity, self).__init__()
CONF.import_opt("max_nodes_per_cluster", "senlin.common.config")
if minimum > CONF.max_nodes_per_cluster:
err = _("The value of 'minimum' cannot be greater than the global "

View File

@ -18,6 +18,7 @@ from oslo_config import cfg
from senlin.common import consts
from senlin.common import messaging
from senlin.objects import base as object_base
class EngineClient(object):
@ -29,13 +30,15 @@ class EngineClient(object):
1.1 - Add cluster-collect call.
"""
BASE_RPC_API_VERSION = '1.0'
def __init__(self):
self._client = messaging.get_rpc_client(
topic=consts.ENGINE_TOPIC,
server=cfg.CONF.host,
version=self.BASE_RPC_API_VERSION)
# TODO(Qiming): remove this when the migration is done
self._client = messaging.get_rpc_client(consts.ENGINE_TOPIC,
cfg.CONF.host)
serializer = object_base.VersionedObjectSerializer()
self.client = messaging.get_rpc_client(consts.ENGINE_TOPIC,
cfg.CONF.host,
serializer=serializer)
@staticmethod
def make_msg(method, **kwargs):

View File

@ -11,6 +11,7 @@
# under the License.
import mock
from oslo_config import cfg
from oslo_context import context
import oslo_messaging
@ -106,12 +107,15 @@ class TestDispatcher(base.SenlinTestCase):
@mock.patch.object(context, 'get_current')
@mock.patch.object(messaging, 'get_rpc_client')
def test_notify_broadcast(self, mock_rpc, mock_get_current):
cfg.CONF.set_override('host', 'HOSTNAME', enforce_type=True)
fake_ctx = mock.Mock()
mock_get_current.return_value = fake_ctx
mock_rpc.return_value = mock.Mock()
dispatcher.notify('METHOD')
mock_rpc.assert_called_once_with(version=consts.RPC_API_VERSION)
mock_rpc.assert_called_once_with(consts.ENGINE_TOPIC, 'HOSTNAME',
serializer=None)
mock_client = mock_rpc.return_value
mock_client.prepare.assert_called_once_with(
version=consts.RPC_API_VERSION,
@ -124,6 +128,7 @@ class TestDispatcher(base.SenlinTestCase):
@mock.patch.object(context, 'get_current')
@mock.patch.object(messaging, 'get_rpc_client')
def test_notify_single_server(self, mock_rpc, mock_get_current):
cfg.CONF.set_override('host', 'HOSTNAME', enforce_type=True)
fake_ctx = mock.Mock()
mock_get_current.return_value = fake_ctx
mock_rpc.return_value = mock.Mock()
@ -131,7 +136,8 @@ class TestDispatcher(base.SenlinTestCase):
self.assertTrue(result)
mock_rpc.assert_called_once_with(version=consts.RPC_API_VERSION)
mock_rpc.assert_called_once_with(consts.ENGINE_TOPIC, 'HOSTNAME',
serializer=None)
mock_client = mock_rpc.return_value
mock_client.prepare.assert_called_once_with(
version=consts.RPC_API_VERSION,
@ -143,6 +149,7 @@ class TestDispatcher(base.SenlinTestCase):
@mock.patch.object(messaging, 'get_rpc_client')
def test_notify_timeout(self, mock_rpc):
cfg.CONF.set_override('host', 'HOSTNAME', enforce_type=True)
mock_rpc.return_value = mock.Mock()
mock_client = mock_rpc.return_value
mock_context = mock_client.prepare.return_value
@ -151,8 +158,8 @@ class TestDispatcher(base.SenlinTestCase):
result = dispatcher.notify('METHOD')
self.assertFalse(result)
mock_rpc.assert_called_once_with(version=consts.RPC_API_VERSION)
mock_rpc.assert_called_once_with(consts.ENGINE_TOPIC, 'HOSTNAME',
serializer=None)
mock_client.prepare.assert_called_once_with(
version=consts.RPC_API_VERSION,
topic=consts.ENGINE_DISPATCHER_TOPIC,

View File

@ -74,9 +74,9 @@ class EngineBasicTest(base.SenlinTestCase):
mock_msg_cls.assert_called_once_with(version=consts.RPC_API_VERSION,
server=self.eng.host,
topic=self.eng.topic)
self.assertEqual(mock_target, self.eng.target)
self.get_rpc.assert_called_once_with(mock_target, self.eng)
self.get_rpc.assert_called_once_with(mock_target, self.eng,
serializer=None)
self.assertEqual(self.fake_rpc_server, self.eng._rpc_server)
self.fake_rpc_server.start.assert_called_once_with()

View File

@ -0,0 +1,128 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import oslo_messaging
import testtools
from senlin.common import consts
from senlin.common import messaging
class TestUtilFunctions(testtools.TestCase):
@mock.patch.object(oslo_messaging, "get_rpc_server")
@mock.patch("senlin.common.messaging.RequestContextSerializer")
@mock.patch("senlin.common.messaging.JsonPayloadSerializer")
def test_get_rpc_server(self, mock_json_serializer,
mock_context_serializer,
mock_get_rpc_server):
x_target = mock.Mock()
x_endpoint = mock.Mock()
x_json_serializer = mock.Mock()
mock_json_serializer.return_value = x_json_serializer
x_context_serializer = mock.Mock()
mock_context_serializer.return_value = x_context_serializer
x_rpc_server = mock.Mock()
mock_get_rpc_server.return_value = x_rpc_server
res = messaging.get_rpc_server(x_target, x_endpoint)
self.assertEqual(x_rpc_server, res)
mock_json_serializer.assert_called_once_with()
mock_context_serializer.assert_called_once_with(x_json_serializer)
mock_get_rpc_server.assert_called_once_with(
messaging.TRANSPORT, x_target, [x_endpoint],
executor='eventlet', serializer=x_context_serializer)
@mock.patch.object(oslo_messaging, "get_rpc_server")
@mock.patch("senlin.common.messaging.RequestContextSerializer")
@mock.patch("senlin.common.messaging.JsonPayloadSerializer")
def test_get_rpc_server_with_serializer(self, mock_json_serializer,
mock_context_serializer,
mock_get_rpc_server):
x_target = mock.Mock()
x_endpoint = mock.Mock()
x_serializer = mock.Mock()
x_context_serializer = mock.Mock()
mock_context_serializer.return_value = x_context_serializer
x_rpc_server = mock.Mock()
mock_get_rpc_server.return_value = x_rpc_server
res = messaging.get_rpc_server(x_target, x_endpoint,
serializer=x_serializer)
self.assertEqual(x_rpc_server, res)
self.assertEqual(0, mock_json_serializer.call_count)
mock_context_serializer.assert_called_once_with(x_serializer)
mock_get_rpc_server.assert_called_once_with(
messaging.TRANSPORT, x_target, [x_endpoint],
executor='eventlet', serializer=x_context_serializer)
@mock.patch("oslo_messaging.RPCClient")
@mock.patch("senlin.common.messaging.RequestContextSerializer")
@mock.patch("senlin.common.messaging.JsonPayloadSerializer")
@mock.patch("oslo_messaging.Target")
def test_get_rpc_client(self, mock_target, mock_json_serializer,
mock_context_serializer,
mock_rpc_client):
x_topic = mock.Mock()
x_server = mock.Mock()
x_target = mock.Mock()
mock_target.return_value = x_target
x_json_serializer = mock.Mock()
mock_json_serializer.return_value = x_json_serializer
x_context_serializer = mock.Mock()
mock_context_serializer.return_value = x_context_serializer
x_rpc_client = mock.Mock()
mock_rpc_client.return_value = x_rpc_client
res = messaging.get_rpc_client(x_topic, x_server)
self.assertEqual(x_rpc_client, res)
mock_target.assert_called_once_with(
topic=x_topic, server=x_server,
version=consts.RPC_API_VERSION_BASE)
mock_json_serializer.assert_called_once_with()
mock_context_serializer.assert_called_once_with(x_json_serializer)
mock_rpc_client.assert_called_once_with(
messaging.TRANSPORT, x_target, serializer=x_context_serializer)
@mock.patch("oslo_messaging.RPCClient")
@mock.patch("senlin.common.messaging.RequestContextSerializer")
@mock.patch("senlin.common.messaging.JsonPayloadSerializer")
@mock.patch("oslo_messaging.Target")
def test_get_rpc_client_with_serializer(self, mock_target,
mock_json_serializer,
mock_context_serializer,
mock_rpc_client):
x_topic = mock.Mock()
x_server = mock.Mock()
x_target = mock.Mock()
x_serializer = mock.Mock()
mock_target.return_value = x_target
x_context_serializer = mock.Mock()
mock_context_serializer.return_value = x_context_serializer
x_rpc_client = mock.Mock()
mock_rpc_client.return_value = x_rpc_client
res = messaging.get_rpc_client(x_topic, x_server,
serializer=x_serializer)
self.assertEqual(x_rpc_client, res)
mock_target.assert_called_once_with(
topic=x_topic, server=x_server,
version=consts.RPC_API_VERSION_BASE)
self.assertEqual(0, mock_json_serializer.call_count)
mock_context_serializer.assert_called_once_with(x_serializer)
mock_rpc_client.assert_called_once_with(
messaging.TRANSPORT, x_target, serializer=x_context_serializer)