Skip to content

Commit

Permalink
Fix comments
Browse files Browse the repository at this point in the history
Signed-off-by: PengFei Li <[email protected]>
  • Loading branch information
banmoy committed Jan 15, 2025
1 parent d761fda commit 55a3ea1
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 44 deletions.
2 changes: 1 addition & 1 deletion be/src/runtime/batch_write/batch_write_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ void BatchWriteMgr::receive_stream_load_rpc(ExecEnv* exec_env, brpc::Controller*
ctx->status = append_data(ctx);
}

TTransactionStatus::type to_thrift_txn_status(TransactionStatusPB status) {
static TTransactionStatus::type to_thrift_txn_status(TransactionStatusPB status) {
switch (status) {
case TRANS_UNKNOWN:
return TTransactionStatus::UNKNOWN;
Expand Down
26 changes: 15 additions & 11 deletions be/src/runtime/batch_write/txn_state_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ void TxnStateHandler::push_state(TTransactionStatus::type new_status, const std:
if (_stopped) {
return;
}
_transit_txn_state(new_status, reason, true);
_transition_txn_state(new_status, reason, true);
if (_is_finished_txn_state()) {
_cv.notify_all();
}
Expand All @@ -52,15 +52,15 @@ bool TxnStateHandler::poll_state(const StatusOr<TxnState>& result) {
<< ", status: " << result.status();
// fast fail if there is failure between FE and BE
if (_num_poll_failure >= config::merge_commit_txn_state_poll_max_fail_times) {
_transit_txn_state(TTransactionStatus::UNKNOWN,
fmt::format("poll txn state failure exceeds max times {}, last error: {}",
_num_poll_failure, result.status().to_string(false)),
false);
_transition_txn_state(TTransactionStatus::UNKNOWN,
fmt::format("poll txn state failure exceeds max times {}, last error: {}",
_num_poll_failure, result.status().to_string(false)),
false);
}
} else {
TRACE_BATCH_WRITE << "handler poll success, txn_id: " << _txn_id << ", " << result.value();
_num_poll_failure = 0;
_transit_txn_state(result.value().txn_status, result.value().reason, false);
_transition_txn_state(result.value().txn_status, result.value().reason, false);
}
// stop polling if reach the finished state or there is no subscriber
if (_is_finished_txn_state()) {
Expand Down Expand Up @@ -133,7 +133,8 @@ std::string TxnStateHandler::debug_string() {
_num_waiting_finished_state, _stopped);
}

void TxnStateHandler::_transit_txn_state(TTransactionStatus::type new_status, const std::string& reason, bool from_fe) {
void TxnStateHandler::_transition_txn_state(TTransactionStatus::type new_status, const std::string& reason,
bool from_fe) {
TTransactionStatus::type old_status = _txn_state.txn_status;
TRACE_BATCH_WRITE << "receive new txn state, txn_id: " << _txn_id
<< ", current status: " << to_string(_txn_state.txn_status)
Expand Down Expand Up @@ -317,12 +318,12 @@ void TxnStatePoller::_execute_poll(const TxnStatePollTask& task) {
}
}

bool TxnStatePoller::is_txn_pending(int64_t txn_id) {
bool TxnStatePoller::TEST_is_txn_pending(int64_t txn_id) {
std::unique_lock<bthread::Mutex> lock(_mutex);
return _pending_txn_ids.find(txn_id) != _pending_txn_ids.end();
}

StatusOr<int64_t> TxnStatePoller::pending_execution_time(int64_t txn_id) {
StatusOr<int64_t> TxnStatePoller::TEST_pending_execution_time(int64_t txn_id) {
std::unique_lock<bthread::Mutex> lock(_mutex);
auto it = _pending_tasks.begin();
while (it != _pending_tasks.end()) {
Expand All @@ -334,7 +335,7 @@ StatusOr<int64_t> TxnStatePoller::pending_execution_time(int64_t txn_id) {
return Status::NotFound("no task found");
}

bool TxnStatePoller::is_scheduling() {
bool TxnStatePoller::TEST_is_scheduling() {
std::unique_lock<bthread::Mutex> lock(_mutex);
return _is_scheduling;
}
Expand Down Expand Up @@ -436,11 +437,14 @@ StatusOr<TxnStateDynamicCacheEntry*> TxnStateCache::_get_txn_entry(TxnStateDynam
if (_stopped) {
return Status::ServiceUnavailable("Transaction state cache is stopped");
}
TxnStateDynamicCacheEntry* entry = create_if_not_exist ? cache->get_or_create(txn_id, 1) : cache->get(txn_id);
TxnStateDynamicCacheEntry* entry = nullptr;
if (create_if_not_exist) {
entry = cache->get_or_create(txn_id, 1);
DCHECK(entry != nullptr);
// initialize txn_id
entry->value().set_txn_id(txn_id);
} else {
entry = cache->get(txn_id);
}
return entry;
}
Expand Down
17 changes: 7 additions & 10 deletions be/src/runtime/batch_write/txn_state_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ class TxnStateHandler {
void subscribe(bool& trigger_poll);
// Remove a subscriber which has called subscribe() before
void unsubscribe();
int32_t num_subscriber();
// A subscriber calls this function to wait for the txn state to reach
// a finished state or error happens.
StatusOr<TxnState> wait_finished_state(const std::string& subscriber_name, int64_t timeout_us);
Expand All @@ -82,8 +81,11 @@ class TxnStateHandler {

void stop();

// For testing
int32_t TEST_num_subscriber() { return _num_subscriber; }

private:
void _transit_txn_state(TTransactionStatus::type new_status, const std::string& reason, bool from_fe);
void _transition_txn_state(TTransactionStatus::type new_status, const std::string& reason, bool from_fe);
// Whether the current status indicate the load is finished
bool _is_finished_txn_state();

Expand All @@ -102,11 +104,6 @@ class TxnStateHandler {
bool _stopped{false};
};

inline int32_t TxnStateHandler::num_subscriber() {
std::unique_lock<bthread::Mutex> lock(_mutex);
return _num_subscriber;
}

inline int32_t TxnStateHandler::num_waiting_finished_state() {
std::unique_lock<bthread::Mutex> lock(_mutex);
return _num_waiting_finished_state;
Expand Down Expand Up @@ -177,9 +174,9 @@ class TxnStatePoller {
void stop();

// For testing
bool is_txn_pending(int64_t txn_id);
StatusOr<int64_t> pending_execution_time(int64_t txn_id);
bool is_scheduling();
bool TEST_is_txn_pending(int64_t txn_id);
StatusOr<int64_t> TEST_pending_execution_time(int64_t txn_id);
bool TEST_is_scheduling();

private:
void _schedule_func();
Expand Down
44 changes: 22 additions & 22 deletions be/test/runtime/batch_write/txn_state_cache_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,18 +186,18 @@ TEST_F(TxnStateCacheTest, handler_subscriber) {
TxnStateHandler handler;
bool trigger_poll = false;
handler.subscribe(trigger_poll);
ASSERT_EQ(1, handler.num_subscriber());
ASSERT_EQ(1, handler.TEST_num_subscriber());
ASSERT_TRUE(trigger_poll);
handler.subscribe(trigger_poll);
ASSERT_EQ(2, handler.num_subscriber());
ASSERT_EQ(2, handler.TEST_num_subscriber());
ASSERT_FALSE(trigger_poll);
handler.push_state(TTransactionStatus::VISIBLE, "");
handler.unsubscribe();
ASSERT_EQ(1, handler.num_subscriber());
ASSERT_EQ(1, handler.TEST_num_subscriber());
handler.unsubscribe();
ASSERT_EQ(0, handler.num_subscriber());
ASSERT_EQ(0, handler.TEST_num_subscriber());
handler.subscribe(trigger_poll);
ASSERT_EQ(1, handler.num_subscriber());
ASSERT_EQ(1, handler.TEST_num_subscriber());
ASSERT_FALSE(trigger_poll);
}

Expand Down Expand Up @@ -317,7 +317,7 @@ TEST_F(TxnStateCacheTest, poller_skip_schedule) {
// create a subscriber to trigger the poll scheduling
auto s1 = cache->subscribe_state(1, "s1", _db, _tbl, _auth);
ASSERT_OK(s1.status());
ASSERT_TRUE(poller->is_txn_pending(1));
ASSERT_TRUE(poller->TEST_is_txn_pending(1));

// transit the state to the final before scheduling the poll, and the poll should be skipped
ASSERT_OK(cache->push_state(1, TTransactionStatus::VISIBLE, ""));
Expand All @@ -327,7 +327,7 @@ TEST_F(TxnStateCacheTest, poller_skip_schedule) {
// create subscriber for txn 2 to trigger the poll scheduling
auto s2 = cache->subscribe_state(2, "s2", _db, _tbl, _auth);
ASSERT_OK(s2.status());
ASSERT_TRUE(poller->is_txn_pending(2));
ASSERT_TRUE(poller->TEST_is_txn_pending(2));
// advance the time to schedule the poll for txn 2
SyncPoint::GetInstance()->SetCallBack("TxnStatePoller::get_current_ms", [&](void* arg) { *((int64_t*)arg) = 400; });
// when state for txn 2 becomes visible, it means poll task for txn 2 has been finished. The thread pool token
Expand Down Expand Up @@ -462,7 +462,7 @@ TEST_F(TxnStateCacheTest, cache_poll_state_notify_subscriber) {
ASSERT_TRUE(cache->get_state(1).status().is_not_found());
auto s1_1 = cache->subscribe_state(1, "s1_1", _db, _tbl, _auth);
ASSERT_OK(s1_1.status());
ASSERT_TRUE(poller->is_txn_pending(1));
ASSERT_TRUE(poller->TEST_is_txn_pending(1));
assert_txn_state_eq({TTransactionStatus::PREPARE, ""}, s1_1.value()->current_state());
auto s1_2 = cache->subscribe_state(1, "s1_2", _db, _tbl, _auth);
ASSERT_OK(s1_2.status());
Expand All @@ -472,28 +472,28 @@ TEST_F(TxnStateCacheTest, cache_poll_state_notify_subscriber) {
auto s2_1 = cache->subscribe_state(2, "s2_1", _db, _tbl, _auth);
ASSERT_OK(s2_1.status());
assert_txn_state_eq({TTransactionStatus::PREPARE, ""}, s2_1.value()->current_state());
ASSERT_TRUE(poller->is_txn_pending(2));
ASSERT_TRUE(poller->TEST_is_txn_pending(2));

SyncPoint::GetInstance()->SetCallBack("TxnStatePoller::get_current_ms", [&](void* arg) { *((int64_t*)arg) = 50; });
// txn 3 should be scheduled at time 150, current is 50
ASSERT_TRUE(cache->get_state(3).status().is_not_found());
auto s3_1 = cache->subscribe_state(3, "s3_1", _db, _tbl, _auth);
ASSERT_OK(s3_1.status());
assert_txn_state_eq({TTransactionStatus::PREPARE, ""}, s3_1.value()->current_state());
ASSERT_TRUE(poller->is_txn_pending(1));
ASSERT_TRUE(poller->is_txn_pending(2));
ASSERT_TRUE(poller->is_txn_pending(3));
ASSERT_TRUE(poller->TEST_is_txn_pending(1));
ASSERT_TRUE(poller->TEST_is_txn_pending(2));
ASSERT_TRUE(poller->TEST_is_txn_pending(3));

SyncPoint::GetInstance()->SetCallBack("TxnStatePoller::get_current_ms", [&](void* arg) { *((int64_t*)arg) = 80; });
// txn 4 should be scheduled at time 180, current is 80
ASSERT_TRUE(cache->get_state(4).status().is_not_found());
auto s4_1 = cache->subscribe_state(4, "s4_1", _db, _tbl, _auth);
ASSERT_OK(s4_1.status());
assert_txn_state_eq({TTransactionStatus::PREPARE, ""}, s4_1.value()->current_state());
ASSERT_TRUE(poller->is_txn_pending(1));
ASSERT_TRUE(poller->is_txn_pending(2));
ASSERT_TRUE(poller->is_txn_pending(3));
ASSERT_TRUE(poller->is_txn_pending(4));
ASSERT_TRUE(poller->TEST_is_txn_pending(1));
ASSERT_TRUE(poller->TEST_is_txn_pending(2));
ASSERT_TRUE(poller->TEST_is_txn_pending(3));
ASSERT_TRUE(poller->TEST_is_txn_pending(4));

auto t1_1 = std::thread([&]() {
wait_func(s1_1.value().get(), 60000000, StatusOr<TxnState>({TTransactionStatus::VISIBLE, ""}));
Expand All @@ -518,10 +518,10 @@ TEST_F(TxnStateCacheTest, cache_poll_state_notify_subscriber) {
assert_txn_state_eq({TTransactionStatus::VISIBLE, ""}, s1_1.value()->current_state());
assert_txn_state_eq({TTransactionStatus::VISIBLE, ""}, s2_1.value()->current_state());
assert_txn_state_eq({TTransactionStatus::VISIBLE, ""}, s3_1.value()->current_state());
ASSERT_FALSE(poller->is_txn_pending(1));
ASSERT_FALSE(poller->is_txn_pending(2));
ASSERT_FALSE(poller->is_txn_pending(3));
ASSERT_TRUE(poller->is_txn_pending(4));
ASSERT_FALSE(poller->TEST_is_txn_pending(1));
ASSERT_FALSE(poller->TEST_is_txn_pending(2));
ASSERT_FALSE(poller->TEST_is_txn_pending(3));
ASSERT_TRUE(poller->TEST_is_txn_pending(4));
ASSERT_EQ(3, num_rpc.load());

assert_txn_state_eq({TTransactionStatus::PREPARE, ""}, s4_1.value()->current_state());
Expand All @@ -537,7 +537,7 @@ TEST_F(TxnStateCacheTest, cache_poll_state_notify_subscriber) {
// the state is COMMITTED which is not final. next poll time is 300
SyncPoint::GetInstance()->SetCallBack("TxnStatePoller::get_current_ms", [&](void* arg) { *((int64_t*)arg) = 200; });
ASSERT_TRUE(Awaitility().timeout(5000000).until([&] {
auto st = poller->pending_execution_time(4);
auto st = poller->TEST_pending_execution_time(4);
return st.ok() && st.value() == 300;
}));
assert_txn_state_eq({TTransactionStatus::COMMITTED, ""}, s4_1.value()->current_state());
Expand Down Expand Up @@ -663,7 +663,7 @@ TEST_F(TxnStateCacheTest, cache_stop) {
expected_status = Status::ServiceUnavailable("Transaction state cache is stopped");
ASSERT_EQ(expected_status.to_string(), cache->push_state(3, TTransactionStatus::VISIBLE, "").to_string(false));
ASSERT_EQ(expected_status.to_string(), cache->subscribe_state(3, "s4", _db, _tbl, _auth).status().to_string(false));
ASSERT_FALSE(cache->txn_state_poller()->is_scheduling());
ASSERT_FALSE(cache->txn_state_poller()->TEST_is_scheduling());
}

} // namespace starrocks

0 comments on commit 55a3ea1

Please sign in to comment.