Skip to content

Commit

Permalink
test(cu): add test for prepending process message #730
Browse files Browse the repository at this point in the history
  • Loading branch information
TillaTheHun0 committed Sep 11, 2024
1 parent 5e3837c commit 6add80a
Show file tree
Hide file tree
Showing 2 changed files with 189 additions and 90 deletions.
182 changes: 93 additions & 89 deletions servers/cu/src/domain/lib/loadMessages.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Transform, compose as composeStreams } from 'node:stream'

import { Resolved, fromPromise, of } from 'hyper-async'
import { T, always, ascend, cond, equals, identity, ifElse, last, length, mergeRight, pipe, prop, reduce, uniqBy } from 'ramda'
import { T, always, ascend, cond, equals, identity, ifElse, isNil, last, length, mergeRight, pipe, prop, reduce, uniqBy } from 'ramda'
import { z } from 'zod'
import ms from 'ms'

Expand Down Expand Up @@ -369,6 +369,97 @@ function reconcileBlocksWith ({ loadBlocksMeta, findBlocks, saveBlocks }) {
}
}

export function maybePrependProcessMessage (ctx, logger) {
return async function * ($messages) {
const isColdStart = isNil(ctx.from)

/**
* Generate and emit a message that represents the process itself
* if the Process was started before the aop6 Boot Loader change
*
* It will be the first message evaluated by the module
*/

const messages = $messages[Symbol.asyncIterator]()

/**
* { value: any, done: boolean }
*/
let message = await messages.next()

if (isColdStart) {
const { value, done } = message
/**
* This condition is to handle 3 cases. Before aop6 ECHO Boot loader,
* The first Message in a stream will be an actual Message. But after
* aop6 the first Message is now the process itself, shaped like a Message
*
* As a result, old Processes that were started before the boot loader
* change, can either 1. have no Messages, 2. have the first Message with a tag
* of type Message, as opposed to Process, Or 3. an old Process can have a its
* first Message be a Cron. In these cases on a cold start we need to
* inject the Process as the first Message in the stream, as was done
* prior to the Boot Loader change.
*
* See https://github.com/permaweb/ao/issues/730
*/
if (done || (parseTags(value.message.Tags).Type !== 'Process') || value.message.Cron) {
logger('Emitting process message at beginning of evaluation stream for process %s cold start', ctx.id)
yield {
/**
* Ensure the noSave flag is set, so evaluation does not persist
* this process message
*/
noSave: true,
ordinate: '0',
name: `Process Message ${ctx.id}`,
message: {
Id: ctx.id,
Signature: ctx.signature,
Data: ctx.data,
Owner: ctx.owner,
/**
* the target of the process message is itself
*/
Target: ctx.id,
Anchor: ctx.anchor,
/**
* Since a process may be spawned from another process,
* the owner may not always be an "end user" wallet,
* but the MU wallet that signed and pushed the spawn.
*
* The MU sets From-Process on any data item it pushes
* on behalf of a process, including spawns.
*
* So we can set From here using the Process tags
* and owner, just like we do for any other message
*/
From: mapFrom({ tags: ctx.tags, owner: ctx.owner }),
Tags: ctx.tags,
Epoch: undefined,
Nonce: undefined,
Timestamp: ctx.block.timestamp,
'Block-Height': ctx.block.height,
Cron: false
},
AoGlobal: {
Process: { Id: ctx.id, Owner: ctx.owner, Tags: ctx.tags },
Module: { Id: ctx.moduleId, Owner: ctx.moduleOwner, Tags: ctx.moduleTags }
}
}
}
}

/**
* Emit the merged stream of Cron and Scheduled Messages
*/
while (!message.done) {
yield message.value
message = await messages.next()
}
}
}

function loadScheduledMessagesWith ({ loadMessages, logger }) {
loadMessages = fromPromise(loadMessagesSchema.implement(loadMessages))

Expand Down Expand Up @@ -606,94 +697,7 @@ function loadCronMessagesWith ({ loadTimestamp, findBlocks, loadBlocksMeta, save
.map($messages => {
return composeStreams(
$messages,
Transform.from(async function * maybeEmitColdStart ($messages) {
const isColdStart = !ctx.from

/**
* Generate and emit a message that represents the process itself
* if the Process was started before the aop6 Boot Loader change
*
* It will be the first message evaluated by the module
*/

const messages = $messages[Symbol.asyncIterator]()

/**
* { value: any, done: boolean }
*/
let message = await messages.next()

if (isColdStart) {
const { value, done } = message
/**
* This condition is to handle 3 cases. Before aop6 ECHO Boot loader,
* The first Message in a stream will be an actual Message. But after
* aop6 the first Message is now the process itself, shaped like a Message
*
* As a result, old Processes that were started before the boot loader
* change, can either 1. have no Messages, 2. have the first Message with a tag
* of type Message, as opposed to Process, Or 3. an old Process can have a its
* first Message be a Cron. In these cases on a cold start we need to
* inject the Process as the first Message in the stream, as was done
* prior to the Boot Loader change.
*
* See https://github.com/permaweb/ao/issues/730
*/
if (done || (parseTags(value.message.Tags).Type !== 'Process') || value.message.Cron) {
logger('Emitting process message at beginning of evaluation stream for process %s cold start', ctx.id)
yield {
/**
* Ensure the noSave flag is set, so evaluation does not persist
* this process message
*/
noSave: true,
ordinate: '^',
name: `Process Message ${ctx.id}`,
message: {
Id: ctx.id,
Signature: ctx.signature,
Data: ctx.data,
Owner: ctx.owner,
/**
* the target of the process message is itself
*/
Target: ctx.id,
Anchor: ctx.anchor,
/**
* Since a process may be spawned from another process,
* the owner may not always be an "end user" wallet,
* but the MU wallet that signed and pushed the spawn.
*
* The MU sets From-Process on any data item it pushes
* on behalf of a process, including spawns.
*
* So we can set From here using the Process tags
* and owner, just like we do for any other message
*/
From: mapFrom({ tags: ctx.tags, owner: ctx.owner }),
Tags: ctx.tags,
Epoch: undefined,
Nonce: undefined,
Timestamp: ctx.block.timestamp,
'Block-Height': ctx.block.height,
Cron: false
},
AoGlobal: {
Process: { Id: ctx.id, Owner: ctx.owner, Tags: ctx.tags },
Module: { Id: ctx.moduleId, Owner: ctx.moduleOwner, Tags: ctx.moduleTags }
}
}
}
}

/**
* Emit the merged stream of Cron and Scheduled Messages
*/
while (!message.done) {
yield message.value
message = await messages.next()
}
})
Transform.from(maybePrependProcessMessage(ctx, logger))
)
})
.map(messages => ({ messages }))
Expand Down
97 changes: 96 additions & 1 deletion servers/cu/src/domain/lib/loadMessages.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ import * as assert from 'node:assert'
import ms from 'ms'
import { countBy, uniqBy } from 'ramda'

import { CRON_INTERVAL, parseCrons, isBlockOnCron, isTimestampOnCron, cronMessagesBetweenWith, mergeBlocks } from './loadMessages.js'
import { createTestLogger } from '../logger.js'
import { CRON_INTERVAL, parseCrons, isBlockOnCron, isTimestampOnCron, cronMessagesBetweenWith, mergeBlocks, maybePrependProcessMessage } from './loadMessages.js'
import { Readable } from 'node:stream'

const logger = createTestLogger({ name: 'ao-cu:readState' })

describe('loadMessages', () => {
describe('parseCrons', () => {
Expand Down Expand Up @@ -460,5 +464,96 @@ describe('loadMessages', () => {
})
})

describe('maybePrependProcessMessage', () => {
const ctx = {
id: 'process-123',
tags: [],
owner: 'owner-123',
block: {
timestamp: new Date().getTime(),
height: 123
}
}

describe('should prepend the process message on cold start', () => {
test('if first stream message is not the process', async () => {
const $messages = Readable.from([
{
message: {
Tags: [
{ name: 'Type', value: 'Message' }
]
}
}
])
const $merged = maybePrependProcessMessage(ctx, logger)($messages)

const results = []
for await (const m of $merged) results.push(m)

assert.equal(results.length, 2)
assert.equal(results[0].name, 'Process Message process-123')
})

test('if the first stream message is a cron message', async () => {
const $messages = Readable.from([
{
message: {
Cron: true,
Tags: [
{ name: 'Type', value: 'Foobar' }
]
}
}
])
const $merged = maybePrependProcessMessage(ctx, logger)($messages)

const results = []
for await (const m of $merged) results.push(m)

assert.equal(results.length, 2)
assert.equal(results[0].name, 'Process Message process-123')
})

test('if there are no messages', async () => {
const $messages = Readable.from([])
const $merged = maybePrependProcessMessage(ctx, logger)($messages)

const results = []
for await (const m of $merged) results.push(m)

assert.equal(results.length, 1)
assert.equal(results[0].name, 'Process Message process-123')
})
})

test('should not prepend the process message if the first stream message is the process', async () => {
const $messages = Readable.from([
{
message: {
Cron: false,
Tags: [
{ name: 'Type', value: 'Process' }
]
}
},
{
message: {
Cron: false,
Tags: [
{ name: 'Type', value: 'Message' }
]
}
}
])
const $merged = maybePrependProcessMessage(ctx, logger)($messages)

const results = []
for await (const m of $merged) results.push(m)

assert.equal(results.length, 2)
})
})

describe.todo('loadMessages', () => {})
})

0 comments on commit 6add80a

Please sign in to comment.