Skip to content

Commit

Permalink
Merge pull request #2187 from cheng762/feature/bump-version-to-1.5.0
Browse files Browse the repository at this point in the history
update for AddValidatorEvent
  • Loading branch information
benbaley authored Nov 6, 2023
2 parents d29c0c0 + 0f6d6fc commit 206f257
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 32 deletions.
39 changes: 26 additions & 13 deletions consensus/cbft/validator/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,11 +419,16 @@ func (vp *ValidatorPool) Update(blockNumber uint64, epoch uint64, eventMux *even

isValidatorAfter := vp.isValidator(epoch, vp.nodeID)

nodes := make(map[enode.ID]struct{})
for _, validator := range vp.currentValidators.Nodes {
nodes[validator.NodeID] = struct{}{}
if isValidatorBefore || isValidatorAfter {
nodes := make(map[enode.ID]struct{})
for _, validator := range vp.currentValidators.Nodes {
nodes[validator.NodeID] = struct{}{}
}
eventMux.Post(cbfttypes.UpdateValidatorEvent{Nodes: nodes})
}
eventMux.Post(cbfttypes.UpdateValidatorEvent{Nodes: nodes})

removes := make([]*enode.Node, 0)
adds := make([]*enode.Node, 0)

if isValidatorBefore {
// If we are still a consensus node, that adding
Expand All @@ -435,21 +440,22 @@ func (vp *ValidatorPool) Update(blockNumber uint64, epoch uint64, eventMux *even
if isValidatorAfter {
for nodeID, vnode := range vp.prevValidators.Nodes {
if node, _ := vp.currentValidators.FindNodeByID(nodeID); node == nil {
eventMux.Post(cbfttypes.RemoveValidatorEvent{Node: enode.NewV4(vnode.PubKey, nil, 0, 0)})
log.Trace("Post RemoveValidatorEvent", "nodeID", nodeID.String())
removes = append(removes, enode.NewV4(vnode.PubKey, nil, 0, 0))
log.Trace("add to RemoveValidatorEvent", "nodeID", nodeID, "isValidatorAfter")
}
}

for nodeID, vnode := range vp.currentValidators.Nodes {
if node, _ := vp.prevValidators.FindNodeByID(nodeID); node == nil {
eventMux.Post(cbfttypes.AddValidatorEvent{Node: enode.NewV4(vnode.PubKey, nil, 0, 0)})
log.Trace("Post AddValidatorEvent", "nodeID", nodeID.String())
adds = append(adds, enode.NewV4(vnode.PubKey, nil, 0, 0))
log.Trace("add to AddValidatorEvent", "nodeID", nodeID)
}
}

} else {
for nodeID, vnode := range vp.prevValidators.Nodes {
eventMux.Post(cbfttypes.RemoveValidatorEvent{Node: enode.NewV4(vnode.PubKey, nil, 0, 0)})
log.Trace("Post RemoveValidatorEvent", "nodeID", nodeID.String())
removes = append(removes, enode.NewV4(vnode.PubKey, nil, 0, 0))
log.Trace("add to RemoveValidatorEvent", "nodeID", nodeID)
}
}
} else {
Expand All @@ -459,12 +465,19 @@ func (vp *ValidatorPool) Update(blockNumber uint64, epoch uint64, eventMux *even
// with other validators in the consensus stages.
if isValidatorAfter {
for nodeID, vnode := range vp.currentValidators.Nodes {
eventMux.Post(cbfttypes.AddValidatorEvent{Node: enode.NewV4(vnode.PubKey, nil, 0, 0)})
log.Trace("Post AddValidatorEvent", "nodeID", nodeID.String())
adds = append(adds, enode.NewV4(vnode.PubKey, nil, 0, 0))
log.Trace("add to AddValidatorEvent", "nodeID", nodeID)
}
}
}

if len(removes) > 0 {
eventMux.Post(cbfttypes.RemoveValidatorEvent{Nodes: removes})
log.Trace("Post RemoveValidatorEvent", "num", len(removes), "isValidatorBefore", isValidatorBefore, "isValidatorAfter", isValidatorAfter)
}
if len(adds) > 0 {
eventMux.Post(cbfttypes.AddValidatorEvent{Nodes: adds})
log.Trace("Post AddValidatorEvent", "num", len(adds), "isValidatorBefore", isValidatorBefore, "isValidatorAfter", isValidatorAfter)
}
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions core/cbfttypes/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ type CbftResult struct {
}

type AddValidatorEvent struct {
Node *enode.Node
Nodes []*enode.Node
}

type RemoveValidatorEvent struct {
Node *enode.Node
Nodes []*enode.Node
}

type UpdateValidatorEvent struct {
Expand Down
11 changes: 3 additions & 8 deletions p2p/consensus_dialed.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package p2p

import (
"fmt"
"strings"

"github.com/PlatONnetwork/PlatON-Go/log"
"github.com/PlatONnetwork/PlatON-Go/p2p/enode"
)
Expand Down Expand Up @@ -68,8 +65,6 @@ func (tasks *dialedTasks) RemoveTask(NodeID enode.ID) error {
}

func (tasks *dialedTasks) ListTask() []*dialTask {

log.Info("[after list]Consensus dialed task list after ListTask operation", "task queue", tasks.description())
return tasks.queue
}

Expand Down Expand Up @@ -138,10 +133,10 @@ func (tasks *dialedTasks) isEmpty() bool {
return false
}

func (tasks *dialedTasks) description() string {
func (tasks *dialedTasks) description() []string {
var description []string
for _, t := range tasks.queue {
description = append(description, fmt.Sprintf("%x", t.dest.ID().TerminalString()))
description = append(description, t.dest.ID().TerminalString())
}
return strings.Join(description, ",")
return description
}
5 changes: 5 additions & 0 deletions p2p/dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,11 @@ func (d *dialScheduler) startConsensusDials(n int) (started int) {
n = 3
}
}
if n <= 0 {
return
}

log.Debug("startConsensusDials", "maxConsensusPeers", d.MaxConsensusPeers, "consensusPeers", d.consensusPeers, "n", n, "task queue", d.consensusPool.description())

// Create dials for consensus nodes if they are not connected.
for _, t := range d.consensusPool.ListTask() {
Expand Down
52 changes: 52 additions & 0 deletions p2p/discover/v4wire/v4wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,15 @@ type (
Rest []rlp.RawValue `rlp:"tail"`
}

PingV1 struct {
Version uint
From, To Endpoint
Expiration uint64

// Ignore additional fields (for forward compatibility).
Rest []rlp.RawValue `rlp:"tail"`
}

// Pong is the reply to ping.
Pong struct {
// This field should mirror the UDP envelope address
Expand All @@ -71,7 +80,17 @@ type (
// Ignore additional fields (for forward compatibility).
Rest []rlp.RawValue `rlp:"tail"`
}
PongV1 struct {
// This field should mirror the UDP envelope address
// of the ping packet, which provides a way to discover the
// the external address (after NAT).
To Endpoint
ReplyTok []byte // This contains the hash of the ping packet.
Expiration uint64 // Absolute timestamp at which the packet becomes invalid.

// Ignore additional fields (for forward compatibility).
Rest []rlp.RawValue `rlp:"tail"`
}
// Findnode is a query for nodes close to the given target.
Findnode struct {
Target Pubkey
Expand Down Expand Up @@ -184,6 +203,9 @@ func (req *Ping) DecodeRLP(s *rlp.Stream) error {
if err := decodePingRLP(req, blob); err == nil {
return nil
}
if err := decodeV1PingRLP(req, blob); err != nil {
return err
}

return nil
}
Expand All @@ -205,6 +227,21 @@ func decodePingRLP(p *Ping, blob []byte) error {
return nil
}

func decodeV1PingRLP(p *Ping, blob []byte) error {
var ping PingV1
if err := rlp.DecodeBytes(blob, &ping); err != nil {
return err
}

p.Version = ping.Version
p.From = ping.From
p.To = ping.To
p.Expiration = ping.Expiration
p.ForkID = ping.Rest

return nil
}

func (req *Pong) Name() string { return "PONG/v4" }
func (req *Pong) Kind() byte { return PongPacket }
func (req *Pong) Fork() []rlp.RawValue {
Expand All @@ -220,6 +257,9 @@ func (req *Pong) DecodeRLP(s *rlp.Stream) error {
if err := decodePongRLP(req, blob); err == nil {
return nil
}
if err := decodeV1PongRLP(req, blob); err != nil {
return err
}
return nil
}

Expand All @@ -239,6 +279,18 @@ func decodePongRLP(p *Pong, blob []byte) error {
return nil
}

func decodeV1PongRLP(p *Pong, blob []byte) error {
var pong PongV1
if err := rlp.DecodeBytes(blob, &pong); err != nil {
return err
}
p.To = pong.To
p.ReplyTok = pong.ReplyTok
p.Expiration = pong.Expiration
p.ForkID = pong.Rest
return nil
}

func (req *Findnode) Name() string { return "FINDNODE/v4" }
func (req *Findnode) Kind() byte { return FindnodePacket }
func (req *Findnode) Fork() []rlp.RawValue {
Expand Down
16 changes: 10 additions & 6 deletions p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -796,8 +796,7 @@ running:
// This channel is used by RemoveConsensusNode to remove an enode
// from the consensus node set.
srv.log.Trace("Removing consensus node", "node", n.ID())
id := n.ID()
if srv.localnode.ID() == id {
if srv.localnode.ID() == n.ID() {
srv.log.Debug("We are not an consensus node")
srv.consensus = false
}
Expand Down Expand Up @@ -1276,11 +1275,16 @@ func (srv *Server) watching() {

switch data := ev.Data.(type) {
case cbfttypes.AddValidatorEvent:
srv.log.Trace("Received AddValidatorEvent", "nodeID", data.Node.ID())
srv.AddConsensusPeer(data.Node)
srv.log.Trace("Received AddValidatorEvent", "num", len(data.Nodes))
for _, node := range data.Nodes {
srv.AddConsensusPeer(node)
}

case cbfttypes.RemoveValidatorEvent:
srv.log.Trace("Received RemoveValidatorEvent", "nodeID", data.Node.ID())
srv.RemoveConsensusPeer(data.Node)
srv.log.Trace("Received RemoveValidatorEvent", "num", len(data.Nodes))
for _, node := range data.Nodes {
srv.RemoveConsensusPeer(node)
}
case cbfttypes.UpdateValidatorEvent:
consensusPeer := 0
if _, ok := data.Nodes[srv.localnode.ID()]; ok {
Expand Down
8 changes: 5 additions & 3 deletions x/plugin/staking_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,14 +390,16 @@ func (sk *StakingPlugin) Confirmed(nodeId enode.IDv0, block *types.Block) error
}

func (sk *StakingPlugin) addConsensusNode(nodes staking.ValidatorQueue) {
adds := make([]*enode.Node, 0)
for _, node := range nodes {
pub, err := node.NodeId.Pubkey()
if err != nil {
panic(err)
}
if err := sk.eventMux.Post(cbfttypes.AddValidatorEvent{Node: enode.NewV4(pub, nil, 0, 0)}); nil != err {
log.Error("post AddValidatorEvent failed", "nodeId", node.NodeId.TerminalString(), "err", err)
}
adds = append(adds, enode.NewV4(pub, nil, 0, 0))
}
if err := sk.eventMux.Post(cbfttypes.AddValidatorEvent{Nodes: adds}); nil != err {
log.Error("post AddValidatorEvent failed", "num", len(adds), "err", err)
}
}

Expand Down

0 comments on commit 206f257

Please sign in to comment.