Skip to content

Commit

Permalink
better concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
red-0ne committed Jan 30, 2025
1 parent 6049331 commit 1486459
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 14 deletions.
18 changes: 16 additions & 2 deletions pkg/relayer/session/claim.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,10 @@ func (rs *relayerSessionsManager) goCreateClaimRoots(
) {
failedClaims := []relayer.SessionTree{}
flushedClaims := []relayer.SessionTree{}

failedClaimsCh := make(chan relayer.SessionTree, len(sessionTrees))
flushedClaimsCh := make(chan relayer.SessionTree, len(sessionTrees))

wg := sync.WaitGroup{}
sem := make(chan struct{}, runtime.NumCPU())
for _, sessionTree := range sessionTrees {
Expand All @@ -251,15 +255,25 @@ func (rs *relayerSessionsManager) goCreateClaimRoots(
// This session should no longer be updated
if _, err := tree.Flush(); err != nil {
rs.logger.Error().Err(err).Msg("failed to flush session")
failedClaims = append(failedClaims, tree)
failedClaimsCh <- tree
return
}

flushedClaims = append(flushedClaims, tree)
flushedClaimsCh <- tree
}(sessionTree)
}

wg.Wait()
close(failedClaimsCh)
close(flushedClaimsCh)

for failedClaim := range failedClaimsCh {
failedClaims = append(failedClaims, failedClaim)
}

for flushedClaim := range flushedClaimsCh {
flushedClaims = append(flushedClaims, flushedClaim)
}

failSubmitProofsSessionsCh <- failedClaims
claimsFlushedCh <- flushedClaims
Expand Down
31 changes: 19 additions & 12 deletions pkg/relayer/session/proof.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,13 @@ func (rs *relayerSessionsManager) proveClaims(
) (successProofs []relayer.SessionTree, failedProofs []relayer.SessionTree) {
logger := rs.logger.With("method", "proveClaims")

proofsMu := sync.Mutex{}
wg := sync.WaitGroup{}
wg := &sync.WaitGroup{}
sem := make(chan struct{}, runtime.NumCPU())

// Create buffered channels for collecting results
successProofsCh := make(chan relayer.SessionTree, len(sessionTrees))
failedProofsCh := make(chan relayer.SessionTree, len(sessionTrees))

for _, sessionTree := range sessionTrees {
sem <- struct{}{}
wg.Add(1)
Expand All @@ -231,9 +234,7 @@ func (rs *relayerSessionsManager) proveClaims(
// do not create the claim since the proof requirement is unknown.
// WARNING: Creating a claim and not submitting a proof (if necessary) could lead to a stake burn!!
if err != nil {
proofsMu.Lock()
failedProofs = append(failedProofs, tree)
proofsMu.Unlock()
failedProofsCh <- tree
rs.logger.Error().Err(err).Msg("failed to determine if proof is required, skipping claim creation")
return
}
Expand All @@ -249,22 +250,28 @@ func (rs *relayerSessionsManager) proveClaims(

// If the proof cannot be generated, add the sessionTree to the failedProofs.
if _, err := tree.ProveClosest(path); err != nil {
proofsMu.Lock()
failedProofs = append(failedProofs, tree)
proofsMu.Unlock()
failedProofsCh <- tree
logger.Error().Err(err).Msg("failed to generate proof")
return
}

// If the proof was generated successfully, add the sessionTree to the
// successProofs slice that will be sent to the proof submission step.
proofsMu.Lock()
successProofs = append(successProofs, tree)
proofsMu.Unlock()
successProofsCh <- tree
}
}(sessionTree)
}

wg.Wait()
wg.Wait()
close(successProofsCh)
close(failedProofsCh)

// Convert channels to slices
for tree := range successProofsCh {
successProofs = append(successProofs, tree)
}
for tree := range failedProofsCh {
failedProofs = append(failedProofs, tree)
}

return successProofs, failedProofs
Expand Down

0 comments on commit 1486459

Please sign in to comment.