diff --git a/codegenerator/cli/templates/static/codegen/src/Config.res b/codegenerator/cli/templates/static/codegen/src/Config.res index 8e06062a6..cf77e3a27 100644 --- a/codegenerator/cli/templates/static/codegen/src/Config.res +++ b/codegenerator/cli/templates/static/codegen/src/Config.res @@ -48,6 +48,19 @@ let shouldPreRegisterDynamicContracts = (chainConfig: chainConfig) => { }) } +let hasWildcard = (chainConfig: chainConfig) => { + chainConfig.contracts->Array.some(contract => { + contract.events->Array.some(event => { + let module(Event) = event + + let {isWildcard} = + Event.handlerRegister->Types.HandlerTypes.Register.getEventOptions + + isWildcard + }) + }) +} + type historyFlag = FullHistory | MinHistory type rollbackFlag = RollbackOnReorg | NoRollback type historyConfig = {rollbackFlag: rollbackFlag, historyFlag: historyFlag} diff --git a/codegenerator/cli/templates/static/codegen/src/Index.res b/codegenerator/cli/templates/static/codegen/src/Index.res index 373efbb3c..1e4d74132 100644 --- a/codegenerator/cli/templates/static/codegen/src/Index.res +++ b/codegenerator/cli/templates/static/codegen/src/Index.res @@ -71,9 +71,9 @@ let makeAppState = (globalState: GlobalState.t): EnvioInkApp.appState => { globalState.chainManager.chainFetchers ->ChainMap.values ->Array.map(cf => { - let {numEventsProcessed, fetchState, numBatchesFetched} = cf + let {numEventsProcessed, partitionedFetchState, numBatchesFetched} = cf let latestFetchedBlockNumber = PartitionedFetchState.getLatestFullyFetchedBlock( - fetchState, + partitionedFetchState, ).blockNumber let hasProcessedToEndblock = cf->ChainFetcher.hasProcessedToEndblock let currentBlockHeight = diff --git a/codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res b/codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res index 9d632f307..9bf3414ab 100644 --- a/codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res +++ b/codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res @@ -10,7 +10,7 @@ type processingFilter = { type addressToDynContractLookup = dict type t = { logger: Pino.t, - fetchState: PartitionedFetchState.t, + partitionedFetchState: PartitionedFetchState.t, sourceManager: SourceManager.t, chainConfig: Config.chainConfig, //The latest known block of the chain @@ -51,22 +51,25 @@ let make = ( let module(ChainWorker) = chainConfig.chainWorker logger->Logging.childInfo("Initializing ChainFetcher with " ++ ChainWorker.name ++ " worker") - let fetchState = PartitionedFetchState.make( + let isPreRegisteringDynamicContracts = dynamicContractPreRegistration->Option.isSome + let partitionedFetchState = PartitionedFetchState.make( ~maxAddrInPartition, ~staticContracts, ~dynamicContractRegistrations, ~startBlock, ~endBlock, + ~isPreRegisteringDynamicContracts, + ~hasWildcard=Config.hasWildcard(chainConfig), ~logger, ) { logger, chainConfig, - sourceManager: SourceManager.make(~maxPartitionConcurrency=Env.maxPartitionConcurrency, ~logger), + sourceManager: SourceManager.make(~maxPartitionConcurrency=Env.maxPartitionConcurrency, ~endBlock, ~logger), lastBlockScannedHashes, currentBlockHeight: 0, - fetchState, + partitionedFetchState, dbFirstEventBlockNumber, latestProcessedBlock, timestampCaughtUpToHeadOrEndblock, @@ -303,7 +306,7 @@ let applyProcessingFilters = ( //any that meet the cleanup condition let cleanUpProcessingFilters = ( processingFilters: array, - ~fetchState as {partitions}: PartitionedFetchState.t, + ~partitionedFetchState as {partitions}: PartitionedFetchState.t, ) => { switch processingFilters->Array.keep(processingFilter => partitions->Array.reduce(false, (accum, partition) => { @@ -333,7 +336,7 @@ let updateFetchState = ( | Some(processingFilters) => fetchedEvents->applyProcessingFilters(~processingFilters) } - self.fetchState + self.partitionedFetchState ->PartitionedFetchState.update( ~id, ~latestFetchedBlock={ @@ -344,12 +347,12 @@ let updateFetchState = ( ~currentBlockHeight, ~chain=self.chainConfig.chain, ) - ->Result.map(fetchState => { + ->Result.map(partitionedFetchState => { { ...self, - fetchState, + partitionedFetchState, processingFilters: switch self.processingFilters { - | Some(processingFilters) => processingFilters->cleanUpProcessingFilters(~fetchState) + | Some(processingFilters) => processingFilters->cleanUpProcessingFilters(~partitionedFetchState) | None => None }, } @@ -368,7 +371,7 @@ let hasProcessedToEndblock = (self: t) => { } let hasNoMoreEventsToProcess = (self: t, ~hasArbQueueEvents) => { - !hasArbQueueEvents && self.fetchState->PartitionedFetchState.queueSize === 0 + !hasArbQueueEvents && self.partitionedFetchState->PartitionedFetchState.queueSize === 0 } /** @@ -424,12 +427,12 @@ let getLastScannedBlockData = lastBlockData => { } let isFetchingAtHead = (chainFetcher: t) => - chainFetcher.fetchState->PartitionedFetchState.isFetchingAtHead + chainFetcher.partitionedFetchState->PartitionedFetchState.isFetchingAtHead let getFirstEventBlockNumber = (chainFetcher: t) => Utils.Math.minOptInt( chainFetcher.dbFirstEventBlockNumber, - chainFetcher.fetchState->PartitionedFetchState.getFirstEventBlockNumber, + chainFetcher.partitionedFetchState->PartitionedFetchState.getFirstEventBlockNumber, ) let isPreRegisteringDynamicContracts = (chainFetcher: t) => diff --git a/codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res b/codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res index c3080c7e3..c12c9685a 100644 --- a/codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res +++ b/codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res @@ -326,8 +326,8 @@ let getFetchStateWithData = (self: t, ~shouldDeepCopy=false): ChainMap.tChainMap.map(cf => { { partitionedFetchState: shouldDeepCopy - ? cf.fetchState->PartitionedFetchState.copy - : cf.fetchState, + ? cf.partitionedFetchState->PartitionedFetchState.copy + : cf.partitionedFetchState, heighestBlockBelowThreshold: cf->ChainFetcher.getHeighestBlockBelowThreshold, } }) @@ -417,7 +417,7 @@ let createBatch = (self: t, ~maxBatchSize: int, ~onlyBelowReorgThreshold: bool) ->ChainMap.values ->Array.map(fetcher => ( fetcher.chainConfig.chain->ChainMap.Chain.toString, - fetcher.fetchState->PartitionedFetchState.queueSize, + fetcher.partitionedFetchState->PartitionedFetchState.queueSize, )) ->Array.concat([("arbitrary", self.arbitraryEventQueue->Array.length)]) ->Js.Dict.fromArray diff --git a/codegenerator/cli/templates/static/codegen/src/eventFetching/FetchState.res b/codegenerator/cli/templates/static/codegen/src/eventFetching/FetchState.res index 17420c9a8..cb20c18f2 100644 --- a/codegenerator/cli/templates/static/codegen/src/eventFetching/FetchState.res +++ b/codegenerator/cli/templates/static/codegen/src/eventFetching/FetchState.res @@ -110,7 +110,7 @@ type rec register = { ...registerData, } and registerType = - | RootRegister({endBlock: option}) + | RootRegister | DynamicContractRegister({id: EventUtils.eventIndex, nextRegister: register}) type dynamicContractRegistration = { @@ -119,7 +119,10 @@ type dynamicContractRegistration = { registeringEventChain: ChainMap.Chain.t, dynamicContracts: array, } +type kind = Normal | Wildcard type t = { + partitionId: int, + kind: kind, baseRegister: register, pendingDynamicContracts: array, isFetchingAtHead: bool, @@ -189,7 +192,7 @@ let shallowCopyRegister = (register: register) => { let copy = (self: t) => { let rec loop = (register: register, ~parent=?) => switch register.registerType { - | RootRegister(_) => + | RootRegister => let copied = register->shallowCopyRegister switch parent { | Some(parent) => parent->Parent.joinChild(copied) @@ -204,6 +207,8 @@ let copy = (self: t) => { let baseRegister = loop(self.baseRegister) let pendingDynamicContracts = self.pendingDynamicContracts->Array.copy { + partitionId: self.partitionId, + kind: self.kind, baseRegister, pendingDynamicContracts, isFetchingAtHead: self.isFetchingAtHead, @@ -268,7 +273,7 @@ let mergeIntoNextRegistered = (self: register) => { ), }, } - | RootRegister(_) => self //already merged + | RootRegister => self //already merged } } @@ -290,7 +295,7 @@ Constructs id from a register */ let getRegisterId = (self: register) => { switch self.registerType { - | RootRegister(_) => Root + | RootRegister => Root | DynamicContractRegister({id}) => DynamicContract(id) } } @@ -338,7 +343,7 @@ let rec updateInternal = ( } switch (register.registerType, id) { - | (RootRegister(_), Root) => + | (RootRegister, Root) => register ->updateRegister(~reversedNewItems, ~latestFetchedBlock) ->handleParent @@ -353,7 +358,7 @@ let rec updateInternal = ( ~reversedNewItems, ~parent=register->Parent.make(~dynamicContractId, ~parent), ) - | (RootRegister(_), DynamicContract(_)) => Error(UnexpectedRegisterDoesNotExist(id)) + | (RootRegister, DynamicContract(_)) => Error(UnexpectedRegisterDoesNotExist(id)) } } @@ -425,7 +430,7 @@ let rec addDynamicContractRegister = ( let latestFetchedBlockNumber = registeringEventBlockNumber - 1 switch self.registerType { - | RootRegister(_) => self->addToHead + | RootRegister => self->addToHead | DynamicContractRegister(_) if latestFetchedBlockNumber <= self.latestFetchedBlock.blockNumber => self->addToHead | DynamicContractRegister({id: dynamicContractId, nextRegister}) => @@ -470,7 +475,7 @@ exception NextRegisterIsLessThanCurrent let isRootRegister = registerType => switch registerType { - | RootRegister(_) => true + | RootRegister => true | DynamicContractRegister(_) => false } @@ -481,7 +486,7 @@ If no merging happens, None is returned let rec pruneAndMergeNextRegistered = (register: register, ~isMerged=false) => { let merged = isMerged ? Some(register) : None switch register.registerType { - | RootRegister(_) => merged + | RootRegister => merged | DynamicContractRegister({nextRegister}) if register.latestFetchedBlock.blockNumber < nextRegister.latestFetchedBlock.blockNumber => merged @@ -521,7 +526,7 @@ Returns Error if the node with given id cannot be found (unexpected) newItems are ordered earliest to latest (as they are returned from the worker) */ let update = ( - {baseRegister, pendingDynamicContracts, isFetchingAtHead}: t, + {baseRegister, pendingDynamicContracts, isFetchingAtHead, partitionId, kind}: t, ~id, ~latestFetchedBlock: blockNumberAndTimestamp, ~newItems, @@ -536,6 +541,8 @@ let update = ( updatedRegister->addDynamicContractRegisters(pendingDynamicContracts) let maybeMerged = withNewDynamicContracts->pruneAndMergeNextRegistered { + partitionId, + kind, baseRegister: maybeMerged->Option.getWithDefault(withNewDynamicContracts), pendingDynamicContracts: [], isFetchingAtHead, @@ -545,6 +552,7 @@ let update = ( type nextQuery = { fetchStateRegisterId: id, + kind: kind, //used to id the partition of the fetchstate partitionId: int, fromBlock: int, @@ -558,9 +566,10 @@ let getQueryLogger = ( ) => { let fetchStateRegister = fetchStateRegisterId->registerIdToString let allAddresses = contractAddressMapping->ContractAddressingMap.getAllAddresses - let addresses = allAddresses->Js.Array2.slice(~start=0, ~end_=3)->Array.map(addr => addr->Address.toString) + let addresses = + allAddresses->Js.Array2.slice(~start=0, ~end_=3)->Array.map(addr => addr->Address.toString) let restCount = allAddresses->Array.length - addresses->Array.length - if restCount > 0 { + if restCount > 0 { addresses->Js.Array2.push(`... and ${restCount->Int.toString} more`)->ignore } let params = { @@ -641,21 +650,23 @@ Gets the next query either with a to block of the nextRegistered latestBlockNumber to catch up and merge or None if we don't care about an end block of a query */ -let getNextQuery = ({baseRegister}: t, ~partitionId) => { +let getNextQuery = ({baseRegister, partitionId, kind}: t, ~endBlock) => { let fromBlock = getNextFromBlock(baseRegister) - switch baseRegister.registerType { - | RootRegister({endBlock: Some(endBlock)}) if fromBlock > endBlock => Done - | RootRegister({endBlock}) => + switch (baseRegister.registerType, endBlock) { + | (RootRegister, Some(endBlock)) if fromBlock > endBlock => Done + | (RootRegister, _) => NextQuery({ partitionId, + kind, fetchStateRegisterId: Root, fromBlock, toBlock: endBlock, contractAddressMapping: baseRegister.contractAddressMapping, }) - | DynamicContractRegister({id, nextRegister: {latestFetchedBlock}}) => + | (DynamicContractRegister({id, nextRegister: {latestFetchedBlock}}), _) => NextQuery({ partitionId, + kind, fetchStateRegisterId: DynamicContract(id), fromBlock, toBlock: Some(latestFetchedBlock.blockNumber), @@ -742,7 +753,7 @@ let rec findRegisterIdWithEarliestQueueItem = (~currentEarliestRegister=?, regis } switch register.registerType { - | RootRegister(_) => currentEarliestRegister->getRegisterId + | RootRegister => currentEarliestRegister->getRegisterId | DynamicContractRegister({nextRegister}) => nextRegister->findRegisterIdWithEarliestQueueItem(~currentEarliestRegister) } @@ -756,11 +767,11 @@ Recurses through registers and Errors if ID does not exist */ let rec popQItemAtRegisterId = (register: register, ~id) => { switch register.registerType { - | RootRegister(_) + | RootRegister | DynamicContractRegister(_) if id == register->getRegisterId => register->getEarliestEventInRegister->Ok | DynamicContractRegister({nextRegister}) => nextRegister->popQItemAtRegisterId(~id) - | RootRegister(_) => Error(UnexpectedRegisterDoesNotExist(id)) + | RootRegister => Error(UnexpectedRegisterDoesNotExist(id)) } } @@ -807,11 +818,12 @@ let getEarliestEvent = (self: t) => { Instantiates a fetch state with root register */ let make = ( + ~partitionId, ~staticContracts, ~dynamicContractRegistrations: array, ~startBlock, - ~endBlock, ~isFetchingAtHead, + ~kind=Normal, ~logger as _, ): t => { let contractAddressMapping = ContractAddressingMap.make() @@ -839,7 +851,7 @@ let make = ( }) let baseRegister = { - registerType: RootRegister({endBlock: endBlock}), + registerType: RootRegister, latestFetchedBlock: { blockTimestamp: 0, // Here's a bug that startBlock: 1 won't work @@ -852,6 +864,8 @@ let make = ( } { + partitionId, + kind, baseRegister, pendingDynamicContracts: [], isFetchingAtHead, @@ -864,7 +878,7 @@ Calculates the cummulative queue sizes in all registers let rec registerQueueSize = (register: register, ~accum=0) => { let accum = register.fetchedEventQueue->Array.length + accum switch register.registerType { - | RootRegister(_) => accum + | RootRegister => accum | DynamicContractRegister({nextRegister}) => nextRegister->registerQueueSize(~accum) } } @@ -927,7 +941,7 @@ let rec checkBaseRegisterContainsRegisteredContract = ( true | _ => switch register.registerType { - | RootRegister(_) => false + | RootRegister => false | DynamicContractRegister({nextRegister}) => nextRegister->checkBaseRegisterContainsRegisteredContract( ~contractName, @@ -1039,7 +1053,7 @@ let rec rollbackRegister = ( switch self.registerType { //Case 1 Root register that has only fetched up to a confirmed valid block number //Should just return itself unchanged - | RootRegister(_) if self.latestFetchedBlock.blockNumber < firstChangeEvent.blockNumber => + | RootRegister if self.latestFetchedBlock.blockNumber < firstChangeEvent.blockNumber => self->handleParent //Case 2 Dynamic register that has only fetched up to a confirmed valid block number //Should just return itself, with the next register rolled back recursively @@ -1053,7 +1067,7 @@ let rec rollbackRegister = ( //Case 3 Root register that has fetched further than the confirmed valid block number //Should prune its queue and set its latest fetched block data to the latest known confirmed block - | RootRegister(_) => + | RootRegister => { ...self, fetchedEventQueue: self.fetchedEventQueue->pruneQueueFromFirstChangeEvent(~firstChangeEvent), @@ -1113,10 +1127,10 @@ let rollback = (self: t, ~lastScannedBlock, ~firstChangeEvent) => { * Returns a boolean indicating whether the fetch state is actively indexing * used for comparing event queues in the chain manager */ -let isActivelyIndexing = ({baseRegister}: t) => { +let isActivelyIndexing = ({baseRegister}: t, ~endBlock) => { // nesting to limit additional unnecessary computation - switch baseRegister.registerType { - | RootRegister({endBlock: Some(endBlock)}) => + switch (baseRegister.registerType, endBlock) { + | (RootRegister, Some(endBlock)) => let isPastEndblock = baseRegister.latestFetchedBlock.blockNumber >= endBlock if isPastEndblock { baseRegister->registerQueueSize > 0 @@ -1131,7 +1145,7 @@ let getNumContracts = (self: t) => { let rec loop = (register: register, ~accum=0) => { let accum = accum + register.contractAddressMapping->ContractAddressingMap.addressCount switch register.registerType { - | RootRegister(_) => accum + | RootRegister => accum | DynamicContractRegister({nextRegister}) => nextRegister->loop(~accum) } } @@ -1144,7 +1158,7 @@ Helper functions for debugging and printing module DebugHelpers = { let registerToString = register => switch register { - | RootRegister(_) => "root" + | RootRegister => "root" | DynamicContractRegister({id: {blockNumber, logIndex}}) => `DC-${blockNumber->Int.toString}-${logIndex->Int.toString}` } @@ -1153,7 +1167,7 @@ module DebugHelpers = { let next = (register.registerType->registerToString, register.fetchedEventQueue->Array.length) let accum = list{next, ...accum} switch register.registerType { - | RootRegister(_) => accum + | RootRegister => accum | DynamicContractRegister({nextRegister}) => nextRegister->getQueueSizesInternal(~accum) } } @@ -1164,7 +1178,7 @@ module DebugHelpers = { let rec numberRegistered = (~accum=0, self: register) => { let accum = accum + 1 switch self.registerType { - | RootRegister(_) => accum + | RootRegister => accum | DynamicContractRegister({nextRegister}) => nextRegister->numberRegistered(~accum) } } @@ -1172,7 +1186,7 @@ module DebugHelpers = { let rec getRegisterAddressMaps = (self: register, ~accum=[]) => { accum->Js.Array2.push(self.contractAddressMapping.nameByAddress)->ignore switch self.registerType { - | RootRegister(_) => accum + | RootRegister => accum | DynamicContractRegister({nextRegister}) => nextRegister->getRegisterAddressMaps(~accum) } } diff --git a/codegenerator/cli/templates/static/codegen/src/eventFetching/PartitionedFetchState.res b/codegenerator/cli/templates/static/codegen/src/eventFetching/PartitionedFetchState.res index 485f753ed..527388b5c 100644 --- a/codegenerator/cli/templates/static/codegen/src/eventFetching/PartitionedFetchState.res +++ b/codegenerator/cli/templates/static/codegen/src/eventFetching/PartitionedFetchState.res @@ -14,29 +14,39 @@ type id = { fetchStateId: FetchState.id, } -type partition = { - fetchState: FetchState.t, - partitionId: partitionId, -} - let make = ( ~maxAddrInPartition, ~endBlock, ~staticContracts, ~dynamicContractRegistrations, ~startBlock, + ~isPreRegisteringDynamicContracts, + ~hasWildcard, ~logger, ) => { - let numAddresses = staticContracts->Array.length + dynamicContractRegistrations->Array.length - let partitions = [] - if numAddresses <= maxAddrInPartition { + let totalAddressesNumber = staticContracts->Array.length + dynamicContractRegistrations->Array.length + + if hasWildcard && !isPreRegisteringDynamicContracts { + let wildcardPartition = FetchState.make( + ~partitionId=partitions->Array.length, + ~staticContracts=[], + ~dynamicContractRegistrations=[], + ~startBlock, + ~kind=Wildcard, + ~logger, + ~isFetchingAtHead=false, + ) + partitions->Js.Array2.push(wildcardPartition)->ignore + } + + if totalAddressesNumber <= maxAddrInPartition { let partition = FetchState.make( + ~partitionId=partitions->Array.length, ~staticContracts, ~dynamicContractRegistrations, ~startBlock, - ~endBlock, ~logger, ~isFetchingAtHead=false, ) @@ -50,10 +60,10 @@ let make = ( staticContractsClone->Js.Array2.removeCountInPlace(~pos=0, ~count=maxAddrInPartition) let staticContractPartition = FetchState.make( + ~partitionId=partitions->Array.length, ~staticContracts=staticContractsChunk, ~dynamicContractRegistrations=[], ~startBlock, - ~endBlock, ~logger, ~isFetchingAtHead=false, ) @@ -65,13 +75,13 @@ let make = ( //Add the rest of the static addresses filling the remainder of the partition with dynamic contract //registrations let remainingStaticContractsWithDynamicPartition = FetchState.make( + ~partitionId=partitions->Array.length, ~staticContracts=staticContractsClone, ~dynamicContractRegistrations=dynamicContractRegistrationsClone->Js.Array2.removeCountInPlace( ~pos=0, ~count=maxAddrInPartition - staticContractsClone->Array.length, ), ~startBlock, - ~endBlock, ~logger, ~isFetchingAtHead=false, ) @@ -86,10 +96,10 @@ let make = ( ) let dynamicContractPartition = FetchState.make( + ~partitionId=partitions->Array.length, ~staticContracts=[], ~dynamicContractRegistrations=dynamicContractRegistrationsChunk, ~startBlock, - ~endBlock, ~logger, ~isFetchingAtHead=false, ) @@ -134,8 +144,8 @@ let registerDynamicContracts = ( partitions->Utils.Array.setIndexImmutable(newestPartitionIndex, updated) } else { let newPartition = FetchState.make( + ~partitionId=partitions->Array.length, ~startBlock, - ~endBlock, ~logger, ~staticContracts=[], ~dynamicContractRegistrations=dynamicContractRegistration.dynamicContracts, @@ -191,17 +201,10 @@ let getReadyPartitions = ( ) => { let numPartitions = allPartitions->Array.length let maxPartitionQueueSize = maxPerChainQueueSize / numPartitions - - let readyPartitions = [] - allPartitions->Belt.Array.forEachWithIndex((partitionId, fetchState) => { - if ( - !(fetchingPartitions->Utils.Set.has(partitionId)) && + allPartitions->Js.Array2.filter(fetchState => { + !(fetchingPartitions->Utils.Set.has(fetchState.partitionId)) && fetchState->FetchState.isReadyForNextQuery(~maxQueueSize=maxPartitionQueueSize) - ) { - readyPartitions->Js.Array2.push({fetchState, partitionId})->ignore - } }) - readyPartitions } /** @@ -218,7 +221,7 @@ let rollback = (self: t, ~lastScannedBlock, ~firstChangeEvent) => { let getEarliestEvent = (self: t) => self.partitions->Array.reduce(None, (accum, fetchState) => { // If the fetch state has reached the end block we don't need to consider it - if fetchState->FetchState.isActivelyIndexing { + if fetchState->FetchState.isActivelyIndexing(~endBlock=self.endBlock) { let nextItem = fetchState->FetchState.getEarliestEvent switch accum { | Some(accumItem) if FetchState.qItemLt(accumItem, nextItem) => accum diff --git a/codegenerator/cli/templates/static/codegen/src/eventFetching/SourceManager.res b/codegenerator/cli/templates/static/codegen/src/eventFetching/SourceManager.res index 631c7d933..21df9c91c 100644 --- a/codegenerator/cli/templates/static/codegen/src/eventFetching/SourceManager.res +++ b/codegenerator/cli/templates/static/codegen/src/eventFetching/SourceManager.res @@ -22,6 +22,7 @@ type partitionFetchingState = { // with a mutable state for easier reasoning and testing. type t = { logger: Pino.t, + endBlock: option, maxPartitionConcurrency: int, mutable isWaitingForNewBlock: bool, mutable allPartitionsFetchingState: array, @@ -33,8 +34,9 @@ type t = { mutable currentStateId: int, } -let make = (~maxPartitionConcurrency, ~logger) => { +let make = (~maxPartitionConcurrency, ~endBlock, ~logger) => { logger, + endBlock, maxPartitionConcurrency, isWaitingForNewBlock: false, // Don't prefill with empty partitionFetchingState, @@ -67,7 +69,7 @@ let fetchBatch = async ( // Reset instead of clear, so updating state from partitions from prev state doesn't corrupt data sourceManager.allPartitionsFetchingState = [] } - let {logger, allPartitionsFetchingState, maxPartitionConcurrency} = sourceManager + let {logger, endBlock, allPartitionsFetchingState, maxPartitionConcurrency} = sourceManager let fetchingPartitions = Utils.Set.make() // Js.Array2.forEachi automatically skips empty items @@ -84,15 +86,15 @@ let fetchBatch = async ( let mergedPartitions = Js.Dict.empty() let hasQueryWaitingForNewBlock = ref(false) - let queries = readyPartitions->Array.keepMap(({fetchState, partitionId}) => { + let queries = readyPartitions->Array.keepMap(fetchState => { let mergedFetchState = fetchState->FetchState.mergeRegistersBeforeNextQuery if mergedFetchState !== fetchState { - mergedPartitions->Js.Dict.set(partitionId->(Utils.magic: int => string), mergedFetchState) + mergedPartitions->Js.Dict.set(fetchState.partitionId->(Utils.magic: int => string), mergedFetchState) } - switch mergedFetchState->FetchState.getNextQuery(~partitionId) { + switch mergedFetchState->FetchState.getNextQuery(~endBlock) { | Done => None | NextQuery(nextQuery) => - switch allPartitionsFetchingState->Belt.Array.get(partitionId) { + switch allPartitionsFetchingState->Belt.Array.get(fetchState.partitionId) { // Deduplicate queries when fetchBatch is called after // isFetching was set to false, but state isn't updated with fetched data | Some({lastFetchedQueryId}) if lastFetchedQueryId === toQueryId(nextQuery) => None diff --git a/codegenerator/cli/templates/static/codegen/src/eventFetching/chainWorkers/HyperFuelWorker.res b/codegenerator/cli/templates/static/codegen/src/eventFetching/chainWorkers/HyperFuelWorker.res index 145df4ff3..8aec22636 100644 --- a/codegenerator/cli/templates/static/codegen/src/eventFetching/chainWorkers/HyperFuelWorker.res +++ b/codegenerator/cli/templates/static/codegen/src/eventFetching/chainWorkers/HyperFuelWorker.res @@ -98,46 +98,17 @@ let makeWorkerConfigOrThrow = (~contracts: array, ~ } } -let makeGetRecieptsSelection = ( - ~wildcardLogDataRbs, +let logDataReceiptTypeSelection: array = [LogData] + +// only transactions with status 1 (success) +let txStatusSelection = [1] + +let makeGetNormalRecieptsSelection = ( ~nonWildcardLogDataRbsByContract, ~nonLogDataReceiptTypesByContract, - ~nonLogDataWildcardReceiptTypes, ~contracts: array, ) => { - let logDataReceiptTypeSelection: array = [LogData] - - // only transactions with status 1 (success) - let txStatusSelection = [1] - - let maybeWildcardNonLogDataSelection = switch nonLogDataWildcardReceiptTypes { - | [] => None - | nonLogDataWildcardReceiptTypes => - Some( - ( - { - receiptType: nonLogDataWildcardReceiptTypes, - txStatus: txStatusSelection, - }: HyperFuelClient.QueryTypes.receiptSelection - ), - ) - } - - let maybeWildcardLogDataSelection = switch wildcardLogDataRbs { - | [] => None - | wildcardLogDataRbs => - Some( - ( - { - receiptType: logDataReceiptTypeSelection, - txStatus: txStatusSelection, - rb: wildcardLogDataRbs, - }: HyperFuelClient.QueryTypes.receiptSelection - ), - ) - } - - (~contractAddressMapping, ~shouldApplyWildcards) => { + (~contractAddressMapping) => { let selection: array = [] //Instantiate each time to add new registered contract addresses @@ -179,25 +150,45 @@ let makeGetRecieptsSelection = ( } }) - if shouldApplyWildcards { - switch maybeWildcardNonLogDataSelection { - | None => () - | Some(wildcardNonLogDataSelection) => - selection - ->Array.push(wildcardNonLogDataSelection) - ->ignore - } - switch maybeWildcardLogDataSelection { - | None => () - | Some(wildcardLogSelection) => - selection - ->Array.push(wildcardLogSelection) - ->ignore - } - } + selection + } +} +let makeWildcardRecieptsSelection = (~wildcardLogDataRbs, ~nonLogDataWildcardReceiptTypes) => { + let selection: array = [] + + switch nonLogDataWildcardReceiptTypes { + | [] => () + | nonLogDataWildcardReceiptTypes => selection + ->Js.Array2.push( + ( + { + receiptType: nonLogDataWildcardReceiptTypes, + txStatus: txStatusSelection, + }: HyperFuelClient.QueryTypes.receiptSelection + ), + ) + ->ignore } + + switch wildcardLogDataRbs { + | [] => () + | wildcardLogDataRbs => + selection + ->Js.Array2.push( + ( + { + receiptType: logDataReceiptTypeSelection, + txStatus: txStatusSelection, + rb: wildcardLogDataRbs, + }: HyperFuelClient.QueryTypes.receiptSelection + ), + ) + ->ignore + } + + selection } module Make = ( @@ -264,20 +255,23 @@ module Make = ( ) } - let getRecieptsSelection = makeGetRecieptsSelection( - ~wildcardLogDataRbs=workerConfig.wildcardLogDataRbs, + let getNormalRecieptsSelection = makeGetNormalRecieptsSelection( ~nonWildcardLogDataRbsByContract=workerConfig.nonWildcardLogDataRbsByContract, ~nonLogDataReceiptTypesByContract=workerConfig.nonLogDataReceiptTypesByContract, - ~nonLogDataWildcardReceiptTypes=workerConfig.nonLogDataWildcardReceiptTypes, ~contracts=T.contracts, ) + let wildcardReceiptsSelection = makeWildcardRecieptsSelection( + ~nonLogDataWildcardReceiptTypes=workerConfig.nonLogDataWildcardReceiptTypes, + ~wildcardLogDataRbs=workerConfig.wildcardLogDataRbs, + ) + let getNextPage = async ( ~fromBlock, ~toBlock, ~logger, ~contractAddressMapping, - ~shouldApplyWildcards, + ~kind: FetchState.kind, ~isPreRegisteringDynamicContracts, ) => { //Instantiate each time to add new registered contract addresses @@ -285,7 +279,10 @@ module Make = ( //TODO: create receipt selections for dynamic contract preregistration Js.Exn.raiseError("HyperFuel does not support pre registering dynamic contracts yet") } else { - getRecieptsSelection(~contractAddressMapping, ~shouldApplyWildcards) + switch kind { + | Normal => getNormalRecieptsSelection(~contractAddressMapping) + | Wildcard => wildcardReceiptsSelection + } } let startFetchingBatchTimeRef = Hrtime.makeTimer() @@ -311,7 +308,14 @@ module Make = ( ) => { let mkLogAndRaise = ErrorHandling.mkLogAndRaise(~logger, ...) try { - let {fetchStateRegisterId, partitionId, fromBlock, contractAddressMapping, toBlock} = query + let { + fetchStateRegisterId, + partitionId, + fromBlock, + contractAddressMapping, + toBlock, + kind, + } = query let startFetchingBatchTimeRef = Hrtime.makeTimer() //fetch batch let {page: pageUnsafe, pageFetchTime} = await getNextPage( @@ -319,9 +323,7 @@ module Make = ( ~toBlock, ~contractAddressMapping, ~logger, - //Only apply wildcards on the first partition and root register - //to avoid duplicate wildcard queries - ~shouldApplyWildcards=fetchStateRegisterId == Root && partitionId == 0, + ~kind, ~isPreRegisteringDynamicContracts, ) @@ -530,7 +532,7 @@ module Make = ( let stats = { totalTimeElapsed, parsingTimeElapsed, - pageFetchTime + pageFetchTime, } { diff --git a/codegenerator/cli/templates/static/codegen/src/eventFetching/chainWorkers/HyperSyncWorker.res b/codegenerator/cli/templates/static/codegen/src/eventFetching/chainWorkers/HyperSyncWorker.res index a637f329f..0ce63910b 100644 --- a/codegenerator/cli/templates/static/codegen/src/eventFetching/chainWorkers/HyperSyncWorker.res +++ b/codegenerator/cli/templates/static/codegen/src/eventFetching/chainWorkers/HyperSyncWorker.res @@ -121,12 +121,8 @@ let makeGetNextPage = ( }) }) - let getLogSelectionOrThrow = (~contractAddressMapping, ~shouldApplyWildcards): array< - LogSelection.t, - > => { - let nonWildcardLogSelection = contracts->Belt.Array.keepMap((contract): option< - LogSelection.t, - > => { + let getNormalLogSelectionOrThrow = (~contractAddressMapping): array => { + contracts->Belt.Array.keepMap((contract): option => { switch contractAddressMapping->ContractAddressingMap.getAddressesFromContractName( ~contractName=contract.name, ) { @@ -144,10 +140,6 @@ let makeGetNextPage = ( } } }) - - shouldApplyWildcards - ? nonWildcardLogSelection->Array.concat(wildcardLogSelection) - : nonWildcardLogSelection } async ( @@ -155,7 +147,7 @@ let makeGetNextPage = ( ~toBlock, ~logger, ~contractAddressMapping, - ~shouldApplyWildcards, + ~kind: FetchState.kind, ~isPreRegisteringDynamicContracts, ) => { //Instantiate each time to add new registered contract addresses @@ -168,7 +160,10 @@ let makeGetNextPage = ( if isPreRegisteringDynamicContracts { getContractPreRegistrationLogSelection(~contractAddressMapping) } else { - getLogSelectionOrThrow(~contractAddressMapping, ~shouldApplyWildcards) + switch kind { + | Normal => getNormalLogSelectionOrThrow(~contractAddressMapping) + | Wildcard => wildcardLogSelection + } } } catch { | exn => @@ -295,17 +290,22 @@ module Make = ( ) => { let mkLogAndRaise = ErrorHandling.mkLogAndRaise(~logger, ...) try { - let {fetchStateRegisterId, partitionId, fromBlock, contractAddressMapping, toBlock} = query + let { + fetchStateRegisterId, + partitionId, + fromBlock, + contractAddressMapping, + toBlock, + kind, + } = query let startFetchingBatchTimeRef = Hrtime.makeTimer() - //fetch batch + let {page: pageUnsafe, contractInterfaceManager, pageFetchTime} = await getNextPage( ~fromBlock, ~toBlock, ~contractAddressMapping, ~logger, - //Only apply wildcards on the first partition and root register - //to avoid duplicate wildcard queries - ~shouldApplyWildcards=fetchStateRegisterId == Root && partitionId == 0, //only + ~kind, ~isPreRegisteringDynamicContracts, ) @@ -476,7 +476,11 @@ module Make = ( | Some(eventMod) => let module(Event) = eventMod - switch contractInterfaceManager->ContractInterfaceManager.parseLogViemOrThrow(~address=log.address, ~topics=log.topics, ~data=log.data) { + switch contractInterfaceManager->ContractInterfaceManager.parseLogViemOrThrow( + ~address=log.address, + ~topics=log.topics, + ~data=log.data, + ) { | exception exn => handleDecodeFailure( ~eventMod, diff --git a/codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res b/codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res index 82258c30c..4a5ec0590 100644 --- a/codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res +++ b/codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res @@ -144,7 +144,7 @@ let updateChainMetadataTable = async (cm: ChainManager.t, ~throttler: Throttler. cm.chainFetchers ->ChainMap.values ->Belt.Array.map(cf => { - let latestFetchedBlock = cf.fetchState->PartitionedFetchState.getLatestFullyFetchedBlock + let latestFetchedBlock = cf.partitionedFetchState->PartitionedFetchState.getLatestFullyFetchedBlock let chainMetadata: DbFunctions.ChainMetadata.chainMetadata = { chainId: cf.chainConfig.chain->ChainMap.Chain.toChainId, startBlock: cf.chainConfig.startBlock, @@ -275,14 +275,14 @@ let updateLatestProcessedBlocks = ( let chainManager = { ...state.chainManager, chainFetchers: state.chainManager.chainFetchers->ChainMap.map(cf => { - let {chainConfig: {chain}, fetchState} = cf + let {chainConfig: {chain}, partitionedFetchState} = cf let {numEventsProcessed, latestProcessedBlock} = latestProcessedBlocks->ChainMap.get(chain) let hasArbQueueEvents = state.chainManager->ChainManager.hasChainItemsOnArbQueue(~chain) let hasNoMoreEventsToProcess = cf->ChainFetcher.hasNoMoreEventsToProcess(~hasArbQueueEvents) let latestProcessedBlock = if hasNoMoreEventsToProcess { - PartitionedFetchState.getLatestFullyFetchedBlock(fetchState).blockNumber->Some + PartitionedFetchState.getLatestFullyFetchedBlock(partitionedFetchState).blockNumber->Some } else { latestProcessedBlock } @@ -370,7 +370,7 @@ let handleBlockRangeResponse = (state, ~chain, ~response: ChainWorker.blockRange let latestProcessedBlock = if hasNoMoreEventsToProcess { PartitionedFetchState.getLatestFullyFetchedBlock( - updatedChainFetcher.fetchState, + updatedChainFetcher.partitionedFetchState, ).blockNumber->Some } else { updatedChainFetcher.latestProcessedBlock @@ -544,14 +544,14 @@ let actionReducer = (state: t, action: action) => { : (false, None) let updatedFetchState = - currentChainFetcher.fetchState->PartitionedFetchState.registerDynamicContracts( + currentChainFetcher.partitionedFetchState->PartitionedFetchState.registerDynamicContracts( registration, ~isFetchingAtHead, ) let updatedChainFetcher = { ...currentChainFetcher, - fetchState: updatedFetchState, + partitionedFetchState: updatedFetchState, timestampCaughtUpToHeadOrEndblock, } @@ -575,7 +575,7 @@ let actionReducer = (state: t, action: action) => { ->ChainMap.entries ->Array.forEach(((chain, chainFetcher)) => { let highestFetchedBlockOnChain = PartitionedFetchState.getLatestFullyFetchedBlock( - chainFetcher.fetchState, + chainFetcher.partitionedFetchState, ).blockNumber Prometheus.setFetchedEventsUntilHeight(~blockNumber=highestFetchedBlockOnChain, ~chain) @@ -608,14 +608,14 @@ let actionReducer = (state: t, action: action) => { (state, []) } else { updateChainFetcher(currentChainFetcher => { - let partitionsCopy = currentChainFetcher.fetchState.partitions->Js.Array2.copy + let partitionsCopy = currentChainFetcher.partitionedFetchState.partitions->Js.Array2.copy updatedPartitionIds->Js.Array2.forEach(partitionId => { let partition = updatedPartitions->Js.Dict.unsafeGet(partitionId) partitionsCopy->Js.Array2.unsafe_set(partitionId->(Utils.magic: string => int), partition) }) { ...currentChainFetcher, - fetchState: {...currentChainFetcher.fetchState, partitions: partitionsCopy}, + partitionedFetchState: {...currentChainFetcher.partitionedFetchState, partitions: partitionsCopy}, } }, ~chain, ~state) } @@ -640,7 +640,7 @@ let actionReducer = (state: t, action: action) => { let chainFetchers = state.chainManager.chainFetchers->ChainMap.mapWithKey((chain, cf) => { { ...cf, - fetchState: ChainMap.get(fetchStatesMap, chain).partitionedFetchState, + partitionedFetchState: ChainMap.get(fetchStatesMap, chain).partitionedFetchState, } }) @@ -729,7 +729,7 @@ let actionReducer = (state: t, action: action) => { let { chainConfig, logger, - fetchState: {startBlock, endBlock, maxAddrInPartition}, + partitionedFetchState: {startBlock, endBlock, maxAddrInPartition}, dynamicContractPreRegistration, } = cf @@ -832,10 +832,10 @@ let checkAndFetchForChain = ( ) => async chain => { let chainFetcher = state.chainManager.chainFetchers->ChainMap.get(chain) if !isRollingBack(state) { - let {chainConfig: {chainWorker}, logger, currentBlockHeight, fetchState} = chainFetcher + let {chainConfig: {chainWorker}, logger, currentBlockHeight, partitionedFetchState} = chainFetcher await chainFetcher.sourceManager->SourceManager.fetchBatch( - ~allPartitions=fetchState.partitions, + ~allPartitions=partitionedFetchState.partitions, ~waitForNewBlock=(~currentBlockHeight, ~logger) => chainWorker->waitForNewBlock(~currentBlockHeight, ~logger), ~onNewBlock=(~currentBlockHeight) => dispatchAction(FinishWaitingForNewBlock({chain, currentBlockHeight})), ~currentBlockHeight, @@ -1186,8 +1186,8 @@ let injectedTaskReducer = ( ~blockNumber=firstChangeEvent.blockNumber, ) - let fetchState = - cf.fetchState->PartitionedFetchState.rollback( + let partitionedFetchState = + cf.partitionedFetchState->PartitionedFetchState.rollback( ~lastScannedBlock=rolledBackLastBlockData->ChainFetcher.getLastScannedBlockData, ~firstChangeEvent, ) @@ -1195,7 +1195,7 @@ let injectedTaskReducer = ( let rolledBackCf = { ...cf, lastBlockScannedHashes: rolledBackLastBlockData, - fetchState, + partitionedFetchState, } //On other chains, filter out evennts based on the first change present on the chain after the reorg rolledBackCf->ChainFetcher.addProcessingFilter( diff --git a/scenarios/erc20_multichain_factory/test/DynamicContractRecovery_test.res b/scenarios/erc20_multichain_factory/test/DynamicContractRecovery_test.res index 160c70038..fefbf22b9 100644 --- a/scenarios/erc20_multichain_factory/test/DynamicContractRecovery_test.res +++ b/scenarios/erc20_multichain_factory/test/DynamicContractRecovery_test.res @@ -204,7 +204,7 @@ describe("Dynamic contract restart resistance test", () => { ~maxAddrInPartition=Env.maxAddrInPartition, ) - let restartedFetchState = switch restartedChainFetcher.fetchState.partitions { + let restartedFetchState = switch restartedChainFetcher.partitionedFetchState.partitions { | [partition] => partition | _ => failwith("No partitions found in restarted chain fetcher") } @@ -242,7 +242,7 @@ describe("Dynamic contract restart resistance test", () => { ~maxAddrInPartition=Env.maxAddrInPartition, ) - let restartedFetchState = switch restartedChainFetcher.fetchState.partitions { + let restartedFetchState = switch restartedChainFetcher.partitionedFetchState.partitions { | [partition] => partition | _ => failwith("No partitions found in restarted chain fetcher with") } @@ -306,7 +306,7 @@ describe("Dynamic contract restart resistance test", () => { ) let restartedFetchState = - restartedChainFetcher.fetchState.partitions->Array.get(0)->Option.getExn + restartedChainFetcher.partitionedFetchState.partitions->Array.get(0)->Option.getExn let dynamicContracts = restartedFetchState.baseRegister.dynamicContracts diff --git a/scenarios/erc20_multichain_factory/test/RollbackDynamicContract_test.res b/scenarios/erc20_multichain_factory/test/RollbackDynamicContract_test.res index 776c0b988..f590e652d 100644 --- a/scenarios/erc20_multichain_factory/test/RollbackDynamicContract_test.res +++ b/scenarios/erc20_multichain_factory/test/RollbackDynamicContract_test.res @@ -154,7 +154,7 @@ describe("Dynamic contract rollback test", () => { let getFetchState = chain => { let cf = chain->getChainFetcher - cf.fetchState + cf.partitionedFetchState } let getLatestFetchedBlock = chain => { @@ -177,7 +177,8 @@ describe("Dynamic contract rollback test", () => { ->ChainMap.values ->Array.reduce( 0, - (accum, chainFetcher) => accum + chainFetcher.fetchState->PartitionedFetchState.queueSize, + (accum, chainFetcher) => + accum + chainFetcher.partitionedFetchState->PartitionedFetchState.queueSize, ) } diff --git a/scenarios/erc20_multichain_factory/test/RollbackMultichain_test.res b/scenarios/erc20_multichain_factory/test/RollbackMultichain_test.res index 918df84b9..5c7f3210f 100644 --- a/scenarios/erc20_multichain_factory/test/RollbackMultichain_test.res +++ b/scenarios/erc20_multichain_factory/test/RollbackMultichain_test.res @@ -252,7 +252,7 @@ describe("Multichain rollback test", () => { let getFetchState = chain => { let cf = chain->getChainFetcher - cf.fetchState + cf.partitionedFetchState } let getLatestFetchedBlock = chain => { @@ -275,7 +275,8 @@ describe("Multichain rollback test", () => { ->ChainMap.values ->Array.reduce( 0, - (accum, chainFetcher) => accum + chainFetcher.fetchState->PartitionedFetchState.queueSize, + (accum, chainFetcher) => + accum + chainFetcher.partitionedFetchState->PartitionedFetchState.queueSize, ) } diff --git a/scenarios/helpers/src/Indexer.res b/scenarios/helpers/src/Indexer.res index d7d1092f2..d34791c9a 100644 --- a/scenarios/helpers/src/Indexer.res +++ b/scenarios/helpers/src/Indexer.res @@ -68,8 +68,10 @@ module type S = { module FetchState: { type id + type kind type nextQuery = { fetchStateRegisterId: id, + kind: kind, partitionId: int, fromBlock: int, toBlock: option, diff --git a/scenarios/test_codegen/test/ChainManager_test.res b/scenarios/test_codegen/test/ChainManager_test.res index dd7b11e7d..1f2a90f8a 100644 --- a/scenarios/test_codegen/test/ChainManager_test.res +++ b/scenarios/test_codegen/test/ChainManager_test.res @@ -26,6 +26,8 @@ let populateChainQueuesWithRandomEvents = (~runTime=1000, ~maxBlockTime=15, ()) ~staticContracts=[], ~dynamicContractRegistrations=[], ~startBlock=0, + ~hasWildcard=false, + ~isPreRegisteringDynamicContracts=false, ~logger=Logging.logger, ) @@ -121,10 +123,11 @@ let populateChainQueuesWithRandomEvents = (~runTime=1000, ~maxBlockTime=15, ()) latestProcessedBlock: None, numEventsProcessed: 0, numBatchesFetched: 0, - fetchState: fetchState.contents, + partitionedFetchState: fetchState.contents, logger: Logging.logger, sourceManager: SourceManager.make( ~maxPartitionConcurrency=Env.maxPartitionConcurrency, + ~endBlock=None, ~logger=Logging.logger, ), chainConfig, @@ -246,10 +249,10 @@ describe("ChainManager", () => { // ) let nextChainFetchers = chainManager.chainFetchers->ChainMap.mapWithKey( (chain, fetcher) => { - let {partitionedFetchState: fetchState} = fetchStatesMap->ChainMap.get(chain) + let {partitionedFetchState} = fetchStatesMap->ChainMap.get(chain) { ...fetcher, - fetchState, + partitionedFetchState, } }, ) @@ -276,7 +279,7 @@ describe("ChainManager", () => { ->Belt.Array.reduce( 0, (accum, val) => { - accum + val.fetchState->PartitionedFetchState.queueSize + accum + val.partitionedFetchState->PartitionedFetchState.queueSize }, ) @@ -322,8 +325,10 @@ describe("determineNextEvent", () => { } let makeMockFetchState = (~latestFetchedBlockTimestamp, ~item): FetchState.t => { + partitionId: 0, + kind: Normal, baseRegister: { - registerType: RootRegister({endBlock: None}), + registerType: RootRegister, latestFetchedBlock: { blockTimestamp: latestFetchedBlockTimestamp, blockNumber: 0, diff --git a/scenarios/test_codegen/test/HyperSyncWorker_test.res b/scenarios/test_codegen/test/HyperSyncWorker_test.res index 0200cae5f..81e56f8f2 100644 --- a/scenarios/test_codegen/test/HyperSyncWorker_test.res +++ b/scenarios/test_codegen/test/HyperSyncWorker_test.res @@ -71,7 +71,7 @@ describe("HyperSyncWorker - getNextPage", () => { ~toBlock=2, ~logger=Logging.logger, ~contractAddressMapping=ContractAddressingMap.make(), - ~shouldApplyWildcards=true, + ~kind=Normal, ~isPreRegisteringDynamicContracts=false, ) @@ -120,7 +120,7 @@ describe("HyperSyncWorker - getNextPage", () => { ~toBlock=2, ~logger=Logging.logger, ~contractAddressMapping=ContractAddressingMap.make(), - ~shouldApplyWildcards=true, + ~kind=Normal, ~isPreRegisteringDynamicContracts=false, ) diff --git a/scenarios/test_codegen/test/lib_tests/FetchState_test.res b/scenarios/test_codegen/test/lib_tests/FetchState_test.res index b24bc39dc..b9c8fab2d 100644 --- a/scenarios/test_codegen/test/lib_tests/FetchState_test.res +++ b/scenarios/test_codegen/test/lib_tests/FetchState_test.res @@ -53,6 +53,8 @@ let getDynContractId = ( } let makeMockFetchState = (baseRegister, ~isFetchingAtHead=false) => { + partitionId: 0, + kind: Normal, baseRegister, pendingDynamicContracts: [], isFetchingAtHead, @@ -86,8 +88,8 @@ describe("FetchState.fetchState", () => { it("dynamic contract registration", () => { let root = make( + ~partitionId=0, ~startBlock=10_000, - ~endBlock=None, ~staticContracts=[((Gravatar :> string), mockAddress1)], ~dynamicContractRegistrations=[], ~isFetchingAtHead=false, @@ -234,7 +236,7 @@ describe("FetchState.fetchState", () => { mockEvent(~blockNumber=4), mockEvent(~blockNumber=1, ~logIndex=1), ], - registerType: RootRegister({endBlock: None}), + registerType: RootRegister, }, }), } @@ -255,7 +257,7 @@ describe("FetchState.fetchState", () => { mockEvent(~blockNumber=1, ~logIndex=2), mockEvent(~blockNumber=1, ~logIndex=1), ], - registerType: RootRegister({endBlock: None}), + registerType: RootRegister, } Assert.deepEqual(fetchState->mergeIntoNextRegistered, expected) @@ -275,7 +277,7 @@ describe("FetchState.fetchState", () => { firstEventBlockNumber: Some(1), dynamicContracts: DynamicContractsMap.empty, fetchedEventQueue: currentEvents, - registerType: RootRegister({endBlock: None}), + registerType: RootRegister, } let fetchState = makeMockFetchState(root) @@ -413,7 +415,7 @@ describe("FetchState.fetchState", () => { mockEvent(~blockNumber=4), mockEvent(~blockNumber=1, ~logIndex=1), ], - registerType: RootRegister({endBlock: None}), + registerType: RootRegister, }, }), } @@ -441,7 +443,7 @@ describe("FetchState.fetchState", () => { mockEvent(~blockNumber=105), mockEvent(~blockNumber=101, ~logIndex=2), ], - registerType: RootRegister({endBlock: None}), + registerType: RootRegister, } let dynamicContractRegistration = { registeringEventBlockNumber: 100, @@ -482,7 +484,7 @@ describe("FetchState.fetchState", () => { mockEvent(~blockNumber=5), mockEvent(~blockNumber=1, ~logIndex=2), ], - registerType: RootRegister({endBlock: None}), + registerType: RootRegister, } let fetchState = baseRegister->makeMockFetchState @@ -515,7 +517,7 @@ describe("FetchState.fetchState", () => { mockEvent(~blockNumber=5), mockEvent(~blockNumber=4, ~logIndex=2), ], - registerType: RootRegister({endBlock: None}), + registerType: RootRegister, } let baseRegister = { @@ -566,7 +568,7 @@ describe("FetchState.fetchState", () => { mockEvent(~blockNumber=5), mockEvent(~blockNumber=1, ~logIndex=2), ], - registerType: RootRegister({endBlock: None}), + registerType: RootRegister, } let dynamicContractRegistration = { registeringEventBlockNumber: 100, @@ -607,23 +609,23 @@ describe("FetchState.fetchState", () => { mockEvent(~blockNumber=4), mockEvent(~blockNumber=1, ~logIndex=1), ], - registerType: RootRegister({endBlock: None}), + registerType: RootRegister, } let fetchState = { + partitionId: 0, + kind: Normal, baseRegister: root, pendingDynamicContracts: [], isFetchingAtHead: false, } - let partitionId = 0 - let nextQuery = fetchState->getNextQuery(~partitionId) - Assert.deepEqual( - nextQuery, + fetchState->getNextQuery(~endBlock=None), NextQuery({ + partitionId: 0, + kind: Normal, fetchStateRegisterId: Root, - partitionId, fromBlock: root.latestFetchedBlock.blockNumber + 1, toBlock: None, contractAddressMapping: root.contractAddressMapping, @@ -639,11 +641,11 @@ describe("FetchState.fetchState", () => { blockTimestamp: 0, }, fetchedEventQueue: [], - registerType: RootRegister({endBlock: Some(500)}), + registerType: RootRegister, }, } - let nextQuery = endblockCase->getNextQuery(~partitionId) + let nextQuery = endblockCase->getNextQuery(~endBlock=Some(500)) Assert.deepEqual(Done, nextQuery) }) @@ -678,12 +680,14 @@ describe("FetchState.fetchState", () => { mockEvent(~blockNumber=4), mockEvent(~blockNumber=1, ~logIndex=1), ], - registerType: RootRegister({endBlock: None}), + registerType: RootRegister, }, }), } let fetchState = { + partitionId: 0, + kind: Normal, baseRegister, pendingDynamicContracts: [], isFetchingAtHead: false, @@ -708,17 +712,17 @@ describe("FetchState.fetchState", () => { firstEventBlockNumber: None, dynamicContracts: DynamicContractsMap.empty, fetchedEventQueue: [mockEvent(~blockNumber=140), mockEvent(~blockNumber=99)], - registerType: RootRegister({endBlock: Some(150)}), + registerType: RootRegister, } - case1->makeMockFetchState->isActivelyIndexing->Assert.equal(true) + case1->makeMockFetchState->isActivelyIndexing(~endBlock=Some(150))->Assert.equal(true) let case2 = { ...case1, fetchedEventQueue: [], } - case2->makeMockFetchState->isActivelyIndexing->Assert.equal(false) + case2->makeMockFetchState->isActivelyIndexing(~endBlock=Some(150))->Assert.equal(false) let case3 = { ...case2, @@ -728,21 +732,21 @@ describe("FetchState.fetchState", () => { }), } - case3->makeMockFetchState->isActivelyIndexing->Assert.equal(true) + case3->makeMockFetchState->isActivelyIndexing(~endBlock=Some(150))->Assert.equal(true) let case4 = { ...case1, - registerType: RootRegister({endBlock: Some(151)}), + registerType: RootRegister, } - case4->makeMockFetchState->isActivelyIndexing->Assert.equal(true) + case4->makeMockFetchState->isActivelyIndexing(~endBlock=Some(151))->Assert.equal(true) let case5 = { ...case1, - registerType: RootRegister({endBlock: None}), + registerType: RootRegister, } - case5->makeMockFetchState->isActivelyIndexing->Assert.equal(true) + case5->makeMockFetchState->isActivelyIndexing(~endBlock=None)->Assert.equal(true) }) it("rolls back", () => { @@ -757,7 +761,7 @@ describe("FetchState.fetchState", () => { firstEventBlockNumber: None, dynamicContracts: DynamicContractsMap.empty, fetchedEventQueue: [mockEvent(~blockNumber=140), mockEvent(~blockNumber=99)], - registerType: RootRegister({endBlock: None}), + registerType: RootRegister, } let register2 = { @@ -843,7 +847,7 @@ describe("FetchState.fetchState", () => { mockEvent(~blockNumber=4), mockEvent(~blockNumber=1, ~logIndex=1), ], - registerType: RootRegister({endBlock: None}), + registerType: RootRegister, }, }), } @@ -854,7 +858,6 @@ describe("FetchState.fetchState", () => { it( "Adding dynamic between two registers while query is mid flight does no result in early merged registers", () => { - let partitionId = 0 let currentBlockHeight = 600 let chainId = 1 let chain = ChainMap.Chain.makeUnsafe(~chainId) @@ -871,7 +874,7 @@ describe("FetchState.fetchState", () => { mockEvent(~blockNumber=4), mockEvent(~blockNumber=1, ~logIndex=1), ], - registerType: RootRegister({endBlock: None}), + registerType: RootRegister, } let mockFetchState = rootRegister->makeMockFetchState @@ -890,7 +893,7 @@ describe("FetchState.fetchState", () => { let withAddedDynamicContractRegisterA = withRegisteredDynamicContractA->mergeRegistersBeforeNextQuery //Received query - let queryA = switch withAddedDynamicContractRegisterA->getNextQuery(~partitionId) { + let queryA = switch withAddedDynamicContractRegisterA->getNextQuery(~endBlock=None) { | NextQuery(queryA) => switch queryA { | { @@ -934,7 +937,7 @@ describe("FetchState.fetchState", () => { ) ->Utils.unwrapResultExn - switch updatesWithResponseFromQueryA->getNextQuery(~partitionId) { + switch updatesWithResponseFromQueryA->getNextQuery(~endBlock=None) { | NextQuery({ fetchStateRegisterId: DynamicContract({blockNumber: 200, logIndex: 0}), fromBlock: 200, diff --git a/scenarios/test_codegen/test/lib_tests/PartitionedFetchState_test.res b/scenarios/test_codegen/test/lib_tests/PartitionedFetchState_test.res index 538c93c78..4b2fcd4ed 100644 --- a/scenarios/test_codegen/test/lib_tests/PartitionedFetchState_test.res +++ b/scenarios/test_codegen/test/lib_tests/PartitionedFetchState_test.res @@ -1,7 +1,7 @@ open Belt open RescriptMocha -describe("PartitionedFetchState getMostBehindPartitions", () => { +describe("PartitionedFetchState", () => { let mockPartitionedFetchState = (~partitions, ~maxAddrInPartition=1): PartitionedFetchState.t => { { partitions, @@ -12,6 +12,185 @@ describe("PartitionedFetchState getMostBehindPartitions", () => { } } + it("Create PartitionedFetchState without predefined contracts", () => { + let partitionedFetchState = PartitionedFetchState.make( + ~maxAddrInPartition=2, + ~dynamicContractRegistrations=[], + ~startBlock=0, + ~endBlock=None, + ~staticContracts=[], + ~hasWildcard=false, + ~isPreRegisteringDynamicContracts=false, + ~logger=Logging.logger, + ) + + Assert.deepEqual( + partitionedFetchState, + { + partitions: [ + FetchState.make( + ~partitionId=0, + ~staticContracts=[], + ~dynamicContractRegistrations=[], + ~isFetchingAtHead=false, + ~kind=Normal, + ~logger=Logging.logger, + ~startBlock=0, + ), + ], + maxAddrInPartition: 2, + startBlock: 0, + endBlock: None, + logger: Logging.logger, + }, + ~message="Eventhough there are no predefined contract, must create a partition", + ) + }) + + it("Create PartitionedFetchState without predefined contracts and wildcard events", () => { + let partitionedFetchState = PartitionedFetchState.make( + ~maxAddrInPartition=2, + ~dynamicContractRegistrations=[], + ~startBlock=0, + ~endBlock=None, + ~staticContracts=[], + ~hasWildcard=true, + ~isPreRegisteringDynamicContracts=false, + ~logger=Logging.logger, + ) + + // FIXME: Test next query + Assert.deepEqual( + partitionedFetchState, + { + partitions: [ + FetchState.make( + ~partitionId=0, + ~staticContracts=[], + ~dynamicContractRegistrations=[], + ~isFetchingAtHead=false, + ~kind=Wildcard, + ~logger=Logging.logger, + ~startBlock=0, + ), + FetchState.make( + ~partitionId=1, + ~staticContracts=[], + ~dynamicContractRegistrations=[], + ~isFetchingAtHead=false, + ~kind=Normal, + ~logger=Logging.logger, + ~startBlock=0, + ), + ], + maxAddrInPartition: 2, + startBlock: 0, + endBlock: None, + logger: Logging.logger, + }, + ~message="Should still create a normal partition in case we are going to add dynamic contracts", + ) + }) + + it("Create PartitionedFetchState with multiple partitions", () => { + let contractRegistration1: TablesStatic.DynamicContractRegistry.t = { + id: "dcr1", + chainId: 137, + registeringEventBlockNumber: 10, + registeringEventBlockTimestamp: 10 * 15, + registeringEventLogIndex: 0, + registeringEventContractName: "MockFactory", + registeringEventName: "MockCreateGravatar", + registeringEventSrcAddress: TestHelpers.Addresses.mockAddresses[0]->Option.getExn, + contractAddress: TestHelpers.Addresses.mockAddresses[5]->Option.getExn, + contractType: Enums.ContractType.Gravatar, + } + let contractRegistration2: TablesStatic.DynamicContractRegistry.t = { + id: "dcr2", + chainId: 137, + registeringEventBlockNumber: 10, + registeringEventBlockTimestamp: 10 * 15, + registeringEventLogIndex: 0, + registeringEventContractName: "MockFactory", + registeringEventName: "MockCreateGravatar", + registeringEventSrcAddress: TestHelpers.Addresses.mockAddresses[0]->Option.getExn, + contractAddress: TestHelpers.Addresses.mockAddresses[5]->Option.getExn, + contractType: Enums.ContractType.NftFactory, + } + + let partitionedFetchState = PartitionedFetchState.make( + ~maxAddrInPartition=2, + ~dynamicContractRegistrations=[contractRegistration1, contractRegistration2], + ~startBlock=0, + ~endBlock=None, + ~staticContracts=[ + ("Contract1", "0x1"->Address.unsafeFromString), + ("Contract2", "0x2"->Address.unsafeFromString), + ("Contract3", "0x3"->Address.unsafeFromString), + ("Contract1", "0x4"->Address.unsafeFromString), + ("Contract2", "0x5"->Address.unsafeFromString), + ], + ~hasWildcard=false, + ~isPreRegisteringDynamicContracts=false, + ~logger=Logging.logger, + ) + + Assert.deepEqual( + partitionedFetchState, + { + partitions: [ + FetchState.make( + ~partitionId=0, + ~staticContracts=[ + ("Contract1", "0x1"->Address.unsafeFromString), + ("Contract2", "0x2"->Address.unsafeFromString), + ], + ~dynamicContractRegistrations=[], + ~isFetchingAtHead=false, + ~kind=Normal, + ~logger=Logging.logger, + ~startBlock=0, + ), + FetchState.make( + ~partitionId=1, + ~staticContracts=[ + ("Contract3", "0x3"->Address.unsafeFromString), + ("Contract1", "0x4"->Address.unsafeFromString), + ], + ~dynamicContractRegistrations=[], + ~isFetchingAtHead=false, + ~kind=Normal, + ~logger=Logging.logger, + ~startBlock=0, + ), + FetchState.make( + ~partitionId=2, + ~staticContracts=[("Contract2", "0x5"->Address.unsafeFromString)], + ~dynamicContractRegistrations=[contractRegistration1], + ~isFetchingAtHead=false, + ~kind=Normal, + ~logger=Logging.logger, + ~startBlock=0, + ), + FetchState.make( + ~partitionId=3, + ~staticContracts=[], + ~dynamicContractRegistrations=[contractRegistration2], + ~isFetchingAtHead=false, + ~kind=Normal, + ~logger=Logging.logger, + ~startBlock=0, + ), + ], + maxAddrInPartition: 2, + startBlock: 0, + endBlock: None, + logger: Logging.logger, + }, + ~message="Create partitions for static contracts first, and then add dynamic contracts", + ) + }) + it("Partition id never changes when adding new partitions", () => { let rootContractAddressMapping = ContractAddressingMap.make() @@ -21,7 +200,7 @@ describe("PartitionedFetchState getMostBehindPartitions", () => { } let rootRegister: FetchState.register = { - registerType: RootRegister({endBlock: None}), + registerType: RootRegister, latestFetchedBlock: { blockNumber: 100, blockTimestamp: 100 * 15, @@ -53,6 +232,8 @@ describe("PartitionedFetchState getMostBehindPartitions", () => { } let fetchState0: FetchState.t = { + partitionId: 0, + kind: Normal, baseRegister, isFetchingAtHead: false, pendingDynamicContracts: [], @@ -75,12 +256,7 @@ describe("PartitionedFetchState getMostBehindPartitions", () => { ~maxPerChainQueueSize=10, ~fetchingPartitions=Utils.Set.make(), ), - [ - { - partitionId: 0, - fetchState: fetchState0, - }, - ], + [fetchState0], ~message="Should have only one partition with id 0", ) @@ -124,32 +300,28 @@ describe("PartitionedFetchState getMostBehindPartitions", () => { ~fetchingPartitions=Utils.Set.make(), ), [ - { - partitionId: 0, - fetchState: fetchState0, - }, + fetchState0, { partitionId: 1, - fetchState: { - baseRegister: { - registerType: RootRegister({endBlock: None}), - latestFetchedBlock: {blockNumber: 0, blockTimestamp: 0}, - contractAddressMapping: ContractAddressingMap.fromArray([ - (TestHelpers.Addresses.mockAddresses[5]->Option.getExn, "Gravatar"), - ]), - fetchedEventQueue: [], - dynamicContracts: FetchState.DynamicContractsMap.empty->FetchState.DynamicContractsMap.addAddress( - { - blockNumber: 10, - logIndex: 0, - }, - TestHelpers.Addresses.mockAddresses[5]->Option.getExn, - ), - firstEventBlockNumber: None, - }, - pendingDynamicContracts: [], - isFetchingAtHead: false, + kind: Normal, + baseRegister: { + registerType: RootRegister, + latestFetchedBlock: {blockNumber: 0, blockTimestamp: 0}, + contractAddressMapping: ContractAddressingMap.fromArray([ + (TestHelpers.Addresses.mockAddresses[5]->Option.getExn, "Gravatar"), + ]), + fetchedEventQueue: [], + dynamicContracts: FetchState.DynamicContractsMap.empty->FetchState.DynamicContractsMap.addAddress( + { + blockNumber: 10, + logIndex: 0, + }, + TestHelpers.Addresses.mockAddresses[5]->Option.getExn, + ), + firstEventBlockNumber: None, }, + pendingDynamicContracts: [], + isFetchingAtHead: false, }, ], ~message="Should have a new partition with id 1", diff --git a/scenarios/test_codegen/test/lib_tests/SourceManager_test.res b/scenarios/test_codegen/test/lib_tests/SourceManager_test.res index ff4ac6b20..e8812e21f 100644 --- a/scenarios/test_codegen/test/lib_tests/SourceManager_test.res +++ b/scenarios/test_codegen/test/lib_tests/SourceManager_test.res @@ -67,10 +67,10 @@ let onNewBlockMock = () => { describe("SourceManager fetchBatch", () => { let mockFetchState = ( + ~partitionId, ~latestFetchedBlockNumber, ~fetchedEventQueue=[], ~numContracts=1, - ~endBlock=?, ): FetchState.t => { let contractAddressMapping = ContractAddressingMap.make() @@ -80,8 +80,10 @@ describe("SourceManager fetchBatch", () => { } { + partitionId, + kind: Normal, baseRegister: { - registerType: RootRegister({endBlock: endBlock}), + registerType: RootRegister, latestFetchedBlock: { blockNumber: latestFetchedBlockNumber, blockTimestamp: latestFetchedBlockNumber * 15, @@ -113,11 +115,15 @@ describe("SourceManager fetchBatch", () => { ) Async.it("Executes partitions in any order when we didn't reach concurency limit", async () => { - let sourceManager = SourceManager.make(~maxPartitionConcurrency=10, ~logger=Logging.logger) + let sourceManager = SourceManager.make( + ~maxPartitionConcurrency=10, + ~endBlock=None, + ~logger=Logging.logger, + ) - let fetchState0 = mockFetchState(~latestFetchedBlockNumber=4) - let fetchState1 = mockFetchState(~latestFetchedBlockNumber=5) - let fetchState2 = mockFetchState(~latestFetchedBlockNumber=1) + let fetchState0 = mockFetchState(~partitionId=0, ~latestFetchedBlockNumber=4) + let fetchState1 = mockFetchState(~partitionId=1, ~latestFetchedBlockNumber=5) + let fetchState2 = mockFetchState(~partitionId=2, ~latestFetchedBlockNumber=1) let executePartitionQueryMock = executePartitionQueryMock() @@ -138,6 +144,7 @@ describe("SourceManager fetchBatch", () => { [ { partitionId: 0, + kind: Normal, fetchStateRegisterId: fetchState0.baseRegister->FetchState.getRegisterId, fromBlock: 5, toBlock: None, @@ -145,6 +152,7 @@ describe("SourceManager fetchBatch", () => { }, { partitionId: 1, + kind: Normal, fetchStateRegisterId: fetchState0.baseRegister->FetchState.getRegisterId, fromBlock: 6, toBlock: None, @@ -152,6 +160,7 @@ describe("SourceManager fetchBatch", () => { }, { partitionId: 2, + kind: Normal, fetchStateRegisterId: fetchState0.baseRegister->FetchState.getRegisterId, fromBlock: 2, toBlock: None, @@ -174,11 +183,15 @@ describe("SourceManager fetchBatch", () => { Async.it( "Slices partitions to the concurrency limit, takes the earliest queries first", async () => { - let sourceManager = SourceManager.make(~maxPartitionConcurrency=2, ~logger=Logging.logger) + let sourceManager = SourceManager.make( + ~maxPartitionConcurrency=2, + ~endBlock=None, + ~logger=Logging.logger, + ) - let fetchState0 = mockFetchState(~latestFetchedBlockNumber=4) - let fetchState1 = mockFetchState(~latestFetchedBlockNumber=5) - let fetchState2 = mockFetchState(~latestFetchedBlockNumber=1) + let fetchState0 = mockFetchState(~partitionId=0, ~latestFetchedBlockNumber=4) + let fetchState1 = mockFetchState(~partitionId=1, ~latestFetchedBlockNumber=5) + let fetchState2 = mockFetchState(~partitionId=2, ~latestFetchedBlockNumber=1) let executePartitionQueryMock = executePartitionQueryMock() @@ -209,12 +222,16 @@ describe("SourceManager fetchBatch", () => { ) Async.it("Skips partitions at the chain last block and the ones at the endBlock", async () => { - let sourceManager = SourceManager.make(~maxPartitionConcurrency=10, ~logger=Logging.logger) + let sourceManager = SourceManager.make( + ~maxPartitionConcurrency=10, + ~endBlock=Some(5), + ~logger=Logging.logger, + ) - let fetchState0 = mockFetchState(~latestFetchedBlockNumber=4) - let fetchState1 = mockFetchState(~latestFetchedBlockNumber=5) - let fetchState2 = mockFetchState(~latestFetchedBlockNumber=1) - let fetchState3 = mockFetchState(~latestFetchedBlockNumber=4, ~endBlock=4) + let fetchState0 = mockFetchState(~partitionId=0, ~latestFetchedBlockNumber=4) + let fetchState1 = mockFetchState(~partitionId=1, ~latestFetchedBlockNumber=5) + let fetchState2 = mockFetchState(~partitionId=2, ~latestFetchedBlockNumber=1) + let fetchState3 = mockFetchState(~partitionId=3, ~latestFetchedBlockNumber=4) let executePartitionQueryMock = executePartitionQueryMock() @@ -222,7 +239,7 @@ describe("SourceManager fetchBatch", () => { sourceManager->SourceManager.fetchBatch( ~allPartitions=[fetchState0, fetchState1, fetchState2, fetchState3], ~maxPerChainQueueSize=1000, - ~currentBlockHeight=5, + ~currentBlockHeight=4, ~setMergedPartitions=noopSetMergedPartitions, ~executePartitionQuery=executePartitionQueryMock.fn, ~waitForNewBlock=neverWaitForNewBlock, @@ -230,13 +247,13 @@ describe("SourceManager fetchBatch", () => { ~stateId=0, ) - Assert.deepEqual(executePartitionQueryMock.calls->Js.Array2.map(q => q.partitionId), [0, 2]) + Assert.deepEqual(executePartitionQueryMock.calls->Js.Array2.map(q => q.partitionId), [2]) executePartitionQueryMock.resolveAll() Assert.deepEqual( executePartitionQueryMock.calls->Js.Array2.length, - 2, + 1, ~message="Shouldn't have called more after resolving prev promises", ) @@ -244,14 +261,18 @@ describe("SourceManager fetchBatch", () => { }) Async.it("Starts indexing from the initial state", async () => { - let sourceManager = SourceManager.make(~maxPartitionConcurrency=10, ~logger=Logging.logger) + let sourceManager = SourceManager.make( + ~maxPartitionConcurrency=10, + ~endBlock=None, + ~logger=Logging.logger, + ) let waitForNewBlockMock = waitForNewBlockMock() let onNewBlockMock = onNewBlockMock() let fetchBatchPromise1 = sourceManager->SourceManager.fetchBatch( - ~allPartitions=[mockFetchState(~latestFetchedBlockNumber=0)], + ~allPartitions=[mockFetchState(~partitionId=0, ~latestFetchedBlockNumber=0)], ~maxPerChainQueueSize=1000, ~currentBlockHeight=0, ~setMergedPartitions=noopSetMergedPartitions, @@ -271,7 +292,7 @@ describe("SourceManager fetchBatch", () => { // Can wait the second time let fetchBatchPromise2 = sourceManager->SourceManager.fetchBatch( - ~allPartitions=[mockFetchState(~latestFetchedBlockNumber=20)], + ~allPartitions=[mockFetchState(~partitionId=0, ~latestFetchedBlockNumber=20)], ~maxPerChainQueueSize=1000, ~currentBlockHeight=20, ~setMergedPartitions=noopSetMergedPartitions, @@ -292,14 +313,18 @@ describe("SourceManager fetchBatch", () => { Async.it( "Waits for new block with currentBlockHeight=0 even when all partitions are done", async () => { - let sourceManager = SourceManager.make(~maxPartitionConcurrency=10, ~logger=Logging.logger) + let sourceManager = SourceManager.make( + ~maxPartitionConcurrency=10, + ~endBlock=Some(5), + ~logger=Logging.logger, + ) let waitForNewBlockMock = waitForNewBlockMock() let onNewBlockMock = onNewBlockMock() let fetchBatchPromise1 = sourceManager->SourceManager.fetchBatch( - ~allPartitions=[mockFetchState(~latestFetchedBlockNumber=5, ~endBlock=5)], + ~allPartitions=[mockFetchState(~partitionId=0, ~latestFetchedBlockNumber=5)], ~maxPerChainQueueSize=1000, ~currentBlockHeight=0, ~setMergedPartitions=noopSetMergedPartitions, @@ -319,10 +344,14 @@ describe("SourceManager fetchBatch", () => { ) Async.it("Waits for new block when all partitions are at the currentBlockHeight", async () => { - let sourceManager = SourceManager.make(~maxPartitionConcurrency=10, ~logger=Logging.logger) + let sourceManager = SourceManager.make( + ~maxPartitionConcurrency=10, + ~endBlock=None, + ~logger=Logging.logger, + ) - let fetchState0 = mockFetchState(~latestFetchedBlockNumber=5) - let fetchState1 = mockFetchState(~latestFetchedBlockNumber=5) + let fetchState0 = mockFetchState(~partitionId=0, ~latestFetchedBlockNumber=5) + let fetchState1 = mockFetchState(~partitionId=1, ~latestFetchedBlockNumber=5) let waitForNewBlockMock = waitForNewBlockMock() let onNewBlockMock = onNewBlockMock() @@ -366,12 +395,16 @@ describe("SourceManager fetchBatch", () => { }) Async.it("Can add new partitions until the concurrency limit reached", async () => { - let sourceManager = SourceManager.make(~maxPartitionConcurrency=3, ~logger=Logging.logger) + let sourceManager = SourceManager.make( + ~maxPartitionConcurrency=3, + ~endBlock=None, + ~logger=Logging.logger, + ) - let fetchState0 = mockFetchState(~latestFetchedBlockNumber=4) - let fetchState1 = mockFetchState(~latestFetchedBlockNumber=5) - let fetchState2 = mockFetchState(~latestFetchedBlockNumber=2) - let fetchState3 = mockFetchState(~latestFetchedBlockNumber=1) + let fetchState0 = mockFetchState(~partitionId=0, ~latestFetchedBlockNumber=4) + let fetchState1 = mockFetchState(~partitionId=1, ~latestFetchedBlockNumber=5) + let fetchState2 = mockFetchState(~partitionId=2, ~latestFetchedBlockNumber=2) + let fetchState3 = mockFetchState(~partitionId=3, ~latestFetchedBlockNumber=1) let executePartitionQueryMock = executePartitionQueryMock() @@ -472,8 +505,8 @@ describe("SourceManager fetchBatch", () => { // but we've alredy called them with the same query await sourceManager->SourceManager.fetchBatch( ~allPartitions=[ - mockFetchState(~latestFetchedBlockNumber=10), - mockFetchState(~latestFetchedBlockNumber=10), + mockFetchState(~partitionId=0, ~latestFetchedBlockNumber=10), + mockFetchState(~partitionId=1, ~latestFetchedBlockNumber=10), fetchState2, fetchState3, ], @@ -498,24 +531,30 @@ describe("SourceManager fetchBatch", () => { }) Async.it("Should not query partitions that are at max queue size", async () => { - let sourceManager = SourceManager.make(~maxPartitionConcurrency=10, ~logger=Logging.logger) + let sourceManager = SourceManager.make( + ~maxPartitionConcurrency=10, + ~endBlock=None, + ~logger=Logging.logger, + ) let executePartitionQueryMock = executePartitionQueryMock() let fetchBatchPromise = sourceManager->SourceManager.fetchBatch( ~allPartitions=[ - mockFetchState(~latestFetchedBlockNumber=4), - mockFetchState(~latestFetchedBlockNumber=5), + mockFetchState(~partitionId=0, ~latestFetchedBlockNumber=4), + mockFetchState(~partitionId=1, ~latestFetchedBlockNumber=5), mockFetchState( + ~partitionId=2, ~latestFetchedBlockNumber=1, ~fetchedEventQueue=["mockEvent1", "mockEvent2", "mockEvent3"]->Utils.magic, ), mockFetchState( + ~partitionId=3, ~latestFetchedBlockNumber=2, ~fetchedEventQueue=["mockEvent4", "mockEvent5"]->Utils.magic, ), - mockFetchState(~latestFetchedBlockNumber=3), + mockFetchState(~partitionId=4, ~latestFetchedBlockNumber=3), ], ~maxPerChainQueueSize=10, //each partition should therefore have a max of 2 events ~currentBlockHeight=10, @@ -538,7 +577,11 @@ describe("SourceManager fetchBatch", () => { }) Async.it("Sorts after all the filtering is applied", async () => { - let sourceManager = SourceManager.make(~maxPartitionConcurrency=1, ~logger=Logging.logger) + let sourceManager = SourceManager.make( + ~maxPartitionConcurrency=1, + ~endBlock=Some(11), + ~logger=Logging.logger, + ) let executePartitionQueryMock = executePartitionQueryMock() @@ -546,15 +589,16 @@ describe("SourceManager fetchBatch", () => { ~allPartitions=[ // Exceeds max queue size mockFetchState( + ~partitionId=0, ~latestFetchedBlockNumber=0, ~fetchedEventQueue=["mockEvent1", "mockEvent2", "mockEvent3"]->Utils.magic, ), // Finished fetching to endBlock - mockFetchState(~latestFetchedBlockNumber=2, ~endBlock=2), + mockFetchState(~partitionId=1, ~latestFetchedBlockNumber=11), // Waiting for new block - mockFetchState(~latestFetchedBlockNumber=10), - mockFetchState(~latestFetchedBlockNumber=6), - mockFetchState(~latestFetchedBlockNumber=4), + mockFetchState(~partitionId=2, ~latestFetchedBlockNumber=10), + mockFetchState(~partitionId=3, ~latestFetchedBlockNumber=6), + mockFetchState(~partitionId=4, ~latestFetchedBlockNumber=4), ], ~maxPerChainQueueSize=10, //each partition should therefore have a max of 2 events ~currentBlockHeight=10, diff --git a/scenarios/test_codegen/test/rollback/Rollback_test.res b/scenarios/test_codegen/test/rollback/Rollback_test.res index 662bb8c59..cda96da69 100644 --- a/scenarios/test_codegen/test/rollback/Rollback_test.res +++ b/scenarios/test_codegen/test/rollback/Rollback_test.res @@ -193,7 +193,7 @@ describe("Single Chain Simple Rollback", () => { ) Assert.equal( - getChainFetcher().fetchState->PartitionedFetchState.queueSize, + getChainFetcher().partitionedFetchState->PartitionedFetchState.queueSize, 3, ~message="should have 3 events on the queue from the first 3 blocks of inital chainData", ) @@ -256,7 +256,7 @@ describe("Single Chain Simple Rollback", () => { ) Assert.equal( - getChainFetcher().fetchState->PartitionedFetchState.queueSize, + getChainFetcher().partitionedFetchState->PartitionedFetchState.queueSize, 3, ~message="should have 3 events on the queue from the first 3 blocks of inital chainData", )