do calls via custom loops, flake8
This commit is contained in:
parent
5896f6e613
commit
99c48038b4
101
txaio/aio.py
101
txaio/aio.py
|
@ -59,6 +59,49 @@ except ImportError:
|
|||
config = _Config()
|
||||
|
||||
|
||||
def with_config(loop=None):
|
||||
"""
|
||||
:return: an instance of the txaio API with the given
|
||||
configuration. This won't affect anything using the 'gloabl'
|
||||
config nor other instances created using this function.
|
||||
|
||||
If you need to customize txaio configuration separately (e.g. to
|
||||
use multiple event-loops in asyncio), you can take code like this:
|
||||
|
||||
import txaio
|
||||
|
||||
|
||||
class FunTimes(object):
|
||||
|
||||
def something_async(self):
|
||||
return txaio.call_later(1, lambda: 'some result')
|
||||
|
||||
and instead do this:
|
||||
|
||||
import txaio
|
||||
|
||||
|
||||
class FunTimes(object):
|
||||
txaio = txaio
|
||||
|
||||
def something_async(self):
|
||||
# this will run in the local/new event loop created in the constructor
|
||||
return self.txaio.call_later(1, lambda: 'some result')
|
||||
|
||||
fun0 = FunTimes()
|
||||
fun1 = FunTimes()
|
||||
fun1.txaio = txaio.with_config(loop=asyncio.new_event_loop())
|
||||
|
||||
So `fun1` will run its futures on the newly-created event loop,
|
||||
while `fun0` will work just as it did before this `with_config`
|
||||
method was introduced (after 2.6.2).
|
||||
"""
|
||||
cfg = _Config()
|
||||
if loop is not None:
|
||||
cfg.loop = loop
|
||||
return _AsyncioApi(cfg)
|
||||
|
||||
|
||||
# logging should probably all be folded into _AsyncioApi as well
|
||||
_stderr, _stdout = sys.stderr, sys.stdout
|
||||
_loggers = weakref.WeakSet() # weak-ref's of each logger we've created before start_logging()
|
||||
|
@ -257,7 +300,6 @@ class _AsyncioApi(object):
|
|||
config.loop = asyncio.get_event_loop()
|
||||
self._config = config
|
||||
|
||||
|
||||
def failure_message(self, fail):
|
||||
"""
|
||||
:param fail: must be an IFailedFuture
|
||||
|
@ -271,7 +313,6 @@ class _AsyncioApi(object):
|
|||
except Exception:
|
||||
return u'Failed to produce failure message for "{0}"'.format(fail)
|
||||
|
||||
|
||||
def failure_traceback(self, fail):
|
||||
"""
|
||||
:param fail: must be an IFailedFuture
|
||||
|
@ -279,7 +320,6 @@ class _AsyncioApi(object):
|
|||
"""
|
||||
return fail._traceback
|
||||
|
||||
|
||||
def failure_format_traceback(self, fail):
|
||||
"""
|
||||
:param fail: must be an IFailedFuture
|
||||
|
@ -297,31 +337,25 @@ class _AsyncioApi(object):
|
|||
except Exception:
|
||||
return u"Failed to format failure traceback for '{0}'".format(fail)
|
||||
|
||||
|
||||
def create_future(self, result=_unspecified, error=_unspecified, loop=None):
|
||||
def create_future(self, result=_unspecified, error=_unspecified):
|
||||
if result is not _unspecified and error is not _unspecified:
|
||||
raise ValueError("Cannot have both result and error.")
|
||||
|
||||
f = Future(loop=loop or config.loop)
|
||||
f = Future(loop=self._config.loop)
|
||||
if result is not _unspecified:
|
||||
resolve(f, result)
|
||||
elif error is not _unspecified:
|
||||
reject(f, error)
|
||||
return f
|
||||
|
||||
def create_future_success(self, result):
|
||||
return self.create_future(result=result)
|
||||
|
||||
def create_future_success(self, result, loop=None):
|
||||
return create_future(result=result, loop=loop)
|
||||
|
||||
|
||||
def create_future_error(self, error=None, loop=None):
|
||||
f = create_future(loop=loop)
|
||||
def create_future_error(self, error=None):
|
||||
f = self.create_future()
|
||||
reject(f, error)
|
||||
return f
|
||||
|
||||
|
||||
# XXX how to pass "loop" arg? could pop it out of kwargs, but .. what
|
||||
# if you're "as_future"-ing a function that itself takes a "loop" arg?
|
||||
def as_future(self, fun, *args, **kwargs):
|
||||
try:
|
||||
res = fun(*args, **kwargs)
|
||||
|
@ -331,22 +365,19 @@ class _AsyncioApi(object):
|
|||
if isinstance(res, Future):
|
||||
return res
|
||||
elif iscoroutine(res):
|
||||
return asyncio.Task(res, loop=config.loop)
|
||||
return asyncio.Task(res, loop=self._config.loop)
|
||||
else:
|
||||
return create_future_success(res)
|
||||
|
||||
|
||||
def is_future(self, obj):
|
||||
return iscoroutine(obj) or isinstance(obj, Future)
|
||||
|
||||
|
||||
def call_later(self, delay, fun, *args, **kwargs):
|
||||
# loop.call_later doesn't support kwargs
|
||||
real_call = functools.partial(fun, *args, **kwargs)
|
||||
return config.loop.call_later(delay, real_call)
|
||||
return self._config.loop.call_later(delay, real_call)
|
||||
|
||||
|
||||
def make_batched_timer(self, bucket_seconds, chunk_size=100, loop=None):
|
||||
def make_batched_timer(self, bucket_seconds, chunk_size=100):
|
||||
"""
|
||||
Creates and returns an object implementing
|
||||
:class:`txaio.IBatchedTimer`.
|
||||
|
@ -361,37 +392,21 @@ class _AsyncioApi(object):
|
|||
the reactor.
|
||||
"""
|
||||
|
||||
# XXX this duplicates code from 'call_later', but I don't see an
|
||||
# alternative
|
||||
if loop is not None:
|
||||
|
||||
def _create_call_later(delay, fun, *args, **kwargs):
|
||||
real_call = functools.partial(fun, *args, **kwargs)
|
||||
return loop.call_later(delay, real_call)
|
||||
the_loop = loop
|
||||
|
||||
else:
|
||||
_create_call_later = call_later
|
||||
the_loop = config.loop
|
||||
|
||||
def get_seconds():
|
||||
return the_loop.time()
|
||||
return self._config.loop.time()
|
||||
|
||||
return _BatchedTimer(
|
||||
bucket_seconds * 1000.0, chunk_size,
|
||||
seconds_provider=get_seconds,
|
||||
delayed_call_creator=_create_call_later,
|
||||
delayed_call_creator=self.call_later,
|
||||
)
|
||||
|
||||
|
||||
def is_called(self, future):
|
||||
return future.done()
|
||||
|
||||
|
||||
def resolve(self, future, result=None):
|
||||
future.set_result(result)
|
||||
|
||||
|
||||
def reject(self, future, error=None):
|
||||
if error is None:
|
||||
error = create_failure() # will be error if we're not in an "except"
|
||||
|
@ -402,7 +417,6 @@ class _AsyncioApi(object):
|
|||
raise RuntimeError("reject requires an IFailedFuture or Exception")
|
||||
future.set_exception(error.value)
|
||||
|
||||
|
||||
def create_failure(self, exception=None):
|
||||
"""
|
||||
This returns an object implementing IFailedFuture.
|
||||
|
@ -415,7 +429,6 @@ class _AsyncioApi(object):
|
|||
return FailedFuture(type(exception), exception, None)
|
||||
return FailedFuture(*sys.exc_info())
|
||||
|
||||
|
||||
def add_callbacks(self, future, callback, errback):
|
||||
"""
|
||||
callback or errback may be None, but at least one must be
|
||||
|
@ -436,7 +449,6 @@ class _AsyncioApi(object):
|
|||
errback(create_failure())
|
||||
return future.add_done_callback(done)
|
||||
|
||||
|
||||
def gather(self, futures, consume_exceptions=True):
|
||||
"""
|
||||
This returns a Future that waits for all the Futures in the list
|
||||
|
@ -462,9 +474,8 @@ class _AsyncioApi(object):
|
|||
:param delay: Time to sleep in seconds.
|
||||
:type delay: float
|
||||
"""
|
||||
d = Deferred()
|
||||
self._config.loop.callLater(delay, d.callback, None)
|
||||
return d
|
||||
return asyncio.sleep(delay)
|
||||
|
||||
|
||||
_default_api = _AsyncioApi(config)
|
||||
|
||||
|
|
Loading…
Reference in New Issue