From 4d10f32d03a6500d1b6e41ef7e4aa8ae1d66d924 Mon Sep 17 00:00:00 2001 From: harry <53987565+h5law@users.noreply.github.com> Date: Thu, 7 Dec 2023 21:57:38 +0000 Subject: [PATCH] [Off-Chain] Add `EventsReplayClient` Generic Event Listener (#220) --- Makefile | 2 +- docs/pkg/client/README.md | 21 +- docs/pkg/client/events.md | 396 ++++++++++++++++++ docs/pkg/client/events_query.md | 204 --------- e2e/tests/session_steps_test.go | 6 +- go.mod | 4 +- pkg/appgateserver/cmd/cmd.go | 2 +- pkg/appgateserver/session.go | 2 +- pkg/client/block/block.go | 29 +- pkg/client/block/client.go | 239 +++-------- pkg/client/block/client_integration_test.go | 4 +- pkg/client/block/client_test.go | 6 +- pkg/client/block/errors.go | 8 - pkg/client/block/godoc.go | 5 + pkg/client/delegation/client.go | 84 ++++ .../delegation/client_integration_test.go | 173 ++++++++ pkg/client/delegation/client_test.go | 102 +++++ pkg/client/delegation/godoc.go | 5 + pkg/client/delegation/redelegation.go | 48 +++ pkg/client/events/errors.go | 14 + pkg/client/events/godoc.go | 12 + .../{events_query => events}/options.go | 2 +- .../client.go => events/query_client.go} | 8 +- .../query_client_integration_test.go} | 2 +- .../query_client_test.go} | 47 +-- pkg/client/events/replay_client.go | 249 +++++++++++ .../events/replay_client_example_test.go | 108 +++++ .../websocket/connection.go | 2 +- .../websocket/dialer.go | 0 pkg/client/events/websocket/errors.go | 11 + pkg/client/events/websocket/godoc.go | 3 + pkg/client/events_query/errors.go | 11 - pkg/client/events_query/websocket/errors.go | 8 - pkg/client/interface.go | 82 +++- pkg/client/tx/client.go | 6 +- pkg/client/tx/client_test.go | 3 +- pkg/deps/config/suppliers.go | 62 +-- pkg/partials/payloads/jsonrpc.go | 2 +- pkg/partials/payloads/rest.go | 2 +- pkg/relayer/cmd/cmd.go | 3 +- pkg/relayer/proxy/proxy.go | 2 +- pkg/relayer/proxy/relay_verifier.go | 2 +- pkg/relayer/session/claim.go | 8 +- pkg/relayer/session/proof.go | 2 +- pkg/signer/ring_signer.go | 5 +- proto/pocket/application/event.proto | 14 + proto/pocket/service/relay.proto | 6 +- testutil/keeper/application.go | 26 +- testutil/network/network.go | 86 +++- testutil/testclient/testblock/client.go | 27 +- testutil/testclient/testdelegation/client.go | 135 ++++++ testutil/testclient/testdelegation/godoc.go | 5 + testutil/testclient/testeventsquery/client.go | 7 +- testutil/testproxy/relayerproxy.go | 12 +- .../keeper/msg_server_delegate_to_gateway.go | 10 +- .../msg_server_delegate_to_gateway_test.go | 50 ++- .../msg_server_undelegate_from_gateway.go | 11 +- ...msg_server_undelegate_from_gateway_test.go | 55 ++- .../types/message_delegate_to_gateway.go | 7 + .../types/message_undelegate_from_gateway.go | 7 + x/supplier/keeper/query_supplier.go | 1 - x/supplier/keeper/supplier.go | 2 - 62 files changed, 1824 insertions(+), 633 deletions(-) create mode 100644 docs/pkg/client/events.md delete mode 100644 docs/pkg/client/events_query.md delete mode 100644 pkg/client/block/errors.go create mode 100644 pkg/client/block/godoc.go create mode 100644 pkg/client/delegation/client.go create mode 100644 pkg/client/delegation/client_integration_test.go create mode 100644 pkg/client/delegation/client_test.go create mode 100644 pkg/client/delegation/godoc.go create mode 100644 pkg/client/delegation/redelegation.go create mode 100644 pkg/client/events/errors.go create mode 100644 pkg/client/events/godoc.go rename pkg/client/{events_query => events}/options.go (95%) rename pkg/client/{events_query/client.go => events/query_client.go} (98%) rename pkg/client/{events_query/client_integration_test.go => events/query_client_integration_test.go} (98%) rename pkg/client/{events_query/client_test.go => events/query_client_test.go} (91%) create mode 100644 pkg/client/events/replay_client.go create mode 100644 pkg/client/events/replay_client_example_test.go rename pkg/client/{events_query => events}/websocket/connection.go (94%) rename pkg/client/{events_query => events}/websocket/dialer.go (100%) create mode 100644 pkg/client/events/websocket/errors.go create mode 100644 pkg/client/events/websocket/godoc.go delete mode 100644 pkg/client/events_query/errors.go delete mode 100644 pkg/client/events_query/websocket/errors.go create mode 100644 proto/pocket/application/event.proto create mode 100644 testutil/testclient/testdelegation/client.go create mode 100644 testutil/testclient/testdelegation/godoc.go diff --git a/Makefile b/Makefile index 89cb5cd59..bb990cd19 100644 --- a/Makefile +++ b/Makefile @@ -520,7 +520,7 @@ trigger_ci: ## Trigger the CI pipeline by submitting an empty commit; See https: .PHONY: go_docs go_docs: check_godoc ## Generate documentation for the project - echo "Visit http://localhost:6060/pkg/pocket/" + echo "Visit http://localhost:6060/pkg/github.com/pokt-network/poktroll/" godoc -http=:6060 .PHONY: openapi_gen diff --git a/docs/pkg/client/README.md b/docs/pkg/client/README.md index 6f4032800..ca07b4ad9 100644 --- a/docs/pkg/client/README.md +++ b/docs/pkg/client/README.md @@ -17,7 +17,6 @@ - [Best Practices](#best-practices) - [FAQ](#faq) - ## Overview The `client` package exposes go APIs to facilitate interactions with the Pocket network. @@ -26,11 +25,12 @@ It includes lower-level interfaces for working with transactions and subscribing ## Features | Interface | Description | -|-------------------------|----------------------------------------------------------------------------------------------------| +| ----------------------- | -------------------------------------------------------------------------------------------------- | | **`SupplierClient`** | A high-level client for use by the "supplier" actor. | | **`TxClient`** | A high-level client used to build, sign, and broadcast transaction from cosmos-sdk messages. | | **`TxContext`** | Abstracts and encapsulates the transaction building, signing, encoding, and broadcasting concerns. | | **`BlockClient`** | Exposes methods for receiving notifications about newly committed blocks. | +| **`DelegationClient`** | Exposes methods for receiving notifications about new delegation changes from application. | | **`EventsQueryClient`** | Encapsulates blockchain event subscriptions. | | **`Connection`** | A transport agnostic communication channel for sending and receiving messages. | | **`Dialer`** | Abstracts the establishment of connections. | @@ -42,7 +42,7 @@ It includes lower-level interfaces for working with transactions and subscribing title: Component Diagram Legend --- flowchart - + c[Component] d[Dependency Component] s[[Subcomponent]] @@ -60,21 +60,24 @@ c --> s title: Clients Dependency Tree --- flowchart - sup[SupplierClient] tx[TxClient] txctx[[TxContext]] -bl[BlockClient] +subgraph bl[BlockClient] + bl_evt_replay[EventsReplayClient] +end +subgraph del[DelegationClient] + del_evt_replay[EventsReplayClient] +end evt[EventsQueryClient] conn[[Connection]] dial[[Dialer]] - sup --"#SignAndBroadcast()"--> tx - tx --"#CommittedBlocksSequence()"--> bl tx --"#BroadcastTx"--> txctx tx --"#EventsBytes()"--> evt -bl --"#EventsBytes()"--> evt +bl_evt_replay --"#EventsBytes()"--> evt +del_evt_replay --"#EventsBytes()"--> evt evt --> conn evt --"#DialContext()"--> dial dial --"(returns)"--> conn @@ -144,4 +147,4 @@ While `TxClient` is centered around signing and broadcasting transactions, `TxCo #### Can I extend or customize the provided interfaces? -Yes, the package is designed with modularity in mind. You can either implement the interfaces based on your requirements or extend them for additional functionalities. \ No newline at end of file +Yes, the package is designed with modularity in mind. You can either implement the interfaces based on your requirements or extend them for additional functionalities. diff --git a/docs/pkg/client/events.md b/docs/pkg/client/events.md new file mode 100644 index 000000000..b4d90ab98 --- /dev/null +++ b/docs/pkg/client/events.md @@ -0,0 +1,396 @@ +# Package `pocket/pkg/client/events` + +> An event query package for interfacing with [CometBFT](https://cometbft.com/) +> and the [Cosmos SDK](https://v1.cosmos.network/sdk), facilitating subscriptions +> to chain event messages. + + + +- [Overview](#overview) +- [Architecture Diagrams](#architecture-diagrams) + - [Components](#components) + - [Events Query Client](#events-query-client) + - [Events Replay Client](#events-replay-client) + - [Subscriptions](#subscriptions) +- [Installation](#installation) +- [Features](#features) +- [Usage (`EventsQueryClient`)](#usage-eventsqueryclient) + - [Basic Example](#basic-example) + - [Advanced Usage](#advanced-usage) + - [Configuration](#configuration) +- [Usage (`EventsReplayClient`)](#usage-eventsreplayclient) + - [Basic Usage](#basic-usage) + - [Advanced Usage](#advanced-usage-1) +- [Best Practices](#best-practices) +- [FAQ](#faq) + - [Why use `events` over directly using Gorilla WebSockets?](#why-use-events-over-directly-using-gorilla-websockets) + - [How can I use a different connection mechanism other than WebSockets?](#how-can-i-use-a-different-connection-mechanism-other-than-websockets) + - [Why use the `EventsReplayClient` over directly maintaining an `EventsQueryClient`?](#why-use-the-eventsreplayclient-over-directly-maintaining-an-eventsqueryclient) + + + +## Overview + +The `events` package provides a client interface to subscribe to chain event +messages. It abstracts the underlying connection mechanisms and offers a clear +and easy-to-use way to get events from the chain. Highlights: + +- Offers subscription to chain event messages matching a given query. +- Uses the Gorilla WebSockets package for underlying connection operations. +- Provides a modular structure with interfaces allowing for mock implementations + and testing. +- Offers considerations for potential improvements and replacements, such as + integration with the cometbft RPC client. +- Offers a generic client to decode on chain event bytes into the desired event + type + +## Architecture Diagrams + +The following section contains numerous diagrams that detail the architecture +of the different aspects of the `events` package. + +### Components + +The following legend describes how to read the following component diagrams. + +```mermaid +--- +title: Component Diagram Legend +--- + +flowchart + + a[Component A] + b[Component B] + c[Component C] + d[Component D] + + a --"A uses B via B#MethodName()"--> b + a =="A returns C from A#MethodName()"==> c + b -."B uses D via network IO".-> d +``` + +#### Events Query Client + +```mermaid +--- +title: EventsQueryClient Components +--- + +flowchart + + subgraph comet[Cometbft Node] + subgraph rpc[JSON-RPC] + sub[subscribe endpoint] + end + end + + subgraph eqc[EventsQueryClient] + q1_eb[EventsBytesObservable] + q1_conn[Connection] + q1_dial[Dialer] + end + + q1_obsvr1[Observer 1] + q1_obsvr2[Observer 2] + + + q1_obsvr1 --"#Subscribe()"--> q1_eb + q1_obsvr2 --"#Subscribe()"--> q1_eb + + + q1_dial =="#DialContext()"==> q1_conn + q1_eb --"#Receive()"--> q1_conn + + q1_conn -.-> sub +``` + +#### Events Replay Client + +```mermaid +--- +title: EventsReplayClient Components +--- + +flowchart + slice_cons[Events Slice Consumer] + + subgraph obs_cons[Events Observable Consumer] + evt_obs[Events Observer] + end + + subgraph erc[EventsReplayClient] + cache[Events Replay Observable Cache] + end + + chain[CometBFT Node] + + evt_re_obs[Events Replay Observable] + + eqc[Events Query Client] + + + slice_cons --"#LastNEvents()"--> erc + obs_cons --"#EventsSequence()"--> erc + erc =="#EventsSequence()"==> evt_re_obs + evt_obs --"#Subscribe()"--> evt_re_obs + + erc --"#EventBytes()"--> eqc + + eqc -.-> chain +``` + +### Subscriptions + +_TODO_DOCUMENTATION(@bryanchriswhite): Add Legend_ + +```mermaid +--- +title: Event Subscription Data Flow +--- + +flowchart + + subgraph comet[Cometbft Node] + subgraph rpc[JSON-RPC] + sub[subscribe endpoint] + end + end + + subgraph eqc[EventsQueryClient] + subgraph q1[Query 1] + q1_eb[EventsBytesObservable] + q1_conn[Connection] + end + subgraph q2[Query 2] + q2_conn[Connection] + q2_eb[EventsBytesObservable] + end + end + + q1_obsvr1[Query 1 Observer 1] + q1_obsvr2[Query 1 Observer 2] + q2_obsvr[Query 2 Observer] + + q1_eb -.-> q1_obsvr1 + q1_eb -.-> q1_obsvr2 + q2_eb -.-> q2_obsvr + + + q1_conn -.-> q1_eb + q2_conn -.-> q2_eb + + sub -.-> q1_conn + sub -.-> q2_conn +``` + +## Installation + +```bash +go get github.com/pokt-network/poktroll/pkg/client/events +``` + +## Features + +- **Websocket Connection**: Uses the + [Gorilla WebSockets](https://github.com/gorilla/websocket) for implementing the + connection interface. +- **Events Subscription**: Subscribe to chain event messages using a simple query + mechanism. +- **Dialer Interface**: Offers a `Dialer` interface for constructing connections, + which can be easily mocked for tests. +- **Observable Pattern**: Integrates the observable pattern, making it easier to + react to chain events. +- **Generic Replay Client**: Offers a generic typed replay client to listen for + specifc events on chain, and handles reconnection and subscription on error, + if the `EventsQueryClient` returns an error or is unexpectedly closed. + +## Usage (`EventsQueryClient`) + +### Basic Example + +```go +ctx := context.Background() + +// Creating a new EventsQueryClient with the default, websocket dialer: +cometWebsocketURL := "ws://example.com" +evtClient := events.NewEventsQueryClient(cometWebsocketURL) + +// Subscribing to a specific event, e.g. newly committed blocks: +// (see: https://docs.cosmos.network/v0.47/core/events#subscribing-to-events) +observable := evtClient.EventsBytes(ctx, "tm.event='NewBlock'") + +// Subscribe and receive from the observer channel, typically in some other scope. +observer := observable.Subscribe(ctx) + +// Observer channel closes when the context is canceled, observer is +// unsubscribed, or after the subscription returns an error. +for eitherEvent := range observer.Ch() { + // (see either.Either: https://github.com/pokt-network/poktroll/blob/main/pkg/either/either.go#L3) + eventBz, err := eitherEvent.ValueOrError() + + // ... +} +``` + +### Advanced Usage + +```go +// Given some custom dialer & connection implementation, e.g.: +var ( + tcpDialer events.Dialer = exampletcp.NewTcpDialerImpl() + grcpDialer events.Dialer = examplegrpc.NewGrpcDialerImpl() +) + +// Both TCP and gRPC use the TCP scheme as gRPC uses TCP for its transport layer. +cometUrl = "tcp://example.com" + +// Creating new EventsQueryClients with a custom tcpDialer: +// See: https://pkg.go.dev/github.com/pokt-network/poktroll/pkg/client/events/#WithDialer +tcpDialerOpt := events.WithDialer(tcpDialer) +// See: https://pkg.go.dev/github.com/pokt-network/poktroll/pkg/client/events/#NewEventsQueryClient +tcpEvtClient := events.NewEventsQueryClient(cometUrl, tcpDialerOpt) + +// Alternatively, with a custom gRPC dialer: +// See: https://pkg.go.dev/github.com/pokt-network/poktroll/pkg/client/events/#WithDialer +gcpDialerOpt := events.WithDialer(grcpDialer) +// See: https://pkg.go.dev/github.com/pokt-network/poktroll/pkg/client/events/#NewEventsQueryClient +grpcEvtClient := events.NewEventsQueryClient(cometUrl, grpcDialerOpt) + +// ... rest follows the same as the basic example. +``` + +### Configuration + +- **WithDialer**: Configure the client to use a custom dialer for connections. + +## Usage (`EventsReplayClient`) + +### Basic Usage + +```go +const ( + // Define a query string to provide to the EventsQueryClient + // See: https://docs.cosmos.network/v0.47/learn/advanced/events#subscribing-to-events + // And: https://docs.cosmos.network/v0.47/learn/advanced/events#default-events + eventQueryString = "message.action='eventName'" + // Define the websocket URL the EventsQueryClient will subscribe to + cometWebsocketURL = "ws://example.com:36657/websocket" + // the amount of events we want before they are emitted + replayObsBufferSize = 1 +) + +// Define an interface to represent an arbitrary onchain event +type EventType interface { + GetName() string // Illustrative only; arbitrary interfaces are supported. +} + +// Define the event type that implements the interface +type eventType struct { + Name string `json:"name"` +} + +func (e *eventType) GetName() string { return e.Name } + +// Define a decoder function that can take the raw event bytes +// received from the EventsQueryClient and convert them into +// the desired type for the EventsReplayClient +// See: https://pkg.go.dev/github.com/pokt-network/poktroll/pkg/client/events/#NewEventsFn +func eventTypeFactory(ctx context.Context) events.NewEventsFn[EventType] { + return function(eventBz []byte) EventType { + eventMsg := new(eventType) + logger := polylog.Ctx(ctx) + + if err := json.Unmarshal(eventBz, eventMsg); err != nil { + return nil, err + } + + // Confirm the event is correct by checking its fields + if eventMsg.Name == "" { + return nil, events.ErrEventsUnmarshalEvent. + Wrapf("with eventType data: %s", string(eventBz)) + } + + return eventMsg, nil + } +} + +// Create the events query client and a depinject config to supply +// it into the EventsReplayClient +// See: https://pkg.go.dev/github.com/pokt-network/poktroll/pkg/client/events/#NewEventsQueryClient +evtClient := events.NewEventsQueryClient(cometWebsocketURL) +depConfig := depinject.Supply(evtClient) + +// Create a context (this should be cancellable to close the EventsReplayClient) +ctx, cancel := context.WithCancel(context.Background()) + +// Create a new instance of the EventsReplayClient +// See: https://pkg.go.dev/github.com/pokt-network/poktroll/pkg/client/events/#NewEventsReplayClient +client, err := events.NewEventsReplayClient[ + EventType, + observable.ReplayObservable[EventType], +]( + ctx, + depConfig, + cometWebsocketURL, + eventQueryString, + eventTypeFactory(ctx), + replayObsBufferSize, +) +if err != nil { + return nil, fmt.Errorf("unable to create EventsReplayClient %w", err) +} + +// Retrieve the latest emitted event +lastEventType := client.LastNEvents(ctx, 1)[0] + +// Get the latest replay observable +latestEventsObs := client.EventsSequence(ctx) +// Get the latest events from the sequence +lastEventType = latestEventsObs.Last(ctx, 1)[0] + +// Cancel the context which will call client.Close and close all +// subscriptions and the EventsQueryClient +cancel() +``` + +### Advanced Usage + +The `EventsReplayClient` can be lightly wrapped to define a custom client for +a respective type. Examples of these include the `client.BlockClient` and +`client.DelegationClient` interfaces which under-the-hood are wrappers for the +`EventsReplayClient`. + +See: [BlockClient](../../../pkg/client/block/) and +[DelegationClient](../../../pkg/client/delegation/) for more detailed examples +on how to wrap and use the `EventsReplayClient` in a more advanced setting. + +## Best Practices + +- **Connection Handling**: Ensure to close the `EventsQueryClient` when done to + free up resources and avoid potential leaks. +- **Error Handling**: Always check both the synchronous error returned by + `EventsBytes` as well as asynchronous errors send over the observable. + +## FAQ + +#### Why use `events` over directly using Gorilla WebSockets? + +`events` abstracts many of the underlying details and provides a streamlined +interface for subscribing to chain events. It also integrates the observable +pattern and provides mockable interfaces for better testing. + +#### How can I use a different connection mechanism other than WebSockets? + +You can implement the `Dialer` and `Connection` interfaces and use the +`WithDialer` configuration to provide your custom dialer. + +#### Why use the `EventsReplayClient` over directly maintaining an `EventsQueryClient`? + +The `EventsReplayClient` will automatically attempt to reconnect to the +underlying `EventsQueryClient` in the event that it closes and publish the most +recent `observable.ReplayObservable` that can be used to retrieve events. This +means that the consumer does not need to maintain their own connection to the +`EventsQueryClient` and can always call the `EventsSequence` and `LastNEvents` +methods to retrieve the latest observable and slice of decoded events from an +active `EventsQueryClient`. diff --git a/docs/pkg/client/events_query.md b/docs/pkg/client/events_query.md deleted file mode 100644 index 787eecc8e..000000000 --- a/docs/pkg/client/events_query.md +++ /dev/null @@ -1,204 +0,0 @@ -# Package `pocket/pkg/client/events_query` - -> An event query package for interfacing with [CometBFT](https://cometbft.com/) and the [Cosmos SDK](https://v1.cosmos.network/sdk), facilitating subscriptions to chain event messages. - -- [Overview](#overview) -- [Architecture Diagrams](#architecture-diagrams) -- [Installation](#installation) -- [Features](#features) -- [Usage](#usage) - - [Basic Example](#basic-example) - - [Advanced Usage](#advanced-usage) - - [Configuration](#configuration) -- [Best Practices](#best-practices) -- [FAQ](#faq) - - [Why use `events_query` over directly using Gorilla WebSockets?](#why-use-events_query-over-directly-using-gorilla-websockets) - - [How can I use a different connection mechanism other than WebSockets?](#how-can-i-use-a-different-connection-mechanism-other-than-websockets) - -## Overview - -The `events_query` package provides a client interface to subscribe to chain event messages. It abstracts the underlying connection mechanisms and offers a clear and easy-to-use way to get events from the chain. Highlights: - -- Offers subscription to chain event messages matching a given query. -- Uses the Gorilla WebSockets package for underlying connection operations. -- Provides a modular structure with interfaces allowing for mock implementations and testing. -- Offers considerations for potential improvements and replacements, such as integration with the cometbft RPC client. - -## Architecture Diagrams - -### Components -```mermaid ---- -title: Component Diagram Legend ---- - -flowchart - - a[Component A] - b[Component B] - c[Component C] - d[Component D] - - a --"A uses B via B#MethodName()"--> b -a =="A returns C from A#MethodName()"==> c -b -."A uses D via network IO".-> d -``` -```mermaid ---- -title: EventsQueryClient Components ---- - -flowchart - - subgraph comet[Cometbft Node] - subgraph rpc[JSON-RPC] - sub[subscribe endpoint] - end - end - - subgraph eqc[EventsQueryClient] - q1_eb[EventsBytesObservable] - q1_conn[Connection] - q1_dial[Dialer] - end - - q1_obsvr1[Observer 1] - q1_obsvr2[Observer 2] - - - q1_obsvr1 --"#Subscribe()"--> q1_eb -q1_obsvr2 --"#Subscribe()"--> q1_eb - - -q1_dial =="#DialContext()"==> q1_conn -q1_eb --"#Receive()"--> q1_conn - -q1_conn -.-> sub - -``` - -### Subscriptions -```mermaid ---- -title: Event Subscription Data Flow ---- - -flowchart - -subgraph comet[Cometbft Node] - subgraph rpc[JSON-RPC] - sub[subscribe endpoint] - end -end - -subgraph eqc[EventsQueryClient] - subgraph q1[Query 1] - q1_eb[EventsBytesObservable] - q1_conn[Connection] - end - subgraph q2[Query 2] - q2_conn[Connection] - q2_eb[EventsBytesObservable] - end -end - -q1_obsvr1[Query 1 Observer 1] -q1_obsvr2[Query 1 Observer 2] -q2_obsvr[Query 2 Observer] - -q1_eb -.-> q1_obsvr1 -q1_eb -.-> q1_obsvr2 -q2_eb -.-> q2_obsvr - - -q1_conn -.-> q1_eb -q2_conn -.-> q2_eb - -sub -.-> q1_conn -sub -.-> q2_conn - -``` - -## Installation - -```bash -go get github.com/pokt-network/poktroll/pkg/client/events_query -``` - -## Features - -- **Websocket Connection**: Uses the [Gorilla WebSockets](https://github.com/gorilla/websocket) for implementing the connection interface. -- **Events Subscription**: Subscribe to chain event messages using a simple query mechanism. -- **Dialer Interface**: Offers a `Dialer` interface for constructing connections, which can be easily mocked for tests. -- **Observable Pattern**: Integrates the observable pattern, making it easier to react to chain events. - -## Usage - -### Basic Example - -```go -ctx := context.Background() - -// Creating a new EventsQueryClient with the default, websocket dialer: -cometWebsocketURL := "ws://example.com" -evtClient := eventsquery.NewEventsQueryClient(cometWebsocketURL) - -// Subscribing to a specific event, e.g. newly committed blocks: -// (see: https://docs.cosmos.network/v0.47/core/events#subscribing-to-events) -observable := evtClient.EventsBytes(ctx, "tm.event='NewBlock'") - -// Subscribe and receive from the observer channel, typically in some other scope. -observer := observable.Subscribe(ctx) - -// Observer channel closes when the context is cancelled, observer is -// unsubscribed, or after the subscription returns an error. -for eitherEvent := range observer.Ch() { - // (see either.Either: https://github.com/pokt-network/poktroll/blob/main/pkg/either/either.go#L3) - eventBz, err := eitherEvent.ValueOrError() - - // ... -} -``` - -### Advanced Usage - -```go -// Given some custom dialer & connection implementation, e.g.: -var ( - tcpDialer eventsquery.Dialer = exampletcp.NewTcpDialerImpl() - grcpDialer eventsquery.Dialer = examplegrpc.NewGrpcDialerImpl() -) - -// Both TCP and gRPC use the TCP scheme as gRPC uses TCP for its transport layer. -cometUrl = "tcp://example.com" - -// Creating new EventsQueryClients with a custom tcpDialer: -tcpDialerOpt := eventsquery.WithDialer(tcpDialer) -tcpEvtClient := eventsquery.NewEventsQueryClient(cometUrl, tcpDialerOpt) - -// Alternatively, with a custom gRPC dialer: -gcpDialerOpt := eventsquery.WithDialer(grcpDialer) -grpcEvtClient := eventsquery.NewEventsQueryClient(cometUrl, grpcDialerOpt) - -// ... rest follows the same as the basic example. -``` - -### Configuration - -- **WithDialer**: Configure the client to use a custom dialer for connections. - -## Best Practices - -- **Connection Handling**: Ensure to close the `EventsQueryClient` when done to free up resources and avoid potential leaks. -- **Error Handling**: Always check both the synchronous error returned by `EventsBytes` as well as asynchronous errors send over the observable. - -## FAQ - -#### Why use `events_query` over directly using Gorilla WebSockets? - -`events_query` abstracts many of the underlying details and provides a streamlined interface for subscribing to chain events. -It also integrates the observable pattern and provides mockable interfaces for better testing. - -#### How can I use a different connection mechanism other than WebSockets? - -You can implement the `Dialer` and `Connection` interfaces and use the `WithDialer` configuration to provide your custom dialer. diff --git a/e2e/tests/session_steps_test.go b/e2e/tests/session_steps_test.go index 2c016f706..0d9b9736f 100644 --- a/e2e/tests/session_steps_test.go +++ b/e2e/tests/session_steps_test.go @@ -13,7 +13,7 @@ import ( abci "github.com/cometbft/cometbft/abci/types" "github.com/stretchr/testify/require" - eventsquery "github.com/pokt-network/poktroll/pkg/client/events_query" + "github.com/pokt-network/poktroll/pkg/client/events" "github.com/pokt-network/poktroll/pkg/either" "github.com/pokt-network/poktroll/pkg/observable" "github.com/pokt-network/poktroll/pkg/observable/channel" @@ -31,7 +31,7 @@ const ( ) func (s *suite) AfterTheSupplierCreatesAClaimForTheSessionForServiceForApplication(serviceId, appName string) { - var ctx, done = context.WithCancel(context.Background()) + ctx, done := context.WithCancel(context.Background()) // TODO_CONSIDERATION: if this test suite gets more complex, it might make // sense to refactor this key into a function that takes serviceId and appName @@ -126,7 +126,7 @@ func (s *suite) TheSupplierHasServicedASessionWithRelaysForServiceForApplication msgSenderQuery := fmt.Sprintf(msgClaimSenderQueryFmt, accNameToAddrMap[supplierName]) // TODO_TECHDEBT(#220): refactor to use EventsReplayClient once available. - eventsQueryClient := eventsquery.NewEventsQueryClient(testclient.CometLocalWebsocketURL) + eventsQueryClient := events.NewEventsQueryClient(testclient.CometLocalWebsocketURL) eitherEventsBzObs, err := eventsQueryClient.EventsBytes(ctx, msgSenderQuery) require.NoError(s, err) diff --git a/go.mod b/go.mod index 08aa46324..ddb25c1fd 100644 --- a/go.mod +++ b/go.mod @@ -24,7 +24,6 @@ require ( github.com/athanorlabs/go-dleq v0.1.0 github.com/cometbft/cometbft v0.37.2 github.com/cometbft/cometbft-db v0.8.0 - github.com/cosmos/cosmos-proto v1.0.0-beta.2 github.com/cosmos/cosmos-sdk v0.47.3 github.com/cosmos/gogoproto v1.4.11 github.com/cosmos/ibc-go/v7 v7.1.0 @@ -46,7 +45,6 @@ require ( go.uber.org/multierr v1.11.0 golang.org/x/crypto v0.15.0 golang.org/x/sync v0.5.0 - google.golang.org/genproto/googleapis/api v0.0.0-20230913181813-007df8e322eb google.golang.org/grpc v1.59.0 gopkg.in/yaml.v2 v2.4.0 ) @@ -90,6 +88,7 @@ require ( github.com/containerd/cgroups v1.1.0 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/cosmos/btcutil v1.0.5 // indirect + github.com/cosmos/cosmos-proto v1.0.0-beta.2 // indirect github.com/cosmos/go-bip39 v1.0.0 // indirect github.com/cosmos/gogogateway v1.2.0 // indirect github.com/cosmos/iavl v0.20.0 // indirect @@ -285,6 +284,7 @@ require ( google.golang.org/api v0.143.0 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20230913181813-007df8e322eb // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20230913181813-007df8e322eb // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230920204549-e6e6cdab5c13 // indirect google.golang.org/protobuf v1.31.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect diff --git a/pkg/appgateserver/cmd/cmd.go b/pkg/appgateserver/cmd/cmd.go index 4e3a51c5f..821698c53 100644 --- a/pkg/appgateserver/cmd/cmd.go +++ b/pkg/appgateserver/cmd/cmd.go @@ -67,7 +67,7 @@ provided that: // Cosmos flags cmd.Flags().String(cosmosflags.FlagKeyringBackend, "", "Select keyring's backend (os|file|kwallet|pass|test)") cmd.Flags(). - StringVar(&flagCosmosNodeURL, cosmosflags.FlagNode, omittedDefaultFlagValue, "Register the default Cosmos node flag, which is needed to initialise the Cosmos query context correctly. It can be used to override the `QueryNodeUrl` field in the config file if specified.") + StringVar(&flagCosmosNodeURL, cosmosflags.FlagNode, omittedDefaultFlagValue, "Register the default Cosmos node flag, which is needed to initialize the Cosmos query context correctly. It can be used to override the `QueryNodeUrl` field in the config file if specified.") return cmd } diff --git a/pkg/appgateserver/session.go b/pkg/appgateserver/session.go index 5db25465d..0d886d8cb 100644 --- a/pkg/appgateserver/session.go +++ b/pkg/appgateserver/session.go @@ -17,7 +17,7 @@ func (app *appGateServer) getCurrentSession( app.sessionMu.RLock() defer app.sessionMu.RUnlock() - latestBlock := app.blockClient.LatestBlock(ctx) + latestBlock := app.blockClient.LastNBlocks(ctx, 1)[0] if currentSession, ok := app.currentSessions[serviceId]; ok { sessionEndBlockHeight := currentSession.Header.SessionStartBlockHeight + currentSession.NumBlocksPerSession diff --git a/pkg/client/block/block.go b/pkg/client/block/block.go index f5bd94516..bbc61e365 100644 --- a/pkg/client/block/block.go +++ b/pkg/client/block/block.go @@ -6,6 +6,7 @@ import ( "github.com/cometbft/cometbft/types" "github.com/pokt-network/poktroll/pkg/client" + "github.com/pokt-network/poktroll/pkg/client/events" ) // cometBlockEvent is used to deserialize incoming committed block event messages @@ -26,19 +27,23 @@ func (blockEvent *cometBlockEvent) Hash() []byte { return blockEvent.Block.LastBlockID.Hash.Bytes() } -// newCometBlockEvent attempts to deserialize the given bytes into a comet block. +// newCometBlockEventFactoryFn is a factory function that returns a functon +// that attempts to deserialize the given bytes into a comet block. // if the resulting block has a height of zero, assume the event was not a block // event and return an ErrUnmarshalBlockEvent error. -func newCometBlockEvent(blockMsgBz []byte) (client.Block, error) { - blockMsg := new(cometBlockEvent) - if err := json.Unmarshal(blockMsgBz, blockMsg); err != nil { - return nil, err +func newCometBlockEventFactoryFn() events.NewEventsFn[client.Block] { + return func(blockMsgBz []byte) (client.Block, error) { + blockMsg := new(cometBlockEvent) + if err := json.Unmarshal(blockMsgBz, blockMsg); err != nil { + return nil, err + } + + // If msg does not match the expected format then the block's height has a zero value. + if blockMsg.Block.Header.Height == 0 { + return nil, events.ErrEventsUnmarshalEvent. + Wrapf("with block data: %s", string(blockMsgBz)) + } + + return blockMsg, nil } - - // If msg does not match the expected format then the block's height has a zero value. - if blockMsg.Block.Header.Height == 0 { - return nil, ErrUnmarshalBlockEvent.Wrap(string(blockMsgBz)) - } - - return blockMsg, nil } diff --git a/pkg/client/block/client.go b/pkg/client/block/client.go index 7ded373f2..1103a4b66 100644 --- a/pkg/client/block/client.go +++ b/pkg/client/block/client.go @@ -2,79 +2,33 @@ package block import ( "context" - "fmt" - "time" "cosmossdk.io/depinject" "github.com/pokt-network/poktroll/pkg/client" - "github.com/pokt-network/poktroll/pkg/either" - "github.com/pokt-network/poktroll/pkg/observable" - "github.com/pokt-network/poktroll/pkg/observable/channel" - "github.com/pokt-network/poktroll/pkg/retry" + "github.com/pokt-network/poktroll/pkg/client/events" ) const ( - // eventsBytesRetryDelay is the delay between retry attempts when the events - // bytes observable returns an error. - eventsBytesRetryDelay = time.Second - // eventsBytesRetryLimit is the maximum number of times to attempt to - // re-establish the events query bytes subscription when the events bytes - // observable returns an error. - eventsBytesRetryLimit = 10 - eventsBytesRetryResetTimeout = 10 * time.Second - // NB: cometbft event subscription query for newly committed blocks. - // (see: https://docs.cosmos.network/v0.47/core/events#subscribing-to-events) + // committedBlocksQuery is the query used to subscribe to new committed block + // events used by the EventsQueryClient to subscribe to new block events from + // the chain. + // See: https://docs.cosmos.network/v0.47/learn/advanced/events#default-events committedBlocksQuery = "tm.event='NewBlock'" - // latestBlockObsvblsReplayBufferSize is the replay buffer size of the - // latestBlockObsvbls replay observable which is used to cache the latest block observable. - // It is updated with a new "active" observable when a new - // events query subscription is created, for example, after a non-persistent - // connection error. - latestBlockObsvblsReplayBufferSize = 1 - // latestBlockReplayBufferSize is the replay buffer size of the latest block - // replay observable which is notified when block commit events are received - // by the events query client subscription created in goPublishBlocks. - latestBlockReplayBufferSize = 1 + // TODO_TECHDEBT/TODO_FUTURE: add a `blocksReplayLimit` field to the block + // client struct that defaults to this but can be overridden via an option + // in future work. + // defaultBlocksReplayLimit is the number of blocks that the replay + // observable returned by LastNBlocks() will be able to replay. + defaultBlocksReplayLimit = 100 ) -var ( - _ client.BlockClient = (*blockClient)(nil) - _ client.Block = (*cometBlockEvent)(nil) -) - -// blockClient implements the BlockClient interface. -type blockClient struct { - // endpointURL is the URL of RPC endpoint which eventsClient subscription - // requests will be sent. - endpointURL string - // eventsClient is the events query client which is used to subscribe to - // newly committed block events. It emits an either value which may contain - // an error, at most, once and closes immediately after if it does. - eventsClient client.EventsQueryClient - // latestBlockObsvbls is a replay observable with replay buffer size 1, - // which holds the "active latest block observable" which is notified when - // block commit events are received by the events query client subscription - // created in goPublishBlocks. This observable (and the one it emits) closes - // when the events bytes observable returns an error and is updated with a - // new "active" observable after a new events query subscription is created. - latestBlockObsvbls observable.ReplayObservable[client.BlocksObservable] - // latestBlockObsvblsReplayPublishCh is the publish channel for latestBlockObsvbls. - // It's used to set blockObsvbl initially and subsequently update it, for - // example, when the connection is re-established after erroring. - latestBlockObsvblsReplayPublishCh chan<- client.BlocksObservable -} - -// eventsBytesToBlockMapFn is a convenience type to represent the type of a -// function which maps event subscription message bytes into block event objects. -// This is used as a transformFn in a channel.Map() call and is the type returned -// by the newEventsBytesToBlockMapFn factory function. -type eventBytesToBlockMapFn = func( - context.Context, - either.Bytes, -) (client.Block, bool) - -// NewBlockClient creates a new block client from the given dependencies and cometWebsocketURL. +// NewBlockClient creates a new block client from the given dependencies and +// cometWebsocketURL. It uses a pre-defined committedBlocksQuery to subscribe to +// newly committed block events which are mapped to Block objects. +// +// This lightly wraps the EventsReplayClient[Block] generic to correctly mock +// the interface. // // Required dependencies: // - client.EventsQueryClient @@ -83,141 +37,46 @@ func NewBlockClient( deps depinject.Config, cometWebsocketURL string, ) (client.BlockClient, error) { - // Initialize block client - bClient := &blockClient{endpointURL: cometWebsocketURL} - bClient.latestBlockObsvbls, bClient.latestBlockObsvblsReplayPublishCh = channel.NewReplayObservable[client.BlocksObservable](ctx, latestBlockObsvblsReplayBufferSize) - - // Inject dependencies - if err := depinject.Inject(deps, &bClient.eventsClient); err != nil { + client, err := events.NewEventsReplayClient[ + client.Block, + client.EventsObservable[client.Block], + ]( + ctx, + deps, + cometWebsocketURL, + committedBlocksQuery, + newCometBlockEventFactoryFn(), + defaultBlocksReplayLimit, + ) + if err != nil { return nil, err } - - // Concurrently publish blocks to the observable emitted by latestBlockObsvbls. - go bClient.goPublishBlocks(ctx) - - return bClient, nil + return &blockClient{eventsReplayClient: client}, nil } -// CommittedBlocksSequence returns a ReplayObservable, with a replay buffer size -// of 1, which is notified when block commit events are received by the events -// query subscription. -func (bClient *blockClient) CommittedBlocksSequence(ctx context.Context) client.BlocksObservable { - // Get the latest block observable from the replay observable. We only ever - // want the last 1 as any prior latest block observable values are closed. - // Directly accessing the zeroth index here is safe because the call to Last - // is guaranteed to return a slice with at least 1 element. - return bClient.latestBlockObsvbls.Last(ctx, 1)[0] -} - -// LatestBlock returns the latest committed block that's been received by the -// corresponding events query subscription. -// It blocks until at least one block event has been received. -func (bClient *blockClient) LatestBlock(ctx context.Context) client.Block { - return bClient.CommittedBlocksSequence(ctx).Last(ctx, 1)[0] -} - -// Close unsubscribes all observers of the committed blocks sequence observable -// and closes the events query client. -func (bClient *blockClient) Close() { - // Closing eventsClient will cascade unsubscribe and close downstream observers. - bClient.eventsClient.Close() +// blockClient is a wrapper around an EventsReplayClient that implements the +// BlockClient interface for use with cosmos-sdk networks. +type blockClient struct { + // eventsReplayClient is the underlying EventsReplayClient that is used to + // subscribe to new committed block events. It uses both the Block type + // and the BlockReplayObservable type as its generic types. + // These enable the EventsReplayClient to correctly map the raw event bytes + // to Block objects and to correctly return a BlockReplayObservable + eventsReplayClient client.EventsReplayClient[client.Block, client.EventsObservable[client.Block]] } -// goPublishBlocks runs the work function returned by retryPublishBlocksFactory, -// re-invoking it according to the arguments to retry.OnError when the events bytes -// observable returns an asynchronous error. -// This function is intended to be called in a goroutine. -func (bClient *blockClient) goPublishBlocks(ctx context.Context) { - // React to errors by getting a new events bytes observable, re-mapping it, - // and send it to latestBlockObsvblsReplayPublishCh such that - // latestBlockObsvbls.Last(ctx, 1) will return it. - publishErr := retry.OnError( - ctx, - eventsBytesRetryLimit, - eventsBytesRetryDelay, - eventsBytesRetryResetTimeout, - "goPublishBlocks", - bClient.retryPublishBlocksFactory(ctx), - ) - - // If we get here, the retry limit was reached and the retry loop exited. - // Since this function runs in a goroutine, we can't return the error to the - // caller. Instead, we panic. - if publishErr != nil { - panic(fmt.Errorf("BlockClient.goPublishBlocks should never reach this spot: %w", publishErr)) - } +// CommittedBlocksSequence returns a replay observable of new block events. +func (b *blockClient) CommittedBlocksSequence(ctx context.Context) client.BlockReplayObservable { + return b.eventsReplayClient.EventsSequence(ctx) } -// retryPublishBlocksFactory returns a function which is intended to be passed to -// retry.OnError. The returned function pipes event bytes from the events query -// client, maps them to block events, and publishes them to the latestBlockObsvbls -// replay observable. -func (bClient *blockClient) retryPublishBlocksFactory(ctx context.Context) func() chan error { - return func() chan error { - errCh := make(chan error, 1) - eventsBzObsvbl, err := bClient.eventsClient.EventsBytes(ctx, committedBlocksQuery) - if err != nil { - errCh <- err - return errCh - } - - // NB: must cast back to generic observable type to use with Map. - // client.BlocksObservable cannot be an alias due to gomock's lack of - // support for generic types. - eventsBz := observable.Observable[either.Either[[]byte]](eventsBzObsvbl) - blocksObsvbl := channel.MapReplay( - ctx, - latestBlockReplayBufferSize, - eventsBz, - newEventsBytesToBlockMapFn(errCh), - ) - - // Initially set latestBlockObsvbls and update if after retrying on error. - bClient.latestBlockObsvblsReplayPublishCh <- blocksObsvbl - - return errCh - } +// LatestsNBlocks returns the last n blocks observed by the BockClient. +func (b *blockClient) LastNBlocks(ctx context.Context, n int) []client.Block { + return b.eventsReplayClient.LastNEvents(ctx, n) } -// newEventsBytesToBlockMapFn is a factory for a function which is intended -// to be used as a transformFn in a channel.Map() call. Since the map function -// is called asynchronously, this factory creates a closure around an error channel -// which can be used for asynchronous error signaling from within the map function, -// and handling from the Map call context. -// -// The map function itself attempts to deserialize the given byte slice as a -// committed block event. If the events bytes observable contained an error, this value is not emitted -// (skipped) on the destination observable of the map operation. -// If deserialization failed because the event bytes were for a different event type, -// this value is also skipped. -// If deserialization failed for some other reason, this function panics. -func newEventsBytesToBlockMapFn(errCh chan<- error) eventBytesToBlockMapFn { - return func( - _ context.Context, - eitherEventBz either.Bytes, - ) (_ client.Block, skip bool) { - eventBz, err := eitherEventBz.ValueOrError() - if err != nil { - errCh <- err - // Don't publish (skip) if eitherEventBz contained an error. - // eitherEventBz should automatically close itself in this case. - // (i.e. no more values should be mapped to this transformFn's respective - // dstObservable). - return nil, true - } - - block, err := newCometBlockEvent(eventBz) - if err != nil { - if ErrUnmarshalBlockEvent.Is(err) { - // Don't publish (skip) if the message was not a block event. - return nil, true - } - - panic(fmt.Sprintf( - "unexpected error deserializing block event: %s; eventBz: %s", - err, string(eventBz), - )) - } - return block, false - } +// Close closes the underlying websocket connection for the EventsQueryClient +// and closes all downstream connections. +func (b *blockClient) Close() { + b.eventsReplayClient.Close() } diff --git a/pkg/client/block/client_integration_test.go b/pkg/client/block/client_integration_test.go index c242d7474..324b032a5 100644 --- a/pkg/client/block/client_integration_test.go +++ b/pkg/client/block/client_integration_test.go @@ -18,13 +18,13 @@ import ( const blockIntegrationSubTimeout = 5 * time.Second -func TestBlockClient_LatestBlock(t *testing.T) { +func TestBlockClient_LastNBlocks(t *testing.T) { ctx := context.Background() blockClient := testblock.NewLocalnetClient(ctx, t) require.NotNil(t, blockClient) - block := blockClient.LatestBlock(ctx) + block := blockClient.LastNBlocks(ctx, 1) require.NotEmpty(t, block) } diff --git a/pkg/client/block/client_test.go b/pkg/client/block/client_test.go index 768fa59f9..061c42f26 100644 --- a/pkg/client/block/client_test.go +++ b/pkg/client/block/client_test.go @@ -62,9 +62,9 @@ func TestBlockClient(t *testing.T) { fn func() client.Block }{ { - name: "LatestBlock successfully returns latest block", + name: "LastNBlocks(1) successfully returns latest block", fn: func() client.Block { - lastBlock := blockClient.LatestBlock(ctx) + lastBlock := blockClient.LastNBlocks(ctx, 1)[0] return lastBlock }, }, @@ -86,7 +86,7 @@ func TestBlockClient(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - var actualBlockCh = make(chan client.Block, 10) + actualBlockCh := make(chan client.Block, 10) // Run test functions asynchronously because they can block, leading // to an unresponsive test. If any of the methods under test hang, diff --git a/pkg/client/block/errors.go b/pkg/client/block/errors.go deleted file mode 100644 index 0a0cc28c9..000000000 --- a/pkg/client/block/errors.go +++ /dev/null @@ -1,8 +0,0 @@ -package block - -import errorsmod "cosmossdk.io/errors" - -var ( - ErrUnmarshalBlockEvent = errorsmod.Register(codespace, 1, "failed to unmarshal committed block event") - codespace = "block_client" -) diff --git a/pkg/client/block/godoc.go b/pkg/client/block/godoc.go new file mode 100644 index 000000000..01d829f2e --- /dev/null +++ b/pkg/client/block/godoc.go @@ -0,0 +1,5 @@ +// Package block contains a light wrapper of the EventsReplayClient[Block] +// generic which listens for committed block events on chain and emits them +// through a ReplayObservable. This enables consumers to listen for newly +// committed blocks and react to them asynchronously. +package block diff --git a/pkg/client/delegation/client.go b/pkg/client/delegation/client.go new file mode 100644 index 000000000..c45d3859a --- /dev/null +++ b/pkg/client/delegation/client.go @@ -0,0 +1,84 @@ +package delegation + +import ( + "context" + + "cosmossdk.io/depinject" + + "github.com/pokt-network/poktroll/pkg/client" + "github.com/pokt-network/poktroll/pkg/client/events" +) + +const ( + // delegationEventQuery is the query used by the EventsQueryClient to subscribe + // to new delegation events from the the application module on chain. + // See: https://docs.cosmos.network/v0.47/learn/advanced/events#subscribing-to-events + // And: https://docs.cosmos.network/v0.47/learn/advanced/events#default-events + delegationEventQuery = "message.action='pocket.application.EventRedelegation'" + // TODO_TECHDEBT/TODO_FUTURE: add a `blocksReplayLimit` field to the block + // client struct that defaults to this but can be overridden via an option + // in future work. + // defaultRedelegationsReplayLimit is the number of redelegations that the + // replay observable returned by LastNRedelegations() will be able to replay. + defaultRedelegationsReplayLimit = 100 +) + +// NewDelegationClient creates a new delegation client from the given +// dependencies and cometWebsocketURL. It uses a pre-defined delegationEventQuery +// to subscribe to newly emitted redelegation events which are mapped to +// Redelegation objects. +// +// This lightly wraps the EventsReplayClient[Redelegation] generic to +// correctly mock the interface. +// +// Required dependencies: +// - client.EventsQueryClient +func NewDelegationClient( + ctx context.Context, + deps depinject.Config, + cometWebsocketURL string, +) (client.DelegationClient, error) { + client, err := events.NewEventsReplayClient[ + client.Redelegation, + client.EventsObservable[client.Redelegation], + ]( + ctx, + deps, + cometWebsocketURL, + delegationEventQuery, + newRedelegationEventFactoryFn(), + defaultRedelegationsReplayLimit, + ) + if err != nil { + return nil, err + } + return &delegationClient{eventsReplayClient: client}, nil +} + +// delegationClient is a wrapper around an EventsReplayClient that implements +// the DelegationClient interface for use with cosmos-sdk networks. +type delegationClient struct { + // eventsReplayClient is the underlying EventsReplayClient that is used to + // subscribe to new delegation events. It uses both the Redelegation type + // and the RedelegationReplayObservable type as its generic types. + // These enable the EventsReplayClient to correctly map the raw event bytes + // to Redelegation objects and to correctly return a RedelegationReplayObservable + eventsReplayClient client.EventsReplayClient[client.Redelegation, client.EventsObservable[client.Redelegation]] +} + +// RedelegationsSequence returns a replay observable of Redelgation events +// observed by the DelegationClient. +func (b *delegationClient) RedelegationsSequence(ctx context.Context) client.RedelegationReplayObservable { + return b.eventsReplayClient.EventsSequence(ctx) +} + +// LastNRedelegations returns the latest n redelegation events from the DelegationClient. +func (b *delegationClient) LastNRedelegations(ctx context.Context, n int) []client.Redelegation { + return b.eventsReplayClient.LastNEvents(ctx, n) +} + +// Close closes the underlying websocket connection for the EventsQueryClient +// and closes all downstream connections. +func (b *delegationClient) Close() { + b.eventsReplayClient.Close() +} diff --git a/pkg/client/delegation/client_integration_test.go b/pkg/client/delegation/client_integration_test.go new file mode 100644 index 000000000..e52922442 --- /dev/null +++ b/pkg/client/delegation/client_integration_test.go @@ -0,0 +1,173 @@ +//go:build integration + +package delegation_test + +// TODO(@h5law): Figure out how to use real components of the localnet +// - Create app and gateway actors +// - Stake them +// - Delegate to the gateway +// - Undelegate from the gateway +// Currently this test doesn't work, because (I think) it is using a mock +// keeper etc and this isnt actually interacting with the localnet where +// the DelegationClient is listening for events from. + +import ( + "context" + "sync" + "testing" + "time" + + "cosmossdk.io/depinject" + "github.com/cosmos/cosmos-sdk/testutil" + "github.com/stretchr/testify/require" + + "github.com/pokt-network/poktroll/pkg/client" + "github.com/pokt-network/poktroll/pkg/client/delegation" + "github.com/pokt-network/poktroll/pkg/client/events" + "github.com/pokt-network/poktroll/testutil/network" + apptypes "github.com/pokt-network/poktroll/x/application/types" + gatewaytypes "github.com/pokt-network/poktroll/x/gateway/types" +) + +const ( + delegationIntegrationSubTimeout = 180 * time.Second +) + +// TODO_UPNEXT(@h5law): Figure out the correct way to subscribe to events on the +// simulated localnet. Currently this test doesn't work. Although the block client +// subscribes it doesn't receive any events. +func TestDelegationClient_RedelegationsObservables(t *testing.T) { + t.SkipNow() + // Create the network with 2 applications and 1 gateway + net, appAddresses, gatewayAddr := createNetworkWithApplicationsAndGateways(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Create the delegation client + evtQueryClient := events.NewEventsQueryClient("ws://localhost:26657/websocket") + deps := depinject.Supply(evtQueryClient) + delegationClient, err := delegation.NewDelegationClient(ctx, deps, "ws://localhost:26657/websocket") + require.NoError(t, err) + require.NotNil(t, delegationClient) + t.Cleanup(func() { + delegationClient.Close() + }) + + // Subscribe to the delegation events + delegationSub := delegationClient.RedelegationsSequence(ctx).Subscribe(ctx) + + var ( + delegationMu = sync.Mutex{} // mutext to protect delegationChangeCounter + delegationChangeCounter int // counter to keep track of the number of delegation changes + expectedChanges = 4 // expected number of delegation changes + errCh = make(chan error, 1) // channel to signal the test to stop + ) + go func() { + // The test will delegate from app1 to gateway, then from app2 to gateway + // and then undelegate app1 from gateway and then undelegate app2 from gateway + // We expect to receive 4 delegation changes where the address of the + // Redelegation event alternates between app1 and app2 + var previousRedelegation client.Redelegation + for change := range delegationSub.Ch() { + t.Logf("received delegation change: %+v", change) + // Verify that the Redelegation event is valid and that the address + // of the Redelegation event alternates between app1 and app2 + if previousRedelegation != nil { + require.NotEqual(t, previousRedelegation.GetAppAddress(), change.GetAppAddress()) + if previousRedelegation.AppAddress() == appAddresses[0] { + require.Equal(t, appAddresses[1], change.GetAppAddress()) + } else { + require.Equal(t, appAddresses[0], change.GetAppAddress()) + } + } + previousRedelegation = change + + require.NotEmpty(t, change) + delegationMu.Lock() + delegationChangeCounter++ + if delegationChangeCounter >= expectedChanges { + errCh <- nil + return + } + delegationMu.Unlock() + } + }() + + // Delegate from app1 to gateway + t.Log(time.Now().String()) + t.Logf("delegating from app %s to gateway %s", appAddresses[0], gatewayAddr) + network.DelegateAppToGateway(t, net, appAddresses[0], gatewayAddr) + // need to wait for the account to be initialized in the next block + require.NoError(t, net.WaitForNextBlock()) + // Delegate from app2 to gateway + t.Logf("delegating from app %s to gateway %s", appAddresses[1], gatewayAddr) + network.DelegateAppToGateway(t, net, appAddresses[1], gatewayAddr) + // need to wait for the account to be initialized in the next block + require.NoError(t, net.WaitForNextBlock()) + // Undelegate from app1 to gateway + t.Logf("undelegating from app %s to gateway %s", appAddresses[0], gatewayAddr) + network.UndelegateAppFromGateway(t, net, appAddresses[0], gatewayAddr) + // need to wait for the account to be initialized in the next block + require.NoError(t, net.WaitForNextBlock()) + // Undelegate from app2 to gateway + t.Logf("undelegating from app %s to gateway %s", appAddresses[1], gatewayAddr) + network.UndelegateAppFromGateway(t, net, appAddresses[1], gatewayAddr) + // need to wait for the account to be initialized in the next block + require.NoError(t, net.WaitForNextBlock()) + + select { + case err := <-errCh: + require.NoError(t, err) + require.Equal(t, expectedChanges, delegationChangeCounter) + case <-time.After(delegationIntegrationSubTimeout): + t.Log(time.Now().String()) + t.Fatalf( + "timed out waiting for delegation subscription; expected %d delegation events, got %d", + expectedChanges, delegationChangeCounter, + ) + } +} + +// createNetworkWithApplicationsAndGateways creates a network with 2 applications +// and 1 gateway. It returns the network with all accoutns initialized via a +// transaction from the first validator. +func createNetworkWithApplicationsAndGateways( + t *testing.T, +) (net *network.Network, appAddresses []string, gatewayAddress string) { + // Prepare the network + cfg := network.DefaultConfig() + net = network.New(t, cfg) + ctx := net.Validators[0].ClientCtx + + // Prepare the keyring for the 2 applications and 1 gateway account + kr := ctx.Keyring + accounts := testutil.CreateKeyringAccounts(t, kr, 3) + ctx = ctx.WithKeyring(kr) + + // Initialize all the accounts + for i, account := range accounts { + signatureSequenceNumber := i + 1 + network.InitAccountWithSequence(t, net, account.Address, signatureSequenceNumber) + } + // need to wait for the account to be initialized in the next block + require.NoError(t, net.WaitForNextBlock()) + + addresses := make([]string, len(accounts)) + for i, account := range accounts { + addresses[i] = account.Address.String() + } + + // Create two applications + appGenesisState := network.ApplicationModuleGenesisStateWithAddresses(t, addresses[0:2]) + buf, err := cfg.Codec.MarshalJSON(appGenesisState) + require.NoError(t, err) + cfg.GenesisState[apptypes.ModuleName] = buf + + // Create a single gateway + gatewayGenesisState := network.GatewayModuleGenesisStateWithAddresses(t, addresses[2:3]) + buf, err = cfg.Codec.MarshalJSON(gatewayGenesisState) + require.NoError(t, err) + cfg.GenesisState[gatewaytypes.ModuleName] = buf + + return net, addresses[0:2], addresses[2] +} diff --git a/pkg/client/delegation/client_test.go b/pkg/client/delegation/client_test.go new file mode 100644 index 000000000..8fd13d4a3 --- /dev/null +++ b/pkg/client/delegation/client_test.go @@ -0,0 +1,102 @@ +package delegation_test + +import ( + "context" + "encoding/json" + "testing" + "time" + + "cosmossdk.io/depinject" + "github.com/stretchr/testify/require" + + "github.com/pokt-network/poktroll/pkg/client" + "github.com/pokt-network/poktroll/pkg/client/delegation" + "github.com/pokt-network/poktroll/testutil/sample" + "github.com/pokt-network/poktroll/testutil/testclient" + "github.com/pokt-network/poktroll/testutil/testclient/testeventsquery" + apptypes "github.com/pokt-network/poktroll/x/application/types" +) + +const ( + testTimeoutDuration = 100 * time.Millisecond + + // duplicates pkg/client/delegation/client.go's delegationEventQuery for testing purposes. + delegationEventQuery = "message.action='pocket.application.EventRedelegation'" +) + +func TestDelegationClient(t *testing.T) { + var ( + expectedAddress = sample.AccAddress() + expectedDelegationEvent = apptypes.EventRedelegation{ + AppAddress: expectedAddress, + GatewayAddress: sample.AccAddress(), + } + ctx = context.Background() + ) + + expectedEventBz, err := json.Marshal(expectedDelegationEvent) + require.NoError(t, err) + + eventsQueryClient := testeventsquery.NewAnyTimesEventsBytesEventsQueryClient( + ctx, t, + delegationEventQuery, + expectedEventBz, + ) + + deps := depinject.Supply(eventsQueryClient) + + // Set up delegation client. + // NB: the URL passed to `NewDelegationClient` is irrelevant here because `eventsQueryClient` is a mock. + delegationClient, err := delegation.NewDelegationClient(ctx, deps, testclient.CometLocalWebsocketURL) + require.NoError(t, err) + require.NotNil(t, delegationClient) + + tests := []struct { + name string + fn func() client.Redelegation + }{ + { + name: "LastNRedelegations successfully returns latest redelegation", + fn: func() client.Redelegation { + lastRedelegation := delegationClient.LastNRedelegations(ctx, 1)[0] + return lastRedelegation + }, + }, + { + name: "RedelegationsSequence successfully returns latest redelegation", + fn: func() client.Redelegation { + redelegationObs := delegationClient.RedelegationsSequence(ctx) + require.NotNil(t, redelegationObs) + + // Ensure that the observable is replayable via Last. + lastRedelegation := redelegationObs.Last(ctx, 1)[0] + require.Equal(t, expectedAddress, lastRedelegation.GetAppAddress()) + + return lastRedelegation + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + actualRedelegationCh := make(chan client.Redelegation, 10) + + // Run test functions asynchronously because they can block, leading + // to an unresponsive test. If any of the methods under test hang, + // the test will time out in the select statement that follows. + go func(fn func() client.Redelegation) { + actualRedelegationCh <- fn() + close(actualRedelegationCh) + }(tt.fn) + + select { + case actualRedelegation := <-actualRedelegationCh: + require.Equal(t, expectedAddress, actualRedelegation.GetAppAddress()) + case <-time.After(testTimeoutDuration): + t.Fatal("timed out waiting for redelegation event") + } + }) + } + + delegationClient.Close() +} diff --git a/pkg/client/delegation/godoc.go b/pkg/client/delegation/godoc.go new file mode 100644 index 000000000..76fad49ba --- /dev/null +++ b/pkg/client/delegation/godoc.go @@ -0,0 +1,5 @@ +// Package delegation contains a light wrapper of the EventsReplayClient[DeelgateeChange] +// generic which listens for redelegation events on chain and emits them +// through a ReplayObservable. This enables consumers to listen for on-chain +// application redelegation events and react to them asynchronously. +package delegation diff --git a/pkg/client/delegation/redelegation.go b/pkg/client/delegation/redelegation.go new file mode 100644 index 000000000..a9fecae6e --- /dev/null +++ b/pkg/client/delegation/redelegation.go @@ -0,0 +1,48 @@ +package delegation + +import ( + "encoding/json" + + "github.com/pokt-network/poktroll/pkg/client" + "github.com/pokt-network/poktroll/pkg/client/events" +) + +var _ client.Redelegation = (*redelegation)(nil) + +// redelegation wraps the EventRedelegation event emitted by the application +// module, for use in the observable +type redelegation struct { + AppAddress string `json:"app_address"` + GatewayAddress string `json:"gateway_address"` +} + +// GetAppAddress returns the application address of the redelegation event +func (d redelegation) GetAppAddress() string { + return d.AppAddress +} + +// GetGatewayAddress returns the gateway address of the redelegation event +func (d redelegation) GetGatewayAddress() string { + return d.GatewayAddress +} + +// newRedelegationEventFactoryFn is a factory function that returns a +// function that attempts to deserialise the given bytes into a redelegation +// struct. If the delegate struct has an empty app address then an +// ErrUnmarshalRedelegation error is returned. Otherwise if deserialisation +// fails then the error is returned. +func newRedelegationEventFactoryFn() events.NewEventsFn[client.Redelegation] { + return func(redelegationEventBz []byte) (client.Redelegation, error) { + redelegationEvent := new(redelegation) + if err := json.Unmarshal(redelegationEventBz, redelegationEvent); err != nil { + return nil, err + } + + if redelegationEvent.AppAddress == "" || redelegationEvent.GatewayAddress == "" { + return nil, events.ErrEventsUnmarshalEvent. + Wrapf("with redelegation: %s", string(redelegationEventBz)) + } + + return redelegationEvent, nil + } +} diff --git a/pkg/client/events/errors.go b/pkg/client/events/errors.go new file mode 100644 index 000000000..3f6afaf58 --- /dev/null +++ b/pkg/client/events/errors.go @@ -0,0 +1,14 @@ +package events + +import ( + sdkerrors "cosmossdk.io/errors" +) + +var ( + codespace = "events" + + ErrEventsDial = sdkerrors.Register(codespace, 1, "dialing for connection failed") + ErrEventsConnClosed = sdkerrors.Register(codespace, 2, "connection closed") + ErrEventsSubscribe = sdkerrors.Register(codespace, 3, "failed to subscribe to events") + ErrEventsUnmarshalEvent = sdkerrors.Register(codespace, 4, "failed to unmarshal event bytes") +) diff --git a/pkg/client/events/godoc.go b/pkg/client/events/godoc.go new file mode 100644 index 000000000..b5bbf685f --- /dev/null +++ b/pkg/client/events/godoc.go @@ -0,0 +1,12 @@ +// Package events provides a generic client for subscribing to on-chain events +// via an EventsQueryClient and transforming the received events into the type +// defined by the EventsReplayClient's generic type parameter. +// +// The EventsQueryClient emits ReplayObservables which are of the type defined +// by the EventsReplayClient's generic type parameter. +// +// The usage of of ReplayObservables allows the EventsReplayClient to be always +// provide the latest event data to the caller, even if the connection to the +// EventsQueryClient is lost and re-established, without the caller having to +// re-subscribe to the EventsQueryClient. +package events diff --git a/pkg/client/events_query/options.go b/pkg/client/events/options.go similarity index 95% rename from pkg/client/events_query/options.go rename to pkg/client/events/options.go index 0e2a622fe..41fca7548 100644 --- a/pkg/client/events_query/options.go +++ b/pkg/client/events/options.go @@ -1,4 +1,4 @@ -package eventsquery +package events import "github.com/pokt-network/poktroll/pkg/client" diff --git a/pkg/client/events_query/client.go b/pkg/client/events/query_client.go similarity index 98% rename from pkg/client/events_query/client.go rename to pkg/client/events/query_client.go index 34a7d35d2..41cafe98f 100644 --- a/pkg/client/events_query/client.go +++ b/pkg/client/events/query_client.go @@ -1,4 +1,4 @@ -package eventsquery +package events import ( "context" @@ -12,7 +12,7 @@ import ( "go.uber.org/multierr" "github.com/pokt-network/poktroll/pkg/client" - "github.com/pokt-network/poktroll/pkg/client/events_query/websocket" + "github.com/pokt-network/poktroll/pkg/client/events/websocket" "github.com/pokt-network/poktroll/pkg/either" "github.com/pokt-network/poktroll/pkg/observable" "github.com/pokt-network/poktroll/pkg/observable/channel" @@ -185,12 +185,12 @@ func (eqc *eventsQueryClient) openEventsBytesAndConn( // Get a connection from the dialer. conn, err := eqc.dialer.DialContext(ctx, eqc.cometWebsocketURL) if err != nil { - return nil, ErrDial.Wrapf("%s", err) + return nil, ErrEventsDial.Wrapf("%s", err) } // Send the event subscription request on the connection. if err := conn.Send(req); err != nil { - subscribeErr := ErrSubscribe.Wrapf("%s", err) + subscribeErr := ErrEventsSubscribe.Wrapf("%s", err) // assume the connection is bad closeErr := conn.Close() return nil, multierr.Combine(subscribeErr, closeErr) diff --git a/pkg/client/events_query/client_integration_test.go b/pkg/client/events/query_client_integration_test.go similarity index 98% rename from pkg/client/events_query/client_integration_test.go rename to pkg/client/events/query_client_integration_test.go index a6a0e3bcd..cbd9c426b 100644 --- a/pkg/client/events_query/client_integration_test.go +++ b/pkg/client/events/query_client_integration_test.go @@ -1,6 +1,6 @@ //go:build integration -package eventsquery_test +package events_test import ( "context" diff --git a/pkg/client/events_query/client_test.go b/pkg/client/events/query_client_test.go similarity index 91% rename from pkg/client/events_query/client_test.go rename to pkg/client/events/query_client_test.go index aae5f72d2..2c868ac42 100644 --- a/pkg/client/events_query/client_test.go +++ b/pkg/client/events/query_client_test.go @@ -1,4 +1,4 @@ -package eventsquery_test +package events_test import ( "context" @@ -13,12 +13,11 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/pokt-network/poktroll/testutil/mockclient" - - eventsquery "github.com/pokt-network/poktroll/pkg/client/events_query" - "github.com/pokt-network/poktroll/pkg/client/events_query/websocket" + "github.com/pokt-network/poktroll/pkg/client/events" + "github.com/pokt-network/poktroll/pkg/client/events/websocket" "github.com/pokt-network/poktroll/pkg/either" "github.com/pokt-network/poktroll/pkg/observable" + "github.com/pokt-network/poktroll/testutil/mockclient" "github.com/pokt-network/poktroll/testutil/testchannel" "github.com/pokt-network/poktroll/testutil/testclient/testeventsquery" "github.com/pokt-network/poktroll/testutil/testerrors" @@ -48,8 +47,8 @@ func TestEventsQueryClient_Subscribe_Succeeds(t *testing.T) { Times(queryLimit) // Set up events query client. - dialerOpt := eventsquery.WithDialer(dialerMock) - queryClient := eventsquery.NewEventsQueryClient("", dialerOpt) + dialerOpt := events.WithDialer(dialerMock) + queryClient := events.NewEventsQueryClient("", dialerOpt) t.Cleanup(queryClient.Close) for queryIdx := 0; queryIdx < queryLimit; queryIdx++ { @@ -93,7 +92,7 @@ func TestEventsQueryClient_Subscribe_Succeeds(t *testing.T) { // Simulate ErrConnClosed if connection is isClosed. if connClosed.Load() { - return nil, eventsquery.ErrConnClosed + return nil, events.ErrEventsConnClosed } event := testEvent(int32(readEventCounter)) @@ -129,7 +128,7 @@ func TestEventsQueryClient_Subscribe_Succeeds(t *testing.T) { behavesLikeEitherObserver( t, eventObserver, handleEventsLimit, - eventsquery.ErrConnClosed, + events.ErrEventsConnClosed, readObserverEventsTimeout, onLimit, ) @@ -160,7 +159,7 @@ func TestEventsQueryClient_Subscribe_Close(t *testing.T) { delayFirstEvent.Do(func() { time.Sleep(firstEventDelay) }) if connClosed.Load() { - return nil, eventsquery.ErrConnClosed + return nil, events.ErrEventsConnClosed } event := testEvent(int32(readEventCounter)) @@ -173,8 +172,8 @@ func TestEventsQueryClient_Subscribe_Close(t *testing.T) { }). MinTimes(handleEventsLimit) - dialerOpt := eventsquery.WithDialer(dialerMock) - queryClient := eventsquery.NewEventsQueryClient("", dialerOpt) + dialerOpt := events.WithDialer(dialerMock) + queryClient := events.NewEventsQueryClient("", dialerOpt) // set up query observer eventsObservable, err := queryClient.EventsBytes(ctx, testQuery(0)) @@ -194,7 +193,7 @@ func TestEventsQueryClient_Subscribe_Close(t *testing.T) { behavesLikeEitherObserver( t, eventsObserver, handleEventsLimit, - eventsquery.ErrConnClosed, + events.ErrEventsConnClosed, readAllEventsTimeout, onLimit, ) @@ -203,14 +202,14 @@ func TestEventsQueryClient_Subscribe_Close(t *testing.T) { func TestEventsQueryClient_Subscribe_DialError(t *testing.T) { ctx := context.Background() - eitherErrDial := either.Error[*mockclient.MockConnection](eventsquery.ErrDial) + eitherErrDial := either.Error[*mockclient.MockConnection](events.ErrEventsDial) dialerMock := testeventsquery.NewOneTimeMockDialer(t, eitherErrDial) - dialerOpt := eventsquery.WithDialer(dialerMock) - queryClient := eventsquery.NewEventsQueryClient("", dialerOpt) + dialerOpt := events.WithDialer(dialerMock) + queryClient := events.NewEventsQueryClient("", dialerOpt) eventsObservable, err := queryClient.EventsBytes(ctx, testQuery(0)) require.Nil(t, eventsObservable) - require.True(t, errors.Is(err, eventsquery.ErrDial)) + require.True(t, errors.Is(err, events.ErrEventsDial)) } func TestEventsQueryClient_Subscribe_RequestError(t *testing.T) { @@ -221,11 +220,11 @@ func TestEventsQueryClient_Subscribe_RequestError(t *testing.T) { Return(fmt.Errorf("mock send error")). Times(1) - dialerOpt := eventsquery.WithDialer(dialerMock) - queryClient := eventsquery.NewEventsQueryClient("url_ignored", dialerOpt) + dialerOpt := events.WithDialer(dialerMock) + queryClient := events.NewEventsQueryClient("url_ignored", dialerOpt) eventsObservable, err := queryClient.EventsBytes(ctx, testQuery(0)) require.Nil(t, eventsObservable) - require.True(t, errors.Is(err, eventsquery.ErrSubscribe)) + require.True(t, errors.Is(err, events.ErrEventsSubscribe)) // cancelling the context should close the connection cancel() @@ -253,7 +252,7 @@ func TestEventsQueryClient_Subscribe_ReceiveError(t *testing.T) { connMock.EXPECT().Receive(). DoAndReturn(func() (any, error) { if readEventCounter >= handleEventLimit { - return nil, websocket.ErrReceive + return nil, websocket.ErrEventsWebsocketReceive } event := testEvent(int32(readEventCounter)) @@ -264,8 +263,8 @@ func TestEventsQueryClient_Subscribe_ReceiveError(t *testing.T) { }). MinTimes(handleEventLimit) - dialerOpt := eventsquery.WithDialer(dialerMock) - queryClient := eventsquery.NewEventsQueryClient("", dialerOpt) + dialerOpt := events.WithDialer(dialerMock) + queryClient := events.NewEventsQueryClient("", dialerOpt) // set up query observer eventsObservable, err := queryClient.EventsBytes(ctx, testQuery(0)) @@ -276,7 +275,7 @@ func TestEventsQueryClient_Subscribe_ReceiveError(t *testing.T) { behavesLikeEitherObserver( t, eventsObserver, handleEventLimit, - websocket.ErrReceive, + websocket.ErrEventsWebsocketReceive, readAllEventsTimeout, nil, ) diff --git a/pkg/client/events/replay_client.go b/pkg/client/events/replay_client.go new file mode 100644 index 000000000..ac3242056 --- /dev/null +++ b/pkg/client/events/replay_client.go @@ -0,0 +1,249 @@ +package events + +import ( + "context" + "fmt" + "time" + + "cosmossdk.io/depinject" + + "github.com/pokt-network/poktroll/pkg/client" + "github.com/pokt-network/poktroll/pkg/either" + "github.com/pokt-network/poktroll/pkg/observable" + "github.com/pokt-network/poktroll/pkg/observable/channel" + "github.com/pokt-network/poktroll/pkg/retry" +) + +const ( + // eventsBytesRetryDelay is the delay between retry attempts when the events + // bytes observable returns an error. + eventsBytesRetryDelay = time.Second + // eventsBytesRetryLimit is the maximum number of times to attempt to + // re-establish the events query bytes subscription when the events bytes + // observable returns an error. + eventsBytesRetryLimit = 10 + eventsBytesRetryResetTimeout = 10 * time.Second + // replayObsCacheBufferSize is the replay buffer size of the + // replayObsCache replay observable which is used to cache the replay + // observable that is notified of new events. + // It, replayObsCache, is updated with a new "active" observable when a new + // events query subscription is created, for example, after a non-persistent + // connection error. + replayObsCacheBufferSize = 1 +) + +// Enforce the EventsReplayClient interface is implemented by the replayClient type. +var _ client.EventsReplayClient[ + any, + observable.ReplayObservable[any], +] = (*replayClient[any, observable.ReplayObservable[any]])(nil) + +// NewEventsFn is a function that takes a byte slice and returns a new instance +// of the generic type T. +type NewEventsFn[T any] func([]byte) (T, error) + +// replayClient implements the EventsReplayClient interface for a generic type T, +// and replay observable for type T. +type replayClient[T any, U observable.ReplayObservable[T]] struct { + // endpointURL is the URL of RPC endpoint which eventsClient subscription + // requests will be sent. + endpointURL string + // queryString is the query string used to subscribe to events of the + // desired type. + // See: https://docs.cosmos.network/main/learn/advanced/events#subscribing-to-events + // and: https://docs.cosmos.network/main/learn/advanced/events#default-events + queryString string + // eventsClient is the events query client which is used to subscribe to + // newly committed block events. It emits an either value which may contain + // an error, at most, once and closes immediately after if it does. + eventsClient client.EventsQueryClient + // eventDecoder is a function which decodes event subscription + // message bytes into the type defined by the EventsReplayClient's generic type + // parameter. + eventDecoder NewEventsFn[T] + // replayObsCache is a replay observable with replay buffer size 1, + // which holds the "active latest observable" which is notified when + // new events are received by the events query client subscription + // created in goPublishEvents. This observable (and the one it emits) closes + // when the events bytes observable returns an error and is updated with a + // new "active" observable after a new events query subscription is created. + replayObsCache observable.ReplayObservable[U] + // replayObsCachePublishCh is the publish channel for replayObsCache. + // It's used to set and subsequently update replayObsCache the events replay + // observable; + // For example when the connection is re-established after erroring. + replayObsCachePublishCh chan<- U +} + +// NewEventsReplayClient creates a new EventsReplayClient from the given +// dependencies, cometWebsocketURL and subscription query string. It requires a +// decoder function to be provided which decodes event subscription message +// bytes into the type defined by the EventsReplayClient's generic type parameter. +// The replayObsBufferSize is the replay buffer size of the replay observable +// which is notified of new events. +// +// Required dependencies: +// - client.EventsQueryClient +func NewEventsReplayClient[T any, U observable.ReplayObservable[T]]( + ctx context.Context, + deps depinject.Config, + cometWebsocketURL string, + queryString string, + newEventFn NewEventsFn[T], + replayObsBufferSize int, +) (client.EventsReplayClient[T, U], error) { + // Initialize the replay client + rClient := &replayClient[T, U]{ + endpointURL: cometWebsocketURL, + queryString: queryString, + eventDecoder: newEventFn, + } + replayObsCache, replayObsCachePublishCh := channel.NewReplayObservable[U]( + ctx, + replayObsBufferSize, + ) + rClient.replayObsCache = observable.ReplayObservable[U](replayObsCache) + rClient.replayObsCachePublishCh = replayObsCachePublishCh + + // Inject dependencies + if err := depinject.Inject(deps, &rClient.eventsClient); err != nil { + return nil, err + } + + // Concurrently publish blocks to the observable emitted by latestObsvbls. + go rClient.goPublishEvents(ctx) + + return rClient, nil +} + +// EventsSequence returns a ReplayObservable, with a replay buffer size of 1, +// which is notified when new events are received by the encapsulated +// EventsQueryClient. +func (rClient *replayClient[T, R]) EventsSequence(ctx context.Context) R { + // Get the active event replay observable from replayObsCache. Only the last + // element is useful as any prior elements are closed replay observables. + // Directly accessing the zeroth index here is safe because the call to Last + // is guaranteed to return a slice with at least 1 element. + replayObs := observable.ReplayObservable[R](rClient.replayObsCache) + return replayObs.Last(ctx, 1)[0] +} + +// LastNEvents returns the last N typed events that have been received by the +// corresponding events query subscription. +// It blocks until at least one event has been received. +func (rClient *replayClient[T, R]) LastNEvents(ctx context.Context, n int) []T { + return rClient.EventsSequence(ctx).Last(ctx, n) +} + +// Close unsubscribes all observers of the committed blocks sequence observable +// and closes the events query client. +func (rClient *replayClient[T, R]) Close() { + // Closing eventsClient will cascade unsubscribe and close downstream observers. + rClient.eventsClient.Close() +} + +// goPublishEvents runs the work function returned by retryPublishEventsFactory, +// re-invoking it according to the arguments to retry.OnError when the events bytes +// observable returns an asynchronous error. +// This function is intended to be called in a goroutine. +func (rClient *replayClient[T, R]) goPublishEvents(ctx context.Context) { + // React to errors by getting a new events bytes observable, re-mapping it, + // and send it to latestObsvblsReplayPublishCh such that + // latestObsvbls.Last(ctx, 1) will return it. + publishErr := retry.OnError( + ctx, + eventsBytesRetryLimit, + eventsBytesRetryDelay, + eventsBytesRetryResetTimeout, + "goPublishEvents", + rClient.retryPublishEventsFactory(ctx), + ) + + // If we get here, the retry limit was reached and the retry loop exited. + // Since this function runs in a goroutine, we can't return the error to the + // caller. Instead, we panic. + if publishErr != nil { + panic(fmt.Errorf("EventsReplayClient[%T].goPublishEvents should never reach this spot: %w", *new(T), publishErr)) + } +} + +// retryPublishEventsFactory returns a function which is intended to be passed +// to retry.OnError. The returned function pipes event bytes from the events +// query client, maps them to block events, and publishes them to the +// latestObsvbls replay observable. +func (rClient *replayClient[T, R]) retryPublishEventsFactory(ctx context.Context) func() chan error { + return func() chan error { + errCh := make(chan error, 1) + eventsBzObs, err := rClient.eventsClient.EventsBytes(ctx, rClient.queryString) + if err != nil { + errCh <- err + return errCh + } + + // NB: must cast back to generic observable type to use with Map. + // client.BlocksObservable cannot be an alias due to gomock's lack of + // support for generic types. + eventsBz := observable.Observable[either.Either[[]byte]](eventsBzObs) + typedObs := channel.MapReplay( + ctx, + replayObsCacheBufferSize, + eventsBz, + rClient.newMapEventsBytesToTFn(errCh), + ) + + // Initially set latestObsvbls and update if after retrying on error. + rClient.replayObsCachePublishCh <- typedObs.(R) + + return errCh + } +} + +// newMapEventsBytesToTFn is a factory for a function which is intended +// to be used as a transformFn in a channel.Map() call. Since the map function +// is called asynchronously, this factory creates a closure around an error +// channel which can be used for asynchronous error signaling from within the +// map function, and handling from the Map call context. +// +// The map function itself attempts to deserialize the given byte slice as a +// the EventsReplayClient's generic typed event, using the decoder function provided. +// If the events bytes observable contained an error, this value is not emitted +// (skipped) on the destination observable of the map operation. +// +// If deserialisation failed because the event bytes were for a different event +// type, this value is also skipped. If deserialisation failed for some other +// reason, this function panics. +func (rClient *replayClient[T, R]) newMapEventsBytesToTFn(errCh chan<- error) func( + context.Context, + either.Bytes, +) (T, bool) { + return func( + _ context.Context, + eitherEventBz either.Bytes, + ) (_ T, skip bool) { + eventBz, err := eitherEventBz.ValueOrError() + if err != nil { + errCh <- err + // Don't publish (skip) if eitherEventBz contained an error. + // eitherEventBz should automatically close itself in this case. + // (i.e. no more values should be mapped to this transformFn's respective + // dstObservable). + return *new(T), true + } + + // attempt to decode the event bytes using the decoder function provided + // during the EventsReplayClient's construction. + event, err := rClient.eventDecoder(eventBz) + if err != nil { + if ErrEventsUnmarshalEvent.Is(err) { + // Don't publish (skip) if the message was not the correct event. + return *new(T), true + } + + panic(fmt.Sprintf( + "unexpected error deserialising event: %v; eventBz: %s", + err, string(eventBz), + )) + } + return event, false + } +} diff --git a/pkg/client/events/replay_client_example_test.go b/pkg/client/events/replay_client_example_test.go new file mode 100644 index 000000000..15a656665 --- /dev/null +++ b/pkg/client/events/replay_client_example_test.go @@ -0,0 +1,108 @@ +package events_test + +import ( + "context" + "encoding/json" + "fmt" + + "cosmossdk.io/depinject" + + "github.com/pokt-network/poktroll/pkg/client/events" + "github.com/pokt-network/poktroll/pkg/observable" + "github.com/pokt-network/poktroll/pkg/polylog" +) + +const ( + // Define a query string to provide to the EventsQueryClient + // See: https://docs.cosmos.network/v0.47/learn/advanced/events#subscribing-to-events + // And: https://docs.cosmos.network/v0.47/learn/advanced/events#default-events + eventQueryString = "message.action='eventName'" + // Define the websocket URL the EventsQueryClient will subscribe to + cometWebsocketURL = "ws://example.com:36657/websocket" + // the amount of events we want before they are emitted + replayObsBufferSize = 1 +) + +// Define an interface to represent the onchain event +type EventType interface { + GetName() string // Illustrative only; arbitrary interfaces are supported. +} + +// Define the event type that implements the interface +type eventType struct { + Name string `json:"name"` +} + +// See: https://pkg.go.dev/github.com/pokt-network/poktroll/pkg/client/events/#NewEventsFn +func eventTypeFactory(ctx context.Context) events.NewEventsFn[EventType] { + // Define a decoder function that can take the raw event bytes + // received from the EventsQueryClient and convert them into + // the desired type for the EventsReplayClient + return func(eventBz []byte) (EventType, error) { + eventMsg := new(eventType) + logger := polylog.Ctx(ctx) + + if err := json.Unmarshal(eventBz, eventMsg); err != nil { + return nil, err + } + + // Confirm the event is correct by checking its fields + if eventMsg.Name == "" { + logger.Error().Str("eventBz", string(eventBz)).Msg("event type is not correct") + return nil, events.ErrEventsUnmarshalEvent. + Wrapf("with eventType data: %s", string(eventBz)) + } + + return eventMsg, nil + } +} + +func (e *eventType) GetName() string { return e.Name } + +func ExampleNewEventsReplayClient() { + // Create the events query client and a depinject config to supply + // it into the EventsReplayClient + // See: https://pkg.go.dev/github.com/pokt-network/poktroll/pkg/client/events/#NewEventsQueryClient + evtClient := events.NewEventsQueryClient(cometWebsocketURL) + depConfig := depinject.Supply(evtClient) + + // Create a context (this should be cancellable to close the EventsReplayClient) + ctx, cancel := context.WithCancel(context.Background()) + + // Create a new instance of the EventsReplayClient + // See: https://pkg.go.dev/github.com/pokt-network/poktroll/pkg/client/events/#NewEventsReplayClient + client, err := events.NewEventsReplayClient[ + EventType, + observable.ReplayObservable[EventType], + ]( + ctx, + depConfig, + cometWebsocketURL, + eventQueryString, + eventTypeFactory(ctx), + replayObsBufferSize, + ) + if err != nil { + panic(fmt.Errorf("unable to create EventsReplayClient %v", err)) + } + + // Assume events the lastest event emitted of type EventType has the name "testEvent" + + // Retrieve the latest emitted event + lastEventType := client.LastNEvents(ctx, 1)[0] + fmt.Printf("Last Event: '%s'\n", lastEventType.GetName()) + + // Get the latest replay observable from the EventsReplayClient + // In order to get the latest events from the sequence + latestEventsObs := client.EventsSequence(ctx) + // Get the latest events from the sequence + lastEventType = latestEventsObs.Last(ctx, 1)[0] + fmt.Printf("Last Event: '%s'\n", lastEventType.GetName()) + + // Cancel the context which will call client.Close and close all + // subscriptions and the EventsQueryClient + cancel() + // Output + // Last Event: 'testEvent' + // Last Event: 'testEvent' +} diff --git a/pkg/client/events_query/websocket/connection.go b/pkg/client/events/websocket/connection.go similarity index 94% rename from pkg/client/events_query/websocket/connection.go rename to pkg/client/events/websocket/connection.go index b9311bea3..5f34aba4e 100644 --- a/pkg/client/events_query/websocket/connection.go +++ b/pkg/client/events/websocket/connection.go @@ -18,7 +18,7 @@ type websocketConn struct { func (wsConn *websocketConn) Receive() ([]byte, error) { _, msg, err := wsConn.conn.ReadMessage() if err != nil { - return nil, ErrReceive.Wrapf("%s", err) + return nil, ErrEventsWebsocketReceive.Wrapf("%s", err) } return msg, nil } diff --git a/pkg/client/events_query/websocket/dialer.go b/pkg/client/events/websocket/dialer.go similarity index 100% rename from pkg/client/events_query/websocket/dialer.go rename to pkg/client/events/websocket/dialer.go diff --git a/pkg/client/events/websocket/errors.go b/pkg/client/events/websocket/errors.go new file mode 100644 index 000000000..85f8953fd --- /dev/null +++ b/pkg/client/events/websocket/errors.go @@ -0,0 +1,11 @@ +package websocket + +import ( + sdkerrors "cosmossdk.io/errors" +) + +var ( + codespace = "events_query_client_websocket_connection" + + ErrEventsWebsocketReceive = sdkerrors.Register(codespace, 1, "failed to receive event over websocket connection to pocket node") +) diff --git a/pkg/client/events/websocket/godoc.go b/pkg/client/events/websocket/godoc.go new file mode 100644 index 000000000..6ac7e3f04 --- /dev/null +++ b/pkg/client/events/websocket/godoc.go @@ -0,0 +1,3 @@ +// Package websocket provides a websocket client used to connect to a cosmos-sdk +// based chain node and subscribe to events via the EventsQueryClient. +package websocket diff --git a/pkg/client/events_query/errors.go b/pkg/client/events_query/errors.go deleted file mode 100644 index 48d60f0a7..000000000 --- a/pkg/client/events_query/errors.go +++ /dev/null @@ -1,11 +0,0 @@ -package eventsquery - -import errorsmod "cosmossdk.io/errors" - -var ( - ErrDial = errorsmod.Register(codespace, 1, "dialing for connection failed") - ErrConnClosed = errorsmod.Register(codespace, 2, "connection closed") - ErrSubscribe = errorsmod.Register(codespace, 3, "failed to subscribe to events") - - codespace = "events_query_client" -) diff --git a/pkg/client/events_query/websocket/errors.go b/pkg/client/events_query/websocket/errors.go deleted file mode 100644 index 3c70d1eec..000000000 --- a/pkg/client/events_query/websocket/errors.go +++ /dev/null @@ -1,8 +0,0 @@ -package websocket - -import errorsmod "cosmossdk.io/errors" - -var ( - ErrReceive = errorsmod.Register(codespace, 4, "failed to receive event") - codespace = "events_query_client_websocket_connection" -) diff --git a/pkg/client/interface.go b/pkg/client/interface.go index 8ed03ae7a..08d39c801 100644 --- a/pkg/client/interface.go +++ b/pkg/client/interface.go @@ -1,5 +1,6 @@ //go:generate mockgen -destination=../../testutil/mockclient/events_query_client_mock.go -package=mockclient . Dialer,Connection,EventsQueryClient //go:generate mockgen -destination=../../testutil/mockclient/block_client_mock.go -package=mockclient . Block,BlockClient +//go:generate mockgen -destination=../../testutil/mockclient/delegation_client_mock.go -package=mockclient . Redelegation,DelegationClient //go:generate mockgen -destination=../../testutil/mockclient/tx_client_mock.go -package=mockclient . TxContext,TxClient //go:generate mockgen -destination=../../testutil/mockclient/supplier_client_mock.go -package=mockclient . SupplierClient //go:generate mockgen -destination=../../testutil/mockclient/account_query_client_mock.go -package=mockclient . AccountQueryClient @@ -101,26 +102,6 @@ type TxContext interface { ) (*comettypes.ResultTx, error) } -// BlocksObservable is an observable which is notified with an either -// value which contains either an error or the event message bytes. -// -// TODO_HACK: The purpose of this type is to work around gomock's lack of -// support for generic types. For the same reason, this type cannot be an -// alias (i.e. EventsBytesObservable = observable.Observable[either.Either[[]byte]]). -type BlocksObservable observable.ReplayObservable[Block] - -// BlockClient is an interface which provides notifications about newly committed -// blocks as well as direct access to the latest block via some blockchain API. -type BlockClient interface { - // CommittedBlocksSequence returns an observable which emits newly committed blocks. - CommittedBlocksSequence(context.Context) BlocksObservable - // LatestBlock returns the latest block that has been committed. - LatestBlock(context.Context) Block - // Close unsubscribes all observers of the committed blocks sequence observable - // and closes the events query client. - Close() -} - // Block is an interface which abstracts the details of a block to its minimal // necessary components. type Block interface { @@ -128,6 +109,67 @@ type Block interface { Hash() []byte } +// Redelegation is an interface which wraps the EventRedelegation event +// emitted by the application module. +// See: proto/pocket/application/types/event.proto#EventRedelegatio +type Redelegation interface { + GetAppAddress() string + GetGatewayAddress() string +} + +// EventsObservable is a replay observable for events of some type T. +// NB: This cannot be an alias due to gomock's lack of support for generic types. +type EventsObservable[T any] observable.ReplayObservable[T] + +// EventsReplayClient is an interface which provides notifications about newly received +// events as well as direct access to the latest event via some blockchain API. +type EventsReplayClient[T any, R observable.ReplayObservable[T]] interface { + // EventsSequence returns an observable which emits new events. + EventsSequence(context.Context) R + // LastNEvents returns the latest N events that has been received. + LastNEvents(ctx context.Context, n int) []T + // Close unsubscribes all observers of the events sequence observable + // and closes the events query client. + Close() +} + +// BlockReplayObservable is a defined type which is a replay observable of type Block. +// NB: This cannot be an alias due to gomock's lack of support for generic types. +type BlockReplayObservable EventsObservable[Block] + +// BlockClient is an interface that wraps the EventsReplayClient interface +// specific for the EventsReplayClient[Block] implementation +type BlockClient interface { + // CommittedBlocksSequence returns a BlockObservable that emits the + // latest blocks that have been committed to the chain. + CommittedBlocksSequence(context.Context) BlockReplayObservable + // LastNBlocks returns the latest N blocks that have been committed to + // the chain. + LastNBlocks(context.Context, int) []Block + // Close unsubscribes all observers of the committed block sequence + // observable and closes the events query client. + Close() +} + +// RedelegationReplayObservable is a defined type which is a replay observable +// of type Redelegation. +// NB: This cannot be an alias due to gomock's lack of support for generic types. +type RedelegationReplayObservable EventsObservable[Redelegation] + +// DelegationClient is an interface that wraps the EventsReplayClient interface +// specific for the EventsReplayClient[Redelegation] implementation +type DelegationClient interface { + // RedelegationsSequence returns a Observable of Redelegations that + // emits the latest redelegation events that have occurred on chain. + RedelegationsSequence(context.Context) RedelegationReplayObservable + // LastNBlocks returns the latest N blocks that have been committed to + // the chain. + LastNRedelegations(context.Context, int) []Redelegation + // Close unsubscribes all observers of the committed block sequence + // observable and closes the events query client. + Close() +} + // EventsBytesObservable is an observable which is notified with an either // value which contains either an error or the event message bytes. // diff --git a/pkg/client/tx/client.go b/pkg/client/tx/client.go index 0317e39eb..4e5bc3204 100644 --- a/pkg/client/tx/client.go +++ b/pkg/client/tx/client.go @@ -33,6 +33,8 @@ const ( txWithSenderAddrQueryFmt = "tm.event='Tx' AND message.sender='%s'" ) +// TODO_TECHDEBT(@bryanchriswhite/@h5law): Refactor this to use the EventsReplayClient +// In order to simplify the logic of the TxClient var _ client.TxClient = (*txClient)(nil) // txClient orchestrates building, signing, broadcasting, and querying of @@ -197,7 +199,7 @@ func (tClient *txClient) SignAndBroadcast( } // Calculate timeout height - timeoutHeight := tClient.blockClient.LatestBlock(ctx). + timeoutHeight := tClient.blockClient.LastNBlocks(ctx, 1)[0]. Height() + tClient.commitTimeoutHeightOffset // TODO_TECHDEBT: this should be configurable @@ -505,7 +507,6 @@ func (tClient *txClient) txEventFromEventBz( _ context.Context, eitherEventBz either.Bytes, ) (eitherTxEvent either.Either[*TxEvent], skip bool) { - // Extract byte data from the given event. In case of failure, wrap the error // and denote the event for skipping. eventBz, err := eitherEventBz.ValueOrError() @@ -555,7 +556,6 @@ func (tClient *txClient) unmarshalTxEvent(eventBz []byte) (*TxEvent, error) { // transaction using the byte hash. If any error occurs during this process, // appropriate wrapped errors are returned for easier debugging. func (tClient *txClient) getTxTimeoutError(ctx context.Context, txHashHex string) error { - // Decode the provided hex hash into bytes. txHash, err := hex.DecodeString(txHashHex) if err != nil { diff --git a/pkg/client/tx/client_test.go b/pkg/client/tx/client_test.go index bb4f78f6b..c66f62559 100644 --- a/pkg/client/tx/client_test.go +++ b/pkg/client/tx/client_test.go @@ -13,12 +13,11 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" - "github.com/pokt-network/poktroll/testutil/mockclient" - "github.com/pokt-network/poktroll/pkg/client" "github.com/pokt-network/poktroll/pkg/client/keyring" "github.com/pokt-network/poktroll/pkg/client/tx" "github.com/pokt-network/poktroll/pkg/either" + "github.com/pokt-network/poktroll/testutil/mockclient" "github.com/pokt-network/poktroll/testutil/testclient" "github.com/pokt-network/poktroll/testutil/testclient/testblock" "github.com/pokt-network/poktroll/testutil/testclient/testeventsquery" diff --git a/pkg/deps/config/suppliers.go b/pkg/deps/config/suppliers.go index 4ff15c4b3..e7fcf66f2 100644 --- a/pkg/deps/config/suppliers.go +++ b/pkg/deps/config/suppliers.go @@ -10,7 +10,7 @@ import ( "github.com/spf13/cobra" "github.com/pokt-network/poktroll/pkg/client/block" - eventsquery "github.com/pokt-network/poktroll/pkg/client/events_query" + "github.com/pokt-network/poktroll/pkg/client/events" "github.com/pokt-network/poktroll/pkg/client/query" querytypes "github.com/pokt-network/poktroll/pkg/client/query/types" txtypes "github.com/pokt-network/poktroll/pkg/client/tx/types" @@ -18,14 +18,6 @@ import ( "github.com/pokt-network/poktroll/pkg/polylog" ) -// hostToWebsocketURL converts the provided host into a websocket URL that can -// be used to subscribe to onchain events and query the chain via a client -// context or send transactions via a tx client context. -func hostToWebsocketURL(host string) string { - websocketURL := fmt.Sprintf("ws://%s/websocket", host) - return websocketURL -} - // SupplyConfig supplies a depinject config by calling each of the supplied // supplier functions in order and passing the result of each supplier to the // next supplier, chaining them together. @@ -69,7 +61,7 @@ func NewSupplyEventsQueryClientFn(queryHost string) SupplierFn { ) (depinject.Config, error) { // Convert the host to a websocket URL pocketNodeWebsocketURL := hostToWebsocketURL(queryHost) - eventsQueryClient := eventsquery.NewEventsQueryClient(pocketNodeWebsocketURL) + eventsQueryClient := events.NewEventsQueryClient(pocketNodeWebsocketURL) return depinject.Configs(deps, depinject.Supply(eventsQueryClient)), nil } @@ -77,7 +69,7 @@ func NewSupplyEventsQueryClientFn(queryHost string) SupplierFn { // NewSupplyBlockClientFn returns a function which constructs a BlockClient // instance with the given hostname, which is converted into a websocket URL, -// to listen for block events on, and returns a new depinject.Config which +// to listen for block events on-chain, and returns a new depinject.Config which // is supplied with the given deps and the new BlockClient. func NewSupplyBlockClientFn(queryHost string) SupplierFn { return func( @@ -220,26 +212,6 @@ func NewSupplyApplicationQuerierFn() SupplierFn { } } -// NewSupplyRingCacheFn returns a function with constructs a RingCache instance -// with the required dependencies and returns a new depinject.Config which is -// supplied with the given deps and the new RingCache. -func NewSupplyRingCacheFn() SupplierFn { - return func( - _ context.Context, - deps depinject.Config, - _ *cobra.Command, - ) (depinject.Config, error) { - // Create the ring cache. - ringCache, err := rings.NewRingCache(deps) - if err != nil { - return nil, err - } - - // Supply the ring cache to the provided deps - return depinject.Configs(deps, depinject.Supply(ringCache)), nil - } -} - // NewSupplySessionQuerierFn returns a function which constructs a // SessionQuerier instance with the required dependencies and returns a new // instance with the required dependencies and returns a new depinject.Config @@ -281,3 +253,31 @@ func NewSupplySupplierQuerierFn() SupplierFn { return depinject.Configs(deps, depinject.Supply(supplierQuerier)), nil } } + +// NewSupplyRingCacheFn returns a function with constructs a RingCache instance +// with the required dependencies and returns a new depinject.Config which is +// supplied with the given deps and the new RingCache. +func NewSupplyRingCacheFn() SupplierFn { + return func( + _ context.Context, + deps depinject.Config, + _ *cobra.Command, + ) (depinject.Config, error) { + // Create the ring cache. + ringCache, err := rings.NewRingCache(deps) + if err != nil { + return nil, err + } + + // Supply the ring cache to the provided deps + return depinject.Configs(deps, depinject.Supply(ringCache)), nil + } +} + +// hostToWebsocketURL converts the provided host into a websocket URL that can +// be used to subscribe to onchain events and query the chain via a client +// context or send transactions via a tx client context. +func hostToWebsocketURL(host string) string { + websocketURL := fmt.Sprintf("ws://%s/websocket", host) + return websocketURL +} diff --git a/pkg/partials/payloads/jsonrpc.go b/pkg/partials/payloads/jsonrpc.go index 6d88ade65..284a6f40c 100644 --- a/pkg/partials/payloads/jsonrpc.go +++ b/pkg/partials/payloads/jsonrpc.go @@ -39,7 +39,7 @@ func (j PartialJSONPayload) ValidateBasic(ctx context.Context) error { return err } -// PartiallyUnmarshalJSONPayload receives a serialised payload and attempts to +// PartiallyUnmarshalJSONPayload receives a serialized payload and attempts to // unmarshal it into the PartialJSONPayload struct. If successful this struct // is returned, if however the struct does not contain all the required fields // an error is returned detailing what was missing. diff --git a/pkg/partials/payloads/rest.go b/pkg/partials/payloads/rest.go index 0c5090ee0..273139280 100644 --- a/pkg/partials/payloads/rest.go +++ b/pkg/partials/payloads/rest.go @@ -13,7 +13,7 @@ type PartialRESTPayload struct { Headers map[string]string `json:"headers"` } -// PartiallyUnmarshalRESTPayload receives a serialised payload and attempts to +// PartiallyUnmarshalRESTPayload receives a serialized payload and attempts to // unmarshal it into the PartialRESTPayload struct. If successful this struct // is returned, if however the struct does not contain all the required fields // the success return value is false and a nil payload is returned. diff --git a/pkg/relayer/cmd/cmd.go b/pkg/relayer/cmd/cmd.go index 6443d32f5..1812bc5d7 100644 --- a/pkg/relayer/cmd/cmd.go +++ b/pkg/relayer/cmd/cmd.go @@ -59,14 +59,13 @@ submit claim and proof messages according to the protocol as sessions become eli for such operations.`, RunE: runRelayer, } - // Custom flags cmd.Flags().StringVar(&flagRelayMinerConfig, "config", "", "The path to the relayminer config file") // Cosmos flags cmd.Flags().String(cosmosflags.FlagKeyringBackend, "", "Select keyring's backend (os|file|kwallet|pass|test)") cmd.Flags(). - StringVar(&flagCosmosNodeURL, cosmosflags.FlagNode, omittedDefaultFlagValue, "Register the default Cosmos node flag, which is needed to initialise the Cosmos query and tx contexts correctly. It can be used to override the `QueryNodeUrl` and `NetworkNodeUrl` fields in the config file if specified.") + StringVar(&flagCosmosNodeURL, cosmosflags.FlagNode, omittedDefaultFlagValue, "Register the default Cosmos node flag, which is needed to initialize the Cosmos query and tx contexts correctly. It can be used to override the `QueryNodeUrl` and `NetworkNodeUrl` fields in the config file if specified.") return cmd } diff --git a/pkg/relayer/proxy/proxy.go b/pkg/relayer/proxy/proxy.go index 6232804af..728b6a2c4 100644 --- a/pkg/relayer/proxy/proxy.go +++ b/pkg/relayer/proxy/proxy.go @@ -37,7 +37,7 @@ type relayerProxy struct { signingKeyName string keyring keyring.Keyring - // blocksClient is the client used to get the block at the latest height from the blockchain + // blockClient is the client used to get the block at the latest height from the blockchain // and be notified of new incoming blocks. It is used to update the current session data. blockClient client.BlockClient diff --git a/pkg/relayer/proxy/relay_verifier.go b/pkg/relayer/proxy/relay_verifier.go index ca4825f86..8fa075d9c 100644 --- a/pkg/relayer/proxy/relay_verifier.go +++ b/pkg/relayer/proxy/relay_verifier.go @@ -95,7 +95,7 @@ func (rp *relayerProxy) VerifyRelayRequest( }). Msg("verifying relay request session") - currentBlock := rp.blockClient.LatestBlock(ctx) + currentBlock := rp.blockClient.LastNBlocks(ctx, 1)[0] session, err := rp.sessionQuerier.GetSession(ctx, appAddress, service.Id, currentBlock.Height()) if err != nil { return err diff --git a/pkg/relayer/session/claim.go b/pkg/relayer/session/claim.go index f0f3c59de..a7c1fbdc5 100644 --- a/pkg/relayer/session/claim.go +++ b/pkg/relayer/session/claim.go @@ -28,8 +28,7 @@ func (rs *relayerSessionsManager) createClaims(ctx context.Context) observable.O rs.mapWaitForEarliestCreateClaimHeight, ) - failedCreateClaimSessionsObs, failedCreateClaimSessionsPublishCh := - channel.NewObservable[relayer.SessionTree]() + failedCreateClaimSessionsObs, failedCreateClaimSessionsPublishCh := channel.NewObservable[relayer.SessionTree]() // Map sessionsWithOpenClaimWindowObs to a new observable of an either type, // populated with the session or an error, which is notified after the session @@ -91,8 +90,7 @@ func (rs *relayerSessionsManager) waitForEarliestCreateClaimHeight( Str("hash", fmt.Sprintf("%x", createClaimWindowStartBlock.Hash())). Msg("received global earliest claim submission height") - earliestCreateClaimHeight := - protocol.GetEarliestCreateClaimHeight(ctx, createClaimWindowStartBlock) + earliestCreateClaimHeight := protocol.GetEarliestCreateClaimHeight(ctx, createClaimWindowStartBlock) logger.Info(). Int64("earliest_create_claim_height", earliestCreateClaimHeight). @@ -120,7 +118,7 @@ func (rs *relayerSessionsManager) newMapClaimSessionFn( return either.Error[relayer.SessionTree](err), false } - latestBlock := rs.blockClient.LatestBlock(ctx) + latestBlock := rs.blockClient.LastNBlocks(ctx, 1)[0] logger.Info(). Int64("current_block", latestBlock.Height()+1). Msg("submitting claim") diff --git a/pkg/relayer/session/proof.go b/pkg/relayer/session/proof.go index 91cdd25b5..135e76f64 100644 --- a/pkg/relayer/session/proof.go +++ b/pkg/relayer/session/proof.go @@ -94,7 +94,7 @@ func (rs *relayerSessionsManager) newMapProveSessionFn( // TODO_BLOCKER: The block that'll be used as a source of entropy for which // branch(es) to prove should be deterministic and use on-chain governance params // rather than latest. - latestBlock := rs.blockClient.LatestBlock(ctx) + latestBlock := rs.blockClient.LastNBlocks(ctx, 1)[0] proof, err := session.ProveClosest(latestBlock.Hash()) if err != nil { return either.Error[relayer.SessionTree](err), false diff --git a/pkg/signer/ring_signer.go b/pkg/signer/ring_signer.go index bb6cf549c..b2c46a5ce 100644 --- a/pkg/signer/ring_signer.go +++ b/pkg/signer/ring_signer.go @@ -17,13 +17,14 @@ type RingSigner struct { privKey ringtypes.Scalar } -// NewRingSigner creates a new RingSigner instance with the ring and private key provided +// NewRingSigner creates a new RingSigner instance with the ring and private +// key provided func NewRingSigner(ring *ring.Ring, privKey ringtypes.Scalar) *RingSigner { return &RingSigner{ring: ring, privKey: privKey} } // Sign uses the ring and private key to sign the message provided and returns the -// serialised ring signature that can be deserialised and verified by the verifier +// serialized ring signature that can be deserialized and verified by the verifier func (r *RingSigner) Sign(msg [32]byte) ([]byte, error) { ringSig, err := r.ring.Sign(msg, r.privKey) if err != nil { diff --git a/proto/pocket/application/event.proto b/proto/pocket/application/event.proto new file mode 100644 index 000000000..624b119d9 --- /dev/null +++ b/proto/pocket/application/event.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; +package pocket.application; + +option go_package = "github.com/pokt-network/poktroll/x/application/types"; + +import "cosmos_proto/cosmos.proto"; + +// EventRedelegation is an event emitted whenever an application changes its +// delegatee gateways on chain. This is in response to both a DelegateToGateway +// and UndelegateFromGateway message. +message EventRedelegation { + string app_address = 1 [(cosmos_proto.scalar) = "cosmos.AddressString"]; // The Bech32 address of the application, using cosmos' ScalarDescriptor to ensure deterministic encoding + string gateway_address = 2 [(cosmos_proto.scalar) = "cosmos.AddressString"]; // The Bech32 address of the gateway the application has changed their delegation of, using cosmos' ScalarDescriptor to ensure deterministic encoding +} diff --git a/proto/pocket/service/relay.proto b/proto/pocket/service/relay.proto index 85cae4f6c..10a4c9ea5 100644 --- a/proto/pocket/service/relay.proto +++ b/proto/pocket/service/relay.proto @@ -17,7 +17,7 @@ message Relay { // RelayRequestMetadata contains the metadata for a RelayRequest. message RelayRequestMetadata { session.SessionHeader session_header = 1; // Session header associated with the relay. - // The request signature is a serialised ring signature that may have been + // The request signature is a serialized ring signature that may have been // by either the application itself or one of the gateways that the // application has delegated to. The signature is made using the ring of the // application in both cases. @@ -27,7 +27,7 @@ message RelayRequestMetadata { // RelayRequest holds the request details for a relay. message RelayRequest { RelayRequestMetadata meta = 1; - // payload is the serialised payload for the request. + // payload is the serialized payload for the request. // The payload is passed directly to the service and as such can be any // format that the service supports: JSON-RPC, REST, gRPC, etc. bytes payload = 2; @@ -36,7 +36,7 @@ message RelayRequest { // RelayResponse contains the response details for a RelayRequest. message RelayResponse { RelayResponseMetadata meta = 1; - // payload is the serialised payload for the response. + // payload is the serialized payload for the response. // The payload is passed directly from the service and as such can be any // format the the service responds with: JSON-RPC, REST, gRPC, etc. bytes payload = 2; diff --git a/testutil/keeper/application.go b/testutil/keeper/application.go index 7a92cd2f1..e9248b622 100644 --- a/testutil/keeper/application.go +++ b/testutil/keeper/application.go @@ -21,12 +21,14 @@ import ( gatewaytypes "github.com/pokt-network/poktroll/x/gateway/types" ) -// StakedGatewayMap is used to mock whether a gateway is staked or not for use +// stakedGatewayMap is used to mock whether a gateway is staked or not for use // in the application's mocked gateway keeper. This enables the tester to // control whether a gateway is "staked" or not and whether it can be delegated to // WARNING: Using this map may cause issues if running multiple tests in parallel -var StakedGatewayMap = make(map[string]struct{}) +var stakedGatewayMap = make(map[string]struct{}) +// ApplicationKeeper returns a mocked application keeper and context for testing +// it mocks the chain having staked gateways via the use of the stakedGatewayMap func ApplicationKeeper(t testing.TB) (*keeper.Keeper, sdk.Context) { storeKey := sdk.NewKVStoreKey(types.StoreKey) memStoreKey := storetypes.NewMemoryStoreKey(types.MemStoreKey) @@ -51,7 +53,7 @@ func ApplicationKeeper(t testing.TB) (*keeper.Keeper, sdk.Context) { mockGatewayKeeper := mocks.NewMockGatewayKeeper(ctrl) mockGatewayKeeper.EXPECT().GetGateway(gomock.Any(), gomock.Any()).DoAndReturn( func(_ sdk.Context, addr string) (gatewaytypes.Gateway, bool) { - if _, ok := StakedGatewayMap[addr]; !ok { + if _, ok := stakedGatewayMap[addr]; !ok { return gatewaytypes.Gateway{}, false } stake := sdk.NewCoin("upokt", sdk.NewInt(10000)) @@ -85,3 +87,21 @@ func ApplicationKeeper(t testing.TB) (*keeper.Keeper, sdk.Context) { return k, ctx } + +// AddGatewayToStakedGatewayMap adds the given gateway address to the staked +// gateway map for use in the application's mocked gateway keeper and ensures +// that it is removed from the map when the test is complete +func AddGatewayToStakedGatewayMap(t *testing.T, gatewayAddr string) { + t.Helper() + stakedGatewayMap[gatewayAddr] = struct{}{} + t.Cleanup(func() { + delete(stakedGatewayMap, gatewayAddr) + }) +} + +// RemoveGatewayFromStakedGatewayMap removes the given gateway address from the +// staked gateway map for use in the application's mocked gateway keeper +func RemoveGatewayFromStakedGatewayMap(t *testing.T, gatewayAddr string) { + t.Helper() + delete(stakedGatewayMap, gatewayAddr) +} diff --git a/testutil/network/network.go b/testutil/network/network.go index 11a5f18ab..9ded9e609 100644 --- a/testutil/network/network.go +++ b/testutil/network/network.go @@ -24,6 +24,7 @@ import ( "github.com/pokt-network/poktroll/app" "github.com/pokt-network/poktroll/testutil/sample" + appcli "github.com/pokt-network/poktroll/x/application/client/cli" apptypes "github.com/pokt-network/poktroll/x/application/types" gatewaytypes "github.com/pokt-network/poktroll/x/gateway/types" sharedtypes "github.com/pokt-network/poktroll/x/shared/types" @@ -221,7 +222,23 @@ func SupplierModuleGenesisStateWithAddresses(t *testing.T, addresses []string) * return state } -// Initialize an Account by sending it some funds from the validator in the network to the address provided +// GatewayModuleGenesisStateWithAddresses generates a GenesisState object with +// a gateway list full of gateways with the given addresses. +func GatewayModuleGenesisStateWithAddresses(t *testing.T, addresses []string) *gatewaytypes.GenesisState { + t.Helper() + state := gatewaytypes.DefaultGenesis() + for _, addr := range addresses { + gateway := gatewaytypes.Gateway{ + Address: addr, + Stake: &sdk.Coin{Denom: "upokt", Amount: sdk.NewInt(10000)}, + } + state.GatewayList = append(state.GatewayList, gateway) + } + return state +} + +// InitAccount initializes an Account by sending it some funds from the validator +// in the network to the address provided func InitAccount(t *testing.T, net *Network, addr sdk.AccAddress) { t.Helper() val := net.Validators[0] @@ -235,13 +252,14 @@ func InitAccount(t *testing.T, net *Network, addr sdk.AccAddress) { amount := sdk.NewCoins(sdk.NewCoin("stake", sdkmath.NewInt(200))) responseRaw, err := clitestutil.MsgSendExec(ctx, val.Address, addr, amount, args...) require.NoError(t, err) - var responseJson map[string]interface{} - err = json.Unmarshal(responseRaw.Bytes(), &responseJson) + var responseJSON map[string]interface{} + err = json.Unmarshal(responseRaw.Bytes(), &responseJSON) require.NoError(t, err) - require.Equal(t, float64(0), responseJson["code"], "code is not 0 in the response: %v", responseJson) + require.Equal(t, float64(0), responseJSON["code"], "code is not 0 in the response: %v", responseJSON) } -// Initialize an Account by sending it some funds from the validator in the network to the address provided +// InitAccountWithSequence initializes an Account by sending it some funds from +// the validator in the network to the address provided func InitAccountWithSequence( t *testing.T, net *Network, @@ -265,8 +283,60 @@ func InitAccountWithSequence( amount := sdk.NewCoins(sdk.NewCoin("stake", sdkmath.NewInt(200))) responseRaw, err := clitestutil.MsgSendExec(ctx, val.Address, addr, amount, args...) require.NoError(t, err) - var responseJson map[string]interface{} - err = json.Unmarshal(responseRaw.Bytes(), &responseJson) + var responseJSON map[string]interface{} + err = json.Unmarshal(responseRaw.Bytes(), &responseJSON) + require.NoError(t, err) + require.Equal(t, float64(0), responseJSON["code"], "code is not 0 in the response: %v", responseJSON) +} + +// DelegateAppToGateway delegates the provided application to the provided gateway +func DelegateAppToGateway( + t *testing.T, + net *Network, + appAddr string, + gatewayAddr string, +) { + t.Helper() + val := net.Validators[0] + ctx := val.ClientCtx + args := []string{ + gatewayAddr, + fmt.Sprintf("--%s=%s", flags.FlagFrom, appAddr), + fmt.Sprintf("--%s=true", flags.FlagSkipConfirmation), + fmt.Sprintf("--%s=%s", flags.FlagBroadcastMode, flags.BroadcastSync), + fmt.Sprintf("--%s=%s", flags.FlagFees, sdk.NewCoins(sdk.NewCoin(net.Config.BondDenom, sdkmath.NewInt(10))).String()), + } + responseRaw, err := clitestutil.ExecTestCLICmd(ctx, appcli.CmdDelegateToGateway(), args) + require.NoError(t, err) + var resp sdk.TxResponse + require.NoError(t, net.Config.Codec.UnmarshalJSON(responseRaw.Bytes(), &resp)) + require.NotNil(t, resp) + require.NotNil(t, resp.TxHash) + require.Equal(t, uint32(0), resp.Code) +} + +// UndelegateAppFromGateway undelegates the provided application from the provided gateway +func UndelegateAppFromGateway( + t *testing.T, + net *Network, + appAddr string, + gatewayAddr string, +) { + t.Helper() + val := net.Validators[0] + ctx := val.ClientCtx + args := []string{ + gatewayAddr, + fmt.Sprintf("--%s=%s", flags.FlagFrom, appAddr), + fmt.Sprintf("--%s=true", flags.FlagSkipConfirmation), + fmt.Sprintf("--%s=%s", flags.FlagBroadcastMode, flags.BroadcastSync), + fmt.Sprintf("--%s=%s", flags.FlagFees, sdk.NewCoins(sdk.NewCoin(net.Config.BondDenom, sdkmath.NewInt(10))).String()), + } + responseRaw, err := clitestutil.ExecTestCLICmd(ctx, appcli.CmdUndelegateFromGateway(), args) require.NoError(t, err) - require.Equal(t, float64(0), responseJson["code"], "code is not 0 in the response: %v", responseJson) + var resp sdk.TxResponse + require.NoError(t, net.Config.Codec.UnmarshalJSON(responseRaw.Bytes(), &resp)) + require.NotNil(t, resp) + require.NotNil(t, resp.TxHash) + require.Equal(t, uint32(0), resp.Code) } diff --git a/testutil/testclient/testblock/client.go b/testutil/testclient/testblock/client.go index fc5610963..9be028441 100644 --- a/testutil/testclient/testblock/client.go +++ b/testutil/testclient/testblock/client.go @@ -8,12 +8,11 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" - "github.com/pokt-network/poktroll/pkg/observable" - "github.com/pokt-network/poktroll/testutil/mockclient" - "github.com/pokt-network/poktroll/pkg/client" "github.com/pokt-network/poktroll/pkg/client/block" + "github.com/pokt-network/poktroll/pkg/observable" "github.com/pokt-network/poktroll/pkg/observable/channel" + "github.com/pokt-network/poktroll/testutil/mockclient" "github.com/pokt-network/poktroll/testutil/testclient" "github.com/pokt-network/poktroll/testutil/testclient/testeventsquery" ) @@ -35,15 +34,15 @@ func NewLocalnetClient(ctx context.Context, t *testing.T) client.BlockClient { // NewAnyTimesCommittedBlocksSequenceBlockClient creates a new mock BlockClient. // This mock BlockClient will expect any number of calls to CommittedBlocksSequence, -// and when that call is made, it returns the given BlocksObservable. +// and when that call is made, it returns the given EventsObservable[Block]. func NewAnyTimesCommittedBlocksSequenceBlockClient( t *testing.T, blocksObs observable.Observable[client.Block], ) *mockclient.MockBlockClient { t.Helper() - // Create a mock for the block client which expects the LatestBlock method to be called any number of times. - blockClientMock := NewAnyTimeLatestBlockBlockClient(t, nil, 0) + // Create a mock for the block client which expects the LastNBlocks method to be called any number of times. + blockClientMock := NewAnyTimeLastNBlocksBlockClient(t, nil, 0) // Set up the mock expectation for the CommittedBlocksSequence method. When // the method is called, it returns a new replay observable that publishes @@ -67,15 +66,15 @@ func NewOneTimeCommittedBlocksSequenceBlockClient( ) *mockclient.MockBlockClient { t.Helper() - // Create a mock for the block client which expects the LatestBlock method to be called any number of times. - blockClientMock := NewAnyTimeLatestBlockBlockClient(t, nil, 0) + // Create a mock for the block client which expects the LastNBlocks method to be called any number of times. + blockClientMock := NewAnyTimeLastNBlocksBlockClient(t, nil, 0) // Set up the mock expectation for the CommittedBlocksSequence method. When // the method is called, it returns a new replay observable that publishes // blocks sent on the given blocksPublishCh. blockClientMock.EXPECT().CommittedBlocksSequence( gomock.AssignableToTypeOf(context.Background()), - ).DoAndReturn(func(ctx context.Context) client.BlocksObservable { + ).DoAndReturn(func(ctx context.Context) client.BlockReplayObservable { // Create a new replay observable with a replay buffer size of 1. Blocks // are published to this observable via the provided blocksPublishCh. withPublisherOpt := channel.WithPublisher(blocksPublishCh) @@ -88,10 +87,10 @@ func NewOneTimeCommittedBlocksSequenceBlockClient( return blockClientMock } -// NewAnyTimeLatestBlockBlockClient creates a mock BlockClient that expects -// calls to the LatestBlock method any number of times. When the LatestBlock +// NewAnyTimeLastNBlocksBlockClient creates a mock BlockClient that expects +// calls to the LastNBlocks method any number of times. When the LastNBlocks // method is called, it returns a mock Block with the provided hash and height. -func NewAnyTimeLatestBlockBlockClient( +func NewAnyTimeLastNBlocksBlockClient( t *testing.T, hash []byte, height int64, @@ -101,10 +100,10 @@ func NewAnyTimeLatestBlockBlockClient( // Create a mock block that returns the provided hash and height. blockMock := NewAnyTimesBlock(t, hash, height) - // Create a mock block client that expects calls to LatestBlock method and + // Create a mock block client that expects calls to LastNBlocks method and // returns the mock block. blockClientMock := mockclient.NewMockBlockClient(ctrl) - blockClientMock.EXPECT().LatestBlock(gomock.Any()).Return(blockMock).AnyTimes() + blockClientMock.EXPECT().LastNBlocks(gomock.Any(), gomock.Any()).Return([]client.Block{blockMock}).AnyTimes() return blockClientMock } diff --git a/testutil/testclient/testdelegation/client.go b/testutil/testclient/testdelegation/client.go new file mode 100644 index 000000000..c70afc4e9 --- /dev/null +++ b/testutil/testclient/testdelegation/client.go @@ -0,0 +1,135 @@ +package testdelegation + +import ( + "context" + "testing" + + "cosmossdk.io/depinject" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" + + "github.com/pokt-network/poktroll/pkg/client" + "github.com/pokt-network/poktroll/pkg/client/delegation" + "github.com/pokt-network/poktroll/pkg/observable" + "github.com/pokt-network/poktroll/pkg/observable/channel" + "github.com/pokt-network/poktroll/testutil/mockclient" + "github.com/pokt-network/poktroll/testutil/testclient" + "github.com/pokt-network/poktroll/testutil/testclient/testeventsquery" +) + +// NewLocalnetClient creates and returns a new DelegationClient that's configured for +// use with the localnet sequencer. +func NewLocalnetClient(ctx context.Context, t *testing.T) client.DelegationClient { + t.Helper() + + queryClient := testeventsquery.NewLocalnetClient(t) + require.NotNil(t, queryClient) + + deps := depinject.Supply(queryClient) + dClient, err := delegation.NewDelegationClient(ctx, deps, testclient.CometLocalWebsocketURL) + require.NoError(t, err) + + return dClient +} + +// NewAnyTimesRedelegationsSequence creates a new mock DelegationClient. +// This mock DelegationClient will expect any number of calls to RedelegationsSequence, +// and when that call is made, it returns the given EventsObservable[Redelegation]. +func NewAnyTimesRedelegationsSequence( + t *testing.T, + redelegationObs observable.Observable[client.Redelegation], +) *mockclient.MockDelegationClient { + t.Helper() + + // Create a mock for the delegation client which expects the + // LastNRedelegations method to be called any number of times. + delegationClientMock := NewAnyTimeLastNRedelegationsClient(t, "") + + // Set up the mock expectation for the RedelegationsSequence method. When + // the method is called, it returns a new replay observable that publishes + // redelegation events sent on the given redelegationObs. + delegationClientMock.EXPECT(). + RedelegationsSequence( + gomock.AssignableToTypeOf(context.Background()), + ). + Return(redelegationObs). + AnyTimes() + + return delegationClientMock +} + +// NewOneTimeRedelegationsSequenceDelegationClient creates a new mock +// DelegationClient. This mock DelegationClient will expect a call to +// RedelegationsSequence, and when that call is made, it returns a new +// RedelegationReplayObservable that publishes Redelegation events sent on +// the given redelegationPublishCh. +// redelegationPublishCh is the channel the caller can use to publish +// Redelegation events to the observable. +func NewOneTimeRedelegationsSequenceDelegationClient( + t *testing.T, + redelegationPublishCh chan client.Redelegation, +) *mockclient.MockDelegationClient { + t.Helper() + + // Create a mock for the delegation client which expects the + // LastNRedelegations method to be called any number of times. + delegationClientMock := NewAnyTimeLastNRedelegationsClient(t, "") + + // Set up the mock expectation for the RedelegationsSequence method. When + // the method is called, it returns a new replay observable that publishes + // delegation changes sent on the given redelegationPublishCh. + delegationClientMock.EXPECT().RedelegationsSequence( + gomock.AssignableToTypeOf(context.Background()), + ).DoAndReturn(func(ctx context.Context) client.RedelegationReplayObservable { + // Create a new replay observable with a replay buffer size of 1. + // Redelegation events are published to this observable via the + // provided redelegationPublishCh. + withPublisherOpt := channel.WithPublisher(redelegationPublishCh) + obs, _ := channel.NewReplayObservable[client.Redelegation]( + ctx, 1, withPublisherOpt, + ) + return obs + }) + + return delegationClientMock +} + +// NewAnyTimeLastNRedelegationsClient creates a mock DelegationClient that +// expects calls to the LastNRedelegations method any number of times. When +// the LastNRedelegations method is called, it returns a mock Redelegation +// with the provided appAddress. +func NewAnyTimeLastNRedelegationsClient( + t *testing.T, + appAddress string, +) *mockclient.MockDelegationClient { + t.Helper() + ctrl := gomock.NewController(t) + + // Create a mock redelegation that returns the provided appAddress + redelegation := NewAnyTimesRedelegation(t, appAddress) + // Create a mock delegation client that expects calls to + // LastNRedelegations method and returns the mock redelegation. + delegationClientMock := mockclient.NewMockDelegationClient(ctrl) + delegationClientMock.EXPECT(). + LastNRedelegations(gomock.Any(), gomock.Any()). + Return([]client.Redelegation{redelegation}).AnyTimes() + + return delegationClientMock +} + +// NewAnyTimesRedelegation creates a mock Redelegation that expects calls +// to the AppAddress method any number of times. When the method is called, it +// returns the provided app address. +func NewAnyTimesRedelegation( + t *testing.T, + appAddress string, +) *mockclient.MockRedelegation { + t.Helper() + ctrl := gomock.NewController(t) + + // Create a mock redelegation that returns the provided address AnyTimes. + redelegation := mockclient.NewMockRedelegation(ctrl) + redelegation.EXPECT().GetAppAddress().Return(appAddress).AnyTimes() + + return redelegation +} diff --git a/testutil/testclient/testdelegation/godoc.go b/testutil/testclient/testdelegation/godoc.go new file mode 100644 index 000000000..4c7e7387d --- /dev/null +++ b/testutil/testclient/testdelegation/godoc.go @@ -0,0 +1,5 @@ +// Package testdelegation provides helper functions for constructing real (e.g. +// localnet) and mock DelegationClient objects with pre-configured and/or +// parameterised call arguments, return value(s), and/or expectations thereof. +// Intended for use in tests. +package testdelegation diff --git a/testutil/testclient/testeventsquery/client.go b/testutil/testclient/testeventsquery/client.go index 3ce867fde..cec0a68ff 100644 --- a/testutil/testclient/testeventsquery/client.go +++ b/testutil/testclient/testeventsquery/client.go @@ -10,12 +10,11 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" - "github.com/pokt-network/poktroll/testutil/mockclient" - "github.com/pokt-network/poktroll/pkg/client" - eventsquery "github.com/pokt-network/poktroll/pkg/client/events_query" + "github.com/pokt-network/poktroll/pkg/client/events" "github.com/pokt-network/poktroll/pkg/either" "github.com/pokt-network/poktroll/pkg/observable/channel" + "github.com/pokt-network/poktroll/testutil/mockclient" "github.com/pokt-network/poktroll/testutil/testclient" ) @@ -24,7 +23,7 @@ import ( func NewLocalnetClient(t *testing.T, opts ...client.EventsQueryClientOption) client.EventsQueryClient { t.Helper() - return eventsquery.NewEventsQueryClient(testclient.CometLocalWebsocketURL, opts...) + return events.NewEventsQueryClient(testclient.CometLocalWebsocketURL, opts...) } // NewOneTimeEventsQuery creates a mock of the EventsQueryClient which expects diff --git a/testutil/testproxy/relayerproxy.go b/testutil/testproxy/relayerproxy.go index d34cbb2ba..9961584f0 100644 --- a/testutil/testproxy/relayerproxy.go +++ b/testutil/testproxy/relayerproxy.go @@ -45,8 +45,10 @@ type TestBehavior struct { proxiedServices map[string]*http.Server } -const blockHeight = 1 -const blockHash = "1B1051B7BF236FEA13EFA65B6BE678514FA5B6EA0AE9A7A4B68D45F95E4F18E0" +const ( + blockHeight = 1 + blockHash = "1B1051B7BF236FEA13EFA65B6BE678514FA5B6EA0AE9A7A4B68D45F95E4F18E0" +) // NewRelayerProxyTestBehavior creates a TestBehavior with the provided set of // behavior function that are used to instrument the tested subject's dependencies @@ -80,7 +82,7 @@ func WithRelayerProxyDependencies(keyName string) func(*TestBehavior) { sessionQueryClient := testqueryclients.NewTestSessionQueryClient(test.t) supplierQueryClient := testqueryclients.NewTestSupplierQueryClient(test.t) - blockClient := testblock.NewAnyTimeLatestBlockBlockClient(test.t, []byte{}, 1) + blockClient := testblock.NewAnyTimeLastNBlocksBlockClient(test.t, []byte{}, 1) keyring, _ := testkeyring.NewTestKeyringWithKey(test.t, keyName) ringDeps := depinject.Supply(accountQueryClient, applicationQueryClient) @@ -246,8 +248,8 @@ func GetRelayResponseError(t *testing.T, res *http.Response) (errCode int32, err return payload.Error.Code, payload.Error.Message } -// GetRelayResponseResult crafts a ring signer for test purposes and uses it to -// sign the relay request +// GetApplicationRingSignature crafts a ring signer for test purposes and uses +// it to sign the relay request func GetApplicationRingSignature( t *testing.T, req *servicetypes.RelayRequest, diff --git a/x/application/keeper/msg_server_delegate_to_gateway.go b/x/application/keeper/msg_server_delegate_to_gateway.go index a50523905..f46275545 100644 --- a/x/application/keeper/msg_server_delegate_to_gateway.go +++ b/x/application/keeper/msg_server_delegate_to_gateway.go @@ -3,10 +3,10 @@ package keeper import ( "context" - "github.com/pokt-network/poktroll/x/application/types" - sdkerrors "cosmossdk.io/errors" sdk "github.com/cosmos/cosmos-sdk/types" + + "github.com/pokt-network/poktroll/x/application/types" ) func (k msgServer) DelegateToGateway(goCtx context.Context, msg *types.MsgDelegateToGateway) (*types.MsgDelegateToGatewayResponse, error) { @@ -57,5 +57,11 @@ func (k msgServer) DelegateToGateway(goCtx context.Context, msg *types.MsgDelega k.SetApplication(ctx, app) logger.Info("Successfully delegated application to gateway for app: %+v", app) + // Emit the application redelegation change event + if err := ctx.EventManager().EmitTypedEvent(msg.NewRedelegationEvent()); err != nil { + logger.Error("Failed to emit application redelegation event: %v", err) + return nil, err + } + return &types.MsgDelegateToGatewayResponse{}, nil } diff --git a/x/application/keeper/msg_server_delegate_to_gateway_test.go b/x/application/keeper/msg_server_delegate_to_gateway_test.go index 80e0b586a..1e293aa04 100644 --- a/x/application/keeper/msg_server_delegate_to_gateway_test.go +++ b/x/application/keeper/msg_server_delegate_to_gateway_test.go @@ -1,6 +1,7 @@ package keeper_test import ( + "fmt" "testing" sdk "github.com/cosmos/cosmos-sdk/types" @@ -23,12 +24,8 @@ func TestMsgServer_DelegateToGateway_SuccessfullyDelegate(t *testing.T) { gatewayAddr1 := sample.AccAddress() gatewayAddr2 := sample.AccAddress() // Mock the gateway being staked via the staked gateway map - keepertest.StakedGatewayMap[gatewayAddr1] = struct{}{} - keepertest.StakedGatewayMap[gatewayAddr2] = struct{}{} - t.Cleanup(func() { - delete(keepertest.StakedGatewayMap, gatewayAddr1) - delete(keepertest.StakedGatewayMap, gatewayAddr2) - }) + keepertest.AddGatewayToStakedGatewayMap(t, gatewayAddr1) + keepertest.AddGatewayToStakedGatewayMap(t, gatewayAddr2) // Prepare the application stakeMsg := &types.MsgStakeApplication{ @@ -56,6 +53,11 @@ func TestMsgServer_DelegateToGateway_SuccessfullyDelegate(t *testing.T) { // Delegate the application to the gateway _, err = srv.DelegateToGateway(wctx, delegateMsg) require.NoError(t, err) + events := ctx.EventManager().Events() + require.Equal(t, 1, len(events)) + require.Equal(t, "pocket.application.EventRedelegation", events[0].Type) + require.Equal(t, "app_address", events[0].Attributes[0].Key) + require.Equal(t, fmt.Sprintf("\"%s\"", appAddr), events[0].Attributes[0].Value) // Verify that the application exists foundApp, isAppFound := k.GetApplication(ctx, appAddr) @@ -73,6 +75,11 @@ func TestMsgServer_DelegateToGateway_SuccessfullyDelegate(t *testing.T) { // Delegate the application to the second gateway _, err = srv.DelegateToGateway(wctx, delegateMsg2) require.NoError(t, err) + events = ctx.EventManager().Events() + require.Equal(t, 2, len(events)) + require.Equal(t, "pocket.application.EventRedelegation", events[1].Type) + require.Equal(t, "app_address", events[1].Attributes[0].Key) + require.Equal(t, fmt.Sprintf("\"%s\"", appAddr), events[1].Attributes[0].Value) foundApp, isAppFound = k.GetApplication(ctx, appAddr) require.True(t, isAppFound) require.Equal(t, 2, len(foundApp.DelegateeGatewayAddresses)) @@ -89,10 +96,7 @@ func TestMsgServer_DelegateToGateway_FailDuplicate(t *testing.T) { appAddr := sample.AccAddress() gatewayAddr := sample.AccAddress() // Mock the gateway being staked via the staked gateway map - keepertest.StakedGatewayMap[gatewayAddr] = struct{}{} - t.Cleanup(func() { - delete(keepertest.StakedGatewayMap, gatewayAddr) - }) + keepertest.AddGatewayToStakedGatewayMap(t, gatewayAddr) // Prepare the application stakeMsg := &types.MsgStakeApplication{ @@ -120,6 +124,11 @@ func TestMsgServer_DelegateToGateway_FailDuplicate(t *testing.T) { // Delegate the application to the gateway _, err = srv.DelegateToGateway(wctx, delegateMsg) require.NoError(t, err) + events := ctx.EventManager().Events() + require.Equal(t, 1, len(events)) + require.Equal(t, "pocket.application.EventRedelegation", events[0].Type) + require.Equal(t, "app_address", events[0].Attributes[0].Key) + require.Equal(t, fmt.Sprintf("\"%s\"", appAddr), events[0].Attributes[0].Value) // Verify that the application exists foundApp, isAppFound := k.GetApplication(ctx, appAddr) @@ -137,6 +146,8 @@ func TestMsgServer_DelegateToGateway_FailDuplicate(t *testing.T) { // Attempt to delegate the application to the gateway again _, err = srv.DelegateToGateway(wctx, delegateMsg2) require.ErrorIs(t, err, types.ErrAppAlreadyDelegated) + events = ctx.EventManager().Events() + require.Equal(t, 1, len(events)) foundApp, isAppFound = k.GetApplication(ctx, appAddr) require.True(t, isAppFound) require.Equal(t, 1, len(foundApp.DelegateeGatewayAddresses)) @@ -192,10 +203,7 @@ func TestMsgServer_DelegateToGateway_FailMaxReached(t *testing.T) { appAddr := sample.AccAddress() gatewayAddr := sample.AccAddress() // Mock the gateway being staked via the staked gateway map - keepertest.StakedGatewayMap[gatewayAddr] = struct{}{} - t.Cleanup(func() { - delete(keepertest.StakedGatewayMap, gatewayAddr) - }) + keepertest.AddGatewayToStakedGatewayMap(t, gatewayAddr) // Prepare the application stakeMsg := &types.MsgStakeApplication{ @@ -226,10 +234,7 @@ func TestMsgServer_DelegateToGateway_FailMaxReached(t *testing.T) { // Prepare the delegation message gatewayAddr := sample.AccAddress() // Mock the gateway being staked via the staked gateway map - keepertest.StakedGatewayMap[gatewayAddr] = struct{}{} - t.Cleanup(func() { - delete(keepertest.StakedGatewayMap, gatewayAddr) - }) + keepertest.AddGatewayToStakedGatewayMap(t, gatewayAddr) delegateMsg := &types.MsgDelegateToGateway{ AppAddress: appAddr, GatewayAddress: gatewayAddr, @@ -242,10 +247,19 @@ func TestMsgServer_DelegateToGateway_FailMaxReached(t *testing.T) { require.True(t, isAppFound) require.Equal(t, int(i+1), len(foundApp.DelegateeGatewayAddresses)) } + events := ctx.EventManager().Events() + require.Equal(t, int(maxDelegatedParam), len(events)) + for _, event := range events { + require.Equal(t, "pocket.application.EventRedelegation", event.Type) + require.Equal(t, "app_address", event.Attributes[0].Key) + require.Equal(t, fmt.Sprintf("\"%s\"", appAddr), event.Attributes[0].Value) + } // Attempt to delegate the application when the max is already reached _, err = srv.DelegateToGateway(wctx, delegateMsg) require.ErrorIs(t, err, types.ErrAppMaxDelegatedGateways) + events = ctx.EventManager().Events() + require.Equal(t, int(maxDelegatedParam), len(events)) foundApp, isAppFound := k.GetApplication(ctx, appAddr) require.True(t, isAppFound) require.Equal(t, maxDelegatedParam, int64(len(foundApp.DelegateeGatewayAddresses))) diff --git a/x/application/keeper/msg_server_undelegate_from_gateway.go b/x/application/keeper/msg_server_undelegate_from_gateway.go index a1239d7ef..7c38d5725 100644 --- a/x/application/keeper/msg_server_undelegate_from_gateway.go +++ b/x/application/keeper/msg_server_undelegate_from_gateway.go @@ -9,7 +9,10 @@ import ( "github.com/pokt-network/poktroll/x/application/types" ) -func (k msgServer) UndelegateFromGateway(goCtx context.Context, msg *types.MsgUndelegateFromGateway) (*types.MsgUndelegateFromGatewayResponse, error) { +func (k msgServer) UndelegateFromGateway( + goCtx context.Context, + msg *types.MsgUndelegateFromGateway, +) (*types.MsgUndelegateFromGatewayResponse, error) { ctx := sdk.UnwrapSDKContext(goCtx) logger := k.Logger(ctx).With("method", "UndelegateFromGateway") @@ -47,5 +50,11 @@ func (k msgServer) UndelegateFromGateway(goCtx context.Context, msg *types.MsgUn k.SetApplication(ctx, app) logger.Info("Successfully undelegated application from gateway for app: %+v", app) + // Emit the application redelegation event + if err := ctx.EventManager().EmitTypedEvent(msg.NewRedelegationEvent()); err != nil { + logger.Error("Failed to emit application redelegation event: %v", err) + return nil, err + } + return &types.MsgUndelegateFromGatewayResponse{}, nil } diff --git a/x/application/keeper/msg_server_undelegate_from_gateway_test.go b/x/application/keeper/msg_server_undelegate_from_gateway_test.go index 7a3b8283d..0dfa63546 100644 --- a/x/application/keeper/msg_server_undelegate_from_gateway_test.go +++ b/x/application/keeper/msg_server_undelegate_from_gateway_test.go @@ -1,6 +1,7 @@ package keeper_test import ( + "fmt" "testing" sdk "github.com/cosmos/cosmos-sdk/types" @@ -20,18 +21,14 @@ func TestMsgServer_UndelegateFromGateway_SuccessfullyUndelegate(t *testing.T) { // Generate an address for the application and gateways appAddr := sample.AccAddress() - gatewayAddresses := make([]string, int(k.GetParams(ctx).MaxDelegatedGateways)) + maxDelegatedGateways := k.GetParams(ctx).MaxDelegatedGateways + gatewayAddresses := make([]string, int(maxDelegatedGateways)) for i := 0; i < len(gatewayAddresses); i++ { gatewayAddr := sample.AccAddress() // Mock the gateway being staked via the staked gateway map - keepertest.StakedGatewayMap[gatewayAddr] = struct{}{} + keepertest.AddGatewayToStakedGatewayMap(t, gatewayAddr) gatewayAddresses[i] = gatewayAddr } - t.Cleanup(func() { - for _, gatewayAddr := range gatewayAddresses { - delete(keepertest.StakedGatewayMap, gatewayAddr) - } - }) // Prepare the application stakeMsg := &types.MsgStakeApplication{ @@ -60,9 +57,15 @@ func TestMsgServer_UndelegateFromGateway_SuccessfullyUndelegate(t *testing.T) { _, err = srv.DelegateToGateway(wctx, delegateMsg) require.NoError(t, err) } + events := ctx.EventManager().Events() + require.Equal(t, int(maxDelegatedGateways), len(events)) + for _, event := range events { + require.Equal(t, "pocket.application.EventRedelegation", event.Type) + require.Equal(t, "app_address", event.Attributes[0].Key) + require.Equal(t, fmt.Sprintf("\"%s\"", appAddr), event.Attributes[0].Value) + } // Verify that the application exists - maxDelegatedGateways := k.GetParams(ctx).MaxDelegatedGateways foundApp, isAppFound := k.GetApplication(ctx, appAddr) require.True(t, isAppFound) require.Equal(t, appAddr, foundApp.Address) @@ -80,6 +83,11 @@ func TestMsgServer_UndelegateFromGateway_SuccessfullyUndelegate(t *testing.T) { // Undelegate the application from the gateway _, err = srv.UndelegateFromGateway(wctx, undelegateMsg) require.NoError(t, err) + events = ctx.EventManager().Events() + require.Equal(t, int(maxDelegatedGateways)+1, len(events)) + require.Equal(t, "pocket.application.EventRedelegation", events[7].Type) + require.Equal(t, "app_address", events[7].Attributes[0].Key) + require.Equal(t, fmt.Sprintf("\"%s\"", appAddr), events[7].Attributes[0].Value) foundApp, isAppFound = k.GetApplication(ctx, appAddr) require.True(t, isAppFound) require.Equal(t, appAddr, foundApp.Address) @@ -100,12 +108,8 @@ func TestMsgServer_UndelegateFromGateway_FailNotDelegated(t *testing.T) { gatewayAddr1 := sample.AccAddress() gatewayAddr2 := sample.AccAddress() // Mock the gateway being staked via the staked gateway map - keepertest.StakedGatewayMap[gatewayAddr1] = struct{}{} - keepertest.StakedGatewayMap[gatewayAddr2] = struct{}{} - t.Cleanup(func() { - delete(keepertest.StakedGatewayMap, gatewayAddr1) - delete(keepertest.StakedGatewayMap, gatewayAddr2) - }) + keepertest.AddGatewayToStakedGatewayMap(t, gatewayAddr1) + keepertest.AddGatewayToStakedGatewayMap(t, gatewayAddr2) // Prepare the application stakeMsg := &types.MsgStakeApplication{ @@ -137,6 +141,8 @@ func TestMsgServer_UndelegateFromGateway_FailNotDelegated(t *testing.T) { require.True(t, isAppFound) require.Equal(t, appAddr, foundApp.Address) require.Equal(t, 0, len(foundApp.DelegateeGatewayAddresses)) + events := ctx.EventManager().Events() + require.Equal(t, 0, len(events)) // Prepare a delegation message delegateMsg := &types.MsgDelegateToGateway{ @@ -147,10 +153,17 @@ func TestMsgServer_UndelegateFromGateway_FailNotDelegated(t *testing.T) { // Delegate the application to the gateway _, err = srv.DelegateToGateway(wctx, delegateMsg) require.NoError(t, err) + events = ctx.EventManager().Events() + require.Equal(t, 1, len(events)) + require.Equal(t, "pocket.application.EventRedelegation", events[0].Type) + require.Equal(t, "app_address", events[0].Attributes[0].Key) + require.Equal(t, fmt.Sprintf("\"%s\"", appAddr), events[0].Attributes[0].Value) // Ensure the failed undelegation did not affect the application _, err = srv.UndelegateFromGateway(wctx, undelegateMsg) require.ErrorIs(t, err, types.ErrAppNotDelegated) + events = ctx.EventManager().Events() + require.Equal(t, 1, len(events)) foundApp, isAppFound = k.GetApplication(ctx, appAddr) require.True(t, isAppFound) require.Equal(t, 1, len(foundApp.DelegateeGatewayAddresses)) @@ -166,7 +179,7 @@ func TestMsgServer_UndelegateFromGateway_SuccessfullyUndelegateFromUnstakedGatew appAddr := sample.AccAddress() gatewayAddr := sample.AccAddress() // Mock the gateway being staked via the staked gateway map - keepertest.StakedGatewayMap[gatewayAddr] = struct{}{} + keepertest.AddGatewayToStakedGatewayMap(t, gatewayAddr) // Prepare the application stakeMsg := &types.MsgStakeApplication{ @@ -193,6 +206,11 @@ func TestMsgServer_UndelegateFromGateway_SuccessfullyUndelegateFromUnstakedGatew // Delegate the application to the gateway _, err = srv.DelegateToGateway(wctx, delegateMsg) require.NoError(t, err) + events := ctx.EventManager().Events() + require.Equal(t, 1, len(events)) + require.Equal(t, "pocket.application.EventRedelegation", events[0].Type) + require.Equal(t, "app_address", events[0].Attributes[0].Key) + require.Equal(t, fmt.Sprintf("\"%s\"", appAddr), events[0].Attributes[0].Value) // Verify that the application exists foundApp, isAppFound := k.GetApplication(ctx, appAddr) @@ -202,7 +220,7 @@ func TestMsgServer_UndelegateFromGateway_SuccessfullyUndelegateFromUnstakedGatew require.Equal(t, gatewayAddr, foundApp.DelegateeGatewayAddresses[0]) // Mock unstaking the gateway - delete(keepertest.StakedGatewayMap, gatewayAddr) + keepertest.RemoveGatewayFromStakedGatewayMap(t, gatewayAddr) // Prepare an undelegation message undelegateMsg := &types.MsgUndelegateFromGateway{ @@ -213,6 +231,11 @@ func TestMsgServer_UndelegateFromGateway_SuccessfullyUndelegateFromUnstakedGatew // Undelegate the application from the gateway _, err = srv.UndelegateFromGateway(wctx, undelegateMsg) require.NoError(t, err) + events = ctx.EventManager().Events() + require.Equal(t, 2, len(events)) + require.Equal(t, "pocket.application.EventRedelegation", events[1].Type) + require.Equal(t, "app_address", events[1].Attributes[0].Key) + require.Equal(t, fmt.Sprintf("\"%s\"", appAddr), events[1].Attributes[0].Value) foundApp, isAppFound = k.GetApplication(ctx, appAddr) require.True(t, isAppFound) require.Equal(t, appAddr, foundApp.Address) diff --git a/x/application/types/message_delegate_to_gateway.go b/x/application/types/message_delegate_to_gateway.go index 652b5baa6..71b17984f 100644 --- a/x/application/types/message_delegate_to_gateway.go +++ b/x/application/types/message_delegate_to_gateway.go @@ -37,6 +37,13 @@ func (msg *MsgDelegateToGateway) GetSignBytes() []byte { return sdk.MustSortJSON(bz) } +func (msg *MsgDelegateToGateway) NewRedelegationEvent() *EventRedelegation { + return &EventRedelegation{ + AppAddress: msg.AppAddress, + GatewayAddress: msg.GatewayAddress, + } +} + func (msg *MsgDelegateToGateway) ValidateBasic() error { // Validate the application address if _, err := sdk.AccAddressFromBech32(msg.AppAddress); err != nil { diff --git a/x/application/types/message_undelegate_from_gateway.go b/x/application/types/message_undelegate_from_gateway.go index 4d74748c1..0ab47aa47 100644 --- a/x/application/types/message_undelegate_from_gateway.go +++ b/x/application/types/message_undelegate_from_gateway.go @@ -37,6 +37,13 @@ func (msg *MsgUndelegateFromGateway) GetSignBytes() []byte { return sdk.MustSortJSON(bz) } +func (msg *MsgUndelegateFromGateway) NewRedelegationEvent() *EventRedelegation { + return &EventRedelegation{ + AppAddress: msg.AppAddress, + GatewayAddress: msg.GatewayAddress, + } +} + func (msg *MsgUndelegateFromGateway) ValidateBasic() error { // Validate the application address if _, err := sdk.AccAddressFromBech32(msg.AppAddress); err != nil { diff --git a/x/supplier/keeper/query_supplier.go b/x/supplier/keeper/query_supplier.go index 7f381104c..c27e8e8f6 100644 --- a/x/supplier/keeper/query_supplier.go +++ b/x/supplier/keeper/query_supplier.go @@ -34,7 +34,6 @@ func (k Keeper) SupplierAll(goCtx context.Context, req *types.QueryAllSupplierRe suppliers = append(suppliers, supplier) return nil }) - if err != nil { return nil, status.Error(codes.Internal, err.Error()) } diff --git a/x/supplier/keeper/supplier.go b/x/supplier/keeper/supplier.go index 559ca4a97..c5edeefc4 100644 --- a/x/supplier/keeper/supplier.go +++ b/x/supplier/keeper/supplier.go @@ -21,7 +21,6 @@ func (k Keeper) SetSupplier(ctx sdk.Context, supplier sharedtypes.Supplier) { func (k Keeper) GetSupplier( ctx sdk.Context, supplierAddr string, - ) (supplier sharedtypes.Supplier, found bool) { store := prefix.NewStore(ctx.KVStore(k.storeKey), types.KeyPrefix(types.SupplierKeyPrefix)) @@ -40,7 +39,6 @@ func (k Keeper) GetSupplier( func (k Keeper) RemoveSupplier( ctx sdk.Context, supplierAddr string, - ) { store := prefix.NewStore(ctx.KVStore(k.storeKey), types.KeyPrefix(types.SupplierKeyPrefix)) store.Delete(types.SupplierKey(