HTTP(S) and object-store reads

Primary: use DataFrame[Schema].read_parquet_url for lazy Parquet over HTTP, or pass column dicts from downloads into DataFrame / DataFrameModel. Secondary: the pydantable.io functions fetch_bytes, fetch_*_url, and read_from_object_store (re-exported from pydantable.io.http).

Experimental gate

By default, URL and cloud-style helpers require either experimental=True on each call or the environment variable PYDANTABLE_IO_EXPERIMENTAL=1. The same gate applies to fetch_bytes and read_from_object_store.
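The gate described above can be pictured as a small opt-in check. This is only a sketch of the idea, not pydantable's implementation: the helper names gate_open and require_gate, the RuntimeError type, and treating only the exact value "1" as enabled are all assumptions made for illustration.

```python
import os

ENV_VAR = "PYDANTABLE_IO_EXPERIMENTAL"  # variable name taken from the text above


def gate_open(*, experimental: bool) -> bool:
    """Return True when the per-call flag or the environment variable opts in."""
    # Assumption: only the literal value "1" counts as enabled.
    return bool(experimental) or os.environ.get(ENV_VAR) == "1"


def require_gate(*, experimental: bool) -> None:
    """Hypothetical guard: raise unless the caller has opted in."""
    if not gate_open(experimental=experimental):
        # Assumption: the real library may raise a different exception type.
        raise RuntimeError(
            f"pass experimental=True or set {ENV_VAR}=1 to use URL helpers"
        )
```

Setting the variable once per process suits scripts and CI jobs; the per-call flag keeps the opt-in visible at each call site.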

DataFrame / DataFrameModel

Lazy Parquet URL

  • DataFrame[Schema].read_parquet_url(url, *, experimental=True, columns=None, **kwargs)
  • MyModel.read_parquet_url(...): kwargs are for fetch_bytes only (not scan_kwargs).
  • pydantable.io.read_parquet_url_ctx / aread_parquet_url_ctx and DataFrameModel.read_parquet_url_ctx / aread_parquet_url_ctx delete the temp file when the context block exits (preferred when you do not need the path yourself).

The temp .parquet from the non-context read_parquet_url is not auto-deleted; see IO_PARQUET and DATA_IO_SOURCES.

Eager URL helpers (column dict)

Pass fetch_parquet_url, fetch_csv_url, or fetch_ndjson_url results to MyModel(cols) or DataFrame[Schema](cols) — there are no DataFrameModel.fetch_*_url classmethods.

Object stores

Pass read_from_object_store(...) output to MyModel(cols) or DataFrame[Schema](cols) — there is no DataFrameModel.read_from_object_store.

pydantable.io

Raw bytes

  • fetch_bytes(url, *, experimental=True, headers=None, timeout=60.0, max_bytes=None): HTTP/HTTPS only (stdlib urllib). Set max_bytes to cap download size (ValueError if exceeded).
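To make the max_bytes behaviour concrete, here is a minimal stdlib sketch of a size-capped download. It is not the library's code: the name fetch_capped is hypothetical, and reading one byte past the cap is simply one way to detect an oversized body.

```python
import urllib.request
from urllib.parse import urlsplit


def fetch_capped(url, *, headers=None, timeout=60.0, max_bytes=None):
    """Download url with stdlib urllib, refusing bodies larger than max_bytes."""
    if urlsplit(url).scheme not in ("http", "https"):
        raise ValueError(f"only http/https URLs are supported: {url!r}")
    request = urllib.request.Request(url, headers=headers or {})
    with urllib.request.urlopen(request, timeout=timeout) as response:
        if max_bytes is None:
            return response.read()
        # Read one byte past the cap so an oversized body is detected
        # without downloading the rest of it.
        data = response.read(max_bytes + 1)
    if len(data) > max_bytes:
        raise ValueError(f"response exceeds max_bytes={max_bytes}")
    return data
```

A cap is worth setting whenever the URL is not fully trusted, since the whole body is otherwise held in memory.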

Eager format helpers (dict[str, list])

Function            Notes
fetch_parquet_url   PyArrow on bytes; optional columns=.
fetch_csv_url       Temp CSV file; Rust read with stdlib fallback.
fetch_ndjson_url    Temp NDJSON file; Rust read.

kwargs beyond each function’s explicit parameters are passed through to fetch_bytes (e.g. headers, timeout, max_bytes).
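The dict[str, list] shape these helpers return is a plain column-oriented mapping. The sketch below builds one from NDJSON bytes using only the json module. It illustrates the shape, not the Rust reader that pydantable uses; ndjson_to_columns is a hypothetical name, and filling missing keys with None is an assumption.

```python
import json


def ndjson_to_columns(blob: bytes) -> dict[str, list]:
    """Parse newline-delimited JSON objects into a column dict."""
    rows = [json.loads(line) for line in blob.splitlines() if line.strip()]
    # Keep keys in first-seen order so the column order is deterministic.
    keys: list[str] = []
    for row in rows:
        for key in row:
            if key not in keys:
                keys.append(key)
    # Assumption: a key absent from a row becomes None in that column.
    return {key: [row.get(key) for row in rows] for key in keys}


cols = ndjson_to_columns(b'{"trace_id":9001}\n{"trace_id":9002}\n')
print(cols)  # {'trace_id': [9001, 9002]}
```

A mapping of this shape is what MyModel(cols) or DataFrame[Schema](cols) accepts, which then validates it against the schema.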

Lazy Parquet URL

  • read_parquet_url, aread_parquet_url — return ScanFileRoot; temp-file lifecycle as above unless you use a context manager.
  • read_parquet_url_ctx(dataframe_cls, url, ...), aread_parquet_url_ctx — yield DataFrame[Schema] and unlink the temp path in finally.
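The cleanup the _ctx helpers provide follows the usual context-manager pattern: write the download to a temp file, yield, and unlink in finally. Below is a stdlib-only sketch of that pattern. The name temp_parquet is hypothetical, and unlike the real helpers (which yield a DataFrame[Schema]) it yields the path so the lifecycle is easy to observe.

```python
import os
import tempfile
from contextlib import contextmanager


@contextmanager
def temp_parquet(blob: bytes):
    """Write blob to a temp .parquet file, yield its path, then delete it."""
    fd, path = tempfile.mkstemp(suffix=".parquet")
    try:
        with os.fdopen(fd, "wb") as handle:
            handle.write(blob)
        yield path
    finally:
        # Runs on normal exit and when the with-block raises.
        if os.path.exists(path):
            os.unlink(path)
```

Because the unlink sits in finally, the file is also removed when the body of the with-block fails. The non-context read_parquet_url leaves that cleanup to the caller.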

Object-store URIs (s3://, gs://, az://, …)

  • read_from_object_store(uri, *, experimental=True, format="parquet", max_bytes=None)

Requires fsspec and a backend (e.g. s3fs). Install pydantable[cloud] or add dependencies manually. format is "parquet" (default), "csv", or "ndjson" / "jsonl" (object is read into memory in chunks until complete or max_bytes). max_bytes limits how many bytes are buffered (ValueError if exceeded).
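Chunked buffering with a byte cap can be sketched against any binary file-like object. The code below uses io.BytesIO as a stand-in for a file opened through fsspec. The function name read_capped and the 1 MiB default chunk size are assumptions, not values taken from pydantable.

```python
import io


def read_capped(fileobj, *, max_bytes=None, chunk_size=1024 * 1024):
    """Read a binary file-like object fully, chunk by chunk, up to max_bytes."""
    buffer = bytearray()
    while True:
        chunk = fileobj.read(chunk_size)
        if not chunk:  # empty read means end of object
            return bytes(buffer)
        buffer.extend(chunk)
        # Fail as soon as the cap is crossed instead of buffering the rest.
        if max_bytes is not None and len(buffer) > max_bytes:
            raise ValueError(f"object exceeds max_bytes={max_bytes}")


payload = read_capped(io.BytesIO(b"0123456789"), max_bytes=10, chunk_size=4)
print(len(payload))  # 10
```

Checking the cap after each chunk bounds memory use at roughly max_bytes plus one chunk, even for very large objects.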

Runnable example

Spawns a local http.server on 127.0.0.1 (no external network). Run conventions: IO_OVERVIEW (Runnable example).

python docs/examples/io/http_local_fetch.py

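The script calls ProductMetric.read_parquet_url and, after collect(), deletes the temp file with os.unlink on the frame's internal root path (df._df._root_data.path, a private attribute). The non-context read_parquet_url never deletes the temp file for you, so you must arrange cleanup yourself if you need it removed immediately; when you do not need the path, prefer the read_parquet_url_ctx helpers.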

"""Local HTTP server: Parquet asset, CSV legacy report, NDJSON log (temp files).

Uses methods on DataFrameModel; stdlib urllib to assert the wire bytes.

Run::

python docs/examples/io/http_local_fetch.py

"""

from __future__ import annotations

import os
import tempfile
import threading
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path

from pydantable import DataFrameModel

class ProductMetric(DataFrameModel):
    """Row materialized from a Parquet blob served over HTTP."""

    units_sold: int


class LegacyCsvRow(DataFrameModel):
    """Two-column report from an older system that only serves CSV."""

    region_id: int
    revenue_usd: int


class LogLine(DataFrameModel):
    """Single field from a newline-delimited log download."""

    trace_id: int

def _serve_blob(blob: bytes) -> tuple[HTTPServer, str]:
    class Handler(BaseHTTPRequestHandler):
        def do_GET(self) -> None:
            self.send_response(200)
            self.send_header("Content-Length", str(len(blob)))
            self.end_headers()
            self.wfile.write(blob)

        def log_message(self, *args: object) -> None:
            return

    server = HTTPServer(("127.0.0.1", 0), Handler)
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()
    url = f"http://127.0.0.1:{server.server_port}/blob"
    return server, url

def main() -> None:
    # Build a small Parquet file and keep its bytes to serve over HTTP.
    with tempfile.TemporaryDirectory() as td:
        pq = Path(td) / "metrics.parquet"
        ProductMetric({"units_sold": [10, 25, 3]}).write_parquet(str(pq))
        parquet_blob = pq.read_bytes()

    server, parquet_url = _serve_blob(parquet_blob)
    try:
        assert urllib.request.urlopen(parquet_url).read() == parquet_blob

        df = ProductMetric.read_parquet_url(parquet_url, experimental=True)
        try:
            assert [r.units_sold for r in df.collect()] == [10, 25, 3]
        finally:
            # The non-context read_parquet_url leaves its temp file behind.
            os.unlink(df._df._root_data.path)
    finally:
        server.shutdown()
        server.server_close()

    # CSV: download with urllib, write a temp file, read it back with read_csv.
    csv_blob = b"region_id,revenue_usd\n3,45000\n"
    server2, csv_url = _serve_blob(csv_blob)
    try:
        data = urllib.request.urlopen(csv_url).read()
        with tempfile.NamedTemporaryFile(suffix=".csv", delete=False) as f:
            f.write(data)
            csv_path = f.name
        try:
            d = LegacyCsvRow.read_csv(csv_path).to_dict()
            assert [int(x) for x in d["region_id"]] == [3]
            assert [int(x) for x in d["revenue_usd"]] == [45000]
        finally:
            os.unlink(csv_path)
    finally:
        server2.shutdown()
        server2.server_close()

    # NDJSON: same pattern, read back with read_ndjson.
    ndjson_blob = b'{"trace_id":9001}\n{"trace_id":9002}\n'
    server3, nd_url = _serve_blob(ndjson_blob)
    try:
        data = urllib.request.urlopen(nd_url).read()
        with tempfile.NamedTemporaryFile(suffix=".ndjson", delete=False) as f:
            f.write(data)
            nd_path = f.name
        try:
            d = LogLine.read_ndjson(nd_path).to_dict()
            assert [int(x) for x in d["trace_id"]] == [9001, 9002]
        finally:
            os.unlink(nd_path)
    finally:
        server3.shutdown()
        server3.server_close()

    print("http_local_fetch: ok")

if __name__ == "__main__":
    main()

Output

http_local_fetch: ok

See also

IO_OVERVIEW · IO_PARQUET · IO_CSV · IO_NDJSON · FASTAPI (thread-pool patterns for I/O)