Skip to content

Commit d5aef71

Browse files
committed
Add partition function expressions for chained partitioning
1 parent 34d3fd6 commit d5aef71

50 files changed

Lines changed: 3075 additions & 102 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@
8686
import org.apache.pinot.core.routing.timeboundary.TimeBoundaryStrategyService;
8787
import org.apache.pinot.core.transport.ServerInstance;
8888
import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
89+
import org.apache.pinot.segment.spi.partition.PartitionFunction;
90+
import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
8991
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
9092
import org.apache.pinot.spi.config.table.QueryConfig;
9193
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
@@ -797,9 +799,13 @@ private void buildRoutingInternal(String tableNameWithType) {
797799
columnPartitionMap.entrySet().iterator().next();
798800
LOGGER.info("Enabling SegmentPartitionMetadataManager for table: {} on partition column: {}",
799801
tableNameWithType, partitionConfig.getKey());
802+
PartitionFunction partitionFunction =
803+
PartitionFunctionFactory.getPartitionFunction(partitionConfig.getKey(), partitionConfig.getValue());
800804
partitionMetadataManager =
801805
new SegmentPartitionMetadataManager(tableNameWithType, partitionConfig.getKey(),
802-
partitionConfig.getValue().getFunctionName(), partitionConfig.getValue().getNumPartitions());
806+
partitionFunction.getName(), partitionFunction.getFunctionExpr(),
807+
partitionFunction.getPartitionIdNormalizer(),
808+
partitionFunction.getNumPartitions());
803809
} else {
804810
LOGGER.warn(
805811
"Cannot enable SegmentPartitionMetadataManager for table: {} with multiple partition columns: {}",

pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManager.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.HashSet;
2525
import java.util.List;
2626
import java.util.Map;
27+
import java.util.Objects;
2728
import java.util.Set;
2829
import javax.annotation.Nullable;
2930
import org.apache.commons.lang3.tuple.Pair;
@@ -62,6 +63,8 @@ public class SegmentPartitionMetadataManager implements SegmentZkMetadataFetchLi
6263
// static content, if anything changes for the following. a rebuild of routing table is needed.
6364
private final String _partitionColumn;
6465
private final String _partitionFunctionName;
66+
private final String _partitionFunctionExpr;
67+
private final String _partitionIdNormalizer;
6568
private final int _numPartitions;
6669

6770
// cache-able content, only follow changes if onlineSegments list (of ideal-state) is changed.
@@ -72,10 +75,12 @@ public class SegmentPartitionMetadataManager implements SegmentZkMetadataFetchLi
7275
private transient TablePartitionReplicatedServersInfo _tablePartitionReplicatedServersInfo;
7376

7477
public SegmentPartitionMetadataManager(String tableNameWithType, String partitionColumn, String partitionFunctionName,
75-
int numPartitions) {
78+
@Nullable String partitionFunctionExpr, @Nullable String partitionIdNormalizer, int numPartitions) {
7679
_tableNameWithType = tableNameWithType;
7780
_partitionColumn = partitionColumn;
7881
_partitionFunctionName = partitionFunctionName;
82+
_partitionFunctionExpr = partitionFunctionExpr;
83+
_partitionIdNormalizer = partitionIdNormalizer;
7984
_numPartitions = numPartitions;
8085
}
8186

@@ -103,7 +108,7 @@ private int getPartitionId(String segment, @Nullable ZNRecord znRecord) {
103108
return INVALID_PARTITION_ID;
104109
}
105110
PartitionFunction partitionFunction = segmentPartitionInfo.getPartitionFunction();
106-
if (!_partitionFunctionName.equalsIgnoreCase(partitionFunction.getName())) {
111+
if (!isMatchingPartitionFunction(partitionFunction)) {
107112
return INVALID_PARTITION_ID;
108113
}
109114
if (_numPartitions != partitionFunction.getNumPartitions()) {
@@ -116,6 +121,14 @@ private int getPartitionId(String segment, @Nullable ZNRecord znRecord) {
116121
return partitions.iterator().next();
117122
}
118123

124+
private boolean isMatchingPartitionFunction(PartitionFunction partitionFunction) {
125+
if (_partitionFunctionExpr != null) {
126+
return Objects.equals(_partitionFunctionExpr, partitionFunction.getFunctionExpr()) && Objects.equals(
127+
_partitionIdNormalizer, partitionFunction.getPartitionIdNormalizer());
128+
}
129+
return _partitionFunctionName.equalsIgnoreCase(partitionFunction.getName());
130+
}
131+
119132
private static long getCreationTimeMs(@Nullable ZNRecord znRecord) {
120133
if (znRecord == null) {
121134
return INVALID_CREATION_TIME_MS;

pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionUtils.java

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import javax.annotation.Nullable;
2626
import org.apache.helix.zookeeper.datamodel.ZNRecord;
2727
import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
28+
import org.apache.pinot.segment.spi.partition.PartitionFunction;
2829
import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
2930
import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
3031
import org.apache.pinot.spi.utils.CommonConstants;
@@ -79,10 +80,17 @@ public static SegmentPartitionInfo extractPartitionInfo(String tableNameWithType
7980
return INVALID_PARTITION_INFO;
8081
}
8182

82-
return new SegmentPartitionInfo(partitionColumn,
83-
PartitionFunctionFactory.getPartitionFunction(columnPartitionMetadata.getFunctionName(),
84-
columnPartitionMetadata.getNumPartitions(), columnPartitionMetadata.getFunctionConfig()),
85-
columnPartitionMetadata.getPartitions());
83+
try {
84+
PartitionFunction partitionFunction =
85+
PartitionFunctionFactory.getPartitionFunction(partitionColumn, columnPartitionMetadata.getFunctionName(),
86+
columnPartitionMetadata.getNumPartitions(), columnPartitionMetadata.getFunctionConfig(),
87+
columnPartitionMetadata.getFunctionExpr(), columnPartitionMetadata.getPartitionIdNormalizer());
88+
return new SegmentPartitionInfo(partitionColumn, partitionFunction, columnPartitionMetadata.getPartitions());
89+
} catch (Exception e) {
90+
LOGGER.warn("Caught exception while constructing partition function for column: {}, segment: {}, table: {}",
91+
partitionColumn, segment, tableNameWithType, e);
92+
return INVALID_PARTITION_INFO;
93+
}
8694
}
8795

8896
/**
@@ -124,11 +132,18 @@ public static Map<String, SegmentPartitionInfo> extractPartitionInfoMap(String t
124132
segment, tableNameWithType);
125133
continue;
126134
}
127-
SegmentPartitionInfo segmentPartitionInfo = new SegmentPartitionInfo(partitionColumn,
128-
PartitionFunctionFactory.getPartitionFunction(columnPartitionMetadata.getFunctionName(),
129-
columnPartitionMetadata.getNumPartitions(), columnPartitionMetadata.getFunctionConfig()),
130-
columnPartitionMetadata.getPartitions());
131-
columnSegmentPartitionInfoMap.put(partitionColumn, segmentPartitionInfo);
135+
try {
136+
PartitionFunction partitionFunction =
137+
PartitionFunctionFactory.getPartitionFunction(partitionColumn, columnPartitionMetadata.getFunctionName(),
138+
columnPartitionMetadata.getNumPartitions(), columnPartitionMetadata.getFunctionConfig(),
139+
columnPartitionMetadata.getFunctionExpr(), columnPartitionMetadata.getPartitionIdNormalizer());
140+
SegmentPartitionInfo segmentPartitionInfo =
141+
new SegmentPartitionInfo(partitionColumn, partitionFunction, columnPartitionMetadata.getPartitions());
142+
columnSegmentPartitionInfoMap.put(partitionColumn, segmentPartitionInfo);
143+
} catch (Exception e) {
144+
LOGGER.warn("Caught exception while constructing partition function for column: {}, segment: {}, table: {}",
145+
partitionColumn, segment, tableNameWithType, e);
146+
}
132147
}
133148
if (columnSegmentPartitionInfoMap.size() == 1) {
134149
String partitionColumn = columnSegmentPartitionInfoMap.keySet().iterator().next();

pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/MultiPartitionColumnsSegmentPruner.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,17 @@
3636
import org.apache.pinot.common.request.Identifier;
3737
import org.apache.pinot.common.request.context.RequestContextUtils;
3838
import org.apache.pinot.sql.FilterKind;
39+
import org.slf4j.Logger;
40+
import org.slf4j.LoggerFactory;
3941

4042

4143
/**
4244
* The {@code MultiPartitionColumnsSegmentPruner} prunes segments based on their partition metadata stored in ZK. The
4345
* pruner supports queries with filter (or nested filter) of EQUALITY and IN predicates.
4446
*/
4547
public class MultiPartitionColumnsSegmentPruner implements SegmentPruner {
48+
private static final Logger LOGGER = LoggerFactory.getLogger(MultiPartitionColumnsSegmentPruner.class);
49+
4650
private final String _tableNameWithType;
4751
private final Set<String> _partitionColumns;
4852
private final Map<String, Map<String, SegmentPartitionInfo>> _segmentColumnPartitionInfoMap =
@@ -140,8 +144,8 @@ private boolean isPartitionMatch(Expression filterExpression,
140144
Identifier identifier = operands.get(0).getIdentifier();
141145
if (identifier != null) {
142146
SegmentPartitionInfo partitionInfo = columnPartitionInfoMap.get(identifier.getName());
143-
return partitionInfo == null || partitionInfo.getPartitions().contains(
144-
partitionInfo.getPartitionFunction().getPartition(RequestContextUtils.getStringValue(operands.get(1))));
147+
return partitionInfo == null || isPartitionMatch(partitionInfo, RequestContextUtils.getStringValue(
148+
operands.get(1)));
145149
} else {
146150
return true;
147151
}
@@ -155,8 +159,7 @@ private boolean isPartitionMatch(Expression filterExpression,
155159
}
156160
int numOperands = operands.size();
157161
for (int i = 1; i < numOperands; i++) {
158-
if (partitionInfo.getPartitions().contains(partitionInfo.getPartitionFunction()
159-
.getPartition(RequestContextUtils.getStringValue(operands.get(i))))) {
162+
if (isPartitionMatch(partitionInfo, RequestContextUtils.getStringValue(operands.get(i)))) {
160163
return true;
161164
}
162165
}
@@ -169,4 +172,14 @@ private boolean isPartitionMatch(Expression filterExpression,
169172
return true;
170173
}
171174
}
175+
176+
private boolean isPartitionMatch(SegmentPartitionInfo partitionInfo, String value) {
177+
try {
178+
return partitionInfo.getPartitions().contains(partitionInfo.getPartitionFunction().getPartition(value));
179+
} catch (RuntimeException e) {
180+
LOGGER.debug("Failed to evaluate partition function for table: {}, partition column: {}; skipping partition "
181+
+ "pruning", _tableNameWithType, partitionInfo.getPartitionColumn(), e);
182+
return true;
183+
}
184+
}
172185
}

pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SinglePartitionColumnSegmentPruner.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,17 @@
3535
import org.apache.pinot.common.request.Identifier;
3636
import org.apache.pinot.common.request.context.RequestContextUtils;
3737
import org.apache.pinot.sql.FilterKind;
38+
import org.slf4j.Logger;
39+
import org.slf4j.LoggerFactory;
3840

3941

4042
/**
4143
* The {@code SinglePartitionColumnSegmentPruner} prunes segments based on their partition metadata stored in ZK. The
4244
* pruner supports queries with filter (or nested filter) of EQUALITY and IN predicates.
4345
*/
4446
public class SinglePartitionColumnSegmentPruner implements SegmentPruner {
47+
private static final Logger LOGGER = LoggerFactory.getLogger(SinglePartitionColumnSegmentPruner.class);
48+
4549
private final String _tableNameWithType;
4650
private final String _partitionColumn;
4751
private final Map<String, SegmentPartitionInfo> _partitionInfoMap = new ConcurrentHashMap<>();
@@ -129,8 +133,7 @@ private boolean isPartitionMatch(Expression filterExpression, SegmentPartitionIn
129133
case EQUALS: {
130134
Identifier identifier = operands.get(0).getIdentifier();
131135
if (identifier != null && identifier.getName().equals(_partitionColumn)) {
132-
return partitionInfo.getPartitions().contains(partitionInfo.getPartitionFunction()
133-
.getPartition(RequestContextUtils.getStringValue(operands.get(1))));
136+
return isPartitionMatch(partitionInfo, RequestContextUtils.getStringValue(operands.get(1)));
134137
} else {
135138
return true;
136139
}
@@ -140,8 +143,7 @@ private boolean isPartitionMatch(Expression filterExpression, SegmentPartitionIn
140143
if (identifier != null && identifier.getName().equals(_partitionColumn)) {
141144
int numOperands = operands.size();
142145
for (int i = 1; i < numOperands; i++) {
143-
if (partitionInfo.getPartitions().contains(partitionInfo.getPartitionFunction()
144-
.getPartition(RequestContextUtils.getStringValue(operands.get(i))))) {
146+
if (isPartitionMatch(partitionInfo, RequestContextUtils.getStringValue(operands.get(i)))) {
145147
return true;
146148
}
147149
}
@@ -154,4 +156,14 @@ private boolean isPartitionMatch(Expression filterExpression, SegmentPartitionIn
154156
return true;
155157
}
156158
}
159+
160+
private boolean isPartitionMatch(SegmentPartitionInfo partitionInfo, String value) {
161+
try {
162+
return partitionInfo.getPartitions().contains(partitionInfo.getPartitionFunction().getPartition(value));
163+
} catch (RuntimeException e) {
164+
LOGGER.debug("Failed to evaluate partition function for table: {}, partition column: {}; skipping partition "
165+
+ "pruning", _tableNameWithType, _partitionColumn, e);
166+
return true;
167+
}
168+
}
157169
}

pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManagerTest.java

Lines changed: 80 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.HashSet;
2424
import java.util.Map;
2525
import java.util.Set;
26+
import javax.annotation.Nullable;
2627
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
2728
import org.apache.helix.model.ExternalView;
2829
import org.apache.helix.model.IdealState;
@@ -37,6 +38,7 @@
3738
import org.apache.pinot.controller.helix.ControllerTest;
3839
import org.apache.pinot.core.routing.TablePartitionReplicatedServersInfo;
3940
import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
41+
import org.apache.pinot.segment.spi.partition.pipeline.PartitionPipelineFunction;
4042
import org.testng.annotations.AfterClass;
4143
import org.testng.annotations.BeforeClass;
4244
import org.testng.annotations.Test;
@@ -84,7 +86,7 @@ public void testPartitionMetadataManagerProcessingThroughSegmentChangesSinglePar
8486

8587
SegmentPartitionMetadataManager partitionMetadataManager =
8688
new SegmentPartitionMetadataManager(OFFLINE_TABLE_NAME, PARTITION_COLUMN, PARTITION_COLUMN_FUNC,
87-
NUM_PARTITIONS);
89+
null, null, NUM_PARTITIONS);
8890
SegmentZkMetadataFetcher segmentZkMetadataFetcher =
8991
new SegmentZkMetadataFetcher(OFFLINE_TABLE_NAME, _propertyStore);
9092
segmentZkMetadataFetcher.register(partitionMetadataManager);
@@ -284,11 +286,87 @@ public void testPartitionMetadataManagerProcessingThroughSegmentChangesSinglePar
284286
assertEquals(tablePartitionReplicatedServersInfo.getSegmentsWithInvalidPartition().get(0), segmentInvalid);
285287
}
286288

289+
@Test
290+
public void testPartitionMetadataManagerProcessingWithFunctionExpr() {
291+
ExternalView externalView = new ExternalView(OFFLINE_TABLE_NAME);
292+
Map<String, Map<String, String>> segmentAssignment = externalView.getRecord().getMapFields();
293+
Set<String> onlineSegments = new HashSet<>();
294+
IdealState idealState = new IdealState(OFFLINE_TABLE_NAME);
295+
String functionExpr = "fnv1a_32(md5(" + PARTITION_COLUMN + "))";
296+
String segment = "exprSegment";
297+
298+
SegmentPartitionMetadataManager partitionMetadataManager =
299+
new SegmentPartitionMetadataManager(OFFLINE_TABLE_NAME, PARTITION_COLUMN, PartitionPipelineFunction.NAME,
300+
functionExpr, "MASK", 8);
301+
SegmentZkMetadataFetcher segmentZkMetadataFetcher =
302+
new SegmentZkMetadataFetcher(OFFLINE_TABLE_NAME, _propertyStore);
303+
segmentZkMetadataFetcher.register(partitionMetadataManager);
304+
segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
305+
306+
onlineSegments.add(segment);
307+
segmentAssignment.put(segment, Collections.singletonMap(SERVER_0, ONLINE));
308+
setSegmentZKMetadata(segment, PartitionPipelineFunction.NAME, 8, 1, functionExpr, "MASK", 0L);
309+
segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, onlineSegments);
310+
311+
TablePartitionReplicatedServersInfo tablePartitionReplicatedServersInfo =
312+
partitionMetadataManager.getTablePartitionReplicatedServersInfo();
313+
assertEquals(tablePartitionReplicatedServersInfo.getPartitionInfoMap()[1]._segments,
314+
Collections.singleton(segment));
315+
assertTrue(tablePartitionReplicatedServersInfo.getSegmentsWithInvalidPartition().isEmpty());
316+
317+
setSegmentZKMetadata(segment, PartitionPipelineFunction.NAME, 8, 1, functionExpr, "POSITIVE_MODULO", 0L);
318+
segmentZkMetadataFetcher.refreshSegment(segment);
319+
320+
tablePartitionReplicatedServersInfo = partitionMetadataManager.getTablePartitionReplicatedServersInfo();
321+
assertEquals(tablePartitionReplicatedServersInfo.getSegmentsWithInvalidPartition(),
322+
Collections.singletonList(segment));
323+
}
324+
325+
@Test
326+
public void testPartitionMetadataManagerProcessingWithFunctionExprAbsNormalizer() {
327+
ExternalView externalView = new ExternalView(OFFLINE_TABLE_NAME);
328+
Map<String, Map<String, String>> segmentAssignment = externalView.getRecord().getMapFields();
329+
Set<String> onlineSegments = new HashSet<>();
330+
IdealState idealState = new IdealState(OFFLINE_TABLE_NAME);
331+
String functionExpr = "murmur2(" + PARTITION_COLUMN + ")";
332+
String segment = "exprAbsSegment";
333+
334+
SegmentPartitionMetadataManager partitionMetadataManager =
335+
new SegmentPartitionMetadataManager(OFFLINE_TABLE_NAME, PARTITION_COLUMN, PartitionPipelineFunction.NAME,
336+
functionExpr, "ABS", 8);
337+
SegmentZkMetadataFetcher segmentZkMetadataFetcher =
338+
new SegmentZkMetadataFetcher(OFFLINE_TABLE_NAME, _propertyStore);
339+
segmentZkMetadataFetcher.register(partitionMetadataManager);
340+
segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
341+
342+
onlineSegments.add(segment);
343+
segmentAssignment.put(segment, Collections.singletonMap(SERVER_0, ONLINE));
344+
setSegmentZKMetadata(segment, PartitionPipelineFunction.NAME, 8, 1, functionExpr, "ABS", 0L);
345+
segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, onlineSegments);
346+
347+
TablePartitionReplicatedServersInfo tablePartitionReplicatedServersInfo =
348+
partitionMetadataManager.getTablePartitionReplicatedServersInfo();
349+
assertEquals(tablePartitionReplicatedServersInfo.getPartitionInfoMap()[1]._segments,
350+
Collections.singleton(segment));
351+
assertTrue(tablePartitionReplicatedServersInfo.getSegmentsWithInvalidPartition().isEmpty());
352+
}
353+
287354
private void setSegmentZKMetadata(String segment, String partitionFunction, int numPartitions, int partitionId,
288355
long creationTimeMs) {
356+
setSegmentZKMetadata(segment, partitionFunction, numPartitions, partitionId, null, null, creationTimeMs);
357+
}
358+
359+
private void setSegmentZKMetadata(String segment, String partitionFunction, int numPartitions, int partitionId,
360+
@Nullable String functionExpr, long creationTimeMs) {
361+
setSegmentZKMetadata(segment, partitionFunction, numPartitions, partitionId, functionExpr, null, creationTimeMs);
362+
}
363+
364+
private void setSegmentZKMetadata(String segment, String partitionFunction, int numPartitions, int partitionId,
365+
@Nullable String functionExpr, @Nullable String partitionIdNormalizer, long creationTimeMs) {
289366
SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(segment);
290367
segmentZKMetadata.setPartitionMetadata(new SegmentPartitionMetadata(Collections.singletonMap(PARTITION_COLUMN,
291-
new ColumnPartitionMetadata(partitionFunction, numPartitions, Collections.singleton(partitionId), null))));
368+
new ColumnPartitionMetadata(partitionFunction, numPartitions, Collections.singleton(partitionId), null,
369+
functionExpr, partitionIdNormalizer))));
292370
segmentZKMetadata.setCreationTime(creationTimeMs);
293371
ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, OFFLINE_TABLE_NAME, segmentZKMetadata);
294372
}

0 commit comments

Comments
 (0)