Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(cu/loader): Integrate WASM Metering into CU and Loader #1035

Merged
merged 6 commits into from
Oct 11, 2024
50 changes: 40 additions & 10 deletions loader/src/index.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,50 @@ const metering = require('@permaweb/wasm-metering')
* @param {Options} [options]
* @returns {Promise<handleFunction>}
*/

/*
* Custom WebAssembly.compileStreaming Implementation with WASM Metering
*
* This implementation overrides WebAssembly.compileStreaming to add metering
* to WebAssembly binaries. Metering enables tracking of resource usage (e.g.,
* CPU or memory) within the WebAssembly runtime, which is critical for monitoring
* performance and managing resource allocation in constrained environments.
*
* Why Metering?
* The CU (Compute Unit) calls WebAssembly.compileStreaming to execute WebAssembly
* modules, but to centralize and streamline metering, we handle it here in the loader
* instead of within each CU instance. By integrating metering directly into the
* loader, we ensure that all WebAssembly binaries are metered consistently across
* different CUs, reducing redundancy and enhancing modularity.
*/

// Override WebAssembly.compileStreaming to apply metering
WebAssembly.compileStreaming = async function (source, importObject = {}) {
// Read the response and convert it to an ArrayBuffer
const arrayBuffer = await source.arrayBuffer();

// Convert ArrayBuffer to Uint8Array for metering compatibility
const nodeBuffer = Buffer.from(arrayBuffer);

if(importObject.format === "wasm32-unknown-emscripten" || importObject.format === "wasm32-unknown-emscripten2" || importObject.format === "wasm32-unknown-emscripten3") {
return WebAssembly.compile(nodeBuffer);
}

let meterType = importObject.format.startsWith("wasm32") ? "i32" : "i64";

// Apply metering with the Uint8Array buffer
const meteredView = metering.meterWASM(nodeBuffer, { meterType });

// const meteredResponse = new Response(meteredBuffer, { headers: { 'Content-Type': 'application/wasm' } });
return WebAssembly.compile(meteredView.buffer);
};

module.exports = async function (binary, options) {
let instance = null
let doHandle = null

let meterType = options.format.startsWith("wasm32") ? "i32" : "i64";
const originalInstantiate = WebAssembly.instantiate;


if (options === null) {
options = { format: 'wasm32-unknown-emscripten' }
Expand All @@ -110,14 +148,9 @@ module.exports = async function (binary, options) {
} else {

if (typeof binary === "function") {
// TODO: wasmMetering is currently disabled on
// WebAssembly.instantiate = async function (wasm, info) {
// const meteredWasm = metering.meterWASM(wasm, { meterType });
// return originalInstantiate(wasm, info);
// };
options.instantiateWasm = binary
} else {
//binary = metering.meterWASM(binary, { meterType })
binary = metering.meterWASM(binary, { meterType })
options.wasmBinary = binary
}

Expand Down Expand Up @@ -152,9 +185,6 @@ module.exports = async function (binary, options) {
doHandle = instance.cwrap('handle', 'string', ['string', 'string'])
}

if (typeof binary === "function") {
WebAssembly.instantiate = originalInstantiate;
}

return async (buffer, msg, env) => {

Expand Down
3 changes: 2 additions & 1 deletion loader/test/emscripten2.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ describe('loader', async () => {
new Response(
Readable.toWeb(createReadStream('./test/process/process.wasm')),
{ headers: { 'Content-Type': 'application/wasm' } }
)
),
{ format: 'wasm32-unknown-emscripten2' }
)

const handle = await AoLoader((info, receiveInstance) => {
Expand Down
3 changes: 2 additions & 1 deletion loader/test/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ describe('loader', async () => {
new Response(
Readable.toWeb(createReadStream('./test/legacy/process.wasm')),
{ headers: { 'Content-Type': 'application/wasm' } }
)
),
{ format: 'wasm32-unknown-emscripten' }
)

const handle = await AoLoader((info, receiveInstance) => {
Expand Down
2 changes: 1 addition & 1 deletion servers/cu/src/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const MODE = process.env.NODE_CONFIG_ENV

if (!MODE) throw new Error('NODE_CONFIG_ENV must be defined')

const DEFAULT_PROCESS_WASM_MODULE_FORMATS = ['wasm32-unknown-emscripten', 'wasm32-unknown-emscripten2', 'wasm64-unknown-emscripten-draft_2024_02_15']
const DEFAULT_PROCESS_WASM_MODULE_FORMATS = ['wasm32-unknown-emscripten', 'wasm32-unknown-emscripten2', 'wasm32-unknown-emscripten3', 'wasm32-unknown-emscripten4', 'wasm64-unknown-emscripten-draft_2024_02_15']

/**
* The server config is an extension of the config required by the domain (business logic).
Expand Down
2 changes: 1 addition & 1 deletion servers/cu/src/effects/ao-module.js
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ export function evaluatorWith ({ evaluateWith, loadWasmModule }) {
return (args) =>
Promise.resolve(!(backpressure = ++backpressure % EVAL_DEFER_BACKPRESSURE))
.then(async (defer) => {
if (!wasmModule) wasmModule = await loadWasmModule({ moduleId })
if (!wasmModule) wasmModule = await loadWasmModule({ moduleId, moduleOptions })
return defer
})
.then(async (defer) => {
Expand Down
14 changes: 7 additions & 7 deletions servers/cu/src/effects/wasm.js
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ export function loadWasmModuleWith ({ fetch, ARWEAVE_URL, WASM_BINARY_FILE_DIREC
const readWasmFile = fromPromise(readWasmFileWith({ DIR: WASM_BINARY_FILE_DIRECTORY }))
const writeWasmFile = writeWasmFileWith({ DIR: WASM_BINARY_FILE_DIRECTORY })

const toWasmResponse = fromPromise((stream) => WebAssembly.compileStreaming(wasmResponse(Readable.toWeb(stream))))
const toWasmResponse = (moduleOptions) => fromPromise((stream) => WebAssembly.compileStreaming(wasmResponse(Readable.toWeb(stream), moduleOptions)))

function maybeCachedModule (args) {
const { moduleId } = args
Expand All @@ -147,16 +147,16 @@ export function loadWasmModuleWith ({ fetch, ARWEAVE_URL, WASM_BINARY_FILE_DIREC
}

function maybeStoredBinary (args) {
const { moduleId } = args
const { moduleId, moduleOptions } = args
logger('Checking for wasm file to load module "%s"...', moduleId)

return of(moduleId)
.chain(readWasmFile)
.chain(toWasmResponse)
.chain(toWasmResponse(moduleOptions))
.bimap(always(args), identity)
}

function loadTransaction ({ moduleId }) {
function loadTransaction ({ moduleId, moduleOptions }) {
logger('Loading wasm transaction "%s"...', moduleId)

return of(moduleId)
Expand All @@ -169,15 +169,15 @@ export function loadWasmModuleWith ({ fetch, ARWEAVE_URL, WASM_BINARY_FILE_DIREC
.chain(fromPromise(([s1, s2]) =>
Promise.all([
writeWasmFile(moduleId, Readable.fromWeb(s1)),
WebAssembly.compileStreaming(wasmResponse(s2))
WebAssembly.compileStreaming(wasmResponse(s2), moduleOptions)
])
))
.map(([, res]) => res)
}

const lock = new AsyncLock()

return ({ moduleId }) => {
return ({ moduleId, moduleOptions }) => {
/**
* Prevent multiple eval streams close together
* from compiling the wasm module multiple times
Expand All @@ -190,7 +190,7 @@ export function loadWasmModuleWith ({ fetch, ARWEAVE_URL, WASM_BINARY_FILE_DIREC
*
* then create the Wasm instance
*/
() => of({ moduleId })
() => of({ moduleId, moduleOptions })
.chain(maybeStoredBinary)
.bichain(loadTransaction, Resolved)
/**
Expand Down