Skip to content

Commit 6e2fa5c

Browse files
yuwmaoyuwmao
andauthored
SDSTOR-21438: Adjust commit_quorum reduction ordering (#875)
When two members are down, the raft leader yields leadership after leadership_expiry (20x the heartbeat interval). reset_quorum_size must be called before the NOT_LEADER check so that the node can maintain or reclaim leadership with election_quorum=1. Previously, the TwoMemberDown UT called replace_member immediately, so the leadership had not yet expired when the request arrived. The test now sleeps 10s before calling replace_member to simulate leader expiry. Co-authored-by: yuwmao <[email protected]>
1 parent 805eac1 commit 6e2fa5c

4 files changed

Lines changed: 79 additions & 33 deletions

File tree

conanfile.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
class HomestoreConan(ConanFile):
1111
name = "homestore"
12-
version = "7.5.2"
12+
version = "7.5.3"
1313

1414
homepage = "https://github.com/eBay/Homestore"
1515
description = "HomeStore Storage Engine"

src/lib/replication/repl_dev/raft_repl_dev.cpp

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -219,12 +219,40 @@ AsyncReplResult<> RaftReplDev::start_replace_member(std::string& task_id, const
219219
boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id));
220220

221221
// Step1, validate request
222+
bool is_leader = m_my_repl_id == get_leader_id();
223+
// Check if leader itself is requested to move out.
224+
if (m_my_repl_id == member_out.id) {
225+
// immediate=false successor=-1, nuraft will choose an alive peer with highest priority as successor, and wait
226+
// until the successor finishes the catch-up of the latest log, and then resign. Return NOT_LEADER and let
227+
// client retry.
228+
if (is_leader) {
229+
RD_LOGI(trace_id, "Step1. Replace member, leader is the member_out so yield leadership, task_id={}", task_id);
230+
raft_server()->yield_leadership(false /* immediate */, -1 /* successor */);
231+
}
232+
RD_LOGE(trace_id, "Step1. Replace member, I am not leader, can not handle the request, task_id={}", task_id);
233+
return make_async_error<>(ReplServiceError::NOT_LEADER);
234+
}
235+
if (commit_quorum >= 1) {
236+
// Reduce the quorum size BEFORE checking leadership. When two members are down, the raft leader will
237+
// eventually yield its leadership after leadership_expiry (default: 20x heartbeat interval) because it
238+
// cannot reach majority. Once leadership is lost, the remaining single node cannot elect itself without a
239+
// reduced election quorum. By calling reset_quorum_size here first (which sets both custom_commit_quorum
240+
// and custom_election_quorum to 1), the current leader is able to maintain leadership, and if leadership
241+
// was already lost, the node will self-elect on the next election timeout. The caller should retry on
242+
// NOT_LEADER to allow time for self-election to complete.
243+
reset_quorum_size(commit_quorum, trace_id);
244+
}
245+
246+
if (!is_leader) { return make_async_error<>(ReplServiceError::NOT_LEADER); }
247+
248+
// I am leader and not out_member
222249
// TODO support rollback, this could happen when the first task failed, and we want to launch a new task to
223250
// remediate it. Need to rollback the first task. And for the same task, it's reentrant and idempotent.
224251
auto existing_task_id = get_replace_member_task_id();
225252
if (!existing_task_id.empty() && existing_task_id != task_id) {
226253
RD_LOGE(trace_id, "Step1. Replace member, task_id={} is not the same as existing task_id={}", task_id,
227254
existing_task_id);
255+
reset_quorum_size(0, trace_id);
228256
return make_async_error<>(ReplServiceError::REPLACE_MEMBER_TASK_MISMATCH);
229257
}
230258

@@ -239,18 +267,10 @@ AsyncReplResult<> RaftReplDev::start_replace_member(std::string& task_id, const
239267
return make_async_success<>();
240268
}
241269
RD_LOGE(trace_id, "Step1. Replace member invalid parameter, out member is not found, task_id={}", task_id);
270+
reset_quorum_size(0, trace_id);
242271
return make_async_error<>(ReplServiceError::SERVER_NOT_FOUND);
243272
}
244-
if (m_my_repl_id != get_leader_id()) { return make_async_error<>(ReplServiceError::NOT_LEADER); }
245-
// Check if leader itself is requested to move out.
246-
if (m_my_repl_id == member_out.id) {
247-
// immediate=false successor=-1, nuraft will choose an alive peer with highest priority as successor, and wait
248-
// until the successor finishes the catch-up of the latest log, and then resign. Return NOT_LEADER and let
249-
// client retry.
250-
raft_server()->yield_leadership(false /* immediate */, -1 /* successor */);
251-
RD_LOGI(trace_id, "Step1. Replace member, leader is the member_out so yield leadership, task_id={}", task_id);
252-
return make_async_error<>(ReplServiceError::NOT_LEADER);
253-
}
273+
254274
// quorum safety check. TODO currently only consider lsn, need to check last response time.
255275
auto active_peers = get_active_peers();
256276
// active_peers doesn't include leader itself.
@@ -272,18 +292,15 @@ AsyncReplResult<> RaftReplDev::start_replace_member(std::string& task_id, const
272292
"Step1. Replace member, quorum safety check failed, active_peers={}, "
273293
"active_peers_exclude_out/in_member={}, required_quorum={}, commit_quorum={}, task_id={}",
274294
active_peers.size(), active_num, quorum, commit_quorum, task_id);
295+
reset_quorum_size(0, trace_id);
275296
return make_async_error<>(ReplServiceError::QUORUM_NOT_MET);
276297
}
277298

278-
if (commit_quorum >= 1) {
279-
// Two members are down and leader cant form the quorum. Reduce the quorum size.
280-
reset_quorum_size(commit_quorum, trace_id);
281-
}
282-
283299
// Step 2: Handle out member.
284300
#ifdef _PRERELEASE
285301
if (iomgr_flip::instance()->test_flip("replace_member_set_learner_failure")) {
286302
RD_LOGE(trace_id, "Simulating set member to learner failure");
303+
reset_quorum_size(0, trace_id);
287304
return make_async_error(ReplServiceError::FAILED);
288305
}
289306
#endif

src/tests/test_common/raft_repl_test_base.hpp

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -780,24 +780,33 @@ class RaftReplDevTestBase : public testing::Test {
780780
void truncate(int num_reserved_entries) { dbs_[0]->truncate(num_reserved_entries); }
781781
repl_lsn_t get_truncation_upper_limit() { return dbs_[0]->get_truncation_upper_limit(); }
782782

783+
void do_replace_member(std::shared_ptr< TestReplicatedDB > db, std::string& task_id, replica_id_t member_out,
784+
replica_id_t member_in, uint32_t commit_quorum,
785+
ReplServiceError error = ReplServiceError::OK) {
786+
LOGINFO("Start replace member task_id={}, out={}, in={}", task_id, boost::uuids::to_string(member_out),
787+
boost::uuids::to_string(member_in));
788+
replica_member_info out{member_out, ""};
789+
replica_member_info in{member_in, ""};
790+
auto result =
791+
hs()->repl_service().replace_member(db->repl_dev()->group_id(), task_id, out, in, commit_quorum).get();
792+
if (error == ReplServiceError::OK) {
793+
ASSERT_EQ(result.hasError(), false) << "Error in replacing member, err=" << result.error();
794+
} else {
795+
ASSERT_EQ(result.hasError(), true);
796+
ASSERT_EQ(result.error(), error) << "Error in replacing member, err=" << result.error();
797+
}
798+
}
799+
783800
void replace_member(std::shared_ptr< TestReplicatedDB > db, std::string& task_id, replica_id_t member_out,
784801
replica_id_t member_in, uint32_t commit_quorum = 0,
785802
ReplServiceError error = ReplServiceError::OK) {
786-
this->run_on_leader(db, [this, error, db, &task_id, member_out, member_in, commit_quorum]() {
787-
LOGINFO("Start replace member task_id={}, out={}, in={}", task_id, boost::uuids::to_string(member_out),
788-
boost::uuids::to_string(member_in));
789-
790-
replica_member_info out{member_out, ""};
791-
replica_member_info in{member_in, ""};
792-
auto result =
793-
hs()->repl_service().replace_member(db->repl_dev()->group_id(), task_id, out, in, commit_quorum).get();
794-
if (error == ReplServiceError::OK) {
795-
ASSERT_EQ(result.hasError(), false) << "Error in replacing member, err=" << result.error();
796-
} else {
797-
ASSERT_EQ(result.hasError(), true);
798-
ASSERT_EQ(result.error(), error) << "Error in replacing member, err=" << result.error();
799-
}
800-
});
803+
if (commit_quorum == 0) {
804+
this->run_on_leader(db, [this, error, db, &task_id, member_out, member_in, commit_quorum]() {
805+
do_replace_member(db, task_id, member_out, member_in, commit_quorum, error);
806+
});
807+
} else {
808+
do_replace_member(db, task_id, member_out, member_in, commit_quorum, error);
809+
}
801810
}
802811

803812
void remove_member(std::shared_ptr< TestReplicatedDB > db, replica_id_t member_id) {

src/tests/test_raft_repl_dev_dynamic.cpp

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -198,13 +198,33 @@ TEST_F(ReplDevDynamicTest, TwoMemberDown) {
198198
LOGINFO("Shutdown replica 2");
199199
}
200200

201+
LOGINFO("Sleep 10 seconds to waiting for leadership expiring");
202+
sleep(10);
201203
std::string task_id = "task_id";
202204
if (g_helper->replica_num() == 0) {
203205
// Replace down replica 2 with spare replica 3 with commit quorum 1
204206
// so that leader can go ahead with replacing member.
207+
// After reset_quorum_size(1) is applied, the node may need one election timeout
208+
// to self-elect as leader, so retry on NOT_LEADER.
205209
LOGINFO("Replace member started, task_id={}", task_id);
206-
replace_member(db, task_id, g_helper->replica_id(member_out), g_helper->replica_id(member_in),
207-
1 /* commit quorum*/);
210+
constexpr int max_retries = 3;
211+
bool succeeded = false;
212+
for (int i = 0; i < max_retries; ++i) {
213+
auto result = hs()->repl_service()
214+
.replace_member(db->repl_dev()->group_id(), task_id,
215+
replica_member_info{g_helper->replica_id(member_out), ""},
216+
replica_member_info{g_helper->replica_id(member_in), ""}, 1)
217+
.get();
218+
if (!result.hasError()) {
219+
succeeded = true;
220+
break;
221+
}
222+
ASSERT_EQ(result.error(), ReplServiceError::NOT_LEADER)
223+
<< "Replace member failed with unexpected error: " << result.error();
224+
LOGINFO("Replace member returned NOT_LEADER, retry {}/{}", i + 1, max_retries);
225+
std::this_thread::sleep_for(std::chrono::seconds(2));
226+
}
227+
ASSERT_TRUE(succeeded) << "Replace member failed after " << max_retries << " retries";
208228
this->write_on_leader(num_io_entries, true /* wait_for_commit */);
209229
LOGINFO("Leader completed num_io={}", num_io_entries);
210230
}

0 commit comments

Comments
 (0)