From f02232599a05d8ca5bc788cf00d8e0c9d73db7e9 Mon Sep 17 00:00:00 2001 From: Eric Harney Date: Wed, 29 Aug 2018 15:20:07 -0400 Subject: [PATCH] Move check_encryption_provider to volume utils Change-Id: Ia6c29848e5da12db782c0b91b64269ea461fcd6d --- cinder/tests/unit/test_volume_utils.py | 44 ++++++++++++++++++++ cinder/tests/unit/volume/drivers/test_rbd.py | 7 ++-- cinder/volume/drivers/rbd.py | 24 +---------- cinder/volume/utils.py | 23 ++++++++++ 4 files changed, 71 insertions(+), 27 deletions(-) diff --git a/cinder/tests/unit/test_volume_utils.py b/cinder/tests/unit/test_volume_utils.py index 2015e2bbd7e..f44c8a40f22 100644 --- a/cinder/tests/unit/test_volume_utils.py +++ b/cinder/tests/unit/test_volume_utils.py @@ -1093,3 +1093,47 @@ class VolumeUtilsTestCase(test.TestCase): ret = volume_utils.make_initiator_target_all2all_map(initiator_wwpns, target_wwpns) self.assertEqual(ret, expected) + + @ddt.data({'cipher': 'aes-xts-plain64', + 'provider': 'luks'}, + {'cipher': 'aes-xts-plain64', + 'provider': 'nova.volume.encryptors.luks.LuksEncryptor'}) + def test_check_encryption_provider(self, encryption_metadata): + ctxt = context.get_admin_context() + type_ref = volume_types.create(ctxt, "type1") + encryption = db.volume_type_encryption_create( + ctxt, type_ref['id'], encryption_metadata) + with mock.patch( + 'cinder.db.sqlalchemy.api.volume_encryption_metadata_get', + return_value=encryption): + volume_data = {'id': fake.VOLUME_ID, + 'volume_type_id': type_ref['id']} + ctxt = context.get_admin_context() + volume = fake_volume.fake_volume_obj(ctxt, **volume_data) + + ret = volume_utils.check_encryption_provider( + db, + volume, + mock.sentinel.context) + self.assertEqual('aes-xts-plain64', ret['cipher']) + + def test_check_encryption_provider_invalid(self): + encryption_metadata = {'cipher': 'aes-xts-plain64', + 'provider': 'invalid'} + ctxt = context.get_admin_context() + type_ref = volume_types.create(ctxt, "type1") + encryption = db.volume_type_encryption_create( + ctxt, type_ref['id'], encryption_metadata) + with mock.patch( + 'cinder.db.sqlalchemy.api.volume_encryption_metadata_get', + return_value=encryption): + volume_data = {'id': fake.VOLUME_ID, + 'volume_type_id': type_ref['id']} + ctxt = context.get_admin_context() + volume = fake_volume.fake_volume_obj(ctxt, **volume_data) + + self.assertRaises(exception.VolumeDriverException, + volume_utils.check_encryption_provider, + db, + volume, + mock.sentinel.context) diff --git a/cinder/tests/unit/volume/drivers/test_rbd.py b/cinder/tests/unit/volume/drivers/test_rbd.py index 83399c4954f..7de296becf5 100644 --- a/cinder/tests/unit/volume/drivers/test_rbd.py +++ b/cinder/tests/unit/volume/drivers/test_rbd.py @@ -2192,8 +2192,7 @@ class RBDTestCase(test.TestCase): self.assertEqual((True, None), ret) @mock.patch('tempfile.NamedTemporaryFile') - @mock.patch('cinder.volume.drivers.rbd.RBDDriver.' - '_check_encryption_provider', + @mock.patch('cinder.volume.utils.check_encryption_provider', return_value={'encryption_key_id': fake.ENCRYPTION_KEY_ID}) def test_create_encrypted_volume(self, mock_check_enc_prov, @@ -2217,8 +2216,8 @@ class RBDTestCase(test.TestCase): 'cipher': 'aes-xts-essiv', 'key_size': 256} - with mock.patch('cinder.volume.drivers.rbd.RBDDriver.' - '_check_encryption_provider', return_value=enc_info), \ + with mock.patch('cinder.volume.utils.' + 'check_encryption_provider', return_value=enc_info), \ mock.patch('cinder.volume.drivers.rbd.open') as mock_open, \ mock.patch.object(self.driver, '_execute') as mock_exec: self.driver._create_encrypted_volume(self.volume_c, diff --git a/cinder/volume/drivers/rbd.py b/cinder/volume/drivers/rbd.py index 140f5de1a4e..869f141eab8 100644 --- a/cinder/volume/drivers/rbd.py +++ b/cinder/volume/drivers/rbd.py @@ -22,7 +22,6 @@ import tempfile from castellan import key_manager from eventlet import tpool -from os_brick import encryptors from os_brick.initiator import linuxrbd from oslo_config import cfg from oslo_log import log as logging @@ -712,27 +711,6 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, return {'replication_status': fields.ReplicationStatus.DISABLED} return None - def _check_encryption_provider(self, volume, context): - """Check that this is a LUKS encryption provider. - - :returns: encryption dict - """ - - encryption = self.db.volume_encryption_metadata_get(context, volume.id) - provider = encryption['provider'] - if provider in encryptors.LEGACY_PROVIDER_CLASS_TO_FORMAT_MAP: - provider = encryptors.LEGACY_PROVIDER_CLASS_TO_FORMAT_MAP[provider] - if provider != encryptors.LUKS: - message = _("Provider %s not supported.") % provider - raise exception.VolumeDriverException(message=message) - - if 'cipher' not in encryption or 'key_size' not in encryption: - msg = _('encryption spec must contain "cipher" and' - '"key_size"') - raise exception.VolumeDriverException(message=msg) - - return encryption - def _create_encrypted_volume(self, volume, context): """Create an encrypted volume. @@ -740,7 +718,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, and then uploading it to the volume. """ - encryption = self._check_encryption_provider(volume, context) + encryption = volume_utils.check_encryption_provider(volume, context) # Fetch the key associated with the volume and decode the passphrase keymgr = key_manager.API(CONF) diff --git a/cinder/volume/utils.py b/cinder/volume/utils.py index db5aef628bf..e448b9f27e3 100644 --- a/cinder/volume/utils.py +++ b/cinder/volume/utils.py @@ -31,6 +31,7 @@ from castellan import key_manager as castellan_key_manager import eventlet from eventlet import tpool from keystoneauth1 import loading as ks_loading +from os_brick import encryptors from oslo_concurrency import processutils from oslo_config import cfg from oslo_log import log as logging @@ -1033,3 +1034,25 @@ def make_initiator_target_all2all_map(initiator_wwpns, target_wwpns): i_t_map[i_wwpn].append(t_wwpn) return i_t_map + + +def check_encryption_provider(db, volume, context): + """Check that this is a LUKS encryption provider. + + :returns: encryption dict + """ + + encryption = db.volume_encryption_metadata_get(context, volume.id) + provider = encryption['provider'] + if provider in encryptors.LEGACY_PROVIDER_CLASS_TO_FORMAT_MAP: + provider = encryptors.LEGACY_PROVIDER_CLASS_TO_FORMAT_MAP[provider] + if provider != encryptors.LUKS: + message = _("Provider %s not supported.") % provider + raise exception.VolumeDriverException(message=message) + + if 'cipher' not in encryption or 'key_size' not in encryption: + msg = _('encryption spec must contain "cipher" and ' + '"key_size"') + raise exception.VolumeDriverException(message=msg) + + return encryption