Commit 784c483

Fix Parquet LIST/MAP wrapper extraction in native and Avro readers
The native (ParquetNativeRecordReader) and Avro-backed (ParquetAvroRecordReader) Parquet readers were leaking the LIST/MAP wrapper structs into Pinot rows, so a column of array<string> came back as [{"element":"abc"}, {"element":"xyz"}] instead of ["abc", "xyz"], and a map<string,string> came back as {"key_value": [{"key":"k","value":"v"}]} instead of {"k":"v"}. The broken shape propagated into segment generation, multi-value scans, and JSON paths.

Approach: align with Apache Arrow / Parquet LogicalTypes spec
-------------------------------------------------------------

Both readers now follow Apache Arrow's Parquet behavior — wrapper detection is driven by the Parquet LogicalType annotations and the spec's backward-compat rules, never by guessing from value shape or field names.

Avro reader: set `parquet.avro.add-list-element-records=false` in the Hadoop config so parquet-avro flattens the standard 3-level LIST encoding directly to an Avro `array<elem-type>`. With this off, there is no LIST wrapper to strip on the Pinot side — user-defined records like `array<record<UserTag, [element]>>` round-trip cleanly because the file's Avro schema (when present in metadata) is honored as-is, and hand-authored Parquet `LIST<T>` surfaces as flat values without the wrapper artifact. The extractor reduces to plain delegation plus the existing INT96 promotion.

Native reader (ParquetNativeRecordExtractor): apply the Parquet LogicalTypes backward-compat rules in extractList:

1. Repeated primitive: the primitive IS the element (no wrapper).
2. Repeated multi-field group: the group IS the element.
3. Repeated single-field group named `array` or `<list>_tuple`: the group IS the element (legacy convention).
4. Otherwise (single-field group, any other name): the inner field IS the element — strip the wrapper.

Also hoists isListElementWrapper out of the per-row loop and resolves key/value field indices once for MAP entries.

Documents that Parquet does NOT guarantee MAP read order; users wanting a stable order should use LIST<STRUCT<key, value>> instead. Behavior matches Apache Arrow / parquet-cpp / parquet-avro (with add-list-element-records=false) and the Parquet LogicalTypes spec, so the same Parquet bytes produce the same logical rows across readers.

Tests
-----

ParquetCollectionRecordReaderTest covers:

- Hand-authored Parquet schemas through both readers.
- Avro-schema-written Parquet files through both readers.
- A checked-in golden Parquet fixture (collection-reader-fixture.parquet) with primitive types, DECIMAL/DATE/TIMESTAMP logical types, nested structs, LIST and MAP of scalars, struct lists, empty collections, and a real struct field named `element`.
- Legacy LIST encodings (single-field non-`element` is flattened per spec rule 4; multi-field group is preserved per rule 2).
- Nullable list elements.
- Nested LIST<LIST<STRING>> through the Avro reader, including null inner element and null inner wrapper.
- A regression test for user-authored `array<record<UserTag, [element: string]>>` confirming the inner records survive untouched (case B).

Fixes #17420
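The row-shape change described above can be sketched with plain java.util collections standing in for Parquet structures (a toy model; the class and method names here are illustrative, not Pinot or Parquet APIs): stripping the `element` wrapper turns `[{"element":"abc"}, {"element":"xyz"}]` into `["abc", "xyz"]`, and flattening `key_value` entries turns the wrapped map into `{"k":"v"}`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the fix: wrapper structs modeled as Maps. Illustrative only,
// not the actual Pinot reader code.
public class WrapperStripSketch {

  // [{"element":"abc"}, {"element":"xyz"}] -> ["abc", "xyz"]
  static List<Object> stripListWrapper(List<? extends Map<String, ?>> wrapped) {
    List<Object> flat = new ArrayList<>();
    for (Map<String, ?> wrapper : wrapped) {
      flat.add(wrapper.get("element"));
    }
    return flat;
  }

  // [{"key":"k","value":"v"}] -> {"k":"v"}
  static Map<String, Object> stripMapWrapper(List<? extends Map<String, ?>> entries) {
    Map<String, Object> flat = new HashMap<>();
    for (Map<String, ?> entry : entries) {
      flat.put(String.valueOf(entry.get("key")), entry.get("value"));
    }
    return flat;
  }

  public static void main(String[] args) {
    List<Map<String, String>> list =
        List.of(Map.of("element", "abc"), Map.of("element", "xyz"));
    System.out.println(stripListWrapper(list));   // [abc, xyz]
    List<Map<String, String>> map = List.of(Map.of("key", "k", "value", "v"));
    System.out.println(stripMapWrapper(map));     // {k=v}
  }
}
```

The real fix makes the readers produce the flat shapes directly, so no such post-processing is needed downstream.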
1 parent 068907e commit 784c483

6 files changed

Lines changed: 851 additions & 15 deletions


pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordExtractor.java

Lines changed: 10 additions & 1 deletion
@@ -25,11 +25,20 @@
 import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
 
 
+/// Extracts Pinot rows from Avro [org.apache.avro.generic.GenericRecord]s materialized by parquet-avro.
+///
+/// The reader sets `parquet.avro.add-list-element-records=false` in the Hadoop configuration, which tells
+/// parquet-avro to flatten the standard Parquet 3-level LIST encoding
+/// (`<list-rep> group <name> (LIST) { repeated group list { <elem-type> element; } }`) directly to an Avro
+/// `array<elem-type>`. This matches Apache Arrow's Parquet reader behavior and means there is no LIST wrapper
+/// to strip on the Pinot side — user-defined records like `array<record<UserTag, [element]>>` round-trip
+/// cleanly because the file's Avro schema is honored as-is, and hand-authored Parquet `LIST<T>` surfaces as
+/// flat values without a wrapper artifact.
 public class ParquetAvroRecordExtractor extends AvroRecordExtractor {
 
   @Override
   public void init(@Nullable Set<String> fields, @Nullable RecordExtractorConfig recordExtractorConfig) {
-    super.init(fields, recordExtractorConfig);
+    super.init(fields, recordExtractorConfig);
   }
 
   @Override

pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java

Lines changed: 92 additions & 14 deletions
@@ -39,9 +39,9 @@
 import org.joda.time.DateTimeConstants;
 
 
-/**
- * ParquetNativeRecordExtractor extract values from Parquet {@link Group}.
- */
+/// Extracts Pinot rows directly from Parquet [Group] objects, using Parquet [LogicalTypeAnnotation]s to drive
+/// LIST and MAP handling per the Parquet LogicalTypes spec backward-compatibility rules. Behavior matches
+/// Apache Arrow's Parquet reader so the same Parquet bytes produce the same logical rows across readers.
 public class ParquetNativeRecordExtractor extends BaseRecordExtractor<Group> {
 
   /**
@@ -161,6 +161,9 @@ private Object extractValue(Group from, int fieldIndex, Type fieldType, int inde
       if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.ListLogicalTypeAnnotation) {
         return extractList(group);
       }
+      if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.MapLogicalTypeAnnotation) {
+        return extractKeyValueMap(group);
+      }
       return extractMap(group);
     }
     return null;
@@ -173,26 +176,101 @@ public static long convertInt96ToLong(byte[] int96Bytes) {
   }
 
   public Object[] extractList(Group group) {
-    int numValues = group.getType().getFieldCount();
-    Object[] array = new Object[numValues];
-    for (int i = 0; i < numValues; i++) {
-      array[i] = extractValue(group, i);
-    }
-    if (numValues == 1 && array[0] == null) {
-      return new Object[0];
-    }
-    if (numValues == 1 && array[0] instanceof Object[]) {
-      return (Object[]) array[0];
+    // Group is annotated with LIST. Per the Parquet LogicalTypes spec it always has exactly one child, the
+    // repeated wrapper. The wrapper's schema (not the row data) decides which encoding we are reading, so we
+    // resolve that once here and dispatch the whole list down a single branch.
+    String parentListName = group.getType().getName();
+    Type repeatedField = group.getType().getType(0);
+    int numValues = group.getFieldRepetitionCount(0);
+    Object[] values = new Object[numValues];
+    if (isListElementWrapper(repeatedField, parentListName)) {
+      // Standard 3-level LIST encoding (modern Parquet writers, parquet-avro, Spark, Arrow, …):
+      //   <list-rep> group <name> (LIST) {
+      //     repeated group list {             // wrapper carries only repetition
+      //       <elem-rep> <elem-type> element;
+      //     }
+      //   }
+      // Per the Parquet LogicalTypes backward-compat rules this also covers any single-field repeated group whose
      // wrapper field is NOT named `array` or `<list>_tuple` (rule 4) — the inner field IS the element, regardless
+      // of its name. Strip the wrapper so each row surfaces the bare element value, matching Apache Arrow /
+      // parquet-avro (with `parquet.avro.add-list-element-records=false`).
+      for (int i = 0; i < numValues; i++) {
+        values[i] = extractValue(group.getGroup(0, i), 0);
+      }
+    } else {
+      // Legacy non-wrapper repeated forms:
+      //   * Repeated primitive: `repeated <elem-type> <name>;` (rule 1) — `repeatedField.isPrimitive()`.
+      //   * Repeated multi-field group: `repeated group <name> { <fields…> };` (rule 2) — the group IS the element.
+      //   * Repeated single-field group named `array` or `<list>_tuple` (rule 3) — the group IS the element,
+      //     preserved as a struct/Map.
+      // In all of these the repeated field is the element, so we extract it directly without unwrapping.
+      for (int i = 0; i < numValues; i++) {
+        values[i] = extractValue(group, 0, repeatedField, i);
+      }
     }
-    return array;
+    return values;
   }
 
   public Map<String, Object> extractMap(Group group) {
+    // Plain Parquet group (no LIST/MAP annotation) — surfaces as a Map keyed by child field name.
+    // Reached for nested struct fields and for groups read with the legacy/un-annotated branch.
     int numValues = group.getType().getFieldCount();
     Map<String, Object> map = Maps.newHashMapWithExpectedSize(numValues);
     for (int i = 0; i < numValues; i++) {
       map.put(group.getType().getType(i).getName(), extractValue(group, i));
     }
     return map;
   }
+
+  private Map<String, Object> extractKeyValueMap(Group group) {
+    // Group is annotated with MAP. Per the Parquet LogicalTypes spec it always has exactly one child — a repeated
+    // group with two fields named "key" and "value":
+    //   <map-repetition> group <name> (MAP) {
+    //     repeated group key_value {
+    //       required <key-type> key;
+    //       <value-repetition> <value-type> value;
+    //     }
+    //   }
+    // The repeated wrapper name (`key_value` / `map`) and field order vary across writers, so we resolve the
+    // key/value field indices from the schema once and reuse them for every entry.
+    //
+    // NOTE: Parquet does NOT guarantee that MAP entries are returned in any particular order on read — neither
+    // sorted nor in insertion order. Writers, page boundaries, and dictionary encodings can all reorder entries.
+    // If you need a stable order, write the data as a LIST of STRUCT<key, value> instead of using the native MAP
+    // logical type. We therefore use a plain HashMap here and make no ordering promise to downstream consumers.
+    int numValues = group.getFieldRepetitionCount(0);
+    if (numValues == 0) {
+      return Map.of();
+    }
+    GroupType keyValueType = group.getType().getType(0).asGroupType();
+    int keyIndex = keyValueType.getFieldIndex("key");
+    int valueIndex = keyValueType.getFieldIndex("value");
+    Map<String, Object> map = Maps.newHashMapWithExpectedSize(numValues);
+    for (int i = 0; i < numValues; i++) {
+      Group keyValueGroup = group.getGroup(0, i);
+      Object key = extractValue(keyValueGroup, keyIndex);
+      Object value = extractValue(keyValueGroup, valueIndex);
+      map.put(key.toString(), value);
+    }
+    return map;
+  }
+
+  private boolean isListElementWrapper(Type repeatedField, String parentListName) {
+    // Implements the Parquet LogicalTypes spec backward-compatibility rules for LIST element resolution:
+    //   1. Repeated primitive: the primitive IS the element (no wrapper).
+    //   2. Repeated multi-field group: the group IS the element (no wrapper).
+    //   3. Repeated single-field group named `array` or `<list>_tuple`: the group IS the element (no wrapper).
+    //   4. Otherwise (single-field group, any other name): the inner field IS the element (wrapper present).
+    // This mirrors how Apache Arrow / parquet-cpp / parquet-avro (with add-list-element-records=false) interpret
+    // LIST encodings, so the same Parquet bytes produce the same logical rows across readers.
+    if (repeatedField.isPrimitive()) {
+      return false;
+    }
+    GroupType repeatedGroup = repeatedField.asGroupType();
+    if (repeatedGroup.getFieldCount() != 1) {
+      return false;
+    }
+    String repeatedFieldName = repeatedField.getName();
+    return !"array".equals(repeatedFieldName) && !(parentListName + "_tuple").equals(repeatedFieldName);
+  }
 }
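The four backward-compat rules implemented by isListElementWrapper can be restated over plain values, so the rule table is easy to check in isolation (a sketch: the boolean flag, field count, and name strings are stand-ins for what the Parquet Type API would report, not real Parquet calls).

```java
// Standalone restatement of the LIST backward-compat decision from
// isListElementWrapper. Illustrative sketch, not the Pinot class itself.
public class ListRuleSketch {

  /** Returns true when the repeated field is a wrapper to strip (rule 4). */
  static boolean isElementWrapper(boolean repeatedIsPrimitive, int repeatedFieldCount,
      String repeatedFieldName, String parentListName) {
    if (repeatedIsPrimitive) {
      return false;                 // rule 1: the primitive IS the element
    }
    if (repeatedFieldCount != 1) {
      return false;                 // rule 2: multi-field group IS the element
    }
    if ("array".equals(repeatedFieldName)
        || (parentListName + "_tuple").equals(repeatedFieldName)) {
      return false;                 // rule 3: legacy naming, group IS the element
    }
    return true;                    // rule 4: single-field wrapper, strip it
  }

  public static void main(String[] args) {
    // Standard 3-level encoding: single-field repeated group `list` -> wrapper.
    System.out.println(isElementWrapper(false, 1, "list", "tags"));        // true
    // Legacy parquet-avro writer: repeated group named `array` -> element itself.
    System.out.println(isElementWrapper(false, 1, "array", "tags"));       // false
    // Legacy thrift-style writer: `tags_tuple` -> element itself.
    System.out.println(isElementWrapper(false, 1, "tags_tuple", "tags"));  // false
    // Repeated primitive -> element itself.
    System.out.println(isElementWrapper(true, 1, "tags", "tags"));         // false
  }
}
```

Note that rule 4 fires regardless of the inner field's name, which is why a real struct field named `element` (covered by the test fixture) is handled correctly: the decision looks only at the wrapper's shape and name.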

pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java

Lines changed: 7 additions & 0 deletions
@@ -92,6 +92,13 @@ public static Configuration getParquetHadoopConfiguration() {
     conf.set("fs.defaultFS", DEFAULT_FS);
     // To read Int96 as bytes.
     conf.set(AvroReadSupport.READ_INT96_AS_FIXED, "true");
+    // Tell parquet-avro NOT to surface the 3-level Parquet LIST encoding's repeated wrapper as an Avro record.
+    // With the default (true), a Parquet `LIST<STRING>` reads back as Avro `array<record<element: string>>` and
+    // we have to strip that wrapper ourselves; with this off, parquet-avro materializes the Avro schema flat
+    // (`array<string>`), matching Apache Arrow's Parquet reader behavior. This eliminates the wrapper-stripping
+    // ambiguity entirely — real user records (e.g. `array<record<UserTag, [element: string]>>` written from an
+    // Avro source) survive untouched because their Avro schema in the file metadata is honored as-is.
+    conf.set("parquet.avro.add-list-element-records", "false");
     conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
     return conf;
   }
