deb-python-eventlet/tests/semaphore_test.py

74 lines
1.9 KiB
Python

import time
import eventlet
import tests
class TestSemaphore(tests.LimitedTestCase):
    """Acquire/release tests for eventlet.Semaphore and eventlet.CappedSemaphore."""

    def test_bounded(self):
        """Drive a CappedSemaphore(2, limit=3) and check its reported balance.

        The balance is asserted to be -3 after the third acquire and 3 after
        the final release; both spawned greenthreads are then waited on.
        """
        capped = eventlet.CappedSemaphore(2, limit=3)
        for _ in range(2):
            self.assertEqual(capped.acquire(), True)
        # The release is only scheduled here; the next acquire follows it.
        releaser = eventlet.spawn(capped.release)
        self.assertEqual(capped.acquire(), True)
        self.assertEqual(-3, capped.balance)
        for _ in range(3):
            capped.release()
        acquirer = eventlet.spawn(capped.acquire)
        capped.release()
        self.assertEqual(3, capped.balance)
        releaser.wait()
        acquirer.wait()

    def test_bounded_with_zero_limit(self):
        """Spawn an acquire on CappedSemaphore(0, 0), release once, then wait for it."""
        capped = eventlet.CappedSemaphore(0, 0)
        acquirer = eventlet.spawn(capped.acquire)
        capped.release()
        acquirer.wait()

    def test_non_blocking(self):
        """acquire(blocking=False) on a Semaphore(0) must return False."""
        empty = eventlet.Semaphore(0)
        acquired = empty.acquire(blocking=False)
        self.assertEqual(acquired, False)

    def test_timeout(self):
        """acquire(timeout=0.1) on a Semaphore(0) returns False after >= 0.1s."""
        empty = eventlet.Semaphore(0)
        began = time.time()
        acquired = empty.acquire(timeout=0.1)
        self.assertEqual(acquired, False)
        elapsed = time.time() - began
        self.assertTrue(elapsed >= 0.1)

    def test_timeout_non_blocking(self):
        """Passing blocking=False together with a timeout must raise ValueError."""
        sem = eventlet.Semaphore()
        with self.assertRaises(ValueError):
            sem.acquire(blocking=False, timeout=1)
def test_semaphore_contention():
    """Check that two greenthreads sharing one Semaphore progress evenly.

    Each worker repeatedly takes the shared semaphore and increments its own
    counter until the smaller counter reaches 200. After 0.5 seconds both
    workers are killed, and the counters must differ by less than 10% of the
    smaller one (truncated to an int).
    """
    mutex = eventlet.Semaphore()
    tally = [0, 0]

    def contend(slot):
        while min(tally) < 200:
            with mutex:
                tally[slot] += 1
                # Yield to the hub while the mutex is still held.
                eventlet.sleep(0.001)

    workers = [eventlet.spawn(contend, slot=index) for index in range(2)]
    eventlet.sleep(0.5)
    for greenthread in workers:
        greenthread.kill()

    spread = abs(tally[0] - tally[1])
    tolerance = int(min(tally) * 0.1)
    assert spread < tolerance, tally
def test_semaphore_type_check():
    """Check which initial values the Semaphore constructor accepts.

    0, 1 and 1e2 must construct without error; 'foo' must raise TypeError
    and -1 must raise ValueError.
    """
    for accepted in (0, 1, 1e2):
        eventlet.Semaphore(accepted)

    rejected = (
        (TypeError, 'foo'),
        (ValueError, -1),
    )
    for expected_error, bad_value in rejected:
        with tests.assert_raises(expected_error):
            eventlet.Semaphore(bad_value)