From ad7909b0e490926f03083627db7a6765adccf1ac Mon Sep 17 00:00:00 2001 From: Xiaojian Sun <“sunxiaojian926@163.com”> Date: Thu, 26 Dec 2024 02:00:14 +0800 Subject: [PATCH 1/2] Introduce version merge engine for primary key table --- .../fluss/client/table/FlussTableITCase.java | 83 ++++++++++-- .../alibaba/fluss/config/ConfigOptions.java | 10 +- .../alibaba/fluss/metadata/MergeEngine.java | 101 ++++++++++++--- .../fluss/metadata/TableDescriptor.java | 2 +- .../com/alibaba/fluss/record/TestData.java | 14 +++ .../flink/catalog/FlinkTableFactory.java | 5 +- .../connector/flink/sink/FlinkTableSink.java | 31 +++-- .../flink/source/FlinkTableSource.java | 2 +- .../flink/sink/FlinkTableSinkITCase.java | 105 ++++++++++++++++ .../coordinator/CoordinatorService.java | 29 +++++ .../com/alibaba/fluss/server/kv/KvTablet.java | 34 +++-- .../DeduplicateRowMergeEngine.java | 26 ++++ .../kv/mergeengine/FirstRowMergeEngine.java | 36 ++++++ .../server/kv/mergeengine/RowMergeEngine.java | 31 +++++ .../kv/mergeengine/VersionRowMergeEngine.java | 118 ++++++++++++++++++ .../alibaba/fluss/server/kv/KvTabletTest.java | 81 +++++++++++- 16 files changed, 654 insertions(+), 54 deletions(-) create mode 100644 fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/DeduplicateRowMergeEngine.java create mode 100644 fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/FirstRowMergeEngine.java create mode 100644 fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/RowMergeEngine.java create mode 100644 fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/VersionRowMergeEngine.java diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java index e4cd9ca9c..363b57c46 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java +++ 
b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java @@ -73,6 +73,8 @@ import static com.alibaba.fluss.record.TestData.DATA1_TABLE_INFO_PK; import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH; import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH_PK; +import static com.alibaba.fluss.record.TestData.DATA3_SCHEMA_PK; +import static com.alibaba.fluss.record.TestData.DATA3_TABLE_PATH_PK; import static com.alibaba.fluss.testutils.DataTestUtils.assertRowValueEquals; import static com.alibaba.fluss.testutils.DataTestUtils.compactedRow; import static com.alibaba.fluss.testutils.DataTestUtils.keyRow; @@ -885,7 +887,7 @@ void testFirstRowMergeEngine() throws Exception { TableDescriptor tableDescriptor = TableDescriptor.builder() .schema(DATA1_SCHEMA_PK) - .property(ConfigOptions.TABLE_MERGE_ENGINE, MergeEngine.FIRST_ROW) + .property(ConfigOptions.TABLE_MERGE_ENGINE, MergeEngine.Type.FIRST_ROW) .build(); RowType rowType = DATA1_SCHEMA_PK.toRowType(); createTable(DATA1_TABLE_PATH_PK, tableDescriptor, false); @@ -902,7 +904,6 @@ void testFirstRowMergeEngine() throws Exception { expectedRows.add(compactedRow(rowType, new Object[] {id, "value_0"})); } upsertWriter.flush(); - // now, get rows by lookup for (int id = 0; id < rows; id++) { InternalRow gotRow = @@ -911,17 +912,13 @@ void testFirstRowMergeEngine() throws Exception { .getRow(); assertThatRow(gotRow).withSchema(rowType).isEqualTo(expectedRows.get(id)); } - - // check scan change log LogScanner logScanner = table.getLogScanner(new LogScan()); logScanner.subscribeFromBeginning(0); - List actualLogRecords = new ArrayList<>(0); while (actualLogRecords.size() < rows) { ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1)); scanRecords.forEach(actualLogRecords::add); } - assertThat(actualLogRecords).hasSize(rows); for (int i = 0; i < actualLogRecords.size(); i++) { ScanRecord scanRecord = actualLogRecords.get(i); @@ -933,6 +930,7 @@ void testFirstRowMergeEngine() 
throws Exception { } } + @ParameterizedTest @CsvSource({"none,3", "lz4_frame,3", "zstd,3", "zstd,9"}) void testArrowCompressionAndProject(String compression, String level) throws Exception { @@ -953,12 +951,12 @@ void testArrowCompressionAndProject(String compression, String level) throws Exc createTable(tablePath, tableDescriptor, false); try (Connection conn = ConnectionFactory.createConnection(clientConf); - Table table = conn.getTable(tablePath)) { + Table table = conn.getTable(tablePath)) { AppendWriter appendWriter = table.getAppendWriter(); int expectedSize = 30; for (int i = 0; i < expectedSize; i++) { String value = i % 2 == 0 ? "hello, friend " + i : null; - InternalRow row = row(schema.toRowType(), new Object[] {i, 100, value, i * 10L}); + InternalRow row = row(schema.toRowType(), new Object[]{i, 100, value, i * 10L}); appendWriter.append(row); if (i % 10 == 0) { // insert 3 bathes, each batch has 10 rows @@ -991,7 +989,7 @@ void testArrowCompressionAndProject(String compression, String level) throws Exc logScanner.close(); // fetch data with project. - logScanner = createLogScanner(table, new int[] {0, 2}); + logScanner = createLogScanner(table, new int[]{0, 2}); subscribeFromBeginning(logScanner, table); count = 0; while (count < expectedSize) { @@ -1014,4 +1012,71 @@ void testArrowCompressionAndProject(String compression, String level) throws Exc logScanner.close(); } } + + @Test + void testMergeEngineWithVersion() throws Exception { + // Create table. + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(DATA3_SCHEMA_PK) + .property(ConfigOptions.TABLE_MERGE_ENGINE, MergeEngine.Type.VERSION) + .property(ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN, "b") + .build(); + RowType rowType = DATA3_SCHEMA_PK.toRowType(); + createTable(DATA3_TABLE_PATH_PK, tableDescriptor, false); + + int rows = 3; + try (Table table = conn.getTable(DATA3_TABLE_PATH_PK)) { + // put rows. 
+ UpsertWriter upsertWriter = table.getUpsertWriter(); + List expectedScanRecords = new ArrayList<>(rows); + // init rows. + for (int row = 0; row < rows; row++) { + upsertWriter.upsert(compactedRow(rowType, new Object[] {row, 1000L})); + expectedScanRecords.add( + new ScanRecord(compactedRow(rowType, new Object[] {row, 1000L}))); + } + // update row if id=0 and version < 1000L, will not update + upsertWriter.upsert(compactedRow(rowType, new Object[] {0, 999L})); + + // update if version> 1000L + upsertWriter.upsert(compactedRow(rowType, new Object[] {1, 1001L})); + // update_before record, don't care about offset/timestamp + expectedScanRecords.add( + new ScanRecord( + -1, + -1, + RowKind.UPDATE_BEFORE, + compactedRow(rowType, new Object[] {1, 1000L}))); + // update_after record + expectedScanRecords.add( + new ScanRecord( + -1, + -1, + RowKind.UPDATE_AFTER, + compactedRow(rowType, new Object[] {1, 1001L}))); + rows = rows + 2; + + upsertWriter.flush(); + + LogScanner logScanner = table.getLogScanner(new LogScan()); + logScanner.subscribeFromBeginning(0); + + List actualLogRecords = new ArrayList<>(rows); + while (actualLogRecords.size() < rows) { + ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1)); + scanRecords.forEach(actualLogRecords::add); + } + + assertThat(actualLogRecords).hasSize(rows); + for (int i = 0; i < rows; i++) { + ScanRecord actualScanRecord = actualLogRecords.get(i); + ScanRecord expectedRecord = expectedScanRecords.get(i); + assertThat(actualScanRecord.getRowKind()).isEqualTo(expectedRecord.getRowKind()); + assertThatRow(actualScanRecord.getRow()) + .withSchema(rowType) + .isEqualTo(expectedRecord.getRow()); + } + } + } } diff --git a/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java index 64de99626..f9dfcf8df 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java +++ 
b/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java @@ -1004,12 +1004,18 @@ public class ConfigOptions { + "When this option is set to ture and the datalake tiering service is up," + " the table will be tiered and compacted into datalake format stored on lakehouse storage."); - public static final ConfigOption TABLE_MERGE_ENGINE = + public static final ConfigOption TABLE_MERGE_ENGINE = key("table.merge-engine") - .enumType(MergeEngine.class) + .enumType(MergeEngine.Type.class) .noDefaultValue() .withDescription("The merge engine for the primary key table."); + public static final ConfigOption TABLE_MERGE_ENGINE_VERSION_COLUMN = + key("table.merge-engine.version.column") + .stringType() + .noDefaultValue() + .withDescription("The merge engine version column for the primary key table."); + // ------------------------------------------------------------------------ // ConfigOptions for Kv // ------------------------------------------------------------------------ diff --git a/fluss-common/src/main/java/com/alibaba/fluss/metadata/MergeEngine.java b/fluss-common/src/main/java/com/alibaba/fluss/metadata/MergeEngine.java index fe1cfdb15..18231b6ae 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/metadata/MergeEngine.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/metadata/MergeEngine.java @@ -1,13 +1,11 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Copyright (c) 2024 Alibaba Group Holding Ltd. 
* - * http://www.apache.org/licenses/LICENSE-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,22 +16,95 @@ package com.alibaba.fluss.metadata; +import com.alibaba.fluss.config.ConfigOptions; +import com.alibaba.fluss.config.Configuration; + +import javax.annotation.Nullable; + +import java.util.Map; +import java.util.Objects; + /** * The merge engine for primary key table. * * @since 0.6 */ -public enum MergeEngine { - FIRST_ROW("first_row"); +public class MergeEngine { + + private final Type type; + + /** When merge engine type is version, column cannot be null. */ + @Nullable private final String column; + + private MergeEngine(Type type) { + this(type, null); + } + + private MergeEngine(Type type, String column) { + this.type = type; + this.column = column; + } + + public static MergeEngine create(Map properties) { + return create(Configuration.fromMap(properties)); + } + + private static MergeEngine create(Configuration options) { + MergeEngine.Type type = options.get(ConfigOptions.TABLE_MERGE_ENGINE); + if (type == null) { + return null; + } + switch (type) { + case FIRST_ROW: + return new MergeEngine(Type.FIRST_ROW); + case VERSION: + String column = options.get(ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN); + if (column == null) { + throw new IllegalArgumentException( + String.format( + "When the merge engine is set to version, the option '%s' must be set.", + ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN.key())); + } + return new MergeEngine(Type.VERSION, column); + default: + throw new UnsupportedOperationException("Unsupported merge engine: " + type); + } + } - private final String value; + public Type getType() { + return 
type; + } + + public String getColumn() { + return column; + } - MergeEngine(String value) { - this.value = value; + public enum Type { + FIRST_ROW("first_row"), + VERSION("version"); + private final String value; + + Type(String value) { + this.value = value; + } + + @Override + public String toString() { + return value; + } + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + MergeEngine that = (MergeEngine) o; + return type == that.type && Objects.equals(column, that.column); } @Override - public String toString() { - return value; + public int hashCode() { + return Objects.hash(type, column); } } diff --git a/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java b/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java index 161d89013..df5e2e349 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java @@ -301,7 +301,7 @@ public boolean isDataLakeEnabled() { } public @Nullable MergeEngine getMergeEngine() { - return configuration().get(ConfigOptions.TABLE_MERGE_ENGINE); + return MergeEngine.create(properties); } /** Gets the Arrow compression type and compression level of the table. 
*/ diff --git a/fluss-common/src/test/java/com/alibaba/fluss/record/TestData.java b/fluss-common/src/test/java/com/alibaba/fluss/record/TestData.java index e96ae69fd..a5d391671 100644 --- a/fluss-common/src/test/java/com/alibaba/fluss/record/TestData.java +++ b/fluss-common/src/test/java/com/alibaba/fluss/record/TestData.java @@ -207,4 +207,18 @@ public final class TestData { System.currentTimeMillis(), System.currentTimeMillis()); // -------------------------------- data2 info end ------------------------------------ + + // ------------------- data3 and related table info begin ---------------------- + public static final Schema DATA3_SCHEMA_PK = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .withComment("a is first column") + .column("b", DataTypes.BIGINT()) + .withComment("b is second column") + .primaryKey("a") + .build(); + public static final TablePath DATA3_TABLE_PATH_PK = + TablePath.of("test_db_3", "test_pk_table_3"); + // ---------------------------- data3 table info end ------------------------------ + } diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkTableFactory.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkTableFactory.java index 4f02323d7..fa79a7954 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkTableFactory.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkTableFactory.java @@ -23,6 +23,7 @@ import com.alibaba.fluss.connector.flink.sink.FlinkTableSink; import com.alibaba.fluss.connector.flink.source.FlinkTableSource; import com.alibaba.fluss.connector.flink.utils.FlinkConnectorOptionsUtils; +import com.alibaba.fluss.metadata.MergeEngine; import com.alibaba.fluss.metadata.TablePath; import org.apache.flink.api.common.RuntimeExecutionMode; @@ -129,7 +130,7 @@ public DynamicTableSource 
createDynamicTableSource(Context context) { cache, partitionDiscoveryIntervalMs, tableOptions.get(toFlinkOption(ConfigOptions.TABLE_DATALAKE_ENABLED)), - tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE))); + MergeEngine.create(helper.getOptions().toMap())); } @Override @@ -150,7 +151,7 @@ public DynamicTableSink createDynamicTableSink(Context context) { rowType, context.getPrimaryKeyIndexes(), isStreamingMode, - tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)), + MergeEngine.create(helper.getOptions().toMap()), tableOptions.get(FlinkConnectorOptions.SINK_IGNORE_DELETE)); } diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSink.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSink.java index e83e70220..48511ad74 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSink.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSink.java @@ -123,19 +123,22 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { // is 0, when no column specified, it's not partial update // see FLINK-36000 && context.getTargetColumns().get().length != 0) { - // is partial update, check whether partial update is supported or not if (context.getTargetColumns().get().length != tableRowType.getFieldCount()) { if (primaryKeyIndexes.length == 0) { throw new ValidationException( "Fluss table sink does not support partial updates for table without primary key. Please make sure the " + "number of specified columns in INSERT INTO matches columns of the Fluss table."); - } else if (mergeEngine == MergeEngine.FIRST_ROW) { - throw new ValidationException( - String.format( - "Table %s uses the '%s' merge engine which does not support partial updates. 
Please make sure the " - + "number of specified columns in INSERT INTO matches columns of the Fluss table.", - tablePath, MergeEngine.FIRST_ROW)); + } + if (mergeEngine != null) { + if (mergeEngine.getType() == MergeEngine.Type.FIRST_ROW + || mergeEngine.getType() == MergeEngine.Type.VERSION) { + throw new ValidationException( + String.format( + "Table %s uses the '%s' merge engine which does not support partial updates. Please make sure the " + + "number of specified columns in INSERT INTO matches columns of the Fluss table.", + tablePath, mergeEngine.getType())); + } } } int[][] targetColumns = context.getTargetColumns().get(); @@ -311,12 +314,14 @@ private void validateUpdatableAndDeletable() { "Table %s is a Log Table. Log Table doesn't support DELETE and UPDATE statements.", tablePath)); } - - if (mergeEngine == MergeEngine.FIRST_ROW) { - throw new UnsupportedOperationException( - String.format( - "Table %s uses the '%s' merge engine which does not support DELETE or UPDATE statements.", - tablePath, MergeEngine.FIRST_ROW)); + if (mergeEngine != null) { + if (mergeEngine.getType() == MergeEngine.Type.FIRST_ROW + || mergeEngine.getType() == MergeEngine.Type.VERSION) { + throw new UnsupportedOperationException( + String.format( + "Table %s uses the '%s' merge engine which does not support DELETE or UPDATE statements.", + tablePath, mergeEngine.getType())); + } } } diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/FlinkTableSource.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/FlinkTableSource.java index 77484e15f..930d05e55 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/FlinkTableSource.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/FlinkTableSource.java @@ -164,7 +164,7 @@ public ChangelogMode getChangelogMode() { } else { if 
(hasPrimaryKey()) { // pk table - if (mergeEngine == MergeEngine.FIRST_ROW) { + if (mergeEngine != null && mergeEngine.getType() == MergeEngine.Type.FIRST_ROW) { return ChangelogMode.insertOnly(); } else { return ChangelogMode.all(); diff --git a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java index 86ae8b527..8db65538f 100644 --- a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java +++ b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java @@ -845,6 +845,111 @@ void testUnsupportedStmtOnFirstRowMergeEngine() { tablePath); } + @Test + void testUnsupportedStmtOnVersionRowMergeEngine() { + String t1 = "versionMergeEngineTable"; + TablePath tablePath = TablePath.of(DEFAULT_DB, t1); + tBatchEnv.executeSql( + String.format( + "create table %s (" + + " a int not null," + + " b bigint null, " + + " c string null, " + + " primary key (a) not enforced" + + ") with ('table.merge-engine' = 'version', 'table.merge-engine.version.column' = 'b')", + t1)); + assertThatThrownBy(() -> tBatchEnv.executeSql("DELETE FROM " + t1 + " WHERE a = 1").await()) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage( + "Table %s uses the 'version' merge engine which does not support DELETE or UPDATE statements.", + tablePath); + + assertThatThrownBy( + () -> + tBatchEnv + .executeSql("UPDATE " + t1 + " SET b = 4004 WHERE a = 1") + .await()) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage( + "Table %s uses the 'version' merge engine which does not support DELETE or UPDATE statements.", + tablePath); + + assertThatThrownBy( + () -> + tBatchEnv + .executeSql("INSERT INTO " + t1 + "(a, c) VALUES(1, 'c1')") + .await()) + 
.isInstanceOf(ValidationException.class) + .hasMessage( + "Table %s uses the 'version' merge engine which does not support partial updates." + + " Please make sure the number of specified columns in INSERT INTO matches columns of the Fluss table.", + tablePath); + } + + @Test + void testVersionMergeEngineWithTypeBigint() throws Exception { + tEnv.executeSql( + "create table merge_engine_with_version (a int not null primary key not enforced," + + " b string, ts bigint) with('table.merge-engine' = 'version','table.merge-engine.version.column' = 'ts')"); + tEnv.executeSql( + "create table log_sink (a int not null primary key not enforced, b string, ts bigint)"); + + JobClient insertJobClient = + tEnv.executeSql("insert into log_sink select * from merge_engine_with_version") + .getJobClient() + .get(); + + // insert once + tEnv.executeSql( + "insert into merge_engine_with_version (a, b, ts) VALUES (1, 'v1', 1000), (2, 'v2', 1000), (1, 'v11', 999), (3, 'v3', 1000)") + .await(); + + CloseableIterator rowIter = tEnv.executeSql("select * from log_sink").collect(); + + // id=1 not update + List expectedRows = + Arrays.asList("+I[1, v1, 1000]", "+I[2, v2, 1000]", "+I[3, v3, 1000]"); + + assertResultsIgnoreOrder(rowIter, expectedRows, false); + + // insert again, update id=3 + tEnv.executeSql( + "insert into merge_engine_with_version (a, b, ts) VALUES (3, 'v33', 1001), (4, 'v44', 1000)") + .await(); + expectedRows = Arrays.asList("-U[3, v3, 1000]", "+U[3, v33, 1001]", "+I[4, v44, 1000]"); + assertResultsIgnoreOrder(rowIter, expectedRows, false); + + insertJobClient.cancel().get(); + } + + @Test + void testVersionMergeEngineWithTypeTimestamp() throws Exception { + tEnv.executeSql( + "create table merge_engine_with_version (a int not null primary key not enforced," + + " b string, ts TIMESTAMP(3)) with('table.merge-engine' = 'version','table.merge-engine.version.column' = 'ts')"); + + // insert once + tEnv.executeSql( + "INSERT INTO merge_engine_with_version (a, b, ts) VALUES " 
+ + "(1, 'v1', TIMESTAMP '2024-12-27 12:00:00.123'), " + + "(2, 'v2', TIMESTAMP '2024-12-27 12:00:00.123'), " + + "(1, 'v11', TIMESTAMP '2024-12-27 11:59:59.123'), " + + "(3, 'v3', TIMESTAMP '2024-12-27 12:00:00.123');") + .await(); + + CloseableIterator rowIter = + tEnv.executeSql("select * from merge_engine_with_version").collect(); + + // id=1 not update + List expectedRows = + Arrays.asList( + "+I[1, v1, 2024-12-27T12:00:00.123]", + "+I[2, v2, 2024-12-27T12:00:00.123]", + "+I[3, v3, 2024-12-27T12:00:00.123]"); + + assertResultsIgnoreOrder(rowIter, expectedRows, true); + } + private InsertAndExpectValues rowsToInsertInto(Collection partitions) { List insertValues = new ArrayList<>(); List expectedValues = new ArrayList<>(); diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java index 414adc7ff..a7025b285 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java @@ -23,6 +23,7 @@ import com.alibaba.fluss.exception.InvalidTableException; import com.alibaba.fluss.fs.FileSystem; import com.alibaba.fluss.metadata.DatabaseDescriptor; +import com.alibaba.fluss.metadata.MergeEngine; import com.alibaba.fluss.metadata.Schema; import com.alibaba.fluss.metadata.TableDescriptor; import com.alibaba.fluss.metadata.TablePath; @@ -50,6 +51,7 @@ import com.alibaba.fluss.server.coordinator.event.CommitRemoteLogManifestEvent; import com.alibaba.fluss.server.coordinator.event.EventManager; import com.alibaba.fluss.server.entity.CommitKvSnapshotData; +import com.alibaba.fluss.server.kv.mergeengine.VersionRowMergeEngine; import com.alibaba.fluss.server.kv.snapshot.CompletedSnapshot; import com.alibaba.fluss.server.kv.snapshot.CompletedSnapshotJsonSerde; import com.alibaba.fluss.server.metadata.ServerMetadataCache; @@ 
-59,6 +61,7 @@ import com.alibaba.fluss.server.zk.data.TableAssignment; import com.alibaba.fluss.types.DataType; import com.alibaba.fluss.types.DataTypeRoot; +import com.alibaba.fluss.types.RowType; import com.alibaba.fluss.utils.AutoPartitionStrategy; import com.alibaba.fluss.utils.concurrent.FutureUtils; @@ -182,6 +185,11 @@ public CompletableFuture createTable(CreateTableRequest req sanityCheckPartitionedTable(tableDescriptor); } + MergeEngine mergeEngine = tableDescriptor.getMergeEngine(); + if (mergeEngine != null) { + checkMergeEngine(mergeEngine, tableDescriptor.getSchema()); + } + // then create table; metadataManager.createTable( tablePath, tableDescriptor, tableAssignment, request.isIgnoreIfExists()); @@ -189,6 +197,27 @@ public CompletableFuture createTable(CreateTableRequest req return CompletableFuture.completedFuture(response); } + private void checkMergeEngine(MergeEngine mergeEngine, Schema schema) { + if (mergeEngine.getType() == MergeEngine.Type.VERSION) { + String column = mergeEngine.getColumn(); + RowType rowType = schema.toRowType(); + int fieldIndex = rowType.getFieldIndex(column); + if (fieldIndex == -1) { + throw new IllegalArgumentException( + String.format( + "The version merge engine column %s does not exist.", column)); + } + DataType dataType = rowType.getTypeAt(fieldIndex); + if (!VersionRowMergeEngine.VERSION_MERGE_ENGINE_SUPPORTED_DATA_TYPES.contains( + dataType.getClass().getName())) { + throw new IllegalArgumentException( + String.format( + "The version merge engine column does not support type %s .", + dataType)); + } + } + } + private void sanityCheckPartitionedTable(TableDescriptor tableDescriptor) { AutoPartitionStrategy autoPartitionStrategy = tableDescriptor.getAutoPartitionStrategy(); if (!autoPartitionStrategy.isAutoPartitionEnabled()) { diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java index e01023049..1bc190f0c 100644 
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java @@ -39,6 +39,10 @@ import com.alibaba.fluss.row.arrow.ArrowWriterProvider; import com.alibaba.fluss.row.encode.ValueDecoder; import com.alibaba.fluss.row.encode.ValueEncoder; +import com.alibaba.fluss.server.kv.mergeengine.DeduplicateRowMergeEngine; +import com.alibaba.fluss.server.kv.mergeengine.FirstRowMergeEngine; +import com.alibaba.fluss.server.kv.mergeengine.RowMergeEngine; +import com.alibaba.fluss.server.kv.mergeengine.VersionRowMergeEngine; import com.alibaba.fluss.server.kv.partialupdate.PartialUpdater; import com.alibaba.fluss.server.kv.partialupdate.PartialUpdaterCache; import com.alibaba.fluss.server.kv.prewrite.KvPreWriteBuffer; @@ -274,11 +278,15 @@ public LogAppendInfo putAsLeader( KvRecordReadContext.createReadContext(kvFormat, fieldTypes); ValueDecoder valueDecoder = new ValueDecoder(readContext.getRowDecoder(schemaId)); - + RowMergeEngine rowMergeEngine = getRowMergeEngine(schema); for (KvRecord kvRecord : kvRecords.records(readContext)) { byte[] keyBytes = BytesUtils.toArray(kvRecord.getKey()); KvPreWriteBuffer.Key key = KvPreWriteBuffer.Key.of(keyBytes); if (kvRecord.getRow() == null) { + // skip delete records when the merge engine ignores deletes (e.g. first-row).
+ if (rowMergeEngine.ignoreDelete()) { + continue; + } // it's for deletion byte[] oldValue = getFromBufferOrKv(key); if (oldValue == null) { @@ -287,10 +295,6 @@ public LogAppendInfo putAsLeader( "The specific key can't be found in kv tablet although the kv record is for deletion, " + "ignore it directly as it doesn't exist in the kv tablet yet."); } else { - if (mergeEngine == MergeEngine.FIRST_ROW) { - // if the merge engine is first row, skip the deletion - continue; - } BinaryRow oldRow = valueDecoder.decodeValue(oldValue).row; BinaryRow newRow = deleteRow(oldRow, partialUpdater); // if newRow is null, it means the row should be deleted @@ -313,13 +317,13 @@ public LogAppendInfo putAsLeader( byte[] oldValue = getFromBufferOrKv(key); // it's update if (oldValue != null) { - if (mergeEngine == MergeEngine.FIRST_ROW) { - // if the merge engine is first row, skip the update - continue; - } BinaryRow oldRow = valueDecoder.decodeValue(oldValue).row; BinaryRow newRow = updateRow(oldRow, kvRecord.getRow(), partialUpdater); + newRow = rowMergeEngine.merge(oldRow, newRow); + if (newRow == null) { + continue; + } walBuilder.append(RowKind.UPDATE_BEFORE, oldRow); walBuilder.append(RowKind.UPDATE_AFTER, newRow); // logOffset is for -U, logOffset + 1 is for +U, we need to use @@ -370,6 +374,18 @@ public LogAppendInfo putAsLeader( }); } + private RowMergeEngine getRowMergeEngine(Schema schema) { + if (mergeEngine != null) { + switch (mergeEngine.getType()) { + case VERSION: + return new VersionRowMergeEngine(schema, mergeEngine); + case FIRST_ROW: + return new FirstRowMergeEngine(); + } + } + return new DeduplicateRowMergeEngine(); + } + private WalBuilder createWalBuilder(int schemaId, RowType rowType) throws Exception { switch (logFormat) { case INDEXED: diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/DeduplicateRowMergeEngine.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/DeduplicateRowMergeEngine.java new file 
mode 100644 index 000000000..b3abe51c4 --- /dev/null +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/DeduplicateRowMergeEngine.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.fluss.server.kv.mergeengine; + +import com.alibaba.fluss.row.BinaryRow; + +/** The default merge engine: merge returns the new row, so the last written record wins. */ +public class DeduplicateRowMergeEngine implements RowMergeEngine { + @Override + public BinaryRow merge(BinaryRow oldRow, BinaryRow newRow) { + return newRow; + } +} diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/FirstRowMergeEngine.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/FirstRowMergeEngine.java new file mode 100644 index 000000000..e834f9435 --- /dev/null +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/FirstRowMergeEngine.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.fluss.server.kv.mergeengine; + +import com.alibaba.fluss.row.BinaryRow; + +/** The first row merge engine for primary key table: merge returns null to keep the first row. */ +public class FirstRowMergeEngine implements RowMergeEngine { + @Override + public BinaryRow merge(BinaryRow oldRow, BinaryRow newRow) { + return null; + } + + @Override + public boolean ignoreDelete() { + return true; + } + + @Override + public boolean ignorePartialUpdate() { + return true; + } +} diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/RowMergeEngine.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/RowMergeEngine.java new file mode 100644 index 000000000..1ea0bda91 --- /dev/null +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/RowMergeEngine.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package com.alibaba.fluss.server.kv.mergeengine; + +import com.alibaba.fluss.row.BinaryRow; + +/** The row merge engine for primary key table. */ +public interface RowMergeEngine { + BinaryRow merge(BinaryRow oldRow, BinaryRow newRow); + + default boolean ignoreDelete() { + return false; + } + + default boolean ignorePartialUpdate() { + return false; + } +} diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/VersionRowMergeEngine.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/VersionRowMergeEngine.java new file mode 100644 index 000000000..54617cc81 --- /dev/null +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/VersionRowMergeEngine.java @@ -0,0 +1,118 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.fluss.server.kv.mergeengine; + +import com.alibaba.fluss.metadata.MergeEngine; +import com.alibaba.fluss.metadata.Schema; +import com.alibaba.fluss.row.BinaryRow; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.row.TimestampLtz; +import com.alibaba.fluss.row.TimestampNtz; +import com.alibaba.fluss.shaded.guava32.com.google.common.collect.ImmutableSet; +import com.alibaba.fluss.types.BigIntType; +import com.alibaba.fluss.types.DataType; +import com.alibaba.fluss.types.IntType; +import com.alibaba.fluss.types.LocalZonedTimestampType; +import com.alibaba.fluss.types.RowType; +import com.alibaba.fluss.types.TimeType; +import com.alibaba.fluss.types.TimestampType; + +import java.util.Set; + +/** + * The version row merge engine for primary key table. The update will only occur if the new value + * of the specified version field is greater than the old value. + */ +public class VersionRowMergeEngine implements RowMergeEngine { + + public static final Set VERSION_MERGE_ENGINE_SUPPORTED_DATA_TYPES = + ImmutableSet.of( + BigIntType.class.getName(), + IntType.class.getName(), + TimestampType.class.getName(), + TimeType.class.getName(), + LocalZonedTimestampType.class.getName()); + + private final int fieldIndex; + private final InternalRow.FieldGetter fieldGetter; + private final RowType rowType; + + public VersionRowMergeEngine(Schema schema, MergeEngine mergeEngine) { + this.rowType = schema.toRowType(); + InternalRow.FieldGetter[] currentFieldGetters = + new InternalRow.FieldGetter[rowType.getFieldCount()]; + for (int i = 0; i < rowType.getFieldCount(); i++) { + currentFieldGetters[i] = InternalRow.createFieldGetter(rowType.getTypeAt(i), i); + } + this.fieldIndex = rowType.getFieldIndex(mergeEngine.getColumn()); + if (fieldIndex == -1) { + throw new IllegalArgumentException( + String.format( + "The merge engine is set to version, but the version column %s does not exist.", + mergeEngine.getColumn())); + } + fieldGetter = 
currentFieldGetters[fieldIndex]; + } + + @Override + public BinaryRow merge(BinaryRow oldRow, BinaryRow newRow) { + Object oldValue = fieldGetter.getFieldOrNull(oldRow); + Object newValue = fieldGetter.getFieldOrNull(newRow); + // If the new value is empty, ignore it directly. + if (newValue == null) { + return null; + } + // If the old value is null, simply overwrite it with the new value. + if (oldValue == null) { + return newRow; + } + DataType dataType = rowType.getTypeAt(fieldIndex); + return getValueComparator(dataType).isGreaterThan(newValue, oldValue) ? newRow : null; + } + + @Override + public boolean ignoreDelete() { + return true; + } + + @Override + public boolean ignorePartialUpdate() { + return true; + } + + private ValueComparator getValueComparator(DataType dataType) { + if (dataType instanceof BigIntType) { + return (left, right) -> (Long) left > (Long) right; + } + if (dataType instanceof IntType || dataType instanceof TimeType) { + return (left, right) -> (Integer) left > (Integer) right; + } + if (dataType instanceof TimestampType) { + return (left, right) -> + ((TimestampNtz) left).getMillisecond() + > ((TimestampNtz) right).getMillisecond(); + } + if (dataType instanceof LocalZonedTimestampType) { + return (left, right) -> + ((TimestampLtz) left).toEpochMicros() > ((TimestampLtz) right).toEpochMicros(); + } + throw new UnsupportedOperationException("Unsupported data type: " + dataType); + } + + interface ValueComparator { + boolean isGreaterThan(Object left, Object right); + } +} diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java index 31fce67bd..4bfcd2b52 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java @@ -73,6 +73,7 @@ import static com.alibaba.fluss.record.LogRecordBatch.NO_WRITER_ID; import static 
com.alibaba.fluss.record.TestData.DATA1_SCHEMA_PK; import static com.alibaba.fluss.record.TestData.DATA2_SCHEMA; +import static com.alibaba.fluss.record.TestData.DATA3_SCHEMA_PK; import static com.alibaba.fluss.record.TestData.DEFAULT_SCHEMA_ID; import static com.alibaba.fluss.testutils.DataTestUtils.compactedRow; import static com.alibaba.fluss.testutils.DataTestUtils.createBasicMemoryLogRecords; @@ -560,8 +561,11 @@ void testFirstRowMergeEngine(@TempDir File tempLogDir, @TempDir File tmpKvDir) LogTablet logTablet = createLogTablet(tempLogDir, 1L, tablePath); TableBucket tableBucket = logTablet.getTableBucket(); + Map config = new HashMap<>(); + config.put("table.merge-engine", "first_row"); KvTablet kvTablet = - createKvTablet(tablePath, tableBucket, logTablet, tmpKvDir, MergeEngine.FIRST_ROW); + createKvTablet( + tablePath, tableBucket, logTablet, tmpKvDir, MergeEngine.create(config)); List kvData1 = Arrays.asList( @@ -570,7 +574,6 @@ void testFirstRowMergeEngine(@TempDir File tempLogDir, @TempDir File tmpKvDir) kvRecordFactory.ofRecord("k2".getBytes(), new Object[] {2, "v23"})); KvRecordBatch kvRecordBatch1 = kvRecordBatchFactory.ofRecords(kvData1); kvTablet.putAsLeader(kvRecordBatch1, null, DATA1_SCHEMA_PK); - long endOffset = logTablet.localLogEndOffset(); LogRecords actualLogRecords = readLogRecords(logTablet); List expectedLogs = @@ -602,6 +605,80 @@ void testFirstRowMergeEngine(@TempDir File tempLogDir, @TempDir File tmpKvDir) checkEqual(actualLogRecords, expectedLogs); } + @Test + void testVersionRowMergeEngine(@TempDir File tempLogDir, @TempDir File tmpKvDir) + throws Exception { + + KvRecordTestUtils.KvRecordFactory kvRecordFactory = + KvRecordTestUtils.KvRecordFactory.of(DATA3_SCHEMA_PK.toRowType()); + + PhysicalTablePath tablePath = + PhysicalTablePath.of(TablePath.of("testDb", "test_version_row")); + + LogTablet logTablet = createLogTablet(tempLogDir, 1L, tablePath); + TableBucket tableBucket = logTablet.getTableBucket(); + + Map config = new 
HashMap<>(); + config.put("table.merge-engine", "version"); + config.put("table.merge-engine.version.column", "b"); + MergeEngine mergeEngine = MergeEngine.create(config); + KvTablet kvTablet = + createKvTablet(tablePath, tableBucket, logTablet, tmpKvDir, mergeEngine); + + List kvData1 = + Arrays.asList( + kvRecordFactory.ofRecord("k1".getBytes(), new Object[] {1, 1000L}), + kvRecordFactory.ofRecord("k2".getBytes(), new Object[] {2, 1000L}), + kvRecordFactory.ofRecord("k2".getBytes(), new Object[] {2, 1001L})); + KvRecordBatch kvRecordBatch1 = kvRecordBatchFactory.ofRecords(kvData1); + kvTablet.putAsLeader(kvRecordBatch1, null, DATA3_SCHEMA_PK); + + long endOffset = logTablet.localLogEndOffset(); + LogRecords actualLogRecords = readLogRecords(logTablet); + List expectedLogs = + Collections.singletonList( + logRecords( + DATA3_SCHEMA_PK.toRowType(), + 0, + Arrays.asList( + RowKind.INSERT, + RowKind.INSERT, + RowKind.UPDATE_BEFORE, + RowKind.UPDATE_AFTER), + Arrays.asList( + new Object[] {1, 1000L}, + new Object[] {2, 1000L}, + new Object[] {2, 1000L}, + new Object[] {2, 1001L}))); + checkEqual(actualLogRecords, expectedLogs, DATA3_SCHEMA_PK.toRowType()); + + List kvData2 = + Arrays.asList( + kvRecordFactory.ofRecord( + "k2".getBytes(), new Object[] {2, 1000L}), // not update + kvRecordFactory.ofRecord( + "k1".getBytes(), new Object[] {1, 1001L}), // -U , +U + kvRecordFactory.ofRecord("k3".getBytes(), new Object[] {3, 1000L})); // +I + KvRecordBatch kvRecordBatch2 = kvRecordBatchFactory.ofRecords(kvData2); + kvTablet.putAsLeader(kvRecordBatch2, null, DATA3_SCHEMA_PK); + + expectedLogs = + Collections.singletonList( + logRecords( + DATA3_SCHEMA_PK.toRowType(), + endOffset, + Arrays.asList( + RowKind.UPDATE_BEFORE, + RowKind.UPDATE_AFTER, + RowKind.INSERT), + Arrays.asList( + new Object[] {1, 1000L}, + new Object[] {1, 1001L}, + new Object[] {3, 1000L}))); + actualLogRecords = readLogRecords(logTablet, endOffset); + checkEqual(actualLogRecords, expectedLogs, 
DATA3_SCHEMA_PK.toRowType()); + } + private LogRecords readLogRecords() throws Exception { return readLogRecords(0L); } From 108155f610a51d89638abf2950ef5fc036e59f9c Mon Sep 17 00:00:00 2001 From: sunxiaojian Date: Sat, 11 Jan 2025 20:29:59 +0800 Subject: [PATCH 2/2] fixed --- .../fluss/client/table/FlussTableITCase.java | 7 +- .../flink/sink/FlinkTableSinkITCase.java | 79 ++++++++++++++++--- .../coordinator/CoordinatorService.java | 10 +-- .../com/alibaba/fluss/server/kv/KvTablet.java | 22 +++--- ...eEngine.java => DeduplicateRowMerger.java} | 3 +- ...owMergeEngine.java => FirstRowMerger.java} | 8 +- .../{RowMergeEngine.java => RowMerger.java} | 7 +- ...MergeEngine.java => VersionRowMerger.java} | 35 ++++---- .../alibaba/fluss/server/kv/KvTabletTest.java | 1 + 9 files changed, 107 insertions(+), 65 deletions(-) rename fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/{DeduplicateRowMergeEngine.java => DeduplicateRowMerger.java} (92%) rename fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/{FirstRowMergeEngine.java => FirstRowMerger.java} (87%) rename fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/{RowMergeEngine.java => RowMerger.java} (89%) rename fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/{VersionRowMergeEngine.java => VersionRowMerger.java} (78%) diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java index 363b57c46..d1fa5e841 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java @@ -930,7 +930,6 @@ void testFirstRowMergeEngine() throws Exception { } } - @ParameterizedTest @CsvSource({"none,3", "lz4_frame,3", "zstd,3", "zstd,9"}) void testArrowCompressionAndProject(String compression, String level) throws Exception { @@ -951,12 +950,12 @@ void 
testArrowCompressionAndProject(String compression, String level) throws Exc createTable(tablePath, tableDescriptor, false); try (Connection conn = ConnectionFactory.createConnection(clientConf); - Table table = conn.getTable(tablePath)) { + Table table = conn.getTable(tablePath)) { AppendWriter appendWriter = table.getAppendWriter(); int expectedSize = 30; for (int i = 0; i < expectedSize; i++) { String value = i % 2 == 0 ? "hello, friend " + i : null; - InternalRow row = row(schema.toRowType(), new Object[]{i, 100, value, i * 10L}); + InternalRow row = row(schema.toRowType(), new Object[] {i, 100, value, i * 10L}); appendWriter.append(row); if (i % 10 == 0) { // insert 3 bathes, each batch has 10 rows @@ -989,7 +988,7 @@ void testArrowCompressionAndProject(String compression, String level) throws Exc logScanner.close(); // fetch data with project. - logScanner = createLogScanner(table, new int[]{0, 2}); + logScanner = createLogScanner(table, new int[] {0, 2}); subscribeFromBeginning(logScanner, table); count = 0; while (count < expectedSize) { diff --git a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java index 8db65538f..641fe9264 100644 --- a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java +++ b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java @@ -846,7 +846,7 @@ void testUnsupportedStmtOnFirstRowMergeEngine() { } @Test - void testUnsupportedStmtOnVersionRowMergeEngine() { + void testUnsupportedStmtOnVersionMergeEngine() { String t1 = "versionMergeEngineTable"; TablePath tablePath = TablePath.of(DEFAULT_DB, t1); tBatchEnv.executeSql( @@ -891,20 +891,14 @@ void testVersionMergeEngineWithTypeBigint() throws Exception { tEnv.executeSql( "create 
table merge_engine_with_version (a int not null primary key not enforced," + " b string, ts bigint) with('table.merge-engine' = 'version','table.merge-engine.version.column' = 'ts')"); - tEnv.executeSql( - "create table log_sink (a int not null primary key not enforced, b string, ts bigint)"); - - JobClient insertJobClient = - tEnv.executeSql("insert into log_sink select * from merge_engine_with_version") - .getJobClient() - .get(); // insert once tEnv.executeSql( "insert into merge_engine_with_version (a, b, ts) VALUES (1, 'v1', 1000), (2, 'v2', 1000), (1, 'v11', 999), (3, 'v3', 1000)") .await(); - CloseableIterator rowIter = tEnv.executeSql("select * from log_sink").collect(); + CloseableIterator rowIter = + tEnv.executeSql("select * from merge_engine_with_version").collect(); // id=1 not update List expectedRows = @@ -914,12 +908,10 @@ void testVersionMergeEngineWithTypeBigint() throws Exception { // insert again, update id=3 tEnv.executeSql( - "insert into merge_engine_with_version (a, b, ts) VALUES (3, 'v33', 1001), (4, 'v44', 1000)") + "insert into merge_engine_with_version (a, b, ts) VALUES (3, 'v33', 1001), (4, 'v44', 1000), (2, 'v22', 1000)") .await(); expectedRows = Arrays.asList("-U[3, v3, 1000]", "+U[3, v33, 1001]", "+I[4, v44, 1000]"); assertResultsIgnoreOrder(rowIter, expectedRows, false); - - insertJobClient.cancel().get(); } @Test @@ -934,7 +926,8 @@ void testVersionMergeEngineWithTypeTimestamp() throws Exception { + "(1, 'v1', TIMESTAMP '2024-12-27 12:00:00.123'), " + "(2, 'v2', TIMESTAMP '2024-12-27 12:00:00.123'), " + "(1, 'v11', TIMESTAMP '2024-12-27 11:59:59.123'), " - + "(3, 'v3', TIMESTAMP '2024-12-27 12:00:00.123');") + + "(3, 'v3', TIMESTAMP '2024-12-27 12:00:00.123')," + + "(3, 'v33', TIMESTAMP '2024-12-27 12:00:00.123');") .await(); CloseableIterator rowIter = @@ -950,6 +943,66 @@ void testVersionMergeEngineWithTypeTimestamp() throws Exception { assertResultsIgnoreOrder(rowIter, expectedRows, true); } + @Test + void 
testVersionMergeEngineWithTypeTimestamp9() throws Exception { + tEnv.executeSql( + "create table merge_engine_with_version (a int not null primary key not enforced," + + " b string, ts TIMESTAMP(9)) with('table.merge-engine' = 'version','table.merge-engine.version.column' = 'ts')"); + + // insert once + tEnv.executeSql( + "INSERT INTO merge_engine_with_version (a, b, ts) VALUES " + + "(1, 'v1', TIMESTAMP '2024-12-27 12:00:00.123456789'), " + + "(2, 'v2', TIMESTAMP '2024-12-27 12:00:00.123456789'), " + + "(1, 'v11', TIMESTAMP '2024-12-27 11:59:59.123456789'), " + + "(3, 'v3', TIMESTAMP '2024-12-27 12:00:00.123456789')," + + "(3, 'v33', TIMESTAMP '2024-12-27 12:00:00.123456789');") + .await(); + + CloseableIterator rowIter = + tEnv.executeSql("select * from merge_engine_with_version").collect(); + + // id=1 not update + List expectedRows = + Arrays.asList( + "+I[1, v1, 2024-12-27T12:00:00.123456789]", + "+I[2, v2, 2024-12-27T12:00:00.123456789]", + "+I[3, v3, 2024-12-27T12:00:00.123456789]"); + + assertResultsIgnoreOrder(rowIter, expectedRows, true); + } + + @Test + void testVersionMergeEngineWithTypeTimestampLTZ9() throws Exception { + + tEnv.getConfig().set("table.local-time-zone", "UTC"); + tEnv.executeSql( + "create table merge_engine_with_version (a int not null primary key not enforced," + + " b string, ts TIMESTAMP(9) WITH LOCAL TIME ZONE ) with('table.merge-engine' = 'version','table.merge-engine.version.column' = 'ts')"); + + // insert once + tEnv.executeSql( + "INSERT INTO merge_engine_with_version (a, b, ts) VALUES " + + "(1, 'v1', CAST(TO_TIMESTAMP('2024-12-27 12:00:00.123456789', 'yyyy-MM-dd HH:mm:ss.SSSSSSSSS') AS TIMESTAMP(9) WITH LOCAL TIME ZONE)), " + + "(2, 'v2', CAST(TO_TIMESTAMP('2024-12-27 12:00:00.987654321', 'yyyy-MM-dd HH:mm:ss.SSSSSSSSS') AS TIMESTAMP(9) WITH LOCAL TIME ZONE)), " + + "(1, 'v11', CAST(TO_TIMESTAMP('2024-12-27 11:59:59.123456789', 'yyyy-MM-dd HH:mm:ss.SSSSSSSSS') AS TIMESTAMP(9) WITH LOCAL TIME ZONE)), " + + "(3, 'v3', 
CAST(TO_TIMESTAMP('2024-12-27 12:00:00.123456789', 'yyyy-MM-dd HH:mm:ss.SSSSSSSSS') AS TIMESTAMP(9) WITH LOCAL TIME ZONE))," + + "(3, 'v33', CAST(TO_TIMESTAMP('2024-12-27 12:00:00.123456789', 'yyyy-MM-dd HH:mm:ss.SSSSSSSSS') AS TIMESTAMP(9) WITH LOCAL TIME ZONE));") + .await(); + + CloseableIterator rowIter = + tEnv.executeSql("select * from merge_engine_with_version").collect(); + + // id=1 not update + List expectedRows = + Arrays.asList( + "+I[1, v1, 2024-12-27T12:00:00.123Z]", + "+I[2, v2, 2024-12-27T12:00:00.987Z]", + "+I[3, v3, 2024-12-27T12:00:00.123Z]"); + + assertResultsIgnoreOrder(rowIter, expectedRows, true); + } + private InsertAndExpectValues rowsToInsertInto(Collection partitions) { List insertValues = new ArrayList<>(); List expectedValues = new ArrayList<>(); diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java index a7025b285..7307def9c 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java @@ -51,7 +51,7 @@ import com.alibaba.fluss.server.coordinator.event.CommitRemoteLogManifestEvent; import com.alibaba.fluss.server.coordinator.event.EventManager; import com.alibaba.fluss.server.entity.CommitKvSnapshotData; -import com.alibaba.fluss.server.kv.mergeengine.VersionRowMergeEngine; +import com.alibaba.fluss.server.kv.mergeengine.VersionRowMerger; import com.alibaba.fluss.server.kv.snapshot.CompletedSnapshot; import com.alibaba.fluss.server.kv.snapshot.CompletedSnapshotJsonSerde; import com.alibaba.fluss.server.metadata.ServerMetadataCache; @@ -203,14 +203,14 @@ private void checkMergeEngine(MergeEngine mergeEngine, Schema schema) { RowType rowType = schema.toRowType(); int fieldIndex = rowType.getFieldIndex(column); if (fieldIndex == -1) { - throw new IllegalArgumentException( + 
throw new InvalidTableException( String.format( "The version merge engine column %s does not exist.", column)); } DataType dataType = rowType.getTypeAt(fieldIndex); - if (!VersionRowMergeEngine.VERSION_MERGE_ENGINE_SUPPORTED_DATA_TYPES.contains( - dataType.getClass().getName())) { - throw new IllegalArgumentException( + if (!VersionRowMerger.VERSION_MERGE_ENGINE_SUPPORTED_DATA_TYPES.contains( + dataType.getTypeRoot())) { + throw new InvalidTableException( String.format( "The version merge engine column does not support type %s .", dataType)); diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java index 1bc190f0c..6eaad683b 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java @@ -39,10 +39,10 @@ import com.alibaba.fluss.row.arrow.ArrowWriterProvider; import com.alibaba.fluss.row.encode.ValueDecoder; import com.alibaba.fluss.row.encode.ValueEncoder; -import com.alibaba.fluss.server.kv.mergeengine.DeduplicateRowMergeEngine; -import com.alibaba.fluss.server.kv.mergeengine.FirstRowMergeEngine; -import com.alibaba.fluss.server.kv.mergeengine.RowMergeEngine; -import com.alibaba.fluss.server.kv.mergeengine.VersionRowMergeEngine; +import com.alibaba.fluss.server.kv.mergeengine.DeduplicateRowMerger; +import com.alibaba.fluss.server.kv.mergeengine.FirstRowMerger; +import com.alibaba.fluss.server.kv.mergeengine.RowMerger; +import com.alibaba.fluss.server.kv.mergeengine.VersionRowMerger; import com.alibaba.fluss.server.kv.partialupdate.PartialUpdater; import com.alibaba.fluss.server.kv.partialupdate.PartialUpdaterCache; import com.alibaba.fluss.server.kv.prewrite.KvPreWriteBuffer; @@ -278,13 +278,13 @@ public LogAppendInfo putAsLeader( KvRecordReadContext.createReadContext(kvFormat, fieldTypes); ValueDecoder valueDecoder = new ValueDecoder(readContext.getRowDecoder(schemaId)); - 
RowMergeEngine rowMergeEngine = getRowMergeEngine(schema); + RowMerger rowMerger = getRowMerger(schema); for (KvRecord kvRecord : kvRecords.records(readContext)) { byte[] keyBytes = BytesUtils.toArray(kvRecord.getKey()); KvPreWriteBuffer.Key key = KvPreWriteBuffer.Key.of(keyBytes); if (kvRecord.getRow() == null) { // currently, all supported merge engine will ignore delete row. - if (rowMergeEngine.ignoreDelete()) { + if (rowMerger.ignoreDelete()) { continue; } // it's for deletion @@ -320,7 +320,7 @@ public LogAppendInfo putAsLeader( BinaryRow oldRow = valueDecoder.decodeValue(oldValue).row; BinaryRow newRow = updateRow(oldRow, kvRecord.getRow(), partialUpdater); - newRow = rowMergeEngine.merge(oldRow, newRow); + newRow = rowMerger.merge(oldRow, newRow); if (newRow == null) { continue; } @@ -374,16 +374,16 @@ public LogAppendInfo putAsLeader( }); } - private RowMergeEngine getRowMergeEngine(Schema schema) { + private RowMerger getRowMerger(Schema schema) { if (mergeEngine != null) { switch (mergeEngine.getType()) { case VERSION: - return new VersionRowMergeEngine(schema, mergeEngine); + return new VersionRowMerger(schema, mergeEngine); case FIRST_ROW: - return new FirstRowMergeEngine(); + return new FirstRowMerger(); } } - return new DeduplicateRowMergeEngine(); + return new DeduplicateRowMerger(); } private WalBuilder createWalBuilder(int schemaId, RowType rowType) throws Exception { diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/DeduplicateRowMergeEngine.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/DeduplicateRowMerger.java similarity index 92% rename from fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/DeduplicateRowMergeEngine.java rename to fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/DeduplicateRowMerger.java index b3abe51c4..77016cd93 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/DeduplicateRowMergeEngine.java +++ 
b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/DeduplicateRowMerger.java @@ -13,12 +13,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.alibaba.fluss.server.kv.mergeengine; import com.alibaba.fluss.row.BinaryRow; /** The default merge engine always retains the last record. */ -public class DeduplicateRowMergeEngine implements RowMergeEngine { +public class DeduplicateRowMerger implements RowMerger { @Override public BinaryRow merge(BinaryRow oldRow, BinaryRow newRow) { return newRow; diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/FirstRowMergeEngine.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/FirstRowMerger.java similarity index 87% rename from fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/FirstRowMergeEngine.java rename to fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/FirstRowMerger.java index e834f9435..34b4e36dd 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/FirstRowMergeEngine.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/FirstRowMerger.java @@ -13,12 +13,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.alibaba.fluss.server.kv.mergeengine; import com.alibaba.fluss.row.BinaryRow; /** The first row merge engine for primary key table. Always retain the first row. 
*/ -public class FirstRowMergeEngine implements RowMergeEngine { +public class FirstRowMerger implements RowMerger { @Override public BinaryRow merge(BinaryRow oldRow, BinaryRow newRow) { return null; @@ -28,9 +29,4 @@ public BinaryRow merge(BinaryRow oldRow, BinaryRow newRow) { public boolean ignoreDelete() { return true; } - - @Override - public boolean ignorePartialUpdate() { - return true; - } } diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/RowMergeEngine.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/RowMerger.java similarity index 89% rename from fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/RowMergeEngine.java rename to fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/RowMerger.java index 1ea0bda91..d31fb7cef 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/RowMergeEngine.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/RowMerger.java @@ -13,19 +13,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.alibaba.fluss.server.kv.mergeengine; import com.alibaba.fluss.row.BinaryRow; /** The row merge engine for primary key table. 
*/ -public interface RowMergeEngine { +public interface RowMerger { BinaryRow merge(BinaryRow oldRow, BinaryRow newRow); default boolean ignoreDelete() { return false; } - - default boolean ignorePartialUpdate() { - return false; - } } diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/VersionRowMergeEngine.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/VersionRowMerger.java similarity index 78% rename from fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/VersionRowMergeEngine.java rename to fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/VersionRowMerger.java index 54617cc81..22b489487 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/VersionRowMergeEngine.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/VersionRowMerger.java @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.alibaba.fluss.server.kv.mergeengine; import com.alibaba.fluss.metadata.MergeEngine; @@ -24,6 +25,7 @@ import com.alibaba.fluss.shaded.guava32.com.google.common.collect.ImmutableSet; import com.alibaba.fluss.types.BigIntType; import com.alibaba.fluss.types.DataType; +import com.alibaba.fluss.types.DataTypeRoot; import com.alibaba.fluss.types.IntType; import com.alibaba.fluss.types.LocalZonedTimestampType; import com.alibaba.fluss.types.RowType; @@ -36,21 +38,20 @@ * The version row merge engine for primary key table. The update will only occur if the new value * of the specified version field is greater than the old value. 
*/ -public class VersionRowMergeEngine implements RowMergeEngine { +public class VersionRowMerger implements RowMerger { - public static final Set VERSION_MERGE_ENGINE_SUPPORTED_DATA_TYPES = + public static final Set VERSION_MERGE_ENGINE_SUPPORTED_DATA_TYPES = ImmutableSet.of( - BigIntType.class.getName(), - IntType.class.getName(), - TimestampType.class.getName(), - TimeType.class.getName(), - LocalZonedTimestampType.class.getName()); + DataTypeRoot.BIGINT, + DataTypeRoot.INTEGER, + DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, + DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE); private final int fieldIndex; private final InternalRow.FieldGetter fieldGetter; private final RowType rowType; - public VersionRowMergeEngine(Schema schema, MergeEngine mergeEngine) { + public VersionRowMerger(Schema schema, MergeEngine mergeEngine) { this.rowType = schema.toRowType(); InternalRow.FieldGetter[] currentFieldGetters = new InternalRow.FieldGetter[rowType.getFieldCount()]; @@ -80,7 +81,9 @@ public BinaryRow merge(BinaryRow oldRow, BinaryRow newRow) { return newRow; } DataType dataType = rowType.getTypeAt(fieldIndex); - return getValueComparator(dataType).isGreaterThan(newValue, oldValue) ? newRow : null; + return getValueComparator(dataType).isGreaterOrEqualThan(newValue, oldValue) + ? 
newRow + : null; } @Override @@ -88,11 +91,6 @@ public boolean ignoreDelete() { return true; } - @Override - public boolean ignorePartialUpdate() { - return true; - } - private ValueComparator getValueComparator(DataType dataType) { if (dataType instanceof BigIntType) { return (left, right) -> (Long) left > (Long) right; @@ -101,18 +99,15 @@ private ValueComparator getValueComparator(DataType dataType) { return (left, right) -> (Integer) left > (Integer) right; } if (dataType instanceof TimestampType) { - return (left, right) -> - ((TimestampNtz) left).getMillisecond() - > ((TimestampNtz) right).getMillisecond(); + return (left, right) -> ((TimestampNtz) left).compareTo((TimestampNtz) right) > 0; } if (dataType instanceof LocalZonedTimestampType) { - return (left, right) -> - ((TimestampLtz) left).toEpochMicros() > ((TimestampLtz) right).toEpochMicros(); + return (left, right) -> ((TimestampLtz) left).compareTo((TimestampLtz) right) > 0; } throw new UnsupportedOperationException("Unsupported data type: " + dataType); } interface ValueComparator { - boolean isGreaterThan(Object left, Object right); + boolean isGreaterOrEqualThan(Object left, Object right); } } diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java index 4bfcd2b52..57340f7be 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java @@ -658,6 +658,7 @@ void testVersionRowMergeEngine(@TempDir File tempLogDir, @TempDir File tmpKvDir) "k2".getBytes(), new Object[] {2, 1000L}), // not update kvRecordFactory.ofRecord( "k1".getBytes(), new Object[] {1, 1001L}), // -U , +U + kvRecordFactory.ofRecord("k1".getBytes(), null), // not update kvRecordFactory.ofRecord("k3".getBytes(), new Object[] {3, 1000L})); // +I KvRecordBatch kvRecordBatch2 = kvRecordBatchFactory.ofRecords(kvData2); 
kvTablet.putAsLeader(kvRecordBatch2, null, DATA3_SCHEMA_PK);