# fuel-ostf/fuel_plugin/ostf_adapter/nose_plugin/nose_discovery.py
#
# 127 lines
# 4.6 KiB
# Python

# Copyright 2013 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import os
from nose import plugins
from fuel_plugin.ostf_adapter.nose_plugin import nose_test_runner
from fuel_plugin.ostf_adapter.nose_plugin import nose_utils
from fuel_plugin.ostf_adapter.storage import engine, models
from fuel_plugin.ostf_adapter \
.nose_plugin.nose_utils import process_deployment_tags
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
class DiscoveryPlugin(plugins.Plugin):
    """Nose plugin that stores discovered test sets and tests in the database.

    ``afterImport`` inspects every imported module for a ``__profile__``
    dict and, when its deployment tags are accepted by
    ``process_deployment_tags``, persists it as a ``models.TestSet``.
    ``addSuccess`` then persists a ``models.Test`` for each reported test
    whose id contains the id of one of the remembered test sets.
    """

    enabled = True
    name = 'discovery'
    score = 15000

    def __init__(self, session, deployment_info):
        """Remember the DB session and the deployment description.

        :param session: database session used for all reads and writes
        :param deployment_info: mapping read for the ``'deployment_tags'``
            and ``'cluster_id'`` keys
        """
        self.session = session
        self.deployment_info = deployment_info
        # test set id -> merged TestSet instance, filled in by afterImport
        self.test_sets = {}
        super(DiscoveryPlugin, self).__init__()

    def options(self, parser, env=os.environ):
        """Intentionally a no-op: this plugin registers no options."""
        pass

    def configure(self, options, conf):
        """Intentionally a no-op: this plugin takes no configuration."""
        pass

    def afterImport(self, filename, module):
        """Store a test set for ``module`` if it declares a suitable profile.

        :param filename: name of the imported file (only logged)
        :param module: name of the imported module; it is imported again
            here to get hold of the module object
        """
        imported = __import__(module, fromlist=[module])
        LOG.info('Inspecting %s', filename)

        if not hasattr(imported, '__profile__'):
            return

        # NOTE: this is the module's own dict, so the updates below are
        # visible through ``module.__profile__`` as well.
        profile = imported.__profile__
        profile['deployment_tags'] = [
            raw_tag.lower() for raw_tag in profile.get('deployment_tags', [])
        ]

        suitable = process_deployment_tags(
            self.deployment_info['deployment_tags'],
            profile['deployment_tags']
        )
        if not suitable:
            return

        if profile['id'] == "alternative_depl_tags_test":
            LOG.info(profile)

        profile['cluster_id'] = self.deployment_info['cluster_id']

        with self.session.begin(subtransactions=True):
            LOG.info('%s discovered.', imported.__name__)
            merged_set = self.session.merge(models.TestSet(**profile))
            self.session.add(merged_set)
            self.test_sets[merged_set.id] = merged_set

    def addSuccess(self, test):
        """Store ``test`` under every known test set whose id it contains.

        A test is only stored when its deployment tags are accepted by
        ``process_deployment_tags`` and no equal record exists yet.

        :param test: nose test case; ``test.id()`` supplies its name
        """
        test_id = test.id()

        for test_set_id in self.test_sets:
            # A test belongs to a test set when the set id is a substring
            # of the test id.
            if test_set_id not in test_id:
                continue

            with self.session.begin(subtransactions=True):
                cluster_id = self.deployment_info['cluster_id']
                title, description, duration, deployment_tags = \
                    nose_utils.get_description(test)
                data = {
                    'cluster_id': cluster_id,
                    'title': title,
                    'description': description,
                    'duration': duration,
                    'deployment_tags': deployment_tags,
                }

                suitable = process_deployment_tags(
                    self.deployment_info['deployment_tags'],
                    data['deployment_tags']
                )
                if not suitable:
                    continue

                data['test_set_id'] = test_set_id
                data['name'] = test_id

                # According to the original author, session.merge does not
                # work here, so look for an already stored test with the
                # same cluster, test set and name (and no test run) to
                # avoid duplicating tests in the db.
                owner_cluster_id = self.test_sets[test_set_id].cluster_id
                query = self.session.query(models.Test)
                query = query.filter_by(cluster_id=owner_cluster_id)
                query = query.filter_by(test_set_id=test_set_id)
                query = query.filter_by(test_run_id=None)
                query = query.filter_by(name=data['name'])
                existing = query.first()

                if not existing:
                    LOG.info('%s added for %s', test_id, test_set_id)
                    self.session.add(models.Test(**data))
def discovery(session, path, deployment_info=None):
    """Discover all tests on the provided path and save their info in the db.

    Runs nose in ``--collect-only`` mode with a ``DiscoveryPlugin`` attached.

    :param session: database session handed to the plugin
    :param path: location passed to nose as the place to look for tests
    :param deployment_info: deployment description handed to the plugin;
        any falsy value is replaced with an empty dict
    """
    deployment_info = deployment_info or dict()
    LOG.info('Starting discovery for %r.', path)

    plugin = DiscoveryPlugin(session, deployment_info)
    nose_argv = ['tests_discovery', '--collect-only', path]
    nose_test_runner.SilentTestProgram(
        addplugins=[plugin],
        exit=False,
        argv=nose_argv
    )