Adding api to return List of tests in each scenario

Change-Id: I9b3466ea8b37dad33161b054e180db4d9964cccc
This commit is contained in:
Anand Shanmugam 2016-04-20 00:05:27 -07:00
parent e80d0602b9
commit c26eb471b7
4 changed files with 40 additions and 2 deletions

View File

@ -28,6 +28,8 @@ from cloudpulse.api.controllers.v1 import types
from cloudpulse.api.controllers.v1 import utils as api_utils
from cloudpulse import objects
from cloudpulse.scenario import base as plugin_base
class CpulsePatchType(types.JsonPatchType):
@ -37,6 +39,7 @@ class CpulsePatchType(types.JsonPatchType):
class Cpulse(base.APIBase):
"""API representation of a test.
This class enforces type checking and value constraints, and converts
@ -98,6 +101,7 @@ class Cpulse(base.APIBase):
class CpulseCollection(collection.Collection):
"""API representation of a collection of tests."""
cpulses = [Cpulse]
@ -122,11 +126,13 @@ class CpulseCollection(collection.Collection):
class cpulseController(rest.RestController):
"""REST controller for Cpulse.."""
def __init__(self):
super(cpulseController, self).__init__()
_custom_actions = {'detail': ['GET']}
_custom_actions = {'detail': ['GET'], 'list_tests': ['GET']}
def _get_tests_collection(self, marker, limit, sort_key, sort_dir,
expand=False, resource_url=None, failed=None,
@ -185,6 +191,16 @@ class cpulseController(rest.RestController):
test_ident)
return rpc_test_detail
@pecan.expose('json')
def list_tests(self):
"""Retrieve list of tests for each scenario.
:param none: No params needed.
"""
all_tests = plugin_base.Scenario.list_all_scenario_types()
all_test_dict = {key: "\n".join(all_tests[key]) for key in all_tests}
return all_test_dict
@wsme_pecan.wsexpose(Cpulse, body=Cpulse, status_code=201)
def post(self, test):
"""Create a new test.

View File

@ -37,11 +37,13 @@ def scenario(admin_only=False, operator=False, context=None):
class Scenario(object):
"""This is base class for any benchmark scenario.
You should create subclass of this class. And your test scenarios will
be auto discoverable and you will be able to specify it in test config.
"""
def __init__(self, context=None, admin_tests=None,
tenant_tests=None, operator_tests=None):
self._admin_tests = admin_tests
@ -188,6 +190,20 @@ class Scenario(object):
itertools.chain.from_iterable(scenarios_list))
return scenarios_list_flat
@classmethod
def list_all_scenario_types(scenario_cls):
"""Lists all the tests in all scenarios."""
scenarios = scenario_cls.list_all_scenarios()
scenario_dict = {}
for scenario in scenarios:
testype, test = scenario.split(".")
if testype in scenario_dict:
scenario_dict[testype].append(test)
else:
scenario_dict[testype] = []
scenario_dict[testype].append(test)
return scenario_dict
@classmethod
def validate(cls, name, config, admin=None, users=None, task=None):
"""Semantic check of benchmark arguments."""

View File

@ -27,7 +27,6 @@ CONF.register_opts(TESTS_OPTS, periodic_test_group)
class dummy_scenario(base.Scenario):
@base.scenario(operator=True)
def dummy_cloudtest(self, *args, **kwargs):
return (200, "success", ['dummy_result'])

View File

@ -59,6 +59,7 @@ CONF.register_opts(TESTS_OPTS, security_pulse_test_group)
class security_pulse_scenario(base.Scenario):
@base.scenario(admin_only=False, operator=False)
def password_encryption_check(self, *args, **kwargs):
status, result = utils.check_for_valid_testcase_input_file()
if status:
@ -71,6 +72,7 @@ class security_pulse_scenario(base.Scenario):
result = pwd_test.perform_password_encryption_test(input_params)
return result
@base.scenario(admin_only=False, operator=False)
def keystone_tls_check(self, *args, **kwargs):
status, result = utils.check_for_valid_testcase_input_file()
if status:
@ -83,6 +85,7 @@ class security_pulse_scenario(base.Scenario):
result = test.perform_tls_enablement_test(input_params)
return result
@base.scenario(admin_only=False, operator=False)
def keystone_admin_token_check(self, *args, **kwargs):
status, result = utils.check_for_valid_testcase_input_file()
if status:
@ -95,6 +98,7 @@ class security_pulse_scenario(base.Scenario):
result = test.perform_ks_admin_token_check_test(input_params)
return result
@base.scenario(admin_only=False, operator=False)
def file_comparision_check(self, *args, **kwargs):
status, result = utils.check_for_valid_testcase_input_file()
if status:
@ -107,6 +111,7 @@ class security_pulse_scenario(base.Scenario):
result = test.perform_file_permission_check(input_params)
return result
@base.scenario(admin_only=False, operator=False)
def logfile_mode_check(self, *args, **kwargs):
status, result = utils.check_for_valid_testcase_input_file()
if status:
@ -119,6 +124,7 @@ class security_pulse_scenario(base.Scenario):
result = test.perform_log_file_mode_test(input_params)
return result
@base.scenario(admin_only=False, operator=False)
def logfile_rotate_check(self, *args, **kwargs):
status, result = utils.check_for_valid_testcase_input_file()
if status:
@ -131,6 +137,7 @@ class security_pulse_scenario(base.Scenario):
result = test.perform_log_file_rotate_test(input_params)
return result
@base.scenario(admin_only=False, operator=False)
def mysql_tsl_check(self, *args, **kwargs):
status, result = utils.check_for_valid_testcase_input_file()
if status: