# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""Operator-level health scenarios (RabbitMQ, Galera, Docker, Ceph, nodes).

Every check shells out to ``ansible`` ad-hoc commands and parses the
one-line (``-o``) output. Scenario methods return a tuple whose first item
is a status code (200 = passed, 300 = skipped, anything else = failed) and
whose second item is a human-readable message.
"""

from cloudpulse.openstack.api import keystone_session
from cloudpulse.openstack.api.nova_api import NovaHealth
from cloudpulse.operator.ansible.openstack_node_info_reader import \
    openstack_node_info_reader
from cloudpulse.scenario import base
import errno
import os
from oslo_config import cfg
from oslo_utils import importutils
import re
import shlex
import simplejson
from subprocess import PIPE
from subprocess import Popen

TESTS_OPTS = [
    cfg.StrOpt('operator_setup_file',
               default='/etc/cloudpulse/openstack_config.yaml',
               help='Setup File for the operator'),
    cfg.BoolOpt('containerized',
                default=True,
                help='enable if the processes are running as containers'),
    cfg.StrOpt('rabbit_container',
               default='rabbitmq',
               help='name of the rabbitmq container'),
    cfg.StrOpt('galera_container',
               default='mariadb',
               help='name of the galera cluster container'),
    # NOTE: the ceph checks locate the monitor container with the fixed
    # "cephmon" pattern; this option is currently not consulted.
    cfg.StrOpt('ceph_container',
               default='ceph',
               help='name of the ceph cluster container'),
]

PERIODIC_TESTS_OPTS = [
    cfg.IntOpt('rabbitmq_check',
               default=0,
               help='The rabbitmq periodic check'),
    cfg.IntOpt('galera_check',
               default=0,
               help='The galera periodic check'),
    cfg.IntOpt('ceph_check',
               default=0,
               help='The ceph periodic check'),
    cfg.IntOpt('docker_check',
               default=0,
               help='The docker periodic check'),
    cfg.IntOpt('node_check',
               default=0,
               help='The Node Check periodic check'),
    cfg.IntOpt('all_operator_tests',
               default=0,
               help='Run all operator tests')
]

CONF = cfg.CONF
operator_test_group = cfg.OptGroup(name='operator_test',
                                   title='Options for the Operators')
CONF.register_group(operator_test_group)
CONF.register_opts(TESTS_OPTS, operator_test_group)

periodic_test_group = cfg.OptGroup(name='periodic_tests',
                                   title='Periodic tests to be run')
CONF.register_group(periodic_test_group)
CONF.register_opts(PERIODIC_TESTS_OPTS, periodic_test_group)

cfg.CONF.import_opt('auth_uri', 'keystonemiddleware.auth_token',
                    group='keystone_authtoken')


def execute(command):
    """Run *command* without a shell and capture its stdout.

    :param command: command line as a single string; it is tokenised with
        ``shlex.split``, so quoting follows POSIX shell rules.
    :returns: dict with ``'status'`` (the process exit code, or 127 when the
        executable does not exist / 126 for any other launch error) and
        ``'output'`` (stdout decoded as UTF-8; empty for status 126/127).
    """
    try:
        proc = Popen(shlex.split(command), shell=False, stdout=PIPE,
                     stderr=PIPE, bufsize=-1, env=os.environ,
                     close_fds=True)
        stdout, _stderr = proc.communicate()
    except OSError as e:
        if e.errno == errno.ENOENT:
            return {'status': 127, 'output': ""}
        return {'status': 126, 'output': ""}
    if proc.returncode in (126, 127):
        stdout = b''
    # Undecodable bytes must not crash a health check; replace them.
    return {'status': proc.returncode,
            'output': stdout.decode('utf-8', 'replace')}


def get_container_name(name):
    """Return the name of the first local container matching *name*.

    Runs ``docker ps`` on 127.0.0.1 through ansible and returns the last
    column (the container name) of the first line containing *name*.
    Returns None when the command fails or nothing matches.
    """
    cmd = "ansible -o all -i 127.0.0.1, -a 'docker ps' -u root"
    op = execute(cmd)
    if op['status']:
        return None
    # ansible's one-line (-o) mode escapes the remote newlines as a
    # literal backslash-n, hence the split on '\\n'.
    for line in op['output'].split('\\n'):
        if name in line:
            return line.split()[-1].strip('\n')
    return None


def create_error_msg(error_msg, reason_msg, error_hosts):
    """Append "<host, host>: <reason>" to *error_msg*, ';'-separated."""
    hosts_msg = "{}: {}".format(', '.join(error_hosts), reason_msg)
    if error_msg:
        return "{}; {}".format(error_msg, hosts_msg)
    return hosts_msg


def _ansible_args(cmd, is_containerized, container_pattern):
    """Build the quoted ``ansible -a`` argument that runs *cmd*.

    When *is_containerized* is true, *cmd* is wrapped in ``docker exec``
    against the container found by ``get_container_name(container_pattern)``;
    otherwise it runs directly on the host. In both cases the result is
    single-quoted, so ``shlex.split`` in :func:`execute` hands it to ansible
    as one argument instead of leaking its words as ansible options.
    """
    if is_containerized:
        container = get_container_name(container_pattern)
        cmd = "docker exec %s %s" % (container, cmd)
    return "'%s'" % cmd


def _parse_node_list(pattern, text):
    """Return the comma-separated names captured by group 1 of *pattern*.

    Surrounding spaces and single quotes are stripped from each name.
    Returns an empty list when *pattern* does not match *text*.
    """
    match = re.search(pattern, text, re.M | re.I)
    if not match:
        return []
    return [name.strip(" ").strip("'") for name in match.group(1).split(",")]


class operator_scenario(base.Scenario):
    """Operator scenarios that check OpenStack infrastructure services."""

    def _get_keystone_session_creds(self):
        """Return Nova client credentials built on the keystone session."""
        creds = {'session': keystone_session._get_kssession(),
                 'endpoint_type': 'internalURL'}
        return creds

    def _get_nova_hypervior_list(self):
        """Return ``NovaHealth.nova_hypervisor_list()`` for these creds."""
        # Importing the middleware registers its keystone_authtoken options.
        importutils.import_module('keystonemiddleware.auth_token')
        creds = self._get_keystone_session_creds()
        nova = NovaHealth(creds)
        return nova.nova_hypervisor_list()

    def load(self):
        """Load the node inventory from the configured setup file."""
        self.os_node_info_obj = openstack_node_info_reader(
            cfg.CONF.operator_test.operator_setup_file)

    def is_metric_pool(self, is_containerized, ceph_json):
        """Return True if a HEALTH_WARN is caused only by the metrics pool.

        The warning is tolerated only when MANY_OBJECTS_PER_PG is the sole
        active health check, it reports exactly one offending pool, and the
        ``metrics`` pool in ``ceph df`` holds more than 1000 objects.

        :param is_containerized: run ``ceph df`` inside the ceph monitor
            container instead of on the host.
        :param ceph_json: parsed ``ceph -f json status`` output.
        """
        health = ceph_json['health']
        if 'HEALTH_WARN' not in health['status']:
            return False
        checks = health.get('checks') or {}
        # Any other active check (e.g. an OSD down) must still surface.
        if set(checks) != {'MANY_OBJECTS_PER_PG'}:
            return False
        err = checks['MANY_OBJECTS_PER_PG'] or {}
        message = (err.get('summary') or {}).get('message', '')
        if ('HEALTH_WARN' not in err.get('severity', '') or
                '1 pools have many more objects per pg than average'
                not in message):
            return False
        cmd = ("ansible -o all -u root -i 127.0.0.1, -a " +
               _ansible_args("ceph df", is_containerized, "cephmon"))
        ret = execute(cmd)
        if ret['status']:
            return False
        for line in ret['output'].split('\\n'):
            if 'metrics' not in line:
                continue
            # The object count is the last column of the pool's row.
            try:
                num_obj = int(line.split()[-1])
            except ValueError:
                return False
            if num_obj > 1000:
                return True
        return False

    @base.scenario(admin_only=False, operator=True)
    def rabbitmq_check(self):
        """Check that every RabbitMQ disc node is also a running node."""
        self.load()
        cmd = ("ansible -o all -i 127.0.0.1, -a " +
               _ansible_args("rabbitmqctl cluster_status -q",
                             cfg.CONF.operator_test.containerized,
                             cfg.CONF.operator_test.rabbit_container) +
               " -u root")
        res = execute(cmd)
        if res['status']:
            return (404, "RabbitMQ-server test failed :%s"
                    % "rabbitmq-service is down", [])
        # Drop the escaped newlines and all spaces so the regexes below can
        # match across the original line breaks of cluster_status.
        status = res['output'].replace('\\n', '').replace(' ', '')
        nodes = _parse_node_list(r'nodes,\[{disc,\[(.*?)\]', status)
        running = _parse_node_list(r'running_nodes,\[(.*?)\]}', status)
        if not nodes:
            # Nothing parsed: do not report an unverified cluster as healthy.
            return (404, "RabbitMQ-server test failed :%s"
                    % "unable to parse cluster status", [])
        diffnodes = sorted(set(nodes) - set(running))
        if diffnodes:
            return (404, ("Failed Nodes : %s" % str(diffnodes)))
        return (200, "Running Nodes : %s" % str(nodes),
                ['RabbitMQ-server Running'])

    @base.scenario(admin_only=False, operator=True)
    def galera_check(self):
        """Report the Galera cluster's incoming addresses via SHOW STATUS."""
        self.load()
        galera = self.os_node_info_obj.get_galera_details()
        # NOTE(review): the password is passed on the mysql command line,
        # so it is visible in the remote process list, and a password
        # containing a single quote would break the quoting below.
        mysql_cmd = (r'mysql -u %s -p%s -e "SHOW STATUS;"'
                     % (galera['username'], galera['password']))
        cmd = ("ansible -o all -i 127.0.0.1, -a " +
               _ansible_args(mysql_cmd,
                             cfg.CONF.operator_test.containerized,
                             cfg.CONF.operator_test.galera_container) +
               ' -u root')
        res = execute(cmd)
        if res['status']:
            return (404, "Galera Cluster Test Failed: %s"
                    % "service access failed", [])
        galera_status = res['output']
        if 'wsrep_incoming_addresses' not in galera_status:
            return (404, "Galera Cluster Test Failed: %s"
                    % "Invalid cluster status", [])
        galera_status_string = galera_status.replace('\\n', '')
        match = re.search(r'wsrep_incoming_addresses\s+(.*?)wsrep.*$',
                          galera_status_string, re.M | re.I)
        if not match:
            return (404, "Galera Cluster Test Failed: %s"
                    % "Invalid cluster status", [])
        return (200, "Active Nodes : %s" % match.group(1),
                ['Galera Cluster Test Passed'])

    @base.scenario(admin_only=False, operator=True)
    def docker_check(self):
        """Report exited containers and failed/unreachable docker hosts."""
        self.load()
        nodeip_list = [node.ip
                       for node in self.os_node_info_obj.get_host_list()]
        # The trailing comma makes ansible treat the value as a host list
        # even for a single host (otherwise it is read as a file path).
        anscmd = "ansible -o all -i %s, -a " % ','.join(nodeip_list)
        # Raw string: the backslashes before the braces are passed through
        # literally (same value as before, without Python's invalid-escape
        # warning); presumably they keep ansible from templating {{ }}.
        cmd = (r"'docker ps -a --format \{\{.Names\}\} --filter %s '"
               % "status=exited")
        cmd = anscmd + cmd + ' -u root'
        res = execute(cmd)
        if res['status'] and not res['output'].strip():
            # ansible itself failed; no per-host result to inspect.
            return (404, "Unable to perform docker check, ansible cmd: "
                    "'%s' failed" % cmd, [])

        exited = []
        docker_down = []
        ssh_failed = []
        unknown_status = []
        for raw_line in res['output'].split('\n'):
            if not raw_line.strip():
                continue
            # Expected shape: "<host> | <STATE> | rc=N | (stdout) <names>"
            fields = raw_line.split('|')
            if len(fields) < 2:
                # Not a per-host result line; nothing to attribute it to.
                continue
            host, state = fields[0].strip(), fields[1]
            if 'FAILED' in state:
                docker_down.append(host)
            elif 'UNREACHABLE' in state:
                ssh_failed.append(host)
            elif 'SUCCESS' in state or 'CHANGED' in state:
                if len(fields) < 4:
                    continue
                names = fields[3].replace(' ', '').replace('(stdout)', '')
                if names.strip():
                    exited.append("{}: {}".format(
                        host, names.replace('\\n', ', ').strip()))
            else:
                unknown_status.append(host)

        docker_failed = "; ".join(exited)
        if docker_down:
            docker_failed = create_error_msg(
                docker_failed, "Docker daemon down", docker_down)
        if ssh_failed:
            docker_failed = create_error_msg(
                docker_failed, "Host(s) unreachable via SSH", ssh_failed)
        if unknown_status:
            docker_failed = create_error_msg(
                docker_failed, "Failure reason unknown", unknown_status)
        if docker_failed:
            return (404, docker_failed, [])
        return (200, "All docker containers are up",
                ['Docker container Test Passed'])

    def _ceph_overall_status(self, is_containerized, ceph_json):
        """Return the effective health string from ``ceph status`` JSON.

        Luminous reports the real health in ``health.status`` (and only a
        "preluminous compat" warning in the legacy summary), so
        ``health.status`` is used in that case and whenever the legacy
        ``overall_status`` is absent. A HEALTH_WARN caused only by the
        metrics pool (see :meth:`is_metric_pool`) is reported as HEALTH_OK.
        """
        health = ceph_json['health']
        summary = health.get('summary') or []
        compat_warning = bool(summary) and (
            'mon health preluminous compat warning' in
            summary[0].get('summary', ''))
        if compat_warning or 'overall_status' not in health:
            status = health['status']
            if ('HEALTH_WARN' in status and
                    self.is_metric_pool(is_containerized, ceph_json)):
                status = 'HEALTH_OK'
            return status
        return health['overall_status']

    @base.scenario(admin_only=False, operator=True)
    def ceph_check(self):
        """Check Ceph health when block-storage nodes are configured."""
        self.load()
        storage_nodes = [node.name.lower()
                         for node in self.os_node_info_obj.get_host_list()
                         if "block_storage" in node.role.split()]
        if not storage_nodes:
            return (300, ("Ceph cluster test skipped "
                          "as no dedicated storage found"))
        is_containerized = cfg.CONF.operator_test.containerized
        cmd = ("ansible -o all -i 127.0.0.1, -a " +
               _ansible_args("ceph -f json status", is_containerized,
                             "cephmon") +
               ' -u root')
        res = execute(cmd)
        if res['status']:
            return (404, "Ceph cluster test failed: %s"
                    % "ceph status command failed")
        # "<host> | <STATE> | rc=N | (stdout) <json>"; maxsplit keeps any
        # '|' inside the JSON payload intact.
        ceph_data = res['output'].replace('\n', '').split('|', 3)
        if len(ceph_data) < 4:
            return (404, "Ceph cluster test failed: %s"
                    % "unexpected ceph status output")
        ceph_str = ceph_data[3].replace(' (stdout) ', '').replace('\\n', '')
        try:
            ceph_json = simplejson.loads(ceph_str)
        except ValueError:
            return (404, "Ceph cluster test failed: %s"
                    % "unable to parse ceph status output")
        overall_status = self._ceph_overall_status(is_containerized,
                                                   ceph_json)
        # Some releases nest the counters as osdmap -> osdmap -> ...,
        # others keep them directly under osdmap; accept both layouts.
        osdmap = ceph_json['osdmap']
        osdmap = osdmap.get('osdmap', osdmap)
        num_of_osd = osdmap['num_osds']
        num_up_osds = osdmap['num_up_osds']
        status_code = 200 if overall_status == 'HEALTH_OK' else 404
        return (status_code, "Overall Status = %s, "
                "Cluster status = %s/%s" % (overall_status, num_up_osds,
                                            num_of_osd))

    @base.scenario(admin_only=False, operator=True)
    def node_check(self):
        """Check that compute nodes answer ansible ping and are in Nova."""
        self.load()
        nodes_from_ansible_config = [
            node.name for node in self.os_node_info_obj.get_host_list()
            if "compute" in node.role.split()]
        nova_hypervisor_list = self._get_nova_hypervior_list()
        if nova_hypervisor_list[0] != 200:
            return (404, ("Cannot get hypervisor list from "
                          "Nova reason-%s") % nova_hypervisor_list[1])
        nodes_from_nova = list(nova_hypervisor_list[2])
        # The trailing comma keeps a single host from being read by ansible
        # as an inventory file path.
        anscmd = ("ansible -o all -i '%s,' -m ping -u root"
                  % ','.join(nodes_from_ansible_config))
        res = execute(anscmd)
        failed_hosts = [line.split('|')[0].strip()
                        for line in res['output'].split('\n')
                        if line.strip() and "SUCCESS" not in line]
        # A non-zero exit with no failed host means ansible itself failed
        # rather than some nodes being unreachable.
        if res['status'] and not failed_hosts:
            return (404, "Unable to perform ping test on nodes, ansible cmd: "
                    "'%s' failed" % anscmd)
        # Nodes that answered ping must also be known to Nova.
        nova_failed_hosts = [node for node in nodes_from_ansible_config
                             if node not in nodes_from_nova]
        failed_hosts = sorted(set(nova_failed_hosts + failed_hosts))
        if not failed_hosts:
            return (200, "All nodes are up. nova hypervisor list = %s"
                    % ', '.join(nodes_from_nova))
        msg = ("The following nodes are down: %s. nova hypervisor list = "
               "%s" % (', '.join(failed_hosts), ', '.join(nodes_from_nova)))
        return (404, msg)

    @base.scenario(admin_only=False, operator=True)
    def all_operator_tests(self):
        """Run every other operator scenario and aggregate the failures.

        Results with status 200 (passed) or 300 (skipped) count as success;
        any other status, a missing result or an exception marks the whole
        run as failed (404) and is listed in the returned message.
        """
        test_list = [func for func in dir(self)
                     if base.Scenario.is_scenario(self, func) and
                     not func.startswith('__') and
                     not func.startswith('all')]
        result = 200
        failures = []
        for func in test_list:
            try:
                funres = getattr(self, func)()
            except Exception as e:
                # Boundary: one broken scenario must not abort the others.
                funres = [404, str(e)]
            if not funres:
                funres = [404, "no result returned"]
            if funres[0] not in (200, 300):
                failures.append("%s failed: %s\n" % (func, funres[1]))
                result = 404
        resultmsg = "".join(failures) or "All Tests passed"
        return (result, resultmsg)