Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[kv] Support version merge engine #277

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@
import static com.alibaba.fluss.record.TestData.DATA1_TABLE_INFO_PK;
import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH;
import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH_PK;
import static com.alibaba.fluss.record.TestData.DATA3_SCHEMA_PK;
import static com.alibaba.fluss.record.TestData.DATA3_TABLE_PATH_PK;
import static com.alibaba.fluss.testutils.DataTestUtils.assertRowValueEquals;
import static com.alibaba.fluss.testutils.DataTestUtils.compactedRow;
import static com.alibaba.fluss.testutils.DataTestUtils.keyRow;
Expand Down Expand Up @@ -885,7 +887,7 @@ void testFirstRowMergeEngine() throws Exception {
TableDescriptor tableDescriptor =
TableDescriptor.builder()
.schema(DATA1_SCHEMA_PK)
.property(ConfigOptions.TABLE_MERGE_ENGINE, MergeEngine.FIRST_ROW)
.property(ConfigOptions.TABLE_MERGE_ENGINE, MergeEngine.Type.FIRST_ROW)
.build();
RowType rowType = DATA1_SCHEMA_PK.toRowType();
createTable(DATA1_TABLE_PATH_PK, tableDescriptor, false);
Expand All @@ -902,7 +904,6 @@ void testFirstRowMergeEngine() throws Exception {
expectedRows.add(compactedRow(rowType, new Object[] {id, "value_0"}));
}
upsertWriter.flush();

// now, get rows by lookup
for (int id = 0; id < rows; id++) {
InternalRow gotRow =
Expand All @@ -911,17 +912,13 @@ void testFirstRowMergeEngine() throws Exception {
.getRow();
assertThatRow(gotRow).withSchema(rowType).isEqualTo(expectedRows.get(id));
}

// check scan change log
LogScanner logScanner = table.getLogScanner(new LogScan());
logScanner.subscribeFromBeginning(0);

List<ScanRecord> actualLogRecords = new ArrayList<>(0);
while (actualLogRecords.size() < rows) {
ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
scanRecords.forEach(actualLogRecords::add);
}

assertThat(actualLogRecords).hasSize(rows);
for (int i = 0; i < actualLogRecords.size(); i++) {
ScanRecord scanRecord = actualLogRecords.get(i);
Expand Down Expand Up @@ -1014,4 +1011,71 @@ void testArrowCompressionAndProject(String compression, String level) throws Exc
logScanner.close();
}
}

@Test
void testMergeEngineWithVersion() throws Exception {
// Create table.
TableDescriptor tableDescriptor =
TableDescriptor.builder()
.schema(DATA3_SCHEMA_PK)
.property(ConfigOptions.TABLE_MERGE_ENGINE, MergeEngine.Type.VERSION)
.property(ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN, "b")
.build();
RowType rowType = DATA3_SCHEMA_PK.toRowType();
createTable(DATA3_TABLE_PATH_PK, tableDescriptor, false);

int rows = 3;
try (Table table = conn.getTable(DATA3_TABLE_PATH_PK)) {
luoyuxia marked this conversation as resolved.
Show resolved Hide resolved
// put rows.
UpsertWriter upsertWriter = table.getUpsertWriter();
List<ScanRecord> expectedScanRecords = new ArrayList<>(rows);
// init rows.
for (int row = 0; row < rows; row++) {
upsertWriter.upsert(compactedRow(rowType, new Object[] {row, 1000L}));
expectedScanRecords.add(
new ScanRecord(compactedRow(rowType, new Object[] {row, 1000L})));
}
// update row if id=0 and version < 1000L, will not update
upsertWriter.upsert(compactedRow(rowType, new Object[] {0, 999L}));

// update if version> 1000L
upsertWriter.upsert(compactedRow(rowType, new Object[] {1, 1001L}));
// update_before record, don't care about offset/timestamp
expectedScanRecords.add(
new ScanRecord(
-1,
-1,
RowKind.UPDATE_BEFORE,
compactedRow(rowType, new Object[] {1, 1000L})));
// update_after record
expectedScanRecords.add(
new ScanRecord(
-1,
-1,
RowKind.UPDATE_AFTER,
compactedRow(rowType, new Object[] {1, 1001L})));
rows = rows + 2;

upsertWriter.flush();

LogScanner logScanner = table.getLogScanner(new LogScan());
logScanner.subscribeFromBeginning(0);

List<ScanRecord> actualLogRecords = new ArrayList<>(rows);
while (actualLogRecords.size() < rows) {
ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
scanRecords.forEach(actualLogRecords::add);
}

assertThat(actualLogRecords).hasSize(rows);
for (int i = 0; i < rows; i++) {
ScanRecord actualScanRecord = actualLogRecords.get(i);
ScanRecord expectedRecord = expectedScanRecords.get(i);
assertThat(actualScanRecord.getRowKind()).isEqualTo(expectedRecord.getRowKind());
assertThatRow(actualScanRecord.getRow())
.withSchema(rowType)
.isEqualTo(expectedRecord.getRow());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1004,12 +1004,18 @@ public class ConfigOptions {
+ "When this option is set to true and the datalake tiering service is up,"
+ " the table will be tiered and compacted into datalake format stored on lakehouse storage.");

public static final ConfigOption<MergeEngine> TABLE_MERGE_ENGINE =
public static final ConfigOption<MergeEngine.Type> TABLE_MERGE_ENGINE =
key("table.merge-engine")
.enumType(MergeEngine.class)
.enumType(MergeEngine.Type.class)
.noDefaultValue()
.withDescription("The merge engine for the primary key table.");

public static final ConfigOption<String> TABLE_MERGE_ENGINE_VERSION_COLUMN =
key("table.merge-engine.version.column")
.stringType()
.noDefaultValue()
.withDescription("The merge engine version column for the primary key table.");

// ------------------------------------------------------------------------
// ConfigOptions for Kv
// ------------------------------------------------------------------------
Expand Down
101 changes: 86 additions & 15 deletions fluss-common/src/main/java/com/alibaba/fluss/metadata/MergeEngine.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Copyright (c) 2024 Alibaba Group Holding Ltd.
*
* http://www.apache.org/licenses/LICENSE-2.0
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -18,22 +16,95 @@

package com.alibaba.fluss.metadata;

import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;

import javax.annotation.Nullable;

import java.util.Map;
import java.util.Objects;

/**
* The merge engine for primary key table.
*
* @since 0.6
*/
public enum MergeEngine {
FIRST_ROW("first_row");
public class MergeEngine {
sunxiaojian marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is unnecessary to combine the merge engine type and the version column into one structure. First of all, the column member is very confusing and doesn't apply to the first_row and aggregate merge engines. Secondly, you have introduced a runtime RowMergeEngine which already combines the merge engine type and version column into one class during runtime, so this class is unnecessary. I would prefer to keep it as it was.

Maybe we can rename it to MergeEngineType to be more specific.

/**
 * The kind of merge engine that a primary key table may declare.
 *
 * <p>A primary key table configured with a merge engine is a special "merge table": it keeps a
 * primary key definition but never applies direct UPDATE or DELETE operations. Instead, appended
 * rows are merged into the existing data set according to the selected {@link MergeEngineType}.
 * Consequently such a table only supports INSERT/APPEND, and rejects UPDATE (including
 * partial-update) and DELETE.
 *
 * <p>Note: a primary key table has no merge engine unless one is configured explicitly.
 *
 * @since 0.6
 */
public enum MergeEngineType {
    FIRST_ROW("first_row"),
    VERSION("version");
    // TODO: Support AGGREGATE("aggregate") in the future (#212)

    /** The config-facing identifier of this merge engine kind. */
    private final String label;

    MergeEngineType(String label) {
        this.label = label;
    }

    @Override
    public String toString() {
        return label;
    }
}


private final Type type;

/** When merge engine type is version, column cannot be null. */
@Nullable private final String column;

private MergeEngine(Type type) {
this(type, null);
}

private MergeEngine(Type type, String column) {
this.type = type;
this.column = column;
}

/**
 * Creates a {@link MergeEngine} from the given table properties.
 *
 * @param properties the table properties, read via the 'table.merge-engine' options
 * @return the configured merge engine, or null when 'table.merge-engine' is not set
 */
public static MergeEngine create(Map<String, String> properties) {
// delegate to the Configuration-based factory
return create(Configuration.fromMap(properties));
}

private static MergeEngine create(Configuration options) {
MergeEngine.Type type = options.get(ConfigOptions.TABLE_MERGE_ENGINE);
if (type == null) {
return null;
}
switch (type) {
case FIRST_ROW:
return new MergeEngine(Type.FIRST_ROW);
case VERSION:
String column = options.get(ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN);
if (column == null) {
throw new IllegalArgumentException(
luoyuxia marked this conversation as resolved.
Show resolved Hide resolved
String.format(
"When the merge engine is set to version, the option '%s' must be set.",
ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN.key()));
}
return new MergeEngine(Type.VERSION, column);
default:
throw new UnsupportedOperationException("Unsupported merge engine: " + type);
}
}

private final String value;
/** Returns the merge engine type (FIRST_ROW or VERSION). */
public Type getType() {
return type;
}

/**
 * Returns the version column name; per the field's contract it is non-null when the type is
 * VERSION, and null otherwise (e.g. FIRST_ROW).
 */
public String getColumn() {
return column;
}

MergeEngine(String value) {
this.value = value;
/** The supported merge engine kinds; {@code toString()} yields the config-facing name. */
public enum Type {
    FIRST_ROW("first_row"),
    VERSION("version");

    /** Config-facing identifier for this merge engine kind. */
    private final String label;

    Type(String label) {
        this.label = label;
    }

    @Override
    public String toString() {
        return label;
    }
}

/**
 * Two merge engines are equal when both the type and the version column match. Returns false
 * for a null argument, per the {@code Object.equals} contract.
 */
@Override
public boolean equals(Object o) {
    if (this == o) {
        // reflexive fast path: skip the field comparison for the same instance
        return true;
    }
    if (o == null || getClass() != o.getClass()) {
        return false;
    }
    MergeEngine that = (MergeEngine) o;
    return type == that.type && Objects.equals(column, that.column);
}

@Override
public String toString() {
return value;
public int hashCode() {
return Objects.hash(type, column);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ public boolean isDataLakeEnabled() {
}

public @Nullable MergeEngine getMergeEngine() {
return configuration().get(ConfigOptions.TABLE_MERGE_ENGINE);
return MergeEngine.create(properties);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant that here we should validate the data type of the version column for the version merge engine.

}

/** Gets the Arrow compression type and compression level of the table. */
Expand Down
14 changes: 14 additions & 0 deletions fluss-common/src/test/java/com/alibaba/fluss/record/TestData.java
Original file line number Diff line number Diff line change
Expand Up @@ -207,4 +207,18 @@ public final class TestData {
System.currentTimeMillis(),
System.currentTimeMillis());
// -------------------------------- data2 info end ------------------------------------

// ------------------- data3 and related table info begin ----------------------
public static final Schema DATA3_SCHEMA_PK =
Schema.newBuilder()
.column("a", DataTypes.INT())
.withComment("a is first column")
.column("b", DataTypes.BIGINT())
.withComment("b is second column")
.primaryKey("a")
.build();
public static final TablePath DATA3_TABLE_PATH_PK =
TablePath.of("test_db_3", "test_pk_table_3");
// ---------------------------- data3 table info end ------------------------------

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.alibaba.fluss.connector.flink.sink.FlinkTableSink;
import com.alibaba.fluss.connector.flink.source.FlinkTableSource;
import com.alibaba.fluss.connector.flink.utils.FlinkConnectorOptionsUtils;
import com.alibaba.fluss.metadata.MergeEngine;
import com.alibaba.fluss.metadata.TablePath;

import org.apache.flink.api.common.RuntimeExecutionMode;
Expand Down Expand Up @@ -129,7 +130,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
cache,
partitionDiscoveryIntervalMs,
tableOptions.get(toFlinkOption(ConfigOptions.TABLE_DATALAKE_ENABLED)),
tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)));
MergeEngine.create(helper.getOptions().toMap()));
}

@Override
Expand All @@ -150,7 +151,7 @@ public DynamicTableSink createDynamicTableSink(Context context) {
rowType,
context.getPrimaryKeyIndexes(),
isStreamingMode,
tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)),
MergeEngine.create(helper.getOptions().toMap()),
tableOptions.get(FlinkConnectorOptions.SINK_IGNORE_DELETE));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,19 +123,22 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
// is 0, when no column specified, it's not partial update
// see FLINK-36000
&& context.getTargetColumns().get().length != 0) {

// is partial update, check whether partial update is supported or not
if (context.getTargetColumns().get().length != tableRowType.getFieldCount()) {
if (primaryKeyIndexes.length == 0) {
throw new ValidationException(
"Fluss table sink does not support partial updates for table without primary key. Please make sure the "
+ "number of specified columns in INSERT INTO matches columns of the Fluss table.");
} else if (mergeEngine == MergeEngine.FIRST_ROW) {
throw new ValidationException(
String.format(
"Table %s uses the '%s' merge engine which does not support partial updates. Please make sure the "
+ "number of specified columns in INSERT INTO matches columns of the Fluss table.",
tablePath, MergeEngine.FIRST_ROW));
}
if (mergeEngine != null) {
if (mergeEngine.getType() == MergeEngine.Type.FIRST_ROW
|| mergeEngine.getType() == MergeEngine.Type.VERSION) {
Comment on lines +134 to +135
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

None of the merge engines support partial updates, so there is no need to enumerate all the merge engine types here.

throw new ValidationException(
String.format(
"Table %s uses the '%s' merge engine which does not support partial updates. Please make sure the "
+ "number of specified columns in INSERT INTO matches columns of the Fluss table.",
tablePath, mergeEngine.getType()));
}
}
}
int[][] targetColumns = context.getTargetColumns().get();
Expand Down Expand Up @@ -311,12 +314,14 @@ private void validateUpdatableAndDeletable() {
"Table %s is a Log Table. Log Table doesn't support DELETE and UPDATE statements.",
tablePath));
}

if (mergeEngine == MergeEngine.FIRST_ROW) {
throw new UnsupportedOperationException(
String.format(
"Table %s uses the '%s' merge engine which does not support DELETE or UPDATE statements.",
tablePath, MergeEngine.FIRST_ROW));
if (mergeEngine != null) {
if (mergeEngine.getType() == MergeEngine.Type.FIRST_ROW
|| mergeEngine.getType() == MergeEngine.Type.VERSION) {
Comment on lines +318 to +319
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

None of the merge engines support DELETE and UPDATE, so there is no need to enumerate all the merge engine types here.

throw new UnsupportedOperationException(
String.format(
"Table %s uses the '%s' merge engine which does not support DELETE or UPDATE statements.",
tablePath, mergeEngine.getType()));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public ChangelogMode getChangelogMode() {
} else {
if (hasPrimaryKey()) {
// pk table
if (mergeEngine == MergeEngine.FIRST_ROW) {
if (mergeEngine != null && mergeEngine.getType() == MergeEngine.Type.FIRST_ROW) {
return ChangelogMode.insertOnly();
} else {
return ChangelogMode.all();
Expand Down
Loading