Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Working SDK with AppGateServer integration #241

Merged
merged 11 commits into from
Dec 9, 2023
2 changes: 2 additions & 0 deletions pkg/appgateserver/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,9 @@ func setupAppGateServerDependencies(
config.NewSupplyQueryClientContextFn(queryNodeURL.String()), // leaf
config.NewSupplyAccountQuerierFn(), // leaf
config.NewSupplyApplicationQuerierFn(), // leaf
config.NewSupplySessionQuerierFn(), // leaf
config.NewSupplyRingCacheFn(),
config.NewSupplyPOKTRollSDKFn(queryNodeURL, appGateConfig.SigningKey),
}

return config.SupplyConfig(ctx, cmd, supplierFuncs)
Expand Down
35 changes: 12 additions & 23 deletions pkg/appgateserver/endpoint_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package appgateserver

import (
"context"
"log"
"net/url"

sessiontypes "github.com/pokt-network/poktroll/x/session/types"
"github.com/pokt-network/poktroll/pkg/sdk"
sharedtypes "github.com/pokt-network/poktroll/x/shared/types"
)

Expand All @@ -18,29 +16,20 @@ func (app *appGateServer) getRelayerUrl(
ctx context.Context,
serviceId string,
rpcType sharedtypes.RPCType,
session *sessiontypes.Session,
) (supplierUrl *url.URL, supplierAddress string, err error) {
for _, supplier := range session.Suppliers {
for _, service := range supplier.Services {
// Skip services that don't match the requested serviceId.
if service.Service.Id != serviceId {
continue
}
supplierEndpoints []*sdk.SupplierEndpoint,
) (supplierEndpoint *sdk.SupplierEndpoint, err error) {
for _, supplierEndpoint := range supplierEndpoints {
// Skip services that don't match the requested serviceId.
if supplierEndpoint.Header.Service.Id != serviceId {
continue
}

for _, endpoint := range service.Endpoints {
// Return the first endpoint url that matches the request's RpcType.
if endpoint.RpcType == rpcType {
supplierUrl, err := url.Parse(endpoint.Url)
if err != nil {
log.Printf("ERROR: error parsing url: %s", err)
continue
}
return supplierUrl, supplier.Address, nil
}
}
// Return the first endpoint url that matches the request's RpcType.
if supplierEndpoint.RpcType == rpcType {
return supplierEndpoint, nil
}
}

// Return an error if no relayer endpoints were found.
return nil, "", ErrAppGateNoRelayEndpoints
return nil, ErrAppGateNoRelayEndpoints
}
16 changes: 6 additions & 10 deletions pkg/appgateserver/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,10 @@ package appgateserver
import sdkerrors "cosmossdk.io/errors"

var (
codespace = "appgateserver"
ErrAppGateInvalidRelayResponseSignature = sdkerrors.Register(codespace, 1, "invalid relay response signature")
ErrAppGateNoRelayEndpoints = sdkerrors.Register(codespace, 2, "no relay endpoints found")
ErrAppGateInvalidRequestURL = sdkerrors.Register(codespace, 3, "invalid request URL")
ErrAppGateMissingAppAddress = sdkerrors.Register(codespace, 4, "missing application address")
ErrAppGateMissingSigningInformation = sdkerrors.Register(codespace, 5, "missing app client signing information")
ErrAppGateMissingListeningEndpoint = sdkerrors.Register(codespace, 6, "missing app client listening endpoint")
ErrAppGateEmptyRelayResponseMeta = sdkerrors.Register(codespace, 7, "empty relay response metadata")
ErrAppGateEmptyRelayResponseSignature = sdkerrors.Register(codespace, 8, "empty relay response signature")
ErrAppGateHandleRelay = sdkerrors.Register(codespace, 9, "internal error handling relay request")
codespace = "appgateserver"
ErrAppGateNoRelayEndpoints = sdkerrors.Register(codespace, 1, "no relay endpoints found")
ErrAppGateMissingAppAddress = sdkerrors.Register(codespace, 2, "missing application address")
ErrAppGateMissingSigningInformation = sdkerrors.Register(codespace, 3, "missing app client signing information")
ErrAppGateMissingListeningEndpoint = sdkerrors.Register(codespace, 4, "missing app client listening endpoint")
ErrAppGateHandleRelay = sdkerrors.Register(codespace, 5, "internal error handling relay request")
)
104 changes: 18 additions & 86 deletions pkg/appgateserver/server.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,19 @@
package appgateserver

import (
"bytes"
"context"
"fmt"
"io"
"log"
"net/http"
"net/url"
"strings"
"sync"

"cosmossdk.io/depinject"
ring_secp256k1 "github.com/athanorlabs/go-dleq/secp256k1"
ringtypes "github.com/athanorlabs/go-dleq/types"
cosmosclient "github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/crypto/keyring"
"github.com/cosmos/cosmos-sdk/crypto/keys/secp256k1"
cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types"

"github.com/pokt-network/poktroll/pkg/client"

querytypes "github.com/pokt-network/poktroll/pkg/client/query/types"
"github.com/pokt-network/poktroll/pkg/crypto"
sessiontypes "github.com/pokt-network/poktroll/x/session/types"
"github.com/pokt-network/poktroll/pkg/sdk"
)

// SigningInformation is a struct that holds information related to the signing
Expand All @@ -35,10 +27,6 @@ type SigningInformation struct {
// private key used to sign relay requests.
SigningKeyName string

// SigningKey is the scalar point on the appropriate curve corresponding to the
// signer's private key, and is used to sign relay requests via a ring signature
SigningKey ringtypes.Scalar

// AppAddress is the address of the application that the server is serving if
// If it is nil, then the application address must be included in each request via a query parameter.
AppAddress string
Expand All @@ -54,61 +42,34 @@ type appGateServer struct {
// signing information holds the signing key and application address for the server
signingInformation *SigningInformation

// ringCache is used to obtain and store the ring for the application.
ringCache crypto.RingCache

// clientCtx is the client context for the application.
// It is used to query for the application's account to unmarshal the supplier's account
// and get the public key to verify the relay response signature.
clientCtx querytypes.Context

// sessionQuerier is the querier for the session module.
// It used to get the current session for the application given a requested service.
sessionQuerier sessiontypes.QueryClient

// sessionMu is a mutex to protect currentSession map reads and and updates.
sessionMu sync.RWMutex

// currentSessions is the current session for the application given a block height.
// It is updated by the goListenForNewSessions goroutine.
currentSessions map[string]*sessiontypes.Session

// accountQuerier is the querier for the account module.
// It is used to get the the supplier's public key to verify the relay response signature.
accountQuerier client.AccountQueryClient

// blockClient is the client for the block module.
// It is used to get the current block height to query for the current session.
blockClient client.BlockClient
// sdk is the POKTRollSDK that the appGateServer uses to query for the current session
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
// and send relay requests to the supplier.
sdk sdk.POKTRollSDK

// listeningEndpoint is the endpoint that the appGateServer will listen on.
listeningEndpoint *url.URL

// server is the HTTP server that will be used capture application requests
// so that they can be signed and relayed to the supplier.
server *http.Server

// accountCache is a cache of the supplier accounts that has been queried
// TODO_TECHDEBT: Add a size limit to the cache.
supplierAccountCache map[string]cryptotypes.PubKey
}

// NewAppGateServer creates a new appGateServer instance with the given dependencies.
func NewAppGateServer(
deps depinject.Config,
opts ...appGateServerOption,
) (*appGateServer, error) {
app := &appGateServer{
currentSessions: make(map[string]*sessiontypes.Session),
supplierAccountCache: make(map[string]cryptotypes.PubKey),
}
app := &appGateServer{}
Olshansk marked this conversation as resolved.
Show resolved Hide resolved

if err := depinject.Inject(
deps,
&app.clientCtx,
&app.blockClient,
&app.accountQuerier,
&app.ringCache,
&app.sdk,
); err != nil {
return nil, err
}
Expand All @@ -134,18 +95,6 @@ func NewAppGateServer(
app.signingInformation.AppAddress = appAddress.String()
}

// Convert the key record to a private key and return the scalar
// point on the secp256k1 curve that it corresponds to.
// If the key is not a secp256k1 key, this will return an error.
signingKey, err := recordLocalToScalar(keyRecord.GetLocal())
if err != nil {
return nil, fmt.Errorf("failed to convert private key to scalar: %w", err)
}
app.signingInformation.SigningKey = signingKey

clientCtx := cosmosclient.Context(app.clientCtx)

app.sessionQuerier = sessiontypes.NewQueryClient(clientCtx)
app.server = &http.Server{Addr: app.listeningEndpoint.Host}

return app, nil
Expand Down Expand Up @@ -192,35 +141,39 @@ func (app *appGateServer) ServeHTTP(writer http.ResponseWriter, request *http.Re
serviceId := strings.Split(path, "/")[1]

// Read the request body bytes.
payloadBz, err := io.ReadAll(request.Body)
requestPayloadBz, err := io.ReadAll(request.Body)
if err != nil {
app.replyWithError(
payloadBz,
requestPayloadBz,
writer,
ErrAppGateHandleRelay.Wrapf("reading relay request body: %s", err),
)
log.Printf("ERROR: failed reading relay request body: %s", err)
return
}
log.Printf("DEBUG: relay request body: %s", string(payloadBz))
log.Printf("DEBUG: relay request body: %s", string(requestPayloadBz))

// Determine the application address.
appAddress := app.signingInformation.AppAddress
if appAddress == "" {
appAddress = request.URL.Query().Get("senderAddr")
}
if appAddress == "" {
app.replyWithError(payloadBz, writer, ErrAppGateMissingAppAddress)
app.replyWithError(requestPayloadBz, writer, ErrAppGateMissingAppAddress)
log.Print("ERROR: no application address provided")
return
}

// Put the request body bytes back into the request body.
request.Body = io.NopCloser(bytes.NewBuffer(requestPayloadBz))

// TODO(@h5law, @red0ne): Add support for asynchronous relays, and switch on
// the request type here.
// TODO_RESEARCH: Should this be started in a goroutine, to allow for
// concurrent requests from numerous applications?
if err := app.handleSynchronousRelay(ctx, appAddress, serviceId, payloadBz, request, writer); err != nil {
if err := app.handleSynchronousRelay(ctx, appAddress, serviceId, requestPayloadBz, request, writer); err != nil {
// Reply with an error response if there was an error handling the relay.
app.replyWithError(payloadBz, writer, err)
app.replyWithError(requestPayloadBz, writer, err)
log.Printf("ERROR: failed handling relay: %s", err)
return
}
Expand All @@ -239,25 +192,4 @@ func (app *appGateServer) validateConfig() error {
return nil
}

// recordLocalToScalar converts the private key obtained from a
// key record to a scalar point on the secp256k1 curve
func recordLocalToScalar(local *keyring.Record_Local) (ringtypes.Scalar, error) {
if local == nil {
return nil, fmt.Errorf("cannot extract private key from key record: nil")
}
priv, ok := local.PrivKey.GetCachedValue().(cryptotypes.PrivKey)
if !ok {
return nil, fmt.Errorf("cannot extract private key from key record: %T", local.PrivKey.GetCachedValue())
}
if _, ok := priv.(*secp256k1.PrivKey); !ok {
return nil, fmt.Errorf("unexpected private key type: %T, want %T", priv, &secp256k1.PrivKey{})
}
crv := ring_secp256k1.NewCurve()
privKey, err := crv.DecodeToScalar(priv.Bytes())
if err != nil {
return nil, fmt.Errorf("failed to decode private key: %w", err)
}
return privKey, nil
}

type appGateServerOption func(*appGateServer)
47 changes: 0 additions & 47 deletions pkg/appgateserver/session.go

This file was deleted.

Loading