Simplifying Result Store

This change removes the results store, instead we just use a list
of issues. This makes things much simpler, any code that was of
value in the results store has been moved into the new Issue class
(replaceing the old named tuple) or into the manager.

Change-Id: I595a047f9df44b0e7df763d7c87624c9579da83c
This commit is contained in:
Tim Kelsey 2015-09-02 23:58:25 +01:00
parent ed5de4a475
commit 9e522d3322
13 changed files with 322 additions and 452 deletions

View File

@ -24,10 +24,9 @@ from bandit.core import context # noqa
from bandit.core import manager # noqa
from bandit.core import meta_ast # noqa
from bandit.core import node_visitor # noqa
from bandit.core import result_store # noqa
from bandit.core import test_set # noqa
from bandit.core import tester # noqa
from bandit.core import utils # noqa
from bandit.core.constants import * # noqa
from bandit.core.objects import * # noqa
from bandit.core.issue import * # noqa
from bandit.core.test_properties import * # noqa

View File

@ -24,9 +24,11 @@ import sysconfig
import appdirs
from bandit.core import config as b_config
from bandit.core import constants
from bandit.core import manager as b_manager
from bandit.core import utils
BASE_CONFIG = 'bandit.yaml'
logger = logging.getLogger()
@ -230,8 +232,10 @@ def main():
b_mgr.output_metaast()
# trigger output of results by Bandit Manager
b_mgr.output_results(args.context_lines, args.severity - 1,
args.confidence - 1, args.output_file,
b_mgr.output_results(args.context_lines,
constants.RANKING[args.severity - 1],
constants.RANKING[args.confidence - 1],
args.output_file,
args.output_format)
# return an exit code of 1 if there are results, 0 otherwise

View File

@ -19,10 +19,9 @@ from bandit.core import context # noqa
from bandit.core import manager # noqa
from bandit.core import meta_ast # noqa
from bandit.core import node_visitor # noqa
from bandit.core import result_store # noqa
from bandit.core import test_set # noqa
from bandit.core import tester # noqa
from bandit.core import utils # noqa
from bandit.core.constants import * # noqa
from bandit.core.objects import * # noqa
from bandit.core.issue import * # noqa
from bandit.core.test_properties import * # noqa

View File

@ -26,28 +26,32 @@ from bandit.core import constants
logger = logging.getLogger(__name__)
def report_csv(result_store, file_list, scores, excluded_files):
'''Prints/returns warnings in JSON format
def _sum_scores(manager, sev):
summation = 0
for scores in manager.scores:
summation += sum(scores['CONFIDENCE'][sev:])
summation += sum(scores['SEVERITY'][sev:])
return summation
:param result_store: results of scan as BanditResultStore object
:param files_list: Which files were inspected
:param scores: The scores awarded to each file in the scope
:param excluded_files: Which files were excluded from the scope
:return: A collection containing the CSV data
def report_csv(manager, filename, sev_level, conf_level, lines=-1,
out_format='csv'):
'''Prints issues in CSV format
:param manager: the bandit manager object
:param filename: The output file name, or None for stdout
:param sev_level: Filtering severity level
:param conf_level: Filtering confidence level
:param lines: Number of lines to report, -1 for all
:param out_format: The ouput format name
'''
results = result_store._get_issue_list()
results = manager.get_issue_list()
# Remove the code from all the issues in the list, as we will not
# be including it in the CSV data.
def del_code(issue):
del issue['code']
map(del_code, results)
if filename is None:
filename = 'bandit_results.csv'
if result_store.out_file is None:
result_store.out_file = 'bandit_results.csv'
with open(result_store.out_file, 'w') as fout:
with open(filename, 'w') as fout:
fieldnames = ['filename',
'test_name',
'issue_severity',
@ -59,33 +63,37 @@ def report_csv(result_store, file_list, scores, excluded_files):
writer = csv.DictWriter(fout, fieldnames=fieldnames,
extrasaction='ignore')
writer.writeheader()
writer.writerows(results)
for result in results:
if result.filter(sev_level, conf_level):
writer.writerow(result.as_dict(with_code=False))
print("CSV output written to file: %s" % result_store.out_file)
print("CSV output written to file: %s" % filename)
def report_json(result_store, file_list, scores, excluded_files):
'''Prints/returns warnings in JSON format
def report_json(manager, filename, sev_level, conf_level, lines=-1,
out_format='json'):
'''''Prints issues in JSON format
:param result_store: results of scan as BanditResultStore object
:param files_list: Which files were inspected
:param scores: The scores awarded to each file in the scope
:param excluded_files: Which files were excluded from the scope
:return: JSON string
:param manager: the bandit manager object
:param filename: The output file name, or None for stdout
:param sev_level: Filtering severity level
:param conf_level: Filtering confidence level
:param lines: Number of lines to report, -1 for all
:param out_format: The ouput format name
'''
stats = dict(zip(file_list, scores))
stats = dict(zip(manager.files_list, manager.scores))
machine_output = dict({'results': [], 'errors': [], 'stats': []})
collector = list()
for (fname, reason) in result_store.skipped:
for (fname, reason) in manager.skipped:
machine_output['errors'].append({'filename': fname,
'reason': reason})
'reason': reason})
for filer, score in six.iteritems(stats):
totals = {}
for i in range(result_store.sev_level, len(constants.RANKING)):
severity = constants.RANKING[i]
rank = constants.RANKING
sev_idx = rank.index(sev_level)
for i in range(sev_idx, len(rank)):
severity = rank[i]
severity_value = constants.RANKING_VALUES[severity]
try:
sc = score['SEVERITY'][i] / severity_value
@ -95,14 +103,18 @@ def report_json(result_store, file_list, scores, excluded_files):
machine_output['stats'].append({
'filename': filer,
'score': result_store._sum_scores(score),
'score': _sum_scores(manager, sev_idx),
'issue totals': totals})
collector = result_store._get_issue_list()
results = manager.get_issue_list()
collector = []
for result in results:
if result.filter(sev_level, conf_level):
collector.append(result.as_dict())
if result_store.agg_type == 'vuln':
if manager.agg_type == 'vuln':
machine_output['results'] = sorted(collector,
key=itemgetter('error_type'))
key=itemgetter('test_name'))
else:
machine_output['results'] = sorted(collector,
key=itemgetter('filename'))
@ -110,29 +122,30 @@ def report_json(result_store, file_list, scores, excluded_files):
# timezone agnostic format
TS_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
time_string = result_store.generated_time.strftime(TS_FORMAT)
time_string = datetime.datetime.utcnow().strftime(TS_FORMAT)
machine_output['generated_at'] = time_string
result = json.dumps(machine_output, sort_keys=True,
indent=2, separators=(',', ': '))
if result_store.out_file:
with open(result_store.out_file, 'w') as fout:
if filename:
with open(filename, 'w') as fout:
fout.write(result)
# XXX: Should this be log output? (ukbelch)
print("JSON output written to file: %s" % result_store.out_file)
logger.info("JSON output written to file: %s" % filename)
else:
print(result)
def report_text(result_store, files_list, scores, excluded_files):
'''Prints the contents of the result store
def report_text(manager, filename, sev_level, conf_level, lines=-1,
out_format='txt'):
'''Prints issues in Text formt
:param result_store: results of scan as BanditResultStore object
:param files_list: Which files were inspected
:param scores: The scores awarded to each file in the scope
:param excluded_files: List of files excluded from the scope
:return: TXT string with appropriate TTY coloring for terminals
:param manager: the bandit manager object
:param filename: The output file name, or None for stdout
:param sev_level: Filtering severity level
:param conf_level: Filtering confidence level
:param lines: Number of lines to report, -1 for all
:param out_format: The ouput format name
'''
tmpstr_list = []
@ -140,9 +153,9 @@ def report_text(result_store, files_list, scores, excluded_files):
# use a defaultdict to default to an empty string
color = collections.defaultdict(str)
if result_store.format == 'txt':
if out_format == 'txt':
# get text colors from settings for TTY output
get_setting = result_store.config.get_setting
get_setting = manager.b_conf.get_setting
color = {'HEADER': get_setting('color_HEADER'),
'DEFAULT': get_setting('color_DEFAULT'),
'LOW': get_setting('color_LOW'),
@ -157,30 +170,30 @@ def report_text(result_store, files_list, scores, excluded_files):
datetime.datetime.utcnow()
))
if result_store.verbose:
if manager.verbose:
# print which files were inspected
tmpstr_list.append("\n%sFiles in scope (%s):%s\n" % (
color['HEADER'], len(files_list),
color['HEADER'], len(manager.files_list),
color['DEFAULT']
))
for item in zip(files_list, map(result_store._sum_scores, scores)):
for item in zip(manager.files_list, map(_sum_scores, manager.scores)):
tmpstr_list.append("\t%s (score: %i)\n" % item)
# print which files were excluded and why
tmpstr_list.append("\n%sFiles excluded (%s):%s\n" %
(color['HEADER'], len(excluded_files),
(color['HEADER'], len(manager.skipped),
color['DEFAULT']))
for fname in excluded_files:
for fname in manager.skipped:
tmpstr_list.append("\t%s\n" % fname)
# print which files were skipped and why
tmpstr_list.append("\n%sFiles skipped (%s):%s\n" % (
color['HEADER'], len(result_store.skipped),
color['HEADER'], len(manager.skipped),
color['DEFAULT']
))
for (fname, reason) in result_store.skipped:
for (fname, reason) in manager.skipped:
tmpstr_list.append("\t%s (%s)\n" % (fname, reason))
# print the results
@ -188,75 +201,74 @@ def report_text(result_store, files_list, scores, excluded_files):
color['HEADER'], color['DEFAULT']
))
if result_store.count == 0:
issues = manager.get_issue_list()
if not len(issues):
tmpstr_list.append("\tNo issues identified.\n")
for filename, issues in result_store.resstore.items():
for issue in issues:
for issue in issues:
# if the result isn't filtered out by severity
if issue.filter(sev_level, conf_level):
tmpstr_list.append("\n%s>> Issue: %s\n" % (
color.get(issue.severity, color['DEFAULT']),
issue.text
))
tmpstr_list.append(" Severity: %s Confidence: %s\n" % (
issue.severity.capitalize(),
issue.confidence.capitalize()
))
tmpstr_list.append(" Location: %s:%s\n" % (
issue.fname,
issue.lineno
))
tmpstr_list.append(color['DEFAULT'])
# if the result isn't filtered out by severity
if (result_store._check_severity(issue['issue_severity']) and
result_store._check_confidence(issue['issue_confidence'])):
tmpstr_list.append("\n%s>> Issue: %s\n" % (
color.get(issue['issue_severity'], color['DEFAULT']),
issue['issue_text']
))
tmpstr_list.append(" Severity: %s Confidence: %s\n" % (
issue['issue_severity'].capitalize(),
issue['issue_confidence'].capitalize()
))
tmpstr_list.append(" Location: %s:%s\n" % (
issue['fname'],
issue['lineno']
))
tmpstr_list.append(color['DEFAULT'])
tmpstr_list.append(
result_store._get_code(issue, True))
tmpstr_list.append(
issue.get_code(lines, True))
result = ''.join(tmpstr_list)
if result_store.out_file:
with open(result_store.out_file, 'w') as fout:
if filename:
with open(filename, 'w') as fout:
fout.write(result)
logger.info("Text output written to file: %s", result_store.out_file)
logger.info("Text output written to file: %s", filename)
else:
print(result)
def report_xml(result_store, file_list, scores, excluded_files):
'''Prints/returns warnings in XML format (Xunit compatible)
def report_xml(manager, filename, sev_level, conf_level, lines=-1,
out_format='xml'):
'''Prints issues in XML formt
:param result_store: results of scan as BanditResultStore object
:param files_list: Which files were inspected
:param scores: The scores awarded to each file in the scope
:param excluded_files: Which files were excluded from the scope
:return: A collection containing the XML data
:param manager: the bandit manager object
:param filename: The output file name, or None for stdout
:param sev_level: Filtering severity level
:param conf_level: Filtering confidence level
:param lines: Number of lines to report, -1 for all
:param out_format: The ouput format name
'''
import xml.etree.cElementTree as ET
if result_store.out_file is None:
result_store.out_file = 'bandit_results.xml'
if filename is None:
filename = 'bandit_results.xml'
items = result_store.resstore.items()
root = ET.Element('testsuite', name='bandit', tests=str(len(items)))
for filename, issues in items:
for issue in issues:
test = issue['test']
testcase = ET.SubElement(root, 'testcase',
classname=filename, name=test)
if (result_store._check_severity(issue['issue_severity']) and
result_store._check_confidence(issue['issue_confidence'])):
text = 'Severity: %s Confidence: %s\n%s\nLocation %s:%s'
text = text % (
issue['issue_severity'], issue['issue_confidence'],
issue['issue_text'], issue['fname'], issue['lineno'])
ET.SubElement(testcase, 'error',
type=issue['issue_severity'],
message=issue['issue_text']).text = text
issues = manager.get_issue_list()
root = ET.Element('testsuite', name='bandit', tests=str(len(issues)))
for issue in issues:
test = issue.test
testcase = ET.SubElement(root, 'testcase',
classname=issue.fname, name=test)
if issue.filter(sev_level, conf_level):
text = 'Severity: %s Confidence: %s\n%s\nLocation %s:%s'
text = text % (
issue.severity, issue.confidence,
issue.text, issue.fname, issue.lineno)
ET.SubElement(testcase, 'error',
type=issue.severity,
message=issue.text).text = text
tree = ET.ElementTree(root)
tree.write(result_store.out_file, encoding='utf-8', xml_declaration=True)
tree.write(filename, encoding='utf-8', xml_declaration=True)
print("XML output written to file: %s" % result_store.out_file)
print("XML output written to file: %s" % filename)

84
bandit/core/issue.py Normal file
View File

@ -0,0 +1,84 @@
# -*- coding:utf-8 -*-
#
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from bandit.core import constants
from bandit.core import utils
import linecache
class Issue(object):
def __init__(self, severity, confidence=constants.CONFIDENCE_DEFAULT,
text="", ident=None):
self.severity = severity
self.confidence = confidence
self.text = text
self.ident = ident
self.fname = ""
self.test = ""
self.lineno = -1
self.linerange = []
def __str__(self):
return "Issue: '%s' from %s: Severity: %s Confidence: %s at %s:%i" % (
self.text, (self.ident or self.test), self.severity,
self.confidence, self.fname, self.lineno)
def filter(self, confidence, severity):
'''Used to filter on confidence and severity.
This wil return false if either the confidence or severity of the issue
are lower then the given threashold values.
:param confidence: Confidence threashold
:param confidence: Severity threashold
'''
rank = constants.RANKING
return (rank.index(self.severity) >= rank.index(severity) and
rank.index(self.confidence) >= rank.index(confidence))
def get_code(self, max_lines=-1, tabbed=False):
'''Gets lines of code from a file the generated this issue.
:param max_lines: Max lines of context to return
:param tabbed: Use tabbing in the output
:return: strings of code
'''
lc = linecache
file_len = sum(1 for line in open(self.fname))
lines = utils.lines_with_context(self.lineno, self.linerange,
max_lines, file_len)
if not tabbed:
return ''.join([lc.getline(self.fname, l) for l in lines])
return ''.join(["%s\t%s" % (l, lc.getline(self.fname, l))
for l in lines])
def as_dict(self, with_code=True):
'''Convert the issue to a dict of values for outputting.'''
out = {
'filename': self.fname,
'test_name': self.test,
'issue_severity': self.severity,
'issue_confidence': self.confidence,
'issue_text': self.text,
'line_number': self.lineno,
'line_range': self.linerange,
}
if with_code:
out['code'] = self.get_code()
return out

View File

@ -20,9 +20,9 @@ import os
import sys
from bandit.core import constants as constants
from bandit.core import extension_loader
from bandit.core import meta_ast as b_meta_ast
from bandit.core import node_visitor as b_node_visitor
from bandit.core import result_store as b_result_store
from bandit.core import test_set as b_test_set
@ -51,8 +51,9 @@ class BanditManager():
self.files_list = []
self.excluded_files = []
self.b_ma = b_meta_ast.BanditMetaAst()
self.b_rs = b_result_store.BanditResultStore(self.b_conf, agg_type,
verbose)
self.skipped = []
self.results = []
self.agg_type = agg_type
# if the profile name was specified, try to find it in the config
if profile_name:
@ -76,15 +77,14 @@ class BanditManager():
self.progress = self.b_conf.get_setting('progress')
self.scores = []
def get_issue_list(self):
return self.results
@property
def has_tests(self):
return self.b_ts.has_tests
@property
def get_resultstore(self):
return self.b_rs
def results_count(self, sev_filter=None, conf_filter=None):
def results_count(self, sev_filter=0, conf_filter=0):
'''Return the count of results
:param sev_filter: Severity level to filter lower
@ -95,18 +95,8 @@ class BanditManager():
rank = constants.RANKING
for issue_file in self.b_rs.resstore:
for issue in self.b_rs.resstore[issue_file]:
if (sev_filter and
rank.index(issue['issue_severity']) < sev_filter):
# don't count if this doesn't match filter requirement
continue
if (conf_filter and
rank.index(issue['issue_confidence']) < conf_filter):
continue
for issue in self.results:
if issue.filter(rank[sev_filter], rank[conf_filter]):
count += 1
return count
@ -122,13 +112,26 @@ class BanditManager():
:param output_format: output format, 'csv', 'json', 'txt', or 'xml'
:return: -
'''
try:
formatters_mgr = extension_loader.MANAGER.formatters_mgr
try:
formatter = formatters_mgr[output_format]
except KeyError: # Unrecognized format, so use text instead
formatter = formatters_mgr['txt']
output_format = 'txt'
self.b_rs.report(
self.files_list, self.scores,
excluded_files=self.excluded_files, lines=lines,
sev_level=sev_level, conf_level=conf_level,
output_filename=output_filename, output_format=output_format
)
if output_format == 'csv':
lines = 1
elif formatter.name == 'txt' and output_filename:
output_format = 'plain'
report_func = formatter.plugin
report_func(self, filename=output_filename,
sev_level=sev_level, conf_level=conf_level,
lines=lines, out_format=output_format)
except IOError:
print("Unable to write to file: %s" % self.out_file)
def output_metaast(self):
'''Outputs all the nodes from the Meta AST.'''
@ -220,18 +223,16 @@ class BanditManager():
try:
# parse the current file
score = self._execute_ast_visitor(
fname, fdata, self.b_ma,
self.b_rs, self.b_ts
)
fname, fdata, self.b_ma, self.b_ts)
self.scores.append(score)
except KeyboardInterrupt as e:
sys.exit(2)
except IOError as e:
self.b_rs.skip(fname, e.strerror)
self.skipped.append((fname, e.strerror))
new_files_list.remove(fname)
except SyntaxError as e:
self.b_rs.skip(fname,
"syntax error while parsing AST from file")
self.skipped.append(
(fname, "syntax error while parsing AST from file"))
new_files_list.remove(fname)
if len(self.files_list) > self.progress:
@ -241,22 +242,22 @@ class BanditManager():
# reflect any files which may have been skipped
self.files_list = new_files_list
def _execute_ast_visitor(self, fname, fdata, b_ma, b_rs, b_ts):
def _execute_ast_visitor(self, fname, fdata, b_ma, b_ts):
'''Execute AST parse on each file
:param fname: The name of the file being parsed
:param fdata: The file data of the file being parsed
:param b_ma: The class Meta AST instance
:param b_rs: The class result store instance
:param b_ts: The class test set instance
:return: The accumulated test score
'''
score = []
if fdata is not None:
res = b_node_visitor.BanditNodeVisitor(
fname, self.b_conf, b_ma, b_rs, b_ts, self.debug
fname, self.b_conf, b_ma, b_ts, self.debug
)
score = res.process(fdata)
self.results.extend(res.tester.results)
return score

View File

@ -33,7 +33,7 @@ class BanditNodeVisitor(object):
'imports': None, 'import_aliases': None, 'call': None,
'function': None, 'lineno': None, 'skip_lines': None}
def __init__(self, fname, config, metaast, results, testset,
def __init__(self, fname, config, metaast, testset,
debug):
self.debug = debug
self.seen = 0
@ -45,14 +45,13 @@ class BanditNodeVisitor(object):
self.fname = fname
self.config = config
self.metaast = metaast
self.results = results
self.testset = testset
self.imports = set()
self.context_template['imports'] = self.imports
self.import_aliases = {}
self.context_template['import_aliases'] = self.import_aliases
self.tester = b_tester.BanditTester(
self.config, self.results, self.testset, self.debug
self.config, self.testset, self.debug
)
# in some cases we can't determine a qualified name

View File

@ -1,19 +0,0 @@
# -*- coding:utf-8 -*-
#
# Copyright 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from collections import namedtuple
Issue = namedtuple('Issue', 'severity confidence text')

View File

@ -1,213 +0,0 @@
# -*- coding:utf-8 -*-
#
# Copyright 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""An object to store/access results associated with Bandit tests."""
from collections import OrderedDict
import datetime
import linecache
from bandit.core import constants
from bandit.core import extension_loader
from bandit.core import utils
class BanditResultStore():
count = 0
skipped = None
def __init__(self, config, agg_type, verbose):
self.resstore = OrderedDict()
self.count = 0
self.skipped = []
self.config = config
self.agg_type = agg_type
self.sev_level = 0
self.conf_level = 0
self.max_lines = -1
self.format = 'txt'
self.out_file = None
self.verbose = verbose
self.generated_time = datetime.datetime.utcnow()
def skip(self, filename, reason):
'''Indicates that the specified file was skipped and why
:param filename: The file that was skipped
:param reason: Why the file was skipped
:return: -
'''
self.skipped.append((filename, reason))
def add(self, context, test, issue):
'''Adds a result, with the context and the issue that was found
:param context: Context of the node
:param test: The type (function name) of the test
:param issue: Which issue was found
:return: -
'''
filename = context['filename']
lineno = context['lineno']
linerange = context['linerange']
(issue_severity, issue_confidence, issue_text) = issue
if self.agg_type == 'vuln':
key = test
else:
key = filename
self.resstore.setdefault(key, []).append(
{'fname': filename,
'test': test,
'lineno': lineno,
'linerange': linerange,
'issue_severity': issue_severity,
'issue_confidence': issue_confidence,
'issue_text': issue_text})
self.count += 1
def _write_report(self, files_list, scores, excluded_files):
formatters_mgr = extension_loader.MANAGER.formatters_mgr
try:
formatter = formatters_mgr[self.format]
except KeyError: # Unrecognized format, so use text instead
formatter = formatters_mgr['txt']
if self.format == 'csv':
self.max_lines = 1
elif formatter.name == 'txt' and self.out_file:
self.format = 'plain'
report_func = formatter.plugin
report_func(self, files_list, scores, excluded_files=excluded_files)
def report(self, files_list, scores, excluded_files=None, lines=-1,
sev_level=1, conf_level=1, output_filename=None,
output_format=None):
'''Prints the contents of the result store
:param scope: Which files were inspected
:param scores: The scores awarded to each file in the scope
:param lines: # of lines around the issue line to display (optional)
:param sev_level: What level of severity to display (optional)
:param conf_level: What level of confidence to display (optional)
:param output_filename: File to output the results (optional)
:param output_format: File type to output (csv|json|txt|xml)
:return: -
'''
if not excluded_files:
excluded_files = []
if sev_level >= len(constants.RANKING):
sev_level = len(constants.RANKING) - 1
if conf_level >= len(constants.RANKING):
conf_level = len(constants.RANKING) - 1
self.sev_level = sev_level
self.conf_level = conf_level
self.max_lines = lines
self.format = output_format
self.out_file = output_filename
try:
self._write_report(files_list, scores, excluded_files)
except IOError:
print("Unable to write to file: %s" % self.out_file)
def _get_issue_list(self):
collector = list()
for group in self.resstore.items():
issue_list = group[1]
for issue in issue_list:
if (self._check_severity(issue['issue_severity']) and
self._check_confidence(issue['issue_confidence'])):
code = self._get_code(issue, True)
holder = dict({
"filename": issue['fname'],
"line_number": issue['lineno'],
"line_range": issue['linerange'],
"test_name": issue['test'],
"issue_severity": issue['issue_severity'],
"issue_confidence": issue['issue_confidence'],
"code": code,
"issue_text": issue['issue_text']
})
collector.append(holder)
return collector
def _get_code(self, issue, tabbed=False):
'''Gets lines of code from a file
:param filename: Filename of file with code in it
:param line_list: A list of integers corresponding to line numbers
:return: string of code
'''
issue_line = []
prepend = ""
file_len = sum(1 for line in open(issue['fname']))
lines = utils.lines_with_context(issue['lineno'],
issue['linerange'],
self.max_lines,
file_len)
for l in lines:
if l:
if tabbed:
prepend = "%s\t" % l
issue_line.append(prepend + linecache.getline(
issue['fname'],
l))
return ''.join(issue_line)
def _sum_scores(self, scores):
'''Get total of all scores
This just computes the sum of all recorded scores, filtering them
on the chosen minimum severity level.
:param score_list: the list of scores to total
:return: an integer total sum of all scores above the threshold
'''
total = 0
for score_type in scores:
total = total + sum(scores[score_type][self.sev_level:])
return total
def _check_severity(self, severity):
'''Check severity level
returns true if the issue severity is above the threshold.
:param severity: the severity of the issue being checked
:return: boolean result
'''
return constants.RANKING.index(severity) >= self.sev_level
def _check_confidence(self, confidence):
'''Check confidence level
returns true if the issue confidence is above the threshold.
:param confidence: the confidence of the issue being checked
:return: boolean result
'''
return constants.RANKING.index(confidence) >= self.conf_level

View File

@ -29,12 +29,9 @@ logger = logging.getLogger(__name__)
class BanditTester():
results = None
def __init__(self, config, results, testset, debug):
def __init__(self, config, testset, debug):
self.config = config
self.results = results
self.results = []
self.testset = testset
self.last_result = None
self.debug = debug
@ -75,29 +72,23 @@ class BanditTester():
else:
result = test(context)
# the test call returns a 2- or 3-tuple
# - (issue_severity, issue_text) or
# - (issue_severity, issue_confidence, issue_text)
# add default confidence level, if not returned by test
if (result is not None and len(result) == 2):
result = (
result[0],
constants.CONFIDENCE_DEFAULT,
result[1]
)
# if we have a result, record it and update scores
if result is not None:
self.results.add(temp_context, name, result)
result.fname = temp_context['filename']
result.lineno = temp_context['lineno']
result.linerange = temp_context['linerange']
result.test = test.__name__
self.results.append(result)
logger.debug(
"Issue identified by %s: %s", name, result
)
sev = constants.RANKING.index(result[0])
val = constants.RANKING_VALUES[result[0]]
sev = constants.RANKING.index(result.severity)
val = constants.RANKING_VALUES[result.severity]
scores['SEVERITY'][sev] += val
con = constants.RANKING.index(result[1])
val = constants.RANKING_VALUES[result[1]]
con = constants.RANKING.index(result.confidence)
val = constants.RANKING_VALUES[result.confidence]
scores['CONFIDENCE'][con] += val
except Exception as e:

View File

@ -87,7 +87,8 @@ def blacklist_calls(context, config):
return bandit.Issue(
severity=level, confidence=confidence,
text="%s %s" % (message, context.call_args_string)
text="%s %s" % (message, context.call_args_string),
ident=context.call_function_name_qual
)

View File

@ -174,7 +174,7 @@ class FunctionalTests(testtools.TestCase):
def test_nonsense(self):
'''Test that a syntactically invalid module is skipped.'''
self.run_example('nonsense.py')
self.assertEqual(1, len(self.b_mgr.b_rs.skipped))
self.assertEqual(1, len(self.b_mgr.skipped))
def test_okay(self):
'''Test a vulnerability-free file.'''
@ -385,27 +385,6 @@ class FunctionalTests(testtools.TestCase):
self.check_example('try_except_pass.py', expect)
def test_multiline_code(self):
'''Test issues in multiline statements return code as expected.'''
self.run_example('multiline-str.py')
self.assertEqual(0, len(self.b_mgr.b_rs.skipped))
self.assertEqual(1, len(self.b_mgr.files_list))
self.assertTrue(self.b_mgr.files_list[0].endswith('multiline-str.py'))
issues = self.b_mgr.b_rs._get_issue_list()
self.assertEqual(3, len(issues))
self.assertTrue(
issues[0]['filename'].endswith('examples/multiline-str.py')
)
self.assertEqual(4, issues[0]['line_number'])
self.assertEqual(range(2, 7), issues[0]['line_range'])
self.assertIn('/tmp', issues[0]['code'])
self.assertEqual(18, issues[1]['line_number'])
self.assertEqual(range(16, 19), issues[1]['line_range'])
self.assertIn('/tmp', issues[1]['code'])
self.assertEqual(23, issues[2]['line_number'])
self.assertEqual(range(22, 31), issues[2]['line_range'])
self.assertIn('/tmp', issues[2]['code'])
def test_weak_cryptographic_key(self):
'''Test for weak key sizes.'''
expect = {
@ -413,3 +392,25 @@ class FunctionalTests(testtools.TestCase):
'CONFIDENCE': {'HIGH': 8}
}
self.check_example('weak_cryptographic_key_sizes.py', expect)
def test_multiline_code(self):
'''Test issues in multiline statements return code as expected.'''
self.run_example('multiline-str.py')
self.assertEqual(0, len(self.b_mgr.skipped))
self.assertEqual(1, len(self.b_mgr.files_list))
self.assertTrue(self.b_mgr.files_list[0].endswith('multiline-str.py'))
issues = self.b_mgr.get_issue_list()
self.assertEqual(3, len(issues))
self.assertTrue(
issues[0].fname.endswith('examples/multiline-str.py')
)
self.assertEqual(4, issues[0].lineno)
self.assertEqual(range(2, 7), issues[0].linerange)
self.assertIn('/tmp', issues[0].get_code())
self.assertEqual(18, issues[1].lineno)
self.assertEqual(range(16, 19), issues[1].linerange)
self.assertIn('/tmp', issues[1].get_code())
self.assertEqual(23, issues[2].lineno)
self.assertEqual(range(22, 31), issues[2].linerange)
self.assertIn('/tmp', issues[2].get_code())

View File

@ -27,6 +27,7 @@ from bandit.core import constants
from bandit.core import config
from bandit.core import manager
from bandit.core import formatters
from bandit.core import issue
class FormattersTests(testtools.TestCase):
@ -41,21 +42,28 @@ class FormattersTests(testtools.TestCase):
'lineno': 4,
'linerange': [4]}
self.check_name = 'hardcoded_bind_all_interfaces'
self.issue = (bandit.MEDIUM, bandit.MEDIUM,
self.issue = issue.Issue(bandit.MEDIUM, bandit.MEDIUM,
'Possible binding to all interfaces.')
self.manager.b_rs.out_file = self.tmp_fname
self.manager.b_rs.add(self.context, self.check_name, self.issue)
self.manager.out_file = self.tmp_fname
self.issue.fname = self.context['filename']
self.issue.lineno = self.context['lineno']
self.issue.linerange = self.context['linerange']
self.issue.test = self.check_name
self.manager.results.append(self.issue)
def test_report_csv(self):
formatters.report_csv(self.manager.b_rs, None, None, None)
formatters.report_csv(self.manager, self.tmp_fname,
self.issue.severity, self.issue.confidence)
with open(self.tmp_fname) as f:
reader = csv.DictReader(f)
data = six.next(reader)
self.assertEqual(self.tmp_fname, data['filename'])
self.assertEqual(self.issue[0], data['issue_severity'])
self.assertEqual(self.issue[1], data['issue_confidence'])
self.assertEqual(self.issue[2], data['issue_text'])
self.assertEqual(self.issue.severity, data['issue_severity'])
self.assertEqual(self.issue.confidence, data['issue_confidence'])
self.assertEqual(self.issue.text, data['issue_text'])
self.assertEqual(six.text_type(self.context['lineno']),
data['line_number'])
self.assertEqual(six.text_type(self.context['linerange']),
@ -63,21 +71,22 @@ class FormattersTests(testtools.TestCase):
self.assertEqual(self.check_name, data['test_name'])
def test_report_json(self):
file_list = ['binding.py']
scores = [{'SEVERITY': [0] * len(constants.RANKING),
'CONFIDENCE': [0] * len(constants.RANKING)}]
self.manager.files_list = ['binding.py']
self.manager.scores = [{'SEVERITY': [0] * len(constants.RANKING),
'CONFIDENCE': [0] * len(constants.RANKING)}]
formatters.report_json(self.manager.b_rs, file_list, scores, None)
formatters.report_json(self.manager, self.tmp_fname,
self.issue.severity, self.issue.confidence)
with open(self.tmp_fname) as f:
data = json.loads(f.read())
self.assertIsNotNone(data['generated_at'])
self.assertEqual(self.tmp_fname, data['results'][0]['filename'])
self.assertEqual(self.issue[0],
self.assertEqual(self.issue.severity,
data['results'][0]['issue_severity'])
self.assertEqual(self.issue[1],
self.assertEqual(self.issue.confidence,
data['results'][0]['issue_confidence'])
self.assertEqual(self.issue[2], data['results'][0]['issue_text'])
self.assertEqual(self.issue.text, data['results'][0]['issue_text'])
self.assertEqual(self.context['lineno'],
data['results'][0]['line_number'])
self.assertEqual(self.context['linerange'],
@ -87,21 +96,22 @@ class FormattersTests(testtools.TestCase):
self.assertEqual(0, data['stats'][0]['score'])
def test_report_text(self):
self.manager.b_rs.format = 'txt'
self.manager.b_rs.verbose = True
self.manager.verbose = True
file_list = ['binding.py']
scores = [{'SEVERITY': [0] * len(constants.RANKING),
'CONFIDENCE': [0] * len(constants.RANKING)}]
exc_files = ['test_binding.py']
formatters.report_text(self.manager.b_rs, file_list, scores, exc_files)
formatters.report_text(self.manager, self.tmp_fname,
self.issue.severity, self.issue.confidence)
with open(self.tmp_fname) as f:
data = f.read()
expected = '>> Issue: %s' % self.issue[2]
expected = '>> Issue: %s' % self.issue.text
self.assertIn(expected, data)
expected = ' Severity: %s Confidence: %s' % (
self.issue[0].capitalize(), self.issue[1].capitalize())
self.issue.severity.capitalize(),
self.issue.confidence.capitalize())
self.assertIn(expected, data)
expected = ' Location: %s:%d' % (self.tmp_fname,
self.context['lineno'])
@ -128,13 +138,14 @@ class FormattersTests(testtools.TestCase):
return d
def test_report_xml(self):
formatters.report_xml(self.manager.b_rs, None, None, None)
formatters.report_xml(self.manager, self.tmp_fname,
self.issue.severity, self.issue.confidence)
with open(self.tmp_fname) as f:
data = self._xml_to_dict(ET.XML(f.read()))
self.assertEqual(self.tmp_fname,
data['testsuite']['testcase']['@classname'])
self.assertEqual(self.issue[2],
self.assertEqual(self.issue.text,
data['testsuite']['testcase']['error']['@message'])
self.assertEqual(self.check_name,
data['testsuite']['testcase']['@name'])