Skip to content

Commit

Permalink
fix: fix file analysis results by keeping track of file position for …
Browse files Browse the repository at this point in the history
…each analysed chunk
  • Loading branch information
tphakala committed Dec 28, 2024
1 parent 19f2d98 commit cccee8b
Showing 1 changed file with 30 additions and 10 deletions.
40 changes: 30 additions & 10 deletions internal/analysis/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ func FileAnalysis(settings *conf.Settings) error {

notes, err := processAudioFile(settings, &audioInfo)
if err != nil {
// If we have partial results, write them before returning the error
if notes != nil && len(notes) > 0 {

Check failure on line 37 in internal/analysis/file.go

View workflow job for this annotation

GitHub Actions / lint

S1009: should omit nil check; len() for []github.com/tphakala/birdnet-go/internal/datastore.Note is defined as zero (gosimple)

Check failure on line 37 in internal/analysis/file.go

View workflow job for this annotation

GitHub Actions / golangci / lint

S1009: should omit nil check; len() for []github.com/tphakala/birdnet-go/internal/datastore.Note is defined as zero (gosimple)

Check failure on line 37 in internal/analysis/file.go

View workflow job for this annotation

GitHub Actions / golangci / lint

S1009: should omit nil check; len() for []github.com/tphakala/birdnet-go/internal/datastore.Note is defined as zero (gosimple)
fmt.Printf("\n\033[33m⚠️ Writing partial results before exiting due to error\033[0m\n")
if writeErr := writeResults(settings, notes); writeErr != nil {
// Combine both errors in the return message
return fmt.Errorf("analysis error: %v; failed to write partial results: %v", err, writeErr)

Check failure on line 41 in internal/analysis/file.go

View workflow job for this annotation

GitHub Actions / lint

non-wrapping format verb for fmt.Errorf. Use `%w` to format errors (errorlint)

Check failure on line 41 in internal/analysis/file.go

View workflow job for this annotation

GitHub Actions / golangci / lint

non-wrapping format verb for fmt.Errorf. Use `%w` to format errors (errorlint)

Check failure on line 41 in internal/analysis/file.go

View workflow job for this annotation

GitHub Actions / golangci / lint

non-wrapping format verb for fmt.Errorf. Use `%w` to format errors (errorlint)
}
}
return err
}

Expand Down Expand Up @@ -86,6 +94,12 @@ func processAudioFile(settings *conf.Settings, audioInfo *myaudio.AudioInfo) ([]
settings.BirdNET.Overlap,
)

// audioChunk pairs one chunk of audio samples with its position in the file,
// so that each worker passes the chunk's own position to ProcessChunk instead
// of reading a variable shared across goroutines.
type audioChunk struct {
// Data holds the samples handed to the ReadAudioFileBuffered callback.
Data []float32
// FilePosition is the chunk's start position, counted from the zero
// time.Time and advanced by (3 seconds - overlap) for each chunk sent.
FilePosition time.Time
}

// Calculate audio duration
duration := time.Duration(float64(audioInfo.TotalSamples) / float64(audioInfo.SampleRate) * float64(time.Second))

Expand All @@ -109,13 +123,12 @@ func processAudioFile(settings *conf.Settings, audioInfo *myaudio.AudioInfo) ([]
}

// Create buffered channels for processing
chunkChan := make(chan []float32, 4)
chunkChan := make(chan audioChunk, 4)
resultChan := make(chan []datastore.Note, 4)
errorChan := make(chan error, 1)
doneChan := make(chan struct{})

var allNotes []datastore.Note
predStart := time.Time{}

// Start worker goroutines for BirdNET analysis
for i := 0; i < numWorkers; i++ {
Expand All @@ -124,7 +137,7 @@ func processAudioFile(settings *conf.Settings, audioInfo *myaudio.AudioInfo) ([]
fmt.Printf("DEBUG: Worker %d started\n", workerID)
}
for chunk := range chunkChan {
notes, err := bn.ProcessChunk(chunk, predStart)
notes, err := bn.ProcessChunk(chunk.Data, chunk.FilePosition)
if err != nil {
if settings.Debug {
fmt.Printf("DEBUG: Worker %d encountered error: %v\n", workerID, err)
Expand Down Expand Up @@ -222,12 +235,11 @@ func processAudioFile(settings *conf.Settings, audioInfo *myaudio.AudioInfo) ([]
processingError = err
close(doneChan)
return
// Add timeout to prevent hanging
case <-time.After(5 * time.Second):
if settings.Debug {
fmt.Printf("DEBUG: Timeout waiting for chunk %d results\n", i)
}
processingError = fmt.Errorf("timeout waiting for analysis results")
processingError = fmt.Errorf("timeout waiting for analysis results (processed %d/%d chunks)", chunkCount, totalChunks)
close(doneChan)
return
}
Expand All @@ -238,12 +250,20 @@ func processAudioFile(settings *conf.Settings, audioInfo *myaudio.AudioInfo) ([]
close(doneChan)
}()

// Read and send audio chunks with timeout
err := myaudio.ReadAudioFileBuffered(settings, func(chunk []float32) error {
// Initialize filePosition before the loop
filePosition := time.Time{}

// Read and send audio chunks with timing information
err := myaudio.ReadAudioFileBuffered(settings, func(chunkData []float32) error {
chunk := audioChunk{
Data: chunkData,
FilePosition: filePosition,
}

select {
case chunkChan <- chunk:
// advance predStart by 3 seconds - overlap
predStart = predStart.Add(time.Duration((3.0 - bn.Settings.BirdNET.Overlap) * float64(time.Second)))
// Advance filePosition for the next chunk by the chunk step (3 seconds minus overlap)
filePosition = filePosition.Add(time.Duration((3.0 - bn.Settings.BirdNET.Overlap) * float64(time.Second)))
return nil
case <-doneChan:
return processingError
Expand Down Expand Up @@ -273,7 +293,7 @@ func processAudioFile(settings *conf.Settings, audioInfo *myaudio.AudioInfo) ([]
if settings.Debug {
fmt.Printf("DEBUG: Processing error encountered: %v\n", processingError)
}
return nil, processingError
return allNotes, processingError
}

if settings.Debug {
Expand Down

0 comments on commit cccee8b

Please sign in to comment.