Skip to content

Commit

Permalink
use cli pkg for flags
Browse files Browse the repository at this point in the history
  • Loading branch information
asiyani committed Dec 27, 2024
1 parent 1570a3c commit aefdc2d
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 93 deletions.
15 changes: 14 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
# metrics-aggregator

`metrics-aggregator` aggregates metrics after removing given labels from the target metrics.
Aggregate metrics to reduce cardinality by removing labels.

## options
```
--metrics-bind-address value The address the metric endpoint binds to. (default: ":9090")
--metrics-path value The path under which to expose metrics. (default: "/metrics")
--target-url value The remote target metrics url to scrap metrics.
--aggregate-without-label value [ --aggregate-without-label value ] The metrics will be aggregated over all label except listed labels.
Labels will be removed from the result vector, while all other labels are preserved in the output.
--include-metric value [ --include-metric value ] The name of the scrapped metrics which will be aggregated and exported. if its not set all metrics will be exported from target.
--add-prefix value The prefix which will be added to all exported metrics name.
--add-labelValue value [ --add-labelValue value ] The list of key=value pairs which will be added to all exported metrics.
--help, -h show help
```
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/prometheus/client_golang v1.19.1
github.com/prometheus/client_model v0.6.1
github.com/prometheus/common v0.55.0
github.com/urfave/cli/v3 v3.0.0-beta1
google.golang.org/protobuf v1.34.2
)

Expand Down
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE=
github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho=
github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E=
Expand All @@ -16,7 +18,13 @@ github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G
github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8=
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/urfave/cli/v3 v3.0.0-beta1 h1:6DTaaUarcM0wX7qj5Hcvs+5Dm3dyUTBbEwIWAjcw9Zg=
github.com/urfave/cli/v3 v3.0.0-beta1/go.mod h1:FnIeEMYu+ko8zP1F9Ypr3xkZMIDqW3DR92yUtY39q1Y=
golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
180 changes: 98 additions & 82 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,36 +1,79 @@
package main

import (
"flag"
"context"
"fmt"
"io"
"log/slog"
"maps"
"net/http"
"os"
"slices"
"strings"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"github.com/urfave/cli/v3"
)

var (
log *slog.Logger
log = slog.New(slog.NewTextHandler(
os.Stderr,
&slog.HandlerOptions{
Level: slog.LevelInfo,
},
))

pcDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "metrics_aggregation_duration_seconds",
Help: "Duration of a collection",
},
[]string{"remote"},
)

flags = []cli.Flag{
&cli.StringFlag{
Name: "metrics-bind-address",
Value: ":9090",
Usage: "The address the metric endpoint binds to.",
},
&cli.StringFlag{
Name: "metrics-path",
Value: "/metrics",
Usage: "The path under which to expose metrics.",
},
&cli.StringFlag{
Name: "target-url",
Usage: "The remote target metrics url to scrap metrics.",
Required: true,
},
&cli.StringSliceFlag{
Name: "aggregate-without-label",
Usage: "The metrics will be aggregated over all label except listed labels. Labels will be removed from the result vector, while all other labels are preserved in the output.",
Required: true,
},
&cli.StringSliceFlag{
Name: "include-metric",
Usage: "The name of the scrapped metrics which will be aggregated and exported. if its not set all metrics will be exported from target.",
},
&cli.StringFlag{
Name: "add-prefix",
Usage: "The prefix which will be added to all exported metrics name.",
},
&cli.StringSliceFlag{
Name: "add-labelValue",
Usage: "The list of key=value pairs which will be added to all exported metrics.",
},
}
)

type RemoteAggregator struct {
url string
withOutLabels []string
url string
includeMetrics []string
aggregateWithOutLabels []string

addPrefix string
addLabels map[string]string
Expand Down Expand Up @@ -72,25 +115,28 @@ func (ra *RemoteAggregator) decodeAndSend(reader io.Reader, ch chan<- prometheus
break
}

ra.aggregateAndSend(&metricFamily, ch)
ra.processAndSend(&metricFamily, ch)
}
}

func (ra *RemoteAggregator) aggregateAndSend(metricFamily *dto.MetricFamily, ch chan<- prometheus.Metric) {
func (ra *RemoteAggregator) processAndSend(metricFamily *dto.MetricFamily, ch chan<- prometheus.Metric) {

aggregatedLabels, aggregatedValue := aggregateMetrics(metricFamily.Metric, ra.withOutLabels)
name := metricFamily.GetName()
// if includeMetrics is set filter metrics based on name
if len(ra.includeMetrics) > 0 && !slices.Contains(ra.includeMetrics, name) {
return
}

for key, value := range aggregatedValue {
if ra.addPrefix != "" {
name = ra.addPrefix + name
}

aggregatedLabels, aggregatedValue := aggregateMetrics(metricFamily.Metric, ra.aggregateWithOutLabels)

for key, value := range aggregatedValue {
var promMetric prometheus.Metric
var err error

// modify name and labels if required
name := metricFamily.GetName()
if ra.addPrefix != "" {
name = ra.addPrefix + name
}

maps.Copy(aggregatedLabels[key], ra.addLabels)

desc := prometheus.NewDesc(name, metricFamily.GetHelp(), nil, aggregatedLabels[key])
Expand All @@ -114,24 +160,22 @@ func (ra *RemoteAggregator) aggregateAndSend(metricFamily *dto.MetricFamily, ch
}

// aggregateMetrics returns aggregated values and label pairs map on same key
func aggregateMetrics(metrics []*dto.Metric, withOutLabels []string) (map[string]map[string]string, map[string]float64) {
ignoredSet := make(map[string]struct{}, len(withOutLabels))
for _, label := range withOutLabels {
ignoredSet[label] = struct{}{}
}

func aggregateMetrics(metrics []*dto.Metric, aggregateWithOutLabels []string) (map[string]map[string]string, map[string]float64) {
aggregatedValue := make(map[string]float64)
aggregatedLabels := make(map[string]map[string]string)

for _, metric := range metrics {

var key string
filteredLabels := make(map[string]string)
key := ""

for _, label := range metric.Label {
if _, found := ignoredSet[label.GetName()]; !found {
if !slices.Contains(aggregateWithOutLabels, label.GetName()) {
filteredLabels[label.GetName()] = label.GetValue()
key += label.GetName() + "=" + label.GetValue() + ","
}
}

aggregatedLabels[key] = filteredLabels

if metric.GetGauge() != nil {
Expand All @@ -147,75 +191,47 @@ func updateRunTime(remoteURL string, start time.Time) {
pcDuration.WithLabelValues(remoteURL).Observe(time.Since(start).Seconds())
}

func usage() {
fmt.Fprintf(os.Stderr, "NAME:\n")
fmt.Fprintf(os.Stderr, "\tmetrics-aggregator\n")

fmt.Fprintf(os.Stderr, "DESCRIPTION:\n")
fmt.Fprintf(os.Stderr, "\tA metrics aggregator to aggregate metrics without given labels.\n")

fmt.Fprintf(os.Stderr, "OPTIONS:\n")
fmt.Fprintf(os.Stderr, "\t--listen-address (default: :9000)\n")
fmt.Fprintf(os.Stderr, "\t--metrics-path (default: /metrics)\n")
fmt.Fprintf(os.Stderr, "\t--target-url (default: 'http://localhost:8080/metrics')\n")
fmt.Fprintf(os.Stderr, "\t--aggregate-without-label (default: '')\n")
fmt.Fprintf(os.Stderr, "\t--add-prefix (default: '')\n")
fmt.Fprintf(os.Stderr, "\t--add-labels (default: '')\n")
os.Exit(2)
}

func main() {
port := flag.String("listen-address", ":9000", "address the metrics server binds to")
metricPath := flag.String("metrics-path", "/metrics", "path under which to expose metrics")
targetURL := flag.String("target-url", "http://localhost:8090/metrics", "remote target url to scrap metrics")
withOutLabels := flag.String("aggregate-without-labels", "", "comma separated names of the labels which are removed from the aggregated metrics")
addPrefix := flag.String("add-prefix", "", "given prefix will be added to all metrics name")
addLabels := flag.String("add-labels", "", "comma separated list of key=value pairs which will be added to all metrics")

flag.Usage = usage
flag.Parse()

log = slog.New(slog.NewTextHandler(
os.Stderr,
&slog.HandlerOptions{
Level: slog.LevelInfo,
},
))

log = slog.Default()
cmd := &cli.Command{
Name: "metrics-aggregator",
Usage: "ggregate metrics to reduce cardinality by removing labels",
Flags: flags,
Action: func(ctx context.Context, cmd *cli.Command) error {

collector := &RemoteAggregator{
url: cmd.String("target-url"),
includeMetrics: cmd.StringSlice("include-metric"),
aggregateWithOutLabels: cmd.StringSlice("aggregate-without-label"),
addPrefix: cmd.String("add-prefix"),
addLabels: make(map[string]string),
}

if withOutLabels == nil || *withOutLabels == "" {
log.Error("'aggregate-without-labels' is required!")
os.Exit(1)
}
for _, pair := range cmd.StringSlice("add-labelValue") {
kv := strings.Split(pair, "=")
if len(kv) == 2 {
collector.addLabels[kv[0]] = kv[1]
}
}

collector := &RemoteAggregator{
url: *targetURL,
withOutLabels: strings.Split(*withOutLabels, ","),
addPrefix: *addPrefix,
addLabels: make(map[string]string),
}
reg := prometheus.NewPedanticRegistry()

for _, pair := range strings.Split(*addLabels, ",") {
if pair == "" {
continue
}
kv := strings.Split(pair, "=")
if len(kv) == 2 {
collector.addLabels[kv[0]] = kv[1]
}
}
reg.MustRegister(collector, pcDuration)

reg := prometheus.NewPedanticRegistry()
log.Info("starting server", "port", cmd.String("metrics-bind-address"), "metrics", cmd.String("metrics-path"))

reg.MustRegister(collector, pcDuration)
http.Handle(cmd.String("metrics-path"), promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))

log.Info("starting server", "port", *port, "metrics", *metricPath)
if err := http.ListenAndServe(cmd.String("metrics-bind-address"), nil); err != nil {
return fmt.Errorf("error starting HTTP server %w", err)
}

http.Handle(*metricPath, promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
return nil
},
}

if err := http.ListenAndServe(*port, nil); err != nil {
log.Error("error starting HTTP server", "err", err)
if err := cmd.Run(context.Background(), os.Args); err != nil {
log.Error("error running app", "err", err)
os.Exit(1)
}

}
20 changes: 10 additions & 10 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ func TestAggregateMetricss(t *testing.T) {
}

tests := []struct {
name string
withOutLabels []string
wantAggregatedLabels map[string]map[string]string
wantAggregatedValues map[string]float64
name string
aggregateWithOutLabels []string
wantAggregatedLabels map[string]map[string]string
wantAggregatedValues map[string]float64
}{
{
"no-matching-labels",
Expand Down Expand Up @@ -114,7 +114,7 @@ func TestAggregateMetricss(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
aggregatedLabels, aggregatedValues := aggregateMetrics(metrics, tt.withOutLabels)
aggregatedLabels, aggregatedValues := aggregateMetrics(metrics, tt.aggregateWithOutLabels)

if diff := cmp.Diff(aggregatedLabels, tt.wantAggregatedLabels, cmpopts.IgnoreUnexported(dto.LabelPair{})); diff != "" {
t.Errorf("filteredLabels mismatch (-want +got):\n%s", diff)
Expand Down Expand Up @@ -151,9 +151,9 @@ component_received_event_bytes_total{l1="v1",l2="v2",l3="v3"} 3000 1735054866000
defer ts.Close()

tests := []struct {
name string
withOutLabels []string
want string
name string
aggregateWithOutLabels []string
want string
}{
{
"no-matching-labels",
Expand Down Expand Up @@ -227,8 +227,8 @@ component_received_events_total{l1="v1"} 60
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
collector := &RemoteAggregator{
url: ts.URL,
withOutLabels: tt.withOutLabels,
url: ts.URL,
aggregateWithOutLabels: tt.aggregateWithOutLabels,
}

reg := prometheus.NewPedanticRegistry()
Expand Down

0 comments on commit aefdc2d

Please sign in to comment.