-
Notifications
You must be signed in to change notification settings - Fork 8
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor Fetch State to allow dynamically manage partitions #405
Conversation
4f43c5a
to
118b5c6
Compare
) | ||
| Some(nextResponse) => | ||
switch nextResponse.reorgGuard.firstBlockParentNumberAndHash { | ||
| None => Js.Exn.raiseError("Unexpected, nextResponse reorgGuard is missing firstBlockParentNumberAndHash") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually got the error at block 30,948,833 of Gnosis indexer.
@@ -205,6 +225,9 @@ Helper to check if a value exists in an array | |||
}) | |||
interleaved | |||
} | |||
|
|||
@send | |||
external flatten: (array<array<'a>>, @as(json`1`) _) => array<'a> = "flat" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting, never done a binding like this with hardcoded params :) does @as(1) not work as well out of interest?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, it works 😁 I didn't know that it does :)
->Utils.Dict.dangerouslyGetNonOption(chain->ChainMap.Chain.toString) | ||
->Option.mapWithDefault(false, dcs => | ||
dcs->Array.some(dc => | ||
dc.contractType == contractType && dc.contractAddress == contractAddress |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I actually think we need to remove this check. I removed it in all the other places I think this was left by mistake. We are not allowing the same address registered to multiple contracts.
dc.contractType == contractType && dc.contractAddress == contractAddress | |
dc.contractAddress == contractAddress |
fetchedEventQueue: [], | ||
contractAddressMapping: restContractAddressMapping, | ||
dynamicContracts: restDcs, | ||
latestFetchedBlock, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should there be any validation in this function that latestFetchedBlock is exactly the same between the partitions being merged?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought it's not necessary, since the function won't run when the latestFetchedBlock is not the same (according to the setQueryResponse logic). And this is checked by integration tests.
switch query { | ||
| PartitionQuery({partitionId}) | ||
| MergeQuery({partitionId}) => | ||
switch partitions->Array.getIndexBy(p => p.id === partitionId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This actually makes me think we should rather use a dict over an array. Not a big deal though is these arrays will never get massive.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or we could always just maintain indexes by using delete on an index of the array 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
During the refactoring I changed it to dict, but then returned it back, because it was more annoying to write mocks in tests :)
~isFetchingAtHead=fetchState.isFetchingAtHead || | ||
currentBlockHeight <= latestFetchedBlock.blockNumber, | ||
~firstEventBlockNumber=switch newItems->Array.get(0) { | ||
| Some(newFirstItem) => | ||
Utils.Math.minOptInt(fetchState.firstEventBlockNumber, Some(newFirstItem.blockNumber)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like we can just use min and not minOptInt
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@DZakh just checking if you saw this comment
|
||
{ | ||
maxAddrInPartition, | ||
partitions, | ||
partitions: [partition], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is temporary right now right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the PartitionedFetchState is now 100% useless, I didn't remove it in the PR only to minimize already huge diff.
let partitionId = switch query { | ||
| PartitionQuery({partitionId}) | ||
| MergeQuery({partitionId}) => partitionId | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe query should keep shared fields outside of the variant constr payload
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll consider this suggestion, when I work on partition kind.
| PartitionQuery({partitionId, fromBlock, toBlock, contractAddressMapping}) => | ||
chainWorker->ChainWorker.fetchBlockRange( | ||
~fromBlock, | ||
~toBlock, | ||
~contractAddressMapping, | ||
~partitionId, | ||
~chain, | ||
~currentBlockHeight, | ||
~isPreRegisteringDynamicContracts, | ||
~logger, | ||
//Only apply wildcards on the first partition | ||
//to avoid duplicate wildcard queries | ||
~shouldApplyWildcards=partitionId === "0", | ||
) | ||
| MergeQuery({partitionId, fromBlock, toBlock, contractAddressMapping}) => | ||
chainWorker->ChainWorker.fetchBlockRange( | ||
~fromBlock, | ||
~toBlock=Some(toBlock), | ||
~contractAddressMapping, | ||
~partitionId, | ||
~chain, | ||
~currentBlockHeight, | ||
~isPreRegisteringDynamicContracts, | ||
~logger, | ||
//Only apply wildcards on the first partition | ||
//to avoid duplicate wildcard queries | ||
~shouldApplyWildcards=partitionId === "0", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These two seem so similar it looks like we should be able to have more shared data in query. Maybe even just currying the function above would look better 😝
} = chainFetcher | ||
|
||
await chainFetcher.sourceManager->SourceManager.fetchNext( | ||
~fetchState=partitionedFetchState.partitions->Js.Array2.unsafe_get(0), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think lets get rid of partitionedFetchState ASAP
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's do it in the next PR
let mergedRegister = switch registerByLatestBlock->Utils.Dict.dangerouslyGetNonOption(key) { | ||
| Some(next) => register->mergeWithNextRegister(~next) | ||
| None => register | ||
let firstPartition = partitions->Js.Array2.unsafe_get(0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a note here @DZakh, I've been testing your branch on an indexer that reorgs a lot this morning and a reorg crashes the indexer here. First partition does not exist in the case I ran in to.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I fixed it yesterday, could you double check that you have the latest version
10fdcd9
@@ -100,16 +100,14 @@ let checkContractIsInCurrentRegistrations = ( | |||
~dynamicContractRegistrations: option<dynamicContractRegistrations>, | |||
~chain, | |||
~contractAddress, | |||
~contractType, | |||
~contractType as _, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we remove this field? Or add todo
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll do in the next PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cool just don't want to forget about it 👍🏼
let fetchStatesMap = fetchStatesMap->ChainMap.map(v => { | ||
{ | ||
...v, | ||
partitionedFetchState: v.partitionedFetchState->PartitionedFetchState.syncStateOnQueueUpdate, | ||
} | ||
}) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe this should be done inside getFetchStateWithData function on the shouldDeepCopy flag or with a different flag.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
During shouldDeepCopy
, we don't know the number of events taken from the queue to batch, so we can't correctly update the state. I was actually thinking of changing the way how the batch is created so the part of the code is more reliable, but I scoped it out since this was not mandatory for the code to work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah I see, well what should probably happen is the popItem function should decrement the queue size. But all good
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've taken in the most I can with reviewing. Everything looks good to me. It's difficult to comprehend the whole change and everything it touches but I think we can move forward 👍🏼
Nice refactor Dmitry, lots of improvements all over.
Refactor FetchState to allow dynamically control partitions.