Skip to content

Commit

Permalink
Merge pull request #435 from uprendis/feature/autocompact
Browse files Browse the repository at this point in the history
Automatically compact DB
  • Loading branch information
uprendis authored Mar 13, 2023
2 parents e585097 + c2cc618 commit cf51fea
Show file tree
Hide file tree
Showing 7 changed files with 297 additions and 27 deletions.
11 changes: 3 additions & 8 deletions cmd/opera/launcher/db-transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (

"github.com/Fantom-foundation/go-opera/integration"
"github.com/Fantom-foundation/go-opera/utils"
"github.com/Fantom-foundation/go-opera/utils/dbutil/compactdb"
"github.com/Fantom-foundation/go-opera/utils/dbutil/autocompact"
)

func dbTransform(ctx *cli.Context) error {
Expand Down Expand Up @@ -264,9 +264,9 @@ func transformComponent(datadir string, dbTypes, tmpDbTypes map[multidb.TypeName
return err
}
toMove[dbLocatorOf(e.New)] = true
newDB = batched.Wrap(newDB)
defer newDB.Close()
newHumanName := path.Join("tmp", string(e.New.Type), e.New.Name)
newDB = batched.Wrap(autocompact.Wrap2M(newDB, opt.GiB, 16*opt.GiB, true, newHumanName))
defer newDB.Close()
log.Info("Copying DB table", "req", e.Req, "old_db", oldHumanName, "old_table", e.Old.Table,
"new_db", newHumanName, "new_table", e.New.Table)
oldTable := utils.NewTableOrSelf(oldDB, []byte(e.Old.Table))
Expand All @@ -292,11 +292,6 @@ func transformComponent(datadir string, dbTypes, tmpDbTypes map[multidb.TypeName
keys = keys[:0]
values = values[:0]
}
err = compactdb.Compact(newTable, newHumanName, 16*opt.GiB)
if err != nil {
log.Error("Database compaction failed", "err", err)
return err
}
return nil
}()
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion cmd/opera/launcher/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"gopkg.in/urfave/cli.v1"

"github.com/Fantom-foundation/go-opera/gossip"
"github.com/Fantom-foundation/go-opera/utils/dbutil/autocompact"
)

var (
Expand Down Expand Up @@ -135,7 +136,7 @@ func exportEvmKeys(ctx *cli.Context) error {
if err != nil {
return err
}
keysDB := batched.Wrap(keysDB_)
keysDB := batched.Wrap(autocompact.Wrap2M(keysDB_, opt.GiB, 16*opt.GiB, true, "evm-keys"))
defer keysDB.Close()

it := gdb.EvmStore().EvmDb.NewIterator(nil, nil)
Expand Down
4 changes: 3 additions & 1 deletion gossip/apply_genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import (

"github.com/Fantom-foundation/lachesis-base/hash"
"github.com/Fantom-foundation/lachesis-base/kvdb/batched"
"github.com/syndtr/goleveldb/leveldb/opt"

"github.com/Fantom-foundation/go-opera/inter/iblockproc"
"github.com/Fantom-foundation/go-opera/inter/ibr"
"github.com/Fantom-foundation/go-opera/inter/ier"
"github.com/Fantom-foundation/go-opera/opera/genesis"
"github.com/Fantom-foundation/go-opera/utils/dbutil/autocompact"
)

// ApplyGenesis writes initial state.
Expand Down Expand Up @@ -76,7 +78,7 @@ func (s *Store) ApplyGenesis(g genesis.Genesis) (genesisHash hash.Hash, err erro
func (s *Store) WrapTablesAsBatched() (unwrap func()) {
origTables := s.table

batchedBlocks := batched.Wrap(s.table.Blocks)
batchedBlocks := batched.Wrap(autocompact.Wrap2M(s.table.Blocks, opt.GiB, 16*opt.GiB, false, "blocks"))
s.table.Blocks = batchedBlocks

batchedBlockHashes := batched.Wrap(s.table.BlockHashes)
Expand Down
23 changes: 7 additions & 16 deletions gossip/evmstore/apply_genesis.go
Original file line number Diff line number Diff line change
@@ -1,37 +1,28 @@
package evmstore

import (
"github.com/Fantom-foundation/lachesis-base/kvdb"
"github.com/Fantom-foundation/lachesis-base/kvdb/batched"
"github.com/syndtr/goleveldb/leveldb/opt"

"github.com/Fantom-foundation/go-opera/opera/genesis"
"github.com/Fantom-foundation/go-opera/utils/adapters/ethdb2kvdb"
"github.com/Fantom-foundation/go-opera/utils/dbutil/autocompact"
)

// ApplyGenesis writes initial state.
func (s *Store) ApplyGenesis(g genesis.Genesis) (err error) {
batch := s.EvmDb.NewBatch()
defer batch.Reset()
db := batched.Wrap(autocompact.Wrap2M(ethdb2kvdb.Wrap(s.EvmDb), opt.GiB, 16*opt.GiB, true, "evm"))
g.RawEvmItems.ForEach(func(key, value []byte) bool {
err = db.Put(key, value)
if err != nil {
return false
}
err = batch.Put(key, value)
if err != nil {
return false
}
if batch.ValueSize() > kvdb.IdealBatchSize {
err = batch.Write()
if err != nil {
return false
}
batch.Reset()
}
return true
})
if err != nil {
return err
}
return batch.Write()
return db.Write()
}

func (s *Store) WrapTablesAsBatched() (unwrap func()) {
Expand All @@ -45,7 +36,7 @@ func (s *Store) WrapTablesAsBatched() (unwrap func()) {

unwrapLogs := s.EvmLogs.WrapTablesAsBatched()

batchedReceipts := batched.Wrap(s.table.Receipts)
batchedReceipts := batched.Wrap(autocompact.Wrap2M(s.table.Receipts, opt.GiB, 16*opt.GiB, false, "receipts"))
s.table.Receipts = batchedReceipts
return func() {
_ = batchedTxs.Flush()
Expand Down
3 changes: 2 additions & 1 deletion integration/legacy_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/syndtr/goleveldb/leveldb/opt"

"github.com/Fantom-foundation/go-opera/utils/dbutil/autocompact"
"github.com/Fantom-foundation/go-opera/utils/dbutil/compactdb"
)

Expand Down Expand Up @@ -46,7 +47,7 @@ type transformTask struct {

func transform(m transformTask) error {
openDst := func() *batched.Store {
return batched.Wrap(m.openDst())
return batched.Wrap(autocompact.Wrap2M(m.openDst(), opt.GiB, 16*opt.GiB, true, ""))
}
openSrc := func() *batched.Store {
return batched.Wrap(m.openSrc())
Expand Down
143 changes: 143 additions & 0 deletions utils/dbutil/autocompact/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package autocompact

import (
"sync"

"github.com/Fantom-foundation/lachesis-base/kvdb"
"github.com/ethereum/go-ethereum/log"
"github.com/status-im/keycard-go/hexutils"
)

// Store implements automatic compacting of recently inserted/erased data according to provided strategy
type Store struct {
kvdb.Store
limit uint64
cont ContainerI
newCont func() ContainerI
compMu sync.Mutex
name string
}

type Batch struct {
kvdb.Batch
store *Store
cont ContainerI
}

func Wrap(s kvdb.Store, limit uint64, strategy func() ContainerI, name string) *Store {
return &Store{
Store: s,
limit: limit,
newCont: strategy,
cont: strategy(),
name: name,
}
}

func Wrap2(s kvdb.Store, limit1 uint64, limit2 uint64, strategy func() ContainerI, name string) *Store {
return Wrap(Wrap(s, limit1, strategy, name), limit2, strategy, name)
}

func Wrap2M(s kvdb.Store, limit1 uint64, limit2 uint64, forward bool, name string) *Store {
strategy := NewBackwardsCont
if forward {
strategy = NewForwardCont
}
return Wrap2(s, limit1, limit2, strategy, name)
}

func estSize(keyLen int, valLen int) uint64 {
// Storage overheads, related to adding/deleting a record,
//wouldn't be only proportional to length of key and value.
//E.g. if one adds 10 records with length of 2, it will be more expensive than 1 record with length 20
// Now, 64 wasn't really calculated but is rather a guesstimation
return uint64(keyLen + valLen + 64)
}

func (s *Store) onWrite(key []byte, size uint64) {
s.compMu.Lock()
defer s.compMu.Unlock()
if key != nil {
s.cont.Add(key, size)
}
s.mayCompact(false)
}

func (s *Store) onBatchWrite(batchCont ContainerI) {
s.compMu.Lock()
defer s.compMu.Unlock()
s.cont.Merge(batchCont)
s.mayCompact(false)
}

func (s *Store) compact() {
s.compMu.Lock()
defer s.compMu.Unlock()
s.mayCompact(true)
}

func (s *Store) mayCompact(force bool) {
// error handling
err := s.cont.Error()
if err != nil {
s.cont.Reset()
s.newCont = NewDevnullCont
s.cont = s.newCont()
log.Warn("Autocompaction failed, which may lead to performance issues", "name", s.name, "err", err)
}

if force || s.cont.Size() > s.limit {
for _, r := range s.cont.Ranges() {
log.Debug("Autocompact", "name", s.name, "from", hexutils.BytesToHex(r.minKey), "to", hexutils.BytesToHex(r.maxKey))
_ = s.Store.Compact(r.minKey, r.maxKey)
}
s.cont.Reset()
}
}

func (s *Store) Put(key []byte, value []byte) error {
defer s.onWrite(key, estSize(len(key), len(value)))
return s.Store.Put(key, value)
}

func (s *Store) Delete(key []byte) error {
defer s.onWrite(key, estSize(len(key), 0))
return s.Store.Delete(key)
}

func (s *Store) Close() error {
s.compact()
return s.Store.Close()
}

func (s *Store) NewBatch() kvdb.Batch {
batch := s.Store.NewBatch()
if batch == nil {
return nil
}
return &Batch{
Batch: batch,
store: s,
cont: s.newCont(),
}
}

func (s *Batch) Put(key []byte, value []byte) error {
s.cont.Add(key, estSize(len(key), len(value)))
return s.Batch.Put(key, value)
}

func (s *Batch) Delete(key []byte) error {
s.cont.Add(key, estSize(len(key), 0))
return s.Batch.Delete(key)
}

func (s *Batch) Reset() {
s.cont.Reset()
s.Batch.Reset()
}

func (s *Batch) Write() error {
defer s.store.onBatchWrite(s.cont)
return s.Batch.Write()
}
Loading

0 comments on commit cf51fea

Please sign in to comment.