More changes to get ipa tests working
This commit is contained in:
parent
ff9395ce6e
commit
394b8ccb47
|
@ -54,10 +54,9 @@ class IPABase(object):
|
|||
|
||||
self.ccache = "MEMORY:" + str(uuid.uuid4())
|
||||
os.environ['KRB5CCNAME'] = self.ccache
|
||||
os.environ['KRB5_CLIENT_KTNAME'] = '/home/stack/krb5.keytab'
|
||||
if self._ipa_client_configured() and not api.isdone('finalize'):
|
||||
(hostname, realm) = self.get_host_and_realm()
|
||||
kinit_keytab(str('nova/%s@%s' % (hostname, realm)),
|
||||
self.keytab, self.ccache)
|
||||
api.bootstrap(context='novajoin')
|
||||
api.finalize()
|
||||
self.batch_args = list()
|
||||
|
@ -159,23 +158,33 @@ class IPABase(object):
|
|||
class IPAClient(IPABase):
|
||||
|
||||
def find_host(self, hostname):
|
||||
params = [hostname]
|
||||
params = [unicode(hostname)]
|
||||
return self._call_ipa('host_find', *params)
|
||||
|
||||
def show_host(self, hostname):
|
||||
params = [hostname]
|
||||
return self._call_ipa('host-show', *params)
|
||||
params = [unicode(hostname)]
|
||||
return self._call_ipa('host_show', *params)
|
||||
|
||||
def find_service(self, service_principal):
|
||||
params = [service_principal]
|
||||
params = [unicode(service_principal)]
|
||||
service_args = {}
|
||||
return self._call_ipa('service_find', *params, **service_args)
|
||||
|
||||
def show_service(self, service_principal):
|
||||
params = [service_principal]
|
||||
params = [unicode(service_principal)]
|
||||
service_args = {}
|
||||
return self._call_ipa('service_show', *params, **service_args)
|
||||
|
||||
def get_service_cert(self, service_principal):
|
||||
params = [unicode(service_principal)]
|
||||
service_args = {}
|
||||
result = self._call_ipa('service_find', *params, **service_args)
|
||||
serviceresult = result['result'][0]
|
||||
if 'serial_number' in serviceresult:
|
||||
return serviceresult['serial_number']
|
||||
else:
|
||||
return None
|
||||
|
||||
def service_managed_by_host(self, service_principal, host):
|
||||
"""Return True if service is managed by specified host"""
|
||||
params = [service_principal]
|
||||
|
|
|
@ -76,7 +76,8 @@ class ScenarioTest(tempest.test.BaseTestCase):
|
|||
if CONF.volume_feature_enabled.api_v2:
|
||||
cls.volumes_client = cls.os_primary.volumes_v2_client
|
||||
cls.snapshots_client = cls.os_primary.snapshots_v2_client
|
||||
else:
|
||||
|
||||
if CONF.volume_feature_enabled.api_v1:
|
||||
cls.volumes_client = cls.os_primary.volumes_client
|
||||
cls.snapshots_client = cls.os_primary.snapshots_client
|
||||
|
||||
|
|
|
@ -12,19 +12,30 @@
|
|||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import subprocess
|
||||
|
||||
from oslo_log import log as logging
|
||||
from tempest import config
|
||||
from tempest import test
|
||||
|
||||
from novajoin_tempest_plugin.ipa import ipa_client
|
||||
from novajoin_tempest_plugin.tests.scenario import manager
|
||||
|
||||
CONF = config.CONF
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class NovajoinScenarioTest(test.BaseTestCase):
|
||||
class NovajoinScenarioTest(manager.ScenarioTest):
|
||||
|
||||
credentials = ['primary', 'admin']
|
||||
|
||||
def setUp(self):
|
||||
super(NovajoinScenarioTest, self).setUp()
|
||||
|
||||
@classmethod
|
||||
def setup_credentials(cls):
|
||||
cls.set_network_resources()
|
||||
super(NovajoinScenarioTest, cls).setup_credentials()
|
||||
|
||||
@classmethod
|
||||
def skip_checks(cls):
|
||||
super(NovajoinScenarioTest, cls).skip_checks()
|
||||
|
@ -37,62 +48,72 @@ class NovajoinScenarioTest(test.BaseTestCase):
|
|||
cls.ipa_client = ipa_client.IPAClient()
|
||||
|
||||
def verify_host_registered_with_ipa(self, host):
|
||||
# check if specified host is registered with ipa
|
||||
# basically doing a host-show
|
||||
result = self.ipa_client.find_host(host)
|
||||
if result['count'] > 0:
|
||||
return True
|
||||
return False
|
||||
self.assertTrue(result['count'] > 0)
|
||||
|
||||
def verify_host_not_registered_with_ipa(self, host):
|
||||
result = self.ipa_client.find_host(host)
|
||||
self.assertFalse(result['count'] > 0)
|
||||
|
||||
def verify_host_has_keytab(self, host):
|
||||
# check if specified host entry has a keytab
|
||||
result = self.ipa_client.show_host(host)['result']
|
||||
keytab_present = result['Keytab']
|
||||
if 'True' in keytab_present:
|
||||
return True
|
||||
return False
|
||||
self.assertTrue(result['has_keytab'])
|
||||
|
||||
def verify_service_exists(self, service, host):
|
||||
# verify service exists for host on ipa server
|
||||
# needed for the triple-O tests
|
||||
service_principal = '{servicename}/{hostname}'.format(
|
||||
servicename=service, hostname=host
|
||||
def verify_service_created(self, service, host, realm):
|
||||
service_principal = '{servicename}/{hostname}@{realm}'.format(
|
||||
servicename=service, hostname=host, realm=realm
|
||||
)
|
||||
result = self.ipa_client.find_service(service_principal)
|
||||
if result['count'] > 0:
|
||||
return True
|
||||
return False
|
||||
self.assertTrue(result['count'] > 0)
|
||||
|
||||
def verify_host_is_ipaclient(self, host, user, keypair):
|
||||
# ssh into the host
|
||||
# do test like "getent passwd admin" or similar
|
||||
cmd = 'ssh -i {key} {username}@{hostname} -C "id admin"'.format(
|
||||
key=keypair, username=user, hostname=host
|
||||
def verify_service_deleted(self, service, host, realm):
|
||||
service_principal = '{servicename}/{hostname}@{realm}'.format(
|
||||
servicename=service, hostname=host, realm=realm
|
||||
)
|
||||
result = self.ssh_client.exec_command(cmd)
|
||||
result = self.ipa_client.find_service(service_principal)
|
||||
self.assertFalse(result['count'] > 0)
|
||||
|
||||
def get_service_cert(self, service, host, realm):
|
||||
service_principal = '{servicename}/{hostname}@{realm}'.format(
|
||||
servicename=service, hostname=host, realm=realm
|
||||
)
|
||||
return self.ipa_client.get_service_cert(service_principal)
|
||||
|
||||
def verify_host_is_ipaclient(self, hostip, user, keypair):
|
||||
cmd = 'id admin'
|
||||
private_key = keypair['private_key']
|
||||
ssh_client = self.get_remote_client(hostip, user, private_key)
|
||||
result = ssh_client.exec_command(cmd)
|
||||
params = ['uid', 'gid', 'groups']
|
||||
if all(x in result for x in params):
|
||||
return True
|
||||
return False
|
||||
self.assertTrue(all(x in result for x in params))
|
||||
|
||||
def verify_cert_tracked(self, host, user, keypair, cn):
|
||||
# ssh into the host with the provided keypair
|
||||
# run certmonger command to ensure cert is
|
||||
# being tracked
|
||||
def verify_overcloud_host_is_ipaclient(self, hostip, user):
|
||||
keypair = '/home/stack/.ssh/id_rsa'
|
||||
cmd = ['ssh', '-i', keypair,
|
||||
'{user}@{hostip}'.format(user=user, hostip=hostip),
|
||||
'-C', 'id admin']
|
||||
|
||||
cmd = (
|
||||
'ssh -i {key} {username}@{hostname} -C "sudo getcert list"'.format(
|
||||
key=keypair, username=user, hostname=host)
|
||||
)
|
||||
result = self.ssh_client.exec_command(cmd)
|
||||
if cn in result:
|
||||
return True
|
||||
return False
|
||||
result = subprocess.check_output(cmd)
|
||||
params = ['uid', 'gid', 'groups']
|
||||
self.assertTrue(all(x in result for x in params))
|
||||
|
||||
def verify_cert_tracked(self, hostip, user, keypair, cert_id):
|
||||
cmd = 'sudo getcert list -i {certid}'.format(certid=cert_id)
|
||||
private_key = keypair['private_key']
|
||||
ssh_client = self.get_remote_client(hostip, user, private_key)
|
||||
result = ssh_client.exec_command(cmd)
|
||||
self.assertTrue('track: yes' in result)
|
||||
|
||||
def verify_overcloud_cert_tracked(self, hostip, user, cert_id):
|
||||
keypair = '/home/stack/.ssh/id_rsa'
|
||||
cmd = ['ssh', '-i', keypair,
|
||||
'{user}@{hostip}'.format(user=user, hostip=hostip),
|
||||
'-C', 'sudo getcert list -i {certid}'.format(certid=cert_id)]
|
||||
|
||||
result = subprocess.check_output(cmd)
|
||||
self.assertTrue('track: yes' in result)
|
||||
|
||||
def verify_cert_revoked(self, serial):
|
||||
# verify that the given certificate has been revoked
|
||||
result = self.ipa_client.show_cert(serial)['result']
|
||||
is_revoked = result['Revoked']
|
||||
if 'True' in is_revoked:
|
||||
return True
|
||||
return False
|
||||
self.assertTrue(result['revoked'])
|
||||
|
|
Loading…
Reference in New Issue