diff --git a/testutil/e2e/app.go b/testutil/e2e/app.go index 731fd62f7..0dc71ec48 100644 --- a/testutil/e2e/app.go +++ b/testutil/e2e/app.go @@ -40,6 +40,7 @@ type E2EApp struct { // NewE2EApp creates a new E2EApp instance with integration.App, gRPC, and WebSocket servers func NewE2EApp(t *testing.T, opts ...integration.IntegrationAppOptionFn) *E2EApp { t.Helper() + ctx := context.Background() // Initialize and start gRPC server creds := insecure.NewCredentials() @@ -88,7 +89,7 @@ func NewE2EApp(t *testing.T, opts ...integration.IntegrationAppOptionFn) *E2EApp resultEventChan: make(chan *coretypes.ResultEvent), } - mux.Handle(http.MethodPost, rootPattern, newPostHandler(client, e2eApp)) + mux.Handle(http.MethodPost, rootPattern, newPostHandler(ctx, client, e2eApp)) go func() { if err := e2eApp.grpcServer.Serve(grpcListener); err != nil { diff --git a/testutil/e2e/app_test.go b/testutil/e2e/app_test.go index f4fdf2ddf..fdb0af469 100644 --- a/testutil/e2e/app_test.go +++ b/testutil/e2e/app_test.go @@ -1,24 +1,16 @@ package e2e import ( - "bytes" - "context" - "io" - "net/http" "testing" - "time" "cosmossdk.io/depinject" "cosmossdk.io/math" - cometrpctypes "github.com/cometbft/cometbft/rpc/core/types" - "github.com/cometbft/cometbft/types" + comethttp "github.com/cometbft/cometbft/rpc/client/http" cosmostx "github.com/cosmos/cosmos-sdk/client/tx" "github.com/cosmos/cosmos-sdk/crypto/hd" "github.com/cosmos/cosmos-sdk/crypto/keyring" cosmostypes "github.com/cosmos/cosmos-sdk/types" banktypes "github.com/cosmos/cosmos-sdk/x/bank/types" - "github.com/golang/mock/gomock" - "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" "github.com/stretchr/testify/require" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -29,84 +21,15 @@ import ( "github.com/pokt-network/poktroll/pkg/client/query" "github.com/pokt-network/poktroll/pkg/client/tx" txtypes "github.com/pokt-network/poktroll/pkg/client/tx/types" - "github.com/pokt-network/poktroll/testutil/integration" - "github.com/pokt-network/poktroll/testutil/mockclient" "github.com/pokt-network/poktroll/testutil/testclient" gatewaytypes "github.com/pokt-network/poktroll/x/gateway/types" + sharedtypes "github.com/pokt-network/poktroll/x/shared/types" ) -func TestGRPCServer2(t *testing.T) { - grpcServer := grpc.NewServer() - //gwKeeper := gatewaykeeper.NewKeeper() - //gwSvc := gatewaykeeper.NewMsgServerImpl(gwKeeper) - - app := integration.NewCompleteIntegrationApp(t) - app.RegisterGRPCServer(grpcServer) - - mux := runtime.NewServeMux() - err := http.ListenAndServe(":42070", mux) - require.NoError(t, err) - //gatewaytypes.RegisterMsgServer(grpcServer, gwSvc) - //gatewaytypes.RegisterMsgServer(app.MsgServiceRouter(), gwSvc) - - //gatewaytypes.RegisterQueryHandlerFromEndpoint() - - //reflectionService, err := services.NewReflectionService() - //require.NoError(t, err) - - //desc, err := reflectionService.FileDescriptors(nil, nil) - //require.NoError(t, err) - - //app := integration.NewCompleteIntegrationApp(t) - //grpcServer.RegisterService(desc, app.MsgServiceRouter()) -} - -func TestSanity(t *testing.T) { - app := integration.NewCompleteIntegrationApp(t) - - //app.Query(nil, &authtypes.QueryAccountRequest{ - // Address: "pokt1h04g6njyuv03dhd74a73pyzeadmd8dk7l9tsk8", - //}) - - //app.Query(nil, types2.RequestQuery{ - // Data: nil, - // Path: "", - // Height: 0, - // Prove: false, - //}) - - ctrl := gomock.NewController(t) - blockQueryClient := mockclient.NewMockBlockQueryClient(ctrl) - blockQueryClient.EXPECT(). - Block(gomock.Any(), gomock.Any()). - DoAndReturn( - func(ctx context.Context, height *int64) (*cometrpctypes.ResultBlock, error) { - blockResultMock := &cometrpctypes.ResultBlock{ - Block: &types.Block{ - Header: types.Header{ - Height: 1, - }, - }, - } - return blockResultMock, nil - }, - ).AnyTimes() - deps := depinject.Supply(app.QueryHelper(), blockQueryClient) - sharedClient, err := query.NewSharedQuerier(deps) - require.NoError(t, err) - - params, err := sharedClient.GetParams(app.GetSdkCtx()) - require.NoError(t, err) - - t.Logf("shared params: %+v", params) -} - func TestNewE2EApp(t *testing.T) { - initialHeight := int64(7553) - // TODO_IN_THIS_COMMIT: does this 👆 need to be reconciled with the internal height of app? - app := NewE2EApp(t) + // Construct dependencies... keyRing := keyring.NewInMemory(app.GetCodec()) rec, err := keyRing.NewAccount( "gateway2", @@ -120,34 +43,13 @@ func TestNewE2EApp(t *testing.T) { gateway2Addr, err := rec.GetAddress() require.NoError(t, err) - // Fund gateway2 account. - _, err = app.RunMsg(t, &banktypes.MsgSend{ - FromAddress: app.GetFaucetBech32(), - ToAddress: gateway2Addr.String(), - Amount: cosmostypes.NewCoins(cosmostypes.NewInt64Coin(volatile.DenomuPOKT, 10000000000)), - }) + blockQueryClient, err := comethttp.New("tcp://127.0.0.1:42070", "/websocket") require.NoError(t, err) - ctrl := gomock.NewController(t) - blockQueryClient := mockclient.NewMockBlockQueryClient(ctrl) - blockQueryClient.EXPECT(). - Block(gomock.Any(), gomock.Any()). - DoAndReturn( - func(ctx context.Context, height *int64) (*cometrpctypes.ResultBlock, error) { - blockResultMock := &cometrpctypes.ResultBlock{ - Block: &types.Block{ - Header: types.Header{ - Height: initialHeight, - }, - }, - } - return blockResultMock, nil - }, - ).AnyTimes() - creds := insecure.NewCredentials() grpcConn, err := grpc.NewClient("127.0.0.1:42069", grpc.WithTransportCredentials(creds)) require.NoError(t, err) + deps := depinject.Supply(grpcConn, blockQueryClient) sharedQueryClient, err := query.NewSharedQuerier(deps) @@ -155,18 +57,15 @@ func TestNewE2EApp(t *testing.T) { sharedParams, err := sharedQueryClient.GetParams(app.GetSdkCtx()) require.NoError(t, err) - - t.Logf("shared params: %+v", sharedParams) + require.Equal(t, sharedtypes.DefaultParams(), *sharedParams) eventsQueryClient := events.NewEventsQueryClient("ws://127.0.0.1:6969/websocket") - //eventsQueryClient := events.NewEventsQueryClient("ws://127.0.0.1:26657/websocket") deps = depinject.Configs(deps, depinject.Supply(eventsQueryClient)) blockClient, err := block.NewBlockClient(app.GetSdkCtx(), deps) require.NoError(t, err) - // TODO_IN_THIS_COMMIT: NOT localnet flagset NOR context, should be - // configured to match the E2E app listeners. flagSet := testclient.NewFlagSet(t, "tcp://127.0.0.1:42070") + // DEV_NOTE: DO NOT use the clientCtx as a grpc.ClientConn as it bypasses E2EApp integrations. clientCtx := testclient.NewLocalnetClientCtx(t, flagSet).WithKeyring(keyRing) txFactory, err := cosmostx.NewFactoryCLI(clientCtx, flagSet) @@ -181,8 +80,20 @@ func TestNewE2EApp(t *testing.T) { txClient, err := tx.NewTxClient(app.GetSdkCtx(), deps, tx.WithSigningKeyName("gateway2")) require.NoError(t, err) - time.Sleep(time.Second * 1) + // Assert that no gateways are staked. + gatewayQueryClient := gatewaytypes.NewQueryClient(grpcConn) + allGatewaysRes, err := gatewayQueryClient.AllGateways(app.GetSdkCtx(), &gatewaytypes.QueryAllGatewaysRequest{}) + require.Equal(t, 0, len(allGatewaysRes.Gateways)) + // Fund gateway2 account. + _, err = app.RunMsg(t, &banktypes.MsgSend{ + FromAddress: app.GetFaucetBech32(), + ToAddress: gateway2Addr.String(), + Amount: cosmostypes.NewCoins(cosmostypes.NewInt64Coin(volatile.DenomuPOKT, 10000000000)), + }) + require.NoError(t, err) + + // Stake gateway2. eitherErr := txClient.SignAndBroadcast( app.GetSdkCtx(), gatewaytypes.NewMsgStakeGateway( @@ -191,231 +102,12 @@ func TestNewE2EApp(t *testing.T) { ), ) - // TODO_IN_THIS_COMMIT: signal to the WS server to send another block result event... - //app.NextBlock(t) - err, errCh := eitherErr.SyncOrAsyncError() require.NoError(t, err) require.NoError(t, <-errCh) -} - -func TestGRPCServer(t *testing.T) { - app := NewE2EApp(t) - t.Cleanup(func() { - app.Close() - }) - - creds := insecure.NewCredentials() - grpcConn, err := grpc.NewClient("127.0.0.1:42069", grpc.WithTransportCredentials(creds)) - require.NoError(t, err) - //dataHex, err := hex.DecodeString("0A2B706F6B74313577336668667963306C747476377235383565326E6370663674326B6C3975683872736E797A") - require.NoError(t, err) - - //req := gatewaytypes.QueryGetGatewayRequest{ - // Address: "pokt15w3fhfyc0lttv7r585e2ncpf6t2kl9uh8rsnyz", - //} - //res := &abci.ResponseQuery{} - - //grpcConn.Invoke(context.Background(), "abci_query", req, res) - - ctrl := gomock.NewController(t) - blockQueryClient := mockclient.NewMockBlockQueryClient(ctrl) - blockQueryClient.EXPECT(). - Block(gomock.Any(), gomock.Any()). - DoAndReturn( - func(ctx context.Context, height *int64) (*cometrpctypes.ResultBlock, error) { - //time.Sleep(time.Second * 100) - blockResultMock := &cometrpctypes.ResultBlock{ - Block: &types.Block{ - Header: types.Header{ - Height: 1, - }, - }, - } - return blockResultMock, nil - }, - ).AnyTimes() - - deps := depinject.Supply(grpcConn, blockQueryClient) - - sharedQueryClient, err := query.NewSharedQuerier(deps) - require.NoError(t, err) - - //res := new(gatewaytypes.QueryGetGatewayResponse) - // - //err = grpcConn.Invoke(context.Background(), "/poktroll.gateway.Query/Gateway", anyReq, res) - //dataHex, err := hex.DecodeString("0A2B706F6B74313577336668667963306C747476377235383565326E6370663674326B6C3975683872736E797A") - //require.NoError(t, err) - - //req := &abci.RequestQuery{ - // Data: dataHex, - // Path: "/cosmos.auth.v1beta1.Query/Account", - // Height: 0, - // Prove: false, - //} - // - //err = grpcConn.Invoke(context.Background(), "abci_query", req, res) - ////err = grpcConn.Invoke(context.Background(), "abci_query", req, res) - //require.NoError(t, err) - - require.NoError(t, err) - sharedParams, err := sharedQueryClient.GetParams(app.GetSdkCtx()) - require.NoError(t, err) - - t.Logf("shared params: %+v", sharedParams) - - //"method" : "abci_query", - //"params" : { - // "data" : "0A2B706F6B74313577336668667963306C747476377235383565326E6370663674326B6C3975683872736E797A", - // "height" : "0", - // "path" : "/cosmos.auth.v1beta1.Query/Account", - // "prove" : false - //} - - //"method" : "broadcast_tx_async", - //"params" : { - // "tx" : "CmsKZgohL3Bva3Ryb2xsLmdhdGV3YXkuTXNnU3Rha2VHYXRld2F5EkEKK3Bva3QxNXczZmhmeWMwbHR0djdyNTg1ZTJuY3BmNnQya2w5dWg4cnNueXoSEgoFdXBva3QSCTEwMDAwMDAwMRiGOxJYCk4KRgofL2Nvc21vcy5jcnlwdG8uc2VjcDI1NmsxLlB1YktleRIjCiEDZo2bY9XquUsFljtW/OKWVCDhYFf7NbidN4Y99VQ9438SBAoCCAESBhCqoYLJAhpAw5e7iJN5SpFit3fftxnZY7EDiFqupi7XEL3sUyeV0IBSQv2JZ7Cdu0dCG0yEVgj0xarkPi7dR10pNDL1gcUJxw==" - //} -} - -func TestSanity3(t *testing.T) { - app := NewE2EApp(t) - t.Cleanup(func() { - app.Close() - }) - - time.Sleep(time.Second * 1) - - client := http.DefaultClient - //res, err := client.Do(&http.Request{ - // Method: http.MethodPost, - // URL: &url.URL{Scheme: "http", Host: "127.0.0.1:42070", Path: ""}, - // Body: io.NopCloser(bytes.NewBuffer([]byte(`{"jsonrpc":"2.0","id":"0","method":"abci_query","params":{"path":"/cosmos.auth.v1beta1.Query/Account","data":"0A2B706F6B74313577336668667963306C747476377235383565326E6370663674326B6C3975683872736E797A","prove":false,"height":"0"}}`))), - //}) - res, err := client.Post( - "http://127.0.0.1:42070/", - "application/json", - bytes.NewBuffer([]byte(`{ - "jsonrpc":"2.0", - "id":"0", - "method":"abci_query", - "params":{"path":"/cosmos.auth.v1beta1.Query/Account", - "data":"0A2B706F6B74313577336668667963306C747476377235383565326E6370663674326B6C3975683872736E797A", - "prove":false, - "height":"0" - } - }`)), - ) - require.NoError(t, err) - - result, err := io.ReadAll(res.Body) - require.NoError(t, err) - - t.Logf("result: %s", result) -} - -func TestSanity4(t *testing.T) { - ctx := context.Background() - initialHeight := int64(7553) - // TODO_IN_THIS_COMMIT: does this 👆 need to be reconciled with the internal height of app? - - //app := NewE2EApp(t) - - //registry := codectypes.NewInterfaceRegistry() - //cdc := codec.NewProtoCodec(registry) - keyRing := keyring.NewInMemory(testclient.Marshaler) - _, err := keyRing.NewAccount( - "pnf", - "crumble shrimp south strategy speed kick green topic stool seminar track stand rhythm almost bubble pet knock steel pull flag weekend country major blade", - "", - cosmostypes.FullFundraiserPath, - hd.Secp256k1, - ) - require.NoError(t, err) - - //// TODO_IN_THIS_COMMOT: fund gateway2 account. - //_, err = app.RunMsg(t, &banktypes.MsgSend{ - // FromAddress: app.GetFaucetBech32(), - // ToAddress: pnfAddr.String(), - // Amount: cosmostypes.NewCoins(cosmostypes.NewInt64Coin(volatile.DenomuPOKT, 10000000000)), - //}) - //require.NoError(t, err) - - ctrl := gomock.NewController(t) - blockQueryClient := mockclient.NewMockBlockQueryClient(ctrl) - blockQueryClient.EXPECT(). - Block(gomock.Any(), gomock.Any()). - DoAndReturn( - func(ctx context.Context, height *int64) (*cometrpctypes.ResultBlock, error) { - //time.Sleep(time.Second * 100) - blockResultMock := &cometrpctypes.ResultBlock{ - Block: &types.Block{ - Header: types.Header{ - Height: initialHeight, - }, - }, - } - return blockResultMock, nil - }, - ).AnyTimes() - //blockQueryClient, err := sdkclient.NewClientFromNode("tcp://127.0.0.1:42070") - //blockQueryClient, err := sdkclient.NewClientFromNode("tcp://127.0.0.1:26657") - //require.NoError(t, err) - - //creds := insecure.NewCredentials() - //grpcConn := testclient.NewLocalnetClientCtx(t, flagSet).GetClient() - //grpcConn, err := grpc.NewClient("127.0.0.1:42069", grpc.WithTransportCredentials(creds)) - //require.NoError(t, err) - - // TODO_IN_THIS_COMMIT: NOT localnet flagset NOR context, should be - // configured to match the E2E app listeners. - //flagSet := testclient.NewFlagSet(t, "tcp://127.0.0.1:42070") - flagSet := testclient.NewLocalnetFlagSet(t) - clientCtx := testclient.NewLocalnetClientCtx(t, flagSet).WithKeyring(keyRing) - deps := depinject.Supply(clientCtx, blockQueryClient) - - sharedQueryClient, err := query.NewSharedQuerier(deps) - require.NoError(t, err) - - sharedParams, err := sharedQueryClient.GetParams(ctx) - require.NoError(t, err) - - t.Logf("shared params: %+v", sharedParams) - - eventsQueryClient := events.NewEventsQueryClient("ws://127.0.0.1:26657/websocket") - deps = depinject.Configs(deps, depinject.Supply(eventsQueryClient)) - blockClient, err := block.NewBlockClient(ctx, deps) - require.NoError(t, err) - - txFactory, err := cosmostx.NewFactoryCLI(clientCtx, flagSet) - require.NoError(t, err) - - deps = depinject.Configs(deps, depinject.Supply(txtypes.Context(clientCtx), txFactory)) - - //_, txContext := testtx.NewE2ETxContext(t, keyRing, flagSet) - txContext, err := tx.NewTxContext(deps) - require.NoError(t, err) - - deps = depinject.Configs(deps, depinject.Supply(blockClient, txContext)) - txClient, err := tx.NewTxClient(ctx, deps, tx.WithSigningKeyName("pnf")) - require.NoError(t, err) - - time.Sleep(time.Second * 1) - - eitherErr := txClient.SignAndBroadcast( - ctx, - &banktypes.MsgSend{ - FromAddress: "pokt1eeeksh2tvkh7wzmfrljnhw4wrhs55lcuvmekkw", - ToAddress: "pokt15w3fhfyc0lttv7r585e2ncpf6t2kl9uh8rsnyz", - Amount: cosmostypes.NewCoins(cosmostypes.NewInt64Coin(volatile.DenomuPOKT, 10000000000)), - }, - ) - - // TODO_IN_THIS_COMMIT: signal to the WS server to send another block result event... - //app.NextBlock(t) - - err, errCh := eitherErr.SyncOrAsyncError() - require.NoError(t, err) - require.NoError(t, <-errCh) + // Assert that only gateway2 is staked. + allGatewaysRes, err = gatewayQueryClient.AllGateways(app.GetSdkCtx(), &gatewaytypes.QueryAllGatewaysRequest{}) + require.Equal(t, 1, len(allGatewaysRes.Gateways)) + require.Equal(t, "pokt15w3fhfyc0lttv7r585e2ncpf6t2kl9uh8rsnyz", allGatewaysRes.Gateways[0].Address) } diff --git a/testutil/e2e/comet.go b/testutil/e2e/comet.go index afd64c127..3babf6f34 100644 --- a/testutil/e2e/comet.go +++ b/testutil/e2e/comet.go @@ -30,14 +30,18 @@ const ( broadcastTxSyncMethod = CometBFTMethod("broadcast_tx_sync") broadcastTxAsyncMethod = CometBFTMethod("broadcast_tx_async") broadcastTxCommitMethod = CometBFTMethod("broadcast_tx_commit") + blockMethod = CometBFTMethod("block") authAccountQueryUri = ServiceMethodUri("/cosmos.auth.v1beta1.Query/Account") ) // handleABCIQuery handles the actual ABCI query logic -func newPostHandler(client gogogrpc.ClientConn, app *E2EApp) runtime.HandlerFunc { +func newPostHandler( + ctx context.Context, + client gogogrpc.ClientConn, + app *E2EApp, +) runtime.HandlerFunc { return func(w http.ResponseWriter, r *http.Request, pathParams map[string]string) { - ctx := context.Background() // DEV_NOTE: http.Error() automatically sets the Content-Type header to "text/plain". w.Header().Set("Content-Type", "application/json") @@ -62,141 +66,194 @@ func newPostHandler(client gogogrpc.ClientConn, app *E2EApp) runtime.HandlerFunc return } - var response rpctypes.RPCResponse + response := new(rpctypes.RPCResponse) switch CometBFTMethod(req.Method) { // TODO_IN_THIS_COMMIT: extract... case abciQueryMethod: - var ( - resData []byte - height int64 - ) - - pathRaw, hasPath := params["path"] - if !hasPath { - writeErrorResponse(w, req, "missing path param", string(req.Params)) - return + response, err = app.abciQuery(ctx, client, req, params) + if err != nil { + *response = rpctypes.NewRPCErrorResponse(req.ID, 500, err.Error(), "") } - - var path string - if err = json.Unmarshal(pathRaw, &path); err != nil { - writeErrorResponseFromErr(w, req, err) - return + case broadcastTxSyncMethod, broadcastTxAsyncMethod, broadcastTxCommitMethod: + response, err = app.broadcastTx(req, params) + if err != nil { + *response = rpctypes.NewRPCErrorResponse(req.ID, 500, err.Error(), "") } - - switch ServiceMethodUri(path) { - case authAccountQueryUri: - dataRaw, hasData := params["data"] - if !hasData { - writeErrorResponse(w, req, "missing data param", string(req.Params)) - return - } - - data, err := hex.DecodeString(string(bytes.Trim(dataRaw, `"`))) - if err != nil { - writeErrorResponseFromErr(w, req, err) - return - } - - queryReq := new(authtypes.QueryAccountRequest) - if err = queryReq.Unmarshal(data); err != nil { - writeErrorResponseFromErr(w, req, err) - return - } - - var height int64 - heightRaw, hasHeight := params["height"] - if hasHeight { - if err = json.Unmarshal(bytes.Trim(heightRaw, `"`), &height); err != nil { - writeErrorResponseFromErr(w, req, err) - return - } - } - - queryRes := new(authtypes.QueryAccountResponse) - if err = client.Invoke(ctx, path, queryReq, queryRes); err != nil { - writeErrorResponseFromErr(w, req, err) - return - } - - resData, err = queryRes.Marshal() - if err != nil { - writeErrorResponseFromErr(w, req, err) - return - } + case blockMethod: + response, err = app.block(ctx, client, req, params) + if err != nil { + *response = rpctypes.NewRPCErrorResponse(req.ID, 500, err.Error(), "") } + default: + *response = rpctypes.NewRPCErrorResponse(req.ID, 500, "unsupported method", string(req.Params)) + } - abciQueryRes := coretypes.ResultABCIQuery{ - Response: types.ResponseQuery{ - //Code: 0, - //Index: 0, - //Key: nil, - Value: resData, - Height: height, - }, - } + if err = json.NewEncoder(w).Encode(response); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } + } +} - response = rpctypes.NewRPCSuccessResponse(req.ID, abciQueryRes) - case broadcastTxSyncMethod, broadcastTxAsyncMethod, broadcastTxCommitMethod: - fmt.Println(">>>> BROADCAST_TX") +// TODO_IN_THIS_COMMIT: godoc... +func (app *E2EApp) abciQuery( + ctx context.Context, + client gogogrpc.ClientConn, + req rpctypes.RPCRequest, + params map[string]json.RawMessage, +) (*rpctypes.RPCResponse, error) { + var ( + resData []byte + height int64 + ) + + pathRaw, hasPath := params["path"] + if !hasPath { + return nil, fmt.Errorf("missing path param: %s", string(req.Params)) + } - var txBz []byte - txRaw, hasTx := params["tx"] - if !hasTx { - writeErrorResponse(w, req, "missing tx param", string(req.Params)) - return - } - if err = json.Unmarshal(txRaw, &txBz); err != nil { - writeErrorResponseFromErr(w, req, err) - return - } + var path string + if err := json.Unmarshal(pathRaw, &path); err != nil { + return nil, err + } - // TODO_CONSIDERATION: more correct implementation of the different - // broadcast_tx methods (i.e. sync, async, commit) is a matter of - // the sequencing of the following: - // - calling the finalize block ABCI method - // - returning the JSON-RPC response - // - emitting websocket event + switch ServiceMethodUri(path) { + case authAccountQueryUri: + dataRaw, hasData := params["data"] + if !hasData { + return nil, fmt.Errorf("missing data param: %s", string(req.Params)) + } - _, finalizeBlockRes, err := app.RunTx(nil, txBz) - if err != nil { - writeErrorResponseFromErr(w, req, err) - return - } + data, err := hex.DecodeString(string(bytes.Trim(dataRaw, `"`))) + if err != nil { + return nil, err + } - // TODO_IN_THIS_COMMIT: something better... - go func() { - // Simulate 1 second block production delay. - time.Sleep(time.Second * 1) - - fmt.Println(">>> emitting ws events") - //app.EmitWSEvents(app.GetSdkCtx().EventManager().Events()) - - // TODO_IMPROVE: If we want/need to support multiple txs per - // block in the future, this will have to be refactored. - app.EmitWSEvents(finalizeBlockRes, txBz) - }() - - // DEV_NOTE: There SHOULD ALWAYS be exactly one tx result so long as - // we're finalizing one tx at a time (single tx blocks). - txRes := finalizeBlockRes.GetTxResults()[0] - - bcastTxRes := coretypes.ResultBroadcastTx{ - Code: txRes.GetCode(), - Data: txRes.GetData(), - Log: txRes.GetLog(), - Codespace: txRes.GetCodespace(), - Hash: comettypes.Tx(txBz).Hash(), + queryReq := new(authtypes.QueryAccountRequest) + if err = queryReq.Unmarshal(data); err != nil { + return nil, err + } + + var height int64 + heightRaw, hasHeight := params["height"] + if hasHeight { + if err = json.Unmarshal(bytes.Trim(heightRaw, `"`), &height); err != nil { + return nil, err } + } - response = rpctypes.NewRPCSuccessResponse(req.ID, bcastTxRes) - default: - response = rpctypes.NewRPCErrorResponse(req.ID, 500, "unsupported method", string(req.Params)) + queryRes := new(authtypes.QueryAccountResponse) + if err = client.Invoke(ctx, path, queryReq, queryRes); err != nil { + return nil, err } - if err = json.NewEncoder(w).Encode(response); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) + resData, err = queryRes.Marshal() + if err != nil { + return nil, err } } + + abciQueryRes := coretypes.ResultABCIQuery{ + Response: types.ResponseQuery{ + //Code: 0, + //Index: 0, + //Key: nil, + Value: resData, + Height: height, + }, + } + + res := rpctypes.NewRPCSuccessResponse(req.ID, abciQueryRes) + return &res, nil +} + +// TODO_IN_THIS_COMMIT: godoc... +func (app *E2EApp) broadcastTx( + req rpctypes.RPCRequest, + params map[string]json.RawMessage, +) (*rpctypes.RPCResponse, error) { + var txBz []byte + txRaw, hasTx := params["tx"] + if !hasTx { + return nil, fmt.Errorf("missing tx param: %s", string(req.Params)) + } + if err := json.Unmarshal(txRaw, &txBz); err != nil { + return nil, err + } + + // TODO_CONSIDERATION: more correct implementation of the different + // broadcast_tx methods (i.e. sync, async, commit) is a matter of + // the sequencing of the following: + // - calling the finalize block ABCI method + // - returning the JSON-RPC response + // - emitting websocket event + + _, finalizeBlockRes, err := app.RunTx(nil, txBz) + if err != nil { + return nil, err + } + + // TODO_IN_THIS_COMMIT: something better... + go func() { + // Simulate 1 second block production delay. + time.Sleep(time.Second * 1) + + //fmt.Println(">>> emitting ws events") + //app.EmitWSEvents(app.GetSdkCtx().EventManager().Events()) + + // TODO_IMPROVE: If we want/need to support multiple txs per + // block in the future, this will have to be refactored. + app.EmitWSEvents(finalizeBlockRes, txBz) + }() + + // DEV_NOTE: There SHOULD ALWAYS be exactly one tx result so long as + // we're finalizing one tx at a time (single tx blocks). + txRes := finalizeBlockRes.GetTxResults()[0] + + bcastTxRes := coretypes.ResultBroadcastTx{ + Code: txRes.GetCode(), + Data: txRes.GetData(), + Log: txRes.GetLog(), + Codespace: txRes.GetCodespace(), + Hash: comettypes.Tx(txBz).Hash(), + } + + res := rpctypes.NewRPCSuccessResponse(req.ID, bcastTxRes) + return &res, nil +} + +// TODO_IN_THIS_COMMIT: godoc... +func (app *E2EApp) block( + ctx context.Context, + client gogogrpc.ClientConn, + req rpctypes.RPCRequest, + params map[string]json.RawMessage, +) (*rpctypes.RPCResponse, error) { + resultBlock := coretypes.ResultBlock{ + BlockID: app.GetCometBlockID(), + Block: &comettypes.Block{ + Header: comettypes.Header{ + //Version: version.Consensus{}, + ChainID: "poktroll-test", + Height: app.GetSdkCtx().BlockHeight(), + Time: time.Now(), + LastBlockID: app.GetCometBlockID(), + //LastCommitHash: nil, + //DataHash: nil, + //ValidatorsHash: nil, + //NextValidatorsHash: nil, + //ConsensusHash: nil, + //AppHash: nil, + //LastResultsHash: nil, + //EvidenceHash: nil, + //ProposerAddress: nil, + }, + //Data: comettypes.Data{}, + //Evidence: comettypes.EvidenceData{}, + //LastCommit: nil, + }, + } + res := rpctypes.NewRPCSuccessResponse(req.ID, resultBlock) + return &res, nil } // TODO_IN_THIS_COMMIT: godoc... diff --git a/testutil/e2e/ws_server.go b/testutil/e2e/ws_server.go index 7becc8b96..a67f5eacf 100644 --- a/testutil/e2e/ws_server.go +++ b/testutil/e2e/ws_server.go @@ -6,12 +6,12 @@ import ( "net/http" "strings" "testing" + "time" abci "github.com/cometbft/cometbft/abci/types" coretypes "github.com/cometbft/cometbft/rpc/core/types" rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types" comettypes "github.com/cometbft/cometbft/types" - cosmostypes "github.com/cosmos/cosmos-sdk/types" "github.com/gorilla/websocket" "github.com/stretchr/testify/require" ) @@ -39,6 +39,8 @@ func (app *E2EApp) handleWebSocket(w http.ResponseWriter, r *http.Request) { // handleWebSocketConnection handles messages from a specific WebSocket connection func (app *E2EApp) handleWebSocketConnection(conn *websocket.Conn) { + logger := app.Logger().With("method", "handleWebSocketConnection") + defer func() { app.wsConnMutex.Lock() delete(app.wsConnections, conn) @@ -57,7 +59,7 @@ func (app *E2EApp) handleWebSocketConnection(conn *websocket.Conn) { continue } - // Handle subscribe/unsubscribe requests + // Handle subscription requests. if req.Method == "subscribe" { var params struct { Query string `json:"query"` @@ -70,24 +72,16 @@ func (app *E2EApp) handleWebSocketConnection(conn *websocket.Conn) { app.wsConnections[conn][params.Query] = struct{}{} app.wsConnMutex.Unlock() - // Send subscription response + // Send initial subscription response resp := rpctypes.RPCResponse{ JSONRPC: "2.0", ID: req.ID, - // TODO_IN_THIS_COMMIT: generate a mock result... - //Result: json.RawMessage(mockBlockResultJSON), // DEV_NOTE: Query subscription responses are initially empty; data is sent as subsequent events occur. Result: json.RawMessage("{}"), } - - //time.Sleep(time.Second * 4) - if err = conn.WriteJSON(resp); err != nil { - panic(err) + logger.Error(fmt.Sprintf("writing JSON-RPC response: %s", err)) } - //if err = conn.WriteJSON(resp); err != nil { - // return - //} } } } @@ -97,9 +91,6 @@ func (app *E2EApp) handleResultEvents(t *testing.T) { t.Helper() for event := range app.resultEventChan { - fmt.Printf(">>> WS event: %+v\n", event) - fmt.Printf(">>> num WS conns: %d\n", len(app.wsConnections)) - app.wsConnMutex.RLock() for conn, queries := range app.wsConnections { // Check if connection is subscribed to this event type @@ -117,8 +108,6 @@ func (app *E2EApp) handleResultEvents(t *testing.T) { continue } - fmt.Printf(">>> checking query: %s\n", query) - // DEV_NOTE: An empty request ID is consistent with the cometbft // implementation and is the reason that we MUST use a distinct // websocket connection per query; it's not possible to determine @@ -131,7 +120,6 @@ func (app *E2EApp) handleResultEvents(t *testing.T) { delete(app.wsConnections, conn) app.wsConnMutex.Unlock() app.wsConnMutex.RLock() - continue } } } @@ -158,105 +146,36 @@ func parseQuery(t *testing.T, query string) map[string]string { return queryPartPairs } -//// TODO_IN_THIS_COMMIT: also wrap RunMsgs... -//// TODO_IN_THIS_COMMIT: godoc... -//// Override RunMsg to also emit transaction events via WebSocket -//func (app *E2EApp) RunMsg(t *testing.T, msg cosmostypes.Msg) (tx.MsgResponse, error) { -// msgRes, err := app.App.RunMsg(t, msg) -// if err != nil { -// return nil, err -// } -// -// // Create and emit block event with transaction results -// blockEvent := createBlockEvent(app.GetSdkCtx()) -// app.resultEventChan <- blockEvent -// -// return msgRes, nil -//} - -// createBlockEvent creates a CometBFT-compatible event from transaction results -func createBlockEvent(ctx *cosmostypes.Context) *coretypes.ResultEvent { - // Convert SDK events to map[string][]string format that CometBFT expects - events := make(map[string][]string) - for _, event := range ctx.EventManager().Events() { - // Each event type becomes a key, and its attributes become the values - for _, attr := range event.Attributes { - if events[event.Type] == nil { - events[event.Type] = make([]string, 0) - } - events[event.Type] = append(events[event.Type], string(attr.Value)) - } - } - - return &coretypes.ResultEvent{ - Query: "tm.event='NewBlock'", - Data: map[string]interface{}{ - "height": ctx.BlockHeight(), - "hash": ctx.BlockHeader().LastBlockId.Hash, - "events": events, - // Add other relevant block and transaction data here as needed - }, - Events: events, - } -} - // TODO_IN_THIS_COMMIT: godoc... func (app *E2EApp) EmitWSEvents(finalizeBlockRes *abci.ResponseFinalizeBlock, txBz []byte) { - //resultEvent := &coretypes.ResultEvent{ - // Query: "tm.event='NewBlock'", - // Data: map[string]interface{}{ - // //"height": ctx.BlockHeight(), - // //"hash": ctx.BlockHeader().LastBlockId.Hash, - // //"events": events, - // // Add other relevant block and transaction data here as needed - // }, - // //Events: events, - //} - - //emitEvent := func(event abci.Event, query string) error { - // eventAny, err := codectypes.NewAnyWithValue(&event) - // if err != nil { - // return err - // } - // - // resultEvent := &coretypes.ResultEvent{ - // Query: query, - // Data: eventAny, - // Events: nil, - // } - // - // app.resultEventChan <- resultEvent - // - // return nil - //} - //for _, event := range finalizeBlockRes.GetEvents() { - // // TODO_IN_THIS_COMMIT: reconsider how to populate the queries... - // if err := emitEvent(event, comettypes.EventQueryNewBlock.String()); err != nil { - // app.Logger().Error(err.Error()) - // } - //} - //for _, txResult := range finalizeBlockRes.GetTxResults() { - // for _, event := range txResult.GetEvents() { - // // TODO_IN_THIS_COMMIT: reconsider how to populate the queries... - // if err := emitEvent(event, comettypes.EventQueryTx.String()); err != nil { - // app.Logger().Error(err.Error()) - // } - // } - //} - - // TODO_IN_THIS_COMMIT: necessary? - //app.wsConnMutex.RLock() - //defer app.wsConnMutex.RUnlock() - events := validateAndStringifyEvents(finalizeBlockRes.GetEvents()) // DEV_NOTE: see https://github.com/cometbft/cometbft/blob/v0.38.10/types/event_bus.go#L138 events[comettypes.EventTypeKey] = append(events[comettypes.EventTypeKey], comettypes.EventNewBlock) evtDataNewBlock := comettypes.EventDataNewBlock{ - // TODO_IN_THIS_COMMIT: add block... - Block: nil, + Block: &comettypes.Block{ + Header: comettypes.Header{ + //Version: version.Consensus{}, + ChainID: "poktroll-test", + Height: app.GetSdkCtx().BlockHeight(), + Time: time.Now(), + LastBlockID: app.GetCometBlockID(), + //LastCommitHash: nil, + //DataHash: nil, + //ValidatorsHash: nil, + //NextValidatorsHash: nil, + //ConsensusHash: nil, + //AppHash: nil, + //LastResultsHash: nil, + //EvidenceHash: nil, + //ProposerAddress: nil, + }, + //Data: comettypes.Data{}, + //Evidence: comettypes.EvidenceData{}, + //LastCommit: nil, + }, BlockID: app.GetCometBlockID(), - ResultFinalizeBlock: abci.ResponseFinalizeBlock{}, + ResultFinalizeBlock: *finalizeBlockRes, } // TODO_IN_THIS_COMMIT: comment...