Skip to content

Commit

Permalink
[Enhancement] Introduce TxnStateCache for merge commit sync mode (bac…
Browse files Browse the repository at this point in the history
…kport #55001) (#55150)

Signed-off-by: PengFei Li <[email protected]>
  • Loading branch information
banmoy authored Jan 16, 2025
1 parent 5161c3f commit faf0e38
Show file tree
Hide file tree
Showing 19 changed files with 1,768 additions and 169 deletions.
22 changes: 13 additions & 9 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1517,15 +1517,19 @@ CONF_mInt32(apply_version_slow_log_sec, "30");
CONF_mInt32(merge_commit_stream_load_pipe_block_wait_us, "500");
// The maximum number of bytes that the merge commit stream load pipe can buffer.
CONF_mInt64(merge_commit_stream_load_pipe_max_buffered_bytes, "1073741824");
CONF_Int32(batch_write_thread_pool_num_min, "0");
CONF_Int32(batch_write_thread_pool_num_max, "512");
CONF_Int32(batch_write_thread_pool_queue_size, "4096");
CONF_mInt32(batch_write_default_timeout_ms, "600000");
CONF_mInt32(batch_write_rpc_request_retry_num, "10");
CONF_mInt32(batch_write_rpc_request_retry_interval_ms, "500");
CONF_mInt32(batch_write_rpc_reqeust_timeout_ms, "10000");
CONF_mInt32(batch_write_poll_load_status_interval_ms, "200");
CONF_mBool(batch_write_trace_log_enable, "false");
CONF_Int32(merge_commit_thread_pool_num_min, "0");
CONF_Int32(merge_commit_thread_pool_num_max, "512");
CONF_Int32(merge_commit_thread_pool_queue_size, "4096");
CONF_mInt32(merge_commit_default_timeout_ms, "600000");
CONF_mInt32(merge_commit_rpc_request_retry_num, "10");
CONF_mInt32(merge_commit_rpc_request_retry_interval_ms, "500");
CONF_mInt32(merge_commit_rpc_reqeust_timeout_ms, "10000");
CONF_mBool(merge_commit_trace_log_enable, "false");
CONF_mInt32(merge_commit_txn_state_cache_capacity, "4096");
CONF_mInt32(merge_commit_txn_state_clean_interval_sec, "300");
CONF_mInt32(merge_commit_txn_state_expire_time_sec, "1800");
CONF_mInt32(merge_commit_txn_state_poll_interval_ms, "2000");
CONF_mInt32(merge_commit_txn_state_poll_max_fail_times, "2");

// ignore union type tag in avro kafka routine load
CONF_mBool(avro_ignore_union_type_tag, "false");
Expand Down
10 changes: 10 additions & 0 deletions be/src/http/action/update_config_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
#include "http/http_headers.h"
#include "http/http_request.h"
#include "http/http_status.h"
#include "runtime/batch_write/batch_write_mgr.h"
#include "runtime/batch_write/txn_state_cache.h"
#include "storage/compaction_manager.h"
#include "storage/lake/compaction_scheduler.h"
#include "storage/lake/tablet_manager.h"
Expand Down Expand Up @@ -316,6 +318,14 @@ Status UpdateConfigAction::update_config(const std::string& name, const std::str
}
return Status::OK();
});
_config_callback.emplace("merge_commit_txn_state_cache_capacity", [&]() -> Status {
LOG(INFO) << "set merge_commit_txn_state_cache_capacity: " << config::merge_commit_txn_state_cache_capacity;
auto batch_write_mgr = _exec_env->batch_write_mgr();
if (batch_write_mgr) {
batch_write_mgr->txn_state_cache()->set_capacity(config::merge_commit_txn_state_cache_capacity);
}
return Status::OK();
});

#ifdef USE_STAROS
#define UPDATE_STARLET_CONFIG(BE_CONFIG, STARLET_CONFIG) \
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ set(RUNTIME_FILES
batch_write/isomorphic_batch_write.cpp
batch_write/batch_write_mgr.cpp
batch_write/batch_write_util.cpp
batch_write/txn_state_cache.cpp
routine_load/data_consumer.cpp
routine_load/data_consumer_group.cpp
routine_load/data_consumer_pool.cpp
Expand Down
55 changes: 53 additions & 2 deletions be/src/runtime/batch_write/batch_write_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@

namespace starrocks {

BatchWriteMgr::BatchWriteMgr(std::unique_ptr<bthreads::ThreadPoolExecutor> executor) : _executor(std::move(executor)) {}

Status BatchWriteMgr::init() {
std::unique_ptr<ThreadPoolToken> token =
_executor->get_thread_pool()->new_token(ThreadPool::ExecutionMode::CONCURRENT);
_txn_state_cache = std::make_unique<TxnStateCache>(config::merge_commit_txn_state_cache_capacity, std::move(token));
return _txn_state_cache->init();
}

Status BatchWriteMgr::register_stream_load_pipe(StreamLoadContext* pipe_ctx) {
BatchWriteId batch_write_id = {
.db = pipe_ctx->db, .table = pipe_ctx->table, .load_params = pipe_ctx->load_parameters};
Expand Down Expand Up @@ -78,7 +87,7 @@ StatusOr<IsomorphicBatchWriteSharedPtr> BatchWriteMgr::_get_batch_write(const st
return it->second;
}

auto batch_write = std::make_shared<IsomorphicBatchWrite>(batch_write_id, _executor.get());
auto batch_write = std::make_shared<IsomorphicBatchWrite>(batch_write_id, _executor.get(), _txn_state_cache.get());
Status st = batch_write->init();
if (!st.ok()) {
LOG(ERROR) << "Fail to init batch write, " << batch_write_id << ", status: " << st;
Expand All @@ -105,6 +114,10 @@ void BatchWriteMgr::stop() {
for (auto& batch_write : stop_writes) {
batch_write->stop();
}
if (_txn_state_cache) {
_txn_state_cache->stop();
}
_executor->get_thread_pool()->shutdown();
}

StatusOr<StreamLoadContext*> BatchWriteMgr::create_and_register_pipe(
Expand Down Expand Up @@ -224,7 +237,45 @@ void BatchWriteMgr::receive_stream_load_rpc(ExecEnv* exec_env, brpc::Controller*
ctx->buffer->flip();
ctx->receive_bytes = io_buf.size();
ctx->mc_read_data_cost_nanos = MonotonicNanos() - ctx->start_nanos;
ctx->status = exec_env->batch_write_mgr()->append_data(ctx);
ctx->status = append_data(ctx);
}

static TTransactionStatus::type to_thrift_txn_status(TransactionStatusPB status) {
switch (status) {
case TRANS_UNKNOWN:
return TTransactionStatus::UNKNOWN;
case TRANS_PREPARE:
return TTransactionStatus::PREPARE;
case TRANS_COMMITTED:
return TTransactionStatus::COMMITTED;
case TRANS_VISIBLE:
return TTransactionStatus::VISIBLE;
case TRANS_ABORTED:
return TTransactionStatus::ABORTED;
case TRANS_PREPARED:
return TTransactionStatus::PREPARED;
default:
return TTransactionStatus::UNKNOWN;
}
}

void BatchWriteMgr::update_transaction_state(const PUpdateTransactionStateRequest* request,
PUpdateTransactionStateResponse* response) {
for (int i = 0; i < request->states_size(); i++) {
auto& txn_state = request->states(i);
auto st = _txn_state_cache->push_state(txn_state.txn_id(), to_thrift_txn_status(txn_state.status()),
txn_state.reason());
if (!st.ok()) {
LOG(WARNING) << "Failed to update transaction state, txn_id: " << txn_state.txn_id()
<< ", txn status: " << TransactionStatusPB_Name(txn_state.status())
<< ", status reason: " << txn_state.reason() << ", update error: " << st;
} else {
TRACE_BATCH_WRITE << "Update transaction state, txn_id: " << txn_state.txn_id()
<< ", txn status: " << TransactionStatusPB_Name(txn_state.status())
<< ", status reason: " << txn_state.reason();
}
st.to_protobuf(response->add_results());
}
}

} // namespace starrocks
17 changes: 14 additions & 3 deletions be/src/runtime/batch_write/batch_write_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "common/statusor.h"
#include "runtime/batch_write/isomorphic_batch_write.h"
#include "runtime/batch_write/txn_state_cache.h"
#include "runtime/stream_load/stream_load_context.h"
#include "util/bthreads/bthread_shared_mutex.h"
#include "util/bthreads/executor.h"
Expand All @@ -32,10 +33,13 @@ class ExecEnv;
class PStreamLoadRequest;
class PStreamLoadResponse;
class StreamLoadContext;
class PUpdateTransactionStateRequest;
class PUpdateTransactionStateResponse;

class BatchWriteMgr {
public:
BatchWriteMgr(std::unique_ptr<bthreads::ThreadPoolExecutor> executor) : _executor(std::move(executor)){};
BatchWriteMgr(std::unique_ptr<bthreads::ThreadPoolExecutor> executor);
Status init();

Status register_stream_load_pipe(StreamLoadContext* pipe_ctx);
void unregister_stream_load_pipe(StreamLoadContext* pipe_ctx);
Expand All @@ -45,19 +49,26 @@ class BatchWriteMgr {

void stop();

bthreads::ThreadPoolExecutor* executor() { return _executor.get(); }
TxnStateCache* txn_state_cache() { return _txn_state_cache.get(); }

static StatusOr<StreamLoadContext*> create_and_register_pipe(
ExecEnv* exec_env, BatchWriteMgr* batch_write_mgr, const string& db, const string& table,
const std::map<std::string, std::string>& load_parameters, const string& label, long txn_id,
const TUniqueId& load_id, int32_t batch_write_interval_ms);

static void receive_stream_load_rpc(ExecEnv* exec_env, brpc::Controller* cntl, const PStreamLoadRequest* request,
PStreamLoadResponse* response);
void receive_stream_load_rpc(ExecEnv* exec_env, brpc::Controller* cntl, const PStreamLoadRequest* request,
PStreamLoadResponse* response);

void update_transaction_state(const PUpdateTransactionStateRequest* request,
PUpdateTransactionStateResponse* response);

private:
StatusOr<IsomorphicBatchWriteSharedPtr> _get_batch_write(const BatchWriteId& batch_write_id,
bool create_if_missing);

std::unique_ptr<bthreads::ThreadPoolExecutor> _executor;
std::unique_ptr<TxnStateCache> _txn_state_cache;
bthreads::BThreadSharedMutex _rw_mutex;
std::unordered_map<BatchWriteId, IsomorphicBatchWriteSharedPtr, BatchWriteIdHash, BatchWriteIdEqual>
_batch_write_map;
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/batch_write/batch_write_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

namespace starrocks {

#define TRACE_BATCH_WRITE LOG_IF(INFO, config::batch_write_trace_log_enable)
#define TRACE_BATCH_WRITE LOG_IF(INFO, config::merge_commit_trace_log_enable)

using BatchWriteLoadParams = std::map<std::string, std::string>;

Expand Down
105 changes: 33 additions & 72 deletions be/src/runtime/batch_write/isomorphic_batch_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,9 @@ class AsyncAppendDataContext {
std::atomic_int num_retries{-1};
};

IsomorphicBatchWrite::IsomorphicBatchWrite(BatchWriteId batch_write_id, bthreads::ThreadPoolExecutor* executor)
: _batch_write_id(std::move(batch_write_id)), _executor(executor) {}
IsomorphicBatchWrite::IsomorphicBatchWrite(BatchWriteId batch_write_id, bthreads::ThreadPoolExecutor* executor,
TxnStateCache* txn_state_cache)
: _batch_write_id(std::move(batch_write_id)), _executor(executor), _txn_state_cache(txn_state_cache) {}

Status IsomorphicBatchWrite::init() {
TEST_ERROR_POINT("IsomorphicBatchWrite::init::error");
Expand Down Expand Up @@ -220,7 +221,6 @@ Status IsomorphicBatchWrite::append_data(StreamLoadContext* data_ctx) {
if (_stopped.load(std::memory_order_acquire)) {
return Status::ServiceUnavailable("Batch write is stopped");
}
int64_t start_ts = MonotonicNanos();
AsyncAppendDataContext* async_ctx = new AsyncAppendDataContext(data_ctx);
async_ctx->ref();
async_ctx->create_time_ts.store(MonotonicNanos());
Expand Down Expand Up @@ -258,10 +258,7 @@ Status IsomorphicBatchWrite::append_data(StreamLoadContext* data_ctx) {
if (_batch_write_async) {
return Status::OK();
}
int64_t timeout_ms =
data_ctx->timeout_second > 0 ? data_ctx->timeout_second * 1000 : config::batch_write_default_timeout_ms;
int64_t left_timeout_ns = std::max((int64_t)0, timeout_ms * 1000 * 1000 - (MonotonicNanos() - start_ts));
return _wait_for_load_status(data_ctx, left_timeout_ns);
return _wait_for_load_finish(data_ctx);
}

int IsomorphicBatchWrite::_execute_tasks(void* meta, bthread::TaskIterator<Task>& iter) {
Expand Down Expand Up @@ -309,7 +306,7 @@ Status IsomorphicBatchWrite::_execute_write(AsyncAppendDataContext* async_ctx) {
SCOPED_RAW_TIMER(&write_data_cost_ns);
st = _write_data_to_pipe(async_ctx);
}
if (st.ok() || num_retries >= config::batch_write_rpc_request_retry_num) {
if (st.ok() || num_retries >= config::merge_commit_rpc_request_retry_num) {
break;
}
num_retries += 1;
Expand All @@ -324,7 +321,7 @@ Status IsomorphicBatchWrite::_execute_write(AsyncAppendDataContext* async_ctx) {
SCOPED_RAW_TIMER(&wait_pipe_cost_ns);
std::unique_lock<bthread::Mutex> lock(_mutex);
if (_alive_stream_load_pipe_ctxs.empty()) {
_cv.wait_for(lock, config::batch_write_rpc_request_retry_interval_ms * 1000);
_cv.wait_for(lock, config::merge_commit_rpc_request_retry_interval_ms * 1000);
}
}
}
Expand Down Expand Up @@ -404,7 +401,7 @@ Status IsomorphicBatchWrite::_send_rpc_request(StreamLoadContext* data_ctx) {
st = ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
[&request, &response](FrontendServiceConnection& client) { client->requestMergeCommit(response, request); },
config::batch_write_rpc_reqeust_timeout_ms);
config::merge_commit_rpc_reqeust_timeout_ms);
TRACE_BATCH_WRITE << "receive requestBatchWrite response, " << _batch_write_id
<< ", user label: " << data_ctx->label << ", master: " << master_addr
<< ", cost: " << ((MonotonicNanos() - start_ts) / 1000) << "us, status: " << st
Expand All @@ -418,78 +415,42 @@ Status IsomorphicBatchWrite::_send_rpc_request(StreamLoadContext* data_ctx) {
return st.ok() ? Status(response.status) : st;
}

bool is_final_load_status(const TTransactionStatus::type& status) {
switch (status) {
case TTransactionStatus::VISIBLE:
case TTransactionStatus::ABORTED:
case TTransactionStatus::UNKNOWN:
return true;
default:
return false;
Status IsomorphicBatchWrite::_wait_for_load_finish(StreamLoadContext* data_ctx) {
int64_t total_timeout_ms =
data_ctx->timeout_second > 0 ? data_ctx->timeout_second * 1000 : config::merge_commit_default_timeout_ms;
int64_t left_timeout_ms =
std::max((int64_t)0, total_timeout_ms - (MonotonicNanos() - data_ctx->start_nanos) / 1000000);
StatusOr<TxnStateSubscriberPtr> subscriber_status = _txn_state_cache->subscribe_state(
data_ctx->txn_id, data_ctx->label, data_ctx->db, data_ctx->table, data_ctx->auth);
if (!subscriber_status.ok()) {
return Status::InternalError("Failed to create txn state subscriber, " +
subscriber_status.status().to_string());
}
}

// TODO just poll the load status periodically. improve it later, such as cache the label, and FE notify the BE
Status IsomorphicBatchWrite::_wait_for_load_status(StreamLoadContext* data_ctx, int64_t timeout_ns) {
TxnStateSubscriberPtr subscriber = std::move(subscriber_status.value());
int64_t start_ts = MonotonicNanos();
int64_t wait_load_finish_ns = std::max((int64_t)0, data_ctx->mc_left_merge_time_nanos) + 1000000;
bthread_usleep(std::min(wait_load_finish_ns, timeout_ns) / 1000);
TGetLoadTxnStatusRequest request;
request.__set_db(_batch_write_id.db);
request.__set_tbl(_batch_write_id.table);
request.__set_txnId(data_ctx->txn_id);
set_request_auth(&request, data_ctx->auth);
TGetLoadTxnStatusResult response;
Status st;
do {
if (_stopped.load(std::memory_order_acquire)) {
return Status::ServiceUnavailable("Batch write is stopped");
}
#ifndef BE_TEST
int64_t rpc_ts = MonotonicNanos();
TNetworkAddress master_addr = get_master_address();
st = ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
[&request, &response](FrontendServiceConnection& client) {
client->getLoadTxnStatus(response, request);
},
config::batch_write_rpc_reqeust_timeout_ms);
TRACE_BATCH_WRITE << "receive getLoadTxnStatus response, " << _batch_write_id
<< ", user label: " << data_ctx->label << ", txn_id: " << data_ctx->txn_id
<< ", label: " << data_ctx->batch_write_label << ", master: " << master_addr
<< ", cost: " << ((MonotonicNanos() - rpc_ts) / 1000) << "us, status: " << st
<< ", response: " << response;
#else
TEST_SYNC_POINT_CALLBACK("IsomorphicBatchWrite::_wait_for_load_status::request", &request);
TEST_SYNC_POINT_CALLBACK("IsomorphicBatchWrite::_wait_for_load_status::status", &st);
TEST_SYNC_POINT_CALLBACK("IsomorphicBatchWrite::_wait_for_load_status::response", &response);
#endif
if (st.ok() && is_final_load_status(response.status)) {
break;
}
int64_t left_timeout_ns = timeout_ns - (MonotonicNanos() - start_ts);
if (left_timeout_ns <= 0) {
break;
}
bthread_usleep(
std::min(config::batch_write_poll_load_status_interval_ms * (int64_t)1000, left_timeout_ns / 1000));
} while (true);
StatusOr<TxnState> status_or = subscriber->wait_finished_state(left_timeout_ms * 1000);
data_ctx->mc_wait_finish_cost_nanos = MonotonicNanos() - start_ts;
if (!st.ok()) {
return Status::InternalError("Failed to get load status, " + st.to_string());
TRACE_BATCH_WRITE << "finish to wait load, " << _batch_write_id << ", user label: " << data_ctx->label
<< ", txn_id: " << data_ctx->txn_id << ", load label: " << data_ctx->batch_write_label
<< ", cost: " << (data_ctx->mc_wait_finish_cost_nanos / 1000)
<< "us, wait status: " << status_or.status() << ", "
<< (status_or.ok() ? status_or.value() : subscriber->current_state());
if (!status_or.ok()) {
TxnState current_state = subscriber->current_state();
return Status::InternalError(fmt::format("Failed to get load final status, current status: {}, error: {}",
to_string(current_state.txn_status), status_or.status().to_string()));
}
switch (response.status) {
case TTransactionStatus::PREPARE:
case TTransactionStatus::PREPARED:
return Status::TimedOut("load timeout, txn status: " + to_string(response.status));
switch (status_or.value().txn_status) {
case TTransactionStatus::COMMITTED:
return Status::PublishTimeout("Load has not been published before timeout");
case TTransactionStatus::VISIBLE:
return Status::OK();
case TTransactionStatus::ABORTED:
return Status::InternalError("Load is aborted, reason: " + response.reason);
return Status::InternalError("Load is aborted, reason: " + status_or.value().reason);
case TTransactionStatus::UNKNOWN:
return Status::InternalError("Can't find the transaction, reason: " + status_or.value().reason);
default:
return Status::InternalError("Load status is unknown: " + to_string(response.status));
return Status::InternalError("Load status is not final: " + to_string(status_or.value().txn_status));
}
}

Expand Down
Loading

0 comments on commit faf0e38

Please sign in to comment.