Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

File analysis worker pool #347

Merged
merged 5 commits into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 181 additions & 31 deletions internal/analysis/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,55 +89,205 @@ func processAudioFile(settings *conf.Settings, audioInfo *myaudio.AudioInfo) ([]
// Calculate audio duration
duration := time.Duration(float64(audioInfo.TotalSamples) / float64(audioInfo.SampleRate) * float64(time.Second))

var allNotes []datastore.Note
// Get filename and truncate if necessary
filename := truncateFilename(settings.Input.Path)

startTime := time.Now()
chunkCount := 0
chunkCount := 1
lastChunkCount := 0
lastProgressUpdate := startTime

// Get filename and truncate if necessary (showing max 30 chars)
filename := truncateFilename(settings.Input.Path)
// Moving average window for chunks/sec calculation
const windowSize = 10 // Number of samples to average
chunkRates := make([]float64, 0, windowSize)

// Set number of workers to 1
numWorkers := 1
tphakala marked this conversation as resolved.
Show resolved Hide resolved

// Set predStart to 0 time
if settings.Debug {
fmt.Printf("DEBUG: Starting analysis with %d total chunks and %d workers\n", totalChunks, numWorkers)
}

// Create buffered channels for processing
chunkChan := make(chan []float32, 4)
resultChan := make(chan []datastore.Note, 4)
errorChan := make(chan error, 1)
doneChan := make(chan struct{})

var allNotes []datastore.Note
predStart := time.Time{}

// Process audio chunks as they're read
err := myaudio.ReadAudioFileBuffered(settings, func(chunk []float32) error {
chunkCount++
fmt.Printf("\r\033[K\033[37m📄 %s [%s]\033[0m | \033[33m🔍 Analyzing chunk %d/%d\033[0m %s",
filename,
duration.Round(time.Second),
chunkCount,
totalChunks,
birdnet.EstimateTimeRemaining(startTime, chunkCount, totalChunks))

notes, err := bn.ProcessChunk(chunk, predStart)
if err != nil {
return err
}
// Start worker goroutines for BirdNET analysis
for i := 0; i < numWorkers; i++ {
go func(workerID int) {
if settings.Debug {
fmt.Printf("DEBUG: Worker %d started\n", workerID)
}
for chunk := range chunkChan {
notes, err := bn.ProcessChunk(chunk, predStart)
if err != nil {
if settings.Debug {
fmt.Printf("DEBUG: Worker %d encountered error: %v\n", workerID, err)
}
errorChan <- err
return
}
Comment on lines +127 to +134
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Potential deadlock in error handling

The error handling in worker goroutines could lead to a deadlock. If an error occurs, the worker exits immediately without processing remaining chunks in chunkChan.

Consider this approach:

 for chunk := range chunkChan {
     notes, err := bn.ProcessChunk(chunk, predStart)
     if err != nil {
         if settings.Debug {
             fmt.Printf("DEBUG: Worker %d encountered error: %v\n", workerID, err)
         }
         errorChan <- err
-        return
+        continue
     }

Committable suggestion skipped: line range outside the PR's diff.


// Filter notes based on included species list
var filteredNotes []datastore.Note
for _, note := range notes {
if settings.IsSpeciesIncluded(note.ScientificName) {
filteredNotes = append(filteredNotes, note)
}
}

// Filter notes based on included species list
var filteredNotes []datastore.Note
for _, note := range notes {
if settings.IsSpeciesIncluded(note.ScientificName) {
filteredNotes = append(filteredNotes, note)
if settings.Debug {
fmt.Printf("DEBUG: Worker %d sending results\n", workerID)
}
resultChan <- filteredNotes
}
if settings.Debug {
fmt.Printf("DEBUG: Worker %d finished\n", workerID)
}
}(i)
}

// Start progress monitoring goroutine
go func() {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

for range ticker.C {
select {
case <-doneChan:
return
default:
currentTime := time.Now()
timeSinceLastUpdate := currentTime.Sub(lastProgressUpdate)

// Calculate current chunk rate
chunksProcessed := chunkCount - lastChunkCount
currentRate := float64(chunksProcessed) / timeSinceLastUpdate.Seconds()

// Update moving average
if len(chunkRates) >= windowSize {
// Remove oldest value
chunkRates = chunkRates[1:]
}
chunkRates = append(chunkRates, currentRate)

// Calculate average rate
var avgRate float64
if len(chunkRates) > 0 {
sum := 0.0
for _, rate := range chunkRates {
sum += rate
}
avgRate = sum / float64(len(chunkRates))
}

// Update counters for next iteration
lastChunkCount = chunkCount
lastProgressUpdate = currentTime

fmt.Printf("\r\033[K\033[37m📄 %s [%s]\033[0m | \033[33m🔍 Analyzing chunk %d/%d\033[0m | \033[36m%.1f chunks/sec\033[0m %s",
filename,
duration.Round(time.Second),
chunkCount,
totalChunks,
avgRate,
birdnet.EstimateTimeRemaining(startTime, chunkCount, totalChunks))
}
}
}()

allNotes = append(allNotes, filteredNotes...)
// Start result collector goroutine
var processingError error
go func() {
if settings.Debug {
fmt.Println("DEBUG: Result collector started")
}
for i := 1; i <= totalChunks; i++ {
select {
case notes := <-resultChan:
if settings.Debug {
fmt.Printf("DEBUG: Received results for chunk #%d\n", chunkCount)
}
allNotes = append(allNotes, notes...)
chunkCount++
case err := <-errorChan:
if settings.Debug {
fmt.Printf("DEBUG: Collector received error: %v\n", err)
}
processingError = err
close(doneChan)
return
// Add timeout to prevent hanging
case <-time.After(5 * time.Second):
if settings.Debug {
fmt.Printf("DEBUG: Timeout waiting for chunk %d results\n", i)
}
processingError = fmt.Errorf("timeout waiting for analysis results")
close(doneChan)
return
}
}
if settings.Debug {
fmt.Println("DEBUG: Collector finished")
}
close(doneChan)
}()

// advance predStart by 3 seconds - overlap
predStart = predStart.Add(time.Duration((3.0 - bn.Settings.BirdNET.Overlap) * float64(time.Second)))
return nil
// Read and send audio chunks with timeout
err := myaudio.ReadAudioFileBuffered(settings, func(chunk []float32) error {
select {
case chunkChan <- chunk:
// advance predStart by 3 seconds - overlap
predStart = predStart.Add(time.Duration((3.0 - bn.Settings.BirdNET.Overlap) * float64(time.Second)))
return nil
case <-doneChan:
return processingError
case <-time.After(5 * time.Second):
return fmt.Errorf("timeout sending chunk to processing")
}
Comment on lines +241 to +252
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add timeout handling for chunk processing

While there's a timeout for sending chunks, there's no timeout for the actual processing of chunks, which could lead to hanging workers.

Consider adding a context with timeout:

+ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+defer cancel()
 err := myaudio.ReadAudioFileBuffered(settings, func(chunk []float32) error {
     select {
     case chunkChan <- chunk:
         predStart = predStart.Add(time.Duration((3.0 - bn.Settings.BirdNET.Overlap) * float64(time.Second)))
         return nil
     case <-doneChan:
         return processingError
+    case <-ctx.Done():
+        return fmt.Errorf("context timeout: %w", ctx.Err())
     case <-time.After(5 * time.Second):
         return fmt.Errorf("timeout sending chunk to processing")
     }
 })

Committable suggestion skipped: line range outside the PR's diff.

})

if settings.Debug {
fmt.Println("DEBUG: Finished reading audio file")
}
close(chunkChan)

if settings.Debug {
fmt.Println("DEBUG: Waiting for processing to complete")
}
<-doneChan // Wait for processing to complete

if err != nil {
if settings.Debug {
fmt.Printf("DEBUG: File processing error: %v\n", err)
}
return nil, fmt.Errorf("error processing audio: %w", err)
}

// Show total time taken for analysis, including audio length
fmt.Printf("\r\033[K\033[37m📄 %s [%s]\033[0m | \033[32m✅ Analysis completed in %s\033[0m\n",
if processingError != nil {
if settings.Debug {
fmt.Printf("DEBUG: Processing error encountered: %v\n", processingError)
}
return nil, processingError
}

if settings.Debug {
fmt.Println("DEBUG: Analysis completed successfully")
}
// Update final statistics
totalTime := time.Since(startTime)
avgChunksPerSec := float64(totalChunks) / totalTime.Seconds()

fmt.Printf("\r\033[K\033[37m📄 %s [%s]\033[0m | \033[32m✅ Analysis completed in %s\033[0m | \033[36m%.1f chunks/sec avg\033[0m\n",
filename,
duration.Round(time.Second),
birdnet.FormatDuration(time.Since(startTime)))
birdnet.FormatDuration(totalTime),
avgChunksPerSec)

return allNotes, nil
}
Expand Down
2 changes: 0 additions & 2 deletions internal/myaudio/readfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ func GetAudioInfo(filePath string) (AudioInfo, error) {

// ReadAudioFileBuffered reads and processes audio data in chunks
func ReadAudioFileBuffered(settings *conf.Settings, callback AudioChunkCallback) error {
fmt.Print("- Reading audio data")

file, err := os.Open(settings.Input.Path)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion internal/myaudio/readfile_flac.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func readFLACBuffered(file *os.File, settings *conf.Settings, callback AudioChun
}

// Handle the last chunk
if len(currentChunk) >= minLenSamples {
if len(currentChunk) >= minLenSamples || len(currentChunk) > 0 {
if len(currentChunk) < secondsSamples {
padding := make([]float32, secondsSamples-len(currentChunk))
currentChunk = append(currentChunk, padding...)
Expand Down
2 changes: 1 addition & 1 deletion internal/myaudio/readfile_wav.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func readWAVBuffered(file *os.File, settings *conf.Settings, callback AudioChunk
}

// Handle the last chunk
if len(currentChunk) >= minLenSamples {
if len(currentChunk) >= minLenSamples || len(currentChunk) > 0 {
if len(currentChunk) < secondsSamples {
padding := make([]float32, secondsSamples-len(currentChunk))
currentChunk = append(currentChunk, padding...)
Expand Down
121 changes: 121 additions & 0 deletions internal/suncalc/suncalc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package suncalc

import (
"testing"
"time"
)

func TestNewSunCalc(t *testing.T) {
	// Helsinki coordinates.
	const lat, lon = 60.1699, 24.9384

	calc := NewSunCalc(lat, lon)
	if calc == nil {
		t.Fatal("NewSunCalc returned nil")
	}

	// The observer must be initialized with the coordinates we passed in.
	if got := calc.observer.Latitude; got != lat {
		t.Errorf("Expected latitude %v, got %v", lat, got)
	}
	if got := calc.observer.Longitude; got != lon {
		t.Errorf("Expected longitude %v, got %v", lon, got)
	}
}

func TestGetSunEventTimes(t *testing.T) {
	// Helsinki coordinates
	sc := NewSunCalc(60.1699, 24.9384)

	// Test date (midsummer in Helsinki)
	day := time.Date(2024, 6, 21, 0, 0, 0, 0, time.UTC)

	// First call computes the values and populates the cache.
	first, err := sc.GetSunEventTimes(day)
	if err != nil {
		t.Fatalf("Failed to get sun event times: %v", err)
	}

	// None of the computed event times should be the zero time.
	for _, check := range []struct {
		msg string
		ts  time.Time
	}{
		{"Sunrise time is zero", first.Sunrise},
		{"Sunset time is zero", first.Sunset},
		{"Civil dawn time is zero", first.CivilDawn},
		{"Civil dusk time is zero", first.CivilDusk},
	} {
		if check.ts.IsZero() {
			t.Error(check.msg)
		}
	}

	// Second call should be served from the cache.
	second, err := sc.GetSunEventTimes(day)
	if err != nil {
		t.Fatalf("Failed to get cached sun event times: %v", err)
	}

	// The cached result must agree with the freshly computed one.
	if !first.Sunrise.Equal(second.Sunrise) {
		t.Error("Cached sunrise time doesn't match original")
	}
	if !first.Sunset.Equal(second.Sunset) {
		t.Error("Cached sunset time doesn't match original")
	}
}

func TestGetSunriseTime(t *testing.T) {
	// Helsinki, midsummer 2024.
	calc := NewSunCalc(60.1699, 24.9384)
	day := time.Date(2024, 6, 21, 0, 0, 0, 0, time.UTC)

	got, err := calc.GetSunriseTime(day)
	if err != nil {
		t.Fatalf("Failed to get sunrise time: %v", err)
	}
	if got.IsZero() {
		t.Error("Sunrise time is zero")
	}
}

func TestGetSunsetTime(t *testing.T) {
	// Helsinki, midsummer 2024.
	calc := NewSunCalc(60.1699, 24.9384)
	day := time.Date(2024, 6, 21, 0, 0, 0, 0, time.UTC)

	got, err := calc.GetSunsetTime(day)
	if err != nil {
		t.Fatalf("Failed to get sunset time: %v", err)
	}
	if got.IsZero() {
		t.Error("Sunset time is zero")
	}
}

// TestCacheConsistency verifies that GetSunEventTimes stores its result in
// the cache and that the cached entry matches the returned values.
func TestCacheConsistency(t *testing.T) {
	sc := NewSunCalc(60.1699, 24.9384)
	date := time.Date(2024, 6, 21, 0, 0, 0, 0, time.UTC)

	// Compute once so the cache is populated.
	times1, err := sc.GetSunEventTimes(date)
	if err != nil {
		t.Fatalf("Failed to get initial sun event times: %v", err)
	}

	// Verify cache entry exists
	dateKey := date.Format("2006-01-02")
	sc.lock.RLock()
	entry, exists := sc.cache[dateKey]
	sc.lock.RUnlock()

	if !exists {
		// Fatal, not Error: the field checks below are meaningless (or may
		// panic) against a missing/zero-value entry.
		t.Fatal("Cache entry not found after calculation")
	}

	if !entry.date.Equal(date) {
		t.Error("Cached date doesn't match requested date")
	}

	if !entry.times.Sunrise.Equal(times1.Sunrise) {
		t.Error("Cached sunrise time doesn't match calculated time")
	}
}
Loading