Skip to content

afetch_beanie

pydantable.afetch_beanie async

afetch_beanie(document_or_query, *, criteria=None, projection_model=None, fields=None, fetch_links=False, nesting_depth=None, nesting_depths_per_field=None, flatten=True, id_column='id')

Fetch Beanie documents (or a Beanie query) into dict[str, list].

  • document_or_query: a Beanie Document class (preferred) or a query object returned from Document.find(...).
  • fields: convenience projection (builds a temporary projection model).
  • projection_model: passed to Beanie .project(...).
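  • fetch_links / nesting_depth*: relation (link) options, used only when document_or_query is a Document class. fetch_links is passed to the Beanie find call; nesting_depth and nesting_depths_per_field are applied to the resulting query when it supports them.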
  • flatten: if True, nested objects are flattened into dot-path keys.
  • id_column: name under which the MongoDB document id is returned, either id (default) or _id.
Source code in python/pydantable/io/beanie.py
async def afetch_beanie(
    document_or_query: Any,
    *,
    criteria: Any | None = None,
    projection_model: type[Any] | None = None,
    fields: Sequence[str] | None = None,
    fetch_links: bool = False,
    nesting_depth: int | None = None,
    nesting_depths_per_field: Mapping[str, int] | None = None,
    flatten: bool = True,
    id_column: Literal["id", "_id"] = "id",
) -> dict[str, list[Any]]:
    """Fetch Beanie documents (or a Beanie query) into ``dict[str, list]``.

    - **document_or_query**: a Beanie ``Document`` class (preferred) or a query object
      returned from ``Document.find(...)``. Anything that is a class exposing a
      ``find`` attribute is treated as a document class; everything else is used
      as-is as the query.
    - **criteria**: filter passed to ``find(...)``. Only allowed together with a
      document class; with a query object it raises ``TypeError``.
    - **fields**: convenience projection (builds a temporary projection model).
      A single field may be given as a bare string. Also fixes the output column
      order.
    - **projection_model**: passed to Beanie ``.project(...)``. Mutually exclusive
      with ``fields``.
    - **fetch_links / nesting_depth***: relation options, used only when a document
      class is passed. ``fetch_links`` is passed to the find call; the nesting
      options are applied to the query only if it exposes a callable of that name.
    - **flatten**: if True, nested objects are flattened into dot-path keys.
    - **id_column**: normalize Mongo id to ``id`` (default) or ``_id``.

    Raises ``TypeError`` when ``criteria`` is combined with a query object, when
    both ``projection_model`` and ``fields`` are given, or when a projection is
    requested but the query has no callable ``project``.
    """
    _require_beanie()

    query: Any
    doc_cls: type[Any] | None = None
    if isinstance(document_or_query, type) and hasattr(document_or_query, "find"):
        doc_cls = document_or_query
        if criteria is None:
            # Prefer find_all when present; else find({}).
            find_all = getattr(doc_cls, "find_all", None)
            if callable(find_all):
                query = find_all(fetch_links=fetch_links)
            else:
                query = doc_cls.find({}, fetch_links=fetch_links)
        else:
            query = doc_cls.find(criteria, fetch_links=fetch_links)

        # Beanie supports nesting depth controls as kwargs on `find(...)` in current docs.
        # Keep method-based calls best-effort for older/newer versions.
        # NOTE(review): when the query exposes no callable with these names the
        # option is skipped without any signal to the caller -- confirm against
        # the supported Beanie versions that this is the intended behavior.
        if nesting_depth is not None:
            apply_depth = getattr(query, "nesting_depth", None)
            if callable(apply_depth):
                query = apply_depth(nesting_depth)
        if nesting_depths_per_field is not None:
            apply_field_depths = getattr(query, "nesting_depths_per_field", None)
            if callable(apply_field_depths):
                # Copy into a plain dict so the query never holds the caller's mapping.
                query = apply_field_depths(dict(nesting_depths_per_field))
    else:
        if criteria is not None:
            raise TypeError(
                "criteria= is only supported when document_or_query is a Beanie "
                "Document class. If you pass a query object, apply criteria to the "
                "query before calling afetch_beanie()."
            )
        query = document_or_query

    if projection_model is not None and fields is not None:
        raise TypeError("Pass only one of projection_model= or fields=, not both.")

    # Materialize `fields` exactly once: it is needed both for the projection and
    # for the final column order, so a one-shot iterable must not be consumed
    # twice. A bare string is a single field name, not a sequence of characters.
    field_names: list[str] | None = None
    if fields is not None:
        field_names = [fields] if isinstance(fields, str) else list(fields)
        # Without a document class (query object passed in) fall back to an
        # empty placeholder class as the projection base.
        base_cls = doc_cls if doc_cls is not None else type("Doc", (), {})
        projection_model = _projection_model_for_fields(base_cls, field_names)

    if projection_model is not None:
        project = getattr(query, "project", None)
        if not callable(project):
            raise TypeError("Query object does not support Beanie-style .project(...).")
        query = project(projection_model)

    items = await _query_to_list(query)

    norm_rows: list[dict[str, Any]] = []
    for obj in items:
        row = _pydantic_model_to_dict(obj)
        # Id keys are normalized before flattening.
        row = _normalize_id_keys(row, id_column=id_column)
        if flatten:
            row = _flatten_dict(row)
        norm_rows.append(row)

    # Column order: preserve `fields` when supplied; else sorted union.
    return _rows_to_column_dict(norm_rows, fields=field_names)