Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Proof] Implement scalable proof validation #1031

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
86 changes: 45 additions & 41 deletions x/proof/keeper/msg_server_submit_proof.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,12 @@ import (
// A proof that's stored onchain is what leads to rewards (i.e. inflation)
// downstream, making this a critical part of the protocol.
//
// Note that the validation of the proof is done in `EnsureValidProof`. However,
// preliminary checks are done in the handler to prevent sybil or DoS attacks on
// full nodes because storing and validating proofs is expensive.
// Note that the validation of the proof is done in `EnsureValidProofSignaturesAndClosestPath`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// SubmitProof is the server message handler that stores a valid
// proof onchain, enabling downstream reward distribution.
//
// IMPORTANT: Full proof validation occurs in EnsureValidProofSignaturesAndClosestPath.
// This handler performs preliminary validation to prevent sybil/DoS attacks.
//
// There is a security & performance balance and tradeoff between the handler and end blocker:
// - Basic validation on submission (here)
// - Exhaustive validation in endblocker (EnsureValidProofSignaturesAndClosestPath)
//
// Note: Proof submitter may differ from supplier signer, allowing batched submissions
// to optimize transaction fees.

// However, preliminary checks are done in the handler to prevent sybil or DoS attacks on
// full nodes by submitting malformed proofs.
//
// We are playing a balance of security and efficiency here, where enough validation
// is done on proof submission, and exhaustive validation is done during session
// settlement.
// is done on proof submission, and exhaustive validation is done during the endblocker.
//
// The entity sending the SubmitProof messages does not necessarily need
// to correspond to the supplier signing the proof. For example, a single entity
Expand All @@ -45,10 +44,10 @@ func (k msgServer) SubmitProof(
isExistingProof bool
numRelays uint64
numClaimComputeUnits uint64
sessionHeader *sessiontypes.SessionHeader
)

logger := k.Logger().With("method", "SubmitProof")
sdkCtx := cosmostypes.UnwrapSDKContext(ctx)
logger.Info("About to start submitting proof")

// Basic validation of the SubmitProof message.
Expand All @@ -57,25 +56,30 @@ func (k msgServer) SubmitProof(
}
logger.Info("validated the submitProof message")

// Compare msg session header w/ onchain session header.
session, err := k.queryAndValidateSessionHeader(ctx, msg.GetSessionHeader(), msg.GetSupplierOperatorAddress())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
sessionHeader = msg.GetSessionHeader()

// Defer telemetry calls so that they reference the final values the relevant variables.
defer k.finalizeSubmitProofTelemetry(session, msg, isExistingProof, numRelays, numClaimComputeUnits, err)
defer k.finalizeSubmitProofTelemetry(sessionHeader, msg, isExistingProof, numRelays, numClaimComputeUnits, err)

if err = k.deductProofSubmissionFee(ctx, msg.GetSupplierOperatorAddress()); err != nil {
logger.Error(fmt.Sprintf("failed to deduct proof submission fee: %v", err))
// Construct the proof from the message.
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
proof := &types.Proof{
SupplierOperatorAddress: msg.GetSupplierOperatorAddress(),
SessionHeader: msg.GetSessionHeader(),
ClosestMerkleProof: msg.GetProof(),
}

// Ensure the proof is well-formed by checking the proof, its corresponding
// claim and relay session headers was well as the proof's submission timing
// (i.e. it is submitted within the proof submission window).
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
claim, err = k.EnsureWellFormedProof(ctx, proof)
if err != nil {
return nil, status.Error(codes.FailedPrecondition, err.Error())
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
}
logger.Info("checked the proof is well-formed")

// Construct the proof
proof := types.Proof{
SupplierOperatorAddress: msg.GetSupplierOperatorAddress(),
SessionHeader: session.GetHeader(),
ClosestMerkleProof: msg.GetProof(),
if err = k.deductProofSubmissionFee(ctx, msg.GetSupplierOperatorAddress()); err != nil {
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
logger.Error(fmt.Sprintf("failed to deduct proof submission fee: %v", err))
return nil, status.Error(codes.FailedPrecondition, err.Error())
}

// Helpers for logging the same metadata throughout this function calls
Expand All @@ -84,19 +88,6 @@ func (k msgServer) SubmitProof(
"session_end_height", proof.SessionHeader.SessionEndBlockHeight,
"supplier_operator_address", proof.SupplierOperatorAddress)

// Validate proof message commit height is within the respective session's
// proof submission window using the onchain session header.
if err = k.validateProofWindow(ctx, proof.SessionHeader, proof.SupplierOperatorAddress); err != nil {
return nil, status.Error(codes.FailedPrecondition, err.Error())
}

// Retrieve the corresponding claim for the proof submitted so it can be
// used in the proof validation below.
claim, err = k.queryAndValidateClaimForProof(ctx, proof.SessionHeader, proof.SupplierOperatorAddress)
if err != nil {
return nil, status.Error(codes.Internal, types.ErrProofClaimNotFound.Wrap(err.Error()).Error())
}

// Check if a proof is required for the claim.
proofRequirement, err := k.ProofRequirementForClaim(ctx, claim)
if err != nil {
Expand All @@ -120,7 +111,7 @@ func (k msgServer) SubmitProof(
}

// Get the service ID relayMiningDifficulty to calculate the claimed uPOKT.
serviceId := session.GetHeader().GetServiceId()
serviceId := sessionHeader.GetServiceId()
sharedParams := k.sharedKeeper.GetParams(ctx)
relayMiningDifficulty, _ := k.serviceKeeper.GetRelayMiningDifficulty(ctx, serviceId)

Expand All @@ -131,7 +122,7 @@ func (k msgServer) SubmitProof(
_, isExistingProof = k.GetProof(ctx, proof.SessionHeader.SessionId, proof.SupplierOperatorAddress)

// Upsert the proof
k.UpsertProof(ctx, proof)
k.UpsertProof(ctx, *proof)
logger.Info("successfully upserted the proof")

// Emit the appropriate event based on whether the claim was created or updated.
Expand All @@ -141,7 +132,7 @@ func (k msgServer) SubmitProof(
proofUpsertEvent = proto.Message(
&types.EventProofUpdated{
Claim: claim,
Proof: &proof,
Proof: proof,
NumRelays: numRelays,
NumClaimedComputeUnits: numClaimComputeUnits,
NumEstimatedComputeUnits: numEstimatedComputUnits,
Expand All @@ -152,14 +143,16 @@ func (k msgServer) SubmitProof(
proofUpsertEvent = proto.Message(
&types.EventProofSubmitted{
Claim: claim,
Proof: &proof,
Proof: proof,
NumRelays: numRelays,
NumClaimedComputeUnits: numClaimComputeUnits,
NumEstimatedComputeUnits: numEstimatedComputUnits,
ClaimedUpokt: &claimedUPOKT,
},
)
}

sdkCtx := cosmostypes.UnwrapSDKContext(ctx)
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
if err = sdkCtx.EventManager().EmitTypedEvent(proofUpsertEvent); err != nil {
return nil, status.Error(
codes.Internal,
Expand All @@ -172,7 +165,7 @@ func (k msgServer) SubmitProof(
}

return &types.MsgSubmitProofResponse{
Proof: &proof,
Proof: proof,
}, nil
}

Expand Down Expand Up @@ -322,10 +315,17 @@ func (k Keeper) getProofRequirementSeedBlockHash(

// finalizeSubmitProofTelemetry finalizes telemetry updates for SubmitProof, incrementing counters as needed.
// Meant to run deferred.
func (k msgServer) finalizeSubmitProofTelemetry(session *sessiontypes.Session, msg *types.MsgSubmitProof, isExistingProof bool, numRelays, numClaimComputeUnits uint64, err error) {
func (k msgServer) finalizeSubmitProofTelemetry(
sessionHeader *sessiontypes.SessionHeader,
msg *types.MsgSubmitProof,
isExistingProof bool,
numRelays,
numClaimComputeUnits uint64,
err error,
) {
if !isExistingProof {
serviceId := session.Header.ServiceId
applicationAddress := session.Header.ApplicationAddress
serviceId := sessionHeader.ServiceId
applicationAddress := sessionHeader.ApplicationAddress
supplierOperatorAddress := msg.GetSupplierOperatorAddress()
claimProofStage := types.ClaimProofStage_PROVEN.String()

Expand All @@ -337,7 +337,11 @@ func (k msgServer) finalizeSubmitProofTelemetry(session *sessiontypes.Session, m

// finalizeProofRequirementTelemetry finalizes telemetry updates for proof requirements.
// Meant to run deferred.
func (k Keeper) finalizeProofRequirementTelemetry(requirementReason types.ProofRequirementReason, claim *types.Claim, err error) {
func (k Keeper) finalizeProofRequirementTelemetry(
requirementReason types.ProofRequirementReason,
claim *types.Claim,
err error,
) {
telemetry.ProofRequirementCounter(
requirementReason.String(),
claim.SessionHeader.ServiceId,
Expand Down
4 changes: 2 additions & 2 deletions x/proof/keeper/msg_server_submit_proof_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ func TestMsgServer_SubmitProof_Error(t *testing.T) {
},
msgSubmitProofToExpectedErrorFn: func(msgSubmitProof *prooftypes.MsgSubmitProof) error {
return status.Error(
codes.InvalidArgument,
codes.FailedPrecondition,
prooftypes.ErrProofInvalidSessionId.Wrapf(
"session ID does not match onchain session ID; expected %q, got %q",
validSessionHeader.GetSessionId(),
Expand All @@ -652,7 +652,7 @@ func TestMsgServer_SubmitProof_Error(t *testing.T) {
},
msgSubmitProofToExpectedErrorFn: func(msgSubmitProof *prooftypes.MsgSubmitProof) error {
return status.Error(
codes.InvalidArgument,
codes.FailedPrecondition,
prooftypes.ErrProofNotFound.Wrapf(
"supplier operator address %q not found in session ID %q",
wrongSupplierOperatorAddr,
Expand Down
11 changes: 8 additions & 3 deletions x/proof/keeper/proof.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,16 @@ func (k Keeper) RemoveProof(ctx context.Context, sessionId, supplierOperatorAddr
)
}

// GetAllProofs returns all proof
func (k Keeper) GetAllProofs(ctx context.Context) (proofs []types.Proof) {
// GetAllProofsIterator returns an iterator for all proofs in the store
func (k Keeper) GetAllProofsIterator(ctx context.Context) storetypes.Iterator {
storeAdapter := runtime.KVStoreAdapter(k.storeService.OpenKVStore(ctx))
primaryStore := prefix.NewStore(storeAdapter, types.KeyPrefix(types.ProofPrimaryKeyPrefix))
iterator := storetypes.KVStorePrefixIterator(primaryStore, []byte{})
return storetypes.KVStorePrefixIterator(primaryStore, []byte{})
}

// GetAllProofs returns all proofs in the store
func (k Keeper) GetAllProofs(ctx context.Context) (proofs []types.Proof) {
iterator := k.GetAllProofsIterator(ctx)
red-0ne marked this conversation as resolved.
Show resolved Hide resolved

defer iterator.Close()

Expand Down
Loading
Loading