diff --git a/.gitignore b/.gitignore
index e5c14a88..98be3730 100644
--- a/.gitignore
+++ b/.gitignore
@@ -11,4 +11,5 @@
 
 # Output of the go coverage tool, specifically when used with LiteIDE
 *.out
+/vendor
 /prometheus-kafka-adapter
diff --git a/config.go b/config.go
index 08cb2cd7..c818f1ed 100644
--- a/config.go
+++ b/config.go
@@ -16,12 +16,14 @@ package main
 
 import (
 	"fmt"
-	dto "github.com/prometheus/client_model/go"
-	"github.com/prometheus/common/expfmt"
-	"gopkg.in/yaml.v2"
 	"os"
 	"strings"
 	"text/template"
+	"time"
+
+	dto "github.com/prometheus/client_model/go"
+	"github.com/prometheus/common/expfmt"
+	"gopkg.in/yaml.v2"
 
 	"github.com/sirupsen/logrus"
 )
@@ -29,6 +31,9 @@ import (
 var (
 	kafkaBrokerList = "kafka:9092"
 	kafkaTopic      = "metrics"
+	kafkaPartitionLabels []string
+	kafkaMetadataTimeout = time.Second * 10
+	kafkaMetadataInterval = time.Minute * 5
 	topicTemplate   *template.Template
 	match           = make(map[string]*dto.MetricFamily, 0)
 	basicauth       = false
@@ -63,6 +68,38 @@ func init() {
 		kafkaTopic = value
 	}
 
+	if value := os.Getenv("KAFKA_PARTITION_LABELS"); value != "" {
+		kafkaPartitionLabels = strings.Split(value, ",")
+	}
+
+	if value := os.Getenv("KAFKA_METADATA_TIMEOUT"); value != "" {
+		d, err := time.ParseDuration(value)
+		if err != nil {
+			logrus.WithError(err).Errorf("KAFKA_METADATA_TIMEOUT parsing failed, using default")
+		} else {
+			if d < 0 {
+				logrus.Errorf("KAFKA_METADATA_TIMEOUT does not support negative timeout")
+			} else {
+				kafkaMetadataTimeout = d
+			}
+		}
+	}
+
+	// Mirror the timeout handling for the metadata refresh interval so the
+	// declared kafkaMetadataInterval default can actually be overridden.
+	if value := os.Getenv("KAFKA_METADATA_INTERVAL"); value != "" {
+		d, err := time.ParseDuration(value)
+		if err != nil {
+			logrus.WithError(err).Errorf("KAFKA_METADATA_INTERVAL parsing failed, using default")
+		} else {
+			if d <= 0 {
+				logrus.Errorf("KAFKA_METADATA_INTERVAL must be positive")
+			} else {
+				kafkaMetadataInterval = d
+			}
+		}
+	}
+
 	if value := os.Getenv("BASIC_AUTH_USERNAME"); value != "" {
 		basicauth = true
 		basicauthUsername = value
diff --git a/go.mod b/go.mod
index bcc15030..103a788a 100644
--- a/go.mod
+++ b/go.mod
@@ -4,7 +4,7 @@ require (
 	github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
 	github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
 	github.com/beorn7/perks v1.0.1 // indirect
-	github.com/confluentinc/confluent-kafka-go v1.6.1
+	github.com/confluentinc/confluent-kafka-go v1.7.0
 	github.com/gin-gonic/contrib v0.0.0-20191209060500-d6e26eeaa607
 	github.com/gin-gonic/gin v1.6.3
 	github.com/gogo/protobuf v1.3.1
diff --git a/go.sum b/go.sum
index 1c766e98..ceeb5548 100644
--- a/go.sum
+++ b/go.sum
@@ -8,6 +8,8 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r
 github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
 github.com/confluentinc/confluent-kafka-go v1.6.1 h1:YxM/UtMQ2vgJX2gIgeJFUD0ANQYTEvfo4Cs4qKUlmGE=
 github.com/confluentinc/confluent-kafka-go v1.6.1/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg=
+github.com/confluentinc/confluent-kafka-go v1.7.0 h1:tXh3LWb2Ne0WiU3ng4h5qiGA9XV61rz46w60O+cq8bM=
+github.com/confluentinc/confluent-kafka-go v1.7.0/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
diff --git a/handlers.go b/handlers.go
index f382961d..52f7105f 100644
--- a/handlers.go
+++ b/handlers.go
@@ -15,8 +15,10 @@ package main
 
 import (
+	"hash/fnv"
 	"io/ioutil"
 	"net/http"
+	"strings"
 
 	"github.com/gin-gonic/gin"
 	"github.com/sirupsen/logrus"
 
@@ -62,10 +64,21 @@ func receiveHandler(producer *kafka.Producer, serializer Serializer) func(c *gin
 		}
 
 		for topic, metrics := range metricsPerTopic {
-			t := topic
-			part := kafka.TopicPartition{
-				Partition: kafka.PartitionAny,
-				Topic:     &t,
+			// topic may carry partition-label values after a "|" separator
+			// (appended by serializers.topic); split them off before producing.
+			parts := strings.Split(topic, "|")
+			var part kafka.TopicPartition
+
+			if len(parts) == 1 {
+				part = kafka.TopicPartition{
+					Partition: kafka.PartitionAny,
+					Topic:     &parts[0],
+				}
+			} else {
+				part = kafka.TopicPartition{
+					Partition: getPartition(parts[0], parts[1]),
+					Topic:     &parts[0],
+				}
 			}
 			for _, metric := range metrics {
 				err := producer.Produce(&kafka.Message{
@@ -83,3 +96,17 @@ func receiveHandler(producer *kafka.Producer, serializer Serializer) func(c *gin
 		}
 	}
 }
+
+// getPartition deterministically maps hashKey onto one of topic's partitions
+// using an FNV-1a hash over the partition-label values.
+func getPartition(topic string, hashKey string) int32 {
+	n, ok := topicPartitionCount.Load(topic)
+	// Guard both missing metadata and a zero partition count, either of which
+	// would otherwise panic on the modulo below.
+	if !ok || n.(int) <= 0 {
+		logrus.WithField("topic", topic).Error("did not find metadata requested topic")
+		return 0
+	}
+	h := fnv.New32a()
+	h.Write([]byte(hashKey))
+	return int32(h.Sum32() % uint32(n.(int)))
+}
diff --git a/main.go b/main.go
index 1e80666d..f05ec132 100644
--- a/main.go
+++ b/main.go
@@ -15,6 +15,7 @@ package main
 
 import (
+	"context"
 	"time"
 
 	"github.com/confluentinc/confluent-kafka-go/kafka"
 	"github.com/gin-gonic/contrib/ginrus"
@@ -28,6 +29,9 @@ import (
 func main() {
 	log.Info("creating kafka producer")
 
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
 	kafkaConfig := kafka.ConfigMap{
 		"bootstrap.servers": kafkaBrokerList,
 		"compression.codec": kafkaCompression,
@@ -69,6 +73,12 @@ func main() {
 		logrus.WithError(err).Fatal("couldn't create kafka producer")
 	}
 
+	if kafkaPartitionLabels != nil {
+		if err := syncTopicMetadata(ctx, producer); err != nil {
+			logrus.WithError(err).Fatal("couldn't fetch topic metadata")
+		}
+	}
+
 	r := gin.New()
 	r.Use(ginrus.Ginrus(logrus.StandardLogger(), time.RFC3339, true), gin.Recovery())
 
diff --git a/serializers.go b/serializers.go
index c89ab58b..154263ea 100644
--- a/serializers.go
+++ b/serializers.go
@@ -113,10 +113,22 @@ func NewAvroJSONSerializer(schemaPath string) (*AvroJSONSerializer, error) {
 }
 
 func topic(labels map[string]string) string {
-	var buf bytes.Buffer
+	var buf, buf2 bytes.Buffer
 	if err := topicTemplate.Execute(&buf, labels); err != nil {
 		return ""
 	}
+	for _, s := range kafkaPartitionLabels {
+		v, ok := labels[s]
+		if ok {
+			if _, err := buf2.WriteString(v); err != nil {
+				return ""
+			}
+		}
+	}
+	if buf2.Len() > 0 {
+		buf.WriteString("|")
+		buf.WriteString(buf2.String())
+	}
 	return buf.String()
 }
diff --git a/topic_metadata.go b/topic_metadata.go
new file mode 100644
index 00000000..adc7a922
--- /dev/null
+++ b/topic_metadata.go
@@ -0,0 +1,54 @@
+package main
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"github.com/confluentinc/confluent-kafka-go/kafka"
+	"github.com/sirupsen/logrus"
+)
+
+// topicPartitionCount caches topic name -> partition count for getPartition.
+var topicPartitionCount sync.Map
+
+type metaDataFetcher interface {
+	GetMetadata(topic *string, allTopics bool, timeoutMs int) (*kafka.Metadata, error)
+}
+
+// syncTopicMetadata performs an initial synchronous metadata fetch and then
+// keeps refreshing the partition-count cache every kafkaMetadataInterval
+// until ctx is cancelled.
+func syncTopicMetadata(ctx context.Context, producer metaDataFetcher) error {
+	if err := processMetadata(producer); err != nil {
+		return err
+	}
+	go func() {
+		ticker := time.NewTicker(kafkaMetadataInterval)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-ticker.C:
+				if err := processMetadata(producer); err != nil {
+					logrus.WithError(err).Error("could not fetch topic metadata")
+				}
+			}
+		}
+	}()
+	return nil
+}
+
+// processMetadata fetches cluster metadata and stores the partition count of
+// every known topic. Note GetMetadata takes its timeout in milliseconds.
+func processMetadata(producer metaDataFetcher) error {
+	metadata, err := producer.GetMetadata(nil, true, int(kafkaMetadataTimeout.Milliseconds()))
+	if err != nil {
+		return err
+	}
+	for name, topic := range metadata.Topics {
+		topicPartitionCount.Store(name, len(topic.Partitions))
+	}
+	return nil
+}