Skip to content

Commit

Permalink
fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Jan 20, 2025
1 parent ad7909b commit 108155f
Show file tree
Hide file tree
Showing 9 changed files with 107 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -930,7 +930,6 @@ void testFirstRowMergeEngine() throws Exception {
}
}


@ParameterizedTest
@CsvSource({"none,3", "lz4_frame,3", "zstd,3", "zstd,9"})
void testArrowCompressionAndProject(String compression, String level) throws Exception {
Expand All @@ -951,12 +950,12 @@ void testArrowCompressionAndProject(String compression, String level) throws Exc
createTable(tablePath, tableDescriptor, false);

try (Connection conn = ConnectionFactory.createConnection(clientConf);
Table table = conn.getTable(tablePath)) {
Table table = conn.getTable(tablePath)) {
AppendWriter appendWriter = table.getAppendWriter();
int expectedSize = 30;
for (int i = 0; i < expectedSize; i++) {
String value = i % 2 == 0 ? "hello, friend " + i : null;
InternalRow row = row(schema.toRowType(), new Object[]{i, 100, value, i * 10L});
InternalRow row = row(schema.toRowType(), new Object[] {i, 100, value, i * 10L});
appendWriter.append(row);
if (i % 10 == 0) {
// insert 3 bathes, each batch has 10 rows
Expand Down Expand Up @@ -989,7 +988,7 @@ void testArrowCompressionAndProject(String compression, String level) throws Exc
logScanner.close();

// fetch data with project.
logScanner = createLogScanner(table, new int[]{0, 2});
logScanner = createLogScanner(table, new int[] {0, 2});
subscribeFromBeginning(logScanner, table);
count = 0;
while (count < expectedSize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -846,7 +846,7 @@ void testUnsupportedStmtOnFirstRowMergeEngine() {
}

@Test
void testUnsupportedStmtOnVersionRowMergeEngine() {
void testUnsupportedStmtOnVersionMergeEngine() {
String t1 = "versionMergeEngineTable";
TablePath tablePath = TablePath.of(DEFAULT_DB, t1);
tBatchEnv.executeSql(
Expand Down Expand Up @@ -891,20 +891,14 @@ void testVersionMergeEngineWithTypeBigint() throws Exception {
tEnv.executeSql(
"create table merge_engine_with_version (a int not null primary key not enforced,"
+ " b string, ts bigint) with('table.merge-engine' = 'version','table.merge-engine.version.column' = 'ts')");
tEnv.executeSql(
"create table log_sink (a int not null primary key not enforced, b string, ts bigint)");

JobClient insertJobClient =
tEnv.executeSql("insert into log_sink select * from merge_engine_with_version")
.getJobClient()
.get();

// insert once
tEnv.executeSql(
"insert into merge_engine_with_version (a, b, ts) VALUES (1, 'v1', 1000), (2, 'v2', 1000), (1, 'v11', 999), (3, 'v3', 1000)")
.await();

CloseableIterator<Row> rowIter = tEnv.executeSql("select * from log_sink").collect();
CloseableIterator<Row> rowIter =
tEnv.executeSql("select * from merge_engine_with_version").collect();

// id=1 not update
List<String> expectedRows =
Expand All @@ -914,12 +908,10 @@ void testVersionMergeEngineWithTypeBigint() throws Exception {

// insert again, update id=3
tEnv.executeSql(
"insert into merge_engine_with_version (a, b, ts) VALUES (3, 'v33', 1001), (4, 'v44', 1000)")
"insert into merge_engine_with_version (a, b, ts) VALUES (3, 'v33', 1001), (4, 'v44', 1000), (2, 'v22', 1000)")
.await();
expectedRows = Arrays.asList("-U[3, v3, 1000]", "+U[3, v33, 1001]", "+I[4, v44, 1000]");
assertResultsIgnoreOrder(rowIter, expectedRows, false);

insertJobClient.cancel().get();
}

@Test
Expand All @@ -934,7 +926,8 @@ void testVersionMergeEngineWithTypeTimestamp() throws Exception {
+ "(1, 'v1', TIMESTAMP '2024-12-27 12:00:00.123'), "
+ "(2, 'v2', TIMESTAMP '2024-12-27 12:00:00.123'), "
+ "(1, 'v11', TIMESTAMP '2024-12-27 11:59:59.123'), "
+ "(3, 'v3', TIMESTAMP '2024-12-27 12:00:00.123');")
+ "(3, 'v3', TIMESTAMP '2024-12-27 12:00:00.123'),"
+ "(3, 'v33', TIMESTAMP '2024-12-27 12:00:00.123');")
.await();

CloseableIterator<Row> rowIter =
Expand All @@ -950,6 +943,66 @@ void testVersionMergeEngineWithTypeTimestamp() throws Exception {
assertResultsIgnoreOrder(rowIter, expectedRows, true);
}

@Test
void testVersionMergeEngineWithTypeTimestamp9() throws Exception {
tEnv.executeSql(
"create table merge_engine_with_version (a int not null primary key not enforced,"
+ " b string, ts TIMESTAMP(9)) with('table.merge-engine' = 'version','table.merge-engine.version.column' = 'ts')");

// insert once
tEnv.executeSql(
"INSERT INTO merge_engine_with_version (a, b, ts) VALUES "
+ "(1, 'v1', TIMESTAMP '2024-12-27 12:00:00.123456789'), "
+ "(2, 'v2', TIMESTAMP '2024-12-27 12:00:00.123456789'), "
+ "(1, 'v11', TIMESTAMP '2024-12-27 11:59:59.123456789'), "
+ "(3, 'v3', TIMESTAMP '2024-12-27 12:00:00.123456789'),"
+ "(3, 'v33', TIMESTAMP '2024-12-27 12:00:00.123456789');")
.await();

CloseableIterator<Row> rowIter =
tEnv.executeSql("select * from merge_engine_with_version").collect();

// id=1 not update
List<String> expectedRows =
Arrays.asList(
"+I[1, v1, 2024-12-27T12:00:00.123456789]",
"+I[2, v2, 2024-12-27T12:00:00.123456789]",
"+I[3, v3, 2024-12-27T12:00:00.123456789]");

assertResultsIgnoreOrder(rowIter, expectedRows, true);
}

@Test
void testVersionMergeEngineWithTypeTimestampLTZ9() throws Exception {

tEnv.getConfig().set("table.local-time-zone", "UTC");
tEnv.executeSql(
"create table merge_engine_with_version (a int not null primary key not enforced,"
+ " b string, ts TIMESTAMP(9) WITH LOCAL TIME ZONE ) with('table.merge-engine' = 'version','table.merge-engine.version.column' = 'ts')");

// insert once
tEnv.executeSql(
"INSERT INTO merge_engine_with_version (a, b, ts) VALUES "
+ "(1, 'v1', CAST(TO_TIMESTAMP('2024-12-27 12:00:00.123456789', 'yyyy-MM-dd HH:mm:ss.SSSSSSSSS') AS TIMESTAMP(9) WITH LOCAL TIME ZONE)), "
+ "(2, 'v2', CAST(TO_TIMESTAMP('2024-12-27 12:00:00.987654321', 'yyyy-MM-dd HH:mm:ss.SSSSSSSSS') AS TIMESTAMP(9) WITH LOCAL TIME ZONE)), "
+ "(1, 'v11', CAST(TO_TIMESTAMP('2024-12-27 11:59:59.123456789', 'yyyy-MM-dd HH:mm:ss.SSSSSSSSS') AS TIMESTAMP(9) WITH LOCAL TIME ZONE)), "
+ "(3, 'v3', CAST(TO_TIMESTAMP('2024-12-27 12:00:00.123456789', 'yyyy-MM-dd HH:mm:ss.SSSSSSSSS') AS TIMESTAMP(9) WITH LOCAL TIME ZONE)),"
+ "(3, 'v33', CAST(TO_TIMESTAMP('2024-12-27 12:00:00.123456789', 'yyyy-MM-dd HH:mm:ss.SSSSSSSSS') AS TIMESTAMP(9) WITH LOCAL TIME ZONE));")
.await();

CloseableIterator<Row> rowIter =
tEnv.executeSql("select * from merge_engine_with_version").collect();

// id=1 not update
List<String> expectedRows =
Arrays.asList(
"+I[1, v1, 2024-12-27T12:00:00.123Z]",
"+I[2, v2, 2024-12-27T12:00:00.987Z]",
"+I[3, v3, 2024-12-27T12:00:00.123Z]");

assertResultsIgnoreOrder(rowIter, expectedRows, true);
}

private InsertAndExpectValues rowsToInsertInto(Collection<String> partitions) {
List<String> insertValues = new ArrayList<>();
List<String> expectedValues = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import com.alibaba.fluss.server.coordinator.event.CommitRemoteLogManifestEvent;
import com.alibaba.fluss.server.coordinator.event.EventManager;
import com.alibaba.fluss.server.entity.CommitKvSnapshotData;
import com.alibaba.fluss.server.kv.mergeengine.VersionRowMergeEngine;
import com.alibaba.fluss.server.kv.mergeengine.VersionRowMerger;
import com.alibaba.fluss.server.kv.snapshot.CompletedSnapshot;
import com.alibaba.fluss.server.kv.snapshot.CompletedSnapshotJsonSerde;
import com.alibaba.fluss.server.metadata.ServerMetadataCache;
Expand Down Expand Up @@ -203,14 +203,14 @@ private void checkMergeEngine(MergeEngine mergeEngine, Schema schema) {
RowType rowType = schema.toRowType();
int fieldIndex = rowType.getFieldIndex(column);
if (fieldIndex == -1) {
throw new IllegalArgumentException(
throw new InvalidTableException(
String.format(
"The version merge engine column %s does not exist.", column));
}
DataType dataType = rowType.getTypeAt(fieldIndex);
if (!VersionRowMergeEngine.VERSION_MERGE_ENGINE_SUPPORTED_DATA_TYPES.contains(
dataType.getClass().getName())) {
throw new IllegalArgumentException(
if (!VersionRowMerger.VERSION_MERGE_ENGINE_SUPPORTED_DATA_TYPES.contains(
dataType.getTypeRoot())) {
throw new InvalidTableException(
String.format(
"The version merge engine column does not support type %s .",
dataType));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@
import com.alibaba.fluss.row.arrow.ArrowWriterProvider;
import com.alibaba.fluss.row.encode.ValueDecoder;
import com.alibaba.fluss.row.encode.ValueEncoder;
import com.alibaba.fluss.server.kv.mergeengine.DeduplicateRowMergeEngine;
import com.alibaba.fluss.server.kv.mergeengine.FirstRowMergeEngine;
import com.alibaba.fluss.server.kv.mergeengine.RowMergeEngine;
import com.alibaba.fluss.server.kv.mergeengine.VersionRowMergeEngine;
import com.alibaba.fluss.server.kv.mergeengine.DeduplicateRowMerger;
import com.alibaba.fluss.server.kv.mergeengine.FirstRowMerger;
import com.alibaba.fluss.server.kv.mergeengine.RowMerger;
import com.alibaba.fluss.server.kv.mergeengine.VersionRowMerger;
import com.alibaba.fluss.server.kv.partialupdate.PartialUpdater;
import com.alibaba.fluss.server.kv.partialupdate.PartialUpdaterCache;
import com.alibaba.fluss.server.kv.prewrite.KvPreWriteBuffer;
Expand Down Expand Up @@ -278,13 +278,13 @@ public LogAppendInfo putAsLeader(
KvRecordReadContext.createReadContext(kvFormat, fieldTypes);
ValueDecoder valueDecoder =
new ValueDecoder(readContext.getRowDecoder(schemaId));
RowMergeEngine rowMergeEngine = getRowMergeEngine(schema);
RowMerger rowMerger = getRowMerger(schema);
for (KvRecord kvRecord : kvRecords.records(readContext)) {
byte[] keyBytes = BytesUtils.toArray(kvRecord.getKey());
KvPreWriteBuffer.Key key = KvPreWriteBuffer.Key.of(keyBytes);
if (kvRecord.getRow() == null) {
// currently, all supported merge engine will ignore delete row.
if (rowMergeEngine.ignoreDelete()) {
if (rowMerger.ignoreDelete()) {
continue;
}
// it's for deletion
Expand Down Expand Up @@ -320,7 +320,7 @@ public LogAppendInfo putAsLeader(
BinaryRow oldRow = valueDecoder.decodeValue(oldValue).row;
BinaryRow newRow =
updateRow(oldRow, kvRecord.getRow(), partialUpdater);
newRow = rowMergeEngine.merge(oldRow, newRow);
newRow = rowMerger.merge(oldRow, newRow);
if (newRow == null) {
continue;
}
Expand Down Expand Up @@ -374,16 +374,16 @@ public LogAppendInfo putAsLeader(
});
}

private RowMergeEngine getRowMergeEngine(Schema schema) {
private RowMerger getRowMerger(Schema schema) {
if (mergeEngine != null) {
switch (mergeEngine.getType()) {
case VERSION:
return new VersionRowMergeEngine(schema, mergeEngine);
return new VersionRowMerger(schema, mergeEngine);
case FIRST_ROW:
return new FirstRowMergeEngine();
return new FirstRowMerger();
}
}
return new DeduplicateRowMergeEngine();
return new DeduplicateRowMerger();
}

private WalBuilder createWalBuilder(int schemaId, RowType rowType) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.server.kv.mergeengine;

import com.alibaba.fluss.row.BinaryRow;

/** The default merge engine always retains the last record. */
public class DeduplicateRowMergeEngine implements RowMergeEngine {
public class DeduplicateRowMerger implements RowMerger {
@Override
public BinaryRow merge(BinaryRow oldRow, BinaryRow newRow) {
return newRow;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.server.kv.mergeengine;

import com.alibaba.fluss.row.BinaryRow;

/** The first row merge engine for primary key table. Always retain the first row. */
public class FirstRowMergeEngine implements RowMergeEngine {
public class FirstRowMerger implements RowMerger {
@Override
public BinaryRow merge(BinaryRow oldRow, BinaryRow newRow) {
return null;
Expand All @@ -28,9 +29,4 @@ public BinaryRow merge(BinaryRow oldRow, BinaryRow newRow) {
public boolean ignoreDelete() {
return true;
}

@Override
public boolean ignorePartialUpdate() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.server.kv.mergeengine;

import com.alibaba.fluss.row.BinaryRow;

/** The row merge engine for primary key table. */
public interface RowMergeEngine {
public interface RowMerger {
BinaryRow merge(BinaryRow oldRow, BinaryRow newRow);

default boolean ignoreDelete() {
return false;
}

default boolean ignorePartialUpdate() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.server.kv.mergeengine;

import com.alibaba.fluss.metadata.MergeEngine;
Expand All @@ -24,6 +25,7 @@
import com.alibaba.fluss.shaded.guava32.com.google.common.collect.ImmutableSet;
import com.alibaba.fluss.types.BigIntType;
import com.alibaba.fluss.types.DataType;
import com.alibaba.fluss.types.DataTypeRoot;
import com.alibaba.fluss.types.IntType;
import com.alibaba.fluss.types.LocalZonedTimestampType;
import com.alibaba.fluss.types.RowType;
Expand All @@ -36,21 +38,20 @@
* The version row merge engine for primary key table. The update will only occur if the new value
* of the specified version field is greater than the old value.
*/
public class VersionRowMergeEngine implements RowMergeEngine {
public class VersionRowMerger implements RowMerger {

public static final Set<String> VERSION_MERGE_ENGINE_SUPPORTED_DATA_TYPES =
public static final Set<DataTypeRoot> VERSION_MERGE_ENGINE_SUPPORTED_DATA_TYPES =
ImmutableSet.of(
BigIntType.class.getName(),
IntType.class.getName(),
TimestampType.class.getName(),
TimeType.class.getName(),
LocalZonedTimestampType.class.getName());
DataTypeRoot.BIGINT,
DataTypeRoot.INTEGER,
DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);

private final int fieldIndex;
private final InternalRow.FieldGetter fieldGetter;
private final RowType rowType;

public VersionRowMergeEngine(Schema schema, MergeEngine mergeEngine) {
public VersionRowMerger(Schema schema, MergeEngine mergeEngine) {
this.rowType = schema.toRowType();
InternalRow.FieldGetter[] currentFieldGetters =
new InternalRow.FieldGetter[rowType.getFieldCount()];
Expand Down Expand Up @@ -80,19 +81,16 @@ public BinaryRow merge(BinaryRow oldRow, BinaryRow newRow) {
return newRow;
}
DataType dataType = rowType.getTypeAt(fieldIndex);
return getValueComparator(dataType).isGreaterThan(newValue, oldValue) ? newRow : null;
return getValueComparator(dataType).isGreaterOrEqualThan(newValue, oldValue)
? newRow
: null;
}

@Override
public boolean ignoreDelete() {
return true;
}

@Override
public boolean ignorePartialUpdate() {
return true;
}

private ValueComparator getValueComparator(DataType dataType) {
if (dataType instanceof BigIntType) {
return (left, right) -> (Long) left > (Long) right;
Expand All @@ -101,18 +99,15 @@ private ValueComparator getValueComparator(DataType dataType) {
return (left, right) -> (Integer) left > (Integer) right;
}
if (dataType instanceof TimestampType) {
return (left, right) ->
((TimestampNtz) left).getMillisecond()
> ((TimestampNtz) right).getMillisecond();
return (left, right) -> ((TimestampNtz) left).compareTo((TimestampNtz) right) > 0;
}
if (dataType instanceof LocalZonedTimestampType) {
return (left, right) ->
((TimestampLtz) left).toEpochMicros() > ((TimestampLtz) right).toEpochMicros();
return (left, right) -> ((TimestampLtz) left).compareTo((TimestampLtz) right) > 0;
}
throw new UnsupportedOperationException("Unsupported data type: " + dataType);
}

interface ValueComparator {
boolean isGreaterThan(Object left, Object right);
boolean isGreaterOrEqualThan(Object left, Object right);
}
}
Loading

0 comments on commit 108155f

Please sign in to comment.