You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
141 lines
3.2 KiB
141 lines
3.2 KiB
from threading import Timer, Thread
|
|
import time
|
|
|
|
from hold_lock import HoldLock
|
|
|
|
|
|
def test_basic_hold_release():
|
|
lock = HoldLock()
|
|
lock.wait()
|
|
lock.hold()
|
|
assert lock.holders == (HoldLock.AnonHolder,)
|
|
lock.hold("hold1")
|
|
assert lock.holders == (HoldLock.AnonHolder, "hold1")
|
|
|
|
assert not lock.wait(0.1)
|
|
|
|
lock.release("hold1")
|
|
lock.release()
|
|
assert lock.wait()
|
|
|
|
holder = lock.hold("hold1")
|
|
holder.release()
|
|
assert lock.wait()
|
|
|
|
|
|
def test_thread_release():
|
|
lock = HoldLock()
|
|
lock.hold()
|
|
Timer(0.1, lambda: lock.release()).start()
|
|
assert lock.wait(1)
|
|
|
|
|
|
def test_context():
|
|
lock = HoldLock()
|
|
|
|
def with_func(hold_lock):
|
|
with hold_lock:
|
|
time.sleep(0.2)
|
|
Thread(target=with_func, args=[lock]).start()
|
|
assert not lock.wait(0.1)
|
|
assert lock.wait(1)
|
|
|
|
|
|
def test_hold_context():
|
|
lock = HoldLock()
|
|
|
|
def with_func(hold_lock):
|
|
with hold_lock.hold("hold1"):
|
|
time.sleep(0.2)
|
|
Thread(target=with_func, args=[lock]).start()
|
|
assert lock.holders == ("hold1",)
|
|
assert lock.wait()
|
|
|
|
|
|
def test_hold_timeout():
|
|
lock = HoldLock()
|
|
lock.hold(timeout=0.1)
|
|
assert lock.wait()
|
|
|
|
|
|
def test_release_expired_holder():
|
|
lock = HoldLock()
|
|
lock.hold(timeout=0.1)
|
|
assert lock.wait()
|
|
lock.release()
|
|
assert lock.wait()
|
|
|
|
with lock.hold(timeout=0.1):
|
|
assert lock.wait()
|
|
|
|
|
|
def test_waiting_for():
|
|
lock = HoldLock()
|
|
|
|
# Wait with no holders
|
|
res = []
|
|
for holders in lock.waiting_for():
|
|
res.append(holders)
|
|
assert res == [()]
|
|
|
|
# Release holders while waiting
|
|
lock.hold("hold1")
|
|
lock.hold("hold2")
|
|
Timer(0.1, lambda: lock.release("hold1")).start()
|
|
Timer(0.2, lambda: lock.release("hold2")).start()
|
|
res = []
|
|
for holders in lock.waiting_for():
|
|
res.append(holders)
|
|
assert res == [("hold1", "hold2"), ("hold2",), ()]
|
|
|
|
# Release holders during the yield
|
|
lock.hold("hold1")
|
|
lock.hold("hold2")
|
|
Timer(0.1, lambda: lock.release("hold2")).start()
|
|
res = []
|
|
for loopnum, holders in enumerate(lock.waiting_for()):
|
|
res.append(holders)
|
|
if loopnum == 0:
|
|
lock.release("hold1")
|
|
assert res == [("hold1", "hold2"), ("hold2",), ()]
|
|
|
|
|
|
def test_waiting_for_timeout():
|
|
lock = HoldLock()
|
|
lock.hold("hold1")
|
|
res = []
|
|
for holders in lock.waiting_for(timeout=0.1):
|
|
res.append(holders)
|
|
assert res == [("hold1",)]
|
|
|
|
|
|
def test_waiting_for_timeout_during_yield():
|
|
lock = HoldLock()
|
|
lock.hold("hold1")
|
|
res = []
|
|
for holders in lock.waiting_for(timeout=0.1):
|
|
res.append(holders)
|
|
time.sleep(0.2)
|
|
assert res == [("hold1",)]
|
|
|
|
|
|
def test_waiting_for_hold_timeouts():
|
|
lock = HoldLock()
|
|
lock.hold("hold1", timeout=0.1)
|
|
lock.hold("hold2", timeout=0.2)
|
|
res = []
|
|
for holders in lock.waiting_for():
|
|
res.append(holders)
|
|
assert res == [("hold1", "hold2"), ("hold2",), ()]
|
|
|
|
|
|
def test_waiting_for_hold_timeout_during_yield():
|
|
lock = HoldLock()
|
|
lock.hold("hold1", timeout=0.1)
|
|
lock.hold("hold2", timeout=0.3)
|
|
res = []
|
|
for holders in lock.waiting_for():
|
|
res.append(holders)
|
|
time.sleep(0.2)
|
|
assert res == [("hold1", "hold2"), ("hold2",), ()]
|