Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add MQTT support #98

Merged
merged 9 commits into from
Apr 5, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ require (
)

require (
github.com/eclipse/paho.mqtt.golang v1.4.3 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-audio/riff v1.0.0 // indirect
github.com/go-sql-driver/mysql v1.7.0 // indirect
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
Expand All @@ -49,6 +51,7 @@ require (
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/net v0.21.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/time v0.5.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik=
github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
Expand All @@ -21,6 +23,8 @@ github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keL
github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
Expand Down Expand Up @@ -102,6 +106,8 @@ golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjs
golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k=
golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE=
golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
Expand Down
28 changes: 28 additions & 0 deletions internal/analysis/processor/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package processor

import (
"encoding/json"
"fmt"
"log"
"strings"
Expand All @@ -12,6 +13,7 @@
"github.com/tphakala/birdnet-go/internal/birdweather"
"github.com/tphakala/birdnet-go/internal/conf"
"github.com/tphakala/birdnet-go/internal/datastore"
"github.com/tphakala/birdnet-go/internal/mqtt"
"github.com/tphakala/birdnet-go/internal/myaudio"
"github.com/tphakala/birdnet-go/internal/observation"
)
Expand Down Expand Up @@ -51,6 +53,13 @@
EventTracker *EventTracker
}

type MqttAction struct {
Settings *conf.Settings
Note datastore.Note
MqttClient *mqtt.Client
EventTracker *EventTracker
}

type UpdateRangeFilterAction struct {
Bn *birdnet.BirdNET
IncludedSpecies *[]string
Expand Down Expand Up @@ -140,6 +149,25 @@
return nil // return an error if the action fails
}

// Execute sends the note to the MQTT broker
func (a MqttAction) Execute(data interface{}) error {
if a.Settings.Realtime.MQTT.Topic == "" {
return errors.New("MQTT topic is not specified")

Check failure on line 155 in internal/analysis/processor/actions.go

View workflow job for this annotation

GitHub Actions / lint

undefined: errors (typecheck)
}
noteJson, err := json.Marshal(a.Note)
if err != nil {
log.Printf("error marshalling note to JSON: %s\n", err)
return err
}

err = a.MqttClient.Publish(a.Settings.Realtime.MQTT.Topic, string(noteJson))
if err != nil {
return err
}

return nil // return an error if the action fails
}
tphakala marked this conversation as resolved.
Show resolved Hide resolved

func (a UpdateRangeFilterAction) Execute(data interface{}) error {
today := time.Now().Truncate(24 * time.Hour)
if today.After(*a.SpeciesListUpdated) {
Expand Down
17 changes: 17 additions & 0 deletions internal/analysis/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/tphakala/birdnet-go/internal/birdweather"
"github.com/tphakala/birdnet-go/internal/conf"
"github.com/tphakala/birdnet-go/internal/datastore"
"github.com/tphakala/birdnet-go/internal/mqtt"
"github.com/tphakala/birdnet-go/internal/myaudio"
"github.com/tphakala/birdnet-go/internal/observation"
)
Expand All @@ -21,6 +22,7 @@ type Processor struct {
Ds datastore.Interface
Bn *birdnet.BirdNET
BwClient *birdweather.BwClient
MqttClient *mqtt.Client
EventTracker *EventTracker
DogBarkFilter DogBarkFilter
SpeciesConfig SpeciesConfig
Expand Down Expand Up @@ -87,6 +89,21 @@ func New(settings *conf.Settings, ds datastore.Interface, bn *birdnet.BirdNET, a
p.BwClient = birdweather.New(settings)
}

if settings.Realtime.MQTT.Enabled {
p.MqttClient = mqtt.New(settings)

// Connect to the MQTT broker in a separate goroutine to avoid blocking the main thread.
go func() {
log.Println("Connecting to MQTT broker")
err := p.MqttClient.Connect()
if err != nil {
log.Printf("Failed to connect to MQTT broker: %s", err)
} else {
log.Println("Successfully connected to MQTT broker")
}
}()
}

// Initialize included species list
today := time.Now().Truncate(24 * time.Hour)
*p.IncludedSpecies = bn.GetProbableSpecies()
Expand Down
3 changes: 3 additions & 0 deletions internal/analysis/processor/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ func (p *Processor) getDefaultActions(detection Detections) []Action {
if p.Settings.Realtime.Birdweather.Enabled {
actions = append(actions, BirdWeatherAction{Settings: p.Settings, EventTracker: p.EventTracker, BwClient: p.BwClient, Note: detection.Note, pcmData: detection.pcmData3s})
}
if p.Settings.Realtime.MQTT.Enabled {
actions = append(actions, MqttAction{Settings: p.Settings, MqttClient: p.MqttClient, EventTracker: p.EventTracker, Note: detection.Note})
}
tphakala marked this conversation as resolved.
Show resolved Hide resolved
// Check if UpdateRangeFilterAction needs to be executed for the day
today := time.Now().Truncate(24 * time.Hour) // Current date with time set to midnight
if p.SpeciesListUpdated.Before(today) {
Expand Down
15 changes: 15 additions & 0 deletions internal/conf/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,14 @@ type Settings struct {
Url string // RTSP stream URL
Transport string // RTSP Transport Protocol
}

MQTT struct {
Enabled bool // true to enable MQTT
Broker string // MQTT (tcp://host:port)
Topic string // MQTT topic
Username string // MQTT username
Password string // MQTT password
}
tphakala marked this conversation as resolved.
Show resolved Hide resolved
}

WebServer struct {
Expand Down Expand Up @@ -238,6 +246,13 @@ realtime:
url: # RTSP stream URL
transport: tcp # RTSP Transport Protocol

mqtt:
enabled: false # true to enable MQTT
broker: tcp://localhost:1883 # MQTT (tcp://host:port)
topic: birdnet # MQTT topic
username: birdnet # MQTT username
password: secret # MQTT password

privacyfilter:
enabled: true

Expand Down
68 changes: 68 additions & 0 deletions internal/mqtt/mqtt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package mqtt

import (
"errors"
"log"

mqtt "github.com/eclipse/paho.mqtt.golang"

"github.com/tphakala/birdnet-go/internal/conf"
)

type Client struct {
Settings *conf.Settings
internalClient mqtt.Client
}

func New(settings *conf.Settings) *Client {
return &Client{
Settings: settings,
}
tphakala marked this conversation as resolved.
Show resolved Hide resolved
}

// Connect to MQTT broker
func (c *Client) Connect() error {
opts := mqtt.NewClientOptions()
opts.AddBroker(c.Settings.Realtime.MQTT.Broker)
opts.SetClientID("birdnet-go")
opts.SetUsername(c.Settings.Realtime.MQTT.Username)
opts.SetPassword(c.Settings.Realtime.MQTT.Password)
opts.SetCleanSession(true)
opts.SetAutoReconnect(true)
opts.SetOnConnectHandler(c.onConnect)
opts.SetConnectionLostHandler(c.onConnectionLost)
opts.SetConnectRetry(true)

client := mqtt.NewClient(opts)
c.internalClient = client

// It will wait infinitely until the connection is established
if token := client.Connect(); token.Wait() && token.Error() != nil {
return token.Error()
}

return nil
tphakala marked this conversation as resolved.
Show resolved Hide resolved
}

// Publish a message to a topic
func (c *Client) Publish(topic string, payload string) error {
if c.internalClient == nil {
return errors.New("MQTT client is not initialized")
}

if c.internalClient.IsConnected() == false {

Check failure on line 53 in internal/mqtt/mqtt.go

View workflow job for this annotation

GitHub Actions / lint

S1002: should omit comparison to bool constant, can be simplified to `!c.internalClient.IsConnected()` (gosimple)
return errors.New("MQTT client is not connected")
}

token := c.internalClient.Publish(topic, 0, false, payload)
token.Wait()
return token.Error()
Comment on lines +52 to +63
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add QoS (Quality of Service) and retain flag as configurable options for the Publish method to enhance flexibility.

- func (c *Client) Publish(topic string, payload string) error {
+ func (c *Client) Publish(topic string, payload string, qos byte, retain bool) error {
-   token := c.internalClient.Publish(topic, 0, false, payload)
+   token := c.internalClient.Publish(topic, qos, retain, payload)

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
func (c *Client) Publish(topic string, payload string) error {
if c.internalClient == nil {
return errors.New("MQTT client is not initialized")
}
if c.internalClient.IsConnected() == false {
return errors.New("MQTT client is not connected")
}
token := c.internalClient.Publish(topic, 0, false, payload)
token.Wait()
return token.Error()
func (c *Client) Publish(topic string, payload string, qos byte, retain bool) error {
if c.internalClient == nil {
return errors.New("MQTT client is not initialized")
}
if c.internalClient.IsConnected() == false {
return errors.New("MQTT client is not connected")
}
token := c.internalClient.Publish(topic, qos, retain, payload)
token.Wait()
return token.Error()

}

func (c *Client) onConnect(client mqtt.Client) {
log.Println("Connected to MQTT broker")
}

func (c *Client) onConnectionLost(client mqtt.Client, err error) {
log.Println("Connection to MQTT broker lost")
}
tphakala marked this conversation as resolved.
Show resolved Hide resolved
Loading