Skip to content

awrite_beanie

pydantable.awrite_beanie async

awrite_beanie(document_cls, data, *, ordered=True, chunk_size=None, options=None)

Insert documents via Beanie so validate_on_save/actions can run.

This is intentionally not a high-throughput bulk insert helper; if you want raw speed and are OK with bypassing ODM hooks, use `pydantable.write_mongo` / `pydantable.io.write_mongo` instead.

Source code in python/pydantable/io/beanie.py
async def awrite_beanie(
    document_cls: type[Any],
    data: dict[str, list[Any]],
    *,
    ordered: bool = True,
    chunk_size: int | None = None,
    options: BeanieWriteOptions | None = None,
) -> int:
    """Insert documents via Beanie so validate_on_save/actions can run.

    This is intentionally **not** a high-throughput bulk insert helper; if you want raw
    speed and are ok bypassing ODM hooks, use :func:`pydantable.write_mongo` /
    :func:`pydantable.io.write_mongo` instead.

    Args:
        document_cls: Class instantiated as ``document_cls(**row)`` for every row.
            Instances must expose a callable, awaitable ``insert``.
        data: Column-oriented mapping of field name to a list of values; row ``i``
            is built from ``data[k][i]`` for every key ``k``.
        ordered: Best-effort ordering. When ``True`` the first failing row
            re-raises its exception and later rows are not attempted. When
            ``False`` failing rows are skipped and the loop continues.
        chunk_size: Forwarded to ``_write_chunk_size``. It only controls how row
            indices are grouped while iterating; documents are still inserted
            one at a time.
        options: Optional write options. ``skip_actions`` and ``link_rule`` are
            forwarded to ``insert`` when they are not ``None``.
            ``validate_on_save`` is a document ``Settings`` flag and is not
            overridden here.

    Returns:
        The number of documents whose ``insert`` call completed without raising.

    Raises:
        TypeError: If a constructed document has no callable ``insert`` and
            ``ordered`` is ``True``.
        Exception: With ``ordered=True``, whatever document construction or
            ``insert`` raised for the first failing row.
    """
    import logging

    # Presumably raises when Beanie is not installed -- helper is defined elsewhere.
    _require_beanie()
    if not data:
        return 0

    # Local import to avoid a cycle.
    from pydantable.io.batches import ensure_rectangular
    from pydantable.io.sql import _write_chunk_size

    # Presumably verifies that all columns share one length -- TODO confirm.
    ensure_rectangular(data)
    n = len(next(iter(data.values())))
    if n == 0:
        return 0

    logger = logging.getLogger(__name__)
    keys = list(data.keys())
    chunk_n = _write_chunk_size(chunk_size)
    total = 0
    opt = options or BeanieWriteOptions()

    # The insert() keyword arguments depend only on the options, so build them
    # once instead of once per row.
    insert_kwargs: dict[str, Any] = {}
    if opt.skip_actions is not None:
        insert_kwargs["skip_actions"] = list(opt.skip_actions)
    if opt.link_rule is not None:
        insert_kwargs["link_rule"] = opt.link_rule

    # ODM-aware, per-document inserts. `ordered` is best-effort here: we stop on first
    # error when ordered=True; continue when ordered=False.
    for start in range(0, n, chunk_n):
        end = min(start + chunk_n, n)
        for i in range(start, end):
            row = {k: data[k][i] for k in keys}
            try:
                doc = document_cls(**row)
                insert = getattr(doc, "insert", None)
                if not callable(insert):
                    raise TypeError("Beanie document instances must support .insert().")
                await insert(**insert_kwargs)
                total += 1
            except Exception:
                if ordered:
                    raise
                # Best-effort mode: skip the row, but leave a trace so a short
                # return count can be diagnosed.
                logger.debug(
                    "awrite_beanie: skipping row %d after failure", i, exc_info=True
                )
    return total