Skip to content

pydantable.io

Eager and lazy I/O helpers. Most symbols are re-exported from the top-level pydantable package.

pydantable.io

Unified data I/O: lazy read_* roots, materialize_* column dicts, and export_*.

  • read_* / aread_* return pydantable_native._core.ScanFileRoot objects for lazy Polars scans (no full Python column lists are built).
  • materialize_* (and fetch_sql_raw / fetch_sql / fetch_*_url / fetch_mongo) return dict[str, list].
  • export_* / aexport_* write column dicts to files; write_mongo inserts into a PyMongo collection. afetch_mongo / aiter_mongo / awrite_mongo use the native PyMongo async API when given an async collection; for a sync pymongo.collection.Collection they run the blocking calls via asyncio.to_thread (or an optional executor).

BeanieWriteOptions dataclass

Options that control ODM-aware write behavior.

Source code in python/pydantable/io/beanie.py
@dataclass(frozen=True, slots=True)
class BeanieWriteOptions:
    """Options that control ODM-aware write behavior.

    Instances are immutable (``frozen=True``). A field left at ``None`` is not
    passed on to Beanie by ``awrite_beanie``.
    """

    # Reserved knob: ``awrite_beanie`` does not apply it today (Beanie's
    # validate_on_save is a document Settings flag, not an insert argument).
    validate_on_save: bool | None = None
    # When set, forwarded as ``skip_actions=list(...)`` to each document's
    # ``.insert()`` (presumably Beanie action/event types to skip — confirm).
    skip_actions: Sequence[Any] | None = None
    # When set, forwarded unchanged as ``link_rule=`` to each ``.insert()``.
    link_rule: Any | None = None

arrow_table_to_column_dict

arrow_table_to_column_dict(table)

Convert a PyArrow Table to dict[str, list] (copies into Python lists).

Source code in python/pydantable/io/arrow.py
def arrow_table_to_column_dict(table: Any) -> dict[str, list[Any]]:
    """Materialize a PyArrow ``Table`` as ``dict[str, list]``.

    Every column is copied into a plain Python list; keys follow the table's
    ``column_names`` order.

    Raises:
        TypeError: if ``table`` is not a ``pyarrow.Table``.
    """
    pyarrow_mod = _require_pyarrow()
    if isinstance(table, pyarrow_mod.Table):
        result: dict[str, list[Any]] = {}
        for col_name in table.column_names:
            result[col_name] = _column_to_pylist(table.column(col_name))
        return result
    raise TypeError(f"expected pyarrow.Table, got {type(table)!r}")

record_batch_to_column_dict

record_batch_to_column_dict(batch)

Convert a PyArrow RecordBatch to dict[str, list].

Source code in python/pydantable/io/arrow.py
def record_batch_to_column_dict(batch: Any) -> dict[str, list[Any]]:
    """Materialize a PyArrow ``RecordBatch`` as ``dict[str, list]``.

    Columns are visited by position; each key is the schema field name at that
    position and each value a Python-list copy of the column.

    Raises:
        TypeError: if ``batch`` is not a ``pyarrow.RecordBatch``.
    """
    pyarrow_mod = _require_pyarrow()
    if not isinstance(batch, pyarrow_mod.RecordBatch):
        raise TypeError(f"expected pyarrow.RecordBatch, got {type(batch)!r}")
    schema = batch.schema
    columns: dict[str, list[Any]] = {}
    for idx in range(batch.num_columns):
        columns[schema.field(idx).name] = _column_to_pylist(batch.column(idx))
    return columns

iter_chain_batches

iter_chain_batches(paths, iter_one)

Yield batches from several files by chaining per-file iterators.

iter_one is typically lambda p: iter_parquet(p) (or another iter_*). This does not merge schemas across files; callers must ensure compatible layouts.

Source code in python/pydantable/io/batches.py
def iter_chain_batches(
    paths: Iterable[Path | str],
    iter_one: Callable[[Path], Iterator[dict[str, list[Any]]]],
) -> Iterator[dict[str, list[Any]]]:
    """
    Yield batches from several files by chaining per-file iterators.

    ``iter_one`` is typically ``lambda p: iter_parquet(p)`` (or another ``iter_*``).
    This does not merge schemas across files; callers must ensure compatible layouts.
    """
    for p in paths:
        yield from iter_one(Path(p))

afetch_beanie async

afetch_beanie(document_or_query, *, criteria=None, projection_model=None, fields=None, fetch_links=False, nesting_depth=None, nesting_depths_per_field=None, flatten=True, id_column='id')

Fetch Beanie documents (or a Beanie query) into dict[str, list].

  • document_or_query: a Beanie Document class (preferred) or a query object returned from Document.find(...).
  • fields: convenience projection (builds a temporary projection model).
  • projection_model: passed to Beanie .project(...).
  • fetch_links / nesting_depth*: forwarded to Beanie find call (relations).
  • flatten: if True, nested objects are flattened into dot-path keys.
  • id_column: normalize Mongo id to id (default) or _id.
Source code in python/pydantable/io/beanie.py
async def afetch_beanie(
    document_or_query: Any,
    *,
    criteria: Any | None = None,
    projection_model: type[Any] | None = None,
    fields: Sequence[str] | None = None,
    fetch_links: bool = False,
    nesting_depth: int | None = None,
    nesting_depths_per_field: Mapping[str, int] | None = None,
    flatten: bool = True,
    id_column: Literal["id", "_id"] = "id",
) -> dict[str, list[Any]]:
    """Fetch Beanie documents (or a Beanie query) into ``dict[str, list]``.

    - **document_or_query**: a Beanie ``Document`` class (preferred) or a query object
      returned from ``Document.find(...)``.
    - **criteria**: filter handed to ``Document.find(...)``. Only allowed with a
      ``Document`` class; with a query object, apply filters before calling. When
      omitted, ``find_all(...)`` is used if the class has it, else ``find({})``.
    - **fields**: convenience projection (builds a temporary projection model). The
      same names are passed as the fixed column list when building the output, so
      their order is preserved.
    - **projection_model**: passed to Beanie ``.project(...)``. Mutually exclusive
      with ``fields``.
    - **fetch_links**: passed to the ``find``/``find_all`` call. Only applied when a
      ``Document`` class is given; a query object is used as-is.
    - **nesting_depth / nesting_depths_per_field**: applied best-effort by calling the
      query's ``.nesting_depth(...)`` / ``.nesting_depths_per_field(...)`` methods when
      they exist (silently skipped otherwise). Only applied when a ``Document`` class
      is given.
    - **flatten**: if True, nested objects are flattened into dot-path keys.
    - **id_column**: normalize Mongo id to ``id`` (default) or ``_id``.

    Raises:
        TypeError: if both ``projection_model`` and ``fields`` are given; if
            ``criteria`` is given together with a query object; or if a projection
            is requested but the query has no callable ``.project``.
    """
    _require_beanie()
    # Validate argument combinations before building any query so misuse fails fast.
    if projection_model is not None and fields is not None:
        raise TypeError("Pass only one of projection_model= or fields=, not both.")

    query: Any
    doc_cls: type[Any] | None = None
    if isinstance(document_or_query, type) and hasattr(document_or_query, "find"):
        doc_cls = document_or_query
        if criteria is None:
            # Prefer find_all when present; else find({}).
            find_all = getattr(doc_cls, "find_all", None)
            if callable(find_all):
                query = find_all(fetch_links=fetch_links)
            else:
                query = doc_cls.find({}, fetch_links=fetch_links)
        else:
            query = doc_cls.find(criteria, fetch_links=fetch_links)

        # Nesting-depth controls are applied through query methods when the installed
        # Beanie exposes them; otherwise they are skipped (best-effort).
        if nesting_depth is not None:
            nd = getattr(query, "nesting_depth", None)
            if callable(nd):
                query = nd(nesting_depth)
        if nesting_depths_per_field is not None:
            ndpf = getattr(query, "nesting_depths_per_field", None)
            if callable(ndpf):
                query = ndpf(dict(nesting_depths_per_field))
    else:
        if criteria is not None:
            raise TypeError(
                "criteria= is only supported when document_or_query is a Beanie "
                "Document class. If you pass a query object, apply criteria to the "
                "query before calling afetch_beanie()."
            )
        query = document_or_query

    if fields is not None:
        projection_model = _projection_model_for_fields(
            doc_cls or type("Doc", (), {}), fields
        )
    if projection_model is not None:
        project = getattr(query, "project", None)
        if not callable(project):
            raise TypeError("Query object does not support Beanie-style .project(...).")
        query = project(projection_model)

    items = await _query_to_list(query)

    norm_rows: list[dict[str, Any]] = []
    for obj in items:
        row = _pydantic_model_to_dict(obj)
        row = _normalize_id_keys(row, id_column=id_column)
        if flatten:
            row = _flatten_dict(row)
        norm_rows.append(row)

    # Column order: preserve `fields` when supplied; else sorted union.
    fixed_fields = list(fields) if fields is not None else None
    return _rows_to_column_dict(norm_rows, fields=fixed_fields)

aiter_beanie async

aiter_beanie(document_or_query, *, criteria=None, batch_size=1000, projection_model=None, fields=None, fetch_links=False, nesting_depth=None, nesting_depths_per_field=None, flatten=True, id_column='id')

Yield rectangular column-dict batches from Beanie results.

Source code in python/pydantable/io/beanie.py
async def aiter_beanie(
    document_or_query: Any,
    *,
    criteria: Any | None = None,
    batch_size: int = 1000,
    projection_model: type[Any] | None = None,
    fields: Sequence[str] | None = None,
    fetch_links: bool = False,
    nesting_depth: int | None = None,
    nesting_depths_per_field: Mapping[str, int] | None = None,
    flatten: bool = True,
    id_column: Literal["id", "_id"] = "id",
) -> AsyncIterator[dict[str, list[Any]]]:
    """Yield Beanie results as rectangular ``dict[str, list]`` batches.

    ``document_or_query`` is either a Beanie ``Document`` class or a ready query
    object.

    - For a class, the query is ``find(criteria, ...)`` when ``criteria`` is given,
      else ``find_all(...)`` (or ``find({})`` if there is no ``find_all``).
    - ``fetch_links`` is passed to that call. The nesting-depth options are applied
      through the query's methods of the same name when those exist.
    - A query object is used as-is, and ``criteria`` is rejected for it.

    ``fields`` (or an explicit ``projection_model``, but not both) selects a Beanie
    projection; ``fields`` is also passed as the fixed column list for every batch.
    Each document is converted to a dict and its id key normalized to
    ``id_column``. When ``flatten`` is true, nested objects are flattened into
    dot-path keys.

    Every batch holds ``batch_size`` rows except possibly the last. As this is an
    async generator, all validation runs on the first iteration, not at call time.

    Raises:
        ValueError: if ``batch_size`` is not positive.
        TypeError: for ``criteria`` with a query object, for both projection
            arguments at once, or when the query lacks a callable ``.project``.
    """
    _require_beanie()
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")

    source: Any = document_or_query
    model_cls: type[Any] | None = None
    is_document_cls = isinstance(document_or_query, type) and hasattr(
        document_or_query, "find"
    )
    if not is_document_cls:
        if criteria is not None:
            raise TypeError(
                "criteria= is only supported when document_or_query is a Beanie "
                "Document class. If you pass a query object, apply criteria to the "
                "query before calling aiter_beanie()."
            )
    else:
        model_cls = document_or_query
        if criteria is not None:
            source = model_cls.find(criteria, fetch_links=fetch_links)
        else:
            find_all = getattr(model_cls, "find_all", None)
            source = (
                find_all(fetch_links=fetch_links)
                if callable(find_all)
                else model_cls.find({}, fetch_links=fetch_links)
            )
        # Best-effort: only versions exposing these query methods honor them.
        if nesting_depth is not None:
            set_depth = getattr(source, "nesting_depth", None)
            if callable(set_depth):
                source = set_depth(nesting_depth)
        if nesting_depths_per_field is not None:
            set_depths = getattr(source, "nesting_depths_per_field", None)
            if callable(set_depths):
                source = set_depths(dict(nesting_depths_per_field))

    if projection_model is not None and fields is not None:
        raise TypeError("Pass only one of projection_model= or fields=, not both.")
    if fields is not None:
        projection_model = _projection_model_for_fields(
            model_cls or type("Doc", (), {}), fields
        )
    if projection_model is not None:
        project = getattr(source, "project", None)
        if not callable(project):
            raise TypeError("Query object does not support Beanie-style .project(...).")
        source = project(projection_model)

    def _to_row(obj: Any) -> dict[str, Any]:
        # Model -> dict, normalize the id key, then optionally flatten.
        normalized = _normalize_id_keys(_pydantic_model_to_dict(obj), id_column=id_column)
        return _flatten_dict(normalized) if flatten else normalized

    def _emit(rows: list[dict[str, Any]]) -> dict[str, list[Any]]:
        # A fresh copy of `fields` per batch, as the column order (or None).
        column_order = None if fields is None else list(fields)
        return _rows_to_column_dict(rows, fields=column_order)

    pending: list[dict[str, Any]] = []
    async for obj in source:
        pending.append(_to_row(obj))
        if len(pending) >= batch_size:
            yield _emit(pending)
            pending = []
    if pending:
        yield _emit(pending)

awrite_beanie async

awrite_beanie(document_cls, data, *, ordered=True, chunk_size=None, options=None)

Insert documents via Beanie so validate_on_save/actions can run.

This is intentionally not a high-throughput bulk insert helper; if you need raw speed and are OK with bypassing ODM hooks, use pydantable.write_mongo / pydantable.io.write_mongo instead.

Source code in python/pydantable/io/beanie.py
async def awrite_beanie(
    document_cls: type[Any],
    data: dict[str, list[Any]],
    *,
    ordered: bool = True,
    chunk_size: int | None = None,
    options: BeanieWriteOptions | None = None,
) -> int:
    """Insert documents via Beanie so validate_on_save/actions can run.

    This is intentionally **not** a high-throughput bulk insert helper; if you want raw
    speed and are ok bypassing ODM hooks, use :func:`pydantable.write_mongo` /
    :func:`pydantable.io.write_mongo` instead.

    Each row of ``data`` becomes ``document_cls(**row)`` and is inserted with that
    instance's ``.insert()``, one document at a time.

    Args:
        document_cls: Beanie ``Document`` subclass instantiated once per row.
        data: rectangular column dict (checked with ``ensure_rectangular``).
        ordered: if True, the first failing row re-raises and stops the write; if
            False, rows that fail to construct or insert are skipped.
        chunk_size: normalized by ``_write_chunk_size`` and used to walk the rows in
            chunks; every row is still inserted individually.
        options: ``skip_actions`` / ``link_rule`` are forwarded to ``.insert()``;
            ``validate_on_save`` is currently not applied.

    Returns:
        The number of documents successfully inserted.

    Raises:
        TypeError: if a constructed document has no callable ``.insert()``. This is
            raised regardless of ``ordered`` because it is a configuration error,
            not a per-row failure, and skipping it would just report 0 inserts.
    """
    _require_beanie()
    if not data:
        return 0

    # Local import to avoid a cycle.
    from pydantable.io.batches import ensure_rectangular
    from pydantable.io.sql import _write_chunk_size

    ensure_rectangular(data)
    n = len(next(iter(data.values())))
    if n == 0:
        return 0

    keys = list(data.keys())
    chunk_n = _write_chunk_size(chunk_size)
    opt = options or BeanieWriteOptions()

    # The insert keyword arguments are the same for every row; build them once.
    insert_kw: dict[str, Any] = {}
    if opt.skip_actions is not None:
        insert_kw["skip_actions"] = list(opt.skip_actions)
    if opt.link_rule is not None:
        insert_kw["link_rule"] = opt.link_rule
    # validate_on_save is a document Settings flag; the option is kept for future
    # expansion, but Settings are not overridden today.

    total = 0
    # ODM-aware, per-document inserts. `ordered` is best-effort here: stop on the
    # first row error when ordered=True; skip the row when ordered=False.
    for start in range(0, n, chunk_n):
        for i in range(start, min(start + chunk_n, n)):
            row = {k: data[k][i] for k in keys}
            try:
                doc = document_cls(**row)
            except Exception:
                if ordered:
                    raise
                continue
            insert = getattr(doc, "insert", None)
            if not callable(insert):
                raise TypeError("Beanie document instances must support .insert().")
            try:
                await insert(**insert_kw)
            except Exception:
                if ordered:
                    raise
                continue
            total += 1
    return total

iter_avro

iter_avro(path, *, batch_size=65536, experimental=True)

Yield Avro batches via PyArrow (falls back to full read if needed).

Source code in python/pydantable/io/extras.py
def iter_avro(
    path: str | Path,
    *,
    batch_size: int = 65_536,
    experimental: bool = True,
) -> Iterator[dict[str, list[Any]]]:
    """Stream an Avro file as ``dict[str, list]`` batches through PyArrow.

    If PyArrow cannot open the file as Avro (its Avro API differs across
    versions/builds), the file is loaded eagerly with ``read_avro`` and yielded as a
    single batch instead.

    Raises:
        ValueError: if ``batch_size`` is not positive.
        ImportError: if PyArrow is not installed.
    """
    _require_experimental(experimental, "Avro ingestion")
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")
    try:
        import pyarrow as pa  # type: ignore[import-not-found]
    except ImportError as e:
        raise ImportError(
            "iter_avro requires pyarrow (pip install 'pydantable[arrow]')."
        ) from e
    try:
        avro_reader = pa.avro.open_file(str(path))  # type: ignore[attr-defined]
    except Exception:
        # Eager fallback: one batch holding the whole file.
        yield read_avro(path, experimental=True)
    else:
        with avro_reader:
            for record_batch in avro_reader.iter_batches(batch_size=batch_size):
                columns = {
                    name: list(values)
                    for name, values in record_batch.to_pydict().items()
                }
                ensure_rectangular(columns)
                yield columns

iter_bigquery

iter_bigquery(query, *, project=None, batch_size=65536, experimental=True, **kwargs)

Yield BigQuery results in Arrow-backed batches when available.

Source code in python/pydantable/io/extras.py
def iter_bigquery(
    query: str,
    *,
    project: str | None = None,
    batch_size: int = 65_536,
    experimental: bool = True,
    **kwargs: Any,
) -> Iterator[dict[str, list[Any]]]:
    """Yield BigQuery results in Arrow-backed batches.

    The query runs exactly once. When the installed client provides
    ``RowIterator.to_arrow_iterable()``, results are streamed batch by batch.
    Otherwise the already-executed result is materialized with ``to_arrow()`` and
    split into batches of at most ``batch_size`` rows.

    Args:
        query: SQL text to execute.
        project: project passed to ``bigquery.Client``.
        batch_size: page size requested from BigQuery, and the maximum batch size
            in the ``to_arrow()`` fallback.
        experimental: opt-in flag checked by ``_require_experimental``.
        **kwargs: forwarded to ``bigquery.Client``.

    Raises:
        ValueError: if ``batch_size`` is not positive.
        ImportError: if ``google-cloud-bigquery`` is not installed.
    """
    _require_experimental(experimental, "BigQuery ingestion")
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")
    try:
        from google.cloud import bigquery  # type: ignore[import-not-found]
    except ImportError as e:
        raise ImportError(
            "iter_bigquery requires google-cloud-bigquery (pip install 'pydantable[bq]')."
        ) from e
    client = bigquery.Client(project=project, **kwargs)
    rows = client.query(query).result(page_size=batch_size)
    # Prefer Arrow streaming if supported by the client version.
    if hasattr(rows, "to_arrow_iterable"):
        for rb in rows.to_arrow_iterable():  # type: ignore[attr-defined]
            out = {k: list(v) for k, v in rb.to_pydict().items()}
            ensure_rectangular(out)
            yield out
        return
    # Fallback: reuse the result we already have. Re-running the query here would
    # bill it twice and could return different rows.
    table = rows.to_arrow()
    if table.num_rows == 0:
        # Keep the schema visible for empty results (one batch of empty columns).
        yield {name: [] for name in table.column_names}
        return
    for rb in table.to_batches(max_chunksize=batch_size):
        if rb.num_rows == 0:
            continue
        out = {k: list(v) for k, v in rb.to_pydict().items()}
        ensure_rectangular(out)
        yield out

iter_delta

iter_delta(path, *, batch_size=65536, experimental=True)

Yield Delta (Parquet dataset) batches via PyArrow dataset scanner.

Source code in python/pydantable/io/extras.py
def iter_delta(
    path: str | Path,
    *,
    batch_size: int = 65_536,
    experimental: bool = True,
) -> Iterator[dict[str, list[Any]]]:
    """Stream a Delta table directory as ``dict[str, list]`` batches.

    The directory is scanned as a plain Parquet dataset with ``pyarrow.dataset``;
    the Delta transaction log is not read.
    NOTE(review): files that the log marks as removed would therefore still be
    returned — confirm this is acceptable for the experimental reader.

    Raises:
        ValueError: if ``batch_size`` is not positive.
        ImportError: if ``pyarrow.dataset`` is unavailable.
    """
    _require_experimental(experimental, "Delta Lake ingestion")
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")
    try:
        import pyarrow.dataset as ds  # type: ignore[import-not-found]
    except ImportError as e:
        raise ImportError(
            "iter_delta requires pyarrow.dataset (pip install 'pydantable[arrow]')."
        ) from e
    parquet_batches = ds.dataset(path, format="parquet").to_batches(
        batch_size=batch_size
    )
    for rb in parquet_batches:
        columns: dict[str, list[Any]] = {}
        for name, values in rb.to_pydict().items():
            columns[name] = list(values)
        ensure_rectangular(columns)
        yield columns

iter_excel

iter_excel(path, *, sheet_name=0, batch_size=65536, experimental=True)

Yield Excel rows as dict[str, list] batches (openpyxl read-only).

Source code in python/pydantable/io/extras.py
def iter_excel(
    path: str | Path,
    *,
    sheet_name: str | int = 0,
    batch_size: int = 65_536,
    experimental: bool = True,
) -> Iterator[dict[str, list[Any]]]:
    """Yield Excel rows as ``dict[str, list]`` batches (openpyxl read-only).

    The first row is the header; ``None`` header cells become ``col{i}``. Data rows
    shorter than the header are padded with ``None``; cells beyond the header are
    dropped. Nothing is yielded for an empty or header-only sheet.

    ``sheet_name`` is a worksheet index (``int``) or a sheet title (``str``). The
    workbook is closed in every case, including an invalid ``sheet_name`` and a
    generator closed before it is exhausted.

    Raises:
        ValueError: if ``batch_size`` is not positive.
        ImportError: if openpyxl is not installed.
    """
    _require_experimental(experimental, "Excel ingestion")
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")
    try:
        import openpyxl  # type: ignore[import-not-found]
    except ImportError as e:
        raise ImportError(
            "iter_excel requires openpyxl (pip install 'pydantable[excel]')."
        ) from e
    wb = openpyxl.load_workbook(path, read_only=True, data_only=True)
    try:
        # Resolve the sheet inside the try so an unknown index/name cannot skip
        # wb.close().
        ws = wb.worksheets[sheet_name] if isinstance(sheet_name, int) else wb[sheet_name]
        rows_iter = ws.iter_rows(values_only=True)
        first = next(rows_iter, None)
        if first is None:
            return
        header = [str(h) if h is not None else f"col{i}" for i, h in enumerate(first)]
        out: dict[str, list[Any]] = {h: [] for h in header}
        n = 0
        for row in rows_iter:
            for i, h in enumerate(header):
                out[h].append(row[i] if i < len(row) else None)
            n += 1
            if n >= batch_size:
                ensure_rectangular(out)
                yield out
                out = {h: [] for h in header}
                n = 0
        if n:
            ensure_rectangular(out)
            yield out
    finally:
        wb.close()

iter_kafka_json

iter_kafka_json(topic, *, bootstrap_servers, max_messages=None, batch_size=1000, experimental=True, **consumer_config)

Stream JSON payloads from Kafka, yielding batches as dict[str, list].

Stops after max_messages messages if provided, and in any case as soon as a poll returns no messages.

Source code in python/pydantable/io/extras.py
def iter_kafka_json(
    topic: str,
    *,
    bootstrap_servers: str,
    max_messages: int | None = None,
    batch_size: int = 1000,
    experimental: bool = True,
    **consumer_config: Any,
) -> Iterator[dict[str, list[Any]]]:
    """
    Stream JSON payloads from Kafka, yielding batches as ``dict[str, list]``.

    Row shape:

    - Each message becomes one row with ``key`` (bytes decoded as UTF-8, ``None``
      for a null key), ``partition`` and ``offset``.
    - If the payload is a JSON object, its top-level keys are added as well;
      payload keys override the metadata keys on a name clash.
    - Null values (tombstones) and non-object payloads contribute metadata only.
    - Each batch's columns are the sorted union of its rows' keys, with ``None``
      where a row lacks a key.

    Iteration stops after ``max_messages`` messages if provided, and in any case as
    soon as a poll (2 s timeout) returns nothing. With ``max_messages`` set, each poll
    asks for at most the remaining number of records. The consumer position then
    does not move past records that are never yielded.

    ``consumer_config`` is forwarded to ``KafkaConsumer``; the consumer is closed
    when iteration ends or the generator is closed.
    """
    _require_experimental(experimental, "Kafka ingestion")
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")
    try:
        from kafka import KafkaConsumer  # type: ignore[import-not-found]
    except ImportError as e:
        raise ImportError(
            "iter_kafka_json requires kafka-python (pip install 'pydantable[kafka]')."
        ) from e

    def _deserialize(raw: bytes | None) -> Any:
        # Tombstones carry a null value; the client may hand None to the
        # deserializer (assumption — verify per kafka-python version), so don't
        # call .decode on it.
        if raw is None:
            return None
        return json.loads(raw.decode("utf-8"))

    def _decode_key(key: Any) -> Any:
        # `if key` would turn an empty key b"" into None; only decode real bytes and
        # pass through keys already produced by a configured key_deserializer.
        if isinstance(key, (bytes, bytearray)):
            return key.decode("utf-8")
        return key

    def _to_columns(batch_rows: list[dict[str, Any]]) -> dict[str, list[Any]]:
        keys = sorted({k for r in batch_rows for k in r})
        out = {k: [r.get(k) for r in batch_rows] for k in keys}
        ensure_rectangular(out)
        return out

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        value_deserializer=_deserialize,
        **consumer_config,
    )
    seen = 0
    rows: list[dict[str, Any]] = []
    try:
        while max_messages is None or seen < max_messages:
            if max_messages is None:
                pack = consumer.poll(timeout_ms=2000)
            else:
                pack = consumer.poll(timeout_ms=2000, max_records=max_messages - seen)
            if not pack:
                break
            done = False
            for messages in pack.values():
                for msg in messages:
                    val = msg.value if isinstance(msg.value, dict) else {}
                    rows.append(
                        {
                            "key": _decode_key(msg.key),
                            "partition": msg.partition,
                            "offset": msg.offset,
                            **val,
                        }
                    )
                    seen += 1
                    if len(rows) >= batch_size:
                        yield _to_columns(rows)
                        rows = []
                    if max_messages is not None and seen >= max_messages:
                        done = True
                        break
                if done:
                    break
    finally:
        consumer.close()
    if rows:
        yield _to_columns(rows)

iter_orc

iter_orc(path, *, batch_size=65536, experimental=True)

Yield ORC batches via PyArrow.

Source code in python/pydantable/io/extras.py
def iter_orc(
    path: str | Path,
    *,
    batch_size: int = 65_536,
    experimental: bool = True,
) -> Iterator[dict[str, list[Any]]]:
    """Yield ORC batches via PyArrow.

    Uses ``ORCFile.iter_batches`` when the installed PyArrow provides it. Otherwise
    the file is read one stripe at a time (``nstripes`` / ``read_stripe``), and each
    stripe is sliced into batches of at most ``batch_size`` rows.

    Raises:
        ValueError: if ``batch_size`` is not positive.
        ImportError: if ``pyarrow.orc`` is unavailable.
    """
    _require_experimental(experimental, "ORC ingestion")
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")
    try:
        import pyarrow.orc as orc  # type: ignore[import-not-found]
    except ImportError as e:
        raise ImportError(
            "iter_orc requires pyarrow.orc (pip install 'pydantable[arrow]')."
        ) from e

    def _stripe_batches(orc_file: Any) -> Iterator[Any]:
        # Fallback for ORCFile objects without iter_batches: stripe-by-stripe reads.
        for idx in range(orc_file.nstripes):
            stripe = orc_file.read_stripe(idx)
            for offset in range(0, stripe.num_rows, batch_size):
                yield stripe.slice(offset, batch_size)

    with open(path, "rb") as f:
        of = orc.ORCFile(f)
        native_iter = getattr(of, "iter_batches", None)
        batches = (
            native_iter(batch_size=batch_size)
            if callable(native_iter)
            else _stripe_batches(of)
        )
        for rb in batches:
            out = {k: list(v) for k, v in rb.to_pydict().items()}
            ensure_rectangular(out)
            yield out

iter_snowflake

iter_snowflake(sql, *, batch_size=65536, experimental=True, **connect_kwargs)

Yield Snowflake query results in batches (cursor.fetchmany).

Source code in python/pydantable/io/extras.py
def iter_snowflake(
    sql: str,
    *,
    batch_size: int = 65_536,
    experimental: bool = True,
    **connect_kwargs: Any,
) -> Iterator[dict[str, list[Any]]]:
    """Stream Snowflake query results as ``dict[str, list]`` batches.

    ``connect_kwargs`` are passed to ``snowflake.connector.connect``. Rows are
    pulled with ``cursor.fetchmany(batch_size)``, and nothing is yielded when the
    statement has no result columns. The connection is closed when iteration ends
    or the generator is closed.

    Raises:
        ValueError: if ``batch_size`` is not positive.
        ImportError: if ``snowflake-connector-python`` is not installed.
    """
    _require_experimental(experimental, "Snowflake ingestion")
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")
    try:
        import snowflake.connector  # type: ignore[import-not-found]
    except ImportError as e:
        raise ImportError(
            "iter_snowflake requires snowflake-connector-python "
            "(pip install 'pydantable[snowflake]')."
        ) from e
    conn = snowflake.connector.connect(**connect_kwargs)
    try:
        cursor = conn.cursor()
        cursor.execute(sql)
        names = [desc[0] for desc in cursor.description or []]
        if names:
            while chunk := cursor.fetchmany(batch_size):
                batch = {
                    name: [record[pos] for record in chunk]
                    for pos, name in enumerate(names)
                }
                ensure_rectangular(batch)
                yield batch
    finally:
        conn.close()

read_bigquery

read_bigquery(query, *, project=None, experimental=True, **kwargs)

Run a BigQuery SQL string via google-cloud-bigquery and return the result as dict[str, list].

Source code in python/pydantable/io/extras.py
def read_bigquery(
    query: str,
    *,
    project: str | None = None,
    experimental: bool = True,
    **kwargs: Any,
) -> dict[str, list[Any]]:
    """Execute a BigQuery SQL string and return all rows as ``dict[str, list]``.

    Uses ``google-cloud-bigquery``. ``project`` and ``kwargs`` configure the
    ``bigquery.Client``. The full result is fetched with ``to_arrow()`` and then
    converted to Python lists.

    Raises:
        ImportError: if ``google-cloud-bigquery`` is not installed.
    """
    _require_experimental(experimental, "BigQuery ingestion")
    try:
        from google.cloud import bigquery  # type: ignore[import-not-found]
    except ImportError as e:
        raise ImportError(
            "read_bigquery requires google-cloud-bigquery (pip install 'pydantable[bq]')."
        ) from e
    from .arrow import arrow_table_to_column_dict

    client = bigquery.Client(project=project, **kwargs)
    result = client.query(query).result()
    return arrow_table_to_column_dict(result.to_arrow())

read_csv_stdin

read_csv_stdin(stream=None, *, engine='auto')

Read CSV from stdin (or stream) by writing it to a temporary file and calling materialize_csv on that file.

Source code in python/pydantable/io/extras.py
def read_csv_stdin(
    stream: TextIO | None = None,
    *,
    engine: str = "auto",
) -> dict[str, list[Any]]:
    """Parse CSV arriving on ``stdin`` (or ``stream``) into ``dict[str, list]``.

    The whole input is read, text is encoded as UTF-8 (bytes are kept as-is), and
    the result is written to a temporary ``.csv`` file. That file is parsed with
    :func:`materialize_csv` and always deleted afterwards.
    """
    import tempfile

    from . import materialize_csv

    source = stream if stream else sys.stdin
    content = source.read()
    payload = content if not isinstance(content, str) else content.encode("utf-8")
    handle, tmp_name = tempfile.mkstemp(suffix=".csv")
    os.close(handle)
    tmp_path = Path(tmp_name)
    try:
        tmp_path.write_bytes(payload)
        return materialize_csv(str(tmp_path), engine=engine)
    finally:
        tmp_path.unlink(missing_ok=True)

read_delta

read_delta(path, *, experimental=True)

Read a Delta table directory via PyArrow dataset ([arrow] extra).

Source code in python/pydantable/io/extras.py
def read_delta(
    path: str | Path,
    *,
    experimental: bool = True,
) -> dict[str, list[Any]]:
    """Load a Delta table directory into ``dict[str, list]`` via ``pyarrow.dataset``.

    Requires the ``[arrow]`` extra. The directory is read as a plain Parquet
    dataset, and the Delta transaction log is not consulted.
    NOTE(review): files the log marks as removed would still be included — confirm
    this is acceptable for the experimental reader.

    Raises:
        ImportError: if ``pyarrow.dataset`` is unavailable.
    """
    _require_experimental(experimental, "Delta Lake ingestion")
    try:
        import pyarrow.dataset as ds  # type: ignore[import-not-found]
    except ImportError as e:
        raise ImportError(
            "read_delta requires pyarrow with dataset support (pip install 'pydantable[arrow]')."
        ) from e
    from .arrow import arrow_table_to_column_dict

    parquet_table = ds.dataset(path, format="parquet").to_table()
    return arrow_table_to_column_dict(parquet_table)

read_excel

read_excel(path, *, sheet_name=0, experimental=True)

Load the first sheet (or sheet_name) from .xlsx via openpyxl → dict[str, list].

Source code in python/pydantable/io/extras.py
def read_excel(
    path: str | Path,
    *,
    sheet_name: str | int = 0,
    experimental: bool = True,
) -> dict[str, list[Any]]:
    """Load the first sheet (or ``sheet_name``) from ``.xlsx`` via openpyxl → ``dict[str, list]``.

    Sheet contents:

    - The first row is the header; ``None`` header cells become ``col{i}``.
    - Data rows shorter than the header are padded with ``None``; cells beyond the
      header are dropped.
    - An empty sheet returns ``{}``; a header-only sheet returns empty columns.

    ``sheet_name`` is a worksheet index (``int``) or a sheet title (``str``). The
    workbook is closed even if the sheet lookup or row reading fails.

    Raises:
        ImportError: if openpyxl is not installed.
    """
    _require_experimental(experimental, "Excel ingestion")
    try:
        import openpyxl  # type: ignore[import-not-found]
    except ImportError as e:
        raise ImportError(
            "read_excel requires openpyxl (pip install 'pydantable[excel]')."
        ) from e
    wb = openpyxl.load_workbook(path, read_only=True, data_only=True)
    try:
        ws = wb.worksheets[sheet_name] if isinstance(sheet_name, int) else wb[sheet_name]
        rows = list(ws.iter_rows(values_only=True))
    finally:
        # Previously a bad sheet_name or a read error skipped this close().
        wb.close()
    if not rows:
        return {}
    header = [str(h) if h is not None else f"col{i}" for i, h in enumerate(rows[0])]
    out: dict[str, list[Any]] = {h: [] for h in header}
    for row in rows[1:]:
        for i, h in enumerate(header):
            out[h].append(row[i] if i < len(row) else None)
    return out

read_kafka_json_batch

read_kafka_json_batch(topic, *, bootstrap_servers, max_messages=100, experimental=True, **consumer_config)

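Poll JSON payloads from topic into columns key, partition and offset, plus one column per top-level key of each JSON-object payload.

At-least-once delivery only. Payloads that are not JSON objects contribute no extra columns; the result's columns are the sorted union of keys across all messages, with None where a message lacks a key.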

Source code in python/pydantable/io/extras.py
def read_kafka_json_batch(
    topic: str,
    *,
    bootstrap_servers: str,
    max_messages: int = 100,
    experimental: bool = True,
    **consumer_config: Any,
) -> dict[str, list[Any]]:
    """
    Poll up to ``max_messages`` JSON payloads from ``topic`` into ``dict[str, list]``.

    Every message contributes ``key`` (bytes decoded as UTF-8, ``None`` for a null
    key), ``partition`` and ``offset``. A payload that is a JSON object adds its
    top-level keys; they override the metadata keys on a name clash. Null values
    (tombstones) and non-object payloads add nothing. Columns are the sorted union of
    keys across all messages, with ``None`` where a message lacks a key. Returns
    ``{}`` when no message arrives.

    **At-least-once** delivery only. Each poll (2 s timeout) requests at most the
    remaining number of records, so the consumer position does not advance past
    messages that are not returned. Polling stops at ``max_messages`` messages, on
    an empty poll, or after ``max_messages`` polls.
    """
    _require_experimental(experimental, "Kafka ingestion")
    try:
        from kafka import KafkaConsumer  # type: ignore[import-not-found]
    except ImportError as e:
        raise ImportError(
            "read_kafka_json_batch requires kafka-python (pip install 'pydantable[kafka]')."
        ) from e

    def _deserialize(raw: bytes | None) -> Any:
        # Tombstones carry a null value; the client may hand None to the
        # deserializer (assumption — verify per kafka-python version).
        if raw is None:
            return None
        return json.loads(raw.decode("utf-8"))

    def _decode_key(key: Any) -> Any:
        # Keep b"" as "" (not None); pass through already-deserialized keys.
        if isinstance(key, (bytes, bytearray)):
            return key.decode("utf-8")
        return key

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        value_deserializer=_deserialize,
        **consumer_config,
    )
    rows: list[dict[str, Any]] = []
    try:
        for _ in range(max_messages):
            # len(rows) < max_messages here, so max_records is always >= 1.
            pack = consumer.poll(timeout_ms=2000, max_records=max_messages - len(rows))
            if not pack:
                break
            for messages in pack.values():
                for msg in messages:
                    val = msg.value if isinstance(msg.value, dict) else {}
                    row = {
                        "key": _decode_key(msg.key),
                        "partition": msg.partition,
                        "offset": msg.offset,
                        **val,
                    }
                    rows.append(row)
                    if len(rows) >= max_messages:
                        break
                if len(rows) >= max_messages:
                    break
            if len(rows) >= max_messages:
                break
    finally:
        consumer.close()
    if not rows:
        return {}
    keys = sorted({k for r in rows for k in r})
    return {k: [r.get(k) for r in rows] for k in keys}

read_snowflake

read_snowflake(sql, *, experimental=True, **connect_kwargs)

Execute sql on Snowflake via snowflake-connector-python (experimental).

Source code in python/pydantable/io/extras.py
def read_snowflake(
    sql: str,
    *,
    experimental: bool = True,
    **connect_kwargs: Any,
) -> dict[str, list[Any]]:
    """Run ``sql`` on Snowflake and return every row as ``dict[str, list]`` (experimental).

    Uses ``snowflake-connector-python``; ``connect_kwargs`` go to
    ``snowflake.connector.connect``. All rows are fetched with ``fetchall()``. A
    statement with no result columns yields ``{}``. The connection is always
    closed.

    Raises:
        ImportError: if ``snowflake-connector-python`` is not installed.
    """
    _require_experimental(experimental, "Snowflake ingestion")
    try:
        import snowflake.connector  # type: ignore[import-not-found]
    except ImportError as e:
        raise ImportError(
            "read_snowflake requires snowflake-connector-python "
            "(pip install 'pydantable[snowflake]')."
        ) from e
    conn = snowflake.connector.connect(**connect_kwargs)
    try:
        cursor = conn.cursor()
        cursor.execute(sql)
        names = [desc[0] for desc in (cursor.description or [])]
        records = cursor.fetchall()
        result: dict[str, list[Any]] = {}
        for pos, name in enumerate(names):
            result[name] = [record[pos] for record in records]
        return result
    finally:
        conn.close()

write_csv_stdout

write_csv_stdout(data, stream=None, *, engine='auto')

Write data as CSV to stdout (or stream), first exporting it with export_csv to a temporary file.

Source code in python/pydantable/io/extras.py
def write_csv_stdout(
    data: dict[str, list[Any]],
    stream: TextIO | BinaryIO | None = None,
    *,
    engine: str = "auto",
) -> None:
    """Write ``data`` as CSV to ``stdout`` (or ``stream``) using :func:`export_csv` to a temp file.

    Output routing:

    - Binary streams (``io.RawIOBase`` / ``io.BufferedIOBase``, e.g. ``BytesIO``)
      receive the raw CSV bytes.
    - Text streams exposing ``.buffer`` (e.g. the real ``sys.stdout``) are flushed
      first, so text already written to them stays in order, then receive the bytes
      through their buffer.
    - Any other stream receives the CSV decoded as UTF-8 text (e.g. ``io.StringIO``).

    ``stream=None`` means the current ``sys.stdout``, which may itself be a
    replaced text stream without ``.buffer``. The temporary file is always
    removed.
    """
    import io
    import tempfile

    from . import export_csv

    target = sys.stdout if stream is None else stream
    fd, name = tempfile.mkstemp(suffix=".csv")
    os.close(fd)
    path = Path(name)
    try:
        export_csv(str(path), data, engine=engine)
        out = path.read_bytes()
        if isinstance(target, (io.RawIOBase, io.BufferedIOBase)):
            # Binary sinks take bytes; writing str to them would raise TypeError.
            target.write(out)
        elif hasattr(target, "buffer"):
            # Push pending text through first so our bytes don't jump ahead of it.
            target.flush()
            target.buffer.write(out)
        else:
            cast("TextIO", target).write(out.decode("utf-8"))
    finally:
        path.unlink(missing_ok=True)

fetch_bytes

fetch_bytes(url, *, experimental=True, headers=None, timeout=60.0, max_bytes=None)

Download url and return raw bytes (stdlib urllib).

If max_bytes is set, read in chunks and raise ValueError if the body would exceed that size (partial data is not returned).

Source code in python/pydantable/io/http.py
def fetch_bytes(
    url: str,
    *,
    experimental: bool = True,
    headers: dict[str, str] | None = None,
    timeout: float = 60.0,
    max_bytes: int | None = None,
) -> bytes:
    """Return the raw body of the HTTP(S) resource at ``url`` (stdlib ``urllib``).

    Args:
        url: Must use the ``http`` or ``https`` scheme (case-insensitive).
        headers: Extra request headers; copied, never mutated.
        timeout: Timeout in seconds passed to ``urllib.request.urlopen``.
        max_bytes: Optional size cap. When set, the body is read in chunks and a
            ``ValueError`` is raised if it would exceed the cap (no partial data
            is returned).

    Raises:
        ValueError: Unsupported URL scheme, or the ``max_bytes`` cap was hit.
        OSError: Wraps any ``urllib.error.URLError`` raised by the request.
    """
    _require_experimental(experimental)
    url_scheme = urlparse(url).scheme.lower()
    if url_scheme not in {"http", "https"}:
        raise ValueError(
            f"fetch_bytes only supports http(s) URLs, got scheme={url_scheme!r}"
        )
    request_headers = dict(headers) if headers else {}
    request = urllib.request.Request(url, headers=request_headers)
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            body = _read_limited(response, max_bytes)
    except urllib.error.URLError as exc:
        raise OSError(f"failed to fetch {url!r}: {exc}") from exc
    return body

fetch_csv_url

fetch_csv_url(url, *, experimental=True, **kwargs)

Download a CSV file from url to a temporary file and read it with the native Rust CSV reader. If that reader fails, fall back to the stdlib csv module.

Source code in python/pydantable/io/http.py
def fetch_csv_url(
    url: str,
    *,
    experimental: bool = True,
    **kwargs: Any,
) -> dict[str, list[Any]]:
    """Download a CSV from ``url`` and materialize it as ``dict[str, list]``.

    The body is fetched with :func:`fetch_bytes` (extra ``kwargs`` such as
    ``headers``, ``timeout`` or ``max_bytes`` are forwarded to it), written to a
    temporary ``.csv`` file, and parsed with the native Rust CSV reader. If the
    native reader raises, the stdlib :mod:`csv` module is used instead. In that
    fallback, values are strings (``None`` for cells missing from short rows)
    and an empty file yields ``{}``. The temporary file is always removed.
    """
    _require_experimental(experimental)
    from pydantable_native.io_core import (  # type: ignore[import-not-found]
        rust_read_csv_path,
    )

    data = fetch_bytes(url, experimental=True, **kwargs)
    path, f = _write_temp_suffix(".csv")
    try:
        try:
            f.write(data)
        finally:
            # Close even if the write fails so the handle is not leaked (and the
            # unlink below can succeed on platforms that lock open files).
            f.close()
        try:
            return rust_read_csv_path(str(path))
        except Exception:
            # Native reader can surface PyO3-wrapped errors; stdlib csv is the fallback.
            import csv

            with path.open(newline="", encoding="utf-8") as fh:
                reader = csv.reader(fh)
                header = next(reader, None)
                if header is None:
                    # Empty body: no header row. Avoid leaking a bare StopIteration.
                    return {}
                cols: dict[str, list[Any]] = {h: [] for h in header}
                for row in reader:
                    for i, h in enumerate(header):
                        cols[h].append(row[i] if i < len(row) else None)
                return cols
    finally:
        path.unlink(missing_ok=True)

fetch_parquet_url

fetch_parquet_url(url, *, experimental=True, columns=None, **kwargs)

Download a Parquet file from url (HTTP(S) only) and materialize as dict[str, list].

Extra kwargs are forwarded to fetch_bytes (e.g. max_bytes, timeout).

Source code in python/pydantable/io/http.py
def fetch_parquet_url(
    url: str,
    *,
    experimental: bool = True,
    columns: list[str] | None = None,
    **kwargs: Any,
) -> dict[str, list[Any]]:
    """Fetch a Parquet file over HTTP(S) and return its columns as ``dict[str, list]``.

    ``columns`` optionally restricts which columns are decoded. Any remaining
    ``kwargs`` (e.g. ``max_bytes``, ``timeout``, ``headers``) are passed on to
    :func:`fetch_bytes`, which also enforces the http(s)-only rule.
    """
    from .arrow import read_parquet_pyarrow

    _require_experimental(experimental)
    payload = fetch_bytes(url, experimental=True, **kwargs)
    decoded = read_parquet_pyarrow(payload, columns=columns)
    return decoded

read_from_object_store

read_from_object_store(uri, *, experimental=True, format='parquet', max_bytes=None, **kwargs)

Read s3://, gs://, or az:// style URIs via fsspec (optional dependency).

Experimental: requires pip install 'pydantable[cloud]' (or fsspec + backend).

max_bytes caps how much of the object is read into memory (streaming reads the remote file in chunks until the limit). Full streaming without a cap is not implemented.

Source code in python/pydantable/io/http.py
def read_from_object_store(
    uri: str,
    *,
    experimental: bool = True,
    format: str = "parquet",
    max_bytes: int | None = None,
    **kwargs: Any,
) -> dict[str, list[Any]]:
    """
    Read ``s3://``, ``gs://``, or ``az://`` style URIs via ``fsspec`` (optional dependency).

    *Experimental*: requires ``pip install 'pydantable[cloud]'`` (or ``fsspec`` + backend).

    Args:
        uri: Object-store URI opened with ``fsspec.open``. ``http(s)`` URIs are
            rejected; use :func:`fetch_parquet_url` / :func:`fetch_csv_url`.
        format: ``"parquet"``, ``"csv"``, ``"ndjson"`` or ``"jsonl"``
            (case-insensitive). Validated before any data is downloaded.
        max_bytes: Caps how much of the object is read into memory (the remote
            file is read in chunks until the limit). Full streaming without a
            cap is not implemented.
        **kwargs: Accepted for API compatibility; not used by this function.

    Raises:
        ImportError: ``fsspec`` is not installed.
        ValueError: ``http(s)`` URI or unsupported ``format``.
        OSError: The object could not be opened or read via ``fsspec``.
    """
    _require_experimental(experimental)
    try:
        import fsspec  # type: ignore[import-not-found]
    except ImportError as e:
        raise ImportError(
            "object-store URIs require fsspec (pip install 'pydantable[cloud]'). "
            "If fsspec is installed, install a backend for your URI scheme (e.g. s3fs for s3://)."
        ) from e
    scheme = urlparse(uri).scheme.lower()
    if scheme in ("http", "https"):
        raise ValueError("use fetch_parquet_url / fetch_csv_url for http(s) URLs")
    fmt = format.lower()
    if fmt not in ("parquet", "csv", "ndjson", "jsonl"):
        # Fail fast: don't download an object we cannot decode.
        raise ValueError(f"unsupported format={format!r} (use parquet, csv, ndjson)")
    try:
        with fsspec.open(uri, "rb") as f:  # type: ignore[call-arg]
            raw = _read_limited(f, max_bytes)
    except OSError as e:
        raise OSError(
            f"failed to open or read {uri!r} via fsspec (check URI and backend drivers): {e}"
        ) from e

    if fmt == "parquet":
        from .arrow import read_parquet_pyarrow

        return read_parquet_pyarrow(raw)

    # Only the text formats need the native readers; import them lazily so a
    # parquet read does not require pydantable_native.io_core.
    from pydantable_native.io_core import (  # type: ignore[import-not-found]  # noqa: I001
        rust_read_csv_path,
        rust_read_ndjson_path,
    )

    if fmt == "csv":
        suffix, native_reader = ".csv", rust_read_csv_path
    else:
        suffix, native_reader = ".ndjson", rust_read_ndjson_path
    path, out = _write_temp_suffix(suffix)
    try:
        try:
            out.write(raw)
        finally:
            # Close even if the write fails so the handle is not leaked.
            out.close()
        return native_reader(str(path))
    finally:
        path.unlink(missing_ok=True)

iter_csv

iter_csv(path, *, batch_size=65536, encoding='utf-8', newline='')

Yield CSV rows in batches as dict[str, list].

Values are yielded as strings (or None for missing cells); downstream typed constructors can validate/coerce as needed.

Source code in python/pydantable/io/iter_file.py
def iter_csv(
    path: _PathLike | TextIO,
    *,
    batch_size: int = 65_536,
    encoding: str = "utf-8",
    newline: str = "",
) -> Iterator[dict[str, list[Any]]]:
    """
    Yield CSV rows in batches as ``dict[str, list]``.

    The first row is the header. Values are yielded as strings (``None`` for
    cells missing from short rows; cells beyond the header width are dropped).
    Downstream typed constructors can validate/coerce as needed. An empty input
    yields nothing.

    Args:
        path: A filesystem path (``str`` or any :class:`os.PathLike`, e.g.
            :class:`pathlib.Path`) or an already-open text stream. ``encoding``
            and ``newline`` are only used when this function opens the path.
        batch_size: Maximum number of rows per yielded batch; must be positive.

    Raises:
        ValueError: ``batch_size`` is not positive. Because this is a generator,
            the error is raised on first iteration.
    """
    import os

    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")
    if isinstance(path, (str, os.PathLike)):
        # Any PathLike (not only pathlib.Path) is opened here; open() accepts all.
        with open(path, newline=newline, encoding=encoding) as fh:
            yield from iter_csv(
                fh,
                batch_size=batch_size,
                encoding=encoding,
                newline=newline,
            )
        return

    reader = csv.reader(path)
    try:
        header = next(reader)
    except StopIteration:
        return
    header = [str(h) for h in header]
    cols: dict[str, list[Any]] = {h: [] for h in header}
    n = 0
    for row in reader:
        for i, h in enumerate(header):
            cols[h].append(row[i] if i < len(row) else None)
        n += 1
        if n >= batch_size:
            ensure_rectangular(cols)
            yield cols
            cols = {h: [] for h in header}
            n = 0
    if n:
        ensure_rectangular(cols)
        yield cols

iter_ipc

iter_ipc(source, *, batch_size=65536, as_stream=False)

Yield Arrow IPC in batches as dict[str, list].

  • as_stream=False reads IPC file format.
  • as_stream=True reads IPC stream format.
Source code in python/pydantable/io/iter_file.py
def iter_ipc(
    source: _PathLike | BinaryIO | bytes,
    *,
    batch_size: int = 65_536,
    as_stream: bool = False,
) -> Iterator[dict[str, list[Any]]]:
    """
    Yield Arrow IPC data in batches as ``dict[str, list]``.

    - ``as_stream=False`` reads IPC file format.
    - ``as_stream=True`` reads IPC stream format.

    ``source`` may be a filesystem path (``str`` or :class:`os.PathLike`), an
    in-memory ``bytes`` / ``bytearray`` / ``memoryview``, or a binary file-like
    object handed directly to ``pyarrow.ipc``. Stored record batches larger than
    ``batch_size`` rows are split into slices of at most ``batch_size`` rows.
    Smaller stored batches are yielded as-is; they are not merged.

    Requires ``pyarrow`` (install ``pydantable[arrow]``).
    """
    import os

    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")
    try:
        import pyarrow as pa  # type: ignore[import-not-found]
        from pyarrow import ipc  # type: ignore[import-not-found]
    except ImportError as e:
        raise ImportError(
            "iter_ipc requires pyarrow (pip install 'pydantable[arrow]')."
        ) from e

    open_reader = ipc.open_stream if as_stream else ipc.open_file
    if isinstance(source, (str, os.PathLike)):
        # fsdecode: str for str/Path, and avoids bytes paths being read as data.
        reader = open_reader(os.fsdecode(source))
    elif isinstance(source, (bytes, bytearray, memoryview)):
        reader = open_reader(pa.py_buffer(source))
    else:
        # Binary file-like object: pyarrow accepts it directly, whereas
        # ``pa.py_buffer`` needs the buffer protocol and would raise.
        reader = open_reader(source)

    def _batches() -> Iterator[Any]:  # RecordBatch
        if as_stream:
            yield from reader
            return
        for i in range(reader.num_record_batches):
            yield reader.get_batch(i)

    with reader:
        for batch in _batches():
            n_rows = batch.num_rows
            if n_rows <= batch_size:
                # Includes zero-row batches, which are still yielded (as before).
                pieces = [batch]
            else:
                # Honour batch_size: zero-copy slices of at most batch_size rows.
                pieces = [
                    batch.slice(offset, batch_size)
                    for offset in range(0, n_rows, batch_size)
                ]
            for piece in pieces:
                out = {k: list(v) for k, v in piece.to_pydict().items()}
                ensure_rectangular(out)
                yield out

iter_json_array

iter_json_array(path, *, batch_size=65536, encoding='utf-8')

Yield a JSON array-of-objects file in batches.

This is not incremental JSON parsing: the full file is currently loaded then chunked. Provided for API uniformity.

Source code in python/pydantable/io/iter_file.py
def iter_json_array(
    path: _PathLike | TextIO,
    *,
    batch_size: int = 65_536,
    encoding: str = "utf-8",
) -> Iterator[dict[str, list[Any]]]:
    """
    Yield a JSON array-of-objects file in batches.

    This is not incremental JSON parsing: the full file is currently loaded then
    chunked. Provided for API uniformity.

    Args:
        path: A filesystem path (``str`` or any :class:`os.PathLike`) or an
            already-open text stream. ``encoding`` only applies when this
            function opens the path.
        batch_size: Maximum number of objects per yielded batch; must be positive.

    Raises:
        ValueError: ``batch_size`` is not positive, the top-level value is not
            an array, or an element is not an object (its index is reported).
            Batches preceding a bad element are still yielded first.
    """
    import os

    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")
    if isinstance(path, (str, os.PathLike)):
        with open(path, encoding=encoding) as fh:
            yield from iter_json_array(fh, batch_size=batch_size, encoding=encoding)
        return

    data = json.load(path)
    if not isinstance(data, list):
        raise ValueError("JSON array reader expects a top-level array")
    rows: list[dict[str, Any]] = []
    for idx, item in enumerate(data):
        if not isinstance(item, dict):
            raise ValueError(f"JSON array elements must be objects (index {idx})")
        rows.append(item)
        if len(rows) >= batch_size:
            yield _rows_to_columns(rows)
            rows = []
    if rows:
        yield _rows_to_columns(rows)

iter_json_lines

iter_json_lines(path, *, batch_size=65536, encoding='utf-8')

Alias of iter_ndjson (JSON Lines and NDJSON are the same format).

Source code in python/pydantable/io/iter_file.py
def iter_json_lines(
    path: _PathLike | TextIO,
    *,
    batch_size: int = 65_536,
    encoding: str = "utf-8",
) -> Iterator[dict[str, list[Any]]]:
    """JSON Lines reader; identical to :func:`iter_ndjson` (NDJSON is JSON Lines).

    All arguments are passed through unchanged and every batch is re-yielded.
    """
    options = {"batch_size": batch_size, "encoding": encoding}
    yield from iter_ndjson(path, **options)

iter_ndjson

iter_ndjson(path, *, batch_size=65536, encoding='utf-8')

Yield NDJSON (JSON Lines) as dict[str, list] batches.

Each line must be a JSON object; keys are unioned within each batch.

Source code in python/pydantable/io/iter_file.py
def iter_ndjson(
    path: _PathLike | TextIO,
    *,
    batch_size: int = 65_536,
    encoding: str = "utf-8",
) -> Iterator[dict[str, list[Any]]]:
    """
    Yield NDJSON (JSON Lines) as ``dict[str, list]`` batches.

    Each non-blank line must be a JSON object; keys are unioned within each batch.

    Args:
        path: A filesystem path (``str`` or any :class:`os.PathLike`) or an
            already-open text stream. ``encoding`` only applies when this
            function opens the path.
        batch_size: Maximum number of objects per yielded batch; must be positive.

    Raises:
        ValueError: ``batch_size`` is not positive, or a line holds a non-object
            JSON value (the 1-based line number is reported).
        json.JSONDecodeError: A line is not valid JSON. The same exception type
            as before, with the 1-based line number added to the message.
    """
    import os

    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")
    if isinstance(path, (str, os.PathLike)):
        with open(path, encoding=encoding) as fh:
            yield from iter_ndjson(fh, batch_size=batch_size, encoding=encoding)
        return

    rows: list[dict[str, Any]] = []
    # lineno counts physical lines (blank ones included) so errors are locatable.
    for lineno, raw_line in enumerate(path, start=1):
        line = raw_line.strip()
        if not line:
            continue
        try:
            obj = json.loads(line)
        except json.JSONDecodeError as e:
            raise json.JSONDecodeError(
                f"{e.msg} (NDJSON line {lineno})", e.doc, e.pos
            ) from e
        if not isinstance(obj, dict):
            raise ValueError(f"NDJSON lines must be JSON objects (line {lineno})")
        rows.append(obj)
        if len(rows) >= batch_size:
            yield _rows_to_columns(rows)
            rows = []
    if rows:
        yield _rows_to_columns(rows)

iter_parquet

iter_parquet(path, *, batch_size=65536, columns=None)

Yield Parquet data in batches as dict[str, list].

Requires pyarrow (install pydantable[arrow]).

Source code in python/pydantable/io/iter_file.py
def iter_parquet(
    path: _PathLike,
    *,
    batch_size: int = 65_536,
    columns: list[str] | None = None,
) -> Iterator[dict[str, list[Any]]]:
    """
    Stream a Parquet file as ``dict[str, list]`` batches.

    Batches come from ``pyarrow.parquet.ParquetFile.iter_batches`` with the given
    ``batch_size``; ``columns`` optionally limits which columns are read.

    Requires `pyarrow` (install `pydantable[arrow]`).
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")
    try:
        from pyarrow import parquet as pq  # type: ignore[import-not-found,import-untyped]
    except ImportError as err:
        raise ImportError(
            "iter_parquet requires pyarrow (pip install 'pydantable[arrow]')."
        ) from err

    parquet_file = pq.ParquetFile(str(path))
    record_batches = parquet_file.iter_batches(batch_size=batch_size, columns=columns)
    for rb in record_batches:
        chunk = {name: list(values) for name, values in rb.to_pydict().items()}
        ensure_rectangular(chunk)
        yield chunk

afetch_mongo_async async

afetch_mongo_async(collection, *, match=None, projection=None, sort=None, skip=None, limit=None, fields=None, session=None, max_time_ms=None)

Async version of fetch_mongo for pymongo.asynchronous.collection.AsyncCollection.

Source code in python/pydantable/io/mongo.py
async def afetch_mongo_async(
    collection: Any,
    *,
    match: Mapping[str, Any] | None = None,
    projection: Mapping[str, Any] | int | bool | None = None,
    sort: Sequence[tuple[str, int]] | None = None,
    skip: int | None = None,
    limit: int | None = None,
    fields: Sequence[str] | None = None,
    session: Any | None = None,
    max_time_ms: int | None = None,
) -> dict[str, list[Any]]:
    """Async counterpart of :func:`fetch_mongo` for a PyMongo ``AsyncCollection``.

    Builds the cursor with ``_build_async_cursor`` from the same query options,
    drains it completely into memory, and converts the collected documents into
    one column dict with ``_docs_to_column_dict`` (honouring ``fields``).
    """
    _require_pymongo()
    cursor = _build_async_cursor(
        collection,
        match=match,
        projection=projection,
        sort=sort,
        skip=skip,
        limit=limit,
        batch_size=0,
        session=session,
        max_time_ms=max_time_ms,
    )
    documents: list[dict[str, Any]] = [dict(doc) async for doc in cursor]
    return _docs_to_column_dict(documents, fields=fields)

aiter_mongo_async async

aiter_mongo_async(collection, *, match=None, projection=None, sort=None, skip=None, limit=None, batch_size=1000, fields=None, session=None, max_time_ms=None)

Async version of iter_mongo for pymongo.asynchronous.collection.AsyncCollection.

Source code in python/pydantable/io/mongo.py
async def aiter_mongo_async(
    collection: Any,
    *,
    match: Mapping[str, Any] | None = None,
    projection: Mapping[str, Any] | int | bool | None = None,
    sort: Sequence[tuple[str, int]] | None = None,
    skip: int | None = None,
    limit: int | None = None,
    batch_size: int = 1000,
    fields: Sequence[str] | None = None,
    session: Any | None = None,
    max_time_ms: int | None = None,
) -> AsyncIterator[dict[str, list[Any]]]:
    """Async counterpart of :func:`iter_mongo` for a PyMongo ``AsyncCollection``.

    Documents are buffered until ``batch_size`` of them have been read; each full
    buffer (and any non-empty remainder at the end) is converted with
    ``_docs_to_column_dict`` and yielded. ``batch_size`` must be positive.
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")
    _require_pymongo()
    cursor = _build_async_cursor(
        collection,
        match=match,
        projection=projection,
        sort=sort,
        skip=skip,
        limit=limit,
        batch_size=batch_size,
        session=session,
        max_time_ms=max_time_ms,
    )
    pending: list[dict[str, Any]] = []
    async for document in cursor:
        pending.append(dict(document))
        if len(pending) < batch_size:
            continue
        yield _docs_to_column_dict(pending, fields=fields)
        pending = []
    if pending:
        yield _docs_to_column_dict(pending, fields=fields)

awrite_mongo_async async

awrite_mongo_async(collection, data, *, ordered=True, chunk_size=None, session=None)

Async version of write_mongo for pymongo.asynchronous.collection.AsyncCollection.

Source code in python/pydantable/io/mongo.py
async def awrite_mongo_async(
    collection: Any,
    data: dict[str, list[Any]],
    *,
    ordered: bool = True,
    chunk_size: int | None = None,
    session: Any | None = None,
) -> int:
    """Async counterpart of :func:`write_mongo` for a PyMongo ``AsyncCollection``.

    Converts the rectangular column dict into row documents and awaits
    ``insert_many`` once per chunk (chunk length from ``_write_chunk_size``).
    Returns the total number of inserted ids; empty ``data`` or zero rows
    returns ``0`` without touching the collection.
    """
    _require_pymongo()
    if not data:
        return 0
    ensure_rectangular(data)
    row_count = len(next(iter(data.values())))
    if row_count == 0:
        return 0
    columns = list(data.keys())
    step = _write_chunk_size(chunk_size)
    extra: dict[str, Any] = {} if session is None else {"session": session}
    inserted = 0
    for lo in range(0, row_count, step):
        hi = min(lo + step, row_count)
        documents = [{col: data[col][i] for col in columns} for i in range(lo, hi)]
        outcome = await collection.insert_many(documents, ordered=ordered, **extra)
        inserted += len(outcome.inserted_ids)
    return inserted

fetch_mongo

fetch_mongo(collection, *, match=None, projection=None, sort=None, skip=None, limit=None, fields=None, session=None, max_time_ms=None)

Load all matching documents into a single dict[column_name, list].

Materializes the full cursor in memory; for large scans, prefer iter_mongo.

Source code in python/pydantable/io/mongo.py
def fetch_mongo(
    collection: Any,
    *,
    match: Mapping[str, Any] | None = None,
    projection: Mapping[str, Any] | int | bool | None = None,
    sort: Sequence[tuple[str, int]] | None = None,
    skip: int | None = None,
    limit: int | None = None,
    fields: Sequence[str] | None = None,
    session: Any | None = None,
    max_time_ms: int | None = None,
) -> dict[str, list[Any]]:
    """
    Run a ``find`` and return every matching document as one ``dict[column_name, list]``.

    The whole cursor is pulled into memory before conversion; for large scans
    use :func:`iter_mongo`, which yields bounded batches instead.
    """
    _require_pymongo()
    cursor = _build_cursor(
        collection,
        match=match,
        projection=projection,
        sort=sort,
        skip=skip,
        limit=limit,
        batch_size=0,
        session=session,
        max_time_ms=max_time_ms,
    )
    documents = list(map(dict, cursor))
    return _docs_to_column_dict(documents, fields=fields)

is_async_mongo_collection

is_async_mongo_collection(collection)

Return True for pymongo.asynchronous.collection.AsyncCollection instances. Detection is based on class module names, so pymongo does not need to be imported.

Source code in python/pydantable/io/mongo.py
def is_async_mongo_collection(collection: Any) -> bool:
    """True for :class:`pymongo.asynchronous.collection.AsyncCollection` instances.

    Detection is by module name, so no ``pymongo`` import is needed. The check
    returns ``True`` when the object's class, or any class in its MRO, is defined
    under the ``pymongo.asynchronous`` package. Walking the MRO means user
    subclasses of ``AsyncCollection`` (defined in their own modules) are
    recognised too. Note this matches any ``pymongo.asynchronous`` type, not
    only collections. Any ``Exception`` during inspection yields ``False``.
    """
    try:
        for klass in type(collection).__mro__:
            module = getattr(klass, "__module__", None)
            if isinstance(module, str) and module.startswith("pymongo.asynchronous"):
                return True
        return False
    except Exception:
        # Broad: tolerate pathological proxies or broken __class__ / __module__.
        return False

iter_mongo

iter_mongo(collection, *, match=None, projection=None, sort=None, skip=None, limit=None, batch_size=1000, fields=None, session=None, max_time_ms=None)

Yield dict[column_name, list] batches from a PyMongo Collection.find.

Each batch is rectangular. Document keys are merged per batch (sorted union unless fields fixes the column order). Install pymongo (or pydantable[mongo]).

Source code in python/pydantable/io/mongo.py
def iter_mongo(
    collection: Any,
    *,
    match: Mapping[str, Any] | None = None,
    projection: Mapping[str, Any] | int | bool | None = None,
    sort: Sequence[tuple[str, int]] | None = None,
    skip: int | None = None,
    limit: int | None = None,
    batch_size: int = 1000,
    fields: Sequence[str] | None = None,
    session: Any | None = None,
    max_time_ms: int | None = None,
) -> Iterator[dict[str, list[Any]]]:
    """
    Stream a PyMongo ``Collection.find`` as ``dict[column_name, list]`` batches.

    Up to ``batch_size`` documents are buffered per batch, and each batch is
    rectangular. Document keys are merged per batch (sorted union, unless
    ``fields`` fixes the column order). Install **pymongo** (or
    ``pydantable[mongo]``).
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")
    cursor = _build_cursor(
        collection,
        match=match,
        projection=projection,
        sort=sort,
        skip=skip,
        limit=limit,
        batch_size=batch_size,
        session=session,
        max_time_ms=max_time_ms,
    )
    buffered: list[dict[str, Any]] = []
    for document in cursor:
        buffered.append(dict(document))
        if len(buffered) < batch_size:
            continue
        yield _docs_to_column_dict(buffered, fields=fields)
        buffered = []
    if buffered:
        yield _docs_to_column_dict(buffered, fields=fields)

write_mongo

write_mongo(collection, data, *, ordered=True, chunk_size=None, session=None)

Insert rows from a rectangular column dict via insert_many.

Returns the number of inserted document ids. Empty data or zero rows is a no-op (returns 0).

Source code in python/pydantable/io/mongo.py
def write_mongo(
    collection: Any,
    data: dict[str, list[Any]],
    *,
    ordered: bool = True,
    chunk_size: int | None = None,
    session: Any | None = None,
) -> int:
    """
    Insert the rows of a rectangular column ``dict`` with ``insert_many``.

    Rows are sent in chunks (chunk length from ``_write_chunk_size``). The return
    value is the total count of inserted document ids. Empty ``data`` or zero
    rows is a no-op that returns ``0``.
    """
    _require_pymongo()
    if not data:
        return 0
    ensure_rectangular(data)
    row_count = len(next(iter(data.values())))
    if row_count == 0:
        return 0
    columns = list(data.keys())
    step = _write_chunk_size(chunk_size)
    extra: dict[str, Any] = {} if session is None else {"session": session}
    inserted = 0
    for lo in range(0, row_count, step):
        hi = min(lo + step, row_count)
        documents = [{col: data[col][i] for col in columns} for i in range(lo, hi)]
        outcome = collection.insert_many(documents, ordered=ordered, **extra)
        inserted += len(outcome.inserted_ids)
    return inserted

aread_csv_rap async

aread_csv_rap(path)

Load a CSV file with rapcsv.AsyncDictReader (non-blocking I/O).

Requires pip install 'pydantable[rap]' (rapcsv + rapfiles).

Source code in python/pydantable/io/rap_support.py
async def aread_csv_rap(path: str) -> dict[str, list[Any]]:
    """
    Read a CSV file into ``dict[str, list]`` with ``rapcsv.AsyncDictReader`` (non-blocking I/O).

    Columns follow the keys of the first row; values missing from later rows
    become ``None``. An empty file yields ``{}``.

    Requires ``pip install 'pydantable[rap]'`` (``rapcsv`` + ``rapfiles``).
    """
    try:
        import rapcsv  # type: ignore[import-not-found]
        import rapfiles  # type: ignore[import-not-found]
    except ImportError as e:
        raise ImportError(
            "aread_csv_rap requires rapcsv and rapfiles (pip install 'pydantable[rap]')."
        ) from e

    collected: list[dict[str, Any]] = []
    # ``rapfiles.open`` matches aiofiles (there is no ``aopen`` on rapfiles 0.2.x).
    # Pull rows with ``read_row()``: ``async for`` over ``AsyncDictReader`` can hang
    # in rapcsv 0.2.x when used with a rapfiles text handle.
    async with rapfiles.open(path, "r", encoding="utf-8", newline="") as handle:
        dict_reader = rapcsv.AsyncDictReader(handle)
        while row := await dict_reader.read_row():
            collected.append(row)
    if not collected:
        return {}
    header = list(collected[0].keys())
    return {name: [record.get(name) for record in collected] for name in header}

rap_csv_available

rap_csv_available()

True when rapcsv and rapfiles are importable and rapfiles.open exists.

Source code in python/pydantable/io/rap_support.py
def rap_csv_available() -> bool:
    """Report whether the optional ``rap`` CSV backend is usable.

    Returns ``True`` only if both ``rapcsv`` and ``rapfiles`` import cleanly and
    ``rapfiles`` exposes a callable ``open`` (its aiofiles-style entry point).
    """
    try:
        import rapcsv  # type: ignore[import-not-found,unused-ignore]  # noqa: F401
        import rapfiles  # type: ignore[import-not-found,unused-ignore]
    except ImportError:
        return False
    # rapfiles exposes aiofiles-compatible ``open``, not ``aopen`` (see rapfiles 0.2.x docs).
    opener = getattr(rapfiles, "open", None)
    return callable(opener)

fetch_sql

fetch_sql(sql, bind, *, parameters=None, batch_size=None, auto_stream=True, auto_stream_threshold_rows=None)

Deprecated: use fetch_sql_raw or fetch_sqlmodel. Executes sql and returns rows as dict[column_name, list] (materialized).

Source code in python/pydantable/io/sql.py
def fetch_sql(
    sql: str,
    bind: str | Engine | Connection,
    *,
    parameters: Mapping[str, Any] | None = None,
    batch_size: int | None = None,
    auto_stream: bool = True,
    auto_stream_threshold_rows: int | None = None,
) -> dict[str, list[Any]] | StreamingColumns:
    """
    Deprecated: use :func:`fetch_sql_raw` or :func:`fetch_sqlmodel`.

    Signals the deprecation through ``_warn_legacy_sql`` and then delegates to
    :func:`fetch_sql_raw` with every argument unchanged, returning its result.
    """
    _warn_legacy_sql(
        "fetch_sql",
        raw="fetch_sql_raw(...)",
        sqlmodel="fetch_sqlmodel(...)",
    )
    options = {
        "parameters": parameters,
        "batch_size": batch_size,
        "auto_stream": auto_stream,
        "auto_stream_threshold_rows": auto_stream_threshold_rows,
    }
    return fetch_sql_raw(sql, bind, **options)

fetch_sql_raw

fetch_sql_raw(sql, bind, *, parameters=None, batch_size=None, auto_stream=True, auto_stream_threshold_rows=None)

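Execute sql and return rows as dict[column_name, list] (materialized). When auto_stream is true and the row count exceeds the auto-stream threshold, the batches are returned wrapped in StreamingColumns instead.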

bind may be any SQLAlchemy URL your environment has drivers for, or a sqlalchemy.engine.Engine / sqlalchemy.engine.Connection. Use bound parameters only — never interpolate untrusted input into sql.

Source code in python/pydantable/io/sql.py
def fetch_sql_raw(
    sql: str,
    bind: str | Engine | Connection,
    *,
    parameters: Mapping[str, Any] | None = None,
    batch_size: int | None = None,
    auto_stream: bool = True,
    auto_stream_threshold_rows: int | None = None,
) -> dict[str, list[Any]] | StreamingColumns:
    """
    Execute ``sql`` and return rows as ``dict[column_name, list]`` (materialized).

    Batches from :func:`iter_sql_raw` are collected first. A query with no rows
    gives ``{}``. When ``auto_stream`` is true and the total row count exceeds
    the auto-stream threshold, the collected batches are returned wrapped in
    ``StreamingColumns``. A single batch is returned as-is; otherwise the batches
    are concatenated column by column.

    ``bind`` may be any SQLAlchemy **URL** your environment has drivers for, or a
    :class:`~sqlalchemy.engine.Engine` / :class:`~sqlalchemy.engine.Connection`.
    Use **bound parameters** only — never interpolate untrusted input into ``sql``.
    """
    rows_per_batch = _fetch_batch_size(batch_size)
    stream_threshold = _auto_stream_threshold_rows(auto_stream_threshold_rows)

    collected: list[dict[str, list[Any]]] = []
    row_total = 0
    for batch in iter_sql_raw(sql, bind, parameters=parameters, batch_size=rows_per_batch):
        if batch:
            collected.append(batch)
            # iter_sql_raw batches are rectangular, so any column gives the row count
            row_total += len(next(iter(batch.values())))

    if not collected:
        return {}
    # row_total only grows, so checking once at the end matches a per-batch check.
    if auto_stream and row_total > stream_threshold:
        return StreamingColumns(collected)
    if len(collected) == 1:
        return collected[0]
    column_names = list(collected[0].keys())
    return {
        name: [value for batch in collected for value in batch.get(name, [])]
        for name in column_names
    }

iter_sql

iter_sql(sql, bind, *, parameters=None, batch_size=None)

Deprecated: use iter_sql_raw or iter_sqlmodel. Executes sql and yields results in batches as dict[column_name, list].

Source code in python/pydantable/io/sql.py
def iter_sql(
    sql: str,
    bind: str | Engine | Connection,
    *,
    parameters: Mapping[str, Any] | None = None,
    batch_size: int | None = None,
) -> Iterator[dict[str, list[Any]]]:
    """
    Deprecated: use :func:`iter_sql_raw` or :func:`iter_sqlmodel`.

    Signals the deprecation via ``_warn_legacy_sql`` immediately (at call time,
    not on first iteration), then returns :func:`iter_sql_raw`'s batch iterator.
    """
    _warn_legacy_sql(
        "iter_sql",
        raw="iter_sql_raw(...)",
        sqlmodel="iter_sqlmodel(...)",
    )
    batches = iter_sql_raw(sql, bind, parameters=parameters, batch_size=batch_size)
    return batches

iter_sql_raw

iter_sql_raw(sql, bind, *, parameters=None, batch_size=None)

Execute sql and yield results in batches as dict[column_name, list].

Streaming alternative to fetch_sql_raw for large result sets.

Notes:

  • sql should be a SELECT (or another statement that returns rows).
  • Use bound parameters only — never interpolate untrusted input into sql.
  • bind may be a SQLAlchemy URL string, Engine, or Connection.

Source code in python/pydantable/io/sql.py
def iter_sql_raw(
    sql: str,
    bind: str | Engine | Connection,
    *,
    parameters: Mapping[str, Any] | None = None,
    batch_size: int | None = None,
) -> Iterator[dict[str, list[Any]]]:
    """
    Execute ``sql`` and yield results in batches as ``dict[column_name, list]``.

    Streaming alternative to :func:`fetch_sql_raw` for large result sets.

    Notes:
    - ``sql`` should be a ``SELECT`` (or other statement returning rows).
    - Use **bound parameters** only — never interpolate untrusted input into ``sql``.
    - ``bind`` may be a SQLAlchemy URL string, ``Engine``, or ``Connection``.
    - ``stream_results`` is set on the statement, not the connection, so a
      caller-supplied ``Connection`` keeps its own execution options.
    - The result is closed when iteration ends or the generator is closed early.
      An engine created here from a URL string is disposed afterwards, so its
      connection pool does not outlive the iteration.
    """
    bs = _fetch_batch_size(batch_size)

    from sqlalchemy import create_engine, text
    from sqlalchemy.engine import Connection as SAConnection
    from sqlalchemy.engine import Engine as SAEngine

    params = dict(parameters or {})
    # Statement-level option: Connection.execution_options() would mutate a
    # caller-owned Connection in place (SQLAlchemy 2.x).
    stmt = text(sql).execution_options(stream_results=True)

    def _drain(conn: Any) -> Iterator[dict[str, list[Any]]]:
        # Yield bs-row chunks from one streamed result; always close the result.
        result = conn.execute(stmt, params)
        try:
            mapped = result.mappings()
            while True:
                chunk = mapped.fetchmany(bs)
                if not chunk:
                    break
                yield mappings_rows_to_column_dict(chunk)
        finally:
            result.close()

    if isinstance(bind, SAConnection):
        yield from _drain(bind)
        return

    owns_engine = not isinstance(bind, SAEngine)
    eng = create_engine(bind) if owns_engine else bind
    try:
        with eng.connect() as conn:
            yield from _drain(conn)
    finally:
        if owns_engine:
            # Release the pool of an engine we created; caller engines are left alone.
            eng.dispose()

write_sql

write_sql(data, table_name, bind, *, schema=None, if_exists='append', chunk_size=None)

Deprecated: use write_sql_raw or write_sqlmodel. Inserts data (column dict) into table_name.

Source code in python/pydantable/io/sql.py
def write_sql(
    data: dict[str, list[Any]],
    table_name: str,
    bind: str | Engine | Connection,
    *,
    schema: str | None = None,
    if_exists: str = "append",
    chunk_size: int | None = None,
) -> None:
    """
    Deprecated: use :func:`write_sql_raw` or :func:`write_sqlmodel`.

    Signals the deprecation through ``_warn_legacy_sql``, then forwards every
    argument unchanged to :func:`write_sql_raw`. Returns ``None``.
    """
    _warn_legacy_sql(
        "write_sql",
        raw="write_sql_raw(...)",
        sqlmodel="write_sqlmodel(...)",
    )
    options = {"schema": schema, "if_exists": if_exists, "chunk_size": chunk_size}
    write_sql_raw(data, table_name, bind, **options)

write_sql_raw

write_sql_raw(data, table_name, bind, *, schema=None, if_exists='append', chunk_size=None)

Insert data (column dict) into table_name.

  • append: table must already exist; rows are appended.
  • replace: drops the table if it exists, recreates it with inferred column types, then inserts. table_name / schema must be trusted identifiers (not user-controlled).

bind is any SQLAlchemy-supported URL or Engine (same driver rules as fetch_sql_raw). if_exists="replace" uses generic DDL; exotic dialects may need app-specific migrations instead.

Source code in python/pydantable/io/sql.py
def write_sql_raw(
    data: dict[str, list[Any]],
    table_name: str,
    bind: str | Engine | Connection,
    *,
    schema: str | None = None,
    if_exists: str = "append",
    chunk_size: int | None = None,
) -> None:
    """
    Insert ``data`` (column dict) into ``table_name``.

    * ``append``: table must already exist; rows are appended.
    * ``replace``: drops the table if it exists, recreates it with inferred column
      types, then inserts. ``table_name`` / ``schema`` must be **trusted**
      identifiers (not user-controlled).

    ``bind`` is any SQLAlchemy-supported **URL** or **Engine** (same driver rules as
    ``fetch_sql_raw``). ``if_exists="replace"`` uses generic DDL; exotic dialects may
    need app-specific migrations instead.

    An empty ``data`` dict is a no-op. All DDL and inserts run in one
    ``engine.begin()`` transaction.

    Raises:
        ValueError: ``if_exists`` is not ``"append"``/``"replace"``, the columns have
            different lengths, or (``append``) the table does not exist.
    """
    from sqlalchemy import MetaData, Table, insert, inspect
    from sqlalchemy.schema import CreateTable, DropTable

    if if_exists not in ("append", "replace"):
        raise ValueError("if_exists must be 'append' or 'replace'")
    if not data:
        return
    column_lengths = {len(values) for values in data.values()}
    if len(column_lengths) != 1:
        raise ValueError("all columns in data must have the same length")
    (row_count,) = column_lengths
    names = list(data)
    step = _write_chunk_size(chunk_size)

    def _row_chunks():
        # Materialise at most ``step`` row dicts at a time so the payload is not
        # duplicated in memory all at once.
        for lo in range(0, row_count, step):
            hi = min(lo + step, row_count)
            yield [{name: data[name][idx] for name in names} for idx in range(lo, hi)]

    engine = _to_engine(bind)
    table_exists = inspect(engine).has_table(table_name, schema=schema)

    with engine.begin() as conn:
        if if_exists == "replace":
            if table_exists:
                # A column-less Table is enough to emit DROP TABLE.
                conn.execute(DropTable(Table(table_name, MetaData(), schema=schema)))
            target = Table(
                table_name,
                MetaData(),
                *_infer_columns(data),
                schema=schema,
            )
            conn.execute(CreateTable(target))
        else:
            if not table_exists:
                raise ValueError(
                    f"table {table_name!r} does not exist (if_exists='append')"
                )
            # Reflect the live table so inserts use its real column types.
            target = Table(table_name, MetaData(), schema=schema, autoload_with=conn)
        for chunk in _row_chunks():
            conn.execute(insert(target), chunk)

fetch_sqlmodel

fetch_sqlmodel(model, bind, *, where=None, parameters=None, columns=None, order_by=None, limit=None, batch_size=None, auto_stream=True, auto_stream_threshold_rows=None)

Load rows for model into dict[column_name, list] (or StreamingColumns).

Semantics match pydantable.io.fetch_sql for batch_size, auto_stream, and auto_stream_threshold_rows.

Source code in python/pydantable/io/sqlmodel_read.py
def fetch_sqlmodel(
    model: type[Any],
    bind: str | Engine | Connection,
    *,
    where: Any | None = None,
    parameters: Mapping[str, Any] | None = None,
    columns: Sequence[Any] | None = None,
    order_by: Sequence[Any] | None = None,
    limit: int | None = None,
    batch_size: int | None = None,
    auto_stream: bool = True,
    auto_stream_threshold_rows: int | None = None,
) -> dict[str, list[Any]] | StreamingColumns:
    """
    Load rows for ``model`` into ``dict[column_name, list]`` (or :class:`StreamingColumns`).

    Semantics match :func:`pydantable.io.fetch_sql` for ``batch_size``,
    ``auto_stream``, and ``auto_stream_threshold_rows``.

    Every non-empty batch from :func:`iter_sqlmodel` is collected. Then:

    * no rows: returns ``{}``;
    * ``auto_stream`` and more rows than the threshold: returns the batches wrapped
      in :class:`StreamingColumns`;
    * otherwise the batches are concatenated column-wise, keyed by the first batch.
    """
    batch_rows = _fetch_batch_size(batch_size)
    threshold = _auto_stream_threshold_rows(auto_stream_threshold_rows)

    collected: list[dict[str, list[Any]]] = []
    row_total = 0
    batch_iter = iter_sqlmodel(
        model,
        bind,
        where=where,
        parameters=parameters,
        columns=columns,
        order_by=order_by,
        limit=limit,
        batch_size=batch_rows,
    )
    for batch in batch_iter:
        if batch:
            collected.append(batch)
            row_total += len(next(iter(batch.values())))

    if not collected:
        return {}
    # The running total only grows, so one check after the loop is equivalent
    # to checking after every batch.
    if auto_stream and row_total > threshold:
        return StreamingColumns(collected)
    if len(collected) == 1:
        return collected[0]

    merged: dict[str, list[Any]] = {key: [] for key in collected[0]}
    for batch in collected:
        for key, values in merged.items():
            values.extend(batch.get(key, []))
    return merged

iter_sqlmodel

iter_sqlmodel(model, bind, *, where=None, parameters=None, columns=None, order_by=None, limit=None, batch_size=None)

Stream rows for model as dict[column_name, list] batches.

model must be a sqlmodel.SQLModel subclass with table=True.

Source code in python/pydantable/io/sqlmodel_read.py
def iter_sqlmodel(
    model: type[Any],
    bind: str | Engine | Connection,
    *,
    where: Any | None = None,
    parameters: Mapping[str, Any] | None = None,
    columns: Sequence[Any] | None = None,
    order_by: Sequence[Any] | None = None,
    limit: int | None = None,
    batch_size: int | None = None,
) -> Iterator[dict[str, list[Any]]]:
    """
    Stream rows for ``model`` as ``dict[column_name, list]`` batches.

    ``model`` must be a :class:`sqlmodel.SQLModel` subclass with ``table=True``.

    ``bind`` may be:

    * a SQLAlchemy ``Connection``: used as-is and left open for the caller;
    * an ``Engine``: a connection is checked out for the duration of iteration;
    * a URL string: a private engine is created for this call and disposed when
      iteration ends (including early termination of the generator).

    Rows are fetched with ``stream_results=True`` in batches of ``batch_size``. The
    result is closed even if the consumer stops iterating early.
    """
    _ensure_table_model(model)
    stmt = _build_select(
        model,
        where=where,
        columns=columns,
        order_by=order_by,
        limit=limit,
    )
    bs = _fetch_batch_size(batch_size)
    params = dict(parameters or {})

    from sqlalchemy import create_engine
    from sqlalchemy.engine import Connection as SAConnection
    from sqlalchemy.engine import Engine as SAEngine

    def _drain(conn: Any) -> Iterator[dict[str, list[Any]]]:
        # Execute the select on ``conn`` and yield column-dict batches.
        result = conn.execution_options(stream_results=True).execute(stmt, params)
        try:
            rows = result.mappings()
            while chunk := rows.fetchmany(bs):
                yield mappings_rows_to_column_dict(chunk)
        finally:
            # Release the (possibly server-side) cursor even when the consumer
            # abandons the generator; the caller's Connection stays usable.
            result.close()

    if isinstance(bind, SAConnection):
        yield from _drain(bind)
        return

    owns_engine = not isinstance(bind, SAEngine)
    eng = create_engine(bind) if owns_engine else bind
    try:
        with eng.connect() as conn:
            yield from _drain(conn)
    finally:
        if owns_engine:
            # The URL-built engine is private to this call: dispose its pool so
            # database connections are not left open until garbage collection.
            eng.dispose()

sqlmodel_columns

sqlmodel_columns(model)

Return ordered SQLAlchemy column keys for model (table=True SQLModel).

Matches the default key set returned by pydantable.io.fetch_sqlmodel and expected by pydantable.io.write_sqlmodel for a full-row payload.

Source code in python/pydantable/io/sqlmodel_schema.py
def sqlmodel_columns(model: type[Any]) -> list[str]:
    """
    Return the ordered SQLAlchemy column keys of ``model`` (a ``table=True`` SQLModel).

    The order is that of ``model.__table__.columns``. It matches the default key set
    returned by :func:`~pydantable.io.fetch_sqlmodel` and expected by
    :func:`~pydantable.io.write_sqlmodel` for a full-row payload.
    """
    _require_sqlmodel()
    _ensure_table_model(model)
    table = model.__table__
    return [*table.columns.keys()]

write_sqlmodel

write_sqlmodel(data, model, bind, *, schema=None, if_exists='append', chunk_size=None, validate_rows=False, replace_ok=False)

Insert data into the table defined by model (table=True SQLModel).

  • append: table must already exist.
  • replace: drops the table if present, recreates from model.__table__, inserts. Requires replace_ok=True (destructive).

validate_rows=True runs model.model_validate per row; failures include the row index in the error.

Source code in python/pydantable/io/sqlmodel_write.py
def write_sqlmodel(
    data: dict[str, list[Any]],
    model: type[Any],
    bind: str | Engine | Connection,
    *,
    schema: str | None = None,
    if_exists: str = "append",
    chunk_size: int | None = None,
    validate_rows: bool = False,
    replace_ok: bool = False,
) -> None:
    """
    Insert ``data`` into the table defined by ``model`` (``table=True`` SQLModel).

    * ``append``: table must already exist.
    * ``replace``: drops the table if present, recreates from ``model.__table__``, inserts.
      Requires ``replace_ok=True`` (destructive).

    ``validate_rows=True`` runs ``model.model_validate`` per row; failures include the
    row index in the error. Validation covers every row before any SQL is issued.
    The existence check, DDL, and inserts all run on the single connection opened by
    ``engine.begin()``.

    Raises:
        ValueError: ``replace`` without ``replace_ok``, columns of different lengths,
            a ``schema`` that disagrees with ``model.__table__.schema``, a row failing
            validation, or (``append``) a missing table.
    """
    _require_sqlmodel()
    _ensure_table_model(model)
    _validate_if_exists(if_exists)
    if if_exists == "replace" and not replace_ok:
        raise ValueError(
            "if_exists='replace' is destructive (DROP + CREATE). "
            "Pass replace_ok=True after confirming the table name/schema are trusted."
        )
    if not data:
        return

    lengths = {len(v) for v in data.values()}
    if len(lengths) != 1:
        raise ValueError("all columns in data must have the same length")
    n = next(iter(lengths))

    table = model.__table__
    tbl_schema = table.schema
    if schema is not None and schema != tbl_schema:
        raise ValueError(
            f"schema={schema!r} does not match model.__table__.schema ({tbl_schema!r})"
        )

    _align_data_keys(data, table)

    keys = list(data.keys())
    if validate_rows:
        for i in range(n):
            row = {k: data[k][i] for k in keys}
            try:
                model.model_validate(row)
            except ValidationError as e:
                raise ValueError(f"row {i} failed validation for {model!r}: {e}") from e

    chunk_n = _write_chunk_size(chunk_size)

    def _row_chunks():
        # Build at most ``chunk_n`` insert-ready row dicts at a time.
        for start in range(0, n, chunk_n):
            end = min(start + chunk_n, n)
            yield [
                _insert_row_dict({k: data[k][i] for k in keys}, table)
                for i in range(start, end)
            ]

    from sqlalchemy import insert, inspect

    eng = _to_engine(bind)

    with eng.begin() as conn:
        if if_exists == "replace":
            table.drop(conn, checkfirst=True)
            table.create(bind=conn)
        elif not inspect(conn).has_table(table.name, schema=tbl_schema):
            # Inspect through ``conn``, not ``eng``: inspecting the engine would
            # check out a second pooled connection while this one is held, which
            # blocks (until pool timeout) on single-connection pools.
            raise ValueError(
                f"table {table.name!r} does not exist (if_exists='append')"
            )

        for chunk in _row_chunks():
            conn.execute(insert(table), chunk)

write_csv_batches

write_csv_batches(path, batches, *, mode='w', encoding='utf-8', newline='', write_header=True)

Write an iterator of rectangular column dict batches to a single CSV file.

path must be a file path (or open text stream), not a directory. mode="w" truncates or creates the file; mode="a" appends rows. With mode="a", a header row is written only when the file is new and write_header is true (first batch).

Source code in python/pydantable/io/write_batches.py
def write_csv_batches(
    path: _PathLike | _TextStream,
    batches: Iterable[dict[str, list[Any]]],
    *,
    mode: str = "w",
    encoding: str = "utf-8",
    newline: str = "",
    write_header: bool = True,
) -> None:
    """
    Write an iterator of rectangular column dict batches to a **single CSV file**.

    ``path`` must be a file path (or open text stream), not a directory. ``mode="w"``
    truncates or creates the file; ``mode="a"`` appends rows. With ``mode="a"``, a header
    row is written only when the file is new and ``write_header`` is true (first batch).
    """
    if mode not in ("w", "a"):
        raise ValueError("mode must be 'w' or 'a'")
    if isinstance(path, (str, Path)):
        _reject_directory_file_path(path, "write_csv_batches")
        with open(path, mode, newline=newline, encoding=encoding) as fh:
            write_csv_batches(
                fh,
                batches,
                mode=mode,
                encoding=encoding,
                newline=newline,
                write_header=write_header,
            )
        return

    writer = None
    header: list[str] | None = None
    first = True
    for batch in batches:
        if not batch:
            continue
        ensure_rectangular(batch)
        if header is None:
            header = list(batch.keys())
            writer = csv.writer(path)
        assert header is not None and writer is not None
        if first and write_header and mode == "w":
            writer.writerow(header)
        n = len(next(iter(batch.values())))
        for i in range(n):
            writer.writerow([batch[h][i] for h in header])
        first = False

write_ipc_batches

write_ipc_batches(path, batches, *, as_stream=True)

Write batches to a single Arrow IPC file or stream (not a dataset directory).

path must be a file path (or open binary stream). Requires pyarrow (install pydantable[arrow]).

Source code in python/pydantable/io/write_batches.py
def write_ipc_batches(
    path: _PathLike | BinaryIO,
    batches: Iterable[dict[str, list[Any]]],
    *,
    as_stream: bool = True,
) -> None:
    """
    Write batches to a **single Arrow IPC file or stream** (not a dataset directory).

    ``path`` must be a file path (or open binary stream). Requires `pyarrow` (install
    ``pydantable[arrow]``).

    Each non-empty batch is converted with ``pyarrow.Table.from_pydict``. The writer's
    schema comes from the first non-empty batch. ``as_stream=True`` uses the IPC
    streaming format, otherwise the IPC file format. If every batch is empty nothing is
    written (a path target is left as an empty file).

    Raises:
        ImportError: pyarrow is not installed.
    """
    try:
        import pyarrow as pa  # type: ignore[import-not-found]
        from pyarrow import ipc  # type: ignore[import-not-found]
    except ImportError as e:
        raise ImportError(
            "write_ipc_batches requires pyarrow (pip install 'pydantable[arrow]')."
        ) from e

    if isinstance(path, (str, Path)):
        _reject_directory_file_path(path, "write_ipc_batches")
        with open(path, "wb") as fh:
            write_ipc_batches(fh, batches, as_stream=as_stream)
        return

    writer = None
    try:
        for batch in batches:
            if not batch:
                continue
            ensure_rectangular(batch)
            table = pa.Table.from_pydict(batch)
            if writer is None:
                writer = (
                    ipc.new_stream(path, table.schema)
                    if as_stream
                    else ipc.new_file(path, table.schema)
                )
            writer.write_table(table)
    except BaseException:
        # Already failing: close best-effort so the original error is what surfaces.
        if writer is not None:
            with suppress(Exception):
                writer.close()
        raise
    if writer is not None:
        # close() finalises the output (end-of-stream marker / file footer); an error
        # here means the output is incomplete, so it must not be swallowed.
        writer.close()

write_ndjson_batches

write_ndjson_batches(path, batches, *, mode='w', encoding='utf-8')

Write an iterator of rectangular column dict batches to a single NDJSON file.

path must be a file path (or open text stream), not a directory. mode="w" truncates or creates the file; mode="a" appends one JSON object per line.

Source code in python/pydantable/io/write_batches.py
def write_ndjson_batches(
    path: _PathLike | _TextStream,
    batches: Iterable[dict[str, list[Any]]],
    *,
    mode: str = "w",
    encoding: str = "utf-8",
) -> None:
    """
    Write an iterator of rectangular column dict batches to a **single NDJSON file**.

    ``path`` must be a file path (or open text stream), not a directory. ``mode="w"``
    truncates or creates the file; ``mode="a"`` appends one JSON object per line.

    Empty batches are skipped. Each row is serialised with ``json.dumps(...,
    default=str)``, so values JSON cannot encode natively are written as their
    ``str()``. ``encoding`` only applies when ``path`` is a filesystem path.

    Raises:
        ValueError: ``mode`` is not ``"w"`` or ``"a"``.
    """
    if mode not in ("w", "a"):
        raise ValueError("mode must be 'w' or 'a'")
    if isinstance(path, (str, Path)):
        _reject_directory_file_path(path, "write_ndjson_batches")
        with open(path, mode, encoding=encoding) as handle:
            write_ndjson_batches(handle, batches, mode=mode, encoding=encoding)
        return

    for batch in filter(None, batches):
        ensure_rectangular(batch)
        names = list(batch)
        # Equal-length columns (ensure_rectangular) make zip produce every row.
        for values in zip(*(batch[name] for name in names)):
            record = dict(zip(names, values))
            path.write(json.dumps(record, default=str) + "\n")

write_parquet_batches

write_parquet_batches(path, batches, *, compression=None)

Write batches to a single Parquet file (one row group per non-empty batch).

path must be a file path (or open binary stream), not a directory or hive dataset root. For partitioned Parquet on disk, use pydantable.dataframe.DataFrame.write_parquet with partition_by. Requires pyarrow (install pydantable[arrow]).

Source code in python/pydantable/io/write_batches.py
def write_parquet_batches(
    path: _PathLike | BinaryIO,
    batches: Iterable[dict[str, list[Any]]],
    *,
    compression: str | None = None,
) -> None:
    """
    Write batches to a **single Parquet file** (one row group per non-empty batch).

    ``path`` must be a file path (or open binary stream), not a directory or hive
    dataset root. For partitioned Parquet on disk, use :meth:`~pydantable.dataframe.DataFrame.write_parquet`
    with ``partition_by``. Requires `pyarrow` (install ``pydantable[arrow]``).

    The Parquet schema comes from the first non-empty batch. ``compression`` is
    forwarded unchanged to ``pyarrow.parquet.ParquetWriter``. ``None`` is passed
    explicitly, so pyarrow's own default codec (``"snappy"``) is not used. If every
    batch is empty nothing is written (a path target is left as an empty file).

    Raises:
        ImportError: pyarrow is not installed.
    """
    try:
        import pyarrow as pa  # type: ignore[import-not-found]
        import pyarrow.parquet as pq  # type: ignore[import-not-found,import-untyped]
    except ImportError as e:
        raise ImportError(
            "write_parquet_batches requires pyarrow (pip install 'pydantable[arrow]')."
        ) from e

    if isinstance(path, (str, Path)):
        _reject_directory_file_path(path, "write_parquet_batches")
        with open(path, "wb") as fh:
            write_parquet_batches(fh, batches, compression=compression)
        return

    writer = None
    try:
        for batch in batches:
            if not batch:
                continue
            ensure_rectangular(batch)
            table = pa.Table.from_pydict(batch)
            if writer is None:
                writer = pq.ParquetWriter(path, table.schema, compression=compression)
            writer.write_table(table)
    except BaseException:
        # Already failing: close best-effort so the original error is what surfaces.
        if writer is not None:
            with suppress(Exception):
                writer.close()
        raise
    if writer is not None:
        # close() writes the Parquet footer/metadata; an error here leaves an
        # unreadable file, so it must propagate instead of being suppressed.
        writer.close()

read_parquet

read_parquet(path, *, columns=None, **scan_kwargs)

Lazy Parquet read (local path); returns ScanFileRoot. Use DataFrame[Schema].read_parquet.

Extra keyword arguments are forwarded as Polars scan options (e.g. low_memory, n_rows, parallel, glob, hive_partitioning, hive_start_idx, try_parse_hive_dates, include_file_paths, row_index_name, row_index_offset). Unknown keys raise ValueError from the Rust layer. Per-scan details: IO_PARQUET on the doc site; kwargs matrix: DATA_IO_SOURCES (Audit: Polars 0.53.x vs pydantable).

Source code in python/pydantable/io/__init__.py
def read_parquet(
    path: str | Path,
    *,
    columns: list[str] | None = None,
    **scan_kwargs: Any,
) -> Any:
    """Lazy Parquet read (local path); returns ``ScanFileRoot``. Use ``DataFrame[Schema].read_parquet``.

    Extra keyword arguments are forwarded as Polars scan options, e.g. ``low_memory``,
    ``n_rows``, ``parallel``, ``glob``, ``hive_partitioning``, ``hive_start_idx``,
    ``try_parse_hive_dates``, ``include_file_paths``, ``row_index_name``,
    ``row_index_offset``. With no extra keywords, ``scan_kwargs=None`` is passed down.
    Unknown keys raise ``ValueError`` from the Rust layer.

    Per-scan details: ``IO_PARQUET`` on the doc site; kwargs matrix: ``DATA_IO_SOURCES``
    (**Audit: Polars 0.53.x vs pydantable**).
    """
    return _scan_file_root(
        path,
        "parquet",
        columns=columns,
        scan_kwargs=scan_kwargs or None,
    )

read_csv

read_csv(path, *, columns=None, **scan_kwargs)

Lazy CSV read (local path); returns ScanFileRoot. Use DataFrame[Schema].read_csv.

Extra keyword arguments are forwarded as Polars LazyCsvReader options (e.g. has_header, separator, skip_rows, skip_lines, n_rows, infer_schema_length, ignore_errors, low_memory, rechunk, glob, cache, quote_char, eol_char, include_file_paths, row_index_name, row_index_offset, raise_if_empty, truncate_ragged_lines, decimal_comma, try_parse_dates). Unknown keys raise ValueError from the Rust layer. Per-scan details: IO_CSV on the doc site; kwargs matrix: DATA_IO_SOURCES (Audit: Polars 0.53.x vs pydantable).

Source code in python/pydantable/io/__init__.py
def read_csv(
    path: str | Path,
    *,
    columns: list[str] | None = None,
    **scan_kwargs: Any,
) -> Any:
    """Lazy CSV read (local path); returns ``ScanFileRoot``. Use ``DataFrame[Schema].read_csv``.

    Extra keyword arguments are forwarded as Polars ``LazyCsvReader`` options, e.g.
    ``has_header``, ``separator``, ``skip_rows``, ``skip_lines``, ``n_rows``,
    ``infer_schema_length``, ``ignore_errors``, ``low_memory``, ``rechunk``, ``glob``,
    ``cache``, ``quote_char``, ``eol_char``, ``include_file_paths``, ``row_index_name``,
    ``row_index_offset``, ``raise_if_empty``, ``truncate_ragged_lines``,
    ``decimal_comma``, ``try_parse_dates``. With no extra keywords, ``scan_kwargs=None``
    is passed down. Unknown keys raise ``ValueError`` from the Rust layer.

    Per-scan details: ``IO_CSV`` on the doc site; kwargs matrix: ``DATA_IO_SOURCES``
    (**Audit: Polars 0.53.x vs pydantable**).
    """
    return _scan_file_root(
        path,
        "csv",
        columns=columns,
        scan_kwargs=scan_kwargs or None,
    )

read_ndjson

read_ndjson(path, *, columns=None, **scan_kwargs)

Lazy newline-delimited JSON read (local path); returns ScanFileRoot.

Extra keyword arguments are forwarded as Polars LazyJsonLineReader options (e.g. low_memory, rechunk, ignore_errors, n_rows, infer_schema_length, glob, include_file_paths, row_index_name, row_index_offset). glob=False raises ValueError (Polars 0.53 NDJSON scans always expand paths). Unknown keys raise ValueError from the Rust layer. Per-scan details: IO_NDJSON on the doc site; kwargs matrix: DATA_IO_SOURCES (Audit: Polars 0.53.x vs pydantable).

Source code in python/pydantable/io/__init__.py
def read_ndjson(
    path: str | Path,
    *,
    columns: list[str] | None = None,
    **scan_kwargs: Any,
) -> Any:
    """Lazy newline-delimited JSON read (local path); returns ``ScanFileRoot``.

    Extra keyword arguments are forwarded as Polars ``LazyJsonLineReader`` options,
    e.g. ``low_memory``, ``rechunk``, ``ignore_errors``, ``n_rows``,
    ``infer_schema_length``, ``glob``, ``include_file_paths``, ``row_index_name``,
    ``row_index_offset``. ``glob=False`` raises ``ValueError`` (Polars 0.53 NDJSON scans
    always expand paths). With no extra keywords, ``scan_kwargs=None`` is passed down.
    Unknown keys raise ``ValueError`` from the Rust layer.

    Per-scan details: ``IO_NDJSON`` on the doc site; kwargs matrix: ``DATA_IO_SOURCES``
    (**Audit: Polars 0.53.x vs pydantable**).
    """
    return _scan_file_root(
        path,
        "ndjson",
        columns=columns,
        scan_kwargs=scan_kwargs or None,
    )

read_ipc

read_ipc(path, *, columns=None, **scan_kwargs)

Lazy Arrow IPC file read (local path); returns ScanFileRoot.

Extra keyword arguments are forwarded to the Rust layer: IpcScanOptions (record_batch_statistics) and UnifiedScanArgs (glob, cache, rechunk, n_rows, hive_partitioning, hive_start_idx, try_parse_hive_dates, include_file_paths, row_index_name, row_index_offset). Unknown keys raise ValueError. Per-scan details: IO_IPC on the doc site; kwargs matrix: DATA_IO_SOURCES (Audit: Polars 0.53.x vs pydantable).

Source code in python/pydantable/io/__init__.py
def read_ipc(
    path: str | Path,
    *,
    columns: list[str] | None = None,
    **scan_kwargs: Any,
) -> Any:
    """Lazy Arrow IPC **file** read (local path); returns ``ScanFileRoot``.

    Extra keyword arguments are forwarded to the Rust layer:

    * **``IpcScanOptions``**: ``record_batch_statistics``;
    * **``UnifiedScanArgs``**: ``glob``, ``cache``, ``rechunk``, ``n_rows``,
      ``hive_partitioning``, ``hive_start_idx``, ``try_parse_hive_dates``,
      ``include_file_paths``, ``row_index_name``, ``row_index_offset``.

    With no extra keywords, ``scan_kwargs=None`` is passed down. Unknown keys raise
    ``ValueError``. Per-scan details: ``IO_IPC`` on the doc site; kwargs matrix:
    ``DATA_IO_SOURCES`` (**Audit: Polars 0.53.x vs pydantable**).
    """
    return _scan_file_root(
        path,
        "ipc",
        columns=columns,
        scan_kwargs=scan_kwargs or None,
    )

read_json

read_json(path, *, columns=None, **scan_kwargs)

Lazy JSON Lines read (local path); alias of read_ndjson (returns the same ScanFileRoot).

Not a lazy reader for a single-file JSON array [{...}, ...] — use materialize_json or iter_json_array for that layout.

Paths: a directory, a glob, or a single file behaves as in read_ndjson (Polars LazyJsonLineReader). Pass glob=True when using a directory or a *.jsonl-style pattern, so the kwargs match the other read_* APIs. scan_kwargs are the same as for NDJSON (e.g. low_memory, rechunk, ignore_errors, n_rows, infer_schema_length, glob, include_file_paths, row_index_name, row_index_offset); glob=False raises ValueError. Unknown keys raise from the Rust layer. See IO_JSON and DATA_IO_SOURCES (Audit).

Source code in python/pydantable/io/__init__.py
def read_json(
    path: str | Path,
    *,
    columns: list[str] | None = None,
    **scan_kwargs: Any,
) -> Any:
    """Lazy **JSON Lines** read (local path); alias of :func:`read_ndjson` (same ``ScanFileRoot``).

    **Not** a lazy reader for a single-file JSON **array** ``[{...}, ...]``; use
    :func:`materialize_json` or :func:`iter_json_array` for that layout.

    **Paths:** a directory, a glob, or a single file behaves as in :func:`read_ndjson`
    (Polars ``LazyJsonLineReader``). Pass ``glob=True`` when using a directory or a
    ``*.jsonl``-style pattern, so the kwargs match the other ``read_*`` APIs.
    **``scan_kwargs``** are the NDJSON ones (e.g. ``low_memory``, ``rechunk``,
    ``ignore_errors``, ``n_rows``, ``infer_schema_length``, ``glob``,
    ``include_file_paths``, ``row_index_name``, ``row_index_offset``). ``glob=False``
    raises ``ValueError``. Unknown keys raise from the Rust layer. See ``IO_JSON`` and
    ``DATA_IO_SOURCES`` (**Audit**).
    """
    # Pure delegation: every argument is handed to read_ndjson untouched.
    return read_ndjson(path, **scan_kwargs, columns=columns)

read_parquet_url

read_parquet_url(url, *, experimental=True, columns=None, **kwargs)

Download Parquet from url (HTTP(S)) to a temp file; return ScanFileRoot.

The file is not removed automatically: delete it when the pipeline finishes (see the DATA_IO_SOURCES guide in the project docs).

Source code in python/pydantable/io/__init__.py
def read_parquet_url(
    url: str,
    *,
    experimental: bool = True,
    columns: list[str] | None = None,
    **kwargs: Any,
) -> Any:
    """Download Parquet from ``url`` (HTTP(S)) to a temp file; return ``ScanFileRoot``.

    The file is **not** removed automatically: delete it when the pipeline finishes
    (see the DATA_IO_SOURCES guide in the project docs).

    ``experimental`` and ``**kwargs`` are forwarded to ``fetch_bytes``; ``columns`` is
    applied to the lazy Parquet scan. If writing the temp file or building the scan
    fails for any reason (including ``KeyboardInterrupt``), the temp file is removed
    before the error propagates.
    """
    data = fetch_bytes(url, experimental=experimental, **kwargs)
    fd, name = tempfile.mkstemp(suffix=".parquet")
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
        return _scan_file_root(name, "parquet", columns=columns, scan_kwargs=None)
    except BaseException:
        # Broad on purpose: on any failure the caller never learns ``name``, so the
        # temp file would be orphaned. Remove it, then re-raise the original error.
        with suppress(OSError):
            os.unlink(name)
        raise

read_parquet_url_ctx

read_parquet_url_ctx(dataframe_cls, url, *, experimental=True, columns=None, **kwargs)

Download Parquet from url to a temp file, yield DataFrame[Schema], delete the file after.

Pass the parametrized frame class (e.g. DataFrame[MySchema]). The lazy plan must not be used after the context exits (the backing file is removed).

Source code in python/pydantable/io/__init__.py
@contextmanager
def read_parquet_url_ctx(
    dataframe_cls: Any,
    url: str,
    *,
    experimental: bool = True,
    columns: list[str] | None = None,
    **kwargs: Any,
):
    """Download Parquet from ``url`` to a temp file, yield ``DataFrame[Schema]``, delete the file after.

    Pass the parametrized frame class (e.g. ``DataFrame[MySchema]``). The lazy plan must
    not be used after the context exits (the backing file is removed).

    Raises:
        RuntimeError: the scan root exposes no ``path``, so the temp file cannot be
            cleaned up.
    """
    root = read_parquet_url(url, experimental=experimental, columns=columns, **kwargs)
    backing_file = str(getattr(root, "path", "") or "")
    if backing_file == "":
        raise RuntimeError(
            "ScanFileRoot.path is empty; cannot manage temp file lifecycle"
        )
    try:
        frame = dataframe_cls._from_scan_root(root)
        yield frame
    finally:
        # Best-effort removal: an already-deleted file is not an error here.
        with suppress(OSError):
            os.unlink(backing_file)

aread_parquet_url_ctx async

aread_parquet_url_ctx(dataframe_cls, url, *, experimental=True, columns=None, executor=None, **kwargs)

Async variant of read_parquet_url_ctx (uses aread_parquet_url).

Source code in python/pydantable/io/__init__.py
@asynccontextmanager
async def aread_parquet_url_ctx(
    dataframe_cls: Any,
    url: str,
    *,
    experimental: bool = True,
    columns: list[str] | None = None,
    executor: Any = None,
    **kwargs: Any,
):
    """Async variant of :func:`read_parquet_url_ctx` (uses :func:`aread_parquet_url`).

    Yields ``dataframe_cls`` built from the downloaded temp file and removes that file
    when the context exits. ``executor`` is passed through to :func:`aread_parquet_url`.

    Raises:
        RuntimeError: the scan root exposes no ``path``, so the temp file cannot be
            cleaned up.
    """
    root = await aread_parquet_url(
        url,
        experimental=experimental,
        columns=columns,
        executor=executor,
        **kwargs,
    )
    backing_file = str(getattr(root, "path", "") or "")
    if backing_file == "":
        raise RuntimeError(
            "ScanFileRoot.path is empty; cannot manage temp file lifecycle"
        )
    try:
        frame = dataframe_cls._from_scan_root(root)
        yield frame
    finally:
        # Best-effort removal: an already-deleted file is not an error here.
        with suppress(OSError):
            os.unlink(backing_file)

materialize_parquet

materialize_parquet(source, *, columns=None, engine=None)

Eagerly read Parquet into dict[str, list] (loads full data into Python).

Single file: one local path, buffer, or file-like object per call. For multiple Parquet files, use read_parquet with glob=True (or a directory) and materialize via pydantable.dataframe.DataFrame.to_dict, or call materialize_parquet per file and merge the results (mind schema alignment).

  • engine="auto" (default): Rust for local file paths when columns is None; otherwise PyArrow.
  • engine="rust" / "pyarrow": force that implementation.

For out-of-core pipelines, prefer read_parquet + pydantable.dataframe.DataFrame.write_parquet.

Source code in python/pydantable/io/__init__.py
def materialize_parquet(
    source: _Source,
    *,
    columns: list[str] | None = None,
    engine: str | None = None,
) -> dict[str, list[Any]]:
    """
    Eagerly read Parquet into ``dict[str, list]`` (loads full data into Python).

    **Single file:** one local path, buffer, or file-like per call. For **multiple** Parquet
    files, use :func:`read_parquet` with ``glob=True`` / a directory and materialize via
    :meth:`~pydantable.dataframe.DataFrame.to_dict`, or call ``materialize_parquet`` per file
    and merge (mind schema alignment).

    * ``engine="auto"`` (default): Rust for local file paths when ``columns`` is ``None``;
      otherwise PyArrow.
    * ``engine="rust"`` / ``"pyarrow"``: force that implementation.

    With ``engine="auto"``, a failing native read is logged at DEBUG level and retried
    with PyArrow; with ``engine="rust"`` the native error propagates. A local path that
    is not a regular file is handed to PyArrow.

    For out-of-core pipelines prefer :func:`read_parquet` + :meth:`~pydantable.dataframe.DataFrame.write_parquet`.

    Raises:
        ValueError: ``engine="rust"`` combined with ``columns`` or a non-local source.
    """
    eng = (engine or _default_engine()).lower()
    with span("io.materialize_parquet", engine=eng, columns=columns is not None):
        native_ok = (
            eng in ("auto", "rust") and columns is None and _is_local_path(source)
        )
        if not native_ok:
            if eng == "rust":
                raise ValueError(
                    "Rust Parquet read needs a local file path and columns=None"
                )
            return read_parquet_pyarrow(source, columns=columns)

        from pydantable_native.io_core import (  # type: ignore[import-not-found]
            rust_read_parquet_path,
        )

        local_path = str(source)
        if os.path.isfile(local_path):
            try:
                return rust_read_parquet_path(local_path)
            except Exception:
                # Broad: PyO3/native may wrap diverse failures; fall back to PyArrow when auto.
                _IO_LOG.debug(
                    "rust_read_parquet_path failed; trying PyArrow",
                    exc_info=True,
                )
                if eng == "rust":
                    raise
        return read_parquet_pyarrow(source, columns=columns)

materialize_ipc

materialize_ipc(source, *, as_stream=False, engine=None)

Read Arrow IPC (file or stream) into dict[str, list].

Single file per call. For multiple IPC files, prefer lazy read_ipc with glob and to_dict, or call materialize_ipc once per path.

Source code in python/pydantable/io/__init__.py
def materialize_ipc(
    source: _Source,
    *,
    as_stream: bool = False,
    engine: str | None = None,
) -> dict[str, list[Any]]:
    """Eagerly load one Arrow IPC source (file or stream format) as ``dict[str, list]``.

    **Single file** per call. For multiple IPC files, prefer lazy :func:`read_ipc` + ``glob`` /
    ``to_dict``, or iterate :func:`materialize_ipc` per path.

    The native reader is attempted only when the engine is ``"auto"``/``"rust"``,
    ``as_stream`` is false, and ``source`` is a local path naming an existing file.
    Under ``"auto"`` a native failure falls back to PyArrow; under ``"rust"`` it is
    re-raised. All other cases are read with PyArrow.

    Raises:
        ValueError: ``engine="rust"`` combined with ``as_stream=True`` or a non-local source.
    """
    selected = (engine or _default_engine()).lower()
    native_requested = selected in ("auto", "rust")
    with span("io.materialize_ipc", engine=selected, as_stream=bool(as_stream)):
        native_usable = (
            native_requested
            and not as_stream
            and _is_local_path(source)
            and os.path.isfile(str(source))
        )
        if native_usable:
            from pydantable_native.io_core import (  # type: ignore[import-not-found]
                rust_read_ipc_path,
            )

            try:
                return rust_read_ipc_path(str(source))
            except Exception:
                # Deliberately broad: the native layer can surface many error types;
                # "auto" falls back to PyArrow, "rust" re-raises below.
                _IO_LOG.debug(
                    "rust_read_ipc_path failed; trying PyArrow", exc_info=True
                )
                if selected == "rust":
                    raise
        rust_only = selected == "rust"
        if rust_only and (as_stream or not _is_local_path(source)):
            raise ValueError(
                "Rust IPC read supports on-disk file format only (as_stream=False)"
            )
        return read_ipc_pyarrow(source, as_stream=as_stream)

materialize_csv

materialize_csv(path, *, engine=None, use_rap=False)

Read CSV from a local path into dict[str, list].

Single file per call. For multiple CSVs, use read_csv() with glob=True followed by to_dict(), or call materialize_csv() per file and merge the results.

  • engine="auto": try Rust, then fall back to stdlib csv on failure.
  • use_rap=True: load via aread_csv_rap() (only when no event loop is running in the current thread).
Source code in python/pydantable/io/__init__.py
def materialize_csv(
    path: str | Path,
    *,
    engine: str | None = None,
    use_rap: bool = False,
) -> dict[str, list[Any]]:
    """
    Read CSV from a **local path** into ``dict[str, list]``.

    **Single file** per call. For **multiple** CSVs, use :func:`read_csv` with ``glob=True`` /
    ``to_dict``, or call ``materialize_csv`` per file and merge.

    * ``engine="auto"``: try Rust, then fall back to stdlib ``csv`` on failure.
    * ``engine="rust"``: Rust only; a native failure is re-raised.
    * any other engine: stdlib ``csv`` directly.
    * ``use_rap=True``: load via :func:`aread_csv_rap` (only when no running event loop).

    The stdlib reader keeps every cell as ``str``. Rows shorter than the header are
    padded with ``None``, and cells beyond the header width are ignored. A completely
    empty file (no header row) yields ``{}``.

    Raises:
        RuntimeError: ``use_rap=True`` was passed while an event loop is running.
    """
    # Resolve the engine once; it is used both for tracing and for dispatch.
    eng = (engine or _default_engine()).lower()
    with span("io.materialize_csv", engine=eng, use_rap=bool(use_rap)):
        if use_rap:
            try:
                asyncio.get_running_loop()
            except RuntimeError:
                # No loop in this thread: safe to drive the async reader ourselves.
                return asyncio.run(aread_csv_rap(str(path)))
            raise RuntimeError(
                "in an async context, await aread_csv_rap(path) instead of use_rap=True"
            )

        if eng in ("auto", "rust"):
            from pydantable_native.io_core import (  # type: ignore[import-not-found]
                rust_read_csv_path,
            )

            try:
                return rust_read_csv_path(str(path))
            except Exception:
                # Broad: Rust CSV failure → stdlib csv when engine=auto.
                _IO_LOG.debug(
                    "rust_read_csv_path failed; using stdlib csv", exc_info=True
                )
                if eng == "rust":
                    raise
        with open(path, newline="", encoding="utf-8") as fh:
            reader = csv.reader(fh)
            header = next(reader, None)
            if header is None:
                # Empty file: there is no header row. Return no columns instead of
                # letting ``next()`` leak a bare StopIteration to the caller.
                return {}
            # NOTE(review): duplicate header names share one list here, so that
            # column ends up longer than the others. Confirm the desired behavior.
            cols: dict[str, list[Any]] = {h: [] for h in header}
            for row in reader:
                for i, h in enumerate(header):
                    cols[h].append(row[i] if i < len(row) else None)
            return cols

materialize_ndjson

materialize_ndjson(path, *, engine=None)

Read newline-delimited JSON from a single local path into dict[str, list].

For multiple NDJSON files, use read_ndjson() with glob=True followed by to_dict(), or call materialize_ndjson() per file and merge the results.

Source code in python/pydantable/io/__init__.py
def materialize_ndjson(
    path: str | Path, *, engine: str | None = None
) -> dict[str, list[Any]]:
    """Load newline-delimited JSON from **one local file** as ``dict[str, list]``.

    To combine **several** NDJSON files, prefer :func:`read_ndjson` with ``glob=True``
    and ``to_dict``, or call ``materialize_ndjson`` for each file and merge.

    With engine ``"auto"``/``"rust"`` the native reader runs first. Under ``"auto"``
    a native failure falls back to a pure-Python parser; under ``"rust"`` it is
    re-raised. The Python parser skips blank lines, returns ``{}`` when no records
    are found, and emits columns in sorted key order, with ``None`` for any key a
    record lacks.
    """
    chosen = (engine or _default_engine()).lower()
    with span("io.materialize_ndjson", engine=chosen):
        if chosen in ("auto", "rust"):
            from pydantable_native.io_core import (  # type: ignore[import-not-found]
                rust_read_ndjson_path,
            )

            try:
                return rust_read_ndjson_path(str(path))
            except Exception:
                # Deliberately broad: the native layer may surface many error types.
                _IO_LOG.debug(
                    "rust_read_ndjson_path failed; using pure Python JSON lines",
                    exc_info=True,
                )
                if chosen == "rust":
                    raise
        with open(path, encoding="utf-8") as handle:
            records: list[dict[str, Any]] = [
                json.loads(text) for text in map(str.strip, handle) if text
            ]
        if not records:
            return {}
        # Union of keys across all records, in a deterministic (sorted) order.
        names = sorted({name for record in records for name in record})
        return {name: [record.get(name) for record in records] for name in names}

materialize_json

materialize_json(path, *, engine=None)

Load a JSON file into dict[str, list]: either a JSON array of objects or JSON Lines.

Single file per call. For multiple JSON files, use the lazy read_json() / read_ndjson() with glob=True followed by to_dict(), or call materialize_json() once per path.

If the first non-whitespace character is an opening bracket ([), the file is parsed as one JSON array. Otherwise it is read as newline-delimited JSON (same as materialize_ndjson()).

Source code in python/pydantable/io/__init__.py
def materialize_json(
    path: str | Path, *, engine: str | None = None
) -> dict[str, list[Any]]:
    """Load a JSON file into ``dict[str, list]``: either a JSON array of objects or JSON Lines.

    **Single file** per call. For **multiple** JSON files, use lazy :func:`read_json` /
    ``read_ndjson`` with ``glob=True`` and ``to_dict``, or call ``materialize_json`` per path.

    The format is sniffed from the first non-whitespace character. ``[`` means a
    single JSON array, which must contain only objects. Anything else is delegated
    to :func:`materialize_ndjson` with the resolved engine. An empty or
    whitespace-only file, or an empty array, yields ``{}``.

    Raises:
        ValueError: the file starts with ``[`` but is not an array of JSON objects.
    """
    file_path = Path(path)
    chosen = (engine or _default_engine()).lower()
    with span("io.materialize_json", engine=chosen):
        with file_path.open(encoding="utf-8") as handle:
            first = handle.read(1)
            while first and first.isspace():
                first = handle.read(1)
            if not first:
                # Empty or whitespace-only file.
                return {}
            if first == "[":
                handle.seek(0)
                payload = json.load(handle)
                if not isinstance(payload, list):
                    raise ValueError(
                        "materialize_json: expected a JSON array of objects when file starts with '['"
                    )
                if not payload:
                    return {}
                if any(not isinstance(item, dict) for item in payload):
                    raise ValueError(
                        "materialize_json: array elements must be JSON objects"
                    )
                return _json_rows_to_columns(payload)
        # Not an array: treat the file as JSON Lines.
        return materialize_ndjson(file_path, engine=chosen)

export_json

export_json(path, data, *, indent=None)

Write dict[str, list] as one JSON array of row objects.

Uses json.dump() with default=str. Nested dict/list cells serialize as JSON objects/arrays; non-JSON-native scalars (e.g. datetime, Decimal, UUID) become str(value), which is not necessarily ISO-8601. For stable JSON output, normalize the rows first or use Pydantic model_dump(mode="json") after materialization.

Source code in python/pydantable/io/__init__.py
def export_json(
    path: str | Path,
    data: dict[str, list[Any]],
    *,
    indent: int | None = None,
) -> None:
    """Write ``dict[str, list]`` as one JSON array of row objects.

    Uses :func:`json.dump` with ``default=str``. Nested ``dict``/``list`` cells
    serialize as JSON objects/arrays; non-JSON-native scalars (e.g. ``datetime``,
    ``Decimal``, ``UUID``) become ``str(value)``, not necessarily ISO-8601. For
    stable JSON output, prefer normalizing rows or using Pydantic
    ``model_dump(mode="json")`` after materialization.
    """
    keys = list(data.keys())
    n = len(data[keys[0]]) if keys else 0
    rows = [{k: data[k][i] for k in keys} for i in range(n)]
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(rows, fh, indent=indent, default=str)

export_parquet

export_parquet(path, data, *, engine=None)

Write dict[str, list] to Parquet (eager). For lazy plan output, use DataFrame.write_parquet().

Source code in python/pydantable/io/__init__.py
def export_parquet(
    path: str | Path, data: dict[str, list[Any]], *, engine: str | None = None
) -> None:
    """Write ``dict[str, list]`` to Parquet (eager). For lazy plan output use :meth:`DataFrame.write_parquet`.

    * ``engine="auto"``: use the native writer; fall back to PyArrow when the native
      writer cannot be imported (``ImportError``).
    * ``engine="rust"``: native writer only; an ``ImportError`` propagates.
    * any other engine: write with PyArrow.

    Raises:
        ImportError: the native writer is unavailable under ``engine="rust"``, or
            PyArrow is needed for the fallback but is not installed.
    """
    eng = (engine or _default_engine()).lower()
    with span("io.export_parquet", engine=eng, path=str(path)):
        if eng in ("auto", "rust"):
            try:
                # The import lives inside the ``try`` so that a missing or outdated
                # native build raises ImportError here and engine="auto" can fall
                # back to PyArrow.
                from pydantable_native.io_core import (  # type: ignore[import-not-found]
                    rust_write_parquet_path,
                )

                rust_write_parquet_path(str(path), data)
                return
            except ImportError:
                if eng == "rust":
                    raise
        try:
            import pyarrow as pa  # type: ignore[import-not-found, import-untyped]
            import pyarrow.parquet as pq  # type: ignore[import-not-found, import-untyped]
        except ImportError as e:
            raise ImportError(
                "export_parquet fallback requires pyarrow (pip install 'pydantable[arrow]')."
            ) from e
        pq.write_table(pa.Table.from_pydict(data), str(path))

export_csv

export_csv(path, data, *, engine=None)

Write dict[str, list] to CSV (eager).

Source code in python/pydantable/io/__init__.py
def export_csv(
    path: str | Path, data: dict[str, list[Any]], *, engine: str | None = None
) -> None:
    """Write ``dict[str, list]`` to CSV (eager).

    * ``engine="auto"``: use the native writer; fall back to stdlib ``csv`` when the
      native writer cannot be imported (``ImportError``).
    * ``engine="rust"``: native writer only; an ``ImportError`` propagates.
    * any other engine: stdlib ``csv``.

    The stdlib writer emits a header row followed by one row per index.

    Raises:
        ImportError: the native writer is unavailable under ``engine="rust"``.
        ValueError: (stdlib path) the columns have different lengths.
    """
    eng = (engine or _default_engine()).lower()
    with span("io.export_csv", engine=eng, path=str(path)):
        if eng in ("auto", "rust"):
            try:
                # Import inside the ``try`` so a missing or outdated native build
                # triggers the stdlib fallback for engine="auto".
                from pydantable_native.io_core import (  # type: ignore[import-not-found]
                    rust_write_csv_path,
                )

                rust_write_csv_path(str(path), data)
                return
            except ImportError:
                if eng == "rust":
                    raise
        headers = list(data.keys())
        columns = [data[h] for h in headers]
        if len({len(col) for col in columns}) > 1:
            # Validate before opening, so ragged input neither truncates ``path``
            # nor silently drops trailing values.
            raise ValueError("export_csv: all columns must have the same length")
        with open(path, "w", newline="", encoding="utf-8") as fh:
            w = csv.writer(fh)
            w.writerow(headers)
            w.writerows(zip(*columns))

export_ndjson

export_ndjson(path, data, *, engine=None)

Write dict[str, list] as newline-delimited JSON (eager).

Source code in python/pydantable/io/__init__.py
def export_ndjson(
    path: str | Path, data: dict[str, list[Any]], *, engine: str | None = None
) -> None:
    """Write ``dict[str, list]`` as newline-delimited JSON (eager).

    * ``engine="auto"``: use the native writer; fall back to pure Python when the
      native writer cannot be imported (``ImportError``).
    * ``engine="rust"``: native writer only; an ``ImportError`` propagates.
    * any other engine: pure Python, one ``json.dumps(..., default=str)`` object per line.

    Raises:
        ImportError: the native writer is unavailable under ``engine="rust"``.
        ValueError: (pure-Python path) the columns have different lengths.
    """
    eng = (engine or _default_engine()).lower()
    with span("io.export_ndjson", engine=eng, path=str(path)):
        if eng in ("auto", "rust"):
            try:
                # Import inside the ``try`` so a missing or outdated native build
                # triggers the pure-Python fallback for engine="auto".
                from pydantable_native.io_core import (  # type: ignore[import-not-found]
                    rust_write_ndjson_path,
                )

                rust_write_ndjson_path(str(path), data)
                return
            except ImportError:
                if eng == "rust":
                    raise
        headers = list(data.keys())
        columns = [data[h] for h in headers]
        if len({len(col) for col in columns}) > 1:
            # Validate before opening, so ragged input neither truncates ``path``
            # nor silently drops trailing values.
            raise ValueError("export_ndjson: all columns must have the same length")
        with open(path, "w", encoding="utf-8") as fh:
            for values in zip(*columns):
                fh.write(json.dumps(dict(zip(headers, values)), default=str) + "\n")

export_ipc

export_ipc(path, data, *, engine=None)

Write dict[str, list] to Arrow IPC file (eager).

Source code in python/pydantable/io/__init__.py
def export_ipc(
    path: str | Path, data: dict[str, list[Any]], *, engine: str | None = None
) -> None:
    """Write ``dict[str, list]`` to Arrow IPC file (eager).

    * ``engine="auto"``: use the native writer; fall back to PyArrow when the native
      writer cannot be imported (``ImportError``).
    * ``engine="rust"``: native writer only; an ``ImportError`` propagates.
    * any other engine: write an IPC *file* with PyArrow.

    Raises:
        ImportError: the native writer is unavailable under ``engine="rust"``, or
            PyArrow is needed for the fallback but is not installed.
    """
    eng = (engine or _default_engine()).lower()
    with span("io.export_ipc", engine=eng, path=str(path)):
        if eng in ("auto", "rust"):
            try:
                # The import lives inside the ``try`` so that a missing or outdated
                # native build raises ImportError here and engine="auto" can fall
                # back to PyArrow.
                from pydantable_native.io_core import (  # type: ignore[import-not-found]
                    rust_write_ipc_path,
                )

                rust_write_ipc_path(str(path), data)
                return
            except ImportError:
                if eng == "rust":
                    raise
        try:
            import pyarrow as pa  # type: ignore[import-not-found, import-untyped]
        except ImportError as e:
            raise ImportError(
                "export_ipc fallback requires pyarrow (pip install 'pydantable[arrow]')."
            ) from e
        table = pa.Table.from_pydict(data)
        with open(path, "wb") as sink, pa.ipc.new_file(sink, table.schema) as writer:
            writer.write_table(table)

afetch_sql_raw async

afetch_sql_raw(sql, bind, *, parameters=None, batch_size=None, auto_stream=True, auto_stream_threshold_rows=None, executor=None)

Async version of fetch_sql_raw(), run via asyncio.to_thread() (or an optional Executor).

Source code in python/pydantable/io/__init__.py
async def afetch_sql_raw(
    sql: str,
    bind: str | Any,
    *,
    parameters: Mapping[str, Any] | None = None,
    batch_size: int | None = None,
    auto_stream: bool = True,
    auto_stream_threshold_rows: int | None = None,
    executor: Executor | None = None,
) -> dict[str, list[Any]] | StreamingColumns:
    """Run :func:`fetch_sql_raw` off the event loop.

    The blocking call goes through ``_run_io`` (``asyncio.to_thread`` by default, or
    the given ``executor``). Its result is returned unchanged.
    """
    positional = (sql, bind)
    options = dict(
        parameters=parameters,
        batch_size=batch_size,
        auto_stream=auto_stream,
        auto_stream_threshold_rows=auto_stream_threshold_rows,
    )
    result = await _run_io(fetch_sql_raw, positional, options, executor=executor)
    return result

aiter_sql_raw async

aiter_sql_raw(sql, bind, *, parameters=None, batch_size=65536, executor=None)

Async generator yielding batches from iter_sql_raw() without blocking the event loop.

This runs the synchronous SQLAlchemy cursor in a background thread and streams batch dicts through an asyncio.Queue.

Source code in python/pydantable/io/__init__.py
async def aiter_sql_raw(
    sql: str,
    bind: str | Any,
    *,
    parameters: Mapping[str, Any] | None = None,
    batch_size: int = 65_536,
    executor: Executor | None = None,
):
    """
    Async generator yielding batches from :func:`iter_sql_raw` without blocking the loop.

    This runs the synchronous SQLAlchemy cursor in a background thread (or in
    ``executor`` when given) and streams batch dicts through a bounded
    ``asyncio.Queue``. The producer blocks while the queue is full, so a slow
    consumer applies backpressure instead of buffering every batch. Exceptions
    raised by :func:`iter_sql_raw` are re-raised in the consumer. Leaving the
    ``async for`` early signals the producer to stop.

    Raises:
        ValueError: ``batch_size`` is not positive (raised on first iteration).
    """
    import asyncio
    import concurrent.futures
    import threading

    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")

    q: asyncio.Queue[object] = asyncio.Queue(maxsize=2)
    sentinel = object()
    stop = threading.Event()

    loop = asyncio.get_running_loop()

    def _put(item: object) -> None:
        # Backpressure: never drop batches if the async consumer is slow.
        # We block the producer thread until the event loop enqueues the item.
        if stop.is_set():
            return
        fut = asyncio.run_coroutine_threadsafe(q.put(item), loop)
        try:
            # Poll with a timeout so the producer notices ``stop`` (consumer gone)
            # instead of blocking forever on a full queue.
            while not stop.is_set():
                try:
                    fut.result(timeout=0.25)
                    return
                except concurrent.futures.TimeoutError:
                    # Future.result raises concurrent.futures.TimeoutError, which is
                    # only an alias of the builtin TimeoutError on Python >= 3.11.
                    # Catching just the builtin would abandon the wait on 3.10.
                    continue
        except BaseException:
            return
        finally:
            if stop.is_set():
                with suppress(BaseException):
                    fut.cancel()

    def _runner() -> None:
        try:
            for batch in iter_sql_raw(
                sql,
                bind,
                parameters=parameters,
                batch_size=batch_size,
            ):
                if stop.is_set():
                    return
                _put(batch)
        except BaseException as e:  # propagate exceptions to async consumer
            _put(e)
        finally:
            _put(sentinel)

    def _offer_sentinel() -> None:
        # Runs on the loop thread. A full queue only means nothing will read the
        # sentinel, so swallow QueueFull instead of logging a callback error.
        with suppress(asyncio.QueueFull):
            q.put_nowait(sentinel)

    if executor is not None:
        loop.run_in_executor(executor, _runner)
    else:
        threading.Thread(target=_runner, daemon=True).start()

    try:
        while True:
            item = await q.get()
            if item is sentinel:
                return
            if isinstance(item, BaseException):
                raise item
            yield item  # dict[str, list[Any]]
    finally:
        stop.set()
        with suppress(BaseException):
            loop.call_soon_threadsafe(_offer_sentinel)

aiter_sql async

aiter_sql(sql, bind, *, parameters=None, batch_size=65536, executor=None)

Deprecated: use aiter_sql_raw() for string SQL or aiter_sqlmodel() for mapped tables.

Source code in python/pydantable/io/__init__.py
async def aiter_sql(
    sql: str,
    bind: str | Any,
    *,
    parameters: Mapping[str, Any] | None = None,
    batch_size: int = 65_536,
    executor: Executor | None = None,
):
    """Deprecated alias: use :func:`aiter_sql_raw` (string SQL) or :func:`aiter_sqlmodel` (mapped tables).

    Emits a :class:`DeprecationWarning` when iteration starts, then re-yields the
    batches of :func:`aiter_sql_raw` unchanged.
    """
    notice = (
        "aiter_sql is deprecated and will be removed in a future major version; "
        "for mapped tables use aiter_sqlmodel(...), for string SQL use aiter_sql_raw(...)."
    )
    warnings.warn(notice, DeprecationWarning, stacklevel=2)
    delegate = aiter_sql_raw(
        sql, bind, parameters=parameters, batch_size=batch_size, executor=executor
    )
    async for chunk in delegate:
        yield chunk

aiter_sqlmodel async

aiter_sqlmodel(model, bind, *, where=None, parameters=None, columns=None, order_by=None, limit=None, batch_size=65536, executor=None)

Async generator yielding batches from iter_sqlmodel() without blocking the event loop.

Source code in python/pydantable/io/__init__.py
async def aiter_sqlmodel(
    model: Any,
    bind: str | Any,
    *,
    where: Any | None = None,
    parameters: Mapping[str, Any] | None = None,
    columns: Sequence[Any] | None = None,
    order_by: Sequence[Any] | None = None,
    limit: int | None = None,
    batch_size: int = 65_536,
    executor: Executor | None = None,
):
    """
    Async generator yielding batches from :func:`iter_sqlmodel` without blocking the loop.

    The synchronous iterator runs in a background thread (or in ``executor`` when
    given), and batches are handed over through a bounded ``asyncio.Queue``. The
    producer blocks while the queue is full, which applies backpressure.
    Exceptions from :func:`iter_sqlmodel` are re-raised in the consumer. Leaving
    the ``async for`` early signals the producer to stop.

    Raises:
        ValueError: ``batch_size`` is not positive (raised on first iteration).
    """
    import asyncio
    import concurrent.futures
    import threading

    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")

    q: asyncio.Queue[object] = asyncio.Queue(maxsize=2)
    sentinel = object()
    stop = threading.Event()
    loop = asyncio.get_running_loop()

    def _put(item: object) -> None:
        # Block the producer thread until the loop has enqueued ``item``.
        if stop.is_set():
            return
        fut = asyncio.run_coroutine_threadsafe(q.put(item), loop)
        try:
            # Poll so the producer notices ``stop`` instead of waiting forever on a
            # full queue after the consumer has gone away.
            while not stop.is_set():
                try:
                    fut.result(timeout=0.25)
                    return
                except concurrent.futures.TimeoutError:
                    # Future.result raises concurrent.futures.TimeoutError, which is
                    # only an alias of the builtin TimeoutError on Python >= 3.11.
                    continue
        except BaseException:
            return
        finally:
            if stop.is_set():
                with suppress(BaseException):
                    fut.cancel()

    def _runner() -> None:
        try:
            for batch in iter_sqlmodel(
                model,
                bind,
                where=where,
                parameters=parameters,
                columns=columns,
                order_by=order_by,
                limit=limit,
                batch_size=batch_size,
            ):
                if stop.is_set():
                    return
                _put(batch)
        except BaseException as e:
            _put(e)
        finally:
            _put(sentinel)

    def _offer_sentinel() -> None:
        # Runs on the loop thread. A full queue only means nothing will read the
        # sentinel, so swallow QueueFull instead of logging a callback error.
        with suppress(asyncio.QueueFull):
            q.put_nowait(sentinel)

    if executor is not None:
        loop.run_in_executor(executor, _runner)
    else:
        threading.Thread(target=_runner, daemon=True).start()

    try:
        while True:
            item = await q.get()
            if item is sentinel:
                return
            if isinstance(item, BaseException):
                raise item
            yield item
    finally:
        stop.set()
        with suppress(BaseException):
            loop.call_soon_threadsafe(_offer_sentinel)

aiter_parquet async

aiter_parquet(path, *, batch_size=65536, columns=None, executor=None)

Async batches from iter_parquet().

Source code in python/pydantable/io/__init__.py
async def aiter_parquet(
    path: str | Path,
    *,
    batch_size: int = 65_536,
    columns: list[str] | None = None,
    executor: Executor | None = None,
):
    """Yield :func:`iter_parquet` batches asynchronously.

    The synchronous iterator is consumed via ``_aiter_from_iter`` (optionally on
    ``executor``), and each batch is re-yielded unchanged.
    """
    batches = _aiter_from_iter(
        iter_parquet(path, batch_size=batch_size, columns=columns),
        executor=executor,
    )
    async for chunk in batches:
        yield chunk

aiter_ipc async

aiter_ipc(source, *, batch_size=65536, as_stream=False, executor=None)

Async batches from iter_ipc().

Source code in python/pydantable/io/__init__.py
async def aiter_ipc(
    source: _Source,
    *,
    batch_size: int = 65_536,
    as_stream: bool = False,
    executor: Executor | None = None,
):
    """Yield :func:`iter_ipc` batches asynchronously.

    The synchronous iterator is consumed via ``_aiter_from_iter`` (optionally on
    ``executor``), and each batch is re-yielded unchanged.
    """
    batches = _aiter_from_iter(
        iter_ipc(source, batch_size=batch_size, as_stream=as_stream),
        executor=executor,
    )
    async for chunk in batches:
        yield chunk

aiter_csv async

aiter_csv(path, *, batch_size=65536, encoding='utf-8', executor=None)

Async batches from iter_csv().

Source code in python/pydantable/io/__init__.py
async def aiter_csv(
    path: str | Path,
    *,
    batch_size: int = 65_536,
    encoding: str = "utf-8",
    executor: Executor | None = None,
):
    """Yield :func:`iter_csv` batches asynchronously.

    The synchronous iterator is consumed via ``_aiter_from_iter`` (optionally on
    ``executor``), and each batch is re-yielded unchanged.
    """
    batches = _aiter_from_iter(
        iter_csv(path, batch_size=batch_size, encoding=encoding),
        executor=executor,
    )
    async for chunk in batches:
        yield chunk

aiter_ndjson async

aiter_ndjson(path, *, batch_size=65536, encoding='utf-8', executor=None)

Async batches from iter_ndjson().

Source code in python/pydantable/io/__init__.py
async def aiter_ndjson(
    path: str | Path,
    *,
    batch_size: int = 65_536,
    encoding: str = "utf-8",
    executor: Executor | None = None,
):
    """Yield :func:`iter_ndjson` batches asynchronously.

    The synchronous iterator is consumed via ``_aiter_from_iter`` (optionally on
    ``executor``), and each batch is re-yielded unchanged.
    """
    batches = _aiter_from_iter(
        iter_ndjson(path, batch_size=batch_size, encoding=encoding),
        executor=executor,
    )
    async for chunk in batches:
        yield chunk

aiter_json_lines async

aiter_json_lines(path, *, batch_size=65536, encoding='utf-8', executor=None)

Async batches from iter_json_lines().

Source code in python/pydantable/io/__init__.py
async def aiter_json_lines(
    path: str | Path,
    *,
    batch_size: int = 65_536,
    encoding: str = "utf-8",
    executor: Executor | None = None,
):
    """Yield :func:`iter_json_lines` batches asynchronously.

    The synchronous iterator is consumed via ``_aiter_from_iter`` (optionally on
    ``executor``), and each batch is re-yielded unchanged.
    """
    batches = _aiter_from_iter(
        iter_json_lines(path, batch_size=batch_size, encoding=encoding),
        executor=executor,
    )
    async for chunk in batches:
        yield chunk

aiter_json_array async

aiter_json_array(path, *, batch_size=65536, encoding='utf-8', executor=None)

Async batches from iter_json_array().

Source code in python/pydantable/io/__init__.py
async def aiter_json_array(
    path: str | Path,
    *,
    batch_size: int = 65_536,
    encoding: str = "utf-8",
    executor: Executor | None = None,
):
    """Yield :func:`iter_json_array` batches asynchronously.

    The synchronous iterator is consumed via ``_aiter_from_iter`` (optionally on
    ``executor``), and each batch is re-yielded unchanged.
    """
    batches = _aiter_from_iter(
        iter_json_array(path, batch_size=batch_size, encoding=encoding),
        executor=executor,
    )
    async for chunk in batches:
        yield chunk

afetch_mongo async

afetch_mongo(collection, *, match=None, projection=None, sort=None, skip=None, limit=None, fields=None, session=None, max_time_ms=None, executor=None)

Async version of fetch_mongo().

Uses PyMongo's native async API for pymongo.asynchronous.collection.AsyncCollection. For sync collections, it runs fetch_mongo() via asyncio.to_thread() (or an optional Executor).

Source code in python/pydantable/io/__init__.py
async def afetch_mongo(
    collection: Any,
    *,
    match: Mapping[str, Any] | None = None,
    projection: Any = None,
    sort: Sequence[tuple[str, int]] | None = None,
    skip: int | None = None,
    limit: int | None = None,
    fields: Sequence[str] | None = None,
    session: Any | None = None,
    max_time_ms: int | None = None,
    executor: Executor | None = None,
) -> dict[str, list[Any]]:
    """Async counterpart of :func:`fetch_mongo`.

    Async collections (as detected by ``is_async_mongo_collection``) go through
    ``afetch_mongo_async``. Any other collection is handed to :func:`fetch_mongo`
    via ``_run_io``, which uses ``asyncio.to_thread`` or the optional ``executor``.
    """
    query = dict(
        match=match,
        projection=projection,
        sort=sort,
        skip=skip,
        limit=limit,
        fields=fields,
        session=session,
        max_time_ms=max_time_ms,
    )
    if not is_async_mongo_collection(collection):
        return await _run_io(fetch_mongo, (collection,), query, executor=executor)
    return await afetch_mongo_async(collection, **query)

aiter_mongo async

aiter_mongo(collection, *, match=None, projection=None, sort=None, skip=None, limit=None, batch_size=1000, fields=None, session=None, max_time_ms=None, executor=None)

Async batches from iter_mongo() (native async, or thread-backed for sync collections).

Source code in python/pydantable/io/__init__.py
async def aiter_mongo(
    collection: Any,
    *,
    match: Mapping[str, Any] | None = None,
    projection: Any = None,
    sort: Sequence[tuple[str, int]] | None = None,
    skip: int | None = None,
    limit: int | None = None,
    batch_size: int = 1000,
    fields: Sequence[str] | None = None,
    session: Any | None = None,
    max_time_ms: int | None = None,
    executor: Executor | None = None,
):
    """Yield :func:`iter_mongo` batches asynchronously.

    Async collections stream through ``aiter_mongo_async``. Sync collections use
    :func:`iter_mongo`, consumed via ``_aiter_from_iter`` (optionally on
    ``executor``).
    """
    options = dict(
        match=match,
        projection=projection,
        sort=sort,
        skip=skip,
        limit=limit,
        batch_size=batch_size,
        fields=fields,
        session=session,
        max_time_ms=max_time_ms,
    )
    if is_async_mongo_collection(collection):
        source = aiter_mongo_async(collection, **options)
    else:
        source = _aiter_from_iter(
            iter_mongo(collection, **options), executor=executor
        )
    async for batch in source:
        yield batch

awrite_mongo async

awrite_mongo(collection, data, *, ordered=True, chunk_size=None, session=None, executor=None)

Async version of write_mongo() (native async, or thread-backed for sync collections).

Source code in python/pydantable/io/__init__.py
async def awrite_mongo(
    collection: Any,
    data: dict[str, list[Any]],
    *,
    ordered: bool = True,
    chunk_size: int | None = None,
    session: Any | None = None,
    executor: Executor | None = None,
) -> int:
    """Async counterpart of :func:`write_mongo`; returns its result.

    Async collections use ``awrite_mongo_async``. Others run :func:`write_mongo`
    via ``_run_io`` (optionally on ``executor``).
    """
    if not is_async_mongo_collection(collection):
        sync_kwargs = {"ordered": ordered, "chunk_size": chunk_size, "session": session}
        return await _run_io(
            write_mongo, (collection, data), sync_kwargs, executor=executor
        )
    return await awrite_mongo_async(
        collection, data, ordered=ordered, chunk_size=chunk_size, session=session
    )

awrite_sql_raw async

awrite_sql_raw(data, table_name, bind, *, schema=None, if_exists='append', chunk_size=None, executor=None)

Async version of write_sql_raw(), run via asyncio.to_thread() (or an optional Executor).

Source code in python/pydantable/io/__init__.py
async def awrite_sql_raw(
    data: dict[str, list[Any]],
    table_name: str,
    bind: str | Any,
    *,
    schema: str | None = None,
    if_exists: str = "append",
    chunk_size: int | None = None,
    executor: Executor | None = None,
) -> None:
    """Run :func:`write_sql_raw` off the event loop.

    The blocking write goes through ``_run_io`` (``asyncio.to_thread`` by default, or
    the given ``executor``). Nothing is returned.
    """
    write_kwargs = {"schema": schema, "if_exists": if_exists, "chunk_size": chunk_size}
    await _run_io(
        write_sql_raw, (data, table_name, bind), write_kwargs, executor=executor
    )

write_sql_batches

write_sql_batches(batches, table_name, bind, *, schema=None, if_exists='append', chunk_size=None)

Write an iterator of batch column dicts to SQL.

Deprecated: call write_sql_raw() once per batch, or use write_sqlmodel_batches() for table models.

Each batch is either a dict[str, list] (e.g. from iter_sql_raw()) or an object with a .to_dict() method, such as a pydantable.DataFrameModel batch.

Source code in python/pydantable/io/__init__.py
def write_sql_batches(
    batches: Any,
    table_name: str,
    bind: str | Any,
    *,
    schema: str | None = None,
    if_exists: str = "append",
    chunk_size: int | None = None,
) -> None:
    """
    Write an iterator of batch column dicts to SQL.

    Deprecated: prefer calling :func:`write_sql_raw` per batch.

    Each batch is a ``dict[str, list]`` (e.g. from :func:`iter_sql_raw` or a
    :class:`~pydantable.DataFrameModel` batch via ``.to_dict()``).
    """
    warnings.warn(
        "write_sql_batches is deprecated and will be removed in a future major version; "
        "call write_sql_raw once per batch or use write_sqlmodel_batches for table models.",
        DeprecationWarning,
        stacklevel=2,
    )
    first = True
    for batch in batches:
        cols = batch.to_dict() if hasattr(batch, "to_dict") else batch
        mode = if_exists if first else "append"
        write_sql_raw(
            cols,
            table_name,
            bind,
            schema=schema,
            if_exists=mode,
            chunk_size=chunk_size,
        )
        first = False

write_sqlmodel_batches

write_sqlmodel_batches(batches, model, bind, *, schema=None, if_exists='append', chunk_size=None, validate_rows=False, replace_ok=False)

Write an iterable of batch column dicts via write_sqlmodel().

The first batch uses if_exists; later batches always append.

Source code in python/pydantable/io/__init__.py
def write_sqlmodel_batches(
    batches: Any,
    model: Any,
    bind: str | Any,
    *,
    schema: str | None = None,
    if_exists: str = "append",
    chunk_size: int | None = None,
    validate_rows: bool = False,
    replace_ok: bool = False,
) -> None:
    """
    Write an iterator of batch column dicts via :func:`write_sqlmodel`.

    The first batch uses ``if_exists``; later batches always append.
    """
    first = True
    for batch in batches:
        cols = batch.to_dict() if hasattr(batch, "to_dict") else batch
        mode = if_exists if first else "append"
        write_sqlmodel(
            cols,
            model,
            bind,
            schema=schema,
            if_exists=mode,
            chunk_size=chunk_size,
            validate_rows=validate_rows,
            replace_ok=replace_ok if mode == "replace" else False,
        )
        first = False