Skip to content

Commit

Permalink
chore(cu): more optimizations to prevent excessive 409s cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
TillaTheHun0 committed Jan 23, 2024
1 parent 76c3ca5 commit 087a47f
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 74 deletions.
59 changes: 49 additions & 10 deletions servers/cu/src/domain/client/pouchdb.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,19 @@ const processDocSchema = z.object({
type: z.literal('process')
})

/**
 * Shape of a document whose type is 'evaluation'.
 *
 * Most fields reuse the corresponding validators from evaluationSchema,
 * so the document stays in step with the domain model. The remaining
 * fields (_id, parent, type) are specific to the document itself.
 *
 * findEvaluationWith parses a fetched document against this schema
 * before mapping it with toEvaluation.
 */
const evaluationDocSchema = z.object({
// non-empty document id
_id: z.string().min(1),
processId: evaluationSchema.shape.processId,
messageId: evaluationSchema.shape.messageId,
timestamp: evaluationSchema.shape.timestamp,
ordinate: evaluationSchema.shape.ordinate,
blockHeight: evaluationSchema.shape.blockHeight,
// NOTE(review): presumably the _id of the owning process document -- confirm against the writer
parent: z.string().min(1),
evaluatedAt: evaluationSchema.shape.evaluatedAt,
output: evaluationSchema.shape.output,
// only documents tagged exactly 'evaluation' pass this schema
type: z.literal('evaluation')
})

const messageHashDocSchema = z.object({
_id: z.string().min(1),
/**
Expand Down Expand Up @@ -228,6 +241,40 @@ export function saveProcessWith ({ pouchDb }) {
}
}

/**
 * Build a finder that fetches a single evaluation document by its id.
 *
 * The id is derived from the criteria using createEvaluationId. Once the
 * document is fetched, its 'memory.txt' attachment is retrieved, inflated,
 * and placed on output.Memory. The document is then parsed against
 * evaluationDocSchema and mapped with toEvaluation.
 *
 * @param {{ pouchDb?: any }} deps - the db client; defaults to internalPouchDb
 * @returns {(criteria: { processId: any, to: any, ordinate: any, cron: any }) => Promise<any>}
 * a function resolving to the evaluation. It rejects with `{ status: 404 }`
 * when the fetch error carries status 404, and with the original error otherwise.
 */
export function findEvaluationWith ({ pouchDb = internalPouchDb }) {
  const memoryLens = lensPath(['output', 'Memory'])

  /**
   * Fetch the document whose id is derived from the criteria
   */
  const fetchDoc = fromPromise((criteria) => {
    const { processId, to, ordinate, cron } = criteria
    return pouchDb.get(createEvaluationId({ processId, timestamp: to, ordinate, cron }))
  })

  /**
   * The state buffer is persisted, compressed, as an attachment.
   * Retrieve it, decompress it, and set it on output.Memory
   * so the document matches the expected output shape
   */
  const attachMemory = fromPromise(async (doc) => {
    const deflated = await pouchDb.getAttachment(doc._id, 'memory.txt')
    const memory = await inflateP(deflated)
    return set(memoryLens, memory, doc)
  })

  /**
   * Collapse a 404 into a minimal rejection; pass any other error along untouched
   */
  const onError = (err) => err.status === 404
    ? Rejected({ status: 404 })
    : Rejected(err)

  /**
   * Hydrate the memory, validate the document shape, then map to an evaluation
   */
  const onFound = (found) => of(found)
    .chain(attachMemory)
    .map(evaluationDocSchema.parse)
    .map(toEvaluation)

  return ({ processId, to, ordinate, cron }) =>
    of({ processId, to, ordinate, cron })
      .chain(fetchDoc)
      .bichain(onError, onFound)
      .toPromise()
}

export function findLatestEvaluationWith ({ pouchDb = internalPouchDb }) {
function createQuery ({ processId, to, ordinate, cron }) {
const query = {
Expand Down Expand Up @@ -266,17 +313,9 @@ export function findLatestEvaluationWith ({ pouchDb = internalPouchDb }) {
* Criteria was provided, so overwrite upper range with actual upper range
*/
if (to || ordinate || cron) {
query.selector._id.$lte = `${createEvaluationId({ processId, timestamp: to, ordinate, cron })}`
query.selector._id.$lte =
`${createEvaluationId({ processId, timestamp: to, ordinate, cron })}${COLLATION_SEQUENCE_MAX_CHAR}`
}
/**
* No cron was provided, which means we're looking
* for a Scheduled Message's eval.
*
* So in this case, fetch the latest eval that is NOT the result of a Cron Message.
* We do this, so the Schedule Message's result is returned, and not a Cron Message's,
* whose schedule happened to coincide with the Scheduled Messages timestamp.
*/
if (!cron) query.selector.cron = { $exists: false }

return query
}
Expand Down
55 changes: 1 addition & 54 deletions servers/cu/src/domain/client/pouchdb.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -164,16 +164,14 @@ describe('pouchdb', () => {
})

describe('findLatestEvaluation', () => {
test('return the lastest evaluation for the process, including from Cron Messages', async () => {
test('return the lastest evaluation for the process', async () => {
const evaluatedAt = new Date().toISOString()
const Memory = Buffer.from('Hello World', 'utf-8')

const findLatestEvaluation = findLatestEvaluationSchema.implement(
findLatestEvaluationWith({
pouchDb: {
find: async (op) => {
assert.equal(op.selector.cron, undefined)

return {
docs: [
{
Expand Down Expand Up @@ -216,57 +214,6 @@ describe('pouchdb', () => {
assert.equal(res.evaluatedAt.toISOString(), evaluatedAt)
})

test("with 'to' and 'ordinate', return the lastest evaluation from a scheduled message", async () => {
const evaluatedAt = new Date().toISOString()
const Memory = Buffer.from('Hello World', 'utf-8')

const findLatestEvaluation = findLatestEvaluationSchema.implement(
findLatestEvaluationWith({
pouchDb: {
find: async (op) => {
assert.deepStrictEqual(op.selector.cron, { $exists: false })

return {
docs: [
{
_id: 'eval-process-123,1702677252111',
timestamp: 1702677252111,
ordinate: 1,
blockHeight: 1234,
processId: 'process-123',
messageId: 'message-123',
parent: 'proc-process-123',
output: { Messages: [{ foo: 'bar' }] },
evaluatedAt,
type: 'evaluation'
}
]
}
},
getAttachment: async (_id, name) => {
assert.equal(_id, 'eval-process-123,1702677252111')
assert.equal(name, 'memory.txt')
// impl will inflate this buffer
return deflateP(Memory)
}
},
logger
}))

const res = await findLatestEvaluation({
processId: 'process-123',
to: 1702677252111,
ordinate: '1'
})

assert.equal(res.timestamp, 1702677252111)
assert.equal(res.ordinate, '1')
assert.equal(res.blockHeight, 1234)
assert.equal(res.processId, 'process-123')
assert.deepStrictEqual(res.output, { Memory, Messages: [{ foo: 'bar' }] })
assert.equal(res.evaluatedAt.toISOString(), evaluatedAt)
})

test('rejects if no interaction is found', async () => {
const findLatestEvaluation = findLatestEvaluationSchema.implement(
findLatestEvaluationWith({
Expand Down
4 changes: 3 additions & 1 deletion servers/cu/src/domain/dal.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export const saveProcessSchema = z.function()
.args(processSchema)
.returns(z.promise(z.any()))

export const findLatestEvaluationSchema = z.function()
export const findEvaluationSchema = z.function()
.args(z.object({
processId: z.string(),
to: z.coerce.number().nullish(),
Expand All @@ -56,6 +56,8 @@ export const findLatestEvaluationSchema = z.function()
}))
.returns(z.promise(evaluationSchema))

/**
 * findLatestEvaluation shares the exact same argument and return
 * contract as findEvaluation, so the schema is simply reused.
 */
export const findLatestEvaluationSchema = findEvaluationSchema

/**
 * Contract for the saveEvaluation data-access function.
 *
 * It accepts a single argument: an evaluation (per evaluationSchema) extended
 * with a deepHash field that is a string, null, or undefined.
 * It returns a promise whose resolved value is not constrained.
 */
export const saveEvaluationSchema = z.function()
.args(evaluationSchema.extend({ deepHash: z.string().nullish() }))
.returns(z.promise(z.any()))
Expand Down
1 change: 1 addition & 0 deletions servers/cu/src/domain/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ export const createApis = async (ctx) => {
loadBlocksMeta: GatewayClient.loadBlocksMetaWith({ fetch: ctx.fetch, GATEWAY_URL: ctx.GATEWAY_URL, pageSize: 90, logger: logger.child('gateway') }),
findProcess: PouchDbClient.findProcessWith({ pouchDb, logger }),
saveProcess: PouchDbClient.saveProcessWith({ pouchDb, logger }),
findEvaluation: PouchDbClient.findEvaluationWith({ pouchDb, logger }),
findLatestEvaluation: PouchDbClient.findLatestEvaluationWith({ pouchDb, logger }),
saveEvaluation: PouchDbClient.saveEvaluationWith({ pouchDb, logger }),
findMessageHash: PouchDbClient.findMessageHashWith({ pouchDb, logger }),
Expand Down
12 changes: 7 additions & 5 deletions servers/cu/src/domain/lib/loadMessages.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import { messageSchema, streamSchema } from '../model.js'
import { loadBlocksMetaSchema, loadMessagesSchema, loadTimestampSchema, locateSchedulerSchema } from '../dal.js'
import { trimSlash } from '../utils.js'

/**
 * Convert a value expressed in milliseconds into whole seconds,
 * rounding down (toward negative infinity).
 *
 * @param {number} millis - a duration or timestamp in milliseconds
 * @returns {number} the number of whole seconds
 */
export const toSeconds = (millis) => {
  const MILLIS_PER_SECOND = 1000
  const seconds = millis / MILLIS_PER_SECOND
  return Math.floor(seconds)
}

/**
* - { name: 'Cron-Interval', value: 'interval' }
* - { name: 'Cron-Tag-Foo', value: 'Bar' }
Expand Down Expand Up @@ -39,7 +41,7 @@ export function parseCrons ({ tags }) {
* TODO: harden
*/
[T, pipe(
always({ interval, unit: 'seconds', value: Math.floor(ms([value, unit].join(' ')) / 1000) }),
always({ interval, unit: 'seconds', value: toSeconds(ms([value, unit].join(' '))) }),
(cron) => {
if (cron.value <= 0) throw new Error('time-based cron cannot be less than 1 second')
return cron
Expand Down Expand Up @@ -129,8 +131,8 @@ export function isTimestampOnCron ({ timestamp, originTimestamp, cron }) {
*
* So convert the times to seconds before applying modulo
*/
timestamp = Math.floor(timestamp / 1000)
originTimestamp = Math.floor(originTimestamp / 1000)
timestamp = toSeconds(timestamp)
originTimestamp = toSeconds(originTimestamp)
/**
* don't count the origin timestamp as a match
*/
Expand Down Expand Up @@ -193,7 +195,7 @@ export function cronMessagesBetweenWith ({
* before time-based messages
*/
const nextBlock = blocksInRange[0]
if (nextBlock && Math.floor(curTimestamp / 1000) >= Math.floor(nextBlock.timestamp / 1000)) {
if (nextBlock && toSeconds(curTimestamp) >= toSeconds(nextBlock.timestamp)) {
/**
* Make sure to remove the block from our range,
* since we've ticked past it,
Expand Down Expand Up @@ -446,7 +448,7 @@ function loadCronMessagesWith ({ loadTimestamp, locateScheduler, loadBlocksMeta,
* this last tuple won't be fired
*/
const rightMostBlock = last(blocksMeta)
if (prev.block.timestamp < rightMostBlock.timestamp) {
if (toSeconds(prev.block.timestamp) < toSeconds(rightMostBlock.timestamp)) {
logger(
'rightMostBlock at timestamp "%s" is later than latest message at timestamp "%s". Emitting extra set of boundaries on end...',
rightMostBlock.timestamp,
Expand Down
6 changes: 5 additions & 1 deletion servers/cu/src/domain/readResult.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ export function readResultWith (env) {
*
* So we explicitly set cron to undefined, for posterity
*/
cron: undefined
cron: undefined,
/**
* We want an exact match to this message's evaluation
*/
exact: true
}))
.map(omit(['Memory']))
.map(
Expand Down
29 changes: 26 additions & 3 deletions servers/cu/src/domain/readState.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Resolved, of } from 'hyper-async'
import { Resolved, fromPromise, of } from 'hyper-async'

import { findEvaluationSchema } from './dal.js'
import { loadProcessWith } from './lib/loadProcess.js'
import { loadModuleWith } from './lib/loadModule.js'
import { loadMessagesWith } from './lib/loadMessages.js'
Expand Down Expand Up @@ -28,13 +29,19 @@ export function readStateWith (env) {
const loadModule = loadModuleWith(env)
const evaluate = evaluateWith(env)

return ({ processId, to, ordinate, cron }) => {
const findEvaluation = fromPromise(findEvaluationSchema.implement(env.findEvaluation))

return ({ processId, to, ordinate, cron, exact }) => {
return of({ id: processId, to, ordinate, cron })
.chain(loadProcess)
.chain(res => {
/**
* The exact evaluation (identified by its input messages timestamp)
* was found in the cache, so just return it
* was found in the cache, so just return it.
*
* We perform a similar check below, but checking additionally here
* prevents unnecessarily loading the process wasm module and heap,
* a non-trivial boon for system resources
*/
if (res.from &&
res.from === to &&
Expand All @@ -56,6 +63,22 @@ export function readStateWith (env) {
.chain(hydrateMessages)
.chain(loadModule)
.chain(evaluate)
.chain((ctx) => {
/**
* Some upstream apis like readResult need an exact match on the message evaluation,
* and pass the 'exact' flag
*
* If this flag is set, we ensure that by fetching the exact match from the db.
* This hedges against race conditions where multiple requests are resulting in the evaluation
* of the same messages in a process.
*
* Having this should allow readState to always start on the latest evaluation, relative to 'to',
* and reduce the chances of unnecessary 409s, due to concurrent evaluations of the same messages,
* across multiple requests.
*/
if (exact) return findEvaluation({ processId, to, ordinate, cron })
return Resolved(ctx)
})
.map((ctx) => ctx.output)
})
}
Expand Down

0 comments on commit 087a47f

Please sign in to comment.