Async Correctness and Concurrency in FastAPI
Async correctness is the practice of keeping FastAPI's single-threaded event loop unblocked: choosing async def versus def deliberately, never running synchronous blocking work on the loop, and bounding concurrency so that an unblocked loop does not overwhelm downstream services.
This is the foundation of Async, Background Tasks and Observability. Every other topic depends on it: async database sessions exist to keep the most common blocking call off the loop, and background task processing moves slow work off the request entirely.
Core Mechanics: One Loop per Worker
A FastAPI worker runs one event loop on one thread. Concurrency comes from coroutines voluntarily yielding at await points so the loop can advance another coroutine. A synchronous call never yields, so while it runs the loop is frozen and no other request on that worker makes progress. This is why the async def versus def choice is about correctness, not style.
import httpx
from fastapi import FastAPI
app = FastAPI()
@app.get("/upstream")
async def upstream() -> dict:
    # Correct: an async client yields at every await, keeping the loop free.
    async with httpx.AsyncClient() as client:
        resp = await client.get("https://api.example.com/data", timeout=5.0)
        return resp.json()
If a handler is genuinely synchronous, declaring it def is safe — FastAPI runs def handlers in a thread pool, so their blocking does not touch the loop. The trap is the hybrid: an async def handler that calls blocking code.
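The hybrid trap can be reproduced without FastAPI at all. The following is a minimal sketch using only the standard library; `blocking_handler`, `yielding_handler`, and `timed_pair` are hypothetical names chosen for illustration, and `time.sleep` stands in for any synchronous call made inside an async def function.

```python
import asyncio
import time


async def blocking_handler() -> None:
    # Hypothetical stand-in for an async def handler that calls sync code:
    # time.sleep never yields, so the loop is frozen for the full 0.1 s.
    time.sleep(0.1)


async def yielding_handler() -> None:
    # asyncio.sleep yields to the loop, so other coroutines can run meanwhile.
    await asyncio.sleep(0.1)


async def timed_pair(handler) -> float:
    """Run two copies of handler concurrently and return the elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(handler(), handler())
    return time.perf_counter() - start


blocking_elapsed = asyncio.run(timed_pair(blocking_handler))
yielding_elapsed = asyncio.run(timed_pair(yielding_handler))
print(f"blocking: {blocking_elapsed:.2f}s, yielding: {yielding_elapsed:.2f}s")
```

The blocking pair runs back to back because neither coroutine ever hands control to the loop, while the yielding pair overlaps. Exact timings depend on the machine.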
Production Implementation: Offloading Blocking Work
When you must call blocking code from an async handler — a legacy sync driver, a CPU-bound computation — offload it. Use a thread for blocking I/O and a process for CPU-bound work.
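import anyio
from fastapi import File
@app.post("/thumbnail")
async def thumbnail(image: bytes = File()) -> dict[str, int]:
    # File() reads the upload from the request body; a bare bytes parameter
    # would be treated as a query parameter.
    # resize_image is a blocking function defined elsewhere that returns the output size.
    # A worker thread keeps the loop responsive while it runs; for heavy
    # pure-Python CPU work, anyio.to_process.run_sync avoids GIL contention.
    size = await anyio.to_thread.run_sync(resize_image, image)
    return {"bytes": size}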
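To see that the loop really stays free during an offloaded call, the sketch below runs a heartbeat coroutine alongside a blocking function. It assumes the standard library's `asyncio.to_thread` (Python 3.9+) as a stand-in that plays the same role as `anyio.to_thread.run_sync`; `blocking_io` and `heartbeat` are hypothetical names for illustration.

```python
import asyncio
import time


def blocking_io() -> str:
    # Hypothetical stand-in for a legacy sync driver call.
    time.sleep(0.2)
    return "done"


async def heartbeat(stop: asyncio.Event) -> int:
    """Count how many times the loop gets to run while other work is in flight."""
    ticks = 0
    while not stop.is_set():
        ticks += 1
        await asyncio.sleep(0.01)
    return ticks


async def main() -> tuple[str, int]:
    stop = asyncio.Event()
    beat = asyncio.create_task(heartbeat(stop))
    # The blocking call runs in a worker thread; the loop keeps ticking.
    result = await asyncio.to_thread(blocking_io)
    stop.set()
    return result, await beat


result, ticks = asyncio.run(main())
print(result, ticks)
```

If `blocking_io()` were called directly inside `main`, the heartbeat would not tick at all until the call returned.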
Controlling Concurrency
An unblocked loop can issue many concurrent operations, which can overwhelm a downstream. Bound it with a semaphore sized to the dependency's capacity.
import asyncio
import httpx
# At most 10 concurrent calls to the downstream, regardless of request volume.
_limit = asyncio.Semaphore(10)
async def fetch(client: httpx.AsyncClient, url: str) -> bytes:
    # Callers beyond the tenth wait here until a slot is released.
    async with _limit:
        return (await client.get(url)).content
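The bound can be checked without a real downstream. This is a small sketch under assumed values: `LIMIT`, `call_downstream`, and the `state` dictionary are hypothetical, and `asyncio.sleep` stands in for the network call. It launches 20 tasks and records the highest number that were ever inside the semaphore at once.

```python
import asyncio

LIMIT = 3  # hypothetical downstream capacity


async def call_downstream(limit: asyncio.Semaphore, state: dict[str, int]) -> None:
    async with limit:
        state["in_flight"] += 1
        state["peak"] = max(state["peak"], state["in_flight"])
        await asyncio.sleep(0.01)  # stand-in for the real network call
        state["in_flight"] -= 1


async def main() -> int:
    limit = asyncio.Semaphore(LIMIT)
    state = {"in_flight": 0, "peak": 0}
    # 20 tasks are started together, but only LIMIT may hold the semaphore at once.
    await asyncio.gather(*(call_downstream(limit, state) for _ in range(20)))
    return state["peak"]


peak = asyncio.run(main())
print(peak)
```

The recorded peak never exceeds `LIMIT`, however many tasks are passed to `asyncio.gather`.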
Async and Performance Notes
The thread pool that runs def handlers and to_thread work is finite (AnyIO's default limiter allows 40 threads); saturating it with long blocking calls reintroduces queuing. Send CPU-bound work to a process pool, keep thread-offloaded work I/O-bound, and prefer truly async libraries on the hot path so offloading is the exception, not the rule.
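The queuing effect of a finite pool is easy to observe with a deliberately small executor. The sketch below is an illustration only: it uses a standard-library `ThreadPoolExecutor` with `loop.run_in_executor` rather than the pool FastAPI itself uses, and `slow_blocking_call` and `run_batch` are hypothetical names.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def slow_blocking_call() -> None:
    time.sleep(0.1)  # stand-in for a long blocking call


async def run_batch(workers: int, calls: int) -> float:
    """Run `calls` blocking calls on a pool of `workers` threads; return elapsed seconds."""
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        start = time.perf_counter()
        await asyncio.gather(
            *(loop.run_in_executor(pool, slow_blocking_call) for _ in range(calls))
        )
        return time.perf_counter() - start


saturated = asyncio.run(run_batch(workers=2, calls=4))
roomy = asyncio.run(run_batch(workers=4, calls=4))
print(f"2 workers: {saturated:.2f}s, 4 workers: {roomy:.2f}s")
```

With two workers, the third and fourth calls wait for a free thread, so the batch takes roughly two rounds instead of one. The same thing happens at larger scale when long blocking calls fill the shared pool.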
Testing Strategy
Detect accidental blocking by asserting that concurrent requests actually overlap:
import asyncio
import time
import httpx
import pytest
@pytest.mark.anyio  # assumes AnyIO's pytest plugin; use your async test runner's marker otherwise
async def test_requests_run_concurrently(app):
    # Two 100ms async endpoints should finish in ~100ms, not ~200ms, if non-blocking.
    transport = httpx.ASGITransport(app=app)
    async with httpx.AsyncClient(transport=transport, base_url="http://t") as c:
        start = time.perf_counter()
        await asyncio.gather(c.get("/slow"), c.get("/slow"))
        elapsed = time.perf_counter() - start
    assert elapsed < 0.18
Failure Modes and Debugging
- Hidden blocking in libraries. A dependency may block internally; profile and offload it.
- Thread-pool exhaustion. Too many long def or to_thread calls starve the pool; move CPU work to processes.
- Unbounded fan-out. asyncio.gather over thousands of tasks opens thousands of connections; bound with a semaphore.
- Sync database drivers. The classic culprit; switch to async sessions, covered in Async Database Sessions.
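One way to surface hidden blocking is to measure event-loop lag: how much later than requested a short sleep actually wakes up. The following is a minimal sketch of that idea, not a production monitor; `measure_max_lag` is a hypothetical helper, and the `time.sleep(0.2)` call simulates a library that blocks internally.

```python
import asyncio
import time


async def measure_max_lag(stop: asyncio.Event, interval: float = 0.01) -> float:
    """Return the worst delay (seconds) between a requested and actual wake-up."""
    worst = 0.0
    while not stop.is_set():
        start = time.perf_counter()
        await asyncio.sleep(interval)
        worst = max(worst, time.perf_counter() - start - interval)
    return worst


async def main() -> float:
    stop = asyncio.Event()
    monitor = asyncio.create_task(measure_max_lag(stop))
    await asyncio.sleep(0.05)  # let the monitor start ticking
    time.sleep(0.2)            # hypothetical hidden blocking call on the loop
    await asyncio.sleep(0.05)  # give the monitor a chance to observe the stall
    stop.set()
    return await monitor


max_lag = asyncio.run(main())
print(f"max loop lag: {max_lag:.3f}s")
```

A lag close to the length of the blocking call points at work that should be offloaded. asyncio's debug mode (for example `asyncio.run(main(), debug=True)`) offers a complementary signal by logging callbacks that run longer than `loop.slow_callback_duration`.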
Related Reading
- Up to the section: Async, Background Tasks and Observability.
- Composes with: Async Database Sessions and Background Task Processing.