Skip to content

Commit 797fd1c

Browse files
xiangfu0 and claude committed
Fix review findings: move ChunkCodecHandler to pinot-segment-spi, fix buffer and type guards
- Move ChunkCodecHandler interface to pinot-segment-spi alongside CodecDefinition
- Add dst.clear() in ZstdCodecDefinition.decodeInto and Lz4CodecDefinition.decodeInto
- Guard empty-pipeline at construction time with a clear IllegalArgumentException
- Fix Delta/DeltaDelta encode to throw on unsupported stored types (restore old guard)
- Update Preconditions message to include canonical spec instead of hardcoded codec names
- Fix stale CodecDefinition Javadoc reference in CodecPipelineValidator

Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
1 parent 7755942 commit 797fd1c

10 files changed

Lines changed: 30 additions & 12 deletions

File tree

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/codec/CodecPipelineExecutor.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.nio.ByteBuffer;
2424
import java.util.ArrayList;
2525
import java.util.List;
26+
import org.apache.pinot.segment.spi.codec.ChunkCodecHandler;
2627
import org.apache.pinot.segment.spi.codec.CodecContext;
2728
import org.apache.pinot.segment.spi.codec.CodecInvocation;
2829
import org.apache.pinot.segment.spi.codec.CodecKind;
@@ -114,6 +115,9 @@ public static CodecPipelineExecutor create(String spec, CodecContext ctx, CodecR
114115
@SuppressWarnings({"unchecked", "rawtypes"})
115116
private CodecPipelineExecutor(CodecPipeline pipeline, CodecRegistry registry, CodecContext ctx) {
116117
List<CodecInvocation> invocations = pipeline.stages();
118+
if (invocations.isEmpty()) {
119+
throw new IllegalArgumentException("Codec pipeline must contain at least one stage");
120+
}
117121
List<BoundStage<?>> stages = new ArrayList<>(invocations.size());
118122

119123
for (CodecInvocation inv : invocations) {
@@ -193,7 +197,7 @@ public ByteBuffer decompress(ByteBuffer src) throws IOException {
193197
*/
194198
public void decompress(ByteBuffer src, ByteBuffer dst) throws IOException {
195199
Preconditions.checkArgument(!_requiresDirectDstBuffer || dst.isDirect(),
196-
"decompress(src, dst) requires a direct ByteBuffer when the pipeline includes Zstd or Snappy");
200+
"decompress(src, dst) requires a direct ByteBuffer for pipeline: %s", _canonicalSpec);
197201

198202
int stageCount = _stages.size();
199203
if (stageCount == 1) {

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/codec/CodecPipelineValidator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.pinot.segment.local.io.codec;
2020

2121
import java.util.List;
22+
import org.apache.pinot.segment.spi.codec.ChunkCodecHandler;
2223
import org.apache.pinot.segment.spi.codec.CodecContext;
2324
import org.apache.pinot.segment.spi.codec.CodecInvocation;
2425
import org.apache.pinot.segment.spi.codec.CodecKind;
@@ -35,7 +36,7 @@
3536
* <li>At most one {@link CodecKind#COMPRESSION} stage is allowed.</li>
3637
* <li>The compression stage, if present, must be last.</li>
3738
* <li>All {@link CodecKind#TRANSFORM} stages must precede the compression stage.</li>
38-
* <li>Each codec's {@link CodecDefinition#validateContext} must pass.</li>
39+
* <li>Each codec's {@link ChunkCodecHandler#validateContext} must pass.</li>
3940
* </ol>
4041
*/
4142
public final class CodecPipelineValidator {

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/codec/CodecRegistry.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.LinkedHashMap;
2323
import java.util.Map;
2424
import javax.annotation.Nullable;
25+
import org.apache.pinot.segment.spi.codec.ChunkCodecHandler;
2526
import org.apache.pinot.segment.spi.codec.CodecSpecParser;
2627

2728

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/codec/DeltaCodecDefinition.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.nio.ByteBuffer;
2222
import java.util.List;
23+
import org.apache.pinot.segment.spi.codec.ChunkCodecHandler;
2324
import org.apache.pinot.segment.spi.codec.CodecContext;
2425
import org.apache.pinot.segment.spi.codec.CodecKind;
2526
import org.apache.pinot.segment.spi.codec.CodecOptions;
@@ -101,8 +102,11 @@ public ByteBuffer encode(Options options, CodecContext ctx, ByteBuffer src) {
101102
int remaining = src.remaining();
102103
if (ctx.getDataType() == DataType.LONG) {
103104
return encodeLong(src, remaining);
105+
} else if (ctx.getDataType() == DataType.INT) {
106+
return encodeInt(src, remaining);
107+
} else {
108+
throw new IllegalArgumentException("DELTA does not support stored type: " + ctx.getDataType());
104109
}
105-
return encodeInt(src, remaining);
106110
}
107111

108112
@Override

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/codec/DeltaDeltaCodecDefinition.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.nio.ByteBuffer;
2222
import java.util.List;
23+
import org.apache.pinot.segment.spi.codec.ChunkCodecHandler;
2324
import org.apache.pinot.segment.spi.codec.CodecContext;
2425
import org.apache.pinot.segment.spi.codec.CodecKind;
2526
import org.apache.pinot.segment.spi.codec.CodecOptions;
@@ -103,8 +104,11 @@ public ByteBuffer encode(Options options, CodecContext ctx, ByteBuffer src) {
103104
int remaining = src.remaining();
104105
if (ctx.getDataType() == DataType.LONG) {
105106
return encodeLong(src, remaining);
107+
} else if (ctx.getDataType() == DataType.INT) {
108+
return encodeInt(src, remaining);
109+
} else {
110+
throw new IllegalArgumentException("DELTADELTA does not support stored type: " + ctx.getDataType());
106111
}
107-
return encodeInt(src, remaining);
108112
}
109113

110114
@Override

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/codec/GzipCodecDefinition.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.zip.DataFormatException;
2525
import java.util.zip.Deflater;
2626
import java.util.zip.Inflater;
27+
import org.apache.pinot.segment.spi.codec.ChunkCodecHandler;
2728
import org.apache.pinot.segment.spi.codec.CodecContext;
2829
import org.apache.pinot.segment.spi.codec.CodecKind;
2930
import org.apache.pinot.segment.spi.codec.CodecOptions;

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/codec/Lz4CodecDefinition.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import net.jpountz.lz4.LZ4CompressorWithLength;
2525
import net.jpountz.lz4.LZ4DecompressorWithLength;
2626
import net.jpountz.lz4.LZ4Factory;
27+
import org.apache.pinot.segment.spi.codec.ChunkCodecHandler;
2728
import org.apache.pinot.segment.spi.codec.CodecContext;
2829
import org.apache.pinot.segment.spi.codec.CodecKind;
2930
import org.apache.pinot.segment.spi.codec.CodecOptions;
@@ -115,6 +116,7 @@ public ByteBuffer decode(Options options, CodecContext ctx, ByteBuffer src) thro
115116

116117
@Override
117118
public void decodeInto(Options options, CodecContext ctx, ByteBuffer src, ByteBuffer dst) throws IOException {
119+
dst.clear();
118120
LZ4DecompressorWithLength decompressor = new LZ4DecompressorWithLength(LZ4_FACTORY.safeDecompressor());
119121
int decompressedLength = LZ4DecompressorWithLength.getDecompressedLength(src);
120122
if (decompressedLength > dst.capacity()) {

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/codec/SnappyCodecDefinition.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.io.IOException;
2222
import java.nio.ByteBuffer;
2323
import java.util.List;
24+
import org.apache.pinot.segment.spi.codec.ChunkCodecHandler;
2425
import org.apache.pinot.segment.spi.codec.CodecContext;
2526
import org.apache.pinot.segment.spi.codec.CodecKind;
2627
import org.apache.pinot.segment.spi.codec.CodecOptions;

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/codec/ZstdCodecDefinition.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.nio.ByteBuffer;
2424
import java.util.List;
2525
import java.util.Objects;
26+
import org.apache.pinot.segment.spi.codec.ChunkCodecHandler;
2627
import org.apache.pinot.segment.spi.codec.CodecContext;
2728
import org.apache.pinot.segment.spi.codec.CodecKind;
2829
import org.apache.pinot.segment.spi.codec.CodecOptions;
@@ -166,6 +167,7 @@ public ByteBuffer decode(Options options, CodecContext ctx, ByteBuffer src) thro
166167

167168
@Override
168169
public void decodeInto(Options options, CodecContext ctx, ByteBuffer src, ByteBuffer dst) throws IOException {
170+
dst.clear();
169171
ByteBuffer directSrc = CodecBufferUtils.toDirectBuffer(src);
170172
long decompressedSize = Zstd.decompressedSize(directSrc);
171173
if (decompressedSize <= 0) {

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/codec/ChunkCodecHandler.java renamed to pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/codec/ChunkCodecHandler.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,10 @@
1616
* specific language governing permissions and limitations
1717
* under the License.
1818
*/
19-
package org.apache.pinot.segment.local.io.codec;
19+
package org.apache.pinot.segment.spi.codec;
2020

2121
import java.io.IOException;
2222
import java.nio.ByteBuffer;
23-
import org.apache.pinot.segment.spi.codec.CodecContext;
24-
import org.apache.pinot.segment.spi.codec.CodecDefinition;
25-
import org.apache.pinot.segment.spi.codec.CodecOptions;
2623

2724

2825
/**
@@ -35,8 +32,9 @@
3532
* <ul>
3633
* <li>{@link #encode} and {@link #decode}: {@code src} is ready for read (position=0);
3734
* the returned buffer is ready for read and owned by the caller.</li>
38-
* <li>{@link #decodeInto}: {@code dst} must be pre-cleared (position=0); it is flipped
39-
* (position=0, limit=decoded bytes) on return.</li>
35+
* <li>{@link #decodeInto}: implementations must treat {@code dst} as freshly cleared
36+
* (calling {@code dst.clear()} internally is recommended as a defensive first step);
37+
* {@code dst} is flipped (position=0, limit=decoded bytes) on return.</li>
4038
* </ul>
4139
*
4240
* @param <O> typed {@link CodecOptions} for this codec
@@ -65,15 +63,15 @@ public interface ChunkCodecHandler<O extends CodecOptions> extends CodecDefiniti
6563

6664
/**
6765
* Decodes {@code src} directly into {@code dst}, avoiding an extra allocation.
68-
* {@code dst} must be pre-cleared (position=0); it is flipped on return.
66+
* Implementations must treat {@code dst} as freshly cleared and flip it before returning.
6967
*
7068
* <p>Callers must ensure {@code dst} is a direct {@link ByteBuffer} when
7169
* {@link #requiresDirectDstBuffer()} returns {@code true}.
7270
*
7371
* @param options parsed options for this codec invocation
7472
* @param ctx column context
7573
* @param src encoded data, ready for read
76-
* @param dst output buffer, pre-cleared; must be direct when required
74+
* @param dst output buffer; must be direct when required; must have sufficient capacity
7775
*/
7876
void decodeInto(O options, CodecContext ctx, ByteBuffer src, ByteBuffer dst) throws IOException;
7977

0 commit comments

Comments
 (0)