Use barbican.conf in barbican-manage

This patch updates the hsm subcommand in barbican-manage to read
any required values from barbican.conf.  Users may continue to
specify those values as parameters in the command line, and those
values will take precedence over values in barbican.conf.

Existing scripts that call barbican-manage should continue to work
as expected as the values passed will be used instead of looking
into barbican.conf.

Change-Id: I4e86e73bbdef0e16d3699cec1cc8f7e17dfb643b
(cherry picked from commit 666034475a)
This commit is contained in:
Douglas Mendizábal 2020-10-22 15:05:33 -05:00
parent 601f5ec733
commit 606ff8e77b
4 changed files with 217 additions and 83 deletions

View File

@ -32,6 +32,7 @@ from barbican.common import config
from barbican.model import clean
from barbican.model.migration import commands
from barbican.model import sync
from barbican.plugin.crypto import p11_crypto
from barbican.plugin.crypto import pkcs11
import barbican.version
@ -71,8 +72,8 @@ class DbCommands(object):
@args('--soft-delete-expired-secrets', '-e', action='store_true',
dest='do_soft_delete_expired_secrets', default=False,
help='Soft delete secrets that are expired.')
def clean(self, dburl=None, min_days=None, verbose=None, log_file=None,
do_clean_unassociated_projects=None,
def clean(self, conf, dburl=None, min_days=None, verbose=None,
log_file=None, do_clean_unassociated_projects=None,
do_soft_delete_expired_secrets=None):
"""Clean soft deletions in the database"""
if dburl is None:
@ -96,7 +97,7 @@ class DbCommands(object):
help='the message for the DB change')
@args('--autogenerate', action="store_true", dest='autogen',
default=False, help='autogenerate from models')
def revision(self, dburl=None, message=None, autogen=None):
def revision(self, conf, dburl=None, message=None, autogen=None):
"""Process the 'revision' Alembic command."""
if dburl is None:
commands.generate(autogenerate=autogen, message=str(message),
@ -112,7 +113,7 @@ class DbCommands(object):
@args('--version', '-v', metavar='<version>', default='head',
help='the version to upgrade to, or else '
'the latest/head if not specified.')
def upgrade(self, dburl=None, version=None):
def upgrade(self, conf, dburl=None, version=None):
"""Process the 'upgrade' Alembic command."""
if dburl is None:
commands.upgrade(to_version=str(version),
@ -126,7 +127,7 @@ class DbCommands(object):
help='barbican database URL')
@args('--verbose', '-V', action='store_true', dest='verbose',
default=False, help='Show full information about the revisions.')
def history(self, dburl=None, verbose=None):
def history(self, conf, dburl=None, verbose=None):
if dburl is None:
commands.history(verbose, sql_url=CONF.sql_connection)
else:
@ -138,7 +139,7 @@ class DbCommands(object):
help='barbican database URL')
@args('--verbose', '-V', action='store_true', dest='verbose',
default=False, help='Show full information about the revisions.')
def current(self, dburl=None, verbose=None):
def current(self, conf, dburl=None, verbose=None):
if dburl is None:
commands.current(verbose, sql_url=CONF.sql_connection)
else:
@ -154,7 +155,8 @@ class DbCommands(object):
dest='log_file',
help='Set log file location. '
'Default value for log_file can be found in barbican.conf')
def sync_secret_stores(self, dburl=None, verbose=None, log_file=None):
def sync_secret_stores(self, conf, dburl=None, verbose=None,
log_file=None):
"""Sync secret_stores table with barbican.conf"""
if dburl is None:
dburl = CONF.sql_connection
@ -170,61 +172,66 @@ class DbCommands(object):
class HSMCommands(object):
"""Class for managing HSM/pkcs11 plugin"""
_CKK_AES = 'CKK_AES'
description = "Subcommands for managing HSM/PKCS11"
check_mkek_description = "Checks if a MKEK label is available"
@args('--library-path', metavar='<library-path>', dest='libpath',
default='/usr/lib/libCryptoki2_64.so',
help='Path to vendor PKCS11 library')
@args('--slot-id', metavar='<slot-id>', dest='slotid', default=1,
help='HSM Slot id (Should correspond to a configured PKCS11 slot, \
default is 1)')
@args('--passphrase', metavar='<passphrase>', default=None, required=True,
help='Password to login to PKCS11 session')
@args('--label', '-L', metavar='<label>', default='primarymkek',
help='The label of the Master Key Encrypt Key')
help='Path to vendor PKCS#11 library')
@args('--slot-id', metavar='<slot-id>', dest='slotid',
help='HSM Slot ID containing Token to be used.')
@args('--passphrase', metavar='<passphrase>',
help='Password (PIN) to login to PKCS#11 Token')
@args('--label', '-L', metavar='<label>',
help='The label of the Master Key Encryption Key')
@args('--hmac-wrap-mechanism', metavar='<hmac key wrap mechanism>',
dest='hmacwrap', default='CKM_SHA256_HMAC',
help='HMAC Key wrap mechanism, default is CKM_SHA256_HMAC')
def check_mkek(self, passphrase, libpath=None, slotid=None, label=None,
hmacwrap=None):
CKK_AES = 'CKK_AES'
self._create_pkcs11_session(str(passphrase), str(libpath),
int(slotid), str(hmacwrap))
handle = self.pkcs11.get_key_handle(CKK_AES, str(label), self.session)
dest='hmacwrap',
help='HMAC Key wrap mechanism')
def check_mkek(self, conf, passphrase=None, libpath=None, slotid=None,
label=None, hmacwrap=None):
self._create_pkcs11_session(conf, passphrase, libpath, slotid,
hmacwrap)
if label is None:
label = conf.p11_crypto_plugin.mkek_label
handle = self.pkcs11.get_key_handle(self._CKK_AES, label, self.session)
self.pkcs11.return_session(self.session)
if not handle:
print("Label {label} is not set.".format(label=label))
sys.exit(1)
print("Key labeled {} found!".format(label))
gen_mkek_description = "Generates a new MKEK"
@args('--library-path', metavar='<library-path>', dest='libpath',
default='/usr/lib/libCryptoki2_64.so',
help='Path to vendor PKCS11 library')
@args('--slot-id', metavar='<slot-id>', dest='slotid', default=1,
help='HSM Slot id (Should correspond to a configured PKCS11 slot, \
default is 1)')
@args('--passphrase', metavar='<passphrase>', default=None, required=True,
help='Password to login to PKCS11 session')
@args('--label', '-L', metavar='<label>', default='primarymkek',
help='The label of the Master Key Encrypt Key')
@args('--length', '-l', metavar='<length>', default=32,
@args('--slot-id', metavar='<slot-id>', dest='slotid',
help='HSM Slot ID containing Token to be used.')
@args('--passphrase', metavar='<passphrase>',
help='Password (PIN) to login to PKCS#11 Token')
@args('--label', '-L', metavar='<label>',
help='The label of the Master Key Encryption Key')
@args('--length', '-l', metavar='<length>',
help='The length in bytes of the Master Key Encryption Key'
' (default is 32)')
@args('--hmac-wrap-mechanism', metavar='<hmac key wrap mechanism>',
dest='hmacwrap', default='CKM_SHA256_HMAC',
dest='hmacwrap',
help='HMAC Key wrap mechanism, default is CKM_SHA256_HMAC')
def gen_mkek(self, passphrase, libpath=None, slotid=None, label=None,
length=None, hmacwrap=None):
CKK_AES = 'CKK_AES'
def gen_mkek(self, conf, passphrase=None, libpath=None, slotid=None,
label=None, length=None, hmacwrap=None):
CKM_AES_KEY_GEN = 'CKM_AES_KEY_GEN'
self._create_pkcs11_session(str(passphrase), str(libpath),
int(slotid), str(hmacwrap))
self._verify_label_does_not_exist(CKK_AES, str(label), self.session)
self.pkcs11.generate_key(CKK_AES, int(length), CKM_AES_KEY_GEN,
self.session, str(label),
self._create_pkcs11_session(conf, passphrase, libpath, slotid,
hmacwrap)
if label is None:
label = conf.p11_crypto_plugin.mkek_label or 'primarymkek'
self._verify_label_does_not_exist(self._CKK_AES, label, self.session)
if length is None:
length = conf.p11_crypto_plugin.mkek_length or 32
if type(length) is not int:
length = int(length)
self.pkcs11.generate_key(self._CKK_AES, length, CKM_AES_KEY_GEN,
self.session, label,
encrypt=True, wrap=True, master_key=True)
self.pkcs11.return_session(self.session)
print("MKEK successfully generated!")
@ -232,61 +239,72 @@ class HSMCommands(object):
check_hmac_description = "Checks if a HMAC key label is available"
@args('--library-path', metavar='<library-path>', dest='libpath',
default='/usr/lib/libCryptoki2_64.so',
help='Path to vendor PKCS11 library')
@args('--slot-id', metavar='<slot-id>', dest='slotid', default=1,
help='HSM Slot id (Should correspond to a configured PKCS11 slot, \
default is 1)')
@args('--passphrase', metavar='<passphrase>', default=None, required=True,
help='Password to login to PKCS11 session')
@args('--label', '-L', metavar='<label>', default='primarymkek',
help='Path to vendor PKCS#11 library')
@args('--slot-id', metavar='<slot-id>', dest='slotid',
help='HSM Slot ID containing Token to be used.')
@args('--passphrase', metavar='<passphrase>',
help='Password (PIN) to login to PKCS#11 Token')
@args('--label', '-L', metavar='<label>',
help='The label of the Master HMAC key')
@args('--key-type', '-t', metavar='<key type>', dest='keytype',
default='CKK_AES', help='The HMAC Key Type (e.g. CKK_AES)')
help='The HMAC Key Type (e.g. CKK_AES)')
@args('--hmac-wrap-mechanism', metavar='<hmac key wrap mechanism>',
dest='hmacwrap', default='CKM_SHA256_HMAC',
help='HMAC Key wrap mechanism, default is CKM_SHA256_HMAC')
def check_hmac(self, passphrase, libpath=None, slotid=None, label=None,
keytype=None, hmacwrap=None):
self._create_pkcs11_session(str(passphrase), str(libpath),
int(slotid), str(hmacwrap))
handle = self.pkcs11.get_key_handle(str(keytype), str(label),
self.session)
dest='hmacwrap',
help='HMAC Key wrap mechanism')
def check_hmac(self, conf, passphrase=None, libpath=None, slotid=None,
label=None, keytype=None, hmacwrap=None):
self._create_pkcs11_session(conf, passphrase, libpath, slotid,
hmacwrap)
if label is None:
label = conf.p11_crypto_plugin.hmac_label
if keytype is None:
keytype = conf.p11_crypto_plugin.hmac_key_type
handle = self.pkcs11.get_key_handle(keytype, label, self.session)
self.pkcs11.return_session(self.session)
if not handle:
print("Label {label} is not set.".format(label=label))
sys.exit(1)
print("Key labeled {} found!".format(label))
gen_hmac_description = "Generates a new HMAC key"
@args('--library-path', metavar='<library-path>', dest='libpath',
default='/usr/lib/libCryptoki2_64.so',
help='Path to vendor PKCS11 library')
@args('--slot-id', metavar='<slot-id>', dest='slotid', default=1,
help='HSM Slot id (Should correspond to a configured PKCS11 slot, \
default is 1)')
@args('--passphrase', metavar='<passphrase>', default=None, required=True,
help='Password to login to PKCS11 session')
@args('--label', '-L', metavar='<label>', default='primarymkek',
@args('--slot-id', metavar='<slot-id>', dest='slotid',
help='HSM Slot ID containing Token to be used.')
@args('--passphrase', metavar='<passphrase>',
help='Password (PIN) to login to PKCS#11 Token')
@args('--label', '-L', metavar='<label>',
help='The label of the Master HMAC Key')
@args('--key-type', '-t', metavar='<key type>', dest='keytype',
default='CKK_AES', help='The HMAC Key Type (e.g. CKK_AES)')
@args('--length', '-l', metavar='<length>', default=32,
help='The HMAC Key Type (e.g. CKK_AES)')
@args('--length', '-l', metavar='<length>',
help='The length in bytes of the Master HMAC Key (default is 32)')
@args('--mechanism', '-m', metavar='<mechanism>',
default='CKM_AES_KEY_GEN', help='The HMAC Key Generation mechanism')
help='The HMAC Key Generation mechanism')
@args('--hmac-wrap-mechanism', metavar='<hmac key wrap mechanism>',
dest='hmacwrap', default='CKM_SHA256_HMAC',
dest='hmacwrap',
help='HMAC Key wrap mechanism, default is CKM_SHA256_HMAC')
def gen_hmac(self, passphrase, libpath=None, slotid=None, label=None,
keytype=None, mechanism=None, length=None, hmacwrap=None):
self._create_pkcs11_session(str(passphrase), str(libpath), int(slotid),
str(hmacwrap))
self._verify_label_does_not_exist(str(keytype), str(label),
self.session)
self.pkcs11.generate_key(str(keytype), int(length), str(mechanism),
self.session, str(label),
sign=True, master_key=True)
def gen_hmac(self, conf, passphrase=None, libpath=None, slotid=None,
label=None, keytype=None, mechanism=None, length=None,
hmacwrap=None):
self._create_pkcs11_session(conf, passphrase, libpath, slotid,
hmacwrap)
if label is None:
label = conf.p11_crypto_plugin.hmac_label or 'primaryhmac'
if keytype is None:
keytype = conf.p11_crypto_plugin.hmac_key_type
self._verify_label_does_not_exist(keytype, label, self.session)
if length is None:
# barbican.conf doesn't have an HMAC length
length = 32 # bytes
elif type(length) is not int:
length = int(length)
if mechanism is None:
mechanism = conf.p11_crypto_plugin.hmac_keygen_mechanism
self.pkcs11.generate_key(keytype, length, mechanism, self.session,
label, sign=True, master_key=True)
self.pkcs11.return_session(self.session)
print("HMAC successfully generated!")
@ -294,18 +312,31 @@ class HSMCommands(object):
@args('--dry-run', action="store_true", dest='dryrun', default=False,
help='Displays changes that will be made (Non-destructive)')
def rewrap_pkek(self, dryrun=None):
def rewrap_pkek(self, conf, dryrun=None):
rewrapper = pkcs11_rewrap.KekRewrap(pkcs11_rewrap.CONF)
rewrapper.execute(dryrun)
rewrapper.pkcs11.return_session(rewrapper.hsm_session)
def _create_pkcs11_session(self, passphrase, libpath, slotid,
def _create_pkcs11_session(self, conf, passphrase, libpath, slotid,
hmacwrap):
if passphrase is None:
passphrase = conf.p11_crypto_plugin.login
if libpath is None:
libpath = conf.p11_crypto_plugin.library_path
if slotid is None:
slotid = conf.p11_crypto_plugin.slot_id
elif type(slotid) is not int:
slotid = int(slotid)
if hmacwrap is None:
hmacwrap = conf.p11_crypto_plugin.hmac_keywrap_mechanism
self.pkcs11 = pkcs11.PKCS11(
library_path=libpath, login_passphrase=passphrase,
rw_session=True, slot_id=slotid,
encryption_mechanism='CKM_AES_CBC',
hmac_keywrap_mechanism=hmacwrap
hmac_keywrap_mechanism=hmacwrap,
token_serial_number=conf.p11_crypto_plugin.token_serial_number,
token_label=conf.p11_crypto_plugin.token_label
)
self.session = self.pkcs11.get_session()
@ -386,6 +417,7 @@ def main():
"""Parse options and call the appropriate class/method."""
CONF = config.new_config()
CONF.register_cli_opt(category_opt)
p11_crypto.register_opts(CONF)
try:
logging.register_options(CONF)
@ -415,7 +447,7 @@ def main():
# call the action with the remaining arguments
try:
return fn(*fn_args, **fn_kwargs)
return fn(CONF, *fn_args, **fn_kwargs)
except Exception as e:
sys.exit("ERROR: %s" % e)

View File

@ -107,6 +107,11 @@ def list_opts():
yield p11_crypto_plugin_group, p11_crypto_plugin_opts
def register_opts(conf):
for group, options in list_opts():
conf.register_opts(options, group)
def json_dumps_compact(data):
return json.dumps(data, separators=(',', ':'))

View File

@ -161,10 +161,42 @@ class TestBarbicanManage(TestBarbicanManageBase):
mock_pkcs11.return_value.get_session.return_value = 1
mock_pkcs11.return_value.get_key_handle.return_value = 1
mock_getkey = mock_pkcs11.return_value.get_key_handle
self._main_test_helper(
['barbican.cmd.barbican_manage', 'hsm', 'check_mkek',
'--library-path', 'mocklib', '--passphrase', 'mockpassewd',
'--slot', '0', '--label', 'mocklabel'], mock_getkey, 'CKK_AES',
'mocklabel', 1)
@mock.patch('barbican.plugin.crypto.pkcs11.PKCS11')
def test_hsm_check_mkek_no_label(self, mock_pkcs11):
mock_pkcs11.return_value.get_session.return_value = 1
mock_pkcs11.return_value.get_key_handle.return_value = 1
mock_getkey = mock_pkcs11.return_value.get_key_handle
self._main_test_helper(
['barbican.cmd.barbican_manage', 'hsm', 'check_mkek',
'--library-path', 'mocklib', '--passphrase', 'mockpassewd'],
mock_getkey, 'CKK_AES', None, 1)
@mock.patch('barbican.plugin.crypto.pkcs11.PKCS11')
def test_hsm_check_mkek_defaults(self, mock_pkcs11):
mock_pkcs11.return_value.get_session.return_value = 1
mock_pkcs11.return_value.get_key_handle.return_value = 1
mock_getkey = mock_pkcs11.return_value.get_key_handle
self._main_test_helper(
['barbican.cmd.barbican_manage', 'hsm', 'check_mkek'],
mock_getkey, 'CKK_AES', None, 1)
@mock.patch('barbican.plugin.crypto.pkcs11.PKCS11')
@mock.patch('sys.exit')
def test_hsm_check_mkek_not_found(self, mock_exit, mock_pkcs11):
mock_pkcs11.return_value.get_session.return_value = 1
mock_pkcs11.return_value.get_key_handle.return_value = None
mock_getkey = mock_pkcs11.return_value.get_key_handle
self._main_test_helper(
['barbican.cmd.barbican_manage', 'hsm', 'check_mkek',
'--library-path', 'mocklib', '--passphrase', 'mockpassewd',
'--label', 'mocklabel'], mock_getkey, 'CKK_AES', 'mocklabel', 1)
mock_exit.assert_called_once_with(1)
@mock.patch('barbican.plugin.crypto.pkcs11.PKCS11')
def test_hsm_gen_mkek(self, mock_pkcs11):
@ -179,6 +211,19 @@ class TestBarbicanManage(TestBarbicanManageBase):
32, 'CKM_AES_KEY_GEN', 1, 'mocklabel', encrypt=True, wrap=True,
master_key=True)
@mock.patch('barbican.plugin.crypto.pkcs11.PKCS11')
def test_hsm_gen_mkek_default_label(self, mock_pkcs11):
mock_pkcs11.return_value.get_session.return_value = 1
mock_pkcs11.return_value.get_key_handle.return_value = None
mock_pkcs11.return_value.generate_key.return_value = 0
mock_genkey = mock_pkcs11.return_value.generate_key
self._main_test_helper(
['barbican.cmd.barbican_manage', 'hsm', 'gen_mkek',
'--library-path', 'mocklib', '--passphrase', 'mockpassewd'],
mock_genkey, 'CKK_AES',
32, 'CKM_AES_KEY_GEN', 1, 'primarymkek', encrypt=True, wrap=True,
master_key=True)
@mock.patch('barbican.plugin.crypto.pkcs11.PKCS11')
def test_hsm_gen_hmac(self, mock_pkcs11):
mock_pkcs11.return_value.get_session.return_value = 1
@ -214,6 +259,28 @@ class TestBarbicanManage(TestBarbicanManageBase):
'--library-path', 'mocklib', '--passphrase', 'mockpassewd',
'--label', 'mocklabel'], mock_getkey, 'CKK_AES', 'mocklabel', 1)
@mock.patch('barbican.plugin.crypto.pkcs11.PKCS11')
def test_hsm_check_hmac_no_label(self, mock_pkcs11):
mock_pkcs11.return_value.get_session.return_value = 1
mock_pkcs11.return_value.get_key_handle.return_value = 1
mock_getkey = mock_pkcs11.return_value.get_key_handle
self._main_test_helper(
['barbican.cmd.barbican_manage', 'hsm', 'check_hmac',
'--library-path', 'mocklib', '--passphrase', 'mockpassewd'],
mock_getkey, 'CKK_AES', None, 1)
@mock.patch('barbican.plugin.crypto.pkcs11.PKCS11')
@mock.patch('sys.exit')
def test_hsm_check_hmac_not_found(self, mock_exit, mock_pkcs11):
mock_pkcs11.return_value.get_session.return_value = 1
mock_pkcs11.return_value.get_key_handle.return_value = None
mock_getkey = mock_pkcs11.return_value.get_key_handle
self._main_test_helper(
['barbican.cmd.barbican_manage', 'hsm', 'check_hmac',
'--library-path', 'mocklib', '--passphrase', 'mockpassewd',
'--label', 'mocklabel'], mock_getkey, 'CKK_AES', 'mocklabel', 1)
mock_exit.assert_called_once_with(1)
@mock.patch('barbican.plugin.crypto.pkcs11.PKCS11')
def test_hsm_gen_hmac_non_default_length(self, mock_pkcs11):
mock_pkcs11.return_value.get_session.return_value = 1
@ -226,3 +293,23 @@ class TestBarbicanManage(TestBarbicanManageBase):
'--passphrase', 'mockpassewd', '--label', 'mocklabel'],
mock_genkey, 'CKK_AES', 48, 'CKM_AES_KEY_GEN', 1, 'mocklabel',
sign=True, master_key=True)
@mock.patch('barbican.plugin.crypto.pkcs11.PKCS11')
def test_hsm_gen_hmac_default_label(self, mock_pkcs11):
mock_pkcs11.return_value.get_session.return_value = 1
mock_pkcs11.return_value.get_key_handle.return_value = None
mock_pkcs11.return_value.generate_key.return_value = 0
mock_genkey = mock_pkcs11.return_value.generate_key
self._main_test_helper(
['barbican.cmd.barbican_manage', 'hsm', 'gen_hmac',
'--library-path', 'mocklib', '--passphrase', 'mockpassewd'],
mock_genkey, 'CKK_AES',
32, 'CKM_AES_KEY_GEN', 1, 'primaryhmac', sign=True,
master_key=True)
@mock.patch('barbican.cmd.barbican_manage.pkcs11_rewrap')
def test_rewrap_pkek(self, mock_rewrap):
mock_execute = mock_rewrap.KekRewrap.return_value.execute
self._main_test_helper(
['barbican.cmd.barbican_manage', 'hsm', 'rewrap_pkek',
'--dry-run'], mock_execute, True)

View File

@ -0,0 +1,10 @@
---
features:
- |
The hsm subcommand for the barbican-manage command line tool no longer
requires any parameters at run time. If any value used by the PKCS#11
value is needed it will be taken from /etc/barbican/barbican.conf.
You may continue to specify any values on the command line, and those
will take precedence over the values specified in barbican.conf, so any
existing scripts that use barbican-manage should continue to work as
expected.