Skip to content

Commit 35b567d

Browse files
committed
Add Oracle UPSERT
1 parent ec0383c commit 35b567d

File tree

7 files changed

+308
-9
lines changed

7 files changed

+308
-9
lines changed

database-commons/src/main/java/io/cdap/plugin/db/DBRecord.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -314,10 +314,7 @@ protected void upsertOperation(PreparedStatement stmt) throws SQLException {
314314
}
315315

316316
private boolean fillUpdateParams(List<String> updatedKeyList, ColumnType columnType) {
317-
if (operationName.equals(Operation.UPDATE) && updatedKeyList.contains(columnType.getName())) {
318-
return true;
319-
}
320-
return false;
317+
return operationName.equals(Operation.UPDATE) && updatedKeyList.contains(columnType.getName());
321318
}
322319

323320
private Schema getNonNullableSchema(Schema.Field field) {
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Copyright © 2026 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.oracle;
18+
19+
import io.cdap.plugin.db.sink.ETLDBOutputFormat;
20+
21+
/**
22+
* Class that extends {@link ETLDBOutputFormat} to implement the abstract methods
23+
*/
24+
public class OracleETLDBOutputFormat extends ETLDBOutputFormat {
25+
26+
/**
27+
* This method is used to construct the upsert query for Oracle using MERGE statement.
28+
* Example - MERGE INTO my_table target
29+
* USING (SELECT ? AS id, ? AS name, ? AS age FROM dual) source
30+
* ON (target.id = source.id)
31+
* WHEN MATCHED THEN UPDATE SET target.name = source.name, target.age = source.age
32+
* WHEN NOT MATCHED THEN INSERT (id, name, age) VALUES (source.id, source.name, source.age)
33+
* @param table - Name of the table
34+
* @param fieldNames - All the columns of the table
35+
* @param listKeys - The columns used as keys for matching
36+
* @return Upsert query in the form of string
37+
*/
38+
@Override
39+
public String constructUpsertQuery(String table, String[] fieldNames, String[] listKeys) {
40+
if (listKeys == null) {
41+
throw new IllegalArgumentException(
42+
"'Relation Table Key' must be specified for upsert operations. " +
43+
"Please provide the list of key columns used to match records in the target table.");
44+
} else if (fieldNames == null) {
45+
throw new IllegalArgumentException(
46+
"'Field Names' must be specified for upsert operations. " +
47+
"Please provide the list of columns to be written to the target table.");
48+
} else {
49+
StringBuilder query = new StringBuilder();
50+
51+
// MERGE INTO target_table target
52+
query.append("MERGE INTO ").append(table).append(" target ");
53+
54+
// USING (SELECT ? AS col1, ? AS col2, ... FROM dual) source
55+
query.append("USING (SELECT ");
56+
for (int i = 0; i < fieldNames.length; ++i) {
57+
query.append("? AS ").append(fieldNames[i]);
58+
if (i != fieldNames.length - 1) {
59+
query.append(", ");
60+
}
61+
}
62+
query.append(" FROM dual) source ");
63+
64+
// ON (target.key1 = source.key1 AND target.key2 = source.key2 ...)
65+
query.append("ON (");
66+
for (int i = 0; i < listKeys.length; ++i) {
67+
query.append("target.").append(listKeys[i]).append(" = source.").append(listKeys[i]);
68+
if (i != listKeys.length - 1) {
69+
query.append(" AND ");
70+
}
71+
}
72+
query.append(") ");
73+
74+
// WHEN MATCHED THEN UPDATE SET target.col1 = source.col1, target.col2 = source.col2 ...
75+
// Only update non-key columns
76+
query.append("WHEN MATCHED THEN UPDATE SET ");
77+
boolean firstUpdateColumn = true;
78+
for (String fieldName : fieldNames) {
79+
boolean isKeyColumn = false;
80+
for (String listKey : listKeys) {
81+
String listKeyNoQuote = listKey.replace("\"", "");
82+
if (listKeyNoQuote.equals(fieldName)) {
83+
isKeyColumn = true;
84+
break;
85+
}
86+
}
87+
if (!isKeyColumn) {
88+
if (!firstUpdateColumn) {
89+
query.append(", ");
90+
}
91+
query.append("target.").append(fieldName).append(" = source.").append(fieldName);
92+
firstUpdateColumn = false;
93+
}
94+
}
95+
96+
// WHEN NOT MATCHED THEN INSERT (col1, col2, ...) VALUES (source.col1, source.col2, ...)
97+
query.append(" WHEN NOT MATCHED THEN INSERT (");
98+
for (int i = 0; i < fieldNames.length; ++i) {
99+
query.append(fieldNames[i]);
100+
if (i != fieldNames.length - 1) {
101+
query.append(", ");
102+
}
103+
}
104+
query.append(") VALUES (");
105+
for (int i = 0; i < fieldNames.length; ++i) {
106+
query.append("source.").append(fieldNames[i]);
107+
if (i != fieldNames.length - 1) {
108+
query.append(", ");
109+
}
110+
}
111+
query.append(")");
112+
113+
return query.toString();
114+
}
115+
}
116+
117+
@Override
118+
public String constructUpdateQuery(String table, String[] fieldNames, String[] listKeys) {
119+
// Oracle JDBC does not accept a trailing semicolon in prepared statements.
120+
String query = super.constructUpdateQuery(table, fieldNames, listKeys);
121+
if (query.endsWith(";")) {
122+
return query.substring(0, query.length() - 1);
123+
}
124+
return query;
125+
}
126+
}

oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSink.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.cdap.cdap.api.annotation.MetadataProperty;
2424
import io.cdap.cdap.api.annotation.Name;
2525
import io.cdap.cdap.api.annotation.Plugin;
26+
import io.cdap.cdap.api.data.batch.Output;
2627
import io.cdap.cdap.api.data.format.StructuredRecord;
2728
import io.cdap.cdap.etl.api.FailureCollector;
2829
import io.cdap.cdap.etl.api.batch.BatchSink;
@@ -31,6 +32,7 @@
3132
import io.cdap.plugin.common.Asset;
3233
import io.cdap.plugin.common.ConfigUtil;
3334
import io.cdap.plugin.common.LineageRecorder;
35+
import io.cdap.plugin.common.batch.sink.SinkOutputFormatProvider;
3436
import io.cdap.plugin.common.db.DBErrorDetailsProvider;
3537
import io.cdap.plugin.db.DBRecord;
3638
import io.cdap.plugin.db.SchemaReader;
@@ -60,7 +62,8 @@ public OracleSink(OracleSinkConfig oracleSinkConfig) {
6062

6163
@Override
6264
protected DBRecord getDBRecord(StructuredRecord output) {
63-
return new OracleSinkDBRecord(output, columnTypes);
65+
return new OracleSinkDBRecord(output, columnTypes, oracleSinkConfig.getOperationName(),
66+
oracleSinkConfig.getRelationTableKey());
6467
}
6568

6669
@Override
@@ -72,6 +75,13 @@ protected FieldsValidator getFieldsValidator() {
7275
protected SchemaReader getSchemaReader() {
7376
return new OracleSinkSchemaReader();
7477
}
78+
79+
@Override
80+
protected void addOutputContext(BatchSinkContext context) {
81+
context.addOutput(Output.of(oracleSinkConfig.getReferenceName(),
82+
new SinkOutputFormatProvider(OracleETLDBOutputFormat.class, getConfiguration())));
83+
}
84+
7585
@Override
7686
protected LineageRecorder getLineageRecorder(BatchSinkContext context) {
7787
String fqn = DBUtils.constructFQN("oracle",

oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSinkDBRecord.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.cdap.cdap.api.data.format.StructuredRecord;
2020
import io.cdap.cdap.api.data.schema.Schema;
2121
import io.cdap.plugin.db.ColumnType;
22+
import io.cdap.plugin.db.Operation;
2223
import io.cdap.plugin.db.SchemaReader;
2324

2425
import java.sql.PreparedStatement;
@@ -31,9 +32,12 @@
3132
*/
3233
public class OracleSinkDBRecord extends OracleSourceDBRecord {
3334

34-
public OracleSinkDBRecord(StructuredRecord record, List<ColumnType> columnTypes) {
35+
public OracleSinkDBRecord(StructuredRecord record, List<ColumnType> columnTypes, Operation operationName,
36+
String relationTableKey) {
3537
this.record = record;
3638
this.columnTypes = columnTypes;
39+
this.operationName = operationName;
40+
this.relationTableKey = relationTableKey;
3741
}
3842

3943
@Override
@@ -50,4 +54,13 @@ protected void insertOperation(PreparedStatement stmt) throws SQLException {
5054
writeToDB(stmt, field, fieldIndex);
5155
}
5256
}
57+
58+
@Override
59+
protected void upsertOperation(PreparedStatement stmt) throws SQLException {
60+
for (int fieldIndex = 0; fieldIndex < columnTypes.size(); fieldIndex++) {
61+
ColumnType columnType = columnTypes.get(fieldIndex);
62+
Schema.Field field = record.getSchema().getField(columnType.getName());
63+
writeToDB(stmt, field, fieldIndex);
64+
}
65+
}
5366
}

oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceDBRecord.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,8 @@ protected void handleField(ResultSet resultSet, StructuredRecord.Builder recordB
116116
@Override
117117
protected void writeNonNullToDB(PreparedStatement stmt, Schema fieldSchema,
118118
String fieldName, int fieldIndex) throws SQLException {
119-
int sqlType = columnTypes.get(fieldIndex).getType();
120119
int sqlIndex = fieldIndex + 1;
120+
int sqlType = modifiableColumnTypes.get(fieldIndex).getType();
121121

122122
// TIMESTAMP and TIMESTAMPTZ types needs to be handled using the specific oracle types to ensure that the data
123123
// inserted matches with the provided value. As Oracle driver internally alters the values provided
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
* Copyright © 2026 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.oracle;
18+
19+
import org.junit.Assert;
20+
import org.junit.Test;
21+
22+
public class OracleETLDBOutputFormatTest {
23+
24+
private final OracleETLDBOutputFormat outputFormat = new OracleETLDBOutputFormat();
25+
26+
@Test
27+
public void testConstructUpsertQueryBasic() {
28+
String[] fieldNames = {"id", "name", "age"};
29+
String[] listKeys = {"id"};
30+
String table = "my_table";
31+
32+
String result = outputFormat.constructUpsertQuery(table, fieldNames, listKeys);
33+
34+
String expected = "MERGE INTO my_table target " +
35+
"USING (SELECT ? AS id, ? AS name, ? AS age FROM dual) source " +
36+
"ON (target.id = source.id) " +
37+
"WHEN MATCHED THEN UPDATE SET target.name = source.name, target.age = source.age " +
38+
"WHEN NOT MATCHED THEN INSERT (id, name, age) VALUES (source.id, source.name, source.age)";
39+
40+
Assert.assertEquals(expected, result);
41+
}
42+
43+
@Test
44+
public void testConstructUpsertQueryMultipleKeys() {
45+
String[] fieldNames = {"id", "code", "name", "value"};
46+
String[] listKeys = {"id", "code"};
47+
String table = "composite_key_table";
48+
49+
String result = outputFormat.constructUpsertQuery(table, fieldNames, listKeys);
50+
51+
String expected = "MERGE INTO composite_key_table target "
52+
+ "USING (SELECT ? AS id, ? AS code, ? AS name, ? AS value FROM dual) source "
53+
+ "ON (target.id = source.id AND target.code = source.code) "
54+
+ "WHEN MATCHED THEN UPDATE SET target.name = source.name, target.value = source.value "
55+
+ "WHEN NOT MATCHED THEN INSERT (id, code, name, value) VALUES (source.id, source.code, source.name, source"
56+
+ ".value)";
57+
58+
Assert.assertEquals(expected, result);
59+
}
60+
61+
@Test
62+
public void testConstructUpsertQuerySingleField() {
63+
String[] fieldNames = {"id", "name"};
64+
String[] listKeys = {"id"};
65+
String table = "single_field_update_table";
66+
67+
String result = outputFormat.constructUpsertQuery(table, fieldNames, listKeys);
68+
69+
String expected = "MERGE INTO single_field_update_table target " +
70+
"USING (SELECT ? AS id, ? AS name FROM dual) source " +
71+
"ON (target.id = source.id) " +
72+
"WHEN MATCHED THEN UPDATE SET target.name = source.name " +
73+
"WHEN NOT MATCHED THEN INSERT (id, name) VALUES (source.id, source.name)";
74+
75+
Assert.assertEquals(expected, result);
76+
}
77+
78+
@Test(expected = IllegalArgumentException.class)
79+
public void testConstructUpsertQueryNullListKeys() {
80+
String[] fieldNames = {"id", "name", "age"};
81+
String table = "my_table";
82+
83+
outputFormat.constructUpsertQuery(table, fieldNames, null);
84+
}
85+
86+
@Test(expected = IllegalArgumentException.class)
87+
public void testConstructUpsertQueryNullFieldNames() {
88+
String[] listKeys = {"id"};
89+
String table = "my_table";
90+
91+
outputFormat.constructUpsertQuery(table, null, listKeys);
92+
}
93+
94+
@Test
95+
public void testConstructUpsertQueryAllFieldsAreKeys() {
96+
String[] fieldNames = {"id", "code"};
97+
String[] listKeys = {"id", "code"};
98+
String table = "all_keys_table";
99+
100+
String result = outputFormat.constructUpsertQuery(table, fieldNames, listKeys);
101+
102+
// When all fields are keys, the UPDATE SET clause will be empty after "SET "
103+
// Note: There's an extra space before "WHEN NOT MATCHED" due to implementation
104+
String expected = "MERGE INTO all_keys_table target " +
105+
"USING (SELECT ? AS id, ? AS code FROM dual) source " +
106+
"ON (target.id = source.id AND target.code = source.code) " +
107+
"WHEN MATCHED THEN UPDATE SET " +
108+
"WHEN NOT MATCHED THEN INSERT (id, code) VALUES (source.id, source.code)";
109+
110+
Assert.assertEquals(expected, result);
111+
}
112+
113+
@Test
114+
public void testConstructUpsertQueryWithSpecialTableName() {
115+
String[] fieldNames = {"id", "name"};
116+
String[] listKeys = {"id"};
117+
String table = "SCHEMA.MY_TABLE";
118+
119+
String result = outputFormat.constructUpsertQuery(table, fieldNames, listKeys);
120+
121+
String expected = "MERGE INTO SCHEMA.MY_TABLE target " +
122+
"USING (SELECT ? AS id, ? AS name FROM dual) source " +
123+
"ON (target.id = source.id) " +
124+
"WHEN MATCHED THEN UPDATE SET target.name = source.name " +
125+
"WHEN NOT MATCHED THEN INSERT (id, name) VALUES (source.id, source.name)";
126+
127+
Assert.assertEquals(expected, result);
128+
}
129+
}

oracle-plugin/widgets/Oracle-batchsink.json

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,29 @@
192192
"label": "Schema Name",
193193
"name": "dbSchemaName"
194194
},
195+
{
196+
"widget-type": "radio-group",
197+
"label": "Operation Name",
198+
"name": "operationName",
199+
"widget-attributes": {
200+
"default": "insert",
201+
"layout": "inline",
202+
"options": [
203+
{
204+
"id": "insert",
205+
"label": "INSERT"
206+
},
207+
{
208+
"id": "update",
209+
"label": "UPDATE"
210+
},
211+
{
212+
"id": "upsert",
213+
"label": "UPSERT"
214+
}
215+
]
216+
}
217+
},
195218
{
196219
"widget-type": "hidden",
197220
"label": "Operation Name",
@@ -201,9 +224,10 @@
201224
}
202225
},
203226
{
204-
"widget-type": "hidden",
227+
"name": "relationTableKey",
228+
"widget-type": "csv",
205229
"label": "Table Key",
206-
"name": "relationTableKey"
230+
"widget-attributes": {}
207231
}
208232
]
209233
},

0 commit comments

Comments
 (0)