# Golden path: FastAPI + pydantable
This page is a realistic starting point for services: versioned JSON routes, a health check, non-blocking materialization with a shared thread pool, optional NDJSON streaming, and hooks for the same patterns on lazy file reads when you add storage.
## Prerequisites
```bash
pip install "pydantable[fastapi]"
# File uploads (multipart) in routes:
pip install "python-multipart"
```
The `[fastapi]` extra installs FastAPI only; install `python-multipart` separately if your routes accept file uploads. See FASTAPI for the full integration guide and error-handling table.
## What you ship
| Piece | Role |
|---|---|
| `executor_lifespan` | Attaches a `ThreadPoolExecutor` to `app.state.executor` so `acollect(executor=...)`, `pydantable.io` `amaterialize_*` / `afetch_sql`, and similar calls offload work from the asyncio loop without starving the default thread pool under load. |
| `get_executor` + `Depends` | Injects that pool into handlers; yields `None` if you skip the lifespan (still valid for `acollect`). |
| `register_exception_handlers` | Maps `MissingRustExtensionError` → 503, `ColumnLengthMismatchError` → 400, and in-route `pydantic.ValidationError` → 422 (see HTTP errors and exception handlers). |
| Typed routes | `list[DataFrameModel.RowModel]` bodies and `response_model=list[YourRow]` keep OpenAPI and clients aligned. |
| Streaming | `astream()` + `ndjson_streaming_response` from `pydantable.fastapi` for NDJSON (one JSON object per line). See FASTAPI_ENHANCEMENTS (NDJSON semantics, production lifespan snippet, troubleshooting). |
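The executor rows above boil down to a familiar asyncio pattern, sketched here with the standard library only: a lifespan-style async context manager creates one dedicated `ThreadPoolExecutor`, stores it on an app-state object, and handlers pass blocking work to it with `loop.run_in_executor`. This is an assumption about the general shape, not pydantable's implementation; `executor_lifespan_sketch`, `blocking_materialize`, and the `SimpleNamespace` standing in for `app.state` are hypothetical.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from contextlib import asynccontextmanager
from types import SimpleNamespace


@asynccontextmanager
async def executor_lifespan_sketch(state, max_workers=4, thread_name_prefix="pool"):
    """Hypothetical stand-in for a lifespan that owns a dedicated thread pool."""
    executor = ThreadPoolExecutor(
        max_workers=max_workers, thread_name_prefix=thread_name_prefix
    )
    state.executor = executor  # handlers look the pool up here
    try:
        yield executor
    finally:
        state.executor = None
        executor.shutdown(wait=True)  # let in-flight work finish on shutdown


def blocking_materialize(columns):
    """Hypothetical blocking step: turn a dict[str, list] into row dicts."""
    names = list(columns)
    return [dict(zip(names, values)) for values in zip(*columns.values())]


async def handler(state, columns):
    loop = asyncio.get_running_loop()
    # Passing None here would fall back to the loop's default pool instead.
    return await loop.run_in_executor(state.executor, blocking_materialize, columns)


async def main():
    state = SimpleNamespace(executor=None)
    async with executor_lifespan_sketch(state, max_workers=2, thread_name_prefix="demo"):
        return await handler(state, {"id": [1, 2], "age": [30, None]})


rows = asyncio.run(main())
print(rows)  # [{'id': 1, 'age': 30}, {'id': 2, 'age': None}]
```

Keeping the pool on app state (rather than creating one per request) is what lets every handler share a bounded number of worker threads.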
## Async I/O beyond this page
This golden path uses in-memory frames so you can run it without a Parquet file. In production you usually chain lazy readers:
- `await MyModel.aread_parquet(path)` (or `Async.read_parquet`) → `select` / `filter` → `await …acollect()`
- Prefer `aread_*` for non-blocking open/scan setup; use `amaterialize_*` only when you need a full `dict[str, list]` in memory first (FASTAPI, IO_OVERVIEW).
That async read + lazy plan + async materialize path is where pydantable differs
from hand-rolling asyncio.to_thread around pandas or Polars alone.
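For contrast, hand-rolling that path usually means wrapping the whole blocking read, filter, and materialize step in `asyncio.to_thread` and building the columnar `dict[str, list]` yourself. To stay runnable without pandas or Polars, this sketch reads a CSV with the stdlib `csv` module; the file layout, `read_filter_materialize`, and `min_age` are hypothetical.

```python
import asyncio
import csv
import os
import tempfile


def read_filter_materialize(path, min_age):
    """Blocking read + filter + projection into a columnar dict[str, list]."""
    out = {"id": [], "age": []}
    with open(path, newline="") as fh:
        for row in csv.DictReader(fh):
            age = int(row["age"]) if row["age"] else None
            if age is not None and age >= min_age:  # filter step
                out["id"].append(int(row["id"]))  # project to id/age columns
                out["age"].append(age)
    return out


async def hand_rolled(path, min_age):
    # One hop to the default thread pool; no lazy plan, no dedicated executor.
    return await asyncio.to_thread(read_filter_materialize, path, min_age)


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "users.csv")
    with open(path, "w", newline="") as fh:
        fh.write("id,age\n1,30\n2,\n3,15\n")
    result = asyncio.run(hand_rolled(path, min_age=18))

print(result)  # {'id': [1], 'age': [30]}
```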
## Runnable example in the repo
This is the full runnable example (the same file as docs/examples/fastapi/golden_path_app.py in the repo).
It includes:

- `GET /health`: cheap probe for load balancers or Kubernetes.
- `POST /api/v1/users`: row-list body, `select` then `acollect(executor=...)`.
- `GET /api/v1/users/stream`: NDJSON chunks from `astream`.
```python
"""Example FastAPI service using :mod:`pydantable.fastapi` helpers.

This mirrors a small production layout: versioned routes, a health check, a
shared executor for async materialization, and NDJSON streaming for larger
responses.

Run (from this directory)::

    pip install 'pydantable[fastapi]'
    uvicorn golden_path_app:app --reload

Smoke-test without uvicorn (from repo root)::

    PYTHONPATH=python python docs/examples/fastapi/golden_path_app.py

Try::

    curl -s localhost:8000/health
    curl -s localhost:8000/api/v1/users -H 'Content-Type: application/json' \\
      -d '[{"id":1,"age":30},{"id":2,"age":null}]'
    curl -s -N localhost:8000/api/v1/users/stream

For file-backed lazy reads (``aread_parquet`` → transforms → ``acollect``), see
the GOLDEN_PATH_FASTAPI doc and the ``async_lazy_pipeline`` cookbook in the repo docs.
"""

from __future__ import annotations

from contextlib import asynccontextmanager

from fastapi import APIRouter, Depends, FastAPI
from pydantable import DataFrameModel
from pydantable.fastapi import (
    executor_lifespan,
    get_executor,
    ndjson_streaming_response,
    register_exception_handlers,
)
from pydantic import BaseModel


class UserDF(DataFrameModel):
    id: int
    age: int | None


class UserRow(BaseModel):
    id: int
    age: int | None


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Dedicated pool avoids starving the default asyncio thread pool under load.
    async with executor_lifespan(
        app,
        max_workers=4,
        thread_name_prefix="pydantable-golden",
    ):
        yield


app = FastAPI(
    title="PydanTable golden path",
    version="1.0.0",
    lifespan=lifespan,
)
register_exception_handlers(app)

api = APIRouter(prefix="/api/v1", tags=["users"])


@api.post("/users", response_model=list[UserRow])
async def upsert_users(
    rows: list[UserDF.RowModel],
    executor=Depends(get_executor),  # noqa: B008
):
    """Accept validated rows, project columns, materialize off the event loop."""
    df = UserDF(rows)
    return await df.select("id", "age").acollect(executor=executor)


@api.get("/users/stream")
async def stream_users(executor=Depends(get_executor)):  # noqa: B008
    """Stream column chunks as NDJSON (one JSON object per line)."""
    df = UserDF(
        {"id": [1, 2, 3], "age": [10, None, 40]},
        trusted_mode="shape_only",
    )
    return ndjson_streaming_response(df.astream(batch_size=2, executor=executor))


app.include_router(api)


@app.get("/health")
def health() -> dict[str, str]:
    """Load balancer / Kubernetes probe: no pydantable work."""
    return {"status": "ok"}


if __name__ == "__main__":
    # Smoke-test import + routing without starting uvicorn (CI / doc example runner).
    from fastapi.testclient import TestClient

    client = TestClient(app)
    r = client.get("/health")
    assert r.status_code == 200
    assert r.json() == {"status": "ok"}
    print("golden_path_app: ok")
```
## Script output (running the file)
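If you run the example file directly (without starting a server), it executes a small self-check against `/health` and prints `golden_path_app: ok`. With the app served by uvicorn, you can exercise all three routes: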
```bash
curl -s localhost:8000/health
curl -s localhost:8000/api/v1/users \
  -H 'Content-Type: application/json' \
  -d '[{"id":1,"age":30},{"id":2,"age":null}]'
curl -s -N localhost:8000/api/v1/users/stream
```
Expected output (example):
```text
{"status":"ok"}
[{"id":1,"age":30},{"id":2,"age":null}]
{"id": [1, 2], "age": [10, null]}
{"id": [3], "age": [40]}
```
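The two streamed lines follow from `batch_size=2` over three rows: the first chunk carries the first two rows, the second carries the remaining one, and each chunk is written as one JSON object per line. The stdlib sketch below reproduces that framing; `column_chunks` and `to_ndjson` are hypothetical helpers, not `astream` or `ndjson_streaming_response` themselves, whose exact serialization may differ.

```python
import json


def column_chunks(columns, batch_size):
    """Yield dict[str, list] slices holding at most batch_size rows each."""
    n_rows = len(next(iter(columns.values()), []))
    for start in range(0, n_rows, batch_size):
        yield {name: values[start:start + batch_size] for name, values in columns.items()}


def to_ndjson(chunks):
    """Serialize each chunk as one JSON object followed by a newline."""
    for chunk in chunks:
        yield json.dumps(chunk) + "\n"


columns = {"id": [1, 2, 3], "age": [10, None, 40]}
body = "".join(to_ndjson(column_chunks(columns, batch_size=2)))
print(body, end="")
# {"id": [1, 2], "age": [10, null]}
# {"id": [3], "age": [40]}
```

Because each line is a complete JSON document, a client can parse chunks as they arrive instead of waiting for the whole response.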
## Production checklist
- Paths: If you accept filesystem paths from clients, allowlist directories and reject `..` and symlinks where unsafe; see the FASTAPI Parquet examples.
- `trusted_mode`: Use `trusted_mode="shape_only"` only when upstream already guarantees the schema; keep default validation for untrusted sources.
- Executor size: Set `max_workers` from the environment (see fastapi_settings) and size it to your CPU count and the number of concurrent heavy requests you expect.
- Cancellation: `await acollect()` does not cancel in-flight Rust/Polars work when the client disconnects; see EXECUTION.
## Related docs
- Multi-router example (routers + lifespan): `docs/examples/fastapi/service_layout/` (README in that folder)
- Roadmap and “when to use what”: FASTAPI_ENHANCEMENTS
- Full FastAPI guide: FASTAPI
- HTTP status mapping: HTTP errors and exception handlers
- Columnar JSON bodies: fastapi_columnar_bodies
- Async materialization: fastapi_async_materialization
- Lazy async file pipeline: async_lazy_pipeline
- Settings (`pydantic-settings`): fastapi_settings