Skip to content

Commit

Permalink
Fixes for typedthreads data race issues
Browse files Browse the repository at this point in the history
Modifications have been tested with examples in the open issue. All seem to work except one with Thread[Socket] because of ref SocketImpl. Needs testing in Linux and Windows, probably better with some existing software that uses typedthreads to make sure I have not broken it.
  • Loading branch information
mk1nz committed Jan 11, 2025
1 parent 41c447b commit d555881
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 24 deletions.
9 changes: 5 additions & 4 deletions lib/std/private/threadtypes.nim
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
include system/inclrtl
import std/atomics

const hasSharedHeap* = defined(boehmgc) or defined(gogc) # don't share heaps; every thread has its own

Expand Down Expand Up @@ -163,13 +164,13 @@ const hasAllocStack* = defined(zephyr) # maybe freertos too?

type
Thread*[TArg] = object
core*: PGcThread
core*: Atomic[PGcThread]
sys*: SysThread
when TArg is void:
dataFn*: proc () {.nimcall, gcsafe.}
dataFn*: Atomic[proc () {.nimcall, gcsafe.}]
else:
dataFn*: proc (m: TArg) {.nimcall, gcsafe.}
data*: TArg
dataFn*: Atomic[proc (m: TArg) {.nimcall, gcsafe.}]
data*: Atomic[TArg]
when hasAllocStack:
rawStack*: pointer

Expand Down
22 changes: 11 additions & 11 deletions lib/std/typedthreads.nim
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ deinitLock(l)
```
]##


import std/atomics
import std/private/[threadtypes]
export Thread

Expand Down Expand Up @@ -149,9 +149,9 @@ else:
nimThreadProcWrapperBody(closure)
{.pop.}

proc running*[TArg](t: Thread[TArg]): bool {.inline.} =
proc running*[TArg](t: var Thread[TArg]): bool {.inline.} =
## Returns true if `t` is running.
result = t.dataFn != nil
result = t.dataFn.load() != nil

proc handle*[TArg](t: Thread[TArg]): SysThread {.inline.} =
## Returns the thread handle of `t`.
Expand Down Expand Up @@ -202,7 +202,7 @@ when false:
else:
discard pthread_cancel(t.sys)
when declared(registerThread): unregisterThread(addr(t))
t.dataFn = nil
t.dataFn.store nil
## if thread `t` already exited, `t.core` will be `null`.
if not isNil(t.core):
deallocThreadStorage(t.core)
Expand All @@ -219,8 +219,8 @@ when hostOS == "windows":
## don't need to pass any data to the thread.
t.core = cast[PGcThread](allocThreadStorage(sizeof(GcThread)))

when TArg isnot void: t.data = param
t.dataFn = tp
when TArg isnot void: t.data.store param
t.dataFn.store tp
when hasSharedHeap: t.core.stackSize = ThreadStackSize
var dummyThreadId: int32 = 0'i32
t.sys = createThread(nil, ThreadStackSize, threadProcWrapper[TArg],
Expand All @@ -244,8 +244,8 @@ elif defined(genode):
param: TArg) =
t.core = cast[PGcThread](allocThreadStorage(sizeof(GcThread)))

when TArg isnot void: t.data = param
t.dataFn = tp
when TArg isnot void: t.data.store param
t.dataFn.store tp
when hasSharedHeap: t.stackSize = ThreadStackSize
t.sys.initThread(
runtimeEnv,
Expand All @@ -266,10 +266,10 @@ else:
## Entry point is the proc `tp`. `param` is passed to `tp`.
## `TArg` can be `void` if you
## don't need to pass any data to the thread.
t.core = cast[PGcThread](allocThreadStorage(sizeof(GcThread)))
t.core.store cast[PGcThread](allocThreadStorage(sizeof(GcThread)))

when TArg isnot void: t.data = param
t.dataFn = tp
when TArg isnot void: t.data.store param
t.dataFn.store tp
when hasSharedHeap: t.core.stackSize = ThreadStackSize
var a {.noinit.}: Pthread_attr
doAssert pthread_attr_init(a) == 0
Expand Down
23 changes: 14 additions & 9 deletions lib/system/threadimpl.nim
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import std/atomics

var
nimThreadDestructionHandlers* {.rtlThreadVar.}: seq[proc () {.closure, gcsafe, raises: [].}]
when not defined(boehmgc) and not hasSharedHeap and not defined(gogc) and not defined(gcRegions):
Expand Down Expand Up @@ -50,10 +52,11 @@ when defined(boehmgc):
boehmGC_register_my_thread(sb)
try:
let thrd = cast[ptr Thread[TArg]](thrd)
let dataFn = thrd.dataFn.load()
when TArg is void:
thrd.dataFn()
dataFn()
else:
thrd.dataFn(thrd.data)
dataFn(thrd.data)
except:
threadTrouble()
finally:
Expand All @@ -62,15 +65,16 @@ when defined(boehmgc):
else:
proc threadProcWrapDispatch[TArg](thrd: ptr Thread[TArg]) {.raises: [].} =
try:
let dataFn = thrd.dataFn.load()
when TArg is void:
thrd.dataFn()
dataFn()
else:
when defined(nimV2):
thrd.dataFn(thrd.data)
dataFn(thrd.data.load())
else:
var x: TArg = default(TArg)
deepCopy(x, thrd.data)
thrd.dataFn(x)
deepCopy(x, thrd.data.load())
dataFn(x)
except:
threadTrouble()
finally:
Expand All @@ -95,7 +99,8 @@ proc threadProcWrapStackFrame[TArg](thrd: ptr Thread[TArg]) {.raises: [].} =
threadProcWrapDispatch(thrd)

template nimThreadProcWrapperBody*(closure: untyped): untyped =
var thrd = cast[ptr Thread[TArg]](closure)
var thr = cast[Atomic[ptr Thread[TArg]]](closure)
var thrd = thr.load()
var core = thrd.core
when declared(globalsSlot): threadVarSetValue(globalsSlot, thrd.core)
threadProcWrapStackFrame(thrd)
Expand All @@ -106,6 +111,6 @@ template nimThreadProcWrapperBody*(closure: untyped): untyped =
# page!

# mark as not running anymore:
thrd.core = nil
thrd.dataFn = nil
thrd.core.store nil
thrd.dataFn.store nil
deallocThreadStorage(cast[pointer](core))

0 comments on commit d555881

Please sign in to comment.