Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes for typedthreads data race issues #24612

Open
wants to merge 7 commits into
base: devel
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 25 additions & 11 deletions lib/std/private/threadtypes.nim
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
include system/inclrtl
import std/atomics

const hasSharedHeap* = defined(boehmgc) or defined(gogc) # don't share heaps; every thread has its own

Expand Down Expand Up @@ -161,16 +162,29 @@ type

const hasAllocStack* = defined(zephyr) # maybe freertos too?

type
Thread*[TArg] = object
core*: PGcThread
sys*: SysThread
when TArg is void:
dataFn*: proc () {.nimcall, gcsafe.}
else:
dataFn*: proc (m: TArg) {.nimcall, gcsafe.}
data*: TArg
when hasAllocStack:
rawStack*: pointer
when not defined(cpp):
type
Thread*[TArg] = object
core*: Atomic[PGcThread]
sys*: SysThread
when TArg is void:
dataFn*: Atomic[proc () {.nimcall, gcsafe.}]
else:
dataFn*: Atomic[proc (m: TArg) {.nimcall, gcsafe.}]
data*: Atomic[TArg]
when hasAllocStack:
rawStack*: pointer
else:
type
Thread*[TArg] = object
core*: PGcThread
sys*: SysThread
when TArg is void:
dataFn*: proc () {.nimcall, gcsafe.}
else:
dataFn*: proc (m: TArg) {.nimcall, gcsafe.}
data*: TArg
when hasAllocStack:
rawStack*: pointer

proc `=copy`*[TArg](x: var Thread[TArg], y: Thread[TArg]) {.error.}
107 changes: 88 additions & 19 deletions lib/std/typedthreads.nim
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ deinitLock(l)
```
]##


import std/atomics
import std/private/[threadtypes]
export Thread

Expand Down Expand Up @@ -149,9 +149,12 @@ else:
nimThreadProcWrapperBody(closure)
{.pop.}

proc running*[TArg](t: Thread[TArg]): bool {.inline.} =
proc running*[TArg](t: var Thread[TArg]): bool {.inline.} =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the var here is a breaking change, why is it needed?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please ignore. If I manage to fix other issues that arise, this change will be removed with the next commit.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the var here is a breaking change, why is it needed?

nim c --styleCheck:usages --styleCheck:error --verbosity:0 --hints:off --skipParentCfg --skipUserCfg --outdir:build '--nimcache:build/nimcache/$projectName' -d:metricsTest -d:metrics --threads:on -d:nimTypeNames --mm:refc -r tests/chronos_server_tests
/Users/runner/work/Nim/Nim/pkgstemp/metrics/metrics/chronos_httpserver.nim(400, 6) template/generic instantiation of async from here
/Users/runner/work/Nim/Nim/pkgstemp/metrics/metrics/chronos_httpserver.nim(403, 26) template/generic instantiation of running from here
/Users/runner/work/Nim/Nim/lib/std/typedthreads.nim(155, 22) Error: type mismatch
Expression: load(t.dataFn, moAcquireRelease)
[1] t.dataFn: Atomic[proc (m: MetricsServerData){.nimcall, gcsafe.}]
[2] moAcquireRelease: MemoryOrder

Expected one of (first mismatch at [position]):
[1] proc load[T: Trivial](location: var Atomic[T];
order: MemoryOrder = moSequentiallyConsistent): T
[1] proc load[T: not Trivial](location: var Atomic[T];
order: MemoryOrder = moSequentiallyConsistent): T
expression 't.dataFn' is immutable, not 'var'

## Returns true if `t` is running.
result = t.dataFn != nil
when not defined(cpp):
result = t.dataFn.load(moAcquireRelease) != nil
else:
result = t.dataFn != nil

proc handle*[TArg](t: Thread[TArg]): SysThread {.inline.} =
## Returns the thread handle of `t`.
Expand Down Expand Up @@ -202,11 +205,20 @@ when false:
else:
discard pthread_cancel(t.sys)
when declared(registerThread): unregisterThread(addr(t))
t.dataFn = nil
when not defined(cpp):
t.dataFn.store(nil, moAcquireRelease)
else:
t.dataFn = nil
## if thread `t` already exited, `t.core` will be `null`.
if not isNil(t.core):
deallocThreadStorage(t.core)
t.core = nil
when not defined(cpp):
var coreTmp = t.core.load(moAcquireRelease)
if not isNil(coreTmp):
deallocThreadStorage(coreTmp)
t.core.store(nil, moAcquireRelease)
else:
if not isNil(t.core):
deallocThreadStorage(t.core)
t.core = nil

when hostOS == "windows":
proc createThread*[TArg](t: var Thread[TArg],
Expand All @@ -217,11 +229,30 @@ when hostOS == "windows":
## Entry point is the proc `tp`.
## `param` is passed to `tp`. `TArg` can be `void` if you
## don't need to pass any data to the thread.
t.core = cast[PGcThread](allocThreadStorage(sizeof(GcThread)))
when not defined(cpp):
t.core.store(cast[PGcThread](allocThreadStorage(sizeof(GcThread))), moAcquireRelease)
else:
t.core = cast[PGcThread](allocThreadStorage(sizeof(GcThread)))

when TArg isnot void:
when not defined(cpp):
t.data.store(param, moAcquireRelease)
else:
t.data = param

when not defined(cpp):
t.dataFn.store(tp, moAcquireRelease)
else:
t.dataFn = tp

when hasSharedHeap:
when not defined(cpp):
var core = cast[PGcThread](t.core.load(moAcquireRelease))
core.stackSize = ThreadStackSize
t.core.store(core, moAcquireRelease)
else:
t.core.stackSize = ThreadStackSize

when TArg isnot void: t.data = param
t.dataFn = tp
when hasSharedHeap: t.core.stackSize = ThreadStackSize
var dummyThreadId: int32 = 0'i32
t.sys = createThread(nil, ThreadStackSize, threadProcWrapper[TArg],
addr(t), 0'i32, dummyThreadId)
Expand All @@ -242,11 +273,30 @@ elif defined(genode):
proc createThread*[TArg](t: var Thread[TArg],
tp: proc (arg: TArg) {.thread, nimcall.},
param: TArg) =
t.core = cast[PGcThread](allocThreadStorage(sizeof(GcThread)))
when not defined(cpp):
t.core.store(cast[PGcThread](allocThreadStorage(sizeof(GcThread))), moAcquireRelease)
else:
t.core = cast[PGcThread](allocThreadStorage(sizeof(GcThread)))

when TArg isnot void:
when not defined(cpp):
t.data.store(param, moAcquireRelease)
else:
t.data = param

when TArg isnot void: t.data = param
t.dataFn = tp
when hasSharedHeap: t.stackSize = ThreadStackSize
when not defined(cpp):
t.dataFn.store(tp, moAcquireRelease)
else:
t.dataFn = tp

when hasSharedHeap:
when not defined(cpp):
var core = cast[PGcThread](t.core.load(moAcquireRelease))
core.stackSize = ThreadStackSize
t.core.store(core, moAcquireRelease)
else:
t.core.stackSize = ThreadStackSize

t.sys.initThread(
runtimeEnv,
ThreadStackSize.culonglong,
Expand All @@ -266,11 +316,30 @@ else:
## Entry point is the proc `tp`. `param` is passed to `tp`.
## `TArg` can be `void` if you
## don't need to pass any data to the thread.
t.core = cast[PGcThread](allocThreadStorage(sizeof(GcThread)))
when not defined(cpp):
t.core.store(cast[PGcThread](allocThreadStorage(sizeof(GcThread))), moAcquireRelease)
else:
t.core = cast[PGcThread](allocThreadStorage(sizeof(GcThread)))

when TArg isnot void:
when not defined(cpp):
t.data.store(param, moAcquireRelease)
else:
t.data = param

when not defined(cpp):
t.dataFn.store(tp, moAcquireRelease)
else:
t.dataFn = tp

when hasSharedHeap:
when not defined(cpp):
var core = cast[PGcThread](t.core.load(moAcquireRelease))
core.stackSize = ThreadStackSize
t.core.store(core, moAcquireRelease)
else:
t.core.stackSize = ThreadStackSize

when TArg isnot void: t.data = param
t.dataFn = tp
when hasSharedHeap: t.core.stackSize = ThreadStackSize
var a {.noinit.}: Pthread_attr
doAssert pthread_attr_init(a) == 0
when hasAllocStack:
Expand Down
59 changes: 48 additions & 11 deletions lib/system/threadimpl.nim
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import std/atomics

var
nimThreadDestructionHandlers* {.rtlThreadVar.}: seq[proc () {.closure, gcsafe, raises: [].}]
when not defined(boehmgc) and not hasSharedHeap and not defined(gogc) and not defined(gcRegions):
Expand Down Expand Up @@ -50,10 +52,19 @@ when defined(boehmgc):
boehmGC_register_my_thread(sb)
try:
let thrd = cast[ptr Thread[TArg]](thrd)
when not defined(cpp):
let dataFn = thrd.dataFn.load(moAcquireRelease)
when TArg is void:
thrd.dataFn()
when not defined(cpp):
dataFn()
else:
thrd.dataFn()
else:
thrd.dataFn(thrd.data)
when not defined(cpp):
let data = thrd.data.load(moAcquireRelease)
dataFn(data)
else:
thrd.dataFn(thrd.data)
except:
threadTrouble()
finally:
Expand All @@ -62,15 +73,28 @@ when defined(boehmgc):
else:
proc threadProcWrapDispatch[TArg](thrd: ptr Thread[TArg]) {.raises: [].} =
try:
when not defined(cpp):
let dataFn = thrd.dataFn.load(moAcquireRelease)
when TArg is void:
thrd.dataFn()
when not defined(cpp):
dataFn()
else:
thrd.dataFn()
else:
when defined(nimV2):
thrd.dataFn(thrd.data)
when not defined(cpp):
let data = thrd.data.load(moAcquireRelease)
dataFn(data)
else:
thrd.dataFn(thrd.data)
else:
var x: TArg = default(TArg)
deepCopy(x, thrd.data)
thrd.dataFn(x)
when not defined(cpp):
deepCopy(x, thrd.data.load(moAcquireRelease))
dataFn(x)
else:
deepCopy(x, thrd.data)
thrd.dataFn(x)
except:
threadTrouble()
finally:
Expand All @@ -96,8 +120,16 @@ proc threadProcWrapStackFrame[TArg](thrd: ptr Thread[TArg]) {.raises: [].} =

template nimThreadProcWrapperBody*(closure: untyped): untyped =
var thrd = cast[ptr Thread[TArg]](closure)
var core = thrd.core
when declared(globalsSlot): threadVarSetValue(globalsSlot, thrd.core)
when not defined(cpp):
# Segfault after this line with --tlsEmulation:on
var core = cast[PGcThread](thrd.core.load(moAcquireRelease))
else:
var core = thrd.core
when declared(globalsSlot):
when not defined(cpp):
threadVarSetValue(globalsSlot, cast[PGcThread](thrd.core.load(moAcquireRelease)))
else:
threadVarSetValue(globalsSlot, thrd.core)
threadProcWrapStackFrame(thrd)
# Since an unhandled exception terminates the whole process (!), there is
# no need for a ``try finally`` here, nor would it be correct: The current
Expand All @@ -106,6 +138,11 @@ template nimThreadProcWrapperBody*(closure: untyped): untyped =
# page!

# mark as not running anymore:
thrd.core = nil
thrd.dataFn = nil
deallocThreadStorage(cast[pointer](core))
when not defined(cpp):
thrd.core.store(nil, moAcquireRelease)
thrd.dataFn.store(nil, moAcquireRelease)
deallocThreadStorage(cast[pointer](core))
else:
thrd.core = nil
thrd.dataFn = nil
deallocThreadStorage(cast[pointer](core))
Loading