Skip to content

Commit 99f4698

Browse files
committed
Fix Parquet collection wrapper extraction
1 parent 2a84fc7 commit 99f4698

4 files changed

Lines changed: 394 additions & 2 deletions

File tree

pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordExtractor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@ protected Object transformValue(Object value, Schema.Field field) {
3737
return handleDeprecatedTypes(convert(value), field);
3838
}
3939

40+
@Override
41+
protected Object[] convertMultiValue(Object value) {
42+
return ParquetRecordExtractorUtils.unwrapListElementMaps(super.convertMultiValue(value));
43+
}
44+
4045
Object handleDeprecatedTypes(Object value, Schema.Field field) {
4146
Schema.Type avroColumnType = field.schema().getType();
4247
if (avroColumnType == Schema.Type.UNION) {

pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,10 @@ private Object extractValue(Group from, int fieldIndex, Type fieldType, int inde
161161
if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.ListLogicalTypeAnnotation) {
162162
return extractList(group);
163163
}
164+
if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.MapLogicalTypeAnnotation
165+
|| logicalTypeAnnotation instanceof LogicalTypeAnnotation.MapKeyValueTypeAnnotation) {
166+
return ParquetRecordExtractorUtils.unwrapMapKeyValues(extractMap(group));
167+
}
164168
return extractMap(group);
165169
}
166170
return null;
@@ -182,9 +186,9 @@ public Object[] extractList(Group group) {
182186
return new Object[0];
183187
}
184188
if (numValues == 1 && array[0] instanceof Object[]) {
185-
return (Object[]) array[0];
189+
return ParquetRecordExtractorUtils.unwrapListElementMaps((Object[]) array[0]);
186190
}
187-
return array;
191+
return ParquetRecordExtractorUtils.unwrapListElementMaps(array);
188192
}
189193

190194
public Map<String, Object> extractMap(Group group) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pinot.plugin.inputformat.parquet;
20+
21+
import java.util.Collection;
22+
import java.util.LinkedHashMap;
23+
import java.util.Map;
24+
25+
26+
/**
 * Shared helpers for normalizing Parquet collection wrapper records while extracting Pinot rows: list items
 * wrapped in single-field {@code element} maps and maps wrapped in a single {@code key_value} entries field.
 *
 * <p>This class is stateless and thread-safe.
 */
final class ParquetRecordExtractorUtils {
  private static final String LIST_ELEMENT_FIELD_NAME = "element";
  private static final String MAP_ENTRIES_FIELD_NAME = "key_value";
  private static final String MAP_KEY_FIELD_NAME = "key";
  private static final String MAP_VALUE_FIELD_NAME = "value";

  private ParquetRecordExtractorUtils() {
  }

  /**
   * Replaces list items of the form {@code {"element": x}} with {@code x}.
   *
   * <p>The array is unwrapped only when every non-null item is a map whose single key is {@code "element"} and at
   * least one such item exists. In every other case (including {@code null}, empty, or all-null input) the input
   * array is returned unchanged, so callers may compare by reference to detect whether unwrapping happened.
   *
   * @param values extracted list values, may be {@code null}
   * @return a new array holding the unwrapped items (null items are preserved), or {@code values} itself when it
   *         does not consist of element wrappers
   */
  static Object[] unwrapListElementMaps(Object[] values) {
    // Guard against a null array so callers that produce no value do not fail with a NullPointerException.
    if (values == null || values.length == 0) {
      return values;
    }

    // First pass: validate that every non-null item is an element wrapper before touching anything.
    boolean hasElementMap = false;
    for (Object value : values) {
      if (value == null) {
        continue;
      }
      if (!(value instanceof Map)) {
        return values;
      }
      Map<?, ?> map = (Map<?, ?>) value;
      if (map.size() != 1 || !map.containsKey(LIST_ELEMENT_FIELD_NAME)) {
        return values;
      }
      hasElementMap = true;
    }
    if (!hasElementMap) {
      // Only nulls were seen; there is nothing to unwrap.
      return values;
    }

    // Second pass: the casts are safe because the validation above accepted every non-null item.
    Object[] unwrappedValues = new Object[values.length];
    for (int i = 0; i < values.length; i++) {
      Object value = values[i];
      unwrappedValues[i] = value == null ? null : ((Map<?, ?>) value).get(LIST_ELEMENT_FIELD_NAME);
    }
    return unwrappedValues;
  }

  /**
   * Flattens a map of the form {@code {"key_value": [{"key": k, "value": v}, ...]}} into {@code {k: v, ...}}.
   *
   * <p>Keys are converted with {@code toString()} and insertion order of the entries is preserved; when two entries
   * produce the same key string the later entry wins. The input map is returned unchanged when it is {@code null},
   * when it is not a single-field {@code key_value} wrapper, or when any entry is not a map containing both
   * {@code key} and {@code value} with a non-null key. A wrapper whose entries are {@code null} or empty yields a
   * new empty map.
   *
   * @param map extracted map value, may be {@code null}
   * @return the flattened map, or {@code map} itself when it does not match the wrapper shape
   */
  static Map<String, Object> unwrapMapKeyValues(Map<String, Object> map) {
    // Guard against a null map so callers that produce no value do not fail with a NullPointerException.
    if (map == null || map.size() != 1 || !map.containsKey(MAP_ENTRIES_FIELD_NAME)) {
      return map;
    }

    Object entries = map.get(MAP_ENTRIES_FIELD_NAME);
    Object[] entryArray = toEntryArray(entries);
    if (entryArray == null || entryArray.length == 0) {
      return new LinkedHashMap<>();
    }

    Map<String, Object> unwrappedMap = new LinkedHashMap<>();
    for (Object entry : entryArray) {
      // Any malformed entry aborts the unwrap and falls back to the original wrapper map.
      if (!(entry instanceof Map)) {
        return map;
      }
      Map<?, ?> entryMap = (Map<?, ?>) entry;
      if (!entryMap.containsKey(MAP_KEY_FIELD_NAME) || !entryMap.containsKey(MAP_VALUE_FIELD_NAME)) {
        return map;
      }
      Object key = entryMap.get(MAP_KEY_FIELD_NAME);
      if (key == null) {
        return map;
      }
      unwrappedMap.put(key.toString(), entryMap.get(MAP_VALUE_FIELD_NAME));
    }
    return unwrappedMap;
  }

  /**
   * Normalizes the entries payload into an array: arrays are returned as-is, collections are copied into an array,
   * any other non-null object is treated as a single entry, and {@code null} stays {@code null}.
   */
  private static Object[] toEntryArray(Object entries) {
    if (entries instanceof Object[]) {
      return (Object[]) entries;
    }
    if (entries instanceof Collection) {
      return ((Collection<?>) entries).toArray();
    }
    if (entries != null) {
      return new Object[]{entries};
    }
    return null;
  }
}

0 commit comments

Comments
 (0)