Skip to content

Commit

Permalink
fix: remove isolated unlock
Browse files Browse the repository at this point in the history
  • Loading branch information
red-0ne committed Jan 29, 2025
1 parent bb52fa6 commit a66280c
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 31 deletions.
4 changes: 2 additions & 2 deletions load-testing/tests/relays_stress_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1556,14 +1556,14 @@ func (s *relaysSuite) forEachRelayBatchSendBatch(_ context.Context, relayBatchIn
// each sending relayRatePerApp relays per second.
relaysPerSec := len(relayBatchInfo.appAccounts) * int(s.relayRatePerApp)
// Determine the interval between each relay request.
relayInterval := time.Second / time.Duration(40)
relayInterval := time.Second / time.Duration(25)

batchWaitGroup := new(sync.WaitGroup)
batchWaitGroup.Add(relaysPerSec * int(blockDurationSec))

now := time.Now()

for i := 0; i < 24000; i++ {
for i := 0; i < 10000; i++ {
iterationTime := now.Add(time.Duration(i+1) * relayInterval)
batchLimiter.Go(s.ctx, func() {

Expand Down
54 changes: 25 additions & 29 deletions x/proof/keeper/validate_proofs.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,16 @@ func (k Keeper) ValidateSubmittedProofs(ctx sdk.Context) (numValidProofs, numInv
// Iterate over proofs using an iterator to prevent OOM issues caused by bulk fetching.
proofIterator := k.GetAllProofsIterator(ctx)

proofValidationCoordinator := newProofValidationTaskCoordinator(numCPU)
proofValidationCoordinator := &proofValidationTaskCoordinator{
// Parallelize proof validation across CPU cores since they are independent from one another.
// Use semaphores to limit concurrent goroutines and prevent memory issues.
sem: make(chan struct{}, numCPU),
// Use a wait group to wait for all goroutines to finish before returning.
wg: &sync.WaitGroup{},

processedProofs: make(map[string][]string),
coordinatorMu: &sync.Mutex{},
}

for ; proofIterator.Valid(); proofIterator.Next() {
proofBz := proofIterator.Value()
Expand Down Expand Up @@ -94,17 +103,17 @@ func (k Keeper) ValidateSubmittedProofs(ctx sdk.Context) (numValidProofs, numInv
func (k Keeper) validateProof(
ctx context.Context,
proofBz []byte,
proofValidationCoordinator *proofValidationTaskCoordinator,
coordinator *proofValidationTaskCoordinator,
) {
sdkCtx := sdk.UnwrapSDKContext(ctx)

logger := k.Logger().With("method", "validateProof")

// Decrement the wait group when the goroutine finishes.
defer proofValidationCoordinator.wg.Done()
defer coordinator.wg.Done()

// Release the semaphore after the goroutine finishes which unblocks another one.
defer func() { <-proofValidationCoordinator.sem }()
defer func() { <-coordinator.sem }()

var proof types.Proof
// proofBz is not expected to fail unmarshalling since it is should have
Expand Down Expand Up @@ -169,8 +178,8 @@ func (k Keeper) validateProof(
}

// Protect the subsequent operations from concurrent access.
proofValidationCoordinator.coordinatorMu.Lock()
defer proofValidationCoordinator.coordinatorMu.Unlock()
coordinator.coordinatorMu.Lock()
defer coordinator.coordinatorMu.Unlock()

// Update the claim to reflect the validation result of the associated proof.
//
Expand All @@ -180,31 +189,18 @@ func (k Keeper) validateProof(
claim.ProofValidationStatus = proofStatus
k.UpsertClaim(ctx, claim)

// Update the counters
if proofStatus == types.ClaimProofStatus_INVALID {
proofValidationCoordinator.numInvalidProofs++
} else {
proofValidationCoordinator.numValidProofs++
}

// Update processed proofs
proofValidationCoordinator.processedProofs[supplierOperatorAddr] = append(
proofValidationCoordinator.processedProofs[supplierOperatorAddr],
// Collect the processed proofs info to delete them after the proofIterator is closed
// to prevent iterator invalidation.
coordinator.processedProofs[supplierOperatorAddr] = append(
coordinator.processedProofs[supplierOperatorAddr],
sessionHeader.GetSessionId(),
)
proofValidationCoordinator.coordinatorMu.Unlock()
}

// newProofValidationTaskCoordinator creates a new proofValidationTaskCoordinator
func newProofValidationTaskCoordinator(numWorkers int) *proofValidationTaskCoordinator {
return &proofValidationTaskCoordinator{
// Parallelize proof validation across CPU cores since they are independent from one another.
// Use semaphores to limit concurrent goroutines and prevent memory issues.
sem: make(chan struct{}, numCPU),
// Use a wait group to wait for all goroutines to finish before returning.
wg: &sync.WaitGroup{},

processedProofs: make(map[string][]string),
coordinatorMu: &sync.Mutex{},
if proofStatus == types.ClaimProofStatus_INVALID {
// Increment the number of invalid proofs.
coordinator.numInvalidProofs++
} else {
// Increment the number of valid proofs.
coordinator.numValidProofs++
}
}

0 comments on commit a66280c

Please sign in to comment.