deb-python-eventlet/tests/queue_test.py

341 lines
9.5 KiB
Python

import eventlet
from eventlet import event, hubs, queue
from tests import LimitedTestCase, main
def do_bail(q):
eventlet.Timeout(0, RuntimeError())
try:
result = q.get()
return result
except RuntimeError:
return 'timed out'
class TestQueue(LimitedTestCase):
def test_send_first(self):
q = eventlet.Queue()
q.put('hi')
self.assertEqual(q.get(), 'hi')
def test_send_last(self):
q = eventlet.Queue()
def waiter(q):
self.assertEqual(q.get(), 'hi2')
gt = eventlet.spawn(eventlet.with_timeout, 0.1, waiter, q)
eventlet.sleep(0)
eventlet.sleep(0)
q.put('hi2')
gt.wait()
def test_max_size(self):
q = eventlet.Queue(2)
results = []
def putter(q):
q.put('a')
results.append('a')
q.put('b')
results.append('b')
q.put('c')
results.append('c')
gt = eventlet.spawn(putter, q)
eventlet.sleep(0)
self.assertEqual(results, ['a', 'b'])
self.assertEqual(q.get(), 'a')
eventlet.sleep(0)
self.assertEqual(results, ['a', 'b', 'c'])
self.assertEqual(q.get(), 'b')
self.assertEqual(q.get(), 'c')
gt.wait()
def test_zero_max_size(self):
q = eventlet.Queue(0)
def sender(evt, q):
q.put('hi')
evt.send('done')
def receiver(q):
x = q.get()
return x
evt = event.Event()
gt = eventlet.spawn(sender, evt, q)
eventlet.sleep(0)
assert not evt.ready()
gt2 = eventlet.spawn(receiver, q)
self.assertEqual(gt2.wait(), 'hi')
self.assertEqual(evt.wait(), 'done')
gt.wait()
def test_resize_up(self):
q = eventlet.Queue(0)
def sender(evt, q):
q.put('hi')
evt.send('done')
evt = event.Event()
gt = eventlet.spawn(sender, evt, q)
eventlet.sleep(0)
assert not evt.ready()
q.resize(1)
eventlet.sleep(0)
assert evt.ready()
gt.wait()
def test_resize_down(self):
q = eventlet.Queue(5)
for i in range(5):
q.put(i)
self.assertEqual(list(q.queue), list(range(5)))
q.resize(1)
eventlet.sleep(0)
self.assertEqual(list(q.queue), list(range(5)))
def test_resize_to_Unlimited(self):
q = eventlet.Queue(0)
def sender(evt, q):
q.put('hi')
evt.send('done')
evt = event.Event()
gt = eventlet.spawn(sender, evt, q)
eventlet.sleep()
self.assertFalse(evt.ready())
q.resize(None)
eventlet.sleep()
self.assertTrue(evt.ready())
gt.wait()
def test_multiple_waiters(self):
# tests that multiple waiters get their results back
q = eventlet.Queue()
sendings = ['1', '2', '3', '4']
gts = [eventlet.spawn(q.get) for x in sendings]
eventlet.sleep(0.01) # get 'em all waiting
q.put(sendings[0])
q.put(sendings[1])
q.put(sendings[2])
q.put(sendings[3])
results = set()
for i, gt in enumerate(gts):
results.add(gt.wait())
self.assertEqual(results, set(sendings))
def test_waiters_that_cancel(self):
q = eventlet.Queue()
gt = eventlet.spawn(do_bail, q)
self.assertEqual(gt.wait(), 'timed out')
q.put('hi')
self.assertEqual(q.get(), 'hi')
def test_getting_before_sending(self):
q = eventlet.Queue()
gt = eventlet.spawn(q.put, 'sent')
self.assertEqual(q.get(), 'sent')
gt.wait()
def test_two_waiters_one_dies(self):
def waiter(q):
return q.get()
q = eventlet.Queue()
dying = eventlet.spawn(do_bail, q)
waiting = eventlet.spawn(waiter, q)
eventlet.sleep(0)
q.put('hi')
self.assertEqual(dying.wait(), 'timed out')
self.assertEqual(waiting.wait(), 'hi')
def test_two_bogus_waiters(self):
q = eventlet.Queue()
gt1 = eventlet.spawn(do_bail, q)
gt2 = eventlet.spawn(do_bail, q)
eventlet.sleep(0)
q.put('sent')
self.assertEqual(gt1.wait(), 'timed out')
self.assertEqual(gt2.wait(), 'timed out')
self.assertEqual(q.get(), 'sent')
def test_waiting(self):
q = eventlet.Queue()
gt1 = eventlet.spawn(q.get)
eventlet.sleep(0)
self.assertEqual(1, q.getting())
q.put('hi')
eventlet.sleep(0)
self.assertEqual(0, q.getting())
self.assertEqual('hi', gt1.wait())
self.assertEqual(0, q.getting())
def test_channel_send(self):
channel = eventlet.Queue(0)
events = []
def another_greenlet():
events.append(channel.get())
events.append(channel.get())
eventlet.spawn(another_greenlet)
events.append('sending')
channel.put('hello')
events.append('sent hello')
channel.put('world')
events.append('sent world')
self.assertEqual(['sending', 'hello', 'sent hello', 'world', 'sent world'], events)
def test_channel_wait(self):
channel = eventlet.Queue(0)
events = []
def another_greenlet():
events.append('sending hello')
channel.put('hello')
events.append('sending world')
channel.put('world')
events.append('sent world')
eventlet.spawn(another_greenlet)
events.append('waiting')
events.append(channel.get())
events.append(channel.get())
self.assertEqual(['waiting', 'sending hello', 'hello', 'sending world', 'world'], events)
eventlet.sleep(0)
self.assertEqual(
['waiting', 'sending hello', 'hello', 'sending world', 'world', 'sent world'], events)
def test_channel_waiters(self):
c = eventlet.Queue(0)
w1 = eventlet.spawn(c.get)
w2 = eventlet.spawn(c.get)
w3 = eventlet.spawn(c.get)
eventlet.sleep(0)
self.assertEqual(c.getting(), 3)
s1 = eventlet.spawn(c.put, 1)
s2 = eventlet.spawn(c.put, 2)
s3 = eventlet.spawn(c.put, 3)
s1.wait()
s2.wait()
s3.wait()
self.assertEqual(c.getting(), 0)
# NOTE: we don't guarantee that waiters are served in order
results = sorted([w1.wait(), w2.wait(), w3.wait()])
self.assertEqual(results, [1, 2, 3])
def test_channel_sender_timing_out(self):
c = eventlet.Queue(0)
self.assertRaises(queue.Full, c.put, "hi", timeout=0.001)
self.assertRaises(queue.Empty, c.get_nowait)
def test_task_done(self):
channel = queue.Queue(0)
X = object()
gt = eventlet.spawn(channel.put, X)
result = channel.get()
assert result is X, (result, X)
assert channel.unfinished_tasks == 1, channel.unfinished_tasks
channel.task_done()
assert channel.unfinished_tasks == 0, channel.unfinished_tasks
gt.wait()
def test_join_doesnt_block_when_queue_is_already_empty(self):
queue = eventlet.Queue()
queue.join()
def store_result(result, func, *args):
try:
result.append(func(*args))
except Exception as exc:
result.append(exc)
class TestNoWait(LimitedTestCase):
def test_put_nowait_simple(self):
hub = hubs.get_hub()
result = []
q = eventlet.Queue(1)
hub.schedule_call_global(0, store_result, result, q.put_nowait, 2)
hub.schedule_call_global(0, store_result, result, q.put_nowait, 3)
eventlet.sleep(0)
eventlet.sleep(0)
assert len(result) == 2, result
assert result[0] is None, result
assert isinstance(result[1], queue.Full), result
def test_get_nowait_simple(self):
hub = hubs.get_hub()
result = []
q = queue.Queue(1)
q.put(4)
hub.schedule_call_global(0, store_result, result, q.get_nowait)
hub.schedule_call_global(0, store_result, result, q.get_nowait)
eventlet.sleep(0)
assert len(result) == 2, result
assert result[0] == 4, result
assert isinstance(result[1], queue.Empty), result
# get_nowait must work from the mainloop
def test_get_nowait_unlock(self):
hub = hubs.get_hub()
result = []
q = queue.Queue(0)
p = eventlet.spawn(q.put, 5)
assert q.empty(), q
assert q.full(), q
eventlet.sleep(0)
assert q.empty(), q
assert q.full(), q
hub.schedule_call_global(0, store_result, result, q.get_nowait)
eventlet.sleep(0)
assert q.empty(), q
assert q.full(), q
assert result == [5], result
# TODO add ready to greenthread
# assert p.ready(), p
assert p.dead, p
assert q.empty(), q
# put_nowait must work from the mainloop
def test_put_nowait_unlock(self):
hub = hubs.get_hub()
result = []
q = queue.Queue(0)
eventlet.spawn(q.get)
assert q.empty(), q
assert q.full(), q
eventlet.sleep(0)
assert q.empty(), q
assert q.full(), q
hub.schedule_call_global(0, store_result, result, q.put_nowait, 10)
# TODO ready method on greenthread
# assert not p.ready(), p
eventlet.sleep(0)
assert result == [None], result
# TODO ready method
# assert p.ready(), p
assert q.full(), q
assert q.empty(), q
if __name__ == '__main__':
main()