batched_timer support for loop=

This commit is contained in:
meejah 2017-04-06 01:25:00 -06:00
parent 92b064f795
commit ed81db742f
3 changed files with 43 additions and 6 deletions

View File

@ -77,6 +77,31 @@ def test_batched_successful_call(framework_aio):
assert calls[2] == (("third call", ), dict())
def test_batched_successful_call_explicit_loop(framework_aio):
'''
batched calls really happen in batches
'''
# Trollius doesn't come with this, so won't work on py2
pytest.importorskip('asyncio.test_utils')
from asyncio.test_utils import TestLoop
def time_gen():
yield
yield
new_loop = TestLoop(time_gen)
calls = []
def foo(*args, **kw):
calls.append((args, kw))
batched = txaio.make_batched_timer(5, loop=new_loop)
batched.call_later(1, foo, "first call")
new_loop.advance_time(2.0)
new_loop._run_once()
assert len(calls) == 1
def test_batched_cancel(framework_aio):
'''
we can cancel uncalled call_laters
@ -88,7 +113,6 @@ def test_batched_cancel(framework_aio):
def time_gen():
yield
yield
yield
new_loop = TestLoop(time_gen)
calls = []
@ -123,7 +147,6 @@ def test_batched_cancel_too_late(framework_aio):
def time_gen():
yield
yield
yield
new_loop = TestLoop(time_gen)
calls = []

View File

@ -38,7 +38,7 @@ class _BatchedTimer(IBatchedTimer):
"""
def __init__(self, bucket_milliseconds, chunk_size,
seconds_provider, delayed_call_creator):
seconds_provider, delayed_call_creator, loop=None):
if bucket_milliseconds <= 0.0:
raise ValueError(
"bucket_milliseconds must be > 0.0"
@ -48,6 +48,7 @@ class _BatchedTimer(IBatchedTimer):
self._get_seconds = seconds_provider
self._create_delayed_call = delayed_call_creator
self._buckets = dict() # real seconds -> (IDelayedCall, list)
self._loop = loop
def call_later(self, delay, func, *args, **kwargs):
"""

View File

@ -317,7 +317,7 @@ def call_later(delay, fun, *args, **kwargs):
return config.loop.call_later(delay, real_call)
def make_batched_timer(bucket_seconds, chunk_size=100):
def make_batched_timer(bucket_seconds, chunk_size=100, loop=None):
"""
Creates and returns an object implementing
:class:`txaio.IBatchedTimer`.
@ -332,13 +332,26 @@ def make_batched_timer(bucket_seconds, chunk_size=100):
the reactor.
"""
# XXX this duplicates code from 'call_later', but I don't see an
# alternative
if loop is not None:
def _create_call_later(delay, fun, *args, **kwargs):
real_call = functools.partial(fun, *args, **kwargs)
return loop.call_later(delay, real_call)
the_loop = loop
else:
_create_call_later = call_later
the_loop = config.loop
def get_seconds():
return config.loop.time()
return the_loop.time()
return _BatchedTimer(
bucket_seconds * 1000.0, chunk_size,
seconds_provider=get_seconds,
delayed_call_creator=call_later,
delayed_call_creator=_create_call_later,
)