Skip to content

Commit

Permalink
refactor(dashboard): refactor back-pressure calculation (#20001)
Browse files Browse the repository at this point in the history
  • Loading branch information
fuyufjh authored Jan 2, 2025
1 parent 104c4bf commit c28ea2d
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 379 deletions.
46 changes: 0 additions & 46 deletions dashboard/components/RateBar.tsx

This file was deleted.

174 changes: 0 additions & 174 deletions dashboard/lib/api/metric.ts

This file was deleted.

118 changes: 57 additions & 61 deletions dashboard/pages/fragment_graph.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,8 @@ import FragmentDependencyGraph from "../components/FragmentDependencyGraph"
import FragmentGraph from "../components/FragmentGraph"
import Title from "../components/Title"
import useErrorToast from "../hook/useErrorToast"
import api from "../lib/api/api"
import useFetch from "../lib/api/fetch"
import {
calculateBPRate,
calculateCumulativeBp,
fetchEmbeddedBackPressure,
} from "../lib/api/metric"
import {
getFragmentsByJobId,
getRelationIdInfos,
Expand Down Expand Up @@ -190,13 +186,43 @@ function buildFragmentDependencyAsEdges(

const SIDEBAR_WIDTH = 225

// The state of the embedded back pressure metrics.
// The metrics from previous fetch are stored here to calculate the rate.
interface EmbeddedBackPressureInfo {
previous: BackPressureInfo[]
current: BackPressureInfo[]
totalBackpressureNs: BackPressureInfo[]
totalDurationNs: number
export class BackPressureSnapshot {
// The first fetch result.
// key: `<fragmentId>_<downstreamFragmentId>`
// value: output blocking duration in nanoseconds.
result: Map<string, number>

// The time of the current fetch in milliseconds. (`Date.now()`)
time: number

constructor(result: Map<string, number>, time: number) {
this.result = result
this.time = time
}

static fromResponse(channelStats: {
[key: string]: BackPressureInfo
}): BackPressureSnapshot {
const result = new Map<string, number>()
for (const [key, info] of Object.entries(channelStats)) {
result.set(key, info.value / info.actorCount)
}
return new BackPressureSnapshot(result, Date.now())
}

getRate(initial: BackPressureSnapshot): Map<string, number> {
const result = new Map<string, number>()
for (const [key, value] of this.result) {
const initialValue = initial.result.get(key)
if (initialValue) {
result.set(
key,
(value - initialValue) / (this.time - initial.time) / 1000000
)
}
}
return result
}
}

export default function Streaming() {
Expand Down Expand Up @@ -308,40 +334,30 @@ export default function Streaming() {
toast(new Error(`Actor ${searchActorIdInt} not found`))
}

// Periodically fetch embedded back-pressure from Meta node
// Didn't call `useFetch()` because the `setState` way is special.
const [embeddedBackPressureInfo, setEmbeddedBackPressureInfo] =
useState<EmbeddedBackPressureInfo>()
// Keep the initial snapshot to calculate the rate of back pressure
const [backPressureRate, setBackPressureRate] =
useState<Map<string, number>>()

const [fragmentStats, setFragmentStats] = useState<{
[key: number]: FragmentStats
}>()

useEffect(() => {
// The initial snapshot is used to calculate the rate of back pressure
// It's not used to render the page directly, so we don't need to set it in the state
let initialSnapshot: BackPressureSnapshot | undefined

function refresh() {
fetchEmbeddedBackPressure().then(
api.get("/metrics/fragment/embedded_back_pressures").then(
(response) => {
let newBP =
response.backPressureInfos?.map(BackPressureInfo.fromJSON) ?? []
setEmbeddedBackPressureInfo((prev) =>
prev
? {
previous: prev.current,
current: newBP,
totalBackpressureNs: calculateCumulativeBp(
prev.totalBackpressureNs,
prev.current,
newBP
),
totalDurationNs:
prev.totalDurationNs + INTERVAL_MS * 1000 * 1000,
}
: {
previous: newBP, // Use current value to show zero rate, but it's fine
current: newBP,
totalBackpressureNs: [],
totalDurationNs: 0,
}
let snapshot = BackPressureSnapshot.fromResponse(
response.channelStats
)
if (!initialSnapshot) {
initialSnapshot = snapshot
} else {
setBackPressureRate(snapshot.getRate(initialSnapshot!))
}
setFragmentStats(response.fragmentStats)
},
(e) => {
Expand All @@ -350,33 +366,13 @@ export default function Streaming() {
}
)
}
refresh()
const interval = setInterval(refresh, INTERVAL_MS)
refresh() // run once immediately
const interval = setInterval(refresh, INTERVAL_MS) // and then run every interval
return () => {
clearInterval(interval)
}
}, [toast])

const backPressures = useMemo(() => {
if (embeddedBackPressureInfo) {
let map = new Map()

if (embeddedBackPressureInfo) {
const metrics = calculateBPRate(
embeddedBackPressureInfo.totalBackpressureNs,
embeddedBackPressureInfo.totalDurationNs
)
for (const m of metrics.outputBufferBlockingDuration) {
map.set(
`${m.metric.fragmentId}_${m.metric.downstreamFragmentId}`,
m.sample[0].value
)
}
}
return map
}
}, [embeddedBackPressureInfo])

const retVal = (
<Flex p={3} height="calc(100vh - 20px)" flexDirection="column">
<Title>Fragment Graph</Title>
Expand Down Expand Up @@ -503,7 +499,7 @@ export default function Streaming() {
selectedFragmentId={selectedFragmentId?.toString()}
fragmentDependency={fragmentDependency}
planNodeDependencies={planNodeDependencies}
backPressures={backPressures}
backPressures={backPressureRate}
fragmentStats={fragmentStats}
/>
)}
Expand Down
Loading

0 comments on commit c28ea2d

Please sign in to comment.