Skip to content

Commit

Permalink
core:worldstate.go update record event to array
Browse files Browse the repository at this point in the history
  • Loading branch information
fengzi committed Mar 15, 2018
1 parent 361f881 commit d5617a7
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 49 deletions.
15 changes: 7 additions & 8 deletions core/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ func (block *Block) CollectTransactions(deadlineInMs int64) {
executedAt := time.Now().UnixNano()
execute += executedAt - executeAt
if err != nil {
logging.VLog().WithFields(logrus.Fields{
logging.CLog().WithFields(logrus.Fields{
"tx": tx,
"err": err,
"giveback": giveback,
Expand Down Expand Up @@ -842,7 +842,7 @@ func (block *Block) triggerEvent() {
}
block.eventEmitter.Trigger(event)

events, err := block.FetchEvents(v.hash)
events, err := block.FetchCacheEventsOfCurBlock(v.hash)
if err != nil {
for _, e := range events {
block.eventEmitter.Trigger(e)
Expand Down Expand Up @@ -1224,17 +1224,16 @@ func (block *Block) GetNonce(address byteutils.Hash) (uint64, error) {
return account.Nonce(), nil
}

// RecordEvent record event's topic and data with txHash
func (block *Block) RecordEvent(txHash byteutils.Hash, topic, data string) error {
event := &state.Event{Topic: topic, Data: data}
return block.WorldState().RecordEvent(txHash, event)
}

// FetchEvents fetch events by txHash.
func (block *Block) FetchEvents(txHash byteutils.Hash) ([]*state.Event, error) {
return block.WorldState().FetchEvents(txHash)
}

// FetchCacheEventsOfCurBlock fetch events by txHash.
func (block *Block) FetchCacheEventsOfCurBlock(txHash byteutils.Hash) ([]*state.Event, error) {
return block.WorldState().FetchCacheEventsOfCurBlock(txHash)
}

func (block *Block) rewardCoinbaseForMint() error {
coinbaseAddr := block.Coinbase().Bytes()
coinbaseAcc, err := block.WorldState().GetOrCreateUserAccount(coinbaseAddr)
Expand Down
45 changes: 32 additions & 13 deletions core/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,12 +488,43 @@ func TestBlock_fetchEvents(t *testing.T) {
&state.Event{Topic: "chain.block", Data: "hello"},
&state.Event{Topic: "chain.block", Data: "hello"},
}
err := tail.worldState.Begin()
assert.Nil(t, err)
tx := &Transaction{hash: []byte("tx")}
txWorldState, err := tail.worldState.Prepare(tx)
assert.Nil(t, err)
for _, event := range events {
assert.Nil(t, tail.worldState.RecordEvent(tx.Hash(), event))
assert.Nil(t, txWorldState.RecordEvent(tx.Hash(), event))
}
_, err = tail.worldState.CheckAndUpdate(tx)
assert.Nil(t, err)

es, err := tail.FetchEvents(tx.Hash())
assert.Nil(t, err)
assert.Equal(t, len(events), len(es))
for idx, event := range es {
assert.Equal(t, events[idx], event)
}
}

func TestBlock_fetchCacheEventsOfCurBlock(t *testing.T) {
neb := testNeb(t)
bc := neb.chain

tail := bc.tailBlock
events := []*state.Event{
&state.Event{Topic: "chain.block", Data: "hello"},
&state.Event{Topic: "chain.tx", Data: "hello"},
&state.Event{Topic: "chain.block", Data: "hello"},
&state.Event{Topic: "chain.block", Data: "hello"},
}
tx := &Transaction{hash: []byte("tx")}
for _, event := range events {
assert.Nil(t, tail.worldState.RecordEvent(tx.Hash(), event))
}
es, err := tail.FetchCacheEventsOfCurBlock(tx.Hash())
assert.Nil(t, err)
assert.Equal(t, len(events), len(es))
for idx, event := range es {
assert.Equal(t, events[idx], event)
}
Expand Down Expand Up @@ -555,18 +586,6 @@ func TestGivebackInvalidTx(t *testing.T) {
assert.Equal(t, len(bc.txPool.all), 1)
}

func TestRecordEvent(t *testing.T) {
neb := testNeb(t)
bc := neb.chain
txHash := []byte("hello")
assert.Nil(t, bc.tailBlock.RecordEvent(txHash, TopicSendTransaction, "world"))
events, err := bc.tailBlock.FetchEvents(txHash)
assert.Nil(t, err)
assert.Equal(t, len(events), 1)
assert.Equal(t, events[0].Topic, TopicSendTransaction)
assert.Equal(t, events[0].Data, "world")
}

func TestBlockVerifyIntegrity(t *testing.T) {
neb := testNeb(t)
bc := neb.chain
Expand Down
1 change: 1 addition & 0 deletions core/state/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ type WorldState interface {

RecordEvent(txHash byteutils.Hash, event *Event) error
FetchEvents(byteutils.Hash) ([]*Event, error)
FetchCacheEventsOfCurBlock(byteutils.Hash) ([]*Event, error)

Dynasty() ([]byteutils.Hash, error)
DynastyRoot() byteutils.Hash
Expand Down
90 changes: 64 additions & 26 deletions core/state/world_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ type states struct {
txid interface{}

gasConsumed map[string]*util.Uint128

events map[string][]*Event
}

func newStates(consensus Consensus, stor storage.Storage) (*states, error) {
Expand Down Expand Up @@ -104,6 +106,7 @@ func newStates(consensus Consensus, stor storage.Storage) (*states, error) {
txid: nil,

gasConsumed: make(map[string]*util.Uint128),
events: make(map[string][]*Event),
}, nil
}

Expand All @@ -113,10 +116,7 @@ func (s *states) Replay(done *states) error {
if err != nil {
return err
}
_, err = s.txsState.Replay(done.txsState)
if err != nil {
return err
}

_, err = s.eventsState.Replay(done.eventsState)
if err != nil {
return err
Expand All @@ -139,6 +139,40 @@ func (s *states) Replay(done *states) error {
}
}

//reply event
err = s.ReplayEvent(done)
if err != nil {
return err
}

return nil
}

func (s *states) ReplayEvent(done *states) error {

//replay event
for tx, events := range done.events {
txHash, err := byteutils.FromHex(tx)
if err != nil {
return err
}
for idx, event := range events {
cnt := int64(idx + 1)

key := append(txHash, byteutils.FromInt64(cnt)...)
bytes, err := json.Marshal(event)
if err != nil {
return err
}

_, err = s.eventsState.Put(key, bytes)
if err != nil {
return err
}
}
s.events[tx] = done.events[tx]
}

return nil
}

Expand Down Expand Up @@ -190,6 +224,7 @@ func (s *states) Clone() (WorldState, error) {
txid: s.txid,

gasConsumed: make(map[string]*util.Uint128),
events: make(map[string][]*Event),
}, nil
}

Expand Down Expand Up @@ -277,6 +312,7 @@ func (s *states) Prepare(txid interface{}) (TxWorldState, error) {
txid: txid,

gasConsumed: make(map[string]*util.Uint128),
events: make(map[string][]*Event, 0),
}, nil
}

Expand Down Expand Up @@ -396,34 +432,22 @@ func (s *states) PutTx(txHash byteutils.Hash, txBytes []byte) error {
}

func (s *states) RecordEvent(txHash byteutils.Hash, event *Event) error {
iter, err := s.eventsState.Iterator(txHash)
if err != nil && err != storage.ErrKeyNotFound {
return err
}
cnt := int64(0)
if err != storage.ErrKeyNotFound {
exist, err := iter.Next()
if err != nil {
return err
}
for exist {
cnt++
exist, err = iter.Next()
if err != nil {
return err
}
}

events, ok := s.events[txHash.String()]
if !ok {
events = make([]*Event, 0)
}
cnt++

cnt := int64(len(s.events) + 1)

key := append(txHash, byteutils.FromInt64(cnt)...)
bytes, err := json.Marshal(event)
if err != nil {
return err
}
_, err = s.eventsState.Put(key, bytes)
if err != nil {
return err
}

s.events[txHash.String()] = append(events, event)

// record change log
if err := s.changelog.Put(key, bytes); err != nil {
return err
Expand All @@ -432,6 +456,20 @@ func (s *states) RecordEvent(txHash byteutils.Hash, event *Event) error {
return nil
}

func (s *states) FetchCacheEventsOfCurBlock(txHash byteutils.Hash) ([]*Event, error) {
txevents, ok := s.events[txHash.String()]
if !ok {
return nil, nil
}

events := []*Event{}
for _, event := range txevents {
events = append(events, event)
}

return events, nil
}

func (s *states) FetchEvents(txHash byteutils.Hash) ([]*Event, error) {
events := []*Event{}
iter, err := s.eventsState.Iterator(txHash)
Expand Down
2 changes: 1 addition & 1 deletion nf/nvm/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ type Block interface {
Height() uint64
VerifyAddress(str string) bool
SerializeTxByHash(hash byteutils.Hash) (proto.Message, error)
RecordEvent(txHash byteutils.Hash, topic, data string) error
}

// TxWorldState interface of world state
type TxWorldState interface {
GetOrCreateUserAccount(addr byteutils.Hash) (state.Account, error)
RecordEvent(txHash byteutils.Hash, event *state.Event) error
}

// AccountState context account state
Expand Down
5 changes: 4 additions & 1 deletion nf/nvm/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import "C"
import (
"unsafe"

"github.com/nebulasio/go-nebulas/core/state"
"github.com/nebulasio/go-nebulas/util/byteutils"
"github.com/nebulasio/go-nebulas/util/logging"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -57,5 +58,7 @@ func EventTriggerFunc(handler unsafe.Pointer, topic, data *C.char) {

txHash, _ := byteutils.FromHex(e.ctx.tx.Hash)
contractTopic := EventNameSpaceContract + "." + gTopic
e.ctx.block.RecordEvent(txHash, contractTopic, gData)

event := &state.Event{Topic: contractTopic, Data: gData}
e.ctx.txWorldState.RecordEvent(txHash, event)
}
2 changes: 2 additions & 0 deletions storage/disk_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@ func NewDiskStorage(path string) (*DiskStorage, error) {
BlockSize: 4 * opt.MiB,
Filter: filter.NewBloomFilter(10),
})

if err != nil {
return nil, err
}

return &DiskStorage{
db: db,
enableBatch: false,
Expand Down

0 comments on commit d5617a7

Please sign in to comment.