Async Database Sessions in FastAPI
Async database sessions are how FastAPI talks to a database without blocking the event loop: a shared async engine owns the connection pool, and each request draws its own session from it through a yield dependency.
This topic is part of Async, Background Tasks and Observability and is where async correctness most often succeeds or fails, since a synchronous driver is the most common loop-blocking mistake. It extends the session pattern from Dependency Injection Strategies.
Core Mechanics: Engine in Lifespan, Session per Request
The engine and its pool are long-lived and belong in the lifespan. Sessions are per-request and belong in a yield dependency, so each request gets an isolated transaction that always cleans up.
from contextlib import asynccontextmanager

from fastapi import FastAPI
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine


@asynccontextmanager
async def lifespan(app: FastAPI):
    engine = create_async_engine(
        app.state.settings.database_url,
        pool_size=10,
        max_overflow=5,
    )
    app.state.session_factory = async_sessionmaker(engine, expire_on_commit=False)
    yield
    await engine.dispose()  # Release the pool on shutdown.
Production Implementation: The Session Dependency
The yield dependency commits on success, rolls back on error, and always returns the connection to the pool. That contract is what prevents leaks.
from collections.abc import AsyncGenerator
from typing import Annotated

from fastapi import Depends, Request
from sqlalchemy.ext.asyncio import AsyncSession


async def get_session(request: Request) -> AsyncGenerator[AsyncSession, None]:
    async with request.app.state.session_factory() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()  # Always runs on failure.
            raise


SessionDep = Annotated[AsyncSession, Depends(get_session)]
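To see the commit, rollback, and close ordering without a database, the dependency above can be exercised against a stand-in session. This is a minimal sketch under stated assumptions: FakeSession and run_request are hypothetical names invented for illustration, and wrapping the generator with contextlib.asynccontextmanager only approximates how a framework enters and exits a yield dependency around a handler.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeSession:
    """Hypothetical stand-in for AsyncSession that records each call."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    async def commit(self) -> None:
        self.calls.append("commit")

    async def rollback(self) -> None:
        self.calls.append("rollback")

    async def __aenter__(self) -> "FakeSession":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Stands in for the real session releasing its connection on exit.
        self.calls.append("close")


async def get_session(session: FakeSession):
    # Same shape as the dependency above, minus the Request lookup.
    async with session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise


async def run_request(fail: bool) -> list[str]:
    session = FakeSession()
    dependency = asynccontextmanager(get_session)
    try:
        async with dependency(session):
            if fail:
                raise ValueError("handler failed")  # Simulated handler error.
    except ValueError:
        pass  # A real app would turn this into an error response.
    return session.calls


success_calls = asyncio.run(run_request(fail=False))
failure_calls = asyncio.run(run_request(fail=True))
print(success_calls)  # ['commit', 'close']
print(failure_calls)  # ['rollback', 'close']
```

In both paths the close step runs last, which is the part of the contract that keeps connections from leaking.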
The deep dive on pool exhaustion and its fixes is in Fixing asyncpg Connection Pool Exhaustion.
Async and Performance Notes
Every database call must be awaited so that it yields control back to the loop; a synchronous driver here blocks every request on the worker. Pool size sets your per-worker concurrency ceiling: pool_size + max_overflow is the maximum number of connections, and therefore of simultaneous in-flight queries, that one worker can hold. Multiplied across all workers, that total must stay under the database's connection limit. Keep transactions short so connections return to the pool quickly.
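The sizing rule can be checked with a few lines of arithmetic. The sketch below is illustrative only: connection_budget and fits_database are hypothetical helpers, and the worker counts, the limit of 100, and the reserved headroom are example values rather than recommendations.

```python
def connection_budget(workers: int, pool_size: int, max_overflow: int) -> int:
    """Worst-case number of connections opened across all worker processes."""
    per_worker = pool_size + max_overflow  # Ceiling for a single worker.
    return workers * per_worker


def fits_database(
    workers: int,
    pool_size: int,
    max_overflow: int,
    db_max_connections: int,
    reserved: int = 0,
) -> bool:
    """True if the worst case fits under the limit, minus reserved headroom."""
    budget = connection_budget(workers, pool_size, max_overflow)
    return budget <= db_max_connections - reserved


# Example values: pool_size=10 and max_overflow=5 as in the lifespan code,
# an assumed database limit of 100, and 5 connections kept back for admin use.
four_workers = connection_budget(workers=4, pool_size=10, max_overflow=5)
eight_workers = connection_budget(workers=8, pool_size=10, max_overflow=5)
print(four_workers)   # 60
print(eight_workers)  # 120
print(fits_database(4, 10, 5, db_max_connections=100, reserved=5))  # True
print(fits_database(8, 10, 5, db_max_connections=100, reserved=5))  # False
```

With these numbers, doubling the worker count from 4 to 8 is enough to exceed the limit even though each worker's pool settings are unchanged.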
Testing Strategy
Use an isolated database per test and assert rollback on error:
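import pytest


@pytest.mark.asyncio  # Assumes the pytest-asyncio plugin.
async def test_rollback_on_error(session):
    # session, insert_user and count_users come from the suite's own fixtures and helpers.
    with pytest.raises(ValueError):
        async with session.begin():
            await session.execute(insert_user("ada"))
            raise ValueError("boom")  # Forces rollback.
    assert await count_users(session) == 0  # Nothing persisted.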
Failure Modes and Debugging
- Pool exhaustion. Sessions not returned, or slow requests holding connections, drain the pool; use yield scoping and short transactions.
- Sync driver on the loop. Blocks everything; use an async driver such as asyncpg.
- Oversized pools. If pool_size + max_overflow times the number of workers exceeds the database limit, connections are refused; size the pools to fit.
- Shared sessions. Reusing one session across requests corrupts transaction state; use one session per request.
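The sync-driver failure mode in the list above can be reproduced without a database. In this sketch, time.sleep stands in for a synchronous driver call and asyncio.sleep for an awaited async one; both are substitutes chosen for illustration, and blocking_query, async_query, and ticks_during are hypothetical names. A background heartbeat counts how often the loop got a chance to run other work while the query was in flight.

```python
import asyncio
import contextlib
import time


async def blocking_query() -> None:
    time.sleep(0.2)  # Stand-in for a synchronous driver call: never yields.


async def async_query() -> None:
    await asyncio.sleep(0.2)  # Stand-in for an awaited async driver call.


async def ticks_during(query) -> int:
    """Count heartbeat ticks that ran while the given query was in flight."""
    ticks = 0

    async def heartbeat() -> None:
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1  # Represents other requests making progress.

    task = asyncio.create_task(heartbeat())
    await asyncio.sleep(0)  # Give the heartbeat a chance to start.
    await query()
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task
    return ticks


blocked_ticks = asyncio.run(ticks_during(blocking_query))
free_ticks = asyncio.run(ticks_during(async_query))
print(blocked_ticks)  # 0: nothing else ran while the loop was blocked.
print(free_ticks)     # Greater than zero; the exact count varies by machine.
```

A zero tick count during the blocking call is the same starvation that every other request on the worker would experience.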
Related Reading
- Up to the section: Async, Background Tasks and Observability.
- Hands-on guide: Fixing asyncpg Connection Pool Exhaustion.
- Composes with: Async Correctness and Concurrency and Dependency Injection Strategies.