Skip to content

Commit

Permalink
[Telemetry] chore: add probabilistic proof telemetry (#605)
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanchriswhite authored Jun 20, 2024
1 parent 11a60b6 commit 9ff9142
Show file tree
Hide file tree
Showing 20 changed files with 860 additions and 112 deletions.
413 changes: 408 additions & 5 deletions localnet/grafana-dashboards/stress-test-dashboard.json

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pkg/relayer/session/claim.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,9 @@ func (rs *relayerSessionsManager) waitForEarliestCreateClaimsHeight(
}

// we wait for createClaimsWindowOpenHeight to be received before proceeding since we need its hash
// to know where this servicer's claim submission window starts.
// to know where this servicer's claim submission window opens.
rs.logger.Info().
Int64("create_claim_window_start_height", createClaimsWindowOpenHeight).
Int64("create_claim_window_open_height", createClaimsWindowOpenHeight).
Msg("waiting & blocking for global earliest claim submission height")

// TODO_BLOCKER(@bryanchriswhite): The block that'll be used as a source of entropy for
Expand Down
121 changes: 119 additions & 2 deletions telemetry/event_counters.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
// Package telemetry provides a set of functions for incrementing counters which track
// various events across the codebase. Typically, calls to these counter functions SHOULD
// be made inside deferred anonymous functions so that they will reference the final values
// of their inputs. Any instrumented piece of code which contains branching logic with respect
// its counter function inputs is subject to this constraint (i.e. MUST defer).
package telemetry

import (
Expand All @@ -7,7 +12,24 @@ import (
"github.com/hashicorp/go-metrics"
)

const eventSuccessKey = "event_type"
const eventTypeMetricKey = "event_type"

type ClaimProofStage = string

const (
ClaimProofStageClaimed = ClaimProofStage("claimed")
ClaimProofStageProven = ClaimProofStage("proven")
ClaimProofStageSettled = ClaimProofStage("settled")
ClaimProofStageExpired = ClaimProofStage("expired")
)

type ProofRequirementReason = string

const (
ProofNotRequired = ProofRequirementReason("not_required")
ProofRequirementReasonProbabilistic = ProofRequirementReason("probabilistic_selection")
ProofRequirementReasonThreshold = ProofRequirementReason("above_compute_unit_threshold")
)

// EventSuccessCounter increments a counter with the given data type and success status.
func EventSuccessCounter(
Expand All @@ -19,11 +41,106 @@ func EventSuccessCounter(
value := getValue()

telemetry.IncrCounterWithLabels(
[]string{eventSuccessKey},
[]string{eventTypeMetricKey},
value,
[]metrics.Label{
{Name: "type", Value: eventType},
{Name: "is_successful", Value: successResult},
},
)
}

// ProofRequirementCounter increments a counter which tracks the number of claims
// which require proof for the given proof requirement reason (i.e. not required,
// probabilistic selection, above compute unit threshold).
// If err is not nil, the counter is not incremented and an "error" label is added
// with the error's message.
func ProofRequirementCounter(
reason ProofRequirementReason,
err error,
) {
incrementAmount := 1
isRequired := strconv.FormatBool(reason != ProofNotRequired)
labels := []metrics.Label{
{Name: "proof_required_reason", Value: reason},
{Name: "is_required", Value: isRequired},
}

// Ensure the counter is not incremented if there was an error.
if err != nil {
incrementAmount = 0
labels = AppendErrLabel(err, labels...)
}

telemetry.IncrCounterWithLabels(
[]string{eventTypeMetricKey},
float32(incrementAmount),
labels,
)
}

// ClaimComputeUnitsCounter increments a counter which tracks the number of compute units
// which are represented by on-chain claims at the given ClaimProofStage.
// If err is not nil, the counter is not incremented and an "error" label is added
// with the error's message. I.e., Prometheus will ingest this event.
func ClaimComputeUnitsCounter(
claimProofStage ClaimProofStage,
numComputeUnits uint64,
err error,
) {
incrementAmount := numComputeUnits
labels := []metrics.Label{
{Name: "unit", Value: "compute_units"},
{Name: "claim_proof_stage", Value: claimProofStage},
}

// Ensure the counter is not incremented if there was an error.
if err != nil {
incrementAmount = 0
labels = AppendErrLabel(err, labels...)
}

telemetry.IncrCounterWithLabels(
[]string{eventTypeMetricKey},
float32(incrementAmount),
labels,
)
}

// ClaimCounter increments a counter which tracks the number of claims at the given
// ClaimProofStage.
// If err is not nil, the counter is not incremented and an "error" label is added
// with the error's message. I.e., Prometheus will ingest this event.
func ClaimCounter(
claimProofStage ClaimProofStage,
numClaims uint64,
err error,
) {
incrementAmount := numClaims
labels := []metrics.Label{
{Name: "unit", Value: "claims"},
{Name: "claim_proof_stage", Value: claimProofStage},
}

// Ensure the counter is not incremented if there was an error.
if err != nil {
incrementAmount = 0
labels = AppendErrLabel(err, labels...)
}

telemetry.IncrCounterWithLabels(
[]string{eventTypeMetricKey},
float32(incrementAmount),
labels,
)
}

// AppendErrLabel appends a label with the name "error" and a value of the error's
// message to the given labels slice if the error is not nil.
func AppendErrLabel(err error, labels ...metrics.Label) []metrics.Label {
if err == nil {
return labels
}

return append(labels, metrics.Label{Name: "error", Value: err.Error()})
}
1 change: 1 addition & 0 deletions testutil/integration/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,7 @@ func NewCompleteIntegrationApp(t *testing.T) *App {
accountKeeper,
applicationKeeper,
proofKeeper,
sharedKeeper,
)
tokenomicsModule := tokenomics.NewAppModule(
cdc,
Expand Down
6 changes: 6 additions & 0 deletions testutil/keeper/tokenomics.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ func TokenomicsKeeperWithActorAddrs(t testing.TB) (
mockProofKeeper := mocks.NewMockProofKeeper(ctrl)
mockProofKeeper.EXPECT().GetAllClaims(gomock.Any()).AnyTimes()

// Mock the shared keeper
mockSharedKeeper := mocks.NewMockSharedKeeper(ctrl)
mockSharedKeeper.EXPECT().GetProofWindowCloseHeight(gomock.Any(), gomock.Any()).AnyTimes()

k := tokenomicskeeper.NewKeeper(
cdc,
runtime.NewKVStoreService(storeKey),
Expand All @@ -160,6 +164,7 @@ func TokenomicsKeeperWithActorAddrs(t testing.TB) (
mockAccountKeeper,
mockApplicationKeeper,
mockProofKeeper,
mockSharedKeeper,
)

sdkCtx := sdk.NewContext(stateStore, cmtproto.Header{}, false, log.NewNopLogger())
Expand Down Expand Up @@ -326,6 +331,7 @@ func NewTokenomicsModuleKeepers(
accountKeeper,
appKeeper,
proofKeeper,
sharedKeeper,
)

require.NoError(t, tokenomicsKeeper.SetParams(ctx, tokenomicstypes.DefaultParams()))
Expand Down
12 changes: 8 additions & 4 deletions testutil/proof/fixture_generators.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,15 @@ func SmstRootWithSum(sum uint64) smt.MerkleRoot {
func RandSmstRootWithSum(t *testing.T, sum uint64) smt.MerkleRoot {
t.Helper()

root := make([]byte, 40)
root := [smt.SmstRootSizeBytes]byte{}
// Only populate the first 32 bytes with random data, leave the last 8 bytes for the sum.
_, err := rand.Read(root[:32])
_, err := rand.Read(root[:smt.SmtRootSizeBytes])
require.NoError(t, err)

binary.BigEndian.PutUint64(root[32:], sum)
return smt.MerkleRoot(root)
binary.BigEndian.PutUint64(root[smt.SmtRootSizeBytes:], sum)
// Insert the count into the root hash
// TODO_TECHDEBT: This is a hard-coded count of 1, but could be a parameter.
// TODO_TECHDEBT: We are assuming the sum takes up 8 bytes.
binary.BigEndian.PutUint64(root[smt.SmtRootSizeBytes+8:], 1)
return smt.MerkleRoot(root[:])
}
33 changes: 23 additions & 10 deletions x/proof/keeper/msg_server_create_claim.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package keeper

import (
"context"
"errors"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand All @@ -10,16 +11,29 @@ import (
"github.com/pokt-network/poktroll/x/proof/types"
)

func (k msgServer) CreateClaim(ctx context.Context, msg *types.MsgCreateClaim) (*types.MsgCreateClaimResponse, error) {
func (k msgServer) CreateClaim(
ctx context.Context,
msg *types.MsgCreateClaim,
) (_ *types.MsgCreateClaimResponse, err error) {
// TODO_BLOCKER(@bryanchriswhite): Prevent Claim upserts after the ClaimWindow is closed.
// TODO_BLOCKER(@bryanchriswhite): Validate the signature on the Claim message corresponds to the supplier before Upserting.

isSuccessful := false
defer telemetry.EventSuccessCounter(
"create_claim",
telemetry.DefaultCounterFn,
func() bool { return isSuccessful },
)
// Declare claim to reference in telemetry.
var claim types.Claim

// Defer telemetry calls so that they reference the final values the relevant variables.
defer func() {
// TODO_IMPROVE: We could track on-chain relays here with claim.GetNumRelays().
numComputeUnits, deferredErr := claim.GetNumComputeUnits()
err = errors.Join(err, deferredErr)

telemetry.ClaimCounter(telemetry.ClaimProofStageClaimed, 1, err)
telemetry.ClaimComputeUnitsCounter(
telemetry.ClaimProofStageClaimed,
numComputeUnits,
err,
)
}()

logger := k.Logger().With("method", "CreateClaim")
logger.Info("creating claim")
Expand Down Expand Up @@ -67,8 +81,8 @@ func (k msgServer) CreateClaim(ctx context.Context, msg *types.MsgCreateClaim) (

logger.Info("validated claim")

// Construct and upsert claim after all validation.
claim := types.Claim{
// Assign and upsert claim after all validation.
claim = types.Claim{
SupplierAddress: msg.GetSupplierAddress(),
SessionHeader: sessionHeader,
RootHash: msg.GetRootHash(),
Expand All @@ -81,7 +95,6 @@ func (k msgServer) CreateClaim(ctx context.Context, msg *types.MsgCreateClaim) (

logger.Info("created new claim")

isSuccessful = true
// TODO_BETA: return the claim in the response.
return &types.MsgCreateClaimResponse{}, nil
}
3 changes: 2 additions & 1 deletion x/proof/keeper/msg_server_create_claim_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"google.golang.org/grpc/status"

keepertest "github.com/pokt-network/poktroll/testutil/keeper"
testproof "github.com/pokt-network/poktroll/testutil/proof"
"github.com/pokt-network/poktroll/testutil/sample"
testsession "github.com/pokt-network/poktroll/testutil/session"
apptypes "github.com/pokt-network/poktroll/x/application/types"
Expand All @@ -20,7 +21,7 @@ import (
sharedtypes "github.com/pokt-network/poktroll/x/shared/types"
)

var defaultMerkleRoot = []byte{0, 1, 0, 1}
var defaultMerkleRoot = testproof.SmstRootWithSum(10)

func TestMsgServer_CreateClaim_Success(t *testing.T) {
tests := []struct {
Expand Down
32 changes: 23 additions & 9 deletions x/proof/keeper/msg_server_submit_proof.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"bytes"
"context"
"crypto/sha256"
"errors"
"fmt"
"hash"

Expand Down Expand Up @@ -49,7 +50,10 @@ func init() {
// to correspond to the supplier signing the proof. For example, a single entity
// could (theoretically) batch multiple proofs (signed by the corresponding supplier)
// into one transaction to save on transaction fees.
func (k msgServer) SubmitProof(ctx context.Context, msg *types.MsgSubmitProof) (*types.MsgSubmitProofResponse, error) {
func (k msgServer) SubmitProof(
ctx context.Context,
msg *types.MsgSubmitProof,
) (_ *types.MsgSubmitProofResponse, err error) {
// TODO_MAINNET: A potential issue with doing proof validation inside
// `SubmitProof` is that we will not be storing false proofs on-chain (e.g. for slashing purposes).
// This could be considered a feature (e.g. less state bloat against sybil attacks)
Expand All @@ -58,12 +62,22 @@ func (k msgServer) SubmitProof(ctx context.Context, msg *types.MsgSubmitProof) (
logger := k.Logger().With("method", "SubmitProof")
logger.Info("About to start submitting proof")

isSuccessful := false
defer telemetry.EventSuccessCounter(
"submit_proof",
telemetry.DefaultCounterFn,
func() bool { return isSuccessful },
)
// Declare claim to reference in telemetry.
claim := new(types.Claim)

// Defer telemetry calls so that they reference the final values the relevant variables.
defer func() {
// TODO_IMPROVE: We could track on-chain relays here with claim.GetNumRelays().
numComputeUnits, deferredErr := claim.GetNumComputeUnits()
err = errors.Join(err, deferredErr)

telemetry.ClaimCounter(telemetry.ClaimProofStageProven, 1, err)
telemetry.ClaimComputeUnitsCounter(
telemetry.ClaimProofStageProven,
numComputeUnits,
err,
)
}()

/*
TODO_BLOCKER(@bryanchriswhite): Document these steps in proof
Expand Down Expand Up @@ -228,10 +242,11 @@ func (k msgServer) SubmitProof(ctx context.Context, msg *types.MsgSubmitProof) (

// Retrieve the corresponding claim for the proof submitted so it can be
// used in the proof validation below.
claim, err := k.queryAndValidateClaimForProof(ctx, msg)
claim, err = k.queryAndValidateClaimForProof(ctx, msg)
if err != nil {
return nil, status.Error(codes.FailedPrecondition, err.Error())
}

logger.Debug("successfully retrieved and validated claim")

// Verify the proof's closest merkle proof.
Expand All @@ -254,7 +269,6 @@ func (k msgServer) SubmitProof(ctx context.Context, msg *types.MsgSubmitProof) (
k.UpsertProof(ctx, proof)
logger.Info("successfully upserted the proof")

isSuccessful = true
return &types.MsgSubmitProofResponse{}, nil
}

Expand Down
Loading

0 comments on commit 9ff9142

Please sign in to comment.