From faf0e3802297cbfa439a4e941ad8caf85fe181b3 Mon Sep 17 00:00:00 2001 From: PengFei Li Date: Thu, 16 Jan 2025 19:14:28 +0800 Subject: [PATCH] [Enhancement] Introduce TxnStateCache for merge commit sync mode (backport #55001) (#55150) Signed-off-by: PengFei Li --- be/src/common/config.h | 22 +- be/src/http/action/update_config_action.cpp | 10 + be/src/runtime/CMakeLists.txt | 1 + .../runtime/batch_write/batch_write_mgr.cpp | 55 +- be/src/runtime/batch_write/batch_write_mgr.h | 17 +- be/src/runtime/batch_write/batch_write_util.h | 2 +- .../batch_write/isomorphic_batch_write.cpp | 105 +-- .../batch_write/isomorphic_batch_write.h | 7 +- .../runtime/batch_write/txn_state_cache.cpp | 495 +++++++++++++ be/src/runtime/batch_write/txn_state_cache.h | 278 +++++++ be/src/runtime/exec_env.cpp | 8 +- be/src/service/internal_service.cpp | 11 +- be/src/service/internal_service.h | 4 + be/src/util/dynamic_cache.h | 55 +- be/test/CMakeLists.txt | 1 + .../batch_write/batch_write_mgr_test.cpp | 54 ++ .../isomorphic_batch_write_test.cpp | 103 ++- .../batch_write/txn_state_cache_test.cpp | 694 ++++++++++++++++++ be/test/util/dynamic_cache_test.cpp | 15 + 19 files changed, 1768 insertions(+), 169 deletions(-) create mode 100644 be/src/runtime/batch_write/txn_state_cache.cpp create mode 100644 be/src/runtime/batch_write/txn_state_cache.h create mode 100644 be/test/runtime/batch_write/txn_state_cache_test.cpp diff --git a/be/src/common/config.h b/be/src/common/config.h index 2ac744b1b14c71..622f5ddc3f11ee 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1517,15 +1517,19 @@ CONF_mInt32(apply_version_slow_log_sec, "30"); CONF_mInt32(merge_commit_stream_load_pipe_block_wait_us, "500"); // The maximum number of bytes that the merge commit stream load pipe can buffer. CONF_mInt64(merge_commit_stream_load_pipe_max_buffered_bytes, "1073741824"); -CONF_Int32(batch_write_thread_pool_num_min, "0"); -CONF_Int32(batch_write_thread_pool_num_max, "512"); -CONF_Int32(batch_write_thread_pool_queue_size, "4096"); -CONF_mInt32(batch_write_default_timeout_ms, "600000"); -CONF_mInt32(batch_write_rpc_request_retry_num, "10"); -CONF_mInt32(batch_write_rpc_request_retry_interval_ms, "500"); -CONF_mInt32(batch_write_rpc_reqeust_timeout_ms, "10000"); -CONF_mInt32(batch_write_poll_load_status_interval_ms, "200"); -CONF_mBool(batch_write_trace_log_enable, "false"); +CONF_Int32(merge_commit_thread_pool_num_min, "0"); +CONF_Int32(merge_commit_thread_pool_num_max, "512"); +CONF_Int32(merge_commit_thread_pool_queue_size, "4096"); +CONF_mInt32(merge_commit_default_timeout_ms, "600000"); +CONF_mInt32(merge_commit_rpc_request_retry_num, "10"); +CONF_mInt32(merge_commit_rpc_request_retry_interval_ms, "500"); +CONF_mInt32(merge_commit_rpc_reqeust_timeout_ms, "10000"); +CONF_mBool(merge_commit_trace_log_enable, "false"); +CONF_mInt32(merge_commit_txn_state_cache_capacity, "4096"); +CONF_mInt32(merge_commit_txn_state_clean_interval_sec, "300"); +CONF_mInt32(merge_commit_txn_state_expire_time_sec, "1800"); +CONF_mInt32(merge_commit_txn_state_poll_interval_ms, "2000"); +CONF_mInt32(merge_commit_txn_state_poll_max_fail_times, "2"); // ignore union type tag in avro kafka routine load CONF_mBool(avro_ignore_union_type_tag, "false"); diff --git a/be/src/http/action/update_config_action.cpp b/be/src/http/action/update_config_action.cpp index 27a30e85b53e45..f527bf2f88ea16 100644 --- a/be/src/http/action/update_config_action.cpp +++ b/be/src/http/action/update_config_action.cpp @@ -52,6 +52,8 @@ #include "http/http_headers.h" #include "http/http_request.h" #include "http/http_status.h" +#include "runtime/batch_write/batch_write_mgr.h" +#include "runtime/batch_write/txn_state_cache.h" #include "storage/compaction_manager.h" #include "storage/lake/compaction_scheduler.h" #include "storage/lake/tablet_manager.h" @@ -316,6 +318,14 @@ Status UpdateConfigAction::update_config(const std::string& name, const std::str } return Status::OK(); }); + _config_callback.emplace("merge_commit_txn_state_cache_capacity", [&]() -> Status { + LOG(INFO) << "set merge_commit_txn_state_cache_capacity: " << config::merge_commit_txn_state_cache_capacity; + auto batch_write_mgr = _exec_env->batch_write_mgr(); + if (batch_write_mgr) { + batch_write_mgr->txn_state_cache()->set_capacity(config::merge_commit_txn_state_cache_capacity); + } + return Status::OK(); + }); #ifdef USE_STAROS #define UPDATE_STARLET_CONFIG(BE_CONFIG, STARLET_CONFIG) \ diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt index 15e45ad435c555..88ff203f07730a 100644 --- a/be/src/runtime/CMakeLists.txt +++ b/be/src/runtime/CMakeLists.txt @@ -62,6 +62,7 @@ set(RUNTIME_FILES batch_write/isomorphic_batch_write.cpp batch_write/batch_write_mgr.cpp batch_write/batch_write_util.cpp + batch_write/txn_state_cache.cpp routine_load/data_consumer.cpp routine_load/data_consumer_group.cpp routine_load/data_consumer_pool.cpp diff --git a/be/src/runtime/batch_write/batch_write_mgr.cpp b/be/src/runtime/batch_write/batch_write_mgr.cpp index 9bbf9ebee286a5..2aa157f507bc25 100644 --- a/be/src/runtime/batch_write/batch_write_mgr.cpp +++ b/be/src/runtime/batch_write/batch_write_mgr.cpp @@ -25,6 +25,15 @@ namespace starrocks { +BatchWriteMgr::BatchWriteMgr(std::unique_ptr executor) : _executor(std::move(executor)) {} + +Status BatchWriteMgr::init() { + std::unique_ptr token = + _executor->get_thread_pool()->new_token(ThreadPool::ExecutionMode::CONCURRENT); + _txn_state_cache = std::make_unique(config::merge_commit_txn_state_cache_capacity, std::move(token)); + return _txn_state_cache->init(); +} + Status BatchWriteMgr::register_stream_load_pipe(StreamLoadContext* pipe_ctx) { BatchWriteId batch_write_id = { .db = pipe_ctx->db, .table = pipe_ctx->table, .load_params = pipe_ctx->load_parameters}; @@ -78,7 +87,7 @@ StatusOr BatchWriteMgr::_get_batch_write(const st return it->second; } - auto batch_write = std::make_shared(batch_write_id, _executor.get()); + auto batch_write = std::make_shared(batch_write_id, _executor.get(), _txn_state_cache.get()); Status st = batch_write->init(); if (!st.ok()) { LOG(ERROR) << "Fail to init batch write, " << batch_write_id << ", status: " << st; @@ -105,6 +114,10 @@ void BatchWriteMgr::stop() { for (auto& batch_write : stop_writes) { batch_write->stop(); } + if (_txn_state_cache) { + _txn_state_cache->stop(); + } + _executor->get_thread_pool()->shutdown(); } StatusOr BatchWriteMgr::create_and_register_pipe( @@ -224,7 +237,45 @@ void BatchWriteMgr::receive_stream_load_rpc(ExecEnv* exec_env, brpc::Controller* ctx->buffer->flip(); ctx->receive_bytes = io_buf.size(); ctx->mc_read_data_cost_nanos = MonotonicNanos() - ctx->start_nanos; - ctx->status = exec_env->batch_write_mgr()->append_data(ctx); + ctx->status = append_data(ctx); +} + +static TTransactionStatus::type to_thrift_txn_status(TransactionStatusPB status) { + switch (status) { + case TRANS_UNKNOWN: + return TTransactionStatus::UNKNOWN; + case TRANS_PREPARE: + return TTransactionStatus::PREPARE; + case TRANS_COMMITTED: + return TTransactionStatus::COMMITTED; + case TRANS_VISIBLE: + return TTransactionStatus::VISIBLE; + case TRANS_ABORTED: + return TTransactionStatus::ABORTED; + case TRANS_PREPARED: + return TTransactionStatus::PREPARED; + default: + return TTransactionStatus::UNKNOWN; + } +} + +void BatchWriteMgr::update_transaction_state(const PUpdateTransactionStateRequest* request, + PUpdateTransactionStateResponse* response) { + for (int i = 0; i < request->states_size(); i++) { + auto& txn_state = request->states(i); + auto st = _txn_state_cache->push_state(txn_state.txn_id(), to_thrift_txn_status(txn_state.status()), + txn_state.reason()); + if (!st.ok()) { + LOG(WARNING) << "Failed to update transaction state, txn_id: " << txn_state.txn_id() + << ", txn status: " << TransactionStatusPB_Name(txn_state.status()) + << ", status reason: " << txn_state.reason() << ", update error: " << st; + } else { + TRACE_BATCH_WRITE << "Update transaction state, txn_id: " << txn_state.txn_id() + << ", txn status: " << TransactionStatusPB_Name(txn_state.status()) + << ", status reason: " << txn_state.reason(); + } + st.to_protobuf(response->add_results()); + } } } // namespace starrocks \ No newline at end of file diff --git a/be/src/runtime/batch_write/batch_write_mgr.h b/be/src/runtime/batch_write/batch_write_mgr.h index 0b860a27de01da..801bb013bfaec4 100644 --- a/be/src/runtime/batch_write/batch_write_mgr.h +++ b/be/src/runtime/batch_write/batch_write_mgr.h @@ -18,6 +18,7 @@ #include "common/statusor.h" #include "runtime/batch_write/isomorphic_batch_write.h" +#include "runtime/batch_write/txn_state_cache.h" #include "runtime/stream_load/stream_load_context.h" #include "util/bthreads/bthread_shared_mutex.h" #include "util/bthreads/executor.h" @@ -32,10 +33,13 @@ class ExecEnv; class PStreamLoadRequest; class PStreamLoadResponse; class StreamLoadContext; +class PUpdateTransactionStateRequest; +class PUpdateTransactionStateResponse; class BatchWriteMgr { public: - BatchWriteMgr(std::unique_ptr executor) : _executor(std::move(executor)){}; + BatchWriteMgr(std::unique_ptr executor); + Status init(); Status register_stream_load_pipe(StreamLoadContext* pipe_ctx); void unregister_stream_load_pipe(StreamLoadContext* pipe_ctx); @@ -45,19 +49,26 @@ class BatchWriteMgr { void stop(); + bthreads::ThreadPoolExecutor* executor() { return _executor.get(); } + TxnStateCache* txn_state_cache() { return _txn_state_cache.get(); } + static StatusOr create_and_register_pipe( ExecEnv* exec_env, BatchWriteMgr* batch_write_mgr, const string& db, const string& table, const std::map& load_parameters, const string& label, long txn_id, const TUniqueId& load_id, int32_t batch_write_interval_ms); - static void receive_stream_load_rpc(ExecEnv* exec_env, brpc::Controller* cntl, const PStreamLoadRequest* request, - PStreamLoadResponse* response); + void receive_stream_load_rpc(ExecEnv* exec_env, brpc::Controller* cntl, const PStreamLoadRequest* request, + PStreamLoadResponse* response); + + void update_transaction_state(const PUpdateTransactionStateRequest* request, + PUpdateTransactionStateResponse* response); private: StatusOr _get_batch_write(const BatchWriteId& batch_write_id, bool create_if_missing); std::unique_ptr _executor; + std::unique_ptr _txn_state_cache; bthreads::BThreadSharedMutex _rw_mutex; std::unordered_map _batch_write_map; diff --git a/be/src/runtime/batch_write/batch_write_util.h b/be/src/runtime/batch_write/batch_write_util.h index dee7c3c615aab4..bf6fb12324faf5 100644 --- a/be/src/runtime/batch_write/batch_write_util.h +++ b/be/src/runtime/batch_write/batch_write_util.h @@ -22,7 +22,7 @@ namespace starrocks { -#define TRACE_BATCH_WRITE LOG_IF(INFO, config::batch_write_trace_log_enable) +#define TRACE_BATCH_WRITE LOG_IF(INFO, config::merge_commit_trace_log_enable) using BatchWriteLoadParams = std::map; diff --git a/be/src/runtime/batch_write/isomorphic_batch_write.cpp b/be/src/runtime/batch_write/isomorphic_batch_write.cpp index 10fade35811d94..22b81b6bd47241 100644 --- a/be/src/runtime/batch_write/isomorphic_batch_write.cpp +++ b/be/src/runtime/batch_write/isomorphic_batch_write.cpp @@ -119,8 +119,9 @@ class AsyncAppendDataContext { std::atomic_int num_retries{-1}; }; -IsomorphicBatchWrite::IsomorphicBatchWrite(BatchWriteId batch_write_id, bthreads::ThreadPoolExecutor* executor) - : _batch_write_id(std::move(batch_write_id)), _executor(executor) {} +IsomorphicBatchWrite::IsomorphicBatchWrite(BatchWriteId batch_write_id, bthreads::ThreadPoolExecutor* executor, + TxnStateCache* txn_state_cache) + : _batch_write_id(std::move(batch_write_id)), _executor(executor), _txn_state_cache(txn_state_cache) {} Status IsomorphicBatchWrite::init() { TEST_ERROR_POINT("IsomorphicBatchWrite::init::error"); @@ -220,7 +221,6 @@ Status IsomorphicBatchWrite::append_data(StreamLoadContext* data_ctx) { if (_stopped.load(std::memory_order_acquire)) { return Status::ServiceUnavailable("Batch write is stopped"); } - int64_t start_ts = MonotonicNanos(); AsyncAppendDataContext* async_ctx = new AsyncAppendDataContext(data_ctx); async_ctx->ref(); async_ctx->create_time_ts.store(MonotonicNanos()); @@ -258,10 +258,7 @@ Status IsomorphicBatchWrite::append_data(StreamLoadContext* data_ctx) { if (_batch_write_async) { return Status::OK(); } - int64_t timeout_ms = - data_ctx->timeout_second > 0 ? data_ctx->timeout_second * 1000 : config::batch_write_default_timeout_ms; - int64_t left_timeout_ns = std::max((int64_t)0, timeout_ms * 1000 * 1000 - (MonotonicNanos() - start_ts)); - return _wait_for_load_status(data_ctx, left_timeout_ns); + return _wait_for_load_finish(data_ctx); } int IsomorphicBatchWrite::_execute_tasks(void* meta, bthread::TaskIterator& iter) { @@ -309,7 +306,7 @@ Status IsomorphicBatchWrite::_execute_write(AsyncAppendDataContext* async_ctx) { SCOPED_RAW_TIMER(&write_data_cost_ns); st = _write_data_to_pipe(async_ctx); } - if (st.ok() || num_retries >= config::batch_write_rpc_request_retry_num) { + if (st.ok() || num_retries >= config::merge_commit_rpc_request_retry_num) { break; } num_retries += 1; @@ -324,7 +321,7 @@ Status IsomorphicBatchWrite::_execute_write(AsyncAppendDataContext* async_ctx) { SCOPED_RAW_TIMER(&wait_pipe_cost_ns); std::unique_lock lock(_mutex); if (_alive_stream_load_pipe_ctxs.empty()) { - _cv.wait_for(lock, config::batch_write_rpc_request_retry_interval_ms * 1000); + _cv.wait_for(lock, config::merge_commit_rpc_request_retry_interval_ms * 1000); } } } @@ -404,7 +401,7 @@ Status IsomorphicBatchWrite::_send_rpc_request(StreamLoadContext* data_ctx) { st = ThriftRpcHelper::rpc( master_addr.hostname, master_addr.port, [&request, &response](FrontendServiceConnection& client) { client->requestMergeCommit(response, request); }, - config::batch_write_rpc_reqeust_timeout_ms); + config::merge_commit_rpc_reqeust_timeout_ms); TRACE_BATCH_WRITE << "receive requestBatchWrite response, " << _batch_write_id << ", user label: " << data_ctx->label << ", master: " << master_addr << ", cost: " << ((MonotonicNanos() - start_ts) / 1000) << "us, status: " << st @@ -418,78 +415,42 @@ Status IsomorphicBatchWrite::_send_rpc_request(StreamLoadContext* data_ctx) { return st.ok() ? Status(response.status) : st; } -bool is_final_load_status(const TTransactionStatus::type& status) { - switch (status) { - case TTransactionStatus::VISIBLE: - case TTransactionStatus::ABORTED: - case TTransactionStatus::UNKNOWN: - return true; - default: - return false; +Status IsomorphicBatchWrite::_wait_for_load_finish(StreamLoadContext* data_ctx) { + int64_t total_timeout_ms = + data_ctx->timeout_second > 0 ? data_ctx->timeout_second * 1000 : config::merge_commit_default_timeout_ms; + int64_t left_timeout_ms = + std::max((int64_t)0, total_timeout_ms - (MonotonicNanos() - data_ctx->start_nanos) / 1000000); + StatusOr subscriber_status = _txn_state_cache->subscribe_state( + data_ctx->txn_id, data_ctx->label, data_ctx->db, data_ctx->table, data_ctx->auth); + if (!subscriber_status.ok()) { + return Status::InternalError("Failed to create txn state subscriber, " + + subscriber_status.status().to_string()); } -} - -// TODO just poll the load status periodically. improve it later, such as cache the label, and FE notify the BE -Status IsomorphicBatchWrite::_wait_for_load_status(StreamLoadContext* data_ctx, int64_t timeout_ns) { + TxnStateSubscriberPtr subscriber = std::move(subscriber_status.value()); int64_t start_ts = MonotonicNanos(); - int64_t wait_load_finish_ns = std::max((int64_t)0, data_ctx->mc_left_merge_time_nanos) + 1000000; - bthread_usleep(std::min(wait_load_finish_ns, timeout_ns) / 1000); - TGetLoadTxnStatusRequest request; - request.__set_db(_batch_write_id.db); - request.__set_tbl(_batch_write_id.table); - request.__set_txnId(data_ctx->txn_id); - set_request_auth(&request, data_ctx->auth); - TGetLoadTxnStatusResult response; - Status st; - do { - if (_stopped.load(std::memory_order_acquire)) { - return Status::ServiceUnavailable("Batch write is stopped"); - } -#ifndef BE_TEST - int64_t rpc_ts = MonotonicNanos(); - TNetworkAddress master_addr = get_master_address(); - st = ThriftRpcHelper::rpc( - master_addr.hostname, master_addr.port, - [&request, &response](FrontendServiceConnection& client) { - client->getLoadTxnStatus(response, request); - }, - config::batch_write_rpc_reqeust_timeout_ms); - TRACE_BATCH_WRITE << "receive getLoadTxnStatus response, " << _batch_write_id - << ", user label: " << data_ctx->label << ", txn_id: " << data_ctx->txn_id - << ", label: " << data_ctx->batch_write_label << ", master: " << master_addr - << ", cost: " << ((MonotonicNanos() - rpc_ts) / 1000) << "us, status: " << st - << ", response: " << response; -#else - TEST_SYNC_POINT_CALLBACK("IsomorphicBatchWrite::_wait_for_load_status::request", &request); - TEST_SYNC_POINT_CALLBACK("IsomorphicBatchWrite::_wait_for_load_status::status", &st); - TEST_SYNC_POINT_CALLBACK("IsomorphicBatchWrite::_wait_for_load_status::response", &response); -#endif - if (st.ok() && is_final_load_status(response.status)) { - break; - } - int64_t left_timeout_ns = timeout_ns - (MonotonicNanos() - start_ts); - if (left_timeout_ns <= 0) { - break; - } - bthread_usleep( - std::min(config::batch_write_poll_load_status_interval_ms * (int64_t)1000, left_timeout_ns / 1000)); - } while (true); + StatusOr status_or = subscriber->wait_finished_state(left_timeout_ms * 1000); data_ctx->mc_wait_finish_cost_nanos = MonotonicNanos() - start_ts; - if (!st.ok()) { - return Status::InternalError("Failed to get load status, " + st.to_string()); + TRACE_BATCH_WRITE << "finish to wait load, " << _batch_write_id << ", user label: " << data_ctx->label + << ", txn_id: " << data_ctx->txn_id << ", load label: " << data_ctx->batch_write_label + << ", cost: " << (data_ctx->mc_wait_finish_cost_nanos / 1000) + << "us, wait status: " << status_or.status() << ", " + << (status_or.ok() ? status_or.value() : subscriber->current_state()); + if (!status_or.ok()) { + TxnState current_state = subscriber->current_state(); + return Status::InternalError(fmt::format("Failed to get load final status, current status: {}, error: {}", + to_string(current_state.txn_status), status_or.status().to_string())); } - switch (response.status) { - case TTransactionStatus::PREPARE: - case TTransactionStatus::PREPARED: - return Status::TimedOut("load timeout, txn status: " + to_string(response.status)); + switch (status_or.value().txn_status) { case TTransactionStatus::COMMITTED: return Status::PublishTimeout("Load has not been published before timeout"); case TTransactionStatus::VISIBLE: return Status::OK(); case TTransactionStatus::ABORTED: - return Status::InternalError("Load is aborted, reason: " + response.reason); + return Status::InternalError("Load is aborted, reason: " + status_or.value().reason); + case TTransactionStatus::UNKNOWN: + return Status::InternalError("Can't find the transaction, reason: " + status_or.value().reason); default: - return Status::InternalError("Load status is unknown: " + to_string(response.status)); + return Status::InternalError("Load status is not final: " + to_string(status_or.value().txn_status)); } } diff --git a/be/src/runtime/batch_write/isomorphic_batch_write.h b/be/src/runtime/batch_write/isomorphic_batch_write.h index c74b9f333ff7bb..c93bc176f0bbdb 100644 --- a/be/src/runtime/batch_write/isomorphic_batch_write.h +++ b/be/src/runtime/batch_write/isomorphic_batch_write.h @@ -25,6 +25,7 @@ #include "common/statusor.h" #include "runtime/batch_write/batch_write_util.h" +#include "runtime/batch_write/txn_state_cache.h" #include "util/countdown_latch.h" namespace starrocks { @@ -44,7 +45,8 @@ struct Task { class IsomorphicBatchWrite { public: - explicit IsomorphicBatchWrite(BatchWriteId batch_write_id, bthreads::ThreadPoolExecutor* executor); + explicit IsomorphicBatchWrite(BatchWriteId batch_write_id, bthreads::ThreadPoolExecutor* executor, + TxnStateCache* txn_state_cache); Status init(); @@ -65,10 +67,11 @@ class IsomorphicBatchWrite { Status _execute_write(AsyncAppendDataContext* async_ctx); Status _write_data_to_pipe(AsyncAppendDataContext* data_ctx); Status _send_rpc_request(StreamLoadContext* data_ctx); - Status _wait_for_load_status(StreamLoadContext* data_ctx, int64_t timeout_ns); + Status _wait_for_load_finish(StreamLoadContext* data_ctx); BatchWriteId _batch_write_id; bthreads::ThreadPoolExecutor* _executor; + TxnStateCache* _txn_state_cache; bool _batch_write_async{false}; bthread::Mutex _mutex; diff --git a/be/src/runtime/batch_write/txn_state_cache.cpp b/be/src/runtime/batch_write/txn_state_cache.cpp new file mode 100644 index 00000000000000..aa7a4fc5478a69 --- /dev/null +++ b/be/src/runtime/batch_write/txn_state_cache.cpp @@ -0,0 +1,495 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "runtime/batch_write/txn_state_cache.h" + +#include + +#include "agent/master_info.h" +#include "gen_cpp/FrontendService.h" +#include "runtime/batch_write/batch_write_util.h" +#include "runtime/client_cache.h" +#include "util/thread.h" +#include "util/thrift_rpc_helper.h" + +namespace starrocks { + +TxnStateHandler::~TxnStateHandler() { + TEST_SYNC_POINT_CALLBACK("TxnStateHandler::destruct", this); + TRACE_BATCH_WRITE << "evict txn state, " << debug_string(); +} + +void TxnStateHandler::push_state(TTransactionStatus::type new_status, const std::string& reason) { + std::unique_lock lock(_mutex); + if (_stopped) { + return; + } + _transition_txn_state(new_status, reason, true); + if (_is_finished_txn_state()) { + _cv.notify_all(); + } +} + +bool TxnStateHandler::poll_state(const StatusOr& result) { + std::unique_lock lock(_mutex); + if (_stopped) { + return false; + } + if (!result.status().ok()) { + _num_poll_failure += 1; + TRACE_BATCH_WRITE << "handler poll failure, txn_id: " << _txn_id << ", num_poll_failure: " << _num_poll_failure + << ", status: " << result.status(); + // fast fail if there is failure between FE and BE + if (_num_poll_failure >= config::merge_commit_txn_state_poll_max_fail_times) { + _transition_txn_state(TTransactionStatus::UNKNOWN, + fmt::format("poll txn state failure exceeds max times {}, last error: {}", + _num_poll_failure, result.status().to_string(false)), + false); + } + } else { + TRACE_BATCH_WRITE << "handler poll success, txn_id: " << _txn_id << ", " << result.value(); + _num_poll_failure = 0; + _transition_txn_state(result.value().txn_status, result.value().reason, false); + } + // stop polling if reach the finished state or there is no subscriber + if (_is_finished_txn_state()) { + _cv.notify_all(); + return false; + } else { + return _num_subscriber > 0; + } +} + +void TxnStateHandler::subscribe(bool& trigger_poll) { + std::unique_lock lock(_mutex); + _num_subscriber++; + // trigger polling if this is the first subscriber and not in finished state + trigger_poll = _num_subscriber == 1 && !_is_finished_txn_state(); +} + +void TxnStateHandler::unsubscribe() { + std::unique_lock lock(_mutex); + _num_subscriber--; +} + +StatusOr TxnStateHandler::wait_finished_state(const std::string& subscriber_name, int64_t timeout_us) { + std::unique_lock lock(_mutex); + if (_is_finished_txn_state()) { + return _txn_state; + } + if (_stopped) { + return Status::ServiceUnavailable("Transaction state handler is stopped"); + } + _num_waiting_finished_state++; + DeferOp defer([&] { _num_waiting_finished_state--; }); + + int64_t left_timeout_us = timeout_us; + while (left_timeout_us > 0) { + TRACE_BATCH_WRITE << "start to wait state, subscriber name: " << subscriber_name << ", txn_id: " << _txn_id + << ", timeout_us: " << left_timeout_us; + auto start_us = MonotonicMicros(); + int ret = _cv.wait_for(lock, left_timeout_us); + int64_t elapsed_us = MonotonicMicros() - start_us; + TRACE_BATCH_WRITE << "finish to wait state, subscriber name: " << subscriber_name << ", txn_id: " << _txn_id + << ", elapsed: " << elapsed_us << " us, txn_status: " << to_string(_txn_state.txn_status) + << ", reason: " << _txn_state.reason << ", stopped: " << _stopped; + if (_is_finished_txn_state()) { + return _txn_state; + } else if (_stopped) { + return Status::ServiceUnavailable("Transaction state handler is stopped"); + } else if (ret == ETIMEDOUT) { + break; + } + left_timeout_us = std::max((int64_t)0, left_timeout_us - elapsed_us); + } + return Status::TimedOut(fmt::format("Wait txn state timeout {} us", timeout_us)); +} + +void TxnStateHandler::stop() { + std::unique_lock lock(_mutex); + if (_stopped) { + return; + } + _stopped = true; + _cv.notify_all(); +} + +std::string TxnStateHandler::debug_string() { + std::unique_lock lock(_mutex); + return fmt::format( + "txn_id: {}, txn_status: {}, reason: {}, num_subscriber: {}, num_waiting_finished_state: {}, stopped: {}", + _txn_id.load(), to_string(_txn_state.txn_status), _txn_state.reason, _num_subscriber, + _num_waiting_finished_state, _stopped); +} + +void TxnStateHandler::_transition_txn_state(TTransactionStatus::type new_status, const std::string& reason, + bool from_fe) { + TTransactionStatus::type old_status = _txn_state.txn_status; + TRACE_BATCH_WRITE << "receive new txn state, txn_id: " << _txn_id + << ", current status: " << to_string(_txn_state.txn_status) + << ", current reason: " << _txn_state.reason << ", new status: " << new_status + << ", new reason: " << reason << ", from_fe: " << from_fe; + // special case for COMMITTED status. If it's notified by FE, it means the load finished with + // publish timeout, _is_finished_txn_state() should return true, and notify subscribers. + if (new_status == TTransactionStatus::COMMITTED && from_fe) { + _committed_status_from_fe = from_fe; + } + if (old_status == TTransactionStatus::VISIBLE || old_status == TTransactionStatus::ABORTED || + old_status == TTransactionStatus::UNKNOWN) { + return; + } else if (old_status == TTransactionStatus::PREPARED && new_status == TTransactionStatus::PREPARE) { + return; + } else if (old_status == TTransactionStatus::COMMITTED && + (new_status != TTransactionStatus::VISIBLE && new_status != TTransactionStatus::UNKNOWN)) { + return; + } + _txn_state.txn_status = new_status; + _txn_state.reason = reason; +} + +bool TxnStateHandler::_is_finished_txn_state() { + // The load can be successful or failed. When successful, the transaction status is VISIBLE. + // When failed, the transaction status can be COMMITTED, ABORTED, or UNKNOWN. COMMITTED is a + // special status + switch (_txn_state.txn_status) { + case TTransactionStatus::VISIBLE: + case TTransactionStatus::ABORTED: + case TTransactionStatus::UNKNOWN: + return true; + case TTransactionStatus::COMMITTED: + return _committed_status_from_fe; + default: + return false; + } +} + +StatusOr TxnStateSubscriber::wait_finished_state(int64_t timeout_us) { + return _entry->value().wait_finished_state(_name, timeout_us); +} + +TxnState TxnStateSubscriber::current_state() { + return _entry->value().txn_state(); +} + +inline int64_t get_current_ms() { + int64_t current_ts = MonotonicMillis(); + TEST_SYNC_POINT_CALLBACK("TxnStatePoller::get_current_ms", ¤t_ts); + return current_ts; +} + +Status TxnStatePoller::init() { + _schedule_thread = std::make_unique([this] { _schedule_func(); }); + Thread::set_thread_name(*_schedule_thread.get(), "txn_state_sche"); + return Status::OK(); +} + +void TxnStatePoller::submit(const TxnStatePollTask& task, int64_t delay_ms) { + std::unique_lock lock(_mutex); + if (_stopped) { + return; + } + if (_pending_txn_ids.find(task.txn_id) != _pending_txn_ids.end()) { + return; + } + int64_t execute_time = get_current_ms() + delay_ms; + _pending_txn_ids.emplace(task.txn_id); + _pending_tasks.emplace(std::make_pair(execute_time, task)); + _cv.notify_all(); + TRACE_BATCH_WRITE << "submit poll task, txn_id: " << task.txn_id << ", db: " << task.db << ", tbl: " << task.tbl + << ", delay_ms: " << delay_ms; +} + +void TxnStatePoller::stop() { + { + std::unique_lock lock(_mutex); + if (_stopped) { + return; + } + _stopped = true; + _cv.notify_all(); + } + if (_schedule_thread && _schedule_thread->joinable()) { + _schedule_thread->join(); + } +} + +void TxnStatePoller::_schedule_func() { + std::vector poll_tasks; + std::unique_lock lock(_mutex); + _is_scheduling = true; + while (!_stopped) { + int64_t current_ts = get_current_ms(); + auto it = _pending_tasks.begin(); + while (it != _pending_tasks.end()) { + if (it->first <= current_ts) { + _pending_txn_ids.erase(it->second.txn_id); + poll_tasks.emplace_back(it->second); + it = _pending_tasks.erase(it); + } else { + break; + } + } + if (!poll_tasks.empty()) { + lock.unlock(); + _schedule_poll_tasks(poll_tasks); + poll_tasks.clear(); + lock.lock(); + } + if (_stopped) { + break; + } + if (_pending_tasks.empty()) { + _cv.wait(lock); + } else { + // at least wait 50 ms to avoid busy loop + int64_t wait_time_ms = std::max((int64_t)50, _pending_tasks.begin()->first - get_current_ms()); + _cv.wait_for(lock, wait_time_ms * 1000); + } + } + _is_scheduling = false; +} + +void TxnStatePoller::_schedule_poll_tasks(const std::vector& poll_tasks) { + for (const auto& task : poll_tasks) { + // check current state of the txn, if it's not in cache or is final, skip poll + auto current_state = _txn_state_cache->get_state(task.txn_id); + if (!current_state.ok()) { + TRACE_BATCH_WRITE << "skip poll task because fail to get txn state, txn_id: " << task.txn_id + << ", db: " << task.db << ", tbl: " << task.tbl << ", error: " << current_state.status(); + continue; + } + TTransactionStatus::type txn_status = current_state.value().txn_status; + if (txn_status == TTransactionStatus::VISIBLE || txn_status == TTransactionStatus::ABORTED || + txn_status == TTransactionStatus::UNKNOWN) { + TRACE_BATCH_WRITE << "skip poll task because txn state is final, txn_id: " << task.txn_id + << ", db: " << task.db << ", tbl: " << task.tbl << ", state status: " << txn_status + << ", reason: " << current_state.value().reason; + continue; + } + Status status = _poll_token->submit_func([this, task] { _execute_poll(task); }, ThreadPool::HIGH_PRIORITY); + if (!status.ok()) { + _txn_state_cache->_notify_poll_result( + task, Status::InternalError("failed to submit poll txn state task, error: " + status.to_string())); + } else { + TRACE_BATCH_WRITE << "schedule poll task, txn_id: " << task.txn_id << ", db: " << task.db + << ", tbl: " << task.tbl; + } + } +} + +void TxnStatePoller::_execute_poll(const TxnStatePollTask& task) { + int64_t start_ts = MonotonicMicros(); + TGetLoadTxnStatusRequest request; + request.__set_db(task.db); + request.__set_tbl(task.tbl); + request.__set_txnId(task.txn_id); + set_request_auth(&request, task.auth); + TGetLoadTxnStatusResult response; + Status status; +#ifndef BE_TEST + TNetworkAddress master_addr = get_master_address(); + status = ThriftRpcHelper::rpc( + master_addr.hostname, master_addr.port, + [&request, &response](FrontendServiceConnection& client) { client->getLoadTxnStatus(response, request); }); +#else + TEST_SYNC_POINT_CALLBACK("TxnStatePoller::_execute_poll::request", &request); + TEST_SYNC_POINT_CALLBACK("TxnStatePoller::_execute_poll::status", &status); + TEST_SYNC_POINT_CALLBACK("TxnStatePoller::_execute_poll::response", &response); +#endif + TRACE_BATCH_WRITE << "execute poll task, txn_id: " << task.txn_id << ", db: " << task.db << ", tbl: " << task.tbl + << ", cost: " << (MonotonicMicros() - start_ts) << " us, rpc status: " << status + << ", response: " << response; + if (status.ok()) { + _txn_state_cache->_notify_poll_result(task, TxnState{response.status, response.reason}); + } else { + _txn_state_cache->_notify_poll_result( + task, Status::InternalError("poll txn state failed, error: " + status.to_string())); + } +} + +bool TxnStatePoller::TEST_is_txn_pending(int64_t txn_id) { + std::unique_lock lock(_mutex); + return _pending_txn_ids.find(txn_id) != _pending_txn_ids.end(); +} + +StatusOr TxnStatePoller::TEST_pending_execution_time(int64_t txn_id) { + std::unique_lock lock(_mutex); + auto it = _pending_tasks.begin(); + while (it != _pending_tasks.end()) { + if (it->second.txn_id == txn_id) { + return it->first; + } + ++it; + } + return Status::NotFound("no task found"); +} + +bool TxnStatePoller::TEST_is_scheduling() { + std::unique_lock lock(_mutex); + return _is_scheduling; +} + +TxnStateCache::TxnStateCache(size_t capacity, std::unique_ptr poller_token) + : _capacity(capacity), _poll_state_token(std::move(poller_token)) { + size_t capacity_per_shard = (_capacity + (kNumShards - 1)) / kNumShards; + for (int32_t i = 0; i < kNumShards; i++) { + _shards[i] = std::make_unique(capacity_per_shard); + } +} + +Status TxnStateCache::init() { + _txn_state_poller = std::make_unique(this, _poll_state_token.get()); + RETURN_IF_ERROR(_txn_state_poller->init()); + _txn_state_clean_thread = std::make_unique([this] { _txn_state_clean_func(); }); + Thread::set_thread_name(*_txn_state_clean_thread.get(), "txn_state_clean"); + return Status::OK(); +} + +Status TxnStateCache::push_state(int64_t txn_id, TTransactionStatus::type status, const std::string& reason) { + auto cache = _get_txn_cache(txn_id); + ASSIGN_OR_RETURN(auto entry, _get_txn_entry(cache, txn_id, true)); + DCHECK(entry != nullptr); + entry->value().push_state(status, reason); + cache->release(entry); + return Status::OK(); +} + +StatusOr TxnStateCache::get_state(int64_t txn_id) { + auto cache = _get_txn_cache(txn_id); + ASSIGN_OR_RETURN(auto entry, _get_txn_entry(cache, txn_id, false)); + if (entry == nullptr) { + return Status::NotFound("Transaction state not found"); + } + auto txn_state = entry->value().txn_state(); + cache->release(entry); + return txn_state; +} + +StatusOr TxnStateCache::subscribe_state(int64_t txn_id, const std::string& subscriber_name, + const std::string& db, const std::string& tbl, + const AuthInfo& auth) { + auto cache = _get_txn_cache(txn_id); + ASSIGN_OR_RETURN(auto entry, _get_txn_entry(cache, txn_id, true)); + DCHECK(entry != nullptr); + bool trigger_poll = false; + entry->value().subscribe(trigger_poll); + TRACE_BATCH_WRITE << "create subscriber, txn_id: " << txn_id << ", name: " << subscriber_name << ", db: " << db + << ", tbl: " << tbl << ", trigger_poll: " << trigger_poll; + if (trigger_poll) { + _txn_state_poller->submit({txn_id, db, tbl, auth}, config::merge_commit_txn_state_poll_interval_ms); + } + return std::make_unique(cache, entry, subscriber_name); +} + +void TxnStateCache::set_capacity(size_t new_capacity) { + std::unique_lock lock; + if (_stopped) { + return; + } + const size_t capacity_per_shard = (new_capacity + (kNumShards - 1)) / kNumShards; + for (auto& _shard : _shards) { + _shard->set_capacity(capacity_per_shard); + } + _capacity = new_capacity; +} + +void TxnStateCache::stop() { + { + std::unique_lock lock; + if (_stopped) { + return; + } + _stopped = true; + } + for (auto& cache : _shards) { + auto entries = cache->get_all_entries(); + for (auto entry : entries) { + entry->value().stop(); + cache->release(entry); + } + } + if (_txn_state_poller) { + _txn_state_poller->stop(); + } + _poll_state_token->shutdown(); + _txn_state_clean_stop_latch.count_down(); + if (_txn_state_clean_thread && _txn_state_clean_thread->joinable()) { + _txn_state_clean_thread->join(); + } +} + +int32_t TxnStateCache::size() { + int32_t size = 0; + for (auto& cache : _shards) { + size += cache->size(); + } + return size; +} + +StatusOr TxnStateCache::_get_txn_entry(TxnStateDynamicCache* cache, int64_t txn_id, + bool create_if_not_exist) { + // use lock to avoid creating new entry after stopped + std::shared_lock lock; + if (_stopped) { + return Status::ServiceUnavailable("Transaction state cache is stopped"); + } + TxnStateDynamicCacheEntry* entry = nullptr; + if (create_if_not_exist) { + entry = cache->get_or_create(txn_id, 1); + DCHECK(entry != nullptr); + // initialize txn_id + entry->value().set_txn_id(txn_id); + } else { + entry = cache->get(txn_id); + } + if (entry) { + // expire time does not need very accurate and monotonic, so do not protect it from concurrent update + entry->update_expire_time(get_current_ms() + config::merge_commit_txn_state_expire_time_sec * 1000); + } + return entry; +} + +void TxnStateCache::_notify_poll_result(const TxnStatePollTask& task, const StatusOr& result) { + auto cache = _get_txn_cache(task.txn_id); + auto entry_st = _get_txn_entry(cache, task.txn_id, false); + if (!entry_st.ok() || entry_st.value() == nullptr) { + TRACE_BATCH_WRITE << "skip notify poll result, txn_id: " << task.txn_id << ", db: " << task.db + << ", tbl: " << task.tbl << ", entry status: " + << (entry_st.ok() ? Status::NotFound("not in cache") : entry_st.status()); + return; + } + auto entry = entry_st.value(); + DeferOp defer([&] { cache->release(entry); }); + bool continue_poll = entry->value().poll_state(result); + TRACE_BATCH_WRITE << "notify poll result, txn_id: " << task.txn_id << ", db: " << task.db << ", tbl: " << task.tbl + << ", continue_poll: " << continue_poll; + if (continue_poll) { + _txn_state_poller->submit(task, config::merge_commit_txn_state_poll_interval_ms); + } +} + +void TxnStateCache::_txn_state_clean_func() { + while (!_stopped) { + int32_t clean_interval_sec = config::merge_commit_txn_state_clean_interval_sec; + _txn_state_clean_stop_latch.wait_for(std::chrono::seconds(clean_interval_sec)); + if (_stopped) { + break; + } + for (auto& cache : _shards) { + cache->clear_expired(); + } + } +} + +} // namespace starrocks \ No newline at end of file diff --git a/be/src/runtime/batch_write/txn_state_cache.h b/be/src/runtime/batch_write/txn_state_cache.h new file mode 100644 index 00000000000000..079d538fc7a67d --- /dev/null +++ b/be/src/runtime/batch_write/txn_state_cache.h @@ -0,0 +1,278 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +#include +#include +#include +#include + +#include "common/utils.h" +#include "testutil/sync_point.h" +#include "util/bthreads/bthread_shared_mutex.h" +#include "util/countdown_latch.h" +#include "util/dynamic_cache.h" +#include "util/threadpool.h" +#include "util/thrift_rpc_helper.h" + +namespace starrocks { + +class ThreadPoolToken; +class TxnStateHandler; +class TxnStateSubscriber; +class TxnStateCache; +using TxnStateDynamicCache = DynamicCache; +using TxnStateDynamicCachePtr = std::unique_ptr; +using TxnStateDynamicCacheEntry = TxnStateDynamicCache::Entry; + +struct TxnState { + TTransactionStatus::type txn_status{TTransactionStatus::PREPARE}; + std::string reason; +}; + +inline std::ostream& operator<<(std::ostream& os, const TxnState& txn_state) { + os << "txn_status: " << to_string(txn_state.txn_status) << ", reason: " << txn_state.reason; + return os; +} + +// Handle the transaction state. It's the value of the DynamicCache entry +// 1. transmit the txn state according to the new state pushed by FE or polled from FE +// 2. notify txn state subscribers whether it reaches the finished state +// 3. control the behaviour of txn state poll. The poll task starts to schedule when the +// first subscriber comes(see subscribe()), and continue to schedule (see poll_state()) +// until the txn state reaches the finished state, and there is no subscriber +class TxnStateHandler { +public: + ~TxnStateHandler(); + + // update the txn state pushed by FE + void push_state(TTransactionStatus::type new_status, const std::string& reason); + // update the txn state polled by the cache. The returned value tell the caller to + // continue or stop polling according the current result and the current txn state + bool poll_state(const StatusOr& result); + + // Add a subscriber for the finished state. Handler will set 'trigger_poll' + // to tell whether the caller should submit a txn state poll task + void subscribe(bool& trigger_poll); + // Remove a subscriber which has called subscribe() before + void unsubscribe(); + // A subscriber calls this function to wait for the txn state to reach + // a finished state or error happens. + StatusOr wait_finished_state(const std::string& subscriber_name, int64_t timeout_us); + int32_t num_waiting_finished_state(); + + void set_txn_id(int64_t txn_id) { _txn_id.store(txn_id); } + int64_t txn_id() { return _txn_id.load(); } + TxnState txn_state(); + bool committed_status_from_fe(); + int32_t num_poll_failure(); + std::string debug_string(); + + void stop(); + + // For testing + int32_t TEST_num_subscriber() { return _num_subscriber; } + +private: + void _transition_txn_state(TTransactionStatus::type new_status, const std::string& reason, bool from_fe); + // Whether the current status indicate the load is finished + bool _is_finished_txn_state(); + + // lazy initialized + std::atomic _txn_id{-1}; + bthread::Mutex _mutex; + bthread::ConditionVariable _cv; + TxnState _txn_state; + // whether COMMITTED status is notified by FE. Only valid if txn status is COMMITTED. + // If true, means publish timeout happens, and should notify subscribers. If false, + // means its from poll should continue to wait. + bool _committed_status_from_fe{false}; + int32_t _num_subscriber{0}; + int32_t _num_waiting_finished_state{0}; + int32_t _num_poll_failure{0}; + bool _stopped{false}; +}; + +inline int32_t TxnStateHandler::num_waiting_finished_state() { + std::unique_lock lock(_mutex); + return _num_waiting_finished_state; +} + +inline TxnState TxnStateHandler::txn_state() { + std::unique_lock lock(_mutex); + return _txn_state; +} + +inline bool TxnStateHandler::committed_status_from_fe() { + std::unique_lock lock(_mutex); + return _committed_status_from_fe; +} + +inline int32_t TxnStateHandler::num_poll_failure() { + std::unique_lock lock(_mutex); + return _num_poll_failure; +} + +inline std::ostream& operator<<(std::ostream& os, TxnStateHandler& holder) { + os << holder.debug_string(); + return os; +} + +// A subscriber which will wait for the finished txn state. It holds a reference +// to the entry of DynamicCache so that the cache will not evict the txn state. +// The subscriber can call wait_finished_state() to wait for the finished state. +class TxnStateSubscriber { +public: + TxnStateSubscriber(TxnStateDynamicCache* cache, TxnStateDynamicCacheEntry* entry, std::string name) + : _cache(cache), _entry(entry), _name(std::move(name)){}; + + ~TxnStateSubscriber() { + _entry->value().unsubscribe(); + _cache->release(_entry); + } + + const std::string& name() const { return _name; } + StatusOr wait_finished_state(int64_t timeout_us); + TxnState current_state(); + TxnStateDynamicCacheEntry* entry() { return _entry; } + +private: + TxnStateDynamicCache* _cache; + TxnStateDynamicCacheEntry* _entry; + std::string _name; +}; +using TxnStateSubscriberPtr = std::unique_ptr; + +struct TxnStatePollTask { + int64_t txn_id; + std::string db; + std::string tbl; + AuthInfo auth; +}; + +// Schedule and execute txn state poll tasks. The poller uses a single thread +// to schedule tasks according to their execution time, and submit them to the +// thread pool to run which will send rpc to FE to get txn state. +class TxnStatePoller { +public: + TxnStatePoller(TxnStateCache* txn_state_cache, ThreadPoolToken* poll_token) + : _txn_state_cache(txn_state_cache), _poll_token(poll_token) {} + Status init(); + // submit a task which should be executed after the delay time + void submit(const TxnStatePollTask& task, int64_t delay_ms); + void stop(); + + // For testing + bool TEST_is_txn_pending(int64_t txn_id); + StatusOr TEST_pending_execution_time(int64_t txn_id); + bool TEST_is_scheduling(); + +private: + void _schedule_func(); + void _schedule_poll_tasks(const std::vector& poll_tasks); + void _execute_poll(const TxnStatePollTask& task); + + TxnStateCache* _txn_state_cache; + ThreadPoolToken* _poll_token; + std::unique_ptr _schedule_thread; + bthread::Mutex _mutex; + bthread::ConditionVariable _cv; + // txn ids to schedule, used to duplicate tasks for the same txn + std::unordered_set _pending_txn_ids; + // sorted execution time (milliseconds) -> task + std::multimap _pending_tasks; + bool _is_scheduling{false}; + bool _stopped{false}; +}; + +// A cache for txn states. It can receive txn state in two ways: pushed by FE and polled from FE by itself. +// When the load finishes, FE will try to push the txn state to BE which is more efficient and realtime, +// but it does not always work because the push may fail for some reason, such as FE leader switch or crash. +// So BE will poll the txn state from FE periodically in a low frequency to detect those bad cases rather +// than just waiting until timeout. Apart from maintaining the txn state, the cache also provides a subscribe +// mechanism to notify the subscriber when the txn state reaches the finished state. +// The poll state task starts to schedule when the first subscriber comes, and continue to schedule when the +// last poll finishes. The schedule will end when the txn reaches the finished state or there is no subscriber. +class TxnStateCache { +public: + TxnStateCache(size_t capacity, std::unique_ptr poller_token); + Status init(); + + // update the txn state which is pushed by FE. It will create an entry + // in the DynamicCache it the txn does not in the cache before. + Status push_state(int64_t txn_id, TTransactionStatus::type status, const std::string& reason); + + // get the current state of txn_id. A TxnState will return if the txn is in the cache. + // Status::NotFound will return if the txn is not in the cache. Other status will return + // if error happens. + StatusOr get_state(int64_t txn_id); + + // create a TxnStateSubscriber to subscribe the finished txn state. It will create an entry + // in the DynamicCache it the txn does not in the cache before. The subscriber will hold a + // reference to the entry, so the entry will not be evicted if any subscriber is using it. + // The db/tbl/auth may be used to poll txn state. + StatusOr subscribe_state(int64_t txn_id, const std::string& subscriber_name, + const std::string& db, const std::string& tbl, + const AuthInfo& auth); + + void set_capacity(size_t new_capacity); + int32_t size(); + + void stop(); + + // For testing + std::vector get_cache_shards() { + std::vector ret; + for (auto& shard : _shards) { + ret.push_back(shard.get()); + } + return ret; + } + TxnStatePoller* txn_state_poller() { return _txn_state_poller.get(); } + +private: + static const int kNumShardBits = 5; + static const int kNumShards = 1 << kNumShardBits; + + friend class TxnStatePoller; + + TxnStateDynamicCache* _get_txn_cache(int64_t txn_id); + // if create_if_not_exist is true, must return non nullptr entry if status is ok. + // if create_if_not_exist is false, return nullptr if txn is not in cache. + // Return not ok status if error happens. + StatusOr _get_txn_entry(TxnStateDynamicCache* cache, int64_t txn_id, + bool create_if_not_exist); + void _notify_poll_result(const TxnStatePollTask& task, const StatusOr& result); + void _txn_state_clean_func(); + + size_t _capacity; + std::unique_ptr _poll_state_token; + TxnStateDynamicCachePtr _shards[kNumShards]; + std::unique_ptr _txn_state_poller; + // protect the cache from being accessed after it is stopped + bthreads::BThreadSharedMutex _rw_mutex; + std::atomic _stopped{false}; + + std::unique_ptr _txn_state_clean_thread; + // used to notify the clean thread to stop + CountDownLatch _txn_state_clean_stop_latch{1}; +}; + +inline TxnStateDynamicCache* TxnStateCache::_get_txn_cache(int64_t txn_id) { + return _shards[txn_id & (kNumShards - 1)].get(); +} +} // namespace starrocks diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp index b3c1f2774560e7..7b702586f3160b 100644 --- a/be/src/runtime/exec_env.cpp +++ b/be/src/runtime/exec_env.cpp @@ -509,14 +509,15 @@ Status ExecEnv::init(const std::vector& store_paths, bool as_cn) { std::unique_ptr batch_write_thread_pool; RETURN_IF_ERROR(ThreadPoolBuilder("batch_write") - .set_min_threads(config::batch_write_thread_pool_num_min) - .set_max_threads(config::batch_write_thread_pool_num_max) - .set_max_queue_size(config::batch_write_thread_pool_queue_size) + .set_min_threads(config::merge_commit_thread_pool_num_min) + .set_max_threads(config::merge_commit_thread_pool_num_max) + .set_max_queue_size(config::merge_commit_thread_pool_queue_size) .set_idle_timeout(MonoDelta::FromMilliseconds(10000)) .build(&batch_write_thread_pool)); auto batch_write_executor = std::make_unique(batch_write_thread_pool.release(), kTakesOwnership); _batch_write_mgr = new BatchWriteMgr(std::move(batch_write_executor)); + RETURN_IF_ERROR(_batch_write_mgr->init()); _routine_load_task_executor = new RoutineLoadTaskExecutor(this); RETURN_IF_ERROR(_routine_load_task_executor->init()); @@ -740,6 +741,7 @@ void ExecEnv::destroy() { SAFE_DELETE(_result_queue_mgr); SAFE_DELETE(_result_mgr); SAFE_DELETE(_stream_mgr); + SAFE_DELETE(_batch_write_mgr); SAFE_DELETE(_external_scan_context_mgr); SAFE_DELETE(_lake_tablet_manager); SAFE_DELETE(_lake_update_manager); diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 9a5098de8356bb..e1120e72013038 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -1279,7 +1279,16 @@ void PInternalServiceImplBase::stream_load(google::protobuf::RpcController* c google::protobuf::Closure* done) { ClosureGuard closure_guard(done); auto* cntl = static_cast(cntl_base); - BatchWriteMgr::receive_stream_load_rpc(_exec_env, cntl, request, response); + _exec_env->batch_write_mgr()->receive_stream_load_rpc(_exec_env, cntl, request, response); +} + +template +void PInternalServiceImplBase::update_transaction_state(google::protobuf::RpcController* cntl_base, + const PUpdateTransactionStateRequest* request, + PUpdateTransactionStateResponse* response, + google::protobuf::Closure* done) { + ClosureGuard closure_guard(done); + _exec_env->batch_write_mgr()->update_transaction_state(request, response); } template class PInternalServiceImplBase; diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index f571dff3cabba1..1523a472a2f3f2 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -186,6 +186,10 @@ class PInternalServiceImplBase : public T { void stream_load(google::protobuf::RpcController* controller, const PStreamLoadRequest* request, PStreamLoadResponse* response, google::protobuf::Closure* done) override; + void update_transaction_state(google::protobuf::RpcController* controller, + const PUpdateTransactionStateRequest* request, + PUpdateTransactionStateResponse* response, google::protobuf::Closure* done) override; + private: void _transmit_chunk(::google::protobuf::RpcController* controller, const ::starrocks::PTransmitChunkParams* request, ::starrocks::PTransmitChunkResult* response, diff --git a/be/src/util/dynamic_cache.h b/be/src/util/dynamic_cache.h index 6b7837eef0a020..121e399d7c6fad 100644 --- a/be/src/util/dynamic_cache.h +++ b/be/src/util/dynamic_cache.h @@ -37,12 +37,12 @@ namespace starrocks { // Note: the capacity is a soft limit, it will only free unused objects // to reduce memory usage, but if currently used(pinned) objects' memory // exceeds capacity, that's allowed. -template +template class DynamicCache { public: struct Entry { public: - Entry(DynamicCache& cache, Key key) : _cache(cache), _key(std::move(key)), _ref(1) {} + Entry(DynamicCache& cache, Key key) : _cache(cache), _key(std::move(key)), _ref(1) {} const Key& key() const { return _key; } T& value() { return _value; } @@ -54,10 +54,10 @@ class DynamicCache { uint32_t get_ref() const { return _ref.load(); } protected: - friend class DynamicCache; + friend class DynamicCache; typedef typename std::list::const_iterator Handle; - DynamicCache& _cache; + DynamicCache& _cache; Handle _handle; Key _key; size_t _size = 0; @@ -72,7 +72,7 @@ class DynamicCache { DynamicCache(size_t capacity) : _size(0), _capacity(capacity) {} ~DynamicCache() { - std::lock_guard lg(_lock); + std::lock_guard lg(_lock); _object_size = 0; _size = 0; auto itr = _list.begin(); @@ -98,7 +98,7 @@ class DynamicCache { // get or return null Entry* get(const Key& key) { - std::lock_guard lg(_lock); + std::lock_guard lg(_lock); auto itr = _map.find(key); if (itr == _map.end()) { return nullptr; @@ -111,8 +111,8 @@ class DynamicCache { // atomic get_or_create operation, to prevent loading // same resource multiple times - Entry* get_or_create(const Key& key) { - std::lock_guard lg(_lock); + Entry* get_or_create(const Key& key, size_t init_size = 0) { + std::lock_guard lg(_lock); auto itr = _map.find(key); if (itr == _map.end()) { // at first all created object is with size 0 @@ -123,6 +123,7 @@ class DynamicCache { insert->_handle = ret; _map[key] = ret; (*ret)->_ref++; + (*ret)->_size = init_size; _object_size++; if (insert->_size > 0) { _size += insert->_size; @@ -140,7 +141,7 @@ class DynamicCache { // release(unuse) an object get/get_or_create'ed earlier void release(Entry* entry) { - std::lock_guard lg(_lock); + std::lock_guard lg(_lock); // CHECK _ref > 1 entry->_ref--; if (entry->_ref > 0) { @@ -159,7 +160,7 @@ class DynamicCache { // remove an object get/get_or_create'ed earlier bool remove(Entry* entry) { - std::lock_guard lg(_lock); + std::lock_guard lg(_lock); entry->_ref--; if (entry->_ref != 1) { return false; @@ -179,7 +180,7 @@ class DynamicCache { // if no one use this object, object will be removed // otherwise do not remove the object, return false bool try_remove_by_key(const Key& key) { - std::lock_guard lg(_lock); + std::lock_guard lg(_lock); auto itr = _map.find(key); if (itr == _map.end()) { return true; @@ -203,7 +204,7 @@ class DynamicCache { // remove object by key // return true if object exist and is removed bool remove_by_key(const Key& key) { - std::lock_guard lg(_lock); + std::lock_guard lg(_lock); auto itr = _map.find(key); if (itr == _map.end()) { return false; @@ -228,7 +229,7 @@ class DynamicCache { // track size changes and evict objects accordingly // return false if actual memory usage is larger than capacity bool update_object_size(Entry* entry, size_t new_size) { - std::lock_guard lg(_lock); + std::lock_guard lg(_lock); _size += new_size - entry->_size; if (_mem_tracker) _mem_tracker->consume(new_size - entry->_size); entry->_size = new_size; @@ -240,7 +241,7 @@ class DynamicCache { std::vector entry_list; { int64_t now = MonotonicMillis(); - std::lock_guard lg(_lock); + std::lock_guard lg(_lock); auto itr = _list.begin(); while (itr != _list.end()) { Entry* entry = (*itr); @@ -266,7 +267,7 @@ class DynamicCache { void clear() { std::vector entry_list; { - std::lock_guard lg(_lock); + std::lock_guard lg(_lock); auto itr = _list.begin(); while (itr != _list.end()) { Entry* entry = (*itr); @@ -296,13 +297,13 @@ class DynamicCache { // adjust capacity // return false if actual memory usage is larger than capacity bool set_capacity(size_t capacity) { - std::lock_guard lg(_lock); + std::lock_guard lg(_lock); _capacity = capacity; return _evict(); } std::vector> get_entry_sizes() const { - std::lock_guard lg(_lock); + std::lock_guard lg(_lock); std::vector> ret; ret.reserve(_map.size()); auto itr = _list.begin(); @@ -317,7 +318,7 @@ class DynamicCache { void try_evict(size_t target_capacity) { std::vector entry_list; { - std::lock_guard lg(_lock); + std::lock_guard lg(_lock); _evict(target_capacity, &entry_list); } for (Entry* entry : entry_list) { @@ -326,6 +327,22 @@ class DynamicCache { return; } + std::vector get_all_entries() { + std::vector entry_list; + { + std::lock_guard lg(_lock); + entry_list.reserve(_list.size()); + auto itr = _list.begin(); + while (itr != _list.end()) { + Entry* entry = (*itr); + entry->_ref++; + entry_list.push_back(entry); + itr++; + } + } + return entry_list; + } + bool TEST_evict(size_t target_capacity, std::vector* entry_list) { return _evict(target_capacity, entry_list); } @@ -360,7 +377,7 @@ class DynamicCache { return ret; } - mutable std::mutex _lock; + mutable Lock _lock; List _list; Map _map; size_t _object_size{0}; diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt index f626b8c072f47d..5661d44dbdd090 100644 --- a/be/test/CMakeLists.txt +++ b/be/test/CMakeLists.txt @@ -387,6 +387,7 @@ set(EXEC_FILES ./runtime/batch_write/batch_write_mgr_test.cpp ./runtime/batch_write/batch_write_util_test.cpp ./runtime/batch_write/isomorphic_batch_write_test.cpp + ./runtime/batch_write/txn_state_cache_test.cpp #./runtime/routine_load_task_executor_test.cpp ./runtime/routine_load/data_consumer_test.cpp ./runtime/small_file_mgr_test.cpp diff --git a/be/test/runtime/batch_write/batch_write_mgr_test.cpp b/be/test/runtime/batch_write/batch_write_mgr_test.cpp index 21d2b34d435b78..9917d65f05d36d 100644 --- a/be/test/runtime/batch_write/batch_write_mgr_test.cpp +++ b/be/test/runtime/batch_write/batch_write_mgr_test.cpp @@ -36,6 +36,7 @@ class BatchWriteMgrTest : public testing::Test { BatchWriteMgrTest() = default; ~BatchWriteMgrTest() override = default; void SetUp() override { + config::merge_commit_trace_log_enable = true; _exec_env = ExecEnv::GetInstance(); std::unique_ptr thread_pool; ASSERT_OK(ThreadPoolBuilder("BatchWriteMgrTest") @@ -46,6 +47,7 @@ class BatchWriteMgrTest : public testing::Test { .build(&thread_pool)); auto executor = std::make_unique(thread_pool.release(), kTakesOwnership); _batch_write_mgr = std::make_unique(std::move(executor)); + ASSERT_OK(_batch_write_mgr->init()); } void TearDown() override { @@ -373,4 +375,56 @@ TEST_F(BatchWriteMgrTest, stream_load_rpc_fail) { } } +TEST_F(BatchWriteMgrTest, update_transaction_state) { + PUpdateTransactionStateRequest request; + std::vector expected_cache_state; + + auto prepare_state = request.add_states(); + prepare_state->set_txn_id(1); + prepare_state->set_status(TransactionStatusPB::TRANS_PREPARE); + prepare_state->set_reason(""); + expected_cache_state.push_back({TTransactionStatus::PREPARE, ""}); + + auto prepared_state = request.add_states(); + prepared_state->set_txn_id(2); + prepared_state->set_status(TransactionStatusPB::TRANS_PREPARED); + prepared_state->set_reason(""); + expected_cache_state.push_back({TTransactionStatus::PREPARED, ""}); + + auto commited_state = request.add_states(); + commited_state->set_txn_id(3); + commited_state->set_status(TransactionStatusPB::TRANS_COMMITTED); + commited_state->set_reason(""); + expected_cache_state.push_back({TTransactionStatus::COMMITTED, ""}); + + auto visible_state = request.add_states(); + visible_state->set_txn_id(4); + visible_state->set_status(TransactionStatusPB::TRANS_VISIBLE); + visible_state->set_reason(""); + expected_cache_state.push_back({TTransactionStatus::VISIBLE, ""}); + + auto aborted_state = request.add_states(); + aborted_state->set_txn_id(5); + aborted_state->set_status(TransactionStatusPB::TRANS_ABORTED); + aborted_state->set_reason("artificial failure"); + expected_cache_state.push_back({TTransactionStatus::ABORTED, "artificial failure"}); + + auto unknown_state = request.add_states(); + unknown_state->set_txn_id(6); + unknown_state->set_status(TransactionStatusPB::TRANS_UNKNOWN); + unknown_state->set_reason(""); + expected_cache_state.push_back({TTransactionStatus::UNKNOWN, ""}); + + PUpdateTransactionStateResponse response; + _batch_write_mgr->update_transaction_state(&request, &response); + ASSERT_EQ(request.states_size(), response.results_size()); + for (int i = 1; i <= expected_cache_state.size(); ++i) { + ASSERT_EQ(TStatusCode::OK, response.results(i - 1).status_code()); + auto actual_state = _batch_write_mgr->txn_state_cache()->get_state(i); + ASSERT_OK(actual_state.status()); + ASSERT_EQ(expected_cache_state[i - 1].txn_status, actual_state.value().txn_status); + ASSERT_EQ(expected_cache_state[i - 1].reason, actual_state.value().reason); + } +} + } // namespace starrocks \ No newline at end of file diff --git a/be/test/runtime/batch_write/isomorphic_batch_write_test.cpp b/be/test/runtime/batch_write/isomorphic_batch_write_test.cpp index 92e24a21b9ae04..d7dffa44682e8f 100644 --- a/be/test/runtime/batch_write/isomorphic_batch_write_test.cpp +++ b/be/test/runtime/batch_write/isomorphic_batch_write_test.cpp @@ -33,6 +33,7 @@ class IsomorphicBatchWriteTest : public testing::Test { IsomorphicBatchWriteTest() = default; ~IsomorphicBatchWriteTest() override = default; void SetUp() override { + config::merge_commit_trace_log_enable = true; _exec_env = ExecEnv::GetInstance(); std::unique_ptr thread_pool; ASSERT_OK(ThreadPoolBuilder("IsomorphicBatchWriteTest") @@ -42,12 +43,22 @@ class IsomorphicBatchWriteTest : public testing::Test { .set_idle_timeout(MonoDelta::FromMilliseconds(10000)) .build(&thread_pool)); _executor = std::make_unique(thread_pool.release(), kTakesOwnership); + std::unique_ptr token = + _executor->get_thread_pool()->new_token(ThreadPool::ExecutionMode::CONCURRENT); + _txn_state_cache = std::make_unique(2048, std::move(token)); + ASSERT_OK(_txn_state_cache->init()); } void TearDown() override { for (auto* ctx : _to_release_contexts) { StreamLoadContext::release(ctx); } + if (_txn_state_cache) { + _txn_state_cache->stop(); + } + if (_executor) { + _executor->get_thread_pool()->shutdown(); + } } StreamLoadContext* build_pipe_context(const std::string& label, int64_t txn_id, const BatchWriteId& batch_write_id, @@ -84,12 +95,13 @@ class IsomorphicBatchWriteTest : public testing::Test { return ctx; } - void test_append_data_sync_base(const Status& rpc_status, const TGetLoadTxnStatusResult& expect_result, + void test_append_data_sync_base(int64_t txn_id, std::string label, const TxnState& txn_state, const Status& expect_st); protected: ExecEnv* _exec_env; std::unique_ptr _executor; + std::unique_ptr _txn_state_cache; std::unordered_set _to_release_contexts; }; @@ -102,7 +114,8 @@ void verify_data(std::string expected, ByteBufferPtr actual) { TEST_F(IsomorphicBatchWriteTest, register_and_unregister_pipe) { BatchWriteId batch_write_id{.db = "db", .table = "table", .load_params = {}}; - IsomorphicBatchWriteSharedPtr batch_write = std::make_shared(batch_write_id, _executor.get()); + IsomorphicBatchWriteSharedPtr batch_write = + std::make_shared(batch_write_id, _executor.get(), _txn_state_cache.get()); ASSERT_OK(batch_write->init()); DeferOp defer_writer([&] { batch_write->stop(); }); @@ -134,7 +147,8 @@ TEST_F(IsomorphicBatchWriteTest, register_and_unregister_pipe) { TEST_F(IsomorphicBatchWriteTest, append_data_async) { BatchWriteId batch_write_id{.db = "db", .table = "table", .load_params = {{HTTP_MERGE_COMMIT_ASYNC, "true"}}}; - IsomorphicBatchWriteSharedPtr batch_write = std::make_shared(batch_write_id, _executor.get()); + IsomorphicBatchWriteSharedPtr batch_write = + std::make_shared(batch_write_id, _executor.get(), _txn_state_cache.get()); ASSERT_OK(batch_write->init()); DeferOp defer_writer([&] { batch_write->stop(); }); @@ -244,34 +258,21 @@ TEST_F(IsomorphicBatchWriteTest, append_data_async) { } TEST_F(IsomorphicBatchWriteTest, append_data_sync) { - TGetLoadTxnStatusResult expect_result; - expect_result.__set_status(TTransactionStatus::UNKNOWN); - test_append_data_sync_base(Status::InternalError("Artificial failure"), expect_result, - Status::InternalError("Failed to get load status, Internal error: Artificial failure")); - expect_result.__set_status(TTransactionStatus::PREPARE); - test_append_data_sync_base(Status::OK(), expect_result, Status::TimedOut("load timeout, txn status: PREPARE")); - expect_result.__set_status(TTransactionStatus::PREPARED); - test_append_data_sync_base(Status::OK(), expect_result, Status::TimedOut("load timeout, txn status: PREPARED")); - expect_result.__set_status(TTransactionStatus::COMMITTED); - test_append_data_sync_base(Status::OK(), expect_result, + test_append_data_sync_base(1, "label1", {TTransactionStatus::UNKNOWN, ""}, + Status::InternalError("Can't find the transaction, reason: ")); + test_append_data_sync_base(2, "label2", {TTransactionStatus::COMMITTED, ""}, Status::PublishTimeout("Load has not been published before timeout")); - expect_result.__set_status(TTransactionStatus::VISIBLE); - test_append_data_sync_base(Status::OK(), expect_result, Status::OK()); - expect_result.__set_status(TTransactionStatus::ABORTED); - expect_result.__set_reason("artificial failure"); - test_append_data_sync_base(Status::OK(), expect_result, + test_append_data_sync_base(3, "label3", {TTransactionStatus::VISIBLE, ""}, Status::OK()); + test_append_data_sync_base(4, "label4", {TTransactionStatus::ABORTED, "artificial failure"}, Status::InternalError("Load is aborted, reason: artificial failure")); - expect_result.__set_status(TTransactionStatus::UNKNOWN); - expect_result.__set_reason(""); - test_append_data_sync_base(Status::OK(), expect_result, Status::InternalError("Load status is unknown: UNKNOWN")); } -void IsomorphicBatchWriteTest::test_append_data_sync_base(const Status& rpc_status, - const TGetLoadTxnStatusResult& expect_result, +void IsomorphicBatchWriteTest::test_append_data_sync_base(int64_t txn_id, std::string label, const TxnState& txn_state, const Status& expect_st) { BatchWriteId batch_write_id{ .db = "db", .table = "table", .load_params = {{HTTP_MERGE_COMMIT_ASYNC, "false"}, {HTTP_TIMEOUT, "1"}}}; - IsomorphicBatchWriteSharedPtr batch_write = std::make_shared(batch_write_id, _executor.get()); + IsomorphicBatchWriteSharedPtr batch_write = + std::make_shared(batch_write_id, _executor.get(), _txn_state_cache.get()); ASSERT_OK(batch_write->init()); DeferOp defer_writer([&] { batch_write->stop(); }); @@ -281,9 +282,6 @@ void IsomorphicBatchWriteTest::test_append_data_sync_base(const Status& rpc_stat SyncPoint::GetInstance()->ClearCallBack("IsomorphicBatchWrite::send_rpc_request::request"); SyncPoint::GetInstance()->ClearCallBack("IsomorphicBatchWrite::send_rpc_request::status"); SyncPoint::GetInstance()->ClearCallBack("IsomorphicBatchWrite::send_rpc_request::response"); - SyncPoint::GetInstance()->ClearCallBack("IsomorphicBatchWrite::_wait_for_load_status::request"); - SyncPoint::GetInstance()->ClearCallBack("IsomorphicBatchWrite::_wait_for_load_status::status"); - SyncPoint::GetInstance()->ClearCallBack("IsomorphicBatchWrite::_wait_for_load_status::response"); SyncPoint::GetInstance()->DisableProcessing(); }); @@ -293,7 +291,7 @@ void IsomorphicBatchWriteTest::test_append_data_sync_base(const Status& rpc_stat SyncPoint::GetInstance()->SetCallBack("TimeBoundedStreamLoadPipe::get_current_ns", [&](void* arg) { *((int64_t*)arg) = 0; }); StreamLoadContext* pipe_ctx1 = - build_pipe_context("label1", 1, batch_write_id, std::make_shared("p1", 1000)); + build_pipe_context(label, txn_id, batch_write_id, std::make_shared("p1", 1000)); SyncPoint::GetInstance()->SetCallBack("IsomorphicBatchWrite::send_rpc_request::status", [&](void* arg) { *((Status*)arg) = Status::OK(); }); SyncPoint::GetInstance()->SetCallBack("IsomorphicBatchWrite::send_rpc_request::response", [&](void* arg) { @@ -301,26 +299,14 @@ void IsomorphicBatchWriteTest::test_append_data_sync_base(const Status& rpc_stat TStatus status; status.__set_status_code(TStatusCode::OK); result->__set_status(status); - result->__set_label("label1"); + result->__set_label(label); ASSERT_OK(batch_write->register_stream_load_pipe(pipe_ctx1)); }); // stream pipe left time is 100ms SyncPoint::GetInstance()->SetCallBack("TimeBoundedStreamLoadPipe::get_current_ns", [&](void* arg) { *((int64_t*)arg) = 900000000; }); - SyncPoint::GetInstance()->SetCallBack("IsomorphicBatchWrite::_wait_for_load_status::request", [&](void* arg) { - TGetLoadTxnStatusRequest* request = (TGetLoadTxnStatusRequest*)arg; - EXPECT_EQ(batch_write_id.db, request->db); - EXPECT_EQ(batch_write_id.table, request->tbl); - EXPECT_EQ(1, request->txnId); - }); - SyncPoint::GetInstance()->SetCallBack("IsomorphicBatchWrite::_wait_for_load_status::status", - [&](void* arg) { *((Status*)arg) = rpc_status; }); - SyncPoint::GetInstance()->SetCallBack("IsomorphicBatchWrite::_wait_for_load_status::response", [&](void* arg) { - TGetLoadTxnStatusResult* result = (TGetLoadTxnStatusResult*)arg; - result->__set_status(expect_result.status); - result->__set_reason(expect_result.reason); - }); + ASSERT_OK(_txn_state_cache->push_state(txn_id, txn_state.txn_status, txn_state.reason)); StreamLoadContext* data_ctx1 = build_data_context(batch_write_id, "data1"); Status result = batch_write->append_data(data_ctx1); ASSERT_EQ(1, num_rpc_request); @@ -332,7 +318,8 @@ void IsomorphicBatchWriteTest::test_append_data_sync_base(const Status& rpc_stat TEST_F(IsomorphicBatchWriteTest, stop_write) { BatchWriteId batch_write_id{.db = "db", .table = "table", .load_params = {}}; - IsomorphicBatchWriteSharedPtr batch_write = std::make_shared(batch_write_id, _executor.get()); + IsomorphicBatchWriteSharedPtr batch_write = + std::make_shared(batch_write_id, _executor.get(), _txn_state_cache.get()); ASSERT_OK(batch_write->init()); DeferOp defer_writer([&] { batch_write->stop(); }); @@ -362,22 +349,23 @@ TEST_F(IsomorphicBatchWriteTest, stop_write) { TEST_F(IsomorphicBatchWriteTest, reach_max_rpc_retry) { BatchWriteId batch_write_id{.db = "db", .table = "table", .load_params = {{HTTP_MERGE_COMMIT_ASYNC, "true"}}}; - IsomorphicBatchWriteSharedPtr batch_write = std::make_shared(batch_write_id, _executor.get()); + IsomorphicBatchWriteSharedPtr batch_write = + std::make_shared(batch_write_id, _executor.get(), _txn_state_cache.get()); ASSERT_OK(batch_write->init()); DeferOp defer_writer([&] { batch_write->stop(); }); - auto old_retry_num = config::batch_write_rpc_request_retry_num; - auto old_retry_interval = config::batch_write_rpc_request_retry_interval_ms; - config::batch_write_rpc_request_retry_num = 5; - config::batch_write_rpc_request_retry_interval_ms = 10; + auto old_retry_num = config::merge_commit_rpc_request_retry_num; + auto old_retry_interval = config::merge_commit_rpc_request_retry_interval_ms; + config::merge_commit_rpc_request_retry_num = 5; + config::merge_commit_rpc_request_retry_interval_ms = 10; SyncPoint::GetInstance()->EnableProcessing(); DeferOp defer([&]() { SyncPoint::GetInstance()->ClearCallBack("IsomorphicBatchWrite::send_rpc_request::request"); SyncPoint::GetInstance()->ClearCallBack("IsomorphicBatchWrite::send_rpc_request::status"); SyncPoint::GetInstance()->ClearCallBack("IsomorphicBatchWrite::send_rpc_request::response"); SyncPoint::GetInstance()->DisableProcessing(); - config::batch_write_rpc_request_retry_num = old_retry_num; - config::batch_write_rpc_request_retry_interval_ms = old_retry_interval; + config::merge_commit_rpc_request_retry_num = old_retry_num; + config::merge_commit_rpc_request_retry_interval_ms = old_retry_interval; }); int num_rpc_request = 0; @@ -402,22 +390,23 @@ TEST_F(IsomorphicBatchWriteTest, reach_max_rpc_retry) { TEST_F(IsomorphicBatchWriteTest, stop_retry_if_rpc_failed) { BatchWriteId batch_write_id{.db = "db", .table = "table", .load_params = {{HTTP_MERGE_COMMIT_ASYNC, "true"}}}; - IsomorphicBatchWriteSharedPtr batch_write = std::make_shared(batch_write_id, _executor.get()); + IsomorphicBatchWriteSharedPtr batch_write = + std::make_shared(batch_write_id, _executor.get(), _txn_state_cache.get()); ASSERT_OK(batch_write->init()); DeferOp defer_writer([&] { batch_write->stop(); }); - auto old_retry_num = config::batch_write_rpc_request_retry_num; - auto old_retry_interval = config::batch_write_rpc_request_retry_interval_ms; - config::batch_write_rpc_request_retry_num = 5; - config::batch_write_rpc_request_retry_interval_ms = 10; + auto old_retry_num = config::merge_commit_rpc_request_retry_num; + auto old_retry_interval = config::merge_commit_rpc_request_retry_interval_ms; + config::merge_commit_rpc_request_retry_num = 5; + config::merge_commit_rpc_request_retry_interval_ms = 10; SyncPoint::GetInstance()->EnableProcessing(); DeferOp defer([&]() { SyncPoint::GetInstance()->ClearCallBack("IsomorphicBatchWrite::send_rpc_request::request"); SyncPoint::GetInstance()->ClearCallBack("IsomorphicBatchWrite::send_rpc_request::status"); SyncPoint::GetInstance()->ClearCallBack("IsomorphicBatchWrite::send_rpc_request::response"); SyncPoint::GetInstance()->DisableProcessing(); - config::batch_write_rpc_request_retry_num = old_retry_num; - config::batch_write_rpc_request_retry_interval_ms = old_retry_interval; + config::merge_commit_rpc_request_retry_num = old_retry_num; + config::merge_commit_rpc_request_retry_interval_ms = old_retry_interval; }); // rpc failed diff --git a/be/test/runtime/batch_write/txn_state_cache_test.cpp b/be/test/runtime/batch_write/txn_state_cache_test.cpp new file mode 100644 index 00000000000000..cd8b83793a3298 --- /dev/null +++ b/be/test/runtime/batch_write/txn_state_cache_test.cpp @@ -0,0 +1,694 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "runtime/batch_write/txn_state_cache.h" + +#include "testutil/assert.h" +#include "util/await.h" + +namespace starrocks { + +class TxnStateCacheTest : public testing::Test { +public: + TxnStateCacheTest() = default; + ~TxnStateCacheTest() override = default; + + void SetUp() override { + config::merge_commit_trace_log_enable = true; + _db = "test_db"; + _tbl = "test_tbl"; + _auth = {"test_user", "test_password"}; + ASSERT_OK(ThreadPoolBuilder("IsomorphicBatchWriteTest") + .set_min_threads(0) + .set_max_threads(1) + .set_max_queue_size(2048) + .set_idle_timeout(MonoDelta::FromMilliseconds(10000)) + .build(&_thread_pool)); + } + + void TearDown() override { + if (_thread_pool) { + _thread_pool->shutdown(); + } + } + + std::unique_ptr create_cache(int32_t capacity) { + // use serial mode to run poll task in the same order of their execution time, this is just used for testing + std::unique_ptr token = _thread_pool->new_token(ThreadPool::ExecutionMode::SERIAL); + std::unique_ptr cache = std::make_unique(capacity, std::move(token)); + EXPECT_OK(cache->init()); + return cache; + } + +protected: + std::string _db; + std::string _tbl; + AuthInfo _auth; + +private: + std::unique_ptr _thread_pool; +}; + +void assert_txn_state_eq(const TxnState& expected, const TxnState& actual) { + ASSERT_EQ(expected.txn_status, actual.txn_status); + ASSERT_EQ(expected.reason, actual.reason); +} + +TEST_F(TxnStateCacheTest, handler_push_state) { + // PREPARE -> COMMITTED -> VISIBLE + { + TxnStateHandler handler; + assert_txn_state_eq({TTransactionStatus::PREPARE, ""}, handler.txn_state()); + handler.push_state(TTransactionStatus::COMMITTED, ""); + assert_txn_state_eq({TTransactionStatus::COMMITTED, ""}, handler.txn_state()); + ASSERT_TRUE(handler.committed_status_from_fe()); + handler.push_state(TTransactionStatus::VISIBLE, ""); + assert_txn_state_eq({TTransactionStatus::VISIBLE, ""}, handler.txn_state()); + } + + // PREPARE -> ABORTED + { + TxnStateHandler handler; + assert_txn_state_eq({TTransactionStatus::PREPARE, ""}, handler.txn_state()); + handler.push_state(TTransactionStatus::ABORTED, "manual failure"); + assert_txn_state_eq({TTransactionStatus::ABORTED, "manual failure"}, handler.txn_state()); + } + + // PREPARE -> UNKNOWN + { + TxnStateHandler handler; + assert_txn_state_eq({TTransactionStatus::PREPARE, ""}, handler.txn_state()); + handler.push_state(TTransactionStatus::UNKNOWN, ""); + assert_txn_state_eq({TTransactionStatus::UNKNOWN, ""}, handler.txn_state()); + } +} + +TEST_F(TxnStateCacheTest, handler_poll_state) { + bool trigger_poll = false; + // PREPARE -> PREPARED -> COMMITTED -> VISIBLE + { + TxnStateHandler handler; + handler.subscribe(trigger_poll); + assert_txn_state_eq({TTransactionStatus::PREPARE, ""}, handler.txn_state()); + ASSERT_TRUE(handler.poll_state(TxnState{TTransactionStatus::PREPARED, ""})); + assert_txn_state_eq({TTransactionStatus::PREPARED, ""}, handler.txn_state()); + ASSERT_TRUE(handler.poll_state(TxnState{TTransactionStatus::COMMITTED, ""})); + assert_txn_state_eq({TTransactionStatus::COMMITTED, ""}, handler.txn_state()); + ASSERT_FALSE(handler.committed_status_from_fe()); + ASSERT_FALSE(handler.poll_state(TxnState{TTransactionStatus::VISIBLE, ""})); + assert_txn_state_eq({TTransactionStatus::VISIBLE, ""}, handler.txn_state()); + } + + // PREPARE -> ABORTED + { + TxnStateHandler handler; + handler.subscribe(trigger_poll); + assert_txn_state_eq({TTransactionStatus::PREPARE, ""}, handler.txn_state()); + ASSERT_FALSE(handler.poll_state(TxnState{TTransactionStatus::ABORTED, "manual failure"})); + assert_txn_state_eq({TTransactionStatus::ABORTED, "manual failure"}, handler.txn_state()); + } + + // PREPARE -> UNKNOWN + { + TxnStateHandler handler; + handler.subscribe(trigger_poll); + assert_txn_state_eq({TTransactionStatus::PREPARE, ""}, handler.txn_state()); + ASSERT_FALSE(handler.poll_state(TxnState{TTransactionStatus::UNKNOWN, ""})); + assert_txn_state_eq({TTransactionStatus::UNKNOWN, ""}, handler.txn_state()); + } + + // max failure (merge_commit_txn_state_poll_max_fail_times) = 2, and only one failure + { + TxnStateHandler handler; + handler.subscribe(trigger_poll); + ASSERT_EQ(0, handler.num_poll_failure()); + assert_txn_state_eq({TTransactionStatus::PREPARE, ""}, handler.txn_state()); + ASSERT_TRUE(handler.poll_state(Status::InternalError("artificial failure"))); + ASSERT_EQ(1, handler.num_poll_failure()); + assert_txn_state_eq({TTransactionStatus::PREPARE, ""}, handler.txn_state()); + ASSERT_TRUE(handler.poll_state(TxnState{TTransactionStatus::COMMITTED, ""})); + ASSERT_EQ(0, handler.num_poll_failure()); + assert_txn_state_eq({TTransactionStatus::COMMITTED, ""}, handler.txn_state()); + ASSERT_FALSE(handler.committed_status_from_fe()); + ASSERT_TRUE(handler.poll_state(Status::InternalError("artificial failure"))); + ASSERT_EQ(1, handler.num_poll_failure()); + assert_txn_state_eq({TTransactionStatus::COMMITTED, ""}, handler.txn_state()); + ASSERT_FALSE(handler.poll_state(TxnState{TTransactionStatus::VISIBLE, ""})); + ASSERT_EQ(0, handler.num_poll_failure()); + assert_txn_state_eq({TTransactionStatus::VISIBLE, ""}, handler.txn_state()); + } + + // max failure (merge_commit_txn_state_poll_max_fail_times) = 2, and reach max failure + { + TxnStateHandler handler; + handler.subscribe(trigger_poll); + ASSERT_EQ(0, handler.num_poll_failure()); + assert_txn_state_eq({TTransactionStatus::PREPARE, ""}, handler.txn_state()); + ASSERT_TRUE(handler.poll_state(TxnState{TTransactionStatus::COMMITTED, ""})); + ASSERT_EQ(0, handler.num_poll_failure()); + assert_txn_state_eq({TTransactionStatus::COMMITTED, ""}, handler.txn_state()); + ASSERT_FALSE(handler.committed_status_from_fe()); + ASSERT_TRUE(handler.poll_state(Status::InternalError("artificial failure"))); + ASSERT_EQ(1, handler.num_poll_failure()); + assert_txn_state_eq({TTransactionStatus::COMMITTED, ""}, handler.txn_state()); + ASSERT_FALSE(handler.poll_state(Status::InternalError("artificial failure"))); + ASSERT_EQ(2, handler.num_poll_failure()); + assert_txn_state_eq( + {TTransactionStatus::UNKNOWN, "poll txn state failure exceeds max times 2, last error: " + + Status::InternalError("artificial failure").to_string(false)}, + handler.txn_state()); + } + + // no subscriber + { + TxnStateHandler handler; + handler.subscribe(trigger_poll); + assert_txn_state_eq({TTransactionStatus::PREPARE, ""}, handler.txn_state()); + ASSERT_TRUE(handler.poll_state(TxnState{TTransactionStatus::PREPARED, ""})); + assert_txn_state_eq({TTransactionStatus::PREPARED, ""}, handler.txn_state()); + handler.unsubscribe(); + ASSERT_FALSE(handler.poll_state(TxnState{TTransactionStatus::COMMITTED, ""})); + } +} + +TEST_F(TxnStateCacheTest, handler_subscriber) { + TxnStateHandler handler; + bool trigger_poll = false; + handler.subscribe(trigger_poll); + ASSERT_EQ(1, handler.TEST_num_subscriber()); + ASSERT_TRUE(trigger_poll); + handler.subscribe(trigger_poll); + ASSERT_EQ(2, handler.TEST_num_subscriber()); + ASSERT_FALSE(trigger_poll); + handler.push_state(TTransactionStatus::VISIBLE, ""); + handler.unsubscribe(); + ASSERT_EQ(1, handler.TEST_num_subscriber()); + handler.unsubscribe(); + ASSERT_EQ(0, handler.TEST_num_subscriber()); + handler.subscribe(trigger_poll); + ASSERT_EQ(1, handler.TEST_num_subscriber()); + ASSERT_FALSE(trigger_poll); +} + +TEST_F(TxnStateCacheTest, handler_push_state_notify_subscriber) { + TxnStateHandler handler; + StatusOr expected_status; + auto wait_func = [&](const std::string& name, int64_t timeout_us) { + bool trigger_poll = false; + handler.subscribe(trigger_poll); + auto st = handler.wait_finished_state(name, timeout_us); + ASSERT_EQ(expected_status.status().to_string(), st.status().to_string()); + if (st.ok()) { + assert_txn_state_eq(expected_status.value(), st.value()); + } + }; + + // wait timeout + expected_status = Status::TimedOut("Wait txn state timeout 10000 us"); + auto t0 = std::thread([&]() { wait_func("t0", 10000); }); + t0.join(); + ASSERT_EQ(0, handler.num_waiting_finished_state()); + + // wait until final state + auto t1 = std::thread([&]() { wait_func("t1", 60000000); }); + auto t2 = std::thread([&]() { wait_func("t2", 60000000); }); + ASSERT_TRUE(Awaitility().timeout(5000000).until([&] { return handler.num_waiting_finished_state() == 2; })); + expected_status = {TTransactionStatus::VISIBLE, ""}; + handler.push_state(TTransactionStatus::VISIBLE, ""); + t1.join(); + t2.join(); + ASSERT_EQ(0, handler.num_waiting_finished_state()); + + // already in final state + auto t3 = std::thread([&]() { wait_func("t3", 60000000); }); + t3.join(); + ASSERT_EQ(0, handler.num_waiting_finished_state()); +} + +TEST_F(TxnStateCacheTest, handler_poll_state_notify_subscriber) { + TxnStateHandler handler; + StatusOr expected_status; + auto wait_func = [&](const std::string& name, int64_t timeout_us) { + bool trigger_poll = false; + handler.subscribe(trigger_poll); + auto st = handler.wait_finished_state(name, timeout_us); + ASSERT_EQ(expected_status.status().to_string(), st.status().to_string()); + if (st.ok()) { + assert_txn_state_eq(expected_status.value(), st.value()); + } + }; + + auto t1 = std::thread([&]() { wait_func("t1", 60000000); }); + auto t2 = std::thread([&]() { wait_func("t2", 60000000); }); + ASSERT_TRUE(Awaitility().timeout(5000000).until([&] { return handler.num_waiting_finished_state() == 2; })); + expected_status = {TTransactionStatus::VISIBLE, ""}; + handler.poll_state(TxnState{TTransactionStatus::VISIBLE, ""}); + t1.join(); + t2.join(); + ASSERT_EQ(0, handler.num_waiting_finished_state()); +} + +TEST_F(TxnStateCacheTest, handler_stop) { + TxnStateHandler handler; + StatusOr expected_status; + auto wait_func = [&](const std::string& name, int64_t timeout_us) { + bool trigger_poll = false; + handler.subscribe(trigger_poll); + auto st = handler.wait_finished_state(name, timeout_us); + ASSERT_EQ(expected_status.status().to_string(), st.status().to_string()); + if (st.ok()) { + assert_txn_state_eq(expected_status.value(), st.value()); + } + }; + + // wait when stopped + auto t1 = std::thread([&]() { wait_func("t1", 60000000); }); + auto t2 = std::thread([&]() { wait_func("t2", 60000000); }); + ASSERT_TRUE(Awaitility().timeout(60000000).until([&] { return handler.num_waiting_finished_state() == 2; })); + expected_status = Status::ServiceUnavailable("Transaction state handler is stopped"); + handler.stop(); + t1.join(); + t2.join(); + + // already stopped + auto t3 = std::thread([&]() { wait_func("t3", 60000000); }); + t3.join(); +} + +TEST_F(TxnStateCacheTest, poller_skip_schedule) { + auto cache = create_cache(2048); + auto old_poll_interval_ms = config::merge_commit_txn_state_poll_interval_ms; + config::merge_commit_txn_state_poll_interval_ms = 100; + SyncPoint::GetInstance()->EnableProcessing(); + DeferOp defer([&] { + SyncPoint::GetInstance()->ClearCallBack("TxnStatePoller::get_current_ms"); + SyncPoint::GetInstance()->ClearCallBack("TxnStatePoller::_execute_poll::request"); + SyncPoint::GetInstance()->ClearCallBack("TxnStatePoller::_execute_poll::status"); + SyncPoint::GetInstance()->ClearCallBack("TxnStatePoller::_execute_poll::response"); + SyncPoint::GetInstance()->DisableProcessing(); + cache->stop(); + config::merge_commit_txn_state_poll_interval_ms = old_poll_interval_ms; + }); + std::atomic num_rpc = 0; + SyncPoint::GetInstance()->SetCallBack("TxnStatePoller::_execute_poll::request", + [&](void* arg) { num_rpc.fetch_add(1); }); + SyncPoint::GetInstance()->SetCallBack("TxnStatePoller::_execute_poll::status", + [&](void* arg) { *((Status*)arg) = Status::OK(); }); + SyncPoint::GetInstance()->SetCallBack("TxnStatePoller::_execute_poll::response", [&](void* arg) { + TGetLoadTxnStatusResult* result = (TGetLoadTxnStatusResult*)arg; + result->__set_status(TTransactionStatus::VISIBLE); + result->__set_reason(""); + }); + SyncPoint::GetInstance()->SetCallBack("TxnStatePoller::get_current_ms", [&](void* arg) { *((int64_t*)arg) = 0; }); + + TxnStatePoller* poller = cache->txn_state_poller(); + ASSERT_TRUE(cache->get_state(1).status().is_not_found()); + // create a subscriber to trigger the poll scheduling + auto s1 = cache->subscribe_state(1, "s1", _db, _tbl, _auth); + ASSERT_OK(s1.status()); + ASSERT_TRUE(poller->TEST_is_txn_pending(1)); + + // transit the state to the final before scheduling the poll, and the poll should be skipped + ASSERT_OK(cache->push_state(1, TTransactionStatus::VISIBLE, "")); + assert_txn_state_eq({TTransactionStatus::VISIBLE, ""}, cache->get_state(1).value()); + // advance the time to schedule the poll for txn 1 + SyncPoint::GetInstance()->SetCallBack("TxnStatePoller::get_current_ms", [&](void* arg) { *((int64_t*)arg) = 200; }); + // create subscriber for txn 2 to trigger the poll scheduling + auto s2 = cache->subscribe_state(2, "s2", _db, _tbl, _auth); + ASSERT_OK(s2.status()); + ASSERT_TRUE(poller->TEST_is_txn_pending(2)); + // advance the time to schedule the poll for txn 2 + SyncPoint::GetInstance()->SetCallBack("TxnStatePoller::get_current_ms", [&](void* arg) { *((int64_t*)arg) = 400; }); + // when state for txn 2 becomes visible, it means poll task for txn 2 has been finished. The thread pool token + // is serial, so the poll task (if not skipped) for txn 1 also should be finished + ASSERT_TRUE(Awaitility().timeout(5000000).until( + [&] { return s2.value()->current_state().txn_status == TTransactionStatus::VISIBLE; })); + ASSERT_EQ(1, num_rpc.load()); +} + +TEST_F(TxnStateCacheTest, cache_push_state) { + auto cache = create_cache(2048); + DeferOp defer([&] { cache->stop(); }); + + ASSERT_TRUE(cache->get_state(1).status().is_not_found()); + ASSERT_OK(cache->push_state(1, TTransactionStatus::PREPARE, "")); + auto st_1 = cache->get_state(1); + ASSERT_OK(st_1.status()); + assert_txn_state_eq({TTransactionStatus::PREPARE, ""}, st_1.value()); + ASSERT_OK(cache->push_state(1, TTransactionStatus::VISIBLE, "")); + st_1 = cache->get_state(1); + assert_txn_state_eq({TTransactionStatus::VISIBLE, ""}, st_1.value()); + + ASSERT_TRUE(cache->get_state(2).status().is_not_found()); + ASSERT_OK(cache->push_state(2, TTransactionStatus::ABORTED, "artificial failure")); + auto st_2 = cache->get_state(2); + ASSERT_OK(st_2.status()); + assert_txn_state_eq({TTransactionStatus::ABORTED, "artificial failure"}, st_2.value()); + + ASSERT_TRUE(cache->get_state(3).status().is_not_found()); + ASSERT_OK(cache->push_state(3, TTransactionStatus::UNKNOWN, "")); + auto st_3 = cache->get_state(3); + ASSERT_OK(st_3.status()); + assert_txn_state_eq({TTransactionStatus::UNKNOWN, ""}, st_3.value()); +} + +TEST_F(TxnStateCacheTest, cache_push_state_notify_subscriber) { + auto cache = create_cache(2048); + SyncPoint::GetInstance()->EnableProcessing(); + DeferOp defer([&] { + SyncPoint::GetInstance()->ClearCallBack("TxnStatePoller::get_current_ms"); + SyncPoint::GetInstance()->DisableProcessing(); + cache->stop(); + }); + // disable poller to avoid unexpected txn state update + SyncPoint::GetInstance()->SetCallBack("TxnStatePoller::get_current_ms", [&](void* arg) { *((int64_t*)arg) = 0; }); + + auto wait_func = [&](TxnStateSubscriber* subscriber, int64_t timeout_us, StatusOr expected) { + auto st = subscriber->wait_finished_state(timeout_us); + ASSERT_EQ(expected.status().to_string(), st.status().to_string()); + if (st.ok()) { + assert_txn_state_eq(expected.value(), st.value()); + } + }; + + ASSERT_TRUE(cache->get_state(1).status().is_not_found()); + auto s1_1 = cache->subscribe_state(1, "s1_1", _db, _tbl, _auth); + ASSERT_OK(s1_1.status()); + assert_txn_state_eq({TTransactionStatus::PREPARE, ""}, s1_1.value()->current_state()); + auto s1_2 = cache->subscribe_state(1, "s1_2", _db, _tbl, _auth); + ASSERT_OK(s1_2.status()); + assert_txn_state_eq({TTransactionStatus::PREPARE, ""}, s1_2.value()->current_state()); + + ASSERT_TRUE(cache->get_state(2).status().is_not_found()); + ASSERT_OK(cache->push_state(2, TTransactionStatus::ABORTED, "artificial failure")); + auto s2_1 = cache->subscribe_state(2, "s2_1", _db, _tbl, _auth); + ASSERT_OK(s2_1.status()); + assert_txn_state_eq({TTransactionStatus::ABORTED, "artificial failure"}, s2_1.value()->current_state()); + + auto t1_1 = std::thread([&]() { + wait_func(s1_1.value().get(), 60000000, StatusOr({TTransactionStatus::VISIBLE, ""})); + }); + ASSERT_OK(cache->push_state(1, TTransactionStatus::VISIBLE, "")); + t1_1.join(); + + auto t1_2 = std::thread([&]() { + wait_func(s1_2.value().get(), 60000000, StatusOr({TTransactionStatus::VISIBLE, ""})); + }); + t1_2.join(); + + auto t2_1 = std::thread([&]() { + wait_func(s2_1.value().get(), 60000000, + StatusOr({TTransactionStatus::ABORTED, "artificial failure"})); + }); + t2_1.join(); + + ASSERT_TRUE(cache->get_state(3).status().is_not_found()); + auto s3_1 = cache->subscribe_state(3, "s3_1", _db, _tbl, _auth); + ASSERT_OK(s3_1.status()); + auto t3_1 = std::thread([&]() { + wait_func(s3_1.value().get(), 10000, StatusOr(Status::TimedOut("Wait txn state timeout 10000 us"))); + }); + t3_1.join(); +} + +TEST_F(TxnStateCacheTest, cache_poll_state_notify_subscriber) { + auto cache = create_cache(2048); + auto old_poll_interval_ms = config::merge_commit_txn_state_poll_interval_ms; + config::merge_commit_txn_state_poll_interval_ms = 100; + SyncPoint::GetInstance()->EnableProcessing(); + DeferOp defer([&] { + SyncPoint::GetInstance()->ClearCallBack("TxnStatePoller::get_current_ms"); + SyncPoint::GetInstance()->ClearCallBack("TxnStatePoller::_execute_poll::request"); + SyncPoint::GetInstance()->ClearCallBack("TxnStatePoller::_execute_poll::status"); + SyncPoint::GetInstance()->ClearCallBack("TxnStatePoller::_execute_poll::response"); + SyncPoint::GetInstance()->DisableProcessing(); + cache->stop(); + config::merge_commit_txn_state_poll_interval_ms = old_poll_interval_ms; + }); + SyncPoint::GetInstance()->SetCallBack("TxnStatePoller::get_current_ms", [&](void* arg) { *((int64_t*)arg) = 0; }); + + auto wait_func = [&](TxnStateSubscriber* subscriber, int64_t timeout_us, StatusOr expected) { + auto st = subscriber->wait_finished_state(timeout_us); + ASSERT_EQ(expected.status().to_string(), st.status().to_string()); + if (st.ok()) { + assert_txn_state_eq(expected.value(), st.value()); + } + }; + + std::atomic num_rpc = 0; + SyncPoint::GetInstance()->SetCallBack("TxnStatePoller::_execute_poll::request", + [&](void* arg) { num_rpc.fetch_add(1); }); + SyncPoint::GetInstance()->SetCallBack("TxnStatePoller::_execute_poll::status", + [&](void* arg) { *((Status*)arg) = Status::OK(); }); + SyncPoint::GetInstance()->SetCallBack("TxnStatePoller::_execute_poll::response", [&](void* arg) { + TGetLoadTxnStatusResult* result = (TGetLoadTxnStatusResult*)arg; + result->__set_status(TTransactionStatus::VISIBLE); + result->__set_reason(""); + }); + + // txn 1 and 2 should be scheduled at time 100, current is 0 + TxnStatePoller* poller = cache->txn_state_poller(); + ASSERT_TRUE(cache->get_state(1).status().is_not_found()); + auto s1_1 = cache->subscribe_state(1, "s1_1", _db, _tbl, _auth); + ASSERT_OK(s1_1.status()); + ASSERT_TRUE(poller->TEST_is_txn_pending(1)); + assert_txn_state_eq({TTransactionStatus::PREPARE, ""}, s1_1.value()->current_state()); + auto s1_2 = cache->subscribe_state(1, "s1_2", _db, _tbl, _auth); + ASSERT_OK(s1_2.status()); + assert_txn_state_eq({TTransactionStatus::PREPARE, ""}, s1_2.value()->current_state()); + + ASSERT_TRUE(cache->get_state(2).status().is_not_found()); + auto s2_1 = cache->subscribe_state(2, "s2_1", _db, _tbl, _auth); + ASSERT_OK(s2_1.status()); + assert_txn_state_eq({TTransactionStatus::PREPARE, ""}, s2_1.value()->current_state()); + ASSERT_TRUE(poller->TEST_is_txn_pending(2)); + + SyncPoint::GetInstance()->SetCallBack("TxnStatePoller::get_current_ms", [&](void* arg) { *((int64_t*)arg) = 50; }); + // txn 3 should be scheduled at time 150, current is 50 + ASSERT_TRUE(cache->get_state(3).status().is_not_found()); + auto s3_1 = cache->subscribe_state(3, "s3_1", _db, _tbl, _auth); + ASSERT_OK(s3_1.status()); + assert_txn_state_eq({TTransactionStatus::PREPARE, ""}, s3_1.value()->current_state()); + ASSERT_TRUE(poller->TEST_is_txn_pending(1)); + ASSERT_TRUE(poller->TEST_is_txn_pending(2)); + ASSERT_TRUE(poller->TEST_is_txn_pending(3)); + + SyncPoint::GetInstance()->SetCallBack("TxnStatePoller::get_current_ms", [&](void* arg) { *((int64_t*)arg) = 80; }); + // txn 4 should be scheduled at time 180, current is 80 + ASSERT_TRUE(cache->get_state(4).status().is_not_found()); + auto s4_1 = cache->subscribe_state(4, "s4_1", _db, _tbl, _auth); + ASSERT_OK(s4_1.status()); + assert_txn_state_eq({TTransactionStatus::PREPARE, ""}, s4_1.value()->current_state()); + ASSERT_TRUE(poller->TEST_is_txn_pending(1)); + ASSERT_TRUE(poller->TEST_is_txn_pending(2)); + ASSERT_TRUE(poller->TEST_is_txn_pending(3)); + ASSERT_TRUE(poller->TEST_is_txn_pending(4)); + + auto t1_1 = std::thread([&]() { + wait_func(s1_1.value().get(), 60000000, StatusOr({TTransactionStatus::VISIBLE, ""})); + }); + auto t1_2 = std::thread([&]() { + wait_func(s1_2.value().get(), 60000000, StatusOr({TTransactionStatus::VISIBLE, ""})); + }); + auto t2_1 = std::thread([&]() { + wait_func(s2_1.value().get(), 60000000, StatusOr({TTransactionStatus::VISIBLE, ""})); + }); + auto t3_1 = std::thread([&]() { + wait_func(s3_1.value().get(), 60000000, StatusOr({TTransactionStatus::VISIBLE, ""})); + }); + + // advance time and should trigger txn 1, 2 and 3 to poll + SyncPoint::GetInstance()->SetCallBack("TxnStatePoller::get_current_ms", [&](void* arg) { *((int64_t*)arg) = 160; }); + t1_1.join(); + t1_2.join(); + t2_1.join(); + t3_1.join(); + + assert_txn_state_eq({TTransactionStatus::VISIBLE, ""}, s1_1.value()->current_state()); + assert_txn_state_eq({TTransactionStatus::VISIBLE, ""}, s2_1.value()->current_state()); + assert_txn_state_eq({TTransactionStatus::VISIBLE, ""}, s3_1.value()->current_state()); + ASSERT_FALSE(poller->TEST_is_txn_pending(1)); + ASSERT_FALSE(poller->TEST_is_txn_pending(2)); + ASSERT_FALSE(poller->TEST_is_txn_pending(3)); + ASSERT_TRUE(poller->TEST_is_txn_pending(4)); + ASSERT_EQ(3, num_rpc.load()); + + assert_txn_state_eq({TTransactionStatus::PREPARE, ""}, s4_1.value()->current_state()); + auto t4_1 = std::thread([&]() { + wait_func(s4_1.value().get(), 60000000, StatusOr({TTransactionStatus::VISIBLE, ""})); + }); + SyncPoint::GetInstance()->SetCallBack("TxnStatePoller::_execute_poll::response", [&](void* arg) { + TGetLoadTxnStatusResult* result = (TGetLoadTxnStatusResult*)arg; + result->__set_status(TTransactionStatus::COMMITTED); + result->__set_reason(""); + }); + // advance time and trigger txn 4 to poll. it should continue polling because + // the state is COMMITTED which is not final. next poll time is 300 + SyncPoint::GetInstance()->SetCallBack("TxnStatePoller::get_current_ms", [&](void* arg) { *((int64_t*)arg) = 200; }); + ASSERT_TRUE(Awaitility().timeout(5000000).until([&] { + auto st = poller->TEST_pending_execution_time(4); + return st.ok() && st.value() == 300; + })); + assert_txn_state_eq({TTransactionStatus::COMMITTED, ""}, s4_1.value()->current_state()); + ASSERT_EQ(4, num_rpc.load()); + + SyncPoint::GetInstance()->SetCallBack("TxnStatePoller::_execute_poll::response", [&](void* arg) { + TGetLoadTxnStatusResult* result = (TGetLoadTxnStatusResult*)arg; + result->__set_status(TTransactionStatus::VISIBLE); + result->__set_reason(""); + }); + // advance time and trigger txn 4 to poll again. This time it should reach the finished state + SyncPoint::GetInstance()->SetCallBack("TxnStatePoller::get_current_ms", [&](void* arg) { *((int64_t*)arg) = 400; }); + t4_1.join(); + ASSERT_EQ(5, num_rpc.load()); + assert_txn_state_eq({TTransactionStatus::VISIBLE, ""}, s4_1.value()->current_state()); +} + +TEST_F(TxnStateCacheTest, cache_eviction) { + int32_t numShards = 1 << 5; + auto cache = create_cache(numShards * 3); + DeferOp defer_stop([&] { cache->stop(); }); + + int64_t evict_txn_id = -1; + int32_t num_evict = 0; + SyncPoint::GetInstance()->EnableProcessing(); + DeferOp defer([]() { + SyncPoint::GetInstance()->ClearCallBack("TxnStateHandler::destruct"); + SyncPoint::GetInstance()->DisableProcessing(); + }); + + SyncPoint::GetInstance()->SetCallBack("TxnStateHandler::destruct", [&](void* arg) { + TxnStateHandler* handler = (TxnStateHandler*)arg; + evict_txn_id = handler->txn_id(); + num_evict += 1; + }); + ASSERT_OK(cache->push_state(1 << 5, TTransactionStatus::VISIBLE, "")); + ASSERT_EQ(1, cache->size()); + ASSERT_EQ(0, num_evict); + auto s1 = cache->subscribe_state(2 << 5, "s1", _db, _tbl, _auth); + ASSERT_OK(s1.status()); + ASSERT_EQ(2, cache->size()); + ASSERT_EQ(0, num_evict); + ASSERT_OK(cache->push_state(3 << 5, TTransactionStatus::VISIBLE, "")); + ASSERT_EQ(3, cache->size()); + ASSERT_EQ(0, num_evict); + + ASSERT_OK(cache->push_state(4 << 5, TTransactionStatus::VISIBLE, "")); + ASSERT_EQ(3, cache->size()); + ASSERT_EQ(1 << 5, evict_txn_id); + ASSERT_EQ(1, num_evict); + + auto s2 = cache->subscribe_state(5 << 5, "s2", _db, _tbl, _auth); + ASSERT_OK(s2.status()); + ASSERT_EQ(3, cache->size()); + ASSERT_EQ(3 << 5, evict_txn_id); + ASSERT_EQ(2, num_evict); + + auto s3 = cache->subscribe_state(6 << 5, "s3", _db, _tbl, _auth); + ASSERT_OK(s3.status()); + ASSERT_EQ(3, cache->size()); + ASSERT_EQ(4 << 5, evict_txn_id); + ASSERT_EQ(3, num_evict); + + s1.value().reset(); + ASSERT_OK(cache->push_state(7 << 5, TTransactionStatus::VISIBLE, "")); + ASSERT_EQ(3, cache->size()); + ASSERT_EQ(2 << 5, evict_txn_id); + ASSERT_EQ(4, num_evict); +} + +TEST_F(TxnStateCacheTest, cache_set_capacity) { + auto cache = create_cache(2048); + DeferOp defer([&] { cache->stop(); }); + auto shards = cache->get_cache_shards(); + for (auto shard : shards) { + ASSERT_EQ(2048 / 32, shard->capacity()); + } + cache->set_capacity(4096); + for (auto shard : shards) { + ASSERT_EQ(4096 / 32, shard->capacity()); + } +} + +TEST_F(TxnStateCacheTest, cache_clean_txn_state) { + auto old_clean_interval_sec = config::merge_commit_txn_state_clean_interval_sec; + auto old_expire_time_sec = config::merge_commit_txn_state_expire_time_sec; + config::merge_commit_txn_state_clean_interval_sec = 1; + config::merge_commit_txn_state_expire_time_sec = 1; + auto cache = create_cache(2048); + SyncPoint::GetInstance()->EnableProcessing(); + DeferOp defer([&] { + SyncPoint::GetInstance()->ClearCallBack("TxnStateHandler::destruct"); + SyncPoint::GetInstance()->DisableProcessing(); + cache->stop(); + config::merge_commit_txn_state_clean_interval_sec = old_clean_interval_sec; + config::merge_commit_txn_state_expire_time_sec = old_expire_time_sec; + }); + std::atomic num_evict = 0; + SyncPoint::GetInstance()->SetCallBack("TxnStateHandler::destruct", [&](void* arg) { num_evict += 1; }); + ASSERT_OK(cache->push_state(1, TTransactionStatus::VISIBLE, "")); + ASSERT_OK(cache->push_state(2, TTransactionStatus::VISIBLE, "")); + ASSERT_OK(cache->push_state(3, TTransactionStatus::VISIBLE, "")); + ASSERT_TRUE(Awaitility().timeout(10000000).until([&] { return num_evict == 3; })); + for (auto shard : cache->get_cache_shards()) { + ASSERT_EQ(0, shard->object_size()); + } +} + +TEST_F(TxnStateCacheTest, cache_stop) { + auto cache = create_cache(2048); + SyncPoint::GetInstance()->EnableProcessing(); + DeferOp defer([&] { + SyncPoint::GetInstance()->ClearCallBack("TxnStatePoller::get_current_ms"); + SyncPoint::GetInstance()->DisableProcessing(); + cache->stop(); + }); + // disable poller to avoid unexpected txn state update + SyncPoint::GetInstance()->SetCallBack("TxnStatePoller::get_current_ms", [&](void* arg) { *((int64_t*)arg) = 0; }); + + Status expected_status = Status::ServiceUnavailable("Transaction state handler is stopped"); + auto wait_func = [&](TxnStateSubscriber* subscriber, int64_t timeout_us, StatusOr expected) { + auto st = subscriber->wait_finished_state(timeout_us); + ASSERT_EQ(expected.status().to_string(), st.status().to_string()); + if (st.ok()) { + assert_txn_state_eq(expected.value(), st.value()); + } + }; + + auto s1 = cache->subscribe_state(1, "s1", _db, _tbl, _auth); + ASSERT_OK(s1.status()); + auto t1 = std::thread([&]() { wait_func(s1.value().get(), 60000000, StatusOr(expected_status)); }); + auto s2 = cache->subscribe_state(1, "s2", _db, _tbl, _auth); + ASSERT_OK(s2.status()); + auto s3 = cache->subscribe_state(2, "s3", _db, _tbl, _auth); + ASSERT_OK(s3.status()); + auto t3 = std::thread([&]() { wait_func(s3.value().get(), 60000000, StatusOr(expected_status)); }); + + ASSERT_TRUE(Awaitility().timeout(5000000).until( + [&] { return s1.value()->entry()->value().num_waiting_finished_state() == 1; })); + ASSERT_TRUE(Awaitility().timeout(5000000).until( + [&] { return s3.value()->entry()->value().num_waiting_finished_state() == 1; })); + cache->stop(); + t1.join(); + t3.join(); + auto wait_st = s2.value()->wait_finished_state(10000); + ASSERT_EQ(expected_status.to_string(), wait_st.status().to_string()); + + expected_status = Status::ServiceUnavailable("Transaction state cache is stopped"); + ASSERT_EQ(expected_status.to_string(), cache->push_state(3, TTransactionStatus::VISIBLE, "").to_string(false)); + ASSERT_EQ(expected_status.to_string(), cache->subscribe_state(3, "s4", _db, _tbl, _auth).status().to_string(false)); + ASSERT_FALSE(cache->txn_state_poller()->TEST_is_scheduling()); +} + +} // namespace starrocks diff --git a/be/test/util/dynamic_cache_test.cpp b/be/test/util/dynamic_cache_test.cpp index b6ec91056cf380..c7a43b6f55e3bc 100644 --- a/be/test/util/dynamic_cache_test.cpp +++ b/be/test/util/dynamic_cache_test.cpp @@ -86,4 +86,19 @@ TEST(DynamicCacheTest, cache2) { } } +TEST(DynamicCacheTest, get_all_entries) { + DynamicCache cache(100); + for (int i = 0; i < 20; i++) { + auto e = cache.get_or_create(i, 1); + cache.release(e); + } + std::vector::Entry*> entries = cache.get_all_entries(); + ASSERT_EQ(20, entries.size()); + for (int i = 0; i < 20; i++) { + auto e = entries[i]; + ASSERT_EQ(i, e->key()); + cache.release(e); + } +} + } // namespace starrocks