From 1486459a6399086e476c5ce27a1f02d09bee74d8 Mon Sep 17 00:00:00 2001 From: Redouane Lakrache Date: Thu, 30 Jan 2025 04:38:12 +0100 Subject: [PATCH] better concurrency --- pkg/relayer/session/claim.go | 18 ++++++++++++++++-- pkg/relayer/session/proof.go | 31 +++++++++++++++++++------------ 2 files changed, 35 insertions(+), 14 deletions(-) diff --git a/pkg/relayer/session/claim.go b/pkg/relayer/session/claim.go index 2278c9cd8..22f12d6ce 100644 --- a/pkg/relayer/session/claim.go +++ b/pkg/relayer/session/claim.go @@ -233,6 +233,10 @@ func (rs *relayerSessionsManager) goCreateClaimRoots( ) { failedClaims := []relayer.SessionTree{} flushedClaims := []relayer.SessionTree{} + + failedClaimsCh := make(chan relayer.SessionTree, len(sessionTrees)) + flushedClaimsCh := make(chan relayer.SessionTree, len(sessionTrees)) + wg := sync.WaitGroup{} sem := make(chan struct{}, runtime.NumCPU()) for _, sessionTree := range sessionTrees { @@ -251,15 +255,25 @@ func (rs *relayerSessionsManager) goCreateClaimRoots( // This session should no longer be updated if _, err := tree.Flush(); err != nil { rs.logger.Error().Err(err).Msg("failed to flush session") - failedClaims = append(failedClaims, tree) + failedClaimsCh <- tree return } - flushedClaims = append(flushedClaims, tree) + flushedClaimsCh <- tree }(sessionTree) } wg.Wait() + close(failedClaimsCh) + close(flushedClaimsCh) + + for failedClaim := range failedClaimsCh { + failedClaims = append(failedClaims, failedClaim) + } + + for flushedClaim := range flushedClaimsCh { + flushedClaims = append(flushedClaims, flushedClaim) + } failSubmitProofsSessionsCh <- failedClaims claimsFlushedCh <- flushedClaims diff --git a/pkg/relayer/session/proof.go b/pkg/relayer/session/proof.go index 075019771..3e9dfbef4 100644 --- a/pkg/relayer/session/proof.go +++ b/pkg/relayer/session/proof.go @@ -214,10 +214,13 @@ func (rs *relayerSessionsManager) proveClaims( ) (successProofs []relayer.SessionTree, failedProofs []relayer.SessionTree) { logger := rs.logger.With("method", "proveClaims") - proofsMu := sync.Mutex{} - wg := sync.WaitGroup{} + wg := &sync.WaitGroup{} sem := make(chan struct{}, runtime.NumCPU()) + // Create buffered channels for collecting results + successProofsCh := make(chan relayer.SessionTree, len(sessionTrees)) + failedProofsCh := make(chan relayer.SessionTree, len(sessionTrees)) + for _, sessionTree := range sessionTrees { sem <- struct{}{} wg.Add(1) @@ -231,9 +234,7 @@ func (rs *relayerSessionsManager) proveClaims( // do not create the claim since the proof requirement is unknown. // WARNING: Creating a claim and not submitting a proof (if necessary) could lead to a stake burn!! if err != nil { - proofsMu.Lock() - failedProofs = append(failedProofs, tree) - proofsMu.Unlock() + failedProofsCh <- tree rs.logger.Error().Err(err).Msg("failed to determine if proof is required, skipping claim creation") return } @@ -249,22 +250,28 @@ func (rs *relayerSessionsManager) proveClaims( // If the proof cannot be generated, add the sessionTree to the failedProofs. if _, err := tree.ProveClosest(path); err != nil { - proofsMu.Lock() - failedProofs = append(failedProofs, tree) - proofsMu.Unlock() + failedProofsCh <- tree logger.Error().Err(err).Msg("failed to generate proof") return } // If the proof was generated successfully, add the sessionTree to the // successProofs slice that will be sent to the proof submission step. - proofsMu.Lock() - successProofs = append(successProofs, tree) - proofsMu.Unlock() + successProofsCh <- tree } }(sessionTree) + } - wg.Wait() + wg.Wait() + close(successProofsCh) + close(failedProofsCh) + + // Convert channels to slices + for tree := range successProofsCh { + successProofs = append(successProofs, tree) + } + for tree := range failedProofsCh { + failedProofs = append(failedProofs, tree) } return successProofs, failedProofs