Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into graceful-shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
JacksonYao287 committed Feb 20, 2025
2 parents 023a31c + 37b1f8a commit 00175b0
Show file tree
Hide file tree
Showing 17 changed files with 767 additions and 290 deletions.
4 changes: 2 additions & 2 deletions conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "2.2.4"
version = "2.2.8"

homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeReplication"
Expand Down Expand Up @@ -49,7 +49,7 @@ def build_requirements(self):

def requirements(self):
self.requires("sisl/[^12.2]@oss/master", transitive_headers=True)
self.requires("homestore/[^6.6]@oss/master")
self.requires("homestore/[>=6.6.18]@oss/master")
self.requires("iomgr/[^11.3]@oss/master")
self.requires("lz4/1.9.4", override=True)
self.requires("openssl/3.3.1", override=True)
Expand Down
27 changes: 22 additions & 5 deletions src/lib/homestore_backend/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,29 @@ target_link_libraries(homestore_test_dynamic PUBLIC
${COMMON_TEST_DEPS}
)

add_test(NAME HomestoreTestDynamic
add_test(NAME HomestoreTestReplaceMember
COMMAND homestore_test_dynamic -csv error --executor immediate --config_path ./
--override_config homestore_config.consensus.snapshot_freq_distance:0)
--override_config homestore_config.consensus.snapshot_freq_distance:0
--gtest_filter=HomeObjectFixture.ReplaceMember)

# To test both baseline & incremental resync functionality, we use 13 to minimize the likelihood of it being a divisor of the total LSN (currently 30)
add_test(NAME HomestoreTestDynamicWithResync
COMMAND homestore_test_dynamic --gtest_filter="HomeObjectFixture.ReplaceMember" -csv error --executor immediate --config_path ./
add_test(NAME HomestoreTestReplaceMemberWithBaselineResync
COMMAND homestore_test_dynamic -csv error --executor immediate --config_path ./
--override_config homestore_config.consensus.snapshot_freq_distance:13
--override_config homestore_config.consensus.num_reserved_log_items=13
--override_config homestore_config.consensus.snapshot_sync_ctx_timeout_ms=5000
--gtest_filter=HomeObjectFixture.ReplaceMember)

add_test(NAME HomestoreResyncTestWithFollowerRestart
COMMAND homestore_test_dynamic -csv error --executor immediate --config_path ./
--override_config homestore_config.consensus.snapshot_freq_distance:13
--override_config homestore_config.consensus.num_reserved_log_items=13
--override_config homestore_config.consensus.snapshot_sync_ctx_timeout_ms=5000
--gtest_filter=HomeObjectFixture.RestartFollower*)

add_test(NAME HomestoreResyncTestWithLeaderRestart
COMMAND homestore_test_dynamic -csv error --executor immediate --config_path ./
--override_config homestore_config.consensus.snapshot_freq_distance:13
--override_config homestore_config.consensus.num_reserved_log_items=13)
--override_config homestore_config.consensus.num_reserved_log_items=13
--override_config homestore_config.consensus.snapshot_sync_ctx_timeout_ms=5000
--gtest_filter=HomeObjectFixture.RestartLeader*)
72 changes: 25 additions & 47 deletions src/lib/homestore_backend/hs_blob_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,10 @@ BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& s
shared< homestore::ReplDev > repl_dev;
blob_id_t new_blob_id;
{
std::shared_lock lock_guard(_pg_lock);
auto iter = _pg_map.find(pg_id);
RELEASE_ASSERT(iter != _pg_map.end(), "PG not found");
auto hs_pg = static_cast< HS_PG* >(iter->second.get());
auto hs_pg = get_hs_pg(pg_id);
RELEASE_ASSERT(hs_pg, "PG not found");
repl_dev = hs_pg->repl_dev_;
hs_pg->durable_entities_update(
const_cast< HS_PG* >(hs_pg)->durable_entities_update(
[&new_blob_id](auto& de) { new_blob_id = de.blob_sequence_num.fetch_add(1, std::memory_order_relaxed); },
false /* dirty */);

Expand Down Expand Up @@ -194,13 +192,8 @@ BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& s
}

bool HSHomeObject::local_add_blob_info(pg_id_t const pg_id, BlobInfo const& blob_info) {
HS_PG* hs_pg{nullptr};
{
shared_lock lock_guard(_pg_lock);
const auto iter = _pg_map.find(pg_id);
RELEASE_ASSERT(iter != _pg_map.end(), "PG not found");
hs_pg = dynamic_cast< HS_PG* >(iter->second.get());
}
auto hs_pg = get_hs_pg(pg_id);
RELEASE_ASSERT(hs_pg != nullptr, "PG not found");
shared< BlobIndexTable > index_table = hs_pg->index_table_;
RELEASE_ASSERT(index_table != nullptr, "Index table not initialized");

Expand All @@ -221,7 +214,7 @@ bool HSHomeObject::local_add_blob_info(pg_id_t const pg_id, BlobInfo const& blob

// Update the durable counters. We need to update the blob_sequence_num here only for replay case, as the
// number is already updated in the put_blob call.
hs_pg->durable_entities_update([&blob_info](auto& de) {
const_cast< HS_PG* >(hs_pg)->durable_entities_update([&blob_info](auto& de) {
auto existing_blob_id = de.blob_sequence_num.load();
auto next_blob_id = blob_info.blob_id + 1;
while (next_blob_id > existing_blob_id &&
Expand Down Expand Up @@ -283,15 +276,10 @@ BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob(ShardInfo const& shard,
incr_pending_request_num();

auto& pg_id = shard.placement_group;
shared< BlobIndexTable > index_table;
shared< homestore::ReplDev > repl_dev;
{
std::shared_lock lock_guard(_pg_lock);
auto iter = _pg_map.find(pg_id);
RELEASE_ASSERT(iter != _pg_map.end(), "PG not found");
repl_dev = static_cast< HS_PG* >(iter->second.get())->repl_dev_;
index_table = static_cast< HS_PG* >(iter->second.get())->index_table_;
}
auto hs_pg = get_hs_pg(pg_id);
RELEASE_ASSERT(hs_pg, "PG not found");
auto repl_dev = hs_pg->repl_dev_;
auto index_table = hs_pg->index_table_;

RELEASE_ASSERT(repl_dev != nullptr, "Repl dev instance null");
RELEASE_ASSERT(index_table != nullptr, "Index table instance null");
Expand Down Expand Up @@ -399,9 +387,10 @@ HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive<
return folly::makeUnexpected(homestore::ReplServiceError::FAILED);
}

auto pg_iter = _pg_map.find(msg_header->pg_id);
if (pg_iter == _pg_map.end()) {
auto hs_pg = get_hs_pg(msg_header->pg_id);
if (hs_pg == nullptr) {
LOGW("Received a blob_put on an unknown pg:{}, underlying engine will retry this later", msg_header->pg_id);
decr_pending_request_num();
return folly::makeUnexpected(homestore::ReplServiceError::RESULT_NOT_EXIST_YET);
}

Expand All @@ -410,6 +399,7 @@ HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive<
if (shard_iter == _shard_map.end()) {
LOGW("Received a blob_put on an unknown shard:{}, underlying engine will retry this later",
msg_header->shard_id);
decr_pending_request_num();
return folly::makeUnexpected(homestore::ReplServiceError::RESULT_NOT_EXIST_YET);
}

Expand All @@ -421,8 +411,7 @@ HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive<

if (msg_header->blob_id != 0) {
// check if the blob already exists, if yes, return the blk id
auto index_table = d_cast< HS_PG* >(pg_iter->second.get())->index_table_;
auto r = get_blob_from_index_table(index_table, msg_header->shard_id, msg_header->blob_id);
auto r = get_blob_from_index_table(hs_pg->index_table_, msg_header->shard_id, msg_header->blob_id);
if (r.hasValue()) {
LOGT("Blob has already been persisted, blob_id:{}, shard_id:{}", msg_header->blob_id, msg_header->shard_id);
hints.committed_blk_id = r.value();
Expand All @@ -441,13 +430,9 @@ BlobManager::NullAsyncResult HSHomeObject::_del_blob(ShardInfo const& shard, blo

BLOGT(shard.id, blob_id, "deleting blob");
auto& pg_id = shard.placement_group;
shared< homestore::ReplDev > repl_dev;
{
std::shared_lock lock_guard(_pg_lock);
auto iter = _pg_map.find(pg_id);
RELEASE_ASSERT(iter != _pg_map.end(), "PG not found");
repl_dev = static_cast< HS_PG* >(iter->second.get())->repl_dev_;
}
auto hs_pg = get_hs_pg(pg_id);
RELEASE_ASSERT(hs_pg, "PG not found");
auto repl_dev = hs_pg->repl_dev_;

RELEASE_ASSERT(repl_dev != nullptr, "Repl dev instance null");

Expand Down Expand Up @@ -506,19 +491,12 @@ void HSHomeObject::on_blob_del_commit(int64_t lsn, sisl::blob const& header, sis
return;
}

shared< BlobIndexTable > index_table;
shared< homestore::ReplDev > repl_dev;
HSHomeObject::HS_PG* hs_pg{nullptr};
{
std::shared_lock lock_guard(_pg_lock);
auto iter = _pg_map.find(msg_header->pg_id);
RELEASE_ASSERT(iter != _pg_map.end(), "PG not found");
hs_pg = static_cast< HSHomeObject::HS_PG* >(iter->second.get());
index_table = hs_pg->index_table_;
repl_dev = hs_pg->repl_dev_;
RELEASE_ASSERT(index_table != nullptr, "Index table not intialized");
RELEASE_ASSERT(repl_dev != nullptr, "Repl dev instance null");
}
auto hs_pg = get_hs_pg(msg_header->pg_id);
RELEASE_ASSERT(hs_pg, "PG not found");
auto index_table = hs_pg->index_table_;
auto repl_dev = hs_pg->repl_dev_;
RELEASE_ASSERT(index_table != nullptr, "Index table not intialized");
RELEASE_ASSERT(repl_dev != nullptr, "Repl dev instance null");

BlobInfo blob_info;
blob_info.shard_id = msg_header->shard_id;
Expand All @@ -539,7 +517,7 @@ void HSHomeObject::on_blob_del_commit(int64_t lsn, sisl::blob const& header, sis
auto& multiBlks = r.value();
if (multiBlks != tombstone_pbas) {
repl_dev->async_free_blks(lsn, multiBlks);
hs_pg->durable_entities_update([](auto& de) {
const_cast< HS_PG* >(hs_pg)->durable_entities_update([](auto& de) {
de.active_blob_count.fetch_sub(1, std::memory_order_relaxed);
de.tombstone_blob_count.fetch_add(1, std::memory_order_relaxed);
});
Expand Down
15 changes: 15 additions & 0 deletions src/lib/homestore_backend/hs_homeobject.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,21 @@ void HSHomeObject::on_replica_restart() {
[this](homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) { on_shard_meta_blk_found(mblk, buf); },
[this](bool success) { on_shard_meta_blk_recover_completed(success); }, true);
HomeStore::instance()->meta_service().read_sub_sb(_shard_meta_name);

// recover snapshot transmission progress info
HomeStore::instance()->meta_service().register_handler(
_snp_rcvr_meta_name,
[this](meta_blk* mblk, sisl::byte_view buf, size_t size) { on_snp_rcvr_meta_blk_found(mblk, buf); },
[this](bool success) { on_snp_rcvr_meta_blk_recover_completed(success); }, true);
HomeStore::instance()->meta_service().read_sub_sb(_snp_rcvr_meta_name);

HomeStore::instance()->meta_service().register_handler(
_snp_rcvr_shard_list_meta_name,
[this](meta_blk* mblk, sisl::byte_view buf, size_t size) {
on_snp_rcvr_shard_list_meta_blk_found(mblk, buf);
},
[this](bool success) { on_snp_rcvr_shard_list_meta_blk_recover_completed(success); }, true);
HomeStore::instance()->meta_service().read_sub_sb(_snp_rcvr_shard_list_meta_name);
});
}

Expand Down
70 changes: 62 additions & 8 deletions src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class HSHomeObject : public HomeObjectImpl {
inline static auto const _svc_meta_name = std::string("HomeObject");
inline static auto const _pg_meta_name = std::string("PGManager");
inline static auto const _shard_meta_name = std::string("ShardManager");
inline static auto const _snp_rcvr_meta_name = std::string("SnapshotReceiver");
inline static auto const _snp_rcvr_shard_list_meta_name = std::string("SnapshotReceiverShardList");
static constexpr uint64_t HS_CHUNK_SIZE = 2 * Gi;
static constexpr uint32_t _data_block_size = 1024;
static uint64_t _hs_chunk_size;
Expand Down Expand Up @@ -161,8 +163,30 @@ class HSHomeObject : public HomeObjectImpl {
homestore::chunk_num_t p_chunk_id;
homestore::chunk_num_t v_chunk_id;
};
// TODO this blk is used to store snapshot metadata/status for recovery
struct snapshot_info_superblk {};

// Since the shard list can be quite large and only needs to be persisted once, we store it in a separate superblk
// Fixed-size superblock recording snapshot receiver progress for one PG.
// Field order defines the stored layout, so do not reorder members.
// NOTE(review): this sits before the closing `#pragma pack()`, so it is presumably packed — confirm the opening
// pack directive, which is outside this view.
struct snapshot_rcvr_info_superblk {
shard_id_t shard_cursor; // presumably the shard the receiver has progressed to — TODO confirm exact semantics
int64_t snp_lsn; // presumably the LSN of the snapshot this record belongs to — TODO confirm
pg_id_t pg_id;

// Size in bytes of this superblock; it has no variable-length part.
uint32_t size() const { return sizeof(snapshot_rcvr_info_superblk); }
// Meta name associated with this superblock type (_snp_rcvr_meta_name).
static auto name() -> string { return _snp_rcvr_meta_name; }
};

// Variable-length superblock holding a list of shard ids for one PG/snapshot LSN pair.
// shard_list is declared with a single element but is read as shard_cnt entries (see size() and
// get_shard_list()), so an instance must be backed by at least size() bytes of storage.
// Field order defines the stored layout, so do not reorder members.
struct snapshot_rcvr_shard_list_superblk {
pg_id_t pg_id;
int64_t snp_lsn;
uint64_t shard_cnt; // count of shards
shard_id_t shard_list[1]; // array of shard ids

// Total size in bytes: the struct without its one placeholder element, plus shard_cnt shard ids.
// NOTE(review): shard_cnt is uint64_t while the return type is uint32_t, so a very large count would be
// truncated on return.
uint32_t size() const {
return sizeof(snapshot_rcvr_shard_list_superblk) - sizeof(shard_id_t) + shard_cnt * sizeof(shard_id_t);
}

// Returns a copy of the first shard_cnt entries of shard_list as a vector.
std::vector< shard_id_t > get_shard_list() const { return std::vector(shard_list, shard_list + shard_cnt); }
// Meta name associated with this superblock type (_snp_rcvr_shard_list_meta_name).
static auto name() -> string { return _snp_rcvr_shard_list_meta_name; }
};
#pragma pack()

public:
Expand Down Expand Up @@ -227,12 +251,16 @@ class HSHomeObject : public HomeObjectImpl {
uint32_t blk_size;
};

public:
homestore::superblk< pg_info_superblk > pg_sb_;
shared< homestore::ReplDev > repl_dev_;
std::shared_ptr< BlobIndexTable > index_table_;
PGMetrics metrics_;

// Snapshot receiver progress info, used as a checkpoint for recovery
// Placed within HS_PG since HomeObject is unable to locate the ReplicationStateMachine
mutable homestore::superblk< snapshot_rcvr_info_superblk > snp_rcvr_info_sb_;
mutable homestore::superblk< snapshot_rcvr_shard_list_superblk > snp_rcvr_shard_list_sb_;

HS_PG(PGInfo info, shared< homestore::ReplDev > rdev, shared< BlobIndexTable > index_table,
std::shared_ptr< const std::vector< homestore::chunk_num_t > > pg_chunk_ids);
HS_PG(homestore::superblk< pg_info_superblk >&& sb, shared< homestore::ReplDev > rdev);
Expand Down Expand Up @@ -353,6 +381,7 @@ class HSHomeObject : public HomeObjectImpl {
enum ErrorCode {
ALLOC_BLK_ERR = 1,
WRITE_DATA_ERR,
COMMIT_BLK_ERR,
INVALID_BLOB_HEADER,
BLOB_DATA_CORRUPTED,
ADD_BLOB_INDEX_ERR,
Expand All @@ -370,7 +399,15 @@ class HSHomeObject : public HomeObjectImpl {
bool is_last_batch);

int64_t get_context_lsn() const;
pg_id_t get_context_pg_id() const;

// Try to load existing snapshot context info
bool load_prev_context();

// Reset the context for a new snapshot, should be called before each new snapshot transmission
void reset_context(int64_t lsn, pg_id_t pg_id);
void destroy_context();

shard_id_t get_shard_cursor() const;
shard_id_t get_next_shard() const;

Expand All @@ -392,9 +429,8 @@ class HSHomeObject : public HomeObjectImpl {

std::unique_ptr< SnapshotContext > ctx_;

// snapshot info, can be used as a checkpoint for recovery
snapshot_info_superblk snp_info_;
// other stats for snapshot transmission progress
// Update the snp_info superblock
void update_snp_info_sb(bool init = false);
};

private:
Expand All @@ -405,7 +441,6 @@ class HSHomeObject : public HomeObjectImpl {
static constexpr size_t max_zpad_bufs = _data_block_size / io_align;
std::array< sisl::io_blob_safe, max_zpad_bufs > zpad_bufs_; // Zero padded buffers for blob payload.

private:
static homestore::ReplicationService& hs_repl_service() { return homestore::hs()->repl_service(); }

// blob related
Expand All @@ -420,6 +455,7 @@ class HSHomeObject : public HomeObjectImpl {
static std::string serialize_pg_info(const PGInfo& info);
static PGInfo deserialize_pg_info(const unsigned char* pg_info_str, size_t size);
void add_pg_to_map(unique< HS_PG > hs_pg);
const HS_PG* _get_hs_pg_unlocked(pg_id_t pg_id) const;

// create shard related
shard_id_t generate_new_shard_id(pg_id_t pg);
Expand All @@ -437,6 +473,10 @@ class HSHomeObject : public HomeObjectImpl {
void on_pg_meta_blk_recover_completed(bool success);
void on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf);
void on_shard_meta_blk_recover_completed(bool success);
void on_snp_rcvr_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf);
void on_snp_rcvr_meta_blk_recover_completed(bool success);
void on_snp_rcvr_shard_list_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf);
void on_snp_rcvr_shard_list_meta_blk_recover_completed(bool success);

void persist_pg_sb();

Expand Down Expand Up @@ -502,6 +542,13 @@ class HSHomeObject : public HomeObjectImpl {
*/
void pg_destroy(pg_id_t pg_id);

/**
* @brief Get HS_PG object from given pg_id.
* @param pg_id The ID of the PG.
* @return The HS_PG object matching the given pg_id, otherwise nullptr.
*/
const HS_PG* get_hs_pg(pg_id_t pg_id) const;

/**
* @brief Callback function invoked when a message is committed on a shard.
*
Expand Down Expand Up @@ -601,7 +648,7 @@ class HSHomeObject : public HomeObjectImpl {

BlobManager::Result< homestore::MultiBlkId > move_to_tombstone(shared< BlobIndexTable > index_table,
const BlobInfo& blob_info);
void print_btree_index(pg_id_t pg_id);
void print_btree_index(pg_id_t pg_id) const;

shared< BlobIndexTable > get_index_table(pg_id_t pg_id);

Expand Down Expand Up @@ -630,6 +677,13 @@ class HSHomeObject : public HomeObjectImpl {
*/
void destroy_shards(pg_id_t pg_id);

/**
* @brief Cleans up resources associated with the PG identified by pg_id on HomeStore side.
*
* @param pg_id The ID of the PG whose HomeStore-side resources are to be cleaned up.
*/
void destroy_hs_resources(pg_id_t pg_id);

/**
* @brief destroy index table for the PG located using a pg_id.
*
Expand Down
Loading

0 comments on commit 00175b0

Please sign in to comment.