Complete session handling refactoring
All parts of the system have been refactored. Session creation has been simplified: a session is now created only when it is actually needed, and its use is maximized, instead of sessions being produced uncontrollably for each and every block of code that communicates with the database. This refactoring simplifies the management of data transfer to the database and makes it more explicit for further development. Unit and functional tests have been updated. Change-Id: I15a980432f9da5cac5f04b1f4f4af64d1a002b60
This commit is contained in:
parent
fbf9be7c2f
commit
b296839ccc
|
@ -56,14 +56,14 @@ def main():
|
|||
if getattr(cli_args, 'after_init_hook'):
|
||||
return nailgun_hooks.after_initialization_environment_hook()
|
||||
|
||||
with engine.contexted_session(pecan.conf.dbpath) as session:
|
||||
#performing cleaning of expired data (if any) in db
|
||||
mixins.clean_db(engine.get_engine())
|
||||
mixins.clean_db(session)
|
||||
|
||||
#discover testsets and their tests
|
||||
CORE_PATH = pecan.conf.debug_tests if \
|
||||
pecan.conf.get('debug_tests') else 'fuel_health'
|
||||
|
||||
session = engine.get_session()
|
||||
nose_discovery.discovery(path=CORE_PATH, session=session)
|
||||
|
||||
#cache needed data from test repository
|
||||
|
|
|
@ -16,10 +16,13 @@
|
|||
import requests
|
||||
from pecan import conf
|
||||
from sqlalchemy.orm import joinedload
|
||||
import logging
|
||||
|
||||
from fuel_plugin.ostf_adapter.storage import models, engine
|
||||
from fuel_plugin.ostf_adapter.storage import models
|
||||
from fuel_plugin.ostf_adapter.nose_plugin import nose_utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
REQ_SES = requests.Session()
|
||||
REQ_SES.trust_env = False
|
||||
|
||||
|
@ -29,18 +32,15 @@ NAILGUN_API_URL = 'api/clusters/{0}'
|
|||
TEST_REPOSITORY = []
|
||||
|
||||
|
||||
def clean_db(eng):
|
||||
conn = eng.connect()
|
||||
def clean_db(session):
|
||||
session.query(models.ClusterTestingPattern).delete()
|
||||
session.query(models.ClusterState).delete()
|
||||
session.query(models.TestSet).delete()
|
||||
|
||||
conn.execute('delete from cluster_testing_pattern;')
|
||||
conn.execute('delete from cluster_state;')
|
||||
conn.execute('delete from test_sets;')
|
||||
|
||||
conn.close()
|
||||
session.commit()
|
||||
|
||||
|
||||
def cache_test_repository(session):
|
||||
with session.begin(subtransactions=True):
|
||||
test_repository = session.query(models.TestSet)\
|
||||
.options(joinedload('tests'))\
|
||||
.all()
|
||||
|
@ -69,13 +69,11 @@ def discovery_check(session, cluster):
|
|||
'deployment_tags': cluster_deployment_args
|
||||
}
|
||||
|
||||
with session.begin(subtransactions=True):
|
||||
cluster_state = session.query(models.ClusterState)\
|
||||
.filter_by(id=cluster_data['cluster_id'])\
|
||||
.first()
|
||||
|
||||
if not cluster_state:
|
||||
with session.begin(subtransactions=True):
|
||||
session.add(
|
||||
models.ClusterState(
|
||||
id=cluster_data['cluster_id'],
|
||||
|
@ -83,24 +81,20 @@ def discovery_check(session, cluster):
|
|||
)
|
||||
)
|
||||
|
||||
with session.begin(subtransactions=True):
|
||||
#flush data to db, cuz _add_cluster_testing_pattern
|
||||
#is dependent on it
|
||||
session.flush()
|
||||
|
||||
_add_cluster_testing_pattern(session, cluster_data)
|
||||
|
||||
return
|
||||
|
||||
old_deployment_tags = cluster_state.deployment_tags
|
||||
if set(old_deployment_tags) != cluster_data['deployment_tags']:
|
||||
with session.begin(subtransactions=True):
|
||||
#delete testruns and their tests if cluster was redeployed
|
||||
session.query(models.ClusterTestingPattern)\
|
||||
.filter_by(cluster_id=cluster_state.id)\
|
||||
.delete()
|
||||
|
||||
#separate block "with" is need here to resolve
|
||||
#situation where previous deletion blocks table
|
||||
#that is using in following update
|
||||
with session.begin(subtransactions=True):
|
||||
#make "rediscovering" of testsets for redeployed cluster
|
||||
_add_cluster_testing_pattern(session, cluster_data)
|
||||
|
||||
cluster_state.deployment_tags = \
|
||||
|
@ -166,7 +160,6 @@ def _get_cluster_depl_tags(cluster_id):
|
|||
|
||||
|
||||
def _add_cluster_testing_pattern(session, cluster_data):
|
||||
with session.begin(subtransactions=True):
|
||||
to_database = []
|
||||
for test_set in TEST_REPOSITORY:
|
||||
if nose_utils.process_deployment_tags(
|
||||
|
|
|
@ -19,7 +19,7 @@ from pecan import conf
|
|||
from fuel_plugin.ostf_adapter.nose_plugin import nose_storage_plugin
|
||||
from fuel_plugin.ostf_adapter.nose_plugin import nose_test_runner
|
||||
from fuel_plugin.ostf_adapter.nose_plugin import nose_utils
|
||||
from fuel_plugin.ostf_adapter.storage import storage_utils, engine, models
|
||||
from fuel_plugin.ostf_adapter.storage import engine, models
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
@ -29,9 +29,6 @@ class NoseDriver(object):
|
|||
def __init__(self):
|
||||
LOG.warning('Initializing Nose Driver')
|
||||
self._named_threads = {}
|
||||
session = engine.get_session()
|
||||
with session.begin(subtransactions=True):
|
||||
storage_utils.update_all_running_test_runs(session)
|
||||
|
||||
def check_current_running(self, unique_id):
|
||||
return unique_id in self._named_threads
|
||||
|
@ -46,14 +43,19 @@ class NoseDriver(object):
|
|||
argv_add = [test_set.test_path] + test_set.additional_arguments
|
||||
|
||||
self._named_threads[test_run.id] = nose_utils.run_proc(
|
||||
self._run_tests, test_run.id, test_run.cluster_id, argv_add)
|
||||
self._run_tests,
|
||||
conf.dbpath,
|
||||
test_run.id,
|
||||
test_run.cluster_id,
|
||||
argv_add
|
||||
)
|
||||
|
||||
def _run_tests(self, test_run_id, cluster_id, argv_add):
|
||||
session = engine.get_session()
|
||||
def _run_tests(self, dbpath, test_run_id, cluster_id, argv_add):
|
||||
with engine.contexted_session(dbpath) as session:
|
||||
try:
|
||||
nose_test_runner.SilentTestProgram(
|
||||
addplugins=[nose_storage_plugin.StoragePlugin(
|
||||
test_run_id, str(cluster_id))],
|
||||
session, test_run_id, str(cluster_id))],
|
||||
exit=False,
|
||||
argv=['ostf_tests'] + argv_add)
|
||||
self._named_threads.pop(int(test_run_id), None)
|
||||
|
@ -63,8 +65,7 @@ class NoseDriver(object):
|
|||
models.TestRun.update_test_run(
|
||||
session, test_run_id, status='finished')
|
||||
|
||||
def kill(self, test_run_id, cluster_id, cleanup=None):
|
||||
session = engine.get_session()
|
||||
def kill(self, session, test_run_id, cluster_id, cleanup=None):
|
||||
if test_run_id in self._named_threads:
|
||||
|
||||
try:
|
||||
|
@ -83,6 +84,7 @@ class NoseDriver(object):
|
|||
if cleanup:
|
||||
nose_utils.run_proc(
|
||||
self._clean_up,
|
||||
conf.dbpath,
|
||||
test_run_id,
|
||||
cluster_id,
|
||||
cleanup)
|
||||
|
@ -93,9 +95,8 @@ class NoseDriver(object):
|
|||
return True
|
||||
return False
|
||||
|
||||
def _clean_up(self, test_run_id, cluster_id, cleanup):
|
||||
session = engine.get_session()
|
||||
|
||||
def _clean_up(self, dbpath, test_run_id, cluster_id, cleanup):
|
||||
with engine.contexted_session(dbpath) as session:
|
||||
#need for performing proper cleaning up for current cluster
|
||||
cluster_deployment_info = \
|
||||
session.query(models.ClusterState.deployment_tags)\
|
||||
|
|
|
@ -52,11 +52,13 @@ class DiscoveryPlugin(plugins.Plugin):
|
|||
tag.lower() for tag in profile.get('deployment_tags', [])
|
||||
]
|
||||
|
||||
with self.session.begin(subtransactions=True):
|
||||
try:
|
||||
test_set = models.TestSet(**profile)
|
||||
self.session.merge(test_set)
|
||||
self.test_sets[test_set.id] = test_set
|
||||
|
||||
#flush test_sets data into db
|
||||
self.session.commit()
|
||||
except Exception as e:
|
||||
LOG.error(
|
||||
('An error has occured while processing'
|
||||
|
@ -70,8 +72,6 @@ class DiscoveryPlugin(plugins.Plugin):
|
|||
test_id = test.id()
|
||||
for test_set_id in self.test_sets.keys():
|
||||
if test_set_id in test_id:
|
||||
with self.session.begin(subtransactions=True):
|
||||
|
||||
data = dict()
|
||||
|
||||
(data['title'], data['description'],
|
||||
|
@ -88,6 +88,9 @@ class DiscoveryPlugin(plugins.Plugin):
|
|||
try:
|
||||
test_obj = models.Test(**data)
|
||||
self.session.merge(test_obj)
|
||||
|
||||
#flush tests data into db
|
||||
self.session.commit()
|
||||
except Exception as e:
|
||||
LOG.error(
|
||||
('An error has occured while '
|
||||
|
|
|
@ -20,7 +20,7 @@ from pecan import conf
|
|||
import unittest2
|
||||
|
||||
from fuel_plugin.ostf_adapter.nose_plugin import nose_utils
|
||||
from fuel_plugin.ostf_adapter.storage import models, engine
|
||||
from fuel_plugin.ostf_adapter.storage import models
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
@ -31,15 +31,13 @@ class StoragePlugin(plugins.Plugin):
|
|||
name = 'storage'
|
||||
score = 15000
|
||||
|
||||
def __init__(
|
||||
self, test_run_id, cluster_id):
|
||||
def __init__(self, session, test_run_id, cluster_id):
|
||||
self.session = session
|
||||
self.test_run_id = test_run_id
|
||||
self.cluster_id = cluster_id
|
||||
super(StoragePlugin, self).__init__()
|
||||
self._start_time = None
|
||||
|
||||
self.session = engine.get_session()
|
||||
|
||||
def options(self, parser, env=os.environ):
|
||||
env['NAILGUN_HOST'] = str(conf.nailgun.host)
|
||||
env['NAILGUN_PORT'] = str(conf.nailgun.port)
|
||||
|
@ -64,8 +62,6 @@ class StoragePlugin(plugins.Plugin):
|
|||
data['step'], data['message'] = \
|
||||
nose_utils.format_failure_message(exc_value)
|
||||
|
||||
with self.session.begin(subtransactions=True):
|
||||
|
||||
tests_to_update = nose_utils.get_tests_ids_to_update(test)
|
||||
|
||||
for test_id in tests_to_update:
|
||||
|
@ -75,6 +71,7 @@ class StoragePlugin(plugins.Plugin):
|
|||
test_id,
|
||||
data
|
||||
)
|
||||
self.session.commit()
|
||||
|
||||
def addSuccess(self, test, capt=None):
|
||||
self._add_message(test, status='success')
|
||||
|
|
|
@ -12,48 +12,22 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from pecan import conf
|
||||
from sqlalchemy import create_engine, orm, pool
|
||||
from contextlib import contextmanager
|
||||
from sqlalchemy import create_engine, orm
|
||||
|
||||
|
||||
_ENGINE = None
|
||||
_MAKER = None
|
||||
_REDISINST = None
|
||||
@contextmanager
|
||||
def contexted_session(dbpath):
|
||||
'''Allows to handle session via context manager
|
||||
'''
|
||||
engine = create_engine(dbpath)
|
||||
session = orm.Session(bind=engine)
|
||||
|
||||
|
||||
def get_session(autocommit=True, expire_on_commit=False):
|
||||
"""Return a SQLAlchemy session."""
|
||||
global _MAKER
|
||||
global _SLAVE_MAKER
|
||||
maker = _MAKER
|
||||
|
||||
if maker is None:
|
||||
engine = get_engine()
|
||||
maker = get_maker(engine, autocommit, expire_on_commit)
|
||||
|
||||
else:
|
||||
_MAKER = maker
|
||||
|
||||
session = maker()
|
||||
return session
|
||||
|
||||
|
||||
def get_engine(dbpath=None, pool_type=None):
|
||||
"""Return a SQLAlchemy engine."""
|
||||
global _ENGINE
|
||||
engine = _ENGINE
|
||||
|
||||
if engine is None:
|
||||
dbpath = dbpath if dbpath is not None else conf.dbpath
|
||||
engine = create_engine(dbpath,
|
||||
poolclass=pool_type or pool.NullPool)
|
||||
_ENGINE = engine
|
||||
return engine
|
||||
|
||||
|
||||
def get_maker(engine, autocommit=True, expire_on_commit=False):
|
||||
"""Return a SQLAlchemy sessionmaker using the given engine."""
|
||||
return orm.sessionmaker(
|
||||
bind=engine,
|
||||
autocommit=autocommit,
|
||||
expire_on_commit=expire_on_commit)
|
||||
try:
|
||||
yield session
|
||||
session.commit()
|
||||
except Exception:
|
||||
session.rollback()
|
||||
raise
|
||||
finally:
|
||||
session.close()
|
||||
|
|
|
@ -250,13 +250,8 @@ class TestRun(BASE):
|
|||
|
||||
@property
|
||||
def enabled_tests(self):
|
||||
session = engine.get_session()
|
||||
tests = session.query(Test)\
|
||||
.filter_by(test_run_id=self.id)\
|
||||
.order_by(Test.name)
|
||||
|
||||
return [test.name for test
|
||||
in tests if test.status != 'disabled']
|
||||
in self.tests if test.status != 'disabled']
|
||||
|
||||
def is_finished(self):
|
||||
return self.status == 'finished'
|
||||
|
@ -372,17 +367,16 @@ class TestRun(BASE):
|
|||
if cls.is_last_running(session, test_set.id,
|
||||
metadata['cluster_id']):
|
||||
|
||||
with session.begin(subtransactions=True):
|
||||
test_run = cls.add_test_run(
|
||||
session, test_set.id,
|
||||
metadata['cluster_id'], tests=tests)
|
||||
|
||||
retvalue = test_run.frontend
|
||||
session.close()
|
||||
#flush test_run data to db
|
||||
session.flush()
|
||||
|
||||
plugin.run(test_run, test_set)
|
||||
|
||||
return retvalue
|
||||
return test_run.frontend
|
||||
return {}
|
||||
|
||||
def restart(self, session, tests=None):
|
||||
|
@ -406,7 +400,7 @@ class TestRun(BASE):
|
|||
"""
|
||||
plugin = nose_plugin.get_plugin(self.test_set.driver)
|
||||
killed = plugin.kill(
|
||||
self.id, self.cluster_id,
|
||||
session, self.id, self.cluster_id,
|
||||
cleanup=self.test_set.cleanup_path)
|
||||
if killed:
|
||||
Test.update_running_tests(
|
||||
|
|
|
@ -48,7 +48,8 @@ def setup_config(custom_pecan_config):
|
|||
|
||||
def setup_app(config=None):
|
||||
setup_config(config or {})
|
||||
app_hooks = [hooks.SessionHook(), hooks.ExceptionHandling()]
|
||||
app_hooks = [hooks.SessionHook(dbpath=pecan.conf.dbpath),
|
||||
hooks.ExceptionHandling()]
|
||||
app = pecan.make_app(
|
||||
pecan.conf.app.root,
|
||||
debug=pecan.conf.debug,
|
||||
|
|
|
@ -45,7 +45,7 @@ class TestsetsController(BaseRestController):
|
|||
@expose('json')
|
||||
def get(self, cluster):
|
||||
mixins.discovery_check(request.session, cluster)
|
||||
with request.session.begin(subtransactions=True):
|
||||
|
||||
needed_testsets = request.session\
|
||||
.query(models.ClusterTestingPattern.test_set_id)\
|
||||
.filter_by(cluster_id=cluster)
|
||||
|
@ -65,7 +65,6 @@ class TestsController(BaseRestController):
|
|||
@expose('json')
|
||||
def get(self, cluster):
|
||||
mixins.discovery_check(request.session, cluster)
|
||||
with request.session.begin(subtransactions=True):
|
||||
needed_tests_list = request.session\
|
||||
.query(models.ClusterTestingPattern.tests)\
|
||||
.filter_by(cluster_id=cluster)
|
||||
|
@ -94,14 +93,12 @@ class TestrunsController(BaseRestController):
|
|||
|
||||
@expose('json')
|
||||
def get_all(self):
|
||||
with request.session.begin(subtransactions=True):
|
||||
test_runs = request.session.query(models.TestRun).all()
|
||||
|
||||
return [item.frontend for item in test_runs]
|
||||
|
||||
@expose('json')
|
||||
def get_one(self, test_run_id):
|
||||
with request.session.begin(subtransactions=True):
|
||||
test_run = request.session.query(models.TestRun)\
|
||||
.filter_by(id=test_run_id).first()
|
||||
if test_run and isinstance(test_run, models.TestRun):
|
||||
|
@ -110,7 +107,6 @@ class TestrunsController(BaseRestController):
|
|||
|
||||
@expose('json')
|
||||
def get_last(self, cluster_id):
|
||||
with request.session.begin(subtransactions=True):
|
||||
test_run_ids = request.session.query(func.max(models.TestRun.id)) \
|
||||
.group_by(models.TestRun.test_set_id)\
|
||||
.filter_by(cluster_id=cluster_id)
|
||||
|
@ -131,7 +127,6 @@ class TestrunsController(BaseRestController):
|
|||
metadata = test_run['metadata']
|
||||
tests = test_run.get('tests', [])
|
||||
|
||||
with request.session.begin(subtransactions=True):
|
||||
test_set = models.TestSet.get_test_set(
|
||||
request.session,
|
||||
test_set
|
||||
|
|
|
@ -13,9 +13,9 @@
|
|||
# under the License.
|
||||
|
||||
import logging
|
||||
from sqlalchemy import create_engine, orm, pool
|
||||
|
||||
from pecan import hooks
|
||||
from fuel_plugin.ostf_adapter.storage import engine
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
@ -28,5 +28,23 @@ class ExceptionHandling(hooks.PecanHook):
|
|||
|
||||
class SessionHook(hooks.PecanHook):
|
||||
|
||||
def __init__(self, dbpath):
|
||||
self.engine = create_engine(dbpath, poolclass=pool.NullPool)
|
||||
|
||||
def before(self, state):
|
||||
state.request.session = engine.get_session()
|
||||
state.request.session = orm.Session(bind=self.engine)
|
||||
|
||||
def after(self, state):
|
||||
try:
|
||||
state.request.session.commit()
|
||||
except Exception:
|
||||
state.request.session.rollback()
|
||||
raise
|
||||
finally:
|
||||
state.request.session.close()
|
||||
|
||||
def on_error(self, state, e):
|
||||
LOG.exception('Pecan state %r', state)
|
||||
|
||||
state.session.rollback()
|
||||
state.session.close()
|
||||
|
|
|
@ -385,7 +385,7 @@ class AdapterTests(BaseAdapterTest):
|
|||
])
|
||||
|
||||
self.compare(resp, assertions)
|
||||
time.sleep(5)
|
||||
time.sleep(10)
|
||||
|
||||
resp = self.client.testruns_last(cluster_id)
|
||||
|
||||
|
@ -420,7 +420,8 @@ class AdapterTests(BaseAdapterTest):
|
|||
self.client.run_with_timeout(testset, tests, cluster_id, 70)
|
||||
self.client.restart_with_timeout(testset, tests, cluster_id, 10)
|
||||
|
||||
resp = self.client.restart_tests_last(testset, disabled_test, cluster_id)
|
||||
resp = self.client.restart_tests_last(testset, disabled_test,
|
||||
cluster_id)
|
||||
|
||||
assertions = Response([
|
||||
{
|
||||
|
|
|
@ -68,9 +68,8 @@ class BaseWSGITest(unittest2.TestCase):
|
|||
self.Session.configure(
|
||||
bind=self.connection
|
||||
)
|
||||
self.session = self.Session(autocommit=True)
|
||||
self.session = self.Session()
|
||||
|
||||
with self.session.begin(subtransactions=True):
|
||||
test_sets = self.session.query(models.TestSet).all()
|
||||
|
||||
#need this if start unit tests in conjuction with integration
|
||||
|
@ -120,7 +119,6 @@ class BaseWSGITest(unittest2.TestCase):
|
|||
def is_background_working(self):
|
||||
is_working = True
|
||||
|
||||
with self.session.begin(subtransactions=True):
|
||||
cluster_state = self.session.query(models.ClusterState)\
|
||||
.filter_by(id=self.expected['cluster']['id'])\
|
||||
.one()
|
||||
|
|
|
@ -140,7 +140,7 @@ class TestTestRunsPostController(TestTestRunsController):
|
|||
self.expected['testrun_post'][key] == res[key]
|
||||
)
|
||||
|
||||
test_run = self.session.query(models.TestRun)\
|
||||
self.session.query(models.TestRun)\
|
||||
.filter_by(test_set_id=self.expected['testrun_post']['testset'])\
|
||||
.filter_by(cluster_id=self.expected['testrun_post']['cluster_id'])\
|
||||
.one()
|
||||
|
@ -164,11 +164,13 @@ class TestTestRunsPutController(TestTestRunsController):
|
|||
super(TestTestRunsPutController, self).setUp()
|
||||
self.test_run = self.controller.post()[0]
|
||||
|
||||
with self.session.begin(subtransactions=True):
|
||||
self.session.query(models.Test)\
|
||||
.filter_by(test_run_id=int(self.test_run['id']))\
|
||||
.update({'status': 'running'})
|
||||
|
||||
#flush data which test is depend on into db
|
||||
self.session.commit()
|
||||
|
||||
self.request_mock.body = json.dumps(
|
||||
[{
|
||||
'status': 'stopped',
|
||||
|
|
Loading…
Reference in New Issue