Skip to content

Commit

Permalink
[Enhancement] Return aborted reason when getting label/transaction st…
Browse files Browse the repository at this point in the history
…ate (backport #54472) (#54535)

Signed-off-by: PengFei Li <[email protected]>
  • Loading branch information
banmoy authored Dec 31, 2024
1 parent bc4e740 commit b91ce95
Show file tree
Hide file tree
Showing 16 changed files with 278 additions and 58 deletions.
2 changes: 1 addition & 1 deletion be/src/runtime/batch_write/isomorphic_batch_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ Status IsomorphicBatchWrite::_wait_for_load_status(StreamLoadContext* data_ctx,
case TTransactionStatus::VISIBLE:
return Status::OK();
case TTransactionStatus::ABORTED:
return Status::InternalError("Load is aborted because of failure");
return Status::InternalError("Load is aborted, reason: " + response.reason);
default:
return Status::InternalError("Load status is unknown: " + to_string(response.status));
}
Expand Down
37 changes: 23 additions & 14 deletions be/test/runtime/batch_write/isomorphic_batch_write_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class IsomorphicBatchWriteTest : public testing::Test {
return ctx;
}

void test_append_data_sync_base(const Status& rpc_status, TTransactionStatus::type txn_status,
void test_append_data_sync_base(const Status& rpc_status, const TGetLoadTxnStatusResult& expect_result,
const Status& expect_st);

protected:
Expand Down Expand Up @@ -244,22 +244,30 @@ TEST_F(IsomorphicBatchWriteTest, append_data_async) {
}

TEST_F(IsomorphicBatchWriteTest, append_data_sync) {
test_append_data_sync_base(Status::InternalError("Artificial failure"), TTransactionStatus::UNKNOWN,
TGetLoadTxnStatusResult expect_result;
expect_result.__set_status(TTransactionStatus::UNKNOWN);
test_append_data_sync_base(Status::InternalError("Artificial failure"), expect_result,
Status::InternalError("Failed to get load status, Internal error: Artificial failure"));
test_append_data_sync_base(Status::OK(), TTransactionStatus::PREPARE,
Status::TimedOut("load timeout, txn status: PREPARE"));
test_append_data_sync_base(Status::OK(), TTransactionStatus::PREPARED,
Status::TimedOut("load timeout, txn status: PREPARED"));
test_append_data_sync_base(Status::OK(), TTransactionStatus::COMMITTED,
expect_result.__set_status(TTransactionStatus::PREPARE);
test_append_data_sync_base(Status::OK(), expect_result, Status::TimedOut("load timeout, txn status: PREPARE"));
expect_result.__set_status(TTransactionStatus::PREPARED);
test_append_data_sync_base(Status::OK(), expect_result, Status::TimedOut("load timeout, txn status: PREPARED"));
expect_result.__set_status(TTransactionStatus::COMMITTED);
test_append_data_sync_base(Status::OK(), expect_result,
Status::PublishTimeout("Load has not been published before timeout"));
test_append_data_sync_base(Status::OK(), TTransactionStatus::VISIBLE, Status::OK());
test_append_data_sync_base(Status::OK(), TTransactionStatus::ABORTED,
Status::InternalError("Load is aborted because of failure"));
test_append_data_sync_base(Status::OK(), TTransactionStatus::UNKNOWN,
Status::InternalError("Load status is unknown: UNKNOWN"));
expect_result.__set_status(TTransactionStatus::VISIBLE);
test_append_data_sync_base(Status::OK(), expect_result, Status::OK());
expect_result.__set_status(TTransactionStatus::ABORTED);
expect_result.__set_reason("artificial failure");
test_append_data_sync_base(Status::OK(), expect_result,
Status::InternalError("Load is aborted, reason: artificial failure"));
expect_result.__set_status(TTransactionStatus::UNKNOWN);
expect_result.__set_reason("");
test_append_data_sync_base(Status::OK(), expect_result, Status::InternalError("Load status is unknown: UNKNOWN"));
}

void IsomorphicBatchWriteTest::test_append_data_sync_base(const Status& rpc_status, TTransactionStatus::type txn_status,
void IsomorphicBatchWriteTest::test_append_data_sync_base(const Status& rpc_status,
const TGetLoadTxnStatusResult& expect_result,
const Status& expect_st) {
BatchWriteId batch_write_id{
.db = "db", .table = "table", .load_params = {{HTTP_MERGE_COMMIT_ASYNC, "false"}, {HTTP_TIMEOUT, "1"}}};
Expand Down Expand Up @@ -310,7 +318,8 @@ void IsomorphicBatchWriteTest::test_append_data_sync_base(const Status& rpc_stat
[&](void* arg) { *((Status*)arg) = rpc_status; });
SyncPoint::GetInstance()->SetCallBack("IsomorphicBatchWrite::_wait_for_load_status::response", [&](void* arg) {
TGetLoadTxnStatusResult* result = (TGetLoadTxnStatusResult*)arg;
result->__set_status(txn_status);
result->__set_status(expect_result.status);
result->__set_reason(expect_result.reason);
});
StreamLoadContext* data_ctx1 = build_data_context(batch_write_id, "data1");
Status result = batch_write->append_data(data_ctx1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.starrocks.http.BaseResponse;
import com.starrocks.http.IllegalArgException;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.transaction.TransactionStateSnapshot;
import io.netty.handler.codec.http.HttpMethod;

public class GetStreamLoadState extends RestBaseAction {
Expand Down Expand Up @@ -81,16 +82,19 @@ public void executeWithoutPassword(BaseRequest request, BaseResponse response)
throw new DdlException("unknown database, database=" + dbName);
}

String status = GlobalStateMgr.getCurrentState().getGlobalTransactionMgr().getLabelStatus(db.getId(), label).toString();

sendResult(request, response, new Result(status));
TransactionStateSnapshot transactionStateSnapshot =
GlobalStateMgr.getCurrentState().getGlobalTransactionMgr().getLabelStatus(db.getId(), label);
sendResult(request, response,
new Result(transactionStateSnapshot.getStatus().name(), transactionStateSnapshot.getReason()));
}

private static class Result extends RestBaseResult {
private String state;
private String reason;

public Result(String state) {
public Result(String state, String reason) {
this.state = state;
this.reason = reason;
}
}
}
2 changes: 1 addition & 1 deletion fe/fe-core/src/main/java/com/starrocks/load/pipe/Pipe.java
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ public void recovery() {
if (StringUtils.isEmpty(file.insertLabel)) {
file.loadState = FileListRepo.PipeFileState.ERROR;
} else {
TransactionStatus txnStatus = txnMgr.getLabelStatus(dbId, file.insertLabel);
TransactionStatus txnStatus = txnMgr.getLabelStatus(dbId, file.insertLabel).getStatus();
if (txnStatus == null || txnStatus.isFailed()) {
file.loadState = FileListRepo.PipeFileState.ERROR;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@
import com.starrocks.transaction.TabletFailInfo;
import com.starrocks.transaction.TransactionNotFoundException;
import com.starrocks.transaction.TransactionState;
import com.starrocks.transaction.TransactionStateSnapshot;
import com.starrocks.transaction.TxnCommitAttachment;
import com.starrocks.warehouse.Warehouse;
import com.starrocks.warehouse.WarehouseInfo;
Expand Down Expand Up @@ -1445,10 +1446,11 @@ public TGetLoadTxnStatusResult getLoadTxnStatus(TGetLoadTxnStatusRequest request
}

try {
TTransactionStatus status =
GlobalStateMgr.getCurrentState().getGlobalTransactionMgr().getTxnStatus(db, request.getTxnId());
LOG.debug("txn {} status is {}", request.getTxnId(), status);
result.setStatus(status);
TransactionStateSnapshot transactionStateSnapshot =
GlobalStateMgr.getCurrentState().getGlobalTransactionMgr().getTxnState(db, request.getTxnId());
LOG.debug("txn {} status is {}", request.getTxnId(), transactionStateSnapshot);
result.setStatus(transactionStateSnapshot.getStatus().toThrift());
result.setReason(transactionStateSnapshot.getReason());
} catch (Throwable e) {
result.setStatus(TTransactionStatus.UNKNOWN);
LOG.warn("catch unknown result.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.RunMode;
import com.starrocks.sql.analyzer.FeNameFormat;
import com.starrocks.thrift.TTransactionStatus;
import com.starrocks.thrift.TUniqueId;
import io.opentelemetry.api.trace.Span;
import org.apache.commons.collections4.CollectionUtils;
Expand Down Expand Up @@ -711,16 +710,17 @@ private void getTxnStateInfo(TransactionState txnState, List<String> info) {
info.add(txnState.getErrMsg());
}

public TransactionStatus getLabelState(String label) {
public TransactionStateSnapshot getLabelState(String label) {
readLock();
try {
Set<Long> existingTxnIds = unprotectedGetTxnIdsByLabel(label);
if (existingTxnIds == null || existingTxnIds.isEmpty()) {
return TransactionStatus.UNKNOWN;
return new TransactionStateSnapshot(TransactionStatus.UNKNOWN, null);
}
// find the latest txn (which id is largest)
long maxTxnId = existingTxnIds.stream().max(Comparator.comparingLong(Long::valueOf)).orElse(Long.MIN_VALUE);
return unprotectedGetTransactionState(maxTxnId).getTransactionStatus();
TransactionState transactionState = unprotectedGetTransactionState(maxTxnId);
return new TransactionStateSnapshot(transactionState.getTransactionStatus(), transactionState.getReason());
} finally {
readUnlock();
}
Expand Down Expand Up @@ -2012,18 +2012,15 @@ private List<TransactionStateListener> populateTransactionStateListeners(@NotNul
return stateListeners;
}

public TTransactionStatus getTxnStatus(long txnId) {
TransactionState transactionState;
public TransactionStateSnapshot getTxnState(long txnId) {
readLock();
try {
transactionState = unprotectedGetTransactionState(txnId);
TransactionState transactionState = unprotectedGetTransactionState(txnId);
return transactionState == null ? new TransactionStateSnapshot(TransactionStatus.UNKNOWN, null)
: new TransactionStateSnapshot(transactionState.getTransactionStatus(), transactionState.getReason());
} finally {
readUnlock();
}
return Optional.ofNullable(transactionState)
.map(TransactionState::getTransactionStatus)
.map(TransactionStatus::toThrift)
.orElse(TTransactionStatus.UNKNOWN);
}

private void checkDatabaseDataQuota() throws AnalysisException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import com.starrocks.persist.metablock.SRMetaBlockWriter;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.WarehouseManager;
import com.starrocks.thrift.TTransactionStatus;
import com.starrocks.thrift.TUniqueId;
import com.starrocks.transaction.TransactionState.LoadJobSourceType;
import com.starrocks.transaction.TransactionState.TxnCoordinator;
Expand Down Expand Up @@ -216,13 +215,13 @@ public static void checkValidTimeoutSecond(long timeoutSecond, int maxLoadTimeou
}
}

public TransactionStatus getLabelStatus(long dbId, String label) {
public TransactionStateSnapshot getLabelStatus(long dbId, String label) {
try {
DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(dbId);
return dbTransactionMgr.getLabelState(label);
} catch (AnalysisException e) {
LOG.warn("Get transaction status by label " + label + " failed", e);
return TransactionStatus.UNKNOWN;
return new TransactionStateSnapshot(TransactionStatus.UNKNOWN, null);
}
}

Expand Down Expand Up @@ -494,9 +493,9 @@ public void abortTransaction(Long dbId, String label, String reason) throws User
dbTransactionMgr.abortTransaction(label, reason);
}

public TTransactionStatus getTxnStatus(Database db, long transactionId) throws UserException {
public TransactionStateSnapshot getTxnState(Database db, long transactionId) throws UserException {
DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(db.getId());
return dbTransactionMgr.getTxnStatus(transactionId);
return dbTransactionMgr.getTxnState(transactionId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.transaction;

/** A snapshot of {@link TransactionState}. The snapshot can include more members in the future. */
public class TransactionStateSnapshot {

private final TransactionStatus status;
private final String reason;

public TransactionStateSnapshot(TransactionStatus status, String reason) {
this.status = status;
this.reason = reason;
}

public TransactionStatus getStatus() {
return status;
}

public String getReason() {
return reason;
}

@Override
public String toString() {
return "TransactionStateSnapshot{" +
"status=" + status +
", reason='" + reason + '\'' +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.http;

import com.starrocks.catalog.Database;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.transaction.GlobalTransactionMgr;
import com.starrocks.transaction.TransactionStateSnapshot;
import com.starrocks.transaction.TransactionStatus;
import mockit.Expectations;
import mockit.Mocked;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.junit.Test;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;

public class GetStreamLoadStateTest extends StarRocksHttpTestCase {

private final OkHttpClient client = new OkHttpClient.Builder()
.readTimeout(100, TimeUnit.SECONDS)
.followRedirects(true)
.build();

@Mocked
private GlobalTransactionMgr globalTransactionMgr;

@Test
public void testSuccessLoad() throws Exception {
Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(DB_NAME);
String label = UUID.randomUUID().toString();
new Expectations() {
{
globalTransactionMgr.getLabelStatus(db.getId(), label);
times = 1;
result = new TransactionStateSnapshot(TransactionStatus.PREPARE, "");
}
};
verifyResult(label, TransactionStatus.PREPARE, "");

new Expectations() {
{
globalTransactionMgr.getLabelStatus(db.getId(), label);
times = 1;
result = new TransactionStateSnapshot(TransactionStatus.PREPARED, "");
}
};
verifyResult(label, TransactionStatus.PREPARED, "");

new Expectations() {
{
globalTransactionMgr.getLabelStatus(db.getId(), label);
times = 1;
result = new TransactionStateSnapshot(TransactionStatus.COMMITTED, "");
}
};
verifyResult(label, TransactionStatus.COMMITTED, "");

new Expectations() {
{
globalTransactionMgr.getLabelStatus(db.getId(), label);
times = 1;
result = new TransactionStateSnapshot(TransactionStatus.VISIBLE, "");
}
};
verifyResult(label, TransactionStatus.VISIBLE, "");
}

@Test
public void testAbortedState() throws Exception {
Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(DB_NAME);
String label = UUID.randomUUID().toString();
new Expectations() {
{
globalTransactionMgr.getLabelStatus(db.getId(), label);
times = 1;
result = new TransactionStateSnapshot(TransactionStatus.ABORTED, "artificial failure");
}
};
verifyResult(label, TransactionStatus.ABORTED, "artificial failure");
}

@Test
public void testUnknownState() throws Exception {
Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(DB_NAME);
String label = UUID.randomUUID().toString();
new Expectations() {
{
globalTransactionMgr.getLabelStatus(db.getId(), label);
times = 1;
result = new TransactionStateSnapshot(TransactionStatus.UNKNOWN, null);
}
};
verifyResult(label, TransactionStatus.UNKNOWN, null);
}

private void verifyResult(String label, TransactionStatus expectedStatus, String expectedReason) throws Exception {
Request request = new Request.Builder()
.addHeader("Authorization", rootAuth)
.url(String.format("%s/api/%s/get_load_state?label=%s", BASE_URL, DB_NAME, label))
.build();
try (Response response = client.newCall(request).execute()) {
assertEquals(200, response.code());
Map<String, Object> result = parseResponseBody(response);
assertEquals("0", result.get("code"));
assertEquals("OK", result.get("status"));
assertEquals("OK", result.get("message"));
assertEquals("Success", result.get("msg"));
assertEquals(expectedStatus.name(), result.get("state"));
assertEquals(expectedReason, result.get("reason"));
}
}
}
Loading

0 comments on commit b91ce95

Please sign in to comment.