From 40b7987003f7d309ad33ee894092276f63420a76 Mon Sep 17 00:00:00 2001 From: sunxiaojian Date: Wed, 8 Jan 2025 00:11:05 +0800 Subject: [PATCH] fixed --- .../fluss/client/table/FlussTableITCase.java | 50 +++--- .../alibaba/fluss/metadata/MergeEngine.java | 50 ++---- .../fluss/metadata/TableDescriptor.java | 2 +- .../flink/catalog/FlinkTableFactory.java | 4 +- .../connector/flink/sink/FlinkTableSink.java | 2 +- .../flink/sink/FlinkTableSinkITCase.java | 12 -- .../com/alibaba/fluss/server/kv/KvTablet.java | 145 ++++++++++------ .../kv/mergeengine/DefaultRowMergeEngine.java | 50 ------ .../kv/mergeengine/FirstRowMergeEngine.java | 47 ++--- .../server/kv/mergeengine/RowMergeEngine.java | 162 +----------------- .../kv/mergeengine/VersionRowMergeEngine.java | 76 +++----- .../alibaba/fluss/server/kv/KvTabletTest.java | 2 +- 12 files changed, 175 insertions(+), 427 deletions(-) delete mode 100644 fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/DefaultRowMergeEngine.java diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java index 923ccff5f..cf51f0b7b 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java @@ -911,7 +911,6 @@ void testFirstRowMergeEngine() throws Exception { .getRow(); assertThatRow(gotRow).withSchema(rowType).isEqualTo(expectedRows.get(id)); } - // check scan change log LogScanner logScanner = table.getLogScanner(new LogScan()); logScanner.subscribeFromBeginning(0); List actualLogRecords = new ArrayList<>(0); @@ -946,55 +945,54 @@ void testMergeEngineWithVersion() throws Exception { try (Table table = conn.getTable(DATA3_TABLE_PATH_PK)) { // put rows. UpsertWriter upsertWriter = table.getUpsertWriter(); - List expectedRows = new ArrayList<>(rows); + List expectedScanRecords = new ArrayList<>(rows); // init rows. for (int row = 0; row < rows; row++) { upsertWriter.upsert(compactedRow(rowType, new Object[] {row, 1000L})); - expectedRows.add(compactedRow(rowType, new Object[] {row, 1000L})); + expectedScanRecords.add( + new ScanRecord(compactedRow(rowType, new Object[] {row, 1000L}))); } // update row if id=0 and version < 1000L, will not update upsertWriter.upsert(compactedRow(rowType, new Object[] {0, 999L})); // update if version> 1000L upsertWriter.upsert(compactedRow(rowType, new Object[] {1, 1001L})); + // update_before record, don't care about offset/timestamp + expectedScanRecords.add( + new ScanRecord( + -1, + -1, + RowKind.UPDATE_BEFORE, + compactedRow(rowType, new Object[] {1, 1000L}))); + // update_after record + expectedScanRecords.add( + new ScanRecord( + -1, + -1, + RowKind.UPDATE_AFTER, + compactedRow(rowType, new Object[] {1, 1001L}))); rows = rows + 2; upsertWriter.flush(); - // check scan change log LogScanner logScanner = table.getLogScanner(new LogScan()); logScanner.subscribeFromBeginning(0); - List actualLogRecords = new ArrayList<>(0); + List actualLogRecords = new ArrayList<>(rows); while (actualLogRecords.size() < rows) { ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1)); scanRecords.forEach(actualLogRecords::add); } assertThat(actualLogRecords).hasSize(rows); - for (int i = 0; i < 3; i++) { - ScanRecord scanRecord = actualLogRecords.get(i); - assertThat(scanRecord.getRowKind()).isEqualTo(RowKind.INSERT); - assertThatRow(scanRecord.getRow()) + for (int i = 0; i < rows; i++) { + ScanRecord actualScanRecord = actualLogRecords.get(i); + ScanRecord expectedRecord = expectedScanRecords.get(i); + assertThat(actualScanRecord.getRowKind()).isEqualTo(expectedRecord.getRowKind()); + assertThatRow(actualScanRecord.getRow()) .withSchema(rowType) - .isEqualTo(expectedRows.get(i)); + .isEqualTo(expectedRecord.getRow()); } - - // update_before for id =1 - List updateActualLogRecords = new ArrayList<>(actualLogRecords); - - ScanRecord beforeRecord = updateActualLogRecords.get(3); - assertThat(beforeRecord.getRowKind()).isEqualTo(RowKind.UPDATE_BEFORE); - assertThat(beforeRecord.getRow().getFieldCount()).isEqualTo(2); - assertThat(beforeRecord.getRow().getInt(0)).isEqualTo(1); - assertThat(beforeRecord.getRow().getLong(1)).isEqualTo(1000); - - // update_after for id =1 - ScanRecord afterRecord = updateActualLogRecords.get(4); - assertThat(afterRecord.getRowKind()).isEqualTo(RowKind.UPDATE_AFTER); - assertThat(afterRecord.getRow().getFieldCount()).isEqualTo(2); - assertThat(afterRecord.getRow().getInt(0)).isEqualTo(1); - assertThat(afterRecord.getRow().getLong(1)).isEqualTo(1001); } } } diff --git a/fluss-common/src/main/java/com/alibaba/fluss/metadata/MergeEngine.java b/fluss-common/src/main/java/com/alibaba/fluss/metadata/MergeEngine.java index 963f83907..505de2abc 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/metadata/MergeEngine.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/metadata/MergeEngine.java @@ -18,31 +18,37 @@ import com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.config.Configuration; -import com.alibaba.fluss.shaded.guava32.com.google.common.collect.Sets; +import com.alibaba.fluss.shaded.guava32.com.google.common.collect.ImmutableSet; import com.alibaba.fluss.types.BigIntType; -import com.alibaba.fluss.types.DataType; import com.alibaba.fluss.types.IntType; import com.alibaba.fluss.types.LocalZonedTimestampType; -import com.alibaba.fluss.types.RowType; import com.alibaba.fluss.types.TimeType; import com.alibaba.fluss.types.TimestampType; +import javax.annotation.Nullable; + import java.util.Map; import java.util.Objects; import java.util.Set; -/** The merge engine for primary key table. */ +/** + * The merge engine for primary key table. + * + * @since 0.6 + */ public class MergeEngine { public static final Set VERSION_SUPPORTED_DATA_TYPES = - Sets.newHashSet( + ImmutableSet.of( BigIntType.class.getName(), IntType.class.getName(), TimestampType.class.getName(), TimeType.class.getName(), LocalZonedTimestampType.class.getName()); private final Type type; - private final String column; + + /** When merge engine type is version, column cannot be null. */ + @Nullable private final String column; private MergeEngine(Type type) { this(type, null); @@ -54,22 +60,14 @@ private MergeEngine(Type type, String column) { } public static MergeEngine create(Map properties) { - return create(properties, null); - } - - public static MergeEngine create(Map properties, RowType rowType) { - return create(Configuration.fromMap(properties), rowType); + return create(Configuration.fromMap(properties)); } - public static MergeEngine create(Configuration options, RowType rowType) { - if (options == null) { - return null; - } + private static MergeEngine create(Configuration options) { MergeEngine.Type type = options.get(ConfigOptions.TABLE_MERGE_ENGINE); if (type == null) { return null; } - switch (type) { case FIRST_ROW: return new MergeEngine(Type.FIRST_ROW); @@ -77,23 +75,9 @@ public static MergeEngine create(Configuration options, RowType rowType) { String column = options.get(ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN); if (column == null) { throw new IllegalArgumentException( - "When the merge engine is set to version, the 'table.merge-engine.version.column' cannot be empty."); - } - if (rowType != null) { - int fieldIndex = rowType.getFieldIndex(column); - if (fieldIndex == -1) { - throw new IllegalArgumentException( - String.format( - "When the merge engine is set to version, the column %s does not exist.", - column)); - } - DataType dataType = rowType.getTypeAt(fieldIndex); - if (!VERSION_SUPPORTED_DATA_TYPES.contains(dataType.getClass().getName())) { - throw new IllegalArgumentException( - String.format( - "The merge engine column is not support type %s .", - dataType.asSummaryString())); - } + String.format( + "When the merge engine is set to version, the '%s' must be set.", + ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN.key())); } return new MergeEngine(Type.VERSION, column); default: diff --git a/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java b/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java index 028bbe243..a850201e4 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java @@ -281,7 +281,7 @@ public boolean isDataLakeEnabled() { } public @Nullable MergeEngine getMergeEngine() { - return MergeEngine.create(configuration(), schema.toRowType()); + return MergeEngine.create(properties); } public TableDescriptor copy(Map newProperties) { diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkTableFactory.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkTableFactory.java index 729667d7c..1e0713bbe 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkTableFactory.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkTableFactory.java @@ -23,7 +23,6 @@ import com.alibaba.fluss.connector.flink.sink.FlinkTableSink; import com.alibaba.fluss.connector.flink.source.FlinkTableSource; import com.alibaba.fluss.connector.flink.utils.FlinkConnectorOptionsUtils; -import com.alibaba.fluss.connector.flink.utils.FlinkConversions; import com.alibaba.fluss.metadata.MergeEngine; import com.alibaba.fluss.metadata.TablePath; @@ -152,8 +151,7 @@ public DynamicTableSink createDynamicTableSink(Context context) { rowType, context.getPrimaryKeyIndexes(), isStreamingMode, - MergeEngine.create( - helper.getOptions().toMap(), FlinkConversions.toFlussRowType(rowType))); + MergeEngine.create(helper.getOptions().toMap())); } @Override diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSink.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSink.java index 5cec7caf4..c0a33a9e9 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSink.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSink.java @@ -75,7 +75,7 @@ public FlinkTableSink( RowType tableRowType, int[] primaryKeyIndexes, boolean streaming, - MergeEngine mergeEngine) { + @Nullable MergeEngine mergeEngine) { this.tablePath = tablePath; this.flussConfig = flussConfig; this.tableRowType = tableRowType; diff --git a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java index eaee11451..6b28077c2 100644 --- a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java +++ b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java @@ -861,18 +861,6 @@ void testVersionMergeEngineWithTypeBigint() throws Exception { expectedRows = Arrays.asList("-U[3, v3, 1000]", "+U[3, v33, 1001]", "+I[4, v44, 1000]"); assertResultsIgnoreOrder(rowIter, expectedRows, false); - // test update a=4 - tBatchEnv - .executeSql("UPDATE merge_engine_with_version SET b = 'v45', ts=1001 WHERE a = 4") - .await(); - expectedRows = Arrays.asList("-U[4, v44, 1000]", "+U[4, v45, 1001]"); - assertResultsIgnoreOrder(rowIter, expectedRows, false); - - // test delete a=4 - tBatchEnv.executeSql("delete from merge_engine_with_version WHERE a = 4").await(); - expectedRows = Arrays.asList("-D[4, v45, 1001]"); - assertResultsIgnoreOrder(rowIter, expectedRows, false); - insertJobClient.cancel().get(); } diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java index d1070eaaf..df568273c 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java @@ -32,12 +32,13 @@ import com.alibaba.fluss.record.KvRecord; import com.alibaba.fluss.record.KvRecordBatch; import com.alibaba.fluss.record.KvRecordReadContext; +import com.alibaba.fluss.record.RowKind; import com.alibaba.fluss.row.BinaryRow; import com.alibaba.fluss.row.InternalRow; import com.alibaba.fluss.row.arrow.ArrowWriterPool; import com.alibaba.fluss.row.arrow.ArrowWriterProvider; import com.alibaba.fluss.row.encode.ValueDecoder; -import com.alibaba.fluss.server.kv.mergeengine.DefaultRowMergeEngine; +import com.alibaba.fluss.row.encode.ValueEncoder; import com.alibaba.fluss.server.kv.mergeengine.FirstRowMergeEngine; import com.alibaba.fluss.server.kv.mergeengine.RowMergeEngine; import com.alibaba.fluss.server.kv.mergeengine.VersionRowMergeEngine; @@ -59,6 +60,7 @@ import com.alibaba.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator; import com.alibaba.fluss.types.DataType; import com.alibaba.fluss.types.RowType; +import com.alibaba.fluss.utils.BytesUtils; import com.alibaba.fluss.utils.FileUtils; import com.alibaba.fluss.utils.FlussPaths; import com.alibaba.fluss.utils.types.Tuple2; @@ -261,28 +263,92 @@ public LogAppendInfo putAsLeader( long logEndOffsetOfPrevBatch = logTablet.localLogEndOffset(); DataType[] fieldTypes = rowType.getChildren().toArray(new DataType[0]); try { + long logOffset = logEndOffsetOfPrevBatch; + // TODO: reuse the read context and decoder KvRecordBatch.ReadContext readContext = KvRecordReadContext.createReadContext(kvFormat, fieldTypes); ValueDecoder valueDecoder = new ValueDecoder(readContext.getRowDecoder(schemaId)); - + RowMergeEngine rowMergeEngine = getRowMergeEngine(schema); int appendedRecordCount = 0; - - RowMergeEngine rowMergeEngine = - getRowMergeEngine( - partialUpdater, - valueDecoder, - walBuilder, - kvPreWriteBuffer, - schema, - schemaId, - appendedRecordCount, - logEndOffsetOfPrevBatch); for (KvRecord kvRecord : kvRecords.records(readContext)) { - rowMergeEngine.writeRecord(kvRecord); + byte[] keyBytes = BytesUtils.toArray(kvRecord.getKey()); + KvPreWriteBuffer.Key key = KvPreWriteBuffer.Key.of(keyBytes); + if (kvRecord.getRow() == null) { + // it's for deletion + byte[] oldValue = getFromBufferOrKv(key); + if (oldValue == null) { + // there might be large amount of such deletion, so we don't log + LOG.debug( + "The specific key can't be found in kv tablet although the kv record is for deletion, " + + "ignore it directly as it doesn't exist in the kv tablet yet."); + } else { + + BinaryRow oldRow = valueDecoder.decodeValue(oldValue).row; + BinaryRow newRow = deleteRow(oldRow, partialUpdater); + if (rowMergeEngine.shouldSkipDeletion(newRow)) { + continue; + } + newRow = rowMergeEngine.merge(oldRow, newRow); + if (newRow == null) { + continue; + } + + // if newRow is null, it means the row should be deleted + if (newRow == null) { + walBuilder.append(RowKind.DELETE, oldRow); + appendedRecordCount += 1; + kvPreWriteBuffer.delete(key, logOffset++); + } else { + // otherwise, it's a partial update, should produce -U,+U + walBuilder.append(RowKind.UPDATE_BEFORE, oldRow); + walBuilder.append(RowKind.UPDATE_AFTER, newRow); + appendedRecordCount += 2; + kvPreWriteBuffer.put( + key, + ValueEncoder.encodeValue(schemaId, newRow), + logOffset + 1); + logOffset += 2; + } + } + } else { + // upsert operation + byte[] oldValue = getFromBufferOrKv(key); + // it's update + if (oldValue != null) { + BinaryRow oldRow = valueDecoder.decodeValue(oldValue).row; + BinaryRow newRow = + updateRow(oldRow, kvRecord.getRow(), partialUpdater); + newRow = rowMergeEngine.merge(oldRow, newRow); + if (newRow == null) { + continue; + } + walBuilder.append(RowKind.UPDATE_BEFORE, oldRow); + walBuilder.append(RowKind.UPDATE_AFTER, newRow); + appendedRecordCount += 2; + // logOffset is for -U, logOffset + 1 is for +U, we need to use + // the log offset for +U + kvPreWriteBuffer.put( + key, + ValueEncoder.encodeValue(schemaId, newRow), + logOffset + 1); + logOffset += 2; + } else { + // it's insert + // TODO: we should add guarantees that all non-specified columns + // of the input row are set to null. + BinaryRow newRow = kvRecord.getRow(); + walBuilder.append(RowKind.INSERT, newRow); + appendedRecordCount += 1; + kvPreWriteBuffer.put( + key, + ValueEncoder.encodeValue(schemaId, newRow), + logOffset++); + } + } } - appendedRecordCount = rowMergeEngine.getAppendedRecordCount(); + // if appendedRecordCount is 0, it means there is no record to append, we // should not append. if (appendedRecordCount > 0) { @@ -314,52 +380,21 @@ public LogAppendInfo putAsLeader( }); } - private RowMergeEngine getRowMergeEngine( - PartialUpdater partialUpdater, - ValueDecoder valueDecoder, - WalBuilder walBuilder, - KvPreWriteBuffer kvPreWriteBuffer, - Schema schema, - short schemaId, - int appendedRecordCount, - long logOffset) { + private RowMergeEngine getRowMergeEngine(Schema schema) { if (mergeEngine != null) { switch (mergeEngine.getType()) { case VERSION: - return new VersionRowMergeEngine( - partialUpdater, - valueDecoder, - walBuilder, - kvPreWriteBuffer, - rocksDBKv, - schema, - schemaId, - appendedRecordCount, - logOffset, - mergeEngine); + return new VersionRowMergeEngine(schema, mergeEngine); case FIRST_ROW: - return new FirstRowMergeEngine( - partialUpdater, - valueDecoder, - walBuilder, - kvPreWriteBuffer, - rocksDBKv, - schema, - schemaId, - appendedRecordCount, - logOffset); + return new FirstRowMergeEngine(); } } - return new DefaultRowMergeEngine( - partialUpdater, - valueDecoder, - walBuilder, - kvPreWriteBuffer, - rocksDBKv, - schema, - schemaId, - appendedRecordCount, - logOffset); + return new RowMergeEngine() { + @Override + public BinaryRow merge(BinaryRow oldRow, BinaryRow newRow) { + return newRow; + } + }; } private WalBuilder createWalBuilder(int schemaId, RowType rowType) throws Exception { diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/DefaultRowMergeEngine.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/DefaultRowMergeEngine.java deleted file mode 100644 index c944268c6..000000000 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/DefaultRowMergeEngine.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright (c) 2024 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.fluss.server.kv.mergeengine; - -import com.alibaba.fluss.metadata.Schema; -import com.alibaba.fluss.row.encode.ValueDecoder; -import com.alibaba.fluss.server.kv.partialupdate.PartialUpdater; -import com.alibaba.fluss.server.kv.prewrite.KvPreWriteBuffer; -import com.alibaba.fluss.server.kv.rocksdb.RocksDBKv; -import com.alibaba.fluss.server.kv.wal.WalBuilder; - -/** A wrapper for no merge engine table. */ -public class DefaultRowMergeEngine extends RowMergeEngine { - - public DefaultRowMergeEngine( - PartialUpdater partialUpdater, - ValueDecoder valueDecoder, - WalBuilder walBuilder, - KvPreWriteBuffer kvPreWriteBuffer, - RocksDBKv rocksDBKv, - Schema schema, - short schemaId, - int appendedRecordCount, - long logOffset) { - super( - partialUpdater, - valueDecoder, - walBuilder, - kvPreWriteBuffer, - rocksDBKv, - schema, - schemaId, - appendedRecordCount, - logOffset); - } -} diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/FirstRowMergeEngine.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/FirstRowMergeEngine.java index fceb3b380..942d3eef2 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/FirstRowMergeEngine.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/FirstRowMergeEngine.java @@ -13,50 +13,23 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package com.alibaba.fluss.server.kv.mergeengine; -import com.alibaba.fluss.metadata.Schema; import com.alibaba.fluss.row.BinaryRow; -import com.alibaba.fluss.row.encode.ValueDecoder; -import com.alibaba.fluss.server.kv.partialupdate.PartialUpdater; -import com.alibaba.fluss.server.kv.prewrite.KvPreWriteBuffer; -import com.alibaba.fluss.server.kv.rocksdb.RocksDBKv; -import com.alibaba.fluss.server.kv.wal.WalBuilder; - -/** A wrapper for first row merge engine. */ -public class FirstRowMergeEngine extends RowMergeEngine { - - public FirstRowMergeEngine( - PartialUpdater partialUpdater, - ValueDecoder valueDecoder, - WalBuilder walBuilder, - KvPreWriteBuffer kvPreWriteBuffer, - RocksDBKv rocksDBKv, - Schema schema, - short schemaId, - int appendedRecordCount, - long logOffset) { - super( - partialUpdater, - valueDecoder, - walBuilder, - kvPreWriteBuffer, - rocksDBKv, - schema, - schemaId, - appendedRecordCount, - logOffset); - } +/** + * The first row merge engine for primary key table. Always retain the first row. + * + * @since 0.6 + */ +public class FirstRowMergeEngine implements RowMergeEngine { @Override - protected void deleteOrUpdate(KvPreWriteBuffer.Key key, byte[] oldValue) throws Exception { - // When Merge engine is "first_row", We don't need to do any update and delete operations. + public BinaryRow merge(BinaryRow oldRow, BinaryRow newRow) { + return oldRow == null ? newRow : null; } @Override - protected void update(KvPreWriteBuffer.Key key, BinaryRow oldRow, BinaryRow newRow) - throws Exception { - // When Merge engine is "first_row", We don't need to do any update operations. + public boolean shouldSkipDeletion(BinaryRow newRow) { + return newRow == null; } } diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/RowMergeEngine.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/RowMergeEngine.java index 85f04f0ac..56831cdf1 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/RowMergeEngine.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/RowMergeEngine.java @@ -13,163 +13,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package com.alibaba.fluss.server.kv.mergeengine; -import com.alibaba.fluss.metadata.Schema; -import com.alibaba.fluss.record.KvRecord; -import com.alibaba.fluss.record.RowKind; import com.alibaba.fluss.row.BinaryRow; -import com.alibaba.fluss.row.InternalRow; -import com.alibaba.fluss.row.encode.ValueDecoder; -import com.alibaba.fluss.row.encode.ValueEncoder; -import com.alibaba.fluss.server.kv.partialupdate.PartialUpdater; -import com.alibaba.fluss.server.kv.prewrite.KvPreWriteBuffer; -import com.alibaba.fluss.server.kv.rocksdb.RocksDBKv; -import com.alibaba.fluss.server.kv.wal.WalBuilder; -import com.alibaba.fluss.utils.BytesUtils; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; - -import java.io.IOException; - -/** Abstract common operations for merge engine. */ -public abstract class RowMergeEngine { - - private static final Logger LOG = LoggerFactory.getLogger(RowMergeEngine.class); - protected final PartialUpdater partialUpdater; - protected final ValueDecoder valueDecoder; - protected final WalBuilder walBuilder; - protected final KvPreWriteBuffer kvPreWriteBuffer; - protected final RocksDBKv rocksDBKv; - protected final Schema schema; - protected final short schemaId; - protected int appendedRecordCount; - protected long logOffset; - - public RowMergeEngine( - PartialUpdater partialUpdater, - ValueDecoder valueDecoder, - WalBuilder walBuilder, - KvPreWriteBuffer kvPreWriteBuffer, - RocksDBKv rocksDBKv, - Schema schema, - short schemaId, - int appendedRecordCount, - long logOffset) { - this.partialUpdater = partialUpdater; - this.valueDecoder = valueDecoder; - this.walBuilder = walBuilder; - this.kvPreWriteBuffer = kvPreWriteBuffer; - this.rocksDBKv = rocksDBKv; - this.schema = schema; - this.schemaId = schemaId; - this.appendedRecordCount = appendedRecordCount; - this.logOffset = logOffset; - } - - public int getAppendedRecordCount() { - return appendedRecordCount; - } - - public void writeRecord(KvRecord kvRecord) throws Exception { - byte[] keyBytes = BytesUtils.toArray(kvRecord.getKey()); - KvPreWriteBuffer.Key key = KvPreWriteBuffer.Key.of(keyBytes); - byte[] oldValue = getFromBufferOrKv(key); - if (kvRecord.getRow() == null) { - // it's for deletion - if (oldValue == null) { - // there might be large amount of such deletion, so we don't log - LOG.debug( - "The specific key can't be found in kv tablet although the kv record is for deletion, " - + "ignore it directly as it doesn't exist in the kv tablet yet."); - } else { - deleteOrUpdate(key, oldValue); - } - } else { - // upsert operation - upsert(key, kvRecord, oldValue); - } - } - - private void upsert(KvPreWriteBuffer.Key key, KvRecord kvRecord, byte[] oldValue) - throws Exception { - // it's update - if (oldValue != null) { - BinaryRow oldRow = valueDecoder.decodeValue(oldValue).row; - BinaryRow newRow = updateRow(oldRow, kvRecord.getRow(), partialUpdater); - update(key, oldRow, newRow); - } else { - // it's insert - // TODO: we should add guarantees that all non-specified columns - // of the input row are set to null. - insert(key, kvRecord); - } - } - protected void deleteOrUpdate(KvPreWriteBuffer.Key key, byte[] oldValue) throws Exception { - BinaryRow oldRow = valueDecoder.decodeValue(oldValue).row; - BinaryRow newRow = deleteRow(oldRow, partialUpdater); - // if newRow is null, it means the row should be deleted - if (newRow == null) { - delete(key, oldRow); - } else { - // otherwise, it's a partial update, should produce -U,+U - update(key, oldRow, newRow); - } - } - - protected void delete(KvPreWriteBuffer.Key key, BinaryRow oldRow) throws Exception { - walBuilder.append(RowKind.DELETE, oldRow); - appendedRecordCount += 1; - kvPreWriteBuffer.delete(key, logOffset++); - } - - protected void insert(KvPreWriteBuffer.Key key, KvRecord kvRecord) throws Exception { - BinaryRow newRow = kvRecord.getRow(); - walBuilder.append(RowKind.INSERT, newRow); - appendedRecordCount += 1; - kvPreWriteBuffer.put(key, ValueEncoder.encodeValue(schemaId, newRow), logOffset++); - } - - protected void update(KvPreWriteBuffer.Key key, BinaryRow oldRow, BinaryRow newRow) - throws Exception { - walBuilder.append(RowKind.UPDATE_BEFORE, oldRow); - walBuilder.append(RowKind.UPDATE_AFTER, newRow); - appendedRecordCount += 2; - // logOffset is for -U, logOffset + 1 is for +U, we need to use - // the log offset for +U - kvPreWriteBuffer.put(key, ValueEncoder.encodeValue(schemaId, newRow), logOffset + 1); - logOffset += 2; - } - - // get from kv pre-write buffer first, if can't find, get from rocksdb - protected byte[] getFromBufferOrKv(KvPreWriteBuffer.Key key) throws IOException { - KvPreWriteBuffer.Value value = kvPreWriteBuffer.get(key); - if (value == null) { - return rocksDBKv.get(key.get()); - } - return value.get(); - } - - protected @Nullable BinaryRow deleteRow( - InternalRow oldRow, @Nullable PartialUpdater partialUpdater) { - if (partialUpdater == null) { - return null; - } - return partialUpdater.deleteRow(oldRow); - } +/** + * The row merge engine for primary key table. + * + * @since 0.6 + */ +public interface RowMergeEngine { + BinaryRow merge(BinaryRow oldRow, BinaryRow newRow); - protected BinaryRow updateRow( - BinaryRow oldRow, BinaryRow updateRow, @Nullable PartialUpdater partialUpdater) { - // if is not partial update, return the update row - if (partialUpdater == null) { - return updateRow; - } - // otherwise, do partial update - return partialUpdater.updateRow(oldRow, updateRow); + default boolean shouldSkipDeletion(BinaryRow newRow) { + return false; } } diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/VersionRowMergeEngine.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/VersionRowMergeEngine.java index 13b89cd99..5c29dc8ee 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/VersionRowMergeEngine.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/VersionRowMergeEngine.java @@ -13,7 +13,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package com.alibaba.fluss.server.kv.mergeengine; import com.alibaba.fluss.exception.FlussRuntimeException; @@ -23,11 +22,6 @@ import com.alibaba.fluss.row.InternalRow; import com.alibaba.fluss.row.TimestampLtz; import com.alibaba.fluss.row.TimestampNtz; -import com.alibaba.fluss.row.encode.ValueDecoder; -import com.alibaba.fluss.server.kv.partialupdate.PartialUpdater; -import com.alibaba.fluss.server.kv.prewrite.KvPreWriteBuffer; -import com.alibaba.fluss.server.kv.rocksdb.RocksDBKv; -import com.alibaba.fluss.server.kv.wal.WalBuilder; import com.alibaba.fluss.types.BigIntType; import com.alibaba.fluss.types.DataType; import com.alibaba.fluss.types.IntType; @@ -36,66 +30,38 @@ import com.alibaba.fluss.types.TimeType; import com.alibaba.fluss.types.TimestampType; -/** A wrapper for version merge engine. */ -public class VersionRowMergeEngine extends RowMergeEngine { +/** + * The version row merge engine for primary key table. The update will only occur if the new value + * of the specified version field is greater than the old value. + * + * @since 0.6 + */ +public class VersionRowMergeEngine implements RowMergeEngine { private final MergeEngine mergeEngine; - private final Schema schema; private final InternalRow.FieldGetter[] currentFieldGetters; + private final RowType rowType; - public VersionRowMergeEngine( - PartialUpdater partialUpdater, - ValueDecoder valueDecoder, - WalBuilder walBuilder, - KvPreWriteBuffer kvPreWriteBuffer, - RocksDBKv rocksDBKv, - Schema schema, - short schemaId, - int appendedRecordCount, - long logOffset, - MergeEngine mergeEngine) { - super( - partialUpdater, - valueDecoder, - walBuilder, - kvPreWriteBuffer, - rocksDBKv, - schema, - schemaId, - appendedRecordCount, - logOffset); + public VersionRowMergeEngine(Schema schema, MergeEngine mergeEngine) { this.mergeEngine = mergeEngine; - this.schema = schema; - RowType currentRowType = schema.toRowType(); - this.currentFieldGetters = new InternalRow.FieldGetter[currentRowType.getFieldCount()]; - for (int i = 0; i < currentRowType.getFieldCount(); i++) { - currentFieldGetters[i] = InternalRow.createFieldGetter(currentRowType.getTypeAt(i), i); + this.rowType = schema.toRowType(); + this.currentFieldGetters = new InternalRow.FieldGetter[rowType.getFieldCount()]; + for (int i = 0; i < rowType.getFieldCount(); i++) { + currentFieldGetters[i] = InternalRow.createFieldGetter(rowType.getTypeAt(i), i); } } @Override - protected void update(KvPreWriteBuffer.Key key, BinaryRow oldRow, BinaryRow newRow) - throws Exception { - RowType rowType = schema.toRowType(); - if (checkVersionMergeEngine(rowType, oldRow, newRow)) { - return; - } - super.update(key, oldRow, newRow); + public BinaryRow merge(BinaryRow oldRow, BinaryRow newRow) { + return checkNewRowVersion(rowType, oldRow, newRow) ? newRow : null; } - private boolean checkVersionMergeEngine(RowType rowType, BinaryRow oldRow, BinaryRow newRow) { - if (!checkNewRowVersion(mergeEngine, rowType, oldRow, newRow)) { - // When the specified field version is less - // than the version number of the old - // record, do not update - return true; - } - return false; + @Override + public boolean shouldSkipDeletion(BinaryRow newRow) { + return true; } - // Check row version. - private boolean checkNewRowVersion( - MergeEngine mergeEngine, RowType rowType, BinaryRow oldRow, BinaryRow newRow) { + private boolean checkNewRowVersion(RowType rowType, BinaryRow oldRow, BinaryRow newRow) { int fieldIndex = rowType.getFieldIndex(mergeEngine.getColumn()); if (fieldIndex == -1) { throw new IllegalArgumentException( @@ -123,9 +89,9 @@ private boolean checkNewRowVersion( if (dataType instanceof BigIntType) { return (Long) newValue > (Long) oldValue; - } else if (dataType instanceof IntType) { + } else if (dataType instanceof IntType || dataType instanceof TimeType) { return (Integer) newValue > (Integer) oldValue; - } else if (dataType instanceof TimestampType || dataType instanceof TimeType) { + } else if (dataType instanceof TimestampType) { return ((TimestampNtz) newValue).getMillisecond() > ((TimestampNtz) oldValue).getMillisecond(); } else if (dataType instanceof LocalZonedTimestampType) { diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java index 027081952..e6f99204c 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java @@ -619,7 +619,7 @@ void testVersionRowMergeEngine(@TempDir File tempLogDir, @TempDir File tmpKvDir) Map config = new HashMap<>(); config.put("table.merge-engine", "version"); config.put("table.merge-engine.version.column", "b"); - MergeEngine mergeEngine = MergeEngine.create(config, DATA3_SCHEMA_PK.toRowType()); + MergeEngine mergeEngine = MergeEngine.create(config); KvTablet kvTablet = createKvTablet(tablePath, tableBucket, logTablet, tmpKvDir, mergeEngine);