Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: measure bitswap ttfb from after we get candidates back #432

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 32 additions & 16 deletions pkg/retriever/bitswapretriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,21 +166,6 @@ func (br *bitswapRetrieval) RetrieveFromAsyncCandidates(ayncCandidates types.Inb
})
}

totalWritten := atomic.Uint64{}
blockCount := atomic.Uint64{}
bytesWrittenCb := func(bytesWritten uint64) {
// record first byte received
if totalWritten.Load() == 0 {
br.events(events.FirstByte(br.clock.Now(), br.request.RetrievalID, bitswapCandidate, br.clock.Since(startTime), multicodec.TransportBitswap))
}
totalWritten.Add(bytesWritten)
blockCount.Add(1)
// reset the timer
if bytesWritten > 0 && lastBytesReceivedTimer != nil {
lastBytesReceivedTimer.Reset(br.cfg.BlockTimeout)
}
}

// setup providers for this retrieval
hasCandidates, nextCandidates, err := ayncCandidates.Next(retrievalCtx)
if !hasCandidates || err != nil {
Expand All @@ -189,7 +174,8 @@ func (br *bitswapRetrieval) RetrieveFromAsyncCandidates(ayncCandidates types.Inb
return nil, nil
}

br.events(events.StartedRetrieval(br.clock.Now(), br.request.RetrievalID, bitswapCandidate, multicodec.TransportBitswap))
firstCandidatesTime := br.clock.Now()
br.events(events.StartedRetrieval(firstCandidatesTime, br.request.RetrievalID, bitswapCandidate, multicodec.TransportBitswap))

// set initial providers, then start a goroutine to add more as they come in
br.routing.AddProviders(br.request.RetrievalID, nextCandidates)
Expand All @@ -209,6 +195,35 @@ func (br *bitswapRetrieval) RetrieveFromAsyncCandidates(ayncCandidates types.Inb
}
}()

totalWritten := atomic.Uint64{}
blockCount := atomic.Uint64{}
ttfb := atomic.Int64{}
bytesWrittenCb := func(bytesWritten uint64) {
// Record first byte received, this is on a per-protocol basis for Bitswap,
// individual providers (currently) don't get one of these so we take the
// duration from the time we started collecting candidates.
// If we end up taking responsibility for dialing peers, and end up with
// first-byte events per peer, we could move the first-byte duration up to
// the post-connect time to match http and graphsync (alternatively)
if totalWritten.Load() == 0 {
ttfbD := br.clock.Since(firstCandidatesTime)
ttfb.Store(int64(ttfbD))
br.events(events.FirstByte(
br.clock.Now(),
br.request.RetrievalID,
bitswapCandidate,
ttfbD,
multicodec.TransportBitswap,
))
}
totalWritten.Add(bytesWritten)
blockCount.Add(1)
// reset the timer
if bytesWritten > 0 && lastBytesReceivedTimer != nil {
lastBytesReceivedTimer.Reset(br.cfg.BlockTimeout)
}
}

// set up the storage system, including the preloader if configured
var preloader preload.Loader
traversalLinkSys := br.request.LinkSystem
Expand Down Expand Up @@ -300,6 +315,7 @@ func (br *bitswapRetrieval) RetrieveFromAsyncCandidates(ayncCandidates types.Inb
TotalPayment: big.Zero(),
NumPayments: 0,
AskPrice: big.Zero(),
TimeToFirstByte: time.Duration(ttfb.Load()),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I should preserve this change even if this PR is closed, it's the only retriever to not return this value and there's no good reason not to

}, nil
}

Expand Down
90 changes: 48 additions & 42 deletions pkg/retriever/bitswapretriever_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,22 +95,24 @@ func TestBitswapRetriever(t *testing.T) {
},
expectedStats: map[cid.Cid]*types.RetrievalStats{
cid1: {
RootCid: cid1,
Size: sizeOf(tbc1.AllBlocks()),
Blocks: 100,
Duration: remoteBlockDuration * 100,
AverageSpeed: uint64(float64(sizeOf(tbc1.AllBlocks())) / (remoteBlockDuration * 100).Seconds()),
TotalPayment: big.Zero(),
AskPrice: big.Zero(),
RootCid: cid1,
Size: sizeOf(tbc1.AllBlocks()),
Blocks: 100,
Duration: remoteBlockDuration * 100,
AverageSpeed: uint64(float64(sizeOf(tbc1.AllBlocks())) / (remoteBlockDuration * 100).Seconds()),
TotalPayment: big.Zero(),
AskPrice: big.Zero(),
TimeToFirstByte: remoteBlockDuration,
},
cid2: {
RootCid: cid2,
Size: sizeOf(tbc2.AllBlocks()),
Blocks: 100,
Duration: remoteBlockDuration * 100,
AverageSpeed: uint64(float64(sizeOf(tbc2.AllBlocks())) / (remoteBlockDuration * 100).Seconds()),
TotalPayment: big.Zero(),
AskPrice: big.Zero(),
RootCid: cid2,
Size: sizeOf(tbc2.AllBlocks()),
Blocks: 100,
Duration: remoteBlockDuration * 100,
AverageSpeed: uint64(float64(sizeOf(tbc2.AllBlocks())) / (remoteBlockDuration * 100).Seconds()),
TotalPayment: big.Zero(),
AskPrice: big.Zero(),
TimeToFirstByte: remoteBlockDuration,
},
},
},
Expand Down Expand Up @@ -141,22 +143,24 @@ func TestBitswapRetriever(t *testing.T) {
},
expectedStats: map[cid.Cid]*types.RetrievalStats{
cid1: {
RootCid: cid1,
Size: sizeOf(tbc1.Blocks(50, 100)),
Blocks: 50,
Duration: remoteBlockDuration * 50,
AverageSpeed: uint64(float64(sizeOf(tbc1.Blocks(50, 100))) / (remoteBlockDuration * 50).Seconds()),
TotalPayment: big.Zero(),
AskPrice: big.Zero(),
RootCid: cid1,
Size: sizeOf(tbc1.Blocks(50, 100)),
Blocks: 50,
Duration: remoteBlockDuration * 50,
AverageSpeed: uint64(float64(sizeOf(tbc1.Blocks(50, 100))) / (remoteBlockDuration * 50).Seconds()),
TotalPayment: big.Zero(),
AskPrice: big.Zero(),
TimeToFirstByte: remoteBlockDuration,
},
cid2: {
RootCid: cid2,
Size: sizeOf(append(tbc2.Blocks(25, 45), tbc2.Blocks(75, 100)...)),
Blocks: 45,
Duration: remoteBlockDuration * 45,
AverageSpeed: uint64(float64(sizeOf(append(tbc2.Blocks(25, 45), tbc2.Blocks(75, 100)...))) / (remoteBlockDuration * 45).Seconds()),
TotalPayment: big.Zero(),
AskPrice: big.Zero(),
RootCid: cid2,
Size: sizeOf(append(tbc2.Blocks(25, 45), tbc2.Blocks(75, 100)...)),
Blocks: 45,
Duration: remoteBlockDuration * 45,
AverageSpeed: uint64(float64(sizeOf(append(tbc2.Blocks(25, 45), tbc2.Blocks(75, 100)...))) / (remoteBlockDuration * 45).Seconds()),
TotalPayment: big.Zero(),
AskPrice: big.Zero(),
TimeToFirstByte: remoteBlockDuration,
},
},
},
Expand Down Expand Up @@ -185,22 +189,24 @@ func TestBitswapRetriever(t *testing.T) {
},
expectedStats: map[cid.Cid]*types.RetrievalStats{
cid1: {
RootCid: cid1,
Size: sizeOf(tbc1.AllBlocks()[:5]),
Blocks: 5,
Duration: remoteBlockDuration * 5,
AverageSpeed: uint64(float64(sizeOf(tbc1.AllBlocks()[:5])) / (remoteBlockDuration * 5).Seconds()),
TotalPayment: big.Zero(),
AskPrice: big.Zero(),
RootCid: cid1,
Size: sizeOf(tbc1.AllBlocks()[:5]),
Blocks: 5,
Duration: remoteBlockDuration * 5,
AverageSpeed: uint64(float64(sizeOf(tbc1.AllBlocks()[:5])) / (remoteBlockDuration * 5).Seconds()),
TotalPayment: big.Zero(),
AskPrice: big.Zero(),
TimeToFirstByte: remoteBlockDuration,
},
cid2: {
RootCid: cid2,
Size: sizeOf(tbc2.AllBlocks()[:5]),
Blocks: 5,
Duration: remoteBlockDuration * 5,
AverageSpeed: uint64(float64(sizeOf(tbc2.AllBlocks()[:5])) / (remoteBlockDuration * 5).Seconds()),
TotalPayment: big.Zero(),
AskPrice: big.Zero(),
RootCid: cid2,
Size: sizeOf(tbc2.AllBlocks()[:5]),
Blocks: 5,
Duration: remoteBlockDuration * 5,
AverageSpeed: uint64(float64(sizeOf(tbc2.AllBlocks()[:5])) / (remoteBlockDuration * 5).Seconds()),
TotalPayment: big.Zero(),
AskPrice: big.Zero(),
TimeToFirstByte: remoteBlockDuration,
},
},
},
Expand Down