Skip to content

Commit dae5cc6

Browse files
committed
Add partition function expressions for chained partitioning
1 parent 22b3b6f commit dae5cc6

63 files changed

Lines changed: 4308 additions & 460 deletions

File tree

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -799,7 +799,7 @@ private void buildRoutingInternal(String tableNameWithType) {
799799
tableNameWithType, partitionConfig.getKey());
800800
partitionMetadataManager =
801801
new SegmentPartitionMetadataManager(tableNameWithType, partitionConfig.getKey(),
802-
partitionConfig.getValue().getFunctionName(), partitionConfig.getValue().getNumPartitions());
802+
partitionConfig.getValue());
803803
} else {
804804
LOGGER.warn(
805805
"Cannot enable SegmentPartitionMetadataManager for table: {} with multiple partition columns: {}",

pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManager.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.HashSet;
2525
import java.util.List;
2626
import java.util.Map;
27+
import java.util.Objects;
2728
import java.util.Set;
2829
import javax.annotation.Nullable;
2930
import org.apache.commons.lang3.tuple.Pair;
@@ -39,6 +40,8 @@
3940
import org.apache.pinot.core.routing.TablePartitionReplicatedServersInfo;
4041
import org.apache.pinot.core.routing.TablePartitionReplicatedServersInfo.PartitionInfo;
4142
import org.apache.pinot.segment.spi.partition.PartitionFunction;
43+
import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
44+
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
4245
import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
4346
import org.slf4j.Logger;
4447
import org.slf4j.LoggerFactory;
@@ -61,7 +64,7 @@ public class SegmentPartitionMetadataManager implements SegmentZkMetadataFetchLi
6164

6265
// static content, if anything changes for the following. a rebuild of routing table is needed.
6366
private final String _partitionColumn;
64-
private final String _partitionFunctionName;
67+
private final PartitionFunction _partitionFunction;
6568
private final int _numPartitions;
6669

6770
// cache-able content, only follow changes if onlineSegments list (of ideal-state) is changed.
@@ -71,12 +74,12 @@ public class SegmentPartitionMetadataManager implements SegmentZkMetadataFetchLi
7174
private transient TablePartitionInfo _tablePartitionInfo;
7275
private transient TablePartitionReplicatedServersInfo _tablePartitionReplicatedServersInfo;
7376

74-
public SegmentPartitionMetadataManager(String tableNameWithType, String partitionColumn, String partitionFunctionName,
75-
int numPartitions) {
77+
public SegmentPartitionMetadataManager(String tableNameWithType, String partitionColumn,
78+
ColumnPartitionConfig columnPartitionConfig) {
7679
_tableNameWithType = tableNameWithType;
7780
_partitionColumn = partitionColumn;
78-
_partitionFunctionName = partitionFunctionName;
79-
_numPartitions = numPartitions;
81+
_partitionFunction = PartitionFunctionFactory.getPartitionFunction(partitionColumn, columnPartitionConfig);
82+
_numPartitions = _partitionFunction.getNumPartitions();
8083
}
8184

8285
@Override
@@ -103,7 +106,7 @@ private int getPartitionId(String segment, @Nullable ZNRecord znRecord) {
103106
return INVALID_PARTITION_ID;
104107
}
105108
PartitionFunction partitionFunction = segmentPartitionInfo.getPartitionFunction();
106-
if (!_partitionFunctionName.equalsIgnoreCase(partitionFunction.getName())) {
109+
if (!isMatchingPartitionFunction(partitionFunction)) {
107110
return INVALID_PARTITION_ID;
108111
}
109112
if (_numPartitions != partitionFunction.getNumPartitions()) {
@@ -116,6 +119,13 @@ private int getPartitionId(String segment, @Nullable ZNRecord znRecord) {
116119
return partitions.iterator().next();
117120
}
118121

122+
private boolean isMatchingPartitionFunction(PartitionFunction partitionFunction) {
123+
return _partitionFunction.getName().equalsIgnoreCase(partitionFunction.getName())
124+
&& Objects.equals(_partitionFunction.getFunctionConfig(), partitionFunction.getFunctionConfig())
125+
&& Objects.equals(_partitionFunction.getFunctionExpr(), partitionFunction.getFunctionExpr())
126+
&& Objects.equals(_partitionFunction.getPartitionIdNormalizer(), partitionFunction.getPartitionIdNormalizer());
127+
}
128+
119129
private static long getCreationTimeMs(@Nullable ZNRecord znRecord) {
120130
if (znRecord == null) {
121131
return INVALID_CREATION_TIME_MS;
@@ -306,7 +316,7 @@ private void computeTablePartitionReplicatedServersInfo() {
306316
}
307317
}
308318
_tablePartitionReplicatedServersInfo =
309-
new TablePartitionReplicatedServersInfo(_tableNameWithType, _partitionColumn, _partitionFunctionName,
319+
new TablePartitionReplicatedServersInfo(_tableNameWithType, _partitionColumn, _partitionFunction.getName(),
310320
_numPartitions, partitionInfoMap, segmentsWithInvalidPartition);
311321
}
312322

@@ -337,7 +347,7 @@ private void computeTablePartitionInfo() {
337347
_tableNameWithType);
338348
}
339349
_tablePartitionInfo =
340-
new TablePartitionInfo(_tableNameWithType, _partitionColumn, _partitionFunctionName, _numPartitions,
350+
new TablePartitionInfo(_tableNameWithType, _partitionColumn, _partitionFunction.getName(), _numPartitions,
341351
segmentsByPartition, segmentsWithInvalidPartition);
342352
}
343353

pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionUtils.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import javax.annotation.Nullable;
2626
import org.apache.helix.zookeeper.datamodel.ZNRecord;
2727
import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
28+
import org.apache.pinot.segment.spi.partition.PartitionFunction;
2829
import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
2930
import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
3031
import org.apache.pinot.spi.utils.CommonConstants;
@@ -79,9 +80,15 @@ public static SegmentPartitionInfo extractPartitionInfo(String tableNameWithType
7980
return INVALID_PARTITION_INFO;
8081
}
8182

82-
return new SegmentPartitionInfo(partitionColumn,
83-
PartitionFunctionFactory.getPartitionFunction(columnPartitionMetadata),
84-
columnPartitionMetadata.getPartitions());
83+
try {
84+
PartitionFunction partitionFunction =
85+
PartitionFunctionFactory.getPartitionFunction(partitionColumn, columnPartitionMetadata);
86+
return new SegmentPartitionInfo(partitionColumn, partitionFunction, columnPartitionMetadata.getPartitions());
87+
} catch (Exception e) {
88+
LOGGER.warn("Caught exception while constructing partition function for column: {}, segment: {}, table: {}",
89+
partitionColumn, segment, tableNameWithType, e);
90+
return INVALID_PARTITION_INFO;
91+
}
8592
}
8693

8794
/**
@@ -123,10 +130,16 @@ public static Map<String, SegmentPartitionInfo> extractPartitionInfoMap(String t
123130
segment, tableNameWithType);
124131
continue;
125132
}
126-
SegmentPartitionInfo segmentPartitionInfo = new SegmentPartitionInfo(partitionColumn,
127-
PartitionFunctionFactory.getPartitionFunction(columnPartitionMetadata),
128-
columnPartitionMetadata.getPartitions());
129-
columnSegmentPartitionInfoMap.put(partitionColumn, segmentPartitionInfo);
133+
try {
134+
PartitionFunction partitionFunction =
135+
PartitionFunctionFactory.getPartitionFunction(partitionColumn, columnPartitionMetadata);
136+
SegmentPartitionInfo segmentPartitionInfo =
137+
new SegmentPartitionInfo(partitionColumn, partitionFunction, columnPartitionMetadata.getPartitions());
138+
columnSegmentPartitionInfoMap.put(partitionColumn, segmentPartitionInfo);
139+
} catch (Exception e) {
140+
LOGGER.warn("Caught exception while constructing partition function for column: {}, segment: {}, table: {}",
141+
partitionColumn, segment, tableNameWithType, e);
142+
}
130143
}
131144
if (columnSegmentPartitionInfoMap.size() == 1) {
132145
String partitionColumn = columnSegmentPartitionInfoMap.keySet().iterator().next();

pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/MultiPartitionColumnsSegmentPruner.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,17 @@
3636
import org.apache.pinot.common.request.Identifier;
3737
import org.apache.pinot.common.request.context.RequestContextUtils;
3838
import org.apache.pinot.sql.FilterKind;
39+
import org.slf4j.Logger;
40+
import org.slf4j.LoggerFactory;
3941

4042

4143
/**
4244
* The {@code MultiPartitionColumnsSegmentPruner} prunes segments based on their partition metadata stored in ZK. The
4345
* pruner supports queries with filter (or nested filter) of EQUALITY and IN predicates.
4446
*/
4547
public class MultiPartitionColumnsSegmentPruner implements SegmentPruner {
48+
private static final Logger LOGGER = LoggerFactory.getLogger(MultiPartitionColumnsSegmentPruner.class);
49+
4650
private final String _tableNameWithType;
4751
private final Set<String> _partitionColumns;
4852
private final Map<String, Map<String, SegmentPartitionInfo>> _segmentColumnPartitionInfoMap =
@@ -140,8 +144,8 @@ private boolean isPartitionMatch(Expression filterExpression,
140144
Identifier identifier = operands.get(0).getIdentifier();
141145
if (identifier != null) {
142146
SegmentPartitionInfo partitionInfo = columnPartitionInfoMap.get(identifier.getName());
143-
return partitionInfo == null || partitionInfo.getPartitions().contains(
144-
partitionInfo.getPartitionFunction().getPartition(RequestContextUtils.getStringValue(operands.get(1))));
147+
return partitionInfo == null || isPartitionMatch(partitionInfo, RequestContextUtils.getStringValue(
148+
operands.get(1)));
145149
} else {
146150
return true;
147151
}
@@ -155,8 +159,7 @@ private boolean isPartitionMatch(Expression filterExpression,
155159
}
156160
int numOperands = operands.size();
157161
for (int i = 1; i < numOperands; i++) {
158-
if (partitionInfo.getPartitions().contains(partitionInfo.getPartitionFunction()
159-
.getPartition(RequestContextUtils.getStringValue(operands.get(i))))) {
162+
if (isPartitionMatch(partitionInfo, RequestContextUtils.getStringValue(operands.get(i)))) {
160163
return true;
161164
}
162165
}
@@ -169,4 +172,14 @@ private boolean isPartitionMatch(Expression filterExpression,
169172
return true;
170173
}
171174
}
175+
176+
private boolean isPartitionMatch(SegmentPartitionInfo partitionInfo, String value) {
177+
try {
178+
return partitionInfo.getPartitions().contains(partitionInfo.getPartitionFunction().getPartition(value));
179+
} catch (RuntimeException e) {
180+
LOGGER.debug("Failed to evaluate partition function for table: {}, partition column: {}; skipping partition "
181+
+ "pruning", _tableNameWithType, partitionInfo.getPartitionColumn(), e);
182+
return true;
183+
}
184+
}
172185
}

pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SinglePartitionColumnSegmentPruner.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,17 @@
3535
import org.apache.pinot.common.request.Identifier;
3636
import org.apache.pinot.common.request.context.RequestContextUtils;
3737
import org.apache.pinot.sql.FilterKind;
38+
import org.slf4j.Logger;
39+
import org.slf4j.LoggerFactory;
3840

3941

4042
/**
4143
* The {@code SinglePartitionColumnSegmentPruner} prunes segments based on their partition metadata stored in ZK. The
4244
* pruner supports queries with filter (or nested filter) of EQUALITY and IN predicates.
4345
*/
4446
public class SinglePartitionColumnSegmentPruner implements SegmentPruner {
47+
private static final Logger LOGGER = LoggerFactory.getLogger(SinglePartitionColumnSegmentPruner.class);
48+
4549
private final String _tableNameWithType;
4650
private final String _partitionColumn;
4751
private final Map<String, SegmentPartitionInfo> _partitionInfoMap = new ConcurrentHashMap<>();
@@ -129,8 +133,7 @@ private boolean isPartitionMatch(Expression filterExpression, SegmentPartitionIn
129133
case EQUALS: {
130134
Identifier identifier = operands.get(0).getIdentifier();
131135
if (identifier != null && identifier.getName().equals(_partitionColumn)) {
132-
return partitionInfo.getPartitions().contains(partitionInfo.getPartitionFunction()
133-
.getPartition(RequestContextUtils.getStringValue(operands.get(1))));
136+
return isPartitionMatch(partitionInfo, RequestContextUtils.getStringValue(operands.get(1)));
134137
} else {
135138
return true;
136139
}
@@ -140,8 +143,7 @@ private boolean isPartitionMatch(Expression filterExpression, SegmentPartitionIn
140143
if (identifier != null && identifier.getName().equals(_partitionColumn)) {
141144
int numOperands = operands.size();
142145
for (int i = 1; i < numOperands; i++) {
143-
if (partitionInfo.getPartitions().contains(partitionInfo.getPartitionFunction()
144-
.getPartition(RequestContextUtils.getStringValue(operands.get(i))))) {
146+
if (isPartitionMatch(partitionInfo, RequestContextUtils.getStringValue(operands.get(i)))) {
145147
return true;
146148
}
147149
}
@@ -154,4 +156,14 @@ private boolean isPartitionMatch(Expression filterExpression, SegmentPartitionIn
154156
return true;
155157
}
156158
}
159+
160+
private boolean isPartitionMatch(SegmentPartitionInfo partitionInfo, String value) {
161+
try {
162+
return partitionInfo.getPartitions().contains(partitionInfo.getPartitionFunction().getPartition(value));
163+
} catch (RuntimeException e) {
164+
LOGGER.debug("Failed to evaluate partition function for table: {}, partition column: {}; skipping partition "
165+
+ "pruning", _tableNameWithType, _partitionColumn, e);
166+
return true;
167+
}
168+
}
157169
}

pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManagerTest.java

Lines changed: 86 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.HashSet;
2424
import java.util.Map;
2525
import java.util.Set;
26+
import javax.annotation.Nullable;
2627
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
2728
import org.apache.helix.model.ExternalView;
2829
import org.apache.helix.model.IdealState;
@@ -36,7 +37,11 @@
3637
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
3738
import org.apache.pinot.controller.helix.ControllerTest;
3839
import org.apache.pinot.core.routing.TablePartitionReplicatedServersInfo;
40+
import org.apache.pinot.segment.spi.partition.PartitionFunction;
41+
import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
3942
import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
43+
import org.apache.pinot.segment.spi.partition.pipeline.PartitionPipelineFunction;
44+
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
4045
import org.testng.annotations.AfterClass;
4146
import org.testng.annotations.BeforeClass;
4247
import org.testng.annotations.Test;
@@ -83,8 +88,8 @@ public void testPartitionMetadataManagerProcessingThroughSegmentChangesSinglePar
8388
IdealState idealState = new IdealState(OFFLINE_TABLE_NAME);
8489

8590
SegmentPartitionMetadataManager partitionMetadataManager =
86-
new SegmentPartitionMetadataManager(OFFLINE_TABLE_NAME, PARTITION_COLUMN, PARTITION_COLUMN_FUNC,
87-
NUM_PARTITIONS);
91+
new SegmentPartitionMetadataManager(OFFLINE_TABLE_NAME, PARTITION_COLUMN,
92+
new ColumnPartitionConfig(PARTITION_COLUMN_FUNC, NUM_PARTITIONS));
8893
SegmentZkMetadataFetcher segmentZkMetadataFetcher =
8994
new SegmentZkMetadataFetcher(OFFLINE_TABLE_NAME, _propertyStore);
9095
segmentZkMetadataFetcher.register(partitionMetadataManager);
@@ -284,11 +289,89 @@ public void testPartitionMetadataManagerProcessingThroughSegmentChangesSinglePar
284289
assertEquals(tablePartitionReplicatedServersInfo.getSegmentsWithInvalidPartition().get(0), segmentInvalid);
285290
}
286291

292+
@Test
293+
public void testPartitionMetadataManagerProcessingWithFunctionExpr() {
294+
ExternalView externalView = new ExternalView(OFFLINE_TABLE_NAME);
295+
Map<String, Map<String, String>> segmentAssignment = externalView.getRecord().getMapFields();
296+
Set<String> onlineSegments = new HashSet<>();
297+
IdealState idealState = new IdealState(OFFLINE_TABLE_NAME);
298+
String functionExpr = "fnv1a_32(md5(" + PARTITION_COLUMN + "))";
299+
String segment = "exprSegment";
300+
301+
SegmentPartitionMetadataManager partitionMetadataManager =
302+
new SegmentPartitionMetadataManager(OFFLINE_TABLE_NAME, PARTITION_COLUMN,
303+
ColumnPartitionConfig.forFunctionExpr(functionExpr, 8, "MASK"));
304+
SegmentZkMetadataFetcher segmentZkMetadataFetcher =
305+
new SegmentZkMetadataFetcher(OFFLINE_TABLE_NAME, _propertyStore);
306+
segmentZkMetadataFetcher.register(partitionMetadataManager);
307+
segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
308+
309+
onlineSegments.add(segment);
310+
segmentAssignment.put(segment, Collections.singletonMap(SERVER_0, ONLINE));
311+
setSegmentZKMetadata(segment, PartitionPipelineFunction.NAME, 8, 1, functionExpr, "MASK", 0L);
312+
segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, onlineSegments);
313+
314+
TablePartitionReplicatedServersInfo tablePartitionReplicatedServersInfo =
315+
partitionMetadataManager.getTablePartitionReplicatedServersInfo();
316+
assertEquals(tablePartitionReplicatedServersInfo.getPartitionInfoMap()[1]._segments,
317+
Collections.singleton(segment));
318+
assertTrue(tablePartitionReplicatedServersInfo.getSegmentsWithInvalidPartition().isEmpty());
319+
320+
setSegmentZKMetadata(segment, PartitionPipelineFunction.NAME, 8, 1, functionExpr, "POSITIVE_MODULO", 0L);
321+
segmentZkMetadataFetcher.refreshSegment(segment);
322+
323+
tablePartitionReplicatedServersInfo = partitionMetadataManager.getTablePartitionReplicatedServersInfo();
324+
assertEquals(tablePartitionReplicatedServersInfo.getSegmentsWithInvalidPartition(),
325+
Collections.singletonList(segment));
326+
}
327+
328+
@Test
329+
public void testPartitionMetadataManagerProcessingWithFunctionExprAbsNormalizer() {
330+
ExternalView externalView = new ExternalView(OFFLINE_TABLE_NAME);
331+
Map<String, Map<String, String>> segmentAssignment = externalView.getRecord().getMapFields();
332+
Set<String> onlineSegments = new HashSet<>();
333+
IdealState idealState = new IdealState(OFFLINE_TABLE_NAME);
334+
String functionExpr = "murmur2(" + PARTITION_COLUMN + ")";
335+
String segment = "exprAbsSegment";
336+
337+
SegmentPartitionMetadataManager partitionMetadataManager =
338+
new SegmentPartitionMetadataManager(OFFLINE_TABLE_NAME, PARTITION_COLUMN,
339+
ColumnPartitionConfig.forFunctionExpr(functionExpr, 8, "ABS"));
340+
SegmentZkMetadataFetcher segmentZkMetadataFetcher =
341+
new SegmentZkMetadataFetcher(OFFLINE_TABLE_NAME, _propertyStore);
342+
segmentZkMetadataFetcher.register(partitionMetadataManager);
343+
segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
344+
345+
onlineSegments.add(segment);
346+
segmentAssignment.put(segment, Collections.singletonMap(SERVER_0, ONLINE));
347+
setSegmentZKMetadata(segment, PartitionPipelineFunction.NAME, 8, 1, functionExpr, "ABS", 0L);
348+
segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, onlineSegments);
349+
350+
TablePartitionReplicatedServersInfo tablePartitionReplicatedServersInfo =
351+
partitionMetadataManager.getTablePartitionReplicatedServersInfo();
352+
assertEquals(tablePartitionReplicatedServersInfo.getPartitionInfoMap()[1]._segments,
353+
Collections.singleton(segment));
354+
assertTrue(tablePartitionReplicatedServersInfo.getSegmentsWithInvalidPartition().isEmpty());
355+
}
356+
287357
private void setSegmentZKMetadata(String segment, String partitionFunction, int numPartitions, int partitionId,
288358
long creationTimeMs) {
359+
setSegmentZKMetadata(segment, partitionFunction, numPartitions, partitionId, null, null, creationTimeMs);
360+
}
361+
362+
private void setSegmentZKMetadata(String segment, String partitionFunction, int numPartitions, int partitionId,
363+
@Nullable String functionExpr, long creationTimeMs) {
364+
setSegmentZKMetadata(segment, partitionFunction, numPartitions, partitionId, functionExpr, null, creationTimeMs);
365+
}
366+
367+
private void setSegmentZKMetadata(String segment, String partitionFunction, int numPartitions, int partitionId,
368+
@Nullable String functionExpr, @Nullable String partitionIdNormalizer, long creationTimeMs) {
289369
SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(segment);
370+
PartitionFunction effectivePartitionFunction =
371+
PartitionFunctionFactory.getPartitionFunction(PARTITION_COLUMN, partitionFunction, numPartitions, null,
372+
functionExpr, partitionIdNormalizer);
290373
segmentZKMetadata.setPartitionMetadata(new SegmentPartitionMetadata(Collections.singletonMap(PARTITION_COLUMN,
291-
new ColumnPartitionMetadata(partitionFunction, numPartitions, Collections.singleton(partitionId), null))));
374+
new ColumnPartitionMetadata(effectivePartitionFunction, Collections.singleton(partitionId)))));
292375
segmentZKMetadata.setCreationTime(creationTimeMs);
293376
ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, OFFLINE_TABLE_NAME, segmentZKMetadata);
294377
}

Commit comments (0)