Skip to content

Commit

Permalink
epoch start shard header syncer
Browse files Browse the repository at this point in the history
  • Loading branch information
laurci committed Dec 4, 2024
1 parent dd9cb7d commit 3888988
Show file tree
Hide file tree
Showing 5 changed files with 210 additions and 40 deletions.
60 changes: 20 additions & 40 deletions epochStart/bootstrap/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ type epochStartBootstrap struct {
dataPool dataRetriever.PoolsHolder
miniBlocksSyncer epochStart.PendingMiniBlocksSyncHandler
headersSyncer epochStart.HeadersByHashSyncer
epochStartShardHeaderSyncer epochStart.PendingEpochStartShardHeaderSyncer
txSyncerForScheduled update.TransactionsSyncHandler
epochStartMetaBlockSyncer epochStart.StartOfEpochMetaSyncer
nodesConfigHandler StartOfEpochNodesConfigHandler
Expand Down Expand Up @@ -618,6 +619,16 @@ func (e *epochStartBootstrap) createSyncers() error {
return err
}

epochStartShardHeaderSyncerArgs := updateSync.ArgsNewPendingEpochStartShardHeaderSyncer{
HeadersPool: e.dataPool.Headers(),
Marshalizer: e.coreComponentsHolder.InternalMarshalizer(),
RequestHandler: e.requestHandler,
}
e.epochStartShardHeaderSyncer, err = updateSync.NewPendingEpochStartShardHeaderSyncer(epochStartShardHeaderSyncerArgs)
if err != nil {
return err
}

syncTxsArgs := updateSync.ArgsNewTransactionsSyncer{
DataPools: e.dataPool,
Storages: disabled.NewChainStorer(),
Expand Down Expand Up @@ -914,49 +925,18 @@ func (e *epochStartBootstrap) syncLatestEpochStartShardBlock(targetEpoch uint32,
return nil, nil, epochStart.ErrEpochStartDataForShardNotFound
}

type headerResult struct {
header data.HeaderHandler
hash []byte
}

waitForHeader := func(nonce uint64, shardId uint32, ch chan headerResult) {
for {
select {
case <-ch:
return
default:
hdrs, hashes, err := e.dataPool.Headers().GetHeadersByNonceAndShardId(nonce, shardId)
if err == nil && len(hdrs) > 0 {
ch <- headerResult{
header: hdrs[0],
hash: hashes[0],
}
return
}
}
}
e.epochStartShardHeaderSyncer.ClearFields()
err := e.epochStartShardHeaderSyncer.SyncEpochStartShardHeader(e.shardCoordinator.SelfId(), targetEpoch, prevEpochLatestFinalizedBlock.GetNonce(), ctx)
if err != nil {
return nil, nil, err
}

nonce := prevEpochLatestFinalizedBlock.GetNonce()
for {
e.requestHandler.RequestShardHeaderByNonce(e.shardCoordinator.SelfId(), nonce+1)

resultChan := make(chan headerResult)
defer close(resultChan)

go waitForHeader(nonce+1, e.shardCoordinator.SelfId(), resultChan)

select {
case <-ctx.Done():
return nil, nil, epochStart.ErrTimeoutWaitingForShardBlock
case res := <-resultChan:
if res.header.GetEpoch() == targetEpoch {
return res.header, res.hash, nil
} else {
nonce = res.header.GetNonce()
}
}
epochStartShardBlock, epochStartShardBlockHash, err := e.epochStartShardHeaderSyncer.GetEpochStartHeader()
if err != nil {
return nil, nil, err
}

return epochStartShardBlock, epochStartShardBlockHash, nil
}

func (e *epochStartBootstrap) requestAndProcessForShard(peerMiniBlocks []*block.MiniBlock) error {
Expand Down
8 changes: 8 additions & 0 deletions epochStart/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,14 @@ type HeadersByHashSyncer interface {
IsInterfaceNil() bool
}

// PendingEpochStartShardHeaderSyncer defines the methods to sync pending epoch start shard headers
type PendingEpochStartShardHeaderSyncer interface {
SyncEpochStartShardHeader(shardId uint32, epoch uint32, startNonce uint64, ctx context.Context) error
GetEpochStartHeader() (data.HeaderHandler, []byte, error)
ClearFields()
IsInterfaceNil() bool
}

// PendingMiniBlocksSyncHandler defines the methods to sync all pending miniblocks
type PendingMiniBlocksSyncHandler interface {
SyncPendingMiniBlocks(miniBlockHeaders []data.MiniBlockHeaderHandler, ctx context.Context) error
Expand Down
3 changes: 3 additions & 0 deletions update/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ var ErrNilRequestersContainer = errors.New("nil requesters container")
// ErrNilCacher signals that nil cacher was provided
var ErrNilCacher = errors.New("nil cacher")

// ErrNilHeadersPool signals that nil headers pool was provided
var ErrNilHeadersPool = errors.New("nil headers pool")

// ErrNilEpochHandler signals that nil epoch handler was provided
var ErrNilEpochHandler = errors.New("nil epoch handler")

Expand Down
8 changes: 8 additions & 0 deletions update/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,14 @@ type EpochStartPendingMiniBlocksSyncHandler interface {
IsInterfaceNil() bool
}

// PendingEpochStartShardHeaderSyncHandler defines the methods to sync pending epoch start shard headers
type PendingEpochStartShardHeaderSyncHandler interface {
SyncEpochStartShardHeader(shardId uint32, epoch uint32, startNonce uint64, ctx context.Context) error
GetEpochStartHeader() (data.HeaderHandler, []byte, error)
ClearFields()
IsInterfaceNil() bool
}

// TransactionsSyncHandler defines the methods to sync all transactions from a set of miniblocks
type TransactionsSyncHandler interface {
SyncTransactionsFor(miniBlocks map[string]*block.MiniBlock, epoch uint32, ctx context.Context) error
Expand Down
171 changes: 171 additions & 0 deletions update/sync/syncEpochStartShardHeaders.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
package sync

import (
"context"
"sync"
"time"

"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/multiversx/mx-chain-core-go/data"
"github.com/multiversx/mx-chain-core-go/marshal"
"github.com/multiversx/mx-chain-go/dataRetriever"
"github.com/multiversx/mx-chain-go/process"
"github.com/multiversx/mx-chain-go/update"
)

var _ update.PendingEpochStartShardHeaderSyncHandler = (*pendingEpochStartShardHeader)(nil)

type pendingEpochStartShardHeader struct {
mutPending sync.Mutex
epochStartHeader data.HeaderHandler
epochStartHash []byte
latestReceivedHeader data.HeaderHandler
latestReceivedHash []byte
targetEpoch uint32
targetShardId uint32
headersPool dataRetriever.HeadersPool
chReceived chan bool
chNew chan bool
marshalizer marshal.Marshalizer
stopSyncing bool
synced bool
requestHandler process.RequestHandler
waitTimeBetweenRequests time.Duration
}

// ArgsNewPendingMiniBlocksSyncer defines the arguments needed for the sycner
type ArgsNewPendingEpochStartShardHeaderSyncer struct {
HeadersPool dataRetriever.HeadersPool
Marshalizer marshal.Marshalizer
RequestHandler process.RequestHandler
}

// NewPendingMiniBlocksSyncer creates a syncer for all pending miniblocks
func NewPendingEpochStartShardHeaderSyncer(args ArgsNewPendingEpochStartShardHeaderSyncer) (*pendingEpochStartShardHeader, error) {
if check.IfNil(args.HeadersPool) {
return nil, update.ErrNilHeadersPool
}
if check.IfNil(args.Marshalizer) {
return nil, dataRetriever.ErrNilMarshalizer
}
if check.IfNil(args.RequestHandler) {
return nil, process.ErrNilRequestHandler
}

p := &pendingEpochStartShardHeader{
mutPending: sync.Mutex{},
epochStartHeader: nil,
epochStartHash: nil,
targetEpoch: 0,
targetShardId: 0,
headersPool: args.HeadersPool,
chReceived: make(chan bool),
chNew: make(chan bool),
requestHandler: args.RequestHandler,
stopSyncing: true,
synced: false,
marshalizer: args.Marshalizer,
waitTimeBetweenRequests: args.RequestHandler.RequestInterval(),
}

p.headersPool.RegisterHandler(p.receivedHeader)

return p, nil
}

// SyncEpochStartShardHeader will sync the epoch start header for a specific shard
func (p *pendingEpochStartShardHeader) SyncEpochStartShardHeader(shardId uint32, epoch uint32, startNonce uint64, ctx context.Context) error {
return p.syncEpochStartShardHeader(shardId, epoch, startNonce, ctx)
}

func (p *pendingEpochStartShardHeader) syncEpochStartShardHeader(shardId uint32, epoch uint32, startNonce uint64, ctx context.Context) error {
_ = core.EmptyChannel(p.chReceived)
_ = core.EmptyChannel(p.chNew)

p.mutPending.Lock()
p.stopSyncing = false
p.targetEpoch = epoch
p.targetShardId = shardId
p.mutPending.Unlock()

nonce := startNonce
for {
p.mutPending.Lock()
p.stopSyncing = false
p.requestHandler.RequestShardHeaderByNonce(shardId, nonce+1)
p.mutPending.Unlock()

select {
case <-p.chReceived:
p.mutPending.Lock()
p.stopSyncing = true
p.synced = true
p.mutPending.Unlock()
return nil
case <-p.chNew:
nonce = p.latestReceivedHeader.GetNonce()
continue
case <-ctx.Done():
p.mutPending.Lock()
p.stopSyncing = true
p.mutPending.Unlock()
return update.ErrTimeIsOut
}
}
}

// receivedHeader is a callback function when a new header was received
func (p *pendingEpochStartShardHeader) receivedHeader(header data.HeaderHandler, headerHash []byte) {
p.mutPending.Lock()
if p.stopSyncing {
p.mutPending.Unlock()
return
}

if header.GetShardID() != p.targetShardId {
p.mutPending.Unlock()
return
}

p.latestReceivedHash = headerHash
p.latestReceivedHeader = header

if header.GetEpoch() != p.targetEpoch || !header.IsStartOfEpochBlock() {
p.mutPending.Unlock()
p.chNew <- true
return
}

p.epochStartHash = headerHash
p.epochStartHeader = header
p.mutPending.Unlock()

p.chReceived <- true
}

// GetEpochStartHeader returns the synced epoch start header
func (p *pendingEpochStartShardHeader) GetEpochStartHeader() (data.HeaderHandler, []byte, error) {
p.mutPending.Lock()
defer p.mutPending.Unlock()

if !p.synced || p.epochStartHeader == nil || p.epochStartHash == nil {
return nil, nil, update.ErrNotSynced
}

return p.epochStartHeader, p.epochStartHash, nil
}

// ClearFields will reset the state
func (p *pendingEpochStartShardHeader) ClearFields() {
p.mutPending.Lock()
p.epochStartHash = nil
p.epochStartHeader = nil
p.synced = false
p.mutPending.Unlock()
}

// IsInterfaceNil returns nil if underlying object is nil
func (p *pendingEpochStartShardHeader) IsInterfaceNil() bool {
return p == nil
}

0 comments on commit 3888988

Please sign in to comment.