Skip to content

Commit

Permalink
Enhance dynamic ut framework (#253)
Browse files Browse the repository at this point in the history
* Enhance dynamic ut framework
- Use sigkill to replace graceful shutdown
- Exclude RestartFollowerDuringBaselineResyncAndTimeout due to reconfigure issue.
  • Loading branch information
yuwmao authored Jan 20, 2025
1 parent dbc0d71 commit 17416c1
Show file tree
Hide file tree
Showing 8 changed files with 234 additions and 21 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "2.2.2"
version = "2.2.3"

homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeReplication"
Expand Down
2 changes: 1 addition & 1 deletion src/lib/homestore_backend/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,6 @@ add_test(NAME HomestoreTestDynamic

# To test both baseline & incremental resync functionality, we use 13 to minimize the likelihood of it being a divisor of the total LSN (currently 30)
add_test(NAME HomestoreTestDynamicWithResync
COMMAND homestore_test_dynamic -csv error --executor immediate --config_path ./
COMMAND homestore_test_dynamic --gtest_filter="HomeObjectFixture.ReplaceMember" -csv error --executor immediate --config_path ./
--override_config homestore_config.consensus.snapshot_freq_distance:13
--override_config homestore_config.consensus.num_reserved_log_items=13)
6 changes: 6 additions & 0 deletions src/lib/homestore_backend/pg_blob_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,12 @@ bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe
LOGW("Simulating loading blob data error");
return false;
}
auto delay = iomgr_flip::instance()->get_test_flip< long >("read_snapshot_load_blob_latency", static_cast<long>(info.blob_id));
if (delay) {
LOGI("Simulating pg blob iterator load data with delay, delay:{}, blob_id:{}", delay.get(),
info.blob_id);
std::this_thread::sleep_for(std::chrono::milliseconds(delay.get()));
}
#endif
sisl::io_blob_safe blob;
uint8_t retries = HS_BACKEND_DYNAMIC_CONFIG(snapshot_blob_load_retry);
Expand Down
10 changes: 10 additions & 0 deletions src/lib/homestore_backend/snapshot_receive_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,16 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob
continue;
}

#ifdef _PRERELEASE
auto delay = iomgr_flip::instance()->get_test_flip< long >("write_snapshot_save_blob_latency",
static_cast< long >(blob->blob_id()));
if (delay) {
LOGI("Simulating pg snapshot receive data with delay, delay:{}, blob_id:{}", delay.get(),
blob->blob_id());
std::this_thread::sleep_for(std::chrono::milliseconds(delay.get()));
}
#endif

// Check duplication to avoid reprocessing. This may happen on resent blob batches.
if (!ctx_->index_table) {
std::shared_lock lock_guard(home_obj_._pg_lock);
Expand Down
40 changes: 37 additions & 3 deletions src/lib/homestore_backend/tests/homeobj_fixture.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,35 @@ class HomeObjectFixture : public ::testing::Test {
g_helper->delete_homeobject();
}

void restart() {
/**
* \brief Restart the HomeObject instance held by this fixture.
*
* Calls g_helper->sync() and trigger_cp(true) first, then releases _obj_inst and replaces it with the
* instance returned by g_helper->restart(), to which both delay arguments are forwarded unchanged.
*
* \param shutdown_delay_secs forwarded as the first argument of g_helper->restart(); defaults to 0.
* \param restart_delay_secs forwarded as the second argument of g_helper->restart(); defaults to 0.
*/
void restart(uint32_t shutdown_delay_secs = 0u, uint32_t restart_delay_secs = 0u) {
g_helper->sync();
trigger_cp(true);
// Drop the fixture's own reference before asking the helper to restart.
_obj_inst.reset();
_obj_inst = std::dynamic_pointer_cast< HSHomeObject >(g_helper->restart(shutdown_delay_secs, restart_delay_secs));
// wait for leader to be elected
std::this_thread::sleep_for(std::chrono::seconds(5));
}

/**
 * \brief Stop the HomeObject instance of this replica without starting a new one.
 *
 * Releases both the fixture's handle (_obj_inst) and the helper's handle (g_helper->homeobj_), then blocks
 * the calling thread for a fixed 120 seconds.
 * NOTE(review): the 120s pause presumably gives the remaining replicas time to observe the stopped member —
 * confirm against the tests that call stop().
 */
void stop() {
    LOGINFO("Stoping homeobject replica={}", g_helper->my_replica_id());
    _obj_inst.reset();
    g_helper->homeobj_.reset();
    // Use the same std::chrono based sleep as the other waits in this fixture instead of POSIX sleep().
    std::this_thread::sleep_for(std::chrono::seconds(120));
}

/**
 * \brief Bring the HomeObject instance of this replica back up.
 *
 * Releases the fixture's current handle, stores the instance returned by g_helper->restart() in _obj_inst,
 * and then waits five seconds so that a leader can be elected.
 */
void start() {
    constexpr auto leader_election_wait = std::chrono::seconds(5);

    LOGINFO("Starting homeobject replica={}", g_helper->my_replica_id());
    _obj_inst.reset();
    auto restarted_instance = g_helper->restart();
    _obj_inst = std::dynamic_pointer_cast< HSHomeObject >(restarted_instance);
    // wait for leader to be elected
    std::this_thread::sleep_for(leader_election_wait);
}

/**
 * \brief Log the replica id of this process and then raise SIGKILL on it.
 */
void kill() {
    const auto self_replica_id = g_helper->my_replica_id();
    LOGINFO("SigKilling homeobject replica={}", self_replica_id);
    std::raise(SIGKILL);
}

/**
* \brief create pg with a given id.
*
Expand Down Expand Up @@ -281,14 +301,17 @@ class HomeObjectFixture : public ::testing::Test {

// TODO:make this run in parallel
void verify_get_blob(std::map< pg_id_t, std::vector< shard_id_t > > const& pg_shard_id_vec,
uint64_t const num_blobs_per_shard, bool const use_random_offset = false) {
uint64_t const num_blobs_per_shard, bool const use_random_offset = false,
std::map<pg_id_t, blob_id_t> pg_start_blob_id = std::map<pg_id_t, blob_id_t>()) {
uint32_t off = 0, len = 0;

for (const auto& [pg_id, shard_vec] : pg_shard_id_vec) {
if (!am_i_in_pg(pg_id)) continue;
blob_id_t current_blob_id{0};
if (pg_start_blob_id.find(pg_id) != pg_start_blob_id.end()) current_blob_id = pg_start_blob_id[pg_id];
for (const auto& shard_id : shard_vec) {
for (uint64_t k = 0; k < num_blobs_per_shard; k++) {
LOGDEBUG("going to verify blob pg {} shard {} blob {}", pg_id, shard_id, current_blob_id);
auto blob = build_blob(current_blob_id);
len = blob.body.size();
if (use_random_offset) {
Expand Down Expand Up @@ -491,6 +514,17 @@ class HomeObjectFixture : public ::testing::Test {
LOGINFO("Flip {} set", flip_name);
}

template <typename T>
void set_retval_flip(const std::string flip_name, const T retval,
uint32_t count = 1, uint32_t percent = 100, flip::FlipCondition cond = flip::FlipCondition())
{
flip::FlipFrequency freq;
freq.set_count(count);
freq.set_percent(percent);
ASSERT_TRUE(m_fc.inject_retval_flip(flip_name, {cond}, freq, retval));
LOGINFO("Flip {} with returned value set, value={}", flip_name, retval);
}

void set_delay_flip(const std::string flip_name, uint64_t delay_usec, uint32_t count = 1, uint32_t percent = 100) {
flip::FlipCondition null_cond;
flip::FlipFrequency freq;
Expand All @@ -505,7 +539,7 @@ class HomeObjectFixture : public ::testing::Test {
LOGINFO("Flip {} removed", flip_name);
}
#endif

void RestartFollowerDuringBaselineResyncUsingSigKill(uint64_t flip_delay, uint64_t restart_interval);
private:
std::random_device rnd{};
std::default_random_engine rnd_engine{rnd()};
Expand Down
60 changes: 48 additions & 12 deletions src/lib/homestore_backend/tests/hs_repl_test_helper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,20 +198,16 @@ class HSReplTestHelper {
ipc_data_ = new (region_->get_address()) IPCData;

for (uint32_t i{1}; i < num_replicas; ++i) {
LOGINFO("Spawning Homeobject replica={} instance", i);

std::string cmd_line;
fmt::format_to(std::back_inserter(cmd_line), "{} --replica_num {}", args_[0], i);
for (int j{1}; j < (int)args_.size(); ++j) {
fmt::format_to(std::back_inserter(cmd_line), " {}", args_[j]);
}
boost::process::child c(boost::process::cmd = cmd_line, proc_grp_);
c.detach();
spawn_homeobject_process(i, false);
}
} else {
shm_ = std::make_unique< bip::shared_memory_object >(bip::open_only, "HO_repl_test_shmem", bip::read_write);
region_ = std::make_unique< bip::mapped_region >(*shm_, bip::read_write);
ipc_data_ = static_cast< IPCData* >(region_->get_address());
if (SISL_OPTIONS["is_restart"].as<bool>()) {
// reset sync point to the next sync point before restart
sync_point_num = ipc_data_->sync_point_num_ + 1;
}
}

int tmp_argc = 1;
Expand All @@ -221,8 +217,41 @@ class HSReplTestHelper {
app = std::make_shared< TestReplApplication >(*this);
}

/**
 * \brief Launch a child process that runs this same test binary as replica \p replica_num.
 *
 * The child command line is built as: args_[0], --replica_num, --is_restart, an optional --gtest_filter
 * (restart only), followed by the remaining entries of args_. The child is placed in proc_grp_ and detached.
 *
 * \param replica_num replica number handed to the child through --replica_num.
 * \param is_restart  when true the child is started with --is_restart=true and a --gtest_filter derived from
 *                    the registered tests, beginning at the currently running one.
 */
void spawn_homeobject_process(uint8_t replica_num, bool is_restart) {
    std::string cmd_line;
    fmt::format_to(std::back_inserter(cmd_line), "{} --replica_num {} --is_restart={}", args_[0], replica_num,
                   is_restart ? "true" : "false");
    if (is_restart) {
        auto ut = testing::UnitTest::GetInstance();
        std::string pattern;
        bool is_following = false;
        for (int i = 0; i < ut->total_test_suite_count(); i++) {
            auto ts = ut->GetTestSuite(i);
            for (int j = 0; j < ts->total_test_count(); j++) {
                auto ti = ts->GetTestInfo(j);
                // Everything before the currently running test is ignored.
                if (!is_following && ti == ut->current_test_info()) { is_following = true; }
                if (is_following && ti->should_run()) {
                    if (!pattern.empty()) { fmt::format_to(std::back_inserter(pattern), ":"); }
                    // test_suite_name() is the non-legacy spelling of test_case_name().
                    fmt::format_to(std::back_inserter(pattern), "{}.{}", ti->test_suite_name(), ti->name());
                    // NOTE(review): this only leaves the inner loop, so at most one test per suite is added
                    // (the current test, plus the first runnable test of every later suite). If the goal is
                    // "current test and all following tests" the break should go; if it is "current test
                    // only" the outer loop must stop too. Behavior is left unchanged until that is confirmed.
                    break;
                }
            }
        }
        LOGINFO("Restart, gtest filter pattern: {}", pattern);
        if (!pattern.empty()) { fmt::format_to(std::back_inserter(cmd_line), " --gtest_filter={}", pattern); }
    }
    // Forward the original arguments (args_[0] is the executable and is already in cmd_line).
    for (int j{1}; j < static_cast< int >(args_.size()); ++j) {
        fmt::format_to(std::back_inserter(cmd_line), " {}", args_[j]);
    }
    LOGINFO("Spawning Homeobject cmd: {}", cmd_line);
    boost::process::child c(boost::process::cmd = cmd_line, proc_grp_);
    c.detach();
}

std::shared_ptr< homeobject::HomeObject > build_new_homeobject() {
prepare_devices();
auto is_restart = SISL_OPTIONS["is_restart"].as< bool >();
prepare_devices(!is_restart);
homeobj_ = init_homeobject(std::weak_ptr< TestReplApplication >(app));
return homeobj_;
}
Expand All @@ -233,9 +262,16 @@ class HSReplTestHelper {
remove_test_files();
}

std::shared_ptr< homeobject::HomeObject > restart(uint32_t shutdown_delay_secs = 5u) {
LOGINFO("Restarting homeobject replica={}", replica_num_);
/**
 * \brief Release and re-create the HomeObject instance of this replica inside the same process.
 *
 * \param shutdown_delay_secs seconds to sleep before the current instance is released; 0 skips the sleep.
 * \param restart_delay_secs  seconds to sleep between releasing the old instance and initialising the new
 *                            one; 0 skips the sleep.
 * \return the newly initialised instance, which is also stored in homeobj_.
 */
std::shared_ptr< homeobject::HomeObject > restart(uint32_t shutdown_delay_secs = 0u, uint32_t restart_delay_secs = 0u) {
    // Sleeps for the requested number of seconds; a zero delay does not sleep at all.
    const auto pause_for = [](uint32_t secs) {
        if (secs == 0) { return; }
        std::this_thread::sleep_for(std::chrono::seconds(secs));
    };

    pause_for(shutdown_delay_secs);
    LOGINFO("Stoping homeobject after {} secs, replica={}", shutdown_delay_secs, replica_num_);
    homeobj_.reset();

    pause_for(restart_delay_secs);
    LOGINFO("Starting homeobject after {} secs, replica={}", restart_delay_secs, replica_num_);
    homeobj_ = init_homeobject(std::weak_ptr< TestReplApplication >(app));
    return homeobj_;
}
Expand Down
4 changes: 3 additions & 1 deletion src/lib/homestore_backend/tests/test_homestore_backend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ SISL_OPTION_GROUP(
(qdepth, "", "qdepth", "Max outstanding operations", ::cxxopts::value< uint32_t >()->default_value("8"), "number"),
(num_pgs, "", "num_pgs", "number of pgs", ::cxxopts::value< uint64_t >()->default_value("2"), "number"),
(num_shards, "", "num_shards", "number of shards", ::cxxopts::value< uint64_t >()->default_value("4"), "number"),
(num_blobs, "", "num_blobs", "number of blobs", ::cxxopts::value< uint64_t >()->default_value("20"), "number"));
(num_blobs, "", "num_blobs", "number of blobs", ::cxxopts::value< uint64_t >()->default_value("20"), "number"),
(is_restart, "", "is_restart", "the process is restart or the first start", ::cxxopts::value< bool >()->
default_value("false"), "true or false"));

SISL_LOGGING_INIT(homeobject)
#define test_options logging, config, homeobject, test_homeobject_repl_common
Expand Down
131 changes: 128 additions & 3 deletions src/lib/homestore_backend/tests/test_homestore_backend_dynamic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ TEST_F(HomeObjectFixture, ReplaceMember) {
// spare replica,
run_if_in_pg(pg_id, [&]() {
verify_get_blob(pg_shard_id_vec, num_blobs_per_shard);
verify_obj_count(1, num_blobs_per_shard, num_shards_per_pg, false);
verify_obj_count(1, num_shards_per_pg, num_blobs_per_shard, false);
});

// step 5: Verify no pg related data in out_member
Expand All @@ -134,7 +134,7 @@ TEST_F(HomeObjectFixture, ReplaceMember) {
restart();
run_if_in_pg(pg_id, [&]() {
verify_get_blob(pg_shard_id_vec, num_blobs_per_shard);
verify_obj_count(1, num_blobs_per_shard, num_shards_per_pg, false);
verify_obj_count(1, num_shards_per_pg, num_blobs_per_shard, false);
LOGINFO("After restart, check pg related data in pg members successfully");
});

Expand All @@ -149,6 +149,129 @@ TEST_F(HomeObjectFixture, ReplaceMember) {
}
}

// Restart the new member with SIGKILL while baseline resync is running, using a 10 second simulated
// snapshot-write delay and a 10 second wait before the member is spawned again.
// NOTE(review): the "AndTimeout" part presumably relies on these 10s values exceeding some resync timeout — confirm.
TEST_F(HomeObjectFixture, RestartFollowerDuringBaselineResyncAndTimeout) {
    // Both values are interpreted as milliseconds by the helper.
    constexpr uint64_t flip_delay_ms = 10000;
    constexpr uint64_t restart_interval_ms = 10000;
    RestartFollowerDuringBaselineResyncUsingSigKill(flip_delay_ms, restart_interval_ms);
}

// Test case that restarts the new member during baseline resync. Four processes simulate the four replicas,
// say P0, P1, P2 and P3. P0, P1 and P2 are the original members of the pg; P3 is the spare replica.
// After replace_member happens, P3 joins the pg and then kills itself (SIGKILL) to simulate a restart during
// baseline resync. P0 is the process that spawned the other three, so P0 also spawns the new process that
// simulates the restarted member.
//
// flip_delay:       value (ms) returned by the "write_snapshot_save_blob_latency" flip; only used when
//                   _PRERELEASE is defined.
// restart_interval: time (ms) P0 waits after sync point 1 before it spawns the replacement process.
void HomeObjectFixture::RestartFollowerDuringBaselineResyncUsingSigKill(uint64_t flip_delay, uint64_t restart_interval) {
    LOGINFO("HomeObject replica={} setup completed", g_helper->replica_num());
    auto spare_num_replicas = SISL_OPTIONS["spare_replicas"].as< uint8_t >();
    ASSERT_TRUE(spare_num_replicas > 0) << "we need spare replicas for homestore backend dynamic tests";

    auto is_restart = SISL_OPTIONS["is_restart"].as< bool >();
    auto num_replicas = SISL_OPTIONS["replicas"].as< uint8_t >();
    pg_id_t pg_id{1};
    if (!is_restart) {
#ifdef _PRERELEASE
        // simulate delay in snapshot read data
        flip::FlipCondition cond;
        blob_id_t blob_id = 7;
        m_fc.create_condition("blob id", flip::Operator::EQUAL, static_cast< long >(blob_id), &cond);
        // This latency simulation is used to workaround the shutdown concurrency issue (currently disabled):
        // set_retval_flip("read_snapshot_load_blob_latency", static_cast<long>(flip_delay) /*ms*/, 10, 100, cond);
        // simulate delay in snapshot write data
        set_retval_flip("write_snapshot_save_blob_latency", static_cast< long >(flip_delay) /*ms*/, 1, 100, cond);

#endif
    }
    // ====================Stage 1: Create a pg without spare replicas and put blobs.====================

    std::unordered_set< uint8_t > excluding_replicas_in_pg;
    for (size_t i = num_replicas; i < num_replicas + spare_num_replicas; i++)
        excluding_replicas_in_pg.insert(i);

    create_pg(pg_id, 0 /* pg_leader */, excluding_replicas_in_pg);

    auto num_shards_per_pg = SISL_OPTIONS["num_shards"].as< uint64_t >();
    auto num_blobs_per_shard = SISL_OPTIONS["num_blobs"].as< uint64_t >() / num_shards_per_pg;

    // we can not share all the shard_id and blob_id among all the replicas including the spare ones, so we need to
    // derive them by calculating.
    // since shard_id = pg_id + shard_sequence_num, so we can derive shard_ids for all the shards in this pg, and these
    // derived info is used by all replicas(including the newly added member) to verify the blobs.
    std::map< pg_id_t, std::vector< shard_id_t > > pg_shard_id_vec;
    std::map< pg_id_t, blob_id_t > pg_blob_id;
    pg_blob_id[pg_id] = 0;
    for (shard_id_t shard_id = 1; shard_id <= num_shards_per_pg; shard_id++) {
        auto derived_shard_id = make_new_shard_id(pg_id, shard_id);
        pg_shard_id_vec[pg_id].emplace_back(derived_shard_id);
    }

    if (!is_restart) {
        for (uint64_t j = 0; j < num_shards_per_pg; j++)
            create_shard(pg_id, 64 * Mi);

        // put and verify blobs in the pg, excluding the spare replicas
        put_blobs(pg_shard_id_vec, num_blobs_per_shard, pg_blob_id);

        verify_get_blob(pg_shard_id_vec, num_blobs_per_shard);
        verify_obj_count(1, num_shards_per_pg, num_blobs_per_shard, false);

        // all the replicas , including the spare ones, sync at this point
        g_helper->sync();

        // ====================Stage 2: replace a member====================
        auto out_member_id = g_helper->replica_id(num_replicas - 1);
        auto in_member_id = g_helper->replica_id(num_replicas); /*spare replica*/

        run_on_pg_leader(pg_id, [&]() {
            auto r = _obj_inst->pg_manager()
                         ->replace_member(pg_id, out_member_id, PGMember{in_member_id, "new_member", 0})
                         .get();
            ASSERT_TRUE(r);
        });

        // ====================Stage 3: the new member will kill itself to simulate restart, then P0 will help start it ====================
        if (in_member_id == g_helper->my_replica_id()) {
            while (!am_i_in_pg(pg_id)) {
                std::this_thread::sleep_for(std::chrono::milliseconds(1000));
                LOGINFO("new member is waiting to become a member of pg {}", pg_id);
            }
            wait_for_all(pg_shard_id_vec[pg_id].front() /*the first shard id in this pg*/,
                         num_blobs_per_shard - 1 /*the last blob id in this shard*/);
            LOGINFO("the data in the first shard has been replicated to the new member");
            LOGINFO("about to kill new member");
            // SyncPoint 1(new member): kill itself.
            g_helper->sync();
            kill();
        }

        // SyncPoint 1(others): wait for the new member stop, then P0 will help start it.
        LOGINFO("waiting for new member stop");
        g_helper->sync();
        if (g_helper->replica_num() == 0) {
            // wait for kill
            std::this_thread::sleep_for(std::chrono::milliseconds(restart_interval));
            LOGINFO("going to restart new member");
            g_helper->spawn_homeobject_process(num_replicas, true);
        }
        // SyncPoint 2(others): wait for restart completed, new member will call g_helper->sync() at setup func to end up this stage implicitly.
        LOGINFO("waiting for new member start up");
        g_helper->sync();
        // SyncPoint 3: waiting for all the blobs are replicated to the new member
        g_helper->sync();
    } else {
        // new member restart
        while (!am_i_in_pg(pg_id)) {
            std::this_thread::sleep_for(std::chrono::milliseconds(1000));
            LOGINFO("new member is waiting to become a member of pg {}", pg_id);
        }
        wait_for_all(pg_shard_id_vec[pg_id].back() /*the last shard id in this pg*/,
                     num_shards_per_pg * num_blobs_per_shard - 1 /*the last blob id in this shard*/);
        run_if_in_pg(pg_id, [&]() {
            verify_get_blob(pg_shard_id_vec, num_blobs_per_shard);
            verify_obj_count(1, num_shards_per_pg, num_blobs_per_shard, false);
        });
        // SyncPoint 3(new member): replication done, notify others.
        g_helper->sync();
    }
}

SISL_OPTION_GROUP(
test_homeobject_repl_common,
(spdk, "", "spdk", "spdk", ::cxxopts::value< bool >()->default_value("false"), "true or false"),
Expand All @@ -175,7 +298,9 @@ SISL_OPTION_GROUP(
(qdepth, "", "qdepth", "Max outstanding operations", ::cxxopts::value< uint32_t >()->default_value("8"), "number"),
(num_pgs, "", "num_pgs", "number of pgs", ::cxxopts::value< uint64_t >()->default_value("2"), "number"),
(num_shards, "", "num_shards", "number of shards", ::cxxopts::value< uint64_t >()->default_value("4"), "number"),
(num_blobs, "", "num_blobs", "number of blobs", ::cxxopts::value< uint64_t >()->default_value("20"), "number"));
(num_blobs, "", "num_blobs", "number of blobs", ::cxxopts::value< uint64_t >()->default_value("20"), "number"),
(is_restart, "", "is_restart", "the process is restart or the first start", ::cxxopts::value< bool >()->
default_value("false"), "true or false"));

SISL_LOGGING_INIT(homeobject)
#define test_options logging, config, homeobject, test_homeobject_repl_common
Expand Down

0 comments on commit 17416c1

Please sign in to comment.