Support ZeroMQ messaging driver in cinder

NOTE:This patch introduces support for ZeroMQ driver for cinder single
backend case.
Multi-backend will be addressed in the next patch as part of the
blueprint.

CHANGES:ZeroMQ driver requires hostname for message delivery as there is no
broker inbetween.
So, extract the hostname and feed to the messaging client for zeromq
driver.

For the record, ZeroMq is a very lightweight distributed messaging system
specially designed for high throughput/low latency scenarios.
Addition of support for ZeroMQ would help cinder scale out with high
performance and be highly available as there is no centralised broker.

DocImpact
Document the configurations for ZeroMQ driver for Cinder

Change-Id: Ic4b4301e5d7ca1692fc91155ba53f2dd12f99311
Closes-Bug: #1440631
partially Implements bp cinder-zeromq-support
This commit is contained in:
Vivek Dhayaal 2016-01-24 11:17:37 +05:30
parent 55744deafb
commit 3320a89438
4 changed files with 53 additions and 61 deletions

View File

@ -789,6 +789,13 @@ class VolumeUtilsTestCase(test.TestCase):
self.assertEqual(pool,
volume_utils.extract_host(host, 'pool', True))
def test_get_volume_rpc_host(self):
host = 'Host@backend'
# default level is 'backend'
# check if host with backend is returned
self.assertEqual(volume_utils.extract_host(host),
volume_utils.get_volume_rpc_host(host))
def test_append_host(self):
host = 'Host'
pool = 'Pool'

View File

@ -91,22 +91,23 @@ class VolumeAPI(rpc.RPCAPI):
TOPIC = CONF.volume_topic
BINARY = 'cinder-volume'
def _get_cctxt(self, host, version):
new_host = utils.get_volume_rpc_host(host)
return self.client.prepare(server=new_host, version=version)
def create_consistencygroup(self, ctxt, group, host):
new_host = utils.extract_host(host)
cctxt = self.client.prepare(server=new_host, version='1.26')
cctxt = self._get_cctxt(host, '1.26')
cctxt.cast(ctxt, 'create_consistencygroup',
group=group)
def delete_consistencygroup(self, ctxt, group):
host = utils.extract_host(group.host)
cctxt = self.client.prepare(server=host, version='1.26')
cctxt = self._get_cctxt(group.host, '1.26')
cctxt.cast(ctxt, 'delete_consistencygroup',
group=group)
def update_consistencygroup(self, ctxt, group, add_volumes=None,
remove_volumes=None):
host = utils.extract_host(group.host)
cctxt = self.client.prepare(server=host, version='1.26')
cctxt = self._get_cctxt(group.host, '1.26')
cctxt.cast(ctxt, 'update_consistencygroup',
group=group,
add_volumes=add_volumes,
@ -114,21 +115,18 @@ class VolumeAPI(rpc.RPCAPI):
def create_consistencygroup_from_src(self, ctxt, group, cgsnapshot=None,
source_cg=None):
new_host = utils.extract_host(group.host)
cctxt = self.client.prepare(server=new_host, version='1.31')
cctxt = self._get_cctxt(group.host, '1.31')
cctxt.cast(ctxt, 'create_consistencygroup_from_src',
group=group,
cgsnapshot=cgsnapshot,
source_cg=source_cg)
def create_cgsnapshot(self, ctxt, cgsnapshot):
host = utils.extract_host(cgsnapshot.consistencygroup.host)
cctxt = self.client.prepare(server=host, version='1.31')
cctxt = self._get_cctxt(cgsnapshot.consistencygroup.host, '1.31')
cctxt.cast(ctxt, 'create_cgsnapshot', cgsnapshot=cgsnapshot)
def delete_cgsnapshot(self, ctxt, cgsnapshot):
new_host = utils.extract_host(cgsnapshot.consistencygroup.host)
cctxt = self.client.prepare(server=new_host, version='1.31')
cctxt = self._get_cctxt(cgsnapshot.consistencygroup.host, '1.31')
cctxt.cast(ctxt, 'delete_cgsnapshot', cgsnapshot=cgsnapshot)
def create_volume(self, ctxt, volume, host, request_spec,
@ -143,8 +141,7 @@ class VolumeAPI(rpc.RPCAPI):
else:
version = '1.24'
new_host = utils.extract_host(host)
cctxt = self.client.prepare(server=new_host, version=version)
cctxt = self._get_cctxt(host, version)
request_spec_p = jsonutils.to_primitive(request_spec)
cctxt.cast(ctxt, 'create_volume', **msg_args)
@ -156,27 +153,23 @@ class VolumeAPI(rpc.RPCAPI):
else:
version = '1.15'
new_host = utils.extract_host(volume.host)
cctxt = self.client.prepare(server=new_host, version=version)
cctxt = self._get_cctxt(volume.host, version)
cctxt.cast(ctxt, 'delete_volume', **msg_args)
def create_snapshot(self, ctxt, volume, snapshot):
new_host = utils.extract_host(volume['host'])
cctxt = self.client.prepare(server=new_host, version='1.20')
cctxt = self._get_cctxt(volume['host'], version='1.20')
cctxt.cast(ctxt, 'create_snapshot', volume_id=volume['id'],
snapshot=snapshot)
def delete_snapshot(self, ctxt, snapshot, host, unmanage_only=False):
new_host = utils.extract_host(host)
cctxt = self.client.prepare(server=new_host, version='1.20')
cctxt = self._get_cctxt(host, version='1.20')
cctxt.cast(ctxt, 'delete_snapshot', snapshot=snapshot,
unmanage_only=unmanage_only)
def attach_volume(self, ctxt, volume, instance_uuid, host_name,
mountpoint, mode):
new_host = utils.extract_host(volume['host'])
cctxt = self.client.prepare(server=new_host, version='1.11')
cctxt = self._get_cctxt(volume['host'], '1.11')
return cctxt.call(ctxt, 'attach_volume',
volume_id=volume['id'],
instance_uuid=instance_uuid,
@ -185,33 +178,28 @@ class VolumeAPI(rpc.RPCAPI):
mode=mode)
def detach_volume(self, ctxt, volume, attachment_id):
new_host = utils.extract_host(volume['host'])
cctxt = self.client.prepare(server=new_host, version='1.20')
cctxt = self._get_cctxt(volume['host'], '1.20')
return cctxt.call(ctxt, 'detach_volume', volume_id=volume['id'],
attachment_id=attachment_id)
def copy_volume_to_image(self, ctxt, volume, image_meta):
new_host = utils.extract_host(volume['host'])
cctxt = self.client.prepare(server=new_host, version='1.3')
cctxt = self._get_cctxt(volume['host'], '1.3')
cctxt.cast(ctxt, 'copy_volume_to_image', volume_id=volume['id'],
image_meta=image_meta)
def initialize_connection(self, ctxt, volume, connector):
new_host = utils.extract_host(volume['host'])
cctxt = self.client.prepare(server=new_host, version='1.0')
cctxt = self._get_cctxt(volume['host'], version='1.0')
return cctxt.call(ctxt, 'initialize_connection',
volume_id=volume['id'],
connector=connector)
def terminate_connection(self, ctxt, volume, connector, force=False):
new_host = utils.extract_host(volume['host'])
cctxt = self.client.prepare(server=new_host, version='1.0')
cctxt = self._get_cctxt(volume['host'], version='1.0')
return cctxt.call(ctxt, 'terminate_connection', volume_id=volume['id'],
connector=connector, force=force)
def remove_export(self, ctxt, volume):
new_host = utils.extract_host(volume['host'])
cctxt = self.client.prepare(server=new_host, version='1.30')
cctxt = self._get_cctxt(volume['host'], '1.30')
cctxt.cast(ctxt, 'remove_export', volume_id=volume['id'])
def publish_service_capabilities(self, ctxt):
@ -219,13 +207,11 @@ class VolumeAPI(rpc.RPCAPI):
cctxt.cast(ctxt, 'publish_service_capabilities')
def accept_transfer(self, ctxt, volume, new_user, new_project):
new_host = utils.extract_host(volume['host'])
cctxt = self.client.prepare(server=new_host, version='1.9')
cctxt = self._get_cctxt(volume['host'], '1.9')
return cctxt.call(ctxt, 'accept_transfer', volume_id=volume['id'],
new_user=new_user, new_project=new_project)
def extend_volume(self, ctxt, volume, new_size, reservations):
new_host = utils.extract_host(volume.host)
msg_args = {'volume_id': volume.id, 'new_size': new_size,
'reservations': reservations}
@ -235,11 +221,10 @@ class VolumeAPI(rpc.RPCAPI):
else:
version = '1.14'
cctxt = self.client.prepare(server=new_host, version=version)
cctxt = self._get_cctxt(volume.host, version)
cctxt.cast(ctxt, 'extend_volume', **msg_args)
def migrate_volume(self, ctxt, volume, dest_host, force_host_copy):
new_host = utils.extract_host(volume.host)
host_p = {'host': dest_host.host,
'capabilities': dest_host.capabilities}
@ -251,11 +236,10 @@ class VolumeAPI(rpc.RPCAPI):
else:
version = '1.8'
cctxt = self.client.prepare(server=new_host, version=version)
cctxt = self._get_cctxt(volume.host, version)
cctxt.cast(ctxt, 'migrate_volume', **msg_args)
def migrate_volume_completion(self, ctxt, volume, new_volume, error):
new_host = utils.extract_host(volume.host)
msg_args = {'volume_id': volume.id, 'new_volume_id': new_volume.id,
'error': error}
@ -266,7 +250,7 @@ class VolumeAPI(rpc.RPCAPI):
else:
version = '1.10'
cctxt = self.client.prepare(server=new_host, version=version)
cctxt = self._get_cctxt(volume.host, version)
return cctxt.call(ctxt, 'migrate_volume_completion', **msg_args)
def retype(self, ctxt, volume, new_type_id, dest_host,
@ -287,29 +271,24 @@ class VolumeAPI(rpc.RPCAPI):
else:
version = '1.12'
new_host = utils.extract_host(volume.host)
cctxt = self.client.prepare(server=new_host, version=version)
cctxt = self._get_cctxt(volume.host, version)
cctxt.cast(ctxt, 'retype', **msg_args)
def manage_existing(self, ctxt, volume, ref):
new_host = utils.extract_host(volume['host'])
cctxt = self.client.prepare(server=new_host, version='1.15')
cctxt = self._get_cctxt(volume['host'], '1.15')
cctxt.cast(ctxt, 'manage_existing', volume_id=volume['id'], ref=ref)
def promote_replica(self, ctxt, volume):
new_host = utils.extract_host(volume['host'])
cctxt = self.client.prepare(server=new_host, version='1.17')
cctxt = self._get_cctxt(volume['host'], '1.17')
cctxt.cast(ctxt, 'promote_replica', volume_id=volume['id'])
def reenable_replication(self, ctxt, volume):
new_host = utils.extract_host(volume['host'])
cctxt = self.client.prepare(server=new_host, version='1.17')
cctxt = self._get_cctxt(volume['host'], '1.17')
cctxt.cast(ctxt, 'reenable_replication', volume_id=volume['id'])
def update_migrated_volume(self, ctxt, volume, new_volume,
original_volume_status):
host = utils.extract_host(new_volume['host'])
cctxt = self.client.prepare(server=host, version='1.36')
cctxt = self._get_cctxt(new_volume['host'], '1.36')
cctxt.call(ctxt,
'update_migrated_volume',
volume=volume,
@ -317,13 +296,11 @@ class VolumeAPI(rpc.RPCAPI):
volume_status=original_volume_status)
def enable_replication(self, ctxt, volume):
new_host = utils.extract_host(volume['host'])
cctxt = self.client.prepare(server=new_host, version='1.27')
cctxt = self._get_cctxt(volume['host'], '1.27')
cctxt.cast(ctxt, 'enable_replication', volume=volume)
def disable_replication(self, ctxt, volume):
new_host = utils.extract_host(volume['host'])
cctxt = self.client.prepare(server=new_host, version='1.27')
cctxt = self._get_cctxt(volume['host'], '1.27')
cctxt.cast(ctxt, 'disable_replication',
volume=volume)
@ -331,24 +308,21 @@ class VolumeAPI(rpc.RPCAPI):
ctxt,
volume,
secondary=None):
new_host = utils.extract_host(volume['host'])
cctxt = self.client.prepare(server=new_host, version='1.27')
cctxt = self._get_cctxt(volume['host'], '1.27')
cctxt.cast(ctxt, 'failover_replication',
volume=volume,
secondary=secondary)
def list_replication_targets(self, ctxt, volume):
new_host = utils.extract_host(volume['host'])
cctxt = self.client.prepare(server=new_host, version='1.27')
cctxt = self._get_cctxt(volume['host'], '1.27')
return cctxt.call(ctxt, 'list_replication_targets', volume=volume)
def manage_existing_snapshot(self, ctxt, snapshot, ref, host):
cctxt = self.client.prepare(server=host, version='1.28')
cctxt = self._get_cctxt(host, '1.28')
cctxt.cast(ctxt, 'manage_existing_snapshot',
snapshot=snapshot,
ref=ref)
def get_capabilities(self, ctxt, host, discover):
new_host = utils.extract_host(host)
cctxt = self.client.prepare(server=new_host, version='1.29')
cctxt = self._get_cctxt(host, '1.29')
return cctxt.call(ctxt, 'get_capabilities', discover=discover)

View File

@ -623,6 +623,14 @@ def extract_host(host, level='backend', default_pool_name=False):
return None
def get_volume_rpc_host(host):
if CONF.rpc_backend and CONF.rpc_backend == "zmq":
# ZeroMQ RPC driver requires only the hostname.
# So, return just that.
return extract_host(host, 'host')
return extract_host(host)
def append_host(host, pool):
"""Encode pool into host info."""
if not host or not pool:

View File

@ -0,0 +1,3 @@
---
features:
- Added support for ZeroMQ messaging driver in cinder single backend config