Skip to content

Commit

Permalink
[AppGateServer] feat: Use round-robin endpoint selection strategy (#443)
Browse files Browse the repository at this point in the history
  • Loading branch information
red-0ne authored Mar 22, 2024
1 parent 0da8ba5 commit 833e540
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 5 deletions.
30 changes: 25 additions & 5 deletions pkg/appgateserver/endpoint_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ import (
sharedtypes "github.com/pokt-network/poktroll/x/shared/types"
)

// TODO_IMPROVE: This implements a naive greedy approach that defaults to the
// first available supplier.
// TODO_IMPROVE: Use a more sophisticated endpoint selection strategy.
// Future optimizations (e.g. Quality-of-Service) can be introduced here.
// TODO(@h5law): Look into different endpoint selection depending on their suitability.
// getRelayerUrl gets the URL of the relayer for the given service.
Expand All @@ -18,18 +17,39 @@ func (app *appGateServer) getRelayerUrl(
rpcType sharedtypes.RPCType,
supplierEndpoints []*sdk.SingleSupplierEndpoint,
) (supplierEndpoint *sdk.SingleSupplierEndpoint, err error) {
// Filter out the supplier endpoints that match the requested serviceId.
validSupplierEndpoints := make([]*sdk.SingleSupplierEndpoint, 0, len(supplierEndpoints))

for _, supplierEndpoint := range supplierEndpoints {
// Skip services that don't match the requested serviceId.
if supplierEndpoint.Header.Service.Id != serviceId {
continue
}

// Return the first endpoint url that matches the request's RpcType.
// Collect the endpoints that match the request's RpcType.
if supplierEndpoint.RpcType == rpcType {
return supplierEndpoint, nil
validSupplierEndpoints = append(validSupplierEndpoints, supplierEndpoint)
}
}

// Return an error if no relayer endpoints were found.
return nil, ErrAppGateNoRelayEndpoints
if len(validSupplierEndpoints) == 0 {
return nil, ErrAppGateNoRelayEndpoints
}

// Protect the endpointSelectionIndex update from concurrent relay requests.
app.endpointSelectionIndexMu.Lock()
defer app.endpointSelectionIndexMu.Unlock()

// Select the next endpoint in the list by rotating the index.
// This does not necessarily start from the first endpoint of a new session
// but will cycle through all valid endpoints of the same session if enough
// requests are made.
// This is a naive strategy that is used to ensure all endpoints are leveraged
// throughout the lifetime of the session. It is primarily used as a foundation
// for testing or development purposes but a more enhanced strategy is expected
// to be adopted by prod gateways.
app.endpointSelectionIndex = (app.endpointSelectionIndex + 1) % len(validSupplierEndpoints)

return validSupplierEndpoints[app.endpointSelectionIndex], nil
}
9 changes: 9 additions & 0 deletions pkg/appgateserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net/http"
"net/url"
"strings"
"sync"

"cosmossdk.io/depinject"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand Down Expand Up @@ -65,6 +66,14 @@ type appGateServer struct {
// server is the HTTP server that will be used capture application requests
// so that they can be signed and relayed to the supplier.
server *http.Server

// endpointSelectionIndexMu is a mutex that protects the endpointSelectionIndex
// from concurrent relay requests.
endpointSelectionIndexMu sync.Mutex

// endpointSelectionIndex is the index of the last selected endpoint.
// It is used to cycle through the available endpoints in a round-robin fashion.
endpointSelectionIndex int
}

// NewAppGateServer creates a new appGateServer instance with the given dependencies.
Expand Down

0 comments on commit 833e540

Please sign in to comment.