Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RelayMiner] Implement relayminer query caching #1050

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 8 additions & 14 deletions pkg/client/query/accquerier.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package query

import (
"context"
"sync"

"cosmossdk.io/depinject"
cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types"
Expand All @@ -21,11 +20,7 @@ var _ client.AccountQueryClient = (*accQuerier)(nil)
type accQuerier struct {
clientConn grpc.ClientConn
accountQuerier accounttypes.QueryClient

// accountCache is a cache of accounts that have already been queried.
// TODO_TECHDEBT: Add a size limit to the cache and consider an LRU cache.
accountCache map[string]types.AccountI
accountCacheMu sync.Mutex
accountsCache KeyValueCache[types.AccountI]
}

// NewAccountQuerier returns a new instance of a client.AccountQueryClient by
Expand All @@ -34,10 +29,11 @@ type accQuerier struct {
// Required dependencies:
// - clientCtx
func NewAccountQuerier(deps depinject.Config) (client.AccountQueryClient, error) {
aq := &accQuerier{accountCache: make(map[string]types.AccountI)}
aq := &accQuerier{}

if err := depinject.Inject(
deps,
&aq.accountsCache,
&aq.clientConn,
); err != nil {
return nil, err
Expand All @@ -53,11 +49,9 @@ func (aq *accQuerier) GetAccount(
ctx context.Context,
address string,
) (types.AccountI, error) {
aq.accountCacheMu.Lock()
defer aq.accountCacheMu.Unlock()

if foundAccount, isAccountFound := aq.accountCache[address]; isAccountFound {
return foundAccount, nil
// Check if the account is present in the cache.
if account, found := aq.accountsCache.Get(address); found {
return account, nil
}

// Query the blockchain for the account record
Expand All @@ -81,8 +75,8 @@ func (aq *accQuerier) GetAccount(
return nil, ErrQueryPubKeyNotFound
}

aq.accountCache[address] = fetchedAccount

// Cache the fetched account for future queries.
aq.accountsCache.Set(address, fetchedAccount)
return fetchedAccount, nil
}

Expand Down
23 changes: 23 additions & 0 deletions pkg/client/query/appquerier.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ var _ client.ApplicationQueryClient = (*appQuerier)(nil)
type appQuerier struct {
clientConn grpc.ClientConn
applicationQuerier apptypes.QueryClient
applicationsCache KeyValueCache[apptypes.Application]
paramsCache ParamsCache[apptypes.Params]
}

// NewApplicationQuerier returns a new instance of a client.ApplicationQueryClient
Expand All @@ -30,6 +32,8 @@ func NewApplicationQuerier(deps depinject.Config) (client.ApplicationQueryClient

if err := depinject.Inject(
deps,
&aq.paramsCache,
&aq.applicationsCache,
&aq.clientConn,
); err != nil {
return nil, err
Expand All @@ -45,17 +49,28 @@ func (aq *appQuerier) GetApplication(
ctx context.Context,
appAddress string,
) (apptypes.Application, error) {
// Check if the application is present in the cache.
if app, found := aq.applicationsCache.Get(appAddress); found {
return app, nil
}

req := apptypes.QueryGetApplicationRequest{Address: appAddress}
res, err := aq.applicationQuerier.Application(ctx, &req)
if err != nil {
return apptypes.Application{}, apptypes.ErrAppNotFound.Wrapf("app address: %s [%v]", appAddress, err)
}

// Cache the application.
aq.applicationsCache.Set(appAddress, res.Application)
return res.Application, nil
}

// GetAllApplications returns all staked applications
func (aq *appQuerier) GetAllApplications(ctx context.Context) ([]apptypes.Application, error) {
req := apptypes.QueryAllApplicationsRequest{}
// TODO_CONSIDERATION: Fill the cache with all applications and mark it as
// having been filled, such that subsequent calls to this function will
// return the cached value.
res, err := aq.applicationQuerier.AllApplications(ctx, &req)
if err != nil {
return []apptypes.Application{}, err
Expand All @@ -65,10 +80,18 @@ func (aq *appQuerier) GetAllApplications(ctx context.Context) ([]apptypes.Applic

// GetParams returns the application module parameters
func (aq *appQuerier) GetParams(ctx context.Context) (*apptypes.Params, error) {
// Check if the application module parameters are present in the cache.
if params, found := aq.paramsCache.Get(); found {
return &params, nil
}

req := apptypes.QueryParamsRequest{}
res, err := aq.applicationQuerier.Params(ctx, &req)
if err != nil {
return nil, err
}

// Update the cache with the newly retrieved application module parameters.
aq.paramsCache.Set(res.Params)
return &res.Params, nil
}
13 changes: 11 additions & 2 deletions pkg/client/query/bankquerier.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ var _ client.BankQueryClient = (*bankQuerier)(nil)
// bankQuerier is a wrapper around the banktypes.QueryClient that enables the
// querying of onchain balance information.
type bankQuerier struct {
clientConn grpc.ClientConn
bankQuerier banktypes.QueryClient
clientConn grpc.ClientConn
bankQuerier banktypes.QueryClient
balancesCache KeyValueCache[*sdk.Coin]
}

// NewBankQuerier returns a new instance of a client.BankQueryClient by
Expand All @@ -31,6 +32,7 @@ func NewBankQuerier(deps depinject.Config) (client.BankQueryClient, error) {

if err := depinject.Inject(
deps,
&bq.balancesCache,
&bq.clientConn,
); err != nil {
return nil, err
Expand All @@ -46,12 +48,19 @@ func (bq *bankQuerier) GetBalance(
ctx context.Context,
address string,
) (*sdk.Coin, error) {
// Check if the account balance is present in the cache.
if balance, found := bq.balancesCache.Get(address); found {
return balance, nil
}

// Query the blockchain for the balance record
req := &banktypes.QueryBalanceRequest{Address: address, Denom: volatile.DenomuPOKT}
res, err := bq.bankQuerier.Balance(ctx, req)
if err != nil {
return nil, ErrQueryBalanceNotFound.Wrapf("address: %s [%s]", address, err)
}

// Cache the balance for future queries
bq.balancesCache.Set(address, res.Balance)
return res.Balance, nil
}
61 changes: 61 additions & 0 deletions pkg/client/query/cache/kvcache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package cache

import (
"sync"

"github.com/pokt-network/poktroll/pkg/client/query"
)

var _ query.KeyValueCache[any] = (*keyValueCache[any])(nil)

// keyValueCache is a simple in-memory key-value cache implementation.
// It is safe for concurrent use.
type keyValueCache[V any] struct {
cacheMu sync.RWMutex
valuesMap map[string]V
}

// NewKeyValueCache returns a new instance of a KeyValueCache.
func NewKeyValueCache[T any]() query.KeyValueCache[T] {
return &keyValueCache[T]{
valuesMap: make(map[string]T),
}
}

// Get returns the value for the given key.
// A boolean is returned as the second value to indicate if the key was found in the cache.
func (c *keyValueCache[V]) Get(key string) (value V, found bool) {
c.cacheMu.RLock()
defer c.cacheMu.RUnlock()

value, found = c.valuesMap[key]
return value, found
}

// Set sets the value for the given key.
// TODO_CONSIDERATION: Add a method to set many values and indicate whether it
// is the result of a GetAll operation. This would allow us to know whether the
// cache is populated with all the possible values, so any other GetAll operation
// could be returned from the cache.
func (c *keyValueCache[V]) Set(key string, value V) {
c.cacheMu.Lock()
defer c.cacheMu.Unlock()

c.valuesMap[key] = value
}

// Delete deletes the value for the given key.
func (c *keyValueCache[V]) Delete(key string) {
c.cacheMu.Lock()
defer c.cacheMu.Unlock()

delete(c.valuesMap, key)
}

// Clear empties the whole cache.
func (c *keyValueCache[V]) Clear() {
c.cacheMu.Lock()
defer c.cacheMu.Unlock()

c.valuesMap = make(map[string]V)
}
38 changes: 38 additions & 0 deletions pkg/client/query/cache/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package cache

import (
"context"

"cosmossdk.io/depinject"

"github.com/pokt-network/poktroll/pkg/client"
"github.com/pokt-network/poktroll/pkg/observable/channel"
)

// Cache is an interface that defines the common methods for a cache object.
type Cache interface {
Clear()
}

// CacheOption is a function type for the option functions that can customize
// the cache behavior.
type CacheOption[C Cache] func(context.Context, depinject.Config, C) error

// WithNewBlockCacheClearing is a cache option that clears the cache every time
// a new block is observed.
func WithNewBlockCacheClearing[C Cache](ctx context.Context, deps depinject.Config, cache C) error {
var blockClient client.BlockClient
if err := depinject.Inject(deps, &blockClient); err != nil {
return err
}

channel.ForEach(
ctx,
blockClient.CommittedBlocksSequence(ctx),
func(ctx context.Context, block client.Block) {
cache.Clear()
},
)

return nil
}
54 changes: 54 additions & 0 deletions pkg/client/query/cache/paramscache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package cache

import (
"sync"

"github.com/pokt-network/poktroll/pkg/client/query"
)

var _ query.ParamsCache[any] = (*paramsCache[any])(nil)

// paramsCache is a simple in-memory cache implementation for query parameters.
// It does not involve key-value pairs, but only stores a single value.
type paramsCache[T any] struct {
cacheMu sync.RWMutex
found bool
value T
}

// NewParamsCache returns a new instance of a ParamsCache.
func NewParamsCache[T any]() query.ParamsCache[T] {
return &paramsCache[T]{}
}

// Get returns the value stored in the cache.
// A boolean is returned as the second value to indicate if the value was found in the cache.
func (c *paramsCache[T]) Get() (value T, found bool) {
c.cacheMu.RLock()
defer c.cacheMu.RUnlock()

return c.value, c.found
}

// Set sets the value in the cache.
func (c *paramsCache[T]) Set(value T) {
c.cacheMu.Lock()
defer c.cacheMu.Unlock()

c.found = true
c.value = value
}

// Clear empties the cache.
func (c *paramsCache[T]) Clear() {
c.cacheMu.Lock()
defer c.cacheMu.Unlock()

c.found = false
c.value = zeroValue[T]()
}

// zeroValue is a generic helper which returns the zero value of the given type.
func zeroValue[T any]() (zero T) {
return zero
}
16 changes: 16 additions & 0 deletions pkg/client/query/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package query

// ParamsCache is an interface for a simple in-memory cache implementation for query parameters.
type ParamsCache[T any] interface {
Get() (T, bool)
Set(T)
Clear()
}

// KeyValueCache is an interface for a simple in-memory key-value cache implementation.
type KeyValueCache[V any] interface {
Get(string) (V, bool)
Set(string, V)
Delete(string)
Clear()
}
10 changes: 10 additions & 0 deletions pkg/client/query/proofquerier.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
type proofQuerier struct {
clientConn grpc.ClientConn
proofQuerier prooftypes.QueryClient
paramsCache ParamsCache[prooftypes.Params]
}

// NewProofQuerier returns a new instance of a client.ProofQueryClient by
Expand All @@ -27,6 +28,7 @@ func NewProofQuerier(deps depinject.Config) (client.ProofQueryClient, error) {

if err := depinject.Inject(
deps,
&querier.paramsCache,
&querier.clientConn,
); err != nil {
return nil, err
Expand All @@ -41,10 +43,18 @@ func NewProofQuerier(deps depinject.Config) (client.ProofQueryClient, error) {
func (pq *proofQuerier) GetParams(
ctx context.Context,
) (client.ProofParams, error) {
// Get the params from the cache if they exist.
if params, found := pq.paramsCache.Get(); found {
return &params, nil
}

req := &prooftypes.QueryParamsRequest{}
res, err := pq.proofQuerier.Params(ctx, req)
if err != nil {
return nil, err
}

// Update the cache with the newly retrieved params.
pq.paramsCache.Set(res.Params)
return &res.Params, nil
}
Loading