kayobe/playbooks/kayobe-overcloud-host-confi.../tests/test_overcloud_host_configu...

352 lines
11 KiB
Python

#!/usr/bin/env python3
# Kayobe overcloud host configure tests.
# Uses py.test and TestInfra.
import ipaddress
import os
import time
import distro
import pytest
def _is_apt():
    """Return True when the local distribution is Ubuntu (apt-based).

    Uses ``distro.id()``, like the other distribution helpers in this
    file, instead of the deprecated ``distro.linux_distribution()``.
    """
    return distro.id() == 'ubuntu'
def _is_dnf():
    """Return True when the local distribution ID is 'centos' or 'rocky'."""
    return distro.id() in ('centos', 'rocky')
def _is_dnf_mirror():
    """Return True when the local distribution ID is exactly 'centos'."""
    return distro.id() == 'centos'
def test_network_ethernet(host):
    """Check the address and the static route configured on dummy2."""
    dummy2 = host.interface('dummy2')
    assert dummy2.exists
    assert '192.168.34.1' in dummy2.addresses
    route_listing = host.check_output('/sbin/ip route show dev dummy2')
    expected_route = '192.168.40.0/24 via 192.168.34.254'
    assert expected_route in route_listing
def test_network_ethernet_vlan(host):
    """Check the dummy2.42 interface and its custom routing.

    The interface must exist, carry 192.168.35.1 and expose a
    ``lower_dummy2`` sysfs entry. The route and rule checks against
    kayobe-test-route-table are skipped when the host distribution name
    contains 'centos' or 'rocky'.
    """
    vlan = host.interface('dummy2.42')
    assert vlan.exists
    assert '192.168.35.1' in vlan.addresses
    assert host.file('/sys/class/net/dummy2.42/lower_dummy2').exists
    # FIXME(bbezak): remove the following early return after
    # ansible-role-interfaces receives support for custom routes in
    # NetworkManager
    distribution = host.system_info.distribution.lower()
    if 'centos' in distribution or 'rocky' in distribution:
        return
    route_listing = host.check_output(
        '/sbin/ip route show dev dummy2.42 table kayobe-test-route-table')
    assert '192.168.40.0/24 via 192.168.35.254' in route_listing
    rule_listing = host.check_output(
        '/sbin/ip rule show table kayobe-test-route-table')
    expected_rule = 'from 192.168.35.0/24 lookup kayobe-test-route-table'
    assert expected_rule in rule_listing
def test_network_bridge(host):
    """Check br0's address, STP state, member ports and port addressing.

    br0 must exist, carry 192.168.36.1 and report an ``stp_state`` of 0.
    Its ``brif`` directory must list exactly dummy3 and dummy4, and neither
    of those ports may carry an IPv4 address of its own.
    """
    interface = host.interface('br0')
    assert interface.exists
    assert '192.168.36.1' in interface.addresses
    stp_status = host.file(
        '/sys/class/net/br0/bridge/stp_state').content_string.strip()
    assert '0' == stp_status
    ports = ['dummy3', 'dummy4']
    # Exact match: the listing must be these two ports, in this order.
    sys_ports = host.check_output('ls -1 /sys/class/net/br0/brif')
    assert sys_ports == "\n".join(ports)
    for port in ports:
        interface = host.interface(port)
        assert interface.exists
        # ``version`` is an int (4 or 6). Comparing it with the string '4'
        # never matched, which made the assertion below vacuous.
        v4_addresses = [a for a in interface.addresses
                        if ipaddress.ip_address(a).version == 4]
        assert not v4_addresses
def test_network_bridge_vlan(host):
    """Check br0.43's address and that its lower_br0 sysfs entry exists."""
    vlan = host.interface('br0.43')
    assert vlan.exists
    assert '192.168.37.1' in vlan.addresses
    lower_entry = host.file('/sys/class/net/br0.43/lower_br0')
    assert lower_entry.exists
def test_network_bond(host):
    """Check bond0's address, its members, and that members are unaddressed."""
    bond = host.interface('bond0')
    assert bond.exists
    assert '192.168.38.1' in bond.addresses
    raw_members = host.check_output('cat /sys/class/net/bond0/bonding/slaves')
    expected_members = {'dummy5', 'dummy6'}
    # Ordering is not guaranteed, so compare sets.
    assert set(raw_members.split()) == expected_members
    for member in expected_members:
        member_interface = host.interface(member)
        assert member_interface.exists
        assert not member_interface.addresses
def test_network_bond_vlan(host):
    """Check bond0.44's address and that its lower_bond0 sysfs entry exists."""
    vlan = host.interface('bond0.44')
    assert vlan.exists
    assert '192.168.39.1' in vlan.addresses
    lower_entry = host.file('/sys/class/net/bond0.44/lower_bond0')
    assert lower_entry.exists
def test_network_bridge_no_ip(host):
    """Check br1 exists, lacks 192.168.40.1 and reports an stp_state of 1."""
    bridge = host.interface('br1')
    assert bridge.exists
    assert '192.168.40.1' not in bridge.addresses
    stp_file = host.file('/sys/class/net/br1/bridge/stp_state')
    stp_state = stp_file.content_string.strip()
    assert '1' == stp_state
@pytest.mark.skipif(
    not _is_apt(),
    reason="systemd-networkd VLANs only supported on Ubuntu")
def test_network_systemd_vlan(host):
    """Check that vlan45 exists and carries 192.168.41.1."""
    vlan = host.interface('vlan45')
    assert vlan.exists
    assert '192.168.41.1' in vlan.addresses
def test_additional_user_account(host):
    """Check the name, groups, GECOS and password of kayobe-test-user."""
    username = "kayobe-test-user"
    account = host.user(username)
    assert account.name == username
    assert account.group == username
    assert set(account.groups) == {username, "stack"}
    assert account.gecos == "Kayobe test user"
    # The password field is read under sudo.
    with host.sudo():
        assert account.password == 'kayobe-test-user-password'
def test_software_RAID(host):
    """Check that md0's slaves directory lists exactly loop0 and loop1."""
    expected_listing = "loop0\nloop1"
    listing = host.check_output("ls -1 /sys/class/block/md0/slaves/")
    assert listing == expected_listing
def test_luks(host):
    """Check that blkid reports /dev/md0 with a crypto_LUKS type."""
    # blkid returns an empty string without root permissions
    with host.sudo():
        blkid_output = host.check_output('blkid /dev/md0')
    assert 'TYPE="crypto_LUKS"' in blkid_output
def test_sysctls(host):
    """Check that the fs.mount-max sysctl is set to 99999."""
    mount_max = host.sysctl("fs.mount-max")
    assert mount_max == 99999
def test_cloud_init_is_disabled(host):
    """Check that the cloud-init.disabled marker file exists."""
    marker = host.file("/etc/cloud/cloud-init.disabled")
    assert marker.exists
def test_docker_storage_driver_is_overlay2(host):
    """Check that ``docker info``, run as stack, mentions overlay2."""
    with host.sudo("stack"):
        docker_info = host.check_output("docker info")
    assert "overlay2" in docker_info
@pytest.mark.parametrize('user', ['kolla', 'stack'])
def test_docker_image_download(host, user):
    """Check that the given user can pull the alpine image."""
    pull_command = "docker pull alpine"
    with host.sudo(user):
        host.check_output(pull_command)
@pytest.mark.parametrize('user', ['kolla', 'stack'])
def test_docker_container_run(host, user):
    """Check that the given user can run an alpine container."""
    run_command = "docker run --rm alpine /bin/true"
    with host.sudo(user):
        host.check_output(run_command)
def test_timezone(host):
    """Check that timedatectl reports the Pacific/Honolulu timezone."""
    expected_timezone = "Pacific/Honolulu"
    timedatectl_output = host.check_output("timedatectl status")
    assert expected_timezone in timedatectl_output
def test_ntp_alternative_services_disabled(host):
    """Check that the ntp and systemd-timesyncd services are masked and idle.

    Tests that we don't have any conflicting NTP servers running.
    """
    # NOTE(wszumski): We always mask services even if they don't exist
    for service_name in ("ntp", "systemd-timesyncd"):
        service = host.service(service_name)
        assert service.is_masked
        assert not service.is_running
def test_ntp_running(host):
    """Check that chrony is installed and chronyd is enabled and running."""
    chrony_package = host.package("chrony")
    assert chrony_package.is_installed
    chronyd = host.service("chronyd")
    assert chronyd.is_enabled
    assert chronyd.is_running
def test_ntp_non_default_time_server(host):
    """Check that the chrony configuration mentions time.cloudflare.com.

    Tests that the NTP pool has been changed from pool.ntp.org to
    time.cloudflare.com.
    """
    distribution = host.system_info.distribution.lower()
    if any(name in distribution for name in ('centos', 'rocky')):
        config_path = "/etc/chrony.conf"
    else:
        # Debian based distributions use the following path
        config_path = "/etc/chrony/chrony.conf"
    chrony_config = host.file(config_path)
    assert chrony_config.exists
    assert "time.cloudflare.com" in chrony_config.content_string
def test_ntp_clock_synchronized(host):
    """Check that timedatectl reports the clock as synchronized."""
    timedatectl_output = host.check_output("timedatectl status")
    expected_marker = "synchronized: yes"
    assert expected_marker in timedatectl_output
@pytest.mark.skipif(not _is_apt(), reason="Apt only supported on Ubuntu")
def test_apt_config(host):
    """Check the exact contents of the apt 99retries configuration file."""
    retries_conf = host.file("/etc/apt/apt.conf.d/99retries")
    assert retries_conf.exists
    expected_content = "Acquire::Retries 1;\n"
    assert retries_conf.content_string == expected_content
@pytest.mark.skipif(not _is_apt(), reason="Apt only supported on Ubuntu")
def test_apt_custom_package_repository_is_available(host):
    """Check that td-agent can be installed with apt."""
    package_name = "td-agent"
    # The install command is run under sudo.
    with host.sudo():
        host.check_output("apt -y install td-agent")
    assert host.package(package_name).is_installed
@pytest.mark.skipif(not _is_apt(), reason="Apt only supported on Ubuntu")
def test_apt_auth(host):
    """Check that the apt auth file holds the expected credential lines."""
    auth_file = host.file("/etc/apt/auth.conf.d/test.conf")
    assert auth_file.exists
    # The file content is read under sudo.
    with host.sudo():
        auth_lines = auth_file.content_string.splitlines()
    expected_lines = (
        "machine https://apt.example.com",
        "login foo",
        "password bar",
    )
    for expected_line in expected_lines:
        assert expected_line in auth_lines
@pytest.mark.parametrize('repo', ["appstream", "baseos", "extras", "epel"])
@pytest.mark.skipif(
    not _is_dnf_mirror(),
    reason="DNF OpenDev mirror only for CentOS 8")
def test_dnf_local_package_mirrors(host, repo):
    """Check that ``dnf repoinfo`` for the repo mentions the site mirror."""
    # Depends on SITE_MIRROR_FQDN environment variable.
    mirror_fqdn = os.getenv('SITE_MIRROR_FQDN')
    assert mirror_fqdn
    # NOTE(mgoddard): Should not require sudo but some files
    # (/var/cache/dnf/expired_repos.json) can have incorrect permissions.
    # https://bugzilla.redhat.com/show_bug.cgi?id=1636909
    with host.sudo():
        repo_info = host.check_output("dnf repoinfo %s", repo)
    assert mirror_fqdn in repo_info
@pytest.mark.skipif(not _is_dnf(), reason="DNF only supported on CentOS/Rocky")
def test_dnf_custom_package_repository_is_available(host):
    """Check that td-agent can be installed with dnf."""
    package_name = "td-agent"
    # The install command is run under sudo.
    with host.sudo():
        host.check_output("dnf -y install td-agent")
    assert host.package(package_name).is_installed
@pytest.mark.skipif(not _is_dnf(), reason="DNF only supported on CentOS/Rocky")
def test_dnf_automatic(host):
    """Check dnf-automatic is installed and its timer enabled and running."""
    assert host.package("dnf-automatic").is_installed
    timer = host.service("dnf-automatic.timer")
    assert timer.is_enabled
    assert timer.is_running
@pytest.mark.skipif(
    not _is_dnf(),
    reason="tuned profile setting only supported on CentOS/Rocky")
def test_tuned_profile_is_active(host):
    """Check that ``tuned-adm active`` mentions throughput-performance."""
    expected_profile = "throughput-performance"
    active_profile_output = host.check_output("tuned-adm active")
    assert expected_profile in active_profile_output
def test_firewalld_running(host):
    """Check firewalld is installed and its service enabled and running."""
    assert host.package("firewalld").is_installed
    firewalld = host.service("firewalld.service")
    assert firewalld.is_enabled
    assert firewalld.is_running
def test_firewalld_zones(host):
    """Check the runtime and permanent firewalld zone of each interface."""
    # Verify that interfaces are on correct zones.
    zone_by_interface = {
        'dummy2.42': 'test-zone1',
        'br0': 'test-zone2',
        'br0.43': 'test-zone3',
        'bond0': 'test-zone3',
        'bond0.44': 'public',
    }
    # Runtime configuration is queried first, then the permanent one.
    zone_queries = (
        "firewall-cmd --get-zone-of-interface %s",
        "firewall-cmd --permanent --get-zone-of-interface %s",
    )
    for interface, expected_zone in zone_by_interface.items():
        with host.sudo():
            for query in zone_queries:
                actual_zone = host.check_output(query, interface)
                assert actual_zone == expected_zone
def test_firewalld_rules(host):
    """Check that each zone's runtime and permanent info has expected lines."""
    # Verify that expected rules are present.
    expected_lines_by_zone = {
        'test-zone1': [
            ' services: ',
            ' ports: 8080/tcp',
            ' icmp-blocks: ',
        ],
        'test-zone2': [
            ' services: http',
            ' ports: ',
            ' icmp-blocks: ',
        ],
        'test-zone3': [
            ' services: ',
            ' ports: ',
            ' icmp-blocks: echo-request',
        ],
        'public': [
            ' services: dhcpv6-client ssh',
            ' ports: ',
            ' icmp-blocks: ',
        ],
    }
    for zone, expected_lines in expected_lines_by_zone.items():
        # Both queries are issued under sudo.
        with host.sudo():
            runtime_lines = host.check_output(
                "firewall-cmd --info-zone %s", zone).splitlines()
            permanent_lines = host.check_output(
                "firewall-cmd --permanent --info-zone %s", zone).splitlines()
        for expected_line in expected_lines:
            assert expected_line in runtime_lines
            assert expected_line in permanent_lines
@pytest.mark.skipif(
    not _is_dnf(),
    reason="SELinux only supported on CentOS/Rocky")
def test_selinux(host):
    """Check that sestatus reports SELinux enabled in permissive mode."""
    sestatus_output = host.check_output("sestatus")
    # Remove duplicate whitespace characters in output
    status_lines = [
        " ".join(line.split()) for line in sestatus_output.splitlines()
    ]
    expected_lines = (
        "SELinux status: enabled",
        "Current mode: permissive",
        "Mode from config file: permissive",
    )
    for expected_line in expected_lines:
        assert expected_line in status_lines
def test_swap(host):
    """Check that /swapfile is listed by ``swapon -s``."""
    swapon_lines = host.check_output("swapon -s").splitlines()
    # More than one line is required; the first line is not treated as an
    # entry.
    assert len(swapon_lines) > 1
    swap_entries = swapon_lines[1:]
    swap_devices = [entry.split()[0] for entry in swap_entries]
    assert "/swapfile" in swap_devices