diff --git a/process/block/baseProcess.go b/process/block/baseProcess.go index 8d67ec7a10..657d4a5471 100644 --- a/process/block/baseProcess.go +++ b/process/block/baseProcess.go @@ -124,8 +124,10 @@ type baseProcessor struct { nonceOfFirstCommittedBlock core.OptionalUint64 extraDelayRequestBlockInfo time.Duration - proofsPool dataRetriever.ProofsPool - chanNextHeader chan bool + proofsPool dataRetriever.ProofsPool + mutRequestedAttestingNoncesMap sync.RWMutex + requestedAttestingNoncesMap map[string]uint64 + allProofsReceived chan bool } type bootStorerDataArgs struct { @@ -2340,56 +2342,96 @@ func (bp *baseProcessor) getHeaderHash(header data.HeaderHandler) ([]byte, error return bp.hasher.Compute(string(marshalledHeader)), nil } -func (bp *baseProcessor) checkProofRequestingNextHeaderBlockingIfMissing( +func (bp *baseProcessor) checkProofRequestingNextHeaderIfMissing( headerShard uint32, headerHash []byte, headerNonce uint64, -) error { +) { if bp.proofsPool.HasProof(headerShard, headerHash) { - return nil + return } - log.Trace("could not find proof for header, requesting the next one", + log.Debug("could not find proof for header, requesting the next one", "current hash", hex.EncodeToString(headerHash), "header shard", headerShard) - err := bp.requestNextHeaderBlocking(headerNonce+1, headerShard) - if err != nil { - return err - } - - if bp.proofsPool.HasProof(headerShard, headerHash) { - return nil - } - return fmt.Errorf("%w for header hash %s", process.ErrMissingHeaderProof, hex.EncodeToString(headerHash)) + bp.requestNextHeader(headerHash, headerNonce+1, headerShard) } -func (bp *baseProcessor) requestNextHeaderBlocking(nonce uint64, shardID uint32) error { - headersPool := bp.dataPool.Headers() - - _ = core.EmptyChannel(bp.chanNextHeader) +func (bp *baseProcessor) requestNextHeader(currentHeaderHash []byte, nonce uint64, shardID uint32) { + bp.mutRequestedAttestingNoncesMap.Lock() + bp.requestedAttestingNoncesMap[string(currentHeaderHash)] = nonce + bp.mutRequestedAttestingNoncesMap.Unlock() if shardID == core.MetachainShardId { go bp.requestHandler.RequestMetaHeaderByNonce(nonce) } else { go bp.requestHandler.RequestShardHeaderByNonce(shardID, nonce) } +} - err := bp.waitForNextHeader() - if err != nil { - return err +func (bp *baseProcessor) waitAllMissingProofs(waitTime time.Duration) error { + bp.mutRequestedAttestingNoncesMap.RLock() + isWaitingForProofs := len(bp.requestedAttestingNoncesMap) > 0 + bp.mutRequestedAttestingNoncesMap.RUnlock() + if !isWaitingForProofs { + return nil } - _, _, err = process.GetShardHeaderFromPoolWithNonce(nonce, shardID, headersPool) - - return err -} - -func (bp *baseProcessor) waitForNextHeader() error { select { - case <-bp.chanNextHeader: + case <-bp.allProofsReceived: return nil - case <-time.After(bp.extraDelayRequestBlockInfo): + case <-time.After(waitTime): + bp.mutRequestedAttestingNoncesMap.RLock() + defer bp.mutRequestedAttestingNoncesMap.RUnlock() + + logMessage := "" + for hash := range bp.requestedAttestingNoncesMap { + logMessage += fmt.Sprintf("\nhash = %s", hex.EncodeToString([]byte(hash))) + } + + log.Debug("baseProcessor.waitAllMissingProofs still pending proofs for headers" + logMessage) + return process.ErrTimeIsOut } } + +func (bp *baseProcessor) checkReceivedHeaderIfAttestingIsNeeded(headerHandler data.HeaderHandler) { + if !common.ShouldBlockHavePrevProof(headerHandler, bp.enableEpochsHandler, common.EquivalentMessagesFlag) { + return + } + + bp.mutRequestedAttestingNoncesMap.RLock() + isWaitingForProofs := len(bp.requestedAttestingNoncesMap) > 0 + bp.mutRequestedAttestingNoncesMap.RUnlock() + if !isWaitingForProofs { + return + } + + allProofsReceived := bp.checkReceivedHeaderAndUpdateMissingAttesting(headerHandler) + if allProofsReceived { + bp.allProofsReceived <- true + } +} + +func (bp *baseProcessor) checkReceivedHeaderAndUpdateMissingAttesting(headerHandler data.HeaderHandler) bool { + bp.mutRequestedAttestingNoncesMap.Lock() + defer bp.mutRequestedAttestingNoncesMap.Unlock() + + receivedShard := headerHandler.GetShardID() + prevHash := headerHandler.GetPrevHash() + _, isHeaderWithoutProof := bp.requestedAttestingNoncesMap[string(prevHash)] + if !isHeaderWithoutProof { + log.Debug("received header does not have previous hash any of the requested ones") + return len(bp.requestedAttestingNoncesMap) == 0 + } + + if !bp.proofsPool.HasProof(receivedShard, prevHash) { + log.Debug("received next header but proof is still missing", "hash", hex.EncodeToString(prevHash)) + return len(bp.requestedAttestingNoncesMap) == 0 + } + + delete(bp.requestedAttestingNoncesMap, string(prevHash)) + + return len(bp.requestedAttestingNoncesMap) == 0 +} diff --git a/process/block/metablock.go b/process/block/metablock.go index ca3ba86687..9f7c193fcf 100644 --- a/process/block/metablock.go +++ b/process/block/metablock.go @@ -343,7 +343,7 @@ func (mp *metaProcessor) ProcessBlock( } } - err = mp.checkProofsForShardData(header) + err = mp.checkProofsForShardData(header, haveTime()) if err != nil { return err } @@ -418,11 +418,16 @@ func (mp *metaProcessor) ProcessBlock( return nil } -func (mp *metaProcessor) checkProofsForShardData(header *block.MetaBlock) error { +func (mp *metaProcessor) checkProofsForShardData(header *block.MetaBlock, waitTime time.Duration) error { if !(mp.enableEpochsHandler.IsFlagEnabledInEpoch(common.EquivalentMessagesFlag, header.Epoch) && header.GetNonce() > 1) { return nil } + mp.mutRequestedAttestingNoncesMap.Lock() + mp.requestedAttestingNoncesMap = make(map[string]uint64) + mp.mutRequestedAttestingNoncesMap.Unlock() + _ = core.EmptyChannel(mp.allProofsReceived) + for _, shardData := range header.ShardInfo { // TODO: consider the validation of the proof: // compare the one from proofsPool with what shardData.CurrentSignature and shardData.CurrentPubKeysBitmap hold @@ -437,10 +442,7 @@ func (mp *metaProcessor) checkProofsForShardData(header *block.MetaBlock) error continue } - err := mp.checkProofRequestingNextHeaderBlockingIfMissing(shardData.ShardID, shardData.HeaderHash, shardData.Nonce) - if err != nil { - return err - } + mp.checkProofRequestingNextHeaderIfMissing(shardData.ShardID, shardData.HeaderHash, shardData.Nonce) if !common.ShouldBlockHavePrevProof(shardHeader.hdr, mp.enableEpochsHandler, common.EquivalentMessagesFlag) { continue @@ -464,7 +466,7 @@ func (mp *metaProcessor) checkProofsForShardData(header *block.MetaBlock) error } } - return nil + return mp.waitAllMissingProofs(waitTime) } func (mp *metaProcessor) processEpochStartMetaBlock( @@ -2073,6 +2075,8 @@ func (mp *metaProcessor) receivedShardHeader(headerHandler data.HeaderHandler, s "hash", shardHeaderHash, ) + mp.checkReceivedHeaderIfAttestingIsNeeded(headerHandler) + mp.hdrsForCurrBlock.mutHdrsForBlock.Lock() haveMissingShardHeaders := mp.hdrsForCurrBlock.missingHdrs > 0 || mp.hdrsForCurrBlock.missingFinalityAttestingHdrs > 0 diff --git a/process/block/shardblock.go b/process/block/shardblock.go index 83af49a03d..e88dc71828 100644 --- a/process/block/shardblock.go +++ b/process/block/shardblock.go @@ -293,6 +293,11 @@ func (sp *shardProcessor) ProcessBlock( return process.ErrAccountStateDirty } + sp.mutRequestedAttestingNoncesMap.Lock() + sp.requestedAttestingNoncesMap = make(map[string]uint64) + sp.mutRequestedAttestingNoncesMap.Unlock() + _ = core.EmptyChannel(sp.allProofsReceived) + // check proofs for cross notarized metablocks for _, metaBlockHash := range header.GetMetaBlockHashes() { hInfo, ok := sp.hdrsForCurrBlock.hdrHashAndInfo[string(metaBlockHash)] @@ -304,10 +309,12 @@ func (sp *shardProcessor) ProcessBlock( continue } - err = sp.checkProofRequestingNextHeaderBlockingIfMissing(core.MetachainShardId, metaBlockHash, hInfo.hdr.GetNonce()) - if err != nil { - return err - } + sp.checkProofRequestingNextHeaderIfMissing(core.MetachainShardId, metaBlockHash, hInfo.hdr.GetNonce()) + } + + err = sp.waitAllMissingProofs(haveTime()) + if err != nil { + return err } defer func() { @@ -487,10 +494,15 @@ func (sp *shardProcessor) checkEpochCorrectness( process.ErrEpochDoesNotMatch, header.GetEpoch(), sp.epochStartTrigger.MetaEpoch()) } + epochChangeConfirmed := sp.epochStartTrigger.EpochStartRound() < sp.epochStartTrigger.EpochFinalityAttestingRound() + if sp.enableEpochsHandler.IsFlagEnabledInEpoch(common.EquivalentMessagesFlag, header.GetEpoch()) { + epochChangeConfirmed = sp.epochStartTrigger.EpochStartRound() <= sp.epochStartTrigger.EpochFinalityAttestingRound() + } + isOldEpochAndShouldBeNew := sp.epochStartTrigger.IsEpochStart() && header.GetRound() > sp.epochStartTrigger.EpochFinalityAttestingRound()+process.EpochChangeGracePeriod && header.GetEpoch() < sp.epochStartTrigger.MetaEpoch() && - sp.epochStartTrigger.EpochStartRound() < sp.epochStartTrigger.EpochFinalityAttestingRound() + epochChangeConfirmed if isOldEpochAndShouldBeNew { return fmt.Errorf("%w proposed header with epoch %d should be in epoch %d", process.ErrEpochDoesNotMatch, header.GetEpoch(), sp.epochStartTrigger.MetaEpoch()) @@ -1751,6 +1763,8 @@ func (sp *shardProcessor) receivedMetaBlock(headerHandler data.HeaderHandler, me "hash", metaBlockHash, ) + sp.checkReceivedHeaderIfAttestingIsNeeded(headerHandler) + sp.hdrsForCurrBlock.mutHdrsForBlock.Lock() haveMissingMetaHeaders := sp.hdrsForCurrBlock.missingHdrs > 0 || sp.hdrsForCurrBlock.missingFinalityAttestingHdrs > 0 diff --git a/process/sync/storageBootstrap/baseStorageBootstrapper.go b/process/sync/storageBootstrap/baseStorageBootstrapper.go index bb013cb50a..e2a063007a 100644 --- a/process/sync/storageBootstrap/baseStorageBootstrapper.go +++ b/process/sync/storageBootstrap/baseStorageBootstrapper.go @@ -456,6 +456,7 @@ func (st *storageBootstrapper) applyBlock(headerHash []byte, header data.HeaderH st.forkDetector.AddCheckpoint(header.GetNonce(), header.GetRound(), headerHash) if header.GetShardID() == core.MetachainShardId || !check.IfNilReflect(header.GetPreviousProof()) { st.forkDetector.SetFinalToLastCheckpoint() + st.forkDetector.ResetProbableHighestNonce() } return nil }