Skip to content

Commit

Permalink
Revert "*: add broadcast local txs feature"
Browse files Browse the repository at this point in the history
This reverts commit 9e99913.
  • Loading branch information
markya0616 committed Apr 27, 2023
1 parent f477b8a commit 4a182b0
Show file tree
Hide file tree
Showing 9 changed files with 27 additions and 135 deletions.
1 change: 0 additions & 1 deletion cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ var (
utils.TxPoolNoLocalsFlag,
utils.TxPoolJournalFlag,
utils.TxPoolRejournalFlag,
utils.TxPoolBroadcastPendingLocalTxFlag,
utils.TxPoolPriceLimitFlag,
utils.TxPoolPriceBumpFlag,
utils.TxPoolAccountSlotsFlag,
Expand Down
1 change: 0 additions & 1 deletion cmd/geth/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ var AppHelpFlagGroups = []flags.FlagGroup{
utils.TxPoolNoLocalsFlag,
utils.TxPoolJournalFlag,
utils.TxPoolRejournalFlag,
utils.TxPoolBroadcastPendingLocalTxFlag,
utils.TxPoolPriceLimitFlag,
utils.TxPoolPriceBumpFlag,
utils.TxPoolAccountSlotsFlag,
Expand Down
8 changes: 0 additions & 8 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,11 +401,6 @@ var (
Usage: "Time interval to regenerate the local transaction journal",
Value: core.DefaultTxPoolConfig.Rejournal,
}
TxPoolBroadcastPendingLocalTxFlag = cli.DurationFlag{
Name: "txpool.broadcastpendinglocaltx",
Usage: "Time interval to broadcast the pending local transaction",
Value: core.DefaultTxPoolConfig.BroadcastPendingLocalTx,
}
TxPoolPriceLimitFlag = cli.Uint64Flag{
Name: "txpool.pricelimit",
Usage: "Minimum gas price limit to enforce for acceptance into the pool",
Expand Down Expand Up @@ -1462,9 +1457,6 @@ func setTxPool(ctx *cli.Context, cfg *core.TxPoolConfig) {
if ctx.GlobalIsSet(TxPoolRejournalFlag.Name) {
cfg.Rejournal = ctx.GlobalDuration(TxPoolRejournalFlag.Name)
}
if ctx.GlobalIsSet(TxPoolBroadcastPendingLocalTxFlag.Name) {
cfg.BroadcastPendingLocalTx = ctx.GlobalDuration(TxPoolBroadcastPendingLocalTxFlag.Name)
}
if ctx.GlobalIsSet(TxPoolPriceLimitFlag.Name) {
cfg.PriceLimit = ctx.GlobalUint64(TxPoolPriceLimitFlag.Name)
}
Expand Down
3 changes: 0 additions & 3 deletions core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@ type NewTxsEvent struct{ Txs []*types.Transaction }
// ReannoTxsEvent is posted when a batch of local pending transactions exceed a specified duration.
type ReannoTxsEvent struct{ Txs []*types.Transaction }

// PendingLocalTxsEvent is posted when there are pending local transactions in the transaction pool.
type PendingLocalTxsEvent struct{ Txs []*types.Transaction }

// NewQueuedTxsEvent is posted when a batch of transactions enter the transaction pool.
type NewQueuedTxsEvent struct{ Txs []*types.Transaction }

Expand Down
66 changes: 16 additions & 50 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,10 @@ type blockChain interface {

// TxPoolConfig are the configuration parameters of the transaction pool.
type TxPoolConfig struct {
Locals []common.Address // Addresses that should be treated by default as local
NoLocals bool // Whether local transaction handling should be disabled
Journal string // Journal of local transactions to survive node restarts
Rejournal time.Duration // Time interval to regenerate the local transaction journal
BroadcastPendingLocalTx time.Duration // Time interval to broadcast the local transaction
Locals []common.Address // Addresses that should be treated by default as local
NoLocals bool // Whether local transaction handling should be disabled
Journal string // Journal of local transactions to survive node restarts
Rejournal time.Duration // Time interval to regenerate the local transaction journal

PriceLimit uint64 // Minimum gas price to enforce for acceptance into the pool
PriceBump uint64 // Minimum price bump percentage to replace an already existing transaction (nonce)
Expand All @@ -185,9 +184,8 @@ type TxPoolConfig struct {
// DefaultTxPoolConfig contains the default configurations for the transaction
// pool.
var DefaultTxPoolConfig = TxPoolConfig{
Journal: "transactions.rlp",
Rejournal: time.Hour,
BroadcastPendingLocalTx: 5 * time.Minute,
Journal: "transactions.rlp",
Rejournal: time.Hour,

PriceLimit: 1,
PriceBump: 10,
Expand All @@ -209,10 +207,6 @@ func (config *TxPoolConfig) sanitize() TxPoolConfig {
log.Warn("Sanitizing invalid txpool journal time", "provided", conf.Rejournal, "updated", time.Second)
conf.Rejournal = time.Second
}
if conf.BroadcastPendingLocalTx < time.Second {
log.Warn("Sanitizing invalid txpool broadcast local tx time", "provided", conf.BroadcastPendingLocalTx, "updated", time.Second)
conf.BroadcastPendingLocalTx = time.Second
}
if conf.PriceLimit < 1 {
log.Warn("Sanitizing invalid txpool price limit", "provided", conf.PriceLimit, "updated", DefaultTxPoolConfig.PriceLimit)
conf.PriceLimit = DefaultTxPoolConfig.PriceLimit
Expand Down Expand Up @@ -256,17 +250,16 @@ func (config *TxPoolConfig) sanitize() TxPoolConfig {
// current state) and future transactions. Transactions move between those
// two states over time as they are received and processed.
type TxPool struct {
config TxPoolConfig
chainconfig *params.ChainConfig
chain blockChain
gasPrice *big.Int
txFeed event.Feed
reannoTxFeed event.Feed // Event feed for announcing transactions again
pendingLocalTxFeed event.Feed
queuedTxFeed event.Feed
scope event.SubscriptionScope
signer types.Signer
mu sync.RWMutex
config TxPoolConfig
chainconfig *params.ChainConfig
chain blockChain
gasPrice *big.Int
txFeed event.Feed
reannoTxFeed event.Feed // Event feed for announcing transactions again
queuedTxFeed event.Feed
scope event.SubscriptionScope
signer types.Signer
mu sync.RWMutex

istanbul bool // Fork indicator whether we are in the istanbul stage.
eip2718 bool // Fork indicator whether we are using EIP-2718 type transactions.
Expand Down Expand Up @@ -382,10 +375,6 @@ func (pool *TxPool) loop() {

// Notify tests that the init phase is done
close(pool.initDoneCh)

pendingLocalTxs := time.NewTicker(pool.config.BroadcastPendingLocalTx)
defer pendingLocalTxs.Stop()

for {
select {
// Handle ChainHeadEvent
Expand Down Expand Up @@ -467,23 +456,6 @@ func (pool *TxPool) loop() {
}
pool.mu.Unlock()
}

case <-pendingLocalTxs.C:
pool.mu.RLock()
lTxs := types.Transactions{}
for addr, list := range pool.pending {
// grab local transactions
if !pool.locals.contains(addr) {
continue
}

lTxs = append(lTxs, list.Flatten()...)
}
pool.mu.RUnlock()

if len(lTxs) != 0 {
go pool.pendingLocalTxFeed.Send(PendingLocalTxsEvent{lTxs})
}
}
}
}
Expand Down Expand Up @@ -521,12 +493,6 @@ func (pool *TxPool) SubscribeNewQueuedTxsEvent(ch chan<- NewQueuedTxsEvent) even
return pool.scope.Track(pool.queuedTxFeed.Subscribe(ch))
}

// SubscribePendingLocalTxsEvent registers a subscription of NewTxsEvent and
// starts sending event to the given channel.
func (pool *TxPool) SubscribePendingLocalTxsEvent(ch chan<- PendingLocalTxsEvent) event.Subscription {
return pool.scope.Track(pool.pendingLocalTxFeed.Subscribe(ch))
}

// GasPrice returns the current gas price enforced by the transaction pool.
func (pool *TxPool) GasPrice() *big.Int {
pool.mu.RLock()
Expand Down
1 change: 0 additions & 1 deletion core/tx_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2439,7 +2439,6 @@ func testTransactionJournaling(t *testing.T, nolocals bool) {
config.NoLocals = nolocals
config.Journal = journal
config.Rejournal = time.Second
config.BroadcastPendingLocalTx = time.Second

pool := NewTxPool(config, params.TestChainConfig, blockchain)

Expand Down
59 changes: 8 additions & 51 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,6 @@ const (

// deltaTdThreshold is the threshold of TD difference for peers to broadcast votes.
deltaTdThreshold = 20

// pendingLocalTxChanSize is the size of channel listening to NewTxsEvent.
// The number is referenced from the size of tx pool.
pendingLocalTxChanSize = 4096
)

var (
Expand Down Expand Up @@ -91,10 +87,6 @@ type txPool interface {
// SubscribeReannoTxsEvent should return an event subscription of
// ReannoTxsEvent and send events to the given channel.
SubscribeReannoTxsEvent(chan<- core.ReannoTxsEvent) event.Subscription

// SubscribePendingLocalTxsEvent should return an event subscription of
// NewTxsEvent and send events to the given channel.
SubscribePendingLocalTxsEvent(chan<- core.PendingLocalTxsEvent) event.Subscription
}

// votePool defines the methods needed from a votes pool implementation to
Expand Down Expand Up @@ -153,16 +145,14 @@ type handler struct {
peers *peerSet
merger *consensus.Merger

eventMux *event.TypeMux
txsCh chan core.NewTxsEvent
txsSub event.Subscription
reannoTxsCh chan core.ReannoTxsEvent
reannoTxsSub event.Subscription
pendingLocalTxsCh chan core.PendingLocalTxsEvent
pendingLocalTxsSub event.Subscription
minedBlockSub *event.TypeMuxSubscription
voteCh chan core.NewVoteEvent
votesSub event.Subscription
eventMux *event.TypeMux
txsCh chan core.NewTxsEvent
txsSub event.Subscription
reannoTxsCh chan core.ReannoTxsEvent
reannoTxsSub event.Subscription
minedBlockSub *event.TypeMuxSubscription
voteCh chan core.NewVoteEvent
votesSub event.Subscription

whitelist map[uint64]common.Hash

Expand Down Expand Up @@ -643,10 +633,7 @@ func (h *handler) Start(maxPeers int) {
h.wg.Add(1)
h.txsCh = make(chan core.NewTxsEvent, txChanSize)
h.txsSub = h.txpool.SubscribeNewTxsEvent(h.txsCh)
h.pendingLocalTxsCh = make(chan core.PendingLocalTxsEvent, pendingLocalTxChanSize)
h.pendingLocalTxsSub = h.txpool.SubscribePendingLocalTxsEvent(h.pendingLocalTxsCh)
go h.txBroadcastLoop()
go h.pendingLocalTxBroadcastLoop()

// broadcast votes
if h.votepool != nil {
Expand Down Expand Up @@ -676,13 +663,10 @@ func (h *handler) Stop() {
h.txsSub.Unsubscribe() // quits txBroadcastLoop
h.reannoTxsSub.Unsubscribe() // quits txReannounceLoop
h.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop

if h.votepool != nil {
h.votesSub.Unsubscribe() // quits voteBroadcastLoop
}

h.pendingLocalTxsSub.Unsubscribe()

// Quit chainSync and txsync64.
// After this is done, no new peers will be accepted.
close(h.quitSync)
Expand Down Expand Up @@ -845,20 +829,6 @@ func (h *handler) BroadcastVote(vote *types.VoteEnvelope) {
log.Debug("Vote broadcast", "vote packs", directPeers, "broadcast vote", directCount)
}

// BroadcastPendingLocalTxs will propagate a batch of transactions to all peers
func (h *handler) BroadcastPendingLocalTxs(txs types.Transactions) {
peers := h.peers.Clone()
// Build tx hashes
txHashes := make([]common.Hash, len(txs))
for i, tx := range txs {
txHashes[i] = tx.Hash()
}
for _, peer := range peers {
peer.AsyncSendTransactions(txHashes)
}
log.Info("Broadcast pending local transaction to all peers", "recipients", len(peers))
}

// minedBroadcastLoop sends mined blocks to connected peers.
func (h *handler) minedBroadcastLoop() {
defer h.wg.Done()
Expand Down Expand Up @@ -909,16 +879,3 @@ func (h *handler) voteBroadcastLoop() {
}
}
}

func (h *handler) pendingLocalTxBroadcastLoop() {
for {
select {
case event := <-h.pendingLocalTxsCh:
h.BroadcastPendingLocalTxs(event.Txs)

// Err() channel will be closed when unsubscribing.
case <-h.pendingLocalTxsSub.Err():
return
}
}
}
11 changes: 3 additions & 8 deletions eth/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,9 @@ var (
type testTxPool struct {
pool map[common.Hash]*types.Transaction // Hash map of collected transactions

pendingLocalTxFeed event.Feed
txFeed event.Feed // Notification feed to allow waiting for inclusion
reannoTxFeed event.Feed // Notification feed to trigger reannouce
lock sync.RWMutex // Protects the transaction pool
txFeed event.Feed // Notification feed to allow waiting for inclusion
reannoTxFeed event.Feed // Notification feed to trigger reannouce
lock sync.RWMutex // Protects the transaction pool
}

// newTestTxPool creates a mock transaction pool.
Expand Down Expand Up @@ -121,10 +120,6 @@ func (p *testTxPool) Pending(enforceTips bool) map[common.Address]types.Transact
return batches
}

func (p *testTxPool) SubscribePendingLocalTxsEvent(ch chan<- core.PendingLocalTxsEvent) event.Subscription {
return p.pendingLocalTxFeed.Subscribe(ch)
}

// SubscribeNewTxsEvent should return an event subscription of NewTxsEvent and
// send events to the given channel.
func (p *testTxPool) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
Expand Down
12 changes: 0 additions & 12 deletions eth/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,15 +132,3 @@ func (p *bscPeer) info() *bscPeerInfo {
Version: p.Version(),
}
}

// Clone clones a peers
func (ps *peerSet) Clone() []*ethPeer {
ps.lock.RLock()
defer ps.lock.RUnlock()

list := make([]*ethPeer, 0, len(ps.peers))
for _, p := range ps.peers {
list = append(list, p)
}
return list
}

0 comments on commit 4a182b0

Please sign in to comment.