diff --git a/cmd/opera/launcher/db-transform.go b/cmd/opera/launcher/db-transform.go
index 77d6568bb..046c02551 100644
--- a/cmd/opera/launcher/db-transform.go
+++ b/cmd/opera/launcher/db-transform.go
@@ -17,7 +17,7 @@ import (
 
 	"github.com/Fantom-foundation/go-opera/integration"
 	"github.com/Fantom-foundation/go-opera/utils"
-	"github.com/Fantom-foundation/go-opera/utils/dbutil/compactdb"
+	"github.com/Fantom-foundation/go-opera/utils/dbutil/autocompact"
 )
 
 func dbTransform(ctx *cli.Context) error {
@@ -264,9 +264,9 @@ func transformComponent(datadir string, dbTypes, tmpDbTypes map[multidb.TypeName
 				return err
 			}
 			toMove[dbLocatorOf(e.New)] = true
-			newDB = batched.Wrap(newDB)
-			defer newDB.Close()
 			newHumanName := path.Join("tmp", string(e.New.Type), e.New.Name)
+			newDB = batched.Wrap(autocompact.Wrap2M(newDB, opt.GiB, 16*opt.GiB, true, newHumanName))
+			defer newDB.Close()
 			log.Info("Copying DB table", "req", e.Req, "old_db", oldHumanName, "old_table", e.Old.Table, "new_db", newHumanName, "new_table", e.New.Table)
 
 			oldTable := utils.NewTableOrSelf(oldDB, []byte(e.Old.Table))
@@ -292,11 +292,6 @@ func transformComponent(datadir string, dbTypes, tmpDbTypes map[multidb.TypeName
 				keys = keys[:0]
 				values = values[:0]
 			}
-			err = compactdb.Compact(newTable, newHumanName, 16*opt.GiB)
-			if err != nil {
-				log.Error("Database compaction failed", "err", err)
-				return err
-			}
 			return nil
 		}()
 		if err != nil {
diff --git a/cmd/opera/launcher/export.go b/cmd/opera/launcher/export.go
index 30b7bd48c..7681f7fdf 100644
--- a/cmd/opera/launcher/export.go
+++ b/cmd/opera/launcher/export.go
@@ -21,6 +21,7 @@ import (
 	"gopkg.in/urfave/cli.v1"
 
 	"github.com/Fantom-foundation/go-opera/gossip"
+	"github.com/Fantom-foundation/go-opera/utils/dbutil/autocompact"
 )
 
 var (
@@ -135,7 +136,7 @@ func exportEvmKeys(ctx *cli.Context) error {
 	if err != nil {
 		return err
 	}
-	keysDB := batched.Wrap(keysDB_)
+	keysDB := batched.Wrap(autocompact.Wrap2M(keysDB_, opt.GiB, 16*opt.GiB, true, "evm-keys"))
 	defer keysDB.Close()
 
 	it := gdb.EvmStore().EvmDb.NewIterator(nil, nil)
diff --git a/gossip/apply_genesis.go b/gossip/apply_genesis.go
index ec06630e2..c34ba70aa 100644
--- a/gossip/apply_genesis.go
+++ b/gossip/apply_genesis.go
@@ -5,11 +5,13 @@
 import (
 	"github.com/Fantom-foundation/lachesis-base/hash"
 	"github.com/Fantom-foundation/lachesis-base/kvdb/batched"
+	"github.com/syndtr/goleveldb/leveldb/opt"
 
 	"github.com/Fantom-foundation/go-opera/inter/iblockproc"
 	"github.com/Fantom-foundation/go-opera/inter/ibr"
 	"github.com/Fantom-foundation/go-opera/inter/ier"
 	"github.com/Fantom-foundation/go-opera/opera/genesis"
+	"github.com/Fantom-foundation/go-opera/utils/dbutil/autocompact"
 )
 
 // ApplyGenesis writes initial state.
@@ -76,7 +78,7 @@ func (s *Store) ApplyGenesis(g genesis.Genesis) (genesisHash hash.Hash, err erro
 func (s *Store) WrapTablesAsBatched() (unwrap func()) {
 	origTables := s.table
 
-	batchedBlocks := batched.Wrap(s.table.Blocks)
+	batchedBlocks := batched.Wrap(autocompact.Wrap2M(s.table.Blocks, opt.GiB, 16*opt.GiB, false, "blocks"))
 	s.table.Blocks = batchedBlocks
 
 	batchedBlockHashes := batched.Wrap(s.table.BlockHashes)
diff --git a/gossip/evmstore/apply_genesis.go b/gossip/evmstore/apply_genesis.go
index 62f7bab8b..1e9fb3beb 100644
--- a/gossip/evmstore/apply_genesis.go
+++ b/gossip/evmstore/apply_genesis.go
@@ -1,37 +1,28 @@
 package evmstore
 
 import (
-	"github.com/Fantom-foundation/lachesis-base/kvdb"
 	"github.com/Fantom-foundation/lachesis-base/kvdb/batched"
+	"github.com/syndtr/goleveldb/leveldb/opt"
 
 	"github.com/Fantom-foundation/go-opera/opera/genesis"
+	"github.com/Fantom-foundation/go-opera/utils/adapters/ethdb2kvdb"
+	"github.com/Fantom-foundation/go-opera/utils/dbutil/autocompact"
 )
 
 // ApplyGenesis writes initial state.
 func (s *Store) ApplyGenesis(g genesis.Genesis) (err error) {
-	batch := s.EvmDb.NewBatch()
-	defer batch.Reset()
+	db := batched.Wrap(autocompact.Wrap2M(ethdb2kvdb.Wrap(s.EvmDb), opt.GiB, 16*opt.GiB, true, "evm"))
 	g.RawEvmItems.ForEach(func(key, value []byte) bool {
+		err = db.Put(key, value)
 		if err != nil {
 			return false
 		}
-		err = batch.Put(key, value)
-		if err != nil {
-			return false
-		}
-		if batch.ValueSize() > kvdb.IdealBatchSize {
-			err = batch.Write()
-			if err != nil {
-				return false
-			}
-			batch.Reset()
-		}
 		return true
 	})
 	if err != nil {
 		return err
 	}
-	return batch.Write()
+	return db.Write()
 }
 
 func (s *Store) WrapTablesAsBatched() (unwrap func()) {
@@ -45,7 +36,7 @@ func (s *Store) WrapTablesAsBatched() (unwrap func()) {
 
 	unwrapLogs := s.EvmLogs.WrapTablesAsBatched()
 
-	batchedReceipts := batched.Wrap(s.table.Receipts)
+	batchedReceipts := batched.Wrap(autocompact.Wrap2M(s.table.Receipts, opt.GiB, 16*opt.GiB, false, "receipts"))
 	s.table.Receipts = batchedReceipts
 	return func() {
 		_ = batchedTxs.Flush()
diff --git a/integration/legacy_migrate.go b/integration/legacy_migrate.go
index 83b38c102..f38f2a27b 100644
--- a/integration/legacy_migrate.go
+++ b/integration/legacy_migrate.go
@@ -18,6 +18,7 @@ import (
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/syndtr/goleveldb/leveldb/opt"
 
+	"github.com/Fantom-foundation/go-opera/utils/dbutil/autocompact"
 	"github.com/Fantom-foundation/go-opera/utils/dbutil/compactdb"
 )
 
@@ -46,7 +47,7 @@ type transformTask struct {
 
 func transform(m transformTask) error {
 	openDst := func() *batched.Store {
-		return batched.Wrap(m.openDst())
+		return batched.Wrap(autocompact.Wrap2M(m.openDst(), opt.GiB, 16*opt.GiB, true, ""))
 	}
 	openSrc := func() *batched.Store {
 		return batched.Wrap(m.openSrc())
diff --git a/utils/dbutil/autocompact/store.go b/utils/dbutil/autocompact/store.go
new file mode 100644
index 000000000..85e4f76c1
--- /dev/null
+++ b/utils/dbutil/autocompact/store.go
@@ -0,0 +1,143 @@
+package autocompact
+
+import (
+	"sync"
+
+	"github.com/Fantom-foundation/lachesis-base/kvdb"
+	"github.com/ethereum/go-ethereum/log"
+	"github.com/status-im/keycard-go/hexutils"
+)
+
+// Store implements automatic compaction of recently inserted/erased data according to the provided strategy
+type Store struct {
+	kvdb.Store
+	limit   uint64
+	cont    ContainerI
+	newCont func() ContainerI
+	compMu  sync.Mutex
+	name    string
+}
+
+type Batch struct {
+	kvdb.Batch
+	store *Store
+	cont  ContainerI
+}
+
+func Wrap(s kvdb.Store, limit uint64, strategy func() ContainerI, name string) *Store {
+	return &Store{
+		Store:   s,
+		limit:   limit,
+		newCont: strategy,
+		cont:    strategy(),
+		name:    name,
+	}
+}
+
+func Wrap2(s kvdb.Store, limit1 uint64, limit2 uint64, strategy func() ContainerI, name string) *Store {
+	return Wrap(Wrap(s, limit1, strategy, name), limit2, strategy, name)
+}
+
+func Wrap2M(s kvdb.Store, limit1 uint64, limit2 uint64, forward bool, name string) *Store {
+	strategy := NewBackwardsCont
+	if forward {
+		strategy = NewForwardCont
+	}
+	return Wrap2(s, limit1, limit2, strategy, name)
+}
+
+func estSize(keyLen int, valLen int) uint64 {
+	// The storage overhead of adding/deleting a record isn't simply proportional
+	// to the length of the key and value.
+	// E.g. adding 10 records of length 2 is more expensive than adding 1 record of length 20.
+	// The constant 64 isn't precisely calculated; it's a rough estimate of the per-record overhead.
+	return uint64(keyLen + valLen + 64)
+}
+
+func (s *Store) onWrite(key []byte, size uint64) {
+	s.compMu.Lock()
+	defer s.compMu.Unlock()
+	if key != nil {
+		s.cont.Add(key, size)
+	}
+	s.mayCompact(false)
+}
+
+func (s *Store) onBatchWrite(batchCont ContainerI) {
+	s.compMu.Lock()
+	defer s.compMu.Unlock()
+	s.cont.Merge(batchCont)
+	s.mayCompact(false)
+}
+
+func (s *Store) compact() {
+	s.compMu.Lock()
+	defer s.compMu.Unlock()
+	s.mayCompact(true)
+}
+
+func (s *Store) mayCompact(force bool) {
+	// if range tracking has failed, fall back to the no-op container rather than interrupting writes
+	err := s.cont.Error()
+	if err != nil {
+		s.cont.Reset()
+		s.newCont = NewDevnullCont
+		s.cont = s.newCont()
+		log.Warn("Autocompaction failed, which may lead to performance issues", "name", s.name, "err", err)
+	}
+
+	if force || s.cont.Size() > s.limit {
+		for _, r := range s.cont.Ranges() {
+			log.Debug("Autocompact", "name", s.name, "from", hexutils.BytesToHex(r.minKey), "to", hexutils.BytesToHex(r.maxKey))
+			_ = s.Store.Compact(r.minKey, r.maxKey)
+		}
+		s.cont.Reset()
+	}
+}
+
+func (s *Store) Put(key []byte, value []byte) error {
+	defer s.onWrite(key, estSize(len(key), len(value)))
+	return s.Store.Put(key, value)
+}
+
+func (s *Store) Delete(key []byte) error {
+	defer s.onWrite(key, estSize(len(key), 0))
+	return s.Store.Delete(key)
+}
+
+func (s *Store) Close() error {
+	s.compact()
+	return s.Store.Close()
+}
+
+func (s *Store) NewBatch() kvdb.Batch {
+	batch := s.Store.NewBatch()
+	if batch == nil {
+		return nil
+	}
+	return &Batch{
+		Batch: batch,
+		store: s,
+		cont:  s.newCont(),
+	}
+}
+
+func (s *Batch) Put(key []byte, value []byte) error {
+	s.cont.Add(key, estSize(len(key), len(value)))
+	return s.Batch.Put(key, value)
+}
+
+func (s *Batch) Delete(key []byte) error {
+	s.cont.Add(key, estSize(len(key), 0))
+	return s.Batch.Delete(key)
+}
+
+func (s *Batch) Reset() {
+	s.cont.Reset()
+	s.Batch.Reset()
+}
+
+func (s *Batch) Write() error {
+	defer s.store.onBatchWrite(s.cont)
+	return s.Batch.Write()
+}
diff --git a/utils/dbutil/autocompact/strategy.go b/utils/dbutil/autocompact/strategy.go
new file mode 100644
index 000000000..1d6bc45f4
--- /dev/null
+++ b/utils/dbutil/autocompact/strategy.go
@@ -0,0 +1,137 @@
+package autocompact
+
+import (
+	"bytes"
+	"errors"
+
+	"github.com/ethereum/go-ethereum/common"
+)
+
+type ContainerI interface {
+	Add(key []byte, size uint64)
+	Merge(c ContainerI)
+	Error() error
+	Reset()
+	Size() uint64
+	Ranges() []Range
+}
+
+type Range struct {
+	minKey []byte
+	maxKey []byte
+}
+
+// MonotonicContainer implements tracking of compaction ranges for cases where keys are inserted as a series of monotonic ranges
+type MonotonicContainer struct {
+	forward bool
+	ranges  []Range
+	size    uint64
+	err     error
+}
+
+type DevnullContainer struct{}
+
+func (d DevnullContainer) Add(key []byte, size uint64) {}
+
+func (d DevnullContainer) Merge(c ContainerI) {}
+
+func (d DevnullContainer) Error() error {
+	return nil
+}
+
+func (d DevnullContainer) Reset() {
+
+}
+
+func (d DevnullContainer) Size() uint64 {
+	return 0
+}
+
+func (d DevnullContainer) Ranges() []Range {
+	return []Range{}
+}
+
+func NewForwardCont() ContainerI {
+	return &MonotonicContainer{
+		forward: true,
+	}
+}
+
+func NewBackwardsCont() ContainerI {
+	return &MonotonicContainer{
+		forward: false,
+	}
+}
+
+func NewDevnullCont() ContainerI {
+	return DevnullContainer{}
+}
+
+func (m *MonotonicContainer) addRange(key []byte) {
+	m.ranges = append(m.ranges, Range{
+		minKey: common.CopyBytes(key),
+		maxKey: common.CopyBytes(key),
+	})
+}
+
+func (m *MonotonicContainer) Add(key []byte, size uint64) {
+	m.size += size
+	if len(m.ranges) == 0 {
+		m.addRange(key)
+	}
+	// extend the last range if it's a monotonic addition, or start a new range otherwise
+	l := len(m.ranges) - 1
+	if m.forward {
+		if bytes.Compare(key, m.ranges[l].maxKey) >= 0 {
+			m.ranges[l].maxKey = common.CopyBytes(key)
+		} else {
+			m.addRange(key)
+		}
+	} else {
+		if bytes.Compare(key, m.ranges[l].minKey) <= 0 {
+			m.ranges[l].minKey = common.CopyBytes(key)
+		} else {
+			m.addRange(key)
+		}
+	}
+}
+
+func (m *MonotonicContainer) Merge(c ContainerI) {
+	if err := c.Error(); err != nil {
+		m.err = err
+	}
+
+	for _, r := range c.Ranges() {
+		if m.forward {
+			m.Add(r.minKey, 0)
+			m.Add(r.maxKey, 0)
+		} else {
+			m.Add(r.maxKey, 0)
+			m.Add(r.minKey, 0)
+		}
+	}
+	m.size += c.Size()
+}
+
+func (m *MonotonicContainer) Error() error {
+	if m.err != nil {
+		return m.err
+	}
+	if len(m.ranges) > 2 {
+		return errors.New("too many compaction ranges, it's likely that the dataset isn't monotonic enough")
+	}
+	return nil
+}
+
+func (m *MonotonicContainer) Reset() {
+	m.ranges = nil
+	m.size = 0
+}
+
+func (m *MonotonicContainer) Size() uint64 {
+	return m.size
+}
+
+func (m *MonotonicContainer) Ranges() []Range {
+	return m.ranges
+}
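
For context, here is a minimal sketch of how the new wrapper is meant to be used, mirroring the call sites changed above (`batched.Wrap(autocompact.Wrap2M(...))` with a 1 GiB inner and 16 GiB outer limit). The `copyTable` helper, the `"example"` label and the iteration loop are hypothetical illustration only, not part of this change.

```go
package example

import (
	"github.com/Fantom-foundation/lachesis-base/kvdb"
	"github.com/Fantom-foundation/lachesis-base/kvdb/batched"
	"github.com/syndtr/goleveldb/leveldb/opt"

	"github.com/Fantom-foundation/go-opera/utils/dbutil/autocompact"
)

// copyTable copies all records from src into dst using the wrapping pattern
// from this diff: autocompact tracks how much freshly written data has
// accumulated and compacts the affected key range once the limit is reached,
// while batched groups the individual Puts into large write batches.
func copyTable(src kvdb.Store, dst kvdb.Store) error {
	wrapped := batched.Wrap(autocompact.Wrap2M(dst, opt.GiB, 16*opt.GiB, true, "example"))
	defer wrapped.Close()

	it := src.NewIterator(nil, nil)
	defer it.Release()
	for it.Next() {
		// keys arrive in ascending order, so the forward-monotonic strategy
		// keeps a single compaction range per flush
		if err := wrapped.Put(it.Key(), it.Value()); err != nil {
			return err
		}
	}
	if err := it.Error(); err != nil {
		return err
	}
	return wrapped.Write() // flush the remaining batch
}
```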
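And a small sketch of how the `MonotonicContainer` strategy behaves, written as a hypothetical test inside the `autocompact` package (the test name and key values are made up): ascending keys extend one compaction range, a back-jump opens a second range, and once more than two ranges accumulate `Error()` reports the dataset as not monotonic enough, which `Store.mayCompact` handles by switching to the no-op `DevnullContainer`.

```go
package autocompact

import "testing"

// TestForwardContainerSketch is an illustrative sketch (not part of the diff)
// of the forward-monotonic range tracking implemented in strategy.go.
func TestForwardContainerSketch(t *testing.T) {
	c := NewForwardCont()

	// ascending keys extend a single compaction range
	c.Add([]byte{0x01}, 10)
	c.Add([]byte{0x02}, 10)
	c.Add([]byte{0x05}, 10)
	if got := len(c.Ranges()); got != 1 {
		t.Fatalf("expected 1 range, got %d", got)
	}

	// a key below the current maximum starts a second range
	c.Add([]byte{0x03}, 10)
	if got := len(c.Ranges()); got != 2 {
		t.Fatalf("expected 2 ranges, got %d", got)
	}

	// a further back-jump yields a third range, which Error() rejects
	c.Add([]byte{0x02}, 10)
	if c.Error() == nil {
		t.Fatalf("expected an error once more than 2 ranges accumulate")
	}
}
```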