diff --git a/Makefile b/Makefile index eb11367b02..99ceadbb17 100644 --- a/Makefile +++ b/Makefile @@ -266,7 +266,7 @@ NPH:=$(shell dirname $(NIM_BINARY))/nph build-nph: | build deps ifeq ("$(wildcard $(NPH))","") - $(ENV_SCRIPT) nim c vendor/nph/src/nph.nim && \ + $(ENV_SCRIPT) nim c --skipParentCfg:on vendor/nph/src/nph.nim && \ mv vendor/nph/src/nph $(shell dirname $(NPH)) echo "nph utility is available at " $(NPH) else diff --git a/README.md b/README.md index c3111ad8d2..cb13d6cef4 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,8 @@ These instructions are generic. For more detailed instructions, see the Waku sou The standard developer tools, including a C compiler, GNU Make, Bash, and Git. More information on these installations can be found [here](https://docs.waku.org/guides/nwaku/build-source#install-dependencies). +> In some distributions (Fedora Linux, for example), you may need to install the `which` utility separately. The Nimbus build system relies on it. + ### Wakunode ```bash @@ -51,7 +53,7 @@ If you encounter difficulties building the project on WSL, consider placing the ### How to Build & Run ( Windows ) Note: This is a work in progress. The current setup procedure is as follows: -Goal: Get rid of windows specific procedures and make the build process the same as linux/macos. +Goal: Get rid of windows specific procedures and make the build process the same as linux/macos. 
The current setup procedure is as follows: diff --git a/library/events/json_message_event.nim b/library/events/json_message_event.nim index 2d066e7b37..2fbfb0d927 100644 --- a/library/events/json_message_event.nim +++ b/library/events/json_message_event.nim @@ -59,7 +59,7 @@ proc `%`*(value: Base64String): JsonNode = type JsonMessageEvent* = ref object of JsonEvent pubsubTopic*: string - messageHash*: WakuMessageHash + messageHash*: string wakuMessage*: JsonMessage proc new*(T: type JsonMessageEvent, pubSubTopic: string, msg: WakuMessage): T = @@ -83,7 +83,7 @@ proc new*(T: type JsonMessageEvent, pubSubTopic: string, msg: WakuMessage): T = return JsonMessageEvent( eventType: "message", pubSubTopic: pubSubTopic, - messageHash: msgHash, + messageHash: msgHash.to0xHex(), wakuMessage: JsonMessage( payload: base64.encode(payload), contentTopic: msg.contentTopic, diff --git a/library/waku_thread/inter_thread_communication/requests/protocols/store_request.nim b/library/waku_thread/inter_thread_communication/requests/protocols/store_request.nim index 91006e9222..59e27dfec9 100644 --- a/library/waku_thread/inter_thread_communication/requests/protocols/store_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/protocols/store_request.nim @@ -1,5 +1,5 @@ import std/[json, sugar, strutils, options] -import chronos, chronicles, results +import chronos, chronicles, results, stew/byteutils import ../../../../../waku/factory/waku, ../../../../alloc, @@ -27,16 +27,12 @@ func fromJsonNode( for cTopic in jsonContent["content_topics"].getElems(): cTopic.getStr() - let msgHashes = collect(newSeq): - if jsonContent.contains("message_hashes"): - for hashJsonObj in jsonContent["message_hashes"].getElems(): - var hash: WakuMessageHash - var count: int = 0 - for byteValue in hashJsonObj.getElems(): - hash[count] = byteValue.getInt().byte - count.inc() - - hash + var msgHashes: seq[WakuMessageHash] + if jsonContent.contains("message_hashes"): + for hashJsonObj in 
jsonContent["message_hashes"].getElems(): + let hash = hashJsonObj.getStr().hexToHash().valueOr: + return err("Failed converting message hash hex string to bytes: " & error) + msgHashes.add(hash) let pubsubTopic = if jsonContent.contains("pubsub_topic"): @@ -46,12 +42,9 @@ func fromJsonNode( let paginationCursor = if jsonContent.contains("pagination_cursor"): - var hash: WakuMessageHash - var count: int = 0 - for byteValue in jsonContent["pagination_cursor"].getElems(): - hash[count] = byteValue.getInt().byte - count.inc() - + let hash = jsonContent["pagination_cursor"].getStr().hexToHash().valueOr: + return + err("Failed converting pagination_cursor hex string to bytes: " & error) some(hash) else: none(WakuMessageHash) @@ -120,7 +113,8 @@ proc process_remote_query( let queryResponse = (await waku.node.wakuStoreClient.query(storeQueryRequest, peer)).valueOr: return err("StoreRequest failed store query: " & $error) - return ok($(%*queryResponse)) ## returning the response in json format + let res = $(%*(queryResponse.toHex())) + return ok(res) ## returning the response in json format proc process*( self: ptr StoreRequest, waku: ptr Waku diff --git a/tests/waku_store_sync/sync_utils.nim b/tests/waku_store_sync/sync_utils.nim index aa56ff2e51..20a6bdfb19 100644 --- a/tests/waku_store_sync/sync_utils.nim +++ b/tests/waku_store_sync/sync_utils.nim @@ -25,10 +25,14 @@ proc newTestWakuRecon*( idsRx: AsyncQueue[SyncID], wantsTx: AsyncQueue[(PeerId, Fingerprint)], needsTx: AsyncQueue[(PeerId, Fingerprint)], + cluster: uint16 = 1, + shards: seq[uint16] = @[0, 1, 2, 3, 4, 5, 6, 7], ): Future[SyncReconciliation] {.async.} = let peerManager = PeerManager.new(switch) let res = await SyncReconciliation.new( + cluster = cluster, + shards = shards, peerManager = peerManager, wakuArchive = nil, relayJitter = 0.seconds, diff --git a/tests/waku_store_sync/test_protocol.nim b/tests/waku_store_sync/test_protocol.nim index d3ffa187f3..f507ad95bc 100644 --- 
a/tests/waku_store_sync/test_protocol.nim +++ b/tests/waku_store_sync/test_protocol.nim @@ -157,6 +157,40 @@ suite "Waku Sync: reconciliation": localWants.contains((clientPeerInfo.peerId, hash3)) == true localWants.contains((serverPeerInfo.peerId, hash2)) == true + asyncTest "sync 2 nodes different shards": + let + msg1 = fakeWakuMessage(ts = now(), contentTopic = DefaultContentTopic) + msg2 = fakeWakuMessage(ts = now() + 1, contentTopic = DefaultContentTopic) + msg3 = fakeWakuMessage(ts = now() + 2, contentTopic = DefaultContentTopic) + hash1 = computeMessageHash(DefaultPubsubTopic, msg1) + hash2 = computeMessageHash(DefaultPubsubTopic, msg2) + hash3 = computeMessageHash(DefaultPubsubTopic, msg3) + + server.messageIngress(hash1, msg1) + server.messageIngress(hash2, msg2) + client.messageIngress(hash1, msg1) + client.messageIngress(hash3, msg3) + + check: + remoteNeeds.contains((serverPeerInfo.peerId, hash3)) == false + remoteNeeds.contains((clientPeerInfo.peerId, hash2)) == false + localWants.contains((clientPeerInfo.peerId, hash3)) == false + localWants.contains((serverPeerInfo.peerId, hash2)) == false + + server = await newTestWakuRecon( + serverSwitch, idsChannel, localWants, remoteNeeds, shards = @[0.uint16, 1, 2, 3] + ) + client = await newTestWakuRecon( + clientSwitch, idsChannel, localWants, remoteNeeds, shards = @[4.uint16, 5, 6, 7] + ) + + var syncRes = await client.storeSynchronization(some(serverPeerInfo)) + assert syncRes.isOk(), $syncRes.error + + check: + remoteNeeds.len == 0 + localWants.len == 0 + asyncTest "sync 2 nodes same hashes": let msg1 = fakeWakuMessage(ts = now(), contentTopic = DefaultContentTopic) diff --git a/tests/wakunode_rest/test_rest_store.nim b/tests/wakunode_rest/test_rest_store.nim index 24248ee4b3..c31e3939c1 100644 --- a/tests/wakunode_rest/test_rest_store.nim +++ b/tests/wakunode_rest/test_rest_store.nim @@ -74,7 +74,8 @@ procSuite "Waku Rest API - Store v3": messageHash == parsedMsgHashRes.get().get() # Random validation. 
Obtained the raw values manually - let expected = some("f6za9OzG1xSiEZagZc2b3litRbkd3zRl61rezDd3pgQ%3D") + let expected = + some("0x9e0ea917677a3d2b8610b0126986d89824b6acf76008b5fb9aa8b99ac906c1a7") let msgHashRes = parseHash(expected) assert msgHashRes.isOk(), $msgHashRes.error @@ -147,7 +148,7 @@ procSuite "Waku Rest API - Store v3": "", # start time "", # end time "", # hashes - encodedCursor, # base64-encoded hash + encodedCursor, # hex-encoded hash "true", # ascending "5", # empty implies default page size ) @@ -217,7 +218,7 @@ procSuite "Waku Rest API - Store v3": "3", # start time "6", # end time "", # hashes - "", # base64-encoded hash + "", # hex-encoded hash "true", # ascending "", # empty implies default page size ) @@ -283,7 +284,7 @@ procSuite "Waku Rest API - Store v3": var pages = newSeq[seq[WakuMessage]](2) - var reqHash = none(WakuMessageHash) + var reqHash = none(string) for i in 0 ..< 2: let response = await client.getStoreMessagesV3( @@ -295,9 +296,9 @@ procSuite "Waku Rest API - Store v3": "", # end time. Empty ignores the field. "", # hashes if reqHash.isSome(): - reqHash.get().toRestStringWakuMessageHash() + reqHash.get() else: - "", # base64-encoded digest. Empty ignores the field. + "", # hex-encoded digest. Empty ignores the field. "true", # ascending "7", # page size. Empty implies default page size. ) @@ -775,7 +776,7 @@ procSuite "Waku Rest API - Store v3": var pages = newSeq[seq[WakuMessage]](2) var reqPubsubTopic = DefaultPubsubTopic - var reqHash = none(WakuMessageHash) + var reqHash = none(string) for i in 0 ..< 2: let response = await client.getStoreMessagesV3( @@ -787,9 +788,9 @@ procSuite "Waku Rest API - Store v3": "", # end time. Empty ignores the field. "", # hashes if reqHash.isSome(): - reqHash.get().toRestStringWakuMessageHash() + reqHash.get() else: - "", # base64-encoded digest. Empty ignores the field. + "", # hex-encoded digest. Empty ignores the field. "true", # ascending "3", # page size. 
Empty implies default page size. ) @@ -823,9 +824,9 @@ procSuite "Waku Rest API - Store v3": "", # end time. Empty ignores the field. "", # hashes if reqHash.isSome(): - reqHash.get().toRestStringWakuMessageHash() + reqHash.get() else: - "", # base64-encoded digest. Empty ignores the field. + "", # hex-encoded digest. Empty ignores the field. ) check: @@ -845,9 +846,9 @@ procSuite "Waku Rest API - Store v3": "", # end time. Empty ignores the field. "", # hashes if reqHash.isSome(): - reqHash.get().toRestStringWakuMessageHash() + reqHash.get() else: - "", # base64-encoded digest. Empty ignores the field. + "", # hex-encoded digest. Empty ignores the field. "true", # ascending "5", # page size. Empty implies default page size. ) diff --git a/waku.nimble b/waku.nimble index 3035b769c9..14c3749827 100644 --- a/waku.nimble +++ b/waku.nimble @@ -65,11 +65,11 @@ proc buildLibrary(name: string, srcDir = "./", params = "", `type` = "static") = extra_params &= " " & paramStr(i) if `type` == "static": exec "nim c" & " --out:build/" & name & - ".a --threads:on --app:staticlib --opt:size --noMain --mm:refc --header --undef:metrics " & + ".a --threads:on --app:staticlib --opt:size --noMain --mm:refc --header --undef:metrics --skipParentCfg:on " & extra_params & " " & srcDir & name & ".nim" else: exec "nim c" & " --out:build/" & name & - ".so --threads:on --app:lib --opt:size --noMain --mm:refc --header --undef:metrics " & + ".so --threads:on --app:lib --opt:size --noMain --mm:refc --header --undef:metrics --skipParentCfg:on " & extra_params & " " & srcDir & name & ".nim" proc buildMobileAndroid(srcDir = ".", params = "") = diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 1e3b2f1272..5b4f9900ff 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -216,9 +216,19 @@ proc mountStoreSync*( let wantsChannel = newAsyncQueue[(PeerId, WakuMessageHash)](100) let needsChannel = newAsyncQueue[(PeerId, WakuMessageHash)](100) + var cluster: uint16 + var 
shards: seq[uint16] + let enrRes = node.enr.toTyped() + if enrRes.isOk(): + let shardingRes = enrRes.get().relaySharding() + if shardingRes.isSome(): + let relayShard = shardingRes.get() + cluster = relayShard.clusterID + shards = relayShard.shardIds + let recon = ?await SyncReconciliation.new( - node.peerManager, node.wakuArchive, storeSyncRange.seconds, + cluster, shards, node.peerManager, node.wakuArchive, storeSyncRange.seconds, storeSyncInterval.seconds, storeSyncRelayJitter.seconds, idsChannel, wantsChannel, needsChannel, ) diff --git a/waku/waku_api/rest/store/client.nim b/waku/waku_api/rest/store/client.nim index 887279cef3..80939ee252 100644 --- a/waku/waku_api/rest/store/client.nim +++ b/waku/waku_api/rest/store/client.nim @@ -15,12 +15,12 @@ logScope: topics = "waku node rest store_api" proc decodeBytes*( - t: typedesc[StoreQueryResponse], + t: typedesc[StoreQueryResponseHex], data: openArray[byte], contentType: Opt[ContentTypeData], -): RestResult[StoreQueryResponse] = +): RestResult[StoreQueryResponseHex] = if MediaType.init($contentType) == MIMETYPE_JSON: - let decoded = ?decodeFromJsonBytes(StoreQueryResponse, data) + let decoded = ?decodeFromJsonBytes(StoreQueryResponseHex, data) return ok(decoded) if MediaType.init($contentType) == MIMETYPE_TEXT: @@ -30,11 +30,11 @@ proc decodeBytes*( copyMem(addr res[0], unsafeAddr data[0], len(data)) return ok( - StoreQueryResponse( + StoreQueryResponseHex( statusCode: uint32(ErrorCode.BAD_RESPONSE), statusDesc: res, - messages: newSeq[WakuMessageKeyValue](0), - paginationCursor: none(WakuMessageHash), + messages: newSeq[WakuMessageKeyValueHex](0), + paginationCursor: none(string), ) ) @@ -58,6 +58,6 @@ proc getStoreMessagesV3*( cursor: string = "", # base64-encoded hash ascending: string = "", pageSize: string = "", -): RestResponse[StoreQueryResponse] {. +): RestResponse[StoreQueryResponseHex] {. 
rest, endpoint: "/store/v3/messages", meth: HttpMethod.MethodGet .} diff --git a/waku/waku_api/rest/store/handlers.nim b/waku/waku_api/rest/store/handlers.nim index 663d796eab..10edb68fb7 100644 --- a/waku/waku_api/rest/store/handlers.nim +++ b/waku/waku_api/rest/store/handlers.nim @@ -42,7 +42,7 @@ proc performStoreQuery( error msg, error = futRes.error return RestApiResponse.internalServerError(fmt("{msg} [{futRes.error}]")) - let res = futRes.get() + let res = futRes.get().toHex() if res.statusCode == uint32(ErrorCode.TOO_MANY_REQUESTS): debug "Request rate limit reached on peer ", storePeer @@ -165,7 +165,7 @@ proc retrieveMsgsFromSelfNode( let storeResp = (await self.wakuStore.handleSelfStoreRequest(storeQuery)).valueOr: return RestApiResponse.internalServerError($error) - let resp = RestApiResponse.jsonResponse(storeResp, status = Http200).valueOr: + let resp = RestApiResponse.jsonResponse(storeResp.toHex(), status = Http200).valueOr: const msg = "Error building the json respose" let e = $error error msg, error = e diff --git a/waku/waku_api/rest/store/types.nim b/waku/waku_api/rest/store/types.nim index e19bcf6be9..99818b5ff4 100644 --- a/waku/waku_api/rest/store/types.nim +++ b/waku/waku_api/rest/store/types.nim @@ -18,19 +18,22 @@ Json.setWriter JsonWriter, PreferredOutput = string #### Type conversion proc parseHash*(input: Option[string]): Result[Option[WakuMessageHash], string] = - let base64UrlEncoded = + let hexUrlEncoded = if input.isSome(): input.get() else: return ok(none(WakuMessageHash)) - if base64UrlEncoded == "": + if hexUrlEncoded == "": return ok(none(WakuMessageHash)) - let base64Encoded = decodeUrl(base64UrlEncoded, false) + let hexDecoded = decodeUrl(hexUrlEncoded, false) - let decodedBytes = base64.decode(Base64String(base64Encoded)).valueOr: - return err("waku message hash parsing error: " & error) + var decodedBytes: seq[byte] + try: + decodedBytes = hexToSeqByte(hexDecoded) + except ValueError as e: + return err("Exception converting 
hex string to bytes: " & e.msg) if decodedBytes.len != 32: return @@ -58,12 +61,12 @@ proc parseHashes*(input: Option[string]): Result[seq[WakuMessageHash], string] = return ok(hashes) # Converts a given MessageDigest object into a suitable -# Base64-URL-encoded string suitable to be transmitted in a Rest -# request-response. The MessageDigest is first base64 encoded +# Hex-URL-encoded string suitable to be transmitted in a Rest +# request-response. The MessageDigest is first hex encoded # and this result is URL-encoded. proc toRestStringWakuMessageHash*(self: WakuMessageHash): string = - let base64Encoded = base64.encode(self) - encodeUrl($base64Encoded, false) + let hexEncoded = self.to0xHex() + encodeUrl(hexEncoded, false) ## WakuMessage serde @@ -147,14 +150,14 @@ proc readValue*( proof: proof, ) -## WakuMessageKeyValue serde +## WakuMessageKeyValueHex serde proc writeValue*( - writer: var JsonWriter, value: WakuMessageKeyValue + writer: var JsonWriter, value: WakuMessageKeyValueHex ) {.gcsafe, raises: [IOError].} = writer.beginRecord() - writer.writeField("messageHash", base64.encode(value.messageHash)) + writer.writeField("messageHash", value.messageHash) if value.message.isSome(): writer.writeField("message", value.message.get()) @@ -165,10 +168,10 @@ proc writeValue*( writer.endRecord() proc readValue*( - reader: var JsonReader, value: var WakuMessageKeyValue + reader: var JsonReader, value: var WakuMessageKeyValueHex ) {.gcsafe, raises: [SerializationError, IOError].} = var - messageHash = none(WakuMessageHash) + messageHash = none(string) message = none(WakuMessage) pubsubTopic = none(PubsubTopic) @@ -177,22 +180,19 @@ proc readValue*( of "messageHash": if messageHash.isSome(): reader.raiseUnexpectedField( - "Multiple `messageHash` fields found", "WakuMessageKeyValue" + "Multiple `messageHash` fields found", "WakuMessageKeyValueHex" ) - let base64String = reader.readValue(Base64String) - let bytes = base64.decode(base64String).valueOr: - 
reader.raiseUnexpectedField("Failed decoding data", "messageHash") - messageHash = some(fromBytes(bytes)) + messageHash = some(reader.readValue(string)) of "message": if message.isSome(): reader.raiseUnexpectedField( - "Multiple `message` fields found", "WakuMessageKeyValue" + "Multiple `message` fields found", "WakuMessageKeyValueHex" ) message = some(reader.readValue(WakuMessage)) of "pubsubTopic": if pubsubTopic.isSome(): reader.raiseUnexpectedField( - "Multiple `pubsubTopic` fields found", "WakuMessageKeyValue" + "Multiple `pubsubTopic` fields found", "WakuMessageKeyValueHex" ) pubsubTopic = some(reader.readValue(string)) else: @@ -201,14 +201,14 @@ proc readValue*( if messageHash.isNone(): reader.raiseUnexpectedValue("Field `messageHash` is missing") - value = WakuMessageKeyValue( + value = WakuMessageKeyValueHex( messageHash: messageHash.get(), message: message, pubsubTopic: pubsubTopic ) ## StoreQueryResponse serde proc writeValue*( - writer: var JsonWriter, value: StoreQueryResponse + writer: var JsonWriter, value: StoreQueryResponseHex ) {.gcsafe, raises: [IOError].} = writer.beginRecord() @@ -218,55 +218,52 @@ proc writeValue*( writer.writeField("messages", value.messages) if value.paginationCursor.isSome(): - writer.writeField("paginationCursor", base64.encode(value.paginationCursor.get())) + writer.writeField("paginationCursor", value.paginationCursor.get()) writer.endRecord() proc readValue*( - reader: var JsonReader, value: var StoreQueryResponse + reader: var JsonReader, value: var StoreQueryResponseHex ) {.gcsafe, raises: [SerializationError, IOError].} = var requestId = none(string) code = none(uint32) desc = none(string) - messages = none(seq[WakuMessageKeyValue]) - cursor = none(WakuMessageHash) + messages = none(seq[WakuMessageKeyValueHex]) + cursor = none(string) for fieldName in readObjectFields(reader): case fieldName of "requestId": if requestId.isSome(): reader.raiseUnexpectedField( - "Multiple `requestId` fields found", 
"StoreQueryResponse" + "Multiple `requestId` fields found", "StoreQueryResponseHex" ) requestId = some(reader.readValue(string)) of "statusCode": if code.isSome(): reader.raiseUnexpectedField( - "Multiple `statusCode` fields found", "StoreQueryResponse" + "Multiple `statusCode` fields found", "StoreQueryResponseHex" ) code = some(reader.readValue(uint32)) of "statusDesc": if desc.isSome(): reader.raiseUnexpectedField( - "Multiple `statusDesc` fields found", "StoreQueryResponse" + "Multiple `statusDesc` fields found", "StoreQueryResponseHex" ) desc = some(reader.readValue(string)) of "messages": if messages.isSome(): reader.raiseUnexpectedField( - "Multiple `messages` fields found", "StoreQueryResponse" + "Multiple `messages` fields found", "StoreQueryResponseHex" ) - messages = some(reader.readValue(seq[WakuMessageKeyValue])) + messages = some(reader.readValue(seq[WakuMessageKeyValueHex])) of "paginationCursor": if cursor.isSome(): reader.raiseUnexpectedField( - "Multiple `paginationCursor` fields found", "StoreQueryResponse" + "Multiple `paginationCursor` fields found", "StoreQueryResponseHex" ) - let base64String = reader.readValue(Base64String) - let bytes = base64.decode(base64String).valueOr: - reader.raiseUnexpectedField("Failed decoding data", "paginationCursor") - cursor = some(fromBytes(bytes)) + cursor = some(reader.readValue(string)) else: reader.raiseUnexpectedField("Unrecognided field", cstring(fieldName)) @@ -282,7 +279,7 @@ proc readValue*( if messages.isNone(): reader.raiseUnexpectedValue("Field `messages` is missing") - value = StoreQueryResponse( + value = StoreQueryResponseHex( requestId: requestId.get(), statusCode: code.get(), statusDesc: desc.get(), diff --git a/waku/waku_core/message/digest.nim b/waku/waku_core/message/digest.nim index 0aa1ce610c..8b99abd7e4 100644 --- a/waku/waku_core/message/digest.nim +++ b/waku/waku_core/message/digest.nim @@ -1,6 +1,6 @@ {.push raises: [].} -import std/sequtils, stew/[byteutils, endians2, arrayops], 
nimcrypto/sha2 +import std/sequtils, stew/[byteutils, endians2, arrayops], nimcrypto/sha2, results import ../topics, ./message ## 14/WAKU2-MESSAGE: Deterministic message hashing @@ -35,6 +35,16 @@ converter toBytesArray*(digest: MDigest[256]): WakuMessageHash = converter toBytes*(digest: MDigest[256]): seq[byte] = toSeq(digest.data) +proc hexToHash*(hexString: string): Result[WakuMessageHash, string] = + var hash: WakuMessageHash + + try: + hash = hexString.hexToSeqByte().fromBytes() + except ValueError as e: + return err("Exception converting hex string to hash: " & e.msg) + + return ok(hash) + proc computeMessageHash*(pubsubTopic: PubsubTopic, msg: WakuMessage): WakuMessageHash = var ctx: sha256 ctx.init() diff --git a/waku/waku_store/common.nim b/waku/waku_store/common.nim index 54f5f54ea4..d11c803f92 100644 --- a/waku/waku_store/common.nim +++ b/waku/waku_store/common.nim @@ -1,6 +1,6 @@ {.push raises: [].} -import std/[options], results +import std/[options, sequtils], results, stew/byteutils import ../waku_core, ../common/paging from ../waku_core/codecs import WakuStoreCodec @@ -48,6 +48,22 @@ type paginationCursor*: Option[WakuMessageHash] + # Types to be used by clients that use the hash in hex + WakuMessageKeyValueHex* = object + messageHash*: string + message*: Option[WakuMessage] + pubsubTopic*: Option[PubsubTopic] + + StoreQueryResponseHex* = object + requestId*: string + + statusCode*: uint32 + statusDesc*: string + + messages*: seq[WakuMessageKeyValueHex] + + paginationCursor*: Option[string] + StatusCode* {.pure.} = enum UNKNOWN = uint32(000) SUCCESS = uint32(200) @@ -117,3 +133,24 @@ proc `$`*(err: StoreError): string = "SERVICE_UNAVAILABLE" of ErrorCode.UNKNOWN: "UNKNOWN" + +proc toHex*(messageData: WakuMessageKeyValue): WakuMessageKeyValueHex = + WakuMessageKeyValueHex( + messageHash: messageData.messageHash.to0xHex(), + # Assuming WakuMessageHash has a toHex method + message: messageData.message, + pubsubTopic: messageData.pubsubTopic, + ) + 
+proc toHex*(response: StoreQueryResponse): StoreQueryResponseHex = + StoreQueryResponseHex( + requestId: response.requestId, + statusCode: response.statusCode, + statusDesc: response.statusDesc, + messages: response.messages.mapIt(it.toHex()), # Convert each message to hex + paginationCursor: + if response.paginationCursor.isSome: + some(response.paginationCursor.get().to0xHex()) + else: + none[string](), + ) diff --git a/waku/waku_store_sync/codec.nim b/waku/waku_store_sync/codec.nim index ee0b926a3a..815ed9d618 100644 --- a/waku/waku_store_sync/codec.nim +++ b/waku/waku_store_sync/codec.nim @@ -52,6 +52,18 @@ proc deltaEncode*(value: RangesData): seq[byte] = i = 0 j = 0 + # encode cluster + buf = uint64(value.cluster).toBytes(Leb128) + output &= @buf + + # encode shards + buf = uint64(value.shards.len).toBytes(Leb128) + output &= @buf + + for shard in value.shards: + buf = uint64(shard).toBytes(Leb128) + output &= @buf + # the first range is implicit but must be explicit when encoded let (bound, _) = value.ranges[0] @@ -209,6 +221,38 @@ proc getReconciled(idx: var int, buffer: seq[byte]): Result[bool, string] = return ok(recon) +proc getCluster(idx: var int, buffer: seq[byte]): Result[uint16, string] = + if idx + VarIntLen > buffer.len: + return err("Cannot decode cluster") + + let slice = buffer[idx ..< idx + VarIntLen] + let (val, len) = uint64.fromBytes(slice, Leb128) + idx += len + + return ok(uint16(val)) + +proc getShards(idx: var int, buffer: seq[byte]): Result[seq[uint16], string] = + if idx + VarIntLen > buffer.len: + return err("Cannot decode shards count") + + let slice = buffer[idx ..< idx + VarIntLen] + let (val, len) = uint64.fromBytes(slice, Leb128) + idx += len + let shardsLen = val + + var shards: seq[uint16] + for i in 0 ..< shardsLen: + if idx + VarIntLen > buffer.len: + return err("Cannot decode shard value. 
idx: " & $i) + + let slice = buffer[idx ..< idx + VarIntLen] + let (val, len) = uint64.fromBytes(slice, Leb128) + idx += len + + shards.add(uint16(val)) + + return ok(shards) + proc deltaDecode*( itemSet: var ItemSet, buffer: seq[byte], setLength: int ): Result[int, string] = @@ -242,7 +286,7 @@ proc getItemSet( return ok(itemSet) proc deltaDecode*(T: type RangesData, buffer: seq[byte]): Result[T, string] = - if buffer.len == 1: + if buffer.len <= 1: return ok(RangesData()) var @@ -250,6 +294,9 @@ proc deltaDecode*(T: type RangesData, buffer: seq[byte]): Result[T, string] = lastTime = Timestamp(0) idx = 0 + payload.cluster = ?getCluster(idx, buffer) + payload.shards = ?getShards(idx, buffer) + lastTime = ?getTimestamp(idx, buffer) # implicit first hash is always 0 diff --git a/waku/waku_store_sync/common.nim b/waku/waku_store_sync/common.nim index 2795450786..e2eac0f853 100644 --- a/waku/waku_store_sync/common.nim +++ b/waku/waku_store_sync/common.nim @@ -26,6 +26,9 @@ type ItemSet = 2 RangesData* = object + cluster*: uint16 + shards*: seq[uint16] + ranges*: seq[(Slice[SyncID], RangeType)] fingerprints*: seq[Fingerprint] # Range type fingerprint stored here in order itemSets*: seq[ItemSet] # Range type itemset stored here in order diff --git a/waku/waku_store_sync/reconciliation.nim b/waku/waku_store_sync/reconciliation.nim index 9ac81c6677..10e8aed52c 100644 --- a/waku/waku_store_sync/reconciliation.nim +++ b/waku/waku_store_sync/reconciliation.nim @@ -1,7 +1,7 @@ {.push raises: [].} import - std/[sequtils, options], + std/[sequtils, options, packedsets], stew/byteutils, results, chronicles, @@ -37,6 +37,9 @@ logScope: const DefaultStorageCap = 50_000 type SyncReconciliation* = ref object of LPProtocol + cluster: uint16 + shards: PackedSet[uint16] + peerManager: PeerManager wakuArchive: WakuArchive @@ -114,16 +117,24 @@ proc processRequest( var hashToRecv: seq[WakuMessageHash] hashToSend: seq[WakuMessageHash] + sendPayload: RangesData + rawPayload: seq[byte] + + # 
Only process the ranges IF the shards and cluster matches + if self.cluster == recvPayload.cluster and + recvPayload.shards.toPackedSet() == self.shards: + sendPayload = self.storage.processPayload(recvPayload, hashToSend, hashToRecv) - let sendPayload = self.storage.processPayload(recvPayload, hashToSend, hashToRecv) + sendPayload.cluster = self.cluster + sendPayload.shards = self.shards.toSeq() - for hash in hashToSend: - await self.remoteNeedsTx.addLast((conn.peerId, hash)) + for hash in hashToSend: + await self.remoteNeedsTx.addLast((conn.peerId, hash)) - for hash in hashToRecv: - await self.localWantstx.addLast((conn.peerId, hash)) + for hash in hashToRecv: + await self.localWantstx.addLast((conn.peerId, hash)) - let rawPayload = sendPayload.deltaEncode() + rawPayload = sendPayload.deltaEncode() total_bytes_exchanged.observe( rawPayload.len, labelValues = [Reconciliation, Sending] @@ -162,6 +173,8 @@ proc initiate( fingerprint = self.storage.computeFingerprint(bounds) initPayload = RangesData( + cluster: self.cluster, + shards: self.shards.toSeq(), ranges: @[(bounds, RangeType.Fingerprint)], fingerprints: @[fingerprint], itemSets: @[], @@ -261,6 +274,8 @@ proc initFillStorage( proc new*( T: type SyncReconciliation, + cluster: uint16, + shards: seq[uint16], peerManager: PeerManager, wakuArchive: WakuArchive, syncRange: timer.Duration = DefaultSyncRange, @@ -279,6 +294,8 @@ proc new*( SeqStorage.new(res.get()) var sync = SyncReconciliation( + cluster: cluster, + shards: shards.toPackedSet(), peerManager: peerManager, storage: storage, syncRange: syncRange,