Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix writing on channel when a block is requested in order to attest the current one #6794

Merged
merged 10 commits into from
Feb 12, 2025
101 changes: 69 additions & 32 deletions process/block/baseProcess.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"time"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also optimize func (mp *metaProcessor) checkProofsForShardData(header *block.MetaBlock) so that it requests the needed blocks and verifies the shard data in parallel.

"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/core/atomic"
"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/multiversx/mx-chain-core-go/data"
"github.com/multiversx/mx-chain-core-go/data/block"
Expand Down Expand Up @@ -125,9 +124,10 @@ type baseProcessor struct {
nonceOfFirstCommittedBlock core.OptionalUint64
extraDelayRequestBlockInfo time.Duration

proofsPool dataRetriever.ProofsPool
chanNextHeader chan bool
isWaitingForNextHeader atomic.Flag
proofsPool dataRetriever.ProofsPool
mutRequestedAttestingNoncesMap sync.RWMutex
requestedAttestingNoncesMap map[string]uint64
allProofsReceived chan bool
}

type bootStorerDataArgs struct {
Expand Down Expand Up @@ -2342,58 +2342,95 @@ func (bp *baseProcessor) getHeaderHash(header data.HeaderHandler) ([]byte, error
return bp.hasher.Compute(string(marshalledHeader)), nil
}

func (bp *baseProcessor) checkProofRequestingNextHeaderBlockingIfMissing(
func (bp *baseProcessor) checkProofRequestingNextHeaderIfMissing(
headerShard uint32,
headerHash []byte,
headerNonce uint64,
) error {
) {
if bp.proofsPool.HasProof(headerShard, headerHash) {
return nil
return
}

log.Debug("could not find proof for header, requesting the next one",
"current hash", hex.EncodeToString(headerHash),
"header shard", headerShard)
err := bp.requestNextHeaderBlocking(headerNonce+1, headerShard)
if err != nil {
return err
}

if bp.proofsPool.HasProof(headerShard, headerHash) {
return nil
}

return fmt.Errorf("%w for header hash %s", process.ErrMissingHeaderProof, hex.EncodeToString(headerHash))
bp.requestNextHeader(headerHash, headerNonce+1, headerShard)
}

func (bp *baseProcessor) requestNextHeaderBlocking(nonce uint64, shardID uint32) error {
headersPool := bp.dataPool.Headers()

_ = core.EmptyChannel(bp.chanNextHeader)
bp.isWaitingForNextHeader.SetValue(true)
// requestNextHeader stores the given nonce in requestedAttestingNoncesMap, keyed by
// currentHeaderHash, while holding mutRequestedAttestingNoncesMap, and then requests the
// header with that nonce: a meta header when shardID equals core.MetachainShardId, a shard
// header for shardID otherwise. The function does not wait for the requested header.
func (bp *baseProcessor) requestNextHeader(currentHeaderHash []byte, nonce uint64, shardID uint32) {
bp.mutRequestedAttestingNoncesMap.Lock()
bp.requestedAttestingNoncesMap[string(currentHeaderHash)] = nonce
bp.mutRequestedAttestingNoncesMap.Unlock()

// the request is started on its own goroutine, so this call returns without waiting for it
if shardID == core.MetachainShardId {
go bp.requestHandler.RequestMetaHeaderByNonce(nonce)
} else {
go bp.requestHandler.RequestShardHeaderByNonce(shardID, nonce)
}
}

err := bp.waitForNextHeader()
if err != nil {
return err
func (bp *baseProcessor) waitAllMissingProofs() error {
bp.mutRequestedAttestingNoncesMap.RLock()
isWaitingForProofs := len(bp.requestedAttestingNoncesMap) > 0
bp.mutRequestedAttestingNoncesMap.RUnlock()
if !isWaitingForProofs {
return nil
}

_, _, err = process.GetShardHeaderFromPoolWithNonce(nonce, shardID, headersPool)

return err
}

func (bp *baseProcessor) waitForNextHeader() error {
select {
case <-bp.chanNextHeader:
bp.isWaitingForNextHeader.Reset()
case <-bp.allProofsReceived:
return nil
case <-time.After(bp.extraDelayRequestBlockInfo):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check what happens when a synced node is processing in consensus,
and whether this timeout is set properly for that case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe use the passed haveTime callback instead

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

bp.mutRequestedAttestingNoncesMap.RLock()
defer bp.mutRequestedAttestingNoncesMap.RUnlock()

logMessage := ""
for hash := range bp.requestedAttestingNoncesMap {
logMessage += fmt.Sprintf("\nhash = %s", hex.EncodeToString([]byte(hash)))
}

log.Debug("baseProcessor.waitAllMissingProofs still pending proofs for headers" + logMessage)

return process.ErrTimeIsOut
}
}

// checkReceivedHeaderAndUpdateMissingAttesting inspects a received header: if its previous hash
// is a key of requestedAttestingNoncesMap and the proofs pool has a proof for that previous hash
// on the received header's shard, the entry is deleted from the map. When the deletion leaves the
// map empty, a value is sent on allProofsReceived.
func (bp *baseProcessor) checkReceivedHeaderAndUpdateMissingAttesting(headerHandler data.HeaderHandler) {
// headers for which common.ShouldBlockHavePrevProof returns false are ignored
if !common.ShouldBlockHavePrevProof(headerHandler, bp.enableEpochsHandler, common.EquivalentMessagesFlag) {
return
}

// quick check under the read lock: nothing to update when the map has no entries
bp.mutRequestedAttestingNoncesMap.RLock()
isWaitingForProofs := len(bp.requestedAttestingNoncesMap) > 0
bp.mutRequestedAttestingNoncesMap.RUnlock()
if !isWaitingForProofs {
return
}

// the write lock taken here is released manually on every exit path below (no defer)
bp.mutRequestedAttestingNoncesMap.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe extract the locked section into a method, then you can do a defer and cover future exit cases


receivedShard := headerHandler.GetShardID()
prevHash := headerHandler.GetPrevHash()
_, isHeaderWithoutProof := bp.requestedAttestingNoncesMap[string(prevHash)]
if !isHeaderWithoutProof {
log.Debug("received header does not have previous hash any of the requested ones")
bp.mutRequestedAttestingNoncesMap.Unlock()
return
}

// the entry is kept in the map while the proofs pool still has no proof for prevHash
if !bp.proofsPool.HasProof(receivedShard, prevHash) {
log.Debug("received next header but proof is still missing", "hash", hex.EncodeToString(prevHash))
bp.mutRequestedAttestingNoncesMap.Unlock()
return
}

delete(bp.requestedAttestingNoncesMap, string(prevHash))

// the emptiness of the map is captured before unlocking; the send below happens without the lock
allProofsReceived := len(bp.requestedAttestingNoncesMap) == 0
bp.mutRequestedAttestingNoncesMap.Unlock()

// NOTE(review): this is a plain blocking send. If allProofsReceived is unbuffered and no goroutine
// is receiving (e.g. waitAllMissingProofs already returned on its timeout branch), this call would
// block here - confirm the channel capacity where it is created.
if allProofsReceived {
bp.allProofsReceived <- true
}
}
18 changes: 8 additions & 10 deletions process/block/metablock.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,11 @@ func (mp *metaProcessor) checkProofsForShardData(header *block.MetaBlock) error
return nil
}

mp.mutRequestedAttestingNoncesMap.Lock()
mp.requestedAttestingNoncesMap = make(map[string]uint64)
_ = core.EmptyChannel(mp.allProofsReceived)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move the core.EmptyChannel call outside of the locked section.

mp.mutRequestedAttestingNoncesMap.Unlock()

for _, shardData := range header.ShardInfo {
// TODO: consider the validation of the proof:
// compare the one from proofsPool with what shardData.CurrentSignature and shardData.CurrentPubKeysBitmap hold
Expand All @@ -437,10 +442,7 @@ func (mp *metaProcessor) checkProofsForShardData(header *block.MetaBlock) error
continue
}

err := mp.checkProofRequestingNextHeaderBlockingIfMissing(shardData.ShardID, shardData.HeaderHash, shardData.Nonce)
if err != nil {
return err
}
mp.checkProofRequestingNextHeaderIfMissing(shardData.ShardID, shardData.HeaderHash, shardData.Nonce)

if !common.ShouldBlockHavePrevProof(shardHeader.hdr, mp.enableEpochsHandler, common.EquivalentMessagesFlag) {
continue
Expand All @@ -464,7 +466,7 @@ func (mp *metaProcessor) checkProofsForShardData(header *block.MetaBlock) error
}
}

return nil
return mp.waitAllMissingProofs()
}

func (mp *metaProcessor) processEpochStartMetaBlock(
Expand Down Expand Up @@ -2073,11 +2075,7 @@ func (mp *metaProcessor) receivedShardHeader(headerHandler data.HeaderHandler, s
"hash", shardHeaderHash,
)

if mp.isWaitingForNextHeader.IsSet() {
log.Trace("received shard header attesting the previous one")
mp.chanNextHeader <- true
return
}
mp.checkReceivedHeaderAndUpdateMissingAttesting(headerHandler)

mp.hdrsForCurrBlock.mutHdrsForBlock.Lock()

Expand Down
21 changes: 12 additions & 9 deletions process/block/shardblock.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,11 @@ func (sp *shardProcessor) ProcessBlock(
return process.ErrAccountStateDirty
}

sp.mutRequestedAttestingNoncesMap.Lock()
sp.requestedAttestingNoncesMap = make(map[string]uint64)
_ = core.EmptyChannel(sp.allProofsReceived)
sp.mutRequestedAttestingNoncesMap.Unlock()

// check proofs for cross notarized metablocks
for _, metaBlockHash := range header.GetMetaBlockHashes() {
hInfo, ok := sp.hdrsForCurrBlock.hdrHashAndInfo[string(metaBlockHash)]
Expand All @@ -304,10 +309,12 @@ func (sp *shardProcessor) ProcessBlock(
continue
}

err = sp.checkProofRequestingNextHeaderBlockingIfMissing(core.MetachainShardId, metaBlockHash, hInfo.hdr.GetNonce())
if err != nil {
return err
}
sp.checkProofRequestingNextHeaderIfMissing(core.MetachainShardId, metaBlockHash, hInfo.hdr.GetNonce())
}

err = sp.waitAllMissingProofs()
if err != nil {
return err
}

defer func() {
Expand Down Expand Up @@ -1756,11 +1763,7 @@ func (sp *shardProcessor) receivedMetaBlock(headerHandler data.HeaderHandler, me
"hash", metaBlockHash,
)

if sp.isWaitingForNextHeader.IsSet() {
log.Trace("received meta header attesting the previous one")
sp.chanNextHeader <- true
return
}
sp.checkReceivedHeaderAndUpdateMissingAttesting(headerHandler)

sp.hdrsForCurrBlock.mutHdrsForBlock.Lock()

Expand Down