-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathkafka.go
152 lines (137 loc) · 3.89 KB
/
kafka.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package eevee
import (
"context"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
// KafkaConnectionConfig configures the connection to Kafka and defines what topics are accessed.
type KafkaConnectionConfig struct {
// Server is passed to both the consumer and the producer as "bootstrap.servers".
Server string
// Topics is the list of topics the consumer subscribes to when the connection is started.
Topics []string
// ClientID is passed to the consumer as its "group.id".
ClientID string
}
// KafkaConnection manages and abstracts the connection to Kafka.
// It pairs one consumer and one producer with the channels used to exchange
// payloads with the rest of the package.
type KafkaConnection struct {
// consumer reads from the subscribed topics.
consumer *kafka.Consumer
// producer publishes outgoing payloads.
producer *kafka.Producer
// config is the configuration the connection was created with.
config KafkaConnectionConfig
// in carries payloads built from consumed Kafka messages; exposed by In.
in chan Payload
// out carries payloads waiting to be published to Kafka; exposed by Out.
out chan Payload
// routeStatus carries route status values, which this connection reads and discards.
routeStatus chan RouteStatus
}
// NewKafkaConnection creates a Kafka consumer and producer against config.Server
// and returns a KafkaConnection wrapping them. The consumer joins the group named
// by config.ClientID; topics are not subscribed until Start is called.
//
// On failure the error is logged and returned with a nil connection, leaving the
// decision about how to react to the caller. If the producer cannot be created,
// the already-created consumer is closed so it is not leaked.
func NewKafkaConnection(config KafkaConnectionConfig) (*KafkaConnection, error) {
	// config link: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":               config.Server,
		"group.id":                        config.ClientID,
		"session.timeout.ms":              6000,
		"go.events.channel.enable":        true,
		"go.application.rebalance.enable": true,
		"default.topic.config":            kafka.ConfigMap{"auto.offset.reset": "earliest"},
	})
	if err != nil {
		log.Errorf("Kafka failed to create consumer: %s", err)
		return nil, err
	}

	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": config.Server})
	if err != nil {
		log.Errorf("Kafka failed to create producer: %s", err)
		// The consumer was created successfully above; release it before bailing out.
		if closeErr := c.Close(); closeErr != nil {
			log.Errorf("Kafka failed to close consumer: %s", closeErr)
		}
		return nil, err
	}

	return &KafkaConnection{
		consumer:    c,
		producer:    p,
		config:      config,
		in:          make(chan Payload),
		out:         make(chan Payload),
		routeStatus: make(chan RouteStatus),
	}, nil
}
// Start subscribes the consumer to the configured topics and launches the
// goroutines that receive messages, send messages, and drain route status
// values. Each goroutine runs until ctx is cancelled.
//
// A subscription failure is reported through log.Fatalf together with the
// underlying error; no goroutines are started in that case.
func (kc *KafkaConnection) Start(ctx context.Context) {
	if err := kc.consumer.SubscribeTopics(kc.config.Topics, nil); err != nil {
		log.Fatalf("Kafka could not subscribe to topics: %v", err)
		// Guard in case the logger does not terminate the process: never run the
		// workers against a consumer that has no subscription.
		return
	}

	go kc.receiveMessages(ctx)
	go kc.sendMessages(ctx)
	go kc.ignoreRouteStatus(ctx)
	log.Info("Kafka client has started")
}
// In returns the receive-only channel on which payloads built from consumed
// Kafka messages are delivered.
func (kc *KafkaConnection) In() <-chan Payload {
	var inbound <-chan Payload = kc.in
	return inbound
}
// Out returns the send-only channel for payloads that should be published to Kafka.
func (kc *KafkaConnection) Out() chan<- Payload {
	var outbound chan<- Payload = kc.out
	return outbound
}
// RouteStatus returns the send-only channel for route status values (errors
// from eevee). The Kafka connection reads and discards whatever is sent on it.
func (kc *KafkaConnection) RouteStatus() chan<- RouteStatus {
	var statuses chan<- RouteStatus = kc.routeStatus
	return statuses
}
// receiveMessages pumps consumer events until ctx is cancelled. Kafka messages
// are converted to Payloads and delivered on kc.in; rebalance events are applied
// to the consumer; errors are logged.
//
// On exit the consumer is closed and kc.in is closed so that readers of In()
// observe the shutdown.
func (kc *KafkaConnection) receiveMessages(ctx context.Context) {
	// Shutdown is deferred so it runs on every exit path, including cancellation
	// that arrives while a delivery on kc.in is pending.
	defer func() {
		log.Info("Stopping Kafka consumer")
		if err := kc.consumer.Close(); err != nil {
			log.Errorf("Kafka error: %v", err)
		}
		close(kc.in)
	}()

	for {
		select {
		case <-ctx.Done():
			return
		case event := <-kc.consumer.Events():
			switch e := event.(type) {
			case *kafka.Message:
				if e.TopicPartition.Topic == nil {
					log.Criticalf("Topic was nil?")
					continue
				}
				log.Debug("Kafka received message")
				// Deliver, but stay cancellable: without this a reader that has
				// stopped draining In() would block this goroutine forever and
				// the consumer would never be closed.
				select {
				case kc.in <- NewPayload(e.Value, *e.TopicPartition.Topic):
				case <-ctx.Done():
					return
				}
			case kafka.Error:
				log.Errorf("Kafka error: %v", e)
			case kafka.AssignedPartitions:
				log.Infof("Kafka parition: %v", e)
				if err := kc.consumer.Assign(e.Partitions); err != nil {
					log.Errorf("Kafka error: %v", err)
				}
			case kafka.RevokedPartitions:
				log.Infof("Kafka parition: %v", e)
				if err := kc.consumer.Unassign(); err != nil {
					log.Errorf("Kafka error: %v", err)
				}
			case kafka.PartitionEOF:
				// Intentionally ignored.
				// log.Infof("Kafka parition: %v", e)
			}
		}
	}
}
// sendMessages forwards every payload received on kc.out to the producer's
// produce channel until ctx is cancelled, then closes the producer.
//
// kc.out is deliberately NOT closed here: this goroutine is the receiver, and
// the senders are external callers holding Out(). Closing it from this side
// would make any in-flight or later send panic.
//
// NOTE(review): nothing in this file reads kc.producer.Events(); confirm that
// delivery reports are drained or disabled elsewhere.
func (kc *KafkaConnection) sendMessages(ctx context.Context) {
	defer func() {
		log.Info("Stopping Kafka publisher")
		kc.producer.Close()
	}()

	for {
		select {
		case <-ctx.Done():
			return
		case payload := <-kc.out:
			log.Debug("Kafka sending message")
			// Hand the message to the producer, but give up if the context is
			// cancelled while the produce channel is not accepting messages.
			select {
			case kc.producer.ProduceChannel() <- kc.createKafkaMessage(payload):
			case <-ctx.Done():
				return
			}
		}
	}
}
// createKafkaMessage wraps payload in a Kafka message addressed to the
// payload's topic. The partition is left as kafka.PartitionAny and the message
// value is the payload's raw bytes.
func (kc *KafkaConnection) createKafkaMessage(payload Payload) *kafka.Message {
	destination := kafka.TopicPartition{
		Topic:     &payload.Topic,
		Partition: kafka.PartitionAny,
	}

	msg := new(kafka.Message)
	msg.TopicPartition = destination
	msg.Value = payload.RawMessage
	return msg
}
// ignoreRouteStatus reads and discards every value sent on kc.routeStatus
// until ctx is cancelled, so that senders on RouteStatus() are not blocked
// while this goroutine is running.
func (kc *KafkaConnection) ignoreRouteStatus(ctx context.Context) {
	done := ctx.Done()
	for {
		select {
		case <-kc.routeStatus:
			// Drop the status; the Kafka connection has no use for it.
		case <-done:
			return
		}
	}
}