From 9e522d3322295b6236315f4f1eabe47f2c2a58ff Mon Sep 17 00:00:00 2001 From: Tim Kelsey Date: Wed, 2 Sep 2015 23:58:25 +0100 Subject: [PATCH] Simplifying Result Store This change removes the results store, instead we just use a list of issues. This makes things much simpler, any code that was of value in the results store has been moved into the new Issue class (replaceing the old named tuple) or into the manager. Change-Id: I595a047f9df44b0e7df763d7c87624c9579da83c --- bandit/__init__.py | 3 +- bandit/bandit.py | 8 +- bandit/core/__init__.py | 3 +- bandit/core/formatters.py | 232 +++++++++++++++------------- bandit/core/issue.py | 84 ++++++++++ bandit/core/manager.py | 71 ++++----- bandit/core/node_visitor.py | 5 +- bandit/core/objects.py | 19 --- bandit/core/result_store.py | 213 ------------------------- bandit/core/tester.py | 35 ++--- bandit/plugins/blacklist_calls.py | 3 +- tests/functional/test_functional.py | 45 +++--- tests/unit/core/test_formatters.py | 53 ++++--- 13 files changed, 322 insertions(+), 452 deletions(-) create mode 100644 bandit/core/issue.py delete mode 100644 bandit/core/objects.py delete mode 100644 bandit/core/result_store.py diff --git a/bandit/__init__.py b/bandit/__init__.py index 247876e7..5abded84 100644 --- a/bandit/__init__.py +++ b/bandit/__init__.py @@ -24,10 +24,9 @@ from bandit.core import context # noqa from bandit.core import manager # noqa from bandit.core import meta_ast # noqa from bandit.core import node_visitor # noqa -from bandit.core import result_store # noqa from bandit.core import test_set # noqa from bandit.core import tester # noqa from bandit.core import utils # noqa from bandit.core.constants import * # noqa -from bandit.core.objects import * # noqa +from bandit.core.issue import * # noqa from bandit.core.test_properties import * # noqa diff --git a/bandit/bandit.py b/bandit/bandit.py index 1c8eca7d..a6d7b254 100644 --- a/bandit/bandit.py +++ b/bandit/bandit.py @@ -24,9 +24,11 @@ import sysconfig import appdirs from bandit.core import config as b_config +from bandit.core import constants from bandit.core import manager as b_manager from bandit.core import utils + BASE_CONFIG = 'bandit.yaml' logger = logging.getLogger() @@ -230,8 +232,10 @@ def main(): b_mgr.output_metaast() # trigger output of results by Bandit Manager - b_mgr.output_results(args.context_lines, args.severity - 1, - args.confidence - 1, args.output_file, + b_mgr.output_results(args.context_lines, + constants.RANKING[args.severity - 1], + constants.RANKING[args.confidence - 1], + args.output_file, args.output_format) # return an exit code of 1 if there are results, 0 otherwise diff --git a/bandit/core/__init__.py b/bandit/core/__init__.py index 93db1437..319eae59 100644 --- a/bandit/core/__init__.py +++ b/bandit/core/__init__.py @@ -19,10 +19,9 @@ from bandit.core import context # noqa from bandit.core import manager # noqa from bandit.core import meta_ast # noqa from bandit.core import node_visitor # noqa -from bandit.core import result_store # noqa from bandit.core import test_set # noqa from bandit.core import tester # noqa from bandit.core import utils # noqa from bandit.core.constants import * # noqa -from bandit.core.objects import * # noqa +from bandit.core.issue import * # noqa from bandit.core.test_properties import * # noqa diff --git a/bandit/core/formatters.py b/bandit/core/formatters.py index 5f804625..6d26a842 100644 --- a/bandit/core/formatters.py +++ b/bandit/core/formatters.py @@ -26,28 +26,32 @@ from bandit.core import constants logger = logging.getLogger(__name__) -def report_csv(result_store, file_list, scores, excluded_files): - '''Prints/returns warnings in JSON format +def _sum_scores(manager, sev): + summation = 0 + for scores in manager.scores: + summation += sum(scores['CONFIDENCE'][sev:]) + summation += sum(scores['SEVERITY'][sev:]) + return summation - :param result_store: results of scan as BanditResultStore object - :param files_list: Which files were inspected - :param scores: The scores awarded to each file in the scope - :param excluded_files: Which files were excluded from the scope - :return: A collection containing the CSV data + +def report_csv(manager, filename, sev_level, conf_level, lines=-1, + out_format='csv'): + '''Prints issues in CSV format + + :param manager: the bandit manager object + :param filename: The output file name, or None for stdout + :param sev_level: Filtering severity level + :param conf_level: Filtering confidence level + :param lines: Number of lines to report, -1 for all + :param out_format: The ouput format name ''' - results = result_store._get_issue_list() + results = manager.get_issue_list() - # Remove the code from all the issues in the list, as we will not - # be including it in the CSV data. - def del_code(issue): - del issue['code'] - map(del_code, results) + if filename is None: + filename = 'bandit_results.csv' - if result_store.out_file is None: - result_store.out_file = 'bandit_results.csv' - - with open(result_store.out_file, 'w') as fout: + with open(filename, 'w') as fout: fieldnames = ['filename', 'test_name', 'issue_severity', @@ -59,33 +63,37 @@ def report_csv(result_store, file_list, scores, excluded_files): writer = csv.DictWriter(fout, fieldnames=fieldnames, extrasaction='ignore') writer.writeheader() - writer.writerows(results) + for result in results: + if result.filter(sev_level, conf_level): + writer.writerow(result.as_dict(with_code=False)) - print("CSV output written to file: %s" % result_store.out_file) + print("CSV output written to file: %s" % filename) -def report_json(result_store, file_list, scores, excluded_files): - '''Prints/returns warnings in JSON format +def report_json(manager, filename, sev_level, conf_level, lines=-1, + out_format='json'): + '''''Prints issues in JSON format - :param result_store: results of scan as BanditResultStore object - :param files_list: Which files were inspected - :param scores: The scores awarded to each file in the scope - :param excluded_files: Which files were excluded from the scope - :return: JSON string + :param manager: the bandit manager object + :param filename: The output file name, or None for stdout + :param sev_level: Filtering severity level + :param conf_level: Filtering confidence level + :param lines: Number of lines to report, -1 for all + :param out_format: The ouput format name ''' - stats = dict(zip(file_list, scores)) - + stats = dict(zip(manager.files_list, manager.scores)) machine_output = dict({'results': [], 'errors': [], 'stats': []}) - collector = list() - for (fname, reason) in result_store.skipped: + for (fname, reason) in manager.skipped: machine_output['errors'].append({'filename': fname, - 'reason': reason}) + 'reason': reason}) for filer, score in six.iteritems(stats): totals = {} - for i in range(result_store.sev_level, len(constants.RANKING)): - severity = constants.RANKING[i] + rank = constants.RANKING + sev_idx = rank.index(sev_level) + for i in range(sev_idx, len(rank)): + severity = rank[i] severity_value = constants.RANKING_VALUES[severity] try: sc = score['SEVERITY'][i] / severity_value @@ -95,14 +103,18 @@ def report_json(result_store, file_list, scores, excluded_files): machine_output['stats'].append({ 'filename': filer, - 'score': result_store._sum_scores(score), + 'score': _sum_scores(manager, sev_idx), 'issue totals': totals}) - collector = result_store._get_issue_list() + results = manager.get_issue_list() + collector = [] + for result in results: + if result.filter(sev_level, conf_level): + collector.append(result.as_dict()) - if result_store.agg_type == 'vuln': + if manager.agg_type == 'vuln': machine_output['results'] = sorted(collector, - key=itemgetter('error_type')) + key=itemgetter('test_name')) else: machine_output['results'] = sorted(collector, key=itemgetter('filename')) @@ -110,29 +122,30 @@ def report_json(result_store, file_list, scores, excluded_files): # timezone agnostic format TS_FORMAT = "%Y-%m-%dT%H:%M:%SZ" - time_string = result_store.generated_time.strftime(TS_FORMAT) + time_string = datetime.datetime.utcnow().strftime(TS_FORMAT) machine_output['generated_at'] = time_string result = json.dumps(machine_output, sort_keys=True, indent=2, separators=(',', ': ')) - if result_store.out_file: - with open(result_store.out_file, 'w') as fout: + if filename: + with open(filename, 'w') as fout: fout.write(result) - # XXX: Should this be log output? (ukbelch) - print("JSON output written to file: %s" % result_store.out_file) + logger.info("JSON output written to file: %s" % filename) else: print(result) -def report_text(result_store, files_list, scores, excluded_files): - '''Prints the contents of the result store +def report_text(manager, filename, sev_level, conf_level, lines=-1, + out_format='txt'): + '''Prints issues in Text formt - :param result_store: results of scan as BanditResultStore object - :param files_list: Which files were inspected - :param scores: The scores awarded to each file in the scope - :param excluded_files: List of files excluded from the scope - :return: TXT string with appropriate TTY coloring for terminals + :param manager: the bandit manager object + :param filename: The output file name, or None for stdout + :param sev_level: Filtering severity level + :param conf_level: Filtering confidence level + :param lines: Number of lines to report, -1 for all + :param out_format: The ouput format name ''' tmpstr_list = [] @@ -140,9 +153,9 @@ def report_text(result_store, files_list, scores, excluded_files): # use a defaultdict to default to an empty string color = collections.defaultdict(str) - if result_store.format == 'txt': + if out_format == 'txt': # get text colors from settings for TTY output - get_setting = result_store.config.get_setting + get_setting = manager.b_conf.get_setting color = {'HEADER': get_setting('color_HEADER'), 'DEFAULT': get_setting('color_DEFAULT'), 'LOW': get_setting('color_LOW'), @@ -157,30 +170,30 @@ def report_text(result_store, files_list, scores, excluded_files): datetime.datetime.utcnow() )) - if result_store.verbose: + if manager.verbose: # print which files were inspected tmpstr_list.append("\n%sFiles in scope (%s):%s\n" % ( - color['HEADER'], len(files_list), + color['HEADER'], len(manager.files_list), color['DEFAULT'] )) - for item in zip(files_list, map(result_store._sum_scores, scores)): + for item in zip(manager.files_list, map(_sum_scores, manager.scores)): tmpstr_list.append("\t%s (score: %i)\n" % item) # print which files were excluded and why tmpstr_list.append("\n%sFiles excluded (%s):%s\n" % - (color['HEADER'], len(excluded_files), + (color['HEADER'], len(manager.skipped), color['DEFAULT'])) - for fname in excluded_files: + for fname in manager.skipped: tmpstr_list.append("\t%s\n" % fname) # print which files were skipped and why tmpstr_list.append("\n%sFiles skipped (%s):%s\n" % ( - color['HEADER'], len(result_store.skipped), + color['HEADER'], len(manager.skipped), color['DEFAULT'] )) - for (fname, reason) in result_store.skipped: + for (fname, reason) in manager.skipped: tmpstr_list.append("\t%s (%s)\n" % (fname, reason)) # print the results @@ -188,75 +201,74 @@ def report_text(result_store, files_list, scores, excluded_files): color['HEADER'], color['DEFAULT'] )) - if result_store.count == 0: + issues = manager.get_issue_list() + if not len(issues): tmpstr_list.append("\tNo issues identified.\n") - for filename, issues in result_store.resstore.items(): - for issue in issues: + for issue in issues: + # if the result isn't filtered out by severity + if issue.filter(sev_level, conf_level): + tmpstr_list.append("\n%s>> Issue: %s\n" % ( + color.get(issue.severity, color['DEFAULT']), + issue.text + )) + tmpstr_list.append(" Severity: %s Confidence: %s\n" % ( + issue.severity.capitalize(), + issue.confidence.capitalize() + )) + tmpstr_list.append(" Location: %s:%s\n" % ( + issue.fname, + issue.lineno + )) + tmpstr_list.append(color['DEFAULT']) - # if the result isn't filtered out by severity - if (result_store._check_severity(issue['issue_severity']) and - result_store._check_confidence(issue['issue_confidence'])): - tmpstr_list.append("\n%s>> Issue: %s\n" % ( - color.get(issue['issue_severity'], color['DEFAULT']), - issue['issue_text'] - )) - tmpstr_list.append(" Severity: %s Confidence: %s\n" % ( - issue['issue_severity'].capitalize(), - issue['issue_confidence'].capitalize() - )) - tmpstr_list.append(" Location: %s:%s\n" % ( - issue['fname'], - issue['lineno'] - )) - tmpstr_list.append(color['DEFAULT']) - - tmpstr_list.append( - result_store._get_code(issue, True)) + tmpstr_list.append( + issue.get_code(lines, True)) result = ''.join(tmpstr_list) - if result_store.out_file: - with open(result_store.out_file, 'w') as fout: + if filename: + with open(filename, 'w') as fout: fout.write(result) - logger.info("Text output written to file: %s", result_store.out_file) + logger.info("Text output written to file: %s", filename) else: print(result) -def report_xml(result_store, file_list, scores, excluded_files): - '''Prints/returns warnings in XML format (Xunit compatible) +def report_xml(manager, filename, sev_level, conf_level, lines=-1, + out_format='xml'): + '''Prints issues in XML formt - :param result_store: results of scan as BanditResultStore object - :param files_list: Which files were inspected - :param scores: The scores awarded to each file in the scope - :param excluded_files: Which files were excluded from the scope - :return: A collection containing the XML data + :param manager: the bandit manager object + :param filename: The output file name, or None for stdout + :param sev_level: Filtering severity level + :param conf_level: Filtering confidence level + :param lines: Number of lines to report, -1 for all + :param out_format: The ouput format name ''' import xml.etree.cElementTree as ET - if result_store.out_file is None: - result_store.out_file = 'bandit_results.xml' + if filename is None: + filename = 'bandit_results.xml' - items = result_store.resstore.items() - root = ET.Element('testsuite', name='bandit', tests=str(len(items))) - for filename, issues in items: - for issue in issues: - test = issue['test'] - testcase = ET.SubElement(root, 'testcase', - classname=filename, name=test) - if (result_store._check_severity(issue['issue_severity']) and - result_store._check_confidence(issue['issue_confidence'])): - text = 'Severity: %s Confidence: %s\n%s\nLocation %s:%s' - text = text % ( - issue['issue_severity'], issue['issue_confidence'], - issue['issue_text'], issue['fname'], issue['lineno']) - ET.SubElement(testcase, 'error', - type=issue['issue_severity'], - message=issue['issue_text']).text = text + issues = manager.get_issue_list() + root = ET.Element('testsuite', name='bandit', tests=str(len(issues))) + + for issue in issues: + test = issue.test + testcase = ET.SubElement(root, 'testcase', + classname=issue.fname, name=test) + if issue.filter(sev_level, conf_level): + text = 'Severity: %s Confidence: %s\n%s\nLocation %s:%s' + text = text % ( + issue.severity, issue.confidence, + issue.text, issue.fname, issue.lineno) + ET.SubElement(testcase, 'error', + type=issue.severity, + message=issue.text).text = text tree = ET.ElementTree(root) - tree.write(result_store.out_file, encoding='utf-8', xml_declaration=True) + tree.write(filename, encoding='utf-8', xml_declaration=True) - print("XML output written to file: %s" % result_store.out_file) + print("XML output written to file: %s" % filename) diff --git a/bandit/core/issue.py b/bandit/core/issue.py new file mode 100644 index 00000000..c7c5f57a --- /dev/null +++ b/bandit/core/issue.py @@ -0,0 +1,84 @@ +# -*- coding:utf-8 -*- +# +# Copyright 2015 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from bandit.core import constants +from bandit.core import utils + +import linecache + + +class Issue(object): + def __init__(self, severity, confidence=constants.CONFIDENCE_DEFAULT, + text="", ident=None): + self.severity = severity + self.confidence = confidence + self.text = text + self.ident = ident + self.fname = "" + self.test = "" + self.lineno = -1 + self.linerange = [] + + def __str__(self): + return "Issue: '%s' from %s: Severity: %s Confidence: %s at %s:%i" % ( + self.text, (self.ident or self.test), self.severity, + self.confidence, self.fname, self.lineno) + + def filter(self, confidence, severity): + '''Used to filter on confidence and severity. + + This wil return false if either the confidence or severity of the issue + are lower then the given threashold values. + + :param confidence: Confidence threashold + :param confidence: Severity threashold + ''' + rank = constants.RANKING + return (rank.index(self.severity) >= rank.index(severity) and + rank.index(self.confidence) >= rank.index(confidence)) + + def get_code(self, max_lines=-1, tabbed=False): + '''Gets lines of code from a file the generated this issue. + + :param max_lines: Max lines of context to return + :param tabbed: Use tabbing in the output + :return: strings of code + ''' + lc = linecache + file_len = sum(1 for line in open(self.fname)) + lines = utils.lines_with_context(self.lineno, self.linerange, + max_lines, file_len) + + if not tabbed: + return ''.join([lc.getline(self.fname, l) for l in lines]) + return ''.join(["%s\t%s" % (l, lc.getline(self.fname, l)) + for l in lines]) + + def as_dict(self, with_code=True): + '''Convert the issue to a dict of values for outputting.''' + out = { + 'filename': self.fname, + 'test_name': self.test, + 'issue_severity': self.severity, + 'issue_confidence': self.confidence, + 'issue_text': self.text, + 'line_number': self.lineno, + 'line_range': self.linerange, + } + + if with_code: + out['code'] = self.get_code() + return out diff --git a/bandit/core/manager.py b/bandit/core/manager.py index c17c37c1..d61cd256 100644 --- a/bandit/core/manager.py +++ b/bandit/core/manager.py @@ -20,9 +20,9 @@ import os import sys from bandit.core import constants as constants +from bandit.core import extension_loader from bandit.core import meta_ast as b_meta_ast from bandit.core import node_visitor as b_node_visitor -from bandit.core import result_store as b_result_store from bandit.core import test_set as b_test_set @@ -51,8 +51,9 @@ class BanditManager(): self.files_list = [] self.excluded_files = [] self.b_ma = b_meta_ast.BanditMetaAst() - self.b_rs = b_result_store.BanditResultStore(self.b_conf, agg_type, - verbose) + self.skipped = [] + self.results = [] + self.agg_type = agg_type # if the profile name was specified, try to find it in the config if profile_name: @@ -76,15 +77,14 @@ class BanditManager(): self.progress = self.b_conf.get_setting('progress') self.scores = [] + def get_issue_list(self): + return self.results + @property def has_tests(self): return self.b_ts.has_tests - @property - def get_resultstore(self): - return self.b_rs - - def results_count(self, sev_filter=None, conf_filter=None): + def results_count(self, sev_filter=0, conf_filter=0): '''Return the count of results :param sev_filter: Severity level to filter lower @@ -95,18 +95,8 @@ class BanditManager(): rank = constants.RANKING - for issue_file in self.b_rs.resstore: - for issue in self.b_rs.resstore[issue_file]: - - if (sev_filter and - rank.index(issue['issue_severity']) < sev_filter): - # don't count if this doesn't match filter requirement - continue - - if (conf_filter and - rank.index(issue['issue_confidence']) < conf_filter): - continue - + for issue in self.results: + if issue.filter(rank[sev_filter], rank[conf_filter]): count += 1 return count @@ -122,13 +112,26 @@ class BanditManager(): :param output_format: output format, 'csv', 'json', 'txt', or 'xml' :return: - ''' + try: + formatters_mgr = extension_loader.MANAGER.formatters_mgr + try: + formatter = formatters_mgr[output_format] + except KeyError: # Unrecognized format, so use text instead + formatter = formatters_mgr['txt'] + output_format = 'txt' - self.b_rs.report( - self.files_list, self.scores, - excluded_files=self.excluded_files, lines=lines, - sev_level=sev_level, conf_level=conf_level, - output_filename=output_filename, output_format=output_format - ) + if output_format == 'csv': + lines = 1 + elif formatter.name == 'txt' and output_filename: + output_format = 'plain' + + report_func = formatter.plugin + report_func(self, filename=output_filename, + sev_level=sev_level, conf_level=conf_level, + lines=lines, out_format=output_format) + + except IOError: + print("Unable to write to file: %s" % self.out_file) def output_metaast(self): '''Outputs all the nodes from the Meta AST.''' @@ -220,18 +223,16 @@ class BanditManager(): try: # parse the current file score = self._execute_ast_visitor( - fname, fdata, self.b_ma, - self.b_rs, self.b_ts - ) + fname, fdata, self.b_ma, self.b_ts) self.scores.append(score) except KeyboardInterrupt as e: sys.exit(2) except IOError as e: - self.b_rs.skip(fname, e.strerror) + self.skipped.append((fname, e.strerror)) new_files_list.remove(fname) except SyntaxError as e: - self.b_rs.skip(fname, - "syntax error while parsing AST from file") + self.skipped.append( + (fname, "syntax error while parsing AST from file")) new_files_list.remove(fname) if len(self.files_list) > self.progress: @@ -241,22 +242,22 @@ class BanditManager(): # reflect any files which may have been skipped self.files_list = new_files_list - def _execute_ast_visitor(self, fname, fdata, b_ma, b_rs, b_ts): + def _execute_ast_visitor(self, fname, fdata, b_ma, b_ts): '''Execute AST parse on each file :param fname: The name of the file being parsed :param fdata: The file data of the file being parsed :param b_ma: The class Meta AST instance - :param b_rs: The class result store instance :param b_ts: The class test set instance :return: The accumulated test score ''' score = [] if fdata is not None: res = b_node_visitor.BanditNodeVisitor( - fname, self.b_conf, b_ma, b_rs, b_ts, self.debug + fname, self.b_conf, b_ma, b_ts, self.debug ) score = res.process(fdata) + self.results.extend(res.tester.results) return score diff --git a/bandit/core/node_visitor.py b/bandit/core/node_visitor.py index 801a0f23..3f83496a 100755 --- a/bandit/core/node_visitor.py +++ b/bandit/core/node_visitor.py @@ -33,7 +33,7 @@ class BanditNodeVisitor(object): 'imports': None, 'import_aliases': None, 'call': None, 'function': None, 'lineno': None, 'skip_lines': None} - def __init__(self, fname, config, metaast, results, testset, + def __init__(self, fname, config, metaast, testset, debug): self.debug = debug self.seen = 0 @@ -45,14 +45,13 @@ class BanditNodeVisitor(object): self.fname = fname self.config = config self.metaast = metaast - self.results = results self.testset = testset self.imports = set() self.context_template['imports'] = self.imports self.import_aliases = {} self.context_template['import_aliases'] = self.import_aliases self.tester = b_tester.BanditTester( - self.config, self.results, self.testset, self.debug + self.config, self.testset, self.debug ) # in some cases we can't determine a qualified name diff --git a/bandit/core/objects.py b/bandit/core/objects.py deleted file mode 100644 index 5530e6ea..00000000 --- a/bandit/core/objects.py +++ /dev/null @@ -1,19 +0,0 @@ -# -*- coding:utf-8 -*- -# -# Copyright 2014 Hewlett-Packard Development Company, L.P. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from collections import namedtuple - -Issue = namedtuple('Issue', 'severity confidence text') diff --git a/bandit/core/result_store.py b/bandit/core/result_store.py deleted file mode 100644 index 6e45390e..00000000 --- a/bandit/core/result_store.py +++ /dev/null @@ -1,213 +0,0 @@ -# -*- coding:utf-8 -*- -# -# Copyright 2014 Hewlett-Packard Development Company, L.P. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - - -"""An object to store/access results associated with Bandit tests.""" - -from collections import OrderedDict -import datetime -import linecache - -from bandit.core import constants -from bandit.core import extension_loader -from bandit.core import utils - - -class BanditResultStore(): - count = 0 - skipped = None - - def __init__(self, config, agg_type, verbose): - self.resstore = OrderedDict() - self.count = 0 - self.skipped = [] - self.config = config - self.agg_type = agg_type - self.sev_level = 0 - self.conf_level = 0 - self.max_lines = -1 - self.format = 'txt' - self.out_file = None - self.verbose = verbose - self.generated_time = datetime.datetime.utcnow() - - def skip(self, filename, reason): - '''Indicates that the specified file was skipped and why - - :param filename: The file that was skipped - :param reason: Why the file was skipped - :return: - - ''' - self.skipped.append((filename, reason)) - - def add(self, context, test, issue): - '''Adds a result, with the context and the issue that was found - - :param context: Context of the node - :param test: The type (function name) of the test - :param issue: Which issue was found - :return: - - ''' - filename = context['filename'] - lineno = context['lineno'] - linerange = context['linerange'] - (issue_severity, issue_confidence, issue_text) = issue - - if self.agg_type == 'vuln': - key = test - else: - key = filename - - self.resstore.setdefault(key, []).append( - {'fname': filename, - 'test': test, - 'lineno': lineno, - 'linerange': linerange, - 'issue_severity': issue_severity, - 'issue_confidence': issue_confidence, - 'issue_text': issue_text}) - - self.count += 1 - - def _write_report(self, files_list, scores, excluded_files): - formatters_mgr = extension_loader.MANAGER.formatters_mgr - try: - formatter = formatters_mgr[self.format] - except KeyError: # Unrecognized format, so use text instead - formatter = formatters_mgr['txt'] - - if self.format == 'csv': - self.max_lines = 1 - elif formatter.name == 'txt' and self.out_file: - self.format = 'plain' - - report_func = formatter.plugin - report_func(self, files_list, scores, excluded_files=excluded_files) - - def report(self, files_list, scores, excluded_files=None, lines=-1, - sev_level=1, conf_level=1, output_filename=None, - output_format=None): - '''Prints the contents of the result store - - :param scope: Which files were inspected - :param scores: The scores awarded to each file in the scope - :param lines: # of lines around the issue line to display (optional) - :param sev_level: What level of severity to display (optional) - :param conf_level: What level of confidence to display (optional) - :param output_filename: File to output the results (optional) - :param output_format: File type to output (csv|json|txt|xml) - :return: - - ''' - - if not excluded_files: - excluded_files = [] - - if sev_level >= len(constants.RANKING): - sev_level = len(constants.RANKING) - 1 - if conf_level >= len(constants.RANKING): - conf_level = len(constants.RANKING) - 1 - - self.sev_level = sev_level - self.conf_level = conf_level - self.max_lines = lines - self.format = output_format - self.out_file = output_filename - - try: - self._write_report(files_list, scores, excluded_files) - except IOError: - print("Unable to write to file: %s" % self.out_file) - - def _get_issue_list(self): - - collector = list() - - for group in self.resstore.items(): - issue_list = group[1] - for issue in issue_list: - if (self._check_severity(issue['issue_severity']) and - self._check_confidence(issue['issue_confidence'])): - code = self._get_code(issue, True) - holder = dict({ - "filename": issue['fname'], - "line_number": issue['lineno'], - "line_range": issue['linerange'], - "test_name": issue['test'], - "issue_severity": issue['issue_severity'], - "issue_confidence": issue['issue_confidence'], - "code": code, - "issue_text": issue['issue_text'] - }) - collector.append(holder) - - return collector - - def _get_code(self, issue, tabbed=False): - '''Gets lines of code from a file - - :param filename: Filename of file with code in it - :param line_list: A list of integers corresponding to line numbers - :return: string of code - ''' - issue_line = [] - prepend = "" - - file_len = sum(1 for line in open(issue['fname'])) - lines = utils.lines_with_context(issue['lineno'], - issue['linerange'], - self.max_lines, - file_len) - - for l in lines: - if l: - if tabbed: - prepend = "%s\t" % l - issue_line.append(prepend + linecache.getline( - issue['fname'], - l)) - - return ''.join(issue_line) - - def _sum_scores(self, scores): - '''Get total of all scores - - This just computes the sum of all recorded scores, filtering them - on the chosen minimum severity level. - :param score_list: the list of scores to total - :return: an integer total sum of all scores above the threshold - ''' - total = 0 - for score_type in scores: - total = total + sum(scores[score_type][self.sev_level:]) - return total - - def _check_severity(self, severity): - '''Check severity level - - returns true if the issue severity is above the threshold. - :param severity: the severity of the issue being checked - :return: boolean result - ''' - return constants.RANKING.index(severity) >= self.sev_level - - def _check_confidence(self, confidence): - '''Check confidence level - - returns true if the issue confidence is above the threshold. - :param confidence: the confidence of the issue being checked - :return: boolean result - ''' - return constants.RANKING.index(confidence) >= self.conf_level diff --git a/bandit/core/tester.py b/bandit/core/tester.py index 472e84aa..fe098122 100644 --- a/bandit/core/tester.py +++ b/bandit/core/tester.py @@ -29,12 +29,9 @@ logger = logging.getLogger(__name__) class BanditTester(): - - results = None - - def __init__(self, config, results, testset, debug): + def __init__(self, config, testset, debug): self.config = config - self.results = results + self.results = [] self.testset = testset self.last_result = None self.debug = debug @@ -75,29 +72,23 @@ class BanditTester(): else: result = test(context) - # the test call returns a 2- or 3-tuple - # - (issue_severity, issue_text) or - # - (issue_severity, issue_confidence, issue_text) - - # add default confidence level, if not returned by test - if (result is not None and len(result) == 2): - result = ( - result[0], - constants.CONFIDENCE_DEFAULT, - result[1] - ) - # if we have a result, record it and update scores if result is not None: - self.results.add(temp_context, name, result) + result.fname = temp_context['filename'] + result.lineno = temp_context['lineno'] + result.linerange = temp_context['linerange'] + result.test = test.__name__ + + self.results.append(result) + logger.debug( "Issue identified by %s: %s", name, result ) - sev = constants.RANKING.index(result[0]) - val = constants.RANKING_VALUES[result[0]] + sev = constants.RANKING.index(result.severity) + val = constants.RANKING_VALUES[result.severity] scores['SEVERITY'][sev] += val - con = constants.RANKING.index(result[1]) - val = constants.RANKING_VALUES[result[1]] + con = constants.RANKING.index(result.confidence) + val = constants.RANKING_VALUES[result.confidence] scores['CONFIDENCE'][con] += val except Exception as e: diff --git a/bandit/plugins/blacklist_calls.py b/bandit/plugins/blacklist_calls.py index 937f9529..d961e1c1 100644 --- a/bandit/plugins/blacklist_calls.py +++ b/bandit/plugins/blacklist_calls.py @@ -87,7 +87,8 @@ def blacklist_calls(context, config): return bandit.Issue( severity=level, confidence=confidence, - text="%s %s" % (message, context.call_args_string) + text="%s %s" % (message, context.call_args_string), + ident=context.call_function_name_qual ) diff --git a/tests/functional/test_functional.py b/tests/functional/test_functional.py index d43e6076..913ab1dc 100644 --- a/tests/functional/test_functional.py +++ b/tests/functional/test_functional.py @@ -174,7 +174,7 @@ class FunctionalTests(testtools.TestCase): def test_nonsense(self): '''Test that a syntactically invalid module is skipped.''' self.run_example('nonsense.py') - self.assertEqual(1, len(self.b_mgr.b_rs.skipped)) + self.assertEqual(1, len(self.b_mgr.skipped)) def test_okay(self): '''Test a vulnerability-free file.''' @@ -385,27 +385,6 @@ class FunctionalTests(testtools.TestCase): self.check_example('try_except_pass.py', expect) - def test_multiline_code(self): - '''Test issues in multiline statements return code as expected.''' - self.run_example('multiline-str.py') - self.assertEqual(0, len(self.b_mgr.b_rs.skipped)) - self.assertEqual(1, len(self.b_mgr.files_list)) - self.assertTrue(self.b_mgr.files_list[0].endswith('multiline-str.py')) - issues = self.b_mgr.b_rs._get_issue_list() - self.assertEqual(3, len(issues)) - self.assertTrue( - issues[0]['filename'].endswith('examples/multiline-str.py') - ) - self.assertEqual(4, issues[0]['line_number']) - self.assertEqual(range(2, 7), issues[0]['line_range']) - self.assertIn('/tmp', issues[0]['code']) - self.assertEqual(18, issues[1]['line_number']) - self.assertEqual(range(16, 19), issues[1]['line_range']) - self.assertIn('/tmp', issues[1]['code']) - self.assertEqual(23, issues[2]['line_number']) - self.assertEqual(range(22, 31), issues[2]['line_range']) - self.assertIn('/tmp', issues[2]['code']) - def test_weak_cryptographic_key(self): '''Test for weak key sizes.''' expect = { @@ -413,3 +392,25 @@ class FunctionalTests(testtools.TestCase): 'CONFIDENCE': {'HIGH': 8} } self.check_example('weak_cryptographic_key_sizes.py', expect) + + def test_multiline_code(self): + '''Test issues in multiline statements return code as expected.''' + self.run_example('multiline-str.py') + self.assertEqual(0, len(self.b_mgr.skipped)) + self.assertEqual(1, len(self.b_mgr.files_list)) + self.assertTrue(self.b_mgr.files_list[0].endswith('multiline-str.py')) + issues = self.b_mgr.get_issue_list() + self.assertEqual(3, len(issues)) + self.assertTrue( + issues[0].fname.endswith('examples/multiline-str.py') + ) + + self.assertEqual(4, issues[0].lineno) + self.assertEqual(range(2, 7), issues[0].linerange) + self.assertIn('/tmp', issues[0].get_code()) + self.assertEqual(18, issues[1].lineno) + self.assertEqual(range(16, 19), issues[1].linerange) + self.assertIn('/tmp', issues[1].get_code()) + self.assertEqual(23, issues[2].lineno) + self.assertEqual(range(22, 31), issues[2].linerange) + self.assertIn('/tmp', issues[2].get_code()) diff --git a/tests/unit/core/test_formatters.py b/tests/unit/core/test_formatters.py index 5ab8466a..c8acaa17 100644 --- a/tests/unit/core/test_formatters.py +++ b/tests/unit/core/test_formatters.py @@ -27,6 +27,7 @@ from bandit.core import constants from bandit.core import config from bandit.core import manager from bandit.core import formatters +from bandit.core import issue class FormattersTests(testtools.TestCase): @@ -41,21 +42,28 @@ class FormattersTests(testtools.TestCase): 'lineno': 4, 'linerange': [4]} self.check_name = 'hardcoded_bind_all_interfaces' - self.issue = (bandit.MEDIUM, bandit.MEDIUM, + self.issue = issue.Issue(bandit.MEDIUM, bandit.MEDIUM, 'Possible binding to all interfaces.') - self.manager.b_rs.out_file = self.tmp_fname - self.manager.b_rs.add(self.context, self.check_name, self.issue) + self.manager.out_file = self.tmp_fname + + self.issue.fname = self.context['filename'] + self.issue.lineno = self.context['lineno'] + self.issue.linerange = self.context['linerange'] + self.issue.test = self.check_name + + self.manager.results.append(self.issue) def test_report_csv(self): - formatters.report_csv(self.manager.b_rs, None, None, None) + formatters.report_csv(self.manager, self.tmp_fname, + self.issue.severity, self.issue.confidence) with open(self.tmp_fname) as f: reader = csv.DictReader(f) data = six.next(reader) self.assertEqual(self.tmp_fname, data['filename']) - self.assertEqual(self.issue[0], data['issue_severity']) - self.assertEqual(self.issue[1], data['issue_confidence']) - self.assertEqual(self.issue[2], data['issue_text']) + self.assertEqual(self.issue.severity, data['issue_severity']) + self.assertEqual(self.issue.confidence, data['issue_confidence']) + self.assertEqual(self.issue.text, data['issue_text']) self.assertEqual(six.text_type(self.context['lineno']), data['line_number']) self.assertEqual(six.text_type(self.context['linerange']), @@ -63,21 +71,22 @@ class FormattersTests(testtools.TestCase): self.assertEqual(self.check_name, data['test_name']) def test_report_json(self): - file_list = ['binding.py'] - scores = [{'SEVERITY': [0] * len(constants.RANKING), - 'CONFIDENCE': [0] * len(constants.RANKING)}] + self.manager.files_list = ['binding.py'] + self.manager.scores = [{'SEVERITY': [0] * len(constants.RANKING), + 'CONFIDENCE': [0] * len(constants.RANKING)}] - formatters.report_json(self.manager.b_rs, file_list, scores, None) + formatters.report_json(self.manager, self.tmp_fname, + self.issue.severity, self.issue.confidence) with open(self.tmp_fname) as f: data = json.loads(f.read()) self.assertIsNotNone(data['generated_at']) self.assertEqual(self.tmp_fname, data['results'][0]['filename']) - self.assertEqual(self.issue[0], + self.assertEqual(self.issue.severity, data['results'][0]['issue_severity']) - self.assertEqual(self.issue[1], + self.assertEqual(self.issue.confidence, data['results'][0]['issue_confidence']) - self.assertEqual(self.issue[2], data['results'][0]['issue_text']) + self.assertEqual(self.issue.text, data['results'][0]['issue_text']) self.assertEqual(self.context['lineno'], data['results'][0]['line_number']) self.assertEqual(self.context['linerange'], @@ -87,21 +96,22 @@ class FormattersTests(testtools.TestCase): self.assertEqual(0, data['stats'][0]['score']) def test_report_text(self): - self.manager.b_rs.format = 'txt' - self.manager.b_rs.verbose = True + self.manager.verbose = True file_list = ['binding.py'] scores = [{'SEVERITY': [0] * len(constants.RANKING), 'CONFIDENCE': [0] * len(constants.RANKING)}] exc_files = ['test_binding.py'] - formatters.report_text(self.manager.b_rs, file_list, scores, exc_files) + formatters.report_text(self.manager, self.tmp_fname, + self.issue.severity, self.issue.confidence) with open(self.tmp_fname) as f: data = f.read() - expected = '>> Issue: %s' % self.issue[2] + expected = '>> Issue: %s' % self.issue.text self.assertIn(expected, data) expected = ' Severity: %s Confidence: %s' % ( - self.issue[0].capitalize(), self.issue[1].capitalize()) + self.issue.severity.capitalize(), + self.issue.confidence.capitalize()) self.assertIn(expected, data) expected = ' Location: %s:%d' % (self.tmp_fname, self.context['lineno']) @@ -128,13 +138,14 @@ class FormattersTests(testtools.TestCase): return d def test_report_xml(self): - formatters.report_xml(self.manager.b_rs, None, None, None) + formatters.report_xml(self.manager, self.tmp_fname, + self.issue.severity, self.issue.confidence) with open(self.tmp_fname) as f: data = self._xml_to_dict(ET.XML(f.read())) self.assertEqual(self.tmp_fname, data['testsuite']['testcase']['@classname']) - self.assertEqual(self.issue[2], + self.assertEqual(self.issue.text, data['testsuite']['testcase']['error']['@message']) self.assertEqual(self.check_name, data['testsuite']['testcase']['@name'])