Fix multipath iSCSI encrypted volume attach failure

This is to copy fix of bug 1439869 from Nova to os-brick.

Currently iSCSI volume attachment fails if iscsi_use_multipath is
set to True. This is because the encryptor requests cryptsetup
to create the symlink to the LUKS device with the same name of
the device-mapper multipath device. To avoid the name collision,
this patch adds the 'crypt-' prefix to the symlink.

Change-Id: Ia001204df9b14f635ab998590e8add119c1aec23
Closes-Bug: #1439869
This commit is contained in:
lisali 2016-07-06 13:48:20 +08:00
parent 8bf860729e
commit 9d2bb5e15d
2 changed files with 84 additions and 3 deletions

View File

@ -19,6 +19,8 @@ import os
from os_brick.encryptors import base
from os_brick import exception
from os_brick.i18n import _LW
from oslo_concurrency import processutils
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
@ -55,10 +57,46 @@ class CryptsetupEncryptor(base.VolumeEncryptor):
self.symlink_path = connection_info['data']['device_path']
# a unique name for the volume -- e.g., the iSCSI participant name
self.dev_name = self.symlink_path.split('/')[-1]
self.dev_name = 'crypt-%s' % os.path.basename(self.symlink_path)
# NOTE(lixiaoy1): This is to import fix for 1439869 from Nova.
# NOTE(tsekiyama): In older version of nova, dev_name was the same
# as the symlink name. Now it has 'crypt-' prefix to avoid conflict
# with multipath device symlink. To enable rolling update, we use the
# old name when the encrypted volume already exists.
old_dev_name = os.path.basename(self.symlink_path)
wwn = data.get('multipath_id')
if self._is_crypt_device_available(old_dev_name):
self.dev_name = old_dev_name
LOG.debug("Using old encrypted volume name: %s", self.dev_name)
elif wwn and wwn != old_dev_name:
# FibreChannel device could be named '/dev/mapper/<WWN>'.
if self._is_crypt_device_available(wwn):
self.dev_name = wwn
LOG.debug("Using encrypted volume name from wwn: %s",
self.dev_name)
# the device's actual path on the compute host -- e.g., /dev/sd_
self.dev_path = os.path.realpath(self.symlink_path)
def _is_crypt_device_available(self, dev_name):
if not os.path.exists('/dev/mapper/%s' % dev_name):
return False
try:
self._execute('cryptsetup', 'status', dev_name, run_as_root=True)
except processutils.ProcessExecutionError as e:
# If /dev/mapper/<dev_name> is a non-crypt block device (such as a
# normal disk or multipath device), exit_code will be 1. In the
# case, we will omit the warning message.
if e.exit_code != 1:
LOG.warning(_LW('cryptsetup status %(dev_name) exited '
'abnormally (status %(exit_code)s): %(err)s'),
{"dev_name": dev_name, "exit_code": e.exit_code,
"err": e.stderr})
return False
return True
def _get_passphrase(self, key):
"""Convert raw key to string."""
return binascii.hexlify(key).decode('utf-8')

View File

@ -15,6 +15,7 @@
import binascii
import copy
import mock
import six
@ -33,7 +34,9 @@ def fake__get_key(context):
class CryptsetupEncryptorTestCase(test_base.VolumeEncryptorTestCase):
def _create(self, root_helper, connection_info, keymgr, execute):
@mock.patch('os.path.exists', return_value=False)
def _create(self, mock_exists, root_helper,
connection_info, keymgr, execute):
return cryptsetup.CryptsetupEncryptor(root_helper=root_helper,
connection_info=connection_info,
keymgr=keymgr,
@ -43,7 +46,7 @@ class CryptsetupEncryptorTestCase(test_base.VolumeEncryptorTestCase):
super(CryptsetupEncryptorTestCase, self).setUp()
self.dev_path = self.connection_info['data']['device_path']
self.dev_name = self.dev_path.split('/')[-1]
self.dev_name = 'crypt-%s' % self.dev_path.split('/')[-1]
self.symlink_path = self.dev_path
@ -109,3 +112,43 @@ class CryptsetupEncryptorTestCase(test_base.VolumeEncryptorTestCase):
connection_info=connection_info,
keymgr=fake.fake_api())
self.assertIn(type, six.text_type(exc))
@mock.patch('os.path.exists', return_value=True)
def test_init_volume_encryption_with_old_name(self,
mock_exists):
# If an old name crypt device exists, dev_path should be the old name.
old_dev_name = self.dev_path.split('/')[-1]
encryptor = cryptsetup.CryptsetupEncryptor(
root_helper=self.root_helper,
connection_info=self.connection_info,
keymgr=fake.fake_api(),
execute=self.mock_execute)
self.assertFalse(encryptor.dev_name.startswith('crypt-'))
self.assertEqual(old_dev_name, encryptor.dev_name)
self.assertEqual(self.dev_path, encryptor.dev_path)
self.assertEqual(self.symlink_path, encryptor.symlink_path)
mock_exists.assert_called_once_with('/dev/mapper/%s' % old_dev_name)
self.mock_execute.assert_called_once_with(
'cryptsetup', 'status', old_dev_name, run_as_root=True)
@mock.patch('os.path.exists', side_effect=[False, True])
def test_init_volume_encryption_with_wwn(self, mock_exists):
# If an wwn name crypt device exists, dev_path should be based on wwn.
old_dev_name = self.dev_path.split('/')[-1]
wwn = 'fake_wwn'
connection_info = copy.deepcopy(self.connection_info)
connection_info['data']['multipath_id'] = wwn
encryptor = cryptsetup.CryptsetupEncryptor(
root_helper=self.root_helper,
connection_info=connection_info,
keymgr=fake.fake_api(),
execute=self.mock_execute)
self.assertFalse(encryptor.dev_name.startswith('crypt-'))
self.assertEqual(wwn, encryptor.dev_name)
self.assertEqual(self.dev_path, encryptor.dev_path)
self.assertEqual(self.symlink_path, encryptor.symlink_path)
mock_exists.assert_has_calls([
mock.call('/dev/mapper/%s' % old_dev_name),
mock.call('/dev/mapper/%s' % wwn)])
self.mock_execute.assert_called_once_with(
'cryptsetup', 'status', wwn, run_as_root=True)