deb-alembic/alembic/testing/assertions.py

204 lines
5.6 KiB
Python

from __future__ import absolute_import
import re
from .. import util
from sqlalchemy.engine import default
from ..util.compat import text_type, py3k
import contextlib
from sqlalchemy.util import decorator
from sqlalchemy import exc as sa_exc
import warnings
from . import mock
# SQLAlchemy releases older than 0.9.4 (per ``util.sqla_094``) get local
# implementations of the basic assertion helpers; newer releases supply them
# from ``sqlalchemy.testing.assertions``.
if not util.sqla_094:
    def eq_(a, b, msg=None):
        """Assert a == b, with repr messaging on failure."""
        assert a == b, msg or "%r != %r" % (a, b)

    def ne_(a, b, msg=None):
        """Assert a != b, with repr messaging on failure."""
        assert a != b, msg or "%r == %r" % (a, b)

    def is_(a, b, msg=None):
        """Assert a is b, with repr messaging on failure."""
        assert a is b, msg or "%r is not %r" % (a, b)

    def assert_raises(except_cls, callable_, *args, **kw):
        """Assert that ``callable_(*args, **kw)`` raises ``except_cls``.

        Exceptions of any other type propagate to the caller unchanged.
        """
        try:
            callable_(*args, **kw)
            success = False
        except except_cls:
            success = True

        # assert outside the block so it works for AssertionError too !
        assert success, "Callable did not raise an exception"

    def assert_raises_message(except_cls, msg, callable_, *args, **kwargs):
        """Assert that the callable raises ``except_cls`` matching ``msg``.

        ``msg`` is a regular expression that is searched for (``re.search``
        with ``re.UNICODE``) within the text of the raised exception.  The
        exception text is printed, UTF-8 encoded, once it has matched.
        """
        try:
            callable_(*args, **kwargs)
        except except_cls as e:
            assert re.search(
                msg, text_type(e), re.UNICODE), "%r !~ %s" % (msg, e)
            print(text_type(e).encode('utf-8'))
        else:
            # Kept out of the ``try`` body: when except_cls is AssertionError
            # the handler above would otherwise catch this failure itself
            # and compare ``msg`` against the "did not raise" text.
            assert False, "Callable did not raise an exception"
else:
    from sqlalchemy.testing.assertions import eq_, ne_, is_, \
        assert_raises_message, assert_raises
def eq_ignore_whitespace(a, b, msg=None):
    """Assert that two strings are equal after whitespace normalization.

    Both strings have newlines (and a single leading whitespace character,
    if present) removed, and runs of two or more spaces collapsed to one.
    The ``!U`` marker in ``b`` is dropped on Python 3 and replaced with
    ``u`` otherwise, before the comparison is made.
    """
    def _normalize(text):
        without_breaks = re.sub(r'^\s+?|\n', "", text)
        return re.sub(r' {2,}', " ", without_breaks)

    a = _normalize(a)
    b = _normalize(b)

    # "!U" is a special escape marking where a unicode string prefix
    # would be rendered
    unicode_prefix = '' if py3k else 'u'
    b = re.sub(r'!U', unicode_prefix, b)

    assert a == b, msg or "%r != %r" % (a, b)
def assert_compiled(element, assert_string, dialect=None):
    """Assert that ``element`` compiles to ``assert_string``.

    ``dialect`` is resolved through ``_get_dialect()``.  Newlines and tab
    characters are removed from both the compiled text and the expected
    string before they are compared with ``eq_()``.
    """
    def _flatten(text):
        return text.replace("\n", "").replace("\t", "")

    resolved_dialect = _get_dialect(dialect)
    compiled_sql = text_type(element.compile(dialect=resolved_dialect))
    eq_(_flatten(compiled_sql), _flatten(assert_string))
# Cache of dialect instances created by _get_dialect(), keyed by name.
_dialects = {}


def _get_dialect(name):
    """Return a dialect instance for the given dialect name.

    ``None`` or ``'default'`` produce a new ``DefaultDialect`` on every
    call.  Any other name is looked up in the ``_dialects`` cache; on a
    miss the module ``sqlalchemy.dialects.<name>`` is imported, its
    ``dialect`` callable is invoked, and the result is cached.
    """
    if name is None or name == 'default':
        return default.DefaultDialect()

    if name in _dialects:
        return _dialects[name]

    # __import__ of a dotted path returns the top-level package, so walk
    # down through ".dialects" to reach the requested submodule.
    top_level = __import__('sqlalchemy.dialects.%s' % name)
    dialect_mod = getattr(top_level.dialects, name)

    instance = dialect_mod.dialect()
    _dialects[name] = instance

    # per-dialect flags applied to the freshly created instance
    if name == 'postgresql':
        instance.implicit_returning = True
    elif name == 'mssql':
        instance.legacy_schema_aliasing = False
    return instance
def expect_warnings(*messages, **kw):
    """Return a context manager which expects one or more SAWarnings.

    Called without messages, every warning routed through the patched
    ``warnings.warn`` with an ``SAWarning`` category (or no category at
    all) is suppressed.  Otherwise each message is an expression matched
    against emitted warnings; warnings that match nothing are passed
    through to the real ``warnings.warn``.

    By default the context manager **asserts** on exit that every given
    message was matched at least once.  Keyword arguments are forwarded
    to ``_expect_warnings()``.
    """
    warning_cls = sa_exc.SAWarning
    return _expect_warnings(warning_cls, messages, **kw)
@contextlib.contextmanager
def expect_warnings_on(db, *messages, **kw):
    """Context manager which expects warnings only on specific dialects.

    When the dialect checks below select the expecting path, the body
    runs inside ``expect_warnings(*messages, **kw)``; otherwise the body
    runs with no warning handling at all.
    """
    # NOTE(review): db_spec, config and _is_excluded are not among the
    # imports visible at the top of this module -- confirm where they are
    # expected to come from.
    spec = db_spec(db)

    if isinstance(db, util.string_types) and not spec(config._current):
        expecting = False
    elif not _is_excluded(*db):
        expecting = False
    else:
        expecting = True

    if not expecting:
        yield
        return

    with expect_warnings(*messages, **kw):
        yield
def emits_warning(*messages):
    """Decorator form of expect_warnings().

    The decorated function runs inside ``expect_warnings()`` with
    ``assert_=False``, so emits_warning does **not** assert that the
    warnings were in fact seen.
    """
    @decorator
    def decorate(fn, *args, **kw):
        non_asserting = expect_warnings(*messages, assert_=False)
        with non_asserting:
            return fn(*args, **kw)

    return decorate
def emits_warning_on(db, *messages):
    """Mark a test as emitting a warning on a specific dialect.

    Decorator form of expect_warnings_on().  With no messages, all
    SAWarning failures are squelched.  Otherwise pass one or more strings;
    these are matched as case-insensitive regular expressions against the
    start of the warning message.

    Note that emits_warning_on does **not** assert that the warnings
    were in fact seen.
    """
    @decorator
    def decorate(fn, *args, **kw):
        # assert_=False keeps the behavior in line with the docstring and
        # with emits_warning(): unmatched messages must not fail the test.
        with expect_warnings_on(db, *messages, assert_=False):
            return fn(*args, **kw)

    return decorate
@contextlib.contextmanager
def _expect_warnings(exc_cls, messages, regex=True, assert_=True):
    """Patch ``warnings.warn`` to intercept warnings of ``exc_cls``.

    :param exc_cls: warning category to intercept; calls made with a
     category that is not a subclass of it go to the real
     ``warnings.warn``.
    :param messages: expected messages.  When empty, every intercepted
     warning is swallowed.
    :param regex: when true, each message is compiled as a
     case-insensitive pattern and applied with ``match()``; otherwise
     messages are compared for equality.
    :param assert_: when true, assert after the block completes that
     every expected message was matched at least once.
    """
    if regex:
        filters = [re.compile(expected, re.I) for expected in messages]
    else:
        filters = messages

    # expectations that have not been matched yet
    unmatched = set(filters)

    original_warn = warnings.warn

    def _is_match(filter_, msg):
        if regex:
            return filter_.match(msg)
        return filter_ == msg

    def our_warn(msg, exception=None, *arg, **kw):
        # a category outside of exc_cls is not ours to handle
        if exception and not issubclass(exception, exc_cls):
            return original_warn(msg, exception, *arg, **kw)

        # no expectations given: swallow the warning entirely
        if not filters:
            return

        for filter_ in filters:
            if _is_match(filter_, msg):
                unmatched.discard(filter_)
                return

        # nothing matched; hand the warning to the real implementation,
        # leaving the category out when the caller did not supply one
        if exception is None:
            original_warn(msg, *arg, **kw)
        else:
            original_warn(msg, exception, *arg, **kw)

    with mock.patch("warnings.warn", our_warn):
        yield

    if assert_:
        assert not unmatched, "Warnings were not seen: %s" % \
            ", ".join(
                "%r" % (item.pattern if regex else item)
                for item in unmatched)