Skip to content

Commit

Permalink
fix caching concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
red-0ne committed Jan 25, 2025
1 parent 7c4c452 commit 1e34b01
Show file tree
Hide file tree
Showing 32 changed files with 206 additions and 254 deletions.
71 changes: 41 additions & 30 deletions create-accounts.sh
Original file line number Diff line number Diff line change
@@ -1,41 +1,52 @@
#!/bin/bash

TOTAL_ACCOUNTS=50000
PARALLEL_JOBS=8
ACCOUNTS_PER_JOB=$((TOTAL_ACCOUNTS / PARALLEL_JOBS))

create_accounts() {
local start=$1
local end=$2
local job_id=$3

for i in $(seq $start $end); do
if ! poktrolld keys add "app-$i" > /dev/null 2>&1; then
echo "Job $job_id: Error creating account app-$i"
continue
fi
TOTAL_ADDRESSES=${1:-50000}
PARALLEL_JOBS=${2:-8}
SEGMENTS_DIR="./segments"
OUTPUT_FILE="app_addresses.txt"

create_segment() {
local job_id=$1
local start_idx=$2
local num_addresses=$3
local segment_file="$SEGMENTS_DIR/segment_$job_id.txt"

> "$segment_file"
for i in $(seq 0 $(($num_addresses-1))); do
addr_idx=$(($start_idx + $i + 1))
output=$(poktrolld keys add "app-$addr_idx" --output json | jq -r .address 2>&1)
echo "$output" >> "$segment_file"
done
}

if [ $((i % 100)) -eq 0 ]; then
echo "Job $job_id: Progress $i/$end accounts created"
merge_segments() {
> "$OUTPUT_FILE"
for i in $(seq 0 $((PARALLEL_JOBS-1))); do
if [ -f "$SEGMENTS_DIR/segment_$i.txt" ]; then
cat "$SEGMENTS_DIR/segment_$i.txt" >> "$OUTPUT_FILE"
else
echo "Missing segment file: segment_$i.txt" >&2
return 1
fi
done
}

echo "Starting parallel account creation with $PARALLEL_JOBS jobs..."
main() {
rm -rf $SEGMENTS_DIR
mkdir -p $SEGMENTS_DIR

# Launch parallel jobs
for job in $(seq 0 $((PARALLEL_JOBS-1))); do
start=$((job * ACCOUNTS_PER_JOB + 1))
if [ $job -eq $((PARALLEL_JOBS-1)) ]; then
end=$TOTAL_ACCOUNTS
else
end=$((start + ACCOUNTS_PER_JOB - 1))
fi
ADDRS_PER_JOB=$(( (TOTAL_ADDRESSES + PARALLEL_JOBS - 1) / PARALLEL_JOBS ))
echo "Creating $TOTAL_ADDRESSES addresses using $PARALLEL_JOBS parallel jobs"

create_accounts $start $end $job &
done
for job_id in $(seq 0 $((PARALLEL_JOBS-1))); do
start_idx=$((job_id * ADDRS_PER_JOB))
create_segment "$job_id" "$start_idx" "$ADDRS_PER_JOB" &
done

# Wait for all background jobs to complete
wait
wait
merge_segments
rm -rf $SEGMENTS_DIR
echo "Complete - addresses written to $OUTPUT_FILE"
}

echo "All account creation jobs completed!"
main
4 changes: 3 additions & 1 deletion load-testing/tests/relays_stress_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,9 @@ func (s *relaysSuite) createApplicationAccount(
accAddress, err := keyRecord.GetAddress()
require.NoError(s, err)

logger.Debug().Msgf("Application added %s", keyName)
if appIdx%5000 == 0 {
logger.Debug().Msgf("Application added %s", keyName)
}

return &accountInfo{
address: accAddress.String(),
Expand Down
36 changes: 3 additions & 33 deletions stake_apps.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,22 @@
TOTAL_APPS=50000
PARALLEL_JOBS=8
CONFIG_DIR="localnet/poktrolld/config"
TEMP_DIR=/tmp/stake_apps
SEGMENT_SIZE=$((TOTAL_APPS / PARALLEL_JOBS))

# Create and setup temp directory
rm -rf $TEMP_DIR
mkdir -p $TEMP_DIR
chmod 777 $TEMP_DIR
trap 'rm -rf $TEMP_DIR' EXIT

# Function to process a segment of apps
process_segment() {
local start=$1
local end=$2
local job_id=$3
local output="$TEMP_DIR/segment_$job_id.txt"
local config_file="${CONFIG_DIR}/application_stake_config.yaml"

echo "Job $job_id staking apps $start to $end"
for i in $(seq $start $end); do
local app_name="app-$i"
if poktrolld tx application stake-application -y \
poktrolld tx application stake-application -y \
--config "$config_file" \
--keyring-backend test \
--from "$app_name" > /dev/null 2>&1; then
echo "$app_name" >> "$output.success"
else
echo "$app_name" >> "$output.failed"
fi
--from "$app_name" > /dev/null 2>&1;
done
}

Expand All @@ -50,22 +38,4 @@ done

wait

# Report results
total_success=0
total_failed=0
for job_id in $(seq 0 $((PARALLEL_JOBS - 1))); do
if [ -f "$TEMP_DIR/segment_$job_id.txt.success" ]; then
success=$(wc -l < "$TEMP_DIR/segment_$job_id.txt.success")
total_success=$((total_success + success))
fi
if [ -f "$TEMP_DIR/segment_$job_id.txt.failed" ]; then
failed=$(wc -l < "$TEMP_DIR/segment_$job_id.txt.failed")
total_failed=$((total_failed + failed))
echo "Failed apps in job $job_id:"
cat "$TEMP_DIR/segment_$job_id.txt.failed"
fi
done

echo "Staking complete!"
echo "Successfully staked: $total_success applications"
echo "Failed: $total_failed applications"
echo "Staking complete!"
16 changes: 6 additions & 10 deletions x/application/keeper/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,21 @@ import (

// SetApplication set a specific application in the store from its index
func (k Keeper) SetApplication(ctx context.Context, application types.Application) {
if k.cache.Applications[application.Address] != nil {
k.cache.Applications[application.Address] = &application
}

storeAdapter := runtime.KVStoreAdapter(k.storeService.OpenKVStore(ctx))
store := prefix.NewStore(storeAdapter, types.KeyPrefix(types.ApplicationKeyPrefix))
appBz := k.cdc.MustMarshal(&application)
store.Set(types.ApplicationKey(application.Address), appBz)
k.applicationsCache.Set(application.Address, application)
}

// GetApplication returns a application from its index
func (k Keeper) GetApplication(
ctx context.Context,
appAddr string,
) (app types.Application, found bool) {
if app, found := k.cache.Applications[appAddr]; found {
if app, found := k.applicationsCache.Get(appAddr); found {

Check failure on line 27 in x/application/keeper/application.go

View workflow job for this annotation

GitHub Actions / go-test

shadow: declaration of "app" shadows declaration at line 26 (govet)
k.logger.Info("-----Application cache hit-----")
return *app, true
return app, true
}

storeAdapter := runtime.KVStoreAdapter(k.storeService.OpenKVStore(ctx))
Expand All @@ -54,18 +51,17 @@ func (k Keeper) GetApplication(
app.DelegateeGatewayAddresses = make([]string, 0)
}

k.cache.Applications[appAddr] = &app
k.applicationsCache.Set(appAddr, app)

return app, true
}

// RemoveApplication removes a application from the store
func (k Keeper) RemoveApplication(ctx context.Context, appAddr string) {
delete(k.cache.Applications, appAddr)

storeAdapter := runtime.KVStoreAdapter(k.storeService.OpenKVStore(ctx))
store := prefix.NewStore(storeAdapter, types.KeyPrefix(types.ApplicationKeyPrefix))
store.Delete(types.ApplicationKey(appAddr))
k.applicationsCache.Delete(appAddr)
}

// GetAllApplications returns all application
Expand All @@ -86,7 +82,7 @@ func (k Keeper) GetAllApplications(ctx context.Context) (apps []types.Applicatio
app.PendingUndelegations = make(map[uint64]types.UndelegatingGatewayList)
}

k.cache.Applications[app.Address] = &app
k.applicationsCache.Set(app.Address, app)

apps = append(apps, app)
}
Expand Down
12 changes: 7 additions & 5 deletions x/application/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
sdk "github.com/cosmos/cosmos-sdk/types"

"github.com/pokt-network/poktroll/x/application/types"
sharedtypes "github.com/pokt-network/poktroll/x/shared/types"
)

type (
Expand All @@ -26,7 +27,8 @@ type (
gatewayKeeper types.GatewayKeeper
sharedKeeper types.SharedKeeper

cache *types.Cache
applicationsCache *sharedtypes.Cache[string, types.Application]
paramsCache *sharedtypes.Cache[string, types.Params]
}
)

Expand Down Expand Up @@ -56,14 +58,14 @@ func NewKeeper(
gatewayKeeper: gatewayKeeper,
sharedKeeper: sharedKeeper,

cache: &types.Cache{
Applications: make(map[string]*types.Application),
},
applicationsCache: sharedtypes.NewCache[string, types.Application](),
paramsCache: sharedtypes.NewCache[string, types.Params](),
}
}

func (k Keeper) ClearCache() {
k.cache.Clear()
k.applicationsCache.Clear()
k.paramsCache.Clear()
}

// GetAuthority returns the module's authority.
Expand Down
9 changes: 4 additions & 5 deletions x/application/keeper/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (

// GetParams get all parameters as types.Params
func (k Keeper) GetParams(ctx context.Context) (params types.Params) {
if k.cache.Params != nil {
if params, found := k.paramsCache.Get(""); found {

Check failure on line 13 in x/application/keeper/params.go

View workflow job for this annotation

GitHub Actions / go-test

shadow: declaration of "params" shadows declaration at line 12 (govet)
k.logger.Info("-----Application params cache hit-----")
return *k.cache.Params
return params
}

store := runtime.KVStoreAdapter(k.storeService.OpenKVStore(ctx))
Expand All @@ -22,21 +22,20 @@ func (k Keeper) GetParams(ctx context.Context) (params types.Params) {
}

k.cdc.MustUnmarshal(paramsBz, &params)
k.paramsCache.Set("", params)

k.cache.Params = &params
return params
}

// SetParams set the params
func (k Keeper) SetParams(ctx context.Context, params types.Params) error {
k.cache.Params = &params

store := runtime.KVStoreAdapter(k.storeService.OpenKVStore(ctx))
paramsBz, err := k.cdc.Marshal(&params)
if err != nil {
return err
}
store.Set(types.ParamsKey, paramsBz)
k.paramsCache.Set("", params)

return nil
}
11 changes: 0 additions & 11 deletions x/application/types/cache.go

This file was deleted.

16 changes: 8 additions & 8 deletions x/proof/keeper/claim.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ func (k Keeper) UpsertClaim(ctx context.Context, claim types.Claim) {
sessionId := claim.GetSessionHeader().GetSessionId()
primaryKey := types.ClaimPrimaryKey(sessionId, claim.SupplierOperatorAddress)
primaryStore.Set(primaryKey, claimBz)

k.cache.Claims[sessionId] = &claim
k.claimsCache.Set(string(primaryKey), claim)

logger.Info(fmt.Sprintf("upserted claim for supplier %s with primaryKey %s", claim.SupplierOperatorAddress, primaryKey))

Expand All @@ -43,14 +42,15 @@ func (k Keeper) UpsertClaim(ctx context.Context, claim types.Claim) {

// GetClaim returns a claim from its index
func (k Keeper) GetClaim(ctx context.Context, sessionId, supplierOperatorAddr string) (_ types.Claim, isClaimFound bool) {
if claim, found := k.cache.Claims[sessionId]; found {
primaryKey := types.ClaimPrimaryKey(sessionId, supplierOperatorAddr)
if claim, found := k.claimsCache.Get(string(primaryKey)); found {
k.logger.Info("-----Supplier cache hit-----")
return *claim, true
return claim, true
}

claim, found := k.getClaimByPrimaryKey(ctx, types.ClaimPrimaryKey(sessionId, supplierOperatorAddr))
claim, found := k.getClaimByPrimaryKey(ctx, primaryKey)
if found {
k.cache.Claims[sessionId] = &claim
k.claimsCache.Set(string(primaryKey), claim)
}

return claim, found
Expand All @@ -65,7 +65,6 @@ func (k Keeper) RemoveClaim(ctx context.Context, sessionId, supplierOperatorAddr

// Check if the claim exists
primaryKey := types.ClaimPrimaryKey(sessionId, supplierOperatorAddr)
delete(k.cache.Claims, sessionId)
foundClaim, isClaimFound := k.getClaimByPrimaryKey(ctx, primaryKey)
if !isClaimFound {
logger.Error(fmt.Sprintf("trying to delete non-existent claim with primary key %s for supplier %s and session %s", primaryKey, supplierOperatorAddr, sessionId))
Expand All @@ -84,6 +83,7 @@ func (k Keeper) RemoveClaim(ctx context.Context, sessionId, supplierOperatorAddr
primaryStore.Delete(primaryKey)
supplierOperatorAddrStore.Delete(supplierOperatorAddrKey)
sessionEndHeightStore.Delete(sessionEndHeightKey)
k.claimsCache.Delete(string(primaryKey))

logger.Info(fmt.Sprintf("deleted claim with primary key %s for supplier %s and session %s", primaryKey, supplierOperatorAddr, sessionId))
}
Expand All @@ -99,7 +99,7 @@ func (k Keeper) GetAllClaims(ctx context.Context) (claims []types.Claim) {
for ; iterator.Valid(); iterator.Next() {
var claim types.Claim
k.cdc.MustUnmarshal(iterator.Value(), &claim)
k.cache.Claims[claim.GetSessionHeader().GetSessionId()] = &claim
k.claimsCache.Set(string(iterator.Key()), claim)
claims = append(claims, claim)
}

Expand Down
16 changes: 10 additions & 6 deletions x/proof/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/pokt-network/poktroll/pkg/polylog"
_ "github.com/pokt-network/poktroll/pkg/polylog/polyzero"
"github.com/pokt-network/poktroll/x/proof/types"
sharedtypes "github.com/pokt-network/poktroll/x/shared/types"
)

type (
Expand All @@ -39,7 +40,9 @@ type (
accountQuerier client.AccountQueryClient
sharedQuerier client.SharedQueryClient

cache *types.Cache
claimsCache *sharedtypes.Cache[string, types.Claim]
proofsCache *sharedtypes.Cache[string, types.Proof]
paramsCache *sharedtypes.Cache[string, types.Params]
}
)

Expand Down Expand Up @@ -104,15 +107,16 @@ func NewKeeper(
accountQuerier: accountQuerier,
sharedQuerier: sharedQuerier,

cache: &types.Cache{
Proofs: make(map[string]*types.Proof),
Claims: make(map[string]*types.Claim),
},
claimsCache: sharedtypes.NewCache[string, types.Claim](),
proofsCache: sharedtypes.NewCache[string, types.Proof](),
paramsCache: sharedtypes.NewCache[string, types.Params](),
}
}

func (k Keeper) ClearCache() {
k.cache.Clear()
k.claimsCache.Clear()
k.proofsCache.Clear()
k.paramsCache.Clear()
k.accountQuerier.ClearCache()
}

Expand Down
Loading

0 comments on commit 1e34b01

Please sign in to comment.