async def aiter_beanie(
    document_or_query: Any,
    *,
    criteria: Any | None = None,
    batch_size: int = 1000,
    projection_model: type[Any] | None = None,
    fields: Sequence[str] | None = None,
    fetch_links: bool = False,
    nesting_depth: int | None = None,
    nesting_depths_per_field: Mapping[str, int] | None = None,
    flatten: bool = True,
    id_column: Literal["id", "_id"] = "id",
) -> AsyncIterator[dict[str, list[Any]]]:
    """Yield rectangular column-dict batches from Beanie results.

    Args:
        document_or_query: Either a Beanie Document class (detected as a class
            with a ``find`` attribute) from which a query is built, or an
            already-built query object that is iterated with ``async for``.
        criteria: Filter passed to ``Document.find()``. Only allowed when
            ``document_or_query`` is a Document class; when omitted,
            ``find_all()`` (or ``find({})`` as a fallback) is used.
        batch_size: Maximum number of rows per yielded batch; must be > 0.
        projection_model: Model applied via the query's ``.project(...)``.
            Mutually exclusive with ``fields``.
        fields: Field names to select. They are turned into a projection model
            via ``_projection_model_for_fields`` and also handed to
            ``_rows_to_column_dict`` (presumably to fix the output column
            set/order — verify against that helper). Must not be a bare string.
        fetch_links, nesting_depth, nesting_depths_per_field: Forwarded as
            keyword arguments to ``find()``/``find_all()`` when a Document
            class is given; the nesting options are only forwarded when set.
            They have no effect when a query object is passed (configure the
            query itself in that case).
        flatten: When true, each row is passed through ``_flatten_dict``.
        id_column: Passed to ``_normalize_id_keys`` for every row.

    Yields:
        Column dicts produced by ``_rows_to_column_dict`` for up to
        ``batch_size`` rows each; the final batch may be smaller. Nothing is
        yielded for an empty result.

    Raises:
        ValueError: If ``batch_size`` is not positive.
        TypeError: If both ``projection_model`` and ``fields`` are given, if
            ``fields`` is a single string, if ``criteria`` is combined with a
            query object, or if the query lacks a callable ``.project``.
        Whatever ``_require_beanie()`` raises (presumably when Beanie is not
        installed — confirm in that helper).
    """
    _require_beanie()
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")
    # Validate argument combinations before any query is constructed.
    if projection_model is not None and fields is not None:
        raise TypeError("Pass only one of projection_model= or fields=, not both.")
    if isinstance(fields, str):
        # A str is itself a Sequence[str]; without this guard fields="name"
        # would silently select the one-letter columns "n", "a", "m", "e".
        raise TypeError(
            "fields= must be a sequence of field names, not a single string."
        )

    query = document_or_query
    doc_cls: type[Any] | None = None
    if isinstance(document_or_query, type) and hasattr(document_or_query, "find"):
        doc_cls = document_or_query
        # Beanie takes link-fetching and nesting limits as keyword arguments
        # of find()/find_all(); its query objects do not offer callable
        # nesting_depth()/nesting_depths_per_field() setters, so the options
        # must be supplied here. Nesting options are only included when set,
        # so callers that don't use them make exactly the same call as before.
        find_kwargs: dict[str, Any] = {"fetch_links": fetch_links}
        if nesting_depth is not None:
            find_kwargs["nesting_depth"] = nesting_depth
        if nesting_depths_per_field is not None:
            find_kwargs["nesting_depths_per_field"] = dict(nesting_depths_per_field)
        if criteria is None:
            find_all = getattr(doc_cls, "find_all", None)
            if callable(find_all):
                query = find_all(**find_kwargs)
            else:
                query = doc_cls.find({}, **find_kwargs)
        else:
            query = doc_cls.find(criteria, **find_kwargs)
    elif criteria is not None:
        raise TypeError(
            "criteria= is only supported when document_or_query is a Beanie "
            "Document class. If you pass a query object, apply criteria to the "
            "query before calling aiter_beanie()."
        )

    if fields is not None:
        # With a query object there is no Document class to derive the
        # projection from, so an empty placeholder class stands in for it.
        projection_model = _projection_model_for_fields(
            doc_cls or type("Doc", (), {}), fields
        )
    if projection_model is not None:
        project = getattr(query, "project", None)
        if not callable(project):
            raise TypeError("Query object does not support Beanie-style .project(...).")
        query = project(projection_model)

    # Loop-invariant: computed once instead of once per batch.
    fixed_fields = list(fields) if fields is not None else None
    batch: list[dict[str, Any]] = []
    async for obj in query:
        row = _normalize_id_keys(_pydantic_model_to_dict(obj), id_column=id_column)
        if flatten:
            row = _flatten_dict(row)
        batch.append(row)
        if len(batch) >= batch_size:
            yield _rows_to_column_dict(batch, fields=fixed_fields)
            batch = []
    if batch:
        # Flush the trailing partial batch.
        yield _rows_to_column_dict(batch, fields=fixed_fields)