from aiofence import on_timeout, on_event
# Timeout — suppress and inspect
with on_timeout(5).move_on_cancel() as fence:
await work()
if fence.cancelled:
return fallback()
# Timeout — raise on cancel
with on_timeout(5).raise_on_cancel() as fence:
await work()
# raises FenceCancelled if timed out
# Event — cancel when shutdown is signalled
shutdown = asyncio.Event()
with on_event(shutdown, code="shutdown").move_on_cancel() as fence:
await work()Every cancellation source is a trigger. You declare triggers once at the boundary using a Fencing builder, then materialize them into a context manager. Inside the block, code runs normally — no need to thread events, flags, or tokens through call signatures.
After the block, inspect fence.cancelled, fence.cancel_reasons, or fence.cancelled_by(code) to decide what to do.
Use the factory functions — each returns a Fencing builder:
| Factory | Condition |
|---|---|
on_timeout(delay, *, code=None) |
Relative timeout in seconds |
on_deadline(when, *, code=None) |
Absolute monotonic time (loop.time() based) |
on_event(event, *, code=None) |
Cancel when asyncio.Event is set |
The code parameter is an optional machine-readable identifier. Use it to distinguish which trigger fired via fence.cancelled_by(code). Works well with StrEnum for type safety.
Fencing is immutable — every method returns a new instance. Chain freely:
fencing = (
on_timeout(30, code="budget")
.event(shutdown, code="shutdown")
.event(disconnect, code="disconnect")
)
with fencing.move_on_cancel() as fence:
await work()Available builder methods:
| Method | Description |
|---|---|
.timeout(delay, *, code=None) |
Add a relative timeout |
.deadline(when, *, code=None) |
Add an absolute deadline (loop.time() based) |
.event(event, *, code=None) |
Add an event condition |
Time-based conditions are merged — the tightest constraint wins:
ctx = on_timeout(30).timeout(5, code="db")
# 30s vs 5s → 5s wins, code="db"
ctx = on_deadline(T + 20, code="sla").timeout(5, code="db")
# T+20 vs now+5 → minimum wins.timeout() eagerly resolves to an absolute deadline, making the Fencing one-shot (raises on reuse). Use .deadline() for reusable configs.
Events are never merged — all arm independently.
Two modes, both yield a Fence:
with on_timeout(5).move_on_cancel() as fence:
await work()
if fence.cancelled:
print(fence.cancel_reasons) # why were we cancelled?CancelledError is suppressed. Code after the with block always runs. Check fence.cancelled to decide what to do.
try:
with on_timeout(5).raise_on_cancel() as fence:
await work()
except FenceCancelled as e:
print(e.cancel_reasons)
print(e.cancelled_by("shutdown"))CancelledError is still suppressed inside the block, but FenceCancelled (a regular Exception, not CancelledError) is raised after exit. Safe to use inside TaskGroup.
After the block, the Fence has:
| Property / Method | Type | Description |
|---|---|---|
fence.cancelled |
bool |
True if any trigger fired |
fence.suppressed |
bool |
True if CancelledError was caught and suppressed |
fence.cancel_reasons |
tuple[CancelReason, ...] |
All reasons that fired |
fence.cancelled_by(code) |
bool |
Did a specific trigger fire? |
Most code should use cancelled — it tells you whether a condition was met. suppressed differs only when a trigger fires but the body completes synchronously before CancelledError is delivered (pre-triggered sync body). In that case cancelled is True but suppressed is False.
Each CancelReason has:
| Field | Type | Description |
|---|---|---|
message |
str |
Human-readable (e.g. "timed out after 5s") |
cancel_type |
CancelType |
TIMEOUT or EVENT |
code |
str | None |
Machine-readable identifier |
Unlike asyncio.timeout, cancellation state is available immediately:
with on_timeout(5).move_on_cancel() as fence:
if fence.cancelled:
return fallback()
await work()# Middleware: set request budget
ctx = on_deadline(loop.time() + 30, code="request")
# Handler: add shutdown listener
ctx = ctx.event(shutdown, code="shutdown")
# Inner code: per-operation timeout
with ctx.timeout(5, code="db").move_on_cancel() as fence:
await query_db()
if fence.cancelled_by("db"):
return cached_result
elif fence.cancelled_by("shutdown"):
return graceful_shutdown()Fencing builders that use only .deadline() and .event() are reusable — each move_on_cancel() / raise_on_cancel() creates a fresh Fence:
ctx = on_deadline(loop.time() + 30)
with ctx.move_on_cancel() as f1:
await op_a()
with ctx.move_on_cancel() as f2:
await op_b()Note: .timeout() anchors the builder to a point in time, making it one-shot. Reusing an anchored Fencing raises RuntimeError. Call .timeout() fresh each time instead.
with (
on_timeout(30, code="timeout")
.event(shutdown, code="shutdown")
.move_on_cancel()
) as fence:
await call_external()
if fence.cancelled_by("timeout"):
log("slow response")
elif fence.cancelled_by("shutdown"):
log("shutting down")bind_fencing() stores a Fencing in a ContextVar, so inner code can access it via get_current_fencing() without passing it through every call signature.
from aiofence import Fencing, bind_fencing, get_current_fencing, on_event
# Boundary: declare the rules
fencing = on_event(disconnect, code="disconnect").timeout(30)
with bind_fencing(fencing):
await handle_request()
# Deep inside: read and use
async def process():
with get_current_fencing().move_on_cancel() as fence:
await do_work()
# Or extend with local concerns:
async def process_with_extra():
with get_current_fencing().event(other_event).move_on_cancel() as fence:
await do_work()bind_fencing()only stores config — it does not create a Fence.move_on_cancel()/raise_on_cancel()materialize Fences from it.- Token-based set/reset — nesting works naturally. Inner
bind_fencing()overrides, outer is restored on exit. - Task inheritance —
asyncio.create_task()copies theContextVarautomatically. Child tasks inherit the boundary's config without affecting the parent. get_current_fencing()with no context — returns an emptyFencing(), so chaining always works:get_current_fencing().timeout(5).
Fence is the underlying context manager. Use it directly when you need full control over trigger instances:
from aiofence import Fence, TimeoutTrigger, EventTrigger
with Fence(TimeoutTrigger(5), EventTrigger(shutdown, code="shutdown")) as fence:
await work()Fence always suppresses CancelledError. It doesn't raise FenceCancelled — for that, use Fencing.raise_on_cancel().
aiofence.contrib.starlette provides a FastAPI dependency that cancels the current Fencing when the client disconnects.
from fastapi import Depends, FastAPI
from aiofence import Fencing
from aiofence.contrib.starlette import disconnect_fencing
app = FastAPI()
@app.get("/work")
async def handler(fencing: Fencing = Depends(disconnect_fencing)):
with fencing.move_on_cancel() as fence:
await long_work()
if fence.cancelled_by("disconnect"):
return Response(status_code=499)disconnect_fencing does three things:
- Creates an
asyncio.Eventthat fires onhttp.disconnect - Adds it to
get_current_fencing()withcode="disconnect"(or a custom code) - Binds the result as the active
Fencingcontext viabind_fencing()
The returned Fencing inherits from the current context, so you can chain additional triggers:
@app.get("/work")
async def handler(fencing: Fencing = Depends(disconnect_fencing)):
with fencing.timeout(30, code="budget").move_on_cancel() as fence:
await long_work()
if fence.cancelled_by("budget"):
return cached_result
elif fence.cancelled_by("disconnect"):
return Response(status_code=499)Inner code can also access the disconnect trigger via get_current_fencing():
@app.get("/work")
async def handler(fencing: Fencing = Depends(disconnect_fencing)):
await process()
async def process():
with get_current_fencing().move_on_cancel() as fence:
await do_work() # cancelled if client disconnectsasync def handler(
fencing: Fencing = Depends(lambda r: disconnect_fencing(r, code="client_gone")),
):
...