FastAPI Integration Guide¶
This guide is the navigation + reference for using pydantable inside FastAPI: validated request bodies, typed transforms, async materialization, and streaming.
Start here: GOLDEN_PATH_FASTAPI, the shortest runnable service (one async app: lifespan, Depends, acollect, streaming).
Related recipes: fastapi_columnar_bodies (column-shaped JSON bodies), fastapi_async_materialization, fastapi_observability (request IDs + observe), fastapi_background_tasks (BackgroundTasks + submit), async_lazy_pipeline (lazy aread_* → transforms → materialize). Example service layout (routers + lifespan): docs/examples/fastapi/service_layout/ in the repo. Roadmap / “when to use what”: FASTAPI_ENHANCEMENTS.
How to read this page (quick map)¶
- If you’re building a service today, read:
  - Install
  - Fast path for services (the “golden path” + cookbooks)
  - HTTP errors and exception handlers (422 vs 400 vs 503)
  - Testing note (lifespan-aware TestClient)
- If you’re deciding row vs column payloads, jump to:
  - Columnar OpenAPI and Depends (OpenAPI columnar models + Depends)
  - Column-shaped JSON request bodies (columnar bodies without helpers)
- If you’re tuning async / executors / streaming, jump to:
  - FASTAPI_ADVANCED (four modes + I/O patterns)
  - EXECUTION and MATERIALIZATION (deep dive)
Install (what to pip install)¶
Optional helpers used throughout the FastAPI docs:
For I/O-heavy service routes (Arrow buffers, Parquet/IPC helpers, streaming writers), you’ll often also want:
Fast path for services (recommended order)¶
- Run GOLDEN_PATH_FASTAPI end-to-end.
- Pick your payload shape:
  - Row list: list[YourDF.RowModel] in requests + response_model=list[YourDTO] in responses.
  - Columnar JSON: dict[str, list] shapes; see fastapi_columnar_bodies.
- Decide response size:
  - Small/medium: collect() / to_dict()
  - Large: astream() + ndjson_streaming_response (FASTAPI_ENHANCEMENTS)
- Lock down your error mapping: HTTP errors and exception handlers.
Optional pydantable.fastapi helpers¶
Install the extra:
Then import pydantable.fastapi (not required for basic FastAPI usage):
- executor_lifespan(app, max_workers=..., thread_name_prefix=...): async context manager that attaches a ThreadPoolExecutor to app.state.executor for acollect(executor=...) and pydantable.io helpers.
- get_executor(request): for Depends(get_executor), returning request.app.state.executor (or None if unset).
- register_exception_handlers(app): registers HTTP handlers for MissingRustExtensionError (503), ColumnLengthMismatchError (400), and in-handler pydantic.ValidationError (422); see HTTP errors and exception handlers.
- ingest_error_response(failures, status_code=..., title=...): build a structured JSON payload for batch ingest failures (typically produced by ignore_errors=True + on_validation_errors=...). This is meant for in-handler validation failures, not request parsing errors (which remain FastAPI’s 422).
- ndjson_streaming_response(astream_iter) / ndjson_chunk_bytes(astream_iter): build an application/x-ndjson StreamingResponse from await df.astream(...) without duplicating JSON line encoding; see FASTAPI_ENHANCEMENTS.
- columnar_body_model, columnar_body_model_from_dataframe_model: build a Pydantic model whose fields are list[T] per column (OpenAPI-friendly dict[str, list]). Optional example= / json_schema_extra= for Swagger examples.
- columnar_dependency(model_cls, ...), rows_dependency(model_cls, ...): Depends(...) factories that validate the request body and return a DataFrameModel instance (columnar JSON or list[RowModel]), forwarding trusted_mode and related DataFrameModel kwargs. validation_profile= is also supported as a preset layer over trusted_mode / fill_missing_optional / ignore_errors. Phase 5 adds generate_examples= (OpenAPI enrichment) and input_key_mode= (alias-aware columnar ingestion).
Inbound request validation is still FastAPI’s default RequestValidationError (422) when the request body fails to parse.
Columnar OpenAPI and Depends¶
Use columnar_body_model_from_dataframe_model(MyDF) as Body / response_model when clients send or receive column-shaped JSON (same shape as to_dict()). For routes, prefer columnar_dependency so you inject a MyDF directly:
from typing import Annotated

from fastapi import Depends, FastAPI
from pydantable.fastapi import columnar_dependency

app = FastAPI()

# User is assumed to be a DataFrameModel subclass defined elsewhere.
@app.post("/ingest")
def ingest(df: Annotated[User, Depends(columnar_dependency(User, trusted_mode="strict"))]) -> dict:
    return df.to_dict()
To accept alias keys in columnar request bodies, use input_key_mode=:
@app.post("/ingest")
def ingest(df: Annotated[User, Depends(columnar_dependency(User, input_key_mode="both"))]) -> dict:
    return df.to_dict()
For row-array JSON bodies, use rows_dependency(User); OpenAPI documents list[User.RowModel].
Nested row fields (e.g. inner: NestedModel) become list[NestedModel] in columnar JSON (one nested object per row index). That shape is valid but can be heavy on the wire; prefer flat columns when you can.
Map-like columns (dict[str, T]) use the same columnar encoding: each field is list[dict]-compatible per row (parallel dict values at each index), matching to_dict() after ingest. See Map-like columns in SUPPORTED_TYPES. For the JSON value-kind mapping (nested object vs map vs list), see JSON (RFC 8259) vs column types in SUPPORTED_TYPES. File-based JSON loading (array vs JSON Lines, eager vs lazy) is described in IO_JSON—HTTP columnar bodies use the same logical shapes as materialize_json / to_dict().
Validation layers: Pydantic validates each column as list[T] (wrong element types → 422 before your handler). Row/column length consistency is enforced when constructing DataFrameModel inside the dependency; mismatched lengths raise ColumnLengthMismatchError (subclass of ValueError). With register_exception_handlers, that maps to 400; without it, you typically see 500—see HTTP errors and exception handlers.
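The two layers can be pictured with a stdlib-only sketch. ColumnLengthMismatch and check_equal_lengths below are hypothetical stand-ins, not pydantable's implementation; they only illustrate why a body can pass per-column list[T] validation and still fail the length check afterwards.

```python
class ColumnLengthMismatch(ValueError):
    """Hypothetical stand-in for a length error that subclasses ValueError."""


def check_equal_lengths(columns: dict[str, list]) -> int:
    """Return the shared row count, or raise if the columns disagree."""
    lengths = {name: len(values) for name, values in columns.items()}
    if len(set(lengths.values())) > 1:
        raise ColumnLengthMismatch(f"column lengths differ: {lengths}")
    # An empty mapping is treated as zero rows in this sketch.
    return next(iter(lengths.values()), 0)


ok = {"id": [1, 2], "age": [20, None]}
bad = {"id": [1, 2], "age": [20]}  # every column is a valid list, but lengths differ

row_count = check_equal_lengths(ok)
try:
    check_equal_lengths(bad)
    mismatch_caught = False
except ValueError:  # the subclass is also caught by a plain ValueError handler
    mismatch_caught = True
```

Because the stand-in subclasses ValueError, a broad except ValueError would also catch it, which is one reason to map the specific subclass to 400 rather than every ValueError.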
NDJSON streaming responses do not get a per-chunk OpenAPI schema (same as any streaming body); columnar response_model applies to single JSON to_dict() responses only.
Testing: pydantable.testing.fastapi provides fastapi_app_with_executor() and fastapi_test_client(app) (context manager) so executor_lifespan runs under TestClient and get_executor works. Use TestClient(..., raise_server_exceptions=False) when asserting 500 responses from dependencies. See tests/test_pydantable_fastapi_columnar.py.
Testing note (lifespan and TestClient)¶
FastAPI’s TestClient is synchronous; if your app uses a lifespan function (including executor_lifespan), prefer pydantable.testing.fastapi.fastapi_test_client(app) so the lifespan runs and Depends(get_executor) works. See FASTAPI_ENHANCEMENTS (Phase 7).
Testing routes (TestClient)¶
Use FastAPI’s TestClient (synchronous) to exercise handlers without a live server. Install fastapi and httpx (included in the [dev] extra).
from fastapi import FastAPI
from fastapi.testclient import TestClient
from pydantic import BaseModel
from pydantable import DataFrameModel

class UserDF(DataFrameModel):
    id: int
    age: int | None

class UserRow(BaseModel):
    id: int
    age: int | None

app = FastAPI()

@app.post("/users", response_model=list[UserRow])
def create_users(rows: list[UserDF.RowModel]):
    df = UserDF(rows)
    return df.collect()

client = TestClient(app)
r = client.post("/users", json=[{"id": 1, "age": 20}])
assert r.status_code == 200
assert r.json() == [{"id": 1, "age": 20}]
Column-shaped bodies are plain dict[str, list] JSON; use a dict parameter (or a Pydantic model wrapping that shape) and construct DataFrameModel(..., trusted_mode="shape_only") when the payload is trusted.
HTTP errors and exception handlers¶
| Situation | Typical exception | HTTP status | Notes |
|---|---|---|---|
| Invalid JSON body shape / types at the boundary | fastapi.exceptions.RequestValidationError | 422 | FastAPI’s default handler (before your route runs). |
| Manual validation inside a route (e.g. model_validate) | pydantic.ValidationError | 422 | Use register_exception_handlers from pydantable.fastapi, or map yourself. |
| Native extension missing | MissingRustExtensionError | 503 | register_exception_handlers returns a JSON detail string. |
| Mismatched column lengths after Pydantic accepted the body | ColumnLengthMismatchError | 400 | From register_exception_handlers; JSON {"detail": "<message>"}. |
| Engine / plan / transform errors | Often ValueError | 400 / 422 / 500 | Do not map all ValueErrors globally; register_exception_handlers only adds ColumnLengthMismatchError. |
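The advice in the last row (map a narrow subclass, not every ValueError) can be sketched without any framework. LengthMismatch, STATUS_BY_EXCEPTION, and status_for are hypothetical names used only for illustration; they are not part of pydantable or FastAPI.

```python
class LengthMismatch(ValueError):
    """Hypothetical stand-in for a narrowly scoped ValueError subclass."""


# Narrow registry: only the listed types (and their subclasses) get a custom status.
STATUS_BY_EXCEPTION: dict[type[Exception], int] = {
    LengthMismatch: 400,
}


def status_for(exc: Exception) -> int:
    """Map an exception to an HTTP status; anything unregistered stays a 500."""
    for exc_type, status in STATUS_BY_EXCEPTION.items():
        if isinstance(exc, exc_type):
            return status
    return 500


known = status_for(LengthMismatch("id has 2 values, age has 1"))
unknown = status_for(ValueError("unexpected engine failure"))
```

Here the generic ValueError still surfaces as a 500, so an unexpected engine bug is not silently reported to clients as a bad request.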
Common building blocks (what most services use)¶
- Request bodies: list[YourDF.RowModel] (row list) or columnar JSON via fastapi_columnar_bodies.
- Responses:
  - collect() / await acollect() → list[Row] (best with response_model=list[YourDTO])
  - to_dict() / await ato_dict() → dict[str, list] (columnar JSON)
  - astream() + ndjson_streaming_response(...) → NDJSON streaming for large responses
- Async routes: use await ...acollect(...) so the loop stays responsive; prefer a bounded executor via executor_lifespan + Depends(get_executor).
- Testing: use pydantable.testing.fastapi.fastapi_test_client(app) so lifespan runs.
If you need deeper I/O/materialization patterns, see FASTAPI_ADVANCED and the cookbook index (index).
Responses: columnar vs row-shaped¶
- to_dict() / await ato_dict(): dict[str, list]; one JSON object with parallel arrays.
- collect() / await acollect(): list of Pydantic models for the current schema; return it from the handler and let response_model define OpenAPI and validate the serialized response.
- to_dicts() / await ato_dicts(): list[dict] from row models when you want plain dicts without a separate DTO class.
- await ato_polars(): optional Polars DataFrame when the [polars] extra is installed (same semantics as to_polars()).
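To make the two wire shapes concrete, here is a small stdlib-only sketch that pivots between row-shaped and columnar data. rows_to_columns and columns_to_rows are hypothetical helpers written for this illustration, not pydantable functions.

```python
import json

rows = [{"id": 1, "age": 20}, {"id": 2, "age": None}]


def rows_to_columns(rows: list[dict]) -> dict[str, list]:
    """Pivot row dicts into parallel arrays keyed by column name."""
    if not rows:
        return {}
    return {key: [row[key] for row in rows] for key in rows[0]}


def columns_to_rows(columns: dict[str, list]) -> list[dict]:
    """Pivot parallel arrays back into one dict per row index."""
    names = list(columns)
    return [dict(zip(names, values)) for values in zip(*columns.values())]


columns = rows_to_columns(rows)
columnar_json = json.dumps(columns)  # one object with parallel arrays
row_json = json.dumps(rows)          # one object per row
```

The columnar form repeats each key once per payload instead of once per row, which is why it tends to be smaller for wide or long tables.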
Advanced topics¶
If you need deeper async + I/O patterns (four materialization modes, DataFrameModel I/O in async def,
experimental URL transports, etc.), see FASTAPI_ADVANCED.
Trusted ingest (trusted_mode)¶
For DataFrameModel(...) and DataFrame[Schema](...), ingestion defaults to full per-cell validation (trusted_mode="off"). For trusted bulk paths (pre-validated upstream data, internal services), use:
| Mode | Meaning |
|---|---|
| trusted_mode="off" | Default: full Pydantic validation per cell (same as omitting the argument). |
| trusted_mode="shape_only" | Skip element validation; still checks column names and row counts. May emit DtypeDriftWarning when payloads would fail strict (see SUPPORTED_TYPES). |
| trusted_mode="strict" | Trusted bulk input plus dtype / nested-shape checks against the schema (including Polars columns). |
Details: DATAFRAMEMODEL.md, SUPPORTED_TYPES.md.
from pydantable import DataFrameModel

class UserDF(DataFrameModel):
    id: int
    age: int | None

# Trusted path: caller guarantees rows already match RowModel (e.g. validated earlier in the pipeline).
df = UserDF(
    [{"id": 1, "age": 20}, {"id": 2, "age": None}],
    trusted_mode="shape_only",
)
Column-shaped JSON request bodies¶
Row lists are natural for OpenAPI (list[YourRowModel]). Some clients send columnar JSON (parallel arrays). Model that with a Pydantic body whose fields are lists, then pass a dict[str, list] into DataFrameModel:
from pydantic import BaseModel
from pydantable import DataFrameModel

class UserDF(DataFrameModel):
    id: int
    age: int | None

class UsersColumnarBody(BaseModel):
    """OpenAPI-friendly columnar payload: keys match dataframe columns."""
    id: list[int]
    age: list[int | None]

# In a route: UsersColumnarBody validated by FastAPI, then:
body = UsersColumnarBody(id=[1, 2], age=[20, None])
df = UserDF({"id": body.id, "age": body.age})
Same schema rules apply as for columnar constructors in DATAFRAMEMODEL.md (equal-length columns, types per field).
Parquet and Arrow IPC uploads (multipart)¶
For upload routes and deeper async I/O patterns, see FASTAPI_ADVANCED and the I/O docs: IO_OVERVIEW, IO_HTTP, and the cookbook pages under index.
Injectable executor with Depends¶
Besides lifespan + app.state, you can inject a ThreadPoolExecutor (or None) via Depends so tests and routes share one pattern:
from concurrent.futures import ThreadPoolExecutor
from typing import Annotated

from fastapi import Depends, FastAPI
from pydantable import DataFrameModel

class UserDF(DataFrameModel):
    id: int
    age: int | None

_executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="pydantable")

def get_df_executor() -> ThreadPoolExecutor:
    return _executor

app = FastAPI()

@app.post("/users-async")
async def create_users_async(
    rows: list[UserDF.RowModel],
    executor: Annotated[ThreadPoolExecutor, Depends(get_df_executor)],
):
    df = UserDF(rows)
    return await df.acollect(executor=executor)
Pass executor=None (omit Depends) to keep asyncio.to_thread as in the Async routes example.
Use the same injected executor for await MyModel.aread_parquet(..., executor=ex) / await afetch_sql_raw(..., executor=ex) (or await afetch_sqlmodel(..., executor=ex)) when you want file and SQL I/O to share the pool with await df.acollect(executor=ex).
When to use the default thread pool vs a shared ThreadPoolExecutor: The default (executor=None) uses asyncio.to_thread, which schedules work on the interpreter’s default executor—fine for light or sporadic I/O. Prefer a dedicated bounded ThreadPoolExecutor (injected via Depends or app.state) when you need predictable concurrency limits, shared naming for observability, coordinated shutdown in lifespan, or to avoid competing with other libraries for the default pool under load. Neither choice makes Rust execution “more async”; both offload blocking work from the event loop.
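The difference between the two offloading paths can be seen with asyncio alone. In this sketch, blocking_work is a hypothetical stand-in for a blocking materialization call; the pool size and the "df-pool" prefix are arbitrary choices for illustration.

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_work(delay: float) -> str:
    """Stand-in for a blocking call; returns the name of the worker thread."""
    time.sleep(delay)
    return threading.current_thread().name


async def run_default(delay: float) -> str:
    # Default path: asyncio.to_thread uses the event loop's default executor.
    return await asyncio.to_thread(blocking_work, delay)


async def run_bounded(executor: ThreadPoolExecutor, delay: float) -> str:
    # Dedicated path: the caller controls max_workers and thread naming.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, blocking_work, delay)


async def main() -> tuple[str, list[str]]:
    with ThreadPoolExecutor(max_workers=2, thread_name_prefix="df-pool") as pool:
        default_name = await run_default(0.01)
        # Four jobs share two workers, so at most two run at any moment.
        bounded_names = await asyncio.gather(
            *(run_bounded(pool, 0.01) for _ in range(4))
        )
    return default_name, list(bounded_names)


default_name, bounded_names = asyncio.run(main())
```

Both paths keep the event loop free while the work sleeps; only the bounded pool gives a fixed concurrency cap and recognizable thread names in logs and profilers.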
Background tasks¶
Use Starlette BackgroundTasks for work that must run after the response is sent (e.g. logging metrics, cache warming). Background code cannot change the HTTP body, and exceptions raised in a task do not become 500 responses, so log or handle them inside the task. For dataframe work that must complete before the client receives data, keep await df.ato_dict() (or similar) in the handler instead.
Validation errors and HTTP status codes¶
- FastAPI / Pydantic validate route parameters and body models first. Type mismatches on list[YourRowModel] typically produce 422 Unprocessable Entity with a structured error body.
- Application logic after validation: map expected domain failures to HTTPException(status_code=..., detail=...) (e.g. 404, 409). Uncaught ValueError / TypeError from DataFrameModel(...) (unknown columns, length mismatch, TypeError when trusted_mode is wrong for Polars) become 500 unless you catch them and translate.
Recommendation: validate untrusted JSON with the default trusted_mode; reserve shape_only / strict for authenticated internal pipelines or files you control.
Async routes, executors, and lifespan¶
For async def handlers, await the async materialization helpers instead of calling collect() / to_dict() directly (unless you intentionally block the loop).
from concurrent.futures import ThreadPoolExecutor
from contextlib import asynccontextmanager

from fastapi import FastAPI
from pydantic import BaseModel
from pydantable import DataFrameModel

class UserDF(DataFrameModel):
    id: int
    age: int | None

class UserRow(BaseModel):
    id: int
    age: int | None

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Bounded pool for dataframe materialization (optional; default is asyncio’s thread pool).
    executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="pydantable")
    app.state.df_executor = executor
    yield
    executor.shutdown(wait=True)

app = FastAPI(lifespan=lifespan)

@app.post("/users-async", response_model=list[UserRow])
async def create_users_async(rows: list[UserDF.RowModel]):
    df = UserDF(rows)
    ex = app.state.df_executor
    return await df.acollect(executor=ex)
Without a custom executor, await df.acollect() is enough: pydantable uses asyncio.to_thread.
Chunked column dicts (stream / astream)¶
Full StreamingResponse examples (sync def + stream, async def + astream) are in FASTAPI_ADVANCED (four materialization modes).
DataFrame.stream() and DataFrame.astream() yield dict[str, list] batches after one engine collect (same contract as collect_batches; see EXECUTION). They do not avoid holding the full materialized result in memory before chunking; use pagination or external storage when the table is too large for one collect. If you need one blob first, calling await df.ato_dict() or await df.arows() and then building your own response shape is still valid.
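The "collect once, then chunk" contract can be pictured with plain Python. iter_column_chunks and ndjson_lines are hypothetical helpers written for this sketch; emitting one chunk per NDJSON line is an assumption made here and may not match how pydantable's own helpers frame their output.

```python
import json
from collections.abc import Iterator


def iter_column_chunks(columns: dict[str, list], batch_size: int) -> Iterator[dict[str, list]]:
    """Slice an already-materialized column dict into fixed-size batches."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    row_count = len(next(iter(columns.values()), []))
    for start in range(0, row_count, batch_size):
        yield {name: values[start:start + batch_size] for name, values in columns.items()}


def ndjson_lines(chunks: Iterator[dict[str, list]]) -> Iterator[bytes]:
    """Encode each chunk as one newline-terminated JSON document."""
    for chunk in chunks:
        yield json.dumps(chunk).encode("utf-8") + b"\n"


# The whole table exists in memory before the first chunk is produced.
table = {"id": [1, 2, 3, 4, 5], "age": [20, None, 31, 44, 18]}
lines = list(ndjson_lines(iter_column_chunks(table, batch_size=2)))
```

Chunking of this kind bounds the size of each write to the client, not the peak memory of the server, which is why very large tables still call for pagination or external storage.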
Large tables, Polars, Arrow, and trust boundaries¶
Default (trusted_mode="off") is the right choice for public or untrusted HTTP bodies: every cell is validated against your RowModel types before any Rust work runs. Use it when clients can send arbitrary JSON.
When trusted_mode is appropriate in routes
| Situation | Suggested mode | Notes |
|---|---|---|
| Internal service-to-service batch (same org, authN/Z at gateway) | shape_only or strict | You still enforce column names and row counts; strict adds dtype / nested-shape checks for Polars and columnar buffers. |
| Upstream already validated rows (e.g. warehouse export, replay from your own DB) | shape_only | Fastest path; assumes wire format matches schema. |
| Polars DataFrame or NumPy / PyArrow columns built inside your stack | strict | Checks Polars dtypes (and Python column buffers where implemented) against annotations; see SUPPORTED_TYPES.md (“Runtime column payloads”). |
Who may skip full RowModel validation
- Only code paths where mis-typed data cannot reach the constructor without a deliberate privilege break (private workers, ETL you own, or payloads already validated by Pydantic at an earlier hop).
- Do not attach trusted_mode="shape_only" / strict directly to a public upload endpoint that accepts raw user JSON unless another layer has already validated every cell.
Polars and Arrow in handlers
- Passing a Polars DataFrame requires trusted mode (shape_only or strict); see DATAFRAMEMODEL.md.
- strict rejects Polars columns whose dtypes do not match the schema (including nested list / struct / map shapes). Prefer strict when the frame comes from Arrow/Parquet/IPC and you want a safety net without per-cell Pydantic.
- For performance characteristics (validation vs ingest vs collect()), see PERFORMANCE.md and EXECUTION.md.
End-to-end examples (moved to cookbook)¶
The longer “Example 1/2/3” service patterns live in the cookbook now: fastapi_end_to_end_examples.
Error timing and API safety¶
In the current Rust-first design:
- invalid expression type combinations fail while building the expression AST
- invalid filter() condition types fail before execution
- invalid select() projections (for example, empty projections) fail from Rust logical-plan validation before execution
That keeps handlers predictable: many errors surface before collect() runs.
Practical pattern for larger apps¶
- Routes: Pydantic request/response models; collect() for row-list responses.
- Services: DataFrameModel transforms (reusable across HTTP, CLI, workers).
- Adapters: load/save column dicts or row lists from databases, queues, or object storage.
This keeps schema and transformation contracts in one typed layer.
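A rough, framework-free sketch of that layering follows. Every name in it (UserStore, InMemoryUserStore, adults_only, list_adults_handler) is hypothetical, and plain column dicts stand in for DataFrameModel transforms so the example runs without pydantable installed.

```python
from typing import Protocol


class UserStore(Protocol):
    """Adapter boundary: anything that can load a column dict."""

    def load_columns(self) -> dict[str, list]: ...


class InMemoryUserStore:
    """Hypothetical adapter; a real one might wrap a database or object storage."""

    def __init__(self, columns: dict[str, list]) -> None:
        self._columns = columns

    def load_columns(self) -> dict[str, list]:
        return self._columns


def adults_only(columns: dict[str, list], min_age: int = 18) -> dict[str, list]:
    """Service layer: a transport-agnostic transform on column dicts."""
    keep = [i for i, age in enumerate(columns["age"]) if age is not None and age >= min_age]
    return {name: [values[i] for i in keep] for name, values in columns.items()}


def list_adults_handler(store: UserStore) -> dict[str, list]:
    """Route layer: wires the adapter to the service and returns the payload."""
    return adults_only(store.load_columns())


store = InMemoryUserStore({"id": [1, 2, 3], "age": [20, None, 15]})
result = list_adults_handler(store)
```

Because the service function knows nothing about HTTP, the same transform can be called from a CLI command or a worker with a different adapter.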
OpenAPI: list[YourDF.RowModel] and nested Pydantic fields follow Pydantic v2 JSON Schema generation; there is nothing pydantable-specific beyond the generated RowModel types. Inspect GET /openapi.json in tests when you need stable schema snapshots.