diff --git a/conanfile.py b/conanfile.py
index 7165e32..c7fdc46 100644
--- a/conanfile.py
+++ b/conanfile.py
@@ -9,7 +9,7 @@ class HomeObjectConan(ConanFile):
     name = "homeobject"
-    version = "2.2.5"
+    version = "2.2.6"

     homepage = "https://github.com/eBay/HomeObject"
     description = "Blob Store built on HomeReplication"
diff --git a/src/lib/homestore_backend/CMakeLists.txt b/src/lib/homestore_backend/CMakeLists.txt
index bf8cc66..2d28402 100644
--- a/src/lib/homestore_backend/CMakeLists.txt
+++ b/src/lib/homestore_backend/CMakeLists.txt
@@ -69,13 +69,29 @@ target_link_libraries(homestore_test_dynamic PUBLIC
     ${COMMON_TEST_DEPS}
 )
-add_test(NAME HomestoreTestDynamic
+add_test(NAME HomestoreTestReplaceMember
     COMMAND homestore_test_dynamic -csv error --executor immediate --config_path ./
-    --override_config homestore_config.consensus.snapshot_freq_distance:0)
+    --override_config homestore_config.consensus.snapshot_freq_distance:0
+    --gtest_filter=HomeObjectFixture.ReplaceMember)

 # To test both baseline & incremental resync functionality, we use 13 to minimize the likelihood of it being a divisor of the total LSN (currently 30)
-add_test(NAME HomestoreTestDynamicWithResync
+add_test(NAME HomestoreTestReplaceMemberWithBaselineResync
     COMMAND homestore_test_dynamic -csv error --executor immediate --config_path ./
     --override_config homestore_config.consensus.snapshot_freq_distance:13
     --override_config homestore_config.consensus.num_reserved_log_items=13
-    --override_config homestore_config.consensus.snapshot_sync_ctx_timeout_ms=5000)
+    --override_config homestore_config.consensus.snapshot_sync_ctx_timeout_ms=5000
+    --gtest_filter=HomeObjectFixture.ReplaceMember)
+
+add_test(NAME HomestoreResyncTestWithFollowerRestart
+    COMMAND homestore_test_dynamic -csv error --executor immediate --config_path ./
+    --override_config homestore_config.consensus.snapshot_freq_distance:13
+    --override_config homestore_config.consensus.num_reserved_log_items=13
+    --override_config homestore_config.consensus.snapshot_sync_ctx_timeout_ms=5000
+    --gtest_filter=HomeObjectFixture.RestartFollower*)
+
+add_test(NAME HomestoreResyncTestWithLeaderRestart
+    COMMAND homestore_test_dynamic -csv error --executor immediate --config_path ./
+    --override_config homestore_config.consensus.snapshot_freq_distance:13
+    --override_config homestore_config.consensus.num_reserved_log_items=13
+    --override_config homestore_config.consensus.snapshot_sync_ctx_timeout_ms=5000
+    --gtest_filter=HomeObjectFixture.RestartLeader*)
diff --git a/src/lib/homestore_backend/pg_blob_iterator.cpp b/src/lib/homestore_backend/pg_blob_iterator.cpp
index cb87e2f..027b79f 100644
--- a/src/lib/homestore_backend/pg_blob_iterator.cpp
+++ b/src/lib/homestore_backend/pg_blob_iterator.cpp
@@ -245,7 +245,8 @@ bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe
         LOGW("Simulating loading blob data error");
         return false;
     }
-    auto delay = iomgr_flip::instance()->get_test_flip< long >("read_snapshot_load_blob_latency", static_cast< long >(info.blob_id));
+    auto delay = iomgr_flip::instance()->get_test_flip< long >("simulate_read_snapshot_load_blob_delay", static_cast< long >(info.blob_id));
+    LOGD("simulate_read_snapshot_load_blob_delay flip, triggered: {}, blob: {}", delay.has_value(), info.blob_id);
     if (delay) {
         LOGI("Simulating pg blob iterator load data with delay, delay:{}, blob_id:{}", delay.get(), info.blob_id);
diff --git a/src/lib/homestore_backend/replication_state_machine.cpp b/src/lib/homestore_backend/replication_state_machine.cpp
index 35de0fd..be9087c 100644
--- a/src/lib/homestore_backend/replication_state_machine.cpp
+++ b/src/lib/homestore_backend/replication_state_machine.cpp
@@ -225,6 +225,14 @@ ReplicationStateMachine::create_snapshot(std::shared_ptr< homestore::snapshot_co
 }

 bool ReplicationStateMachine::apply_snapshot(std::shared_ptr< homestore::snapshot_context > context) {
+    #ifdef _PRERELEASE
+    auto delay = iomgr_flip::instance()->get_test_flip< long >("simulate_apply_snapshot_delay");
+    LOGD("simulate_apply_snapshot_delay flip, triggered: {}", delay.has_value());
+    if (delay) {
+        LOGI("Simulating apply snapshot with delay, delay:{}", delay.get());
+        std::this_thread::sleep_for(std::chrono::milliseconds(delay.get()));
+    }
+    #endif
     // TODO persist snapshot
     m_snp_rcv_handler->destroy_context();
@@ -326,7 +334,7 @@ void ReplicationStateMachine::write_snapshot_obj(std::shared_ptr< homestore::sna
     if (!m_snp_rcv_handler) {
         m_snp_rcv_handler = std::make_unique< HSHomeObject::SnapshotReceiveHandler >(*home_object_, r_dev);
         if (m_snp_rcv_handler->load_prev_context()) {
-            LOGI("Reloaded previous snapshot context, lsn:{} pg_id:{} shard:{}", context->get_lsn(),
+            LOGI("Reloaded previous snapshot context, lsn:{} pg_id:{} next_shard:{}", context->get_lsn(),
                  m_snp_rcv_handler->get_context_pg_id(), m_snp_rcv_handler->get_next_shard());
         }
     }
@@ -375,8 +383,8 @@ void ReplicationStateMachine::write_snapshot_obj(std::shared_ptr< homestore::sna
             m_snp_rcv_handler->get_shard_cursor() == HSHomeObject::SnapshotReceiveHandler::shard_list_end_marker
                 ? LAST_OBJ_ID
                 : objId(HSHomeObject::get_sequence_num_from_shard_id(m_snp_rcv_handler->get_shard_cursor()), 0).value;
-        LOGI("Resume from previous context breakpoint, lsn:{} pg_id:{} shard:{}", context->get_lsn(),
-             pg_data->pg_id(), m_snp_rcv_handler->get_next_shard());
+        LOGI("Resume from previous context breakpoint, lsn:{} pg_id:{} next_shard:{}, shard_cursor:{}", context->get_lsn(),
+             pg_data->pg_id(), m_snp_rcv_handler->get_next_shard(), m_snp_rcv_handler->get_shard_cursor());
         return;
     }
diff --git a/src/lib/homestore_backend/tests/homeobj_fixture.hpp b/src/lib/homestore_backend/tests/homeobj_fixture.hpp
index 82fae3f..7053a40 100644
--- a/src/lib/homestore_backend/tests/homeobj_fixture.hpp
+++ b/src/lib/homestore_backend/tests/homeobj_fixture.hpp
@@ -302,6 +302,7 @@ class HomeObjectFixture : public ::testing::Test {
     // TODO:make this run in parallel
     void verify_get_blob(std::map< pg_id_t, std::vector< shard_id_t > > const& pg_shard_id_vec,
                          uint64_t const num_blobs_per_shard, bool const use_random_offset = false,
+                         bool const wait_when_not_exist = false,
                          std::map< pg_id_t, blob_id_t > pg_start_blob_id = std::map< pg_id_t, blob_id_t >()) {
         uint32_t off = 0, len = 0;
@@ -324,6 +325,11 @@ class HomeObjectFixture : public ::testing::Test {
             }
             auto g = _obj_inst->blob_manager()->get(shard_id, current_blob_id, off, len).get();
+            while (wait_when_not_exist && g.hasError() && g.error().code == BlobErrorCode::UNKNOWN_BLOB) {
+                LOGDEBUG("blob does not exist yet, waiting for sync, shard {} blob {}", shard_id, current_blob_id);
+                wait_for_blob(shard_id, current_blob_id);
+                g = _obj_inst->blob_manager()->get(shard_id, current_blob_id, off, len).get();
+            }
             ASSERT_TRUE(!!g) << "get blob fail, shard_id " << shard_id << " blob_id " << current_blob_id
                              << " replica number " << g_helper->replica_num();
             auto result = std::move(g.value());
@@ -433,6 +439,30 @@ class HomeObjectFixture : public ::testing::Test {
         // TODO: add logic for check and retry of leader change if necessary
     }

+    peer_id_t get_leader_id(pg_id_t pg_id) {
+        if (!am_i_in_pg(pg_id)) return uuids::nil_uuid();
+        while (true) {
+            PGStats pg_stats;
+            auto res = _obj_inst->pg_manager()->get_stats(pg_id, pg_stats);
+            if (!res || pg_stats.leader_id.is_nil()) {
+                LOGINFO("failed to get leader, retry later");
+                std::this_thread::sleep_for(std::chrono::milliseconds(500));
+                continue;
+            }
+            return pg_stats.leader_id;
+        }
+    }
+
+    bool wait_for_leader_change(pg_id_t pg_id, peer_id_t old_leader) {
+        if (old_leader.is_nil()) return false;
+        while (true) {
+            auto leader = get_leader_id(pg_id);
+            if (old_leader != leader) { return true; }
+            LOGDEBUG("leader has not changed yet, leader_id={}", leader);
+            std::this_thread::sleep_for(std::chrono::milliseconds(500));
+        }
+    }
+
     void run_if_in_pg(pg_id_t pg_id, auto&& lambda) {
         if (am_i_in_pg(pg_id)) lambda();
     }
@@ -448,9 +478,13 @@ class HomeObjectFixture : public ::testing::Test {
     }

     // wait for the last blob to be created locally, which means all the blob before this blob are created
-    void wait_for_all(shard_id_t shard_id, blob_id_t blob_id) {
+    void wait_for_blob(shard_id_t shard_id, blob_id_t blob_id) {
         while (true) {
-            if (blob_exist(shard_id, blob_id)) return;
+            if (blob_exist(shard_id, blob_id)) {
+                LOGINFO("shard {} blob {} is created locally, which means all the blobs before {} have been created",
+                        shard_id, blob_id, blob_id);
+                return;
+            }
             std::this_thread::sleep_for(1s);
         }
     }
@@ -539,7 +573,8 @@ void set_retval_flip(const std::string flip_name, const T retval,
     LOGINFO("Flip {} removed", flip_name);
 }
 #endif
-    void RestartFollowerDuringBaselineResyncUsingSigKill(uint64_t flip_delay, uint64_t restart_interval);
+    void RestartFollowerDuringBaselineResyncUsingSigKill(uint64_t flip_delay, uint64_t restart_interval, string restart_phase);
+    void RestartLeaderDuringBaselineResyncUsingSigKill(uint64_t flip_delay, uint64_t restart_interval, string restart_phase);
 private:
     std::random_device rnd{};
     std::default_random_engine rnd_engine{rnd()};
diff --git a/src/lib/homestore_backend/tests/hs_repl_test_helper.hpp b/src/lib/homestore_backend/tests/hs_repl_test_helper.hpp
index 0794a09..e7ce345 100644
--- a/src/lib/homestore_backend/tests/hs_repl_test_helper.hpp
+++ b/src/lib/homestore_backend/tests/hs_repl_test_helper.hpp
@@ -300,7 +300,7 @@ class HSReplTestHelper {
     void teardown() { sisl::GrpcAsyncClientWorker::shutdown_all(); }

     void sync() {
-        LOGINFO("=== Syncing: replica={}(total {}), sync_point_num={} ===", replica_num_, total_replicas_nums_,
+        LOGINFO("=== Syncing: replica={}(total {}), sync_point_num={} ===", ipc_data_->homeobject_replica_count_, total_replicas_nums_,
                 sync_point_num);
         ipc_data_->sync(sync_point_num++, total_replicas_nums_);
     }
diff --git a/src/lib/homestore_backend/tests/test_homestore_backend_dynamic.cpp b/src/lib/homestore_backend/tests/test_homestore_backend_dynamic.cpp
index 0369f32..6bde150 100644
--- a/src/lib/homestore_backend/tests/test_homestore_backend_dynamic.cpp
+++ b/src/lib/homestore_backend/tests/test_homestore_backend_dynamic.cpp
@@ -17,6 +17,11 @@
  */
 #include "homeobj_fixture.hpp"

+#define RECEIVING_SNAPSHOT "RECEIVING_SNAPSHOT"
+#define APPLYING_SNAPSHOT "APPLYING_SNAPSHOT"
+#define AFTER_BASELINE_RESYNC "AFTER_BASELINE_RESYNC"
+#define TRUNCATING_LOGS "TRUNCATING_LOGS"
+
 TEST_F(HomeObjectFixture, ReplaceMember) {
     LOGINFO("HomeObject replica={} setup completed", g_helper->replica_num());
     auto spare_num_replicas = SISL_OPTIONS["spare_replicas"].as< uint8_t >();
@@ -101,7 +106,7 @@ TEST_F(HomeObjectFixture, ReplaceMember) {
         LOGINFO("new member is waiting to become a member of pg {}", pg_id);
     }

-    wait_for_all(pg_shard_id_vec[pg_id].back() /*the last shard id in this pg*/,
+    wait_for_blob(pg_shard_id_vec[pg_id].back() /*the last shard id in this pg*/,
                  num_shards_per_pg * num_blobs_per_shard - 1 /*the last blob id in this pg*/);

     sleep(5); // wait for incremental append-log requests to complete
@@ -150,54 +155,66 @@ TEST_F(HomeObjectFixture, ReplaceMember) {
 }

 // Restart during baseline resync
-TEST_F(HomeObjectFixture, RestartFollowerDuringBaselineResync) {
-    RestartFollowerDuringBaselineResyncUsingSigKill(10000, 1000);
+TEST_F(HomeObjectFixture, RestartFollowerDuringBaselineResyncWithoutTimeout) {
+    RestartFollowerDuringBaselineResyncUsingSigKill(10000, 1000, RECEIVING_SNAPSHOT);
 }

-// Restart during baseline resync and timeout
+// Restart the follower during baseline resync and let the snapshot context time out. Also tests resumption and
+// conflicts with incoming traffic. Take the default values as an example: num_shards_per_pg=4, num_blobs_per_shard=5.
+// 1. Put the 1st round of blobs into the pg: shard1(blob 0-4), shard2(blob 5-9), shard3(blob 10-14), shard4(blob 15-19).
+// 2. Replace a member; the new member performs a baseline resync.
+// 3.a During the baseline resync (BR), restart the new member while blob id 11 (in shard3) is being transferred to simulate a follower crash.
+// 3.b Then put a 2nd round of blobs, one per shard: shard1(blob 0-4, 20), shard2(blob 5-9, 21), shard3(blob 10-14, 22), shard4(blob 15-19, 23).
+// 4. After the restart, the new member recovers from the saved context (skipping shard1(blob 0-4) and shard2(blob 5-9))
+//    and resumes receiving shard3(blob 10-14, 22) and shard4(blob 15-19, 23).
+// 5. After the baseline resync completes, incremental append-log requests start: logs for blobs 20-21 fetch data, while logs for blobs 22-23 are skipped.
+// Note: pay attention to num_reserved_log_items and snapshot_freq_distance; if a new snapshot is triggered while handling the 2nd round of put blobs,
+// logs will be truncated and a new BR will be triggered during the incremental resync stage.
 TEST_F(HomeObjectFixture, RestartFollowerDuringBaselineResyncAndTimeout) {
-    RestartFollowerDuringBaselineResyncUsingSigKill(10000, 10000);
+    RestartFollowerDuringBaselineResyncUsingSigKill(10000, 10000, RECEIVING_SNAPSHOT);
+}
+
+// Restart follower while applying the snapshot, with timeout
+// FIXME Currently, incremental resync will fail due to a bad term when appending log entries (root cause: lack of last snapshot metadata)
+// TEST_F(HomeObjectFixture, RestartFollowerWhenApplyingSnapshot) {
+//     RestartFollowerDuringBaselineResyncUsingSigKill(10000, 10000, APPLYING_SNAPSHOT);
+// }
+
+// Restart follower after baseline resync has completed
+TEST_F(HomeObjectFixture, RestartFollowerAfterBaselineResync) {
+    RestartFollowerDuringBaselineResyncUsingSigKill(10000, 1000, AFTER_BASELINE_RESYNC);
 }

+// Restart follower while truncating logs
+// TEST_F(HomeObjectFixture, RestartFollowerWhenTruncatingLogs) {
+//     RestartFollowerDuringBaselineResyncUsingSigKill(10000, 1000, TRUNCATING_LOGS);
+// }
+
 // Test case to restart the new member during baseline resync. It starts 4 processes to simulate the 4 replicas, say
 // P0, P1, P2 and P3. P0, P1 and P2 are the original members of the pg, and P3 is the spare replica. After the replace_member
 // happens, P3 joins the pg and then kills itself (sigkill) to simulate a restart during baseline resync. As P0 is
 // the original process that spawned the other 3 processes, P0 also helps spawn a new process to simulate the new
 // member restart.
-void HomeObjectFixture::RestartFollowerDuringBaselineResyncUsingSigKill(uint64_t flip_delay,
-                                                                        uint64_t restart_interval) {
+void HomeObjectFixture::RestartFollowerDuringBaselineResyncUsingSigKill(uint64_t flip_delay, uint64_t restart_interval,
+                                                                        string restart_phase) {
     LOGINFO("HomeObject replica={} setup completed", g_helper->replica_num());
     auto spare_num_replicas = SISL_OPTIONS["spare_replicas"].as< uint8_t >();
     ASSERT_TRUE(spare_num_replicas > 0) << "we need spare replicas for homestore backend dynamic tests";

     auto is_restart = g_helper->is_current_testcase_restarted();
     auto num_replicas = SISL_OPTIONS["replicas"].as< uint8_t >();
+    auto num_shards_per_pg = SISL_OPTIONS["num_shards"].as< uint64_t >();
+    auto num_blobs_per_shard = SISL_OPTIONS["num_blobs"].as< uint64_t >() / num_shards_per_pg;
     pg_id_t pg_id{1};
-
-#ifdef _PRERELEASE
-    if (!is_restart) {
-        // simulate delay in snapshot read data
-        flip::FlipCondition cond;
-        blob_id_t blob_id = 7;
-        m_fc.create_condition("blob id", flip::Operator::EQUAL, static_cast< long >(blob_id), &cond);
-        // This latency simulation is used to workaround the shutdown concurrency issue
-        // set_retval_flip("read_snapshot_load_blob_latency", static_cast< long >(flip_delay) /*ms*/, 10, 100, cond1);
-        // simulate delay in snapshot write data
-        set_retval_flip("write_snapshot_save_blob_latency", static_cast< long >(flip_delay) /*ms*/, 1, 100, cond);
-    }
-#endif
+    auto out_member_id = g_helper->replica_id(num_replicas - 1);
+    auto in_member_id = g_helper->replica_id(num_replicas); /*spare replica*/

     // ======== Stage 1: Create a pg without spare replicas and put blobs ========
-
     std::unordered_set< uint8_t > excluding_replicas_in_pg;
     for (size_t i = num_replicas; i < num_replicas + spare_num_replicas; i++)
         excluding_replicas_in_pg.insert(i);
     create_pg(pg_id, 0 /* pg_leader */, excluding_replicas_in_pg);

-    auto num_shards_per_pg = SISL_OPTIONS["num_shards"].as< uint64_t >();
-    auto num_blobs_per_shard = SISL_OPTIONS["num_blobs"].as< uint64_t >() / num_shards_per_pg;
-
     // we can not share all the shard_id and blob_id among all the replicas including the spare ones, so we need to
     // derive them by calculating.
     // since shard_id = pg_id + shard_sequence_num, so we can derive shard_ids for all the shards in this pg, and these
@@ -209,8 +226,39 @@ void HomeObjectFixture::RestartFollowerDuringBaselineResyncUsingSigKill(uint64_t
         auto derived_shard_id = make_new_shard_id(pg_id, shard_id);
         pg_shard_id_vec[pg_id].emplace_back(derived_shard_id);
     }
+    auto last_shard = pg_shard_id_vec[pg_id].back();
+    // put one more blob in every shard to test incremental resync.
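+    // e.g. with the default 4 shards x 5 blobs per shard, the 1st round writes blob ids 0-19 and the
+    // 2nd round writes 20-23, so last_blob evaluates to 4 * 5 + 4 - 1 = 23.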
+    auto last_blob = num_blobs_per_shard * num_shards_per_pg + num_shards_per_pg - 1;

-    if (!is_restart) {
+    auto kill_until_shard = pg_shard_id_vec[pg_id].back();
+    auto kill_until_blob = num_blobs_per_shard * num_shards_per_pg - 1;
+#ifdef _PRERELEASE
+    if (!is_restart && in_member_id == g_helper->my_replica_id()) {
+        if (restart_phase == RECEIVING_SNAPSHOT) {
+            LOGINFO("Test case: restart follower when receiving snapshot: {}", restart_phase);
+            flip::FlipCondition cond;
+            // only delay writing blob id 11 from the snapshot, during which the restart will happen
+            m_fc.create_condition("blob_id", flip::Operator::EQUAL, static_cast< long >(11), &cond);
+            set_retval_flip("simulate_write_snapshot_save_blob_delay", static_cast< long >(flip_delay) /*ms*/, 1, 100, cond);
+            // kill after the last blob in the first shard is replicated
+            kill_until_shard = pg_shard_id_vec[pg_id].front();
+            kill_until_blob = num_blobs_per_shard - 1;
+        } else if (restart_phase == APPLYING_SNAPSHOT) {
+            LOGINFO("Test case: restart follower when applying snapshot: {}", restart_phase);
+            set_retval_flip("simulate_apply_snapshot_delay", static_cast< long >(flip_delay) /*ms*/, 1, 100);
+        } else if (restart_phase == TRUNCATING_LOGS) {
+            flip::FlipCondition cond;
+            // TODO This flip should be added in homestore
+            m_fc.create_condition("compact_lsn", flip::Operator::EQUAL, static_cast< long >(26), &cond);
+            LOGINFO("Test case: restart follower when truncating logs: {}", restart_phase);
+            set_retval_flip("simulate_truncate_log_delay", static_cast< long >(flip_delay) /*ms*/, 1, 100, cond);
+        } else {
+            LOGWARN("restart after baseline resync: {}", restart_phase);
+        }
+    }
+#endif
+
+    if (!is_restart) {
         for (uint64_t j = 0; j < num_shards_per_pg; j++)
             create_shard(pg_id, 64 * Mi);
@@ -224,8 +272,6 @@ void HomeObjectFixture::RestartFollowerDuringBaselineResyncUsingSigKill(uint64_t
         g_helper->sync();

         // ======== Stage 2: replace a member ========
-        auto out_member_id = g_helper->replica_id(num_replicas - 1);
-        auto in_member_id = g_helper->replica_id(num_replicas); /*spare replica*/

         run_on_pg_leader(pg_id, [&]() {
             auto r = _obj_inst->pg_manager()
                          ->replace_member(pg_id, out_member_id,
                                           PGMember{in_member_id, "new_member", 0})
                          .get();
             ASSERT_TRUE(r);
         });
@@ -237,13 +283,14 @@ void HomeObjectFixture::RestartFollowerDuringBaselineResyncUsingSigKill(uint64_t
         // ======== Stage 3: the new member will kill itself to simulate restart, then P0 will help start it ========
         if (in_member_id == g_helper->my_replica_id()) {
             while (!am_i_in_pg(pg_id)) {
-                std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+                std::this_thread::sleep_for(std::chrono::milliseconds(500));
                 LOGINFO("new member is waiting to become a member of pg {}", pg_id);
             }
-            wait_for_all(pg_shard_id_vec[pg_id].front() /*the first shard id in this pg*/,
-                         num_blobs_per_shard - 1 /*the last blob id in this shard*/);
-            LOGINFO("the data in the first shard has been replicated to the new member");
+            LOGDEBUG("wait for the data[shard:{}, blob:{}] to be replicated to the new member",
+                     kill_until_shard, kill_until_blob);
+            wait_for_blob(kill_until_shard, kill_until_blob);
             LOGINFO("about to kill new member")
+            sleep(3);
             // SyncPoint 1(new member): kill itself.
             g_helper->sync();
             kill();
@@ -252,17 +299,22 @@ void HomeObjectFixture::RestartFollowerDuringBaselineResyncUsingSigKill(uint64_t
             // SyncPoint 1(others): wait for the new member stop, then P0 will help start it.
LOGINFO("waiting for new member stop") g_helper->sync(); + if (g_helper->replica_num() == 0) { // wait for kill std::this_thread::sleep_for(std::chrono::milliseconds(restart_interval)); LOGINFO("going to restart new member") g_helper->spawn_homeobject_process(num_replicas, true); } - // SyncPoint 2(others): wait for restart completed, new member will call g_helper->sync() at setup func to end - // up this stage implicitly. - LOGINFO("waiting for new member start up") - g_helper->sync(); - // SyncPoint 3: waiting for all the blobs are replicated to the new member + + // SyncPoint 2: put more blobs when the new member is restarted. + // g_helper->sync() will be called in new process setup and pub_blobs implicitly. + LOGINFO("going to put more blobs") + pg_blob_id[pg_id] = num_blobs_per_shard * num_shards_per_pg; + put_blobs(pg_shard_id_vec, 1, pg_blob_id); + if (out_member_id != g_helper->my_replica_id()) { wait_for_blob(last_shard, last_blob); } + + // SyncPoint 3: wait for new member to verify all blobs. g_helper->sync(); } else { // new member restart @@ -270,17 +322,176 @@ void HomeObjectFixture::RestartFollowerDuringBaselineResyncUsingSigKill(uint64_t std::this_thread::sleep_for(std::chrono::milliseconds(1000)); LOGINFO("new member is waiting to become a member of pg {}", pg_id); } - wait_for_all(pg_shard_id_vec[pg_id].back() /*the first shard id in this pg*/, - num_shards_per_pg * num_blobs_per_shard - 1 /*the last blob id in this shard*/); run_if_in_pg(pg_id, [&]() { - verify_get_blob(pg_shard_id_vec, num_blobs_per_shard); - verify_obj_count(1, num_shards_per_pg, num_blobs_per_shard, false); + wait_for_blob(last_shard, last_blob); + // 1st round blobs + verify_get_blob(pg_shard_id_vec, num_blobs_per_shard, false, true); + // 2nd round blobs + pg_blob_id[pg_id] = num_blobs_per_shard * num_shards_per_pg; + verify_get_blob(pg_shard_id_vec, 1, false, true, pg_blob_id); + verify_obj_count(1, num_shards_per_pg, num_blobs_per_shard + 1, false); }); // SyncPoint 3(new member): replication done, notify others. 
         g_helper->sync();
     }
 }

+TEST_F(HomeObjectFixture, RestartLeaderDuringBaselineResync) {
+    RestartLeaderDuringBaselineResyncUsingSigKill(10000, 1000, RECEIVING_SNAPSHOT);
+}
+
+TEST_F(HomeObjectFixture, RestartLeaderWhenApplySnapshot) {
+    RestartLeaderDuringBaselineResyncUsingSigKill(10000, 1000, APPLYING_SNAPSHOT);
+}
+
+TEST_F(HomeObjectFixture, RestartLeaderAfterBaselineResync) {
+    RestartLeaderDuringBaselineResyncUsingSigKill(10000, 1000, AFTER_BASELINE_RESYNC);
+}
+
+// Need to restart the leader for process sync
+void HomeObjectFixture::RestartLeaderDuringBaselineResyncUsingSigKill(uint64_t flip_delay, uint64_t restart_interval,
+                                                                      string restart_phase) {
+    LOGINFO("HomeObject replica={} setup completed", g_helper->replica_num());
+    auto spare_num_replicas = SISL_OPTIONS["spare_replicas"].as< uint8_t >();
+    ASSERT_TRUE(spare_num_replicas > 0) << "we need spare replicas for homestore backend dynamic tests";
+
+    auto is_restart = g_helper->is_current_testcase_restarted();
+    auto num_replicas = SISL_OPTIONS["replicas"].as< uint8_t >();
+    pg_id_t pg_id{1};
+
+    std::unordered_set< uint8_t > excluding_replicas_in_pg;
+    for (size_t i = num_replicas; i < num_replicas + spare_num_replicas; i++)
+        excluding_replicas_in_pg.insert(i);
+
+    auto initial_leader_replica_num = 1;
+    create_pg(pg_id, initial_leader_replica_num /* pg_leader */, excluding_replicas_in_pg);
+    peer_id_t initial_leader_replica_id = get_leader_id(pg_id);
+
+    auto num_shards_per_pg = SISL_OPTIONS["num_shards"].as< uint64_t >();
+    auto num_blobs_per_shard = SISL_OPTIONS["num_blobs"].as< uint64_t >() / num_shards_per_pg;
+
+    // we can not share all the shard_id and blob_id among all the replicas including the spare ones, so we need to
+    // derive them by calculating.
+    // since shard_id = pg_id + shard_sequence_num, so we can derive shard_ids for all the shards in this pg, and these
+    // derived info is used by all replicas(including the newly added member) to verify the blobs.
+    std::map< pg_id_t, std::vector< shard_id_t > > pg_shard_id_vec;
+    std::map< pg_id_t, blob_id_t > pg_blob_id;
+    pg_blob_id[pg_id] = 0;
+    for (shard_id_t shard_id = 1; shard_id <= num_shards_per_pg; shard_id++) {
+        auto derived_shard_id = make_new_shard_id(pg_id, shard_id);
+        pg_shard_id_vec[pg_id].emplace_back(derived_shard_id);
+    }
+
+    auto last_shard = pg_shard_id_vec[pg_id].back();
+    // put one more blob in every shard to test incremental resync.
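+    // with the default 4 shards x 5 blobs per shard this is blob id 23, the last of the one-per-shard
+    // 2nd-round blobs (ids 20-23) written after the old leader has been killed.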
+    auto last_blob = num_blobs_per_shard * num_shards_per_pg + num_shards_per_pg - 1;
+    if (!is_restart) {
+        auto kill_until_shard = pg_shard_id_vec[pg_id].back();
+        auto kill_until_blob = num_blobs_per_shard * num_shards_per_pg - 1;
+        if (restart_phase == RECEIVING_SNAPSHOT) {
+            // kill after the last blob in the first shard is replicated
+            kill_until_shard = pg_shard_id_vec[pg_id].front();
+            kill_until_blob = num_blobs_per_shard - 1;
+        }
+#ifdef _PRERELEASE
+        if (initial_leader_replica_num == g_helper->replica_num()) {
+            if (restart_phase == RECEIVING_SNAPSHOT) {
+                LOGINFO("restart when receiving snapshot: {}, kill_until_shard={}, kill_until_blob={}", restart_phase,
+                        kill_until_shard, kill_until_blob);
+                flip::FlipCondition cond;
+                // only delay reading blob id 7 for the snapshot, during which the restart will happen
+                m_fc.create_condition("blob_id", flip::Operator::EQUAL, static_cast< long >(7), &cond);
+                set_retval_flip("simulate_read_snapshot_load_blob_delay", static_cast< long >(flip_delay) /*ms*/, 1, 100,
+                                cond);
+            } else if (restart_phase == APPLYING_SNAPSHOT) {
+                LOGINFO("restart when applying snapshot: {}", restart_phase);
+                set_retval_flip("simulate_apply_snapshot_delay", static_cast< long >(flip_delay) /*ms*/, 1, 100);
+            } else { LOGWARN("restart after baseline resync: {}", restart_phase); }
+        }
+#endif
+
+        // ======== Stage 1: Create a pg without spare replicas and put blobs ========
+
+        for (uint64_t j = 0; j < num_shards_per_pg; j++)
+            create_shard(pg_id, 64 * Mi);
+
+        // put and verify blobs in the pg, excluding the spare replicas
+        put_blobs(pg_shard_id_vec, num_blobs_per_shard, pg_blob_id);
+
+        verify_get_blob(pg_shard_id_vec, num_blobs_per_shard);
+        verify_obj_count(1, num_shards_per_pg, num_blobs_per_shard, false);
+
+        // all the replicas, including the spare ones, sync at this point
+        g_helper->sync();
+
+        // ======== Stage 2: replace a member ========
+        auto out_member_id = g_helper->replica_id(num_replicas - 1);
+        auto in_member_id = g_helper->replica_id(num_replicas); /*spare replica*/
+
+        run_on_pg_leader(pg_id, [&]() {
+            auto r = _obj_inst->pg_manager()
+                         ->replace_member(pg_id, out_member_id,
+                                          PGMember{in_member_id, "new_member", 0})
+                         .get();
+            ASSERT_TRUE(r);
+        });
+
+        // ======== Stage 3: kill the leader; no need to restart it again ========
+        if (in_member_id == g_helper->my_replica_id()) {
+            while (!am_i_in_pg(pg_id)) {
+                std::this_thread::sleep_for(std::chrono::milliseconds(500));
+                LOGINFO("new member is waiting to become a member of pg {}", pg_id);
+            }
+            initial_leader_replica_id = get_leader_id(pg_id);
+            LOGDEBUG("wait for the data[shard:{}, blob:{}] to be replicated to the new member", kill_until_shard,
+                     kill_until_blob);
+            wait_for_blob(kill_until_shard, kill_until_blob);
+        } else if (initial_leader_replica_num == g_helper->replica_num()) {
+            // SyncPoint 1(leader)
+            g_helper->sync();
+            LOGINFO("going to kill leader");
+            kill();
+        }
+        // SyncPoint 1: tell the leader to kill itself
+        g_helper->sync();
+
+        if (out_member_id != g_helper->my_replica_id()) {
+            // SyncPoint 2: wait for the new leader to be ready for traffic.
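+            // wait_for_leader_change() polls get_leader_id() every 500ms until it reports a peer other than the
+            // killed leader, so the put_blobs below only starts once a new leader has been elected.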
+            wait_for_leader_change(pg_id, initial_leader_replica_id);
+        }
+        // start a new thread to spawn the restarted process, so this replica can keep helping with the writes below
+
+        if (g_helper->replica_num() == 0) {
+            std::thread spawn_thread([restart_interval, initial_leader_replica_num]() {
+                std::this_thread::sleep_for(std::chrono::milliseconds(restart_interval));
+                LOGINFO("going to restart replica {}", initial_leader_replica_num)
+                g_helper->spawn_homeobject_process(initial_leader_replica_num, true);
+            });
+            spawn_thread.detach();
+        }
+        // g_helper->sync();
+        LOGINFO("going to put more blobs")
+        pg_blob_id[pg_id] = num_blobs_per_shard * num_shards_per_pg;
+        put_blobs(pg_shard_id_vec, 1, pg_blob_id);
+        if (out_member_id != g_helper->my_replica_id()) { wait_for_blob(last_shard, last_blob); }
+
+        if (in_member_id == g_helper->my_replica_id()) {
+            run_if_in_pg(pg_id, [&]() {
+                LOGDEBUG("verify blobs");
+                // 1st round blobs
+                verify_get_blob(pg_shard_id_vec, num_blobs_per_shard, false, true);
+                // 2nd round blobs
+                pg_blob_id[pg_id] = num_blobs_per_shard * num_shards_per_pg;
+                verify_get_blob(pg_shard_id_vec, 1, false, true, pg_blob_id);
+                verify_obj_count(1, num_shards_per_pg, num_blobs_per_shard + 1, false);
+            });
+        }
+    }
+    LOGINFO("wait for all blobs replicated to the new member")
+    // SyncPoint 3: waiting for all the blobs to be replicated to the new member
+    g_helper->sync();
+}
+
 SISL_OPTION_GROUP(
     test_homeobject_repl_common,
     (spdk, "", "spdk", "spdk", ::cxxopts::value< bool >()->default_value("false"), "true or false"),