diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java
index ed2bfb943..923ccff5f 100644
--- a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java
+++ b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java
@@ -72,6 +72,8 @@
 import static com.alibaba.fluss.record.TestData.DATA1_TABLE_INFO_PK;
 import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH;
 import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH_PK;
+import static com.alibaba.fluss.record.TestData.DATA3_SCHEMA_PK;
+import static com.alibaba.fluss.record.TestData.DATA3_TABLE_PATH_PK;
 import static com.alibaba.fluss.testutils.DataTestUtils.assertRowValueEquals;
 import static com.alibaba.fluss.testutils.DataTestUtils.compactedRow;
 import static com.alibaba.fluss.testutils.DataTestUtils.keyRow;
@@ -884,7 +886,7 @@ void testFirstRowMergeEngine() throws Exception {
         TableDescriptor tableDescriptor =
                 TableDescriptor.builder()
                         .schema(DATA1_SCHEMA_PK)
-                        .property(ConfigOptions.TABLE_MERGE_ENGINE, MergeEngine.FIRST_ROW)
+                        .property(ConfigOptions.TABLE_MERGE_ENGINE, MergeEngine.Type.FIRST_ROW)
                         .build();
         RowType rowType = DATA1_SCHEMA_PK.toRowType();
         createTable(DATA1_TABLE_PATH_PK, tableDescriptor, false);
@@ -901,7 +903,6 @@ void testFirstRowMergeEngine() throws Exception {
             expectedRows.add(compactedRow(rowType, new Object[] {id, "value_0"}));
         }
         upsertWriter.flush();
-
         // now, get rows by lookup
         for (int id = 0; id < rows; id++) {
             InternalRow gotRow =
@@ -910,6 +911,55 @@ void testFirstRowMergeEngine() throws Exception {
                             .getRow();
             assertThatRow(gotRow).withSchema(rowType).isEqualTo(expectedRows.get(id));
         }
+            // check scan change log
+            LogScanner logScanner = table.getLogScanner(new LogScan());
+            logScanner.subscribeFromBeginning(0);
+            List<ScanRecord> actualLogRecords = new ArrayList<>(0);
+            while (actualLogRecords.size() < rows) {
+                ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
+                scanRecords.forEach(actualLogRecords::add);
+            }
+            assertThat(actualLogRecords).hasSize(rows);
+            for (int i = 0; i < actualLogRecords.size(); i++) {
+                ScanRecord scanRecord = actualLogRecords.get(i);
+                assertThat(scanRecord.getRowKind()).isEqualTo(RowKind.INSERT);
+                assertThatRow(scanRecord.getRow())
+                        .withSchema(rowType)
+                        .isEqualTo(expectedRows.get(i));
+            }
+        }
+    }
+
+    @Test
+    void testMergeEngineWithVersion() throws Exception {
+        // create the table
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder()
+                        .schema(DATA3_SCHEMA_PK)
+                        .property(ConfigOptions.TABLE_MERGE_ENGINE, MergeEngine.Type.VERSION)
+                        .property(ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN, "b")
+                        .build();
+        RowType rowType = DATA3_SCHEMA_PK.toRowType();
+        createTable(DATA3_TABLE_PATH_PK, tableDescriptor, false);
+
+        int rows = 3;
+        try (Table table = conn.getTable(DATA3_TABLE_PATH_PK)) {
+            // put rows.
+            UpsertWriter upsertWriter = table.getUpsertWriter();
+            List<InternalRow> expectedRows = new ArrayList<>(rows);
+            // init rows.
+            for (int row = 0; row < rows; row++) {
+                upsertWriter.upsert(compactedRow(rowType, new Object[] {row, 1000L}));
+                expectedRows.add(compactedRow(rowType, new Object[] {row, 1000L}));
+            }
+            // upsert id=0 with version 999 < 1000: the row must not be updated
+            upsertWriter.upsert(compactedRow(rowType, new Object[] {0, 999L}));
+
+            // upsert id=1 with version 1001 > 1000: the row must be updated,
+            // which produces one UPDATE_BEFORE and one UPDATE_AFTER record
+            upsertWriter.upsert(compactedRow(rowType, new Object[] {1, 1001L}));
+            rows = rows + 2;
+
+            upsertWriter.flush();
             // check scan change log
             LogScanner logScanner = table.getLogScanner(new LogScan());
@@ -922,13 +972,29 @@ void testFirstRowMergeEngine() throws Exception {
             }
             assertThat(actualLogRecords).hasSize(rows);
-            for (int i = 0; i < actualLogRecords.size(); i++) {
+            for (int i = 0; i < 3; i++) {
                 ScanRecord scanRecord = actualLogRecords.get(i);
                 assertThat(scanRecord.getRowKind()).isEqualTo(RowKind.INSERT);
                 assertThatRow(scanRecord.getRow())
                         .withSchema(rowType)
                         .isEqualTo(expectedRows.get(i));
             }
+
+            // update_before for id = 1
+            List<ScanRecord> updateActualLogRecords = new ArrayList<>(actualLogRecords);
+
+            ScanRecord beforeRecord = updateActualLogRecords.get(3);
+            assertThat(beforeRecord.getRowKind()).isEqualTo(RowKind.UPDATE_BEFORE);
+            assertThat(beforeRecord.getRow().getFieldCount()).isEqualTo(2);
+            assertThat(beforeRecord.getRow().getInt(0)).isEqualTo(1);
+            assertThat(beforeRecord.getRow().getLong(1)).isEqualTo(1000);
+
+            // update_after for id = 1
+            ScanRecord afterRecord = updateActualLogRecords.get(4);
+            assertThat(afterRecord.getRowKind()).isEqualTo(RowKind.UPDATE_AFTER);
+            assertThat(afterRecord.getRow().getFieldCount()).isEqualTo(2);
+            assertThat(afterRecord.getRow().getInt(0)).isEqualTo(1);
+            assertThat(afterRecord.getRow().getLong(1)).isEqualTo(1001);
         }
     }
 }
diff --git a/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java
index 888049d2d..dc1b8a9da 100644
--- a/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java
+++ b/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java
@@ -969,12 +969,18 @@ public class ConfigOptions {
                                     + "When this option is set to ture and the datalake tiering service is up,"
                                     + " the table will be tiered and compacted into datalake format stored on lakehouse storage.");
 
-    public static final ConfigOption<MergeEngine> TABLE_MERGE_ENGINE =
+    public static final ConfigOption<MergeEngine.Type> TABLE_MERGE_ENGINE =
             key("table.merge-engine")
-                    .enumType(MergeEngine.class)
+                    .enumType(MergeEngine.Type.class)
                     .noDefaultValue()
                     .withDescription("The merge engine for the primary key table.");
 
+    public static final ConfigOption<String> TABLE_MERGE_ENGINE_VERSION_COLUMN =
+            key("table.merge-engine.version.column")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The merge engine version column for the primary key table.");
+
     // ------------------------------------------------------------------------
     // ConfigOptions for Kv
     // ------------------------------------------------------------------------
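Note: the two options above are all a client needs in order to declare a version-merged table; validation of the column itself happens later in MergeEngine.create. A minimal sketch of the client-side declaration, mirroring the usage in FlussTableITCase (the schema and names below are illustrative, not part of this patch):

    import com.alibaba.fluss.config.ConfigOptions;
    import com.alibaba.fluss.metadata.MergeEngine;
    import com.alibaba.fluss.metadata.Schema;
    import com.alibaba.fluss.metadata.TableDescriptor;
    import com.alibaba.fluss.types.DataTypes;

    public class VersionTableExample {
        public static TableDescriptor versionTable() {
            Schema schema =
                    Schema.newBuilder()
                            .column("id", DataTypes.INT())
                            .column("version", DataTypes.BIGINT())
                            .primaryKey("id")
                            .build();
            // 'table.merge-engine' selects the engine; 'table.merge-engine.version.column'
            // names the column whose value decides whether an upsert wins.
            return TableDescriptor.builder()
                    .schema(schema)
                    .property(ConfigOptions.TABLE_MERGE_ENGINE, MergeEngine.Type.VERSION)
                    .property(ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN, "version")
                    .build();
        }
    }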
diff --git a/fluss-common/src/main/java/com/alibaba/fluss/metadata/MergeEngine.java b/fluss-common/src/main/java/com/alibaba/fluss/metadata/MergeEngine.java
index fe1cfdb15..963f83907 100644
--- a/fluss-common/src/main/java/com/alibaba/fluss/metadata/MergeEngine.java
+++ b/fluss-common/src/main/java/com/alibaba/fluss/metadata/MergeEngine.java
@@ -1,13 +1,11 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Copyright (c) 2024 Alibaba Group Holding Ltd.
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,22 +16,125 @@
 package com.alibaba.fluss.metadata;
 
-/**
- * The merge engine for primary key table.
- *
- * @since 0.6
- */
-public enum MergeEngine {
-    FIRST_ROW("first_row");
+import com.alibaba.fluss.config.ConfigOptions;
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.shaded.guava32.com.google.common.collect.Sets;
+import com.alibaba.fluss.types.BigIntType;
+import com.alibaba.fluss.types.DataType;
+import com.alibaba.fluss.types.IntType;
+import com.alibaba.fluss.types.LocalZonedTimestampType;
+import com.alibaba.fluss.types.RowType;
+import com.alibaba.fluss.types.TimeType;
+import com.alibaba.fluss.types.TimestampType;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
 
-    private final String value;
+/** The merge engine for primary key table. */
+public class MergeEngine {
+
+    public static final Set<String> VERSION_SUPPORTED_DATA_TYPES =
+            Sets.newHashSet(
+                    BigIntType.class.getName(),
+                    IntType.class.getName(),
+                    TimestampType.class.getName(),
+                    TimeType.class.getName(),
+                    LocalZonedTimestampType.class.getName());
+
+    private final Type type;
+    private final String column;
+
+    private MergeEngine(Type type) {
+        this(type, null);
+    }
+
+    private MergeEngine(Type type, String column) {
+        this.type = type;
+        this.column = column;
+    }
+
+    public static MergeEngine create(Map<String, String> properties) {
+        return create(properties, null);
+    }
+
+    public static MergeEngine create(Map<String, String> properties, RowType rowType) {
+        return create(Configuration.fromMap(properties), rowType);
+    }
 
-    MergeEngine(String value) {
-        this.value = value;
+    public static MergeEngine create(Configuration options, RowType rowType) {
+        if (options == null) {
+            return null;
+        }
+        MergeEngine.Type type = options.get(ConfigOptions.TABLE_MERGE_ENGINE);
+        if (type == null) {
+            return null;
+        }
+
+        switch (type) {
+            case FIRST_ROW:
+                return new MergeEngine(Type.FIRST_ROW);
+            case VERSION:
+                String column = options.get(ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN);
+                if (column == null) {
+                    throw new IllegalArgumentException(
+                            "When the merge engine is set to version, the 'table.merge-engine.version.column' cannot be empty.");
+                }
+                if (rowType != null) {
+                    int fieldIndex = rowType.getFieldIndex(column);
+                    if (fieldIndex == -1) {
+                        throw new IllegalArgumentException(
+                                String.format(
+                                        "When the merge engine is set to version, the column %s does not exist.",
+                                        column));
+                    }
+                    DataType dataType = rowType.getTypeAt(fieldIndex);
+                    if (!VERSION_SUPPORTED_DATA_TYPES.contains(dataType.getClass().getName())) {
+                        throw new IllegalArgumentException(
+                                String.format(
+                                        "The version merge engine does not support the column type %s.",
+                                        dataType.asSummaryString()));
+                    }
+                }
+                return new MergeEngine(Type.VERSION, column);
+            default:
+                throw new UnsupportedOperationException("Unsupported merge engine: " + type);
+        }
+    }
+
+    public Type getType() {
+        return type;
+    }
+
+    public String getColumn() {
+        return column;
+    }
+
+    public enum Type {
+        FIRST_ROW("first_row"),
+        VERSION("version");
+
+        private final String value;
+
+        Type(String value) {
+            this.value = value;
+        }
+
+        @Override
+        public String toString() {
+            return value;
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        MergeEngine that = (MergeEngine) o;
+        return type == that.type && Objects.equals(column, that.column);
     }
 
     @Override
-    public String toString() {
-        return value;
+    public int hashCode() {
+        return Objects.hash(type, column);
     }
 }
diff --git a/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java b/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java
index 9a346ce7a..028bbe243 100644
--- a/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java
+++ b/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java
@@ -281,7 +281,7 @@ public boolean isDataLakeEnabled() {
     }
 
     public @Nullable MergeEngine getMergeEngine() {
-        return configuration().get(ConfigOptions.TABLE_MERGE_ENGINE);
+        return MergeEngine.create(configuration(), schema.toRowType());
     }
 
     public TableDescriptor copy(Map<String, String> newProperties) {
diff --git a/fluss-common/src/test/java/com/alibaba/fluss/record/TestData.java b/fluss-common/src/test/java/com/alibaba/fluss/record/TestData.java
index f9af6cee4..64ae19fed 100644
--- a/fluss-common/src/test/java/com/alibaba/fluss/record/TestData.java
+++ b/fluss-common/src/test/java/com/alibaba/fluss/record/TestData.java
@@ -187,4 +187,18 @@ public final class TestData {
                     TableDescriptor.builder().schema(DATA2_SCHEMA).distributedBy(3, "a").build(),
                     1);
     // -------------------------------- data2 info end ------------------------------------
+
+    // ------------------------------ data3 and related table info begin ------------------
+    public static final Schema DATA3_SCHEMA_PK =
+            Schema.newBuilder()
+                    .column("a", DataTypes.INT())
+                    .withComment("a is first column")
+                    .column("b", DataTypes.BIGINT())
+                    .withComment("b is second column")
+                    .primaryKey("a")
+                    .build();
+    public static final TablePath DATA3_TABLE_PATH_PK =
+            TablePath.of("test_db_3", "test_pk_table_3");
+    // ------------------------------ data3 table info end --------------------------------
+
 }
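Note: MergeEngine.create(...) is now the single place where the option pair is parsed and validated. A small sketch of the behavior it enforces, using only the API introduced above (the property literals and column names are illustrative):

    import com.alibaba.fluss.metadata.MergeEngine;
    import com.alibaba.fluss.metadata.Schema;
    import com.alibaba.fluss.types.DataTypes;
    import com.alibaba.fluss.types.RowType;

    import java.util.HashMap;
    import java.util.Map;

    public class MergeEngineCreateExample {
        public static void main(String[] args) {
            Map<String, String> props = new HashMap<>();
            props.put("table.merge-engine", "version");
            props.put("table.merge-engine.version.column", "ts");

            RowType rowType =
                    Schema.newBuilder()
                            .column("id", DataTypes.INT())
                            .column("ts", DataTypes.BIGINT())
                            .primaryKey("id")
                            .build()
                            .toRowType();

            // Parsing succeeds: the version column exists and BIGINT is a supported type.
            MergeEngine engine = MergeEngine.create(props, rowType);
            System.out.println(engine.getType() + " on column " + engine.getColumn());

            // Misconfiguration is rejected eagerly: without the version column
            // property, create(...) throws IllegalArgumentException.
            props.remove("table.merge-engine.version.column");
            try {
                MergeEngine.create(props, rowType);
            } catch (IllegalArgumentException e) {
                System.out.println("rejected: " + e.getMessage());
            }
        }
    }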
diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkTableFactory.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkTableFactory.java
index 7f08a4765..729667d7c 100644
--- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkTableFactory.java
+++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkTableFactory.java
@@ -23,6 +23,8 @@
 import com.alibaba.fluss.connector.flink.sink.FlinkTableSink;
 import com.alibaba.fluss.connector.flink.source.FlinkTableSource;
 import com.alibaba.fluss.connector.flink.utils.FlinkConnectorOptionsUtils;
+import com.alibaba.fluss.connector.flink.utils.FlinkConversions;
+import com.alibaba.fluss.metadata.MergeEngine;
 import com.alibaba.fluss.metadata.TablePath;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
@@ -129,7 +131,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
                 cache,
                 partitionDiscoveryIntervalMs,
                 tableOptions.get(toFlinkOption(ConfigOptions.TABLE_DATALAKE_ENABLED)),
-                tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)));
+                MergeEngine.create(helper.getOptions().toMap()));
     }
 
     @Override
@@ -150,7 +152,8 @@ public DynamicTableSink createDynamicTableSink(Context context) {
                 rowType,
                 context.getPrimaryKeyIndexes(),
                 isStreamingMode,
-                tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)));
+                MergeEngine.create(
+                        helper.getOptions().toMap(), FlinkConversions.toFlussRowType(rowType)));
     }
 
     @Override
diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSink.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSink.java
index d9a9b96db..5cec7caf4 100644
--- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSink.java
+++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSink.java
@@ -75,7 +75,7 @@ public FlinkTableSink(
             RowType tableRowType,
             int[] primaryKeyIndexes,
             boolean streaming,
-            @Nullable MergeEngine mergeEngine) {
+            MergeEngine mergeEngine) {
         this.tablePath = tablePath;
         this.flussConfig = flussConfig;
         this.tableRowType = tableRowType;
@@ -116,19 +116,22 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
                 // is 0, when no column specified, it's not partial update
                 // see FLINK-36000
                 && context.getTargetColumns().get().length != 0) {
-
             // is partial update, check whether partial update is supported or not
             if (context.getTargetColumns().get().length != tableRowType.getFieldCount()) {
                 if (primaryKeyIndexes.length == 0) {
                     throw new ValidationException(
                             "Fluss table sink does not support partial updates for table without primary key. Please make sure the "
                                     + "number of specified columns in INSERT INTO matches columns of the Fluss table.");
-                } else if (mergeEngine == MergeEngine.FIRST_ROW) {
-                    throw new ValidationException(
-                            String.format(
-                                    "Table %s uses the '%s' merge engine which does not support partial updates. Please make sure the "
-                                            + "number of specified columns in INSERT INTO matches columns of the Fluss table.",
-                                    tablePath, MergeEngine.FIRST_ROW));
+                }
+                if (mergeEngine != null) {
+                    if (mergeEngine.getType() == MergeEngine.Type.FIRST_ROW
+                            || mergeEngine.getType() == MergeEngine.Type.VERSION) {
+                        throw new ValidationException(
+                                String.format(
+                                        "Table %s uses the '%s' merge engine which does not support partial updates. Please make sure the "
+                                                + "number of specified columns in INSERT INTO matches columns of the Fluss table.",
+                                        tablePath, mergeEngine.getType()));
+                    }
                 }
             }
             int[][] targetColumns = context.getTargetColumns().get();
@@ -298,12 +301,14 @@ private void validateUpdatableAndDeletable() {
                             "Table %s is a Log Table. Log Table doesn't support DELETE and UPDATE statements.",
                             tablePath));
         }
-
-        if (mergeEngine == MergeEngine.FIRST_ROW) {
-            throw new UnsupportedOperationException(
-                    String.format(
-                            "Table %s uses the '%s' merge engine which does not support DELETE or UPDATE statements.",
-                            tablePath, MergeEngine.FIRST_ROW));
+        if (mergeEngine != null) {
+            if (mergeEngine.getType() == MergeEngine.Type.FIRST_ROW
+                    || mergeEngine.getType() == MergeEngine.Type.VERSION) {
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "Table %s uses the '%s' merge engine which does not support DELETE or UPDATE statements.",
+                                tablePath, mergeEngine.getType()));
+            }
         }
     }
diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/FlinkTableSource.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/FlinkTableSource.java
index 77484e15f..930d05e55 100644
--- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/FlinkTableSource.java
+++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/FlinkTableSource.java
@@ -164,7 +164,7 @@ public ChangelogMode getChangelogMode() {
         } else {
             if (hasPrimaryKey()) {
                 // pk table
-                if (mergeEngine == MergeEngine.FIRST_ROW) {
+                if (mergeEngine != null && mergeEngine.getType() == MergeEngine.Type.FIRST_ROW) {
                     return ChangelogMode.insertOnly();
                 } else {
                     return ChangelogMode.all();
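Note: the source-side change keeps first_row insert-only, while the new version engine falls through to the full changelog, because a version-based upsert can replace an existing row and therefore emits -U/+U pairs. Condensed view of that contract (illustrative standalone class; the method shape follows FlinkTableSource#getChangelogMode):

    import com.alibaba.fluss.metadata.MergeEngine;
    import org.apache.flink.table.connector.ChangelogMode;

    public class MergeEngineChangelog {
        public static ChangelogMode forPrimaryKeyTable(MergeEngine mergeEngine) {
            if (mergeEngine != null && mergeEngine.getType() == MergeEngine.Type.FIRST_ROW) {
                // first_row never updates an existing key, so the log is append-only.
                return ChangelogMode.insertOnly();
            }
            // The version engine can replace an existing row, which surfaces as
            // -U/+U pairs in the changelog, so it needs the full mode.
            return ChangelogMode.all();
        }
    }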
+ + " Please make sure the number of specified columns in INSERT INTO matches columns of the Fluss table.", + tablePath); + } + + @Test + void testVersionMergeEngineWithTypeBigint() throws Exception { + tEnv.executeSql( + "create table merge_engine_with_version (a int not null primary key not enforced," + + " b string, ts bigint) with('table.merge-engine' = 'version','table.merge-engine.version.column' = 'ts')"); + tEnv.executeSql( + "create table log_sink (a int not null primary key not enforced, b string, ts bigint)"); + + JobClient insertJobClient = + tEnv.executeSql("insert into log_sink select * from merge_engine_with_version") + .getJobClient() + .get(); + + // insert once + tEnv.executeSql( + "insert into merge_engine_with_version (a, b, ts) VALUES (1, 'v1', 1000), (2, 'v2', 1000), (1, 'v11', 999), (3, 'v3', 1000)") + .await(); + + CloseableIterator rowIter = tEnv.executeSql("select * from log_sink").collect(); + + // id=1 not update + List expectedRows = + Arrays.asList("+I[1, v1, 1000]", "+I[2, v2, 1000]", "+I[3, v3, 1000]"); + + assertResultsIgnoreOrder(rowIter, expectedRows, false); + + // insert again, update id=3 + tEnv.executeSql( + "insert into merge_engine_with_version (a, b, ts) VALUES (3, 'v33', 1001), (4, 'v44', 1000)") + .await(); + expectedRows = Arrays.asList("-U[3, v3, 1000]", "+U[3, v33, 1001]", "+I[4, v44, 1000]"); + assertResultsIgnoreOrder(rowIter, expectedRows, false); + + // test update a=4 + tBatchEnv + .executeSql("UPDATE merge_engine_with_version SET b = 'v45', ts=1001 WHERE a = 4") + .await(); + expectedRows = Arrays.asList("-U[4, v44, 1000]", "+U[4, v45, 1001]"); + assertResultsIgnoreOrder(rowIter, expectedRows, false); + + // test delete a=4 + tBatchEnv.executeSql("delete from merge_engine_with_version WHERE a = 4").await(); + expectedRows = Arrays.asList("-D[4, v45, 1001]"); + assertResultsIgnoreOrder(rowIter, expectedRows, false); + + insertJobClient.cancel().get(); + } + + @Test + void testVersionMergeEngineWithTypeTimestamp() throws Exception { + tEnv.executeSql( + "create table merge_engine_with_version (a int not null primary key not enforced," + + " b string, ts TIMESTAMP(3)) with('table.merge-engine' = 'version','table.merge-engine.version.column' = 'ts')"); + + // insert once + tEnv.executeSql( + "INSERT INTO merge_engine_with_version (a, b, ts) VALUES " + + "(1, 'v1', TIMESTAMP '2024-12-27 12:00:00.123'), " + + "(2, 'v2', TIMESTAMP '2024-12-27 12:00:00.123'), " + + "(1, 'v11', TIMESTAMP '2024-12-27 11:59:59.123'), " + + "(3, 'v3', TIMESTAMP '2024-12-27 12:00:00.123');") + .await(); + + CloseableIterator rowIter = + tEnv.executeSql("select * from merge_engine_with_version").collect(); + + // id=1 not update + List expectedRows = + Arrays.asList( + "+I[1, v1, 2024-12-27T12:00:00.123]", + "+I[2, v2, 2024-12-27T12:00:00.123]", + "+I[3, v3, 2024-12-27T12:00:00.123]"); + + assertResultsIgnoreOrder(rowIter, expectedRows, true); + } + private InsertAndExpectValues rowsToInsertInto(Collection partitions) { List insertValues = new ArrayList<>(); List expectedValues = new ArrayList<>(); diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java index abab7d7a8..d1070eaaf 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java @@ -32,13 +32,15 @@ import com.alibaba.fluss.record.KvRecord; import com.alibaba.fluss.record.KvRecordBatch; import 
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java
index abab7d7a8..d1070eaaf 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java
@@ -32,13 +32,15 @@
 import com.alibaba.fluss.record.KvRecord;
 import com.alibaba.fluss.record.KvRecordBatch;
 import com.alibaba.fluss.record.KvRecordReadContext;
-import com.alibaba.fluss.record.RowKind;
 import com.alibaba.fluss.row.BinaryRow;
 import com.alibaba.fluss.row.InternalRow;
 import com.alibaba.fluss.row.arrow.ArrowWriterPool;
 import com.alibaba.fluss.row.arrow.ArrowWriterProvider;
 import com.alibaba.fluss.row.encode.ValueDecoder;
-import com.alibaba.fluss.row.encode.ValueEncoder;
+import com.alibaba.fluss.server.kv.mergeengine.DefaultRowMergeEngine;
+import com.alibaba.fluss.server.kv.mergeengine.FirstRowMergeEngine;
+import com.alibaba.fluss.server.kv.mergeengine.RowMergeEngine;
+import com.alibaba.fluss.server.kv.mergeengine.VersionRowMergeEngine;
 import com.alibaba.fluss.server.kv.partialupdate.PartialUpdater;
 import com.alibaba.fluss.server.kv.partialupdate.PartialUpdaterCache;
 import com.alibaba.fluss.server.kv.prewrite.KvPreWriteBuffer;
@@ -57,7 +59,6 @@
 import com.alibaba.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator;
 import com.alibaba.fluss.types.DataType;
 import com.alibaba.fluss.types.RowType;
-import com.alibaba.fluss.utils.BytesUtils;
 import com.alibaba.fluss.utils.FileUtils;
 import com.alibaba.fluss.utils.FlussPaths;
 import com.alibaba.fluss.utils.types.Tuple2;
@@ -260,8 +261,6 @@ public LogAppendInfo putAsLeader(
                     long logEndOffsetOfPrevBatch = logTablet.localLogEndOffset();
                     DataType[] fieldTypes = rowType.getChildren().toArray(new DataType[0]);
                     try {
-                        long logOffset = logEndOffsetOfPrevBatch;
-
                         // TODO: reuse the read context and decoder
                         KvRecordBatch.ReadContext readContext =
                                 KvRecordReadContext.createReadContext(kvFormat, fieldTypes);
@@ -269,78 +268,21 @@ public LogAppendInfo putAsLeader(
                                 new ValueDecoder(readContext.getRowDecoder(schemaId));
 
                         int appendedRecordCount = 0;
+
+                        RowMergeEngine rowMergeEngine =
+                                getRowMergeEngine(
+                                        partialUpdater,
+                                        valueDecoder,
+                                        walBuilder,
+                                        kvPreWriteBuffer,
+                                        schema,
+                                        schemaId,
+                                        appendedRecordCount,
+                                        logEndOffsetOfPrevBatch);
                         for (KvRecord kvRecord : kvRecords.records(readContext)) {
-                            byte[] keyBytes = BytesUtils.toArray(kvRecord.getKey());
-                            KvPreWriteBuffer.Key key = KvPreWriteBuffer.Key.of(keyBytes);
-                            if (kvRecord.getRow() == null) {
-                                // it's for deletion
-                                byte[] oldValue = getFromBufferOrKv(key);
-                                if (oldValue == null) {
-                                    // there might be large amount of such deletion, so we don't log
-                                    LOG.debug(
-                                            "The specific key can't be found in kv tablet although the kv record is for deletion, "
-                                                    + "ignore it directly as it doesn't exist in the kv tablet yet.");
-                                } else {
-                                    if (mergeEngine == MergeEngine.FIRST_ROW) {
-                                        // if the merge engine is first row, skip the deletion
-                                        continue;
-                                    }
-                                    BinaryRow oldRow = valueDecoder.decodeValue(oldValue).row;
-                                    BinaryRow newRow = deleteRow(oldRow, partialUpdater);
-                                    // if newRow is null, it means the row should be deleted
-                                    if (newRow == null) {
-                                        walBuilder.append(RowKind.DELETE, oldRow);
-                                        appendedRecordCount += 1;
-                                        kvPreWriteBuffer.delete(key, logOffset++);
-                                    } else {
-                                        // otherwise, it's a partial update, should produce -U,+U
-                                        walBuilder.append(RowKind.UPDATE_BEFORE, oldRow);
-                                        walBuilder.append(RowKind.UPDATE_AFTER, newRow);
-                                        appendedRecordCount += 2;
-                                        kvPreWriteBuffer.put(
-                                                key,
-                                                ValueEncoder.encodeValue(schemaId, newRow),
-                                                logOffset + 1);
-                                        logOffset += 2;
-                                    }
-                                }
-                            } else {
-                                // upsert operation
-                                byte[] oldValue = getFromBufferOrKv(key);
-                                // it's update
-                                if (oldValue != null) {
-                                    if (mergeEngine == MergeEngine.FIRST_ROW) {
-                                        // if the merge engine is first row, skip the update
-                                        continue;
-                                    }
-                                    BinaryRow oldRow = valueDecoder.decodeValue(oldValue).row;
-                                    BinaryRow newRow =
-                                            updateRow(oldRow, kvRecord.getRow(), partialUpdater);
-                                    walBuilder.append(RowKind.UPDATE_BEFORE, oldRow);
-                                    walBuilder.append(RowKind.UPDATE_AFTER, newRow);
-                                    appendedRecordCount += 2;
-                                    // logOffset is for -U, logOffset + 1 is for +U, we need to use
-                                    // the log offset for +U
-                                    kvPreWriteBuffer.put(
-                                            key,
-                                            ValueEncoder.encodeValue(schemaId, newRow),
-                                            logOffset + 1);
-                                    logOffset += 2;
-                                } else {
-                                    // it's insert
-                                    // TODO: we should add guarantees that all non-specified columns
-                                    // of the input row are set to null.
-                                    BinaryRow newRow = kvRecord.getRow();
-                                    walBuilder.append(RowKind.INSERT, newRow);
-                                    appendedRecordCount += 1;
-                                    kvPreWriteBuffer.put(
-                                            key,
-                                            ValueEncoder.encodeValue(schemaId, newRow),
-                                            logOffset++);
-                                }
-                            }
+                            rowMergeEngine.writeRecord(kvRecord);
                         }
-
+                        appendedRecordCount = rowMergeEngine.getAppendedRecordCount();
                         // if appendedRecordCount is 0, it means there is no record to append, we
                         // should not append.
                         if (appendedRecordCount > 0) {
@@ -372,6 +314,54 @@ public LogAppendInfo putAsLeader(
                 });
     }
 
+    private RowMergeEngine getRowMergeEngine(
+            PartialUpdater partialUpdater,
+            ValueDecoder valueDecoder,
+            WalBuilder walBuilder,
+            KvPreWriteBuffer kvPreWriteBuffer,
+            Schema schema,
+            short schemaId,
+            int appendedRecordCount,
+            long logOffset) {
+        if (mergeEngine != null) {
+            switch (mergeEngine.getType()) {
+                case VERSION:
+                    return new VersionRowMergeEngine(
+                            partialUpdater,
+                            valueDecoder,
+                            walBuilder,
+                            kvPreWriteBuffer,
+                            rocksDBKv,
+                            schema,
+                            schemaId,
+                            appendedRecordCount,
+                            logOffset,
+                            mergeEngine);
+                case FIRST_ROW:
+                    return new FirstRowMergeEngine(
+                            partialUpdater,
+                            valueDecoder,
+                            walBuilder,
+                            kvPreWriteBuffer,
+                            rocksDBKv,
+                            schema,
+                            schemaId,
+                            appendedRecordCount,
+                            logOffset);
+            }
+        }
+        return new DefaultRowMergeEngine(
+                partialUpdater,
+                valueDecoder,
+                walBuilder,
+                kvPreWriteBuffer,
+                rocksDBKv,
+                schema,
+                schemaId,
+                appendedRecordCount,
+                logOffset);
+    }
+
     private WalBuilder createWalBuilder(int schemaId, RowType rowType) throws Exception {
         switch (logFormat) {
             case INDEXED:
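Note: putAsLeader no longer branches per record; it selects a RowMergeEngine once per batch and delegates every KvRecord to it. The control flow that all three engines share is the template method pattern, roughly (an illustrative skeleton, not the actual classes):

    // Condensed control flow of the per-record dispatch. The final writeRecord
    // decides between delete/update/insert; subclasses override only the hooks.
    abstract class MergePolicy {
        final void writeRecord(byte[] key, byte[] newValue, byte[] oldValue) throws Exception {
            if (newValue == null) {
                if (oldValue != null) {
                    deleteOrUpdate(key, oldValue); // first_row overrides this to a no-op
                }
            } else if (oldValue != null) {
                update(key, oldValue, newValue); // version engine adds a version check here
            } else {
                insert(key, newValue); // inserts behave the same for every engine
            }
        }

        protected abstract void deleteOrUpdate(byte[] key, byte[] oldValue) throws Exception;

        protected abstract void update(byte[] key, byte[] oldValue, byte[] newValue)
                throws Exception;

        protected abstract void insert(byte[] key, byte[] newValue) throws Exception;
    }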
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/DefaultRowMergeEngine.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/DefaultRowMergeEngine.java
new file mode 100644
index 000000000..c944268c6
--- /dev/null
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/DefaultRowMergeEngine.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2024 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.server.kv.mergeengine;
+
+import com.alibaba.fluss.metadata.Schema;
+import com.alibaba.fluss.row.encode.ValueDecoder;
+import com.alibaba.fluss.server.kv.partialupdate.PartialUpdater;
+import com.alibaba.fluss.server.kv.prewrite.KvPreWriteBuffer;
+import com.alibaba.fluss.server.kv.rocksdb.RocksDBKv;
+import com.alibaba.fluss.server.kv.wal.WalBuilder;
+
+/** The row merge behavior for tables without a configured merge engine. */
+public class DefaultRowMergeEngine extends RowMergeEngine {
+
+    public DefaultRowMergeEngine(
+            PartialUpdater partialUpdater,
+            ValueDecoder valueDecoder,
+            WalBuilder walBuilder,
+            KvPreWriteBuffer kvPreWriteBuffer,
+            RocksDBKv rocksDBKv,
+            Schema schema,
+            short schemaId,
+            int appendedRecordCount,
+            long logOffset) {
+        super(
+                partialUpdater,
+                valueDecoder,
+                walBuilder,
+                kvPreWriteBuffer,
+                rocksDBKv,
+                schema,
+                schemaId,
+                appendedRecordCount,
+                logOffset);
+    }
+}
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/FirstRowMergeEngine.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/FirstRowMergeEngine.java
new file mode 100644
index 000000000..fceb3b380
--- /dev/null
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/FirstRowMergeEngine.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2024 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.server.kv.mergeengine;
+
+import com.alibaba.fluss.metadata.Schema;
+import com.alibaba.fluss.row.BinaryRow;
+import com.alibaba.fluss.row.encode.ValueDecoder;
+import com.alibaba.fluss.server.kv.partialupdate.PartialUpdater;
+import com.alibaba.fluss.server.kv.prewrite.KvPreWriteBuffer;
+import com.alibaba.fluss.server.kv.rocksdb.RocksDBKv;
+import com.alibaba.fluss.server.kv.wal.WalBuilder;
+
+/** The row merge behavior for the first_row merge engine. */
+public class FirstRowMergeEngine extends RowMergeEngine {
+
+    public FirstRowMergeEngine(
+            PartialUpdater partialUpdater,
+            ValueDecoder valueDecoder,
+            WalBuilder walBuilder,
+            KvPreWriteBuffer kvPreWriteBuffer,
+            RocksDBKv rocksDBKv,
+            Schema schema,
+            short schemaId,
+            int appendedRecordCount,
+            long logOffset) {
+        super(
+                partialUpdater,
+                valueDecoder,
+                walBuilder,
+                kvPreWriteBuffer,
+                rocksDBKv,
+                schema,
+                schemaId,
+                appendedRecordCount,
+                logOffset);
+    }
+
+    @Override
+    protected void deleteOrUpdate(KvPreWriteBuffer.Key key, byte[] oldValue) throws Exception {
+        // When the merge engine is "first_row", we don't need to do any update or delete
+        // operations.
+    }
+
+    @Override
+    protected void update(KvPreWriteBuffer.Key key, BinaryRow oldRow, BinaryRow newRow)
+            throws Exception {
+        // When the merge engine is "first_row", we don't need to do any update operations.
+    }
+}
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/RowMergeEngine.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/RowMergeEngine.java
new file mode 100644
index 000000000..85f04f0ac
--- /dev/null
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/RowMergeEngine.java
@@ -0,0 +1,175 @@
+/*
+ * Copyright (c) 2024 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.server.kv.mergeengine;
+
+import com.alibaba.fluss.metadata.Schema;
+import com.alibaba.fluss.record.KvRecord;
+import com.alibaba.fluss.record.RowKind;
+import com.alibaba.fluss.row.BinaryRow;
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.row.encode.ValueDecoder;
+import com.alibaba.fluss.row.encode.ValueEncoder;
+import com.alibaba.fluss.server.kv.partialupdate.PartialUpdater;
+import com.alibaba.fluss.server.kv.prewrite.KvPreWriteBuffer;
+import com.alibaba.fluss.server.kv.rocksdb.RocksDBKv;
+import com.alibaba.fluss.server.kv.wal.WalBuilder;
+import com.alibaba.fluss.utils.BytesUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/** Abstract common operations for merge engines. */
+public abstract class RowMergeEngine {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RowMergeEngine.class);
+
+    protected final PartialUpdater partialUpdater;
+    protected final ValueDecoder valueDecoder;
+    protected final WalBuilder walBuilder;
+    protected final KvPreWriteBuffer kvPreWriteBuffer;
+    protected final RocksDBKv rocksDBKv;
+    protected final Schema schema;
+    protected final short schemaId;
+    protected int appendedRecordCount;
+    protected long logOffset;
+
+    public RowMergeEngine(
+            PartialUpdater partialUpdater,
+            ValueDecoder valueDecoder,
+            WalBuilder walBuilder,
+            KvPreWriteBuffer kvPreWriteBuffer,
+            RocksDBKv rocksDBKv,
+            Schema schema,
+            short schemaId,
+            int appendedRecordCount,
+            long logOffset) {
+        this.partialUpdater = partialUpdater;
+        this.valueDecoder = valueDecoder;
+        this.walBuilder = walBuilder;
+        this.kvPreWriteBuffer = kvPreWriteBuffer;
+        this.rocksDBKv = rocksDBKv;
+        this.schema = schema;
+        this.schemaId = schemaId;
+        this.appendedRecordCount = appendedRecordCount;
+        this.logOffset = logOffset;
+    }
+
+    public int getAppendedRecordCount() {
+        return appendedRecordCount;
+    }
+
+    public void writeRecord(KvRecord kvRecord) throws Exception {
+        byte[] keyBytes = BytesUtils.toArray(kvRecord.getKey());
+        KvPreWriteBuffer.Key key = KvPreWriteBuffer.Key.of(keyBytes);
+        byte[] oldValue = getFromBufferOrKv(key);
+        if (kvRecord.getRow() == null) {
+            // it's for deletion
+            if (oldValue == null) {
+                // there might be a large amount of such deletions, so we don't log
+                LOG.debug(
+                        "The specific key can't be found in kv tablet although the kv record is for deletion, "
+                                + "ignore it directly as it doesn't exist in the kv tablet yet.");
+            } else {
+                deleteOrUpdate(key, oldValue);
+            }
+        } else {
+            // upsert operation
+            upsert(key, kvRecord, oldValue);
+        }
+    }
+
+    private void upsert(KvPreWriteBuffer.Key key, KvRecord kvRecord, byte[] oldValue)
+            throws Exception {
+        // it's update
+        if (oldValue != null) {
+            BinaryRow oldRow = valueDecoder.decodeValue(oldValue).row;
+            BinaryRow newRow = updateRow(oldRow, kvRecord.getRow(), partialUpdater);
+            update(key, oldRow, newRow);
+        } else {
+            // it's insert
+            // TODO: we should add guarantees that all non-specified columns
+            // of the input row are set to null.
+            insert(key, kvRecord);
+        }
+    }
+
+    protected void deleteOrUpdate(KvPreWriteBuffer.Key key, byte[] oldValue) throws Exception {
+        BinaryRow oldRow = valueDecoder.decodeValue(oldValue).row;
+        BinaryRow newRow = deleteRow(oldRow, partialUpdater);
+        // if newRow is null, it means the row should be deleted
+        if (newRow == null) {
+            delete(key, oldRow);
+        } else {
+            // otherwise, it's a partial update, should produce -U,+U
+            update(key, oldRow, newRow);
+        }
+    }
+
+    protected void delete(KvPreWriteBuffer.Key key, BinaryRow oldRow) throws Exception {
+        walBuilder.append(RowKind.DELETE, oldRow);
+        appendedRecordCount += 1;
+        kvPreWriteBuffer.delete(key, logOffset++);
+    }
+
+    protected void insert(KvPreWriteBuffer.Key key, KvRecord kvRecord) throws Exception {
+        BinaryRow newRow = kvRecord.getRow();
+        walBuilder.append(RowKind.INSERT, newRow);
+        appendedRecordCount += 1;
+        kvPreWriteBuffer.put(key, ValueEncoder.encodeValue(schemaId, newRow), logOffset++);
+    }
+
+    protected void update(KvPreWriteBuffer.Key key, BinaryRow oldRow, BinaryRow newRow)
+            throws Exception {
+        walBuilder.append(RowKind.UPDATE_BEFORE, oldRow);
+        walBuilder.append(RowKind.UPDATE_AFTER, newRow);
+        appendedRecordCount += 2;
+        // logOffset is for -U, logOffset + 1 is for +U, we need to use
+        // the log offset for +U
+        kvPreWriteBuffer.put(key, ValueEncoder.encodeValue(schemaId, newRow), logOffset + 1);
+        logOffset += 2;
+    }
+
+    // get from kv pre-write buffer first, if can't find, get from rocksdb
+    protected byte[] getFromBufferOrKv(KvPreWriteBuffer.Key key) throws IOException {
+        KvPreWriteBuffer.Value value = kvPreWriteBuffer.get(key);
+        if (value == null) {
+            return rocksDBKv.get(key.get());
+        }
+        return value.get();
+    }
+
+    protected @Nullable BinaryRow deleteRow(
+            InternalRow oldRow, @Nullable PartialUpdater partialUpdater) {
+        if (partialUpdater == null) {
+            return null;
+        }
+        return partialUpdater.deleteRow(oldRow);
+    }
+
+    protected BinaryRow updateRow(
+            BinaryRow oldRow, BinaryRow updateRow, @Nullable PartialUpdater partialUpdater) {
+        // if it is not a partial update, return the update row
+        if (partialUpdater == null) {
+            return updateRow;
+        }
+        // otherwise, do the partial update
+        return partialUpdater.updateRow(oldRow, updateRow);
+    }
+}
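Note: because RowMergeEngine exposes insert/update/delete hooks with default changelog semantics, a new engine only overrides what it changes. As a purely hypothetical illustration (not part of this patch), an engine that ignores deletions but keeps default update semantics would look like:

    package com.alibaba.fluss.server.kv.mergeengine;

    import com.alibaba.fluss.metadata.Schema;
    import com.alibaba.fluss.row.encode.ValueDecoder;
    import com.alibaba.fluss.server.kv.partialupdate.PartialUpdater;
    import com.alibaba.fluss.server.kv.prewrite.KvPreWriteBuffer;
    import com.alibaba.fluss.server.kv.rocksdb.RocksDBKv;
    import com.alibaba.fluss.server.kv.wal.WalBuilder;

    /** Hypothetical engine that drops deletions but applies updates normally. */
    public class IgnoreDeleteRowMergeEngine extends RowMergeEngine {

        public IgnoreDeleteRowMergeEngine(
                PartialUpdater partialUpdater,
                ValueDecoder valueDecoder,
                WalBuilder walBuilder,
                KvPreWriteBuffer kvPreWriteBuffer,
                RocksDBKv rocksDBKv,
                Schema schema,
                short schemaId,
                int appendedRecordCount,
                long logOffset) {
            super(
                    partialUpdater,
                    valueDecoder,
                    walBuilder,
                    kvPreWriteBuffer,
                    rocksDBKv,
                    schema,
                    schemaId,
                    appendedRecordCount,
                    logOffset);
        }

        @Override
        protected void deleteOrUpdate(KvPreWriteBuffer.Key key, byte[] oldValue) {
            // Deletions are silently ignored; updates still go through the
            // default update(...) path inherited from RowMergeEngine.
        }
    }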
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/VersionRowMergeEngine.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/VersionRowMergeEngine.java
new file mode 100644
index 000000000..13b89cd99
--- /dev/null
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/VersionRowMergeEngine.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright (c) 2024 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.server.kv.mergeengine;
+
+import com.alibaba.fluss.exception.FlussRuntimeException;
+import com.alibaba.fluss.metadata.MergeEngine;
+import com.alibaba.fluss.metadata.Schema;
+import com.alibaba.fluss.row.BinaryRow;
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.row.TimestampLtz;
+import com.alibaba.fluss.row.TimestampNtz;
+import com.alibaba.fluss.row.encode.ValueDecoder;
+import com.alibaba.fluss.server.kv.partialupdate.PartialUpdater;
+import com.alibaba.fluss.server.kv.prewrite.KvPreWriteBuffer;
+import com.alibaba.fluss.server.kv.rocksdb.RocksDBKv;
+import com.alibaba.fluss.server.kv.wal.WalBuilder;
+import com.alibaba.fluss.types.BigIntType;
+import com.alibaba.fluss.types.DataType;
+import com.alibaba.fluss.types.IntType;
+import com.alibaba.fluss.types.LocalZonedTimestampType;
+import com.alibaba.fluss.types.RowType;
+import com.alibaba.fluss.types.TimeType;
+import com.alibaba.fluss.types.TimestampType;
+
+/** The row merge behavior for the version merge engine. */
+public class VersionRowMergeEngine extends RowMergeEngine {
+
+    private final MergeEngine mergeEngine;
+    private final Schema schema;
+    private final InternalRow.FieldGetter[] currentFieldGetters;
+
+    public VersionRowMergeEngine(
+            PartialUpdater partialUpdater,
+            ValueDecoder valueDecoder,
+            WalBuilder walBuilder,
+            KvPreWriteBuffer kvPreWriteBuffer,
+            RocksDBKv rocksDBKv,
+            Schema schema,
+            short schemaId,
+            int appendedRecordCount,
+            long logOffset,
+            MergeEngine mergeEngine) {
+        super(
+                partialUpdater,
+                valueDecoder,
+                walBuilder,
+                kvPreWriteBuffer,
+                rocksDBKv,
+                schema,
+                schemaId,
+                appendedRecordCount,
+                logOffset);
+        this.mergeEngine = mergeEngine;
+        this.schema = schema;
+        RowType currentRowType = schema.toRowType();
+        this.currentFieldGetters = new InternalRow.FieldGetter[currentRowType.getFieldCount()];
+        for (int i = 0; i < currentRowType.getFieldCount(); i++) {
+            currentFieldGetters[i] = InternalRow.createFieldGetter(currentRowType.getTypeAt(i), i);
+        }
+    }
+
+    @Override
+    protected void update(KvPreWriteBuffer.Key key, BinaryRow oldRow, BinaryRow newRow)
+            throws Exception {
+        RowType rowType = schema.toRowType();
+        if (checkVersionMergeEngine(rowType, oldRow, newRow)) {
+            return;
+        }
+        super.update(key, oldRow, newRow);
+    }
+
+    private boolean checkVersionMergeEngine(RowType rowType, BinaryRow oldRow, BinaryRow newRow) {
+        if (!checkNewRowVersion(mergeEngine, rowType, oldRow, newRow)) {
+            // when the version of the new record is not greater than the
+            // version of the old record, skip the update
+            return true;
+        }
+        return false;
+    }
+
+    // Check whether the version of the new row is greater than the version of the old row.
+    private boolean checkNewRowVersion(
+            MergeEngine mergeEngine, RowType rowType, BinaryRow oldRow, BinaryRow newRow) {
+        int fieldIndex = rowType.getFieldIndex(mergeEngine.getColumn());
+        if (fieldIndex == -1) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "When the merge engine is set to version, the column %s does not exist.",
+                            mergeEngine.getColumn()));
+        }
+        InternalRow.FieldGetter fieldGetter = currentFieldGetters[fieldIndex];
+        Object oldValue = fieldGetter.getFieldOrNull(oldRow);
+        if (oldValue == null) {
+            throw new RuntimeException(
+                    String.format(
+                            "When the merge engine is set to version, the old value of column %s cannot be null.",
+                            mergeEngine.getColumn()));
+        }
+        Object newValue = fieldGetter.getFieldOrNull(newRow);
+        if (newValue == null) {
+            throw new RuntimeException(
+                    String.format(
+                            "When the merge engine is set to version, the new value of column %s cannot be null.",
+                            mergeEngine.getColumn()));
+        }
+
+        DataType dataType = rowType.getTypeAt(fieldIndex);
+
+        if (dataType instanceof BigIntType) {
+            return (Long) newValue > (Long) oldValue;
+        } else if (dataType instanceof IntType) {
+            return (Integer) newValue > (Integer) oldValue;
+        } else if (dataType instanceof TimestampType || dataType instanceof TimeType) {
+            return ((TimestampNtz) newValue).getMillisecond()
+                    > ((TimestampNtz) oldValue).getMillisecond();
+        } else if (dataType instanceof LocalZonedTimestampType) {
+            return ((TimestampLtz) newValue).toEpochMicros()
+                    > ((TimestampLtz) oldValue).toEpochMicros();
+        } else {
+            throw new FlussRuntimeException(
+                    "Unsupported data type: " + dataType.asSummaryString());
+        }
+    }
+}
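Note: the version check boils down to a strict "greater than" on the version column, with the representation depending on the column type. Reduced to its essence (illustrative helper; the real code resolves values through InternalRow.FieldGetter and dispatches on the column's DataType):

    import com.alibaba.fluss.row.TimestampLtz;
    import com.alibaba.fluss.row.TimestampNtz;

    public class VersionComparison {
        /** An upsert wins only if its version value is strictly greater. */
        public static boolean isNewer(Object newValue, Object oldValue) {
            if (newValue instanceof Long) { // BIGINT
                return (Long) newValue > (Long) oldValue;
            } else if (newValue instanceof Integer) { // INT
                return (Integer) newValue > (Integer) oldValue;
            } else if (newValue instanceof TimestampNtz) { // TIMESTAMP / TIME
                return ((TimestampNtz) newValue).getMillisecond()
                        > ((TimestampNtz) oldValue).getMillisecond();
            } else if (newValue instanceof TimestampLtz) { // TIMESTAMP_LTZ
                return ((TimestampLtz) newValue).toEpochMicros()
                        > ((TimestampLtz) oldValue).toEpochMicros();
            }
            throw new IllegalArgumentException("Unsupported version type");
        }

        public static void main(String[] args) {
            System.out.println(isNewer(1001L, 1000L)); // true: the upsert applies
            System.out.println(isNewer(999L, 1000L)); // false: the upsert is dropped
        }
    }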
diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java
index 89674aaa3..bdcde2a26 100644
--- a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java
+++ b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java
@@ -72,6 +72,7 @@
 import static com.alibaba.fluss.record.LogRecordBatch.NO_WRITER_ID;
 import static com.alibaba.fluss.record.TestData.DATA1_SCHEMA_PK;
 import static com.alibaba.fluss.record.TestData.DATA2_SCHEMA;
+import static com.alibaba.fluss.record.TestData.DATA3_SCHEMA_PK;
 import static com.alibaba.fluss.record.TestData.DEFAULT_SCHEMA_ID;
 import static com.alibaba.fluss.testutils.DataTestUtils.compactedRow;
 import static com.alibaba.fluss.testutils.DataTestUtils.createBasicMemoryLogRecords;
@@ -558,8 +559,11 @@ void testFirstRowMergeEngine(@TempDir File tempLogDir, @TempDir File tmpKvDir)
         LogTablet logTablet = createLogTablet(tempLogDir, 1L, tablePath);
         TableBucket tableBucket = logTablet.getTableBucket();
 
+        Map<String, String> config = new HashMap<>();
+        config.put("table.merge-engine", "first_row");
         KvTablet kvTablet =
-                createKvTablet(tablePath, tableBucket, logTablet, tmpKvDir, MergeEngine.FIRST_ROW);
+                createKvTablet(
+                        tablePath, tableBucket, logTablet, tmpKvDir, MergeEngine.create(config));
 
         List<KvRecord> kvData1 =
                 Arrays.asList(
@@ -568,7 +572,6 @@ void testFirstRowMergeEngine(@TempDir File tempLogDir, @TempDir File tmpKvDir)
                         kvRecordFactory.ofRecord("k2".getBytes(), new Object[] {2, "v23"}));
         KvRecordBatch kvRecordBatch1 = kvRecordBatchFactory.ofRecords(kvData1);
         kvTablet.putAsLeader(kvRecordBatch1, null, DATA1_SCHEMA_PK);
-
         long endOffset = logTablet.localLogEndOffset();
         LogRecords actualLogRecords = readLogRecords(logTablet);
         List<MemoryLogRecords> expectedLogs =
@@ -600,6 +603,80 @@
         checkEqual(actualLogRecords, expectedLogs);
     }
 
+    @Test
+    void testVersionRowMergeEngine(@TempDir File tempLogDir, @TempDir File tmpKvDir)
+            throws Exception {
+
+        KvRecordTestUtils.KvRecordFactory kvRecordFactory =
+                KvRecordTestUtils.KvRecordFactory.of(DATA3_SCHEMA_PK.toRowType());
+
+        PhysicalTablePath tablePath =
+                PhysicalTablePath.of(TablePath.of("testDb", "test_version_row"));
+
+        LogTablet logTablet = createLogTablet(tempLogDir, 1L, tablePath);
+        TableBucket tableBucket = logTablet.getTableBucket();
+
+        Map<String, String> config = new HashMap<>();
+        config.put("table.merge-engine", "version");
+        config.put("table.merge-engine.version.column", "b");
+        MergeEngine mergeEngine = MergeEngine.create(config, DATA3_SCHEMA_PK.toRowType());
+        KvTablet kvTablet =
+                createKvTablet(tablePath, tableBucket, logTablet, tmpKvDir, mergeEngine);
+
+        List<KvRecord> kvData1 =
+                Arrays.asList(
+                        kvRecordFactory.ofRecord("k1".getBytes(), new Object[] {1, 1000L}),
+                        kvRecordFactory.ofRecord("k2".getBytes(), new Object[] {2, 1000L}),
+                        kvRecordFactory.ofRecord("k2".getBytes(), new Object[] {2, 1001L}));
+        KvRecordBatch kvRecordBatch1 = kvRecordBatchFactory.ofRecords(kvData1);
+        kvTablet.putAsLeader(kvRecordBatch1, null, DATA3_SCHEMA_PK);
+
+        long endOffset = logTablet.localLogEndOffset();
+        LogRecords actualLogRecords = readLogRecords(logTablet);
+        List<MemoryLogRecords> expectedLogs =
+                Collections.singletonList(
+                        logRecords(
+                                DATA3_SCHEMA_PK.toRowType(),
+                                0,
+                                Arrays.asList(
+                                        RowKind.INSERT,
+                                        RowKind.INSERT,
+                                        RowKind.UPDATE_BEFORE,
+                                        RowKind.UPDATE_AFTER),
+                                Arrays.asList(
+                                        new Object[] {1, 1000L},
+                                        new Object[] {2, 1000L},
+                                        new Object[] {2, 1000L},
+                                        new Object[] {2, 1001L})));
+        checkEqual(actualLogRecords, expectedLogs, DATA3_SCHEMA_PK.toRowType());
+
+        List<KvRecord> kvData2 =
+                Arrays.asList(
+                        kvRecordFactory.ofRecord(
+                                "k2".getBytes(), new Object[] {2, 1000L}), // not updated
+                        kvRecordFactory.ofRecord(
+                                "k1".getBytes(), new Object[] {1, 1001L}), // -U, +U
+                        kvRecordFactory.ofRecord("k3".getBytes(), new Object[] {3, 1000L})); // +I
+        KvRecordBatch kvRecordBatch2 = kvRecordBatchFactory.ofRecords(kvData2);
+        kvTablet.putAsLeader(kvRecordBatch2, null, DATA3_SCHEMA_PK);
+
+        expectedLogs =
+                Collections.singletonList(
+                        logRecords(
+                                DATA3_SCHEMA_PK.toRowType(),
+                                endOffset,
+                                Arrays.asList(
+                                        RowKind.UPDATE_BEFORE,
+                                        RowKind.UPDATE_AFTER,
+                                        RowKind.INSERT),
+                                Arrays.asList(
+                                        new Object[] {1, 1000L},
+                                        new Object[] {1, 1001L},
+                                        new Object[] {3, 1000L})));
+        actualLogRecords = readLogRecords(logTablet, endOffset);
+        checkEqual(actualLogRecords, expectedLogs, DATA3_SCHEMA_PK.toRowType());
+    }
+
     private LogRecords readLogRecords() throws Exception {
         return readLogRecords(0L);
     }