Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Vince juliano/cu boot loader fixes 730 #1012

Merged
merged 2 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions servers/cu/src/domain/lib/hydrateMessages.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { compose as composeStreams, PassThrough, Transform } from 'node:stream'

import { of } from 'hyper-async'
import { mergeRight } from 'ramda'
import { mergeRight, isNil } from 'ramda'
import { z } from 'zod'
import WarpArBundles from 'warp-arbundles'

Expand Down Expand Up @@ -109,7 +109,16 @@ export function maybeMessageIdWith ({ logger }) {
*/
async function calcDataItemDeepHash ({ data, tags, target, anchor }) {
return Promise.resolve()
.then(() => createData(data, signer, { tags, target, anchor }))
/**
* isNil(data) ? '' : data was added to handle the case
* where data is null or undefined. After aop 6, the Data
* field of the first Message is the Data field of the Process
* because the Process is the first Message. Because it is not
* string data in many cases the SU is outputting null.
*
* See: https://github.com/permaweb/ao/issues/730
*/
.then(() => createData(isNil(data) ? '' : data, signer, { tags, target, anchor }))
.then((dataItem) => dataItem.getSignatureData())
.then(bytesToBase64)
}
Expand Down
182 changes: 93 additions & 89 deletions servers/cu/src/domain/lib/loadMessages.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Transform, compose as composeStreams } from 'node:stream'

import { Resolved, fromPromise, of } from 'hyper-async'
import { T, always, ascend, cond, equals, identity, ifElse, last, length, mergeRight, pipe, prop, reduce, uniqBy } from 'ramda'
import { T, always, ascend, cond, equals, identity, ifElse, isNil, last, length, mergeRight, pipe, prop, reduce, uniqBy } from 'ramda'
import { z } from 'zod'
import ms from 'ms'

Expand Down Expand Up @@ -369,6 +369,97 @@ function reconcileBlocksWith ({ loadBlocksMeta, findBlocks, saveBlocks }) {
}
}

export function maybePrependProcessMessage (ctx, logger) {
return async function * ($messages) {
const isColdStart = isNil(ctx.from)

/**
* Generate and emit a message that represents the process itself
* if the Process was started before the aop6 Boot Loader change
*
* It will be the first message evaluated by the module
*/

const messages = $messages[Symbol.asyncIterator]()

/**
* { value: any, done: boolean }
*/
let message = await messages.next()

if (isColdStart) {
const { value, done } = message
/**
* This condition is to handle 3 cases. Before aop6 ECHO Boot loader,
* The first Message in a stream will be an actual Message. But after
* aop6 the first Message is now the process itself, shaped like a Message
*
* As a result, old Processes that were started before the boot loader
* change, can either 1. have no Messages, 2. have the first Message with a tag
* of type Message, as opposed to Process, Or 3. an old Process can have a its
* first Message be a Cron. In these cases on a cold start we need to
* inject the Process as the first Message in the stream, as was done
* prior to the Boot Loader change.
*
* See https://github.com/permaweb/ao/issues/730
*/
if (done || (parseTags(value.message.Tags).Type !== 'Process') || value.message.Cron) {
logger('Emitting process message at beginning of evaluation stream for process %s cold start', ctx.id)
yield {
/**
* Ensure the noSave flag is set, so evaluation does not persist
* this process message
*/
noSave: true,
ordinate: '0',
name: `Process Message ${ctx.id}`,
message: {
Id: ctx.id,
Signature: ctx.signature,
Data: ctx.data,
Owner: ctx.owner,
/**
* the target of the process message is itself
*/
Target: ctx.id,
Anchor: ctx.anchor,
/**
* Since a process may be spawned from another process,
* the owner may not always be an "end user" wallet,
* but the MU wallet that signed and pushed the spawn.
*
* The MU sets From-Process on any data item it pushes
* on behalf of a process, including spawns.
*
* So we can set From here using the Process tags
* and owner, just like we do for any other message
*/
From: mapFrom({ tags: ctx.tags, owner: ctx.owner }),
Tags: ctx.tags,
Epoch: undefined,
Nonce: undefined,
Timestamp: ctx.block.timestamp,
'Block-Height': ctx.block.height,
Cron: false
},
AoGlobal: {
Process: { Id: ctx.id, Owner: ctx.owner, Tags: ctx.tags },
Module: { Id: ctx.moduleId, Owner: ctx.moduleOwner, Tags: ctx.moduleTags }
}
}
}
}

/**
* Emit the merged stream of Cron and Scheduled Messages
*/
while (!message.done) {
yield message.value
message = await messages.next()
}
}
}

function loadScheduledMessagesWith ({ loadMessages, logger }) {
loadMessages = fromPromise(loadMessagesSchema.implement(loadMessages))

Expand Down Expand Up @@ -606,94 +697,7 @@ function loadCronMessagesWith ({ loadTimestamp, findBlocks, loadBlocksMeta, save
.map($messages => {
return composeStreams(
$messages,
Transform.from(async function * maybeEmitColdStart ($messages) {
const isColdStart = !ctx.from

/**
* Generate and emit a message that represents the process itself
* if the Process was started before the aop6 Boot Loader change
*
* It will be the first message evaluated by the module
*/

const messages = $messages[Symbol.asyncIterator]()

/**
* { value: any, done: boolean }
*/
let message = await messages.next()

if (isColdStart) {
const { value, done } = message
/**
* This condition is to handle 3 cases. Before aop6 ECHO Boot loader,
* The first Message in a stream will be an actual Message. But after
* aop6 the first Message is now the process itself, shaped like a Message
*
* As a result, old Processes that were started before the boot loader
* change, can either 1. have no Messages, 2. have the first Message with a tag
* of type Message, as opposed to Process, Or 3. an old Process can have a its
* first Message be a Cron. In these cases on a cold start we need to
* inject the Process as the first Message in the stream, as was done
* prior to the Boot Loader change.
*
* See https://github.com/permaweb/ao/issues/730
*/
if (done || (parseTags(value.message.Tags).Type !== 'Process') || value.message.Cron) {
logger('Emitting process message at beginning of evaluation stream for process %s cold start', ctx.id)
yield {
/**
* Ensure the noSave flag is set, so evaluation does not persist
* this process message
*/
noSave: true,
ordinate: '^',
name: `Process Message ${ctx.id}`,
message: {
Id: ctx.id,
Signature: ctx.signature,
Data: ctx.data,
Owner: ctx.owner,
/**
* the target of the process message is itself
*/
Target: ctx.id,
Anchor: ctx.anchor,
/**
* Since a process may be spawned from another process,
* the owner may not always be an "end user" wallet,
* but the MU wallet that signed and pushed the spawn.
*
* The MU sets From-Process on any data item it pushes
* on behalf of a process, including spawns.
*
* So we can set From here using the Process tags
* and owner, just like we do for any other message
*/
From: mapFrom({ tags: ctx.tags, owner: ctx.owner }),
Tags: ctx.tags,
Epoch: undefined,
Nonce: undefined,
Timestamp: ctx.block.timestamp,
'Block-Height': ctx.block.height,
Cron: false
},
AoGlobal: {
Process: { Id: ctx.id, Owner: ctx.owner, Tags: ctx.tags },
Module: { Id: ctx.moduleId, Owner: ctx.moduleOwner, Tags: ctx.moduleTags }
}
}
}
}

/**
* Emit the merged stream of Cron and Scheduled Messages
*/
while (!message.done) {
yield message.value
message = await messages.next()
}
})
Transform.from(maybePrependProcessMessage(ctx, logger))
)
})
.map(messages => ({ messages }))
Expand Down
97 changes: 96 additions & 1 deletion servers/cu/src/domain/lib/loadMessages.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ import * as assert from 'node:assert'
import ms from 'ms'
import { countBy, uniqBy } from 'ramda'

import { CRON_INTERVAL, parseCrons, isBlockOnCron, isTimestampOnCron, cronMessagesBetweenWith, mergeBlocks } from './loadMessages.js'
import { createTestLogger } from '../logger.js'
import { CRON_INTERVAL, parseCrons, isBlockOnCron, isTimestampOnCron, cronMessagesBetweenWith, mergeBlocks, maybePrependProcessMessage } from './loadMessages.js'
import { Readable } from 'node:stream'

const logger = createTestLogger({ name: 'ao-cu:readState' })

describe('loadMessages', () => {
describe('parseCrons', () => {
Expand Down Expand Up @@ -460,5 +464,96 @@ describe('loadMessages', () => {
})
})

describe('maybePrependProcessMessage', () => {
const ctx = {
id: 'process-123',
tags: [],
owner: 'owner-123',
block: {
timestamp: new Date().getTime(),
height: 123
}
}

describe('should prepend the process message on cold start', () => {
test('if first stream message is not the process', async () => {
const $messages = Readable.from([
{
message: {
Tags: [
{ name: 'Type', value: 'Message' }
]
}
}
])
const $merged = maybePrependProcessMessage(ctx, logger)($messages)

const results = []
for await (const m of $merged) results.push(m)

assert.equal(results.length, 2)
assert.equal(results[0].name, 'Process Message process-123')
})

test('if the first stream message is a cron message', async () => {
const $messages = Readable.from([
{
message: {
Cron: true,
Tags: [
{ name: 'Type', value: 'Foobar' }
]
}
}
])
const $merged = maybePrependProcessMessage(ctx, logger)($messages)

const results = []
for await (const m of $merged) results.push(m)

assert.equal(results.length, 2)
assert.equal(results[0].name, 'Process Message process-123')
})

test('if there are no messages', async () => {
const $messages = Readable.from([])
const $merged = maybePrependProcessMessage(ctx, logger)($messages)

const results = []
for await (const m of $merged) results.push(m)

assert.equal(results.length, 1)
assert.equal(results[0].name, 'Process Message process-123')
})
})

test('should not prepend the process message if the first stream message is the process', async () => {
const $messages = Readable.from([
{
message: {
Cron: false,
Tags: [
{ name: 'Type', value: 'Process' }
]
}
},
{
message: {
Cron: false,
Tags: [
{ name: 'Type', value: 'Message' }
]
}
}
])
const $merged = maybePrependProcessMessage(ctx, logger)($messages)

const results = []
for await (const m of $merged) results.push(m)

assert.equal(results.length, 2)
})
})

describe.todo('loadMessages', () => {})
})