Provide unit test coverage for process_results

Add unit test coverage for process_results.

The code that handles test_attr_list includes a typo 'sesion',
so fixing that with 'session', to pass the new unit tests.

Story: 2000698
Change-Id: Iebfb49d6e89fb55cee888e2966335630a972427a
This commit is contained in:
Andrea Frittoli (andreaf) 2016-08-15 10:19:19 +01:00
parent 3ea602d94d
commit 9dca0f2716
2 changed files with 394 additions and 4 deletions

View File

@ -181,7 +181,7 @@ def process_results(results):
if ('attr', attr) not in test_metadata:
test_meta_dict = {'attr': attr}
api.add_test_metadata(test_meta_dict, db_test.id,
sesion=session)
session=session)
api.add_test_run_metadata(results[test]['metadata'], test_run.id,
session)
if results[test]['attachments']:

View File

@ -13,13 +13,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import datetime
import sys
import tempfile
from dateutil import parser as date_parser
import fixtures
import mock
from oslo_config import cfg
import sys
import tempfile
import testtools
from subunit2sql import exceptions
from subunit2sql.migrations import cli as migration_cli
@ -214,3 +216,391 @@ class TestMain(base.TestCase):
sys.stdin, attachments=False, attr_regex='\[(.*)\]',
targets=[mock.sentinel.extension])
process_results_mock.assert_called_once_with(fake_get_results)
class TestProcessResults(base.TestCase):
def setUp(self):
super(TestProcessResults, self).setUp()
cfg.CONF.reset()
shell.cli_opts()
# Ensure the config defaults are what we need
cfg.CONF.set_override(name='run_at', override=None)
cfg.CONF.set_override(name='artifacts', override=None)
cfg.CONF.set_override(name='run_id', override=None)
# Mock the whole subunit2sql.db.api module
self.db_api_mock = self.useFixture(fixtures.MockPatch(
'subunit2sql.shell.api')).mock
# Mock session
self.fake_session = mock.Mock(name='session', spec=['close'])
self.db_api_mock.get_session.return_value = self.fake_session
# Mock get_run_totals
self.fake_totals = dict(skips=11, fails=22, success=33)
self.get_run_totals_mock = self.useFixture(
fixtures.MockPatch('subunit2sql.shell.get_run_totals',
return_value=self.fake_totals)).mock
def test_process_results_no_results(self):
fake_run_time = 'run time'
fake_results = dict(run_time=fake_run_time)
fake_db_run_id = 'run id'
fake_db_run = mock.Mock(name='db run')
fake_db_run.id = fake_db_run_id
self.db_api_mock.create_run.return_value = fake_db_run
# Setup Configuration
fake_run_at = '2016-08-17 10:58:00.000'
cfg.CONF.set_override(name='run_at', override=fake_run_at)
fake_artifacts = 'artifacts'
cfg.CONF.set_override(name='artifacts', override=fake_artifacts)
fake_run_id = 'run_id'
cfg.CONF.set_override(name='run_id', override=fake_run_id)
fake_run_meta = 'run_meta'
cfg.CONF.set_override(name='run_meta', override=fake_run_meta)
# Run process_results
shell.process_results(fake_results)
self.db_api_mock.get_session.assert_called_once()
expected_run_at = date_parser.parse(fake_run_at)
self.db_api_mock.create_run.assert_called_once_with(
self.fake_totals['skips'], self.fake_totals['fails'],
self.fake_totals['success'], fake_run_time, fake_artifacts,
id=fake_run_id, run_at=expected_run_at, session=self.fake_session)
self.db_api_mock.add_run_metadata.assert_called_once_with(
fake_run_meta, fake_db_run_id, self.fake_session)
self.fake_session.close.assert_called_once()
@mock.patch('subunit2sql.read_subunit.get_duration')
def test_process_results_new_tests(self, get_duration_mock):
fake_run_time = 'run time'
fake_results = dict(test1={'status': 'success', 'start_time': 0,
'end_time': 1, 'metadata': None,
'attachments': None},
test2={'status': 'fail', 'start_time': 0,
'end_time': 2, 'metadata': None,
'attachments': None},
test3={'status': 'skip', 'start_time': 0,
'end_time': 3, 'metadata': None,
'attachments': None})
fake_results_all = copy.deepcopy(fake_results)
fake_results_all['run_time'] = fake_run_time
# Mock create_run
fake_db_run_id = 'run id'
fake_db_run = mock.Mock(name='db run')
fake_db_run.id = fake_db_run_id
self.db_api_mock.create_run.return_value = fake_db_run
# Tests are not found in the DB
self.db_api_mock.get_test_by_test_id.return_value = None
# Mock create test
get_duration_mock.return_value = 'fake_duration'
fake_db_test_id = 'test_db_id'
fake_test_create = mock.Mock(name='db test')
fake_test_create.id = fake_db_test_id
self.db_api_mock.create_test.return_value = fake_test_create
# Run process_results
shell.process_results(fake_results_all)
# Check that we lookup all tests in the DB
expected_test_by_id_calls = [mock.call(x, self.fake_session) for x in
fake_results]
self.db_api_mock.get_test_by_test_id.assert_has_calls(
expected_test_by_id_calls, any_order=True)
# Check that all tests are created in the DB
expected_create_test_calls = [
mock.call('test1', 1, 1, 0, 'fake_duration', self.fake_session),
mock.call('test2', 1, 0, 1, 'fake_duration', self.fake_session),
mock.call('test3', 0, 0, 0, 'fake_duration', self.fake_session)]
self.db_api_mock.create_test.assert_has_calls(
expected_create_test_calls, any_order=True)
# Check that a test_run for each test is created in the DB
expected_create_test_run_calls = [
mock.call(fake_db_test_id, fake_db_run_id,
fake_results[x]['status'], fake_results[x]['start_time'],
fake_results[x]['end_time'], self.fake_session)
for x in fake_results]
self.db_api_mock.create_test_run.assert_has_calls(
expected_create_test_run_calls, any_order=True)
self.fake_session.close.assert_called_once()
@mock.patch('subunit2sql.shell.running_avg')
def test_process_results_existing_tests(self, running_avg_mock):
fake_run_time = 'run time'
# Setup a common fake DB test
fake_db_test = mock.Mock(name='db test')
fake_db_test.id = 'test id'
fake_db_test.run_count = 3
fake_db_test.success = 2
fake_db_test.failure = 1
# Setup results
fake_results = dict(test1={'status': 'success', 'start_time': 0,
'end_time': 1, 'metadata': None,
'attachments': None},
test2={'status': 'fail', 'start_time': 0,
'end_time': 2, 'metadata': None,
'attachments': None},
test3={'status': 'skip', 'start_time': 0,
'end_time': 3, 'metadata': None,
'attachments': None})
fake_results_all = copy.deepcopy(fake_results)
fake_results_all['run_time'] = fake_run_time
# Mock create run
fake_db_run_id = 'run id'
fake_db_run = mock.Mock(name='db run')
fake_db_run.id = fake_db_run_id
self.db_api_mock.create_run.return_value = fake_db_run
# Tests are found in the DB
self.db_api_mock.get_test_by_test_id.return_value = fake_db_test
# Mock running avg
fake_running_avg = 'running average'
def fake_running_avg_method(test, values, result):
values['run_time'] = fake_running_avg
return values
running_avg_mock.side_effect = fake_running_avg_method
# Run process_results
shell.process_results(fake_results_all)
# Check that we lookup all tests in the DB
expected_test_by_id_calls = [mock.call(x, self.fake_session) for x in
fake_results]
self.db_api_mock.get_test_by_test_id.assert_has_calls(
expected_test_by_id_calls, any_order=True)
# Check that counters for tests are updated (if not skip)
expected_update_test_calls = [
mock.call({'run_count': 4, 'success': 3,
'run_time': fake_running_avg},
fake_db_test.id, self.fake_session),
mock.call({'run_count': 4, 'failure': 2},
fake_db_test.id, self.fake_session)]
self.db_api_mock.update_test.assert_has_calls(
expected_update_test_calls, any_order=True)
# Check that a test_run for each test is created in the DB
expected_create_test_run_calls = [
mock.call(fake_db_test.id, fake_db_run_id,
fake_results[x]['status'],
fake_results[x]['start_time'],
fake_results[x]['end_time'], self.fake_session)
for x in fake_results]
self.db_api_mock.create_test_run.assert_has_calls(
expected_create_test_run_calls, any_order=True)
self.fake_session.close.assert_called_once()
def test_process_results_existing_tests_invalid_status(self):
fake_run_time = 'run time'
# Setup a common fake DB test
fake_db_test = mock.Mock(name='db test')
fake_db_test.run_count = 3
# Setup results
fake_results = dict(test1={'status': 'invalid', 'start_time': 0,
'end_time': 1, 'metadata': None,
'attachments': None})
fake_results_all = copy.deepcopy(fake_results)
fake_results_all['run_time'] = fake_run_time
# Tests are found in the DB
self.db_api_mock.get_test_by_test_id.return_value = fake_db_test
with testtools.ExpectedException(exceptions.UnknownStatus,
'^.*\n.*%s$' % 'invalid'):
# Run process_results
shell.process_results(fake_results_all)
# Check that we lookup all tests in the DB
expected_test_by_id_calls = [mock.call(x, self.fake_session) for x in
fake_results]
self.db_api_mock.get_test_by_test_id.assert_has_calls(
expected_test_by_id_calls, any_order=True)
# Check that a run is created in the DB
self.db_api_mock.create_run.assert_called_once_with(
self.fake_totals['skips'], self.fake_totals['fails'],
self.fake_totals['success'], fake_run_time, None, id=None,
run_at=None, session=self.fake_session)
# Check no test run was added to the DB
self.db_api_mock.create_test_run.assert_not_called()
# FIXME(andreaf) Session is not closed.
# https://storyboard.openstack.org/?#!/story/2000702
# self.fake_session.close.assert_called_once()
def test_process_results_test_metadata_new(self):
# Setup the metadata prefix configuration
fake_meta_prefix = '_meta_'
cfg.CONF.set_override(name='test_attr_prefix',
override=fake_meta_prefix)
fake_run_time = 'run time'
# Setup a common fake DB test
fake_db_test = mock.Mock(name='db test')
fake_db_test.id = 'test id'
fake_db_test.run_count = 3
fake_db_test.success = 2
fake_db_test.failure = 1
# Setup results
expected_metadata_list = [fake_meta_prefix + 'a',
fake_meta_prefix + 'b']
extra_metadata_list = ['c', 'd']
full_metadata_list = expected_metadata_list + extra_metadata_list
fake_results = dict(
test1={'status': 'fail', 'start_time': 0, 'end_time': 1,
'metadata': {'attrs': ','.join(full_metadata_list)},
'attachments': None})
fake_results_all = copy.deepcopy(fake_results)
fake_results_all['run_time'] = fake_run_time
# Mock create run
fake_db_run_id = 'run id'
fake_db_run = mock.Mock(name='db run')
fake_db_run.id = fake_db_run_id
self.db_api_mock.create_run.return_value = fake_db_run
# Mock create test run
fake_db_test_run_id = 'test run id'
fake_db_test_run = mock.Mock(name='db test run')
fake_db_test_run.id = fake_db_test_run_id
self.db_api_mock.create_test_run.return_value = fake_db_test_run
# Tests are found in the DB
self.db_api_mock.get_test_by_test_id.return_value = fake_db_test
# Test metadata is not found in the DB
self.db_api_mock.get_test_metadata.return_value = []
# Run process_results
shell.process_results(fake_results_all)
# Check only matching metadata is added to test_metadata
expected_add_test_metadata_calls = [
mock.call({'attr': x}, fake_db_test.id, session=self.fake_session)
for x in expected_metadata_list]
self.db_api_mock.add_test_metadata.assert_has_calls(
expected_add_test_metadata_calls, any_order=True)
# Check all metadata is added to test_run_metadata
self.db_api_mock.add_test_run_metadata.assert_has_calls([
mock.call(fake_results['test1']['metadata'], fake_db_test_run_id,
self.fake_session)])
self.fake_session.close.assert_called_once()
def test_process_results_test_metadata_existing(self):
# Setup the metadata prefix configuration
fake_meta_prefix = '_meta_'
cfg.CONF.set_override(name='test_attr_prefix',
override=fake_meta_prefix)
fake_run_time = 'run time'
# Setup a common fake DB test
fake_db_test = mock.Mock(name='db test')
fake_db_test.id = 'test id'
fake_db_test.run_count = 3
fake_db_test.success = 2
fake_db_test.failure = 1
# Setup results
expected_metadata_list = [fake_meta_prefix + 'a',
fake_meta_prefix + 'b']
existing_metadata_list = [fake_meta_prefix + 'c',
fake_meta_prefix + 'd']
full_metadata_list = expected_metadata_list + existing_metadata_list
fake_results = dict(
test1={'status': 'fail', 'start_time': 0, 'end_time': 1,
'metadata': {'attrs': ','.join(full_metadata_list)},
'attachments': None})
fake_results_all = copy.deepcopy(fake_results)
fake_results_all['run_time'] = fake_run_time
# Mock create run
fake_db_run_id = 'run id'
fake_db_run = mock.Mock(name='db run')
fake_db_run.id = fake_db_run_id
self.db_api_mock.create_run.return_value = fake_db_run
# Mock create test run
fake_db_test_run_id = 'test run id'
fake_db_test_run = mock.Mock(name='db test run')
fake_db_test_run.id = fake_db_test_run_id
self.db_api_mock.create_test_run.return_value = fake_db_test_run
# Tests are found in the DB
self.db_api_mock.get_test_by_test_id.return_value = fake_db_test
# Test metadata is found in the DB
test_metadata_list = []
for value in existing_metadata_list:
test_metadata = mock.Mock()
test_metadata.key = 'attr'
test_metadata.value = value
test_metadata_list.append(test_metadata)
self.db_api_mock.get_test_metadata.return_value = test_metadata_list
# Run process_results
shell.process_results(fake_results_all)
# Check only matching metadata is added to test_metadata
expected_add_test_metadata_calls = [
mock.call({'attr': x}, fake_db_test.id, session=self.fake_session)
for x in expected_metadata_list]
self.db_api_mock.add_test_metadata.assert_has_calls(
expected_add_test_metadata_calls, any_order=True)
# Check all metadata is added to test_run_metadata
self.db_api_mock.add_test_run_metadata.assert_has_calls([
mock.call(fake_results['test1']['metadata'], fake_db_test_run_id,
self.fake_session)])
self.fake_session.close.assert_called_once()
def test_process_result_test_metadata_no_prefix(self):
# Setup the metadata prefix configuration
cfg.CONF.set_override(name='test_attr_prefix',
override='')
fake_run_time = 'run time'
# Setup a common fake DB test
fake_db_test = mock.Mock(name='db test')
fake_db_test.id = 'test id'
fake_db_test.run_count = 3
fake_db_test.success = 2
fake_db_test.failure = 1
# Setup results
expected_metadata_list = ['a', 'b', 'c']
fake_results = dict(
test1={'status': 'fail', 'start_time': 0, 'end_time': 1,
'metadata': {'attrs': ','.join(expected_metadata_list)},
'attachments': None})
fake_results_all = copy.deepcopy(fake_results)
fake_results_all['run_time'] = fake_run_time
# Mock create run
fake_db_run_id = 'run id'
fake_db_run = mock.Mock(name='db run')
fake_db_run.id = fake_db_run_id
self.db_api_mock.create_run.return_value = fake_db_run
# Mock create test run
fake_db_test_run_id = 'test run id'
fake_db_test_run = mock.Mock(name='db test run')
fake_db_test_run.id = fake_db_test_run_id
self.db_api_mock.create_test_run.return_value = fake_db_test_run
# Tests are found in the DB
self.db_api_mock.get_test_by_test_id.return_value = fake_db_test
# Run process_results
shell.process_results(fake_results_all)
# Check test metadata is not added
self.db_api_mock.add_test_metadata.assert_not_called()
# Check all metadata is added to test_run_metadata
self.db_api_mock.add_test_run_metadata.assert_has_calls([
mock.call(fake_results['test1']['metadata'],
fake_db_test_run_id,
self.fake_session)])
self.fake_session.close.assert_called_once()
def test_process_result_test_run_attachments(self):
fake_run_time = 'run time'
# Setup a common fake DB test
fake_db_test = mock.Mock(name='db test')
fake_db_test.id = 'test id'
fake_db_test.run_count = 3
fake_db_test.success = 2
fake_db_test.failure = 1
# Setup results
fake_attachment = 'some text'
fake_results = dict(
test1={'status': 'fail', 'start_time': 0, 'end_time': 1,
'metadata': None, 'attachments': fake_attachment})
fake_results_all = copy.deepcopy(fake_results)
fake_results_all['run_time'] = fake_run_time
# Mock create run
fake_db_run_id = 'run id'
fake_db_run = mock.Mock(name='db run')
fake_db_run.id = fake_db_run_id
self.db_api_mock.create_run.return_value = fake_db_run
# Mock create test run
fake_db_test_run_id = 'test run id'
fake_db_test_run = mock.Mock(name='db test run')
fake_db_test_run.id = fake_db_test_run_id
self.db_api_mock.create_test_run.return_value = fake_db_test_run
# Tests are found in the DB
self.db_api_mock.get_test_by_test_id.return_value = fake_db_test
# Run process_results
shell.process_results(fake_results_all)
# Check attachments are added to the test run
self.db_api_mock.add_test_metadata.assert_not_called()
# Check all metadata is added to test_run_metadata
self.db_api_mock.add_test_run_attachments.assert_has_calls([
mock.call(fake_attachment, fake_db_test_run_id,
self.fake_session)])
self.fake_session.close.assert_called_once()