Skip to content
3 changes: 3 additions & 0 deletions src/v/cloud_io/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -148,15 +148,18 @@ redpanda_cc_library(
"cache_probe.cc",
"cache_service.cc",
"recursive_directory_walker.cc",
"staging_file.cc",
],
hdrs = [
"access_time_tracker.h",
"cache_probe.h",
"cache_service.h",
"recursive_directory_walker.h",
"staging_file.h",
],
implementation_deps = [
":logger",
"//src/v/bytes:iobuf",
"//src/v/bytes:iobuf_parser",
"//src/v/container:chunked_vector",
"//src/v/serde",
Expand Down
9 changes: 9 additions & 0 deletions src/v/cloud_io/basic_cache_service_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ std::ostream& operator<<(std::ostream& o, cache_element_status s) {
return o;
}

template<class Clock>
void basic_space_reservation_guard<Clock>::merge(
  basic_space_reservation_guard&& other) noexcept {
    // Absorb the other guard's accounted reservation into this one, then
    // zero the source so its destructor has nothing left to release.
    const auto taken_bytes = other._bytes;
    const auto taken_objects = other._objects;
    other._bytes = 0;
    other._objects = 0;
    _bytes += taken_bytes;
    _objects += taken_objects;
}

template<class Clock>
void basic_space_reservation_guard<Clock>::wrote_data(
uint64_t written_bytes, size_t written_objects) {
Expand Down
4 changes: 4 additions & 0 deletions src/v/cloud_io/basic_cache_service_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ class basic_space_reservation_guard {

~basic_space_reservation_guard();

/// Merges another guard's reservation into this one. The other guard is
/// left empty (zero bytes/objects).
void merge(basic_space_reservation_guard&& other) noexcept;

/// After completing the write operation that this space reservation
/// protected, indicate how many bytes were really written: this is used to
/// atomically update cache usage stats to free the reservation and update
Expand Down
186 changes: 159 additions & 27 deletions src/v/cloud_io/cache_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -269,14 +269,18 @@ std::optional<std::chrono::milliseconds> cache::get_trim_delay() const {

ss::future<> cache::trim_throttled_unlocked(
std::optional<uint64_t> size_limit_override,
std::optional<size_t> object_limit_override) {
std::optional<size_t> object_limit_override,
std::optional<ss::lowres_clock::time_point> deadline) {
// If we trimmed very recently then do not do it immediately:
// this reduces load and improves chance of currently promoted
// segments finishing their read work before we demote their
// data from cache.
auto trim_delay = get_trim_delay();

if (trim_delay.has_value()) {
if (deadline && ss::lowres_clock::now() + *trim_delay > *deadline) {
throw ss::timed_out_error();
}
vlog(
log.info,
"Cache trimming throttled, waiting {}ms",
Expand Down Expand Up @@ -828,9 +832,13 @@ cache::trim_exhaustive(uint64_t size_to_delete, size_t objects_to_delete) {
continue;
}

// Unlike the fast trim, we *do not* skip .tmp files. This is to handle
// the case where we have some abandoned tmp files, and have hit the
// exhaustive trim because they are occupying too much space.
if (
std::string_view(file_stat.path)
.ends_with(cache_tmp_file_extension)) {
result.trim_missed_tmp_files = true;
continue;
}

try {
co_await delete_file_and_empty_parents(file_stat.path);
_access_time_tracker.remove(file_stat.path);
Expand All @@ -851,23 +859,6 @@ cache::trim_exhaustive(uint64_t size_to_delete, size_t objects_to_delete) {
} catch (const ss::gate_closed_exception&) {
// We are shutting down, stop iterating and propagate
throw;
} catch (const std::filesystem::filesystem_error& e) {
if (likely(file_stat.path.ends_with(cache_tmp_file_extension))) {
// In exhaustive scan we might hit a .part file and get ENOENT,
// this is expected behavior occasionally.
result.trim_missed_tmp_files = true;
vlog(
log.info,
"trim: couldn't delete temp file {}: {}.",
file_stat.path,
e.what());
} else {
vlog(
log.error,
"trim: couldn't delete {}: {}.",
file_stat.path,
e.what());
}
} catch (const std::exception& e) {
vlog(
log.error,
Expand Down Expand Up @@ -1419,15 +1410,26 @@ ss::future<> cache::_invalidate(const std::filesystem::path& key) {

ss::future<space_reservation_guard>
cache::reserve_space(uint64_t bytes, size_t objects) {
    // Convenience overload: delegate to the deadline-aware variant with an
    // empty deadline, i.e. wait as long as needed for space.
    return reserve_space(
      bytes, objects, std::optional<ss::lowres_clock::time_point>{});
}

ss::future<space_reservation_guard> cache::reserve_space(
uint64_t bytes,
size_t objects,
std::optional<ss::lowres_clock::time_point> deadline) {
while (_block_puts) {
vlog(
log.warn,
"Blocking tiered storage cache write, disk space critically low.");
co_await _block_puts_cond.wait();
if (deadline) {
co_await _block_puts_cond.wait(*deadline);
} else {
co_await _block_puts_cond.wait();
}
}

co_await container().invoke_on(0, [bytes, objects](cache& c) {
return c.do_reserve_space(bytes, objects);
co_await container().invoke_on(0, [bytes, objects, deadline](cache& c) {
return c.do_reserve_space(bytes, objects, deadline);
});

vlog(
Expand Down Expand Up @@ -1684,9 +1686,18 @@ void cache::maybe_background_trim() {
}
}

ss::future<> cache::do_reserve_space(uint64_t bytes, size_t objects) {
ss::future<> cache::do_reserve_space(
uint64_t bytes,
size_t objects,
std::optional<ss::lowres_clock::time_point> deadline) {
vassert(ss::this_shard_id() == ss::shard_id{0}, "Only call on shard 0");

auto check_deadline = [&] {
if (deadline && ss::lowres_clock::now() >= *deadline) {
throw ss::timed_out_error();
}
};

maybe_background_trim();

if (may_reserve_space(bytes, objects)) {
Expand All @@ -1708,7 +1719,19 @@ ss::future<> cache::do_reserve_space(uint64_t bytes, size_t objects) {
_reservations_pending,
_reservations_pending_objects);

auto units = co_await ss::get_units(_cleanup_sm, 1);
check_deadline();

auto get_cleanup_units = [&]() {
if (deadline) {
auto now = ss::lowres_clock::now();
auto remaining = *deadline > now
? *deadline - now
: ss::lowres_clock::duration::zero();
return ss::get_units(_cleanup_sm, 1, remaining);
}
return ss::get_units(_cleanup_sm, 1);
};
auto units = co_await get_cleanup_units();

// Situation may change after a scheduling point. Another fiber could
// trigger carryover trim which released some resources. Exit early in this
Expand Down Expand Up @@ -1780,6 +1803,8 @@ ss::future<> cache::do_reserve_space(uint64_t bytes, size_t objects) {
_reservations_pending_objects += objects;

while (!may_reserve_space(bytes, objects)) {
check_deadline();

bool may_exceed = may_exceed_limits(bytes, objects)
&& _last_trim_failed;
bool may_trim_now = !get_trim_delay().has_value();
Expand All @@ -1799,7 +1824,8 @@ ss::future<> cache::do_reserve_space(uint64_t bytes, size_t objects) {
// After taking lock, there still isn't space: means someone
// else didn't take it and free space for us already, so we will
// do the trim.
co_await trim_throttled_unlocked();
co_await trim_throttled_unlocked(
std::nullopt, std::nullopt, deadline);
did_trim = true;
} else {
vlog(
Expand Down Expand Up @@ -2010,4 +2036,110 @@ cache::validate_cache_config(const config::configuration& conf) {
return std::nullopt;
}

/// Build a unique ".part" staging path for the given cache key, creating the
/// parent directory if needed.
///
/// The key is normalized and checked for containment so an absolute key or
/// one with ".." components cannot place staging files outside _cache_dir
/// (path traversal).
///
/// \throws std::invalid_argument if the key escapes the cache directory.
ss::future<std::filesystem::path>
cache::create_staging_path(std::filesystem::path key) {
    auto guard = _gate.hold();

    // Normalize the destination up front and verify it stays inside the
    // cache directory before deriving any filesystem paths from it.
    auto key_path = (_cache_dir / key).lexically_normal();
    auto rel = key_path.lexically_relative(_cache_dir.lexically_normal());
    if (rel.empty() || *rel.begin() == "..") {
        throw std::invalid_argument(ss::format(
          "cache key {} resolves outside the cache directory", key.native()));
    }

    auto filename = key_path.filename();
    auto dir_path = key_path;
    dir_path.remove_filename();

    // TODO: share code with put().
    // The staging name embeds the shard id and a per-shard counter so
    // concurrent writers never collide on the same temporary path.
    auto tmp_filename = std::filesystem::path(ss::format(
      "{}_{}_{}{}",
      filename.native(),
      ss::this_shard_id(),
      (++_cnt),
      cache_tmp_file_extension));
    auto tmp_filepath = dir_path / tmp_filename;

    // Create the parent directory lazily on first use of this prefix.
    if (!co_await ss::file_exists(dir_path.string())) {
        co_await ss::recursive_touch_directory(dir_path.string());
    }

    co_return tmp_filepath;
}

/// Create a staging_file handle for the given cache key.
///
/// Picks a unique ".part" path via create_staging_path(), opens it with
/// create|rw|exclusive (so a pre-existing file at that path fails the open),
/// and wraps the file in an output stream. If the open or stream creation
/// fails, the staging file is removed on a best-effort basis (errors from
/// the removal are ignored) before the original exception is rethrown.
ss::future<staging_file> cache::create_staging_file(
  std::filesystem::path key, staging_file_options opts) {
    // Held for the lifetime of the returned staging_file (moved into it
    // below), so the gate stays open while the handle is outstanding.
    auto gate_holder = _gate.hold();
    auto staging_path = co_await create_staging_path(key);
    // as_future() lets us inspect failure without a try/catch around the
    // coroutine suspension point.
    auto open_fut = co_await ss::coroutine::as_future(
      ss::open_file_dma(
        staging_path.native(),
        ss::open_flags::create | ss::open_flags::rw
          | ss::open_flags::exclusive));
    if (open_fut.failed()) {
        // Best-effort cleanup: the file may not exist if the open itself
        // failed, so removal errors are deliberately ignored.
        co_await ss::remove_file(staging_path.native())
          .then_wrapped([](ss::future<> f) { f.ignore_ready_future(); });
        std::rethrow_exception(open_fut.get_exception());
    }
    auto stream_fut = co_await ss::coroutine::as_future(
      ss::make_file_output_stream(std::move(open_fut.get())));
    if (stream_fut.failed()) {
        // The file was created above; remove it so we don't leak an
        // unowned .part file.
        co_await ss::remove_file(staging_path.native())
          .then_wrapped([](ss::future<> f) { f.ignore_ready_future(); });
        std::rethrow_exception(stream_fut.get_exception());
    }
    co_return staging_file{
      *this,
      std::move(gate_holder),
      std::move(key),
      std::move(staging_path),
      std::move(stream_fut.get()),
      opts};
}

/// Commit a staging file into the cache: atomically claim the destination
/// via link(), record the reservation usage, then remove the staging file.
///
/// Losing the link() race (EEXIST) is treated as success; the winner is
/// presumed to have done the reservation accounting. Staging-file cleanup is
/// best-effort in all paths so a failed unlink can never leave the committed
/// destination unaccounted for.
///
/// NOTE(review): key is used unvalidated here; an absolute or ".."-bearing
/// key would link the file outside the cache directory. Callers are expected
/// to pass keys that went through create_staging_file() — confirm.
ss::future<> cache::commit_staging_file(
  std::filesystem::path key,
  std::filesystem::path staging_path,
  space_reservation_guard reservation) {
    auto guard = _gate.hold();

    auto dest_path = _cache_dir / key;
    vlog(
      log.debug, "Committing staging file {} to {}", staging_path, dest_path);

    // We use link() rather than O_EXCL create + rename because a crash between
    // the O_EXCL create and the rename would leave an empty file at the
    // destination: on restart the cache would see a zero-byte entry for this
    // key. With link(), a crash between link and unlink leaves the real data
    // at the destination.
    auto link_fut = co_await ss::coroutine::as_future(
      ss::link_file(staging_path.native(), dest_path.native()));
    if (link_fut.failed()) {
        auto ex = link_fut.get_exception();
        try {
            std::rethrow_exception(ex);
        } catch (const std::system_error& e) {
            if (e.code() != std::errc::file_exists) {
                throw;
            }
        }
        // Someone else already wrote to the destination. Treat this as a
        // success, but presume that the one who won the race already did the
        // reservation accounting. Removal of our now-redundant staging file
        // is best-effort: the commit outcome must not depend on it.
        co_await ss::remove_file(staging_path.native())
          .then_wrapped([](ss::future<> f) { f.ignore_ready_future(); });
        co_return;
    }

    // The destination now owns the data: update the reservation accounting
    // BEFORE removing the staging file, so a failed unlink cannot leave the
    // cache usage undercounted while the key is present.
    auto file_size = co_await ss::file_size(dest_path.native());
    reservation.wrote_data(file_size, 1);

    // Best-effort staging cleanup; a leftover .part file is handled by
    // later trim/cleanup passes.
    co_await ss::remove_file(staging_path.native())
      .then_wrapped([](ss::future<> f) { f.ignore_ready_future(); });

    auto source = dest_path.native();
    if (ss::this_shard_id() == 0) {
        _access_time_tracker.add(
          source, std::chrono::system_clock::now(), file_size);
    } else {
        // The tracker lives on shard 0; record the access asynchronously
        // under the gate so shutdown still waits for it.
        ssx::spawn_with_gate(_gate, [this, source, file_size] {
            return container().invoke_on(0, [source, file_size](cache& c) {
                c._access_time_tracker.add(
                  source, std::chrono::system_clock::now(), file_size);
            });
        });
    }
}

} // namespace cloud_io
33 changes: 30 additions & 3 deletions src/v/cloud_io/cache_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
#include "cloud_io/basic_cache_service_api.h"
#include "cloud_io/cache_probe.h"
#include "cloud_io/recursive_directory_walker.h"
#include "cloud_io/staging_file.h"
#include "config/configuration.h"
#include "config/property.h"
#include "ssx/semaphore.h"
#include "storage/disk.h"

#include <seastar/core/abort_source.hh>
#include <seastar/core/condition-variable.hh>
#include <seastar/core/future.hh>
#include <seastar/core/gate.hh>
Expand Down Expand Up @@ -167,6 +169,24 @@ class cache
return _cache_dir / key;
}

/// Create a staging_file handle for the given cache key.
ss::future<staging_file>
create_staging_file(std::filesystem::path key, staging_file_options opts);

/// Commit a staging file into the cache. Atomically claims the
/// destination via link(), then removes the staging .part file and
/// updates the reservation to record actual usage.
ss::future<> commit_staging_file(
std::filesystem::path key,
std::filesystem::path staging_path,
space_reservation_guard reservation);

/// Reserve cache space with an optional deadline for blocking waits.
ss::future<space_reservation_guard> reserve_space(
uint64_t bytes,
size_t objects,
std::optional<ss::lowres_clock::time_point> deadline);

// Checks if a cluster configuration is valid for the properties
// `cloud_storage_cache_size` and `cloud_storage_cache_size_percent`.
// Two cases are invalid: 1. the case in which both are 0, 2. the case in
Expand All @@ -178,6 +198,9 @@ class cache
validate_cache_config(const config::configuration& conf);

private:
ss::future<std::filesystem::path>
create_staging_path(std::filesystem::path key);

/// Load access time tracker from file
ss::future<> load_access_time_tracker();

Expand Down Expand Up @@ -230,10 +253,13 @@ class cache
/// rate limit.
std::optional<std::chrono::milliseconds> get_trim_delay() const;

/// Invoke trim, waiting if not enough time passed since the last trim
/// Invoke trim, waiting if not enough time passed since the last trim.
/// If a deadline is provided and the throttle delay would exceed it,
/// throws ss::timed_out_error rather than skipping the rate limit.
ss::future<> trim_throttled_unlocked(
std::optional<uint64_t> size_limit_override = std::nullopt,
std::optional<size_t> object_limit_override = std::nullopt);
std::optional<size_t> object_limit_override = std::nullopt,
std::optional<ss::lowres_clock::time_point> deadline = std::nullopt);

// Take the cleanup semaphore before calling trim_throttled
ss::future<> trim_throttled(
Expand Down Expand Up @@ -261,7 +287,8 @@ class cache

/// Block until enough space is available to commit to a reservation
/// (only runs on shard 0)
ss::future<> do_reserve_space(uint64_t, size_t);
ss::future<> do_reserve_space(
uint64_t, size_t, std::optional<ss::lowres_clock::time_point> deadline);

/// Trim cache using results from the previous recursive directory walk
ss::future<trim_result>
Expand Down
Loading
Loading