diff --git a/barbican_tempest_plugin/tests/scenario/barbican_manager.py b/barbican_tempest_plugin/tests/scenario/barbican_manager.py index 0844d96..a98d2cc 100644 --- a/barbican_tempest_plugin/tests/scenario/barbican_manager.py +++ b/barbican_tempest_plugin/tests/scenario/barbican_manager.py @@ -38,7 +38,7 @@ LOG = logging.getLogger(__name__) class BarbicanScenarioTest(mgr.ScenarioTest): - credentials = ('primary', ) + credentials = ('primary', 'admin') manager = clients.Manager() def setUp(self): @@ -66,6 +66,7 @@ class BarbicanScenarioTest(mgr.ScenarioTest): super(BarbicanScenarioTest, cls).setup_clients() os = getattr(cls, 'os_%s' % cls.credentials[0]) + os_adm = getattr(cls, 'os_%s' % cls.credentials[1]) cls.consumer_client = os.secret_v1.ConsumerClient( service='key-manager' ) @@ -78,6 +79,18 @@ class BarbicanScenarioTest(mgr.ScenarioTest): service='key-manager' ) + if CONF.compute_feature_enabled.attach_encrypted_volume: + if CONF.volume_feature_enabled.api_v2: + cls.admin_volume_types_client =\ + os_adm.volume_types_v2_client + cls.admin_encryption_types_client =\ + os_adm.encryption_types_v2_client + else: + cls.admin_volume_types_client =\ + os_adm.volume_types_client + cls.admin_encryption_types_client =\ + os_adm.encryption_types_client + def _get_uuid(self, href): return href.split('/')[-1] @@ -156,3 +169,16 @@ class BarbicanScenarioTest(mgr.ScenarioTest): LOG.debug("Uploaded image %s", img_uuid) return img_uuid + + def create_encryption_type(self, client=None, type_id=None, provider=None, + key_size=None, cipher=None, + control_location=None): + if not client: + client = self.admin_encryption_types_client + if not type_id: + volume_type = self.create_volume_type() + type_id = volume_type['id'] + LOG.debug("Creating an encryption type for volume type: %s", type_id) + client.create_encryption_type( + type_id, provider=provider, key_size=key_size, cipher=cipher, + control_location=control_location) diff --git a/barbican_tempest_plugin/tests/scenario/manager.py b/barbican_tempest_plugin/tests/scenario/manager.py index 385c590..698c70d 100644 --- a/barbican_tempest_plugin/tests/scenario/manager.py +++ b/barbican_tempest_plugin/tests/scenario/manager.py @@ -18,8 +18,10 @@ from oslo_log import log from tempest.common import compute from tempest.common import image as common_image +from tempest.common.utils.linux import remote_client from tempest.common import waiters from tempest import config +from tempest import exceptions from tempest.lib.common.utils import data_utils from tempest.lib.common.utils import test_utils from tempest.lib import exceptions as lib_exc @@ -320,3 +322,234 @@ class ScenarioTest(tempest.test.BaseTestCase): self.compute_floating_ips_client.associate_floating_ip_to_server( floating_ip['ip'], thing['id']) return floating_ip + + def nova_volume_attach(self, server, volume_to_attach): + volume = self.servers_client.attach_volume( + server['id'], volumeId=volume_to_attach['id'], device='/dev/%s' + % CONF.compute.volume_device_name)['volumeAttachment'] + self.assertEqual(volume_to_attach['id'], volume['id']) + waiters.wait_for_volume_resource_status(self.volumes_client, + volume['id'], 'in-use') + + # Return the updated volume after the attachment + return self.volumes_client.show_volume(volume['id'])['volume'] + + def nova_volume_detach(self, server, volume): + self.servers_client.detach_volume(server['id'], volume['id']) + waiters.wait_for_volume_resource_status(self.volumes_client, + volume['id'], 'available') + + volume = self.volumes_client.show_volume(volume['id'])['volume'] + self.assertEqual('available', volume['status']) + + def create_timestamp(self, ip_address, dev_name=None, mount_path='/mnt', + private_key=None): + ssh_client = self.get_remote_client(ip_address, + private_key=private_key) + if dev_name is not None: + ssh_client.make_fs(dev_name) + ssh_client.exec_command('sudo mount /dev/%s %s' % (dev_name, + mount_path)) + cmd_timestamp = 'sudo sh -c "date > %s/timestamp; sync"' % mount_path + ssh_client.exec_command(cmd_timestamp) + timestamp = ssh_client.exec_command('sudo cat %s/timestamp' + % mount_path) + if dev_name is not None: + ssh_client.exec_command('sudo umount %s' % mount_path) + return timestamp + + def get_timestamp(self, ip_address, dev_name=None, mount_path='/mnt', + private_key=None): + ssh_client = self.get_remote_client(ip_address, + private_key=private_key) + if dev_name is not None: + ssh_client.mount(dev_name, mount_path) + timestamp = ssh_client.exec_command('sudo cat %s/timestamp' + % mount_path) + if dev_name is not None: + ssh_client.exec_command('sudo umount %s' % mount_path) + return timestamp + + def get_server_ip(self, server): + """Get the server fixed or floating IP. + + Based on the configuration we're in, return a correct ip + address for validating that a guest is up. + """ + if CONF.validation.connect_method == 'floating': + # The tests calling this method don't have a floating IP + # and can't make use of the validation resources. So the + # method is creating the floating IP there. + return self.create_floating_ip(server)['ip'] + elif CONF.validation.connect_method == 'fixed': + # Determine the network name to look for based on config or creds + # provider network resources. + if CONF.validation.network_for_ssh: + addresses = server['addresses'][ + CONF.validation.network_for_ssh] + else: + creds_provider = self._get_credentials_provider() + net_creds = creds_provider.get_primary_creds() + network = getattr(net_creds, 'network', None) + addresses = (server['addresses'][network['name']] + if network else []) + for address in addresses: + if (address['version'] == CONF.validation.ip_version_for_ssh + and address['OS-EXT-IPS:type'] == 'fixed'): + return address['addr'] + raise exceptions.ServerUnreachable(server_id=server['id']) + else: + raise lib_exc.InvalidConfiguration() + + def get_remote_client(self, ip_address, username=None, private_key=None): + """Get a SSH client to a remote server + + @param ip_address the server floating or fixed IP address to use + for ssh validation + @param username name of the Linux account on the remote server + @param private_key the SSH private key to use + @return a RemoteClient object + """ + + if username is None: + username = CONF.validation.image_ssh_user + # Set this with 'keypair' or others to log in with keypair or + # username/password. + if CONF.validation.auth_method == 'keypair': + password = None + if private_key is None: + private_key = self.keypair['private_key'] + else: + password = CONF.validation.image_ssh_password + private_key = None + linux_client = remote_client.RemoteClient(ip_address, username, + pkey=private_key, + password=password) + try: + linux_client.validate_authentication() + except Exception as e: + message = ('Initializing SSH connection to %(ip)s failed. ' + 'Error: %(error)s' % {'ip': ip_address, + 'error': e}) + caller = test_utils.find_test_caller() + if caller: + message = '(%s) %s' % (caller, message) + LOG.exception(message) + self._log_console_output() + raise + + return linux_client + + def _default_security_group(self, client=None, tenant_id=None): + """Get default secgroup for given tenant_id. + + :returns: default secgroup for given tenant + """ + if client is None: + client = self.security_groups_client + if not tenant_id: + tenant_id = client.tenant_id + sgs = [ + sg for sg in list(client.list_security_groups().values())[0] + if sg['tenant_id'] == tenant_id and sg['name'] == 'default' + ] + msg = "No default security group for tenant %s." % (tenant_id) + self.assertGreater(len(sgs), 0, msg) + return sgs[0] + + def _create_security_group(self): + # Create security group + sg_name = data_utils.rand_name(self.__class__.__name__) + sg_desc = sg_name + " description" + secgroup = self.compute_security_groups_client.create_security_group( + name=sg_name, description=sg_desc)['security_group'] + self.assertEqual(secgroup['name'], sg_name) + self.assertEqual(secgroup['description'], sg_desc) + self.addCleanup( + test_utils.call_and_ignore_notfound_exc, + self.compute_security_groups_client.delete_security_group, + secgroup['id']) + + # Add rules to the security group + self._create_loginable_secgroup_rule(secgroup['id']) + + return secgroup + + def _create_loginable_secgroup_rule(self, secgroup_id=None): + _client = self.compute_security_groups_client + _client_rules = self.compute_security_group_rules_client + if secgroup_id is None: + sgs = _client.list_security_groups()['security_groups'] + for sg in sgs: + if sg['name'] == 'default': + secgroup_id = sg['id'] + + # These rules are intended to permit inbound ssh and icmp + # traffic from all sources, so no group_id is provided. + # Setting a group_id would only permit traffic from ports + # belonging to the same security group. + rulesets = [ + { + # ssh + 'ip_protocol': 'tcp', + 'from_port': 22, + 'to_port': 22, + 'cidr': '0.0.0.0/0', + }, + { + # ping + 'ip_protocol': 'icmp', + 'from_port': -1, + 'to_port': -1, + 'cidr': '0.0.0.0/0', + } + ] + rules = list() + for ruleset in rulesets: + sg_rule = _client_rules.create_security_group_rule( + parent_group_id=secgroup_id, **ruleset)['security_group_rule'] + rules.append(sg_rule) + return rules + + def _create_security_group_rule(self, secgroup=None, + sec_group_rules_client=None, + tenant_id=None, + security_groups_client=None, **kwargs): + """Create a rule from a dictionary of rule parameters. + + Create a rule in a secgroup. if secgroup not defined will search for + default secgroup in tenant_id. + + :param secgroup: the security group. + :param tenant_id: if secgroup not passed -- the tenant in which to + search for default secgroup + :param kwargs: a dictionary containing rule parameters: + for example, to allow incoming ssh: + rule = { + direction: 'ingress' + protocol:'tcp', + port_range_min: 22, + port_range_max: 22 + } + """ + if sec_group_rules_client is None: + sec_group_rules_client = self.security_group_rules_client + if security_groups_client is None: + security_groups_client = self.security_groups_client + if not tenant_id: + tenant_id = security_groups_client.tenant_id + if secgroup is None: + secgroup = self._default_security_group( + client=security_groups_client, tenant_id=tenant_id) + + ruleset = dict(security_group_id=secgroup['id'], + tenant_id=secgroup['tenant_id']) + ruleset.update(kwargs) + + sg_rule = sec_group_rules_client.create_security_group_rule(**ruleset) + sg_rule = sg_rule['security_group_rule'] + + self.assertEqual(secgroup['tenant_id'], sg_rule['tenant_id']) + self.assertEqual(secgroup['id'], sg_rule['security_group_id']) + + return sg_rule diff --git a/barbican_tempest_plugin/tests/scenario/test_volume_encryption.py b/barbican_tempest_plugin/tests/scenario/test_volume_encryption.py new file mode 100644 index 0000000..9c42694 --- /dev/null +++ b/barbican_tempest_plugin/tests/scenario/test_volume_encryption.py @@ -0,0 +1,115 @@ +# Copyright (c) 2017 Johns Hopkins University Applied Physics Laboratory +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_log import log as logging +from tempest import config +from tempest.lib import decorators +from tempest import test + +from barbican_tempest_plugin.tests.scenario import barbican_manager + +CONF = config.CONF +LOG = logging.getLogger(__name__) + + +class VolumeEncryptionTest(barbican_manager.BarbicanScenarioTest): + + """The test suite for encrypted cinder volumes + + This test is for verifying the functionality of encrypted cinder volumes. + For both LUKS and cryptsetup encryption types, this test performs + the following: + * Creates an image in Glance + * Boots an instance from the image + * Creates an encryption type (as admin) + * Creates a volume of that encryption type (as a regular user) + * Attaches and detaches the encrypted volume to the instance + NOTE (dane-fichter): These tests use a key stored in Barbican, unlike + the original volume encryption scenario in Tempest. + """ + + @classmethod + def skip_checks(cls): + super(VolumeEncryptionTest, cls).skip_checks() + if not CONF.compute_feature_enabled.attach_encrypted_volume: + raise cls.skipException('Encrypted volume attach is not supported') + + def create_encrypted_volume(self, encryption_provider, volume_type): + volume_type = self.create_volume_type(name=volume_type) + self.create_encryption_type(type_id=volume_type['id'], + provider=encryption_provider, + key_size=256, + cipher='aes-xts-plain64', + control_location='front-end') + return self.create_volume(volume_type=volume_type['name']) + + def attach_detach_volume(self, server, volume, keypair): + # Attach volume + attached_volume = self.nova_volume_attach(server, volume) + + # Write a timestamp to volume + server_ip = self.get_server_ip(server) + timestamp = self.create_timestamp( + server_ip, + dev_name=CONF.compute.volume_device_name, + private_key=keypair['private_key'] + ) + timestamp2 = self.get_timestamp( + server_ip, + dev_name=CONF.compute.volume_device_name, + private_key=keypair['private_key'] + ) + self.assertEqual(timestamp, timestamp2) + + # Detach volume + self.nova_volume_detach(server, attached_volume) + + @decorators.idempotent_id('89165fb4-5534-4b9d-8429-97ccffb8f86f') + @test.services('compute', 'volume', 'image') + def test_encrypted_cinder_volumes_luks(self): + img_uuid = self.sign_and_upload_image() + LOG.info("Creating keypair and security group") + keypair = self.create_keypair() + security_group = self._create_security_group() + server = self.create_server( + name='signed_img_server', + image_id=img_uuid, + key_name=keypair['name'], + security_groups=[{'name': security_group['name']}], + wait_until='ACTIVE' + ) + volume = self.create_encrypted_volume('nova.volume.encryptors.' + 'luks.LuksEncryptor', + volume_type='luks') + self.attach_detach_volume(server, volume, keypair) + + @decorators.idempotent_id('cbc752ed-b716-4727-910f-956ccf965723') + @test.services('compute', 'volume', 'image') + def test_encrypted_cinder_volumes_cryptsetup(self): + img_uuid = self.sign_and_upload_image() + LOG.info("Creating keypair and security group") + keypair = self.create_keypair() + security_group = self._create_security_group() + + server = self.create_server( + name='signed_img_server', + image_id=img_uuid, + key_name=keypair['name'], + security_groups=[{'name': security_group['name']}], + wait_until='ACTIVE' + ) + volume = self.create_encrypted_volume('nova.volume.encryptors.' + 'cryptsetup.CryptsetupEncryptor', + volume_type='cryptsetup') + self.attach_detach_volume(server, volume, keypair)