HTTP(S) and object-store reads
Primary path: call DataFrame[Schema].read_parquet_url for lazy Parquet over HTTP, or pass the column dicts returned by the eager download helpers into DataFrame / DataFrameModel. Secondary path: the lower-level functions in pydantable.io, namely fetch_bytes, fetch_*_url, and read_from_object_store (re-exported from pydantable.io.http).
Experimental gate
By default, URL and cloud-style helpers require either experimental=True on each call or the environment variable PYDANTABLE_IO_EXPERIMENTAL=1. The same gate applies to fetch_bytes and read_from_object_store.
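The two ways of opting in can be sketched as follows. This is a hedged illustration only: it assumes the environment variable is consulted when a helper is called (if it is read at import time instead, set it in the shell before starting Python). The function names are illustrative and not part of pydantable.

```python
import os


def opt_in_per_call(url: str) -> bytes:
    """Opt in for a single call via the keyword argument."""
    # Imported lazily so this module loads even where pydantable is absent.
    from pydantable.io import fetch_bytes

    return fetch_bytes(url, experimental=True)


def opt_in_process_wide() -> None:
    """Opt in for the whole process via the environment variable."""
    # Assumption: the gate reads this variable at call time.
    os.environ["PYDANTABLE_IO_EXPERIMENTAL"] = "1"


opt_in_process_wide()
print(os.environ["PYDANTABLE_IO_EXPERIMENTAL"])
```

The per-call flag keeps the opt-in visible at each call site; the environment variable suits scripts and CI jobs where every call is expected to use the experimental helpers.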
DataFrame / DataFrameModel
Lazy Parquet URL
DataFrame[Schema].read_parquet_url(url, *, experimental=True, columns=None, **kwargs) / MyModel.read_parquet_url(...): kwargs go to fetch_bytes only (not scan_kwargs).
pydantable.io.read_parquet_url_ctx and aread_parquet_url_ctx, and DataFrameModel.read_parquet_url_ctx and aread_parquet_url_ctx, delete the temp file when the context block exits (preferred when you do not need the path yourself).
The temp .parquet from the non-context read_parquet_url is not auto-deleted; see IO_PARQUET and DATA_IO_SOURCES.
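A hedged sketch of the context-manager form follows. It assumes DataFrameModel.read_parquet_url_ctx accepts the same url and experimental arguments as read_parquet_url (its exact signature is not listed on this page) and that collect() yields row objects, as in the runnable example further down. units_sold_from_url and ProductMetric are illustrative names.

```python
def units_sold_from_url(url: str) -> list:
    """Read one column from a Parquet URL; the temp file is removed on exit."""
    # Imported lazily so this module loads even where pydantable is absent.
    from pydantable import DataFrameModel

    class ProductMetric(DataFrameModel):
        units_sold: int

    # Assumption: the classmethod mirrors read_parquet_url's (url, experimental=...)
    # arguments and yields a frame that is usable only inside the with-block.
    with ProductMetric.read_parquet_url_ctx(url, experimental=True) as df:
        # Materialize inside the block, before the temp file is deleted.
        return [row.units_sold for row in df.collect()]
```

Collecting inside the block matters because the frame is lazy: once the block exits, the backing temp file no longer exists.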
Eager URL helpers (column dict)
Pass fetch_parquet_url, fetch_csv_url, or fetch_ndjson_url results to MyModel(cols) or DataFrame[Schema](cols) — there are no DataFrameModel.fetch_*_url classmethods.
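As a minimal sketch of the eager path, the function below downloads a CSV into a column dict and hands it to a model. It assumes fetch_csv_url takes the URL as its first positional argument; load_legacy_report and LegacyCsvRow are illustrative names.

```python
def load_legacy_report(url: str):
    """Download a CSV eagerly and wrap the resulting column dict in a model."""
    # Imported lazily so this module loads even where pydantable is absent.
    from pydantable import DataFrameModel
    from pydantable.io import fetch_csv_url

    class LegacyCsvRow(DataFrameModel):
        region_id: int
        revenue_usd: int

    # Assumption: the URL is the first positional argument.
    cols = fetch_csv_url(url, experimental=True)  # dict[str, list]
    return LegacyCsvRow(cols)
```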
Object stores
Pass read_from_object_store(...) output to MyModel(cols) or DataFrame[Schema](cols) — there is no DataFrameModel.read_from_object_store.
pydantable.io
Raw bytes
fetch_bytes(url, *, experimental=True, headers=None, timeout=60.0, max_bytes=None): HTTP/HTTPS only (stdlib urllib). Set max_bytes to cap the download size (ValueError if exceeded).
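The max_bytes cap can be illustrated with the standard library alone. The sketch below is not pydantable's implementation; fetch_capped and demo are hypothetical names. It reads a response in chunks and raises ValueError as soon as the buffered size exceeds the cap, then exercises both outcomes against a throwaway local server.

```python
from __future__ import annotations

import threading
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer


def fetch_capped(url: str, *, max_bytes: int | None = None,
                 timeout: float = 60.0, chunk_size: int = 65536) -> bytes:
    """Read url fully, raising ValueError once more than max_bytes arrive."""
    buf = bytearray()
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        while True:
            chunk = resp.read(chunk_size)
            if not chunk:
                break
            buf.extend(chunk)
            if max_bytes is not None and len(buf) > max_bytes:
                raise ValueError(f"response exceeds max_bytes={max_bytes}")
    return bytes(buf)


def demo() -> tuple[bytes, str]:
    """Serve 10 bytes locally; fetch under a generous cap and a too-small cap."""
    payload = b"0123456789"

    class Handler(BaseHTTPRequestHandler):
        def do_GET(self) -> None:
            self.send_response(200)
            self.send_header("Content-Length", str(len(payload)))
            self.end_headers()
            self.wfile.write(payload)

        def log_message(self, *args: object) -> None:
            return  # keep the demo quiet

    server = HTTPServer(("127.0.0.1", 0), Handler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    url = f"http://127.0.0.1:{server.server_port}/blob"
    try:
        body = fetch_capped(url, max_bytes=100)
        try:
            fetch_capped(url, max_bytes=4)
            error = ""
        except ValueError as exc:
            error = str(exc)
    finally:
        server.shutdown()
        server.server_close()
    return body, error


body, error = demo()
```

Checking the running total after each chunk means an oversized response is rejected without buffering all of it first.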
Eager format helpers (dict[str, list])
| Function | Notes |
|---|---|
| fetch_parquet_url | PyArrow on bytes; optional columns=. |
| fetch_csv_url | Temp CSV file; Rust read with stdlib fallback. |
| fetch_ndjson_url | Temp NDJSON file; Rust read. |
kwargs beyond each function’s explicit parameters are passed through to fetch_bytes (e.g. headers, timeout, max_bytes).
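The pass-through can be sketched as below. fetch_options and load_log_columns are hypothetical helpers, and the sketch assumes headers is a plain mapping of header names to values; the token and limits are placeholder values.

```python
def fetch_options(token: str) -> dict:
    """Hypothetical helper: keyword arguments meant for fetch_bytes."""
    return {
        # Assumption: headers is a dict of header name to value.
        "headers": {"Authorization": f"Bearer {token}"},
        "timeout": 10.0,
        "max_bytes": 5 * 1024 * 1024,  # refuse downloads above 5 MiB
    }


def load_log_columns(url: str, token: str) -> dict:
    """Fetch NDJSON eagerly; the extra keywords are forwarded to fetch_bytes."""
    # Imported lazily so this module loads even where pydantable is absent.
    from pydantable.io import fetch_ndjson_url

    return fetch_ndjson_url(url, experimental=True, **fetch_options(token))


opts = fetch_options("example-token")
```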
Lazy Parquet URL
read_parquet_url, aread_parquet_url: return ScanFileRoot; temp-file lifecycle as above unless you use a context manager.
read_parquet_url_ctx(dataframe_cls, url, ...), aread_parquet_url_ctx: yield DataFrame[Schema] and unlink the temp path in finally.
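The unlink-in-finally lifecycle of the context-manager variants can be illustrated with the standard library alone. This is a sketch of the general pattern, not pydantable's implementation: temp_parquet_path is a hypothetical name, and the bytes written are a placeholder rather than a valid Parquet file.

```python
from __future__ import annotations

import contextlib
import os
import tempfile
from collections.abc import Iterator


@contextlib.contextmanager
def temp_parquet_path(blob: bytes) -> Iterator[str]:
    """Write blob to a temp .parquet file, yield its path, unlink on exit."""
    fd, path = tempfile.mkstemp(suffix=".parquet")
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(blob)
        yield path
    finally:
        # Runs on normal exit and when the with-block raises.
        with contextlib.suppress(FileNotFoundError):
            os.unlink(path)


with temp_parquet_path(b"placeholder bytes") as p:
    existed_inside = os.path.exists(p)
exists_after = os.path.exists(p)
```

With the non-context functions, the equivalent of the finally branch is the caller's responsibility.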
Object-store URIs (s3://, gs://, az://, …)
read_from_object_store(uri, *, experimental=True, format="parquet", max_bytes=None)
Requires fsspec and a backend (e.g. s3fs). Install pydantable[cloud] or add dependencies manually. format is "parquet" (default), "csv", or "ndjson" / "jsonl" (object is read into memory in chunks until complete or max_bytes). max_bytes limits how many bytes are buffered (ValueError if exceeded).
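A hedged sketch of choosing format and bounding memory use follows. format_for_uri and load_columns are hypothetical helpers, and the 64 MiB default is an arbitrary illustrative limit.

```python
def format_for_uri(uri: str) -> str:
    """Hypothetical helper: pick a format value from the object's suffix."""
    lowered = uri.lower()
    if lowered.endswith(".csv"):
        return "csv"
    if lowered.endswith((".ndjson", ".jsonl")):
        return "ndjson"
    return "parquet"  # the documented default


def load_columns(uri: str, max_bytes: int = 64 * 1024 * 1024) -> dict:
    """Read an object into a column dict, buffering at most max_bytes."""
    # Imported lazily: needs pydantable plus fsspec and a backend such as s3fs.
    from pydantable.io import read_from_object_store

    return read_from_object_store(
        uri,
        experimental=True,
        format=format_for_uri(uri),
        max_bytes=max_bytes,
    )
```

The returned dict can then be passed to MyModel(cols) or DataFrame[Schema](cols), as described under Object stores above.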
Runnable example
Spawns a local http.server on 127.0.0.1 (no external network). Run conventions: IO_OVERVIEW (Runnable example).
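The script calls ProductMetric.read_parquet_url and, after collect(), deletes the temp file with os.unlink(df._df._root_data.path), which reaches into private attributes. pydantable.io.read_parquet_url performs the same download but returns the ScanFileRoot directly, so os.unlink(root.path) works without touching internals. Either way, the non-context variants leave cleanup to you if the temp file must be deleted immediately.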
"""Local HTTP server: Parquet asset, CSV legacy report, NDJSON log (temp files).
Uses methods on DataFrameModel; stdlib urllib to assert the wire bytes.
Run::
python docs/examples/io/http_local_fetch.py
"""
from __future__ import annotations

import os
import tempfile
import threading
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path

from pydantable import DataFrameModel


class ProductMetric(DataFrameModel):
    """Row materialized from a Parquet blob served over HTTP."""

    units_sold: int


class LegacyCsvRow(DataFrameModel):
    """Two-column report from an older system that only serves CSV."""

    region_id: int
    revenue_usd: int


class LogLine(DataFrameModel):
    """Single field from a newline-delimited log download."""

    trace_id: int


def _serve_blob(blob: bytes) -> tuple[HTTPServer, str]:
    """Serve ``blob`` for every GET on an ephemeral 127.0.0.1 port."""

    class Handler(BaseHTTPRequestHandler):
        def do_GET(self) -> None:
            self.send_response(200)
            self.send_header("Content-Length", str(len(blob)))
            self.end_headers()
            self.wfile.write(blob)

        def log_message(self, *args: object) -> None:
            # Silence per-request logging.
            return

    server = HTTPServer(("127.0.0.1", 0), Handler)
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()
    url = f"http://127.0.0.1:{server.server_port}/blob"
    return server, url


def main() -> None:
    # Build the Parquet payload on disk, then keep only its bytes.
    with tempfile.TemporaryDirectory() as td:
        pq = Path(td) / "metrics.parquet"
        ProductMetric({"units_sold": [10, 25, 3]}).write_parquet(str(pq))
        parquet_blob = pq.read_bytes()

    # Parquet: lazy read over HTTP, then manual temp-file cleanup.
    server, parquet_url = _serve_blob(parquet_blob)
    try:
        assert urllib.request.urlopen(parquet_url).read() == parquet_blob
        df = ProductMetric.read_parquet_url(parquet_url, experimental=True)
        try:
            assert [r.units_sold for r in df.collect()] == [10, 25, 3]
        finally:
            os.unlink(df._df._root_data.path)
    finally:
        server.shutdown()
        server.server_close()

    # CSV: download with urllib, spill to a temp file, read with the model.
    csv_blob = b"region_id,revenue_usd\n3,45000\n"
    server2, csv_url = _serve_blob(csv_blob)
    try:
        data = urllib.request.urlopen(csv_url).read()
        with tempfile.NamedTemporaryFile(suffix=".csv", delete=False) as f:
            f.write(data)
            csv_path = f.name
        try:
            d = LegacyCsvRow.read_csv(csv_path).to_dict()
            assert [int(x) for x in d["region_id"]] == [3]
            assert [int(x) for x in d["revenue_usd"]] == [45000]
        finally:
            os.unlink(csv_path)
    finally:
        server2.shutdown()
        server2.server_close()

    # NDJSON: same pattern as CSV.
    ndjson_blob = b'{"trace_id":9001}\n{"trace_id":9002}\n'
    server3, nd_url = _serve_blob(ndjson_blob)
    try:
        data = urllib.request.urlopen(nd_url).read()
        with tempfile.NamedTemporaryFile(suffix=".ndjson", delete=False) as f:
            f.write(data)
            nd_path = f.name
        try:
            d = LogLine.read_ndjson(nd_path).to_dict()
            assert [int(x) for x in d["trace_id"]] == [9001, 9002]
        finally:
            os.unlink(nd_path)
    finally:
        server3.shutdown()
        server3.server_close()

    print("http_local_fetch: ok")


if __name__ == "__main__":
    main()
Output
http_local_fetch: ok
See also
IO_OVERVIEW · IO_PARQUET · IO_CSV · IO_NDJSON · FASTAPI (thread-pool patterns for I/O)