From 51dbde4cc3fbb5356ae0c71dd71abd8724006218 Mon Sep 17 00:00:00 2001 From: Dave Belcher Date: Thu, 26 Mar 2015 19:34:45 +0000 Subject: [PATCH] Refactored/optimized reporting code I went through and cleaned up all the code in result_store. The focus was on removing redundancy, cleaning things up, and making the code more readable/maintainable. I think it's in pretty good shape now. Change-Id: I05b80867ada9026b40f2fe6cad10bbc345733b88 --- bandit/core/result_store.py | 263 ++++++++++-------------------------- 1 file changed, 69 insertions(+), 194 deletions(-) diff --git a/bandit/core/result_store.py b/bandit/core/result_store.py index 2ca7641c..e68e7357 100644 --- a/bandit/core/result_store.py +++ b/bandit/core/result_store.py @@ -17,6 +17,7 @@ """An object to store/access results associated with Bandit tests.""" +from collections import defaultdict from collections import OrderedDict from datetime import datetime import json @@ -40,6 +41,7 @@ class BanditResultStore(): self.agg_type = agg_type self.level = 0 self.max_lines = -1 + self.format = 'txt' @property def count(self): @@ -72,31 +74,18 @@ class BanditResultStore(): (issue_type, issue_text) = issue if self.agg_type == 'vuln': - if test in self.resstore: - self.resstore[test].append({'fname': filename, - 'lineno': lineno, - 'linerange': linerange, - 'issue_type': issue_type, - 'issue_text': issue_text}) - else: - self.resstore[test] = [{'fname': filename, - 'lineno': lineno, - 'linerange': linerange, - 'issue_type': issue_type, - 'issue_text': issue_text}] + key = test else: - if filename in self.resstore: - self.resstore[filename].append({'lineno': lineno, - 'linerange': linerange, - 'test': test, - 'issue_type': issue_type, - 'issue_text': issue_text}) - else: - self.resstore[filename] = [{'lineno': lineno, - 'linerange': linerange, - 'test': test, - 'issue_type': issue_type, - 'issue_text': issue_text}] + key = filename + + self.resstore.setdefault(key, []).append( + {'fname': filename, + 'test': test, + 'lineno': lineno, + 'linerange': linerange, + 'issue_type': issue_type, + 'issue_text': issue_text}) + self.count += 1 def _report_json(self, file_list, scores, excluded_files): @@ -128,48 +117,19 @@ class BanditResultStore(): 'score': self._sum_scores(score), 'issue totals': totals}) - # array indeces are determined by order of tuples defined in add() - if self.agg_type == 'file': - for item in self.resstore.items(): - filename = item[0] - filelist = item[1] - for issue in filelist: - issue['fname'] = filename - if self._check_severity(issue['issue_type']): - line_range = issue['linerange'] - line_num = issue['lineno'] - error_label = str(issue['test']).strip() - error_type = str(issue['issue_type']).strip() - reason = str(issue['issue_text']).strip() - code = self._get_code(issue, True) - holder = dict({"filename": filename, - "line_num": line_num, - "line_range": line_range, - "error_label": error_label, - "error_type": error_type, - "code": code, - "reason": reason}) - collector.append(holder) - else: - for item in self.resstore.items(): - vuln_label = item[0] - filelist = item[1] - for issue in filelist: - if self._check_severity(issue['issue_type']): - filename = str(issue['fname']) - line_range = issue['linerange'] - line_num = issue['lineno'] - error_type = str(issue['issue_type']).strip() - reason = str(issue['issue_text']).strip() - code = self._get_code(issue, True) - holder = dict({"filename": filename, - "line_num": line_num, - "line_range": line_range, - "error_label": vuln_label.strip(), - "error_type": error_type, - "code": code, - "reason": reason}) - collector.append(holder) + for group in self.resstore.items(): + issue_list = group[1] + for issue in issue_list: + if self._check_severity(issue['issue_type']): + code = self._get_code(issue, True) + holder = dict({"filename": issue['fname'], + "line_num": issue['lineno'], + "line_range": issue['linerange'], + "error_label": issue['test'], + "error_type": issue['issue_type'], + "code": code, + "reason": issue['issue_text']}) + collector.append(holder) if self.agg_type == 'vuln': machine_output['results'] = sorted(collector, @@ -181,74 +141,7 @@ class BanditResultStore(): return json.dumps(machine_output, sort_keys=True, indent=2, separators=(',', ': ')) - def _report_txt(self, files_list, scores, excluded_files): - '''Returns TXT string of results - - :param scope: Which files were inspected - :param scores: The scores awarded to each file in the scope - :param lines: # of lines around the issue line to display (optional) - :param level: What level of severity to display (optional) - :return: TXT string - ''' - - tmpstr_list = [] - - # print header - tmpstr_list.append("Run started:\n\t%s\n" % datetime.utcnow()) - - # print which files were inspected - tmpstr_list.append("Files in scope (%s):\n" % (len(files_list))) - - for item in zip(files_list, map(self._sum_scores, scores)): - tmpstr_list.append("\t%s (score: %i)\n" % item) - - # print which files were excluded - tmpstr_list.append("Files excluded (%s):\n" % (len(excluded_files))) - for item in excluded_files: - tmpstr_list.append("\n\t%s" % item) - - # print which files were skipped and why - tmpstr_list.append("Files skipped (%s):" % len(self.skipped)) - for (fname, reason) in self.skipped: - tmpstr_list.append("\n\t%s (%s)" % (fname, reason)) - - # print the results - tmpstr_list.append("\nTest results:\n") - if self.count == 0: - tmpstr_list.append("\tNo issues identified.\n") - # if aggregating by vulnerability type - elif self.agg_type == 'vuln': - for test, issues in self.resstore.items(): - for issue in issues: - - # if the result in't filtered out by severity - if self._check_severity(issue['issue_type']): - tmpstr_list.append("\n>> %s\n - %s::%s\n" % ( - issue['issue_text'], - issue['fname'], - issue['lineno'] - )) - - tmpstr_list.append( - self._get_code(issue, True)) - - # otherwise, aggregating by filename - else: - for filename, issues in self.resstore.items(): - for issue in issues: - issue['fname'] = filename - - # if the result isn't filtered out by severity - if self._check_severity(issue['issue_type']): - tmpstr_list.append("\n>> %s\n - %s::%s\n" % ( - issue['issue_text'], filename, issue['lineno'] - )) - - tmpstr_list.append( - self._get_code(issue, True)) - return "".join(tmpstr_list) - - def _report_tty(self, files_list, scores, excluded_files, lines=0): + def _report_text(self, files_list, scores, excluded_files, lines=0): '''Prints the contents of the result store :param scope: Which files were inspected @@ -260,14 +153,18 @@ class BanditResultStore(): tmpstr_list = [] - # get text colors from settings - get_setting = self.config.get_setting - color = {'HEADER': get_setting('color_HEADER'), - 'DEFAULT': get_setting('color_DEFAULT'), - 'INFO': get_setting('color_INFO'), - 'WARN': get_setting('color_WARN'), - 'ERROR': get_setting('color_ERROR') - } + # use a defaultdict to default to an empty string + color = defaultdict(str) + + if self.format == 'txt': + # get text colors from settings for TTY output + get_setting = self.config.get_setting + color = {'HEADER': get_setting('color_HEADER'), + 'DEFAULT': get_setting('color_DEFAULT'), + 'INFO': get_setting('color_INFO'), + 'WARN': get_setting('color_WARN'), + 'ERROR': get_setting('color_ERROR') + } # print header tmpstr_list.append("%sRun started:%s\n\t%s\n" % ( @@ -307,41 +204,23 @@ class BanditResultStore(): if self.count == 0: tmpstr_list.append("\tNo issues identified.\n") - # if aggregating by vulnerability type - elif self.agg_type == 'vuln': - for test, issues in self.resstore.items(): - for issue in issues: - # if the result in't filtered out by severity - if self._check_severity(issue['issue_type']): - tmpstr_list.append("\n%s>> %s\n - %s::%s%s\n" % ( - color.get(issue['issue_type'], color['DEFAULT']), - issue['issue_text'], - issue['fname'], - issue['lineno'], - color['DEFAULT'] - )) + for key, issues in self.resstore.items(): + for issue in issues: - tmpstr_list.append( - self._get_code(issue, True)) + # if the result in't filtered out by severity + if self._check_severity(issue['issue_type']): + tmpstr_list.append("\n%s>> %s\n - %s::%s%s\n" % ( + color.get(issue['issue_type'], color['DEFAULT']), + issue['issue_text'], + issue['fname'], + issue['lineno'], + color['DEFAULT'] + )) - # otherwise, aggregating by filename - else: - for filename, issues in self.resstore.items(): - for issue in issues: - issue['fname'] = filename - # if the result isn't filtered out by severity - if self._check_severity(issue['issue_type']): - tmpstr_list.append("\n%s>> %s\n - %s::%s%s\n" % ( - color.get( - issue['issue_type'], color['DEFAULT'] - ), - issue['issue_text'], filename, issue['lineno'], - color['DEFAULT'] - )) + tmpstr_list.append( + self._get_code(issue, True)) - tmpstr_list.append( - self._get_code(issue, True)) return ''.join(tmpstr_list) def report(self, files_list, scores, excluded_files=None, lines=-1, @@ -365,31 +244,27 @@ class BanditResultStore(): self.level = level self.max_lines = lines + self.format = output_format - if output_filename is None and output_format == 'txt': - print (self._report_tty(files_list, scores, - excluded_files=excluded_files)) # noqa - return + if self.format == 'json': + result = (self._report_json(files_list, scores, + excluded_files=excluded_files)) - if output_filename is None and output_format == 'json': - print (self._report_json(files_list, scores, - excluded_files=excluded_files)) # noqa - return - - if output_format == 'txt': - outer = self._report_txt(files_list, scores, - excluded_files=excluded_files) - with open(output_filename, 'w') as fout: - fout.write(outer) - print("TXT output written to file: %s" % output_filename) - return else: - outer = self._report_json(files_list, scores, - excluded_files=excluded_files) + # format is the default "txt" + if output_filename: + # output to file, specify plain text + self.format = 'plain' + result = self._report_text(files_list, scores, + excluded_files=excluded_files) + + if output_filename is not None: with open(output_filename, 'w') as fout: - fout.write(outer) - print("JSON output written to file: %s" % output_filename) - return + fout.write(result) + # XXX: Should this be log output? (ukbelch) + print("Output written to file: %s" % output_filename) + else: + print(result) def _get_code(self, issue, tabbed=False): '''Gets lines of code from a file