Skip to content

Commit

Permalink
feat: Implement relayminer query caching
Browse files Browse the repository at this point in the history
  • Loading branch information
red-0ne committed Jan 31, 2025
1 parent 2055b0f commit c12abfd
Show file tree
Hide file tree
Showing 15 changed files with 404 additions and 38 deletions.
22 changes: 8 additions & 14 deletions pkg/client/query/accquerier.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package query

import (
"context"
"sync"

"cosmossdk.io/depinject"
cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types"
Expand All @@ -21,11 +20,7 @@ var _ client.AccountQueryClient = (*accQuerier)(nil)
type accQuerier struct {
clientConn grpc.ClientConn
accountQuerier accounttypes.QueryClient

// accountCache is a cache of accounts that have already been queried.
// TODO_TECHDEBT: Add a size limit to the cache and consider an LRU cache.
accountCache map[string]types.AccountI
accountCacheMu sync.Mutex
accountsCache KeyValueCache[types.AccountI]
}

// NewAccountQuerier returns a new instance of a client.AccountQueryClient by
Expand All @@ -34,10 +29,11 @@ type accQuerier struct {
// Required dependencies:
// - clientCtx
func NewAccountQuerier(deps depinject.Config) (client.AccountQueryClient, error) {
aq := &accQuerier{accountCache: make(map[string]types.AccountI)}
aq := &accQuerier{}

if err := depinject.Inject(
deps,
&aq.accountsCache,
&aq.clientConn,
); err != nil {
return nil, err
Expand All @@ -53,11 +49,9 @@ func (aq *accQuerier) GetAccount(
ctx context.Context,
address string,
) (types.AccountI, error) {
aq.accountCacheMu.Lock()
defer aq.accountCacheMu.Unlock()

if foundAccount, isAccountFound := aq.accountCache[address]; isAccountFound {
return foundAccount, nil
// Check if the account is present in the cache.
if account, found := aq.accountsCache.Get(address); found {
return account, nil
}

// Query the blockchain for the account record
Expand All @@ -81,8 +75,8 @@ func (aq *accQuerier) GetAccount(
return nil, ErrQueryPubKeyNotFound
}

aq.accountCache[address] = fetchedAccount

// Cache the fetched account for future queries.
aq.accountsCache.Set(address, fetchedAccount)
return fetchedAccount, nil
}

Expand Down
23 changes: 23 additions & 0 deletions pkg/client/query/appquerier.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ var _ client.ApplicationQueryClient = (*appQuerier)(nil)
type appQuerier struct {
clientConn grpc.ClientConn
applicationQuerier apptypes.QueryClient
applicationsCache KeyValueCache[apptypes.Application]
paramsCache ParamsCache[apptypes.Params]
}

// NewApplicationQuerier returns a new instance of a client.ApplicationQueryClient
Expand All @@ -30,6 +32,8 @@ func NewApplicationQuerier(deps depinject.Config) (client.ApplicationQueryClient

if err := depinject.Inject(
deps,
&aq.paramsCache,
&aq.applicationsCache,
&aq.clientConn,
); err != nil {
return nil, err
Expand All @@ -45,17 +49,28 @@ func (aq *appQuerier) GetApplication(
ctx context.Context,
appAddress string,
) (apptypes.Application, error) {
// Check if the application is present in the cache.
if app, found := aq.applicationsCache.Get(appAddress); found {
return app, nil
}

req := apptypes.QueryGetApplicationRequest{Address: appAddress}
res, err := aq.applicationQuerier.Application(ctx, &req)
if err != nil {
return apptypes.Application{}, apptypes.ErrAppNotFound.Wrapf("app address: %s [%v]", appAddress, err)
}

// Cache the application.
aq.applicationsCache.Set(appAddress, res.Application)
return res.Application, nil
}

// GetAllApplications returns all staked applications
func (aq *appQuerier) GetAllApplications(ctx context.Context) ([]apptypes.Application, error) {
req := apptypes.QueryAllApplicationsRequest{}
// TODO_CONSIDERATION: Fill the cache with all applications and mark it as
// having been filled, such that subsequent calls to this function will
// return the cached value.
res, err := aq.applicationQuerier.AllApplications(ctx, &req)
if err != nil {
return []apptypes.Application{}, err
Expand All @@ -65,10 +80,18 @@ func (aq *appQuerier) GetAllApplications(ctx context.Context) ([]apptypes.Applic

// GetParams returns the application module parameters
func (aq *appQuerier) GetParams(ctx context.Context) (*apptypes.Params, error) {
// Check if the application module parameters are present in the cache.
if params, found := aq.paramsCache.Get(); found {
return &params, nil
}

req := apptypes.QueryParamsRequest{}
res, err := aq.applicationQuerier.Params(ctx, &req)
if err != nil {
return nil, err
}

// Update the cache with the newly retrieved application module parameters.
aq.paramsCache.Set(res.Params)
return &res.Params, nil
}
13 changes: 11 additions & 2 deletions pkg/client/query/bankquerier.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ var _ client.BankQueryClient = (*bankQuerier)(nil)
// bankQuerier is a wrapper around the banktypes.QueryClient that enables the
// querying of onchain balance information.
type bankQuerier struct {
clientConn grpc.ClientConn
bankQuerier banktypes.QueryClient
clientConn grpc.ClientConn
bankQuerier banktypes.QueryClient
balancesCache KeyValueCache[*sdk.Coin]
}

// NewBankQuerier returns a new instance of a client.BankQueryClient by
Expand All @@ -31,6 +32,7 @@ func NewBankQuerier(deps depinject.Config) (client.BankQueryClient, error) {

if err := depinject.Inject(
deps,
&bq.balancesCache,
&bq.clientConn,
); err != nil {
return nil, err
Expand All @@ -46,12 +48,19 @@ func (bq *bankQuerier) GetBalance(
ctx context.Context,
address string,
) (*sdk.Coin, error) {
// Check if the account balance is present in the cache.
if balance, found := bq.balancesCache.Get(address); found {
return balance, nil
}

// Query the blockchain for the balance record
req := &banktypes.QueryBalanceRequest{Address: address, Denom: volatile.DenomuPOKT}
res, err := bq.bankQuerier.Balance(ctx, req)
if err != nil {
return nil, ErrQueryBalanceNotFound.Wrapf("address: %s [%s]", address, err)
}

// Cache the balance for future queries
bq.balancesCache.Set(address, res.Balance)
return res.Balance, nil
}
61 changes: 61 additions & 0 deletions pkg/client/query/cache/kvcache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package cache

import (
"sync"

"github.com/pokt-network/poktroll/pkg/client/query"
)

var _ query.KeyValueCache[any] = (*keyValueCache[any])(nil)

// keyValueCache is a simple in-memory key-value cache implementation.
// It is safe for concurrent use.
type keyValueCache[V any] struct {
cacheMu sync.RWMutex
valuesMap map[string]V
}

// NewKeyValueCache returns a new instance of a KeyValueCache.
func NewKeyValueCache[T any]() query.KeyValueCache[T] {
return &keyValueCache[T]{
valuesMap: make(map[string]T),
}
}

// Get returns the value for the given key.
// A boolean is returned as the second value to indicate if the key was found in the cache.
func (c *keyValueCache[V]) Get(key string) (value V, found bool) {
c.cacheMu.RLock()
defer c.cacheMu.RUnlock()

value, found = c.valuesMap[key]
return value, found
}

// Set sets the value for the given key.
// TODO_CONSIDERATION: Add a method to set many values and indicate whether it
// is the result of a GetAll operation. This would allow us to know whether the
// cache is populated with all the possible values, so any other GetAll operation
// could be returned from the cache.
func (c *keyValueCache[V]) Set(key string, value V) {
c.cacheMu.Lock()
defer c.cacheMu.Unlock()

c.valuesMap[key] = value
}

// Delete deletes the value for the given key.
func (c *keyValueCache[V]) Delete(key string) {
c.cacheMu.Lock()
defer c.cacheMu.Unlock()

delete(c.valuesMap, key)
}

// Clear empties the whole cache.
func (c *keyValueCache[V]) Clear() {
c.cacheMu.Lock()
defer c.cacheMu.Unlock()

c.valuesMap = make(map[string]V)
}
38 changes: 38 additions & 0 deletions pkg/client/query/cache/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package cache

import (
"context"

"cosmossdk.io/depinject"

"github.com/pokt-network/poktroll/pkg/client"
"github.com/pokt-network/poktroll/pkg/observable/channel"
)

// ClearableCache is an interface that defines the common methods for a cache object.
type Cache interface {
Clear()
}

// CacheOption is a function type for the option functions that can customize
// the cache behavior.
type CacheOption[C Cache] func(context.Context, depinject.Config, C) error

// WithNewBlockCacheClearing is a cache option that clears the cache every time
// a new block is observed.
func WithNewBlockCacheClearing[C Cache](ctx context.Context, deps depinject.Config, cache C) error {
var blockClient client.BlockClient
if err := depinject.Inject(deps, &blockClient); err != nil {
return err
}

channel.ForEach(
ctx,
blockClient.CommittedBlocksSequence(ctx),
func(ctx context.Context, block client.Block) {
cache.Clear()
},
)

return nil
}
54 changes: 54 additions & 0 deletions pkg/client/query/cache/paramscache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package cache

import (
"sync"

"github.com/pokt-network/poktroll/pkg/client/query"
)

var _ query.ParamsCache[any] = (*paramsCache[any])(nil)

// paramsCache is a simple in-memory cache implementation for query parameters.
// It does not involve key-value pairs, but only stores a single value.
type paramsCache[T any] struct {
cacheMu sync.RWMutex
found bool
value T
}

// NewParamsCache returns a new instance of a ParamsCache.
func NewParamsCache[T any]() query.ParamsCache[T] {
return &paramsCache[T]{}
}

// Get returns the value stored in the cache.
// A boolean is returned as the second value to indicate if the value was found in the cache.
func (c *paramsCache[T]) Get() (value T, found bool) {
c.cacheMu.RLock()
defer c.cacheMu.RUnlock()

return c.value, c.found
}

// Set sets the value in the cache.
func (c *paramsCache[T]) Set(value T) {
c.cacheMu.Lock()
defer c.cacheMu.Unlock()

c.found = true
c.value = value
}

// Clear empties the cache.
func (c *paramsCache[T]) Clear() {
c.cacheMu.Lock()
defer c.cacheMu.Unlock()

c.found = false
c.value = zeroValue[T]()
}

// zeroValue is a generic helper which returns the zero value of the given type.
func zeroValue[T any]() (zero T) {
return zero
}
16 changes: 16 additions & 0 deletions pkg/client/query/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package query

// ParamsCache is an interface for a simple in-memory cache implementation for query parameters.
type ParamsCache[T any] interface {
Get() (T, bool)
Set(T)
Clear()
}

// KeyValueCache is an interface for a simple in-memory key-value cache implementation.
type KeyValueCache[V any] interface {
Get(string) (V, bool)
Set(string, V)
Delete(string)
Clear()
}
10 changes: 10 additions & 0 deletions pkg/client/query/proofquerier.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
type proofQuerier struct {
clientConn grpc.ClientConn
proofQuerier prooftypes.QueryClient
paramsCache ParamsCache[prooftypes.Params]
}

// NewProofQuerier returns a new instance of a client.ProofQueryClient by
Expand All @@ -27,6 +28,7 @@ func NewProofQuerier(deps depinject.Config) (client.ProofQueryClient, error) {

if err := depinject.Inject(
deps,
&querier.paramsCache,
&querier.clientConn,
); err != nil {
return nil, err
Expand All @@ -41,10 +43,18 @@ func NewProofQuerier(deps depinject.Config) (client.ProofQueryClient, error) {
func (pq *proofQuerier) GetParams(
ctx context.Context,
) (client.ProofParams, error) {
// Get the params from the cache if they exist.
if params, found := pq.paramsCache.Get(); found {
return &params, nil
}

req := &prooftypes.QueryParamsRequest{}
res, err := pq.proofQuerier.Params(ctx, req)
if err != nil {
return nil, err
}

// Update the cache with the newly retrieved params.
pq.paramsCache.Set(res.Params)
return &res.Params, nil
}
Loading

0 comments on commit c12abfd

Please sign in to comment.