Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions e2e_test/sink/postgres_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,64 @@ DROP SINK rw_types_to_pg_types;
statement ok
DROP TABLE rw_types_subset_table;

################### Test upsert sink when all sink columns are primary keys

system ok
PGDATABASE=sink_test psql -c "DROP TABLE IF EXISTS pg_pk_only_table"

system ok
PGDATABASE=sink_test psql -c "CREATE TABLE pg_pk_only_table (
user_id BIGINT,
client_id BIGINT,
PRIMARY KEY (user_id, client_id)
)"

system ok
PGDATABASE=sink_test psql -c "INSERT INTO pg_pk_only_table VALUES (1, 10)"

statement ok
CREATE TABLE rw_pk_only_table (
user_id BIGINT,
client_id BIGINT,
PRIMARY KEY (user_id, client_id)
);

statement ok
CREATE SINK rw_pk_only_sink FROM rw_pk_only_table WITH (
connector='postgres',
host='$PGHOST',
port='$PGPORT',
user='$PGUSER',
password='$PGPASSWORD',
database='sink_test',
table='pg_pk_only_table',
type='upsert',
primary_key='user_id, client_id',
);

statement ok
INSERT INTO rw_pk_only_table VALUES
(1, 10),
(2, 20);

statement ok
flush;

query II retry 4 backoff 1s
select * from postgres_query('$PGHOST', '$PGPORT', '$PGUSER', '${PGPASSWORD:postgres}', 'sink_test', 'select * from pg_pk_only_table order by user_id, client_id;');
----
1 10
2 20

statement ok
DROP SINK rw_pk_only_sink;

statement ok
DROP TABLE rw_pk_only_table;

system ok
PGDATABASE=sink_test psql -c "DROP TABLE pg_pk_only_table"

################### Drop DB

system ok
Expand Down
36 changes: 35 additions & 1 deletion src/connector/src/sink/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,11 @@ fn create_upsert_sql(
})
.collect_vec()
.join(", ");
format!("{insert_sql} on conflict ({pk_columns}) do update set {update_parameters}")
if update_parameters.is_empty() {
format!("{insert_sql} on conflict ({pk_columns}) do nothing")
} else {
format!("{insert_sql} on conflict ({pk_columns}) do update set {update_parameters}")
}
}

/// Quote an identifier for PostgreSQL.
Expand Down Expand Up @@ -718,4 +722,34 @@ mod tests {
]],
);
}

#[test]
fn test_create_upsert_sql_all_columns_are_primary_keys() {
let schema = Schema::new(vec![
Field {
data_type: DataType::Int32,
name: "user_id".to_owned(),
},
Field {
data_type: DataType::Int32,
name: "client_id".to_owned(),
},
]);
let schema_name = "test_schema";
let table_name = "test_table";
let pk_indices_lookup = HashSet::from_iter([0, 1]);
let sql = create_upsert_sql(
&schema,
schema_name,
table_name,
&[0, 1],
&pk_indices_lookup,
);
check(
sql,
expect![[
r#"INSERT INTO "test_schema"."test_table" ("user_id", "client_id") VALUES ($1, $2) on conflict ("user_id", "client_id") do nothing"#
]],
);
}
}
Loading