330 lines
10 KiB
Python
Executable File
330 lines
10 KiB
Python
Executable File
#!/usr/bin/env python
|
|
"""Run Tulip unittests.
|
|
|
|
Usage:
|
|
python3 runtests.py [flags] [pattern] ...
|
|
|
|
Patterns are matched against the fully qualified name of the test,
|
|
including package, module, class and method,
|
|
e.g. 'tests.test_events.PolicyTests.testPolicy'.
|
|
|
|
For full help, try --help.
|
|
|
|
runtests.py --coverage is equivalent of:
|
|
|
|
$(COVERAGE) run --branch runtests.py -v
|
|
$(COVERAGE) html $(list of files)
|
|
$(COVERAGE) report -m $(list of files)
|
|
|
|
"""
|
|
|
|
# Originally written by Beech Horn (for NDB).
|
|
|
|
from __future__ import print_function
|
|
import optparse
|
|
import gc
|
|
import logging
|
|
import os
|
|
import random
|
|
import re
|
|
import sys
|
|
import textwrap
|
|
PY33 = (sys.version_info >= (3, 3))
|
|
if PY33:
|
|
import importlib.machinery
|
|
else:
|
|
import imp
|
|
try:
|
|
import coverage
|
|
except ImportError:
|
|
coverage = None
|
|
if sys.version_info < (3,):
|
|
sys.exc_clear()
|
|
|
|
try:
|
|
import unittest
|
|
from unittest.signals import installHandler
|
|
except ImportError:
|
|
import unittest2 as unittest
|
|
from unittest2.signals import installHandler
|
|
|
|
ARGS = optparse.OptionParser(description="Run all unittests.", usage="%prog [options] [pattern] [pattern2 ...]")
|
|
ARGS.add_option(
|
|
'-v', '--verbose', action="store_true", dest='verbose',
|
|
default=0, help='verbose')
|
|
ARGS.add_option(
|
|
'-x', action="store_true", dest='exclude', help='exclude tests')
|
|
ARGS.add_option(
|
|
'-f', '--failfast', action="store_true", default=False,
|
|
dest='failfast', help='Stop on first fail or error')
|
|
ARGS.add_option(
|
|
'-c', '--catch', action="store_true", default=False,
|
|
dest='catchbreak', help='Catch control-C and display results')
|
|
ARGS.add_option(
|
|
'-m', '--monkey-patch', action="store_true", default=False,
|
|
dest='monkey_patch', help='Enable eventlet monkey patching')
|
|
ARGS.add_option(
|
|
'--forever', action="store_true", dest='forever', default=False,
|
|
help='run tests forever to catch sporadic errors')
|
|
ARGS.add_option(
|
|
'--findleaks', action='store_true', dest='findleaks',
|
|
help='detect tests that leak memory')
|
|
ARGS.add_option(
|
|
'-r', '--randomize', action='store_true',
|
|
help='randomize test execution order.')
|
|
ARGS.add_option(
|
|
'--seed', type=int,
|
|
help='random seed to reproduce a previous random run')
|
|
ARGS.add_option(
|
|
'-q', action="store_true", dest='quiet', help='quiet')
|
|
ARGS.add_option(
|
|
'--tests', action="store", dest='testsdir', default='tests',
|
|
help='tests directory')
|
|
ARGS.add_option(
|
|
'--coverage', action="store_true", dest='coverage',
|
|
help='enable html coverage report')
|
|
|
|
|
|
if PY33:
|
|
def load_module(modname, sourcefile):
|
|
loader = importlib.machinery.SourceFileLoader(modname, sourcefile)
|
|
return loader.load_module()
|
|
else:
|
|
def load_module(modname, sourcefile):
|
|
return imp.load_source(modname, sourcefile)
|
|
|
|
|
|
def load_modules(basedir, suffix='.py'):
|
|
def list_dir(prefix, dir):
|
|
files = []
|
|
|
|
modpath = os.path.join(dir, '__init__.py')
|
|
if os.path.isfile(modpath):
|
|
mod = os.path.split(dir)[-1]
|
|
files.append(('{0}{1}'.format(prefix, mod), modpath))
|
|
|
|
prefix = '{0}{1}.'.format(prefix, mod)
|
|
|
|
for name in os.listdir(dir):
|
|
path = os.path.join(dir, name)
|
|
|
|
if os.path.isdir(path):
|
|
files.extend(list_dir('{0}{1}.'.format(prefix, name), path))
|
|
else:
|
|
if (name != '__init__.py' and
|
|
name.endswith(suffix) and
|
|
not name.startswith(('.', '_'))):
|
|
files.append(('{0}{1}'.format(prefix, name[:-3]), path))
|
|
|
|
return files
|
|
|
|
mods = []
|
|
for modname, sourcefile in list_dir('', basedir):
|
|
if modname == 'runtests':
|
|
continue
|
|
try:
|
|
mod = load_module(modname, sourcefile)
|
|
mods.append((mod, sourcefile))
|
|
except SyntaxError:
|
|
raise
|
|
#except Exception as err:
|
|
# print("Skipping '{0}': {1}".format(modname, err), file=sys.stderr)
|
|
|
|
return mods
|
|
|
|
|
|
def randomize_tests(tests, seed):
|
|
if seed is None:
|
|
seed = random.randrange(10000000)
|
|
random.seed(seed)
|
|
print("Using random seed", seed)
|
|
random.shuffle(tests._tests)
|
|
|
|
|
|
class TestsFinder:
|
|
|
|
def __init__(self, testsdir, includes=(), excludes=()):
|
|
self._testsdir = testsdir
|
|
self._includes = includes
|
|
self._excludes = excludes
|
|
self.find_available_tests()
|
|
|
|
def find_available_tests(self):
|
|
"""
|
|
Find available test classes without instantiating them.
|
|
"""
|
|
self._test_factories = []
|
|
mods = [mod for mod, _ in load_modules(self._testsdir)]
|
|
for mod in mods:
|
|
for name in set(dir(mod)):
|
|
if name.endswith('Tests'):
|
|
self._test_factories.append(getattr(mod, name))
|
|
|
|
def load_tests(self):
|
|
"""
|
|
Load test cases from the available test classes and apply
|
|
optional include / exclude filters.
|
|
"""
|
|
loader = unittest.TestLoader()
|
|
suite = unittest.TestSuite()
|
|
for test_factory in self._test_factories:
|
|
tests = loader.loadTestsFromTestCase(test_factory)
|
|
if self._includes:
|
|
tests = [test
|
|
for test in tests
|
|
if any(re.search(pat, test.id())
|
|
for pat in self._includes)]
|
|
if self._excludes:
|
|
tests = [test
|
|
for test in tests
|
|
if not any(re.search(pat, test.id())
|
|
for pat in self._excludes)]
|
|
suite.addTests(tests)
|
|
return suite
|
|
|
|
|
|
class TestResult(unittest.TextTestResult):
|
|
|
|
def __init__(self, stream, descriptions, verbosity):
|
|
super().__init__(stream, descriptions, verbosity)
|
|
self.leaks = []
|
|
|
|
def startTest(self, test):
|
|
super().startTest(test)
|
|
gc.collect()
|
|
|
|
def addSuccess(self, test):
|
|
super().addSuccess(test)
|
|
gc.collect()
|
|
if gc.garbage:
|
|
if self.showAll:
|
|
self.stream.writeln(
|
|
" Warning: test created {} uncollectable "
|
|
"object(s).".format(len(gc.garbage)))
|
|
# move the uncollectable objects somewhere so we don't see
|
|
# them again
|
|
self.leaks.append((self.getDescription(test), gc.garbage[:]))
|
|
del gc.garbage[:]
|
|
|
|
|
|
class TestRunner(unittest.TextTestRunner):
|
|
resultclass = TestResult
|
|
|
|
def run(self, test):
|
|
result = super().run(test)
|
|
if result.leaks:
|
|
self.stream.writeln("{0} tests leaks:".format(len(result.leaks)))
|
|
for name, leaks in result.leaks:
|
|
self.stream.writeln(' '*4 + name + ':')
|
|
for leak in leaks:
|
|
self.stream.writeln(' '*8 + repr(leak))
|
|
return result
|
|
|
|
|
|
def runtests():
|
|
args, pattern = ARGS.parse_args()
|
|
|
|
if args.coverage and coverage is None:
|
|
URL = "bitbucket.org/pypa/setuptools/raw/bootstrap/ez_setup.py"
|
|
print(textwrap.dedent("""
|
|
coverage package is not installed.
|
|
|
|
To install coverage3 for Python 3, you need:
|
|
- Setuptools (https://pypi.python.org/pypi/setuptools)
|
|
|
|
What worked for me:
|
|
- download {0}
|
|
* curl -O https://{0}
|
|
- python3 ez_setup.py
|
|
- python3 -m easy_install coverage
|
|
""".format(URL)).strip())
|
|
sys.exit(1)
|
|
|
|
testsdir = os.path.abspath(args.testsdir)
|
|
if not os.path.isdir(testsdir):
|
|
print("Tests directory is not found: {0}\n".format(testsdir))
|
|
ARGS.print_help()
|
|
return
|
|
|
|
excludes = includes = []
|
|
if args.exclude:
|
|
excludes = pattern
|
|
else:
|
|
includes = pattern
|
|
|
|
v = 0 if args.quiet else args.verbose + 1
|
|
failfast = args.failfast
|
|
catchbreak = args.catchbreak
|
|
findleaks = args.findleaks
|
|
runner_factory = TestRunner if findleaks else unittest.TextTestRunner
|
|
|
|
if args.monkey_patch:
|
|
print("Enable eventlet monkey patching of the Python standard library")
|
|
import eventlet
|
|
eventlet.monkey_patch()
|
|
|
|
if args.coverage:
|
|
cov = coverage.coverage(branch=True,
|
|
source=['asyncio'],
|
|
)
|
|
cov.start()
|
|
|
|
logger = logging.getLogger()
|
|
if v == 0:
|
|
level = logging.CRITICAL
|
|
elif v == 1:
|
|
level = logging.ERROR
|
|
elif v == 2:
|
|
level = logging.WARNING
|
|
elif v == 3:
|
|
level = logging.INFO
|
|
elif v >= 4:
|
|
level = logging.DEBUG
|
|
logging.basicConfig(level=level)
|
|
|
|
finder = TestsFinder(args.testsdir, includes, excludes)
|
|
if catchbreak:
|
|
installHandler()
|
|
import tests
|
|
if hasattr(tests.asyncio, 'coroutines'):
|
|
debug = tests.asyncio.coroutines._DEBUG
|
|
else:
|
|
# Tulip <= 3.4.1
|
|
debug = tests.asyncio.tasks._DEBUG
|
|
if debug:
|
|
print("Run tests in debug mode")
|
|
else:
|
|
print("Run tests in release mode")
|
|
sys.stdout.flush()
|
|
try:
|
|
if args.forever:
|
|
while True:
|
|
tests = finder.load_tests()
|
|
if args.randomize:
|
|
randomize_tests(tests, args.seed)
|
|
result = runner_factory(verbosity=v,
|
|
failfast=failfast).run(tests)
|
|
if not result.wasSuccessful():
|
|
sys.exit(1)
|
|
else:
|
|
tests = finder.load_tests()
|
|
if args.randomize:
|
|
randomize_tests(tests, args.seed)
|
|
result = runner_factory(verbosity=v,
|
|
failfast=failfast).run(tests)
|
|
sys.exit(not result.wasSuccessful())
|
|
finally:
|
|
if args.coverage:
|
|
cov.stop()
|
|
cov.save()
|
|
cov.html_report(directory='htmlcov')
|
|
print("\nCoverage report:")
|
|
cov.report(show_missing=False)
|
|
here = os.path.dirname(os.path.abspath(__file__))
|
|
print("\nFor html report:")
|
|
print("open file://{0}/htmlcov/index.html".format(here))
|
|
|
|
|
|
if __name__ == '__main__':
|
|
runtests()
|