Merge pull request #98 from meejah/ticket97-loop-arg

Ticket97 loop arg
Tobias Oberstein 2017-04-08 13:55:53 +02:00 committed by GitHub
commit b2df90f4db
8 changed files with 566 additions and 364 deletions

View File

@ -77,6 +77,33 @@ def test_batched_successful_call(framework_aio):
assert calls[2] == (("third call", ), dict())
def test_batched_successful_call_explicit_loop(framework_aio):
'''
batched calls really happen in batches
'''
# Trollius doesn't come with this, so won't work on py2
pytest.importorskip('asyncio.test_utils')
from asyncio.test_utils import TestLoop
def time_gen():
yield
yield
new_loop = TestLoop(time_gen)
calls = []
def foo(*args, **kw):
calls.append((args, kw))
txa = txaio.with_config(loop=new_loop)
batched = txa.make_batched_timer(5)
batched.call_later(1, foo, "first call")
new_loop.advance_time(2.0)
new_loop._run_once()
assert len(calls) == 1
def test_batched_cancel(framework_aio):
'''
we can cancel uncalled call_laters
@ -88,7 +115,6 @@ def test_batched_cancel(framework_aio):
def time_gen():
yield
yield
yield
new_loop = TestLoop(time_gen)
calls = []
@ -123,7 +149,6 @@ def test_batched_cancel_too_late(framework_aio):
def time_gen():
yield
yield
yield
new_loop = TestLoop(time_gen)
calls = []
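For illustration only, not part of this commit: the explicit-loop pattern the new test exercises with a TestLoop can also be driven by a real asyncio loop. Names such as alt_loop are placeholders, and the short bucket is only there to keep the sketch quick.

import asyncio
import txaio

txaio.use_asyncio()

alt_loop = asyncio.new_event_loop()            # placeholder name
txa = txaio.with_config(loop=alt_loop)         # API added by this PR

calls = []
batched = txa.make_batched_timer(0.1)          # 100ms buckets, just for the demo
batched.call_later(0.05, calls.append, "first call")

# only alt_loop has the bucket's delayed call scheduled; spin it briefly
alt_loop.call_later(0.3, alt_loop.stop)
alt_loop.run_forever()
assert calls == ["first call"]
alt_loop.close()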

View File

@ -29,6 +29,7 @@ from mock import patch
import pytest
import txaio
from txaio.testutil import replace_loop
from util import run_once
def test_default_reactor(framework_tx):
@ -64,6 +65,97 @@ def test_explicit_reactor_future(framework):
assert c[0] == 'call_soon'
def test_create_future_explicit_loop(framework):
"""
process events on an alternate loop= when using create_future
"""
pytest.importorskip('asyncio')
if txaio.using_twisted:
pytest.skip()
import asyncio
alt_loop = asyncio.new_event_loop()
txa = txaio.with_config(loop=alt_loop)
f = txa.create_future()
results = []
f.add_done_callback(lambda r: results.append(r.result()))
assert results == []
txaio.resolve(f, 'some result')
# run_once() runs the txaio.config.loop so we shouldn't get any
# results until we spin alt_loop
assert results == []
run_once()
assert results == []
with replace_loop(alt_loop):
run_once()
assert results == ['some result']
def test_create_future_success_explicit_loop(framework):
"""
process events on an alternate loop= when using create_future_success
"""
pytest.importorskip('asyncio')
if txaio.using_twisted:
pytest.skip()
import asyncio
alt_loop = asyncio.new_event_loop()
txa = txaio.with_config(loop=alt_loop)
f = txa.create_future_success('some result')
results = []
f.add_done_callback(lambda r: results.append(r.result()))
# run_once() runs the txaio.config.loop so we shouldn't get any
# results until we spin alt_loop
assert results == []
run_once()
assert results == []
with replace_loop(alt_loop):
run_once()
assert results == ['some result']
def test_create_future_failure_explicit_loop(framework):
"""
process events on an alternate loop= when using create_future_error
"""
pytest.importorskip('asyncio')
if txaio.using_twisted:
pytest.skip()
import asyncio
alt_loop = asyncio.new_event_loop()
the_exception = Exception('bad')
txa = txaio.with_config(loop=alt_loop)
f = txa.create_future_error(the_exception)
results = []
def boom(r):
try:
results.append(r.result())
except Exception as e:
results.append(e)
f.add_done_callback(boom)
# run_once() runs the txaio.config.loop so we shouldn't get any
# results until we spin alt_loop
assert results == []
run_once()
assert results == []
with replace_loop(alt_loop):
run_once()
assert results == [the_exception]
def test_explicit_reactor_coroutine(framework):
"""
If we set an event-loop, Futures + Tasks should use it.
@ -122,7 +214,6 @@ def test_call_later_aio(framework_aio):
# even though we only do one call, I guess TestLoop needs
# a "trailing" yield? "or something"
when = yield 0
print("Hmmm", when)
from asyncio.test_utils import TestLoop
new_loop = TestLoop(time_gen)
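Illustrative sketch, not from the diff: replace_loop from txaio.testutil, imported above, is a context manager that (as these tests rely on) temporarily points the global txaio.config.loop at another loop and restores the previous one on exit, which is why run_once() only delivers results for these futures inside the with-block.

import asyncio
import txaio
from txaio.testutil import replace_loop

txaio.use_asyncio()
alt_loop = asyncio.new_event_loop()

with replace_loop(alt_loop):
    # helpers that consult txaio.config.loop (like run_once above) now spin alt_loop
    assert txaio.config.loop is alt_loop
# the previously configured loop is restored here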

View File

@ -39,7 +39,7 @@ def run_once():
try:
import asyncio
from asyncio.test_utils import run_once as _run_once
return _run_once(asyncio.get_event_loop())
return _run_once(txaio.config.loop or asyncio.get_event_loop())
except ImportError:
import trollius as asyncio

View File

@ -36,7 +36,7 @@ version = __version__
# see aio.py for asyncio/trollius implementation
class _Config:
class _Config(object):
"""
This holds all valid configuration options, accessed as
class-level variables. For example, if you were using asyncio:
@ -58,6 +58,7 @@ class _Config:
__all__ = (
'with_config', # allow multiple custom configurations at once
'using_twisted', # True if we're using Twisted
'using_asyncio', # True if we're using asyncio
'use_twisted', # sets the library to use Twisted, or exception

View File

@ -38,7 +38,7 @@ class _BatchedTimer(IBatchedTimer):
"""
def __init__(self, bucket_milliseconds, chunk_size,
seconds_provider, delayed_call_creator):
seconds_provider, delayed_call_creator, loop=None):
if bucket_milliseconds <= 0.0:
raise ValueError(
"bucket_milliseconds must be > 0.0"
@ -48,6 +48,7 @@ class _BatchedTimer(IBatchedTimer):
self._get_seconds = seconds_provider
self._create_delayed_call = delayed_call_creator
self._buckets = dict() # real seconds -> (IDelayedCall, list)
self._loop = loop
def call_later(self, delay, func, *args, **kwargs):
"""

View File

@ -46,6 +46,7 @@ def _throw_usage_error(*args, **kw):
# all the txaio API methods just raise the error
with_config = _throw_usage_error
create_future = _throw_usage_error
create_future_success = _throw_usage_error
create_future_error = _throw_usage_error
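Illustrative sketch, not part of the diff: until use_twisted() or use_asyncio() is called, every txaio API name, including the create_future_success stub added here, resolves to _throw_usage_error, so calling it simply raises; the exact exception type and message are not reproduced here.

import txaio

try:
    txaio.create_future_success("too early")   # no framework selected yet
except Exception as e:
    # txaio refuses to do anything until a framework is chosen
    print(type(e).__name__, e)

txaio.use_asyncio()                            # after this, the real API is in place
f = txaio.create_future_success("ok")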

View File

@ -55,17 +55,60 @@ except ImportError:
from trollius import iscoroutine
from trollius import Future
config = _Config()
config.loop = asyncio.get_event_loop()
def with_config(loop=None):
"""
:return: an instance of the txaio API with the given
configuration. This won't affect anything using the 'global'
config nor other instances created using this function.
If you need to customize txaio configuration separately (e.g. to
use multiple event-loops in asyncio), you can take code like this:
import txaio
class FunTimes(object):
def something_async(self):
return txaio.call_later(1, lambda: 'some result')
and instead do this:
import txaio
class FunTimes(object):
txaio = txaio
def something_async(self):
# this will run on whatever event loop this instance's txaio is configured with
return self.txaio.call_later(1, lambda: 'some result')
fun0 = FunTimes()
fun1 = FunTimes()
fun1.txaio = txaio.with_config(loop=asyncio.new_event_loop())
So `fun1` will run its futures on the newly-created event loop,
while `fun0` will work just as it did before this `with_config`
method was introduced (after 2.6.2).
"""
cfg = _Config()
if loop is not None:
cfg.loop = loop
return _AsyncioApi(cfg)
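For illustration, not part of the diff: a with_config() instance keeps its own loop, so callbacks on its futures are dispatched by that loop, independent of the module-level API and the global config.loop. alt_loop and results are placeholder names.

import asyncio
import txaio

txaio.use_asyncio()

alt_loop = asyncio.new_event_loop()
txa = txaio.with_config(loop=alt_loop)

f = txa.create_future()
results = []
txa.add_callbacks(f, results.append, None)

txa.resolve(f, "some result")
assert results == []                 # nothing is delivered until alt_loop runs

alt_loop.call_soon(alt_loop.stop)    # process the done-callback, then stop
alt_loop.run_forever()
assert results == ["some result"]
alt_loop.close()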
# logging should probably all be folded into _AsyncioApi as well
_stderr, _stdout = sys.stderr, sys.stdout
_loggers = weakref.WeakSet() # weak-ref's of each logger we've created before start_logging()
_log_level = 'info' # re-set by start_logging
_started_logging = False
_categories = {}
using_twisted = False
using_asyncio = True
def add_log_categories(categories):
_categories.update(categories)
@ -101,7 +144,8 @@ class FailedFuture(IFailedFuture):
return str(self.value)
# API methods for txaio, exported via the top-level __init__.py
# logging API methods
def _log(logger, level, format=u'', **kwargs):
@ -226,192 +270,6 @@ def start_logging(out=_stdout, level='info'):
logger._set_log_level(level)
def failure_message(fail):
"""
:param fail: must be an IFailedFuture
returns a unicode error-message
"""
try:
return u'{0}: {1}'.format(
fail._value.__class__.__name__,
str(fail._value),
)
except Exception:
return u'Failed to produce failure message for "{0}"'.format(fail)
def failure_traceback(fail):
"""
:param fail: must be an IFailedFuture
returns a traceback instance
"""
return fail._traceback
def failure_format_traceback(fail):
"""
:param fail: must be an IFailedFuture
returns a string
"""
try:
f = six.StringIO()
traceback.print_exception(
fail._type,
fail.value,
fail._traceback,
file=f,
)
return f.getvalue()
except Exception:
return u"Failed to format failure traceback for '{0}'".format(fail)
_unspecified = object()
def create_future(result=_unspecified, error=_unspecified):
if result is not _unspecified and error is not _unspecified:
raise ValueError("Cannot have both result and error.")
f = Future(loop=config.loop)
if result is not _unspecified:
resolve(f, result)
elif error is not _unspecified:
reject(f, error)
return f
def create_future_success(result):
return create_future(result=result)
def create_future_error(error=None):
f = create_future()
reject(f, error)
return f
def as_future(fun, *args, **kwargs):
try:
res = fun(*args, **kwargs)
except Exception:
return create_future_error(create_failure())
else:
if isinstance(res, Future):
return res
elif iscoroutine(res):
return asyncio.Task(res, loop=config.loop)
else:
return create_future_success(res)
def is_future(obj):
return iscoroutine(obj) or isinstance(obj, Future)
def call_later(delay, fun, *args, **kwargs):
# loop.call_later doesn't support kwargs
real_call = functools.partial(fun, *args, **kwargs)
return config.loop.call_later(delay, real_call)
def make_batched_timer(bucket_seconds, chunk_size=100):
"""
Creates and returns an object implementing
:class:`txaio.IBatchedTimer`.
:param bucket_seconds: the number of seconds in each bucket. That
is, a value of 5 means that any timeout within a 5 second
window will be in the same bucket, and get notified at the
same time. This is only accurate to "milliseconds".
:param chunk_size: when "doing" the callbacks in a particular
bucket, this controls how many we do at once before yielding to
the reactor.
"""
def get_seconds():
return config.loop.time()
return _BatchedTimer(
bucket_seconds * 1000.0, chunk_size,
seconds_provider=get_seconds,
delayed_call_creator=call_later,
)
def is_called(future):
return future.done()
def resolve(future, result=None):
future.set_result(result)
def reject(future, error=None):
if error is None:
error = create_failure() # will be error if we're not in an "except"
elif isinstance(error, Exception):
error = FailedFuture(type(error), error, None)
else:
if not isinstance(error, IFailedFuture):
raise RuntimeError("reject requires an IFailedFuture or Exception")
future.set_exception(error.value)
def create_failure(exception=None):
"""
This returns an object implementing IFailedFuture.
If exception is None (the default) we MUST be called within an
"except" block (such that sys.exc_info() returns useful
information).
"""
if exception:
return FailedFuture(type(exception), exception, None)
return FailedFuture(*sys.exc_info())
def add_callbacks(future, callback, errback):
"""
callback or errback may be None, but at least one must be
non-None.
XXX beware the "f._result" hack to get "chainable-callback" type
behavior.
"""
def done(f):
try:
res = f.result()
if callback:
x = callback(res)
if x is not None:
f._result = x
except Exception:
if errback:
errback(create_failure())
return future.add_done_callback(done)
def gather(futures, consume_exceptions=True):
"""
This returns a Future that waits for all the Futures in the list
``futures``
:param futures: a list of Futures (or coroutines?)
:param consume_exceptions: if True, any errors are eaten and
returned in the result list.
"""
# from the asyncio docs: "If return_exceptions is True, exceptions
# in the tasks are treated the same as successful results, and
# gathered in the result list; otherwise, the first raised
# exception will be immediately propagated to the returned
# future."
return asyncio.gather(*futures, return_exceptions=consume_exceptions)
def set_global_log_level(level):
"""
Set the global log level on all loggers instantiated by txaio.
@ -426,13 +284,219 @@ def get_global_log_level():
return _log_level
def sleep(delay, loop=None):
"""
Inline sleep for use in co-routines.
:param delay: Time to sleep in seconds.
:type delay: float
:param reactor: The asyncio loop to use.
:type reactor: None (to use the default loop) or a loop.
"""
raise Exception('not implemented yet')
# asyncio API methods; the module-level functions are (now, for
# backwards-compat) exported from a default instance of this class
_unspecified = object()
class _AsyncioApi(object):
using_twisted = False
using_asyncio = True
def __init__(self, config):
if config.loop is None:
config.loop = asyncio.get_event_loop()
self._config = config
def failure_message(self, fail):
"""
:param fail: must be an IFailedFuture
returns a unicode error-message
"""
try:
return u'{0}: {1}'.format(
fail._value.__class__.__name__,
str(fail._value),
)
except Exception:
return u'Failed to produce failure message for "{0}"'.format(fail)
def failure_traceback(self, fail):
"""
:param fail: must be an IFailedFuture
returns a traceback instance
"""
return fail._traceback
def failure_format_traceback(self, fail):
"""
:param fail: must be an IFailedFuture
returns a string
"""
try:
f = six.StringIO()
traceback.print_exception(
fail._type,
fail.value,
fail._traceback,
file=f,
)
return f.getvalue()
except Exception:
return u"Failed to format failure traceback for '{0}'".format(fail)
def create_future(self, result=_unspecified, error=_unspecified):
if result is not _unspecified and error is not _unspecified:
raise ValueError("Cannot have both result and error.")
f = Future(loop=self._config.loop)
if result is not _unspecified:
resolve(f, result)
elif error is not _unspecified:
reject(f, error)
return f
def create_future_success(self, result):
return self.create_future(result=result)
def create_future_error(self, error=None):
f = self.create_future()
reject(f, error)
return f
def as_future(self, fun, *args, **kwargs):
try:
res = fun(*args, **kwargs)
except Exception:
return create_future_error(create_failure())
else:
if isinstance(res, Future):
return res
elif iscoroutine(res):
return asyncio.Task(res, loop=self._config.loop)
else:
return create_future_success(res)
def is_future(self, obj):
return iscoroutine(obj) or isinstance(obj, Future)
def call_later(self, delay, fun, *args, **kwargs):
# loop.call_later doesn't support kwargs
real_call = functools.partial(fun, *args, **kwargs)
return self._config.loop.call_later(delay, real_call)
def make_batched_timer(self, bucket_seconds, chunk_size=100):
"""
Creates and returns an object implementing
:class:`txaio.IBatchedTimer`.
:param bucket_seconds: the number of seconds in each bucket. That
is, a value of 5 means that any timeout within a 5 second
window will be in the same bucket, and get notified at the
same time. This is only accurate to "milliseconds".
:param chunk_size: when "doing" the callbacks in a particular
bucket, this controls how many we do at once before yielding to
the reactor.
"""
def get_seconds():
return self._config.loop.time()
return _BatchedTimer(
bucket_seconds * 1000.0, chunk_size,
seconds_provider=get_seconds,
delayed_call_creator=self.call_later,
)
def is_called(self, future):
return future.done()
def resolve(self, future, result=None):
future.set_result(result)
def reject(self, future, error=None):
if error is None:
error = create_failure() # will be error if we're not in an "except"
elif isinstance(error, Exception):
error = FailedFuture(type(error), error, None)
else:
if not isinstance(error, IFailedFuture):
raise RuntimeError("reject requires an IFailedFuture or Exception")
future.set_exception(error.value)
def create_failure(self, exception=None):
"""
This returns an object implementing IFailedFuture.
If exception is None (the default) we MUST be called within an
"except" block (such that sys.exc_info() returns useful
information).
"""
if exception:
return FailedFuture(type(exception), exception, None)
return FailedFuture(*sys.exc_info())
def add_callbacks(self, future, callback, errback):
"""
callback or errback may be None, but at least one must be
non-None.
XXX beware the "f._result" hack to get "chainable-callback" type
behavior.
"""
def done(f):
try:
res = f.result()
if callback:
x = callback(res)
if x is not None:
f._result = x
except Exception:
if errback:
errback(create_failure())
return future.add_done_callback(done)
def gather(self, futures, consume_exceptions=True):
"""
This returns a Future that waits for all the Futures in the list
``futures``
:param futures: a list of Futures (or coroutines?)
:param consume_exceptions: if True, any errors are eaten and
returned in the result list.
"""
# from the asyncio docs: "If return_exceptions is True, exceptions
# in the tasks are treated the same as successful results, and
# gathered in the result list; otherwise, the first raised
# exception will be immediately propagated to the returned
# future."
return asyncio.gather(*futures, return_exceptions=consume_exceptions)
def sleep(self, delay):
"""
Inline sleep for use in co-routines.
:param delay: Time to sleep in seconds.
:type delay: float
"""
return asyncio.sleep(delay)
_default_api = _AsyncioApi(config)
using_twisted = _default_api.using_twisted
using_asyncio = _default_api.using_asyncio
sleep = _default_api.sleep
failure_message = _default_api.failure_message
failure_traceback = _default_api.failure_traceback
failure_format_traceback = _default_api.failure_format_traceback
create_future = _default_api.create_future
create_future_success = _default_api.create_future_success
create_future_error = _default_api.create_future_error
as_future = _default_api.as_future
is_future = _default_api.is_future
call_later = _default_api.call_later
make_batched_timer = _default_api.make_batched_timer
is_called = _default_api.is_called
resolve = _default_api.resolve
reject = _default_api.reject
create_failure = _default_api.create_failure
add_callbacks = _default_api.add_callbacks
gather = _default_api.gather
sleep = _default_api.sleep
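Illustrative sketch, not from the diff: as_future() on a with_config() instance turns a coroutine into an asyncio Task scheduled on that instance's loop, so it can be run there. The coroutine work is a placeholder.

import asyncio
import txaio

txaio.use_asyncio()

alt_loop = asyncio.new_event_loop()
txa = txaio.with_config(loop=alt_loop)

async def work(x):                    # placeholder coroutine
    await asyncio.sleep(0)
    return x * 2

task = txa.as_future(work, 21)        # becomes a Task scheduled on alt_loop
assert alt_loop.run_until_complete(task) == 42
alt_loop.close()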

View File

@ -130,6 +130,18 @@ def add_log_categories(categories):
_categories.update(categories)
def with_config(loop=None):
global config
if loop is not None:
if config.loop is not None and config.loop is not loop:
raise RuntimeError(
"Twisted has only a single, global reactor. You passed in "
"a reactor different from the one already configured "
"in txaio.config.loop"
)
return _TxApi(config)
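Illustrative sketch, not part of the diff: since Twisted has only the one global reactor, the Twisted with_config() above merely sanity-checks the loop argument; once txaio.config.loop is set, any other object is rejected. NotTheReactor is a placeholder class.

import txaio

txaio.use_twisted()
from twisted.internet import reactor

txaio.config.loop = reactor                  # the single global reactor
txa = txaio.with_config(loop=reactor)        # fine: same object

class NotTheReactor(object):
    pass

try:
    txaio.with_config(loop=NotTheReactor())  # a different "reactor"
except RuntimeError as e:
    print("rejected:", e)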
# NOTE: beware that twisted.logger._logger.Logger copies itself via an
# overridden __get__ method when used as recommended as a class
# descriptor. So, we override __get__ to just return ``self`` which
@ -344,183 +356,184 @@ def start_logging(out=_stdout, level='info'):
log.startLogging(out)
def failure_message(fail):
"""
:param fail: must be an IFailedFuture
returns a unicode error-message
"""
try:
return u'{0}: {1}'.format(
fail.value.__class__.__name__,
fail.getErrorMessage(),
)
except Exception:
return 'Failed to produce failure message for "{0}"'.format(fail)
def failure_traceback(fail):
"""
:param fail: must be an IFailedFuture
returns a traceback instance
"""
return fail.tb
def failure_format_traceback(fail):
"""
:param fail: must be an IFailedFuture
returns a string
"""
try:
f = six.StringIO()
fail.printTraceback(file=f)
return f.getvalue()
except Exception:
return u"Failed to format failure traceback for '{0}'".format(fail)
_unspecified = object()
def create_future(result=_unspecified, error=_unspecified):
if result is not _unspecified and error is not _unspecified:
raise ValueError("Cannot have both result and error.")
f = Deferred()
if result is not _unspecified:
resolve(f, result)
elif error is not _unspecified:
reject(f, error)
return f
class _TxApi(object):
def __init__(self, config):
self._config = config
def failure_message(self, fail):
"""
:param fail: must be an IFailedFuture
returns a unicode error-message
"""
try:
return u'{0}: {1}'.format(
fail.value.__class__.__name__,
fail.getErrorMessage(),
)
except Exception:
return 'Failed to produce failure message for "{0}"'.format(fail)
# maybe delete, just use create_future()
def create_future_success(result):
return succeed(result)
def failure_traceback(self, fail):
"""
:param fail: must be an IFailedFuture
returns a traceback instance
"""
return fail.tb
def failure_format_traceback(self, fail):
"""
:param fail: must be an IFailedFuture
returns a string
"""
try:
f = six.StringIO()
fail.printTraceback(file=f)
return f.getvalue()
except Exception:
return u"Failed to format failure traceback for '{0}'".format(fail)
# maybe delete, just use create_future()
def create_future_error(error=None):
return fail(create_failure(error))
def create_future(self, result=_unspecified, error=_unspecified):
if result is not _unspecified and error is not _unspecified:
raise ValueError("Cannot have both result and error.")
f = Deferred()
if result is not _unspecified:
resolve(f, result)
elif error is not _unspecified:
reject(f, error)
return f
def as_future(fun, *args, **kwargs):
return maybeDeferred(fun, *args, **kwargs)
def create_future_success(self, result):
return succeed(result)
def create_future_error(self, error=None):
return fail(create_failure(error))
def is_future(obj):
return isinstance(obj, Deferred)
def as_future(self, fun, *args, **kwargs):
return maybeDeferred(fun, *args, **kwargs)
def is_future(self, obj):
return isinstance(obj, Deferred)
def call_later(delay, fun, *args, **kwargs):
return IReactorTime(_get_loop()).callLater(delay, fun, *args, **kwargs)
def call_later(self, delay, fun, *args, **kwargs):
return IReactorTime(self._get_loop()).callLater(delay, fun, *args, **kwargs)
def make_batched_timer(self, bucket_seconds, chunk_size=100):
"""
Creates and returns an object implementing
:class:`txaio.IBatchedTimer`.
:param bucket_seconds: the number of seconds in each bucket. That
is, a value of 5 means that any timeout within a 5 second
window will be in the same bucket, and get notified at the
same time. This is only accurate to "milliseconds".
:param chunk_size: when "doing" the callbacks in a particular
bucket, this controls how many we do at once before yielding to
the reactor.
"""
def get_seconds():
return self._get_loop().seconds()
def create_delayed_call(delay, fun, *args, **kwargs):
return self._get_loop().callLater(delay, fun, *args, **kwargs)
return _BatchedTimer(
bucket_seconds * 1000.0, chunk_size,
seconds_provider=get_seconds,
delayed_call_creator=create_delayed_call,
)
def make_batched_timer(bucket_seconds, chunk_size=100):
"""
Creates and returns an object implementing
:class:`txaio.IBatchedTimer`.
:param bucket_seconds: the number of seconds in each bucket. That
is, a value of 5 means that any timeout within a 5 second
window will be in the same bucket, and get notified at the
same time. This is only accurate to "milliseconds".
:param chunk_size: when "doing" the callbacks in a particular
bucket, this controls how many we do at once before yielding to
the reactor.
"""
def get_seconds():
return _get_loop().seconds()
def create_delayed_call(delay, fun, *args, **kwargs):
return _get_loop().callLater(delay, fun, *args, **kwargs)
return _BatchedTimer(
bucket_seconds * 1000.0, chunk_size,
seconds_provider=get_seconds,
delayed_call_creator=create_delayed_call,
)
def is_called(self, future):
return future.called
def resolve(self, future, result=None):
future.callback(result)
def is_called(future):
return future.called
def resolve(future, result=None):
future.callback(result)
def reject(self, future, error=None):
if error is None:
error = create_failure()
elif isinstance(error, Exception):
error = Failure(error)
else:
if not isinstance(error, Failure):
raise RuntimeError("reject requires a Failure or Exception")
future.errback(error)
def create_failure(self, exception=None):
"""
Create a Failure instance.
if ``exception`` is None (the default), we MUST be inside an
"except" block. This encapsulates the exception into an object
that implements IFailedFuture
"""
if exception:
return Failure(exception)
return Failure()
def add_callbacks(self, future, callback, errback):
"""
callback or errback may be None, but at least one must be
non-None.
"""
assert future is not None
if callback is None:
assert errback is not None
future.addErrback(errback)
else:
# Twisted allows errback to be None here
future.addCallbacks(callback, errback)
return future
def reject(future, error=None):
if error is None:
error = create_failure()
elif isinstance(error, Exception):
error = Failure(error)
else:
if not isinstance(error, Failure):
raise RuntimeError("reject requires a Failure or Exception")
future.errback(error)
def gather(self, futures, consume_exceptions=True):
def completed(res):
rtn = []
for (ok, value) in res:
rtn.append(value)
if not ok and not consume_exceptions:
value.raiseException()
return rtn
# XXX if consume_exceptions is False in asyncio.gather(), it will
# abort on the first raised exception -- should we set
# fireOnOneErrback=True (if consume_exceptions=False?) -- but then
# we'll have to wrap the errback() to extract the "real" failure
# from the FirstError that gets thrown if you set that ...
dl = DeferredList(list(futures), consumeErrors=consume_exceptions)
# we unpack the (ok, value) tuples into just a list of values, so
# that the callback() gets the same value in asyncio and Twisted.
add_callbacks(dl, completed, None)
return dl
def create_failure(exception=None):
"""
Create a Failure instance.
if ``exception`` is None (the default), we MUST be inside an
"except" block. This encapsulates the exception into an object
that implements IFailedFuture
"""
if exception:
return Failure(exception)
return Failure()
def sleep(self, delay):
"""
Inline sleep for use in co-routines.
:param delay: Time to sleep in seconds.
:type delay: float
"""
d = Deferred()
self._get_loop().callLater(delay, d.callback, None)
return d
def add_callbacks(future, callback, errback):
"""
callback or errback may be None, but at least one must be
non-None.
"""
assert future is not None
if callback is None:
assert errback is not None
future.addErrback(errback)
else:
# Twisted allows errback to be None here
future.addCallbacks(callback, errback)
return future
def gather(futures, consume_exceptions=True):
def completed(res):
rtn = []
for (ok, value) in res:
rtn.append(value)
if not ok and not consume_exceptions:
value.raiseException()
return rtn
# XXX if consume_exceptions is False in asyncio.gather(), it will
# abort on the first raised exception -- should we set
# fireOnOneErrback=True (if consume_exceptions=False?) -- but then
# we'll have to wrap the errback() to extract the "real" failure
# from the FirstError that gets thrown if you set that ...
dl = DeferredList(list(futures), consumeErrors=consume_exceptions)
# we unpack the (ok, value) tuples into just a list of values, so
# that the callback() gets the same value in asyncio and Twisted.
add_callbacks(dl, completed, None)
return dl
# methods internal to this implementation
def _get_loop():
if config.loop is None:
from twisted.internet import reactor
config.loop = reactor
return config.loop
def _get_loop(self):
"""
internal helper
"""
# we import and assign the default here (and not, e.g., when
# making Config) so as to delay importing reactor as long as
# possible in case someone is installing a custom one.
if self._config.loop is None:
from twisted.internet import reactor
self._config.loop = reactor
return self._config.loop
def set_global_log_level(level):
@ -538,17 +551,23 @@ def get_global_log_level():
return _log_level
def sleep(delay, reactor=None):
"""
Inline sleep for use in co-routines.
:param delay: Time to sleep in seconds.
:type delay: float
:param reactor: The Twisted reactor to use.
:type reactor: None or provider of ``IReactorTime``.
"""
if not reactor:
from twisted.internet import reactor
d = Deferred()
reactor.callLater(delay, d.callback, None)
return d
_default_api = _TxApi(config)
failure_message = _default_api.failure_message
failure_traceback = _default_api.failure_traceback
failure_format_traceback = _default_api.failure_format_traceback
create_future = _default_api.create_future
create_future_success = _default_api.create_future_success
create_future_error = _default_api.create_future_error
as_future = _default_api.as_future
is_future = _default_api.is_future
call_later = _default_api.call_later
make_batched_timer = _default_api.make_batched_timer
is_called = _default_api.is_called
resolve = _default_api.resolve
reject = _default_api.reject
create_failure = _default_api.create_failure
add_callbacks = _default_api.add_callbacks
gather = _default_api.gather
sleep = _default_api.sleep
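Illustrative sketch, not part of the diff: the module-level names above now delegate to a default _TxApi instance, so existing code keeps working unchanged; for example, txaio.sleep() returns a Deferred fired from the configured reactor. The coroutine main is a placeholder.

import txaio

txaio.use_twisted()
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks

@inlineCallbacks
def main():
    yield txaio.sleep(1.0)        # Deferred fired via _get_loop(), i.e. the global reactor
    print("one second later")
    reactor.stop()

main()
reactor.run()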