Abort volume creation when encryption spec is invalid

When the encryption spec is missing the cipher field,
just fail the request rather than sending an invalid
request to Castellan and letting the Barbican server
reject it.

This happens when an admin had previously created an
invalid encryption spec.

Similarly, check for a malformed cipher field in
image_utils decode_cipher, such as "aes" instead of
"aes-xts-plain64".  An invalid cipher field like this
would previously cause a failure during volume creation
in some volume drivers.

Partial-Bug: #1926630 (1/2)
Change-Id: Ia4b1c303a057fe70cf88bdbbebc1d4f62474f011
This commit is contained in:
Eric Harney 2021-05-04 14:06:55 +00:00
parent 2b7396e074
commit 0a1cc1a2d4
4 changed files with 54 additions and 7 deletions

View File

@ -965,7 +965,12 @@ def decode_cipher(cipher_spec, key_size):
documented under linux/Documentation/device-mapper/dm-crypt.txt in the
kernel source tree.
"""
cipher_alg, cipher_mode, ivgen_alg = cipher_spec.split('-')
try:
cipher_alg, cipher_mode, ivgen_alg = cipher_spec.split('-')
except ValueError:
raise exception.InvalidVolumeType(
reason="Invalid cipher field in encryption type")
cipher_alg = cipher_alg + '-' + str(key_size)
return {'cipher_alg': cipher_alg,

View File

@ -2198,3 +2198,9 @@ class TestImageUtils(test.TestCase):
'ivgen_alg': 'essiv'}
result = image_utils.decode_cipher('aes-xts-essiv', 256)
self.assertEqual(expected, result)
def test_decode_cipher_invalid(self):
self.assertRaises(exception.InvalidVolumeType,
image_utils.decode_cipher,
'aes',
256)

View File

@ -983,15 +983,15 @@ class VolumeUtilsTestCase(test.TestCase):
def test_create_encryption_key_encrypted(self, create_key,
get_volume_type_encryption,
is_encryption):
enc_key = {'cipher': 'aes-xts-plain64',
'key_size': 256,
'provider': 'p1',
'control_location': 'front-end',
'encryption_id': 'uuid1'}
enc_spec = {'cipher': 'aes-xts-plain64',
'key_size': 256,
'provider': 'p1',
'control_location': 'front-end',
'encryption_id': 'uuid1'}
ctxt = context.get_admin_context()
type_ref1 = volume_types.create(ctxt, "type1")
encryption = db.volume_type_encryption_create(
ctxt, type_ref1['id'], enc_key)
ctxt, type_ref1['id'], enc_spec)
get_volume_type_encryption.return_value = encryption
CONF.set_override(
'backend',
@ -1010,6 +1010,39 @@ class VolumeUtilsTestCase(test.TestCase):
algorithm='aes',
length=256)
@mock.patch('cinder.volume.volume_types.is_encrypted', return_value=True)
@mock.patch('cinder.volume.volume_types.get_volume_type_encryption')
@mock.patch('cinder.keymgr.conf_key_mgr.ConfKeyManager.create_key')
def test_create_encryption_key_invalid_spec(self, create_key,
get_volume_type_encryption,
is_encryption):
enc_spec = {'cipher': None,
'key_size': 256,
'provider': 'p1',
'control_location': 'front-end',
'encryption_id': 'uuid1'}
ctxt = context.get_admin_context()
type_ref1 = volume_types.create(ctxt, "type1")
encryption = db.volume_type_encryption_create(
ctxt, type_ref1['id'], enc_spec)
get_volume_type_encryption.return_value = encryption
CONF.set_override(
'backend',
'cinder.keymgr.conf_key_mgr.ConfKeyManager',
group='key_manager')
km = key_manager.API()
self.assertRaises(exception.Invalid,
volume_utils.create_encryption_key,
ctxt,
km,
fake.VOLUME_TYPE_ID)
is_encryption.assert_called_once_with(ctxt,
fake.VOLUME_TYPE_ID)
get_volume_type_encryption.assert_called_once_with(
ctxt,
fake.VOLUME_TYPE_ID)
create_key.assert_not_called()
@ddt.data('<is> True', '<is> true', '<is> yes')
def test_is_replicated_spec_true(self, enabled):
res = volume_utils.is_replicated_spec({'replication_enabled': enabled})

View File

@ -1010,6 +1010,9 @@ def create_encryption_key(context: context.RequestContext,
cipher = volume_type_encryption.cipher
length = volume_type_encryption.key_size
algorithm = cipher.split('-')[0] if cipher else None
if algorithm is None:
raise exception.InvalidVolumeType(
message="Invalid encryption spec")
try:
encryption_key_id = key_manager.create_key(
context,