Replace password to "***" in the debug message

Use regex pattern to replace password to "***" for both env vars and
request body output

Also includes a minor refactor to move the code up in the file as
suggested by termie and henry regarding the review comments in
https://review.openstack.org/#/c/26487/
(Original Change-Id: I890415c755dd383749f2d4382f53d0b3a6badc6c)

Fix bug 1166697

Change-Id: I671ea25cca78b4dea1fbf2e63c89b82912279f2d
This commit is contained in:
gengjh 2013-04-09 22:13:31 +08:00 committed by Dolph Mathews
parent 8dd57da146
commit 912c3668dc
2 changed files with 87 additions and 14 deletions

View File

@ -20,6 +20,7 @@
"""Utility methods for working with WSGI servers."""
import re
import socket
import sys
@ -48,6 +49,34 @@ CONTEXT_ENV = 'openstack.context'
PARAMS_ENV = 'openstack.params'
_RE_PASS = re.compile(r'([\'"].*?password[\'"]\s*:\s*u?[\'"]).*?([\'"])',
re.DOTALL)
def mask_password(message, is_unicode=False, secret="***"):
"""Replace password with 'secret' in message.
:param message: The string which include security information.
:param is_unicode: Is unicode string ?
:param secret: substitution string default to "***".
:returns: The string
For example:
>>> mask_password('"password" : "aaaaa"')
'"password" : "***"'
>>> mask_password("'original_password' : 'aaaaa'")
"'original_password' : '***'"
>>> mask_password("u'original_password' : u'aaaaa'")
"u'original_password' : u'***'"
"""
if is_unicode:
message = unicode(message)
# Match the group 1,2 and replace all others with 'secret'
secret = r"\g<1>" + secret + r"\g<2>"
result = _RE_PASS.sub(secret, message)
return result
class WritableLogger(object):
"""A thin wrapper that responds to `write` and logs."""
@ -379,21 +408,23 @@ class Debug(Middleware):
@webob.dec.wsgify(RequestClass=Request)
def __call__(self, req):
LOG.debug('%s %s %s', ('*' * 20), 'REQUEST ENVIRON', ('*' * 20))
for key, value in req.environ.items():
LOG.debug('%s = %s', key, value)
LOG.debug('')
LOG.debug('%s %s %s', ('*' * 20), 'REQUEST BODY', ('*' * 20))
for line in req.body_file:
LOG.debug(line)
LOG.debug('')
if LOG.isEnabledFor(logging.DEBUG):
LOG.debug('%s %s %s', ('*' * 20), 'REQUEST ENVIRON', ('*' * 20))
for key, value in req.environ.items():
LOG.debug('%s = %s', key, mask_password(value,
is_unicode=True))
LOG.debug('')
LOG.debug('%s %s %s', ('*' * 20), 'REQUEST BODY', ('*' * 20))
for line in req.body_file:
LOG.debug(mask_password(line))
LOG.debug('')
resp = req.get_response(self.application)
LOG.debug('%s %s %s', ('*' * 20), 'RESPONSE HEADERS', ('*' * 20))
for (key, value) in resp.headers.iteritems():
LOG.debug('%s = %s', key, value)
LOG.debug('')
if LOG.isEnabledFor(logging.DEBUG):
LOG.debug('%s %s %s', ('*' * 20), 'RESPONSE HEADERS', ('*' * 20))
for (key, value) in resp.headers.iteritems():
LOG.debug('%s = %s', key, value)
LOG.debug('')
resp.app_iter = self.print_generator(resp.app_iter)

View File

@ -21,13 +21,55 @@ from keystone.openstack.common import jsonutils
from keystone import test
class ApplicationTest(test.TestCase):
class FakeApp(wsgi.Application):
def index(self, context):
return {'a': 'b'}
class BaseWSGITest(test.TestCase):
def setUp(self):
self.app = FakeApp()
super(BaseWSGITest, self).setUp()
def _make_request(self, url='/'):
req = webob.Request.blank(url)
args = {'action': 'index', 'controller': None}
req.environ['wsgiorg.routing_args'] = [None, args]
return req
def test_mask_password(self):
message = ("test = 'password': 'aaaaaa', 'param1': 'value1', "
"\"new_password\": 'bbbbbb'")
self.assertEqual(wsgi.mask_password(message, True),
u"test = 'password': '***', 'param1': 'value1', "
"\"new_password\": '***'")
message = "test = 'password' : 'aaaaaa'"
self.assertEqual(wsgi.mask_password(message, False, '111'),
"test = 'password' : '111'")
message = u"test = u'password' : u'aaaaaa'"
self.assertEqual(wsgi.mask_password(message, True),
u"test = u'password' : u'***'")
message = 'test = "password" : "aaaaaaaaa"'
self.assertEqual(wsgi.mask_password(message),
'test = "password" : "***"')
message = 'test = "original_password" : "aaaaaaaaa"'
self.assertEqual(wsgi.mask_password(message),
'test = "original_password" : "***"')
message = 'test = "original_password" : ""'
self.assertEqual(wsgi.mask_password(message),
'test = "original_password" : "***"')
message = 'test = "param1" : "value"'
self.assertEqual(wsgi.mask_password(message),
'test = "param1" : "value"')
class ApplicationTest(BaseWSGITest):
def test_response_content_type(self):
class FakeApp(wsgi.Application):
def index(self, context):