Skip to content

aiter_beanie

pydantable.aiter_beanie async

aiter_beanie(document_or_query, *, criteria=None, batch_size=1000, projection_model=None, fields=None, fetch_links=False, nesting_depth=None, nesting_depths_per_field=None, flatten=True, id_column='id')

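Asynchronously yield Beanie query results in batches of at most `batch_size` rows, each batch shaped as a rectangular column dict (column name → list of values).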

Source code in python/pydantable/io/beanie.py
async def aiter_beanie(
    document_or_query: Any,
    *,
    criteria: Any | None = None,
    batch_size: int = 1000,
    projection_model: type[Any] | None = None,
    fields: Sequence[str] | None = None,
    fetch_links: bool = False,
    nesting_depth: int | None = None,
    nesting_depths_per_field: Mapping[str, int] | None = None,
    flatten: bool = True,
    id_column: Literal["id", "_id"] = "id",
) -> AsyncIterator[dict[str, list[Any]]]:
    """Stream Beanie results as batches of column-oriented dicts.

    Because this is an async generator, nothing below runs (including
    argument validation) until the first item is requested.

    Args:
        document_or_query: Either a class exposing a ``find`` attribute
            (treated as a Beanie Document class, from which a query is
            built) or an object that is iterated directly with
            ``async for``.
        criteria: Filter passed to ``find``. Only accepted when
            ``document_or_query`` is a Document class.
        batch_size: Number of rows collected before a batch is yielded;
            the final batch may be shorter. Must be greater than zero.
        projection_model: Model handed to the query's ``project`` method.
            Mutually exclusive with ``fields``.
        fields: Field names used to derive a projection model via
            ``_projection_model_for_fields``; also forwarded as
            ``fields=`` to ``_rows_to_column_dict`` for every batch.
            Mutually exclusive with ``projection_model``.
        fetch_links: Forwarded to ``find`` / ``find_all``. Only used when
            a Document class is given.
        nesting_depth: Applied through the query's ``nesting_depth``
            callable, when a Document class is given and the query
            exposes such a callable; otherwise skipped.
        nesting_depths_per_field: Same handling as ``nesting_depth``,
            through ``nesting_depths_per_field``; passed as a plain dict.
        flatten: When true, each row goes through ``_flatten_dict``.
        id_column: Forwarded to ``_normalize_id_keys``.

    Yields:
        The result of ``_rows_to_column_dict`` for each batch of rows.

    Raises:
        ValueError: If ``batch_size`` is not positive.
        TypeError: If ``criteria`` is combined with a query object, if
            both ``projection_model`` and ``fields`` are given, or if a
            projection is requested but the query has no callable
            ``project``.
    """
    _require_beanie()
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")

    document_type: type[Any] | None = None
    source = document_or_query
    given_document_class = isinstance(document_or_query, type) and hasattr(
        document_or_query, "find"
    )

    if not given_document_class:
        # A ready-made query is iterated as-is; filtering it here is refused.
        if criteria is not None:
            raise TypeError(
                "criteria= is only supported when document_or_query is a Beanie "
                "Document class. If you pass a query object, apply criteria to the "
                "query before calling aiter_beanie()."
            )
    else:
        document_type = document_or_query
        if criteria is not None:
            source = document_type.find(criteria, fetch_links=fetch_links)
        else:
            # Prefer find_all() when the class has it; else use an empty filter.
            find_all = getattr(document_type, "find_all", None)
            source = (
                find_all(fetch_links=fetch_links)
                if callable(find_all)
                else document_type.find({}, fetch_links=fetch_links)
            )

        # Nesting options are best-effort: skipped unless the query exposes a
        # callable of the same name.
        # NOTE(review): confirm the targeted Beanie version offers these as
        # chainable methods; if not, both options are silently ignored.
        if nesting_depth is not None:
            apply_depth = getattr(source, "nesting_depth", None)
            if callable(apply_depth):
                source = apply_depth(nesting_depth)
        if nesting_depths_per_field is not None:
            apply_field_depths = getattr(source, "nesting_depths_per_field", None)
            if callable(apply_field_depths):
                source = apply_field_depths(dict(nesting_depths_per_field))

    if fields is not None and projection_model is not None:
        raise TypeError("Pass only one of projection_model= or fields=, not both.")
    if fields is not None:
        # Without a Document class, an empty placeholder class is the base.
        projection_base = document_type or type("Doc", (), {})
        projection_model = _projection_model_for_fields(projection_base, fields)
    if projection_model is not None:
        project = getattr(source, "project", None)
        if not callable(project):
            raise TypeError("Query object does not support Beanie-style .project(...).")
        source = project(projection_model)

    def _to_columns(rows: list[dict[str, Any]]) -> dict[str, list[Any]]:
        # A fresh list per batch, so the caller's sequence is never shared.
        column_names = None if fields is None else list(fields)
        return _rows_to_column_dict(rows, fields=column_names)

    pending: list[dict[str, Any]] = []
    async for item in source:
        record = _normalize_id_keys(
            _pydantic_model_to_dict(item), id_column=id_column
        )
        if flatten:
            record = _flatten_dict(record)
        pending.append(record)
        if len(pending) < batch_size:
            continue
        yield _to_columns(pending)
        pending = []

    # Emit whatever is left over once the query is exhausted.
    if pending:
        yield _to_columns(pending)