
FastAPI (advanced)

This page contains less common FastAPI integration topics: deeper async/I/O patterns, experimental URL transports, and “how the pieces fit” when you’re building larger systems.

If you’re looking for the common path, start with GOLDEN_PATH_FASTAPI, then the short index and reference tables in FASTAPI.

Four materialization modes (FastAPI)

The same lazy plan can be materialized in four ways; see MATERIALIZATION for the full table and the PlanMaterialization reference.

The routes below read Parquet from a server-local path, such as a shared volume, an artifact from an upstream job, or a temp file you wrote after await upload.read(). In production, validate and sandbox these paths (allowlist directories, reject .., and so on). trusted_mode="shape_only" suits typical pipelines where the file is already known to match your schema; use the default trusted_mode when you need full cell validation.
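The path check can be done with pathlib alone. The following is a minimal sketch. The helper resolve_allowed_path, the PathNotAllowed exception, and the allowlist are hypothetical and not part of pydantable. The sketch requires Python 3.9+ for Path.is_relative_to.

```python
from pathlib import Path


class PathNotAllowed(ValueError):
    """Raised when a requested path falls outside every allowed directory (hypothetical)."""


def resolve_allowed_path(raw: str, allowed_dirs: list[Path]) -> Path:
    """Resolve `raw` and return it only if it lies inside an allowed directory.

    Path.resolve() collapses `..` segments (and follows symlinks that exist),
    so a request like /srv/data/../secrets.parquet is checked against its
    real location rather than its spelling.
    """
    candidate = Path(raw).resolve()
    for base in allowed_dirs:
        if candidate.is_relative_to(base.resolve()):
            return candidate
    raise PathNotAllowed(f"path not in an allowed directory: {raw!r}")
```

In a route, you could call resolve_allowed_path(path, ALLOWED_DIRS) before read_parquet and map PathNotAllowed to a 400 response. ALLOWED_DIRS is likewise a placeholder for your own configuration.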

Row-list JSON bodies are covered in FASTAPI and fastapi_columnar_bodies. Async file routes should use await MyModel.aread_* (a lazy scan, with the blocking open/read moved off the event loop) rather than await amaterialize_*, which builds a full dict[str, list] first. For SQL, use await afetch_sqlmodel / await afetch_sql_raw as needed (see IO_SQL).
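"Off the event loop" can be illustrated with plain asyncio. asyncio.to_thread runs a blocking call in a worker thread while other coroutines keep running. This sketch shows the general technique only, not how pydantable implements aread_*. blocking_read is a hypothetical stand-in for a blocking file read.

```python
import asyncio
import time


def blocking_read(seconds: float) -> str:
    # Hypothetical stand-in for a blocking open/read; it just sleeps.
    time.sleep(seconds)
    return "done"


async def heartbeat(stop: asyncio.Event, ticks: list[float]) -> None:
    # Appends a timestamp roughly every 10 ms, but only while the loop is free.
    while not stop.is_set():
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.01)


async def main() -> tuple[str, int]:
    stop = asyncio.Event()
    ticks: list[float] = []
    beat = asyncio.create_task(heartbeat(stop, ticks))
    # The blocking call runs in a worker thread, so heartbeat keeps ticking.
    result = await asyncio.to_thread(blocking_read, 0.2)
    stop.set()
    await beat
    return result, len(ticks)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

If blocking_read(0.2) were called directly inside main instead, the heartbeat would stall for the whole 0.2 s. That is what a synchronous read does to every other request on the same loop.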

1. Blocking — sync def + lazy read_parquet + collect() / to_dict()

Sync read_parquet keeps work on a Polars LazyFrame until collect() / to_dict() (see EXECUTION).

from fastapi import FastAPI, Query
from pydantic import BaseModel

from pydantable import DataFrameModel

app = FastAPI()


class UserDF(DataFrameModel):
    id: int
    age: int | None


class UserRow(BaseModel):
    id: int
    age: int | None


@app.get("/users-blocking", response_model=list[UserRow])
def report_from_parquet_blocking(path: str = Query(..., description="Readable Parquet path on server")):
    df = UserDF.read_parquet(path, trusted_mode="shape_only").select("id", "age")
    return df.collect()


@app.get("/users-columnar-blocking")
def columnar_from_parquet_blocking(path: str = Query(...)):
    df = UserDF.read_parquet(path, trusted_mode="shape_only")
    return df.to_dict()
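These two routes return different JSON shapes. The collect() route returns a row list validated against list[UserRow]. The columnar to_dict() route returns a column-oriented payload, assumed here to be dict[str, list]. The plain-Python sketch below shows how the two shapes relate, which can help clients that need to convert between them. Both helper names are hypothetical and unrelated to pydantable internals.

```python
from typing import Any


def rows_to_columns(rows: list[dict[str, Any]]) -> dict[str, list[Any]]:
    """Pivot a list of row dicts into a column dict (dict[str, list])."""
    if not rows:
        return {}
    columns: dict[str, list[Any]] = {name: [] for name in rows[0]}
    for row in rows:
        for name in columns:
            columns[name].append(row[name])
    return columns


def columns_to_rows(columns: dict[str, list[Any]]) -> list[dict[str, Any]]:
    """Pivot a column dict back into row dicts; all columns must share a length."""
    lengths = {len(values) for values in columns.values()}
    if len(lengths) > 1:
        raise ValueError(f"ragged columns: lengths {sorted(lengths)}")
    names = list(columns)
    return [dict(zip(names, values)) for values in zip(*columns.values())]
```

The columnar shape repeats each column name once instead of once per row, which is one reason to offer it for wide or long results.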

2. Async — async def + await collect() / await to_dict() (or acollect / ato_dict)

aread_* (or UserDF.Async.read_parquet, …) returns an AwaitableDataFrameModel. Chain lazy transforms (select, filter, …) on it and put a single leading await before the terminal collect() / to_dict(), which here are unprefixed aliases of acollect() / ato_dict().

from fastapi import FastAPI, Query
from pydantic import BaseModel

from pydantable import DataFrameModel

app = FastAPI()


class UserDF(DataFrameModel):
    id: int
    age: int | None


class UserRow(BaseModel):
    id: int
    age: int | None


@app.get("/users-async", response_model=list[UserRow])
async def report_from_parquet(path: str = Query(...)):
    return await UserDF.Async.read_parquet(path, trusted_mode="shape_only").select(
        "id", "age"
    ).collect()


@app.get("/users-columnar-async")
async def columnar_async(path: str = Query(...)):
    return await UserDF.Async.read_parquet(path, trusted_mode="shape_only").to_dict()
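One leading await suffices because, in Python's grammar, attribute access and calls bind more tightly than await. So await X.read(...).select(...).collect() awaits only the final coroutine. The toy model below has synchronous transforms and an async terminal method. AwaitableQuery is hypothetical and is not pydantable's AwaitableDataFrameModel.

```python
from __future__ import annotations

import asyncio
from typing import Any


class AwaitableQuery:
    """Toy chainable query (hypothetical): select() is sync, collect() is a coroutine."""

    def __init__(self, rows: list[dict[str, Any]], columns: tuple[str, ...] | None = None):
        self._rows = rows
        self._columns = columns

    def select(self, *columns: str) -> AwaitableQuery:
        # Records the projection only; no rows are touched yet.
        return AwaitableQuery(self._rows, columns)

    async def collect(self) -> list[dict[str, Any]]:
        # The actual work runs in a worker thread when the result is awaited.
        return await asyncio.to_thread(self._run)

    def _run(self) -> list[dict[str, Any]]:
        if self._columns is None:
            return [dict(row) for row in self._rows]
        return [{name: row[name] for name in self._columns} for row in self._rows]


async def main() -> list[dict[str, Any]]:
    query = AwaitableQuery([{"id": 1, "age": 30, "name": "a"}])
    # `await` applies to the whole expression query.select(...).collect().
    return await query.select("id", "age").collect()
```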

3. Deferred — submit() + await handle.result()

import asyncio

from fastapi import FastAPI, Query
from pydantic import BaseModel

from pydantable import DataFrameModel

app = FastAPI()


class UserDF(DataFrameModel):
    id: int
    age: int | None


class UserRow(BaseModel):
    id: int
    age: int | None


@app.get("/users-deferred", response_model=list[UserRow])
async def report_deferred(path: str = Query(...)):
    df = await UserDF.aread_parquet(path, trusted_mode="shape_only")
    handle = df.select("id", "age").submit()
    return await handle.result()


@app.get("/users-two-deferred")
async def two_cohorts_deferred(path_a: str = Query(...), path_b: str = Query(...)):
    df_a, df_b = await asyncio.gather(
        UserDF.aread_parquet(path_a, trusted_mode="shape_only"),
        UserDF.aread_parquet(path_b, trusted_mode="shape_only"),
    )
    h_a = df_a.select("id", "age").submit()
    h_b = df_b.select("id", "age").submit()
    out_a, out_b = await asyncio.gather(h_a.result(), h_b.result())
    return {"cohort_a": out_a, "cohort_b": out_b}
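two_cohorts_deferred submits both plans before awaiting either result, presumably so the two computations can overlap. The toy model below illustrates that idea with asyncio tasks and asyncio.to_thread. Handle, submit, and slow_collect are hypothetical, and pydantable's own submit() may schedule work differently.

```python
import asyncio
import time
from typing import Any, Callable


class Handle:
    """Toy deferred-result handle (hypothetical, not pydantable's API)."""

    def __init__(self, task: "asyncio.Task[Any]"):
        self._task = task

    async def result(self) -> Any:
        return await self._task


def submit(fn: Callable[[], Any]) -> Handle:
    # Starts fn in a worker thread right away; must run inside a live event loop.
    return Handle(asyncio.create_task(asyncio.to_thread(fn)))


def slow_collect(label: str) -> str:
    time.sleep(0.2)  # stand-in for a blocking collect
    return label


async def main() -> tuple[list[str], float]:
    start = time.perf_counter()
    h_a = submit(lambda: slow_collect("a"))
    h_b = submit(lambda: slow_collect("b"))
    out = await asyncio.gather(h_a.result(), h_b.result())
    return list(out), time.perf_counter() - start
```

Because both tasks start before the first await, the total wait is close to the longer job (about 0.2 s here) rather than the sum of both.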

4. Chunked — stream() / astream() + streaming body

import json

from fastapi import FastAPI, Query
from fastapi.responses import StreamingResponse

from pydantable import DataFrameModel

app = FastAPI()


class UserDF(DataFrameModel):
    id: int
    age: int | None


def ndjson_sync(df):
    for batch in df.stream(batch_size=1_000):
        yield (json.dumps(batch, default=str) + "\n").encode()


@app.get("/users-stream-sync")
def users_stream_sync(path: str = Query(...)):
    df = UserDF.read_parquet(path, trusted_mode="shape_only").select("id", "age")
    return StreamingResponse(ndjson_sync(df), media_type="application/x-ndjson")


async def ndjson_async(df):
    async for batch in df.astream(batch_size=1_000):
        yield (json.dumps(batch, default=str) + "\n").encode()


@app.get("/users-stream-async")
async def users_stream_async(path: str = Query(...)):
    df = await UserDF.aread_parquet(path, trusted_mode="shape_only")
    df = df.select("id", "age")
    return StreamingResponse(ndjson_async(df), media_type="application/x-ndjson")

These are chunked replay responses, not out-of-core Polars streaming: the result reaches the client in batches, but very large tables may still need pagination or a write to object storage instead (see EXECUTION).
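ndjson_sync and ndjson_async write one JSON line per batch, so each line holds a whole batch. Clients may instead expect the usual NDJSON convention of one record per line. In that case, each batch can be pivoted before encoding. The sketch below assumes every batch is a dict[str, list] with equal-length columns, and ndjson_rows is a hypothetical helper.

```python
import json
from typing import Any, Iterable, Iterator


def ndjson_rows(batches: Iterable[dict[str, list[Any]]]) -> Iterator[bytes]:
    """Yield one NDJSON line (bytes) per row from column-dict batches."""
    for batch in batches:
        names = list(batch)
        for values in zip(*batch.values()):
            record = dict(zip(names, values))
            # default=str mirrors the routes above for non-JSON types (dates, decimals).
            yield (json.dumps(record, default=str) + "\n").encode()
```

Inside ndjson_sync, the single yield per batch could become yield from ndjson_rows([batch]). The streaming behaviour stays the same.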

DataFrameModel I/O in async def routes

In async def routes, prefer the awaitable variants:

- await MyModel.aread_* for reads.
- await afetch_sqlmodel / await afetch_sql_raw for SQL queries (from pydantable import …).
- await MyModel.aexport_* for exports.
- await MyModel.awrite_sql / await MyModel.awrite_sqlmodel for SQL writes.

Install what you need:

pip install "pydantable[io]"
pip install "pydantable[sql]"     # plus a DBAPI driver
pip install "pydantable[cloud]"   # fsspec backends (experimental)
pip install "pydantable[rap]"     # rapcsv + rapfiles (optional)
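To fail fast at startup when an optional extra is missing, a small stdlib check works. This sketch assumes the import names match the package names mentioned on this page (for example fsspec for [cloud], rapcsv and rapfiles for [rap]). Both helper names are hypothetical.

```python
import importlib.util


def missing_modules(*names: str) -> list[str]:
    """Return the top-level module names that cannot be found in this environment."""
    return [name for name in names if importlib.util.find_spec(name) is None]


def require_extra(extra: str, *modules: str) -> None:
    """Raise with an install hint if any module behind `extra` is missing."""
    missing = missing_modules(*modules)
    if missing:
        raise RuntimeError(
            f"missing {', '.join(missing)}; install with: pip install \"pydantable[{extra}]\""
        )
```

For example, require_extra("rap", "rapcsv", "rapfiles") could run in your app's startup hook before any route uses aread_csv_rap.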

Experimental HTTP(S) and object-store URLs

HTTP(S) helpers are experimental: they download with the stdlib urllib, then parse the payload. Enable them by setting PYDANTABLE_IO_EXPERIMENTAL=1 or by passing experimental=True to the URL helpers. Do not fetch untrusted URLs unless your gateway enforces size limits and timeouts.
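The gateway is the primary place for these limits. A cheap application-level guard can add defense in depth, for example when your own code downloads a file to a temp path before reading it. The following stdlib sketch caps the body size and sets a socket timeout. fetch_limited, DownloadTooLarge, and the default limits are hypothetical; they are not options of pydantable's URL helpers.

```python
import urllib.parse
import urllib.request


class DownloadTooLarge(Exception):
    """Raised when a response body exceeds the configured byte limit."""


def fetch_limited(
    url: str,
    *,
    max_bytes: int = 50 * 1024 * 1024,
    timeout: float = 10.0,
    allowed_schemes: tuple[str, ...] = ("https",),
    chunk_size: int = 64 * 1024,
) -> bytes:
    """Download `url` with a socket timeout and a hard cap on body size."""
    scheme = urllib.parse.urlsplit(url).scheme.lower()
    if scheme not in allowed_schemes:
        raise ValueError(f"scheme {scheme!r} not allowed")
    # `timeout` bounds each blocking socket operation, not total wall time.
    # Redirects are followed by urllib and are not re-checked here.
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        chunks: list[bytes] = []
        total = 0
        while True:
            chunk = resp.read(chunk_size)
            if not chunk:
                break
            total += len(chunk)
            if total > max_bytes:
                raise DownloadTooLarge(f"body exceeds {max_bytes} bytes")
            chunks.append(chunk)
    return b"".join(chunks)
```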

Object-store URIs (s3://, gs://, file://, …) use fsspec (pydantable[cloud]) and are also experimental.
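A single path parameter may carry a local path, an HTTP(S) URL, or an object-store URI. A route may therefore want to branch on, or reject, the scheme before handing the value to a reader. This stdlib sketch uses only the schemes named on this page. The scheme set is an assumption about your deployment, not a list of what pydantable or fsspec supports, and classify_source is a hypothetical helper.

```python
from urllib.parse import urlsplit

# Assumption: the object-store schemes this deployment chooses to accept.
OBJECT_STORE_SCHEMES = frozenset({"s3", "gs", "file"})


def classify_source(source: str) -> str:
    """Return "local", "http", or "object_store" for a user-supplied source."""
    scheme = urlsplit(source).scheme.lower()
    # No scheme, or a one-letter "scheme" such as a Windows drive (C:\data\...).
    if len(scheme) <= 1:
        return "local"
    if scheme in ("http", "https"):
        return "http"
    if scheme in OBJECT_STORE_SCHEMES:
        return "object_store"
    raise ValueError(f"unsupported scheme: {scheme!r}")
```

A route could, for instance, accept only "local" in production and allow the experimental kinds behind a feature flag.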

Optional: true-async CSV with aread_csv_rap

amaterialize_csv uses asyncio.to_thread around sync/Rust paths. aread_csv_rap (install pydantable[rap]) uses rapcsv + rapfiles for async file reads without that thread offload.
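The following sketch illustrates the thread-offload approach described for amaterialize_csv, using only the stdlib csv module and asyncio.to_thread. read_csv_columns and aread_csv_columns are hypothetical names. Unlike pydantable's readers, this sketch performs no type parsing or validation: every value stays a string.

```python
import asyncio
import csv
import tempfile
from pathlib import Path


def read_csv_columns(path: str) -> dict[str, list[str]]:
    """Blocking read of a CSV file into a column dict (values stay strings)."""
    with open(path, newline="", encoding="utf-8") as fh:
        reader = csv.DictReader(fh)
        columns: dict[str, list[str]] = {name: [] for name in reader.fieldnames or []}
        for row in reader:
            for name in columns:
                columns[name].append(row[name])
    return columns


async def aread_csv_columns(path: str) -> dict[str, list[str]]:
    # Same blocking function, run in the default thread pool so the loop stays free.
    return await asyncio.to_thread(read_csv_columns, path)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        csv_path = Path(tmp) / "users.csv"
        csv_path.write_text("id,age\n1,30\n2,\n", encoding="utf-8")
        print(asyncio.run(aread_csv_columns(str(csv_path))))
```

The trade-off is one worker thread per in-flight read, which is the overhead an async file library such as the one behind aread_csv_rap aims to avoid.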