Skip to content

Commit 5ea49a5

Browse files
committed
Add Oracle UPSERT
1 parent ec0383c commit 5ea49a5

File tree

4 files changed

+273
-0
lines changed

4 files changed

+273
-0
lines changed
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Copyright © 2026 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.oracle;
18+
19+
import io.cdap.plugin.db.sink.ETLDBOutputFormat;
20+
21+
/**
22+
* Class that extends {@link ETLDBOutputFormat} to implement the abstract methods
23+
*/
24+
public class OracleETLDBOutputFormat extends ETLDBOutputFormat {
25+
26+
/**
27+
* This method is used to construct the upsert query for Oracle using MERGE statement.
28+
* Example - MERGE INTO my_table target
29+
* USING (SELECT ? AS id, ? AS name, ? AS age FROM dual) source
30+
* ON (target.id = source.id)
31+
* WHEN MATCHED THEN UPDATE SET target.name = source.name, target.age = source.age
32+
* WHEN NOT MATCHED THEN INSERT (id, name, age) VALUES (source.id, source.name, source.age)
33+
* @param table - Name of the table
34+
* @param fieldNames - All the columns of the table
35+
* @param listKeys - The columns used as keys for matching
36+
* @return Upsert query in the form of string
37+
*/
38+
@Override
39+
public String constructUpsertQuery(String table, String[] fieldNames, String[] listKeys) {
40+
if (listKeys == null) {
41+
throw new IllegalArgumentException("Column names to be updated should not be null");
42+
} else if (fieldNames == null) {
43+
throw new IllegalArgumentException("Field names should not be null");
44+
} else {
45+
StringBuilder query = new StringBuilder();
46+
47+
// MERGE INTO target_table target
48+
query.append("MERGE INTO ").append(table).append(" target ");
49+
50+
// USING (SELECT ? AS col1, ? AS col2, ... FROM dual) source
51+
query.append("USING (SELECT ");
52+
for (int i = 0; i < fieldNames.length; ++i) {
53+
query.append("? AS ").append(fieldNames[i]);
54+
if (i != fieldNames.length - 1) {
55+
query.append(", ");
56+
}
57+
}
58+
query.append(" FROM dual) source ");
59+
60+
// ON (target.key1 = source.key1 AND target.key2 = source.key2 ...)
61+
query.append("ON (");
62+
for (int i = 0; i < listKeys.length; ++i) {
63+
query.append("target.").append(listKeys[i]).append(" = source.").append(listKeys[i]);
64+
if (i != listKeys.length - 1) {
65+
query.append(" AND ");
66+
}
67+
}
68+
query.append(") ");
69+
70+
// WHEN MATCHED THEN UPDATE SET target.col1 = source.col1, target.col2 = source.col2 ...
71+
// Only update non-key columns
72+
query.append("WHEN MATCHED THEN UPDATE SET ");
73+
boolean firstUpdateColumn = true;
74+
for (int i = 0; i < fieldNames.length; ++i) {
75+
// Skip key columns in the UPDATE SET clause
76+
boolean isKeyColumn = false;
77+
for (String listKey : listKeys) {
78+
if (listKey.equals(fieldNames[i])) {
79+
isKeyColumn = true;
80+
break;
81+
}
82+
}
83+
if (!isKeyColumn) {
84+
if (!firstUpdateColumn) {
85+
query.append(", ");
86+
}
87+
query.append("target.").append(fieldNames[i]).append(" = source.").append(fieldNames[i]);
88+
firstUpdateColumn = false;
89+
}
90+
}
91+
92+
// WHEN NOT MATCHED THEN INSERT (col1, col2, ...) VALUES (source.col1, source.col2, ...)
93+
query.append(" WHEN NOT MATCHED THEN INSERT (");
94+
for (int i = 0; i < fieldNames.length; ++i) {
95+
query.append(fieldNames[i]);
96+
if (i != fieldNames.length - 1) {
97+
query.append(", ");
98+
}
99+
}
100+
query.append(") VALUES (");
101+
for (int i = 0; i < fieldNames.length; ++i) {
102+
query.append("source.").append(fieldNames[i]);
103+
if (i != fieldNames.length - 1) {
104+
query.append(", ");
105+
}
106+
}
107+
query.append(")");
108+
109+
return query.toString();
110+
}
111+
}
112+
}

oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSink.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.cdap.cdap.api.annotation.MetadataProperty;
2424
import io.cdap.cdap.api.annotation.Name;
2525
import io.cdap.cdap.api.annotation.Plugin;
26+
import io.cdap.cdap.api.data.batch.Output;
2627
import io.cdap.cdap.api.data.format.StructuredRecord;
2728
import io.cdap.cdap.etl.api.FailureCollector;
2829
import io.cdap.cdap.etl.api.batch.BatchSink;
@@ -31,6 +32,7 @@
3132
import io.cdap.plugin.common.Asset;
3233
import io.cdap.plugin.common.ConfigUtil;
3334
import io.cdap.plugin.common.LineageRecorder;
35+
import io.cdap.plugin.common.batch.sink.SinkOutputFormatProvider;
3436
import io.cdap.plugin.common.db.DBErrorDetailsProvider;
3537
import io.cdap.plugin.db.DBRecord;
3638
import io.cdap.plugin.db.SchemaReader;
@@ -72,6 +74,13 @@ protected FieldsValidator getFieldsValidator() {
7274
protected SchemaReader getSchemaReader() {
7375
return new OracleSinkSchemaReader();
7476
}
77+
78+
@Override
79+
protected void addOutputContext(BatchSinkContext context) {
80+
context.addOutput(Output.of(oracleSinkConfig.getReferenceName(),
81+
new SinkOutputFormatProvider(OracleETLDBOutputFormat.class, getConfiguration())));
82+
}
83+
7584
@Override
7685
protected LineageRecorder getLineageRecorder(BatchSinkContext context) {
7786
String fqn = DBUtils.constructFQN("oracle",
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
* Copyright © 2026 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.oracle;
18+
19+
import org.junit.Assert;
20+
import org.junit.Test;
21+
22+
public class OracleETLDBOutputFormatTest {
23+
24+
private final OracleETLDBOutputFormat outputFormat = new OracleETLDBOutputFormat();
25+
26+
@Test
27+
public void testConstructUpsertQueryBasic() {
28+
String[] fieldNames = {"id", "name", "age"};
29+
String[] listKeys = {"id"};
30+
String table = "my_table";
31+
32+
String result = outputFormat.constructUpsertQuery(table, fieldNames, listKeys);
33+
34+
String expected = "MERGE INTO my_table target " +
35+
"USING (SELECT ? AS id, ? AS name, ? AS age FROM dual) source " +
36+
"ON (target.id = source.id) " +
37+
"WHEN MATCHED THEN UPDATE SET target.name = source.name, target.age = source.age " +
38+
"WHEN NOT MATCHED THEN INSERT (id, name, age) VALUES (source.id, source.name, source.age)";
39+
40+
Assert.assertEquals(expected, result);
41+
}
42+
43+
@Test
44+
public void testConstructUpsertQueryMultipleKeys() {
45+
String[] fieldNames = {"id", "code", "name", "value"};
46+
String[] listKeys = {"id", "code"};
47+
String table = "composite_key_table";
48+
49+
String result = outputFormat.constructUpsertQuery(table, fieldNames, listKeys);
50+
51+
String expected = "MERGE INTO composite_key_table target "
52+
+ "USING (SELECT ? AS id, ? AS code, ? AS name, ? AS value FROM dual) source "
53+
+ "ON (target.id = source.id AND target.code = source.code) "
54+
+ "WHEN MATCHED THEN UPDATE SET target.name = source.name, target.value = source.value "
55+
+ "WHEN NOT MATCHED THEN INSERT (id, code, name, value) VALUES (source.id, source.code, source.name, source"
56+
+ ".value)";
57+
58+
Assert.assertEquals(expected, result);
59+
}
60+
61+
@Test
62+
public void testConstructUpsertQuerySingleField() {
63+
String[] fieldNames = {"id", "name"};
64+
String[] listKeys = {"id"};
65+
String table = "single_field_update_table";
66+
67+
String result = outputFormat.constructUpsertQuery(table, fieldNames, listKeys);
68+
69+
String expected = "MERGE INTO single_field_update_table target " +
70+
"USING (SELECT ? AS id, ? AS name FROM dual) source " +
71+
"ON (target.id = source.id) " +
72+
"WHEN MATCHED THEN UPDATE SET target.name = source.name " +
73+
"WHEN NOT MATCHED THEN INSERT (id, name) VALUES (source.id, source.name)";
74+
75+
Assert.assertEquals(expected, result);
76+
}
77+
78+
@Test(expected = IllegalArgumentException.class)
79+
public void testConstructUpsertQueryNullListKeys() {
80+
String[] fieldNames = {"id", "name", "age"};
81+
String table = "my_table";
82+
83+
outputFormat.constructUpsertQuery(table, fieldNames, null);
84+
}
85+
86+
@Test(expected = IllegalArgumentException.class)
87+
public void testConstructUpsertQueryNullFieldNames() {
88+
String[] listKeys = {"id"};
89+
String table = "my_table";
90+
91+
outputFormat.constructUpsertQuery(table, null, listKeys);
92+
}
93+
94+
@Test
95+
public void testConstructUpsertQueryAllFieldsAreKeys() {
96+
String[] fieldNames = {"id", "code"};
97+
String[] listKeys = {"id", "code"};
98+
String table = "all_keys_table";
99+
100+
String result = outputFormat.constructUpsertQuery(table, fieldNames, listKeys);
101+
102+
// When all fields are keys, the UPDATE SET clause will be empty after "SET "
103+
// Note: There's an extra space before "WHEN NOT MATCHED" due to implementation
104+
String expected = "MERGE INTO all_keys_table target " +
105+
"USING (SELECT ? AS id, ? AS code FROM dual) source " +
106+
"ON (target.id = source.id AND target.code = source.code) " +
107+
"WHEN MATCHED THEN UPDATE SET " +
108+
"WHEN NOT MATCHED THEN INSERT (id, code) VALUES (source.id, source.code)";
109+
110+
Assert.assertEquals(expected, result);
111+
}
112+
113+
@Test
114+
public void testConstructUpsertQueryWithSpecialTableName() {
115+
String[] fieldNames = {"id", "name"};
116+
String[] listKeys = {"id"};
117+
String table = "SCHEMA.MY_TABLE";
118+
119+
String result = outputFormat.constructUpsertQuery(table, fieldNames, listKeys);
120+
121+
String expected = "MERGE INTO SCHEMA.MY_TABLE target " +
122+
"USING (SELECT ? AS id, ? AS name FROM dual) source " +
123+
"ON (target.id = source.id) " +
124+
"WHEN MATCHED THEN UPDATE SET target.name = source.name " +
125+
"WHEN NOT MATCHED THEN INSERT (id, name) VALUES (source.id, source.name)";
126+
127+
Assert.assertEquals(expected, result);
128+
}
129+
}

oracle-plugin/widgets/Oracle-batchsink.json

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,29 @@
192192
"label": "Schema Name",
193193
"name": "dbSchemaName"
194194
},
195+
{
196+
"widget-type": "radio-group",
197+
"label": "Operation Name",
198+
"name": "operationName",
199+
"widget-attributes": {
200+
"default": "insert",
201+
"layout": "inline",
202+
"options": [
203+
{
204+
"id": "insert",
205+
"label": "INSERT"
206+
},
207+
{
208+
"id": "update",
209+
"label": "UPDATE"
210+
},
211+
{
212+
"id": "upsert",
213+
"label": "UPSERT"
214+
}
215+
]
216+
}
217+
},
195218
{
196219
"widget-type": "hidden",
197220
"label": "Operation Name",

0 commit comments

Comments
 (0)