Skip to content

Commit

Permalink
Merge branch 'fix_time_out_error_on_sync' of https://github.com/multi…
Browse files Browse the repository at this point in the history
…versx/mx-chain-go into test_feat/equivalent-messages
  • Loading branch information
sstanculeanu committed Feb 11, 2025
2 parents 763f668 + bf92b84 commit 5c85e14
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 42 deletions.
102 changes: 72 additions & 30 deletions process/block/baseProcess.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,10 @@ type baseProcessor struct {
nonceOfFirstCommittedBlock core.OptionalUint64
extraDelayRequestBlockInfo time.Duration

proofsPool dataRetriever.ProofsPool
chanNextHeader chan bool
proofsPool dataRetriever.ProofsPool
mutRequestedAttestingNoncesMap sync.RWMutex
requestedAttestingNoncesMap map[string]uint64
allProofsReceived chan bool
}

type bootStorerDataArgs struct {
Expand Down Expand Up @@ -2340,56 +2342,96 @@ func (bp *baseProcessor) getHeaderHash(header data.HeaderHandler) ([]byte, error
return bp.hasher.Compute(string(marshalledHeader)), nil
}

func (bp *baseProcessor) checkProofRequestingNextHeaderBlockingIfMissing(
func (bp *baseProcessor) checkProofRequestingNextHeaderIfMissing(
headerShard uint32,
headerHash []byte,
headerNonce uint64,
) error {
) {
if bp.proofsPool.HasProof(headerShard, headerHash) {
return nil
return
}

log.Trace("could not find proof for header, requesting the next one",
log.Debug("could not find proof for header, requesting the next one",
"current hash", hex.EncodeToString(headerHash),
"header shard", headerShard)
err := bp.requestNextHeaderBlocking(headerNonce+1, headerShard)
if err != nil {
return err
}

if bp.proofsPool.HasProof(headerShard, headerHash) {
return nil
}

return fmt.Errorf("%w for header hash %s", process.ErrMissingHeaderProof, hex.EncodeToString(headerHash))
bp.requestNextHeader(headerHash, headerNonce+1, headerShard)
}

func (bp *baseProcessor) requestNextHeaderBlocking(nonce uint64, shardID uint32) error {
headersPool := bp.dataPool.Headers()

_ = core.EmptyChannel(bp.chanNextHeader)
func (bp *baseProcessor) requestNextHeader(currentHeaderHash []byte, nonce uint64, shardID uint32) {
bp.mutRequestedAttestingNoncesMap.Lock()
bp.requestedAttestingNoncesMap[string(currentHeaderHash)] = nonce
bp.mutRequestedAttestingNoncesMap.Unlock()

if shardID == core.MetachainShardId {
go bp.requestHandler.RequestMetaHeaderByNonce(nonce)
} else {
go bp.requestHandler.RequestShardHeaderByNonce(shardID, nonce)
}
}

err := bp.waitForNextHeader()
if err != nil {
return err
func (bp *baseProcessor) waitAllMissingProofs(waitTime time.Duration) error {
bp.mutRequestedAttestingNoncesMap.RLock()
isWaitingForProofs := len(bp.requestedAttestingNoncesMap) > 0
bp.mutRequestedAttestingNoncesMap.RUnlock()
if !isWaitingForProofs {
return nil
}

_, _, err = process.GetShardHeaderFromPoolWithNonce(nonce, shardID, headersPool)

return err
}

func (bp *baseProcessor) waitForNextHeader() error {
select {
case <-bp.chanNextHeader:
case <-bp.allProofsReceived:
return nil
case <-time.After(bp.extraDelayRequestBlockInfo):
case <-time.After(waitTime):
bp.mutRequestedAttestingNoncesMap.RLock()
defer bp.mutRequestedAttestingNoncesMap.RUnlock()

logMessage := ""
for hash := range bp.requestedAttestingNoncesMap {
logMessage += fmt.Sprintf("\nhash = %s", hex.EncodeToString([]byte(hash)))
}

log.Debug("baseProcessor.waitAllMissingProofs still pending proofs for headers" + logMessage)

return process.ErrTimeIsOut
}
}

func (bp *baseProcessor) checkReceivedHeaderIfAttestingIsNeeded(headerHandler data.HeaderHandler) {
if !common.ShouldBlockHavePrevProof(headerHandler, bp.enableEpochsHandler, common.EquivalentMessagesFlag) {
return
}

bp.mutRequestedAttestingNoncesMap.RLock()
isWaitingForProofs := len(bp.requestedAttestingNoncesMap) > 0
bp.mutRequestedAttestingNoncesMap.RUnlock()
if !isWaitingForProofs {
return
}

allProofsReceived := bp.checkReceivedHeaderAndUpdateMissingAttesting(headerHandler)
if allProofsReceived {
bp.allProofsReceived <- true
}
}

func (bp *baseProcessor) checkReceivedHeaderAndUpdateMissingAttesting(headerHandler data.HeaderHandler) bool {
bp.mutRequestedAttestingNoncesMap.Lock()
defer bp.mutRequestedAttestingNoncesMap.Unlock()

receivedShard := headerHandler.GetShardID()
prevHash := headerHandler.GetPrevHash()
_, isHeaderWithoutProof := bp.requestedAttestingNoncesMap[string(prevHash)]
if !isHeaderWithoutProof {
log.Debug("received header does not have previous hash any of the requested ones")
return len(bp.requestedAttestingNoncesMap) == 0
}

if !bp.proofsPool.HasProof(receivedShard, prevHash) {
log.Debug("received next header but proof is still missing", "hash", hex.EncodeToString(prevHash))
return len(bp.requestedAttestingNoncesMap) == 0
}

delete(bp.requestedAttestingNoncesMap, string(prevHash))

return len(bp.requestedAttestingNoncesMap) == 0
}
18 changes: 11 additions & 7 deletions process/block/metablock.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ func (mp *metaProcessor) ProcessBlock(
}
}

err = mp.checkProofsForShardData(header)
err = mp.checkProofsForShardData(header, haveTime())
if err != nil {
return err
}
Expand Down Expand Up @@ -418,11 +418,16 @@ func (mp *metaProcessor) ProcessBlock(
return nil
}

func (mp *metaProcessor) checkProofsForShardData(header *block.MetaBlock) error {
func (mp *metaProcessor) checkProofsForShardData(header *block.MetaBlock, waitTime time.Duration) error {
if !(mp.enableEpochsHandler.IsFlagEnabledInEpoch(common.EquivalentMessagesFlag, header.Epoch) && header.GetNonce() > 1) {
return nil
}

mp.mutRequestedAttestingNoncesMap.Lock()
mp.requestedAttestingNoncesMap = make(map[string]uint64)
mp.mutRequestedAttestingNoncesMap.Unlock()
_ = core.EmptyChannel(mp.allProofsReceived)

for _, shardData := range header.ShardInfo {
// TODO: consider the validation of the proof:
// compare the one from proofsPool with what shardData.CurrentSignature and shardData.CurrentPubKeysBitmap hold
Expand All @@ -437,10 +442,7 @@ func (mp *metaProcessor) checkProofsForShardData(header *block.MetaBlock) error
continue
}

err := mp.checkProofRequestingNextHeaderBlockingIfMissing(shardData.ShardID, shardData.HeaderHash, shardData.Nonce)
if err != nil {
return err
}
mp.checkProofRequestingNextHeaderIfMissing(shardData.ShardID, shardData.HeaderHash, shardData.Nonce)

if !common.ShouldBlockHavePrevProof(shardHeader.hdr, mp.enableEpochsHandler, common.EquivalentMessagesFlag) {
continue
Expand All @@ -464,7 +466,7 @@ func (mp *metaProcessor) checkProofsForShardData(header *block.MetaBlock) error
}
}

return nil
return mp.waitAllMissingProofs(waitTime)
}

func (mp *metaProcessor) processEpochStartMetaBlock(
Expand Down Expand Up @@ -2073,6 +2075,8 @@ func (mp *metaProcessor) receivedShardHeader(headerHandler data.HeaderHandler, s
"hash", shardHeaderHash,
)

mp.checkReceivedHeaderIfAttestingIsNeeded(headerHandler)

mp.hdrsForCurrBlock.mutHdrsForBlock.Lock()

haveMissingShardHeaders := mp.hdrsForCurrBlock.missingHdrs > 0 || mp.hdrsForCurrBlock.missingFinalityAttestingHdrs > 0
Expand Down
24 changes: 19 additions & 5 deletions process/block/shardblock.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,11 @@ func (sp *shardProcessor) ProcessBlock(
return process.ErrAccountStateDirty
}

sp.mutRequestedAttestingNoncesMap.Lock()
sp.requestedAttestingNoncesMap = make(map[string]uint64)
sp.mutRequestedAttestingNoncesMap.Unlock()
_ = core.EmptyChannel(sp.allProofsReceived)

// check proofs for cross notarized metablocks
for _, metaBlockHash := range header.GetMetaBlockHashes() {
hInfo, ok := sp.hdrsForCurrBlock.hdrHashAndInfo[string(metaBlockHash)]
Expand All @@ -304,10 +309,12 @@ func (sp *shardProcessor) ProcessBlock(
continue
}

err = sp.checkProofRequestingNextHeaderBlockingIfMissing(core.MetachainShardId, metaBlockHash, hInfo.hdr.GetNonce())
if err != nil {
return err
}
sp.checkProofRequestingNextHeaderIfMissing(core.MetachainShardId, metaBlockHash, hInfo.hdr.GetNonce())
}

err = sp.waitAllMissingProofs(haveTime())
if err != nil {
return err
}

defer func() {
Expand Down Expand Up @@ -487,10 +494,15 @@ func (sp *shardProcessor) checkEpochCorrectness(
process.ErrEpochDoesNotMatch, header.GetEpoch(), sp.epochStartTrigger.MetaEpoch())
}

epochChangeConfirmed := sp.epochStartTrigger.EpochStartRound() < sp.epochStartTrigger.EpochFinalityAttestingRound()
if sp.enableEpochsHandler.IsFlagEnabledInEpoch(common.EquivalentMessagesFlag, header.GetEpoch()) {
epochChangeConfirmed = sp.epochStartTrigger.EpochStartRound() <= sp.epochStartTrigger.EpochFinalityAttestingRound()
}

isOldEpochAndShouldBeNew := sp.epochStartTrigger.IsEpochStart() &&
header.GetRound() > sp.epochStartTrigger.EpochFinalityAttestingRound()+process.EpochChangeGracePeriod &&
header.GetEpoch() < sp.epochStartTrigger.MetaEpoch() &&
sp.epochStartTrigger.EpochStartRound() < sp.epochStartTrigger.EpochFinalityAttestingRound()
epochChangeConfirmed
if isOldEpochAndShouldBeNew {
return fmt.Errorf("%w proposed header with epoch %d should be in epoch %d",
process.ErrEpochDoesNotMatch, header.GetEpoch(), sp.epochStartTrigger.MetaEpoch())
Expand Down Expand Up @@ -1751,6 +1763,8 @@ func (sp *shardProcessor) receivedMetaBlock(headerHandler data.HeaderHandler, me
"hash", metaBlockHash,
)

sp.checkReceivedHeaderIfAttestingIsNeeded(headerHandler)

sp.hdrsForCurrBlock.mutHdrsForBlock.Lock()

haveMissingMetaHeaders := sp.hdrsForCurrBlock.missingHdrs > 0 || sp.hdrsForCurrBlock.missingFinalityAttestingHdrs > 0
Expand Down
1 change: 1 addition & 0 deletions process/sync/storageBootstrap/baseStorageBootstrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,7 @@ func (st *storageBootstrapper) applyBlock(headerHash []byte, header data.HeaderH
st.forkDetector.AddCheckpoint(header.GetNonce(), header.GetRound(), headerHash)
if header.GetShardID() == core.MetachainShardId || !check.IfNilReflect(header.GetPreviousProof()) {
st.forkDetector.SetFinalToLastCheckpoint()
st.forkDetector.ResetProbableHighestNonce()
}
return nil
}
Expand Down

0 comments on commit 5c85e14

Please sign in to comment.