Skip to content

Commit

Permalink
fixes after review
Browse files Browse the repository at this point in the history
  • Loading branch information
sstanculeanu committed Feb 11, 2025
1 parent 2a0a6a1 commit 1139d85
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 51 deletions.
90 changes: 58 additions & 32 deletions process/block/baseProcess.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"time"

"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/core/atomic"
"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/multiversx/mx-chain-core-go/data"
"github.com/multiversx/mx-chain-core-go/data/block"
Expand Down Expand Up @@ -125,9 +124,10 @@ type baseProcessor struct {
nonceOfFirstCommittedBlock core.OptionalUint64
extraDelayRequestBlockInfo time.Duration

proofsPool dataRetriever.ProofsPool
chanNextHeader chan bool
isWaitingForNextHeader atomic.Flag
proofsPool dataRetriever.ProofsPool
mutRequestedAttestingNoncesMap sync.RWMutex
requestedAttestingNoncesMap map[string]uint64
allProofsReceived chan bool
}

type bootStorerDataArgs struct {
Expand Down Expand Up @@ -2342,58 +2342,84 @@ func (bp *baseProcessor) getHeaderHash(header data.HeaderHandler) ([]byte, error
return bp.hasher.Compute(string(marshalledHeader)), nil
}

func (bp *baseProcessor) checkProofRequestingNextHeaderBlockingIfMissing(
func (bp *baseProcessor) checkProofRequestingNextHeaderIfMissing(
headerShard uint32,
headerHash []byte,
headerNonce uint64,
) error {
) {
if bp.proofsPool.HasProof(headerShard, headerHash) {
return nil
return
}

log.Debug("could not find proof for header, requesting the next one",
"current hash", hex.EncodeToString(headerHash),
"header shard", headerShard)
err := bp.requestNextHeaderBlocking(headerNonce+1, headerShard)
if err != nil {
return err
}

if bp.proofsPool.HasProof(headerShard, headerHash) {
return nil
}

return fmt.Errorf("%w for header hash %s", process.ErrMissingHeaderProof, hex.EncodeToString(headerHash))
bp.requestNextHeader(headerHash, headerNonce+1, headerShard)
}

func (bp *baseProcessor) requestNextHeaderBlocking(nonce uint64, shardID uint32) error {
headersPool := bp.dataPool.Headers()

_ = core.EmptyChannel(bp.chanNextHeader)
bp.isWaitingForNextHeader.SetValue(true)
func (bp *baseProcessor) requestNextHeader(currentHeaderHash []byte, nonce uint64, shardID uint32) {
bp.mutRequestedAttestingNoncesMap.Lock()
bp.requestedAttestingNoncesMap[string(currentHeaderHash)] = nonce
bp.mutRequestedAttestingNoncesMap.Unlock()

if shardID == core.MetachainShardId {
go bp.requestHandler.RequestMetaHeaderByNonce(nonce)
} else {
go bp.requestHandler.RequestShardHeaderByNonce(shardID, nonce)
}
}

err := bp.waitForNextHeader()
if err != nil {
return err
func (bp *baseProcessor) waitAllMissingProofs() error {
bp.mutRequestedAttestingNoncesMap.RLock()
isWaitingForProofs := len(bp.requestedAttestingNoncesMap) > 0
bp.mutRequestedAttestingNoncesMap.RUnlock()
if !isWaitingForProofs {
return nil
}

_, _, err = process.GetShardHeaderFromPoolWithNonce(nonce, shardID, headersPool)

return err
}

func (bp *baseProcessor) waitForNextHeader() error {
select {
case <-bp.chanNextHeader:
bp.isWaitingForNextHeader.Reset()
case <-bp.allProofsReceived:
return nil
case <-time.After(bp.extraDelayRequestBlockInfo):
bp.mutRequestedAttestingNoncesMap.RLock()
defer bp.mutRequestedAttestingNoncesMap.RUnlock()

logMessage := ""
for hash := range bp.requestedAttestingNoncesMap {
logMessage += fmt.Sprintf("\nhash = %s", hex.EncodeToString([]byte(hash)))
}

log.Debug("baseProcessor.waitAllMissingProofs still pending proofs for headers" + logMessage)

return process.ErrTimeIsOut
}
}

func (bp *baseProcessor) checkReceivedHeaderAndUpdateMissingAttesting(headerHandler data.HeaderHandler) {
if !common.ShouldBlockHavePrevProof(headerHandler, bp.enableEpochsHandler, common.EquivalentMessagesFlag) {
return
}

bp.mutRequestedAttestingNoncesMap.Lock()
defer bp.mutRequestedAttestingNoncesMap.Unlock()

receivedShard := headerHandler.GetShardID()
prevHash := headerHandler.GetPrevHash()
_, isHeaderWithoutProof := bp.requestedAttestingNoncesMap[string(prevHash)]
if !isHeaderWithoutProof {
log.Debug("received header does not have previous hash any of the requested ones")
return
}

if !bp.proofsPool.HasProof(receivedShard, prevHash) {
log.Debug("received next header but proof is still missing", "hash", hex.EncodeToString(prevHash))
return
}

delete(bp.requestedAttestingNoncesMap, string(prevHash))

if len(bp.requestedAttestingNoncesMap) == 0 {
bp.allProofsReceived <- true
}
}
17 changes: 7 additions & 10 deletions process/block/metablock.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,10 @@ func (mp *metaProcessor) checkProofsForShardData(header *block.MetaBlock) error
return nil
}

mp.mutRequestedAttestingNoncesMap.Lock()
mp.requestedAttestingNoncesMap = make(map[string]uint64)
mp.mutRequestedAttestingNoncesMap.Unlock()

for _, shardData := range header.ShardInfo {
// TODO: consider the validation of the proof:
// compare the one from proofsPool with what shardData.CurrentSignature and shardData.CurrentPubKeysBitmap hold
Expand All @@ -437,10 +441,7 @@ func (mp *metaProcessor) checkProofsForShardData(header *block.MetaBlock) error
continue
}

err := mp.checkProofRequestingNextHeaderBlockingIfMissing(shardData.ShardID, shardData.HeaderHash, shardData.Nonce)
if err != nil {
return err
}
go mp.checkProofRequestingNextHeaderIfMissing(shardData.ShardID, shardData.HeaderHash, shardData.Nonce)

if !common.ShouldBlockHavePrevProof(shardHeader.hdr, mp.enableEpochsHandler, common.EquivalentMessagesFlag) {
continue
Expand All @@ -464,7 +465,7 @@ func (mp *metaProcessor) checkProofsForShardData(header *block.MetaBlock) error
}
}

return nil
return mp.waitAllMissingProofs()
}

func (mp *metaProcessor) processEpochStartMetaBlock(
Expand Down Expand Up @@ -2073,11 +2074,7 @@ func (mp *metaProcessor) receivedShardHeader(headerHandler data.HeaderHandler, s
"hash", shardHeaderHash,
)

if mp.isWaitingForNextHeader.IsSet() {
log.Trace("received shard header attesting the previous one")
mp.chanNextHeader <- true
return
}
mp.checkReceivedHeaderAndUpdateMissingAttesting(headerHandler)

mp.hdrsForCurrBlock.mutHdrsForBlock.Lock()

Expand Down
20 changes: 11 additions & 9 deletions process/block/shardblock.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,10 @@ func (sp *shardProcessor) ProcessBlock(
return process.ErrAccountStateDirty
}

sp.mutRequestedAttestingNoncesMap.Lock()
sp.requestedAttestingNoncesMap = make(map[string]uint64)
sp.mutRequestedAttestingNoncesMap.Unlock()

// check proofs for cross notarized metablocks
for _, metaBlockHash := range header.GetMetaBlockHashes() {
hInfo, ok := sp.hdrsForCurrBlock.hdrHashAndInfo[string(metaBlockHash)]
Expand All @@ -304,10 +308,12 @@ func (sp *shardProcessor) ProcessBlock(
continue
}

err = sp.checkProofRequestingNextHeaderBlockingIfMissing(core.MetachainShardId, metaBlockHash, hInfo.hdr.GetNonce())
if err != nil {
return err
}
go sp.checkProofRequestingNextHeaderIfMissing(core.MetachainShardId, metaBlockHash, hInfo.hdr.GetNonce())
}

err = sp.waitAllMissingProofs()
if err != nil {
return err
}

defer func() {
Expand Down Expand Up @@ -1751,11 +1757,7 @@ func (sp *shardProcessor) receivedMetaBlock(headerHandler data.HeaderHandler, me
"hash", metaBlockHash,
)

if sp.isWaitingForNextHeader.IsSet() {
log.Trace("received meta header attesting the previous one")
sp.chanNextHeader <- true
return
}
sp.checkReceivedHeaderAndUpdateMissingAttesting(headerHandler)

sp.hdrsForCurrBlock.mutHdrsForBlock.Lock()

Expand Down

0 comments on commit 1139d85

Please sign in to comment.