Use oslo.messaging topics for multibackend

We've built our multibackend support by abusing oslo.messaging's
Target.server and appending '@backend-name' prefix to the hostname.
This made implementation easier, as we're simply treating multibackends
as totally separated services.

While this worked in RabbitMQ, zmq is communicating explicitly using
hostnames, so appending anything to hostname in Target.server breaks
communication.

This commit modifies the messaging layer of cinder-volume to use
Target.topic to distinguish backends. This is done by:

* Making cinder-volume listen on new RPC server, with Target.server
  set to raw hostname, and topic is set to 'cinder-volume.host@backend'.
  'cinder-volume' prefix is added to keep compatibility with Newton's
  services (we're relying on how RabbitMQ transport is implemented in
  oslo.messaging).
* Note that old RPC server listening on 'cinder-volume' topic is left
  there, as we need it to recieve fanout messages from scheduler.
* When sending a message to cinder-volume, we're sending it using
  Target.topic to route it to correct host and backend. For backward
  compatibility it's controlled by conditional based on RPC version pin.

Closes-Bug: 1630975
Related-Bug: 1440631
Implements: cinder-zeromq-support
Change-Id: I22efbeb97e11838139e2b33226d1c10094d27c1d
This commit is contained in:
Michał Dulko 2016-11-16 15:34:59 +01:00
parent b3e96a58f0
commit df647d0ccd
6 changed files with 65 additions and 44 deletions

View File

@ -38,6 +38,7 @@ profiler_opts = importutils.try_import('osprofiler.opts')
from cinder.backup import rpcapi as backup_rpcapi
from cinder.common import constants
from cinder import context
from cinder import coordination
from cinder import exception
@ -48,6 +49,7 @@ from cinder import rpc
from cinder.scheduler import rpcapi as scheduler_rpcapi
from cinder import version
from cinder.volume import rpcapi as volume_rpcapi
from cinder.volume import utils as vol_utils
LOG = logging.getLogger(__name__)
@ -213,6 +215,7 @@ class Service(service.Service):
setup_profiler(binary, host)
self.rpcserver = None
self.backend_rpcserver = None
self.cluster_rpcserver = None
# TODO(geguileo): Remove method in O since it will no longer be used.
@ -241,16 +244,30 @@ class Service(service.Service):
LOG.debug("Creating RPC server for service %s", self.topic)
ctxt = context.get_admin_context()
target = messaging.Target(topic=self.topic, server=self.host)
endpoints = [self.manager]
endpoints.extend(self.manager.additional_endpoints)
obj_version_cap = objects.Service.get_minimum_obj_version(ctxt)
LOG.debug("Pinning object versions for RPC server serializer to %s",
obj_version_cap)
serializer = objects_base.CinderObjectSerializer(obj_version_cap)
target = messaging.Target(topic=self.topic, server=self.host)
self.rpcserver = rpc.get_server(target, endpoints, serializer)
self.rpcserver.start()
# NOTE(dulek): Kids, don't do that at home. We're relying here on
# oslo.messaging implementation details to keep backward compatibility
# with pre-Ocata services. This will not matter once we drop
# compatibility with them.
if self.topic == constants.VOLUME_TOPIC:
target = messaging.Target(
topic='%(topic)s.%(host)s' % {'topic': self.topic,
'host': self.host},
server=vol_utils.extract_host(self.host, 'host'))
self.backend_rpcserver = rpc.get_server(target, endpoints,
serializer)
self.backend_rpcserver.start()
# TODO(geguileo): In O - Remove the is_svc_upgrading_to_n part
if self.cluster and not self.is_svc_upgrading_to_n(self.binary):
LOG.info(_LI('Starting %(topic)s cluster %(cluster)s (version '
@ -393,6 +410,8 @@ class Service(service.Service):
# errors, go ahead and ignore them.. as we're shutting down anyway
try:
self.rpcserver.stop()
if self.backend_rpcserver:
self.backend_rpcserver.stop()
if self.cluster_rpcserver:
self.cluster_rpcserver.stop()
except Exception:
@ -422,6 +441,8 @@ class Service(service.Service):
pass
if self.rpcserver:
self.rpcserver.wait()
if self.backend_rpcserver:
self.backend_rpcserver.wait()
if self.cluster_rpcserver:
self.cluster_rpcserver.wait()
super(Service, self).wait()

View File

@ -117,6 +117,10 @@ class VolumeRpcAPITestCase(test.TestCase):
self.addCleanup(self._cleanup)
self.can_send_version_mock = self.patch(
'oslo_messaging.RPCClient.can_send_version',
return_value=True)
def _cleanup(self):
self.fake_snapshot.destroy()
self.fake_volume_obj.destroy()
@ -203,8 +207,9 @@ class VolumeRpcAPITestCase(test.TestCase):
elif 'cgsnapshot' in kwargs:
host = kwargs['cgsnapshot'].consistencygroup.service_topic_queue
target['server'] = utils.extract_host(host)
target['topic'] = '%s.%s' % (constants.VOLUME_TOPIC, host)
target['server'] = utils.extract_host(host, 'host')
target['topic'] = '%s.%s' % (constants.VOLUME_TOPIC,
utils.extract_host(host))
self.fake_args = None
self.fake_kwargs = None
@ -276,8 +281,9 @@ class VolumeRpcAPITestCase(test.TestCase):
elif 'group_snapshot' in kwargs:
host = kwargs['group_snapshot'].service_topic_queue
target['server'] = utils.extract_host(host)
target['topic'] = '%s.%s' % (constants.VOLUME_TOPIC, host)
target['server'] = utils.extract_host(host, 'host')
target['topic'] = '%s.%s' % (constants.VOLUME_TOPIC,
utils.extract_host(host))
self.fake_args = None
self.fake_kwargs = None
@ -416,9 +422,8 @@ class VolumeRpcAPITestCase(test.TestCase):
version='3.0')
@ddt.data('3.0', '3.3')
@mock.patch('oslo_messaging.RPCClient.can_send_version')
def test_attach_volume_to_instance(self, version, can_send_version):
can_send_version.return_value = (version == '3.3')
def test_attach_volume_to_instance(self, version):
self.can_send_version_mock.return_value = (version == '3.3')
self._test_volume_api('attach_volume',
rpc_method='call',
volume=self.fake_volume_obj,
@ -429,9 +434,8 @@ class VolumeRpcAPITestCase(test.TestCase):
version=version)
@ddt.data('3.0', '3.3')
@mock.patch('oslo_messaging.RPCClient.can_send_version')
def test_attach_volume_to_host(self, version, can_send_version):
can_send_version.return_value = (version == '3.3')
def test_attach_volume_to_host(self, version):
self.can_send_version_mock.return_value = (version == '3.3')
self._test_volume_api('attach_volume',
rpc_method='call',
volume=self.fake_volume_obj,
@ -448,9 +452,8 @@ class VolumeRpcAPITestCase(test.TestCase):
self.fake_src_cg.obj_reset_changes(['my_cluster'])
@ddt.data('3.0', '3.3')
@mock.patch('oslo_messaging.RPCClient.can_send_version')
def test_attach_volume_to_cluster(self, version, can_send_version):
can_send_version.return_value = (version == '3.3')
def test_attach_volume_to_cluster(self, version):
self.can_send_version_mock.return_value = (version == '3.3')
self._set_cluster()
self._test_volume_api('attach_volume',
rpc_method='call',
@ -462,9 +465,8 @@ class VolumeRpcAPITestCase(test.TestCase):
version=version)
@ddt.data('3.0', '3.4')
@mock.patch('oslo_messaging.RPCClient.can_send_version')
def test_detach_volume(self, version, can_send_version):
can_send_version.return_value = (version == '3.4')
def test_detach_volume(self, version):
self.can_send_version_mock.return_value = (version == '3.4')
self._test_volume_api('detach_volume',
rpc_method='call',
volume=self.fake_volume_obj,
@ -472,9 +474,8 @@ class VolumeRpcAPITestCase(test.TestCase):
version=version)
@ddt.data('3.0', '3.4')
@mock.patch('oslo_messaging.RPCClient.can_send_version')
def test_detach_volume_cluster(self, version, can_send_version):
can_send_version.return_value = (version == '3.4')
def test_detach_volume_cluster(self, version):
self.can_send_version_mock.return_value = (version == '3.4')
self._set_cluster()
self._test_volume_api('detach_volume',
rpc_method='call',
@ -500,8 +501,7 @@ class VolumeRpcAPITestCase(test.TestCase):
connector='fake_connector',
version='3.0')
@mock.patch('oslo_messaging.RPCClient.can_send_version', return_value=True)
def test_initialize_connection_cluster(self, mock_can_send_version):
def test_initialize_connection_cluster(self):
self._set_cluster()
self._test_volume_api('initialize_connection',
rpc_method='call',
@ -676,12 +676,10 @@ class VolumeRpcAPITestCase(test.TestCase):
version='3.2')
@ddt.data(None, 'mycluster')
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=False)
@mock.patch('cinder.objects.backup.BackupDeviceInfo.from_primitive',
return_value={})
def test_get_backup_device_old(self, cluster_name, mock_from_primitive,
mock_can_send_version):
def test_get_backup_device_old(self, cluster_name, mock_from_primitive):
self.can_send_version_mock.return_value = False
self._change_cluster_name(self.fake_volume_obj, cluster_name)
self._test_volume_api('get_backup_device',
rpc_method='call',

View File

@ -800,13 +800,6 @@ class VolumeUtilsTestCase(test.TestCase):
volume_utils.extract_host,
None)
def test_get_volume_rpc_host(self):
host = 'Host@backend'
# default level is 'backend'
# check if host with backend is returned
self.assertEqual(volume_utils.extract_host(host),
volume_utils.get_volume_rpc_host(host))
def test_append_host(self):
host = 'Host'
pool = 'Pool'

View File

@ -116,16 +116,30 @@ class VolumeAPI(rpc.RPCAPI):
3.3 - Adds support for sending objects over RPC in attach_volume().
3.4 - Adds support for sending objects over RPC in detach_volume().
3.5 - Adds support for cluster in retype and migrate_volume
3.6 - Switch to use oslo.messaging topics to indicate backends instead
of @backend suffixes in server names.
"""
RPC_API_VERSION = '3.5'
RPC_API_VERSION = '3.6'
RPC_DEFAULT_VERSION = '3.0'
TOPIC = constants.VOLUME_TOPIC
BINARY = 'cinder-volume'
def _get_cctxt(self, host=None, version=None, **kwargs):
if host is not None:
kwargs['server'] = utils.get_volume_rpc_host(host)
if host:
server = utils.extract_host(host)
# TODO(dulek): If we're pinned before 3.6, we should send stuff the
# old way - addressing server=host@backend, topic=cinder-volume.
# Otherwise we're addressing server=host,
# topic=cinder-volume.host@backend. This conditional can go away
# when we stop supporting 3.x.
if self.client.can_send_version('3.6'):
kwargs['topic'] = '%(topic)s.%(host)s' % {'topic': self.TOPIC,
'host': server}
server = utils.extract_host(server, 'host')
kwargs['server'] = server
return super(VolumeAPI, self)._get_cctxt(version=version, **kwargs)
def create_consistencygroup(self, ctxt, group):

View File

@ -725,14 +725,6 @@ def extract_host(host, level='backend', default_pool_name=False):
return None
def get_volume_rpc_host(host):
if CONF.rpc_backend and CONF.rpc_backend == "zmq":
# ZeroMQ RPC driver requires only the hostname.
# So, return just that.
return extract_host(host, 'host')
return extract_host(host)
def append_host(host, pool):
"""Encode pool into host info."""
if not host or not pool:

View File

@ -0,0 +1,3 @@
---
features:
- Added support for ZMQ messaging layer in multibackend configuration.