Skip to content

Commit

Permalink
feat(cu): add ordinate derived from nonce to all messages to prevent …
Browse files Browse the repository at this point in the history
…timestamp conflicts of scheduled messages
  • Loading branch information
TillaTheHun0 committed Dec 22, 2023
1 parent 817e3a8 commit 2208c10
Show file tree
Hide file tree
Showing 10 changed files with 98 additions and 15 deletions.
4 changes: 4 additions & 0 deletions servers/cu/src/domain/client/ao-su.js
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ export const loadMessagesWith = ({ fetch, logger: _logger, pageSize }) => {
logger.tap('transforming message retrieved from the SU %o'),
applySpec({
cron: always(undefined),
/**
* Set the ordinate to the message's nonce value
*/
ordinate: path(['nonce']),
message: applySpec({
Id: path(['message', 'id']),
Signature: path(['message', 'signature']),
Expand Down
13 changes: 11 additions & 2 deletions servers/cu/src/domain/client/pouchdb.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,13 @@ const messageHashDocSchema = z.object({
type: z.literal('messageHash')
})

function createEvaluationId ({ processId, timestamp, cron }) {
function createEvaluationId ({ processId, timestamp, ordinate, cron }) {
/**
* transactions can sometimes start with an underscore,
* which is not allowed in PouchDB, so prepend to create
* an _id
*/
return `eval-${[processId, timestamp, cron].filter(isNotNil).join(',')}`
return `eval-${[processId, timestamp, ordinate, cron].filter(isNotNil).join(',')}`
}

function createProcessId ({ processId }) {
Expand All @@ -108,6 +108,7 @@ const toEvaluation = applySpec({
processId: prop('processId'),
messageId: prop('messageId'),
timestamp: prop('timestamp'),
ordinate: prop('ordinate'),
blockHeight: prop('blockHeight'),
cron: prop('cron'),
evaluatedAt: prop('evaluatedAt'),
Expand All @@ -125,6 +126,10 @@ const toEvaluation = applySpec({
* This will cause only string with a given prefix to match a range query
*/
export const COLLATION_SEQUENCE_MAX_CHAR = '\ufff0'
/**
* This technically isn't the smallest char, but it's small enough for our needs
*/
export const COLLATION_SEQUENCE_MIN_CHAR = '^'

export function findProcessWith ({ pouchDb }) {
return ({ processId }) => of(processId)
Expand Down Expand Up @@ -216,6 +221,7 @@ export function findLatestEvaluationWith ({ pouchDb = internalPouchDb }) {
processId: evaluationSchema.shape.processId,
messageId: evaluationSchema.shape.messageId,
timestamp: evaluationSchema.shape.timestamp,
ordinate: evaluationSchema.shape.ordinate,
blockHeight: evaluationSchema.shape.blockHeight,
parent: z.string().min(1),
evaluatedAt: evaluationSchema.shape.evaluatedAt,
Expand Down Expand Up @@ -267,6 +273,7 @@ export function saveEvaluationWith ({ pouchDb, logger: _logger }) {
processId: evaluationSchema.shape.processId,
messageId: evaluationSchema.shape.messageId,
timestamp: evaluationSchema.shape.timestamp,
ordinate: evaluationSchema.shape.ordinate,
blockHeight: evaluationSchema.shape.blockHeight,
cron: evaluationSchema.shape.cron,
parent: z.string().min(1),
Expand Down Expand Up @@ -306,6 +313,7 @@ export function saveEvaluationWith ({ pouchDb, logger: _logger }) {
createEvaluationId({
processId: evaluation.processId,
timestamp: evaluation.timestamp,
ordinate: evaluation.ordinate,
/**
* By appending the cron identifier to the evaluation doc _id,
*
Expand All @@ -316,6 +324,7 @@ export function saveEvaluationWith ({ pouchDb, logger: _logger }) {
processId: prop('processId'),
messageId: prop('messageId'),
timestamp: prop('timestamp'),
ordinate: prop('ordinate'),
blockHeight: prop('blockHeight'),
cron: prop('cron'),
parent: (evaluation) => createProcessId({ processId: evaluation.processId }),
Expand Down
13 changes: 11 additions & 2 deletions servers/cu/src/domain/client/pouchdb.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ describe('pouchdb', () => {
doc: {
_id: 'eval-process-123,1702677252111',
timestamp: 1702677252111,
ordinate: 1,
blockHeight: 1234,
processId: 'process-123',
messageId: 'message-123',
Expand Down Expand Up @@ -189,6 +190,7 @@ describe('pouchdb', () => {
})

assert.equal(res.timestamp, 1702677252111)
assert.equal(res.ordinate, 1)
assert.equal(res.blockHeight, 1234)
assert.equal(res.processId, 'process-123')
assert.deepStrictEqual(res.output, { Memory, Messages: [{ foo: 'bar' }] })
Expand All @@ -209,6 +211,7 @@ describe('pouchdb', () => {
doc: {
_id: 'eval-process-123,1702677252111',
timestamp: 1702677252111,
ordinate: 1,
blockHeight: 1234,
processId: 'process-123',
messageId: 'message-123',
Expand Down Expand Up @@ -236,6 +239,7 @@ describe('pouchdb', () => {
})

assert.equal(res.timestamp, 1702677252111)
assert.equal(res.ordinate, 1)
assert.equal(res.blockHeight, 1234)
assert.equal(res.processId, 'process-123')
assert.deepStrictEqual(res.output, { Memory, Messages: [{ foo: 'bar' }] })
Expand Down Expand Up @@ -272,9 +276,10 @@ describe('pouchdb', () => {
const { _attachments, evaluatedAt, ...rest } = evaluationDoc

assert.deepStrictEqual(rest, {
_id: 'eval-process-123,1702677252111',
_id: 'eval-process-123,1702677252111,1',
cron: undefined,
timestamp: 1702677252111,
ordinate: '1',
blockHeight: 1234,
processId: 'process-123',
messageId: 'message-123',
Expand All @@ -299,7 +304,7 @@ describe('pouchdb', () => {

assert.deepStrictEqual(messageIdDoc, {
_id: 'messageHash-deepHash-123',
parent: 'eval-process-123,1702677252111',
parent: 'eval-process-123,1702677252111,1',
type: 'messageHash'
})
return Promise.resolve(true)
Expand All @@ -312,6 +317,7 @@ describe('pouchdb', () => {
await saveEvaluation({
deepHash: 'deepHash-123',
timestamp: 1702677252111,
ordinate: 1,
blockHeight: 1234,
processId: 'process-123',
messageId: 'message-123',
Expand Down Expand Up @@ -339,6 +345,7 @@ describe('pouchdb', () => {
await saveEvaluation({
// no deep hash
timestamp: 1702677252111,
ordinate: 1,
blockHeight: 1234,
processId: 'process-123',
messageId: 'message-123',
Expand All @@ -354,6 +361,7 @@ describe('pouchdb', () => {
const mockEval = {
_id: 'eval-process-123,1702677252111',
timestamp: 1702677252111,
ordinate: 1,
blockHeight: 1234,
processId: 'process-123',
messageId: 'message-123',
Expand Down Expand Up @@ -387,6 +395,7 @@ describe('pouchdb', () => {
const mockEval = {
_id: 'eval-process-123,1702677252111',
timestamp: 1702677252111,
ordinate: 1,
blockHeight: 1234,
processId: 'process-123',
messageId: 'message-123',
Expand Down
3 changes: 2 additions & 1 deletion servers/cu/src/domain/lib/evaluate.js
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ export function evaluateWith (env) {
* Iterate over the async iterable of messages,
* and evaluate each one
*/
for await (const { cron, message, deepHash, AoGlobal } of ctx.messages) {
for await (const { cron, ordinate, message, deepHash, AoGlobal } of ctx.messages) {
/**
* We skip over forwarded messages (which we've calculated a deepHash for - see hydrateMessages)
* if their deepHash is found in the cache, this prevents duplicate evals
Expand Down Expand Up @@ -223,6 +223,7 @@ export function evaluateWith (env) {
saveEvaluation({
deepHash,
cron,
ordinate,
processId: ctx.id,
messageId: message.Id,
timestamp: message.Timestamp,
Expand Down
13 changes: 13 additions & 0 deletions servers/cu/src/domain/lib/evaluate.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ describe('evaluate', () => {
Memory: null,
messages: toAsyncIterable([
{
ordinate: 1,
message: {
Id: 'message-123',
Timestamp: 1702846520559,
Expand All @@ -41,6 +42,7 @@ describe('evaluate', () => {
AoGlobal: {}
},
{
ordinate: 1,
message: {
Id: 'message-123',
Timestamp: 1702846520559,
Expand Down Expand Up @@ -156,6 +158,7 @@ describe('evaluate', () => {
Memory: null,
messages: toAsyncIterable([
{
ordinate: 1,
message: {
Id: 'message-123',
Timestamp: 1702846520559,
Expand All @@ -168,6 +171,7 @@ describe('evaluate', () => {
AoGlobal: {}
},
{
ordinate: 1,
message: {
Id: 'message-123',
Timestamp: 1702846520559,
Expand Down Expand Up @@ -215,6 +219,7 @@ describe('evaluate', () => {
Memory: null,
messages: toAsyncIterable([
{
ordinate: 1,
message: {
Id: 'message-123',
Timestamp: 1702846520559,
Expand All @@ -228,6 +233,7 @@ describe('evaluate', () => {
AoGlobal: {}
},
{
ordinate: 1,
message: {
Id: 'message-123',
Timestamp: 1702846520559,
Expand All @@ -242,6 +248,7 @@ describe('evaluate', () => {
},
// no deep hash
{
ordinate: 1,
message: {
Id: 'message-123',
Timestamp: 1702846520559,
Expand Down Expand Up @@ -315,6 +322,7 @@ describe('evaluate', () => {
Memory: Buffer.from('Hello', 'utf-8'),
messages: toAsyncIterable([
{
ordinate: 1,
// Will include an error in error
message: {
Id: 'message-123',
Expand Down Expand Up @@ -364,6 +372,7 @@ describe('evaluate', () => {
Memory: Buffer.from('Hello', 'utf-8'),
messages: toAsyncIterable([
{
ordinate: 1,
// Will intentionally throw from the lua process
message: {
Id: 'message-123',
Expand Down Expand Up @@ -406,6 +415,7 @@ describe('evaluate', () => {
Memory: Buffer.from('Hello', 'utf-8'),
messages: toAsyncIterable([
{
ordinate: 1,
// Will unintentionally throw from the lua contract
message: {
Id: 'message-123',
Expand Down Expand Up @@ -448,6 +458,7 @@ describe('evaluate', () => {
messages: toAsyncIterable([
{
// Will include an error in result.error
ordinate: 1,
message: {
Id: 'message-123',
Timestamp: 1702846520559,
Expand All @@ -460,6 +471,7 @@ describe('evaluate', () => {
AoGlobal: {}
},
{
ordinate: 1,
// Will increment a counter in global state
message: {
Id: 'message-123',
Expand All @@ -473,6 +485,7 @@ describe('evaluate', () => {
AoGlobal: {}
},
{
ordinate: 1,
// Will increment a counter in global state
message: {
Id: 'message-123',
Expand Down
19 changes: 13 additions & 6 deletions servers/cu/src/domain/lib/loadMessages.js
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ export function cronMessagesBetweenWith ({
*/
const leftBlock = left.block
const rightBlock = right.block
const leftOrdinate = left.ordinate

/**
* Start at left's block height, incrementing 1 block per iteration until we get to right's block height
Expand All @@ -190,6 +191,7 @@ export function cronMessagesBetweenWith ({
logger('Generating Block based Cron Message for cron "%s" at block "%s"', `${i}-${cron.interval}`, curBlock.height)
yield {
cron: `${i}-${cron.interval}`,
ordinate: leftOrdinate,
message: {
Owner: processOwner,
Target: processId,
Expand Down Expand Up @@ -226,6 +228,7 @@ export function cronMessagesBetweenWith ({
logger('Generating Time based Cron Message for cron "%s" at timestamp "%s"', `${i}-${cron.interval}`, curTimestamp)
yield {
cron: `${i}-${cron.interval}`,
ordinate: leftOrdinate,
message: {
Owner: processOwner,
Target: processId,
Expand Down Expand Up @@ -314,8 +317,12 @@ function loadCronMessagesWith ({ loadTimestamp, locateScheduler, loadBlocksMeta,
* The left most boundary is the origin block of the process -- the current block
* at the the time the process was sent to a SU
* OR the block height and timestamp of the most recently evaluated message
*
* We also initilize the ordinate here, which will be used to "generate" an orderable
* ordinate for generated Cron messages. This value is usually the nonce of the most recent
* schedule message. So in sense, Cron message exists "between" scheduled message nonces
*/
leftMost: { block: ctx.from ? { height: ctx.fromBlockHeight, timestamp: ctx.from } : ctx.block },
leftMost: { ordinate: ctx.ordinate, block: ctx.from ? { height: ctx.fromBlockHeight, timestamp: ctx.from } : ctx.block },
/**
* The right most boundary is always the provided to timestamp
* OR the current block timestamp according to the su
Expand Down Expand Up @@ -362,10 +369,10 @@ function loadCronMessagesWith ({ loadTimestamp, locateScheduler, loadBlocksMeta,
* from the SU.
*
* Our messages retrieved from the SU are perfect boundaries, as they each have a
* block height and timestamp
* block height and timestamp, as well as a ordinate set to its nonce.
*
* This will allow the CU to generate cron messages with monotonically increasing timestamp and accurate block metadata,
* at least w.r.t the SU's claims.
* This will allow the CU to generate cron messages that orderable in and amongst the scheduled message,
* and with accurate block metadata, at least w.r.t the SU's claims.
*/
$scheduled,
/**
Expand All @@ -378,10 +385,10 @@ function loadCronMessagesWith ({ loadTimestamp, locateScheduler, loadBlocksMeta,
*
* Since we added our left and right bounds, there should always
* be at least one tuple emitted, which will account for any time
* we have <2 cron messages to use as boundaries.
* we have <2 scheduled messages to use as boundaries.
*
* If our leftMost and rightMost boundary are the only boundaries, this effectively means
* that we have no cron messages to merge and evaluate, and only cron messages to generate
* that we have no scheduled messages to merge and evaluate, and only cron messages to generate
*
* [b1, b2, b3] -> [ [b1, b2], [b2, b3] ]
*/
Expand Down
5 changes: 3 additions & 2 deletions servers/cu/src/domain/lib/loadMessages.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,8 @@ describe('loadMessages', () => {
block: {
height: originHeight + 12,
timestamp: scheduledMessagesStartTime
}
},
ordinate: 1
// AoGlobal,
// message
},
Expand Down Expand Up @@ -422,7 +423,7 @@ describe('loadMessages', () => {
test('should create a unique cron identifier for each generated message', async () => {
assert.equal(
cronMessages.length,
uniqBy((node) => `${node.message.Timestamp},${node.cron}`, cronMessages).length
uniqBy((node) => `${node.message.Timestamp},${node.ordinate},${node.cron}`, cronMessages).length
)
})
})
Expand Down
Loading

0 comments on commit 2208c10

Please sign in to comment.