from threading import Timer, Thread import time from hold_lock import HoldLock def test_basic_hold_release(): lock = HoldLock() lock.wait() lock.hold() assert lock.holders == (HoldLock.AnonHolder,) lock.hold("hold1") assert lock.holders == (HoldLock.AnonHolder, "hold1") assert not lock.wait(0.1) lock.release("hold1") lock.release() assert lock.wait() holder = lock.hold("hold1") holder.release() assert lock.wait() def test_thread_release(): lock = HoldLock() lock.hold() Timer(0.1, lambda: lock.release()).start() assert lock.wait(1) def test_context(): lock = HoldLock() def with_func(hold_lock): with hold_lock: time.sleep(0.2) Thread(target=with_func, args=[lock]).start() assert not lock.wait(0.1) assert lock.wait(1) def test_hold_context(): lock = HoldLock() def with_func(hold_lock): with hold_lock.hold("hold1"): time.sleep(0.2) Thread(target=with_func, args=[lock]).start() assert lock.holders == ("hold1",) assert lock.wait() def test_hold_timeout(): lock = HoldLock() lock.hold(timeout=0.1) assert lock.wait() def test_release_expired_holder(): lock = HoldLock() lock.hold(timeout=0.1) assert lock.wait() lock.release() assert lock.wait() with lock.hold(timeout=0.1): assert lock.wait() def test_waiting_for(): lock = HoldLock() # Wait with no holders res = [] for holders in lock.waiting_for(): res.append(holders) assert res == [()] # Release holders while waiting lock.hold("hold1") lock.hold("hold2") Timer(0.1, lambda: lock.release("hold1")).start() Timer(0.2, lambda: lock.release("hold2")).start() res = [] for holders in lock.waiting_for(): res.append(holders) assert res == [("hold1", "hold2"), ("hold2",), ()] # Release holders during the yield lock.hold("hold1") lock.hold("hold2") Timer(0.1, lambda: lock.release("hold2")).start() res = [] for loopnum, holders in enumerate(lock.waiting_for()): res.append(holders) if loopnum == 0: lock.release("hold1") assert res == [("hold1", "hold2"), ("hold2",), ()] def test_waiting_for_timeout(): lock = HoldLock() lock.hold("hold1") res = [] for holders in lock.waiting_for(timeout=0.1): res.append(holders) assert res == [("hold1",)] def test_waiting_for_timeout_during_yield(): lock = HoldLock() lock.hold("hold1") res = [] for holders in lock.waiting_for(timeout=0.1): res.append(holders) time.sleep(0.2) assert res == [("hold1",)] def test_waiting_for_hold_timeouts(): lock = HoldLock() lock.hold("hold1", timeout=0.1) lock.hold("hold2", timeout=0.2) res = [] for holders in lock.waiting_for(): res.append(holders) assert res == [("hold1", "hold2"), ("hold2",), ()] def test_waiting_for_hold_timeout_during_yield(): lock = HoldLock() lock.hold("hold1", timeout=0.1) lock.hold("hold2", timeout=0.3) res = [] for holders in lock.waiting_for(): res.append(holders) time.sleep(0.2) assert res == [("hold1", "hold2"), ("hold2",), ()]