Add partition function expressions for chained partitioning #18165
xiangfu0 wants to merge 3 commits into apache:master from …
Pull request overview
Adds expression-mode partitioning (functionExpr) so Pinot can derive partitions and perform query-time pruning by evaluating a deterministic scalar-function pipeline against raw column values (supporting chained transforms like fnv1a_32(md5(id))), while preserving legacy functionName behavior.
Changes:
- Introduces `functionExpr` in table/segment partition configs and persists it through offline + realtime metadata paths.
- Adds a partition-expression compiler that builds a typed `PartitionPipeline` and a `PartitionPipelineFunction` adapter.
- Extends broker/server pruning to evaluate the same partition pipeline for EQ/IN predicates on the raw column (with fail-open behavior on evaluation errors) and adds/updates tests.
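To make the idea of compiling a chained expression more concrete, here is a minimal sketch of how a nested call such as `fnv1a_32(md5(id))` could be flattened into an ordered list of steps. It is not the `PartitionFunctionExprCompiler` from this PR: the class `ChainedExprSketch`, its `Parsed` holder, and the restriction to single-argument calls are all assumptions made for illustration (the real compiler also handles literal arguments, typing, and determinism checks).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical sketch, not Pinot's PartitionFunctionExprCompiler. */
final class ChainedExprSketch {
  /** Raw input column plus function names in evaluation order (innermost first). */
  static final class Parsed {
    final String column;
    final List<String> steps;

    Parsed(String column, List<String> steps) {
      this.column = column;
      this.steps = steps;
    }
  }

  static Parsed parse(String expr) {
    Deque<String> innermostFirst = new ArrayDeque<>();
    String rest = expr.trim();
    // Peel one "name( ... )" layer per iteration until only the column remains.
    while (rest.endsWith(")")) {
      int open = rest.indexOf('(');
      if (open <= 0) {
        throw new IllegalArgumentException("Malformed expression: " + expr);
      }
      // push() prepends, so functions peeled later (the inner ones) end up in front.
      innermostFirst.push(requireIdentifier(rest.substring(0, open).trim(), expr));
      rest = rest.substring(open + 1, rest.length() - 1).trim();
    }
    return new Parsed(requireIdentifier(rest, expr), new ArrayList<>(innermostFirst));
  }

  // Assumption: function and column names are plain identifiers.
  private static String requireIdentifier(String token, String expr) {
    if (!token.matches("[A-Za-z_][A-Za-z0-9_]*")) {
      throw new IllegalArgumentException("Malformed expression: " + expr);
    }
    return token;
  }

  public static void main(String[] args) {
    Parsed parsed = parse("fnv1a_32(md5(id))");
    System.out.println(parsed.column + " -> " + parsed.steps);
  }
}
```

Storing the steps innermost-first means evaluation is a simple left-to-right loop over the list, which is one plausible way to get the same result at ingestion time and at pruning time.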
Reviewed changes
Copilot reviewed 46 out of 46 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchBackfillIngestionJobCommand.java | Uses functionExpr when computing partitions for backfill segment selection. |
| pinot-tools/src/test/java/org/apache/pinot/tools/admin/command/LaunchBackfillIngestionJobCommandTest.java | Adds coverage for segment partition matching with functionExpr. |
| pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ColumnPartitionConfig.java | Adds functionExpr mode with validation to enforce exclusive config mode. |
| pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentPartitionConfig.java | Exposes getFunctionExpr() for partition configs. |
| pinot-spi/src/test/java/org/apache/pinot/spi/config/table/IndexingConfigTest.java | Adds JSON round-trip and invalid-mode tests for functionExpr. |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java | Adds segment metadata key for partitionFunctionExpr. |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunction.java | Extends partition function interface to optionally expose getFunctionExpr(). |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunctionFactory.java | Adds overloads to create partition functions from configs/metadata, including expression mode. |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/metadata/ColumnPartitionMetadata.java | Persists/loads functionExpr in partition metadata JSON. |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadataImpl.java | Reads partitionFunctionExpr from segment properties and constructs the correct partition function. |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/pipeline/PartitionFunctionExprCompiler.java | Implements compilation of restricted scalar-function expressions into a typed pipeline. |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/pipeline/PartitionPipelineFunction.java | Adapts compiled pipelines to the PartitionFunction interface. |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/pipeline/PartitionPipeline.java | Defines the immutable compiled pipeline and evaluation logic. |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/pipeline/PartitionStep.java | Defines a typed pipeline step contract and runtime validation. |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/pipeline/PartitionValue.java | Adds typed runtime value wrapper for pipeline evaluation. |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/pipeline/PartitionValueType.java | Defines supported type system for the partition pipeline compiler. |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/pipeline/PartitionIntNormalizer.java | Adds normalization strategies to match legacy partition-id semantics. |
| pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/pipeline/PartitionFunctionExprCompilerTest.java | Adds compiler coverage (canonicalization, literals, determinism, thread-safety, legacy normalization). |
| pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/function/scalar/PartitionFunctionExprTestFunctions.java | Provides test-only scalar functions to exercise the compiler in pinot-segment-spi. |
| pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/function/scalar/PartitionFunctionExprRacyTestFunctions.java | Provides a non-static racy scalar function to validate thread-local targets. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java | Validates partition configs by constructing the partition function (including expression mode). |
| pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java | Adds validation tests for functionExpr, including literal args and invalid output type. |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/StatsCollectorConfig.java | Exposes getPartitionFunctionExpr() to segment creation stats collection. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java | Creates partition functions using either functionName or functionExpr. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/BaseSegmentCreator.java | Persists partitionFunctionExpr into segment metadata properties when present. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/writer/StatelessRealtimeSegmentWriter.java | Builds realtime partition function from ColumnPartitionConfig (supports expr mode). |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java | Preserves functionExpr when reporting segment partition config. |
| pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java | Adds coverage ensuring expression partition metadata is preserved across realtime conversion. |
| pinot-core/src/main/java/org/apache/pinot/core/segment/processing/partitioner/TableConfigPartitioner.java | Uses factory method that supports expression-mode partition functions. |
| pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java | Uses fail-open partition evaluation for EQ/IN pruning and supports expr-based partition functions. |
| pinot-core/src/test/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPrunerTest.java | Adds pruning coverage for functionExpr and fail-open behavior when evaluation throws. |
| pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java | Builds realtime partition function from config with expression support. |
| pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java | Detects staleness when partition function expression changes (in addition to name/partition count). |
| pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java | Persists expression partition metadata into ZK using the effective partition function. |
| pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadataUtils.java | Writes partition metadata including functionExpr into ZK metadata. |
| pinot-common/src/main/java/org/apache/pinot/common/function/scalar/InternalFunctions.java | Marks request-context dependent scalar functions as non-deterministic. |
| pinot-common/src/main/java/org/apache/pinot/common/function/scalar/DateTimeFunctions.java | Marks time-dependent scalar functions as non-deterministic. |
| pinot-common/src/main/java/org/apache/pinot/common/function/scalar/HashFunctions.java | Adds hash aliases (murmur2, murmur3_32, fnv1*_xx, md5_raw) for partition-expression compatibility. |
| pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionUtils.java | Treats invalid expression metadata as unprunable by catching construction exceptions. |
| pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManager.java | Tracks partition function expression for routing metadata validation. |
| pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java | Initializes partition metadata manager with effective function name + expression. |
| pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SinglePartitionColumnSegmentPruner.java | Adds fail-open behavior for partition evaluation errors. |
| pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/MultiPartitionColumnsSegmentPruner.java | Adds fail-open behavior for partition evaluation errors. |
| pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/PartitionFunctionExprSegmentPrunerTest.java | Adds broker pruning tests for expression-mode partitions and fail-open on bad literals/metadata. |
| pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java | Extends existing routing/pruning tests to cover functionExpr. |
| pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManagerTest.java | Adds metadata-manager coverage for expression-mode partition metadata handling. |
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #18165 +/- ##
============================================
- Coverage 63.48% 55.48% -8.01%
+ Complexity 1627 896 -731
============================================
Files 3244 2561 -683
Lines 197365 146654 -50711
Branches 30540 23614 -6926
============================================
- Hits 125306 81377 -43929
+ Misses 62019 58232 -3787
+ Partials 10040 7045 -2995
Introduces expression-mode partition functions that allow composing scalar functions in a pipeline (e.g. fnv1a_32(md5(col))) for more flexible partition schemes, including full BYTES-column support.
Key changes:
- New PartitionFunctionExprCompiler that compiles a restricted expression syntax into a typed PartitionPipeline backed by deterministic scalar functions
- PartitionPipelineFunction adapter implementing PartitionFunction / FunctionEvaluator for both ingestion and broker routing
- ColumnPartitionConfig.functionExpr field (expression-mode) alongside existing functionName field (name-mode); getFunctionName() is now @Nullable
- PartitionIntNormalizer enum (POSITIVE_MODULO / ABS / MASK) replaces ad-hoc modulo logic; normalizer name persisted in config and segment metadata
- ColumnPartitionMetadata extended with functionExpr, partitionIdNormalizer, and inputType (BYTES only) fields; "FunctionExpr" sentinel written for functionName so pre-upgrade brokers degrade gracefully (no pruning) instead of NPE-ing
- BYTES-column support: pipelines compiled with PartitionValueType.BYTES pass raw byte arrays through scalar functions; BytesColumnPreIndexStatsCollector and StatsCollectorConfig detect the column type and use the correct pipeline
- Schema-aware factory helpers (PartitionFunctionFactory, PartitionerFactory, TableConfigPartitioner) so minion merge tasks and stale-segment detection use the correct input type for BYTES expression-mode columns
- Broker routing extended to prune segments using expression-mode partition functions; SegmentPartitionMetadataManager and segment pruners handle both modes
- Non-deterministic scalar functions (now/ago/sleep) blocked from partition expressions via isAllowedForPartitioning(); isDeterministic flag unchanged so compile-time constant-folding in query parsing continues to work
- CommonPartitionScalarFunctionResolver wired via ServiceLoader; resolves scalar functions from FunctionRegistry for use in partition expression compilation
- JSON round-trip test for ColumnPartitionMetadata expression-mode serialization
Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
…y backed by InbuiltFunctionEvaluator
Addresses code review feedback on PR apache#18165:
- Replace the 440-line CommonPartitionScalarFunctionResolver (custom overload resolution, typed PartitionValue/PartitionStep/PartitionValueConversions) with a thin PartitionEvaluatorFactory SPI in pinot-segment-spi implemented in pinot-common by PartitionFunctionEvaluator, which reuses Pinot's FunctionRegistry for all scalar-function resolution.
- PartitionFunctionEvaluator extends ExecutableFunctionEvaluator and overrides String→byte[] conversion to use UTF-8 encoding (vs hex-decoding in InbuiltFunctionEvaluator) for correct ingestion semantics.
- PartitionPipeline simplified: no longer extends ExecutableFunctionEvaluator; holds FunctionEvaluator.
- Delete PartitionScalarFunctionResolver, PartitionValue, PartitionValueConversions, PartitionStep.
- Move expression-mode partition tests requiring pinot-common to pinot-common (can't run in pinot-segment-spi where pinot-common is not a dependency).
- Fix ColumnPartitionMetadata.hashCode() to include _partitions via Objects.hash() uniformly.
- Add Javadoc to PartitionIntNormalizer enum constants clarifying pre- vs post-modulo semantics.
Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
…, fix Javadoc refs
- Declare PartitionFunctionEvaluator final (thread-safety contract)
- Make PartitionFunctionExecutionNode fields private final
- Cache getArguments() result in PartitionPipeline and delegate from PartitionPipelineFunction to avoid singletonList allocation on every call
- Fix stale Javadoc references in PartitionEvaluatorFactory and PartitionFunctionExprCompiler (was: InbuiltFunctionEvaluator, now: PartitionFunctionEvaluator)
- Add assertEquals(roundTripped, metadata) to ColumnPartitionMetadataTest round-trip tests to cover the hashCode() fix
- Delete PartitionFunctionExprRacyTestFunctions (dead code; the test using it was removed)
Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
What changed
This adds partition function expressions so Pinot can compute segment partitions and query-time pruning partitions from the raw column through a deterministic scalar-function pipeline instead of a single hard-coded partition function.
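As a rough illustration of what such a chained pipeline computes, the sketch below applies MD5 and then 32-bit FNV-1a to a raw string value and reduces the result to a partition id. This is not the code in this PR: the class and method names are hypothetical, and it assumes that the string is UTF-8 encoded, that the raw 16-byte digest (not a hex string) is fed to the hash, and that the final step is a positive modulo. It should therefore not be expected to reproduce Pinot's actual partition ids.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical sketch of a chained hash pipeline; not Pinot's implementation. */
final class ChainedPartitionSketch {
  /** Step 1: MD5 digest of the input bytes (16 raw bytes). */
  static byte[] md5(byte[] input) {
    try {
      return MessageDigest.getInstance("MD5").digest(input);
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("MD5 is required on every Java platform", e);
    }
  }

  /** Step 2: standard 32-bit FNV-1a (xor the byte, then multiply by the FNV prime). */
  static int fnv1a32(byte[] data) {
    int hash = 0x811c9dc5; // FNV offset basis
    for (byte b : data) {
      hash ^= (b & 0xff);
      hash *= 0x01000193; // FNV prime; int overflow gives the mod 2^32 wrap
    }
    return hash;
  }

  /** Full pipeline: raw value -> md5 -> fnv1a_32 -> partition id in [0, numPartitions). */
  static int partitionOf(String rawValue, int numPartitions) {
    byte[] utf8 = rawValue.getBytes(StandardCharsets.UTF_8); // assumed encoding
    return Math.floorMod(fnv1a32(md5(utf8)), numPartitions);
  }

  public static void main(String[] args) {
    System.out.println(partitionOf("some-id", 128));
  }
}
```

Because every step is a pure function of its input, the same call can be made on an ingested row and on a query literal, which is the property the pruning logic relies on.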
Key changes:
- … `functionExpr` while preserving existing `functionName` behavior
- … `partitionIdNormalizer` for expression mode, defaulting to `POSITIVE_MODULO`
- … `pinot-segment-spi`

Why
Pinot's partition model assumed one partition function per raw column. That breaks down for common production layouts where the upstream partition key is derived by chaining transforms such as `md5(id) -> fnv1a_32(...) -> partition` or `lower(key) -> murmur2(...) -> partition`. Derived ingestion columns do not preserve correct pruning semantics when queries still filter on the raw column.
This PR keeps partitioning attached to the raw column and evaluates the same deterministic pipeline against query literals so pruning remains correct.
Example table config
A table can now configure partitioning on the raw column with `functionExpr` inside `segmentPartitionConfig`:

    {
      "indexingConfig": {
        "segmentPartitionConfig": {
          "columnPartitionMap": {
            "id": {
              "functionExpr": "fnv1a_32(md5(id))",
              "numPartitions": 128
            }
          }
        }
      }
    }

When `partitionIdNormalizer` is omitted, Pinot uses `POSITIVE_MODULO` by default.
If an operator needs compatibility with an existing hash-to-partition mapping, expression mode can also configure the normalizer explicitly. For example, `ABS` preserves absolute-remainder semantics:

    {
      "indexingConfig": {
        "segmentPartitionConfig": {
          "columnPartitionMap": {
            "id": {
              "functionExpr": "fnv1a_32(md5(id))",
              "numPartitions": 128,
              "partitionIdNormalizer": "ABS"
            }
          }
        }
      }
    }

Or `MASK` preserves sign-bit masking semantics:

    {
      "indexingConfig": {
        "segmentPartitionConfig": {
          "columnPartitionMap": {
            "id": {
              "functionExpr": "fnv1a_32(md5(id))",
              "numPartitions": 128,
              "partitionIdNormalizer": "MASK"
            }
          }
        }
      }
    }

Another supported example is:

    {
      "indexingConfig": {
        "segmentPartitionConfig": {
          "columnPartitionMap": {
            "memberId": {
              "functionExpr": "murmur2(lower(memberId))",
              "numPartitions": 8
            }
          }
        }
      }
    }

Partition-id normalization
Expression mode now separates two concerns:
- `functionExpr` computes the deterministic scalar pipeline from the raw column to an integer hash candidate
- `partitionIdNormalizer` converts that integer into the final partition id in `[0, numPartitions)`

Supported normalizers are:
- `POSITIVE_MODULO`: default behavior for expression mode; uses modulo with negative remainders shifted back into `[0, numPartitions)`
- `ABS`: compatibility mode for layouts that expect `abs(hash % numPartitions)` semantics
- `MASK`: compatibility mode for hash pipelines that expect sign-bit masking semantics

Query behavior
Users still query the raw column. There is no query rewrite and no need to expose a derived ingestion column.
For example, with the config above users write:
or:
Pinot evaluates the same expression pipeline against the raw query literal(s) during pruning.
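The pruning decision described here, together with the fail-open behavior mentioned in the review overview, can be sketched roughly as follows. This is an illustrative assumption rather than the code in `ColumnValueSegmentPruner` or the broker pruners: `FailOpenPruningSketch`, `canPrune`, and the use of a `ToIntFunction<String>` to stand in for the compiled partition pipeline are all hypothetical. An EQ predicate is treated as an IN predicate with a single literal.

```java
import java.util.List;
import java.util.Set;
import java.util.function.ToIntFunction;

/** Hypothetical sketch of partition-based pruning with fail-open semantics. */
final class FailOpenPruningSketch {
  /**
   * Returns true only when the segment can safely be skipped for
   * "column IN (literals)"; partitionFn stands in for the compiled pipeline.
   */
  static boolean canPrune(Set<Integer> segmentPartitions, List<String> literals,
      ToIntFunction<String> partitionFn) {
    try {
      for (String literal : literals) {
        if (segmentPartitions.contains(partitionFn.applyAsInt(literal))) {
          return false; // this literal may be stored in the segment, so keep it
        }
      }
      return true; // no literal maps to a partition held by this segment
    } catch (RuntimeException e) {
      // Fail open: if a literal cannot be evaluated, keep the segment rather
      // than risk dropping rows that match the query.
      return false;
    }
  }

  public static void main(String[] args) {
    // Toy partition function (an assumption): parse the literal and take it modulo 4.
    ToIntFunction<String> toy = s -> Math.floorMod(Integer.parseInt(s), 4);
    System.out.println(canPrune(Set.of(1), List.of("6"), toy));
    System.out.println(canPrune(Set.of(1), List.of("not-a-number"), toy));
  }
}
```

Keeping the segment on any evaluation error trades some pruning efficiency for correctness: a bad literal or unreadable metadata can cost extra work but never silently removes matching segments.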
For the compatibility config:

    { "functionExpr": "fnv1a_32(md5(id))", "numPartitions": 128, "partitionIdNormalizer": "MASK" }

The UUID `000016be-9d72-466c-9632-cfa680dc8fa3` maps to partition `104`, so an equality predicate on that raw `id` prunes using partition `104`.

Safety and correctness
This also fixes a few issues discovered during review:
Validation
Ran:
- `./mvnw spotless:apply -pl pinot-broker,pinot-common,pinot-controller,pinot-core,pinot-segment-local,pinot-segment-spi,pinot-spi`
- `./mvnw checkstyle:check -pl pinot-broker,pinot-common,pinot-controller,pinot-core,pinot-segment-local,pinot-segment-spi,pinot-spi`
- `./mvnw license:format -pl pinot-broker,pinot-common,pinot-controller,pinot-core,pinot-segment-local,pinot-segment-spi,pinot-spi`
- `./mvnw license:check -pl pinot-broker,pinot-common,pinot-controller,pinot-core,pinot-segment-local,pinot-segment-spi,pinot-spi`
- `./mvnw -pl pinot-broker -am -Dtest=PartitionFunctionExprSegmentPrunerTest,SegmentPartitionMetadataManagerTest -Dsurefire.failIfNoSpecifiedTests=false test`
- `./mvnw -pl pinot-core -am -Dtest=BaseTableDataManagerNeedRefreshTest -Dsurefire.failIfNoSpecifiedTests=false test`
- `./mvnw -pl pinot-segment-local -Dtest=TableConfigUtilsTest,RealtimeSegmentConverterTest test`
- `./mvnw -pl pinot-segment-spi -Dtest=PartitionFunctionExprCompilerTest test`
- `./mvnw -pl pinot-spi -Dtest=IndexingConfigTest test`

Notable regression coverage includes:
- … `POSITIVE_MODULO`
- `ABS` normalization for negative-hash absolute-remainder compatibility
- `MASK` normalization for `fnv1a_32(md5(id))` with 128 partitions and UUID `000016be-9d72-466c-9632-cfa680dc8fa3` mapping to partition 104
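To show why the three normalizers need separate regression cases, here is a small sketch of how they can diverge on a negative hash. It follows the semantics stated in the description (`abs(hash % numPartitions)` for `ABS`, sign-bit masking for `MASK`), but the exact formulas are assumptions, in particular `(hash & Integer.MAX_VALUE) % numPartitions` for `MASK`, and the enum below is not the PR's `PartitionIntNormalizer`.

```java
/** Hypothetical sketch of the three normalization strategies; formulas are assumed. */
final class NormalizerSketch {
  enum Normalizer {
    /** Modulo with negative remainders shifted back into [0, numPartitions). */
    POSITIVE_MODULO {
      @Override
      int toPartitionId(int hash, int numPartitions) {
        return Math.floorMod(hash, numPartitions);
      }
    },
    /** Absolute value of the (possibly negative) remainder. */
    ABS {
      @Override
      int toPartitionId(int hash, int numPartitions) {
        return Math.abs(hash % numPartitions);
      }
    },
    /** Clear the sign bit first, then take the remainder (assumed formula). */
    MASK {
      @Override
      int toPartitionId(int hash, int numPartitions) {
        return (hash & Integer.MAX_VALUE) % numPartitions;
      }
    };

    abstract int toPartitionId(int hash, int numPartitions);
  }

  public static void main(String[] args) {
    for (Normalizer normalizer : Normalizer.values()) {
      System.out.println(normalizer + " -> " + normalizer.toPartitionId(-7, 5));
    }
  }
}
```

Under these assumed formulas, a hash of `-7` with 5 partitions yields 3, 2, and 1 for `POSITIVE_MODULO`, `ABS`, and `MASK` respectively, while any non-negative hash yields the same id under all three. That is why choosing the normalizer only matters for compatibility with an existing layout that produced negative hashes.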