Commit 5f3d045

xiangfu0 and claude committed
Add codec pipeline framework for raw forward index encoding (V7)
Introduces a self-describing codec DSL (DELTA, DELTADELTA, ZSTD(N), LZ4, SNAPPY, GZIP, CODEC(...)) and a new on-disk format version 7 that stores the canonical codec spec in the file header. The pipeline is codec-agnostic: each codec implements the ChunkCodecHandler SPI (encode/decode/decodeInto/maxEncodedSize) and is dispatched polymorphically by CodecPipelineExecutor.

Key components:
- Codec DSL AST + parser (CodecSpecParser) in pinot-segment-spi
- ChunkCodecHandler SPI and CodecRegistry/Validator/Executor in pinot-segment-local
- V7 fixed-byte chunk writer (FixedByteChunkForwardIndexWriterV7) and reader (FixedByteChunkSVForwardIndexReaderV7) for INT/LONG SV columns
- ForwardIndexCreatorFactory + ForwardIndexReaderFactory dispatch for V7
- ForwardIndexConfig.codecSpec field (mutually exclusive with compressionCodec); forces writer version 7
- ForwardIndexHandler V7-to-legacy rollback detection on segment reload
- CompressionCodecMigrator: type-agnostic and schema-aware helpers to translate legacy compressionCodec to codecSpec
- TableConfigUtils.validateCodecSpecIfPresent for table-level validation
- FieldConfig.withCodecSpec builder method
- CodecPipelineIntegrationTest: end-to-end segment build + query test covering INT/LONG codec-pipeline columns and STRING dictionary column

Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
1 parent 3eff094 commit 5f3d045

40 files changed

Lines changed: 5477 additions & 16 deletions
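
For orientation before the diff: the commit message above names the four ChunkCodecHandler methods and the CodecPipelineExecutor dispatcher, so here is a minimal sketch of how they could fit together. The method and class names come from the message itself; the parameter and return types, and the PipelineSketch helper, are assumptions for illustration, not the actual pinot-segment-local signatures.

import java.nio.ByteBuffer;
import java.util.List;

/** Hedged sketch of the SPI named in the commit message; real signatures may differ. */
interface ChunkCodecHandler {
  /** Encodes the readable bytes of {@code input} into {@code output}. */
  void encode(ByteBuffer input, ByteBuffer output);

  /** Decodes {@code input} into a newly allocated buffer. */
  ByteBuffer decode(ByteBuffer input);

  /** Decodes {@code input} into a caller-supplied buffer, avoiding an allocation. */
  void decodeInto(ByteBuffer input, ByteBuffer output);

  /** Worst-case encoded size for {@code rawSize} input bytes, used to size chunk buffers. */
  int maxEncodedSize(int rawSize);
}

/** How a pipeline executor could chain stages: encode front-to-back, decode back-to-front. */
final class PipelineSketch {
  private PipelineSketch() {
  }

  static ByteBuffer encode(List<ChunkCodecHandler> stages, ByteBuffer raw) {
    ByteBuffer current = raw;
    for (ChunkCodecHandler stage : stages) {
      // Size the output with the stage's worst-case bound, then encode and flip for reading.
      ByteBuffer out = ByteBuffer.allocate(stage.maxEncodedSize(current.remaining()));
      stage.encode(current, out);
      out.flip();
      current = out;
    }
    return current;
  }

  static ByteBuffer decode(List<ChunkCodecHandler> stages, ByteBuffer encoded) {
    // The last codec applied during encoding is the first to be undone.
    ByteBuffer current = encoded;
    for (int i = stages.size() - 1; i >= 0; i--) {
      current = stages.get(i).decode(current);
    }
    return current;
  }
}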

Lines changed: 270 additions & 0 deletions
@@ -0,0 +1,270 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pinot.integration.tests.custom;

import com.fasterxml.jackson.databind.JsonNode;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;


/**
 * Integration test for the codec pipeline forward index (version 7).
 * Writes an offline table with INT and LONG raw columns encoded via the CODEC(DELTA,ZSTD(3)) pipeline,
 * plus a STRING column stored with dictionary encoding, verifying both encoding paths coexist in
 * the same segment. Exercises the full segment build → query path.
 */
@Test(suiteName = "CustomClusterIntegrationTest")
public class CodecPipelineIntegrationTest extends CustomDataQueryClusterIntegrationTest {

  private static final String TABLE_NAME = "CodecPipelineIntegrationTest";
  private static final int NUM_DOCS = 1000;

  private static final String INT_COL = "intVal";
  private static final String LONG_COL = "longVal";
  private static final String STR_COL = "strVal";
  private static final String TIME_COL = "ts";

  // Predictable values: intVal[i] = i, longVal[i] = i * 1_000_000_000L
  // Sum(0..999) = 499_500
  private static final long EXPECTED_INT_SUM = 499_500L;
  // Sum(0..999) * 1_000_000_000 = 499_500 * 1_000_000_000 = 499_500_000_000_000L
  private static final long EXPECTED_LONG_SUM = 499_500L * 1_000_000_000L;

  @Override
  public String getTableName() {
    return TABLE_NAME;
  }

  @Override
  public Schema createSchema() {
    return new Schema.SchemaBuilder().setSchemaName(getTableName())
        .addMetric(INT_COL, FieldSpec.DataType.INT)
        .addMetric(LONG_COL, FieldSpec.DataType.LONG)
        .addSingleValueDimension(STR_COL, FieldSpec.DataType.STRING)
        .addDateTimeField(TIME_COL, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
        .build();
  }

  @Override
  public List<File> createAvroFiles()
      throws IOException {
    org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("codecRecord", null, null, false);
    avroSchema.setFields(List.of(
        new org.apache.avro.Schema.Field(INT_COL, org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT),
            null, null),
        new org.apache.avro.Schema.Field(LONG_COL, org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG),
            null, null),
        new org.apache.avro.Schema.Field(STR_COL, org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING),
            null, null),
        new org.apache.avro.Schema.Field(TIME_COL, org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG),
            null, null)));

    try (AvroFilesAndWriters avroFilesAndWriters = createAvroFilesAndWriters(avroSchema)) {
      List<DataFileWriter<GenericData.Record>> writers = avroFilesAndWriters.getWriters();
      for (int i = 0; i < NUM_DOCS; i++) {
        GenericData.Record record = new GenericData.Record(avroSchema);
        record.put(INT_COL, i);
        record.put(LONG_COL, (long) i * 1_000_000_000L);
        record.put(STR_COL, "str_" + i);
        record.put(TIME_COL, (long) i);
        writers.get(i % getNumAvroFiles()).append(record);
      }
      return avroFilesAndWriters.getAvroFiles();
    }
  }

  @Override
  public String getTimeColumnName() {
    return TIME_COL;
  }

  @Override
  protected long getCountStarResult() {
    return NUM_DOCS;
  }

  @Override
  public TableConfig createOfflineTableConfig() {
    return new TableConfigBuilder(TableType.OFFLINE).setTableName(getTableName())
        .setNoDictionaryColumns(getNoDictionaryColumns())
        .setFieldConfigList(getFieldConfigs())
        .build();
  }

  @Override
  protected List<String> getNoDictionaryColumns() {
    // STR_COL uses a dictionary (default), so it is intentionally NOT in this list
    return List.of(INT_COL, LONG_COL);
  }

  @Override
  protected List<FieldConfig> getFieldConfigs() {
    List<FieldConfig> fieldConfigs = new ArrayList<>();
    // INT column with full DELTA+ZSTD pipeline (codec pipeline, raw, no dict)
    fieldConfigs.add(new FieldConfig.Builder(INT_COL)
        .withEncodingType(FieldConfig.EncodingType.RAW)
        .withCodecSpec("CODEC(DELTA,ZSTD(3))")
        .build());
    // LONG column with ZSTD-only (codec pipeline, raw, no dict)
    fieldConfigs.add(new FieldConfig.Builder(LONG_COL)
        .withEncodingType(FieldConfig.EncodingType.RAW)
        .withCodecSpec("ZSTD(3)")
        .build());
    // STR_COL with dictionary encoding — verifies codec-pipeline and dict columns coexist in the same segment
    fieldConfigs.add(new FieldConfig.Builder(STR_COL)
        .withEncodingType(FieldConfig.EncodingType.DICTIONARY)
        .build());
    return fieldConfigs;
  }

  @Nullable
  @Override
  protected String getSortedColumn() {
    return null;
  }

  @Nullable
  @Override
  protected List<String> getInvertedIndexColumns() {
    return null;
  }

  @Nullable
  @Override
  protected List<String> getRangeIndexColumns() {
    return null;
  }

  @Nullable
  @Override
  protected List<String> getBloomFilterColumns() {
    return null;
  }

  @Test(dataProvider = "useBothQueryEngines")
  public void testSumQueries(boolean useMultiStageQueryEngine)
      throws Exception {
    setUseMultiStageQueryEngine(useMultiStageQueryEngine);

    // Verify COUNT(*)
    String countQuery = "SELECT COUNT(*) FROM " + getTableName();
    JsonNode countResult = postQuery(countQuery);
    assertEquals(countResult.get("resultTable").get("rows").get(0).get(0).asLong(), NUM_DOCS,
        "Unexpected row count");

    // Verify SUM(intVal) — exercises delta-decoded INT reads
    String intSumQuery = "SELECT SUM(intVal) FROM " + getTableName();
    JsonNode intSumResult = postQuery(intSumQuery);
    assertEquals(intSumResult.get("resultTable").get("rows").get(0).get(0).asLong(), EXPECTED_INT_SUM,
        "Unexpected SUM(intVal)");

    // Verify SUM(longVal) — exercises ZSTD-only LONG reads
    String longSumQuery = "SELECT SUM(longVal) FROM " + getTableName();
    JsonNode longSumResult = postQuery(longSumQuery);
    assertEquals(longSumResult.get("resultTable").get("rows").get(0).get(0).asLong(), EXPECTED_LONG_SUM,
        "Unexpected SUM(longVal)");
  }

  @Test(dataProvider = "useBothQueryEngines")
  public void testFilterQueries(boolean useMultiStageQueryEngine)
      throws Exception {
    setUseMultiStageQueryEngine(useMultiStageQueryEngine);

    // Filter on INT column — verifies individual value decoding is correct
    // intVal < 100 → 100 rows (values 0..99)
    String intFilterQuery = "SELECT COUNT(*) FROM " + getTableName() + " WHERE intVal < 100";
    JsonNode intFilterResult = postQuery(intFilterQuery);
    assertEquals(intFilterResult.get("resultTable").get("rows").get(0).get(0).asLong(), 100L,
        "Unexpected count for intVal < 100");

    // Filter on LONG column — verifies LONG decoding
    // longVal < 100_000_000_000L → rows with i < 100 → 100 rows
    String longFilterQuery =
        "SELECT COUNT(*) FROM " + getTableName() + " WHERE longVal < 100000000000";
    JsonNode longFilterResult = postQuery(longFilterQuery);
    assertEquals(longFilterResult.get("resultTable").get("rows").get(0).get(0).asLong(), 100L,
        "Unexpected count for longVal < 100_000_000_000");
  }

  /**
   * Verifies specific individual values via point lookups to detect per-doc decoding errors.
   * A codec that misapplies delta or reads stale chunks would produce wrong individual values
   * even if aggregate queries (SUM, COUNT) happen to be correct.
   */
  @Test(dataProvider = "useBothQueryEngines")
  public void testPointLookups(boolean useMultiStageQueryEngine)
      throws Exception {
    setUseMultiStageQueryEngine(useMultiStageQueryEngine);

    // Spot-check a few specific docs across different chunks
    int[] spotCheckIds = {0, 1, 511, 512, 513, 999};
    for (int id : spotCheckIds) {
      // intVal = id for row where ts = id (ts is unique and equals the doc index)
      String intQuery = "SELECT intVal FROM " + getTableName() + " WHERE ts = " + id;
      JsonNode intResult = postQuery(intQuery);
      assertEquals(intResult.get("resultTable").get("rows").get(0).get(0).asInt(), id,
          "Wrong intVal for ts=" + id);

      // longVal = id * 1_000_000_000L
      String longQuery = "SELECT longVal FROM " + getTableName() + " WHERE ts = " + id;
      JsonNode longResult = postQuery(longQuery);
      assertEquals(longResult.get("resultTable").get("rows").get(0).get(0).asLong(),
          (long) id * 1_000_000_000L, "Wrong longVal for ts=" + id);
    }
  }

  /**
   * Verifies that a STRING column stored with dictionary encoding (not codec pipeline) reads back
   * correctly alongside codec-pipeline columns, confirming both can coexist in the same segment.
   */
  @Test(dataProvider = "useBothQueryEngines")
  public void testStringColumnWithDictEncoding(boolean useMultiStageQueryEngine)
      throws Exception {
    setUseMultiStageQueryEngine(useMultiStageQueryEngine);

    // Spot-check a few string values: strVal = "str_<ts>"
    int[] spotCheckIds = {0, 42, 500, 999};
    for (int id : spotCheckIds) {
      String strQuery = "SELECT strVal FROM " + getTableName() + " WHERE ts = " + id;
      JsonNode result = postQuery(strQuery);
      assertEquals(result.get("resultTable").get("rows").get(0).get(0).asText(), "str_" + id,
          "Wrong strVal for ts=" + id);
    }

    // Verify COUNT DISTINCT to confirm all unique string values are stored
    String countDistinctQuery = "SELECT COUNT(DISTINCT strVal) FROM " + getTableName();
    JsonNode countDistinctResult = postQuery(countDistinctQuery);
    assertEquals(countDistinctResult.get("resultTable").get("rows").get(0).get(0).asLong(), NUM_DOCS,
        "Expected all " + NUM_DOCS + " distinct string values");
  }
}
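
The field configs above pair withCodecSpec with RAW encoding and omit the legacy compressionCodec. A hedged sketch of the table-level rule the commit message attributes to TableConfigUtils.validateCodecSpecIfPresent follows: getCodecSpec() is an assumed accessor matching the new builder method, and the RAW-encoding requirement is inferred from this test rather than stated in the commit message.

import org.apache.pinot.spi.config.table.FieldConfig;

/** Illustrative only: not the actual TableConfigUtils implementation from this commit. */
final class CodecSpecValidationSketch {
  private CodecSpecValidationSketch() {
  }

  static void validateCodecSpecIfPresent(FieldConfig fieldConfig) {
    String codecSpec = fieldConfig.getCodecSpec();  // assumed accessor added by this commit
    if (codecSpec == null) {
      return;
    }
    // Stated in the commit message: codecSpec and compressionCodec are mutually exclusive.
    if (fieldConfig.getCompressionCodec() != null) {
      throw new IllegalStateException(
          "Cannot set both codecSpec and compressionCodec for column: " + fieldConfig.getName());
    }
    // Inferred from the test above (not stated in the commit): codec-pipeline columns use RAW encoding.
    if (fieldConfig.getEncodingType() != FieldConfig.EncodingType.RAW) {
      throw new IllegalStateException(
          "codecSpec requires RAW encoding for column: " + fieldConfig.getName());
    }
  }
}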
Lines changed: 47 additions & 0 deletions
@@ -0,0 +1,47 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pinot.segment.local.io.codec;

import java.nio.ByteBuffer;


/** Package-private buffer helpers shared across codec handler implementations. */
final class CodecBufferUtils {

  private CodecBufferUtils() {
  }

  /** Returns {@code buf} if already direct; otherwise copies into a new direct buffer. */
  static ByteBuffer toDirectBuffer(ByteBuffer buf) {
    if (buf.isDirect()) {
      return buf;
    }
    ByteBuffer direct = ByteBuffer.allocateDirect(buf.remaining());
    direct.put(buf.duplicate());
    direct.flip();
    return direct;
  }

  /** Copies the readable bytes of {@code buf} into a new heap byte array. */
  static byte[] toHeapArray(ByteBuffer buf) {
    byte[] bytes = new byte[buf.remaining()];
    buf.duplicate().get(bytes);
    return bytes;
  }
}
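
As a usage illustration, here is a short sketch of how a stream-based handler, for example the GZIP codec named in the commit message, might lean on toHeapArray, since java.util.zip operates on heap arrays rather than ByteBuffers. The class below is hypothetical (it is placed in the same package because CodecBufferUtils is package-private) and is not a handler from this commit.

package org.apache.pinot.segment.local.io.codec;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.zip.GZIPOutputStream;

/** Hypothetical encode path for a stream-based codec; illustrative only. */
final class GzipEncodeSketch {
  private GzipEncodeSketch() {
  }

  static ByteBuffer gzipEncode(ByteBuffer input) throws IOException {
    // Copy the readable bytes to the heap without disturbing the caller's buffer position.
    byte[] raw = CodecBufferUtils.toHeapArray(input);
    ByteArrayOutputStream out = new ByteArrayOutputStream(raw.length);
    try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
      gzip.write(raw);
    }
    return ByteBuffer.wrap(out.toByteArray());
  }
}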
