diff --git a/x/proof/keeper/msg_server_submit_proof.go b/x/proof/keeper/msg_server_submit_proof.go
index 5952eb94b..768527096 100644
--- a/x/proof/keeper/msg_server_submit_proof.go
+++ b/x/proof/keeper/msg_server_submit_proof.go
@@ -23,13 +23,12 @@ import (
 // A proof that's stored onchain is what leads to rewards (i.e. inflation)
 // downstream, making this a critical part of the protocol.
 //
-// Note that the validation of the proof is done in `EnsureValidProof`. However,
-// preliminary checks are done in the handler to prevent sybil or DoS attacks on
-// full nodes because storing and validating proofs is expensive.
+// Note that the validation of the proof is done in `EnsureValidProofSignaturesAndClosestPath`.
+// However, preliminary checks are done in the handler to prevent sybil or DoS attacks on
+// full nodes via the submission of malformed proofs.
 //
 // We are striking a balance between security and efficiency here, where enough validation
-// is done on proof submission, and exhaustive validation is done during session
-// settlement.
+// is done on proof submission, and exhaustive validation is done during the EndBlocker.
 //
 // The entity sending the SubmitProof messages does not necessarily need
 // to correspond to the supplier signing the proof. For example, a single entity
@@ -45,10 +44,10 @@ func (k msgServer) SubmitProof(
 		isExistingProof      bool
 		numRelays            uint64
 		numClaimComputeUnits uint64
+		sessionHeader        *sessiontypes.SessionHeader
 	)
 
 	logger := k.Logger().With("method", "SubmitProof")
-	sdkCtx := cosmostypes.UnwrapSDKContext(ctx)
 	logger.Info("About to start submitting proof")
 
 	// Basic validation of the SubmitProof message.
@@ -57,25 +56,30 @@ func (k msgServer) SubmitProof(
 	}
 	logger.Info("validated the submitProof message")
 
-	// Compare msg session header w/ onchain session header.
-	session, err := k.queryAndValidateSessionHeader(ctx, msg.GetSessionHeader(), msg.GetSupplierOperatorAddress())
-	if err != nil {
-		return nil, status.Error(codes.InvalidArgument, err.Error())
-	}
+	sessionHeader = msg.GetSessionHeader()
 
 	// Defer telemetry calls so that they reference the final values of the relevant variables.
-	defer k.finalizeSubmitProofTelemetry(session, msg, isExistingProof, numRelays, numClaimComputeUnits, err)
+	defer k.finalizeSubmitProofTelemetry(sessionHeader, msg, isExistingProof, numRelays, numClaimComputeUnits, err)
 
-	if err = k.deductProofSubmissionFee(ctx, msg.GetSupplierOperatorAddress()); err != nil {
-		logger.Error(fmt.Sprintf("failed to deduct proof submission fee: %v", err))
+	// Construct the proof from the message.
+	proof := &types.Proof{
+		SupplierOperatorAddress: msg.GetSupplierOperatorAddress(),
+		SessionHeader:           msg.GetSessionHeader(),
+		ClosestMerkleProof:      msg.GetProof(),
+	}
+
+	// Ensure the proof is well-formed by checking the proof, its corresponding
+	// claim and relay session headers, as well as the proof's submission timing
+	// (i.e. it is submitted within the proof submission window).
+	claim, err = k.EnsureWellFormedProof(ctx, proof)
+	if err != nil {
 		return nil, status.Error(codes.FailedPrecondition, err.Error())
 	}
+	logger.Info("checked the proof is well-formed")
 
-	// Construct the proof
-	proof := types.Proof{
-		SupplierOperatorAddress: msg.GetSupplierOperatorAddress(),
-		SessionHeader:           session.GetHeader(),
-		ClosestMerkleProof:      msg.GetProof(),
+	if err = k.deductProofSubmissionFee(ctx, msg.GetSupplierOperatorAddress()); err != nil {
+		logger.Error(fmt.Sprintf("failed to deduct proof submission fee: %v", err))
+		return nil, status.Error(codes.FailedPrecondition, err.Error())
 	}
 
 	// Helpers for logging the same metadata throughout this function's calls
@@ -84,19 +88,6 @@ func (k msgServer) SubmitProof(
 		"session_end_height", proof.SessionHeader.SessionEndBlockHeight,
 		"supplier_operator_address", proof.SupplierOperatorAddress)
 
-	// Validate proof message commit height is within the respective session's
-	// proof submission window using the onchain session header.
-	if err = k.validateProofWindow(ctx, proof.SessionHeader, proof.SupplierOperatorAddress); err != nil {
-		return nil, status.Error(codes.FailedPrecondition, err.Error())
-	}
-
-	// Retrieve the corresponding claim for the proof submitted so it can be
-	// used in the proof validation below.
-	claim, err = k.queryAndValidateClaimForProof(ctx, proof.SessionHeader, proof.SupplierOperatorAddress)
-	if err != nil {
-		return nil, status.Error(codes.Internal, types.ErrProofClaimNotFound.Wrap(err.Error()).Error())
-	}
-
 	// Check if a proof is required for the claim.
 	proofRequirement, err := k.ProofRequirementForClaim(ctx, claim)
 	if err != nil {
@@ -120,7 +111,7 @@ func (k msgServer) SubmitProof(
 	}
 
 	// Get the service ID's relayMiningDifficulty to calculate the claimed uPOKT.
-	serviceId := session.GetHeader().GetServiceId()
+	serviceId := sessionHeader.GetServiceId()
 	sharedParams := k.sharedKeeper.GetParams(ctx)
 	relayMiningDifficulty, _ := k.serviceKeeper.GetRelayMiningDifficulty(ctx, serviceId)
@@ -131,7 +122,7 @@ func (k msgServer) SubmitProof(
 	_, isExistingProof = k.GetProof(ctx, proof.SessionHeader.SessionId, proof.SupplierOperatorAddress)
 
 	// Upsert the proof
-	k.UpsertProof(ctx, proof)
+	k.UpsertProof(ctx, *proof)
 	logger.Info("successfully upserted the proof")
 
 	// Emit the appropriate event based on whether the claim was created or updated.
@@ -141,7 +132,7 @@ func (k msgServer) SubmitProof(
 		proofUpsertEvent = proto.Message(
 			&types.EventProofUpdated{
 				Claim:                    claim,
-				Proof:                    &proof,
+				Proof:                    proof,
 				NumRelays:                numRelays,
 				NumClaimedComputeUnits:   numClaimComputeUnits,
 				NumEstimatedComputeUnits: numEstimatedComputUnits,
@@ -152,7 +143,7 @@ func (k msgServer) SubmitProof(
 		proofUpsertEvent = proto.Message(
 			&types.EventProofSubmitted{
 				Claim:                    claim,
-				Proof:                    &proof,
+				Proof:                    proof,
 				NumRelays:                numRelays,
 				NumClaimedComputeUnits:   numClaimComputeUnits,
 				NumEstimatedComputeUnits: numEstimatedComputUnits,
 			},
 		)
 	}
+
+	sdkCtx := cosmostypes.UnwrapSDKContext(ctx)
 	if err = sdkCtx.EventManager().EmitTypedEvent(proofUpsertEvent); err != nil {
 		return nil, status.Error(
 			codes.Internal,
@@ -172,7 +165,7 @@ func (k msgServer) SubmitProof(
 	}
 
 	return &types.MsgSubmitProofResponse{
-		Proof: &proof,
+		Proof: proof,
 	}, nil
 }
 
@@ -322,10 +315,17 @@ func (k Keeper) getProofRequirementSeedBlockHash(
 // finalizeSubmitProofTelemetry finalizes telemetry updates for SubmitProof, incrementing counters as needed.
 // Meant to run deferred.
-func (k msgServer) finalizeSubmitProofTelemetry(session *sessiontypes.Session, msg *types.MsgSubmitProof, isExistingProof bool, numRelays, numClaimComputeUnits uint64, err error) {
+func (k msgServer) finalizeSubmitProofTelemetry(
+	sessionHeader *sessiontypes.SessionHeader,
+	msg *types.MsgSubmitProof,
+	isExistingProof bool,
+	numRelays,
+	numClaimComputeUnits uint64,
+	err error,
+) {
 	if !isExistingProof {
-		serviceId := session.Header.ServiceId
-		applicationAddress := session.Header.ApplicationAddress
+		serviceId := sessionHeader.ServiceId
+		applicationAddress := sessionHeader.ApplicationAddress
 		supplierOperatorAddress := msg.GetSupplierOperatorAddress()
 		claimProofStage := types.ClaimProofStage_PROVEN.String()
@@ -337,7 +337,11 @@ func (k msgServer) finalizeSubmitProofTelemetry(session *sessiontypes.Session, m
 // finalizeProofRequirementTelemetry finalizes telemetry updates for proof requirements.
 // Meant to run deferred.
-func (k Keeper) finalizeProofRequirementTelemetry(requirementReason types.ProofRequirementReason, claim *types.Claim, err error) {
+func (k Keeper) finalizeProofRequirementTelemetry(
+	requirementReason types.ProofRequirementReason,
+	claim *types.Claim,
+	err error,
+) {
 	telemetry.ProofRequirementCounter(
 		requirementReason.String(),
 		claim.SessionHeader.ServiceId,
diff --git a/x/proof/keeper/msg_server_submit_proof_test.go b/x/proof/keeper/msg_server_submit_proof_test.go
index 191fdff5d..dbea95aa7 100644
--- a/x/proof/keeper/msg_server_submit_proof_test.go
+++ b/x/proof/keeper/msg_server_submit_proof_test.go
@@ -630,7 +630,7 @@ func TestMsgServer_SubmitProof_Error(t *testing.T) {
 			},
 			msgSubmitProofToExpectedErrorFn: func(msgSubmitProof *prooftypes.MsgSubmitProof) error {
 				return status.Error(
-					codes.InvalidArgument,
+					codes.FailedPrecondition,
 					prooftypes.ErrProofInvalidSessionId.Wrapf(
 						"session ID does not match onchain session ID; expected %q, got %q",
 						validSessionHeader.GetSessionId(),
@@ -652,7 +652,7 @@ func TestMsgServer_SubmitProof_Error(t *testing.T) {
 			},
 			msgSubmitProofToExpectedErrorFn: func(msgSubmitProof *prooftypes.MsgSubmitProof) error {
 				return status.Error(
-					codes.InvalidArgument,
+					codes.FailedPrecondition,
 					prooftypes.ErrProofNotFound.Wrapf(
 						"supplier operator address %q not found in session ID %q",
 						wrongSupplierOperatorAddr,
diff --git a/x/proof/keeper/proof.go b/x/proof/keeper/proof.go
index 915803199..9a32ddd83 100644
--- a/x/proof/keeper/proof.go
+++ b/x/proof/keeper/proof.go
@@ -91,11 +91,16 @@ func (k Keeper) RemoveProof(ctx context.Context, sessionId, supplierOperatorAddr
 	)
 }
 
-// GetAllProofs returns all proof
-func (k Keeper) GetAllProofs(ctx context.Context) (proofs []types.Proof) {
+// GetAllProofsIterator returns an iterator for all proofs in the store
+func (k Keeper) GetAllProofsIterator(ctx context.Context) storetypes.Iterator {
 	storeAdapter := runtime.KVStoreAdapter(k.storeService.OpenKVStore(ctx))
 	primaryStore := prefix.NewStore(storeAdapter, types.KeyPrefix(types.ProofPrimaryKeyPrefix))
-	iterator := storetypes.KVStorePrefixIterator(primaryStore, []byte{})
+	return storetypes.KVStorePrefixIterator(primaryStore, []byte{})
+}
+
+// GetAllProofs returns all proofs in the store
+func (k Keeper) GetAllProofs(ctx context.Context) (proofs []types.Proof) {
+	iterator := k.GetAllProofsIterator(ctx)
 
 	defer iterator.Close()
 
diff --git a/x/proof/keeper/proof_validation.go b/x/proof/keeper/proof_validation.go
index bb45be1aa..0892f071b 100644
--- a/x/proof/keeper/proof_validation.go
+++ b/x/proof/keeper/proof_validation.go
@@ -42,41 +42,32 @@
 import (
 	sessiontypes "github.com/pokt-network/poktroll/x/session/types"
 )
 
-// EnsureValidProof validates the proof submitted by the supplier is correct with
-// respect to an onchain claim.
+// EnsureWellFormedProof ensures that the proof submitted by the supplier is valid w.r.t. its:
+// 1. Session header,
+// 2. Submission block height (i.e. it is within the proof submission window),
+// 3. Corresponding relay request and response (they pass basic validation and their
+//    session headers match the proof's session header),
+// 4. Relay mining difficulty (i.e. it is above the minimum required to earn rewards).
 //
-// This function should be called during session settlement (i.e. EndBlocker)
-// rather than during proof submission (i.e. SubmitProof) because:
-// 1. RPC requests should be quick, lightweight and only do basic validation
-// 2. Validators are the ones responsible for the heavy processing & validation during state transitions
-// 3. This creates an opportunity to slash suppliers who submit false proofs, whereas
-// they can keep retrying if it takes place in the SubmitProof handler.
+// It does not validate the proof's relay signatures or ClosestMerkleProof as these are
+// computationally expensive and should be done in the EndBlocker corresponding
+// to the block height at which the proof is submitted.
 //
-// Note that some of the validation here is redundant with the validation done in
-// SubmitProof (in the handler). The reason for this is because were are trying
-// to find a balance between preventing sybil or DoS attacks on full nodes
-// during proof submission, but being completely exhaustive in all the checks done here.
-func (k Keeper) EnsureValidProof(
-	ctx context.Context,
-	proof *types.Proof,
-) error {
-	// Telemetry: measure execution time.
-	defer cosmostelemetry.MeasureSince(cosmostelemetry.Now(), telemetry.MetricNameKeys("proof", "validation")...)
-
-	logger := k.Logger().With("method", "ValidateProof")
+// This function should be called in the handler corresponding to the message type
+// that submits the proof (i.e. SubmitProof).
+//
+// NOTE: A fully valid proof must pass both EnsureWellFormedProof and
+// EnsureValidProofSignaturesAndClosestPath.
+func (k Keeper) EnsureWellFormedProof(ctx context.Context, proof *types.Proof) (*types.Claim, error) {
+	logger := k.Logger().With("method", "EnsureWellFormedProof")
 
-	// Retrieve the supplier operator's public key.
 	supplierOperatorAddr := proof.SupplierOperatorAddress
-	supplierOperatorPubKey, err := k.accountQuerier.GetPubKeyFromAddress(ctx, supplierOperatorAddr)
-	if err != nil {
-		return err
-	}
 
 	// Validate the session header.
 	var onChainSession *sessiontypes.Session
-	onChainSession, err = k.queryAndValidateSessionHeader(ctx, proof.SessionHeader, supplierOperatorAddr)
+	onChainSession, err := k.queryAndValidateSessionHeader(ctx, proof.SessionHeader, supplierOperatorAddr)
 	if err != nil {
-		return err
+		return nil, err
 	}
 	logger.Info("queried and validated the session header")
@@ -88,17 +79,17 @@ func (k Keeper) EnsureValidProof(
 	// Validate proof message commit height is within the respective session's
 	// proof submission window using the onchain session header.
 	if err = k.validateProofWindow(ctx, sessionHeader, supplierOperatorAddr); err != nil {
-		return err
+		return nil, err
 	}
 
 	if len(proof.ClosestMerkleProof) == 0 {
-		return types.ErrProofInvalidProof.Wrap("proof cannot be empty")
+		return nil, types.ErrProofInvalidProof.Wrap("proof cannot be empty")
 	}
 
 	// Unmarshal the closest merkle proof from the message.
 	sparseCompactMerkleClosestProof := &smt.SparseCompactMerkleClosestProof{}
 	if err = sparseCompactMerkleClosestProof.Unmarshal(proof.ClosestMerkleProof); err != nil {
-		return types.ErrProofInvalidProof.Wrapf(
+		return nil, types.ErrProofInvalidProof.Wrapf(
 			"failed to unmarshal closest merkle proof: %s",
 			err,
 		)
@@ -107,7 +98,7 @@ func (k Keeper) EnsureValidProof(
 	// SparseCompactMerkleClosestProof does not implement GetValueHash, so we need to decompact it.
 	sparseMerkleClosestProof, err := smt.DecompactClosestProof(sparseCompactMerkleClosestProof, &protocol.SmtSpec)
 	if err != nil {
-		return types.ErrProofInvalidProof.Wrapf(
+		return nil, types.ErrProofInvalidProof.Wrapf(
 			"failed to decompact closest merkle proof: %s",
 			err,
 		)
@@ -117,7 +108,7 @@ func (k Keeper) EnsureValidProof(
 	relayBz := sparseMerkleClosestProof.GetValueHash(&protocol.SmtSpec)
 	relay := &servicetypes.Relay{}
 	if err = k.cdc.Unmarshal(relayBz, relay); err != nil {
-		return types.ErrProofInvalidRelay.Wrapf(
+		return nil, types.ErrProofInvalidRelay.Wrapf(
 			"failed to unmarshal relay: %s",
 			err,
 		)
@@ -126,47 +117,35 @@ func (k Keeper) EnsureValidProof(
 	// Basic validation of the relay request.
 	relayReq := relay.GetReq()
 	if err = relayReq.ValidateBasic(); err != nil {
-		return err
+		return nil, err
 	}
 	logger.Debug("successfully validated relay request")
 
 	// Make sure that the supplier operator address in the proof matches the one in the relay request.
 	if supplierOperatorAddr != relayReq.Meta.SupplierOperatorAddress {
-		return types.ErrProofSupplierMismatch.Wrapf("supplier type mismatch")
+		return nil, types.ErrProofSupplierMismatch.Wrapf("supplier type mismatch")
 	}
 	logger.Debug("the proof supplier operator address matches the relay request supplier operator address")
 
 	// Basic validation of the relay response.
 	relayRes := relay.GetRes()
 	if err = relayRes.ValidateBasic(); err != nil {
-		return err
+		return nil, err
 	}
 	logger.Debug("successfully validated relay response")
 
 	// Verify that the relay request session header matches the proof session header.
 	if err = compareSessionHeaders(sessionHeader, relayReq.Meta.GetSessionHeader()); err != nil {
-		return err
+		return nil, err
 	}
 	logger.Debug("successfully compared relay request session header")
 
 	// Verify that the relay response session header matches the proof session header.
 	if err = compareSessionHeaders(sessionHeader, relayRes.Meta.GetSessionHeader()); err != nil {
-		return err
+		return nil, err
 	}
 	logger.Debug("successfully compared relay response session header")
 
-	// Verify the relay request's signature.
-	if err = k.ringClient.VerifyRelayRequestSignature(ctx, relayReq); err != nil {
-		return err
-	}
-	logger.Debug("successfully verified relay request signature")
-
-	// Verify the relay response's signature.
-	if err = relayRes.VerifySupplierOperatorSignature(supplierOperatorPubKey); err != nil {
-		return err
-	}
-	logger.Debug("successfully verified relay response signature")
-
 	// Get the service's relay mining difficulty.
 	serviceRelayDifficulty, _ := k.serviceKeeper.GetRelayMiningDifficulty(ctx, sessionHeader.GetServiceId())
@@ -175,10 +154,87 @@ func (k Keeper) EnsureValidProof(
 		relayBz,
 		serviceRelayDifficulty.GetTargetHash(),
 	); err != nil {
-		return types.ErrProofInvalidRelayDifficulty.Wrapf("failed to validate relay difficulty for service %s due to: %v", sessionHeader.ServiceId, err)
+		return nil, types.ErrProofInvalidRelayDifficulty.Wrapf("failed to validate relay difficulty for service %s due to: %v", sessionHeader.ServiceId, err)
 	}
 	logger.Debug("successfully validated relay mining difficulty")
 
+	// Retrieve the corresponding claim for the proof submitted.
+	claim, err := k.queryAndValidateClaimForProof(ctx, sessionHeader, supplierOperatorAddr)
+	if err != nil {
+		return nil, err
+	}
+	logger.Debug("successfully retrieved and validated claim")
+
+	return claim, nil
+}
+
+// EnsureValidProofSignaturesAndClosestPath ensures that the proof submitted by
+// the supplier has valid relay request/response signatures and a valid closest path
+// with respect to an onchain claim.
+//
+// This function should be called in the EndBlocker corresponding to the block height
+// at which the proof is submitted rather than during proof submission (i.e. SubmitProof).
+//
+// NOTE: A fully valid proof must pass both EnsureWellFormedProof and
+// EnsureValidProofSignaturesAndClosestPath.
+func (k Keeper) EnsureValidProofSignaturesAndClosestPath(
+	ctx context.Context,
+	proof *types.Proof,
+) error {
+	// Telemetry: measure execution time.
+	defer cosmostelemetry.MeasureSince(cosmostelemetry.Now(), telemetry.MetricNameKeys("proof", "validation")...)
+
+	logger := k.Logger().With("method", "EnsureValidProofSignaturesAndClosestPath")
+
+	// Retrieve the supplier operator's public key.
+	supplierOperatorAddr := proof.SupplierOperatorAddress
+	supplierOperatorPubKey, err := k.accountQuerier.GetPubKeyFromAddress(ctx, supplierOperatorAddr)
+	if err != nil {
+		return err
+	}
+
+	sessionHeader := proof.GetSessionHeader()
+
+	// Unmarshal the closest merkle proof from the message.
+	sparseCompactMerkleClosestProof := &smt.SparseCompactMerkleClosestProof{}
+	if err = sparseCompactMerkleClosestProof.Unmarshal(proof.ClosestMerkleProof); err != nil {
+		return types.ErrProofInvalidProof.Wrapf(
+			"failed to unmarshal closest merkle proof: %s",
+			err,
+		)
+	}
+
+	// SparseCompactMerkleClosestProof does not implement GetValueHash, so we need to decompact it.
+	sparseMerkleClosestProof, err := smt.DecompactClosestProof(sparseCompactMerkleClosestProof, &protocol.SmtSpec)
+	if err != nil {
+		return types.ErrProofInvalidProof.Wrapf(
+			"failed to decompact closest merkle proof: %s",
+			err,
+		)
+	}
+
+	// Get the relay request and response from the proof's ClosestMerkleProof.
+	relayBz := sparseMerkleClosestProof.GetValueHash(&protocol.SmtSpec)
+	relay := &servicetypes.Relay{}
+	if err = k.cdc.Unmarshal(relayBz, relay); err != nil {
+		return types.ErrProofInvalidRelay.Wrapf(
+			"failed to unmarshal relay: %s",
+			err,
+		)
+	}
+
+	// Verify the relay request's signature.
+	if err = k.ringClient.VerifyRelayRequestSignature(ctx, relay.GetReq()); err != nil {
+		return err
+	}
+	logger.Debug("successfully verified relay request signature")
+
+	// Verify the relay response's signature.
+	if err = relay.GetRes().VerifySupplierOperatorSignature(supplierOperatorPubKey); err != nil {
+		return err
+	}
+	logger.Debug("successfully verified relay response signature")
+
 	// Validate that the path the proof is submitted for matches the expected one
 	// based on the pseudo-random onchain data associated with the header.
 	if err = k.validateClosestPath(
@@ -193,12 +249,11 @@ func (k Keeper) EnsureValidProof(
 
-	// Retrieve the corresponding claim for the proof submitted so it can be
-	// used in the proof validation below.
-	claim, err := k.queryAndValidateClaimForProof(ctx, sessionHeader, supplierOperatorAddr)
-	if err != nil {
-		return err
-	}
+	// Retrieve the corresponding claim for the submitted proof.
+	// EnsureWellFormedProof has already validated that the claim referenced by the
+	// proof exists and has a matching session header.
+	claim, _ := k.GetClaim(ctx, sessionHeader.GetSessionId(), supplierOperatorAddr)
 
-	logger.Debug("successfully retrieved and validated claim")
+	logger.Debug("successfully retrieved claim")
 
 	// Verify the proof's closest merkle proof.
 	if err = verifyClosestProof(sparseMerkleClosestProof, claim.GetRootHash()); err != nil {
diff --git a/x/proof/keeper/proof_validation_test.go b/x/proof/keeper/proof_validation_test.go
index f6c3fa970..7f34b38bf 100644
--- a/x/proof/keeper/proof_validation_test.go
+++ b/x/proof/keeper/proof_validation_test.go
@@ -769,8 +769,19 @@ func TestEnsureValidProof_Error(t *testing.T) {
 			// Advance the block height to the earliest proof commit height.
 			ctx = keepertest.SetBlockHeight(ctx, earliestSupplierProofCommitHeight)
-			err := keepers.EnsureValidProof(ctx, proof)
-			require.ErrorContains(t, err, test.expectedErr.Error())
+
+			// An invalid proof is either one that is not well-formed or one that
+			// has invalid signatures or an invalid closest path.
+
+			if _, err := keepers.EnsureWellFormedProof(ctx, proof); err != nil {
+				require.ErrorContains(t, err, test.expectedErr.Error())
+				return
+			}
+
+			if err := keepers.EnsureValidProofSignaturesAndClosestPath(ctx, proof); err != nil {
+				require.ErrorContains(t, err, test.expectedErr.Error())
+				return
+			}
 		})
 	}
 }
diff --git a/x/proof/keeper/validate_proofs.go b/x/proof/keeper/validate_proofs.go
new file mode 100644
index 000000000..f7e0e4ee7
--- /dev/null
+++ b/x/proof/keeper/validate_proofs.go
@@ -0,0 +1,95 @@
+package keeper
+
+import (
+	"runtime"
+	"sync"
+
+	sdk "github.com/cosmos/cosmos-sdk/types"
+
+	"github.com/pokt-network/poktroll/x/proof/types"
+)
+
+// ValidateSubmittedProofs validates all proofs submitted in the current block and removes
+// any invalid proofs from the store so that they are not committed as part of the block.
+func (k Keeper) ValidateSubmittedProofs(ctx sdk.Context) {
+	logger := k.Logger().With("method", "ValidateSubmittedProofs")
+
+	// Use an iterator to iterate over all proofs instead of fetching them all
+	// at once to avoid memory issues.
+	iterator := k.GetAllProofsIterator(ctx)
+	defer iterator.Close()
+
+	// Since the proofs are independent of each other, we can validate them in parallel
+	// across all CPU cores to speed up the process.
+
+	// Use a semaphore to limit the number of goroutines to the number of CPU cores.
+	// This is to avoid creating too many goroutines which can lead to memory issues.
+	sem := make(chan struct{}, runtime.NumCPU())
+
+	// Use a wait group to wait for all goroutines to finish before returning.
+	wg := sync.WaitGroup{}
+
+	for ; iterator.Valid(); iterator.Next() {
+		proofBz := iterator.Value()
+
+		// Acquire a semaphore to limit the number of goroutines.
+		// This will block if the sem channel is full.
+		sem <- struct{}{}
+		// Increment the wait group to wait for validation to finish.
+		wg.Add(1)
+
+		go func(proofBz []byte) {
+			// Decrement the wait group when the goroutine finishes.
+			defer wg.Done()
+			// Release the semaphore after the goroutine finishes which unblocks another
+			// iteration to run its goroutine.
+			defer func() { <-sem }()
+
+			var proof types.Proof
+			// proofBz is not expected to fail unmarshalling since it should have
+			// passed EnsureWellFormedProof validation in the MsgSubmitProof handler.
+			// Panic if it fails unmarshalling.
+			k.cdc.MustUnmarshal(proofBz, &proof)
+
+			// Already validated proofs will have their ClosestMerkleProof cleared.
+			// Skip already validated proofs submitted at earlier block heights of
+			// the proof submission window.
+			if len(proof.ClosestMerkleProof) == 0 {
+				return
+			}
+
+			// Try to validate the proof and remove it if it is invalid.
+			if err := k.EnsureValidProofSignaturesAndClosestPath(ctx, &proof); err != nil {
+				// Remove the proof if it is invalid to save block space and trigger the
+				// supplier slashing code path in the SettlePendingClaims flow.
+				k.RemoveProof(ctx, proof.GetSessionHeader().GetSessionId(), proof.GetSupplierOperatorAddress())
+
+				// TODO_MAINNET(red-0ne): Emit an invalid proof event to signal that a proof was
+				// removed due to bad signatures or ClosestMerkleProof.
+				// For now this could be inferred from the EventProofSubmitted+EventClaimExpired events.
+
+				logger.Info("Removed invalid proof",
+					"session_id", proof.GetSessionHeader().GetSessionId(),
+					"supplier_operator_address", proof.GetSupplierOperatorAddress(),
+					"error", err,
+				)
+
+				return
+			}
+
+			// Clear the ClosestMerkleProof for successfully validated proofs to:
+			// 1. Save block space as the ClosestMerkleProof embeds the entire relay request and
+			// response bytes which account for the majority of the proof size.
+			// 2. Mark the proof as validated to avoid re-validating it in subsequent blocks
+			// within the same proof submission window.
+			proof.ClosestMerkleProof = make([]byte, 0, 0)
+
+			// Update the proof in the store to clear the ClosestMerkleProof so that the
+			// committed block never stores the potentially large ClosestMerkleProof.
+			k.UpsertProof(ctx, proof)
+		}(proofBz)
+	}
+
+	// Wait for all goroutines to finish before returning.
+	wg.Wait()
+}
diff --git a/x/proof/module/abci.go b/x/proof/module/abci.go
new file mode 100644
index 000000000..b986fe803
--- /dev/null
+++ b/x/proof/module/abci.go
@@ -0,0 +1,21 @@
+package proof
+
+import (
+	cosmostelemetry "github.com/cosmos/cosmos-sdk/telemetry"
+	sdk "github.com/cosmos/cosmos-sdk/types"
+
+	"github.com/pokt-network/poktroll/x/proof/keeper"
+	"github.com/pokt-network/poktroll/x/proof/types"
+)
+
+// EndBlocker is called at every block; it validates all proofs submitted at the
+// current block height and removes any invalid ones.
+func EndBlocker(ctx sdk.Context, k keeper.Keeper) (err error) {
+	// Telemetry: measure the end-block execution time following standard cosmos-sdk practices.
+	defer cosmostelemetry.ModuleMeasureSince(types.ModuleName, cosmostelemetry.Now(), cosmostelemetry.MetricKeyEndBlocker)
+
+	// ValidateSubmittedProofs does not return an error as it is a best-effort function.
+	k.ValidateSubmittedProofs(ctx)
+
+	return nil
+}
diff --git a/x/tokenomics/keeper/keeper_settle_pending_claims_test.go b/x/tokenomics/keeper/keeper_settle_pending_claims_test.go
index 076663cce..138c61a4b 100644
--- a/x/tokenomics/keeper/keeper_settle_pending_claims_test.go
+++ b/x/tokenomics/keeper/keeper_settle_pending_claims_test.go
@@ -440,12 +440,15 @@ func (s *TestSuite) TestSettlePendingClaims_ClaimExpired_ProofRequired_InvalidOn
 	s.keepers.UpsertClaim(ctx, s.claim)
 	s.keepers.UpsertProof(ctx, proof)
 
+	sdkCtx := cosmostypes.UnwrapSDKContext(ctx)
+	s.keepers.ValidateSubmittedProofs(sdkCtx)
+
 	// Settle pending claims after proof window closes
 	// Expectation: All (1) claims should be expired.
 	// NB: proofs should be rejected when the current height equals the proof window close height.
 	sessionEndHeight := s.claim.SessionHeader.SessionEndBlockHeight
 	blockHeight := sharedtypes.GetProofWindowCloseHeight(&sharedParams, sessionEndHeight)
-	sdkCtx := cosmostypes.UnwrapSDKContext(ctx).WithBlockHeight(blockHeight)
+	sdkCtx = sdkCtx.WithBlockHeight(blockHeight)
 	settledResults, expiredResults, err := s.keepers.SettlePendingClaims(sdkCtx)
 	require.NoError(t, err)
@@ -475,7 +478,7 @@ func (s *TestSuite) TestSettlePendingClaims_ClaimExpired_ProofRequired_InvalidOn
 	// Validate the event
 	expectedClaimExpiredEvent := expectedClaimExpiredEvents[0]
-	require.Equal(t, tokenomicstypes.ClaimExpirationReason_PROOF_INVALID, expectedClaimExpiredEvent.GetExpirationReason())
+	require.Equal(t, tokenomicstypes.ClaimExpirationReason_PROOF_MISSING, expectedClaimExpiredEvent.GetExpirationReason())
 	require.Equal(t, s.numRelays, expectedClaimExpiredEvent.GetNumRelays())
 	require.Equal(t, s.numClaimedComputeUnits, expectedClaimExpiredEvent.GetNumClaimedComputeUnits())
 	require.Equal(t, s.numEstimatedComputeUnits, expectedClaimExpiredEvent.GetNumEstimatedComputeUnits())
diff --git a/x/tokenomics/keeper/settle_pending_claims.go b/x/tokenomics/keeper/settle_pending_claims.go
index 2d30a0e6b..b422b468d 100644
--- a/x/tokenomics/keeper/settle_pending_claims.go
+++ b/x/tokenomics/keeper/settle_pending_claims.go
@@ -112,7 +112,7 @@ func (k Keeper) SettlePendingClaims(ctx cosmostypes.Context) (
 		return settledResults, expiredResults, err
 	}
 
-	proof, isProofFound := k.proofKeeper.GetProof(ctx, sessionId, claim.SupplierOperatorAddress)
+	_, isProofFound := k.proofKeeper.GetProof(ctx, sessionId, claim.SupplierOperatorAddress)
 	// Using the probabilistic proofs approach, determine if this expiring
 	// claim required an onchain proof
 	proofRequirement, err = k.proofKeeper.ProofRequirementForClaim(ctx, &claim)
@@ -137,12 +137,7 @@ func (k Keeper) SettlePendingClaims(ctx cosmostypes.Context) (
 	if proofIsRequired {
 		expirationReason := tokenomicstypes.ClaimExpirationReason_EXPIRATION_REASON_UNSPECIFIED // EXPIRATION_REASON_UNSPECIFIED is the default
 
-		if isProofFound {
-			if err = k.proofKeeper.EnsureValidProof(ctx, &proof); err != nil {
-				logger.Warn(fmt.Sprintf("Proof was found but is invalid due to %v", err))
-				expirationReason = tokenomicstypes.ClaimExpirationReason_PROOF_INVALID
-			}
-		} else {
+		if !isProofFound {
 			expirationReason = tokenomicstypes.ClaimExpirationReason_PROOF_MISSING
 		}
 
diff --git a/x/tokenomics/types/expected_keepers.go b/x/tokenomics/types/expected_keepers.go
index 9d555f4e2..b09de7983 100644
--- a/x/tokenomics/types/expected_keepers.go
+++ b/x/tokenomics/types/expected_keepers.go
@@ -56,10 +56,10 @@ type ProofKeeper interface {
 	RemoveProof(ctx context.Context, sessionId, supplierOperatorAddr string)
 	AllClaims(ctx context.Context, req *prooftypes.QueryAllClaimsRequest) (*prooftypes.QueryAllClaimsResponse, error)
-	EnsureValidProof(ctx context.Context, proof *prooftypes.Proof) error
 	ProofRequirementForClaim(ctx context.Context, claim *prooftypes.Claim) (prooftypes.ProofRequirementReason, error)
 
 	// Only used for testing & simulation
+	ValidateSubmittedProofs(ctx cosmostypes.Context)
 	GetAllProofs(ctx context.Context) []prooftypes.Proof
 	UpsertClaim(ctx context.Context, claim prooftypes.Claim)
 	UpsertProof(ctx context.Context, claim prooftypes.Proof)
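
The refactor above splits the old EnsureValidProof into two phases: cheap well-formedness checks in the MsgSubmitProof handler, and expensive cryptographic checks in the proof module's EndBlocker. A minimal sketch of the combined flow, reusing the keeper methods from this diff; the wrapper function itself is hypothetical and only illustrates the intended call order:

	// validateTwoPhase is a hypothetical wrapper illustrating the two-phase flow.
	func validateTwoPhase(ctx sdk.Context, k keeper.Keeper, proof *types.Proof) error {
		// Phase 1 (MsgSubmitProof handler): reject malformed proofs before they
		// are stored; this also returns the claim the proof refers to.
		if _, err := k.EnsureWellFormedProof(ctx, proof); err != nil {
			return err
		}

		// Phase 2 (EndBlocker of the submission block): verify the relay
		// signatures and the closest merkle path against the onchain claim.
		return k.EnsureValidProofSignaturesAndClosestPath(ctx, proof)
	}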
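
GetAllProofsIterator exists so that ValidateSubmittedProofs can stream proofs out of the store rather than materializing them all in memory the way GetAllProofs does. A short sketch of the consumption pattern, mirroring the loop in validate_proofs.go (k and ctx are assumed to be a proof Keeper and an sdk.Context):

	iterator := k.GetAllProofsIterator(ctx)
	// Iterators hold store resources; always close them.
	defer iterator.Close()

	for ; iterator.Valid(); iterator.Next() {
		var proof types.Proof
		// Safe per the handler's well-formedness guarantee; panics on corrupt bytes.
		k.cdc.MustUnmarshal(iterator.Value(), &proof)
		// ...process one proof at a time; memory stays bounded...
	}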
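
The sem channel in ValidateSubmittedProofs is the standard Go bounded-parallelism idiom: a buffered channel whose capacity caps the number of in-flight goroutines. A self-contained, runnable sketch of just that pattern (the print statement stands in for proof validation):

	package main

	import (
		"fmt"
		"runtime"
		"sync"
	)

	func main() {
		// Capacity of the buffered channel == maximum concurrent workers.
		sem := make(chan struct{}, runtime.NumCPU())
		var wg sync.WaitGroup

		for i := 0; i < 32; i++ {
			sem <- struct{}{} // blocks once NumCPU workers are in flight
			wg.Add(1)
			go func(i int) {
				defer wg.Done()
				defer func() { <-sem }() // release the slot for the next iteration
				fmt.Println("validating item", i)
			}(i)
		}

		// Wait for all workers, as ValidateSubmittedProofs does before returning.
		wg.Wait()
	}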
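
Because SettlePendingClaims no longer calls EnsureValidProof, settlement now relies on the proof module's EndBlocker running first, so that a proof with bad signatures or a bad closest path has already been removed and surfaces as PROOF_MISSING. A hedged sketch of that ordering; the wrapper and keeper names are assumptions, and only proof.EndBlocker and SettlePendingClaims come from this diff:

	// endBlockOrdering is hypothetical; in the real app the module manager's
	// EndBlocker ordering must place the proof module before tokenomics.
	func endBlockOrdering(ctx sdk.Context, pk proofkeeper.Keeper, tk tokenomicskeeper.Keeper) error {
		// 1. Prune proofs that fail signature or closest-path verification.
		if err := proof.EndBlocker(ctx, pk); err != nil {
			return err
		}
		// 2. Settle claims; a pruned proof is now simply not found.
		_, _, err := tk.SettlePendingClaims(ctx)
		return err
	}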