Skip to content

Commit

Permalink
Merge pull request #121 from isZumpo/initial_retention_policy
Browse files Browse the repository at this point in the history
Add initial retention policy
  • Loading branch information
tphakala authored Apr 16, 2024
2 parents f4cbefb + 365730a commit 784b06f
Show file tree
Hide file tree
Showing 4 changed files with 184 additions and 0 deletions.
41 changes: 41 additions & 0 deletions internal/analysis/realtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os/signal"
"sync"
"syscall"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand Down Expand Up @@ -99,6 +100,8 @@ func RealtimeAnalysis(settings *conf.Settings) error {
startBufferMonitor(&wg, bn, quitChan)
// start audio capture
startAudioCapture(&wg, settings, quitChan, restartChan, audioBuffer)
// start cleanup of clips
startClipCleanupMonitor(&wg, settings, dataStore, quitChan)

// start quit signal monitor
monitorCtrlC(quitChan)
Expand All @@ -122,6 +125,7 @@ func RealtimeAnalysis(settings *conf.Settings) error {
startAudioCapture(&wg, settings, quitChan, restartChan, audioBuffer)
}
}

}

// startAudioCapture initializes and starts the audio capture routine in a new goroutine.
Expand All @@ -136,6 +140,12 @@ func startBufferMonitor(wg *sync.WaitGroup, bn *birdnet.BirdNET, quitChan chan s
go myaudio.BufferMonitor(wg, bn, quitChan)
}

// startClipCleanupMonitor initializes and starts the clip cleanup monitoring routine in a new goroutine.
func startClipCleanupMonitor(wg *sync.WaitGroup, settings *conf.Settings, dataStore datastore.Interface, quitChan chan struct{}) {
wg.Add(1)
go ClipCleanupMonitor(wg, settings, dataStore, quitChan)
}

// monitorCtrlC listens for the SIGINT (Ctrl+C) signal and triggers the application shutdown process.
func monitorCtrlC(quitChan chan struct{}) {
go func() {
Expand All @@ -157,3 +167,34 @@ func closeDataStore(store datastore.Interface) {
log.Println("Successfully closed database")
}
}

// ClipCleanupMonitor monitors the database and deletes clips that meets the retention policy.
func ClipCleanupMonitor(wg *sync.WaitGroup, settings *conf.Settings, dataStore datastore.Interface, quitChan chan struct{}) {
defer wg.Done()

// Creating a ticker that ticks every 1 minute
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()

for {
select {
case <-quitChan:
// Quit signal received, stop the clip cleanup monitor
return

case <-ticker.C: // Wait for the next tick
clipsForRemoval, _ := dataStore.GetClipsQualifyingForRemoval(settings.Realtime.Retention.MinEvictionHours, settings.Realtime.Retention.MinClipsPerSpecies)

log.Printf("Found %d clips to remove\n", len(clipsForRemoval))

for _, clip := range clipsForRemoval {
if err := os.Remove(clip.ClipName); err != nil {
log.Printf("Failed to remove %s: %s\n", clip.ClipName, err)
} else {
log.Printf("Removed %s\n", clip.ClipName)
}
dataStore.DeleteNoteClipPath(clip.ID)
}
}
}
}
10 changes: 10 additions & 0 deletions internal/conf/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ type Settings struct {
Enabled bool // true to enable dog bark filter
}

Retention struct {
MinEvictionHours int // minimum number of hours to keep audio clips
MinClipsPerSpecies int // minimum number of clips per species to keep
}

RTSP struct {
Url string // RTSP stream URL
Transport string // RTSP Transport Protocol
Expand Down Expand Up @@ -269,6 +274,11 @@ realtime:
dogbarkfilter:
enabled: true
retention:
enabled: true # true to enable retention policy of clips
minEvictionHours: 72 # minumum number of hours before considering clip for eviction
minClipsPerSpecies: 10 # minumum number of clips per species to keep before starting evictions
webserver:
enabled: true # true to enable web server
port: 8080 # port for web server
Expand Down
45 changes: 45 additions & 0 deletions internal/datastore/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ type Interface interface {
GetLastDetections(numDetections int) ([]Note, error)
SearchNotes(query string, sortAscending bool, limit int, offset int) ([]Note, error)
GetNoteClipPath(noteID string) (string, error)
DeleteNoteClipPath(noteID string) error
GetClipsQualifyingForRemoval(minHours int, minClips int) ([]ClipForRemoval, error)
}

// DataStore implements StoreInterface using a GORM database.
Expand Down Expand Up @@ -148,6 +150,19 @@ func (ds *DataStore) GetNoteClipPath(noteID string) (string, error) {
return clipPath.ClipName, nil
}

// DeleteNoteClipPath deletes the field representing the path to the audio clip associated with a note.
func (ds *DataStore) DeleteNoteClipPath(noteID string) error {
err := ds.DB.Model(&Note{}).
Where("id = ?", noteID).
Update("clip_name", "").Error

if err != nil {
return fmt.Errorf("failed to delete clip path: %w", err)
}

return nil
}

// GetAllNotes retrieves all notes from the database.
func (ds *DataStore) GetAllNotes() ([]Note, error) {
var notes []Note
Expand All @@ -174,6 +189,36 @@ func (ds *DataStore) GetTopBirdsData(selectedDate string, minConfidenceNormalize
return results, err
}

type ClipForRemoval struct {
ID string
ScientificName string
ClipName string
NumRecordings int
}

// GetClipsQualifyingForRemoval returns the list of clips that qualify for removal based on retention policy.
func (ds *DataStore) GetClipsQualifyingForRemoval(minHours int, minClips int) ([]ClipForRemoval, error) {

if minHours <= 0 || minClips <= 0 {
return []ClipForRemoval{}, nil
}

var results []ClipForRemoval

subquery := ds.DB.Model(&Note{}).
Select("ID, scientific_name, ROW_NUMBER () OVER ( PARTITION BY scientific_name ) num_recordings").
Where("clip_name != ''")

ds.DB.Model(&Note{}).
Select("n.ID, n.scientific_name, n.clip_name, sub.num_recordings").
Joins("n INNER JOIN (?) AS sub ON n.ID = sub.ID", subquery).
Where("(strftime('%s', 'now') - strftime('%s', begin_time)) / 3600 > ?", minHours).
Where("sub.num_recordings > ?", minClips).
Scan(&results)

return results, nil
}

// GetHourFormat returns the database-specific SQL fragment for formatting a time column as hour.
func (ds *DataStore) GetHourFormat() string {
// Handling for supported databases: SQLite and MySQL
Expand Down
88 changes: 88 additions & 0 deletions internal/datastore/interfaces_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package datastore

import (
"testing"
"time"

"github.com/tphakala/birdnet-go/internal/conf"
"github.com/tphakala/birdnet-go/internal/logger"
)

func createDatabase(t testing.TB, settings *conf.Settings) Interface {
tempDir := t.TempDir()
settings.Output.SQLite.Enabled = true
settings.Output.SQLite.Path = tempDir + "/test.db"

dataStore := New(settings)

// Open a connection to the database and handle possible errors.
if err := dataStore.Open(); err != nil {
logger.Error("main", "Failed to open database: %v", err)
} else {
t.Cleanup(func() { dataStore.Close() })
}

return dataStore
}

func TestGetClipsQualifyingForRemoval(t *testing.T) {

settings := &conf.Settings{}

dataStore := createDatabase(t, settings)

// One Cool bird should be removed since there is one too many
// No Amazing bird should be removed since there is only one
// While there are two Wonderful birds, only one of them are old enough, but too few to be removed
// While there are two Magnificent birds, only one of them have a clip, meaning that the remaining one should be kept
dataStore.Save(&Note{
ClipName: "test.wav",
ScientificName: "Cool bird",
BeginTime: time.Now().Add(-2 * time.Hour),
}, []Results{})
dataStore.Save(&Note{
ClipName: "test2.wav",
ScientificName: "Amazing bird",
BeginTime: time.Now().Add(-2 * time.Hour),
}, []Results{})
dataStore.Save(&Note{
ClipName: "test3.wav",
ScientificName: "Cool bird",
BeginTime: time.Now().Add(-2 * time.Hour),
}, []Results{})
dataStore.Save(&Note{
ClipName: "test4.wav",
ScientificName: "Wonderful bird",
BeginTime: time.Now().Add(-2 * time.Hour),
}, []Results{})
dataStore.Save(&Note{
ClipName: "test5.wav",
ScientificName: "Magnificent bird",
BeginTime: time.Now().Add(-2 * time.Hour),
}, []Results{})
dataStore.Save(&Note{
ClipName: "",
ScientificName: "Magnificent bird",
BeginTime: time.Now(),
}, []Results{})
dataStore.Save(&Note{
ClipName: "test7.wav",
ScientificName: "Wonderful bird",
BeginTime: time.Now(),
}, []Results{})

minHours := 1
minClips := 1

clipsForRemoval, err := dataStore.GetClipsQualifyingForRemoval(minHours, minClips)

if err != nil {
t.Errorf("Expected no error, got %v", err)
}
if len(clipsForRemoval) != 1 {
t.Errorf("Expected one entry in clipsForRemoval, got %d", len(clipsForRemoval))
}
if clipsForRemoval[0].ScientificName != "Cool bird" {
t.Errorf("Expected ScientificName to be 'Cool bird', got '%s'", clipsForRemoval[0].ScientificName)
}
}

0 comments on commit 784b06f

Please sign in to comment.