def _finder_declares_kwarg(func: Any, name: str) -> bool:
    """Return True when *func* declares *name* as an explicit keyword parameter.

    A bare ``**kwargs`` catch-all deliberately does not count: a catch-all may
    forward unknown names somewhere they are not understood, so only an
    explicitly named parameter is treated as support for the option.
    """
    import inspect  # function-scope import keeps this block self-contained

    try:
        params = inspect.signature(func).parameters
    except (TypeError, ValueError):
        # Signature is not introspectable; treat the option as unsupported so
        # the caller falls back to its method-style best effort.
        return False
    param = params.get(name)
    return param is not None and param.kind in (
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
        inspect.Parameter.KEYWORD_ONLY,
    )


async def afetch_beanie(
    document_or_query: Any,
    *,
    criteria: Any | None = None,
    projection_model: type[Any] | None = None,
    fields: Sequence[str] | None = None,
    fetch_links: bool = False,
    nesting_depth: int | None = None,
    nesting_depths_per_field: Mapping[str, int] | None = None,
    flatten: bool = True,
    id_column: Literal["id", "_id"] = "id",
) -> dict[str, list[Any]]:
    """Fetch Beanie documents (or a Beanie query) into ``dict[str, list]``.

    - **document_or_query**: a Beanie ``Document`` class (preferred) or a query
      object returned from ``Document.find(...)``. Any class exposing a ``find``
      attribute is treated as a document class; everything else is used as a
      ready-made query.
    - **criteria**: filter passed to ``find(...)``. Only valid together with a
      document class. When omitted, ``find_all(...)`` is used if the class has
      it, otherwise ``find({})``.
    - **fields**: convenience projection (builds a temporary projection model).
    - **projection_model**: passed to Beanie ``.project(...)``. Mutually
      exclusive with **fields**.
    - **fetch_links**: forwarded as a keyword to the find call.
    - **nesting_depth / nesting_depths_per_field**: forwarded as keywords to the
      find call when it declares them by name; otherwise applied best-effort
      through same-named query methods, and skipped if those do not exist.
    - **flatten**: if True, each row is passed through the flattening helper
      (nested objects become dot-path keys).
    - **id_column**: normalize Mongo id to ``id`` (default) or ``_id``.

    When a query object is passed, ``fetch_links`` and both nesting options are
    not applied here; configure them on the query before calling.

    Raises ``TypeError`` if ``criteria`` is combined with a query object, if both
    ``projection_model`` and ``fields`` are given, or if a projection is
    requested on an object without a callable ``.project``.
    """
    _require_beanie()

    query: Any
    doc_cls: type[Any] | None = None

    if isinstance(document_or_query, type) and hasattr(document_or_query, "find"):
        doc_cls = document_or_query

        # Choose the finder and its positional arguments.
        finder: Any
        positional: tuple[Any, ...]
        if criteria is None:
            # Prefer find_all when present; else find({}).
            find_all = getattr(doc_cls, "find_all", None)
            if callable(find_all):
                finder, positional = find_all, ()
            else:
                finder, positional = doc_cls.find, ({},)
        else:
            finder, positional = doc_cls.find, (criteria,)

        # Nesting-depth controls are keyword arguments of the find call in
        # current Beanie docs, so pass them that way whenever the finder
        # declares them. Options the finder does not declare are deferred to
        # the method-style fallback below for older/newer versions.
        requested: dict[str, Any] = {
            "nesting_depth": nesting_depth,
            "nesting_depths_per_field": (
                None
                if nesting_depths_per_field is None
                else dict(nesting_depths_per_field)
            ),
        }
        find_kwargs: dict[str, Any] = {"fetch_links": fetch_links}
        deferred: dict[str, Any] = {}
        for option, value in requested.items():
            if value is None:
                continue
            if _finder_declares_kwarg(finder, option):
                find_kwargs[option] = value
            else:
                deferred[option] = value

        query = finder(*positional, **find_kwargs)

        # Best-effort fallback: use a same-named query method if one exists;
        # a missing or non-callable attribute leaves the query untouched.
        for option, value in deferred.items():
            setter = getattr(query, option, None)
            if callable(setter):
                query = setter(value)
    else:
        if criteria is not None:
            raise TypeError(
                "criteria= is only supported when document_or_query is a Beanie "
                "Document class. If you pass a query object, apply criteria to the "
                "query before calling afetch_beanie()."
            )
        query = document_or_query

    if projection_model is not None and fields is not None:
        raise TypeError("Pass only one of projection_model= or fields=, not both.")
    if fields is not None:
        # Without a document class there is no base model to derive from, so an
        # empty placeholder class is handed to the projection builder.
        projection_model = _projection_model_for_fields(
            doc_cls or type("Doc", (), {}), fields
        )
    if projection_model is not None:
        project = getattr(query, "project", None)
        if not callable(project):
            raise TypeError("Query object does not support Beanie-style .project(...).")
        query = project(projection_model)

    items = await _query_to_list(query)

    norm_rows: list[dict[str, Any]] = []
    for obj in items:
        row = _normalize_id_keys(_pydantic_model_to_dict(obj), id_column=id_column)
        norm_rows.append(_flatten_dict(row) if flatten else row)

    # Column order: preserve `fields` when supplied; else sorted union
    # (the ordering itself is implemented by _rows_to_column_dict).
    fixed_fields = list(fields) if fields is not None else None
    return _rows_to_column_dict(norm_rows, fields=fixed_fields)