Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Fetch State to allow dynamically manage partitions #405

Merged
merged 23 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 32 additions & 24 deletions codegenerator/cli/npm/envio/src/ReorgDetection.res
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ type blockData = {
blockTimestamp: int,
}

type reorgGuard = {
lastBlockScannedData: blockData,
firstBlockParentNumberAndHash: option<blockNumberAndHash>,
}

module LastBlockScannedHashes: {
type t
/**Instantiat t with existing data*/
Expand Down Expand Up @@ -69,7 +74,7 @@ module LastBlockScannedHashes: {

let getAllBlockNumbers: t => Belt.Array.t<int>

let hasReorgOccurred: (t, ~firstBlockParentNumberAndHash: option<blockNumberAndHash>) => bool
let hasReorgOccurred: (t, ~reorgGuard: reorgGuard) => bool

/**
Return a BlockNumbersAndHashes.t rolled back to where blockData is less
Expand Down Expand Up @@ -402,31 +407,34 @@ module LastBlockScannedHashes: {
/**
Checks whether reorg has occured by comparing the parent hash with the last saved block hash.
*/
let rec hasReorgOccurredInternal = (
lastBlockScannedDataList,
~firstBlockParentNumberAndHash: option<blockNumberAndHash>,
) => {
switch (firstBlockParentNumberAndHash, lastBlockScannedDataList) {
| (Some({blockHash: parentHash, blockNumber: parentBlockNumber}), list{head, ...tail}) =>
if parentBlockNumber == head.blockNumber {
parentHash != head.blockHash
} else {
//if block numbers do not match, this is a dynamic contract case and should recurse
//through the list to look for a matching block or nothing to validate
tail->hasReorgOccurredInternal(~firstBlockParentNumberAndHash)
}
| _ => //If parentHash is None, either it's the genesis block (no reorg)
let rec hasReorgOccurredInternal = (lastBlockScannedDataList, ~reorgGuard: reorgGuard) => {
switch lastBlockScannedDataList {
| list{head, ...tail} =>
switch reorgGuard {
| {lastBlockScannedData} if lastBlockScannedData.blockNumber == head.blockNumber =>
lastBlockScannedData.blockHash != head.blockHash
//If parentHash is None, either it's the genesis block (no reorg)
//Or its already confirmed so no Reorg
//If recentLastBlockData is None, we have not yet saved blockData to compare against
false
| {firstBlockParentNumberAndHash: None} => false
| {
firstBlockParentNumberAndHash: Some({
blockHash: parentHash,
blockNumber: parentBlockNumber,
}),
} =>
if parentBlockNumber == head.blockNumber {
parentHash != head.blockHash
} else {
//if block numbers do not match, this is a dynamic contract case and should recurse
//through the list to look for a matching block or nothing to validate
tail->hasReorgOccurredInternal(~reorgGuard)
}
}
//If recentLastBlockData is None, we have not yet saved blockData to compare against
| _ => false
}
}

let hasReorgOccurred = (
lastBlockScannedHashes: t,
~firstBlockParentNumberAndHash: option<blockNumberAndHash>,
) =>
lastBlockScannedHashes.lastBlockScannedDataList->hasReorgOccurredInternal(
~firstBlockParentNumberAndHash,
)
let hasReorgOccurred = (lastBlockScannedHashes: t, ~reorgGuard: reorgGuard) =>
lastBlockScannedHashes.lastBlockScannedDataList->hasReorgOccurredInternal(~reorgGuard)
}
37 changes: 30 additions & 7 deletions codegenerator/cli/npm/envio/src/Utils.res
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,35 @@ module Dict = {

let merge: (dict<'a>, dict<'a>) => dict<'a> = %raw(`(dictA, dictB) => ({...dictA, ...dictB})`)

let map = (dict, fn) => {
let newDict = Js.Dict.empty()
let keys = dict->Js.Dict.keys
for idx in 0 to keys->Js.Array2.length - 1 {
let key = keys->Js.Array2.unsafe_get(idx)
newDict->Js.Dict.set(key, fn(dict->Js.Dict.unsafeGet(key)))
}
newDict
}

let forEach = (dict, fn) => {
let keys = dict->Js.Dict.keys
for idx in 0 to keys->Js.Array2.length - 1 {
fn(dict->Js.Dict.unsafeGet(keys->Js.Array2.unsafe_get(idx)))
}
}

let deleteInPlace: (dict<'a>, string) => unit = %raw(`(dict, key) => {
delete dict[key];
}
`)

let updateImmutable: (
dict<'a>,
string,
'a,
) => dict<'a> = %raw(`(dict, key, value) => ({...dict, [key]: value})`)

let shallowCopy: dict<'a> => dict<'a> = %raw(`(dict) => ({...dict})`)
}

module Math = {
Expand Down Expand Up @@ -156,13 +180,9 @@ Helper to check if a value exists in an array
Index > array length or < 0 results in a copy of the array
*/
let removeAtIndex = (array, index) => {
if index < 0 || index >= array->Array.length {
array->Array.copy
} else {
let head = array->Js.Array2.slice(~start=0, ~end_=index)
let tail = array->Belt.Array.sliceToEnd(index + 1)
Belt.Array.concat(head, tail)
}
array
->Js.Array2.slice(~start=0, ~end_=index)
->Js.Array2.concat(array->Js.Array2.sliceFrom(index + 1))
}

let last = (arr: array<'a>): option<'a> => arr->Belt.Array.get(arr->Array.length - 1)
Expand Down Expand Up @@ -205,6 +225,9 @@ Helper to check if a value exists in an array
})
interleaved
}

@send
external flatten: (array<array<'a>>, @as(json`1`) _) => array<'a> = "flat"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting, never done a binding like this with hardcoded params :) does @as(1) not work as well out of interest?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it works 😁 I didn't know that it does :)

}

module String = {
Expand Down
7 changes: 2 additions & 5 deletions codegenerator/cli/templates/static/codegen/src/Benchmark.res
Original file line number Diff line number Diff line change
Expand Up @@ -265,13 +265,10 @@ let addBlockRangeFetched = (
~chainId,
~fromBlock,
~toBlock,
~fetchStateRegisterId: FetchState.id,
~numEvents,
~partitionId,
) => {
let registerName = fetchStateRegisterId->FetchState.registerIdToName

let group = `BlockRangeFetched Summary for Chain ${chainId->Belt.Int.toString} ${registerName} Register`
let group = `BlockRangeFetched Summary for Chain ${chainId->Belt.Int.toString} Partition ${partitionId}`
let add = (label, value) => data->Data.addSummaryData(~group, ~label, ~value=Utils.magic(value))

add("Total Time Elapsed (ms)", totalTimeElapsed)
Expand All @@ -281,7 +278,7 @@ let addBlockRangeFetched = (
add("Block Range Size", toBlock - fromBlock)

data->Data.incrementMillis(
~label=`Total Time Fetching Chain ${chainId->Belt.Int.toString} Partition ${partitionId->Belt.Int.toString}`,
~label=`Total Time Fetching Chain ${chainId->Belt.Int.toString} Partition ${partitionId}`,
~amount=totalTimeElapsed,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,21 @@ let getAllAddresses = (mapping: mapping) => {
mapping.nameByAddress->Js.Dict.keys->stringsToAddresses
}

let combine = (a, b) => {
let m = make()
[a, b]->Belt.Array.forEach(v =>
v.nameByAddress
->Js.Dict.entries
->Belt.Array.forEach(((addr, name)) => {
m->addAddress(~address=addr->Utils.magic, ~name)
})
)
m
let copy = (mapping: mapping) => {
{
nameByAddress: mapping.nameByAddress->Utils.Dict.shallowCopy,
// Since Belt.Set.String.t is immutable, we can simply do shallow copy here
addressesByName: mapping.addressesByName->Utils.Dict.shallowCopy,
}
}

let mergeInPlace = (map, ~target) => {
map.nameByAddress
->Js.Dict.keys
->Belt.Array.forEach(addr => {
let name = map.nameByAddress->Js.Dict.unsafeGet(addr)
target->addAddress(~address=addr->Address.unsafeFromString, ~name)
})
}

let fromArray = (nameAddrTuples: array<(Address.t, string)>) => {
Expand Down
5 changes: 2 additions & 3 deletions codegenerator/cli/templates/static/codegen/src/Env.res
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ let updateSyncTimeOnRestart =
let maxEventFetchedQueueSize = envSafe->EnvSafe.get("MAX_QUEUE_SIZE", S.int, ~fallback=100_000)
let maxProcessBatchSize = envSafe->EnvSafe.get("MAX_BATCH_SIZE", S.int, ~fallback=5_000)
let maxAddrInPartition = envSafe->EnvSafe.get("MAX_PARTITION_SIZE", S.int, ~fallback=5_000)
let maxPartitionConcurrency =
envSafe->EnvSafe.get("ENVIO_MAX_PARTITION_CONCURRENCY", S.int, ~fallback=10)

let metricsPort = envSafe->EnvSafe.get("METRICS_PORT", S.int->S.port, ~devFallback=9898)

Expand Down Expand Up @@ -88,9 +90,6 @@ module Benchmark = {
saveDataStrategy->SaveDataStrategy.shouldSaveJsonFile
}

let maxPartitionConcurrency =
envSafe->EnvSafe.get("ENVIO_MAX_PARTITION_CONCURRENCY", S.int, ~fallback=10)

type logStrategyType =
| @as("ecs-file") EcsFile
| @as("ecs-console") EcsConsole
Expand Down
61 changes: 28 additions & 33 deletions codegenerator/cli/templates/static/codegen/src/EventProcessing.res
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,15 @@ let updateEventSyncState = (
)
}

type dynamicContractRegistration = FetchState.dynamicContractRegistration

type dynamicContractRegistrations = {
registrations: array<dynamicContractRegistration>,
dynamicContractsByChain: dict<array<TablesStatic.DynamicContractRegistry.t>>,
unprocessedBatch: array<Internal.eventItem>,
}

let addToDynamicContractRegistrations = (
eventItem: Internal.eventItem,
~dynamicContracts,
~registeringEventBlockNumber,
~registeringEventLogIndex,
~registrations,
~dynamicContractsByChain,
~unprocessedBatch,
) => {
//If there are any dynamic contract registrations, put this item in the unprocessedBatch flagged
Expand All @@ -81,21 +77,22 @@ let addToDynamicContractRegistrations = (
},
]

let registrations = switch dynamicContracts {
| [] => registrations
switch dynamicContracts {
| [] => ()
| dynamicContracts =>
let dynamicContractRegistration = {
FetchState.dynamicContracts,
registeringEventBlockNumber,
registeringEventLogIndex,
registeringEventChain: eventItem.chain,
}
[...registrations, dynamicContractRegistration]
let key = eventItem.chain->ChainMap.Chain.toString
dynamicContractsByChain->Js.Dict.set(
key,
dynamicContractsByChain
->Utils.Dict.dangerouslyGetNonOption(key)
->Option.getWithDefault([])
->Array.concat(dynamicContracts),
)
}

{
unprocessedBatch,
registrations,
dynamicContractsByChain,
}
}

Expand All @@ -107,13 +104,14 @@ let checkContractIsInCurrentRegistrations = (
) => {
switch dynamicContractRegistrations {
| Some(dynamicContracts) =>
dynamicContracts.registrations->Array.some(d =>
d.dynamicContracts->Array.some(d =>
d.chainId == chain->ChainMap.Chain.toChainId &&
d.contractType == contractType &&
d.contractAddress == contractAddress
dynamicContracts.dynamicContractsByChain
->Utils.Dict.dangerouslyGetNonOption(chain->ChainMap.Chain.toString)
->Option.mapWithDefault(false, dcs =>
dcs->Array.some(dc =>
dc.contractType == contractType && dc.contractAddress == contractAddress
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually think we need to remove this check. I removed it in all the other places I think this was left by mistake. We are not allowing the same address registered to multiple contracts.

Suggested change
dc.contractType == contractType && dc.contractAddress == contractAddress
dc.contractAddress == contractAddress

)
)

| None => false
}
}
Expand All @@ -128,7 +126,7 @@ let runEventContractRegister = (
~preRegisterLatestProcessedBlocks=?,
~shouldSaveHistory,
) => {
let {chain, event, blockNumber} = eventItem
let {chain} = eventItem

let contextEnv = ContextEnv.make(~eventItem, ~logger)

Expand Down Expand Up @@ -156,23 +154,20 @@ let runEventContractRegister = (
)
)

let addToDynamicContractRegistrations =
eventItem->(
addToDynamicContractRegistrations(
~registeringEventBlockNumber=blockNumber,
~registeringEventLogIndex=event.logIndex,
...
)
)
let addToDynamicContractRegistrations = addToDynamicContractRegistrations(eventItem, ...)

let val = switch (dynamicContracts, dynamicContractRegistrations) {
| ([], None) => None
| (dynamicContracts, Some({registrations, unprocessedBatch})) =>
addToDynamicContractRegistrations(~dynamicContracts, ~registrations, ~unprocessedBatch)->Some
| (dynamicContracts, Some({dynamicContractsByChain, unprocessedBatch})) =>
addToDynamicContractRegistrations(
~dynamicContracts,
~dynamicContractsByChain,
~unprocessedBatch,
)->Some
| (dynamicContracts, None) =>
addToDynamicContractRegistrations(
~dynamicContracts,
~registrations=[],
~dynamicContractsByChain=Js.Dict.empty(),
~unprocessedBatch=[],
)->Some
}
Expand Down
4 changes: 2 additions & 2 deletions codegenerator/cli/templates/static/codegen/src/Prometheus.res
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ module BenchmarkCounters = {
}

module PartitionBlockFetched = {
type labels = {chainId: int, partitionId: int}
type labels = {chainId: int, partitionId: string}
let intAsString = S.string->S.transform(s => {
serializer: int => int->Belt.Int.toString,
parser: string =>
Expand All @@ -213,7 +213,7 @@ module PartitionBlockFetched = {

let labelSchema = S.schema(s => {
chainId: s.matches(intAsString),
partitionId: s.matches(intAsString),
partitionId: s.matches(S.string),
})

let counter = SafeGauge.makeOrThrow(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,11 @@ let make = (
{
logger,
chainConfig,
sourceManager: SourceManager.make(~maxPartitionConcurrency=Env.maxPartitionConcurrency, ~endBlock, ~logger),
sourceManager: SourceManager.make(
~maxPartitionConcurrency=Env.maxPartitionConcurrency,
~endBlock,
~logger,
),
lastBlockScannedHashes,
currentBlockHeight: 0,
partitionedFetchState,
Expand Down Expand Up @@ -320,9 +324,9 @@ Updates of fetchState and cleans up event filters. Should be used whenever updat
to ensure processingFilters are always valid.
Returns Error if the node with given id cannot be found (unexpected)
*/
let updateFetchState = (
let setQueryResponse = (
self: t,
~id,
~query: FetchState.query,
~latestFetchedBlockTimestamp,
~latestFetchedBlockNumber,
~fetchedEvents,
Expand All @@ -334,8 +338,8 @@ let updateFetchState = (
}

self.partitionedFetchState
->PartitionedFetchState.update(
~id,
->PartitionedFetchState.setQueryResponse(
~query,
~latestFetchedBlock={
blockNumber: latestFetchedBlockNumber,
blockTimestamp: latestFetchedBlockTimestamp,
Expand All @@ -349,7 +353,8 @@ let updateFetchState = (
...self,
partitionedFetchState,
processingFilters: switch self.processingFilters {
| Some(processingFilters) => processingFilters->cleanUpProcessingFilters(~partitionedFetchState)
| Some(processingFilters) =>
processingFilters->cleanUpProcessingFilters(~partitionedFetchState)
| None => None
},
}
Expand Down
Loading
Loading