Skip to content

Commit

Permalink
Support to clean txn state
Browse files Browse the repository at this point in the history
Signed-off-by: PengFei Li <[email protected]>
  • Loading branch information
banmoy committed Jan 16, 2025
1 parent f21df45 commit b959b0a
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 2 deletions.
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1535,6 +1535,8 @@ CONF_mInt32(merge_commit_rpc_request_retry_interval_ms, "500");
CONF_mInt32(merge_commit_rpc_reqeust_timeout_ms, "10000");
CONF_mBool(merge_commit_trace_log_enable, "false");
CONF_mInt32(merge_commit_txn_state_cache_capacity, "4096");
CONF_mInt32(merge_commit_txn_state_clean_interval_sec, "300");
CONF_mInt32(merge_commit_txn_state_expire_time_sec, "1800");
CONF_mInt32(merge_commit_txn_state_poll_interval_ms, "2000");
CONF_mInt32(merge_commit_txn_state_poll_max_fail_times, "2");

Expand Down
26 changes: 25 additions & 1 deletion be/src/runtime/batch_write/txn_state_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,10 @@ TxnStateCache::TxnStateCache(size_t capacity, std::unique_ptr<ThreadPoolToken> p

Status TxnStateCache::init() {
_txn_state_poller = std::make_unique<TxnStatePoller>(this, _poll_state_token.get());
return _txn_state_poller->init();
RETURN_IF_ERROR(_txn_state_poller->init());
_txn_state_clean_thread = std::make_unique<std::thread>([this] { _txn_state_clean_func(); });
Thread::set_thread_name(*_txn_state_clean_thread.get(), "txn_state_clean");
return Status::OK();
}

Status TxnStateCache::push_state(int64_t txn_id, TTransactionStatus::type status, const std::string& reason) {
Expand Down Expand Up @@ -420,6 +423,10 @@ void TxnStateCache::stop() {
_txn_state_poller->stop();
}
_poll_state_token->shutdown();
_txn_state_clean_stop_latch.count_down();
if (_txn_state_clean_thread && _txn_state_clean_thread->joinable()) {
_txn_state_clean_thread->join();
}
}

int32_t TxnStateCache::size() {
Expand All @@ -446,6 +453,10 @@ StatusOr<TxnStateDynamicCacheEntry*> TxnStateCache::_get_txn_entry(TxnStateDynam
} else {
entry = cache->get(txn_id);
}
if (entry) {
// expire time does not need very accurate and monotonic, so do not protect it from concurrent update
entry->update_expire_time(get_current_ms() + config::merge_commit_txn_state_expire_time_sec * 1000);
}
return entry;
}

Expand All @@ -468,4 +479,17 @@ void TxnStateCache::_notify_poll_result(const TxnStatePollTask& task, const Stat
}
}

void TxnStateCache::_txn_state_clean_func() {
while (!_stopped) {
int32_t clean_interval_sec = config::merge_commit_txn_state_clean_interval_sec;
_txn_state_clean_stop_latch.wait_for(std::chrono::seconds(clean_interval_sec));
if (_stopped) {
break;
}
for (auto& cache : _shards) {
cache->clear_expired();
}
}
}

} // namespace starrocks
8 changes: 7 additions & 1 deletion be/src/runtime/batch_write/txn_state_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "common/utils.h"
#include "testutil/sync_point.h"
#include "util/bthreads/bthread_shared_mutex.h"
#include "util/countdown_latch.h"
#include "util/dynamic_cache.h"
#include "util/threadpool.h"
#include "util/thrift_rpc_helper.h"
Expand Down Expand Up @@ -256,14 +257,19 @@ class TxnStateCache {
StatusOr<TxnStateDynamicCacheEntry*> _get_txn_entry(TxnStateDynamicCache* cache, int64_t txn_id,
bool create_if_not_exist);
void _notify_poll_result(const TxnStatePollTask& task, const StatusOr<TxnState>& result);
void _txn_state_clean_func();

size_t _capacity;
std::unique_ptr<ThreadPoolToken> _poll_state_token;
TxnStateDynamicCachePtr _shards[kNumShards];
std::unique_ptr<TxnStatePoller> _txn_state_poller;
// protect the cache from being accessed after it is stopped
bthreads::BThreadSharedMutex _rw_mutex;
bool _stopped{false};
std::atomic<bool> _stopped{false};

std::unique_ptr<std::thread> _txn_state_clean_thread;
// used to notify the clean thread to stop
CountDownLatch _txn_state_clean_stop_latch{1};
};

inline TxnStateDynamicCache* TxnStateCache::_get_txn_cache(int64_t txn_id) {
Expand Down
25 changes: 25 additions & 0 deletions be/test/runtime/batch_write/txn_state_cache_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,31 @@ TEST_F(TxnStateCacheTest, cache_set_capacity) {
}
}

TEST_F(TxnStateCacheTest, cache_clean_txn_state) {
auto cache = create_cache(2048);
auto old_clean_interval_sec = config::merge_commit_txn_state_clean_interval_sec;
auto old_expire_time_sec = config::merge_commit_txn_state_expire_time_sec;
config::merge_commit_txn_state_clean_interval_sec = 1;
config::merge_commit_txn_state_expire_time_sec = 1;
SyncPoint::GetInstance()->EnableProcessing();
DeferOp defer([&] {
SyncPoint::GetInstance()->ClearCallBack("TxnStateHandler::destruct");
SyncPoint::GetInstance()->DisableProcessing();
cache->stop();
config::merge_commit_txn_state_clean_interval_sec = old_clean_interval_sec;
config::merge_commit_txn_state_expire_time_sec = old_expire_time_sec;
});
std::atomic<int32_t> num_evict = 0;
SyncPoint::GetInstance()->SetCallBack("TxnStateHandler::destruct", [&](void* arg) { num_evict += 1; });
ASSERT_OK(cache->push_state(1, TTransactionStatus::VISIBLE, ""));
ASSERT_OK(cache->push_state(2, TTransactionStatus::VISIBLE, ""));
ASSERT_OK(cache->push_state(3, TTransactionStatus::VISIBLE, ""));
ASSERT_TRUE(Awaitility().timeout(10000000).until([&] { return num_evict == 3; }));
for (auto shard : cache->get_cache_shards()) {
ASSERT_EQ(0, shard->object_size());
}
}

TEST_F(TxnStateCacheTest, cache_stop) {
auto cache = create_cache(2048);
SyncPoint::GetInstance()->EnableProcessing();
Expand Down

0 comments on commit b959b0a

Please sign in to comment.