initial commit

This commit is contained in:
Jamie Finnigan 2014-07-16 10:27:50 -07:00
parent c796659aea
commit 60339cad50
23 changed files with 520 additions and 2 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
*.pyc

View File

@ -1,4 +1,18 @@
bandit
Bandit
======
Python AST-based static analyzer from OpenStack Security Group
A Python AST-based static analyzer from OpenStack Security Group.
References
----------
Python AST module documentation: https://docs.python.org/2/library/ast.html
Green Tree Snakes - the missing Python AST docs:
http://greentreesnakes.readthedocs.org/en/latest/
Usage
-----
Example usage across a code tree:
find ~/openstack-repo/keystone -name '*.py' | xargs ./main.py

19
TODO Normal file
View File

@ -0,0 +1,19 @@
possible tests:
- Popen shell=True
- import of possibly-dangerous imports
- bad file perms (os.chmod https://docs.python.org/2/library/os.html#os.chmod)
- taint checking / lack of input validation (object returned by requests.get()/.post() has headers, content, text, json attributes)
- hardcoded passwords
- logging sensitive information
- sql commands into sql alchemy
- poor crypto primitives
- temp file creation
- wildcard injection
- port binding 0.0.0.0
- TLS requests w/out cert checks
- SSLv2 forced
- eval/exec functions
- sudo calls
- de-serializing (pickle? yaml? json?)

0
bandit/__init__.py Normal file
View File

75
bandit/manager.py Normal file
View File

@ -0,0 +1,75 @@
#!/usr/bin/env python
import sys, logging
import ast
from bandit import result_store as b_result_store
from bandit import node_visitor as b_node_visitor
from bandit import meta_ast as b_meta_ast
class BanditManager():
scope = []
progress = 50
def __init__(self, debug=False):
self.logger = self._init_logger(debug)
self.b_ma = b_meta_ast.BanditMetaAst(self.logger)
self.b_rs = b_result_store.BanditResultStore(self.logger)
def get_logger(self):
return self.logger
def get_resultstore(self):
return self.b_rs
def output_results(self, lines, level):
self.b_rs.report(scope=self.scope, lines=lines, level=level)
def output_metaast(self):
self.b_ma.report()
def run_scope(self, scope):
if scope:
self.scope = scope
sys.stdout.write("%s [" % len(scope))
for i, fname in enumerate(scope):
self.logger.debug("working on file : %s" % fname)
if i % self.progress == 0:
sys.stdout.write("%s.. " % i)
sys.stdout.flush()
try:
with open(fname, 'rU') as fdata:
try:
self._execute_ast_visitor(fname, fdata, self.b_ma, self.b_rs)
except KeyboardInterrupt as e:
sys.exit(2)
except IOError as e:
self.logger.error("%s" % e.strerror)
sys.stdout.write("]\n")
sys.stdout.flush()
else:
self.logger.info("no filename/s provided, working from stdin")
try:
self._execute_ast_visitor('STDIN', sys.stdin, self.b_ma, self.b_rs)
except KeyboardInterrupt:
self.logger.debug("exiting")
sys.exit(1)
def _execute_ast_visitor(self, fname, fdata, b_ma, b_rs):
if fdata != None:
res = b_node_visitor.BanditNodeVisitor(fname, self.logger, b_ma, b_rs)
res.visit(ast.parse("".join(fdata.readlines())))
def _init_logger(self, debug=False):
log_level = logging.INFO
if debug:
log_level = logging.DEBUG
log_format = '[%(module)s]\t%(levelname)s\t%(message)s'
logger = logging.getLogger()
logger.setLevel(log_level)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter(log_format))
logger.addHandler(handler)
logger.debug("logging initialized")
return logger

25
bandit/meta_ast.py Normal file
View File

@ -0,0 +1,25 @@
#!/usr/bin/env python
from collections import OrderedDict
class BanditMetaAst():
nodes = OrderedDict()
def __init__(self, logger):
self.logger = logger
def add_node(self, node, parent_id, depth):
node_id = hex(id(node))
self.logger.debug('adding node : %s [%s]' % (node_id, depth))
self.nodes[node_id] = {'raw':node, 'parent_id':parent_id, 'depth':depth}
def report(self):
tmpstr = ""
for k,v in self.nodes.items():
tmpstr += "Node: %s\n" % k
tmpstr += "\t%s\n" % str(v)
tmpstr += "Length : %s\n" % len(self.nodes)
print(tmpstr)

90
bandit/node_visitor.py Executable file
View File

@ -0,0 +1,90 @@
#!/usr/bin/env python
import sys
import ast, _ast
from bandit import tester as b_tester
from bandit import utils as b_utils
class BanditNodeVisitor(ast.NodeVisitor):
imports = set()
import_aliases = {}
callstack = ""
calldone = False
callbasenode = None
logger = None
results = None
tester = None
fname = None
depth = 0
def __init__(self, fname, logger, metaast, results):
self.seen = 0
self.fname = fname
self.logger = logger
self.metaast = metaast
self.results = results
self.tester = b_tester.BanditTester(self.logger, self.results)
def _get_Call_name(self, node):
if type(node.func) == _ast.Name:
return(b_utils.deepgetattr(node, 'func.id'))
elif type(node.func) == _ast.Attribute:
prefix = ""
if type(node.func.value) == _ast.Name:
prefix = b_utils.deepgetattr(node, 'func.value.id') + "."
return("%s%s" % (prefix, b_utils.deepgetattr(node, 'func.attr')))
def visit_Call(self, node):
self.tester.test_call(node, name=self.callstack)
if self.callstack == "":
self.callbasenode = node
self.callstack = self._get_Call_name(node)
#nested calls
if type(node.func) == _ast.Attribute:
if type(node.func.value) == _ast.Call:
self.callstack = ".".join([self._get_Call_name(node.func.value), self.callstack])
else:
self.calldone = True
else:
self.calldone = True
#done with nested
if (self.calldone):
self.logger.debug("PARSED COMPLETE CALLSTACK: %s" % self.callstack)
self.logger.debug("\tBASENODE: %s" % ast.dump(self.callbasenode))
file_detail = (self.fname, node.lineno)
self.tester.test_call_with_name(file_detail, self.callstack, self.callbasenode, self.imports, self.import_aliases)
self.callstack = ""
self.calldone = False
super(BanditNodeVisitor, self).generic_visit(node)
def visit_Import(self, node):
for alias in node.names:
if alias.asname:
self.import_aliases[alias.asname] = alias.name
self.imports.add(alias.name)
file_detail = (self.fname, node.lineno)
self.tester.test_import_name(file_detail, alias.name)
super(BanditNodeVisitor, self).generic_visit(node)
def visit_ImportFrom(self, node):
module = node.module
for alias in node.names:
if alias.asname:
self.import_aliases[alias.asname] = module + "." + alias.name
self.imports.add(module + "." + alias.name)
file_detail = (self.fname, node.lineno)
self.tester.test_import_name(file_detail, module + "." + alias.name)
super(BanditNodeVisitor, self).generic_visit(node)
def visit(self, node):
self.seen += 1
self.logger.debug("entering: %s %s [%s]" % (hex(id(node)), type(node), self.depth))
self.logger.debug(ast.dump(node))
self.metaast.add_node(node, '', self.depth)
self.depth += 1
super(BanditNodeVisitor, self).visit(node)
self.depth -= 1
self.logger.debug("%s\texiting : %s" % (self.depth, hex(id(node))))

49
bandit/result_store.py Normal file
View File

@ -0,0 +1,49 @@
#!/usr/bin/env python
"""An object to store/access results associated with Bandit tests."""
from collections import OrderedDict
import linecache
from sys import stdout
import utils
class BanditResultStore():
resstore = OrderedDict()
count = 0
def __init__(self, logger):
self.count = 0
self.logger = logger
def add(self, file_detail, issue_type, issue_text):
filename, lineno = file_detail
if filename in self.resstore:
self.resstore[filename].append((lineno, issue_type, issue_text))
else:
self.resstore[filename] = [(lineno, issue_type, issue_text),]
self.count += 1
def report(self, scope, lines=0, level=1, is_tty=stdout.isatty()):
if level >= len(utils.sev):
level = len(utils.sev) - 1
tmpstr = ""
if self.count > 0:
tmpstr += "%sFiles tested (%s):%s\n\t" % (utils.color['HEADER'], len(scope), utils.color['DEFAULT']) if is_tty else "File tested (%s):\n\t" % (len(scope))
tmpstr += "%s\n" % "\n\t".join(scope)
tmpstr += "%sTest results:%s\n" % (utils.color['HEADER'], utils.color['DEFAULT']) if is_tty else "Test results:\n"
for filename,issues in self.resstore.items():
for lineno, issue_type, issue_text in issues:
if utils.sev.index(issue_type) >= level:
tmpstr += "%s>> %s\n - %s::%s%s\n" % (utils.color.get(issue_type, utils.color['DEFAULT']), issue_text, filename, lineno, utils.color['DEFAULT']) if is_tty else ">> %s\n - %s::%s\n" % (issue_text, filename, lineno)
for i in utils.mid_range(lineno, lines):
line = linecache.getline(filename, i)
#linecache returns '' if line does not exist
if line != '':
tmpstr += "\t%3d %s" % (i, linecache.getline(filename, i))
print(tmpstr)
else:
self.logger.error("no results to display - %s files scanned" % self.count)

92
bandit/tester.py Normal file
View File

@ -0,0 +1,92 @@
#!/usr/bin/env python
import ast, _ast
import stat
class BanditTester():
results = None
def __init__(self, logger, results):
self.logger = logger
self.results = results
self.last_result = None
def _ast_args_to_str(self, args):
res = '\n\tArgument/s:\n\t\t%s' % '\n\t\t'.join([ast.dump(arg) for arg in args])
res = ''
return res
def test_call(self, call, name=None):
self.logger.debug("test_call (%s) executed with Call object: %s" % (name, ast.dump(call)))
return False
def test_call_with_name(self, file_detail, name, call, imports, aliases):
self.logger.debug('test_call_name executed with name : %s' % name)
self.logger.debug('test_call_name callobj : %s' % ast.dump(call))
self.logger.debug('test_call_name imports : %s' % imports)
self.logger.debug('test_call_name aliases : %s' % aliases)
bad_name_sets = [
( ['pickle.loads', 'pickle.dumps',],
'Pickle library appears to be in use, possible security issue.'),
( ['hashlib.md5',],
'Use of insecure MD5 hash function.'),
( ['subprocess.Popen',],
'Use of possibly-insecure system call function (subprocess.Popen).'),
( ['subprocess.call',],
'Use of possibly-insecure system call function (subprocess.call).'),
( ['mktemp',],
'Use of insecure and deprecated function (mktemp).'),
( ['eval',],
'Use of possibly-insecure function - consider using the safer ast.literal_eval().'),
]
#subs in import aliases
if name in aliases:
name = aliases[name]
else:
for alias in aliases:
if name != None and name.startswith('%s.' % alias):
name = "%s.%s" % (aliases[alias], name[len(alias) + 1:])
#do tests
if name != None:
#specific tests based on function names
#if 'Popen' in name:
if name == 'subprocess.Popen':
if hasattr(call, 'keywords'):
for k in call.keywords:
if k.arg == 'shell' and isinstance(k.value, _ast.Name):
if k.value.id == 'True':
self.results.add(file_detail, 'ERROR', 'Popen call with shell=True identified, security issue. %s' % self._ast_args_to_str(call.args))
if 'requests' in name and ('get' in name or 'post' in name):
if hasattr(call, 'keywords'):
for k in call.keywords:
if k.arg == 'verify' and isinstance(k.value, _ast.Name):
if k.value.id == 'False':
self.results.add(file_detail, 'ERROR', 'Requests call with verify=False disabling SSL certificate checks, security issue. %s' % self._ast_args_to_str(call.args))
if 'chmod' in name :
if hasattr(call, 'args') and len(call.args) == 2:
if isinstance(call.args[1], _ast.Num):
if (call.args[1].n & stat.S_IWOTH) or (call.args[1].n & stat.S_IXGRP):
try:
self.results.add(file_detail, 'ERROR', 'Chmod setting a permissive mask %s on file (%s).' % (oct(call.args[1].n), call.args[0].s))
except AttributeError:
self.results.add(file_detail, 'ERROR', 'Chmod setting a permissive mask %s on file.' % (oct(call.args[1].n)))
#test for 'bad' names defined above
for bad_name_set in bad_name_sets:
for bad_name in bad_name_set[0]:
#if name.startswith(bad_name) or name.endswith(bad_name):
if name == bad_name:
self.results.add(file_detail, 'WARN', "%s %s" % (bad_name_set[1], self._ast_args_to_str(call.args)))
def test_import_name(self, file_detail, name):
self.logger.debug('test_import_name executed with name : %s' % name)
warn_on_import = ['pickle', 'subprocess', 'crypto']
for mod in warn_on_import:
if name.startswith(mod):
self.results.add(file_detail, 'INFO', "Consider possible security implications associated with '%s' module" % mod)

48
bandit/utils.py Normal file
View File

@ -0,0 +1,48 @@
#!/usr/bin/env python
import symtable
"""Various helper functions."""
sev = [ 'INFO', 'WARN', 'ERROR' ]
color = {
'DEFAULT': '\033[0m',
'HEADER': '\033[95m',
'INFO': '\033[94m',
'WARN': '\033[93m',
'ERROR': '\033[91m',
}
def deepgetattr(obj, attr):
"""Recurses through an attribute chain to get the ultimate value."""
for key in attr.split('.'):
obj = getattr(obj, key)
return obj
def describe_symbol(sym):
assert type(sym) == symtable.Symbol
print("Symbol:", sym.get_name())
for prop in [
'referenced', 'imported', 'parameter',
'global', 'declared_global', 'local',
'free', 'assigned', 'namespace']:
if getattr(sym, 'is_' + prop)():
print(' is', prop)
def mid_range(mid, count):
if count == 1:
return range(mid, mid + 1)
diff = count / 2
if count % 2 == 0:
start = mid - diff
stop = mid + diff
else:
start = mid - diff
stop = mid + diff + 1
if start < 1:
stop = stop + (start * -1) + 1
start = 1
return range(start, stop)

4
examples/crypto-md5.py Normal file
View File

@ -0,0 +1,4 @@
import hashlib
hashlib.md5(1)
hashlib.md5(1).hexdigest()

5
examples/eval.py Normal file
View File

@ -0,0 +1,5 @@
import os
print(eval("1+1"))
print(eval("os.getcwd()"))
print(eval("os.chmod('%s', 0777)" % 'test.txt'))

View File

@ -0,0 +1,14 @@
from subprocess import Popen as pop
import hashlib as h
import hashlib as hh
import hashlib as hhh
import hashlib as hhhh
from pickle import loads as lp
pop('gcc --version', shell=True)
h.md5('1')
hh.md5('2')
hhh.md5('3').hexdigest()
hhhh.md5('4')
lp({'key':'value'})

4
examples/imports.py Normal file
View File

@ -0,0 +1,4 @@
import os
import pickle
import sys
import subprocess

16
examples/os-chmod.py Normal file
View File

@ -0,0 +1,16 @@
import os
import stat
os.chmod('/etc/passwd', 0777)
os.chmod('/etc/passwd', 0227)
os.chmod('/etc/passwd', 07)
os.chmod('/etc/passwd', 0664)
os.chmod('/etc/passwd', 0777)
os.chmod('/etc/passwd', 0777)
os.chmod('/etc/passwd', 0777)
os.chmod('/etc/passwd', 0777)
os.chmod('~/.bashrc', 511)
os.chmod('/etc/hosts', 0o777)
os.chmod('/etc/hosts', 0o777)
os.chmod('/tmp/oh_hai', 0x1ff)
os.chmod('/etc/passwd', stat.S_IRWXU)

4
examples/pickle.py Normal file
View File

@ -0,0 +1,4 @@
import pickle
pick = pickle.dumps({'a':'b', 'c':'d'})
raw = pickle.loads(pick)

View File

@ -0,0 +1,6 @@
import requests as r
r.get('https://gmail.com', verify=True)
r.get('https://gmail.com', verify=False)
r.post('https://gmail.com', verify=True)
r.post('https://gmail.com', verify=False)

View File

@ -0,0 +1,6 @@
import requests
requests.get('https://gmail.com', verify=True)
requests.get('https://gmail.com', verify=False)
requests.post('https://gmail.com', verify=True)
requests.post('https://gmail.com', verify=False)

View File

@ -0,0 +1,5 @@
import subprocess
subprocess.call([ "ls",
"-l"
])

View File

@ -0,0 +1,3 @@
import subprocess
subprocess.call(["ls", "-l"])

View File

@ -0,0 +1,7 @@
from subprocess import Popen as pop
def Popen():
print('hi')
pop('gcc --version', shell=True)
Popen('gcc --version', shell=True)

View File

@ -0,0 +1,4 @@
import subprocess
subprocess.Popen('gcc --version', shell=True)

27
main.py Executable file
View File

@ -0,0 +1,27 @@
#!/usr/bin/env python
import sys, argparse
from bandit import manager as b_manager
if __name__=='__main__':
parser = argparse.ArgumentParser(description='Bandit - a Python source code analyzer.')
parser.add_argument('files', metavar='file', type=str, nargs='+',
help='source file/s to be tested')
parser.add_argument('-C', '--context', dest='context', action='store',
default=0, type=int,
help='number of context lines to print')
parser.add_argument('-l', '--level', dest='level', action='count',
default=1, help='results level filter')
parser.add_argument('-d', '--debug', dest='debug', action='store_true',
help='turn on debug mode')
parser.set_defaults(debug=False)
args = parser.parse_args()
b_mgr = b_manager.BanditManager(args.debug)
b_mgr.run_scope(args.files)
b_mgr.output_results(args.context, args.level - 1)
#b_mgr.output_metaast()