Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,20 @@
import org.apache.pinot.spi.data.readers.RecordExtractorConfig;


/// Extracts Pinot rows from Avro [org.apache.avro.generic.GenericRecord]s materialized by parquet-avro.
///
/// The reader sets `parquet.avro.add-list-element-records=false` in the Hadoop configuration, which tells
/// parquet-avro to flatten the standard Parquet 3-level LIST encoding
/// (`<list-rep> group <name> (LIST) { repeated group list { <elem-type> element; } }`) directly to an Avro
/// `array<elem-type>`. This matches Apache Arrow's Parquet reader behavior and means there is no LIST wrapper
/// to strip on the Pinot side — user-defined records like `array<record<UserTag, [element]>>` round-trip
/// cleanly because the file's Avro schema is honored as-is, and hand-authored Parquet `LIST<T>` surfaces as
/// flat values without a wrapper artifact.
public class ParquetAvroRecordExtractor extends AvroRecordExtractor {

@Override
public void init(@Nullable Set<String> fields, @Nullable RecordExtractorConfig recordExtractorConfig) {
  // Delegate straight to AvroRecordExtractor — parquet-avro materializes rows as Avro GenericRecords,
  // so this extractor needs no state of its own. A duplicated super.init(...) call (a diff-merge
  // artifact) was removed: initializing the parent twice is at best redundant work and at worst
  // double-applies any stateful setup the parent performs.
  super.init(fields, recordExtractorConfig);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@
import org.joda.time.DateTimeConstants;


/**
* ParquetNativeRecordExtractor extract values from Parquet {@link Group}.
*/
/// Extracts Pinot rows directly from Parquet [Group] objects, using Parquet [LogicalTypeAnnotation]s to drive
/// LIST and MAP handling per the Parquet LogicalTypes spec backward-compatibility rules. Behavior matches
/// Apache Arrow's Parquet reader so the same Parquet bytes produce the same logical rows across readers.
public class ParquetNativeRecordExtractor extends BaseRecordExtractor<Group> {

/**
Expand Down Expand Up @@ -161,7 +161,10 @@ private Object extractValue(Group from, int fieldIndex, Type fieldType, int inde
if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.ListLogicalTypeAnnotation) {
return extractList(group);
}
return extractMap(group);
if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.MapLogicalTypeAnnotation) {
return extractKeyValueMap(group);
}
return extractStruct(group);
}
return null;
}
Expand All @@ -173,26 +176,101 @@ public static long convertInt96ToLong(byte[] int96Bytes) {
}

public Object[] extractList(Group group) {
int numValues = group.getType().getFieldCount();
Object[] array = new Object[numValues];
for (int i = 0; i < numValues; i++) {
array[i] = extractValue(group, i);
}
if (numValues == 1 && array[0] == null) {
Comment thread
xiangfu0 marked this conversation as resolved.
return new Object[0];
}
if (numValues == 1 && array[0] instanceof Object[]) {
return (Object[]) array[0];
// Group is annotated with LIST. Per the Parquet LogicalTypes spec it always has exactly one child, the
// repeated wrapper. The wrapper's schema (not the row data) decides which encoding we are reading, so we
// resolve that once here and dispatch the whole list down a single branch.
String parentListName = group.getType().getName();
Type repeatedField = group.getType().getType(0);
int numValues = group.getFieldRepetitionCount(0);
Object[] values = new Object[numValues];
if (isStandardListWrapper(repeatedField, parentListName)) {
// Standard 3-level LIST encoding (modern Parquet writers, parquet-avro, Spark, Arrow, …):
// <list-rep> group <name> (LIST) {
// repeated group list { // wrapper carries only repetition
// <elem-rep> <elem-type> element;
// }
// }
// Per the Parquet LogicalTypes backward-compat rules this also covers any single-field repeated group whose
// wrapper field is NOT named `array` or `<list>_tuple` (rule 4) — the inner field IS the element, regardless
// of its name. Strip the wrapper so each row surfaces the bare element value, matching Apache Arrow /
// parquet-avro (with `parquet.avro.add-list-element-records=false`).
for (int i = 0; i < numValues; i++) {
values[i] = extractValue(group.getGroup(0, i), 0);
}
} else {
// Legacy non-wrapper repeated forms:
// * Repeated primitive: `repeated <elem-type> <name>;` (rule 1) — `repeatedField.isPrimitive()`.
// * Repeated multi-field group: `repeated group <name> { <fields…> };` (rule 2) — the group IS the element.
// * Repeated single-field group named `array` or `<list>_tuple` (rule 3) — the group IS the element,
// preserved as a struct/Map.
// In all of these the repeated field is the element, so we extract it directly without unwrapping.
for (int i = 0; i < numValues; i++) {
values[i] = extractValue(group, 0, repeatedField, i);
}
}
return array;
return values;
}

public Map<String, Object> extractMap(Group group) {
public Map<String, Object> extractStruct(Group group) {
// Plain Parquet group (no LIST/MAP annotation) — surfaces as a Map keyed by child field name. Reached for
// nested struct fields and for groups read with the legacy/un-annotated branch.
int numValues = group.getType().getFieldCount();
Map<String, Object> map = Maps.newHashMapWithExpectedSize(numValues);
for (int i = 0; i < numValues; i++) {
map.put(group.getType().getType(i).getName(), extractValue(group, i));
}
return map;
}

private Map<String, Object> extractKeyValueMap(Group group) {
  // Extracts a MAP-annotated Parquet group into a String-keyed Map.
  //
  // Per the Parquet LogicalTypes spec the group has exactly one child — a repeated group with two
  // fields named "key" and "value":
  //   <map-repetition> group <name> (MAP) {
  //     repeated group key_value {
  //       required <key-type> key;
  //       <value-repetition> <value-type> value;
  //     }
  //   }
  // The repeated wrapper name (`key_value` / `map`) and the field order vary across writers, so the
  // key/value indices are resolved from the schema once and reused for every entry.
  //
  // NOTE: Parquet does NOT guarantee any particular MAP entry order on read — neither sorted nor
  // insertion order. Writers, page boundaries, and dictionary encodings can all reorder entries; a
  // stable order requires writing a LIST of STRUCT<key, value> instead. A plain HashMap is therefore
  // used here and no ordering promise is made to downstream consumers.
  int entryCount = group.getFieldRepetitionCount(0);
  if (entryCount == 0) {
    return Map.of();
  }
  GroupType entrySchema = group.getType().getType(0).asGroupType();
  int keyFieldIndex = entrySchema.getFieldIndex("key");
  int valueFieldIndex = entrySchema.getFieldIndex("value");
  Map<String, Object> result = Maps.newHashMapWithExpectedSize(entryCount);
  for (int entry = 0; entry < entryCount; entry++) {
    Group keyValue = group.getGroup(0, entry);
    Object entryKey = extractValue(keyValue, keyFieldIndex);
    Object entryValue = extractValue(keyValue, valueFieldIndex);
    result.put(entryKey.toString(), entryValue);
  }
  return result;
}

private boolean isStandardListWrapper(Type repeatedField, String parentListName) {
  // Implements the Parquet LogicalTypes spec backward-compatibility rules for LIST element resolution:
  //   1. Repeated primitive                                  -> the primitive IS the element (no wrapper).
  //   2. Repeated multi-field group                          -> the group IS the element (no wrapper).
  //   3. Repeated single-field group named `array`
  //      or `<list>_tuple`                                   -> the group IS the element (no wrapper).
  //   4. Any other single-field group                        -> the inner field IS the element (wrapper).
  // Mirrors how Apache Arrow / parquet-cpp / parquet-avro (with add-list-element-records=false)
  // interpret LIST encodings, so the same Parquet bytes produce the same logical rows across readers.
  if (repeatedField.isPrimitive() || repeatedField.asGroupType().getFieldCount() != 1) {
    // Rules 1 and 2: the repeated field itself is the element.
    return false;
  }
  String wrapperName = repeatedField.getName();
  if ("array".equals(wrapperName)) {
    // Rule 3, first naming form.
    return false;
  }
  // Rule 3, second naming form; anything else is the standard wrapper (rule 4).
  return !wrapperName.equals(parentListName + "_tuple");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@ public static Configuration getParquetHadoopConfiguration() {
conf.set("fs.defaultFS", DEFAULT_FS);
// To read Int96 as bytes.
conf.set(AvroReadSupport.READ_INT96_AS_FIXED, "true");
// Tell parquet-avro NOT to surface the 3-level Parquet LIST encoding's repeated wrapper as an Avro record.
// With the default (true), a Parquet `LIST<STRING>` reads back as Avro `array<record<element: string>>` and
// we have to strip that wrapper ourselves; with this off, parquet-avro materializes the Avro schema flat
// (`array<string>`), matching Apache Arrow's Parquet reader behavior. This eliminates the wrapper-stripping
// ambiguity entirely — real user records (e.g. `array<record<UserTag, [element: string]>>` written from an
// Avro source) survive untouched because their Avro schema in the file metadata is honored as-is.
conf.set("parquet.avro.add-list-element-records", "false");
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
return conf;
}
Expand Down
Loading
Loading