Running ARQ Workers with FastAPI

Key takeaways:

  • ARQ tasks are async functions that take a context dict plus their arguments.
  • The FastAPI process enqueues jobs through a Redis pool opened in lifespan.
  • Pass an explicit _job_id to deduplicate double submissions.
  • WorkerSettings configures functions, retries, and cron jobs.
  • The worker runs as its own process, scaling independently of the web tier.

This guide implements the ARQ path from Background Task Processing and its comparison with Celery.

The Problem This Solves

You need durable, retryable background work in an async FastAPI codebase without the ceremony of Celery. ARQ is async-native and Redis-backed, so it fits an async stack with minimal moving parts while still giving you persistence, retries, and scheduling.

Prerequisites

  • Redis reachable from both the web and worker processes.
  • arq installed; Python 3.11+.

Step-by-Step Implementation

1. Define tasks and worker settings

# app/worker.py
from arq import cron
from arq.connections import RedisSettings


async def sync_invoice(ctx: dict, invoice_id: str) -> None:
    # Idempotent: safe for ARQ to retry on failure.
    await push_to_accounting(invoice_id)


async def nightly_cleanup(ctx: dict) -> None:
    await purge_expired_tokens()


class WorkerSettings:
    functions = [sync_invoice]
    cron_jobs = [cron(nightly_cleanup, hour=3, minute=0)]   # Runs at 03:00 daily.
    redis_settings = RedisSettings()
    max_tries = 5                                           # Retry with backoff.

2. Open the pool in the FastAPI lifespan

from contextlib import asynccontextmanager

from arq import create_pool
from arq.connections import RedisSettings
from fastapi import FastAPI


@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.arq = await create_pool(RedisSettings())
    yield
    await app.state.arq.close()

3. Enqueue from a route

from fastapi import Request


@app.post("/invoices/{invoice_id}/sync")
async def sync(invoice_id: str, request: Request) -> dict[str, str]:
    # _job_id dedupes repeat submissions of the same invoice.
    await request.app.state.arq.enqueue_job("sync_invoice", invoice_id,
                                            _job_id=f"sync:{invoice_id}")
    return {"status": "queued", "invoice_id": invoice_id}

4. Run the worker as its own process

# Separate process → workers scale independently of the web tier.
CMD ["arq", "app.worker.WorkerSettings"]

Edge Cases and Gotchas

  • Separate database pool. The worker creates its own engine; budget its connections alongside the web tier, per Async Database Sessions.
  • Large arguments. Pass IDs, not ORM objects; arguments are serialized to Redis.
  • Job result expiry. Results expire by default; configure retention if you read them.

Verification

async def test_enqueue(client_app):
    pool = client_app.state.arq
    job = await pool.enqueue_job("sync_invoice", "inv-1", _job_id="sync:inv-1")
    assert job is not None
    # Enqueuing the same _job_id again returns None (deduplicated).
    dup = await pool.enqueue_job("sync_invoice", "inv-1", _job_id="sync:inv-1")
    assert dup is None