Fix PEP8 issues
This commit is contained in:
parent
0d8f06b56a
commit
ed20d98c84
|
@ -13,28 +13,33 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_log import log as logging
|
||||
from tempest import config
|
||||
|
||||
from tempest import clients
|
||||
from tempest.scenario import manager as mgr
|
||||
from tempest.lib.common import ssh
|
||||
import tempest.test
|
||||
|
||||
CONF = config.CONF
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class NovajoinScenarioTest(mgr.ScenarioTest):
|
||||
|
||||
credentials = ('primary', 'admin')
|
||||
manager = clients.Manager()
|
||||
manager = clients.Manager(credentials)
|
||||
|
||||
def setUp(self):
|
||||
super(NovajoinScenarioTest, self).setUp()
|
||||
ssh_host = CONF.tripleo.undercloud_hostname
|
||||
ssh_user = CONF.stress.target_ssh_user
|
||||
ssh_key = CONF.stress.target_private_key_path
|
||||
ssh_client = ssh.Client(ssh_host, ssh_user, key_filename=ssh_key)
|
||||
self.ssh_client = ssh.Client(ssh_host, ssh_user, key_filename=ssh_key)
|
||||
|
||||
@classmethod
|
||||
def skip_checks(cls):
|
||||
super(NovajoinScenarioTest, cls).skip_checks()
|
||||
cmd = ('source ~/stackrc;openstack service list | grep novajoin')
|
||||
|
||||
cmd = 'source ~/stackrc;openstack service list | grep novajoin'
|
||||
novajoin_enabled = ssh_client.exec_command(cmd)
|
||||
if not novajoin_enabled:
|
||||
raise cls.skipException("Novajoin is not enabled")
|
||||
|
@ -42,7 +47,6 @@ class NovajoinScenarioTest(mgr.ScenarioTest):
|
|||
@classmethod
|
||||
def setup_clients(cls):
|
||||
super(NovajoinScenarioTest, cls).setup_clients()
|
||||
|
||||
# os = getattr(cls, 'os_%s' % cls.credentials[0])
|
||||
# os_adm = getattr(cls, 'os_%s' % cls.credentials[1])
|
||||
# set up ipa client
|
||||
|
@ -50,65 +54,66 @@ class NovajoinScenarioTest(mgr.ScenarioTest):
|
|||
def verify_host_registered_with_ipa(self, host):
|
||||
# check if specified host is registered with ipa
|
||||
# basically doing a host-show
|
||||
|
||||
cmd = 'ipa host-show {hostname}'.format(hostname = host)
|
||||
result = ssh_client.exec_command(cmd)
|
||||
|
||||
cmd = 'ipa host-show {hostname}'.format(hostname=host)
|
||||
result = self.ssh_client.exec_command(cmd)
|
||||
if host in result:
|
||||
return true
|
||||
return false
|
||||
return True
|
||||
return False
|
||||
|
||||
def verify_host_has_keytab(self, host):
|
||||
# check if specified host entry has a keytab
|
||||
|
||||
cmd = 'ipa host-show {hostname} | grep Keytab'.format(hostname = host)
|
||||
result = ssh_client.exec_command(cmd)
|
||||
cmd = 'ipa host-show {hostname} | grep Keytab'.format(hostname=host)
|
||||
result = self.ssh_client.exec_command(cmd)
|
||||
if 'True' in result:
|
||||
return true
|
||||
return false
|
||||
return True
|
||||
return False
|
||||
|
||||
def verify_service_exists(self, service, host):
|
||||
# verify service exists for host on ipa server
|
||||
# needed for the triple-O tests
|
||||
|
||||
cmd = 'ipa service-show {servicename}/{hostname}'.format(
|
||||
servicename=service, hostname=host
|
||||
)
|
||||
result = ssh_client.exec_command(cmd)
|
||||
cmd = 'ipa service-show {servicename}/{hostname}'.format(
|
||||
servicename=service, hostname=host
|
||||
)
|
||||
result = self.ssh_client.exec_command(cmd)
|
||||
if service in result:
|
||||
return true
|
||||
return false
|
||||
return True
|
||||
return False
|
||||
|
||||
def verify_host_is_ipaclient(self, host, user, keypair):
|
||||
# ssh into the host
|
||||
# do test like "getent passwd admin" or similar
|
||||
cmd = 'ssh -i {key} {username}@{hostname} -C "id admin"'.format(
|
||||
key=keypair, username=user, hostname=host
|
||||
)
|
||||
result = ssh_client.exec_command(cmd)
|
||||
vars = ['uid', 'gid', 'groups']
|
||||
if all(x in result for x in vars):
|
||||
return true
|
||||
return false
|
||||
key=keypair, username=user, hostname=host
|
||||
)
|
||||
result = self.ssh_client.exec_command(cmd)
|
||||
params = ['uid', 'gid', 'groups']
|
||||
if all(x in result for x in params):
|
||||
return True
|
||||
return False
|
||||
|
||||
def verify_cert_tracked(self, host, user, keypair, cn):
|
||||
# ssh into the host with the provided keypair
|
||||
# run certmonger command to ensure cert is
|
||||
# being tracked
|
||||
|
||||
cmd = 'ssh -i {key} {username}@{hostname} -C "sudo getcert list"'.format(
|
||||
key=keypair, username=user, hostname=host
|
||||
)
|
||||
result = ssh_client.exec_command(cmd)
|
||||
cmd = (
|
||||
'ssh -i {key} {username}@{hostname} -C "sudo getcert list"'.format(
|
||||
key=keypair, username=user, hostname=host)
|
||||
)
|
||||
result = self.ssh_client.exec_command(cmd)
|
||||
if cn in result:
|
||||
return true
|
||||
return false
|
||||
return True
|
||||
return False
|
||||
|
||||
def verify_cert_revoked(self, serial):
|
||||
# verify that the given certificate has been revoked
|
||||
cmd = 'ipa cert-show {serial} |grep Revoked'.format(
|
||||
serial=serial
|
||||
)
|
||||
result = ssh_client.exec_command(cmd)
|
||||
serial=serial
|
||||
)
|
||||
result = self.ssh_client.exec_command(cmd)
|
||||
if 'True' in result:
|
||||
return true
|
||||
return false
|
||||
return True
|
||||
return False
|
||||
|
|
Loading…
Reference in New Issue