"""Tests for ``HoldLock``: holding, releasing, waiting and ``waiting_for``."""

from threading import Timer
import time

from hold_lock import HoldLock


def test_basic_hold_release():
    """Holds accumulate in ``holders`` and block ``wait`` until all are released."""
    gate = HoldLock()
    # Nothing is held yet, so this call is expected to return without blocking.
    gate.wait()

    gate.hold()
    assert gate.holders == (None,)

    gate.hold("hold1")
    assert gate.holders == (None, "hold1")

    # Two holds are outstanding, so a short wait must report failure.
    released = gate.wait(0.1)
    assert not released

    gate.release("hold1")
    gate.release()
    assert gate.wait()


def test_thread_release():
    """A release issued from a timer thread lets a pending ``wait`` succeed."""
    gate = HoldLock()
    gate.hold()
    releaser = Timer(0.1, gate.release)
    releaser.start()
    assert gate.wait(1)


def test_hold_timeout():
    """A hold taken with a timeout does not keep ``wait`` from succeeding."""
    gate = HoldLock()
    gate.hold(timeout=0.1)
    assert gate.wait()


def test_waiting_for():
    """``waiting_for`` yields the remaining holders each time they change."""
    gate = HoldLock()

    # Wait with no holders
    assert list(gate.waiting_for()) == [()]

    # Release holders while waiting
    gate.hold("hold1")
    gate.hold("hold2")
    Timer(0.1, gate.release, args=("hold1",)).start()
    Timer(0.2, gate.release, args=("hold2",)).start()
    seen = list(gate.waiting_for())
    assert seen == [("hold1", "hold2"), ("hold2",), ()]

    # Release holders during the yield
    gate.hold("hold1")
    gate.hold("hold2")
    Timer(0.1, gate.release, args=("hold2",)).start()
    seen = []
    for index, remaining in enumerate(gate.waiting_for()):
        seen.append(remaining)
        # Only the very first iteration releases "hold1"; the timer
        # started above takes care of "hold2".
        if index == 0:
            gate.release("hold1")
    assert seen == [("hold1", "hold2"), ("hold2",), ()]


def test_waiting_for_timeout():
    """With a timeout, ``waiting_for`` stops while a hold is still outstanding."""
    gate = HoldLock()
    gate.hold("hold1")
    seen = list(gate.waiting_for(timeout=0.1))
    assert seen == [("hold1",)]


def test_waiting_for_timeout_during_yield():
    """The ``waiting_for`` timeout may elapse while the consumer is busy."""
    gate = HoldLock()
    gate.hold("hold1")
    seen = []
    for remaining in gate.waiting_for(timeout=0.1):
        seen.append(remaining)
        # Sleep past the 0.1 s timeout inside the loop body.
        time.sleep(0.2)
    assert seen == [("hold1",)]


def test_waiting_for_hold_timeouts():
    """Holds with their own timeouts drop out of ``waiting_for`` one by one."""
    gate = HoldLock()
    gate.hold("hold1", timeout=0.1)
    gate.hold("hold2", timeout=0.2)
    seen = list(gate.waiting_for())
    assert seen == [("hold1", "hold2"), ("hold2",), ()]


def test_waiting_for_hold_timeout_during_yield():
    """A hold may time out while the consumer is busy inside the loop body."""
    gate = HoldLock()
    gate.hold("hold1", timeout=0.1)
    gate.hold("hold2", timeout=0.3)
    seen = []
    for remaining in gate.waiting_for():
        seen.append(remaining)
        # Sleep past the 0.1 s timeout of "hold1" inside the loop body.
        time.sleep(0.2)
    assert seen == [("hold1", "hold2"), ("hold2",), ()]