diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..bb8079a --- /dev/null +++ b/go.mod @@ -0,0 +1,19 @@ +module github.com/utilitywarehouse/metrics-aggregator + +go 1.23.0 + +require ( + github.com/google/go-cmp v0.6.0 + github.com/prometheus/client_golang v1.19.1 + github.com/prometheus/client_model v0.6.1 + github.com/prometheus/common v0.55.0 + google.golang.org/protobuf v1.34.2 +) + +require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/prometheus/procfs v0.15.1 // indirect + golang.org/x/sys v0.21.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..73dcdb3 --- /dev/null +++ b/go.sum @@ -0,0 +1,22 @@ +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE= +github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho= +github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= +github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= +github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G1dc= +github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8= +github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= +github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= +golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= diff --git a/main.go b/main.go new file mode 100644 index 0000000..7d2831a --- /dev/null +++ b/main.go @@ -0,0 +1,216 @@ +package main + +import ( + "flag" + "fmt" + "io" + "log/slog" + "maps" + "net/http" + "os" + "strings" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" +) + +var ( + log *slog.Logger + + pcDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: "metrics_aggregation_duration_seconds", + Help: "Duration of a collection", + }, + []string{"remote"}, + ) +) + +type RemoteAggregator struct { + url string + withOutLabels []string + + addPrefix string + addLabels map[string]string +} + +func (ra *RemoteAggregator) Describe(ch chan<- *prometheus.Desc) { + // No static descriptions, metrics are dynamic. +} + +func (ra *RemoteAggregator) Collect(ch chan<- prometheus.Metric) { + defer updateRunTime(ra.url, time.Now()) + + resp, err := http.Get(ra.url) + if err != nil { + log.Error("error fetching metrics", "err", err) + return + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + log.Error("unexpected status code", "code", resp.StatusCode) + return + } + + ra.decodeAndSend(resp.Body, ch) +} + +func (ra *RemoteAggregator) decodeAndSend(reader io.Reader, ch chan<- prometheus.Metric) { + decoder := expfmt.NewDecoder(reader, expfmt.NewFormat(expfmt.TypeTextPlain)) + var metricFamily dto.MetricFamily + + for { + err := decoder.Decode(&metricFamily) + if err == io.EOF { + break + } + if err != nil { + log.Error("error decoding metric family", "err", err) + break + } + + ra.aggregateAndSend(&metricFamily, ch) + } +} + +func (ra *RemoteAggregator) aggregateAndSend(metricFamily *dto.MetricFamily, ch chan<- prometheus.Metric) { + + aggregatedLabels, aggregatedValue := aggregateMetrics(metricFamily.Metric, ra.withOutLabels) + + for key, value := range aggregatedValue { + + var promMetric prometheus.Metric + var err error + + // modify name and labels if required + name := metricFamily.GetName() + if ra.addPrefix != "" { + name = ra.addPrefix + name + } + + maps.Copy(aggregatedLabels[key], ra.addLabels) + + desc := prometheus.NewDesc(name, metricFamily.GetHelp(), nil, aggregatedLabels[key]) + + switch metricFamily.GetType() { + case dto.MetricType_GAUGE: + promMetric, err = prometheus.NewConstMetric(desc, prometheus.GaugeValue, value) + case dto.MetricType_COUNTER: + promMetric, err = prometheus.NewConstMetric(desc, prometheus.CounterValue, value) + default: + promMetric, err = prometheus.NewConstMetric(desc, prometheus.UntypedValue, value) + } + + if err != nil { + log.Error("error creating Prometheus metric", "err", err) + continue + } + + ch <- promMetric + } +} + +// aggregateMetrics returns aggregated values and label pairs map on same key +func aggregateMetrics(metrics []*dto.Metric, withOutLabels []string) (map[string]map[string]string, map[string]float64) { + ignoredSet := make(map[string]struct{}, len(withOutLabels)) + for _, label := range withOutLabels { + ignoredSet[label] = struct{}{} + } + + aggregatedValue := make(map[string]float64) + aggregatedLabels := make(map[string]map[string]string) + + for _, metric := range metrics { + filteredLabels := make(map[string]string) + key := "" + for _, label := range metric.Label { + if _, found := ignoredSet[label.GetName()]; !found { + filteredLabels[label.GetName()] = label.GetValue() + key += label.GetName() + "=" + label.GetValue() + "," + } + } + aggregatedLabels[key] = filteredLabels + + if metric.GetGauge() != nil { + aggregatedValue[key] += metric.GetGauge().GetValue() + } else if metric.GetCounter() != nil { + aggregatedValue[key] += metric.GetCounter().GetValue() + } + } + return aggregatedLabels, aggregatedValue +} + +func updateRunTime(remoteURL string, start time.Time) { + pcDuration.WithLabelValues(remoteURL).Observe(time.Since(start).Seconds()) +} + +func usage() { + fmt.Fprintf(os.Stderr, "NAME:\n") + fmt.Fprintf(os.Stderr, "\tmetrics-aggregator\n") + + fmt.Fprintf(os.Stderr, "DESCRIPTION:\n") + fmt.Fprintf(os.Stderr, "\tA metrics aggregator to aggregate metrics without given labels.\n") + + fmt.Fprintf(os.Stderr, "OPTIONS:\n") + fmt.Fprintf(os.Stderr, "\t--listen-address (default: :9000)\n") + fmt.Fprintf(os.Stderr, "\t--metrics-path (default: /metrics)\n") + fmt.Fprintf(os.Stderr, "\t--target-url (default: 'http://localhost:8080/metrics')\n") + fmt.Fprintf(os.Stderr, "\t--aggregate-without-label (default: '')\n") + fmt.Fprintf(os.Stderr, "\t--add-prefix (default: '')\n") + fmt.Fprintf(os.Stderr, "\t--add-labels (default: '')\n") + os.Exit(2) +} + +func main() { + port := flag.String("listen-address", ":9000", "address the metrics server binds to") + metricPath := flag.String("metrics-path", "/metrics", "path under which to expose metrics") + targetURL := flag.String("target-url", "http://localhost:8090/metrics", "remote target url to scrap metrics") + withOutLabels := flag.String("aggregate-without-labels", "", "comma separated names of the labels which are removed from the aggregated metrics") + addPrefix := flag.String("add-prefix", "", "given prefix will be added to all metrics name") + addLabels := flag.String("add-labels", "", "comma separated list of key=value pairs which will be added to all metrics") + + flag.Usage = usage + flag.Parse() + + log = slog.New(slog.NewTextHandler( + os.Stderr, + &slog.HandlerOptions{ + Level: slog.LevelInfo, + }, + )) + + log = slog.Default() + + if withOutLabels == nil || *withOutLabels == "" { + log.Error("'aggregate-without-labels' is required!") + os.Exit(1) + } + + collector := &RemoteAggregator{ + url: *targetURL, + withOutLabels: strings.Split(*withOutLabels, ","), + addPrefix: *addPrefix, + addLabels: make(map[string]string), + } + + for _, pair := range strings.Split(*addLabels, ",") { + kv := strings.Split(pair, "=") + collector.addLabels[kv[0]] = kv[1] + } + + reg := prometheus.NewPedanticRegistry() + + reg.MustRegister(collector, pcDuration) + + log.Info("starting server", "port", *port, "metrics", *metricPath) + + http.Handle(*metricPath, promhttp.HandlerFor(reg, promhttp.HandlerOpts{})) + + if err := http.ListenAndServe(*port, nil); err != nil { + log.Error("error starting HTTP server", "err", err) + os.Exit(1) + } +} diff --git a/main_test.go b/main_test.go new file mode 100644 index 0000000..42d4b6c --- /dev/null +++ b/main_test.go @@ -0,0 +1,260 @@ +package main + +import ( + "bytes" + "fmt" + "log/slog" + "net/http" + "net/http/httptest" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" + "google.golang.org/protobuf/proto" +) + +func pointer(v string) *string { return &v } + +func TestAggregateMetricss(t *testing.T) { + metrics := []*dto.Metric{ + { + Label: []*dto.LabelPair{ + {Name: pointer("l1"), Value: pointer("v1")}, + }, + Counter: &dto.Counter{Value: proto.Float64(10)}, + }, + { + Label: []*dto.LabelPair{ + {Name: pointer("l1"), Value: pointer("v1")}, + {Name: pointer("l2"), Value: pointer("v2")}, + }, + Counter: &dto.Counter{Value: proto.Float64(20)}, + }, + { + Label: []*dto.LabelPair{ + {Name: pointer("l1"), Value: pointer("v1")}, + {Name: pointer("l2"), Value: pointer("v2")}, + {Name: pointer("l3"), Value: pointer("v3")}, + }, + Counter: &dto.Counter{Value: proto.Float64(30)}, + }, + } + + tests := []struct { + name string + withOutLabels []string + wantAggregatedLabels map[string]map[string]string + wantAggregatedValues map[string]float64 + }{ + { + "no-matching-labels", + []string{"l4"}, + map[string]map[string]string{ + "l1=v1,": {"l1": "v1"}, + "l1=v1,l2=v2,": {"l1": "v1", "l2": "v2"}, + "l1=v1,l2=v2,l3=v3,": {"l1": "v1", "l2": "v2", "l3": "v3"}, + }, + map[string]float64{ + "l1=v1,": 10, + "l1=v1,l2=v2,": 20, + "l1=v1,l2=v2,l3=v3,": 30, + }, + }, + { + "matching-one", + []string{"l3"}, + map[string]map[string]string{ + "l1=v1,": {"l1": "v1"}, + "l1=v1,l2=v2,": {"l1": "v1", "l2": "v2"}, + }, + map[string]float64{ + "l1=v1,": 10, + "l1=v1,l2=v2,": 50, + }, + }, + { + "matching-two", + []string{"l2"}, + map[string]map[string]string{ + "l1=v1,": {"l1": "v1"}, + "l1=v1,l3=v3,": {"l1": "v1", "l3": "v3"}, + }, + map[string]float64{ + "l1=v1,": 30, + "l1=v1,l3=v3,": 30, + }, + }, + { + "matching-all", + []string{"l1"}, + map[string]map[string]string{ + "": {}, + "l2=v2,": {"l2": "v2"}, + "l2=v2,l3=v3,": {"l2": "v2", "l3": "v3"}, + }, + map[string]float64{ + "": 10, + "l2=v2,": 20, + "l2=v2,l3=v3,": 30, + }, + }, + { + "multiple-labels", + []string{"l2", "l3"}, + map[string]map[string]string{ + "l1=v1,": {"l1": "v1"}, + }, + map[string]float64{ + "l1=v1,": 60, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + aggregatedLabels, aggregatedValues := aggregateMetrics(metrics, tt.withOutLabels) + + if diff := cmp.Diff(aggregatedLabels, tt.wantAggregatedLabels, cmpopts.IgnoreUnexported(dto.LabelPair{})); diff != "" { + t.Errorf("filteredLabels mismatch (-want +got):\n%s", diff) + } + + if diff := cmp.Diff(aggregatedValues, tt.wantAggregatedValues); diff != "" { + t.Errorf("aggregatedValues mismatch (-want +got):\n%s", diff) + } + + }) + } +} + +func Test_Collector(t *testing.T) { + log = slog.Default() + + originalMetrics := ` +# HELP component_received_events_total component_received_events_total +# TYPE component_received_events_total counter +component_received_events_total{l1="v1"} 10 1735054883000 +component_received_events_total{l1="v1",l2="v2"} 20 1735054879000 +component_received_events_total{l1="v1",l2="v2",l3="v3"} 30 1735054866000 +# HELP component_received_event_bytes_total component_received_event_bytes_total +# TYPE component_received_event_bytes_total counter +component_received_event_bytes_total{l1="v1"} 1000 1735054883000 +component_received_event_bytes_total{l1="v1",l2="v2"} 2000 1735054879000 +component_received_event_bytes_total{l1="v1",l2="v2",l3="v3"} 3000 1735054866000 +` + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + fmt.Fprintln(w, originalMetrics) + })) + defer ts.Close() + + tests := []struct { + name string + withOutLabels []string + want string + }{ + { + "no-matching-labels", + []string{"l4"}, + `# HELP component_received_event_bytes_total component_received_event_bytes_total +# TYPE component_received_event_bytes_total counter +component_received_event_bytes_total{l1="v1"} 1000 +component_received_event_bytes_total{l1="v1",l2="v2"} 2000 +component_received_event_bytes_total{l1="v1",l2="v2",l3="v3"} 3000 +# HELP component_received_events_total component_received_events_total +# TYPE component_received_events_total counter +component_received_events_total{l1="v1"} 10 +component_received_events_total{l1="v1",l2="v2"} 20 +component_received_events_total{l1="v1",l2="v2",l3="v3"} 30 +`, + }, + { + "matching-one", + []string{"l3"}, + `# HELP component_received_event_bytes_total component_received_event_bytes_total +# TYPE component_received_event_bytes_total counter +component_received_event_bytes_total{l1="v1"} 1000 +component_received_event_bytes_total{l1="v1",l2="v2"} 5000 +# HELP component_received_events_total component_received_events_total +# TYPE component_received_events_total counter +component_received_events_total{l1="v1"} 10 +component_received_events_total{l1="v1",l2="v2"} 50 +`, + }, + { + "matching-two", + []string{"l2"}, + `# HELP component_received_event_bytes_total component_received_event_bytes_total +# TYPE component_received_event_bytes_total counter +component_received_event_bytes_total{l1="v1"} 3000 +component_received_event_bytes_total{l1="v1",l3="v3"} 3000 +# HELP component_received_events_total component_received_events_total +# TYPE component_received_events_total counter +component_received_events_total{l1="v1"} 30 +component_received_events_total{l1="v1",l3="v3"} 30 +`, + }, + { + "matching-all", + []string{"l1"}, + `# HELP component_received_event_bytes_total component_received_event_bytes_total +# TYPE component_received_event_bytes_total counter +component_received_event_bytes_total 1000 +component_received_event_bytes_total{l2="v2"} 2000 +component_received_event_bytes_total{l2="v2",l3="v3"} 3000 +# HELP component_received_events_total component_received_events_total +# TYPE component_received_events_total counter +component_received_events_total 10 +component_received_events_total{l2="v2"} 20 +component_received_events_total{l2="v2",l3="v3"} 30 +`, + }, + { + "multiple-labels", + []string{"l2", "l3"}, + `# HELP component_received_event_bytes_total component_received_event_bytes_total +# TYPE component_received_event_bytes_total counter +component_received_event_bytes_total{l1="v1"} 6000 +# HELP component_received_events_total component_received_events_total +# TYPE component_received_events_total counter +component_received_events_total{l1="v1"} 60 +`, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + collector := &RemoteAggregator{ + url: ts.URL, + withOutLabels: tt.withOutLabels, + } + + reg := prometheus.NewPedanticRegistry() + reg.MustRegister(collector) + + gathering, err := reg.Gather() + if err != nil { + t.Errorf("JSONCollector.process() error = %v", err) + } + + got := metricsToText(gathering) + + if diff := cmp.Diff(got, tt.want); diff != "" { + t.Errorf("collector output mismatch (-want +got):\n%s", diff) + } + }) + } + +} + +func metricsToText(gathering []*dto.MetricFamily) string { + out := &bytes.Buffer{} + for _, mf := range gathering { + if _, err := expfmt.MetricFamilyToText(out, mf); err != nil { + panic(err) + } + } + return out.String() +}