Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DO NOT MERGE] Test branch for equivalent messages with activation @ epoch 4 #6751

Draft
wants to merge 28 commits into
base: feat/equivalent-messages
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
da100f8
equivalent messages activation @ epoch 4
sstanculeanu Jan 29, 2025
7804345
Merge branch 'feat/equivalent-messages' into test_feat/equivalent-mes…
sstanculeanu Jan 30, 2025
40beff7
Merge branch 'feat/equivalent-messages' into test_feat/equivalent-mes…
AdoAdoAdo Jan 31, 2025
48cb9e5
Merge branch 'feat/equivalent-messages' into test_feat/equivalent-mes…
sstanculeanu Jan 31, 2025
ada5aa6
Merge branch 'consensus_verifier_fix_verify_proof' of https://github.…
sstanculeanu Feb 3, 2025
86e2aed
Merge branch 'consensus_verifier_fix_verify_proof' of https://github.…
sstanculeanu Feb 3, 2025
229059d
Merge branch 'fix_missing_nodes_config_for_epoch' of https://github.c…
sstanculeanu Feb 4, 2025
4b24968
Merge branch 'bootstrap-fixes' of https://github.com/multiversx/mx-ch…
sstanculeanu Feb 6, 2025
d036f88
Merge branch 'bootstrap-fixes' of https://github.com/multiversx/mx-ch…
sstanculeanu Feb 7, 2025
2867207
Merge branch 'consensus-transition-speedup' into test_feat/equivalent…
AdoAdoAdo Feb 7, 2025
d4e89b7
update rounds per epoch
AdoAdoAdo Feb 7, 2025
aa29fdd
add logging for trigger activation
AdoAdoAdo Feb 10, 2025
b526ad9
Merge branch 'feat/equivalent-messages' into trigger-fixes
AdoAdoAdo Feb 10, 2025
651f03a
Merge branch 'trigger-fixes' into test_feat/equivalent-messages
AdoAdoAdo Feb 10, 2025
e0cdb85
fix check finality attesting headers with proofs
AdoAdoAdo Feb 10, 2025
dcac9c1
fix cyclic import
AdoAdoAdo Feb 10, 2025
763f668
Merge branch 'trigger-fixes' into test_feat/equivalent-messages
AdoAdoAdo Feb 10, 2025
2a0a6a1
fix writing on channel when a block is requested in order to attest t…
sstanculeanu Feb 11, 2025
1139d85
fixes after review
sstanculeanu Feb 11, 2025
32d381c
avoid unwanted logs
sstanculeanu Feb 11, 2025
fe0aace
cosmetic changes + empty chan
sstanculeanu Feb 11, 2025
ea0b8ef
no need for goroutines on checkProofRequestingNextHeaderIfMissing
sstanculeanu Feb 11, 2025
7bf2b24
do not write on chan under mut protection
sstanculeanu Feb 11, 2025
554398d
remove improper defer
sstanculeanu Feb 11, 2025
81f985c
- fix validation of grace period for transition to new epoch in shard.
AdoAdoAdo Feb 11, 2025
a69c70f
Merge branch 'trigger-fixes' of https://github.com/multiversx/mx-chai…
sstanculeanu Feb 11, 2025
bf92b84
fixes after review
sstanculeanu Feb 11, 2025
5c85e14
Merge branch 'fix_time_out_error_on_sync' of https://github.com/multi…
sstanculeanu Feb 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/node/config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
# ChainParametersByEpoch defines chain operation configurable values that can be modified based on epochs
ChainParametersByEpoch = [
{ EnableEpoch = 0, RoundDuration = 6000, ShardConsensusGroupSize = 7, ShardMinNumNodes = 10, MetachainConsensusGroupSize = 10, MetachainMinNumNodes = 10, Hysteresis = 0.2, Adaptivity = false },
{ EnableEpoch = 8, RoundDuration = 6000, ShardConsensusGroupSize = 10, ShardMinNumNodes = 10, MetachainConsensusGroupSize = 10, MetachainMinNumNodes = 10, Hysteresis = 0.2, Adaptivity = false }
{ EnableEpoch = 4, RoundDuration = 6000, ShardConsensusGroupSize = 10, ShardMinNumNodes = 10, MetachainConsensusGroupSize = 10, MetachainMinNumNodes = 10, Hysteresis = 0.2, Adaptivity = false }
]

[HardwareRequirements]
Expand Down Expand Up @@ -632,7 +632,7 @@
[EpochStartConfig]
GenesisEpoch = 0
MinRoundsBetweenEpochs = 20
RoundsPerEpoch = 200
RoundsPerEpoch = 100
# Min and Max ShuffledOutRestartThreshold represents the minimum and maximum duration of an epoch (in percentage) after a node which
# has been shuffled out has to restart its process in order to start in a new shard
MinShuffledOutRestartThreshold = 0.05
Expand Down
4 changes: 2 additions & 2 deletions cmd/node/config/enableEpochs.toml
Original file line number Diff line number Diff line change
Expand Up @@ -337,10 +337,10 @@
RelayedTransactionsV3EnableEpoch = 1

# EquivalentMessagesEnableEpoch represents the epoch when the equivalent messages are enabled
EquivalentMessagesEnableEpoch = 8 # the chain simulator tests for staking v4 fail if this is set earlier, as they test the transition in epochs 4-7
EquivalentMessagesEnableEpoch = 4 # the chain simulator tests for staking v4 fail if this is set earlier, as they test the transition in epochs 4-7

# FixedOrderInConsensusEnableEpoch represents the epoch when the fixed order in consensus is enabled
FixedOrderInConsensusEnableEpoch = 8 # the chain simulator tests for staking v4 fail if this is set earlier, as they test the transition in epochs 4-7
FixedOrderInConsensusEnableEpoch = 4 # the chain simulator tests for staking v4 fail if this is set earlier, as they test the transition in epochs 4-7

# BLSMultiSignerEnableEpoch represents the activation epoch for different types of BLS multi-signers
BLSMultiSignerEnableEpoch = [
Expand Down
12 changes: 12 additions & 0 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/multiversx/mx-chain-core-go/data"

"github.com/multiversx/mx-chain-go/consensus"
)

Expand Down Expand Up @@ -84,3 +85,14 @@ func VerifyProofAgainstHeader(proof data.HeaderProofHandler, header data.HeaderH

return nil
}

// GetShardIDs returns a map of shard IDs based on the provided shard coordinator
func GetShardIDs(numShards uint32) map[uint32]struct{} {
shardIdentifiers := make(map[uint32]struct{})
for i := uint32(0); i < numShards; i++ {
shardIdentifiers[i] = struct{}{}
}
shardIdentifiers[core.MetachainShardId] = struct{}{}

return shardIdentifiers
}
12 changes: 1 addition & 11 deletions consensus/broadcast/delayedBroadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,7 @@ func (dbb *delayedBlockBroadcaster) registerInterceptorsCallbackForShard(
rootTopic string,
cb func(topic string, hash []byte, data interface{}),
) error {
shardIDs := dbb.shardIdentifiers()
shardIDs := common.GetShardIDs(dbb.shardCoordinator.NumberOfShards())
for idx := range shardIDs {
// interested only in cross shard data
if idx == dbb.shardCoordinator.SelfId() {
Expand All @@ -603,16 +603,6 @@ func (dbb *delayedBlockBroadcaster) registerInterceptorsCallbackForShard(
return nil
}

func (dbb *delayedBlockBroadcaster) shardIdentifiers() map[uint32]struct{} {
shardIdentifiers := make(map[uint32]struct{})
for i := uint32(0); i < dbb.shardCoordinator.NumberOfShards(); i++ {
shardIdentifiers[i] = struct{}{}
}
shardIdentifiers[core.MetachainShardId] = struct{}{}

return shardIdentifiers
}

func (dbb *delayedBlockBroadcaster) interceptedHeader(_ string, headerHash []byte, header interface{}) {
headerHandler, ok := header.(data.HeaderHandler)
if !ok {
Expand Down
8 changes: 8 additions & 0 deletions epochStart/shardchain/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,7 @@ func (t *trigger) receivedProof(headerProof data.HeaderProofHandler) {
return
}

log.Debug("received proof in trigger", "proof for header hash", headerProof.GetHeaderHash())
t.mutTrigger.Lock()
defer t.mutTrigger.Unlock()

Expand All @@ -584,6 +585,11 @@ func (t *trigger) receivedProof(headerProof data.HeaderProofHandler) {
// receivedMetaBlock is a callback function when a new metablock was received
// upon receiving checks if trigger can be updated
func (t *trigger) receivedMetaBlock(headerHandler data.HeaderHandler, metaBlockHash []byte) {
if headerHandler.GetShardID() != core.MetachainShardId {
return
}

log.Debug("received meta header in trigger", "header hash", metaBlockHash)
if t.enableEpochsHandler.IsFlagEnabledInEpoch(common.EquivalentMessagesFlag, headerHandler.GetEpoch()) {
proof, err := t.proofsPool.GetProof(headerHandler.GetShardID(), metaBlockHash)
if err != nil {
Expand All @@ -607,10 +613,12 @@ func (t *trigger) checkMetaHeaderForEpochTriggerEquivalentProofs(headerHandler d
if !ok {
return
}
log.Debug("trigger.checkMetaHeaderForEpochTriggerEquivalentProofs", "metaHdr epoch", metaHdr.GetEpoch(), "metaBlockHash", metaBlockHash)
if !t.shouldUpdateTrigger(metaHdr, metaBlockHash) {
return
}

log.Debug("trigger.updateTriggerHeaderData")
t.updateTriggerHeaderData(metaHdr, metaBlockHash)
t.updateTriggerFromMeta()
}
Expand Down
147 changes: 115 additions & 32 deletions process/block/baseProcess.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,10 @@ type baseProcessor struct {
nonceOfFirstCommittedBlock core.OptionalUint64
extraDelayRequestBlockInfo time.Duration

proofsPool dataRetriever.ProofsPool
chanNextHeader chan bool
proofsPool dataRetriever.ProofsPool
mutRequestedAttestingNoncesMap sync.RWMutex
requestedAttestingNoncesMap map[string]uint64
allProofsReceived chan bool
}

type bootStorerDataArgs struct {
Expand Down Expand Up @@ -666,10 +668,51 @@ func (bp *baseProcessor) verifyFees(header data.HeaderHandler) error {
return nil
}

func (bp *baseProcessor) filterHeadersWithoutProofs() (map[string]*hdrInfo, error) {
removedNonces := make(map[uint32]map[uint64]struct{})
noncesWithProofs := make(map[uint32]map[uint64]struct{})
shardIDs := common.GetShardIDs(bp.shardCoordinator.NumberOfShards())
for shard := range shardIDs {
removedNonces[shard] = make(map[uint64]struct{})
noncesWithProofs[shard] = make(map[uint64]struct{})
}
filteredHeadersInfo := make(map[string]*hdrInfo)

for hdrHash, headerInfo := range bp.hdrsForCurrBlock.hdrHashAndInfo {
if bp.enableEpochsHandler.IsFlagEnabledInEpoch(common.EquivalentMessagesFlag, headerInfo.hdr.GetEpoch()) {
if bp.hasMissingProof(headerInfo, hdrHash) {
removedNonces[headerInfo.hdr.GetShardID()][headerInfo.hdr.GetNonce()] = struct{}{}
continue
}

noncesWithProofs[headerInfo.hdr.GetShardID()][headerInfo.hdr.GetNonce()] = struct{}{}
filteredHeadersInfo[hdrHash] = bp.hdrsForCurrBlock.hdrHashAndInfo[hdrHash]
continue
}

filteredHeadersInfo[hdrHash] = bp.hdrsForCurrBlock.hdrHashAndInfo[hdrHash]
}

for shard, nonces := range removedNonces {
for nonce := range nonces {
if _, ok := noncesWithProofs[shard][nonce]; !ok {
return nil, fmt.Errorf("%w for shard %d and nonce %d", process.ErrMissingHeaderProof, shard, nonce)
}
}
}

return filteredHeadersInfo, nil
}

func (bp *baseProcessor) computeHeadersForCurrentBlock(usedInBlock bool) (map[uint32][]data.HeaderHandler, error) {
hdrsForCurrentBlock := make(map[uint32][]data.HeaderHandler)

for hdrHash, headerInfo := range bp.hdrsForCurrBlock.hdrHashAndInfo {
hdrHashAndInfo, err := bp.filterHeadersWithoutProofs()
if err != nil {
return nil, err
}

for hdrHash, headerInfo := range hdrHashAndInfo {
if headerInfo.usedInBlock != usedInBlock {
continue
}
Expand Down Expand Up @@ -747,7 +790,7 @@ func (bp *baseProcessor) sortHeaderHashesForCurrentBlockByNonce(usedInBlock bool
}

func (bp *baseProcessor) hasMissingProof(headerInfo *hdrInfo, hdrHash string) bool {
isFlagEnabledForHeader := bp.enableEpochsHandler.IsFlagEnabledInEpoch(common.EquivalentMessagesFlag, headerInfo.hdr.GetEpoch()) && headerInfo.hdr.GetNonce() > 1
isFlagEnabledForHeader := bp.enableEpochsHandler.IsFlagEnabledInEpoch(common.EquivalentMessagesFlag, headerInfo.hdr.GetEpoch()) && headerInfo.hdr.GetNonce() >= 1
if !isFlagEnabledForHeader {
return false
}
Expand Down Expand Up @@ -2299,56 +2342,96 @@ func (bp *baseProcessor) getHeaderHash(header data.HeaderHandler) ([]byte, error
return bp.hasher.Compute(string(marshalledHeader)), nil
}

func (bp *baseProcessor) checkProofRequestingNextHeaderBlockingIfMissing(
func (bp *baseProcessor) checkProofRequestingNextHeaderIfMissing(
headerShard uint32,
headerHash []byte,
headerNonce uint64,
) error {
) {
if bp.proofsPool.HasProof(headerShard, headerHash) {
return nil
return
}

log.Trace("could not find proof for header, requesting the next one",
log.Debug("could not find proof for header, requesting the next one",
"current hash", hex.EncodeToString(headerHash),
"header shard", headerShard)
err := bp.requestNextHeaderBlocking(headerNonce+1, headerShard)
if err != nil {
return err
}

if bp.proofsPool.HasProof(headerShard, headerHash) {
return nil
}

return fmt.Errorf("%w for header hash %s", process.ErrMissingHeaderProof, hex.EncodeToString(headerHash))
bp.requestNextHeader(headerHash, headerNonce+1, headerShard)
}

func (bp *baseProcessor) requestNextHeaderBlocking(nonce uint64, shardID uint32) error {
headersPool := bp.dataPool.Headers()

_ = core.EmptyChannel(bp.chanNextHeader)
func (bp *baseProcessor) requestNextHeader(currentHeaderHash []byte, nonce uint64, shardID uint32) {
bp.mutRequestedAttestingNoncesMap.Lock()
bp.requestedAttestingNoncesMap[string(currentHeaderHash)] = nonce
bp.mutRequestedAttestingNoncesMap.Unlock()

if shardID == core.MetachainShardId {
go bp.requestHandler.RequestMetaHeaderByNonce(nonce)
} else {
go bp.requestHandler.RequestShardHeaderByNonce(shardID, nonce)
}
}

err := bp.waitForNextHeader()
if err != nil {
return err
func (bp *baseProcessor) waitAllMissingProofs(waitTime time.Duration) error {
bp.mutRequestedAttestingNoncesMap.RLock()
isWaitingForProofs := len(bp.requestedAttestingNoncesMap) > 0
bp.mutRequestedAttestingNoncesMap.RUnlock()
if !isWaitingForProofs {
return nil
}

_, _, err = process.GetShardHeaderFromPoolWithNonce(nonce, shardID, headersPool)

return err
}

func (bp *baseProcessor) waitForNextHeader() error {
select {
case <-bp.chanNextHeader:
case <-bp.allProofsReceived:
return nil
case <-time.After(bp.extraDelayRequestBlockInfo):
case <-time.After(waitTime):
bp.mutRequestedAttestingNoncesMap.RLock()
defer bp.mutRequestedAttestingNoncesMap.RUnlock()

logMessage := ""
for hash := range bp.requestedAttestingNoncesMap {
logMessage += fmt.Sprintf("\nhash = %s", hex.EncodeToString([]byte(hash)))
}

log.Debug("baseProcessor.waitAllMissingProofs still pending proofs for headers" + logMessage)

return process.ErrTimeIsOut
}
}

func (bp *baseProcessor) checkReceivedHeaderIfAttestingIsNeeded(headerHandler data.HeaderHandler) {
if !common.ShouldBlockHavePrevProof(headerHandler, bp.enableEpochsHandler, common.EquivalentMessagesFlag) {
return
}

bp.mutRequestedAttestingNoncesMap.RLock()
isWaitingForProofs := len(bp.requestedAttestingNoncesMap) > 0
bp.mutRequestedAttestingNoncesMap.RUnlock()
if !isWaitingForProofs {
return
}

allProofsReceived := bp.checkReceivedHeaderAndUpdateMissingAttesting(headerHandler)
if allProofsReceived {
bp.allProofsReceived <- true
}
}

func (bp *baseProcessor) checkReceivedHeaderAndUpdateMissingAttesting(headerHandler data.HeaderHandler) bool {
bp.mutRequestedAttestingNoncesMap.Lock()
defer bp.mutRequestedAttestingNoncesMap.Unlock()

receivedShard := headerHandler.GetShardID()
prevHash := headerHandler.GetPrevHash()
_, isHeaderWithoutProof := bp.requestedAttestingNoncesMap[string(prevHash)]
if !isHeaderWithoutProof {
log.Debug("received header does not have previous hash any of the requested ones")
return len(bp.requestedAttestingNoncesMap) == 0
}

if !bp.proofsPool.HasProof(receivedShard, prevHash) {
log.Debug("received next header but proof is still missing", "hash", hex.EncodeToString(prevHash))
return len(bp.requestedAttestingNoncesMap) == 0
}

delete(bp.requestedAttestingNoncesMap, string(prevHash))

return len(bp.requestedAttestingNoncesMap) == 0
}
18 changes: 11 additions & 7 deletions process/block/metablock.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ func (mp *metaProcessor) ProcessBlock(
}
}

err = mp.checkProofsForShardData(header)
err = mp.checkProofsForShardData(header, haveTime())
if err != nil {
return err
}
Expand Down Expand Up @@ -418,11 +418,16 @@ func (mp *metaProcessor) ProcessBlock(
return nil
}

func (mp *metaProcessor) checkProofsForShardData(header *block.MetaBlock) error {
func (mp *metaProcessor) checkProofsForShardData(header *block.MetaBlock, waitTime time.Duration) error {
if !(mp.enableEpochsHandler.IsFlagEnabledInEpoch(common.EquivalentMessagesFlag, header.Epoch) && header.GetNonce() > 1) {
return nil
}

mp.mutRequestedAttestingNoncesMap.Lock()
mp.requestedAttestingNoncesMap = make(map[string]uint64)
mp.mutRequestedAttestingNoncesMap.Unlock()
_ = core.EmptyChannel(mp.allProofsReceived)

for _, shardData := range header.ShardInfo {
// TODO: consider the validation of the proof:
// compare the one from proofsPool with what shardData.CurrentSignature and shardData.CurrentPubKeysBitmap hold
Expand All @@ -437,10 +442,7 @@ func (mp *metaProcessor) checkProofsForShardData(header *block.MetaBlock) error
continue
}

err := mp.checkProofRequestingNextHeaderBlockingIfMissing(shardData.ShardID, shardData.HeaderHash, shardData.Nonce)
if err != nil {
return err
}
mp.checkProofRequestingNextHeaderIfMissing(shardData.ShardID, shardData.HeaderHash, shardData.Nonce)

if !common.ShouldBlockHavePrevProof(shardHeader.hdr, mp.enableEpochsHandler, common.EquivalentMessagesFlag) {
continue
Expand All @@ -464,7 +466,7 @@ func (mp *metaProcessor) checkProofsForShardData(header *block.MetaBlock) error
}
}

return nil
return mp.waitAllMissingProofs(waitTime)
}

func (mp *metaProcessor) processEpochStartMetaBlock(
Expand Down Expand Up @@ -2073,6 +2075,8 @@ func (mp *metaProcessor) receivedShardHeader(headerHandler data.HeaderHandler, s
"hash", shardHeaderHash,
)

mp.checkReceivedHeaderIfAttestingIsNeeded(headerHandler)

mp.hdrsForCurrBlock.mutHdrsForBlock.Lock()

haveMissingShardHeaders := mp.hdrsForCurrBlock.missingHdrs > 0 || mp.hdrsForCurrBlock.missingFinalityAttestingHdrs > 0
Expand Down
Loading