Merge "Consolidate code that manages encryption keys"

This commit is contained in:
Zuul 2017-11-29 20:18:49 +00:00 committed by Gerrit Code Review
commit 6b5a225889
6 changed files with 80 additions and 34 deletions

View File

@ -26,6 +26,7 @@ import six
from cinder.db import base
from cinder import exception
from cinder.i18n import _
from cinder.volume import utils as vol_utils
service_opts = [
cfg.IntOpt('backup_metadata_version', default=2,
@ -97,10 +98,9 @@ class BackupMetadataAPI(base.Base):
continue
# Copy the encryption key UUID for backup
if key is 'encryption_key_id' and value is not None:
value = self._key_manager.store(
self.context,
self._key_manager.get(self.context, value)
)
value = vol_utils.clone_encryption_key(self.context,
self._key_manager,
value)
LOG.debug("Copying encryption key UUID for backup.")
container[type_tag][key] = value

View File

@ -463,7 +463,7 @@ class CreateVolumeFlowTestCase(test.TestCase):
group_snapshot=None,
backup=None)
mock_is_encrypted.assert_called_once_with(self.ctxt, 1)
mock_is_encrypted.assert_called_with(self.ctxt, 1)
mock_get_volume_type_encryption.assert_called_once_with(self.ctxt, 1)
@mock.patch('cinder.volume.volume_types.is_encrypted')

View File

@ -20,6 +20,7 @@ import ddt
import time
import uuid
from castellan.common import exception as castellan_exception
from castellan import key_manager
import enum
import eventlet
@ -780,6 +781,40 @@ class VolumeTestCase(base.BaseVolumeTestCase):
self.assertEqual("error_deleting", volume.status)
volume.destroy()
@mock.patch.object(key_manager, 'API', fake_keymgr.fake_api)
def test_delete_encrypted_volume_key_not_found(self):
cipher = 'aes-xts-plain64'
key_size = 256
db.volume_type_create(self.context,
{'id': fake.VOLUME_TYPE_ID, 'name': 'LUKS'})
db.volume_type_encryption_create(
self.context, fake.VOLUME_TYPE_ID,
{'control_location': 'front-end', 'provider': ENCRYPTION_PROVIDER,
'cipher': cipher, 'key_size': key_size})
db_vol_type = db.volume_type_get_by_name(self.context, 'LUKS')
volume = self.volume_api.create(self.context,
1,
'name',
'description',
volume_type=db_vol_type)
volume_id = volume['id']
volume['host'] = 'fake_host'
volume['status'] = 'available'
db.volume_update(self.context, volume_id, {'status': 'available'})
with mock.patch.object(
self.volume_api.key_manager,
'delete',
side_effect=castellan_exception.ManagedObjectNotFoundError):
self.volume_api.delete(self.context, volume)
volume = objects.Volume.get_by_id(self.context, volume_id)
self.assertEqual("deleting", volume.status)
volume.destroy()
def test_delete_busy_volume(self):
"""Test volume survives deletion if driver reports it as busy."""
volume = tests_utils.create_volume(self.context, **self.volume_params)

View File

@ -481,7 +481,9 @@ class API(base.Base):
encryption_key_id = volume.get('encryption_key_id', None)
if encryption_key_id is not None:
try:
self.key_manager.delete(context, encryption_key_id)
volume_utils.delete_encryption_key(context,
self.key_manager,
encryption_key_id)
except Exception as e:
volume.update({'status': 'error_deleting'})
volume.save()

View File

@ -11,7 +11,6 @@
# under the License.
from castellan.common import exception as castellan_exc
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import units
@ -391,30 +390,15 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
# Clone the existing key and associate a separate -- but
# identical -- key with each volume.
if encryption_key_id is not None:
encryption_key_id = key_manager.store(
context, key_manager.get(context, encryption_key_id))
encryption_key_id = vol_utils.clone_encryption_key(
context,
key_manager,
encryption_key_id)
else:
volume_type_encryption = (
volume_types.get_volume_type_encryption(context,
volume_type_id))
cipher = volume_type_encryption.cipher
length = volume_type_encryption.key_size
# NOTE(kaitlin-farr): dm-crypt expects the cipher in a
# hyphenated format (aes-xts-plain64). The algorithm needs
# to be parsed out to pass to the key manager (aes).
algorithm = cipher.split('-')[0] if cipher else None
try:
encryption_key_id = key_manager.create_key(
context,
algorithm=algorithm,
length=length)
except castellan_exc.KeyManagerError:
# The messaging back to the client here is
# purposefully terse, so we don't leak any sensitive
# details.
LOG.exception("Key manager error")
raise exception.Invalid(message="Key manager error")
encryption_key_id = vol_utils.create_encryption_key(
context,
key_manager,
volume_type_id)
return encryption_key_id

View File

@ -25,6 +25,7 @@ import re
import time
import uuid
from castellan.common import exception as castellan_exception
import eventlet
from eventlet import tpool
from oslo_concurrency import processutils
@ -892,13 +893,37 @@ def create_encryption_key(context, key_manager, volume_type_id):
cipher = volume_type_encryption.cipher
length = volume_type_encryption.key_size
algorithm = cipher.split('-')[0] if cipher else None
encryption_key_id = key_manager.create_key(
context,
algorithm=algorithm,
length=length)
try:
encryption_key_id = key_manager.create_key(
context,
algorithm=algorithm,
length=length)
except castellan_exception.KeyManagerError:
# The messaging back to the client here is
# purposefully terse, so we don't leak any sensitive
# details.
LOG.exception("Key manager error")
raise exception.Invalid(message="Key manager error")
return encryption_key_id
def delete_encryption_key(context, key_manager, encryption_key_id):
try:
key_manager.delete(context, encryption_key_id)
except castellan_exception.ManagedObjectNotFoundError:
pass
def clone_encryption_key(context, key_manager, encryption_key_id):
clone_key_id = None
if encryption_key_id is not None:
clone_key_id = key_manager.store(
context,
key_manager.get(context, encryption_key_id))
return clone_key_id
def is_replicated_str(str):
spec = (str or '').split()
return (len(spec) == 2 and