Skip to content

Commit

Permalink
Merge pull request #347 from tphakala/file-analysis-worker-pool
Browse files Browse the repository at this point in the history
File analysis worker pool
  • Loading branch information
tphakala authored Dec 23, 2024
2 parents 414406f + 3ced291 commit 94091bf
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 35 deletions.
212 changes: 181 additions & 31 deletions internal/analysis/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,55 +89,205 @@ func processAudioFile(settings *conf.Settings, audioInfo *myaudio.AudioInfo) ([]
// Calculate audio duration
duration := time.Duration(float64(audioInfo.TotalSamples) / float64(audioInfo.SampleRate) * float64(time.Second))

var allNotes []datastore.Note
// Get filename and truncate if necessary
filename := truncateFilename(settings.Input.Path)

startTime := time.Now()
chunkCount := 0
chunkCount := 1
lastChunkCount := 0
lastProgressUpdate := startTime

// Get filename and truncate if necessary (showing max 30 chars)
filename := truncateFilename(settings.Input.Path)
// Moving average window for chunks/sec calculation
const windowSize = 10 // Number of samples to average
chunkRates := make([]float64, 0, windowSize)

// Set number of workers to 1
numWorkers := 1

// Set predStart to 0 time
if settings.Debug {
fmt.Printf("DEBUG: Starting analysis with %d total chunks and %d workers\n", totalChunks, numWorkers)
}

// Create buffered channels for processing
chunkChan := make(chan []float32, 4)
resultChan := make(chan []datastore.Note, 4)
errorChan := make(chan error, 1)
doneChan := make(chan struct{})

var allNotes []datastore.Note
predStart := time.Time{}

// Process audio chunks as they're read
err := myaudio.ReadAudioFileBuffered(settings, func(chunk []float32) error {
chunkCount++
fmt.Printf("\r\033[K\033[37m📄 %s [%s]\033[0m | \033[33m🔍 Analyzing chunk %d/%d\033[0m %s",
filename,
duration.Round(time.Second),
chunkCount,
totalChunks,
birdnet.EstimateTimeRemaining(startTime, chunkCount, totalChunks))

notes, err := bn.ProcessChunk(chunk, predStart)
if err != nil {
return err
}
// Start worker goroutines for BirdNET analysis
for i := 0; i < numWorkers; i++ {
go func(workerID int) {
if settings.Debug {
fmt.Printf("DEBUG: Worker %d started\n", workerID)
}
for chunk := range chunkChan {
notes, err := bn.ProcessChunk(chunk, predStart)
if err != nil {
if settings.Debug {
fmt.Printf("DEBUG: Worker %d encountered error: %v\n", workerID, err)
}
errorChan <- err
return
}

// Filter notes based on included species list
var filteredNotes []datastore.Note
for _, note := range notes {
if settings.IsSpeciesIncluded(note.ScientificName) {
filteredNotes = append(filteredNotes, note)
}
}

// Filter notes based on included species list
var filteredNotes []datastore.Note
for _, note := range notes {
if settings.IsSpeciesIncluded(note.ScientificName) {
filteredNotes = append(filteredNotes, note)
if settings.Debug {
fmt.Printf("DEBUG: Worker %d sending results\n", workerID)
}
resultChan <- filteredNotes
}
if settings.Debug {
fmt.Printf("DEBUG: Worker %d finished\n", workerID)
}
}(i)
}

// Start progress monitoring goroutine
go func() {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

for range ticker.C {
select {
case <-doneChan:
return
default:
currentTime := time.Now()
timeSinceLastUpdate := currentTime.Sub(lastProgressUpdate)

// Calculate current chunk rate
chunksProcessed := chunkCount - lastChunkCount
currentRate := float64(chunksProcessed) / timeSinceLastUpdate.Seconds()

// Update moving average
if len(chunkRates) >= windowSize {
// Remove oldest value
chunkRates = chunkRates[1:]
}
chunkRates = append(chunkRates, currentRate)

// Calculate average rate
var avgRate float64
if len(chunkRates) > 0 {
sum := 0.0
for _, rate := range chunkRates {
sum += rate
}
avgRate = sum / float64(len(chunkRates))
}

// Update counters for next iteration
lastChunkCount = chunkCount
lastProgressUpdate = currentTime

fmt.Printf("\r\033[K\033[37m📄 %s [%s]\033[0m | \033[33m🔍 Analyzing chunk %d/%d\033[0m | \033[36m%.1f chunks/sec\033[0m %s",
filename,
duration.Round(time.Second),
chunkCount,
totalChunks,
avgRate,
birdnet.EstimateTimeRemaining(startTime, chunkCount, totalChunks))
}
}
}()

allNotes = append(allNotes, filteredNotes...)
// Start result collector goroutine
var processingError error
go func() {
if settings.Debug {
fmt.Println("DEBUG: Result collector started")
}
for i := 1; i <= totalChunks; i++ {
select {
case notes := <-resultChan:
if settings.Debug {
fmt.Printf("DEBUG: Received results for chunk #%d\n", chunkCount)
}
allNotes = append(allNotes, notes...)
chunkCount++
case err := <-errorChan:
if settings.Debug {
fmt.Printf("DEBUG: Collector received error: %v\n", err)
}
processingError = err
close(doneChan)
return
// Add timeout to prevent hanging
case <-time.After(5 * time.Second):
if settings.Debug {
fmt.Printf("DEBUG: Timeout waiting for chunk %d results\n", i)
}
processingError = fmt.Errorf("timeout waiting for analysis results")
close(doneChan)
return
}
}
if settings.Debug {
fmt.Println("DEBUG: Collector finished")
}
close(doneChan)
}()

// advance predStart by 3 seconds - overlap
predStart = predStart.Add(time.Duration((3.0 - bn.Settings.BirdNET.Overlap) * float64(time.Second)))
return nil
// Read and send audio chunks with timeout
err := myaudio.ReadAudioFileBuffered(settings, func(chunk []float32) error {
select {
case chunkChan <- chunk:
// advance predStart by 3 seconds - overlap
predStart = predStart.Add(time.Duration((3.0 - bn.Settings.BirdNET.Overlap) * float64(time.Second)))
return nil
case <-doneChan:
return processingError
case <-time.After(5 * time.Second):
return fmt.Errorf("timeout sending chunk to processing")
}
})

if settings.Debug {
fmt.Println("DEBUG: Finished reading audio file")
}
close(chunkChan)

if settings.Debug {
fmt.Println("DEBUG: Waiting for processing to complete")
}
<-doneChan // Wait for processing to complete

if err != nil {
if settings.Debug {
fmt.Printf("DEBUG: File processing error: %v\n", err)
}
return nil, fmt.Errorf("error processing audio: %w", err)
}

// Show total time taken for analysis, including audio length
fmt.Printf("\r\033[K\033[37m📄 %s [%s]\033[0m | \033[32m✅ Analysis completed in %s\033[0m\n",
if processingError != nil {
if settings.Debug {
fmt.Printf("DEBUG: Processing error encountered: %v\n", processingError)
}
return nil, processingError
}

if settings.Debug {
fmt.Println("DEBUG: Analysis completed successfully")
}
// Update final statistics
totalTime := time.Since(startTime)
avgChunksPerSec := float64(totalChunks) / totalTime.Seconds()

fmt.Printf("\r\033[K\033[37m📄 %s [%s]\033[0m | \033[32m✅ Analysis completed in %s\033[0m | \033[36m%.1f chunks/sec avg\033[0m\n",
filename,
duration.Round(time.Second),
birdnet.FormatDuration(time.Since(startTime)))
birdnet.FormatDuration(totalTime),
avgChunksPerSec)

return allNotes, nil
}
Expand Down
2 changes: 0 additions & 2 deletions internal/myaudio/readfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ func GetAudioInfo(filePath string) (AudioInfo, error) {

// ReadAudioFileBuffered reads and processes audio data in chunks
func ReadAudioFileBuffered(settings *conf.Settings, callback AudioChunkCallback) error {
fmt.Print("- Reading audio data")

file, err := os.Open(settings.Input.Path)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion internal/myaudio/readfile_flac.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func readFLACBuffered(file *os.File, settings *conf.Settings, callback AudioChun
}

// Handle the last chunk
if len(currentChunk) >= minLenSamples {
if len(currentChunk) >= minLenSamples || len(currentChunk) > 0 {
if len(currentChunk) < secondsSamples {
padding := make([]float32, secondsSamples-len(currentChunk))
currentChunk = append(currentChunk, padding...)
Expand Down
2 changes: 1 addition & 1 deletion internal/myaudio/readfile_wav.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func readWAVBuffered(file *os.File, settings *conf.Settings, callback AudioChunk
}

// Handle the last chunk
if len(currentChunk) >= minLenSamples {
if len(currentChunk) >= minLenSamples || len(currentChunk) > 0 {
if len(currentChunk) < secondsSamples {
padding := make([]float32, secondsSamples-len(currentChunk))
currentChunk = append(currentChunk, padding...)
Expand Down

0 comments on commit 94091bf

Please sign in to comment.