From 022ac40e39c4f67c0579936c97b139a5f103af06 Mon Sep 17 00:00:00 2001 From: Tyler Hall Date: Mon, 14 Oct 2024 20:02:28 +0000 Subject: [PATCH 1/2] chore(cu): correct startup log for trusted checkpoint owners --- servers/cu/src/bootstrap.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/servers/cu/src/bootstrap.js b/servers/cu/src/bootstrap.js index ed9489246..360a4d669 100644 --- a/servers/cu/src/bootstrap.js +++ b/servers/cu/src/bootstrap.js @@ -200,7 +200,7 @@ export const createApis = async (ctx) => { ctx.logger('Process File Checkpoint creation is set to "%s"', !ctx.DISABLE_PROCESS_FILE_CHECKPOINT_CREATION) ctx.logger('Ignoring Arweave Checkpoints for processes [ %s ]', ctx.PROCESS_IGNORE_ARWEAVE_CHECKPOINTS.join(', ')) ctx.logger('Ignoring Arweave Checkpoints [ %s ]', ctx.IGNORE_ARWEAVE_CHECKPOINTS.join(', ')) - ctx.logger('Trusting Arweave Checkpoints [ %s ]', ctx.PROCESS_CHECKPOINT_TRUSTED_OWNERS.join(', ')) + ctx.logger('Trusting Arweave Checkpoint Owners [ %s ]', ctx.PROCESS_CHECKPOINT_TRUSTED_OWNERS.join(', ')) ctx.logger('Allowing only process owners [ %s ]', ctx.ALLOW_OWNERS.join(', ')) ctx.logger('Restricting processes [ %s ]', ctx.RESTRICT_PROCESSES.join(', ')) ctx.logger('Allowing only processes [ %s ]', ctx.ALLOW_PROCESSES.join(', ')) From 3491a32c5919e0b370b55ce90c9760ab8b3f0893 Mon Sep 17 00:00:00 2001 From: Tyler Hall Date: Mon, 14 Oct 2024 20:09:41 +0000 Subject: [PATCH 2/2] fix(cu): circuit breaker should wrap each single page load, not all them at once --- servers/cu/src/effects/ao-block.js | 167 +++++++++++++++-------------- 1 file changed, 86 insertions(+), 81 deletions(-) diff --git a/servers/cu/src/effects/ao-block.js b/servers/cu/src/effects/ao-block.js index ff737ed89..ce970c726 100644 --- a/servers/cu/src/effects/ao-block.js +++ b/servers/cu/src/effects/ao-block.js @@ -140,71 +140,80 @@ export function loadBlocksMetaWith ({ } ` + async function fetchPage ({ min, maxTimestamp }) { + return Promise.resolve({ min, limit: pageSize }) + .then(variables => { + logger( + 'Loading page of up to %s blocks after height %s up to timestamp %s', + pageSize, + min, + maxTimestamp + ) + return variables + }) + .then((variables) => { + return backoff( + () => fetch(GRAPHQL_URL, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + query: GET_BLOCKS_QUERY, + variables + }) + }).then(okRes).catch(async (e) => { + logger( + 'Error Encountered when fetching page of block metadata from gateway with minBlock \'%s\' and maxTimestamp \'%s\'', + min, + maxTimestamp + ) + throw new Error(`Can not communicate with gateway to retrieve block metadata: ${await strFromFetchError(e)}`) + }), + { maxRetries: 2, delay: 300, log: logger, name: `loadBlockMeta(${JSON.stringify({ newMin: min, maxTimestamp })})` } + ) + }) + .then(async (res) => { + if (res.ok) return res.json() + logger( + 'Error Encountered when fetching page of block metadata from gateway with minBlock \'%s\' and maxTimestamp \'%s\'', + min, + maxTimestamp + ) + throw new Error(`${res.status}: ${await res.text()}`) + }) + .then(path(['data', 'blocks'])) + .then((res) => ({ ...res, maxTimestamp })) + } + + /** + * Fetching each page is wrapped in a circuit breaker, so as to rate limit + * and hedge against timeouts + */ + const circuitBreaker = new CircuitBreaker(fetchPage, breakerOptions) + async function fetchAllPages ({ min, maxTimestamp }) { /** - * Need to convert to seconds, since block timestamp - * from arweave is in seconds - */ + * Need to convert to seconds, since block timestamp + * from arweave is in seconds + */ maxTimestamp = Math.floor(maxTimestamp / 1000) - async function fetchPage ({ min: newMin, maxTimestamp }) { - // deno-fmt-ignore-start - return Promise.resolve({ min: newMin, limit: pageSize }) - .then(variables => { - logger( - 'Loading page of up to %s blocks after height %s up to timestamp %s', - pageSize, - newMin, - maxTimestamp - ) - return variables - }) - .then((variables) => { - return backoff( - () => fetch(GRAPHQL_URL, { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ - query: GET_BLOCKS_QUERY, - variables - }) - }).then(okRes).catch(async (e) => { - logger( - 'Error Encountered when fetching page of block metadata from gateway with minBlock \'%s\' and maxTimestamp \'%s\'', - newMin, - maxTimestamp - ) - throw new Error(`Can not communicate with gateway to retrieve block metadata: ${await strFromFetchError(e)}`) - }), - { maxRetries: 2, delay: 300, log: logger, name: `loadBlockMeta(${JSON.stringify({ newMin, maxTimestamp })})` } - ) - }) - .then(async (res) => { - if (res.ok) return res.json() - logger( - 'Error Encountered when fetching page of block metadata from gateway with minBlock \'%s\' and maxTimestamp \'%s\'', - newMin, - maxTimestamp - ) - throw new Error(`${res.status}: ${await res.text()}`) - }) - .then(path(['data', 'blocks'])) - .then((res) => ({ ...res, maxTimestamp })) - } - async function maybeFetchNext ({ pageInfo, edges, maxTimestamp }) { /** - * HACK to incrementally fetch the correct range of blocks with only - * a timestamp as the right most limit. - * - * (we no longer have a sortKey to extract a block height from) - * - * If the last block has a timestamp greater than the maxTimestamp - * then we're done. - * - * We then slice off the results in the page, not within our range. - * So we overfetch a little on the final page, but at MOST pageSize - 1 - */ + * Base cases: + * - the maxTimestamp has been surpassed by the last element in the latest page + * - there is no next page + * + * HACK to incrementally fetch the correct range of blocks with only + * a timestamp as the right most limit. + * + * (we no longer have a sortKey to extract a block height from) + * + * If the last block has a timestamp greater than the maxTimestamp + * then we're done. + * + * We then slice off the results in the page, not within our range. + * So we overfetch a little on the final page, but at MOST pageSize - 1 + */ const surpassedMaxTimestampIdx = edges.findIndex( pathSatisfies( (timestamp) => timestamp > maxTimestamp, @@ -212,17 +221,12 @@ export function loadBlocksMetaWith ({ ) ) if (surpassedMaxTimestampIdx !== -1) return { pageInfo, edges: edges.slice(0, surpassedMaxTimestampIdx) } - if (!pageInfo.hasNextPage) return { pageInfo, edges } - /** - * Either have reached the end and resolve, - * or fetch the next page and recurse - */ return Promise.resolve({ /** - * The next page will start on the next block - */ + * The next page will start on the next block + */ min: pipe( last, path(['node', 'height']), @@ -230,38 +234,39 @@ export function loadBlocksMetaWith ({ )(edges), maxTimestamp }) - .then(fetchPage) + .then((nextArgs) => circuitBreaker.fire(nextArgs)) .then(maybeFetchNext) + /** + * Recursively concatenate all edges + */ .then(({ pageInfo, edges: e }) => ({ pageInfo, edges: edges.concat(e) })) } /** - * Start with the first page, then keep going - */ - return fetchPage({ min, maxTimestamp }).then(maybeFetchNext) + * Start with the first page, then keep going + */ + return circuitBreaker.fire({ min, maxTimestamp }) + .then(maybeFetchNext) + .catch((e) => { + if (e.message === 'Breaker is open') throw new Error('Can not communicate with gateway to retrieve block metadata (breaker is open)') + else throw e + }) } - const circuitBreaker = new CircuitBreaker(fetchAllPages, breakerOptions) - return (args) => of(args) .chain(fromPromise(({ min, maxTimestamp }) => - circuitBreaker.fire({ min, maxTimestamp }) + fetchAllPages({ min, maxTimestamp }) .then(prop('edges')) .then(pluck('node')) .then(map(block => ({ ...block, /** - * Timestamp from gateway is in seconds, - * but we need milliseconds - */ + * Timestamp from gateway is in seconds, + * but we need milliseconds + */ timestamp: block.timestamp * 1000 }))) - .catch((e) => { - if (e.message === 'Breaker is open') throw new Error('Can not communicate with gateway to retrieve block metadata (breaker is open)') - else throw e - }) - // .then(logger.tap('Loaded blocks meta after height %s up to timestamp %s', min, maxTimestamp)) )) .toPromise() }