From e65ac5e4394475ffe822946bea01590028882744 Mon Sep 17 00:00:00 2001 From: Mauricio Harley Date: Fri, 23 Sep 2022 15:57:40 +0200 Subject: [PATCH] Add secret consumers functional tests This adds secret consumers functional tests to use with Barbican. Change-Id: I23e71e534d94e753c3e94154f39ec04219ab0fa6 --- .../key_manager/test_barbican_key_manager.py | 222 ++++++++++++++++++ 1 file changed, 222 insertions(+) diff --git a/castellan/tests/functional/key_manager/test_barbican_key_manager.py b/castellan/tests/functional/key_manager/test_barbican_key_manager.py index 5fdd1a32..b809b358 100644 --- a/castellan/tests/functional/key_manager/test_barbican_key_manager.py +++ b/castellan/tests/functional/key_manager/test_barbican_key_manager.py @@ -34,11 +34,13 @@ from castellan.common import exception from castellan.key_manager import barbican_key_manager from castellan.tests.functional import config from castellan.tests.functional.key_manager import test_key_manager +from castellan.tests import utils CONF = config.get_config() +@utils.parameterized_test_case class BarbicanKeyManagerTestCase(test_key_manager.KeyManagerTestCase): def _create_key_manager(self): @@ -108,6 +110,226 @@ class BarbicanKeyManagerTestCase(test_key_manager.KeyManagerTestCase): self.assertRaises(exception.Forbidden, self.key_mgr.store, None, key) + def test_secret_create_check_empty_consumers_list(self): + """Check that the consumers entity is a list and is empty.""" + + key = test_key_manager._get_test_passphrase() + self.assertIsNotNone(key) + + stored_id = self.key_mgr.store(self.ctxt, key) + self.addCleanup(self.key_mgr.delete, self.ctxt, stored_id, True) + self.assertIsNotNone(stored_id) + + resp = self.key_mgr.get(self.ctxt, stored_id) + consumers = resp.consumers + self.assertIsInstance(consumers, list) + self.assertEqual(len(consumers), 0) + + def test_secret_create_check_consumers_list_consistency(self): + """Consumers List Consistency + + Check that the consumers list contains a single element, + and that it corresponds to the consumer created. + """ + + key = test_key_manager._get_test_passphrase() + self.assertIsNotNone(key) + + stored_id = self.key_mgr.store(self.ctxt, key) + self.addCleanup(self.key_mgr.delete, self.ctxt, stored_id, True) + self.assertIsNotNone(stored_id) + + resource_id = uuidutils.generate_uuid() + consumer_data = { + 'service': 'dummy_service', + 'resource_type': 'dummy_resource_type', + 'resource_id': resource_id + } + self.key_mgr.add_consumer(self.ctxt, stored_id, consumer_data) + stored_secret = self.key_mgr.get(self.ctxt, stored_id) + self.assertIsNotNone(stored_secret) + self.assertIsInstance(stored_secret.consumers, list) + self.assertEqual(len(stored_secret.consumers), 1) + self.assertEqual(stored_secret.consumers[0]['service'], + consumer_data['service']) + self.assertEqual(stored_secret.consumers[0]['resource_type'], + consumer_data['resource_type']) + self.assertEqual(stored_secret.consumers[0]['resource_id'], + consumer_data['resource_id']) + + def test_secret_create_remove_nonexistent_consumer(self): + """Removing a nonexistent consumer should raise an exception.""" + key = test_key_manager._get_test_passphrase() + self.assertIsNotNone(key) + + stored_id = self.key_mgr.store(self.ctxt, key) + self.addCleanup(self.key_mgr.delete, self.ctxt, stored_id, True) + self.assertIsNotNone(stored_id) + + resource_id = uuidutils.generate_uuid() + consumer_data = { + 'service': 'dummy_service', + 'resource_type': 'dummy_resource_type', + 'resource_id': resource_id + } + self.assertRaises(exception.ManagedObjectNotFoundError, + self.key_mgr.remove_consumer, self.ctxt, + stored_id, consumer_data) + + @utils.parameterized_dataset({ + 'remove_one': [[{'service': 'service_test1', + 'resource_type': 'type_test1', + 'resource_id': 'id_test1'}, + {'service': 'service_test2', + 'resource_type': 'type_test2', + 'resource_id': 'id_test2'}], + [{'service': 'service_test1', + 'resource_type': 'type_test1', + 'resource_id': 'id_test1'}]], + 'remove_all': [[{'service': 'service_test1', + 'resource_type': 'type_test1', + 'resource_id': 'id_test1'}, + {'service': 'service_test2', + 'resource_type': 'type_test2', + 'resource_id': 'id_test2'}], + [{'service': 'service_test1', + 'resource_type': 'type_test1', + 'resource_id': 'id_test1'}, + {'service': 'service_test2', + 'resource_type': 'type_test2', + 'resource_id': 'id_test2'}]] + }) + def test_secret_create_and_adding_removing_consumers( + self, + add_consumers, + remove_consumers): + """The following activities are carried: + + Create a secret, then register each consumer + in the register_consumers list, then remove each consumer + in the remove_consumers list. + """ + key = test_key_manager._get_test_passphrase() + self.assertIsNotNone(key) + + stored_id = self.key_mgr.store(self.ctxt, key) + self.addCleanup(self.key_mgr.delete, self.ctxt, stored_id, True) + self.assertIsNotNone(stored_id) + + for consumer in add_consumers: + self.key_mgr.add_consumer(self.ctxt, stored_id, consumer) + stored_secret = self.key_mgr.get(self.ctxt, stored_id) + self.assertCountEqual(add_consumers, stored_secret.consumers) + + for consumer in remove_consumers: + self.key_mgr.remove_consumer(self.ctxt, stored_id, consumer) + stored_secret = self.key_mgr.get(self.ctxt, stored_id) + + removed_ids = set([v['resource_id'] for v in remove_consumers]) + remaining_consumers = [v for v in add_consumers + if v['resource_id'] not in removed_ids] + self.assertCountEqual(remaining_consumers, stored_secret.consumers) + + @utils.parameterized_dataset({ + 'no_args': [[{}]], + 'one_arg_1': [[{'service': 'service1'}]], + 'one_arg_2': [[{'resource_type': 'type1'}]], + 'one_arg_3': [[{'resource_id': 'id1'}]], + 'two_args_1': [[{'service': 'service1', + 'resource_type': 'type1'}]], + 'two_args_2': [[{'service': 'service1', + 'resource_id': 'id1'}]], + 'two_args_3': [[{'resource_type': 'type1', + 'resource_id': 'id'}]] + }) + def test_consumer_add_missing_positional_arguments(self, consumers): + """Missing Positional Arguments - Addition + + Tries to add a secret consumer without providing all of the required + positional arguments (service, resource_type, resource_id). + """ + key = test_key_manager._get_test_passphrase() + self.assertIsNotNone(key) + + stored_id = self.key_mgr.store(self.ctxt, key) + self.addCleanup(self.key_mgr.delete, self.ctxt, stored_id, True) + self.assertIsNotNone(stored_id) + + for consumer in consumers: + e = self.assertRaises( + TypeError, + self.key_mgr.add_consumer, + self.ctxt, stored_id, consumer) + self.assertIn('register_consumer() missing', str(e)) + + @utils.parameterized_dataset({ + 'no_args': [[{}]], + 'one_arg_1': [[{'service': 'service1'}]], + 'one_arg_2': [[{'resource_type': 'type1'}]], + 'one_arg_3': [[{'resource_id': 'id1'}]], + 'two_args_1': [[{'service': 'service1', + 'resource_type': 'type1'}]], + 'two_args_2': [[{'service': 'service1', + 'resource_id': 'id1'}]], + 'two_args_3': [[{'resource_type': 'type1', + 'resource_id': 'id'}]] + }) + def test_consumer_remove_missing_positional_arguments(self, consumers): + """Missing Positional Arguments - Removal + + Tries to remove a secret consumer without providing all of the required + positional arguments (service, resource_type, resource_id). + """ + key = test_key_manager._get_test_passphrase() + self.assertIsNotNone(key) + + stored_id = self.key_mgr.store(self.ctxt, key) + self.addCleanup(self.key_mgr.delete, self.ctxt, stored_id, True) + self.assertIsNotNone(stored_id) + + consumer_data = { + 'service': 'service1', + 'resource_type': 'type1', + 'resource_id': 'id1' + } + self.key_mgr.add_consumer(self.ctxt, stored_id, consumer_data) + for consumer in consumers: + e = self.assertRaises( + TypeError, + self.key_mgr.remove_consumer, + self.ctxt, stored_id, consumer) + self.assertIn('remove_consumer() missing', str(e)) + + def test_consumer_add_two_remove_one_check_consumers_list(self): + """Consumers addition and removal - check of list consistency + + Adds two consumers, removes one and verifies if the consumers + list's length is consistent (equals to 1). + """ + key = test_key_manager._get_test_passphrase() + self.assertIsNotNone(key) + + stored_id = self.key_mgr.store(self.ctxt, key) + self.addCleanup(self.key_mgr.delete, self.ctxt, stored_id, True) + self.assertIsNotNone(stored_id) + + consumers = [ + {'service': 'service1', + 'resource_type': 'type1', + 'resource_id': 'id1'}, + {'service': 'service2', + 'resource_type': 'type2', + 'resource_id': 'id2'} + ] + for consumer in consumers: + self.key_mgr.add_consumer(self.ctxt, stored_id, consumer) + stored_secret = self.key_mgr.get(self.ctxt, stored_id) + self.assertCountEqual(consumers, stored_secret.consumers) + + self.key_mgr.remove_consumer(self.ctxt, stored_id, consumers[0]) + stored_secret = self.key_mgr.get(self.ctxt, stored_id) + self.assertCountEqual(consumers[1:], stored_secret.consumers) + class BarbicanKeyManagerOSLOContextTestCase(BarbicanKeyManagerTestCase, base.BaseTestCase):