Skip to content

Commit

Permalink
feat: scalable proof validation
Browse files Browse the repository at this point in the history
  • Loading branch information
red-0ne committed Jan 17, 2025
1 parent 0c52ba7 commit 074743d
Show file tree
Hide file tree
Showing 10 changed files with 302 additions and 113 deletions.
86 changes: 45 additions & 41 deletions x/proof/keeper/msg_server_submit_proof.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,12 @@ import (
// A proof that's stored onchain is what leads to rewards (i.e. inflation)
// downstream, making this a critical part of the protocol.
//
// Note that the validation of the proof is done in `EnsureValidProof`. However,
// preliminary checks are done in the handler to prevent sybil or DoS attacks on
// full nodes because storing and validating proofs is expensive.
// Note that the validation of the proof is done in `EnsureValidProofSignaturesAndClosestPath`.
// However, preliminary checks are done in the handler to prevent sybil or DoS attacks on
// full nodes by submitting malformed proofs.
//
// We are playing a balance of security and efficiency here, where enough validation
// is done on proof submission, and exhaustive validation is done during session
// settlement.
// is done on proof submission, and exhaustive validation is done during the endblocker.
//
// The entity sending the SubmitProof messages does not necessarily need
// to correspond to the supplier signing the proof. For example, a single entity
Expand All @@ -45,10 +44,10 @@ func (k msgServer) SubmitProof(
isExistingProof bool
numRelays uint64
numClaimComputeUnits uint64
sessionHeader *sessiontypes.SessionHeader
)

logger := k.Logger().With("method", "SubmitProof")
sdkCtx := cosmostypes.UnwrapSDKContext(ctx)
logger.Info("About to start submitting proof")

// Basic validation of the SubmitProof message.
Expand All @@ -57,25 +56,30 @@ func (k msgServer) SubmitProof(
}
logger.Info("validated the submitProof message")

// Compare msg session header w/ onchain session header.
session, err := k.queryAndValidateSessionHeader(ctx, msg.GetSessionHeader(), msg.GetSupplierOperatorAddress())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
sessionHeader = msg.GetSessionHeader()

// Defer telemetry calls so that they reference the final values the relevant variables.
defer k.finalizeSubmitProofTelemetry(session, msg, isExistingProof, numRelays, numClaimComputeUnits, err)
defer k.finalizeSubmitProofTelemetry(sessionHeader, msg, isExistingProof, numRelays, numClaimComputeUnits, err)

if err = k.deductProofSubmissionFee(ctx, msg.GetSupplierOperatorAddress()); err != nil {
logger.Error(fmt.Sprintf("failed to deduct proof submission fee: %v", err))
// Construct the proof from the message.
proof := &types.Proof{
SupplierOperatorAddress: msg.GetSupplierOperatorAddress(),
SessionHeader: msg.GetSessionHeader(),
ClosestMerkleProof: msg.GetProof(),
}

// Ensure the proof is well-formed by checking the proof, its corresponding
// claim and relay session headers was well as the proof's submission timing
// (i.e. it is submitted within the proof submission window).
claim, err = k.EnsureWellFormedProof(ctx, proof)
if err != nil {
return nil, status.Error(codes.FailedPrecondition, err.Error())
}
logger.Info("checked the proof is well-formed")

// Construct the proof
proof := types.Proof{
SupplierOperatorAddress: msg.GetSupplierOperatorAddress(),
SessionHeader: session.GetHeader(),
ClosestMerkleProof: msg.GetProof(),
if err = k.deductProofSubmissionFee(ctx, msg.GetSupplierOperatorAddress()); err != nil {
logger.Error(fmt.Sprintf("failed to deduct proof submission fee: %v", err))
return nil, status.Error(codes.FailedPrecondition, err.Error())
}

// Helpers for logging the same metadata throughout this function calls
Expand All @@ -84,19 +88,6 @@ func (k msgServer) SubmitProof(
"session_end_height", proof.SessionHeader.SessionEndBlockHeight,
"supplier_operator_address", proof.SupplierOperatorAddress)

// Validate proof message commit height is within the respective session's
// proof submission window using the onchain session header.
if err = k.validateProofWindow(ctx, proof.SessionHeader, proof.SupplierOperatorAddress); err != nil {
return nil, status.Error(codes.FailedPrecondition, err.Error())
}

// Retrieve the corresponding claim for the proof submitted so it can be
// used in the proof validation below.
claim, err = k.queryAndValidateClaimForProof(ctx, proof.SessionHeader, proof.SupplierOperatorAddress)
if err != nil {
return nil, status.Error(codes.Internal, types.ErrProofClaimNotFound.Wrap(err.Error()).Error())
}

// Check if a proof is required for the claim.
proofRequirement, err := k.ProofRequirementForClaim(ctx, claim)
if err != nil {
Expand All @@ -120,7 +111,7 @@ func (k msgServer) SubmitProof(
}

// Get the service ID relayMiningDifficulty to calculate the claimed uPOKT.
serviceId := session.GetHeader().GetServiceId()
serviceId := sessionHeader.GetServiceId()
sharedParams := k.sharedKeeper.GetParams(ctx)
relayMiningDifficulty, _ := k.serviceKeeper.GetRelayMiningDifficulty(ctx, serviceId)

Expand All @@ -131,7 +122,7 @@ func (k msgServer) SubmitProof(
_, isExistingProof = k.GetProof(ctx, proof.SessionHeader.SessionId, proof.SupplierOperatorAddress)

// Upsert the proof
k.UpsertProof(ctx, proof)
k.UpsertProof(ctx, *proof)
logger.Info("successfully upserted the proof")

// Emit the appropriate event based on whether the claim was created or updated.
Expand All @@ -141,7 +132,7 @@ func (k msgServer) SubmitProof(
proofUpsertEvent = proto.Message(
&types.EventProofUpdated{
Claim: claim,
Proof: &proof,
Proof: proof,
NumRelays: numRelays,
NumClaimedComputeUnits: numClaimComputeUnits,
NumEstimatedComputeUnits: numEstimatedComputUnits,
Expand All @@ -152,14 +143,16 @@ func (k msgServer) SubmitProof(
proofUpsertEvent = proto.Message(
&types.EventProofSubmitted{
Claim: claim,
Proof: &proof,
Proof: proof,
NumRelays: numRelays,
NumClaimedComputeUnits: numClaimComputeUnits,
NumEstimatedComputeUnits: numEstimatedComputUnits,
ClaimedUpokt: &claimedUPOKT,
},
)
}

sdkCtx := cosmostypes.UnwrapSDKContext(ctx)
if err = sdkCtx.EventManager().EmitTypedEvent(proofUpsertEvent); err != nil {
return nil, status.Error(
codes.Internal,
Expand All @@ -172,7 +165,7 @@ func (k msgServer) SubmitProof(
}

return &types.MsgSubmitProofResponse{
Proof: &proof,
Proof: proof,
}, nil
}

Expand Down Expand Up @@ -322,10 +315,17 @@ func (k Keeper) getProofRequirementSeedBlockHash(

// finalizeSubmitProofTelemetry finalizes telemetry updates for SubmitProof, incrementing counters as needed.
// Meant to run deferred.
func (k msgServer) finalizeSubmitProofTelemetry(session *sessiontypes.Session, msg *types.MsgSubmitProof, isExistingProof bool, numRelays, numClaimComputeUnits uint64, err error) {
func (k msgServer) finalizeSubmitProofTelemetry(
sessionHeader *sessiontypes.SessionHeader,
msg *types.MsgSubmitProof,
isExistingProof bool,
numRelays,
numClaimComputeUnits uint64,
err error,
) {
if !isExistingProof {
serviceId := session.Header.ServiceId
applicationAddress := session.Header.ApplicationAddress
serviceId := sessionHeader.ServiceId
applicationAddress := sessionHeader.ApplicationAddress
supplierOperatorAddress := msg.GetSupplierOperatorAddress()
claimProofStage := types.ClaimProofStage_PROVEN.String()

Expand All @@ -337,7 +337,11 @@ func (k msgServer) finalizeSubmitProofTelemetry(session *sessiontypes.Session, m

// finalizeProofRequirementTelemetry finalizes telemetry updates for proof requirements.
// Meant to run deferred.
func (k Keeper) finalizeProofRequirementTelemetry(requirementReason types.ProofRequirementReason, claim *types.Claim, err error) {
func (k Keeper) finalizeProofRequirementTelemetry(
requirementReason types.ProofRequirementReason,
claim *types.Claim,
err error,
) {
telemetry.ProofRequirementCounter(
requirementReason.String(),
claim.SessionHeader.ServiceId,
Expand Down
4 changes: 2 additions & 2 deletions x/proof/keeper/msg_server_submit_proof_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ func TestMsgServer_SubmitProof_Error(t *testing.T) {
},
msgSubmitProofToExpectedErrorFn: func(msgSubmitProof *prooftypes.MsgSubmitProof) error {
return status.Error(
codes.InvalidArgument,
codes.FailedPrecondition,
prooftypes.ErrProofInvalidSessionId.Wrapf(
"session ID does not match onchain session ID; expected %q, got %q",
validSessionHeader.GetSessionId(),
Expand All @@ -652,7 +652,7 @@ func TestMsgServer_SubmitProof_Error(t *testing.T) {
},
msgSubmitProofToExpectedErrorFn: func(msgSubmitProof *prooftypes.MsgSubmitProof) error {
return status.Error(
codes.InvalidArgument,
codes.FailedPrecondition,
prooftypes.ErrProofNotFound.Wrapf(
"supplier operator address %q not found in session ID %q",
wrongSupplierOperatorAddr,
Expand Down
11 changes: 8 additions & 3 deletions x/proof/keeper/proof.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,16 @@ func (k Keeper) RemoveProof(ctx context.Context, sessionId, supplierOperatorAddr
)
}

// GetAllProofs returns all proof
func (k Keeper) GetAllProofs(ctx context.Context) (proofs []types.Proof) {
// GetAllProofsIterator returns an iterator for all proofs in the store
func (k Keeper) GetAllProofsIterator(ctx context.Context) storetypes.Iterator {
storeAdapter := runtime.KVStoreAdapter(k.storeService.OpenKVStore(ctx))
primaryStore := prefix.NewStore(storeAdapter, types.KeyPrefix(types.ProofPrimaryKeyPrefix))
iterator := storetypes.KVStorePrefixIterator(primaryStore, []byte{})
return storetypes.KVStorePrefixIterator(primaryStore, []byte{})
}

// GetAllProofs returns all proofs in the store
func (k Keeper) GetAllProofs(ctx context.Context) (proofs []types.Proof) {
iterator := k.GetAllProofsIterator(ctx)

defer iterator.Close()

Expand Down
Loading

0 comments on commit 074743d

Please sign in to comment.