From 46747fd49671ec990c9ac15974463b5f358f8076 Mon Sep 17 00:00:00 2001 From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> Date: Fri, 31 Jan 2025 17:01:55 +0100 Subject: [PATCH] chore: filter remove all subscription from a peer that is leaving (#3267) * waku/waku_filter_v2/protocol.nim keeps track of the filter-client connections in Table[PeerId, Connection] * waku/waku_filter_v2/protocol.nim starts listening for peer-left events in order to completely remove the previous Connection instance. Also, a new Connection is added when the filter-service starts publishing to its peers. --------- Co-authored-by: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> --- tests/node/test_wakunode_filter.nim | 3 +- waku/node/peer_manager/peer_manager.nim | 5 +++ waku/waku_filter_v2/protocol.nim | 54 ++++++++++++++++++++----- waku/waku_filter_v2/subscriptions.nim | 37 ++--------------- 4 files changed, 53 insertions(+), 46 deletions(-) diff --git a/tests/node/test_wakunode_filter.nim b/tests/node/test_wakunode_filter.nim index 563963f8d4..c9ea12f17a 100644 --- a/tests/node/test_wakunode_filter.nim +++ b/tests/node/test_wakunode_filter.nim @@ -556,7 +556,7 @@ suite "Waku Filter - End to End": ) discard await wakuFilter.subscriptions.addSubscription( - clientPeerId, filterCriteria.toHashSet(), peerManager + clientPeerId, filterCriteria.toHashSet() ) let @@ -605,7 +605,6 @@ suite "Waku Filter - End to End": await wakuFilter.subscriptions.addSubscription( peers[index].switch.peerInfo.peerId, @[(DefaultPubsubTopic, DefaultContentTopic)].toHashSet(), - peerManager, ) ).isOkOr: assert false, $error diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index e1a7b1071f..6894f55781 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -953,6 +953,11 @@ proc pruneInRelayConns(pm: PeerManager, amount: int) {.async.} = trace "Pruning Peer", Peer = $p asyncSpawn(pm.switch.disconnect(p)) +proc addExtPeerEventHandler*( + pm: PeerManager, eventHandler: PeerEventHandler, eventKind: PeerEventKind +) = + pm.switch.addPeerEventHandler(eventHandler, eventKind) + #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~# # Initialization and Constructor # #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~# diff --git a/waku/waku_filter_v2/protocol.nim b/waku/waku_filter_v2/protocol.nim index 12572b406d..22504488e0 100644 --- a/waku/waku_filter_v2/protocol.nim +++ b/waku/waku_filter_v2/protocol.nim @@ -28,6 +28,7 @@ type WakuFilter* = ref object of LPProtocol messageCache: TimedCache[string] peerRequestRateLimiter*: PerPeerRateLimiter subscriptionsManagerFut: Future[void] + peerConnections: Table[PeerId, Connection] proc pingSubscriber(wf: WakuFilter, peerId: PeerID): FilterSubscribeResult = debug "pinging subscriber", peerId = peerId @@ -69,7 +70,7 @@ proc subscribe( debug "subscribing peer to filter criteria", peerId = peerId, filterCriteria = filterCriteria - (await wf.subscriptions.addSubscription(peerId, filterCriteria, wf.peerManager)).isOkOr: + (await wf.subscriptions.addSubscription(peerId, filterCriteria)).isOkOr: return err(FilterSubscribeError.serviceUnavailable(error)) debug "correct subscription", peerId = peerId @@ -166,24 +167,40 @@ proc handleSubscribeRequest*( else: return FilterSubscribeResponse.ok(request.requestId) -proc pushToPeer(wf: WakuFilter, peer: PeerId, buffer: seq[byte]) {.async.} = - debug "pushing message to subscribed peer", peerId = shortLog(peer) +proc pushToPeer( + wf: WakuFilter, peerId: PeerId, buffer: seq[byte] +): Future[Result[void, string]] {.async.} = + debug "pushing message to subscribed peer", peerId = shortLog(peerId) - if not wf.peerManager.wakuPeerStore.hasPeer(peer, WakuFilterPushCodec): + if not wf.peerManager.wakuPeerStore.hasPeer(peerId, WakuFilterPushCodec): # Check that peer has not been removed from peer store - error "no addresses for peer", peerId = shortLog(peer) - return + error "no addresses for peer", peerId = shortLog(peerId) + return err("no addresses for peer: " & $peerId) - let conn = wf.subscriptions.getConnectionByPeerId(peer).valueOr: - error "could not get connection by peer id", error = $error - return + let conn = + if wf.peerConnections.contains(peerId): + wf.peerConnections[peerId] + else: + ## we never pushed a message before, let's dial then + let connRes = await wf.peerManager.dialPeer(peerId, WakuFilterPushCodec) + if connRes.isNone(): + ## We do not remove this peer, but allow the underlying peer manager + ## to do so if it is deemed necessary + error "pushToPeer no connection to peer", peerId = shortLog(peerId) + return err("pushToPeer no connection to peer: " & shortLog(peerId)) + + let newConn = connRes.get() + wf.peerConnections[peerId] = newConn + newConn await conn.writeLp(buffer) - debug "published successful", peerId = shortLog(peer), conn + debug "published successful", peerId = shortLog(peerId), conn waku_service_network_bytes.inc( amount = buffer.len().int64, labelValues = [WakuFilterPushCodec, "out"] ) + return ok() + proc pushToPeers( wf: WakuFilter, peers: seq[PeerId], messagePush: MessagePush ) {.async.} = @@ -208,11 +225,12 @@ proc pushToPeers( msg_hash = msgHash let bufferToPublish = messagePush.encode().buffer - var pushFuts: seq[Future[void]] + var pushFuts: seq[Future[Result[void, string]]] for peerId in peers: let pushFut = wf.pushToPeer(peerId, bufferToPublish) pushFuts.add(pushFut) + await allFutures(pushFuts) proc maintainSubscriptions*(wf: WakuFilter) {.async.} = @@ -324,6 +342,15 @@ proc initProtocolHandler(wf: WakuFilter) = wf.handler = handler wf.codec = WakuFilterSubscribeCodec +proc onPeerEventHandler(wf: WakuFilter, peerId: PeerId, event: PeerEvent) {.async.} = + ## These events are dispatched nim-libp2p, triggerPeerEvents proc + case event.kind + of Left: + ## Drop the previous known connection reference + wf.peerConnections.del(peerId) + else: + discard + proc new*( T: type WakuFilter, peerManager: PeerManager, @@ -342,6 +369,11 @@ proc new*( peerRequestRateLimiter: PerPeerRateLimiter(setting: rateLimitSetting), ) + proc peerEventHandler(peerId: PeerId, event: PeerEvent): Future[void] {.gcsafe.} = + wf.onPeerEventHandler(peerId, event) + + peerManager.addExtPeerEventHandler(peerEventHandler, PeerEventKind.Left) + wf.initProtocolHandler() setServiceLimitMetric(WakuFilterSubscribeCodec, rateLimitSetting) return wf diff --git a/waku/waku_filter_v2/subscriptions.nim b/waku/waku_filter_v2/subscriptions.nim index a490d47539..6b22a94b95 100644 --- a/waku/waku_filter_v2/subscriptions.nim +++ b/waku/waku_filter_v2/subscriptions.nim @@ -31,7 +31,7 @@ type SubscribedPeers* = HashSet[PeerID] # a sequence of peer ids - PeerData* = tuple[lastSeen: Moment, criteriaCount: uint, connection: Connection] + PeerData* = tuple[lastSeen: Moment, criteriaCount: uint] FilterSubscriptions* = ref object peersSubscribed*: Table[PeerID, PeerData] @@ -46,7 +46,6 @@ proc new*( maxFilterPeers: uint32 = MaxFilterPeers, maxFilterCriteriaPerPeer: uint32 = MaxFilterCriteriaPerPeer, ): FilterSubscriptions = - ## Create a new filter subscription object return FilterSubscriptions( peersSubscribed: initTable[PeerID, PeerData](), subscriptions: initTable[FilterCriterion, SubscribedPeers](), @@ -103,10 +102,6 @@ proc removePeer*(s: FilterSubscriptions, peerId: PeerID) {.async.} = debug "removePeer", currentPeerIds = toSeq(s.peersSubscribed.keys).mapIt(shortLog(it)), peerId = peerId - s.peersSubscribed.withValue(peerId, peerData): - debug "closing connection with peer", peerId = shortLog(peerId) - await peerData.connection.close() - s.peersSubscribed.del(peerId) debug "removePeer after deletion", @@ -146,15 +141,10 @@ proc refreshSubscription*(s: var FilterSubscriptions, peerId: PeerID) = data.lastSeen = Moment.now() proc addSubscription*( - s: FilterSubscriptions, - peerId: PeerID, - filterCriteria: FilterCriteria, - peerManager: PeerManager, + s: FilterSubscriptions, peerId: PeerID, filterCriteria: FilterCriteria ): Future[Result[void, string]] {.async.} = ## Add a subscription for a given peer - ## - ## The peerManager is needed to establish the first Connection through - ## /vac/waku/filter-push/2.0.0-beta1 + var peerData: ptr PeerData s.peersSubscribed.withValue(peerId, data): @@ -168,17 +158,7 @@ proc addSubscription*( if cast[uint](s.peersSubscribed.len) >= s.maxPeers: return err("node has reached maximum number of subscriptions: " & $(s.maxPeers)) - let connRes = await peerManager.dialPeer(peerId, WakuFilterPushCodec) - if connRes.isNone(): - ## We do not remove this peer, but allow the underlying peer manager - ## to do so if it is deemed necessary - return err("addSubscription no connection to peer: " & shortLog(peerId)) - - let newPeerData: PeerData = - (lastSeen: Moment.now(), criteriaCount: 0, connection: connRes.get()) - - debug "new WakuFilterPushCodec stream", conn = connRes.get() - + let newPeerData: PeerData = (lastSeen: Moment.now(), criteriaCount: 0) peerData = addr(s.peersSubscribed.mgetOrPut(peerId, newPeerData)) for filterCriterion in filterCriteria: @@ -216,14 +196,5 @@ proc removeSubscription*( do: return err("Peer has no subscriptions") -proc getConnectionByPeerId*( - s: FilterSubscriptions, peerId: PeerID -): Result[Connection, string] = - if not s.peersSubscribed.hasKey(peerId): - return err("peer not subscribed: " & shortLog(peerId)) - - let peerData = s.peersSubscribed.getOrDefault(peerId) - return ok(peerData.connection) - proc setSubscriptionTimeout*(s: FilterSubscriptions, newTimeout: Duration) = s.subscriptionTimeout = newTimeout