Skip to content

Commit

Permalink
[Feature] Add Transaction Commit/Rollback logical and state (#54218)
Browse files Browse the repository at this point in the history
  • Loading branch information
HangyuanLiu authored Jan 21, 2025
1 parent 1f73917 commit 92b45a6
Show file tree
Hide file tree
Showing 15 changed files with 914 additions and 116 deletions.
12 changes: 9 additions & 3 deletions fe/fe-core/src/main/java/com/starrocks/common/ErrorCode.java
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,7 @@ public enum ErrorCode {
ERR_INVALID_VALUE(5018, new byte[] {'H', 'Y', '0', '0', '0'}, "Invalid %s: '%s'. Expected values should be %s"),
ERR_NO_ALTER_OPERATION(5023, new byte[] {'H', 'Y', '0', '0', '0'},
"No operation in alter statement"),
ERR_TIMEOUT(5024, new byte[] {'H', 'Y', '0', '0', '0'}, "%s reached its timeout of %d seconds, %s"),
ERR_FAILED_WHEN_INSERT(5025, new byte[] {'H', 'Y', '0', '0', '0'}, "Failed when INSERT execute"),
ERR_TIMEOUT(5024, new byte[] {'5', '3', '4', '0', '0'}, "%s reached its timeout of %d seconds, %s"),
ERR_UNSUPPORTED_TYPE_IN_CTAS(5026, new byte[] {'H', 'Y', '0', '0', '0'},
"Unsupported type '%s' in create table as select statement"),
ERR_MISSING_PARAM(5027, new byte[] {'H', 'Y', '0', '0', '0'}, "Missing param: %s "),
Expand Down Expand Up @@ -208,7 +207,7 @@ public enum ErrorCode {
"Create table like does not support create view."),
ERROR_SET_CONFIG_FAILED(5076, new byte[] {'4', '2', '0', '0', '0'},
"set config failed: %s"),
ERR_QUERY_EXCEPTION(5077, new byte[] {'4', '2', '0', '0', '0'},
ERR_QUERY_CANCELLED_BY_CRASH(5077, new byte[] {'X', 'X', '0', '0', '0'},
"Query cancelled by crash of backends."),
ERR_BAD_CATALOG_ERROR(5078, new byte[] {'4', '2', '0', '0', '0'},
"Unknown catalog '%s'"),
Expand Down Expand Up @@ -274,6 +273,11 @@ public enum ErrorCode {
*/
ERR_LOCK_ERROR(5300, new byte[] {'5', '5', 'P', '0', '3'}, "Failed to acquire lock: %s"),
ERR_BEGIN_TXN_FAILED(5301, new byte[] {'5', '5', 'P', '0', '3'}, "Failed to begin transaction: %s"),
ERR_TXN_NOT_EXIST(5302, new byte[] {'2', '5', 'P', '0', '1'}, "Transaction %s does not exist"),
ERR_TXN_IMPORT_SAME_TABLE(5303, new byte[] {'2', '5', 'P', '0', '1'},
"NOT allowed to read or write tables that have been subjected to DML operations before"),
ERR_TXN_FORBID_CROSS_DB(5304, new byte[] {'2', '5', 'P', '0', '1'},
"Cannot execute cross-database transactions. All DML target tables must belong to the same db"),

/**
* 5400 - 5499: Internal error
Expand Down Expand Up @@ -339,6 +343,8 @@ public enum ErrorCode {
"You can modify 'kafka_offsets' property through ALTER ROUTINE LOAD and RESUME the job"),
ERR_INSERT_COLUMN_NAME_MISMATCH(5608, new byte[] {'4', '2', '6', '0', '1'},
"%s column: %s has no matching %s column"),
ERR_FAILED_WHEN_INSERT(5609, new byte[] {'2', '2', '0', '0', '0'}, "Failed when executing INSERT : '%s'"),
ERR_LOAD_HAS_FILTERED_DATA(5610, new byte[] {'2', '2', '0', '0', '0'}, "Insert has filtered data : %s"),

/**
* 5700 - 5799: Partition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,8 @@ public LoadException(String msg) {
public LoadException(String msg, Throwable e) {
super(msg, e);
}

public LoadException(ErrorCode errorCode, Object... objs) {
super(errorCode, objs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,8 @@ public NoAliveBackendException(String msg, Throwable cause) {
public NoAliveBackendException(String msg) {
super(msg);
}

public NoAliveBackendException() {
super(ErrorCode.ERR_QUERY_CANCELLED_BY_CRASH);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,8 @@ public class TimeoutException extends StarRocksException {
public TimeoutException(String msg) {
super(msg);
}

public TimeoutException(String type, long timeout, String errMsg) {
super(ErrorCode.ERR_TIMEOUT, type, timeout, errMsg);
}
}
25 changes: 20 additions & 5 deletions fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,24 @@ public void recordFinishedOrCancelledLoadJob(long jobId, EtlJobType jobType, Str
}
}

public InsertLoadJob registerInsertLoadJob(String label, String dbName, long tableId, long txnId, String loadId, String user,
EtlJobType jobType, long createTimestamp, long estimateScanRows,
int estimateFileNum, long estimateFileSize, long timeout,
public static class EstimateStats {
long estimateScanRows;
int estimateFileNum;
long estimateFileSize;

public EstimateStats(long estimateScanRows, int estimateFileNum, long estimateFileSize) {
this.estimateScanRows = estimateScanRows;
this.estimateFileNum = estimateFileNum;
this.estimateFileSize = estimateFileSize;
}
}

public InsertLoadJob registerInsertLoadJob(String label, String dbName, long tableId, long txnId, String loadId,
String user,
EtlJobType jobType,
long createTimestamp,
EstimateStats estimateStats,
long timeout,
long warehouseId,
Coordinator coordinator) throws StarRocksException {
// get db id
Expand All @@ -249,8 +264,8 @@ public InsertLoadJob registerInsertLoadJob(String label, String dbName, long tab
if (Objects.requireNonNull(jobType) == EtlJobType.INSERT) {
loadJob = new InsertLoadJob(label, db.getId(), tableId, txnId, loadId, user,
createTimestamp, timeout, warehouseId, coordinator);
loadJob.setLoadFileInfo(estimateFileNum, estimateFileSize);
loadJob.setEstimateScanRow(estimateScanRows);
loadJob.setLoadFileInfo(estimateStats.estimateFileNum, estimateStats.estimateFileSize);
loadJob.setEstimateScanRow(estimateStats.estimateScanRows);
loadJob.setTransactionId(txnId);
} else {
throw new LoadException("Unknown job type [" + jobType.name() + "]");
Expand Down
14 changes: 14 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import com.starrocks.thrift.TPipelineProfileLevel;
import com.starrocks.thrift.TUniqueId;
import com.starrocks.thrift.TWorkGroup;
import com.starrocks.transaction.ExplicitTxnState;
import com.starrocks.warehouse.Warehouse;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -251,6 +252,19 @@ public class ConnectContext {
// `insert into table select external table`. Currently, this feature only supports hive table.
private Optional<Boolean> useConnectorMetadataCache = Optional.empty();


// Explicit transaction in a session. The temporary state generated by multiple statements in a transaction is recorded in
// ExplicitTxnStateItem, and the transaction state is recorded in TransactionState.
private ExplicitTxnState explicitTxnState;

public void setExplicitTxnState(ExplicitTxnState explicitTxnState) {
this.explicitTxnState = explicitTxnState;
}

public ExplicitTxnState getExplicitTxnState() {
return explicitTxnState;
}

public StmtExecutor getExecutor() {
return executor;
}
Expand Down
35 changes: 22 additions & 13 deletions fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import com.starrocks.load.InsertOverwriteJobMgr;
import com.starrocks.load.loadv2.InsertLoadJob;
import com.starrocks.load.loadv2.LoadJob;
import com.starrocks.load.loadv2.LoadMgr;
import com.starrocks.metric.MetricRepo;
import com.starrocks.metric.TableMetricsEntity;
import com.starrocks.metric.TableMetricsRegistry;
Expand Down Expand Up @@ -233,6 +234,7 @@
import com.starrocks.transaction.TransactionCommitFailedException;
import com.starrocks.transaction.TransactionState;
import com.starrocks.transaction.TransactionStatus;
import com.starrocks.transaction.TransactionStmtExecutor;
import com.starrocks.transaction.VisibleStateWaiter;
import com.starrocks.warehouse.WarehouseIdleChecker;
import org.apache.commons.collections4.CollectionUtils;
Expand Down Expand Up @@ -523,7 +525,7 @@ public void execute() throws Exception {
context.setExecutionId(UUIDUtil.toTUniqueId(uuid));
SessionVariable sessionVariableBackup = context.getSessionVariable();

// if use http protocal, use httpResultSender to send result to netty channel
// if use http protocol, use httpResultSender to send result to netty channel
if (context instanceof HttpConnectContext) {
httpResultSender = new HttpResultSender((HttpConnectContext) context);
}
Expand Down Expand Up @@ -793,12 +795,14 @@ public void execute() throws Exception {
handleDelBackendBlackListStmt();
} else if (parsedStmt instanceof PlanAdvisorStmt) {
handlePlanAdvisorStmt();
} else if (parsedStmt instanceof BeginStmt
|| parsedStmt instanceof CommitStmt
|| parsedStmt instanceof RollbackStmt) {
handleUnsupportedStmt();
} else if (parsedStmt instanceof TranslateStmt) {
handleTranslateStmt();
} else if (parsedStmt instanceof BeginStmt) {
TransactionStmtExecutor.beginStmt(context, (BeginStmt) parsedStmt);
} else if (parsedStmt instanceof CommitStmt) {
TransactionStmtExecutor.commitStmt(context, (CommitStmt) parsedStmt);
} else if (parsedStmt instanceof RollbackStmt) {
TransactionStmtExecutor.rollbackStmt(context, (RollbackStmt) parsedStmt);
} else {
context.getState().setError("Do not support this query.");
}
Expand All @@ -820,7 +824,7 @@ public void execute() throws Exception {
} else if (e instanceof NoAliveBackendException) {
context.getState().setErrType(QueryState.ErrType.INTERNAL_ERR);
} else {
// TODO: some UserException doesn't belong to analysis error
// TODO: some StarRocksException doesn't belong to analysis error
// we should set such error type to internal error
context.getState().setErrType(QueryState.ErrType.ANALYSIS_ERR);
}
Expand Down Expand Up @@ -2314,6 +2318,12 @@ public void handleDMLStmt(ExecPlan execPlan, DmlStmt stmt) throws Exception {
}

MetricRepo.COUNTER_LOAD_ADD.increase(1L);

if (context.getExplicitTxnState() != null) {
TransactionStmtExecutor.loadData(database, targetTable, execPlan, stmt, originStmt, context);
return;
}

long transactionId = stmt.getTxnId();
TransactionState txnState = null;
String label = DebugUtil.printId(context.getExecutionId());
Expand Down Expand Up @@ -2346,6 +2356,7 @@ public void handleDMLStmt(ExecPlan execPlan, DmlStmt stmt) throws Exception {
TransactionStatus txnStatus = TransactionStatus.ABORTED;
boolean insertError = false;
String trackingSql = "";

try {
coord = getCoordinatorFactory().createInsertScheduler(
context, execPlan.getFragments(), execPlan.getScanNodes(), execPlan.getDescTbl().toThrift());
Expand Down Expand Up @@ -2386,16 +2397,14 @@ public void handleDMLStmt(ExecPlan execPlan, DmlStmt stmt) throws Exception {
context.getQualifiedUser(),
EtlJobType.INSERT,
createTime,
estimateScanRows,
estimateFileNum,
estimateScanFileSize,
new LoadMgr.EstimateStats(estimateScanRows, estimateFileNum, estimateScanFileSize),
getExecTimeout(),
context.getCurrentWarehouseId(),
coord);
loadJob.setJobProperties(stmt.getProperties());
jobId = loadJob.getId();
if (txnState != null) {
txnState.setCallbackId(jobId);
txnState.addCallbackId(jobId);
}
}

Expand Down Expand Up @@ -2435,8 +2444,8 @@ public void handleDMLStmt(ExecPlan execPlan, DmlStmt stmt) throws Exception {
execPlan.getExplainString(TExplainLevel.COSTS));
}

coord.cancel(ErrorCode.ERR_QUERY_EXCEPTION.formatErrorMsg());
ErrorReport.reportNoAliveBackendException(ErrorCode.ERR_QUERY_EXCEPTION);
coord.cancel(ErrorCode.ERR_QUERY_CANCELLED_BY_CRASH.formatErrorMsg());
ErrorReport.reportNoAliveBackendException(ErrorCode.ERR_QUERY_CANCELLED_BY_CRASH);
} else {
coord.cancel(ErrorCode.ERR_TIMEOUT.formatErrorMsg(getExecType(), timeout, ""));
if (coord.isThriftServerHighLoad()) {
Expand Down Expand Up @@ -2725,7 +2734,7 @@ public void handleDMLStmt(ExecPlan execPlan, DmlStmt stmt) throws Exception {
"",
coord.getTrackingUrl());
}
} catch (MetaNotFoundException e) {
} catch (StarRocksException e) {
LOG.warn("Record info of insert load with error {}", e.getMessage(), e);
errMsg = "Record info of insert load with error " + e.getMessage();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,11 @@ private static void setOutfileSink(QueryStatement queryStmt, ExecPlan plan) {
private static void beginTransaction(DmlStmt stmt, ConnectContext session)
throws BeginTransactionException, RunningTxnExceedException, AnalysisException, LabelAlreadyUsedException,
DuplicatedRequestException {
if (session.getExplicitTxnState() != null) {
stmt.setTxnId(session.getExplicitTxnState().getTransactionState().getTransactionId());
return;
}

// not need begin transaction here
// 1. explain (exclude explain analyze)
// 2. insert into files
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@
import com.starrocks.sql.ast.DmlStmt;
import com.starrocks.sql.ast.InsertStmt;
import com.starrocks.sql.ast.UpdateStmt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DMLStmtAnalyzer {
private static final Logger LOGGER = LoggerFactory.getLogger(DMLStmtAnalyzer.DMLStmtAnalyzerVisitor.class);

public static void analyze(DmlStmt stmt, ConnectContext context) {
new DMLStmtAnalyzer.DMLStmtAnalyzerVisitor().analyze(stmt, context);
}
Expand Down
Loading

0 comments on commit 92b45a6

Please sign in to comment.