Fixing Blocking Calls in Async FastAPI Routes

Key takeaways:

  • The symptom is latency that climbs with concurrency on an endpoint that should be fast.
  • The cause is synchronous I/O or CPU work inside an async def handler.
  • Offload blocking I/O with run_in_threadpool or anyio.to_thread.run_sync.
  • Send CPU-bound work to a process pool: under the GIL, threads cannot run pure-Python code in parallel.
  • Verify by load testing that concurrent requests overlap.

This is the debugging companion to Async Correctness and Concurrency. Read that page for why one blocked coroutine stalls the whole worker.

The Problem This Solves

An endpoint works in development and falls over under load. The cause is almost always a blocking call hidden in an async def: the loop cannot advance other requests while that call runs, so throughput collapses exactly when you need it. This guide shows how to find and fix it.

Prerequisites

  • A FastAPI app exhibiting latency that grows with concurrency.
  • A load-testing tool and, optionally, asyncio debug mode enabled.
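asyncio's debug mode logs any callback or task step that holds the loop longer than loop.slow_callback_duration (0.1 s by default). With the default asyncio loop you would typically turn it on for a running server through the PYTHONASYNCIODEBUG=1 environment variable; the standalone sketch below uses asyncio.run(..., debug=True) instead, so the warning can be seen without a server. handler_with_blocking_sleep is a hypothetical stand-in for a route body.

```python
import asyncio
import logging
import time


class _Collect(logging.Handler):
    """Keeps asyncio's warning records in memory so they can be inspected."""

    def __init__(self) -> None:
        super().__init__(level=logging.WARNING)
        self.messages: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


async def handler_with_blocking_sleep() -> None:
    # Hypothetical route body: time.sleep holds the event loop for 200 ms.
    time.sleep(0.2)


async def main() -> None:
    loop = asyncio.get_running_loop()
    loop.slow_callback_duration = 0.1  # the default threshold, set explicitly
    await handler_with_blocking_sleep()


def slow_step_warnings() -> list[str]:
    collector = _Collect()
    asyncio_logger = logging.getLogger("asyncio")
    asyncio_logger.addHandler(collector)
    try:
        asyncio.run(main(), debug=True)  # comparable to PYTHONASYNCIODEBUG=1
    finally:
        asyncio_logger.removeHandler(collector)
    # Debug mode reports slow steps as "Executing <Task ...> took <seconds> seconds".
    return [m for m in collector.messages if "took" in m]


if __name__ == "__main__":
    for message in slow_step_warnings():
        print(message)
```

The logged task repr includes the coroutine name and source location, which points you toward the handler that blocked.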

Step-by-Step Implementation

1. Reproduce and confirm

# If p95 latency grows roughly in step with -c (concurrency), requests are
# being serialized, which usually means the loop is being blocked.
hey -z 10s -c 1 http://localhost:8000/slow    # baseline
hey -z 10s -c 50 http://localhost:8000/slow   # compare p95
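To see why latency tracks -c, here is a stdlib-only simulation rather than a real server: two hypothetical handlers, one calling time.sleep and one awaiting asyncio.sleep, are each fired 1 and 10 at a time on the same event loop, the way hey fires concurrent requests. The wall time of a batch approximates the latency of its slowest request.

```python
import asyncio
import time


async def blocking_handler() -> None:
    time.sleep(0.1)  # holds the event loop for the full 100 ms


async def nonblocking_handler() -> None:
    await asyncio.sleep(0.1)  # yields the loop while "waiting on I/O"


async def wall_time(handler, concurrency: int) -> float:
    # Fire `concurrency` simulated requests at once, like hey -c N.
    start = time.perf_counter()
    await asyncio.gather(*(handler() for _ in range(concurrency)))
    return time.perf_counter() - start


async def main() -> dict[str, tuple[float, float]]:
    results = {}
    for name, handler in [("blocking", blocking_handler), ("non-blocking", nonblocking_handler)]:
        results[name] = (await wall_time(handler, 1), await wall_time(handler, 10))
    return results


if __name__ == "__main__":
    for name, (c1, c10) in asyncio.run(main()).items():
        print(f"{name:>12}: c=1 {c1:.2f}s  c=10 {c10:.2f}s")
```

The blocking handler's batch time grows to about ten times its single-call time, while the non-blocking one stays near 0.1 s: the same pattern the two hey runs expose.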

2. Find the blocking line

# Common culprits to grep for inside async def handlers:
#   requests.get(...)        → blocking HTTP
#   sync_session.execute()   → blocking DB driver
#   time.sleep(...)          → blocking sleep
#   heavy_pure_python_loop() → CPU-bound
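Grepping finds these calls anywhere; a rough pass over the syntax tree can restrict the search to calls that sit inside an async def. The sketch below is a heuristic, not a real linter: BLOCKING_CALLS is a hypothetical deny-list to extend with your own clients and drivers, only literal dotted calls such as time.sleep(...) are matched (so "from time import sleep" slips through), and sync functions nested inside an async def are flagged too.

```python
import ast
from typing import Optional

# Hypothetical deny-list; extend it with your own sync clients and drivers.
BLOCKING_CALLS = {"requests.get", "requests.post", "time.sleep", "urllib.request.urlopen"}


def dotted_name(node: ast.AST) -> Optional[str]:
    # Turns Name/Attribute chains such as requests.get into "requests.get".
    parts = []
    while isinstance(node, ast.Attribute):
        parts.append(node.attr)
        node = node.value
    if isinstance(node, ast.Name):
        parts.append(node.id)
        return ".".join(reversed(parts))
    return None  # e.g. calls on subscripts or call results


def find_blocking_calls(source: str) -> list[tuple[str, int, str]]:
    """Return (async function, line, call) for deny-listed calls inside async defs."""
    hits = []
    for fn in ast.walk(ast.parse(source)):
        if not isinstance(fn, ast.AsyncFunctionDef):
            continue
        for node in ast.walk(fn):
            if isinstance(node, ast.Call):
                name = dotted_name(node.func)
                if name in BLOCKING_CALLS:
                    hits.append((fn.name, node.lineno, name))
    return hits
```

Pass it the source text of a module and review each hit by hand; a match is a lead, not proof that the call runs on the loop.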

3. Offload blocking I/O

from fastapi import FastAPI
from starlette.concurrency import run_in_threadpool

app = FastAPI()


@app.get("/external")
async def external() -> dict:
    # legacy_sync_client stands in for your existing synchronous client.
    # The sync call now runs in a worker thread; the loop keeps serving others.
    data = await run_in_threadpool(legacy_sync_client.fetch, "/resource")
    return {"data": data}
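run_in_threadpool (which in current Starlette versions delegates to anyio.to_thread.run_sync) needs Starlette installed. The stdlib-only sketch below uses asyncio.to_thread (Python 3.9+) to show the same effect outside a server: a heartbeat task records a tick whenever the loop is free, and a hypothetical legacy_fetch stands in for legacy_sync_client.fetch.

```python
import asyncio
import time


def legacy_fetch(path: str) -> str:
    """Hypothetical blocking client call, standing in for legacy_sync_client.fetch."""
    time.sleep(0.3)
    return f"payload for {path}"


async def heartbeat(ticks: list[float], interval: float = 0.05) -> None:
    # Records a timestamp each time the loop gets a chance to run this task.
    while True:
        ticks.append(time.perf_counter())
        await asyncio.sleep(interval)


async def count_heartbeats(offload: bool) -> int:
    ticks: list[float] = []
    hb = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # let the heartbeat record its first tick
    if offload:
        # Runs in the default thread pool; the loop stays free meanwhile.
        await asyncio.to_thread(legacy_fetch, "/resource")
    else:
        # Runs on the loop thread; nothing else is scheduled for 0.3 s.
        legacy_fetch("/resource")
    hb.cancel()
    return len(ticks)


if __name__ == "__main__":
    print("blocking:", asyncio.run(count_heartbeats(offload=False)))
    print("offloaded:", asyncio.run(count_heartbeats(offload=True)))
```

The blocking run records only the initial tick, while the offloaded run keeps ticking every 50 ms until the thread finishes; other requests on the loop would see the same difference.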

4. Offload CPU-bound work to a process

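import asyncio
from concurrent.futures import ProcessPoolExecutor

from fastapi import FastAPI, Request

app = FastAPI()
# One pool per server worker process, shared across requests.
_pool = ProcessPoolExecutor()


@app.post("/encode")
async def encode(request: Request) -> dict:
    # Read the raw request body; a bare `payload: bytes` parameter would be
    # read from the query string instead.
    payload = await request.body()
    loop = asyncio.get_running_loop()
    # cpu_encode must be a module-level function so the pool can pickle it.
    # Running it in a separate process sidesteps the GIL and keeps the loop free.
    result = await loop.run_in_executor(_pool, cpu_encode, payload)
    return {"size": len(result)}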
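The route leaves cpu_encode undefined. The sketch below supplies a hypothetical stand-in (a pure-Python run-length encoder, chosen only because it is CPU-bound Python code) plus an offload helper. loop.run_in_executor forwards positional arguments only, so the helper binds keyword arguments with functools.partial, which stays picklable as long as the wrapped function is defined at module top level.

```python
import asyncio
import functools
from concurrent.futures import Executor
from typing import Any, Callable, Optional


def cpu_encode(payload: bytes, max_run: int = 255) -> bytes:
    """Hypothetical stand-in for the guide's cpu_encode: pure-Python run-length encoding.

    Emits (run_length, byte) pairs. Defined at module top level so a
    ProcessPoolExecutor can pickle it by reference.
    """
    out = bytearray()
    i = 0
    while i < len(payload):
        byte = payload[i]
        run = 1
        # Extend the run while the next byte matches, capped so it fits in one byte.
        while i + run < len(payload) and payload[i + run] == byte and run < max_run:
            run += 1
        out += bytes((run, byte))
        i += run
    return bytes(out)


async def offload(
    executor: Optional[Executor], func: Callable[..., Any], /, *args: Any, **kwargs: Any
) -> Any:
    # run_in_executor takes positional arguments only, so keyword arguments
    # are bound with functools.partial. A partial over a top-level function
    # pickles fine, so this also works with a ProcessPoolExecutor.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, functools.partial(func, *args, **kwargs))
```

In the route this would read result = await offload(_pool, cpu_encode, payload, max_run=128). Passing None as the executor uses the loop's default thread pool, which is handy for a quick local check but does not sidestep the GIL.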

Edge Cases and Gotchas

  • asyncio.sleep vs time.sleep. Use await asyncio.sleep(); time.sleep() blocks the loop.
  • Sync middleware. Blocking work in middleware blocks every request; offload there too.
  • Pickling for processes. ProcessPoolExecutor pickles the callable, its arguments, and its result; define the function at module top level (no lambdas or nested functions) and pass simple, picklable data.
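A cheap way to catch pickling problems before they surface as worker errors is to try pickle.dumps on the values you plan to submit. The helper below is a hypothetical pre-flight check, not part of concurrent.futures; the exception type varies by object (PicklingError for lambdas, TypeError for locks), so it catches Exception broadly and only reports.

```python
import pickle
from typing import Any


def check_picklable(*objects: Any) -> list[str]:
    """Describe each object that pickle cannot serialize.

    Useful before submitting to a ProcessPoolExecutor, which pickles the
    callable, its arguments, and its return value.
    """
    problems = []
    for obj in objects:
        try:
            pickle.dumps(obj)
        except Exception as exc:  # PicklingError, TypeError, AttributeError, ...
            problems.append(f"{type(obj).__name__}: {exc}")
    return problems
```

For example, check_picklable(cpu_encode, payload) returning an empty list means the submission itself should serialize; any entry names the offending type.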

Verification

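import asyncio
import time

import httpx
import pytest


@pytest.mark.anyio  # anyio's pytest plugin runs this on the asyncio backend by default
async def test_no_longer_blocking(app):
    # `app` is assumed to be a pytest fixture that returns the FastAPI instance.
    transport = httpx.ASGITransport(app=app)
    async with httpx.AsyncClient(transport=transport, base_url="http://t") as c:
        start = time.perf_counter()
        await asyncio.gather(*[c.get("/external") for _ in range(10)])
        # Ten concurrent calls should overlap rather than run back to back:
        # if one call takes t seconds, expect roughly t in total, not 10 * t.
        # The 1.0 s bound assumes t is well under 1.0 s and 10 * t well over it.
        assert time.perf_counter() - start < 1.0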