Skip to content

Commit

Permalink
[BugFix] cache delta lake snapshot instead of table object (backport #…
Browse files Browse the repository at this point in the history
…54473) (#54524)

Co-authored-by: Youngwb <[email protected]>
  • Loading branch information
mergify[bot] and Youngwb authored Dec 31, 2024
1 parent b9eb862 commit c246034
Show file tree
Hide file tree
Showing 11 changed files with 333 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@

package com.starrocks.connector.delta;


import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Maps;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.Table;
Expand All @@ -40,6 +41,7 @@
import java.util.concurrent.Executor;
import java.util.stream.Collectors;

import static com.google.common.cache.CacheLoader.asyncReloading;
import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;

public class CachingDeltaLakeMetastore extends CachingMetastore implements IDeltaLakeMetastore {
Expand All @@ -48,12 +50,16 @@ public class CachingDeltaLakeMetastore extends CachingMetastore implements IDelt

public final IDeltaLakeMetastore delegate;
private final Map<DatabaseTableName, Long> lastAccessTimeMap;
protected LoadingCache<DatabaseTableName, DeltaLakeSnapshot> tableSnapshotCache;

public CachingDeltaLakeMetastore(IDeltaLakeMetastore metastore, Executor executor, long expireAfterWriteSec,
long refreshIntervalSec, long maxSize) {
super(executor, expireAfterWriteSec, refreshIntervalSec, maxSize);
this.delegate = metastore;
this.lastAccessTimeMap = Maps.newConcurrentMap();
tableSnapshotCache = newCacheBuilder(expireAfterWriteSec, refreshIntervalSec, maxSize)
.build(asyncReloading(CacheLoader.from(dbTableName ->
getLatestSnapshot(dbTableName.getDatabaseName(), dbTableName.getTableName())), executor));
}

public static CachingDeltaLakeMetastore createQueryLevelInstance(IDeltaLakeMetastore metastore, long perQueryCacheMaxSize) {
Expand All @@ -66,11 +72,16 @@ public static CachingDeltaLakeMetastore createQueryLevelInstance(IDeltaLakeMetas
}

public static CachingDeltaLakeMetastore createCatalogLevelInstance(IDeltaLakeMetastore metastore, Executor executor,
long expireAfterWrite, long refreshInterval,
long maxSize) {
long expireAfterWrite, long refreshInterval,
long maxSize) {
return new CachingDeltaLakeMetastore(metastore, executor, expireAfterWrite, refreshInterval, maxSize);
}

/**
 * Returns the catalog name reported by the wrapped {@code delegate} metastore.
 * This wrapper holds no catalog name of its own.
 */
@Override
public String getCatalogName() {
return delegate.getCatalogName();
}

@Override
public List<String> getAllDatabaseNames() {
return get(databaseNamesCache, "");
Expand Down Expand Up @@ -106,6 +117,19 @@ public Table loadTable(DatabaseTableName databaseTableName) {
return delegate.getTable(databaseTableName.getDatabaseName(), databaseTableName.getTableName());
}

/**
 * Looks up the snapshot for {@code databaseTableName} in {@code tableSnapshotCache}
 * through the {@code get(...)} helper (defined outside this view).
 *
 * <p>NOTE(review): the cache is a {@code LoadingCache} whose loader is
 * {@code getLatestSnapshot}, so a miss presumably triggers a load. Confirm
 * against the helper's implementation and its exception handling.
 *
 * @param databaseTableName cache key identifying the table
 * @return the snapshot held by (or loaded into) the cache for that table
 */
public DeltaLakeSnapshot getCachedSnapshot(DatabaseTableName databaseTableName) {
return get(tableSnapshotCache, databaseTableName);
}

/**
 * Fetches the snapshot for {@code dbName.tableName} from the wrapped metastore.
 *
 * <p>When the delegate is itself a {@code CachingDeltaLakeMetastore}, the
 * request goes through that delegate's snapshot cache. Otherwise the
 * delegate's own {@code getLatestSnapshot} is called directly.
 *
 * @param dbName    database name
 * @param tableName table name
 * @return the snapshot returned by the delegate
 */
@Override
public DeltaLakeSnapshot getLatestSnapshot(String dbName, String tableName) {
    // Plain metastore underneath: ask it directly.
    if (!(delegate instanceof CachingDeltaLakeMetastore)) {
        return delegate.getLatestSnapshot(dbName, tableName);
    }
    // Stacked caching layers: reuse the lower layer's cached snapshot.
    CachingDeltaLakeMetastore cachingDelegate = (CachingDeltaLakeMetastore) delegate;
    return cachingDelegate.getCachedSnapshot(DatabaseTableName.of(dbName, tableName));
}

@Override
public MetastoreTable getMetastoreTable(String dbName, String tableName) {
return delegate.getMetastoreTable(dbName, tableName);
Expand All @@ -117,7 +141,8 @@ public Table getTable(String dbName, String tableName) {
DatabaseTableName databaseTableName = DatabaseTableName.of(dbName, tableName);
lastAccessTimeMap.put(databaseTableName, System.currentTimeMillis());
}
return get(tableCache, DatabaseTableName.of(dbName, tableName));
DeltaLakeSnapshot snapshot = getCachedSnapshot(DatabaseTableName.of(dbName, tableName));
return DeltaUtils.convertDeltaSnapshotToSRTable(getCatalogName(), snapshot);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,22 @@
import com.starrocks.catalog.Database;
import com.starrocks.catalog.DeltaLakeTable;
import com.starrocks.common.Pair;
import com.starrocks.common.profile.Timer;
import com.starrocks.common.profile.Tracers;
import com.starrocks.connector.exception.StarRocksConnectorException;
import com.starrocks.connector.metastore.IMetastore;
import com.starrocks.connector.metastore.MetastoreTable;
import com.starrocks.sql.analyzer.SemanticException;
import io.delta.kernel.Scan;
import io.delta.kernel.ScanBuilder;
import io.delta.kernel.Table;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.data.FilteredColumnarBatch;
import io.delta.kernel.data.Row;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.internal.InternalScanFileUtils;
import io.delta.kernel.internal.SnapshotImpl;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterator;
import org.apache.hadoop.conf.Configuration;
Expand All @@ -46,6 +52,7 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static com.starrocks.common.profile.Tracers.Module.EXTERNAL;
import static com.starrocks.connector.PartitionUtil.toHivePartitionName;

public abstract class DeltaLakeMetastore implements IDeltaLakeMetastore {
Expand Down Expand Up @@ -96,6 +103,11 @@ public List<JsonNode> load(@NotNull DeltaLakeFileStatus fileStatus) throws IOExc
});
}

/**
 * Returns the value of this metastore's {@code catalogName} field.
 */
@Override
public String getCatalogName() {
return catalogName;
}

@Override
public List<String> getAllDatabaseNames() {
return delegate.getAllDatabaseNames();
Expand All @@ -112,7 +124,7 @@ public Database getDb(String dbName) {
}

@Override
public DeltaLakeTable getTable(String dbName, String tableName) {
public DeltaLakeSnapshot getLatestSnapshot(String dbName, String tableName) {
MetastoreTable metastoreTable = getMetastoreTable(dbName, tableName);
if (metastoreTable == null) {
LOG.error("get metastore table failed. dbName: {}, tableName: {}", dbName, tableName);
Expand All @@ -121,9 +133,26 @@ public DeltaLakeTable getTable(String dbName, String tableName) {

String path = metastoreTable.getTableLocation();
long createTime = metastoreTable.getCreateTime();
DeltaLakeEngine deltaLakeEngine = DeltaLakeEngine.create(hdfsConfiguration, properties, checkpointCache, jsonCache);
SnapshotImpl snapshot;

try (Timer ignored = Tracers.watchScope(EXTERNAL, "DeltaLake.getSnapshot")) {
Table deltaTable = Table.forPath(deltaLakeEngine, path);
snapshot = (SnapshotImpl) deltaTable.getLatestSnapshot(deltaLakeEngine);
} catch (TableNotFoundException e) {
LOG.error("Failed to find Delta table for {}.{}.{}, {}", catalogName, dbName, tableName, e.getMessage());
throw new SemanticException("Failed to find Delta table for " + catalogName + "." + dbName + "." + tableName);
} catch (Exception e) {
LOG.error("Failed to get latest snapshot for {}.{}.{}, {}", catalogName, dbName, tableName, e.getMessage());
throw new SemanticException("Failed to get latest snapshot for " + catalogName + "." + dbName + "." + tableName);
}
return new DeltaLakeSnapshot(dbName, tableName, deltaLakeEngine, snapshot, createTime, path);
}

Engine deltaLakeEngine = DeltaLakeEngine.create(hdfsConfiguration, properties, checkpointCache, jsonCache);
return DeltaUtils.convertDeltaToSRTable(catalogName, dbName, tableName, path, deltaLakeEngine, createTime);
/**
 * Resolves {@code dbName.tableName} to a {@link DeltaLakeTable}.
 *
 * <p>The table is built from the snapshot returned by
 * {@link #getLatestSnapshot(String, String)}, using
 * {@code DeltaUtils.convertDeltaSnapshotToSRTable}.
 *
 * @param dbName    database name
 * @param tableName table name
 * @return the table built from the latest snapshot
 */
@Override
public DeltaLakeTable getTable(String dbName, String tableName) {
    return DeltaUtils.convertDeltaSnapshotToSRTable(catalogName, getLatestSnapshot(dbName, tableName));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.connector.delta;

import io.delta.kernel.internal.SnapshotImpl;

/**
 * Read-only bundle of everything needed to describe one Delta Lake table at a
 * given snapshot: the table's identity, its storage path and creation time,
 * and the Delta kernel engine and snapshot handles.
 *
 * <p>All fields are assigned once in the constructor and exposed through getters only.
 */
public class DeltaLakeSnapshot {
    // Table identity.
    private final String dbName;
    private final String tableName;

    // Table path and creation time.
    private final String path;
    private final long createTime;

    // Delta kernel handles associated with this snapshot.
    private final DeltaLakeEngine deltaLakeEngine;
    private final SnapshotImpl snapshot;

    /**
     * @param dbName     database name of the table
     * @param tableName  table name
     * @param engine     Delta engine associated with the snapshot
     * @param snapshot   Delta kernel snapshot
     * @param createTime table creation time
     * @param path       table path
     */
    public DeltaLakeSnapshot(String dbName, String tableName, DeltaLakeEngine engine, SnapshotImpl snapshot,
                             long createTime, String path) {
        this.dbName = dbName;
        this.tableName = tableName;
        this.path = path;
        this.createTime = createTime;
        this.deltaLakeEngine = engine;
        this.snapshot = snapshot;
    }

    /** @return the database name */
    public String getDbName() {
        return this.dbName;
    }

    /** @return the table name */
    public String getTableName() {
        return this.tableName;
    }

    /** @return the table path */
    public String getPath() {
        return this.path;
    }

    /** @return the table creation time */
    public long getCreateTime() {
        return this.createTime;
    }

    /** @return the Delta engine supplied at construction */
    public DeltaLakeEngine getDeltaLakeEngine() {
        return this.deltaLakeEngine;
    }

    /** @return the Delta kernel snapshot supplied at construction */
    public SnapshotImpl getSnapshot() {
        return this.snapshot;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,12 @@
import com.starrocks.catalog.Type;
import com.starrocks.common.ErrorCode;
import com.starrocks.common.ErrorReport;
import com.starrocks.common.profile.Timer;
import com.starrocks.common.profile.Tracers;
import com.starrocks.connector.ColumnTypeConverter;
import com.starrocks.connector.exception.StarRocksConnectorException;
import com.starrocks.connector.hive.RemoteFileInputFormat;
import com.starrocks.sql.analyzer.SemanticException;
import com.starrocks.sql.common.ErrorType;
import io.delta.kernel.Table;
import io.delta.kernel.data.ArrayValue;
import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.internal.SnapshotImpl;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
Expand All @@ -48,7 +42,6 @@
import java.util.Locale;

import static com.starrocks.catalog.Column.COLUMN_UNIQUE_ID_INIT_VALUE;
import static com.starrocks.common.profile.Tracers.Module.EXTERNAL;
import static com.starrocks.connector.ConnectorTableId.CONNECTOR_ID_GENERATOR;

public class DeltaUtils {
Expand All @@ -62,28 +55,20 @@ public static void checkProtocolAndMetadata(Protocol protocol, Metadata metadata
}
}

public static DeltaLakeTable convertDeltaToSRTable(String catalog, String dbName, String tblName, String path,
Engine deltaEngine, long createTime) {
SnapshotImpl snapshot;
public static DeltaLakeTable convertDeltaSnapshotToSRTable(String catalog, DeltaLakeSnapshot snapshot) {
String dbName = snapshot.getDbName();
String tblName = snapshot.getTableName();
DeltaLakeEngine deltaLakeEngine = snapshot.getDeltaLakeEngine();
SnapshotImpl snapshotImpl = snapshot.getSnapshot();
String path = snapshot.getPath();

try (Timer ignored = Tracers.watchScope(EXTERNAL, "DeltaLake.getSnapshot")) {
Table deltaTable = Table.forPath(deltaEngine, path);
snapshot = (SnapshotImpl) deltaTable.getLatestSnapshot(deltaEngine);
} catch (TableNotFoundException e) {
LOG.error("Failed to find Delta table for {}.{}.{}, {}", catalog, dbName, tblName, e.getMessage());
throw new SemanticException("Failed to find Delta table for " + catalog + "." + dbName + "." + tblName);
} catch (Exception e) {
LOG.error("Failed to get latest snapshot for {}.{}.{}, {}", catalog, dbName, tblName, e.getMessage());
throw new SemanticException("Failed to get latest snapshot for " + catalog + "." + dbName + "." + tblName);
}

StructType deltaSchema = snapshot.getSchema(deltaEngine);
StructType deltaSchema = snapshotImpl.getSchema(deltaLakeEngine);
if (deltaSchema == null) {
throw new IllegalArgumentException(String.format("Unable to find Schema information in Delta log for " +
"%s.%s.%s", catalog, dbName, tblName));
}

String columnMappingMode = ColumnMapping.getColumnMappingMode(snapshot.getMetadata().getConfiguration());
String columnMappingMode = ColumnMapping.getColumnMappingMode(snapshotImpl.getMetadata().getConfiguration());
List<Column> fullSchema = Lists.newArrayList();
for (StructField field : deltaSchema.fields()) {
DataType dataType = field.getDataType();
Expand All @@ -99,11 +84,10 @@ public static DeltaLakeTable convertDeltaToSRTable(String catalog, String dbName
}

return new DeltaLakeTable(CONNECTOR_ID_GENERATOR.getNextId().asInt(), catalog, dbName, tblName, fullSchema,
loadPartitionColumnNames(snapshot), snapshot, path,
deltaEngine, createTime);
loadPartitionColumnNames(snapshotImpl), snapshotImpl, path, deltaLakeEngine, snapshot.getCreateTime());
}

private static List<String> loadPartitionColumnNames(SnapshotImpl snapshot) {
public static List<String> loadPartitionColumnNames(SnapshotImpl snapshot) {
ArrayValue partitionColumns = snapshot.getMetadata().getPartitionColumns();
ColumnVector partitionColNameVector = partitionColumns.getElements();
List<String> partitionColumnNames = Lists.newArrayList();
Expand All @@ -125,7 +109,7 @@ public static Column buildColumnWithColumnMapping(StructField field, Type type,

if (columnMappingMode.equalsIgnoreCase(ColumnMapping.COLUMN_MAPPING_MODE_ID) &&
field.getMetadata().contains(ColumnMapping.COLUMN_MAPPING_ID_KEY)) {
columnUniqueId = ((Long) field.getMetadata().get(ColumnMapping.COLUMN_MAPPING_ID_KEY)).intValue();
columnUniqueId = ((Long) field.getMetadata().get(ColumnMapping.COLUMN_MAPPING_ID_KEY)).intValue();
}
if (columnMappingMode.equalsIgnoreCase(ColumnMapping.COLUMN_MAPPING_MODE_NAME) &&
field.getMetadata().contains(ColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@
import java.util.List;

/**
 * Metastore contract for Delta Lake catalogs. Adds catalog-name, table,
 * partition-key and snapshot lookups on top of {@code IMetastore} and
 * {@code MemoryTrackable}.
 */
public interface IDeltaLakeMetastore extends IMetastore, MemoryTrackable {
// Returns the catalog name associated with this metastore.
String getCatalogName();

// Returns the table identified by dbName and tableName.
Table getTable(String dbName, String tableName);

// NOTE(review): presumably the partition column names of the table;
// no implementation is visible here, so confirm against implementors.
List<String> getPartitionKeys(String dbName, String tableName);

// Returns a DeltaLakeSnapshot for the table identified by dbName and tableName.
DeltaLakeSnapshot getLatestSnapshot(String dbName, String tableName);
}
Loading

0 comments on commit c246034

Please sign in to comment.