Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add Silkworm poller for Antelope EVM #108

Merged
merged 2 commits into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 123 additions & 0 deletions blockfetcher/rpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package blockfetcher

import (
"context"
"fmt"
"sync"
"time"

"github.com/abourget/llerrgroup"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"github.com/streamingfast/eth-go/rpc"
pbeth "github.com/streamingfast/firehose-ethereum/types/pb/sf/ethereum/type/v2"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/timestamppb"
)

type ToEthBlock func(in *rpc.Block, receipts map[string]*rpc.TransactionReceipt) (*pbeth.Block, map[string]bool)

type BlockFetcher struct {
rpcClient *rpc.Client
latest uint64
latestBlockRetryInterval time.Duration
fetchInterval time.Duration
toEthBlock ToEthBlock
lastFetchAt time.Time
logger *zap.Logger
}

func NewBlockFetcher(rpcClient *rpc.Client, intervalBetweenFetch, latestBlockRetryInterval time.Duration, toEthBlock ToEthBlock, logger *zap.Logger) *BlockFetcher {
return &BlockFetcher{
rpcClient: rpcClient,
latestBlockRetryInterval: latestBlockRetryInterval,
toEthBlock: toEthBlock,
fetchInterval: intervalBetweenFetch,
logger: logger,
}
}

func (f *BlockFetcher) Fetch(ctx context.Context, blockNum uint64) (block *pbbstream.Block, err error) {
f.logger.Debug("fetching block", zap.Uint64("block_num", blockNum))
for f.latest < blockNum {
f.latest, err = f.rpcClient.LatestBlockNum(ctx)
if err != nil {
return nil, fmt.Errorf("fetching latest block num: %w", err)
}

f.logger.Info("got latest block", zap.Uint64("latest", f.latest), zap.Uint64("block_num", blockNum))

if f.latest < blockNum {
time.Sleep(f.latestBlockRetryInterval)
continue
}
break
}

sinceLastFetch := time.Since(f.lastFetchAt)
if sinceLastFetch < f.fetchInterval {
time.Sleep(f.fetchInterval - sinceLastFetch)
}

rpcBlock, err := f.rpcClient.GetBlockByNumber(ctx, rpc.BlockNumber(blockNum), rpc.WithGetBlockFullTransaction())
if err != nil {
return nil, fmt.Errorf("fetching block %d: %w", blockNum, err)
}

receipts, err := FetchReceipts(ctx, rpcBlock, f.rpcClient)
if err != nil {
return nil, fmt.Errorf("fetching receipts for block %d %q: %w", rpcBlock.Number, rpcBlock.Hash.Pretty(), err)
}

f.logger.Debug("fetched receipts", zap.Int("count", len(receipts)))

f.lastFetchAt = time.Now()

if err != nil {
return nil, fmt.Errorf("fetching logs for block %d %q: %w", rpcBlock.Number, rpcBlock.Hash.Pretty(), err)
}

ethBlock, _ := f.toEthBlock(rpcBlock, receipts)
anyBlock, err := anypb.New(ethBlock)
if err != nil {
return nil, fmt.Errorf("create any block: %w", err)
}

return &pbbstream.Block{
Number: ethBlock.Number,
Id: ethBlock.GetFirehoseBlockID(),
ParentId: ethBlock.GetFirehoseBlockParentID(),
Timestamp: timestamppb.New(ethBlock.GetFirehoseBlockTime()),
LibNum: ethBlock.LIBNum(),
ParentNum: ethBlock.GetFirehoseBlockParentNumber(),
Payload: anyBlock,
}, nil
}

func FetchReceipts(ctx context.Context, block *rpc.Block, client *rpc.Client) (out map[string]*rpc.TransactionReceipt, err error) {
out = make(map[string]*rpc.TransactionReceipt)
lock := sync.Mutex{}

eg := llerrgroup.New(10)
for _, tx := range block.Transactions.Transactions {
if eg.Stop() {
continue // short-circuit the loop if we got an error
}
eg.Go(func() error {
receipt, err := client.TransactionReceipt(ctx, tx.Hash)
if err != nil {
return fmt.Errorf("fetching receipt for tx %q: %w", tx.Hash.Pretty(), err)
}
lock.Lock()
out[tx.Hash.Pretty()] = receipt
lock.Unlock()
return err
})
}

if err := eg.Wait(); err != nil {
return nil, err
}

return
}
29 changes: 29 additions & 0 deletions blockfetcher/silkworm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package blockfetcher

import (
"context"
"time"

"go.uber.org/zap"

pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"github.com/streamingfast/eth-go/rpc"
"github.com/streamingfast/firehose-ethereum/block"
)

type SilkwormBlockFetcher struct {
fetcher *BlockFetcher
}

func NewSilkwormBlockFetcher(rpcClient *rpc.Client, intervalBetweenFetch time.Duration, latestBlockRetryInterval time.Duration, logger *zap.Logger) *SilkwormBlockFetcher {
fetcher := NewBlockFetcher(rpcClient, intervalBetweenFetch, latestBlockRetryInterval, block.RpcToEthBlock, logger)
return &SilkwormBlockFetcher{
fetcher: fetcher,
}
}

func (f *SilkwormBlockFetcher) PollingInterval() time.Duration { return 1 * time.Second }

func (f *SilkwormBlockFetcher) Fetch(ctx context.Context, blockNum uint64) (*pbbstream.Block, error) {
return f.fetcher.Fetch(ctx, blockNum)
}
4 changes: 3 additions & 1 deletion cmd/fireantelope/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,11 @@ func Chain() *firecore.Chain[*pbantelope.Block] {

Tools: &firecore.ToolsConfig[*pbantelope.Block]{

RegisterExtraCmd: func(chain *firecore.Chain[*pbantelope.Block], toolsCmd *cobra.Command, zlog *zap.Logger, tracer logging.Tracer) error {
RegisterExtraCmd: func(chain *firecore.Chain[*pbantelope.Block], parent *cobra.Command, zlog *zap.Logger, tracer logging.Tracer) error {
//toolsCmd.AddCommand(newToolsGenerateNodeKeyCmd(chain))
//toolsCmd.AddCommand(newToolsBackfillCmd(zlog))
parent.AddCommand(newPollerCmd(zlog, tracer))
parent.AddCommand(newSilkwormPollerCmd(zlog, tracer))

return nil
},
Expand Down
79 changes: 79 additions & 0 deletions cmd/fireantelope/poller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package main

import (
"fmt"
"github.com/pinax-network/firehose-antelope/blockfetcher"
"path"
"strconv"
"time"

"github.com/spf13/cobra"
"github.com/streamingfast/bstream"
"github.com/streamingfast/cli/sflags"
"github.com/streamingfast/eth-go/rpc"
firecore "github.com/streamingfast/firehose-core"
"github.com/streamingfast/firehose-core/blockpoller"
"github.com/streamingfast/logging"
"go.uber.org/zap"
)

func newPollerCmd(logger *zap.Logger, tracer logging.Tracer) *cobra.Command {
cmd := &cobra.Command{
Use: "poller",
Short: "poll blocks from different sources",
}

cmd.AddCommand(newSilkwormPollerCmd(logger, tracer))
return cmd
}

func newSilkwormPollerCmd(logger *zap.Logger, tracer logging.Tracer) *cobra.Command {
cmd := &cobra.Command{
Use: "silkworm <rpc-endpoint> <first-streamable-block>",
Short: "poll blocks from silkworm rpc",
Args: cobra.ExactArgs(2),
RunE: pollerRunE(logger, tracer),
}
cmd.Flags().Duration("interval-between-fetch", 0, "interval between fetch")

return cmd
}

func pollerRunE(logger *zap.Logger, tracer logging.Tracer) firecore.CommandExecutor {
return func(cmd *cobra.Command, args []string) (err error) {
ctx := cmd.Context()

rpcEndpoint := args[0]

dataDir := sflags.MustGetString(cmd, "data-dir")
stateDir := path.Join(dataDir, "poller-state")

logger.Info("launching firehose-antelope poller", zap.String("rpc_endpoint", rpcEndpoint), zap.String("data_dir", dataDir), zap.String("state_dir", stateDir))

rpcClient := rpc.NewClient(rpcEndpoint)

firstStreamableBlock, err := strconv.ParseUint(args[1], 10, 64)
if err != nil {
return fmt.Errorf("unable to parse first streamable block %d: %w", firstStreamableBlock, err)
}

fetchInterval := sflags.MustGetDuration(cmd, "interval-between-fetch")

fetcher := blockfetcher.NewSilkwormBlockFetcher(rpcClient, fetchInterval, 1*time.Second, logger)
handler := blockpoller.NewFireBlockHandler("type.googleapis.com/sf.ethereum.type.v2.Block")
poller := blockpoller.New(fetcher, handler, blockpoller.WithStoringState(stateDir), blockpoller.WithLogger(logger))

// there is currently no support for rpc.FinalizedBlock on eos evm, so we use the latest one
latestBlock, err := rpcClient.GetBlockByNumber(ctx, rpc.LatestBlock)
if err != nil {
return fmt.Errorf("getting latest block: %w", err)
}

err = poller.Run(ctx, firstStreamableBlock, bstream.NewBlockRef(latestBlock.Hash.String(), uint64(latestBlock.Number)))
if err != nil {
return fmt.Errorf("running poller: %w", err)
}

return nil
}
}
8 changes: 6 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,19 @@ go 1.21
toolchain go1.21.5

require (
github.com/abourget/llerrgroup v0.2.0
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883
github.com/eoscanada/eos-go v0.10.3-0.20231109144819-59afdfa3a37d
github.com/lytics/ordpool v0.0.0-20130426221837-8d833f097fe7
github.com/mitchellh/go-testing-interface v1.14.1
github.com/spf13/cobra v1.7.0
github.com/spf13/pflag v1.0.5
github.com/streamingfast/bstream v0.0.2-0.20231211192436-01f6a005b0e4
github.com/streamingfast/cli v0.0.4-0.20230825151644-8cc84512cd80
github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545
github.com/streamingfast/eth-go v0.0.0-20231204174036-34e2cb6d64af
github.com/streamingfast/firehose-core v1.0.0
github.com/streamingfast/firehose-ethereum v1.4.23-0.20231213134745-920366d3b7aa
github.com/streamingfast/jsonpb v0.0.0-20210811021341-3670f0aa02d0
github.com/streamingfast/logging v0.0.0-20230608130331-f22c91403091
github.com/stretchr/testify v1.8.4
Expand Down Expand Up @@ -45,14 +49,14 @@ require (
github.com/RoaringBitmap/roaring v0.9.4 // indirect
github.com/ShinyTrinkets/meta-logger v0.2.0 // indirect
github.com/ShinyTrinkets/overseer v0.3.0 // indirect
github.com/abourget/llerrgroup v0.2.0 // indirect
github.com/aws/aws-sdk-go v1.44.325 // indirect
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.3.1 // indirect
github.com/blendle/zapdriver v1.3.2-0.20200203083823-9200777f8a3d // indirect
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
github.com/bobg/go-generics/v2 v2.1.1 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect
github.com/bufbuild/connect-go v1.10.0 // indirect
github.com/bufbuild/connect-grpchealth-go v1.1.1 // indirect
github.com/bufbuild/connect-grpcreflect-go v1.0.0 // indirect
Expand Down Expand Up @@ -97,6 +101,7 @@ require (
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/holiman/uint256 v1.2.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/ipfs/boxo v0.8.0 // indirect
github.com/ipfs/go-cid v0.4.0 // indirect
Expand Down Expand Up @@ -157,7 +162,6 @@ require (
github.com/spf13/cast v1.5.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/viper v1.15.0 // indirect
github.com/streamingfast/cli v0.0.4-0.20230825151644-8cc84512cd80 // indirect
github.com/streamingfast/dauth v0.0.0-20231120142446-843f4e045cc2 // indirect
github.com/streamingfast/dbin v0.9.1-0.20231117225723-59790c798e2c // indirect
github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1 // indirect
Expand Down
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4Yn
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/bobg/go-generics/v2 v2.1.1 h1:4rN9upY6Xm4TASSMeH+NzUghgO4h/SbNrQphIjRd/R0=
github.com/bobg/go-generics/v2 v2.1.1/go.mod h1:iPMSRVFlzkJSYOCXQ0n92RA3Vxw0RBv2E8j9ZODXgHk=
github.com/btcsuite/btcd/btcec/v2 v2.2.0 h1:fzn1qaOt32TuLjFlkzYSsBC35Q3KUjT1SwPxiMSCF5k=
github.com/btcsuite/btcd/btcec/v2 v2.2.0/go.mod h1:U7MHm051Al6XmscBQ0BoNydpOTsFAn707034b5nY8zU=
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 h1:q0rUy8C/TYNBQS1+CGKw68tLOFYSNEs0TFnxxnS9+4U=
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1/go.mod h1:7SFka0XMvUgj3hfZtydOrQY2mwhPclbT2snogU7SQQc=
github.com/bufbuild/connect-go v1.10.0 h1:QAJ3G9A1OYQW2Jbk3DeoJbkCxuKArrvZgDt47mjdTbg=
github.com/bufbuild/connect-go v1.10.0/go.mod h1:CAIePUgkDR5pAFaylSMtNK45ANQjp9JvpluG20rhpV8=
github.com/bufbuild/connect-grpchealth-go v1.1.1 h1:ldceS3m7+Qvl3GI4yzB4oCg3uOdD+Y1bytc/5xuMpqo=
Expand Down Expand Up @@ -388,6 +392,8 @@ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ
github.com/hashicorp/golang-lru v0.5.3/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/holiman/uint256 v1.2.0 h1:gpSYcPLWGv4sG43I2mVLiDZCNDh/EpGjSk8tmtxitHM=
github.com/holiman/uint256 v1.2.0/go.mod h1:y4ga/t+u+Xwd7CpDgZESaRcWy0I7XMlTMA25ApIH5Jw=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
Expand Down Expand Up @@ -606,8 +612,12 @@ github.com/streamingfast/dstore v0.1.1-0.20230620124109-3924b3b36c77 h1:u7FWLqz3
github.com/streamingfast/dstore v0.1.1-0.20230620124109-3924b3b36c77/go.mod h1:ngKU7WzHwVjOFpt2g+Wtob5mX4IvN90HYlnARcTRbmQ=
github.com/streamingfast/dtracing v0.0.0-20220305214756-b5c0e8699839 h1:K6mJPvh1jAL+/gBS7Bh9jyzWaTib6N47m06gZOTUPwQ=
github.com/streamingfast/dtracing v0.0.0-20220305214756-b5c0e8699839/go.mod h1:huOJyjMYS6K8upTuxDxaNd+emD65RrXoVBvh8f1/7Ns=
github.com/streamingfast/eth-go v0.0.0-20231204174036-34e2cb6d64af h1:KH0tXSwKrsbNZ9RCKv+Fp4riu7ytzrc/hB3kXkwn0/Q=
github.com/streamingfast/eth-go v0.0.0-20231204174036-34e2cb6d64af/go.mod h1:UEm8dqibr3c3A1iIA3CHpkhN7j3X78prN7/55sXf3A0=
github.com/streamingfast/firehose-core v1.0.0 h1:6TV9M4JWTGkqI1Ocmn2ula4LLRq/WGfgLSJYH3wi+wc=
github.com/streamingfast/firehose-core v1.0.0/go.mod h1:y6GgxrGypjPyY25O/LBRIu2S1aZhjpMVX5AGLsqch0k=
github.com/streamingfast/firehose-ethereum v1.4.23-0.20231213134745-920366d3b7aa h1:doUV6rNUz/A+Y8wvNT/2k2qUKF02kuCRvaKgr/S2d3k=
github.com/streamingfast/firehose-ethereum v1.4.23-0.20231213134745-920366d3b7aa/go.mod h1:LYBs6siIpmpKyUsSUiziYr6YkW+s5HKCTzzuMiCfjK0=
github.com/streamingfast/jsonpb v0.0.0-20210811021341-3670f0aa02d0 h1:g8eEYbFSykyzIyuxNMmHEUGGUvJE0ivmqZagLDK42gw=
github.com/streamingfast/jsonpb v0.0.0-20210811021341-3670f0aa02d0/go.mod h1:cTNObq2Uofb330y05JbbZZ6RwE6QUXw5iVcHk1Fx3fk=
github.com/streamingfast/logging v0.0.0-20210811175431-f3b44b61606a/go.mod h1:4GdqELhZOXj4xwc4IaBmzofzdErGynnaSzuzxy0ZIBo=
Expand Down