Skip to content

Commit

Permalink
filter: rm all subscription from a peer that is leaving
Browse files Browse the repository at this point in the history
  • Loading branch information
Ivansete-status committed Jan 30, 2025
1 parent 287e9b1 commit e91688f
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 9 deletions.
5 changes: 5 additions & 0 deletions waku/node/peer_manager/peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -962,6 +962,11 @@ proc pruneInRelayConns(pm: PeerManager, amount: int) {.async.} =
trace "Pruning Peer", Peer = $p
asyncSpawn(pm.switch.disconnect(p))

proc addExtPeerEventHandler*(
pm: PeerManager, eventHandler: PeerEventHandler, eventKind: PeerEventKind
) =
pm.switch.addPeerEventHandler(eventHandler, eventKind)

#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~#
# Initialization and Constructor #
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~#
Expand Down
4 changes: 2 additions & 2 deletions waku/waku_filter_v2/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ proc subscribe(
debug "subscribing peer to filter criteria",
peerId = peerId, filterCriteria = filterCriteria

(await wf.subscriptions.addSubscription(peerId, filterCriteria, wf.peerManager)).isOkOr:
(await wf.subscriptions.addSubscription(peerId, filterCriteria)).isOkOr:
return err(FilterSubscribeError.serviceUnavailable(error))

debug "correct subscription", peerId = peerId
Expand Down Expand Up @@ -335,7 +335,7 @@ proc new*(
): T =
let wf = WakuFilter(
subscriptions: FilterSubscriptions.new(
subscriptionTimeout, maxFilterPeers, maxFilterCriteriaPerPeer
subscriptionTimeout, maxFilterPeers, maxFilterCriteriaPerPeer, peerManager
),
peerManager: peerManager,
messageCache: init(TimedCache[string], messageCacheTTL),
Expand Down
30 changes: 23 additions & 7 deletions waku/waku_filter_v2/subscriptions.nim
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,41 @@ type
subscriptionTimeout: Duration
maxPeers: uint
maxCriteriaPerPeer: uint
peerManager: PeerManager

proc onPeerEventHandler(
s: FilterSubscriptions, peerId: PeerId, event: PeerEvent
) {.async.} =
case event.kind
of Left:
## Drop all the existing subscriptions with that peer
s.peersSubscribed.del(peerId)
else:
discard

proc new*(
T: type FilterSubscriptions,
subscriptionTimeout: Duration = DefaultSubscriptionTimeToLiveSec,
maxFilterPeers: uint32 = MaxFilterPeers,
maxFilterCriteriaPerPeer: uint32 = MaxFilterCriteriaPerPeer,
peerManager: PeerManager,
): FilterSubscriptions =
## Create a new filter subscription object
return FilterSubscriptions(
let fs = FilterSubscriptions(
peersSubscribed: initTable[PeerID, PeerData](),
subscriptions: initTable[FilterCriterion, SubscribedPeers](),
subscriptionTimeout: subscriptionTimeout,
maxPeers: maxFilterPeers,
maxCriteriaPerPeer: maxFilterCriteriaPerPeer,
peerManager: peerManager,
)

proc peerEventHandler(peerId: PeerId, event: PeerEvent): Future[void] {.gcsafe.} =
fs.onPeerEventHandler(peerId, event)

peerManager.addExtPeerEventHandler(peerEventHandler, PeerEventKind.Left)

return fs

proc isSubscribed*(s: FilterSubscriptions, peerId: PeerID): bool =
s.peersSubscribed.withValue(peerId, data):
return Moment.now() - data.lastSeen <= s.subscriptionTimeout
Expand Down Expand Up @@ -146,10 +165,7 @@ proc refreshSubscription*(s: var FilterSubscriptions, peerId: PeerID) =
data.lastSeen = Moment.now()

proc addSubscription*(
s: FilterSubscriptions,
peerId: PeerID,
filterCriteria: FilterCriteria,
peerManager: PeerManager,
s: FilterSubscriptions, peerId: PeerID, filterCriteria: FilterCriteria
): Future[Result[void, string]] {.async.} =
## Add a subscription for a given peer
##
Expand All @@ -168,7 +184,7 @@ proc addSubscription*(
if cast[uint](s.peersSubscribed.len) >= s.maxPeers:
return err("node has reached maximum number of subscriptions: " & $(s.maxPeers))

let connRes = await peerManager.dialPeer(peerId, WakuFilterPushCodec)
let connRes = await s.peerManager.dialPeer(peerId, WakuFilterPushCodec)
if connRes.isNone():
## We do not remove this peer, but allow the underlying peer manager
## to do so if it is deemed necessary
Expand Down

0 comments on commit e91688f

Please sign in to comment.