chore: supporting parallel libwaku requests #3273

Closed · wants to merge 8 commits
16 changes: 15 additions & 1 deletion library/waku_thread/waku_thread.nim
@@ -2,12 +2,13 @@
 {.pragma: callback, cdecl, raises: [], gcsafe.}
 {.passc: "-fPIC".}

-import std/[options, atomics, os, net]
+import std/[options, atomics, os, net, locks]
 import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results
 import waku/factory/waku, ./inter_thread_communication/waku_thread_request, ../ffi_types

 type WakuContext* = object
   thread: Thread[(ptr WakuContext)]
+  lock: Lock
   reqChannel: ChannelSPSCSingle[ptr WakuThreadRequest]
   reqSignal: ThreadSignalPtr
     # to inform The Waku Thread (a.k.a TWT) that a new request is sent
@@ -59,6 +60,7 @@ proc createWakuThread*(): Result[ptr WakuContext, string] =
     return err("couldn't create reqSignal ThreadSignalPtr")
   ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr:
     return err("couldn't create reqReceivedSignal ThreadSignalPtr")
+  ctx.lock.initLock()

   ctx.running.store(true)

@@ -81,6 +83,7 @@ proc destroyWakuThread*(ctx: ptr WakuContext): Result[void, string] =
     return err("failed to signal reqSignal on time in destroyWakuThread")

   joinThread(ctx.thread)
+  ctx.lock.deinitLock()
   ?ctx.reqSignal.close()
   ?ctx.reqReceivedSignal.close()
   freeShared(ctx)
@@ -95,23 +98,34 @@ proc sendRequestToWakuThread*(
     userData: pointer,
 ): Result[void, string] =
   let req = WakuThreadRequest.createShared(reqType, reqContent, callback, userData)
+
+  # This lock is only necessary while we use an SP channel and while the
+  # signalling between threads assumes that there aren't concurrent requests.
+  # Rearchitecting the signalling + migrating to an MP channel will allow us to
+  # receive requests concurrently and spare us the need for locks.
+  ctx.lock.acquire()
Contributor: what happens with this lock during the return err... below?

Contributor: why is the lock even here? channels are mpmc, you should not need a lock...

richard-ramos (Member, Feb 3, 2025): This is a ChannelSPSCSingle channel from taskpools. According to comments in its code, it can only buffer a single element. We observed the following behavior: when you call trySend more than once without the pending value having been received on the other end of the channel, trySend will return false. With this lock we wait until the other end receives the value.
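A minimal sketch of that single-slot behavior. This assumes a trySend/tryRecv pair on taskpools' ChannelSPSCSingle; only trySend appears in this diff, so treat the exact signatures as an assumption rather than a reference:

```nim
# Sketch of the single-slot behavior (assumed API: trySend/tryRecv from
# taskpools/channels_spsc_single; only trySend is visible in this diff).
import taskpools/channels_spsc_single

var chan: ChannelSPSCSingle[int]
var item = 1

doAssert chan.trySend(item)      # true: the single buffer slot was empty
doAssert not chan.trySend(item)  # false: nothing has drained the slot yet

var received: int
doAssert chan.tryRecv(received)  # the consumer side drains the slot
doAssert chan.trySend(item)      # true again: the slot is free once more
```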

richard-ramos (Member): I do wonder if we should use Nim's builtin Channel instead of ChannelSPSCSingle. I just saw that it has a send that is blocking, as well as allowing to specify a max number of items in its buffer... 🤔
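For reference, a rough self-contained sketch of what that could look like with Nim's built-in Channel (not this PR's code): open(maxItems) bounds the buffer, and send blocks while the buffer is full, which gives back-pressure without an explicit lock:

```nim
# Sketch only: Nim's built-in Channel. Compile with: nim c --threads:on example.nim
var chan: Channel[int]

proc consumer() {.thread.} =
  for i in 0 ..< 10:
    echo "received ", chan.recv()

var t: Thread[void]
chan.open(maxItems = 4)  # at most 4 queued items
createThread(t, consumer)
for i in 0 ..< 10:
  chan.send(i)           # blocks whenever 4 items are already queued
joinThread(t)
chan.close()
```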

gabrielmer (Contributor Author, Feb 3, 2025): Yes! So I first tried to move it to a Channel and had race conditions - not because of the Channel itself, but because of how we designed the mechanism of sending signals between threads. It is designed in a way that assumes a signal won't be sent if we have a pending request.

I first wanted to see if we could have it working without redesigning too much. Given that the design already assumed there won't be concurrent requests to the channel and the only way for it to work was adding a lock, ChannelSPSCSingle worked perfectly too, so I switched back to it.

Adding a lock to the existing code resolved all the race conditions and performed really well - the lock is only held from the time a request is received to the moment it starts getting processed, but multiple requests can be processed in parallel.

I actually commented to @Ivansete-status that we could refactor and redesign the mechanism of sending signals between threads to work without assuming that requests come one at a time, but the current version performed so well that this seems to be an optimization we can take care of if it becomes a bottleneck.

Not sure if it makes sense, lmk if that's not the case :)

Contributor:

> starts getting processed, but multiple requests can be processed in parallel.

"starts getting processed" in the loose sense of the term here, i.e. if the processing thread is running a synchronous computation that takes a while, it will not be removing things from the queue, so all the other requests will form a line - the only "parallel" processing you get is what async gives you, and these requests might end up getting blocked. This is probably worth documenting at least, and it is what an MP queue would solve.

gabrielmer (Contributor Author):

> "starts getting processed" in the loose sense of the term here, ie if the processing thread is running a synchronous computation that takes a while, it will not be removing things from the queue so all the other requests will form a line - the only "parallel" processing you get is what async gives you and these requests might end up getting blocked

True! But isn't that also the case with an MP queue? In the sense that if there's a sync operation in the processing thread that takes lots of computation, the queue will start getting filled with requests too.

The MP queue helps us add elements to the queue concurrently, but the actual execution of the requests should be the same in both cases, right? Or maybe I'm missing something?

gabrielmer (Contributor Author, Feb 4, 2025): I agree that we should document this! It's clear that an MP queue is a better design and we should not forget about it. This PR is only based on the fact that this solution works really well without needing to rearchitect the logic of signaling between threads - so we can continue focusing on developing/fixing things critical for the integration of nwaku into Status Desktop, and then optimize if needed.

Contributor:

> the queue will start getting filled with requests too.

The queue will hold the work, but the thread that places the work on the queue can move on - with the lock, it will instead be blocked until the receiver has removed the item.

The "usual" way to do MP*C queues is to bound them to a fixed number of items and have a policy (block/drop) for what to do when the consumer is too slow - but when there's only one spot in the queue, the blocking unnecessarily happens during short bursts even if the average production rate is lower than the capacity of the consumer.

gabrielmer (Contributor Author): Oh, makes sense! Didn't think about that!
So 100%, let's take care of it now - it can have a greater performance impact than what I envisioned.

Will close this PR and work on an MP*C solution.

Thanks so much for the help!

   ## Sending the request
   let sentOk = ctx.reqChannel.trySend(req)
   if not sentOk:
+    ctx.lock.release()
     deallocShared(req)
     return err("Couldn't send a request to the waku thread: " & $req[])

   let fireSyncRes = ctx.reqSignal.fireSync()
   if fireSyncRes.isErr():
+    ctx.lock.release()
     deallocShared(req)
     return err("failed fireSync: " & $fireSyncRes.error)

   if fireSyncRes.get() == false:
+    ctx.lock.release()
     deallocShared(req)
     return err("Couldn't fireSync in time")

   ## wait until the Waku Thread properly received the request
   let res = ctx.reqReceivedSignal.waitSync()
+  ctx.lock.release()

   if res.isErr():
     deallocShared(req)
     return err("Couldn't receive reqReceivedSignal signal")