Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixes #75: allow labels to be hashed into topic partitions #76

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 25 additions & 3 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,24 @@ package main

import (
"fmt"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"gopkg.in/yaml.v2"
"os"
"strings"
"text/template"
"time"

dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"gopkg.in/yaml.v2"

"github.com/sirupsen/logrus"
)

var (
kafkaBrokerList = "kafka:9092"
kafkaTopic = "metrics"
kafkaPartitionLabels []string
kafkaMetadataTimeout = time.Second * 10
kafkaMetadataInterval = time.Minute * 5
topicTemplate *template.Template
match = make(map[string]*dto.MetricFamily, 0)
basicauth = false
Expand Down Expand Up @@ -63,6 +68,23 @@ func init() {
kafkaTopic = value
}

if value := os.Getenv("KAFKA_PARTITION_LABELS"); value != "" {
kafkaPartitionLabels = strings.Split(value, ",")
}

if value := os.Getenv("KAFKA_METADATA_TIMEOUT"); value != "" {
d, err := time.ParseDuration(value)
if err != nil {
logrus.WithError(err).Errorf("KAFKA_METADATA_TIMEOUT parsing failed, using default")
} else {
if d < 0 {
logrus.Errorf("KAFKA_METADATA_TIMEOUT does not support negative timeout")
} else {
kafkaMetadataTimeout = d
}
}
}

if value := os.Getenv("BASIC_AUTH_USERNAME"); value != "" {
basicauth = true
basicauthUsername = value
Expand Down
32 changes: 28 additions & 4 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ package main

import (
"fmt"
"hash/fnv"
"io/ioutil"
"net/http"
"strings"

"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -62,11 +64,16 @@ func receiveHandler(producer *kafka.Producer, serializer Serializer) func(c *gin
return
}

for topic, metrics := range metricsPerTopic {
t := topic
for topicAndHashKey, metrics := range metricsPerTopic {

topic, partitionID, err := getPartitionAndTopic(topicAndHashKey)
if err != nil {
continue
}

part := kafka.TopicPartition{
Partition: kafka.PartitionAny,
Topic: &t,
Partition: partitionID,
Topic: &topic,
}
for _, metric := range metrics {
objectsWritten.Add(float64(1))
Expand All @@ -87,3 +94,20 @@ func receiveHandler(producer *kafka.Producer, serializer Serializer) func(c *gin

}
}

func getPartitionAndTopic(topic string) (string, int32, error) {
parts := strings.Split(topic, "|")

if len(parts) == 1 {
return parts[0], kafka.PartitionAny, nil
}
h := fnv.New32a()
h.Write([]byte(parts[1]))

v, ok := topicPartitionCount.Load(parts[0])
if !ok {
logrus.WithField("topic", parts[0]).Error("did not find metadata requested topic")
return topic, kafka.PartitionAny, fmt.Errorf("could not")
}
return parts[0], int32(h.Sum32() % uint32(v.(int))), nil
}
10 changes: 10 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package main

import (
"context"
"time"

"github.com/confluentinc/confluent-kafka-go/kafka"
Expand All @@ -27,6 +28,9 @@ import (
func main() {
logrus.Info("creating kafka producer")

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

kafkaConfig := kafka.ConfigMap{
"bootstrap.servers": kafkaBrokerList,
"compression.codec": kafkaCompression,
Expand Down Expand Up @@ -68,6 +72,12 @@ func main() {
logrus.WithError(err).Fatal("couldn't create kafka producer")
}

if kafkaPartitionLabels != nil {
if err := syncTopicMetadata(ctx, producer); err != nil {
logrus.WithError(err).Fatal("couldn't fetch topic metadata")
}
}

r := gin.New()

r.Use(ginrus.Ginrus(logrus.StandardLogger(), time.RFC3339, true), gin.Recovery())
Expand Down
14 changes: 13 additions & 1 deletion serializers.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,22 @@ func NewAvroJSONSerializer(schemaPath string) (*AvroJSONSerializer, error) {
}

func topic(labels map[string]string) string {
var buf bytes.Buffer
var buf, buf2 bytes.Buffer
if err := topicTemplate.Execute(&buf, labels); err != nil {
return ""
}
for _, s := range kafkaPartitionLabels {
v, ok := labels[s]
if ok {
if _, err := buf2.WriteString(v); err != nil {
return ""
}
}
}
if buf2.Len() > 0 {
buf.WriteString("|")
buf.WriteString(buf2.String())
}
return buf.String()
}

Expand Down
47 changes: 47 additions & 0 deletions topic_metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package main

import (
"context"
"math"
"sync"
"time"

"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/sirupsen/logrus"
)

var topicPartitionCount sync.Map

type metaDataFetcher interface {
GetMetadata(topic *string, allTopics bool, timeoutMs int) (*kafka.Metadata, error)
}

func syncTopicMetadata(ctx context.Context, producer metaDataFetcher) error {

if err := processMetadata(producer); err != nil {
return err
}
go func() {
select {
case <-ctx.Done():
return

case <-time.After(kafkaMetadataInterval):
if err := processMetadata(producer); err != nil {
logrus.WithError(err).Error("could not fetch topic metadata")
}
}
}()
return nil
}

func processMetadata(producer metaDataFetcher) error {
metadata, err := producer.GetMetadata(nil, true, int(math.Ceil(kafkaMetadataTimeout.Seconds())))
if err != nil {
return err
}
for name, topic := range metadata.Topics {
topicPartitionCount.Store(name, len(topic.Partitions))
}
return nil
}