Reclaim resources on PG creation failure and add concurrency support for PG creation

- Reclaim resources if PG creation fails (see the sketch below):
  - Return the PG's chunks to the device heap.
  - Remove the replication device.

- Add concurrency support for receiving PG creation requests:
  - Use `select_chunks_for_pg()` to reserve chunks under a lock instead of only checking the available size, so concurrent requests cannot over-commit (a small sketch follows the heap_chunk_selector.cpp hunk below).
  - Add concurrency tests to verify the behavior of concurrent PG creation requests.
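
For orientation, here is a minimal, self-contained sketch of the resulting flow. Everything in it (`FakeChunkSelector`, `create_repl_dev`, `replicate_pg_header`, the plain return codes) is a hypothetical stand-in rather than the actual HomeObject/folly API shown in the diff below; it only illustrates the pattern "reserve chunks first, hand everything back on any later failure":

```cpp
#include <cstdint>
#include <iostream>
#include <map>
#include <mutex>
#include <optional>

// Simplified stand-in for the chunk selector: chunks are reserved under one
// lock, and a failed PG creation can hand them back later.
struct FakeChunkSelector {
    std::mutex mtx;
    uint64_t chunk_size{16};
    uint32_t free_chunks{64};
    std::map< uint32_t, uint32_t > per_pg_chunks; // pg_id -> reserved chunk count

    // Reserve chunks for a PG. A repeat call for the same pg_id returns the
    // already-reserved count, which is what makes retries/concurrency safe.
    std::optional< uint32_t > select_chunks_for_pg(uint32_t pg_id, uint64_t pg_size) {
        std::lock_guard lk(mtx);
        if (pg_size < chunk_size) return std::nullopt;
        const auto num_chunk = static_cast< uint32_t >(pg_size / chunk_size);
        if (auto it = per_pg_chunks.find(pg_id); it != per_pg_chunks.end()) return it->second;
        if (num_chunk > free_chunks) return std::nullopt;
        free_chunks -= num_chunk;
        per_pg_chunks[pg_id] = num_chunk;
        return num_chunk;
    }

    // Give the reserved chunks back if PG creation fails later on.
    bool return_pg_chunks_to_dev_heap(uint32_t pg_id) {
        std::lock_guard lk(mtx);
        auto it = per_pg_chunks.find(pg_id);
        if (it == per_pg_chunks.end()) return false;
        free_chunks += it->second;
        per_pg_chunks.erase(it);
        return true;
    }
};

enum class PGError { NONE, NO_SPACE_LEFT, UNKNOWN };

bool create_repl_dev(uint32_t) { return true; }       // pretend this step succeeded
bool replicate_pg_header(uint32_t) { return false; }  // simulate a raft-message failure
void remove_repl_dev(uint32_t) {}                     // harmless even if the dev was never created

PGError create_pg(FakeChunkSelector& cs, uint32_t pg_id, uint64_t pg_size) {
    auto num_chunk = cs.select_chunks_for_pg(pg_id, pg_size);
    if (!num_chunk) return PGError::NO_SPACE_LEFT;

    if (!create_repl_dev(pg_id) || !replicate_pg_header(pg_id)) {
        // Reclaim everything reserved so far, then surface the original error.
        cs.return_pg_chunks_to_dev_heap(pg_id);
        remove_repl_dev(pg_id);
        return PGError::UNKNOWN;
    }
    return PGError::NONE;
}

int main() {
    FakeChunkSelector cs;
    auto err = create_pg(cs, 1 /* pg_id */, 256 /* pg_size */);
    std::cout << "creation failed: " << (err != PGError::NONE)
              << ", free chunks back to " << cs.free_chunks << '\n'; // 64 again
}
```

In the real implementation these steps are chained with folly futures, and the cleanup path removes the repl dev unconditionally: if it was never created, `remove_repl_dev()` reports `SERVER_NOT_FOUND`, which is logged and ignored so the original error is still returned to the caller.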
Hooper9973 committed Jan 10, 2025
1 parent c3a24ec commit 77d89ef
Showing 4 changed files with 138 additions and 15 deletions.
13 changes: 7 additions & 6 deletions src/lib/homestore_backend/heap_chunk_selector.cpp
@@ -192,19 +192,20 @@ bool HeapChunkSelector::is_chunk_available(const pg_id_t pg_id, const chunk_num_

std::optional< uint32_t > HeapChunkSelector::select_chunks_for_pg(pg_id_t pg_id, uint64_t pg_size) {
std::unique_lock lock_guard(m_chunk_selector_mtx);
if (m_per_pg_chunks.find(pg_id) != m_per_pg_chunks.end()) {
LOGWARNMOD(homeobject, "PG had already created, pg_id {}", pg_id);
return std::nullopt;
}

const auto chunk_size = get_chunk_size();
if (pg_size < chunk_size) {
LOGWARNMOD(homeobject, "pg_size {} is less than chunk_size {}", pg_size, chunk_size);
return std::nullopt;
}

const uint32_t num_chunk = sisl::round_down(pg_size, chunk_size) / chunk_size;

if (m_per_pg_chunks.find(pg_id) != m_per_pg_chunks.end()) {
// the leader may call select_chunks_for_pg() multiple times for the same PG
RELEASE_ASSERT(num_chunk == m_per_pg_chunks[pg_id]->m_pg_chunks.size(), "num_chunk should be the same");
LOGWARNMOD(homeobject, "PG has already been created, pg_id {}", pg_id);
return num_chunk;
}

// Select a pdev with the most available num chunk
auto most_avail_dev_it = std::max_element(m_per_dev_heap.begin(), m_per_dev_heap.end(),
[](const std::pair< const uint32_t, std::shared_ptr< ChunkHeap > >& lhs,
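
As a hedged, self-contained illustration of why the reservation now happens inside `select_chunks_for_pg()` (toy types below, not the real `HeapChunkSelector`): the existence check and the reservation run under one lock, so two racing create-PG requests can never both pass a size-only pre-check and over-commit the device.

```cpp
#include <cstdint>
#include <iostream>
#include <map>
#include <mutex>
#include <optional>
#include <thread>
#include <vector>

// Toy model of select_chunks_for_pg(): check-and-reserve is one atomic step.
std::mutex mtx;
uint32_t free_chunks = 8;
std::map< uint32_t, uint32_t > per_pg_chunks; // pg_id -> reserved chunk count

std::optional< uint32_t > select_chunks_for_pg(uint32_t pg_id, uint32_t num_chunk) {
    std::lock_guard lk(mtx);
    // A repeated request for an existing PG just reports what it already owns.
    if (auto it = per_pg_chunks.find(pg_id); it != per_pg_chunks.end()) return it->second;
    if (num_chunk > free_chunks) return std::nullopt;
    free_chunks -= num_chunk;
    per_pg_chunks[pg_id] = num_chunk;
    return num_chunk;
}

int main() {
    // Two PGs race for 8 free chunks, each asking for 5: exactly one wins.
    std::vector< std::thread > threads;
    for (uint32_t pg_id = 1; pg_id <= 2; ++pg_id) {
        threads.emplace_back([pg_id] { select_chunks_for_pg(pg_id, 5); });
    }
    for (auto& t : threads) t.join();

    for (auto const& [pg_id, chunks] : per_pg_chunks) {
        std::cout << "pg " << pg_id << " reserved " << chunks << " chunks\n";
    }
    std::cout << "free chunks left: " << free_chunks << '\n'; // 3
}
```

By contrast, the code removed from `_create_pg()` (next file) only compared the request against `most_avail_num_chunks()`, a check that two concurrent requests could both pass before either of them had reserved anything.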
48 changes: 40 additions & 8 deletions src/lib/homestore_backend/hs_pg_manager.cpp
@@ -64,32 +64,58 @@ PGManager::NullAsyncResult HSHomeObject::_create_pg(PGInfo&& pg_info, std::set<
auto pg_id = pg_info.id;
if (auto lg = std::shared_lock(_pg_lock); _pg_map.end() != _pg_map.find(pg_id)) return folly::Unit();

const auto most_avail_num_chunks = chunk_selector()->most_avail_num_chunks();
const auto chunk_size = chunk_selector()->get_chunk_size();
if (pg_info.size < chunk_size) {
LOGW("Not support to create PG which pg_size {} < chunk_size {}", pg_info.size, chunk_size);
return folly::makeUnexpected(PGError::INVALID_ARG);
}

const auto needed_num_chunks = sisl::round_down(pg_info.size, chunk_size) / chunk_size;
if (needed_num_chunks > most_avail_num_chunks) {
LOGW("No enough space to create pg, pg_id {}, needed_num_chunks {}, most_avail_num_chunks {}", pg_id,
needed_num_chunks, most_avail_num_chunks);
auto const num_chunk = chunk_selector()->select_chunks_for_pg(pg_id, pg_info.size);
if (!num_chunk.has_value()) {
LOGW("Failed to select chunks for pg {}", pg_id);
return folly::makeUnexpected(PGError::NO_SPACE_LEFT);
}

pg_info.chunk_size = chunk_size;
pg_info.replica_set_uuid = boost::uuids::random_generator()();
const auto repl_dev_group_id = pg_info.replica_set_uuid;
return hs_repl_service()
.create_repl_dev(pg_info.replica_set_uuid, peers)
.via(executor_)
.thenValue([this, pg_info = std::move(pg_info)](auto&& v) mutable -> PGManager::NullAsyncResult {
#ifdef _PRERELEASE
if (iomgr_flip::instance()->test_flip("create_pg_create_repl_dev_error")) {
LOGW("Simulating create repl dev error in creating pg");
v = folly::makeUnexpected(ReplServiceError::FAILED);
}
#endif

if (v.hasError()) { return folly::makeUnexpected(toPgError(v.error())); }
// we will write a PGHeader across the raft group and when it is committed
// all raft members will create PGinfo and index table for this PG.

// FIXME:https://github.com/eBay/HomeObject/pull/136#discussion_r1470504271
return do_create_pg(v.value(), std::move(pg_info));
})
.thenValue([this, pg_id, repl_dev_group_id](auto&& r) -> PGManager::NullAsyncResult {
// reclaim resources if pg creation failed
if (r.hasError()) {
bool res = chunk_selector_->return_pg_chunks_to_dev_heap(pg_id);
RELEASE_ASSERT(res, "Failed to return pg {} chunks to dev_heap", pg_id);
// remove the repl dev regardless of whether it was created successfully;
// if it does not exist, remove_repl_dev() returns ReplServiceError::SERVER_NOT_FOUND
return hs_repl_service()
.remove_repl_dev(repl_dev_group_id)
.deferValue([r, repl_dev_group_id](auto&& e) -> PGManager::NullAsyncResult {
if (e != ReplServiceError::OK) {
LOGW("Failed to remove repl device which group_id {}, error: {}", repl_dev_group_id, e);
}

// still return the original error
return folly::makeUnexpected(r.error());
});
}
return folly::Unit();
});
}

@@ -104,6 +130,13 @@ PGManager::NullAsyncResult HSHomeObject::do_create_pg(cshared< homestore::ReplDe
req->header()->seal();
std::memcpy(req->header_extn(), serailized_pg_info.data(), info_size);

#ifdef _PRERELEASE
if (iomgr_flip::instance()->test_flip("create_pg_raft_message_error")) {
LOGW("Simulating raft message error in creating pg");
return folly::makeUnexpected(PGError::UNKNOWN);
}
#endif

// replicate this create pg message to all raft members of this group
repl_dev->async_alloc_write(req->header_buf(), sisl::blob{}, sisl::sg_list{}, req);
return req->result().deferValue([req](auto const& e) -> PGManager::NullAsyncResult {
@@ -291,7 +324,7 @@ void HSHomeObject::mark_pg_destroyed(pg_id_t pg_id) {
auto lg = std::scoped_lock(_pg_lock);
auto iter = _pg_map.find(pg_id);
if (iter == _pg_map.end()) {
LOGW("on pg destroy with unknown pg_id {}", pg_id);
LOGW("mark pg destroyed with unknown pg_id {}", pg_id);
return;
}
auto& pg = iter->second;
@@ -301,8 +334,7 @@ void HSHomeObject::mark_pg_destroyed(pg_id_t pg_id) {
}

void HSHomeObject::reset_pg_chunks(pg_id_t pg_id) {
bool res = chunk_selector_->reset_pg_chunks(pg_id);
RELEASE_ASSERT(res, "Failed to reset all chunks in pg {}", pg_id);
chunk_selector_->reset_pg_chunks(pg_id);
auto fut = homestore::hs()->cp_mgr().trigger_cp_flush(true /* force */);
auto on_complete = [&](auto success) {
RELEASE_ASSERT(success, "Failed to trigger CP flush");
1 change: 0 additions & 1 deletion src/lib/homestore_backend/hs_shard_manager.cpp
@@ -553,7 +553,6 @@ void HSHomeObject::destroy_shards(pg_id_t pg_id) {

auto& pg = iter->second;
for (auto& shard : pg->shards_) {
// release open shard v_chunk
auto hs_shard = s_cast< HS_Shard* >(shard.get());
// destroy shard super blk
hs_shard->sb_.destroy();
91 changes: 91 additions & 0 deletions src/lib/homestore_backend/tests/hs_pg_tests.cpp
@@ -1,4 +1,5 @@
#include "homeobj_fixture.hpp"
#include <homestore/replication_service.hpp>

TEST_F(HomeObjectFixture, PGStatsTest) {
LOGINFO("HomeObject replica={} setup completed", g_helper->replica_num());
@@ -157,3 +158,93 @@ TEST_F(HomeObjectFixture, PGRecoveryTest) {
verify_hs_pg(reserved_pg, recovered_pg);
}
}

TEST_F(HomeObjectFixture, ConcurrencyCreatePG) {
g_helper->sync();

LOGINFO("print num chunks {}", _obj_inst->chunk_selector()->m_chunks.size());
auto const pg_num = 10;
// concurrent create pg
std::vector< std::future< void > > futures;
for (pg_id_t i = 1; i <= pg_num; ++i) {
futures.emplace_back(std::async(std::launch::async, [this, i]() { create_pg(i); }));
}
for (auto& future : futures) {
future.get();
}

// verify all pgs are created
for (pg_id_t i = 1; i <= pg_num; ++i) {
ASSERT_TRUE(pg_exist(i));
LOGINFO("Create pg {} successfully", i);
}
}

#ifdef _PRERELEASE
TEST_F(HomeObjectFixture, CreatePGFailed) {
set_basic_flip("create_pg_create_repl_dev_error", 1); // simulate create pg repl dev error
set_basic_flip("create_pg_raft_message_error", 1); // simulate create pg raft message error

// run twice, once for each simulated error
for (auto i = 0; i < 2; ++i) {
g_helper->sync();
auto const pg_id = 1;
const uint8_t leader_replica_num = 0;
auto my_replica_num = g_helper->replica_num();
auto pg_size = SISL_OPTIONS["pg_size"].as< uint64_t >() * Mi;
auto name = g_helper->test_name();
if (leader_replica_num == my_replica_num) {
auto members = g_helper->members();
auto info = homeobject::PGInfo(pg_id);
info.size = pg_size;
for (const auto& member : members) {
if (leader_replica_num == member.second) {
// by default, leader is the first member
info.members.insert(homeobject::PGMember{member.first, name + std::to_string(member.second), 1});
} else {
info.members.insert(homeobject::PGMember{member.first, name + std::to_string(member.second), 0});
}
}
auto p = _obj_inst->pg_manager()->create_pg(std::move(info)).get();
ASSERT_FALSE(p);
ASSERT_EQ(PGError::UNKNOWN, p.error());

// verify pg resource
// since pg creation failed, the pg chunks should not exist
ASSERT_TRUE(_obj_inst->chunk_selector()->m_per_pg_chunks.find(pg_id) ==
_obj_inst->chunk_selector()->m_per_pg_chunks.end());
// wait for repl gc.
std::this_thread::sleep_for(std::chrono::seconds(70));
int num_repl = 0;
_obj_inst->hs_repl_service().iterate_repl_devs([&num_repl](cshared< homestore::ReplDev >&) { num_repl++; });
LOGINFO("Failed to create pg {} at leader, times {}, num_repl {}", pg_id, i, num_repl);
ASSERT_EQ(0, num_repl);

} else {
auto start_time = std::chrono::steady_clock::now();
bool res = true;
// followers wait to see whether the pg gets created
while (!pg_exist(pg_id)) {
auto current_time = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast< std::chrono::seconds >(current_time - start_time).count();
if (duration >= 20) {
LOGINFO("Failed to create pg {} at follower", pg_id);
res = false;
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
ASSERT_FALSE(res);
}
}

// verify that pg creation now succeeds
g_helper->sync();
auto const pg_id = 1;
create_pg(pg_id);
ASSERT_TRUE(pg_exist(pg_id));
LOGINFO("create pg {} successfully", pg_id);
restart();
ASSERT_TRUE(pg_exist(pg_id));
}
#endif
