Skip to content

pydantable.selectors

Column selector helpers for select / with_columns.

pydantable.selectors

Schema-driven column selection for `DataFrame.select_schema`.

Construct `Selector` values with factories such as `everything()`, `by_name()`, `by_dtype()`, and dtype groups (`NUMERIC`, `STRUCT`, ...). Combine selectors with `|` (union), `&` (intersection), `-` (difference), and `~` (complement). Resolution uses only the current column name → annotation mapping.

See the SELECTORS documentation page.

Selector dataclass

Schema-driven column selector.

A Selector resolves only against a DataFrame's current schema. This keeps the DSL deterministic and compatible with schema-first typing.

Source code in python/pydantable/selectors.py
@dataclass(frozen=True, slots=True)
class Selector:
    """
    Schema-driven column selector.

    A Selector resolves *only* against a DataFrame's current schema. This keeps the
    DSL deterministic and compatible with schema-first typing.
    """

    _resolver: Callable[[Mapping[str, Any]], set[str]]
    _repr: str | None = None

    def __repr__(self) -> str:  # pragma: no cover
        return self._repr or "Selector(<resolver>)"

    def resolve(self, schema_field_types: Mapping[str, Any]) -> list[str]:
        """Return matching column names in *schema* iteration order."""
        selected = self._resolver(schema_field_types)
        return [name for name in schema_field_types if name in selected]

    def exclude(self, other: Selector | str | Iterable[str]) -> Selector:
        """Equivalent to ``self - other`` (remove columns matched by ``other``)."""
        return self - other

    def __or__(self, other: Selector | str | Iterable[str]) -> Selector:
        """Union of the two selectors' column sets."""
        o = _as_selector(other)
        rep = (
            f"({self!r} | {o!r})"
            if self._repr is not None and o._repr is not None
            else None
        )
        return Selector(
            lambda schema: self._resolver(schema) | o._resolver(schema), rep
        )

    def __and__(self, other: Selector | str | Iterable[str]) -> Selector:
        """Intersection of the two selectors' column sets."""
        o = _as_selector(other)
        rep = (
            f"({self!r} & {o!r})"
            if self._repr is not None and o._repr is not None
            else None
        )
        return Selector(
            lambda schema: self._resolver(schema) & o._resolver(schema), rep
        )

    def __sub__(self, other: Selector | str | Iterable[str]) -> Selector:
        """Set difference: columns in ``self`` that are not in ``other``."""
        o = _as_selector(other)
        rep = (
            f"({self!r} - {o!r})"
            if self._repr is not None and o._repr is not None
            else None
        )
        return Selector(
            lambda schema: self._resolver(schema) - o._resolver(schema), rep
        )

    def __invert__(self) -> Selector:
        """Complement: all schema columns not matched by ``self``."""
        rep = f"(~{self!r})" if self._repr is not None else None
        return Selector(lambda schema: set(schema) - self._resolver(schema), rep)

resolve

resolve(schema_field_types)

Return matching column names in schema iteration order.

Source code in python/pydantable/selectors.py
def resolve(self, schema_field_types: Mapping[str, Any]) -> list[str]:
    """Return the selected column names, ordered as the schema iterates."""
    matched = self._resolver(schema_field_types)
    # Walk the schema (not the set) so the output keeps schema order.
    return [column for column in schema_field_types if column in matched]

exclude

exclude(other)

Equivalent to self - other (remove columns matched by other).

Source code in python/pydantable/selectors.py
def exclude(self, other: Selector | str | Iterable[str]) -> Selector:
    """Drop the columns matched by ``other``.

    Shorthand for the ``-`` operator: returns ``self - other``.
    """
    return self - other

__or__

__or__(other)

Union of the two selectors' column sets.

Source code in python/pydantable/selectors.py
def __or__(self, other: Selector | str | Iterable[str]) -> Selector:
    """Return a selector for columns matched by either operand."""
    rhs = _as_selector(other)
    # A readable repr is only built when both operands carry one.
    label = None
    if self._repr is not None and rhs._repr is not None:
        label = f"({self!r} | {rhs!r})"

    def _union(schema: Mapping[str, Any]) -> set[str]:
        return self._resolver(schema) | rhs._resolver(schema)

    return Selector(_union, label)

__and__

__and__(other)

Intersection of the two selectors' column sets.

Source code in python/pydantable/selectors.py
def __and__(self, other: Selector | str | Iterable[str]) -> Selector:
    """Return a selector for columns matched by both operands."""
    rhs = _as_selector(other)
    # A readable repr is only built when both operands carry one.
    label = None
    if self._repr is not None and rhs._repr is not None:
        label = f"({self!r} & {rhs!r})"

    def _intersection(schema: Mapping[str, Any]) -> set[str]:
        return self._resolver(schema) & rhs._resolver(schema)

    return Selector(_intersection, label)

__sub__

__sub__(other)

Set difference: columns in self that are not in other.

Source code in python/pydantable/selectors.py
def __sub__(self, other: Selector | str | Iterable[str]) -> Selector:
    """Return a selector for columns in ``self`` that ``other`` does not match."""
    rhs = _as_selector(other)
    # A readable repr is only built when both operands carry one.
    label = None
    if self._repr is not None and rhs._repr is not None:
        label = f"({self!r} - {rhs!r})"

    def _difference(schema: Mapping[str, Any]) -> set[str]:
        return self._resolver(schema) - rhs._resolver(schema)

    return Selector(_difference, label)

__invert__

__invert__()

Complement: all schema columns not matched by self.

Source code in python/pydantable/selectors.py
def __invert__(self) -> Selector:
    """Return a selector for every schema column ``self`` does not match."""
    label = None
    if self._repr is not None:
        label = f"(~{self!r})"

    def _complement(schema: Mapping[str, Any]) -> set[str]:
        return set(schema) - self._resolver(schema)

    return Selector(_complement, label)

everything

everything()

Select every column present in the schema mapping.

Source code in python/pydantable/selectors.py
def everything() -> Selector:
    """Return a selector that matches all columns of the schema mapping."""

    def _all_columns(schema: Mapping[str, Any]) -> set[str]:
        return set(schema)

    return Selector(_all_columns, "everything()")

all

all()

Alias for :func:everything (spelled all for readability in pipelines).

Source code in python/pydantable/selectors.py
def all() -> Selector:
    """Alias for :func:`everything`, spelled ``all`` for readability in pipelines.

    Note: inside this module the name ``all`` refers to this function rather
    than the builtin of the same name.
    """
    return everything()

by_name

by_name(*names)

Select columns whose names appear in names.

Source code in python/pydantable/selectors.py
def by_name(*names: str) -> Selector:
    """Return a selector for the schema columns listed in ``names``.

    Each name is converted with ``str`` before comparison. Names that are not
    present in the schema are simply not selected.
    """
    targets = set(map(str, names))
    label = "by_name(" + ", ".join(map(repr, names)) + ")"

    def _named(schema: Mapping[str, Any]) -> set[str]:
        return {column for column in schema if column in targets}

    return Selector(_named, label)

starts_with

starts_with(prefix)

Select columns whose names start with prefix.

Source code in python/pydantable/selectors.py
def starts_with(prefix: str) -> Selector:
    """Return a selector for columns whose names begin with ``prefix``."""
    head = str(prefix)

    def _has_prefix(schema: Mapping[str, Any]) -> set[str]:
        return {column for column in schema if column.startswith(head)}

    return Selector(_has_prefix, f"starts_with({head!r})")

ends_with

ends_with(suffix)

Select columns whose names end with suffix.

Source code in python/pydantable/selectors.py
def ends_with(suffix: str) -> Selector:
    """Return a selector for columns whose names finish with ``suffix``."""
    tail = str(suffix)

    def _has_suffix(schema: Mapping[str, Any]) -> set[str]:
        return {column for column in schema if column.endswith(tail)}

    return Selector(_has_suffix, f"ends_with({tail!r})")

contains

contains(substr)

Select columns whose names contain the substring substr.

Source code in python/pydantable/selectors.py
def contains(substr: str) -> Selector:
    """Return a selector for columns whose names include ``substr`` anywhere."""
    fragment = str(substr)

    def _has_fragment(schema: Mapping[str, Any]) -> set[str]:
        return {column for column in schema if fragment in column}

    return Selector(_has_fragment, f"contains({fragment!r})")

matches

matches(pattern)

Select columns whose names match the regex pattern (search semantics).

Source code in python/pydantable/selectors.py
def matches(pattern: str | re.Pattern[str]) -> Selector:
    """Return a selector for columns whose names match the regex ``pattern``.

    Matching uses ``search`` semantics, so the pattern may match anywhere in
    the name. A ``str`` pattern is compiled once here; an already compiled
    pattern is used as given.
    """
    regex = pattern if not isinstance(pattern, str) else re.compile(pattern)

    def _search(schema: Mapping[str, Any]) -> set[str]:
        return {column for column in schema if regex.search(column) is not None}

    return Selector(_search, f"matches({regex.pattern!r})")

by_dtype

by_dtype(*dtypes)

Select columns whose annotations match any of dtypes.

Pass concrete Python types (int, str, …) or dtype groups such as :data:NUMERIC, :data:STRUCT, :data:MAPS.

Source code in python/pydantable/selectors.py
def by_dtype(*dtypes: Any) -> Selector:
    """Return a selector for columns whose annotation matches any of ``dtypes``.

    Each entry is either a concrete Python type (``int``, ``str``, …) or a
    dtype group such as :data:`NUMERIC`, :data:`STRUCT`, :data:`MAPS`.
    Dtype groups decide through their own ``match`` method; any other entry
    matches when it *is* (identity) the result of ``_unwrap_optional`` applied
    to the column annotation.
    """
    requested = tuple(dtypes)

    def _matches_any(annotation: Any) -> bool:
        for candidate in requested:
            if isinstance(candidate, _DTypeGroup):
                hit = candidate.match(annotation)
            else:
                hit = _unwrap_optional(annotation) is candidate
            if hit:
                return True
        return False

    # Groups show their own repr; plain types show ``__name__`` when available.
    labels = [
        repr(candidate)
        if isinstance(candidate, _DTypeGroup)
        else getattr(candidate, "__name__", repr(candidate))
        for candidate in requested
    ]

    def _typed(schema: Mapping[str, Any]) -> set[str]:
        return {
            column for column, annotation in schema.items() if _matches_any(annotation)
        }

    return Selector(_typed, f"by_dtype({', '.join(labels)})")

numeric

numeric()

Select int, float, and :class:decimal.Decimal columns.

Source code in python/pydantable/selectors.py
def numeric() -> Selector:
    """Select int, float, and :class:`decimal.Decimal` columns.

    Shorthand for ``by_dtype(NUMERIC)``.
    """
    return by_dtype(NUMERIC)

integers

integers()

Select int columns.

Source code in python/pydantable/selectors.py
def integers() -> Selector:
    """Select ``int`` columns.

    Shorthand for ``by_dtype(INTEGERS)``.
    """
    return by_dtype(INTEGERS)

integer

integer()

Alias for :func:integers.

Source code in python/pydantable/selectors.py
def integer() -> Selector:
    """Singular-spelling alias for :func:`integers`.

    Returns exactly what ``integers()`` returns.
    """
    return integers()

floats

floats()

Select float columns.

Source code in python/pydantable/selectors.py
def floats() -> Selector:
    """Select ``float`` columns.

    Shorthand for ``by_dtype(FLOATS)``.
    """
    return by_dtype(FLOATS)

float

float()

Alias for :func:floats.

Source code in python/pydantable/selectors.py
def float() -> Selector:
    """Singular-spelling alias for :func:`floats`.

    Note: inside this module the name ``float`` refers to this function
    rather than the builtin type of the same name.
    """
    return floats()

decimals

decimals()

Select :class:decimal.Decimal columns.

Source code in python/pydantable/selectors.py
def decimals() -> Selector:
    """Select :class:`decimal.Decimal` columns.

    Shorthand for ``by_dtype(DECIMALS)``.
    """
    return by_dtype(DECIMALS)

decimal

decimal()

Alias for :func:decimals.

Source code in python/pydantable/selectors.py
def decimal() -> Selector:
    """Singular-spelling alias for :func:`decimals`.

    Returns exactly what ``decimals()`` returns.
    """
    return decimals()

string

string()

Select str columns.

Source code in python/pydantable/selectors.py
def string() -> Selector:
    """Select ``str`` columns.

    Shorthand for ``by_dtype(STRING)``.
    """
    return by_dtype(STRING)

boolean

boolean()

Select bool columns.

Source code in python/pydantable/selectors.py
def boolean() -> Selector:
    """Select ``bool`` columns.

    Shorthand for ``by_dtype(BOOLEAN)``.
    """
    return by_dtype(BOOLEAN)

temporal

temporal()

Select date, datetime, time, or timedelta columns.

Source code in python/pydantable/selectors.py
def temporal() -> Selector:
    """Select ``date``, ``datetime``, ``time``, or ``timedelta`` columns.

    Shorthand for ``by_dtype(TEMPORAL)``.
    """
    return by_dtype(TEMPORAL)

lists

lists()

Select list/tuple/set-typed columns.

Source code in python/pydantable/selectors.py
def lists() -> Selector:
    """Select list/tuple/set-typed columns.

    Shorthand for ``by_dtype(LIST)``.
    """
    return by_dtype(LIST)

structs

structs()

Select nested Pydantic :class:~pydantic.BaseModel columns.

Source code in python/pydantable/selectors.py
def structs() -> Selector:
    """Select nested Pydantic :class:`~pydantic.BaseModel` columns.

    Shorthand for ``by_dtype(STRUCT)``.
    """
    return by_dtype(STRUCT)

struct

struct()

Alias for :func:structs.

Source code in python/pydantable/selectors.py
def struct() -> Selector:
    """Singular-spelling alias for :func:`structs`.

    Returns exactly what ``structs()`` returns.
    """
    return structs()

uuids

uuids()

Select :class:uuid.UUID columns.

Source code in python/pydantable/selectors.py
def uuids() -> Selector:
    """Select :class:`uuid.UUID` columns.

    Shorthand for ``by_dtype(UUIDS)``.
    """
    return by_dtype(UUIDS)

binary

binary()

Select raw bytes columns (use :func:wkbs for WKB geometry).

Source code in python/pydantable/selectors.py
def binary() -> Selector:
    """Select raw ``bytes`` columns.

    Shorthand for ``by_dtype(BINARIES)``. For WKB geometry columns use
    :func:`wkbs` instead.
    """
    return by_dtype(BINARIES)

maps

maps()

Select dict[str, T] columns.

Source code in python/pydantable/selectors.py
def maps() -> Selector:
    """Select ``dict[str, T]`` columns.

    Shorthand for ``by_dtype(MAPS)``.
    """
    return by_dtype(MAPS)

enums

enums()

Select :class:enum.Enum subclass columns.

Source code in python/pydantable/selectors.py
def enums() -> Selector:
    """Select :class:`enum.Enum` subclass columns.

    Shorthand for ``by_dtype(ENUMS)``.
    """
    return by_dtype(ENUMS)

ipv4s

ipv4s()

Select :class:ipaddress.IPv4Address columns.

Source code in python/pydantable/selectors.py
def ipv4s() -> Selector:
    """Select :class:`ipaddress.IPv4Address` columns.

    Shorthand for ``by_dtype(IPV4S)``.
    """
    return by_dtype(IPV4S)

ipv6s

ipv6s()

Select :class:ipaddress.IPv6Address columns.

Source code in python/pydantable/selectors.py
def ipv6s() -> Selector:
    """Select :class:`ipaddress.IPv6Address` columns.

    Shorthand for ``by_dtype(IPV6S)``.
    """
    return by_dtype(IPV6S)

wkbs

wkbs()

Select :class:~pydantable.types.WKB (well-known binary) columns.

Source code in python/pydantable/selectors.py
def wkbs() -> Selector:
    """Select :class:`~pydantable.types.WKB` (well-known binary) columns.

    Shorthand for ``by_dtype(WKBS)``.
    """
    return by_dtype(WKBS)

rename_map

rename_map(selector, fn)

Build a rename mapping from a selector and renaming function (schema-driven).

Source code in python/pydantable/selectors.py
def rename_map(
    selector: Selector, fn: Callable[[str], str]
) -> Callable[[Mapping[str, Any]], dict[str, str]]:
    """Build a schema-driven rename mapping factory.

    Args:
        selector: Chooses which columns are renamed.
        fn: Called with each selected column name; its result is converted
            with ``str`` and used as the new name.

    Returns:
        A callable that takes the ``column name -> annotation`` mapping and
        returns ``{old_name: new_name}`` for the selected columns.

    Raises:
        TypeError: If ``selector`` is not a ``Selector`` or ``fn`` is not
            callable (raised immediately).
        ValueError: Raised by the returned callable when the selector matches
            no columns, or when two selected columns map to the same new name.
    """
    if not isinstance(selector, Selector):
        raise TypeError("rename_map(selector, fn) expects a Selector.")
    if not callable(fn):
        raise TypeError("rename_map(selector, fn) expects a callable.")

    def _mk(schema_field_types: Mapping[str, Any]) -> dict[str, str]:
        selected = selector.resolve(schema_field_types)
        if not selected:
            available = ", ".join(map(repr, schema_field_types))
            raise ValueError(
                f"rename_map({selector!r}) matched no columns. "
                f"Available columns: [{available}]"
            )

        renames: dict[str, str] = {}
        for old_name in selected:
            renames[old_name] = str(fn(old_name))

        # Fewer distinct targets than sources means two columns would collide.
        distinct_targets = set(renames.values())
        if len(renames) != len(distinct_targets):
            raise ValueError("rename_map(...) produced duplicate output column names.")
        return renames

    return _mk