Running ARQ Workers with FastAPI
Key takeaways:
- ARQ tasks are async functions that take a context dict plus their arguments.
- The FastAPI process enqueues jobs through a Redis pool opened in lifespan.
- Pass an explicit
_job_idto deduplicate double submissions. WorkerSettingsconfigures functions, retries, and cron jobs.- The worker runs as its own process, scaling independently of the web tier.
This guide implements the ARQ path from Background Task Processing and its comparison with Celery.
The Problem This Solves
You need durable, retryable background work in an async FastAPI codebase without the ceremony of Celery. ARQ is async-native and Redis-backed, so it fits an async stack with minimal moving parts while still giving you persistence, retries, and scheduling.
Prerequisites
- Redis reachable from both the web and worker processes.
arqinstalled; Python 3.11+.
Step-by-Step Implementation
1. Define tasks and worker settings
# app/worker.py
from arq import cron
from arq.connections import RedisSettings
async def sync_invoice(ctx: dict, invoice_id: str) -> None:
# Idempotent: safe for ARQ to retry on failure.
await push_to_accounting(invoice_id)
async def nightly_cleanup(ctx: dict) -> None:
await purge_expired_tokens()
class WorkerSettings:
functions = [sync_invoice]
cron_jobs = [cron(nightly_cleanup, hour=3, minute=0)] # Runs at 03:00 daily.
redis_settings = RedisSettings()
max_tries = 5 # Retry with backoff.
2. Open the pool in the FastAPI lifespan
from contextlib import asynccontextmanager
from arq import create_pool
from arq.connections import RedisSettings
from fastapi import FastAPI
@asynccontextmanager
async def lifespan(app: FastAPI):
app.state.arq = await create_pool(RedisSettings())
yield
await app.state.arq.close()
3. Enqueue from a route
from fastapi import Request
@app.post("/invoices/{invoice_id}/sync")
async def sync(invoice_id: str, request: Request) -> dict[str, str]:
# _job_id dedupes repeat submissions of the same invoice.
await request.app.state.arq.enqueue_job("sync_invoice", invoice_id,
_job_id=f"sync:{invoice_id}")
return {"status": "queued", "invoice_id": invoice_id}
4. Run the worker as its own process
# Separate process → workers scale independently of the web tier.
CMD ["arq", "app.worker.WorkerSettings"]
Edge Cases and Gotchas
- Separate database pool. The worker creates its own engine; budget its connections alongside the web tier, per Async Database Sessions.
- Large arguments. Pass IDs, not ORM objects; arguments are serialized to Redis.
- Job result expiry. Results expire by default; configure retention if you read them.
Verification
async def test_enqueue(client_app):
pool = client_app.state.arq
job = await pool.enqueue_job("sync_invoice", "inv-1", _job_id="sync:inv-1")
assert job is not None
# Enqueuing the same _job_id again returns None (deduplicated).
dup = await pool.enqueue_job("sync_invoice", "inv-1", _job_id="sync:inv-1")
assert dup is None
Related Reading
- Up to the topic: Background Task Processing.
- Related guides: FastAPI BackgroundTasks vs Celery vs ARQ and Observability and Tracing.