deb-alembic/alembic/util/langhelpers.py

332 lines
9.5 KiB
Python

import textwrap
import warnings
import inspect
import uuid
import collections
from .compat import callable, exec_, string_types, with_metaclass
from sqlalchemy.util import format_argspec_plus, update_wrapper
from sqlalchemy.util.compat import inspect_getfullargspec
class _ModuleClsMeta(type):
def __setattr__(cls, key, value):
super(_ModuleClsMeta, cls).__setattr__(key, value)
cls._update_module_proxies(key)
class ModuleClsProxy(with_metaclass(_ModuleClsMeta)):
"""Create module level proxy functions for the
methods on a given class.
The functions will have a compatible signature
as the methods.
"""
_setups = collections.defaultdict(lambda: (set(), []))
@classmethod
def _update_module_proxies(cls, name):
attr_names, modules = cls._setups[cls]
for globals_, locals_ in modules:
cls._add_proxied_attribute(name, globals_, locals_, attr_names)
def _install_proxy(self):
attr_names, modules = self._setups[self.__class__]
for globals_, locals_ in modules:
globals_['_proxy'] = self
for attr_name in attr_names:
globals_[attr_name] = getattr(self, attr_name)
def _remove_proxy(self):
attr_names, modules = self._setups[self.__class__]
for globals_, locals_ in modules:
globals_['_proxy'] = None
for attr_name in attr_names:
del globals_[attr_name]
@classmethod
def create_module_class_proxy(cls, globals_, locals_):
attr_names, modules = cls._setups[cls]
modules.append(
(globals_, locals_)
)
cls._setup_proxy(globals_, locals_, attr_names)
@classmethod
def _setup_proxy(cls, globals_, locals_, attr_names):
for methname in dir(cls):
cls._add_proxied_attribute(methname, globals_, locals_, attr_names)
@classmethod
def _add_proxied_attribute(cls, methname, globals_, locals_, attr_names):
if not methname.startswith('_'):
meth = getattr(cls, methname)
if callable(meth):
locals_[methname] = cls._create_method_proxy(
methname, globals_, locals_)
else:
attr_names.add(methname)
@classmethod
def _create_method_proxy(cls, name, globals_, locals_):
fn = getattr(cls, name)
spec = inspect.getargspec(fn)
if spec[0] and spec[0][0] == 'self':
spec[0].pop(0)
args = inspect.formatargspec(*spec)
num_defaults = 0
if spec[3]:
num_defaults += len(spec[3])
name_args = spec[0]
if num_defaults:
defaulted_vals = name_args[0 - num_defaults:]
else:
defaulted_vals = ()
apply_kw = inspect.formatargspec(
name_args, spec[1], spec[2],
defaulted_vals,
formatvalue=lambda x: '=' + x)
def _name_error(name):
raise NameError(
"Can't invoke function '%s', as the proxy object has "
"not yet been "
"established for the Alembic '%s' class. "
"Try placing this code inside a callable." % (
name, cls.__name__
))
globals_['_name_error'] = _name_error
translations = getattr(fn, "_legacy_translations", [])
if translations:
outer_args = inner_args = "*args, **kw"
translate_str = "args, kw = _translate(%r, %r, %r, args, kw)" % (
fn.__name__,
tuple(spec),
translations
)
def translate(fn_name, spec, translations, args, kw):
return_kw = {}
return_args = []
for oldname, newname in translations:
if oldname in kw:
warnings.warn(
"Argument %r is now named %r "
"for method %s()." % (
oldname, newname, fn_name
))
return_kw[newname] = kw.pop(oldname)
return_kw.update(kw)
args = list(args)
if spec[3]:
pos_only = spec[0][:-len(spec[3])]
else:
pos_only = spec[0]
for arg in pos_only:
if arg not in return_kw:
try:
return_args.append(args.pop(0))
except IndexError:
raise TypeError(
"missing required positional argument: %s"
% arg)
return_args.extend(args)
return return_args, return_kw
globals_['_translate'] = translate
else:
outer_args = args[1:-1]
inner_args = apply_kw[1:-1]
translate_str = ""
func_text = textwrap.dedent("""\
def %(name)s(%(args)s):
%(doc)r
%(translate)s
try:
p = _proxy
except NameError:
_name_error('%(name)s')
return _proxy.%(name)s(%(apply_kw)s)
e
""" % {
'name': name,
'translate': translate_str,
'args': outer_args,
'apply_kw': inner_args,
'doc': fn.__doc__,
})
lcl = {}
exec_(func_text, globals_, lcl)
return lcl[name]
def _with_legacy_names(translations):
def decorate(fn):
fn._legacy_translations = translations
return fn
return decorate
def asbool(value):
return value is not None and \
value.lower() == 'true'
def rev_id():
return uuid.uuid4().hex[-12:]
def to_list(x, default=None):
if x is None:
return default
elif isinstance(x, string_types):
return [x]
elif isinstance(x, collections.Iterable):
return list(x)
else:
return [x]
def to_tuple(x, default=None):
if x is None:
return default
elif isinstance(x, string_types):
return (x, )
elif isinstance(x, collections.Iterable):
return tuple(x)
else:
return (x, )
def unique_list(seq, hashfunc=None):
seen = set()
seen_add = seen.add
if not hashfunc:
return [x for x in seq
if x not in seen
and not seen_add(x)]
else:
return [x for x in seq
if hashfunc(x) not in seen
and not seen_add(hashfunc(x))]
def dedupe_tuple(tup):
return tuple(unique_list(tup))
class memoized_property(object):
"""A read-only @property that is only evaluated once."""
def __init__(self, fget, doc=None):
self.fget = fget
self.__doc__ = doc or fget.__doc__
self.__name__ = fget.__name__
def __get__(self, obj, cls):
if obj is None:
return self
obj.__dict__[self.__name__] = result = self.fget(obj)
return result
class immutabledict(dict):
def _immutable(self, *arg, **kw):
raise TypeError("%s object is immutable" % self.__class__.__name__)
__delitem__ = __setitem__ = __setattr__ = \
clear = pop = popitem = setdefault = \
update = _immutable
def __new__(cls, *args):
new = dict.__new__(cls)
dict.__init__(new, *args)
return new
def __init__(self, *args):
pass
def __reduce__(self):
return immutabledict, (dict(self), )
def union(self, d):
if not self:
return immutabledict(d)
else:
d2 = immutabledict(self)
dict.update(d2, d)
return d2
def __repr__(self):
return "immutabledict(%s)" % dict.__repr__(self)
class Dispatcher(object):
def __init__(self, uselist=False):
self._registry = {}
self.uselist = uselist
def dispatch_for(self, target, qualifier='default'):
def decorate(fn):
if self.uselist:
self._registry.setdefault((target, qualifier), []).append(fn)
else:
assert (target, qualifier) not in self._registry
self._registry[(target, qualifier)] = fn
return fn
return decorate
def dispatch(self, obj, qualifier='default'):
if isinstance(obj, string_types):
targets = [obj]
elif isinstance(obj, type):
targets = obj.__mro__
else:
targets = type(obj).__mro__
for spcls in targets:
if qualifier != 'default' and (spcls, qualifier) in self._registry:
return self._fn_or_list(
self._registry[(spcls, qualifier)])
elif (spcls, 'default') in self._registry:
return self._fn_or_list(
self._registry[(spcls, 'default')])
else:
raise ValueError("no dispatch function for object: %s" % obj)
def _fn_or_list(self, fn_or_list):
if self.uselist:
def go(*arg, **kw):
for fn in fn_or_list:
fn(*arg, **kw)
return go
else:
return fn_or_list
def branch(self):
"""Return a copy of this dispatcher that is independently
writable."""
d = Dispatcher()
if self.uselist:
d._registry.update(
(k, [fn for fn in self._registry[k]])
for k in self._registry
)
else:
d._registry.update(self._registry)
return d