From e91688fbd5ee0ee07fbc87444bf7f9c7cde560c2 Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Thu, 30 Jan 2025 21:31:31 +0100 Subject: [PATCH] filter: rm all subscription from a peer that is leaving --- waku/node/peer_manager/peer_manager.nim | 5 +++++ waku/waku_filter_v2/protocol.nim | 4 ++-- waku/waku_filter_v2/subscriptions.nim | 30 +++++++++++++++++++------ 3 files changed, 30 insertions(+), 9 deletions(-) diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index 171b4c6fde..98fe945219 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -962,6 +962,11 @@ proc pruneInRelayConns(pm: PeerManager, amount: int) {.async.} = trace "Pruning Peer", Peer = $p asyncSpawn(pm.switch.disconnect(p)) +proc addExtPeerEventHandler*( + pm: PeerManager, eventHandler: PeerEventHandler, eventKind: PeerEventKind +) = + pm.switch.addPeerEventHandler(eventHandler, eventKind) + #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~# # Initialization and Constructor # #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~# diff --git a/waku/waku_filter_v2/protocol.nim b/waku/waku_filter_v2/protocol.nim index 12572b406d..6ee0fad452 100644 --- a/waku/waku_filter_v2/protocol.nim +++ b/waku/waku_filter_v2/protocol.nim @@ -69,7 +69,7 @@ proc subscribe( debug "subscribing peer to filter criteria", peerId = peerId, filterCriteria = filterCriteria - (await wf.subscriptions.addSubscription(peerId, filterCriteria, wf.peerManager)).isOkOr: + (await wf.subscriptions.addSubscription(peerId, filterCriteria)).isOkOr: return err(FilterSubscribeError.serviceUnavailable(error)) debug "correct subscription", peerId = peerId @@ -335,7 +335,7 @@ proc new*( ): T = let wf = WakuFilter( subscriptions: FilterSubscriptions.new( - subscriptionTimeout, maxFilterPeers, maxFilterCriteriaPerPeer + subscriptionTimeout, maxFilterPeers, maxFilterCriteriaPerPeer, peerManager ), peerManager: peerManager, messageCache: init(TimedCache[string], messageCacheTTL), diff --git a/waku/waku_filter_v2/subscriptions.nim b/waku/waku_filter_v2/subscriptions.nim index a490d47539..38a27692d9 100644 --- a/waku/waku_filter_v2/subscriptions.nim +++ b/waku/waku_filter_v2/subscriptions.nim @@ -39,22 +39,41 @@ type subscriptionTimeout: Duration maxPeers: uint maxCriteriaPerPeer: uint + peerManager: PeerManager + +proc onPeerEventHandler( + s: FilterSubscriptions, peerId: PeerId, event: PeerEvent +) {.async.} = + case event.kind + of Left: + ## Drop all the existing subscriptions with that peer + s.peersSubscribed.del(peerId) + else: + discard proc new*( T: type FilterSubscriptions, subscriptionTimeout: Duration = DefaultSubscriptionTimeToLiveSec, maxFilterPeers: uint32 = MaxFilterPeers, maxFilterCriteriaPerPeer: uint32 = MaxFilterCriteriaPerPeer, + peerManager: PeerManager, ): FilterSubscriptions = - ## Create a new filter subscription object - return FilterSubscriptions( + let fs = FilterSubscriptions( peersSubscribed: initTable[PeerID, PeerData](), subscriptions: initTable[FilterCriterion, SubscribedPeers](), subscriptionTimeout: subscriptionTimeout, maxPeers: maxFilterPeers, maxCriteriaPerPeer: maxFilterCriteriaPerPeer, + peerManager: peerManager, ) + proc peerEventHandler(peerId: PeerId, event: PeerEvent): Future[void] {.gcsafe.} = + fs.onPeerEventHandler(peerId, event) + + peerManager.addExtPeerEventHandler(peerEventHandler, PeerEventKind.Left) + + return fs + proc isSubscribed*(s: FilterSubscriptions, peerId: PeerID): bool = s.peersSubscribed.withValue(peerId, data): return Moment.now() - data.lastSeen <= s.subscriptionTimeout @@ -146,10 +165,7 @@ proc refreshSubscription*(s: var FilterSubscriptions, peerId: PeerID) = data.lastSeen = Moment.now() proc addSubscription*( - s: FilterSubscriptions, - peerId: PeerID, - filterCriteria: FilterCriteria, - peerManager: PeerManager, + s: FilterSubscriptions, peerId: PeerID, filterCriteria: FilterCriteria ): Future[Result[void, string]] {.async.} = ## Add a subscription for a given peer ## @@ -168,7 +184,7 @@ proc addSubscription*( if cast[uint](s.peersSubscribed.len) >= s.maxPeers: return err("node has reached maximum number of subscriptions: " & $(s.maxPeers)) - let connRes = await peerManager.dialPeer(peerId, WakuFilterPushCodec) + let connRes = await s.peerManager.dialPeer(peerId, WakuFilterPushCodec) if connRes.isNone(): ## We do not remove this peer, but allow the underlying peer manager ## to do so if it is deemed necessary