Skip to content

Commit

Permalink
fix comments
Browse files Browse the repository at this point in the history
Signed-off-by: drake_wang <[email protected]>
  • Loading branch information
wxl24life committed Jan 20, 2025
1 parent 51d052b commit a264e07
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ public class CompactionMgr implements MemoryTrackable {
*
* Note that, this will prevent all partitions whose tableId is maintained in the map from being compacted
*/
private final ConcurrentHashMap<Long, Long> activeCompactionTransactionMap = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Long, Long> remainedActiveCompactionTxnWhenStart = new ConcurrentHashMap<>();

@VisibleForTesting
protected synchronized ConcurrentHashMap<Long, Long> getActiveCompactionTransactionMap() {
return activeCompactionTransactionMap;
protected ConcurrentHashMap<Long, Long> getRemainedActiveCompactionTxnWhenStart() {
return remainedActiveCompactionTxnWhenStart;
}

public CompactionMgr() {
Expand Down Expand Up @@ -115,22 +115,22 @@ public void start() {
/**
* iterate all transactions and find those with LAKE_COMPACTION labels and are not finished before FE restart.
**/
protected synchronized void rebuildActiveCompactionTransactionMapOnRestart() {
protected void rebuildActiveCompactionTransactionMapOnRestart() {
Map<Long, Long> activeTxnStates =
GlobalStateMgr.getCurrentState().getGlobalTransactionMgr().getLakeCompactionActiveTxnStats();
for (Map.Entry<Long, Long> txnState : activeTxnStates.entrySet()) {
// for lake compaction txn, there can only be one table id for each txn state
activeCompactionTransactionMap.put(txnState.getKey(), txnState.getValue());
remainedActiveCompactionTxnWhenStart.put(txnState.getKey(), txnState.getValue());
LOG.info("Found lake compaction transaction not finished on table {}, txn_id: {}", txnState.getValue(),
txnState.getKey());
}
}

protected synchronized void removeFromStartupActiveCompactionTransactionMap(long txnId) {
if (activeCompactionTransactionMap.isEmpty()) {
protected void removeFromStartupActiveCompactionTransactionMap(long txnId) {
if (remainedActiveCompactionTxnWhenStart.isEmpty()) {
return;
}
boolean ret = activeCompactionTransactionMap.keySet().removeIf(key -> key == txnId);
boolean ret = remainedActiveCompactionTxnWhenStart.keySet().removeIf(key -> key == txnId);
if (ret) {
LOG.info("Removed transaction {} from startup active compaction transaction map", txnId);
}
Expand Down Expand Up @@ -172,25 +172,15 @@ public void handleCompactionFinished(PartitionIdentifier partition, long version

@NotNull
List<PartitionStatisticsSnapshot> choosePartitionsToCompact(@NotNull Set<PartitionIdentifier> excludes,
@NotNull Set<Long> excludeTables) {
return choosePartitionsToCompact(excludeTables)
@NotNull Set<Long> excludeTables) {
Set<Long> copiedExcludeTables = new HashSet<>(excludeTables);
copiedExcludeTables.addAll(remainedActiveCompactionTxnWhenStart.values());
return choosePartitionsToCompact(copiedExcludeTables)
.stream()
.filter(p -> !excludes.contains(p.getPartition()))
.filter(p -> !shouldSkipChoseTable(p.getPartition().getTableId()))
.collect(Collectors.toList());
}

/**
* check if this table in `activeCompactionTransactionMap`,
* if that is the case, we should skip this table while `choosePartitionsToCompact`
*/
protected synchronized boolean shouldSkipChoseTable(long tableId) {
if (activeCompactionTransactionMap.isEmpty()) {
return false;
}
return activeCompactionTransactionMap.containsValue(tableId);
}

@NotNull
List<PartitionStatisticsSnapshot> choosePartitionsToCompact(Set<Long> excludeTables) {
List<PartitionStatisticsSnapshot> selection = sorter.sort(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -782,21 +782,14 @@ public Map<Long, Long> getLakeCompactionActiveTxnMap() {
// for lake compaction txn, there can only be one table id for each txn state
Map<Long, Long> txnIdToTableIdMap = new HashMap<>();
idToRunningTransactionState.values().stream()
.filter(state -> !isTransactionFinished(state))
.filter(state -> state.getSourceType() == TransactionState.LoadJobSourceType.LAKE_COMPACTION)
.sorted(Comparator.comparing(TransactionState::getPrepareTime))
.forEach(state -> txnIdToTableIdMap.put(state.getTransactionId(), state.getTableIdList().get(0)));
return txnIdToTableIdMap;
} finally {
readUnlock();
}
}

private boolean isTransactionFinished(TransactionState transactionState) {
return transactionState.getTransactionStatus() == TransactionStatus.ABORTED ||
transactionState.getTransactionStatus() == TransactionStatus.VISIBLE;
}

// Check whether there is committed txns on partitionId.
public boolean hasCommittedTxnOnPartition(long tableId, long partitionId) {
readLock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,12 @@ public void testChoosePartitionsToCompactWithActiveTxnFilter() {
// both partition10 and partition11 are filtered because table1 has active txn
Assert.assertEquals(1, compactionList.size());
Assert.assertSame(partition20, compactionList.get(0).getPartition());

Set<Long> excludeTables = new HashSet<>();
excludeTables.add(tableId2);
compactionList = compactionManager.choosePartitionsToCompact(new HashSet<>(), excludeTables);
// tableId2 is filtered by excludeTables
Assert.assertEquals(0, compactionList.size());
}

@Test
Expand Down Expand Up @@ -292,13 +298,11 @@ public void testActiveCompactionTransactionMapOnRestart() {

CompactionMgr compactionMgr = new CompactionMgr();
compactionMgr.rebuildActiveCompactionTransactionMapOnRestart();
ConcurrentHashMap<Long, Long> activeCompactionTransactionMap = compactionMgr.getActiveCompactionTransactionMap();
ConcurrentHashMap<Long, Long> activeCompactionTransactionMap =
compactionMgr.getRemainedActiveCompactionTxnWhenStart();
Assert.assertEquals(1, activeCompactionTransactionMap.size());
Assert.assertTrue(activeCompactionTransactionMap.containsValue(tableId));

// should skip table
Assert.assertTrue(compactionMgr.shouldSkipChoseTable(tableId));

// test for removeFromStartupActiveCompactionTransactionMap
long nonExistedTxnId = 10003L;
compactionMgr.removeFromStartupActiveCompactionTransactionMap(nonExistedTxnId);
Expand Down

0 comments on commit a264e07

Please sign in to comment.