asyncio. The standard library gives you asyncio.Event and asyncio.Condition, but each has a gap that only shows up under real concurrent pressure. We hit this when building Ingest’s Python SDK, where multiple async handlers coordinate around WebSocket connection state.
This post works through each primitive, shows where it breaks, and iterates toward a solution that handles every case we throw at it.
The scenario
Imagine an async Python app managing a connection that moves through states:
disconnected → connecting → connected → closing → closed
One of your concurrent handlers needs to drain pending requests when the connection starts to close. It has to wait for the closing state:
state = "disconnected"

async def drain_requests():
    ...
    print("draining pending requests")
Simple enough. Let’s see how each stdlib tool handles this.
Attempt 1: Polling
The most obvious approach: check the value in a loop.
async def drain_requests():
    while state != "closing":
        await asyncio.sleep(0.1)
    print("draining pending requests")
This works, but the tradeoffs are bad:
- Latency vs. efficiency: Short sleep intervals waste CPU cycles. Long ones add latency. There is no good value.
- Duplication: Each consumer reimplements the same polling loop with the same tradeoffs.
- No event-driven wake: The consumer wakes up whether or not anything has changed.
We can do better. What we really want is to sleep until the state changes, not to sleep for an arbitrary interval and then check.
Attempt 2: asyncio.Event
asyncio.Event is the stdlib’s answer to “wake me up when something happens”:
closing_event = asyncio.Event()

async def drain_requests():
    await closing_event.wait()
    print("draining pending requests")
No polling, no wasted cycles. The handler blocks until the event fires. But Event is boolean: it is either set or unset. Our connection has five states, and drain_requests cares about only one of them. What happens when another handler needs to wait for connected? You need a second event. A third handler waiting for “not disconnected”? A third event, with inverted logic. And the setter needs to know about all of them:
closing_event = asyncio.Event()
connected_event = asyncio.Event()

async def set_state(new_state):
    global state
    state = new_state
    if new_state == "closing":
        closing_event.set()
    if new_state == "connected":
        connected_event.set()
Every new condition requires another Event object. Coordinating those events is where the bugs live: forget a set() or clear() call and a consumer blocks forever.
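A forgotten clear() can also fail more quietly than a hang. The following is a minimal sketch, not the SDK's code: set_state here is a stripped-down variant of the setter above that never clears the event, and wait_for_closing is a hypothetical late consumer. Because the event stays set after the state has moved on to "closed", the consumer returns immediately and acts on a stale signal.

```python
import asyncio

state = "disconnected"
closing_event = asyncio.Event()


def set_state(new_state: str) -> None:
    """Setter that remembers set() but forgets clear()."""
    global state
    state = new_state
    if new_state == "closing":
        closing_event.set()
    # Bug: nothing calls closing_event.clear() when the state moves on.


async def wait_for_closing() -> str:
    """Hypothetical late consumer: returns the state it sees on wakeup."""
    await closing_event.wait()
    return state


async def main() -> str:
    set_state("closing")
    set_state("closed")
    # The event is still set, so this returns at once with a stale signal.
    return await asyncio.wait_for(wait_for_closing(), timeout=1.0)


observed = asyncio.run(main())
print(observed)  # the consumer woke for "closing" but observes "closed"
```

Fixing this means every transition out of "closing" must also clear the event, which is exactly the kind of bookkeeping that grows with each new Event.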
Attempt 3: asyncio.Condition
asyncio.Condition lets consumers wait on arbitrary predicates:
state = "disconnected"
condition = asyncio.Condition()

async def drain_requests():
    async with condition:
        await condition.wait_for(lambda: state == "closing")
    print("draining pending requests")
One coordination point, arbitrary predicates, no proliferation of Event objects. This is much better.
But it breaks down under a common pattern.
The lost update
Condition is designed to check the current value at the moment a consumer wakes up. This is fine when transitions are far enough apart for every consumer to run in between, but it falls apart when they are rapid. When the setter changes the state, it calls notify_all(), which schedules a wakeup for each waiting consumer. But in a single-threaded event loop, no consumer actually runs until the current coroutine yields. If the value changes again before that happens, consumers wake up and re-evaluate their predicate against the current value, not the value that triggered the notification. The predicate fails and the consumer goes back to sleep, possibly forever.
await set_state("closing")
await set_state("closed")
Here is a runnable reproduction:
import asyncio

state = "disconnected"
condition = asyncio.Condition()

async def set_state(new_state):
    global state
    async with condition:
        state = new_state
        condition.notify_all()

async def drain_requests():
    async with condition:
        await condition.wait_for(lambda: state == "closing")
    print("draining pending requests")

async def main():
    task = asyncio.create_task(drain_requests())
    await asyncio.sleep(0)  # let drain_requests start waiting
    await set_state("closing")
    await set_state("closed")
    # Raises TimeoutError: drain_requests never observes "closing".
    await asyncio.wait_for(task, timeout=1.0)

asyncio.run(main())
The value was "closing", but by the time drain_requests wakes up and checks, it is already "closed". The intermediate state is lost, and the reproduction ends in a TimeoutError instead of draining.
This is not a hypothetical case. In our SDK’s connection manager, a close signal may arrive and the connection may finish closing in the same event loop tick. drain_requests never runs, and any in-flight work is lost.
Solution: Per-Consumer Queues
Instead of waking up consumers and asking “is the current state the one you want?”, buffer each transition in a per-consumer queue. Each consumer drains its own queue and checks each transition individually. The consumer never misses a state.
Each consumer registers its own asyncio.Queue. When the value changes, the setter pushes (old, new) into every registered queue. Here’s a simplified version that shows the core idea:
class ValueWatcher:
    def __init__(self, initial_value):
        self._value = initial_value
        self._watch_queues: list[asyncio.Queue] = []

    @property
    def value(self):
        return self._value

    @value.setter
    def value(self, new_value):
        if new_value == self._value:
            return
        old_value = self._value
        self._value = new_value
        for queue in self._watch_queues:
            queue.put_nowait((old_value, new_value))

    async def wait_for(self, target):
        queue = asyncio.Queue()
        self._watch_queues.append(queue)
        try:
            if self._value == target:
                return
            while True:
                old, new = await queue.get()
                if new == target:
                    return
        finally:
            self._watch_queues.remove(queue)
wait_for registers a queue, checks the current value, then drains transitions until it finds a match. The try/finally ensures the queue is unregistered even if the caller is cancelled.
The queue buffers and delivers each intermediate transition in order, even if the value changes several times before the consumer runs.
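As a quick check, here is the same two-transition sequence that defeated asyncio.Condition, run against the simplified class (repeated so the sketch runs on its own). The main function and its log list are illustrative scaffolding, not part of the SDK.

```python
import asyncio


class ValueWatcher:
    """Simplified watcher from above, repeated so this sketch is standalone."""

    def __init__(self, initial_value):
        self._value = initial_value
        self._watch_queues: list[asyncio.Queue] = []

    @property
    def value(self):
        return self._value

    @value.setter
    def value(self, new_value):
        if new_value == self._value:
            return
        old_value = self._value
        self._value = new_value
        for queue in self._watch_queues:
            queue.put_nowait((old_value, new_value))

    async def wait_for(self, target):
        queue = asyncio.Queue()
        self._watch_queues.append(queue)
        try:
            if self._value == target:
                return
            while True:
                old, new = await queue.get()
                if new == target:
                    return
        finally:
            self._watch_queues.remove(queue)


async def main() -> list[str]:
    log: list[str] = []
    state = ValueWatcher("disconnected")

    async def drain_requests() -> None:
        await state.wait_for("closing")
        log.append("draining pending requests")

    task = asyncio.create_task(drain_requests())
    await asyncio.sleep(0)  # let drain_requests register its queue

    # Two transitions in the same tick, with no await in between.
    state.value = "closing"
    state.value = "closed"

    await asyncio.wait_for(task, timeout=1.0)
    log.append(f"final state: {state.value}")
    return log


result = asyncio.run(main())
print(result)
```

The consumer drains even though the value is already "closed" when it runs, because the ("disconnected", "closing") transition is still sitting in its queue.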
Making it production-ready
The simplified version still lacks a few features. Our final implementation adds the following:
- Thread safety: A threading.Lock protects the value and the queue list. Each queue is paired with its event loop, and the setter uses loop.call_soon_threadsafe instead of calling put_nowait directly.
- Atomic registration: wait_for registers the queue and then re-checks the current value, closing the race where a transition could slip between the initial check and registration.
- Fully generic typing: Generic[T] end to end, so predicates, queues, and return values are all type-checked.
- Predicate-based matching: wait_for, wait_for_not, and wait_for_not_none all route through a shared _wait_for_condition(predicate) core.
- Timeouts: Each wait method accepts an optional timeout, backed by asyncio.wait_for.
- Conditional set: set_if atomically sets the value only if the current value satisfies a predicate, which is useful for state machine transitions that must only happen from a specific state.
- Change watching: wait_for_change waits for any transition regardless of value, which is useful for logging or reacting to state churn.
- Callback API: on_change and on_value, for synchronous consumers alongside the async wait API.
- Resilient notifications: The setter catches RuntimeError (closed loop) and suppresses callback exceptions so that one failure does not block other consumers.
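The thread-safety item is the least obvious of these, so here is a minimal sketch of just that delivery path, assuming a plain thread produces transitions for a consumer running on an event loop. The producer and consume names are hypothetical, and the real implementation wraps this in the lock and queue list described above.

```python
import asyncio
import threading


def producer(loop: asyncio.AbstractEventLoop, queue: asyncio.Queue) -> None:
    """Runs in a plain thread and hands transitions to the consumer's loop."""
    for transition in [("connected", "closing"), ("closing", "closed")]:
        # asyncio.Queue is not thread-safe, so schedule put_nowait on the
        # loop that owns the queue instead of calling it from this thread.
        loop.call_soon_threadsafe(queue.put_nowait, transition)


async def consume() -> list:
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue()
    thread = threading.Thread(target=producer, args=(loop, queue))
    thread.start()
    # Callbacks run in the order they were scheduled, so order is preserved.
    received = [await queue.get(), await queue.get()]
    thread.join()
    return received


received = asyncio.run(consume())
print(received)
```

Pairing each queue with its loop is what lets a setter on any thread reach consumers on any loop. If the loop has already been closed, call_soon_threadsafe raises RuntimeError, which is the error the last list item refers to.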
The full implementation is about 300 lines, most of which are docstrings and convenience methods built on the same core. Feel free to copy it into your codebase!
Full ValueWatcher source
from __future__ import annotations

import asyncio
import threading
import typing

T = typing.TypeVar("T")
S = typing.TypeVar("S")


class ValueWatcher(typing.Generic[T]):
    """
    Thread-safe observable value with async watchers.

    Watchers can await value changes via methods like `wait_for` and
    `wait_for_change`. Alternatively, they can add callbacks via `on_change` and
    `on_value`.

    Any thread can set `.value`, and the watcher will react accordingly.
    """

    def __init__(
        self,
        initial_value: T,
        *,
        on_change: typing.Callable[[T, T], None] | None = None,
    ) -> None:
        """
        Args:
            initial_value: The initial value.
            on_change: Called when the value changes. Good for debug logging.
        """
        self._lock = threading.Lock()
        self._on_changes: list[typing.Callable[[T, T], None]] = []
        if on_change:
            self._on_changes.append(on_change)
        self._watch_queues: list[
            tuple[asyncio.AbstractEventLoop, asyncio.Queue[tuple[T, T]]]
        ] = []
        self._background_tasks: set[asyncio.Task[T]] = set()
        self._value = initial_value

    @property
    def value(self) -> T:
        with self._lock:
            return self._value

    @value.setter
    def value(self, new_value: T) -> None:
        with self._lock:
            if new_value == self._value:
                return
            old_value = self._value
            self._value = new_value
            queues = list(self._watch_queues)
            callbacks = list(self._on_changes)
        for loop, queue in queues:
            try:
                loop.call_soon_threadsafe(
                    queue.put_nowait, (old_value, new_value)
                )
            except RuntimeError:
                pass
        for on_change in callbacks:
            try:
                on_change(old_value, new_value)
            except Exception:
                pass

    def set_if(
        self,
        new_value: T,
        condition: typing.Callable[[T], bool],
    ) -> bool:
        """
        Atomically set the value only if the current value satisfies the
        condition. Returns True if the value was set.
        """
        with self._lock:
            if not condition(self._value):
                return False
            if new_value == self._value:
                return True
            old_value = self._value
            self._value = new_value
            queues = list(self._watch_queues)
            callbacks = list(self._on_changes)
        for loop, queue in queues:
            try:
                loop.call_soon_threadsafe(
                    queue.put_nowait, (old_value, new_value)
                )
            except RuntimeError:
                pass
        for on_change in callbacks:
            try:
                on_change(old_value, new_value)
            except Exception:
                pass
        return True

    def on_change(self, callback: typing.Callable[[T, T], None]) -> None:
        """
        Add a callback that's called when the value changes.

        Args:
            callback: Called with (old_value, new_value) on each change.
        """
        with self._lock:
            self._on_changes.append(callback)

    def on_value(self, value: T, callback: typing.Callable[[], None]) -> None:
        """
        One-shot callback for when the value equals `value`. Requires a
        running event loop (internally spawns a background task).

        Args:
            value: The value to wait for.
            callback: Called when the internal value equals `value`.
        """
        task = asyncio.create_task(self.wait_for(value))
        self._background_tasks.add(task)

        def _done(t: asyncio.Task[T]) -> None:
            self._background_tasks.discard(t)
            if not t.cancelled() and t.exception() is None:
                callback()

        task.add_done_callback(_done)

    async def wait_for(
        self,
        value: T,
        *,
        immediate: bool = True,
        timeout: float | None = None,
    ) -> T:
        """
        Wait for the internal value to equal the given value.

        Args:
            value: Return when the internal value is equal to this.
            immediate: If True and the internal value is already equal to the given value, return immediately. Defaults to True.
            timeout: Seconds to wait before raising `asyncio.TimeoutError`. None means wait forever.
        """
        return await self._wait_for_condition(
            lambda v: v == value,
            immediate=immediate,
            timeout=timeout,
        )

    async def wait_for_not(
        self,
        value: T,
        *,
        immediate: bool = True,
        timeout: float | None = None,
    ) -> T:
        """
        Wait for the internal value to not equal the given value.

        Args:
            value: Return when the internal value is not equal to this.
            immediate: If True and the internal value is already not equal to the given value, return immediately. Defaults to True.
            timeout: Seconds to wait before raising `asyncio.TimeoutError`. None means wait forever.
        """
        return await self._wait_for_condition(
            lambda v: v != value,
            immediate=immediate,
            timeout=timeout,
        )

    async def wait_for_not_none(
        self: ValueWatcher[S | None],
        *,
        immediate: bool = True,
        timeout: float | None = None,
    ) -> S:
        """
        Wait for the internal value to be not None.

        Args:
            immediate: If True and the internal value is already not None, return immediately. Defaults to True.
            timeout: Seconds to wait before raising `asyncio.TimeoutError`. None means wait forever.
        """
        result = await self._wait_for_condition(
            lambda v: v is not None,
            immediate=immediate,
            timeout=timeout,
        )
        if result is None:
            raise AssertionError("unreachable")
        return result

    async def _wait_for_condition(
        self,
        condition: typing.Callable[[T], bool],
        *,
        immediate: bool = True,
        timeout: float | None = None,
    ) -> T:
        """
        Wait until `condition(current_value)` is true, then return the
        matching value. Handles the TOCTOU gap between checking the current
        value and subscribing to the change queue.
        """
        if immediate:
            current = self.value
            if condition(current):
                return current

        async def _wait() -> T:
            with self._watch() as queue:
                if immediate:
                    current = self.value
                    if condition(current):
                        return current
                while True:
                    _, new = await queue.get()
                    if condition(new):
                        return new

        return await asyncio.wait_for(_wait(), timeout=timeout)

    async def wait_for_change(
        self,
        *,
        timeout: float | None = None,
    ) -> T:
        """
        Wait for the internal value to change.

        Args:
            timeout: Seconds to wait before raising `asyncio.TimeoutError`. None means wait forever.
        """

        async def _wait() -> T:
            with self._watch() as queue:
                _, new = await queue.get()
                return new

        return await asyncio.wait_for(_wait(), timeout=timeout)

    def _watch(self) -> _WatchContextManager[T]:
        """
        Watch for all changes to the value. This method returns a context
        manager so it must be used in a `with` statement.

        Its return value is a queue that yields tuples of the old and new
        values.
        """
        loop = asyncio.get_running_loop()
        queue = asyncio.Queue[tuple[T, T]]()
        with self._lock:
            self._watch_queues.append((loop, queue))
        return _WatchContextManager(
            on_exit=lambda: self._remove_queue(queue),
            queue=queue,
        )

    def _remove_queue(self, queue: asyncio.Queue[tuple[T, T]]) -> None:
        """
        Remove a queue from the watch list in a thread-safe manner.
        """
        with self._lock:
            self._watch_queues = [
                entry for entry in self._watch_queues if entry[1] is not queue
            ]


class _WatchContextManager(typing.Generic[T]):
    """
    Context manager that's used to automatically delete a queue when it's no
    longer being watched.

    Returns a queue that yields tuples of the old and new values.
    """

    def __init__(
        self,
        on_exit: typing.Callable[[], None],
        queue: asyncio.Queue[tuple[T, T]],
    ) -> None:
        self._on_exit = on_exit
        self._queue = queue

    def __enter__(self) -> asyncio.Queue[tuple[T, T]]:
        return self._queue

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: object,
    ) -> None:
        self._on_exit()
wait_for_not_none is especially useful because we like type safety:
await state.wait_for_not("disconnected")
ws_watcher = ValueWatcher[Connection | None](None)
ws: Connection = await ws_watcher.wait_for_not_none()
A caveat
The setter deduplicates by equality: if the new value == the current value, no notification fires. This works well for enums, strings, and ints, but mutating a mutable object in place and reassigning the same reference will not wake consumers (because obj == obj is trivially True). Stick to immutable values and it is nothing to worry about.
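A small sketch makes the pitfall concrete. DedupingValue is a hypothetical stand-in that copies only the setter's equality check and records what would have been sent, rather than the full ValueWatcher.

```python
class DedupingValue:
    """Hypothetical stand-in that mirrors only the setter's equality check."""

    def __init__(self, initial_value):
        self._value = initial_value
        self.notifications = []  # each (old, new) pair that would be sent

    @property
    def value(self):
        return self._value

    @value.setter
    def value(self, new_value):
        if new_value == self._value:
            return  # deduplicated: no notification
        self.notifications.append((self._value, new_value))
        self._value = new_value


# Mutating in place and reassigning the same reference is invisible.
pending = DedupingValue(["req-1"])
items = pending.value
items.append("req-2")
pending.value = items
mutable_count = len(pending.notifications)

# Replacing an immutable value with a new one is always detected.
frozen = DedupingValue(("req-1",))
frozen.value = frozen.value + ("req-2",)
immutable_count = len(frozen.notifications)

print(mutable_count, immutable_count)
```

The list case records no notification because the stored object and the assigned object are the same list, while the tuple case records exactly one.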
Wrapping up
The core insight is simple: asyncio.Condition asks consumers “is the current state what you want?” when it should be asking “did the state ever become what you want?” Per-consumer queues make this possible by buffering every transition instead of only notifying about the latest one.
We use ValueWatcher throughout Ingest’s Python SDK to coordinate WebSocket connection state, worker lifecycle, and graceful shutdown. If you are managing shared mutable state in asyncio, give it a try.