Skip to content

Commit

Permalink
chore: Address reivew change request
Browse files Browse the repository at this point in the history
  • Loading branch information
red-0ne committed Jan 10, 2025
1 parent 59d3c7a commit 48bccb8
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 38 deletions.
6 changes: 3 additions & 3 deletions load-testing/config/load_test_manifest_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type LoadTestManifestYAML struct {
// IsEphemeralChain is a flag that indicates whether the test is expected to be
// run on LocalNet or long-living remote chain (i.e. TestNet/DevNet).
IsEphemeralChain bool `yaml:"is_ephemeral_chain"`
PRCNode string `yaml:"rpc_node"`
RPCNode string `yaml:"rpc_node"`
ServiceId string `yaml:"service_id"`
Suppliers []ProvisionedActorConfig `yaml:"suppliers"`
Gateways []ProvisionedActorConfig `yaml:"gateways"`
Expand Down Expand Up @@ -67,7 +67,7 @@ func validatedEphemeralChainManifest(manifest *LoadTestManifestYAML) (*LoadTestM
return nil, ErrEphemeralChainLoadTestInvalidManifest.Wrap("empty funding account address")
}

if len(manifest.PRCNode) == 0 {
if len(manifest.RPCNode) == 0 {
return nil, ErrEphemeralChainLoadTestInvalidManifest.Wrap("empty rpc node url")
}

Expand Down Expand Up @@ -111,7 +111,7 @@ func validatedNonEphemeralChainManifest(manifest *LoadTestManifestYAML) (*LoadTe
return nil, ErrNonEphemeralChainLoadTestInvalidManifest.Wrap("suppliers entry forbidden")
}

if len(manifest.PRCNode) == 0 {
if len(manifest.RPCNode) == 0 {
return nil, ErrNonEphemeralChainLoadTestInvalidManifest.Wrap("empty rpc node url")
}

Expand Down
46 changes: 29 additions & 17 deletions load-testing/tests/relays_stress_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ type actorLoadTestIncrementPlan struct {
func (s *relaysSuite) setupEventListeners(rpcNode string) {
// Set up the blockClient that will be notifying the suite about the committed blocks.
eventsObs, eventsObsCh := channel.NewObservable[[]types.Event]()
s.eventsObs = eventsObs
s.committedEventsObs = eventsObs

extractBlockEvents := func(ctx context.Context, block client.Block) {
// Query the block results endpoint for each observed block to get the tx and block events.
Expand Down Expand Up @@ -661,7 +661,7 @@ func (s *relaysSuite) createApplicationAccount(
// cost, and the block duration.
func (s *relaysSuite) getAppFundingAmount(currentBlockHeight int64) sdk.Coin {
currentTestDuration := s.testStartHeight + s.relayLoadDurationBlocks - currentBlockHeight
// Compute teh cost of all relays throughout the test duration.
// Compute the cost of all relays throughout the test duration.
totalRelayCostDuringTestUPOKT := s.relayRatePerApp * s.relayCoinAmountCost * currentTestDuration * blockDurationSec
// Multiply by 2 to make sure the application does not run out of funds
// based on the number of relays it needs to send. Theoretically, `+1` should
Expand Down Expand Up @@ -1054,7 +1054,7 @@ func (s *relaysSuite) sendRelay(iteration uint64, relayPayload string) (appAddre
sendRelayRequest := func(gatewayURL, appAddr, payload string) {
req, err := http.NewRequest("POST", gatewayURL, strings.NewReader(payload))

// TODO_TECHDEBT(red-0ne): Use the app-address instead X-App-Address once PATH Gateway
// TODO_TECHDEBT(red-0ne): Use 'app-address' instead of 'X-App-Address' once PATH Gateway
// deprecates the X-App-Address header.
// This is needed by the PATH Gateway's trusted mode to identify the application
// that is sending the relay request.
Expand Down Expand Up @@ -1093,7 +1093,7 @@ func (s *relaysSuite) ensureFundedActors(ctx context.Context, actors []*accountI
// Add 1 second to the block duration to make sure the deadline is after the next block.
deadline := time.Now().Add(time.Second * time.Duration(blockDurationSec+1))
ctx, cancel := context.WithDeadline(ctx, deadline)
channel.ForEach(ctx, s.eventsObs, func(ctx context.Context, events []types.Event) {
channel.ForEach(ctx, s.committedEventsObs, func(ctx context.Context, events []types.Event) {
for _, event := range events {
// In the context of ensuring the actors are funded, only the transfer events
// are relevant; filtering out the other events.
Expand Down Expand Up @@ -1129,7 +1129,7 @@ func (s *relaysSuite) ensureFundedActors(ctx context.Context, actors []*accountI
}

// allActorsFunded checks if all the expected actors are funded.
// An error is returned if any of the expected actors was not funded.
// An error is returned if any (at least one) of the expected actors was not funded.
func allActorsFunded(expectedActors []*accountInfo, fundedActors map[string]struct{}) bool {
for _, actor := range expectedActors {
if _, ok := fundedActors[actor.address]; !ok {
Expand All @@ -1155,7 +1155,7 @@ func (s *relaysSuite) ensureStakedActors(
// Add 1 second to the block duration to make sure the deadline is after the next block.
deadline := time.Now().Add(time.Second * time.Duration(blockDurationSec+1))
ctx, cancel := context.WithDeadline(ctx, deadline)
typedEventsObs := events.AbciEventsToTypedEvents(ctx, s.eventsObs)
typedEventsObs := events.AbciEventsToTypedEvents(ctx, s.committedEventsObs)
channel.ForEach(ctx, typedEventsObs, func(ctx context.Context, blockEvents []proto.Message) {
for _, event := range blockEvents {
switch e := event.(type) {
Expand Down Expand Up @@ -1208,7 +1208,7 @@ func (s *relaysSuite) ensureDelegatedApps(

deadline := time.Now().Add(time.Second * time.Duration(blockDurationSec+1))
ctx, cancel := context.WithDeadline(ctx, deadline)
typedEventsObs := events.AbciEventsToTypedEvents(ctx, s.eventsObs)
typedEventsObs := events.AbciEventsToTypedEvents(ctx, s.committedEventsObs)
channel.ForEach(ctx, typedEventsObs, func(ctx context.Context, blockEvents []proto.Message) {
for _, event := range blockEvents {
redelegationEvent, ok := event.(*apptypes.EventRedelegation)
Expand Down Expand Up @@ -1254,16 +1254,9 @@ func allAppsDelegatedToAllGateways(

// getRelayCost fetches the relay cost from the tokenomics module.
func (s *relaysSuite) getRelayCost() int64 {
// Set up the tokenomics client.
flagSet := testclient.NewLocalnetFlagSet(s)
clientCtx := testclient.NewLocalnetClientCtx(s, flagSet)
sharedClient := sharedtypes.NewQueryClient(clientCtx)
relayCost := s.testedService.ComputeUnitsPerRelay * s.sharedParams.ComputeUnitsToTokensMultiplier

res, err := sharedClient.Params(s.ctx, &sharedtypes.QueryParamsRequest{})
require.NoError(s, err)

// multiply by the CUPR
return int64(res.Params.ComputeUnitsToTokensMultiplier)
return int64(relayCost)
}

// getProvisionedActorsCurrentStakedAmount fetches the current stake amount of
Expand Down Expand Up @@ -1408,7 +1401,7 @@ func (s *relaysSuite) parseActorLoadTestIncrementPlans(

// forEachSettlement asynchronously captures the settlement events and processes them.
func (s *relaysSuite) forEachSettlement(ctx context.Context) {
typedEventsObs := events.AbciEventsToTypedEvents(ctx, s.eventsObs)
typedEventsObs := events.AbciEventsToTypedEvents(ctx, s.committedEventsObs)
channel.ForEach(
s.ctx,
typedEventsObs,
Expand Down Expand Up @@ -1509,6 +1502,25 @@ func (s *relaysSuite) queryTokenomicsParams(queryNodeRPCURL string) {
s.tokenomicsParams = &res.Params
}

// queryTestedService queries the current service being tested.
func (s *relaysSuite) queryTestedService(queryNodeRPCURL string) {
s.Helper()

deps := depinject.Supply(s.txContext.GetClientCtx())

blockQueryClient, err := sdkclient.NewClientFromNode(queryNodeRPCURL)
require.NoError(s, err)
deps = depinject.Configs(deps, depinject.Supply(blockQueryClient))

serviceQueryclient, err := query.NewServiceQuerier(deps)
require.NoError(s, err)

service, err := serviceQueryclient.GetService(s.ctx, "anvil")
require.NoError(s, err)

s.testedService = &service
}

// forEachStakedAndDelegatedAppPrepareApp is a ForEachFn that waits for txs which
// were broadcast in previous pipeline stages have been committed. It ensures that
// new applications were successfully staked and all application actors are delegated
Expand Down
29 changes: 16 additions & 13 deletions load-testing/tests/relays_stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,15 @@ type relaysSuite struct {
// txContext is the transaction context used to sign and send transactions.
txContext client.TxContext

// Protocol parameters used in the test.
// Protocol governance params used in the test.
// It is queried at the beginning of the test.
sharedParams *sharedtypes.Params
appParams *apptypes.Params
proofParams *prooftypes.Params
tokenomicsParams *tokenomicstypes.Params

testedService *sharedtypes.Service

// numRelaysSent is the number of relay requests sent during the test.
numRelaysSent atomic.Uint64
// relayRatePerApp is the rate of relay requests sent per application per second.
Expand Down Expand Up @@ -197,8 +199,8 @@ type relaysSuite struct {
// run on ephemeral chain setups like localnet or long living ones (i.e. Test/DevNet).
isEphemeralChain bool

// eventsObs is the observable that maps committed blocks to on-chain events.
eventsObs observable.Observable[[]types.Event]
// committedEventsObs is the observable that maps committed blocks to on-chain events.
committedEventsObs observable.Observable[[]types.Event]

// successfulRelays is the number of relay requests that returned 200 status code.
successfulRelays atomic.Uint64
Expand Down Expand Up @@ -298,9 +300,9 @@ func (s *relaysSuite) LocalnetIsRunning() {
// CometLocalWebsocketURL to the TestNetNode URL. These variables are used
// by the testtx txClient to send transactions to the network.
if !s.isEphemeralChain {
testclient.CometLocalTCPURL = loadTestParams.PRCNode
testclient.CometLocalTCPURL = loadTestParams.RPCNode

webSocketURL, err := url.Parse(loadTestParams.PRCNode)
webSocketURL, err := url.Parse(loadTestParams.RPCNode)
require.NoError(s, err)

// TestNet nodes may be exposed over HTTPS, so adjust the scheme accordingly.
Expand All @@ -312,19 +314,16 @@ func (s *relaysSuite) LocalnetIsRunning() {
testclient.CometLocalWebsocketURL = webSocketURL.String() + "/websocket"

// Update the block duration when running the test on a non-ephemeral chain.
// TODO_TECHDEBT: Get the block duration value from the chain or the manifest.
// TODO_TECHDEBT: Get the block duration value from the chain.
blockDurationSec = 60
}

// Setup the txContext that will be used to send transactions to the network.
s.txContext = testtx.NewLocalnetContext(s.TestingT.(*testing.T))

// Get the relay cost from the tokenomics module.
s.relayCoinAmountCost = s.getRelayCost()

// Setup the event listener for on-chain events to check and assert on transactions
// and finalized blocks results.
s.setupEventListeners(loadTestParams.PRCNode)
s.setupEventListeners(loadTestParams.RPCNode)

// Initialize the funding account.
s.initFundingAccount(loadTestParams.FundingAccountAddress)
Expand All @@ -333,9 +332,13 @@ func (s *relaysSuite) LocalnetIsRunning() {
s.forEachSettlement(s.ctx)

// Query for the current network on-chain params.
s.querySharedParams(loadTestParams.PRCNode)
s.queryAppParams(loadTestParams.PRCNode)
s.queryProofParams(loadTestParams.PRCNode)
s.querySharedParams(loadTestParams.RPCNode)
s.queryAppParams(loadTestParams.RPCNode)
s.queryProofParams(loadTestParams.RPCNode)
s.queryTestedService(loadTestParams.RPCNode)

// Get the relay cost from the tokenomics module.
s.relayCoinAmountCost = s.getRelayCost()

// Some suppliers may already be staked at genesis, ensure that staking during
// this test succeeds by increasing the sake amount.
Expand Down
12 changes: 8 additions & 4 deletions testutil/delays/waitall.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@ import "sync"
// WaitAll waits for all the provided functions to complete.
// It is used to wait for multiple goroutines to complete before proceeding.
func WaitAll(waitFuncs ...func()) {
wg := sync.WaitGroup{}
if len(waitFuncs) == 0 {
return
}

var wg sync.WaitGroup
wg.Add(len(waitFuncs))

for _, f := range waitFuncs {
for _, fn := range waitFuncs {
go func(f func()) {
defer wg.Done()
f()
wg.Done()
}(f)
}(fn)
}

wg.Wait()
Expand Down
6 changes: 5 additions & 1 deletion testutil/events/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,14 @@ func NewEventTypeMatchFn(matchEventType string) func(*cosmostypes.Event) bool {
}

// AbciEventsToTypedEvents converts the abci events to typed events.
func AbciEventsToTypedEvents(ctx context.Context, abciEventObs observable.Observable[[]abci.Event]) observable.Observable[[]proto.Message] {
func AbciEventsToTypedEvents(
ctx context.Context,
abciEventObs observable.Observable[[]abci.Event],
) observable.Observable[[]proto.Message] {
return channel.Map(ctx, abciEventObs, func(ctx context.Context, events []abci.Event) ([]proto.Message, bool) {
var typedEvents []proto.Message
for _, event := range events {
// TODO_TECHDEBT: Filter out events by event.Type before parsing them.
typedEvent, err := cosmostypes.ParseTypedEvent(event)
if err != nil {
continue
Expand Down

0 comments on commit 48bccb8

Please sign in to comment.