Commit

fixed
sunxiaojian committed Jan 8, 2025
1 parent ef2eada commit d037237
Showing 12 changed files with 198 additions and 453 deletions.
@@ -911,7 +911,6 @@ void testFirstRowMergeEngine() throws Exception {
.getRow();
assertThatRow(gotRow).withSchema(rowType).isEqualTo(expectedRows.get(id));
}
- // check scan change log
LogScanner logScanner = table.getLogScanner(new LogScan());
logScanner.subscribeFromBeginning(0);
List<ScanRecord> actualLogRecords = new ArrayList<>(0);
@@ -946,55 +945,54 @@ void testMergeEngineWithVersion() throws Exception {
try (Table table = conn.getTable(DATA3_TABLE_PATH_PK)) {
// put rows.
UpsertWriter upsertWriter = table.getUpsertWriter();
- List<InternalRow> expectedRows = new ArrayList<>(rows);
+ List<ScanRecord> expectedScanRecords = new ArrayList<>(rows);
// init rows.
for (int row = 0; row < rows; row++) {
upsertWriter.upsert(compactedRow(rowType, new Object[] {row, 1000L}));
- expectedRows.add(compactedRow(rowType, new Object[] {row, 1000L}));
+ expectedScanRecords.add(
+ new ScanRecord(compactedRow(rowType, new Object[] {row, 1000L})));
}
// upsert row with id=0 and version 999L < 1000L; the row will not be updated
upsertWriter.upsert(compactedRow(rowType, new Object[] {0, 999L}));

// upsert row with id=1 and version 1001L > 1000L; the row will be updated
upsertWriter.upsert(compactedRow(rowType, new Object[] {1, 1001L}));
+ // update_before record, don't care about offset/timestamp
+ expectedScanRecords.add(
+ new ScanRecord(
+ -1,
+ -1,
+ RowKind.UPDATE_BEFORE,
+ compactedRow(rowType, new Object[] {1, 1000L})));
+ // update_after record
+ expectedScanRecords.add(
+ new ScanRecord(
+ -1,
+ -1,
+ RowKind.UPDATE_AFTER,
+ compactedRow(rowType, new Object[] {1, 1001L})));
rows = rows + 2;

upsertWriter.flush();

// check scan change log
LogScanner logScanner = table.getLogScanner(new LogScan());
logScanner.subscribeFromBeginning(0);

- List<ScanRecord> actualLogRecords = new ArrayList<>(0);
+ List<ScanRecord> actualLogRecords = new ArrayList<>(rows);
while (actualLogRecords.size() < rows) {
ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
scanRecords.forEach(actualLogRecords::add);
}

assertThat(actualLogRecords).hasSize(rows);
- for (int i = 0; i < 3; i++) {
- ScanRecord scanRecord = actualLogRecords.get(i);
- assertThat(scanRecord.getRowKind()).isEqualTo(RowKind.INSERT);
- assertThatRow(scanRecord.getRow())
+ for (int i = 0; i < rows; i++) {
+ ScanRecord actualScanRecord = actualLogRecords.get(i);
+ ScanRecord expectedRecord = expectedScanRecords.get(i);
+ assertThat(actualScanRecord.getRowKind()).isEqualTo(expectedRecord.getRowKind());
+ assertThatRow(actualScanRecord.getRow())
.withSchema(rowType)
- .isEqualTo(expectedRows.get(i));
+ .isEqualTo(expectedRecord.getRow());
}

- // update_before for id =1
- List<ScanRecord> updateActualLogRecords = new ArrayList<>(actualLogRecords);
-
- ScanRecord beforeRecord = updateActualLogRecords.get(3);
- assertThat(beforeRecord.getRowKind()).isEqualTo(RowKind.UPDATE_BEFORE);
- assertThat(beforeRecord.getRow().getFieldCount()).isEqualTo(2);
- assertThat(beforeRecord.getRow().getInt(0)).isEqualTo(1);
- assertThat(beforeRecord.getRow().getLong(1)).isEqualTo(1000);
-
- // update_after for id =1
- ScanRecord afterRecord = updateActualLogRecords.get(4);
- assertThat(afterRecord.getRowKind()).isEqualTo(RowKind.UPDATE_AFTER);
- assertThat(afterRecord.getRow().getFieldCount()).isEqualTo(2);
- assertThat(afterRecord.getRow().getInt(0)).isEqualTo(1);
- assertThat(afterRecord.getRow().getLong(1)).isEqualTo(1001);
}
}
}
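The rule this test pins down, restated in isolation as a minimal sketch: an upsert whose version value is lower than the stored one (999 vs. 1000) is dropped, while a higher value (1001 vs. 1000) replaces the row and emits an UPDATE_BEFORE/UPDATE_AFTER pair into the change log. shouldApplyUpsert is an illustrative name, not a Fluss API; a long-typed version column is assumed, matching the 1000L values above, and equal versions are not covered by the test, so they are left out here.

/** Illustrative only, not part of Fluss: the comparison rule testMergeEngineWithVersion exercises. */
final class VersionMergeRuleSketch {

    /** Assumes a long-typed version column, as in the rows written above. */
    static boolean shouldApplyUpsert(long storedVersion, long incomingVersion) {
        // 999 against a stored 1000 is ignored; 1001 against a stored 1000 is applied
        // and shows up in the change log as UPDATE_BEFORE followed by UPDATE_AFTER.
        return incomingVersion > storedVersion;
    }

    public static void main(String[] args) {
        System.out.println(shouldApplyUpsert(1000L, 999L));  // false -> upsert dropped
        System.out.println(shouldApplyUpsert(1000L, 1001L)); // true  -> -U/+U emitted
    }
}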
@@ -18,31 +18,37 @@

import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;
- import com.alibaba.fluss.shaded.guava32.com.google.common.collect.Sets;
+ import com.alibaba.fluss.shaded.guava32.com.google.common.collect.ImmutableSet;
import com.alibaba.fluss.types.BigIntType;
import com.alibaba.fluss.types.DataType;
import com.alibaba.fluss.types.IntType;
import com.alibaba.fluss.types.LocalZonedTimestampType;
import com.alibaba.fluss.types.RowType;
import com.alibaba.fluss.types.TimeType;
import com.alibaba.fluss.types.TimestampType;

import javax.annotation.Nullable;

import java.util.Map;
import java.util.Objects;
import java.util.Set;

- /** The merge engine for primary key table. */
+ /**
+ * The merge engine for primary key table.
+ *
+ * @since 0.6
+ */
public class MergeEngine {

public static final Set<String> VERSION_SUPPORTED_DATA_TYPES =
- Sets.newHashSet(
+ ImmutableSet.of(
BigIntType.class.getName(),
IntType.class.getName(),
TimestampType.class.getName(),
TimeType.class.getName(),
LocalZonedTimestampType.class.getName());
private final Type type;
- private final String column;

+ /** When merge engine type is version, column cannot be null. */
+ @Nullable private final String column;

private MergeEngine(Type type) {
this(type, null);
@@ -54,46 +60,24 @@ private MergeEngine(Type type, String column) {
}

public static MergeEngine create(Map<String, String> properties) {
- return create(properties, null);
- }
-
- public static MergeEngine create(Map<String, String> properties, RowType rowType) {
- return create(Configuration.fromMap(properties), rowType);
+ return create(Configuration.fromMap(properties));
}

- public static MergeEngine create(Configuration options, RowType rowType) {
- if (options == null) {
- return null;
- }
+ private static MergeEngine create(Configuration options) {
MergeEngine.Type type = options.get(ConfigOptions.TABLE_MERGE_ENGINE);
if (type == null) {
return null;
}

switch (type) {
case FIRST_ROW:
return new MergeEngine(Type.FIRST_ROW);
case VERSION:
String column = options.get(ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN);
if (column == null) {
throw new IllegalArgumentException(
"When the merge engine is set to version, the 'table.merge-engine.version.column' cannot be empty.");
}
if (rowType != null) {
int fieldIndex = rowType.getFieldIndex(column);
if (fieldIndex == -1) {
throw new IllegalArgumentException(
String.format(
"When the merge engine is set to version, the column %s does not exist.",
column));
}
DataType dataType = rowType.getTypeAt(fieldIndex);
if (!VERSION_SUPPORTED_DATA_TYPES.contains(dataType.getClass().getName())) {
throw new IllegalArgumentException(
String.format(
"The merge engine column is not support type %s .",
dataType.asSummaryString()));
}
String.format(
"When the merge engine is set to version, the '%s' must be set.",
ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN.key()));
}
return new MergeEngine(Type.VERSION, column);
default:
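For orientation, a minimal usage sketch of the reworked entry point: MergeEngine.create(Map<String, String>) now parses the table properties itself and no longer needs a RowType. The property keys are assumptions inferred from this diff — 'table.merge-engine.version.column' is quoted in the removed error message, and 'table.merge-engine' is a guess at the key behind ConfigOptions.TABLE_MERGE_ENGINE — so treat the snippet as a sketch rather than documented usage.

import com.alibaba.fluss.metadata.MergeEngine;

import java.util.HashMap;
import java.util.Map;

// Sketch only: property keys and the "version" value are inferred from this diff.
public class MergeEngineCreateSketch {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("table.merge-engine", "version");
        props.put("table.merge-engine.version.column", "ts");

        // Parses the table properties; returns null when no merge engine is configured at all.
        MergeEngine engine = MergeEngine.create(props);
        System.out.println(engine != null);

        // With "version" configured but the column option missing, create(...) now throws
        // an IllegalArgumentException that names the missing option key.
        Map<String, String> missingColumn = new HashMap<>();
        missingColumn.put("table.merge-engine", "version");
        try {
            MergeEngine.create(missingColumn);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}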
@@ -281,7 +281,7 @@ public boolean isDataLakeEnabled() {
}

public @Nullable MergeEngine getMergeEngine() {
- return MergeEngine.create(configuration(), schema.toRowType());
+ return MergeEngine.create(properties);
}

public TableDescriptor copy(Map<String, String> newProperties) {
@@ -23,7 +23,6 @@
import com.alibaba.fluss.connector.flink.sink.FlinkTableSink;
import com.alibaba.fluss.connector.flink.source.FlinkTableSource;
import com.alibaba.fluss.connector.flink.utils.FlinkConnectorOptionsUtils;
- import com.alibaba.fluss.connector.flink.utils.FlinkConversions;
import com.alibaba.fluss.metadata.MergeEngine;
import com.alibaba.fluss.metadata.TablePath;

@@ -152,8 +151,7 @@ public DynamicTableSink createDynamicTableSink(Context context) {
rowType,
context.getPrimaryKeyIndexes(),
isStreamingMode,
- MergeEngine.create(
- helper.getOptions().toMap(), FlinkConversions.toFlussRowType(rowType)));
+ MergeEngine.create(helper.getOptions().toMap()));
}

@Override
@@ -75,7 +75,7 @@ public FlinkTableSink(
RowType tableRowType,
int[] primaryKeyIndexes,
boolean streaming,
- MergeEngine mergeEngine) {
+ @Nullable MergeEngine mergeEngine) {
this.tablePath = tablePath;
this.flussConfig = flussConfig;
this.tableRowType = tableRowType;
@@ -861,18 +861,6 @@ void testVersionMergeEngineWithTypeBigint() throws Exception {
expectedRows = Arrays.asList("-U[3, v3, 1000]", "+U[3, v33, 1001]", "+I[4, v44, 1000]");
assertResultsIgnoreOrder(rowIter, expectedRows, false);

- // test update a=4
- tBatchEnv
- .executeSql("UPDATE merge_engine_with_version SET b = 'v45', ts=1001 WHERE a = 4")
- .await();
- expectedRows = Arrays.asList("-U[4, v44, 1000]", "+U[4, v45, 1001]");
- assertResultsIgnoreOrder(rowIter, expectedRows, false);
-
- // test delete a=4
- tBatchEnv.executeSql("delete from merge_engine_with_version WHERE a = 4").await();
- expectedRows = Arrays.asList("-D[4, v45, 1001]");
- assertResultsIgnoreOrder(rowIter, expectedRows, false);

insertJobClient.cancel().get();
}

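A reading aid for the expected-row strings in this test ("+I[...]", "-U[...]", "+U[...]", "-D[...]"): the prefixes are the usual changelog row-kind short forms. The sketch below assumes Flink's org.apache.flink.types.RowKind is the convention these strings follow; it is a reference snippet, not code from this repository.

import org.apache.flink.types.RowKind;

// Prints the short forms that prefix the expected rows: +I, -U, +U, -D.
public class RowKindPrefixSketch {
    public static void main(String[] args) {
        System.out.println(RowKind.INSERT.shortString());        // "+I[...]"  insert
        System.out.println(RowKind.UPDATE_BEFORE.shortString()); // "-U[...]"  update_before
        System.out.println(RowKind.UPDATE_AFTER.shortString());  // "+U[...]"  update_after
        System.out.println(RowKind.DELETE.shortString());        // "-D[...]"  delete
    }
}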