Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Fetch State to allow dynamically manage partitions #405

Merged
merged 23 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 32 additions & 24 deletions codegenerator/cli/npm/envio/src/ReorgDetection.res
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ type blockData = {
blockTimestamp: int,
}

type reorgGuard = {
lastBlockScannedData: blockData,
firstBlockParentNumberAndHash: option<blockNumberAndHash>,
}

module LastBlockScannedHashes: {
type t
/**Instantiat t with existing data*/
Expand Down Expand Up @@ -69,7 +74,7 @@ module LastBlockScannedHashes: {

let getAllBlockNumbers: t => Belt.Array.t<int>

let hasReorgOccurred: (t, ~firstBlockParentNumberAndHash: option<blockNumberAndHash>) => bool
let hasReorgOccurred: (t, ~reorgGuard: reorgGuard) => bool

/**
Return a BlockNumbersAndHashes.t rolled back to where blockData is less
Expand Down Expand Up @@ -402,31 +407,34 @@ module LastBlockScannedHashes: {
/**
Checks whether reorg has occured by comparing the parent hash with the last saved block hash.
*/
let rec hasReorgOccurredInternal = (
lastBlockScannedDataList,
~firstBlockParentNumberAndHash: option<blockNumberAndHash>,
) => {
switch (firstBlockParentNumberAndHash, lastBlockScannedDataList) {
| (Some({blockHash: parentHash, blockNumber: parentBlockNumber}), list{head, ...tail}) =>
if parentBlockNumber == head.blockNumber {
parentHash != head.blockHash
} else {
//if block numbers do not match, this is a dynamic contract case and should recurse
//through the list to look for a matching block or nothing to validate
tail->hasReorgOccurredInternal(~firstBlockParentNumberAndHash)
}
| _ => //If parentHash is None, either it's the genesis block (no reorg)
let rec hasReorgOccurredInternal = (lastBlockScannedDataList, ~reorgGuard: reorgGuard) => {
switch lastBlockScannedDataList {
| list{head, ...tail} =>
switch reorgGuard {
| {lastBlockScannedData} if lastBlockScannedData.blockNumber == head.blockNumber =>
lastBlockScannedData.blockHash != head.blockHash
//If parentHash is None, either it's the genesis block (no reorg)
//Or its already confirmed so no Reorg
//If recentLastBlockData is None, we have not yet saved blockData to compare against
false
| {firstBlockParentNumberAndHash: None} => false
| {
firstBlockParentNumberAndHash: Some({
blockHash: parentHash,
blockNumber: parentBlockNumber,
}),
} =>
if parentBlockNumber == head.blockNumber {
parentHash != head.blockHash
} else {
//if block numbers do not match, this is a dynamic contract case and should recurse
//through the list to look for a matching block or nothing to validate
tail->hasReorgOccurredInternal(~reorgGuard)
}
}
//If recentLastBlockData is None, we have not yet saved blockData to compare against
| _ => false
}
}

let hasReorgOccurred = (
lastBlockScannedHashes: t,
~firstBlockParentNumberAndHash: option<blockNumberAndHash>,
) =>
lastBlockScannedHashes.lastBlockScannedDataList->hasReorgOccurredInternal(
~firstBlockParentNumberAndHash,
)
let hasReorgOccurred = (lastBlockScannedHashes: t, ~reorgGuard: reorgGuard) =>
lastBlockScannedHashes.lastBlockScannedDataList->hasReorgOccurredInternal(~reorgGuard)
}
37 changes: 30 additions & 7 deletions codegenerator/cli/npm/envio/src/Utils.res
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,35 @@ module Dict = {

let merge: (dict<'a>, dict<'a>) => dict<'a> = %raw(`(dictA, dictB) => ({...dictA, ...dictB})`)

let map = (dict, fn) => {
let newDict = Js.Dict.empty()
let keys = dict->Js.Dict.keys
for idx in 0 to keys->Js.Array2.length - 1 {
let key = keys->Js.Array2.unsafe_get(idx)
newDict->Js.Dict.set(key, fn(dict->Js.Dict.unsafeGet(key)))
}
newDict
}

let forEach = (dict, fn) => {
let keys = dict->Js.Dict.keys
for idx in 0 to keys->Js.Array2.length - 1 {
fn(dict->Js.Dict.unsafeGet(keys->Js.Array2.unsafe_get(idx)))
}
}

let deleteInPlace: (dict<'a>, string) => unit = %raw(`(dict, key) => {
delete dict[key];
}
`)

let updateImmutable: (
dict<'a>,
string,
'a,
) => dict<'a> = %raw(`(dict, key, value) => ({...dict, [key]: value})`)

let shallowCopy: dict<'a> => dict<'a> = %raw(`(dict) => ({...dict})`)
}

module Math = {
Expand Down Expand Up @@ -156,13 +180,9 @@ Helper to check if a value exists in an array
Index > array length or < 0 results in a copy of the array
*/
let removeAtIndex = (array, index) => {
if index < 0 || index >= array->Array.length {
array->Array.copy
} else {
let head = array->Js.Array2.slice(~start=0, ~end_=index)
let tail = array->Belt.Array.sliceToEnd(index + 1)
Belt.Array.concat(head, tail)
}
array
->Js.Array2.slice(~start=0, ~end_=index)
->Js.Array2.concat(array->Js.Array2.sliceFrom(index + 1))
}

let last = (arr: array<'a>): option<'a> => arr->Belt.Array.get(arr->Array.length - 1)
Expand Down Expand Up @@ -205,6 +225,9 @@ Helper to check if a value exists in an array
})
interleaved
}

@send
external flatten: (array<array<'a>>, @as(1) _) => array<'a> = "flat"
}

module String = {
Expand Down
7 changes: 2 additions & 5 deletions codegenerator/cli/templates/static/codegen/src/Benchmark.res
Original file line number Diff line number Diff line change
Expand Up @@ -265,13 +265,10 @@ let addBlockRangeFetched = (
~chainId,
~fromBlock,
~toBlock,
~fetchStateRegisterId: FetchState.id,
~numEvents,
~partitionId,
) => {
let registerName = fetchStateRegisterId->FetchState.registerIdToName

let group = `BlockRangeFetched Summary for Chain ${chainId->Belt.Int.toString} ${registerName} Register`
let group = `BlockRangeFetched Summary for Chain ${chainId->Belt.Int.toString} Partition ${partitionId}`
let add = (label, value) => data->Data.addSummaryData(~group, ~label, ~value=Utils.magic(value))

add("Total Time Elapsed (ms)", totalTimeElapsed)
Expand All @@ -281,7 +278,7 @@ let addBlockRangeFetched = (
add("Block Range Size", toBlock - fromBlock)

data->Data.incrementMillis(
~label=`Total Time Fetching Chain ${chainId->Belt.Int.toString} Partition ${partitionId->Belt.Int.toString}`,
~label=`Total Time Fetching Chain ${chainId->Belt.Int.toString} Partition ${partitionId}`,
~amount=totalTimeElapsed,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,21 @@ let getAllAddresses = (mapping: mapping) => {
mapping.nameByAddress->Js.Dict.keys->stringsToAddresses
}

let combine = (a, b) => {
let m = make()
[a, b]->Belt.Array.forEach(v =>
v.nameByAddress
->Js.Dict.entries
->Belt.Array.forEach(((addr, name)) => {
m->addAddress(~address=addr->Utils.magic, ~name)
})
)
m
let copy = (mapping: mapping) => {
{
nameByAddress: mapping.nameByAddress->Utils.Dict.shallowCopy,
// Since Belt.Set.String.t is immutable, we can simply do shallow copy here
addressesByName: mapping.addressesByName->Utils.Dict.shallowCopy,
}
}

let mergeInPlace = (map, ~target) => {
map.nameByAddress
->Js.Dict.keys
->Belt.Array.forEach(addr => {
let name = map.nameByAddress->Js.Dict.unsafeGet(addr)
target->addAddress(~address=addr->Address.unsafeFromString, ~name)
})
}

let fromArray = (nameAddrTuples: array<(Address.t, string)>) => {
Expand All @@ -91,14 +96,18 @@ let fromArray = (nameAddrTuples: array<(Address.t, string)>) => {
Creates a new mapping from the previous without the addresses passed in as "addressesToRemove"
*/
let removeAddresses = (mapping: mapping, ~addressesToRemove: array<Address.t>) => {
mapping.nameByAddress
->Js.Dict.entries
->Belt.Array.keep(((addr, _name)) => {
let shouldRemove = addressesToRemove->Utils.Array.includes(addr->Utils.magic)
!shouldRemove
})
->keyValStringToAddress
->fromArray
switch addressesToRemove {
| [] => mapping
| _ =>
mapping.nameByAddress
->Js.Dict.entries
->Belt.Array.keep(((addr, _name)) => {
let shouldRemove = addressesToRemove->Utils.Array.includes(addr->Utils.magic)
!shouldRemove
})
->keyValStringToAddress
->fromArray
}
}

let addressCount = (mapping: mapping) => mapping.nameByAddress->Js.Dict.keys->Belt.Array.length
Expand Down
5 changes: 2 additions & 3 deletions codegenerator/cli/templates/static/codegen/src/Env.res
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ let updateSyncTimeOnRestart =
let maxEventFetchedQueueSize = envSafe->EnvSafe.get("MAX_QUEUE_SIZE", S.int, ~fallback=100_000)
let maxProcessBatchSize = envSafe->EnvSafe.get("MAX_BATCH_SIZE", S.int, ~fallback=5_000)
let maxAddrInPartition = envSafe->EnvSafe.get("MAX_PARTITION_SIZE", S.int, ~fallback=5_000)
let maxPartitionConcurrency =
envSafe->EnvSafe.get("ENVIO_MAX_PARTITION_CONCURRENCY", S.int, ~fallback=10)

let metricsPort = envSafe->EnvSafe.get("METRICS_PORT", S.int->S.port, ~devFallback=9898)

Expand Down Expand Up @@ -88,9 +90,6 @@ module Benchmark = {
saveDataStrategy->SaveDataStrategy.shouldSaveJsonFile
}

let maxPartitionConcurrency =
envSafe->EnvSafe.get("ENVIO_MAX_PARTITION_CONCURRENCY", S.int, ~fallback=10)

type logStrategyType =
| @as("ecs-file") EcsFile
| @as("ecs-console") EcsConsole
Expand Down
61 changes: 28 additions & 33 deletions codegenerator/cli/templates/static/codegen/src/EventProcessing.res
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,15 @@ let updateEventSyncState = (
)
}

type dynamicContractRegistration = FetchState.dynamicContractRegistration

type dynamicContractRegistrations = {
registrations: array<dynamicContractRegistration>,
dynamicContractsByChain: dict<array<TablesStatic.DynamicContractRegistry.t>>,
unprocessedBatch: array<Internal.eventItem>,
}

let addToDynamicContractRegistrations = (
eventItem: Internal.eventItem,
~dynamicContracts,
~registeringEventBlockNumber,
~registeringEventLogIndex,
~registrations,
~dynamicContractsByChain,
~unprocessedBatch,
) => {
//If there are any dynamic contract registrations, put this item in the unprocessedBatch flagged
Expand All @@ -81,21 +77,22 @@ let addToDynamicContractRegistrations = (
},
]

let registrations = switch dynamicContracts {
| [] => registrations
switch dynamicContracts {
| [] => ()
| dynamicContracts =>
let dynamicContractRegistration = {
FetchState.dynamicContracts,
registeringEventBlockNumber,
registeringEventLogIndex,
registeringEventChain: eventItem.chain,
}
[...registrations, dynamicContractRegistration]
let key = eventItem.chain->ChainMap.Chain.toString
dynamicContractsByChain->Js.Dict.set(
key,
dynamicContractsByChain
->Utils.Dict.dangerouslyGetNonOption(key)
->Option.getWithDefault([])
->Array.concat(dynamicContracts),
)
}

{
unprocessedBatch,
registrations,
dynamicContractsByChain,
}
}

Expand All @@ -107,13 +104,14 @@ let checkContractIsInCurrentRegistrations = (
) => {
switch dynamicContractRegistrations {
| Some(dynamicContracts) =>
dynamicContracts.registrations->Array.some(d =>
d.dynamicContracts->Array.some(d =>
d.chainId == chain->ChainMap.Chain.toChainId &&
d.contractType == contractType &&
d.contractAddress == contractAddress
dynamicContracts.dynamicContractsByChain
->Utils.Dict.dangerouslyGetNonOption(chain->ChainMap.Chain.toString)
->Option.mapWithDefault(false, dcs =>
dcs->Array.some(dc =>
dc.contractType == contractType && dc.contractAddress == contractAddress
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually think we need to remove this check. I removed it in all the other places I think this was left by mistake. We are not allowing the same address registered to multiple contracts.

Suggested change
dc.contractType == contractType && dc.contractAddress == contractAddress
dc.contractAddress == contractAddress

)
)

| None => false
}
}
Expand All @@ -128,7 +126,7 @@ let runEventContractRegister = (
~preRegisterLatestProcessedBlocks=?,
~shouldSaveHistory,
) => {
let {chain, event, blockNumber} = eventItem
let {chain} = eventItem

let contextEnv = ContextEnv.make(~eventItem, ~logger)

Expand Down Expand Up @@ -156,23 +154,20 @@ let runEventContractRegister = (
)
)

let addToDynamicContractRegistrations =
eventItem->(
addToDynamicContractRegistrations(
~registeringEventBlockNumber=blockNumber,
~registeringEventLogIndex=event.logIndex,
...
)
)
let addToDynamicContractRegistrations = addToDynamicContractRegistrations(eventItem, ...)

let val = switch (dynamicContracts, dynamicContractRegistrations) {
| ([], None) => None
| (dynamicContracts, Some({registrations, unprocessedBatch})) =>
addToDynamicContractRegistrations(~dynamicContracts, ~registrations, ~unprocessedBatch)->Some
| (dynamicContracts, Some({dynamicContractsByChain, unprocessedBatch})) =>
addToDynamicContractRegistrations(
~dynamicContracts,
~dynamicContractsByChain,
~unprocessedBatch,
)->Some
| (dynamicContracts, None) =>
addToDynamicContractRegistrations(
~dynamicContracts,
~registrations=[],
~dynamicContractsByChain=Js.Dict.empty(),
~unprocessedBatch=[],
)->Some
}
Expand Down
4 changes: 2 additions & 2 deletions codegenerator/cli/templates/static/codegen/src/Prometheus.res
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ module BenchmarkCounters = {
}

module PartitionBlockFetched = {
type labels = {chainId: int, partitionId: int}
type labels = {chainId: int, partitionId: string}
let intAsString = S.string->S.transform(s => {
serializer: int => int->Belt.Int.toString,
parser: string =>
Expand All @@ -213,7 +213,7 @@ module PartitionBlockFetched = {

let labelSchema = S.schema(s => {
chainId: s.matches(intAsString),
partitionId: s.matches(intAsString),
partitionId: s.matches(S.string),
})

let counter = SafeGauge.makeOrThrow(
Expand Down
Loading