From be4450f056e19a0ff85b8814545df903a696507f Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Fri, 31 Jan 2025 13:09:24 +0200 Subject: [PATCH 1/8] adding temporary debug log --- library/libwaku.nim | 1 + 1 file changed, 1 insertion(+) diff --git a/library/libwaku.nim b/library/libwaku.nim index 204b73d0dd..9ad6b8d2be 100644 --- a/library/libwaku.nim +++ b/library/libwaku.nim @@ -79,6 +79,7 @@ proc handleRequest( ): cint = waku_thread.sendRequestToWakuThread(ctx, requestType, content, callback, userData).isOkOr: let msg = "libwaku error: " & $error + echo msg callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) return RET_ERR From 06c0ed0d4cdb2c7fa1d0007b088df743034cfe74 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Mon, 3 Feb 2025 13:04:18 +0200 Subject: [PATCH 2/8] using Channel --- library/waku_thread/waku_thread.nim | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/library/waku_thread/waku_thread.nim b/library/waku_thread/waku_thread.nim index 4e8019b08a..c0de2711ed 100644 --- a/library/waku_thread/waku_thread.nim +++ b/library/waku_thread/waku_thread.nim @@ -8,7 +8,7 @@ import waku/factory/waku, ./inter_thread_communication/waku_thread_request, ../f type WakuContext* = object thread: Thread[(ptr WakuContext)] - reqChannel: ChannelSPSCSingle[ptr WakuThreadRequest] + reqChannel: Channel[ptr WakuThreadRequest] reqSignal: ThreadSignalPtr # to inform The Waku Thread (a.k.a TWT) that a new request is sent reqReceivedSignal: ThreadSignalPtr @@ -33,9 +33,15 @@ proc runWaku(ctx: ptr WakuContext) {.async.} = if ctx.running.load == false: break - ## Trying to get a request from the libwaku requestor thread + var recvOk: bool var request: ptr WakuThreadRequest - let recvOk = ctx.reqChannel.tryRecv(request) + ## Trying to get a request from the libwaku requestor thread + # var request: ptr WakuThreadRequest + try: + (recvOk, request) = ctx.reqChannel.tryRecv() + except Exception: + error "exception trying to receive a request" + continue if not recvOk: error "waku thread could not receive a request" continue @@ -55,6 +61,7 @@ proc createWakuThread*(): Result[ptr WakuContext, string] = ## This proc is called from the main thread and it creates ## the Waku working thread. var ctx = createShared(WakuContext, 1) + ctx.reqChannel.open() ctx.reqSignal = ThreadSignalPtr.new().valueOr: return err("couldn't create reqSignal ThreadSignalPtr") ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr: @@ -81,6 +88,7 @@ proc destroyWakuThread*(ctx: ptr WakuContext): Result[void, string] = return err("failed to signal reqSignal on time in destroyWakuThread") joinThread(ctx.thread) + ctx.reqChannel.close() ?ctx.reqSignal.close() ?ctx.reqReceivedSignal.close() freeShared(ctx) From 263a6601701291b26a0ca0db834cf7e0ec8b8b6b Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Mon, 3 Feb 2025 13:04:46 +0200 Subject: [PATCH 3/8] Revert "using Channel" This reverts commit 90a6049e35825b3f3ec15c76abeca39633e5feea. --- library/waku_thread/waku_thread.nim | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/library/waku_thread/waku_thread.nim b/library/waku_thread/waku_thread.nim index c0de2711ed..4e8019b08a 100644 --- a/library/waku_thread/waku_thread.nim +++ b/library/waku_thread/waku_thread.nim @@ -8,7 +8,7 @@ import waku/factory/waku, ./inter_thread_communication/waku_thread_request, ../f type WakuContext* = object thread: Thread[(ptr WakuContext)] - reqChannel: Channel[ptr WakuThreadRequest] + reqChannel: ChannelSPSCSingle[ptr WakuThreadRequest] reqSignal: ThreadSignalPtr # to inform The Waku Thread (a.k.a TWT) that a new request is sent reqReceivedSignal: ThreadSignalPtr @@ -33,15 +33,9 @@ proc runWaku(ctx: ptr WakuContext) {.async.} = if ctx.running.load == false: break - var recvOk: bool - var request: ptr WakuThreadRequest ## Trying to get a request from the libwaku requestor thread - # var request: ptr WakuThreadRequest - try: - (recvOk, request) = ctx.reqChannel.tryRecv() - except Exception: - error "exception trying to receive a request" - continue + var request: ptr WakuThreadRequest + let recvOk = ctx.reqChannel.tryRecv(request) if not recvOk: error "waku thread could not receive a request" continue @@ -61,7 +55,6 @@ proc createWakuThread*(): Result[ptr WakuContext, string] = ## This proc is called from the main thread and it creates ## the Waku working thread. var ctx = createShared(WakuContext, 1) - ctx.reqChannel.open() ctx.reqSignal = ThreadSignalPtr.new().valueOr: return err("couldn't create reqSignal ThreadSignalPtr") ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr: @@ -88,7 +81,6 @@ proc destroyWakuThread*(ctx: ptr WakuContext): Result[void, string] = return err("failed to signal reqSignal on time in destroyWakuThread") joinThread(ctx.thread) - ctx.reqChannel.close() ?ctx.reqSignal.close() ?ctx.reqReceivedSignal.close() freeShared(ctx) From 1d4c297a5002bea4de6fb43845be5ed3ff94bf98 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Mon, 3 Feb 2025 13:25:11 +0200 Subject: [PATCH 4/8] adding lock --- library/waku_thread/waku_thread.nim | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/library/waku_thread/waku_thread.nim b/library/waku_thread/waku_thread.nim index 4e8019b08a..b3afb84419 100644 --- a/library/waku_thread/waku_thread.nim +++ b/library/waku_thread/waku_thread.nim @@ -2,12 +2,13 @@ {.pragma: callback, cdecl, raises: [], gcsafe.} {.passc: "-fPIC".} -import std/[options, atomics, os, net] +import std/[options, atomics, os, net, locks] import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results import waku/factory/waku, ./inter_thread_communication/waku_thread_request, ../ffi_types type WakuContext* = object thread: Thread[(ptr WakuContext)] + lock: Lock reqChannel: ChannelSPSCSingle[ptr WakuThreadRequest] reqSignal: ThreadSignalPtr # to inform The Waku Thread (a.k.a TWT) that a new request is sent @@ -59,6 +60,7 @@ proc createWakuThread*(): Result[ptr WakuContext, string] = return err("couldn't create reqSignal ThreadSignalPtr") ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr: return err("couldn't create reqReceivedSignal ThreadSignalPtr") + ctx.lock.initLock() ctx.running.store(true) @@ -81,6 +83,7 @@ proc destroyWakuThread*(ctx: ptr WakuContext): Result[void, string] = return err("failed to signal reqSignal on time in destroyWakuThread") joinThread(ctx.thread) + ctx.lock.deinitLock() ?ctx.reqSignal.close() ?ctx.reqReceivedSignal.close() freeShared(ctx) @@ -96,6 +99,7 @@ proc sendRequestToWakuThread*( ): Result[void, string] = let req = WakuThreadRequest.createShared(reqType, reqContent, callback, userData) ## Sending the request + ctx.lock.acquire() let sentOk = ctx.reqChannel.trySend(req) if not sentOk: deallocShared(req) @@ -112,6 +116,8 @@ proc sendRequestToWakuThread*( ## wait until the Waku Thread properly received the request let res = ctx.reqReceivedSignal.waitSync() + ctx.lock.release() + if res.isErr(): deallocShared(req) return err("Couldn't receive reqReceivedSignal signal") From 57951b2cfc4c5561c564af4554fe5e051c07fac1 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Mon, 3 Feb 2025 14:15:58 +0200 Subject: [PATCH 5/8] removing debug log --- library/libwaku.nim | 1 - 1 file changed, 1 deletion(-) diff --git a/library/libwaku.nim b/library/libwaku.nim index 9ad6b8d2be..204b73d0dd 100644 --- a/library/libwaku.nim +++ b/library/libwaku.nim @@ -79,7 +79,6 @@ proc handleRequest( ): cint = waku_thread.sendRequestToWakuThread(ctx, requestType, content, callback, userData).isOkOr: let msg = "libwaku error: " & $error - echo msg callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) return RET_ERR From 89f4a6f6d307b4656eecf34efe33923926575591 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Mon, 3 Feb 2025 22:33:19 +0200 Subject: [PATCH 6/8] releasing lock on errors --- library/waku_thread/waku_thread.nim | 3 +++ 1 file changed, 3 insertions(+) diff --git a/library/waku_thread/waku_thread.nim b/library/waku_thread/waku_thread.nim index b3afb84419..0101dac781 100644 --- a/library/waku_thread/waku_thread.nim +++ b/library/waku_thread/waku_thread.nim @@ -102,15 +102,18 @@ proc sendRequestToWakuThread*( ctx.lock.acquire() let sentOk = ctx.reqChannel.trySend(req) if not sentOk: + ctx.lock.release() deallocShared(req) return err("Couldn't send a request to the waku thread: " & $req[]) let fireSyncRes = ctx.reqSignal.fireSync() if fireSyncRes.isErr(): + ctx.lock.release() deallocShared(req) return err("failed fireSync: " & $fireSyncRes.error) if fireSyncRes.get() == false: + ctx.lock.release() deallocShared(req) return err("Couldn't fireSync in time") From a83bdf36ed037ca2ee63646551244780ad6d06fa Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Tue, 4 Feb 2025 10:54:14 +0200 Subject: [PATCH 7/8] adding comment --- library/waku_thread/waku_thread.nim | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/library/waku_thread/waku_thread.nim b/library/waku_thread/waku_thread.nim index 0101dac781..97bd6692cf 100644 --- a/library/waku_thread/waku_thread.nim +++ b/library/waku_thread/waku_thread.nim @@ -98,8 +98,13 @@ proc sendRequestToWakuThread*( userData: pointer, ): Result[void, string] = let req = WakuThreadRequest.createShared(reqType, reqContent, callback, userData) - ## Sending the request + + # This lock is only necessary while we use a SP Channel and while the signalling + # between threads assumes that there aren't concurrent requests. + # Rearchitecting the signaling + migrating to a MP Channel will allow us receive + # requests concurrently and spare us the need of locks ctx.lock.acquire() + ## Sending the request let sentOk = ctx.reqChannel.trySend(req) if not sentOk: ctx.lock.release() From 5a106f7a69b2b673fd30cf8e2479fd400369ea35 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Tue, 4 Feb 2025 10:55:40 +0200 Subject: [PATCH 8/8] improving comment --- library/waku_thread/waku_thread.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/library/waku_thread/waku_thread.nim b/library/waku_thread/waku_thread.nim index 97bd6692cf..e7487a3182 100644 --- a/library/waku_thread/waku_thread.nim +++ b/library/waku_thread/waku_thread.nim @@ -101,7 +101,7 @@ proc sendRequestToWakuThread*( # This lock is only necessary while we use a SP Channel and while the signalling # between threads assumes that there aren't concurrent requests. - # Rearchitecting the signaling + migrating to a MP Channel will allow us receive + # Rearchitecting the signaling + migrating to a MP Channel will allow us to receive # requests concurrently and spare us the need of locks ctx.lock.acquire() ## Sending the request