diff --git a/.testr.conf b/.testr.conf index e7a868376..c1022c03a 100644 --- a/.testr.conf +++ b/.testr.conf @@ -2,7 +2,7 @@ test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} \ OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} \ OS_LOG_CAPTURE=${OS_LOG_CAPTURE:-1} \ - ${PYTHON:-python} -m subunit.run discover -s ${OS_TEST_PATH:-./barbican} -t . $LISTOPT $IDOPTION + ${PYTHON:-python} -m subunit.run discover -s ${OS_TEST_PATH:-./barbican/tests/} -t . $LISTOPT $IDOPTION test_id_option=--load-list $IDFILE test_list_option=--list diff --git a/barbican/cmd/functionaltests/.testr.conf b/barbican/cmd/functionaltests/.testr.conf new file mode 100644 index 000000000..d976b049e --- /dev/null +++ b/barbican/cmd/functionaltests/.testr.conf @@ -0,0 +1,9 @@ +[DEFAULT] +test_command= + OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} \ + OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} \ + OS_LOG_CAPTURE=${OS_LOG_CAPTURE:-1} \ + ${PYTHON:-python} -m coverage run -a -m subunit.run discover -s ./cmd -t . $LISTOPT $IDOPTION + +test_id_option=--load-list $IDFILE +test_list_option=--list diff --git a/barbican/cmd/functionaltests/__init__.py b/barbican/cmd/functionaltests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/barbican/cmd/functionaltests/test_db_manage.py b/barbican/cmd/functionaltests/test_db_manage.py new file mode 100644 index 000000000..3b8610959 --- /dev/null +++ b/barbican/cmd/functionaltests/test_db_manage.py @@ -0,0 +1,318 @@ +# Copyright (c) 2016 Rackspace, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import time + +from testtools import testcase + +from barbican.common import config as barbican_config +from barbican.tests import utils +from functionaltests.api import base +from functionaltests.api.v1.behaviors import container_behaviors +from functionaltests.api.v1.behaviors import secret_behaviors +from functionaltests.api.v1.models import container_models +from functionaltests.api.v1.models import secret_models +from functionaltests.common import config +from sqlalchemy import create_engine + +# Import and configure logging. +BCONF = barbican_config.CONF +CONF = config.get_config() +admin_a = CONF.rbac_users.admin_a +admin_b = CONF.rbac_users.admin_b + + +class DBManageTestCase(base.TestCase): + + def setUp(self): + super(DBManageTestCase, self).setUp() + self.sbehaviors = secret_behaviors.SecretBehaviors(self.client) + self.cbehaviors = container_behaviors.ContainerBehaviors(self.client) + + db_url = BCONF.sql_connection + + time.sleep(5) + # Setup session for tests to query DB + engine = create_engine(db_url) + self.conn = engine.connect() + + def tearDown(self): + super(DBManageTestCase, self).tearDown() + self.conn.close() + self.sbehaviors.delete_all_created_secrets() + self.cbehaviors.delete_all_created_containers() + + def _create_secret_list(self, + user, + delete=False, + expiration="2050-02-28T19:14:44.180394"): + + secret_defaults_data = { + "name": "AES key", + "expiration": expiration, + "algorithm": "aes", + "bit_length": 256, + "mode": "cbc", + "payload": "gF6+lLoF3ohA9aPRpt+6bQ==", + "payload_content_type": "application/octet-stream", + "payload_content_encoding": "base64", + } + + secret_list = [] + + for i in range(0, 5): + secret_model = secret_models.SecretModel(**secret_defaults_data) + resp, secret_ref = self.sbehaviors.create_secret(secret_model, + user_name=user) + + self.assertEqual(resp.status_code, 201) + self.assertIsNotNone(secret_ref) + + secret_list.append(secret_ref) + + if delete is True: + self._delete_secret_list(secret_list, user) + + return secret_list + + def _create_container_uuid_list( + self, + user, + secret_expiration="2050-02-28T19:14:44.180394", + delete_secret=False, + delete_container=False): + + secret_list = self._create_secret_list( + user=user, + expiration=secret_expiration + ) + + container_data = { + "name": "containername", + "type": "generic", + "secret_refs": [ + { + "name": "secret", + "secret_ref": secret_list[0] + } + ] + } + container_list = [] + + for i in range(0, 5): + container_model = container_models.ContainerModel(**container_data) + post_container_resp, c_ref = self.cbehaviors.create_container( + container_model, + user_name=user) + + self.assertEqual(post_container_resp.status_code, 201) + self.assertIsNotNone(c_ref) + + container_list.append(c_ref) + + if delete_container is True: + self._delete_container_list(container_list, user) + + if delete_secret is True: + self._delete_secret_list(secret_list) + + return container_list + + def _delete_secret_list(self, secret_list, user): + + for secret in secret_list: + del_resp = self.sbehaviors.delete_secret(secret, user_name=user) + self.assertEqual(del_resp.status_code, 204) + + def _delete_container_list(self, container_list, user): + for container in container_list: + del_resp = self.cbehaviors.delete_container(container, + user_name=user) + self.assertEqual(del_resp.status_code, 204) + + def _get_uuid(self, ref): + uuid = ref.split('/')[-1] + + return uuid + + @testcase.attr('positive') + def test_active_secret_not_deleted(self): + """Verify that active secrest are not removed""" + project_a_secrets = self._create_secret_list(user=admin_a) + project_b_secrets = self._create_secret_list(user=admin_b) + + os.system("python barbican/cmd/db_manage.py clean -m 0 -p -e") + + results = self.conn.execute("select * from secrets") + secret_list = [] + for row in results: + secret_list.append(str(row[0])) + + for secret in project_a_secrets: + secret_uuid = self._get_uuid(secret) + self.assertIn(secret_uuid, secret_list) + + for secret in project_b_secrets: + secret_uuid = self._get_uuid(secret) + self.assertIn(secret_uuid, secret_list) + + @testcase.attr('positive') + def test_soft_deleted_secrets_are_removed(self): + """Test that soft deleted secrets are removed""" + + project_a_secrets = self._create_secret_list(user=admin_a, + delete=True) + project_b_secrets = self._create_secret_list(user=admin_b, + delete=True) + + os.system("python barbican/cmd/db_manage.py clean -m 0 -p -e") + + results = self.conn.execute("select * from secrets") + secret_list = [] + for row in results: + secret_list.append(str(row[0])) + + for secret in project_a_secrets: + secret_uuid = self._get_uuid(secret) + self.assertNotIn(secret_uuid, secret_list) + + for secret in project_b_secrets: + secret_uuid = self._get_uuid(secret) + self.assertNotIn(secret_uuid, secret_list) + + @testcase.attr('positive') + def test_expired_secrets_are_not_removed_from_db(self): + """Test expired secrests are left in soft deleted state. + + Currently this clean will set the threshold at the start + of the test. Expired secrets will be deleted and the + deleted at date will now be later then the threshold + date. + """ + + current_time = utils.create_timestamp_w_tz_and_offset(seconds=10) + project_a_secrets = self._create_secret_list(user=admin_a, + expiration=current_time) + project_b_secrets = self._create_secret_list(user=admin_b, + expiration=current_time) + + time.sleep(10) + + os.system("python barbican/cmd/db_manage.py clean -m 0 -p -e") + + results = self.conn.execute("select * from secrets") + secret_list = [] + for row in results: + secret_list.append(str(row[0])) + + for secret in project_a_secrets: + secret_uuid = self._get_uuid(secret) + self.assertIn(secret_uuid, secret_list) + + for secret in project_b_secrets: + secret_uuid = self._get_uuid(secret) + self.assertIn(secret_uuid, secret_list) + + @testcase.attr('positive') + def test_no_soft_deleted_secrets_in_db(self): + """Test that no soft deleted secrests are in db""" + os.system("python barbican/cmd/db_manage.py clean -m 0 -p -e") + + results = self.conn.execute("select * from secrets where deleted=1") + secret_list = [] + for row in results: + secret_list.append(str(row[0])) + + self.assertEqual(len(secret_list), 0) + + @testcase.attr('positive') + def test_active_containers_not_deleted(self): + """Active containers are not deleted""" + project_a_containers = self._create_container_uuid_list( + user=admin_a) + project_b_containers = self._create_container_uuid_list( + user=admin_b) + + os.system("python barbican/cmd/db_manage.py clean -m 0 -p -e") + + results = self.conn.execute("select * from containers") + container_list = [] + for row in results: + container_list.append(str(row[0])) + + for container in project_a_containers: + container_uuid = self._get_uuid(container) + self.assertIn(container_uuid, container_list) + + for container in project_b_containers: + container_uuid = self._get_uuid(container) + self.assertIn(container_uuid, container_list) + + @testcase.attr('positive') + def test_cleanup_soft_deleted_containers(self): + """Soft deleted containers are deleted""" + project_a_delete_containers = self._create_container_uuid_list( + user=admin_a, + delete_container=True) + project_b_delete_containers = self._create_container_uuid_list( + user=admin_b, + delete_container=True) + + os.system("python barbican/cmd/db_manage.py clean -m 0 -p -e") + + results = self.conn.execute("select * from containers") + container_list = [] + for row in results: + container_list.append(str(row[0])) + + for container in project_a_delete_containers: + container_uuid = self._get_uuid(container) + self.assertNotIn(container_uuid, container_list) + + for container in project_b_delete_containers: + container_uuid = self._get_uuid(container) + self.assertNotIn(container_uuid, container_list) + + @testcase.attr('positive') + def test_containers_with_exipred_secrets_are_deleted(self): + """Containers with expired secrets are deleted""" + current_time = utils.create_timestamp_w_tz_and_offset(seconds=10) + + project_a_delete_containers = self._create_container_uuid_list( + user=admin_a, + delete_container=True, + secret_expiration=current_time) + project_b_delete_containers = self._create_container_uuid_list( + user=admin_b, + delete_container=True, + secret_expiration=current_time) + + time.sleep(10) + + os.system("python barbican/cmd/db_manage.py clean -m 0 -p -e") + + results = self.conn.execute("select * from containers") + container_list = [] + for row in results: + container_list.append(str(row[0])) + + for container in project_a_delete_containers: + container_uuid = self._get_uuid(container) + self.assertNotIn(container_uuid, container_list) + + for container in project_b_delete_containers: + container_uuid = self._get_uuid(container) + self.assertNotIn(container_uuid, container_list) diff --git a/tox.ini b/tox.ini index eefc90981..5ec51ff81 100644 --- a/tox.ini +++ b/tox.ini @@ -69,6 +69,15 @@ commands = /bin/bash {toxinidir}/functionaltests/pretty_tox.sh '{posargs}' passenv = KMIP_PLUGIN_ENABLED +[testenv:cmd] +# This tox env is purely to make local test development easier +# Note: This requires local running instances of Barbican and Keystone +deps = -r{toxinidir}/test-requirements.txt +setenv = OS_TEST_PATH={toxinidir}/barbican/cmd/functionaltests +commands = + /usr/bin/find . -type f -name "*.pyc" -delete + /bin/bash {toxinidir}/functionaltests/pretty_tox.sh '{posargs}' + [flake8] exclude = .git,.idea,.tox,bin,dist,debian,rpmbuild,tools,*.egg-info,*.eggs,*openstack/common,contrib, functionaltests,*alembic_migrations/versions,*docs/target,*.egg