diff --git a/load-testing/tests/relays_stress_helpers_test.go b/load-testing/tests/relays_stress_helpers_test.go index 1abb26595..a3a74a21a 100644 --- a/load-testing/tests/relays_stress_helpers_test.go +++ b/load-testing/tests/relays_stress_helpers_test.go @@ -1556,14 +1556,14 @@ func (s *relaysSuite) forEachRelayBatchSendBatch(_ context.Context, relayBatchIn // each sending relayRatePerApp relays per second. relaysPerSec := len(relayBatchInfo.appAccounts) * int(s.relayRatePerApp) // Determine the interval between each relay request. - relayInterval := time.Second / time.Duration(40) + relayInterval := time.Second / time.Duration(25) batchWaitGroup := new(sync.WaitGroup) batchWaitGroup.Add(relaysPerSec * int(blockDurationSec)) now := time.Now() - for i := 0; i < 24000; i++ { + for i := 0; i < 10000; i++ { iterationTime := now.Add(time.Duration(i+1) * relayInterval) batchLimiter.Go(s.ctx, func() { diff --git a/x/proof/keeper/validate_proofs.go b/x/proof/keeper/validate_proofs.go index c2374d8df..0e085aafe 100644 --- a/x/proof/keeper/validate_proofs.go +++ b/x/proof/keeper/validate_proofs.go @@ -50,7 +50,16 @@ func (k Keeper) ValidateSubmittedProofs(ctx sdk.Context) (numValidProofs, numInv // Iterate over proofs using an iterator to prevent OOM issues caused by bulk fetching. proofIterator := k.GetAllProofsIterator(ctx) - proofValidationCoordinator := newProofValidationTaskCoordinator(numCPU) + proofValidationCoordinator := &proofValidationTaskCoordinator{ + // Parallelize proof validation across CPU cores since they are independent from one another. + // Use semaphores to limit concurrent goroutines and prevent memory issues. + sem: make(chan struct{}, numCPU), + // Use a wait group to wait for all goroutines to finish before returning.
+ wg: &sync.WaitGroup{}, + + processedProofs: make(map[string][]string), + coordinatorMu: &sync.Mutex{}, + } for ; proofIterator.Valid(); proofIterator.Next() { proofBz := proofIterator.Value() @@ -94,17 +103,17 @@ func (k Keeper) ValidateSubmittedProofs(ctx sdk.Context) (numValidProofs, numInv func (k Keeper) validateProof( ctx context.Context, proofBz []byte, - proofValidationCoordinator *proofValidationTaskCoordinator, + coordinator *proofValidationTaskCoordinator, ) { sdkCtx := sdk.UnwrapSDKContext(ctx) logger := k.Logger().With("method", "validateProof") // Decrement the wait group when the goroutine finishes. - defer proofValidationCoordinator.wg.Done() + defer coordinator.wg.Done() // Release the semaphore after the goroutine finishes which unblocks another one. - defer func() { <-proofValidationCoordinator.sem }() + defer func() { <-coordinator.sem }() var proof types.Proof // proofBz is not expected to fail unmarshalling since it is should have @@ -169,8 +178,8 @@ func (k Keeper) validateProof( } // Protect the subsequent operations from concurrent access. - proofValidationCoordinator.coordinatorMu.Lock() - defer proofValidationCoordinator.coordinatorMu.Unlock() + coordinator.coordinatorMu.Lock() + defer coordinator.coordinatorMu.Unlock() // Update the claim to reflect the validation result of the associated proof. // @@ -180,31 +189,18 @@ func (k Keeper) validateProof( claim.ProofValidationStatus = proofStatus k.UpsertClaim(ctx, claim) - // Update the counters - if proofStatus == types.ClaimProofStatus_INVALID { - proofValidationCoordinator.numInvalidProofs++ - } else { - proofValidationCoordinator.numValidProofs++ - } - - // Update processed proofs - proofValidationCoordinator.processedProofs[supplierOperatorAddr] = append( - proofValidationCoordinator.processedProofs[supplierOperatorAddr], + // Collect the processed proofs info to delete them after the proofIterator is closed + // to prevent iterator invalidation.
+ coordinator.processedProofs[supplierOperatorAddr] = append( + coordinator.processedProofs[supplierOperatorAddr], sessionHeader.GetSessionId(), ) - proofValidationCoordinator.coordinatorMu.Unlock() -} - -// newProofValidationTaskCoordinator creates a new proofValidationTaskCoordinator -func newProofValidationTaskCoordinator(numWorkers int) *proofValidationTaskCoordinator { - return &proofValidationTaskCoordinator{ - // Parallelize proof validation across CPU cores since they are independent from one another. - // Use semaphores to limit concurrent goroutines and prevent memory issues. - sem: make(chan struct{}, numCPU), - // Use a wait group to wait for all goroutines to finish before returning. - wg: &sync.WaitGroup{}, - processedProofs: make(map[string][]string), - coordinatorMu: &sync.Mutex{}, + if proofStatus == types.ClaimProofStatus_INVALID { + // Increment the number of invalid proofs. + coordinator.numInvalidProofs++ + } else { + // Increment the number of valid proofs. + coordinator.numValidProofs++ } }