Skip to content
This repository has been archived by the owner on Feb 17, 2025. It is now read-only.

Commit

Permalink
feat: add metrics for piping messages
Browse files Browse the repository at this point in the history
  • Loading branch information
1995parham committed Sep 18, 2021
1 parent 550cf91 commit b0559d4
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 1 deletion.
64 changes: 64 additions & 0 deletions internal/pipe/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package pipe

import (
"errors"

"github.com/prometheus/client_golang/prometheus"
)

// Piped contains metrics to meter the number of piped messages.
type Piped struct {
PipedMessages prometheus.Counter
FailedMessages prometheus.Counter
}

func NewPiped(name string) Piped {
piped := prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "sjr",
Name: "piped_total",
Help: "total number of piped messages",
Subsystem: "pipe",
ConstLabels: prometheus.Labels{
"topic": name,
},
})

if err := prometheus.Register(piped); err != nil {
var are prometheus.AlreadyRegisteredError
if ok := errors.As(err, &are); ok {
piped, ok = are.ExistingCollector.(prometheus.Counter)
if !ok {
panic("piped must be a counter")
}
} else {
panic(err)
}
}

failed := prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "sjr",
Name: "piped_total",
Help: "total number of failed messages",
Subsystem: "pipe",
ConstLabels: prometheus.Labels{
"topic": name,
},
})

if err := prometheus.Register(failed); err != nil {
var are prometheus.AlreadyRegisteredError
if ok := errors.As(err, &are); ok {
piped, ok = are.ExistingCollector.(prometheus.Counter)
if !ok {
panic("failed must be a counter")
}
} else {
panic(err)
}
}

return Piped{
PipedMessages: piped,
FailedMessages: failed,
}
}
6 changes: 5 additions & 1 deletion internal/pipe/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ func New(c *cmq.CMQ, s *streaming.Streaming, logger *zap.Logger, tracer trace.Tr
// its subscription on streaming isn't durable and it always start from 1 second behind.
// the reason here is to reduce load on the streaming server as much as possible.
func (p *Pipe) Pipe(topic string) {
piped := NewPiped(topic)

if _, err := p.Streaming.Conn.QueueSubscribe(topic, p.Streaming.Group, func(imsg *stan.Msg) {
defer func() {
_ = imsg.Ack()
Expand All @@ -56,11 +58,13 @@ func (p *Pipe) Pipe(topic string) {
otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(omsg.Header))

if _, err := p.CMQ.JConn.PublishMsg(omsg); err != nil {
piped.FailedMessages.Inc()
span.RecordError(err)
p.Logger.Error("stan subscription failed", zap.Error(err))
p.Logger.Error("jetstream publish failed", zap.Error(err))
}

span.End()
piped.PipedMessages.Inc()
}, stan.StartAtTimeDelta(time.Second), stan.SetManualAckMode()); err != nil {
p.Logger.Fatal("stan subscription failed", zap.Error(err))
}
Expand Down

0 comments on commit b0559d4

Please sign in to comment.