Skip to content

Commit

Permalink
Merge pull request #437 from tphakala/fix-startup-without-audio-device
Browse files Browse the repository at this point in the history
refactor: Enhance audio source configuration
  • Loading branch information
tphakala authored Feb 9, 2025
2 parents c6a0e8a + a6844e8 commit d2e0219
Show file tree
Hide file tree
Showing 11 changed files with 1,369 additions and 501 deletions.
86 changes: 86 additions & 0 deletions internal/analysis/buffer_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package analysis

import (
"sync"

"github.com/tphakala/birdnet-go/internal/birdnet"
"github.com/tphakala/birdnet-go/internal/myaudio"
)

// BufferManager handles the lifecycle of analysis buffer monitors
type BufferManager struct {
monitors sync.Map
bn *birdnet.BirdNET
quitChan chan struct{}
wg *sync.WaitGroup
}

// NewBufferManager creates a new buffer manager
func NewBufferManager(bn *birdnet.BirdNET, quitChan chan struct{}, wg *sync.WaitGroup) *BufferManager {
return &BufferManager{
bn: bn,
quitChan: quitChan,
wg: wg,
}
}

// AddMonitor safely adds a new analysis buffer monitor for a source
func (m *BufferManager) AddMonitor(source string) {
// Check if monitor already exists
if _, exists := m.monitors.Load(source); exists {
return
}

// Create a monitor-specific quit channel
monitorQuit := make(chan struct{})
m.monitors.Store(source, monitorQuit)

// Start the monitor
m.wg.Add(1)
go func() {
defer m.wg.Done()
myaudio.AnalysisBufferMonitor(m.wg, m.bn, monitorQuit, source)
}()
}

// RemoveMonitor safely stops and removes a monitor for a source
func (m *BufferManager) RemoveMonitor(source string) {
// Get the monitor's quit channel
if quitChan, exists := m.monitors.Load(source); exists {
// Signal the monitor to stop
close(quitChan.(chan struct{}))
// Remove from the map
m.monitors.Delete(source)
}
}

// RemoveAllMonitors stops all running monitors
func (m *BufferManager) RemoveAllMonitors() {
m.monitors.Range(func(key, value interface{}) bool {
m.RemoveMonitor(key.(string))
return true
})
}

// UpdateMonitors ensures monitors are running for all given sources
func (m *BufferManager) UpdateMonitors(sources []string) {
// Track existing monitors that should be removed
toRemove := make(map[string]bool)
m.monitors.Range(func(key, _ interface{}) bool {
toRemove[key.(string)] = true
return true
})

// Add new monitors and mark existing ones as still needed
for _, source := range sources {
if source != "" {
delete(toRemove, source)
m.AddMonitor(source)
}
}

// Remove monitors that are no longer needed
for source := range toRemove {
m.RemoveMonitor(source)
}
}
98 changes: 74 additions & 24 deletions internal/analysis/realtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"log"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"
Expand Down Expand Up @@ -42,13 +43,15 @@ func RealtimeAnalysis(settings *conf.Settings, notificationChan chan handlers.No
// Get system details with golps
info, err := host.Info()
if err != nil {
fmt.Printf("Error retrieving host info: %v\n", err)
fmt.Printf("Error retrieving host info: %v\n", err)
}

var hwModel string
// Print SBC hardware details
if conf.IsLinuxArm64() {
hwModel = conf.GetBoardModel()
// remove possible new line from hwModel
hwModel = strings.TrimSpace(hwModel)
} else {
hwModel = "unknown"
}
Expand Down Expand Up @@ -86,24 +89,25 @@ func RealtimeAnalysis(settings *conf.Settings, notificationChan chan handlers.No
// Initialize audioLevelChan, used to visualize audio levels on web ui
audioLevelChan = make(chan myaudio.AudioLevelData, 100)

// Initialize ring buffers for each audio source
// Prepare sources list
var sources []string
if len(settings.Realtime.RTSP.URLs) > 0 {
sources = settings.Realtime.RTSP.URLs
// DEBUG
//log.Println("RTSP sources configured, using RTSP for audio capture")
if len(settings.Realtime.RTSP.URLs) > 0 || settings.Realtime.Audio.Source != "" {
if len(settings.Realtime.RTSP.URLs) > 0 {
sources = settings.Realtime.RTSP.URLs
}
if settings.Realtime.Audio.Source != "" {
sources = append(sources, "malgo")
}

// Initialize buffers for all audio sources
if err := initializeBuffers(sources); err != nil {
log.Printf("❌ %v", err)
return err
}
} else {
// DEBUG
//log.Println("No RTSP sources configured, using malgo for audio capture")
sources = []string{"malgo"}
log.Println("⚠️ Starting without active audio sources. You can configure audio devices or RTSP streams through the web interface.")
}

// Initialize ring buffers for each audio source
myaudio.InitRingBuffers(conf.BufferSize*3, sources) // 3x buffer size to avoid underruns

// Audio buffer for extended audio clip capture
myaudio.InitAudioBuffers(60, conf.SampleRate, conf.BitDepth/8, sources)

// init detection queue
queue.Init(5, 5)

Expand Down Expand Up @@ -131,10 +135,14 @@ func RealtimeAnalysis(settings *conf.Settings, notificationChan chan handlers.No
// Initialize the wait group to wait for all goroutines to finish
var wg sync.WaitGroup

// Start buffer monitors for each audio source
for _, source := range sources {
wg.Add(1)
go myaudio.BufferMonitor(&wg, bn, quitChan, source)
// Initialize the buffer manager
bufferManager := NewBufferManager(bn, quitChan, &wg)

// Start buffer monitors for each audio source only if we have active sources
if len(settings.Realtime.RTSP.URLs) > 0 || settings.Realtime.Audio.Source != "" {
bufferManager.UpdateMonitors(sources)
} else {
log.Println("⚠️ Starting without active audio sources. You can configure audio devices or RTSP streams through the web interface.")
}

// start audio capture
Expand All @@ -154,7 +162,7 @@ func RealtimeAnalysis(settings *conf.Settings, notificationChan chan handlers.No
startTelemetryEndpoint(&wg, settings, metrics, quitChan)

// start control monitor for hot reloads
startControlMonitor(&wg, controlChan, quitChan, notificationChan)
startControlMonitor(&wg, controlChan, quitChan, restartChan, notificationChan, bufferManager)

// start quit signal monitor
monitorCtrlC(quitChan)
Expand All @@ -165,6 +173,8 @@ func RealtimeAnalysis(settings *conf.Settings, notificationChan chan handlers.No
case <-quitChan:
// Close controlChan to signal that no restart attempts should be made.
close(controlChan)
// Stop all analysis buffer monitors
bufferManager.RemoveAllMonitors()
// Wait for all goroutines to finish.
wg.Wait()
// Delete the BirdNET interpreter.
Expand All @@ -174,11 +184,10 @@ func RealtimeAnalysis(settings *conf.Settings, notificationChan chan handlers.No

case <-restartChan:
// Handle the restart signal.
fmt.Println("Restarting audio capture")
fmt.Println("🔄 Restarting audio capture")
startAudioCapture(&wg, settings, quitChan, restartChan, audioLevelChan)
}
}

}

// startAudioCapture initializes and starts the audio capture routine in a new goroutine.
Expand All @@ -198,7 +207,7 @@ func startWeatherPolling(wg *sync.WaitGroup, settings *conf.Settings, dataStore
// Create new weather service
weatherService, err := weather.NewService(settings, dataStore)
if err != nil {
log.Printf("Failed to initialize weather service: %v", err)
log.Printf("⛈️ Failed to initialize weather service: %v", err)
return
}

Expand Down Expand Up @@ -343,7 +352,7 @@ func initBirdImageCache(ds datastore.Interface, metrics *telemetry.Metrics) *ima
}

// startControlMonitor handles various control signals for realtime analysis mode
func startControlMonitor(wg *sync.WaitGroup, controlChan chan string, quitChan chan struct{}, notificationChan chan handlers.Notification) {
func startControlMonitor(wg *sync.WaitGroup, controlChan chan string, quitChan, restartChan chan struct{}, notificationChan chan handlers.Notification, bufferManager *BufferManager) {
wg.Add(1)
go func() {
defer wg.Done()
Expand Down Expand Up @@ -393,6 +402,30 @@ func startControlMonitor(wg *sync.WaitGroup, controlChan chan string, quitChan c
}
}
}
case "reconfigure_rtsp_sources":
log.Printf("\033[32m🔄 Reconfiguring RTSP sources...\033[0m")
settings := conf.Setting()

// Prepare the list of active sources
var sources []string
if len(settings.Realtime.RTSP.URLs) > 0 {
sources = append(sources, settings.Realtime.RTSP.URLs...)
}
if settings.Realtime.Audio.Source != "" {
sources = append(sources, "malgo")
}

// Update the analysis buffer monitors
bufferManager.UpdateMonitors(sources)

// Reconfigure RTSP streams
myaudio.ReconfigureRTSPStreams(settings, wg, quitChan, restartChan, audioLevelChan)

log.Printf("\033[32m✅ RTSP sources reconfigured successfully\033[0m")
notificationChan <- handlers.Notification{
Message: "Audio capture reconfigured successfully",
Type: "success",
}
default:
log.Printf("Received unknown control signal: %v", signal)
}
Expand All @@ -402,3 +435,20 @@ func startControlMonitor(wg *sync.WaitGroup, controlChan chan string, quitChan c
}
}()
}

// initializeBuffers handles initialization of all audio-related buffers
func initializeBuffers(sources []string) error {
// Initialize analysis buffers
if err := myaudio.InitAnalysisBuffers(conf.BufferSize*3, sources); err != nil { // 3x buffer size to avoid underruns
return fmt.Errorf("failed to initialize analysis buffers: %w", err)
}

// Initialize capture buffers
if err := myaudio.InitCaptureBuffers(60, conf.SampleRate, conf.BitDepth/8, sources); err != nil {
// Cleanup analysis buffers on failure
myaudio.CleanupAnalysisBuffers()
return fmt.Errorf("failed to initialize capture buffers: %w", err)
}

return nil
}
24 changes: 24 additions & 0 deletions internal/httpcontroller/handlers/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,15 @@ func (h *Handlers) SaveSettings(c echo.Context) error {
h.controlChan <- "rebuild_range_filter"
}

// Check if RTSP settings have changed
if rtspSettingsChanged(&oldSettings, settings) {
h.SSE.SendNotification(Notification{
Message: "Reconfiguring RTSP sources...",
Type: "info",
})
h.controlChan <- "reconfigure_rtsp_sources"
}

// Check the authentication settings and update if needed
h.updateAuthenticationSettings(settings)

Expand Down Expand Up @@ -626,3 +635,18 @@ func birdnetSettingsChanged(oldSettings, currentSettings *conf.Settings) bool {

return false
}

// rtspSettingsChanged checks if RTSP settings have been modified
func rtspSettingsChanged(oldSettings, currentSettings *conf.Settings) bool {
// Check for changes in RTSP transport protocol
if oldSettings.Realtime.RTSP.Transport != currentSettings.Realtime.RTSP.Transport {
return true
}

// Check for changes in RTSP URLs
if !reflect.DeepEqual(oldSettings.Realtime.RTSP.URLs, currentSettings.Realtime.RTSP.URLs) {
return true
}

return false
}
Loading

0 comments on commit d2e0219

Please sign in to comment.