123 lines
4.0 KiB
Python
123 lines
4.0 KiB
Python
# Copyright 2010-2011 OpenStack Foundation
|
|
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import os
|
|
|
|
import fixtures
|
|
import tempfile
|
|
import testtools
|
|
|
|
from sphinx.testing.path import path
|
|
from sphinx.testing.util import SphinxTestApp
|
|
|
|
|
|
def example_dir(name=""):
|
|
return os.path.join(os.path.dirname(__file__), 'examples', name)
|
|
|
|
|
|
_TRUE_VALUES = ('True', 'true', '1', 'yes')
|
|
|
|
|
|
class with_app:
|
|
def __init__(self, **kwargs):
|
|
if 'srcdir' in kwargs:
|
|
self.srcdir = path(kwargs['srcdir'])
|
|
self.sphinx_app_args = kwargs
|
|
|
|
def __call__(self, f):
|
|
def newf(*args, **kwargs):
|
|
with tempfile.TemporaryDirectory() as tmpdirname:
|
|
tmpdir = path(tmpdirname)
|
|
tmproot = tmpdir / self.srcdir.basename()
|
|
self.srcdir.copytree(tmproot)
|
|
self.sphinx_app_args['srcdir'] = tmproot
|
|
self.builddir = tmproot.joinpath('_build')
|
|
|
|
app = SphinxTestApp(freshenv=True, **self.sphinx_app_args)
|
|
|
|
f(*args, app, app._status, app._warning, **kwargs)
|
|
|
|
app.cleanup()
|
|
return newf
|
|
|
|
|
|
class OutputStreamCapture(fixtures.Fixture):
|
|
"""Capture output streams during tests.
|
|
|
|
This fixture captures errant printing to stderr / stdout during
|
|
the tests and lets us see those streams at the end of the test
|
|
runs instead. Useful to see what was happening during failed
|
|
tests.
|
|
"""
|
|
def setUp(self):
|
|
super(OutputStreamCapture, self).setUp()
|
|
if os.environ.get('OS_STDOUT_CAPTURE') in _TRUE_VALUES:
|
|
self.out = self.useFixture(fixtures.StringStream('stdout'))
|
|
self.useFixture(
|
|
fixtures.MonkeyPatch('sys.stdout', self.out.stream))
|
|
if os.environ.get('OS_STDERR_CAPTURE') in _TRUE_VALUES:
|
|
self.err = self.useFixture(fixtures.StringStream('stderr'))
|
|
self.useFixture(
|
|
fixtures.MonkeyPatch('sys.stderr', self.err.stream))
|
|
|
|
@property
|
|
def stderr(self):
|
|
return self.err._details["stderr"].as_text()
|
|
|
|
@property
|
|
def stdout(self):
|
|
return self.out._details["stdout"].as_text()
|
|
|
|
|
|
class Timeout(fixtures.Fixture):
|
|
"""Setup per test timeouts.
|
|
|
|
In order to avoid test deadlocks we support setting up a test
|
|
timeout parameter read from the environment. In almost all
|
|
cases where the timeout is reached this means a deadlock.
|
|
|
|
A class level TIMEOUT_SCALING_FACTOR also exists, which allows
|
|
extremely long tests to specify they need more time.
|
|
"""
|
|
|
|
def __init__(self, timeout, scaling=1):
|
|
super(Timeout, self).__init__()
|
|
try:
|
|
self.test_timeout = int(timeout)
|
|
except ValueError:
|
|
# If timeout value is invalid do not set a timeout.
|
|
self.test_timeout = 0
|
|
if scaling >= 1:
|
|
self.test_timeout *= scaling
|
|
else:
|
|
raise ValueError('scaling value must be >= 1')
|
|
|
|
def setUp(self):
|
|
super(Timeout, self).setUp()
|
|
if self.test_timeout > 0:
|
|
self.useFixture(fixtures.Timeout(self.test_timeout, gentle=True))
|
|
|
|
|
|
class TestCase(testtools.TestCase):
|
|
|
|
"""Test case base class for all unit tests."""
|
|
|
|
def setUp(self):
|
|
"""Run before each test method to initialize test environment."""
|
|
super(TestCase, self).setUp()
|
|
self.useFixture(Timeout(
|
|
os.environ.get('OS_TEST_TIMEOUT', 0)))
|
|
self.useFixture(OutputStreamCapture())
|