Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RelayMiner] Implement relayminer query caching #1050

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
22 changes: 8 additions & 14 deletions pkg/client/query/accquerier.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package query

import (
"context"
"sync"

"cosmossdk.io/depinject"
cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types"
Expand All @@ -21,11 +20,7 @@ var _ client.AccountQueryClient = (*accQuerier)(nil)
type accQuerier struct {
clientConn grpc.ClientConn
accountQuerier accounttypes.QueryClient

// accountCache is a cache of accounts that have already been queried.
// TODO_TECHDEBT: Add a size limit to the cache and consider an LRU cache.
Olshansk marked this conversation as resolved.
Show resolved Hide resolved
accountCache map[string]types.AccountI
accountCacheMu sync.Mutex
Olshansk marked this conversation as resolved.
Show resolved Hide resolved
accountsCache KeyValueCache[types.AccountI]
}

// NewAccountQuerier returns a new instance of a client.AccountQueryClient by
Expand All @@ -34,10 +29,11 @@ type accQuerier struct {
// Required dependencies:
// - clientCtx
func NewAccountQuerier(deps depinject.Config) (client.AccountQueryClient, error) {
aq := &accQuerier{accountCache: make(map[string]types.AccountI)}
aq := &accQuerier{}

if err := depinject.Inject(
deps,
&aq.accountsCache,
&aq.clientConn,
); err != nil {
return nil, err
Expand All @@ -53,11 +49,9 @@ func (aq *accQuerier) GetAccount(
ctx context.Context,
address string,
) (types.AccountI, error) {
aq.accountCacheMu.Lock()
defer aq.accountCacheMu.Unlock()

if foundAccount, isAccountFound := aq.accountCache[address]; isAccountFound {
return foundAccount, nil
// Check if the account is present in the cache.
if account, found := aq.accountsCache.Get(address); found {
return account, nil
}

// Query the blockchain for the account record
Expand All @@ -81,8 +75,8 @@ func (aq *accQuerier) GetAccount(
return nil, ErrQueryPubKeyNotFound
}

aq.accountCache[address] = fetchedAccount

// Cache the fetched account for future queries.
aq.accountsCache.Set(address, fetchedAccount)
Olshansk marked this conversation as resolved.
Show resolved Hide resolved
return fetchedAccount, nil
}

Expand Down
23 changes: 23 additions & 0 deletions pkg/client/query/appquerier.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ var _ client.ApplicationQueryClient = (*appQuerier)(nil)
type appQuerier struct {
clientConn grpc.ClientConn
applicationQuerier apptypes.QueryClient
applicationsCache KeyValueCache[apptypes.Application]
paramsCache ParamsCache[apptypes.Params]
}

// NewApplicationQuerier returns a new instance of a client.ApplicationQueryClient
Expand All @@ -30,6 +32,8 @@ func NewApplicationQuerier(deps depinject.Config) (client.ApplicationQueryClient

if err := depinject.Inject(
deps,
&aq.paramsCache,
&aq.applicationsCache,
&aq.clientConn,
); err != nil {
return nil, err
Expand All @@ -45,17 +49,28 @@ func (aq *appQuerier) GetApplication(
ctx context.Context,
appAddress string,
) (apptypes.Application, error) {
// Check if the application is present in the cache.
if app, found := aq.applicationsCache.Get(appAddress); found {
return app, nil
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
}

req := apptypes.QueryGetApplicationRequest{Address: appAddress}
res, err := aq.applicationQuerier.Application(ctx, &req)
if err != nil {
return apptypes.Application{}, apptypes.ErrAppNotFound.Wrapf("app address: %s [%v]", appAddress, err)
}

// Cache the application.
aq.applicationsCache.Set(appAddress, res.Application)
return res.Application, nil
}

// GetAllApplications returns all staked applications
func (aq *appQuerier) GetAllApplications(ctx context.Context) ([]apptypes.Application, error) {
req := apptypes.QueryAllApplicationsRequest{}
// TODO_CONSIDERATION: Fill the cache with all applications and mark it as
Olshansk marked this conversation as resolved.
Show resolved Hide resolved
// having been filled, such that subsequent calls to this function will
// return the cached value.
res, err := aq.applicationQuerier.AllApplications(ctx, &req)
if err != nil {
return []apptypes.Application{}, err
Expand All @@ -65,10 +80,18 @@ func (aq *appQuerier) GetAllApplications(ctx context.Context) ([]apptypes.Applic

// GetParams returns the application module parameters
func (aq *appQuerier) GetParams(ctx context.Context) (*apptypes.Params, error) {
// Check if the application module parameters are present in the cache.
if params, found := aq.paramsCache.Get(); found {
return &params, nil
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
}

req := apptypes.QueryParamsRequest{}
res, err := aq.applicationQuerier.Params(ctx, &req)
if err != nil {
return nil, err
}

// Update the cache with the newly retrieved application module parameters.
aq.paramsCache.Set(res.Params)
return &res.Params, nil
}
13 changes: 11 additions & 2 deletions pkg/client/query/bankquerier.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ var _ client.BankQueryClient = (*bankQuerier)(nil)
// bankQuerier is a wrapper around the banktypes.QueryClient that enables the
// querying of onchain balance information.
type bankQuerier struct {
clientConn grpc.ClientConn
bankQuerier banktypes.QueryClient
clientConn grpc.ClientConn
bankQuerier banktypes.QueryClient
balancesCache KeyValueCache[*sdk.Coin]
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
}

// NewBankQuerier returns a new instance of a client.BankQueryClient by
Expand All @@ -31,6 +32,7 @@ func NewBankQuerier(deps depinject.Config) (client.BankQueryClient, error) {

if err := depinject.Inject(
deps,
&bq.balancesCache,
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
&bq.clientConn,
); err != nil {
return nil, err
Expand All @@ -46,12 +48,19 @@ func (bq *bankQuerier) GetBalance(
ctx context.Context,
address string,
) (*sdk.Coin, error) {
// Check if the account balance is present in the cache.
if balance, found := bq.balancesCache.Get(address); found {
return balance, nil
}

// Query the blockchain for the balance record
req := &banktypes.QueryBalanceRequest{Address: address, Denom: volatile.DenomuPOKT}
res, err := bq.bankQuerier.Balance(ctx, req)
if err != nil {
return nil, ErrQueryBalanceNotFound.Wrapf("address: %s [%s]", address, err)
}

// Cache the balance for future queries
bq.balancesCache.Set(address, res.Balance)
return res.Balance, nil
}
61 changes: 61 additions & 0 deletions pkg/client/query/cache/kvcache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package cache

import (
"sync"

"github.com/pokt-network/poktroll/pkg/client/query"
)

var _ query.KeyValueCache[any] = (*keyValueCache[any])(nil)

// keyValueCache is a simple in-memory key-value cache implementation.
// It is safe for concurrent use.
type keyValueCache[V any] struct {
cacheMu sync.RWMutex
valuesMap map[string]V
}

// NewKeyValueCache returns a new instance of a KeyValueCache.
func NewKeyValueCache[T any]() query.KeyValueCache[T] {
return &keyValueCache[T]{
valuesMap: make(map[string]T),
}
}

// Get returns the value for the given key.
// A boolean is returned as the second value to indicate if the key was found in the cache.
func (c *keyValueCache[V]) Get(key string) (value V, found bool) {
c.cacheMu.RLock()
defer c.cacheMu.RUnlock()

value, found = c.valuesMap[key]
return value, found
}

// Set sets the value for the given key.
// TODO_CONSIDERATION: Add a method to set many values and indicate whether it
// is the result of a GetAll operation. This would allow us to know whether the
// cache is populated with all the possible values, so any other GetAll operation
// could be returned from the cache.
func (c *keyValueCache[V]) Set(key string, value V) {
c.cacheMu.Lock()
defer c.cacheMu.Unlock()

c.valuesMap[key] = value
}

// Delete deletes the value for the given key.
func (c *keyValueCache[V]) Delete(key string) {
c.cacheMu.Lock()
defer c.cacheMu.Unlock()

delete(c.valuesMap, key)
}

// Clear empties the whole cache.
func (c *keyValueCache[V]) Clear() {
c.cacheMu.Lock()
defer c.cacheMu.Unlock()

c.valuesMap = make(map[string]V)
}
38 changes: 38 additions & 0 deletions pkg/client/query/cache/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package cache

import (
"context"

"cosmossdk.io/depinject"

"github.com/pokt-network/poktroll/pkg/client"
"github.com/pokt-network/poktroll/pkg/observable/channel"
)

// Cache is an interface that defines the common methods for a cache object.
type Cache interface {
Clear()
}

// CacheOption is a function type for the option functions that can customize
// the cache behavior.
type CacheOption[C Cache] func(context.Context, depinject.Config, C) error

// WithNewBlockCacheClearing is a cache option that clears the cache every time
// a new block is observed.
func WithNewBlockCacheClearing[C Cache](ctx context.Context, deps depinject.Config, cache C) error {
Olshansk marked this conversation as resolved.
Show resolved Hide resolved
var blockClient client.BlockClient
if err := depinject.Inject(deps, &blockClient); err != nil {
return err
}

channel.ForEach(
ctx,
blockClient.CommittedBlocksSequence(ctx),
func(ctx context.Context, block client.Block) {
cache.Clear()
},
)

return nil
}
54 changes: 54 additions & 0 deletions pkg/client/query/cache/paramscache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package cache

import (
"sync"

"github.com/pokt-network/poktroll/pkg/client/query"
)

var _ query.ParamsCache[any] = (*paramsCache[any])(nil)

// paramsCache is a simple in-memory cache implementation for query parameters.
// It does not involve key-value pairs, but only stores a single value.
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
type paramsCache[T any] struct {
cacheMu sync.RWMutex
found bool
value T
}

// NewParamsCache returns a new instance of a ParamsCache.
func NewParamsCache[T any]() query.ParamsCache[T] {
return &paramsCache[T]{}
}

// Get returns the value stored in the cache.
// A boolean is returned as the second value to indicate if the value was found in the cache.
func (c *paramsCache[T]) Get() (value T, found bool) {
c.cacheMu.RLock()
defer c.cacheMu.RUnlock()

return c.value, c.found
}

// Set sets the value in the cache.
func (c *paramsCache[T]) Set(value T) {
c.cacheMu.Lock()
defer c.cacheMu.Unlock()

c.found = true
c.value = value
}

// Clear empties the cache.
func (c *paramsCache[T]) Clear() {
c.cacheMu.Lock()
defer c.cacheMu.Unlock()

c.found = false
c.value = zeroValue[T]()
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
}

// zeroValue is a generic helper which returns the zero value of the given type.
func zeroValue[T any]() (zero T) {
return zero
}
16 changes: 16 additions & 0 deletions pkg/client/query/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package query

// ParamsCache is an interface for a simple in-memory cache implementation for query parameters.
type ParamsCache[T any] interface {
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
Get() (T, bool)
Set(T)
Clear()
}

// KeyValueCache is an interface for a simple in-memory key-value cache implementation.
type KeyValueCache[V any] interface {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bryanchriswhite and I spent a lot of time designing a more general purpose interface in.

I think we should merge that one in and leverage it here.

[1] https://github.com/pokt-network/poktroll/pull/994/files

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This interface is consistent with what I was calling QueryCache[T any] in #994. It should be quite straightforward to refactor #994 to use this instead. I see KeyValueCache[V any] as a subsequent iteration of QueryCache[T any] which includes generalizing the name.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR and PR #994 have very similar interfaces and will ultimately be merged.

I considered building on top of #994 but decided not to, as this PR does not aim to provide:

  • Granular (per-key) cache invalidation
  • Time-based cache invalidation (it only clears the whole cache when a new block is observed)
  • Access to historical values, which is out of scope and would imply adding new functionality (or not using the implementation)

I can put in the effort to rebase it on top of #994 if you and @bryanchriswhite think it would be worthwhile.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left a comment on discord, but I feel strongly that we should finish #994 and rebase on top of it.

Now is the time to do this right.

Screenshot 2025-02-11 at 4 31 05 PM

Get(string) (V, bool)
Set(string, V)
Delete(string)
Clear()
}
10 changes: 10 additions & 0 deletions pkg/client/query/proofquerier.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
type proofQuerier struct {
clientConn grpc.ClientConn
proofQuerier prooftypes.QueryClient
paramsCache ParamsCache[prooftypes.Params]
}

// NewProofQuerier returns a new instance of a client.ProofQueryClient by
Expand All @@ -27,6 +28,7 @@ func NewProofQuerier(deps depinject.Config) (client.ProofQueryClient, error) {

if err := depinject.Inject(
deps,
&querier.paramsCache,
&querier.clientConn,
); err != nil {
return nil, err
Expand All @@ -41,10 +43,18 @@ func NewProofQuerier(deps depinject.Config) (client.ProofQueryClient, error) {
func (pq *proofQuerier) GetParams(
ctx context.Context,
) (client.ProofParams, error) {
// Get the params from the cache if they exist.
if params, found := pq.paramsCache.Get(); found {
return &params, nil
}

req := &prooftypes.QueryParamsRequest{}
res, err := pq.proofQuerier.Params(ctx, req)
if err != nil {
return nil, err
}

// Update the cache with the newly retrieved params.
pq.paramsCache.Set(res.Params)
return &res.Params, nil
}
Loading