Temporary backport logwrap from fuel-devops

Temporary backport logwrap from fuel-devops as
producing human readable logs
This is required already for debug purposes

Change-Id: Idd4e0519a4b02a5ef2e5b52e96669972a07a20f0
This commit is contained in:
Alexey Stepanov 2016-10-05 13:04:29 +03:00
parent 7892e9b588
commit a87ec53f71
2 changed files with 372 additions and 46 deletions

View File

@ -19,6 +19,7 @@ import logging
import unittest
# pylint: disable=import-error
import mock
from mock import call
from mock import Mock
from mock import patch
@ -29,48 +30,185 @@ from core.helpers import log_helpers
# pylint: disable=no-self-use
@patch('core.helpers.log_helpers.logger', autospec=True)
@mock.patch('core.helpers.log_helpers.logger', autospec=True)
class TestLogWrap(unittest.TestCase):
def test_positive(self, logger):
def test_no_args(self, logger):
@log_helpers.logwrap
def func():
return 'No args'
result = func()
self.assertEqual(result, 'No args')
logger.assert_has_calls((
mock.call.log(
level=logging.DEBUG,
msg="Calling: \n'func'()"
),
mock.call.log(
level=logging.DEBUG,
msg="Done: 'func' with result:\n{}".format(
log_helpers.pretty_repr(result))
),
))
def test_args_simple(self, logger):
arg = 'test arg'
@log_helpers.logwrap
def func(tst):
return tst
result = func(arg)
self.assertEqual(result, arg)
logger.assert_has_calls((
mock.call.log(
level=logging.DEBUG,
msg="Calling: \n'func'(\n 'tst'={},\n)".format(
log_helpers.pretty_repr(
arg, indent=8, no_indent_start=True)
)
),
mock.call.log(
level=logging.DEBUG,
msg="Done: 'func' with result:\n{}".format(
log_helpers.pretty_repr(result))
),
))
def test_args_defaults(self, logger):
arg = 'test arg'
@log_helpers.logwrap
def func(tst=arg):
return tst
result = func()
self.assertEqual(result, arg)
logger.assert_has_calls((
mock.call.log(
level=logging.DEBUG,
msg="Calling: \n'func'(\n 'tst'={},\n)".format(
log_helpers.pretty_repr(
arg, indent=8, no_indent_start=True))
),
mock.call.log(
level=logging.DEBUG,
msg="Done: 'func' with result:\n{}".format(
log_helpers.pretty_repr(result))
),
))
def test_args_complex(self, logger):
string = 'string'
dictionary = {'key': 'dictionary'}
@log_helpers.logwrap
def func(param_string, param_dictionary):
return param_string, param_dictionary
result = func(string, dictionary)
self.assertEqual(result, (string, dictionary))
# raise ValueError(logger.mock_calls)
logger.assert_has_calls((
mock.call.log(
level=logging.DEBUG,
msg="Calling: \n'func'("
"\n 'param_string'={string},"
"\n 'param_dictionary'={dictionary},\n)".format(
string=log_helpers.pretty_repr(
string,
indent=8, no_indent_start=True),
dictionary=log_helpers.pretty_repr(
dictionary,
indent=8, no_indent_start=True)
)
),
mock.call.log(
level=logging.DEBUG,
msg="Done: 'func' with result:\n{}".format(
log_helpers.pretty_repr(result))
),
))
def test_args_kwargs(self, logger):
targs = ['string1', 'string2']
tkwargs = {'key': 'tkwargs'}
@log_helpers.logwrap
def func(*args, **kwargs):
return 'complete with {} {}'.format(args, kwargs)
call_args = 't', 'e'
call_kwargs = dict(s='s', t='t')
result = func(*call_args, **call_kwargs)
self.assertEqual(
result,
'complete with {} {}'.format(call_args, call_kwargs)
)
return tuple(args), kwargs
result = func(*targs, **tkwargs)
self.assertEqual(result, (tuple(targs), tkwargs))
# raise ValueError(logger.mock_calls)
logger.assert_has_calls((
call.debug(
"Calling: 'func' with args: {!r} {!r}".format(
call_args, call_kwargs)),
call.debug(
"Done: 'func' with result: {!r}".format(result))
mock.call.log(
level=logging.DEBUG,
msg="Calling: \n'func'("
"\n 'args'={args},"
"\n 'kwargs'={kwargs},\n)".format(
args=log_helpers.pretty_repr(
tuple(targs),
indent=8, no_indent_start=True),
kwargs=log_helpers.pretty_repr(
tkwargs,
indent=8, no_indent_start=True)
)
),
mock.call.log(
level=logging.DEBUG,
msg="Done: 'func' with result:\n{}".format(
log_helpers.pretty_repr(result))
),
))
def test_negative(self, logger):
@log_helpers.logwrap
def func(*args, **kwargs):
raise ValueError(args, kwargs)
call_args = 't', 'e'
call_kwargs = dict(s='s', t='t')
def func():
raise ValueError('as expected')
with self.assertRaises(ValueError):
func(*call_args, **call_kwargs)
func()
logger.assert_has_calls((
call.debug(
"Calling: 'func' with args: {!r} {!r}".format(
call_args, call_kwargs)),
call.exception(
"'func' raised: ValueError({}, {})\n".format(
call_args, call_kwargs))
mock.call.log(
level=logging.DEBUG,
msg="Calling: \n'func'()"
),
mock.call.log(
level=logging.ERROR,
msg="Failed: \n'func'()",
exc_info=True
),
))
def test_negative_substitutions(self, logger):
new_logger = mock.Mock(spec=logging.Logger, name='logger')
log = mock.Mock(name='log')
new_logger.attach_mock(log, 'log')
@log_helpers.logwrap(
log=new_logger,
log_level=logging.INFO,
exc_level=logging.WARNING
)
def func():
raise ValueError('as expected')
with self.assertRaises(ValueError):
func()
self.assertEqual(len(logger.mock_calls), 0)
log.assert_has_calls((
mock.call(
level=logging.INFO,
msg="Calling: \n'func'()"
),
mock.call(
level=logging.WARNING,
msg="Failed: \n'func'()",
exc_info=True
),
))

View File

@ -14,30 +14,218 @@
from __future__ import unicode_literals
import collections
import functools
import inspect
import logging
import sys
import warnings
import six
from core import logger
def logwrap(func):
@functools.wraps(func)
def wrapped(*args, **kwargs):
logger.debug(
"Calling: {!r} with args: {!r} {!r}".format(
func.__name__, args, kwargs
)
# pylint: disable=no-member
def _get_arg_names(func):
"""get argument names for function
:param func: func
:return: list of function argnames
:rtype: list
>>> def tst_1():
... pass
>>> _get_arg_names(tst_1)
[]
>>> def tst_2(arg):
... pass
>>> _get_arg_names(tst_2)
['arg']
"""
# noinspection PyUnresolvedReferences
return (
[arg for arg in inspect.getargspec(func=func).args] if six.PY2 else
list(inspect.signature(obj=func).parameters.keys())
)
def _getcallargs(func, *positional, **named):
"""get real function call arguments without calling function
:rtype: dict
"""
# noinspection PyUnresolvedReferences
if sys.version_info[0:2] < (3, 5): # apply_defaults is py35 feature
orig_args = inspect.getcallargs(func, *positional, **named)
# Construct OrderedDict as Py3
arguments = collections.OrderedDict(
[(key, orig_args[key]) for key in _get_arg_names(func)]
)
try:
result = func(*args, **kwargs)
logger.debug(
"Done: {!r} with result: {!r}".format(func.__name__, result))
except BaseException as e:
logger.exception(
'{func!r} raised: {exc!r}\n'.format(func=func.__name__, exc=e))
raise
return result
return wrapped
if six.PY2:
# args and kwargs is not bound in py27
# Note: py27 inspect is not unicode
if 'args' in orig_args:
arguments[b'args'] = orig_args['args']
if 'kwargs' in orig_args:
arguments[b'kwargs'] = orig_args['kwargs']
return arguments
sig = inspect.signature(func).bind(*positional, **named)
sig.apply_defaults() # after bind we doesn't have defaults
return sig.arguments
# pylint:enable=no-member
def _simple(item):
"""Check for nested iterations: True, if not"""
return not isinstance(item, (list, set, tuple, dict))
_formatters = {
'simple': "{spc:<{indent}}{val!r}".format,
'text': "{spc:<{indent}}{prefix}'''{string}'''".format,
'dict': "\n{spc:<{indent}}{key!r:{size}}: {val},".format,
}
def pretty_repr(src, indent=0, no_indent_start=False, max_indent=20):
"""Make human readable repr of object
:param src: object to process
:type src: object
:param indent: start indentation, all next levels is +4
:type indent: int
:param no_indent_start: do not indent open bracket and simple parameters
:type no_indent_start: bool
:param max_indent: maximal indent before classic repr() call
:type max_indent: int
:return: formatted string
"""
if _simple(src) or indent >= max_indent:
indent = 0 if no_indent_start else indent
if isinstance(src, (six.binary_type, six.text_type)):
if isinstance(src, six.binary_type):
string = src.decode(
encoding='utf-8',
errors='backslashreplace'
)
prefix = 'b'
else:
string = src
prefix = 'u'
return _formatters['text'](
spc='',
indent=indent,
prefix=prefix,
string=string
)
return _formatters['simple'](
spc='',
indent=indent,
val=src
)
if isinstance(src, dict):
prefix, suffix = '{', '}'
result = ''
max_len = len(max([repr(key) for key in src])) if src else 0
for key, val in src.items():
result += _formatters['dict'](
spc='',
indent=indent + 4,
size=max_len,
key=key,
val=pretty_repr(val, indent + 8, no_indent_start=True)
)
return (
'\n{start:>{indent}}'.format(
start=prefix,
indent=indent + 1
) +
result +
'\n{end:>{indent}}'.format(end=suffix, indent=indent + 1)
)
if isinstance(src, list):
prefix, suffix = '[', ']'
elif isinstance(src, tuple):
prefix, suffix = '(', ')'
else:
prefix, suffix = '{', '}'
result = ''
for elem in src:
if _simple(elem):
result += '\n'
result += pretty_repr(elem, indent + 4) + ','
return (
'\n{start:>{indent}}'.format(
start=prefix,
indent=indent + 1) +
result +
'\n{end:>{indent}}'.format(end=suffix, indent=indent + 1)
)
def logwrap(log=logger, log_level=logging.DEBUG, exc_level=logging.ERROR):
"""Log function calls
:type log: logging.Logger
:type log_level: int
:type exc_level: int
:rtype: callable
"""
warnings.warn(
'logwrap is moved to fuel-devops 3.0.3,'
' please change imports after switch',
DeprecationWarning)
def real_decorator(func):
@functools.wraps(func)
def wrapped(*args, **kwargs):
call_args = _getcallargs(func, *args, **kwargs)
args_repr = ""
if len(call_args) > 0:
args_repr = "\n " + "\n ".join((
"{key!r}={val},".format(
key=key,
val=pretty_repr(val, indent=8, no_indent_start=True)
)
for key, val in call_args.items())
) + '\n'
log.log(
level=log_level,
msg="Calling: \n{name!r}({arguments})".format(
name=func.__name__,
arguments=args_repr
)
)
try:
result = func(*args, **kwargs)
log.log(
level=log_level,
msg="Done: {name!r} with result:\n{result}".format(
name=func.__name__,
result=pretty_repr(result))
)
except BaseException:
log.log(
level=exc_level,
msg="Failed: \n{name!r}({arguments})".format(
name=func.__name__,
arguments=args_repr,
),
exc_info=True
)
raise
return result
return wrapped
if not isinstance(log, logging.Logger):
func, log = log, logger
return real_decorator(func)
return real_decorator
class QuietLogger(object):