From f4a6ab179ecffc004e855a5142bf154b0db56893 Mon Sep 17 00:00:00 2001 From: Tomi Hakala Date: Sun, 21 Apr 2024 15:34:11 +0300 Subject: [PATCH] refactor(telemetry): Move Prometheus metrics to dedicated package and add pprof debug --- cmd/realtime/realtime.go | 3 +- internal/analysis/processor/processor.go | 24 ++++----- internal/analysis/realtime.go | 46 ++++++++++------- internal/conf/config.go | 26 ++++++---- internal/telemetry/debug.go | 24 +++++++++ internal/telemetry/endpoint.go | 63 ++++++++++++++++++++++++ internal/telemetry/metrics.go | 47 ++++++++++++++++++ 7 files changed, 192 insertions(+), 41 deletions(-) create mode 100644 internal/telemetry/debug.go create mode 100644 internal/telemetry/endpoint.go create mode 100644 internal/telemetry/metrics.go diff --git a/cmd/realtime/realtime.go b/cmd/realtime/realtime.go index d8914a1e..3e1860f7 100644 --- a/cmd/realtime/realtime.go +++ b/cmd/realtime/realtime.go @@ -37,7 +37,8 @@ func setupFlags(cmd *cobra.Command, settings *conf.Settings) error { cmd.Flags().BoolVar(&settings.Realtime.ProcessingTime, "processingtime", viper.GetBool("realtime.processingtime"), "Report processing time for each detection") cmd.Flags().StringVar(&settings.Realtime.RTSP.Url, "rtsp", viper.GetString("realtime.rtsp.url"), "URL of RTSP audio stream to capture") cmd.Flags().StringVar(&settings.Realtime.RTSP.Transport, "rtsptransport", viper.GetString("realtime.rtsp.transport"), "RTSP transport (tcp/udp)") - cmd.Flags().BoolVar(&settings.Realtime.Prometheus, "prometheus", viper.GetBool("realtime.prometheus"), "Enable prometheus metrics") + cmd.Flags().BoolVar(&settings.Realtime.Telemetry.Enabled, "telemetry", viper.GetBool("realtime.telemetry.enabled"), "Enable Prometheus telemetry endpoint") + cmd.Flags().StringVar(&settings.Realtime.Telemetry.Listen, "listen", viper.GetString("realtime.telemetry.listen"), "Listen address and port of telemetry endpoint") // Bind flags to the viper settings if err := viper.BindPFlags(cmd.Flags()); err != nil { diff --git a/internal/analysis/processor/processor.go b/internal/analysis/processor/processor.go index 11928586..78f1afb2 100644 --- a/internal/analysis/processor/processor.go +++ b/internal/analysis/processor/processor.go @@ -15,6 +15,7 @@ import ( "github.com/tphakala/birdnet-go/internal/mqtt" "github.com/tphakala/birdnet-go/internal/myaudio" "github.com/tphakala/birdnet-go/internal/observation" + "github.com/tphakala/birdnet-go/internal/telemetry" ) type Processor struct { @@ -31,6 +32,7 @@ type Processor struct { AudioBuffer *myaudio.AudioBuffer LastDogDetection time.Time // keep track of dog barks to filter out false positive owl detections LastHumanDetection time.Time // keep track of human vocal for privacy filtering + Metrics *telemetry.Metrics } type Detections struct { @@ -58,14 +60,15 @@ var PendingDetections map[string]PendingDetection = make(map[string]PendingDetec // ensuring thread safety when the map is accessed or modified by concurrent goroutines. var mutex sync.Mutex -func New(settings *conf.Settings, ds datastore.Interface, bn *birdnet.BirdNET, audioBuffer *myaudio.AudioBuffer) *Processor { +func New(settings *conf.Settings, ds datastore.Interface, bn *birdnet.BirdNET, audioBuffer *myaudio.AudioBuffer, metrics *telemetry.Metrics) *Processor { p := &Processor{ - Settings: settings, - Ds: ds, - Bn: bn, - EventTracker: NewEventTracker(), - IncludedSpecies: new([]string), - AudioBuffer: audioBuffer, + Settings: settings, // BirdNET-Go Settings struct + Ds: ds, // Datastore + Bn: bn, // BirdNET analyzer + EventTracker: NewEventTracker(), // Duplicate event tracker + IncludedSpecies: new([]string), // Included species list + AudioBuffer: audioBuffer, // Audio buffer for audio export + Metrics: metrics, // Prometheus metrics struct } // Start the detection processor @@ -329,10 +332,9 @@ func (p *Processor) pendingDetectionsFlusher() { // Detection is now processed, remove it from pending detections map. delete(PendingDetections, species) - // Update prometheus detection counter - if p.Settings.Realtime.Prometheus { - p.Settings.Realtime.PrometheusDetectionCounter. - WithLabelValues(item.Detection.Note.CommonName).Inc() + // Update Prometheus metrics detection counter + if p.Settings.Realtime.Telemetry.Enabled { + p.Metrics.IncrementDetectionCounter(item.Detection.Note.CommonName) } } } diff --git a/internal/analysis/realtime.go b/internal/analysis/realtime.go index 4aa613db..9cd2edf0 100644 --- a/internal/analysis/realtime.go +++ b/internal/analysis/realtime.go @@ -3,15 +3,12 @@ package analysis import ( "fmt" "log" - "net/http" "os" "os/signal" "sync" "syscall" "time" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/tphakala/birdnet-go/internal/analysis/processor" "github.com/tphakala/birdnet-go/internal/analysis/queue" "github.com/tphakala/birdnet-go/internal/birdnet" @@ -19,6 +16,7 @@ import ( "github.com/tphakala/birdnet-go/internal/datastore" "github.com/tphakala/birdnet-go/internal/httpcontroller" "github.com/tphakala/birdnet-go/internal/myaudio" + "github.com/tphakala/birdnet-go/internal/telemetry" ) const ( @@ -28,7 +26,6 @@ const ( // RealtimeAnalysis initiates the BirdNET Analyzer in real-time mode and waits for a termination signal. func RealtimeAnalysis(settings *conf.Settings) error { - // Initialize the BirdNET interpreter. bn, err := birdnet.NewBirdNET(settings) if err != nil { @@ -48,20 +45,6 @@ func RealtimeAnalysis(settings *conf.Settings) error { // Initialize database access. dataStore := datastore.New(settings) - // Initialize Prometheus (if enabled) - if settings.Realtime.Prometheus { - settings.Realtime.PrometheusDetectionCounter = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "birdnet_detections", - Help: "How many BirdNET detections partitioned by common name.", - }, - []string{"name"}, - ) - prometheus.MustRegister(settings.Realtime.PrometheusDetectionCounter) - http.Handle("/metrics", promhttp.Handler()) - go http.ListenAndServe(":2112", nil) - } - // Open a connection to the database and handle possible errors. if err := dataStore.Open(); err != nil { //logger.Error("main", "Failed to open database: %v", err) @@ -88,8 +71,14 @@ func RealtimeAnalysis(settings *conf.Settings) error { // init detection queue queue.Init(5, 5) + // Initialize Prometheus metrics manager + metrics, err := telemetry.NewMetrics() + if err != nil { + log.Fatalf("Error initializing metrics: %v", err) + } + // Start worker pool for processing detections - processor.New(settings, dataStore, bn, audioBuffer) + processor.New(settings, dataStore, bn, audioBuffer, metrics) // Start http server httpcontroller.New(settings, dataStore) @@ -98,11 +87,16 @@ func RealtimeAnalysis(settings *conf.Settings) error { var wg sync.WaitGroup // start buffer monitor startBufferMonitor(&wg, bn, quitChan) + // start audio capture startAudioCapture(&wg, settings, quitChan, restartChan, audioBuffer) + // start cleanup of clips startClipCleanupMonitor(&wg, settings, dataStore, quitChan) + // start telemetry endpoint + startTelemetryEndpoint(&wg, settings, metrics, quitChan) + // start quit signal monitor monitorCtrlC(quitChan) @@ -146,6 +140,20 @@ func startClipCleanupMonitor(wg *sync.WaitGroup, settings *conf.Settings, dataSt go ClipCleanupMonitor(wg, settings, dataStore, quitChan) } +func startTelemetryEndpoint(wg *sync.WaitGroup, settings *conf.Settings, metrics *telemetry.Metrics, quitChan chan struct{}) { + // Initialize Prometheus metrics endpoint if enabled + if settings.Realtime.Telemetry.Enabled { + // Initialize metrics endpoint + telemetryEndpoint, err := telemetry.NewEndpoint(settings) + if err != nil { + log.Printf("Error initializing metrics manager: %v", err) + } + + // Start metrics server + telemetryEndpoint.Start(metrics, wg, quitChan) + } +} + // monitorCtrlC listens for the SIGINT (Ctrl+C) signal and triggers the application shutdown process. func monitorCtrlC(quitChan chan struct{}) { go func() { diff --git a/internal/conf/config.go b/internal/conf/config.go index 6e53106c..e8130967 100644 --- a/internal/conf/config.go +++ b/internal/conf/config.go @@ -8,7 +8,6 @@ import ( "path/filepath" "time" - "github.com/prometheus/client_golang/prometheus" "github.com/spf13/viper" ) @@ -38,10 +37,8 @@ type Settings struct { } Realtime struct { - Interval int // minimum interval between log messages in seconds - ProcessingTime bool // true to report processing time for each prediction - Prometheus bool // true to enable Prometheus metrics - PrometheusDetectionCounter *prometheus.CounterVec + Interval int // minimum interval between log messages in seconds + ProcessingTime bool // true to report processing time for each prediction AudioExport struct { Enabled bool // export audio clips containing indentified bird calls @@ -86,6 +83,11 @@ type Settings struct { Username string // MQTT username Password string // MQTT password } + + Telemetry struct { + Enabled bool // true to enable Prometheus compatible telemetry endpoint + Listen string // IP address and port to listen on + } } WebServer struct { @@ -268,16 +270,20 @@ realtime: username: birdnet # MQTT username password: secret # MQTT password - privacyfilter: - enabled: true + privacyfilter: # Privacy filter prevents audio clip saving if human voice + enabled: true # is detected durin audio capture dogbarkfilter: enabled: true + telemetry: + enabled: false # true to enable Prometheus compatible telemetry endpoint + listen: "0.0.0.0:8090" # IP address and port to listen on + retention: - enabled: true # true to enable retention policy of clips - minEvictionHours: 72 # minumum number of hours before considering clip for eviction - minClipsPerSpecies: 10 # minumum number of clips per species to keep before starting evictions + enabled: true # true to enable retention policy of clips + minEvictionHours: 72 # minumum number of hours before considering clip for eviction + minClipsPerSpecies: 10 # minumum number of clips per species to keep before starting evictions webserver: enabled: true # true to enable web server diff --git a/internal/telemetry/debug.go b/internal/telemetry/debug.go new file mode 100644 index 00000000..0d4e6ee7 --- /dev/null +++ b/internal/telemetry/debug.go @@ -0,0 +1,24 @@ +// debug.go: pprof debug routes and helpers for telemetry package +package telemetry + +import ( + "net/http" + "net/http/pprof" +) + +const debugPath = "/debug/pprof/" + +// RegisterDebugHandlers adds pprof debugging routes to the provided mux +func RegisterDebugHandlers(mux *http.ServeMux) { + mux.HandleFunc(debugPath, pprof.Index) + mux.HandleFunc(debugPath+"cmdline", pprof.Cmdline) + mux.HandleFunc(debugPath+"profile", pprof.Profile) + mux.HandleFunc(debugPath+"symbol", pprof.Symbol) + mux.HandleFunc(debugPath+"trace", pprof.Trace) + mux.Handle(debugPath+"allocs", pprof.Handler("allocs")) + mux.Handle(debugPath+"goroutine", pprof.Handler("goroutine")) + mux.Handle(debugPath+"heap", pprof.Handler("heap")) + mux.Handle(debugPath+"threadcreate", pprof.Handler("threadcreate")) + mux.Handle(debugPath+"block", pprof.Handler("block")) + mux.Handle(debugPath+"mutex", pprof.Handler("mutex")) +} diff --git a/internal/telemetry/endpoint.go b/internal/telemetry/endpoint.go new file mode 100644 index 00000000..5ded9c3d --- /dev/null +++ b/internal/telemetry/endpoint.go @@ -0,0 +1,63 @@ +// endpoint.go: Prohmetheus compatible telemetry endpoint +package telemetry + +import ( + "context" + "fmt" + "log" + "net/http" + "sync" + "time" + + "github.com/tphakala/birdnet-go/internal/conf" +) + +// Endpoint handles all operations related to Prometehus compatible telemetry +type Endpoint struct { + server *http.Server + ListenAddress string +} + +// New creates a new instance of telemetry Endpoint +func NewEndpoint(settings *conf.Settings) (*Endpoint, error) { + if !settings.Realtime.Telemetry.Enabled { + return nil, fmt.Errorf("metrics not enabled") + } + + return &Endpoint{ + ListenAddress: settings.Realtime.Telemetry.Listen, + }, nil +} + +// Start the HTTP server for telemetry endpoint and listen for the quit signal to shut down. +func (e *Endpoint) Start(metrics *Metrics, wg *sync.WaitGroup, quitChan <-chan struct{}) { + mux := http.NewServeMux() + RegisterMetricsHandlers(mux) // Registering metrics handlers + RegisterDebugHandlers(mux) // Registering debug handlers + + e.server = &http.Server{ + Addr: e.ListenAddress, + Handler: mux, + } + + // Run the server in a separate goroutine so that it doesn't block. + wg.Add(1) + go func() { + defer wg.Done() + log.Printf("Telemetry endpoint starting at %s", e.ListenAddress) + if err := e.server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Fatalf("Failed to start telemetry HTTP server at %s: %v", e.ListenAddress, err) + } + }() + + // Listen for quit signal + go func() { + <-quitChan + log.Println("Quit signal received, stopping telemetry server.") + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := e.server.Shutdown(ctx); err != nil { + log.Printf("Failed to shutdown telemetry server gracefully: %v", err) + } + }() +} diff --git a/internal/telemetry/metrics.go b/internal/telemetry/metrics.go new file mode 100644 index 00000000..a1fd1138 --- /dev/null +++ b/internal/telemetry/metrics.go @@ -0,0 +1,47 @@ +// metrics.go: Prometheus metrics setup and manipulation for telemetry +package telemetry + +import ( + "net/http" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +type Metrics struct { + DetectionCounter *prometheus.CounterVec + // Additional metrics can be added here +} + +const metricsPath = "/metrics" + +// NewMetrics initializes and registers all Prometheus metrics used in the telemetry system. +func NewMetrics() (*Metrics, error) { + metrics := &Metrics{} + + // Setup DetectionCounter + counterOpts := prometheus.CounterOpts{ + Name: "birdnet_detections", + Help: "Counts of BirdNET detections partitioned by common name.", + } + labels := []string{"name"} + metrics.DetectionCounter = prometheus.NewCounterVec(counterOpts, labels) + + if err := prometheus.Register(metrics.DetectionCounter); err != nil { + return nil, err + } + + // Additional metrics can be initialized here + + return metrics, nil +} + +// RegisterMetricsHandlers adds metrics routes to the provided mux +func RegisterMetricsHandlers(mux *http.ServeMux) { + mux.Handle(metricsPath, promhttp.Handler()) +} + +// IncrementDetectionCounter increments the detection counter for a given species +func (m *Metrics) IncrementDetectionCounter(speciesName string) { + m.DetectionCounter.WithLabelValues(speciesName).Inc() +}