Skip to content

Commit

Permalink
Merge pull request #198 from intel/runperfrace
Browse files (browse the repository at this point in the history)
address race condition in metrics event processing
  • Loading branch information
harp-intel authored Feb 6, 2025
2 parents 4188374 + 773aa4c commit c4f2157
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 47 deletions.
2 changes: 1 addition & 1 deletion cmd/metrics/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type MetricFrame struct {
}

// ProcessEvents is responsible for producing metrics from raw perf events
func ProcessEvents(perfEvents [][]byte, eventGroupDefinitions []GroupDefinition, metricDefinitions []MetricDefinition, processes []Process, previousTimestamp float64, metadata Metadata, outputDir string) (metricFrames []MetricFrame, timeStamp float64, err error) {
func ProcessEvents(perfEvents [][]byte, eventGroupDefinitions []GroupDefinition, metricDefinitions []MetricDefinition, processes []Process, previousTimestamp float64, metadata Metadata) (metricFrames []MetricFrame, timeStamp float64, err error) {
var eventFrames []EventFrame
if eventFrames, err = GetEventFrames(perfEvents, eventGroupDefinitions, flagScope, flagGranularity, metadata); err != nil { // arrange the events into groups
err = fmt.Errorf("failed to put perf events into groups: %v", err)
Expand Down
89 changes: 43 additions & 46 deletions cmd/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -1212,14 +1212,15 @@ func runPerf(myTarget target.Target, noRoot bool, processes []Process, cmd *exec
}
// must manually terminate perf in cgroup scope when a timeout is specified and/or need to refresh cgroups
startPerfTimestamp := time.Now()
var timeout int
var cgroupTimeout int
if flagScope == scopeCgroup && (flagDuration != 0 || len(flagCidList) == 0) {
if flagDuration > 0 && flagDuration < flagRefresh {
timeout = flagDuration
cgroupTimeout = flagDuration
} else {
timeout = flagRefresh
cgroupTimeout = flagRefresh
}
}
// Start a goroutine to wait for and then process perf output
// Use a timer to determine when we received an entire frame of events from perf
// The timer will expire when no lines (events) have been received from perf for more than 100ms. This
// works because perf writes the events to stderr in a burst every collection interval, e.g., 5 seconds.
Expand All @@ -1230,22 +1231,17 @@ func runPerf(myTarget target.Target, noRoot bool, processes []Process, cmd *exec
frameCount := 0
stopAnonymousFuncChannel := make(chan bool)
go func() {
stop := false
for {
select {
case <-t1.C: // waits for timer to expire
case <-stopAnonymousFuncChannel: // wait for signal to exit the goroutine
return
stop = true // exit the loop
}
if len(outputLines) != 0 {
if flagWriteEventsToFile {
if err = writeEventsToFile(outputDir+"/"+myTarget.GetName()+"_"+"events.json", outputLines); err != nil {
err = fmt.Errorf("failed to write events to raw file: %v", err)
slog.Error(err.Error())
return
}
}
if !stop && len(outputLines) != 0 {
// process the events
var metricFrames []MetricFrame
if metricFrames, frameTimestamp, err = ProcessEvents(outputLines, eventGroupDefinitions, metricDefinitions, processes, frameTimestamp, metadata, outputDir); err != nil {
if metricFrames, frameTimestamp, err = ProcessEvents(outputLines, eventGroupDefinitions, metricDefinitions, processes, frameTimestamp, metadata); err != nil {
slog.Warn(err.Error())
outputLines = [][]byte{} // empty it
continue
Expand All @@ -1254,57 +1250,58 @@ func runPerf(myTarget target.Target, noRoot bool, processes []Process, cmd *exec
frameCount += 1
metricFrames[i].FrameCount = frameCount
}
// send the metrics frames out to be printed
frameChannel <- metricFrames
outputLines = [][]byte{} // empty it
// write the events to a file
if flagWriteEventsToFile {
if err = writeEventsToFile(outputDir+"/"+myTarget.GetName()+"_"+"events.json", outputLines); err != nil {
err = fmt.Errorf("failed to write events to raw file: %v", err)
slog.Error(err.Error())
return
}
}
// empty the outputLines
outputLines = [][]byte{}
}
if timeout != 0 && int(time.Since(startPerfTimestamp).Seconds()) > timeout {
err = localCommand.Process.Signal(os.Interrupt)
if err != nil {
err = fmt.Errorf("failed to terminate perf: %v", err)
slog.Error(err.Error())
// for cgroup scope, terminate perf if timeout is reached
if flagScope == scopeCgroup {
if stop || (cgroupTimeout != 0 && int(time.Since(startPerfTimestamp).Seconds()) > cgroupTimeout) {
err = localCommand.Process.Signal(os.Interrupt)
if err != nil {
err = fmt.Errorf("failed to terminate perf: %v", err)
slog.Error(err.Error())
}
}
}
if stop {
break
}
}
// signal that the goroutine is done
stopAnonymousFuncChannel <- true
}()
// read perf output
// receive perf output
done := false
for !done {
select {
case err := <-scriptErrorChannel:
case err := <-scriptErrorChannel: // if there is an error running perf, it comes here
if err != nil {
slog.Error("error from perf", slog.String("error", err.Error()))
}
done = true
case exitCode := <-exitcodeChannel:
done = true // exit the loop
case exitCode := <-exitcodeChannel: // when perf exits, the exit code comes to this channel
slog.Debug("perf exited", slog.Int("exit code", exitCode))
done = true
case line := <-stderrChannel:
done = true // exit the loop
case line := <-stderrChannel: // perf output comes in on this channel, one line at a time
t1.Stop()
t1.Reset(100 * time.Millisecond) // 100ms is somewhat arbitrary, but seems to work
// accumulate the lines, they will be processed in the goroutine when the timer expires
outputLines = append(outputLines, []byte(line))
}
}
t1.Stop()
// send signal to exit the goroutine
defer func() { stopAnonymousFuncChannel <- true }()
// process any remaining events
if len(outputLines) != 0 {
if flagWriteEventsToFile {
if err = writeEventsToFile(outputDir+"/"+myTarget.GetName()+"_"+"events.json", outputLines); err != nil {
err = fmt.Errorf("failed to write events to raw file: %v", err)
slog.Error(err.Error())
return
}
}
var metricFrames []MetricFrame
if metricFrames, frameTimestamp, err = ProcessEvents(outputLines, eventGroupDefinitions, metricDefinitions, processes, frameTimestamp, metadata, outputDir); err != nil {
slog.Error(err.Error())
return
}
for i := range metricFrames {
frameCount += 1
metricFrames[i].FrameCount = frameCount
}
frameChannel <- metricFrames
}
stopAnonymousFuncChannel <- true
// wait for the goroutine to exit
<-stopAnonymousFuncChannel
}

0 comments on commit c4f2157

Please sign in to comment.