From 06f8040e0c4db51ed8b97679d8f356262229f53a Mon Sep 17 00:00:00 2001 From: harry <53987565+h5law@users.noreply.github.com> Date: Sat, 23 Dec 2023 02:35:18 +0000 Subject: [PATCH] [EventsReplayClient] Fix Replay Client Bugs (#267) --- docs/static/openapi.yml | 28 +++- pkg/appgateserver/cmd/cmd.go | 4 +- pkg/client/block/client.go | 2 - pkg/client/block/client_integration_test.go | 4 +- pkg/client/block/client_test.go | 3 +- pkg/client/delegation/client.go | 2 - .../delegation/client_integration_test.go | 16 +- pkg/client/delegation/client_test.go | 3 +- pkg/client/delegation/godoc.go | 2 +- pkg/client/events/errors.go | 1 + pkg/client/events/query_client.go | 5 +- pkg/client/events/replay_client.go | 147 ++++++++++++------ .../events/replay_client_example_test.go | 1 - .../events/replay_client_integration_test.go | 143 +++++++++++++++++ pkg/client/interface.go | 4 +- pkg/client/tx/client_integration_test.go | 9 +- pkg/deps/config/suppliers.go | 17 +- pkg/observable/channel/replay.go | 6 +- pkg/relayer/cmd/cmd.go | 2 +- pkg/retry/retry.go | 2 +- pkg/sdk/deps_builder.go | 2 +- testutil/testclient/testblock/client.go | 3 +- testutil/testclient/testdelegation/client.go | 3 +- .../testclient/testeventsquery/connection.go | 66 ++++++++ testutil/testclient/testtx/client.go | 2 +- 25 files changed, 371 insertions(+), 106 deletions(-) create mode 100644 pkg/client/events/replay_client_integration_test.go diff --git a/docs/static/openapi.yml b/docs/static/openapi.yml index d1e14ae06..7f1675c88 100644 --- a/docs/static/openapi.yml +++ b/docs/static/openapi.yml @@ -47176,7 +47176,7 @@ paths: service: title: >- The Service for which the application is - configured for + configured type: object properties: id: @@ -47243,7 +47243,7 @@ paths: service: title: >- The Service for which the supplier is - configured for + configured type: object properties: id: @@ -47931,12 +47931,14 @@ paths: - GRPC - WEBSOCKET - JSON_RPC + - REST default: UNKNOWN_RPC description: |- - UNKNOWN_RPC: 
Undefined RPC type - GRPC: gRPC - WEBSOCKET: WebSocket - JSON_RPC: JSON-RPC + - REST: REST configs: type: array items: @@ -48080,12 +48082,14 @@ paths: - GRPC - WEBSOCKET - JSON_RPC + - REST default: UNKNOWN_RPC description: |- - UNKNOWN_RPC: Undefined RPC type - GRPC: gRPC - WEBSOCKET: WebSocket - JSON_RPC: JSON-RPC + - REST: REST configs: type: array items: @@ -77536,7 +77540,7 @@ definitions: type: object properties: service: - title: The Service for which the application is configured for + title: The Service for which the application is configured type: object properties: id: @@ -77600,7 +77604,7 @@ definitions: type: object properties: service: - title: The Service for which the supplier is configured for + title: The Service for which the supplier is configured type: object properties: id: @@ -77790,7 +77794,7 @@ definitions: type: object properties: service: - title: The Service for which the application is configured for + title: The Service for which the application is configured type: object properties: id: @@ -77852,7 +77856,7 @@ definitions: type: object properties: service: - title: The Service for which the supplier is configured for + title: The Service for which the supplier is configured type: object properties: id: @@ -78032,6 +78036,7 @@ definitions: - GRPC - WEBSOCKET - JSON_RPC + - REST default: UNKNOWN_RPC description: |- - UNKNOWN_RPC: Undefined RPC type @@ -78066,7 +78071,7 @@ definitions: type: object properties: service: - title: The Service for which the supplier is configured for + title: The Service for which the supplier is configured type: object properties: id: @@ -78099,6 +78104,7 @@ definitions: - GRPC - WEBSOCKET - JSON_RPC + - REST default: UNKNOWN_RPC description: |- - UNKNOWN_RPC: Undefined RPC type @@ -78156,6 +78162,7 @@ definitions: - GRPC - WEBSOCKET - JSON_RPC + - REST default: UNKNOWN_RPC description: |- - UNKNOWN_RPC: Undefined RPC type @@ -78195,7 +78202,7 @@ definitions: type: object properties: service: - title: The 
Service for which the supplier is configured for + title: The Service for which the supplier is configured type: object properties: id: @@ -78227,6 +78234,7 @@ definitions: - GRPC - WEBSOCKET - JSON_RPC + - REST default: UNKNOWN_RPC description: |- - UNKNOWN_RPC: Undefined RPC type @@ -78481,12 +78489,14 @@ definitions: - GRPC - WEBSOCKET - JSON_RPC + - REST default: UNKNOWN_RPC description: |- - UNKNOWN_RPC: Undefined RPC type - GRPC: gRPC - WEBSOCKET: WebSocket - JSON_RPC: JSON-RPC + - REST: REST configs: type: array items: @@ -78661,12 +78671,14 @@ definitions: - GRPC - WEBSOCKET - JSON_RPC + - REST default: UNKNOWN_RPC description: |- - UNKNOWN_RPC: Undefined RPC type - GRPC: gRPC - WEBSOCKET: WebSocket - JSON_RPC: JSON-RPC + - REST: REST configs: type: array items: diff --git a/pkg/appgateserver/cmd/cmd.go b/pkg/appgateserver/cmd/cmd.go index 4810bb67a..657419e46 100644 --- a/pkg/appgateserver/cmd/cmd.go +++ b/pkg/appgateserver/cmd/cmd.go @@ -158,13 +158,13 @@ func setupAppGateServerDependencies( supplierFuncs := []config.SupplierFn{ config.NewSupplyLoggerFromCtx(ctx), config.NewSupplyEventsQueryClientFn(queryNodeURL.Host), // leaf - config.NewSupplyBlockClientFn(queryNodeURL.Host), // leaf + config.NewSupplyBlockClientFn(), // leaf config.NewSupplyQueryClientContextFn(queryNodeURL.String()), // leaf config.NewSupplyAccountQuerierFn(), // leaf config.NewSupplyApplicationQuerierFn(), // leaf config.NewSupplySessionQuerierFn(), // leaf config.NewSupplyRingCacheFn(), - config.NewSupplyPOKTRollSDKFn(queryNodeURL, appGateConfig.SigningKey), + config.NewSupplyPOKTRollSDKFn(appGateConfig.SigningKey), } return config.SupplyConfig(ctx, cmd, supplierFuncs) diff --git a/pkg/client/block/client.go b/pkg/client/block/client.go index cb04bdc07..3a462f6c4 100644 --- a/pkg/client/block/client.go +++ b/pkg/client/block/client.go @@ -35,7 +35,6 @@ const ( func NewBlockClient( ctx context.Context, deps depinject.Config, - cometWebsocketURL string, ) (client.BlockClient, error) { 
client, err := events.NewEventsReplayClient[ client.Block, @@ -43,7 +42,6 @@ func NewBlockClient( ]( ctx, deps, - cometWebsocketURL, committedBlocksQuery, newCometBlockEventFactoryFn(), defaultBlocksReplayLimit, diff --git a/pkg/client/block/client_integration_test.go b/pkg/client/block/client_integration_test.go index 324b032a5..45035ece2 100644 --- a/pkg/client/block/client_integration_test.go +++ b/pkg/client/block/client_integration_test.go @@ -1,5 +1,3 @@ -//go:build integration - package block_test import ( @@ -19,6 +17,7 @@ import ( const blockIntegrationSubTimeout = 5 * time.Second func TestBlockClient_LastNBlocks(t *testing.T) { + t.Skip("TODO(@h5law): Figure out how to subscribe to events on the simulated localnet") ctx := context.Background() blockClient := testblock.NewLocalnetClient(ctx, t) @@ -29,6 +28,7 @@ func TestBlockClient_LastNBlocks(t *testing.T) { } func TestBlockClient_BlocksObservable(t *testing.T) { + t.Skip("TODO(@h5law): Figure out how to subscribe to events on the simulated localnet") ctx := context.Background() blockClient := testblock.NewLocalnetClient(ctx, t) diff --git a/pkg/client/block/client_test.go b/pkg/client/block/client_test.go index 061c42f26..e44c8f32f 100644 --- a/pkg/client/block/client_test.go +++ b/pkg/client/block/client_test.go @@ -12,7 +12,6 @@ import ( "github.com/pokt-network/poktroll/pkg/client" "github.com/pokt-network/poktroll/pkg/client/block" - "github.com/pokt-network/poktroll/testutil/testclient" "github.com/pokt-network/poktroll/testutil/testclient/testeventsquery" ) @@ -53,7 +52,7 @@ func TestBlockClient(t *testing.T) { deps := depinject.Supply(eventsQueryClient) // Set up block client. 
- blockClient, err := block.NewBlockClient(ctx, deps, testclient.CometLocalWebsocketURL) + blockClient, err := block.NewBlockClient(ctx, deps) require.NoError(t, err) require.NotNil(t, blockClient) diff --git a/pkg/client/delegation/client.go b/pkg/client/delegation/client.go index 883969f6d..e4baae743 100644 --- a/pkg/client/delegation/client.go +++ b/pkg/client/delegation/client.go @@ -36,7 +36,6 @@ const ( func NewDelegationClient( ctx context.Context, deps depinject.Config, - cometWebsocketURL string, ) (client.DelegationClient, error) { client, err := events.NewEventsReplayClient[ client.Redelegation, @@ -44,7 +43,6 @@ func NewDelegationClient( ]( ctx, deps, - cometWebsocketURL, delegationEventQuery, newRedelegationEventFactoryFn(), defaultRedelegationsReplayLimit, diff --git a/pkg/client/delegation/client_integration_test.go b/pkg/client/delegation/client_integration_test.go index e52922442..1ea3cbaec 100644 --- a/pkg/client/delegation/client_integration_test.go +++ b/pkg/client/delegation/client_integration_test.go @@ -1,5 +1,3 @@ -//go:build integration - package delegation_test // TODO(@h5law): Figure out how to use real components of the localnet @@ -8,7 +6,7 @@ package delegation_test // - Delegate to the gateway // - Undelegate from the gateway // Currently this test doesn't work, because (I think) it is using a mock -// keeper etc and this isnt actually interacting with the localnet where +// keeper etc and this isn't actually interacting with the localnet where // the DelegationClient is listening for events from. import ( @@ -34,10 +32,10 @@ const ( ) // TODO_UPNEXT(@h5law): Figure out the correct way to subscribe to events on the -// simulated localnet. Currently this test doesn't work. Although the block client -// subscribes it doesn't receive any events. +// simulated localnet. Currently this test doesn't work. Although the delegation +// client subscribes it doesn't receive any events. 
func TestDelegationClient_RedelegationsObservables(t *testing.T) { - t.SkipNow() + t.Skip("TODO(@h5law): Figure out how to subscribe to events on the simulated localnet") // Create the network with 2 applications and 1 gateway net, appAddresses, gatewayAddr := createNetworkWithApplicationsAndGateways(t) ctx, cancel := context.WithCancel(context.Background()) @@ -46,7 +44,7 @@ func TestDelegationClient_RedelegationsObservables(t *testing.T) { // Create the delegation client evtQueryClient := events.NewEventsQueryClient("ws://localhost:26657/websocket") deps := depinject.Supply(evtQueryClient) - delegationClient, err := delegation.NewDelegationClient(ctx, deps, "ws://localhost:26657/websocket") + delegationClient, err := delegation.NewDelegationClient(ctx, deps) require.NoError(t, err) require.NotNil(t, delegationClient) t.Cleanup(func() { @@ -74,7 +72,7 @@ func TestDelegationClient_RedelegationsObservables(t *testing.T) { // of the Redelegation event alternates between app1 and app2 if previousRedelegation != nil { require.NotEqual(t, previousRedelegation.GetAppAddress(), change.GetAppAddress()) - if previousRedelegation.AppAddress() == appAddresses[0] { + if previousRedelegation.GetAppAddress() == appAddresses[0] { require.Equal(t, appAddresses[1], change.GetAppAddress()) } else { require.Equal(t, appAddresses[0], change.GetAppAddress()) @@ -129,7 +127,7 @@ func TestDelegationClient_RedelegationsObservables(t *testing.T) { } // createNetworkWithApplicationsAndGateways creates a network with 2 applications -// and 1 gateway. It returns the network with all accoutns initialized via a +// and 1 gateway. It returns the network with all accounts initialized via a // transaction from the first validator. 
func createNetworkWithApplicationsAndGateways( t *testing.T, diff --git a/pkg/client/delegation/client_test.go b/pkg/client/delegation/client_test.go index 8fd13d4a3..ccb6beacd 100644 --- a/pkg/client/delegation/client_test.go +++ b/pkg/client/delegation/client_test.go @@ -12,7 +12,6 @@ import ( "github.com/pokt-network/poktroll/pkg/client" "github.com/pokt-network/poktroll/pkg/client/delegation" "github.com/pokt-network/poktroll/testutil/sample" - "github.com/pokt-network/poktroll/testutil/testclient" "github.com/pokt-network/poktroll/testutil/testclient/testeventsquery" apptypes "github.com/pokt-network/poktroll/x/application/types" ) @@ -47,7 +46,7 @@ func TestDelegationClient(t *testing.T) { // Set up delegation client. // NB: the URL passed to `NewDelegationClient` is irrelevant here because `eventsQueryClient` is a mock. - delegationClient, err := delegation.NewDelegationClient(ctx, deps, testclient.CometLocalWebsocketURL) + delegationClient, err := delegation.NewDelegationClient(ctx, deps) require.NoError(t, err) require.NotNil(t, delegationClient) diff --git a/pkg/client/delegation/godoc.go b/pkg/client/delegation/godoc.go index 76fad49ba..1d5b00139 100644 --- a/pkg/client/delegation/godoc.go +++ b/pkg/client/delegation/godoc.go @@ -1,4 +1,4 @@ -// Package delegation contains a light wrapper of the EventsReplayClient[DeelgateeChange] +// Package delegation contains a light wrapper of the EventsReplayClient[Redelegation] // generic which listens for redelegation events on chain and emits them // through a ReplayObservable. This enables consumers to listen for on-chain // application redelegation events and react to them asynchronously. 
diff --git a/pkg/client/events/errors.go b/pkg/client/events/errors.go index 3f6afaf58..838b9dbec 100644 --- a/pkg/client/events/errors.go +++ b/pkg/client/events/errors.go @@ -11,4 +11,5 @@ var ( ErrEventsConnClosed = sdkerrors.Register(codespace, 2, "connection closed") ErrEventsSubscribe = sdkerrors.Register(codespace, 3, "failed to subscribe to events") ErrEventsUnmarshalEvent = sdkerrors.Register(codespace, 4, "failed to unmarshal event bytes") + ErrEventsConsClosed = sdkerrors.Register(codespace, 5, "eventsqueryclient connection closed") ) diff --git a/pkg/client/events/query_client.go b/pkg/client/events/query_client.go index 41cafe98f..5524f52dc 100644 --- a/pkg/client/events/query_client.go +++ b/pkg/client/events/query_client.go @@ -227,6 +227,7 @@ func (eqc *eventsQueryClient) goPublishEventsBz( } eqc.close() + return } @@ -244,8 +245,8 @@ func (eqc *eventsQueryClient) goUnsubscribeOnDone( // Wait for the context to be done. <-ctx.Done() // Only close the eventsBytes for the given query. - eqc.eventsBytesAndConnsMu.RLock() - defer eqc.eventsBytesAndConnsMu.RUnlock() + eqc.eventsBytesAndConnsMu.Lock() + defer eqc.eventsBytesAndConnsMu.Unlock() if eventsBzConn, ok := eqc.eventsBytesAndConns[query]; ok { // Unsubscribe all observers of the given query's eventsBzConn's observable diff --git a/pkg/client/events/replay_client.go b/pkg/client/events/replay_client.go index ac3242056..c196f9473 100644 --- a/pkg/client/events/replay_client.go +++ b/pkg/client/events/replay_client.go @@ -20,7 +20,7 @@ const ( eventsBytesRetryDelay = time.Second // eventsBytesRetryLimit is the maximum number of times to attempt to // re-establish the events query bytes subscription when the events bytes - // observable returns an error. + // observable returns an error or closes. 
eventsBytesRetryLimit = 10 eventsBytesRetryResetTimeout = 10 * time.Second // replayObsCacheBufferSize is the replay buffer size of the @@ -44,10 +44,7 @@ type NewEventsFn[T any] func([]byte) (T, error) // replayClient implements the EventsReplayClient interface for a generic type T, // and replay observable for type T. -type replayClient[T any, U observable.ReplayObservable[T]] struct { - // endpointURL is the URL of RPC endpoint which eventsClient subscription - // requests will be sent. - endpointURL string +type replayClient[T any, R observable.ReplayObservable[T]] struct { // queryString is the query string used to subscribe to events of the // desired type. // See: https://docs.cosmos.network/main/learn/advanced/events#subscribing-to-events @@ -61,18 +58,26 @@ type replayClient[T any, U observable.ReplayObservable[T]] struct { // message bytes into the type defined by the EventsReplayClient's generic type // parameter. eventDecoder NewEventsFn[T] - // replayObsCache is a replay observable with replay buffer size 1, - // which holds the "active latest observable" which is notified when - // new events are received by the events query client subscription - // created in goPublishEvents. This observable (and the one it emits) closes - // when the events bytes observable returns an error and is updated with a - // new "active" observable after a new events query subscription is created. - replayObsCache observable.ReplayObservable[U] + // replayObsBufferSize is the buffer size for the replay observable returned + // by EventsSequence, this can be any integer and it refers to the number of + // notifications the replay observable will hold in its buffer, that can be + // replayed to new observers. 
+ // NB: This is not the buffer size of the replayObsCache + replayObsBufferSize int + // replayObsCache is a replay observable with a buffer size of 1, which + // holds the "active latest replay observable" which is notified when new + // events are received by the events query client subscription created in + // goPublishEvents. This observable (and the one it emits) closes when the + // events bytes observable returns an error and is updated with a new + // "active" observable after a new events query subscription is created. + // TODO_REFACTOR(@h5law): Look into making this a regular observable as + // we no depend on it being replayable. + replayObsCache observable.ReplayObservable[R] // replayObsCachePublishCh is the publish channel for replayObsCache. // It's used to set and subsequently update replayObsCache the events replay // observable; // For example when the connection is re-established after erroring. - replayObsCachePublishCh chan<- U + replayObsCachePublishCh chan<- R } // NewEventsReplayClient creates a new EventsReplayClient from the given @@ -84,25 +89,28 @@ type replayClient[T any, U observable.ReplayObservable[T]] struct { // // Required dependencies: // - client.EventsQueryClient -func NewEventsReplayClient[T any, U observable.ReplayObservable[T]]( +func NewEventsReplayClient[T any, R observable.ReplayObservable[T]]( ctx context.Context, deps depinject.Config, - cometWebsocketURL string, queryString string, newEventFn NewEventsFn[T], replayObsBufferSize int, -) (client.EventsReplayClient[T, U], error) { +) (client.EventsReplayClient[T, R], error) { // Initialize the replay client - rClient := &replayClient[T, U]{ - endpointURL: cometWebsocketURL, - queryString: queryString, - eventDecoder: newEventFn, + rClient := &replayClient[T, R]{ + queryString: queryString, + eventDecoder: newEventFn, + replayObsBufferSize: replayObsBufferSize, } - replayObsCache, replayObsCachePublishCh := channel.NewReplayObservable[U]( + // TODO_REFACTOR(@h5law): Look 
into making this a regular observable as + // we no depend on it being replayable. + replayObsCache, replayObsCachePublishCh := channel.NewReplayObservable[R]( ctx, - replayObsBufferSize, + // Buffer size of 1 as the cache only needs to hold the latest + // active replay observable. + replayObsCacheBufferSize, ) - rClient.replayObsCache = observable.ReplayObservable[U](replayObsCache) + rClient.replayObsCache = observable.ReplayObservable[R](replayObsCache) rClient.replayObsCachePublishCh = replayObsCachePublishCh // Inject dependencies @@ -110,22 +118,56 @@ func NewEventsReplayClient[T any, U observable.ReplayObservable[T]]( return nil, err } - // Concurrently publish blocks to the observable emitted by latestObsvbls. + // Concurrently publish events to the observable emitted by replayObsCache. go rClient.goPublishEvents(ctx) return rClient, nil } -// EventsSequence returns a ReplayObservable, with a replay buffer size of 1, -// which is notified when new events are received by the encapsulated -// EventsQueryClient. +// EventsSequence returns a new ReplayObservable, with the buffer size provided +// during the EventsReplayClient construction, which is notified when new +// events are received by the encapsulated EventsQueryClient. func (rClient *replayClient[T, R]) EventsSequence(ctx context.Context) R { - // Get the active event replay observable from replayObsCache. Only the last - // element is useful as any prior elements are closed replay observables. - // Directly accessing the zeroth index here is safe because the call to Last - // is guaranteed to return a slice with at least 1 element. - replayObs := observable.ReplayObservable[R](rClient.replayObsCache) - return replayObs.Last(ctx, 1)[0] + // Create a new replay observable and publish channel for event type T with + // a buffer size matching that provided during the EventsReplayClient + // construction. 
+ eventTypeObs, replayEventTypeObsPublishCh := channel.NewReplayObservable[T]( + ctx, + rClient.replayObsBufferSize, + ) + + // Ensure that the subscribers of the returned eventTypeObs receive + // notifications from the latest open replay observable. + go rClient.goRemapEventsSequence(ctx, replayEventTypeObsPublishCh) + + // Return the event type observable. + return eventTypeObs.(R) +} + +// goRemapEventsSequence publishes events observed by the most recent cached +// events type replay observable to the given publishCh +func (rClient *replayClient[T, R]) goRemapEventsSequence(ctx context.Context, publishCh chan<- T) { + var prevEventTypeObs observable.ReplayObservable[T] + channel.ForEach[R]( + ctx, + rClient.replayObsCache, + func(ctx context.Context, eventTypeObs R) { + if prevEventTypeObs != nil { + // Just in case the assumption that all transport errors are + // persistent does not hold, unsubscribe from the previous + // event type observable in order to prevent unexpected and/or + // duplicate notifications on the obsersvable returned by this + // function. + prevEventTypeObs.UnsubscribeAll() + } else { + prevEventTypeObs = eventTypeObs + } + eventObserver := eventTypeObs.Subscribe(ctx) + for event := range eventObserver.Ch() { + publishCh <- event + } + }, + ) } // LastNEvents returns the last N typed events that have been received by the @@ -148,9 +190,9 @@ func (rClient *replayClient[T, R]) Close() { // This function is intended to be called in a goroutine. func (rClient *replayClient[T, R]) goPublishEvents(ctx context.Context) { // React to errors by getting a new events bytes observable, re-mapping it, - // and send it to latestObsvblsReplayPublishCh such that - // latestObsvbls.Last(ctx, 1) will return it. - publishErr := retry.OnError( + // and send it to replayObsCachePublishCh such that + // replayObsCache.Last(ctx, 1) will return it. 
+ publishError := retry.OnError( ctx, eventsBytesRetryLimit, eventsBytesRetryDelay, @@ -162,36 +204,49 @@ func (rClient *replayClient[T, R]) goPublishEvents(ctx context.Context) { // If we get here, the retry limit was reached and the retry loop exited. // Since this function runs in a goroutine, we can't return the error to the // caller. Instead, we panic. - if publishErr != nil { - panic(fmt.Errorf("EventsReplayClient[%T].goPublishEvents should never reach this spot: %w", *new(T), publishErr)) + if publishError != nil { + panic(fmt.Errorf("EventsReplayClient[%T].goPublishEvents should never reach this spot: %w", *new(T), publishError)) } } // retryPublishEventsFactory returns a function which is intended to be passed // to retry.OnError. The returned function pipes event bytes from the events -// query client, maps them to block events, and publishes them to the -// latestObsvbls replay observable. +// query client, maps them to typed events, and publishes them to the +// replayObsCache replay observable. func (rClient *replayClient[T, R]) retryPublishEventsFactory(ctx context.Context) func() chan error { return func() chan error { errCh := make(chan error, 1) - eventsBzObs, err := rClient.eventsClient.EventsBytes(ctx, rClient.queryString) + eventsBytesObs, err := rClient.eventsClient.EventsBytes(ctx, rClient.queryString) if err != nil { errCh <- err return errCh } // NB: must cast back to generic observable type to use with Map. - // client.BlocksObservable cannot be an alias due to gomock's lack of - // support for generic types. - eventsBz := observable.Observable[either.Either[[]byte]](eventsBzObs) + eventsBzObs := observable.Observable[either.Either[[]byte]](eventsBytesObs) typedObs := channel.MapReplay( ctx, replayObsCacheBufferSize, - eventsBz, + eventsBzObs, rClient.newMapEventsBytesToTFn(errCh), ) - // Initially set latestObsvbls and update if after retrying on error. + // Subscribe to the eventBzObs and block until the channel closes. 
+ // Then pass this as an error to force the retry.OnError to resubscribe. + go func() { + eventsBzObserver := eventsBzObs.Subscribe(ctx) + for range eventsBzObserver.Ch() { + // Wait for the channel to close. + continue + } + // UnsubscribeAll downstream observers, as the source observable has + // closed and will not emit any more values. + typedObs.UnsubscribeAll() + // Publish an error to the error channel to initiate a retry + errCh <- ErrEventsConsClosed + }() + + // Initially set replayObsCache and update if after retrying on error. rClient.replayObsCachePublishCh <- typedObs.(R) return errCh diff --git a/pkg/client/events/replay_client_example_test.go b/pkg/client/events/replay_client_example_test.go index b9e3c13bc..53322cb45 100644 --- a/pkg/client/events/replay_client_example_test.go +++ b/pkg/client/events/replay_client_example_test.go @@ -79,7 +79,6 @@ func ExampleNewEventsReplayClient() { ]( ctx, depConfig, - cometWebsocketURL, eventQueryString, eventTypeFactory(ctx), replayObsBufferSize, diff --git a/pkg/client/events/replay_client_integration_test.go b/pkg/client/events/replay_client_integration_test.go new file mode 100644 index 000000000..a54a0b61e --- /dev/null +++ b/pkg/client/events/replay_client_integration_test.go @@ -0,0 +1,143 @@ +package events_test + +import ( + "context" + "encoding/json" + "fmt" + "sync/atomic" + "testing" + "time" + + "cosmossdk.io/depinject" + "github.com/stretchr/testify/require" + + "github.com/pokt-network/poktroll/pkg/client/events" + "github.com/pokt-network/poktroll/pkg/observable" + "github.com/pokt-network/poktroll/pkg/observable/channel" + "github.com/pokt-network/poktroll/testutil/testclient/testeventsquery" +) + +// Create the generic event type and decoder for the replay client + +var _ messageEvent = (*tEvent)(nil) + +type messageEvent interface { + EventMessage() string +} + +type tEvent struct { + Message string `json:"message"` +} + +type messageEventReplayObs observable.ReplayObservable[messageEvent] 
+ +func (t *tEvent) EventMessage() string { + return t.Message +} + +func newDecodeEventMessageFn() events.NewEventsFn[messageEvent] { + return func(eventBz []byte) (messageEvent, error) { + t := new(tEvent) + if err := json.Unmarshal(eventBz, t); err != nil { + return nil, err + } + if t.Message == "" { + return nil, events.ErrEventsUnmarshalEvent + } + return t, nil + } +} + +// newMessageEventBz returns a new message event in JSON format +func newMessageEventBz(eventNum int32) []byte { + return []byte(fmt.Sprintf(`{"message":"message_%d"}`, eventNum)) +} + +func TestReplayClient_Remapping(t *testing.T) { + var ( + ctx = context.Background() + connClosed atomic.Bool + firstEventDelayed atomic.Bool + readEventCounter atomic.Int32 + eventsReceived atomic.Int32 + eventsToRecv = int32(10) + errCh = make(chan error, 1) + timeoutAfter = 3 * time.Second // 1 second delay on retry.OnError + ) + + // Setup the mock connection and dialer + connMock, dialerMock := testeventsquery.NewNTimesReconnectMockConnAndDialer(t, 2, &connClosed, &firstEventDelayed) + // Mock the connection receiving events + connMock.EXPECT().Receive(). + // Receive is called in the tightest loop possible (max speed limited + // by a goroutine) and as such the sleep's within are used to slow down + // the time between events to prevent unexpected behavior. As in this + // test environment, there are no "real" delays between "#Receive" calls + // (events being emitted) and as such the sleep's enable the publishing + // of notifications to observers to occur in a flake-free manner. + DoAndReturn(func() (any, error) { + // Simulate ErrConnClosed if connection is isClosed. + if connClosed.Load() { + return nil, events.ErrEventsConnClosed + } + + // Delay the event if needed, this is to allow for the events query + // client to subscribe and receive the first event. 
+ if !firstEventDelayed.CompareAndSwap(false, true) { + time.Sleep(50 * time.Millisecond) + } + + eventNum := readEventCounter.Add(1) - 1 + event := newMessageEventBz(eventNum) + if eventNum == 2 { + // Simulate the connection closing + connMock.Close() + } + + // Simulate IO delay between sequential events. + time.Sleep(50 * time.Microsecond) + + return event, nil + }). + MinTimes(int(eventsToRecv)) + + // Setup the events query client dependency + dialerOpt := events.WithDialer(dialerMock) + queryClient := events.NewEventsQueryClient("", dialerOpt) + deps := depinject.Supply(queryClient) + + // Create the replay client + replayClient, err := events.NewEventsReplayClient[messageEvent, messageEventReplayObs]( + ctx, + deps, + "", // subscription query string + newDecodeEventMessageFn(), + 100, // replay buffer size + ) + require.NoError(t, err) + + channel.ForEach( + ctx, + observable.Observable[messageEvent](replayClient.EventsSequence(ctx)), + func(ctx context.Context, event messageEvent) { + require.NotEmpty(t, event) + received := eventsReceived.Add(1) + if received >= eventsToRecv { + errCh <- nil + return + } + }, + ) + + select { + case err := <-errCh: + require.NoError(t, err) + eventsRecv := eventsReceived.Load() + require.Equalf(t, eventsToRecv, eventsRecv, "received %d events, want: %d", eventsReceived.Load(), eventsRecv) + case <-time.After(timeoutAfter): + t.Fatalf( + "timed out waiting for events subscription; expected %d messages, got %d", + eventsToRecv, eventsReceived.Load(), + ) + } +} diff --git a/pkg/client/interface.go b/pkg/client/interface.go index a857c14b2..0a2f2cef2 100644 --- a/pkg/client/interface.go +++ b/pkg/client/interface.go @@ -162,8 +162,8 @@ type DelegationClient interface { // RedelegationsSequence returns a Observable of Redelegations that // emits the latest redelegation events that have occurred on chain. 
RedelegationsSequence(context.Context) RedelegationReplayObservable - // LastNBlocks returns the latest N blocks that have been committed to - // the chain. + // LastNRedelegations returns the latest N redelegation events that have + // occurred on chain. LastNRedelegations(context.Context, int) []Redelegation // Close unsubscribes all observers of the committed block sequence // observable and closes the events query client. diff --git a/pkg/client/tx/client_integration_test.go b/pkg/client/tx/client_integration_test.go index 2716e9f64..67d4b71ef 100644 --- a/pkg/client/tx/client_integration_test.go +++ b/pkg/client/tx/client_integration_test.go @@ -10,20 +10,21 @@ import ( "github.com/cosmos/cosmos-sdk/types" "github.com/stretchr/testify/require" + "github.com/pokt-network/poktroll/pkg/client" "github.com/pokt-network/poktroll/pkg/client/tx" "github.com/pokt-network/poktroll/testutil/testclient/testblock" "github.com/pokt-network/poktroll/testutil/testclient/testeventsquery" "github.com/pokt-network/poktroll/testutil/testclient/testkeyring" "github.com/pokt-network/poktroll/testutil/testclient/testtx" - - "github.com/pokt-network/poktroll/pkg/client" apptypes "github.com/pokt-network/poktroll/x/application/types" ) func TestTxClient_SignAndBroadcast_Integration(t *testing.T) { - t.Skip("TODO_TECHDEBT: this test depends on some setup which is currently not implemented in this test: staked application and servicer with matching services") + t.Skip( + "TODO_TECHDEBT: this test depends on some setup which is currently not implemented in this test: staked application and servicer with matching services", + ) - var ctx = context.Background() + ctx := context.Background() keyring, signingKey := testkeyring.NewTestKeyringWithKey(t, testSigningKeyName) diff --git a/pkg/deps/config/suppliers.go b/pkg/deps/config/suppliers.go index d91dc2822..ca439cbe3 100644 --- a/pkg/deps/config/suppliers.go +++ b/pkg/deps/config/suppliers.go @@ -2,13 +2,14 @@ package config import ( 
"context" - "net/url" "cosmossdk.io/depinject" cosmosclient "github.com/cosmos/cosmos-sdk/client" cosmosflags "github.com/cosmos/cosmos-sdk/client/flags" cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types" grpc "github.com/cosmos/gogoproto/grpc" + "github.com/spf13/cobra" + "github.com/pokt-network/poktroll/pkg/client/block" "github.com/pokt-network/poktroll/pkg/client/events" "github.com/pokt-network/poktroll/pkg/client/query" @@ -17,7 +18,6 @@ import ( "github.com/pokt-network/poktroll/pkg/crypto/rings" "github.com/pokt-network/poktroll/pkg/polylog" "github.com/pokt-network/poktroll/pkg/sdk" - "github.com/spf13/cobra" ) // SupplierFn is a function that is used to supply a depinject config. @@ -77,18 +77,16 @@ func NewSupplyEventsQueryClientFn(queryHost string) SupplierFn { } // NewSupplyBlockClientFn returns a function which constructs a BlockClient -// instance with the given hostname, which is converted into a websocket URL, -// to listen for block events on-chain, and returns a new depinject.Config which -// is supplied with the given deps and the new BlockClient. -func NewSupplyBlockClientFn(queryHost string) SupplierFn { +// instance and returns a new depinject.Config which is supplied with the +// given deps and the new BlockClient. +func NewSupplyBlockClientFn() SupplierFn { return func( ctx context.Context, deps depinject.Config, _ *cobra.Command, ) (depinject.Config, error) { - // Convert the host to a websocket URL - pocketNodeWebsocketURL := sdk.HostToWebsocketURL(queryHost) - blockClient, err := block.NewBlockClient(ctx, deps, pocketNodeWebsocketURL) + // Requires a query client to be supplied to the deps + blockClient, err := block.NewBlockClient(ctx, deps) if err != nil { return nil, err } @@ -287,7 +285,6 @@ func NewSupplyRingCacheFn() SupplierFn { // POKTRollSDK instance with the required dependencies and returns a new // depinject.Config which is supplied with the given deps and the new POKTRollSDK. 
func NewSupplyPOKTRollSDKFn( - queryNodeURL *url.URL, signingKeyName string, ) SupplierFn { return func( diff --git a/pkg/observable/channel/replay.go b/pkg/observable/channel/replay.go index 3b7de8254..ef9096855 100644 --- a/pkg/observable/channel/replay.go +++ b/pkg/observable/channel/replay.go @@ -82,7 +82,7 @@ func (ro *replayObservable[V]) Last(ctx context.Context, n int) []V { // Use a temporary observer to accumulate replay values. // Subscribe will always start with the replay buffer, so we can safely - // leverage it here for syncrhonization (i.e. blocking until at least 1 + // leverage it here for synchronization (i.e. blocking until at least 1 // notification has been accumulated). This also eliminates the need for // locking and/or copying the replay buffer. tempObserver := ro.Subscribe(ctx) @@ -97,7 +97,7 @@ func (ro *replayObservable[V]) Last(ctx context.Context, n int) []V { Msg("requested replay buffer size is greater than replay buffer capacity; returning entire replay buffer") } - // accumulateReplayValues works concurrently and returns a context and cancelation + // accumulateReplayValues works concurrently and returns a context and cancellation // function for signaling completion. 
return accumulateReplayValues(tempObserver, n) } @@ -122,7 +122,7 @@ func (ro *replayObservable[V]) Subscribe(ctx context.Context) observable.Observe ro.observerManager.add(observer) - // caller can rely on context cancelation or call UnsubscribeAll() to unsubscribe + // caller can rely on context cancellation or call UnsubscribeAll() to unsubscribe // active observers if ctx != nil { // asynchronously wait for the context to be done and then unsubscribe diff --git a/pkg/relayer/cmd/cmd.go b/pkg/relayer/cmd/cmd.go index 1812bc5d7..3d37693d5 100644 --- a/pkg/relayer/cmd/cmd.go +++ b/pkg/relayer/cmd/cmd.go @@ -152,7 +152,7 @@ func setupRelayerDependencies( supplierFuncs := []config.SupplierFn{ config.NewSupplyLoggerFromCtx(ctx), config.NewSupplyEventsQueryClientFn(queryNodeURL.Host), // leaf - config.NewSupplyBlockClientFn(queryNodeURL.Host), // leaf + config.NewSupplyBlockClientFn(), // leaf config.NewSupplyQueryClientContextFn(queryNodeURL.String()), // leaf supplyMiner, // leaf config.NewSupplyTxClientContextFn(networkNodeURL.String()), // leaf diff --git a/pkg/retry/retry.go b/pkg/retry/retry.go index e25d6c0c9..bd06af941 100644 --- a/pkg/retry/retry.go +++ b/pkg/retry/retry.go @@ -71,7 +71,7 @@ func OnError( logger.Error(). Str("work_name", workName). Err(err). 
- Msg("on final retry") + Msgf("on retry: %d", retryCount) } } } diff --git a/pkg/sdk/deps_builder.go b/pkg/sdk/deps_builder.go index a8501967f..d9113e197 100644 --- a/pkg/sdk/deps_builder.go +++ b/pkg/sdk/deps_builder.go @@ -35,7 +35,7 @@ func (sdk *poktrollSDK) buildDeps( deps = depinject.Configs(deps, depinject.Supply(eventsQueryClient)) // Create and supply the block client that depends on the events query client - blockClient, err := block.NewBlockClient(ctx, deps, pocketNodeWebsocketURL) + blockClient, err := block.NewBlockClient(ctx, deps) if err != nil { return nil, err } diff --git a/testutil/testclient/testblock/client.go b/testutil/testclient/testblock/client.go index 9be028441..91c1641b5 100644 --- a/testutil/testclient/testblock/client.go +++ b/testutil/testclient/testblock/client.go @@ -13,7 +13,6 @@ import ( "github.com/pokt-network/poktroll/pkg/observable" "github.com/pokt-network/poktroll/pkg/observable/channel" "github.com/pokt-network/poktroll/testutil/mockclient" - "github.com/pokt-network/poktroll/testutil/testclient" "github.com/pokt-network/poktroll/testutil/testclient/testeventsquery" ) @@ -26,7 +25,7 @@ func NewLocalnetClient(ctx context.Context, t *testing.T) client.BlockClient { require.NotNil(t, queryClient) deps := depinject.Supply(queryClient) - bClient, err := block.NewBlockClient(ctx, deps, testclient.CometLocalWebsocketURL) + bClient, err := block.NewBlockClient(ctx, deps) require.NoError(t, err) return bClient diff --git a/testutil/testclient/testdelegation/client.go b/testutil/testclient/testdelegation/client.go index c70afc4e9..a055579df 100644 --- a/testutil/testclient/testdelegation/client.go +++ b/testutil/testclient/testdelegation/client.go @@ -13,7 +13,6 @@ import ( "github.com/pokt-network/poktroll/pkg/observable" "github.com/pokt-network/poktroll/pkg/observable/channel" "github.com/pokt-network/poktroll/testutil/mockclient" - "github.com/pokt-network/poktroll/testutil/testclient" 
"github.com/pokt-network/poktroll/testutil/testclient/testeventsquery" ) @@ -26,7 +25,7 @@ func NewLocalnetClient(ctx context.Context, t *testing.T) client.DelegationClien require.NotNil(t, queryClient) deps := depinject.Supply(queryClient) - dClient, err := delegation.NewDelegationClient(ctx, deps, testclient.CometLocalWebsocketURL) + dClient, err := delegation.NewDelegationClient(ctx, deps) require.NoError(t, err) return dClient diff --git a/testutil/testclient/testeventsquery/connection.go b/testutil/testclient/testeventsquery/connection.go index c8af4e6ad..e51b31a81 100644 --- a/testutil/testclient/testeventsquery/connection.go +++ b/testutil/testclient/testeventsquery/connection.go @@ -1,6 +1,7 @@ package testeventsquery import ( + "sync/atomic" "testing" "github.com/golang/mock/gomock" @@ -47,3 +48,68 @@ func NewOneTimeMockDialer( return dialerMock } + +// NewNTimesReconnectMockConnAndDialer returns a new mock connection and mock +// dialer that will return the mock connection when DialContext is called. The +// mock dialer will expect DialContext to be called any times. The connection +// mock will expect Close and Send to be called exactly N times. +func NewNTimesReconnectMockConnAndDialer( + t *testing.T, + n int, + connClosed *atomic.Bool, + delayEvent *atomic.Bool, +) (*mockclient.MockConnection, *mockclient.MockDialer) { + connMock := NewNTimesReconnectConnectionMock(t, n, connClosed, delayEvent) + dialerMock := NewAnyTimesMockDailer(t, connMock) + return connMock, dialerMock +} + +// NewNTimesReconnectConnectionMock returns a mock connection that will expect +// Close and Send to be called exactly N times. The connection mock will set the +// connClosed atomic to true when Close is called and false when Send is called. +// The connection mock will set the delayEvent atomic to false when Send is +// called. 
This is to allow the caller to subscribe to the first event emitted. +func NewNTimesReconnectConnectionMock( + t *testing.T, + n int, + connClosed *atomic.Bool, + delayEvent *atomic.Bool, +) *mockclient.MockConnection { + ctrl := gomock.NewController(t) + connMock := mockclient.NewMockConnection(ctrl) + // Expect the connection to be closed and the dialer to be re-established + connMock.EXPECT(). + Close(). + DoAndReturn(func() error { + connClosed.CompareAndSwap(false, true) + return nil + }). + Times(n) + // Expect the subscription to be re-established exactly n times + connMock.EXPECT(). + Send(gomock.Any()). + DoAndReturn(func(eventBz []byte) error { + if connClosed.Load() { + connClosed.CompareAndSwap(true, false) + } + delayEvent.CompareAndSwap(true, false) + return nil + }). + Times(n) + return connMock +} + +// NewAnyTimesMockDailer returns a mock dialer that will return the given +// connection mock when DialContext is called. The mock dialer will expect +// DialContext to be called any number of times. +func NewAnyTimesMockDailer( + t *testing.T, + connMock *mockclient.MockConnection, +) *mockclient.MockDialer { + ctrl := gomock.NewController(t) + dialerMock := mockclient.NewMockDialer(ctrl) + dialerMock.EXPECT().DialContext(gomock.Any(), gomock.Any()). + Return(connMock, nil). + AnyTimes() + return dialerMock +} diff --git a/testutil/testclient/testtx/client.go b/testutil/testclient/testtx/client.go index 4bc85c218..405c24a2e 100644 --- a/testutil/testclient/testtx/client.go +++ b/testutil/testclient/testtx/client.go @@ -68,7 +68,7 @@ func NewOneTimeSignAndBroadcastTxClient( ) *mockclient.MockTxClient { t.Helper() - var ctrl = gomock.NewController(t) + ctrl := gomock.NewController(t) txClient := mockclient.NewMockTxClient(ctrl) txClient.EXPECT().SignAndBroadcast(