diff --git a/.gitignore b/.gitignore index a10753b59..8fe75f2aa 100644 --- a/.gitignore +++ b/.gitignore @@ -61,4 +61,12 @@ localnet_config.yaml # Relase artifacts produced by `ignite chain build --release` release +# SMT KVStore files +# TODO_TECHDEBT(#126, @red-0ne): Rename `smt` to `smt_stores` and make it configurable so it can be stored anywhere on this +smt + +# Do not allow a multi-moduled projected go.work.sum + +# TODO_TECHDEBT: It seems that .dot files come and go so we need to figure out the root cause: https://github.com/pokt-network/poktroll/pull/177/files#r1392521547 +# **/*.dot diff --git a/Makefile b/Makefile index 104950507..19120c578 100644 --- a/Makefile +++ b/Makefile @@ -282,8 +282,9 @@ app_list: ## List all the staked applications app_stake: ## Stake tokens for the application specified (must specify the APP and SERVICES env vars) poktrolld --home=$(POKTROLLD_HOME) tx application stake-application 1000upokt $(SERVICES) --keyring-backend test --from $(APP) --node $(POCKET_NODE) +# TODO_IMPROVE(#180): Make sure genesis-staked actors are available via AccountKeeper .PHONY: app1_stake -app1_stake: ## Stake app1 +app1_stake: ## Stake app1 (also staked in genesis) APP=app1 SERVICES=anvil,svc1,svc2 make app_stake .PHONY: app2_stake @@ -362,17 +363,24 @@ supplier_list: ## List all the staked supplier supplier_stake: ## Stake tokens for the supplier specified (must specify the APP env var) poktrolld --home=$(POKTROLLD_HOME) tx supplier stake-supplier 1000upokt "$(SERVICES)" --keyring-backend test --from $(SUPPLIER) --node $(POCKET_NODE) +# TODO_IMPROVE(#180): Make sure genesis-staked actors are available via AccountKeeper .PHONY: supplier1_stake -supplier1_stake: ## Stake supplier1 - SUPPLIER=supplier1 SERVICES="anvil;http://anvil:8547,svc1;http://localhost:8081" make supplier_stake +supplier1_stake: ## Stake supplier1 (also staked in genesis) + # TODO_UPNEXT(@okdas): once `relayminer` service is added to tilt, this hostname should point to that service. + # I.e.: replace `localhost` with `relayminer` (or whatever the service's hostname is). + SUPPLIER=supplier1 SERVICES="anvil;http://localhost:8545,svc1;http://localhost:8081" make supplier_stake .PHONY: supplier2_stake supplier2_stake: ## Stake supplier2 - SUPPLIER=supplier2 SERVICES="anvil;http://anvil:8547,svc2;http://localhost:8082" make supplier_stake + # TODO_UPNEXT(@okdas): once `relayminer` service is added to tilt, this hostname should point to that service. + # I.e.: replace `localhost` with `relayminer` (or whatever the service's hostname is). + SUPPLIER=supplier2 SERVICES="anvil;http://localhost:8545,svc2;http://localhost:8082" make supplier_stake .PHONY: supplier3_stake supplier3_stake: ## Stake supplier3 - SUPPLIER=supplier3 SERVICES="anvil;http://anvil:8547,svc3;http://localhost:8083" make supplier_stake + # TODO_UPNEXT(@okdas): once `relayminer` service is added to tilt, this hostname should point to that service. + # I.e.: replace `localhost` with `relayminer` (or whatever the service's hostname is). + SUPPLIER=supplier3 SERVICES="anvil;http://localhost:8545,svc3;http://localhost:8083" make supplier_stake .PHONY: supplier_unstake supplier_unstake: ## Unstake an supplier (must specify the SUPPLIER env var) diff --git a/cmd/pocketd/cmd/root.go b/cmd/pocketd/cmd/root.go index cf58f2447..55b7071fc 100644 --- a/cmd/pocketd/cmd/root.go +++ b/cmd/pocketd/cmd/root.go @@ -44,6 +44,7 @@ import ( "github.com/pokt-network/poktroll/app" appparams "github.com/pokt-network/poktroll/app/params" appgateservercmd "github.com/pokt-network/poktroll/pkg/appgateserver/cmd" + relayercmd "github.com/pokt-network/poktroll/pkg/relayer/cmd" ) // NewRootCmd creates a new root command for a Cosmos SDK application @@ -142,6 +143,11 @@ func initRootCmd( addModuleInitFlags, ) + // add relayer command + rootCmd.AddCommand( + relayercmd.RelayerCmd(), + ) + // add keybase, auxiliary RPC, query, and tx child commands rootCmd.AddCommand( rpc.StatusCommand(), diff --git a/cmd/signals/on_exit.go b/cmd/signals/on_exit.go new file mode 100644 index 000000000..1106cce96 --- /dev/null +++ b/cmd/signals/on_exit.go @@ -0,0 +1,23 @@ +package signals + +import ( + "os" + "os/signal" +) + +// GoOnExitSignal calls the given callback when the process receives an interrupt +// or kill signal. +func GoOnExitSignal(onInterrupt func()) { + go func() { + // Set up sigCh to receive when this process receives an interrupt or + // kill signal. + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, os.Interrupt, os.Kill) + + // Block until we receive an interrupt or kill signal (OS-agnostic) + <-sigCh + + // Call the onInterrupt callback. + onInterrupt() + }() +} diff --git a/config.yml b/config.yml index f6e3c1e89..4dac5f556 100644 --- a/config.yml +++ b/config.yml @@ -100,6 +100,7 @@ genesis: - endpoints: - configs: [] rpc_type: JSON_RPC + # TODO_UPNEXT(@okdas): once `relayminer` service is added to tilt, this hostname should point to it instead of `localhost`. url: http://localhost:8545 service: id: anvil diff --git a/pkg/appgateserver/cmd/cmd.go b/pkg/appgateserver/cmd/cmd.go index 663562e98..7a541a511 100644 --- a/pkg/appgateserver/cmd/cmd.go +++ b/pkg/appgateserver/cmd/cmd.go @@ -7,24 +7,25 @@ import ( "log" "net/http" "net/url" - "os" - "os/signal" "cosmossdk.io/depinject" cosmosclient "github.com/cosmos/cosmos-sdk/client" - "github.com/cosmos/cosmos-sdk/client/flags" + cosmosflags "github.com/cosmos/cosmos-sdk/client/flags" "github.com/spf13/cobra" + "github.com/pokt-network/poktroll/cmd/signals" "github.com/pokt-network/poktroll/pkg/appgateserver" - blockclient "github.com/pokt-network/poktroll/pkg/client/block" - eventsquery "github.com/pokt-network/poktroll/pkg/client/events_query" + "github.com/pokt-network/poktroll/pkg/deps/config" ) +// We're `explicitly omitting default` so that the appgateserver crashes if these aren't specified. +const omittedDefaultFlagValue = "explicitly omitting default" + var ( flagSigningKey string flagSelfSigning bool flagListeningEndpoint string - flagCometWebsocketUrl string + flagQueryNodeUrl string ) func AppGateServerCmd() *cobra.Command { @@ -56,13 +57,15 @@ relays to the AppGate server and function as an Application, provided that: RunE: runAppGateServer, } + // Custom flags cmd.Flags().StringVar(&flagSigningKey, "signing-key", "", "The name of the key that will be used to sign relays") cmd.Flags().StringVar(&flagListeningEndpoint, "listening-endpoint", "http://localhost:42069", "The host and port that the appgate server will listen on") - cmd.Flags().StringVar(&flagCometWebsocketUrl, "comet-websocket-url", "ws://localhost:36657/websocket", "The URL of the comet websocket endpoint to communicate with the pocket blockchain") cmd.Flags().BoolVar(&flagSelfSigning, "self-signing", false, "Whether the server should sign all incoming requests with its own ring (for applications)") + cmd.Flags().StringVar(&flagQueryNodeUrl, "query-node", omittedDefaultFlagValue, "tcp://: to a full pocket node for reading data and listening for on-chain events") - cmd.Flags().String(flags.FlagKeyringBackend, "", "Select keyring's backend (os|file|kwallet|pass|test)") - cmd.Flags().String(flags.FlagNode, "tcp://localhost:36657", "The URL of the comet tcp endpoint to communicate with the pocket blockchain") + // Cosmos flags + cmd.Flags().String(cosmosflags.FlagKeyringBackend, "", "Select keyring's backend (os|file|kwallet|pass|test)") + cmd.Flags().String(cosmosflags.FlagNode, omittedDefaultFlagValue, "registering the default cosmos node flag; needed to initialize the cosmostx and query contexts correctly and uses flagQueryNodeUrl underneath") return cmd } @@ -72,18 +75,8 @@ func runAppGateServer(cmd *cobra.Command, _ []string) error { ctx, cancelCtx := context.WithCancel(cmd.Context()) defer cancelCtx() - // Handle interrupts in a goroutine. - go func() { - sigCh := make(chan os.Signal, 1) - signal.Notify(sigCh, os.Interrupt) - - // Block until we receive an interrupt or kill signal (OS-agnostic) - <-sigCh - log.Println("INFO: Interrupt signal received, shutting down...") - - // Signal goroutines to stop - cancelCtx() - }() + // Handle interrupt and kill signals asynchronously. + signals.GoOnExitSignal(cancelCtx) // Parse the listening endpoint. listeningUrl, err := url.Parse(flagListeningEndpoint) @@ -92,7 +85,7 @@ func runAppGateServer(cmd *cobra.Command, _ []string) error { } // Setup the AppGate server dependencies. - appGateServerDeps, err := setupAppGateServerDependencies(cmd, ctx, flagCometWebsocketUrl) + appGateServerDeps, err := setupAppGateServerDependencies(ctx, cmd) if err != nil { return fmt.Errorf("failed to setup AppGate server dependencies: %w", err) } @@ -127,21 +120,62 @@ func runAppGateServer(cmd *cobra.Command, _ []string) error { return nil } -func setupAppGateServerDependencies(cmd *cobra.Command, ctx context.Context, cometWebsocketUrl string) (depinject.Config, error) { - // Retrieve the client context for the chain interactions. - clientCtx := cosmosclient.GetClientContextFromCmd(cmd) +func setupAppGateServerDependencies( + ctx context.Context, + cmd *cobra.Command, +) (depinject.Config, error) { + pocketNodeWebsocketUrl, err := getPocketNodeWebsocketUrl() + if err != nil { + return nil, err + } - // Create the events client. - eventsQueryClient := eventsquery.NewEventsQueryClient(flagCometWebsocketUrl) + supplierFuncs := []config.SupplierFn{ + config.NewSupplyEventsQueryClientFn(pocketNodeWebsocketUrl), + config.NewSupplyBlockClientFn(pocketNodeWebsocketUrl), + newSupplyQueryClientContextFn(flagQueryNodeUrl), + } - // Create the block client. - log.Printf("INFO: Creating block client, using comet websocket URL: %s...", flagCometWebsocketUrl) - deps := depinject.Supply(eventsQueryClient) - blockClient, err := blockclient.NewBlockClient(ctx, deps, flagCometWebsocketUrl) + return config.SupplyConfig(ctx, cmd, supplierFuncs) +} + +// getPocketNodeWebsocketUrl returns the websocket URL of the Pocket Node to +// connect to for subscribing to on-chain events. +func getPocketNodeWebsocketUrl() (string, error) { + if flagQueryNodeUrl == omittedDefaultFlagValue { + return "", errors.New("missing required flag: --query-node") + } + + pocketNodeURL, err := url.Parse(flagQueryNodeUrl) if err != nil { - return nil, fmt.Errorf("failed to create block client: %w", err) + return "", err } - // Return the dependencies config. - return depinject.Supply(clientCtx, blockClient), nil + return fmt.Sprintf("ws://%s/websocket", pocketNodeURL.Host), nil +} + +// newSupplyQueryClientContextFn returns a new depinject.Config which is supplied with +// the given deps and a new cosmos ClientCtx +func newSupplyQueryClientContextFn(pocketQueryClientUrl string) config.SupplierFn { + return func( + _ context.Context, + deps depinject.Config, + cmd *cobra.Command, + ) (depinject.Config, error) { + // Set --node flag to the pocketQueryClientUrl for the client context + // This flag is read by cosmosclient.GetClientQueryContext. + err := cmd.Flags().Set(cosmosflags.FlagNode, pocketQueryClientUrl) + if err != nil { + return nil, err + } + + // Get the client context from the command. + queryClientCtx, err := cosmosclient.GetClientQueryContext(cmd) + if err != nil { + return nil, err + } + deps = depinject.Configs(deps, depinject.Supply( + queryClientCtx, + )) + return deps, nil + } } diff --git a/pkg/appgateserver/endpoint_selector.go b/pkg/appgateserver/endpoint_selector.go index 380c5dad5..c0cb22aee 100644 --- a/pkg/appgateserver/endpoint_selector.go +++ b/pkg/appgateserver/endpoint_selector.go @@ -31,7 +31,7 @@ func (app *appGateServer) getRelayerUrl( if endpoint.RpcType == rpcType { supplierUrl, err := url.Parse(endpoint.Url) if err != nil { - log.Printf("error parsing url: %s", err) + log.Printf("ERROR: error parsing url: %s", err) continue } return supplierUrl, supplier.Address, nil diff --git a/pkg/appgateserver/errors.go b/pkg/appgateserver/errors.go index 2c8f281bd..eecf99f44 100644 --- a/pkg/appgateserver/errors.go +++ b/pkg/appgateserver/errors.go @@ -10,4 +10,6 @@ var ( ErrAppGateMissingAppAddress = sdkerrors.Register(codespace, 4, "missing application address") ErrAppGateMissingSigningInformation = sdkerrors.Register(codespace, 5, "missing app client signing information") ErrAppGateMissingListeningEndpoint = sdkerrors.Register(codespace, 6, "missing app client listening endpoint") + ErrAppGateEmptyRelayResponse = sdkerrors.Register(codespace, 7, "empty relay response") + ErrAppGateHandleRelay = sdkerrors.Register(codespace, 8, "internal error handling relay request") ) diff --git a/pkg/appgateserver/jsonrpc.go b/pkg/appgateserver/jsonrpc.go index 80784fb44..915167dc8 100644 --- a/pkg/appgateserver/jsonrpc.go +++ b/pkg/appgateserver/jsonrpc.go @@ -25,23 +25,29 @@ func (app *appGateServer) handleJSONRPCRelay( // Read the request body bytes. payloadBz, err := io.ReadAll(request.Body) if err != nil { - return err + return ErrAppGateHandleRelay.Wrapf("reading relay request body: %s", err) } + log.Printf("DEBUG: relay request body: %s", string(payloadBz)) // Create the relay request payload. relayRequestPayload := &types.RelayRequest_JsonRpcPayload{} - relayRequestPayload.JsonRpcPayload.Unmarshal(payloadBz) + jsonPayload := &types.JSONRPCRequestPayload{} + cdc := types.ModuleCdc + if err := cdc.UnmarshalJSON(payloadBz, jsonPayload); err != nil { + return err + } + relayRequestPayload.JsonRpcPayload = jsonPayload session, err := app.getCurrentSession(ctx, appAddress, serviceId) if err != nil { - return err + return ErrAppGateHandleRelay.Wrapf("getting current session: %s", err) } log.Printf("DEBUG: Current session ID: %s", session.SessionId) // Get a supplier URL and address for the given service and session. supplierUrl, supplierAddress, err := app.getRelayerUrl(ctx, serviceId, sharedtypes.RPCType_JSON_RPC, session) if err != nil { - return err + return ErrAppGateHandleRelay.Wrapf("getting supplier URL: %s", err) } // Create the relay request. @@ -56,28 +62,32 @@ func (app *appGateServer) handleJSONRPCRelay( // Get the application's signer. signer, err := app.getRingSingerForAppAddress(ctx, appAddress) if err != nil { - return err + return ErrAppGateHandleRelay.Wrapf("getting signer: %s", err) } // Hash and sign the request's signable bytes. signableBz, err := relayRequest.GetSignableBytes() if err != nil { - return err + return ErrAppGateHandleRelay.Wrapf("getting signable bytes: %s", err) } hash := crypto.Sha256(signableBz) signature, err := signer.Sign(hash) if err != nil { - return err + return ErrAppGateHandleRelay.Wrapf("signing relay: %s", err) } relayRequest.Meta.Signature = signature // Marshal the relay request to bytes and create a reader to be used as an HTTP request body. - relayRequestBz, err := relayRequest.Marshal() + relayRequestBz, err := cdc.Marshal(relayRequest) if err != nil { - return err + return ErrAppGateHandleRelay.Wrapf("marshaling relay request: %s", err) } relayRequestReader := io.NopCloser(bytes.NewReader(relayRequestBz)) + var relayReq types.RelayRequest + if err := relayReq.Unmarshal(relayRequestBz); err != nil { + return ErrAppGateHandleRelay.Wrapf("unmarshaling relay response: %s", err) + } // Create the HTTP request to send the request to the relayer. relayHTTPRequest := &http.Request{ @@ -90,19 +100,19 @@ func (app *appGateServer) handleJSONRPCRelay( log.Printf("DEBUG: Sending signed relay request to %s", supplierUrl) relayHTTPResponse, err := http.DefaultClient.Do(relayHTTPRequest) if err != nil { - return err + return ErrAppGateHandleRelay.Wrapf("sending relay request: %s", err) } // Read the response body bytes. relayResponseBz, err := io.ReadAll(relayHTTPResponse.Body) if err != nil { - return err + return ErrAppGateHandleRelay.Wrapf("reading relay response body: %s", err) } // Unmarshal the response bytes into a RelayResponse. relayResponse := &types.RelayResponse{} if err := relayResponse.Unmarshal(relayResponseBz); err != nil { - return err + return ErrAppGateHandleRelay.Wrapf("unmarshaling relay response: %s", err) } // Verify the response signature. We use the supplier address that we got from @@ -111,20 +121,21 @@ func (app *appGateServer) handleJSONRPCRelay( // as in some relayer early failures, it may not be signed by the supplier. // TODO_IMPROVE: Add more logging & telemetry so we can get visibility and signal into // failed responses. - log.Println("DEBUG: Verifying signed relay response from...") if err := app.verifyResponse(ctx, supplierAddress, relayResponse); err != nil { - return err + // TODO_DISCUSS: should this be its own error type and asserted against in tests? + return ErrAppGateHandleRelay.Wrapf("verifying relay response signature: %s", err) } // Marshal the response payload to bytes to be sent back to the application. - var responsePayloadBz []byte - if _, err = relayResponse.Payload.MarshalTo(responsePayloadBz); err != nil { - return err + relayResponsePayloadBz, err := cdc.MarshalJSON(relayResponse.GetJsonRpcPayload()) + if err != nil { + return ErrAppGateHandleRelay.Wrapf("unmarshallig relay response: %s", err) } // Reply with the RelayResponse payload. - if _, err := writer.Write(relayRequestBz); err != nil { - return err + log.Printf("DEBUG: Writing relay response payload: %s", string(relayResponsePayloadBz)) + if _, err := writer.Write(relayResponsePayloadBz); err != nil { + return ErrAppGateHandleRelay.Wrapf("writing relay response payload: %s", err) } return nil diff --git a/pkg/appgateserver/relay_verifier.go b/pkg/appgateserver/relay_verifier.go index 712eda7f9..4f86e1cdd 100644 --- a/pkg/appgateserver/relay_verifier.go +++ b/pkg/appgateserver/relay_verifier.go @@ -4,6 +4,8 @@ import ( "context" "github.com/cometbft/cometbft/crypto" + "github.com/cosmos/cosmos-sdk/codec" + codectypes "github.com/cosmos/cosmos-sdk/codec/types" cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types" accounttypes "github.com/cosmos/cosmos-sdk/x/auth/types" @@ -23,6 +25,9 @@ func (app *appGateServer) verifyResponse( } // Extract the supplier's signature + if relayResponse.Meta == nil { + return ErrAppGateEmptyRelayResponse + } supplierSignature := relayResponse.Meta.SupplierSignature // Get the relay response signable bytes and hash them. @@ -60,12 +65,15 @@ func (app *appGateServer) getSupplierPubKeyFromAddress( } // Unmarshal the query response into a BaseAccount. - account := new(accounttypes.BaseAccount) - if err := account.Unmarshal(accQueryRes.Account.Value); err != nil { + var acc accounttypes.AccountI + reg := codectypes.NewInterfaceRegistry() + accounttypes.RegisterInterfaces(reg) + cdc := codec.NewProtoCodec(reg) + if err := cdc.UnpackAny(accQueryRes.Account, &acc); err != nil { return nil, err } - fetchedPubKey := account.GetPubKey() + fetchedPubKey := acc.GetPubKey() // Cache the retrieved public key. app.supplierAccountCache[supplierAddress] = fetchedPubKey diff --git a/pkg/appgateserver/server.go b/pkg/appgateserver/server.go index d6410021c..fe4f15000 100644 --- a/pkg/appgateserver/server.go +++ b/pkg/appgateserver/server.go @@ -225,7 +225,9 @@ func (app *appGateServer) replyWithError(writer http.ResponseWriter, err error) relayResponse := &types.RelayResponse{ Payload: &types.RelayResponse_JsonRpcPayload{ JsonRpcPayload: &types.JSONRPCResponsePayload{ - Id: make([]byte, 0), + // TODO_BLOCKER(@red-0ne): This MUST match the Id provided by the request. + // If JSON-RPC request is not unmarshaled yet (i.e. can't extract ID), it SHOULD be a random ID. + Id: 0, Jsonrpc: "2.0", Error: &types.JSONRPCResponseError{ // Using conventional error code indicating internal server error. diff --git a/pkg/client/block/client.go b/pkg/client/block/client.go index 375171d28..7ded373f2 100644 --- a/pkg/client/block/client.go +++ b/pkg/client/block/client.go @@ -75,6 +75,9 @@ type eventBytesToBlockMapFn = func( ) (client.Block, bool) // NewBlockClient creates a new block client from the given dependencies and cometWebsocketURL. +// +// Required dependencies: +// - client.EventsQueryClient func NewBlockClient( ctx context.Context, deps depinject.Config, diff --git a/pkg/client/events_query/client.go b/pkg/client/events_query/client.go index 88ace493f..34a7d35d2 100644 --- a/pkg/client/events_query/client.go +++ b/pkg/client/events_query/client.go @@ -62,6 +62,11 @@ func (ebc *eventsBytesAndConn) Close() { _ = ebc.conn.Close() } +// NewEventsQueryClient returns a new events query client which is used to +// subscribe to on-chain events matching the given query. +// +// Available options: +// - WithDialer func NewEventsQueryClient(cometWebsocketURL string, opts ...client.EventsQueryClientOption) client.EventsQueryClient { evtClient := &eventsQueryClient{ cometWebsocketURL: cometWebsocketURL, diff --git a/pkg/client/tx/client.go b/pkg/client/tx/client.go index a02b32542..39f5208e0 100644 --- a/pkg/client/tx/client.go +++ b/pkg/client/tx/client.go @@ -105,6 +105,15 @@ type TxEvent struct { // validateConfigAndSetDefaults method. // 5. Subscribes the client to its own transactions. This step might be // reconsidered for relocation to a potential Start() method in the future. +// +// Required dependencies: +// - client.TxContext +// - client.EventsQueryClient +// - client.BlockClient +// +// Available options: +// - WithSigningKeyName +// - WithCommitTimeoutHeightOffset func NewTxClient( ctx context.Context, deps depinject.Config, @@ -517,7 +526,7 @@ func (tClient *txClient) txEventFromEventBz( return either.Error[*TxEvent](ErrUnmarshalTx.Wrapf("%s", err)), true } - // For successful unmarshalling, return the TxEvent. + // For successful unmarshaling, return the TxEvent. return either.Success(txEvt), false } diff --git a/pkg/client/tx/context.go b/pkg/client/tx/context.go index eca32f943..2bfa0da5f 100644 --- a/pkg/client/tx/context.go +++ b/pkg/client/tx/context.go @@ -12,6 +12,7 @@ import ( authclient "github.com/cosmos/cosmos-sdk/x/auth/client" "github.com/pokt-network/poktroll/pkg/client" + "github.com/pokt-network/poktroll/pkg/relayer" ) var _ client.TxContext = (*cosmosTxContext)(nil) @@ -21,7 +22,7 @@ var _ client.TxContext = (*cosmosTxContext)(nil) type cosmosTxContext struct { // Holds cosmos-sdk client context. // (see: https://pkg.go.dev/github.com/cosmos/cosmos-sdk@v0.47.5/client#Context) - clientCtx cosmosclient.Context + clientCtx relayer.TxClientContext // Holds the cosmos-sdk transaction factory. // (see: https://pkg.go.dev/github.com/cosmos/cosmos-sdk@v0.47.5/client/tx#Factory) txFactory cosmostx.Factory @@ -30,6 +31,10 @@ type cosmosTxContext struct { // NewTxContext initializes a new cosmosTxContext with the given dependencies. // It uses depinject to populate its members and returns a client.TxContext // interface type. +// +// Required dependencies: +// - cosmosclient.Context +// - cosmostx.Factory func NewTxContext(deps depinject.Config) (client.TxContext, error) { txCtx := cosmosTxContext{} @@ -60,7 +65,7 @@ func (txCtx cosmosTxContext) SignTx( ) error { return authclient.SignTx( txCtx.txFactory, - txCtx.clientCtx, + cosmosclient.Context(txCtx.clientCtx), signingKeyName, txBuilder, offline, overwriteSig, @@ -80,8 +85,8 @@ func (txCtx cosmosTxContext) EncodeTx(txBuilder cosmosclient.TxBuilder) ([]byte, // BroadcastTx broadcasts the given transaction to the network, blocking until the check-tx // ABCI operation completes and returns a TxResponse of the transaction status at that point in time. func (txCtx cosmosTxContext) BroadcastTx(txBytes []byte) (*cosmostypes.TxResponse, error) { - return txCtx.clientCtx.BroadcastTxAsync(txBytes) - //return txCtx.clientCtx.BroadcastTxSync(txBytes) + clientCtx := cosmosclient.Context(txCtx.clientCtx) + return clientCtx.BroadcastTxAsync(txBytes) } // QueryTx queries the transaction based on its hash and optionally provides proof diff --git a/pkg/client/tx/errors.go b/pkg/client/tx/errors.go index 1e43f1d05..328d7a51d 100644 --- a/pkg/client/tx/errors.go +++ b/pkg/client/tx/errors.go @@ -32,7 +32,7 @@ var ( // byte data isn't recognized as a valid transaction event representation. ErrNonTxEventBytes = errorsmod.Register(codespace, 9, "attempted to deserialize non-tx event bytes") - // ErrUnmarshalTx signals a failure in the unmarshalling process of a transaction. + // ErrUnmarshalTx signals a failure in the unmarshaling process of a transaction. // This error is triggered when the system encounters issues translating a set of // bytes into the corresponding Tx structure or object. ErrUnmarshalTx = errorsmod.Register(codespace, 10, "failed to unmarshal tx") diff --git a/pkg/deps/config/config.go b/pkg/deps/config/config.go new file mode 100644 index 000000000..9621fa1de --- /dev/null +++ b/pkg/deps/config/config.go @@ -0,0 +1,73 @@ +package config + +import ( + "context" + + "cosmossdk.io/depinject" + "github.com/spf13/cobra" + + "github.com/pokt-network/poktroll/pkg/client/block" + eventsquery "github.com/pokt-network/poktroll/pkg/client/events_query" +) + +// SupplierFn is a function that is used to supply a depinject config. +type SupplierFn func( + context.Context, + depinject.Config, + *cobra.Command, +) (depinject.Config, error) + +// SupplyConfig supplies a depinject config by calling each of the supplied +// supplier functions in order and passing the result of each supplier to the +// next supplier, chaining them together. +func SupplyConfig( + ctx context.Context, + cmd *cobra.Command, + suppliers []SupplierFn, +) (deps depinject.Config, err error) { + // Initialize deps to with empty depinject config. + deps = depinject.Configs() + for _, supplyFn := range suppliers { + deps, err = supplyFn(ctx, deps, cmd) + if err != nil { + return nil, err + } + } + return deps, nil +} + +// NewSupplyEventsQueryClientFn constructs an EventsQueryClient instance and returns +// a new depinject.Config which is supplied with the given deps and the new +// EventsQueryClient. +func NewSupplyEventsQueryClientFn( + pocketNodeWebsocketUrl string, +) SupplierFn { + return func( + _ context.Context, + deps depinject.Config, + _ *cobra.Command, + ) (depinject.Config, error) { + eventsQueryClient := eventsquery.NewEventsQueryClient(pocketNodeWebsocketUrl) + + return depinject.Configs(deps, depinject.Supply(eventsQueryClient)), nil + } +} + +// NewSupplyBlockClientFn returns a function with constructs a BlockClient instance +// with the given nodeURL and returns a new +// depinject.Config which is supplied with the given deps and the new +// BlockClient. +func NewSupplyBlockClientFn(pocketNodeWebsocketUrl string) SupplierFn { + return func( + ctx context.Context, + deps depinject.Config, + _ *cobra.Command, + ) (depinject.Config, error) { + blockClient, err := block.NewBlockClient(ctx, deps, pocketNodeWebsocketUrl) + if err != nil { + return nil, err + } + + return depinject.Configs(deps, depinject.Supply(blockClient)), nil + } +} diff --git a/pkg/relayer/cmd/cmd.go b/pkg/relayer/cmd/cmd.go new file mode 100644 index 000000000..4f43c4ff2 --- /dev/null +++ b/pkg/relayer/cmd/cmd.go @@ -0,0 +1,392 @@ +package cmd + +import ( + "context" + "fmt" + "log" + "net/url" + + "cosmossdk.io/depinject" + cosmosclient "github.com/cosmos/cosmos-sdk/client" + cosmosflags "github.com/cosmos/cosmos-sdk/client/flags" + cosmostx "github.com/cosmos/cosmos-sdk/client/tx" + "github.com/spf13/cobra" + + "github.com/pokt-network/poktroll/cmd/signals" + "github.com/pokt-network/poktroll/pkg/client/block" + eventsquery "github.com/pokt-network/poktroll/pkg/client/events_query" + "github.com/pokt-network/poktroll/pkg/client/supplier" + "github.com/pokt-network/poktroll/pkg/client/tx" + "github.com/pokt-network/poktroll/pkg/deps/config" + "github.com/pokt-network/poktroll/pkg/relayer" + "github.com/pokt-network/poktroll/pkg/relayer/miner" + "github.com/pokt-network/poktroll/pkg/relayer/proxy" + "github.com/pokt-network/poktroll/pkg/relayer/session" +) + +// We're `explicitly omitting default` so the relayer crashes if these aren't specified. +const omittedDefaultFlagValue = "explicitly omitting default" + +// TODO_CONSIDERATION: Consider moving all flags defined in `/pkg` to a `flags.go` file. +var ( + flagSigningKeyName string + flagSmtStorePath string + flagNetworkNodeUrl string + flagQueryNodeUrl string +) + +func RelayerCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "relayminer", + Short: "Run a relay miner", + Long: `Run a relay miner. The relay miner process configures and starts +relay servers for each service the supplier actor identified by --signing-key is +staked for (configured on-chain). + +Relay requests received by the relay servers are validated and proxied to their +respective service endpoints, maintained by the relayer off-chain. The responses +are then signed and sent back to the requesting application. + +For each successfully served relay, the miner will hash and compare its difficulty +against an on-chain threshold. If the difficulty is sufficient, it is applicable +to relay volume and therefore rewards. Such relays are inserted into and persisted +via an SMT KV store. The miner will monitor the current block height and periodically +submit claim and proof messages according to the protocol as sessions become eligible +for such operations.`, + RunE: runRelayer, + } + + cmd.Flags().String(cosmosflags.FlagKeyringBackend, "", "Select keyring's backend (os|file|kwallet|pass|test)") + + // TODO_TECHDEBT: integrate these flags with the client context (i.e. cosmosflags, config, viper, etc.) + // This is simpler to do with server-side configs (see rootCmd#PersistentPreRunE) and requires more effort than currently justifiable. + cmd.Flags().StringVar(&flagSigningKeyName, "signing-key", "", "Name of the key to sign transactions") + // TODO_TECHDEBT(#137): This, alongside other flags, should be part of a config file suppliers provide. + cmd.Flags().StringVar(&flagSmtStorePath, "smt-store", "smt", "Path to where the data backing SMT KV store exists on disk") + // Communication flags + cmd.Flags().StringVar(&flagNetworkNodeUrl, "network-node", omittedDefaultFlagValue, "tcp://: to a pocket node that gossips transactions throughout the network (may or may not be the sequencer") + cmd.Flags().StringVar(&flagQueryNodeUrl, "query-node", omittedDefaultFlagValue, "tcp://: to a full pocket node for reading data and listening for on-chain events") + cmd.Flags().String(cosmosflags.FlagNode, omittedDefaultFlagValue, "registering the default cosmos node flag; needed to initialize the cosmostx and query contexts correctly and uses flagQueryNodeUrl underneath") + + return cmd +} + +func runRelayer(cmd *cobra.Command, _ []string) error { + ctx, cancelCtx := context.WithCancel(cmd.Context()) + // Ensure context cancellation. + defer cancelCtx() + + // Handle interrupt and kill signals asynchronously. + signals.GoOnExitSignal(cancelCtx) + + // Sets up the following dependencies: + // Miner, EventsQueryClient, BlockClient, cosmosclient.Context, TxFactory, + // TxContext, TxClient, SupplierClient, RelayerProxy, RelayerSessionsManager. + deps, err := setupRelayerDependencies(ctx, cmd) + if err != nil { + return err + } + + relayMiner, err := relayer.NewRelayMiner(ctx, deps) + if err != nil { + return err + } + + // Start the relay miner + log.Println("INFO: Starting relay miner...") + if err := relayMiner.Start(ctx); err != nil { + return err + } + + log.Println("INFO: Relay miner stopped; exiting") + return nil +} + +// setupRelayerDependencies sets up all the dependencies the relay miner needs +// to run by building the dependency tree from the leaves up, incrementally +// supplying each component to an accumulating depinject.Config: +// Miner, EventsQueryClient, BlockClient, cosmosclient.Context, TxFactory, TxContext, +// TxClient, SupplierClient, RelayerProxy, RelayerSessionsManager. +func setupRelayerDependencies( + ctx context.Context, + cmd *cobra.Command, +) (deps depinject.Config, err error) { + pocketNodeWebsocketUrl, err := getPocketNodeWebsocketUrl() + if err != nil { + return nil, err + } + + supplierFuncs := []config.SupplierFn{ + config.NewSupplyEventsQueryClientFn(pocketNodeWebsocketUrl), // leaf + config.NewSupplyBlockClientFn(pocketNodeWebsocketUrl), + supplyMiner, // leaf + supplyQueryClientContext, // leaf + supplyTxClientContext, // leaf + supplyTxFactory, + supplyTxContext, + supplyTxClient, + supplySupplierClient, + supplyRelayerProxy, + supplyRelayerSessionsManager, + } + + return config.SupplyConfig(ctx, cmd, supplierFuncs) +} + +// getPocketNodeWebsocketUrl returns the websocket URL of the Pocket Node to +// connect to for subscribing to on-chain events. +func getPocketNodeWebsocketUrl() (string, error) { + if flagQueryNodeUrl == omittedDefaultFlagValue { + return "", fmt.Errorf("--query-node flag is required") + } + + pocketNodeURL, err := url.Parse(flagQueryNodeUrl) + if err != nil { + return "", err + } + + return fmt.Sprintf("ws://%s/websocket", pocketNodeURL.Host), nil +} + +// newSupplyEventsQueryClientFn constructs an EventsQueryClient instance and returns +// a new depinject.Config which is supplied with the given deps and the new +// EventsQueryClient. +func newSupplyEventsQueryClientFn( + pocketNodeWebsocketUrl string, +) config.SupplierFn { + return func( + _ context.Context, + deps depinject.Config, + _ *cobra.Command, + ) (depinject.Config, error) { + eventsQueryClient := eventsquery.NewEventsQueryClient(pocketNodeWebsocketUrl) + + return depinject.Configs(deps, depinject.Supply(eventsQueryClient)), nil + } +} + +// newSupplyBlockClientFn returns a function with constructs a BlockClient instance +// with the given nodeURL and returns a new +// depinject.Config which is supplied with the given deps and the new +// BlockClient. +func newSupplyBlockClientFn(pocketNodeWebsocketUrl string) config.SupplierFn { + return func( + ctx context.Context, + deps depinject.Config, + _ *cobra.Command, + ) (depinject.Config, error) { + blockClient, err := block.NewBlockClient(ctx, deps, pocketNodeWebsocketUrl) + if err != nil { + return nil, err + } + + return depinject.Configs(deps, depinject.Supply(blockClient)), nil + } +} + +// supplyMiner constructs a Miner instance and returns a new depinject.Config +// which is supplied with the given deps and the new Miner. +func supplyMiner( + _ context.Context, + deps depinject.Config, + _ *cobra.Command, +) (depinject.Config, error) { + mnr, err := miner.NewMiner() + if err != nil { + return nil, err + } + + return depinject.Configs(deps, depinject.Supply(mnr)), nil +} + +// supplyQueryClientContext returns a function with constructs a ClientContext +// instance with the given cmd and returns a new depinject.Config which is +// supplied with the given deps and the new ClientContext. +func supplyQueryClientContext( + _ context.Context, + deps depinject.Config, + cmd *cobra.Command, +) (depinject.Config, error) { + // Set --node flag to the --query-node for the client context + // This flag is read by cosmosclient.GetClientQueryContext. + err := cmd.Flags().Set(cosmosflags.FlagNode, flagQueryNodeUrl) + if err != nil { + return nil, err + } + + // NB: Currently, the implementations of GetClientTxContext() and + // GetClientQueryContext() are identical, allowing for their interchangeable + // use in both querying and transaction operations. However, in order to support + // independent configuration of client contexts for distinct querying and + // transacting purposes. E.g.: transactions are dispatched to the sequencer + // while queries are handled by a trusted full-node. + queryClientCtx, err := cosmosclient.GetClientQueryContext(cmd) + if err != nil { + return nil, err + } + deps = depinject.Configs(deps, depinject.Supply( + relayer.QueryClientContext(queryClientCtx), + )) + return deps, nil +} + +// supplyTxClientContext constructs a cosmosclient.Context instance and returns a +// new depinject.Config which is supplied with the given deps and the new +// cosmosclient.Context. +func supplyTxClientContext( + _ context.Context, + deps depinject.Config, + cmd *cobra.Command, +) (depinject.Config, error) { + // Set --node flag to the --network-node for this client context. + // This flag is read by cosmosclient.GetClientTxContext. + err := cmd.Flags().Set(cosmosflags.FlagNode, flagNetworkNodeUrl) + if err != nil { + return nil, err + } + + // NB: Currently, the implementations of GetClientTxContext() and + // GetClientQueryContext() are identical, allowing for their interchangeable + // use in both querying and transaction operations. However, in order to support + // independent configuration of client contexts for distinct querying and + // transacting purposes. E.g.: transactions are dispatched to the sequencer + // while queries are handled by a trusted full-node. + txClientCtx, err := cosmosclient.GetClientTxContext(cmd) + if err != nil { + return nil, err + } + deps = depinject.Configs(deps, depinject.Supply( + relayer.TxClientContext(txClientCtx), + )) + return deps, nil +} + +// supplyTxFactory constructs a cosmostx.Factory instance and returns a new +// depinject.Config which is supplied with the given deps and the new +// cosmostx.Factory. +func supplyTxFactory( + _ context.Context, + deps depinject.Config, + cmd *cobra.Command, +) (depinject.Config, error) { + var txClientCtx relayer.TxClientContext + if err := depinject.Inject(deps, &txClientCtx); err != nil { + return nil, err + } + + clientCtx := cosmosclient.Context(txClientCtx) + clientFactory, err := cosmostx.NewFactoryCLI(clientCtx, cmd.Flags()) + if err != nil { + return nil, err + } + + return depinject.Configs(deps, depinject.Supply(clientFactory)), nil +} + +func supplyTxContext( + _ context.Context, + deps depinject.Config, + _ *cobra.Command, +) (depinject.Config, error) { + txContext, err := tx.NewTxContext(deps) + if err != nil { + return nil, err + } + + return depinject.Configs(deps, depinject.Supply(txContext)), nil +} + +// supplyTxClient constructs a TxClient instance and returns a new +// depinject.Config which is supplied with the given deps and the new TxClient. +func supplyTxClient( + ctx context.Context, + deps depinject.Config, + _ *cobra.Command, +) (depinject.Config, error) { + txClient, err := tx.NewTxClient( + ctx, + deps, + tx.WithSigningKeyName(flagSigningKeyName), + // TODO_TECHDEBT: populate this from some config. + tx.WithCommitTimeoutBlocks(tx.DefaultCommitTimeoutHeightOffset), + ) + if err != nil { + return nil, err + } + + return depinject.Configs(deps, depinject.Supply(txClient)), nil +} + +// supplySupplierClient constructs a SupplierClient instance and returns a new +// depinject.Config which is supplied with the given deps and the new +// SupplierClient. +func supplySupplierClient( + _ context.Context, + deps depinject.Config, + _ *cobra.Command, +) (depinject.Config, error) { + supplierClient, err := supplier.NewSupplierClient( + deps, + supplier.WithSigningKeyName(flagSigningKeyName), + ) + if err != nil { + return nil, err + } + + return depinject.Configs(deps, depinject.Supply(supplierClient)), nil +} + +// supplyRelayerProxy constructs a RelayerProxy instance and returns a new +// depinject.Config which is supplied with the given deps and the new +// RelayerProxy. +func supplyRelayerProxy( + _ context.Context, + deps depinject.Config, + _ *cobra.Command, +) (depinject.Config, error) { + // TODO_BLOCKER:(#137, @red-0ne): This MUST be populated via the `relayer.json` config file + // TODO_UPNEXT(@okdas): this hostname should be updated to match that of the in-tilt anvil service. + proxyServiceURL, err := url.Parse("http://localhost:8547/") + if err != nil { + return nil, err + } + + // TODO_TECHDEBT(#137, #130): Once the `relayer.json` config file is implemented AND a local LLM RPC service + // is supported on LocalNet, this needs to be expanded to include more than one service. The ability to support + // multiple services is already in place but currently (as seen below) is hardcoded. + proxiedServiceEndpoints := map[string]url.URL{ + "anvil": *proxyServiceURL, + } + + relayerProxy, err := proxy.NewRelayerProxy( + deps, + proxy.WithSigningKeyName(flagSigningKeyName), + proxy.WithProxiedServicesEndpoints(proxiedServiceEndpoints), + ) + if err != nil { + return nil, err + } + + return depinject.Configs(deps, depinject.Supply(relayerProxy)), nil +} + +// supplyRelayerSessionsManager constructs a RelayerSessionsManager instance +// and returns a new depinject.Config which is supplied with the given deps and +// the new RelayerSessionsManager. +// See the comment next to `flagQueryNodeUrl` (if it still exists) on how/why +// we have multiple flags pointing to different node types. +func supplyRelayerSessionsManager( + ctx context.Context, + deps depinject.Config, + _ *cobra.Command, +) (depinject.Config, error) { + relayerSessionsManager, err := session.NewRelayerSessions( + ctx, deps, + session.WithStoresDirectory(flagSmtStorePath), + ) + if err != nil { + return nil, err + } + + return depinject.Configs(deps, depinject.Supply(relayerSessionsManager)), nil +} diff --git a/pkg/relayer/interface.go b/pkg/relayer/interface.go index 539361f2d..d58d8149a 100644 --- a/pkg/relayer/interface.go +++ b/pkg/relayer/interface.go @@ -3,6 +3,7 @@ package relayer import ( "context" + "github.com/cosmos/cosmos-sdk/client" "github.com/pokt-network/smt" "github.com/pokt-network/poktroll/pkg/observable" @@ -11,6 +12,18 @@ import ( sharedtypes "github.com/pokt-network/poktroll/x/shared/types" ) +// TxClientContext is used to distinguish a cosmosclient.Context intended for use +// in transactions from others. +// This type is intentionally not an alias in order to make this distinction clear +// to the dependency injector +type TxClientContext client.Context + +// QueryClientContext is used to distinguish a cosmosclient.Context intended for use +// in queries from others. +// This type is intentionally not an alias in order to make this distinction clear +// to the dependency injector +type QueryClientContext client.Context + // Miner is responsible for observing servedRelayObs, hashing and checking the // difficulty of each, finally publishing those with sufficient difficulty to // minedRelayObs as they are applicable for relay volume. diff --git a/pkg/relayer/protocol/block_heights.go b/pkg/relayer/protocol/block_heights.go index b372376f7..fcf27ec10 100644 --- a/pkg/relayer/protocol/block_heights.go +++ b/pkg/relayer/protocol/block_heights.go @@ -14,7 +14,7 @@ import ( // TODO_TEST(@bryanchriswhite): Add test coverage and more logs func GetEarliestCreateClaimHeight(createClaimWindowStartBlock client.Block) int64 { createClaimWindowStartBlockHash := createClaimWindowStartBlock.Hash() - log.Printf("using createClaimWindowStartBlock %d's hash %x as randomness", createClaimWindowStartBlock.Height(), createClaimWindowStartBlockHash) + log.Printf("DEBUG: using createClaimWindowStartBlock %d's hash %x as randomness", createClaimWindowStartBlock.Height(), createClaimWindowStartBlockHash) rngSeed, _ := binary.Varint(createClaimWindowStartBlockHash) randomNumber := rand.NewSource(rngSeed).Int63() @@ -32,7 +32,7 @@ func GetEarliestCreateClaimHeight(createClaimWindowStartBlock client.Block) int6 // TODO_TEST(@bryanchriswhite): Add test coverage and more logs func GetEarliestSubmitProofHeight(submitProofWindowStartBlock client.Block) int64 { earliestSubmitProofBlockHash := submitProofWindowStartBlock.Hash() - log.Printf("using submitProofWindowStartBlock %d's hash %x as randomness", submitProofWindowStartBlock.Height(), earliestSubmitProofBlockHash) + log.Printf("DEBUG: using submitProofWindowStartBlock %d's hash %x as randomness", submitProofWindowStartBlock.Height(), earliestSubmitProofBlockHash) rngSeed, _ := binary.Varint(earliestSubmitProofBlockHash) randomNumber := rand.NewSource(rngSeed).Int63() diff --git a/pkg/relayer/proxy/error_reply.go b/pkg/relayer/proxy/error_reply.go index d881057a0..617f3b917 100644 --- a/pkg/relayer/proxy/error_reply.go +++ b/pkg/relayer/proxy/error_reply.go @@ -12,11 +12,13 @@ import ( // the caller pass it along with the error if available. // TODO_TECHDEBT: This method should be aware of the nature of the error to use the appropriate JSONRPC // Code, Message and Data. Possibly by augmenting the passed in error with the adequate information. -func (j *jsonRPCServer) replyWithError(writer http.ResponseWriter, err error) { +func (jsrv *jsonRPCServer) replyWithError(writer http.ResponseWriter, err error) { relayResponse := &types.RelayResponse{ Payload: &types.RelayResponse_JsonRpcPayload{ JsonRpcPayload: &types.JSONRPCResponsePayload{ - Id: make([]byte, 0), + // TODO_BLOCKER(@red-0ne): This MUST match the Id provided by the request. + // If JSON-RPC request is not unmarshaled yet (i.e. can't extract ID), it SHOULD be a random ID. + Id: 0, Jsonrpc: "2.0", Error: &types.JSONRPCResponseError{ // Using conventional error code indicating internal server error. diff --git a/pkg/relayer/proxy/jsonrpc.go b/pkg/relayer/proxy/jsonrpc.go index 8ea953d6f..113d6285e 100644 --- a/pkg/relayer/proxy/jsonrpc.go +++ b/pkg/relayer/proxy/jsonrpc.go @@ -19,10 +19,6 @@ type jsonRPCServer struct { // service is the service that the server is responsible for. service *sharedtypes.Service - // serverEndpoint is the advertised endpoint configuration that the server uses to - // listen for incoming relay requests. - serverEndpoint *sharedtypes.SupplierEndpoint - // proxiedServiceEndpoint is the address of the proxied service that the server relays requests to. proxiedServiceEndpoint url.URL @@ -43,15 +39,14 @@ type jsonRPCServer struct { // a RelayServer that listens to incoming RelayRequests. func NewJSONRPCServer( service *sharedtypes.Service, - supplierEndpoint *sharedtypes.SupplierEndpoint, + supplierEndpointHost string, proxiedServiceEndpoint url.URL, servedRelaysProducer chan<- *types.Relay, proxy relayer.RelayerProxy, ) relayer.RelayServer { return &jsonRPCServer{ service: service, - serverEndpoint: supplierEndpoint, - server: &http.Server{Addr: supplierEndpoint.Url}, + server: &http.Server{Addr: supplierEndpointHost}, relayerProxy: proxy, proxiedServiceEndpoint: proxiedServiceEndpoint, servedRelaysProducer: servedRelaysProducer, @@ -67,6 +62,9 @@ func (jsrv *jsonRPCServer) Start(ctx context.Context) error { jsrv.server.Shutdown(ctx) }() + // Set the HTTP handler. + jsrv.server.Handler = jsrv + return jsrv.server.ListenAndServe() } @@ -76,8 +74,8 @@ func (jsrv *jsonRPCServer) Stop(ctx context.Context) error { } // Service returns the JSON-RPC service. -func (j *jsonRPCServer) Service() *sharedtypes.Service { - return j.service +func (jsrv *jsonRPCServer) Service() *sharedtypes.Service { + return jsrv.service } // ServeHTTP listens for incoming relay requests. It implements the respective @@ -86,6 +84,9 @@ func (j *jsonRPCServer) Service() *sharedtypes.Service { // (see https://pkg.go.dev/net/http#Handler) func (jsrv *jsonRPCServer) ServeHTTP(writer http.ResponseWriter, request *http.Request) { ctx := request.Context() + + log.Printf("DEBUG: Serving JSON-RPC relay request...") + // Relay the request to the proxied service and build the response that will be sent back to the client. relay, err := jsrv.serveHTTP(ctx, request) if err != nil { @@ -107,7 +108,7 @@ func (jsrv *jsonRPCServer) ServeHTTP(writer http.ResponseWriter, request *http.R relay.Res.Meta.SessionHeader.ApplicationAddress, relay.Res.Meta.SessionHeader.Service.Id, relay.Res.Meta.SessionHeader.SessionStartBlockHeight, - jsrv.serverEndpoint.Url, + jsrv.server.Addr, ) // Emit the relay to the servedRelays observable. @@ -117,6 +118,7 @@ func (jsrv *jsonRPCServer) ServeHTTP(writer http.ResponseWriter, request *http.R // serveHTTP holds the underlying logic of ServeHTTP. func (jsrv *jsonRPCServer) serveHTTP(ctx context.Context, request *http.Request) (*types.Relay, error) { // Extract the relay request from the request body. + log.Printf("DEBUG: Extracting relay request from request body...") relayRequest, err := jsrv.newRelayRequest(request) if err != nil { return nil, err @@ -136,25 +138,27 @@ func (jsrv *jsonRPCServer) serveHTTP(ctx context.Context, request *http.Request) // Get the relayRequest payload's `io.ReadCloser` to add it to the http.Request // that will be sent to the proxied (i.e. staked for) service. // (see https://pkg.go.dev/net/http#Request) Body field type. - var payloadBz []byte - if _, err = relayRequest.Payload.MarshalTo(payloadBz); err != nil { + log.Printf("DEBUG: Getting relay request payload...") + cdc := types.ModuleCdc + payloadBz, err := cdc.MarshalJSON(relayRequest.GetJsonRpcPayload()) + if err != nil { return nil, err } requestBodyReader := io.NopCloser(bytes.NewBuffer(payloadBz)) + log.Printf("DEBUG: Relay request payload: %s", string(payloadBz)) // Build the request to be sent to the native service by substituting // the destination URL's host with the native service's listen address. - destinationURL, err := url.Parse(request.URL.String()) + log.Printf("DEBUG: Building relay request to native service %s...", jsrv.proxiedServiceEndpoint.String()) if err != nil { return nil, err } - destinationURL.Host = jsrv.proxiedServiceEndpoint.Host relayHTTPRequest := &http.Request{ Method: request.Method, Header: request.Header, - URL: destinationURL, - Host: destinationURL.Host, + URL: &jsrv.proxiedServiceEndpoint, + Host: jsrv.proxiedServiceEndpoint.Host, Body: requestBodyReader, } @@ -167,6 +171,7 @@ func (jsrv *jsonRPCServer) serveHTTP(ctx context.Context, request *http.Request) // Build the relay response from the native service response // Use relayRequest.Meta.SessionHeader on the relayResponse session header since it was verified to be valid // and has to be the same as the relayResponse session header. + log.Printf("DEBUG: Building relay response from native service response...") relayResponse, err := jsrv.newRelayResponse(httpResponse, relayRequest.Meta.SessionHeader) if err != nil { return nil, err @@ -176,8 +181,9 @@ func (jsrv *jsonRPCServer) serveHTTP(ctx context.Context, request *http.Request) } // sendRelayResponse marshals the relay response and sends it to the client. -func (j *jsonRPCServer) sendRelayResponse(relayResponse *types.RelayResponse, writer http.ResponseWriter) error { - relayResponseBz, err := relayResponse.Marshal() +func (jsrv *jsonRPCServer) sendRelayResponse(relayResponse *types.RelayResponse, writer http.ResponseWriter) error { + cdc := types.ModuleCdc + relayResponseBz, err := cdc.Marshal(relayResponse) if err != nil { return err } diff --git a/pkg/relayer/proxy/proxy.go b/pkg/relayer/proxy/proxy.go index 9ed38e963..b6c2a5f68 100644 --- a/pkg/relayer/proxy/proxy.go +++ b/pkg/relayer/proxy/proxy.go @@ -7,7 +7,7 @@ import ( "cosmossdk.io/depinject" ringtypes "github.com/athanorlabs/go-dleq/types" - sdkclient "github.com/cosmos/cosmos-sdk/client" + cosmosclient "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/crypto/keyring" accounttypes "github.com/cosmos/cosmos-sdk/x/auth/types" "golang.org/x/sync/errgroup" @@ -72,9 +72,9 @@ type relayerProxy struct { // servedRelays is an observable that notifies the miner about the relays that have been served. servedRelays observable.Observable[*types.Relay] - // servedRelaysProducer is a channel that emits the relays that have been served so that the + // servedRelaysPublishCh is a channel that emits the relays that have been served so that the // servedRelays observable can fan out the notifications to its subscribers. - servedRelaysProducer chan<- *types.Relay + servedRelaysPublishCh chan<- *types.Relay // ringCache is a cache of the public keys used to create the ring for a given application // they are stored in a map of application address to a slice of points on the secp256k1 curve @@ -83,7 +83,7 @@ type relayerProxy struct { ringCacheMutex *sync.RWMutex // clientCtx is the Cosmos' client context used to build the needed query clients and unmarshal their replies. - clientCtx sdkclient.Context + clientCtx relayer.QueryClientContext // supplierAddress is the address of the supplier that the relayer proxy is running for. supplierAddress string @@ -91,6 +91,14 @@ type relayerProxy struct { // NewRelayerProxy creates a new relayer proxy with the given dependencies or returns // an error if the dependencies fail to resolve or the options are invalid. +// +// Required dependencies: +// - cosmosclient.Context +// - client.BlockClient +// +// Available options: +// - WithSigningKeyName +// - WithProxiedServicesEndpoints func NewRelayerProxy( deps depinject.Config, opts ...relayer.RelayerProxyOption, @@ -105,14 +113,18 @@ func NewRelayerProxy( return nil, err } + clientCtx := cosmosclient.Context(rp.clientCtx) servedRelays, servedRelaysProducer := channel.NewObservable[*types.Relay]() rp.servedRelays = servedRelays - rp.servedRelaysProducer = servedRelaysProducer - rp.accountsQuerier = accounttypes.NewQueryClient(rp.clientCtx) - rp.supplierQuerier = suppliertypes.NewQueryClient(rp.clientCtx) - rp.sessionQuerier = sessiontypes.NewQueryClient(rp.clientCtx) + rp.servedRelaysPublishCh = servedRelaysProducer + rp.accountsQuerier = accounttypes.NewQueryClient(clientCtx) + rp.supplierQuerier = suppliertypes.NewQueryClient(clientCtx) + rp.sessionQuerier = sessiontypes.NewQueryClient(clientCtx) + rp.applicationQuerier = apptypes.NewQueryClient(clientCtx) rp.keyring = rp.clientCtx.Keyring + rp.ringCache = make(map[string][]ringtypes.Point) // the key is the appAddress + rp.ringCacheMutex = &sync.RWMutex{} for _, opt := range opts { opt(rp) @@ -125,8 +137,9 @@ func NewRelayerProxy( return rp, nil } -// Start concurrently starts all advertised relay servers and returns an error if any of them fails to start. -// This method is blocking as long as all RelayServers are running. +// Start concurrently starts all advertised relay services and returns an error +// if any of them errors. +// This method IS BLOCKING until all RelayServers are stopped. func (rp *relayerProxy) Start(ctx context.Context) error { // The provided services map is built from the supplier's on-chain advertised information, // which is a runtime parameter that can be changed by the supplier. diff --git a/pkg/relayer/proxy/relay_builders.go b/pkg/relayer/proxy/relay_builders.go index beb400cad..c2e9775c4 100644 --- a/pkg/relayer/proxy/relay_builders.go +++ b/pkg/relayer/proxy/relay_builders.go @@ -2,6 +2,7 @@ package proxy import ( "io" + "log" "net/http" "github.com/pokt-network/poktroll/x/service/types" @@ -9,24 +10,25 @@ import ( ) // newRelayRequest builds a RelayRequest from an http.Request. -func (j *jsonRPCServer) newRelayRequest(request *http.Request) (*types.RelayRequest, error) { +func (jsrv *jsonRPCServer) newRelayRequest(request *http.Request) (*types.RelayRequest, error) { requestBz, err := io.ReadAll(request.Body) if err != nil { return nil, err } - var relayRequest types.RelayRequest - if err := relayRequest.Unmarshal(requestBz); err != nil { + log.Printf("DEBUG: Unmarshaling relay request...") + var relayReq types.RelayRequest + if err := relayReq.Unmarshal(requestBz); err != nil { return nil, err } - return &relayRequest, nil + return &relayReq, nil } // newRelayResponse builds a RelayResponse from an http.Response and a SessionHeader. // It also signs the RelayResponse and assigns it to RelayResponse.Meta.SupplierSignature. // If the response has a non-nil body, it will be parsed as a JSONRPCResponsePayload. -func (j *jsonRPCServer) newRelayResponse( +func (jsrv *jsonRPCServer) newRelayResponse( response *http.Response, sessionHeader *sessiontypes.SessionHeader, ) (*types.RelayResponse, error) { @@ -39,15 +41,19 @@ func (j *jsonRPCServer) newRelayResponse( return nil, err } - jsonRPCResponse := &types.JSONRPCResponsePayload{} - if err := jsonRPCResponse.Unmarshal(responseBz); err != nil { + log.Printf("DEBUG: Unmarshaling relay response...") + relayResponsePayload := &types.RelayResponse_JsonRpcPayload{} + jsonPayload := &types.JSONRPCResponsePayload{} + cdc := types.ModuleCdc + if err := cdc.UnmarshalJSON(responseBz, jsonPayload); err != nil { return nil, err } + relayResponsePayload.JsonRpcPayload = jsonPayload - relayResponse.Payload = &types.RelayResponse_JsonRpcPayload{JsonRpcPayload: jsonRPCResponse} + relayResponse.Payload = &types.RelayResponse_JsonRpcPayload{JsonRpcPayload: jsonPayload} // Sign the relay response and add the signature to the relay response metadata - if err = j.relayerProxy.SignRelayResponse(relayResponse); err != nil { + if err = jsrv.relayerProxy.SignRelayResponse(relayResponse); err != nil { return nil, err } diff --git a/pkg/relayer/proxy/relay_verifier.go b/pkg/relayer/proxy/relay_verifier.go index e64955d55..ba8bf7e32 100644 --- a/pkg/relayer/proxy/relay_verifier.go +++ b/pkg/relayer/proxy/relay_verifier.go @@ -2,6 +2,7 @@ package proxy import ( "context" + "log" sdkerrors "cosmossdk.io/errors" ring_secp256k1 "github.com/athanorlabs/go-dleq/secp256k1" @@ -20,6 +21,7 @@ func (rp *relayerProxy) VerifyRelayRequest( service *sharedtypes.Service, ) error { // extract the relay request's ring signature + log.Printf("DEBUG: Verifying relay request signature...") signature := relayRequest.Meta.Signature if signature == nil { return sdkerrors.Wrapf( @@ -73,6 +75,7 @@ func (rp *relayerProxy) VerifyRelayRequest( } // Query for the current session to check if relayRequest sessionId matches the current session. + log.Printf("DEBUG: Verifying relay request session...") currentBlock := rp.blockClient.LatestBlock(ctx) sessionQuery := &sessiontypes.QueryGetSessionRequest{ ApplicationAddress: appAddress, diff --git a/pkg/relayer/proxy/rings.go b/pkg/relayer/proxy/rings.go index 59a19ae70..86651247d 100644 --- a/pkg/relayer/proxy/rings.go +++ b/pkg/relayer/proxy/rings.go @@ -6,9 +6,12 @@ package proxy import ( "context" "fmt" + "log" ring_secp256k1 "github.com/athanorlabs/go-dleq/secp256k1" ringtypes "github.com/athanorlabs/go-dleq/types" + "github.com/cosmos/cosmos-sdk/codec" + codectypes "github.com/cosmos/cosmos-sdk/codec/types" "github.com/cosmos/cosmos-sdk/crypto/keys/secp256k1" accounttypes "github.com/cosmos/cosmos-sdk/x/auth/types" ring "github.com/noot/ring-go" @@ -28,7 +31,10 @@ func (rp *relayerProxy) getRingForAppAddress(ctx context.Context, appAddress str var err error if !ok { // if the ring is not in the cache, get it from the application module + log.Printf("DEBUG: Ring not found in cache for %s, fetching from application module...", appAddress) points, err = rp.getDelegatedPubKeysForAddress(ctx, appAddress) + } else { + log.Printf("DEBUG: Ring found in cache for %s", appAddress) } if err != nil { return nil, err @@ -50,8 +56,8 @@ func (rp *relayerProxy) getDelegatedPubKeysForAddress( ctx context.Context, appAddress string, ) ([]ringtypes.Point, error) { - rp.ringCacheMutex.RLock() - defer rp.ringCacheMutex.RUnlock() + rp.ringCacheMutex.Lock() + defer rp.ringCacheMutex.Unlock() // get the application's on chain state req := &apptypes.QueryGetApplicationRequest{Address: appAddress} @@ -101,8 +107,11 @@ func (rp *relayerProxy) addressesToPoints(ctx context.Context, addresses []strin if err != nil { return nil, fmt.Errorf("unable to get account for address: %s [%w]", addr, err) } - acc := new(accounttypes.BaseAccount) - if err := acc.Unmarshal(pubKeyRes.Account.Value); err != nil { + var acc accounttypes.AccountI + reg := codectypes.NewInterfaceRegistry() + accounttypes.RegisterInterfaces(reg) + cdc := codec.NewProtoCodec(reg) + if err := cdc.UnpackAny(pubKeyRes.Account, &acc); err != nil { return nil, fmt.Errorf("unable to deserialise account for address: %s [%w]", addr, err) } key := acc.GetPubKey() diff --git a/pkg/relayer/proxy/server_builder.go b/pkg/relayer/proxy/server_builder.go index a5abc47bd..3846ac3af 100644 --- a/pkg/relayer/proxy/server_builder.go +++ b/pkg/relayer/proxy/server_builder.go @@ -2,6 +2,8 @@ package proxy import ( "context" + "log" + "net/url" "github.com/pokt-network/poktroll/pkg/relayer" sharedtypes "github.com/pokt-network/poktroll/x/shared/types" @@ -13,7 +15,12 @@ import ( // is responsible for listening for incoming relay requests and relaying them to the supported proxied service. func (rp *relayerProxy) BuildProvidedServices(ctx context.Context) error { // Get the supplier address from the keyring - supplierAddress, err := rp.keyring.Key(rp.signingKeyName) + supplierKey, err := rp.keyring.Key(rp.signingKeyName) + if err != nil { + return err + } + + supplierAddress, err := supplierKey.GetAddress() if err != nil { return err } @@ -32,19 +39,30 @@ func (rp *relayerProxy) BuildProvidedServices(ctx context.Context) error { for _, serviceConfig := range services { service := serviceConfig.Service proxiedServicesEndpoints := rp.proxiedServicesEndpoints[service.Id] - serviceEndpoints := make([]relayer.RelayServer, len(serviceConfig.Endpoints)) + var serviceEndpoints []relayer.RelayServer for _, endpoint := range serviceConfig.Endpoints { + url, err := url.Parse(endpoint.Url) + if err != nil { + return err + } + supplierEndpointHost := url.Host + var server relayer.RelayServer + log.Printf( + "INFO: starting relay server for service %s at endpoint %s", + service.Id, endpoint.Url, + ) + // Switch to the RPC type to create the appropriate RelayServer switch endpoint.RpcType { case sharedtypes.RPCType_JSON_RPC: server = NewJSONRPCServer( service, - endpoint, + supplierEndpointHost, proxiedServicesEndpoints, - rp.servedRelaysProducer, + rp.servedRelaysPublishCh, rp, ) default: diff --git a/pkg/relayer/session/claim.go b/pkg/relayer/session/claim.go index 712e7a9e5..9401b4af7 100644 --- a/pkg/relayer/session/claim.go +++ b/pkg/relayer/session/claim.go @@ -106,6 +106,9 @@ func (rs *relayerSessionsManager) newMapClaimSessionFn( return either.Error[relayer.SessionTree](err), false } + latestBlock := rs.blockClient.LatestBlock(ctx) + log.Printf("INFO: currentBlock: %d, submitting claim", latestBlock.Height()+1) + sessionHeader := session.GetSessionHeader() if err := rs.supplierClient.CreateClaim(ctx, *sessionHeader, claimRoot); err != nil { failedCreateClaimSessionsPublishCh <- session diff --git a/pkg/relayer/session/proof.go b/pkg/relayer/session/proof.go index 4a7c415aa..00f9b3df5 100644 --- a/pkg/relayer/session/proof.go +++ b/pkg/relayer/session/proof.go @@ -30,8 +30,7 @@ func (rs *relayerSessionsManager) submitProofs( rs.mapWaitForEarliestSubmitProofHeight, ) - failedSubmitProofSessionsObs, failedSubmitProofSessionsPublishCh := - channel.NewObservable[relayer.SessionTree]() + failedSubmitProofSessionsObs, failedSubmitProofSessionsPublishCh := channel.NewObservable[relayer.SessionTree]() // Map sessionsWithOpenProofWindow to a new observable of an either type, // populated with the session or an error, which is notified after the session @@ -74,7 +73,7 @@ func (rs *relayerSessionsManager) waitForEarliestSubmitProofHeight( // + claimproofparams.GovSubmitProofWindowStartHeightOffset // we wait for submitProofWindowStartHeight to be received before proceeding since we need its hash - log.Printf("waiting and blocking for global earliest proof submission submitProofWindowStartBlock height: %d", submitProofWindowStartHeight) + log.Printf("INFO: waiting and blocking for global earliest proof submission submitProofWindowStartBlock height: %d", submitProofWindowStartHeight) submitProofWindowStartBlock := rs.waitForBlock(ctx, submitProofWindowStartHeight) earliestSubmitProofHeight := protocol.GetEarliestSubmitProofHeight(submitProofWindowStartBlock) @@ -100,7 +99,7 @@ func (rs *relayerSessionsManager) newMapProveSessionFn( return either.Error[relayer.SessionTree](err), false } - log.Printf("currentBlock: %d, submitting proof", latestBlock.Height()+1) + log.Printf("INFO: currentBlock: %d, submitting proof", latestBlock.Height()+1) // SubmitProof ensures on-chain proof inclusion so we can safely prune the tree. if err := rs.supplierClient.SubmitProof( ctx, diff --git a/pkg/relayer/session/session.go b/pkg/relayer/session/session.go index 4a444dfd0..08ea0f962 100644 --- a/pkg/relayer/session/session.go +++ b/pkg/relayer/session/session.go @@ -45,13 +45,21 @@ type relayerSessionsManager struct { } // NewRelayerSessions creates a new relayerSessions. +// +// Required dependencies: +// - client.BlockClient +// - client.SupplierClient +// +// Available options: +// - WithStoresDirectory func NewRelayerSessions( ctx context.Context, deps depinject.Config, opts ...relayer.RelayerSessionsManagerOption, ) (relayer.RelayerSessionsManager, error) { rs := &relayerSessionsManager{ - sessionsTrees: make(sessionsTreesMap), + sessionsTrees: make(sessionsTreesMap), + sessionsTreesMu: &sync.Mutex{}, } if err := depinject.Inject( @@ -79,10 +87,11 @@ func NewRelayerSessions( return rs, nil } -// Start iterates over the session trees at the end of each, respective, session. +// Start maps over the session trees at the end of each, respective, session. // The session trees are piped through a series of map operations which progress // them through the claim/proof lifecycle, broadcasting transactions to the // network as necessary. +// It IS NOT BLOCKING as map operations run in their own goroutines. func (rs *relayerSessionsManager) Start(ctx context.Context) { // Map eitherMinedRelays to a new observable of an error type which is // notified if an error was encountered while attempting to add the relay to @@ -113,9 +122,6 @@ func (rs *relayerSessionsManager) InsertRelays(relays observable.Observable[*rel // ensureSessionTree returns the SessionTree for a given session. // If no tree for the session exists, a new SessionTree is created before returning. func (rs *relayerSessionsManager) ensureSessionTree(sessionHeader *sessiontypes.SessionHeader) (relayer.SessionTree, error) { - rs.sessionsTreesMu.Lock() - defer rs.sessionsTreesMu.Unlock() - sessionsTrees, ok := rs.sessionsTrees[sessionHeader.SessionEndBlockHeight] // If there is no map for sessions at the sessionEndHeight, create one. @@ -128,8 +134,9 @@ func (rs *relayerSessionsManager) ensureSessionTree(sessionHeader *sessiontypes. sessionTree, ok := sessionsTrees[sessionHeader.SessionId] // If the sessionTree does not exist, create it. + var err error if !ok { - sessionTree, err := NewSessionTree(sessionHeader, rs.storesDirectory, rs.removeFromRelayerSessions) + sessionTree, err = NewSessionTree(sessionHeader, rs.storesDirectory, rs.removeFromRelayerSessions) if err != nil { return nil, err } @@ -146,12 +153,20 @@ func (rs *relayerSessionsManager) mapBlockToSessionsToClaim( _ context.Context, block client.Block, ) (sessionTrees []relayer.SessionTree, skip bool) { + rs.sessionsTreesMu.Lock() + defer rs.sessionsTreesMu.Unlock() + // Check if there are sessions that need to enter the claim/proof phase // as their end block height was the one before the last committed block. // Iterate over the sessionsTrees map to get the ones that end at a block height // lower than the current block height. for endBlockHeight, sessionsTreesEndingAtBlockHeight := range rs.sessionsTrees { - if endBlockHeight < block.Height() { + // TODO_BLOCKER(@red-0ne): We need this to be == instead of <= because we don't want to keep sending + // the same session while waiting the next step. This does not address the case + // where the block client misses the target block which should be handled by the + // retry mechanism. See the discussion in the following GitHub thread for next + // steps: https://github.com/pokt-network/poktroll/pull/177/files?show-viewed-files=true&file-filters%5B%5D=#r1391957041 + if endBlockHeight == block.Height() { // Iterate over the sessionsTrees that end at this block height (or // less) and add them to the list of sessionTrees to be published. for _, sessionTree := range sessionsTreesEndingAtBlockHeight { @@ -169,7 +184,7 @@ func (rs *relayerSessionsManager) removeFromRelayerSessions(sessionHeader *sessi sessionsTreesEndingAtBlockHeight, ok := rs.sessionsTrees[sessionHeader.SessionEndBlockHeight] if !ok { - log.Printf("no session tree found for sessions ending at height %d", sessionHeader.SessionEndBlockHeight) + log.Printf("DEBUG: no session tree found for sessions ending at height %d", sessionHeader.SessionEndBlockHeight) return } @@ -214,16 +229,18 @@ func (rs *relayerSessionsManager) mapAddRelayToSessionTree( _ context.Context, relay *relayer.MinedRelay, ) (_ error, skip bool) { + rs.sessionsTreesMu.Lock() + defer rs.sessionsTreesMu.Unlock() // ensure the session tree exists for this relay sessionHeader := relay.GetReq().GetMeta().GetSessionHeader() smst, err := rs.ensureSessionTree(sessionHeader) if err != nil { - log.Printf("failed to ensure session tree: %s\n", err) + log.Printf("ERROR: failed to ensure session tree: %s\n", err) return err, false } if err := smst.Update(relay.Hash, relay.Bytes, 1); err != nil { - log.Printf("failed to update smt: %s\n", err) + log.Printf("ERROR: failed to update smt: %s\n", err) return err, false } diff --git a/pkg/relayer/session/sessiontree.go b/pkg/relayer/session/sessiontree.go index fb27ff307..f9f03eaa8 100644 --- a/pkg/relayer/session/sessiontree.go +++ b/pkg/relayer/session/sessiontree.go @@ -68,8 +68,8 @@ func NewSessionTree( storePath := filepath.Join(storesDirectory, sessionHeader.SessionId) // Make sure storePath does not exist when creating a new SessionTree - if _, err := os.Stat(storePath); !os.IsNotExist(err) { - return nil, ErrSessionTreeUndefinedStoresDirectory + if _, err := os.Stat(storePath); err != nil && !os.IsNotExist(err) { + return nil, ErrSessionTreeStorePathExists.Wrapf("storePath: %q", storePath) } treeStore, err := smt.NewKVStore(storePath) @@ -86,6 +86,7 @@ func NewSessionTree( storePath: storePath, treeStore: treeStore, tree: tree, + sessionMu: &sync.Mutex{}, removeFromRelayerSessions: removeFromRelayerSessions, } diff --git a/proto/pocket/service/relay.proto b/proto/pocket/service/relay.proto index 7c9e888e4..c9f0d5d77 100644 --- a/proto/pocket/service/relay.proto +++ b/proto/pocket/service/relay.proto @@ -34,13 +34,18 @@ message RelayRequest { } } +// TODO_TECHDEBT(#189, @h5law): See discussion related to #189 on how/why JSONRPC should be refactored altogether. + // JSONRPCRequestPayload contains the payload for a JSON-RPC request. // See https://www.jsonrpc.org/specification#request_object for more details. message JSONRPCRequestPayload { - bytes id = 1; // Identifier established by the Client to create context for the request. + uint32 id = 1; // Identifier established by the Client to create context for the request. string jsonrpc = 2; // Version of JSON-RPC. Must be exactly "2.0". string method = 3; // Method being invoked on the server. - map parameters = 4; // Parameters for the method. https://www.jsonrpc.org/specification#parameter_structures + // TODO_TECHDEBT(#126): Make params a `oneof` of a list or map per the JSON-RPC specifications + // should they be a list of maps? + //map params = 4; // Parameters for the method. https://www.jsonrpc.org/specification#parameter_structures + repeated string params = 4; // Parameters for the method. https://www.jsonrpc.org/specification#parameter_structures } // RESTRequestType represents the type of REST request. @@ -82,9 +87,9 @@ message RelayResponseMetadata { // JSONRPCResponsePayload contains the response details for a JSON-RPC relay. // See www.jsonrpc.org/specification for more details. message JSONRPCResponsePayload { - bytes id = 1; // Identifier established by the Client to link the response back to the request. + uint32 id = 1; // Identifier established by the Client to link the response back to the request. string jsonrpc = 2; // Version of JSON-RPC. Must be exactly "2.0". - bytes result = 3; // Response result payload. + string result = 3; // Response result payload. JSONRPCResponseError error = 4; // Error message, if any. Can be nil. } diff --git a/testutil/testclient/testtx/context.go b/testutil/testclient/testtx/context.go index 35b3dfb71..60412baa7 100644 --- a/testutil/testclient/testtx/context.go +++ b/testutil/testclient/testtx/context.go @@ -17,6 +17,7 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" + "github.com/pokt-network/poktroll/pkg/relayer" "github.com/pokt-network/poktroll/testutil/mockclient" "github.com/pokt-network/poktroll/pkg/client" @@ -264,7 +265,8 @@ func NewAnyTimesTxTxContext( require.NoError(t, err) require.NotEmpty(t, txFactory) - txCtxDeps := depinject.Supply(txFactory, clientCtx) + txClientCtx := relayer.TxClientContext(clientCtx) + txCtxDeps := depinject.Supply(txFactory, txClientCtx) txCtx, err := tx.NewTxContext(txCtxDeps) require.NoError(t, err) txCtxMock := mockclient.NewMockTxContext(ctrl)