From bd830c6e1cc954b76476a82dc036639991664ca8 Mon Sep 17 00:00:00 2001 From: clearly <910372762@qq.com> Date: Wed, 1 Nov 2023 16:34:45 +0800 Subject: [PATCH 1/3] update for AddValidatorEvent --- consensus/cbft/validator/validator.go | 29 ++++++++++++++++++--------- core/cbfttypes/type.go | 4 ++-- p2p/consensus_dialed.go | 11 +++------- p2p/dial.go | 5 +++++ p2p/server.go | 16 +++++++++------ x/plugin/staking_plugin.go | 8 +++++--- 6 files changed, 45 insertions(+), 28 deletions(-) diff --git a/consensus/cbft/validator/validator.go b/consensus/cbft/validator/validator.go index b6cb59bbb7..4ecd3d1432 100644 --- a/consensus/cbft/validator/validator.go +++ b/consensus/cbft/validator/validator.go @@ -425,6 +425,9 @@ func (vp *ValidatorPool) Update(blockNumber uint64, epoch uint64, eventMux *even } eventMux.Post(cbfttypes.UpdateValidatorEvent{Nodes: nodes}) + removes := make([]*enode.Node, 0) + adds := make([]*enode.Node, 0) + if isValidatorBefore { // If we are still a consensus node, that adding // new validators as consensus peer, and removing @@ -435,21 +438,22 @@ func (vp *ValidatorPool) Update(blockNumber uint64, epoch uint64, eventMux *even if isValidatorAfter { for nodeID, vnode := range vp.prevValidators.Nodes { if node, _ := vp.currentValidators.FindNodeByID(nodeID); node == nil { - eventMux.Post(cbfttypes.RemoveValidatorEvent{Node: enode.NewV4(vnode.PubKey, nil, 0, 0)}) - log.Trace("Post RemoveValidatorEvent", "nodeID", nodeID.String()) + removes = append(removes, enode.NewV4(vnode.PubKey, nil, 0, 0)) + log.Trace("add to RemoveValidatorEvent", "nodeID", nodeID, "isValidatorAfter") } } for nodeID, vnode := range vp.currentValidators.Nodes { if node, _ := vp.prevValidators.FindNodeByID(nodeID); node == nil { - eventMux.Post(cbfttypes.AddValidatorEvent{Node: enode.NewV4(vnode.PubKey, nil, 0, 0)}) - log.Trace("Post AddValidatorEvent", "nodeID", nodeID.String()) + adds = append(adds, enode.NewV4(vnode.PubKey, nil, 0, 0)) + log.Trace("add to AddValidatorEvent", "nodeID", nodeID) } } + } else { for nodeID, vnode := range vp.prevValidators.Nodes { - eventMux.Post(cbfttypes.RemoveValidatorEvent{Node: enode.NewV4(vnode.PubKey, nil, 0, 0)}) - log.Trace("Post RemoveValidatorEvent", "nodeID", nodeID.String()) + removes = append(removes, enode.NewV4(vnode.PubKey, nil, 0, 0)) + log.Trace("add to RemoveValidatorEvent", "nodeID", nodeID) } } } else { @@ -459,12 +463,19 @@ func (vp *ValidatorPool) Update(blockNumber uint64, epoch uint64, eventMux *even // with other validators in the consensus stages. if isValidatorAfter { for nodeID, vnode := range vp.currentValidators.Nodes { - eventMux.Post(cbfttypes.AddValidatorEvent{Node: enode.NewV4(vnode.PubKey, nil, 0, 0)}) - log.Trace("Post AddValidatorEvent", "nodeID", nodeID.String()) + adds = append(adds, enode.NewV4(vnode.PubKey, nil, 0, 0)) + log.Trace("add to AddValidatorEvent", "nodeID", nodeID) } } } - + if len(removes) > 0 { + eventMux.Post(cbfttypes.RemoveValidatorEvent{Nodes: removes}) + log.Trace("Post RemoveValidatorEvent", "num", len(removes), "isValidatorBefore", isValidatorBefore, "isValidatorAfter", isValidatorAfter) + } + if len(adds) > 0 { + eventMux.Post(cbfttypes.AddValidatorEvent{Nodes: adds}) + log.Trace("Post AddValidatorEvent", "num", len(adds), "isValidatorBefore", isValidatorBefore, "isValidatorAfter", isValidatorAfter) + } return nil } diff --git a/core/cbfttypes/type.go b/core/cbfttypes/type.go index fd50b15f39..206a3f2e97 100644 --- a/core/cbfttypes/type.go +++ b/core/cbfttypes/type.go @@ -47,11 +47,11 @@ type CbftResult struct { } type AddValidatorEvent struct { - Node *enode.Node + Nodes []*enode.Node } type RemoveValidatorEvent struct { - Node *enode.Node + Nodes []*enode.Node } type UpdateValidatorEvent struct { diff --git a/p2p/consensus_dialed.go b/p2p/consensus_dialed.go index 459a87882f..b06b853b19 100644 --- a/p2p/consensus_dialed.go +++ b/p2p/consensus_dialed.go @@ -1,9 +1,6 @@ package p2p import ( - "fmt" - "strings" - "github.com/PlatONnetwork/PlatON-Go/log" "github.com/PlatONnetwork/PlatON-Go/p2p/enode" ) @@ -68,8 +65,6 @@ func (tasks *dialedTasks) RemoveTask(NodeID enode.ID) error { } func (tasks *dialedTasks) ListTask() []*dialTask { - - log.Info("[after list]Consensus dialed task list after ListTask operation", "task queue", tasks.description()) return tasks.queue } @@ -138,10 +133,10 @@ func (tasks *dialedTasks) isEmpty() bool { return false } -func (tasks *dialedTasks) description() string { +func (tasks *dialedTasks) description() []string { var description []string for _, t := range tasks.queue { - description = append(description, fmt.Sprintf("%x", t.dest.ID().TerminalString())) + description = append(description, t.dest.ID().TerminalString()) } - return strings.Join(description, ",") + return description } diff --git a/p2p/dial.go b/p2p/dial.go index a112c222a0..bccbbf4b10 100644 --- a/p2p/dial.go +++ b/p2p/dial.go @@ -484,6 +484,11 @@ func (d *dialScheduler) startConsensusDials(n int) (started int) { n = 3 } } + if n <= 0 { + return + } + + log.Debug("startConsensusDials", "maxConsensusPeers", d.MaxConsensusPeers, "consensusPeers", d.consensusPeers, "n", n, "task queue", d.consensusPool.description()) // Create dials for consensus nodes if they are not connected. for _, t := range d.consensusPool.ListTask() { diff --git a/p2p/server.go b/p2p/server.go index 809e948755..7903c04efb 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -796,8 +796,7 @@ running: // This channel is used by RemoveConsensusNode to remove an enode // from the consensus node set. srv.log.Trace("Removing consensus node", "node", n.ID()) - id := n.ID() - if srv.localnode.ID() == id { + if srv.localnode.ID() == n.ID() { srv.log.Debug("We are not an consensus node") srv.consensus = false } @@ -1276,11 +1275,16 @@ func (srv *Server) watching() { switch data := ev.Data.(type) { case cbfttypes.AddValidatorEvent: - srv.log.Trace("Received AddValidatorEvent", "nodeID", data.Node.ID()) - srv.AddConsensusPeer(data.Node) + srv.log.Trace("Received AddValidatorEvent", "num", len(data.Nodes)) + for _, node := range data.Nodes { + srv.AddConsensusPeer(node) + } + case cbfttypes.RemoveValidatorEvent: - srv.log.Trace("Received RemoveValidatorEvent", "nodeID", data.Node.ID()) - srv.RemoveConsensusPeer(data.Node) + srv.log.Trace("Received RemoveValidatorEvent", "num", len(data.Nodes)) + for _, node := range data.Nodes { + srv.RemoveConsensusPeer(node) + } case cbfttypes.UpdateValidatorEvent: consensusPeer := 0 if _, ok := data.Nodes[srv.localnode.ID()]; ok { diff --git a/x/plugin/staking_plugin.go b/x/plugin/staking_plugin.go index 3b22286584..457899f69f 100644 --- a/x/plugin/staking_plugin.go +++ b/x/plugin/staking_plugin.go @@ -390,14 +390,16 @@ func (sk *StakingPlugin) Confirmed(nodeId enode.IDv0, block *types.Block) error } func (sk *StakingPlugin) addConsensusNode(nodes staking.ValidatorQueue) { + adds := make([]*enode.Node, 0) for _, node := range nodes { pub, err := node.NodeId.Pubkey() if err != nil { panic(err) } - if err := sk.eventMux.Post(cbfttypes.AddValidatorEvent{Node: enode.NewV4(pub, nil, 0, 0)}); nil != err { - log.Error("post AddValidatorEvent failed", "nodeId", node.NodeId.TerminalString(), "err", err) - } + adds = append(adds, enode.NewV4(pub, nil, 0, 0)) + } + if err := sk.eventMux.Post(cbfttypes.AddValidatorEvent{Nodes: adds}); nil != err { + log.Error("post AddValidatorEvent failed", "num", len(adds), "err", err) } } From 193a88962031b5cd8eba51673ef1857bc953a31e Mon Sep 17 00:00:00 2001 From: clearly <910372762@qq.com> Date: Thu, 2 Nov 2023 12:12:37 +0800 Subject: [PATCH 2/3] fix discover --- p2p/discover/v4wire/v4wire.go | 52 +++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/p2p/discover/v4wire/v4wire.go b/p2p/discover/v4wire/v4wire.go index 89cd8bb8e7..4889eae5f7 100644 --- a/p2p/discover/v4wire/v4wire.go +++ b/p2p/discover/v4wire/v4wire.go @@ -57,6 +57,15 @@ type ( Rest []rlp.RawValue `rlp:"tail"` } + PingV1 struct { + Version uint + From, To Endpoint + Expiration uint64 + + // Ignore additional fields (for forward compatibility). + Rest []rlp.RawValue `rlp:"tail"` + } + // Pong is the reply to ping. Pong struct { // This field should mirror the UDP envelope address @@ -71,7 +80,17 @@ type ( // Ignore additional fields (for forward compatibility). Rest []rlp.RawValue `rlp:"tail"` } + PongV1 struct { + // This field should mirror the UDP envelope address + // of the ping packet, which provides a way to discover the + // the external address (after NAT). + To Endpoint + ReplyTok []byte // This contains the hash of the ping packet. + Expiration uint64 // Absolute timestamp at which the packet becomes invalid. + // Ignore additional fields (for forward compatibility). + Rest []rlp.RawValue `rlp:"tail"` + } // Findnode is a query for nodes close to the given target. Findnode struct { Target Pubkey @@ -184,6 +203,9 @@ func (req *Ping) DecodeRLP(s *rlp.Stream) error { if err := decodePingRLP(req, blob); err == nil { return nil } + if err := decodeV1PingRLP(req, blob); err != nil { + return err + } return nil } @@ -205,6 +227,21 @@ func decodePingRLP(p *Ping, blob []byte) error { return nil } +func decodeV1PingRLP(p *Ping, blob []byte) error { + var ping PingV1 + if err := rlp.DecodeBytes(blob, &ping); err != nil { + return err + } + + p.Version = ping.Version + p.From = ping.From + p.To = ping.To + p.Expiration = ping.Expiration + p.ForkID = ping.Rest + + return nil +} + func (req *Pong) Name() string { return "PONG/v4" } func (req *Pong) Kind() byte { return PongPacket } func (req *Pong) Fork() []rlp.RawValue { @@ -220,6 +257,9 @@ func (req *Pong) DecodeRLP(s *rlp.Stream) error { if err := decodePongRLP(req, blob); err == nil { return nil } + if err := decodeV1PongRLP(req, blob); err != nil { + return err + } return nil } @@ -239,6 +279,18 @@ func decodePongRLP(p *Pong, blob []byte) error { return nil } +func decodeV1PongRLP(p *Pong, blob []byte) error { + var pong PongV1 + if err := rlp.DecodeBytes(blob, &pong); err != nil { + return err + } + p.To = pong.To + p.ReplyTok = pong.ReplyTok + p.Expiration = pong.Expiration + p.ForkID = pong.Rest + return nil +} + func (req *Findnode) Name() string { return "FINDNODE/v4" } func (req *Findnode) Kind() byte { return FindnodePacket } func (req *Findnode) Fork() []rlp.RawValue { From 0f6d6fc5db01eac32c7dc0b8960eadc626efc935 Mon Sep 17 00:00:00 2001 From: clearly <910372762@qq.com> Date: Thu, 2 Nov 2023 14:25:52 +0800 Subject: [PATCH 3/3] update --- consensus/cbft/validator/validator.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/consensus/cbft/validator/validator.go b/consensus/cbft/validator/validator.go index 4ecd3d1432..1579c04644 100644 --- a/consensus/cbft/validator/validator.go +++ b/consensus/cbft/validator/validator.go @@ -419,11 +419,13 @@ func (vp *ValidatorPool) Update(blockNumber uint64, epoch uint64, eventMux *even isValidatorAfter := vp.isValidator(epoch, vp.nodeID) - nodes := make(map[enode.ID]struct{}) - for _, validator := range vp.currentValidators.Nodes { - nodes[validator.NodeID] = struct{}{} + if isValidatorBefore || isValidatorAfter { + nodes := make(map[enode.ID]struct{}) + for _, validator := range vp.currentValidators.Nodes { + nodes[validator.NodeID] = struct{}{} + } + eventMux.Post(cbfttypes.UpdateValidatorEvent{Nodes: nodes}) } - eventMux.Post(cbfttypes.UpdateValidatorEvent{Nodes: nodes}) removes := make([]*enode.Node, 0) adds := make([]*enode.Node, 0)