Per-User Token Bucket Throttling in FastAPI
Key takeaways:
- A token bucket allows bursts up to a cap while enforcing a steady average rate.
- Store tokens and a refill timestamp per user in Redis.
- Run refill-and-consume as one atomic Lua script to avoid races.
- Key on the authenticated principal for per-user fairness.
- Return 429 with Retry-After when the bucket is empty.
This guide builds the token-bucket algorithm behind Rate Limiting and Throttling, going lower-level than the SlowAPI approach.
The Problem This Solves
A fixed-window limit either rejects legitimate bursts or lets up to twice the limit through when requests straddle a window boundary, and a global or IP-keyed limiter punishes users who share an address. A per-user token bucket absorbs bursts, enforces a fair average rate, and isolates each principal.
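The boundary problem is easier to see with numbers. The following is a minimal sketch of a naive fixed-window counter; the class name FixedWindowCounter and the 10-per-60-seconds limit are illustrative assumptions, not part of this guide's implementation.

```python
from collections import defaultdict


class FixedWindowCounter:
    """Naive fixed-window limiter: at most `limit` requests per `window` seconds."""

    def __init__(self, limit: int, window: float) -> None:
        self.limit = limit
        self.window = window
        self.counts = defaultdict(int)  # window index -> requests seen

    def allow(self, now: float) -> bool:
        index = int(now // self.window)  # Index of the window containing `now`.
        if self.counts[index] >= self.limit:
            return False
        self.counts[index] += 1
        return True


limiter = FixedWindowCounter(limit=10, window=60.0)
late = sum(limiter.allow(59.9) for _ in range(10))   # Tail of the first window.
early = sum(limiter.allow(60.1) for _ in range(10))  # Head of the second window.
print(late + early)  # 20 requests accepted within 0.2 s, twice the nominal limit.
```

A token bucket with the same capacity would have refilled only a fraction of a token in those 0.2 seconds, so the second batch would be rejected.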
Prerequisites
- An authenticated principal available on the request.
- An async Redis client supporting EVAL/Lua.
Step-by-Step Implementation
1. The atomic refill-and-consume script
-- token_bucket.lua: KEYS[1]=bucket ARGV: capacity, refill_per_sec, now, cost
local data = redis.call('HMGET', KEYS[1], 'tokens', 'ts')
local tokens = tonumber(data[1]) or tonumber(ARGV[1])
local ts = tonumber(data[2]) or tonumber(ARGV[3])
local elapsed = math.max(0, tonumber(ARGV[3]) - ts)
-- Refill proportional to elapsed time, capped at capacity.
tokens = math.min(tonumber(ARGV[1]), tokens + elapsed * tonumber(ARGV[2]))
local allowed = 0
if tokens >= tonumber(ARGV[4]) then
  tokens = tokens - tonumber(ARGV[4])
  allowed = 1
end
-- HSET accepts multiple field/value pairs (Redis 4.0+); HMSET is deprecated.
redis.call('HSET', KEYS[1], 'tokens', tokens, 'ts', ARGV[3])
redis.call('EXPIRE', KEYS[1], 3600)
return { allowed, tokens }
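To reason about the script without a Redis server, the same refill-and-consume arithmetic can be mirrored in plain Python. This is only a sketch for unit-testing the logic: Bucket and refill_and_consume are hypothetical names, and the model has none of the atomicity that the Lua script provides.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class Bucket:
    tokens: float  # Tokens remaining after the last request.
    ts: float      # Timestamp of the last request, in seconds.


def refill_and_consume(
    bucket: Optional[Bucket],
    capacity: float,
    refill_per_sec: float,
    now: float,
    cost: float = 1.0,
) -> Tuple[bool, Bucket]:
    """In-memory mirror of the Lua script's steps (no Redis, no atomicity)."""
    if bucket is None:
        # Cold bucket: starts full, as in the script's `or` fallbacks.
        bucket = Bucket(tokens=capacity, ts=now)
    elapsed = max(0.0, now - bucket.ts)
    # Refill proportional to elapsed time, capped at capacity.
    tokens = min(capacity, bucket.tokens + elapsed * refill_per_sec)
    allowed = tokens >= cost
    if allowed:
        tokens -= cost
    return allowed, Bucket(tokens=tokens, ts=now)


state = None
results = []
for _ in range(21):  # 21 requests at the same instant against a 20-token bucket.
    ok, state = refill_and_consume(state, capacity=20, refill_per_sec=5, now=100.0)
    results.append(ok)

# Half a second later, 0.5 s * 5 tokens/s = 2.5 tokens have been refilled.
ok_later, state = refill_and_consume(state, capacity=20, refill_per_sec=5, now=100.5)
```

The first 20 calls succeed, the 21st is rejected, and the request half a second later succeeds, leaving 1.5 tokens.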
2. The FastAPI dependency
import time
from typing import Annotated
from fastapi import Depends, HTTPException, Request
# Loaded once at import time; reused for every check.
with open("token_bucket.lua", encoding="utf-8") as f:
    _SCRIPT = f.read()

# current_user is assumed to be the app's existing auth dependency (not shown here).
def throttle(capacity: int, refill_per_sec: float, cost: int = 1):
    async def _dep(request: Request, user: Annotated[dict, Depends(current_user)]):
        redis = request.app.state.redis
        allowed, _ = await redis.eval(
            _SCRIPT, 1, f"tb:{user['sub']}",  # Per-principal bucket key.
            capacity, refill_per_sec, time.time(), cost,
        )
        if not allowed:
            raise HTTPException(429, "rate limit exceeded",
                                headers={"Retry-After": "1"})
    return _dep
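The dependency sends a fixed Retry-After of 1 second. A more informative value can be derived from the token deficit and the refill rate. The helper below is a sketch under assumptions: retry_after_seconds is a hypothetical name, and it expects a fractional token count. Redis truncates Lua numbers to integers in script replies, so the script would need to return something like tostring(tokens) for the fraction to reach Python.

```python
import math


def retry_after_seconds(tokens: float, cost: float, refill_per_sec: float) -> int:
    """Whole seconds until `cost` tokens are available, rounded up, minimum 1."""
    if refill_per_sec <= 0:
        raise ValueError("refill_per_sec must be positive")
    deficit = max(0.0, cost - tokens)  # Tokens still missing for this request.
    return max(1, math.ceil(deficit / refill_per_sec))


# Empty bucket, cost 5, refilling at 2 tokens/s: 2.5 s, rounded up to 3.
wait = retry_after_seconds(tokens=0.0, cost=5, refill_per_sec=2)
headers = {"Retry-After": str(wait)}  # Header values must be strings.
```

Rounding up keeps a well-behaved client from retrying slightly too early and being rejected again.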
3. Apply to a route
# 20-token bucket refilling at 5 tokens/sec → bursts of 20, ~5 rps sustained.
@router.get("/search", dependencies=[Depends(throttle(capacity=20, refill_per_sec=5))])
async def search(q: str) -> dict:
    return await run_search(q)
Edge Cases and Gotchas
- Clock source. Pass a server timestamp into the script; relying on multiple app clocks causes drift. Using Redis TIME inside the script is even safer.
- Cost-weighted requests. Charge expensive endpoints more tokens by raising cost.
- Cold buckets. A first request initializes a full bucket; ensure capacity reflects an acceptable initial burst.
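When choosing capacity and cost together, it can help to work out the limits a client actually sees. The following sketch does that arithmetic; effective_limits is a hypothetical helper, not part of the implementation above.

```python
from typing import Tuple


def effective_limits(capacity: float, refill_per_sec: float, cost: float) -> Tuple[int, float]:
    """Return (max burst in requests, sustained requests per second) for one cost."""
    if cost <= 0:
        raise ValueError("cost must be positive")
    burst = int(capacity // cost)        # Requests a full (cold) bucket can absorb.
    sustained = refill_per_sec / cost    # Long-run request rate once the burst is spent.
    return burst, sustained


cheap = effective_limits(capacity=20, refill_per_sec=5, cost=1)   # (20, 5.0)
costly = effective_limits(capacity=20, refill_per_sec=5, cost=5)  # (4, 1.0)
```

A cost larger than capacity yields a burst of 0, meaning the request can never be admitted, so cost values are worth validating against capacity when routes are declared.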
Verification
def test_burst_then_throttle(client, auth):
    ok = sum(client.get("/search?q=x", headers=auth).status_code == 200 for _ in range(20))
    assert ok == 20  # Burst absorbed.
    assert client.get("/search?q=x", headers=auth).status_code == 429  # Then limited.
Related Reading
- Up to the topic: Rate Limiting and Throttling.
- Related guides: FastAPI Rate Limiting with Redis and SlowAPI and Async Correctness and Concurrency.