diff --git a/core/block.go b/core/block.go index dbe3cdb26..96f8a9286 100644 --- a/core/block.go +++ b/core/block.go @@ -571,7 +571,7 @@ func (block *Block) CollectTransactions(deadlineInMs int64) { executedAt := time.Now().UnixNano() execute += executedAt - executeAt if err != nil { - logging.VLog().WithFields(logrus.Fields{ + logging.CLog().WithFields(logrus.Fields{ "tx": tx, "err": err, "giveback": giveback, @@ -842,7 +842,7 @@ func (block *Block) triggerEvent() { } block.eventEmitter.Trigger(event) - events, err := block.FetchEvents(v.hash) + events, err := block.FetchCacheEventsOfCurBlock(v.hash) if err != nil { for _, e := range events { block.eventEmitter.Trigger(e) @@ -1224,17 +1224,16 @@ func (block *Block) GetNonce(address byteutils.Hash) (uint64, error) { return account.Nonce(), nil } -// RecordEvent record event's topic and data with txHash -func (block *Block) RecordEvent(txHash byteutils.Hash, topic, data string) error { - event := &state.Event{Topic: topic, Data: data} - return block.WorldState().RecordEvent(txHash, event) -} - // FetchEvents fetch events by txHash. func (block *Block) FetchEvents(txHash byteutils.Hash) ([]*state.Event, error) { return block.WorldState().FetchEvents(txHash) } +// FetchCacheEventsOfCurBlock fetch events by txHash. +func (block *Block) FetchCacheEventsOfCurBlock(txHash byteutils.Hash) ([]*state.Event, error) { + return block.WorldState().FetchCacheEventsOfCurBlock(txHash) +} + func (block *Block) rewardCoinbaseForMint() error { coinbaseAddr := block.Coinbase().Bytes() coinbaseAcc, err := block.WorldState().GetOrCreateUserAccount(coinbaseAddr) diff --git a/core/block_test.go b/core/block_test.go index 732038919..d7fa56370 100644 --- a/core/block_test.go +++ b/core/block_test.go @@ -488,12 +488,43 @@ func TestBlock_fetchEvents(t *testing.T) { &state.Event{Topic: "chain.block", Data: "hello"}, &state.Event{Topic: "chain.block", Data: "hello"}, } + err := tail.worldState.Begin() + assert.Nil(t, err) tx := &Transaction{hash: []byte("tx")} + txWorldState, err := tail.worldState.Prepare(tx) + assert.Nil(t, err) for _, event := range events { - assert.Nil(t, tail.worldState.RecordEvent(tx.Hash(), event)) + assert.Nil(t, txWorldState.RecordEvent(tx.Hash(), event)) } + _, err = tail.worldState.CheckAndUpdate(tx) + assert.Nil(t, err) + es, err := tail.FetchEvents(tx.Hash()) assert.Nil(t, err) + assert.Equal(t, len(events), len(es)) + for idx, event := range es { + assert.Equal(t, events[idx], event) + } +} + +func TestBlock_fetchCacheEventsOfCurBlock(t *testing.T) { + neb := testNeb(t) + bc := neb.chain + + tail := bc.tailBlock + events := []*state.Event{ + &state.Event{Topic: "chain.block", Data: "hello"}, + &state.Event{Topic: "chain.tx", Data: "hello"}, + &state.Event{Topic: "chain.block", Data: "hello"}, + &state.Event{Topic: "chain.block", Data: "hello"}, + } + tx := &Transaction{hash: []byte("tx")} + for _, event := range events { + assert.Nil(t, tail.worldState.RecordEvent(tx.Hash(), event)) + } + es, err := tail.FetchCacheEventsOfCurBlock(tx.Hash()) + assert.Nil(t, err) + assert.Equal(t, len(events), len(es)) for idx, event := range es { assert.Equal(t, events[idx], event) } @@ -555,18 +586,6 @@ func TestGivebackInvalidTx(t *testing.T) { assert.Equal(t, len(bc.txPool.all), 1) } -func TestRecordEvent(t *testing.T) { - neb := testNeb(t) - bc := neb.chain - txHash := []byte("hello") - assert.Nil(t, bc.tailBlock.RecordEvent(txHash, TopicSendTransaction, "world")) - events, err := bc.tailBlock.FetchEvents(txHash) - assert.Nil(t, err) - assert.Equal(t, len(events), 1) - assert.Equal(t, events[0].Topic, TopicSendTransaction) - assert.Equal(t, events[0].Data, "world") -} - func TestBlockVerifyIntegrity(t *testing.T) { neb := testNeb(t) bc := neb.chain diff --git a/core/state/types.go b/core/state/types.go index af846de69..5a56757e9 100644 --- a/core/state/types.go +++ b/core/state/types.go @@ -148,6 +148,7 @@ type WorldState interface { RecordEvent(txHash byteutils.Hash, event *Event) error FetchEvents(byteutils.Hash) ([]*Event, error) + FetchCacheEventsOfCurBlock(byteutils.Hash) ([]*Event, error) Dynasty() ([]byteutils.Hash, error) DynastyRoot() byteutils.Hash diff --git a/core/state/world_state.go b/core/state/world_state.go index b67e36342..705a9d766 100644 --- a/core/state/world_state.go +++ b/core/state/world_state.go @@ -66,6 +66,8 @@ type states struct { txid interface{} gasConsumed map[string]*util.Uint128 + + events map[string][]*Event } func newStates(consensus Consensus, stor storage.Storage) (*states, error) { @@ -104,6 +106,7 @@ func newStates(consensus Consensus, stor storage.Storage) (*states, error) { txid: nil, gasConsumed: make(map[string]*util.Uint128), + events: make(map[string][]*Event), }, nil } @@ -113,10 +116,7 @@ func (s *states) Replay(done *states) error { if err != nil { return err } - _, err = s.txsState.Replay(done.txsState) - if err != nil { - return err - } + _, err = s.eventsState.Replay(done.eventsState) if err != nil { return err @@ -139,6 +139,40 @@ func (s *states) Replay(done *states) error { } } + //reply event + err = s.ReplayEvent(done) + if err != nil { + return err + } + + return nil +} + +func (s *states) ReplayEvent(done *states) error { + + //replay event + for tx, events := range done.events { + txHash, err := byteutils.FromHex(tx) + if err != nil { + return err + } + for idx, event := range events { + cnt := int64(idx + 1) + + key := append(txHash, byteutils.FromInt64(cnt)...) + bytes, err := json.Marshal(event) + if err != nil { + return err + } + + _, err = s.eventsState.Put(key, bytes) + if err != nil { + return err + } + } + s.events[tx] = done.events[tx] + } + return nil } @@ -190,6 +224,7 @@ func (s *states) Clone() (WorldState, error) { txid: s.txid, gasConsumed: make(map[string]*util.Uint128), + events: make(map[string][]*Event), }, nil } @@ -277,6 +312,7 @@ func (s *states) Prepare(txid interface{}) (TxWorldState, error) { txid: txid, gasConsumed: make(map[string]*util.Uint128), + events: make(map[string][]*Event, 0), }, nil } @@ -396,34 +432,22 @@ func (s *states) PutTx(txHash byteutils.Hash, txBytes []byte) error { } func (s *states) RecordEvent(txHash byteutils.Hash, event *Event) error { - iter, err := s.eventsState.Iterator(txHash) - if err != nil && err != storage.ErrKeyNotFound { - return err - } - cnt := int64(0) - if err != storage.ErrKeyNotFound { - exist, err := iter.Next() - if err != nil { - return err - } - for exist { - cnt++ - exist, err = iter.Next() - if err != nil { - return err - } - } + + events, ok := s.events[txHash.String()] + if !ok { + events = make([]*Event, 0) } - cnt++ + + cnt := int64(len(s.events) + 1) + key := append(txHash, byteutils.FromInt64(cnt)...) bytes, err := json.Marshal(event) if err != nil { return err } - _, err = s.eventsState.Put(key, bytes) - if err != nil { - return err - } + + s.events[txHash.String()] = append(events, event) + // record change log if err := s.changelog.Put(key, bytes); err != nil { return err @@ -432,6 +456,20 @@ func (s *states) RecordEvent(txHash byteutils.Hash, event *Event) error { return nil } +func (s *states) FetchCacheEventsOfCurBlock(txHash byteutils.Hash) ([]*Event, error) { + txevents, ok := s.events[txHash.String()] + if !ok { + return nil, nil + } + + events := []*Event{} + for _, event := range txevents { + events = append(events, event) + } + + return events, nil +} + func (s *states) FetchEvents(txHash byteutils.Hash) ([]*Event, error) { events := []*Event{} iter, err := s.eventsState.Iterator(txHash) diff --git a/nf/nvm/context.go b/nf/nvm/context.go index 8f132ac90..7b53f759c 100644 --- a/nf/nvm/context.go +++ b/nf/nvm/context.go @@ -42,12 +42,12 @@ type Block interface { Height() uint64 VerifyAddress(str string) bool SerializeTxByHash(hash byteutils.Hash) (proto.Message, error) - RecordEvent(txHash byteutils.Hash, topic, data string) error } // TxWorldState interface of world state type TxWorldState interface { GetOrCreateUserAccount(addr byteutils.Hash) (state.Account, error) + RecordEvent(txHash byteutils.Hash, event *state.Event) error } // AccountState context account state diff --git a/nf/nvm/event.go b/nf/nvm/event.go index 3bd84809e..09be4c442 100644 --- a/nf/nvm/event.go +++ b/nf/nvm/event.go @@ -22,6 +22,7 @@ import "C" import ( "unsafe" + "github.com/nebulasio/go-nebulas/core/state" "github.com/nebulasio/go-nebulas/util/byteutils" "github.com/nebulasio/go-nebulas/util/logging" "github.com/sirupsen/logrus" @@ -57,5 +58,7 @@ func EventTriggerFunc(handler unsafe.Pointer, topic, data *C.char) { txHash, _ := byteutils.FromHex(e.ctx.tx.Hash) contractTopic := EventNameSpaceContract + "." + gTopic - e.ctx.block.RecordEvent(txHash, contractTopic, gData) + + event := &state.Event{Topic: contractTopic, Data: gData} + e.ctx.txWorldState.RecordEvent(txHash, event) } diff --git a/storage/disk_storage.go b/storage/disk_storage.go index 4e7e6a3f6..fbe0e9628 100644 --- a/storage/disk_storage.go +++ b/storage/disk_storage.go @@ -50,9 +50,11 @@ func NewDiskStorage(path string) (*DiskStorage, error) { BlockSize: 4 * opt.MiB, Filter: filter.NewBloomFilter(10), }) + if err != nil { return nil, err } + return &DiskStorage{ db: db, enableBatch: false,