Sheepdog:make full use of all sheepdog nodes

The sheepdog driver of cinder should use a policy to make full
use of all sheepdog nodes of a cluster instead of single node.
It can solve single point of failure and hot spot problem.

This patch uses random method. When it needs to run a dog command
or provide a location path to qemu, it gets a random node from all
sheepdog nodes of a cluster.

Change-Id: I0280b71203d99829796244afbd9a8f308b7e910a
Closes-Bug: #1560807
This commit is contained in:
zhangsong 2016-03-23 14:39:56 +08:00
parent 8899193fba
commit bac0d4e550
2 changed files with 89 additions and 28 deletions

View File

@ -110,6 +110,10 @@ class SheepdogDriverTestDataGenerator(object):
return ('env', 'LC_ALL=C', 'LANG=C', 'dog', 'node', 'info',
'-a', SHEEP_ADDR, '-p', SHEEP_PORT, '-r')
def cmd_dog_node_list(self):
return ('env', 'LC_ALL=C', 'LANG=C', 'dog', 'node', 'list',
'-a', SHEEP_ADDR, '-p', SHEEP_PORT, '-r')
CMD_DOG_CLUSTER_INFO = ('env', 'LC_ALL=C', 'LANG=C', 'dog', 'cluster',
'info', '-a', SHEEP_ADDR, '-p', SHEEP_PORT)
@ -150,6 +154,10 @@ class SheepdogDriverTestDataGenerator(object):
COLLIE_NODE_INFO = """
0 107287605248 3623897354 3%
Total 107287605248 3623897354 3% 54760833024
"""
COLLIE_NODE_LIST = """
0 127.0.0.1:7000 128 1
"""
COLLIE_CLUSTER_INFO_0_5 = """\
@ -389,7 +397,8 @@ class SheepdogClientTestCase(test.TestCase):
self.driver.db = self.db
self.driver.do_setup(None)
self.test_data = SheepdogDriverTestDataGenerator()
self.client = self.driver.client
node_list = [SHEEP_ADDR]
self.client = sheepdog.SheepdogClient(node_list, SHEEP_PORT)
self._addr = SHEEP_ADDR
self._port = SHEEP_PORT
self._vdiname = self.test_data.TEST_VOLUME.name
@ -465,7 +474,6 @@ class SheepdogClientTestCase(test.TestCase):
def test_run_dog_unknown_error(self, fake_logger, fake_execute):
args = ('cluster', 'info')
cmd = self.test_data.CMD_DOG_CLUSTER_INFO
cmd = self.test_data.CMD_DOG_CLUSTER_INFO
exit_code = 1
stdout = 'stdout dummy'
stderr = 'stderr dummy'
@ -1054,6 +1062,32 @@ class SheepdogClientTestCase(test.TestCase):
self.assertTrue(fake_logger.error.called)
self.assertEqual(expected_msg, ex.msg)
@mock.patch.object(sheepdog.SheepdogClient, '_run_dog')
def test_update_node_list_success(self, fake_execute):
expected_cmd = ('node', 'list', '-r')
fake_execute.return_value = (self.test_data.COLLIE_NODE_LIST, '')
self.client.update_node_list()
fake_execute.assert_called_once_with(*expected_cmd)
@mock.patch.object(sheepdog.SheepdogClient, '_run_dog')
@mock.patch.object(sheepdog, 'LOG')
def test_update_node_list_unknown_error(self, fake_logger, fake_execute):
cmd = self.test_data.cmd_dog_node_list()
exit_code = 2
stdout = 'stdout_dummy'
stderr = 'stderr_dummy'
expected_msg = self.test_data.sheepdog_cmd_error(cmd=cmd,
exit_code=exit_code,
stdout=stdout,
stderr=stderr)
fake_execute.side_effect = exception.SheepdogCmdError(
cmd=cmd, exit_code=exit_code, stdout=stdout.replace('\n', '\\n'),
stderr=stderr.replace('\n', '\\n'))
ex = self.assertRaises(exception.SheepdogCmdError,
self.client.update_node_list)
self.assertTrue(fake_logger.error.called)
self.assertEqual(expected_msg, ex.msg)
class SheepdogDriverTestCase(test.TestCase):
def setUp(self):
@ -1077,10 +1111,12 @@ class SheepdogDriverTestCase(test.TestCase):
self._dst_vdiname = self.test_data.TEST_CLONED_VOLUME.name
self._dst_vdisize = self.test_data.TEST_CLONED_VOLUME.size
@mock.patch.object(sheepdog.SheepdogClient, 'update_node_list')
@mock.patch.object(sheepdog.SheepdogClient, 'check_cluster_status')
def test_check_for_setup_error(self, fake_execute):
def test_check_for_setup_error(self, fake_check, fake_update):
self.driver.check_for_setup_error()
fake_execute.assert_called_once_with()
fake_check.assert_called_once_with()
fake_update.assert_called_once_with()
@mock.patch.object(sheepdog.SheepdogClient, 'create')
def test_create_volume(self, fake_execute):

View File

@ -22,6 +22,7 @@ SheepDog Volume Driver.
import errno
import eventlet
import io
import random
import re
from oslo_concurrency import processutils
@ -55,6 +56,7 @@ CONF.register_opts(sheepdog_opts)
class SheepdogClient(object):
"""Sheepdog command executor."""
QEMU_SHEEPDOG_PREFIX = 'sheepdog:'
DOG_RESP_CONNECTION_ERROR = 'failed to connect to'
DOG_RESP_CLUSTER_RUNNING = 'Cluster status: running'
@ -76,13 +78,26 @@ class SheepdogClient(object):
QEMU_IMG_RESP_VDI_NOT_FOUND = 'No vdi found'
QEMU_IMG_RESP_SIZE_TOO_LARGE = 'An image is too large.'
def __init__(self, addr, port):
self.addr = addr
def __init__(self, node_list, port):
self.node_list = node_list
self.port = port
def get_addr(self):
"""Get a random node in sheepdog cluster."""
return self.node_list[random.randint(0, len(self.node_list) - 1)]
def local_path(self, volume):
"""Return a sheepdog location path."""
return "sheepdog:%(addr)s:%(port)s:%(name)s" % {
'addr': self.get_addr(),
'port': self.port,
'name': volume['name']}
def _run_dog(self, command, subcommand, *params):
"""Execute dog command wrapper."""
addr = self.get_addr()
cmd = ('env', 'LC_ALL=C', 'LANG=C', 'dog', command, subcommand,
'-a', self.addr, '-p', self.port) + params
'-a', addr, '-p', self.port) + params
try:
(_stdout, _stderr) = utils.execute(*cmd)
if _stderr.startswith(self.DOG_RESP_CONNECTION_ERROR):
@ -95,7 +110,7 @@ class SheepdogClient(object):
# by old Sheepdog users.
reason = (_('Failed to connect to sheep daemon. '
'addr: %(addr)s, port: %(port)s'),
{'addr': self.addr, 'port': self.port})
{'addr': addr, 'port': self.port})
raise exception.SheepdogError(reason=reason)
return (_stdout, _stderr)
except OSError as e:
@ -111,7 +126,7 @@ class SheepdogClient(object):
if _stderr.startswith(self.DOG_RESP_CONNECTION_ERROR):
reason = (_('Failed to connect to sheep daemon. '
'addr: %(addr)s, port: %(port)s'),
{'addr': self.addr, 'port': self.port})
{'addr': addr, 'port': self.port})
raise exception.SheepdogError(reason=reason)
raise exception.SheepdogCmdError(
cmd=e.cmd,
@ -120,7 +135,8 @@ class SheepdogClient(object):
stderr=e.stderr.replace('\n', '\\n'))
def _run_qemu_img(self, command, *params):
"""Executes qemu-img command wrapper"""
"""Executes qemu-img command wrapper."""
addr = self.get_addr()
cmd = ['env', 'LC_ALL=C', 'LANG=C', 'qemu-img', command]
for param in params:
if param.startswith(self.QEMU_SHEEPDOG_PREFIX):
@ -129,7 +145,7 @@ class SheepdogClient(object):
param = param.replace(self.QEMU_SHEEPDOG_PREFIX,
'%(prefix)s%(addr)s:%(port)s:' %
{'prefix': self.QEMU_SHEEPDOG_PREFIX,
'addr': self.addr, 'port': self.port},
'addr': addr, 'port': self.port},
1)
cmd.append(param)
try:
@ -147,7 +163,7 @@ class SheepdogClient(object):
if self.QEMU_IMG_RESP_CONNECTION_ERROR in _stderr:
reason = (_('Failed to connect to sheep daemon. '
'addr: %(addr)s, port: %(port)s'),
{'addr': self.addr, 'port': self.port})
{'addr': addr, 'port': self.port})
raise exception.SheepdogError(reason=reason)
raise exception.SheepdogCmdError(
cmd=e.cmd,
@ -296,6 +312,18 @@ class SheepdogClient(object):
LOG.error(_LE('Failed to get volume status. %s'), e)
return _stdout
def update_node_list(self):
try:
(_stdout, _stderr) = self._run_dog('node', 'list', '-r')
except exception.SheepdogCmdError as e:
with excutils.save_and_reraise_exception():
LOG.error(_LE('Failed to get node list. %s'), e)
node_list = []
stdout = _stdout.strip('\n')
for line in stdout.split('\n'):
node_list.append(line.split()[1].split(':')[0])
self.node_list = node_list
class SheepdogIOWrapper(io.RawIOBase):
"""File-like object with Sheepdog backend."""
@ -403,18 +431,20 @@ class SheepdogDriver(driver.VolumeDriver):
def __init__(self, *args, **kwargs):
super(SheepdogDriver, self).__init__(*args, **kwargs)
self.configuration.append_config_values(sheepdog_opts)
self.addr = self.configuration.sheepdog_store_address
addr = self.configuration.sheepdog_store_address
self.port = self.configuration.sheepdog_store_port
self.client = SheepdogClient(self.addr, self.port)
self.stats_pattern = re.compile(r'[\w\s%]*Total\s(\d+)\s(\d+)*')
self._stats = {}
self.node_list = [addr]
self.client = SheepdogClient(self.node_list, self.port)
def check_for_setup_error(self):
"""Check cluster status and update node list."""
self.client.check_cluster_status()
self.client.update_node_list()
def _is_cloneable(self, image_location, image_meta):
"""Check the image can be clone or not."""
if image_location is None:
return False
@ -459,12 +489,11 @@ class SheepdogDriver(driver.VolumeDriver):
self.create_cloned_volume(volume, volume_ref)
self.client.resize(volume.name, volume.size)
vol_path = self.local_path(volume)
vol_path = self.client.local_path(volume)
return {'provider_location': vol_path}, True
def create_cloned_volume(self, volume, src_vref):
"""Clone a sheepdog volume from another volume."""
snapshot_name = src_vref['name'] + '-temp-snapshot'
snapshot = {
'name': snapshot_name,
@ -508,7 +537,8 @@ class SheepdogDriver(driver.VolumeDriver):
# see volume/drivers/manager.py:_create_volume
self.client.delete(volume.name)
# convert and store into sheepdog
image_utils.convert_image(tmp, self.local_path(volume), 'raw')
image_utils.convert_image(tmp, self.client.local_path(volume),
'raw')
self.client.resize(volume.name, volume.size)
def copy_volume_to_image(self, context, volume, image_service, image_meta):
@ -523,7 +553,7 @@ class SheepdogDriver(driver.VolumeDriver):
'-f', 'raw',
'-t', 'none',
'-O', 'raw',
self.local_path(volume),
self.client.local_path(volume),
tmp)
self._try_execute(*cmd)
@ -538,12 +568,6 @@ class SheepdogDriver(driver.VolumeDriver):
"""Delete a sheepdog snapshot."""
self.client.delete_snapshot(snapshot.volume_name, snapshot.name)
def local_path(self, volume):
return "sheepdog:%(addr)s:%(port)s:%(name)s" % {
'addr': self.addr,
'port': self.port,
'name': volume['name']}
def ensure_export(self, context, volume):
"""Safely and synchronously recreate an export for a logical volume."""
pass
@ -561,7 +585,7 @@ class SheepdogDriver(driver.VolumeDriver):
'driver_volume_type': 'sheepdog',
'data': {
'name': volume['name'],
'hosts': [self.addr],
'hosts': [self.client.get_addr()],
'ports': ["%d" % self.port],
}
}
@ -631,7 +655,7 @@ class SheepdogDriver(driver.VolumeDriver):
raise exception.SheepdogError(reason=msg)
try:
sheepdog_fd = SheepdogIOWrapper(self.addr, self.port,
sheepdog_fd = SheepdogIOWrapper(self.client.get_addr(), self.port,
src_volume, temp_snapshot_name)
backup_service.backup(backup, sheepdog_fd)
finally:
@ -639,5 +663,6 @@ class SheepdogDriver(driver.VolumeDriver):
def restore_backup(self, context, backup, volume, backup_service):
"""Restore an existing backup to a new or existing volume."""
sheepdog_fd = SheepdogIOWrapper(self.addr, self.port, volume)
sheepdog_fd = SheepdogIOWrapper(self.client.get_addr(),
self.port, volume)
backup_service.restore(backup, volume['id'], sheepdog_fd)