Merge branch 'master' into filter-peer-left-fix
Ivansete-status authored Jan 30, 2025
2 parents e91688f + 42fd6b8 commit 6854745
Showing 17 changed files with 251 additions and 95 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -266,7 +266,7 @@ NPH:=$(shell dirname $(NIM_BINARY))/nph

build-nph: | build deps
ifeq ("$(wildcard $(NPH))","")
$(ENV_SCRIPT) nim c vendor/nph/src/nph.nim && \
$(ENV_SCRIPT) nim c --skipParentCfg:on vendor/nph/src/nph.nim && \
mv vendor/nph/src/nph $(shell dirname $(NPH))
echo "nph utility is available at " $(NPH)
else
4 changes: 3 additions & 1 deletion README.md
@@ -19,6 +19,8 @@ These instructions are generic. For more detailed instructions, see the Waku sou

The standard developer tools, including a C compiler, GNU Make, Bash, and Git. More information on these installations can be found [here](https://docs.waku.org/guides/nwaku/build-source#install-dependencies).

> In some distributions (Fedora Linux, for example), you may need to install the `which` utility separately; the Nimbus build system relies on it.

### Wakunode

```bash
@@ -51,7 +53,7 @@ If you encounter difficulties building the project on WSL, consider placing the
### How to Build & Run ( Windows )

Note: This is a work in progress. The current setup procedure is as follows:
Goal: Get rid of windows specific procedures and make the build process the same as linux/macos.

The current setup procedure is as follows:

4 changes: 2 additions & 2 deletions library/events/json_message_event.nim
@@ -59,7 +59,7 @@ proc `%`*(value: Base64String): JsonNode =

type JsonMessageEvent* = ref object of JsonEvent
pubsubTopic*: string
messageHash*: WakuMessageHash
messageHash*: string
wakuMessage*: JsonMessage

proc new*(T: type JsonMessageEvent, pubSubTopic: string, msg: WakuMessage): T =
@@ -83,7 +83,7 @@ proc new*(T: type JsonMessageEvent, pubSubTopic: string, msg: WakuMessage): T =
return JsonMessageEvent(
eventType: "message",
pubSubTopic: pubSubTopic,
messageHash: msgHash,
messageHash: msgHash.to0xHex(),
wakuMessage: JsonMessage(
payload: base64.encode(payload),
contentTopic: msg.contentTopic,
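The `json_message_event.nim` change above replaces the binary `WakuMessageHash` field with a `0x`-prefixed hex string built by `to0xHex()`. Below is a minimal sketch of the assumed output format, using `stew/byteutils`; only the shape of the string is taken from this diff, the real helper lives in the waku codebase.

```nim
# Sketch only: the hash-to-hex conversion format assumed from the diff.
import stew/byteutils

type WakuMessageHash = array[32, byte] # 32-byte message hash, as in waku_core

let msgHash: WakuMessageHash = default(WakuMessageHash)

# JsonMessageEvent.messageHash now carries this string instead of raw bytes.
let hashHex = "0x" & msgHash.toHex()
doAssert hashHex.len == 66 # "0x" plus 64 hex characters
echo hashHex
```

The hunks that follow belong to a different file (its name is cut off in this view) in the library's store request handling; they apply the same hex-string treatment to incoming requests and responses.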
@@ -1,5 +1,5 @@
import std/[json, sugar, strutils, options]
import chronos, chronicles, results
import chronos, chronicles, results, stew/byteutils
import
../../../../../waku/factory/waku,
../../../../alloc,
@@ -27,16 +27,12 @@ func fromJsonNode(
for cTopic in jsonContent["content_topics"].getElems():
cTopic.getStr()

let msgHashes = collect(newSeq):
if jsonContent.contains("message_hashes"):
for hashJsonObj in jsonContent["message_hashes"].getElems():
var hash: WakuMessageHash
var count: int = 0
for byteValue in hashJsonObj.getElems():
hash[count] = byteValue.getInt().byte
count.inc()

hash
var msgHashes: seq[WakuMessageHash]
if jsonContent.contains("message_hashes"):
for hashJsonObj in jsonContent["message_hashes"].getElems():
let hash = hashJsonObj.getStr().hexToHash().valueOr:
return err("Failed converting message hash hex string to bytes: " & error)
msgHashes.add(hash)

let pubsubTopic =
if jsonContent.contains("pubsub_topic"):
@@ -46,12 +42,9 @@

let paginationCursor =
if jsonContent.contains("pagination_cursor"):
var hash: WakuMessageHash
var count: int = 0
for byteValue in jsonContent["pagination_cursor"].getElems():
hash[count] = byteValue.getInt().byte
count.inc()

let hash = jsonContent["pagination_cursor"].getStr().hexToHash().valueOr:
return
err("Failed converting pagination_cursor hex string to bytes: " & error)
some(hash)
else:
none(WakuMessageHash)
@@ -120,7 +113,8 @@ proc process_remote_query(
let queryResponse = (await waku.node.wakuStoreClient.query(storeQueryRequest, peer)).valueOr:
return err("StoreRequest failed store query: " & $error)

return ok($(%*queryResponse)) ## returning the response in json format
let res = $(%*(queryResponse.toHex()))
return ok(res) ## returning the response in json format

proc process*(
self: ptr StoreRequest, waku: ptr Waku
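This hunk swaps the old per-byte JSON arrays for hex strings on both the request and the response: incoming `message_hashes` and `pagination_cursor` values are decoded with `hexToHash()`, and the query result is passed through `toHex()` before being serialised to JSON. A rough stand-in for the decoding step follows, under the assumption that `hexToHash()` accepts an optional `0x` prefix and returns a `Result`; the real implementation is in the waku codebase and may differ.

```nim
# Stand-in sketch: decode hex-encoded message hashes the way the new path does.
import std/[json, strutils]
import results, stew/byteutils

type WakuMessageHash = array[32, byte]

proc hexToHash(hex: string): Result[WakuMessageHash, string] =
  ## Assumed behaviour: optional 0x prefix, exactly 32 bytes of hash data.
  let clean = if hex.startsWith("0x"): hex[2 .. ^1] else: hex
  if clean.len != 64:
    return err("expected 64 hex characters, got " & $clean.len)
  try:
    var hash: WakuMessageHash
    hexToByteArray(clean, hash)
    return ok(hash)
  except ValueError as e:
    return err(e.msg)

let request = %*{"message_hashes": ["0x" & repeat('0', 64)]}

var msgHashes: seq[WakuMessageHash]
for hashJsonObj in request["message_hashes"].getElems():
  let hash = hexToHash(hashJsonObj.getStr()).valueOr:
    quit("Failed converting message hash hex string to bytes: " & error)
  msgHashes.add(hash)

doAssert msgHashes.len == 1
```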
4 changes: 4 additions & 0 deletions tests/waku_store_sync/sync_utils.nim
@@ -25,10 +25,14 @@ proc newTestWakuRecon*(
idsRx: AsyncQueue[SyncID],
wantsTx: AsyncQueue[(PeerId, Fingerprint)],
needsTx: AsyncQueue[(PeerId, Fingerprint)],
cluster: uint16 = 1,
shards: seq[uint16] = @[0, 1, 2, 3, 4, 5, 6, 7],
): Future[SyncReconciliation] {.async.} =
let peerManager = PeerManager.new(switch)

let res = await SyncReconciliation.new(
cluster = cluster,
shards = shards,
peerManager = peerManager,
wakuArchive = nil,
relayJitter = 0.seconds,
34 changes: 34 additions & 0 deletions tests/waku_store_sync/test_protocol.nim
@@ -157,6 +157,40 @@ suite "Waku Sync: reconciliation":
localWants.contains((clientPeerInfo.peerId, hash3)) == true
localWants.contains((serverPeerInfo.peerId, hash2)) == true

asyncTest "sync 2 nodes different shards":
let
msg1 = fakeWakuMessage(ts = now(), contentTopic = DefaultContentTopic)
msg2 = fakeWakuMessage(ts = now() + 1, contentTopic = DefaultContentTopic)
msg3 = fakeWakuMessage(ts = now() + 2, contentTopic = DefaultContentTopic)
hash1 = computeMessageHash(DefaultPubsubTopic, msg1)
hash2 = computeMessageHash(DefaultPubsubTopic, msg2)
hash3 = computeMessageHash(DefaultPubsubTopic, msg3)

server.messageIngress(hash1, msg1)
server.messageIngress(hash2, msg2)
client.messageIngress(hash1, msg1)
client.messageIngress(hash3, msg3)

check:
remoteNeeds.contains((serverPeerInfo.peerId, hash3)) == false
remoteNeeds.contains((clientPeerInfo.peerId, hash2)) == false
localWants.contains((clientPeerInfo.peerId, hash3)) == false
localWants.contains((serverPeerInfo.peerId, hash2)) == false

server = await newTestWakuRecon(
serverSwitch, idsChannel, localWants, remoteNeeds, shards = @[0.uint16, 1, 2, 3]
)
client = await newTestWakuRecon(
clientSwitch, idsChannel, localWants, remoteNeeds, shards = @[4.uint16, 5, 6, 7]
)

var syncRes = await client.storeSynchronization(some(serverPeerInfo))
assert syncRes.isOk(), $syncRes.error

check:
remoteNeeds.len == 0
localWants.len == 0

asyncTest "sync 2 nodes same hashes":
let
msg1 = fakeWakuMessage(ts = now(), contentTopic = DefaultContentTopic)
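The new "sync 2 nodes different shards" test above checks that reconciliation exchanges nothing when the two peers sit on disjoint shard sets. Reduced to a toy check of the intent (not the sync protocol's actual matching code):

```nim
# Toy illustration: there is only work to reconcile when shard sets overlap.
import std/sets

let serverShards = toHashSet([0'u16, 1, 2, 3])
let clientShards = toHashSet([4'u16, 5, 6, 7])

# Disjoint shards => localWants and remoteNeeds stay empty after a sync round.
doAssert disjoint(serverShards, clientShards)
```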
27 changes: 14 additions & 13 deletions tests/wakunode_rest/test_rest_store.nim
@@ -74,7 +74,8 @@ procSuite "Waku Rest API - Store v3":
messageHash == parsedMsgHashRes.get().get()

# Random validation. Obtained the raw values manually
let expected = some("f6za9OzG1xSiEZagZc2b3litRbkd3zRl61rezDd3pgQ%3D")
let expected =
some("0x9e0ea917677a3d2b8610b0126986d89824b6acf76008b5fb9aa8b99ac906c1a7")

let msgHashRes = parseHash(expected)
assert msgHashRes.isOk(), $msgHashRes.error
@@ -147,7 +148,7 @@ procSuite "Waku Rest API - Store v3":
"", # start time
"", # end time
"", # hashes
encodedCursor, # base64-encoded hash
encodedCursor, # hex-encoded hash
"true", # ascending
"5", # empty implies default page size
)
@@ -217,7 +218,7 @@ procSuite "Waku Rest API - Store v3":
"3", # start time
"6", # end time
"", # hashes
"", # base64-encoded hash
"", # hex-encoded hash
"true", # ascending
"", # empty implies default page size
)
@@ -283,7 +284,7 @@ procSuite "Waku Rest API - Store v3":

var pages = newSeq[seq[WakuMessage]](2)

var reqHash = none(WakuMessageHash)
var reqHash = none(string)

for i in 0 ..< 2:
let response = await client.getStoreMessagesV3(
@@ -295,9 +296,9 @@
"", # end time. Empty ignores the field.
"", # hashes
if reqHash.isSome():
reqHash.get().toRestStringWakuMessageHash()
reqHash.get()
else:
"", # base64-encoded digest. Empty ignores the field.
"", # hex-encoded digest. Empty ignores the field.
"true", # ascending
"7", # page size. Empty implies default page size.
)
@@ -775,7 +776,7 @@ procSuite "Waku Rest API - Store v3":
var pages = newSeq[seq[WakuMessage]](2)

var reqPubsubTopic = DefaultPubsubTopic
var reqHash = none(WakuMessageHash)
var reqHash = none(string)

for i in 0 ..< 2:
let response = await client.getStoreMessagesV3(
@@ -787,9 +788,9 @@
"", # end time. Empty ignores the field.
"", # hashes
if reqHash.isSome():
reqHash.get().toRestStringWakuMessageHash()
reqHash.get()
else:
"", # base64-encoded digest. Empty ignores the field.
"", # hex-encoded digest. Empty ignores the field.
"true", # ascending
"3", # page size. Empty implies default page size.
)
Expand Down Expand Up @@ -823,9 +824,9 @@ procSuite "Waku Rest API - Store v3":
"", # end time. Empty ignores the field.
"", # hashes
if reqHash.isSome():
reqHash.get().toRestStringWakuMessageHash()
reqHash.get()
else:
"", # base64-encoded digest. Empty ignores the field.
"", # hex-encoded digest. Empty ignores the field.
)

check:
@@ -845,9 +846,9 @@
"", # end time. Empty ignores the field.
"", # hashes
if reqHash.isSome():
reqHash.get().toRestStringWakuMessageHash()
reqHash.get()
else:
"", # base64-encoded digest. Empty ignores the field.
"", # hex-encoded digest. Empty ignores the field.
"true", # ascending
"5", # page size. Empty implies default page size.
)
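Across this test file the store query's cursor and digest parameters switch from base64 values (which needed URL escaping, see the `%3D` in the old expected value) to plain hex strings. A small sketch of the assumed pagination flow, where the hash of the last message in a page becomes the `cursor` of the next request:

```nim
# Sketch of cursor-based paging with hex-string hashes (flow assumed from the tests).
import std/[options, strutils]

var reqHash = none(string)                 # first request carries no cursor
let pageHashes = @["0x" & repeat('1', 64), "0x" & repeat('2', 64)] # pretend page

if pageHashes.len > 0:
  reqHash = some(pageHashes[^1])           # resume after the last received hash

doAssert reqHash.get().len == 66           # "0x" plus 64 hex characters
```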
4 changes: 2 additions & 2 deletions waku.nimble
@@ -65,11 +65,11 @@ proc buildLibrary(name: string, srcDir = "./", params = "", `type` = "static") =
extra_params &= " " & paramStr(i)
if `type` == "static":
exec "nim c" & " --out:build/" & name &
".a --threads:on --app:staticlib --opt:size --noMain --mm:refc --header --undef:metrics " &
".a --threads:on --app:staticlib --opt:size --noMain --mm:refc --header --undef:metrics --skipParentCfg:on " &
extra_params & " " & srcDir & name & ".nim"
else:
exec "nim c" & " --out:build/" & name &
".so --threads:on --app:lib --opt:size --noMain --mm:refc --header --undef:metrics " &
".so --threads:on --app:lib --opt:size --noMain --mm:refc --header --undef:metrics --skipParentCfg:on " &
extra_params & " " & srcDir & name & ".nim"

proc buildMobileAndroid(srcDir = ".", params = "") =
12 changes: 11 additions & 1 deletion waku/node/waku_node.nim
@@ -216,9 +216,19 @@ proc mountStoreSync*(
let wantsChannel = newAsyncQueue[(PeerId, WakuMessageHash)](100)
let needsChannel = newAsyncQueue[(PeerId, WakuMessageHash)](100)

var cluster: uint16
var shards: seq[uint16]
let enrRes = node.enr.toTyped()
if enrRes.isOk():
let shardingRes = enrRes.get().relaySharding()
if shardingRes.isSome():
let relayShard = shardingRes.get()
cluster = relayShard.clusterID
shards = relayShard.shardIds

let recon =
?await SyncReconciliation.new(
node.peerManager, node.wakuArchive, storeSyncRange.seconds,
cluster, shards, node.peerManager, node.wakuArchive, storeSyncRange.seconds,
storeSyncInterval.seconds, storeSyncRelayJitter.seconds, idsChannel, wantsChannel,
needsChannel,
)
14 changes: 7 additions & 7 deletions waku/waku_api/rest/store/client.nim
@@ -15,12 +15,12 @@ logScope:
topics = "waku node rest store_api"

proc decodeBytes*(
t: typedesc[StoreQueryResponse],
t: typedesc[StoreQueryResponseHex],
data: openArray[byte],
contentType: Opt[ContentTypeData],
): RestResult[StoreQueryResponse] =
): RestResult[StoreQueryResponseHex] =
if MediaType.init($contentType) == MIMETYPE_JSON:
let decoded = ?decodeFromJsonBytes(StoreQueryResponse, data)
let decoded = ?decodeFromJsonBytes(StoreQueryResponseHex, data)
return ok(decoded)

if MediaType.init($contentType) == MIMETYPE_TEXT:
@@ -30,11 +30,11 @@ proc decodeBytes*(
copyMem(addr res[0], unsafeAddr data[0], len(data))

return ok(
StoreQueryResponse(
StoreQueryResponseHex(
statusCode: uint32(ErrorCode.BAD_RESPONSE),
statusDesc: res,
messages: newSeq[WakuMessageKeyValue](0),
paginationCursor: none(WakuMessageHash),
messages: newSeq[WakuMessageKeyValueHex](0),
paginationCursor: none(string),
)
)

@@ -58,6 +58,6 @@
cursor: string = "", # base64-encoded hash
ascending: string = "",
pageSize: string = "",
): RestResponse[StoreQueryResponse] {.
): RestResponse[StoreQueryResponseHex] {.
rest, endpoint: "/store/v3/messages", meth: HttpMethod.MethodGet
.}
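
The REST client now decodes responses into `StoreQueryResponseHex`, whose hash-bearing fields are plain strings. From the fields visible in this hunk, the type looks roughly like the sketch below; this is an illustration only, the real definitions live in the store REST types module and also carry the full message body.

```nim
# Assumed shape, inferred from the fields used in this diff; illustration only.
import std/options

type
  WakuMessageKeyValueHex* = object
    messageHash*: string              # 0x-prefixed hex instead of a byte array
    pubsubTopic*: Option[string]
    # the real type also embeds the WakuMessage itself

  StoreQueryResponseHex* = object
    statusCode*: uint32
    statusDesc*: string
    messages*: seq[WakuMessageKeyValueHex]
    paginationCursor*: Option[string]
```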
4 changes: 2 additions & 2 deletions waku/waku_api/rest/store/handlers.nim
@@ -42,7 +42,7 @@ proc performStoreQuery(
error msg, error = futRes.error
return RestApiResponse.internalServerError(fmt("{msg} [{futRes.error}]"))

let res = futRes.get()
let res = futRes.get().toHex()

if res.statusCode == uint32(ErrorCode.TOO_MANY_REQUESTS):
debug "Request rate limit reached on peer ", storePeer
@@ -165,7 +165,7 @@ proc retrieveMsgsFromSelfNode(
let storeResp = (await self.wakuStore.handleSelfStoreRequest(storeQuery)).valueOr:
return RestApiResponse.internalServerError($error)

let resp = RestApiResponse.jsonResponse(storeResp, status = Http200).valueOr:
let resp = RestApiResponse.jsonResponse(storeResp.toHex(), status = Http200).valueOr:
const msg = "Error building the json respose"
let e = $error
error msg, error = e
