Skip to content

Commit

Permalink
Merge branch 'main' of github.com:permaweb/ao into feat/fix-image-pip…
Browse files Browse the repository at this point in the history
…elines
  • Loading branch information
bredamatt committed Oct 11, 2024
2 parents 9958e50 + 5e24009 commit d917f69
Show file tree
Hide file tree
Showing 9 changed files with 191 additions and 30 deletions.
2 changes: 2 additions & 0 deletions loader/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

All notable changes to this project will be documented in this file. See [commit-and-tag-version](https://github.com/absolute-version/commit-and-tag-version) for commit guidelines.

## [0.0.38](https://github.com/permaweb/ao/compare/[email protected]@v0.0.38) (2024-10-03)

## [0.0.37](https://github.com/permaweb/ao/compare/[email protected]@v0.0.37) (2024-09-26)


Expand Down
4 changes: 2 additions & 2 deletions loader/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion loader/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@permaweb/ao-loader",
"version": "0.0.37",
"version": "0.0.38",
"repository": {
"type": "git",
"url": "https://github.com/permaweb/ao.git",
Expand Down
12 changes: 7 additions & 5 deletions loader/src/index.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,14 @@ module.exports = async function (binary, options) {
} else {

if (typeof binary === "function") {
WebAssembly.instantiate = async function (wasm, info) {
const meteredWasm = metering.meterWASM(wasm, { meterType });
return originalInstantiate(meteredWasm, info);
};
// TODO: wasmMetering is currently disabled on
// WebAssembly.instantiate = async function (wasm, info) {
// const meteredWasm = metering.meterWASM(wasm, { meterType });
// return originalInstantiate(wasm, info);
// };
options.instantiateWasm = binary
} else {
binary = metering.meterWASM(binary, { meterType })
//binary = metering.meterWASM(binary, { meterType })
options.wasmBinary = binary
}

Expand Down Expand Up @@ -156,6 +157,7 @@ module.exports = async function (binary, options) {
}

return async (buffer, msg, env) => {

const originalRandom = Math.random
// const OriginalDate = Date
const originalLog = console.log
Expand Down
137 changes: 123 additions & 14 deletions servers/cu/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions servers/cu/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
},
"dependencies": {
"@fastify/middie": "^8.3.1",
"@permaweb/ao-loader": "^0.0.35",
"@permaweb/ao-loader": "^0.0.38",
"@permaweb/ao-scheduler-utils": "^0.0.19",
"@permaweb/weavedrive": "^0.0.6",
"@permaweb/weavedrive": "^0.0.8",
"arweave": "^1.15.1",
"async-lock": "^1.4.1",
"better-sqlite3": "^11.1.2",
Expand Down Expand Up @@ -47,4 +47,4 @@
"engines": {
"node": ">=18"
}
}
}
21 changes: 17 additions & 4 deletions servers/mu/src/domain/clients/metrics.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export const counterWith = ({ prefix = 'ao_mu' } = {}) => {
}

export const gaugeWith = ({ prefix = 'ao_mu' } = {}) => {
return ({ name, description, collect, labelNames = [] }) => {
return ({ name, description, collect, labeledCollect, labelNames = [] }) => {
const g = new PromClient.Gauge({
name: `${prefix}_${name}`,
help: description,
Expand All @@ -60,9 +60,22 @@ export const gaugeWith = ({ prefix = 'ao_mu' } = {}) => {
* that simply returns the collected value to set,
* which will this call set here
*/
...(collect
? { collect: async function () { this.set(await collect()) } }
: {}
...(labeledCollect
? {
collect: async function () {
const labeledValues = await labeledCollect()
if (Array.isArray(labeledValues)) {
labeledValues.forEach(({ labelName, labelValue, value }) => {
if (labelName && labelValue && value) {
this.labels({ [labelName]: labelValue }).set(value)
}
})
}
}
}
: collect
? { collect: async function () { this.set(await collect()) } }
: {}
),
enableExemplars: true
})
Expand Down
17 changes: 16 additions & 1 deletion servers/mu/src/domain/clients/taskQueue.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,18 @@ import { TASKS_TABLE } from './sqlite.js'
*/
export async function createTaskQueue ({ queueId, logger, db }) {
logger({ log: `Initializing queue for queue index ${queueId}` })
function createDeleteQuery () {
return {
sql: `
DELETE FROM ${TASKS_TABLE}
WHERE queueId = ? AND timestamp < (strftime('%s', 'now') - 3600) * 1000;
`,
parameters: [
queueId
]
}
}

function createQuery () {
return {
sql: `
Expand All @@ -24,6 +36,8 @@ export async function createTaskQueue ({ queueId, logger, db }) {
}
}

await db.run(createDeleteQuery())
await db.run({ sql: 'VACUUM;', parameters: [] })
const queryResults = (await db.query(createQuery())).map((row) => ({ ...JSON.parse(row.data), dbId: row.id }))
const taskQueue = queryResults || []
return taskQueue
Expand Down Expand Up @@ -92,7 +106,7 @@ export function removeDequeuedTasksWith ({ dequeuedTasks, queueId, db }) {
return {
sql: `
DELETE FROM ${TASKS_TABLE}
WHERE id IN (${Array.from(dequeuedTasks).map(() => '?').join(',')})
WHERE id IN (${Array.from(dequeuedTasks).map(() => '?').join(',')}) OR timestamp < (strftime('%s', 'now') - 3600) * 1000;
`,
parameters: Array.from(dequeuedTasks)
}
Expand All @@ -104,6 +118,7 @@ export function removeDequeuedTasksWith ({ dequeuedTasks, queueId, db }) {
const query = createQuery(taskCopy)

db.run(query)
db.run({ sql: 'VACUUM;', parameters: [] })
taskCopy.forEach((task) => {
dequeuedTasks.delete(task)
})
Expand Down
20 changes: 20 additions & 0 deletions servers/mu/src/domain/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,26 @@ export const createApis = async (ctx) => {
const TRACE_DB_URL = `${ctx.TRACE_DB_URL}.sqlite`
const traceDb = await SqliteClient.createSqliteClient({ url: TRACE_DB_URL, bootstrap: true, type: 'traces' })

// Create trace database metrics
MetricsClient.gaugeWith({})({
name: 'db_size',
description: 'The size of the databases',
labelNames: ['database'],
labeledCollect: async () => {
const taskPageSize = await db.pragma('page_size', { simple: true })
const taskPageCount = await db.pragma('page_count', { simple: true })

const tracePageSize = await traceDb.pragma('page_size', { simple: true })
const tracePageCount = await traceDb.pragma('page_count', { simple: true })

const labelValues = [
{ labelName: 'database', labelValue: 'task', value: taskPageSize * taskPageCount },
{ labelName: 'database', labelValue: 'trace', value: tracePageSize * tracePageCount }
]
return labelValues
}
})

/**
* Select queue ids from database on startup.
* This will allow us to "pick up" persisted tasks
Expand Down

0 comments on commit d917f69

Please sign in to comment.