diff --git a/be/src/runtime/batch_write/batch_write_mgr.cpp b/be/src/runtime/batch_write/batch_write_mgr.cpp index 462e4f5f4c265..2aa157f507bc2 100644 --- a/be/src/runtime/batch_write/batch_write_mgr.cpp +++ b/be/src/runtime/batch_write/batch_write_mgr.cpp @@ -240,7 +240,7 @@ void BatchWriteMgr::receive_stream_load_rpc(ExecEnv* exec_env, brpc::Controller* ctx->status = append_data(ctx); } -TTransactionStatus::type to_thrift_txn_status(TransactionStatusPB status) { +static TTransactionStatus::type to_thrift_txn_status(TransactionStatusPB status) { switch (status) { case TRANS_UNKNOWN: return TTransactionStatus::UNKNOWN; diff --git a/be/src/runtime/batch_write/txn_state_cache.cpp b/be/src/runtime/batch_write/txn_state_cache.cpp index 79fe10a0cf0d3..997de4ffa0a28 100644 --- a/be/src/runtime/batch_write/txn_state_cache.cpp +++ b/be/src/runtime/batch_write/txn_state_cache.cpp @@ -35,7 +35,7 @@ void TxnStateHandler::push_state(TTransactionStatus::type new_status, const std: if (_stopped) { return; } - _transit_txn_state(new_status, reason, true); + _transition_txn_state(new_status, reason, true); if (_is_finished_txn_state()) { _cv.notify_all(); } @@ -52,15 +52,15 @@ bool TxnStateHandler::poll_state(const StatusOr& result) { << ", status: " << result.status(); // fast fail if there is failure between FE and BE if (_num_poll_failure >= config::merge_commit_txn_state_poll_max_fail_times) { - _transit_txn_state(TTransactionStatus::UNKNOWN, - fmt::format("poll txn state failure exceeds max times {}, last error: {}", - _num_poll_failure, result.status().to_string(false)), - false); + _transition_txn_state(TTransactionStatus::UNKNOWN, + fmt::format("poll txn state failure exceeds max times {}, last error: {}", + _num_poll_failure, result.status().to_string(false)), + false); } } else { TRACE_BATCH_WRITE << "handler poll success, txn_id: " << _txn_id << ", " << result.value(); _num_poll_failure = 0; - _transit_txn_state(result.value().txn_status, result.value().reason, false); + _transition_txn_state(result.value().txn_status, result.value().reason, false); } // stop polling if reach the finished state or there is no subscriber if (_is_finished_txn_state()) { @@ -133,7 +133,8 @@ std::string TxnStateHandler::debug_string() { _num_waiting_finished_state, _stopped); } -void TxnStateHandler::_transit_txn_state(TTransactionStatus::type new_status, const std::string& reason, bool from_fe) { +void TxnStateHandler::_transition_txn_state(TTransactionStatus::type new_status, const std::string& reason, + bool from_fe) { TTransactionStatus::type old_status = _txn_state.txn_status; TRACE_BATCH_WRITE << "receive new txn state, txn_id: " << _txn_id << ", current status: " << to_string(_txn_state.txn_status) @@ -317,12 +318,12 @@ void TxnStatePoller::_execute_poll(const TxnStatePollTask& task) { } } -bool TxnStatePoller::is_txn_pending(int64_t txn_id) { +bool TxnStatePoller::TEST_is_txn_pending(int64_t txn_id) { std::unique_lock lock(_mutex); return _pending_txn_ids.find(txn_id) != _pending_txn_ids.end(); } -StatusOr TxnStatePoller::pending_execution_time(int64_t txn_id) { +StatusOr TxnStatePoller::TEST_pending_execution_time(int64_t txn_id) { std::unique_lock lock(_mutex); auto it = _pending_tasks.begin(); while (it != _pending_tasks.end()) { @@ -334,7 +335,7 @@ StatusOr TxnStatePoller::pending_execution_time(int64_t txn_id) { return Status::NotFound("no task found"); } -bool TxnStatePoller::is_scheduling() { +bool TxnStatePoller::TEST_is_scheduling() { std::unique_lock lock(_mutex); return _is_scheduling; } @@ -436,11 +437,14 @@ StatusOr TxnStateCache::_get_txn_entry(TxnStateDynam if (_stopped) { return Status::ServiceUnavailable("Transaction state cache is stopped"); } - TxnStateDynamicCacheEntry* entry = create_if_not_exist ? cache->get_or_create(txn_id, 1) : cache->get(txn_id); + TxnStateDynamicCacheEntry* entry = nullptr; if (create_if_not_exist) { + entry = cache->get_or_create(txn_id, 1); DCHECK(entry != nullptr); // initialize txn_id entry->value().set_txn_id(txn_id); + } else { + entry = cache->get(txn_id); } return entry; } diff --git a/be/src/runtime/batch_write/txn_state_cache.h b/be/src/runtime/batch_write/txn_state_cache.h index eb2a1944be715..541a37c5dd1d2 100644 --- a/be/src/runtime/batch_write/txn_state_cache.h +++ b/be/src/runtime/batch_write/txn_state_cache.h @@ -67,7 +67,6 @@ class TxnStateHandler { void subscribe(bool& trigger_poll); // Remove a subscriber which has called subscribe() before void unsubscribe(); - int32_t num_subscriber(); // A subscriber calls this function to wait for the txn state to reach // a finished state or error happens. StatusOr wait_finished_state(const std::string& subscriber_name, int64_t timeout_us); @@ -82,8 +81,11 @@ class TxnStateHandler { void stop(); + // For testing + int32_t TEST_num_subscriber() { return _num_subscriber; } + private: - void _transit_txn_state(TTransactionStatus::type new_status, const std::string& reason, bool from_fe); + void _transition_txn_state(TTransactionStatus::type new_status, const std::string& reason, bool from_fe); // Whether the current status indicate the load is finished bool _is_finished_txn_state(); @@ -102,11 +104,6 @@ class TxnStateHandler { bool _stopped{false}; }; -inline int32_t TxnStateHandler::num_subscriber() { - std::unique_lock lock(_mutex); - return _num_subscriber; -} - inline int32_t TxnStateHandler::num_waiting_finished_state() { std::unique_lock lock(_mutex); return _num_waiting_finished_state; @@ -177,9 +174,9 @@ class TxnStatePoller { void stop(); // For testing - bool is_txn_pending(int64_t txn_id); - StatusOr pending_execution_time(int64_t txn_id); - bool is_scheduling(); + bool TEST_is_txn_pending(int64_t txn_id); + StatusOr TEST_pending_execution_time(int64_t txn_id); + bool TEST_is_scheduling(); private: void _schedule_func(); diff --git a/be/test/runtime/batch_write/txn_state_cache_test.cpp b/be/test/runtime/batch_write/txn_state_cache_test.cpp index 43565026cdf1b..26f837ef4c2bc 100644 --- a/be/test/runtime/batch_write/txn_state_cache_test.cpp +++ b/be/test/runtime/batch_write/txn_state_cache_test.cpp @@ -186,18 +186,18 @@ TEST_F(TxnStateCacheTest, handler_subscriber) { TxnStateHandler handler; bool trigger_poll = false; handler.subscribe(trigger_poll); - ASSERT_EQ(1, handler.num_subscriber()); + ASSERT_EQ(1, handler.TEST_num_subscriber()); ASSERT_TRUE(trigger_poll); handler.subscribe(trigger_poll); - ASSERT_EQ(2, handler.num_subscriber()); + ASSERT_EQ(2, handler.TEST_num_subscriber()); ASSERT_FALSE(trigger_poll); handler.push_state(TTransactionStatus::VISIBLE, ""); handler.unsubscribe(); - ASSERT_EQ(1, handler.num_subscriber()); + ASSERT_EQ(1, handler.TEST_num_subscriber()); handler.unsubscribe(); - ASSERT_EQ(0, handler.num_subscriber()); + ASSERT_EQ(0, handler.TEST_num_subscriber()); handler.subscribe(trigger_poll); - ASSERT_EQ(1, handler.num_subscriber()); + ASSERT_EQ(1, handler.TEST_num_subscriber()); ASSERT_FALSE(trigger_poll); } @@ -317,7 +317,7 @@ TEST_F(TxnStateCacheTest, poller_skip_schedule) { // create a subscriber to trigger the poll scheduling auto s1 = cache->subscribe_state(1, "s1", _db, _tbl, _auth); ASSERT_OK(s1.status()); - ASSERT_TRUE(poller->is_txn_pending(1)); + ASSERT_TRUE(poller->TEST_is_txn_pending(1)); // transit the state to the final before scheduling the poll, and the poll should be skipped ASSERT_OK(cache->push_state(1, TTransactionStatus::VISIBLE, "")); @@ -327,7 +327,7 @@ TEST_F(TxnStateCacheTest, poller_skip_schedule) { // create subscriber for txn 2 to trigger the poll scheduling auto s2 = cache->subscribe_state(2, "s2", _db, _tbl, _auth); ASSERT_OK(s2.status()); - ASSERT_TRUE(poller->is_txn_pending(2)); + ASSERT_TRUE(poller->TEST_is_txn_pending(2)); // advance the time to schedule the poll for txn 2 SyncPoint::GetInstance()->SetCallBack("TxnStatePoller::get_current_ms", [&](void* arg) { *((int64_t*)arg) = 400; }); // when state for txn 2 becomes visible, it means poll task for txn 2 has been finished. The thread pool token @@ -462,7 +462,7 @@ TEST_F(TxnStateCacheTest, cache_poll_state_notify_subscriber) { ASSERT_TRUE(cache->get_state(1).status().is_not_found()); auto s1_1 = cache->subscribe_state(1, "s1_1", _db, _tbl, _auth); ASSERT_OK(s1_1.status()); - ASSERT_TRUE(poller->is_txn_pending(1)); + ASSERT_TRUE(poller->TEST_is_txn_pending(1)); assert_txn_state_eq({TTransactionStatus::PREPARE, ""}, s1_1.value()->current_state()); auto s1_2 = cache->subscribe_state(1, "s1_2", _db, _tbl, _auth); ASSERT_OK(s1_2.status()); @@ -472,7 +472,7 @@ TEST_F(TxnStateCacheTest, cache_poll_state_notify_subscriber) { auto s2_1 = cache->subscribe_state(2, "s2_1", _db, _tbl, _auth); ASSERT_OK(s2_1.status()); assert_txn_state_eq({TTransactionStatus::PREPARE, ""}, s2_1.value()->current_state()); - ASSERT_TRUE(poller->is_txn_pending(2)); + ASSERT_TRUE(poller->TEST_is_txn_pending(2)); SyncPoint::GetInstance()->SetCallBack("TxnStatePoller::get_current_ms", [&](void* arg) { *((int64_t*)arg) = 50; }); // txn 3 should be scheduled at time 150, current is 50 @@ -480,9 +480,9 @@ TEST_F(TxnStateCacheTest, cache_poll_state_notify_subscriber) { auto s3_1 = cache->subscribe_state(3, "s3_1", _db, _tbl, _auth); ASSERT_OK(s3_1.status()); assert_txn_state_eq({TTransactionStatus::PREPARE, ""}, s3_1.value()->current_state()); - ASSERT_TRUE(poller->is_txn_pending(1)); - ASSERT_TRUE(poller->is_txn_pending(2)); - ASSERT_TRUE(poller->is_txn_pending(3)); + ASSERT_TRUE(poller->TEST_is_txn_pending(1)); + ASSERT_TRUE(poller->TEST_is_txn_pending(2)); + ASSERT_TRUE(poller->TEST_is_txn_pending(3)); SyncPoint::GetInstance()->SetCallBack("TxnStatePoller::get_current_ms", [&](void* arg) { *((int64_t*)arg) = 80; }); // txn 4 should be scheduled at time 180, current is 80 @@ -490,10 +490,10 @@ TEST_F(TxnStateCacheTest, cache_poll_state_notify_subscriber) { auto s4_1 = cache->subscribe_state(4, "s4_1", _db, _tbl, _auth); ASSERT_OK(s4_1.status()); assert_txn_state_eq({TTransactionStatus::PREPARE, ""}, s4_1.value()->current_state()); - ASSERT_TRUE(poller->is_txn_pending(1)); - ASSERT_TRUE(poller->is_txn_pending(2)); - ASSERT_TRUE(poller->is_txn_pending(3)); - ASSERT_TRUE(poller->is_txn_pending(4)); + ASSERT_TRUE(poller->TEST_is_txn_pending(1)); + ASSERT_TRUE(poller->TEST_is_txn_pending(2)); + ASSERT_TRUE(poller->TEST_is_txn_pending(3)); + ASSERT_TRUE(poller->TEST_is_txn_pending(4)); auto t1_1 = std::thread([&]() { wait_func(s1_1.value().get(), 60000000, StatusOr({TTransactionStatus::VISIBLE, ""})); @@ -518,10 +518,10 @@ TEST_F(TxnStateCacheTest, cache_poll_state_notify_subscriber) { assert_txn_state_eq({TTransactionStatus::VISIBLE, ""}, s1_1.value()->current_state()); assert_txn_state_eq({TTransactionStatus::VISIBLE, ""}, s2_1.value()->current_state()); assert_txn_state_eq({TTransactionStatus::VISIBLE, ""}, s3_1.value()->current_state()); - ASSERT_FALSE(poller->is_txn_pending(1)); - ASSERT_FALSE(poller->is_txn_pending(2)); - ASSERT_FALSE(poller->is_txn_pending(3)); - ASSERT_TRUE(poller->is_txn_pending(4)); + ASSERT_FALSE(poller->TEST_is_txn_pending(1)); + ASSERT_FALSE(poller->TEST_is_txn_pending(2)); + ASSERT_FALSE(poller->TEST_is_txn_pending(3)); + ASSERT_TRUE(poller->TEST_is_txn_pending(4)); ASSERT_EQ(3, num_rpc.load()); assert_txn_state_eq({TTransactionStatus::PREPARE, ""}, s4_1.value()->current_state()); @@ -537,7 +537,7 @@ TEST_F(TxnStateCacheTest, cache_poll_state_notify_subscriber) { // the state is COMMITTED which is not final. next poll time is 300 SyncPoint::GetInstance()->SetCallBack("TxnStatePoller::get_current_ms", [&](void* arg) { *((int64_t*)arg) = 200; }); ASSERT_TRUE(Awaitility().timeout(5000000).until([&] { - auto st = poller->pending_execution_time(4); + auto st = poller->TEST_pending_execution_time(4); return st.ok() && st.value() == 300; })); assert_txn_state_eq({TTransactionStatus::COMMITTED, ""}, s4_1.value()->current_state()); @@ -663,7 +663,7 @@ TEST_F(TxnStateCacheTest, cache_stop) { expected_status = Status::ServiceUnavailable("Transaction state cache is stopped"); ASSERT_EQ(expected_status.to_string(), cache->push_state(3, TTransactionStatus::VISIBLE, "").to_string(false)); ASSERT_EQ(expected_status.to_string(), cache->subscribe_state(3, "s4", _db, _tbl, _auth).status().to_string(false)); - ASSERT_FALSE(cache->txn_state_poller()->is_scheduling()); + ASSERT_FALSE(cache->txn_state_poller()->TEST_is_scheduling()); } } // namespace starrocks