Skip to content

Commit

Permalink
refactor(telemetry): Move Prometheus metrics to dedicated package and…
Browse files Browse the repository at this point in the history
… add pprof debug
  • Loading branch information
tphakala committed Apr 21, 2024
1 parent 44779f8 commit f4a6ab1
Show file tree
Hide file tree
Showing 7 changed files with 192 additions and 41 deletions.
3 changes: 2 additions & 1 deletion cmd/realtime/realtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ func setupFlags(cmd *cobra.Command, settings *conf.Settings) error {
cmd.Flags().BoolVar(&settings.Realtime.ProcessingTime, "processingtime", viper.GetBool("realtime.processingtime"), "Report processing time for each detection")
cmd.Flags().StringVar(&settings.Realtime.RTSP.Url, "rtsp", viper.GetString("realtime.rtsp.url"), "URL of RTSP audio stream to capture")
cmd.Flags().StringVar(&settings.Realtime.RTSP.Transport, "rtsptransport", viper.GetString("realtime.rtsp.transport"), "RTSP transport (tcp/udp)")
cmd.Flags().BoolVar(&settings.Realtime.Prometheus, "prometheus", viper.GetBool("realtime.prometheus"), "Enable prometheus metrics")
cmd.Flags().BoolVar(&settings.Realtime.Telemetry.Enabled, "telemetry", viper.GetBool("realtime.telemetry.enabled"), "Enable Prometheus telemetry endpoint")
cmd.Flags().StringVar(&settings.Realtime.Telemetry.Listen, "listen", viper.GetString("realtime.telemetry.listen"), "Listen address and port of telemetry endpoint")

// Bind flags to the viper settings
if err := viper.BindPFlags(cmd.Flags()); err != nil {
Expand Down
24 changes: 13 additions & 11 deletions internal/analysis/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/tphakala/birdnet-go/internal/mqtt"
"github.com/tphakala/birdnet-go/internal/myaudio"
"github.com/tphakala/birdnet-go/internal/observation"
"github.com/tphakala/birdnet-go/internal/telemetry"
)

type Processor struct {
Expand All @@ -31,6 +32,7 @@ type Processor struct {
AudioBuffer *myaudio.AudioBuffer
LastDogDetection time.Time // keep track of dog barks to filter out false positive owl detections
LastHumanDetection time.Time // keep track of human vocal for privacy filtering
Metrics *telemetry.Metrics
}

type Detections struct {
Expand Down Expand Up @@ -58,14 +60,15 @@ var PendingDetections map[string]PendingDetection = make(map[string]PendingDetec
// ensuring thread safety when the map is accessed or modified by concurrent goroutines.
var mutex sync.Mutex

func New(settings *conf.Settings, ds datastore.Interface, bn *birdnet.BirdNET, audioBuffer *myaudio.AudioBuffer) *Processor {
func New(settings *conf.Settings, ds datastore.Interface, bn *birdnet.BirdNET, audioBuffer *myaudio.AudioBuffer, metrics *telemetry.Metrics) *Processor {
p := &Processor{
Settings: settings,
Ds: ds,
Bn: bn,
EventTracker: NewEventTracker(),
IncludedSpecies: new([]string),
AudioBuffer: audioBuffer,
Settings: settings, // BirdNET-Go Settings struct
Ds: ds, // Datastore
Bn: bn, // BirdNET analyzer
EventTracker: NewEventTracker(), // Duplicate event tracker
IncludedSpecies: new([]string), // Included species list
AudioBuffer: audioBuffer, // Audio buffer for audio export
Metrics: metrics, // Prometheus metrics struct
}

// Start the detection processor
Expand Down Expand Up @@ -329,10 +332,9 @@ func (p *Processor) pendingDetectionsFlusher() {
// Detection is now processed, remove it from pending detections map.
delete(PendingDetections, species)

// Update prometheus detection counter
if p.Settings.Realtime.Prometheus {
p.Settings.Realtime.PrometheusDetectionCounter.
WithLabelValues(item.Detection.Note.CommonName).Inc()
// Update Prometheus metrics detection counter
if p.Settings.Realtime.Telemetry.Enabled {
p.Metrics.IncrementDetectionCounter(item.Detection.Note.CommonName)
}
}
}
Expand Down
46 changes: 27 additions & 19 deletions internal/analysis/realtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,20 @@ package analysis
import (
"fmt"
"log"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/tphakala/birdnet-go/internal/analysis/processor"
"github.com/tphakala/birdnet-go/internal/analysis/queue"
"github.com/tphakala/birdnet-go/internal/birdnet"
"github.com/tphakala/birdnet-go/internal/conf"
"github.com/tphakala/birdnet-go/internal/datastore"
"github.com/tphakala/birdnet-go/internal/httpcontroller"
"github.com/tphakala/birdnet-go/internal/myaudio"
"github.com/tphakala/birdnet-go/internal/telemetry"
)

const (
Expand All @@ -28,7 +26,6 @@ const (

// RealtimeAnalysis initiates the BirdNET Analyzer in real-time mode and waits for a termination signal.
func RealtimeAnalysis(settings *conf.Settings) error {

// Initialize the BirdNET interpreter.
bn, err := birdnet.NewBirdNET(settings)
if err != nil {
Expand All @@ -48,20 +45,6 @@ func RealtimeAnalysis(settings *conf.Settings) error {
// Initialize database access.
dataStore := datastore.New(settings)

// Initialize Prometheus (if enabled)
if settings.Realtime.Prometheus {
settings.Realtime.PrometheusDetectionCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "birdnet_detections",
Help: "How many BirdNET detections partitioned by common name.",
},
[]string{"name"},
)
prometheus.MustRegister(settings.Realtime.PrometheusDetectionCounter)
http.Handle("/metrics", promhttp.Handler())
go http.ListenAndServe(":2112", nil)
}

// Open a connection to the database and handle possible errors.
if err := dataStore.Open(); err != nil {
//logger.Error("main", "Failed to open database: %v", err)
Expand All @@ -88,8 +71,14 @@ func RealtimeAnalysis(settings *conf.Settings) error {
// init detection queue
queue.Init(5, 5)

// Initialize Prometheus metrics manager
metrics, err := telemetry.NewMetrics()
if err != nil {
log.Fatalf("Error initializing metrics: %v", err)
}

// Start worker pool for processing detections
processor.New(settings, dataStore, bn, audioBuffer)
processor.New(settings, dataStore, bn, audioBuffer, metrics)

// Start http server
httpcontroller.New(settings, dataStore)
Expand All @@ -98,11 +87,16 @@ func RealtimeAnalysis(settings *conf.Settings) error {
var wg sync.WaitGroup
// start buffer monitor
startBufferMonitor(&wg, bn, quitChan)

// start audio capture
startAudioCapture(&wg, settings, quitChan, restartChan, audioBuffer)

// start cleanup of clips
startClipCleanupMonitor(&wg, settings, dataStore, quitChan)

// start telemetry endpoint
startTelemetryEndpoint(&wg, settings, metrics, quitChan)

// start quit signal monitor
monitorCtrlC(quitChan)

Expand Down Expand Up @@ -146,6 +140,20 @@ func startClipCleanupMonitor(wg *sync.WaitGroup, settings *conf.Settings, dataSt
go ClipCleanupMonitor(wg, settings, dataStore, quitChan)
}

func startTelemetryEndpoint(wg *sync.WaitGroup, settings *conf.Settings, metrics *telemetry.Metrics, quitChan chan struct{}) {
// Initialize Prometheus metrics endpoint if enabled
if settings.Realtime.Telemetry.Enabled {
// Initialize metrics endpoint
telemetryEndpoint, err := telemetry.NewEndpoint(settings)
if err != nil {
log.Printf("Error initializing metrics manager: %v", err)
}

// Start metrics server
telemetryEndpoint.Start(metrics, wg, quitChan)
}
}

// monitorCtrlC listens for the SIGINT (Ctrl+C) signal and triggers the application shutdown process.
func monitorCtrlC(quitChan chan struct{}) {
go func() {
Expand Down
26 changes: 16 additions & 10 deletions internal/conf/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"path/filepath"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/viper"
)

Expand Down Expand Up @@ -38,10 +37,8 @@ type Settings struct {
}

Realtime struct {
Interval int // minimum interval between log messages in seconds
ProcessingTime bool // true to report processing time for each prediction
Prometheus bool // true to enable Prometheus metrics
PrometheusDetectionCounter *prometheus.CounterVec
Interval int // minimum interval between log messages in seconds
ProcessingTime bool // true to report processing time for each prediction

AudioExport struct {
Enabled bool // export audio clips containing indentified bird calls
Expand Down Expand Up @@ -86,6 +83,11 @@ type Settings struct {
Username string // MQTT username
Password string // MQTT password
}

Telemetry struct {
Enabled bool // true to enable Prometheus compatible telemetry endpoint
Listen string // IP address and port to listen on
}
}

WebServer struct {
Expand Down Expand Up @@ -268,16 +270,20 @@ realtime:
username: birdnet # MQTT username
password: secret # MQTT password
privacyfilter:
enabled: true
privacyfilter: # Privacy filter prevents audio clip saving if human voice
enabled: true # is detected durin audio capture
dogbarkfilter:
enabled: true
telemetry:
enabled: false # true to enable Prometheus compatible telemetry endpoint
listen: "0.0.0.0:8090" # IP address and port to listen on
retention:
enabled: true # true to enable retention policy of clips
minEvictionHours: 72 # minumum number of hours before considering clip for eviction
minClipsPerSpecies: 10 # minumum number of clips per species to keep before starting evictions
enabled: true # true to enable retention policy of clips
minEvictionHours: 72 # minumum number of hours before considering clip for eviction
minClipsPerSpecies: 10 # minumum number of clips per species to keep before starting evictions
webserver:
enabled: true # true to enable web server
Expand Down
24 changes: 24 additions & 0 deletions internal/telemetry/debug.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// debug.go: pprof debug routes and helpers for telemetry package
package telemetry

import (
"net/http"
"net/http/pprof"
)

const debugPath = "/debug/pprof/"

// RegisterDebugHandlers adds pprof debugging routes to the provided mux
func RegisterDebugHandlers(mux *http.ServeMux) {
mux.HandleFunc(debugPath, pprof.Index)
mux.HandleFunc(debugPath+"cmdline", pprof.Cmdline)
mux.HandleFunc(debugPath+"profile", pprof.Profile)
mux.HandleFunc(debugPath+"symbol", pprof.Symbol)
mux.HandleFunc(debugPath+"trace", pprof.Trace)
mux.Handle(debugPath+"allocs", pprof.Handler("allocs"))
mux.Handle(debugPath+"goroutine", pprof.Handler("goroutine"))
mux.Handle(debugPath+"heap", pprof.Handler("heap"))
mux.Handle(debugPath+"threadcreate", pprof.Handler("threadcreate"))
mux.Handle(debugPath+"block", pprof.Handler("block"))
mux.Handle(debugPath+"mutex", pprof.Handler("mutex"))
}
63 changes: 63 additions & 0 deletions internal/telemetry/endpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// endpoint.go: Prohmetheus compatible telemetry endpoint
package telemetry

import (
"context"
"fmt"
"log"
"net/http"
"sync"
"time"

"github.com/tphakala/birdnet-go/internal/conf"
)

// Endpoint handles all operations related to Prometehus compatible telemetry
type Endpoint struct {
server *http.Server
ListenAddress string
}

// New creates a new instance of telemetry Endpoint
func NewEndpoint(settings *conf.Settings) (*Endpoint, error) {
if !settings.Realtime.Telemetry.Enabled {
return nil, fmt.Errorf("metrics not enabled")
}

return &Endpoint{
ListenAddress: settings.Realtime.Telemetry.Listen,
}, nil
}

// Start the HTTP server for telemetry endpoint and listen for the quit signal to shut down.
func (e *Endpoint) Start(metrics *Metrics, wg *sync.WaitGroup, quitChan <-chan struct{}) {
mux := http.NewServeMux()
RegisterMetricsHandlers(mux) // Registering metrics handlers
RegisterDebugHandlers(mux) // Registering debug handlers

e.server = &http.Server{
Addr: e.ListenAddress,
Handler: mux,
}

// Run the server in a separate goroutine so that it doesn't block.
wg.Add(1)
go func() {
defer wg.Done()
log.Printf("Telemetry endpoint starting at %s", e.ListenAddress)
if err := e.server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("Failed to start telemetry HTTP server at %s: %v", e.ListenAddress, err)
}
}()

// Listen for quit signal
go func() {
<-quitChan
log.Println("Quit signal received, stopping telemetry server.")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := e.server.Shutdown(ctx); err != nil {
log.Printf("Failed to shutdown telemetry server gracefully: %v", err)
}
}()
}
47 changes: 47 additions & 0 deletions internal/telemetry/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// metrics.go: Prometheus metrics setup and manipulation for telemetry
package telemetry

import (
"net/http"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

type Metrics struct {
DetectionCounter *prometheus.CounterVec
// Additional metrics can be added here
}

const metricsPath = "/metrics"

// NewMetrics initializes and registers all Prometheus metrics used in the telemetry system.
func NewMetrics() (*Metrics, error) {
metrics := &Metrics{}

// Setup DetectionCounter
counterOpts := prometheus.CounterOpts{
Name: "birdnet_detections",
Help: "Counts of BirdNET detections partitioned by common name.",
}
labels := []string{"name"}
metrics.DetectionCounter = prometheus.NewCounterVec(counterOpts, labels)

if err := prometheus.Register(metrics.DetectionCounter); err != nil {
return nil, err
}

// Additional metrics can be initialized here

return metrics, nil
}

// RegisterMetricsHandlers adds metrics routes to the provided mux
func RegisterMetricsHandlers(mux *http.ServeMux) {
mux.Handle(metricsPath, promhttp.Handler())
}

// IncrementDetectionCounter increments the detection counter for a given species
func (m *Metrics) IncrementDetectionCounter(speciesName string) {
m.DetectionCounter.WithLabelValues(speciesName).Inc()
}

0 comments on commit f4a6ab1

Please sign in to comment.