diff --git a/library/waku_thread/waku_thread.nim b/library/waku_thread/waku_thread.nim index 4e8019b08a..e7487a3182 100644 --- a/library/waku_thread/waku_thread.nim +++ b/library/waku_thread/waku_thread.nim @@ -2,12 +2,13 @@ {.pragma: callback, cdecl, raises: [], gcsafe.} {.passc: "-fPIC".} -import std/[options, atomics, os, net] +import std/[options, atomics, os, net, locks] import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results import waku/factory/waku, ./inter_thread_communication/waku_thread_request, ../ffi_types type WakuContext* = object thread: Thread[(ptr WakuContext)] + lock: Lock reqChannel: ChannelSPSCSingle[ptr WakuThreadRequest] reqSignal: ThreadSignalPtr # to inform The Waku Thread (a.k.a TWT) that a new request is sent @@ -59,6 +60,7 @@ proc createWakuThread*(): Result[ptr WakuContext, string] = return err("couldn't create reqSignal ThreadSignalPtr") ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr: return err("couldn't create reqReceivedSignal ThreadSignalPtr") + ctx.lock.initLock() ctx.running.store(true) @@ -81,6 +83,7 @@ proc destroyWakuThread*(ctx: ptr WakuContext): Result[void, string] = return err("failed to signal reqSignal on time in destroyWakuThread") joinThread(ctx.thread) + ctx.lock.deinitLock() ?ctx.reqSignal.close() ?ctx.reqReceivedSignal.close() freeShared(ctx) @@ -95,23 +98,34 @@ proc sendRequestToWakuThread*( userData: pointer, ): Result[void, string] = let req = WakuThreadRequest.createShared(reqType, reqContent, callback, userData) + + # This lock is only necessary while we use a SP Channel and while the signalling + # between threads assumes that there aren't concurrent requests. + # Rearchitecting the signaling + migrating to a MP Channel will allow us to receive + # requests concurrently and spare us the need of locks + ctx.lock.acquire() ## Sending the request let sentOk = ctx.reqChannel.trySend(req) if not sentOk: + ctx.lock.release() deallocShared(req) return err("Couldn't send a request to the waku thread: " & $req[]) let fireSyncRes = ctx.reqSignal.fireSync() if fireSyncRes.isErr(): + ctx.lock.release() deallocShared(req) return err("failed fireSync: " & $fireSyncRes.error) if fireSyncRes.get() == false: + ctx.lock.release() deallocShared(req) return err("Couldn't fireSync in time") ## wait until the Waku Thread properly received the request let res = ctx.reqReceivedSignal.waitSync() + ctx.lock.release() + if res.isErr(): deallocShared(req) return err("Couldn't receive reqReceivedSignal signal")