Refactor PKCS#11 to allow configurable mechanisms

Allow for setting the PKCS#11 encryption and hmac algorithms
in the config file.

This patch also implements CKM_AES_CBC encryption and
decryption.

Change-Id: I847b4b17df51bc4846c37a1e19e6adec76f46b38
Co-Authored-By: Ade Lee <alee@redhat.com>
This commit is contained in:
Ade Lee 2018-08-02 16:35:19 -04:00 committed by Douglas Mendizábal
parent 08ca2287dd
commit df8c62aab3
12 changed files with 283 additions and 106 deletions

View File

@ -185,12 +185,16 @@ class HSMCommands(object):
@args('--label', '-L', metavar='<label>', default='primarymkek',
help='The label of the Master Key Encrypt Key')
@args('--length', '-l', metavar='<length>', default=32,
help='The length of the Master Key Encrypt Key (default is 32)')
help='The length in bytes of the Master Key Encryption Key'
' (default is 32)')
def gen_mkek(self, passphrase, libpath=None, slotid=None, label=None,
length=None):
CKK_AES = 'CKK_AES'
CKM_AES_KEY_GEN = 'CKM_AES_KEY_GEN'
self._create_pkcs11_session(str(passphrase), str(libpath), int(slotid))
self._verify_label_does_not_exist(str(label), self.session)
self.pkcs11.generate_key(int(length), self.session, str(label),
self._verify_label_does_not_exist(CKK_AES, str(label), self.session)
self.pkcs11.generate_key(CKK_AES, int(length), CKM_AES_KEY_GEN,
self.session, str(label),
encrypt=True, wrap=True, master_key=True)
self.pkcs11.return_session(self.session)
print("MKEK successfully generated!")
@ -207,13 +211,19 @@ class HSMCommands(object):
help='Password to login to PKCS11 session')
@args('--label', '-L', metavar='<label>', default='primarymkek',
help='The label of the Master HMAC Key')
@args('--key-type', '-t', metavar='<key type>', dest='keytype',
default='CKK_AES', help='The HMAC Key Type (e.g. CKK_AES)')
@args('--length', '-l', metavar='<length>', default=32,
help='The length of the Master HMAC Key (default is 32)')
help='The length in bytes of the Master HMAC Key (default is 32)')
@args('--mechanism', '-m', metavar='<mechanism>',
default='CKM_AES_KEY_GEN', help='The HMAC Key Generation mechanism')
def gen_hmac(self, passphrase, libpath=None, slotid=None, label=None,
length=None):
keytype=None, mechanism=None, length=None):
self._create_pkcs11_session(str(passphrase), str(libpath), int(slotid))
self._verify_label_does_not_exist(str(label), self.session)
self.pkcs11.generate_key(int(length), self.session, str(label),
self._verify_label_does_not_exist(str(keytype), str(label),
self.session)
self.pkcs11.generate_key(str(keytype), int(length), str(mechanism),
self.session, str(label),
sign=True, master_key=True)
self.pkcs11.return_session(self.session)
print("HMAC successfully generated!")
@ -230,12 +240,13 @@ class HSMCommands(object):
def _create_pkcs11_session(self, passphrase, libpath, slotid):
self.pkcs11 = pkcs11.PKCS11(
library_path=libpath, login_passphrase=passphrase,
rw_session=True, slot_id=slotid
rw_session=True, slot_id=slotid,
encryption_mechanism='CKM_AES_CBC',
)
self.session = self.pkcs11.get_session()
def _verify_label_does_not_exist(self, label, session):
key_handle = self.pkcs11.get_key_handle(label, session)
def _verify_label_does_not_exist(self, key_type, label, session):
key_handle = self.pkcs11.get_key_handle(key_type, label, session)
if key_handle:
print(
"The label {label} already exists! "

View File

@ -40,11 +40,11 @@ p11_crypto_plugin_opts = [
help=u._('Password to login to PKCS11 session'),
secret=True),
cfg.StrOpt('mkek_label',
help=u._('Master KEK label (used in the HSM)')),
help=u._('Master KEK label (as stored in the HSM)')),
cfg.IntOpt('mkek_length',
help=u._('Master KEK length in bytes.')),
cfg.StrOpt('hmac_label',
help=u._('HMAC label (used in the HSM)')),
help=u._('Master HMAC Key label (as stored in the HSM)')),
cfg.IntOpt('slot_id',
help=u._('HSM Slot ID'),
default=1),
@ -60,9 +60,15 @@ p11_crypto_plugin_opts = [
cfg.IntOpt('pkek_cache_limit',
help=u._('Project KEK Cache Item Limit'),
default=100),
cfg.StrOpt('algorithm',
help=u._('Secret encryption algorithm'),
default='VENDOR_SAFENET_CKM_AES_GCM'),
cfg.StrOpt('encryption_mechanism',
help=u._('Secret encryption mechanism'),
default='CKM_AES_CBC', deprecated_name='algorithm'),
cfg.StrOpt('hmac_key_type',
help=u._('HMAC Key Type'),
default='CKK_AES'),
cfg.StrOpt('hmac_keygen_mechanism',
help=u._('HMAC Key Generation Algorithm'),
default='CKM_AES_KEY_GEN'),
cfg.StrOpt('seed_file',
help=u._('File to pull entropy for seeding RNG'),
default=''),
@ -104,13 +110,16 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
self.pkcs11 = pkcs11 or self._create_pkcs11(plugin_conf, ffi)
# Save conf arguments
self.encryption_mechanism = plugin_conf.encryption_mechanism
self.mkek_key_type = 'CKK_AES'
self.mkek_length = plugin_conf.mkek_length
self.mkek_label = plugin_conf.mkek_label
self.hmac_label = plugin_conf.hmac_label
self.hmac_key_type = plugin_conf.hmac_key_type
self.hmac_keygen_mechanism = plugin_conf.hmac_keygen_mechanism
self.pkek_length = plugin_conf.pkek_length
self.pkek_cache_ttl = plugin_conf.pkek_cache_ttl
self.pkek_cache_limit = plugin_conf.pkek_cache_limit
self.algorithm = plugin_conf.algorithm
self._configure_object_cache()
@ -167,9 +176,10 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
if 'session' in locals():
self._return_session(session)
kek_meta_extended = json_dumps_compact(
{'iv': base64.b64encode(ct_data['iv'])}
)
kek_meta_extended = json_dumps_compact({
'iv': base64.b64encode(ct_data['iv']),
'mechanism': self.encryption_mechanism
})
return plugin.ResponseDTO(ct_data['ct'], kek_meta_extended)
def _decrypt(self, decrypt_dto, kek_meta_dto, kek_meta_extended,
@ -177,11 +187,12 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
kek = self._load_kek_from_meta_dto(kek_meta_dto)
meta_extended = json.loads(kek_meta_extended)
iv = base64.b64decode(meta_extended['iv'])
mech = meta_extended['mechanism']
try:
session = self._get_session()
pt_data = self.pkcs11.decrypt(
kek, iv, decrypt_dto.encrypted, session
mech, kek, iv, decrypt_dto.encrypted, session
)
finally:
if 'session' in locals():
@ -216,7 +227,8 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
self._return_session(session)
kek_meta_extended = json_dumps_compact(
{'iv': base64.b64encode(ct_data['iv'])}
{'iv': base64.b64encode(ct_data['iv']),
'mechanism': self.encryption_mechanism}
)
return plugin.ResponseDTO(ct_data['ct'], kek_meta_extended)
@ -234,8 +246,8 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
self.caching_session_lock = threading.RLock()
# Cache master keys
self._get_master_key(self.mkek_label)
self._get_master_key(self.hmac_label)
self._get_master_key(self.mkek_key_type, self.mkek_label)
self._get_master_key(self.hmac_key_type, self.hmac_label)
def _pkek_cache_add(self, kek, label):
with self.pkek_cache_lock:
@ -286,8 +298,8 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
login_passphrase=plugin_conf.login,
rw_session=plugin_conf.rw_session,
slot_id=plugin_conf.slot_id,
encryption_mechanism=plugin_conf.encryption_mechanism,
ffi=ffi,
algorithm=plugin_conf.algorithm,
seed_random_buffer=seed_random_buffer,
generate_iv=plugin_conf.generate_iv,
)
@ -314,13 +326,13 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
def _return_session(self, session):
self.pkcs11.return_session(session)
def _get_master_key(self, label):
def _get_master_key(self, key_type, label):
with self.mk_cache_lock:
session = self.caching_session
key = self.mk_cache.get(label, None)
if key is None:
with self.caching_session_lock:
key = self.pkcs11.get_key_handle(label, session)
key = self.pkcs11.get_key_handle(key_type, label, session)
if key is None:
raise exception.P11CryptoKeyHandleException(
u._("Could not find key labeled {0}").format(label)
@ -350,8 +362,8 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
with self.caching_session_lock:
session = self.caching_session
# Get master keys
mkek = self._get_master_key(mkek_label)
mkhk = self._get_master_key(hmac_label)
mkek = self._get_master_key(self.mkek_key_type, mkek_label)
mkhk = self._get_master_key(self.hmac_key_type, hmac_label)
# Verify HMAC
self.pkcs11.verify_hmac(mkhk, hmac, kek_data, session)
@ -368,11 +380,13 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
with self.caching_session_lock:
session = self.caching_session
# Get master keys
mkek = self._get_master_key(self.mkek_label)
mkhk = self._get_master_key(self.hmac_label)
mkek = self._get_master_key(self.mkek_key_type, self.mkek_label)
mkhk = self._get_master_key(self.hmac_key_type, self.hmac_label)
# Generate KEK
kek = self.pkcs11.generate_key(key_length, session, encrypt=True)
kek = self.pkcs11.generate_key(
'CKK_AES', key_length, 'CKM_AES_KEY_GEN', session, encrypt=True
)
# Wrap KEK
wkek = self.pkcs11.wrap_key(mkek, kek, session)
@ -401,9 +415,11 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
u._("A master key with that label already exists")
)
mk = self.pkcs11.generate_key(
key_length, session, key_label,
'CKK_AES', key_length, 'CKM_AES_KEY_GEN', session,
key_label=key_label,
encrypt=True, wrap=True, master_key=True
)
self.mk_cache[key_label] = mk
return mk
@ -411,12 +427,14 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
with self.mk_cache_lock, self.caching_session_lock:
session = self.caching_session
if key_label in self.mk_cache or \
self.pkcs11.get_key_handle(key_label, session) is not None:
self.pkcs11.get_key_handle(key_label, session, 'hmac') \
is not None:
raise exception.P11CryptoPluginKeyException(
u._("A master key with that label already exists")
)
mk = self.pkcs11.generate_key(
key_length, session, key_label, sign=True, master_key=True
self.hmac_key_type, key_length, self.hmac_keygen_mechanism,
session, key_label, sign=True, master_key=True
)
self.mk_cache[key_label] = mk
return mk

View File

@ -15,6 +15,7 @@ import collections
import textwrap
import cffi
from cryptography.hazmat.primitives import padding
from barbican.common import exception
from barbican.common import utils
@ -39,6 +40,14 @@ CKS_RW_USER_FUNCTIONS = 3
CKO_SECRET_KEY = 4
CKK_AES = 0x1f
CKK_GENERIC_SECRET = 0x10
CKK_SHA256_HMAC = 0x0000002B
_KEY_TYPES = {
'CKK_AES': CKK_AES,
'CKK_GENERIC_SECRET': CKK_GENERIC_SECRET,
'CKK_SHA256_HMAC': CKK_SHA256_HMAC
}
CKA_CLASS = 0
CKA_TOKEN = 1
@ -128,13 +137,30 @@ CKM_AES_CBC = 0x1082
CKM_AES_CBC_PAD = 0x1085
CKM_AES_GCM = 0x1087
CKM_AES_KEY_WRAP = 0x1090
CKM_GENERIC_SECRET_KEY_GEN = 0x350
VENDOR_SAFENET_CKM_AES_GCM = 0x8000011c
CKM_NAMES = {
# Thales Vendor-defined Mechanisms
CKM_NC_SHA256_HMAC_KEY_GEN = 0xDE436997
_ENCRYPTION_MECHANISMS = {
'CKM_AES_CBC': CKM_AES_CBC,
'CKM_AES_GCM': CKM_AES_GCM,
'VENDOR_SAFENET_CKM_AES_GCM': VENDOR_SAFENET_CKM_AES_GCM
'VENDOR_SAFENET_CKM_AES_GCM': VENDOR_SAFENET_CKM_AES_GCM,
}
_CBC_IV_SIZE = 16 # bytes
_CBC_BLOCK_SIZE = 128 # bits
_KEY_GEN_MECHANISMS = {
'CKM_AES_KEY_GEN': CKM_AES_KEY_GEN,
'CKM_NC_SHA256_HMAC_KEY_GEN': CKM_NC_SHA256_HMAC_KEY_GEN
}
CKM_NAMES = dict()
CKM_NAMES.update(_ENCRYPTION_MECHANISMS)
CKM_NAMES.update(_KEY_GEN_MECHANISMS)
ERROR_CODES = {
1: 'CKR_CANCEL',
2: 'CKR_HOST_MEMORY',
@ -286,6 +312,8 @@ def build_ffi():
CK_RV C_GetSessionInfo(CK_SESSION_HANDLE, CK_SESSION_INFO_PTR);
CK_RV C_Login(CK_SESSION_HANDLE, CK_USER_TYPE, CK_UTF8CHAR_PTR,
CK_ULONG);
CK_RV C_GetAttributeValue(CK_SESSION_HANDLE, CK_OBJECT_HANDLE,
CK_ATTRIBUTE *, CK_ULONG);
CK_RV C_SetAttributeValue(CK_SESSION_HANDLE, CK_OBJECT_HANDLE,
CK_ATTRIBUTE *, CK_ULONG);
CK_RV C_DestroyObject(CK_SESSION_HANDLE, CK_OBJECT_HANDLE);
@ -324,9 +352,22 @@ def build_ffi():
class PKCS11(object):
def __init__(self, library_path, login_passphrase, rw_session, slot_id,
ffi=None, algorithm='CKM_AES_GCM',
encryption_mechanism=None,
ffi=None, algorithm=None,
seed_random_buffer=None,
generate_iv=None):
if algorithm:
LOG.warning("WARNING: Using deprecated 'algorithm' argument.")
encryption_mechanism = encryption_mechanism or algorithm
if encryption_mechanism not in _ENCRYPTION_MECHANISMS:
raise ValueError("Invalid encryption_mechanism.")
self.encrypt_mech = _ENCRYPTION_MECHANISMS[encryption_mechanism]
self.encrypt = getattr(
self,
'_{}_encrypt'.format(encryption_mechanism)
)
self.ffi = ffi or build_ffi()
self.lib = self.ffi.dlopen(library_path)
rv = self.lib.C_Initialize(self.ffi.NULL)
@ -338,7 +379,7 @@ class PKCS11(object):
self.slot_id = slot_id
# Algorithm options
self.algorithm = CKM_NAMES[algorithm]
self.algorithm = CKM_NAMES[encryption_mechanism]
self.blocksize = 16
self.noncesize = 12
self.gcmtagsize = 16
@ -368,10 +409,10 @@ class PKCS11(object):
buf = self._generate_random(length, session)
return self.ffi.buffer(buf)[:]
def get_key_handle(self, label, session):
def get_key_handle(self, key_type, label, session):
attributes = self._build_attributes([
Attribute(CKA_CLASS, CKO_SECRET_KEY),
Attribute(CKA_KEY_TYPE, CKK_AES),
Attribute(CKA_KEY_TYPE, _KEY_TYPES[key_type]),
Attribute(CKA_LABEL, str(label))
])
rv = self.lib.C_FindObjectsInit(
@ -392,7 +433,54 @@ class PKCS11(object):
raise exception.P11CryptoPluginKeyException()
return key
def encrypt(self, key, pt_data, session):
def _CKM_AES_CBC_encrypt(self, key, pt_data, session):
iv = self._generate_random(_CBC_IV_SIZE, session)
ck_mechanism = self._build_cbc_mechanism(iv)
rv = self.lib.C_EncryptInit(session, ck_mechanism.mech, key)
self._check_error(rv)
padder = padding.PKCS7(_CBC_BLOCK_SIZE).padder()
padded_pt_data = padder.update(pt_data)
padded_pt_data += padder.finalize()
pt_len = len(padded_pt_data)
ct_len = self.ffi.new("CK_ULONG *", pt_len)
ct = self.ffi.new("CK_BYTE[{}]".format(ct_len[0]))
rv = self.lib.C_Encrypt(session, padded_pt_data, pt_len, ct, ct_len)
self._check_error(rv)
return {
"iv": self.ffi.buffer(iv)[:],
"ct": self.ffi.buffer(ct, ct_len[0])[:]
}
def _build_cbc_mechanism(self, iv):
mech = self.ffi.new("CK_MECHANISM *")
mech.mechanism = self.encrypt_mech
mech.parameter = iv
mech.parameter_len = _CBC_IV_SIZE
return CKMechanism(mech, None)
def _CKM_AES_CBC_decrypt(self, key, iv, ct_data, session):
iv = self.ffi.new("CK_BYTE[{}]".format(len(iv)), iv)
ck_mechanism = self._build_cbc_mechanism(iv)
rv = self.lib.C_DecryptInit(session, ck_mechanism.mech, key)
self._check_error(rv)
ct_len = len(ct_data)
pt_len = self.ffi.new("CK_ULONG *", ct_len)
pt = self.ffi.new("CK_BYTE[{0}]".format(pt_len[0]))
rv = self.lib.C_Decrypt(session, ct_data, ct_len, pt, pt_len)
self._check_error(rv)
pt = self.ffi.buffer(pt, pt_len[0])[:]
unpadder = padding.PKCS7(_CBC_BLOCK_SIZE).unpadder()
unpadded_pt = unpadder.update(pt)
unpadded_pt += unpadder.finalize()
return unpadded_pt
def _VENDOR_SAFENET_CKM_AES_GCM_encrypt(self, key, pt_data, session):
iv = None
if self.generate_iv:
iv = self._generate_random(self.noncesize, session)
@ -419,9 +507,25 @@ class PKCS11(object):
return {
"iv": self.ffi.buffer(ct, ct_len[0])[-self.gcmtagsize:],
"ct": self.ffi.buffer(ct, ct_len[0])[:-self.gcmtagsize]
}
}
def decrypt(self, key, iv, ct_data, session):
def _build_gcm_mechanism(self, iv=None):
mech = self.ffi.new("CK_MECHANISM *")
mech.mechanism = self.algorithm
gcm = self.ffi.new("CK_AES_GCM_PARAMS *")
if iv:
iv_len = len(iv)
gcm.pIv = iv
gcm.ulIvLen = iv_len
gcm.ulIvBits = iv_len * 8
gcm.ulTagBits = self.gcmtagsize * 8
mech.parameter = gcm
mech.parameter_len = 48
return CKMechanism(mech, gcm)
def _VENDOR_SAFENET_CKM_AES_GCM_decrypt(self, key, iv, ct_data, session):
iv = self.ffi.new("CK_BYTE[{0}]".format(len(iv)), iv)
ck_mechanism = self._build_gcm_mechanism(iv)
rv = self.lib.C_DecryptInit(session, ck_mechanism.mech, key)
@ -452,23 +556,39 @@ class PKCS11(object):
return pt
def generate_key(self, key_length, session, key_label=None,
encrypt=False, sign=False, wrap=False, master_key=False):
if not encrypt and not sign and not wrap:
def _CKM_AES_GCM_encrypt(self, key, pt_data, session):
return self._VENDOR_SAFENET_CKM_AES_GCM_encrypt(key, pt_data, session)
def _CKM_AES_GCM_decrypt(self, key, iv, ct_data, session):
return self._VENDOR_SAFENET_CKM_AES_GCM_decrypt(key, ct_data, session)
def decrypt(self, mechanism, key, iv, ct_data, session):
if mechanism not in _ENCRYPTION_MECHANISMS:
raise ValueError(u._("Unsupported decryption mechanism"))
return getattr(self, '_{}_decrypt'.format(mechanism))(
key, iv, ct_data, session
)
def generate_key(self, key_type, key_length, mechanism, session,
key_label=None, master_key=False,
encrypt=False, sign=False, wrap=False):
if not any((encrypt, sign, wrap)):
raise exception.P11CryptoPluginException()
if master_key and not key_label:
raise ValueError(u._("key_label must be set for master_keys"))
token = True if master_key else False
extractable = False if master_key else True
token = master_key
extractable = not master_key
# in some HSMs extractable keys cannot be marked sensitive
sensitive = not extractable
ck_attributes = [
Attribute(CKA_CLASS, CKO_SECRET_KEY),
Attribute(CKA_KEY_TYPE, CKK_AES),
Attribute(CKA_KEY_TYPE, _KEY_TYPES[key_type]),
Attribute(CKA_VALUE_LEN, key_length),
Attribute(CKA_TOKEN, token),
Attribute(CKA_PRIVATE, True),
Attribute(CKA_SENSITIVE, True),
Attribute(CKA_SENSITIVE, sensitive),
Attribute(CKA_ENCRYPT, encrypt),
Attribute(CKA_DECRYPT, encrypt),
Attribute(CKA_SIGN, sign),
@ -481,7 +601,9 @@ class PKCS11(object):
ck_attributes.append(Attribute(CKA_LABEL, key_label))
ck_attributes = self._build_attributes(ck_attributes)
mech = self.ffi.new("CK_MECHANISM *")
mech.mechanism = CKM_AES_KEY_GEN
mech.mechanism = _KEY_GEN_MECHANISMS[mechanism]
obj_handle_ptr = self.ffi.new("CK_OBJECT_HANDLE *")
rv = self.lib.C_GenerateKey(
session, mech, ck_attributes.template, len(ck_attributes.template),
@ -657,19 +779,3 @@ class PKCS11(object):
if test_random == b'\x00' * 100:
raise exception.P11CryptoPluginException(
u._("Apparent RNG self-test failure."))
def _build_gcm_mechanism(self, iv=None):
mech = self.ffi.new("CK_MECHANISM *")
mech.mechanism = self.algorithm
gcm = self.ffi.new("CK_AES_GCM_PARAMS *")
if iv:
iv_len = len(iv)
gcm.pIv = iv
gcm.ulIvLen = iv_len
gcm.ulIvBits = iv_len * 8
gcm.ulTagBits = self.gcmtagsize * 8
mech.parameter = gcm
mech.parameter_len = 48
return CKMechanism(mech, gcm)

View File

@ -165,8 +165,9 @@ class TestBarbicanManage(TestBarbicanManageBase):
self._main_test_helper(
['barbican.cmd.barbican_manage', 'hsm', 'gen_mkek',
'--library-path', 'mocklib', '--passphrase', 'mockpassewd',
'--label', 'mocklabel'], mock_genkey,
32, 1, 'mocklabel', encrypt=True, wrap=True, master_key=True)
'--label', 'mocklabel'], mock_genkey, 'CKK_AES',
32, 'CKM_AES_KEY_GEN', 1, 'mocklabel', encrypt=True, wrap=True,
master_key=True)
@mock.patch('barbican.plugin.crypto.pkcs11.PKCS11')
def test_hsm_gen_hmac(self, mock_pkcs11):
@ -177,8 +178,8 @@ class TestBarbicanManage(TestBarbicanManageBase):
self._main_test_helper(
['barbican.cmd.barbican_manage', 'hsm', 'gen_hmac',
'--library-path', 'mocklib', '--passphrase', 'mockpassewd',
'--label', 'mocklabel'], mock_genkey,
32, 1, 'mocklabel', sign=True, master_key=True)
'--label', 'mocklabel'], mock_genkey, 'CKK_AES',
32, 'CKM_AES_KEY_GEN', 1, 'mocklabel', sign=True, master_key=True)
@mock.patch('barbican.plugin.crypto.pkcs11.PKCS11')
def test_hsm_gen_mkek_non_default_length(self, mock_pkcs11):
@ -190,8 +191,8 @@ class TestBarbicanManage(TestBarbicanManageBase):
['barbican.cmd.barbican_manage', 'hsm', 'gen_mkek',
'--length', '48', '--library-path', 'mocklib',
'--passphrase', 'mockpassewd', '--label', 'mocklabel'],
mock_genkey, 48, 1, 'mocklabel', encrypt=True, wrap=True,
master_key=True)
mock_genkey, 'CKK_AES', 48, 'CKM_AES_KEY_GEN', 1, 'mocklabel',
encrypt=True, wrap=True, master_key=True)
@mock.patch('barbican.plugin.crypto.pkcs11.PKCS11')
def test_hsm_gen_hmac_non_default_length(self, mock_pkcs11):
@ -203,4 +204,5 @@ class TestBarbicanManage(TestBarbicanManageBase):
['barbican.cmd.barbican_manage', 'hsm', 'gen_hmac',
'--length', '48', '--library-path', 'mocklib',
'--passphrase', 'mockpassewd', '--label', 'mocklabel'],
mock_genkey, 48, 1, 'mocklabel', sign=True, master_key=True)
mock_genkey, 'CKK_AES', 48, 'CKM_AES_KEY_GEN', 1, 'mocklabel',
sign=True, master_key=True)

View File

@ -60,7 +60,7 @@ class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
self.cfg_mock.p11_crypto_plugin.pkek_length = 32
self.cfg_mock.p11_crypto_plugin.pkek_cache_ttl = 900
self.cfg_mock.p11_crypto_plugin.pkek_cache_limit = 10
self.cfg_mock.p11_crypto_plugin.algorithm = 'CKM_AES_GCM'
self.cfg_mock.p11_crypto_plugin.encryption_mechanism = 'CKM_AES_CBC'
self.cfg_mock.p11_crypto_plugin.seed_file = ''
self.cfg_mock.p11_crypto_plugin.seed_length = 32
@ -154,13 +154,13 @@ class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
def test_decrypt(self):
ct = b'ctct'
kek_meta_extended = '{"iv":"AAAA"}'
kek_meta_extended = '{"iv":"AAAA","mechanism":"CKM_AES_CBC"}'
decrypt_dto = plugin_import.DecryptDTO(ct)
kek_meta = mock.MagicMock()
kek_meta.kek_label = 'pkek'
kek_meta.plugin_meta = ('{"iv": "iv==",'
'"hmac": "hmac",'
'"wrapped_key": "wrappedkey==",'
'"wrapped_key": "c2VjcmV0a2V5BwcHBwcHBw==",'
'"mkek_label": "mkek_label",'
'"hmac_label": "hmac_label"}')
pt = self.plugin.decrypt(decrypt_dto,
@ -183,7 +183,7 @@ class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
'Testing error handling'
)
ct = b'ctct'
kek_meta_extended = '{"iv":"AAAA"}'
kek_meta_extended = '{"iv":"AAAA","mechanism":"CKM_AES_CBC"}'
decrypt_dto = plugin_import.DecryptDTO(ct)
kek_meta = mock.MagicMock()
kek_meta.kek_label = 'pkek'
@ -273,6 +273,7 @@ class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
self.pkcs11.get_key_handle.return_value = None
self.assertRaises(ex.P11CryptoKeyHandleException,
self.plugin._get_master_key,
self.plugin.mkek_key_type,
'bad_key_label')
def test_cached_kek_expired(self):

View File

@ -59,11 +59,13 @@ class WhenTestingPKCS11(utils.BaseTestCase):
self.cfg_mock.login_passphrase = 'foobar'
self.cfg_mock.rw_session = False
self.cfg_mock.slot_id = 1
self.cfg_mock.algorithm = 'CKM_AES_GCM'
self.cfg_mock.encryption_mechanism = 'CKM_AES_CBC'
self.pkcs11 = pkcs11.PKCS11(
self.cfg_mock.library_path, self.cfg_mock.login_passphrase,
self.cfg_mock.rw_session, self.cfg_mock.slot_id, ffi=self.ffi
self.cfg_mock.rw_session, self.cfg_mock.slot_id,
self.cfg_mock.encryption_mechanism,
ffi=self.ffi
)
def _generate_random(self, session, buf, length):
@ -183,7 +185,7 @@ class WhenTestingPKCS11(utils.BaseTestCase):
self.pkcs11._rng_self_test, mock.MagicMock())
def test_get_key_handle_one_key(self):
key = self.pkcs11.get_key_handle('foo', mock.MagicMock())
key = self.pkcs11.get_key_handle('CKK_AES', 'foo', mock.MagicMock())
self.assertEqual(2, key)
@ -193,7 +195,7 @@ class WhenTestingPKCS11(utils.BaseTestCase):
def test_get_key_handle_no_keys(self):
self.lib.C_FindObjects.side_effect = self._find_objects_zero
key = self.pkcs11.get_key_handle('foo', mock.MagicMock())
key = self.pkcs11.get_key_handle('CKK_AES', 'foo', mock.MagicMock())
self.assertIsNone(key)
@ -205,21 +207,24 @@ class WhenTestingPKCS11(utils.BaseTestCase):
self.lib.C_FindObjects.side_effect = self._find_objects_two
self.assertRaises(exception.P11CryptoPluginKeyException,
self.pkcs11.get_key_handle, 'foo', mock.MagicMock())
self.pkcs11.get_key_handle, 'CKK_AES', 'foo',
mock.MagicMock())
self.assertEqual(1, self.lib.C_FindObjectsInit.call_count)
self.assertEqual(1, self.lib.C_FindObjects.call_count)
self.assertEqual(1, self.lib.C_FindObjectsFinal.call_count)
def test_generate_session_key(self):
key = self.pkcs11.generate_key(16, mock.MagicMock(), encrypt=True)
key = self.pkcs11.generate_key('CKK_AES', 16, 'CKM_AES_KEY_GEN',
mock.MagicMock(), encrypt=True)
self.assertEqual(3, key)
self.assertEqual(1, self.lib.C_GenerateKey.call_count)
def test_generate_master_key(self):
key = self.pkcs11.generate_key(16, mock.MagicMock(), key_label='key',
key = self.pkcs11.generate_key('CKK_AES', 16, 'CKM_AES_KEY_GEN',
mock.MagicMock(), key_label='key',
encrypt=True, master_key=True)
self.assertEqual(3, key)
@ -228,18 +233,22 @@ class WhenTestingPKCS11(utils.BaseTestCase):
def test_generate_key_no_flags(self):
self.assertRaises(exception.P11CryptoPluginException,
self.pkcs11.generate_key, mock.MagicMock(),
mock.MagicMock())
self.pkcs11.generate_key, 'CKK_AES', 16,
mock.MagicMock(), mock.MagicMock())
def test_generate_master_key_no_label(self):
self.assertRaises(ValueError, self.pkcs11.generate_key,
'CKK_AES', 16,
mock.MagicMock(), mock.MagicMock(),
encrypt=True, master_key=True)
def test_encrypt_with_no_iv_generation(self):
pt = b'0123456789ABCDEF'
self.pkcs11.generate_iv = False
ct = self.pkcs11.encrypt(mock.MagicMock(), pt, mock.MagicMock())
ct = self.pkcs11._VENDOR_SAFENET_CKM_AES_GCM_encrypt(
mock.MagicMock(),
pt, mock.MagicMock()
)
self.assertEqual(ct['ct'][:len(pt)], pt[::-1])
self.assertGreater(len(ct['iv']), 0)
@ -251,7 +260,9 @@ class WhenTestingPKCS11(utils.BaseTestCase):
def test_encrypt_with_iv_generation(self):
pt = b'0123456789ABCDEF'
self.pkcs11.generate_iv = True
ct = self.pkcs11.encrypt(mock.MagicMock(), pt, mock.MagicMock())
ct = self.pkcs11._VENDOR_SAFENET_CKM_AES_GCM_encrypt(
mock.MagicMock(), pt, mock.MagicMock()
)
self.assertEqual(ct['ct'][:len(pt)], pt[::-1])
self.assertGreater(len(ct['iv']), 0)
@ -261,9 +272,10 @@ class WhenTestingPKCS11(utils.BaseTestCase):
self.assertEqual(1, self.lib.C_Encrypt.call_count)
def test_decrypt(self):
ct = b'FEDCBA9876543210' + b'0' * self.pkcs11.gcmtagsize
ct = b'c2VjcmV0a2V5BwcHBwcHBw=='
iv = b'0' * self.pkcs11.noncesize
pt = self.pkcs11.decrypt(mock.MagicMock(), iv, ct, mock.MagicMock())
pt = self.pkcs11.decrypt('VENDOR_SAFENET_CKM_AES_GCM',
mock.MagicMock(), iv, ct, mock.MagicMock())
pt_len = len(ct) - self.pkcs11.gcmtagsize
self.assertEqual(pt[:pt_len], ct[:-self.pkcs11.gcmtagsize][::-1])
@ -272,9 +284,10 @@ class WhenTestingPKCS11(utils.BaseTestCase):
self.assertEqual(1, self.lib.C_Decrypt.call_count)
def test_decrypt_with_pad(self):
ct = b'\x03\x03\x03CBA9876543210' + b'0' * self.pkcs11.gcmtagsize
ct = b'c2VjcmV0a2V5BwcHBwcHBw=='
iv = b'0' * self.pkcs11.blocksize
pt = self.pkcs11.decrypt(mock.MagicMock(), iv, ct, mock.MagicMock())
pt = self.pkcs11.decrypt('VENDOR_SAFENET_CKM_AES_GCM',
mock.MagicMock(), iv, ct, mock.MagicMock())
pt_len = len(ct) - self.pkcs11.gcmtagsize - 3
self.assertEqual(pt[:pt_len], ct[3:-self.pkcs11.gcmtagsize][::-1])
@ -283,9 +296,10 @@ class WhenTestingPKCS11(utils.BaseTestCase):
self.assertEqual(1, self.lib.C_Decrypt.call_count)
def test_decrypt_with_pad_new_iv(self):
ct = b'\x03\x03\x03CBA9876543210' + b'0' * self.pkcs11.gcmtagsize
ct = b'c2VjcmV0a2V5BwcHBwcHBw=='
iv = b'0' * self.pkcs11.gcmtagsize
pt = self.pkcs11.decrypt(mock.MagicMock(), iv, ct, mock.MagicMock())
pt = self.pkcs11.decrypt('VENDOR_SAFENET_CKM_AES_GCM',
mock.MagicMock(), iv, ct, mock.MagicMock())
pt_len = len(ct) - self.pkcs11.gcmtagsize
self.assertEqual(pt[:pt_len], ct[:-self.pkcs11.gcmtagsize][::-1])
@ -294,9 +308,10 @@ class WhenTestingPKCS11(utils.BaseTestCase):
self.assertEqual(1, self.lib.C_Decrypt.call_count)
def test_decrypt_with_pad_wrong_size(self):
ct = b'\x03\x03\x03CBA987654321' + b'0' * self.pkcs11.gcmtagsize
ct = b'c2VjcmV0a2V5BwcHBwcHBw=='
iv = b'0' * self.pkcs11.blocksize
pt = self.pkcs11.decrypt(mock.MagicMock(), iv, ct, mock.MagicMock())
pt = self.pkcs11.decrypt('VENDOR_SAFENET_CKM_AES_GCM',
mock.MagicMock(), iv, ct, mock.MagicMock())
pt_len = len(ct) - self.pkcs11.gcmtagsize
self.assertEqual(pt[:pt_len], ct[:-self.pkcs11.gcmtagsize][::-1])
@ -305,9 +320,10 @@ class WhenTestingPKCS11(utils.BaseTestCase):
self.assertEqual(1, self.lib.C_Decrypt.call_count)
def test_decrypt_with_pad_wrong_length(self):
ct = b'\x03EDCBA9876543210' + b'0' * self.pkcs11.gcmtagsize
ct = b'c2VjcmV0a2V5BwcHBwcHBw=='
iv = b'0' * self.pkcs11.blocksize
pt = self.pkcs11.decrypt(mock.MagicMock(), iv, ct, mock.MagicMock())
pt = self.pkcs11.decrypt('VENDOR_SAFENET_CKM_AES_GCM',
mock.MagicMock(), iv, ct, mock.MagicMock())
pt_len = len(ct) - self.pkcs11.gcmtagsize
self.assertEqual(pt[:pt_len], ct[:-self.pkcs11.gcmtagsize][::-1])
@ -316,9 +332,10 @@ class WhenTestingPKCS11(utils.BaseTestCase):
self.assertEqual(1, self.lib.C_Decrypt.call_count)
def test_decrypt_with_too_large_pad(self):
ct = b'\x11EDCBA9876543210' + b'0' * self.pkcs11.gcmtagsize
ct = b'c2VjcmV0a2V5BwcHBwcHBw=='
iv = b'0' * self.pkcs11.blocksize
pt = self.pkcs11.decrypt(mock.MagicMock(), iv, ct, mock.MagicMock())
pt = self.pkcs11.decrypt('VENDOR_SAFENET_CKM_AES_GCM',
mock.MagicMock(), iv, ct, mock.MagicMock())
pt_len = len(ct) - self.pkcs11.gcmtagsize
self.assertEqual(pt[:pt_len], ct[:-self.pkcs11.gcmtagsize][::-1])

View File

@ -666,5 +666,9 @@ def is_vault_enabled():
return os.environ.get('VAULT_PLUGIN_ENABLED') is not None
def is_pkcs11_enabled():
return os.environ.get('PKCS11_PLUGIN_ENABLED') is not None
class DummyClassForTesting(object):
pass

View File

@ -568,7 +568,7 @@ class OrdersTestCase(base.TestCase):
self.assertRegex(order_resp.model.order_ref, regex)
@testcase.attr('positive')
@testtools.skipIf(utils.is_vault_enabled(),
@testtools.skipIf(utils.is_vault_enabled() or utils.is_pkcs11_enabled(),
"Vault does not support this operation")
def test_encryption_using_generated_key(self):
"""Tests functionality of a generated asymmetric key pair."""

View File

@ -236,7 +236,7 @@ class RSATestCase(base.TestCase):
self.verify_container_keys_equal(secrets, with_passphrase=True)
@testcase.attr('positive')
@testtools.skipIf(utils.is_vault_enabled(),
@testtools.skipIf(utils.is_vault_enabled() or utils.is_pkcs11_enabled(),
"Vault does not support this operation")
def test_rsa_order_container(self):
"""Post an order for a container"""
@ -246,7 +246,8 @@ class RSATestCase(base.TestCase):
self.verify_container_keys_valid(secrets)
@testcase.attr('positive')
@testtools.skipIf(utils.is_kmip_enabled() or utils.is_vault_enabled(),
@testtools.skipIf(utils.is_kmip_enabled() or utils.is_vault_enabled()
or utils.is_pkcs11_enabled(),
"PyKMIP does not support this operation")
def test_rsa_order_container_with_passphrase(self):
"""Post an order for a container with a passphrase"""
@ -267,7 +268,7 @@ class RSATestCase(base.TestCase):
self.verify_container_keys_equal(secrets)
@testcase.attr('positive')
@testtools.skipIf(utils.is_vault_enabled(),
@testtools.skipIf(utils.is_vault_enabled() or utils.is_pkcs11_enabled(),
"Vault does not support this operation")
def test_rsa_order_certificate_from_ordered_container(self):
"""Post an order for a certificate"""
@ -280,7 +281,8 @@ class RSATestCase(base.TestCase):
self.verify_certificate_order_status(order_status)
@testcase.attr('positive')
@testtools.skipIf(utils.is_kmip_enabled() or utils.is_vault_enabled(),
@testtools.skipIf(utils.is_kmip_enabled() or utils.is_vault_enabled()
or utils.is_pkcs11_enabled(),
"PyKMIP does not support this operation")
def test_rsa_order_certificate_from_ordered_container_with_pass(self):
"""Post an order for a certificate"""

View File

@ -28,6 +28,8 @@ if [[ "$plugin" == "kmip" ]]; then
export KMIP_PLUGIN_ENABLED=1
elif [[ "$plugin" == "vault" ]]; then
export VAULT_PLUGIN_ENABLED=1
elif [[ "$plugin" == "pkcs11" ]]; then
export PKCS11_PLUGIN_ENABLED=1
fi
# run the tests sequentially

View File

@ -0,0 +1,12 @@
---
features:
- |
Added new options to the PKCS#11 Cryptographic Plugin configuration to
enable the use of different encryption and hmac mechanisms.
Added support for `CKM_AES_CBC` encryption in the PKCS#11 Cryptographic
Plugin.
deprecations:
- |
Deprecated the `p11_crypto_plugin:algoritm` option. Users should update
their configuration to use `p11_crypto_plugin:encryption_mechanism`
instead.

View File

@ -115,6 +115,7 @@ commands =
coverage xml -o cover/coverage.xml
passenv = KMIP_PLUGIN_ENABLED
VAULT_PLUGIN_ENABLED
PKCS11_PLUGIN_ENABLED
[testenv:py35functional]
basepython = python3
@ -129,6 +130,7 @@ commands =
coverage xml -o cover/coverage.xml
passenv = KMIP_PLUGIN_ENABLED
VAULT_PLUGIN_ENABLED
PKCS11_PLUGIN_ENABLED
[testenv:cmd]
basepython = python3