From 8ebe6c31260d6f2ad4c2af7fda4e8bff546bcf05 Mon Sep 17 00:00:00 2001 From: sunxiaojian Date: Sat, 11 Jan 2025 20:29:59 +0800 Subject: [PATCH] fixed --- .../fluss/client/table/FlussTableITCase.java | 7 ++-- .../flink/sink/FlinkTableSinkITCase.java | 19 ++++------ .../coordinator/CoordinatorService.java | 10 +++--- .../com/alibaba/fluss/server/kv/KvTablet.java | 22 ++++++------ ...eEngine.java => DeduplicateRowMerger.java} | 3 +- ...owMergeEngine.java => FirstRowMerger.java} | 8 ++--- .../{RowMergeEngine.java => RowMerger.java} | 7 ++-- ...MergeEngine.java => VersionRowMerger.java} | 35 ++++++++----------- .../alibaba/fluss/server/kv/KvTabletTest.java | 1 + 9 files changed, 47 insertions(+), 65 deletions(-) rename fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/{DeduplicateRowMergeEngine.java => DeduplicateRowMerger.java} (92%) rename fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/{FirstRowMergeEngine.java => FirstRowMerger.java} (87%) rename fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/{RowMergeEngine.java => RowMerger.java} (89%) rename fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/{VersionRowMergeEngine.java => VersionRowMerger.java} (78%) diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java index 363b57c4..d1fa5e84 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java @@ -930,7 +930,6 @@ void testFirstRowMergeEngine() throws Exception { } } - @ParameterizedTest @CsvSource({"none,3", "lz4_frame,3", "zstd,3", "zstd,9"}) void testArrowCompressionAndProject(String compression, String level) throws Exception { @@ -951,12 +950,12 @@ void testArrowCompressionAndProject(String compression, String level) throws Exc createTable(tablePath, tableDescriptor, false); try (Connection conn = ConnectionFactory.createConnection(clientConf); - Table table = conn.getTable(tablePath)) { + Table table = conn.getTable(tablePath)) { AppendWriter appendWriter = table.getAppendWriter(); int expectedSize = 30; for (int i = 0; i < expectedSize; i++) { String value = i % 2 == 0 ? "hello, friend " + i : null; - InternalRow row = row(schema.toRowType(), new Object[]{i, 100, value, i * 10L}); + InternalRow row = row(schema.toRowType(), new Object[] {i, 100, value, i * 10L}); appendWriter.append(row); if (i % 10 == 0) { // insert 3 bathes, each batch has 10 rows @@ -989,7 +988,7 @@ void testArrowCompressionAndProject(String compression, String level) throws Exc logScanner.close(); // fetch data with project. - logScanner = createLogScanner(table, new int[]{0, 2}); + logScanner = createLogScanner(table, new int[] {0, 2}); subscribeFromBeginning(logScanner, table); count = 0; while (count < expectedSize) { diff --git a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java index 8db65538..60c88e27 100644 --- a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java +++ b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java @@ -846,7 +846,7 @@ void testUnsupportedStmtOnFirstRowMergeEngine() { } @Test - void testUnsupportedStmtOnVersionRowMergeEngine() { + void testUnsupportedStmtOnVersionMergeEngine() { String t1 = "versionMergeEngineTable"; TablePath tablePath = TablePath.of(DEFAULT_DB, t1); tBatchEnv.executeSql( @@ -891,20 +891,14 @@ void testVersionMergeEngineWithTypeBigint() throws Exception { tEnv.executeSql( "create table merge_engine_with_version (a int not null primary key not enforced," + " b string, ts bigint) with('table.merge-engine' = 'version','table.merge-engine.version.column' = 'ts')"); - tEnv.executeSql( - "create table log_sink (a int not null primary key not enforced, b string, ts bigint)"); - - JobClient insertJobClient = - tEnv.executeSql("insert into log_sink select * from merge_engine_with_version") - .getJobClient() - .get(); // insert once tEnv.executeSql( "insert into merge_engine_with_version (a, b, ts) VALUES (1, 'v1', 1000), (2, 'v2', 1000), (1, 'v11', 999), (3, 'v3', 1000)") .await(); - CloseableIterator rowIter = tEnv.executeSql("select * from log_sink").collect(); + CloseableIterator rowIter = + tEnv.executeSql("select * from merge_engine_with_version").collect(); // id=1 not update List expectedRows = @@ -914,12 +908,10 @@ void testVersionMergeEngineWithTypeBigint() throws Exception { // insert again, update id=3 tEnv.executeSql( - "insert into merge_engine_with_version (a, b, ts) VALUES (3, 'v33', 1001), (4, 'v44', 1000)") + "insert into merge_engine_with_version (a, b, ts) VALUES (3, 'v33', 1001), (4, 'v44', 1000), (2, 'v22', 1000)") .await(); expectedRows = Arrays.asList("-U[3, v3, 1000]", "+U[3, v33, 1001]", "+I[4, v44, 1000]"); assertResultsIgnoreOrder(rowIter, expectedRows, false); - - insertJobClient.cancel().get(); } @Test @@ -934,7 +926,8 @@ void testVersionMergeEngineWithTypeTimestamp() throws Exception { + "(1, 'v1', TIMESTAMP '2024-12-27 12:00:00.123'), " + "(2, 'v2', TIMESTAMP '2024-12-27 12:00:00.123'), " + "(1, 'v11', TIMESTAMP '2024-12-27 11:59:59.123'), " - + "(3, 'v3', TIMESTAMP '2024-12-27 12:00:00.123');") + + "(3, 'v3', TIMESTAMP '2024-12-27 12:00:00.123')," + + "(3, 'v33', TIMESTAMP '2024-12-27 12:00:00.123');") .await(); CloseableIterator rowIter = diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java index a7025b28..7307def9 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java @@ -51,7 +51,7 @@ import com.alibaba.fluss.server.coordinator.event.CommitRemoteLogManifestEvent; import com.alibaba.fluss.server.coordinator.event.EventManager; import com.alibaba.fluss.server.entity.CommitKvSnapshotData; -import com.alibaba.fluss.server.kv.mergeengine.VersionRowMergeEngine; +import com.alibaba.fluss.server.kv.mergeengine.VersionRowMerger; import com.alibaba.fluss.server.kv.snapshot.CompletedSnapshot; import com.alibaba.fluss.server.kv.snapshot.CompletedSnapshotJsonSerde; import com.alibaba.fluss.server.metadata.ServerMetadataCache; @@ -203,14 +203,14 @@ private void checkMergeEngine(MergeEngine mergeEngine, Schema schema) { RowType rowType = schema.toRowType(); int fieldIndex = rowType.getFieldIndex(column); if (fieldIndex == -1) { - throw new IllegalArgumentException( + throw new InvalidTableException( String.format( "The version merge engine column %s does not exist.", column)); } DataType dataType = rowType.getTypeAt(fieldIndex); - if (!VersionRowMergeEngine.VERSION_MERGE_ENGINE_SUPPORTED_DATA_TYPES.contains( - dataType.getClass().getName())) { - throw new IllegalArgumentException( + if (!VersionRowMerger.VERSION_MERGE_ENGINE_SUPPORTED_DATA_TYPES.contains( + dataType.getTypeRoot())) { + throw new InvalidTableException( String.format( "The version merge engine column does not support type %s .", dataType)); diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java index 1bc190f0..6eaad683 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java @@ -39,10 +39,10 @@ import com.alibaba.fluss.row.arrow.ArrowWriterProvider; import com.alibaba.fluss.row.encode.ValueDecoder; import com.alibaba.fluss.row.encode.ValueEncoder; -import com.alibaba.fluss.server.kv.mergeengine.DeduplicateRowMergeEngine; -import com.alibaba.fluss.server.kv.mergeengine.FirstRowMergeEngine; -import com.alibaba.fluss.server.kv.mergeengine.RowMergeEngine; -import com.alibaba.fluss.server.kv.mergeengine.VersionRowMergeEngine; +import com.alibaba.fluss.server.kv.mergeengine.DeduplicateRowMerger; +import com.alibaba.fluss.server.kv.mergeengine.FirstRowMerger; +import com.alibaba.fluss.server.kv.mergeengine.RowMerger; +import com.alibaba.fluss.server.kv.mergeengine.VersionRowMerger; import com.alibaba.fluss.server.kv.partialupdate.PartialUpdater; import com.alibaba.fluss.server.kv.partialupdate.PartialUpdaterCache; import com.alibaba.fluss.server.kv.prewrite.KvPreWriteBuffer; @@ -278,13 +278,13 @@ public LogAppendInfo putAsLeader( KvRecordReadContext.createReadContext(kvFormat, fieldTypes); ValueDecoder valueDecoder = new ValueDecoder(readContext.getRowDecoder(schemaId)); - RowMergeEngine rowMergeEngine = getRowMergeEngine(schema); + RowMerger rowMerger = getRowMerger(schema); for (KvRecord kvRecord : kvRecords.records(readContext)) { byte[] keyBytes = BytesUtils.toArray(kvRecord.getKey()); KvPreWriteBuffer.Key key = KvPreWriteBuffer.Key.of(keyBytes); if (kvRecord.getRow() == null) { // currently, all supported merge engine will ignore delete row. - if (rowMergeEngine.ignoreDelete()) { + if (rowMerger.ignoreDelete()) { continue; } // it's for deletion @@ -320,7 +320,7 @@ public LogAppendInfo putAsLeader( BinaryRow oldRow = valueDecoder.decodeValue(oldValue).row; BinaryRow newRow = updateRow(oldRow, kvRecord.getRow(), partialUpdater); - newRow = rowMergeEngine.merge(oldRow, newRow); + newRow = rowMerger.merge(oldRow, newRow); if (newRow == null) { continue; } @@ -374,16 +374,16 @@ public LogAppendInfo putAsLeader( }); } - private RowMergeEngine getRowMergeEngine(Schema schema) { + private RowMerger getRowMerger(Schema schema) { if (mergeEngine != null) { switch (mergeEngine.getType()) { case VERSION: - return new VersionRowMergeEngine(schema, mergeEngine); + return new VersionRowMerger(schema, mergeEngine); case FIRST_ROW: - return new FirstRowMergeEngine(); + return new FirstRowMerger(); } } - return new DeduplicateRowMergeEngine(); + return new DeduplicateRowMerger(); } private WalBuilder createWalBuilder(int schemaId, RowType rowType) throws Exception { diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/DeduplicateRowMergeEngine.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/DeduplicateRowMerger.java similarity index 92% rename from fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/DeduplicateRowMergeEngine.java rename to fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/DeduplicateRowMerger.java index b3abe51c..77016cd9 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/DeduplicateRowMergeEngine.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/DeduplicateRowMerger.java @@ -13,12 +13,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.alibaba.fluss.server.kv.mergeengine; import com.alibaba.fluss.row.BinaryRow; /** The default merge engine always retains the last record. */ -public class DeduplicateRowMergeEngine implements RowMergeEngine { +public class DeduplicateRowMerger implements RowMerger { @Override public BinaryRow merge(BinaryRow oldRow, BinaryRow newRow) { return newRow; diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/FirstRowMergeEngine.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/FirstRowMerger.java similarity index 87% rename from fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/FirstRowMergeEngine.java rename to fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/FirstRowMerger.java index e834f943..34b4e36d 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/FirstRowMergeEngine.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/FirstRowMerger.java @@ -13,12 +13,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.alibaba.fluss.server.kv.mergeengine; import com.alibaba.fluss.row.BinaryRow; /** The first row merge engine for primary key table. Always retain the first row. */ -public class FirstRowMergeEngine implements RowMergeEngine { +public class FirstRowMerger implements RowMerger { @Override public BinaryRow merge(BinaryRow oldRow, BinaryRow newRow) { return null; @@ -28,9 +29,4 @@ public BinaryRow merge(BinaryRow oldRow, BinaryRow newRow) { public boolean ignoreDelete() { return true; } - - @Override - public boolean ignorePartialUpdate() { - return true; - } } diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/RowMergeEngine.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/RowMerger.java similarity index 89% rename from fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/RowMergeEngine.java rename to fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/RowMerger.java index 1ea0bda9..d31fb7ce 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/RowMergeEngine.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/RowMerger.java @@ -13,19 +13,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.alibaba.fluss.server.kv.mergeengine; import com.alibaba.fluss.row.BinaryRow; /** The row merge engine for primary key table. */ -public interface RowMergeEngine { +public interface RowMerger { BinaryRow merge(BinaryRow oldRow, BinaryRow newRow); default boolean ignoreDelete() { return false; } - - default boolean ignorePartialUpdate() { - return false; - } } diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/VersionRowMergeEngine.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/VersionRowMerger.java similarity index 78% rename from fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/VersionRowMergeEngine.java rename to fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/VersionRowMerger.java index 54617cc8..22b48948 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/VersionRowMergeEngine.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/VersionRowMerger.java @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.alibaba.fluss.server.kv.mergeengine; import com.alibaba.fluss.metadata.MergeEngine; @@ -24,6 +25,7 @@ import com.alibaba.fluss.shaded.guava32.com.google.common.collect.ImmutableSet; import com.alibaba.fluss.types.BigIntType; import com.alibaba.fluss.types.DataType; +import com.alibaba.fluss.types.DataTypeRoot; import com.alibaba.fluss.types.IntType; import com.alibaba.fluss.types.LocalZonedTimestampType; import com.alibaba.fluss.types.RowType; @@ -36,21 +38,20 @@ * The version row merge engine for primary key table. The update will only occur if the new value * of the specified version field is greater than the old value. */ -public class VersionRowMergeEngine implements RowMergeEngine { +public class VersionRowMerger implements RowMerger { - public static final Set VERSION_MERGE_ENGINE_SUPPORTED_DATA_TYPES = + public static final Set VERSION_MERGE_ENGINE_SUPPORTED_DATA_TYPES = ImmutableSet.of( - BigIntType.class.getName(), - IntType.class.getName(), - TimestampType.class.getName(), - TimeType.class.getName(), - LocalZonedTimestampType.class.getName()); + DataTypeRoot.BIGINT, + DataTypeRoot.INTEGER, + DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, + DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE); private final int fieldIndex; private final InternalRow.FieldGetter fieldGetter; private final RowType rowType; - public VersionRowMergeEngine(Schema schema, MergeEngine mergeEngine) { + public VersionRowMerger(Schema schema, MergeEngine mergeEngine) { this.rowType = schema.toRowType(); InternalRow.FieldGetter[] currentFieldGetters = new InternalRow.FieldGetter[rowType.getFieldCount()]; @@ -80,7 +81,9 @@ public BinaryRow merge(BinaryRow oldRow, BinaryRow newRow) { return newRow; } DataType dataType = rowType.getTypeAt(fieldIndex); - return getValueComparator(dataType).isGreaterThan(newValue, oldValue) ? newRow : null; + return getValueComparator(dataType).isGreaterOrEqualThan(newValue, oldValue) + ? newRow + : null; } @Override @@ -88,11 +91,6 @@ public boolean ignoreDelete() { return true; } - @Override - public boolean ignorePartialUpdate() { - return true; - } - private ValueComparator getValueComparator(DataType dataType) { if (dataType instanceof BigIntType) { return (left, right) -> (Long) left > (Long) right; @@ -101,18 +99,15 @@ private ValueComparator getValueComparator(DataType dataType) { return (left, right) -> (Integer) left > (Integer) right; } if (dataType instanceof TimestampType) { - return (left, right) -> - ((TimestampNtz) left).getMillisecond() - > ((TimestampNtz) right).getMillisecond(); + return (left, right) -> ((TimestampNtz) left).compareTo((TimestampNtz) right) > 0; } if (dataType instanceof LocalZonedTimestampType) { - return (left, right) -> - ((TimestampLtz) left).toEpochMicros() > ((TimestampLtz) right).toEpochMicros(); + return (left, right) -> ((TimestampLtz) left).compareTo((TimestampLtz) right) > 0; } throw new UnsupportedOperationException("Unsupported data type: " + dataType); } interface ValueComparator { - boolean isGreaterThan(Object left, Object right); + boolean isGreaterOrEqualThan(Object left, Object right); } } diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java index 4bfcd2b5..57340f7b 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java @@ -658,6 +658,7 @@ void testVersionRowMergeEngine(@TempDir File tempLogDir, @TempDir File tmpKvDir) "k2".getBytes(), new Object[] {2, 1000L}), // not update kvRecordFactory.ofRecord( "k1".getBytes(), new Object[] {1, 1001L}), // -U , +U + kvRecordFactory.ofRecord("k1".getBytes(), null), // not update kvRecordFactory.ofRecord("k3".getBytes(), new Object[] {3, 1000L})); // +I KvRecordBatch kvRecordBatch2 = kvRecordBatchFactory.ofRecords(kvData2); kvTablet.putAsLeader(kvRecordBatch2, null, DATA3_SCHEMA_PK);