From 9ff9142214c091192f2c92c739fb1c42a1077be1 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Thu, 20 Jun 2024 10:43:22 +0200 Subject: [PATCH] [Telemetry] chore: add probabilistic proof telemetry (#605) --- .../stress-test-dashboard.json | 413 +++++++++++++++++- pkg/relayer/session/claim.go | 4 +- telemetry/event_counters.go | 121 ++++- testutil/integration/app.go | 1 + testutil/keeper/tokenomics.go | 6 + testutil/proof/fixture_generators.go | 12 +- x/proof/keeper/msg_server_create_claim.go | 33 +- .../keeper/msg_server_create_claim_test.go | 3 +- x/proof/keeper/msg_server_submit_proof.go | 32 +- x/proof/types/claim.go | 42 ++ x/shared/keeper/session.go | 7 + x/tokenomics/keeper/keeper.go | 3 + .../keeper_settle_pending_claims_test.go | 90 +++- x/tokenomics/keeper/proof_requirement_test.go | 29 +- x/tokenomics/keeper/random_test.go | 33 +- x/tokenomics/keeper/settle_pending_claims.go | 80 ++-- .../keeper/settle_session_accounting_test.go | 20 +- x/tokenomics/module/abci.go | 37 +- x/tokenomics/module/module.go | 2 + x/tokenomics/types/expected_keepers.go | 4 +- 20 files changed, 860 insertions(+), 112 deletions(-) create mode 100644 x/proof/types/claim.go diff --git a/localnet/grafana-dashboards/stress-test-dashboard.json b/localnet/grafana-dashboards/stress-test-dashboard.json index d3bd1ed7d..6054b2f6b 100644 --- a/localnet/grafana-dashboards/stress-test-dashboard.json +++ b/localnet/grafana-dashboards/stress-test-dashboard.json @@ -94,6 +94,7 @@ "showLegend": true }, "tooltip": { + "maxHeight": 600, "mode": "single", "sort": "none" } @@ -196,6 +197,7 @@ "showLegend": true }, "tooltip": { + "maxHeight": 600, "mode": "single", "sort": "none" } @@ -298,6 +300,7 @@ "showLegend": true }, "tooltip": { + "maxHeight": 600, "mode": "single", "sort": "none" } @@ -397,6 +400,7 @@ "showLegend": true }, "tooltip": { + "maxHeight": 600, "mode": "single", "sort": "none" } @@ -496,6 +500,7 @@ "showLegend": true }, "tooltip": { + "maxHeight": 600, "mode": "single", "sort": 
"none" } @@ -595,6 +600,7 @@ "showLegend": true }, "tooltip": { + "maxHeight": 600, "mode": "single", "sort": "none" } @@ -694,6 +700,7 @@ "showLegend": true }, "tooltip": { + "maxHeight": 600, "mode": "single", "sort": "none" } @@ -794,6 +801,7 @@ "showLegend": true }, "tooltip": { + "maxHeight": 600, "mode": "single", "sort": "none" } @@ -894,6 +902,7 @@ "showLegend": true }, "tooltip": { + "maxHeight": 600, "mode": "single", "sort": "none" } @@ -906,7 +915,7 @@ }, "disableTextWrap": false, "editorMode": "builder", - "expr": "avg by(jbo) (event_type{container=\"poktrolld-validator\", type=\"submit_proof\", is_successful=\"true\"})", + "expr": "avg by(job) (event_type{container=\"poktrolld-validator\", claim_proof_stage=\"proven\", unit=\"claims\"})", "fullMetaSearch": false, "hide": false, "includeNullMetadata": false, @@ -924,7 +933,7 @@ }, "disableTextWrap": false, "editorMode": "builder", - "expr": "avg by(job) (event_type{container=\"poktrolld-validator\", type=\"create_claim\", is_successful=\"true\"})", + "expr": "avg by(job) (event_type{container=\"poktrolld-validator\", claim_proof_stage=\"claimed\", unit=\"claims\"})", "fullMetaSearch": false, "hide": false, "includeNullMetadata": false, @@ -937,11 +946,404 @@ ], "title": "Claims & Proofs", "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + 
}, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 8, + "x": 0, + "y": 24 + }, + "id": 12, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "maxHeight": 600, + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "disableTextWrap": false, + "editorMode": "builder", + "expr": "avg by(job) (event_type{container=\"poktrolld-validator\", proof_required_reason=\"not_required\"})", + "fullMetaSearch": false, + "hide": false, + "includeNullMetadata": false, + "instant": false, + "interval": "", + "legendFormat": "not required", + "range": true, + "refId": "A", + "useBackend": false + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "disableTextWrap": false, + "editorMode": "builder", + "expr": "avg by(job) (event_type{container=\"poktrolld-validator\", proof_required_reason=\"probabilistic_selection\"})", + "fullMetaSearch": false, + "hide": false, + "includeNullMetadata": false, + "instant": false, + "legendFormat": "probabilistic", + "range": true, + "refId": "B", + "useBackend": false + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "disableTextWrap": false, + "editorMode": "builder", + "expr": "avg by(job) (event_type{container=\"poktrolld-validator\", proof_required_reason=\"threshold_selection\"})", + "fullMetaSearch": false, + "hide": false, + "includeNullMetadata": false, + "instant": false, + "legendFormat": "threshold", + "range": true, + "refId": "C", + "useBackend": false + } + ], + "title": "Proof Requirement", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + 
"fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 8, + "x": 8, + "y": 24 + }, + "id": 11, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "maxHeight": 600, + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "disableTextWrap": false, + "editorMode": "builder", + "expr": "avg by(job) (event_type{container=\"poktrolld-validator\", claim_proof_stage=\"claimed\", unit=\"compute_units\"})", + "fullMetaSearch": false, + "hide": false, + "includeNullMetadata": false, + "instant": false, + "interval": "", + "legendFormat": "claimed", + "range": true, + "refId": "A", + "useBackend": false + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "disableTextWrap": false, + "editorMode": "builder", + "expr": "avg by(job) (event_type{container=\"poktrolld-validator\", claim_proof_stage=\"proven\", unit=\"compute_units\"})", + "fullMetaSearch": false, + "hide": false, + "includeNullMetadata": false, + "instant": false, + "legendFormat": "proven", + 
"range": true, + "refId": "B", + "useBackend": false + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "disableTextWrap": false, + "editorMode": "builder", + "expr": "avg by(job) (event_type{container=\"poktrolld-validator\", claim_proof_stage=\"settled\", unit=\"compute_units\"})", + "fullMetaSearch": false, + "hide": false, + "includeNullMetadata": false, + "instant": false, + "legendFormat": "settled", + "range": true, + "refId": "C", + "useBackend": false + } + ], + "title": "Compute Units Lifecycle", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 8, + "x": 16, + "y": 24 + }, + "id": 13, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "maxHeight": 600, + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "disableTextWrap": false, + "editorMode": "builder", + "expr": "avg by(job) (event_type{container=\"poktrolld-validator\", 
claim_proof_stage=\"settled\", unit=\"claims\"})", + "fullMetaSearch": false, + "hide": false, + "includeNullMetadata": false, + "instant": false, + "interval": "", + "legendFormat": "settled", + "range": true, + "refId": "A", + "useBackend": false + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "disableTextWrap": false, + "editorMode": "builder", + "expr": "avg by(job) (event_type{container=\"poktrolld-validator\", claim_proof_stage=\"expired\", unit=\"claims\"})", + "fullMetaSearch": false, + "hide": false, + "includeNullMetadata": false, + "instant": false, + "legendFormat": "expired", + "range": true, + "refId": "B", + "useBackend": false + } + ], + "title": "Claim Settlement & Expiration", + "type": "timeseries" } ], - "refresh": "auto", + "refresh": "", "schemaVersion": 39, - "tags": ["protocol"], + "tags": [ + "protocol" + ], "templating": { "list": [] }, @@ -949,10 +1351,11 @@ "from": "now-5m", "to": "now" }, + "timeRangeUpdatedDuringEditOrView": false, "timepicker": {}, "timezone": "browser", "title": "Protocol / Stress test", "uid": "ddkakqetrti4gb", "version": 1, "weekStart": "" -} +} \ No newline at end of file diff --git a/pkg/relayer/session/claim.go b/pkg/relayer/session/claim.go index bc0af2763..2d42911b6 100644 --- a/pkg/relayer/session/claim.go +++ b/pkg/relayer/session/claim.go @@ -97,9 +97,9 @@ func (rs *relayerSessionsManager) waitForEarliestCreateClaimsHeight( } // we wait for createClaimsWindowOpenHeight to be received before proceeding since we need its hash - // to know where this servicer's claim submission window starts. + // to know where this servicer's claim submission window opens. rs.logger.Info(). - Int64("create_claim_window_start_height", createClaimsWindowOpenHeight). + Int64("create_claim_window_open_height", createClaimsWindowOpenHeight). 
Msg("waiting & blocking for global earliest claim submission height") // TODO_BLOCKER(@bryanchriswhite): The block that'll be used as a source of entropy for diff --git a/telemetry/event_counters.go b/telemetry/event_counters.go index 53a224786..2d222413c 100644 --- a/telemetry/event_counters.go +++ b/telemetry/event_counters.go @@ -1,3 +1,8 @@ +// Package telemetry provides a set of functions for incrementing counters which track +// various events across the codebase. Typically, calls to these counter functions SHOULD +// be made inside deferred anonymous functions so that they will reference the final values +// of their inputs. Any instrumented piece of code which contains branching logic with respect to +// its counter function inputs is subject to this constraint (i.e. MUST defer). package telemetry import ( @@ -7,7 +12,24 @@ import ( "github.com/hashicorp/go-metrics" ) -const eventSuccessKey = "event_type" +const eventTypeMetricKey = "event_type" + +type ClaimProofStage = string + +const ( + ClaimProofStageClaimed = ClaimProofStage("claimed") + ClaimProofStageProven = ClaimProofStage("proven") + ClaimProofStageSettled = ClaimProofStage("settled") + ClaimProofStageExpired = ClaimProofStage("expired") +) + +type ProofRequirementReason = string + +const ( + ProofNotRequired = ProofRequirementReason("not_required") + ProofRequirementReasonProbabilistic = ProofRequirementReason("probabilistic_selection") + ProofRequirementReasonThreshold = ProofRequirementReason("threshold_selection") +) // EventSuccessCounter increments a counter with the given data type and success status.
func EventSuccessCounter( @@ -19,7 +41,7 @@ func EventSuccessCounter( value := getValue() telemetry.IncrCounterWithLabels( - []string{eventSuccessKey}, + []string{eventTypeMetricKey}, value, []metrics.Label{ {Name: "type", Value: eventType}, @@ -27,3 +49,98 @@ func EventSuccessCounter( }, ) } + +// ProofRequirementCounter increments a counter which tracks the number of claims +// which require proof for the given proof requirement reason (i.e. not required, +// probabilistic selection, above compute unit threshold). +// If err is not nil, the counter is not incremented and an "error" label is added +// with the error's message. +func ProofRequirementCounter( + reason ProofRequirementReason, + err error, +) { + incrementAmount := 1 + isRequired := strconv.FormatBool(reason != ProofNotRequired) + labels := []metrics.Label{ + {Name: "proof_required_reason", Value: reason}, + {Name: "is_required", Value: isRequired}, + } + + // Ensure the counter is not incremented if there was an error. + if err != nil { + incrementAmount = 0 + labels = AppendErrLabel(err, labels...) + } + + telemetry.IncrCounterWithLabels( + []string{eventTypeMetricKey}, + float32(incrementAmount), + labels, + ) +} + +// ClaimComputeUnitsCounter increments a counter which tracks the number of compute units +// which are represented by on-chain claims at the given ClaimProofStage. +// If err is not nil, the counter is not incremented and an "error" label is added +// with the error's message. I.e., Prometheus will ingest this event. +func ClaimComputeUnitsCounter( + claimProofStage ClaimProofStage, + numComputeUnits uint64, + err error, +) { + incrementAmount := numComputeUnits + labels := []metrics.Label{ + {Name: "unit", Value: "compute_units"}, + {Name: "claim_proof_stage", Value: claimProofStage}, + } + + // Ensure the counter is not incremented if there was an error. + if err != nil { + incrementAmount = 0 + labels = AppendErrLabel(err, labels...) 
+ } + + telemetry.IncrCounterWithLabels( + []string{eventTypeMetricKey}, + float32(incrementAmount), + labels, + ) +} + +// ClaimCounter increments a counter which tracks the number of claims at the given +// ClaimProofStage. +// If err is not nil, the counter is not incremented and an "error" label is added +// with the error's message. I.e., Prometheus will ingest this event. +func ClaimCounter( + claimProofStage ClaimProofStage, + numClaims uint64, + err error, +) { + incrementAmount := numClaims + labels := []metrics.Label{ + {Name: "unit", Value: "claims"}, + {Name: "claim_proof_stage", Value: claimProofStage}, + } + + // Ensure the counter is not incremented if there was an error. + if err != nil { + incrementAmount = 0 + labels = AppendErrLabel(err, labels...) + } + + telemetry.IncrCounterWithLabels( + []string{eventTypeMetricKey}, + float32(incrementAmount), + labels, + ) +} + +// AppendErrLabel appends a label with the name "error" and a value of the error's +// message to the given labels slice if the error is not nil. 
+func AppendErrLabel(err error, labels ...metrics.Label) []metrics.Label { + if err == nil { + return labels + } + + return append(labels, metrics.Label{Name: "error", Value: err.Error()}) +} diff --git a/testutil/integration/app.go b/testutil/integration/app.go index 53d48517d..59cbde78d 100644 --- a/testutil/integration/app.go +++ b/testutil/integration/app.go @@ -374,6 +374,7 @@ func NewCompleteIntegrationApp(t *testing.T) *App { accountKeeper, applicationKeeper, proofKeeper, + sharedKeeper, ) tokenomicsModule := tokenomics.NewAppModule( cdc, diff --git a/testutil/keeper/tokenomics.go b/testutil/keeper/tokenomics.go index 234012f34..c95aff45a 100644 --- a/testutil/keeper/tokenomics.go +++ b/testutil/keeper/tokenomics.go @@ -151,6 +151,10 @@ func TokenomicsKeeperWithActorAddrs(t testing.TB) ( mockProofKeeper := mocks.NewMockProofKeeper(ctrl) mockProofKeeper.EXPECT().GetAllClaims(gomock.Any()).AnyTimes() + // Mock the shared keeper + mockSharedKeeper := mocks.NewMockSharedKeeper(ctrl) + mockSharedKeeper.EXPECT().GetProofWindowCloseHeight(gomock.Any(), gomock.Any()).AnyTimes() + k := tokenomicskeeper.NewKeeper( cdc, runtime.NewKVStoreService(storeKey), @@ -160,6 +164,7 @@ func TokenomicsKeeperWithActorAddrs(t testing.TB) ( mockAccountKeeper, mockApplicationKeeper, mockProofKeeper, + mockSharedKeeper, ) sdkCtx := sdk.NewContext(stateStore, cmtproto.Header{}, false, log.NewNopLogger()) @@ -326,6 +331,7 @@ func NewTokenomicsModuleKeepers( accountKeeper, appKeeper, proofKeeper, + sharedKeeper, ) require.NoError(t, tokenomicsKeeper.SetParams(ctx, tokenomicstypes.DefaultParams())) diff --git a/testutil/proof/fixture_generators.go b/testutil/proof/fixture_generators.go index e224d53d4..a4d7c24c3 100644 --- a/testutil/proof/fixture_generators.go +++ b/testutil/proof/fixture_generators.go @@ -68,11 +68,15 @@ func SmstRootWithSum(sum uint64) smt.MerkleRoot { func RandSmstRootWithSum(t *testing.T, sum uint64) smt.MerkleRoot { t.Helper() - root := make([]byte, 40) + root := 
[smt.SmstRootSizeBytes]byte{} // Only populate the first 32 bytes with random data, leave the last 8 bytes for the sum. - _, err := rand.Read(root[:32]) + _, err := rand.Read(root[:smt.SmtRootSizeBytes]) require.NoError(t, err) - binary.BigEndian.PutUint64(root[32:], sum) - return smt.MerkleRoot(root) + binary.BigEndian.PutUint64(root[smt.SmtRootSizeBytes:], sum) + // Insert the count into the root hash + // TODO_TECHDEBT: This is a hard-coded count of 1, but could be a parameter. + // TODO_TECHDEBT: We are assuming the sum takes up 8 bytes. + binary.BigEndian.PutUint64(root[smt.SmtRootSizeBytes+8:], 1) + return smt.MerkleRoot(root[:]) } diff --git a/x/proof/keeper/msg_server_create_claim.go b/x/proof/keeper/msg_server_create_claim.go index 51aec379f..c93177174 100644 --- a/x/proof/keeper/msg_server_create_claim.go +++ b/x/proof/keeper/msg_server_create_claim.go @@ -2,6 +2,7 @@ package keeper import ( "context" + "errors" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -10,16 +11,29 @@ import ( "github.com/pokt-network/poktroll/x/proof/types" ) -func (k msgServer) CreateClaim(ctx context.Context, msg *types.MsgCreateClaim) (*types.MsgCreateClaimResponse, error) { +func (k msgServer) CreateClaim( + ctx context.Context, + msg *types.MsgCreateClaim, +) (_ *types.MsgCreateClaimResponse, err error) { // TODO_BLOCKER(@bryanchriswhite): Prevent Claim upserts after the ClaimWindow is closed. // TODO_BLOCKER(@bryanchriswhite): Validate the signature on the Claim message corresponds to the supplier before Upserting. - isSuccessful := false - defer telemetry.EventSuccessCounter( - "create_claim", - telemetry.DefaultCounterFn, - func() bool { return isSuccessful }, - ) + // Declare claim to reference in telemetry. + var claim types.Claim + + // Defer telemetry calls so that they reference the final values the relevant variables. + defer func() { + // TODO_IMPROVE: We could track on-chain relays here with claim.GetNumRelays(). 
+ numComputeUnits, deferredErr := claim.GetNumComputeUnits() + err = errors.Join(err, deferredErr) + + telemetry.ClaimCounter(telemetry.ClaimProofStageClaimed, 1, err) + telemetry.ClaimComputeUnitsCounter( + telemetry.ClaimProofStageClaimed, + numComputeUnits, + err, + ) + }() logger := k.Logger().With("method", "CreateClaim") logger.Info("creating claim") @@ -67,8 +81,8 @@ func (k msgServer) CreateClaim(ctx context.Context, msg *types.MsgCreateClaim) ( logger.Info("validated claim") - // Construct and upsert claim after all validation. - claim := types.Claim{ + // Assign and upsert claim after all validation. + claim = types.Claim{ SupplierAddress: msg.GetSupplierAddress(), SessionHeader: sessionHeader, RootHash: msg.GetRootHash(), @@ -81,7 +95,6 @@ func (k msgServer) CreateClaim(ctx context.Context, msg *types.MsgCreateClaim) ( logger.Info("created new claim") - isSuccessful = true // TODO_BETA: return the claim in the response. return &types.MsgCreateClaimResponse{}, nil } diff --git a/x/proof/keeper/msg_server_create_claim_test.go b/x/proof/keeper/msg_server_create_claim_test.go index 2cc398dfc..c1ba1479d 100644 --- a/x/proof/keeper/msg_server_create_claim_test.go +++ b/x/proof/keeper/msg_server_create_claim_test.go @@ -10,6 +10,7 @@ import ( "google.golang.org/grpc/status" keepertest "github.com/pokt-network/poktroll/testutil/keeper" + testproof "github.com/pokt-network/poktroll/testutil/proof" "github.com/pokt-network/poktroll/testutil/sample" testsession "github.com/pokt-network/poktroll/testutil/session" apptypes "github.com/pokt-network/poktroll/x/application/types" @@ -20,7 +21,7 @@ import ( sharedtypes "github.com/pokt-network/poktroll/x/shared/types" ) -var defaultMerkleRoot = []byte{0, 1, 0, 1} +var defaultMerkleRoot = testproof.SmstRootWithSum(10) func TestMsgServer_CreateClaim_Success(t *testing.T) { tests := []struct { diff --git a/x/proof/keeper/msg_server_submit_proof.go b/x/proof/keeper/msg_server_submit_proof.go index cc330c860..8cccc80dd 100644 
--- a/x/proof/keeper/msg_server_submit_proof.go +++ b/x/proof/keeper/msg_server_submit_proof.go @@ -8,6 +8,7 @@ import ( "bytes" "context" "crypto/sha256" + "errors" "fmt" "hash" @@ -49,7 +50,10 @@ func init() { // to correspond to the supplier signing the proof. For example, a single entity // could (theoretically) batch multiple proofs (signed by the corresponding supplier) // into one transaction to save on transaction fees. -func (k msgServer) SubmitProof(ctx context.Context, msg *types.MsgSubmitProof) (*types.MsgSubmitProofResponse, error) { +func (k msgServer) SubmitProof( + ctx context.Context, + msg *types.MsgSubmitProof, +) (_ *types.MsgSubmitProofResponse, err error) { // TODO_MAINNET: A potential issue with doing proof validation inside // `SubmitProof` is that we will not be storing false proofs on-chain (e.g. for slashing purposes). // This could be considered a feature (e.g. less state bloat against sybil attacks) @@ -58,12 +62,22 @@ func (k msgServer) SubmitProof(ctx context.Context, msg *types.MsgSubmitProof) ( logger := k.Logger().With("method", "SubmitProof") logger.Info("About to start submitting proof") - isSuccessful := false - defer telemetry.EventSuccessCounter( - "submit_proof", - telemetry.DefaultCounterFn, - func() bool { return isSuccessful }, - ) + // Declare claim to reference in telemetry. + claim := new(types.Claim) + + // Defer telemetry calls so that they reference the final values the relevant variables. + defer func() { + // TODO_IMPROVE: We could track on-chain relays here with claim.GetNumRelays(). 
+ numComputeUnits, deferredErr := claim.GetNumComputeUnits() + err = errors.Join(err, deferredErr) + + telemetry.ClaimCounter(telemetry.ClaimProofStageProven, 1, err) + telemetry.ClaimComputeUnitsCounter( + telemetry.ClaimProofStageProven, + numComputeUnits, + err, + ) + }() /* TODO_BLOCKER(@bryanchriswhite): Document these steps in proof @@ -228,10 +242,11 @@ func (k msgServer) SubmitProof(ctx context.Context, msg *types.MsgSubmitProof) ( // Retrieve the corresponding claim for the proof submitted so it can be // used in the proof validation below. - claim, err := k.queryAndValidateClaimForProof(ctx, msg) + claim, err = k.queryAndValidateClaimForProof(ctx, msg) if err != nil { return nil, status.Error(codes.FailedPrecondition, err.Error()) } + logger.Debug("successfully retrieved and validated claim") // Verify the proof's closest merkle proof. @@ -254,7 +269,6 @@ func (k msgServer) SubmitProof(ctx context.Context, msg *types.MsgSubmitProof) ( k.UpsertProof(ctx, proof) logger.Info("successfully upserted the proof") - isSuccessful = true return &types.MsgSubmitProofResponse{}, nil } diff --git a/x/proof/types/claim.go b/x/proof/types/claim.go new file mode 100644 index 000000000..f32c70ad6 --- /dev/null +++ b/x/proof/types/claim.go @@ -0,0 +1,42 @@ +package types + +import ( + "fmt" + + "github.com/pokt-network/smt" +) + +// GetNumComputeUnits returns the number of compute units for a given claim +// as determined by the sum of the root hash. +func (claim *Claim) GetNumComputeUnits() (numComputeUnits uint64, err error) { + // NB: smt.MerkleRoot#Sum() will panic if the root hash is not valid. + // Convert this panic into an error return. 
+ defer func() { + if r := recover(); r != nil { + numComputeUnits = 0 + err = fmt.Errorf( + "unable to get sum of invalid merkle root: %x; error: %v", + claim.GetRootHash(), r, + ) + } + }() + + return smt.MerkleRoot(claim.GetRootHash()).Sum(), nil +} + +// GetNumRelays returns the number of relays for a given claim +// as determined by the count of the root hash. +func (claim *Claim) GetNumRelays() (numRelays uint64, err error) { + // Convert this panic into an error return. + defer func() { + if r := recover(); r != nil { + numRelays = 0 + err = fmt.Errorf( + "unable to get count of invalid merkle root: %x; error: %v", + claim.GetRootHash(), r, + ) + } + }() + + return smt.MerkleRoot(claim.GetRootHash()).Count(), nil +} diff --git a/x/shared/keeper/session.go b/x/shared/keeper/session.go index c0d414c5f..3e5619bc8 100644 --- a/x/shared/keeper/session.go +++ b/x/shared/keeper/session.go @@ -33,3 +33,10 @@ func (k Keeper) GetSessionNumber(ctx context.Context, queryHeight int64) int64 { sharedParams := k.GetParams(ctx) return shared.GetSessionNumber(&sharedParams, queryHeight) } + +// GetProofWindowCloseHeight returns the block height at which the proof window of +// the session that includes queryHeight closes, given the passed sharedParams. 
+func (k Keeper) GetProofWindowCloseHeight(ctx context.Context, queryHeight int64) int64 { + sharedParams := k.GetParams(ctx) + return shared.GetProofWindowCloseHeight(&sharedParams, queryHeight) +} diff --git a/x/tokenomics/keeper/keeper.go b/x/tokenomics/keeper/keeper.go index f1e254164..7e6c13b87 100644 --- a/x/tokenomics/keeper/keeper.go +++ b/x/tokenomics/keeper/keeper.go @@ -25,6 +25,7 @@ type Keeper struct { accountKeeper types.AccountKeeper applicationKeeper types.ApplicationKeeper proofKeeper types.ProofKeeper + sharedKeeper types.SharedKeeper } func NewKeeper( @@ -37,6 +38,7 @@ func NewKeeper( accountKeeper types.AccountKeeper, applicationKeeper types.ApplicationKeeper, proofKeeper types.ProofKeeper, + sharedKeeper types.SharedKeeper, ) Keeper { if _, err := sdk.AccAddressFromBech32(authority); err != nil { panic(fmt.Sprintf("invalid authority address: %s", authority)) @@ -52,6 +54,7 @@ func NewKeeper( accountKeeper: accountKeeper, applicationKeeper: applicationKeeper, proofKeeper: proofKeeper, + sharedKeeper: sharedKeeper, } } diff --git a/x/tokenomics/keeper/keeper_settle_pending_claims_test.go b/x/tokenomics/keeper/keeper_settle_pending_claims_test.go index 8950607bb..1b2d2cc69 100644 --- a/x/tokenomics/keeper/keeper_settle_pending_claims_test.go +++ b/x/tokenomics/keeper/keeper_settle_pending_claims_test.go @@ -120,7 +120,7 @@ func (s *TestSuite) TestSettlePendingClaims_ClaimPendingBeforeSettlement() { // Expectations: No claims should be settled because the session is still ongoing blockHeight := claim.SessionHeader.SessionEndBlockHeight - 2 // session is still active sdkCtx = sdkCtx.WithBlockHeight(blockHeight) - numClaimsSettled, numClaimsExpired, _, err := s.keepers.SettlePendingClaims(sdkCtx) + numClaimsSettled, numClaimsExpired, _, _, err := s.keepers.SettlePendingClaims(sdkCtx) require.NoError(t, err) // Check that no claims were settled. @@ -145,7 +145,7 @@ func (s *TestSuite) TestSettlePendingClaims_ClaimPendingBeforeSettlement() { // 2. 
Settle pending claims just after the session ended. // Expectations: Claims should not be settled because the proof window hasn't closed yet. sdkCtx = sdkCtx.WithBlockHeight(blockHeight) - numClaimsSettled, numClaimsExpired, _, err = s.keepers.SettlePendingClaims(sdkCtx) + numClaimsSettled, numClaimsExpired, _, _, err = s.keepers.SettlePendingClaims(sdkCtx) // Check that no claims were settled require.NoError(t, err) require.Equal(t, uint64(0), numClaimsSettled) @@ -175,7 +175,7 @@ func (s *TestSuite) TestSettlePendingClaims_ClaimExpired_ProofRequiredAndNotProv // NB: proofs should be rejected when the current height equals the proof window close height. blockHeight := shared.GetProofWindowCloseHeight(&sharedParams, claim.SessionHeader.SessionEndBlockHeight) sdkCtx = sdkCtx.WithBlockHeight(blockHeight) - numClaimsSettled, numClaimsExpired, _, err := s.keepers.SettlePendingClaims(sdkCtx) + numClaimsSettled, numClaimsExpired, _, _, err := s.keepers.SettlePendingClaims(sdkCtx) require.NoError(t, err) // Check that no claims were settled. @@ -220,7 +220,7 @@ func (s *TestSuite) TestSettlePendingClaims_ClaimSettled_ProofRequiredAndProvide // NB: proofs should be rejected when the current height equals the proof window close height. blockHeight := shared.GetProofWindowCloseHeight(&sharedParams, claim.SessionHeader.SessionEndBlockHeight) sdkCtx = sdkCtx.WithBlockHeight(blockHeight) - numClaimsSettled, numClaimsExpired, _, err := s.keepers.SettlePendingClaims(sdkCtx) + numClaimsSettled, numClaimsExpired, _, _, err := s.keepers.SettlePendingClaims(sdkCtx) require.NoError(t, err) // Check that one claim was settled. 
@@ -275,7 +275,7 @@ func (s *TestSuite) TestClaimSettlement_ClaimSettled_ProofRequiredAndProvided_Vi // NB: proof window has definitely closed at this point blockHeight := shared.GetProofWindowCloseHeight(&sharedParams, claim.SessionHeader.SessionEndBlockHeight) sdkCtx = sdkCtx.WithBlockHeight(blockHeight) - numClaimsSettled, numClaimsExpired, _, err := s.keepers.SettlePendingClaims(sdkCtx) + numClaimsSettled, numClaimsExpired, _, _, err := s.keepers.SettlePendingClaims(sdkCtx) require.NoError(t, err) // Check that one claim was settled. @@ -326,7 +326,7 @@ func (s *TestSuite) TestSettlePendingClaims_Settles_WhenAProofIsNotRequired() { // NB: proofs should be rejected when the current height equals the proof window close height. blockHeight := shared.GetProofWindowCloseHeight(&sharedParams, claim.SessionHeader.SessionEndBlockHeight) sdkCtx = sdkCtx.WithBlockHeight(blockHeight) - numClaimsSettled, numClaimsExpired, _, err := s.keepers.SettlePendingClaims(sdkCtx) + numClaimsSettled, numClaimsExpired, _, _, err := s.keepers.SettlePendingClaims(sdkCtx) require.NoError(t, err) // Check that one claim was settled. @@ -386,3 +386,81 @@ func (s *TestSuite) getClaimEvent(events cosmostypes.Events, protoType string) p require.NotEqual(s.T(), 1, numExpectedEvents, "Expected exactly one claim event") return nil } + +func (s *TestSuite) TestSettlePendingClaims_ClaimPendingAfterSettlement() { + // Retrieve default values + t := s.T() + ctx := s.ctx + sdkCtx := cosmostypes.UnwrapSDKContext(ctx) + sharedParams := s.keepers.SharedKeeper.GetParams(ctx) + + // Set the proof parameters such that s.claim DOES NOT require a proof + // because the proof_request_probability is 0% and the proof_request_threshold + // is greater than the claims' compute units. 
+ err := s.keepers.ProofKeeper.SetParams(ctx, prooftypes.Params{ + ProofRequestProbability: 0, + // +1 to push the threshold above s.claim's compute units + ProofRequirementThreshold: s.expectedComputeUnits + 1, + }) + require.NoError(t, err) + + // 0. Add the claims & verify they exist + sessionOneClaim := s.claim + s.keepers.UpsertClaim(ctx, sessionOneClaim) + + sessionOneStartHeight := sessionOneClaim.GetSessionHeader().GetSessionEndBlockHeight() + // Add a second claim with a session header corresponding to the next session. + sessionTwoClaim := testutilproof.BaseClaim( + sessionOneClaim.GetSessionHeader().GetApplicationAddress(), + sessionOneClaim.GetSupplierAddress(), + s.expectedComputeUnits, + ) + + sessionOneProofWindowCloseHeight := shared.GetProofWindowCloseHeight(&sharedParams, sessionOneStartHeight) + sessionTwoStartHeight := shared.GetSessionStartHeight(&sharedParams, sessionOneProofWindowCloseHeight+1) + sessionTwoProofWindowCloseHeight := shared.GetProofWindowCloseHeight(&sharedParams, sessionTwoStartHeight) + + sessionTwoClaim.SessionHeader = &sessiontypes.SessionHeader{ + ApplicationAddress: sessionOneClaim.GetSessionHeader().GetApplicationAddress(), + Service: s.claim.GetSessionHeader().GetService(), + SessionId: "session_two_id", + SessionStartBlockHeight: sessionTwoStartHeight, + SessionEndBlockHeight: shared.GetSessionEndHeight(&sharedParams, sessionTwoStartHeight), + } + s.keepers.UpsertClaim(ctx, sessionTwoClaim) + + claims := s.keepers.GetAllClaims(ctx) + s.Require().Equalf(2, len(claims), "expected %d claims, got %d", 2, len(claims)) + + // 1. Settle pending claims while the session is still active. 
+ // Expectations: No claims should be settled because the session is still ongoing + blockHeight := shared.GetProofWindowCloseHeight(&sharedParams, sessionOneStartHeight) + sdkCtx = sdkCtx.WithBlockHeight(blockHeight) + numClaimsSettled, numClaimsExpired, _, _, err := s.keepers.SettlePendingClaims(sdkCtx) + require.NoError(t, err) + + // Check that one claim was settled. + require.Equal(t, uint64(1), numClaimsSettled) + + // Validate that no claims expired. + require.Equal(t, uint64(0), numClaimsExpired) + + // Validate that one claim still remains. + claims = s.keepers.GetAllClaims(ctx) + require.Len(t, claims, 1) + + // Calculate a block height which is within session two's proof window. + blockHeight = (sessionTwoProofWindowCloseHeight - sessionTwoStartHeight) / 2 + + // 2. Settle pending claims just after the session ended. + // Expectations: Claims should not be settled because the proof window hasn't closed yet. + sdkCtx = sdkCtx.WithBlockHeight(blockHeight) + numClaimsSettled, numClaimsExpired, _, _, err = s.keepers.SettlePendingClaims(sdkCtx) + // Check that no claims were settled + require.NoError(t, err) + require.Equal(t, uint64(0), numClaimsSettled) + require.Equal(t, uint64(0), numClaimsExpired) + // Validate that the claim still exists + claims = s.keepers.GetAllClaims(ctx) + require.Len(t, claims, 1) +} diff --git a/x/tokenomics/keeper/proof_requirement_test.go b/x/tokenomics/keeper/proof_requirement_test.go index 28f5a32b7..7d4ef3d1f 100644 --- a/x/tokenomics/keeper/proof_requirement_test.go +++ b/x/tokenomics/keeper/proof_requirement_test.go @@ -2,6 +2,8 @@ package keeper_test import ( "math/rand" + "sync" + "sync/atomic" "testing" "cosmossdk.io/log" @@ -35,28 +37,33 @@ func TestKeeper_IsProofRequired(t *testing.T) { probability = prooftypes.DefaultProofRequestProbability tolerance = 0.01 - numTrueSamples, numFalseSamples int + numTrueSamples atomic.Int64 ) + // Sample concurrently to save time. 
+ wg := sync.WaitGroup{} for i := 0; i < sampleSize; i++ { - claim := tetsproof.ClaimWithRandomHash(t, sample.AccAddress(), sample.AccAddress(), expectedComputeUnits) + wg.Add(1) + go func() { + claim := tetsproof.ClaimWithRandomHash(t, sample.AccAddress(), sample.AccAddress(), expectedComputeUnits) - isRequired, err := keepers.Keeper.IsProofRequiredForClaim(sdkCtx, &claim) - require.NoError(t, err) + isRequired, err := keepers.Keeper.IsProofRequiredForClaim(sdkCtx, &claim) + require.NoError(t, err) - switch isRequired { - case true: - numTrueSamples++ - case false: - numFalseSamples++ - } + if isRequired { + numTrueSamples.Add(1) + } + wg.Done() + }() } + wg.Wait() expectedNumTrueSamples := float32(sampleSize) * probability expectedNumFalseSamples := float32(sampleSize) * (1 - probability) toleranceSamples := tolerance * float64(sampleSize) // Check that the number of samples for each outcome is within the expected range. - require.InDeltaf(t, expectedNumTrueSamples, numTrueSamples, toleranceSamples, "true samples") + numFalseSamples := int64(sampleSize) - numTrueSamples.Load() + require.InDeltaf(t, expectedNumTrueSamples, numTrueSamples.Load(), toleranceSamples, "true samples") require.InDeltaf(t, expectedNumFalseSamples, numFalseSamples, toleranceSamples, "false samples") } diff --git a/x/tokenomics/keeper/random_test.go b/x/tokenomics/keeper/random_test.go index 3128afcf3..a23c31158 100644 --- a/x/tokenomics/keeper/random_test.go +++ b/x/tokenomics/keeper/random_test.go @@ -2,6 +2,8 @@ package keeper import ( "math" + "sync" + "sync/atomic" "testing" "github.com/stretchr/testify/require" @@ -16,30 +18,35 @@ func TestRandProbability(t *testing.T) { sampleSize := requiredSampleSize(float64(probability), tolerance, confidence) - var numTrueSamples, numFalseSamples int + var numTrueSamples atomic.Int64 + // Sample concurrently to save time. 
+ wg := sync.WaitGroup{} for i := 0; i < sampleSize; i++ { - rand, err := randProbability(int64(i)) - require.NoError(t, err) + wg.Add(1) + go func() { + rand, err := randProbability(int64(i)) + require.NoError(t, err) - if rand < 0 || rand > 1 { - t.Fatalf("secureRandFloat64() returned out of bounds value: %f", rand) - } + if rand < 0 || rand > 1 { + t.Fatalf("secureRandFloat64() returned out of bounds value: %f", rand) + } - switch rand <= probability { - case true: - numTrueSamples++ - case false: - numFalseSamples++ - } + if rand <= probability { + numTrueSamples.Add(1) + } + wg.Done() + }() } + wg.Wait() expectedNumTrueSamples := float32(sampleSize) * probability expectedNumFalseSamples := float32(sampleSize) * (1 - probability) toleranceSamples := tolerance * float64(sampleSize) // Check that the number of samples for each outcome is within the expected range. - require.InDeltaf(t, expectedNumTrueSamples, numTrueSamples, toleranceSamples, "true samples") + numFalseSamples := int64(sampleSize) - numTrueSamples.Load() + require.InDeltaf(t, expectedNumTrueSamples, numTrueSamples.Load(), toleranceSamples, "true samples") require.InDeltaf(t, expectedNumFalseSamples, numFalseSamples, toleranceSamples, "false samples") } diff --git a/x/tokenomics/keeper/settle_pending_claims.go b/x/tokenomics/keeper/settle_pending_claims.go index ecc1f20c8..f2cac6137 100644 --- a/x/tokenomics/keeper/settle_pending_claims.go +++ b/x/tokenomics/keeper/settle_pending_claims.go @@ -7,11 +7,9 @@ import ( "math/rand" sdk "github.com/cosmos/cosmos-sdk/types" - "github.com/pokt-network/smt" "github.com/pokt-network/poktroll/telemetry" prooftypes "github.com/pokt-network/poktroll/x/proof/types" - "github.com/pokt-network/poktroll/x/shared" "github.com/pokt-network/poktroll/x/tokenomics/types" ) @@ -21,26 +19,16 @@ import ( // If a claim is expired and does NOT require a proof -> it's settled. // Events are emitted for each claim that is settled or removed. 
// On-chain Claims & Proofs are deleted after they're settled or expired to free up space. +// +// TODO_TECHDEBT: Refactor this function to return a struct instead of multiple return values. func (k Keeper) SettlePendingClaims(ctx sdk.Context) ( numClaimsSettled, numClaimsExpired uint64, relaysPerServiceMap map[string]uint64, + computeUnitsPerServiceMap map[string]uint64, err error, ) { logger := k.Logger().With("method", "SettlePendingClaims") - isSuccessful := false - defer telemetry.EventSuccessCounter( - "claims_settled", - func() float32 { return float32(numClaimsSettled) }, - func() bool { return isSuccessful }, - ) - - defer telemetry.EventSuccessCounter( - "claims_expired", - func() float32 { return float32(numClaimsExpired) }, - func() bool { return isSuccessful }, - ) - // TODO_BLOCKER(@Olshansk): Optimize this by indexing expiringClaims appropriately // and only retrieving the expiringClaims that need to be settled rather than all // of them and iterating through them one by one. @@ -51,17 +39,23 @@ func (k Keeper) SettlePendingClaims(ctx sdk.Context) ( logger.Info(fmt.Sprintf("found %d expiring claims at block height %d", len(expiringClaims), blockHeight)) relaysPerServiceMap = make(map[string]uint64) + computeUnitsPerServiceMap = make(map[string]uint64) for _, claim := range expiringClaims { - // Retrieve the number of compute units in the claim for the events emitted - root := (smt.MerkleRoot)(claim.GetRootHash()) // NB: Note that not every (Req, Res) pair in the session is inserted in // the tree for scalability reasons. This is the count of non-empty leaves // that matched the necessary difficulty and is therefore an estimation // of the total number of relays serviced and work done. 
- claimComputeUnits := root.Sum() - numRelaysInSessionTree := root.Count() + numClaimComputeUnits, err := claim.GetNumComputeUnits() + if err != nil { + return 0, 0, relaysPerServiceMap, computeUnitsPerServiceMap, err + } + + numRelaysInSessionTree, err := claim.GetNumRelays() + if err != nil { + return 0, 0, relaysPerServiceMap, computeUnitsPerServiceMap, err + } sessionId := claim.SessionHeader.SessionId @@ -70,7 +64,7 @@ func (k Keeper) SettlePendingClaims(ctx sdk.Context) ( // claim required an on-chain proof isProofRequiredForClaim, err := k.isProofRequiredForClaim(ctx, &claim) if err != nil { - return 0, 0, nil, err + return 0, 0, relaysPerServiceMap, computeUnitsPerServiceMap, err } if isProofRequiredForClaim { // If a proof is not found, the claim will expire and never be settled. @@ -78,10 +72,10 @@ func (k Keeper) SettlePendingClaims(ctx sdk.Context) ( // Emit an event that a claim has expired and being removed without being settled. claimExpiredEvent := types.EventClaimExpired{ Claim: &claim, - ComputeUnits: claimComputeUnits, + ComputeUnits: numClaimComputeUnits, } if err := ctx.EventManager().EmitTypedEvent(&claimExpiredEvent); err != nil { - return 0, 0, relaysPerServiceMap, err + return 0, 0, relaysPerServiceMap, computeUnitsPerServiceMap, err } // The claim & proof are no longer necessary, so there's no need for them // to take up on-chain space. @@ -97,16 +91,16 @@ func (k Keeper) SettlePendingClaims(ctx sdk.Context) ( // Manage the mint & burn accounting for the claim. 
if err := k.SettleSessionAccounting(ctx, &claim); err != nil { logger.Error(fmt.Sprintf("error settling session accounting for claim %q: %v", claim.SessionHeader.SessionId, err)) - return 0, 0, relaysPerServiceMap, err + return 0, 0, relaysPerServiceMap, computeUnitsPerServiceMap, err } claimSettledEvent := types.EventClaimSettled{ Claim: &claim, - ComputeUnits: claimComputeUnits, + ComputeUnits: numClaimComputeUnits, ProofRequired: isProofRequiredForClaim, } if err := ctx.EventManager().EmitTypedEvent(&claimSettledEvent); err != nil { - return 0, 0, relaysPerServiceMap, err + return 0, 0, relaysPerServiceMap, computeUnitsPerServiceMap, err } // The claim & proof are no longer necessary, so there's no need for them @@ -121,6 +115,7 @@ func (k Keeper) SettlePendingClaims(ctx sdk.Context) ( } relaysPerServiceMap[claim.SessionHeader.Service.Id] += numRelaysInSessionTree + computeUnitsPerServiceMap[claim.SessionHeader.Service.Id] += numClaimComputeUnits numClaimsSettled++ logger.Info(fmt.Sprintf("Successfully settled claim for session ID %q at block height %d", claim.SessionHeader.SessionId, blockHeight)) @@ -128,8 +123,7 @@ func (k Keeper) SettlePendingClaims(ctx sdk.Context) ( logger.Info(fmt.Sprintf("settled %d and expired %d claims at block height %d", numClaimsSettled, numClaimsExpired, blockHeight)) - isSuccessful = true - return numClaimsSettled, numClaimsExpired, relaysPerServiceMap, nil + return numClaimsSettled, numClaimsExpired, relaysPerServiceMap, computeUnitsPerServiceMap, nil } // getExpiringClaims returns all claims that are expiring at the current block height. @@ -139,11 +133,6 @@ func (k Keeper) SettlePendingClaims(ctx sdk.Context) ( func (k Keeper) getExpiringClaims(ctx sdk.Context) (expiringClaims []prooftypes.Claim) { blockHeight := ctx.BlockHeight() - // TODO_BLOCKER(@bryanchriswhite): query the on-chain governance parameter once available. 
- // `* 3` is just a random factor Olshansky added for now to make sure expiration - doesn't happen immediately after a session's grace period is complete. - submitProofWindowEndHeight := shared.SessionGracePeriodBlocks * int64(3) - // TODO_TECHDEBT: Optimize this by indexing claims appropriately // and only retrieving the claims that need to be settled rather than all // of them and iterating through them one by one. @@ -151,7 +140,8 @@ func (k Keeper) getExpiringClaims(ctx sdk.Context) (expiringClaims []prooftypes. // Loop over all claims we need to check for expiration for _, claim := range claims { - expirationHeight := claim.SessionHeader.SessionEndBlockHeight + submitProofWindowEndHeight + claimSessionStartHeight := claim.GetSessionHeader().GetSessionStartBlockHeight() + expirationHeight := k.sharedKeeper.GetProofWindowCloseHeight(ctx, claimSessionStartHeight) if blockHeight >= expirationHeight { expiringClaims = append(expiringClaims, claim) } @@ -165,13 +155,23 @@ func (k Keeper) getExpiringClaims(ctx sdk.Context) (expiringClaims []prooftypes. // If it is not, the claim will be settled without a proof. // If it is, the claim will only be settled if a valid proof is available. // TODO_BLOCKER(@bryanchriswhite, #419): Document safety assumptions of the probabilistic proofs mechanism. -func (k Keeper) isProofRequiredForClaim(ctx sdk.Context, claim *prooftypes.Claim) (bool, error) { +func (k Keeper) isProofRequiredForClaim(ctx sdk.Context, claim *prooftypes.Claim) (_ bool, err error) { logger := k.logger.With("method", "isProofRequiredForClaim") + var requirementReason = telemetry.ProofNotRequired + + // Defer telemetry calls so that they reference the final values of the relevant variables. + defer func() { + telemetry.ProofRequirementCounter(requirementReason, err) + }() + // NB: Assumption that claim is non-nil and has a valid root sum because it // is retrieved from the store and validated, on-chain, at time of creation. 
- root := (smt.MerkleRoot)(claim.GetRootHash()) - claimComputeUnits := root.Sum() + claimComputeUnits, err := claim.GetNumComputeUnits() + if err != nil { + return true, err + } + proofParams := k.proofKeeper.GetParams(ctx) // Require a proof if the claim's compute units meets or exceeds the threshold. @@ -179,7 +179,13 @@ func (k Keeper) isProofRequiredForClaim(ctx sdk.Context, claim *prooftypes.Claim // TODO_BLOCKER(@bryanchriswhite, #419): This is just VERY BASIC placeholder logic to have something // in place while we implement proper probabilistic proofs. If you're reading it, // do not overthink it and look at the documents linked in #419. + // + // TODO_IMPROVE(@bryanchriswhite, @red-0ne): It might make sense to include + // whether there was a proof submission error downstream from here. This would + // require a more comprehensive metrics API. if claimComputeUnits >= proofParams.GetProofRequirementThreshold() { + requirementReason = telemetry.ProofRequirementReasonThreshold + logger.Info(fmt.Sprintf( "claim requires proof due to compute units (%d) exceeding threshold (%d)", claimComputeUnits, @@ -203,6 +209,8 @@ func (k Keeper) isProofRequiredForClaim(ctx sdk.Context, claim *prooftypes.Claim // NB: A random value between 0 and 1 will be less than or equal to proof_request_probability // with probability equal to the proof_request_probability. 
if randFloat <= proofParams.GetProofRequestProbability() { + requirementReason = telemetry.ProofRequirementReasonProbabilistic + logger.Info(fmt.Sprintf( "claim requires proof due to random sample (%.2f) being less than or equal to probability (%.2f)", randFloat, diff --git a/x/tokenomics/keeper/settle_session_accounting_test.go b/x/tokenomics/keeper/settle_session_accounting_test.go index e5ab348b2..6319eefd7 100644 --- a/x/tokenomics/keeper/settle_session_accounting_test.go +++ b/x/tokenomics/keeper/settle_session_accounting_test.go @@ -11,7 +11,7 @@ import ( "github.com/stretchr/testify/require" testkeeper "github.com/pokt-network/poktroll/testutil/keeper" - testutilproof "github.com/pokt-network/poktroll/testutil/proof" + testproof "github.com/pokt-network/poktroll/testutil/proof" "github.com/pokt-network/poktroll/testutil/sample" testsession "github.com/pokt-network/poktroll/testutil/session" apptypes "github.com/pokt-network/poktroll/x/application/types" @@ -52,7 +52,7 @@ func TestSettleSessionAccounting_HandleAppGoingIntoDebt(t *testing.T) { SessionStartBlockHeight: 1, SessionEndBlockHeight: testsession.GetSessionEndHeightWithDefaultParams(1), }, - RootHash: testutilproof.SmstRootWithSum(appStake.Amount.Uint64() + 1), // More than the app stake + RootHash: testproof.SmstRootWithSum(appStake.Amount.Uint64() + 1), // More than the app stake } err := keepers.SettleSessionAccounting(ctx, &claim) @@ -93,7 +93,7 @@ func TestSettleSessionAccounting_AppNotFound(t *testing.T) { SessionStartBlockHeight: 1, SessionEndBlockHeight: testsession.GetSessionEndHeightWithDefaultParams(1), }, - RootHash: testutilproof.SmstRootWithSum(42), + RootHash: testproof.SmstRootWithSum(42), } err := keeper.SettleSessionAccounting(ctx, &claim) @@ -144,7 +144,7 @@ func TestSettleSessionAccounting_InvalidRoot(t *testing.T) { { desc: "correct size and a valid value", root: func() []byte { - root := testutilproof.SmstRootWithSum(42) + root := testproof.SmstRootWithSum(42) return root[:] }(), 
errExpected: false, @@ -162,7 +162,7 @@ func TestSettleSessionAccounting_InvalidRoot(t *testing.T) { }() // Setup claim by copying the testproof.BaseClaim and updating the root - claim := testutilproof.BaseClaim(appAddr, supplierAddr, 0) + claim := testproof.BaseClaim(appAddr, supplierAddr, 0) claim.RootHash = smt.MerkleRoot(test.root[:]) // Execute test function @@ -199,7 +199,7 @@ func TestSettleSessionAccounting_InvalidClaim(t *testing.T) { { desc: "Valid Claim", claim: func() *prooftypes.Claim { - claim := testutilproof.BaseClaim(appAddr, supplierAddr, 42) + claim := testproof.BaseClaim(appAddr, supplierAddr, 42) return &claim }(), errExpected: false, @@ -213,7 +213,7 @@ func TestSettleSessionAccounting_InvalidClaim(t *testing.T) { { desc: "Claim with nil session header", claim: func() *prooftypes.Claim { - claim := testutilproof.BaseClaim(appAddr, supplierAddr, 42) + claim := testproof.BaseClaim(appAddr, supplierAddr, 42) claim.SessionHeader = nil return &claim }(), @@ -223,7 +223,7 @@ func TestSettleSessionAccounting_InvalidClaim(t *testing.T) { { desc: "Claim with invalid session id", claim: func() *prooftypes.Claim { - claim := testutilproof.BaseClaim(appAddr, supplierAddr, 42) + claim := testproof.BaseClaim(appAddr, supplierAddr, 42) claim.SessionHeader.SessionId = "" return &claim }(), @@ -233,7 +233,7 @@ func TestSettleSessionAccounting_InvalidClaim(t *testing.T) { { desc: "Claim with invalid application address", claim: func() *prooftypes.Claim { - claim := testutilproof.BaseClaim(appAddr, supplierAddr, 42) + claim := testproof.BaseClaim(appAddr, supplierAddr, 42) claim.SessionHeader.ApplicationAddress = "invalid address" return &claim }(), @@ -243,7 +243,7 @@ func TestSettleSessionAccounting_InvalidClaim(t *testing.T) { { desc: "Claim with invalid supplier address", claim: func() *prooftypes.Claim { - claim := testutilproof.BaseClaim(appAddr, supplierAddr, 42) + claim := testproof.BaseClaim(appAddr, supplierAddr, 42) claim.SupplierAddress = "invalid 
address" return &claim }(), diff --git a/x/tokenomics/module/abci.go b/x/tokenomics/module/abci.go index 7654633a1..3994da681 100644 --- a/x/tokenomics/module/abci.go +++ b/x/tokenomics/module/abci.go @@ -5,11 +5,12 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/pokt-network/poktroll/telemetry" "github.com/pokt-network/poktroll/x/tokenomics/keeper" ) // EndBlocker called at every block and settles all pending claims. -func EndBlocker(ctx sdk.Context, k keeper.Keeper) error { +func EndBlocker(ctx sdk.Context, k keeper.Keeper) (err error) { logger := k.Logger().With("method", "EndBlocker") // NB: There are two main reasons why we settle expiring claims in the end // instead of when a proof is submitted: @@ -17,11 +18,43 @@ func EndBlocker(ctx sdk.Context, k keeper.Keeper) error { // even without a proof to be able to scale to unbounded Claims & Proofs. // 2. Implementation - This cannot be done from the `x/proof` module because // it would create a circular dependency. - numClaimsSettled, numClaimsExpired, relaysPerServiceMap, err := k.SettlePendingClaims(ctx) + numClaimsSettled, + numClaimsExpired, + relaysPerServiceMap, + computeUnitsPerServiceMap, + err := k.SettlePendingClaims(ctx) if err != nil { logger.Error(fmt.Sprintf("could not settle pending claims due to error %v", err)) return err } + + // Accumulate compute units for metrics. + // TODO_IMPROVE(@bryanchriswhite, @red-0ne): It would be preferable to have telemetry + // counter functions return an "event" or "event set", similar to how polylog/zerolog work. + var numComputeUnits uint64 + for _, serviceComputeUnits := range computeUnitsPerServiceMap { + numComputeUnits += serviceComputeUnits + } + + // Defer telemetry calls so that they reference the final values of the relevant variables. 
+ defer func() { + telemetry.ClaimComputeUnitsCounter( + telemetry.ClaimProofStageSettled, + numComputeUnits, + err, + ) + telemetry.ClaimCounter( + telemetry.ClaimProofStageSettled, + numClaimsSettled, + err, + ) + telemetry.ClaimCounter( + telemetry.ClaimProofStageExpired, + numClaimsExpired, + err, + ) + }() + logger.Info(fmt.Sprintf("settled %d claims and expired %d claims", numClaimsSettled, numClaimsExpired)) // Update the relay mining difficulty for every service that settled pending diff --git a/x/tokenomics/module/module.go b/x/tokenomics/module/module.go index 95b7bb152..f4c2b388e 100644 --- a/x/tokenomics/module/module.go +++ b/x/tokenomics/module/module.go @@ -181,6 +181,7 @@ type ModuleInputs struct { BankKeeper types.BankKeeper ApplicationKeeper types.ApplicationKeeper ProofKeeper types.ProofKeeper + SharedKeeper types.SharedKeeper } type ModuleOutputs struct { @@ -205,6 +206,7 @@ func ProvideModule(in ModuleInputs) ModuleOutputs { in.AccountKeeper, in.ApplicationKeeper, in.ProofKeeper, + in.SharedKeeper, ) m := NewAppModule( in.Cdc, diff --git a/x/tokenomics/types/expected_keepers.go b/x/tokenomics/types/expected_keepers.go index bbee57ecf..fb82fa7d5 100644 --- a/x/tokenomics/types/expected_keepers.go +++ b/x/tokenomics/types/expected_keepers.go @@ -1,4 +1,4 @@ -//go:generate mockgen -destination ../../../testutil/tokenomics/mocks/expected_keepers_mock.go -package mocks . AccountKeeper,BankKeeper,ApplicationKeeper,ProofKeeper +//go:generate mockgen -destination ../../../testutil/tokenomics/mocks/expected_keepers_mock.go -package mocks . AccountKeeper,BankKeeper,ApplicationKeeper,ProofKeeper,SharedKeeper package types @@ -50,4 +50,6 @@ type ProofKeeper interface { type SharedKeeper interface { GetParams(ctx context.Context) sharedtypes.Params SetParams(ctx context.Context, params sharedtypes.Params) error + + GetProofWindowCloseHeight(ctx context.Context, queryHeight int64) int64 }