diff --git a/src/v/cloud_io/BUILD b/src/v/cloud_io/BUILD index 7996d79d7de61..3d7cd386a19e8 100644 --- a/src/v/cloud_io/BUILD +++ b/src/v/cloud_io/BUILD @@ -148,15 +148,18 @@ redpanda_cc_library( "cache_probe.cc", "cache_service.cc", "recursive_directory_walker.cc", + "staging_file.cc", ], hdrs = [ "access_time_tracker.h", "cache_probe.h", "cache_service.h", "recursive_directory_walker.h", + "staging_file.h", ], implementation_deps = [ ":logger", + "//src/v/bytes:iobuf", "//src/v/bytes:iobuf_parser", "//src/v/container:chunked_vector", "//src/v/serde", diff --git a/src/v/cloud_io/basic_cache_service_api.cc b/src/v/cloud_io/basic_cache_service_api.cc index b08e4d2ae4b69..6f102cb722843 100644 --- a/src/v/cloud_io/basic_cache_service_api.cc +++ b/src/v/cloud_io/basic_cache_service_api.cc @@ -30,6 +30,15 @@ std::ostream& operator<<(std::ostream& o, cache_element_status s) { return o; } +template +void basic_space_reservation_guard::merge( + basic_space_reservation_guard&& other) noexcept { + _bytes += other._bytes; + _objects += other._objects; + other._bytes = 0; + other._objects = 0; +} + template void basic_space_reservation_guard::wrote_data( uint64_t written_bytes, size_t written_objects) { diff --git a/src/v/cloud_io/basic_cache_service_api.h b/src/v/cloud_io/basic_cache_service_api.h index 2acbc7659ea7b..a1a810ed1e658 100644 --- a/src/v/cloud_io/basic_cache_service_api.h +++ b/src/v/cloud_io/basic_cache_service_api.h @@ -75,6 +75,10 @@ class basic_space_reservation_guard { ~basic_space_reservation_guard(); + /// Merges another guard's reservation into this one. The other guard is + /// left empty (zero bytes/objects). 
+ void merge(basic_space_reservation_guard&& other) noexcept; + /// After completing the write operation that this space reservation /// protected, indicate how many bytes were really written: this is used to /// atomically update cache usage stats to free the reservation and update diff --git a/src/v/cloud_io/cache_service.cc b/src/v/cloud_io/cache_service.cc index 59bf8920db914..61a6055af9043 100644 --- a/src/v/cloud_io/cache_service.cc +++ b/src/v/cloud_io/cache_service.cc @@ -269,7 +269,8 @@ std::optional cache::get_trim_delay() const { ss::future<> cache::trim_throttled_unlocked( std::optional size_limit_override, - std::optional object_limit_override) { + std::optional object_limit_override, + std::optional deadline) { // If we trimmed very recently then do not do it immediately: // this reduces load and improves chance of currently promoted // segments finishing their read work before we demote their @@ -277,6 +278,9 @@ ss::future<> cache::trim_throttled_unlocked( auto trim_delay = get_trim_delay(); if (trim_delay.has_value()) { + if (deadline && ss::lowres_clock::now() + *trim_delay > *deadline) { + throw ss::timed_out_error(); + } vlog( log.info, "Cache trimming throttled, waiting {}ms", @@ -828,9 +832,13 @@ cache::trim_exhaustive(uint64_t size_to_delete, size_t objects_to_delete) { continue; } - // Unlike the fast trim, we *do not* skip .tmp files. This is to handle - // the case where we have some abandoned tmp files, and have hit the - // exhaustive trim because they are occupying too much space. 
+ if ( + std::string_view(file_stat.path) + .ends_with(cache_tmp_file_extension)) { + result.trim_missed_tmp_files = true; + continue; + } + try { co_await delete_file_and_empty_parents(file_stat.path); _access_time_tracker.remove(file_stat.path); @@ -851,23 +859,6 @@ cache::trim_exhaustive(uint64_t size_to_delete, size_t objects_to_delete) { } catch (const ss::gate_closed_exception&) { // We are shutting down, stop iterating and propagate throw; - } catch (const std::filesystem::filesystem_error& e) { - if (likely(file_stat.path.ends_with(cache_tmp_file_extension))) { - // In exhaustive scan we might hit a .part file and get ENOENT, - // this is expected behavior occasionally. - result.trim_missed_tmp_files = true; - vlog( - log.info, - "trim: couldn't delete temp file {}: {}.", - file_stat.path, - e.what()); - } else { - vlog( - log.error, - "trim: couldn't delete {}: {}.", - file_stat.path, - e.what()); - } } catch (const std::exception& e) { vlog( log.error, @@ -1419,15 +1410,26 @@ ss::future<> cache::_invalidate(const std::filesystem::path& key) { ss::future cache::reserve_space(uint64_t bytes, size_t objects) { + return reserve_space(bytes, objects, std::nullopt); +} + +ss::future cache::reserve_space( + uint64_t bytes, + size_t objects, + std::optional deadline) { while (_block_puts) { vlog( log.warn, "Blocking tiered storage cache write, disk space critically low."); - co_await _block_puts_cond.wait(); + if (deadline) { + co_await _block_puts_cond.wait(*deadline); + } else { + co_await _block_puts_cond.wait(); + } } - co_await container().invoke_on(0, [bytes, objects](cache& c) { - return c.do_reserve_space(bytes, objects); + co_await container().invoke_on(0, [bytes, objects, deadline](cache& c) { + return c.do_reserve_space(bytes, objects, deadline); }); vlog( @@ -1684,9 +1686,18 @@ void cache::maybe_background_trim() { } } -ss::future<> cache::do_reserve_space(uint64_t bytes, size_t objects) { +ss::future<> cache::do_reserve_space( + uint64_t bytes, + 
size_t objects, + std::optional deadline) { vassert(ss::this_shard_id() == ss::shard_id{0}, "Only call on shard 0"); + auto check_deadline = [&] { + if (deadline && ss::lowres_clock::now() >= *deadline) { + throw ss::timed_out_error(); + } + }; + maybe_background_trim(); if (may_reserve_space(bytes, objects)) { @@ -1708,7 +1719,19 @@ ss::future<> cache::do_reserve_space(uint64_t bytes, size_t objects) { _reservations_pending, _reservations_pending_objects); - auto units = co_await ss::get_units(_cleanup_sm, 1); + check_deadline(); + + auto get_cleanup_units = [&]() { + if (deadline) { + auto now = ss::lowres_clock::now(); + auto remaining = *deadline > now + ? *deadline - now + : ss::lowres_clock::duration::zero(); + return ss::get_units(_cleanup_sm, 1, remaining); + } + return ss::get_units(_cleanup_sm, 1); + }; + auto units = co_await get_cleanup_units(); // Situation may change after a scheduling point. Another fiber could // trigger carryover trim which released some resources. Exit early in this @@ -1780,6 +1803,8 @@ ss::future<> cache::do_reserve_space(uint64_t bytes, size_t objects) { _reservations_pending_objects += objects; while (!may_reserve_space(bytes, objects)) { + check_deadline(); + bool may_exceed = may_exceed_limits(bytes, objects) && _last_trim_failed; bool may_trim_now = !get_trim_delay().has_value(); @@ -1799,7 +1824,8 @@ ss::future<> cache::do_reserve_space(uint64_t bytes, size_t objects) { // After taking lock, there still isn't space: means someone // else didn't take it and free space for us already, so we will // do the trim. 
- co_await trim_throttled_unlocked(); + co_await trim_throttled_unlocked( + std::nullopt, std::nullopt, deadline); did_trim = true; } else { vlog( @@ -2010,4 +2036,110 @@ cache::validate_cache_config(const config::configuration& conf) { return std::nullopt; } +ss::future +cache::create_staging_path(std::filesystem::path key) { + auto guard = _gate.hold(); + + auto key_path = _cache_dir / key; + auto filename = key_path.filename(); + auto dir_path = key_path; + dir_path.remove_filename(); + + // TODO: share code with put(). + auto tmp_filename = std::filesystem::path( + ss::format( + "{}_{}_{}{}", + filename.native(), + ss::this_shard_id(), + (++_cnt), + cache_tmp_file_extension)); + auto tmp_filepath = dir_path / tmp_filename; + if (!co_await ss::file_exists(dir_path.string())) { + co_await ss::recursive_touch_directory(dir_path.string()); + } + + co_return tmp_filepath; +} + +ss::future cache::create_staging_file( + std::filesystem::path key, staging_file_options opts) { + auto gate_holder = _gate.hold(); + auto staging_path = co_await create_staging_path(key); + auto open_fut = co_await ss::coroutine::as_future( + ss::open_file_dma( + staging_path.native(), + ss::open_flags::create | ss::open_flags::rw + | ss::open_flags::exclusive)); + if (open_fut.failed()) { + co_await ss::remove_file(staging_path.native()) + .then_wrapped([](ss::future<> f) { f.ignore_ready_future(); }); + std::rethrow_exception(open_fut.get_exception()); + } + auto stream_fut = co_await ss::coroutine::as_future( + ss::make_file_output_stream(std::move(open_fut.get()))); + if (stream_fut.failed()) { + co_await ss::remove_file(staging_path.native()) + .then_wrapped([](ss::future<> f) { f.ignore_ready_future(); }); + std::rethrow_exception(stream_fut.get_exception()); + } + co_return staging_file{ + *this, + std::move(gate_holder), + std::move(key), + std::move(staging_path), + std::move(stream_fut.get()), + opts}; +} + +ss::future<> cache::commit_staging_file( + std::filesystem::path key, + 
std::filesystem::path staging_path, + space_reservation_guard reservation) { + auto guard = _gate.hold(); + + auto dest_path = _cache_dir / key; + vlog( + log.debug, "Committing staging file {} to {}", staging_path, dest_path); + + // We use link() rather than O_EXCL create + rename because a crash between + // the O_EXCL create and the rename would leave an empty file at the + // destination: on restart the cache would see a zero-byte entry for this + // key. With link(), a crash between link and unlink leaves the real data + // at the destination. + auto link_fut = co_await ss::coroutine::as_future( + ss::link_file(staging_path.native(), dest_path.native())); + if (link_fut.failed()) { + auto ex = link_fut.get_exception(); + try { + std::rethrow_exception(ex); + } catch (const std::system_error& e) { + if (e.code() != std::errc::file_exists) { + throw; + } + } + // Someone else already wrote to the destination. Treat this as a + // success, but presume that the one who won the race already did the + // reservation accounting. 
+ co_await ss::remove_file(staging_path.native()); + co_return; + } + co_await ss::remove_file(staging_path.native()); + + auto file_size = co_await ss::file_size(dest_path.native()); + reservation.wrote_data(file_size, 1); + + auto source = dest_path.native(); + if (ss::this_shard_id() == 0) { + _access_time_tracker.add( + source, std::chrono::system_clock::now(), file_size); + } else { + ssx::spawn_with_gate(_gate, [this, source, file_size] { + return container().invoke_on(0, [source, file_size](cache& c) { + c._access_time_tracker.add( + source, std::chrono::system_clock::now(), file_size); + }); + }); + } +} + } // namespace cloud_io diff --git a/src/v/cloud_io/cache_service.h b/src/v/cloud_io/cache_service.h index 993c9584a63bb..be59727674422 100644 --- a/src/v/cloud_io/cache_service.h +++ b/src/v/cloud_io/cache_service.h @@ -15,11 +15,13 @@ #include "cloud_io/basic_cache_service_api.h" #include "cloud_io/cache_probe.h" #include "cloud_io/recursive_directory_walker.h" +#include "cloud_io/staging_file.h" #include "config/configuration.h" #include "config/property.h" #include "ssx/semaphore.h" #include "storage/disk.h" +#include #include #include #include @@ -167,6 +169,24 @@ class cache return _cache_dir / key; } + /// Create a staging_file handle for the given cache key. + ss::future + create_staging_file(std::filesystem::path key, staging_file_options opts); + + /// Commit a staging file into the cache. Atomically claims the + /// destination via link(), then removes the staging .part file and + /// updates the reservation to record actual usage. + ss::future<> commit_staging_file( + std::filesystem::path key, + std::filesystem::path staging_path, + space_reservation_guard reservation); + + /// Reserve cache space with an optional deadline for blocking waits. 
+ ss::future reserve_space( + uint64_t bytes, + size_t objects, + std::optional deadline); + // Checks if a cluster configuration is valid for the properties // `cloud_storage_cache_size` and `cloud_storage_cache_size_percent`. // Two cases are invalid: 1. the case in which both are 0, 2. the case in @@ -178,6 +198,9 @@ class cache validate_cache_config(const config::configuration& conf); private: + ss::future + create_staging_path(std::filesystem::path key); + /// Load access time tracker from file ss::future<> load_access_time_tracker(); @@ -230,10 +253,13 @@ class cache /// rate limit. std::optional get_trim_delay() const; - /// Invoke trim, waiting if not enough time passed since the last trim + /// Invoke trim, waiting if not enough time passed since the last trim. + /// If a deadline is provided and the throttle delay would exceed it, + /// throws ss::timed_out_error rather than skipping the rate limit. ss::future<> trim_throttled_unlocked( std::optional size_limit_override = std::nullopt, - std::optional object_limit_override = std::nullopt); + std::optional object_limit_override = std::nullopt, + std::optional deadline = std::nullopt); // Take the cleanup semaphore before calling trim_throttled ss::future<> trim_throttled( @@ -261,7 +287,8 @@ class cache /// Block until enough space is available to commit to a reservation /// (only runs on shard 0) - ss::future<> do_reserve_space(uint64_t, size_t); + ss::future<> do_reserve_space( + uint64_t, size_t, std::optional deadline); /// Trim cache using results from the previous recursive directory walk ss::future diff --git a/src/v/cloud_io/staging_file.cc b/src/v/cloud_io/staging_file.cc new file mode 100644 index 0000000000000..5f2cf0f8b482c --- /dev/null +++ b/src/v/cloud_io/staging_file.cc @@ -0,0 +1,87 @@ +/* + * Copyright 2026 Redpanda Data, Inc. 
+ * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ + +#include "cloud_io/staging_file.h" + +#include "bytes/iobuf.h" +#include "cloud_io/cache_service.h" + +namespace cloud_io { + +staging_file::staging_file( + cache& cache, + ss::gate::holder gate_holder, + std::filesystem::path key, + std::filesystem::path staging_path, + ss::output_stream output, + staging_file_options opts) noexcept + : _cache(&cache) + , _gate_holder(std::move(gate_holder)) + , _key(std::move(key)) + , _staging_path(std::move(staging_path)) + , _output(std::move(output)) + , _opts(opts) {} + +const std::filesystem::path& staging_file::path() const noexcept { + return _staging_path; +} + +const std::filesystem::path& staging_file::key() const noexcept { return _key; } + +ss::future<> staging_file::flush() { return _output.flush(); } + +size_t staging_file::written() const noexcept { return _written; } + +ss::future<> staging_file::append( + iobuf data, std::optional deadline) { + auto new_written = _written + data.size_bytes(); + co_await ensure_reserved(new_written, deadline); + for (auto& frag : data) { + co_await _output.write(frag.get(), frag.size()); + } + _written = new_written; +} + +ss::future<> staging_file::ensure_reserved( + size_t total_bytes, std::optional deadline) { + while (total_bytes > _reserved) { + auto objects = _reservation.has_value() ? 
size_t{0} : size_t{1}; + auto bytes_to_reserve = std::max( + total_bytes - _reserved, _opts.reservation_min_chunk_size); + auto guard = co_await _cache->reserve_space( + bytes_to_reserve, objects, deadline); + if (_reservation.has_value()) { + _reservation->merge(std::move(guard)); + } else { + _reservation.emplace(std::move(guard)); + } + _reserved += bytes_to_reserve; + } +} + +ss::future<> staging_file::close() { + if (!_closed) { + _closed = true; + co_await _output.close(); + } + _reservation.reset(); +} + +ss::future<> staging_file::commit() { + co_await _output.flush(); + _closed = true; + co_await _output.close(); + auto reservation = std::move(*_reservation); + _reservation.reset(); + co_await _cache->commit_staging_file( + _key, _staging_path, std::move(reservation)); +} + +} // namespace cloud_io diff --git a/src/v/cloud_io/staging_file.h b/src/v/cloud_io/staging_file.h new file mode 100644 index 0000000000000..d323215675da3 --- /dev/null +++ b/src/v/cloud_io/staging_file.h @@ -0,0 +1,105 @@ +/* + * Copyright 2026 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ + +#pragma once + +#include "cloud_io/basic_cache_service_api.h" + +#include +#include +#include +#include + +#include +#include +#include + +class iobuf; + +namespace cloud_io { + +class cache; + +struct staging_file_options { + // Minimum size at which reservations will be incrementally requested when + // appending. + size_t reservation_min_chunk_size = 1_MiB; +}; + +/// \brief Handle for writing a temporary file to the cache before it's +/// uploaded, and then making it available as a full-fledged, consumable cached +/// file (presumably after uploading). 
+/// +/// Callers must call either commit() or close() before destruction to +/// properly close the output stream. Orphaned .part files are cleaned +/// on cache restart. +class staging_file { +public: + staging_file(staging_file&&) noexcept = default; + staging_file& operator=(staging_file&&) noexcept = delete; + + /// Path to the staging file. Before commit() is called, callers may read + /// this file after writing (e.g. for upload to cloud storage). + const std::filesystem::path& path() const noexcept; + + /// Key in cloud storage. + const std::filesystem::path& key() const noexcept; + + /// Append data to the staging file. Reserves cache space as needed before + /// writing. If a deadline is provided, throws ss::timed_out_error if the + /// reservation wait exceeds it. + ss::future<> append( + iobuf data, + std::optional deadline = std::nullopt); + + /// Flush buffered data to disk without closing the stream. + ss::future<> flush(); + + /// Number of bytes written so far. + size_t written() const noexcept; + + /// Close the output stream without committing. Releases all + /// reservations. Call this instead of commit() on error paths. + ss::future<> close(); + + /// Flush and close the output stream, and atomically commit the + /// file into the cache. 
+ ss::future<> commit(); + +private: + friend class cache; + + using space_reservation_guard + = basic_space_reservation_guard; + + staging_file( + cache& cache, + ss::gate::holder gate_holder, + std::filesystem::path key, + std::filesystem::path staging_path, + ss::output_stream output, + staging_file_options opts) noexcept; + + ss::future<> ensure_reserved( + size_t total_bytes, std::optional deadline); + + cache* _cache; + ss::gate::holder _gate_holder; + std::filesystem::path _key; + std::filesystem::path _staging_path; + ss::output_stream _output; + bool _closed{false}; + staging_file_options _opts; + uint64_t _written{0}; + uint64_t _reserved{0}; + std::optional _reservation; +}; + +} // namespace cloud_io diff --git a/src/v/cloud_io/tests/cache_test.cc b/src/v/cloud_io/tests/cache_test.cc index 04c53b1c090f4..3600000dda1f0 100644 --- a/src/v/cloud_io/tests/cache_test.cc +++ b/src/v/cloud_io/tests/cache_test.cc @@ -824,3 +824,110 @@ FIXTURE_TEST(test_tracker_sync_add_remove, cache_test_fixture) { BOOST_REQUIRE_EQUAL(cache.get_usage_bytes(), 1024); BOOST_REQUIRE_EQUAL(cache.get_usage_objects(), 1); } + +namespace { + +constexpr cloud_io::staging_file_options test_staging_opts{ + .reservation_min_chunk_size = 4_KiB}; + +iobuf make_iobuf(char fill, size_t size) { + iobuf buf; + auto data = ss::sstring(size, fill); + buf.append(data.data(), data.size()); + return buf; +} +} // namespace + +// Full staging write cycle: create staging_file, append data, commit, +// verify the result via get(). 
+FIXTURE_TEST(staging_write_commit_read_cycle, cache_test_fixture) { + auto& cache = sharded_cache.local(); + + auto staging = cache.create_staging_file(KEY, test_staging_opts).get(); + + BOOST_CHECK(staging.path().native().ends_with(".part")); + BOOST_CHECK(staging.path().native().starts_with(CACHE_DIR.native())); + + staging.append(make_iobuf('x', 4_KiB)).get(); + + BOOST_CHECK(!cache.get(KEY).get().has_value()); + + staging.commit().get(); + + auto item = cache.get(KEY).get(); + BOOST_REQUIRE(item.has_value()); + BOOST_CHECK_EQUAL(item->size, 4_KiB); + item->body.close().get(); + + BOOST_CHECK(ss::file_exists(cache.get_local_path(KEY).native()).get()); +} + +// Verify that .part staging files are not evicted by trims. +FIXTURE_TEST(staging_files_survive_trim, cache_test_fixture) { + auto& cache = sharded_cache.local(); + + // Commit a regular file so the cache is non-empty. + auto data_string = create_data_string('z', 1_KiB); + put_into_cache(data_string, KEY); + + // Create a staging file for a different key. + const std::filesystem::path staging_key{ + "abc001/test_topic/staging_target.txt"}; + auto staging + = cache.create_staging_file(staging_key, test_staging_opts).get(); + staging.append(make_iobuf('y', 1_KiB)).get(); + staging.flush().get(); + BOOST_REQUIRE(ss::file_exists(staging.path().native()).get()); + + // Trim the committed file. The limits must be generous enough to + // accommodate the outstanding reservation (which may overshoot due + // to chunking) so trim_fast handles everything. Even if the trim fell + // through to trim_exhaustive, it now skips .part files as well. + trim_cache(8_KiB, 4); + + BOOST_CHECK(ss::file_exists(staging.path().native()).get()); + + staging.close().get(); + ss::remove_file(staging.path().native()).get(); +} + +// Committing two staging files for the same key doesn't crash or corrupt. +// The key remains readable and accounting reflects exactly one object. 
+FIXTURE_TEST(double_commit_same_key_is_safe, cache_test_fixture) { + auto& cache = sharded_cache.local(); + + auto staging1 = cache.create_staging_file(KEY, test_staging_opts).get(); + staging1.append(make_iobuf('a', 4_KiB)).get(); + + auto staging2 = cache.create_staging_file(KEY, test_staging_opts).get(); + staging2.append(make_iobuf('b', 4_KiB)).get(); + + staging1.commit().get(); + staging2.commit().get(); + + BOOST_CHECK_EQUAL(cache.get_usage_objects(), 1); + BOOST_CHECK_EQUAL(cache.get_usage_bytes(), 4_KiB); + + auto item = cache.get(KEY).get(); + BOOST_REQUIRE(item.has_value()); + BOOST_CHECK_EQUAL(item->size, 4_KiB); + item->body.close().get(); +} + +// Destroying a staging_file without calling commit() releases the reservation. +FIXTURE_TEST(staging_file_abandoned_releases_reservation, cache_test_fixture) { + auto& cache = sharded_cache.local(); + + auto pre_bytes = cache.get_usage_bytes(); + auto pre_objects = cache.get_usage_objects(); + + { + auto staging = cache.create_staging_file(KEY, test_staging_opts).get(); + staging.append(make_iobuf('z', 4_KiB)).get(); + staging.close().get(); + } + + BOOST_CHECK_EQUAL(cache.get_usage_bytes(), pre_bytes); + BOOST_CHECK_EQUAL(cache.get_usage_objects(), pre_objects); + BOOST_CHECK(!cache.get(KEY).get().has_value()); +} diff --git a/src/v/cloud_topics/app.cc b/src/v/cloud_topics/app.cc index 77ce05e907815..e5b7b9ebe777f 100644 --- a/src/v/cloud_topics/app.cc +++ b/src/v/cloud_topics/app.cc @@ -84,7 +84,7 @@ ss::future<> app::construct( domain_supervisor, controller, ss::sharded_parameter([this] { return &l1_io.local(); }), - config::node().l1_staging_path(), + ss::sharded_parameter([&cloud_cache] { return &cloud_cache->local(); }), ss::sharded_parameter([&remote] { return &remote->local(); }), bucket, scheduling_groups::instance().cloud_topics_metastore_sg()); @@ -342,25 +342,23 @@ ss::future<> app::cleanup_tmp_files() { co_return; } if (entry.type == ss::directory_entry_type::regular) { - if 
(std::string_view(entry.name).contains(".tmp")) { - auto entry_path_str = entry_path.string(); - auto rm_fut = co_await ss::coroutine::as_future( - ss::remove_file(entry_path_str)); - if (rm_fut.failed()) { - auto ex = rm_fut.get_exception(); - auto lvl = ssx::is_shutdown_exception(ex) - ? ss::log_level::debug - : ss::log_level::warn; - vlogl( - cd_log, - lvl, - "Failed to delete tmp file {}: {}", - entry_path_str, - ex); - co_return; - } - deleted_count++; + auto entry_path_str = entry_path.string(); + auto rm_fut = co_await ss::coroutine::as_future( + ss::remove_file(entry_path_str)); + if (rm_fut.failed()) { + auto ex = rm_fut.get_exception(); + auto lvl = ssx::is_shutdown_exception(ex) + ? ss::log_level::debug + : ss::log_level::warn; + vlogl( + cd_log, + lvl, + "Failed to delete staging file {}: {}", + entry_path_str, + ex); + co_return; } + deleted_count++; } })); @@ -376,11 +374,12 @@ ss::future<> app::cleanup_tmp_files() { if (deleted_count > 0) { vlog( cd_log.info, - "Cleanup deleted {} tmp file(s) from {}", + "Cleanup deleted {} staging file(s) from {}", deleted_count, staging_dir); } else { - vlog(cd_log.debug, "No tmp files found to cleanup in {}", staging_dir); + vlog( + cd_log.debug, "No staging files found to cleanup in {}", staging_dir); } } diff --git a/src/v/cloud_topics/level_one/domain/BUILD b/src/v/cloud_topics/level_one/domain/BUILD index 695b80753b1f2..140be876d7b4d 100644 --- a/src/v/cloud_topics/level_one/domain/BUILD +++ b/src/v/cloud_topics/level_one/domain/BUILD @@ -27,6 +27,7 @@ redpanda_cc_library( ":db_domain_manager", ":domain_manager_probe", ":simple_domain_manager", + "//src/v/cloud_io:cache", "//src/v/cloud_topics:logger", "//src/v/cloud_topics/level_one/common:abstract_io", "//src/v/cloud_topics/level_one/metastore/lsm:stm", @@ -118,6 +119,7 @@ redpanda_cc_library( ":domain_manager", ":domain_manager_probe", "//src/v/base", + "//src/v/cloud_io:cache", "//src/v/cloud_topics/level_one/common:abstract_io", 
"//src/v/cloud_topics/level_one/common:object_id", "//src/v/cloud_topics/level_one/metastore:rpc_types", diff --git a/src/v/cloud_topics/level_one/domain/db_domain_manager.cc b/src/v/cloud_topics/level_one/domain/db_domain_manager.cc index c85967ed39683..79486d7721f8a 100644 --- a/src/v/cloud_topics/level_one/domain/db_domain_manager.cc +++ b/src/v/cloud_topics/level_one/domain/db_domain_manager.cc @@ -242,14 +242,14 @@ db_domain_manager::entity_locks::acquire_objects( db_domain_manager::db_domain_manager( model::term_id expected_term, ss::shared_ptr stm, - std::filesystem::path staging_dir, + cloud_io::cache* cache, cloud_io::remote* remote, cloud_storage_clients::bucket_name bucket, io* object_io, ss::scheduling_group sg, domain_manager_probe* probe) : expected_term_(expected_term) - , staging_dir_(std::move(staging_dir)) + , cache_(cache) , remote_(remote) , bucket_(std::move(bucket)) , object_io_(object_io) @@ -1620,7 +1620,7 @@ ss::future> db_domain_manager::maybe_open_db() { vlog( cd_log.debug, "Opening database with expected term {}", expected_term_); auto db_res = co_await replicated_database::open( - expected_term_, stm_.get(), staging_dir_, remote_, bucket_, as_, sg_); + expected_term_, stm_.get(), cache_, remote_, bucket_, as_, sg_); if (!db_res.has_value()) { co_return std::unexpected( log_and_convert(db_res.error(), "Failed to open database: ")); @@ -1804,7 +1804,7 @@ db_domain_manager::restore_domain(rpc::restore_domain_request req) { "Re-opening database with expected term {}", expected_term_); auto db_res = co_await replicated_database::open( - expected_term_, stm_.get(), staging_dir_, remote_, bucket_, as_, sg_); + expected_term_, stm_.get(), cache_, remote_, bucket_, as_, sg_); if (!db_res.has_value()) { co_return rpc::restore_domain_reply{ .ec = log_and_convert(db_res.error(), "Failed to reopen database: "), diff --git a/src/v/cloud_topics/level_one/domain/db_domain_manager.h b/src/v/cloud_topics/level_one/domain/db_domain_manager.h index 
f6a2bfed4e079..fb3abf1fc299a 100644 --- a/src/v/cloud_topics/level_one/domain/db_domain_manager.h +++ b/src/v/cloud_topics/level_one/domain/db_domain_manager.h @@ -10,6 +10,7 @@ #pragma once #include "absl/container/btree_set.h" +#include "cloud_io/cache_service.h" #include "cloud_topics/level_one/common/object_id.h" #include "cloud_topics/level_one/domain/domain_manager.h" #include "cloud_topics/level_one/domain/domain_manager_probe.h" @@ -36,7 +37,7 @@ class db_domain_manager final : public domain_manager { explicit db_domain_manager( model::term_id expected_term, ss::shared_ptr stm, - std::filesystem::path staging_dir, + cloud_io::cache* cache, cloud_io::remote* remote, cloud_storage_clients::bucket_name bucket, io* object_io, @@ -227,7 +228,7 @@ class db_domain_manager final : public domain_manager { ss::gate gate_; ss::abort_source as_; model::term_id expected_term_; - std::filesystem::path staging_dir_; + cloud_io::cache* cache_; cloud_io::remote* remote_; cloud_storage_clients::bucket_name bucket_; io* object_io_; diff --git a/src/v/cloud_topics/level_one/domain/domain_supervisor.cc b/src/v/cloud_topics/level_one/domain/domain_supervisor.cc index 2998877b07503..150e8d5548372 100644 --- a/src/v/cloud_topics/level_one/domain/domain_supervisor.cc +++ b/src/v/cloud_topics/level_one/domain/domain_supervisor.cc @@ -10,6 +10,7 @@ #include "cloud_topics/level_one/domain/domain_supervisor.h" +#include "cloud_io/cache_service.h" #include "cloud_topics/level_one/common/abstract_io.h" #include "cloud_topics/level_one/domain/db_domain_manager.h" #include "cloud_topics/level_one/domain/domain_manager_probe.h" @@ -35,13 +36,13 @@ class domain_supervisor::impl { explicit impl( cluster::controller* controller, io* io, - std::filesystem::path staging_dir, + cloud_io::cache* cache, cloud_io::remote* remote, cloud_storage_clients::bucket_name bucket, ss::scheduling_group sg) : _controller(controller) , _object_io(io) - , _staging_dir(std::move(staging_dir)) + , _cache(cache) , 
_remote(remote) , _bucket(std::move(bucket)) , _sg(sg) @@ -330,7 +331,7 @@ class domain_supervisor::impl { domain_mgr = ss::make_shared( *expected_term, stm_manager->get(), - _staging_dir, + _cache, _remote, _bucket, _object_io, @@ -346,7 +347,7 @@ class domain_supervisor::impl { cluster::controller* _controller; io* _object_io; - std::filesystem::path _staging_dir; + cloud_io::cache* _cache; cloud_io::remote* _remote; cloud_storage_clients::bucket_name _bucket; ss::scheduling_group _sg; @@ -370,18 +371,13 @@ class domain_supervisor::impl { domain_supervisor::domain_supervisor( cluster::controller* controller, io* io, - std::filesystem::path staging_dir, + cloud_io::cache* cache, cloud_io::remote* remote, cloud_storage_clients::bucket_name bucket, ss::scheduling_group sg) : _impl( std::make_unique( - controller, - io, - std::move(staging_dir), - remote, - std::move(bucket), - sg)) {} + controller, io, cache, remote, std::move(bucket), sg)) {} domain_supervisor::~domain_supervisor() = default; diff --git a/src/v/cloud_topics/level_one/domain/domain_supervisor.h b/src/v/cloud_topics/level_one/domain/domain_supervisor.h index 89922a91009ee..e7cba4d9dabca 100644 --- a/src/v/cloud_topics/level_one/domain/domain_supervisor.h +++ b/src/v/cloud_topics/level_one/domain/domain_supervisor.h @@ -25,6 +25,7 @@ class partition; } // namespace cluster namespace cloud_io { +class cache; class remote; } // namespace cloud_io @@ -41,7 +42,7 @@ class domain_supervisor { explicit domain_supervisor( cluster::controller*, io*, - std::filesystem::path staging_dir, + cloud_io::cache*, cloud_io::remote*, cloud_storage_clients::bucket_name bucket, ss::scheduling_group sg); diff --git a/src/v/cloud_topics/level_one/domain/tests/BUILD b/src/v/cloud_topics/level_one/domain/tests/BUILD index 8d717c6995824..b5b2d97fffa59 100644 --- a/src/v/cloud_topics/level_one/domain/tests/BUILD +++ b/src/v/cloud_topics/level_one/domain/tests/BUILD @@ -24,6 +24,7 @@ redpanda_cc_gtest( cpu = 1, memory = "2GiB", 
deps = [ + "//src/v/cloud_io:cache", "//src/v/cloud_io:remote", "//src/v/cloud_io/tests:s3_imposter", "//src/v/cloud_io/tests:scoped_remote", @@ -42,10 +43,12 @@ redpanda_cc_gtest( "//src/v/cloud_topics/level_one/metastore/lsm:write_batch_row", "//src/v/config", "//src/v/container:chunked_vector", + "//src/v/lsm/io:cloud_cache_persistence", "//src/v/lsm/io:cloud_persistence", "//src/v/lsm/io:persistence", "//src/v/model", "//src/v/raft/tests:raft_fixture", + "//src/v/storage:disk", "//src/v/test_utils:async", "//src/v/test_utils:gtest", "//src/v/test_utils:scoped_config", diff --git a/src/v/cloud_topics/level_one/domain/tests/db_domain_manager_test.cc b/src/v/cloud_topics/level_one/domain/tests/db_domain_manager_test.cc index 18fcf48a64d22..1120211ec82af 100644 --- a/src/v/cloud_topics/level_one/domain/tests/db_domain_manager_test.cc +++ b/src/v/cloud_topics/level_one/domain/tests/db_domain_manager_test.cc @@ -8,6 +8,7 @@ * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md */ +#include "cloud_io/cache_service.h" #include "cloud_io/remote.h" #include "cloud_io/tests/s3_imposter.h" #include "cloud_io/tests/scoped_remote.h" @@ -23,10 +24,12 @@ #include "cloud_topics/level_one/metastore/rpc_types.h" #include "cloud_topics/level_one/metastore/state_update.h" #include "config/node_config.h" +#include "lsm/io/cloud_cache_persistence.h" #include "lsm/io/cloud_persistence.h" #include "lsm/io/persistence.h" #include "model/fundamental.h" #include "raft/tests/raft_fixture.h" +#include "storage/disk.h" #include "test_utils/async.h" #include "test_utils/scoped_config.h" #include "test_utils/tmp_dir.h" @@ -57,10 +60,12 @@ struct domain_manager_node { ss::shared_ptr s, cloud_io::remote* remote, const cloud_storage_clients::bucket_name& bucket, + cloud_io::cache* cache, const ss::sstring& staging_path) : stm_ptr(std::move(s)) , remote(remote) , bucket(bucket) + , cache(cache) , staging_directory(staging_path.data()) , object_io( staging_directory.get_path(), @@ 
-74,7 +79,7 @@ struct domain_manager_node { auto mgr = std::make_unique( stm_ptr->raft()->confirmed_term(), stm_ptr, - staging_directory.get_path(), + cache, remote, bucket, &object_io, @@ -118,6 +123,7 @@ struct domain_manager_node { ss::shared_ptr stm_ptr; cloud_io::remote* remote; const cloud_storage_clients::bucket_name& bucket; + cloud_io::cache* cache; temporary_dir staging_directory; file_io object_io; domain_manager_probe probe; @@ -314,6 +320,33 @@ class DbDomainManagerTest set_expectations_and_listen({}); sr = cloud_io::scoped_remote::create(10, conf); + // Set up cloud cache. + cache_tmpdir = std::make_unique("db_dm_cache"); + auto cache_dir = cache_tmpdir->get_path() / "cache"; + cloud_io::cache::initialize(cache_dir).get(); + test_cache + .start( + cache_dir, + 30_GiB, + config::mock_binding(0.0), + config::mock_binding(100_MiB), + config::mock_binding>(std::nullopt), + config::mock_binding(100000), + config::mock_binding(3)) + .get(); + test_cache.invoke_on_all([](cloud_io::cache& c) { return c.start(); }) + .get(); + test_cache + .invoke_on( + ss::shard_id{0}, + [](cloud_io::cache& c) { + c.notify_disk_status( + 100ULL * 1024 * 1024 * 1024, + 50ULL * 1024 * 1024 * 1024, + storage::disk_space_alert::ok); + }) + .get(); + raft::raft_fixture::SetUpAsync().get(); // Create our STMs. @@ -331,10 +364,13 @@ class DbDomainManagerTest node->start(std::move(builder)).get(); - // Create staging directory for this node. auto staging_path = fmt::format("db_domain_manager_test_{}", id()); dm_nodes.at(id()) = std::make_unique( - std::move(s), &sr->remote.local(), bucket_name, staging_path); + std::move(s), + &sr->remote.local(), + bucket_name, + &test_cache.local(), + staging_path); } opt_ref leader; ASSERT_NO_FATAL_FAILURE(wait_for_leader(leader).get()); @@ -355,6 +391,8 @@ class DbDomainManagerTest } raft::raft_fixture::TearDownAsync().get(); sr.reset(); + test_cache.stop().get(); + cache_tmpdir.reset(); } // Returns the node of the current leader. 
@@ -628,6 +666,8 @@ class DbDomainManagerTest std::array, num_nodes> dm_nodes; scoped_config cfg; std::unique_ptr sr; + std::unique_ptr cache_tmpdir; + ss::sharded test_cache; // Initial leader and manager on that leader. domain_manager_node* initial_leader{nullptr}; diff --git a/src/v/cloud_topics/level_one/metastore/lsm/BUILD b/src/v/cloud_topics/level_one/metastore/lsm/BUILD index 78ce7cdc46404..1fbc793e00b20 100644 --- a/src/v/cloud_topics/level_one/metastore/lsm/BUILD +++ b/src/v/cloud_topics/level_one/metastore/lsm/BUILD @@ -223,6 +223,7 @@ redpanda_cc_library( "//src/v/cloud_topics:logger", "//src/v/cloud_topics/level_one/metastore:domain_uuid", "//src/v/config", + "//src/v/lsm/io:cloud_cache_persistence", "//src/v/lsm/io:cloud_persistence", "//src/v/model:batch_builder", "//src/v/serde", @@ -234,6 +235,7 @@ redpanda_cc_library( ":lsm_update", ":stm", ":write_batch_row", + "//src/v/cloud_io:cache", "//src/v/cloud_io:remote", "//src/v/cloud_storage_clients", "//src/v/container:chunked_vector", diff --git a/src/v/cloud_topics/level_one/metastore/lsm/replicated_db.cc b/src/v/cloud_topics/level_one/metastore/lsm/replicated_db.cc index a3f06ee56dd4b..2d8e520b0d366 100644 --- a/src/v/cloud_topics/level_one/metastore/lsm/replicated_db.cc +++ b/src/v/cloud_topics/level_one/metastore/lsm/replicated_db.cc @@ -16,6 +16,7 @@ #include "cloud_topics/level_one/metastore/lsm/stm.h" #include "cloud_topics/logger.h" #include "config/configuration.h" +#include "lsm/io/cloud_cache_persistence.h" #include "lsm/io/cloud_persistence.h" #include "lsm/io/persistence.h" #include "lsm/proto/manifest.proto.h" @@ -58,7 +59,7 @@ ss::future s, cloud_io::remote* remote, const cloud_storage_clients::bucket_name& bucket, - const ss::sstring& staging_path) + cloud_io::cache* cache) : stm_ptr(std::move(s)) , remote(remote) , bucket(bucket) - , staging_directory(staging_path.data()) {} + , cache(cache) {} ss::future> open_db() { auto ret = co_await replicated_database::open( 
stm_ptr->raft()->confirmed_term(), stm_ptr.get(), - staging_directory.get_path(), + cache, remote, bucket, as, @@ -107,7 +110,7 @@ struct replicated_db_node { ss::shared_ptr stm_ptr; cloud_io::remote* remote; const cloud_storage_clients::bucket_name& bucket; - temporary_dir staging_directory; + cloud_io::cache* cache; ss::abort_source as; std::list> dbs; }; @@ -131,6 +134,33 @@ class ReplicatedDatabaseTest set_expectations_and_listen({}); sr = cloud_io::scoped_remote::create(10, conf); + // Set up cloud cache. + cache_tmpdir = std::make_unique("replicated_db_cache"); + auto cache_dir = cache_tmpdir->get_path() / "cache"; + cloud_io::cache::initialize(cache_dir).get(); + test_cache + .start( + cache_dir, + 30_GiB, + config::mock_binding(0.0), + config::mock_binding(100_MiB), + config::mock_binding>(std::nullopt), + config::mock_binding(100000), + config::mock_binding(3)) + .get(); + test_cache.invoke_on_all([](cloud_io::cache& c) { return c.start(); }) + .get(); + test_cache + .invoke_on( + ss::shard_id{0}, + [](cloud_io::cache& c) { + c.notify_disk_status( + 100ULL * 1024 * 1024 * 1024, + 50ULL * 1024 * 1024 * 1024, + storage::disk_space_alert::ok); + }) + .get(); + raft::raft_fixture::SetUpAsync().get(); // Create our STMs. @@ -148,10 +178,11 @@ class ReplicatedDatabaseTest node->start(std::move(builder)).get(); - // Create staging directory for this node. - auto staging_path = fmt::format("replicated_db_test_{}", id()); db_nodes.at(id()) = std::make_unique( - std::move(s), &sr->remote.local(), bucket_name, staging_path); + std::move(s), + &sr->remote.local(), + bucket_name, + &test_cache.local()); } opt_ref leader; ASSERT_NO_FATAL_FAILURE(wait_for_leader(leader).get()); @@ -169,7 +200,9 @@ class ReplicatedDatabaseTest } } raft::raft_fixture::TearDownAsync().get(); + test_cache.stop().get(); sr.reset(); + cache_tmpdir.reset(); } // Returns the node on the current leader. 
@@ -214,17 +247,15 @@ class ReplicatedDatabaseTest chunked_vector rows) { auto domain_prefix = cloud_storage_clients::object_key{ domain_cloud_prefix(domain_uuid)}; - temporary_dir tmp("lsm_staging_scratch"); auto cloud_db = lsm::database::open( {.database_epoch = db_epoch}, lsm::io::persistence{ - .data = lsm::io::open_cloud_data_persistence( - tmp.get_path(), + .data = lsm::io::open_cloud_cache_data_persistence( + &test_cache.local(), &sr->remote.local(), bucket_name, - domain_prefix, - ss::sstring(domain_uuid())) + domain_prefix) .get(), .metadata = lsm::io::open_cloud_metadata_persistence( &sr->remote.local(), bucket_name, domain_prefix) @@ -259,6 +290,8 @@ class ReplicatedDatabaseTest std::array, num_nodes> db_nodes; scoped_config cfg; std::unique_ptr sr; + std::unique_ptr cache_tmpdir; + ss::sharded test_cache; // Initial leader and a database opened on that leader. replicated_db_node* initial_leader; diff --git a/src/v/cloud_topics/read_replica/BUILD b/src/v/cloud_topics/read_replica/BUILD index e1477ec2a644b..8990a82fadacb 100644 --- a/src/v/cloud_topics/read_replica/BUILD +++ b/src/v/cloud_topics/read_replica/BUILD @@ -88,6 +88,7 @@ redpanda_cc_library( "//src/v/cloud_topics/level_one/common:file_io", "//src/v/cloud_topics/level_one/metastore:manifest_io", "//src/v/config", + "//src/v/lsm/io:cloud_cache_persistence", "//src/v/lsm/io:cloud_persistence", "//src/v/ssx:future_util", "//src/v/ssx:sleep_abortable", diff --git a/src/v/cloud_topics/read_replica/snapshot_manager.cc b/src/v/cloud_topics/read_replica/snapshot_manager.cc index 34f697b867240..af13e2b264a47 100644 --- a/src/v/cloud_topics/read_replica/snapshot_manager.cc +++ b/src/v/cloud_topics/read_replica/snapshot_manager.cc @@ -16,6 +16,7 @@ #include "cloud_topics/logger.h" #include "cloud_topics/read_replica/snapshot_metastore.h" #include "config/configuration.h" +#include "lsm/io/cloud_cache_persistence.h" #include "lsm/io/cloud_persistence.h" #include "ssx/future-util.h" #include 
"ssx/sleep_abortable.h" @@ -69,10 +70,9 @@ class database_refresher { ss::gate gate_; ss::abort_source as_; - // TODO: integrate with the cloud cache? - const std::filesystem::path staging_directory_; const cloud_storage_clients::bucket_name bucket_; cloud_io::remote* remote_; + cloud_io::cache* cache_; l1::domain_uuid domain_uuid_; prefix_logger logger_; @@ -98,15 +98,15 @@ database_refresher::database_refresher( cloud_io::remote* remote, cloud_io::cache* cache, l1::domain_uuid domain_uuid) - : staging_directory_(std::move(staging_directory)) - , bucket_(std::move(bucket)) + : bucket_(std::move(bucket)) , remote_(remote) + , cache_(cache) , domain_uuid_(domain_uuid) , logger_( cd_log, fmt::format("database_refresher {}, {}", domain_uuid_, bucket_)) , io_( std::make_unique( - staging_directory_, remote_, bucket_, cache)) {} + std::move(staging_directory), remote_, bucket_, cache_)) {} void database_refresher::start() { ssx::spawn_with_gate(gate_, [this] { return run_loop(); }); @@ -135,12 +135,8 @@ ss::future<> database_refresher::open_or_refresh() { } cloud_storage_clients::object_key domain_prefix{ domain_cloud_prefix(domain_uuid_)}; - auto data_persist = co_await lsm::io::open_cloud_data_persistence( - staging_directory_, - remote_, - bucket_, - domain_prefix, - ss::sstring(domain_uuid_())); + auto data_persist = co_await lsm::io::open_cloud_cache_data_persistence( + cache_, remote_, bucket_, domain_prefix); auto meta_persist = co_await lsm::io::open_cloud_metadata_persistence( remote_, bucket_, domain_prefix); lsm::io::persistence io{ diff --git a/src/v/cloud_topics/read_replica/tests/BUILD b/src/v/cloud_topics/read_replica/tests/BUILD index 245e0c359b8d0..2440acaf042b6 100644 --- a/src/v/cloud_topics/read_replica/tests/BUILD +++ b/src/v/cloud_topics/read_replica/tests/BUILD @@ -57,6 +57,7 @@ redpanda_cc_gtest( cpu = 1, deps = [ ":db_utils", + "//src/v/cloud_io:cache", "//src/v/cloud_io:remote", "//src/v/cloud_io/tests:s3_imposter", 
"//src/v/cloud_io/tests:scoped_remote", @@ -71,6 +72,7 @@ redpanda_cc_gtest( "//src/v/cloud_topics/read_replica:stm", "//src/v/lsm", "//src/v/raft/tests:raft_fixture", + "//src/v/storage:disk", "//src/v/test_utils:async", "//src/v/test_utils:gtest", "//src/v/test_utils:scoped_config", diff --git a/src/v/cloud_topics/read_replica/tests/state_refresh_loop_test.cc b/src/v/cloud_topics/read_replica/tests/state_refresh_loop_test.cc index 70ceddaf05bda..a4e3711547162 100644 --- a/src/v/cloud_topics/read_replica/tests/state_refresh_loop_test.cc +++ b/src/v/cloud_topics/read_replica/tests/state_refresh_loop_test.cc @@ -8,6 +8,7 @@ * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md */ +#include "cloud_io/cache_service.h" #include "cloud_io/remote.h" #include "cloud_io/tests/s3_imposter.h" #include "cloud_io/tests/scoped_remote.h" @@ -21,6 +22,7 @@ #include "cloud_topics/read_replica/tests/db_utils.h" #include "lsm/lsm.h" #include "raft/tests/raft_fixture.h" +#include "storage/disk.h" #include "test_utils/async.h" #include "test_utils/scoped_config.h" #include "test_utils/test.h" @@ -76,6 +78,7 @@ class StateRefreshLoopTest refresh_node( ss::shared_ptr stm, cloud_io::remote* r, + cloud_io::cache* c, const cloud_storage_clients::bucket_name& b, std::filesystem::path staging) : stm_ptr(std::move(stm)) @@ -83,7 +86,7 @@ class StateRefreshLoopTest , bucket(b) , staging_path(std::move(staging)) { snapshot_mgr = std::make_unique( - staging_path, remote, nullptr); + staging_path, remote, c); } void start_refresh_loop( @@ -125,6 +128,31 @@ class StateRefreshLoopTest sr_ = cloud_io::scoped_remote::create(10, conf); + auto cache_dir = cache_tmpdir_.get_path() / "cache"; + cloud_io::cache::initialize(cache_dir).get(); + test_cache_ + .start( + cache_dir, + 30_GiB, + config::mock_binding(0.0), + config::mock_binding(100_MiB), + config::mock_binding>(std::nullopt), + config::mock_binding(100000), + config::mock_binding(3)) + .get(); + 
test_cache_.invoke_on_all([](cloud_io::cache& c) { return c.start(); }) + .get(); + test_cache_ + .invoke_on( + ss::shard_id{0}, + [](cloud_io::cache& c) { + c.notify_disk_status( + 100ULL * 1024 * 1024 * 1024, + 50ULL * 1024 * 1024 * 1024, + storage::disk_space_alert::ok); + }) + .get(); + test_tidp_ = model::topic_id_partition{ model::topic_id{uuid_t::create()}, model::partition_id{0}}; test_remote_label_ = cloud_storage::remote_label{ @@ -154,7 +182,11 @@ class StateRefreshLoopTest / fmt::format("node_{}", node_idx++); std::filesystem::create_directories(node_staging_path); refresh_nodes_[id] = std::make_unique( - stm_ptr, &sr_->remote.local(), bucket_name, node_staging_path); + stm_ptr, + &sr_->remote.local(), + &test_cache_.local(), + bucket_name, + node_staging_path); } } @@ -176,6 +208,7 @@ class StateRefreshLoopTest writer_dbs_.clear(); raft::stm_raft_fixture::TearDownAsync().get(); refresh_nodes_.clear(); + test_cache_.stop().get(); sr_.reset(); } @@ -245,6 +278,8 @@ class StateRefreshLoopTest protected: temporary_dir reader_staging_base_dir_; + temporary_dir cache_tmpdir_{"state_refresh_loop_cache"}; + ss::sharded test_cache_; absl::node_hash_map> refresh_nodes_; scoped_config cfg_; diff --git a/src/v/lsm/db/tests/impl_test.cc b/src/v/lsm/db/tests/impl_test.cc index f8a8e702a30e5..95a940a3ceed2 100644 --- a/src/v/lsm/db/tests/impl_test.cc +++ b/src/v/lsm/db/tests/impl_test.cc @@ -690,7 +690,9 @@ TEST_F(ImplTest, RefreshFailsForLowerSeqno) { // Nefarious case: open another database so it gets a low seqno and we can // flush a seqno lower than the refreshed database below. - auto* another_db = open(make_options()); + auto other_opts = make_options(); + other_opts->database_epoch = lsm::internal::database_epoch{1}; + auto* another_db = open(other_opts); // Flush a couple more times to bump the seqno. 
write_at_least(512_KiB); diff --git a/src/v/lsm/io/BUILD b/src/v/lsm/io/BUILD index 5dc1d58f2f02d..b9090402318c5 100644 --- a/src/v/lsm/io/BUILD +++ b/src/v/lsm/io/BUILD @@ -47,11 +47,34 @@ redpanda_cc_library( ], ) +redpanda_cc_library( + name = "cloud_data_persistence_base", + srcs = ["cloud_data_persistence_base.cc"], + hdrs = ["cloud_data_persistence_base.h"], + implementation_deps = [ + "//src/v/config", + "//src/v/lsm/core:exceptions", + "//src/v/lsm/core/internal:files", + "//src/v/ssx:future_util", + "//src/v/utils:retry_chain_node", + ], + deps = [ + ":persistence", + "//src/v/cloud_io:io_result", + "//src/v/cloud_io:remote", + "//src/v/cloud_storage_clients", + "//src/v/model", + "//src/v/utils:stream_provider", + "@seastar", + ], +) + redpanda_cc_library( name = "cloud_persistence", srcs = ["cloud_persistence.cc"], hdrs = ["cloud_persistence.h"], implementation_deps = [ + ":cloud_data_persistence_base", ":file_io", "//src/v/base", "//src/v/bytes:ioarray", @@ -73,6 +96,27 @@ redpanda_cc_library( ], ) +redpanda_cc_library( + name = "cloud_cache_persistence", + srcs = ["cloud_cache_persistence.cc"], + hdrs = ["cloud_cache_persistence.h"], + implementation_deps = [ + ":cloud_data_persistence_base", + ":file_io", + "//src/v/cloud_io:io_result", + "//src/v/lsm/core:exceptions", + "//src/v/lsm/core/internal:files", + "//src/v/ssx:future_util", + ], + deps = [ + ":persistence", + "//src/v/cloud_io:cache", + "//src/v/cloud_io:remote", + "//src/v/cloud_storage_clients", + "@seastar", + ], +) + redpanda_cc_library( name = "readahead_file_reader", srcs = ["readahead_file_reader.cc"], diff --git a/src/v/lsm/io/cloud_cache_persistence.cc b/src/v/lsm/io/cloud_cache_persistence.cc new file mode 100644 index 0000000000000..26100e2de93d4 --- /dev/null +++ b/src/v/lsm/io/cloud_cache_persistence.cc @@ -0,0 +1,250 @@ +/* + * Copyright 2026 Redpanda Data, Inc. 
+ * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ + +#include "lsm/io/cloud_cache_persistence.h" + +#include "cloud_io/io_result.h" +#include "lsm/core/exceptions.h" +#include "lsm/core/internal/files.h" +#include "lsm/io/cloud_data_persistence_base.h" +#include "lsm/io/file_io.h" +#include "ssx/future-util.h" + +#include +#include + +#include + +namespace lsm::io { + +namespace { + +static constexpr auto reservation_timeout = std::chrono::seconds(30); + +class cache_staged_file_writer : public sequential_file_writer { +public: + cache_staged_file_writer( + cloud_io::staging_file staging, + cloud_io::remote* remote, + ss::abort_source* as, + cloud_storage_clients::bucket_name bucket, + cloud_storage_clients::object_key cloud_key) + : _staging(std::move(staging)) + , _remote(remote) + , _as(as) + , _bucket(std::move(bucket)) + , _cloud_key(std::move(cloud_key)) {} + + ss::future<> append(iobuf b) override { + auto deadline = ss::lowres_clock::now() + reservation_timeout; + auto fut = co_await ss::coroutine::as_future( + _staging.append(std::move(b), deadline)); + if (fut.failed()) { + _failed = true; + std::rethrow_exception(fut.get_exception()); + } + } + + ss::future<> close() override { + if (_failed) { + co_await cleanup_staging_file(); + co_return; + } + + auto flush_fut = co_await ss::coroutine::as_future(_staging.flush()); + if (flush_fut.failed()) { + auto ex = flush_fut.get_exception(); + co_await cleanup_staging_file(); + throw_as_lsm_ex(ex, "failed to flush staging file"); + } + + auto upload_fut = co_await ss::coroutine::as_future(upload_file( + *_remote, + *_as, + _bucket, + _cloud_key, + _staging.path(), + _staging.written())); + if (upload_fut.failed()) { + auto ex = upload_fut.get_exception(); + co_await 
cleanup_staging_file();
+            throw_as_lsm_ex(
+              ex, fmt::format("failed to upload SST file {}", _cloud_key));
+        }
+
+        auto commit_fut = co_await ss::coroutine::as_future(_staging.commit());
+        if (commit_fut.failed()) {
+            auto ex = commit_fut.get_exception();
+            co_await cleanup_staging_file();
+            throw_as_lsm_ex(
+              ex, "failed to commit to cache (data safe in cloud)");
+        }
+    }
+
+    fmt::iterator format_to(fmt::iterator it) const override {
+        return fmt::format_to(
+          it,
+          "{{cache_staging={}, upload={}, written={}}}",
+          _staging.path(),
+          _cloud_key,
+          _staging.written());
+    }
+
+private:
+    /// Best-effort teardown of the staging file: closes it and then removes
+    /// it from disk. Never fails.
+    ///
+    /// Every caller in close() runs this right before reporting the error it
+    /// actually cares about, so a failure here must not replace that error,
+    /// and a failed close must not prevent the removal attempt.
+    ss::future<> cleanup_staging_file() {
+        auto close_fut = co_await ss::coroutine::as_future(_staging.close());
+        if (close_fut.failed()) {
+            // Still try to remove the file below.
+            close_fut.ignore_ready_future();
+        }
+        auto rm_fut = co_await ss::coroutine::as_future(
+          ss::remove_file(_staging.path().native()));
+        if (rm_fut.failed()) {
+            rm_fut.ignore_ready_future();
+        }
+    }
+
+    bool _failed = false;
+    cloud_io::staging_file _staging;
+    cloud_io::remote* _remote;
+    ss::abort_source* _as;
+    cloud_storage_clients::bucket_name _bucket;
+    cloud_storage_clients::object_key _cloud_key;
+};
+
+class cloud_cache_data_persistence : public cloud_data_persistence_base {
+public:
+    cloud_cache_data_persistence(
+      cloud_io::cache* cache,
+      cloud_io::remote* remote,
+      cloud_storage_clients::bucket_name bucket,
+      cloud_storage_clients::object_key prefix)
+      : cloud_data_persistence_base(
+          remote, std::move(bucket), std::move(prefix))
+      , _cache(cache) {}
+
+    ss::future>
+    open_random_access_reader(internal::file_handle h) override {
+        _as.check();
+        auto _ = _gate.hold();
+        auto filename = internal::sst_file_name(h);
+        auto key = cache_key(filename);
+
+        // Check the cache and download on miss, retrying if there was an
+ auto root = make_cloud_rtc(_as); + while (true) { + auto reader = co_await open_cached_reader(key); + if (reader) { + co_return reader; + } + + auto dl_fut = co_await ss::coroutine::as_future( + _remote->download_stream( + { + .bucket = _bucket, + .key = cloud_key(filename), + .parent_rtc = root, + }, + [this, + &key](uint64_t content_length, ss::input_stream stream) { + return save_to_cache( + content_length, std::move(stream), key); + }, + "SST file download", + /*acquire_hydration_units=*/true)); + if (dl_fut.failed()) { + throw_as_lsm_ex( + dl_fut.get_exception(), "error downloading file"); + } + if (!check_cloud_result(dl_fut.get())) { + co_return std::nullopt; + } + } + } + + ss::future> + open_sequential_writer(internal::file_handle h) override { + _as.check(); + auto _ = _gate.hold(); + auto filename = internal::sst_file_name(h); + auto key = cache_key(filename); + auto staging_fut = co_await ss::coroutine::as_future( + _cache->create_staging_file(key, cloud_io::staging_file_options{})); + if (staging_fut.failed()) { + throw_as_lsm_ex( + staging_fut.get_exception(), "error opening file writer"); + } + co_return std::make_unique( + std::move(staging_fut.get()), + _remote, + &_as, + _bucket, + cloud_key(filename)); + } + + ss::future<> remove_file_locally(std::string_view filename) override { + co_await _cache->invalidate(cache_key(filename)); + } + +private: + ss::future> + open_cached_reader(const std::filesystem::path& key) { + try { + auto item = co_await _cache->get(key); + if (!item.has_value()) { + co_return std::nullopt; + } + auto local_path = _cache->get_local_path(key); + std::unique_ptr ptr; + ptr = std::make_unique( + std::move(local_path), std::move(item->body)); + co_return ptr; + } catch (const std::system_error& e) { + if (e.code() == std::errc::no_such_file_or_directory) { + co_return std::nullopt; + } + throw io_error_exception( + e.code(), "io error opening cached reader: {}", e); + } catch (...) 
{
+            auto ex = std::current_exception();
+            if (ssx::is_shutdown_exception(ex)) {
+                throw abort_requested_exception(
+                  "shutdown exception opening cached reader: {}", ex);
+            }
+            throw io_error_exception("io error opening cached reader: {}", ex);
+        }
+    }
+
+    /// Download consumer: reserves cache space for the object, writes the
+    /// stream into the cache under `key`, and returns `content_length`.
+    ///
+    /// The stream is closed on every path. If reserving space or writing to
+    /// the cache fails, the stream is closed first and the original error is
+    /// then rethrown to the caller.
+    ss::future save_to_cache(
+      uint64_t content_length,
+      ss::input_stream input_stream,
+      const std::filesystem::path& key) {
+        // co_await is not permitted inside a catch handler, so remember the
+        // failure and close the stream after leaving the try block.
+        std::exception_ptr err;
+        try {
+            auto reservation = co_await _cache->reserve_space(
+              content_length, 1);
+            co_await _cache->put(key, input_stream, reservation);
+        } catch (...) {
+            err = std::current_exception();
+        }
+        co_await input_stream.close();
+        if (err) {
+            std::rethrow_exception(err);
+        }
+        co_return content_length;
+    }
+
+    std::filesystem::path cache_key(std::string_view name) {
+        return std::filesystem::path("lsm") / _prefix() / name;
+    }
+
+    cloud_io::cache* _cache;
+};
+
+} // namespace
+
+ss::future> open_cloud_cache_data_persistence(
+  cloud_io::cache* cache,
+  cloud_io::remote* remote,
+  cloud_storage_clients::bucket_name bucket,
+  cloud_storage_clients::object_key prefix) {
+    co_return std::make_unique(
+      cache, remote, std::move(bucket), std::move(prefix));
+}
+
+} // namespace lsm::io
diff --git a/src/v/lsm/io/cloud_cache_persistence.h b/src/v/lsm/io/cloud_cache_persistence.h
new file mode 100644
index 0000000000000..c02b097270d25
--- /dev/null
+++ b/src/v/lsm/io/cloud_cache_persistence.h
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2026 Redpanda Data, Inc.
+ *
+ * Licensed as a Redpanda Enterprise file under the Redpanda Community
+ * License (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
+ */
+
+#pragma once
+
+#include "cloud_io/cache_service.h"
+#include "cloud_io/remote.h"
+#include "cloud_storage_clients/types.h"
+#include "lsm/io/persistence.h"
+
+namespace lsm::io {
+
+/// Open a data persistence backed by the cloud cache and cloud storage.
+ss::future> open_cloud_cache_data_persistence( + cloud_io::cache* cache, + cloud_io::remote* remote, + cloud_storage_clients::bucket_name bucket, + cloud_storage_clients::object_key prefix); + +} // namespace lsm::io diff --git a/src/v/lsm/io/cloud_data_persistence_base.cc b/src/v/lsm/io/cloud_data_persistence_base.cc new file mode 100644 index 0000000000000..00adb001ed200 --- /dev/null +++ b/src/v/lsm/io/cloud_data_persistence_base.cc @@ -0,0 +1,212 @@ +/* + * Copyright 2026 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ + +#include "lsm/io/cloud_data_persistence_base.h" + +#include "cloud_io/remote.h" +#include "config/configuration.h" +#include "lsm/core/exceptions.h" +#include "lsm/core/internal/files.h" +#include "ssx/future-util.h" + +#include +#include +#include + +namespace lsm::io { + +retry_chain_node make_cloud_rtc(ss::abort_source& as) { + constexpr auto timeout = std::chrono::seconds(10); + auto backoff + = config::shard_local_cfg().cloud_storage_initial_backoff_ms.value(); + return retry_chain_node{as, timeout, backoff}; +} + +bool check_cloud_result(cloud_io::download_result result) { + switch (result) { + case cloud_io::download_result::success: + return true; + case cloud_io::download_result::notfound: + return false; + case cloud_io::download_result::timedout: + throw io_error_exception( + std::make_error_code(std::errc::timed_out), + "timeout reading from cloud storage"); + case cloud_io::download_result::failed: + throw io_error_exception("failed to read from cloud storage"); + } +} + +void check_cloud_result(cloud_io::upload_result result) { + switch (result) { + case cloud_io::upload_result::success: + return; + case cloud_io::upload_result::timedout: + throw 
io_error_exception( + std::make_error_code(std::errc::timed_out), + "timeout writing to cloud storage"); + case cloud_io::upload_result::failed: + throw io_error_exception("failed to write to cloud storage"); + case cloud_io::upload_result::cancelled: + throw io_error_exception("cloud storage write cancelled"); + } +} + +cloud_storage_clients::object_key join_cloud_key( + const cloud_storage_clients::object_key& prefix, std::string_view suffix) { + return cloud_storage_clients::object_key(prefix() / suffix); +} + +one_time_stream_provider::one_time_stream_provider(ss::input_stream s) + : _st(std::move(s)) {} + +ss::input_stream one_time_stream_provider::take_stream() { + auto tmp = std::exchange(_st, std::nullopt); + return std::move(tmp.value()); +} + +ss::future<> one_time_stream_provider::close() { + if (_st.has_value()) { + return _st->close().then([this] { _st = std::nullopt; }); + } + return ss::now(); +} + +ss::future<> upload_file( + cloud_io::remote& remote, + ss::abort_source& as, + const cloud_storage_clients::bucket_name& bucket, + const cloud_storage_clients::object_key& cloud_key, + const std::filesystem::path& local_path, + size_t written) { + auto root = make_cloud_rtc(as); + lazy_abort_source las{[&as] { + return as.abort_requested() ? 
std::make_optional("abort requested")
+                                    : std::nullopt;
+    }};
+    auto result = co_await remote.upload_stream(
+      {
+        .bucket = bucket,
+        .key = cloud_key,
+        .parent_rtc = root,
+      },
+      written,
+      [&local_path]() {
+          return ss::open_file_dma(local_path.native(), ss::open_flags::ro)
+            .then([](ss::file file) -> std::unique_ptr {
+                ss::file_input_stream_options opts{.read_ahead = 1};
+                auto stream = ss::make_file_input_stream(std::move(file), opts);
+                return std::make_unique(
+                  std::move(stream));
+            });
+      },
+      las,
+      "SST file upload",
+      std::nullopt);
+    check_cloud_result(result);
+}
+
+void throw_as_lsm_ex(std::exception_ptr ex, ss::sstring msg) {
+    try {
+        std::rethrow_exception(ex);
+    } catch (const std::system_error& e) {
+        throw io_error_exception(e.code(), "{}: {}", msg, e);
+    } catch (...) {
+        if (ssx::is_shutdown_exception(ex)) {
+            throw abort_requested_exception("{}: {}", msg, ex);
+        }
+        throw io_error_exception("{}: {}", msg, ex);
+    }
+}
+
+cloud_data_persistence_base::cloud_data_persistence_base(
+  cloud_io::remote* remote,
+  cloud_storage_clients::bucket_name bucket,
+  cloud_storage_clients::object_key prefix)
+  : _remote(remote)
+  , _bucket(std::move(bucket))
+  , _prefix(std::move(prefix)) {}
+
+/// Removes the SST file identified by `h`: first the local copy (via
+/// remove_file_locally), then the object in cloud storage.
+///
+/// A missing local copy is tolerated and does not stop the cloud delete.
+/// Other failures are rethrown as io_error_exception, or as
+/// abort_requested_exception when caused by shutdown. A non-success result
+/// from the cloud delete is reported through check_cloud_result.
+ss::future<> cloud_data_persistence_base::remove_file(internal::file_handle h) {
+    _as.check();
+    auto _ = _gate.hold();
+    auto rtc = make_cloud_rtc(_as);
+    auto filename = internal::sst_file_name(h);
+
+    cloud_io::upload_result result{};
+    try {
+        try {
+            co_await remove_file_locally(filename);
+        } catch (const std::system_error& e) {
+            // Only "no such file" is ignored here, so that the cloud delete
+            // below still runs when there is no local copy. Anything else is
+            // passed to the outer handlers for translation.
+            if (e.code() != std::errc::no_such_file_or_directory) {
+                throw;
+            }
+        }
+        result = co_await _remote->delete_object({
+          .bucket = _bucket,
+          .key = cloud_key(filename),
+          .parent_rtc = rtc,
+        });
+    } catch (const std::system_error& e) {
+        if (e.code() != std::errc::no_such_file_or_directory) {
+            throw io_error_exception(e.code(), "io error removing file: {}", e);
+        }
+    } catch (...) {
+        auto ex = std::current_exception();
+        if (ssx::is_shutdown_exception(ex)) {
+            throw abort_requested_exception(
+              "shutdown exception while removing file: {}", ex);
+        }
+        throw io_error_exception("io error removing file: {}", ex);
+    }
+    check_cloud_result(result);
+}
+
+ss::coroutine::experimental::generator
+cloud_data_persistence_base::list_files() {
+    _as.check();
+    auto _ = _gate.hold();
+    auto rtc = make_cloud_rtc(_as);
+    cloud_io::list_result result = cloud_storage_clients::error_outcome::fail;
+    try {
+        result = co_await _remote->list_objects(_bucket, rtc, _prefix);
+    } catch (...) {
+        auto ex = std::current_exception();
+        if (ssx::is_shutdown_exception(ex)) {
+            throw abort_requested_exception(
+              "shutdown exception while listing files: {}", ex);
+        }
+        throw io_error_exception("io error listing files: {}", ex);
+    }
+    if (result.has_error()) {
+        throw io_error_exception("io error listing files: {}", result.error());
+    }
+    for (const auto& item : result.value().contents) {
+        auto suffix = std::filesystem::path(item.key).lexically_relative(
+          _prefix());
+        if (suffix.has_parent_path() || !suffix.has_filename()) {
+            continue;
+        }
+        auto file_id = internal::parse_sst_file_name(
+          suffix.filename().native());
+        if (!file_id) {
+            continue;
+        }
+        co_yield *file_id;
+        _as.check();
+    }
+}
+
+ss::future<> cloud_data_persistence_base::close() {
+    _as.request_abort_ex(
+      abort_requested_exception("cloud persistence layer shutdown"));
+    co_await _gate.close();
+}
+
+cloud_storage_clients::object_key
+cloud_data_persistence_base::cloud_key(std::string_view name) {
+    return join_cloud_key(_prefix, name);
+}
+
+} // namespace lsm::io
diff --git a/src/v/lsm/io/cloud_data_persistence_base.h b/src/v/lsm/io/cloud_data_persistence_base.h
new file mode 100644
index 0000000000000..6d3bc5d49e984
--- /dev/null
+++ b/src/v/lsm/io/cloud_data_persistence_base.h
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2026 Redpanda Data, Inc.
+ * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ + +#pragma once + +#include "cloud_io/io_result.h" +#include "cloud_storage_clients/types.h" +#include "lsm/io/persistence.h" +#include "model/fundamental.h" +#include "utils/retry_chain_node.h" +#include "utils/stream_provider.h" + +#include +#include +#include +#include + +#include +#include +#include + +namespace cloud_io { +class remote; +} // namespace cloud_io + +namespace lsm::io { + +[[noreturn]] void throw_as_lsm_ex(std::exception_ptr ex, ss::sstring msg); +retry_chain_node make_cloud_rtc(ss::abort_source& as); + +/// Returns true if the download succeeded and false if the object didn't exist. +/// Throws otherwise. +bool check_cloud_result(cloud_io::download_result); + +/// Throws an appropriate exception for the given result. +void check_cloud_result(cloud_io::upload_result); +cloud_storage_clients::object_key join_cloud_key( + const cloud_storage_clients::object_key& prefix, std::string_view suffix); + +ss::future<> upload_file( + cloud_io::remote& remote, + ss::abort_source& as, + const cloud_storage_clients::bucket_name& bucket, + const cloud_storage_clients::object_key& cloud_key, + const std::filesystem::path& local_path, + size_t written); + +struct one_time_stream_provider : public stream_provider { + explicit one_time_stream_provider(ss::input_stream s); + ss::input_stream take_stream() override; + ss::future<> close() override; + std::optional> _st; +}; + +/// Shared base for data_persistence implementations that store SST files +/// in cloud storage. Subclasses may differ in how they stage data locally +/// (e.g. staging directory vs cloud cache). 
+class cloud_data_persistence_base : public data_persistence { +public: + cloud_data_persistence_base( + cloud_io::remote* remote, + cloud_storage_clients::bucket_name bucket, + cloud_storage_clients::object_key prefix); + + ss::future<> remove_file(internal::file_handle h) override; + + ss::coroutine::experimental::generator + list_files() override; + + ss::future<> close() override; + +protected: + /// Called before deleting from cloud. + virtual ss::future<> remove_file_locally(std::string_view filename) = 0; + + cloud_storage_clients::object_key cloud_key(std::string_view name); + + cloud_io::remote* _remote; + cloud_storage_clients::bucket_name _bucket; + cloud_storage_clients::object_key _prefix; + ss::abort_source _as; + ss::gate _gate; +}; + +} // namespace lsm::io diff --git a/src/v/lsm/io/cloud_persistence.cc b/src/v/lsm/io/cloud_persistence.cc index d7eb4bff55a7c..6cd5e750fd9df 100644 --- a/src/v/lsm/io/cloud_persistence.cc +++ b/src/v/lsm/io/cloud_persistence.cc @@ -12,17 +12,17 @@ #include "cloud_io/io_result.h" #include "cloud_storage_clients/types.h" -#include "config/configuration.h" #include "lsm/core/exceptions.h" #include "lsm/core/internal/files.h" +#include "lsm/io/cloud_data_persistence_base.h" #include "lsm/io/file_io.h" #include "lsm/io/persistence.h" #include "ssx/future-util.h" -#include "utils/retry_chain_node.h" #include "utils/uuid.h" #include #include +#include #include #include @@ -32,70 +32,6 @@ namespace lsm::io { namespace { -retry_chain_node make_rtc(ss::abort_source& as) { - constexpr auto timeout = std::chrono::seconds(10); - auto backoff - = config::shard_local_cfg().cloud_storage_initial_backoff_ms.value(); - return retry_chain_node{ - as, - timeout, - backoff, - }; -} - -bool check_result(cloud_io::download_result result) { - switch (result) { - case cloud_io::download_result::success: - return true; - case cloud_io::download_result::notfound: - return false; - case cloud_io::download_result::timedout: - throw 
io_error_exception( - std::make_error_code(std::errc::timed_out), - "timeout reading from cloud storage"); - case cloud_io::download_result::failed: - throw io_error_exception("failed to read from cloud storage"); - } -} - -void check_result(cloud_io::upload_result result) { - switch (result) { - case cloud_io::upload_result::success: - return; - case cloud_io::upload_result::timedout: - throw io_error_exception( - std::make_error_code(std::errc::timed_out), - "timeout reading from cloud storage"); - case cloud_io::upload_result::failed: - throw io_error_exception("failed to write to cloud storage"); - case cloud_io::upload_result::cancelled: - throw io_error_exception("cloud storage write cancelled"); - } -} - -cloud_storage_clients::object_key -join(const cloud_storage_clients::object_key& prefix, std::string_view suffix) { - return cloud_storage_clients::object_key(prefix() / suffix); -} - -// TODO: deduplicate, expose from cloud storage -struct one_time_stream_provider : public stream_provider { - explicit one_time_stream_provider(ss::input_stream s) - : _st(std::move(s)) {} - - ss::input_stream take_stream() override { - auto tmp = std::exchange(_st, std::nullopt); - return std::move(tmp.value()); - } - ss::future<> close() override { - if (_st.has_value()) { - return _st->close().then([this] { _st = std::nullopt; }); - } - return ss::now(); - } - std::optional> _st; -}; - class staged_file_writer : public disk_seq_file_writer { public: staged_file_writer( @@ -118,24 +54,14 @@ class staged_file_writer : public disk_seq_file_writer { ss::future<> close() override { co_await disk_seq_file_writer::close(); - try { - co_await upload().then_wrapped([this](ss::future<> fut) { - // If the upload fails, then we need to delete the staged file. 
- if (fut.failed()) { - return fut.finally( - [this] { return ss::remove_file(path().native()); }); - } - return fut; - }); - } catch (const std::system_error& err) { - throw io_error_exception(err.code(), "io error closing: {}", err); - } catch (...) { - auto ex = std::current_exception(); - if (ssx::is_shutdown_exception(ex)) { - throw abort_requested_exception( - "shutdown exception while closing: {}", ex); - } - throw io_error_exception("io error closing: {}", ex); + auto upload_fut = co_await ss::coroutine::as_future( + upload_file(*_remote, *_as, _bucket, _key, path(), _written)); + if (upload_fut.failed()) { + auto ex = upload_fut.get_exception(); + co_await ss::remove_file(path().native()) + .then_wrapped( + [](ss::future<> fut) { fut.ignore_ready_future(); }); + throw_as_lsm_ex(ex, "io error closing"); } } @@ -146,37 +72,6 @@ class staged_file_writer : public disk_seq_file_writer { } private: - ss::future<> upload() { - auto root = make_rtc(*_as); - lazy_abort_source las{[this] { - return _as->abort_requested() - ? 
std::make_optional("abort requested") - : std::nullopt; - }}; - // TODO: Ensure the file doesn't yet exist with CAS - auto result = co_await _remote->upload_stream( - { - .bucket = _bucket, - .key = _key, - .parent_rtc = root, - }, - _written, - [this]() { - return ss::open_file_dma(path().native(), ss::open_flags::ro) - .then([](ss::file file) -> std::unique_ptr { - ss::file_input_stream_options opts{.read_ahead = 1}; - auto stream = ss::make_file_input_stream( - std::move(file), opts); - return std::make_unique( - std::move(stream)); - }); - }, - las, - "SST file upload", - std::nullopt); - check_result(result); - } - size_t _written = 0; cloud_io::remote* _remote; ss::abort_source* _as; @@ -184,7 +79,7 @@ class staged_file_writer : public disk_seq_file_writer { cloud_storage_clients::object_key _key; }; -class cloud_data_persistence : public data_persistence { +class cloud_data_persistence : public cloud_data_persistence_base { public: cloud_data_persistence( std::filesystem::path staging, @@ -192,10 +87,9 @@ class cloud_data_persistence : public data_persistence { cloud_storage_clients::bucket_name bucket, cloud_storage_clients::object_key prefix, ss::sstring staging_prefix) - : _staging(std::move(staging)) - , _remote(remote) - , _bucket(std::move(bucket)) - , _prefix(std::move(prefix)) + : cloud_data_persistence_base( + remote, std::move(bucket), std::move(prefix)) + , _staging(std::move(staging)) , _staging_prefix(std::move(staging_prefix)) {} ss::future> @@ -208,7 +102,7 @@ class cloud_data_persistence : public data_persistence { if (reader) { co_return reader; } - auto root = make_rtc(_as); + auto root = make_cloud_rtc(_as); cloud_io::download_result result{}; try { result = co_await _remote->download_stream( @@ -235,7 +129,7 @@ class cloud_data_persistence : public data_persistence { } throw io_error_exception("io error downloading file: {}", ex); } - if (check_result(result)) { + if (check_cloud_result(result)) { co_return co_await 
open_local_reader(filepath); } else { co_return std::nullopt; @@ -252,7 +146,7 @@ class cloud_data_persistence : public data_persistence { auto file = ss::open_file_dma( filepath.native(), ss::open_flags::create | ss::open_flags::rw - | ss::open_flags::truncate); + | ss::open_flags::exclusive); auto stream = co_await ss::with_file_close_on_failure( std::move(file), [](ss::file f) { return ss::make_file_output_stream( @@ -278,77 +172,8 @@ class cloud_data_persistence : public data_persistence { } } - ss::future<> remove_file(internal::file_handle h) override { - _as.check(); - auto _ = _gate.hold(); - auto rtc = make_rtc(_as); - auto filename = internal::sst_file_name(h); - cloud_io::upload_result result{}; - try { - co_await ss::remove_file(staging_path(filename).native()); - result = co_await _remote->delete_object({ - .bucket = _bucket, - .key = cloud_key(filename), - .parent_rtc = rtc, - }); - } catch (const std::system_error& e) { - if (e.code() != std::errc::no_such_file_or_directory) { - throw io_error_exception( - e.code(), "io error removing file: {}", e); - } - } catch (...) { - auto ex = std::current_exception(); - if (ssx::is_shutdown_exception(ex)) { - throw abort_requested_exception( - "shutdown exception while removing file: {}", ex); - } - throw io_error_exception("io error removing file: {}", ex); - } - check_result(result); - } - - ss::coroutine::experimental::generator - list_files() override { - _as.check(); - auto _ = _gate.hold(); - auto rtc = make_rtc(_as); - // TODO: Consider merging with on disk file listing. - cloud_io::list_result result - = cloud_storage_clients::error_outcome::fail; - try { - result = co_await _remote->list_objects(_bucket, rtc, _prefix); - } catch (...) 
{ - auto ex = std::current_exception(); - if (ssx::is_shutdown_exception(ex)) { - throw abort_requested_exception( - "shutdown exception while listing files: {}", ex); - } - throw io_error_exception("io error listing files: {}", ex); - } - if (result.has_error()) { - throw io_error_exception( - "io error listing files: {}", result.error()); - } - for (const auto& item : result.value().contents) { - auto suffix = std::filesystem::path(item.key).lexically_relative( - _prefix()); - if (suffix.has_parent_path() || !suffix.has_filename()) { - continue; - } - auto file_id = internal::parse_sst_file_name( - suffix.filename().native()); - if (!file_id) { - continue; - } - co_yield *file_id; - _as.check(); - } - } - - ss::future<> close() override { - _as.request_abort_ex( - abort_requested_exception("cloud persistence layer shutdown")); - co_await _gate.close(); + ss::future<> remove_file_locally(std::string_view filename) override { + co_await ss::remove_file(staging_path(filename).native()); } private: @@ -433,17 +258,8 @@ class cloud_data_persistence : public data_persistence { return _staging / fmt::format("{}-{}", _staging_prefix, name); } - cloud_storage_clients::object_key cloud_key(std::string_view name) { - return join(_prefix, name); - } - std::filesystem::path _staging; - cloud_io::remote* _remote; - cloud_storage_clients::bucket_name _bucket; - cloud_storage_clients::object_key _prefix; ss::sstring _staging_prefix; - ss::abort_source _as; - ss::gate _gate; }; class cloud_metadata_persistence : public metadata_persistence { @@ -460,7 +276,7 @@ class cloud_metadata_persistence : public metadata_persistence { read_manifest(internal::database_epoch epoch) override { _as.check(); auto _ = _gate.hold(); - auto rtc = make_rtc(_as); + auto rtc = make_cloud_rtc(_as); auto max_key = manifest_key(epoch); auto keys = co_await list_manifests(); // list_manifests gives you biggest to smallest key, so find the first @@ -481,15 +297,15 @@ class cloud_metadata_persistence : 
public metadata_persistence { .payload = b, .expect_missing = true, }); - co_return check_result(result) ? std::make_optional(std::move(b)) - : std::nullopt; + co_return check_cloud_result(result) ? std::make_optional(std::move(b)) + : std::nullopt; } ss::future<> write_manifest(internal::database_epoch epoch, iobuf b) override { _as.check(); auto _ = _gate.hold(); - auto rtc = make_rtc(_as); + auto rtc = make_cloud_rtc(_as); auto my_key = manifest_key(epoch); auto result = co_await _remote->upload_object({ .transfer_details = { @@ -500,7 +316,7 @@ class cloud_metadata_persistence : public metadata_persistence { .display_str = "LSM Manifest upload", .payload = std::move(b), }); - check_result(result); + check_cloud_result(result); // Now cleanup old manifests chunked_vector keys_to_delete; for (const auto& key : co_await list_manifests()) { @@ -523,7 +339,7 @@ class cloud_metadata_persistence : public metadata_persistence { ss::future> list_manifests() { using namespace cloud_storage_clients; - auto rtc = make_rtc(_as); + auto rtc = make_cloud_rtc(_as); auto list_result = co_await _remote->list_objects( _bucket, rtc, manifest_prefix()); if (list_result.has_error()) { diff --git a/src/v/lsm/io/disk_persistence.cc b/src/v/lsm/io/disk_persistence.cc index d47aea7c5c812..7f47147d2ced7 100644 --- a/src/v/lsm/io/disk_persistence.cc +++ b/src/v/lsm/io/disk_persistence.cc @@ -70,7 +70,7 @@ class impl auto file = ss::open_file_dma( filepath.native(), ss::open_flags::create | ss::open_flags::rw - | ss::open_flags::truncate); + | ss::open_flags::exclusive); auto stream = co_await ss::with_file_close_on_failure( std::move(file), [](ss::file& f) { return ss::make_file_output_stream( diff --git a/src/v/lsm/io/memory_persistence.cc b/src/v/lsm/io/memory_persistence.cc index 2abbf96d18290..0a4c2dd07b7b7 100644 --- a/src/v/lsm/io/memory_persistence.cc +++ b/src/v/lsm/io/memory_persistence.cc @@ -148,10 +148,12 @@ class data_impl : public data_persistence { if (_controller && 
_controller->should_fail) { throw io_error_exception("injected error"); } - auto it = _data.try_emplace( + auto [it, inserted] = _data.try_emplace( h, ss::make_lw_shared(h)); - co_return std::make_unique( - it.first->second); + if (!inserted) { + throw io_error_exception("file already exists: {}", h); + } + co_return std::make_unique(it->second); } ss::future<> remove_file(internal::file_handle h) override { diff --git a/src/v/lsm/io/persistence.h b/src/v/lsm/io/persistence.h index a633785534d70..c82e7b6ff09ec 100644 --- a/src/v/lsm/io/persistence.h +++ b/src/v/lsm/io/persistence.h @@ -119,8 +119,8 @@ class data_persistence { open_random_access_reader(internal::file_handle) = 0; // Create a writer that writes to a new file with the specified name. - // - // Deletes any existing file with the same name and creates a new file. + // The file_handle must be unique — callers must not write to a handle + // that has already been committed. virtual ss::future> open_sequential_writer(internal::file_handle) = 0; diff --git a/src/v/lsm/io/tests/BUILD b/src/v/lsm/io/tests/BUILD index 9a7ba20808686..742f8b044c0a9 100644 --- a/src/v/lsm/io/tests/BUILD +++ b/src/v/lsm/io/tests/BUILD @@ -27,15 +27,18 @@ redpanda_cc_gtest( cpu = 1, deps = [ "//src/v/base", + "//src/v/cloud_io:cache", "//src/v/cloud_io:remote", "//src/v/cloud_io/tests:s3_imposter", "//src/v/cloud_io/tests:scoped_remote", "//src/v/cloud_storage_clients", "//src/v/lsm/core/internal:files", + "//src/v/lsm/io:cloud_cache_persistence", "//src/v/lsm/io:cloud_persistence", "//src/v/lsm/io:disk_persistence", "//src/v/lsm/io:memory_persistence", "//src/v/lsm/io:persistence", + "//src/v/storage:disk", "//src/v/test_utils:gtest", "//src/v/utils:uuid", "@googletest//:gtest", diff --git a/src/v/lsm/io/tests/persistence_test.cc b/src/v/lsm/io/tests/persistence_test.cc index 20f73519ad97e..d4667043e21e3 100644 --- a/src/v/lsm/io/tests/persistence_test.cc +++ b/src/v/lsm/io/tests/persistence_test.cc @@ -9,18 +9,23 @@ * by the 
Apache License, Version 2.0 */ +#include "cloud_io/cache_service.h" #include "cloud_io/remote.h" #include "cloud_io/tests/s3_imposter.h" #include "cloud_io/tests/scoped_remote.h" #include "cloud_storage_clients/types.h" #include "lsm/core/internal/files.h" +#include "lsm/io/cloud_cache_persistence.h" #include "lsm/io/cloud_persistence.h" #include "lsm/io/disk_persistence.h" #include "lsm/io/memory_persistence.h" #include "lsm/io/persistence.h" +#include "storage/disk.h" +#include "test_utils/tmp_dir.h" #include "utils/uuid.h" #include +#include #include #include @@ -95,18 +100,28 @@ TEST_P(PersistenceTest, ListFiles) { EXPECT_THAT(list_files().get(), testing::UnorderedElementsAreArray(files)); } -TEST_P(PersistenceTest, OverwriteFile) { - for (int i = 0; i < 3; ++i) { +// Writing to the same file handle twice must not corrupt the original data. +TEST_P(PersistenceTest, DuplicateWritePreservesOriginal) { + { + auto w = persistence->open_sequential_writer({}).get(); + auto _ = ss::defer([&w] { w->close().get(); }); + w->append(iobuf::from("original data")).get(); + } + // A second write to the same handle may throw or silently no-op + // depending on the backend. Either way, the original must survive. + try { auto w = persistence->open_sequential_writer({}).get(); auto _ = ss::defer([&w] { w->close().get(); }); - w->append(iobuf::from(fmt::format("hello, world: {}", i))).get(); + w->append(iobuf::from("replacement")).get(); + } catch (...) { + // Expected for backends that use O_EXCL. 
} auto maybe_r = persistence->open_random_access_reader({}).get(); ASSERT_TRUE(bool(maybe_r)); auto r = std::move(*maybe_r); auto _ = ss::defer([&r] { r->close().get(); }); - auto buf = r->read(0, 15).get(); - EXPECT_EQ(buf.as_iobuf(), iobuf::from("hello, world: 2")) + auto buf = r->read(0, 13).get(); + EXPECT_EQ(buf.as_iobuf(), iobuf::from("original data")) << buf.as_iobuf().hexdump(32); } @@ -295,6 +310,58 @@ class mock_cloud_data_persistence : public data_persistence { std::unique_ptr impl_; }; +// Wrapper for cloud_cache persistence that owns the cache and s3 imposter +// lifecycle alongside the real implementation. +class mock_cloud_cache_data_persistence : public data_persistence { +public: + mock_cloud_cache_data_persistence( + std::unique_ptr fixture, + std::unique_ptr sr, + std::unique_ptr cache_tmpdir, + std::unique_ptr> cache, + std::unique_ptr impl) + : fixture_(std::move(fixture)) + , sr_(std::move(sr)) + , cache_tmpdir_(std::move(cache_tmpdir)) + , cache_(std::move(cache)) + , impl_(std::move(impl)) {} + + ss::future> + open_sequential_writer(lsm_internal::file_handle handle) override { + return impl_->open_sequential_writer(handle); + } + + ss::future> + open_random_access_reader(lsm_internal::file_handle handle) override { + return impl_->open_random_access_reader(handle); + } + + ss::future<> remove_file(lsm_internal::file_handle handle) override { + co_await impl_->remove_file(handle); + fixture_->remove_expectations( + chunked_vector::single( + fmt::format( + "test-prefix/{}", lsm_internal::sst_file_name(handle)))); + } + + ss::coroutine::experimental::generator + list_files() override { + return impl_->list_files(); + } + + ss::future<> close() override { + co_await impl_->close(); + co_await cache_->stop(); + } + +private: + std::unique_ptr fixture_; + std::unique_ptr sr_; + std::unique_ptr cache_tmpdir_; + std::unique_ptr> cache_; + std::unique_ptr impl_; +}; + INSTANTIATE_TEST_SUITE_P( PersistenceSuite, PersistenceTest, @@ -324,6 +391,47 @@ 
INSTANTIATE_TEST_SUITE_P( co_return std::make_unique( std::move(fixture), std::move(sr), std::move(impl)); + }, + []() -> ss::future> { + auto fixture = std::make_unique(); + fixture->set_expectations_and_listen({}); + auto sr = cloud_io::scoped_remote::create(10, fixture->conf); + + auto cache_tmpdir = std::make_unique( + "cloud_cache_persistence_test"); + auto cache_dir = cache_tmpdir->get_path() / "cache"; + co_await cloud_io::cache::initialize(cache_dir); + + auto cache = std::make_unique>(); + co_await cache->start( + cache_dir, + 30_GiB, + config::mock_binding(0.0), + config::mock_binding(100_MiB), + config::mock_binding>(std::nullopt), + config::mock_binding(100000), + config::mock_binding(3)); + co_await cache->invoke_on_all( + [](cloud_io::cache& c) { return c.start(); }); + co_await cache->invoke_on(ss::shard_id{0}, [](cloud_io::cache& c) { + c.notify_disk_status( + 100ULL * 1024 * 1024 * 1024, + 50ULL * 1024 * 1024 * 1024, + storage::disk_space_alert::ok); + }); + + auto impl = co_await open_cloud_cache_data_persistence( + &cache->local(), + &sr->remote.local(), + fixture->bucket_name, + cloud_storage_clients::object_key("test-prefix")); + + co_return std::make_unique( + std::move(fixture), + std::move(sr), + std::move(cache_tmpdir), + std::move(cache), + std::move(impl)); }), [](const testing::TestParamInfo& info) { switch (info.index) { @@ -333,6 +441,8 @@ INSTANTIATE_TEST_SUITE_P( return "disk"; case 2: return "cloud"; + case 3: + return "cloud_cache"; default: return "unknown"; }