diff --git a/pkg/relayer/session/session.go b/pkg/relayer/session/session.go index 531c1208e..98a681d2a 100644 --- a/pkg/relayer/session/session.go +++ b/pkg/relayer/session/session.go @@ -89,10 +89,12 @@ func NewRelayerSessions( return nil, err } - rs.sessionsToClaimObs = channel.Map( + sessionsToClaimObs, sessionsToClaimPublishCh := channel.NewObservable[[]relayer.SessionTree]() + rs.sessionsToClaimObs = sessionsToClaimObs + channel.ForEach( ctx, rs.blockClient.CommittedBlocksSequence(ctx), - rs.mapBlockToSessionsToClaim, + rs.forEachBlockClaimSessionsFn(sessionsToClaimPublishCh), ) return rs, nil @@ -163,94 +165,109 @@ func (rs *relayerSessionsManager) ensureSessionTree(sessionHeader *sessiontypes. return sessionTree, nil } -// mapBlockToSessionsToClaim is a MapFn which maps committed blocks to a list of -// sessions which can be claimed as of that block. +// forEachBlockClaimSessionsFn returns a new ForEachFn that sends a lists of sessions which +// are eligible to be claimed at each block height on sessionsToClaimsPublishCh, effectively +// mapping committed blocks to a list of sessions which can be claimed as of that block. +// +// forEachBlockClaimSessionsFn returns a new ForEachFn that is called once for each block height. +// Given the current sessions in the rs.sessionsTrees map at the time of each call, it: +// - fetches the current shared module params +// - builds a list of "on-time" & "late" sessions that are eligible to be claimed as of a given block height +// - sends "late" & "on-time" sessions on sessionsToClaimsPublishCh as distinct notifications +// +// If "late" sessions are found, they are emitted as quickly as possible and are expected +// to bypass downstream delay operations. "late" sessions are emitted, as they're discovered +// (by iterating over map keys). +// +// Under nominal conditions, only one set of "on-time" sessions (w/ the same session start/end heights) +// should be present in the rs.sessionsTrees map. "Late" sessions +// are expected to present in the presence of network interruptions, restarts, or other +// disruptions to the relayminer process. // TODO_IMPROVE: Add the ability for the process to resume where it left off in // case the process is restarted or the connection is dropped and reconnected. -func (rs *relayerSessionsManager) mapBlockToSessionsToClaim( - ctx context.Context, - block client.Block, -) (sessionTrees []relayer.SessionTree, skip bool) { - rs.sessionsTreesMu.Lock() - defer rs.sessionsTreesMu.Unlock() - - // onTimeSessions are the sessions that are still within their grace period. - // They are on time and will wait for their create claim window to open. - // They will be emitted last, after all the late sessions have been emitted. - var onTimeSessions []relayer.SessionTree - - // TODO_TECHDEBT(#543): We don't really want to have to query the params for every method call. - // Once `ModuleParamsClient` is implemented, use its replay observable's `#Last()` method - // to get the most recently (asynchronously) observed (and cached) value. - sharedParams, err := rs.sharedQueryClient.GetParams(ctx) - if err != nil { - rs.logger.Error().Err(err).Msg("unable to query shared module params") - return nil, true - } +func (rs *relayerSessionsManager) forEachBlockClaimSessionsFn( + sessionsToClaimsPublishCh chan<- []relayer.SessionTree, +) channel.ForEachFn[client.Block] { + return func(ctx context.Context, block client.Block) { + rs.sessionsTreesMu.Lock() + defer rs.sessionsTreesMu.Unlock() + + // onTimeSessions are the sessions that are still within their grace period. + // They are on time and will wait for their create claim window to open. + // They will be emitted last, after all the late sessions have been emitted. + var onTimeSessions []relayer.SessionTree + + // TODO_TECHDEBT(#543): We don't really want to have to query the params for every method call. + // Once `ModuleParamsClient` is implemented, use its replay observable's `#Last()` method + // to get the most recently (asynchronously) observed (and cached) value. + sharedParams, err := rs.sharedQueryClient.GetParams(ctx) + if err != nil { + rs.logger.Error().Err(err).Msg("unable to query shared module params") + return + } - numBlocksPerSession := sharedParams.NumBlocksPerSession - - // Check if there are sessions that need to enter the claim/proof phase as their - // end block height was the one before the last committed block or earlier. - // Iterate over the sessionsTrees map to get the ones that end at a block height - // lower than the current block height. - for sessionEndHeight, sessionsTreesEndingAtBlockHeight := range rs.sessionsTrees { - // Late sessions are the ones that have their session grace period elapsed - // and should already have been claimed. - // Group them by their end block height and emit each group separately - // before emitting the on-time sessions. - var lateSessions []relayer.SessionTree - - sessionGracePeriodEndHeight := shared.GetSessionGracePeriodEndHeight(sessionEndHeight) - - // Checking for sessions to claim with <= operator, - // which means that it would include sessions that were supposed to be - // claimed in previous block heights too. - // These late sessions might have their create claim window closed and are - // no longer eligible to be claimed, but that's not always the case. - // Once claim window closing is implemented, they will be filtered out - // downstream at the waitForEarliestCreateClaimsHeight step. - // TODO_BLOCKER: Introduce governance claim and proof window durations, - // implement off-chain window closing and on-chain window checks. - if sessionGracePeriodEndHeight <= block.Height() { - // Iterate over the sessionsTrees that have grace period ending at this - // block height and add them to the list of sessionTrees to be published. - for _, sessionTree := range sessionsTreesEndingAtBlockHeight { - // Mark the session as claimed and add it to the list of sessionTrees to be published. - // If the session has already been claimed, it will be skipped. - // Appending the sessionTree to the list of sessionTrees is protected - // against concurrent access by the sessionsTreesMu such that the first - // call that marks the session as claimed will be the only one to add the - // sessionTree to the list. - if err := sessionTree.StartClaiming(); err != nil { - continue + numBlocksPerSession := sharedParams.NumBlocksPerSession + + // Check if there are sessions that need to enter the claim/proof phase as their + // end block height was the one before the last committed block or earlier. + // Iterate over the sessionsTrees map to get the ones that end at a block height + // lower than the current block height. + for sessionEndHeight, sessionsTreesEndingAtBlockHeight := range rs.sessionsTrees { + // Late sessions are the ones that have their session grace period elapsed + // and should already have been claimed. + // Group them by their end block height and emit each group separately + // before emitting the on-time sessions. + var lateSessions []relayer.SessionTree + + sessionGracePeriodEndHeight := shared.GetSessionGracePeriodEndHeight(sessionEndHeight) + + // Checking for sessions to claim with <= operator, + // which means that it would include sessions that were supposed to be + // claimed in previous block heights too. + // These late sessions might have their create claim window closed and are + // no longer eligible to be claimed, but that's not always the case. + // Once claim window closing is implemented, they will be filtered out + // downstream at the waitForEarliestCreateClaimsHeight step. + // TODO_BLOCKER: Introduce governance claim and proof window durations, + // implement off-chain window closing and on-chain window checks. + if sessionGracePeriodEndHeight <= block.Height() { + // Iterate over the sessionsTrees that have grace period ending at this + // block height and add them to the list of sessionTrees to be published. + for _, sessionTree := range sessionsTreesEndingAtBlockHeight { + // Mark the session as claimed and add it to the list of sessionTrees to be published. + // If the session has already been claimed, it will be skipped. + // Appending the sessionTree to the list of sessionTrees is protected + // against concurrent access by the sessionsTreesMu such that the first + // call that marks the session as claimed will be the only one to add the + // sessionTree to the list. + if err := sessionTree.StartClaiming(); err != nil { + continue + } + + // Separate the sessions that are on-time from the ones that are late. + // If the session is past its grace period, it is considered late, + // otherwise it is on time and will be emitted last. + if sessionGracePeriodEndHeight+int64(numBlocksPerSession) < block.Height() { + lateSessions = append(lateSessions, sessionTree) + } else { + onTimeSessions = append(onTimeSessions, sessionTree) + } } - // Separate the sessions that are on-time from the ones that are late. - // If the session is past its grace period, it is considered late, - // otherwise it is on time and will be emitted last. - if sessionGracePeriodEndHeight+int64(numBlocksPerSession) < block.Height() { - lateSessions = append(lateSessions, sessionTree) - } else { - onTimeSessions = append(onTimeSessions, sessionTree) + // If there are any late sessions to be claimed, emit them first. + // The wait for claim submission window pipeline step will return immediately + // without blocking them. + if len(lateSessions) > 0 { + sessionsToClaimsPublishCh <- lateSessions } } - - // If there are any late sessions to be claimed, emit them first. - // The wait for claim submission window pipeline step will return immediately - // without blocking them. - if len(lateSessions) > 0 { - return lateSessions, false - } } - } - // Emit the on-time sessions last, after all the late sessions have been emitted. - if len(onTimeSessions) > 0 { - return onTimeSessions, false + // Emit the on-time sessions last, after all the late sessions have been emitted. + if len(onTimeSessions) > 0 { + sessionsToClaimsPublishCh <- onTimeSessions + } } - - return nil, true } // removeFromRelayerSessions removes the SessionTree from the relayerSessions.