From eec4f3711e9296d02342c2986dd10c422fa8aae2 Mon Sep 17 00:00:00 2001 From: "Tomi P. Hakala" Date: Wed, 12 Feb 2025 22:24:08 +0200 Subject: [PATCH 1/4] refactor: Improve audio device selection and initialization robustness - Enhance audio device selection with comprehensive testing and validation - Add device initialization and start tests before selecting a capture source - Improve error handling and logging for audio device detection - Update capture function to pass selected device source directly - Add platform-specific backend selection for malgo context --- internal/myaudio/capture.go | 143 ++++++++++++++++++++++++------------ 1 file changed, 94 insertions(+), 49 deletions(-) diff --git a/internal/myaudio/capture.go b/internal/myaudio/capture.go index 805f0307..3f991085 100644 --- a/internal/myaudio/capture.go +++ b/internal/myaudio/capture.go @@ -269,7 +269,7 @@ func CaptureAudio(settings *conf.Settings, wg *sync.WaitGroup, quitChan, restart // Initialize buffers for each audio source if len(settings.Realtime.RTSP.URLs) > 0 { for _, url := range settings.Realtime.RTSP.URLs { - abExists, cbExists := false, false //nolint:wastedassign // Need to initialize variables + var abExists, cbExists bool // Check if analysis buffer already exists abMutex.RLock() _, abExists = analysisBuffers[url] @@ -310,7 +310,15 @@ func CaptureAudio(settings *conf.Settings, wg *sync.WaitGroup, quitChan, restart } if settings.Realtime.Audio.Source != "" { - abExists, cbExists := false, false //nolint:wastedassign // Need to initialize variables + // Try to select and test a capture device + selectedSource, err := selectCaptureSource(settings) + if err != nil { + log.Printf("❌ Audio device selection failed: %v", err) + log.Println("⚠️ Continuing without audio device capture. You can configure a working audio device through the web interface.") + return + } + + var abExists, cbExists bool // Check if analysis buffer exists abMutex.RLock() _, abExists = analysisBuffers["malgo"] @@ -346,17 +354,43 @@ func CaptureAudio(settings *conf.Settings, wg *sync.WaitGroup, quitChan, restart // Device audio capture wg.Add(1) - go captureAudioMalgo(settings, wg, quitChan, restartChan, audioLevelChan) + go captureAudioMalgo(settings, selectedSource, wg, quitChan, restartChan, audioLevelChan) } } -// selectCaptureSource selects an appropriate capture device based on the provided settings and available device information. -// It prints available devices and returns the selected device and any error encountered. -func selectCaptureSource(settings *conf.Settings, infos []malgo.DeviceInfo) (captureSource, error) { +// selectCaptureSource selects and tests an appropriate capture device based on the provided settings. +// It prints available devices, tests the selected device, and returns the selected device and any error encountered. +func selectCaptureSource(settings *conf.Settings) (captureSource, error) { + // Initialize malgo context + var backend malgo.Backend + switch runtime.GOOS { + case "linux": + backend = malgo.BackendAlsa + case "windows": + backend = malgo.BackendWasapi + case "darwin": + backend = malgo.BackendCoreaudio + } + + malgoCtx, err := malgo.InitContext([]malgo.Backend{backend}, malgo.ContextConfig{}, func(message string) { + if settings.Debug { + fmt.Print(message) + } + }) + if err != nil { + return captureSource{}, fmt.Errorf("audio context initialization failed: %w", err) + } + defer malgoCtx.Uninit() //nolint:errcheck // We handle errors in the caller + fmt.Println("Available Capture Sources:") var selectedSource captureSource - var deviceFound bool + + // Get list of capture sources + infos, err := malgoCtx.Devices(malgo.Capture) + if err != nil { + return captureSource{}, fmt.Errorf("failed to get capture devices: %w", err) + } // If no devices are available, return appropriate error if len(infos) == 0 { @@ -384,26 +418,47 @@ func selectCaptureSource(settings *conf.Settings, infos []malgo.DeviceInfo) (cap ID: decodedID, Pointer: infos[i].ID.Pointer(), } - deviceFound = true - } - fmt.Println(output) - } + // Try to actually initialize and test the device + deviceConfig := malgo.DefaultDeviceConfig(malgo.Capture) + deviceConfig.Capture.Format = malgo.FormatS16 + deviceConfig.Capture.Channels = conf.NumChannels + deviceConfig.Capture.DeviceID = selectedSource.Pointer + deviceConfig.SampleRate = conf.SampleRate + deviceConfig.Alsa.NoMMap = 1 - // Check if running in container and only null device is available - if conf.RunningInContainer() && len(infos) == 1 && strings.Contains(infos[0].Name(), "Discard all samples") { - return captureSource{}, fmt.Errorf( - "no audio devices available in container\n" + - "Please map host audio devices by running docker with: --device /dev/snd\n" + - "Instructions for running BirdNET-Go in Docker are at https://github.com/tphakala/birdnet-go/blob/main/doc/installation.md") - } + // Try to initialize the device + testDevice, err := malgo.InitDevice(malgoCtx.Context, deviceConfig, malgo.DeviceCallbacks{}) + if err != nil { + if settings.Debug { + log.Printf("❌ Device initialization test failed for %s: %v", selectedSource.Name, err) + } + fmt.Printf("%s (❌ initialization failed)\n", output) + continue + } + + // Try to start the device + if err := testDevice.Start(); err != nil { + if settings.Debug { + log.Printf("❌ Device start test failed for %s: %v", selectedSource.Name, err) + } + testDevice.Uninit() + fmt.Printf("%s (❌ start failed)\n", output) + continue + } + + // Stop and uninit the test device + _ = testDevice.Stop() + testDevice.Uninit() + + fmt.Printf("%s (✅ working)\n", output) + return selectedSource, nil + } - // If no device was found, return error with more descriptive message - if !deviceFound { - return captureSource{}, fmt.Errorf("no suitable capture source found for device setting '%s'", settings.Realtime.Audio.Source) + fmt.Println(output) } - return selectedSource, nil + return captureSource{}, fmt.Errorf("no working capture device found matching '%s'", settings.Realtime.Audio.Source) } // matchesDeviceSettings checks if the device matches the settings specified by the user. @@ -425,9 +480,8 @@ func hexToASCII(hexStr string) (string, error) { return string(bytes), nil } -func captureAudioMalgo(settings *conf.Settings, wg *sync.WaitGroup, quitChan, restartChan chan struct{}, audioLevelChan chan AudioLevelData) { +func captureAudioMalgo(settings *conf.Settings, source captureSource, wg *sync.WaitGroup, quitChan, restartChan chan struct{}, audioLevelChan chan AudioLevelData) { defer wg.Done() // Ensure this is called when the goroutine exits - var device *malgo.Device if settings.Debug { fmt.Println("Initializing context") @@ -453,36 +507,22 @@ func captureAudioMalgo(settings *conf.Settings, wg *sync.WaitGroup, quitChan, re color.New(color.FgHiYellow).Fprintln(os.Stderr, "❌ context init failed:", err) return } - defer malgoCtx.Uninit() //nolint:errcheck // This is a defer, avoid warning about error return value + defer malgoCtx.Uninit() //nolint:errcheck // We handle errors in the caller deviceConfig := malgo.DefaultDeviceConfig(malgo.Capture) deviceConfig.Capture.Format = malgo.FormatS16 deviceConfig.Capture.Channels = conf.NumChannels deviceConfig.SampleRate = conf.SampleRate deviceConfig.Alsa.NoMMap = 1 - - var infos []malgo.DeviceInfo - - // Get list of capture sources - infos, err = malgoCtx.Devices(malgo.Capture) - if err != nil { - color.New(color.FgHiYellow).Fprintln(os.Stderr, "❌ Error getting capture devices:", err) - return - } - - // Select the capture source based on the settings - captureSource, err := selectCaptureSource(settings, infos) - if err != nil { - color.New(color.FgHiYellow).Fprintln(os.Stderr, "❌ Error selecting capture source:", err) - return - } - deviceConfig.Capture.DeviceID = captureSource.Pointer + deviceConfig.Capture.DeviceID = source.Pointer // Initialize the filter chain if err := InitializeFilterChain(settings); err != nil { log.Printf("❌ Error initializing filter chain: %v", err) } + var captureDevice *malgo.Device + onReceiveFrames := func(pSample2, pSamples []byte, framecount uint32) { // Apply audio EQ filters if enabled if settings.Realtime.Audio.Equalizer.Enabled { @@ -502,7 +542,7 @@ func captureAudioMalgo(settings *conf.Settings, wg *sync.WaitGroup, quitChan, re } // Calculate audio level - audioLevelData := calculateAudioLevel(pSamples, "malgo", captureSource.Name) + audioLevelData := calculateAudioLevel(pSamples, "malgo", source.Name) // Send level to channel (non-blocking) select { @@ -530,12 +570,17 @@ func captureAudioMalgo(settings *conf.Settings, wg *sync.WaitGroup, quitChan, re if settings.Debug { fmt.Println("🔄 Attempting to restart audio device.") } - err := device.Start() + err := captureDevice.Start() if err != nil { log.Printf("❌ Failed to restart audio device: %v", err) log.Println("🔄 Attempting full audio context restart in 1 second.") time.Sleep(1 * time.Second) - restartChan <- struct{}{} + select { + case restartChan <- struct{}{}: + // Successfully sent restart signal + case <-quitChan: + // Application is shutting down, don't send restart signal + } } else if settings.Debug { fmt.Println("🔄 Audio device restarted successfully.") } @@ -550,7 +595,7 @@ func captureAudioMalgo(settings *conf.Settings, wg *sync.WaitGroup, quitChan, re } // Initialize the capture device - device, err = malgo.InitDevice(malgoCtx.Context, deviceConfig, deviceCallbacks) + captureDevice, err = malgo.InitDevice(malgoCtx.Context, deviceConfig, deviceCallbacks) if err != nil { color.New(color.FgHiYellow).Fprintln(os.Stderr, "❌ Device initialization failed:", err) conf.PrintUserInfo() @@ -560,18 +605,18 @@ func captureAudioMalgo(settings *conf.Settings, wg *sync.WaitGroup, quitChan, re if settings.Debug { fmt.Println("Starting device") } - err = device.Start() + err = captureDevice.Start() if err != nil { color.New(color.FgHiYellow).Fprintln(os.Stderr, "❌ Device start failed:", err) return } - defer device.Stop() //nolint:errcheck // This is a defer, avoid warning about error return value + defer captureDevice.Stop() //nolint:errcheck // We handle errors in the caller if settings.Debug { fmt.Println("Device started") } // print audio device we are attached to - color.New(color.FgHiGreen).Printf("Listening on source: %s (%s)\n", captureSource.Name, captureSource.ID) + color.New(color.FgHiGreen).Printf("Listening on source: %s (%s)\n", source.Name, source.ID) // Now, instead of directly waiting on QuitChannel, // check if it's closed in a non-blocking select. From 994cd0f1571158ae158e78a1163a900c19d5897a Mon Sep 17 00:00:00 2001 From: "Tomi P. Hakala" Date: Wed, 12 Feb 2025 22:24:19 +0200 Subject: [PATCH 2/4] refactor: Improve buffer initialization error handling and control monitor lifecycle - Modify buffer initialization to collect and report multiple errors - Update control monitor to use a dedicated done channel for cleaner synchronization - Enhance error logging for buffer initialization with more informative messages - Prevent premature function exit when buffer initialization partially fails --- internal/analysis/realtime.go | 29 +++++++++++++++++++++-------- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/internal/analysis/realtime.go b/internal/analysis/realtime.go index ce09ba08..da102ed2 100644 --- a/internal/analysis/realtime.go +++ b/internal/analysis/realtime.go @@ -96,13 +96,17 @@ func RealtimeAnalysis(settings *conf.Settings, notificationChan chan handlers.No sources = settings.Realtime.RTSP.URLs } if settings.Realtime.Audio.Source != "" { + // We'll add malgo to sources only if device initialization succeeds + // This will be handled in CaptureAudio sources = append(sources, "malgo") } // Initialize buffers for all audio sources if err := initializeBuffers(sources); err != nil { - log.Printf("❌ %v", err) - return err + // If buffer initialization fails, log the error but continue + // Some sources might still work + log.Printf("⚠️ Error initializing buffers: %v", err) + log.Println("⚠️ Some audio sources might not be available.") } } else { log.Println("⚠️ Starting without active audio sources. You can configure audio devices or RTSP streams through the web interface.") @@ -353,9 +357,10 @@ func initBirdImageCache(ds datastore.Interface, metrics *telemetry.Metrics) *ima // startControlMonitor handles various control signals for realtime analysis mode func startControlMonitor(wg *sync.WaitGroup, controlChan chan string, quitChan, restartChan chan struct{}, notificationChan chan handlers.Notification, bufferManager *BufferManager) { - wg.Add(1) + // Create a channel to signal when the control monitor should stop + monitorDone := make(chan struct{}) + go func() { - defer wg.Done() for { select { case signal := <-controlChan: @@ -430,24 +435,32 @@ func startControlMonitor(wg *sync.WaitGroup, controlChan chan string, quitChan, log.Printf("Received unknown control signal: %v", signal) } case <-quitChan: + close(monitorDone) return } } }() + + // Wait for the monitor to be done before returning + <-monitorDone } // initializeBuffers handles initialization of all audio-related buffers func initializeBuffers(sources []string) error { + var initErrors []string + // Initialize analysis buffers if err := myaudio.InitAnalysisBuffers(conf.BufferSize*3, sources); err != nil { // 3x buffer size to avoid underruns - return fmt.Errorf("failed to initialize analysis buffers: %w", err) + initErrors = append(initErrors, fmt.Sprintf("failed to initialize analysis buffers: %v", err)) } // Initialize capture buffers if err := myaudio.InitCaptureBuffers(60, conf.SampleRate, conf.BitDepth/8, sources); err != nil { - // Cleanup analysis buffers on failure - myaudio.CleanupAnalysisBuffers() - return fmt.Errorf("failed to initialize capture buffers: %w", err) + initErrors = append(initErrors, fmt.Sprintf("failed to initialize capture buffers: %v", err)) + } + + if len(initErrors) > 0 { + return fmt.Errorf("buffer initialization errors: %s", strings.Join(initErrors, "; ")) } return nil From 60bbeb43f904f42e8e4afbe589dcb00e4c6ab95c Mon Sep 17 00:00:00 2001 From: "Tomi P. Hakala" Date: Fri, 14 Feb 2025 18:06:42 +0200 Subject: [PATCH 3/4] refactor: Simplify control monitor synchronization - Remove unnecessary monitorDone channel in startControlMonitor - Streamline goroutine exit mechanism - Remove redundant channel closing and waiting logic --- internal/analysis/realtime.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/internal/analysis/realtime.go b/internal/analysis/realtime.go index da102ed2..e444f904 100644 --- a/internal/analysis/realtime.go +++ b/internal/analysis/realtime.go @@ -357,9 +357,6 @@ func initBirdImageCache(ds datastore.Interface, metrics *telemetry.Metrics) *ima // startControlMonitor handles various control signals for realtime analysis mode func startControlMonitor(wg *sync.WaitGroup, controlChan chan string, quitChan, restartChan chan struct{}, notificationChan chan handlers.Notification, bufferManager *BufferManager) { - // Create a channel to signal when the control monitor should stop - monitorDone := make(chan struct{}) - go func() { for { select { @@ -435,14 +432,10 @@ func startControlMonitor(wg *sync.WaitGroup, controlChan chan string, quitChan, log.Printf("Received unknown control signal: %v", signal) } case <-quitChan: - close(monitorDone) return } } }() - - // Wait for the monitor to be done before returning - <-monitorDone } // initializeBuffers handles initialization of all audio-related buffers From f3f9071d1bfcb02da3ce27dd6801d8e3d6923d39 Mon Sep 17 00:00:00 2001 From: "Tomi P. Hakala" Date: Fri, 14 Feb 2025 18:38:34 +0200 Subject: [PATCH 4/4] refactor: Simplify audio buffer initialization with new helper function - Introduce initializeBuffersForSource to centralize buffer setup logic - Improve error handling and cleanup for RTSP and local audio sources - Reduce code duplication in CaptureAudio function - Enhance error logging for buffer initialization failures --- internal/myaudio/capture.go | 121 ++++++++++++++---------------------- 1 file changed, 47 insertions(+), 74 deletions(-) diff --git a/internal/myaudio/capture.go b/internal/myaudio/capture.go index 3f991085..cdc8a125 100644 --- a/internal/myaudio/capture.go +++ b/internal/myaudio/capture.go @@ -260,47 +260,55 @@ func ReconfigureRTSPStreams(settings *conf.Settings, wg *sync.WaitGroup, quitCha } } +// initializeBuffersForSource handles the initialization of analysis and capture buffers for a given source +func initializeBuffersForSource(sourceID string) error { + var abExists, cbExists bool + + // Check if analysis buffer exists + abMutex.RLock() + _, abExists = analysisBuffers[sourceID] + abMutex.RUnlock() + + // Check if capture buffer exists + cbMutex.RLock() + _, cbExists = captureBuffers[sourceID] + cbMutex.RUnlock() + + // Initialize analysis buffer if it doesn't exist + if !abExists { + if err := AllocateAnalysisBuffer(conf.BufferSize*3, sourceID); err != nil { + return fmt.Errorf("failed to initialize analysis buffer: %w", err) + } + } + + // Initialize capture buffer if it doesn't exist + if !cbExists { + if err := AllocateCaptureBuffer(60, conf.SampleRate, conf.BitDepth/8, sourceID); err != nil { + // Clean up the analysis buffer if we just created it and capture buffer init fails + if !abExists { + if cleanupErr := RemoveAnalysisBuffer(sourceID); cleanupErr != nil { + log.Printf("❌ Failed to cleanup analysis buffer after capture buffer init failure for %s: %v", sourceID, cleanupErr) + } + } + return fmt.Errorf("failed to initialize capture buffer: %w", err) + } + } + + return nil +} + func CaptureAudio(settings *conf.Settings, wg *sync.WaitGroup, quitChan, restartChan chan struct{}, audioLevelChan chan AudioLevelData) { - // If no RTSP URLs and no audio device configured, log a friendly message and return + // If no RTSP URLs and no audio device configured, return early if len(settings.Realtime.RTSP.URLs) == 0 && settings.Realtime.Audio.Source == "" { return } - // Initialize buffers for each audio source + // Initialize buffers for RTSP sources if len(settings.Realtime.RTSP.URLs) > 0 { for _, url := range settings.Realtime.RTSP.URLs { - var abExists, cbExists bool - // Check if analysis buffer already exists - abMutex.RLock() - _, abExists = analysisBuffers[url] - abMutex.RUnlock() - - // Check if capture buffer exists - cbMutex.RLock() - _, cbExists = captureBuffers[url] - cbMutex.RUnlock() - - // Initialize analysis buffer if it doesn't exist - if !abExists { - if err := AllocateAnalysisBuffer(conf.BufferSize*3, url); err != nil { - log.Printf("❌ Failed to initialize analysis buffer for %s: %v", url, err) - continue - } - } - - // Initialize capture buffer if it doesn't exist - if !cbExists { - if err := AllocateCaptureBuffer(60, conf.SampleRate, conf.BitDepth/8, url); err != nil { - // Clean up the ring buffer if audio buffer init fails and we just created it - if !cbExists { - err := RemoveCaptureBuffer(url) - if err != nil { - log.Printf("❌ Failed to remove capture buffer for %s: %v", url, err) - } - } - log.Printf("❌ Failed to initialize capture buffer for %s: %v", url, err) - continue - } + if err := initializeBuffersForSource(url); err != nil { + log.Printf("❌ Failed to initialize buffers for RTSP source %s: %v", url, err) + continue } wg.Add(1) @@ -309,6 +317,7 @@ func CaptureAudio(settings *conf.Settings, wg *sync.WaitGroup, quitChan, restart } } + // Handle local audio device if configured if settings.Realtime.Audio.Source != "" { // Try to select and test a capture device selectedSource, err := selectCaptureSource(settings) @@ -318,38 +327,10 @@ func CaptureAudio(settings *conf.Settings, wg *sync.WaitGroup, quitChan, restart return } - var abExists, cbExists bool - // Check if analysis buffer exists - abMutex.RLock() - _, abExists = analysisBuffers["malgo"] - abMutex.RUnlock() - - // Check if capture buffer exists - cbMutex.RLock() - _, cbExists = captureBuffers["malgo"] - cbMutex.RUnlock() - - // Initialize analysis buffer if it doesn't exist - if !abExists { - if err := AllocateAnalysisBuffer(conf.BufferSize*3, "malgo"); err != nil { - log.Printf("❌ Failed to initialize analysis buffer for device capture: %v", err) - return - } - } - - // Initialize capture buffer if it doesn't exist - if !cbExists { - if err := AllocateCaptureBuffer(60, conf.SampleRate, conf.BitDepth/8, "malgo"); err != nil { - // Clean up the ring buffer if audio buffer init fails and we just created it - if !cbExists { - err := RemoveCaptureBuffer("malgo") - if err != nil { - log.Printf("❌ Failed to remove capture buffer for device capture: %v", err) - } - } - log.Printf("❌ Failed to initialize capture buffer for device capture: %v", err) - return - } + // Initialize buffers for local audio device + if err := initializeBuffersForSource("malgo"); err != nil { + log.Printf("❌ Failed to initialize buffers for device capture: %v", err) + return } // Device audio capture @@ -721,11 +702,3 @@ func calculateAudioLevel(samples []byte, source, name string) AudioLevelData { Name: name, } } - -// abs returns the absolute value of a 16-bit integer -func abs(x int16) int16 { - if x < 0 { - return -x - } - return x -}