Switching how we handle sessions in p11_crypto
Previously, we were trying to use pre-existing sessions and semaphores to deal with our concurrency issues. However, this led to HSM memory exhaustion due to the sessions never closing. To mitigate this, I'm switching to a model where we're opening and closing sessions as we need them. By extension, this removes our need for semaphores. We can explore ways of making this more efficient in the future, but this change should address the outstanding issue. Fixes Bug: 1436529 Change-Id: I830f3f770a93d5d82650ec93bb8fa89e3cee5a73
This commit is contained in:
parent
46d8888729
commit
34e3c16992
|
@ -16,7 +16,6 @@ import textwrap
|
|||
|
||||
import cffi
|
||||
from cryptography.hazmat.primitives import padding
|
||||
from eventlet import semaphore
|
||||
from oslo_config import cfg
|
||||
|
||||
from barbican.common import exception
|
||||
|
@ -250,9 +249,7 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
|
|||
"""
|
||||
|
||||
def __init__(self, conf=cfg.CONF, ffi=None):
|
||||
self.enc_sem = semaphore.Semaphore()
|
||||
self.dec_sem = semaphore.Semaphore()
|
||||
self.verify_sem = semaphore.Semaphore()
|
||||
self.conf = conf
|
||||
self.block_size = 16 # in bytes
|
||||
# TODO(reaperhulk): abstract this so alternate algorithms/vendors
|
||||
# are possible.
|
||||
|
@ -264,12 +261,9 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
|
|||
|
||||
self._check_error(self.lib.C_Initialize(self.ffi.NULL))
|
||||
|
||||
self.session = self._open_session(1)
|
||||
self.rw_session = self._open_session(1)
|
||||
self._login(conf.p11_crypto_plugin.login, self.session)
|
||||
self._login(conf.p11_crypto_plugin.login, self.rw_session)
|
||||
|
||||
self._perform_rng_self_test()
|
||||
# Open session to perform self-test and get/generate mkek and hmac
|
||||
session = self._create_working_session()
|
||||
self._perform_rng_self_test(session)
|
||||
|
||||
self.current_mkek_label = conf.p11_crypto_plugin.mkek_label
|
||||
self.current_hmac_label = conf.p11_crypto_plugin.hmac_label
|
||||
|
@ -279,12 +273,16 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
|
|||
# cache current MKEK handle in the dictionary
|
||||
self._get_or_generate_mkek(
|
||||
self.current_mkek_label,
|
||||
conf.p11_crypto_plugin.mkek_length
|
||||
conf.p11_crypto_plugin.mkek_length,
|
||||
session
|
||||
)
|
||||
self._get_or_generate_hmac_key(self.current_hmac_label)
|
||||
self._get_or_generate_hmac_key(self.current_hmac_label, session)
|
||||
|
||||
def _perform_rng_self_test(self):
|
||||
test_random = self._generate_random(100)
|
||||
# Clean up the active session
|
||||
self._close_session(session)
|
||||
|
||||
def _perform_rng_self_test(self, session):
|
||||
test_random = self._generate_random(100, session)
|
||||
if self.ffi.buffer(test_random, 100)[:] == b"\x00" * 100:
|
||||
raise P11CryptoPluginException("Apparent RNG self-test failure.")
|
||||
|
||||
|
@ -301,6 +299,10 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
|
|||
session = session_ptr[0]
|
||||
return session
|
||||
|
||||
def _close_session(self, session):
|
||||
rv = self.lib.C_CloseSession(session)
|
||||
self._check_error(rv)
|
||||
|
||||
def _login(self, password, session):
|
||||
rv = self.lib.C_Login(
|
||||
session,
|
||||
|
@ -310,6 +312,12 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
|
|||
)
|
||||
self._check_error(rv)
|
||||
|
||||
def _create_working_session(self):
|
||||
"""Automatically opens a session and performs a login."""
|
||||
session = self._open_session(1)
|
||||
self._login(self.conf.p11_crypto_plugin.login, session)
|
||||
return session
|
||||
|
||||
def _check_error(self, value):
|
||||
if value != CKR_OK:
|
||||
raise P11CryptoPluginException(
|
||||
|
@ -342,8 +350,8 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
|
|||
|
||||
return CKAttributes(attributes, val_list)
|
||||
|
||||
def _get_or_generate_mkek(self, mkek_label, mkek_length):
|
||||
mkek = self._get_key_handle(mkek_label)
|
||||
def _get_or_generate_mkek(self, mkek_label, mkek_length, session):
|
||||
mkek = self._get_key_handle(mkek_label, session)
|
||||
if not mkek:
|
||||
# Generate a key that is persistent and not extractable
|
||||
ck_attributes = self._build_attributes([
|
||||
|
@ -362,14 +370,14 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
|
|||
Attribute(CKA_UNWRAP, True),
|
||||
Attribute(CKA_EXTRACTABLE, False)
|
||||
])
|
||||
mkek = self._generate_kek(ck_attributes.template)
|
||||
mkek = self._generate_kek(ck_attributes.template, session)
|
||||
|
||||
self.key_handles[mkek_label] = mkek
|
||||
|
||||
return mkek
|
||||
|
||||
def _get_or_generate_hmac_key(self, hmac_label):
|
||||
hmac_key = self._get_key_handle(hmac_label)
|
||||
def _get_or_generate_hmac_key(self, hmac_label, session):
|
||||
hmac_key = self._get_key_handle(hmac_label, session)
|
||||
if not hmac_key:
|
||||
# Generate a key that is persistent and not extractable
|
||||
ck_attributes = self._build_attributes([
|
||||
|
@ -384,13 +392,13 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
|
|||
Attribute(CKA_TOKEN, True),
|
||||
Attribute(CKA_EXTRACTABLE, False)
|
||||
])
|
||||
hmac_key = self._generate_kek(ck_attributes.template)
|
||||
hmac_key = self._generate_kek(ck_attributes.template, session)
|
||||
|
||||
self.key_handles[hmac_label] = hmac_key
|
||||
|
||||
return hmac_key
|
||||
|
||||
def _get_key_handle(self, mkek_label):
|
||||
def _get_key_handle(self, mkek_label, session):
|
||||
if mkek_label in self.key_handles:
|
||||
return self.key_handles[mkek_label]
|
||||
|
||||
|
@ -400,19 +408,19 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
|
|||
Attribute(CKA_LABEL, mkek_label)
|
||||
])
|
||||
rv = self.lib.C_FindObjectsInit(
|
||||
self.session, ck_attributes.template, len(ck_attributes.template)
|
||||
session, ck_attributes.template, len(ck_attributes.template)
|
||||
)
|
||||
self._check_error(rv)
|
||||
|
||||
returned_count = self.ffi.new("CK_ULONG *")
|
||||
object_handle_ptr = self.ffi.new("CK_OBJECT_HANDLE *")
|
||||
rv = self.lib.C_FindObjects(
|
||||
self.session, object_handle_ptr, 2, returned_count
|
||||
session, object_handle_ptr, 2, returned_count
|
||||
)
|
||||
self._check_error(rv)
|
||||
if returned_count[0] == 1:
|
||||
key = object_handle_ptr[0]
|
||||
rv = self.lib.C_FindObjectsFinal(self.session)
|
||||
rv = self.lib.C_FindObjectsFinal(session)
|
||||
self._check_error(rv)
|
||||
if returned_count[0] == 1:
|
||||
return key
|
||||
|
@ -421,9 +429,9 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
|
|||
else:
|
||||
raise P11CryptoPluginKeyException()
|
||||
|
||||
def _generate_random(self, length):
|
||||
def _generate_random(self, length, session):
|
||||
buf = self.ffi.new("CK_BYTE[{0}]".format(length))
|
||||
rv = self.lib.C_GenerateRandom(self.session, buf, length)
|
||||
rv = self.lib.C_GenerateRandom(session, buf, length)
|
||||
self._check_error(rv)
|
||||
return buf
|
||||
|
||||
|
@ -439,7 +447,7 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
|
|||
mech.parameter_len = 48 # sizeof(CK_AES_GCM_PARAMS)
|
||||
return CKMechanism(mech, gcm)
|
||||
|
||||
def _generate_kek(self, template):
|
||||
def _generate_kek(self, template, session):
|
||||
"""Generates both master and project KEKs
|
||||
|
||||
:param template: A tuple of tuples in (CKA_TYPE, VALUE) form
|
||||
|
@ -448,13 +456,13 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
|
|||
mech.mechanism = CKM_AES_KEY_GEN
|
||||
object_handle_ptr = self.ffi.new("CK_OBJECT_HANDLE *")
|
||||
rv = self.lib.C_GenerateKey(
|
||||
self.rw_session, mech, template, len(template), object_handle_ptr
|
||||
session, mech, template, len(template), object_handle_ptr
|
||||
)
|
||||
|
||||
self._check_error(rv)
|
||||
return object_handle_ptr[0]
|
||||
|
||||
def _generate_wrapped_kek(self, kek_label, key_length):
|
||||
def _generate_wrapped_kek(self, kek_label, key_length, session):
|
||||
# generate a non-persistent key that is extractable
|
||||
ck_attributes = self._build_attributes([
|
||||
Attribute(CKA_CLASS, CKO_SECRET_KEY),
|
||||
|
@ -470,10 +478,10 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
|
|||
Attribute(CKA_UNWRAP, True),
|
||||
Attribute(CKA_EXTRACTABLE, True)
|
||||
])
|
||||
kek = self._generate_kek(ck_attributes.template)
|
||||
kek = self._generate_kek(ck_attributes.template, session)
|
||||
mech = self.ffi.new("CK_MECHANISM *")
|
||||
mech.mechanism = CKM_AES_CBC_PAD
|
||||
iv = self._generate_random(16)
|
||||
iv = self._generate_random(16, session)
|
||||
mech.parameter = iv
|
||||
mech.parameter_len = 16
|
||||
mkek = self.key_handles[self.current_mkek_label]
|
||||
|
@ -484,10 +492,10 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
|
|||
|
||||
buf = self.ffi.new("CK_BYTE[{0}]".format(padded_length))
|
||||
buf_len = self.ffi.new("CK_ULONG *", padded_length)
|
||||
rv = self.lib.C_WrapKey(self.rw_session, mech, mkek, kek, buf, buf_len)
|
||||
rv = self.lib.C_WrapKey(session, mech, mkek, kek, buf, buf_len)
|
||||
self._check_error(rv)
|
||||
wrapped_key = self.ffi.buffer(buf, buf_len[0])[:]
|
||||
hmac = self._compute_hmac(wrapped_key)
|
||||
hmac = self._compute_hmac(wrapped_key, session)
|
||||
return {
|
||||
'iv': base64.b64encode(self.ffi.buffer(iv)[:]),
|
||||
'wrapped_key': base64.b64encode(wrapped_key),
|
||||
|
@ -496,36 +504,34 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
|
|||
'hmac_label': self.current_hmac_label
|
||||
}
|
||||
|
||||
def _compute_hmac(self, wrapped_key):
|
||||
def _compute_hmac(self, wrapped_key, session):
|
||||
mech = self.ffi.new("CK_MECHANISM *")
|
||||
mech.mechanism = CKM_SHA256_HMAC
|
||||
hmac_key = self.key_handles[self.current_hmac_label]
|
||||
rv = self.lib.C_SignInit(self.rw_session, mech, hmac_key)
|
||||
rv = self.lib.C_SignInit(session, mech, hmac_key)
|
||||
self._check_error(rv)
|
||||
|
||||
ck_bytes = self.ffi.new("CK_BYTE[]", wrapped_key)
|
||||
buf = self.ffi.new("CK_BYTE[32]")
|
||||
buf_len = self.ffi.new("CK_ULONG *", 32)
|
||||
rv = self.lib.C_Sign(
|
||||
self.rw_session, ck_bytes, len(wrapped_key), buf, buf_len
|
||||
)
|
||||
rv = self.lib.C_Sign(session, ck_bytes, len(wrapped_key), buf, buf_len)
|
||||
self._check_error(rv)
|
||||
return self.ffi.buffer(buf, buf_len[0])[:]
|
||||
|
||||
def _verify_hmac(self, hmac_key, sig, wrapped_key):
|
||||
def _verify_hmac(self, hmac_key, sig, wrapped_key, session):
|
||||
mech = self.ffi.new("CK_MECHANISM *")
|
||||
mech.mechanism = CKM_SHA256_HMAC
|
||||
with self.verify_sem:
|
||||
rv = self.lib.C_VerifyInit(self.rw_session, mech, hmac_key)
|
||||
self._check_error(rv)
|
||||
ck_bytes = self.ffi.new("CK_BYTE[]", wrapped_key)
|
||||
ck_sig = self.ffi.new("CK_BYTE[]", sig)
|
||||
rv = self.lib.C_Verify(
|
||||
self.rw_session, ck_bytes, len(wrapped_key), ck_sig, len(sig)
|
||||
)
|
||||
self._check_error(rv)
|
||||
|
||||
def _unwrap_key(self, plugin_meta):
|
||||
rv = self.lib.C_VerifyInit(session, mech, hmac_key)
|
||||
self._check_error(rv)
|
||||
ck_bytes = self.ffi.new("CK_BYTE[]", wrapped_key)
|
||||
ck_sig = self.ffi.new("CK_BYTE[]", sig)
|
||||
rv = self.lib.C_Verify(
|
||||
session, ck_bytes, len(wrapped_key), ck_sig, len(sig)
|
||||
)
|
||||
self._check_error(rv)
|
||||
|
||||
def _unwrap_key(self, plugin_meta, session):
|
||||
"""Unwraps byte string to key handle in HSM.
|
||||
|
||||
:param plugin_meta: kek_meta_dto plugin meta (json string)
|
||||
|
@ -535,12 +541,12 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
|
|||
iv = base64.b64decode(meta['iv'])
|
||||
hmac = base64.b64decode(meta['hmac'])
|
||||
wrapped_key = base64.b64decode(meta['wrapped_key'])
|
||||
mkek = self._get_key_handle(meta['mkek_label'])
|
||||
hmac_key = self._get_key_handle(meta['hmac_label'])
|
||||
mkek = self._get_key_handle(meta['mkek_label'], session)
|
||||
hmac_key = self._get_key_handle(meta['hmac_label'], session)
|
||||
LOG.debug("Unwrapping key with %s mkek label", meta['mkek_label'])
|
||||
|
||||
LOG.debug("Verifying key with %s hmac label", meta['hmac_label'])
|
||||
self._verify_hmac(hmac_key, hmac, wrapped_key)
|
||||
self._verify_hmac(hmac_key, hmac, wrapped_key, session)
|
||||
|
||||
unwrapped = self.ffi.new("CK_OBJECT_HANDLE *")
|
||||
mech = self.ffi.new("CK_MECHANISM *")
|
||||
|
@ -561,7 +567,7 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
|
|||
])
|
||||
|
||||
rv = self.lib.C_UnwrapKey(
|
||||
self.rw_session, mech, mkek, wrapped_key, len(wrapped_key),
|
||||
session, mech, mkek, wrapped_key, len(wrapped_key),
|
||||
ck_attributes.template, len(ck_attributes.template), unwrapped
|
||||
)
|
||||
self._check_error(rv)
|
||||
|
@ -577,75 +583,91 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
|
|||
return unpadder.update(unencrypted) + unpadder.finalize()
|
||||
|
||||
def encrypt(self, encrypt_dto, kek_meta_dto, project_id):
|
||||
key = self._unwrap_key(kek_meta_dto.plugin_meta)
|
||||
iv = self._generate_random(16)
|
||||
session = self._create_working_session()
|
||||
|
||||
key = self._unwrap_key(kek_meta_dto.plugin_meta, session)
|
||||
iv = self._generate_random(16, session)
|
||||
ck_mechanism = self._build_gcm_mech(iv)
|
||||
with self.enc_sem:
|
||||
rv = self.lib.C_EncryptInit(self.session, ck_mechanism.mech, key)
|
||||
self._check_error(rv)
|
||||
# GCM does not require padding, but sometimes HSMs don't seem to
|
||||
# know that and then you need to pad things for no reason.
|
||||
pt_padded = self._pad(encrypt_dto.unencrypted)
|
||||
pt_len = len(pt_padded)
|
||||
# The GCM mechanism adds a 16 byte tag to the front of the
|
||||
# cyphertext (which is the same length as the (annoyingly) padded
|
||||
# plaintext) so adding 16 bytes guarantees sufficient space.
|
||||
ct_len = self.ffi.new("CK_ULONG *", pt_len + 16)
|
||||
ct = self.ffi.new("CK_BYTE[{0}]".format(pt_len + 16))
|
||||
rv = self.lib.C_Encrypt(
|
||||
self.session, pt_padded, pt_len, ct, ct_len
|
||||
)
|
||||
self._check_error(rv)
|
||||
|
||||
rv = self.lib.C_EncryptInit(session, ck_mechanism.mech, key)
|
||||
self._check_error(rv)
|
||||
# GCM does not require padding, but sometimes HSMs don't seem to
|
||||
# know that and then you need to pad things for no reason.
|
||||
pt_padded = self._pad(encrypt_dto.unencrypted)
|
||||
pt_len = len(pt_padded)
|
||||
# The GCM mechanism adds a 16 byte tag to the front of the
|
||||
# cyphertext (which is the same length as the (annoyingly) padded
|
||||
# plaintext) so adding 16 bytes guarantees sufficient space.
|
||||
ct_len = self.ffi.new("CK_ULONG *", pt_len + 16)
|
||||
ct = self.ffi.new("CK_BYTE[{0}]".format(pt_len + 16))
|
||||
rv = self.lib.C_Encrypt(session, pt_padded, pt_len, ct, ct_len)
|
||||
self._check_error(rv)
|
||||
|
||||
cyphertext = self.ffi.buffer(ct, ct_len[0])[:]
|
||||
kek_meta_extended = json.dumps({
|
||||
'iv': base64.b64encode(self.ffi.buffer(iv)[:])
|
||||
})
|
||||
|
||||
self._close_session(session)
|
||||
|
||||
return plugin.ResponseDTO(cyphertext, kek_meta_extended)
|
||||
|
||||
def decrypt(self, decrypt_dto, kek_meta_dto, kek_meta_extended,
|
||||
project_id):
|
||||
key = self._unwrap_key(kek_meta_dto.plugin_meta)
|
||||
session = self._create_working_session()
|
||||
|
||||
key = self._unwrap_key(kek_meta_dto.plugin_meta, session)
|
||||
meta_extended = json.loads(kek_meta_extended)
|
||||
iv = base64.b64decode(meta_extended['iv'])
|
||||
iv = self.ffi.new("CK_BYTE[]", iv)
|
||||
ck_mechanism = self._build_gcm_mech(iv)
|
||||
with self.dec_sem:
|
||||
rv = self.lib.C_DecryptInit(self.session, ck_mechanism.mech, key)
|
||||
self._check_error(rv)
|
||||
pt = self.ffi.new(
|
||||
"CK_BYTE[{0}]".format(len(decrypt_dto.encrypted))
|
||||
)
|
||||
pt_len = self.ffi.new("CK_ULONG *", len(decrypt_dto.encrypted))
|
||||
rv = self.lib.C_Decrypt(
|
||||
self.session,
|
||||
decrypt_dto.encrypted,
|
||||
len(decrypt_dto.encrypted),
|
||||
pt,
|
||||
pt_len
|
||||
)
|
||||
self._check_error(rv)
|
||||
|
||||
rv = self.lib.C_DecryptInit(session, ck_mechanism.mech, key)
|
||||
self._check_error(rv)
|
||||
pt = self.ffi.new(
|
||||
"CK_BYTE[{0}]".format(len(decrypt_dto.encrypted))
|
||||
)
|
||||
pt_len = self.ffi.new("CK_ULONG *", len(decrypt_dto.encrypted))
|
||||
rv = self.lib.C_Decrypt(
|
||||
session,
|
||||
decrypt_dto.encrypted,
|
||||
len(decrypt_dto.encrypted),
|
||||
pt,
|
||||
pt_len
|
||||
)
|
||||
self._check_error(rv)
|
||||
|
||||
self._close_session(session)
|
||||
return self._unpad(self.ffi.buffer(pt, pt_len[0])[:])
|
||||
|
||||
def bind_kek_metadata(self, kek_meta_dto):
|
||||
# Enforce idempotency: If we've already generated a key leave now.
|
||||
if not kek_meta_dto.plugin_meta:
|
||||
session = self._create_working_session()
|
||||
|
||||
kek_length = 32
|
||||
kek_meta_dto.plugin_meta = json.dumps(
|
||||
self._generate_wrapped_kek(kek_meta_dto.kek_label, kek_length)
|
||||
self._generate_wrapped_kek(
|
||||
kek_meta_dto.kek_label,
|
||||
kek_length,
|
||||
session
|
||||
)
|
||||
)
|
||||
# To be persisted by Barbican:
|
||||
kek_meta_dto.algorithm = 'AES'
|
||||
kek_meta_dto.bit_length = kek_length * 8
|
||||
kek_meta_dto.mode = 'CBC'
|
||||
|
||||
# Clean up
|
||||
self._close_session(session)
|
||||
|
||||
return kek_meta_dto
|
||||
|
||||
def generate_symmetric(self, generate_dto, kek_meta_dto, project_id):
|
||||
byte_length = generate_dto.bit_length / 8
|
||||
buf = self._generate_random(byte_length)
|
||||
session = self._create_working_session()
|
||||
buf = self._generate_random(byte_length, session)
|
||||
self._close_session(session)
|
||||
rand = self.ffi.buffer(buf)[:]
|
||||
assert len(rand) == byte_length
|
||||
return self.encrypt(plugin.EncryptDTO(rand), kek_meta_dto, project_id)
|
||||
|
|
|
@ -37,6 +37,7 @@ class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
|
|||
self.lib = mock.Mock()
|
||||
self.lib.C_Initialize.return_value = p11_crypto.CKR_OK
|
||||
self.lib.C_OpenSession.return_value = p11_crypto.CKR_OK
|
||||
self.lib.C_CloseSession.return_value = p11_crypto.CKR_OK
|
||||
self.lib.C_FindObjectsInit.return_value = p11_crypto.CKR_OK
|
||||
self.lib.C_FindObjects.return_value = p11_crypto.CKR_OK
|
||||
self.lib.C_FindObjectsFinal.return_value = p11_crypto.CKR_OK
|
||||
|
@ -54,6 +55,8 @@ class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
|
|||
ffi=self.ffi, conf=self.cfg_mock
|
||||
)
|
||||
|
||||
self.test_session = self.plugin._create_working_session()
|
||||
|
||||
def test_generate_calls_generate_random(self):
|
||||
with mock.patch.object(self.plugin, 'encrypt') as encrypt_mock:
|
||||
encrypt_mock.return_value = None
|
||||
|
@ -101,10 +104,11 @@ class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
|
|||
p11_crypto.P11CryptoPluginKeyException,
|
||||
self.plugin._get_key_handle,
|
||||
'mylabel',
|
||||
self.test_session
|
||||
)
|
||||
|
||||
def test_get_key_handle_with_no_keys(self):
|
||||
result = self.plugin._get_key_handle('mylabel')
|
||||
result = self.plugin._get_key_handle('mylabel', self.test_session)
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_get_key_handle_with_one_key(self):
|
||||
|
@ -115,7 +119,7 @@ class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
|
|||
|
||||
self.lib.C_FindObjects.side_effect = one_key
|
||||
|
||||
key = self.plugin._get_key_handle('mylabel')
|
||||
key = self.plugin._get_key_handle('mylabel', self.test_session)
|
||||
self.assertEqual(key, 50)
|
||||
|
||||
def test_encrypt(self):
|
||||
|
@ -156,7 +160,7 @@ class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
|
|||
self.lib.C_WrapKey.return_value = p11_crypto.CKR_OK
|
||||
self.lib.C_SignInit.return_value = p11_crypto.CKR_OK
|
||||
self.lib.C_Sign.return_value = p11_crypto.CKR_OK
|
||||
self.plugin._generate_wrapped_kek("label", 32)
|
||||
self.plugin._generate_wrapped_kek("label", 32, self.test_session)
|
||||
self.assertEqual(self.lib.C_WrapKey.call_count, 1)
|
||||
self.assertEqual(self.lib.C_SignInit.call_count, 1)
|
||||
self.assertEqual(self.lib.C_Sign.call_count, 1)
|
||||
|
@ -175,7 +179,8 @@ class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
|
|||
genmock.return_value = self.ffi.new("CK_BYTE[100]")
|
||||
self.assertRaises(
|
||||
p11_crypto.P11CryptoPluginException,
|
||||
self.plugin._perform_rng_self_test
|
||||
self.plugin._perform_rng_self_test,
|
||||
self.test_session
|
||||
)
|
||||
|
||||
def test_check_error(self):
|
||||
|
@ -198,7 +203,7 @@ class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
|
|||
self.lib.C_UnwrapKey.return_value = p11_crypto.CKR_OK
|
||||
self.lib.C_VerifyInit.return_value = p11_crypto.CKR_OK
|
||||
self.lib.C_Verify.return_value = p11_crypto.CKR_OK
|
||||
self.plugin._unwrap_key(json.dumps(plugin_meta))
|
||||
self.plugin._unwrap_key(json.dumps(plugin_meta), self.test_session)
|
||||
self.assertEqual(self.lib.C_UnwrapKey.call_count, 1)
|
||||
self.assertEqual(self.lib.C_Verify.call_count, 1)
|
||||
|
||||
|
|
Loading…
Reference in New Issue