Fix tests for subunit

Refactored stdout a bit to allow easier use of debugging tools.
This commit is contained in:
Clint Byrum 2015-11-04 13:01:31 -08:00
parent b6bff9707c
commit e7d63bfd10
2 changed files with 25 additions and 17 deletions

View File

@ -35,7 +35,9 @@ def get_queues():
queues_data = queues.collect()
def main(argv=None):
def main(argv=None, stdout=None):
if stdout is None:
stdout = sys.stdout
if argv is None:
argv = sys.argv
parser = argparse.ArgumentParser(argv[0])
@ -72,13 +74,14 @@ def main(argv=None):
content = json.dumps(collected, indent=1, sort_keys=True).encode('utf-8')
if args.subunit is not None:
file_name = args.subunit or 'counters.json'
stream = subunit_v2.StreamResultToBytes(sys.stdout)
stream = subunit_v2.StreamResultToBytes(stdout)
stream.startTestRun()
stream.status(file_name=file_name, file_bytes=content,
mime_type='application/json')
stream.stopTestRun()
else:
print(content.encode('utf-8'))
stdout.write(content)
stdout.write(b"\n")
if __name__ == '__main__':
main()

View File

@ -19,17 +19,27 @@ test_collect
Tests for `openstack_qa_tools.collect`
"""
import functools
import json
import mock
import fixtures
from openstack_qa_tools import collect
from openstack_qa_tools.tests import base
import six
import subunit
import testtools
from testtools import content as ttc
class StreamResult(testtools.StreamResult):
counters_content = None
def status(self, test_id=None, test_status=None, test_tags=None,
runnable=True, file_name=None, file_bytes=None, eof=False,
mime_type=None, route_code=None, timestamp=None):
if test_id:
return
if file_name != 'counters.json':
return
self.counters_content = file_bytes
class TestCollect(base.TestCase):
@ -37,7 +47,6 @@ class TestCollect(base.TestCase):
def setUp(self):
super(TestCollect, self).setUp()
self.stdout = six.BytesIO()
self.useFixture(fixtures.MonkeyPatch('sys.stdout', self.stdout))
self.attachments = []
@mock.patch('openstack_qa_tools.collectors.mysql.collect')
@ -45,8 +54,8 @@ class TestCollect(base.TestCase):
def test_collect_main(self, queues_mock, mysql_mock):
mysql_mock.return_value = {}
queues_mock.return_value = {}
collect.main(['os-collect-counters'])
content = json.loads(self.stdout.getvalue())
collect.main(['os-collect-counters'], self.stdout)
content = json.loads(self.stdout.getvalue().decode('utf-8'))
self.assertTrue(isinstance(content, dict))
self.assertIn('mysql', content)
self.assertIn('queues', content)
@ -62,21 +71,17 @@ class TestCollect(base.TestCase):
def test_collect_main_subunit(self, queues_mock, mysql_mock):
mysql_mock.return_value = {}
queues_mock.return_value = {}
collect.main(['os-collect-counters', '--subunit'])
collect.main(['os-collect-counters', '--subunit'], self.stdout)
self.stdout.seek(0)
stream = subunit.ByteStreamToStreamResult(self.stdout)
starts = testtools.StreamResult()
summary = testtools.StreamSummary()
outcomes = testtools.StreamToDict(
functools.partial(self._parse_outcome))
result = testtools.CopyStreamResult([starts, outcomes, summary])
result = StreamResult()
result.startTestRun()
try:
stream.run(result)
finally:
result.stopTestRun()
self.assertIn('counters.json', self.attachments)
content = json.loads(self.attachments['counters.json'])
self.assertIsNotNone(result.counters_content)
content = json.loads(result.counters_content.decode('utf-8'))
self.assertTrue(isinstance(content, dict))
self.assertIn('mysql', content)
self.assertIn('queues', content)