cloudpulse/cloudpulse/scenario/plugins/operator_tests/operator.py

437 lines
17 KiB
Python

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cloudpulse.openstack.api import keystone_session
from cloudpulse.openstack.api.nova_api import NovaHealth
from cloudpulse.operator.ansible.openstack_node_info_reader import \
openstack_node_info_reader
from cloudpulse.scenario import base
import errno
import os
from oslo_config import cfg
from oslo_utils import importutils
import re
import shlex
import simplejson
from subprocess import PIPE
from subprocess import Popen
# Options for the operator tests, registered under the [operator_test] group.
TESTS_OPTS = [
    cfg.StrOpt('operator_setup_file',
               default='/etc/cloudpulse/openstack_config.yaml',
               help='Setup File for the operator'),
    cfg.BoolOpt('containerized',
                default=True,
                help='enable if the processes are running as containers'),
    cfg.StrOpt('rabbit_container',
               default='rabbitmq',
               help='name of the rabbitmq container'),
    cfg.StrOpt('galera_container',
               default='mariadb',
               help='name of the galera cluster container'),
    cfg.StrOpt('ceph_container',
               default='ceph',
               help='name of the ceph cluster container'),
]

# One integer option per periodic test, registered under [periodic_tests].
# NOTE(review): presumably an interval where 0 disables the test -- confirm
# against the periodic task runner that reads these.
PERIODIC_TESTS_OPTS = [
    cfg.IntOpt('rabbitmq_check',
               default=0,
               help='The rabbitmq periodic check'),
    cfg.IntOpt('galera_check',
               default=0,
               help='The galera periodic check'),
    cfg.IntOpt('ceph_check',
               default=0,
               help='The ceph periodic check'),
    cfg.IntOpt('docker_check',
               default=0,
               help='The docker periodic check'),
    cfg.IntOpt('node_check',
               default=0,
               help='The Node Check periodic check'),
    cfg.IntOpt('all_operator_tests',
               default=0,
               help='Run all operator tests')
]

CONF = cfg.CONF

operator_test_group = cfg.OptGroup(name='operator_test',
                                   title='Options for the Operators')
CONF.register_group(operator_test_group)
CONF.register_opts(TESTS_OPTS, operator_test_group)

periodic_test_group = cfg.OptGroup(name='periodic_tests',
                                   title='Periodic tests to be run')
# Register the group explicitly, as for operator_test above, instead of
# relying on register_opts to create it implicitly.
CONF.register_group(periodic_test_group)
CONF.register_opts(PERIODIC_TESTS_OPTS, periodic_test_group)

# Make [keystone_authtoken]/auth_uri, defined by
# keystonemiddleware.auth_token, available on CONF.
cfg.CONF.import_opt('auth_uri', 'keystonemiddleware.auth_token',
                    group='keystone_authtoken')
def execute(command):
    """Run a command line without a shell and capture its standard output.

    :param command: command line string; it is tokenised with shlex.split
        and executed directly (shell=False).
    :returns: dict with keys
        'status' -- the process return code, or 127 if the executable does
        not exist (ENOENT) / 126 for any other OSError raised on launch;
        'output' -- captured stdout as text (decoded as UTF-8 on Python 3),
        or "" when the status is 126 or 127. stderr is captured and
        discarded.
    """
    argv = shlex.split(command)
    try:
        proc = Popen(argv, shell=False, stdout=PIPE, stderr=PIPE,
                     bufsize=-1, env=os.environ, close_fds=True)
        stdout, _stderr = proc.communicate()
    except OSError as e:
        if e.errno == errno.ENOENT:
            return {'status': 127, 'output': ""}
        return {'status': 126, 'output': ""}
    if proc.returncode in (126, 127):
        # Same "cannot execute" / "not found" codes as the OSError branch;
        # any output is meaningless to callers. (str(b"") was "b''" on Py3.)
        return {'status': proc.returncode, 'output': ""}
    if not isinstance(stdout, str):
        # Python 3 yields bytes; every caller splits/replaces the output
        # with str literals, so hand back text. On Python 2 stdout is
        # already a str and is returned unchanged.
        stdout = stdout.decode('utf-8', 'replace')
    return {'status': proc.returncode, 'output': stdout}
def get_container_name(name):
    """Look up a local docker container whose ``docker ps`` row mentions *name*.

    ``docker ps`` is run on 127.0.0.1 through ansible. The captured output
    is split into rows on the literal two-character sequence backslash-n
    (presumably how ansible's one-line mode escapes newlines).

    :param name: substring searched for in each row.
    :returns: the last whitespace-separated column of the first matching
        row (normally docker's NAMES column), or None if the command failed
        or no row matched.
    """
    result = execute("ansible -o all -i 127.0.0.1, -a 'docker ps' -u root")
    if result['status']:
        return None
    rows = result['output'].split('\\n')
    match = next((row for row in rows if name in row), None)
    if match is None:
        return None
    return match.split()[-1].strip('\n')
def create_error_msg(error_msg, reason_msg, error_hosts):
    """Append a "<host, host, ...>: <reason>" entry to an error message.

    :param error_msg: message accumulated so far; falsy means "none yet".
    :param reason_msg: reason text for this group of hosts.
    :param error_hosts: iterable of host name strings.
    :returns: the new entry alone when *error_msg* is falsy, otherwise
        *error_msg* and the new entry joined by "; ".
    """
    entry = "{}: {}".format(', '.join(error_hosts), reason_msg)
    if not error_msg:
        return entry
    return "{}; {}".format(error_msg, entry)
class operator_scenario(base.Scenario):
def _get_keystone_session_creds(self):
creds = {'session': keystone_session._get_kssession(),
'endpoint_type': 'internalURL'
}
return creds
def _get_nova_hypervior_list(self):
importutils.import_module('keystonemiddleware.auth_token')
creds = self._get_keystone_session_creds()
nova = NovaHealth(creds)
return nova.nova_hypervisor_list()
def load(self):
self.os_node_info_obj = openstack_node_info_reader(
cfg.CONF.operator_test.operator_setup_file)
def is_metric_pool(self, is_containerized, ceph_json):
if 'HEALTH_WARN' not in ceph_json['health']['status']:
return False
checks = ceph_json['health']['checks']
err = checks.get('MANY_OBJECTS_PER_PG', None)
if (not err or 'HEALTH_WARN' not in err.get('severity', None) or
('1 pools have many more objects per pg than average' not in
err['summary']['message'])):
return False
cmd = (r"ceph df")
if is_containerized:
ceph_container = get_container_name("cephmon")
cmd = ("'docker exec %s %s'" % (ceph_container, cmd))
cmd = "ansible -o all -u root -i 127.0.0.1, -a " + cmd + ' -u root'
ret = execute(cmd)
if ret['status']:
return False
for line in ret['output'].split('\\n'):
if 'metrics' not in line:
continue
try:
num_obj = int(line.split()[-1])
except ValueError:
return False
if num_obj > 1000:
return True
return False
@base.scenario(admin_only=False, operator=True)
def rabbitmq_check(self):
self.load()
anscmd = "ansible -o all -i 127.0.0.1, -a "
cmd = "rabbitmqctl cluster_status -q"
is_containerized = cfg.CONF.operator_test.containerized
if is_containerized:
rabbit_container = get_container_name('rabbitmq')
cmd = ("'docker exec %s %s'" % (rabbit_container, cmd))
cmd = anscmd + cmd + " -u root "
res = execute(cmd)
if not res['status']:
node_status = res['output']
node_status_string = node_status.replace('\\n', '')
node_status_string = node_status_string.replace(' ', '')
nodes = []
running = []
mathobj = re.search(
r'nodes,\[{disc,\[(.*?)\]', node_status_string, re.M | re.I)
if mathobj:
nodes = [x.rstrip(" ").lstrip(" ").rstrip("'").lstrip("'")
for x in mathobj.group(1).split(",")]
mathobj = re.search(
r'running_nodes,\[(.*?)\]}', node_status_string, re.M | re.I)
if mathobj:
running = [x.rstrip(" ").lstrip(" ").rstrip("'").lstrip("'")
for x in mathobj.group(1).split(",")]
diffnodes = list(set(nodes) - set(running))
if diffnodes:
return (404, ("Failed Nodes : %s" %
str(diffnodes)))
else:
return (200, "Running Nodes : %s" % str(nodes),
['RabbitMQ-server Running'])
else:
return (404, ("RabbitMQ-server test failed :%s" %
"rabbitmq-service is down", []))
@base.scenario(admin_only=False, operator=True)
def galera_check(self):
self.load()
anscmd = "ansible -o all -i 127.0.0.1, -a "
galera = self.os_node_info_obj.get_galera_details()
cmd = ((r'mysql -u %s -p%s -e "SHOW STATUS;"') %
(galera['username'], galera['password']))
is_containerized = cfg.CONF.operator_test.containerized
if is_containerized:
galera_container = get_container_name('mariadb')
cmd = ("'docker exec %s %s'" % (galera_container, cmd))
cmd = anscmd + cmd + ' -u root'
res = execute(cmd)
if not res['status']:
galera_status = res['output']
if 'wsrep_incoming_addresses' not in galera_status:
return (404, ("Galera Cluster Test Failed: %s" %
"Invalid cluster status", []))
galera_status_string = galera_status.replace('\\n', '')
mathobj = re.search(r'wsrep_incoming_addresses\s+(.*?)wsrep.*$',
galera_status_string, re.M | re.I)
nodes = mathobj.group(1)
return (200, "Active Nodes : %s" % nodes,
['Galera Cluster Test Passed'])
else:
return (404, ("Galera Cluster Test Failed: %s" %
"service access failed", []))
@base.scenario(admin_only=False, operator=True)
def docker_check(self):
self.load()
node_list = self.os_node_info_obj.get_host_list()
nodeip_list = [node.ip for node in node_list]
anscmd = "ansible -o all -i %s -a " % ','.join(nodeip_list)
cmd = "'docker ps -a --format \{\{.Names\}\} --filter %s '" \
% "status=exited"
cmd = anscmd + cmd + ' -u root'
res = execute(cmd)
docker_down = []
docker_down_msg = "Docker daemon down"
ssh_failed = []
ssh_failed_msg = "Host(s) unreachable via SSH"
unknown_status = []
unknown_status_msg = "Failure reason unknown"
container_exited = ""
docker_failed = ""
res['output'] = res['output'].split('\n')
output = [x for x in res['output'] if not re.match(r'^\s*$', x)]
for line in output:
line = line.split('|')
if 'FAILED' in line[1]:
docker_down.append(line[0].strip())
elif 'UNREACHABLE' in line[1]:
ssh_failed.append(line[0].strip())
elif any(status in line[1] for status in ['SUCCESS', 'CHANGED']):
if len(line) < 3:
continue
line[3] = line[3].replace(' ', '')
line[3] = line[3].replace('(stdout)', '')
if not re.match(r'^\s*$', line[3]):
line[3] = line[3].replace('\\n', ', ')
if container_exited:
container_exited = "{}; {}: {}"\
.format(container_exited,
line[0].strip(),
line[3].strip())
else:
container_exited = "{}: {}".format(line[0].strip(),
line[3].strip())
else:
unknown_status.append(line[0].strip())
if container_exited:
docker_failed = container_exited
if docker_down:
docker_failed = create_error_msg(docker_failed, docker_down_msg,
docker_down)
if ssh_failed:
docker_failed = create_error_msg(docker_failed, ssh_failed_msg,
ssh_failed)
if unknown_status:
docker_failed = create_error_msg(docker_failed, unknown_status_msg,
unknown_status)
if docker_failed:
return (404, docker_failed, [])
else:
return (200, "All docker containers are up",
['Docker container Test Passed'])
@base.scenario(admin_only=False, operator=True)
def ceph_check(self):
self.load()
storage_nodes_from_ansible_config = [node.name.lower()
for node in
self.os_node_info_obj
.get_host_list()
if "block_storage" in
node.role.split()]
if storage_nodes_from_ansible_config:
cmd = (r"ceph -f json status")
is_containerized = cfg.CONF.operator_test.containerized
if is_containerized:
ceph_container = get_container_name("cephmon")
cmd = ("'docker exec %s %s'" % (ceph_container, cmd))
anscmd = "ansible -o all -i 127.0.0.1, -a "
cmd = anscmd + cmd + ' -u root'
res = execute(cmd)
if not res['status']:
ceph_status = res['output']
ceph_status = ceph_status.replace('\n', '')
ceph_data = ceph_status.split('|')
ceph_str = ceph_data[3].replace(' (stdout) ', '') \
.replace('\\n', '')
ceph_json = simplejson.loads(ceph_str)
# Handle ceph status in luminous, result should be picked form
# 'status' instead of 'overall_status'
if len(ceph_json['health']['summary']) and \
'summary' in list(ceph_json['health']['summary'][0].keys()) \
and 'mon health preluminous compat warning' in \
ceph_json['health']['summary'][0]['summary']:
overall_status = ceph_json['health']['status']
if ('HEALTH_WARN' in overall_status and
self.is_metric_pool(is_containerized, ceph_json)):
overall_status = 'HEALTH_OK'
else:
overall_status = ceph_json['health']['overall_status']
num_of_osd = ceph_json['osdmap']['osdmap']['num_osds']
num_up_osds = ceph_json['osdmap']['osdmap']['num_up_osds']
if overall_status == 'HEALTH_OK':
return (200, "Overall Status = %s, "
"Cluster status = %s/%s" %
(overall_status, num_up_osds, num_of_osd))
else:
return (404, "Overall Status = %s, "
"Cluster status = %s/%s" %
(overall_status, num_up_osds, num_of_osd))
else:
return (300, ("Ceph cluster test skipped "
"as no dedicated storage found"))
@base.scenario(admin_only=False, operator=True)
def node_check(self):
failed_hosts = []
self.load()
nodes_from_ansible_config = [node.name for node in
self.os_node_info_obj.get_host_list()
if "compute" in node.role.split()]
nova_hypervisor_list = self._get_nova_hypervior_list()
if nova_hypervisor_list[0] != 200:
return (404, ("Cannot get hypervisor list from "
"Nova reason-%sa") % nova_hypervisor_list[1])
nodes_from_nova = [node for node in nova_hypervisor_list[2]]
anscmd = ("ansible -o all -i '%s' -m ping -u root" %
','.join(nodes_from_ansible_config))
res = execute(anscmd)
res['output'] = res['output'].split('\n')
output = filter(lambda x: not re.match(r'^\s*$', x), res['output'])
for line in output:
if "SUCCESS" not in line:
failed_hosts.append(line.split('|')[0].strip())
# Check if ansible ping cmd failed with reason other than unreachable
# nodes
if res['status'] and not failed_hosts:
return (404, "Unable to perform ping test on nodes, ansible cmd: "
"'%s' failed" % anscmd)
# Check if nova also recognizes that passed nodes were up
nova_failed_hosts = [node for node in nodes_from_ansible_config if
node not in nodes_from_nova]
failed_hosts = list(set(nova_failed_hosts + failed_hosts))
if not failed_hosts:
return (200, "All nodes are up. nova hypervisor list = %s" %
', '.join(nodes_from_nova))
else:
msg = ("The following nodes are down: %s. nova hypervisor list = "
"%s" % (', '.join(failed_hosts),
', '.join(nodes_from_nova)))
return (404, msg)
@base.scenario(admin_only=False, operator=True)
def all_operator_tests(self):
test_list = [func for func in dir(self) if base.Scenario.is_scenario(
self, func) if not func.startswith(
'__') if not func.startswith('all')]
result = 200
resultmsg = ""
for func in test_list:
funccall = getattr(self, func)
try:
funres = funccall()
except Exception as e:
funres = [404, str(e)]
# Catch all status other than success and skipped
if funres[0] not in [200, 300]:
resultmsg += ("%s failed: %s\n" % (func, funres[1]))
result = 404
if not resultmsg:
resultmsg = "All Tests passed"
return (result, resultmsg)