Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event tracker fixes #393

Merged
merged 2 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 67 additions & 59 deletions internal/analysis/processor/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,62 +73,62 @@ func (a LogAction) Execute(data interface{}) error {
species := strings.ToLower(a.Note.CommonName)

// Check if the event should be handled for this species
if a.EventTracker.TrackEvent(species, LogToFile) {
if err := observation.LogNoteToFile(a.Settings, a.Note); err != nil {
// If an error occurs when logging to a file, wrap and return the error.
log.Printf("Failed to log note to file: %v", err)
}
fmt.Printf("%s %s %.2f\n", a.Note.Time, a.Note.CommonName, a.Note.Confidence)
if !a.EventTracker.TrackEvent(species, LogToFile) {
return nil
}

//log.Printf("Log action throttled for species: %s", species)
// Log note to file
if err := observation.LogNoteToFile(a.Settings, a.Note); err != nil {
// If an error occurs when logging to a file, wrap and return the error.
log.Printf("Failed to log note to file: %v", err)
}
fmt.Printf("%s %s %.2f\n", a.Note.Time, a.Note.CommonName, a.Note.Confidence)

return nil
}

// Execute saves the note to the database
func (a DatabaseAction) Execute(data interface{}) error {
species := strings.ToLower(a.Note.CommonName)

// Check if the event should be handled for this species
if a.EventTracker.TrackEvent(species, DatabaseSave) {
// Save note to database
if err := a.Ds.Save(&a.Note, a.Results); err != nil {
log.Printf("Failed to save note and results to database: %v", err)
// Check event frequency
if !a.EventTracker.TrackEvent(species, DatabaseSave) {
return nil
}

// Save note to database
if err := a.Ds.Save(&a.Note, a.Results); err != nil {
log.Printf("Failed to save note and results to database: %v", err)
return err
}

// Save audio clip to file if enabled
if a.Settings.Realtime.Audio.Export.Enabled {
// export audio clip from capture buffer
pcmData, err := myaudio.ReadSegmentFromCaptureBuffer(a.Note.Source, a.Note.BeginTime, 15)
if err != nil {
log.Printf("Failed to read audio segment from buffer: %v", err)
return err
}

// Save audio clip to file if enabled
if a.Settings.Realtime.Audio.Export.Enabled {
// export audio clip from capture buffer
pcmData, err := myaudio.ReadSegmentFromCaptureBuffer(a.Note.Source, a.Note.BeginTime, 15)
if err != nil {
log.Printf("Failed to read audio segment from buffer: %v", err)
return err
}

// Create a SaveAudioAction and execute it
saveAudioAction := SaveAudioAction{
Settings: a.Settings,
ClipName: a.Note.ClipName,
pcmData: pcmData,
}

if err := saveAudioAction.Execute(nil); err != nil {
log.Printf("Failed to save audio clip: %v", err)
return err
}

if a.Settings.Debug {
log.Printf("Saved audio clip to %s\n", a.Note.ClipName)
log.Printf("detection time %v, begin time %v, end time %v\n", a.Note.Time, a.Note.BeginTime, time.Now())
}
// Create a SaveAudioAction and execute it
saveAudioAction := SaveAudioAction{
Settings: a.Settings,
ClipName: a.Note.ClipName,
pcmData: pcmData,
}

return nil
if err := saveAudioAction.Execute(nil); err != nil {
log.Printf("Failed to save audio clip: %v", err)
return err
}

if a.Settings.Debug {
log.Printf("Saved audio clip to %s\n", a.Note.ClipName)
log.Printf("detection time %v, begin time %v, end time %v\n", a.Note.Time, a.Note.BeginTime, time.Now())
}
}

//log.Printf("Database save action throttled for species: %s", species)
return nil
}

Expand Down Expand Up @@ -170,30 +170,31 @@ func (a SaveAudioAction) Execute(data interface{}) error {
func (a BirdWeatherAction) Execute(data interface{}) error {
species := strings.ToLower(a.Note.CommonName)

if a.EventTracker.TrackEvent(species, BirdWeatherSubmit) {
// Add threshold check here
if a.Note.Confidence < float64(a.Settings.Realtime.Birdweather.Threshold) {
if a.Settings.Debug {
log.Printf("Skipping BirdWeather upload for %s: confidence %.2f below threshold %.2f\n",
species, a.Note.Confidence, a.Settings.Realtime.Birdweather.Threshold)
}
return nil
}

if a.BwClient == nil {
return fmt.Errorf("BirdWeather client is not initialized")
}
// Check event frequency
if !a.EventTracker.TrackEvent(species, BirdWeatherSubmit) {
return nil
}

if err := a.BwClient.Publish(a.Note, a.pcmData); err != nil {
log.Printf("error uploading to BirdWeather: %s\n", err)
return err
} else if a.Settings.Debug {
log.Printf("Uploaded %s to Birdweather\n", a.Note.ClipName)
// Add threshold check here
if a.Note.Confidence < float64(a.Settings.Realtime.Birdweather.Threshold) {
if a.Settings.Debug {
log.Printf("Skipping BirdWeather upload for %s: confidence %.2f below threshold %.2f\n",
species, a.Note.Confidence, a.Settings.Realtime.Birdweather.Threshold)
}
return nil
}
//log.Printf("BirdWeather Submit action throttled for species: %s", species)
return nil // return an error if the action fails

if a.BwClient == nil {
return fmt.Errorf("BirdWeather client is not initialized")
}

if err := a.BwClient.Publish(a.Note, a.pcmData); err != nil {
log.Printf("error uploading to BirdWeather: %s\n", err)
return err
} else if a.Settings.Debug {
log.Printf("Uploaded %s to Birdweather\n", a.Note.ClipName)
}
return nil
}

type NoteWithBirdImage struct {
Expand All @@ -203,6 +204,13 @@ type NoteWithBirdImage struct {

// Execute sends the note to the MQTT broker
func (a MqttAction) Execute(data interface{}) error {
species := strings.ToLower(a.Note.CommonName)

// Check event frequency
if !a.EventTracker.TrackEvent(species, MQTTPublish) {
return nil
}

// First, check if the MQTT client is connected
if !a.MqttClient.IsConnected() {
log.Println("MQTT client is not connected, skipping publish")
Expand Down
22 changes: 16 additions & 6 deletions internal/analysis/processor/eventtracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,24 @@ type EventTracker struct {
Mutex sync.Mutex // Mutex to ensure thread-safe access
}

// NewEventTracker initializes a new EventTracker with default event handlers.
func NewEventTracker() *EventTracker {
// Add this new struct to hold configuration
type EventTrackerConfig struct {
DatabaseSaveInterval time.Duration
LogToFileInterval time.Duration
NotificationInterval time.Duration
BirdWeatherSubmitInterval time.Duration
MQTTPublishInterval time.Duration
}

// Modify NewEventTracker to accept configuration
func NewEventTracker(interval time.Duration) *EventTracker {
return &EventTracker{
Handlers: map[EventType]*EventHandler{
DatabaseSave: NewEventHandler(15*time.Second, StandardEventBehavior),
LogToFile: NewEventHandler(15*time.Second, StandardEventBehavior),
SendNotification: NewEventHandler(60*time.Minute, StandardEventBehavior),
BirdWeatherSubmit: NewEventHandler(15*time.Second, StandardEventBehavior),
DatabaseSave: NewEventHandler(interval, StandardEventBehavior),
LogToFile: NewEventHandler(interval, StandardEventBehavior),
SendNotification: NewEventHandler(interval, StandardEventBehavior),
BirdWeatherSubmit: NewEventHandler(interval, StandardEventBehavior),
MQTTPublish: NewEventHandler(interval, StandardEventBehavior),
},
}
}
Expand Down
2 changes: 1 addition & 1 deletion internal/analysis/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func New(settings *conf.Settings, ds datastore.Interface, bn *birdnet.BirdNET, m
Ds: ds,
Bn: bn,
BirdImageCache: birdImageCache,
EventTracker: NewEventTracker(),
EventTracker: NewEventTracker(time.Duration(settings.Realtime.Interval) * time.Second),
Metrics: metrics,
LastDogDetection: make(map[string]time.Time),
LastHumanDetection: make(map[string]time.Time),
Expand Down
Loading