From 42b33b1899bd6291c0042b9d1f1e9ec6ef37e5a9 Mon Sep 17 00:00:00 2001 From: Jack Frain Date: Tue, 7 Jan 2025 15:33:42 -0500 Subject: [PATCH 1/3] fix(cu): select cached messages with correct seq where clause --- servers/cu/src/effects/ao-evaluation.js | 15 ++++++++++++--- servers/cu/src/effects/ao-evaluation.test.js | 12 +++++++++--- 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/servers/cu/src/effects/ao-evaluation.js b/servers/cu/src/effects/ao-evaluation.js index d6ef1eec7..e71fd7842 100644 --- a/servers/cu/src/effects/ao-evaluation.js +++ b/servers/cu/src/effects/ao-evaluation.js @@ -252,14 +252,23 @@ export function findMessageBeforeWith ({ db }) { FROM ${MESSAGES_TABLE} WHERE id = ? - AND "processId" = ? - AND seq < ? + AND processId = ? + AND ( + CAST(substr(seq, instr(seq, ':') + 1) as UNSIGNED) < ? + OR + ( + CAST(SUBSTR(seq, 1, INSTR(seq, ':') - 1) AS INTEGER) = ? + AND CAST(SUBSTR(seq, INSTR(seq, ':') + 1) AS INTEGER) < ? + ) + ) LIMIT 1; `, parameters: [ createMessageId({ messageId, deepHash, isAssignment }), processId, - `${epoch}:${nonce}` // 0:13 + epoch, + epoch, + nonce ] } } diff --git a/servers/cu/src/effects/ao-evaluation.test.js b/servers/cu/src/effects/ao-evaluation.test.js index 925c24e34..59b21ae1f 100644 --- a/servers/cu/src/effects/ao-evaluation.test.js +++ b/servers/cu/src/effects/ao-evaluation.test.js @@ -382,7 +382,9 @@ describe('ao-evaluation', () => { assert.deepStrictEqual(parameters, [ 'deepHash-123', 'process-123', - '0:3' + 0, + 0, + 3 ]) const mockAssigment = { @@ -418,7 +420,9 @@ describe('ao-evaluation', () => { assert.deepStrictEqual(parameters, [ 'message-123', 'process-123', - '0:3' + 0, + 0, + 3 ]) const mockAssigment = { @@ -453,7 +457,9 @@ describe('ao-evaluation', () => { assert.deepStrictEqual(parameters, [ 'message-123', 'process-123', - '0:3' + 0, + 0, + 3 ]) const mockAssigment = { From e37e8f272dc97ecdcd100791f6121ba4e4be7b8f Mon Sep 17 00:00:00 2001 From: Jack Frain Date: Mon, 13 Jan 2025 16:55:02 -0500 Subject: [PATCH 2/3] fix(cu): optimize query for postgres --- servers/cu/src/effects/ao-block.js | 2 +- servers/cu/src/effects/ao-evaluation.js | 54 +++++++++++++++++-------- servers/cu/src/effects/pg.js | 1 + servers/cu/src/effects/sqlite.js | 1 + 4 files changed, 40 insertions(+), 18 deletions(-) diff --git a/servers/cu/src/effects/ao-block.js b/servers/cu/src/effects/ao-block.js index 00136a83e..226792237 100644 --- a/servers/cu/src/effects/ao-block.js +++ b/servers/cu/src/effects/ao-block.js @@ -33,7 +33,7 @@ export function saveBlocksWith ({ db }) { VALUES ${new Array(blocks.length).fill('(?, ?, ?)').join(',\n')} `, - parameters: blocks.map(props(['height', 'height', 'timestamp'])) + parameters: db.engine === 'sqlite' ? blocks.map(props(['height', 'height', 'timestamp'])) : blocks.map(props(['height', 'height', 'timestamp'])).flat() } } return (blocks) => { diff --git a/servers/cu/src/effects/ao-evaluation.js b/servers/cu/src/effects/ao-evaluation.js index e71fd7842..177ecc9f3 100644 --- a/servers/cu/src/effects/ao-evaluation.js +++ b/servers/cu/src/effects/ao-evaluation.js @@ -245,24 +245,44 @@ export function findEvaluationsWith ({ db }) { export function findMessageBeforeWith ({ db }) { function createQuery ({ messageId, deepHash, isAssignment, processId, nonce, epoch }) { + const sqliteQuery = ` + SELECT + id, seq + FROM ${MESSAGES_TABLE} + WHERE + id = ? + AND processId = ? + AND ( + CAST(substr(seq, instr(seq, ':') + 1) as UNSIGNED) < ? + OR + ( + CAST(SUBSTR(seq, 1, INSTR(seq, ':') - 1) AS INTEGER) = ? + AND CAST(SUBSTR(seq, INSTR(seq, ':') + 1) AS INTEGER) < ? + ) + ) + LIMIT 1; + ` + + const postgresQuery = ` + SELECT + id, seq + FROM ${MESSAGES_TABLE} + WHERE + "id" = ? + AND "processId" = ? + AND ( + CAST(SUBSTR(seq, POSITION(':' in seq) + 1) AS INTEGER) < ? + OR + ( + CAST(SUBSTR(seq, 1, POSITION(':' in seq) - 1) AS INTEGER) = ? + AND CAST(SUBSTR(seq, POSITION(':' in seq) + 1) AS INTEGER) < ? + ) + ) + LIMIT 1; + ` + return { - sql: ` - SELECT - id, seq - FROM ${MESSAGES_TABLE} - WHERE - id = ? - AND processId = ? - AND ( - CAST(substr(seq, instr(seq, ':') + 1) as UNSIGNED) < ? - OR - ( - CAST(SUBSTR(seq, 1, INSTR(seq, ':') - 1) AS INTEGER) = ? - AND CAST(SUBSTR(seq, INSTR(seq, ':') + 1) AS INTEGER) < ? - ) - ) - LIMIT 1; - `, + sql: db.engine === 'sqlite' ? sqliteQuery : postgresQuery, parameters: [ createMessageId({ messageId, deepHash, isAssignment }), processId, diff --git a/servers/cu/src/effects/pg.js b/servers/cu/src/effects/pg.js index 88e810af1..bd9b13819 100644 --- a/servers/cu/src/effects/pg.js +++ b/servers/cu/src/effects/pg.js @@ -31,6 +31,7 @@ export async function createPostgresClient ({ url, bootstrap = false, ...rest }) } return { + engine: 'postgres', query: async ({ sql, parameters }) => pool.query(toOrdinals(sql), parameters).then(rows), run: async ({ sql, parameters }) => pool.query(toOrdinals(sql), parameters).then(rows), transaction: async (statements) => pool.connect() diff --git a/servers/cu/src/effects/sqlite.js b/servers/cu/src/effects/sqlite.js index 95f968afb..c977e891a 100644 --- a/servers/cu/src/effects/sqlite.js +++ b/servers/cu/src/effects/sqlite.js @@ -141,6 +141,7 @@ export async function createSqliteClient ({ url, bootstrap = false, walLimit = b } return { + engine: 'sqlite', query: async ({ sql, parameters }) => db.prepare(sql).all(...parameters), run: async ({ sql, parameters }) => db.prepare(sql).run(...parameters), transaction: async (statements) => db.transaction( From 0a7c3743fccb0707f62327290f95f0ce40075c6f Mon Sep 17 00:00:00 2001 From: Jack Frain Date: Mon, 13 Jan 2025 17:08:33 -0500 Subject: [PATCH 3/3] fix(cu): add tests for new postgres queries --- servers/cu/src/effects/ao-block.test.js | 27 ++++++++++ servers/cu/src/effects/ao-evaluation.test.js | 56 +++++++++++++++++++- 2 files changed, 82 insertions(+), 1 deletion(-) diff --git a/servers/cu/src/effects/ao-block.test.js b/servers/cu/src/effects/ao-block.test.js index ac4d9e273..e5b107655 100644 --- a/servers/cu/src/effects/ao-block.test.js +++ b/servers/cu/src/effects/ao-block.test.js @@ -17,6 +17,7 @@ describe('ao-block', () => { const findBlocks = findBlocksSchema.implement( findBlocksWith({ db: { + engine: 'sqlite', query: async ({ parameters }) => { assert.deepStrictEqual(parameters, [123, 456]) return [ @@ -41,6 +42,7 @@ describe('ao-block', () => { const findBlocks = findBlocksSchema.implement( findBlocksWith({ db: { + engine: 'sqlite', query: async ({ parameters }) => [] } }) @@ -56,6 +58,7 @@ describe('ao-block', () => { const saveBlocks = saveBlocksSchema.implement( saveBlocksWith({ db: { + engine: 'sqlite', run: async ({ parameters }) => { assert.deepStrictEqual(parameters, [ [123, 123, 123], @@ -74,10 +77,33 @@ describe('ao-block', () => { ]) }) + test('save the blocks, postgres', async () => { + const saveBlocks = saveBlocksSchema.implement( + saveBlocksWith({ + db: { + engine: 'postgres', + run: async ({ parameters }) => { + assert.equal(parameters.length, 9) + assert.deepStrictEqual(parameters, [ + 123, 123, 123, 124, 124, 345, 125, 125, 456 + ]) + } + } + }) + ) + + await saveBlocks([ + { height: 123, timestamp: 123 }, + { height: 124, timestamp: 345 }, + { height: 125, timestamp: 456 } + ]) + }) + test('should noop a block if it already exists the blocks', async () => { const saveBlocks = saveBlocksSchema.implement( saveBlocksWith({ db: { + engine: 'sqlite', run: async ({ sql }) => { assert.ok(sql.trim().startsWith('INSERT OR IGNORE')) } @@ -96,6 +122,7 @@ describe('ao-block', () => { const saveBlocks = saveBlocksSchema.implement( saveBlocksWith({ db: { + engine: 'sqlite', run: async () => assert.fail('should not be called if no blocks') } }) diff --git a/servers/cu/src/effects/ao-evaluation.test.js b/servers/cu/src/effects/ao-evaluation.test.js index 59b21ae1f..04a78be8a 100644 --- a/servers/cu/src/effects/ao-evaluation.test.js +++ b/servers/cu/src/effects/ao-evaluation.test.js @@ -17,6 +17,7 @@ describe('ao-evaluation', () => { const findEvaluation = findEvaluationSchema.implement( findEvaluationWith({ db: { + engine: 'sqlite', query: async ({ parameters }) => { assert.deepStrictEqual(parameters, ['process-123,1702677252111,1']) @@ -65,6 +66,7 @@ describe('ao-evaluation', () => { const findEvaluation = findEvaluationSchema.implement( findEvaluationWith({ db: { + engine: 'sqlite', query: async () => [] }, logger @@ -107,6 +109,7 @@ describe('ao-evaluation', () => { const saveEvaluation = saveEvaluationSchema.implement( saveEvaluationWith({ db: { + engine: 'sqlite', transaction: async ([{ parameters: evaluationDocParams }, { parameters: messageDocParams }]) => { assert.deepStrictEqual(evaluationDocParams, [ 'process-123,1702677252111,1', @@ -143,6 +146,7 @@ describe('ao-evaluation', () => { const saveEvaluation = saveEvaluationSchema.implement( saveEvaluationWith({ db: { + engine: 'sqlite', transaction: async ([{ parameters: evaluationDocParams }, { parameters: messageDocParams }]) => { assert.deepStrictEqual(evaluationDocParams, [ 'process-123,1702677252111,1', @@ -183,6 +187,7 @@ describe('ao-evaluation', () => { const saveEvaluation = saveEvaluationSchema.implement( saveEvaluationWith({ db: { + engine: 'sqlite', transaction: async ([{ parameters: evaluationDocParams }, { parameters: messageDocParams }]) => { assert.deepStrictEqual(evaluationDocParams, [ 'process-123,1702677252111,1', @@ -224,6 +229,7 @@ describe('ao-evaluation', () => { const saveEvaluation = saveEvaluationSchema.implement( saveEvaluationWith({ db: { + engine: 'sqlite', transaction: async ([{ sql: evaluationDocSql }, { sql: messageDocSql }]) => { assert.ok(evaluationDocSql.trim().startsWith(`INSERT OR IGNORE INTO ${EVALUATIONS_TABLE}`)) assert.ok(messageDocSql.trim().startsWith(`INSERT OR IGNORE INTO ${MESSAGES_TABLE}`)) @@ -255,6 +261,7 @@ describe('ao-evaluation', () => { saveEvaluationWith({ DISABLE_PROCESS_EVALUATION_CACHE: true, db: { + engine: 'sqlite', transaction: async (statements) => { assert.equal(statements.length, 1) const [{ sql: messageDocSql }] = statements @@ -299,6 +306,7 @@ describe('ao-evaluation', () => { const findEvaluations = findEvaluationsSchema.implement( findEvaluationsWith({ db: { + engine: 'sqlite', query: async ({ sql, parameters }) => { assert.ok(sql.includes('AND cron IS NOT NULL')) assert.ok(sql.includes('timestamp ASC')) @@ -340,6 +348,7 @@ describe('ao-evaluation', () => { const findEvaluations = findEvaluationsSchema.implement( findEvaluationsWith({ db: { + engine: 'sqlite', query: async ({ sql, parameters }) => { /** * no onlyCron @@ -378,6 +387,7 @@ describe('ao-evaluation', () => { const findMessageBefore = findMessageBeforeSchema.implement( findMessageBeforeWith({ db: { + engine: 'sqlite', query: async ({ parameters }) => { assert.deepStrictEqual(parameters, [ 'deepHash-123', @@ -416,6 +426,7 @@ describe('ao-evaluation', () => { const findMessageBefore = findMessageBeforeSchema.implement( findMessageBeforeWith({ db: { + engine: 'sqlite', query: async ({ parameters }) => { assert.deepStrictEqual(parameters, [ 'message-123', @@ -453,7 +464,49 @@ describe('ao-evaluation', () => { const findMessageBefore = findMessageBeforeSchema.implement( findMessageBeforeWith({ db: { - query: async ({ parameters }) => { + engine: 'sqlite', + query: async ({ sql, parameters }) => { + // Only the postgres engine uses POSITION + assert.ok(!sql.includes('POSITION')) + assert.deepStrictEqual(parameters, [ + 'message-123', + 'process-123', + 0, + 0, + 3 + ]) + + const mockAssigment = { + id: 'message-123', + processId: 'process-123', + seq: '0:3' + } + + return [mockAssigment] + } + } + }) + ) + + const res = await findMessageBefore({ + processId: 'process-123', + messageId: 'message-123', + deepHash: 'deepHash-123', + isAssignment: true, + epoch: 0, + nonce: 3 + }) + + assert.deepStrictEqual(res, { id: 'message-123' }) + }) + test('if it is an assignment, postgres', async () => { + const findMessageBefore = findMessageBeforeSchema.implement( + findMessageBeforeWith({ + db: { + engine: 'postgres', + query: async ({ sql, parameters }) => { + // Only the postgres engine uses POSITION + assert.ok(sql.includes('POSITION')) assert.deepStrictEqual(parameters, [ 'message-123', 'process-123', @@ -491,6 +544,7 @@ describe('ao-evaluation', () => { const findMessageBefore = findMessageBeforeSchema.implement( findMessageBeforeWith({ db: { + engine: 'sqlite', query: async () => [] }, logger