[Enhancement]global dictionary cache for table on lake (#55265)
Signed-off-by: zombee0 <[email protected]>
zombee0 authored Jan 22, 2025
1 parent 3a3a45c commit 51a2366
Showing 14 changed files with 680 additions and 19 deletions.

@@ -112,6 +112,7 @@ public String getUUID() {
}
}

@Override
public List<Column> getPartitionColumns() {
return partColumnNames.stream()
.map(name -> nameToColumn.get(name))

@@ -84,6 +84,14 @@ public class ThreadPoolManager {

private static final long KEEP_ALIVE_TIME = 60L;

private static final ThreadPoolExecutor DICT_CACHE_THREAD =
ThreadPoolManager.newDaemonCacheThreadPool(Config.dict_collect_thread_pool_size, "cache-dict",
false);

public static ThreadPoolExecutor getDictCacheThread() {
return DICT_CACHE_THREAD;
}

public static void registerAllThreadPoolMetric() {
for (Map.Entry<String, ThreadPoolExecutor> entry : nameToThreadPoolMap.entrySet()) {
registerThreadPoolMetric(entry.getKey(), entry.getValue());
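For context, a minimal sketch (not part of the diff) of what exposing a single shared pool enables: several Caffeine caches reusing one executor instead of each constructing its own thread pool. The class, pool, cache size, and loader below are hypothetical stand-ins, not the actual ThreadPoolManager/Config symbols.

    // Illustrative only: SharedDictExecutorSketch and DICT_CACHE_EXECUTOR stand in for
    // ThreadPoolManager.getDictCacheThread(); the real pool is a daemon cache pool
    // sized by Config.dict_collect_thread_pool_size.
    import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
    import com.github.benmanes.caffeine.cache.Caffeine;

    import java.util.Optional;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class SharedDictExecutorSketch {
        // One process-wide executor shared by every dictionary cache.
        private static final ExecutorService DICT_CACHE_EXECUTOR = Executors.newCachedThreadPool();

        // Each cache passes the shared executor to Caffeine instead of creating a new pool.
        private final AsyncLoadingCache<String, Optional<String>> dictCache = Caffeine.newBuilder()
                .maximumSize(1024)
                .executor(DICT_CACHE_EXECUTOR)
                .buildAsync(key -> Optional.of("dict-for-" + key));
    }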

@@ -15,6 +15,7 @@
package com.starrocks.connector.statistics;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.Table;
import com.starrocks.server.GlobalStateMgr;
@@ -28,6 +29,7 @@
import io.trino.hive.$internal.org.apache.commons.lang3.tuple.ImmutableTriple;
import io.trino.hive.$internal.org.apache.commons.lang3.tuple.Triple;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@@ -68,6 +70,12 @@ public static Triple<String, Database, Table> getTableTripleByUUID(String tableUUID) {
return ImmutableTriple.of(splits[0], db, table);
}

public static List<String> getTableNameByUUID(String tableUUID) {
String[] splits = tableUUID.split("\\.");
Preconditions.checkState(splits.length >= 3);
return ImmutableList.of(splits[0], splits[1], splits[2]);
}

public static Statistics buildDefaultStatistics(Set<ColumnRefOperator> columns) {
Statistics.Builder statisticsBuilder = Statistics.builder();
statisticsBuilder.setOutputRowCount(1);
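For the new helper, a self-contained sketch of the UUID layout it appears to assume: `<catalog>.<database>.<table>` followed by an implementation-specific suffix. The sample UUID value and class name below are made up for illustration.

    import com.google.common.collect.ImmutableList;

    import java.util.List;

    public class TableUuidSketch {
        // Mirrors getTableNameByUUID(): split on '.' and keep the first three components.
        public static List<String> tableNameFromUuid(String tableUUID) {
            String[] splits = tableUUID.split("\\.");
            if (splits.length < 3) {
                throw new IllegalStateException("unexpected table UUID: " + tableUUID);
            }
            return ImmutableList.of(splits[0], splits[1], splits[2]);
        }

        public static void main(String[] args) {
            // Fabricated example value; prints [hive_catalog, tpch, lineitem].
            System.out.println(tableNameFromUuid("hive_catalog.tpch.lineitem.1737500000000"));
        }
    }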

@@ -26,6 +26,7 @@
import com.starrocks.connector.GetRemoteFilesParams;
import com.starrocks.connector.RemoteFileInfoDefaultSource;
import com.starrocks.connector.RemoteFileInfoSource;
import com.starrocks.connector.RemoteFilesSampleStrategy;
import com.starrocks.connector.TableVersionRange;
import com.starrocks.connector.delta.DeltaConnectorScanRangeSource;
import com.starrocks.connector.delta.DeltaUtils;
@@ -212,4 +213,9 @@ protected void toThrift(TPlanNode msg) {
public boolean canUseRuntimeAdaptiveDop() {
return true;
}

@Override
public void setScanSampleStrategy(RemoteFilesSampleStrategy strategy) {
scanRangeSource.setSampleStrategy(strategy);
}
}

@@ -23,6 +23,7 @@
import com.starrocks.catalog.HiveTable;
import com.starrocks.catalog.Type;
import com.starrocks.connector.CatalogConnector;
import com.starrocks.connector.RemoteFilesSampleStrategy;
import com.starrocks.connector.hive.HiveConnectorScanRangeSource;
import com.starrocks.credential.CloudConfiguration;
import com.starrocks.datacache.DataCacheOptions;
@@ -277,4 +278,9 @@ public boolean canUseRuntimeAdaptiveDop() {
protected boolean supportTopNRuntimeFilter() {
return true;
}

@Override
public void setScanSampleStrategy(RemoteFilesSampleStrategy strategy) {
scanRangeSource.setSampleStrategy(strategy);
}
}

@@ -27,6 +27,7 @@
import com.starrocks.connector.RemoteFileInfo;
import com.starrocks.connector.RemoteFileInfoDefaultSource;
import com.starrocks.connector.RemoteFileInfoSource;
import com.starrocks.connector.RemoteFilesSampleStrategy;
import com.starrocks.connector.TableVersionRange;
import com.starrocks.connector.iceberg.IcebergConnectorScanRangeSource;
import com.starrocks.connector.iceberg.IcebergGetRemoteFilesParams;
@@ -311,4 +312,9 @@ public boolean canUseRuntimeAdaptiveDop() {
protected boolean supportTopNRuntimeFilter() {
return true;
}

@Override
public void setScanSampleStrategy(RemoteFilesSampleStrategy strategy) {
scanRangeSource.setSampleStrategy(strategy);
}
}

@@ -529,7 +529,7 @@ public List<TGlobalDict> dictToThrift(List<Pair<Integer, ColumnDict>> dicts) {
strings.add(kv.getKey());
integers.add(kv.getValue());
}
globalDict.setVersion(dictPair.second.getCollectedVersionTime());
globalDict.setVersion(dictPair.second.getCollectedVersion());
globalDict.setStrings(strings);
globalDict.setIds(integers);
result.add(globalDict);
@@ -550,7 +550,7 @@ public List<TGlobalDict> normalizeDicts(List<Pair<Integer, ColumnDict>> dicts, F
for (Pair<Integer, ColumnDict> dictPair : sortedDicts) {
TGlobalDict globalDict = new TGlobalDict();
globalDict.setColumnId(dictPair.first);
globalDict.setVersion(dictPair.second.getCollectedVersionTime());
globalDict.setVersion(dictPair.second.getCollectedVersion());
result.add(globalDict);
}
return result;
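Reading the two hunks above together: both conversions now stamp ColumnDict.getCollectedVersion() into the thrift version field, but dictToThrift ships the full payload (strings and ids) while normalizeDicts carries only the column id and version. A hedged sketch of that contrast, using a hypothetical stand-in for the thrift-generated TGlobalDict:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    public class GlobalDictShapes {
        // Hypothetical stand-in for the thrift-generated TGlobalDict.
        static final class GlobalDict {
            int columnId;
            long version;                              // ColumnDict.getCollectedVersion()
            List<String> strings = new ArrayList<>();  // dictionary keys (full form only)
            List<Integer> ids = new ArrayList<>();     // dictionary codes (full form only)
        }

        // Shape of dictToThrift: identity, version, and the full dictionary payload.
        static GlobalDict toFullDict(int columnId, long collectedVersion, Map<String, Integer> dict) {
            GlobalDict out = new GlobalDict();
            out.columnId = columnId;
            out.version = collectedVersion;
            for (Map.Entry<String, Integer> kv : dict.entrySet()) {
                out.strings.add(kv.getKey());
                out.ids.add(kv.getValue());
            }
            return out;
        }

        // Shape of normalizeDicts: identity and version only, no payload.
        static GlobalDict toNormalizedDict(int columnId, long collectedVersion) {
            GlobalDict out = new GlobalDict();
            out.columnId = columnId;
            out.version = collectedVersion;
            return out;
        }
    }
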
4 changes: 4 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/planner/ScanNode.java
@@ -40,6 +40,7 @@
import com.starrocks.analysis.TupleDescriptor;
import com.starrocks.catalog.ColumnAccessPath;
import com.starrocks.common.StarRocksException;
import com.starrocks.connector.RemoteFilesSampleStrategy;
import com.starrocks.datacache.DataCacheOptions;
import com.starrocks.server.WarehouseManager;
import com.starrocks.sql.optimizer.ScanOptimzeOption;
@@ -179,4 +180,7 @@ public boolean needCollectExecStats() {
public boolean isRunningAsConnectorOperator() {
return true;
}

public void setScanSampleStrategy(RemoteFilesSampleStrategy strategy) {
}
}
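Taken together with the three connector scan nodes earlier in the diff, this is a plain no-op hook: the planner can hand a RemoteFilesSampleStrategy to any ScanNode, and only the lake scan nodes that own a scan-range source forward it. A simplified, self-contained sketch of that shape (all names below are hypothetical stand-ins):

    public class SampleStrategyHookSketch {
        // Stand-in for RemoteFilesSampleStrategy.
        interface SampleStrategy {
        }

        // Stand-in for the base ScanNode: the hook is a no-op by default.
        static class BaseScanNode {
            public void setScanSampleStrategy(SampleStrategy strategy) {
            }
        }

        // Stand-in for a connector scan-range source that can sample remote files.
        static class RemoteScanRangeSource {
            private SampleStrategy strategy;

            void setSampleStrategy(SampleStrategy strategy) {
                this.strategy = strategy;
            }
        }

        // Stand-in for the Hive/Iceberg/Delta scan nodes: forward the strategy.
        static class LakeScanNode extends BaseScanNode {
            private final RemoteScanRangeSource scanRangeSource = new RemoteScanRangeSource();

            @Override
            public void setScanSampleStrategy(SampleStrategy strategy) {
                scanRangeSource.setSampleStrategy(strategy);
            }
        }
    }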

@@ -108,8 +108,7 @@ public CompletableFuture<Optional<ColumnDict>> asyncReload(

private final AsyncLoadingCache<ColumnIdentifier, Optional<ColumnDict>> dictStatistics = Caffeine.newBuilder()
.maximumSize(Config.statistic_dict_columns)
.executor(ThreadPoolManager.newDaemonCacheThreadPool(Config.dict_collect_thread_pool_size, "cache-dict",
false))
.executor(ThreadPoolManager.getDictCacheThread())
.buildAsync(dictLoader);

private Optional<ColumnDict> deserializeColumnDict(long tableId, ColumnId columnName, TStatisticData statisticData) {
@@ -193,7 +192,7 @@ public boolean hasGlobalDict(long tableId, ColumnId columnName, long versionTime
if (!realResult.isPresent()) {
LOG.debug("Invalidate column {} dict cache because don't present", columnName);
dictStatistics.synchronous().invalidate(columnIdentifier);
} else if (realResult.get().getVersionTime() < versionTime) {
} else if (realResult.get().getVersion() < versionTime) {
LOG.debug("Invalidate column {} dict cache because out of date", columnName);
dictStatistics.synchronous().invalidate(columnIdentifier);
} else {
@@ -267,14 +266,14 @@ public void updateGlobalDict(long tableId, ColumnId columnName, long collectVers
Optional<ColumnDict> columnOptional = future.get();
if (columnOptional.isPresent()) {
ColumnDict columnDict = columnOptional.get();
long lastVersion = columnDict.getVersionTime();
long dictCollectVersion = columnDict.getCollectedVersionTime();
long lastVersion = columnDict.getVersion();
long dictCollectVersion = columnDict.getCollectedVersion();
if (collectVersion != dictCollectVersion) {
LOG.info("remove dict by unmatched version {}:{}", collectVersion, dictCollectVersion);
removeGlobalDict(tableId, columnName);
return;
}
columnDict.updateVersionTime(versionTime);
columnDict.updateVersion(versionTime);
LOG.info("update dict for table {} column {} from version {} to {}", tableId, columnName,
lastVersion, versionTime);
}
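A hedged reading of the version bookkeeping after the rename, with hypothetical names (the real ColumnDict is not part of this diff): collectedVersion records the table version the dictionary was built from and never changes, while version is advanced as the dictionary is confirmed valid for newer table versions. hasGlobalDict invalidates entries whose version lags the requested one, and updateGlobalDict refuses to advance a dictionary whose collected version does not match the caller's.

    // Hypothetical sketch; DictEntrySketch is not a class in this patch.
    public class DictEntrySketch {
        private final long collectedVersion; // table version the dictionary was built from (fixed)
        private long version;                // newest table version the dictionary is known to cover

        public DictEntrySketch(long collectedVersion) {
            this.collectedVersion = collectedVersion;
            this.version = collectedVersion;
        }

        // Mirrors hasGlobalDict(): a dictionary older than the requested version is stale.
        public boolean covers(long requestedVersion) {
            return version >= requestedVersion;
        }

        // Mirrors updateGlobalDict(): only advance a dictionary built from the expected collection.
        public boolean tryAdvance(long expectedCollectedVersion, long newVersion) {
            if (expectedCollectedVersion != collectedVersion) {
                return false; // the real code removes the cache entry instead
            }
            version = newVersion;
            return true;
        }
    }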
