Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Fetch State to allow dynamically manage partitions #405

Merged
merged 23 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions codegenerator/cli/npm/envio/src/Utils.res
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,13 @@ Helper to check if a value exists in an array
Index > array length or < 0 results in a copy of the array
*/
let removeAtIndex = (array, index) => {
array
->Js.Array2.slice(~start=0, ~end_=index)
->Js.Array2.concat(array->Js.Array2.sliceFrom(index + 1))
if index < 0 {
array->Array.copy
} else {
array
->Js.Array2.slice(~start=0, ~end_=index)
->Js.Array2.concat(array->Js.Array2.sliceFrom(index + 1))
}
}

let last = (arr: array<'a>): option<'a> => arr->Belt.Array.get(arr->Array.length - 1)
Expand Down
8 changes: 5 additions & 3 deletions codegenerator/cli/templates/static/codegen/src/Benchmark.res
Original file line number Diff line number Diff line change
Expand Up @@ -266,19 +266,21 @@ let addBlockRangeFetched = (
~fromBlock,
~toBlock,
~numEvents,
~partitionId,
~numAddresses,
~queryName
) => {
let group = `BlockRangeFetched Summary for Chain ${chainId->Belt.Int.toString} Partition ${partitionId}`
let group = `BlockRangeFetched Summary for Chain ${chainId->Belt.Int.toString} ${queryName}`
let add = (label, value) => data->Data.addSummaryData(~group, ~label, ~value=Utils.magic(value))

add("Total Time Elapsed (ms)", totalTimeElapsed)
add("Parsing Time Elapsed (ms)", parsingTimeElapsed)
add("Page Fetch Time (ms)", pageFetchTime)
add("Num Events", numEvents)
add("Num Addresses", numAddresses)
add("Block Range Size", toBlock - fromBlock)

data->Data.incrementMillis(
~label=`Total Time Fetching Chain ${chainId->Belt.Int.toString} Partition ${partitionId}`,
~label=`Total Time Fetching Chain ${chainId->Belt.Int.toString} ${queryName}`,
~amount=totalTimeElapsed,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,14 @@ let checkContractIsInCurrentRegistrations = (
~dynamicContractRegistrations: option<dynamicContractRegistrations>,
~chain,
~contractAddress,
~contractType,
~contractType as _,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove this field? Or add todo

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll do in the next PR

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool just don't want to forget about it 👍🏼

) => {
switch dynamicContractRegistrations {
| Some(dynamicContracts) =>
dynamicContracts.dynamicContractsByChain
->Utils.Dict.dangerouslyGetNonOption(chain->ChainMap.Chain.toString)
->Option.mapWithDefault(false, dcs =>
dcs->Array.some(dc =>
dc.contractType == contractType && dc.contractAddress == contractAddress
)
dcs->Array.some(dc => dc.contractAddress == contractAddress)
)

| None => false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,6 @@ module PartitionBlockFetched = {
}
}

// TODO: implement this metric that updates in batches, currently unused
let processedUntilHeight = PromClient.Gauge.makeGauge({
"name": "chain_block_height_processed",
"help": "Block height processed by indexer",
Expand Down Expand Up @@ -277,7 +276,7 @@ let setProcessedUntilHeight = (~blockNumber, ~chain) => {
}

let setFetchedEventsUntilHeight = (~blockNumber, ~chain) => {
processedUntilHeight
fetchedEventsUntilHeight
->PromClient.Gauge.labels({"chainId": chain->ChainMap.Chain.toString})
->PromClient.Gauge.set(blockNumber)
}
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ type batchRes = {
let createBatch = (self: t, ~maxBatchSize: int, ~onlyBelowReorgThreshold: bool) => {
let refTime = Hrtime.makeTimer()

let {arbitraryEventQueue, chainFetchers} = self
let {arbitraryEventQueue} = self
//Make a copy of the queues and fetch states since we are going to mutate them
let arbitraryEventQueue = arbitraryEventQueue->Array.copy
let fetchStatesMap = self->getFetchStateWithData(~shouldDeepCopy=true)
Expand All @@ -409,15 +409,23 @@ let createBatch = (self: t, ~maxBatchSize: int, ~onlyBelowReorgThreshold: bool)
~onlyBelowReorgThreshold,
)

// Needed to recalculate the computed queue sizes
let fetchStatesMap = fetchStatesMap->ChainMap.map(v => {
{
...v,
partitionedFetchState: v.partitionedFetchState->PartitionedFetchState.syncStateOnQueueUpdate,
}
})

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this should be done inside getFetchStateWithData function on the shouldDeepCopy flag or with a different flag.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

During shouldDeepCopy, we don't know the number of events taken from the queue to batch, so we can't correctly update the state. I was actually thinking of changing the way how the batch is created so the part of the code is more reliable, but I scoped it out since this was not mandatory for the code to work.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see, well what should probably happen is the popItem function should decrement the queue size. But all good

let batchSize = batch->Array.length

let val = if batchSize > 0 {
let fetchedEventsBuffer =
chainFetchers
->ChainMap.values
->Array.map(fetcher => (
fetcher.chainConfig.chain->ChainMap.Chain.toString,
fetcher.partitionedFetchState->PartitionedFetchState.queueSize,
fetchStatesMap
->ChainMap.entries
->Array.map(((chain, v)) => (
chain->ChainMap.Chain.toString,
v.partitionedFetchState->PartitionedFetchState.queueSize,
))
->Array.concat([("arbitrary", self.arbitraryEventQueue->Array.length)])
->Js.Dict.fromArray
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ type t = {
isFetchingAtHead: bool,
maxAddrInPartition: int,
batchSize: int,
firstEventBlockNumber: option<int>,
// Fields computed by updateInternal
latestFullyFetchedBlock: blockNumberAndTimestamp,
queueSize: int,
firstEventBlockNumber: option<int>,
}

let shallowCopyRegister = (register: register) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,8 +323,6 @@ let handlePartitionQueryResponse = (
} = response
let {lastBlockScannedData} = reorgGuard

let partitionId = query->FetchState.queryPartitionId

if Env.Benchmark.shouldSaveData {
Benchmark.addBlockRangeFetched(
~totalTimeElapsed=stats.totalTimeElapsed,
Expand All @@ -334,7 +332,15 @@ let handlePartitionQueryResponse = (
~fromBlock=fromBlockQueried,
~toBlock=latestFetchedBlockNumber,
~numEvents=parsedQueueItems->Array.length,
~partitionId,
~numAddresses=switch query {
| PartitionQuery({contractAddressMapping})
| MergeQuery({contractAddressMapping}) => contractAddressMapping
}->ContractAddressingMap.addressCount,
~queryName=switch query {
| PartitionQuery({partitionId}) => `Partition ${partitionId}`
// Group all merge queries into a single summary
| MergeQuery(_) => `Merge Query`
},
)
}

Expand Down Expand Up @@ -571,7 +577,7 @@ let actionReducer = (state: t, action: action) => {
...state,
chainManager: updatedChainManager,
}

let nextState = updateLatestProcessedBlocks(~state=nextState, ~latestProcessedBlocks)
// This ONLY updates the metrics - no logic is performed.
nextState.chainManager.chainFetchers
->ChainMap.entries
Expand All @@ -581,8 +587,11 @@ let actionReducer = (state: t, action: action) => {
).blockNumber

Prometheus.setFetchedEventsUntilHeight(~blockNumber=highestFetchedBlockOnChain, ~chain)
switch chainFetcher.latestProcessedBlock {
| Some(blockNumber) => Prometheus.setProcessedUntilHeight(~blockNumber, ~chain)
| None => ()
}
})
let nextState = updateLatestProcessedBlocks(~state=nextState, ~latestProcessedBlocks)
(nextState, nextTasks)

| EventBatchProcessed({latestProcessedBlocks, dynamicContractRegistrations: None}) =>
Expand Down Expand Up @@ -625,9 +634,7 @@ let actionReducer = (state: t, action: action) => {
let chainFetchers = state.chainManager.chainFetchers->ChainMap.mapWithKey((chain, cf) => {
{
...cf,
partitionedFetchState: (
fetchStatesMap->ChainMap.get(chain)
).partitionedFetchState->PartitionedFetchState.syncStateOnQueueUpdate,
partitionedFetchState: ChainMap.get(fetchStatesMap, chain).partitionedFetchState,
}
})

Expand Down
5 changes: 1 addition & 4 deletions scenarios/erc20_multichain_factory/test/ChainDataHelpers.res
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,10 @@ module Stubs = {
~chainWorker,
~currentBlockHeight,
~chain,
~dispatchAction,
~isPreRegisteringDynamicContracts,
) => {
(logger, currentBlockHeight, chainWorker, isPreRegisteringDynamicContracts)->ignore

let response = stubData->getMockChainData(chain)->MockChainData.executeQuery(query)
dispatchAction(GlobalState.BlockRangeResponse(chain, response))
stubData->getMockChainData(chain)->MockChainData.executeQuery(query)->Ok
}

//Stub for getting block hashes instead of the worker
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ describe("dynamic contract event processing test", () => {
~checkContractIsRegistered=(~chain as _, ~contractAddress as _, ~contractName as _) => false,
)
switch res {
| Ok({dynamicContractRegistrations: Some({registrations, unprocessedBatch})}) =>
let individualRegistrations = registrations->Array.flatMap(r => r.dynamicContracts)
| Ok({dynamicContractRegistrations: Some({dynamicContractsByChain, unprocessedBatch})}) =>
let individualRegistrations = dynamicContractsByChain->Js.Dict.values->Utils.Array.flatten
Assert.equal(
individualRegistrations->Js.Array2.length,
1,
Expand Down Expand Up @@ -119,8 +119,8 @@ describe("dynamic contract event processing test", () => {
contractAddress == Mock.mockDyamicToken3,
)
switch res {
| Ok({dynamicContractRegistrations: Some({registrations, unprocessedBatch})}) =>
let individualRegistrations = registrations->Array.flatMap(r => r.dynamicContracts)
| Ok({dynamicContractRegistrations: Some({dynamicContractsByChain, unprocessedBatch})}) =>
let individualRegistrations = dynamicContractsByChain->Js.Dict.values->Utils.Array.flatten
Assert.equal(
individualRegistrations->Js.Array2.length,
1,
Expand Down Expand Up @@ -149,8 +149,8 @@ describe("dynamic contract event processing test", () => {
contractAddress == Mock.mockDyamicToken3,
)
switch res {
| Ok({dynamicContractRegistrations: Some({registrations, unprocessedBatch})}) =>
let individualRegistrations = registrations->Array.flatMap(r => r.dynamicContracts)
| Ok({dynamicContractRegistrations: Some({dynamicContractsByChain, unprocessedBatch})}) =>
let individualRegistrations = dynamicContractsByChain->Js.Dict.values->Utils.Array.flatten
Assert.equal(
individualRegistrations->Js.Array2.length,
1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,20 @@ describe("Dynamic contract restart resistance test", () => {
DbHelpers.runUpDownMigration()
})

let getFetchingDcAddressesFromDbState = async (~chainId=1) => {
let chainFetcher = await ChainFetcher.makeFromDbState(
config.chainMap->ChainMap.get(ChainMap.Chain.makeUnsafe(~chainId)),
~maxAddrInPartition=Env.maxAddrInPartition,
)

let dcs =
chainFetcher.partitionedFetchState.partitions->Array.flatMap(p =>
p.partitions->Array.flatMap(p => p.dynamicContracts->Array.map(dc => dc.contractAddress))
)

dcs
}

Async.it(
"Indexer should restart with only the dynamic contracts up to the block that was processed",
async () => {
Expand Down Expand Up @@ -199,24 +213,9 @@ describe("Dynamic contract restart resistance test", () => {

let chainConfig = config.chainMap->ChainMap.get(ChainMap.Chain.makeUnsafe(~chainId=1))

let restartedChainFetcher = await ChainFetcher.makeFromDbState(
chainConfig,
~maxAddrInPartition=Env.maxAddrInPartition,
)

let restartedFetchState = switch restartedChainFetcher.partitionedFetchState.partitions {
| [partition] => partition
| _ => failwith("No partitions found in restarted chain fetcher")
}

let dynamicContracts =
restartedFetchState.mostBehindRegister.dynamicContracts
->Belt.Map.valuesToArray
->Array.flatMap(set => set->Belt.Set.String.toArray)

Assert.deepEqual(
dynamicContracts,
[Mock.mockDynamicToken1->Address.toString],
await getFetchingDcAddressesFromDbState(),
[Mock.mockDynamicToken1],
~message="Should have registered only the dynamic contract up to the block that was processed",
)

Expand All @@ -242,24 +241,14 @@ describe("Dynamic contract restart resistance test", () => {
~maxAddrInPartition=Env.maxAddrInPartition,
)

let restartedFetchState = switch restartedChainFetcher.partitionedFetchState.partitions {
| [partition] => partition
| _ => failwith("No partitions found in restarted chain fetcher with")
}

let dynamicContracts =
restartedFetchState.mostBehindRegister.dynamicContracts
->Belt.Map.valuesToArray
->Array.flatMap(set => set->Belt.Set.String.toArray)

Assert.deepEqual(
restartedChainFetcher.dynamicContractPreRegistration->Option.getExn->Js.Dict.keys,
[Mock.mockDynamicToken1->Address.toString, Mock.mockDynamicToken2->Address.toString],
~message="Should return all the dynamic contracts related to handler that uses preRegistration",
)

Assert.deepEqual(
dynamicContracts,
await getFetchingDcAddressesFromDbState(),
[],
~message="Should have no dynamic contracts yet since this tests the case starting in preregistration",
)
Expand Down Expand Up @@ -300,22 +289,9 @@ describe("Dynamic contract restart resistance test", () => {
// toBlock: 3
await dispatchAllTasks()

let restartedChainFetcher = await ChainFetcher.makeFromDbState(
chainConfig,
~maxAddrInPartition=Env.maxAddrInPartition,
)

let restartedFetchState =
restartedChainFetcher.partitionedFetchState.partitions->Array.get(0)->Option.getExn

let dynamicContracts =
restartedFetchState.mostBehindRegister.dynamicContracts
->Belt.Map.valuesToArray
->Array.flatMap(set => set->Belt.Set.String.toArray)

Assert.deepEqual(
dynamicContracts,
[Mock.mockDynamicToken1->Address.toString, Mock.mockDynamicToken2->Address.toString],
await getFetchingDcAddressesFromDbState(),
[Mock.mockDynamicToken1, Mock.mockDynamicToken2],
~message="Should have registered both dynamic contracts up to the block that was processed",
)
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,11 @@ describe("Dynamic contract rollback test", () => {

let getFetchState = chain => {
let cf = chain->getChainFetcher
cf.partitionedFetchState
cf.partitionedFetchState.partitions->Js.Array2.unsafe_get(0)
}

let getLatestFetchedBlock = chain => {
chain->getFetchState->PartitionedFetchState.getLatestFullyFetchedBlock
chain->getFetchState->FetchState.getLatestFullyFetchedBlock
}

let getTokenBalance = (~accountAddress) => chain => {
Expand Down Expand Up @@ -361,13 +361,12 @@ describe("Dynamic contract rollback test", () => {
NextQuery(CheckAllChains),
]

let getFetchStateRegisterId = () =>
switch getFetchState(Mock.Chain1.chain).partitions {
| [p] => (p->FetchState.mergeBeforeNextQuery).mostBehindRegister.id
| _ => raise(Not_found)
}
Assert.deepEqual(
getFetchState(Mock.Chain1.chain).partitions->Array.map(p => p.id),
["0"],
~message=`Should have only one partition`,
)

Assert.deepEqual(FetchState.rootRegisterId, getFetchStateRegisterId())
//Process batch 2 of events
//And make queries (C)
await dispatchAllTasks()
Expand Down Expand Up @@ -404,10 +403,26 @@ describe("Dynamic contract rollback test", () => {
~message="Next round of tasks after query C",
)

let partitions = getFetchState(Mock.Chain1.chain).partitions
Assert.deepEqual(
getFetchStateRegisterId(),
FetchState.makeDynamicContractRegisterId({blockNumber: 3, logIndex: 0}),
partitions->Array.map(p => p.id),
["0", "1"],
~message=`Should get a dc partition`,
)
let dcPartition = partitions->Js.Array2.unsafe_get(1)
Assert.deepEqual(
dcPartition.latestFetchedBlock,
{
{blockNumber: 2, blockTimestamp: 0}
},
~message=`Should get a root partition`,
)
Assert.deepEqual(
dcPartition.dynamicContracts->Js.Array2.length,
1,
~message=`Should have a single dc`,
)

await makeAssertions(
~queryName="C",
~chain1LatestFetchBlock=2, //dynamic contract registered and fetchState set to block before registration (dyn contract query not yet made)
Expand Down
Loading
Loading