refactor: reduce complexity of processAudioFile() function #411
Conversation
- Created a new `monitorProgress` function to encapsulate progress tracking logic
- Moved progress monitoring goroutine implementation into the new function
- Simplified `processAudioFile` by extracting complex progress tracking code
- Maintained existing progress calculation and display behavior

- Extracted worker initialization into a separate `startWorkers` function
- Defined `audioChunk` type at package level for better reusability
- Simplified note filtering by using index-based iteration
- Improved code organization and separation of concerns in audio file processing

- Created `processChunk` function to encapsulate chunk processing logic
- Simplified worker goroutine code by moving chunk processing details to a dedicated function
- Maintained existing error handling and species filtering behavior
- Improved code modularity and readability in audio file processing
Walkthrough

The pull request introduces a comprehensive refactoring of the audio file processing logic in `internal/analysis/file.go`.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Main as processAudioFile
    participant Workers as startWorkers
    participant Chunk as processChunk
    participant Progress as monitorProgress
    participant Results as Result Collector
    Main->>Workers: Initialize workers
    Workers->>Chunk: Process audio chunks
    Chunk-->>Results: Send processed notes
    Main->>Progress: Start progress monitoring
    Progress-->>Main: Update progress
    Workers-->>Main: Return final results
```
Actionable comments posted: 2
🧹 Nitpick comments (1)
internal/analysis/file.go (1)
Line range hint `246-281`: Good use of goroutines for concurrent chunk processing.

Implementing worker goroutines that process chunks in parallel is a sound approach. As a future enhancement, consider using a `sync.WaitGroup` or a channel-based shutdown mechanism rather than a shared `doneChan`, to ensure all workers have completed gracefully before concluding. The current approach can be maintained, but a `WaitGroup` may help streamline shutdown and error-handling logic.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
internal/analysis/file.go
(2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: Analyze (go)
🔇 Additional comments (2)
internal/analysis/file.go (2)
`283-287`: Struct looks solid.

Defining the `audioChunk` type at the package level is a concise way to handle and pass chunk data. This approach simplifies the interface between functions.
Line range hint `289-429`: Validate partial results handling logic.

The `processAudioFile` function retains "partial results" in case of errors, which is a helpful design. However, if multiple chunks fail sequentially, partial results might be inconsistent. Consider adding tests confirming that partial results are accurately recorded even under repeated errors, ensuring the logic is robust and that no chunk results are lost or duplicated. As a follow-up, verifying partial results can be automated with a script that simulates multiple chunk errors.
internal/analysis/file.go
Outdated
```go
// processChunk handles the processing of a single audio chunk
func processChunk(ctx context.Context, chunk audioChunk, settings *conf.Settings,
	resultChan chan<- []datastore.Note, errorChan chan<- error) error {
	notes, err := bn.ProcessChunk(chunk.Data, chunk.FilePosition)
	if err != nil {
		select {
		case errorChan <- err:
		default:
		}
		return err
	}

	// Filter notes based on included species list
	var filteredNotes []datastore.Note
	for i := range notes {
		if settings.IsSpeciesIncluded(notes[i].ScientificName) {
			filteredNotes = append(filteredNotes, notes[i])
		}
	}

	select {
	case <-ctx.Done():
		select {
		case errorChan <- ctx.Err():
		default:
		}
		return ctx.Err()
	case resultChan <- filteredNotes:
		return nil
	}
}
```

```go
var allNotes []datastore.Note

// Create a single cancel function to coordinate shutdown
var doneChanClosed sync.Once
shutdown := func() {
	doneChanClosed.Do(func() {
		close(doneChan)
	})
}
defer shutdown()

// Start worker goroutines for BirdNET analysis
```
🛠️ Refactor suggestion
Avoid silently dropping errors when `errorChan` is full.

As written, when `bn.ProcessChunk` returns an error, the code attempts to send it via `errorChan` non-blockingly. If `errorChan` is already full, the error is discarded. This could lead to missed critical errors. Consider using a blocking send or increasing the buffer size to ensure no errors are lost.
Below is a possible approach using a blocking send:
```diff
-        select {
-        case errorChan <- err:
-        default:
-        }
+        errorChan <- err
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
// processChunk handles the processing of a single audio chunk
func processChunk(ctx context.Context, chunk audioChunk, settings *conf.Settings,
	resultChan chan<- []datastore.Note, errorChan chan<- error) error {
	notes, err := bn.ProcessChunk(chunk.Data, chunk.FilePosition)
	if err != nil {
		errorChan <- err
		return err
	}

	// Filter notes based on included species list
	var filteredNotes []datastore.Note
	for i := range notes {
		if settings.IsSpeciesIncluded(notes[i].ScientificName) {
			filteredNotes = append(filteredNotes, notes[i])
		}
	}

	select {
	case <-ctx.Done():
		select {
		case errorChan <- ctx.Err():
		default:
		}
		return ctx.Err()
	case resultChan <- filteredNotes:
		return nil
	}
}
```
internal/analysis/file.go
Outdated
```go
// monitorProgress starts a goroutine to monitor and display analysis progress
func monitorProgress(ctx context.Context, doneChan chan struct{}, filename string, duration time.Duration,
	totalChunks int, chunkCount *int, startTime time.Time) {
	lastChunkCount := 0
	lastProgressUpdate := startTime

	// Moving average window for chunks/sec calculation
	const windowSize = 10 // Number of samples to average
	chunkRates := make([]float64, 0, windowSize)

	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-doneChan:
			return
		case <-ticker.C:
			currentTime := time.Now()
			timeSinceLastUpdate := currentTime.Sub(lastProgressUpdate)

			// Calculate current chunk rate
			chunksProcessed := *chunkCount - lastChunkCount
			currentRate := float64(chunksProcessed) / timeSinceLastUpdate.Seconds()

			// Update moving average
			if len(chunkRates) >= windowSize {
				// Remove oldest value
				chunkRates = chunkRates[1:]
			}
			chunkRates = append(chunkRates, currentRate)

			// Calculate average rate
			var avgRate float64
			if len(chunkRates) > 0 {
				sum := 0.0
				for _, rate := range chunkRates {
					sum += rate
				}
				avgRate = sum / float64(len(chunkRates))
			}

			// Update counters for next iteration
			lastChunkCount = *chunkCount
			lastProgressUpdate = currentTime

			// Get terminal width
			width, _, err := term.GetSize(int(os.Stdout.Fd()))
			if err != nil {
				width = 80 // Default to 80 columns if we can't get terminal width
			}

			// Format and print the progress line
			fmt.Print(formatProgressLine(
				filename,
				duration,
				*chunkCount,
				totalChunks,
				avgRate,
				birdnet.EstimateTimeRemaining(startTime, *chunkCount, totalChunks),
				width,
			))
		}
	}
}
```

```go
// Calculate audio duration
duration := time.Duration(float64(audioInfo.TotalSamples) / float64(audioInfo.SampleRate) * float64(time.Second))

// Get filename and truncate if necessary
filename := filepath.Base(settings.Input.Path)

startTime := time.Now()
chunkCount := 1

// Set number of workers to 1
numWorkers := 1

if settings.Debug {
	fmt.Printf("DEBUG: Starting analysis with %d total chunks and %d workers\n", totalChunks, numWorkers)
}
```
Consider synchronizing access to `chunkCount`.

`chunkCount` is written to in one goroutine (the result collector) and read from in another (`monitorProgress`). This may cause a data race if multiple goroutines access `chunkCount` concurrently without synchronization. Protect `chunkCount` with a mutex, an atomic operation, or a channel-based approach to avoid potential race conditions.
Below is an example atomic-based approach:
```diff
-var chunkCount int = 1
+import "sync/atomic"
+var chunkCount int64 = 1

-chunkCount++
+atomic.AddInt64(&chunkCount, 1)

-chunksProcessed := *chunkCount - lastChunkCount
+chunkCountValue := atomic.LoadInt64(&chunkCount)
+chunksProcessed := int(chunkCountValue) - lastChunkCount
```
Committable suggestion skipped: line range outside the PR's diff.
…ssing
- Added atomic operations for thread-safe chunk count tracking
- Enhanced error handling in chunk processing with context-aware blocking
- Replaced integer chunk count with int64 for better concurrency support
- Improved error propagation and context cancellation handling
- Simplified progress monitoring with atomic load and increment operations
Actionable comments posted: 0
🧹 Nitpick comments (3)
internal/analysis/file.go (3)
`145-215`: Consider making the rate calculation window size configurable.

The moving average window size for calculating chunks/sec is hardcoded. Consider making it configurable through settings to allow fine-tuning based on different use cases.
```diff
-const windowSize = 10 // Number of samples to average
+// Add to conf.Settings struct
+type AnalysisSettings struct {
+	RateWindowSize int // Number of samples for rate calculation
+}
+
+// Use from settings
+windowSize := settings.Analysis.RateWindowSize
```
`311-313`: Consider making worker count configurable.

The number of workers is hardcoded to 1, which might not be optimal for all systems. Consider making it configurable through settings to allow scaling based on available resources.
```diff
-// Set number of workers to 1
-numWorkers := 1
+// Add to conf.Settings struct
+type AnalysisSettings struct {
+	NumWorkers int // Number of concurrent workers
+}
+
+// Use from settings with a reasonable default
+numWorkers := settings.Analysis.NumWorkers
+if numWorkers <= 0 {
+	numWorkers = 1
+}
```
Line range hint `367-384`: Consider making timeout durations configurable.

The timeout duration is hardcoded to 5 seconds in multiple places. Consider making it configurable to accommodate different processing environments and file sizes.
```diff
+// Add to conf.Settings struct
+type AnalysisSettings struct {
+	ChunkTimeout time.Duration // Timeout for chunk processing
+}
+
+// Use from settings with a reasonable default
+timeout := settings.Analysis.ChunkTimeout
+if timeout <= 0 {
+	timeout = 5 * time.Second
+}

-case <-time.After(5 * time.Second):
+case <-time.After(timeout):
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
internal/analysis/file.go
(5 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: Analyze (go)
🔇 Additional comments (2)
internal/analysis/file.go (2)
`288-292`: LGTM! Well-structured type declaration.

The `audioChunk` type is appropriately placed at package level and has clear, well-typed fields.
`217-249`: LGTM! Robust error handling and species filtering.

The function implements proper error handling with blocking channel operations and appropriate context cancellation checks. The species filtering is well-integrated.