diff --git a/novajoin_tempest_plugin/tests/scenario/novajoin_manager.py b/novajoin_tempest_plugin/tests/scenario/novajoin_manager.py index dd98d96..d613ac1 100644 --- a/novajoin_tempest_plugin/tests/scenario/novajoin_manager.py +++ b/novajoin_tempest_plugin/tests/scenario/novajoin_manager.py @@ -13,28 +13,33 @@ # License for the specific language governing permissions and limitations # under the License. +from oslo_log import log as logging +from tempest import config from tempest import clients from tempest.scenario import manager as mgr from tempest.lib.common import ssh -import tempest.test + +CONF = config.CONF +LOG = logging.getLogger(__name__) + class NovajoinScenarioTest(mgr.ScenarioTest): - credentials = ('primary', 'admin') - manager = clients.Manager() + manager = clients.Manager(credentials) def setUp(self): super(NovajoinScenarioTest, self).setUp() ssh_host = CONF.tripleo.undercloud_hostname ssh_user = CONF.stress.target_ssh_user ssh_key = CONF.stress.target_private_key_path - ssh_client = ssh.Client(ssh_host, ssh_user, key_filename=ssh_key) + self.ssh_client = ssh.Client(ssh_host, ssh_user, key_filename=ssh_key) @classmethod def skip_checks(cls): super(NovajoinScenarioTest, cls).skip_checks() - cmd = ('source ~/stackrc;openstack service list | grep novajoin') + + cmd = 'source ~/stackrc;openstack service list | grep novajoin' novajoin_enabled = ssh_client.exec_command(cmd) if not novajoin_enabled: raise cls.skipException("Novajoin is not enabled") @@ -42,7 +47,6 @@ class NovajoinScenarioTest(mgr.ScenarioTest): @classmethod def setup_clients(cls): super(NovajoinScenarioTest, cls).setup_clients() - # os = getattr(cls, 'os_%s' % cls.credentials[0]) # os_adm = getattr(cls, 'os_%s' % cls.credentials[1]) # set up ipa client @@ -50,65 +54,66 @@ class NovajoinScenarioTest(mgr.ScenarioTest): def verify_host_registered_with_ipa(self, host): # check if specified host is registered with ipa # basically doing a host-show - - cmd = 'ipa host-show {hostname}'.format(hostname = host) - result = ssh_client.exec_command(cmd) + + cmd = 'ipa host-show {hostname}'.format(hostname=host) + result = self.ssh_client.exec_command(cmd) if host in result: - return true - return false + return True + return False def verify_host_has_keytab(self, host): # check if specified host entry has a keytab - cmd = 'ipa host-show {hostname} | grep Keytab'.format(hostname = host) - result = ssh_client.exec_command(cmd) + cmd = 'ipa host-show {hostname} | grep Keytab'.format(hostname=host) + result = self.ssh_client.exec_command(cmd) if 'True' in result: - return true - return false + return True + return False def verify_service_exists(self, service, host): # verify service exists for host on ipa server # needed for the triple-O tests - cmd = 'ipa service-show {servicename}/{hostname}'.format( - servicename=service, hostname=host - ) - result = ssh_client.exec_command(cmd) + cmd = 'ipa service-show {servicename}/{hostname}'.format( + servicename=service, hostname=host + ) + result = self.ssh_client.exec_command(cmd) if service in result: - return true - return false + return True + return False def verify_host_is_ipaclient(self, host, user, keypair): # ssh into the host # do test like "getent passwd admin" or similar cmd = 'ssh -i {key} {username}@{hostname} -C "id admin"'.format( - key=keypair, username=user, hostname=host - ) - result = ssh_client.exec_command(cmd) - vars = ['uid', 'gid', 'groups'] - if all(x in result for x in vars): - return true - return false + key=keypair, username=user, hostname=host + ) + result = self.ssh_client.exec_command(cmd) + params = ['uid', 'gid', 'groups'] + if all(x in result for x in params): + return True + return False def verify_cert_tracked(self, host, user, keypair, cn): # ssh into the host with the provided keypair # run certmonger command to ensure cert is # being tracked - cmd = 'ssh -i {key} {username}@{hostname} -C "sudo getcert list"'.format( - key=keypair, username=user, hostname=host - ) - result = ssh_client.exec_command(cmd) + cmd = ( + 'ssh -i {key} {username}@{hostname} -C "sudo getcert list"'.format( + key=keypair, username=user, hostname=host) + ) + result = self.ssh_client.exec_command(cmd) if cn in result: - return true - return false + return True + return False def verify_cert_revoked(self, serial): # verify that the given certificate has been revoked cmd = 'ipa cert-show {serial} |grep Revoked'.format( - serial=serial - ) - result = ssh_client.exec_command(cmd) + serial=serial + ) + result = self.ssh_client.exec_command(cmd) if 'True' in result: - return true - return false + return True + return False