Skip to content

pydantable.fastapi

Optional FastAPI integration (install pydantable[fastapi]). See the Golden path.

pydantable.fastapi

Optional FastAPI helpers (install pydantable[fastapi]).

import pydantable does not require FastAPI; import this submodule when you use FastAPI in your service.

ColumnLengthMismatchError

Bases: PydantableUserError

Raised when per-column list lengths disagree for a rectangular table mapping.

Typical cause: constructing a dict[str, list] or equivalent where one column has a different row count than the others.

Source code in python/pydantable/errors.py
class ColumnLengthMismatchError(PydantableUserError):
    """Raised when per-column list lengths disagree for a rectangular table mapping.

    Typical cause: constructing a ``dict[str, list]`` or equivalent where one column
    has a different row count than the others.
    """

PydantableUserError

Bases: ValueError

Base class for predictable validation and contract failures.

Subclasses :exc:ValueError so except ValueError still matches; use this base or a concrete subclass when you need to distinguish pydantable errors from other :exc:ValueError sources.

Source code in python/pydantable/errors.py
class PydantableUserError(ValueError):
    """Base class for predictable validation and contract failures.

    Subclasses :exc:`ValueError` so ``except ValueError`` still matches; use this
    base or a concrete subclass when you need to distinguish pydantable errors from
    other :exc:`ValueError` sources.
    """

IngestValidationErrorDetail

Bases: BaseModel

Top-level error payload for a batch ingest operation.

Source code in python/pydantable/ingest_errors.py
class IngestValidationErrorDetail(BaseModel):
    """Top-level error payload for a batch ingest operation."""

    model_config = ConfigDict(extra="forbid")

    title: str = "Ingest validation failed"
    failures: list[IngestRowFailure]

ingest_error_response

ingest_error_response(failures, *, status_code=422, title='Ingest validation failed')

Return a structured JSON response describing batch ingest validation failures.

Intended for use with ignore_errors=True and on_validation_errors=... where you accept partial success but need a stable error payload for clients.

Source code in python/pydantable/fastapi/__init__.py
def ingest_error_response(
    failures: object,
    *,
    status_code: int = 422,
    title: str = "Ingest validation failed",
) -> Any:
    """
    Return a structured JSON response describing batch ingest validation failures.

    Intended for use with `ignore_errors=True` and `on_validation_errors=...` where you
    accept partial success but need a stable error payload for clients.
    """
    from fastapi.responses import JSONResponse

    coerced = coerce_validation_failures(failures)
    detail = IngestValidationErrorDetail(title=title, failures=coerced)
    return JSONResponse(status_code=status_code, content=detail.model_dump(mode="json"))

executor_lifespan async

executor_lifespan(app, *, max_workers=None, thread_name_prefix='pydantable')

Attach a thread pool executor to app.state.executor.

Use with FastAPI's lifespan so acollect(executor=...) and pydantable.io helpers can share a dedicated pool instead of the default asyncio thread pool.

Example::

@asynccontextmanager
async def lifespan(app: FastAPI):
    async with executor_lifespan(app, max_workers=4):
        yield

app = FastAPI(lifespan=lifespan)
Source code in python/pydantable/fastapi/__init__.py
@asynccontextmanager
async def executor_lifespan(
    app: FastAPI,
    *,
    max_workers: int | None = None,
    thread_name_prefix: str = "pydantable",
) -> AsyncIterator[None]:
    """Attach a thread pool executor to ``app.state.executor``.

    Use with FastAPI's ``lifespan`` so ``acollect(executor=...)`` and
    ``pydantable.io`` helpers can share a dedicated pool instead of the default
    ``asyncio`` thread pool.

    Example::

        @asynccontextmanager
        async def lifespan(app: FastAPI):
            async with executor_lifespan(app, max_workers=4):
                yield

        app = FastAPI(lifespan=lifespan)
    """
    executor = ThreadPoolExecutor(
        max_workers=max_workers,
        thread_name_prefix=thread_name_prefix,
    )
    app.state.executor = executor
    try:
        yield
    finally:
        executor.shutdown(wait=True)

ndjson_chunk_bytes async

ndjson_chunk_bytes(chunks)

Yield dict[str, list] chunks as UTF-8 NDJSON lines (JSON + newline).

Use with the async iterator from :meth:pydantable.dataframe.DataFrame.astream when building a custom :class:starlette.responses.StreamingResponse.

Source code in python/pydantable/fastapi/__init__.py
async def ndjson_chunk_bytes(
    chunks: AsyncIterator[dict[str, Any]],
) -> AsyncIterator[bytes]:
    """Yield ``dict[str, list]`` chunks as UTF-8 NDJSON lines (JSON + newline).

    Use with the async iterator from :meth:`pydantable.dataframe.DataFrame.astream`
    when building a custom :class:`starlette.responses.StreamingResponse`.
    """
    async for chunk in chunks:
        yield json.dumps(chunk).encode("utf-8") + b"\n"

ndjson_streaming_response

ndjson_streaming_response(chunks, *, media_type='application/x-ndjson')

Wrap an astream() iterator as a NDJSON :class:StreamingResponse.

Each chunk is JSON-encoded on its own line (common for streaming columnar dicts to HTTP clients).

Source code in python/pydantable/fastapi/__init__.py
def ndjson_streaming_response(
    chunks: AsyncIterator[dict[str, Any]],
    *,
    media_type: str = "application/x-ndjson",
) -> StreamingResponse:
    """Wrap an ``astream()`` iterator as a NDJSON :class:`StreamingResponse`.

    Each chunk is JSON-encoded on its own line (common for streaming columnar
    dicts to HTTP clients).
    """
    return StreamingResponse(
        ndjson_chunk_bytes(chunks),
        media_type=media_type,
    )

get_executor

get_executor(request)

Return app.state.executor when set by :func:executor_lifespan.

Use with Depends(get_executor) and pass the result to acollect(executor=...).

Source code in python/pydantable/fastapi/__init__.py
def get_executor(request: Request) -> Executor | None:
    """Return ``app.state.executor`` when set by :func:`executor_lifespan`.

    Use with ``Depends(get_executor)`` and pass the result to
    ``acollect(executor=...)``.
    """
    return getattr(request.app.state, "executor", None)

register_exception_handlers

register_exception_handlers(app)

Register HTTP-friendly handlers for common pydantable / Pydantic errors.

  • :exc:~pydantable.MissingRustExtensionError503 (native extension missing).
  • :exc:~pydantable.errors.ColumnLengthMismatchError400 with detail string.
  • :exc:pydantic.ValidationError422 with detail as Pydantic's error list.

Inbound request body validation is usually handled by FastAPI as :exc:fastapi.exceptions.RequestValidationError (422) before your route runs. This handler covers :exc:~pydantic.ValidationError raised inside handlers (for example manual model_validate). Other :exc:ValueError subclasses from the engine are not handled here — map those in your routes if needed.

Idempotent for duplicate registration: re-calling on the same app replaces handlers for the same exception types (Starlette/FastAPI behavior).

Source code in python/pydantable/fastapi/__init__.py
def register_exception_handlers(app: FastAPI) -> None:
    """Register HTTP-friendly handlers for common pydantable / Pydantic errors.

    - :exc:`~pydantable.MissingRustExtensionError` → **503** (native extension missing).
    - :exc:`~pydantable.errors.ColumnLengthMismatchError` → **400** with ``detail``
      string.
    - :exc:`pydantic.ValidationError` → **422** with ``detail`` as Pydantic's error
      list.

    Inbound request body validation is usually handled by FastAPI as
    :exc:`fastapi.exceptions.RequestValidationError` (**422**) before your route runs.
    This handler covers :exc:`~pydantic.ValidationError` raised inside handlers (for
    example manual ``model_validate``). Other :exc:`ValueError` subclasses from the
    engine are **not** handled here — map those in your routes if needed.

    Idempotent for duplicate registration: re-calling on the same app replaces handlers
    for the same exception types (Starlette/FastAPI behavior).
    """
    from fastapi.responses import JSONResponse

    @app.exception_handler(ColumnLengthMismatchError)
    async def _column_length_mismatch(
        request: Request, exc: ColumnLengthMismatchError
    ) -> JSONResponse:
        return JSONResponse(
            status_code=400,
            content={"detail": str(exc)},
        )

    @app.exception_handler(MissingRustExtensionError)
    async def _missing_rust(
        request: Request, exc: MissingRustExtensionError
    ) -> JSONResponse:
        return JSONResponse(
            status_code=503,
            content={"detail": str(exc)},
        )

    @app.exception_handler(ValidationError)
    async def _pydantic_validation(
        request: Request,
        exc: ValidationError,
    ) -> JSONResponse:
        return JSONResponse(
            status_code=422,
            content={"detail": exc.errors()},
        )