Security Pulse code

Change-Id: I797ab784111242c6526719e558003930f0e3fa7b
This commit is contained in:
C Sasi Kanth 2015-07-13 04:27:48 -07:00 committed by Anand Shanmugam
parent 561c8228ba
commit 14fab75160
20 changed files with 951 additions and 0 deletions

View File

View File

View File

@ -0,0 +1,151 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import print_function
import ansible.inventory
import ansible.runner
import os
TMP_LOCATION = "/tmp/sec_hc/"
class ansible_runner(object):
def __init__(self,
os_node_list=[]):
self.openstack_node = os_node_list
# print self.openstack_node
self.remote_user = None
self.remote_pass = None
self.inventory = None
def execute_cmd(self, command, file_list=[], ips=[], roles=[]):
inventory = None
filetered_os_list = []
if ips:
filetered_os_list = self.get_os_node_list(ip_list=ips)
elif roles:
filetered_os_list = self.get_os_node_list(role_list=roles)
else:
filetered_os_list = self.openstack_node
# print filetered_os_list
if filetered_os_list:
inventory = self.init_ansible_inventory(filetered_os_list)
if inventory:
self.inventory = inventory
for f in file_list:
self.copy(f, TMP_LOCATION)
out = self.execute(command + " >> " + TMP_LOCATION + "output")
print (out)
out = self.fetch(TMP_LOCATION + 'output', TMP_LOCATION +
'output', 'no')
print (out)
self.execute("rm -rf /tmp/sec_hc/")
# print out
def set_ansible_inventory(self, inv):
self.inventory = inv
def set_credential(self, user, passwd):
self.remote_user = user
self.remote_pass = passwd
def init_ansible_inventory(self, os_node_list):
ip_list = []
for os_node in self.openstack_node:
ip_list.append(os_node.getIp())
self.remote_user = os_node.getUser()
self.remote_pass = os_node.getPassword()
# print ip_list
inventory = ansible.inventory.Inventory(ip_list)
return inventory
def get_os_node_list(self, ip_list=[], role_list=[]):
filetered_list = []
if not ip_list and not role_list:
return self.openstack_node
if ip_list and self.openstack_node:
for ip in ip_list:
for os_node in self.openstack_node:
if ip == os_node.getIp():
filetered_list.append(os_node)
elif role_list and self.openstack_node:
for role in role_list:
for os_node in self.self.openstack_node:
if role == os_node.getRole():
filetered_list.append(os_node)
return filetered_list
def copy(self, src, dest):
runner = ansible.runner.Runner(
module_name='copy',
module_args='src=%s dest=%s' % (src, dest),
remote_user=self.remote_user,
remote_pass=self.remote_pass,
inventory=self.inventory,
)
out = runner.run()
return out
def fetch(self, src, dest, flat='yes'):
runner = ansible.runner.Runner(
module_name='fetch',
module_args='src=%s dest=%s flat=%s' % (src, dest, flat),
remote_user=self.remote_user,
remote_pass=self.remote_pass,
inventory=self.inventory,
)
out = runner.run()
return out
# can perform all shell operations Ex: rm /tmp/output
def execute(self, command):
# print command
runner = ansible.runner.Runner(
module_name='shell',
module_args=command,
remote_user=self.remote_user,
remote_pass=self.remote_pass,
inventory=self.inventory,
)
out = runner.run()
return out
def get_results(self):
result = {}
if not os.path.isdir(TMP_LOCATION + 'output/'):
return result
files = os.walk(TMP_LOCATION + 'output/').next()[1]
for f in files:
try:
result[f] = open(TMP_LOCATION + 'output/' +
f + TMP_LOCATION + 'output', 'r').read()
except IOError:
print ("Error opening the file : " + TMP_LOCATION +
'output/' + f + TMP_LOCATION + 'output')
return result
"""
if __name__ == '__main__':
os_node_info_obj = openstack_node_info_reader("/home/ubuntu/
sasi/cpulse/cloudpulse/plugins/security_pulse/config/
openstack_config.yaml")
openstack_node_list = os_node_info_obj.get_host_list()
print openstack_node_list
flist=["/home/ubuntu/sasi/cpulse/cloudpulse/plugins/
security_pulse/testcase/TLS_Enablement_Check.py"]
ans_runner = ansible_runner(openstack_node_list)
ans_runner.execute_cmd("python "+TMP_LOCATION+
"TLS_Enablement_Check.py",file_list=flist)
"""

View File

@ -0,0 +1,42 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class openstack_node_obj(object):
def __init__(self, host, ip, user, password, role, name):
self.host = host
self.ip = ip
self.user = user
self.password = password
self.role = role
self.name = name
def getHost(self):
return self.host
def getIp(self):
return self.ip
def getUser(self):
return self.user
def getPassword(self):
return self.password
def getRole(self):
return self.role
def getName(self):
return self.name

View File

@ -0,0 +1,64 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import print_function
from openstack_node import openstack_node_obj
import yaml
class openstack_node_info_reader(object):
def __init__(self, os_node_file):
self.hostYamlObj = None
try:
fp = open(os_node_file)
except IOError as e:
print ("Error while opening the file...%s", e)
return
try:
self.hostYamlObj = yaml.load(fp)
except yaml.error.YAMLError as perr:
print ("Error while parsing...%s", perr)
return
def get_host_list(self):
openstack_host_list = []
for key in self.hostYamlObj.keys():
name = key
ip = self.hostYamlObj[key]["ip"]
hostname = key
username = self.hostYamlObj[key]["user"]
password = self.hostYamlObj[key]["password"]
role = self.hostYamlObj[key]["role"]
node_obj = openstack_node_obj(hostname, ip, username,
password, role, name)
openstack_host_list.append(node_obj)
return openstack_host_list
"""
def get_host_list(self):
return self.openstack_host_list
"""
def printHostList(self, openstack_host_list):
for hostObj in openstack_host_list:
print ("%s - %s - %s", hostObj.getIp(),
hostObj.getHost(), hostObj.getUser())
"""
if __name__ == '__main__':
os_node_info_obj = openstack_node_info_reader()
os_node_info_obj.get_host_list()
"""

View File

@ -0,0 +1,5 @@
control-1:
ip: 172.22.191.136
user: root
password: cisco123
role: controller

View File

@ -0,0 +1,54 @@
securityhealth:
global_data:
file_info_dir: /tmp/sec_hc/
common:
perform_on: [controller,compute]
testcase: [tls_enablement_check]
password_encryption_check:
perform_on: [controller]
input:
conf_file: [/etc/keystone/keystone.conf]
filepermission:
input:
baseline_file: /tmp/sec_hc/os_allnode_baseline
controller_dir: [/etc/keystone,/etc/nova,/etc/neutron]
compute_dir: [/etc/nova,/etc/neutron]
logfile_mode_check:
perform_on: [controller,compute]
input:
conf_file_dir: [/etc/keystone/,/etc/nova/,/etc/neutron/,/etc/glance/]
logrotate_cfg_check:
perform_on: [controller,compute]
input:
ks_admin_token_check:
perform_on: [controller]
input:
tls_enablement_check:
perform_on: [controller]
input:
keystone:
perform_on: [controller]
testcase: [token_mangement.token_deletion,service.service_restart]
token_mangement:
token_deletion:
input:
token_expiration:
input:
token_time: 10
configuration:
configuration_check:
input:
algorithm: md5
service:
service_restart:
input:
horizon:
perform_on: [controller]
testcase:
configuration:
configuration_check:
input:
conffile: [https.conf]
ServerTokens: Prod
ServerSignature: off
TraceEnable: off

View File

@ -0,0 +1,126 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import print_function
import cloudpulse
# from cloudpulse.operator.ansible.openstack_node import openstack_node_obj
from cloudpulse.operator.ansible.openstack_node_info_reader import \
openstack_node_info_reader
from cloudpulse.scenario import base
from cloudpulse.scenario.plugins.security_pulse.testcase.tls_enable_test \
import tls_enablement_test
from cloudpulse.scenario.plugins.security_pulse.testcase.\
ks_admin_token_check import ks_admin_token_check
from cloudpulse.scenario.plugins.security_pulse.util.\
security_pulse_test_input import security_test_input_reader
from cloudpulse.scenario.plugins.security_pulse.util import \
security_pulse_test_util
import os
from oslo_config import cfg
TESTS_OPTS = [
cfg.StrOpt('testcase_input_file',
default='',
help='Security testcase input file')
]
CONF = cfg.CONF
security_pulse_test_group = cfg.OptGroup(name='security_pulse_test',
title='Security pulse test' +
' param input file')
CONF.register_group(security_pulse_test_group)
CONF.register_opts(TESTS_OPTS, security_pulse_test_group)
class security_common_test(base.Scenario):
def security_keystone_tls_enablement_check(self, *args, **kwargs):
testcase_input_file = ""
try:
testcase_input_file =\
cfg.CONF.security_pulse_test.testcase_input_file
except Exception as e:
print ("Exception while reading the testcase input file")
return (404, e.message, [])
if not os.path.isfile(testcase_input_file):
print ("Security Testcase input file not found")
return (404, "Security Testcase input file not found", [])
# print testcase_input_file
base_dir = os.path.dirname(cloudpulse.__file__)
input_reader = security_test_input_reader(testcase_input_file)
input_data = input_reader.process_security_input_file()
input_params = security_pulse_test_util.\
get_test_input_by_name("tls_enablement_check", input_data)
os_node_info_obj = \
openstack_node_info_reader(base_dir +
"/scenario/plugins/security_pulse/" +
"config/openstack_config.yaml")
openstack_node_list = os_node_info_obj.get_host_list()
input_params['os_host_list'] = openstack_node_list
# print input_params
tls_test = tls_enablement_test()
result = tls_test.perform_tls_enablement_test(input_params)
if not result:
return (404, "No result from test execution", [])
# print result
if result.startswith("Fail"):
return (404, result, [])
else:
return (200, result, [])
def security_keystone_admin_token_check(self, *args, **kwargs):
testcase_input_file = ""
try:
testcase_input_file =\
cfg.CONF.security_pulse_test.testcase_input_file
except Exception as e:
print ("Exception while reading the testcase input file")
return (404, e.message, [])
if not os.path.isfile(testcase_input_file):
return (404, "Security Testcase input file not found", [])
base_dir = os.path.dirname(cloudpulse.__file__)
input_reader = security_test_input_reader(testcase_input_file)
input_data = input_reader.process_security_input_file()
input_params = security_pulse_test_util.\
get_test_input_by_name("ks_admin_token_check", input_data)
os_node_info_obj = \
openstack_node_info_reader(base_dir +
"/scenario/plugins/security_pulse/" +
"config/openstack_config.yaml")
openstack_node_list = os_node_info_obj.get_host_list()
input_params['os_host_list'] = openstack_node_list
# print input_params
ks_test = ks_admin_token_check()
result = ks_test.perform_ks_admin_token_check_test(input_params)
if not result:
return (404, "No result from test execution", [])
# print result
test_status = None
data = ""
for r in result:
if test_status is None or r[2].startswith("Fail"):
test_status = "fail"
elif test_status is None:
test_status = "success"
data = data + r[0] + " -> " + r[1] + " -> " + r[2] + "\n"
if test_status == "fail":
return (404, data, [])
else:
return (200, data, [])
if __name__ == '__main__':
sct = security_common_test()
sct.security_tls_enablement_check()

View File

@ -0,0 +1,75 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import print_function
import ConfigParser
import os
import pwd
import stat
class tls_enable_check(object):
def __init__(self):
pass
def read_tls_config(self, config):
try:
config.get("ldap", "use_tls")
except ConfigParser.NoOptionError:
print ("Fail - use_tls option is not enabled")
return
else:
use_tls = config.get("ldap", "use_tls")
if use_tls == 'false':
print ("Fail - use_tls option is enabled with 'false' value")
return
elif use_tls == 'true':
ca_dir = None
try:
ca_dir = config.get("ldap", "tls_cacertdir")
except ConfigParser.NoOptionError:
try:
tls_ca_file = config.get("ldap", "tls_cacertfile")
ca_dir = tls_ca_file[:tls_ca_file.rindex('/')]
except ConfigParser.NoOptionError:
print ("Fail - Both 'tls_ca_dir' and " +
"'tls_ca_file' are not defined")
return
if not ca_dir:
print ("Fail - Both 'tls_ca_dir' and " +
"'tls_ca_file' are not defined")
return
else:
for dirName, subdirList, fileList in os.walk(ca_dir):
os.chdir(dirName)
for f1 in fileList:
st = os.stat(f1)
user = pwd.getpwuid(st[stat.ST_UID])[0]
group = pwd.getpwuid(st[stat.ST_GID])[0]
# mode = oct(stat.S_IMODE(st[stat.ST_MODE]))
if user != 'keystone' or group != 'keystone':
print ("Fail - Certificate file directory " +
"user/group permission are user=%s, " +
"group=%s ", user, group)
return
print ("Success - TLS is enabled and the Certificate file " +
"permissions are 'keystone'")
return
if __name__ == '__main__':
tls_enable_check_obj = tls_enable_check()
config = ConfigParser.ConfigParser()
config.read("/etc/keystone/keystone.conf")
tls_enable_check_obj.read_tls_config(config)

View File

@ -0,0 +1,71 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ConfigParser
import os
class keystone_admin_token_check(object):
def __init__(self):
pass
def keystone_admin_token_test(self):
ks_conf_file = "/etc/keystone/keystone.conf"
result = []
config = ConfigParser.ConfigParser()
if os.path.exists(ks_conf_file):
try:
config.read(ks_conf_file)
except Exception:
result.append("admin_token - keystone.conf not found - Fail")
else:
try:
config.get("DEFAULT", "admin_token")
except ConfigParser.NoOptionError:
result.append("admin_token - Not defined - Pass")
else:
result.append("admin_token - Defined - Fail")
else:
result.append("admin_token - keystone.conf not found - Fail")
ks_paste_conf_file = "/etc/keystone/keystone-paste.ini"
if os.path.exists(ks_paste_conf_file):
try:
config.read(ks_paste_conf_file)
except Exception:
result.append("admin_auth_token - keystone-paste.ini not " +
"found - Pass")
else:
try:
config.get("filter:admin_token_auth",
"paste.filter_factory")
except (ConfigParser.NoOptionError,
ConfigParser.NoSectionError):
result.append("admin_auth_token - Not defined - Pass")
else:
option = config.get("filter:admin_token_auth",
"paste.filter_factory")
if "AdminTokenAuthMiddleware" in option:
result.append("admin_auth_token - Defined - Fail")
else:
result.append("admin_auth_token - Not Defined - Pass")
else:
result.append("admin_auth_token - keystone-paste.ini not found " +
"- Pass")
print (result)
if __name__ == '__main__':
keystone_admin_token_check_obj = keystone_admin_token_check()
keystone_admin_token_check_obj.keystone_admin_token_test()

View File

@ -0,0 +1,53 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import print_function
import cloudpulse
from cloudpulse.operator.ansible.ansible_runner import ansible_runner
import os
TMP_LOCATION = "/tmp/sec_hc/"
class ks_admin_token_check(object):
def perform_ks_admin_token_check_test(self, input_params):
print ("Executing the test ", input_params.get('testcase_name'))
file_info_dir = input_params['global_data']['file_info_dir']
perform_on = input_params['perform_on']
if perform_on is None or not perform_on:
print ("Perform on should be mentioned either at \
test level or test case level")
return
os_hostobj_list = input_params['os_host_list']
base_dir = os.path.dirname(cloudpulse.__file__)
flist = [base_dir +
"/scenario/plugins/security_pulse/testcase/" +
"keystone_admin_token_check.py"]
ans_runner = ansible_runner(os_hostobj_list)
ans_runner.execute_cmd("python " + TMP_LOCATION +
"keystone_admin_token_check.py " +
TMP_LOCATION, file_list=flist)
result = ans_runner.get_results()
if not result:
return result
result_row = []
for key in result.keys():
obj = eval(result[key])
for r in obj:
result = r.split(" - ")
result_row.append([result[0], result[1], result[2]])
os.system('rm -rf ' + file_info_dir + 'output')
return result_row

View File

@ -0,0 +1,48 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import cloudpulse
from cloudpulse.operator.ansible.ansible_runner import ansible_runner
import os
TMP_LOCATION = "/tmp/sec_hc/"
class tls_enablement_test(object):
def perform_tls_enablement_test(self, input_params):
print ("Executing the test ", input_params.get('testcase_name'))
file_info_dir = input_params['global_data']['file_info_dir']
perform_on = input_params['perform_on']
if perform_on is None or not perform_on:
print ("Perform on should be mentioned either at test level " +
"or test case level")
return
os_hostobj_list = input_params['os_host_list']
base_dir = os.path.dirname(cloudpulse.__file__)
flist = [base_dir + "/scenario/plugins/security_pulse" +
"/testcase/TLS_Enablement_Check.py"]
# print os_hostobj_list
ans_runner = ansible_runner(os_hostobj_list)
ans_runner.execute_cmd("python " + TMP_LOCATION +
"TLS_Enablement_Check.py " +
TMP_LOCATION, file_list=flist)
result = ans_runner.get_results()
if not result:
return result
os.system('rm -rf ' + file_info_dir + 'output')
for key in result.keys():
return result[key]

View File

@ -0,0 +1,142 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cloudpulse.scenario.plugins.security_pulse.util.security_test_data \
import security_test
from cloudpulse.scenario.plugins.security_pulse.util.security_testcase_data \
import security_testcase
import yaml
class security_test_input_reader(object):
def __init__(self, fileName):
self.secInputYamlObj = None
self.security_tests = []
try:
fp = open(fileName)
except IOError as e:
print ("Error while opening the file...%s", e)
return
try:
self.secInputYamlObj = yaml.load(fp)
except yaml.error.YAMLError as perr:
print ("Error while parsing...%s", perr)
return
def process_security_input_file(self):
# print self.secInputYamlObj
secTests = self.secInputYamlObj["securityhealth"]
globalVarData = {}
input_data = {}
sec_test_lst = []
for test_key in secTests.keys():
if test_key == "global_data":
for gkey in secTests[test_key].keys():
globalVarData[gkey] = secTests[test_key][gkey]
continue
sec_test_obj = security_test()
sec_test_obj.set_test_name(test_key)
sec_test_case_lst = []
test_data = secTests[test_key]
for test_case_key in test_data.keys():
if test_case_key == "perform_on":
sec_test_obj.set_perform_on(secTests[test_key]
[test_case_key])
elif test_case_key == "testcase":
sec_test_obj.set_test_to_execute(secTests[test_key]
[test_case_key])
else:
security_testcase_obj = security_testcase()
security_testcase_obj.set_test_name(test_case_key)
if "perform_on" in secTests[test_key][test_case_key]:
# print secTests[test_key][test_case_key]["perform_on"]
security_testcase_obj.\
set_perform_on(secTests[test_key]
[test_case_key]
["perform_on"])
test_input_dict = {}
if "input" in secTests[test_key][test_case_key]:
if secTests[test_key][test_case_key]["input"] \
is not None:
for test_case_input_key in \
secTests[test_key][test_case_key]["input"].\
keys():
test_input_dict[test_case_input_key] = \
(secTests[test_key][test_case_key]["input"]
[test_case_input_key])
security_testcase_obj.\
set_input_params(test_input_dict)
sec_test_case_lst.append(security_testcase_obj)
else:
sec_test_case_lst = sec_test_case_lst + \
self.process_testcase_input(test_key,
test_case_key,
secTests)
sec_test_obj.set_security_testcase(sec_test_case_lst)
sec_test_lst.append(sec_test_obj)
# security_test_input_reader.print_test_input(sec_test_lst)
# print globalVarData
input_data['global_data'] = globalVarData
input_data['sec_test_lst'] = sec_test_lst
return input_data
def process_testcase_input(self, test_key, test_case_key, secTests):
sec_test_case_lst = []
# print secTests[test_key][test_case_key]
for sub_test_case_key in (secTests[test_key]
[test_case_key]).keys():
security_testcase_obj = security_testcase()
security_testcase_obj.set_test_name(test_case_key + "." +
sub_test_case_key)
if "perform_on" in (secTests[test_key][test_case_key]
[sub_test_case_key]):
security_testcase_obj.\
set_perform_on(secTests[test_key][test_case_key]
[sub_test_case_key]["perform_on"])
if "input" in secTests[test_key][test_case_key][sub_test_case_key] \
and (secTests[test_key][test_case_key][sub_test_case_key]
["input"]) is not None:
test_input_dict = {}
for test_case_input_key in \
(secTests[test_key][test_case_key]
[sub_test_case_key]["input"]).keys():
test_input_dict[test_case_input_key] = \
(secTests[test_key][test_case_key]
[sub_test_case_key]["input"][test_case_input_key])
security_testcase_obj.set_input_params(test_input_dict)
sec_test_case_lst.append(security_testcase_obj)
return sec_test_case_lst
"""
@staticmethod
def print_test_input(sec_test_lst):
for test_obj in sec_test_lst:
print "TestName : %s " % test_obj.get_test_name()
print "Perform On : %s " % test_obj.get_perform_on()
print "Test to execute : %s " % test_obj.get_test_to_execute()
for test_case_obj in test_obj.get_security_testcase():
print " Test case Name : %s " % test_case_obj.\
get_test_name()
print " Perform On : %s " % test_case_obj.\
get_perform_on()
print " Input Params : %s " % test_case_obj.\
get_input_params()
"""
if __name__ == '__main__':
yhp = security_test_input_reader()
yhp.process_security_input_file()

View File

@ -0,0 +1,32 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
def get_test_input_by_name(testcase_name, input_data):
sec_test_lst = input_data['sec_test_lst']
for test_obj in sec_test_lst:
for test_case_obj in test_obj.get_security_testcase():
if testcase_name == test_case_obj.get_test_name():
input_params = test_case_obj.get_input_params()
input_params['testcase_name'] = testcase_name
if test_case_obj.get_perform_on() is not None:
input_params['perform_on'] = \
test_case_obj.get_perform_on()
else:
input_params['perform_on'] = test_obj.get_perform_on()
input_params['test_name'] = test_obj.get_test_name()
input_params['global_data'] = input_data['global_data']
return input_params
return None

View File

@ -0,0 +1,47 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class security_test(object):
def __init__(self):
self.test_name = None
self.security_testcase = []
self.perform_on = []
self.test_to_execute = []
def get_test_name(self):
return self.test_name
def get_security_testcase(self):
return self.security_testcase
def set_test_name(self, test_name):
self.test_name = test_name
def set_security_testcase(self, security_testcase):
self.security_testcase = security_testcase
def get_perform_on(self):
return self.perform_on
def set_perform_on(self, perform_on):
self.perform_on = perform_on
def get_test_to_execute(self):
return self.test_to_execute
def set_test_to_execute(self, test_to_execute):
self.test_to_execute = test_to_execute

View File

@ -0,0 +1,40 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class security_testcase(object):
def __init__(self):
self.test_name = None
self.perform_on = []
self.input_params = {}
def get_test_name(self):
return self.test_name
def set_test_name(self, test_name):
self.test_name = test_name
def get_perform_on(self):
return self.perform_on
def set_perform_on(self, perform_on):
self.perform_on = perform_on
def get_input_params(self):
return self.input_params
def set_input_params(self, input_params):
self.input_params = input_params

View File

@ -5,6 +5,7 @@
pbr>=0.6,!=0.7,<1.0
Babel>=1.3
ansible
ecdsa>=0.13
eventlet>=0.17.3
iso8601>=0.1.9