Skip to content

Commit

Permalink
Merge pull request #937 from permaweb/tillathehun0/cu-queue-mem-copy
Browse files Browse the repository at this point in the history
fix(cu): defer prep work for eval until right before work is being sent to worker
  • Loading branch information
TillaTheHun0 authored Aug 4, 2024
2 parents 86b6902 + 408089a commit b0f543f
Show file tree
Hide file tree
Showing 5 changed files with 260 additions and 127 deletions.
112 changes: 90 additions & 22 deletions servers/cu/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions servers/cu/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,22 @@
"bytes": "^3.1.2",
"cors": "^2.8.5",
"dataloader": "^2.2.2",
"debug": "^4.3.5",
"debug": "^4.3.6",
"dotenv": "^16.4.5",
"fast-glob": "^3.3.2",
"fastify": "^4.28.1",
"helmet": "^7.1.0",
"hyper-async": "^1.1.2",
"keccak": "^3.0.4",
"long-timeout": "^0.1.1",
"lru-cache": "^10.3.0",
"lru-cache": "^11.0.0",
"ms": "^2.1.3",
"opossum": "^8.1.4",
"p-map": "^7.0.2",
"p-queue": "^8.0.1",
"prom-client": "^15.1.3",
"ramda": "^0.30.1",
"undici": "^6.19.2",
"undici": "^6.19.5",
"warp-arbundles": "^1.0.4",
"workerpool": "^9.1.3",
"zod": "^3.23.8"
Expand Down
126 changes: 65 additions & 61 deletions servers/cu/src/domain/client/ao-module.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ export function findModuleWith ({ db }) {
.toPromise()
}

export function evaluatorWith ({ evaluate, loadWasmModule }) {
export function evaluatorWith ({ evaluateWith, loadWasmModule }) {
const EVAL_DEFER_BACKPRESSURE = 10
return ({ moduleId, moduleOptions }) =>
of(moduleOptions)
Expand Down Expand Up @@ -141,75 +141,79 @@ export function evaluatorWith ({ evaluate, loadWasmModule }) {
*/
if (defer) await new Promise(resolve => setImmediate(resolve))

if (args.Memory) {
/**
* The ArrayBuffer is transferred to the worker as part of performing
* an evaluation. This transfer will subsequently detach any views, Buffers,
* and more broadly, references to the ArrayBuffer on this thread.
*
* So if this is the first eval being performed for the eval stream,
* then we copy the contents of the ArrayBuffer. That way, we can be sure
* that no references on the main thread will be affected during the eval stream
* transfers happening back and forth. This effectively give's each eval stream
* it's own ArrayBuffer to pass back and forth.
*
* (this is no worse than the structured clone that was happening before
* as part of message passing. But instead, the clone is only performed once,
* instead of on each evaluation)
*
* TODO: perhaps there is a way to somehow lock the ArrayBuffer usage
* instead of copying on first evaluation. We have to be careful that nothing
* (ie. a view of the ArrayBuffer in a Wasm Instnace dryrun)
* inadvertantly mutates the underlying ArrayBuffer
*/
if (args.first) {
let stopTimer = () => {}
if (args.Memory.byteLength > TWO_GB) {
stopTimer = timer('copyLargeMemory', {
streamId,
processId: args.processId,
byteLength: args.Memory.byteLength
}).stop
const prep = () => {
if (args.Memory) {
/**
* The ArrayBuffer is transferred to the worker as part of performing
* an evaluation. This transfer will subsequently detach any views, Buffers,
* and more broadly, references to the ArrayBuffer on this thread.
*
* So if this is the first eval being performed for the eval stream,
* then we copy the contents of the ArrayBuffer. That way, we can be sure
* that no references on the main thread will be affected during the eval stream
* transfers happening back and forth. This effectively give's each eval stream
* it's own ArrayBuffer to pass back and forth.
*
* (this is no worse than the structured clone that was happening before
* as part of message passing. But instead, the clone is only performed once,
* instead of on each evaluation)
*
* TODO: perhaps there is a way to somehow lock the ArrayBuffer usage
* instead of copying on first evaluation. We have to be careful that nothing
* (ie. a view of the ArrayBuffer in a Wasm Instnace dryrun)
* inadvertantly mutates the underlying ArrayBuffer
*/
if (args.first) {
let stopTimer = () => {}
if (args.Memory.byteLength > TWO_GB) {
stopTimer = timer('copyLargeMemory', {
streamId,
processId: args.processId,
byteLength: args.Memory.byteLength
}).stop
}
/**
* We must pass a view into copyBytesFrom,
*
* so we first check whether it already is or not,
* and create one on top of the ArrayBuffer if necessary
*
* (NodeJS' Buffer is a subclass of DataView)
*/
args.Memory = ArrayBuffer.isView(args.Memory)
? Buffer.copyBytesFrom(args.Memory)
: Buffer.copyBytesFrom(new Uint8Array(args.Memory))
stopTimer()
}

/**
* We must pass a view into copyBytesFrom,
* If Memory is sufficiently large, transferring the View somehow
* causes the underlying ArrayBuffer to be truncated. This truncation
* does not occur when instead the underlying ArrayBuffer is transferred,
* directly.
*
* So we always ensure the Memory transferred to the worker thread
* is the actual ArrayBuffer, and not a View.
*
* so we first check whether it already is or not,
* and create one on top of the ArrayBuffer if necessary
* (the same is done in the opposite direction in the worker thread)
*
* (NodeJS' Buffer is a subclass of DataView)
* TODO: maybe AoLoader should be made to return the underlying ArrayBuffer
* as Memory, instead of a View?
*/
args.Memory = ArrayBuffer.isView(args.Memory)
? Buffer.copyBytesFrom(args.Memory)
: Buffer.copyBytesFrom(new Uint8Array(args.Memory))
stopTimer()
args.Memory = arrayBufferFromMaybeView(args.Memory)

options = { transfer: [args.Memory] }
}

/**
* If Memory is sufficiently large, transferring the View somehow
* causes the underlying ArrayBuffer to be truncated. This truncation
* does not occur when instead the underlying ArrayBuffer is transferred,
* directly.
*
* So we always ensure the Memory transferred to the worker thread
* is the actual ArrayBuffer, and not a View.
*
* (the same is done in the opposite direction in the worker thread)
*
* TODO: maybe AoLoader should be made to return the underlying ArrayBuffer
* as Memory, instead of a View?
*/
args.Memory = arrayBufferFromMaybeView(args.Memory)

options = { transfer: [args.Memory] }
}
args.streamId = streamId
args.moduleId = moduleId
args.moduleOptions = moduleOptions
args.wasmModule = wasmModule

args.streamId = streamId
args.moduleId = moduleId
args.moduleOptions = moduleOptions
args.wasmModule = wasmModule
return [args, options]
}

return evaluate(args, options)
return evaluateWith(prep)
})
}))
.toPromise()
Expand Down
Loading

0 comments on commit b0f543f

Please sign in to comment.