Skip to content

Commit

Permalink
Merge pull request #98 from janvrska/mqtt-support
Browse files Browse the repository at this point in the history
Add MQTT support
  • Loading branch information
tphakala authored Apr 5, 2024
2 parents 7dd43f6 + 0551f70 commit 117c9c1
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 0 deletions.
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ require (
)

require (
github.com/eclipse/paho.mqtt.golang v1.4.3 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-audio/riff v1.0.0 // indirect
github.com/go-sql-driver/mysql v1.7.0 // indirect
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
Expand All @@ -49,6 +51,7 @@ require (
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/net v0.21.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/time v0.5.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik=
github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
Expand All @@ -21,6 +23,8 @@ github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keL
github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
Expand Down Expand Up @@ -102,6 +106,8 @@ golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjs
golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k=
golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE=
golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
Expand Down
44 changes: 44 additions & 0 deletions internal/analysis/processor/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package processor

import (
"encoding/json"
"fmt"
"log"
"strings"
Expand All @@ -12,6 +13,7 @@ import (
"github.com/tphakala/birdnet-go/internal/birdweather"
"github.com/tphakala/birdnet-go/internal/conf"
"github.com/tphakala/birdnet-go/internal/datastore"
"github.com/tphakala/birdnet-go/internal/mqtt"
"github.com/tphakala/birdnet-go/internal/myaudio"
"github.com/tphakala/birdnet-go/internal/observation"
)
Expand Down Expand Up @@ -51,6 +53,13 @@ type BirdWeatherAction struct {
EventTracker *EventTracker
}

type MqttAction struct {
Settings *conf.Settings
Note datastore.Note
MqttClient *mqtt.Client
EventTracker *EventTracker
}

type UpdateRangeFilterAction struct {
Bn *birdnet.BirdNET
IncludedSpecies *[]string
Expand Down Expand Up @@ -140,6 +149,41 @@ func (a BirdWeatherAction) Execute(data interface{}) error {
return nil // return an error if the action fails
}

// Execute sends the note to the MQTT broker
import (
"errors"
"encoding/json"
"fmt"
"log"
"strings"
"time"

"github.com/tphakala/birdnet-go/internal/birdnet"
"github.com/tphakala/birdnet-go/internal/birdweather"
"github.com/tphakala/birdnet-go/internal/conf"
"github.com/tphakala/birdnet-go/internal/datastore"
"github.com/tphakala/birdnet-go/internal/mqtt"
"github.com/tphakala/birdnet-go/internal/myaudio"
"github.com/tphakala/birdnet-go/internal/observation"
)
func (a MqttAction) Execute(data interface{}) error {
if a.Settings.Realtime.MQTT.Topic == "" {
return errors.New("MQTT topic is not specified")
}
noteJson, err := json.Marshal(a.Note)
if err != nil {
log.Printf("error marshalling note to JSON: %s\n", err)
return err
}

err = a.MqttClient.Publish(a.Settings.Realtime.MQTT.Topic, string(noteJson))
if err != nil {
return err
}

return nil // return an error if the action fails
}

func (a UpdateRangeFilterAction) Execute(data interface{}) error {
today := time.Now().Truncate(24 * time.Hour)
if today.After(*a.SpeciesListUpdated) {
Expand Down
17 changes: 17 additions & 0 deletions internal/analysis/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/tphakala/birdnet-go/internal/birdweather"
"github.com/tphakala/birdnet-go/internal/conf"
"github.com/tphakala/birdnet-go/internal/datastore"
"github.com/tphakala/birdnet-go/internal/mqtt"
"github.com/tphakala/birdnet-go/internal/myaudio"
"github.com/tphakala/birdnet-go/internal/observation"
)
Expand All @@ -21,6 +22,7 @@ type Processor struct {
Ds datastore.Interface
Bn *birdnet.BirdNET
BwClient *birdweather.BwClient
MqttClient *mqtt.Client
EventTracker *EventTracker
DogBarkFilter DogBarkFilter
SpeciesConfig SpeciesConfig
Expand Down Expand Up @@ -87,6 +89,21 @@ func New(settings *conf.Settings, ds datastore.Interface, bn *birdnet.BirdNET, a
p.BwClient = birdweather.New(settings)
}

if settings.Realtime.MQTT.Enabled {
p.MqttClient = mqtt.New(settings)

// Connect to the MQTT broker in a separate goroutine to avoid blocking the main thread.
go func() {
log.Println("Connecting to MQTT broker")
err := p.MqttClient.Connect()
if err != nil {
log.Printf("Failed to connect to MQTT broker: %s", err)
} else {
log.Println("Successfully connected to MQTT broker")
}
}()
}

// Initialize included species list
today := time.Now().Truncate(24 * time.Hour)
*p.IncludedSpecies = bn.GetProbableSpecies()
Expand Down
7 changes: 7 additions & 0 deletions internal/analysis/processor/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,13 @@ func (p *Processor) getDefaultActions(detection Detections) []Action {
if p.Settings.Realtime.Birdweather.Enabled {
actions = append(actions, BirdWeatherAction{Settings: p.Settings, EventTracker: p.EventTracker, BwClient: p.BwClient, Note: detection.Note, pcmData: detection.pcmData3s})
}
if p.Settings.Realtime.MQTT.Enabled {
if p.MqttClient == nil {
log.Println("MQTT client is not initialized, skipping MQTT action")
return actions
}
actions = append(actions, MqttAction{Settings: p.Settings, MqttClient: p.MqttClient, EventTracker: p.EventTracker, Note: detection.Note})
}
// Check if UpdateRangeFilterAction needs to be executed for the day
today := time.Now().Truncate(24 * time.Hour) // Current date with time set to midnight
if p.SpeciesListUpdated.Before(today) {
Expand Down
30 changes: 30 additions & 0 deletions internal/conf/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,29 @@ type Settings struct {
Url string // RTSP stream URL
Transport string // RTSP Transport Protocol
}

MQTT struct {
Enabled bool // true to enable MQTT
Broker string // MQTT (tcp://host:port)
Topic string // MQTT topic
Username string // MQTT username
Password string // MQTT password
}
func Load() (*Settings, error) {

Check failure on line 80 in internal/conf/config.go

View workflow job for this annotation

GitHub Actions / lint

syntax error: unexpected func, expected field name or embedded type

Check failure on line 80 in internal/conf/config.go

View workflow job for this annotation

GitHub Actions / lint

expected '}', found 'func' (typecheck)

Check failure on line 80 in internal/conf/config.go

View workflow job for this annotation

GitHub Actions / lint

syntax error: unexpected func, expected field name or embedded type
settings := &Settings{}
if err := initViper(); err != nil {

Check failure on line 82 in internal/conf/config.go

View workflow job for this annotation

GitHub Actions / lint

syntax error: unexpected if, expected field name or embedded type

Check failure on line 82 in internal/conf/config.go

View workflow job for this annotation

GitHub Actions / lint

expected '}', found 'if' (typecheck)

Check failure on line 82 in internal/conf/config.go

View workflow job for this annotation

GitHub Actions / lint

syntax error: unexpected if, expected field name or embedded type
return nil, fmt.Errorf("error initializing viper: %w", err)

Check failure on line 83 in internal/conf/config.go

View workflow job for this annotation

GitHub Actions / lint

syntax error: unexpected comma in struct type; possibly missing semicolon or newline or } (typecheck)

Check failure on line 83 in internal/conf/config.go

View workflow job for this annotation

GitHub Actions / lint

expected declaration, found 'return' (typecheck)
}
if err := viper.Unmarshal(settings); err != nil {
return nil, fmt.Errorf("error unmarshaling config into struct: %w", err)
}
if settings.Realtime.MQTT.Enabled {
if settings.Realtime.MQTT.Broker == "" {
return nil, errors.New("MQTT broker URL is required when MQTT is enabled")
}
}
return settings, nil
}
}

WebServer struct {
Expand Down Expand Up @@ -238,6 +261,13 @@ realtime:
url: # RTSP stream URL
transport: tcp # RTSP Transport Protocol
mqtt:
enabled: false # true to enable MQTT
broker: tcp://localhost:1883 # MQTT (tcp://host:port)
topic: birdnet # MQTT topic
username: birdnet # MQTT username
password: secret # MQTT password
privacyfilter:
enabled: true
Expand Down
72 changes: 72 additions & 0 deletions internal/mqtt/mqtt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package mqtt

import (
"errors"
"log"

mqtt "github.com/eclipse/paho.mqtt.golang"

"github.com/tphakala/birdnet-go/internal/conf"
)

type Client struct {
Settings *conf.Settings
internalClient mqtt.Client
}

func New(settings *conf.Settings) (*Client, error) {
if settings == nil || settings.Realtime.MQTT.Broker == "" {
return nil, errors.New("invalid MQTT settings provided")
}
return &Client{
Settings: settings,
}, nil
}

// Connect to MQTT broker
func (c *Client) Connect() error {
opts := mqtt.NewClientOptions()
opts.AddBroker(c.Settings.Realtime.MQTT.Broker)
opts.SetClientID("birdnet-go")
opts.SetUsername(c.Settings.Realtime.MQTT.Username)
opts.SetPassword(c.Settings.Realtime.MQTT.Password)
opts.SetCleanSession(true)
opts.SetAutoReconnect(true)
opts.SetOnConnectHandler(c.onConnect)
opts.SetConnectionLostHandler(c.onConnectionLost)
opts.SetConnectRetry(true)

client := mqtt.NewClient(opts)
c.internalClient = client

// It will wait infinitely until the connection is established
if token := client.Connect(); token.Wait() && token.Error() != nil {
log.Printf("Failed to connect to MQTT broker: %s", token.Error())
return errors.New("failed to connect to MQTT broker")
}

return nil
}

// Publish a message to a topic
func (c *Client) Publish(topic string, payload string) error {
if c.internalClient == nil {
return errors.New("MQTT client is not initialized")
}

if c.internalClient.IsConnected() == false {
return errors.New("MQTT client is not connected")
}

token := c.internalClient.Publish(topic, 0, false, payload)
token.Wait()
return token.Error()
}

func (c *Client) onConnect(client mqtt.Client) {
log.Printf("Connected to MQTT broker: %s", c.Settings.Realtime.MQTT.Broker)
}

func (c *Client) onConnectionLost(client mqtt.Client, err error) {
log.Printf("Connection to MQTT broker lost: %s, error: %v", c.Settings.Realtime.MQTT.Broker, err)
}

0 comments on commit 117c9c1

Please sign in to comment.