Skip to content

Commit

Permalink
Enhance BaselineResync UT
Browse files Browse the repository at this point in the history
  • Loading branch information
yuwmao committed Feb 19, 2025
1 parent 17fdb38 commit 13b5911
Show file tree
Hide file tree
Showing 7 changed files with 324 additions and 53 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "2.2.5"
version = "2.2.6"

homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeReplication"
Expand Down
24 changes: 20 additions & 4 deletions src/lib/homestore_backend/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,29 @@ target_link_libraries(homestore_test_dynamic PUBLIC
${COMMON_TEST_DEPS}
)

add_test(NAME HomestoreTestDynamic
add_test(NAME HomestoreTestReplaceMember
COMMAND homestore_test_dynamic -csv error --executor immediate --config_path ./
--override_config homestore_config.consensus.snapshot_freq_distance:0)
--override_config homestore_config.consensus.snapshot_freq_distance:0
--gtest_filter=HomeObjectFixture.ReplaceMember)

# To exercise both baseline & incremental resync, use a snapshot frequency of 13: it does not evenly divide the total LSN (currently 30), so the final LSN never lands exactly on a snapshot boundary and an incremental tail remains after the baseline sync
add_test(NAME HomestoreTestDynamicWithResync
add_test(NAME HomestoreTestReplaceMemberWithBaselineResync
COMMAND homestore_test_dynamic -csv error --executor immediate --config_path ./
--override_config homestore_config.consensus.snapshot_freq_distance:13
--override_config homestore_config.consensus.num_reserved_log_items=13
--override_config homestore_config.consensus.snapshot_sync_ctx_timeout_ms=5000)
--override_config homestore_config.consensus.snapshot_sync_ctx_timeout_ms=5000
--gtest_filter=HomeObjectFixture.ReplaceMember)

add_test(NAME HomestoreResyncTestWithFollowerRestart
COMMAND homestore_test_dynamic -csv error --executor immediate --config_path ./
--override_config homestore_config.consensus.snapshot_freq_distance:13
--override_config homestore_config.consensus.num_reserved_log_items=13
--override_config homestore_config.consensus.snapshot_sync_ctx_timeout_ms=5000
--gtest_filter=HomeObjectFixture.RestartFollower*)

add_test(NAME HomestoreResyncTestWithLeaderRestart
COMMAND homestore_test_dynamic -csv error --executor immediate --config_path ./
--override_config homestore_config.consensus.snapshot_freq_distance:13
--override_config homestore_config.consensus.num_reserved_log_items=13
--override_config homestore_config.consensus.snapshot_sync_ctx_timeout_ms=5000
--gtest_filter=HomeObjectFixture.RestartLeader*)
3 changes: 2 additions & 1 deletion src/lib/homestore_backend/pg_blob_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,8 @@ bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe
LOGW("Simulating loading blob data error");
return false;
}
auto delay = iomgr_flip::instance()->get_test_flip< long >("read_snapshot_load_blob_latency", static_cast<long>(info.blob_id));
auto delay = iomgr_flip::instance()->get_test_flip< long >("simulate_read_snapshot_load_blob_delay", static_cast<long>(info.blob_id));
LOGD("simulate_read_snapshot_load_blob_delay flip, triggered: {}, blob: {}", delay.has_value(), info.blob_id);
if (delay) {
LOGI("Simulating pg blob iterator load data with delay, delay:{}, blob_id:{}", delay.get(),
info.blob_id);
Expand Down
14 changes: 11 additions & 3 deletions src/lib/homestore_backend/replication_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,14 @@ ReplicationStateMachine::create_snapshot(std::shared_ptr< homestore::snapshot_co
}

bool ReplicationStateMachine::apply_snapshot(std::shared_ptr< homestore::snapshot_context > context) {
#ifdef _PRERELEASE
auto delay = iomgr_flip::instance()->get_test_flip< long >("simulate_apply_snapshot_delay");
LOGD("simulate_apply_snapshot_delay flip, triggered: {}", delay.has_value());
if (delay) {
LOGI("Simulating apply snapshot with delay, delay:{}", delay.get());
std::this_thread::sleep_for(std::chrono::milliseconds(delay.get()));
}
#endif
// TODO persist snapshot
m_snp_rcv_handler->destroy_context();

Expand Down Expand Up @@ -326,7 +334,7 @@ void ReplicationStateMachine::write_snapshot_obj(std::shared_ptr< homestore::sna
if (!m_snp_rcv_handler) {
m_snp_rcv_handler = std::make_unique< HSHomeObject::SnapshotReceiveHandler >(*home_object_, r_dev);
if (m_snp_rcv_handler->load_prev_context()) {
LOGI("Reloaded previous snapshot context, lsn:{} pg_id:{} shard:{}", context->get_lsn(),
LOGI("Reloaded previous snapshot context, lsn:{} pg_id:{} next_shard:{}", context->get_lsn(),
m_snp_rcv_handler->get_context_pg_id(), m_snp_rcv_handler->get_next_shard());
}
}
Expand Down Expand Up @@ -375,8 +383,8 @@ void ReplicationStateMachine::write_snapshot_obj(std::shared_ptr< homestore::sna
m_snp_rcv_handler->get_shard_cursor() == HSHomeObject::SnapshotReceiveHandler::shard_list_end_marker
? LAST_OBJ_ID
: objId(HSHomeObject::get_sequence_num_from_shard_id(m_snp_rcv_handler->get_shard_cursor()), 0).value;
LOGI("Resume from previous context breakpoint, lsn:{} pg_id:{} shard:{}", context->get_lsn(),
pg_data->pg_id(), m_snp_rcv_handler->get_next_shard());
LOGI("Resume from previous context breakpoint, lsn:{} pg_id:{} next_shard:{}, shard_cursor:{}", context->get_lsn(),
pg_data->pg_id(), m_snp_rcv_handler->get_next_shard(), m_snp_rcv_handler->get_shard_cursor());
return;
}

Expand Down
41 changes: 38 additions & 3 deletions src/lib/homestore_backend/tests/homeobj_fixture.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ class HomeObjectFixture : public ::testing::Test {
// TODO:make this run in parallel
void verify_get_blob(std::map< pg_id_t, std::vector< shard_id_t > > const& pg_shard_id_vec,
uint64_t const num_blobs_per_shard, bool const use_random_offset = false,
bool const wait_when_not_exist = false,
std::map<pg_id_t, blob_id_t> pg_start_blob_id = std::map<pg_id_t, blob_id_t>()) {
uint32_t off = 0, len = 0;

Expand All @@ -324,6 +325,11 @@ class HomeObjectFixture : public ::testing::Test {
}

auto g = _obj_inst->blob_manager()->get(shard_id, current_blob_id, off, len).get();
while (wait_when_not_exist && g.hasError() && g.error().code == BlobErrorCode::UNKNOWN_BLOB) {
LOGDEBUG("blob not exist at the moment, waiting for sync, shard {} blob {}", shard_id, current_blob_id);
wait_for_blob(shard_id, current_blob_id);
g = _obj_inst->blob_manager()->get(shard_id, current_blob_id, off, len).get();
}
ASSERT_TRUE(!!g) << "get blob fail, shard_id " << shard_id << " blob_id " << current_blob_id
<< " replica number " << g_helper->replica_num();
auto result = std::move(g.value());
Expand Down Expand Up @@ -433,6 +439,30 @@ class HomeObjectFixture : public ::testing::Test {
// TODO: add logic for check and retry of leader change if necessary
}

// Resolve the current leader replica of the given PG.
// Returns the nil uuid immediately if this replica is not a member of the PG;
// otherwise polls pg_manager stats every 500ms until a non-nil leader is reported.
// NOTE(review): spins forever if a leader never emerges — acceptable for a test fixture.
peer_id_t get_leader_id(pg_id_t pg_id) {
    // A replica outside the PG cannot observe its leader.
    if (!am_i_in_pg(pg_id)) { return uuids::nil_uuid(); }
    for (;;) {
        PGStats stats;
        const auto ok = _obj_inst->pg_manager()->get_stats(pg_id, stats);
        if (ok && !stats.leader_id.is_nil()) { return stats.leader_id; }
        // Stats unavailable or election still in progress — back off and retry.
        LOGINFO("fail to get leader, retry later");
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
    }
}

// Block until the PG's leader differs from `old_leader`.
// Returns false right away when `old_leader` is nil (no baseline to compare against);
// returns true once a different leader is observed. Polls every 500ms.
// NOTE(review): loops forever if the leader never changes — acceptable for a test fixture.
bool wait_for_leader_change(pg_id_t pg_id, peer_id_t old_leader) {
    if (old_leader.is_nil()) { return false; }
    for (;;) {
        const auto current = get_leader_id(pg_id);
        if (current != old_leader) return true;
        LOGDEBUG("leader not change, leader_id={}", current);
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
    }
}

// Invoke `lambda` only on replicas that are members of the given PG; no-op elsewhere.
void run_if_in_pg(pg_id_t pg_id, auto&& lambda) {
    if (!am_i_in_pg(pg_id)) { return; }
    lambda();
}
Expand All @@ -448,9 +478,13 @@ class HomeObjectFixture : public ::testing::Test {
}

// wait for the given blob to be created locally; since blobs are applied in order, its presence implies all earlier blobs have been created as well
void wait_for_all(shard_id_t shard_id, blob_id_t blob_id) {
// Poll once per second until `blob_id` exists locally in `shard_id`.
// Because blobs are applied in order, its existence implies every earlier blob
// has also been created. Blocks indefinitely — intended for test synchronization.
void wait_for_blob(shard_id_t shard_id, blob_id_t blob_id) {
    for (;;) {
        if (!blob_exist(shard_id, blob_id)) {
            std::this_thread::sleep_for(1s);
            continue;
        }
        LOGINFO("shard {} blob {} is created locally, which means all the blob before {} are created", shard_id,
            blob_id, blob_id);
        return;
    }
}
Expand Down Expand Up @@ -539,7 +573,8 @@ void set_retval_flip(const std::string flip_name, const T retval,
LOGINFO("Flip {} removed", flip_name);
}
#endif
void RestartFollowerDuringBaselineResyncUsingSigKill(uint64_t flip_delay, uint64_t restart_interval);
void RestartFollowerDuringBaselineResyncUsingSigKill(uint64_t flip_delay, uint64_t restart_interval, string restart_phase);
void RestartLeaderDuringBaselineResyncUsingSigKill(uint64_t flip_delay, uint64_t restart_interval, string restart_phase);
private:
std::random_device rnd{};
std::default_random_engine rnd_engine{rnd()};
Expand Down
2 changes: 1 addition & 1 deletion src/lib/homestore_backend/tests/hs_repl_test_helper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ class HSReplTestHelper {
void teardown() { sisl::GrpcAsyncClientWorker::shutdown_all(); }

void sync() {
LOGINFO("=== Syncing: replica={}(total {}), sync_point_num={} ===", replica_num_, total_replicas_nums_,
LOGINFO("=== Syncing: replica={}(total {}), sync_point_num={} ===", ipc_data_->homeobject_replica_count_, total_replicas_nums_,
sync_point_num);
ipc_data_->sync(sync_point_num++, total_replicas_nums_);
}
Expand Down
Loading

0 comments on commit 13b5911

Please sign in to comment.