-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmqtt.go
113 lines (98 loc) · 2.74 KB
/
mqtt.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package eevee
import (
"context"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
// MqttConnectionConfig holds the settings used to connect to an MQTT broker
// and names the topics the connection subscribes to.
type MqttConnectionConfig struct {
// Server is the broker address handed to the client options via AddBroker.
Server string
// Topics lists the topics subscribed to when the connection is started.
Topics []string
// ClientID is the client identifier set on the client options.
ClientID string
// Qos is the quality-of-service level applied to every subscription and
// to every published message.
Qos byte
// Username is the user name set on the client options.
Username string
// Password is the password set on the client options.
Password string
}
// MqttConnection manages and abstracts the connection to MQTT.
type MqttConnection struct {
// client is the connected MQTT client used to subscribe, publish and disconnect.
client mqtt.Client
// config is the configuration the connection was created with.
config MqttConnectionConfig
// in carries payloads built from messages received on subscribed topics.
in chan Payload
// out carries payloads that sendMessages publishes to MQTT.
out chan Payload
// routeStatus receives route status values, which ignoreRouteStatus reads and discards.
routeStatus chan RouteStatus
}
// NewMqttConnection builds an MQTT client from config, connects it to the
// broker and returns a connection wrapping that client.
//
// The call blocks until the connect attempt has completed. If the attempt
// fails, the connect error is returned and the connection is nil. Topics are
// not subscribed here; that happens in Start.
func NewMqttConnection(config MqttConnectionConfig) (*MqttConnection, error) {
	options := mqtt.NewClientOptions()
	options.SetClientID(config.ClientID)
	options.AddBroker(config.Server)
	options.SetUsername(config.Username)
	options.SetPassword(config.Password)

	client := mqtt.NewClient(options)

	// Wait for the connect attempt to finish before inspecting its outcome.
	token := client.Connect()
	if token.Wait() {
		if err := token.Error(); err != nil {
			return nil, err
		}
	}

	conn := &MqttConnection{
		client:      client,
		config:      config,
		in:          make(chan Payload),
		out:         make(chan Payload),
		routeStatus: make(chan RouteStatus),
	}
	return conn, nil
}
// Start subscribes to the configured topics and launches the background
// goroutines that publish outgoing payloads, drain route statuses and shut
// the client down. Cancelling ctx stops all of them.
//
// Messages that arrive after ctx has been cancelled are dropped instead of
// being forwarded to the In channel.
func (mc *MqttConnection) Start(ctx context.Context) {
	receiveMessages := func(client mqtt.Client, message mqtt.Message) {
		log.Debug("MQTT received message")
		// After cancellation stop() closes mc.in; sending on it then would
		// panic, so late messages are discarded up front.
		if ctx.Err() != nil {
			return
		}
		select {
		case mc.in <- NewPayload(message.Payload(), message.Topic()):
		case <-ctx.Done():
			// Nobody consumed the payload before shutdown began; give up
			// rather than block the callback indefinitely.
		}
	}

	topics := addQOSToTopics(mc.config.Topics, mc.config.Qos)
	// NOTE(review): the token returned by SubscribeMultiple is discarded, so a
	// failed subscription is not reported anywhere.
	mc.client.SubscribeMultiple(topics, receiveMessages)

	go mc.stop(ctx)
	go mc.sendMessages(ctx)
	go mc.ignoreRouteStatus(ctx)
	log.Info("MQTT client has started")
}
// In returns the receive-only channel on which payloads built from messages
// arriving on the subscribed MQTT topics are delivered.
func (mc *MqttConnection) In() <-chan Payload {
return mc.in
}
// Out returns the send-only channel for outgoing payloads. Each payload sent
// on it is published to MQTT on the payload's own topic using the configured
// QoS.
func (mc *MqttConnection) Out() chan<- Payload {
return mc.out
}
// RouteStatus returns the send-only channel for route status values. This
// connection reads and discards everything sent on it (see ignoreRouteStatus).
func (mc *MqttConnection) RouteStatus() chan<- RouteStatus {
return mc.routeStatus
}
// sendMessages publishes every payload received on mc.out to the payload's
// topic with the configured QoS and the retained flag off. It returns when
// ctx is cancelled or when mc.out has been closed.
func (mc *MqttConnection) sendMessages(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case payload, ok := <-mc.out:
			if !ok {
				// A closed channel yields zero-value payloads forever; stop
				// instead of publishing empty messages to an empty topic.
				return
			}
			log.Debug("MQTT sending message")
			mc.client.Publish(payload.Topic, mc.config.Qos, false, payload.RawMessage)
		}
	}
}
// stop blocks until ctx is cancelled, then disconnects the MQTT client and
// closes the inbound channel so that consumers of In observe the shutdown.
func (mc *MqttConnection) stop(ctx context.Context) {
	<-ctx.Done()
	log.Info("Stopping MQTT Client")
	// Disconnect's argument is the quiesce time in milliseconds granted to
	// in-flight work before the connection is torn down.
	mc.client.Disconnect(100)
	close(mc.in)
	// mc.out is deliberately left open: this type only receives from it, and
	// closing it here would make any producer still holding Out() panic with
	// "send on closed channel". The publishing loop exits on ctx cancellation.
}
// addQOSToTopics returns a map that associates every topic in topics with
// the same qos level. Duplicate topics collapse into a single entry, and the
// result is never nil, even when topics is empty.
func addQOSToTopics(topics []string, qos byte) map[string]byte {
	filters := make(map[string]byte, len(topics))
	for i := range topics {
		filters[topics[i]] = qos
	}
	return filters
}
// ignoreRouteStatus reads and discards every value sent on mc.routeStatus
// until ctx is cancelled, so that senders on that channel are not left
// blocked while the connection is running.
func (mc *MqttConnection) ignoreRouteStatus(ctx context.Context) {
	done := ctx.Done()
	for {
		select {
		case <-mc.routeStatus:
			// Route statuses are not used by this connection; drop them.
		case <-done:
			return
		}
	}
}