Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add batching support for virtual chain and UTXO by addresses RPC calls #2155

Open
wants to merge 3 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion app/appmessage/rpc_get_utxos_by_addresses.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ package appmessage
// its respective RPC message
type GetUTXOsByAddressesRequestMessage struct {
baseMessage
Addresses []string
Addresses []string
BatchDaaScoreStart uint64
BatchSize uint64
}

// Command returns the protocol command string for the message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ type GetVirtualSelectedParentChainFromBlockRequestMessage struct {
baseMessage
StartHash string
IncludeAcceptedTransactionIDs bool
BatchSize uint64
}

// Command returns the protocol command string for the message
Expand All @@ -15,11 +16,12 @@ func (msg *GetVirtualSelectedParentChainFromBlockRequestMessage) Command() Messa

// NewGetVirtualSelectedParentChainFromBlockRequestMessage returns a instance of the message
func NewGetVirtualSelectedParentChainFromBlockRequestMessage(
startHash string, includeAcceptedTransactionIDs bool) *GetVirtualSelectedParentChainFromBlockRequestMessage {
startHash string, includeAcceptedTransactionIDs bool, batchSize uint64) *GetVirtualSelectedParentChainFromBlockRequestMessage {

return &GetVirtualSelectedParentChainFromBlockRequestMessage{
StartHash: startHash,
IncludeAcceptedTransactionIDs: includeAcceptedTransactionIDs,
BatchSize: batchSize,
}
}

Expand Down
33 changes: 33 additions & 0 deletions app/rpc/rpccontext/utxos_by_addresses.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package rpccontext

import (
"encoding/hex"
"github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
"github.com/kaspanet/kaspad/domain/consensus/utils/txscript"
"github.com/kaspanet/kaspad/util"
"github.com/pkg/errors"
Expand All @@ -10,6 +11,38 @@ import (
"github.com/kaspanet/kaspad/domain/utxoindex"
)

// OutpointDAAScoreEntry is used for finding a batch of UTXO entries with consecutive DAA score
type OutpointDAAScoreEntry struct {
DAAScore uint64
Outpoint externalapi.DomainOutpoint
}

// ConvertUTXOOutpointEntryBatchToUTXOsByAddressesEntries converts
// a batch of UTXOOutpointEntryPairs to a slice of UTXOsByAddressesEntry
func ConvertUTXOOutpointEntryBatchToUTXOsByAddressesEntries(
address string, pairs utxoindex.UTXOOutpointEntryPairs,
batch []OutpointDAAScoreEntry) []*appmessage.UTXOsByAddressesEntry {
utxosByAddressesEntries := make([]*appmessage.UTXOsByAddressesEntry, 0, len(batch))
for _, scoreEntry := range batch {
outpoint := scoreEntry.Outpoint
utxoEntry := pairs[outpoint]
utxosByAddressesEntries = append(utxosByAddressesEntries, &appmessage.UTXOsByAddressesEntry{
Address: address,
Outpoint: &appmessage.RPCOutpoint{
TransactionID: outpoint.TransactionID.String(),
Index: outpoint.Index,
},
UTXOEntry: &appmessage.RPCUTXOEntry{
Amount: utxoEntry.Amount(),
ScriptPublicKey: &appmessage.RPCScriptPublicKey{Script: hex.EncodeToString(utxoEntry.ScriptPublicKey().Script), Version: utxoEntry.ScriptPublicKey().Version},
BlockDAAScore: utxoEntry.BlockDAAScore(),
IsCoinbase: utxoEntry.IsCoinbase(),
},
})
}
return utxosByAddressesEntries
}

// ConvertUTXOOutpointEntryPairsToUTXOsByAddressesEntries converts
// UTXOOutpointEntryPairs to a slice of UTXOsByAddressesEntry
func ConvertUTXOOutpointEntryPairsToUTXOsByAddressesEntries(address string, pairs utxoindex.UTXOOutpointEntryPairs) []*appmessage.UTXOsByAddressesEntry {
Expand Down
47 changes: 45 additions & 2 deletions app/rpc/rpchandlers/get_utxos_by_addresses.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"github.com/kaspanet/kaspad/app/appmessage"
"github.com/kaspanet/kaspad/app/rpc/rpccontext"
"github.com/kaspanet/kaspad/domain/consensus/utils/txscript"
"github.com/kaspanet/kaspad/domain/utxoindex"
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/router"
"github.com/kaspanet/kaspad/util"
"sort"
)

// HandleGetUTXOsByAddresses handles the respectively named RPC command
Expand Down Expand Up @@ -36,10 +38,51 @@ func HandleGetUTXOsByAddresses(context *rpccontext.Context, _ *router.Router, re
if err != nil {
return nil, err
}
entries := rpccontext.ConvertUTXOOutpointEntryPairsToUTXOsByAddressesEntries(addressString, utxoOutpointEntryPairs)
allEntries = append(allEntries, entries...)

if getUTXOsByAddressesRequest.BatchDaaScoreStart > 0 ||
(getUTXOsByAddressesRequest.BatchSize > 0 && uint64(len(utxoOutpointEntryPairs)) > getUTXOsByAddressesRequest.BatchSize) {
// Find a batch of entries with consecutive DAA score
entriesOrderedBatch := extractOrderedEntriesBatch(utxoOutpointEntryPairs, getUTXOsByAddressesRequest.BatchDaaScoreStart, getUTXOsByAddressesRequest.BatchSize)
// Extract the batch from the full pairs map
entries := rpccontext.ConvertUTXOOutpointEntryBatchToUTXOsByAddressesEntries(addressString, utxoOutpointEntryPairs, entriesOrderedBatch)
allEntries = append(allEntries, entries...)
} else {
entries := rpccontext.ConvertUTXOOutpointEntryPairsToUTXOsByAddressesEntries(addressString, utxoOutpointEntryPairs)
allEntries = append(allEntries, entries...)
}
}

response := appmessage.NewGetUTXOsByAddressesResponseMessage(allEntries)
return response, nil
}

func extractOrderedEntriesBatch(utxoOutpointEntryPairs utxoindex.UTXOOutpointEntryPairs, batchDaaScoreStart, batchSize uint64) []rpccontext.OutpointDAAScoreEntry {
entriesSlice := make([]rpccontext.OutpointDAAScoreEntry, 0, len(utxoOutpointEntryPairs))
// Extract to slice
for outpoint, utxoEntry := range utxoOutpointEntryPairs {
entriesSlice = append(entriesSlice, rpccontext.OutpointDAAScoreEntry{DAAScore: utxoEntry.BlockDAAScore(), Outpoint: outpoint})
}
// Sort by DAA score
sort.Slice(entriesSlice, func(i, j int) bool {
if entriesSlice[i].DAAScore == entriesSlice[j].DAAScore {
if entriesSlice[i].Outpoint.TransactionID.Equal(&entriesSlice[j].Outpoint.TransactionID) {
return entriesSlice[i].Outpoint.Index < entriesSlice[j].Outpoint.Index
}
return entriesSlice[i].Outpoint.TransactionID.Less(&entriesSlice[j].Outpoint.TransactionID)
}
return entriesSlice[i].DAAScore < entriesSlice[j].DAAScore
})
// Find batch start and end points
startIndex := len(entriesSlice)
endIndex := uint64(len(entriesSlice))
for i := 0; i < len(entriesSlice); i++ {
if entriesSlice[i].DAAScore >= batchDaaScoreStart {
startIndex = i
break
}
}
if uint64(startIndex)+batchSize < endIndex {
endIndex = uint64(startIndex) + batchSize
}
return entriesSlice[startIndex:endIndex]
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (

// HandleGetVirtualSelectedParentChainFromBlock handles the respectively named RPC command
func HandleGetVirtualSelectedParentChainFromBlock(context *rpccontext.Context, _ *router.Router, request appmessage.Message) (appmessage.Message, error) {
getVirtualSelectedParentChainFromBlockRequest := request.(*appmessage.GetVirtualSelectedParentChainFromBlockRequestMessage)
chainRequest := request.(*appmessage.GetVirtualSelectedParentChainFromBlockRequestMessage)

startHash, err := externalapi.NewDomainHashFromString(getVirtualSelectedParentChainFromBlockRequest.StartHash)
startHash, err := externalapi.NewDomainHashFromString(chainRequest.StartHash)
if err != nil {
errorMessage := &appmessage.GetVirtualSelectedParentChainFromBlockResponseMessage{}
errorMessage.Error = appmessage.RPCErrorf("Could not parse startHash: %s", err)
Expand All @@ -22,14 +22,22 @@ func HandleGetVirtualSelectedParentChainFromBlock(context *rpccontext.Context, _
if err != nil {
response := &appmessage.GetVirtualSelectedParentChainFromBlockResponseMessage{}
response.Error = appmessage.RPCErrorf("Could not build virtual "+
"selected parent chain from %s: %s", getVirtualSelectedParentChainFromBlockRequest.StartHash, err)
"selected parent chain from %s: %s", chainRequest.StartHash, err)
return response, nil
}

if chainRequest.BatchSize > 0 && uint64(len(virtualSelectedParentChain.Added)) > chainRequest.BatchSize {
// Send at most `BatchSize` added chain blocks
virtualSelectedParentChain.Added = virtualSelectedParentChain.Added[:chainRequest.BatchSize]
}

chainChangedNotification, err := context.ConvertVirtualSelectedParentChainChangesToChainChangedNotificationMessage(
virtualSelectedParentChain, getVirtualSelectedParentChainFromBlockRequest.IncludeAcceptedTransactionIDs)
virtualSelectedParentChain, chainRequest.IncludeAcceptedTransactionIDs)
if err != nil {
return nil, err
response := &appmessage.GetVirtualSelectedParentChainFromBlockResponseMessage{}
response.Error = appmessage.RPCErrorf("Could not load acceptance data for virtual "+
"selected parent chain from %s: %s", chainRequest.StartHash, err)
return response, nil
}

response := appmessage.NewGetVirtualSelectedParentChainFromBlockResponseMessage(
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -918,13 +918,14 @@ Currently unimplemented

### GetVirtualSelectedParentChainFromBlockRequestMessage
GetVirtualSelectedParentChainFromBlockRequestMessage requests the virtual selected
parent chain from some startHash to this kaspad&#39;s current virtual
parent chain from some startHash to this kaspad&#39;s current virtual.


| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| startHash | [string](#string) | | |
| includeAcceptedTransactionIds | [bool](#bool) | | |
| batchSize | [uint64](#uint64) | | If batchSize is specified and &gt; 0, then the response will contain at most batchSize added blocks. Note: batchSize does not restrict the number of removed blocks. |



Expand Down Expand Up @@ -1315,14 +1316,19 @@ See: UtxosChangedNotificationMessage
<a name="protowire.GetUtxosByAddressesRequestMessage"></a>

### GetUtxosByAddressesRequestMessage
GetUtxosByAddressesRequestMessage requests all current UTXOs for the given kaspad addresses
GetUtxosByAddressesRequestMessage requests all current UTXOs for the given kaspad addresses.
Both batchDaaScoreStart, batchSize parameters allow querying the entries in a batched process
by using the DAA score of the last entry as a starting point for the next batch (note that some
duplications are possible on the bordering DAA score).

This call is only available when this kaspad was started with `--utxoindex`


| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| addresses | [string](#string) | repeated | |
| batchDaaScoreStart | [uint64](#uint64) | | If batchDaaScoreStart is specified it sends only entries starting from that DAA score (inclusive). |
| batchSize | [uint64](#uint64) | | If batchSize is specified and &gt; 0, the response will contain at most batchSize entries (by DAA score order). |



Expand Down
Loading