Merge "Adding multiple backend db model and repository support (Part 1)"

This commit is contained in:
Jenkins 2016-09-12 21:11:47 +00:00 committed by Gerrit Code Review
commit d1040279f8
5 changed files with 862 additions and 0 deletions

View File

@ -0,0 +1,62 @@
"""Model for multiple backend support
Revision ID: 39cf2e645cba
Revises: d2780d5aa510
Create Date: 2016-07-29 16:45:22.953811
"""
# revision identifiers, used by Alembic.
revision = '39cf2e645cba'
down_revision = 'd2780d5aa510'
from alembic import op
import sqlalchemy as sa
def upgrade():
ctx = op.get_context()
con = op.get_bind()
table_exists = ctx.dialect.has_table(con.engine, 'secret_stores')
if not table_exists:
op.create_table(
'secret_stores',
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=False),
sa.Column('updated_at', sa.DateTime(), nullable=False),
sa.Column('deleted_at', sa.DateTime(), nullable=True),
sa.Column('deleted', sa.Boolean(), nullable=False),
sa.Column('status', sa.String(length=20), nullable=False),
sa.Column('store_plugin', sa.String(length=255), nullable=False),
sa.Column('crypto_plugin', sa.String(length=255), nullable=True),
sa.Column('global_default', sa.Boolean(), nullable=False,
default=False),
sa.Column('name', sa.String(length=255), nullable=False),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('store_plugin', 'crypto_plugin',
name='_secret_stores_plugin_names_uc'),
sa.UniqueConstraint('name',
name='_secret_stores_name_uc')
)
table_exists = ctx.dialect.has_table(con.engine, 'project_secret_store')
if not table_exists:
op.create_table(
'project_secret_store',
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=False),
sa.Column('updated_at', sa.DateTime(), nullable=False),
sa.Column('deleted_at', sa.DateTime(), nullable=True),
sa.Column('deleted', sa.Boolean(), nullable=False),
sa.Column('status', sa.String(length=20), nullable=False),
sa.Column('project_id', sa.String(length=36), nullable=False),
sa.Column('secret_store_id', sa.String(length=36), nullable=False),
sa.ForeignKeyConstraint(['project_id'], ['projects.id'],),
sa.ForeignKeyConstraint(
['secret_store_id'], ['secret_stores.id'],),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('project_id',
name='_project_secret_store_project_uc')
)
op.create_index(op.f('ix_project_secret_store_project_id'),
'project_secret_store', ['project_id'], unique=True)

View File

@ -1379,3 +1379,102 @@ class ProjectQuotas(BASE, ModelBase):
if self.cas:
ret['cas'] = self.cas
return ret
class SecretStores(BASE, ModelBase):
"""List of secret stores defined via service configuration.
This class provides a list of secret stores entities with their respective
secret store plugin and crypto plugin names.
SecretStores deletes are NOT soft-deletes.
"""
__tablename__ = 'secret_stores'
store_plugin = sa.Column(sa.String(255), nullable=False)
crypto_plugin = sa.Column(sa.String(255), nullable=True)
global_default = sa.Column(sa.Boolean, nullable=False, default=False)
name = sa.Column(sa.String(255), nullable=False)
__table_args__ = (sa.UniqueConstraint(
'store_plugin', 'crypto_plugin',
name='_secret_stores_plugin_names_uc'),
sa.UniqueConstraint('name', name='_secret_stores_name_uc'),)
def __init__(self, name, store_plugin, crypto_plugin=None,
global_default=None):
"""Creates secret store entity."""
super(SecretStores, self).__init__()
msg = u._("Must supply non-Blank {0} argument for SecretStores entry.")
if not name:
raise exception.MissingArgumentError(msg.format("name"))
if not store_plugin:
raise exception.MissingArgumentError(msg.format("store_plugin"))
self.store_plugin = store_plugin
self.name = name
self.crypto_plugin = crypto_plugin
if global_default is not None:
self.global_default = global_default
self.status = States.ACTIVE
def _do_extra_dict_fields(self):
"""Sub-class hook method: return dict of fields."""
return {'secret_store_id': self.id,
'store_plugin': self.store_plugin,
'crypto_plugin': self.crypto_plugin,
'global_default': self.global_default,
'name': self.name}
class ProjectSecretStore(BASE, ModelBase):
"""Stores secret store to be used for new project secrets.
This class maintains secret store and project mapping so that new project
secret entries uses it as plugin backend.
ProjectSecretStores deletes are NOT soft-deletes.
"""
__tablename__ = 'project_secret_store'
secret_store_id = sa.Column(sa.String(36),
sa.ForeignKey('secret_stores.id'),
index=True,
nullable=False)
project_id = sa.Column(sa.String(36),
sa.ForeignKey('projects.id'),
index=True,
nullable=False)
secret_store = orm.relationship("SecretStores", backref="project_store")
project = orm.relationship('Project',
backref=orm.backref('preferred_secret_store'))
__table_args__ = (sa.UniqueConstraint(
'project_id', name='_project_secret_store_project_uc'),)
def __init__(self, project_id, secret_store_id):
"""Creates project secret store mapping entity."""
super(ProjectSecretStore, self).__init__()
msg = u._("Must supply non-None {0} argument for ProjectSecretStore "
" entry.")
if not project_id:
raise exception.MissingArgumentError(msg.format("project_id"))
self.project_id = project_id
if not secret_store_id:
raise exception.MissingArgumentError(msg.format("secret_store_id"))
self.secret_store_id = secret_store_id
self.status = States.ACTIVE
def _do_extra_dict_fields(self):
"""Sub-class hook method: return dict of fields."""
return {'secret_store_id': self.secret_store_id,
'project_id': self.project_id}

View File

@ -68,6 +68,8 @@ _SECRET_META_REPOSITORY = None
_SECRET_USER_META_REPOSITORY = None
_SECRET_REPOSITORY = None
_TRANSPORT_KEY_REPOSITORY = None
_SECRET_STORES_REPOSITORY = None
_PROJECT_SECRET_STORE_REPOSITORY = None
CONF = config.CONF
@ -2189,6 +2191,153 @@ class ProjectQuotasRepo(BaseRepo):
entity.delete(session=session)
class SecretStoresRepo(BaseRepo):
"""Repository for the SecretStores entity.
SecretStores entries are not soft delete. So there is no
need to have deleted=False filter in queries.
"""
def get_all(self, session=None):
"""Get list of available secret stores.
Status value is not used while getting complete list as
we will just maintain ACTIVE ones. No other state is used and
needed here.
:param session: SQLAlchemy session object.
:return: None
"""
session = self.get_session(session)
query = session.query(models.SecretStores)
query.order_by(models.SecretStores.created_at.asc())
return query.all()
def _do_entity_name(self):
"""Sub-class hook: return entity name, such as for debugging."""
return "SecretStores"
def _do_build_get_query(self, entity_id, external_project_id, session):
"""Sub-class hook: build a retrieve query."""
return session.query(models.SecretStores).filter_by(
id=entity_id)
def _do_validate(self, values):
"""Sub-class hook: validate values."""
pass
class ProjectSecretStoreRepo(BaseRepo):
"""Repository for the ProjectSecretStore entity.
ProjectSecretStore entries are not soft delete. So there is no
need to have deleted=False filter in queries.
"""
def get_secret_store_for_project(self, project_id, external_project_id,
suppress_exception=False, session=None):
"""Returns preferred secret store for a project if set.
:param project_id: ID of project whose preferred secret store is set
:param external_project_id: external ID of project whose preferred
secret store is set
:param suppress_exception: when True, NotFound is not raised
:param session: SQLAlchemy session object.
Will return preferred secret store by external project id if provided
otherwise uses barbican project identifier to lookup.
Throws exception in case no preferred secret store is defined and
supporess_exception=False. If suppress_exception is True, then returns
None for no preferred secret store for a project found.
"""
session = self.get_session(session)
if external_project_id is None:
query = session.query(models.ProjectSecretStore).filter_by(
project_id=project_id)
else:
query = session.query(models.ProjectSecretStore)
query = query.join(models.Project,
models.ProjectSecretStore.project)
query = query.filter(models.Project.external_id ==
external_project_id)
try:
entity = query.one()
except sa_orm.exc.NoResultFound:
LOG.info(u._LE("No preferred secret store found for project = %s"),
project_id)
entity = None
if not suppress_exception:
_raise_entity_not_found(self._do_entity_name(), project_id)
return entity
def create_or_update_for_project(self, project_id, secret_store_id,
session=None):
"""Create or update preferred secret store for a project.
:param project_id: ID of project whose preferred secret store is set
:param secret_store_id: ID of secret store
:param session: SQLAlchemy session object.
:return: None
If preferred secret store is not set for given project, then create
new preferred secret store setting for that project. If secret store
setting for project is already there, then it updates with given secret
store id.
"""
session = self.get_session(session)
try:
entity = self.get_secret_store_for_project(project_id, None,
session=session)
except exception.NotFound:
entity = self.create_from(
models.ProjectSecretStore(project_id, secret_store_id),
session=session)
else:
entity.secret_store_id = secret_store_id
entity.save(session)
return entity
def get_count_by_secret_store(self, secret_store_id, session=None):
"""Gets count of projects mapped to a given secret store.
:param secret_store_id: id of secret stores entity
:param session: existing db session reference. If None, gets session.
:return: an number 0 or greater
This method is supposed to provide count of projects which are
currently set to use input secret store as their preferred store. This
is used when existing secret store configuration is removed and
validation is done to make sure that there are no projects using it as
preferred secret store.
"""
session = self.get_session(session)
query = session.query(models.ProjectSecretStore).filter_by(
secret_store_id=secret_store_id)
return query.count()
def _do_entity_name(self):
"""Sub-class hook: return entity name, such as for debugging."""
return "ProjectSecretStore"
def _do_build_get_query(self, entity_id, external_project_id, session):
"""Sub-class hook: build a retrieve query."""
return session.query(models.ProjectSecretStore).filter_by(
id=entity_id)
def _do_validate(self, values):
"""Sub-class hook: validate values."""
pass
def _build_get_project_entities_query(self, project_id, session):
"""Builds query for getting preferred secret stores list for a project.
:param project_id: id of barbican project entity
:param session: existing db session reference.
"""
return session.query(models.ProjectSecretStore).filter_by(
project_id=project_id)
def get_ca_repository():
"""Returns a singleton Secret repository instance."""
global _CA_REPOSITORY
@ -2316,6 +2465,19 @@ def get_transport_key_repository():
return _get_repository(_TRANSPORT_KEY_REPOSITORY, TransportKeyRepo)
def get_secret_stores_repository():
"""Returns a singleton Secret Stores repository instance."""
global _SECRET_STORES_REPOSITORY
return _get_repository(_SECRET_STORES_REPOSITORY, SecretStoresRepo)
def get_project_secret_store_repository():
"""Returns a singleton Project Secret Store repository instance."""
global _PROJECT_SECRET_STORE_REPOSITORY
return _get_repository(_PROJECT_SECRET_STORE_REPOSITORY,
ProjectSecretStoreRepo)
def _get_repository(global_ref, repo_class):
if not global_ref:
global_ref = repo_class()

View File

@ -0,0 +1,426 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import uuid
from barbican.common import exception
from barbican.model import models
from barbican.model import repositories
from barbican.tests import database_utils
class WhenTestingSecretStoresRepo(database_utils.RepositoryTestCase):
def setUp(self):
super(WhenTestingSecretStoresRepo, self).setUp()
self.s_stores_repo = repositories.get_secret_stores_repository()
self.def_name = "PKCS11 HSM"
self.def_store_plugin = "store_crypto"
self.def_crypto_plugin = "p11_crypto"
self.default_secret_store = self._create_secret_store(
self.def_name, self.def_store_plugin, self.def_crypto_plugin, True)
def _create_secret_store(self, name, store_plugin, crypto_plugin=None,
global_default=None):
session = self.s_stores_repo.get_session()
s_stores_model = models.SecretStores(name=name,
store_plugin=store_plugin,
crypto_plugin=crypto_plugin,
global_default=global_default)
s_stores = self.s_stores_repo.create_from(s_stores_model,
session=session)
s_stores.save(session=session)
session.commit()
return s_stores
def test_get_by_entity_id(self):
session = self.s_stores_repo.get_session()
s_stores = self.s_stores_repo.get(self.default_secret_store.id,
session=session)
self.assertIsNotNone(s_stores)
self.assertEqual(self.def_store_plugin, s_stores.store_plugin)
self.assertEqual(self.def_crypto_plugin, s_stores.crypto_plugin)
self.assertEqual(True, s_stores.global_default)
self.assertEqual(models.States.ACTIVE, s_stores.status)
def test_should_raise_notfound_exception_get_by_entity_id(self):
self.assertRaises(exception.NotFound, self.s_stores_repo.get,
"invalid_id", suppress_exception=False)
def test_delete_entity_by_id(self):
session = self.s_stores_repo.get_session()
s_stores = self.s_stores_repo.get(self.default_secret_store.id,
session=session)
self.assertIsNotNone(s_stores)
self.s_stores_repo.delete_entity_by_id(self.default_secret_store.id,
None, session=session)
s_stores = self.s_stores_repo.get(self.default_secret_store.id,
suppress_exception=True,
session=session)
self.assertIsNone(s_stores)
def test_get_all(self):
session = self.s_stores_repo.get_session()
all_stores = self.s_stores_repo.get_all(session=session)
self.assertIsNotNone(all_stores)
self.assertEqual(1, len(all_stores))
self._create_secret_store("db backend", "store_crypto",
"simple_crypto", False)
all_stores = self.s_stores_repo.get_all(session=session)
self.assertEqual(2, len(all_stores))
self.assertEqual("simple_crypto", all_stores[1].crypto_plugin)
self.assertEqual("store_crypto", all_stores[1].store_plugin)
self.assertEqual("db backend", all_stores[1].name)
self.assertEqual(False, all_stores[1].global_default)
def test_no_data_case_for_get_all(self):
self.s_stores_repo.delete_entity_by_id(self.default_secret_store.id,
None)
session = self.s_stores_repo.get_session()
all_stores = self.s_stores_repo.get_all(session=session)
self.assertEqual([], all_stores)
def test_get_all_check_sorting_order(self):
"""Check that all stores are sorted in ascending creation time
"""
session = self.s_stores_repo.get_session()
self._create_secret_store("second_name", "second_store",
"second_crypto", False)
m_stores = self._create_secret_store("middle_name", "middle_store",
"middle_crypto", False)
self._create_secret_store("last_name", "last_store", "last_crypto",
False)
all_stores = self.s_stores_repo.get_all(session=session)
self.assertIsNotNone(all_stores)
self.assertEqual(4, len(all_stores))
# returned list is sorted by created_at field so check for last entry
self.assertEqual("last_crypto", all_stores[3].crypto_plugin)
self.assertEqual("last_store", all_stores[3].store_plugin)
self.assertEqual("last_name", all_stores[3].name)
self.assertEqual(False, all_stores[3].global_default)
# Now delete in between entry and create as new entry
self.s_stores_repo.delete_entity_by_id(m_stores.id, None,
session=session)
all_stores = self.s_stores_repo.get_all(session=session)
self._create_secret_store("middle_name", "middle_store",
"middle_crypto", False)
all_stores = self.s_stores_repo.get_all(session=session)
# now newly created entry should be last one.
self.assertEqual("middle_crypto", all_stores[3].crypto_plugin)
self.assertEqual("middle_store", all_stores[3].store_plugin)
self.assertEqual("middle_name", all_stores[3].name)
self.assertEqual(False, all_stores[3].global_default)
def test_should_raise_duplicate_for_same_plugin_names(self):
"""Check for store and crypto plugin name combination uniqueness"""
name = 'second_name'
store_plugin = 'second_store'
crypto_plugin = 'second_crypto'
self._create_secret_store(name, store_plugin, crypto_plugin, False)
self.assertRaises(exception.Duplicate, self._create_secret_store,
"thrid_name", store_plugin, crypto_plugin, False)
def test_should_raise_duplicate_for_same_names(self):
"""Check for secret store 'name' uniqueness"""
name = 'Db backend'
store_plugin = 'second_store'
crypto_plugin = 'second_crypto'
self._create_secret_store(name, store_plugin, crypto_plugin, False)
self.assertRaises(exception.Duplicate, self._create_secret_store,
name, "another_store", "another_crypto", False)
def test_do_entity_name(self):
"""Code coverage for entity_name which is used in case of exception.
Raising duplicate error for store and crypto plugin combination
"""
name = "DB backend"
store_plugin = 'second_store'
crypto_plugin = 'second_crypto'
self._create_secret_store(name, store_plugin, crypto_plugin, False)
try:
self._create_secret_store(name, store_plugin, crypto_plugin, False)
self.assertFail()
except exception.Duplicate as ex:
self.assertIn("SecretStores", ex.message)
class WhenTestingProjectSecretStoreRepo(database_utils.RepositoryTestCase):
def setUp(self):
super(WhenTestingProjectSecretStoreRepo, self).setUp()
self.proj_store_repo = repositories.\
get_project_secret_store_repository()
self.def_name = "PKCS11 HSM"
self.def_store_plugin = "store_crypto"
self.def_crypto_plugin = "p11_crypto"
self.default_secret_store = self._create_secret_store(
self.def_name, self.def_store_plugin, self.def_crypto_plugin, True)
def _create_secret_store(self, name, store_plugin, crypto_plugin=None,
global_default=None):
s_stores_repo = repositories.get_secret_stores_repository()
session = s_stores_repo.get_session()
s_stores_model = models.SecretStores(name=name,
store_plugin=store_plugin,
crypto_plugin=crypto_plugin,
global_default=global_default)
s_stores = s_stores_repo.create_from(s_stores_model,
session=session)
s_stores.save(session=session)
session.commit()
return s_stores
def _create_project(self):
session = self.proj_store_repo.get_session()
project = models.Project()
project.external_id = "keystone_project_id" + uuid.uuid4().hex
project.save(session=session)
return project
def _create_project_store(self, project_id, secret_store_id):
session = self.proj_store_repo.get_session()
proj_model = models.ProjectSecretStore(project_id, secret_store_id)
proj_s_store = self.proj_store_repo.create_from(proj_model, session)
proj_s_store.save(session=session)
return proj_s_store
def test_get_by_entity_id(self):
"""Tests for 'get' call by project secret store id"""
project = self._create_project()
proj_s_store = self._create_project_store(project.id,
self.default_secret_store.id)
session = self.proj_store_repo.get_session()
s_stores = self.proj_store_repo.get(proj_s_store.id, session=session)
self.assertIsNotNone(proj_s_store)
self.assertEqual(project.id, proj_s_store.project_id)
self.assertEqual(self.default_secret_store.id,
proj_s_store.secret_store_id)
self.assertEqual(models.States.ACTIVE, s_stores.status)
# assert values via relationship
self.assertEqual(self.default_secret_store.store_plugin,
proj_s_store.secret_store.store_plugin)
self.assertEqual(project.external_id, proj_s_store.project.external_id)
def test_should_raise_notfound_exception_get_by_entity_id(self):
self.assertRaises(exception.NotFound, self.proj_store_repo.get,
"invalid_id", suppress_exception=False)
def test_delete_entity_by_id(self):
project = self._create_project()
proj_s_store = self._create_project_store(project.id,
self.default_secret_store.id)
session = self.proj_store_repo.get_session()
proj_s_store = self.proj_store_repo.get(proj_s_store.id,
session=session)
self.assertIsNotNone(proj_s_store)
self.proj_store_repo.delete_entity_by_id(proj_s_store.id, None,
session=session)
proj_s_store = self.proj_store_repo.get(proj_s_store.id,
suppress_exception=True,
session=session)
self.assertIsNone(proj_s_store)
def test_should_raise_duplicate_for_same_project_id(self):
"""Check preferred secret store is set only once for project"""
project1 = self._create_project()
name = "first_name"
store_plugin = 'first_store'
crypto_plugin = 'first_crypto'
s_store1 = self._create_secret_store(name, store_plugin,
crypto_plugin, False)
# set preferred secret store for project1
self._create_project_store(project1.id,
s_store1.id)
name = "second_name"
store_plugin = 'second_store'
crypto_plugin = 'second_crypto'
s_store2 = self._create_secret_store(name, store_plugin,
crypto_plugin, False)
self.assertRaises(exception.Duplicate, self._create_project_store,
project1.id, s_store2.id)
def test_do_entity_name(self):
"""Code coverage for entity_name which is used in case of exception.
Raising duplicate error when try to set another entry for existing
project
"""
project1 = self._create_project()
name = "first name"
store_plugin = 'first_store'
crypto_plugin = 'first_crypto'
s_store1 = self._create_secret_store(name, store_plugin,
crypto_plugin, False)
# set preferred secret store for project1
self._create_project_store(project1.id,
s_store1.id)
try:
name = "second_name"
store_plugin = 'second_store'
crypto_plugin = 'second_crypto'
s_store2 = self._create_secret_store(name, store_plugin,
crypto_plugin, False)
self._create_project_store(project1.id, s_store2.id)
self.assertFail()
except exception.Duplicate as ex:
self.assertIn("ProjectSecretStore", ex.message)
def test_get_secret_store_for_project(self):
project1 = self._create_project()
name = "first_name"
store_plugin = 'first_store'
crypto_plugin = 'first_crypto'
s_store1 = self._create_secret_store(name, store_plugin,
crypto_plugin, False)
# set preferred secret store for project1
proj_s_store = self._create_project_store(project1.id, s_store1.id)
# get preferred secret store by barbican project id
read_project_s_store = self.proj_store_repo.\
get_secret_store_for_project(project1.id, None)
self.assertEqual(proj_s_store.project_id,
read_project_s_store.project_id)
self.assertEqual(proj_s_store.secret_store_id,
read_project_s_store.secret_store_id)
# get preferred secret store by keystone project id
read_project_s_store = self.proj_store_repo.\
get_secret_store_for_project(None, project1.external_id)
self.assertEqual(proj_s_store.project_id,
read_project_s_store.project_id)
self.assertEqual(project1.external_id,
read_project_s_store.project.external_id)
self.assertEqual(proj_s_store.secret_store_id,
read_project_s_store.secret_store_id)
def test_raise_notfound_exception_get_secret_store_for_project(self):
self.assertRaises(exception.NotFound,
self.proj_store_repo.get_secret_store_for_project,
"invalid_id", None, suppress_exception=False)
def test_with_exception_suppressed_get_secret_store_for_project(self):
returned_value = self.proj_store_repo.\
get_secret_store_for_project("invalid_id", None,
suppress_exception=True)
self.assertIsNone(returned_value)
def test_get_project_entities(self):
entities = self.proj_store_repo.get_project_entities(uuid.uuid4().hex)
self.assertEqual([], entities)
def test_create_or_update_for_project(self):
project1 = self._create_project()
name = "first_name"
store_plugin = 'first_store'
crypto_plugin = 'first_crypto'
s_store1 = self._create_secret_store(name, store_plugin,
crypto_plugin, False)
# assert that no preferred secret store is set project.
entity = self.proj_store_repo.get_secret_store_for_project(
project1.id, None, suppress_exception=True)
self.assertIsNone(entity)
# create/set preferred secret store now
created_entity = self.proj_store_repo.create_or_update_for_project(
project1.id, s_store1.id)
entity = self.proj_store_repo.get_secret_store_for_project(
project1.id, None, suppress_exception=False)
self.assertIsNotNone(entity) # new preferred secret store
self.assertEqual(project1.id, entity.project_id)
self.assertEqual(s_store1.id, entity.secret_store_id)
self.assertEqual(store_plugin, entity.secret_store.store_plugin)
self.assertEqual(crypto_plugin, entity.secret_store.crypto_plugin)
self.assertEqual(name, entity.secret_store.name)
name = 'second_name'
store_plugin = 'second_store'
crypto_plugin = 'second_crypto'
s_store2 = self._create_secret_store(name, store_plugin,
crypto_plugin, False)
updated_entity = self.proj_store_repo.create_or_update_for_project(
project1.id, s_store2.id)
self.assertEqual(created_entity.id, updated_entity.id)
self.assertEqual(s_store2.id, updated_entity.secret_store_id)
def test_get_count_by_secret_store(self):
project1 = self._create_project()
name = "first_name"
store_plugin = 'first_store'
crypto_plugin = 'first_crypto'
s_store1 = self._create_secret_store(name, store_plugin,
crypto_plugin, False)
count = self.proj_store_repo.get_count_by_secret_store(s_store1.id)
self.assertEqual(0, count)
# create/set preferred secret store now
self.proj_store_repo.create_or_update_for_project(project1.id,
s_store1.id)
count = self.proj_store_repo.get_count_by_secret_store(s_store1.id)
self.assertEqual(1, count)
project2 = self._create_project()
self.proj_store_repo.create_or_update_for_project(project2.id,
s_store1.id)
count = self.proj_store_repo.get_count_by_secret_store(s_store1.id)
self.assertEqual(2, count)

View File

@ -639,5 +639,118 @@ class WhenCreatingNewProjectQuotas(utils.BaseTestCase):
self.assertEqual(106,
project_quotas.to_dict_fields()['cas'])
class WhenCreatingNewSecretStores(utils.BaseTestCase):
def setUp(self):
super(WhenCreatingNewSecretStores, self).setUp()
def test_new_secret_stores_for_all_input(self):
name = "db backend"
store_plugin = 'store_crypto'
crypto_plugin = 'simple_crypto'
ss = models.SecretStores(name, store_plugin, crypto_plugin,
global_default=True)
self.assertEqual(store_plugin, ss.store_plugin)
self.assertEqual(crypto_plugin, ss.crypto_plugin)
self.assertEqual(name, ss.name)
self.assertEqual(True, ss.global_default)
self.assertEqual(models.States.ACTIVE, ss.status)
def test_new_secret_stores_required_input_only(self):
store_plugin = 'store_crypto'
name = "db backend"
ss = models.SecretStores(name, store_plugin)
self.assertEqual(store_plugin, ss.store_plugin)
self.assertEqual(name, ss.name)
self.assertIsNone(ss.crypto_plugin)
self.assertIsNone(ss.global_default) # False default is not used
self.assertEqual(models.States.ACTIVE, ss.status)
def test_should_throw_exception_missing_store_plugin(self):
name = "db backend"
self.assertRaises(exception.MissingArgumentError,
models.SecretStores, name, None)
self.assertRaises(exception.MissingArgumentError,
models.SecretStores, name, "")
def test_should_throw_exception_missing_name(self):
store_plugin = 'store_crypto'
self.assertRaises(exception.MissingArgumentError,
models.SecretStores, None, store_plugin)
self.assertRaises(exception.MissingArgumentError,
models.SecretStores, "", store_plugin)
def test_secret_stores_check_to_dict_fields(self):
name = "pkcs11 backend"
store_plugin = 'store_crypto'
crypto_plugin = 'p11_crypto'
ss = models.SecretStores(name, store_plugin, crypto_plugin,
global_default=True)
self.assertEqual(store_plugin,
ss.to_dict_fields()['store_plugin'])
self.assertEqual(crypto_plugin,
ss.to_dict_fields()['crypto_plugin'])
self.assertEqual(True,
ss.to_dict_fields()['global_default'])
self.assertEqual(models.States.ACTIVE,
ss.to_dict_fields()['status'])
self.assertEqual(name, ss.to_dict_fields()['name'])
# check with required input only
ss = models.SecretStores(name, store_plugin)
self.assertEqual(store_plugin,
ss.to_dict_fields()['store_plugin'])
self.assertIsNone(ss.to_dict_fields()['crypto_plugin'])
self.assertIsNone(ss.to_dict_fields()['global_default'])
self.assertEqual(models.States.ACTIVE,
ss.to_dict_fields()['status'])
self.assertEqual(name, ss.to_dict_fields()['name'])
class WhenCreatingNewProjectSecretStore(utils.BaseTestCase):
def setUp(self):
super(WhenCreatingNewProjectSecretStore, self).setUp()
def test_new_project_secret_store(self):
project_id = 'proj_123456'
name = "db backend"
store_plugin = 'store_crypto'
crypto_plugin = 'simple_crypto'
ss = models.SecretStores(name, store_plugin, crypto_plugin,
global_default=True)
ss.id = "ss_123456"
project_ss = models.ProjectSecretStore(project_id, ss.id)
self.assertEqual(project_id, project_ss.project_id)
self.assertEqual(ss.id, project_ss.secret_store_id)
self.assertEqual(models.States.ACTIVE, project_ss.status)
def test_should_throw_exception_missing_project_id(self):
self.assertRaises(exception.MissingArgumentError,
models.ProjectSecretStore, None, "ss_123456")
self.assertRaises(exception.MissingArgumentError,
models.ProjectSecretStore, "", "ss_123456")
def test_should_throw_exception_missing_secret_store_id(self):
self.assertRaises(exception.MissingArgumentError,
models.ProjectSecretStore, "proj_123456", None)
self.assertRaises(exception.MissingArgumentError,
models.ProjectSecretStore, "proj_123456", "")
def test_project_secret_store_check_to_dict_fields(self):
project_id = 'proj_123456'
secret_store_id = 'ss_7689012'
project_ss = models.ProjectSecretStore(project_id, secret_store_id)
self.assertEqual(project_id,
project_ss.to_dict_fields()['project_id'])
self.assertEqual(secret_store_id,
project_ss.to_dict_fields()['secret_store_id'])
self.assertEqual(models.States.ACTIVE,
project_ss.to_dict_fields()['status'])
if __name__ == '__main__':
unittest.main()