Merge "Revert "Move check_encryption_provider to volume utils""
commit
cdfa06c9ec
|
@ -1095,50 +1095,6 @@ class VolumeUtilsTestCase(test.TestCase):
|
|||
target_wwpns)
|
||||
self.assertEqual(ret, expected)
|
||||
|
||||
@ddt.data({'cipher': 'aes-xts-plain64',
|
||||
'provider': 'luks'},
|
||||
{'cipher': 'aes-xts-plain64',
|
||||
'provider': 'nova.volume.encryptors.luks.LuksEncryptor'})
|
||||
def test_check_encryption_provider(self, encryption_metadata):
|
||||
ctxt = context.get_admin_context()
|
||||
type_ref = volume_types.create(ctxt, "type1")
|
||||
encryption = db.volume_type_encryption_create(
|
||||
ctxt, type_ref['id'], encryption_metadata)
|
||||
with mock.patch(
|
||||
'cinder.db.sqlalchemy.api.volume_encryption_metadata_get',
|
||||
return_value=encryption):
|
||||
volume_data = {'id': fake.VOLUME_ID,
|
||||
'volume_type_id': type_ref['id']}
|
||||
ctxt = context.get_admin_context()
|
||||
volume = fake_volume.fake_volume_obj(ctxt, **volume_data)
|
||||
|
||||
ret = volume_utils.check_encryption_provider(
|
||||
db,
|
||||
volume,
|
||||
mock.sentinel.context)
|
||||
self.assertEqual('aes-xts-plain64', ret['cipher'])
|
||||
|
||||
def test_check_encryption_provider_invalid(self):
|
||||
encryption_metadata = {'cipher': 'aes-xts-plain64',
|
||||
'provider': 'invalid'}
|
||||
ctxt = context.get_admin_context()
|
||||
type_ref = volume_types.create(ctxt, "type1")
|
||||
encryption = db.volume_type_encryption_create(
|
||||
ctxt, type_ref['id'], encryption_metadata)
|
||||
with mock.patch(
|
||||
'cinder.db.sqlalchemy.api.volume_encryption_metadata_get',
|
||||
return_value=encryption):
|
||||
volume_data = {'id': fake.VOLUME_ID,
|
||||
'volume_type_id': type_ref['id']}
|
||||
ctxt = context.get_admin_context()
|
||||
volume = fake_volume.fake_volume_obj(ctxt, **volume_data)
|
||||
|
||||
self.assertRaises(exception.VolumeDriverException,
|
||||
volume_utils.check_encryption_provider,
|
||||
db,
|
||||
volume,
|
||||
mock.sentinel.context)
|
||||
|
||||
def test_check_image_metadata(self):
|
||||
image_meta = {'id': 1, 'min_disk': 3, 'status': 'active',
|
||||
'size': 1 * units.Gi}
|
||||
|
|
|
@ -2223,7 +2223,8 @@ class RBDTestCase(test.TestCase):
|
|||
self.assertEqual((True, None), ret)
|
||||
|
||||
@mock.patch('tempfile.NamedTemporaryFile')
|
||||
@mock.patch('cinder.volume.utils.check_encryption_provider',
|
||||
@mock.patch('cinder.volume.drivers.rbd.RBDDriver.'
|
||||
'_check_encryption_provider',
|
||||
return_value={'encryption_key_id': fake.ENCRYPTION_KEY_ID})
|
||||
def test_create_encrypted_volume(self,
|
||||
mock_check_enc_prov,
|
||||
|
@ -2247,8 +2248,8 @@ class RBDTestCase(test.TestCase):
|
|||
'cipher': 'aes-xts-essiv',
|
||||
'key_size': 256}
|
||||
|
||||
with mock.patch('cinder.volume.utils.'
|
||||
'check_encryption_provider', return_value=enc_info), \
|
||||
with mock.patch('cinder.volume.drivers.rbd.RBDDriver.'
|
||||
'_check_encryption_provider', return_value=enc_info), \
|
||||
mock.patch('cinder.volume.drivers.rbd.open') as mock_open, \
|
||||
mock.patch.object(self.driver, '_execute') as mock_exec:
|
||||
self.driver._create_encrypted_volume(self.volume_c,
|
||||
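Review bot: Both RBD tests in these hunks patch `_check_encryption_provider` out entirely, and the direct tests of the provider check are deleted from `VolumeUtilsTestCase` in the same commit, so in the hunks shown nothing exercises the rejection of a non-LUKS provider or of a spec lacking `cipher`/`key_size`. Since that check decides whether an encrypted volume is created at all, it is worth carrying the two removed cases over as driver tests, e.g. `self.assertRaises(exception.VolumeDriverException, self.driver._check_encryption_provider, self.volume_c, mock.sentinel.context)` with the metadata lookup mocked to return `'provider': 'invalid'`. The test bodies continue outside the hunks, so existing coverage elsewhere in the file cannot be ruled out.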
|
|
|
@ -22,6 +22,7 @@ import tempfile
|
|||
|
||||
from castellan import key_manager
|
||||
from eventlet import tpool
|
||||
from os_brick import encryptors
|
||||
from os_brick.initiator import linuxrbd
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
|
@ -711,6 +712,27 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
|
|||
return {'replication_status': fields.ReplicationStatus.DISABLED}
|
||||
return None
|
||||
|
||||
def _check_encryption_provider(self, volume, context):
|
||||
"""Check that this is a LUKS encryption provider.
|
||||
|
||||
:returns: encryption dict
|
||||
"""
|
||||
|
||||
encryption = self.db.volume_encryption_metadata_get(context, volume.id)
|
||||
provider = encryption['provider']
|
||||
if provider in encryptors.LEGACY_PROVIDER_CLASS_TO_FORMAT_MAP:
|
||||
provider = encryptors.LEGACY_PROVIDER_CLASS_TO_FORMAT_MAP[provider]
|
||||
if provider != encryptors.LUKS:
|
||||
message = _("Provider %s not supported.") % provider
|
||||
raise exception.VolumeDriverException(message=message)
|
||||
|
||||
if 'cipher' not in encryption or 'key_size' not in encryption:
|
||||
msg = _('encryption spec must contain "cipher" and'
|
||||
'"key_size"')
|
||||
raise exception.VolumeDriverException(message=msg)
|
||||
|
||||
return encryption
|
||||
|
||||
def _create_encrypted_volume(self, volume, context):
|
||||
"""Create an encrypted volume.
|
||||
|
||||
|
@ -718,7 +740,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
|
|||
and then uploading it to the volume.
|
||||
"""
|
||||
|
||||
encryption = volume_utils.check_encryption_provider(volume, context)
|
||||
encryption = self._check_encryption_provider(volume, context)
|
||||
|
||||
# Fetch the key associated with the volume and decode the passphrase
|
||||
keymgr = key_manager.API(CONF)
|
||||
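Review bot: The restored `_check_encryption_provider` builds its second error message from two adjacent literals with no space between them, so operators see `and"key_size"`; the utils copy removed elsewhere in this commit has the trailing space, and the first literal should read `'encryption spec must contain "cipher" and '`. Separately, `provider = encryption['provider']` is read before any presence check, so if `volume_encryption_metadata_get` can return metadata without a provider (not shown here, worth confirming) the caller gets a `KeyError` instead of the `VolumeDriverException` used for the other rejections; `provider = encryption.get('provider')` would route that case through the existing "not supported" branch. The check also confirms only that `cipher` and `key_size` are present, not that their values are non-empty, before `_create_encrypted_volume` (whose body continues outside the hunk) relies on them.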
|
|
|
@ -31,7 +31,6 @@ from castellan import key_manager as castellan_key_manager
|
|||
import eventlet
|
||||
from eventlet import tpool
|
||||
from keystoneauth1 import loading as ks_loading
|
||||
from os_brick import encryptors
|
||||
from oslo_concurrency import processutils
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
|
@ -1053,28 +1052,6 @@ def make_initiator_target_all2all_map(initiator_wwpns, target_wwpns):
|
|||
return i_t_map
|
||||
|
||||
|
||||
def check_encryption_provider(db, volume, context):
|
||||
"""Check that this is a LUKS encryption provider.
|
||||
|
||||
:returns: encryption dict
|
||||
"""
|
||||
|
||||
encryption = db.volume_encryption_metadata_get(context, volume.id)
|
||||
provider = encryption['provider']
|
||||
if provider in encryptors.LEGACY_PROVIDER_CLASS_TO_FORMAT_MAP:
|
||||
provider = encryptors.LEGACY_PROVIDER_CLASS_TO_FORMAT_MAP[provider]
|
||||
if provider != encryptors.LUKS:
|
||||
message = _("Provider %s not supported.") % provider
|
||||
raise exception.VolumeDriverException(message=message)
|
||||
|
||||
if 'cipher' not in encryption or 'key_size' not in encryption:
|
||||
msg = _('encryption spec must contain "cipher" and '
|
||||
'"key_size"')
|
||||
raise exception.VolumeDriverException(message=msg)
|
||||
|
||||
return encryption
|
||||
|
||||
|
||||
def check_image_metadata(image_meta, vol_size):
|
||||
"""Validates the image metadata."""
|
||||
# Check whether image is active
|
||||
|
|