diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7de0a055ab..4290707495 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -21,7 +21,7 @@ jobs: permissions: pull-requests: read steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 name: Checkout code id: checkout - uses: dorny/paths-filter@v2 @@ -61,7 +61,7 @@ jobs: name: build-${{ matrix.os }} steps: - name: Checkout code - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Get submodules hash id: submodules @@ -92,7 +92,7 @@ jobs: name: test-${{ matrix.os }} steps: - name: Checkout code - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Get submodules hash id: submodules @@ -158,7 +158,7 @@ jobs: needs: build steps: - name: Checkout code - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Get submodules hash id: submodules diff --git a/.github/workflows/container-image.yml b/.github/workflows/container-image.yml index 7e993bfaac..cfa66d20ad 100644 --- a/.github/workflows/container-image.yml +++ b/.github/workflows/container-image.yml @@ -44,7 +44,7 @@ jobs: - name: Checkout code if: ${{ steps.secrets.outcome == 'success' }} - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Get submodules hash id: submodules diff --git a/.github/workflows/pr-lint.yml b/.github/workflows/pr-lint.yml index 02da07a4af..45f9dcaad1 100644 --- a/.github/workflows/pr-lint.yml +++ b/.github/workflows/pr-lint.yml @@ -53,7 +53,7 @@ jobs: runs-on: ubuntu-22.04 steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 name: Checkout code id: checkout - uses: dorny/paths-filter@v2 diff --git a/.github/workflows/pre-release.yml b/.github/workflows/pre-release.yml index 39223c1953..cf67112604 100644 --- a/.github/workflows/pre-release.yml +++ b/.github/workflows/pre-release.yml @@ -20,7 +20,7 @@ jobs: runs-on: ubuntu-22.04 steps: - name: Checkout code - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Vars id: vars @@ -42,7 +42,7 @@ jobs: runs-on: ${{ matrix.os }} steps: - name: Checkout code - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: prep variables id: vars @@ -117,7 +117,7 @@ jobs: needs: [ tag-name, build-and-publish ] steps: - name: Checkout code - uses: actions/checkout@v3 + uses: actions/checkout@v4 with: fetch-depth: 0 ref: master diff --git a/CHANGELOG.md b/CHANGELOG.md index f3b32d2c32..cbbf273b97 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,88 @@ +## v0.35.0 (2025-02-06) + +### Notes + +- Deprecated parameter + - max-relay-peers + +- New parameters + - relay-service-ratio + + String value with peers distribution within max-connections parameter. + This percentage ratio represents the relay peers to service peers. + For example, 60:40, tells that 60% of the max-connections will be used for relay protocol + and the other 40% of max-connections will be reserved for other service protocols (e.g., + filter, lightpush, store, metadata, etc.) + + - rendezvous + + boolean attribute that optionally activates waku rendezvous discovery server. + True by default. + +### Release highlights + +- New filter approach to keep push stream opened within subscription period. +- Waku sync protocol. +- Libwaku async +- Lite-protocol-tester enhancements. +- New panels and metrics in RLN to control outstanding request quota. + +### Features + +- refactor filter to react when the remote peer closes the stream ([#3281](https://github.com/waku-org/nwaku/issues/3281)) ([5392b8ea4](https://github.com/waku-org/nwaku/commit/5392b8ea4)) +- waku sync shard matching check ([#3259](https://github.com/waku-org/nwaku/issues/3259)) ([42fd6b827](https://github.com/waku-org/nwaku/commit/42fd6b827)) +- waku store sync 2.0 config & setup ([#3217](https://github.com/waku-org/nwaku/issues/3217)) ([7f64dc03a](https://github.com/waku-org/nwaku/commit/7f64dc03a)) +- waku store sync 2.0 protocols & tests ([#3216](https://github.com/waku-org/nwaku/issues/3216)) ([6ee494d90](https://github.com/waku-org/nwaku/commit/6ee494d90)) +- waku store sync 2.0 storage & tests ([#3215](https://github.com/waku-org/nwaku/issues/3215)) ([54a7a6875](https://github.com/waku-org/nwaku/commit/54a7a6875)) +- waku store sync 2.0 common types & codec ([#3213](https://github.com/waku-org/nwaku/issues/3213)) ([29fda2dab](https://github.com/waku-org/nwaku/commit/29fda2dab)) +- add txhash-based eligibility checks for incentivization PoC ([#3166](https://github.com/waku-org/nwaku/issues/3166)) ([505ec84ce](https://github.com/waku-org/nwaku/commit/505ec84ce)) +- connection change event ([#3225](https://github.com/waku-org/nwaku/issues/3225)) ([e81a5517b](https://github.com/waku-org/nwaku/commit/e81a5517b)) +- libwaku add protected topic ([#3211](https://github.com/waku-org/nwaku/issues/3211)) ([d932dd10c](https://github.com/waku-org/nwaku/commit/d932dd10c)) +- topic health tracking ([#3212](https://github.com/waku-org/nwaku/issues/3212)) ([6020a673b](https://github.com/waku-org/nwaku/commit/6020a673b)) +- allowing configuration of application level callbacks ([#3206](https://github.com/waku-org/nwaku/issues/3206)) ([049fbeabb](https://github.com/waku-org/nwaku/commit/049fbeabb)) +- waku rendezvous wrapper ([#2962](https://github.com/waku-org/nwaku/issues/2962)) ([650a9487e](https://github.com/waku-org/nwaku/commit/650a9487e)) +- making dns discovery async ([#3175](https://github.com/waku-org/nwaku/issues/3175)) ([d7d00bfd7](https://github.com/waku-org/nwaku/commit/d7d00bfd7)) +- remove Waku Sync 1.0 & Negentropy ([#3185](https://github.com/waku-org/nwaku/issues/3185)) ([2ab9c3d36](https://github.com/waku-org/nwaku/commit/2ab9c3d36)) +- add waku_dial_peer and get_connected_peers to libwaku ([#3149](https://github.com/waku-org/nwaku/issues/3149)) ([507b1fc4d](https://github.com/waku-org/nwaku/commit/507b1fc4d)) +- running periodicaly peer exchange if discv5 is disabled ([#3150](https://github.com/waku-org/nwaku/issues/3150)) ([400d7a54f](https://github.com/waku-org/nwaku/commit/400d7a54f)) + +### Bug Fixes + +- filter enhancements in subscription management ([#3198](https://github.com/waku-org/nwaku/issues/3198)) ([287e9b12c](https://github.com/waku-org/nwaku/commit/287e9b12c)) +- avoid double db migration for sqlite ([#3244](https://github.com/waku-org/nwaku/issues/3244)) ([2ce245354](https://github.com/waku-org/nwaku/commit/2ce245354)) +- libwaku waku_relay_unsubscribe ([#3207](https://github.com/waku-org/nwaku/issues/3207)) ([ab0c1d4aa](https://github.com/waku-org/nwaku/commit/ab0c1d4aa)) +- libwaku support string and int64 for timestamps ([#3205](https://github.com/waku-org/nwaku/issues/3205)) ([2022f54f5](https://github.com/waku-org/nwaku/commit/2022f54f5)) +- lite-protocol-tester receiver exit check ([#3187](https://github.com/waku-org/nwaku/issues/3187)) ([beb21c78f](https://github.com/waku-org/nwaku/commit/beb21c78f)) +- linting error ([#3156](https://github.com/waku-org/nwaku/issues/3156)) ([99ac68447](https://github.com/waku-org/nwaku/commit/99ac68447)) + +### Changes + +- filter remove all subscription from a peer that is leaving ([#3267](https://github.com/waku-org/nwaku/issues/3267)) ([46747fd49](https://github.com/waku-org/nwaku/commit/46747fd49)) +- send msg hash as string on libwaku message event ([#3234](https://github.com/waku-org/nwaku/issues/3234)) ([9c209b4c3](https://github.com/waku-org/nwaku/commit/9c209b4c3)) +- separate heaptrack from debug build ([#3249](https://github.com/waku-org/nwaku/issues/3249)) ([81f24cc25](https://github.com/waku-org/nwaku/commit/81f24cc25)) +- capping mechanism for relay and service connections ([#3184](https://github.com/waku-org/nwaku/issues/3184)) ([2942782f9](https://github.com/waku-org/nwaku/commit/2942782f9)) +- add extra migration to sqlite and improving error message ([#3240](https://github.com/waku-org/nwaku/issues/3240)) ([bfd60ceab](https://github.com/waku-org/nwaku/commit/bfd60ceab)) +- optimize libwaku size ([#3242](https://github.com/waku-org/nwaku/issues/3242)) ([9c0ad8517](https://github.com/waku-org/nwaku/commit/9c0ad8517)) +- golang example end using negentropy dependency plus simple readme.md ([#3235](https://github.com/waku-org/nwaku/issues/3235)) ([0e0fcfb1a](https://github.com/waku-org/nwaku/commit/0e0fcfb1a)) +- enhance libwaku store protocol and more ([#3223](https://github.com/waku-org/nwaku/issues/3223)) ([22ce9ee87](https://github.com/waku-org/nwaku/commit/22ce9ee87)) +- add two RLN metrics and panel ([#3181](https://github.com/waku-org/nwaku/issues/3181)) ([1b532e8ab](https://github.com/waku-org/nwaku/commit/1b532e8ab)) +- libwaku async ([#3180](https://github.com/waku-org/nwaku/issues/3180)) ([47a623541](https://github.com/waku-org/nwaku/commit/47a623541)) +- filter protocol in libwaku ([#3177](https://github.com/waku-org/nwaku/issues/3177)) ([f856298ca](https://github.com/waku-org/nwaku/commit/f856298ca)) +- add supervisor for lite-protocol-tester infra ([#3176](https://github.com/waku-org/nwaku/issues/3176)) ([a7264d68c](https://github.com/waku-org/nwaku/commit/a7264d68c)) +- libwaku better error handling and better waku thread destroy handling ([#3167](https://github.com/waku-org/nwaku/issues/3167)) ([294dd03c4](https://github.com/waku-org/nwaku/commit/294dd03c4)) +- libwaku allow several multiaddresses for a single peer in store queries ([#3171](https://github.com/waku-org/nwaku/issues/3171)) ([3cb8ebdd8](https://github.com/waku-org/nwaku/commit/3cb8ebdd8)) +- naming connectPeer procedure ([#3157](https://github.com/waku-org/nwaku/issues/3157)) ([b3656d6ee](https://github.com/waku-org/nwaku/commit/b3656d6ee)) + +This release supports the following [libp2p protocols](https://docs.libp2p.io/concepts/protocols/): +| Protocol | Spec status | Protocol id | +| ---: | :---: | :--- | +| [`11/WAKU2-RELAY`](https://github.com/vacp2p/rfc-index/blob/main/waku/standards/core/11/relay.md) | `stable` | `/vac/waku/relay/2.0.0` | +| [`12/WAKU2-FILTER`](https://github.com/vacp2p/rfc-index/blob/main/waku/standards/core/12/filter.md) | `draft` | `/vac/waku/filter/2.0.0-beta1`
`/vac/waku/filter-subscribe/2.0.0-beta1`
`/vac/waku/filter-push/2.0.0-beta1` | +| [`13/WAKU2-STORE`](https://github.com/vacp2p/rfc-index/blob/main/waku/standards/core/13/store.md) | `draft` | `/vac/waku/store/2.0.0-beta4` | +| [`19/WAKU2-LIGHTPUSH`](https://github.com/vacp2p/rfc-index/blob/main/waku/standards/core/19/lightpush.md) | `draft` | `/vac/waku/lightpush/2.0.0-beta1` | +| [`66/WAKU2-METADATA`](https://github.com/waku-org/specs/blob/master/standards/core/metadata.md) | `raw` | `/vac/waku/metadata/1.0.0` | +| [`WAKU-SYNC`](https://github.com/waku-org/specs/blob/master/standards/core/sync.md) | `draft` | `/vac/waku/reconciliation/1.0.0` & `/vac/waku/transfer/1.0.0` | + ## v0.34.0 (2024-10-29) ### Notes: diff --git a/tests/node/test_wakunode_legacy_store.nim b/tests/node/test_wakunode_legacy_store.nim index b52dc6e6eb..5b0409d865 100644 --- a/tests/node/test_wakunode_legacy_store.nim +++ b/tests/node/test_wakunode_legacy_store.nim @@ -46,16 +46,16 @@ suite "Waku Store - End to End - Sorted Archive": let timeOrigin = now() archiveMessages = @[ - fakeWakuMessage(@[byte 00], ts = ts(-90, timeOrigin)), - fakeWakuMessage(@[byte 01], ts = ts(-80, timeOrigin)), - fakeWakuMessage(@[byte 02], ts = ts(-70, timeOrigin)), - fakeWakuMessage(@[byte 03], ts = ts(-60, timeOrigin)), - fakeWakuMessage(@[byte 04], ts = ts(-50, timeOrigin)), - fakeWakuMessage(@[byte 05], ts = ts(-40, timeOrigin)), - fakeWakuMessage(@[byte 06], ts = ts(-30, timeOrigin)), - fakeWakuMessage(@[byte 07], ts = ts(-20, timeOrigin)), - fakeWakuMessage(@[byte 08], ts = ts(-10, timeOrigin)), - fakeWakuMessage(@[byte 09], ts = ts(00, timeOrigin)), + fakeWakuMessage(@[byte 00], ts = ts(00, timeOrigin)), + fakeWakuMessage(@[byte 01], ts = ts(10, timeOrigin)), + fakeWakuMessage(@[byte 02], ts = ts(20, timeOrigin)), + fakeWakuMessage(@[byte 03], ts = ts(30, timeOrigin)), + fakeWakuMessage(@[byte 04], ts = ts(40, timeOrigin)), + fakeWakuMessage(@[byte 05], ts = ts(50, timeOrigin)), + fakeWakuMessage(@[byte 06], ts = ts(60, timeOrigin)), + fakeWakuMessage(@[byte 07], ts = ts(70, timeOrigin)), + fakeWakuMessage(@[byte 08], ts = ts(80, timeOrigin)), + fakeWakuMessage(@[byte 09], ts = ts(90, timeOrigin)), ] historyQuery = HistoryQuery( @@ -657,23 +657,23 @@ suite "Waku Store - End to End - Archive with Multiple Topics": pageSize: 5, ) - let timeOrigin = now() - 90 + let timeOrigin = now() originTs = proc(offset = 0): Timestamp {.gcsafe, raises: [].} = ts(offset, timeOrigin) archiveMessages = @[ - fakeWakuMessage(@[byte 00], ts = originTs(-90), contentTopic = contentTopic), - fakeWakuMessage(@[byte 01], ts = originTs(-80), contentTopic = contentTopicB), - fakeWakuMessage(@[byte 02], ts = originTs(-70), contentTopic = contentTopicC), - fakeWakuMessage(@[byte 03], ts = originTs(-60), contentTopic = contentTopic), - fakeWakuMessage(@[byte 04], ts = originTs(-50), contentTopic = contentTopicB), - fakeWakuMessage(@[byte 05], ts = originTs(-40), contentTopic = contentTopicC), - fakeWakuMessage(@[byte 06], ts = originTs(-30), contentTopic = contentTopic), - fakeWakuMessage(@[byte 07], ts = originTs(-20), contentTopic = contentTopicB), - fakeWakuMessage(@[byte 08], ts = originTs(-10), contentTopic = contentTopicC), + fakeWakuMessage(@[byte 00], ts = originTs(00), contentTopic = contentTopic), + fakeWakuMessage(@[byte 01], ts = originTs(10), contentTopic = contentTopicB), + fakeWakuMessage(@[byte 02], ts = originTs(20), contentTopic = contentTopicC), + fakeWakuMessage(@[byte 03], ts = originTs(30), contentTopic = contentTopic), + fakeWakuMessage(@[byte 04], ts = originTs(40), contentTopic = contentTopicB), + fakeWakuMessage(@[byte 05], ts = originTs(50), contentTopic = contentTopicC), + fakeWakuMessage(@[byte 06], ts = originTs(60), contentTopic = contentTopic), + fakeWakuMessage(@[byte 07], ts = originTs(70), contentTopic = contentTopicB), + fakeWakuMessage(@[byte 08], ts = originTs(80), contentTopic = contentTopicC), fakeWakuMessage( - @[byte 09], ts = originTs(00), contentTopic = contentTopicSpecials + @[byte 09], ts = originTs(90), contentTopic = contentTopicSpecials ), ] @@ -827,9 +827,8 @@ suite "Waku Store - End to End - Archive with Multiple Topics": suite "Validation of Time-based Filtering": asyncTest "Basic Time Filtering": # Given a history query with start and end time - - historyQuery.startTime = some(originTs(-90)) - historyQuery.endTime = some(originTs(-70)) + historyQuery.startTime = some(originTs(20)) + historyQuery.endTime = some(originTs(40)) # When making a history query let queryResponse = await client.query(historyQuery, serverRemotePeerInfo) @@ -837,13 +836,12 @@ suite "Waku Store - End to End - Archive with Multiple Topics": # Then the response contains the messages check: queryResponse.get().messages == - @[archiveMessages[0], archiveMessages[1], archiveMessages[2]] + @[archiveMessages[2], archiveMessages[3], archiveMessages[4]] asyncTest "Only Start Time Specified": # Given a history query with only start time - historyQuery.startTime = some(originTs(-20)) + historyQuery.startTime = some(originTs(20)) historyQuery.endTime = none(Timestamp) - historyQuery.pubsubTopic = none(string) # When making a history query let queryResponse = await client.query(historyQuery, serverRemotePeerInfo) @@ -851,7 +849,12 @@ suite "Waku Store - End to End - Archive with Multiple Topics": # Then the response contains the messages check: queryResponse.get().messages == - @[archiveMessages[7], archiveMessages[8], archiveMessages[9]] + @[ + archiveMessages[2], + archiveMessages[3], + archiveMessages[4], + archiveMessages[5], + ] asyncTest "Only End Time Specified": # Given a history query with only end time @@ -886,8 +889,8 @@ suite "Waku Store - End to End - Archive with Multiple Topics": asyncTest "Time Filtering with Content Filtering": # Given a history query with time and content filtering - historyQuery.startTime = some(originTs(-90)) - historyQuery.endTime = some(originTs(-60)) + historyQuery.startTime = some(originTs(20)) + historyQuery.endTime = some(originTs(60)) historyQuery.contentTopics = @[contentTopicC] # When making a history query @@ -895,7 +898,7 @@ suite "Waku Store - End to End - Archive with Multiple Topics": # Then the response contains the messages check: - queryResponse.get().messages == @[archiveMessages[2]] + queryResponse.get().messages == @[archiveMessages[2], archiveMessages[5]] asyncTest "Messages Outside of Time Range": # Given a history query with a valid time range which does not contain any messages diff --git a/tests/node/test_wakunode_store.nim b/tests/node/test_wakunode_store.nim index 4442fb8fed..49c24c6d83 100644 --- a/tests/node/test_wakunode_store.nim +++ b/tests/node/test_wakunode_store.nim @@ -47,16 +47,16 @@ suite "Waku Store - End to End - Sorted Archive": let timeOrigin = now() let messages = @[ - fakeWakuMessage(@[byte 00], ts = ts(-90, timeOrigin)), - fakeWakuMessage(@[byte 01], ts = ts(-80, timeOrigin)), - fakeWakuMessage(@[byte 02], ts = ts(-70, timeOrigin)), - fakeWakuMessage(@[byte 03], ts = ts(-60, timeOrigin)), - fakeWakuMessage(@[byte 04], ts = ts(-50, timeOrigin)), - fakeWakuMessage(@[byte 05], ts = ts(-40, timeOrigin)), - fakeWakuMessage(@[byte 06], ts = ts(-30, timeOrigin)), - fakeWakuMessage(@[byte 07], ts = ts(-20, timeOrigin)), - fakeWakuMessage(@[byte 08], ts = ts(-10, timeOrigin)), - fakeWakuMessage(@[byte 09], ts = ts(00, timeOrigin)), + fakeWakuMessage(@[byte 00], ts = ts(00, timeOrigin)), + fakeWakuMessage(@[byte 01], ts = ts(10, timeOrigin)), + fakeWakuMessage(@[byte 02], ts = ts(20, timeOrigin)), + fakeWakuMessage(@[byte 03], ts = ts(30, timeOrigin)), + fakeWakuMessage(@[byte 04], ts = ts(40, timeOrigin)), + fakeWakuMessage(@[byte 05], ts = ts(50, timeOrigin)), + fakeWakuMessage(@[byte 06], ts = ts(60, timeOrigin)), + fakeWakuMessage(@[byte 07], ts = ts(70, timeOrigin)), + fakeWakuMessage(@[byte 08], ts = ts(80, timeOrigin)), + fakeWakuMessage(@[byte 09], ts = ts(90, timeOrigin)), ] archiveMessages = messages.mapIt( WakuMessageKeyValue( @@ -909,17 +909,17 @@ suite "Waku Store - End to End - Archive with Multiple Topics": let messages = @[ - fakeWakuMessage(@[byte 00], ts = originTs(-90), contentTopic = contentTopic), - fakeWakuMessage(@[byte 01], ts = originTs(-80), contentTopic = contentTopicB), - fakeWakuMessage(@[byte 02], ts = originTs(-70), contentTopic = contentTopicC), - fakeWakuMessage(@[byte 03], ts = originTs(-60), contentTopic = contentTopic), - fakeWakuMessage(@[byte 04], ts = originTs(-50), contentTopic = contentTopicB), - fakeWakuMessage(@[byte 05], ts = originTs(-40), contentTopic = contentTopicC), - fakeWakuMessage(@[byte 06], ts = originTs(-30), contentTopic = contentTopic), - fakeWakuMessage(@[byte 07], ts = originTs(-20), contentTopic = contentTopicB), - fakeWakuMessage(@[byte 08], ts = originTs(-10), contentTopic = contentTopicC), + fakeWakuMessage(@[byte 00], ts = originTs(00), contentTopic = contentTopic), + fakeWakuMessage(@[byte 01], ts = originTs(10), contentTopic = contentTopicB), + fakeWakuMessage(@[byte 02], ts = originTs(20), contentTopic = contentTopicC), + fakeWakuMessage(@[byte 03], ts = originTs(30), contentTopic = contentTopic), + fakeWakuMessage(@[byte 04], ts = originTs(40), contentTopic = contentTopicB), + fakeWakuMessage(@[byte 05], ts = originTs(50), contentTopic = contentTopicC), + fakeWakuMessage(@[byte 06], ts = originTs(60), contentTopic = contentTopic), + fakeWakuMessage(@[byte 07], ts = originTs(70), contentTopic = contentTopicB), + fakeWakuMessage(@[byte 08], ts = originTs(80), contentTopic = contentTopicC), fakeWakuMessage( - @[byte 09], ts = originTs(00), contentTopic = contentTopicSpecials + @[byte 09], ts = originTs(90), contentTopic = contentTopicSpecials ), ] @@ -1089,9 +1089,8 @@ suite "Waku Store - End to End - Archive with Multiple Topics": suite "Validation of Time-based Filtering": asyncTest "Basic Time Filtering": # Given a history query with start and end time - storeQuery.startTime = some(originTs(-90)) - storeQuery.endTime = some(originTs(-60)) - storeQuery.contentTopics = @[contentTopic, contentTopicB, contentTopicC] + storeQuery.startTime = some(originTs(20)) + storeQuery.endTime = some(originTs(40)) # When making a history query let queryResponse = await client.query(storeQuery, serverRemotePeerInfo) @@ -1099,16 +1098,11 @@ suite "Waku Store - End to End - Archive with Multiple Topics": # Then the response contains the messages check: queryResponse.get().messages == - @[ - archiveMessages[0], - archiveMessages[1], - archiveMessages[2], - archiveMessages[3], - ] + @[archiveMessages[2], archiveMessages[3], archiveMessages[4]] asyncTest "Only Start Time Specified": # Given a history query with only start time - storeQuery.startTime = some(originTs(-40)) + storeQuery.startTime = some(originTs(20)) storeQuery.endTime = none(Timestamp) # When making a history query @@ -1116,19 +1110,32 @@ suite "Waku Store - End to End - Archive with Multiple Topics": # Then the response contains the messages check: - queryResponse.get().messages == @[archiveMessages[5]] + queryResponse.get().messages == + @[ + archiveMessages[2], + archiveMessages[3], + archiveMessages[4], + archiveMessages[5], + ] asyncTest "Only End Time Specified": # Given a history query with only end time storeQuery.startTime = none(Timestamp) - storeQuery.endTime = some(originTs(-80)) + storeQuery.endTime = some(originTs(40)) # When making a history query let queryResponse = await client.query(storeQuery, serverRemotePeerInfo) # Then the response contains no messages check: - queryResponse.get().messages == @[archiveMessages[0], archiveMessages[1]] + queryResponse.get().messages == + @[ + archiveMessages[0], + archiveMessages[1], + archiveMessages[2], + archiveMessages[3], + archiveMessages[4], + ] asyncTest "Invalid Time Range": # Given a history query with invalid time range @@ -1144,8 +1151,8 @@ suite "Waku Store - End to End - Archive with Multiple Topics": asyncTest "Time Filtering with Content Filtering": # Given a history query with time and content filtering - storeQuery.startTime = some(originTs(-60)) - storeQuery.endTime = some(originTs(-20)) + storeQuery.startTime = some(originTs(20)) + storeQuery.endTime = some(originTs(60)) storeQuery.contentTopics = @[contentTopicC] # When making a history query @@ -1153,7 +1160,7 @@ suite "Waku Store - End to End - Archive with Multiple Topics": # Then the response contains the messages check: - queryResponse.get().messages == @[archiveMessages[5]] + queryResponse.get().messages == @[archiveMessages[2], archiveMessages[5]] asyncTest "Messages Outside of Time Range": # Given a history query with a valid time range which does not contain any messages diff --git a/tests/waku_archive/test_waku_archive.nim b/tests/waku_archive/test_waku_archive.nim index 5162d310c6..9e1b927e09 100644 --- a/tests/waku_archive/test_waku_archive.nim +++ b/tests/waku_archive/test_waku_archive.nim @@ -491,8 +491,7 @@ procSuite "Waku Archive - find messages": response.messages.mapIt(it.timestamp) == @[ts(30, timeOrigin), ts(50, timeOrigin)] test "handle temporal history query with a zero-size time window": - ## A zero-size window results in an error to the store client. That kind of queries - ## are pointless and we need to rapidly inform about that to the client. + ## A zero-size window results in an empty list of history messages ## Given let req = ArchiveQuery( contentTopics: @[ContentTopic("1")], @@ -504,45 +503,27 @@ procSuite "Waku Archive - find messages": let res = waitFor archiveA.findMessages(req) ## Then - check not res.isOk() + check res.isOk() + + let response = res.tryGet() + check: + response.messages.len == 0 test "handle temporal history query with an invalid time window": - ## A query with an invalid time range should immediately return a query error to the client + ## A history query with an invalid time range results in an empty list of history messages ## Given let req = ArchiveQuery( contentTopics: @[ContentTopic("1")], startTime: some(Timestamp(5)), - endTime: some(Timestamp(4)), + endTime: some(Timestamp(2)), ) ## When let res = waitFor archiveA.findMessages(req) ## Then - check not res.isOk() - - test "time range should be smaller than 24h": - let oneDayRangeNanos = 86_400_000_000_000 - let now = getNowInNanosecondTime() - - var res = waitFor archiveA.findMessages( - ArchiveQuery( - contentTopics: @[ContentTopic("1")], - startTime: some(Timestamp(now - oneDayRangeNanos - 1)), - endTime: some(Timestamp(now)), - ) - ) - - ## It fails if range is a bit bigger than 24h - check not res.isOk() - - res = waitFor archiveA.findMessages( - ArchiveQuery( - contentTopics: @[ContentTopic("1")], - startTime: some(Timestamp(now - oneDayRangeNanos)), - endTime: some(Timestamp(now)), - ) - ) - - ## Ok if range is 24h check res.isOk() + + let response = res.tryGet() + check: + response.messages.len == 0 diff --git a/vendor/nim-libp2p b/vendor/nim-libp2p index c5aa3736f9..a4f0a638e7 160000 --- a/vendor/nim-libp2p +++ b/vendor/nim-libp2p @@ -1 +1 @@ -Subproject commit c5aa3736f96e4d66f6aa653a2351ded74b7d21a9 +Subproject commit a4f0a638e718f05ecec01ae3a6ad2838714e7e40 diff --git a/waku/incentivization/eligibility_manager.nim b/waku/incentivization/eligibility_manager.nim index 3343f7186a..da8280da3e 100644 --- a/waku/incentivization/eligibility_manager.nim +++ b/waku/incentivization/eligibility_manager.nim @@ -13,7 +13,8 @@ type EligibilityManager* = ref object # FIXME: make web3 private? proc init*( T: type EligibilityManager, ethClient: string ): Future[EligibilityManager] {.async.} = - return EligibilityManager(web3: await newWeb3(ethClient), seenTxIds: initHashSet[TxHash]()) + return + EligibilityManager(web3: await newWeb3(ethClient), seenTxIds: initHashSet[TxHash]()) # TODO: handle error if web3 instance is not established # Clean up the web3 instance diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index 6894f55781..ba04b6b004 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -518,6 +518,42 @@ proc connectedPeers*( return (inPeers, outPeers) +proc getStreamByPeerIdAndProtocol*( + pm: PeerManager, peerId: PeerId, protocol: string +): Future[Result[Connection, string]] {.async.} = + ## Establishes a new stream to the given peer and protocol or returns the existing stream, if any. + ## Notice that the "Connection" type represents a stream within a transport connection + ## (we will need to adapt this term.) + + let peerIdsMuxers: Table[PeerId, seq[Muxer]] = pm.switch.connManager.getConnections() + if not peerIdsMuxers.contains(peerId): + return err("peerId not found in connManager: " & $peerId) + + let muxers = peerIdsMuxers[peerId] + + var streams = newSeq[Connection](0) + for m in muxers: + for s in m.getStreams(): + ## getStreams is defined in nim-libp2p + streams.add(s) + + ## Try to get the opened streams for the given protocol + let streamsOfInterest = streams.filterIt( + it.protocol == protocol and not LPStream(it).isClosed and + not LPStream(it).isClosedRemotely + ) + + if streamsOfInterest.len > 0: + ## In theory there should be one stream per protocol. Then we just pick up the 1st + return ok(streamsOfInterest[0]) + + ## There isn't still a stream. Let's dial to create one + let streamRes = await pm.dialPeer(peerId, protocol) + if streamRes.isNone(): + return err("getStreamByPeerIdProto no connection to peer: " & $peerId) + + return ok(streamRes.get()) + proc connectToRelayPeers*(pm: PeerManager) {.async.} = var (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec) let totalRelayPeers = inRelayPeers.len + outRelayPeers.len diff --git a/waku/waku_archive/archive.nim b/waku/waku_archive/archive.nim index 4c22521c26..4088b42a8c 100644 --- a/waku/waku_archive/archive.nim +++ b/waku/waku_archive/archive.nim @@ -163,41 +163,11 @@ proc syncMessageIngress*( return ok() -proc validateTimeRange( - startTime: Option[Timestamp], endTime: Option[Timestamp] -): Result[void, ArchiveError] = - ## Returns ok if the given time range is shorter than one day, and error otherwise. - ## We restrict the maximum allowed time of 24h to prevent excessive big queries. - - let oneDayRangeNanos = 86_400_000_000_000 - let now = getNowInNanosecondTime() - - var startTimeToValidate = now - oneDayRangeNanos - if startTime.isSome(): - startTimeToValidate = startTime.get() - - var endTimeToValidate = now - if endTime.isSome(): - endTimeToValidate = endTime.get() - - if startTimeToValidate >= endTimeToValidate: - return err(ArchiveError.invalidQuery("startTime should be before endTime")) - - if (endTimeToValidate - startTimeToValidate) > oneDayRangeNanos: - return err( - ArchiveError.invalidQuery("time range should be smaller than one day in nanos") - ) - - return ok() - proc findMessages*( self: WakuArchive, query: ArchiveQuery ): Future[ArchiveResult] {.async, gcsafe.} = ## Search the archive to return a single page of messages matching the query criteria - validateTimeRange(query.startTime, query.endTime).isOkOr: - return err(error) - if query.cursor.isSome(): let cursor = query.cursor.get() diff --git a/waku/waku_filter_v2/protocol.nim b/waku/waku_filter_v2/protocol.nim index 22504488e0..d8b79ab670 100644 --- a/waku/waku_filter_v2/protocol.nim +++ b/waku/waku_filter_v2/protocol.nim @@ -172,29 +172,15 @@ proc pushToPeer( ): Future[Result[void, string]] {.async.} = debug "pushing message to subscribed peer", peerId = shortLog(peerId) - if not wf.peerManager.wakuPeerStore.hasPeer(peerId, WakuFilterPushCodec): - # Check that peer has not been removed from peer store - error "no addresses for peer", peerId = shortLog(peerId) - return err("no addresses for peer: " & $peerId) - - let conn = - if wf.peerConnections.contains(peerId): - wf.peerConnections[peerId] - else: - ## we never pushed a message before, let's dial then - let connRes = await wf.peerManager.dialPeer(peerId, WakuFilterPushCodec) - if connRes.isNone(): - ## We do not remove this peer, but allow the underlying peer manager - ## to do so if it is deemed necessary - error "pushToPeer no connection to peer", peerId = shortLog(peerId) - return err("pushToPeer no connection to peer: " & shortLog(peerId)) - - let newConn = connRes.get() - wf.peerConnections[peerId] = newConn - newConn - - await conn.writeLp(buffer) - debug "published successful", peerId = shortLog(peerId), conn + let stream = ( + await wf.peerManager.getStreamByPeerIdAndProtocol(peerId, WakuFilterPushCodec) + ).valueOr: + error "pushToPeer failed", error + return err("pushToPeer failed: " & $error) + + await stream.writeLp(buffer) + + debug "published successful", peerId = shortLog(peerId), stream waku_service_network_bytes.inc( amount = buffer.len().int64, labelValues = [WakuFilterPushCodec, "out"] ) diff --git a/waku/waku_store_sync.nim b/waku/waku_store_sync.nim index 03c1b33afc..06699d9fd0 100644 --- a/waku/waku_store_sync.nim +++ b/waku/waku_store_sync.nim @@ -1,6 +1,8 @@ {.push raises: [].} import - ./waku_store_sync/reconciliation, ./waku_store_sync/transfer, ./waku_store_sync/common + ./waku_store_sync/reconciliation, + ./waku_store_sync/transfer, + ./waku_store_sync/common export reconciliation, transfer, common