Per-User Token Bucket Throttling in FastAPI

Key takeaways:

  • A token bucket allows bursts up to a cap while enforcing a steady average rate.
  • Store tokens and a refill timestamp per user in Redis.
  • Run refill-and-consume as one atomic Lua script to avoid races.
  • Key on the authenticated principal for per-user fairness.
  • Return 429 with Retry-After when the bucket is empty.

This guide builds the token-bucket algorithm behind Rate Limiting and Throttling, going lower-level than the SlowAPI approach.

The Problem This Solves

A fixed-window limit either rejects legitimate bursts or lets up to twice the limit through when requests straddle a window boundary, and a limiter keyed on IP address punishes users who share one. A per-user token bucket absorbs bursts, enforces a fair average rate, and isolates each principal.
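
To make the boundary problem concrete, here is a small sketch of a fixed-window counter with a limit of 5 requests per 10-second window. The FixedWindow class is a toy written for this illustration, not part of the implementation below. Five requests at the end of one window and five at the start of the next are all accepted, so ten requests pass in under two seconds.

```python
class FixedWindow:
    """Toy fixed-window counter; illustrative only."""

    def __init__(self, limit: int, window_sec: int) -> None:
        self.limit = limit
        self.window_sec = window_sec
        self.counts: dict[int, int] = {}

    def allow(self, now: float) -> bool:
        window = int(now // self.window_sec)  # Index of the window containing `now`.
        used = self.counts.get(window, 0)
        if used >= self.limit:
            return False
        self.counts[window] = used + 1
        return True


limiter = FixedWindow(limit=5, window_sec=10)
# Five requests just before the boundary at t=10, five just after it.
timestamps = [9.0, 9.2, 9.4, 9.6, 9.8, 10.0, 10.2, 10.4, 10.6, 10.8]
accepted = sum(limiter.allow(t) for t in timestamps)
print(accepted)  # 10, twice the nominal limit of 5
```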

Prerequisites

  • An authenticated principal available on the request, exposed in the code below through a current_user dependency that returns a dict with a sub claim.
  • An async Redis client supporting EVAL/Lua.

Step-by-Step Implementation

1. The atomic refill-and-consume script

-- token_bucket.lua: KEYS[1]=bucket  ARGV: capacity, refill_per_sec, now, cost
local data = redis.call('HMGET', KEYS[1], 'tokens', 'ts')
local tokens = tonumber(data[1]) or tonumber(ARGV[1])
local ts = tonumber(data[2]) or tonumber(ARGV[3])
local elapsed = math.max(0, tonumber(ARGV[3]) - ts)
-- Refill proportional to elapsed time, capped at capacity.
tokens = math.min(tonumber(ARGV[1]), tokens + elapsed * tonumber(ARGV[2]))
local allowed = 0
if tokens >= tonumber(ARGV[4]) then
  tokens = tokens - tonumber(ARGV[4])
  allowed = 1
end
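-- HSET accepts multiple field/value pairs since Redis 4.0; HMSET is deprecated.
redis.call('HSET', KEYS[1], 'tokens', tokens, 'ts', ARGV[3])
-- Idle buckets expire after an hour; a missing key is treated as a full bucket.
redis.call('EXPIRE', KEYS[1], 3600)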
return { allowed, tokens }
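
For unit tests, or for reasoning about the arithmetic without Redis, the same refill-and-consume step can be mirrored in plain Python. This is only a sketch: Bucket and refill_and_consume are illustrative names, and the in-memory version has none of the atomicity the Lua script provides.

```python
from dataclasses import dataclass


@dataclass
class Bucket:
    tokens: float
    ts: float


def refill_and_consume(bucket, capacity, refill_per_sec, now, cost=1):
    """Mirror of the Lua script: returns (allowed, updated bucket)."""
    if bucket is None:
        # A missing key behaves like a full bucket stamped with the current time.
        bucket = Bucket(tokens=capacity, ts=now)
    elapsed = max(0.0, now - bucket.ts)
    # Refill proportional to elapsed time, capped at capacity.
    tokens = min(capacity, bucket.tokens + elapsed * refill_per_sec)
    allowed = tokens >= cost
    if allowed:
        tokens -= cost
    return allowed, Bucket(tokens=tokens, ts=now)


# Capacity 3, refill 1 token/sec: three immediate requests pass, the fourth
# fails, and one more passes after a second of refill.
state = None
results = []
for t in (0.0, 0.0, 0.0, 0.0, 1.0):
    ok, state = refill_and_consume(state, capacity=3, refill_per_sec=1.0, now=t)
    results.append(ok)
print(results)  # [True, True, True, False, True]
```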

2. The FastAPI dependency

import time
from pathlib import Path
from typing import Annotated

from fastapi import Depends, HTTPException, Request

# Script source is read once at import time and passed to EVAL on each check.
_SCRIPT = Path("token_bucket.lua").read_text()


def throttle(capacity: int, refill_per_sec: float, cost: int = 1):
    async def _dep(request: Request, user: Annotated[dict, Depends(current_user)]):
        redis = request.app.state.redis
        allowed, _ = await redis.eval(
            _SCRIPT, 1, f"tb:{user['sub']}",     # Per-principal bucket key.
            capacity, refill_per_sec, time.time(), cost,
        )
        if not allowed:
            raise HTTPException(429, "rate limit exceeded",
                                headers={"Retry-After": "1"})
    return _dep
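
The hard-coded Retry-After of 1 second in the dependency is only accurate when the next token arrives within a second. One way to derive the value from the script's second return value is sketched below. The helper retry_after_seconds is hypothetical and assumes refill_per_sec is positive. Redis converts Lua numbers in script replies to integers, so the returned token count is truncated and the estimate may be slightly longer than necessary, but not shorter.

```python
import math


def retry_after_seconds(tokens: float, cost: int, refill_per_sec: float) -> int:
    """Whole seconds until `cost` tokens are available, never less than 1."""
    deficit = max(0.0, cost - tokens)
    return max(1, math.ceil(deficit / refill_per_sec))


# Empty bucket refilling at 5 tokens/sec: a token arrives within a second.
print(retry_after_seconds(tokens=0, cost=1, refill_per_sec=5))     # 1
# Empty bucket refilling at 0.25 tokens/sec: a token takes four seconds.
print(retry_after_seconds(tokens=0, cost=1, refill_per_sec=0.25))  # 4
```

In the dependency, the result would be passed as headers={"Retry-After": str(wait)}, with tokens taken from the second element returned by redis.eval instead of being discarded.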

3. Apply to a route

# 20-token bucket refilling at 5 tokens/sec → bursts of 20, ~5 rps sustained.
@router.get("/search", dependencies=[Depends(throttle(capacity=20, refill_per_sec=5))])
async def search(q: str) -> dict:
    return await run_search(q)
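
As a sanity check on the numbers in the route comment: a bucket can admit at most (capacity + refill_per_sec * T) / cost requests in any span of T seconds, since it holds at most capacity tokens at the start and gains refill_per_sec tokens per second. A minimal sketch, with max_requests as an illustrative helper name:

```python
import math


def max_requests(capacity: int, refill_per_sec: float, seconds: float, cost: int = 1) -> int:
    """Upper bound on requests admitted in `seconds`, starting from a full bucket."""
    return math.floor((capacity + refill_per_sec * seconds) / cost)


print(max_requests(20, 5, 0))   # 20: the initial burst
print(max_requests(20, 5, 60))  # 320: the burst plus 5 per second for a minute
```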

Edge Cases and Gotchas

  • Clock source. The dependency above passes the app server's time.time() into the script, so clock skew between app instances distorts the refill calculation. Calling Redis TIME inside the script is safer because every caller then shares one clock.
  • Cost-weighted requests. Charge expensive endpoints more tokens by raising cost.
  • Cold buckets. A first request, or the first one after the key's one-hour expiry, starts from a full bucket; choose a capacity that is acceptable as an initial burst.
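
The clock-source point can be seen in a small simulation. The sketch below mirrors the script's arithmetic in Python and alternates requests between two app servers whose clocks disagree; the step and admitted functions and the 2-second skew are assumptions made for this illustration. Because the script always stores the caller's timestamp, each switch from the slow clock to the fast one is credited as about 2 seconds of elapsed time, and the bucket refills far faster than intended.

```python
def step(tokens, ts, now, capacity, refill_per_sec, cost=1):
    """Same arithmetic as the Lua script; returns (allowed, tokens, ts)."""
    elapsed = max(0.0, now - ts)
    tokens = min(capacity, tokens + elapsed * refill_per_sec)
    allowed = tokens >= cost
    if allowed:
        tokens -= cost
    return allowed, tokens, now


def admitted(skew: float, requests: int = 40) -> int:
    """Send one request every 0.1 s, alternating between two app servers."""
    capacity, rate = 5, 1.0
    tokens, ts = float(capacity), 0.0
    count = 0
    for i in range(requests):
        real = i * 0.1
        now = real + (skew if i % 2 == 0 else 0.0)  # Server A's clock runs ahead.
        ok, tokens, ts = step(tokens, ts, now, capacity, rate)
        count += ok
    return count


print(admitted(skew=0.0))  # At most 8: 5 burst tokens plus under 4 s of refill.
print(admitted(skew=2.0))  # 40: every request is admitted.
```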

Verification

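def test_burst_then_throttle(client, auth):
    # Assumes a fresh bucket for this user and that all 21 requests finish
    # within 0.2 s, before the 5 tokens/sec refill yields another token.
    ok = sum(client.get("/search?q=x", headers=auth).status_code == 200 for _ in range(20))
    assert ok == 20                                   # Burst absorbed.
    assert client.get("/search?q=x", headers=auth).status_code == 429  # Then limited.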