Skip to content

Commit

Permalink
Merge pull request #393 from tphakala/event-tracker-fix
Browse files Browse the repository at this point in the history
Event tracker fixes
  • Loading branch information
tphakala authored Jan 15, 2025
2 parents ec4bc1e + 2905037 commit 0f87b8d
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 66 deletions.
126 changes: 67 additions & 59 deletions internal/analysis/processor/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,62 +73,62 @@ func (a LogAction) Execute(data interface{}) error {
species := strings.ToLower(a.Note.CommonName)

// Check if the event should be handled for this species
if a.EventTracker.TrackEvent(species, LogToFile) {
if err := observation.LogNoteToFile(a.Settings, a.Note); err != nil {
// If an error occurs when logging to a file, wrap and return the error.
log.Printf("Failed to log note to file: %v", err)
}
fmt.Printf("%s %s %.2f\n", a.Note.Time, a.Note.CommonName, a.Note.Confidence)
if !a.EventTracker.TrackEvent(species, LogToFile) {
return nil
}

//log.Printf("Log action throttled for species: %s", species)
// Log note to file
if err := observation.LogNoteToFile(a.Settings, a.Note); err != nil {
// If an error occurs when logging to a file, wrap and return the error.
log.Printf("Failed to log note to file: %v", err)
}
fmt.Printf("%s %s %.2f\n", a.Note.Time, a.Note.CommonName, a.Note.Confidence)

return nil
}

// Execute saves the note to the database
func (a DatabaseAction) Execute(data interface{}) error {
species := strings.ToLower(a.Note.CommonName)

// Check if the event should be handled for this species
if a.EventTracker.TrackEvent(species, DatabaseSave) {
// Save note to database
if err := a.Ds.Save(&a.Note, a.Results); err != nil {
log.Printf("Failed to save note and results to database: %v", err)
// Check event frequency
if !a.EventTracker.TrackEvent(species, DatabaseSave) {
return nil
}

// Save note to database
if err := a.Ds.Save(&a.Note, a.Results); err != nil {
log.Printf("Failed to save note and results to database: %v", err)
return err
}

// Save audio clip to file if enabled
if a.Settings.Realtime.Audio.Export.Enabled {
// export audio clip from capture buffer
pcmData, err := myaudio.ReadSegmentFromCaptureBuffer(a.Note.Source, a.Note.BeginTime, 15)
if err != nil {
log.Printf("Failed to read audio segment from buffer: %v", err)
return err
}

// Save audio clip to file if enabled
if a.Settings.Realtime.Audio.Export.Enabled {
// export audio clip from capture buffer
pcmData, err := myaudio.ReadSegmentFromCaptureBuffer(a.Note.Source, a.Note.BeginTime, 15)
if err != nil {
log.Printf("Failed to read audio segment from buffer: %v", err)
return err
}

// Create a SaveAudioAction and execute it
saveAudioAction := SaveAudioAction{
Settings: a.Settings,
ClipName: a.Note.ClipName,
pcmData: pcmData,
}

if err := saveAudioAction.Execute(nil); err != nil {
log.Printf("Failed to save audio clip: %v", err)
return err
}

if a.Settings.Debug {
log.Printf("Saved audio clip to %s\n", a.Note.ClipName)
log.Printf("detection time %v, begin time %v, end time %v\n", a.Note.Time, a.Note.BeginTime, time.Now())
}
// Create a SaveAudioAction and execute it
saveAudioAction := SaveAudioAction{
Settings: a.Settings,
ClipName: a.Note.ClipName,
pcmData: pcmData,
}

return nil
if err := saveAudioAction.Execute(nil); err != nil {
log.Printf("Failed to save audio clip: %v", err)
return err
}

if a.Settings.Debug {
log.Printf("Saved audio clip to %s\n", a.Note.ClipName)
log.Printf("detection time %v, begin time %v, end time %v\n", a.Note.Time, a.Note.BeginTime, time.Now())
}
}

//log.Printf("Database save action throttled for species: %s", species)
return nil
}

Expand Down Expand Up @@ -170,30 +170,31 @@ func (a SaveAudioAction) Execute(data interface{}) error {
func (a BirdWeatherAction) Execute(data interface{}) error {
species := strings.ToLower(a.Note.CommonName)

if a.EventTracker.TrackEvent(species, BirdWeatherSubmit) {
// Add threshold check here
if a.Note.Confidence < float64(a.Settings.Realtime.Birdweather.Threshold) {
if a.Settings.Debug {
log.Printf("Skipping BirdWeather upload for %s: confidence %.2f below threshold %.2f\n",
species, a.Note.Confidence, a.Settings.Realtime.Birdweather.Threshold)
}
return nil
}

if a.BwClient == nil {
return fmt.Errorf("BirdWeather client is not initialized")
}
// Check event frequency
if !a.EventTracker.TrackEvent(species, BirdWeatherSubmit) {
return nil
}

if err := a.BwClient.Publish(a.Note, a.pcmData); err != nil {
log.Printf("error uploading to BirdWeather: %s\n", err)
return err
} else if a.Settings.Debug {
log.Printf("Uploaded %s to Birdweather\n", a.Note.ClipName)
// Add threshold check here
if a.Note.Confidence < float64(a.Settings.Realtime.Birdweather.Threshold) {
if a.Settings.Debug {
log.Printf("Skipping BirdWeather upload for %s: confidence %.2f below threshold %.2f\n",
species, a.Note.Confidence, a.Settings.Realtime.Birdweather.Threshold)
}
return nil
}
//log.Printf("BirdWeather Submit action throttled for species: %s", species)
return nil // return an error if the action fails

if a.BwClient == nil {
return fmt.Errorf("BirdWeather client is not initialized")
}

if err := a.BwClient.Publish(a.Note, a.pcmData); err != nil {
log.Printf("error uploading to BirdWeather: %s\n", err)
return err
} else if a.Settings.Debug {
log.Printf("Uploaded %s to Birdweather\n", a.Note.ClipName)
}
return nil
}

type NoteWithBirdImage struct {
Expand All @@ -203,6 +204,13 @@ type NoteWithBirdImage struct {

// Execute sends the note to the MQTT broker
func (a MqttAction) Execute(data interface{}) error {
species := strings.ToLower(a.Note.CommonName)

// Check event frequency
if !a.EventTracker.TrackEvent(species, MQTTPublish) {
return nil
}

// First, check if the MQTT client is connected
if !a.MqttClient.IsConnected() {
log.Println("MQTT client is not connected, skipping publish")
Expand Down
22 changes: 16 additions & 6 deletions internal/analysis/processor/eventtracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,24 @@ type EventTracker struct {
Mutex sync.Mutex // Mutex to ensure thread-safe access
}

// NewEventTracker initializes a new EventTracker with default event handlers.
func NewEventTracker() *EventTracker {
// Add this new struct to hold configuration
type EventTrackerConfig struct {
DatabaseSaveInterval time.Duration
LogToFileInterval time.Duration
NotificationInterval time.Duration
BirdWeatherSubmitInterval time.Duration
MQTTPublishInterval time.Duration
}

// Modify NewEventTracker to accept configuration
func NewEventTracker(interval time.Duration) *EventTracker {
return &EventTracker{
Handlers: map[EventType]*EventHandler{
DatabaseSave: NewEventHandler(15*time.Second, StandardEventBehavior),
LogToFile: NewEventHandler(15*time.Second, StandardEventBehavior),
SendNotification: NewEventHandler(60*time.Minute, StandardEventBehavior),
BirdWeatherSubmit: NewEventHandler(15*time.Second, StandardEventBehavior),
DatabaseSave: NewEventHandler(interval, StandardEventBehavior),
LogToFile: NewEventHandler(interval, StandardEventBehavior),
SendNotification: NewEventHandler(interval, StandardEventBehavior),
BirdWeatherSubmit: NewEventHandler(interval, StandardEventBehavior),
MQTTPublish: NewEventHandler(interval, StandardEventBehavior),
},
}
}
Expand Down
2 changes: 1 addition & 1 deletion internal/analysis/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func New(settings *conf.Settings, ds datastore.Interface, bn *birdnet.BirdNET, m
Ds: ds,
Bn: bn,
BirdImageCache: birdImageCache,
EventTracker: NewEventTracker(),
EventTracker: NewEventTracker(time.Duration(settings.Realtime.Interval) * time.Second),
Metrics: metrics,
LastDogDetection: make(map[string]time.Time),
LastHumanDetection: make(map[string]time.Time),
Expand Down

0 comments on commit 0f87b8d

Please sign in to comment.