Skip to content

Commit

Permalink
[Relayminer] Fix: relayminer on time sessions (#550)
Browse files Browse the repository at this point in the history
Co-authored-by: Daniel Olshansky <[email protected]>
  • Loading branch information
bryanchriswhite and Olshansk authored May 30, 2024
1 parent 729aa6b commit 2848ab8
Showing 1 changed file with 97 additions and 80 deletions.
177 changes: 97 additions & 80 deletions pkg/relayer/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,12 @@ func NewRelayerSessions(
return nil, err
}

rs.sessionsToClaimObs = channel.Map(
sessionsToClaimObs, sessionsToClaimPublishCh := channel.NewObservable[[]relayer.SessionTree]()
rs.sessionsToClaimObs = sessionsToClaimObs
channel.ForEach(
ctx,
rs.blockClient.CommittedBlocksSequence(ctx),
rs.mapBlockToSessionsToClaim,
rs.forEachBlockClaimSessionsFn(sessionsToClaimPublishCh),
)

return rs, nil
Expand Down Expand Up @@ -163,94 +165,109 @@ func (rs *relayerSessionsManager) ensureSessionTree(sessionHeader *sessiontypes.
return sessionTree, nil
}

// mapBlockToSessionsToClaim is a MapFn which maps committed blocks to a list of
// sessions which can be claimed as of that block.
// forEachBlockClaimSessionsFn returns a new ForEachFn that sends a lists of sessions which
// are eligible to be claimed at each block height on sessionsToClaimsPublishCh, effectively
// mapping committed blocks to a list of sessions which can be claimed as of that block.
//
// forEachBlockClaimSessionsFn returns a new ForEachFn that is called once for each block height.
// Given the current sessions in the rs.sessionsTrees map at the time of each call, it:
// - fetches the current shared module params
// - builds a list of "on-time" & "late" sessions that are eligible to be claimed as of a given block height
// - sends "late" & "on-time" sessions on sessionsToClaimsPublishCh as distinct notifications
//
// If "late" sessions are found, they are emitted as quickly as possible and are expected
// to bypass downstream delay operations. "late" sessions are emitted, as they're discovered
// (by iterating over map keys).
//
// Under nominal conditions, only one set of "on-time" sessions (w/ the same session start/end heights)
// should be present in the rs.sessionsTrees map. "Late" sessions
// are expected to present in the presence of network interruptions, restarts, or other
// disruptions to the relayminer process.
// TODO_IMPROVE: Add the ability for the process to resume where it left off in
// case the process is restarted or the connection is dropped and reconnected.
func (rs *relayerSessionsManager) mapBlockToSessionsToClaim(
ctx context.Context,
block client.Block,
) (sessionTrees []relayer.SessionTree, skip bool) {
rs.sessionsTreesMu.Lock()
defer rs.sessionsTreesMu.Unlock()

// onTimeSessions are the sessions that are still within their grace period.
// They are on time and will wait for their create claim window to open.
// They will be emitted last, after all the late sessions have been emitted.
var onTimeSessions []relayer.SessionTree

// TODO_TECHDEBT(#543): We don't really want to have to query the params for every method call.
// Once `ModuleParamsClient` is implemented, use its replay observable's `#Last()` method
// to get the most recently (asynchronously) observed (and cached) value.
sharedParams, err := rs.sharedQueryClient.GetParams(ctx)
if err != nil {
rs.logger.Error().Err(err).Msg("unable to query shared module params")
return nil, true
}
func (rs *relayerSessionsManager) forEachBlockClaimSessionsFn(
sessionsToClaimsPublishCh chan<- []relayer.SessionTree,
) channel.ForEachFn[client.Block] {
return func(ctx context.Context, block client.Block) {
rs.sessionsTreesMu.Lock()
defer rs.sessionsTreesMu.Unlock()

// onTimeSessions are the sessions that are still within their grace period.
// They are on time and will wait for their create claim window to open.
// They will be emitted last, after all the late sessions have been emitted.
var onTimeSessions []relayer.SessionTree

// TODO_TECHDEBT(#543): We don't really want to have to query the params for every method call.
// Once `ModuleParamsClient` is implemented, use its replay observable's `#Last()` method
// to get the most recently (asynchronously) observed (and cached) value.
sharedParams, err := rs.sharedQueryClient.GetParams(ctx)
if err != nil {
rs.logger.Error().Err(err).Msg("unable to query shared module params")
return
}

numBlocksPerSession := sharedParams.NumBlocksPerSession

// Check if there are sessions that need to enter the claim/proof phase as their
// end block height was the one before the last committed block or earlier.
// Iterate over the sessionsTrees map to get the ones that end at a block height
// lower than the current block height.
for sessionEndHeight, sessionsTreesEndingAtBlockHeight := range rs.sessionsTrees {
// Late sessions are the ones that have their session grace period elapsed
// and should already have been claimed.
// Group them by their end block height and emit each group separately
// before emitting the on-time sessions.
var lateSessions []relayer.SessionTree

sessionGracePeriodEndHeight := shared.GetSessionGracePeriodEndHeight(sessionEndHeight)

// Checking for sessions to claim with <= operator,
// which means that it would include sessions that were supposed to be
// claimed in previous block heights too.
// These late sessions might have their create claim window closed and are
// no longer eligible to be claimed, but that's not always the case.
// Once claim window closing is implemented, they will be filtered out
// downstream at the waitForEarliestCreateClaimsHeight step.
// TODO_BLOCKER: Introduce governance claim and proof window durations,
// implement off-chain window closing and on-chain window checks.
if sessionGracePeriodEndHeight <= block.Height() {
// Iterate over the sessionsTrees that have grace period ending at this
// block height and add them to the list of sessionTrees to be published.
for _, sessionTree := range sessionsTreesEndingAtBlockHeight {
// Mark the session as claimed and add it to the list of sessionTrees to be published.
// If the session has already been claimed, it will be skipped.
// Appending the sessionTree to the list of sessionTrees is protected
// against concurrent access by the sessionsTreesMu such that the first
// call that marks the session as claimed will be the only one to add the
// sessionTree to the list.
if err := sessionTree.StartClaiming(); err != nil {
continue
numBlocksPerSession := sharedParams.NumBlocksPerSession

// Check if there are sessions that need to enter the claim/proof phase as their
// end block height was the one before the last committed block or earlier.
// Iterate over the sessionsTrees map to get the ones that end at a block height
// lower than the current block height.
for sessionEndHeight, sessionsTreesEndingAtBlockHeight := range rs.sessionsTrees {
// Late sessions are the ones that have their session grace period elapsed
// and should already have been claimed.
// Group them by their end block height and emit each group separately
// before emitting the on-time sessions.
var lateSessions []relayer.SessionTree

sessionGracePeriodEndHeight := shared.GetSessionGracePeriodEndHeight(sessionEndHeight)

// Checking for sessions to claim with <= operator,
// which means that it would include sessions that were supposed to be
// claimed in previous block heights too.
// These late sessions might have their create claim window closed and are
// no longer eligible to be claimed, but that's not always the case.
// Once claim window closing is implemented, they will be filtered out
// downstream at the waitForEarliestCreateClaimsHeight step.
// TODO_BLOCKER: Introduce governance claim and proof window durations,
// implement off-chain window closing and on-chain window checks.
if sessionGracePeriodEndHeight <= block.Height() {
// Iterate over the sessionsTrees that have grace period ending at this
// block height and add them to the list of sessionTrees to be published.
for _, sessionTree := range sessionsTreesEndingAtBlockHeight {
// Mark the session as claimed and add it to the list of sessionTrees to be published.
// If the session has already been claimed, it will be skipped.
// Appending the sessionTree to the list of sessionTrees is protected
// against concurrent access by the sessionsTreesMu such that the first
// call that marks the session as claimed will be the only one to add the
// sessionTree to the list.
if err := sessionTree.StartClaiming(); err != nil {
continue
}

// Separate the sessions that are on-time from the ones that are late.
// If the session is past its grace period, it is considered late,
// otherwise it is on time and will be emitted last.
if sessionGracePeriodEndHeight+int64(numBlocksPerSession) < block.Height() {
lateSessions = append(lateSessions, sessionTree)
} else {
onTimeSessions = append(onTimeSessions, sessionTree)
}
}

// Separate the sessions that are on-time from the ones that are late.
// If the session is past its grace period, it is considered late,
// otherwise it is on time and will be emitted last.
if sessionGracePeriodEndHeight+int64(numBlocksPerSession) < block.Height() {
lateSessions = append(lateSessions, sessionTree)
} else {
onTimeSessions = append(onTimeSessions, sessionTree)
// If there are any late sessions to be claimed, emit them first.
// The wait for claim submission window pipeline step will return immediately
// without blocking them.
if len(lateSessions) > 0 {
sessionsToClaimsPublishCh <- lateSessions
}
}

// If there are any late sessions to be claimed, emit them first.
// The wait for claim submission window pipeline step will return immediately
// without blocking them.
if len(lateSessions) > 0 {
return lateSessions, false
}
}
}

// Emit the on-time sessions last, after all the late sessions have been emitted.
if len(onTimeSessions) > 0 {
return onTimeSessions, false
// Emit the on-time sessions last, after all the late sessions have been emitted.
if len(onTimeSessions) > 0 {
sessionsToClaimsPublishCh <- onTimeSessions
}
}

return nil, true
}

// removeFromRelayerSessions removes the SessionTree from the relayerSessions.
Expand Down

0 comments on commit 2848ab8

Please sign in to comment.