Skip to content

Commit

Permalink
Merge pull request #1102 from permaweb/jfrain99/cu-fix-where-seq-clause
Browse files Browse the repository at this point in the history
fix(cu): select cached messages with correct seq where clause
  • Loading branch information
jfrain99 authored Jan 14, 2025
2 parents 284a0df + 0a7c374 commit 1fd0bff
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 16 deletions.
2 changes: 1 addition & 1 deletion servers/cu/src/effects/ao-block.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export function saveBlocksWith ({ db }) {
VALUES
${new Array(blocks.length).fill('(?, ?, ?)').join(',\n')}
`,
parameters: blocks.map(props(['height', 'height', 'timestamp']))
parameters: db.engine === 'sqlite' ? blocks.map(props(['height', 'height', 'timestamp'])) : blocks.map(props(['height', 'height', 'timestamp'])).flat()
}
}
return (blocks) => {
Expand Down
27 changes: 27 additions & 0 deletions servers/cu/src/effects/ao-block.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ describe('ao-block', () => {
const findBlocks = findBlocksSchema.implement(
findBlocksWith({
db: {
engine: 'sqlite',
query: async ({ parameters }) => {
assert.deepStrictEqual(parameters, [123, 456])
return [
Expand All @@ -41,6 +42,7 @@ describe('ao-block', () => {
const findBlocks = findBlocksSchema.implement(
findBlocksWith({
db: {
engine: 'sqlite',
query: async ({ parameters }) => []
}
})
Expand All @@ -56,6 +58,7 @@ describe('ao-block', () => {
const saveBlocks = saveBlocksSchema.implement(
saveBlocksWith({
db: {
engine: 'sqlite',
run: async ({ parameters }) => {
assert.deepStrictEqual(parameters, [
[123, 123, 123],
Expand All @@ -74,10 +77,33 @@ describe('ao-block', () => {
])
})

test('save the blocks, postgres', async () => {
const saveBlocks = saveBlocksSchema.implement(
saveBlocksWith({
db: {
engine: 'postgres',
run: async ({ parameters }) => {
assert.equal(parameters.length, 9)
assert.deepStrictEqual(parameters, [
123, 123, 123, 124, 124, 345, 125, 125, 456
])
}
}
})
)

await saveBlocks([
{ height: 123, timestamp: 123 },
{ height: 124, timestamp: 345 },
{ height: 125, timestamp: 456 }
])
})

test('should noop a block if it already exists the blocks', async () => {
const saveBlocks = saveBlocksSchema.implement(
saveBlocksWith({
db: {
engine: 'sqlite',
run: async ({ sql }) => {
assert.ok(sql.trim().startsWith('INSERT OR IGNORE'))
}
Expand All @@ -96,6 +122,7 @@ describe('ao-block', () => {
const saveBlocks = saveBlocksSchema.implement(
saveBlocksWith({
db: {
engine: 'sqlite',
run: async () => assert.fail('should not be called if no blocks')
}
})
Expand Down
51 changes: 40 additions & 11 deletions servers/cu/src/effects/ao-evaluation.js
Original file line number Diff line number Diff line change
Expand Up @@ -245,21 +245,50 @@ export function findEvaluationsWith ({ db }) {

export function findMessageBeforeWith ({ db }) {
function createQuery ({ messageId, deepHash, isAssignment, processId, nonce, epoch }) {
const sqliteQuery = `
SELECT
id, seq
FROM ${MESSAGES_TABLE}
WHERE
id = ?
AND processId = ?
AND (
CAST(substr(seq, instr(seq, ':') + 1) as UNSIGNED) < ?
OR
(
CAST(SUBSTR(seq, 1, INSTR(seq, ':') - 1) AS INTEGER) = ?
AND CAST(SUBSTR(seq, INSTR(seq, ':') + 1) AS INTEGER) < ?
)
)
LIMIT 1;
`

const postgresQuery = `
SELECT
id, seq
FROM ${MESSAGES_TABLE}
WHERE
"id" = ?
AND "processId" = ?
AND (
CAST(SUBSTR(seq, POSITION(':' in seq) + 1) AS INTEGER) < ?
OR
(
CAST(SUBSTR(seq, 1, POSITION(':' in seq) - 1) AS INTEGER) = ?
AND CAST(SUBSTR(seq, POSITION(':' in seq) + 1) AS INTEGER) < ?
)
)
LIMIT 1;
`

return {
sql: `
SELECT
id, seq
FROM ${MESSAGES_TABLE}
WHERE
id = ?
AND "processId" = ?
AND seq < ?
LIMIT 1;
`,
sql: db.engine === 'sqlite' ? sqliteQuery : postgresQuery,
parameters: [
createMessageId({ messageId, deepHash, isAssignment }),
processId,
`${epoch}:${nonce}` // 0:13
epoch,
epoch,
nonce
]
}
}
Expand Down
68 changes: 64 additions & 4 deletions servers/cu/src/effects/ao-evaluation.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ describe('ao-evaluation', () => {
const findEvaluation = findEvaluationSchema.implement(
findEvaluationWith({
db: {
engine: 'sqlite',
query: async ({ parameters }) => {
assert.deepStrictEqual(parameters, ['process-123,1702677252111,1'])

Expand Down Expand Up @@ -65,6 +66,7 @@ describe('ao-evaluation', () => {
const findEvaluation = findEvaluationSchema.implement(
findEvaluationWith({
db: {
engine: 'sqlite',
query: async () => []
},
logger
Expand Down Expand Up @@ -107,6 +109,7 @@ describe('ao-evaluation', () => {
const saveEvaluation = saveEvaluationSchema.implement(
saveEvaluationWith({
db: {
engine: 'sqlite',
transaction: async ([{ parameters: evaluationDocParams }, { parameters: messageDocParams }]) => {
assert.deepStrictEqual(evaluationDocParams, [
'process-123,1702677252111,1',
Expand Down Expand Up @@ -143,6 +146,7 @@ describe('ao-evaluation', () => {
const saveEvaluation = saveEvaluationSchema.implement(
saveEvaluationWith({
db: {
engine: 'sqlite',
transaction: async ([{ parameters: evaluationDocParams }, { parameters: messageDocParams }]) => {
assert.deepStrictEqual(evaluationDocParams, [
'process-123,1702677252111,1',
Expand Down Expand Up @@ -183,6 +187,7 @@ describe('ao-evaluation', () => {
const saveEvaluation = saveEvaluationSchema.implement(
saveEvaluationWith({
db: {
engine: 'sqlite',
transaction: async ([{ parameters: evaluationDocParams }, { parameters: messageDocParams }]) => {
assert.deepStrictEqual(evaluationDocParams, [
'process-123,1702677252111,1',
Expand Down Expand Up @@ -224,6 +229,7 @@ describe('ao-evaluation', () => {
const saveEvaluation = saveEvaluationSchema.implement(
saveEvaluationWith({
db: {
engine: 'sqlite',
transaction: async ([{ sql: evaluationDocSql }, { sql: messageDocSql }]) => {
assert.ok(evaluationDocSql.trim().startsWith(`INSERT OR IGNORE INTO ${EVALUATIONS_TABLE}`))
assert.ok(messageDocSql.trim().startsWith(`INSERT OR IGNORE INTO ${MESSAGES_TABLE}`))
Expand Down Expand Up @@ -255,6 +261,7 @@ describe('ao-evaluation', () => {
saveEvaluationWith({
DISABLE_PROCESS_EVALUATION_CACHE: true,
db: {
engine: 'sqlite',
transaction: async (statements) => {
assert.equal(statements.length, 1)
const [{ sql: messageDocSql }] = statements
Expand Down Expand Up @@ -299,6 +306,7 @@ describe('ao-evaluation', () => {
const findEvaluations = findEvaluationsSchema.implement(
findEvaluationsWith({
db: {
engine: 'sqlite',
query: async ({ sql, parameters }) => {
assert.ok(sql.includes('AND cron IS NOT NULL'))
assert.ok(sql.includes('timestamp ASC'))
Expand Down Expand Up @@ -340,6 +348,7 @@ describe('ao-evaluation', () => {
const findEvaluations = findEvaluationsSchema.implement(
findEvaluationsWith({
db: {
engine: 'sqlite',
query: async ({ sql, parameters }) => {
/**
* no onlyCron
Expand Down Expand Up @@ -378,11 +387,14 @@ describe('ao-evaluation', () => {
const findMessageBefore = findMessageBeforeSchema.implement(
findMessageBeforeWith({
db: {
engine: 'sqlite',
query: async ({ parameters }) => {
assert.deepStrictEqual(parameters, [
'deepHash-123',
'process-123',
'0:3'
0,
0,
3
])

const mockAssigment = {
Expand Down Expand Up @@ -414,11 +426,14 @@ describe('ao-evaluation', () => {
const findMessageBefore = findMessageBeforeSchema.implement(
findMessageBeforeWith({
db: {
engine: 'sqlite',
query: async ({ parameters }) => {
assert.deepStrictEqual(parameters, [
'message-123',
'process-123',
'0:3'
0,
0,
3
])

const mockAssigment = {
Expand Down Expand Up @@ -449,11 +464,55 @@ describe('ao-evaluation', () => {
const findMessageBefore = findMessageBeforeSchema.implement(
findMessageBeforeWith({
db: {
query: async ({ parameters }) => {
engine: 'sqlite',
query: async ({ sql, parameters }) => {
// Only the postgres engine uses POSITION
assert.ok(!sql.includes('POSITION'))
assert.deepStrictEqual(parameters, [
'message-123',
'process-123',
0,
0,
3
])

const mockAssigment = {
id: 'message-123',
processId: 'process-123',
seq: '0:3'
}

return [mockAssigment]
}
}
})
)

const res = await findMessageBefore({
processId: 'process-123',
messageId: 'message-123',
deepHash: 'deepHash-123',
isAssignment: true,
epoch: 0,
nonce: 3
})

assert.deepStrictEqual(res, { id: 'message-123' })
})
test('if it is an assignment, postgres', async () => {
const findMessageBefore = findMessageBeforeSchema.implement(
findMessageBeforeWith({
db: {
engine: 'postgres',
query: async ({ sql, parameters }) => {
// Only the postgres engine uses POSITION
assert.ok(sql.includes('POSITION'))
assert.deepStrictEqual(parameters, [
'message-123',
'process-123',
'0:3'
0,
0,
3
])

const mockAssigment = {
Expand Down Expand Up @@ -485,6 +544,7 @@ describe('ao-evaluation', () => {
const findMessageBefore = findMessageBeforeSchema.implement(
findMessageBeforeWith({
db: {
engine: 'sqlite',
query: async () => []
},
logger
Expand Down
1 change: 1 addition & 0 deletions servers/cu/src/effects/pg.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export async function createPostgresClient ({ url, bootstrap = false, ...rest })
}

return {
engine: 'postgres',
query: async ({ sql, parameters }) => pool.query(toOrdinals(sql), parameters).then(rows),
run: async ({ sql, parameters }) => pool.query(toOrdinals(sql), parameters).then(rows),
transaction: async (statements) => pool.connect()
Expand Down
1 change: 1 addition & 0 deletions servers/cu/src/effects/sqlite.js
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ export async function createSqliteClient ({ url, bootstrap = false, walLimit = b
}

return {
engine: 'sqlite',
query: async ({ sql, parameters }) => db.prepare(sql).all(...parameters),
run: async ({ sql, parameters }) => db.prepare(sql).run(...parameters),
transaction: async (statements) => db.transaction(
Expand Down

0 comments on commit 1fd0bff

Please sign in to comment.