Simplify base test cases

Previous changes (Ifa270536481fcb19c476c9c62d89e6c5cae36ca1 and
I44251db399cd73390a9d1931a7f253662002ba10) separated out test setup
that had to import Neutron to allow the api tests to run.  The api
tests previously imported Tempest, and errors would result if both
Neutron and Tempest were imported in the same test run.  Now that the
api tests do not import Tempest, the base test cases can be simplified
by reversing the referenced changes.

A dependent change to neutron-fwaas removes reference to testlib
plugin: I0f2098cfd380fb6978d643cfd09bcc5cf8ddbdb9

Change-Id: Ifca5615680217818b8c5e8fc2dee5d089fbd9532
This commit is contained in:
Maru Newby 2015-03-23 23:18:44 +00:00 committed by armando-migliaccio
parent 0c2cf8b460
commit a314544def
23 changed files with 237 additions and 167 deletions

View File

@ -52,7 +52,7 @@ import abc
import six
import testtools
from neutron.tests import sub_base
from neutron.tests import base
@six.add_metaclass(abc.ABCMeta)
@ -101,7 +101,7 @@ class BaseNeutronClient(object):
pass
class BaseTestApi(sub_base.SubBaseTestCase):
class BaseTestApi(base.BaseTestCase):
scenarios = ()
@ -113,7 +113,7 @@ class BaseTestApi(sub_base.SubBaseTestCase):
self.client.setUp(self)
def test_network_lifecycle(self):
net = self.client.create_network(name=sub_base.get_rand_name())
net = self.client.create_network(name=base.get_rand_name())
listed_networks = dict((x.id, x.name)
for x in self.client.get_networks())
self.assertIn(net.id, listed_networks)

View File

@ -22,8 +22,8 @@ from tempest_lib import exceptions
import testscenarios
from neutron.tests.api import base_v2
from neutron.tests import sub_base
from neutron.tests.tempest import test as t_test
from neutron.tests import base
from neutron.tests.tempest.api.network import base as t_base
# Required to generate tests from scenarios. Not compatible with nose.
load_tests = testscenarios.load_tests_apply_scenarios
@ -34,7 +34,7 @@ class TempestRestClient(base_v2.BaseNeutronClient):
@property
def client(self):
if not hasattr(self, '_client'):
manager = t_test.BaseTestCase.get_client_manager()
manager = t_base.BaseNetworkTest.get_client_manager()
self._client = manager.network_client
return self._client
@ -56,19 +56,19 @@ class TempestRestClient(base_v2.BaseNeutronClient):
def _create_network(self, **kwargs):
# Internal method - use create_network() instead
body = self.client.create_network(**kwargs)
return sub_base.AttributeDict(body['network'])
return base.AttributeDict(body['network'])
def update_network(self, id_, **kwargs):
body = self.client.update_network(id_, **kwargs)
return sub_base.AttributeDict(body['network'])
return base.AttributeDict(body['network'])
def get_network(self, id_, **kwargs):
body = self.client.show_network(id_, **kwargs)
return sub_base.AttributeDict(body['network'])
return base.AttributeDict(body['network'])
def get_networks(self, **kwargs):
body = self.client.list_networks(**kwargs)
return [sub_base.AttributeDict(x) for x in body['networks']]
return [base.AttributeDict(x) for x in body['networks']]
def delete_network(self, id_):
self.client.delete_network(id_)
@ -76,3 +76,7 @@ class TempestRestClient(base_v2.BaseNeutronClient):
class TestApiWithRestClient(base_v2.BaseTestApi):
scenarios = [('tempest', {'client': TempestRestClient()})]
def setUp(self):
raise self.skipException(
'Tempest fixture requirements prevent this test from running')

View File

@ -13,36 +13,40 @@
# License for the specific language governing permissions and limitations
# under the License.
"""Base test case for tests that do not rely on Tempest.
To change behavoir that is common to all tests, please target
the neutron.tests.sub_base module instead.
If a test needs to import a dependency like Tempest, see
neutron.tests.sub_base for a base test class that can be used without
errors due to duplicate configuration definitions.
"""Base test cases for all neutron tests.
"""
import contextlib
import gc
import logging as std_logging
import os
import os.path
import random
import traceback
import weakref
import eventlet.timeout
import fixtures
import mock
from oslo_concurrency.fixture import lockutils
from oslo_config import cfg
from oslo_messaging import conffixture as messaging_conffixture
from oslo_utils import strutils
import testtools
from neutron.agent.linux import external_process
from neutron.common import config
from neutron.common import rpc as n_rpc
from neutron.db import agentschedulers_db
from neutron import manager
from neutron import policy
from neutron.tests import fake_notifier
from neutron.tests import sub_base
from neutron.tests import post_mortem_debug
CONF = cfg.CONF
CONF.import_opt('state_path', 'neutron.common.config')
LOG_FORMAT = sub_base.LOG_FORMAT
LOG_FORMAT = "%(asctime)s %(levelname)8s [%(name)s] %(message)s"
ROOT_DIR = os.path.join(os.path.dirname(__file__), '..', '..')
TEST_ROOT_DIR = os.path.dirname(__file__)
@ -60,7 +64,125 @@ def fake_consume_in_threads(self):
return []
bool_from_env = sub_base.bool_from_env
def get_rand_name(max_length=None, prefix='test'):
name = prefix + str(random.randint(1, 0x7fffffff))
return name[:max_length] if max_length is not None else name
def bool_from_env(key, strict=False, default=False):
value = os.environ.get(key)
return strutils.bool_from_string(value, strict=strict, default=default)
class AttributeDict(dict):
"""
Provide attribute access (dict.key) to dictionary values.
"""
def __getattr__(self, name):
"""Allow attribute access for all keys in the dict."""
if name in self:
return self[name]
raise AttributeError(_("Unknown attribute '%s'.") % name)
class DietTestCase(testtools.TestCase):
"""Same great taste, less filling.
BaseTestCase is responsible for doing lots of plugin-centric setup
that not all tests require (or can tolerate). This class provides
only functionality that is common across all tests.
"""
def setUp(self):
super(DietTestCase, self).setUp()
# Configure this first to ensure pm debugging support for setUp()
debugger = os.environ.get('OS_POST_MORTEM_DEBUGGER')
if debugger:
self.addOnException(post_mortem_debug.get_exception_handler(
debugger))
if bool_from_env('OS_DEBUG'):
_level = std_logging.DEBUG
else:
_level = std_logging.INFO
capture_logs = bool_from_env('OS_LOG_CAPTURE')
if not capture_logs:
std_logging.basicConfig(format=LOG_FORMAT, level=_level)
self.log_fixture = self.useFixture(
fixtures.FakeLogger(
format=LOG_FORMAT,
level=_level,
nuke_handlers=capture_logs,
))
test_timeout = int(os.environ.get('OS_TEST_TIMEOUT', 0))
if test_timeout == -1:
test_timeout = 0
if test_timeout > 0:
self.useFixture(fixtures.Timeout(test_timeout, gentle=True))
# If someone does use tempfile directly, ensure that it's cleaned up
self.useFixture(fixtures.NestedTempfile())
self.useFixture(fixtures.TempHomeDir())
self.addCleanup(mock.patch.stopall)
if bool_from_env('OS_STDOUT_CAPTURE'):
stdout = self.useFixture(fixtures.StringStream('stdout')).stream
self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
if bool_from_env('OS_STDERR_CAPTURE'):
stderr = self.useFixture(fixtures.StringStream('stderr')).stream
self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
self.addOnException(self.check_for_systemexit)
def check_for_systemexit(self, exc_info):
if isinstance(exc_info[1], SystemExit):
self.fail("A SystemExit was raised during the test. %s"
% traceback.format_exception(*exc_info))
@contextlib.contextmanager
def assert_max_execution_time(self, max_execution_time=5):
with eventlet.timeout.Timeout(max_execution_time, False):
yield
return
self.fail('Execution of this test timed out')
def assertOrderedEqual(self, expected, actual):
expect_val = self.sort_dict_lists(expected)
actual_val = self.sort_dict_lists(actual)
self.assertEqual(expect_val, actual_val)
def sort_dict_lists(self, dic):
for key, value in dic.iteritems():
if isinstance(value, list):
dic[key] = sorted(value)
elif isinstance(value, dict):
dic[key] = self.sort_dict_lists(value)
return dic
def assertDictSupersetOf(self, expected_subset, actual_superset):
"""Checks that actual dict contains the expected dict.
After checking that the arguments are of the right type, this checks
that each item in expected_subset is in, and matches, what is in
actual_superset. Separate tests are done, so that detailed info can
be reported upon failure.
"""
if not isinstance(expected_subset, dict):
self.fail("expected_subset (%s) is not an instance of dict" %
type(expected_subset))
if not isinstance(actual_superset, dict):
self.fail("actual_superset (%s) is not an instance of dict" %
type(actual_superset))
for k, v in expected_subset.items():
self.assertIn(k, actual_superset)
self.assertEqual(v, actual_superset[k],
"Key %(key)s expected: %(exp)r, actual %(act)r" %
{'key': k, 'exp': v, 'act': actual_superset[k]})
class ProcessMonitorFixture(fixtures.Fixture):
@ -85,7 +207,7 @@ class ProcessMonitorFixture(fixtures.Fixture):
self.instances.append(instance)
class BaseTestCase(sub_base.SubBaseTestCase):
class BaseTestCase(DietTestCase):
@staticmethod
def config_parse(conf=None, args=None):
@ -209,3 +331,57 @@ class BaseTestCase(sub_base.SubBaseTestCase):
group = kw.pop('group', None)
for k, v in kw.iteritems():
CONF.set_override(k, v, group)
def setup_coreplugin(self, core_plugin=None):
self.useFixture(PluginFixture(core_plugin))
def setup_notification_driver(self, notification_driver=None):
self.addCleanup(fake_notifier.reset)
if notification_driver is None:
notification_driver = [fake_notifier.__name__]
cfg.CONF.set_override("notification_driver", notification_driver)
class PluginFixture(fixtures.Fixture):
def __init__(self, core_plugin=None):
self.core_plugin = core_plugin
def setUp(self):
super(PluginFixture, self).setUp()
self.dhcp_periodic_p = mock.patch(
'neutron.db.agentschedulers_db.DhcpAgentSchedulerDbMixin.'
'start_periodic_dhcp_agent_status_check')
self.patched_dhcp_periodic = self.dhcp_periodic_p.start()
# Plugin cleanup should be triggered last so that
# test-specific cleanup has a chance to release references.
self.addCleanup(self.cleanup_core_plugin)
if self.core_plugin is not None:
cfg.CONF.set_override('core_plugin', self.core_plugin)
def cleanup_core_plugin(self):
"""Ensure that the core plugin is deallocated."""
nm = manager.NeutronManager
if not nm.has_instance():
return
# TODO(marun) Fix plugins that do not properly initialize notifiers
agentschedulers_db.AgentSchedulerDbMixin.agent_notifiers = {}
# Perform a check for deallocation only if explicitly
# configured to do so since calling gc.collect() after every
# test increases test suite execution time by ~50%.
check_plugin_deallocation = (
bool_from_env('OS_CHECK_PLUGIN_DEALLOCATION'))
if check_plugin_deallocation:
plugin = weakref.ref(nm._instance.plugin)
nm.clear_instance()
if check_plugin_deallocation:
gc.collect()
# TODO(marun) Ensure that mocks are deallocated?
if plugin() and not isinstance(plugin(), mock.Base):
raise AssertionError(
'The plugin for this test was not deallocated.')

View File

@ -12,7 +12,7 @@
#
from neutron.common import constants as n_const
from neutron.tests import sub_base
from neutron.tests import base
def create_resource(prefix, creation_func, *args, **kwargs):
@ -25,7 +25,7 @@ def create_resource(prefix, creation_func, *args, **kwargs):
:param *args *kwargs: These will be passed to the create function.
"""
while True:
name = sub_base.get_rand_name(
name = base.get_rand_name(
max_length=n_const.DEVICE_NAME_MAX_LEN,
prefix=prefix)
try:

View File

@ -19,10 +19,10 @@ from neutron.agent.common import ovs_lib
from neutron.agent.linux import bridge_lib
from neutron.agent.linux import ip_lib
from neutron.common import constants as n_const
from neutron.tests.common import base
from neutron.tests import base as tests_base
from neutron.tests.common import base as common_base
from neutron.tests.common import net_helpers
from neutron.tests.functional import base as functional_base
from neutron.tests import sub_base
BR_PREFIX = 'test-br'
@ -36,7 +36,7 @@ ICMP_BLOCK_RULE = '-p icmp -j DROP'
#TODO(jschwarz): Move these two functions to neutron/tests/common/
get_rand_name = sub_base.get_rand_name
get_rand_name = tests_base.get_rand_name
def get_rand_bridge_name():
@ -69,7 +69,7 @@ class BaseOVSLinuxTestCase(testscenarios.WithScenarios, BaseLinuxTestCase):
self.ip = ip_lib.IPWrapper()
def create_ovs_bridge(self, br_prefix=BR_PREFIX):
br = base.create_resource(br_prefix, self.ovs.add_bridge)
br = common_base.create_resource(br_prefix, self.ovs.add_bridge)
self.addCleanup(br.destroy)
return br
@ -78,7 +78,7 @@ class BaseOVSLinuxTestCase(testscenarios.WithScenarios, BaseLinuxTestCase):
br.replace_port(name, ('type', 'internal'))
self.addCleanup(br.delete_port, name)
return name
port_name = base.create_resource(PORT_PREFIX, create_port)
port_name = common_base.create_resource(PORT_PREFIX, create_port)
port_dev = self.ip.device(port_name)
ns.add_device_to_namespace(port_dev)
port_dev.link.set_up()

View File

@ -25,10 +25,9 @@ from neutron.common import exceptions as q_exc
from neutron import context
from neutron import manager
from neutron.tests.api import base_v2
from neutron.tests import sub_base
from neutron.tests import base
from neutron.tests.unit.ml2 import test_ml2_plugin
from neutron.tests.unit import testlib_api
from neutron.tests.unit import testlib_plugin
# Each plugin must add a class to plugin_configurations that can configure the
@ -70,20 +69,20 @@ class PluginClient(base_v2.BaseNeutronClient):
kwargs.setdefault('shared', False)
data = dict(network=kwargs)
result = self.plugin.create_network(self.ctx, data)
return sub_base.AttributeDict(result)
return base.AttributeDict(result)
def update_network(self, id_, **kwargs):
data = dict(network=kwargs)
result = self.plugin.update_network(self.ctx, id_, data)
return sub_base.AttributeDict(result)
return base.AttributeDict(result)
def get_network(self, *args, **kwargs):
result = self.plugin.get_network(self.ctx, *args, **kwargs)
return sub_base.AttributeDict(result)
return base.AttributeDict(result)
def get_networks(self, *args, **kwargs):
result = self.plugin.get_networks(self.ctx, *args, **kwargs)
return [sub_base.AttributeDict(x) for x in result]
return [base.AttributeDict(x) for x in result]
def delete_network(self, id_):
self.plugin.delete_network(self.ctx, id_)
@ -100,8 +99,7 @@ def get_scenarios():
class TestPluginApi(base_v2.BaseTestApi,
testlib_api.SqlTestCase,
testlib_plugin.PluginSetupHelper):
testlib_api.SqlTestCase):
scenarios = get_scenarios()
@ -112,5 +110,5 @@ class TestPluginApi(base_v2.BaseTestApi,
# setUp, since that setup will be called by SqlTestCase.setUp.
super(TestPluginApi, self).setUp(setup_parent=False)
testlib_api.SqlTestCase.setUp(self)
self.setup_coreplugin(self.plugin_conf.plugin_name)
self.useFixture(base.PluginFixture(self.plugin_conf.plugin_name))
self.plugin_conf.setUp(self)

View File

@ -28,7 +28,6 @@ from neutron import manager
from neutron.openstack.common import uuidutils
from neutron.scheduler import l3_agent_scheduler
from neutron.tests.unit import testlib_api
from neutron.tests.unit import testlib_plugin
_uuid = uuidutils.generate_uuid
@ -40,8 +39,7 @@ class FakeL3PluginWithAgents(common_db_mixin.CommonDbMixin,
pass
class L3HATestFramework(testlib_api.SqlTestCase,
testlib_plugin.PluginSetupHelper):
class L3HATestFramework(testlib_api.SqlTestCase):
def setUp(self):
super(L3HATestFramework, self).setUp()

View File

@ -19,7 +19,6 @@ from neutron.openstack.common import uuidutils
from neutron.services.metering.agents import metering_agent
from neutron.tests import base
from neutron.tests import fake_notifier
from neutron.tests.unit import testlib_plugin
_uuid = uuidutils.generate_uuid
@ -36,8 +35,7 @@ ROUTERS = [{'status': 'ACTIVE',
'id': _uuid()}]
class TestMeteringOperations(base.BaseTestCase,
testlib_plugin.NotificationSetupHelper):
class TestMeteringOperations(base.BaseTestCase):
def setUp(self):
super(TestMeteringOperations, self).setUp()

View File

@ -38,7 +38,6 @@ from neutron import quota
from neutron.tests import base
from neutron.tests import fake_notifier
from neutron.tests.unit import testlib_api
from neutron.tests.unit import testlib_plugin
ROOTDIR = os.path.dirname(os.path.dirname(__file__))
@ -87,7 +86,7 @@ class ResourceIndexTestCase(base.BaseTestCase):
self.assertEqual(link['rel'], 'self')
class APIv2TestBase(base.BaseTestCase, testlib_plugin.PluginSetupHelper):
class APIv2TestBase(base.BaseTestCase):
def setUp(self):
super(APIv2TestBase, self).setUp()
@ -1093,7 +1092,7 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
self.assertEqual(res.status_int, 400)
class SubresourceTest(base.BaseTestCase, testlib_plugin.PluginSetupHelper):
class SubresourceTest(base.BaseTestCase):
def setUp(self):
super(SubresourceTest, self).setUp()
@ -1368,7 +1367,7 @@ class QuotaTest(APIv2TestBase):
self.assertEqual(res.status_int, exc.HTTPCreated.code)
class ExtensionTestCase(base.BaseTestCase, testlib_plugin.PluginSetupHelper):
class ExtensionTestCase(base.BaseTestCase):
def setUp(self):
super(ExtensionTestCase, self).setUp()
plugin = 'neutron.neutron_plugin_base_v2.NeutronPluginBaseV2'

View File

@ -29,11 +29,9 @@ from neutron import quota
from neutron.tests.unit import test_api_v2
from neutron.tests.unit import test_extensions
from neutron.tests.unit import testlib_api
from neutron.tests.unit import testlib_plugin
class ExtensionTestCase(testlib_api.WebTestCase,
testlib_plugin.PluginSetupHelper):
class ExtensionTestCase(testlib_api.WebTestCase):
def _resotre_attr_map(self):
attributes.RESOURCE_ATTRIBUTE_MAP = self._saved_attr_map

View File

@ -41,7 +41,6 @@ from neutron import manager
from neutron.tests import base
from neutron.tests.unit import test_extensions
from neutron.tests.unit import testlib_api
from neutron.tests.unit import testlib_plugin
DB_PLUGIN_KLASS = 'neutron.db.db_base_plugin_v2.NeutronDbPluginV2'
@ -78,8 +77,7 @@ def _get_create_db_method(resource):
return 'create_%s' % resource
class NeutronDbPluginV2TestCase(testlib_api.WebTestCase,
testlib_plugin.PluginSetupHelper):
class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
fmt = 'json'
resource_prefix_map = {}

View File

@ -20,11 +20,9 @@ from neutron import context
from neutron import manager
from neutron.tests.unit import test_db_plugin
from neutron.tests.unit import testlib_api
from neutron.tests.unit import testlib_plugin
class TestNetworks(testlib_api.SqlTestCase,
testlib_plugin.PluginSetupHelper):
class TestNetworks(testlib_api.SqlTestCase):
def setUp(self):
super(TestNetworks, self).setUp()
self._tenant_id = 'test-tenant'

View File

@ -31,7 +31,6 @@ from neutron.db import models_v2
from neutron.extensions import dhcpagentscheduler
from neutron.scheduler import dhcp_agent_scheduler
from neutron.tests.unit import testlib_api
from neutron.tests.unit import testlib_plugin
# Required to generate tests from scenarios. Not compatible with nose.
load_tests = testscenarios.load_tests_apply_scenarios
@ -271,8 +270,7 @@ class TestNetworksFailover(TestDhcpSchedulerBaseTestCase,
self.assertIn('foo4', res_ids)
class DHCPAgentWeightSchedulerTestCase(TestDhcpSchedulerBaseTestCase,
testlib_plugin.PluginSetupHelper):
class DHCPAgentWeightSchedulerTestCase(TestDhcpSchedulerBaseTestCase):
"""Unit test scenarios for WeightScheduler.schedule."""
hostc = {

View File

@ -30,7 +30,6 @@ from neutron.openstack.common import uuidutils
from neutron.tests.unit import test_db_plugin
from neutron.tests.unit import test_l3_plugin
from neutron.tests.unit import testlib_api
from neutron.tests.unit import testlib_plugin
_uuid = uuidutils.generate_uuid
FAKE_GW_PORT_ID = _uuid()
@ -75,8 +74,7 @@ class TestDbSepPlugin(test_l3_plugin.TestL3NatServicePlugin,
supported_extension_aliases = ["router", "ext-gw-mode"]
class TestL3GwModeMixin(testlib_api.SqlTestCase,
testlib_plugin.PluginSetupHelper):
class TestL3GwModeMixin(testlib_api.SqlTestCase):
def setUp(self):
super(TestL3GwModeMixin, self).setUp()

View File

@ -32,7 +32,6 @@ from neutron.tests import base
from neutron.tests.unit.extensions import extendedattribute as extattr
from neutron.tests.unit import test_api_v2
from neutron.tests.unit import testlib_api
from neutron.tests.unit import testlib_plugin
from neutron import wsgi
_uuid = test_api_v2._uuid
@ -67,8 +66,7 @@ class ExtensionExtendedAttributeTestPlugin(
return self.objh[id]
class ExtensionExtendedAttributeTestCase(base.BaseTestCase,
testlib_plugin.PluginSetupHelper):
class ExtensionExtendedAttributeTestCase(base.BaseTestCase):
def setUp(self):
super(ExtensionExtendedAttributeTestCase, self).setUp()
plugin = (

View File

@ -29,7 +29,6 @@ from neutron import quota
from neutron.tests.unit import test_api_v2
from neutron.tests.unit import test_extensions
from neutron.tests.unit import testlib_api
from neutron.tests.unit import testlib_plugin
class ProviderExtensionManager(object):
@ -47,8 +46,7 @@ class ProviderExtensionManager(object):
return pnet.get_extended_resources(version)
class ProvidernetExtensionTestCase(testlib_api.WebTestCase,
testlib_plugin.PluginSetupHelper):
class ProvidernetExtensionTestCase(testlib_api.WebTestCase):
fmt = 'json'
def setUp(self):

View File

@ -51,7 +51,6 @@ from neutron.tests.unit import test_agent_ext_plugin
from neutron.tests.unit import test_api_v2
from neutron.tests.unit import test_api_v2_extension
from neutron.tests.unit import test_db_plugin
from neutron.tests.unit import testlib_plugin
LOG = logging.getLogger(__name__)
@ -2356,8 +2355,7 @@ class L3AgentDbTestCaseBase(L3NatTestCaseMixin):
self._test_notify_op_agent(self._test_floatingips_op_agent)
class L3BaseForIntTests(test_db_plugin.NeutronDbPluginV2TestCase,
testlib_plugin.NotificationSetupHelper):
class L3BaseForIntTests(test_db_plugin.NeutronDbPluginV2TestCase):
mock_rescheduling = True
@ -2378,8 +2376,7 @@ class L3BaseForIntTests(test_db_plugin.NeutronDbPluginV2TestCase,
self.setup_notification_driver()
class L3BaseForSepTests(test_db_plugin.NeutronDbPluginV2TestCase,
testlib_plugin.NotificationSetupHelper):
class L3BaseForSepTests(test_db_plugin.NeutronDbPluginV2TestCase):
def setUp(self, plugin=None, ext_mgr=None):
# the plugin without L3 support

View File

@ -44,7 +44,6 @@ from neutron.tests import base
from neutron.tests.unit import test_db_plugin
from neutron.tests.unit import test_l3_plugin
from neutron.tests.unit import testlib_api
from neutron.tests.unit import testlib_plugin
# the below code is required for the following reason
# (as documented in testscenarios)
@ -855,8 +854,7 @@ class L3DvrScheduler(l3_db.L3_NAT_db_mixin,
pass
class L3DvrSchedulerTestCase(testlib_api.SqlTestCase,
testlib_plugin.PluginSetupHelper):
class L3DvrSchedulerTestCase(testlib_api.SqlTestCase):
def setUp(self):
plugin = 'neutron.plugins.ml2.plugin.Ml2Plugin'
@ -1168,8 +1166,7 @@ class L3HAPlugin(db_v2.NeutronDbPluginV2,
class L3HATestCaseMixin(testlib_api.SqlTestCase,
L3SchedulerBaseMixin,
testlib_plugin.PluginSetupHelper):
L3SchedulerBaseMixin):
def setUp(self):
super(L3HATestCaseMixin, self).setUp()
@ -1435,7 +1432,6 @@ class L3HALeastRoutersSchedulerTestCase(L3HATestCaseMixin):
class TestGetL3AgentsWithAgentModeFilter(testlib_api.SqlTestCase,
testlib_plugin.PluginSetupHelper,
L3SchedulerBaseMixin):
"""Test cases to test get_l3_agents.

View File

@ -23,7 +23,6 @@ from neutron import manager
from neutron.plugins.common import constants
from neutron.tests import base
from neutron.tests.unit import dummy_plugin
from neutron.tests.unit import testlib_plugin
LOG = logging.getLogger(__name__)
@ -39,8 +38,7 @@ class CorePluginWithAgentNotifiers(object):
'dhcp': 'dhcp_agent_notifier'}
class NeutronManagerTestCase(base.BaseTestCase,
testlib_plugin.PluginSetupHelper):
class NeutronManagerTestCase(base.BaseTestCase):
def setUp(self):
super(NeutronManagerTestCase, self).setUp()

View File

@ -32,15 +32,13 @@ from neutron import quota
from neutron.tests import base
from neutron.tests.unit import test_api_v2
from neutron.tests.unit import testlib_api
from neutron.tests.unit import testlib_plugin
TARGET_PLUGIN = 'neutron.plugins.ml2.plugin.Ml2Plugin'
_get_path = test_api_v2._get_path
class QuotaExtensionTestCase(testlib_api.WebTestCase,
testlib_plugin.PluginSetupHelper):
class QuotaExtensionTestCase(testlib_api.WebTestCase):
def setUp(self):
super(QuotaExtensionTestCase, self).setUp()

View File

@ -18,14 +18,14 @@ from oslo_config import cfg
from oslo_messaging import conffixture as messaging_conffixture
from neutron.common import rpc
from neutron.tests import sub_base
from neutron.tests import base
CONF = cfg.CONF
CONF.import_opt('state_path', 'neutron.common.config')
class ServiceTestCase(sub_base.SubBaseTestCase):
class ServiceTestCase(base.DietTestCase):
# the class cannot be based on BaseTestCase since it mocks rpc.Connection
def setUp(self):

View File

@ -30,7 +30,6 @@ from neutron.tests.unit import test_api_v2
from neutron.tests.unit import test_db_plugin
from neutron.tests.unit import test_extensions
from neutron.tests.unit import testlib_api
from neutron.tests.unit import testlib_plugin
DEFAULT_SERVICE_DEFS = [{'service_class': constants.DUMMY,
@ -160,8 +159,7 @@ class TestServiceTypeExtensionManager(object):
return []
class ServiceTypeExtensionTestCaseBase(testlib_api.WebTestCase,
testlib_plugin.PluginSetupHelper):
class ServiceTypeExtensionTestCaseBase(testlib_api.WebTestCase):
fmt = 'json'
def setUp(self):

View File

@ -1,76 +0,0 @@
# Copyright 2014 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import gc
import weakref
import mock
from oslo_config import cfg
from neutron.db import agentschedulers_db
from neutron import manager
from neutron.tests import base
from neutron.tests import fake_notifier
class PluginSetupHelper(object):
"""Mixin for use with testtools.TestCase."""
def cleanup_core_plugin(self):
"""Ensure that the core plugin is deallocated."""
nm = manager.NeutronManager
if not nm.has_instance():
return
# TODO(marun) Fix plugins that do not properly initialize notifiers
agentschedulers_db.AgentSchedulerDbMixin.agent_notifiers = {}
# Perform a check for deallocation only if explicitly
# configured to do so since calling gc.collect() after every
# test increases test suite execution time by ~50%.
check_plugin_deallocation = (
base.bool_from_env('OS_CHECK_PLUGIN_DEALLOCATION'))
if check_plugin_deallocation:
plugin = weakref.ref(nm._instance.plugin)
nm.clear_instance()
if check_plugin_deallocation:
gc.collect()
# TODO(marun) Ensure that mocks are deallocated?
if plugin() and not isinstance(plugin(), mock.Base):
self.fail('The plugin for this test was not deallocated.')
def setup_coreplugin(self, core_plugin=None):
self.dhcp_periodic_p = mock.patch(
'neutron.db.agentschedulers_db.DhcpAgentSchedulerDbMixin.'
'start_periodic_dhcp_agent_status_check')
self.patched_dhcp_periodic = self.dhcp_periodic_p.start()
# Plugin cleanup should be triggered last so that
# test-specific cleanup has a chance to release references.
self.addCleanup(self.cleanup_core_plugin)
if core_plugin is not None:
cfg.CONF.set_override('core_plugin', core_plugin)
class NotificationSetupHelper(object):
"""Mixin for use with testtools.TestCase."""
def setup_notification_driver(self, notification_driver=None):
self.addCleanup(fake_notifier.reset)
if notification_driver is None:
notification_driver = [fake_notifier.__name__]
cfg.CONF.set_override("notification_driver", notification_driver)