Skip to content

feat(xorq-stats): compile_batch_expr for portable Phase-1 aggregate expressions#766

Open
paddymul wants to merge 2 commits into
mainfrom
feat/xorq-compile-batch-expr
Open

feat(xorq-stats): compile_batch_expr for portable Phase-1 aggregate expressions#766
paddymul wants to merge 2 commits into
mainfrom
feat/xorq-compile-batch-expr

Conversation

@paddymul
Copy link
Copy Markdown
Collaborator

Summary

Adds XorqStatPipeline.compile_batch_expr(table) -> (expr, errors) — a way to extract the Phase-1 batched-aggregate expression without executing it. Pass an xo.table(schema, name=...) UnboundTable and you get a portable, reusable summary-stats expression you can catalog, ship across processes, or rebind to any source later.

Motivating use case: xorq/buckaroo users want to save the summary-stats config as an unbound expression so it can live alongside other catalog entries instead of being trapped inside process_table's execute loop.

What's in the expression

Shape: (1 row) x (1 + N_batch_results). Columns:

  • __total_length__ — table-level row count (promoted to XorqStatPipeline.TOTAL_LENGTH_KEY)
  • <col>|<stat> for every batch-phase (col, stat) pair that survived the column filter

Only Phase-1 batched stats are folded in (null_count, min, max, distinct_count, mean, std, median). Histograms and pure-Python computed stats (non_null_count, nan_per, distinct_per, _type, typing_stats) are not — histograms need scalar min/max from Phase 1, and the computed stats are Python on resolved scalars.

Internals

Refactors process_table's Phase-1 build loop into _build_batch_agg_exprs(table) returning (agg_exprs, batch_items, errors). Shared by the new public method and the execution path. No behaviour change to process_table — construction-time failures still land in the per-column accumulator as Err, just routed through StatError now.

Usage

import xorq.api as xo
from buckaroo.customizations.xorq_stats_v2 import XORQ_STATS_V2
from buckaroo.pluggable_analysis_framework.xorq_stat_pipeline import XorqStatPipeline

schema = {"a": "float64", "b": "string", "c": "int64"}
unbound = xo.table(schema, name="t")

pipeline = XorqStatPipeline(XORQ_STATS_V2)
expr, errors = pipeline.compile_batch_expr(unbound)

# rebind to any source with a compatible schema and execute:
bound = expr.op().replace({unbound.op(): real_source.op()}).to_expr()
df = bound.execute()

Test plan

  • pytest tests/unit/test_xorq_compile_batch_expr.py — 7 new tests pass (unbound stays unbound, column naming, no histogram, rebind matches process_table baseline, real-table input, construction-error surfacing, process_table regression)
  • pytest tests/unit/test_xorq_*.py — 79 existing xorq tests pass
  • ruff clean
  • CI green

🤖 Generated with Claude Code

… as portable expressions

Adds `XorqStatPipeline.compile_batch_expr(table) -> (expr, errors)` so callers
can extract the Phase-1 batched-aggregate expression without executing it.
Pass an `xo.table(schema, name=...)` UnboundTable to get a portable, reusable
stat expression that can be cataloged / shipped / rebound to any source later.

The result shape is `(1 row) x (1 + N_batch_results)` with columns
`__total_length__` and `<col>|<stat>` — same naming the internal Phase-1
result reader has always used; now promoted to a class constant
(`TOTAL_LENGTH_KEY`) so external callers don't hard-code it.

Refactors `process_table`'s Phase-1 build loop into `_build_batch_agg_exprs`,
shared by both the public method and the execution path. No behaviour change
to `process_table`: construction-time failures still land in the per-column
accumulator as `Err`, just routed through `StatError`.

Histograms are intentionally excluded — they're Phase 2, parameterised on
scalar min/max from Phase 1, so they can't be folded into one expression.
Computed Python stats (`non_null_count`, `nan_per`, `distinct_per`, `_type`,
`typing_stats`) are also out — they're pure Python on resolved scalars.

Rebind pattern documented in the docstring:

    unbound = xo.table(schema, name="t")
    expr, _ = pipeline.compile_batch_expr(unbound)
    bound = expr.op().replace({unbound.op(): real_source.op()}).to_expr()
    df = bound.execute()

Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 17, 2026

📦 TestPyPI package published

pip install --index-strategy unsafe-best-match --index-url https://test.pypi.org/simple/ --extra-index-url https://pypi.org/simple/ buckaroo==0.14.2.dev26006465187

or with uv:

uv pip install --index-strategy unsafe-best-match --index-url https://test.pypi.org/simple/ --extra-index-url https://pypi.org/simple/ buckaroo==0.14.2.dev26006465187

MCP server for Claude Code

claude mcp add buckaroo-table -- uvx --from "buckaroo[mcp]==0.14.2.dev26006465187" --index-strategy unsafe-best-match --index-url https://test.pypi.org/simple/ --extra-index-url https://pypi.org/simple/ buckaroo-table

📖 Docs preview

🎨 Storybook preview

- Assert batch funcs provide exactly one stat key — surfaces the
  long-standing implicit constraint (the named aggregate column uses
  ``sf.provides[0].name``; multiple provides would silently drop after
  the first).
- Clarify in process_table why construction_errors aren't appended to
  all_errors directly — they reach the caller via resolve_accumulator.
- compile_batch_expr docstring: note the table.aggregate wrapper is
  already applied, and that the rebind source must be schema-compatible.
- test_returns_unbound_when_given_unbound: walk the op tree via
  ``op.find(UnboundTable)`` instead of substring-matching repr(expr),
  so the test survives ibis repr-format changes.

Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
@paddymul
Copy link
Copy Markdown
Collaborator Author

Design notes — compile_batch_expr is the wrong shape

Capturing a design discussion for when this gets picked back up (verify on a machine where xorq installs — no macOS-13 wheel locally).

The problem with the current PR

compile_batch_expr(table) takes an xo.table(schema, name=...) UnboundTable, enumerates that schema's columns, bakes them into result names (<col>|<stat>), and expects the caller to rebind later via expr.op().replace({unbound.op(): real.op()}).

  • The "unbound" expr is still pinned to one schema — you only have that schema because you inspected a real table. It's reusable for exactly one catalog entry, not a catalog of heterogeneous tables.
  • .op().replace() is internal-API graph surgery. xorq already exposes replace_unbound (xorq/common/utils/graph_utils.py:286) for this — but more importantly, the rebind dance buys nothing.
  • It's the rigid path: equivalent to UnboundExprExchanger whose schema_in_condition = operator.eq(exact_schema) (xorq/flight/exchanger.py:283).

The fundamental constraint

A summary-stats expression's shape — how many aggregate columns, their names — is a function of the input schema (3 numeric cols → 9 agg exprs; 5 → 15). A single frozen expression has one fixed shape. So something must read the schema; the only question is when. There is no "never inspects the schema" option.

What the genuinely-generic, write-once artifact can be is a recipe parameterized only by the analysis klasses, not a frozen expression.

Approach A — Deferred recipe (ibis._) ← recommended starting point

Write the aggregate with the table left as a hole:

from xorq import _
import xorq.expr.selectors as s

buckaroo_summary = _.aggregate(
    _.count().name("__total_length__"),
    s.across(s.numeric(),         {"min": _.min(), "max": _.max(), "mean": _.mean()}),
    s.across(s.of_type("string"), {"distinct_count": _.nunique()}),
)
  • buckaroo_summary holds nothing bound — _ is the hole, Across/numeric() are table-free until .expand().
  • One artifact, write-once. Bind per catalog entry: buckaroo_summary.resolve(entry).
  • Each resolved result is an ordinary expression on top of entry → xorq caches it natively, keyed on hash(entry subtree + agg ops). Agg ops derive from the stat funcs, so changing the analysis klasses invalidates the cache automatically.
  • Uses ibis selectors instead of a hand-rolled for col / column_filter(dtype) loop — selectors auto-expand and auto-suffix <col>_<stat>.

Gotcha to verify on a real machine: _ is overloaded — outer _ = table, inner _ inside across = column (across resolves it via func.resolve({"_": orig_col}), selectors.py:440). Works only if the outer Deferred treats the Across object as an opaque literal and doesn't recurse into its funcs. Across is a Concrete, not a Deferred, so it almost certainly passes through — but run it to confirm.

Approach B — plain function recipe (no _ shadowing risk)

Same idea, table as an explicit param — bulletproof, no Deferred ambiguity:

def buckaroo_summary(t):
    return t.aggregate(
        t.count().name("__total_length__"),
        s.across(s.numeric(),         {"min": _.min(), "max": _.max(), "mean": _.mean()}),
        s.across(s.of_type("string"), {"distinct_count": _.nunique()}),
    )

Here _ only ever appears inside across (= column). The function and the Deferred are the same recipe — code vs. inspectable data. Pick the Deferred if you need to store/introspect it as a value; pick the function otherwise.

Approach C — UDXF (flight_udxf) — rejected, noted for completeness

xorq/flight/exchanger.py:make_udxf accepts callable maybe_schema_in / maybe_schema_out, giving a genuinely schema-polymorphic transform you .pipe() onto any expr. But the UDXF path is not pushdownprocess_batch does batch.to_pandas(); process_df(df), streaming the whole table through an Arrow Flight server. Rejected: we want SQL pushdown.

Open issues for either A or B

  1. column_filter → selector adapter. Buckaroo stats carry an arbitrary column_filter(dtype) -> bool. numeric/string/temporal map straight to s.numeric() / s.of_type(...); an arbitrary dtype predicate needs a shim, or batch stats should declare a selector instead of a filter.
  2. Loss of per-(col, stat) construction-error attribution. The current loop try/excepts each sf.func(col) into a StatError. s.across builds the aggregate in one shot — a construction failure throws for the batch. Acceptable for pure batched aggregates (min/max/mean rarely fail per-column once dtype filtering is right), but it's a real change in error semantics.
  3. Phase 2 still doesn't fit. Histograms need scalar min/max resolved by executing Phase 1; computed Python stats run on resolved scalars. To make the whole summary one cached expression, histograms must compute bucket bounds as subexpressions of col.min()/col.max() instead of round-tripping through Python. Separate, larger PR.

Suggested direction

Keep the _build_batch_agg_exprs refactor. Drop compile_batch_expr, the xo.table/.op().replace() machinery, and the unbound-table tests. Replace with a selector-based buckaroo_summary recipe (Approach A or B), resolved per catalog entry and .cache()d. File the histogram-as-subexpression change as a follow-up.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant