Skip to content

Commit

Permalink
Merge branch 'main' into dk-heighliner
Browse files Browse the repository at this point in the history
  • Loading branch information
okdas authored May 8, 2024
2 parents e84a05e + f488165 commit f7ebc99
Show file tree
Hide file tree
Showing 18 changed files with 310 additions and 62 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ todo_count: ## Print a count of all the TODOs in the project

.PHONY: todo_this_commit
todo_this_commit: ## List all the TODOs needed to be done in this commit
grep --exclude-dir={.git,vendor,.vscode} --exclude=Makefile -r -e "TODO_IN_THIS_"
grep -n --exclude-dir={.git,vendor,.vscode,.idea} --exclude={Makefile,reviewdog.yml} -r -e "TODO_IN_THIS_"

####################
### Gateways ###
Expand Down
44 changes: 27 additions & 17 deletions pkg/client/block/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,41 +38,46 @@ const (
func NewBlockClient(
ctx context.Context,
deps depinject.Config,
) (client.BlockClient, error) {
ctx, close := context.WithCancel(ctx)
opts ...client.BlockClientOption,
) (_ client.BlockClient, err error) {
ctx, cancel := context.WithCancel(ctx)

// latestBlockPublishCh is the channel that notifies the latestBlockReplayObs of a
// new block, whether it comes from a direct query or an event subscription query.
latestBlockReplayObs, latestBlockPublishCh := channel.NewReplayObservable[client.Block](ctx, 10)
bClient := &blockReplayClient{
latestBlockReplayObs: latestBlockReplayObs,
close: cancel,
}

for _, opt := range opts {
opt(bClient)
}

eventsReplayClient, err := events.NewEventsReplayClient[client.Block](
bClient.eventsReplayClient, err = events.NewEventsReplayClient[client.Block](
ctx,
deps,
committedBlocksQuery,
UnmarshalNewBlock,
defaultBlocksReplayLimit,
events.WithConnRetryLimit[client.Block](bClient.connRetryLimit),
)
if err != nil {
close()
cancel()
return nil, err
}

// latestBlockPublishCh is the channel that notifies the latestBlockReplayObs of a
// new block, whether it comes from a direct query or an event subscription query.
latestBlockReplayObs, latestBlockPublishCh := channel.NewReplayObservable[client.Block](ctx, 10)
blockReplayClient := &blockReplayClient{
eventsReplayClient: eventsReplayClient,
latestBlockReplayObs: latestBlockReplayObs,
close: close,
}

if err := depinject.Inject(deps, &blockReplayClient.onStartQueryClient); err != nil {
if err := depinject.Inject(deps, &bClient.onStartQueryClient); err != nil {
return nil, err
}

blockReplayClient.asyncForwardBlockEvent(ctx, latestBlockPublishCh)
bClient.asyncForwardBlockEvent(ctx, latestBlockPublishCh)

if err := blockReplayClient.getInitialBlock(ctx, latestBlockPublishCh); err != nil {
if err := bClient.getInitialBlock(ctx, latestBlockPublishCh); err != nil {
return nil, err
}

return blockReplayClient, nil
return bClient, nil
}

// blockReplayClient is BlockClient implementation that combines a CometRPC client
Expand All @@ -99,6 +104,11 @@ type blockReplayClient struct {

// close is a function that cancels the context of the blockReplayClient.
close context.CancelFunc

// connRetryLimit is the number of times the underlying replay client
// should retry in the event that it encounters an error or its connection is interrupted.
// If connRetryLimit is < 0, it will retry indefinitely.
connRetryLimit int
}

// CommittedBlocksSequence returns a replay observable of new block events.
Expand Down
3 changes: 2 additions & 1 deletion pkg/client/block/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ func TestBlockClient(t *testing.T) {
Hash: expectedHash,
},
}, nil
})
}).
AnyTimes()

deps := depinject.Supply(eventsQueryClient, cometClientMock)

Expand Down
13 changes: 13 additions & 0 deletions pkg/client/block/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package block

import "github.com/pokt-network/poktroll/pkg/client"

// WithConnRetryLimit returns an option function which sets the number
// of times the underlying replay client should retry in the event that it encounters
// an error or its connection is interrupted.
// If connRetryLimit is < 0, it will retry indefinitely.
func WithConnRetryLimit(limit int) client.BlockClientOption {
return func(client client.BlockClient) {
client.(*blockReplayClient).connRetryLimit = limit
}
}
20 changes: 17 additions & 3 deletions pkg/client/delegation/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,27 @@ const (
func NewDelegationClient(
ctx context.Context,
deps depinject.Config,
) (client.DelegationClient, error) {
client, err := events.NewEventsReplayClient[client.Redelegation](
opts ...client.DelegationClientOption,
) (_ client.DelegationClient, err error) {
dClient := &delegationClient{}

for _, opt := range opts {
opt(dClient)
}

dClient.eventsReplayClient, err = events.NewEventsReplayClient[client.Redelegation](
ctx,
deps,
delegationEventQuery,
newRedelegationEventFactoryFn(),
defaultRedelegationsReplayLimit,
events.WithConnRetryLimit[client.Redelegation](dClient.connRetryLimit),
)
if err != nil {
return nil, err
}
return &delegationClient{eventsReplayClient: client}, nil

return dClient, nil
}

// delegationClient is a wrapper around an EventsReplayClient that implements
Expand All @@ -69,6 +78,11 @@ type delegationClient struct {
// These enable the EventsReplayClient to correctly map the raw event bytes
// to Redelegation objects and to correctly return a RedelegationReplayObservable
eventsReplayClient client.EventsReplayClient[client.Redelegation]

// connRetryLimit is the number of times the underlying replay client
// should retry in the event that it encounters an error or its connection is interrupted.
// If connRetryLimit is < 0, it will retry indefinitely.
connRetryLimit int
}

// RedelegationsSequence returns a replay observable of Redelgation events
Expand Down
13 changes: 13 additions & 0 deletions pkg/client/delegation/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package delegation

import "github.com/pokt-network/poktroll/pkg/client"

// WithConnRetryLimit returns an option function which sets the number
// of times the underlying replay client should retry in the event that it encounters
// an error or its connection is interrupted.
// If connRetryLimit is < 0, it will retry indefinitely.
func WithConnRetryLimit(limit int) client.DelegationClientOption {
return func(client client.DelegationClient) {
client.(*delegationClient).connRetryLimit = limit
}
}
15 changes: 15 additions & 0 deletions pkg/client/events/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,18 @@ func WithDialer(dialer client.Dialer) client.EventsQueryClientOption {
evtClient.(*eventsQueryClient).dialer = dialer
}
}

// WithConnRetryLimit returns an option function which sets the number
// of times the replay client should retry in the event that it encounters
// an error or its connection is interrupted.
// If connRetryLimit is < 0, it will retry indefinitely.
func WithConnRetryLimit[T any](limit int) client.EventsReplayClientOption[T] {
return func(client client.EventsReplayClient[T]) {
// Ignore the zero value because limit may be provided via a partially
// configured config struct (i.e. no retry limit set).
// The default will be used instead.
if limit != 0 {
client.(*replayClient[T]).connRetryLimit = limit
}
}
}
65 changes: 47 additions & 18 deletions pkg/client/events/replay_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,21 @@ import (
)

const (
// DefaultConnRetryLimit is used to indicate how many times the
// underlying replay client should attempt to retry if it encounters an error
// or its connection is interrupted.
//
// TODO_IMPROVE: this should be configurable but can be overridden at compile-time:
// go build -ldflags "-X github.com/pokt-network/poktroll/DefaultConnRetryLimit=value".
DefaultConnRetryLimit = 10

// eventsBytesRetryDelay is the delay between retry attempts when the events
// bytes observable returns an error.
eventsBytesRetryDelay = time.Second
// eventsBytesRetryLimit is the maximum number of times to attempt to
// re-establish the events query bytes subscription when the events bytes
// observable returns an error or closes.
// TODO_TECHDEBT: to make this a customizable parameter in the appgateserver and relayminer config files.
eventsBytesRetryLimit = 10
eventsBytesRetryResetTimeout = 10 * time.Second
// replayObsCacheBufferSize is the replay buffer size of the
Expand Down Expand Up @@ -81,6 +90,10 @@ type replayClient[T any] struct {
// replayClientCancelCtx is the function to cancel the context of the replay client.
// It is called when the replay client is closed.
replayClientCancelCtx func()
// connRetryLimit is the number of times the replay client should retry
// in the event that it encounters an error or its connection is interrupted.
// If connRetryLimit is < 0, it will retry indefinitely.
connRetryLimit int
}

// NewEventsReplayClient creates a new EventsReplayClient from the given
Expand All @@ -98,6 +111,7 @@ func NewEventsReplayClient[T any](
queryString string,
newEventFn NewEventsFn[T],
replayObsBufferSize int,
opts ...client.EventsReplayClientOption[T],
) (client.EventsReplayClient[T], error) {
ctx, cancel := context.WithCancel(ctx)

Expand All @@ -107,7 +121,13 @@ func NewEventsReplayClient[T any](
eventDecoder: newEventFn,
replayObsBufferSize: replayObsBufferSize,
replayClientCancelCtx: cancel,
connRetryLimit: DefaultConnRetryLimit,
}

for _, opt := range opts {
opt(rClient)
}

// TODO_REFACTOR(@h5law): Look into making this a regular observable as
// we may no longer depend on it being replayable.
replayObsCache, replayObsCachePublishCh := channel.NewReplayObservable[observable.ReplayObservable[T]](
Expand Down Expand Up @@ -189,26 +209,26 @@ func (rClient *replayClient[T]) Close() {
// goPublishEvents runs the work function returned by retryPublishEventsFactory,
// re-invoking it according to the arguments to retry.OnError when the events bytes
// observable returns an asynchronous error.
// This function is intended to be called in a goroutine.
func (rClient *replayClient[T]) goPublishEvents(ctx context.Context) {
// React to errors by getting a new events bytes observable, re-mapping it,
// and send it to replayObsCachePublishCh such that
// replayObsCache.Last(ctx, 1) will return it.
publishError := retry.OnError(
publishErr := retry.OnError(
ctx,
eventsBytesRetryLimit,
rClient.connRetryLimit,
eventsBytesRetryDelay,
eventsBytesRetryResetTimeout,
"goPublishEvents",
rClient.retryPublishEventsFactory(ctx),
)

// If we get here, the retry limit was reached and the retry loop exited.
// Since this function runs in a goroutine, we can't return the error to the
// caller. Instead, we panic.
if publishError != nil {
panic(fmt.Errorf("EventsReplayClient[%T].goPublishEvents should never reach this spot: %w", *new(T), publishError))
if publishErr != nil {
panic(fmt.Errorf("EventsReplayClient[%T].goPublishEvents should never reach this spot: %w", *new(T), publishErr))
}

return
}

// retryPublishEventsFactory returns a function which is intended to be passed
Expand All @@ -217,20 +237,24 @@ func (rClient *replayClient[T]) goPublishEvents(ctx context.Context) {
// replayObsCache replay observable.
func (rClient *replayClient[T]) retryPublishEventsFactory(ctx context.Context) func() chan error {
return func() chan error {
eventsBzCtx, cancelEventsBzObs := context.WithCancel(ctx)
errCh := make(chan error, 1)
eventsBytesObs, err := rClient.eventsClient.EventsBytes(ctx, rClient.queryString)

eventsBytesObs, err := rClient.eventsClient.EventsBytes(eventsBzCtx, rClient.queryString)
if err != nil {
// No need to cancel eventsBytesObs in the case of a synchronous error.
errCh <- err
return errCh
}

// NB: must cast back to generic observable type to use with Map.
eventsBzObs := observable.Observable[either.Either[[]byte]](eventsBytesObs)

typedObs := channel.MapReplay(
ctx,
eventsBzCtx,
replayObsCacheBufferSize,
eventsBzObs,
rClient.newMapEventsBytesToTFn(errCh),
rClient.newMapEventsBytesToTFn(errCh, cancelEventsBzObs),
)

// Subscribe to the eventBzObs and block until the channel closes.
Expand Down Expand Up @@ -269,12 +293,12 @@ func (rClient *replayClient[T]) retryPublishEventsFactory(ctx context.Context) f
// If deserialisation failed because the event bytes were for a different event
// type, this value is also skipped. If deserialisation failed for some other
// reason, this function panics.
func (rClient *replayClient[T]) newMapEventsBytesToTFn(errCh chan<- error) func(
context.Context,
either.Bytes,
) (T, bool) {
func (rClient *replayClient[T]) newMapEventsBytesToTFn(
errCh chan<- error,
cancelEventsBzObs context.CancelFunc,
) func(context.Context, either.Bytes) (T, bool) {
return func(
_ context.Context,
ctx context.Context,
eitherEventBz either.Bytes,
) (_ T, skip bool) {
eventBz, err := eitherEventBz.ValueOrError()
Expand All @@ -296,10 +320,15 @@ func (rClient *replayClient[T]) newMapEventsBytesToTFn(errCh chan<- error) func(
return *new(T), true
}

panic(fmt.Sprintf(
"unexpected error deserialising event: %v; eventBz: %s",
err, string(eventBz),
))
// Don't publish (skip) if there was some other kind of error,
// and send that error on the errCh.
errCh <- err

// The source observable may not necessarily close automatically in this case,
// cancel its context to ensure its closure and prevent a memory/goroutine leak.
cancelEventsBzObs()

return *new(T), true
}
return event, false
}
Expand Down
9 changes: 9 additions & 0 deletions pkg/client/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,15 @@ type TxClientOption func(TxClient)
// SupplierClientOption defines a function type that modifies the SupplierClient.
type SupplierClientOption func(SupplierClient)

// DelegationClientOption defines a function type that modifies the DelegationClient.
type DelegationClientOption func(DelegationClient)

// BlockClientOption defines a function type that modifies the BlockClient.
type BlockClientOption func(BlockClient)

// EventsReplayClientOption defines a function type that modifies the ReplayClient.
type EventsReplayClientOption[T any] func(EventsReplayClient[T])

// AccountQueryClient defines an interface that enables the querying of the
// on-chain account information
type AccountQueryClient interface {
Expand Down
6 changes: 6 additions & 0 deletions pkg/client/tx/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ type txClient struct {
// is used to ensure that transactions error channels receive and close in the event
// that they have not already by the given timeout height.
txTimeoutPool txTimeoutPool

// connRetryLimit is the number of times the underlying replay client
// should retry in the event that it encounters an error or its connection is interrupted.
// If connRetryLimit is < 0, it will retry indefinitely.
connRetryLimit int
}

type (
Expand Down Expand Up @@ -167,6 +172,7 @@ func NewTxClient(
eventQuery,
UnmarshalTxResult,
defaultTxReplayLimit,
events.WithConnRetryLimit[*abci.TxResult](txnClient.connRetryLimit),
)
if err != nil {
return nil, err
Expand Down
10 changes: 10 additions & 0 deletions pkg/client/tx/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,13 @@ func WithSigningKeyName(keyName string) client.TxClientOption {
client.(*txClient).signingKeyName = keyName
}
}

// WithConnRetryLimit returns an option function which sets the number
// of times the underlying replay client should retry in the event that it encounters
// an error or its connection is interrupted.
// If connRetryLimit is < 0, it will retry indefinitely.
func WithConnRetryLimit(limit int) client.TxClientOption {
return func(client client.TxClient) {
client.(*txClient).connRetryLimit = limit
}
}
Loading

0 comments on commit f7ebc99

Please sign in to comment.