Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(cu): circuit breaker should wrap each single page load, not all of them at once #1036

Merged
merged 2 commits into from
Oct 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion servers/cu/src/bootstrap.js
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ export const createApis = async (ctx) => {
ctx.logger('Process File Checkpoint creation is set to "%s"', !ctx.DISABLE_PROCESS_FILE_CHECKPOINT_CREATION)
ctx.logger('Ignoring Arweave Checkpoints for processes [ %s ]', ctx.PROCESS_IGNORE_ARWEAVE_CHECKPOINTS.join(', '))
ctx.logger('Ignoring Arweave Checkpoints [ %s ]', ctx.IGNORE_ARWEAVE_CHECKPOINTS.join(', '))
ctx.logger('Trusting Arweave Checkpoints [ %s ]', ctx.PROCESS_CHECKPOINT_TRUSTED_OWNERS.join(', '))
ctx.logger('Trusting Arweave Checkpoint Owners [ %s ]', ctx.PROCESS_CHECKPOINT_TRUSTED_OWNERS.join(', '))
ctx.logger('Allowing only process owners [ %s ]', ctx.ALLOW_OWNERS.join(', '))
ctx.logger('Restricting processes [ %s ]', ctx.RESTRICT_PROCESSES.join(', '))
ctx.logger('Allowing only processes [ %s ]', ctx.ALLOW_PROCESSES.join(', '))
Expand Down
167 changes: 86 additions & 81 deletions servers/cu/src/effects/ao-block.js
Original file line number Diff line number Diff line change
Expand Up @@ -140,128 +140,133 @@ export function loadBlocksMetaWith ({
}
`

/**
 * Load one page of block metadata from the gateway.
 *
 * Only `min` and the page size are sent as query variables; `maxTimestamp`
 * is used for logging and is echoed back on the result so the caller can
 * decide whether to keep paging.
 *
 * @param {object} args
 * @param {*} args.min - value sent as the query's `min` variable
 * @param {*} args.maxTimestamp - upper bound, passed through untouched
 * @returns {Promise<object>} the `data.blocks` portion of the gateway
 *   response, shallow-copied with `maxTimestamp` added
 * @throws {Error} when the request fails (after backoff gives up) or the
 *   response is not ok
 */
async function fetchPage ({ min, maxTimestamp }) {
  const variables = { min, limit: pageSize }

  // The same message is logged for a failed request and for a non-ok response
  const logGatewayFailure = () => logger(
    'Error Encountered when fetching page of block metadata from gateway with minBlock \'%s\' and maxTimestamp \'%s\'',
    min,
    maxTimestamp
  )

  // A single attempt against the gateway; backoff decides whether to invoke it again
  const requestPage = () => fetch(GRAPHQL_URL, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      query: GET_BLOCKS_QUERY,
      variables
    })
  })
    .then(okRes)
    .catch(async (err) => {
      logGatewayFailure()
      throw new Error(`Can not communicate with gateway to retrieve block metadata: ${await strFromFetchError(err)}`)
    })

  logger(
    'Loading page of up to %s blocks after height %s up to timestamp %s',
    pageSize,
    min,
    maxTimestamp
  )

  const res = await backoff(
    requestPage,
    { maxRetries: 2, delay: 300, log: logger, name: `loadBlockMeta(${JSON.stringify({ newMin: min, maxTimestamp })})` }
  )

  if (!res.ok) {
    logGatewayFailure()
    throw new Error(`${res.status}: ${await res.text()}`)
  }

  const blocks = path(['data', 'blocks'])(await res.json())
  return { ...blocks, maxTimestamp }
}

/**
 * Circuit breaker around fetchPage: each individual page request is issued
 * via circuitBreaker.fire(...), so the breaker applies to every single page
 * load rather than to the whole multi-page walk.
 *
 * Intended to rate limit and hedge against timeouts; the actual limits come
 * from breakerOptions, which is defined outside this view.
 */
const circuitBreaker = new CircuitBreaker(fetchPage, breakerOptions)

async function fetchAllPages ({ min, maxTimestamp }) {
/**
* Need to convert to seconds, since block timestamp
* from arweave is in seconds
*/
* Need to convert to seconds, since block timestamp
* from arweave is in seconds
*/
maxTimestamp = Math.floor(maxTimestamp / 1000)

/**
 * Fetch a single page of block metadata from the gateway.
 *
 * Posts GET_BLOCKS_QUERY to GRAPHQL_URL with the variables
 * { min: newMin, limit: pageSize }. maxTimestamp is NOT sent to the gateway;
 * it is only logged and copied onto the resolved value.
 *
 * @param {object} args
 * @param {*} args.min - sent as the query's `min` variable (bound locally as newMin)
 * @param {*} args.maxTimestamp - passed through onto the result unchanged
 * @returns {Promise<object>} the value found at data.blocks in the JSON
 *   response, shallow-copied with maxTimestamp added
 * @throws {Error} 'Can not communicate with gateway...' when the request
 *   (or okRes) rejects, or '<status>: <body text>' when res.ok is falsy
 */
async function fetchPage ({ min: newMin, maxTimestamp }) {
// deno-fmt-ignore-start
return Promise.resolve({ min: newMin, limit: pageSize })
.then(variables => {
logger(
'Loading page of up to %s blocks after height %s up to timestamp %s',
pageSize,
newMin,
maxTimestamp
)
return variables
})
.then((variables) => {
// The request thunk is handed to backoff together with retry settings
// (maxRetries: 2, delay: 300); backoff itself is defined elsewhere.
return backoff(
() => fetch(GRAPHQL_URL, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
query: GET_BLOCKS_QUERY,
variables
})
}).then(okRes).catch(async (e) => {
// Log with the paging context, then rethrow as a gateway communication error
logger(
'Error Encountered when fetching page of block metadata from gateway with minBlock \'%s\' and maxTimestamp \'%s\'',
newMin,
maxTimestamp
)
throw new Error(`Can not communicate with gateway to retrieve block metadata: ${await strFromFetchError(e)}`)
}),
{ maxRetries: 2, delay: 300, log: logger, name: `loadBlockMeta(${JSON.stringify({ newMin, maxTimestamp })})` }
)
})
.then(async (res) => {
// NOTE(review): okRes presumably already rejects non-ok responses, which
// would make this check a second line of defense - confirm against okRes.
if (res.ok) return res.json()
logger(
'Error Encountered when fetching page of block metadata from gateway with minBlock \'%s\' and maxTimestamp \'%s\'',
newMin,
maxTimestamp
)
throw new Error(`${res.status}: ${await res.text()}`)
})
// Keep only the blocks payload of the response body
.then(path(['data', 'blocks']))
// Carry maxTimestamp along with the page
.then((res) => ({ ...res, maxTimestamp }))
}

async function maybeFetchNext ({ pageInfo, edges, maxTimestamp }) {
/**
* HACK to incrementally fetch the correct range of blocks with only
* a timestamp as the right most limit.
*
* (we no longer have a sortKey to extract a block height from)
*
* If the last block has a timestamp greater than the maxTimestamp
* then we're done.
*
* We then slice off the results in the page, not within our range.
* So we overfetch a little on the final page, but at MOST pageSize - 1
*/
* Base cases:
* - the maxTimestamp has been surpassed by the last element in the latest page
* - there is no next page
*
* HACK to incrementally fetch the correct range of blocks with only
* a timestamp as the right most limit.
*
* (we no longer have a sortKey to extract a block height from)
*
* If the last block has a timestamp greater than the maxTimestamp
* then we're done.
*
* We then slice off the results in the page, not within our range.
* So we overfetch a little on the final page, but at MOST pageSize - 1
*/
const surpassedMaxTimestampIdx = edges.findIndex(
pathSatisfies(
(timestamp) => timestamp > maxTimestamp,
['node', 'timestamp']
)
)
if (surpassedMaxTimestampIdx !== -1) return { pageInfo, edges: edges.slice(0, surpassedMaxTimestampIdx) }

if (!pageInfo.hasNextPage) return { pageInfo, edges }

/**
* Either have reached the end and resolve,
* or fetch the next page and recurse
*/
return Promise.resolve({
/**
* The next page will start on the next block
*/
* The next page will start on the next block
*/
min: pipe(
last,
path(['node', 'height']),
height => height + 1
)(edges),
maxTimestamp
})
.then(fetchPage)
.then((nextArgs) => circuitBreaker.fire(nextArgs))
.then(maybeFetchNext)
/**
* Recursively concatenate all edges
*/
.then(({ pageInfo, edges: e }) => ({ pageInfo, edges: edges.concat(e) }))
}

/**
* Start with the first page, then keep going
*/
return fetchPage({ min, maxTimestamp }).then(maybeFetchNext)
* Start with the first page, then keep going
*/
return circuitBreaker.fire({ min, maxTimestamp })
.then(maybeFetchNext)
.catch((e) => {
if (e.message === 'Breaker is open') throw new Error('Can not communicate with gateway to retrieve block metadata (breaker is open)')
else throw e
})
}

const circuitBreaker = new CircuitBreaker(fetchAllPages, breakerOptions)

return (args) =>
of(args)
.chain(fromPromise(({ min, maxTimestamp }) =>
circuitBreaker.fire({ min, maxTimestamp })
fetchAllPages({ min, maxTimestamp })
.then(prop('edges'))
.then(pluck('node'))
.then(map(block => ({
...block,
/**
* Timestamp from gateway is in seconds,
* but we need milliseconds
*/
* Timestamp from gateway is in seconds,
* but we need milliseconds
*/
timestamp: block.timestamp * 1000
})))
.catch((e) => {
if (e.message === 'Breaker is open') throw new Error('Can not communicate with gateway to retrieve block metadata (breaker is open)')
else throw e
})
// .then(logger.tap('Loaded blocks meta after height %s up to timestamp %s', min, maxTimestamp))
))
.toPromise()
}