Skip to content

Commit

Permalink
fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Jan 19, 2025
1 parent ceb5448 commit 5a0dc3f
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -827,7 +827,7 @@ void testUnsupportedStmtOnFirstRowMergeEngine() {
}

@Test
void testUnsupportedStmtOnVersionRowMergeEngine() {
void testUnsupportedStmtOnVersionMergeEngine() {
String t1 = "versionMergeEngineTable";
TablePath tablePath = TablePath.of(DEFAULT_DB, t1);
tBatchEnv.executeSql(
Expand Down Expand Up @@ -872,20 +872,14 @@ void testVersionMergeEngineWithTypeBigint() throws Exception {
tEnv.executeSql(
"create table merge_engine_with_version (a int not null primary key not enforced,"
+ " b string, ts bigint) with('table.merge-engine' = 'version','table.merge-engine.version.column' = 'ts')");
tEnv.executeSql(
"create table log_sink (a int not null primary key not enforced, b string, ts bigint)");

JobClient insertJobClient =
tEnv.executeSql("insert into log_sink select * from merge_engine_with_version")
.getJobClient()
.get();

// insert once
tEnv.executeSql(
"insert into merge_engine_with_version (a, b, ts) VALUES (1, 'v1', 1000), (2, 'v2', 1000), (1, 'v11', 999), (3, 'v3', 1000)")
.await();

CloseableIterator<Row> rowIter = tEnv.executeSql("select * from log_sink").collect();
CloseableIterator<Row> rowIter =
tEnv.executeSql("select * from merge_engine_with_version").collect();

// id=1 not update
List<String> expectedRows =
Expand All @@ -895,12 +889,10 @@ void testVersionMergeEngineWithTypeBigint() throws Exception {

// insert again, update id=3
tEnv.executeSql(
"insert into merge_engine_with_version (a, b, ts) VALUES (3, 'v33', 1001), (4, 'v44', 1000)")
"insert into merge_engine_with_version (a, b, ts) VALUES (3, 'v33', 1001), (4, 'v44', 1000), (2, 'v22', 1000)")
.await();
expectedRows = Arrays.asList("-U[3, v3, 1000]", "+U[3, v33, 1001]", "+I[4, v44, 1000]");
assertResultsIgnoreOrder(rowIter, expectedRows, false);

insertJobClient.cancel().get();
}

@Test
Expand All @@ -915,6 +907,7 @@ void testVersionMergeEngineWithTypeTimestamp() throws Exception {
+ "(1, 'v1', TIMESTAMP '2024-12-27 12:00:00.123'), "
+ "(2, 'v2', TIMESTAMP '2024-12-27 12:00:00.123'), "
+ "(1, 'v11', TIMESTAMP '2024-12-27 11:59:59.123'), "
+ "(3, 'v33', TIMESTAMP '2024-12-27 12:00:00.123'),"
+ "(3, 'v3', TIMESTAMP '2024-12-27 12:00:00.123');")
.await();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ private void checkMergeEngine(MergeEngine mergeEngine, Schema schema) {
}
DataType dataType = rowType.getTypeAt(fieldIndex);
if (!VersionRowMergeEngine.VERSION_MERGE_ENGINE_SUPPORTED_DATA_TYPES.contains(
dataType.getClass().getName())) {
dataType.getTypeRoot())) {
throw new InvalidTableException(
String.format(
"The version merge engine column does not support type %s .",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.alibaba.fluss.shaded.guava32.com.google.common.collect.ImmutableSet;
import com.alibaba.fluss.types.BigIntType;
import com.alibaba.fluss.types.DataType;
import com.alibaba.fluss.types.DataTypeRoot;
import com.alibaba.fluss.types.IntType;
import com.alibaba.fluss.types.LocalZonedTimestampType;
import com.alibaba.fluss.types.RowType;
Expand All @@ -39,13 +40,12 @@
*/
public class VersionRowMergeEngine implements RowMergeEngine {

public static final Set<String> VERSION_MERGE_ENGINE_SUPPORTED_DATA_TYPES =
public static final Set<DataTypeRoot> VERSION_MERGE_ENGINE_SUPPORTED_DATA_TYPES =
ImmutableSet.of(
BigIntType.class.getName(),
IntType.class.getName(),
TimestampType.class.getName(),
TimeType.class.getName(),
LocalZonedTimestampType.class.getName());
DataTypeRoot.BIGINT,
DataTypeRoot.INTEGER,
DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);

private final int fieldIndex;
private final InternalRow.FieldGetter fieldGetter;
Expand Down Expand Up @@ -81,7 +81,9 @@ public BinaryRow merge(BinaryRow oldRow, BinaryRow newRow) {
return newRow;
}
DataType dataType = rowType.getTypeAt(fieldIndex);
return getValueComparator(dataType).isGreaterThan(newValue, oldValue) ? newRow : null;
return getValueComparator(dataType).isGreaterOrEqualThan(newValue, oldValue)
? newRow
: null;
}

@Override
Expand All @@ -102,18 +104,15 @@ private ValueComparator getValueComparator(DataType dataType) {
return (left, right) -> (Integer) left > (Integer) right;
}
if (dataType instanceof TimestampType) {
return (left, right) ->
((TimestampNtz) left).getMillisecond()
> ((TimestampNtz) right).getMillisecond();
return (left, right) -> ((TimestampNtz) left).compareTo((TimestampNtz) right) > 0;
}
if (dataType instanceof LocalZonedTimestampType) {
return (left, right) ->
((TimestampLtz) left).toEpochMicros() > ((TimestampLtz) right).toEpochMicros();
return (left, right) -> ((TimestampLtz) left).compareTo((TimestampLtz) right) > 0;
}
throw new UnsupportedOperationException("Unsupported data type: " + dataType);
}

interface ValueComparator {
boolean isGreaterThan(Object left, Object right);
boolean isGreaterOrEqualThan(Object left, Object right);
}
}

0 comments on commit 5a0dc3f

Please sign in to comment.