Use serial number or label for PKCS#11 tokens

The PKCS#11 standard does not make any guarantees about
slot numbering, so the slot ID alone should not be used
to identify a token.  Instead, the token's Serial Number
or Label should be used to ensure the correct token
is being used.

This patch adds two new config options to the p11_crypto
plugin: token_serial_number and token_label.

These new options allow for more flexibility in configuring
the PKCS#11 module.  The config may include either the token's
serial number or its label.

Serial numbers should be unique, so they take higher precedence.

Some devices allow tokens to have the same label, so this patch
ensures that only one token with the specified label is present.

If both serial number and label are given, only the serial number
will be checked and an error will be raised if it is not found.

slot_id continues to work as expected, although its use is discouraged
and may be deprecated in a future patch.  If the conf contains
only the slot_id, it will be used.  If the serial number or
label are also provided, the new logic will ignore the slot_id
and search for the serial number or label instead.

Change-Id: I115cf1a7006a6c85f37c5e50ded13134a3dfd1a3
(cherry picked from commit 69459a0ecf)
This commit is contained in:
Douglas Mendizábal 2020-10-14 16:04:47 -05:00
parent 5214aecca4
commit 1e2a12711a
6 changed files with 269 additions and 10 deletions

View File

@ -36,6 +36,14 @@ p11_crypto_plugin_group = cfg.OptGroup(name='p11_crypto_plugin',
p11_crypto_plugin_opts = [
cfg.StrOpt('library_path',
help=u._('Path to vendor PKCS11 library')),
cfg.StrOpt('token_serial_number',
help=u._('Token serial number used to identify the token to be '
'used. Required when the device has multiple tokens '
'with the same label.')),
cfg.StrOpt('token_label',
help=u._('Token label used to identify the token to '
'be used. Required when token_serial_number is '
'not specified.')),
cfg.StrOpt('login',
help=u._('Password to login to PKCS11 session'),
secret=True),
@ -46,7 +54,8 @@ p11_crypto_plugin_opts = [
cfg.StrOpt('hmac_label',
help=u._('Master HMAC Key label (as stored in the HSM)')),
cfg.IntOpt('slot_id',
help=u._('HSM Slot ID'),
help=u._('(Optional) HSM Slot ID that contains the token '
'device to be used.'),
default=1),
cfg.BoolOpt('rw_session',
help=u._('Flag for Read/Write Sessions'),
@ -310,7 +319,9 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
seed_random_buffer=seed_random_buffer,
generate_iv=plugin_conf.aes_gcm_generate_iv,
always_set_cka_sensitive=plugin_conf.always_set_cka_sensitive,
hmac_keywrap_mechanism=plugin_conf.hmac_keywrap_mechanism
hmac_keywrap_mechanism=plugin_conf.hmac_keywrap_mechanism,
token_serial_number=plugin_conf.token_serial_number,
token_label=plugin_conf.token_label
)
def _reinitialize_pkcs11(self):

View File

@ -12,6 +12,7 @@
# limitations under the License.
import collections
import itertools
import textwrap
import cffi
@ -27,8 +28,10 @@ LOG = utils.getLogger(__name__)
Attribute = collections.namedtuple("Attribute", ["type", "value"])
CKAttributes = collections.namedtuple("CKAttributes", ["template", "cffivals"])
CKMechanism = collections.namedtuple("CKMechanism", ["mech", "cffivals"])
Token = collections.namedtuple("Token", ["slot_id", "label", "serial_number"])
CKR_OK = 0
CK_TRUE = 1
CKF_RW_SESSION = (1 << 1)
CKF_SERIAL_SESSION = (1 << 2)
CKU_SO = 0
@ -263,11 +266,15 @@ def build_ffi():
ffi = cffi.FFI()
ffi.cdef(textwrap.dedent("""
typedef unsigned char CK_BYTE;
typedef CK_BYTE CK_CHAR;
typedef CK_BYTE CK_UTF8CHAR;
typedef CK_BYTE CK_BBOOL;
typedef unsigned long CK_ULONG;
typedef unsigned long CK_RV;
typedef unsigned long CK_SESSION_HANDLE;
typedef unsigned long CK_OBJECT_HANDLE;
typedef unsigned long CK_SLOT_ID;
typedef CK_SLOT_ID * CK_SLOT_ID_PTR;
typedef unsigned long CK_FLAGS;
typedef unsigned long CK_STATE;
typedef unsigned long CK_USER_TYPE;
@ -294,6 +301,44 @@ def build_ffi():
typedef CK_BYTE *CK_BYTE_PTR;
typedef CK_ULONG *CK_ULONG_PTR;
typedef struct CK_VERSION {
CK_BYTE major;
CK_BYTE minor;
} CK_VERSION;
typedef struct CK_SLOT_INFO {
CK_UTF8CHAR slotDescription[64];
CK_UTF8CHAR manufacturerID[32];
CK_FLAGS flags;
CK_VERSION hardwareVersion;
CK_VERSION firmwareVersion;
} CK_SLOT_INFO;
typedef CK_SLOT_INFO * CK_SLOT_INFO_PTR;
typedef struct CK_TOKEN_INFO {
CK_UTF8CHAR label[32];
CK_UTF8CHAR manufacturerID[32];
CK_UTF8CHAR model[16];
CK_CHAR serialNumber[16];
CK_FLAGS flags;
CK_ULONG ulMaxSessionCount;
CK_ULONG ulSessionCount;
CK_ULONG ulMaxRwSessionCount;
CK_ULONG ulRwSessionCount;
CK_ULONG ulMaxPinLen;
CK_ULONG ulMinPinLen;
CK_ULONG ulTotalPublicMemory;
CK_ULONG ulFreePublicMemory;
CK_ULONG ulTotalPrivateMemory;
CK_ULONG ulFreePrivateMemory;
CK_VERSION hardwareVersion;
CK_VERSION firmwareVersion;
CK_CHAR utcTime[16];
} CK_TOKEN_INFO;
typedef CK_TOKEN_INFO * CK_TOKEN_INFO_PTR;
typedef struct ck_session_info {
CK_SLOT_ID slot_id;
CK_STATE state;
@ -321,6 +366,9 @@ def build_ffi():
CK_RV C_GetSessionInfo(CK_SESSION_HANDLE, CK_SESSION_INFO_PTR);
CK_RV C_Login(CK_SESSION_HANDLE, CK_USER_TYPE, CK_UTF8CHAR_PTR,
CK_ULONG);
CK_RV C_GetSlotList(CK_BBOOL, CK_SLOT_ID_PTR, CK_ULONG_PTR);
CK_RV C_GetSlotInfo(CK_SLOT_ID, CK_SLOT_INFO_PTR);
CK_RV C_GetTokenInfo(CK_SLOT_ID, CK_TOKEN_INFO_PTR);
CK_RV C_GetAttributeValue(CK_SESSION_HANDLE, CK_OBJECT_HANDLE,
CK_ATTRIBUTE *, CK_ULONG);
CK_RV C_SetAttributeValue(CK_SESSION_HANDLE, CK_OBJECT_HANDLE,
@ -365,7 +413,9 @@ class PKCS11(object):
ffi=None, algorithm=None,
seed_random_buffer=None,
generate_iv=None, always_set_cka_sensitive=None,
hmac_keywrap_mechanism='CKM_SHA256_HMAC'):
hmac_keywrap_mechanism='CKM_SHA256_HMAC',
token_serial_number=None,
token_label=None):
if algorithm:
LOG.warning("WARNING: Using deprecated 'algorithm' argument.")
encryption_mechanism = encryption_mechanism or algorithm
@ -389,7 +439,10 @@ class PKCS11(object):
# Session options
self.login_passphrase = _to_bytes(login_passphrase)
self.rw_session = rw_session
self.slot_id = slot_id
self.slot_id = self._get_slot_id(
token_serial_number,
token_label,
slot_id)
# Algorithm options
self.algorithm = CKM_NAMES[encryption_mechanism]
@ -406,6 +459,72 @@ class PKCS11(object):
self._seed_random(session, seed_random_buffer)
self._rng_self_test(session)
self.return_session(session)
LOG.debug("Connected to PCKS11 sn: %s label: %s slot: %s",
token_serial_number, token_label, self.slot_id)
def _get_slot_id(self, token_serial_number, token_label, slot_id):
# First find out how many slots with tokens are available
slots_ptr = self.ffi.new("CK_ULONG_PTR")
rv = self.lib.C_GetSlotList(CK_TRUE, self.ffi.NULL, slots_ptr)
self._check_error(rv)
# Next get the Slot IDs for each of the available slots
slot_ids_ptr = self.ffi.new("CK_SLOT_ID[{}]".format(slots_ptr[0]))
rv = self.lib.C_GetSlotList(CK_TRUE, slot_ids_ptr, slots_ptr)
self._check_error(rv)
# Gather details from each token
tokens = list()
for id in slot_ids_ptr:
token_info_ptr = self.ffi.new("CK_TOKEN_INFO_PTR")
rv = self.lib.C_GetTokenInfo(id, token_info_ptr)
self._check_error(rv)
tokens.append(Token(
id,
self.ffi.string(token_info_ptr.label).decode("UTF-8").strip(),
self.ffi.string(
token_info_ptr.serialNumber
).decode("UTF-8").strip()
))
# Matching serial number gets highest priority
if token_serial_number:
for token in tokens:
if token.serial_number == token_serial_number:
LOG.debug("Found token sn: %s in slot %s",
token.serial_number,
token.slot_id)
if token_label:
LOG.warning(
"Ignoring token_label: %s from barbican.conf",
token_label
)
if slot_id:
LOG.warning("Ignoring slot_id: %s from barbican.conf",
slot_id)
return token.slot_id
raise ValueError("Token Serial Number not found.")
# Label match is next, raises an error if there's not exactly one match
if token_label:
matched = list(itertools.dropwhile(
lambda x: x.label != token_label,
tokens
))
if len(matched) > 1:
raise ValueError("More than one matching token label found")
if len(matched) < 1:
raise ValueError("Token Label not found.")
token = matched.pop()
LOG.debug("Found token label: %s in slot %s", token.label,
token.slot_id)
if slot_id:
LOG.warning("Ignoring slot_id: %s from barbican.conf", slot_id)
return token.slot_id
# If we got this far, slot_id was the only param given, so we return it
return slot_id
def get_session(self):
session = self._open_session(self.slot_id)

View File

@ -56,6 +56,8 @@ class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
self.cfg_mock.p11_crypto_plugin.hmac_label = 'hmac_label'
self.cfg_mock.p11_crypto_plugin.mkek_length = 32
self.cfg_mock.p11_crypto_plugin.slot_id = 1
self.cfg_mock.p11_crypto_plugin.token_serial_number = None
self.cfg_mock.p11_crypto_plugin.token_label = None
self.cfg_mock.p11_crypto_plugin.rw_session = True
self.cfg_mock.p11_crypto_plugin.pkek_length = 32
self.cfg_mock.p11_crypto_plugin.pkek_cache_ttl = 900
@ -288,6 +290,8 @@ class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
return pkcs11.CKR_OK
lib = mock.Mock()
lib.C_Initialize.return_value = pkcs11.CKR_OK
lib.C_GetSlotList.return_value = pkcs11.CKR_OK
lib.C_GetTokenInfo.return_value = pkcs11.CKR_OK
lib.C_OpenSession.return_value = pkcs11.CKR_OK
lib.C_CloseSession.return_value = pkcs11.CKR_OK
lib.C_GetSessionInfo.return_value = pkcs11.CKR_OK

View File

@ -30,6 +30,8 @@ class WhenTestingPKCS11(utils.BaseTestCase):
self.lib = mock.Mock()
self.lib.C_Initialize.return_value = pkcs11.CKR_OK
self.lib.C_Finalize.return_value = pkcs11.CKR_OK
self.lib.C_GetSlotList.side_effect = self._get_slot_list
self.lib.C_GetTokenInfo.side_effect = self._get_token_info
self.lib.C_OpenSession.side_effect = self._open_session
self.lib.C_CloseSession.return_value = pkcs11.CKR_OK
self.lib.C_GetSessionInfo.side_effect = self._get_session_user
@ -62,6 +64,10 @@ class WhenTestingPKCS11(utils.BaseTestCase):
self.cfg_mock.encryption_mechanism = 'CKM_AES_CBC'
self.cfg_mock.hmac_keywrap_mechanism = 'CKM_SHA256_HMAC'
self.token_mock = mock.MagicMock()
self.token_mock.label = b'myLabel'
self.token_mock.serial_number = b'111111'
self.pkcs11 = pkcs11.PKCS11(
self.cfg_mock.library_path, self.cfg_mock.login_passphrase,
self.cfg_mock.rw_session, self.cfg_mock.slot_id,
@ -74,6 +80,31 @@ class WhenTestingPKCS11(utils.BaseTestCase):
self.ffi.buffer(buf)[:] = b'0' * length
return pkcs11.CKR_OK
def _get_slot_list(self, token_present, slot_ids_ptr, slots_ptr):
# default to mocking only one slot (ID: 1)
if slot_ids_ptr is not self.ffi.NULL:
slot_ids_ptr[0] = 1
slots_ptr[0] = 1
return pkcs11.CKR_OK
def _get_token_info(self, id, token_info_ptr):
token_info_ptr.serialNumber = self.token_mock.serial_number
token_info_ptr.label = self.token_mock.label
return pkcs11.CKR_OK
def _get_two_slot_list(self, token_present, slot_ids_ptr, slots_ptr):
# mock two slots (IDs: 1, 2)
if slot_ids_ptr is not self.ffi.NULL:
slot_ids_ptr[0] = 1
slot_ids_ptr[1] = 2
slots_ptr[0] = 2
return pkcs11.CKR_OK
def _get_two_token_info_same_label(self, id, token_info_ptr):
token_info_ptr.serialNumber = (str(id) * 6).encode('UTF-8')
token_info_ptr.label = self.token_mock.label
return pkcs11.CKR_OK
def _get_session_public(self, session, session_info_ptr):
if self.cfg_mock.rw_session:
session_info_ptr[0].state = pkcs11.CKS_RW_PUBLIC_SESSION
@ -146,6 +177,45 @@ class WhenTestingPKCS11(utils.BaseTestCase):
def _verify(self, *args, **kwargs):
return pkcs11.CKR_OK
def test_get_slot_id_from_serial_number(self):
slot_id = self.pkcs11._get_slot_id('111111', None, 1)
self.assertEqual(1, slot_id)
def test_get_slot_id_from_label(self):
slot_id = self.pkcs11._get_slot_id(None, 'myLabel', 1)
self.assertEqual(1, slot_id)
def test_get_slot_id_backwards_compatibility(self):
slot_id = self.pkcs11._get_slot_id(None, None, 5)
self.assertEqual(5, slot_id)
def test_get_slot_id_from_serial_ignores_label(self):
slot_id = self.pkcs11._get_slot_id('111111', 'badLabel', 1)
self.assertEqual(1, slot_id)
def test_get_slot_id_from_serial_ignores_given_slot(self):
slot_id = self.pkcs11._get_slot_id('111111', None, 3)
self.assertEqual(1, slot_id)
def test_get_slot_id_from_label_ignores_given_slot(self):
slot_id = self.pkcs11._get_slot_id(None, 'myLabel', 3)
self.assertEqual(1, slot_id)
def test_get_slot_id_serial_not_found(self):
self.assertRaises(ValueError,
self.pkcs11._get_slot_id, '222222', None, 1)
def test_get_slot_id_label_not_found(self):
self.assertRaises(ValueError,
self.pkcs11._get_slot_id, None, 'badLabel', 1)
def test_get_slot_id_two_tokens_same_label(self):
self.lib.C_GetSlotList.side_effect = self._get_two_slot_list
self.lib.C_GetTokenInfo.side_effect = \
self._get_two_token_info_same_label
self.assertRaises(ValueError,
self.pkcs11._get_slot_id, None, 'myLabel', 1)
def test_public_get_session(self):
self.lib.C_GetSessionInfo.side_effect = self._get_session_public
sess = self.pkcs11.get_session()

View File

@ -98,6 +98,15 @@ The PKCS#11 plugin configuration looks like:
# Path to vendor PKCS11 library
library_path = '/usr/lib/libCryptoki2_64.so'
# Token serial number used to identify the token to be used. Required
# when the device has multiple tokens with the same label. (string
# value)
token_serial_number = 12345678
# Token label used to identify the token to be used. Required when
# token_serial_number is not specified. (string value)
#token_label = <None>
# Password to login to PKCS11 session
login = 'mypassword'
@ -110,8 +119,10 @@ The PKCS#11 plugin configuration looks like:
# Label to identify HMAC key in the HSM (must not be the same as MKEK label)
hmac_label = 'my_hmac_label'
# HSM Slot id (Should correspond to a configured PKCS11 slot). Default: 1
# slot_id = 1
# (Optional) HSM Slot ID that contains the token device to be used.
# (integer value)
#slot_id = 1
# Enable Read/Write session with the HSM?
# rw_session = True
@ -141,6 +152,15 @@ For a nCipher nShield Connect XC, the plugin configuration looks like:
# Path to vendor PKCS11 library
library_path = '/opt/nfast/toolkits/pkcs11/libcknfast.so'
# Token serial number used to identify the token to be used. Required
# when the device has multiple tokens with the same label. (string
# value)
token_serial_number = 12345678
# Token label used to identify the token to be used. Required when
# token_serial_number is not specified. (string value)
#token_label = <None>
# Password to login to PKCS11 session
login = 'XXX'
@ -153,7 +173,8 @@ For a nCipher nShield Connect XC, the plugin configuration looks like:
# Label to identify HMAC key in the HSM (must not be the same as MKEK label)
hmac_label = 'thales_hmac_0'
# HSM Slot id (Should correspond to a configured PKCS11 slot). Default: 1
# (Optional) HSM Slot ID that contains the token device to be used.
# (integer value)
# slot_id = 1
# Enable Read/Write session with the HSM?
@ -220,6 +241,15 @@ For an ATOS Bull HSM, the plugin configuration looks like:
# Path to vendor PKCS11 library
library_path = '/usr/lib64/libnethsm.so'
# Token serial number used to identify the token to be used. Required
# when the device has multiple tokens with the same label. (string
# value)
token_serial_number = 12345678
# Token label used to identify the token to be used. Required when
# token_serial_number is not specified. (string value)
#token_label = <None>
# Password to login to PKCS11 session
login = 'XXX'
@ -232,7 +262,8 @@ For an ATOS Bull HSM, the plugin configuration looks like:
# Label to identify HMAC key in the HSM (must not be the same as MKEK label)
hmac_label = 'atos_hmac_0'
# HSM Slot id (Should correspond to a configured PKCS11 slot). Default: 1
# (Optional) HSM Slot ID that contains the token device to be used.
# (integer value)
# slot_id = 1
# Enable Read/Write session with the HSM?
@ -293,6 +324,15 @@ The PKCS#11 plugin configuration looks like:
# Path to vendor PKCS11 library (string value)
library_path = '/opt/utimaco/lib/libcs_pkcs11_R2.so'
# Token serial number used to identify the token to be used. Required
# when the device has multiple tokens with the same label. (string
# value)
token_serial_number = 12345678
# Token label used to identify the token to be used. Required when
# token_serial_number is not specified. (string value)
#token_label = <None>
# Password to login to PKCS11 session (string value)
login = '$up3r$e<retP4ssw0rd'
@ -305,8 +345,9 @@ The PKCS#11 plugin configuration looks like:
# Master HMAC Key label (as stored in the HSM) (string value)
hmac_label = 'my_hmac_key'
# HSM Slot ID (integer value)
slot_id = 0
# (Optional) HSM Slot ID that contains the token device to be used.
# (integer value)
# slot_id = 1
# Flag for Read/Write Sessions (boolean value)
#rw_session = true

View File

@ -0,0 +1,14 @@
---
features:
- |
Added two options for the PKCS#11 Crypto Plugin:
`[p11_crypto_plugin]/token_serial_number` and
`[p11_crypto_plugin]/token_label`. Both are optional and can be used
instead of `[p11_crypto_plugin]/slot_id` to identify the Token to be
used by the PKCS#11 plugin. When either one of the new options is defined
the plugin will search all slots on the PKCS#11 device for a token that
matches the given value. `token_serial_number` has the highest precendence
and other values will be ignored when this value is set. If
`token_serial_number` is not set, then `token_label` has
the next highest precedence and `slot_id` will be ignored.
`slot_id` will be used when neither one of the new options is set.