From d879c3e7f0d688e069dcde0b74c5c03a87565e71 Mon Sep 17 00:00:00 2001 From: Youngwb Date: Sun, 29 Dec 2024 13:12:47 +0800 Subject: [PATCH] [BugFix] cache delta lake snapshot instead of table object Signed-off-by: Youngwb --- .../delta/CachingDeltaLakeMetastore.java | 33 ++++++++-- .../connector/delta/DeltaLakeMetastore.java | 35 ++++++++++- .../connector/delta/DeltaLakeSnapshot.java | 60 +++++++++++++++++++ .../starrocks/connector/delta/DeltaUtils.java | 38 ++++-------- .../connector/delta/IDeltaLakeMetastore.java | 4 ++ .../connector/delta/DeltaUtilsTest.java | 26 ++------ .../test_deltalake/R/test_deltalake_catalog | 6 ++ .../test_deltalake/T/test_deltalake_catalog | 3 + 8 files changed, 150 insertions(+), 55 deletions(-) create mode 100644 fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaLakeSnapshot.java diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/delta/CachingDeltaLakeMetastore.java b/fe/fe-core/src/main/java/com/starrocks/connector/delta/CachingDeltaLakeMetastore.java index fc466b507be8be..2f6ab3ac123353 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/delta/CachingDeltaLakeMetastore.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/delta/CachingDeltaLakeMetastore.java @@ -14,7 +14,8 @@ package com.starrocks.connector.delta; - +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; import com.google.common.collect.Maps; import com.starrocks.catalog.Database; import com.starrocks.catalog.Table; @@ -40,6 +41,7 @@ import java.util.concurrent.Executor; import java.util.stream.Collectors; +import static com.google.common.cache.CacheLoader.asyncReloading; import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService; public class CachingDeltaLakeMetastore extends CachingMetastore implements IDeltaLakeMetastore { @@ -48,12 +50,16 @@ public class CachingDeltaLakeMetastore extends CachingMetastore implements IDelt public final IDeltaLakeMetastore delegate; private final Map lastAccessTimeMap; + protected LoadingCache tableSnapshotCache; public CachingDeltaLakeMetastore(IDeltaLakeMetastore metastore, Executor executor, long expireAfterWriteSec, long refreshIntervalSec, long maxSize) { super(executor, expireAfterWriteSec, refreshIntervalSec, maxSize); this.delegate = metastore; this.lastAccessTimeMap = Maps.newConcurrentMap(); + tableSnapshotCache = newCacheBuilder(expireAfterWriteSec, refreshIntervalSec, maxSize) + .build(asyncReloading(CacheLoader.from(dbTableName -> + getLatestSnapshot(dbTableName.getDatabaseName(), dbTableName.getTableName())), executor)); } public static CachingDeltaLakeMetastore createQueryLevelInstance(IDeltaLakeMetastore metastore, long perQueryCacheMaxSize) { @@ -66,11 +72,16 @@ public static CachingDeltaLakeMetastore createQueryLevelInstance(IDeltaLakeMetas } public static CachingDeltaLakeMetastore createCatalogLevelInstance(IDeltaLakeMetastore metastore, Executor executor, - long expireAfterWrite, long refreshInterval, - long maxSize) { + long expireAfterWrite, long refreshInterval, + long maxSize) { return new CachingDeltaLakeMetastore(metastore, executor, expireAfterWrite, refreshInterval, maxSize); } + @Override + public String getCatalogName() { + return delegate.getCatalogName(); + } + @Override public List getAllDatabaseNames() { return get(databaseNamesCache, ""); @@ -106,6 +117,19 @@ public Table loadTable(DatabaseTableName databaseTableName) { return delegate.getTable(databaseTableName.getDatabaseName(), databaseTableName.getTableName()); } + public DeltaLakeSnapshot getCachedSnapshot(DatabaseTableName databaseTableName) { + return get(tableSnapshotCache, databaseTableName); + } + + @Override + public DeltaLakeSnapshot getLatestSnapshot(String dbName, String tableName) { + if (delegate instanceof CachingDeltaLakeMetastore) { + return ((CachingDeltaLakeMetastore) delegate).getCachedSnapshot(DatabaseTableName.of(dbName, tableName)); + } else { + return delegate.getLatestSnapshot(dbName, tableName); + } + } + @Override public MetastoreTable getMetastoreTable(String dbName, String tableName) { return delegate.getMetastoreTable(dbName, tableName); @@ -117,7 +141,8 @@ public Table getTable(String dbName, String tableName) { DatabaseTableName databaseTableName = DatabaseTableName.of(dbName, tableName); lastAccessTimeMap.put(databaseTableName, System.currentTimeMillis()); } - return get(tableCache, DatabaseTableName.of(dbName, tableName)); + DeltaLakeSnapshot snapshot = getCachedSnapshot(DatabaseTableName.of(dbName, tableName)); + return DeltaUtils.convertDeltaSnapshotToSRTable(getCatalogName(), snapshot); } @Override diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaLakeMetastore.java b/fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaLakeMetastore.java index 62424908704959..b720ba99e66821 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaLakeMetastore.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaLakeMetastore.java @@ -22,16 +22,22 @@ import com.starrocks.catalog.Database; import com.starrocks.catalog.DeltaLakeTable; import com.starrocks.common.Pair; +import com.starrocks.common.profile.Timer; +import com.starrocks.common.profile.Tracers; import com.starrocks.connector.exception.StarRocksConnectorException; import com.starrocks.connector.metastore.IMetastore; import com.starrocks.connector.metastore.MetastoreTable; +import com.starrocks.sql.analyzer.SemanticException; import io.delta.kernel.Scan; import io.delta.kernel.ScanBuilder; +import io.delta.kernel.Table; import io.delta.kernel.data.ColumnarBatch; import io.delta.kernel.data.FilteredColumnarBatch; import io.delta.kernel.data.Row; import io.delta.kernel.engine.Engine; +import io.delta.kernel.exceptions.TableNotFoundException; import io.delta.kernel.internal.InternalScanFileUtils; +import io.delta.kernel.internal.SnapshotImpl; import io.delta.kernel.types.StructType; import io.delta.kernel.utils.CloseableIterator; import org.apache.hadoop.conf.Configuration; @@ -46,6 +52,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static com.starrocks.common.profile.Tracers.Module.EXTERNAL; import static com.starrocks.connector.PartitionUtil.toHivePartitionName; public abstract class DeltaLakeMetastore implements IDeltaLakeMetastore { @@ -96,6 +103,11 @@ public List load(@NotNull DeltaLakeFileStatus fileStatus) throws IOExc }); } + @Override + public String getCatalogName() { + return catalogName; + } + @Override public List getAllDatabaseNames() { return delegate.getAllDatabaseNames(); @@ -112,7 +124,7 @@ public Database getDb(String dbName) { } @Override - public DeltaLakeTable getTable(String dbName, String tableName) { + public DeltaLakeSnapshot getLatestSnapshot(String dbName, String tableName) { MetastoreTable metastoreTable = getMetastoreTable(dbName, tableName); if (metastoreTable == null) { LOG.error("get metastore table failed. dbName: {}, tableName: {}", dbName, tableName); @@ -121,9 +133,26 @@ public DeltaLakeTable getTable(String dbName, String tableName) { String path = metastoreTable.getTableLocation(); long createTime = metastoreTable.getCreateTime(); + DeltaLakeEngine deltaLakeEngine = DeltaLakeEngine.create(hdfsConfiguration, properties, checkpointCache, jsonCache); + SnapshotImpl snapshot; + + try (Timer ignored = Tracers.watchScope(EXTERNAL, "DeltaLake.getSnapshot")) { + Table deltaTable = Table.forPath(deltaLakeEngine, path); + snapshot = (SnapshotImpl) deltaTable.getLatestSnapshot(deltaLakeEngine); + } catch (TableNotFoundException e) { + LOG.error("Failed to find Delta table for {}.{}.{}, {}", catalogName, dbName, tableName, e.getMessage()); + throw new SemanticException("Failed to find Delta table for " + catalogName + "." + dbName + "." + tableName); + } catch (Exception e) { + LOG.error("Failed to get latest snapshot for {}.{}.{}, {}", catalogName, dbName, tableName, e.getMessage()); + throw new SemanticException("Failed to get latest snapshot for " + catalogName + "." + dbName + "." + tableName); + } + return new DeltaLakeSnapshot(dbName, tableName, deltaLakeEngine, snapshot, createTime, path); + } - Engine deltaLakeEngine = DeltaLakeEngine.create(hdfsConfiguration, properties, checkpointCache, jsonCache); - return DeltaUtils.convertDeltaToSRTable(catalogName, dbName, tableName, path, deltaLakeEngine, createTime); + @Override + public DeltaLakeTable getTable(String dbName, String tableName) { + DeltaLakeSnapshot snapshot = getLatestSnapshot(dbName, tableName); + return DeltaUtils.convertDeltaSnapshotToSRTable(catalogName, snapshot); } @Override diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaLakeSnapshot.java b/fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaLakeSnapshot.java new file mode 100644 index 00000000000000..7e7e395e2ea0f7 --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaLakeSnapshot.java @@ -0,0 +1,60 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.connector.delta; + +import io.delta.kernel.internal.SnapshotImpl; + +public class DeltaLakeSnapshot { + private final String dbName; + private final String tableName; + private final DeltaLakeEngine deltaLakeEngine; + private final SnapshotImpl snapshot; + private final long createTime; + private final String path; + + public DeltaLakeSnapshot(String dbName, String tableName, DeltaLakeEngine engine, SnapshotImpl snapshot, + long createTime, String path) { + this.dbName = dbName; + this.tableName = tableName; + this.deltaLakeEngine = engine; + this.snapshot = snapshot; + this.createTime = createTime; + this.path = path; + } + + public String getDbName() { + return dbName; + } + + public String getTableName() { + return tableName; + } + + public DeltaLakeEngine getDeltaLakeEngine() { + return deltaLakeEngine; + } + + public SnapshotImpl getSnapshot() { + return snapshot; + } + + public long getCreateTime() { + return createTime; + } + + public String getPath() { + return path; + } +} diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaUtils.java b/fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaUtils.java index 4e0e184eb31c0a..b0a8aa64b24148 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaUtils.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaUtils.java @@ -21,18 +21,12 @@ import com.starrocks.catalog.Type; import com.starrocks.common.ErrorCode; import com.starrocks.common.ErrorReport; -import com.starrocks.common.profile.Timer; -import com.starrocks.common.profile.Tracers; import com.starrocks.connector.ColumnTypeConverter; import com.starrocks.connector.exception.StarRocksConnectorException; import com.starrocks.connector.hive.RemoteFileInputFormat; -import com.starrocks.sql.analyzer.SemanticException; -import com.starrocks.sql.common.ErrorType; -import io.delta.kernel.Table; +import com.starrocks.sql.common.ErrorType;; import io.delta.kernel.data.ArrayValue; import io.delta.kernel.data.ColumnVector; -import io.delta.kernel.engine.Engine; -import io.delta.kernel.exceptions.TableNotFoundException; import io.delta.kernel.internal.SnapshotImpl; import io.delta.kernel.internal.actions.Metadata; import io.delta.kernel.internal.actions.Protocol; @@ -48,7 +42,6 @@ import java.util.Locale; import static com.starrocks.catalog.Column.COLUMN_UNIQUE_ID_INIT_VALUE; -import static com.starrocks.common.profile.Tracers.Module.EXTERNAL; import static com.starrocks.connector.ConnectorTableId.CONNECTOR_ID_GENERATOR; public class DeltaUtils { @@ -62,28 +55,20 @@ public static void checkProtocolAndMetadata(Protocol protocol, Metadata metadata } } - public static DeltaLakeTable convertDeltaToSRTable(String catalog, String dbName, String tblName, String path, - Engine deltaEngine, long createTime) { - SnapshotImpl snapshot; + public static DeltaLakeTable convertDeltaSnapshotToSRTable(String catalog, DeltaLakeSnapshot snapshot) { + String dbName = snapshot.getDbName(); + String tblName = snapshot.getTableName(); + DeltaLakeEngine deltaLakeEngine = snapshot.getDeltaLakeEngine(); + SnapshotImpl snapshotImpl = snapshot.getSnapshot(); + String path = snapshot.getPath(); - try (Timer ignored = Tracers.watchScope(EXTERNAL, "DeltaLake.getSnapshot")) { - Table deltaTable = Table.forPath(deltaEngine, path); - snapshot = (SnapshotImpl) deltaTable.getLatestSnapshot(deltaEngine); - } catch (TableNotFoundException e) { - LOG.error("Failed to find Delta table for {}.{}.{}, {}", catalog, dbName, tblName, e.getMessage()); - throw new SemanticException("Failed to find Delta table for " + catalog + "." + dbName + "." + tblName); - } catch (Exception e) { - LOG.error("Failed to get latest snapshot for {}.{}.{}, {}", catalog, dbName, tblName, e.getMessage()); - throw new SemanticException("Failed to get latest snapshot for " + catalog + "." + dbName + "." + tblName); - } - - StructType deltaSchema = snapshot.getSchema(deltaEngine); + StructType deltaSchema = snapshotImpl.getSchema(deltaLakeEngine); if (deltaSchema == null) { throw new IllegalArgumentException(String.format("Unable to find Schema information in Delta log for " + "%s.%s.%s", catalog, dbName, tblName)); } - String columnMappingMode = ColumnMapping.getColumnMappingMode(snapshot.getMetadata().getConfiguration()); + String columnMappingMode = ColumnMapping.getColumnMappingMode(snapshotImpl.getMetadata().getConfiguration()); List fullSchema = Lists.newArrayList(); for (StructField field : deltaSchema.fields()) { DataType dataType = field.getDataType(); @@ -99,8 +84,7 @@ public static DeltaLakeTable convertDeltaToSRTable(String catalog, String dbName } return new DeltaLakeTable(CONNECTOR_ID_GENERATOR.getNextId().asInt(), catalog, dbName, tblName, fullSchema, - loadPartitionColumnNames(snapshot), snapshot, path, - deltaEngine, createTime); + loadPartitionColumnNames(snapshotImpl), snapshotImpl, path, deltaLakeEngine, snapshot.getCreateTime()); } private static List loadPartitionColumnNames(SnapshotImpl snapshot) { @@ -125,7 +109,7 @@ public static Column buildColumnWithColumnMapping(StructField field, Type type, if (columnMappingMode.equalsIgnoreCase(ColumnMapping.COLUMN_MAPPING_MODE_ID) && field.getMetadata().contains(ColumnMapping.COLUMN_MAPPING_ID_KEY)) { - columnUniqueId = ((Long) field.getMetadata().get(ColumnMapping.COLUMN_MAPPING_ID_KEY)).intValue(); + columnUniqueId = ((Long) field.getMetadata().get(ColumnMapping.COLUMN_MAPPING_ID_KEY)).intValue(); } if (columnMappingMode.equalsIgnoreCase(ColumnMapping.COLUMN_MAPPING_MODE_NAME) && field.getMetadata().contains(ColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY)) { diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/delta/IDeltaLakeMetastore.java b/fe/fe-core/src/main/java/com/starrocks/connector/delta/IDeltaLakeMetastore.java index 56087d6d48523a..03d6a9e3e9c115 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/delta/IDeltaLakeMetastore.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/delta/IDeltaLakeMetastore.java @@ -21,7 +21,11 @@ import java.util.List; public interface IDeltaLakeMetastore extends IMetastore, MemoryTrackable { + String getCatalogName(); + Table getTable(String dbName, String tableName); List getPartitionKeys(String dbName, String tableName); + + DeltaLakeSnapshot getLatestSnapshot(String dbName, String tableName); } diff --git a/fe/fe-core/src/test/java/com/starrocks/connector/delta/DeltaUtilsTest.java b/fe/fe-core/src/test/java/com/starrocks/connector/delta/DeltaUtilsTest.java index f8dad75d66b02c..b99fac944df7dc 100644 --- a/fe/fe-core/src/test/java/com/starrocks/connector/delta/DeltaUtilsTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/connector/delta/DeltaUtilsTest.java @@ -16,7 +16,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; -import com.starrocks.sql.analyzer.SemanticException; import com.starrocks.sql.optimizer.validate.ValidateException; import io.delta.kernel.Operation; import io.delta.kernel.Snapshot; @@ -68,26 +67,10 @@ public void testCheckTableFeatureSupported2(@Mocked Metadata metadata) { Lists.newArrayList()), metadata); } - @Test - public void testConvertDeltaToSRTableWithException1() { - expectedEx.expect(SemanticException.class); - expectedEx.expectMessage("Failed to find Delta table for catalog.db.tbl"); - - new MockUp() { - @mockit.Mock - public Table forPath(Engine deltaEngine, String path) throws TableNotFoundException { - throw new TableNotFoundException("Table not found"); - } - }; - - DeltaUtils.convertDeltaToSRTable("catalog", "db", "tbl", "path", - DeltaLakeEngine.create(new Configuration()), 0); - } - @Test public void testConvertDeltaToSRTableWithException2() { - expectedEx.expect(SemanticException.class); - expectedEx.expectMessage("Failed to get latest snapshot for catalog.db.tbl"); + expectedEx.expect(RuntimeException.class); + expectedEx.expectMessage("Failed to get latest snapshot"); Table table = new Table() { public Table forPath(Engine engine, String path) { return this; @@ -132,7 +115,8 @@ public Table forPath(Engine engine, String path) { } }; - DeltaUtils.convertDeltaToSRTable("catalog", "db", "tbl", "path", - DeltaLakeEngine.create(new Configuration()), 0); + Engine engine = DeltaLakeEngine.create(new Configuration()); + Table deltaTable = Table.forPath(engine, "path"); + deltaTable.getLatestSnapshot(engine); } } diff --git a/test/sql/test_deltalake/R/test_deltalake_catalog b/test/sql/test_deltalake/R/test_deltalake_catalog index 14255ce3fa179a..4dfa39d5ce384a 100644 --- a/test/sql/test_deltalake/R/test_deltalake_catalog +++ b/test/sql/test_deltalake/R/test_deltalake_catalog @@ -304,6 +304,12 @@ select a.c_int, b.c_map, b.c_nest.c_struct, a.c_nest.c_array[6] from delta_test_ 1 {1:{"c_date":"2001-01-03"}} None None 3 {33:{"c_date":"2003-01-02"}} None None -- !result +select a.c_int, b.c_map, b.c_nest.c_struct_new, a.c_nest.c_array[6] from delta_test_${uuid0}.delta_oss_db.delta_nested_type_par a join delta_test_${uuid0}.delta_oss_db.delta_nested_type_par b on a.c_int = b.c_nest.c_struct_new.c_int order by 1; +-- result: +1 {1:{"c_date":"2001-01-03"}} {"c_int":1} None +1 {1:{"c_date":"2001-01-03"}} {"c_int":1} None +3 {33:{"c_date":"2003-01-02"}} {"c_int":3} None +-- !result drop catalog delta_test_${uuid0} -- result: -- !result \ No newline at end of file diff --git a/test/sql/test_deltalake/T/test_deltalake_catalog b/test/sql/test_deltalake/T/test_deltalake_catalog index be89094c0e2a93..367b3ed3c30cc1 100644 --- a/test/sql/test_deltalake/T/test_deltalake_catalog +++ b/test/sql/test_deltalake/T/test_deltalake_catalog @@ -88,4 +88,7 @@ select col_tinyint,col_array,col_map,col_struct from delta_test_${uuid0}.delta_o select c_int,c_date from delta_test_${uuid0}.delta_oss_db.column_mapping_test where c_nest.c_struct_new.c_int is not null order by c_int nulls last, c_date nulls first; select a.c_int, b.c_map, b.c_nest.c_struct, a.c_nest.c_array[6] from delta_test_${uuid0}.delta_oss_db.column_mapping_test a join delta_test_${uuid0}.delta_oss_db.column_mapping_test b on a.c_int = b.c_nest.c_struct_new.c_int order by 1; +-- test table join self +select a.c_int, b.c_map, b.c_nest.c_struct_new, a.c_nest.c_array[6] from delta_test_${uuid0}.delta_oss_db.delta_nested_type_par a join delta_test_${uuid0}.delta_oss_db.delta_nested_type_par b on a.c_int = b.c_nest.c_struct_new.c_int order by 1; + drop catalog delta_test_${uuid0} \ No newline at end of file