Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract get_hs_pg() function for simplicity #257

Merged
merged 1 commit into from
Feb 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "2.2.6"
version = "2.2.7"

homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeReplication"
Expand Down
72 changes: 24 additions & 48 deletions src/lib/homestore_backend/hs_blob_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,10 @@ BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& s
shared< homestore::ReplDev > repl_dev;
blob_id_t new_blob_id;
{
std::shared_lock lock_guard(_pg_lock);
auto iter = _pg_map.find(pg_id);
RELEASE_ASSERT(iter != _pg_map.end(), "PG not found");
auto hs_pg = static_cast< HS_PG* >(iter->second.get());
auto hs_pg = get_hs_pg(pg_id);
RELEASE_ASSERT(hs_pg, "PG not found");
repl_dev = hs_pg->repl_dev_;
hs_pg->durable_entities_update(
const_cast< HS_PG* >(hs_pg)->durable_entities_update(
[&new_blob_id](auto& de) { new_blob_id = de.blob_sequence_num.fetch_add(1, std::memory_order_relaxed); },
false /* dirty */);

Expand Down Expand Up @@ -186,13 +184,8 @@ BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& s
}

bool HSHomeObject::local_add_blob_info(pg_id_t const pg_id, BlobInfo const& blob_info) {
HS_PG* hs_pg{nullptr};
{
shared_lock lock_guard(_pg_lock);
const auto iter = _pg_map.find(pg_id);
RELEASE_ASSERT(iter != _pg_map.end(), "PG not found");
hs_pg = dynamic_cast< HS_PG* >(iter->second.get());
}
auto hs_pg = get_hs_pg(pg_id);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RELEASE_ASSERT(hs_pg != nullptr, "pg not found")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, fixed & double-checked.

RELEASE_ASSERT(hs_pg != nullptr, "PG not found");
shared< BlobIndexTable > index_table = hs_pg->index_table_;
RELEASE_ASSERT(index_table != nullptr, "Index table not initialized");

Expand All @@ -213,7 +206,7 @@ bool HSHomeObject::local_add_blob_info(pg_id_t const pg_id, BlobInfo const& blob

// Update the durable counters. We need to update the blob_sequence_num here only for replay case, as the
// number is already updated in the put_blob call.
hs_pg->durable_entities_update([&blob_info](auto& de) {
const_cast< HS_PG* >(hs_pg)->durable_entities_update([&blob_info](auto& de) {
auto existing_blob_id = de.blob_sequence_num.load();
auto next_blob_id = blob_info.blob_id + 1;
while (next_blob_id > existing_blob_id &&
Expand Down Expand Up @@ -265,15 +258,10 @@ void HSHomeObject::on_blob_put_commit(int64_t lsn, sisl::blob const& header, sis
BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob(ShardInfo const& shard, blob_id_t blob_id, uint64_t req_offset,
uint64_t req_len) const {
auto& pg_id = shard.placement_group;
shared< BlobIndexTable > index_table;
shared< homestore::ReplDev > repl_dev;
{
std::shared_lock lock_guard(_pg_lock);
auto iter = _pg_map.find(pg_id);
RELEASE_ASSERT(iter != _pg_map.end(), "PG not found");
repl_dev = static_cast< HS_PG* >(iter->second.get())->repl_dev_;
index_table = static_cast< HS_PG* >(iter->second.get())->index_table_;
}
auto hs_pg = get_hs_pg(pg_id);
RELEASE_ASSERT(hs_pg, "PG not found");
auto repl_dev = hs_pg->repl_dev_;
auto index_table = hs_pg->index_table_;

RELEASE_ASSERT(repl_dev != nullptr, "Repl dev instance null");
RELEASE_ASSERT(index_table != nullptr, "Index table instance null");
Expand Down Expand Up @@ -370,8 +358,8 @@ HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive<
return folly::makeUnexpected(homestore::ReplServiceError::FAILED);
}

auto pg_iter = _pg_map.find(msg_header->pg_id);
if (pg_iter == _pg_map.end()) {
auto hs_pg = get_hs_pg(msg_header->pg_id);
if (hs_pg == nullptr) {
LOGW("Received a blob_put on an unknown pg:{}, underlying engine will retry this later",
msg_header->pg_id);
return folly::makeUnexpected(homestore::ReplServiceError::RESULT_NOT_EXIST_YET);
Expand All @@ -392,9 +380,8 @@ HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive<
hints.chunk_id_hint = hs_shard->sb_->p_chunk_id;

if (msg_header->blob_id != 0) {
//check if the blob already exists, if yes, return the blk id
auto index_table = d_cast< HS_PG* >(pg_iter->second.get())->index_table_;
auto r = get_blob_from_index_table(index_table, msg_header->shard_id, msg_header->blob_id);
// check if the blob already exists, if yes, return the blk id
auto r = get_blob_from_index_table(hs_pg->index_table_, msg_header->shard_id, msg_header->blob_id);
if (r.hasValue()) {
LOGT("Blob has already been persisted, blob_id:{}, shard_id:{}", msg_header->blob_id, msg_header->shard_id);
hints.committed_blk_id = r.value();
Expand All @@ -407,13 +394,9 @@ HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive<
BlobManager::NullAsyncResult HSHomeObject::_del_blob(ShardInfo const& shard, blob_id_t blob_id) {
BLOGT(shard.id, blob_id, "deleting blob");
auto& pg_id = shard.placement_group;
shared< homestore::ReplDev > repl_dev;
{
std::shared_lock lock_guard(_pg_lock);
auto iter = _pg_map.find(pg_id);
RELEASE_ASSERT(iter != _pg_map.end(), "PG not found");
repl_dev = static_cast< HS_PG* >(iter->second.get())->repl_dev_;
}
auto hs_pg = get_hs_pg(pg_id);
RELEASE_ASSERT(hs_pg, "PG not found");
auto repl_dev = hs_pg->repl_dev_;

RELEASE_ASSERT(repl_dev != nullptr, "Repl dev instance null");

Expand Down Expand Up @@ -469,19 +452,12 @@ void HSHomeObject::on_blob_del_commit(int64_t lsn, sisl::blob const& header, sis
return;
}

shared< BlobIndexTable > index_table;
shared< homestore::ReplDev > repl_dev;
HSHomeObject::HS_PG* hs_pg{nullptr};
{
std::shared_lock lock_guard(_pg_lock);
auto iter = _pg_map.find(msg_header->pg_id);
RELEASE_ASSERT(iter != _pg_map.end(), "PG not found");
hs_pg = static_cast< HSHomeObject::HS_PG* >(iter->second.get());
index_table = hs_pg->index_table_;
repl_dev = hs_pg->repl_dev_;
RELEASE_ASSERT(index_table != nullptr, "Index table not intialized");
RELEASE_ASSERT(repl_dev != nullptr, "Repl dev instance null");
}
auto hs_pg = get_hs_pg(msg_header->pg_id);
RELEASE_ASSERT(hs_pg, "PG not found");
auto index_table = hs_pg->index_table_;
auto repl_dev = hs_pg->repl_dev_;
RELEASE_ASSERT(index_table != nullptr, "Index table not intialized");
RELEASE_ASSERT(repl_dev != nullptr, "Repl dev instance null");

BlobInfo blob_info;
blob_info.shard_id = msg_header->shard_id;
Expand All @@ -502,7 +478,7 @@ void HSHomeObject::on_blob_del_commit(int64_t lsn, sisl::blob const& header, sis
auto& multiBlks = r.value();
if (multiBlks != tombstone_pbas) {
repl_dev->async_free_blks(lsn, multiBlks);
hs_pg->durable_entities_update([](auto& de) {
const_cast< HS_PG* >(hs_pg)->durable_entities_update([](auto& de) {
de.active_blob_count.fetch_sub(1, std::memory_order_relaxed);
de.tombstone_blob_count.fetch_add(1, std::memory_order_relaxed);
});
Expand Down
16 changes: 11 additions & 5 deletions src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,8 @@ class HSHomeObject : public HomeObjectImpl {

// Snapshot receiver progress info, used as a checkpoint for recovery
// Placed within HS_PG since HomeObject is unable to locate the ReplicationStateMachine
homestore::superblk< snapshot_rcvr_info_superblk > snp_rcvr_info_sb_;
homestore::superblk< snapshot_rcvr_shard_list_superblk > snp_rcvr_shard_list_sb_;
mutable homestore::superblk< snapshot_rcvr_info_superblk > snp_rcvr_info_sb_;
mutable homestore::superblk< snapshot_rcvr_shard_list_superblk > snp_rcvr_shard_list_sb_;

HS_PG(PGInfo info, shared< homestore::ReplDev > rdev, shared< BlobIndexTable > index_table,
std::shared_ptr< const std::vector< homestore::chunk_num_t > > pg_chunk_ids);
Expand Down Expand Up @@ -430,8 +430,6 @@ class HSHomeObject : public HomeObjectImpl {

// Update the snp_info superblock
void update_snp_info_sb(bool init = false);

HS_PG* get_hs_pg(pg_id_t pg_id);
};

private:
Expand All @@ -456,6 +454,7 @@ class HSHomeObject : public HomeObjectImpl {
static std::string serialize_pg_info(const PGInfo& info);
static PGInfo deserialize_pg_info(const unsigned char* pg_info_str, size_t size);
void add_pg_to_map(unique< HS_PG > hs_pg);
const HS_PG* _get_hs_pg_unlocked(pg_id_t pg_id) const;

// create shard related
shard_id_t generate_new_shard_id(pg_id_t pg);
Expand Down Expand Up @@ -542,6 +541,13 @@ class HSHomeObject : public HomeObjectImpl {
*/
void pg_destroy(pg_id_t pg_id);

/**
* @brief Get HS_PG object from given pg_id.
* @param pg_id The ID of the PG.
* @return The HS_PG object matching the given pg_id, otherwise nullptr.
*/
const HS_PG* get_hs_pg(pg_id_t pg_id) const;

/**
* @brief Callback function invoked when a message is committed on a shard.
*
Expand Down Expand Up @@ -641,7 +647,7 @@ class HSHomeObject : public HomeObjectImpl {

BlobManager::Result< homestore::MultiBlkId > move_to_tombstone(shared< BlobIndexTable > index_table,
const BlobInfo& blob_info);
void print_btree_index(pg_id_t pg_id);
void print_btree_index(pg_id_t pg_id) const;

shared< BlobIndexTable > get_index_table(pg_id_t pg_id);

Expand Down
79 changes: 36 additions & 43 deletions src/lib/homestore_backend/hs_pg_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ PGError toPgError(ReplServiceError const& e) {

PGManager::NullAsyncResult HSHomeObject::_create_pg(PGInfo&& pg_info, std::set< peer_id_t > const& peers) {
auto pg_id = pg_info.id;
if (auto lg = std::shared_lock(_pg_lock); _pg_map.end() != _pg_map.find(pg_id)) return folly::Unit();
if (pg_exists(pg_id)) return folly::Unit();

const auto chunk_size = chunk_selector()->get_chunk_size();
if (pg_info.size < chunk_size) {
Expand Down Expand Up @@ -148,12 +148,9 @@ PGManager::NullAsyncResult HSHomeObject::do_create_pg(cshared< homestore::ReplDe
folly::Expected< HSHomeObject::HS_PG*, PGError > HSHomeObject::local_create_pg(shared< ReplDev > repl_dev,
PGInfo pg_info) {
auto pg_id = pg_info.id;
{
auto lg = shared_lock(_pg_lock);
if (auto it = _pg_map.find(pg_id); it != _pg_map.end()) {
LOGW("PG already exists, pg_id {}", pg_id);
return dynamic_cast< HS_PG* >(it->second.get());
}
if (auto hs_pg = get_hs_pg(pg_id); hs_pg) {
LOGW("PG already exists, pg_id {}", pg_id);
return const_cast< HS_PG* >(hs_pg);
}

auto local_chunk_size = chunk_selector()->get_chunk_size();
Expand Down Expand Up @@ -235,25 +232,19 @@ void HSHomeObject::on_create_pg_message_commit(int64_t lsn, sisl::blob const& he

PGManager::NullAsyncResult HSHomeObject::_replace_member(pg_id_t pg_id, peer_id_t const& old_member_id,
PGMember const& new_member, uint32_t commit_quorum) {

group_id_t group_id;
{
auto lg = std::shared_lock(_pg_lock);
auto iter = _pg_map.find(pg_id);
if (iter == _pg_map.end()) return folly::makeUnexpected(PGError::UNKNOWN_PG);
auto& repl_dev = pg_repl_dev(*iter->second);

if (!repl_dev.is_leader() && commit_quorum == 0) {
// Only leader can replace a member
return folly::makeUnexpected(PGError::NOT_LEADER);
}
group_id = repl_dev.group_id();
auto hs_pg = get_hs_pg(pg_id);
if (hs_pg == nullptr) return folly::makeUnexpected(PGError::UNKNOWN_PG);
auto& repl_dev = pg_repl_dev(*hs_pg);
if (!repl_dev.is_leader() && commit_quorum == 0) {
// Only leader can replace a member
return folly::makeUnexpected(PGError::NOT_LEADER);
}
auto group_id = repl_dev.group_id();

LOGI("PG replace member initated member_out={} member_in={}", boost::uuids::to_string(old_member_id),
boost::uuids::to_string(new_member.id));

homestore::replica_member_info in_replica, out_replica;
replica_member_info in_replica, out_replica;
out_replica.id = old_member_id;
in_replica.id = new_member.id;
in_replica.priority = new_member.priority;
Expand Down Expand Up @@ -327,30 +318,28 @@ void HSHomeObject::pg_destroy(pg_id_t pg_id) {

LOGI("pg {} is destroyed", pg_id);
}

void HSHomeObject::mark_pg_destroyed(pg_id_t pg_id) {
auto lg = std::scoped_lock(_pg_lock);
auto iter = _pg_map.find(pg_id);
if (iter == _pg_map.end()) {
auto hs_pg = const_cast< HS_PG* >(_get_hs_pg_unlocked(pg_id));
if (hs_pg == nullptr) {
LOGW("mark pg destroyed with unknown pg_id {}", pg_id);
return;
}
auto& pg = iter->second;
auto hs_pg = s_cast< HS_PG* >(pg.get());
hs_pg->pg_sb_->state = PGState::DESTROYED;
hs_pg->pg_sb_.write();
}

void HSHomeObject::destroy_hs_resources(pg_id_t pg_id) {
// Step 1: purge on repl dev
auto lg = std::scoped_lock(_pg_lock);
auto iter = _pg_map.find(pg_id);
if (iter == _pg_map.end()) {
auto hs_pg = _get_hs_pg_unlocked(pg_id);
if (hs_pg == nullptr) {
LOGW("destroy repl dev with unknown pg_id {}", pg_id);
return;
}

auto& pg = iter->second;
auto group_id = pg->pg_info_.replica_set_uuid;
auto group_id = hs_pg->pg_info_.replica_set_uuid;
auto v = hs_repl_service().get_repl_dev(group_id);
if (v.hasError()) {
LOGW("get repl dev for group_id={} has failed", boost::uuids::to_string(group_id));
Expand All @@ -364,14 +353,12 @@ void HSHomeObject::destroy_hs_resources(pg_id_t pg_id) {

void HSHomeObject::destroy_pg_index_table(pg_id_t pg_id) {
auto lg = std::scoped_lock(_pg_lock);
auto iter = _pg_map.find(pg_id);
if (iter == _pg_map.end()) {
auto hs_pg = _get_hs_pg_unlocked(pg_id);
if (hs_pg == nullptr) {
LOGW("destroy pg index table with unknown pg_id {}", pg_id);
return;
}

auto& pg = iter->second;
auto hs_pg = s_cast< HS_PG* >(pg.get());
if (nullptr != hs_pg->index_table_) {
auto uuid_str = boost::uuids::to_string(hs_pg->index_table_->uuid());
index_table_pg_map_.erase(uuid_str);
Expand All @@ -392,18 +379,16 @@ void HSHomeObject::destroy_pg_superblk(pg_id_t pg_id) {

{
auto lg = std::scoped_lock(_pg_lock);
auto iter = _pg_map.find(pg_id);
if (iter == _pg_map.end()) {
auto hs_pg = const_cast< HS_PG* >(_get_hs_pg_unlocked(pg_id));
if (hs_pg == nullptr) {
LOGW("destroy pg superblk with unknown pg_id {}", pg_id);
return;
}

// destroy pg super blk
auto& pg = iter->second;
auto hs_pg = s_cast< HS_PG* >(pg.get());
hs_pg->pg_sb_.destroy();

// erase pg in pg map
auto iter = _pg_map.find(pg_id);
_pg_map.erase(iter);
}
}
Expand Down Expand Up @@ -554,12 +539,20 @@ uint32_t HSHomeObject::HS_PG::open_shards() const {
return std::count_if(shards_.begin(), shards_.end(), [](auto const& s) { return s->is_open(); });
}

// NOTE: caller should hold the _pg_lock
const HSHomeObject::HS_PG* HSHomeObject::_get_hs_pg_unlocked(pg_id_t pg_id) const {
auto iter = _pg_map.find(pg_id);
return iter == _pg_map.end() ? nullptr : dynamic_cast< HS_PG* >(iter->second.get());
}

const HSHomeObject::HS_PG* HSHomeObject::get_hs_pg(pg_id_t pg_id) const {
std::shared_lock lck(_pg_lock);
return _get_hs_pg_unlocked(pg_id);
}

bool HSHomeObject::_get_stats(pg_id_t id, PGStats& stats) const {
auto lg = std::shared_lock(_pg_lock);
auto it = _pg_map.find(id);
if (_pg_map.end() == it) { return false; }
auto const& pg = it->second;
auto hs_pg = static_cast< HS_PG* >(pg.get());
auto hs_pg = get_hs_pg(id);
if (hs_pg == nullptr) return false;
auto const blk_size = hs_pg->repl_dev_->get_blk_size();

stats.id = hs_pg->pg_info_.id;
Expand Down
Loading
Loading