Skip to content

Commit

Permalink
[Relayminer] refactor: relayerSessionsManager#waitForBlock() (#648)
Browse files Browse the repository at this point in the history
Co-authored-by: Redouane Lakrache <[email protected]>
  • Loading branch information
bryanchriswhite and red-0ne authored Jul 12, 2024
1 parent 69f4f65 commit 471069b
Show file tree
Hide file tree
Showing 8 changed files with 155 additions and 75 deletions.
6 changes: 3 additions & 3 deletions pkg/observable/channel/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,15 @@ func MapExpand[S, D any](
// each notification received from the observable. If the transformFn returns a
// skip bool of true, the notification is skipped and not emitted to the resulting
// observable.
// The resulting observable will receive the last replayBufferSize
// The resulting observable will receive the last replayBufferCap
// number of values published to the source observable before receiving new values.
func MapReplay[S, D any](
ctx context.Context,
replayBufferSize int,
replayBufferCap int,
srcObservable observable.Observable[S],
transformFn MapFn[S, D],
) observable.ReplayObservable[D] {
dstObservable, dstProducer := NewReplayObservable[D](ctx, replayBufferSize)
dstObservable, dstProducer := NewReplayObservable[D](ctx, replayBufferCap)
srcObserver := srcObservable.Subscribe(ctx)

go goMapTransformNotification(
Expand Down
39 changes: 23 additions & 16 deletions pkg/observable/channel/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ var (
)

type replayObservable[V any] struct {
// replayBufferSize is the number of notifications to buffer so that they
// replayBufferCap is the number of notifications to buffer so that they
// can be replayed to new observers.
replayBufferSize int
replayBufferCap int
// replayBufferMu protects replayBuffer from concurrent access/updates.
replayBufferMu sync.RWMutex
// replayBuffer holds the last relayBufferSize number of notifications received
Expand All @@ -29,30 +29,30 @@ type replayObservable[V any] struct {
}

// NewReplayObservable returns a new ReplayObservable with the given replay buffer
// replayBufferSize and the corresponding publish channel to notify it of new values.
// replayBufferCap and the corresponding publish channel to notify it of new values.
func NewReplayObservable[V any](
ctx context.Context,
replayBufferSize int,
replayBufferCap int,
opts ...option[V],
) (observable.ReplayObservable[V], chan<- V) {
obsvbl, publishCh := NewObservable[V](opts...)

return ToReplayObservable(ctx, replayBufferSize, obsvbl), publishCh
return ToReplayObservable(ctx, replayBufferCap, obsvbl), publishCh
}

// ToReplayObservable returns an observable which replays the last replayBufferSize
// ToReplayObservable returns an observable which replays the last replayBufferCap
// number of values published to the source observable to new observers, before
// publishing new values.
// It should only be used with a srcObservable which contains channelObservers
// (i.e. channelObservable or similar).
func ToReplayObservable[V any](
ctx context.Context,
replayBufferSize int,
replayBufferCap int,
srcObsvbl observable.Observable[V],
) observable.ReplayObservable[V] {
replayObsvbl := &replayObservable[V]{
replayBufferSize: replayBufferSize,
replayBuffer: []V{},
replayBufferCap: replayBufferCap,
replayBuffer: []V{},
}

replayObsvbl.bufferingObsvbl = replayObsvbl.initBufferingObservable(ctx, srcObsvbl)
Expand All @@ -70,11 +70,11 @@ func (ro *replayObservable[V]) Last(ctx context.Context, n int) []V {
defer cancel()

// If n is greater than the replay buffer size, return the entire replay buffer.
if n > ro.replayBufferSize {
n = ro.replayBufferSize
if n > ro.replayBufferCap {
n = ro.replayBufferCap
logger.Warn().
Int("requested_replay_buffer_size", n).
Int("replay_buffer_capacity", ro.replayBufferSize).
Int("replay_buffer_capacity", ro.replayBufferCap).
Msg("requested replay buffer size is greater than replay buffer capacity; returning entire replay buffer")
}

Expand Down Expand Up @@ -122,7 +122,7 @@ func (ro *replayObservable[V]) Last(ctx context.Context, n int) []V {
// It replays the values stored in the replay buffer in the order of their arrival
// before emitting new values.
func (ro *replayObservable[V]) Subscribe(ctx context.Context) observable.Observer[V] {
return ro.SubscribeFromLatestBufferedOffset(ctx, ro.replayBufferSize)
return ro.SubscribeFromLatestBufferedOffset(ctx, ro.replayBufferCap)
}

// SubscribeFromLatestBufferedOffset returns an observer which is initially notified of
Expand All @@ -131,7 +131,7 @@ func (ro *replayObservable[V]) Subscribe(ctx context.Context) observable.Observe
// After this range of the replay buffer is notified, the observer continues to be notified,
// in real-time, when the publishCh channel receives a new value.
//
// If offset is greater than replayBufferSize or the number of elements it currently contains,
// If offset is greater than replayBufferCap or the number of elements it currently contains,
// the observer is notified of all elements in the replayBuffer, starting from the beginning.
//
// Passing 0 for offset is equivalent to calling Subscribe() on a non-replay observable.
Expand Down Expand Up @@ -176,6 +176,13 @@ func (ro *replayObservable[V]) UnsubscribeAll() {
ro.bufferingObsvbl.UnsubscribeAll()
}

// GetReplayBufferSize returns the number of elements currently in the replay buffer.
func (ro *replayObservable[V]) GetReplayBufferSize() int {
ro.replayBufferMu.RLock()
defer ro.replayBufferMu.RUnlock()
return len(ro.replayBuffer)
}

// initBufferingObservable receives and buffers the last n notifications from
// the a source observable and emits all buffered values at once.
func (ro *replayObservable[V]) initBufferingObservable(
Expand All @@ -191,10 +198,10 @@ func (ro *replayObservable[V]) initBufferingObservable(
for value := range ch {
ro.replayBufferMu.Lock()
// The newest value is always at the beginning of the replay buffer.
if len(ro.replayBuffer) < ro.replayBufferSize {
if len(ro.replayBuffer) < ro.replayBufferCap {
ro.replayBuffer = append([]V{value}, ro.replayBuffer...)
} else {
ro.replayBuffer = append([]V{value}, ro.replayBuffer[:ro.replayBufferSize-1]...)
ro.replayBuffer = append([]V{value}, ro.replayBuffer[:ro.replayBufferCap-1]...)
}
// Emit all buffered values at once.
bufferedObsvblCh <- ro.replayBuffer
Expand Down
86 changes: 43 additions & 43 deletions pkg/observable/channel/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,12 @@ func TestReplayObservable_Overflow(t *testing.T) {

func TestReplayObservable(t *testing.T) {
var (
replayBufferSize = 3
values = []int{1, 2, 3, 4, 5}
replayBufferCap = 3
values = []int{1, 2, 3, 4, 5}
// the replay buffer is full and has shifted out values with index <
// len(values)-replayBufferSize so Last should return values starting
// len(values)-replayBufferCap so Last should return values starting
// from there.
expectedValues = values[len(values)-replayBufferSize:]
expectedValues = values[len(values)-replayBufferCap:]
errCh = make(chan error, 1)
ctx, cancel = context.WithCancel(context.Background())
)
Expand All @@ -77,7 +77,7 @@ func TestReplayObservable(t *testing.T) {
// NB: intentionally not using NewReplayObservable() to test ToReplayObservable() directly
// and to retain a reference to the wrapped observable for testing.
obsvbl, publishCh := channel.NewObservable[int]()
replayObsvbl := channel.ToReplayObservable[int](ctx, replayBufferSize, obsvbl)
replayObsvbl := channel.ToReplayObservable[int](ctx, replayBufferCap, obsvbl)

// vanilla observer, should be able to receive all values published after subscribing
observer := obsvbl.Subscribe(ctx)
Expand Down Expand Up @@ -150,32 +150,32 @@ func TestReplayObservable_Last_Full_ReplayBuffer(t *testing.T) {
slices.Reverse(expectedValues)

tests := []struct {
name string
replayBufferSize int
name string
replayBufferCap int
// lastN is the number of values to return from the replay buffer
lastN int
expectedValues []int
}{
{
name: "n < replayBufferSize",
replayBufferSize: 5,
lastN: 3,
name: "n < replayBufferCap",
replayBufferCap: 5,
lastN: 3,
// the replay buffer has enough values to return to Last, it should return
// the last n values in the replay buffer.
expectedValues: values[2:], // []int{5, 4, 3},
},
{
name: "n = replayBufferSize",
replayBufferSize: 5,
lastN: 5,
expectedValues: values,
name: "n = replayBufferCap",
replayBufferCap: 5,
lastN: 5,
expectedValues: values,
},
{
name: "n > replayBufferSize",
replayBufferSize: 3,
lastN: 5,
name: "n > replayBufferCap",
replayBufferCap: 3,
lastN: 5,
// the replay buffer is full so Last should return values starting
// from lastN - replayBufferSize.
// from lastN - replayBufferCap.
expectedValues: values[2:], // []int{5, 4, 3},
},
}
Expand All @@ -185,7 +185,7 @@ func TestReplayObservable_Last_Full_ReplayBuffer(t *testing.T) {
var ctx = context.Background()

replayObsvbl, publishCh :=
channel.NewReplayObservable[int](ctx, test.replayBufferSize)
channel.NewReplayObservable[int](ctx, test.replayBufferCap)

for _, value := range values {
publishCh <- value
Expand All @@ -200,8 +200,8 @@ func TestReplayObservable_Last_Full_ReplayBuffer(t *testing.T) {

func TestReplayObservable_Last_Blocks_And_Times_Out(t *testing.T) {
var (
replayBufferSize = 5
lastN = 5
replayBufferCap = 5
lastN = 5
// splitIdx is the index at which this test splits the set of values.
// The two groups of values are published at different points in the
// test to test the behavior of Last under different conditions.
Expand All @@ -210,7 +210,7 @@ func TestReplayObservable_Last_Blocks_And_Times_Out(t *testing.T) {
ctx = context.Background()
)

replayObsvbl, publishCh := channel.NewReplayObservable[int](ctx, replayBufferSize)
replayObsvbl, publishCh := channel.NewReplayObservable[int](ctx, replayBufferCap)

// getLastValues is a helper function which returns a channel that will
// receive the result of a call to Last, the method under test.
Expand Down Expand Up @@ -307,34 +307,34 @@ func TestReplayObservable_SubscribeFromLatestBufferedOffset(t *testing.T) {
values := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}

tests := []struct {
name string
replayBufferSize int
endOffset int
expectedValues []int
name string
replayBufferCap int
endOffset int
expectedValues []int
}{
{
name: "endOffset = replayBufferSize",
replayBufferSize: 8,
endOffset: 8,
expectedValues: values[2:], // []int{2, 3, 4, 5, ..., 9},
name: "endOffset = replayBufferCap",
replayBufferCap: 8,
endOffset: 8,
expectedValues: values[2:], // []int{2, 3, 4, 5, ..., 9},
},
{
name: "endOffset < replayBufferSize",
replayBufferSize: 10,
endOffset: 2,
expectedValues: values[8:], // []int{8, 9},
name: "endOffset < replayBufferCap",
replayBufferCap: 10,
endOffset: 2,
expectedValues: values[8:], // []int{8, 9},
},
{
name: "endOffset > replayBufferSize",
replayBufferSize: 8,
endOffset: 10,
expectedValues: values[2:],
name: "endOffset > replayBufferCap",
replayBufferCap: 8,
endOffset: 10,
expectedValues: values[2:],
},
{
name: "replayBufferSize > endOffset > numBufferedValues ",
replayBufferSize: 20,
endOffset: 15,
expectedValues: values,
name: "replayBufferCap > endOffset > numBufferedValues ",
replayBufferCap: 20,
endOffset: 15,
expectedValues: values,
},
}

Expand All @@ -343,7 +343,7 @@ func TestReplayObservable_SubscribeFromLatestBufferedOffset(t *testing.T) {
var ctx = context.Background()

replayObsvbl, publishCh :=
channel.NewReplayObservable[int](ctx, test.replayBufferSize)
channel.NewReplayObservable[int](ctx, test.replayBufferCap)

for _, value := range values {
publishCh <- value
Expand Down
4 changes: 3 additions & 1 deletion pkg/observable/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@ type ReplayObservable[V any] interface {
// After this range of the replay buffer is notified, the observer continues to be notified,
// in real-time, when the publishCh channel receives a new value.
//
// If offset is greater than replayBufferSize or the number of elements it currently contains,
// If offset is greater than replayBufferCap or the number of elements it currently contains,
// the observer is notified of all elements in the replayBuffer, starting from the beginning.
//
// Passing 0 for offset is equivalent to calling Subscribe() on a non-replay observable.
SubscribeFromLatestBufferedOffset(ctx context.Context, offset int) Observer[V]
// Last synchronously returns the last n values from the replay buffer with
// LIFO ordering
Last(ctx context.Context, n int) []V
// GetReplayBufferSize returns the number of elements currently in the replay buffer.
GetReplayBufferSize() int
}

// Observable is a generic interface that allows multiple subscribers to be
Expand Down
11 changes: 10 additions & 1 deletion pkg/relayer/session/claim.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,16 @@ func (rs *relayerSessionsManager) waitForEarliestCreateClaimsHeight(
// The block that'll be used as a source of entropy for which branch(es) to
// prove should be deterministic and use on-chain governance params.
claimsWindowOpenBlock := rs.waitForBlock(ctx, claimWindowOpenHeight)

// TODO_MAINNET: If a relayminer is cold-started with persisted but unclaimed ("late")
// sessions, the claimsWindowOpenBlock will never be observed. In this case, we should
// use a block query client to populate the block client replay observable at the time
// of block client construction. This check and failure branch can be removed once this
// is implemented.
if claimsWindowOpenBlock == nil {
logger.Warn().Msg("failed to observe earliest claim commit height offset seed block height")
failedCreateClaimsSessionsCh <- sessionTrees
return nil
}
claimsFlushedCh := make(chan []relayer.SessionTree)
defer close(claimsFlushedCh)
go rs.goCreateClaimRoots(
Expand Down
13 changes: 13 additions & 0 deletions pkg/relayer/session/proof.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,19 @@ func (rs *relayerSessionsManager) waitForEarliestSubmitProofsHeightAndGeneratePr
logger.Info().Msg("waiting & blocking until the proof window open height")

proofsWindowOpenBlock := rs.waitForBlock(ctx, proofWindowOpenHeight)
// TODO_MAINNET: If a relayminer is cold-started with persisted but unproven ("late")
// sessions, the proofsWindowOpenBlock will never be observed. Where a "late" session
// is one whic is unclaimed and whose earliest claim commit height has already elapsed.
//
// In this case, we should
// use a block query client to populate the block client replay observable at the time
// of block client construction. This check and failure branch can be removed once this
// is implemented.
if proofsWindowOpenBlock == nil {
logger.Warn().Msg("failed to observe earliest proof commit height offset seed block height")
failedSubmitProofsSessionsCh <- sessionTrees
return nil
}

// TODO_BLOCKER(@bryanchriswhite, @red0ne): After lean client, there's no
// guarantee that all session trees have the same supplier address. Group
Expand Down
33 changes: 28 additions & 5 deletions pkg/relayer/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,19 +311,42 @@ func (rs *relayerSessionsManager) validateConfig() error {

// waitForBlock blocks until the block at the given height (or greater) is
// observed as having been committed.
func (rs *relayerSessionsManager) waitForBlock(ctx context.Context, height int64) client.Block {
func (rs *relayerSessionsManager) waitForBlock(ctx context.Context, targetHeight int64) client.Block {
// Create a cancellable child context for managing the CommittedBlocksSequence lifecycle.
// Since the subscription is no longer needed after the block it is looking for
// Since the committedBlocksObserver is no longer needed after the block it is looking for
// is reached, cancelling the child context at the end of the function will stop
// the subscriptions and close the publish channel associated with the
// CommittedBlocksSequence observable which is not exposing it.
ctx, cancel := context.WithCancel(ctx)
defer cancel()

subscription := rs.blockClient.CommittedBlocksSequence(ctx).Subscribe(ctx)
committedBlocksObs := rs.blockClient.CommittedBlocksSequence(ctx)
committedBlocksObserver := committedBlocksObs.Subscribe(ctx)

// minNumReplayBlocks is the number of blocks that MUST be in the block client's
// replay buffer such that the target block can be observed.
//
// Plus one is necessary for the "oldest" boundary to include targetHeight.
//
// If minNumReplayBlocks is negative, no replay is necessary and the replay buffer will be ignored.
minNumReplayBlocks := rs.blockClient.LastBlock(ctx).Height() - targetHeight + 1

// TODO_MAINNET: If the replay buffer size is less than minNumReplayBlocks, the target
// block targetHeight will never be observed. This can happen if a relayminer is cold-started
// with persisted but unclaimed/unproven ("late") sessions, where a "late" session is one
// which is unclaimed and whose earliest claim commit height has already elapsed.
//
// In this case, we should use a block query client to populate the block client replay
// observable at the time of block client construction.
// The latestBufferedOffset would be the difference between the current height and
// earliest unclaimed/unproven session's earliest supplier claim/proof commit height.
// This check and return branch can be removed once this is implemented.
if committedBlocksObs.GetReplayBufferSize() < int(minNumReplayBlocks) {
return nil
}

for block := range subscription.Ch() {
if block.Height() >= height {
for block := range committedBlocksObserver.Ch() {
if block.Height() >= targetHeight {
return block
}
}
Expand Down
Loading

0 comments on commit 471069b

Please sign in to comment.