Implement Trust Flush via keystone-manage.

Creates a cli entry 'trust_flush' which removes
all expired trusts.

Change-Id: I1c85b67d24e05db86c85e722fbd773a411c24ac4
Closes-Bug: #1473292
This commit is contained in:
Vishakha Agarwal 2018-08-07 12:16:51 +05:30
parent 09975532c0
commit 8232dabcf9
7 changed files with 349 additions and 2 deletions

View File

@ -54,3 +54,12 @@ The delegation parameters are:
**Duration**
(Optional) Comprised of the start time and end time for the trust.
Removing Expired Trusts
===========================================================
In the SQL trust stores expired trusts are not automatically
removed. These trusts can be removed with::
$ keystone-manage trust_flush

View File

@ -22,3 +22,4 @@ Available commands:
* ``mapping_engine``: Test your federation mapping rules.
* ``saml_idp_metadata``: Generate identity provider metadata.
* ``token_flush``: Purge expired tokens.
* ``trust_flush``: Purge expired trusts.

View File

@ -41,7 +41,6 @@ from keystone.federation import utils as mapping_engine
from keystone.i18n import _
from keystone.server import backends
CONF = keystone.conf.CONF
LOG = log.getLogger(__name__)
@ -676,6 +675,36 @@ class TokenFlush(BaseApp):
)
class TrustFlush(BaseApp):
"""Flush expired trusts from the backend."""
name = 'trust_flush'
@classmethod
def add_argument_parser(cls, subparsers):
parser = super(TrustFlush, cls).add_argument_parser(subparsers)
parser.add_argument('--project-id', default=None,
help=('The id of the project of which the expired '
'trusts is to be purged'))
parser.add_argument('--trustor-user-id', default=None,
help=('The id of the trustor of which the expired '
'trusts is to be purged'))
parser.add_argument('--trustee-user-id', default=None,
help=('The id of the trustee of which the expired '
'trusts is to be purged'))
return parser
@classmethod
def main(cls):
drivers = backends.load_backends()
trust_manager = drivers['trust_api']
trust_manager.flush_expired_trusts(
project_id=CONF.command.project_id,
trustor_user_id=CONF.command.trustor_user_id,
trustee_user_id=CONF.command.trustee_user_id)
class MappingPurge(BaseApp):
"""Purge the mapping table."""
@ -1158,7 +1187,8 @@ CMDS = [
SamlIdentityProviderMetadata,
TokenFlush,
TokenRotate,
TokenSetup
TokenSetup,
TrustFlush
]

View File

@ -1627,3 +1627,47 @@ class TestTokenFlush(unit.TestCase):
tf = cli.TokenFlush()
tf.main()
self.assertThat(logging.output, matchers.Contains(expected_msg))
class TestTrustFlush(unit.SQLDriverOverrides, unit.BaseTestCase):
class FakeConfCommand(object):
def __init__(self, parent):
self.extension = False
self.project_id = parent.command_project_id
self.trustor_user_id = parent.command_trustor_user_id
self.trustee_user_id = parent.command_trustee_user_id
def setUp(self):
# Set up preset cli options and a parser
super(TestTrustFlush, self).setUp()
self.useFixture(database.Database())
self.config_fixture = self.useFixture(oslo_config.fixture.Config(CONF))
self.config_fixture.register_cli_opt(cli.command_opt)
# For unit tests that should not throw any erorrs,
# Use the argument parser to test that the combinations work
parser_test = argparse.ArgumentParser()
subparsers = parser_test.add_subparsers()
self.parser = cli.TrustFlush.add_argument_parser(subparsers)
def config_files(self):
config_files = super(TestTrustFlush, self).config_files()
config_files.append(unit.dirs.tests_conf('backend_sql.conf'))
return config_files
def test_trust_flush(self):
self.command_project_id = None
self.command_trustor_user_id = None
self.command_trustee_user_id = None
self.useFixture(fixtures.MockPatchObject(
CONF, 'command', self.FakeConfCommand(self)))
def fake_load_backends():
return dict(
trust_api=keystone.trust.core.Manager())
self.useFixture(fixtures.MockPatch(
'keystone.server.backends.load_backends',
side_effect=fake_load_backends))
trust = cli.TrustFlush()
trust.main()

View File

@ -18,6 +18,7 @@ from six.moves import range
from keystone.common import provider_api
from keystone import exception
from keystone.tests.unit import core
PROVIDERS = provider_api.ProviderAPIs
@ -182,3 +183,230 @@ class TrustTests(object):
uuid.uuid4().hex,
trust_data,
roles)
def test_flush_expired_trusts(self):
roles = [{"id": "member"},
{"id": "other"},
{"id": "browser"}]
trust_ref1 = core.new_trust_ref(
self.user_foo['id'], self.user_two['id'],
project_id=self.tenant_bar['id'])
trust_ref1['expires_at'] = \
timeutils.utcnow() - datetime.timedelta(minutes=1)
trust_ref2 = core.new_trust_ref(
self.user_foo['id'], self.user_two['id'],
project_id=self.tenant_bar['id'])
trust_ref2['expires_at'] = \
timeutils.utcnow() + datetime.timedelta(minutes=1)
trust_data = PROVIDERS.trust_api.create_trust(trust_ref1['id'],
trust_ref1, roles)
self.assertIsNotNone(trust_data)
trust_data = PROVIDERS.trust_api.create_trust(trust_ref2['id'],
trust_ref2, roles)
self.assertIsNotNone(trust_data)
PROVIDERS.trust_api.flush_expired_trusts()
trusts = self.trust_api.list_trusts()
self.assertEqual(len(trusts), 1)
self.assertEqual(trust_ref2['id'], trusts[0]['id'])
def test_flush_expired_trusts_with_all_id(self):
roles = [{"id": "member"},
{"id": "other"},
{"id": "browser"}]
trust_ref1 = core.new_trust_ref(
self.user_foo['id'], self.user_foo['id'],
project_id=self.tenant_bar['id'])
trust_ref1['expires_at'] = \
timeutils.utcnow() - datetime.timedelta(minutes=1)
trust_ref2 = core.new_trust_ref(
self.user_foo['id'], self.user_two['id'],
project_id=self.tenant_bar['id'])
trust_ref2['expires_at'] = \
timeutils.utcnow() - datetime.timedelta(minutes=5)
trust_data = PROVIDERS.trust_api.create_trust(trust_ref1['id'],
trust_ref1, roles)
self.assertIsNotNone(trust_data)
trust_data = PROVIDERS.trust_api.create_trust(trust_ref2['id'],
trust_ref2, roles)
self.assertIsNotNone(trust_data)
PROVIDERS.trust_api.flush_expired_trusts(
project_id=self.tenant_bar['id'],
trustor_user_id=self.user_foo['id'],
trustee_user_id=self.user_two['id'])
trusts = self.trust_api.list_trusts()
self.assertEqual(len(trusts), 1)
self.assertEqual(trust_ref1['id'], trusts[0]['id'])
def test_flush_expired_trusts_with_no_project_id(self):
roles = [{"id": "member"},
{"id": "other"},
{"id": "browser"}]
trust_ref1 = core.new_trust_ref(
self.user_foo['id'], self.user_two['id'],
project_id=self.tenant_bar['id'])
trust_ref1['expires_at'] = \
timeutils.utcnow() - datetime.timedelta(minutes=1)
trust_ref2 = core.new_trust_ref(
self.user_foo['id'], self.user_two['id'],
project_id=self.tenant_bar['id'])
trust_ref2['expires_at'] = \
timeutils.utcnow() + datetime.timedelta(minutes=1)
trust_data = PROVIDERS.trust_api.create_trust(trust_ref1['id'],
trust_ref1, roles)
self.assertIsNotNone(trust_data)
trust_data = PROVIDERS.trust_api.create_trust(trust_ref2['id'],
trust_ref2, roles)
self.assertIsNotNone(trust_data)
PROVIDERS.trust_api.flush_expired_trusts(
trustor_user_id=self.user_foo['id'],
trustee_user_id=self.user_two['id'])
trusts = self.trust_api.list_trusts()
self.assertEqual(len(trusts), 1)
self.assertEqual(trust_ref2['id'], trusts[0]['id'])
def test_flush_expired_trusts_with_no_trustor_id(self):
roles = [{"id": "member"},
{"id": "other"},
{"id": "browser"}]
trust_ref1 = core.new_trust_ref(
self.user_foo['id'], self.user_two['id'],
project_id=self.tenant_bar['id'])
trust_ref1['expires_at'] = \
timeutils.utcnow() - datetime.timedelta(minutes=1)
trust_ref2 = core.new_trust_ref(
self.user_foo['id'], self.user_two['id'],
project_id=self.tenant_bar['id'])
trust_ref2['expires_at'] = \
timeutils.utcnow() + datetime.timedelta(minutes=1)
trust_data = PROVIDERS.trust_api.create_trust(trust_ref1['id'],
trust_ref1, roles)
self.assertIsNotNone(trust_data)
trust_data = PROVIDERS.trust_api.create_trust(trust_ref2['id'],
trust_ref2, roles)
self.assertIsNotNone(trust_data)
PROVIDERS.trust_api.flush_expired_trusts(
project_id=self.tenant_bar['id'],
trustee_user_id=self.user_two['id'])
trusts = self.trust_api.list_trusts()
self.assertEqual(len(trusts), 1)
self.assertEqual(trust_ref2['id'], trusts[0]['id'])
def test_flush_expired_trusts_with_no_trustee_id(self):
roles = [{"id": "member"},
{"id": "other"},
{"id": "browser"}]
trust_ref1 = core.new_trust_ref(
self.user_foo['id'], self.user_two['id'],
project_id=self.tenant_bar['id'])
trust_ref1['expires_at'] = \
timeutils.utcnow() - datetime.timedelta(minutes=1)
trust_ref2 = core.new_trust_ref(
self.user_foo['id'], self.user_two['id'],
project_id=self.tenant_bar['id'])
trust_ref2['expires_at'] = \
timeutils.utcnow() + datetime.timedelta(minutes=1)
trust_data = PROVIDERS.trust_api.create_trust(trust_ref1['id'],
trust_ref1, roles)
self.assertIsNotNone(trust_data)
trust_data = PROVIDERS.trust_api.create_trust(trust_ref2['id'],
trust_ref2, roles)
self.assertIsNotNone(trust_data)
PROVIDERS.trust_api.flush_expired_trusts(
project_id=self.tenant_bar['id'],
trustor_user_id=self.user_foo['id'])
trusts = self.trust_api.list_trusts()
self.assertEqual(len(trusts), 1)
self.assertEqual(trust_ref2['id'], trusts[0]['id'])
def test_flush_expired_trusts_with_project_id(self):
roles = [{"id": "member"},
{"id": "other"},
{"id": "browser"}]
trust_ref1 = core.new_trust_ref(
self.user_foo['id'], self.user_two['id'],
project_id=self.tenant_bar['id'])
trust_ref1['expires_at'] = \
timeutils.utcnow() - datetime.timedelta(minutes=1)
trust_ref2 = core.new_trust_ref(
self.user_foo['id'], self.user_two['id'],
project_id=self.user_foo['id'])
trust_ref2['expires_at'] = \
timeutils.utcnow() - datetime.timedelta(minutes=5)
trust_data = PROVIDERS.trust_api.create_trust(trust_ref1['id'],
trust_ref1, roles)
self.assertIsNotNone(trust_data)
trust_data = PROVIDERS.trust_api.create_trust(trust_ref2['id'],
trust_ref2, roles)
self.assertIsNotNone(trust_data)
PROVIDERS.trust_api.flush_expired_trusts(
project_id=self.tenant_bar['id'])
trusts = self.trust_api.list_trusts()
self.assertEqual(len(trusts), 1)
self.assertEqual(trust_ref2['id'], trusts[0]['id'])
def test_flush_expired_trusts_with_trustee_id(self):
roles = [{"id": "member"},
{"id": "other"},
{"id": "browser"}]
trust_ref1 = core.new_trust_ref(
self.user_foo['id'], self.user_two['id'],
project_id=self.tenant_bar['id'])
trust_ref1['expires_at'] = \
timeutils.utcnow() - datetime.timedelta(minutes=1)
trust_ref2 = core.new_trust_ref(
self.user_foo['id'], self.user_foo['id'],
project_id=self.tenant_bar['id'])
trust_ref2['expires_at'] = \
timeutils.utcnow() - datetime.timedelta(minutes=5)
trust_data = PROVIDERS.trust_api.create_trust(trust_ref1['id'],
trust_ref1, roles)
self.assertIsNotNone(trust_data)
trust_data = PROVIDERS.trust_api.create_trust(trust_ref2['id'],
trust_ref2, roles)
self.assertIsNotNone(trust_data)
PROVIDERS.trust_api.flush_expired_trusts(
trustee_user_id=self.user_two['id'])
trusts = self.trust_api.list_trusts()
self.assertEqual(len(trusts), 1)
self.assertEqual(trust_ref2['id'], trusts[0]['id'])
def test_flush_expired_trusts_with_trustor_id(self):
roles = [{"id": "member"},
{"id": "other"},
{"id": "browser"}]
trust_ref1 = core.new_trust_ref(
self.user_foo['id'], self.user_two['id'],
project_id=self.tenant_bar['id'])
trust_ref1['expires_at'] = \
timeutils.utcnow() - datetime.timedelta(minutes=1)
trust_ref2 = core.new_trust_ref(
self.user_two['id'], self.user_two['id'],
project_id=self.tenant_bar['id'])
trust_ref2['expires_at'] = \
timeutils.utcnow() - datetime.timedelta(minutes=5)
trust_data = PROVIDERS.trust_api.create_trust(trust_ref1['id'],
trust_ref1, roles)
self.assertIsNotNone(trust_data)
trust_data = PROVIDERS.trust_api.create_trust(trust_ref2['id'],
trust_ref2, roles)
self.assertIsNotNone(trust_data)
PROVIDERS.trust_api.flush_expired_trusts(
trustor_user_id=self.user_foo['id'])
trusts = self.trust_api.list_trusts()
self.assertEqual(len(trusts), 1)
self.assertEqual(trust_ref2['id'], trusts[0]['id'])

View File

@ -188,3 +188,18 @@ class Trust(base.TrustDriverBase):
trusts = query.filter_by(project_id=project_id)
for trust_ref in trusts:
trust_ref.deleted_at = timeutils.utcnow()
def flush_expired_trusts(self, project_id=None, trustor_user_id=None,
trustee_user_id=None):
with sql.session_for_write() as session:
query = session.query(TrustModel)
if project_id:
query = query.filter_by(project_id=project_id)
if trustor_user_id:
query = query.filter_by(trustor_user_id=trustor_user_id)
if trustee_user_id:
query = query.filter_by(trustee_user_id=trustee_user_id)
for ref in query:
if ref.expires_at is not None:
if ref.expires_at < timeutils.utcnow():
session.delete(ref)

View File

@ -0,0 +1,20 @@
---
feature:
- |
[`Bug 1473292 <https://bugs.launchpad.net/keystone/+bug/1473292>`_]
As trusts created by user are stored in database resulting db it to grow
larger as trusts that are expired are not automatically purged by keystone.
Thus this implements TrustFlush via keystone-manage to delete expired trusts.
Command:
$ keystone-manage trust-flush [Options]
Options:
project-id <string>: To purge expired trusts of given project-id.
trustor-user-id <string>: To purge expired trusts of given trustor-id.
trustee-user-id <string>: To purge expired trusts of given trustee-id.