Conversation
```cpp
if (fut.failed()) {
    if (attempts < max_client_retries) {
        co_return co_await std::move(fut);
    }
    continue;
```

**Bug** (pre-existing, but relevant to this refactor): the exception-handling logic here appears inverted. When `attempts < max_client_retries` (retries remaining), the failed future is returned immediately, propagating the exception. When `attempts >= max_client_retries` (retries exhausted), it hits `continue` and retries. This means exceptions are never retried when we can retry, and always retried when we can't.

Should this be:
```diff
 if (fut.failed()) {
-    if (attempts < max_client_retries) {
+    if (attempts >= max_client_retries) {
         co_return co_await std::move(fut);
     }
     continue;
```
```cpp
    }
    co_return kafka::offset_cast(result.value().high_watermark);
}
```

```cpp
ss::future<> rpc_transport::consume_range(
  model::offset start,
  model::offset end,
  ss::noncopyable_function<ss::future<>(model::record_batch)> consumer) {
    // The RPC consume API may not return all records in a single call,
    // so loop until we've consumed up to the desired end offset.
    constexpr size_t max_bytes = 1 << 20; // 1 MiB per fetch
    auto current = start;
    while (current < end) {
        auto result = co_await _client.consume(
          model::schema_registry_internal_tp,
          offset_cast(current),
          offset_cast(end),
          1,
          max_bytes,
          std::chrono::seconds(5));
        if (result.has_error()) {
            throw_as_kafka_error("RPC consume failed", result.error());
        }
        auto& reply = result.value();
```
**Concern:** no overall timeout on the consume loop. Each individual RPC call has a 5s timeout, but the outer `while (current < end)` loop has no overall deadline. If the partition keeps returning small batches that never advance `current` to `end` (e.g., due to a bug in offset calculation), or if `end` is set to a very high value, this could loop for a very long time.

Consider adding a deadline or iteration cap, e.g.:

```cpp
constexpr int max_iterations = 1000;
int iterations = 0;
while (current < end && ++iterations <= max_iterations) {
```

```cpp
, schema_registry_use_rpc(
  *this,
  "schema_registry_use_rpc",
  "Produce schema registry messages using internal Redpanda RPCs. When "
  "disabled, produce schema registry messages using a Kafka client "
  "instead.",
  {.needs_restart = needs_restart::yes, .visibility = visibility::tunable},
  true)
, cloud_storage_enabled(
```
**Question:** default `true` for a new feature? This means all existing deployments will switch to the RPC transport path on upgrade (assuming the cluster version is sufficient). While the feature-gating on v26_1_1 in api.cc is a good safety net for mixed-version clusters, defaulting to `true` is aggressive for a brand-new code path.

Consider defaulting to `false` for the initial release and switching to `true` once the RPC path has been validated in production. This gives operators an opt-in experience rather than a surprise behavioral change on upgrade.
```cpp
offset_cast(end),
1,
max_bytes,
std::chrono::seconds(5));
```
**Edge case:** if the RPC returns successfully but with an empty `batches` vector, this throws unconditionally. However, that could legitimately happen if the partition has no data in the requested range (e.g., after log truncation/compaction). Should this break out of the loop instead of throwing?

Also, since the retry logic in client.cc already handles not_leader/timeout for the consume path, returning 0 batches on success would indicate the data genuinely isn't there.
```cpp
leader_mitigating_retry_policy(
  ss::abort_source& as,
  partition_leader_cache& leaders,
  model::topic_namespace_view tp_ns,
  model::partition_id pid)
```
**Nit:** `prepare()` snapshots the leader term before the attempt, but the actual RPC call in `do_produce_once`/`do_consume_once` independently looks up the leader. There's an inherent race: leadership could change between `prepare()` and the RPC lookup, so `_stale_term` might already be outdated before the attempt even starts, causing `mitigate()` to skip the wait (since the table has already advanced).

In practice this is benign, since the retry loop handles it, but a comment documenting the intended behavior would help future readers.
```cpp
const bool rpc_available
  = _controller->get_feature_table().local().get_active_version()
    >= features::to_cluster_version(features::release_version::v26_1_1);
if (!rpc_available) {
    vlog(
      srlog.warn,
      "schema_registry_use_rpc enabled but cluster version too old. "
      "Falling back to Kafka client. RPC mode will be available "
      "on the next restart after all brokers are upgraded.");
    return false;
}
return true;
}();
```
```cpp
if (use_rpc) {
    vlog(srlog.info, "Schema registry in RPC mode");
```
**Nit:** when `_transport` is still `std::monostate` and the kafka `_client` error callback fires during `_client.start()`, the monostate visitor rethrows the exception. This is fine for correctness, but worth a brief comment explaining either that `_client` shouldn't invoke the error callback during `start()`, or that the rethrow is the intended fallback behavior during the initialization window.
```diff
 // Count records before moving batches — needed to convert the
 // last_offset returned by replicate() into a base_offset.
 // Same arithmetic as kafka/server/handlers/produce.cc.
 int32_t total_records = 0;
 for (const auto& b : data.batches) {
     total_records += b.record_count();
 }
 auto result = co_await produce(ktp, std::move(data.batches), timeout);
-auto ec = result.has_error() ? result.error() : cluster::errc::success;
-co_return kafka_topic_data_result(data.tp, ec);
+if (result.has_error()) {
+    co_return kafka_topic_data_result(data.tp, result.error());
+}
+auto last_offset = result.value();
+auto base_offset = model::offset{last_offset() - (total_records - 1)};
+co_return kafka_topic_data_result(
```
**Correctness check:** the `base_offset` calculation `last_offset() - (total_records - 1)` assumes all records land at contiguous offsets. This matches the Kafka produce handler's arithmetic, but if `data.batches` contains multiple batches, `total_records` sums all of them while `last_offset` is the offset of the very last record across all batches. This is correct as long as the batches are replicated atomically as a single group. Worth a brief assertion or comment that multi-batch produces yield contiguous offsets.
```cpp
    }
    // Subject was already deleted before our first attempt.
    co_return co_await _store.get_versions(sub, include_deleted::yes);
}

// Grab the versions before they're gone.
auto versions = co_await _store.get_versions(sub, include_deleted::no);

// Inspect the subject to see if it's already deleted
if (co_await _store.is_subject_deleted(sub)) {
    co_return std::make_optional(std::move(versions));
}
// Cache versions for potential retry — after a subject-level soft
// delete all versions are marked deleted and the pre-delete list
// cannot be reconstructed from the store. Tagged with subject so
// stale entries from a prior delete of a different subject are ignored.
```
Good fix for the retry-after-collision bug. One thought: the `_delete_versions_cache` is protected by `_write_sem` (single-writer serialization), so concurrent deletes of different subjects are properly serialized, and the subject tag prevents stale entries. But the cache is never cleared on exceptions: if `produce_and_apply` throws, the cache persists to the next `do_delete_subject_impermanent` call. If that call is for a different subject, the tag check handles it correctly. Just confirming the design: is the cache intentionally sticky across exceptions, relying on the subject tag for staleness detection?
**PR Review: SR on kafka RPCs**

**Summary**

This PR introduces a polymorphic transport abstraction for schema registry internal topic I/O.

The refactoring is well-structured: the transport interface is clean, the separation of concerns is good, and the leadership-aware retry policy (`leader_mitigating_retry_policy`) is a solid addition. Test coverage is solid: new C++ unit tests for both transport paths.

**Issues Found**

Potential bugs, robustness concerns, minor nits, and what looks good are detailed in the inline comments.
Pull request overview
Adds a new “transport” abstraction for Schema Registry internal topic I/O, enabling an internal Kafka data RPC-based path (no client auth overhead) alongside the existing kafka::client-based path, and updates tests to explicitly select transport mode.
Changes:
- Introduce `pandaproxy::schema_registry::transport` with `rpc_transport` and `kafka_client_transport` implementations, wired through Schema Registry `api`/`service`/`seq_writer`.
- Extend kafka data RPC to return base/last offsets from produce, add leadership-mitigating retries, and add targeted unit tests.
- Update rptest coverage to explicitly set `schema_registry_use_rpc` per test and add RPC-transport variants/stress coverage.
Reviewed changes
Copilot reviewed 44 out of 44 changed files in this pull request and generated 4 comments.
Summary per file:
| File | Description |
|---|---|
| tests/rptest/tests/tls_version_test.py | Explicitly disables SR RPC transport for TLS version tests. |
| tests/rptest/tests/tls_metrics_test.py | Explicitly disables SR RPC transport for TLS metrics tests. |
| tests/rptest/tests/security_report_test.py | Forces SR RPC transport off for security-report related SR tests. |
| tests/rptest/tests/schema_registry_test.py | Requires explicit schema_registry_use_rpc, adds RPC variants + stress test. |
| tests/rptest/tests/rpk_registry_test.py | Disables SR RPC transport for rpk registry tests. |
| tests/rptest/tests/redpanda_oauth_test.py | Disables SR RPC transport for OAuth test cluster config. |
| tests/rptest/tests/metrics_reporter_test.py | Enables SR RPC transport for metrics reporter SR usage. |
| tests/rptest/tests/crl_test.py | Disables SR RPC transport for CRL-related TLS config. |
| tests/rptest/tests/cluster_linking_topic_syncing_test.py | Enables SR RPC transport for cluster-linking topic syncing test config. |
| tests/rptest/tests/audit_log_test.py | Disables SR RPC transport for audit log test setup. |
| tests/rptest/tests/admin_api_auth_test.py | Disables SR RPC transport for auto-auth admin API test setup. |
| src/v/redpanda/application_runtime.cc | Passes kafka data RPC client into Schema Registry API wiring. |
| src/v/pandaproxy/schema_registry/transport.h | Adds transport interface for SR internal topic I/O. |
| src/v/pandaproxy/schema_registry/test/utils.h | Adds noop_transport for seq_writer-related tests. |
| src/v/pandaproxy/schema_registry/test/rpc_transport_test.cc | Adds unit tests for SR rpc_transport behavior. |
| src/v/pandaproxy/schema_registry/test/consume_to_store.cc | Switches tests from dummy kafka client to noop transport. |
| src/v/pandaproxy/schema_registry/test/compatibility_3rdparty.cc | Switches 3rd-party compatibility test to noop transport. |
| src/v/pandaproxy/schema_registry/test/BUILD | Adds deps for transport and new rpc_transport gtest. |
| src/v/pandaproxy/schema_registry/service.h | Replaces kafka client dependency with transport pointer; delegates mitigation/ephemeral checks. |
| src/v/pandaproxy/schema_registry/service.cc | Routes internal topic reads/writes via transport; adds startup retry loop for topic metadata race. |
| src/v/pandaproxy/schema_registry/seq_writer.h | Replaces kafka client with transport; adds delete-subject retry cache state. |
| src/v/pandaproxy/schema_registry/seq_writer.cc | Uses transport for HWM/consume/produce; implements delete-subject retry caching behavior. |
| src/v/pandaproxy/schema_registry/rpc_transport.h | Declares RPC-based SR transport using kafka data RPC client. |
| src/v/pandaproxy/schema_registry/rpc_transport.cc | Implements RPC-based SR internal topic produce/consume/HWM. |
| src/v/pandaproxy/schema_registry/kafka_client_transport.h | Introduces kafka::client-backed SR transport (legacy path). |
| src/v/pandaproxy/schema_registry/kafka_client_transport.cc | Implements legacy transport, including ephemeral-credential mitigation logic. |
| src/v/pandaproxy/schema_registry/fwd.h | Adds forward declarations for new transport types. |
| src/v/pandaproxy/schema_registry/BUILD | Adds build targets/deps for transport and implementations. |
| src/v/pandaproxy/schema_registry/api.h | Adds rpc client input + transport variant storage for SR API. |
| src/v/pandaproxy/schema_registry/api.cc | Selects RPC vs Kafka-client transport using config + feature table; wires transport into service/seq_writer. |
| src/v/kafka/data/rpc/test/kafka_data_rpc_test.cc | Adds tests for produce_with_offset returning base/last offsets. |
| src/v/kafka/data/rpc/test/deps.h | Extends fake leader cache with leader-term + not-leader mitigation hooks. |
| src/v/kafka/data/rpc/service.h | Documents produce() return semantics for offsets. |
| src/v/kafka/data/rpc/service.cc | Returns base/last offsets in produce reply; computes base from last+record_count. |
| src/v/kafka/data/rpc/serde.h | Bumps kafka_topic_data_result serde version and adds optional base/last offsets. |
| src/v/kafka/data/rpc/serde.cc | Extends formatting to include optional base/last offsets. |
| src/v/kafka/data/rpc/deps.h | Extends leader cache interface with leader-term lookup and mitigation API. |
| src/v/kafka/data/rpc/deps.cc | Implements leadership-change wait mitigation via partition_leaders_table notifications. |
| src/v/kafka/data/rpc/client.h | Adds produce_with_offset and get_single_partition_offsets; adds retry_mitigating helpers. |
| src/v/kafka/data/rpc/client.cc | Adds leadership-mitigating retry policy and exposes new client methods. |
| src/v/kafka/data/rpc/BUILD | Adds expiring_promise dependency for mitigation implementation. |
| src/v/config/configuration.h | Adds schema_registry_use_rpc tunable property. |
| src/v/config/configuration.cc | Defines schema_registry_use_rpc property and help text (default true). |
| src/v/cluster_link/tests/deps.h | Updates fake leader cache to satisfy new leader-term/mitigation interface. |
```cpp
auto fut = co_await ss::coroutine::as_future<result_type>(
  ss::futurize_invoke(func));
backoff.next_backoff();
if (fut.failed()) {
    if (attempts < max_client_retries) {
        co_return co_await std::move(fut);
    }
    continue;
}
```
In `retry_with_backoff()`, the exception path is inverted: when `fut.failed()` and `attempts < max_client_retries`, the code does `co_return co_await std::move(fut)`, which immediately rethrows and prevents any retries. Conversely, once `attempts >= max_client_retries`, it hits `continue` and can loop forever without mitigation/backoff. Suggestion: when `fut.failed()`, retry (optionally calling `policy.mitigate(...)` with the extracted errc/timeout) while `attempts < max_client_retries`, and only rethrow/return on the final attempt.
```cpp
for (int attempts = 0;; ++attempts) {
    auto fut = co_await ss::coroutine::as_future(
      s.fetch_internal_topic());
    if (fut.available()) {
```
The retry loop in `service::do_start()` will never retry as written: `auto fut = co_await ss::coroutine::as_future(s.fetch_internal_topic());` waits for completion, so `fut.available()` will always be true and the function will `co_return` even when `fetch_internal_topic` threw. This also bypasses the intended exception inspection in the block below. Suggestion: check `fut.failed()` (or `!fut.failed()`) instead of `available()`, and only `co_return` on success; on failure, inspect/rethrow and sleep before retrying.
```diff
-    if (fut.available()) {
+    if (!fut.failed()) {
```
```cpp
throw kafka::exception(
  kafka::error_code::unknown_server_error, "No records returned");
```
`rpc_transport::consume_range()` throws an `unknown_server_error` when `reply.batches` is empty. The underlying `local_service::consume` can legitimately return an empty batch vector for a valid range (e.g., no records available in `[start, end)`), and throwing here can break schema registry startup reads or cause spurious failures under race/compaction. Suggestion: treat an empty response as end-of-stream (e.g., set `current = end` / `break`) or implement a bounded retry/backoff, rather than throwing.
```diff
-throw kafka::exception(
-  kafka::error_code::unknown_server_error, "No records returned");
+// Treat an empty response as end-of-stream rather than an error.
+current = end;
+break;
```
```python
# Start 2 reader threads and 1 writer thread
threads = []
for _ in range(2):
    t = threading.Thread(target=reader_worker, daemon=True)
    t.start()
    threads.append(t)
t = threading.Thread(target=writer_worker, daemon=True)
t.start()
threads.append(t)
```
In `SchemaRegistryRpcTransportStressTest`, worker threads are started as daemon threads and then joined with a 30s timeout, but each SR request defaults to a 60s timeout (`SchemaRegistryRedpandaClient.request`). This can let threads continue running past teardown if they're blocked in a request, which can make the test flaky and cause background traffic after the assertion. Consider making the threads non-daemon and joining without a timeout (or using a shorter per-request timeout / checking thread liveness after join).
/ci-repeat 1

replace with #30062

/ci-repeat 1

/ci-repeat 1

Retry command for Build#82720: please wait until all jobs are finished before running the slash command
`do_delete_subject_impermanent` had a latent ordering bug: `get_versions(include_deleted::no)` was called BEFORE `is_subject_deleted()`, so on retry after a write collision where the subject was already soft-deleted (by the winning writer), `get_versions` would throw `subject_not_found`, which propagated as HTTP 404 to the client. Fix by checking `is_subject_deleted` first.

A cached version list (via `lw_shared_ptr`) preserves the pre-delete version list across retries, since after a subject-level soft delete all versions are marked deleted and the store can no longer distinguish individually-deleted versions from those deleted as part of the subject-level operation.

This bug was never observed on the `kafka::client` transport because the full Kafka protocol round-trip (SASL, request queuing, acks) acts as a natural pacer between different nodes' writes, making write collisions rare. The RPC transport's lower latency tightens the timing window enough to trigger collisions routinely in integration tests.

Co-Authored-By: Claude Opus 4.6 <[email protected]>
Signed-off-by: Oren Leiman <[email protected]>
Signed-off-by: Oren Leiman <[email protected]>
Returns the term of the currently cached leader. Signed-off-by: Oren Leiman <[email protected]>
Allows clients to proactively clear the leaders cache entry for some ntp and wait for the metadata dissemination service to repopulate it. This is particularly useful for servicing RPC-based read operations that might have a tighter deadline upstream (e.g. an HTTP GET). Signed-off-by: Oren Leiman <[email protected]>
Also adds partition_operation_failed to list of retriable errors Signed-off-by: Oren Leiman <[email protected]>
/ci-repeat 1

Retry command for Build#82735: please wait until all jobs are finished before running the slash command
Tries to refresh the leader cache on `not_leader` errors. Useful for situations where calling code is latency sensitive, e.g. servicing a schema registry request. Signed-off-by: Oren Leiman <[email protected]>
Required for usage in SR Signed-off-by: Oren Leiman <[email protected]>
Reliable version that tries to mitigate not_leader errors Signed-off-by: Oren Leiman <[email protected]>
Signed-off-by: Oren Leiman <[email protected]>
Signed-off-by: Oren Leiman <[email protected]>
Some of this might be better placed under k/d/rpc/tests, but this is the specific functionality required by schema registry itself. Signed-off-by: Oren Leiman <[email protected]>
Signed-off-by: Oren Leiman <[email protected]>
Signed-off-by: Oren Leiman <[email protected]>
And plumb into seq_writer & service. Selected at runtime, dependent on use_rpc config and cluster version. Signed-off-by: Oren Leiman <[email protected]>
Signed-off-by: Oren Leiman <[email protected]>
`SchemaRegistryEndpoints` now asserts that every subclass explicitly sets `schema_registry_use_rpc`.

RPC-transport-enabled tests:
- SchemaRegistryModeNotMutableTest
- SchemaRegistryModeMutableTest
- SchemaRegistryContextRpcTransportTest
- SchemaRegistryBasicAuthRpcTransportTest
- SchemaRegistryRpcTransportTest
- SchemaRegistryAutoAuthTest
- SchemaRegistryConfluentClient
- SchemaRegistryCompatibilityModes
- SchemaRegistryACLTest
- SchemaRegistryContextAuthzRpcTransportTest
- SchemaRegistryRpcTransportStressTest
- ClusterLinkingSchemaRegistry
- SchemaRegistryContextMetricsTest

Signed-off-by: Oren Leiman <[email protected]>
Signed-off-by: Oren Leiman <[email protected]>
kafka/data/rpc & pp/schema_registry
/ci-repeat 3

Retry command for Build#82745: please wait until all jobs are finished before running the slash command
ci-repeat 3

/ci-repeat 3

Retry command for Build#82751: please wait until all jobs are finished before running the slash command
/ci-repeat 1
Backports Required
Release Notes
Improvements