diff --git a/servers/cu/src/domain/lib/hydrateMessages.js b/servers/cu/src/domain/lib/hydrateMessages.js index f5a31a3f2..a00165999 100644 --- a/servers/cu/src/domain/lib/hydrateMessages.js +++ b/servers/cu/src/domain/lib/hydrateMessages.js @@ -1,7 +1,7 @@ import { compose as composeStreams, PassThrough, Transform } from 'node:stream' import { of } from 'hyper-async' -import { mergeRight } from 'ramda' +import { mergeRight, isNil } from 'ramda' import { z } from 'zod' import WarpArBundles from 'warp-arbundles' @@ -109,7 +109,16 @@ export function maybeMessageIdWith ({ logger }) { */ async function calcDataItemDeepHash ({ data, tags, target, anchor }) { return Promise.resolve() - .then(() => createData(data, signer, { tags, target, anchor })) + /** + * isNil(data) ? '' : data was added to handle the case + * where data is null or undefined. After aop 6, the Data + * field of the first Message is the Data field of the Process + * because the Process is the first Message. Because it is not + * string data in many cases the SU is outputting null. + * + * See: https://github.com/permaweb/ao/issues/730 + */ + .then(() => createData(isNil(data) ? '' : data, signer, { tags, target, anchor })) .then((dataItem) => dataItem.getSignatureData()) .then(bytesToBase64) } diff --git a/servers/cu/src/domain/lib/loadMessages.js b/servers/cu/src/domain/lib/loadMessages.js index da5715449..c6ae74009 100644 --- a/servers/cu/src/domain/lib/loadMessages.js +++ b/servers/cu/src/domain/lib/loadMessages.js @@ -1,7 +1,7 @@ import { Transform, compose as composeStreams } from 'node:stream' import { Resolved, fromPromise, of } from 'hyper-async' -import { T, always, ascend, cond, equals, identity, ifElse, last, length, mergeRight, pipe, prop, reduce, uniqBy } from 'ramda' +import { T, always, ascend, cond, equals, identity, ifElse, isNil, last, length, mergeRight, pipe, prop, reduce, uniqBy } from 'ramda' import { z } from 'zod' import ms from 'ms' @@ -369,6 +369,97 @@ function reconcileBlocksWith ({ loadBlocksMeta, findBlocks, saveBlocks }) { } } +export function maybePrependProcessMessage (ctx, logger) { + return async function * ($messages) { + const isColdStart = isNil(ctx.from) + + /** + * Generate and emit a message that represents the process itself + * if the Process was started before the aop6 Boot Loader change + * + * It will be the first message evaluated by the module + */ + + const messages = $messages[Symbol.asyncIterator]() + + /** + * { value: any, done: boolean } + */ + let message = await messages.next() + + if (isColdStart) { + const { value, done } = message + /** + * This condition is to handle 3 cases. Before aop6 ECHO Boot loader, + * The first Message in a stream will be an actual Message. But after + * aop6 the first Message is now the process itself, shaped like a Message + * + * As a result, old Processes that were started before the boot loader + * change, can either 1. have no Messages, 2. have the first Message with a tag + * of type Message, as opposed to Process, Or 3. an old Process can have a its + * first Message be a Cron. In these cases on a cold start we need to + * inject the Process as the first Message in the stream, as was done + * prior to the Boot Loader change. + * + * See https://github.com/permaweb/ao/issues/730 + */ + if (done || (parseTags(value.message.Tags).Type !== 'Process') || value.message.Cron) { + logger('Emitting process message at beginning of evaluation stream for process %s cold start', ctx.id) + yield { + /** + * Ensure the noSave flag is set, so evaluation does not persist + * this process message + */ + noSave: true, + ordinate: '0', + name: `Process Message ${ctx.id}`, + message: { + Id: ctx.id, + Signature: ctx.signature, + Data: ctx.data, + Owner: ctx.owner, + /** + * the target of the process message is itself + */ + Target: ctx.id, + Anchor: ctx.anchor, + /** + * Since a process may be spawned from another process, + * the owner may not always be an "end user" wallet, + * but the MU wallet that signed and pushed the spawn. + * + * The MU sets From-Process on any data item it pushes + * on behalf of a process, including spawns. + * + * So we can set From here using the Process tags + * and owner, just like we do for any other message + */ + From: mapFrom({ tags: ctx.tags, owner: ctx.owner }), + Tags: ctx.tags, + Epoch: undefined, + Nonce: undefined, + Timestamp: ctx.block.timestamp, + 'Block-Height': ctx.block.height, + Cron: false + }, + AoGlobal: { + Process: { Id: ctx.id, Owner: ctx.owner, Tags: ctx.tags }, + Module: { Id: ctx.moduleId, Owner: ctx.moduleOwner, Tags: ctx.moduleTags } + } + } + } + } + + /** + * Emit the merged stream of Cron and Scheduled Messages + */ + while (!message.done) { + yield message.value + message = await messages.next() + } + } +} + function loadScheduledMessagesWith ({ loadMessages, logger }) { loadMessages = fromPromise(loadMessagesSchema.implement(loadMessages)) @@ -606,94 +697,7 @@ function loadCronMessagesWith ({ loadTimestamp, findBlocks, loadBlocksMeta, save .map($messages => { return composeStreams( $messages, - Transform.from(async function * maybeEmitColdStart ($messages) { - const isColdStart = !ctx.from - - /** - * Generate and emit a message that represents the process itself - * if the Process was started before the aop6 Boot Loader change - * - * It will be the first message evaluated by the module - */ - - const messages = $messages[Symbol.asyncIterator]() - - /** - * { value: any, done: boolean } - */ - let message = await messages.next() - - if (isColdStart) { - const { value, done } = message - /** - * This condition is to handle 3 cases. Before aop6 ECHO Boot loader, - * The first Message in a stream will be an actual Message. But after - * aop6 the first Message is now the process itself, shaped like a Message - * - * As a result, old Processes that were started before the boot loader - * change, can either 1. have no Messages, 2. have the first Message with a tag - * of type Message, as opposed to Process, Or 3. an old Process can have a its - * first Message be a Cron. In these cases on a cold start we need to - * inject the Process as the first Message in the stream, as was done - * prior to the Boot Loader change. - * - * See https://github.com/permaweb/ao/issues/730 - */ - if (done || (parseTags(value.message.Tags).Type !== 'Process') || value.message.Cron) { - logger('Emitting process message at beginning of evaluation stream for process %s cold start', ctx.id) - yield { - /** - * Ensure the noSave flag is set, so evaluation does not persist - * this process message - */ - noSave: true, - ordinate: '^', - name: `Process Message ${ctx.id}`, - message: { - Id: ctx.id, - Signature: ctx.signature, - Data: ctx.data, - Owner: ctx.owner, - /** - * the target of the process message is itself - */ - Target: ctx.id, - Anchor: ctx.anchor, - /** - * Since a process may be spawned from another process, - * the owner may not always be an "end user" wallet, - * but the MU wallet that signed and pushed the spawn. - * - * The MU sets From-Process on any data item it pushes - * on behalf of a process, including spawns. - * - * So we can set From here using the Process tags - * and owner, just like we do for any other message - */ - From: mapFrom({ tags: ctx.tags, owner: ctx.owner }), - Tags: ctx.tags, - Epoch: undefined, - Nonce: undefined, - Timestamp: ctx.block.timestamp, - 'Block-Height': ctx.block.height, - Cron: false - }, - AoGlobal: { - Process: { Id: ctx.id, Owner: ctx.owner, Tags: ctx.tags }, - Module: { Id: ctx.moduleId, Owner: ctx.moduleOwner, Tags: ctx.moduleTags } - } - } - } - } - - /** - * Emit the merged stream of Cron and Scheduled Messages - */ - while (!message.done) { - yield message.value - message = await messages.next() - } - }) + Transform.from(maybePrependProcessMessage(ctx, logger)) ) }) .map(messages => ({ messages })) diff --git a/servers/cu/src/domain/lib/loadMessages.test.js b/servers/cu/src/domain/lib/loadMessages.test.js index 7cba34902..8c92542e2 100644 --- a/servers/cu/src/domain/lib/loadMessages.test.js +++ b/servers/cu/src/domain/lib/loadMessages.test.js @@ -5,7 +5,11 @@ import * as assert from 'node:assert' import ms from 'ms' import { countBy, uniqBy } from 'ramda' -import { CRON_INTERVAL, parseCrons, isBlockOnCron, isTimestampOnCron, cronMessagesBetweenWith, mergeBlocks } from './loadMessages.js' +import { createTestLogger } from '../logger.js' +import { CRON_INTERVAL, parseCrons, isBlockOnCron, isTimestampOnCron, cronMessagesBetweenWith, mergeBlocks, maybePrependProcessMessage } from './loadMessages.js' +import { Readable } from 'node:stream' + +const logger = createTestLogger({ name: 'ao-cu:readState' }) describe('loadMessages', () => { describe('parseCrons', () => { @@ -460,5 +464,96 @@ describe('loadMessages', () => { }) }) + describe('maybePrependProcessMessage', () => { + const ctx = { + id: 'process-123', + tags: [], + owner: 'owner-123', + block: { + timestamp: new Date().getTime(), + height: 123 + } + } + + describe('should prepend the process message on cold start', () => { + test('if first stream message is not the process', async () => { + const $messages = Readable.from([ + { + message: { + Tags: [ + { name: 'Type', value: 'Message' } + ] + } + } + ]) + const $merged = maybePrependProcessMessage(ctx, logger)($messages) + + const results = [] + for await (const m of $merged) results.push(m) + + assert.equal(results.length, 2) + assert.equal(results[0].name, 'Process Message process-123') + }) + + test('if the first stream message is a cron message', async () => { + const $messages = Readable.from([ + { + message: { + Cron: true, + Tags: [ + { name: 'Type', value: 'Foobar' } + ] + } + } + ]) + const $merged = maybePrependProcessMessage(ctx, logger)($messages) + + const results = [] + for await (const m of $merged) results.push(m) + + assert.equal(results.length, 2) + assert.equal(results[0].name, 'Process Message process-123') + }) + + test('if there are no messages', async () => { + const $messages = Readable.from([]) + const $merged = maybePrependProcessMessage(ctx, logger)($messages) + + const results = [] + for await (const m of $merged) results.push(m) + + assert.equal(results.length, 1) + assert.equal(results[0].name, 'Process Message process-123') + }) + }) + + test('should not prepend the process message if the first stream message is the process', async () => { + const $messages = Readable.from([ + { + message: { + Cron: false, + Tags: [ + { name: 'Type', value: 'Process' } + ] + } + }, + { + message: { + Cron: false, + Tags: [ + { name: 'Type', value: 'Message' } + ] + } + } + ]) + const $merged = maybePrependProcessMessage(ctx, logger)($messages) + + const results = [] + for await (const m of $merged) results.push(m) + + assert.equal(results.length, 2) + }) + }) + describe.todo('loadMessages', () => {}) })