From 394b8ccb47d8ca73e604715c2aa7340617cddabb Mon Sep 17 00:00:00 2001 From: Ade Lee Date: Tue, 8 Aug 2017 21:41:20 +0000 Subject: [PATCH] More changes to get ipa tests working --- novajoin_tempest_plugin/ipa/ipa_client.py | 23 ++-- .../tests/scenario/manager.py | 3 +- .../tests/scenario/novajoin_manager.py | 111 +++++++++++------- 3 files changed, 84 insertions(+), 53 deletions(-) diff --git a/novajoin_tempest_plugin/ipa/ipa_client.py b/novajoin_tempest_plugin/ipa/ipa_client.py index 062b91a..5ae5353 100644 --- a/novajoin_tempest_plugin/ipa/ipa_client.py +++ b/novajoin_tempest_plugin/ipa/ipa_client.py @@ -54,10 +54,9 @@ class IPABase(object): self.ccache = "MEMORY:" + str(uuid.uuid4()) os.environ['KRB5CCNAME'] = self.ccache + os.environ['KRB5_CLIENT_KTNAME'] = '/home/stack/krb5.keytab' if self._ipa_client_configured() and not api.isdone('finalize'): (hostname, realm) = self.get_host_and_realm() - kinit_keytab(str('nova/%s@%s' % (hostname, realm)), - self.keytab, self.ccache) api.bootstrap(context='novajoin') api.finalize() self.batch_args = list() @@ -159,23 +158,33 @@ class IPABase(object): class IPAClient(IPABase): def find_host(self, hostname): - params = [hostname] + params = [unicode(hostname)] return self._call_ipa('host_find', *params) def show_host(self, hostname): - params = [hostname] - return self._call_ipa('host-show', *params) + params = [unicode(hostname)] + return self._call_ipa('host_show', *params) def find_service(self, service_principal): - params = [service_principal] + params = [unicode(service_principal)] service_args = {} return self._call_ipa('service_find', *params, **service_args) def show_service(self, service_principal): - params = [service_principal] + params = [unicode(service_principal)] service_args = {} return self._call_ipa('service_show', *params, **service_args) + def get_service_cert(self, service_principal): + params = [unicode(service_principal)] + service_args = {} + result = self._call_ipa('service_find', *params, **service_args) + serviceresult = result['result'][0] + if 'serial_number' in serviceresult: + return serviceresult['serial_number'] + else: + return None + def service_managed_by_host(self, service_principal, host): """Return True if service is managed by specified host""" params = [service_principal] diff --git a/novajoin_tempest_plugin/tests/scenario/manager.py b/novajoin_tempest_plugin/tests/scenario/manager.py index 7eb7c82..04c9dd9 100644 --- a/novajoin_tempest_plugin/tests/scenario/manager.py +++ b/novajoin_tempest_plugin/tests/scenario/manager.py @@ -76,7 +76,8 @@ class ScenarioTest(tempest.test.BaseTestCase): if CONF.volume_feature_enabled.api_v2: cls.volumes_client = cls.os_primary.volumes_v2_client cls.snapshots_client = cls.os_primary.snapshots_v2_client - else: + + if CONF.volume_feature_enabled.api_v1: cls.volumes_client = cls.os_primary.volumes_client cls.snapshots_client = cls.os_primary.snapshots_client diff --git a/novajoin_tempest_plugin/tests/scenario/novajoin_manager.py b/novajoin_tempest_plugin/tests/scenario/novajoin_manager.py index 9bfca46..ebaf180 100644 --- a/novajoin_tempest_plugin/tests/scenario/novajoin_manager.py +++ b/novajoin_tempest_plugin/tests/scenario/novajoin_manager.py @@ -12,19 +12,30 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. +import subprocess + from oslo_log import log as logging from tempest import config -from tempest import test from novajoin_tempest_plugin.ipa import ipa_client +from novajoin_tempest_plugin.tests.scenario import manager + CONF = config.CONF LOG = logging.getLogger(__name__) -class NovajoinScenarioTest(test.BaseTestCase): +class NovajoinScenarioTest(manager.ScenarioTest): + + credentials = ['primary', 'admin'] + def setUp(self): super(NovajoinScenarioTest, self).setUp() + @classmethod + def setup_credentials(cls): + cls.set_network_resources() + super(NovajoinScenarioTest, cls).setup_credentials() + @classmethod def skip_checks(cls): super(NovajoinScenarioTest, cls).skip_checks() @@ -37,62 +48,72 @@ class NovajoinScenarioTest(test.BaseTestCase): cls.ipa_client = ipa_client.IPAClient() def verify_host_registered_with_ipa(self, host): - # check if specified host is registered with ipa - # basically doing a host-show result = self.ipa_client.find_host(host) - if result['count'] > 0: - return True - return False + self.assertTrue(result['count'] > 0) + + def verify_host_not_registered_with_ipa(self, host): + result = self.ipa_client.find_host(host) + self.assertFalse(result['count'] > 0) def verify_host_has_keytab(self, host): - # check if specified host entry has a keytab result = self.ipa_client.show_host(host)['result'] - keytab_present = result['Keytab'] - if 'True' in keytab_present: - return True - return False + self.assertTrue(result['has_keytab']) - def verify_service_exists(self, service, host): - # verify service exists for host on ipa server - # needed for the triple-O tests - service_principal = '{servicename}/{hostname}'.format( - servicename=service, hostname=host + def verify_service_created(self, service, host, realm): + service_principal = '{servicename}/{hostname}@{realm}'.format( + servicename=service, hostname=host, realm=realm ) result = self.ipa_client.find_service(service_principal) - if result['count'] > 0: - return True - return False + self.assertTrue(result['count'] > 0) - def verify_host_is_ipaclient(self, host, user, keypair): - # ssh into the host - # do test like "getent passwd admin" or similar - cmd = 'ssh -i {key} {username}@{hostname} -C "id admin"'.format( - key=keypair, username=user, hostname=host + def verify_service_deleted(self, service, host, realm): + service_principal = '{servicename}/{hostname}@{realm}'.format( + servicename=service, hostname=host, realm=realm ) - result = self.ssh_client.exec_command(cmd) + result = self.ipa_client.find_service(service_principal) + self.assertFalse(result['count'] > 0) + + def get_service_cert(self, service, host, realm): + service_principal = '{servicename}/{hostname}@{realm}'.format( + servicename=service, hostname=host, realm=realm + ) + return self.ipa_client.get_service_cert(service_principal) + + def verify_host_is_ipaclient(self, hostip, user, keypair): + cmd = 'id admin' + private_key = keypair['private_key'] + ssh_client = self.get_remote_client(hostip, user, private_key) + result = ssh_client.exec_command(cmd) params = ['uid', 'gid', 'groups'] - if all(x in result for x in params): - return True - return False + self.assertTrue(all(x in result for x in params)) - def verify_cert_tracked(self, host, user, keypair, cn): - # ssh into the host with the provided keypair - # run certmonger command to ensure cert is - # being tracked + def verify_overcloud_host_is_ipaclient(self, hostip, user): + keypair = '/home/stack/.ssh/id_rsa' + cmd = ['ssh', '-i', keypair, + '{user}@{hostip}'.format(user=user, hostip=hostip), + '-C', 'id admin'] - cmd = ( - 'ssh -i {key} {username}@{hostname} -C "sudo getcert list"'.format( - key=keypair, username=user, hostname=host) - ) - result = self.ssh_client.exec_command(cmd) - if cn in result: - return True - return False + result = subprocess.check_output(cmd) + params = ['uid', 'gid', 'groups'] + self.assertTrue(all(x in result for x in params)) + + def verify_cert_tracked(self, hostip, user, keypair, cert_id): + cmd = 'sudo getcert list -i {certid}'.format(certid=cert_id) + private_key = keypair['private_key'] + ssh_client = self.get_remote_client(hostip, user, private_key) + result = ssh_client.exec_command(cmd) + self.assertTrue('track: yes' in result) + + def verify_overcloud_cert_tracked(self, hostip, user, cert_id): + keypair = '/home/stack/.ssh/id_rsa' + cmd = ['ssh', '-i', keypair, + '{user}@{hostip}'.format(user=user, hostip=hostip), + '-C', 'sudo getcert list -i {certid}'.format(certid=cert_id)] + + result = subprocess.check_output(cmd) + self.assertTrue('track: yes' in result) def verify_cert_revoked(self, serial): # verify that the given certificate has been revoked result = self.ipa_client.show_cert(serial)['result'] - is_revoked = result['Revoked'] - if 'True' in is_revoked: - return True - return False + self.assertTrue(result['revoked'])