Skip to content

Commit

Permalink
add metrics-aggregator
Browse files Browse the repository at this point in the history
  • Loading branch information
asiyani committed Dec 27, 2024
1 parent c536b37 commit 949dd1b
Show file tree
Hide file tree
Showing 4 changed files with 517 additions and 0 deletions.
19 changes: 19 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
module github.com/utilitywarehouse/metrics-aggregator

go 1.23.0

require (
github.com/google/go-cmp v0.6.0
github.com/prometheus/client_golang v1.19.1
github.com/prometheus/client_model v0.6.1
github.com/prometheus/common v0.55.0
google.golang.org/protobuf v1.34.2
)

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
golang.org/x/sys v0.21.0 // indirect
)
22 changes: 22 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE=
github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho=
github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E=
github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY=
github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G1dc=
github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8=
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
216 changes: 216 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
package main

import (
"flag"
"fmt"
"io"
"log/slog"
"maps"
"net/http"
"os"
"strings"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
)

var (
log *slog.Logger

pcDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "metrics_aggregation_duration_seconds",
Help: "Duration of a collection",
},
[]string{"remote"},
)
)

type RemoteAggregator struct {
url string
withOutLabels []string

addPrefix string
addLabels map[string]string
}

func (ra *RemoteAggregator) Describe(ch chan<- *prometheus.Desc) {
// No static descriptions, metrics are dynamic.
}

func (ra *RemoteAggregator) Collect(ch chan<- prometheus.Metric) {
defer updateRunTime(ra.url, time.Now())

resp, err := http.Get(ra.url)
if err != nil {
log.Error("error fetching metrics", "err", err)
return
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
log.Error("unexpected status code", "code", resp.StatusCode)
return
}

ra.decodeAndSend(resp.Body, ch)
}

func (ra *RemoteAggregator) decodeAndSend(reader io.Reader, ch chan<- prometheus.Metric) {
decoder := expfmt.NewDecoder(reader, expfmt.NewFormat(expfmt.TypeTextPlain))
var metricFamily dto.MetricFamily

for {
err := decoder.Decode(&metricFamily)
if err == io.EOF {
break
}
if err != nil {
log.Error("error decoding metric family", "err", err)
break
}

ra.aggregateAndSend(&metricFamily, ch)
}
}

func (ra *RemoteAggregator) aggregateAndSend(metricFamily *dto.MetricFamily, ch chan<- prometheus.Metric) {

aggregatedLabels, aggregatedValue := aggregateMetrics(metricFamily.Metric, ra.withOutLabels)

for key, value := range aggregatedValue {

var promMetric prometheus.Metric
var err error

// modify name and labels if required
name := metricFamily.GetName()
if ra.addPrefix != "" {
name = ra.addPrefix + name
}

maps.Copy(aggregatedLabels[key], ra.addLabels)

desc := prometheus.NewDesc(name, metricFamily.GetHelp(), nil, aggregatedLabels[key])

switch metricFamily.GetType() {
case dto.MetricType_GAUGE:
promMetric, err = prometheus.NewConstMetric(desc, prometheus.GaugeValue, value)
case dto.MetricType_COUNTER:
promMetric, err = prometheus.NewConstMetric(desc, prometheus.CounterValue, value)
default:
promMetric, err = prometheus.NewConstMetric(desc, prometheus.UntypedValue, value)
}

if err != nil {
log.Error("error creating Prometheus metric", "err", err)
continue
}

ch <- promMetric
}
}

// aggregateMetrics returns aggregated values and label pairs map on same key
func aggregateMetrics(metrics []*dto.Metric, withOutLabels []string) (map[string]map[string]string, map[string]float64) {
ignoredSet := make(map[string]struct{}, len(withOutLabels))
for _, label := range withOutLabels {
ignoredSet[label] = struct{}{}
}

aggregatedValue := make(map[string]float64)
aggregatedLabels := make(map[string]map[string]string)

for _, metric := range metrics {
filteredLabels := make(map[string]string)
key := ""
for _, label := range metric.Label {
if _, found := ignoredSet[label.GetName()]; !found {
filteredLabels[label.GetName()] = label.GetValue()
key += label.GetName() + "=" + label.GetValue() + ","
}
}
aggregatedLabels[key] = filteredLabels

if metric.GetGauge() != nil {
aggregatedValue[key] += metric.GetGauge().GetValue()
} else if metric.GetCounter() != nil {
aggregatedValue[key] += metric.GetCounter().GetValue()
}
}
return aggregatedLabels, aggregatedValue
}

func updateRunTime(remoteURL string, start time.Time) {
pcDuration.WithLabelValues(remoteURL).Observe(time.Since(start).Seconds())
}

func usage() {
fmt.Fprintf(os.Stderr, "NAME:\n")
fmt.Fprintf(os.Stderr, "\tmetrics-aggregator\n")

fmt.Fprintf(os.Stderr, "DESCRIPTION:\n")
fmt.Fprintf(os.Stderr, "\tA metrics aggregator to aggregate metrics without given labels.\n")

fmt.Fprintf(os.Stderr, "OPTIONS:\n")
fmt.Fprintf(os.Stderr, "\t--listen-address (default: :9000)\n")
fmt.Fprintf(os.Stderr, "\t--metrics-path (default: /metrics)\n")
fmt.Fprintf(os.Stderr, "\t--target-url (default: 'http://localhost:8080/metrics')\n")
fmt.Fprintf(os.Stderr, "\t--aggregate-without-label (default: '')\n")
fmt.Fprintf(os.Stderr, "\t--add-prefix (default: '')\n")
fmt.Fprintf(os.Stderr, "\t--add-labels (default: '')\n")
os.Exit(2)
}

func main() {
port := flag.String("listen-address", ":9000", "address the metrics server binds to")
metricPath := flag.String("metrics-path", "/metrics", "path under which to expose metrics")
targetURL := flag.String("target-url", "http://localhost:8090/metrics", "remote target url to scrap metrics")
withOutLabels := flag.String("aggregate-without-labels", "", "comma separated names of the labels which are removed from the aggregated metrics")
addPrefix := flag.String("add-prefix", "", "given prefix will be added to all metrics name")
addLabels := flag.String("add-labels", "", "comma separated list of key=value pairs which will be added to all metrics")

flag.Usage = usage
flag.Parse()

log = slog.New(slog.NewTextHandler(
os.Stderr,
&slog.HandlerOptions{
Level: slog.LevelInfo,
},
))

log = slog.Default()

if withOutLabels == nil || *withOutLabels == "" {
log.Error("'aggregate-without-labels' is required!")
os.Exit(1)
}

collector := &RemoteAggregator{
url: *targetURL,
withOutLabels: strings.Split(*withOutLabels, ","),
addPrefix: *addPrefix,
addLabels: make(map[string]string),
}

for _, pair := range strings.Split(*addLabels, ",") {
kv := strings.Split(pair, "=")
collector.addLabels[kv[0]] = kv[1]
}

reg := prometheus.NewPedanticRegistry()

reg.MustRegister(collector, pcDuration)

log.Info("starting server", "port", *port, "metrics", *metricPath)

http.Handle(*metricPath, promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))

if err := http.ListenAndServe(*port, nil); err != nil {
log.Error("error starting HTTP server", "err", err)
os.Exit(1)
}
}
Loading

0 comments on commit 949dd1b

Please sign in to comment.