FastAPI: BackgroundTasks and submit / ExecutionHandle
Use Starlette BackgroundTasks when you want to return an HTTP response before
heavy collect() work finishes, and DataFrame.submit() when that work should run
on a thread pool and be tracked through a future-like handle (see EXECUTION).
End-to-end pattern
- executor_lifespan on the app (shared pool).
- Depends(get_executor) in the route.
- Build a DataFrameModel (here from validated columnar JSON).
- submit(executor=...) returns an ExecutionHandle immediately.
- BackgroundTasks.add_task runs await handle.result() after the response is sent.
from contextlib import asynccontextmanager
from typing import Annotated

from fastapi import BackgroundTasks, Depends, FastAPI, Request
from pydantic import BaseModel, Field

from pydantable import DataFrameModel
from pydantable.fastapi import (
    columnar_dependency,
    executor_lifespan,
    get_executor,
    register_exception_handlers,
)


class UserRow(DataFrameModel):
    user_id: int = Field(validation_alias="userId")
    amount: float


class EnqueueResponse(BaseModel):
    accepted_rows: int
    request_id: str | None


@asynccontextmanager
async def lifespan(app: FastAPI):
    async with executor_lifespan(app, max_workers=8, thread_name_prefix="billing-ingest"):
        yield


app = FastAPI(title="Billing ingest (example)", lifespan=lifespan)
register_exception_handlers(app)


async def persist_rollup(handle, request_id: str | None) -> None:
    """Runs after the HTTP response; holds the worker until collect finishes."""
    try:
        rows = await handle.result()
    except Exception as exc:  # noqa: BLE001 - log-only boundary in an example
        # Replace with metrics / DLQ in production
        print(f"[{request_id}] background collect failed: {exc}")
        return
    # Here: write rows to warehouse, emit metrics, etc.
    print(f"[{request_id}] persisted {len(rows)} rows")


@app.post("/ingest", response_model=EnqueueResponse)
async def enqueue(
    request: Request,
    background_tasks: BackgroundTasks,
    df: Annotated[UserRow, Depends(columnar_dependency(UserRow, trusted_mode="strict"))],
    executor=Depends(get_executor),
):
    if executor is None:
        raise RuntimeError("executor_lifespan must be configured")
    rid = getattr(request.state, "request_id", None)
    n = df.shape[0]
    handle = df.submit(executor=executor)
    background_tasks.add_task(persist_rollup, handle, rid)
    return EnqueueResponse(accepted_rows=n, request_id=rid)
Add RequestIdMiddleware from fastapi_observability if you want request_id
populated; without it, rid is None while the rest still works.
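The submit-now, await-after-responding shape can be tried without FastAPI or pydantable installed. The following is a minimal stdlib sketch, not pydantable's implementation: slow_collect, handle_request, and persist_later are hypothetical stand-ins for the engine's collect step, the route, and the background task, and asyncio.wrap_future is only assumed to be a reasonable analogy for awaiting an ExecutionHandle.

```python
import asyncio
import time
from concurrent.futures import Future, ThreadPoolExecutor


def slow_collect(rows: list[int]) -> list[int]:
    """Hypothetical stand-in for heavy collect() work on a pool thread."""
    time.sleep(0.2)
    return [r * 2 for r in rows]


async def persist_later(future: Future, log: list[str]) -> None:
    """Plays the role of the background task: await the result, then persist."""
    rows = await asyncio.wrap_future(future)
    log.append(f"persisted {len(rows)} rows")


async def handle_request(executor: ThreadPoolExecutor, log: list[str]) -> asyncio.Task:
    """Plays the role of the route: submit, schedule follow-up, 'respond'."""
    future = executor.submit(slow_collect, [1, 2, 3])  # returns immediately
    task = asyncio.create_task(persist_later(future, log))
    log.append("response sent")  # recorded before the pool work finishes
    return task


async def main() -> list[str]:
    log: list[str] = []
    with ThreadPoolExecutor(max_workers=2) as executor:
        task = await handle_request(executor, log)
        await task  # a server would let the background task finish on its own
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The log records the "response" first and the persisted rows second, which is the ordering the route above relies on.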
Semantics and limits
- await handle.result() blocks the Starlette background worker until the engine finishes; size your ThreadPoolExecutor accordingly (and avoid huge frames in BackgroundTasks).
- Cancelling await acollect() / result() does not cancel in-flight Rust work; see EXECUTION.
- For jobs that must survive process restarts, use a real queue (Celery, RQ, SQS, …); BackgroundTasks is in-process and best-effort only.
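The cancellation point has a close stdlib analogue: once a thread-pool job is running, giving up on the await does not stop the thread. The sketch below illustrates that with concurrent.futures and asyncio only. engine_work is a hypothetical stand-in for in-flight engine work, and nothing here claims to mirror how pydantable wires its handles internally.

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def engine_work(started: threading.Event) -> str:
    """Hypothetical stand-in for engine work that cannot be interrupted."""
    started.set()  # signal that the job is now running on the pool thread
    time.sleep(0.3)
    return "finished"


async def main() -> tuple[bool, bool, str]:
    started = threading.Event()
    with ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(engine_work, started)
        # Make sure the work is actually in flight before we stop waiting.
        await asyncio.to_thread(started.wait)
        timed_out = False
        try:
            # Stop waiting after 50 ms; this cancels only the awaiting side.
            await asyncio.wait_for(asyncio.wrap_future(future), timeout=0.05)
        except asyncio.TimeoutError:
            timed_out = True
        # Leaving the `with` block joins the pool thread (blocking; fine in a demo).
    # The running job was not cancelled and still produced its result.
    return timed_out, future.cancelled(), future.result()


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The awaiting side times out, yet the future is not marked cancelled and the job still returns its value. This is one reason to size the pool for work that may outlive the code waiting on it.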
See also
- MATERIALIZATION: four terminal modes
- fastapi_observability: request IDs for correlating background logs
- docs/examples/fastapi/service_layout/: routers + lifespan in the repo