Skip to content

Commit cf239bf

Browse files
committed
NIFI-15681 - Enhance PutElasticsearchJson to support NDJSON, JSON Array, and Single JSON input formats with size-based batching
1 parent 1d2c25f commit cf239bf

8 files changed

Lines changed: 1435 additions & 161 deletions

File tree

nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java

Lines changed: 96 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717

1818
package org.apache.nifi.elasticsearch;
1919

20+
import java.nio.charset.StandardCharsets;
2021
import java.util.List;
2122
import java.util.Map;
23+
import java.util.Objects;
2224

2325
/**
2426
* A POJO that represents an "operation on an index". It should not be confused with just indexing documents, as it
@@ -30,25 +32,102 @@ public class IndexOperationRequest {
3032
private final String type;
3133
private final String id;
3234
private final Map<String, Object> fields;
35+
private final byte[] rawJsonBytes;
3336
private final Operation operation;
3437
private final Map<String, Object> script;
35-
3638
private final boolean scriptedUpsert;
3739
private final Map<String, Object> dynamicTemplates;
3840
private final Map<String, String> headerFields;
3941

40-
public IndexOperationRequest(final String index, final String type, final String id, final Map<String, Object> fields,
41-
final Operation operation, final Map<String, Object> script, final boolean scriptedUpsert,
42-
final Map<String, Object> dynamicTemplates, final Map<String, String> headerFields) {
43-
this.index = index;
44-
this.type = type;
45-
this.id = id;
46-
this.fields = fields;
47-
this.operation = operation;
48-
this.script = script;
49-
this.scriptedUpsert = scriptedUpsert;
50-
this.dynamicTemplates = dynamicTemplates;
51-
this.headerFields = headerFields;
42+
/**
 * Creates a request by copying every value currently held by the given builder.
 * The constructor is private, so instances are only obtainable through
 * {@link Builder#build()}.
 *
 * @param builder the builder whose state is copied into the new request
 */
private IndexOperationRequest(final Builder builder) {
    // Target of the operation.
    this.index = builder.index;
    this.type = builder.type;
    this.id = builder.id;
    this.operation = builder.operation;

    // Document payload: a field map and/or pre-serialized JSON bytes (references are copied, not the contents).
    this.fields = builder.fields;
    this.rawJsonBytes = builder.rawJsonBytes;

    // Scripting options.
    this.script = builder.script;
    this.scriptedUpsert = builder.scriptedUpsert;

    // Additional request metadata.
    this.dynamicTemplates = builder.dynamicTemplates;
    this.headerFields = builder.headerFields;
}
54+
55+
public static Builder builder() {
56+
return new Builder();
57+
}
58+
59+
public static class Builder {
60+
private String index;
61+
private String type;
62+
private String id;
63+
private Map<String, Object> fields;
64+
private byte[] rawJsonBytes;
65+
private Operation operation;
66+
private Map<String, Object> script;
67+
private boolean scriptedUpsert;
68+
private Map<String, Object> dynamicTemplates;
69+
private Map<String, String> headerFields;
70+
71+
public Builder index(final String index) {
72+
this.index = index;
73+
return this;
74+
}
75+
76+
public Builder type(final String type) {
77+
this.type = type;
78+
return this;
79+
}
80+
81+
public Builder id(final String id) {
82+
this.id = id;
83+
return this;
84+
}
85+
86+
public Builder fields(final Map<String, Object> fields) {
87+
this.fields = fields;
88+
return this;
89+
}
90+
91+
public Builder rawJson(final String rawJson) {
92+
this.rawJsonBytes = rawJson != null ? rawJson.getBytes(StandardCharsets.UTF_8) : null;
93+
return this;
94+
}
95+
96+
public Builder rawJsonBytes(final byte[] rawJsonBytes) {
97+
this.rawJsonBytes = rawJsonBytes;
98+
return this;
99+
}
100+
101+
public Builder operation(final Operation operation) {
102+
this.operation = operation;
103+
return this;
104+
}
105+
106+
public Builder script(final Map<String, Object> script) {
107+
this.script = script;
108+
return this;
109+
}
110+
111+
public Builder scriptedUpsert(final boolean scriptedUpsert) {
112+
this.scriptedUpsert = scriptedUpsert;
113+
return this;
114+
}
115+
116+
public Builder dynamicTemplates(final Map<String, Object> dynamicTemplates) {
117+
this.dynamicTemplates = dynamicTemplates;
118+
return this;
119+
}
120+
121+
public Builder headerFields(final Map<String, String> headerFields) {
122+
this.headerFields = headerFields;
123+
return this;
124+
}
125+
126+
public IndexOperationRequest build() {
127+
Objects.requireNonNull(index, "Index required");
128+
Objects.requireNonNull(operation, "Operation required");
129+
return new IndexOperationRequest(this);
130+
}
52131
}
53132

54133
public String getIndex() {
@@ -67,6 +146,10 @@ public Map<String, Object> getFields() {
67146
return fields;
68147
}
69148

149+
public byte[] getRawJsonBytes() {
150+
return rawJsonBytes;
151+
}
152+
70153
public Operation getOperation() {
71154
return operation;
72155
}

nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.http.auth.AuthScope;
2828
import org.apache.http.auth.UsernamePasswordCredentials;
2929
import org.apache.http.client.CredentialsProvider;
30+
import org.apache.http.entity.ByteArrayEntity;
3031
import org.apache.http.entity.ContentType;
3132
import org.apache.http.impl.client.BasicCredentialsProvider;
3233
import org.apache.http.message.BasicHeader;
@@ -68,6 +69,7 @@
6869
import java.io.ByteArrayOutputStream;
6970
import java.io.IOException;
7071
import java.io.InputStream;
72+
import java.io.OutputStream;
7173
import java.net.MalformedURLException;
7274
import java.net.Proxy;
7375
import java.net.URI;
@@ -624,6 +626,19 @@ public IndexOperationResponse add(final IndexOperationRequest operation, final E
624626
return bulk(Collections.singletonList(operation), elasticsearchRequestOptions);
625627
}
626628

629+
/**
630+
* ByteArrayOutputStream subclass that exposes the internal buffer without a defensive copy,
631+
* allowing zero-copy construction of ByteArrayEntity.
632+
*/
633+
private static final class ExposedByteArrayOutputStream extends ByteArrayOutputStream {
634+
byte[] buf() {
635+
return buf;
636+
}
637+
int count() {
638+
return count;
639+
}
640+
}
641+
627642
private String flatten(final String str) {
628643
return str.replaceAll("[\\n\\r]", "\\\\n");
629644
}
@@ -656,13 +671,18 @@ private String buildBulkHeader(final String operation, final String index, final
656671
return flatten(mapper.writeValueAsString(Collections.singletonMap(operation, operationBody)));
657672
}
658673

659-
protected void buildRequest(final IndexOperationRequest request, final StringBuilder builder) throws JsonProcessingException {
674+
protected void buildRequest(final IndexOperationRequest request, final OutputStream out) throws IOException {
660675
final String header = buildBulkHeader(request);
661-
builder.append(header).append("\n");
676+
out.write(header.getBytes(StandardCharsets.UTF_8));
677+
out.write('\n');
662678
switch (request.getOperation()) {
663679
case Index, Create:
664-
final String indexDocument = mapper.writeValueAsString(request.getFields());
665-
builder.append(indexDocument).append("\n");
680+
if (request.getRawJsonBytes() != null) {
681+
out.write(request.getRawJsonBytes());
682+
} else {
683+
mapper.writeValue(out, request.getFields());
684+
}
685+
out.write('\n');
666686
break;
667687
case Update, Upsert:
668688
final Map<String, Object> updateBody = new HashMap<>(2, 1);
@@ -678,9 +698,9 @@ protected void buildRequest(final IndexOperationRequest request, final StringBui
678698
updateBody.put("doc_as_upsert", true);
679699
}
680700
}
681-
682701
final String update = flatten(mapper.writeValueAsString(updateBody)).trim();
683-
builder.append(update).append("\n");
702+
out.write(update.getBytes(StandardCharsets.UTF_8));
703+
out.write('\n');
684704
break;
685705
case Delete:
686706
// nothing to do for Delete operations, it just needs the header
@@ -691,15 +711,15 @@ protected void buildRequest(final IndexOperationRequest request, final StringBui
691711
@Override
692712
public IndexOperationResponse bulk(final List<IndexOperationRequest> operations, final ElasticsearchRequestOptions elasticsearchRequestOptions) {
693713
try {
694-
final StringBuilder payload = new StringBuilder();
714+
final ExposedByteArrayOutputStream payload = new ExposedByteArrayOutputStream();
695715
for (final IndexOperationRequest or : operations) {
696716
buildRequest(or, payload);
697717
}
698718

699719
if (getLogger().isDebugEnabled()) {
700-
getLogger().debug("{}", payload);
720+
getLogger().debug("{}", payload.toString(StandardCharsets.UTF_8));
701721
}
702-
final HttpEntity entity = new NStringEntity(payload.toString(), ContentType.APPLICATION_JSON);
722+
final HttpEntity entity = new ByteArrayEntity(payload.buf(), 0, payload.count(), ContentType.APPLICATION_JSON);
703723
final StopWatch watch = new StopWatch();
704724
watch.start();
705725
final Response response = performRequest("POST", "/_bulk", elasticsearchRequestOptions, entity);

0 commit comments

Comments
 (0)