Changes for multiple backend conf and friendly plugin names (Part 2)

Added feature flag to enable/disable multiple backend support.

Added friendly plugin name in secretstore and crypto plugins conf.

Modified way of defining multiple plugin conf. With multiple backend
support, now plugins supported and global default plugin is defined
explictly in service configuration. With feature flag on, plugins
to be loaded are identified via reading new configuration structure.

Change-Id: I3c761c5a441f42c9f54a67d72514e8c1357a2e6a
Partially-Implements: blueprint multiple-secret-backend
This commit is contained in:
Arun Kant 2016-08-01 12:29:59 -07:00
parent 14ae36d3e5
commit f4141861ef
20 changed files with 648 additions and 15 deletions

View File

@ -264,3 +264,20 @@ def set_middleware_defaults():
CONF = new_config()
LOG = logging.getLogger(__name__)
parse_args(CONF)
# Adding global scope dict for all different configs created in various
# modules. In barbican, each plugin module creates its own *new* config
# instance so its error prone to share/access config values across modules
# as these module imports introduce a cyclic dependency. To avoid this, each
# plugin can set this dict after its own config instance is created and parsed.
_CONFIGS = {}
def set_module_config(name, module_conf):
"""Each plugin can set its own conf instance with its group name."""
_CONFIGS[name] = module_conf
def get_module_config(name):
"""Get handle to plugin specific config instance by its group name."""
return _CONFIGS[name]

View File

@ -530,3 +530,36 @@ class P11CryptoKeyHandleException(PKCS11Exception):
class P11CryptoTokenException(PKCS11Exception):
message = u._("No token was found in slot %(slot_id)s")
class MultipleSecretStoreLookupFailed(BarbicanException):
"""Raised when a plugin lookup suffix is missing during config read."""
def __init__(self):
msg = u._("Plugin lookup property 'stores_lookup_suffix' is not "
"defined in service configuration")
super(MultipleSecretStoreLookupFailed, self).__init__(msg)
class MultipleStoreIncorrectGlobalDefault(BarbicanException):
"""Raised when a plugin lookup is missing or failed during config read."""
def __init__(self, occurence):
msg = None
if occurence > 1:
msg = u._("There are {count} plugins with global default as "
"True in service configuration. Only one plugin can have"
" this as True").format(count=occurence)
else:
msg = u._("There is no plugin defined with global default as True."
" One of plugin must be identified as global default")
super(MultipleStoreIncorrectGlobalDefault, self).__init__(msg)
class MultipleStorePluginValueMissing(BarbicanException):
"""Raised when a store plugin value is missing in service configuration."""
def __init__(self, section_name):
super(MultipleStorePluginValueMissing, self).__init__(
u._("In section '{0}', secret_store_plugin value is missing"
).format(section_name)
)
self.section_name = section_name

View File

@ -36,6 +36,13 @@ CONF = config.CONF
# Current API version
API_VERSION = 'v1'
# Added here to remove cyclic dependency.
# In barbican.model.models module SecretType.OPAQUE was imported from
# barbican.plugin.interface.secret_store which introduces a cyclic dependency
# if `secret_store` plugin needs to use db model classes. So moving shared
# value to another common python module which is already imported in both.
SECRET_TYPE_OPAQUE = "opaque"
def _do_allow_certain_content_types(func, content_types_list=[]):
# Allows you to bypass pecan's content-type restrictions
@ -176,3 +183,8 @@ def get_class_for(module_name, class_name):
def generate_uuid():
return str(uuid.uuid4())
def is_multiple_backends_enabled():
secretstore_conf = config.get_module_config('secretstore')
return secretstore_conf.secretstore.enable_multiple_secret_stores

View File

@ -31,7 +31,6 @@ from sqlalchemy import types as sql_types
from barbican.common import exception
from barbican.common import utils
from barbican import i18n as u
from barbican.plugin.interface import secret_store
LOG = utils.getLogger(__name__)
BASE = declarative.declarative_base()
@ -277,7 +276,7 @@ class Secret(BASE, SoftDeleteMixIn, ModelBase):
name = sa.Column(sa.String(255))
secret_type = sa.Column(sa.String(255),
server_default=secret_store.SecretType.OPAQUE)
server_default=utils.SECRET_TYPE_OPAQUE)
expiration = sa.Column(sa.DateTime, default=None)
algorithm = sa.Column(sa.String(255))
bit_length = sa.Column(sa.Integer)
@ -317,7 +316,7 @@ class Secret(BASE, SoftDeleteMixIn, ModelBase):
self.name = parsed_request.get('name')
self.secret_type = parsed_request.get(
'secret_type',
secret_store.SecretType.OPAQUE)
utils.SECRET_TYPE_OPAQUE)
expiration = self._iso_to_datetime(parsed_request.get
('expiration'))
self.expiration = expiration

View File

@ -240,6 +240,18 @@ class CryptoPluginBase(object):
persist the data that is assigned to these DTOs by the plugin.
"""
@abc.abstractmethod
def get_plugin_name(self):
"""Gets user friendly plugin name.
This plugin name is expected to be read from config file.
There will be a default defined for plugin name which can be customized
in specific deployment if needed.
This name needs to be unique across a deployment.
"""
raise NotImplementedError # pragma: no cover
@abc.abstractmethod
def encrypt(self, encrypt_dto, kek_meta_dto, project_id):
"""Encryption handler function

View File

@ -20,6 +20,7 @@ from barbican.common import utils
from barbican import i18n as u
from barbican.plugin.crypto import crypto
from barbican.plugin.interface import secret_store
from barbican.plugin.util import multiple_backends
from barbican.plugin.util import utils as plugin_utils
@ -47,6 +48,8 @@ CONF.register_group(crypto_opt_group)
CONF.register_opts(crypto_opts, group=crypto_opt_group)
config.parse_args(CONF)
config.set_module_config("crypto", CONF)
class _CryptoPluginManager(named.NamedExtensionManager):
def __init__(self, conf=CONF, invoke_args=(), invoke_kwargs={}):
@ -57,12 +60,16 @@ class _CryptoPluginManager(named.NamedExtensionManager):
initializing a new instance of this class use the PLUGIN_MANAGER
at the module level.
"""
crypto_conf = config.get_module_config('crypto')
plugin_names = self._get_internal_plugin_names(crypto_conf)
super(_CryptoPluginManager, self).__init__(
conf.crypto.namespace,
conf.crypto.enabled_crypto_plugins,
crypto_conf.crypto.namespace,
plugin_names,
invoke_on_load=False, # Defer creating plugins to utility below.
invoke_args=invoke_args,
invoke_kwds=invoke_kwargs
invoke_kwds=invoke_kwargs,
name_order=True # extensions sorted as per order of plugin names
)
plugin_utils.instantiate_plugins(
@ -111,6 +118,22 @@ class _CryptoPluginManager(named.NamedExtensionManager):
return decrypting_plugin
def _get_internal_plugin_names(self, crypto_conf):
"""Gets plugin names used for loading via stevedore.
When multiple secret store support is enabled, then crypto plugin names
are read via updated configuration structure. If not enabled, then it
reads MultiStr property in 'crypto' config section.
"""
if utils.is_multiple_backends_enabled():
parsed_stores = multiple_backends.read_multiple_backends_config()
plugin_names = [store.crypto_plugin for store in parsed_stores
if store.crypto_plugin]
else:
plugin_names = crypto_conf.crypto.enabled_crypto_plugins
return plugin_names
def get_manager():
"""Return a singleton crypto plugin manager."""

View File

@ -69,6 +69,9 @@ p11_crypto_plugin_opts = [
cfg.IntOpt('seed_length',
help=u._('Amount of data to read from file for seed'),
default=32),
cfg.StrOpt('plugin_name',
help=u._('User friendly plugin name'),
default='PKCS11 HSM'),
]
CONF.register_group(p11_crypto_plugin_group)
CONF.register_opts(p11_crypto_plugin_opts, group=p11_crypto_plugin_group)
@ -104,6 +107,9 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
self._configure_object_cache()
def get_plugin_name(self):
return self.conf.p11_crypto_plugin.plugin_name
def encrypt(self, encrypt_dto, kek_meta_dto, project_id):
return self._call_pkcs11(self._encrypt, encrypt_dto, kek_meta_dto,
project_id)

View File

@ -34,7 +34,10 @@ simple_crypto_plugin_opts = [
cfg.StrOpt('kek',
default='dGhpcnR5X3R3b19ieXRlX2tleWJsYWhibGFoYmxhaGg=',
help=u._('Key encryption key to be used by Simple Crypto '
'Plugin'), secret=True)
'Plugin'), secret=True),
cfg.StrOpt('plugin_name',
help=u._('User friendly plugin name'),
default='Software Only Crypto'),
]
CONF.register_group(simple_crypto_plugin_group)
CONF.register_opts(simple_crypto_plugin_opts, group=simple_crypto_plugin_group)
@ -46,11 +49,15 @@ class SimpleCryptoPlugin(c.CryptoPluginBase):
def __init__(self, conf=CONF):
self.master_kek = conf.simple_crypto_plugin.kek
self.plugin_name = conf.simple_crypto_plugin.plugin_name
LOG.warning(u._LW("This plugin is NOT meant for a production "
"environment. This is meant just for development "
"and testing purposes. Please use another plugin "
"for production."))
def get_plugin_name(self):
return self.plugin_name
def _get_kek(self, kek_meta_dto):
if not kek_meta_dto.plugin_meta:
raise ValueError(u._('KEK not yet created.'))

View File

@ -74,7 +74,10 @@ dogtag_plugin_opts = [
default=cm.CA_INFO_DEFAULT_EXPIRATION_DAYS,
help=u._('Time in days for CA entries to expire')),
cfg.StrOpt('plugin_working_dir',
help=u._('Working directory for Dogtag plugin'))
help=u._('Working directory for Dogtag plugin')),
cfg.StrOpt('plugin_name',
help=u._('User friendly plugin name'),
default='Dogtag KRA'),
]
CONF.register_group(dogtag_plugin_group)
@ -203,9 +206,13 @@ class DogtagKRAPlugin(sstore.SecretStoreBase):
self.keyclient = kraclient.keys
self.keyclient.set_transport_cert(KRA_TRANSPORT_NICK)
self.plugin_name = conf.dogtag_plugin.plugin_name
LOG.debug(u._("completed DogtagKRAPlugin init"))
def get_plugin_name(self):
return self.plugin_name
def store_secret(self, secret_dto):
"""Store a secret in the KRA

View File

@ -23,6 +23,7 @@ from barbican.common import config
from barbican.common import exception
from barbican.common import utils
from barbican import i18n as u
from barbican.plugin.util import multiple_backends
from barbican.plugin.util import utils as plugin_utils
@ -42,12 +43,24 @@ store_opts = [
cfg.MultiStrOpt('enabled_secretstore_plugins',
default=DEFAULT_PLUGINS,
help=u._('List of secret store plugins to load.')
)
),
cfg.BoolOpt('enable_multiple_secret_stores',
default=False,
help=u._('Flag to enable multiple secret store plugin'
' backend support. Default is False')
),
cfg.ListOpt('stores_lookup_suffix',
default=None,
help=u._('List of suffix to use for looking up plugins which '
'are supported with multiple backend support.')
)
]
CONF.register_group(store_opt_group)
CONF.register_opts(store_opts, group=store_opt_group)
config.parse_args(CONF)
config.set_module_config("secretstore", CONF)
class SecretStorePluginNotFound(exception.BarbicanHTTPException):
"""Raised when no plugins are installed."""
@ -238,7 +251,7 @@ class SecretType(object):
opaque data. Opaque data can be any kind of data. This data type signals to
Barbican to just store the information and do not worry about the format or
encoding. This is the default type if no type is specified by the user."""
OPAQUE = "opaque"
OPAQUE = utils.SECRET_TYPE_OPAQUE
class KeyAlgorithm(object):
@ -345,6 +358,18 @@ class AsymmetricKeyMetadataDTO(object):
@six.add_metaclass(abc.ABCMeta)
class SecretStoreBase(object):
@abc.abstractmethod
def get_plugin_name(self):
"""Gets user friendly plugin name.
This plugin name is expected to be read from config file.
There will be a default defined for plugin name which can be customized
in specific deployment if needed.
This name needs to be unique across a deployment.
"""
raise NotImplementedError # pragma: no cover
@abc.abstractmethod
def generate_symmetric_key(self, key_spec):
"""Generate a new symmetric key and store it.
@ -496,12 +521,16 @@ def _enforce_extensions_configured(plugin_related_function):
class SecretStorePluginManager(named.NamedExtensionManager):
def __init__(self, conf=CONF, invoke_args=(), invoke_kwargs={}):
ss_conf = config.get_module_config('secretstore')
plugin_names = self._get_internal_plugin_names(ss_conf)
super(SecretStorePluginManager, self).__init__(
conf.secretstore.namespace,
conf.secretstore.enabled_secretstore_plugins,
ss_conf.secretstore.namespace,
plugin_names,
invoke_on_load=False, # Defer creating plugins to utility below.
invoke_args=invoke_args,
invoke_kwds=invoke_kwargs
invoke_kwds=invoke_kwargs,
name_order=True # extensions sorted as per order of plugin names
)
plugin_utils.instantiate_plugins(
@ -574,6 +603,23 @@ class SecretStorePluginManager(named.NamedExtensionManager):
return plugin
raise SecretStoreSupportedPluginNotFound()
def _get_internal_plugin_names(self, secretstore_conf):
"""Gets plugin names used for loading via stevedore.
When multiple secret store support is enabled, then secret store plugin
names are read via updated configuration structure. If not enabled,
then it reads MultiStr property in 'secretstore' config section.
"""
if utils.is_multiple_backends_enabled():
parsed_stores = multiple_backends.read_multiple_backends_config()
plugin_names = [store.store_plugin for store in parsed_stores
if store.store_plugin]
else:
plugin_names = secretstore_conf.secretstore.\
enabled_secretstore_plugins
return plugin_names
def get_manager():
global _SECRET_STORE

View File

@ -77,7 +77,10 @@ kmip_opts = [
cfg.BoolOpt('pkcs1_only',
default=False,
help=u._('Only support PKCS#1 encoding of asymmetric keys'),
)
),
cfg.StrOpt('plugin_name',
help=u._('User friendly plugin name'),
default='KMIP HSM'),
]
CONF.register_group(kmip_opt_group)
CONF.register_opts(kmip_opts, group=kmip_opt_group)
@ -217,6 +220,8 @@ class KMIPSecretStore(ss.SecretStoreBase):
enums.CryptographicAlgorithm.RSA: ss.KeyAlgorithm.RSA
}
self.plugin_name = conf.kmip_plugin.plugin_name
if conf.kmip_plugin.keyfile is not None:
self._validate_keyfile_permissions(conf.kmip_plugin.keyfile)
@ -252,6 +257,9 @@ class KMIPSecretStore(ss.SecretStoreBase):
username=config.username,
password=config.password)
def get_plugin_name(self):
return self.plugin_name
def generate_symmetric_key(self, key_spec):
"""Generate a symmetric key.

View File

@ -0,0 +1,103 @@
# (c) Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
from oslo_config import cfg
from barbican.common import config
from barbican.common import exception
from barbican.common import utils
from barbican import i18n as u
LOG = utils.getLogger(__name__)
LOOKUP_PLUGINS_PREFIX = "secretstore:"
def read_multiple_backends_config():
"""Reads and validates multiple backend related configuration.
Multiple backends configuration is read only when multiple secret store
flag is enabled.
Configuration is validated to make sure that section specific to
provided suffix exists in service configuration. Also validated that only
one of section has global_default = True and its not missing.
"""
conf = config.get_module_config('secretstore')
parsed_stores = None
if utils.is_multiple_backends_enabled():
suffix_list = conf.secretstore.stores_lookup_suffix
if not suffix_list:
raise exception.MultipleSecretStoreLookupFailed()
def register_options_dynamically(conf, group_name):
store_opt_group = cfg.OptGroup(
name=group_name, title='Plugins needed for this backend')
store_opts = [
cfg.StrOpt('secret_store_plugin',
default=None,
help=u._('Internal name used to identify'
'secretstore_plugin')
),
cfg.StrOpt('crypto_plugin',
default=None,
help=u._('Internal name used to identify '
'crypto_plugin.')
),
cfg.BoolOpt('global_default',
default=False,
help=u._('Flag to indicate if this plugin is '
'global default plugin for deployment. '
'Default is False.')
),
]
conf.register_group(store_opt_group)
conf.register_opts(store_opts, group=store_opt_group)
group_names = []
# construct group names using those suffix and dynamically register
# oslo config options under that group name
for suffix in suffix_list:
group_name = LOOKUP_PLUGINS_PREFIX + suffix
register_options_dynamically(conf, group_name)
group_names.append(group_name)
store_conf = collections.namedtuple('store_conf', ['store_plugin',
'crypto_plugin',
'global_default'])
parsed_stores = []
global_default_count = 0
# Section related to group names based of suffix list are always found
# as we are dynamically registering group and its options.
for group_name in group_names:
conf_section = getattr(conf, group_name)
if conf_section.global_default:
global_default_count += 1
store_plugin = conf_section.secret_store_plugin
if not store_plugin:
raise exception.MultipleStorePluginValueMissing(conf_section)
parsed_stores.append(store_conf(store_plugin,
conf_section.crypto_plugin,
conf_section.global_default))
if global_default_count != 1:
raise exception.MultipleStoreIncorrectGlobalDefault(
global_default_count)
return parsed_stores

View File

@ -356,3 +356,6 @@ class WhenTestingSimpleCryptoPlugin(utils.BaseTestCase):
response_dto.kek_meta_extended,
mock.MagicMock())
self.assertEqual(16, len(key))
def test_get_plugin_name(self):
self.assertIsNotNone(self.plugin.get_plugin_name())

View File

@ -64,6 +64,9 @@ class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
self.cfg_mock.p11_crypto_plugin.seed_file = ''
self.cfg_mock.p11_crypto_plugin.seed_length = 32
self.plugin_name = 'Test PKCS11 plugin'
self.cfg_mock.p11_crypto_plugin.plugin_name = self.plugin_name
self.plugin = p11_crypto.P11CryptoPlugin(
conf=self.cfg_mock, pkcs11=self.pkcs11
)
@ -382,3 +385,6 @@ class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
self.assertEqual(self.pkcs11.finalize.call_count, 1)
self.assertEqual(self.plugin._create_pkcs11.call_count, 1)
self.assertEqual(self.plugin._configure_object_cache.call_count, 1)
def test_get_plugin_name(self):
self.assertEqual(self.plugin_name, self.plugin.get_plugin_name())

View File

@ -16,7 +16,11 @@
import mock
from barbican.common import utils as common_utils
from barbican.plugin.crypto import crypto
from barbican.plugin.crypto import manager as cm
from barbican.plugin.crypto import p11_crypto
from barbican.plugin.interface import secret_store as str
from barbican.plugin import store_crypto
from barbican.tests import utils
@ -27,6 +31,9 @@ class TestSecretStore(str.SecretStoreBase):
super(TestSecretStore, self).__init__()
self.alg_list = supported_alg_list
def get_plugin_name(self):
raise NotImplementedError # pragma: no cover
def generate_symmetric_key(self, key_spec):
raise NotImplementedError # pragma: no cover
@ -59,6 +66,9 @@ class TestSecretStoreWithTransportKey(str.SecretStoreBase):
super(TestSecretStoreWithTransportKey, self).__init__()
self.alg_list = supported_alg_list
def get_plugin_name(self):
raise NotImplementedError # pragma: no cover
def generate_symmetric_key(self, key_spec):
raise NotImplementedError # pragma: no cover
@ -243,3 +253,38 @@ class WhenTestingSecretStorePluginManager(utils.BaseTestCase):
self.manager.get_plugin_store(
key_spec=keySpec,
transport_key_needed=True))
class TestSecretStorePluginManagerMultipleBackend(
utils.MultipleBackendsTestCase):
def test_plugin_created_as_per_mulitple_backend_conf(self):
"""Check plugins are created as per multiple backend conf
"""
store_plugin_names = ['store_crypto', 'kmip_plugin', 'store_crypto']
crypto_plugin_names = ['p11_crypto', '', 'simple_crypto']
self.init_via_conf_file(store_plugin_names,
crypto_plugin_names, enabled=True)
with mock.patch('barbican.plugin.crypto.p11_crypto.P11CryptoPlugin.'
'_create_pkcs11') as m_pkcs11, \
mock.patch('kmip.pie.client.ProxyKmipClient') as m_kmip:
manager = str.SecretStorePluginManager()
# check pkcs11 and kmip plugin instantiation call is invoked
m_pkcs11.called_once_with(mock.ANY, mock.ANY)
m_kmip.called_once_with(mock.ANY)
# check store crypto adapter is matched as its defined first.
keySpec = str.KeySpec(str.KeyAlgorithm.AES, 128)
plugin_found = manager.get_plugin_store(keySpec)
self.assertIsInstance(plugin_found,
store_crypto.StoreCryptoAdapterPlugin)
# check pkcs11 crypto is matched as its defined first.
crypto_plugin = cm.get_manager().get_plugin_store_generate(
crypto.PluginSupportTypes.ENCRYPT_DECRYPT)
self.assertIsInstance(crypto_plugin, p11_crypto.P11CryptoPlugin)

View File

@ -52,9 +52,10 @@ class WhenTestingDogtagKRAPlugin(utils.BaseTestCase):
# create nss db for test only
self.nss_dir = tempfile.mkdtemp()
self.plugin_name = "Test Dogtag KRA plugin"
self.cfg_mock = mock.MagicMock(name='config mock')
self.cfg_mock.dogtag_plugin = mock.MagicMock(
nss_db_path=self.nss_dir)
nss_db_path=self.nss_dir, plugin_name=self.plugin_name)
self.plugin = dogtag_import.DogtagKRAPlugin(self.cfg_mock)
self.plugin.keyclient = self.keyclient_mock
@ -63,6 +64,9 @@ class WhenTestingDogtagKRAPlugin(utils.BaseTestCase):
self.patcher.stop()
os.rmdir(self.nss_dir)
def test_get_plugin_name(self):
self.assertEqual(self.plugin_name, self.plugin.get_plugin_name())
def test_generate_symmetric_key(self):
key_spec = sstore.KeySpec(sstore.KeyAlgorithm.AES, 128)
self.plugin.generate_symmetric_key(key_spec)

View File

@ -920,3 +920,9 @@ class WhenTestingKMIPSecretStore(utils.BaseTestCase):
CONF.kmip_plugin.keyfile = '/some/path'
kss.KMIPSecretStore(CONF)
self.assertEqual(1, len(m.mock_calls))
def test_get_plugin_name(self):
CONF = kss.CONF
CONF.kmip_plugin.plugin_name = "Test KMIP Plugin"
secret_store = kss.KMIPSecretStore(CONF)
self.assertEqual("Test KMIP Plugin", secret_store.get_plugin_name())

View File

@ -0,0 +1,138 @@
# (c) Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import mock
from barbican.common import config
from barbican.common import exception
from barbican.plugin.util import multiple_backends
from barbican.tests import utils as test_utils
class MockedManager(object):
NAME_PREFIX = "friendly_"
def __init__(self, names):
ExtTuple = collections.namedtuple('ExtTuple', ['name', 'obj'])
self.extensions = []
for name in names:
m = mock.MagicMock()
m.get_plugin_name.return_value = self.NAME_PREFIX + name
new_extension = ExtTuple(name, m)
self.extensions.append(new_extension)
class WhenReadingMultipleBackendsConfig(test_utils.MultipleBackendsTestCase):
def setUp(self):
super(WhenReadingMultipleBackendsConfig, self).setUp()
def test_successful_conf_read(self):
ss_plugins = ['ss_p1', 'ss_p2', 'ss_p3']
cr_plugins = ['cr_p1', 'cr_p2', 'cr_p3']
self.init_via_conf_file(ss_plugins, cr_plugins, enabled=True,
global_default_index=1)
stores = multiple_backends.read_multiple_backends_config()
self.assertEqual(len(ss_plugins), len(stores))
self.assertEqual('ss_p1', stores[0].store_plugin)
self.assertEqual('cr_p1', stores[0].crypto_plugin)
self.assertEqual(False, stores[0].global_default)
self.assertEqual('ss_p2', stores[1].store_plugin)
self.assertEqual('cr_p2', stores[1].crypto_plugin)
self.assertEqual(True, stores[1].global_default)
self.assertEqual('ss_p3', stores[2].store_plugin)
self.assertEqual('cr_p3', stores[2].crypto_plugin)
self.assertEqual(False, stores[2].global_default)
def test_fail_when_store_plugin_name_missing(self):
ss_plugins = ['ss_p1', 'ss_p3']
cr_plugins = ['cr_p1', 'cr_p2', 'cr_p3']
self.init_via_conf_file(ss_plugins, cr_plugins, enabled=True)
self.assertRaises(exception.MultipleStorePluginValueMissing,
multiple_backends.read_multiple_backends_config)
def test_fail_when_store_plugin_name_is_blank(self):
ss_plugins = ['ss_p1', '', 'ss_p3']
cr_plugins = ['cr_p1', 'cr_p2', 'cr_p3']
self.init_via_conf_file(ss_plugins, cr_plugins, enabled=True)
self.assertRaises(exception.MultipleStorePluginValueMissing,
multiple_backends.read_multiple_backends_config)
def test_successful_conf_read_when_crypto_plugin_name_is_missing(self):
ss_plugins = ['ss_p1', 'ss_p2', 'ss_p3']
cr_plugins = ['cr_p1', 'cr_p3']
self.init_via_conf_file(ss_plugins, cr_plugins, enabled=True)
stores = multiple_backends.read_multiple_backends_config()
self.assertEqual(len(ss_plugins), len(stores))
def test_conf_read_when_multiple_plugin_disabled(self):
ss_plugins = ['ss_p1', 'ss_p2', 'ss_p3']
cr_plugins = ['cr_p1', 'cr_p3']
self.init_via_conf_file(ss_plugins, cr_plugins, enabled=False)
stores = multiple_backends.read_multiple_backends_config()
self.assertIsNone(stores)
def test_successful_conf_read_when_crypto_plugin_name_is_blank(self):
ss_plugins = ['ss_p1', 'ss_p2', 'ss_p3']
cr_plugins = ['cr_p1', '', 'cr_p3']
self.init_via_conf_file(ss_plugins, cr_plugins, enabled=True)
stores = multiple_backends.read_multiple_backends_config()
self.assertEqual(len(ss_plugins), len(stores))
def test_fail_when_global_default_not_specified(self):
ss_plugins = ['ss_p1', 'ss_p2', 'ss_p3']
cr_plugins = ['cr_p1', 'cr_p2', 'cr_p3']
self.init_via_conf_file(ss_plugins, cr_plugins, enabled=True,
global_default_index=-1)
self.assertRaises(exception.MultipleStoreIncorrectGlobalDefault,
multiple_backends.read_multiple_backends_config)
def test_fail_when_stores_lookup_suffix_missing_when_enabled(self):
ss_plugins = ['ss_p1', 'ss_p2', 'ss_p3']
cr_plugins = ['cr_p1', 'cr_p2', 'cr_p3']
self.init_via_conf_file(ss_plugins, cr_plugins, enabled=True,
global_default_index=0)
conf = config.get_module_config('secretstore')
conf.set_override("stores_lookup_suffix", [], group='secretstore',
enforce_type=True)
self.assertRaises(exception.MultipleSecretStoreLookupFailed,
multiple_backends.read_multiple_backends_config)
def test_fail_when_secretstore_section_missing(self):
ss_plugins = ['ss_p1', 'ss_p2', 'ss_p3']
cr_plugins = ['cr_p1', 'cr_p2', 'cr_p3']
self.init_via_conf_file(ss_plugins, cr_plugins, enabled=True,
global_default_index=-1)
ss_conf = config.get_module_config('secretstore')
existing_value = ss_conf.secretstore.stores_lookup_suffix
existing_value.append('unknown_section')
ss_conf.set_override('stores_lookup_suffix', existing_value,
'secretstore')
self.assertRaises(exception.MultipleStorePluginValueMissing,
multiple_backends.read_multiple_backends_config)

View File

@ -22,7 +22,10 @@ import types
import uuid
import mock
from oslo_config import cfg
import oslotest.base as oslotest
from oslotest import createfile
import six
from six.moves.urllib import parse
import webtest
@ -30,7 +33,14 @@ import webtest
from OpenSSL import crypto
from barbican.api import app
from barbican.common import config
import barbican.context
from barbican.model import repositories
from barbican.plugin.crypto import manager as cm
from barbican.plugin.crypto import p11_crypto
from barbican.plugin.interface import secret_store
from barbican.plugin import kmip_secret_store as kss
from barbican.tests import database_utils
@ -400,6 +410,142 @@ def parameterized_dataset(build_data):
return decorator
def setup_oslo_config_conf(testcase, content, conf_instance=None):
conf_file_fixture = testcase.useFixture(
createfile.CreateFileWithContent('barbican', content))
if conf_instance is None:
conf_instance = cfg.CONF
conf_instance([], project="barbican",
default_config_files=[conf_file_fixture.path])
testcase.addCleanup(conf_instance.reset)
def setup_multiple_secret_store_plugins_conf(testcase, store_plugin_names,
crypto_plugin_names,
global_default_index,
conf_instance=None,
multiple_support_enabled=None):
"""Sets multiple secret store support conf as oslo conf file.
Generating file based conf based on input store and crypto plugin names
provided as list. Index specified in argument is used to mark that specific
secret store as global_default = True.
Input lists are
'store_plugins': ['store_crypto', 'kmip_plugin', 'store_crypto'],
'crypto_plugins': ['simple_crypto', '', 'p11_crypto'],
Sample output conf file generated is
[secretstore]
enable_multiple_secret_stores = True
stores_lookup_suffix = plugin_0, plugin_1, plugin_2
[secretstore:plugin_0]
secret_store_plugin = store_crypto
crypto_plugin = simple_crypto
global_default = True
[secretstore:plugin_1]
secret_store_plugin = kmip_plugin
[secretstore:plugin_2]
secret_store_plugin = store_crypto
crypto_plugin = p11_crypto
"""
def _get_conf_line(name, value, section=None):
out_line = "\n[{0}]\n".format(section) if section else ""
out_line += "{0} = {1}\n".format(name, value) if name else ""
return out_line
if multiple_support_enabled is None:
multiple_support_enabled = True
conf_content = ""
if store_plugin_names is not None:
if len(store_plugin_names) < len(crypto_plugin_names):
max_count = len(crypto_plugin_names)
else:
max_count = len(store_plugin_names)
lookup_names = ['plugin_{0}'.format(indx) for indx in range(max_count)]
section_names = ['secretstore:{0}'.format(lname) for lname in
lookup_names]
lookup_str = ", ".join(lookup_names)
conf_content = _get_conf_line('enable_multiple_secret_stores',
multiple_support_enabled,
section='secretstore')
conf_content += _get_conf_line('stores_lookup_suffix',
lookup_str, section=None)
for indx, section_name in enumerate(section_names):
if indx < len(store_plugin_names):
store_plugin = store_plugin_names[indx]
conf_content += _get_conf_line('secret_store_plugin',
store_plugin,
section=section_name)
else:
conf_content += _get_conf_line(None, None,
section=section_name)
if indx < len(crypto_plugin_names):
crypto_plugin = crypto_plugin_names[indx]
conf_content += _get_conf_line('crypto_plugin', crypto_plugin,
section=None)
if indx == global_default_index:
conf_content += _get_conf_line('global_default', 'True',
section=None)
setup_oslo_config_conf(testcase, conf_content, conf_instance)
class MultipleBackendsTestCase(database_utils.RepositoryTestCase):
def setUp(self):
super(MultipleBackendsTestCase, self).setUp()
def _mock_plugin_settings(self):
kmip_conf = kss.CONF
kmip_conf.kmip_plugin.username = "sample_username"
kmip_conf.kmip_plugin.password = "sample_password"
kmip_conf.kmip_plugin.keyfile = None
kmip_conf.kmip_plugin.pkcs1_only = False
pkcs11_conf = p11_crypto.CONF
pkcs11_conf.p11_crypto_plugin.library_path = "/tmp" # any dummy path
def init_via_conf_file(self, store_plugin_names, crypto_plugin_names,
enabled=True, global_default_index=0):
secretstore_conf = config.get_module_config('secretstore')
setup_multiple_secret_store_plugins_conf(
self, store_plugin_names=store_plugin_names,
crypto_plugin_names=crypto_plugin_names,
global_default_index=global_default_index,
conf_instance=secretstore_conf,
multiple_support_enabled=enabled)
# clear globals if already set in previous tests
secret_store._SECRET_STORE = None # clear secret store manager
cm._PLUGIN_MANAGER = None # clear crypto manager
self._mock_plugin_settings()
def _get_secret_store_entry(self, store_plugin, crypto_plugin):
all_ss = repositories.get_secret_stores_repository().get_all()
for ss in all_ss:
if (ss.store_plugin == store_plugin and
ss.crypto_plugin == crypto_plugin):
return ss
return None
def create_timestamp_w_tz_and_offset(timezone=None, days=0, hours=0, minutes=0,
seconds=0):
"""Creates a timestamp with a timezone and offset in days

View File

@ -263,6 +263,9 @@ enabled_crypto_plugins = simple_crypto
# the kek should be a 32-byte value which is base64 encoded
kek = 'YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoxMjM0NTY='
# User friendly plugin name
# plugin_name = 'Software Only Crypto'
[dogtag_plugin]
pem_path = '/etc/barbican/kra_admin_cert.pem'
dogtag_host = localhost
@ -274,6 +277,9 @@ simple_cmc_profile = 'caOtherCert'
ca_expiration_time = 1
plugin_working_dir = '/etc/barbican/dogtag'
# User friendly plugin name
# plugin_name = 'Dogtag KRA'
[p11_crypto_plugin]
# Path to vendor PKCS11 library
@ -297,6 +303,9 @@ hmac_label = 'my_hmac_label'
# Max number of items in pkek cache
# pkek_cache_limit = 100
# User friendly plugin name
# plugin_name = 'PKCS11 HSM'
# ================== KMIP plugin =====================
[kmip_plugin]
@ -308,6 +317,9 @@ keyfile = '/path/to/certs/cert.key'
certfile = '/path/to/certs/cert.crt'
ca_certs = '/path/to/certs/LocalCA.crt'
# User friendly plugin name
# plugin_name = 'KMIP HSM'
# ================= Certificate plugin ===================
[certificate]