Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Relayminer] refactor: relayerSessionsManager#waitForBlock() #648

Merged
merged 30 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
234b839
chore: add ReplayObservable#SubscribeFromLatestBufferedOffset
bryanchriswhite Jul 2, 2024
f65b1bf
Empty commit
bryanchriswhite Jul 2, 2024
ed9f13e
refactor: relayerSessionsManager#waitForBlock()
bryanchriswhite Jul 2, 2024
759d8f4
fix: relayer sessions manager test
bryanchriswhite Jul 4, 2024
912e820
refactor: relayerSessionsManager#waitForBlock()
bryanchriswhite Jul 4, 2024
051c1da
refactor: rename ReplayObservable#replayBufferSize to #replayBufferCap
bryanchriswhite Jul 4, 2024
8d43cb9
chore: add ReplayObservable#GetReplayBufferSize
bryanchriswhite Jul 4, 2024
1ee0cdf
chore: simplify ReplayObservable#SubscribefromLatestBufferedOffset()
bryanchriswhite Jul 4, 2024
6ef8049
Merge branch 'issues/553/fix/replay-observable' into issues/553/fix/r…
bryanchriswhite Jul 4, 2024
c0b2cd9
fix: failing tests
bryanchriswhite Jul 4, 2024
2dbfcc7
Merge branch 'issues/553/fix/replay-observable' into issues/553/fix/r…
bryanchriswhite Jul 4, 2024
ffbe377
Merge branch 'main' into issues/553/fix/replay-observable
bryanchriswhite Jul 4, 2024
10dcc7f
chore: review feedback improvements
bryanchriswhite Jul 5, 2024
0ebc1b6
chore: update comment
bryanchriswhite Jul 5, 2024
049c9e1
chore: update comment
bryanchriswhite Jul 5, 2024
27a0d00
Merge branch 'main' into issues/553/fix/replay-observable
bryanchriswhite Jul 5, 2024
fac9952
chore: go imports
bryanchriswhite Jul 6, 2024
2b98a3b
fix: complete rename of replayBufferSize to replayBufferCap
bryanchriswhite Jul 6, 2024
eba0df3
chore: add godoc comment
bryanchriswhite Jul 6, 2024
85ae468
chore: review feedback improvements
bryanchriswhite Jul 6, 2024
bc682fd
Merge branch 'issues/553/fix/replay-observable' into issues/553/fix/r…
bryanchriswhite Jul 6, 2024
8388973
Merge branch 'main' into issues/553/fix/replay-observable
bryanchriswhite Jul 8, 2024
80699d5
Merge branch 'issues/553/fix/replay-observable' into issues/553/fix/r…
bryanchriswhite Jul 8, 2024
a518f32
Merge branch 'main' into issues/553/fix/replay-observable
bryanchriswhite Jul 11, 2024
d833d43
Merge branch 'issues/553/fix/replay-observable' into issues/553/fix/r…
bryanchriswhite Jul 11, 2024
8f49b22
Merge branch 'main' into issues/553/fix/replay-observable
bryanchriswhite Jul 12, 2024
d003332
chore: review feedback improvements
bryanchriswhite Jul 12, 2024
23bc4a9
Merge branch 'issues/553/fix/replay-observable' into issues/553/fix/r…
bryanchriswhite Jul 12, 2024
2ce42b8
Merge remote-tracking branch 'pokt/main' into issues/553/fix/relayer-…
bryanchriswhite Jul 12, 2024
c6966ec
fix: goimports
bryanchriswhite Jul 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/observable/channel/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,15 @@ func MapExpand[S, D any](
// each notification received from the observable. If the transformFn returns a
// skip bool of true, the notification is skipped and not emitted to the resulting
// observable.
// The resulting observable will receive the last replayBufferSize
// The resulting observable will receive the last replayBufferCap
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
// number of values published to the source observable before receiving new values.
func MapReplay[S, D any](
ctx context.Context,
replayBufferSize int,
replayBufferCap int,
srcObservable observable.Observable[S],
transformFn MapFn[S, D],
) observable.ReplayObservable[D] {
dstObservable, dstProducer := NewReplayObservable[D](ctx, replayBufferSize)
dstObservable, dstProducer := NewReplayObservable[D](ctx, replayBufferCap)
srcObserver := srcObservable.Subscribe(ctx)

go goMapTransformNotification(
Expand Down
63 changes: 47 additions & 16 deletions pkg/observable/channel/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ var (
)

type replayObservable[V any] struct {
// replayBufferSize is the number of notifications to buffer so that they
// replayBufferCap is the number of notifications to buffer so that they
// can be replayed to new observers.
replayBufferSize int
replayBufferCap int
// replayBufferMu protects replayBuffer from concurrent access/updates.
replayBufferMu sync.RWMutex
// replayBuffer holds the last relayBufferSize number of notifications received
Expand All @@ -29,30 +29,30 @@ type replayObservable[V any] struct {
}

// NewReplayObservable returns a new ReplayObservable with the given replay buffer
// replayBufferSize and the corresponding publish channel to notify it of new values.
// replayBufferCap and the corresponding publish channel to notify it of new values.
func NewReplayObservable[V any](
ctx context.Context,
replayBufferSize int,
replayBufferCap int,
opts ...option[V],
) (observable.ReplayObservable[V], chan<- V) {
obsvbl, publishCh := NewObservable[V](opts...)

return ToReplayObservable(ctx, replayBufferSize, obsvbl), publishCh
return ToReplayObservable(ctx, replayBufferCap, obsvbl), publishCh
}

// ToReplayObservable returns an observable which replays the last replayBufferSize
// ToReplayObservable returns an observable which replays the last replayBufferCap
// number of values published to the source observable to new observers, before
// publishing new values.
// It should only be used with a srcObservable which contains channelObservers
// (i.e. channelObservable or similar).
func ToReplayObservable[V any](
ctx context.Context,
replayBufferSize int,
replayBufferCap int,
srcObsvbl observable.Observable[V],
) observable.ReplayObservable[V] {
replayObsvbl := &replayObservable[V]{
replayBufferSize: replayBufferSize,
replayBuffer: []V{},
replayBufferCap: replayBufferCap,
replayBuffer: []V{},
}

replayObsvbl.bufferingObsvbl = replayObsvbl.initBufferingObservable(ctx, srcObsvbl)
Expand All @@ -70,11 +70,11 @@ func (ro *replayObservable[V]) Last(ctx context.Context, n int) []V {
defer cancel()

// If n is greater than the replay buffer size, return the entire replay buffer.
if n > ro.replayBufferSize {
n = ro.replayBufferSize
if n > ro.replayBufferCap {
n = ro.replayBufferCap
logger.Warn().
Int("requested_replay_buffer_size", n).
Int("replay_buffer_capacity", ro.replayBufferSize).
Int("replay_buffer_capacity", ro.replayBufferCap).
Msg("requested replay buffer size is greater than replay buffer capacity; returning entire replay buffer")
}

Expand Down Expand Up @@ -122,18 +122,42 @@ func (ro *replayObservable[V]) Last(ctx context.Context, n int) []V {
// It replays the values stored in the replay buffer in the order of their arrival
// before emitting new values.
func (ro *replayObservable[V]) Subscribe(ctx context.Context) observable.Observer[V] {
return ro.SubscribeFromLatestBufferedOffset(ctx, ro.replayBufferCap)
}

// SubscribeFromLatestBufferedOffset returns an observer which is initially notified of
// values in the replay buffer, starting from the latest buffered value at index 'offset'.
//
// After this range of the replay buffer is notified, the observer continues to be notified,
// in real-time, when the publishCh channel receives a new value.
//
// If offset is greater than replayBufferCap or the number of elements it currently contains,
// the observer is notified of all elements in the replayBuffer, starting from the beginning.
//
// Passing 0 for offset is equivalent to calling Subscribe() on a non-replay observable.
func (ro *replayObservable[V]) SubscribeFromLatestBufferedOffset(
ctx context.Context,
endOffset int,
) observable.Observer[V] {
obs, ch := NewObservable[V]()
ctx, cancel := context.WithCancel(ctx)

go func() {
// Replay the values stored in the buffer form the oldest to the newest.
ro.replayBufferMu.RLock()
for i := len(ro.replayBuffer) - 1; i >= 0; i-- {

// Ensure that the offset is within the bounds of the replay buffer.
if endOffset > len(ro.replayBuffer) {
endOffset = len(ro.replayBuffer)
}

// Replay the values stored in the buffer form the oldest to the newest.
for i := endOffset - 1; i >= 0; i-- {
ch <- ro.replayBuffer[i]
}

bufferedValuesCh := ro.bufferingObsvbl.Subscribe(ctx).Ch()
ro.replayBufferMu.RUnlock()

// Since bufferingObsvbl emits all buffered values in one notification
// and the replay buffer has already been replayed, only the most recent
// value needs to be published
Expand All @@ -152,6 +176,13 @@ func (ro *replayObservable[V]) UnsubscribeAll() {
ro.bufferingObsvbl.UnsubscribeAll()
}

// GetReplayBufferSize returns the number of elements currently in the replay buffer.
func (ro *replayObservable[V]) GetReplayBufferSize() int {
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
ro.replayBufferMu.RLock()
defer ro.replayBufferMu.RUnlock()
return len(ro.replayBuffer)
}

// initBufferingObservable receives and buffers the last n notifications from
// the a source observable and emits all buffered values at once.
func (ro *replayObservable[V]) initBufferingObservable(
Expand All @@ -167,10 +198,10 @@ func (ro *replayObservable[V]) initBufferingObservable(
for value := range ch {
ro.replayBufferMu.Lock()
// The newest value is always at the beginning of the replay buffer.
if len(ro.replayBuffer) < ro.replayBufferSize {
if len(ro.replayBuffer) < ro.replayBufferCap {
ro.replayBuffer = append([]V{value}, ro.replayBuffer...)
} else {
ro.replayBuffer = append([]V{value}, ro.replayBuffer[:ro.replayBufferSize-1]...)
ro.replayBuffer = append([]V{value}, ro.replayBuffer[:ro.replayBufferCap-1]...)
}
// Emit all buffered values at once.
bufferedObsvblCh <- ro.replayBuffer
Expand Down
120 changes: 98 additions & 22 deletions pkg/observable/channel/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/pokt-network/poktroll/pkg/observable"
"github.com/pokt-network/poktroll/pkg/observable/channel"
"github.com/pokt-network/poktroll/testutil/testerrors"
)
Expand Down Expand Up @@ -62,12 +63,12 @@ func TestReplayObservable_Overflow(t *testing.T) {

func TestReplayObservable(t *testing.T) {
var (
replayBufferSize = 3
values = []int{1, 2, 3, 4, 5}
replayBufferCap = 3
values = []int{1, 2, 3, 4, 5}
// the replay buffer is full and has shifted out values with index <
// len(values)-replayBufferSize so Last should return values starting
// len(values)-replayBufferCap so Last should return values starting
// from there.
expectedValues = values[len(values)-replayBufferSize:]
expectedValues = values[len(values)-replayBufferCap:]
errCh = make(chan error, 1)
ctx, cancel = context.WithCancel(context.Background())
)
Expand All @@ -76,7 +77,7 @@ func TestReplayObservable(t *testing.T) {
// NB: intentionally not using NewReplayObservable() to test ToReplayObservable() directly
// and to retain a reference to the wrapped observable for testing.
obsvbl, publishCh := channel.NewObservable[int]()
replayObsvbl := channel.ToReplayObservable[int](ctx, replayBufferSize, obsvbl)
replayObsvbl := channel.ToReplayObservable[int](ctx, replayBufferCap, obsvbl)

// vanilla observer, should be able to receive all values published after subscribing
observer := obsvbl.Subscribe(ctx)
Expand Down Expand Up @@ -149,32 +150,32 @@ func TestReplayObservable_Last_Full_ReplayBuffer(t *testing.T) {
slices.Reverse(expectedValues)

tests := []struct {
name string
replayBufferSize int
name string
replayBufferCap int
// lastN is the number of values to return from the replay buffer
lastN int
expectedValues []int
}{
{
name: "n < replayBufferSize",
replayBufferSize: 5,
lastN: 3,
name: "n < replayBufferCap",
replayBufferCap: 5,
lastN: 3,
// the replay buffer has enough values to return to Last, it should return
// the last n values in the replay buffer.
expectedValues: values[2:], // []int{5, 4, 3},
},
{
name: "n = replayBufferSize",
replayBufferSize: 5,
lastN: 5,
expectedValues: values,
name: "n = replayBufferCap",
replayBufferCap: 5,
lastN: 5,
expectedValues: values,
},
{
name: "n > replayBufferSize",
replayBufferSize: 3,
lastN: 5,
name: "n > replayBufferCap",
replayBufferCap: 3,
lastN: 5,
// the replay buffer is full so Last should return values starting
// from lastN - replayBufferSize.
// from lastN - replayBufferCap.
expectedValues: values[2:], // []int{5, 4, 3},
},
}
Expand All @@ -184,7 +185,7 @@ func TestReplayObservable_Last_Full_ReplayBuffer(t *testing.T) {
var ctx = context.Background()

replayObsvbl, publishCh :=
channel.NewReplayObservable[int](ctx, test.replayBufferSize)
channel.NewReplayObservable[int](ctx, test.replayBufferCap)

for _, value := range values {
publishCh <- value
Expand All @@ -199,8 +200,8 @@ func TestReplayObservable_Last_Full_ReplayBuffer(t *testing.T) {

func TestReplayObservable_Last_Blocks_And_Times_Out(t *testing.T) {
var (
replayBufferSize = 5
lastN = 5
replayBufferCap = 5
lastN = 5
// splitIdx is the index at which this test splits the set of values.
// The two groups of values are published at different points in the
// test to test the behavior of Last under different conditions.
Expand All @@ -209,7 +210,7 @@ func TestReplayObservable_Last_Blocks_And_Times_Out(t *testing.T) {
ctx = context.Background()
)

replayObsvbl, publishCh := channel.NewReplayObservable[int](ctx, replayBufferSize)
replayObsvbl, publishCh := channel.NewReplayObservable[int](ctx, replayBufferCap)

// getLastValues is a helper function which returns a channel that will
// receive the result of a call to Last, the method under test.
Expand Down Expand Up @@ -300,3 +301,78 @@ func TestReplayObservable_Last_Blocks_And_Times_Out(t *testing.T) {
require.ElementsMatch(t, []int{5, 4, 3, 2}, replayObsvbl.Last(ctx, 4))
require.ElementsMatch(t, []int{5, 4, 3, 2, 1}, replayObsvbl.Last(ctx, 5))
}

func TestReplayObservable_SubscribeFromLatestBufferedOffset(t *testing.T) {
receiveTimeout := 100 * time.Millisecond
values := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}

tests := []struct {
name string
replayBufferCap int
endOffset int
expectedValues []int
}{
{
name: "endOffset = replayBufferCap",
replayBufferCap: 8,
endOffset: 8,
expectedValues: values[2:], // []int{2, 3, 4, 5, ..., 9},
},
{
name: "endOffset < replayBufferCap",
replayBufferCap: 10,
endOffset: 2,
expectedValues: values[8:], // []int{8, 9},
},
{
name: "endOffset > replayBufferCap",
replayBufferCap: 8,
endOffset: 10,
expectedValues: values[2:],
},
{
name: "replayBufferCap > endOffset > numBufferedValues ",
replayBufferCap: 20,
endOffset: 15,
expectedValues: values,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var ctx = context.Background()

replayObsvbl, publishCh :=
channel.NewReplayObservable[int](ctx, test.replayBufferCap)

for _, value := range values {
publishCh <- value
time.Sleep(time.Millisecond)
}

observer := replayObsvbl.SubscribeFromLatestBufferedOffset(ctx, test.endOffset)
// Assumes all values will be received within receiveTimeout.
actualValues := accumulateValues(observer, receiveTimeout)
require.EqualValues(t, test.expectedValues, actualValues)
})
}
}

func accumulateValues[V any](
observer observable.Observer[V],
timeout time.Duration,
) (values []V) {
for {
select {
case value, ok := <-observer.Ch():
if !ok {
return
}

values = append(values, value)
continue
case <-time.After(timeout):
return
}
}
}
13 changes: 13 additions & 0 deletions pkg/observable/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,22 @@ import "context"
// to new observers, before publishing new values to observers.
type ReplayObservable[V any] interface {
Observable[V]
// SubscribeFromLatestBufferedOffset returns an observer which is initially notified of
// values in the replay buffer, starting from the latest buffered value at index 'offset'.
//
// After this range of the replay buffer is notified, the observer continues to be notified,
// in real-time, when the publishCh channel receives a new value.
//
// If offset is greater than replayBufferCap or the number of elements it currently contains,
// the observer is notified of all elements in the replayBuffer, starting from the beginning.
//
// Passing 0 for offset is equivalent to calling Subscribe() on a non-replay observable.
SubscribeFromLatestBufferedOffset(ctx context.Context, offset int) Observer[V]
// Last synchronously returns the last n values from the replay buffer with
// LIFO ordering
Last(ctx context.Context, n int) []V
// GetReplayBufferSize returns the number of elements currently in the replay buffer.
GetReplayBufferSize() int
}

// Observable is a generic interface that allows multiple subscribers to be
Expand Down
11 changes: 10 additions & 1 deletion pkg/relayer/session/claim.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,16 @@ func (rs *relayerSessionsManager) waitForEarliestCreateClaimsHeight(
// The block that'll be used as a source of entropy for which branch(es) to
// prove should be deterministic and use on-chain governance params.
claimsWindowOpenBlock := rs.waitForBlock(ctx, claimWindowOpenHeight)

// TODO_MAINNET: If a relayminer is cold-started with persisted but unclaimed ("late")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to tend to this right now, but don't we have a type for it that either listens on or receives block events that can be reused here? Would #PUC to reference it.

Copy link
Contributor Author

@bryanchriswhite bryanchriswhite Jul 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I'm following you here. The BlockClient is being used to listen for events but in the scenario described here, the block client would be started after the event for the block in question was already emitted (i.e., it would never see that block event). In this case we need to use the BlockQueryClient.

// sessions, the claimsWindowOpenBlock will never be observed. In this case, we should
// use a block query client to populate the block client replay observable at the time
// of block client construction. This check and failure branch can be removed once this
// is implemented.
if claimsWindowOpenBlock == nil {
logger.Warn().Msg("failed to observe earliest claim commit height offset seed block height")
failedCreateClaimsSessionsCh <- sessionTrees
return nil
}
claimsFlushedCh := make(chan []relayer.SessionTree)
defer close(claimsFlushedCh)
go rs.goCreateClaimRoots(
Expand Down
Loading
Loading