Fix unit tests

Unit tests were fixed in order to support new system behaviour.
Unit tests cannot be executed alongside with integration. This issue
will be resolved in future patches.

Change-Id: Ie247327cd66bbec6e5eacb9c5f8ac280f8e3576e
This commit is contained in:
Artem Roma 2013-11-07 16:24:45 +02:00
parent ebffe0c075
commit 4378d36d0e
4 changed files with 177 additions and 332 deletions

View File

@ -57,16 +57,17 @@ def main():
return nailgun_hooks.after_initialization_environment_hook()
#performing cleaning of expired data (if any) in db
clean_db()
clean_db(engine.get_engine())
#discover testsets and their tests
CORE_PATH = pecan.conf.debug_tests if \
pecan.conf.get('debug_tests') else 'fuel_health'
discovery(path=CORE_PATH, session=engine.get_session())
session = engine.get_session()
discovery(path=CORE_PATH, session=session)
#cache needed data from test repository
cache_data()
cache_data(session)
host, port = pecan.conf.server.host, pecan.conf.server.port
srv = pywsgi.WSGIServer((host, int(port)), root)

View File

@ -18,7 +18,12 @@ from mock import patch, MagicMock
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from fuel_plugin.ostf_adapter.storage import engine
from fuel_plugin.utils.utils import cache_data
from fuel_plugin.ostf_adapter.nose_plugin.nose_discovery import discovery
from fuel_plugin.ostf_adapter.storage import models
from fuel_plugin.ostf_adapter.storage import simple_cache
TEST_PATH = 'fuel_plugin/tests/functional/dummy_tests'
class BaseWSGITest(unittest2.TestCase):
@ -30,6 +35,38 @@ class BaseWSGITest(unittest2.TestCase):
'postgresql+psycopg2://ostf:ostf@localhost/ostf'
)
cls.Session.configure(bind=cls.engine, autocommit=True)
session = cls.Session()
discovery(path=TEST_PATH, session=session)
cls.ext_id = 'fuel_plugin.tests.functional.dummy_tests.'
cls.expected = {
'cluster': {
'id': 1,
'deployment_tags': set(['ha', 'rhel', 'nova_network'])
},
'test_sets': ['general_test',
'stopped_test', 'ha_deployment_test'],
'tests': [cls.ext_id + test for test in [
('deployment_types_tests.ha_deployment_test.'
'HATest.test_ha_depl'),
('deployment_types_tests.ha_deployment_test.'
'HATest.test_ha_rhel_depl'),
'general_test.Dummy_test.test_fast_pass',
'general_test.Dummy_test.test_long_pass',
'general_test.Dummy_test.test_fast_fail',
'general_test.Dummy_test.test_fast_error',
'general_test.Dummy_test.test_fail_with_step',
'stopped_test.dummy_tests_stopped.test_really_long',
'stopped_test.dummy_tests_stopped.test_one_no_so_long',
'stopped_test.dummy_tests_stopped.test_not_long_at_all'
]]
}
@classmethod
def tearDownClass(cls):
cls.engine.execute('delete from test_sets')
def setUp(self):
#orm session wrapping
self.connection = self.engine.connect()
@ -39,6 +76,7 @@ class BaseWSGITest(unittest2.TestCase):
bind=self.connection
)
self.session = self.Session(autocommit=True)
cache_data(self.session)
#mocking
#request mocking
@ -74,3 +112,30 @@ class BaseWSGITest(unittest2.TestCase):
#end of test_case patching
self.request_patcher.stop()
self.pecan_conf_patcher.stop()
simple_cache.TEST_REPOSITORY = []
@property
def is_background_working(self):
is_working = True
with self.session.begin(subtransactions=True):
cluster_state = self.session.query(models.ClusterState)\
.filter_by(id=self.expected['cluster']['id'])\
.one()
is_working = is_working and set(cluster_state.deployment_tags) == \
self.expected['cluster']['deployment_tags']
cluster_testing_patterns = self.session\
.query(models.ClusterTestingPattern)\
.filter_by(cluster_id=self.expected['cluster']['id'])\
.all()
for testing_pattern in cluster_testing_patterns:
is_working = is_working and \
(testing_pattern.test_set_id in self.expected['test_sets'])
is_working = is_working and set(testing_pattern.tests)\
.issubset(set(self.expected['tests']))
return is_working

View File

@ -13,7 +13,7 @@
# under the License.
import json
from mock import patch
from mock import patch, Mock
import unittest2
@ -24,169 +24,85 @@ from fuel_plugin.tests.unit.base \
import BaseWSGITest
@unittest2.skip('Refactoring is needed')
class TestTestsController(BaseWSGITest):
@classmethod
def setUpClass(cls):
super(TestTestsController, cls).setUpClass()
def setUp(self):
super(TestTestsController, self).setUp()
self.controller = controllers.TestsController()
def test_get(self):
expected = {
'cluster': {
'id': 1,
'deployment_tags': set(['ha', 'rhel', 'nova_network'])
},
'tests': [
{
'testset': 'ha_deployment_test',
'name': 'fake empty test',
'id': ('fuel_plugin.tests.functional.dummy_tests.'
'deployment_types_tests.ha_deployment_test.'
'HATest.test_ha_depl'),
'description': (' This is empty test for any\n'
' ha deployment\n '),
},
{
'testset': 'ha_deployment_test',
'name': 'fake empty test',
'id': ('fuel_plugin.tests.functional.dummy_tests.'
'deployment_types_tests.ha_deployment_test.'
'HATest.test_ha_rhel_depl'),
'description': (' This is fake tests for ha\n'
' rhel deployment\n ')
}
]
}
res = self.controller.get(self.expected['cluster']['id'])
res = self.controller.get(expected['cluster']['id'])
self.assertTrue(self.is_background_working)
with self.session.begin(subtransactions=True):
cluster_state = self.session.query(models.ClusterState)\
.filter_by(id=expected['cluster']['id'])\
.one()
self.assertEqual(set(cluster_state.deployment_tags),
expected['cluster']['deployment_tags'])
cluster_testing_pattern = self.session\
.query(models.ClusterTestingPattern)\
.filter_by(cluster_id=expected['cluster']['id'])\
.filter_by(test_set_id=expected['tests'][0]['testset'])\
.one()
self.assertEqual(
set([test['name'] for test in expected['tests']]),
set(cluster_testing_pattern.tests)
)
self.assertEqual(res, expected['frontend'])
self.assertTrue(len(res) == len(self.expected['tests']))
self.assertTrue(
sorted([test['id'] for test in res]),
sorted(self.expected['tests'])
)
@unittest2.skip('Refactoring is needed')
class TestTestSetsController(BaseWSGITest):
@classmethod
def setUpClass(cls):
super(TestTestSetsController, cls).setUpClass()
def setUp(self):
super(TestTestSetsController, self).setUp()
self.controller = controllers.TestsetsController()
def tearDown(self):
super(TestTestSetsController, self).tearDown()
def test_get(self):
expected = {
'cluster_id': 1,
'frontend': [
{
'id': 'ha_deployment_test',
'name': 'Fake tests for HA deployment'
}
]
}
self.expected['test_set_description'] = [
'General fake tests',
'Long running 25 secs fake tests',
'Fake tests for HA deployment'
]
res = self.controller.get(self.expected['cluster']['id'])
#patch CORE_PATH from nose_discovery in order
#to process only testing data
self.assertTrue(self.is_background_working)
#haven't found more beautiful way to mock
#discovery function in wsgi_utils
def discovery_mock(**kwargs):
kwargs['path'] = TEST_PATH
return discovery(**kwargs)
with patch(
('fuel_plugin.ostf_adapter.wsgi.wsgi_utils.'
'nose_discovery.discovery'),
discovery_mock
):
with patch(
'fuel_plugin.ostf_adapter.wsgi.wsgi_utils.conf',
self.pecan_conf_mock
):
res = self.controller.get(expected['cluster_id'])
self.assertEqual(res, expected['frontend'])
self.assertTrue(
sorted([testset['id'] for testset in res]) ==
sorted(self.expected['test_sets'])
)
self.assertTrue(
sorted([testset['name'] for testset in res]) ==
sorted(self.expected['test_set_description'])
)
@unittest2.skip('Refactoring is needed')
class TestTestRunsController(BaseWSGITest):
@classmethod
def setUpClass(cls):
super(TestTestRunsController, cls).setUpClass()
def setUp(self):
super(TestTestRunsController, self).setUp()
#test_runs depends on tests and test_sets data
#in database so we must execute discovery function
#in setUp in order to provide this data
depl_info = {
'cluster_id': 1,
'deployment_tags': set([
'ha',
'rhel'
])
}
controllers.TestsetsController().get(self.expected['cluster']['id'])
discovery(
session=self.session,
deployment_info=depl_info,
path=TEST_PATH
)
self.testruns = [
self.request_mock.body = json.dumps([
{
'testset': 'ha_deployment_test',
'metadata': {'cluster_id': 1}
}
]
}]
)
self.controller = controllers.TestrunsController()
self.plugin_mock = Mock()
self.plugin_mock.kill.return_value = True
self.nose_plugin_patcher = patch(
'fuel_plugin.ostf_adapter.storage.models.nose_plugin.get_plugin',
lambda *args: self.plugin_mock
)
self.nose_plugin_patcher.start()
def tearDown(self):
super(TestTestRunsController, self).tearDown()
self.nose_plugin_patcher.stop()
class TestTestRunsPostController(TestTestRunsController):
@classmethod
def setUpClass(cls):
super(TestTestRunsController, cls).setUpClass()
def setUp(self):
super(TestTestRunsPostController, self).setUp()
def tearDown(self):
super(TestTestRunsPostController, self).tearDown()
def test_post(self):
expected = {
self.expected['testrun_post'] = {
'testset': 'ha_deployment_test',
'status': 'running',
'cluster_id': 1,
@ -202,31 +118,23 @@ class TestTestRunsPostController(TestTestRunsController):
}
}
with patch(
'fuel_plugin.ostf_adapter.wsgi.controllers.request.body',
json.dumps(self.testruns)
):
res = self.controller.post()[0]
res = self.controller.post()[0]
#checking wheter controller is working properly
#by testing its blackbox behaviour
for key in expected.keys():
for key in self.expected['testrun_post'].keys():
if key == 'tests':
self.assertTrue(
set(expected[key]['names']) ==
set([test['id'] for test in res[key]])
sorted(self.expected['testrun_post'][key]['names']) ==
sorted([test['id'] for test in res[key]])
)
else:
self.assertTrue(expected[key] == res[key])
self.assertTrue(
self.expected['testrun_post'][key] == res[key]
)
#checking wheter all necessary writing to database
#has been performed
test_run = self.session.query(models.TestRun)\
.filter_by(test_set_id=expected['testset'])\
.filter_by(cluster_id=expected['cluster_id'])\
.first()
self.assertTrue(test_run)
.filter_by(test_set_id=self.expected['testrun_post']['testset'])\
.filter_by(cluster_id=self.expected['testrun_post']['cluster_id'])\
.one()
testrun_tests = self.session.query(models.Test)\
.filter(models.Test.test_run_id != (None))\
@ -235,51 +143,35 @@ class TestTestRunsPostController(TestTestRunsController):
tests_names = [
test.name for test in testrun_tests
]
self.assertTrue(set(tests_names) == set(expected['tests']['names']))
self.assertTrue(
all(
[test.status == 'wait_running' for test in testrun_tests]
)
sorted(tests_names) ==
sorted(self.expected['testrun_post']['tests']['names'])
)
@unittest2.skip('Is broken, fixing is appreciated')
#@unittest2.skip('Is broken, fixing is appreciated')
class TestTestRunsPutController(TestTestRunsController):
@classmethod
def setUpClass(cls):
super(TestTestRunsPutController, cls).setUpClass()
def setUp(self):
super(TestTestRunsPutController, self).setUp()
self.test_run = self.controller.post()[0]
self.nose_adapter_session_patcher = patch(
('fuel_plugin.ostf_adapter.nose_plugin.'
'nose_adapter.engine.get_session'),
lambda *args: self.session
with self.session.begin(subtransactions=True):
self.session.query(models.Test)\
.filter_by(test_run_id=int(self.test_run['id']))\
.update({'status': 'running'})
self.request_mock.body = json.dumps(
[{
'status': 'stopped',
'id': self.test_run['id']
}]
)
self.nose_adapter_session_patcher.start()
#this test_case needs data on particular test_run
#already present in database. That is suppotred by
#following code
with patch(
'fuel_plugin.ostf_adapter.wsgi.controllers.request.body',
json.dumps(self.testruns)
):
self.stored_test_run = self.controller.post()[0]
def tearDown(self):
self.nose_adapter_session_patcher.stop()
super(TestTestRunsPutController, self).tearDown()
def test_put_stopped(self):
expected = {
'id': int(self.stored_test_run['id']),
'testset': self.stored_test_run['testset'],
'status': 'running', # seems like incorrect !!!!!!!!!
self.expected['testrun_put'] = {
'id': int(self.test_run['id']),
'testset': 'ha_deployment_test',
'cluster_id': 1,
'tests': {
'names': [
@ -293,41 +185,30 @@ class TestTestRunsPutController(TestTestRunsController):
}
}
testruns_to_stop = [
{
'id': int(self.stored_test_run['id']),
'metadata': {
'cluster_id': int(self.stored_test_run['cluster_id'])
},
'status': 'stopped'
}
]
res = self.controller.put()[0]
with patch(
'fuel_plugin.ostf_adapter.wsgi.controllers.request.body',
json.dumps(testruns_to_stop)
):
res = self.controller.put()[0]
#checking wheter controller is working properly
#by testing its blackbox behaviour
for key in expected.keys():
for key in self.expected['testrun_put'].keys():
if key == 'tests':
self.assertTrue(
set(expected[key]['names']) ==
set([test['id'] for test in res[key]])
sorted(self.expected['testrun_put'][key]['names']) ==
sorted([test['id'] for test in res[key]])
)
else:
self.assertTrue(expected[key] == res[key])
self.assertTrue(
self.expected['testrun_put'][key] == res[key]
)
testrun_tests = self.session.query(models.Test)\
.filter(models.Test.test_run_id == expected['id'])\
.filter_by(test_run_id=self.expected['testrun_put']['id'])\
.all()
tests_names = [
test.name for test in testrun_tests
]
self.assertTrue(set(tests_names) == set(expected['tests']['names']))
self.assertTrue(
sorted(tests_names) ==
sorted(self.expected['testrun_put']['tests']['names'])
)
self.assertTrue(
all(
@ -336,71 +217,37 @@ class TestTestRunsPutController(TestTestRunsController):
)
@unittest2.skip('Refactoring is needed')
class TestClusterRedployment(BaseWSGITest):
@classmethod
def setUpClass(cls):
super(TestClusterRedployment, cls).setUpClass()
class TestClusterRedeployment(BaseWSGITest):
def setUp(self):
super(TestClusterRedployment, self).setUp()
super(TestClusterRedeployment, self).setUp()
self.controller = controllers.TestsetsController()
def tearDown(self):
super(TestClusterRedployment, self).tearDown()
self.controller.get(self.expected['cluster']['id'])
def test_cluster_redeployment_with_different_tags(self):
expected = {
'cluster_id': 1,
'old_test_set_id': 'ha_deployment_test',
'new_test_set_id': 'multinode_deployment_test',
'old_depl_tags': set(['ha', 'rhel', 'nova_network']),
'new_depl_tags': set(['multinode', 'ubuntu', 'nova_network'])
self.expected = {
'cluster': {
'id': 1,
'deployment_tags': set(['multinode', 'ubuntu', 'nova_network'])
},
'test_sets': ['general_test',
'stopped_test', 'multinode_deployment_test'],
'tests': [self.ext_id + test for test in [
('deployment_types_tests.multinode_deployment_test.'
'MultinodeTest.test_multi_novanet_depl'),
('deployment_types_tests.multinode_deployment_test.'
'MultinodeTest.test_multi_depl'),
'general_test.Dummy_test.test_fast_pass',
'general_test.Dummy_test.test_long_pass',
'general_test.Dummy_test.test_fast_fail',
'general_test.Dummy_test.test_fast_error',
'general_test.Dummy_test.test_fail_with_step',
'stopped_test.dummy_tests_stopped.test_really_long',
'stopped_test.dummy_tests_stopped.test_one_no_so_long',
'stopped_test.dummy_tests_stopped.test_not_long_at_all'
]]
}
def discovery_mock(**kwargs):
kwargs['path'] = TEST_PATH
return discovery(**kwargs)
#start discoverying for testsets and tests for given cluster info
with patch(
('fuel_plugin.ostf_adapter.wsgi.'
'wsgi_utils.nose_discovery.discovery'),
discovery_mock
):
with patch(
'fuel_plugin.ostf_adapter.wsgi.wsgi_utils.conf',
self.pecan_conf_mock
):
self.controller.get(expected['cluster_id'])
cluster_state = self.session.query(models.ClusterState)\
.filter_by(id=expected['cluster_id'])\
.first()
if not cluster_state:
raise AssertionError(
'There must be info about current cluster state in db'
)
self.assertEqual(
set(cluster_state.deployment_tags),
set(expected['old_depl_tags'])
)
test_set = self.session.query(models.TestSet)\
.filter_by(id=expected['old_test_set_id'])\
.filter_by(cluster_id=expected['cluster_id'])\
.first()
deployment_tags = test_set.deployment_tags \
if test_set.deployment_tags else []
self.assertTrue(
set(deployment_tags).issubset(expected['old_depl_tags'])
)
#patch request_to_nailgun function in orded to emulate
#redeployment of cluster
cluster_data = set(
@ -412,70 +259,6 @@ class TestClusterRedployment(BaseWSGITest):
'wsgi_utils._get_cluster_depl_tags'),
lambda *args: cluster_data
):
with patch(
('fuel_plugin.ostf_adapter.wsgi.'
'wsgi_utils.nose_discovery.discovery'),
discovery_mock
):
with patch(
'fuel_plugin.ostf_adapter.wsgi.wsgi_utils.conf',
self.pecan_conf_mock
):
self.controller.get(expected['cluster_id'])
self.controller.get(self.expected['cluster']['id'])
new_cluster_state = self.session.query(models.ClusterState)\
.filter_by(id=expected['cluster_id'])\
.first()
self.assertEqual(
set(new_cluster_state.deployment_tags),
expected['new_depl_tags']
)
#check whether testset and bound with
#it test have been deleted from db
old_test_set = self.session.query(models.TestSet)\
.filter_by(id=expected['old_test_set_id'])\
.filter_by(cluster_id=expected['cluster_id'])\
.first()
if old_test_set:
raise AssertionError(
"There must not be test_set for old deployment in db"
)
old_tests = self.session.query(models.Test)\
.filter_by(test_set_id=expected['old_test_set_id'])\
.filter_by(cluster_id=expected['cluster_id'])\
.all()
if old_tests:
raise AssertionError(
"There must not be tests for old deployment in db"
)
#check whether new test set and tests are present in db
#after 'redeployment' of cluster
new_test_set = self.session.query(models.TestSet)\
.filter_by(id=expected['new_test_set_id'])\
.filter_by(cluster_id=expected['cluster_id'])\
.first()
self.assertTrue(new_test_set)
deployment_tags = new_test_set.deployment_tags \
if new_test_set.deployment_tags else []
self.assertTrue(
set(deployment_tags).issubset(expected['new_depl_tags'])
)
new_tests = self.session.query(models.Test)\
.filter_by(test_set_id=expected['new_test_set_id'])\
.filter_by(cluster_id=expected['cluster_id'])\
.all()
self.assertTrue(new_tests)
for test in new_tests:
deployment_tags = test.deployment_tags \
if test.deployment_tags else []
self.assertTrue(
set(deployment_tags).issubset(expected['new_depl_tags'])
)
self.assertTrue(self.is_background_working)

View File

@ -14,12 +14,10 @@
from sqlalchemy.orm import joinedload
from fuel_plugin.ostf_adapter.storage import engine, models, simple_cache
from fuel_plugin.ostf_adapter.storage import models, simple_cache
def clean_db():
eng = engine.get_engine()
def clean_db(eng):
conn = eng.connect()
conn.execute('delete from cluster_testing_pattern;')
@ -29,9 +27,7 @@ def clean_db():
conn.close()
def cache_data():
session = engine.get_session()
def cache_data(session):
with session.begin(subtransactions=True):
test_repository = session.query(models.TestSet)\
.options(joinedload('tests'))\