Conversation
```rust
let data_files = result
    .into_iter()
    .map(|f| {
        let f = apply_sort_order_id_to_data_file(f, table_sort_order_id)
```
I think we can't apply the sort order key to the Parquet files written by the Iceberg sink, because there is no ordering guarantee in the streaming sink.
Pull request overview
Adds order_key support for Iceberg table creation and sink file metadata, ensuring the configured sort order is propagated through table metadata and written data files (and preserved through compaction via updated compaction-core integration).
Changes:
- Add `order_key` option parsing/validation (frontend + connector) and build an Iceberg `SortOrder` during table auto-creation.
- Write `sort_order_id` into produced Iceberg data files (visible via `rw_iceberg_files`) and update the compaction integration configuration.
- Add e2e coverage for the Iceberg engine, append-only engine, and Iceberg sink.
Reviewed changes
Copilot reviewed 8 out of 9 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| src/storage/src/hummock/compactor/iceberg_compaction/iceberg_compactor_runner.rs | Update compaction planning config API usage (max_input_parallelism). |
| src/storage/Cargo.toml | Bump iceberg-compaction-core git rev. |
| src/frontend/src/handler/create_table.rs | Parse/validate order_key during Iceberg engine table creation; forward option to sink and strip from source props. |
| src/connector/src/sink/iceberg/mod.rs | Add order_key to IcebergConfig, parse/validate it, build Iceberg SortOrder, and set sort_order_id on written data files; add unit tests. |
| e2e_test/iceberg/test_case/pure_slt/iceberg_sink.slt | Add sink coverage verifying sort_order_id is populated. |
| e2e_test/iceberg/test_case/pure_slt/iceberg_engine.slt | Add engine table coverage verifying sort_order_id is populated. |
| e2e_test/iceberg/test_case/pure_slt/iceberg_engine_append_only.slt | Add append-only engine coverage verifying sort_order_id is populated. |
| Cargo.toml | Bump iceberg / catalog crates git rev (aligned with compaction-core). |
| Cargo.lock | Lockfile updates for the bumped git dependencies. |
```rust
#[serde(default)]
pub partition_by: Option<String>,

#[serde(default)]
```
`IcebergConfig` derives `WithOptions` and a new `order_key` field is added, but the checked-in auto-generated `src/connector/with_options_sink.yaml` does not include `order_key`. Please regenerate and commit the updated YAML (via `./risedev generate-with-options`) so CI/docs stay in sync with the Rust option definitions.
Suggested change:

```rust
#[serde(default)]
#[with_option(skip)]
```
```rust
let column = tokens[0];
let valid_column = Regex::new(r"^[A-Za-z_][A-Za-z0-9_]*$").unwrap();
if !valid_column.is_match(column) {
    bail!(
        "Invalid order key column `{column}`\nHINT: Only plain column names are supported in order_key"
    );
}
```
`parse_order_key_exprs` recompiles `Regex::new(r"^[A-Za-z_][A-Za-z0-9_]*$")` for every item and uses `unwrap()`. Please precompile this regex once (e.g., a `static` `LazyLock<Regex>`) to avoid repeated allocations/CPU, and remove the per-item `unwrap()` from the hot path.
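As a stdlib-only sketch of the hoisting idea: since `^[A-Za-z_][A-Za-z0-9_]*$` only matches ASCII identifiers, the same check can be expressed without recompiling (or even using) a regex in the per-item loop. `is_valid_column` is a hypothetical helper name, not code from this PR:

```rust
// Hypothetical helper (not the PR's code): equivalent to the regex
// `^[A-Za-z_][A-Za-z0-9_]*$`, written without a regex so nothing is
// compiled per item.
fn is_valid_column(name: &str) -> bool {
    let mut chars = name.chars();
    // First character: ASCII letter or underscore.
    match chars.next() {
        Some(c) if c.is_ascii_alphabetic() || c == '_' => {}
        _ => return false,
    }
    // Remaining characters: ASCII letters, digits, or underscores.
    chars.all(|c| c.is_ascii_alphanumeric() || c == '_')
}

fn main() {
    assert!(is_valid_column("v1"));
    assert!(is_valid_column("_internal"));
    assert!(!is_valid_column("1abc")); // must not start with a digit
    assert!(!is_valid_column("a-b")); // plain column names only
    assert!(!is_valid_column("")); // empty is invalid
    println!("ok");
}
```

If the regex itself should stay (e.g., for a future richer pattern), wrapping it in a `static LazyLock<Regex>` achieves the same one-time compilation.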
```rust
validate_order_key_columns(
    order_key,
    param.columns.iter().map(|column| column.name.as_str()),
)
.context("invalid order_key")?;
```
In `IcebergSink::new`, `validate_order_key_columns(...).context("invalid order_key")?` returns an `anyhow::Error` that will be converted via `From<anyhow::Error>` into `SinkError::Internal`. For a user-provided WITH option this should be reported as `SinkError::Config` (with context), so invalid configuration is surfaced correctly instead of looking like an internal failure.
Suggested change:

```rust
if let Err(e) = validate_order_key_columns(
    order_key,
    param.columns.iter().map(|column| column.name.as_str()),
)
.context("invalid order_key")
{
    return Err(SinkError::Config(e));
}
```
@xxhZs any updates?

Can review again, @chenzl25.
Summary
This PR adds `order_key` support for Iceberg tables and ensures the configured sort order is carried through both file writing and Iceberg compaction metadata.

Main changes:

- Parse and validate `order_key` for Iceberg table creation
- Build an Iceberg `SortOrder` from the configured key when creating the table
- Write `sort_order_id` into generated Iceberg data files so the sort order is visible in metadata

In practice, a table created with an `order_key` will materialize an Iceberg sort order in the table metadata, and the produced data files can be observed via `rw_iceberg_files.sort_order_id`.
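To illustrate the shape of the option, an `order_key` string like `'v1 desc nulls last, v2 asc nulls first, id desc'` might be split into per-column sort fields roughly as follows. This is a hypothetical standalone sketch, not the PR's actual parser; `SortField`, the assumed grammar (`<column> [asc|desc] [nulls first|nulls last]`), and the default null ordering (asc implies nulls first, desc implies nulls last, matching Iceberg's convention) are all assumptions:

```rust
#[derive(Debug, PartialEq)]
struct SortField {
    column: String,
    descending: bool,
    nulls_first: bool,
}

// Hypothetical sketch of parsing a comma-separated order_key option.
fn parse_order_key(order_key: &str) -> Result<Vec<SortField>, String> {
    order_key
        .split(',')
        .map(|item| {
            let tokens: Vec<String> = item
                .split_whitespace()
                .map(|t| t.to_ascii_lowercase())
                .collect();
            let column = tokens
                .first()
                .ok_or_else(|| "empty order_key item".to_string())?
                .clone();
            let descending = tokens.iter().any(|t| t == "desc");
            // Assumed defaults: asc -> nulls first, desc -> nulls last.
            let mut nulls_first = !descending;
            if let Some(pos) = tokens.iter().position(|t| t == "nulls") {
                match tokens.get(pos + 1).map(String::as_str) {
                    Some("first") => nulls_first = true,
                    Some("last") => nulls_first = false,
                    _ => return Err(format!("bad nulls clause in `{item}`")),
                }
            }
            Ok(SortField { column, descending, nulls_first })
        })
        .collect()
}

fn main() {
    let fields =
        parse_order_key("v1 desc nulls last, v2 asc nulls first, id desc").unwrap();
    assert_eq!(fields.len(), 3);
    assert!(fields[0].descending && !fields[0].nulls_first);
    assert!(!fields[1].descending && fields[1].nulls_first);
    assert!(fields[2].descending && !fields[2].nulls_first);
    println!("{fields:?}");
}
```

In the real code each parsed field would then be mapped onto an iceberg-rust `SortField` (with direction and null ordering) when building the table's `SortOrder`.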
Validated locally with Iceberg engine + storage catalog.
Test case:
- `order_key = 'v1 desc nulls last, v2 asc nulls first, id desc'`
- `FLUSH` after each batch:
  - `(1, 100, 2, 'a'), (2, 100, 1, 'b')`
  - `(3, 100, 1, 'c'), (4, 90, 2, 'd')`
  - `(5, 90, 2, 'e'), (6, 90, 1, 'f')`

Before compaction:
- `rw_iceberg_files` showed 6 data files, each with `sort_order_id = 1`

After starting a dedicated iceberg compactor and running:
After compaction:
- `rw_iceberg_files` showed 1 data file with `sort_order_id = 1`

The compacted parquet file was read directly, and its physical row order was:
This matches the configured multi-column order key:

- `v1 desc`
- `v2 asc`
- `id desc`
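The expected physical order for the six test rows above can be double-checked with an equivalent comparator. This is a standalone sketch over plain `(id, v1, v2)` tuples (the string column is omitted since it is not part of the key; all key columns are non-null here, so null ordering does not come into play), not code from the PR:

```rust
// Standalone sketch: sort rows (id, v1, v2) by `v1 desc, v2 asc, id desc`
// and return the ids in the resulting physical order.
fn sorted_ids(mut rows: Vec<(i64, i64, i64)>) -> Vec<i64> {
    rows.sort_by(|a, b| {
        b.1.cmp(&a.1) // v1 desc
            .then(a.2.cmp(&b.2)) // v2 asc
            .then(b.0.cmp(&a.0)) // id desc
    });
    rows.into_iter().map(|r| r.0).collect()
}

fn main() {
    // The six rows inserted in the test case above.
    let rows = vec![
        (1, 100, 2),
        (2, 100, 1),
        (3, 100, 1),
        (4, 90, 2),
        (5, 90, 2),
        (6, 90, 1),
    ];
    // Expected physical id order under the configured key.
    assert_eq!(sorted_ids(rows), vec![3, 2, 1, 6, 5, 4]);
    println!("order verified");
}
```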
The compaction-side ordering behavior depends on the upstream change in:
This PR should wait for that upstream PR to be merged.