Skip to content

Commit

Permalink
Introduce version merge engine for primary key table
Browse files Browse the repository at this point in the history
  • Loading branch information
Xiaojian Sun authored and sunxiaojian committed Jan 6, 2025
1 parent 92c6d23 commit 023351d
Show file tree
Hide file tree
Showing 15 changed files with 923 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@
import static com.alibaba.fluss.record.TestData.DATA1_TABLE_INFO_PK;
import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH;
import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH_PK;
import static com.alibaba.fluss.record.TestData.DATA3_SCHEMA_PK;
import static com.alibaba.fluss.record.TestData.DATA3_TABLE_PATH_PK;
import static com.alibaba.fluss.testutils.DataTestUtils.assertRowValueEquals;
import static com.alibaba.fluss.testutils.DataTestUtils.compactedRow;
import static com.alibaba.fluss.testutils.DataTestUtils.keyRow;
Expand Down Expand Up @@ -884,7 +886,7 @@ void testFirstRowMergeEngine() throws Exception {
TableDescriptor tableDescriptor =
TableDescriptor.builder()
.schema(DATA1_SCHEMA_PK)
.property(ConfigOptions.TABLE_MERGE_ENGINE, MergeEngine.FIRST_ROW)
.property(ConfigOptions.TABLE_MERGE_ENGINE, MergeEngine.Type.FIRST_ROW)
.build();
RowType rowType = DATA1_SCHEMA_PK.toRowType();
createTable(DATA1_TABLE_PATH_PK, tableDescriptor, false);
Expand All @@ -901,7 +903,6 @@ void testFirstRowMergeEngine() throws Exception {
expectedRows.add(compactedRow(rowType, new Object[] {id, "value_0"}));
}
upsertWriter.flush();

// now, get rows by lookup
for (int id = 0; id < rows; id++) {
InternalRow gotRow =
Expand All @@ -910,6 +911,55 @@ void testFirstRowMergeEngine() throws Exception {
.getRow();
assertThatRow(gotRow).withSchema(rowType).isEqualTo(expectedRows.get(id));
}
// check scan change log
LogScanner logScanner = table.getLogScanner(new LogScan());
logScanner.subscribeFromBeginning(0);
List<ScanRecord> actualLogRecords = new ArrayList<>(0);
while (actualLogRecords.size() < rows) {
ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
scanRecords.forEach(actualLogRecords::add);
}
assertThat(actualLogRecords).hasSize(rows);
for (int i = 0; i < actualLogRecords.size(); i++) {
ScanRecord scanRecord = actualLogRecords.get(i);
assertThat(scanRecord.getRowKind()).isEqualTo(RowKind.INSERT);
assertThatRow(scanRecord.getRow())
.withSchema(rowType)
.isEqualTo(expectedRows.get(i));
}
}
}

@Test
void testMergeEngineWithVersion() throws Exception {
// Create table.
TableDescriptor tableDescriptor =
TableDescriptor.builder()
.schema(DATA3_SCHEMA_PK)
.property(ConfigOptions.TABLE_MERGE_ENGINE, MergeEngine.Type.VERSION)
.property(ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN, "b")
.build();
RowType rowType = DATA3_SCHEMA_PK.toRowType();
createTable(DATA3_TABLE_PATH_PK, tableDescriptor, false);

int rows = 3;
try (Table table = conn.getTable(DATA3_TABLE_PATH_PK)) {
// put rows.
UpsertWriter upsertWriter = table.getUpsertWriter();
List<InternalRow> expectedRows = new ArrayList<>(rows);
// init rows.
for (int row = 0; row < rows; row++) {
upsertWriter.upsert(compactedRow(rowType, new Object[] {row, 1000L}));
expectedRows.add(compactedRow(rowType, new Object[] {row, 1000L}));
}
// update row if id=0 and version < 1000L, will not update
upsertWriter.upsert(compactedRow(rowType, new Object[] {0, 999L}));

// update if version> 1000L
upsertWriter.upsert(compactedRow(rowType, new Object[] {1, 1001L}));
rows = rows + 2;

upsertWriter.flush();

// check scan change log
LogScanner logScanner = table.getLogScanner(new LogScan());
Expand All @@ -922,13 +972,29 @@ void testFirstRowMergeEngine() throws Exception {
}

assertThat(actualLogRecords).hasSize(rows);
for (int i = 0; i < actualLogRecords.size(); i++) {
for (int i = 0; i < 3; i++) {
ScanRecord scanRecord = actualLogRecords.get(i);
assertThat(scanRecord.getRowKind()).isEqualTo(RowKind.INSERT);
assertThatRow(scanRecord.getRow())
.withSchema(rowType)
.isEqualTo(expectedRows.get(i));
}

// update_before for id =1
List<ScanRecord> updateActualLogRecords = new ArrayList<>(actualLogRecords);

ScanRecord beforeRecord = updateActualLogRecords.get(3);
assertThat(beforeRecord.getRowKind()).isEqualTo(RowKind.UPDATE_BEFORE);
assertThat(beforeRecord.getRow().getFieldCount()).isEqualTo(2);
assertThat(beforeRecord.getRow().getInt(0)).isEqualTo(1);
assertThat(beforeRecord.getRow().getLong(1)).isEqualTo(1000);

// update_after for id =1
ScanRecord afterRecord = updateActualLogRecords.get(4);
assertThat(afterRecord.getRowKind()).isEqualTo(RowKind.UPDATE_AFTER);
assertThat(afterRecord.getRow().getFieldCount()).isEqualTo(2);
assertThat(afterRecord.getRow().getInt(0)).isEqualTo(1);
assertThat(afterRecord.getRow().getLong(1)).isEqualTo(1001);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -969,12 +969,18 @@ public class ConfigOptions {
+ "When this option is set to ture and the datalake tiering service is up,"
+ " the table will be tiered and compacted into datalake format stored on lakehouse storage.");

public static final ConfigOption<MergeEngine> TABLE_MERGE_ENGINE =
public static final ConfigOption<MergeEngine.Type> TABLE_MERGE_ENGINE =
key("table.merge-engine")
.enumType(MergeEngine.class)
.enumType(MergeEngine.Type.class)
.noDefaultValue()
.withDescription("The merge engine for the primary key table.");

public static final ConfigOption<String> TABLE_MERGE_ENGINE_VERSION_COLUMN =
key("table.merge-engine.version.column")
.stringType()
.noDefaultValue()
.withDescription("The merge engine version column for the primary key table.");

// ------------------------------------------------------------------------
// ConfigOptions for Kv
// ------------------------------------------------------------------------
Expand Down
141 changes: 121 additions & 20 deletions fluss-common/src/main/java/com/alibaba/fluss/metadata/MergeEngine.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Copyright (c) 2024 Alibaba Group Holding Ltd.
*
* http://www.apache.org/licenses/LICENSE-2.0
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -18,22 +16,125 @@

package com.alibaba.fluss.metadata;

/**
* The merge engine for primary key table.
*
* @since 0.6
*/
public enum MergeEngine {
FIRST_ROW("first_row");
import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.shaded.guava32.com.google.common.collect.Sets;
import com.alibaba.fluss.types.BigIntType;
import com.alibaba.fluss.types.DataType;
import com.alibaba.fluss.types.IntType;
import com.alibaba.fluss.types.LocalZonedTimestampType;
import com.alibaba.fluss.types.RowType;
import com.alibaba.fluss.types.TimeType;
import com.alibaba.fluss.types.TimestampType;

import java.util.Map;
import java.util.Objects;
import java.util.Set;

private final String value;
/** The merge engine for primary key table. */
public class MergeEngine {

public static final Set<String> VERSION_SUPPORTED_DATA_TYPES =
Sets.newHashSet(
BigIntType.class.getName(),
IntType.class.getName(),
TimestampType.class.getName(),
TimeType.class.getName(),
LocalZonedTimestampType.class.getName());
private final Type type;
private final String column;

private MergeEngine(Type type) {
this(type, null);
}

private MergeEngine(Type type, String column) {
this.type = type;
this.column = column;
}

public static MergeEngine create(Map<String, String> properties) {
return create(properties, null);
}

public static MergeEngine create(Map<String, String> properties, RowType rowType) {
return create(Configuration.fromMap(properties), rowType);
}

MergeEngine(String value) {
this.value = value;
public static MergeEngine create(Configuration options, RowType rowType) {
if (options == null) {
return null;
}
MergeEngine.Type type = options.get(ConfigOptions.TABLE_MERGE_ENGINE);
if (type == null) {
return null;
}

switch (type) {
case FIRST_ROW:
return new MergeEngine(Type.FIRST_ROW);
case VERSION:
String column = options.get(ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN);
if (column == null) {
throw new IllegalArgumentException(
"When the merge engine is set to version, the 'table.merge-engine.version.column' cannot be empty.");
}
if (rowType != null) {
int fieldIndex = rowType.getFieldIndex(column);
if (fieldIndex == -1) {
throw new IllegalArgumentException(
String.format(
"When the merge engine is set to version, the column %s does not exist.",
column));
}
DataType dataType = rowType.getTypeAt(fieldIndex);
if (!VERSION_SUPPORTED_DATA_TYPES.contains(dataType.getClass().getName())) {
throw new IllegalArgumentException(
String.format(
"The merge engine column is not support type %s .",
dataType.asSummaryString()));
}
}
return new MergeEngine(Type.VERSION, column);
default:
throw new UnsupportedOperationException("Unsupported merge engine: " + type);
}
}

public Type getType() {
return type;
}

public String getColumn() {
return column;
}

public enum Type {
FIRST_ROW("first_row"),
VERSION("version");
private final String value;

Type(String value) {
this.value = value;
}

@Override
public String toString() {
return value;
}
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
MergeEngine that = (MergeEngine) o;
return type == that.type && Objects.equals(column, that.column);
}

@Override
public String toString() {
return value;
public int hashCode() {
return Objects.hash(type, column);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ public boolean isDataLakeEnabled() {
}

public @Nullable MergeEngine getMergeEngine() {
return configuration().get(ConfigOptions.TABLE_MERGE_ENGINE);
return MergeEngine.create(configuration(), schema.toRowType());
}

public TableDescriptor copy(Map<String, String> newProperties) {
Expand Down
14 changes: 14 additions & 0 deletions fluss-common/src/test/java/com/alibaba/fluss/record/TestData.java
Original file line number Diff line number Diff line change
Expand Up @@ -187,4 +187,18 @@ public final class TestData {
TableDescriptor.builder().schema(DATA2_SCHEMA).distributedBy(3, "a").build(),
1);
// -------------------------------- data2 info end ------------------------------------

// ------------------- data3 and related table info begin ----------------------
public static final Schema DATA3_SCHEMA_PK =
Schema.newBuilder()
.column("a", DataTypes.INT())
.withComment("a is first column")
.column("b", DataTypes.BIGINT())
.withComment("b is second column")
.primaryKey("a")
.build();
public static final TablePath DATA3_TABLE_PATH_PK =
TablePath.of("test_db_3", "test_pk_table_3");
// ---------------------------- data3 table info end ------------------------------

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.alibaba.fluss.connector.flink.sink.FlinkTableSink;
import com.alibaba.fluss.connector.flink.source.FlinkTableSource;
import com.alibaba.fluss.connector.flink.utils.FlinkConnectorOptionsUtils;
import com.alibaba.fluss.connector.flink.utils.FlinkConversions;
import com.alibaba.fluss.metadata.MergeEngine;
import com.alibaba.fluss.metadata.TablePath;

import org.apache.flink.api.common.RuntimeExecutionMode;
Expand Down Expand Up @@ -129,7 +131,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
cache,
partitionDiscoveryIntervalMs,
tableOptions.get(toFlinkOption(ConfigOptions.TABLE_DATALAKE_ENABLED)),
tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)));
MergeEngine.create(helper.getOptions().toMap()));
}

@Override
Expand All @@ -150,7 +152,8 @@ public DynamicTableSink createDynamicTableSink(Context context) {
rowType,
context.getPrimaryKeyIndexes(),
isStreamingMode,
tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)));
MergeEngine.create(
helper.getOptions().toMap(), FlinkConversions.toFlussRowType(rowType)));
}

@Override
Expand Down
Loading

0 comments on commit 023351d

Please sign in to comment.