Golden path: FastAPI + pydantable

This page is a realistic starting point for services: versioned JSON routes, a health check, non-blocking materialization with a shared thread pool, optional NDJSON streaming, and hooks for the same patterns on lazy file reads when you add storage.

Prerequisites

pip install "pydantable[fastapi]"
# File uploads (multipart) in routes:
pip install "python-multipart"

The [fastapi] extra installs FastAPI only. See FASTAPI for the full integration guide and error-handling table.

What you ship

| Piece | Role |
| --- | --- |
| executor_lifespan | Attaches a ThreadPoolExecutor to app.state.executor so acollect(executor=...), pydantable.io amaterialize_* / afetch_sql, and similar calls offload work from the asyncio loop without starving the default thread pool under load. |
| get_executor + Depends | Injects that pool into handlers; None if you skip lifespan (still valid for acollect). |
| register_exception_handlers | MissingRustExtensionError → 503, ColumnLengthMismatchError → 400, in-route pydantic.ValidationError → 422 (see HTTP errors and exception handlers). |
| Typed routes | list[DataFrameModel.RowModel] bodies and response_model=list[YourRow] keep OpenAPI and clients aligned. |
| Streaming | astream() + ndjson_streaming_response from pydantable.fastapi for NDJSON (one JSON object per line). See FASTAPI_ENHANCEMENTS (NDJSON semantics, production lifespan snippet, troubleshooting). |
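
The dedicated-pool idea behind executor_lifespan can be illustrated with the standard library alone. The sketch below is not pydantable's implementation; it only shows blocking work being sent to a named ThreadPoolExecutor through loop.run_in_executor. The functions blocking_materialize and handle_request are hypothetical stand-ins for a materialization call and a route handler.

```python
from __future__ import annotations

import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


def blocking_materialize(n: int) -> dict[str, list]:
    """Hypothetical stand-in for work that would otherwise block the event loop."""
    return {
        "id": list(range(n)),
        # Record which worker thread ran the job so the pool is observable.
        "thread": [threading.current_thread().name] * n,
    }


async def handle_request(executor: ThreadPoolExecutor | None, n: int) -> dict[str, list]:
    loop = asyncio.get_running_loop()
    # Passing None here would fall back to the loop's default thread pool.
    return await loop.run_in_executor(executor, blocking_materialize, n)


async def main() -> list[dict[str, list]]:
    # One shared pool for all requests; the with-block shuts it down on exit.
    with ThreadPoolExecutor(max_workers=4, thread_name_prefix="pydantable-golden") as pool:
        return await asyncio.gather(*(handle_request(pool, 2) for _ in range(3)))


results = asyncio.run(main())
```

Because the pool is separate from the loop's default executor, other code that relies on the default pool is not queued behind these jobs.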

Async I/O beyond this page

This golden path uses in-memory frames so you can run it without a Parquet file. In production you usually chain lazy readers:

  • await MyModel.aread_parquet(path) (or Async.read_parquet) → select / filter → await …acollect()
  • Prefer aread_* for non-blocking open/scan setup; use amaterialize_* only when you need a full dict[str, list] in memory first (FASTAPI, IO_OVERVIEW).

That async read + lazy plan + async materialize path is where pydantable differs from hand-rolling asyncio.to_thread around pandas or Polars alone.
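
For contrast, the hand-rolled approach mentioned above looks roughly like the sketch below: the entire eager read-and-filter step runs as one blocking function inside asyncio.to_thread. The standard-library csv module stands in for pandas or Polars here, and CSV_TEXT and read_and_filter are hypothetical names used only for illustration.

```python
import asyncio
import csv
import io

# Hypothetical in-memory input so the sketch runs without any files.
CSV_TEXT = "id,age\n1,30\n2,\n3,41\n"


def read_and_filter(text: str, min_age: int) -> list[dict]:
    """Eager, blocking read: parse every row, then filter in Python."""
    rows = []
    for rec in csv.DictReader(io.StringIO(text)):
        age = int(rec["age"]) if rec["age"] else None
        if age is not None and age >= min_age:
            rows.append({"id": int(rec["id"]), "age": age})
    return rows


async def main() -> list[dict]:
    # The whole pipeline is a single opaque call in a worker thread.
    return await asyncio.to_thread(read_and_filter, CSV_TEXT, 18)


adults = asyncio.run(main())
```

In this style the event loop stays responsive, but nothing about the read or the filter is planned lazily; the function does all of its work eagerly once the thread starts.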

Runnable example in the repo

This is the full runnable example (the same file as docs/examples/fastapi/golden_path_app.py in the repo). It includes:

  • GET /health: cheap probe for load balancers or Kubernetes.
  • POST /api/v1/users: row-list body, select then acollect(executor=...).
  • GET /api/v1/users/stream: NDJSON chunks from astream.

"""Example FastAPI service using :mod:`pydantable.fastapi` helpers.

This mirrors a small production layout: versioned routes, a health check, a shared
executor for async materialization, and NDJSON streaming for larger responses.

Run (from this directory)::

    pip install 'pydantable[fastapi]'
    uvicorn golden_path_app:app --reload

Smoke-test without uvicorn (from repo root)::

    PYTHONPATH=python python docs/examples/fastapi/golden_path_app.py

Try::

    curl -s localhost:8000/health
    curl -s localhost:8000/api/v1/users -H 'Content-Type: application/json' \\
      -d '[{"id":1,"age":30},{"id":2,"age":null}]'
    curl -s -N localhost:8000/api/v1/users/stream

For file-backed lazy reads (aread_parquet → transforms → acollect), see the
GOLDEN_PATH_FASTAPI doc and the async_lazy_pipeline cookbook in the repo docs.
"""

from __future__ import annotations

from contextlib import asynccontextmanager

from fastapi import APIRouter, Depends, FastAPI
from pydantable import DataFrameModel
from pydantable.fastapi import (
    executor_lifespan,
    get_executor,
    ndjson_streaming_response,
    register_exception_handlers,
)
from pydantic import BaseModel


class UserDF(DataFrameModel):
    id: int
    age: int | None


class UserRow(BaseModel):
    id: int
    age: int | None


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Dedicated pool avoids starving the default asyncio thread pool under load.
    async with executor_lifespan(
        app,
        max_workers=4,
        thread_name_prefix="pydantable-golden",
    ):
        yield


app = FastAPI(
    title="PydanTable golden path",
    version="1.0.0",
    lifespan=lifespan,
)
register_exception_handlers(app)

api = APIRouter(prefix="/api/v1", tags=["users"])


@api.post("/users", response_model=list[UserRow])
async def upsert_users(
    rows: list[UserDF.RowModel],
    executor=Depends(get_executor),  # noqa: B008
):
    """Accept validated rows, project columns, materialize off the event loop."""
    df = UserDF(rows)
    return await df.select("id", "age").acollect(executor=executor)


@api.get("/users/stream")
async def stream_users(executor=Depends(get_executor)):  # noqa: B008
    """Stream column chunks as NDJSON (one JSON object per line)."""
    df = UserDF(
        {"id": [1, 2, 3], "age": [10, None, 40]},
        trusted_mode="shape_only",
    )
    return ndjson_streaming_response(df.astream(batch_size=2, executor=executor))


app.include_router(api)


@app.get("/health")
def health() -> dict[str, str]:
    """Load balancer / Kubernetes probe: no pydantable work."""
    return {"status": "ok"}

if __name__ == "__main__":
    # Smoke-test import + routing without starting uvicorn (CI / doc example runner).
    from fastapi.testclient import TestClient

    client = TestClient(app)
    r = client.get("/health")
    assert r.status_code == 200
    assert r.json() == {"status": "ok"}
    print("golden_path_app: ok")

To serve the app locally, start uvicorn from the example directory:

cd docs/examples/fastapi
uvicorn golden_path_app:app --reload

Script output (running the file)

If you run the example file directly (without starting a server), it executes a small self-check:

PYTHONPATH=python python docs/examples/fastapi/golden_path_app.py

Output:

golden_path_app: ok

With the server running (uvicorn), exercise the routes:

curl -s localhost:8000/health
curl -s localhost:8000/api/v1/users \
  -H 'Content-Type: application/json' \
  -d '[{"id":1,"age":30},{"id":2,"age":null}]'
curl -s -N localhost:8000/api/v1/users/stream

Expected output (example):

{"status":"ok"}
[{"id":1,"age":30},{"id":2,"age":null}]
{"id": [1, 2], "age": [10, null]}
{"id": [3], "age": [40]}
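
The shape of the streamed lines can be reproduced with a small standard-library sketch. It assumes column chunks are plain dict[str, list] slices of at most batch_size rows, serialized one JSON object per line; column_chunks and ndjson_lines are hypothetical helpers, not the internals of astream or ndjson_streaming_response.

```python
import asyncio
import json
from collections.abc import AsyncIterator


async def column_chunks(
    columns: dict[str, list], batch_size: int
) -> AsyncIterator[dict[str, list]]:
    """Yield column-oriented slices of at most batch_size rows each."""
    n_rows = len(next(iter(columns.values()), []))
    for start in range(0, n_rows, batch_size):
        yield {name: values[start:start + batch_size] for name, values in columns.items()}
        await asyncio.sleep(0)  # give other tasks a turn between chunks


async def ndjson_lines(chunks: AsyncIterator[dict[str, list]]) -> AsyncIterator[str]:
    """Encode each chunk as one JSON object followed by a newline."""
    async for chunk in chunks:
        yield json.dumps(chunk) + "\n"


async def main() -> list[str]:
    data = {"id": [1, 2, 3], "age": [10, None, 40]}
    return [line async for line in ndjson_lines(column_chunks(data, batch_size=2))]


lines = asyncio.run(main())
```

With three rows and batch_size=2 this produces two lines, matching the two-chunk stream shown above; a client can parse each line independently as it arrives.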

Production checklist

  • Paths: If you accept filesystem paths from clients, allowlist directories and reject .. and symlinks where unsafe; see FASTAPI Parquet examples.
  • trusted_mode: Use trusted_mode="shape_only" only when upstream already guarantees the schema; keep default validation for untrusted sources.
  • Executor size: Set max_workers from env (see fastapi_settings); match CPU and expected concurrent heavy requests.
  • Cancellation: await acollect() does not cancel in-flight Rust/Polars work when the client disconnects; see EXECUTION.
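
For the Paths item, one possible guard is sketched below, assuming a single allowlisted base directory. resolve_under and UnsafePathError are hypothetical names, not part of pydantable or FastAPI. The check resolves the candidate (following symlinks and collapsing ..) and then requires the result to stay inside the base directory.

```python
import tempfile
from pathlib import Path


class UnsafePathError(ValueError):
    """Raised when a client-supplied path escapes the allowlisted directory."""


def resolve_under(base_dir: Path, client_path: str) -> Path:
    """Return the resolved path only if it stays inside base_dir."""
    base = base_dir.resolve()
    # resolve() follows symlinks and collapses "..", so escapes become visible.
    resolved = (base / client_path).resolve()
    if not resolved.is_relative_to(base):  # Python 3.9+
        raise UnsafePathError(f"path escapes {base}: {client_path!r}")
    return resolved


# Small self-check using a temporary directory as the allowlisted root.
with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp) / "data"
    base.mkdir()
    (base / "users.parquet").touch()

    ok_name = resolve_under(base, "users.parquet").name
    try:
        resolve_under(base, "../secret.parquet")
        escaped = True
    except UnsafePathError:
        escaped = False
```

An absolute client path such as /etc/passwd is rejected by the same check, because joining it to the base discards the base and the resolved result falls outside it.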