You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

141 lines
3.2 KiB

from threading import Timer, Thread
import time
from hold_lock import HoldLock
def test_basic_hold_release():
lock = HoldLock()
lock.wait()
lock.hold()
assert lock.holders == (HoldLock.AnonHolder,)
lock.hold("hold1")
assert lock.holders == (HoldLock.AnonHolder, "hold1")
assert not lock.wait(0.1)
lock.release("hold1")
lock.release()
assert lock.wait()
holder = lock.hold("hold1")
holder.release()
assert lock.wait()
def test_thread_release():
lock = HoldLock()
lock.hold()
Timer(0.1, lambda: lock.release()).start()
assert lock.wait(1)
def test_context():
lock = HoldLock()
def with_func(hold_lock):
with hold_lock:
time.sleep(0.2)
Thread(target=with_func, args=[lock]).start()
assert not lock.wait(0.1)
assert lock.wait(1)
def test_hold_context():
lock = HoldLock()
def with_func(hold_lock):
with hold_lock.hold("hold1"):
time.sleep(0.2)
Thread(target=with_func, args=[lock]).start()
assert lock.holders == ("hold1",)
assert lock.wait()
def test_hold_timeout():
lock = HoldLock()
lock.hold(timeout=0.1)
assert lock.wait()
def test_release_expired_holder():
lock = HoldLock()
lock.hold(timeout=0.1)
assert lock.wait()
lock.release()
assert lock.wait()
with lock.hold(timeout=0.1):
assert lock.wait()
def test_waiting_for():
lock = HoldLock()
# Wait with no holders
res = []
for holders in lock.waiting_for():
res.append(holders)
assert res == [()]
# Release holders while waiting
lock.hold("hold1")
lock.hold("hold2")
Timer(0.1, lambda: lock.release("hold1")).start()
Timer(0.2, lambda: lock.release("hold2")).start()
res = []
for holders in lock.waiting_for():
res.append(holders)
assert res == [("hold1", "hold2"), ("hold2",), ()]
# Release holders during the yield
lock.hold("hold1")
lock.hold("hold2")
Timer(0.1, lambda: lock.release("hold2")).start()
res = []
for loopnum, holders in enumerate(lock.waiting_for()):
res.append(holders)
if loopnum == 0:
lock.release("hold1")
assert res == [("hold1", "hold2"), ("hold2",), ()]
def test_waiting_for_timeout():
lock = HoldLock()
lock.hold("hold1")
res = []
for holders in lock.waiting_for(timeout=0.1):
res.append(holders)
assert res == [("hold1",)]
def test_waiting_for_timeout_during_yield():
lock = HoldLock()
lock.hold("hold1")
res = []
for holders in lock.waiting_for(timeout=0.1):
res.append(holders)
time.sleep(0.2)
assert res == [("hold1",)]
def test_waiting_for_hold_timeouts():
lock = HoldLock()
lock.hold("hold1", timeout=0.1)
lock.hold("hold2", timeout=0.2)
res = []
for holders in lock.waiting_for():
res.append(holders)
assert res == [("hold1", "hold2"), ("hold2",), ()]
def test_waiting_for_hold_timeout_during_yield():
lock = HoldLock()
lock.hold("hold1", timeout=0.1)
lock.hold("hold2", timeout=0.3)
res = []
for holders in lock.waiting_for():
res.append(holders)
time.sleep(0.2)
assert res == [("hold1", "hold2"), ("hold2",), ()]