Upload system tests results to TestRail

Add reporter which uploads tests reports from
Jenkins to TestRail.
Script takes a name of swarm runner job in
Jenkins (e.g. -j "6.1.swarm.runner") or swarm
view (e.g. -w "6.0_swarm") as parameter. By
default it gets results from latest runner build,
but build number can be specified manually.

Change-Id: I9943f2de047f831b72b2233b89cf07626b15bd37
Partial-bug: #1415467
This commit is contained in:
Artem Panchenko 2015-02-09 09:48:18 +02:00
parent 672cf552ed
commit 42a19dbf4d
2 changed files with 394 additions and 0 deletions

View File

@ -0,0 +1,127 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import urllib2
from settings import JENKINS
from settings import logger
def get_jobs_for_view(view):
"""Return list of jobs from specified view
"""
view_url = "/".join([JENKINS["url"], 'view', view, 'api/json'])
logger.debug("Request view data from {}".format(view_url))
req = urllib2.Request(view_url)
opener = urllib2.build_opener(urllib2.HTTPHandler)
s = opener.open(req).read()
opener.close()
view_data = json.loads(s)
jobs = [job["name"] for job in view_data["jobs"]]
return jobs
class Build():
def __init__(self, name, number):
"""Get build info via Jenkins API, get test info via direct HTTP
request.
If number is 'latest', get latest completed build.
"""
self.name = name
if number == 'latest':
job_info = self.get_job_info(depth=0)
self.number = job_info["lastCompletedBuild"]["number"]
else:
self.number = int(number)
self.build_data = self.get_build_data(depth=0)
self.url = self.build_data["url"]
def get_job_info(self, depth=1):
job_url = "/".join([JENKINS["url"], 'job', self.name,
'api/json?depth={depth}'.format(depth=depth)])
logger.debug("Request job info from {}".format(job_url))
return json.load(urllib2.urlopen(job_url))
def get_build_data(self, depth=1):
build_url = "/".join([JENKINS["url"], 'job',
self.name,
str(self.number),
'api/json?depth={depth}'.format(depth=depth)])
logger.debug("Request build data from {}".format(build_url))
return json.load(urllib2.urlopen(build_url))
def get_test_data(self, url):
test_url = "/".join([url.rstrip("/"), 'testReport', 'api/json'])
logger.debug("Request test data from {}".format(test_url))
response = urllib2.urlopen(test_url)
return json.load(response)
def test_data(self):
try:
data = self.get_test_data(self.url)
except Exception as e:
logger.warning("No test data for {0}: {1}".format(
self.url,
e,
))
# If we failed to get any tests for the build, return
# meta test case 'jenkins' with status 'failed'.
data = {
"suites": [
{
"cases": [
{
"name": "jenkins",
"className": "jenkins",
"status": "failed",
"duration": 0
}
]
}
]
}
return data
def __str__(self):
string = "\n".join([
"{0}: {1}".format(*item) for item in self.build_record()
])
return string
def build_record(self):
"""Return list of pairs.
We cannot use dictionary, because columns are ordered.
"""
data = [
('number', str(self.number)),
('id', self.build_data["id"]),
('description', self.build_data["description"]),
('url', self.build_data["url"]),
]
test_data = self.test_data()
for suite in test_data['suites']:
for case in suite['cases']:
column_id = case['className'].lower().replace("_", "-")
data.append((column_id, case['status'].lower()))
return data

267
fuelweb_test/testrail/report.py Executable file
View File

@ -0,0 +1,267 @@
#!/usr/bin/env python
#
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
import re
from logging import DEBUG
from optparse import OptionParser
from builds import Build
from builds import get_jobs_for_view
from settings import logger
from settings import TestRailSettings
from testrail_client import TestRailProject
class TestResult():
def __init__(self, name, group, status, duration, url=None,
version=None, description=None):
self.name = name
self.group = group
self._status = status
self.duration = duration
self.url = url
self.version = version
self.description = description
self.available_statuses = {
'passed': ['passed', 'fixed'],
'failed': ['failed', 'regression'],
'skipped': ['skipped']
}
@property
def status(self):
for s in self.available_statuses.keys():
if self._status in self.available_statuses[s]:
return s
logger.error('Unsupported result status: "{0}"!'.format(self._status))
return self._status
@status.setter
def status(self, value):
self._status = value
def __str__(self):
result_dict = {
'name': self.name,
'group': self.group,
'status': self.status,
'duration': self.duration,
'url': self.url,
'version': self.version,
'description': self.description
}
return str(result_dict)
def retry(count=3):
def wrapped(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
i = 0
while True:
try:
return func(*args, **kwargs)
except:
i += 1
if i >= count:
raise
return wrapper
return wrapped
def get_downstream_builds(jenkins_build_data):
return [{'name': b['jobName'], 'number': b['buildNumber']}
for b in jenkins_build_data['subBuilds']]
def get_version(jenkins_build_data):
parameters = [a['parameters'] for a in jenkins_build_data['actions']
if 'parameters' in a.keys()][0]
iso_link = [p['value'] for p in parameters if
p['name'].lower() == 'magnet_link'][0]
return (re.search(r'.*\bfuel-(\d+\.\d+)-(\d+)-.*', iso_link).group(1),
int(re.search(r'.*\bfuel-(\d+\.\d+)-(\d+)-.*', iso_link).group(2)))
@retry(count=3)
def get_tests_results(systest_build):
tests_results = []
test_build = Build(systest_build['name'], systest_build['number'])
for test in test_build.test_data()['suites'][0]['cases']:
test_result = TestResult(
name=test['name'],
group=test['className'],
status=test['status'].lower(),
duration='{0}s'.format(int(test['duration']) + 1),
url='{0}testReport/(root)/{1}/'.format(test_build.url,
test['name']),
version='_'.join([test_build.build_data["id"]] +
(test_build.build_data["description"]
or test['name']).split()),
description=test_build.build_data["description"] or
test['name'],
)
tests_results.append(test_result)
return tests_results
def publish_results(project, test_plan, tests_suite, config_id, results):
test_run_id = [run['id'] for run in test_plan['entries'][0]['runs'] if
config_id in run['config_ids']][0]
tests = project.get_tests(run_id=test_run_id)
results_to_publish = []
for result in results:
test = project.get_test_by_group(run_id=test_run_id,
group=result.group,
tests=tests)
if not test:
logger.error("Test for '{0}' group not found: {1}".format(
result.group, result.url))
continue
existing_results_versions = [r['version'] for r in
project.get_results_for_test(test['id'])]
if result.version not in existing_results_versions:
results_to_publish.append(result)
try:
if len(results_to_publish) > 0:
project.add_results_for_cases(run_id=test_run_id,
tests_suite=tests_suite,
tests_results=results_to_publish)
except:
logger.error('Failed to add new results for tests: {0}'.format(
[r.group for r in results_to_publish]
))
raise
return results_to_publish
def main():
parser = OptionParser(
description="Publish results of Jenkins build to TestRail."
" See conf.py for configuration."
)
parser.add_option('-j', '--job-name', dest='job_name', default=None,
help='Jenkins swarm runner job name')
parser.add_option('-N', '--build-number', dest='build_number',
default='latest',
help='Jenkins swarm runner build number')
parser.add_option("-w", "--view", dest="jenkins_view", default=False,
help="Get system tests jobs from Jenkins view")
parser.add_option("-v", "--verbose",
action="store_true", dest="verbose", default=False,
help="Enable debug output")
(options, args) = parser.parse_args()
if options.verbose:
logger.setLevel(DEBUG)
tests_results_centos = []
tests_results_ubuntu = []
case_group = TestRailSettings.test_suite
if options.jenkins_view:
jobs = get_jobs_for_view(options.jenkins_view)
tests_jobs = [{'name': j, 'number': 'latest'}
for j in jobs if 'system_test' in j]
runner_job = [j for j in jobs if 'runner' in j][0]
runner_build = Build(runner_job, 'latest')
elif options.job_name:
runner_build = Build(options.job_name, options.build_number)
tests_jobs = get_downstream_builds(runner_build.build_data)
else:
logger.error("Please specify either Jenkins swarm runner job name (-j)"
" or Jenkins view with system tests jobs (-w). Exiting..")
return
milestone, iso_number = get_version(runner_build.build_data)
for systest_build in tests_jobs:
if 'centos' in systest_build['name'].lower():
tests_results_centos.extend(get_tests_results(systest_build))
elif 'ubuntu' in systest_build['name'].lower():
tests_results_ubuntu.extend(get_tests_results(systest_build))
project = TestRailProject(url=TestRailSettings.url,
user=TestRailSettings.user,
password=TestRailSettings.password,
project=TestRailSettings.project)
test_plan_name = '{milestone} {case_group} iso #{iso_number}'.format(
milestone=milestone, case_group=case_group, iso_number=iso_number)
operation_systems = [{'name': config['name'], 'id': config['id']}
for config in project.get_config_by_name(
'Operation System')['configs']]
if not project.get_plan_by_name(test_plan_name):
plan_entries = [project.test_run_struct(
name='{case_group}'.format(case_group=case_group),
suite=case_group,
milestone=milestone,
description='Results of system tests ({case_group}) on iso # '
'"{iso_number}"'.format(case_group=case_group,
iso_number=iso_number),
config_ids=[os['id']],
include_all=True) for os in operation_systems]
test_plan = project.add_plan(test_plan_name,
description='',
milestone=milestone,
entires=[
{
'suite_id': project.get_suite(
case_group)['id'],
'config_ids': [os['id'] for os in
operation_systems],
'runs': plan_entries
}
])
else:
test_plan = project.get_plan_by_name(test_plan_name)
logger.debug('Uploading tests results to TestRail...')
for os in operation_systems:
if 'centos' in os['name'].lower():
tests_results_centos = publish_results(project=project,
test_plan=test_plan,
tests_suite=case_group,
config_id=os['id'],
results=tests_results_centos
)
if 'ubuntu' in os['name'].lower():
tests_results_ubuntu = publish_results(project=project,
test_plan=test_plan,
tests_suite=case_group,
config_id=os['id'],
results=tests_results_ubuntu
)
logger.debug('Added new results for tests (CentOS): {tests}'.format(
tests=[r.group for r in tests_results_centos]
))
logger.debug('Added new results for tests (Ubuntu): {tests}'.format(
tests=[r.group for r in tests_results_ubuntu]
))
logger.info('Report URL: {0}'.format(test_plan['url']))
if __name__ == "__main__":
main()