Skip to content

Commit

Permalink
Extract get_hs_pg() function for simplicity (#257)
Browse files Browse the repository at this point in the history
  • Loading branch information
koujl authored Feb 19, 2025
1 parent 13b5911 commit 37b1f8a
Show file tree
Hide file tree
Showing 10 changed files with 142 additions and 214 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "2.2.6"
version = "2.2.7"

homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeReplication"
Expand Down
72 changes: 24 additions & 48 deletions src/lib/homestore_backend/hs_blob_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,10 @@ BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& s
shared< homestore::ReplDev > repl_dev;
blob_id_t new_blob_id;
{
std::shared_lock lock_guard(_pg_lock);
auto iter = _pg_map.find(pg_id);
RELEASE_ASSERT(iter != _pg_map.end(), "PG not found");
auto hs_pg = static_cast< HS_PG* >(iter->second.get());
auto hs_pg = get_hs_pg(pg_id);
RELEASE_ASSERT(hs_pg, "PG not found");
repl_dev = hs_pg->repl_dev_;
hs_pg->durable_entities_update(
const_cast< HS_PG* >(hs_pg)->durable_entities_update(
[&new_blob_id](auto& de) { new_blob_id = de.blob_sequence_num.fetch_add(1, std::memory_order_relaxed); },
false /* dirty */);

Expand Down Expand Up @@ -186,13 +184,8 @@ BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& s
}

bool HSHomeObject::local_add_blob_info(pg_id_t const pg_id, BlobInfo const& blob_info) {
HS_PG* hs_pg{nullptr};
{
shared_lock lock_guard(_pg_lock);
const auto iter = _pg_map.find(pg_id);
RELEASE_ASSERT(iter != _pg_map.end(), "PG not found");
hs_pg = dynamic_cast< HS_PG* >(iter->second.get());
}
auto hs_pg = get_hs_pg(pg_id);
RELEASE_ASSERT(hs_pg != nullptr, "PG not found");
shared< BlobIndexTable > index_table = hs_pg->index_table_;
RELEASE_ASSERT(index_table != nullptr, "Index table not initialized");

Expand All @@ -213,7 +206,7 @@ bool HSHomeObject::local_add_blob_info(pg_id_t const pg_id, BlobInfo const& blob

// Update the durable counters. We need to update the blob_sequence_num here only for replay case, as the
// number is already updated in the put_blob call.
hs_pg->durable_entities_update([&blob_info](auto& de) {
const_cast< HS_PG* >(hs_pg)->durable_entities_update([&blob_info](auto& de) {
auto existing_blob_id = de.blob_sequence_num.load();
auto next_blob_id = blob_info.blob_id + 1;
while (next_blob_id > existing_blob_id &&
Expand Down Expand Up @@ -265,15 +258,10 @@ void HSHomeObject::on_blob_put_commit(int64_t lsn, sisl::blob const& header, sis
BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob(ShardInfo const& shard, blob_id_t blob_id, uint64_t req_offset,
uint64_t req_len) const {
auto& pg_id = shard.placement_group;
shared< BlobIndexTable > index_table;
shared< homestore::ReplDev > repl_dev;
{
std::shared_lock lock_guard(_pg_lock);
auto iter = _pg_map.find(pg_id);
RELEASE_ASSERT(iter != _pg_map.end(), "PG not found");
repl_dev = static_cast< HS_PG* >(iter->second.get())->repl_dev_;
index_table = static_cast< HS_PG* >(iter->second.get())->index_table_;
}
auto hs_pg = get_hs_pg(pg_id);
RELEASE_ASSERT(hs_pg, "PG not found");
auto repl_dev = hs_pg->repl_dev_;
auto index_table = hs_pg->index_table_;

RELEASE_ASSERT(repl_dev != nullptr, "Repl dev instance null");
RELEASE_ASSERT(index_table != nullptr, "Index table instance null");
Expand Down Expand Up @@ -370,8 +358,8 @@ HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive<
return folly::makeUnexpected(homestore::ReplServiceError::FAILED);
}

auto pg_iter = _pg_map.find(msg_header->pg_id);
if (pg_iter == _pg_map.end()) {
auto hs_pg = get_hs_pg(msg_header->pg_id);
if (hs_pg == nullptr) {
LOGW("Received a blob_put on an unknown pg:{}, underlying engine will retry this later",
msg_header->pg_id);
return folly::makeUnexpected(homestore::ReplServiceError::RESULT_NOT_EXIST_YET);
Expand All @@ -392,9 +380,8 @@ HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive<
hints.chunk_id_hint = hs_shard->sb_->p_chunk_id;

if (msg_header->blob_id != 0) {
//check if the blob already exists, if yes, return the blk id
auto index_table = d_cast< HS_PG* >(pg_iter->second.get())->index_table_;
auto r = get_blob_from_index_table(index_table, msg_header->shard_id, msg_header->blob_id);
// check if the blob already exists, if yes, return the blk id
auto r = get_blob_from_index_table(hs_pg->index_table_, msg_header->shard_id, msg_header->blob_id);
if (r.hasValue()) {
LOGT("Blob has already been persisted, blob_id:{}, shard_id:{}", msg_header->blob_id, msg_header->shard_id);
hints.committed_blk_id = r.value();
Expand All @@ -407,13 +394,9 @@ HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive<
BlobManager::NullAsyncResult HSHomeObject::_del_blob(ShardInfo const& shard, blob_id_t blob_id) {
BLOGT(shard.id, blob_id, "deleting blob");
auto& pg_id = shard.placement_group;
shared< homestore::ReplDev > repl_dev;
{
std::shared_lock lock_guard(_pg_lock);
auto iter = _pg_map.find(pg_id);
RELEASE_ASSERT(iter != _pg_map.end(), "PG not found");
repl_dev = static_cast< HS_PG* >(iter->second.get())->repl_dev_;
}
auto hs_pg = get_hs_pg(pg_id);
RELEASE_ASSERT(hs_pg, "PG not found");
auto repl_dev = hs_pg->repl_dev_;

RELEASE_ASSERT(repl_dev != nullptr, "Repl dev instance null");

Expand Down Expand Up @@ -469,19 +452,12 @@ void HSHomeObject::on_blob_del_commit(int64_t lsn, sisl::blob const& header, sis
return;
}

shared< BlobIndexTable > index_table;
shared< homestore::ReplDev > repl_dev;
HSHomeObject::HS_PG* hs_pg{nullptr};
{
std::shared_lock lock_guard(_pg_lock);
auto iter = _pg_map.find(msg_header->pg_id);
RELEASE_ASSERT(iter != _pg_map.end(), "PG not found");
hs_pg = static_cast< HSHomeObject::HS_PG* >(iter->second.get());
index_table = hs_pg->index_table_;
repl_dev = hs_pg->repl_dev_;
RELEASE_ASSERT(index_table != nullptr, "Index table not intialized");
RELEASE_ASSERT(repl_dev != nullptr, "Repl dev instance null");
}
auto hs_pg = get_hs_pg(msg_header->pg_id);
RELEASE_ASSERT(hs_pg, "PG not found");
auto index_table = hs_pg->index_table_;
auto repl_dev = hs_pg->repl_dev_;
RELEASE_ASSERT(index_table != nullptr, "Index table not intialized");
RELEASE_ASSERT(repl_dev != nullptr, "Repl dev instance null");

BlobInfo blob_info;
blob_info.shard_id = msg_header->shard_id;
Expand All @@ -502,7 +478,7 @@ void HSHomeObject::on_blob_del_commit(int64_t lsn, sisl::blob const& header, sis
auto& multiBlks = r.value();
if (multiBlks != tombstone_pbas) {
repl_dev->async_free_blks(lsn, multiBlks);
hs_pg->durable_entities_update([](auto& de) {
const_cast< HS_PG* >(hs_pg)->durable_entities_update([](auto& de) {
de.active_blob_count.fetch_sub(1, std::memory_order_relaxed);
de.tombstone_blob_count.fetch_add(1, std::memory_order_relaxed);
});
Expand Down
16 changes: 11 additions & 5 deletions src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,8 @@ class HSHomeObject : public HomeObjectImpl {

// Snapshot receiver progress info, used as a checkpoint for recovery
// Placed within HS_PG since HomeObject is unable to locate the ReplicationStateMachine
homestore::superblk< snapshot_rcvr_info_superblk > snp_rcvr_info_sb_;
homestore::superblk< snapshot_rcvr_shard_list_superblk > snp_rcvr_shard_list_sb_;
mutable homestore::superblk< snapshot_rcvr_info_superblk > snp_rcvr_info_sb_;
mutable homestore::superblk< snapshot_rcvr_shard_list_superblk > snp_rcvr_shard_list_sb_;

HS_PG(PGInfo info, shared< homestore::ReplDev > rdev, shared< BlobIndexTable > index_table,
std::shared_ptr< const std::vector< homestore::chunk_num_t > > pg_chunk_ids);
Expand Down Expand Up @@ -430,8 +430,6 @@ class HSHomeObject : public HomeObjectImpl {

// Update the snp_info superblock
void update_snp_info_sb(bool init = false);

HS_PG* get_hs_pg(pg_id_t pg_id);
};

private:
Expand All @@ -456,6 +454,7 @@ class HSHomeObject : public HomeObjectImpl {
static std::string serialize_pg_info(const PGInfo& info);
static PGInfo deserialize_pg_info(const unsigned char* pg_info_str, size_t size);
void add_pg_to_map(unique< HS_PG > hs_pg);
const HS_PG* _get_hs_pg_unlocked(pg_id_t pg_id) const;

// create shard related
shard_id_t generate_new_shard_id(pg_id_t pg);
Expand Down Expand Up @@ -542,6 +541,13 @@ class HSHomeObject : public HomeObjectImpl {
*/
void pg_destroy(pg_id_t pg_id);

/**
* @brief Get HS_PG object from given pg_id.
* @param pg_id The ID of the PG.
* @return The HS_PG object matching the given pg_id, otherwise nullptr.
*/
const HS_PG* get_hs_pg(pg_id_t pg_id) const;

/**
* @brief Callback function invoked when a message is committed on a shard.
*
Expand Down Expand Up @@ -641,7 +647,7 @@ class HSHomeObject : public HomeObjectImpl {

BlobManager::Result< homestore::MultiBlkId > move_to_tombstone(shared< BlobIndexTable > index_table,
const BlobInfo& blob_info);
void print_btree_index(pg_id_t pg_id);
void print_btree_index(pg_id_t pg_id) const;

shared< BlobIndexTable > get_index_table(pg_id_t pg_id);

Expand Down
79 changes: 36 additions & 43 deletions src/lib/homestore_backend/hs_pg_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ PGError toPgError(ReplServiceError const& e) {

PGManager::NullAsyncResult HSHomeObject::_create_pg(PGInfo&& pg_info, std::set< peer_id_t > const& peers) {
auto pg_id = pg_info.id;
if (auto lg = std::shared_lock(_pg_lock); _pg_map.end() != _pg_map.find(pg_id)) return folly::Unit();
if (pg_exists(pg_id)) return folly::Unit();

const auto chunk_size = chunk_selector()->get_chunk_size();
if (pg_info.size < chunk_size) {
Expand Down Expand Up @@ -148,12 +148,9 @@ PGManager::NullAsyncResult HSHomeObject::do_create_pg(cshared< homestore::ReplDe
folly::Expected< HSHomeObject::HS_PG*, PGError > HSHomeObject::local_create_pg(shared< ReplDev > repl_dev,
PGInfo pg_info) {
auto pg_id = pg_info.id;
{
auto lg = shared_lock(_pg_lock);
if (auto it = _pg_map.find(pg_id); it != _pg_map.end()) {
LOGW("PG already exists, pg_id {}", pg_id);
return dynamic_cast< HS_PG* >(it->second.get());
}
if (auto hs_pg = get_hs_pg(pg_id); hs_pg) {
LOGW("PG already exists, pg_id {}", pg_id);
return const_cast< HS_PG* >(hs_pg);
}

auto local_chunk_size = chunk_selector()->get_chunk_size();
Expand Down Expand Up @@ -235,25 +232,19 @@ void HSHomeObject::on_create_pg_message_commit(int64_t lsn, sisl::blob const& he

PGManager::NullAsyncResult HSHomeObject::_replace_member(pg_id_t pg_id, peer_id_t const& old_member_id,
PGMember const& new_member, uint32_t commit_quorum) {

group_id_t group_id;
{
auto lg = std::shared_lock(_pg_lock);
auto iter = _pg_map.find(pg_id);
if (iter == _pg_map.end()) return folly::makeUnexpected(PGError::UNKNOWN_PG);
auto& repl_dev = pg_repl_dev(*iter->second);

if (!repl_dev.is_leader() && commit_quorum == 0) {
// Only leader can replace a member
return folly::makeUnexpected(PGError::NOT_LEADER);
}
group_id = repl_dev.group_id();
auto hs_pg = get_hs_pg(pg_id);
if (hs_pg == nullptr) return folly::makeUnexpected(PGError::UNKNOWN_PG);
auto& repl_dev = pg_repl_dev(*hs_pg);
if (!repl_dev.is_leader() && commit_quorum == 0) {
// Only leader can replace a member
return folly::makeUnexpected(PGError::NOT_LEADER);
}
auto group_id = repl_dev.group_id();

LOGI("PG replace member initated member_out={} member_in={}", boost::uuids::to_string(old_member_id),
boost::uuids::to_string(new_member.id));

homestore::replica_member_info in_replica, out_replica;
replica_member_info in_replica, out_replica;
out_replica.id = old_member_id;
in_replica.id = new_member.id;
in_replica.priority = new_member.priority;
Expand Down Expand Up @@ -327,30 +318,28 @@ void HSHomeObject::pg_destroy(pg_id_t pg_id) {

LOGI("pg {} is destroyed", pg_id);
}

void HSHomeObject::mark_pg_destroyed(pg_id_t pg_id) {
auto lg = std::scoped_lock(_pg_lock);
auto iter = _pg_map.find(pg_id);
if (iter == _pg_map.end()) {
auto hs_pg = const_cast< HS_PG* >(_get_hs_pg_unlocked(pg_id));
if (hs_pg == nullptr) {
LOGW("mark pg destroyed with unknown pg_id {}", pg_id);
return;
}
auto& pg = iter->second;
auto hs_pg = s_cast< HS_PG* >(pg.get());
hs_pg->pg_sb_->state = PGState::DESTROYED;
hs_pg->pg_sb_.write();
}

void HSHomeObject::destroy_hs_resources(pg_id_t pg_id) {
// Step 1: purge on repl dev
auto lg = std::scoped_lock(_pg_lock);
auto iter = _pg_map.find(pg_id);
if (iter == _pg_map.end()) {
auto hs_pg = _get_hs_pg_unlocked(pg_id);
if (hs_pg == nullptr) {
LOGW("destroy repl dev with unknown pg_id {}", pg_id);
return;
}

auto& pg = iter->second;
auto group_id = pg->pg_info_.replica_set_uuid;
auto group_id = hs_pg->pg_info_.replica_set_uuid;
auto v = hs_repl_service().get_repl_dev(group_id);
if (v.hasError()) {
LOGW("get repl dev for group_id={} has failed", boost::uuids::to_string(group_id));
Expand All @@ -364,14 +353,12 @@ void HSHomeObject::destroy_hs_resources(pg_id_t pg_id) {

void HSHomeObject::destroy_pg_index_table(pg_id_t pg_id) {
auto lg = std::scoped_lock(_pg_lock);
auto iter = _pg_map.find(pg_id);
if (iter == _pg_map.end()) {
auto hs_pg = _get_hs_pg_unlocked(pg_id);
if (hs_pg == nullptr) {
LOGW("destroy pg index table with unknown pg_id {}", pg_id);
return;
}

auto& pg = iter->second;
auto hs_pg = s_cast< HS_PG* >(pg.get());
if (nullptr != hs_pg->index_table_) {
auto uuid_str = boost::uuids::to_string(hs_pg->index_table_->uuid());
index_table_pg_map_.erase(uuid_str);
Expand All @@ -392,18 +379,16 @@ void HSHomeObject::destroy_pg_superblk(pg_id_t pg_id) {

{
auto lg = std::scoped_lock(_pg_lock);
auto iter = _pg_map.find(pg_id);
if (iter == _pg_map.end()) {
auto hs_pg = const_cast< HS_PG* >(_get_hs_pg_unlocked(pg_id));
if (hs_pg == nullptr) {
LOGW("destroy pg superblk with unknown pg_id {}", pg_id);
return;
}

// destroy pg super blk
auto& pg = iter->second;
auto hs_pg = s_cast< HS_PG* >(pg.get());
hs_pg->pg_sb_.destroy();

// erase pg in pg map
auto iter = _pg_map.find(pg_id);
_pg_map.erase(iter);
}
}
Expand Down Expand Up @@ -554,12 +539,20 @@ uint32_t HSHomeObject::HS_PG::open_shards() const {
return std::count_if(shards_.begin(), shards_.end(), [](auto const& s) { return s->is_open(); });
}

// NOTE: caller should hold the _pg_lock
const HSHomeObject::HS_PG* HSHomeObject::_get_hs_pg_unlocked(pg_id_t pg_id) const {
auto iter = _pg_map.find(pg_id);
return iter == _pg_map.end() ? nullptr : dynamic_cast< HS_PG* >(iter->second.get());
}

const HSHomeObject::HS_PG* HSHomeObject::get_hs_pg(pg_id_t pg_id) const {
std::shared_lock lck(_pg_lock);
return _get_hs_pg_unlocked(pg_id);
}

bool HSHomeObject::_get_stats(pg_id_t id, PGStats& stats) const {
auto lg = std::shared_lock(_pg_lock);
auto it = _pg_map.find(id);
if (_pg_map.end() == it) { return false; }
auto const& pg = it->second;
auto hs_pg = static_cast< HS_PG* >(pg.get());
auto hs_pg = get_hs_pg(id);
if (hs_pg == nullptr) return false;
auto const blk_size = hs_pg->repl_dev_->get_blk_size();

stats.id = hs_pg->pg_info_.id;
Expand Down
Loading

0 comments on commit 37b1f8a

Please sign in to comment.