Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Observables] chore: add ReplayObservable#SubscribeFromLatestBufferedOffset() #647

Merged
merged 13 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 25 additions & 3 deletions pkg/observable/channel/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,18 +122,40 @@ func (ro *replayObservable[V]) Last(ctx context.Context, n int) []V {
// It replays the values stored in the replay buffer in the order of their arrival
// before emitting new values.
func (ro *replayObservable[V]) Subscribe(ctx context.Context) observable.Observer[V] {
return ro.SubscribeFromLatestBufferedOffset(ctx, ro.replayBufferSize)
}

// SubscribeFromLatestBufferedOffset returns an observer which is initially notified of
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
// values in the replay buffer, starting from the latest buffered value index - offset.
// After this range of the replay buffer is notified, the observer continues to be notified,
// in real-time, when the publishCh channel receives a value. It also returns the current
// replayBufferSize. If offset is greater than replayBufferSize or the number of elements it
// currently contains, the observer is notified of all elements in the replayBuffer,
// starting from the beginning. Passing 0 for offset is equivalent to calling
// Subscribe() on a non-replay observable.
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
func (ro *replayObservable[V]) SubscribeFromLatestBufferedOffset(
ctx context.Context,
endOffset int,
) observable.Observer[V] {
obs, ch := NewObservable[V]()
ctx, cancel := context.WithCancel(ctx)

go func() {
// Replay the values stored in the buffer form the oldest to the newest.
ro.replayBufferMu.RLock()
for i := len(ro.replayBuffer) - 1; i >= 0; i-- {
defer ro.replayBufferMu.RUnlock()

// Ensure that the offset is within the bounds of the replay buffer.
if endOffset > len(ro.replayBuffer) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes it a bit confusing if endOffset is number of elements from the start or the end of the buffer.

Should we PUC that "it points to the last element in the replayBuffer"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's an offset from the "latest buffered" value. I thought the name and godoc comment would make it sufficiently clear. I'm not sure I understand your suggestion; what is pointing to the last element in the replayBuffer? And by last do you mean "latest" or largest index?

endOffset = len(ro.replayBuffer)
}

// Replay the values stored in the buffer form the oldest to the newest.
for i := endOffset - 1; i >= 0; i-- {
ch <- ro.replayBuffer[i]
}

bufferedValuesCh := ro.bufferingObsvbl.Subscribe(ctx).Ch()
ro.replayBufferMu.RUnlock()

// Since bufferingObsvbl emits all buffered values in one notification
// and the replay buffer has already been replayed, only the most recent
// value needs to be published
Expand Down
76 changes: 76 additions & 0 deletions pkg/observable/channel/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/pokt-network/poktroll/pkg/observable"
"github.com/pokt-network/poktroll/pkg/observable/channel"
"github.com/pokt-network/poktroll/testutil/testerrors"
)
Expand Down Expand Up @@ -300,3 +301,78 @@ func TestReplayObservable_Last_Blocks_And_Times_Out(t *testing.T) {
require.ElementsMatch(t, []int{5, 4, 3, 2}, replayObsvbl.Last(ctx, 4))
require.ElementsMatch(t, []int{5, 4, 3, 2, 1}, replayObsvbl.Last(ctx, 5))
}

func TestReplayObservable_SubscribeFromLatestBufferedOffset(t *testing.T) {
receiveTimeout := 100 * time.Millisecond
values := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}

tests := []struct {
name string
replayBufferSize int
endOffset int
expectedValues []int
}{
{
name: "endOffset = replayBufferSize",
replayBufferSize: 8,
endOffset: 8,
expectedValues: values[2:], // []int{2, 3, 4, 5, ...},
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
},
{
name: "endOffset < replayBufferSize",
replayBufferSize: 10,
endOffset: 2,
expectedValues: values[8:], // []int{8, 9},
},
{
name: "endOffset > replayBufferSize",
replayBufferSize: 8,
endOffset: 10,
expectedValues: values[2:],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why this doesn't start at values[len(values)-1:]?

Copy link
Contributor Author

@bryanchriswhite bryanchriswhite Jul 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's values[len(values)-replayBufferSize].

// Published values
[ 1 2 3 4 5 ]

// Replay buffer
[ 5 4 3 2 1 ]

// SubscribeFromEndOffset(100)
[ 1 2 3 4 5 ]

// SubscribeFromLatestBufferedOffset(5)
[ 1 2 3 4 5 ]

// SubscribeFromLatestBufferedOffset(3)
[ 3 4 5 ]

},
{
name: "replayBufferSize < eldOffset < numBufferedValues ",
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
replayBufferSize: 20,
endOffset: 15,
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
expectedValues: values,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var ctx = context.Background()

replayObsvbl, publishCh :=
channel.NewReplayObservable[int](ctx, test.replayBufferSize)

for _, value := range values {
publishCh <- value
time.Sleep(time.Millisecond)
}

observer := replayObsvbl.SubscribeFromLatestBufferedOffset(ctx, test.endOffset)
// Assumes all values will be received within receiveTimeout.
actualValues := accumulateValues(observer, receiveTimeout)
require.EqualValues(t, test.expectedValues, actualValues)
})
}
}

func accumulateValues[V any](
observer observable.Observer[V],
timeout time.Duration,
) (values []V) {
for {
select {
case value, ok := <-observer.Ch():
if !ok {
return
}

values = append(values, value)
continue
case <-time.After(timeout):
return
}
}
}
8 changes: 8 additions & 0 deletions pkg/observable/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@ import "context"
// to new observers, before publishing new values to observers.
type ReplayObservable[V any] interface {
Observable[V]
// SubscribeFromLatestBufferedOffset returns an observer which is initially notified of
// values in the replay buffer, starting from the latest buffered value index - offset.
// After this range of the replay buffer is notified, the observer continues to be notified,
// in real-time, when the publishCh channel receives a value. If offset is greater than
// replayBufferSize or the number of elements it currently contains, the observer is notified
// of all elements in the replayBuffer, starting from the beginning. Passing 0 for offset
// is equivalent to calling Subscribe() on a non-replay observable.
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
SubscribeFromLatestBufferedOffset(ctx context.Context, offset int) Observer[V]
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
// Last synchronously returns the last n values from the replay buffer with
// LIFO ordering
Last(ctx context.Context, n int) []V
Expand Down
Loading