Skip to content

Commit

Permalink
chore: add metrics for mongodb change stream
Browse files Browse the repository at this point in the history
  • Loading branch information
ucpr committed Dec 16, 2023
1 parent 58341c4 commit cee9cc2
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 0 deletions.
6 changes: 6 additions & 0 deletions internal/metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,21 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/ucpr/mongo-streamer/internal/metric/mongo"
)

// Register register prometheus metrics to http.ServeMux
func Register(mux *http.ServeMux) {
reg := prometheus.NewRegistry()

// register metrics
reg.MustRegister(
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
collectors.NewGoCollector(),
)
reg.MustRegister(
mongo.Collectors()...,
)

mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
}
87 changes: 87 additions & 0 deletions internal/metric/mongo/collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package mongo

import (
"github.com/prometheus/client_golang/prometheus"
)

const (
// namespace is the namespace for the metrics.
namespace = "mongo_streamer"
// subSystem is the subSystem for the metrics.
subSystem = "mongodb"

lDatabase = "database"
lCollection = "collection"
)

var (
// receivedTotal is the total number of change stream received.
receivedTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subSystem,
Name: "change_stream_received_total",
Help: "Total number of change stream received",
}, []string{lDatabase, lCollection},
)

// receivedBytesTotal is the total number of change stream received bytes.
receivedBytesTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subSystem,
Name: "change_stream_received_bytes_total",
Help: "Total number of change stream received bytes",
}, []string{lDatabase, lCollection},
)

// successHandleEventTotal is the total number of change stream handle event success.
successHandleEventTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subSystem,
Name: "change_stream_handle_event_success_total",
Help: "Total number of change stream handle event success",
}, []string{lDatabase, lCollection},
)

// failedHandleEventTotal is the total number of change stream handle event failed.
failedHandleEventTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subSystem,
Name: "change_stream_handle_event_failed_total",
Help: "Total number of change stream handle event failed",
}, []string{lDatabase, lCollection},
)
)

// Collectors returns all collectors of MongoDB.
func Collectors() []prometheus.Collector {
return []prometheus.Collector{
receivedTotal,
receivedBytesTotal,
successHandleEventTotal,
failedHandleEventTotal,
}
}

// ReceiveChangeStream increase the total number of change stream received.
func ReceiveChangeStream(database, collection string) {
receivedTotal.WithLabelValues(database, collection).Inc()
}

// ReceiveChangeStreamBytes increase the total number of change stream received bytes.
func ReceiveBytes(database, collection string, size int) {
receivedBytesTotal.WithLabelValues(database, collection).Add(float64(size))
}

// HandleChangeEventSuccess increase the total number of change stream handle event success.
func HandleChangeEventSuccess(database, collection string) {
successHandleEventTotal.WithLabelValues(database, collection).Inc()
}

// HandleChangeEventFailed increase the total number of change stream handle event failed.
func HandleChangeEventFailed(database, collection string) {
failedHandleEventTotal.WithLabelValues(database, collection).Inc()
}
12 changes: 12 additions & 0 deletions internal/mongo/change_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"go.mongodb.org/mongo-driver/mongo/options"

"github.com/ucpr/mongo-streamer/internal/log"
mmetric "github.com/ucpr/mongo-streamer/internal/metric/mongo"
"github.com/ucpr/mongo-streamer/internal/persistent"
)

Expand All @@ -19,6 +20,8 @@ type (
cs *mongo.ChangeStream
handler ChangeStreamHandler
tokenManager persistent.StorageBuffer
db string
col string
}

// ChangeStreamOptions is a struct that represents options for change stream.
Expand Down Expand Up @@ -81,31 +84,40 @@ func NewChangeStream(ctx context.Context, cli *Client, db, col string, handler C
cs: changeStream,
handler: handler,
tokenManager: st,
db: db,
col: col,
}
return cs, nil
}

// Run starts watching change stream.
func (c *ChangeStream) Run(ctx context.Context) {
for c.cs.Next(ctx) {
mmetric.ReceiveChangeStream(c.db, c.col)

var streamObject bson.M
if err := c.cs.Decode(&streamObject); err != nil {
mmetric.HandleChangeEventFailed(c.db, c.col)
log.Error("failed to decode steream object", slog.String("err", err.Error()))
continue
}

// marshal stream object to json
jb, err := bson.MarshalExtJSON(streamObject, false, false)
if err != nil {
mmetric.HandleChangeEventFailed(c.db, c.col)
log.Error("failed to marshal stream object", slog.String("err", err.Error()))
continue
}
mmetric.ReceiveBytes(c.db, c.col, len(jb))

if err := c.handler(context.Background(), jb); err != nil {
mmetric.HandleChangeEventFailed(c.db, c.col)
log.Error("failed to handle change stream", slog.String("err", err.Error()))
// TODO: If handle fails, the process is repeated again
continue
}
mmetric.HandleChangeEventSuccess(c.db, c.col)

// save resume token
if err := c.tokenManager.Set(c.resumeToken()); err != nil {
Expand Down

0 comments on commit cee9cc2

Please sign in to comment.