Skip to content

Commit

Permalink
[BugFix] cache delta lake snapshot instead of table object
Browse files Browse the repository at this point in the history
Signed-off-by: Youngwb <[email protected]>
  • Loading branch information
Youngwb committed Dec 29, 2024
1 parent 42926be commit d879c3e
Show file tree
Hide file tree
Showing 8 changed files with 150 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@

package com.starrocks.connector.delta;


import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Maps;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.Table;
Expand All @@ -40,6 +41,7 @@
import java.util.concurrent.Executor;
import java.util.stream.Collectors;

import static com.google.common.cache.CacheLoader.asyncReloading;
import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;

public class CachingDeltaLakeMetastore extends CachingMetastore implements IDeltaLakeMetastore {
Expand All @@ -48,12 +50,16 @@ public class CachingDeltaLakeMetastore extends CachingMetastore implements IDelt

public final IDeltaLakeMetastore delegate;
private final Map<DatabaseTableName, Long> lastAccessTimeMap;
protected LoadingCache<DatabaseTableName, DeltaLakeSnapshot> tableSnapshotCache;

public CachingDeltaLakeMetastore(IDeltaLakeMetastore metastore, Executor executor, long expireAfterWriteSec,
long refreshIntervalSec, long maxSize) {
super(executor, expireAfterWriteSec, refreshIntervalSec, maxSize);
this.delegate = metastore;
this.lastAccessTimeMap = Maps.newConcurrentMap();
tableSnapshotCache = newCacheBuilder(expireAfterWriteSec, refreshIntervalSec, maxSize)
.build(asyncReloading(CacheLoader.from(dbTableName ->
getLatestSnapshot(dbTableName.getDatabaseName(), dbTableName.getTableName())), executor));
}

public static CachingDeltaLakeMetastore createQueryLevelInstance(IDeltaLakeMetastore metastore, long perQueryCacheMaxSize) {
Expand All @@ -66,11 +72,16 @@ public static CachingDeltaLakeMetastore createQueryLevelInstance(IDeltaLakeMetas
}

public static CachingDeltaLakeMetastore createCatalogLevelInstance(IDeltaLakeMetastore metastore, Executor executor,
long expireAfterWrite, long refreshInterval,
long maxSize) {
long expireAfterWrite, long refreshInterval,
long maxSize) {
return new CachingDeltaLakeMetastore(metastore, executor, expireAfterWrite, refreshInterval, maxSize);
}

@Override
public String getCatalogName() {
return delegate.getCatalogName();
}

@Override
public List<String> getAllDatabaseNames() {
return get(databaseNamesCache, "");
Expand Down Expand Up @@ -106,6 +117,19 @@ public Table loadTable(DatabaseTableName databaseTableName) {
return delegate.getTable(databaseTableName.getDatabaseName(), databaseTableName.getTableName());
}

public DeltaLakeSnapshot getCachedSnapshot(DatabaseTableName databaseTableName) {
return get(tableSnapshotCache, databaseTableName);
}

@Override
public DeltaLakeSnapshot getLatestSnapshot(String dbName, String tableName) {
if (delegate instanceof CachingDeltaLakeMetastore) {
return ((CachingDeltaLakeMetastore) delegate).getCachedSnapshot(DatabaseTableName.of(dbName, tableName));
} else {
return delegate.getLatestSnapshot(dbName, tableName);
}
}

@Override
public MetastoreTable getMetastoreTable(String dbName, String tableName) {
return delegate.getMetastoreTable(dbName, tableName);
Expand All @@ -117,7 +141,8 @@ public Table getTable(String dbName, String tableName) {
DatabaseTableName databaseTableName = DatabaseTableName.of(dbName, tableName);
lastAccessTimeMap.put(databaseTableName, System.currentTimeMillis());
}
return get(tableCache, DatabaseTableName.of(dbName, tableName));
DeltaLakeSnapshot snapshot = getCachedSnapshot(DatabaseTableName.of(dbName, tableName));
return DeltaUtils.convertDeltaSnapshotToSRTable(getCatalogName(), snapshot);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,22 @@
import com.starrocks.catalog.Database;
import com.starrocks.catalog.DeltaLakeTable;
import com.starrocks.common.Pair;
import com.starrocks.common.profile.Timer;
import com.starrocks.common.profile.Tracers;
import com.starrocks.connector.exception.StarRocksConnectorException;
import com.starrocks.connector.metastore.IMetastore;
import com.starrocks.connector.metastore.MetastoreTable;
import com.starrocks.sql.analyzer.SemanticException;
import io.delta.kernel.Scan;
import io.delta.kernel.ScanBuilder;
import io.delta.kernel.Table;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.data.FilteredColumnarBatch;
import io.delta.kernel.data.Row;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.internal.InternalScanFileUtils;
import io.delta.kernel.internal.SnapshotImpl;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterator;
import org.apache.hadoop.conf.Configuration;
Expand All @@ -46,6 +52,7 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static com.starrocks.common.profile.Tracers.Module.EXTERNAL;
import static com.starrocks.connector.PartitionUtil.toHivePartitionName;

public abstract class DeltaLakeMetastore implements IDeltaLakeMetastore {
Expand Down Expand Up @@ -96,6 +103,11 @@ public List<JsonNode> load(@NotNull DeltaLakeFileStatus fileStatus) throws IOExc
});
}

@Override
public String getCatalogName() {
return catalogName;
}

@Override
public List<String> getAllDatabaseNames() {
return delegate.getAllDatabaseNames();
Expand All @@ -112,7 +124,7 @@ public Database getDb(String dbName) {
}

@Override
public DeltaLakeTable getTable(String dbName, String tableName) {
public DeltaLakeSnapshot getLatestSnapshot(String dbName, String tableName) {
MetastoreTable metastoreTable = getMetastoreTable(dbName, tableName);
if (metastoreTable == null) {
LOG.error("get metastore table failed. dbName: {}, tableName: {}", dbName, tableName);
Expand All @@ -121,9 +133,26 @@ public DeltaLakeTable getTable(String dbName, String tableName) {

String path = metastoreTable.getTableLocation();
long createTime = metastoreTable.getCreateTime();
DeltaLakeEngine deltaLakeEngine = DeltaLakeEngine.create(hdfsConfiguration, properties, checkpointCache, jsonCache);
SnapshotImpl snapshot;

try (Timer ignored = Tracers.watchScope(EXTERNAL, "DeltaLake.getSnapshot")) {
Table deltaTable = Table.forPath(deltaLakeEngine, path);
snapshot = (SnapshotImpl) deltaTable.getLatestSnapshot(deltaLakeEngine);
} catch (TableNotFoundException e) {
LOG.error("Failed to find Delta table for {}.{}.{}, {}", catalogName, dbName, tableName, e.getMessage());
throw new SemanticException("Failed to find Delta table for " + catalogName + "." + dbName + "." + tableName);
} catch (Exception e) {
LOG.error("Failed to get latest snapshot for {}.{}.{}, {}", catalogName, dbName, tableName, e.getMessage());
throw new SemanticException("Failed to get latest snapshot for " + catalogName + "." + dbName + "." + tableName);
}
return new DeltaLakeSnapshot(dbName, tableName, deltaLakeEngine, snapshot, createTime, path);
}

Engine deltaLakeEngine = DeltaLakeEngine.create(hdfsConfiguration, properties, checkpointCache, jsonCache);
return DeltaUtils.convertDeltaToSRTable(catalogName, dbName, tableName, path, deltaLakeEngine, createTime);
@Override
public DeltaLakeTable getTable(String dbName, String tableName) {
DeltaLakeSnapshot snapshot = getLatestSnapshot(dbName, tableName);
return DeltaUtils.convertDeltaSnapshotToSRTable(catalogName, snapshot);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.connector.delta;

import io.delta.kernel.internal.SnapshotImpl;

public class DeltaLakeSnapshot {
private final String dbName;
private final String tableName;
private final DeltaLakeEngine deltaLakeEngine;
private final SnapshotImpl snapshot;
private final long createTime;
private final String path;

public DeltaLakeSnapshot(String dbName, String tableName, DeltaLakeEngine engine, SnapshotImpl snapshot,
long createTime, String path) {
this.dbName = dbName;
this.tableName = tableName;
this.deltaLakeEngine = engine;
this.snapshot = snapshot;
this.createTime = createTime;
this.path = path;
}

public String getDbName() {
return dbName;
}

public String getTableName() {
return tableName;
}

public DeltaLakeEngine getDeltaLakeEngine() {
return deltaLakeEngine;
}

public SnapshotImpl getSnapshot() {
return snapshot;
}

public long getCreateTime() {
return createTime;
}

public String getPath() {
return path;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,12 @@
import com.starrocks.catalog.Type;
import com.starrocks.common.ErrorCode;
import com.starrocks.common.ErrorReport;
import com.starrocks.common.profile.Timer;
import com.starrocks.common.profile.Tracers;
import com.starrocks.connector.ColumnTypeConverter;
import com.starrocks.connector.exception.StarRocksConnectorException;
import com.starrocks.connector.hive.RemoteFileInputFormat;
import com.starrocks.sql.analyzer.SemanticException;
import com.starrocks.sql.common.ErrorType;
import io.delta.kernel.Table;
import com.starrocks.sql.common.ErrorType;;

Check failure on line 27 in fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaUtils.java

View workflow job for this annotation

GitHub Actions / FE Code Style Check

[checkstyle] reported by reviewdog 🐶 Line matches the illegal pattern 'Two or more semicolons appear, which is a duplicate.'. Raw Output: /github/workspace/./fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaUtils.java:27:0: error: Line matches the illegal pattern 'Two or more semicolons appear, which is a duplicate.'. (com.puppycrawl.tools.checkstyle.checks.regexp.RegexpCheck)

Check failure on line 27 in fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaUtils.java

View workflow job for this annotation

GitHub Actions / FE Code Style Check

[checkstyle] reported by reviewdog 🐶 Only one statement per line allowed. Raw Output: /github/workspace/./fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaUtils.java:27:43: error: Only one statement per line allowed. (com.puppycrawl.tools.checkstyle.checks.coding.OneStatementPerLineCheck)
import io.delta.kernel.data.ArrayValue;
import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.internal.SnapshotImpl;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
Expand All @@ -48,7 +42,6 @@
import java.util.Locale;

import static com.starrocks.catalog.Column.COLUMN_UNIQUE_ID_INIT_VALUE;
import static com.starrocks.common.profile.Tracers.Module.EXTERNAL;
import static com.starrocks.connector.ConnectorTableId.CONNECTOR_ID_GENERATOR;

public class DeltaUtils {
Expand All @@ -62,28 +55,20 @@ public static void checkProtocolAndMetadata(Protocol protocol, Metadata metadata
}
}

public static DeltaLakeTable convertDeltaToSRTable(String catalog, String dbName, String tblName, String path,
Engine deltaEngine, long createTime) {
SnapshotImpl snapshot;
public static DeltaLakeTable convertDeltaSnapshotToSRTable(String catalog, DeltaLakeSnapshot snapshot) {
String dbName = snapshot.getDbName();
String tblName = snapshot.getTableName();
DeltaLakeEngine deltaLakeEngine = snapshot.getDeltaLakeEngine();
SnapshotImpl snapshotImpl = snapshot.getSnapshot();
String path = snapshot.getPath();

try (Timer ignored = Tracers.watchScope(EXTERNAL, "DeltaLake.getSnapshot")) {
Table deltaTable = Table.forPath(deltaEngine, path);
snapshot = (SnapshotImpl) deltaTable.getLatestSnapshot(deltaEngine);
} catch (TableNotFoundException e) {
LOG.error("Failed to find Delta table for {}.{}.{}, {}", catalog, dbName, tblName, e.getMessage());
throw new SemanticException("Failed to find Delta table for " + catalog + "." + dbName + "." + tblName);
} catch (Exception e) {
LOG.error("Failed to get latest snapshot for {}.{}.{}, {}", catalog, dbName, tblName, e.getMessage());
throw new SemanticException("Failed to get latest snapshot for " + catalog + "." + dbName + "." + tblName);
}

StructType deltaSchema = snapshot.getSchema(deltaEngine);
StructType deltaSchema = snapshotImpl.getSchema(deltaLakeEngine);
if (deltaSchema == null) {
throw new IllegalArgumentException(String.format("Unable to find Schema information in Delta log for " +
"%s.%s.%s", catalog, dbName, tblName));
}

String columnMappingMode = ColumnMapping.getColumnMappingMode(snapshot.getMetadata().getConfiguration());
String columnMappingMode = ColumnMapping.getColumnMappingMode(snapshotImpl.getMetadata().getConfiguration());
List<Column> fullSchema = Lists.newArrayList();
for (StructField field : deltaSchema.fields()) {
DataType dataType = field.getDataType();
Expand All @@ -99,8 +84,7 @@ public static DeltaLakeTable convertDeltaToSRTable(String catalog, String dbName
}

return new DeltaLakeTable(CONNECTOR_ID_GENERATOR.getNextId().asInt(), catalog, dbName, tblName, fullSchema,
loadPartitionColumnNames(snapshot), snapshot, path,
deltaEngine, createTime);
loadPartitionColumnNames(snapshotImpl), snapshotImpl, path, deltaLakeEngine, snapshot.getCreateTime());
}

private static List<String> loadPartitionColumnNames(SnapshotImpl snapshot) {
Expand All @@ -125,7 +109,7 @@ public static Column buildColumnWithColumnMapping(StructField field, Type type,

if (columnMappingMode.equalsIgnoreCase(ColumnMapping.COLUMN_MAPPING_MODE_ID) &&
field.getMetadata().contains(ColumnMapping.COLUMN_MAPPING_ID_KEY)) {
columnUniqueId = ((Long) field.getMetadata().get(ColumnMapping.COLUMN_MAPPING_ID_KEY)).intValue();
columnUniqueId = ((Long) field.getMetadata().get(ColumnMapping.COLUMN_MAPPING_ID_KEY)).intValue();
}
if (columnMappingMode.equalsIgnoreCase(ColumnMapping.COLUMN_MAPPING_MODE_NAME) &&
field.getMetadata().contains(ColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@
import java.util.List;

public interface IDeltaLakeMetastore extends IMetastore, MemoryTrackable {
String getCatalogName();

Table getTable(String dbName, String tableName);

List<String> getPartitionKeys(String dbName, String tableName);

DeltaLakeSnapshot getLatestSnapshot(String dbName, String tableName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.starrocks.sql.analyzer.SemanticException;
import com.starrocks.sql.optimizer.validate.ValidateException;
import io.delta.kernel.Operation;
import io.delta.kernel.Snapshot;
Expand Down Expand Up @@ -68,26 +67,10 @@ public void testCheckTableFeatureSupported2(@Mocked Metadata metadata) {
Lists.newArrayList()), metadata);
}

@Test
public void testConvertDeltaToSRTableWithException1() {
expectedEx.expect(SemanticException.class);
expectedEx.expectMessage("Failed to find Delta table for catalog.db.tbl");

new MockUp<Table>() {
@mockit.Mock
public Table forPath(Engine deltaEngine, String path) throws TableNotFoundException {
throw new TableNotFoundException("Table not found");
}
};

DeltaUtils.convertDeltaToSRTable("catalog", "db", "tbl", "path",
DeltaLakeEngine.create(new Configuration()), 0);
}

@Test
public void testConvertDeltaToSRTableWithException2() {
expectedEx.expect(SemanticException.class);
expectedEx.expectMessage("Failed to get latest snapshot for catalog.db.tbl");
expectedEx.expect(RuntimeException.class);
expectedEx.expectMessage("Failed to get latest snapshot");
Table table = new Table() {
public Table forPath(Engine engine, String path) {
return this;
Expand Down Expand Up @@ -132,7 +115,8 @@ public Table forPath(Engine engine, String path) {
}
};

DeltaUtils.convertDeltaToSRTable("catalog", "db", "tbl", "path",
DeltaLakeEngine.create(new Configuration()), 0);
Engine engine = DeltaLakeEngine.create(new Configuration());
Table deltaTable = Table.forPath(engine, "path");
deltaTable.getLatestSnapshot(engine);
}
}
6 changes: 6 additions & 0 deletions test/sql/test_deltalake/R/test_deltalake_catalog
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,12 @@ select a.c_int, b.c_map, b.c_nest.c_struct, a.c_nest.c_array[6] from delta_test_
1 {1:{"c_date":"2001-01-03"}} None None
3 {33:{"c_date":"2003-01-02"}} None None
-- !result
select a.c_int, b.c_map, b.c_nest.c_struct_new, a.c_nest.c_array[6] from delta_test_${uuid0}.delta_oss_db.delta_nested_type_par a join delta_test_${uuid0}.delta_oss_db.delta_nested_type_par b on a.c_int = b.c_nest.c_struct_new.c_int order by 1;
-- result:
1 {1:{"c_date":"2001-01-03"}} {"c_int":1} None
1 {1:{"c_date":"2001-01-03"}} {"c_int":1} None
3 {33:{"c_date":"2003-01-02"}} {"c_int":3} None
-- !result
drop catalog delta_test_${uuid0}
-- result:
-- !result
3 changes: 3 additions & 0 deletions test/sql/test_deltalake/T/test_deltalake_catalog
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,7 @@ select col_tinyint,col_array,col_map,col_struct from delta_test_${uuid0}.delta_o
select c_int,c_date from delta_test_${uuid0}.delta_oss_db.column_mapping_test where c_nest.c_struct_new.c_int is not null order by c_int nulls last, c_date nulls first;
select a.c_int, b.c_map, b.c_nest.c_struct, a.c_nest.c_array[6] from delta_test_${uuid0}.delta_oss_db.column_mapping_test a join delta_test_${uuid0}.delta_oss_db.column_mapping_test b on a.c_int = b.c_nest.c_struct_new.c_int order by 1;

-- test table join self
select a.c_int, b.c_map, b.c_nest.c_struct_new, a.c_nest.c_array[6] from delta_test_${uuid0}.delta_oss_db.delta_nested_type_par a join delta_test_${uuid0}.delta_oss_db.delta_nested_type_par b on a.c_int = b.c_nest.c_struct_new.c_int order by 1;

drop catalog delta_test_${uuid0}

0 comments on commit d879c3e

Please sign in to comment.