Merge "Move check_encryption_provider to volume utils"

This commit is contained in:
Zuul 2018-10-04 15:44:34 +00:00 committed by Gerrit Code Review
commit 860b362862
4 changed files with 71 additions and 27 deletions

View File

@ -1093,3 +1093,47 @@ class VolumeUtilsTestCase(test.TestCase):
ret = volume_utils.make_initiator_target_all2all_map(initiator_wwpns,
target_wwpns)
self.assertEqual(ret, expected)
@ddt.data({'cipher': 'aes-xts-plain64',
'provider': 'luks'},
{'cipher': 'aes-xts-plain64',
'provider': 'nova.volume.encryptors.luks.LuksEncryptor'})
def test_check_encryption_provider(self, encryption_metadata):
ctxt = context.get_admin_context()
type_ref = volume_types.create(ctxt, "type1")
encryption = db.volume_type_encryption_create(
ctxt, type_ref['id'], encryption_metadata)
with mock.patch(
'cinder.db.sqlalchemy.api.volume_encryption_metadata_get',
return_value=encryption):
volume_data = {'id': fake.VOLUME_ID,
'volume_type_id': type_ref['id']}
ctxt = context.get_admin_context()
volume = fake_volume.fake_volume_obj(ctxt, **volume_data)
ret = volume_utils.check_encryption_provider(
db,
volume,
mock.sentinel.context)
self.assertEqual('aes-xts-plain64', ret['cipher'])
def test_check_encryption_provider_invalid(self):
encryption_metadata = {'cipher': 'aes-xts-plain64',
'provider': 'invalid'}
ctxt = context.get_admin_context()
type_ref = volume_types.create(ctxt, "type1")
encryption = db.volume_type_encryption_create(
ctxt, type_ref['id'], encryption_metadata)
with mock.patch(
'cinder.db.sqlalchemy.api.volume_encryption_metadata_get',
return_value=encryption):
volume_data = {'id': fake.VOLUME_ID,
'volume_type_id': type_ref['id']}
ctxt = context.get_admin_context()
volume = fake_volume.fake_volume_obj(ctxt, **volume_data)
self.assertRaises(exception.VolumeDriverException,
volume_utils.check_encryption_provider,
db,
volume,
mock.sentinel.context)

View File

@ -2212,8 +2212,7 @@ class RBDTestCase(test.TestCase):
self.assertEqual((True, None), ret)
@mock.patch('tempfile.NamedTemporaryFile')
@mock.patch('cinder.volume.drivers.rbd.RBDDriver.'
'_check_encryption_provider',
@mock.patch('cinder.volume.utils.check_encryption_provider',
return_value={'encryption_key_id': fake.ENCRYPTION_KEY_ID})
def test_create_encrypted_volume(self,
mock_check_enc_prov,
@ -2237,8 +2236,8 @@ class RBDTestCase(test.TestCase):
'cipher': 'aes-xts-essiv',
'key_size': 256}
with mock.patch('cinder.volume.drivers.rbd.RBDDriver.'
'_check_encryption_provider', return_value=enc_info), \
with mock.patch('cinder.volume.utils.'
'check_encryption_provider', return_value=enc_info), \
mock.patch('cinder.volume.drivers.rbd.open') as mock_open, \
mock.patch.object(self.driver, '_execute') as mock_exec:
self.driver._create_encrypted_volume(self.volume_c,

View File

@ -22,7 +22,6 @@ import tempfile
from castellan import key_manager
from eventlet import tpool
from os_brick import encryptors
from os_brick.initiator import linuxrbd
from oslo_config import cfg
from oslo_log import log as logging
@ -712,27 +711,6 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
return {'replication_status': fields.ReplicationStatus.DISABLED}
return None
def _check_encryption_provider(self, volume, context):
"""Check that this is a LUKS encryption provider.
:returns: encryption dict
"""
encryption = self.db.volume_encryption_metadata_get(context, volume.id)
provider = encryption['provider']
if provider in encryptors.LEGACY_PROVIDER_CLASS_TO_FORMAT_MAP:
provider = encryptors.LEGACY_PROVIDER_CLASS_TO_FORMAT_MAP[provider]
if provider != encryptors.LUKS:
message = _("Provider %s not supported.") % provider
raise exception.VolumeDriverException(message=message)
if 'cipher' not in encryption or 'key_size' not in encryption:
msg = _('encryption spec must contain "cipher" and'
'"key_size"')
raise exception.VolumeDriverException(message=msg)
return encryption
def _create_encrypted_volume(self, volume, context):
"""Create an encrypted volume.
@ -740,7 +718,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
and then uploading it to the volume.
"""
encryption = self._check_encryption_provider(volume, context)
encryption = volume_utils.check_encryption_provider(volume, context)
# Fetch the key associated with the volume and decode the passphrase
keymgr = key_manager.API(CONF)

View File

@ -31,6 +31,7 @@ from castellan import key_manager as castellan_key_manager
import eventlet
from eventlet import tpool
from keystoneauth1 import loading as ks_loading
from os_brick import encryptors
from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_log import log as logging
@ -1033,3 +1034,25 @@ def make_initiator_target_all2all_map(initiator_wwpns, target_wwpns):
i_t_map[i_wwpn].append(t_wwpn)
return i_t_map
def check_encryption_provider(db, volume, context):
"""Check that this is a LUKS encryption provider.
:returns: encryption dict
"""
encryption = db.volume_encryption_metadata_get(context, volume.id)
provider = encryption['provider']
if provider in encryptors.LEGACY_PROVIDER_CLASS_TO_FORMAT_MAP:
provider = encryptors.LEGACY_PROVIDER_CLASS_TO_FORMAT_MAP[provider]
if provider != encryptors.LUKS:
message = _("Provider %s not supported.") % provider
raise exception.VolumeDriverException(message=message)
if 'cipher' not in encryption or 'key_size' not in encryption:
msg = _('encryption spec must contain "cipher" and '
'"key_size"')
raise exception.VolumeDriverException(message=msg)
return encryption