Commit

refactor: processBlocksRange() goroutine
rus-alex committed Nov 1, 2023
1 parent 19c852c commit f89d9bf
Showing 1 changed file with 60 additions and 37 deletions.
97 changes: 60 additions & 37 deletions gossip/store_migration.go
@@ -8,7 +8,6 @@ import (
"time"

"github.com/Fantom-foundation/lachesis-base/hash"
"github.com/Fantom-foundation/lachesis-base/inter/idx"
"github.com/Fantom-foundation/lachesis-base/kvdb"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
@@ -154,15 +153,14 @@ func (s *Store) calculateUpgradeHeights() error {
}

func (s *Store) fixTxPositionBlockOffset() (err error) {
const parallels = 10
receiptsTable, _ := s.dbs.OpenDB("evm/r")
txPositionsTable, _ := s.dbs.OpenDB("evm/x")

// for each block's receipts
var (
wg sync.WaitGroup
items = new(uint32)
)

receiptsTable, _ := s.dbs.OpenDB("evm/r")
txPositionsTable, _ := s.dbs.OpenDB("evm/x")

processBlockReceipts := func(input <-chan []*types.ReceiptForStorage) {
defer wg.Done()
pos := new(evmstore.TxPosition)
@@ -181,46 +179,71 @@ func (s *Store) fixTxPositionBlockOffset() (err error) {
}
}

wg.Add(parallels)
threads := make([]chan []*types.ReceiptForStorage, parallels)
for i := range threads {
threads[i] = make(chan []*types.ReceiptForStorage, 10)
go processBlockReceipts(threads[i])
}

// for each block
var (
block idx.Block
start = time.Now()
prevFlushTime = time.Now()
blocks = new(uint32)
)
it := receiptsTable.NewIterator(nil, nil)
defer it.Release()
for n := 0; it.Next(); n++ {
block = idx.BytesToBlock(it.Key())

var receiptsStorage []*types.ReceiptForStorage
err := rlp.DecodeBytes(it.Value(), &receiptsStorage)
if err != nil {
s.Log.Crit("Failed to decode rlp", "err", err, "size", len(it.Value()))
processBlocksRange := func() {
defer wg.Done()
const (
parallels = 10
)
wg.Add(parallels)
threads := make([]chan []*types.ReceiptForStorage, parallels)
for i := range threads {
threads[i] = make(chan []*types.ReceiptForStorage, 10)
go processBlockReceipts(threads[i])
}
threads[n%parallels] <- receiptsStorage

if s.dbs.NotFlushedSizeEst() > s.cfg.MaxNonFlushedSize/2 || time.Since(prevFlushTime) > s.cfg.MaxNonFlushedPeriod {
prevFlushTime = time.Now()
err = s.flushDBs()
it := receiptsTable.NewIterator(nil, nil)
defer it.Release()
for n := 0; it.Next(); n++ {
atomic.AddUint32(blocks, 1)

var receiptsStorage []*types.ReceiptForStorage
err := rlp.DecodeBytes(it.Value(), &receiptsStorage)
if err != nil {
break
s.Log.Crit("Failed to decode rlp", "err", err, "size", len(it.Value()))
}
s.Log.Info("Txs positions processing", "elapsed", common.PrettyDuration(time.Since(start)), "block", block, "items", atomic.LoadUint32(items))
threads[n%parallels] <- receiptsStorage
}
for i := range threads {
close(threads[i])
}
}
for i := range threads {
close(threads[i])
}
wg.Wait()
// no need to flush dbs at end as it migration engine does

s.Log.Info("Txs positions processing", "elapsed", common.PrettyDuration(time.Since(start)), "block", block, "items", *items)
// status log
var (
done = make(chan struct{})
)
go func() {
var (
start = time.Now()
prevFlushTime = time.Now()
)
for again := true; again; {
select {
case <-time.After(s.cfg.MaxNonFlushedPeriod / 5):
again = true
case <-done:
again = false
}
s.Log.Info("Txs positions processing", "elapsed", common.PrettyDuration(time.Since(start)), "blocks", atomic.LoadUint32(blocks), "items", atomic.LoadUint32(items))
if s.dbs.NotFlushedSizeEst() > s.cfg.MaxNonFlushedSize/2 || time.Since(prevFlushTime) > s.cfg.MaxNonFlushedPeriod {
prevFlushTime = time.Now()
err = s.flushDBs()
if err != nil {
break
}
}
}
}()

// main start
wg.Add(1)
go processBlocksRange()
wg.Wait()
close(done)

return
}
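For readers skimming the diff: on both the old and new sides, the receipts work is fanned out to a fixed number of worker goroutines, each reading block receipt batches from its own buffered channel, with a shared counter updated via sync/atomic for progress reporting. The sketch below is a self-contained illustration of that fan-out pattern only, not go-opera code; batch, processBatch, and the constants are illustrative stand-ins.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// batch stands in for a block's []*types.ReceiptForStorage.
type batch []int

func main() {
	const parallels = 10 // number of worker goroutines, as in the diff

	var (
		wg    sync.WaitGroup
		items = new(uint32) // processed items, shared via atomic ops
	)

	// processBatch plays the role of processBlockReceipts: it drains one
	// channel until it is closed and counts every item it handles.
	processBatch := func(input <-chan batch) {
		defer wg.Done()
		for b := range input {
			for range b {
				atomic.AddUint32(items, 1)
			}
		}
	}

	// Start the workers, one buffered channel each.
	wg.Add(parallels)
	threads := make([]chan batch, parallels)
	for i := range threads {
		threads[i] = make(chan batch, 10)
		go processBatch(threads[i])
	}

	// Producer: round-robin batches over the workers, like
	// threads[n%parallels] <- receiptsStorage in the diff.
	for n := 0; n < 1000; n++ {
		threads[n%parallels] <- batch{n, n + 1, n + 2}
	}

	// Closing every channel lets the workers' range loops finish.
	for i := range threads {
		close(threads[i])
	}
	wg.Wait()

	fmt.Println("items processed:", atomic.LoadUint32(items))
}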

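What the commit itself changes is where progress logging and flush checks live: the block iteration moves into the new processBlocksRange() goroutine, while logging and flushing move into a separate goroutine that wakes on a timer and stops when a done channel is closed. Below is a minimal sketch of that timer-plus-done pattern under stated assumptions: flushIfNeeded is a hypothetical helper standing in for the NotFlushedSizeEst/flushDBs logic, and the intervals are arbitrary.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

func main() {
	var (
		blocks = new(uint32)         // counter updated by the main work
		done   = make(chan struct{}) // closed when the main work is finished
	)

	// flushIfNeeded is a stand-in for the size/time-based flush check
	// performed by the real migration.
	flushIfNeeded := func() error { return nil }

	// Status goroutine: log progress on every wakeup until done is closed,
	// then log one final time and exit, mirroring the `for again` loop in
	// the diff.
	finished := make(chan struct{})
	go func() {
		defer close(finished)
		start := time.Now()
		for again := true; again; {
			select {
			case <-time.After(200 * time.Millisecond):
				again = true
			case <-done:
				again = false
			}
			fmt.Printf("progress: elapsed=%s blocks=%d\n",
				time.Since(start).Round(time.Millisecond), atomic.LoadUint32(blocks))
			if err := flushIfNeeded(); err != nil {
				return
			}
		}
	}()

	// Simulated main work, playing the role of processBlocksRange().
	for i := 0; i < 10; i++ {
		atomic.AddUint32(blocks, 1)
		time.Sleep(50 * time.Millisecond)
	}
	close(done)
	<-finished
}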
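The flush trigger itself is unchanged by the refactor: flush when the unflushed-size estimate exceeds half the configured cap, or when the configured period has elapsed since the last flush. A standalone restatement of that condition, where sizeEst, maxSize, and maxPeriod are stand-ins for NotFlushedSizeEst(), MaxNonFlushedSize, and MaxNonFlushedPeriod:

package main

import (
	"fmt"
	"time"
)

// shouldFlush reports whether buffered writes ought to be flushed: either the
// unflushed size estimate crossed half the configured cap, or the configured
// period elapsed since the previous flush.
func shouldFlush(sizeEst, maxSize int, lastFlush time.Time, maxPeriod time.Duration) bool {
	return sizeEst > maxSize/2 || time.Since(lastFlush) > maxPeriod
}

func main() {
	lastFlush := time.Now().Add(-3 * time.Minute)
	fmt.Println(shouldFlush(100<<20, 400<<20, lastFlush, 10*time.Minute)) // false: small and recent enough
	fmt.Println(shouldFlush(300<<20, 400<<20, lastFlush, 10*time.Minute)) // true: size estimate over half the cap
	fmt.Println(shouldFlush(100<<20, 400<<20, lastFlush, 2*time.Minute))  // true: period exceeded
}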