// conn.go — forked from easyCZ/connect-go-prometheus.

package connect_go_prometheus
import (
"time"
"google.golang.org/protobuf/proto"
"connectrpc.com/connect"
)
// streamingConn holds the metric labels and reporter shared by the client and
// handler streaming wrappers in this file.
type streamingConn struct {
	// startTime is set to time.Now() when the value is built by newStreamingConn.
	startTime time.Time

	// callType, service and method are the label values passed to every
	// metric reported through this connection.
	callType string
	service  string
	method   string

	// reporter owns the metric vectors that are updated.
	reporter *Metrics
}
// newStreamingConn derives the metric labels for spec, stamps the current time
// as the start time, and increments reporter's requestStarted counter for those
// labels before returning the populated streamingConn.
func newStreamingConn(spec connect.Spec, reporter *Metrics) streamingConn {
	var sc streamingConn
	sc.service, sc.method = procedureToPackageAndMethod(spec.Procedure)
	sc.startTime = time.Now()
	sc.callType = streamTypeString(spec.StreamType)
	sc.reporter = reporter

	started := reporter.requestStarted.WithLabelValues(sc.callType, sc.service, sc.method)
	started.Inc()

	return sc
}
// reportSend increments the sent-message counter for this stream and, when the
// optional bytesSent metric is configured, adds the protobuf wire size of
// message to it.
//
// Messages that do not implement proto.Message (presumably possible when a
// non-protobuf codec is in use) are still counted, but contribute no bytes:
// their size cannot be computed with proto.Size, and metrics collection must
// not panic inside the RPC path.
func (conn *streamingConn) reportSend(message any) {
	conn.reporter.streamMsgSent.WithLabelValues(conn.callType, conn.service, conn.method).Inc()

	if conn.reporter.bytesSent == nil {
		return
	}

	// Checked assertion: an unchecked one panics on nil or non-proto messages.
	pm, ok := message.(proto.Message)
	if !ok {
		return
	}
	conn.reporter.bytesSent.WithLabelValues(conn.callType, conn.service, conn.method).Add(float64(proto.Size(pm)))
}
// reportReceive increments the received-message counter for this stream and,
// when the optional bytesReceived metric is configured, adds the protobuf wire
// size of message to it.
//
// Messages that do not implement proto.Message (presumably possible when a
// non-protobuf codec is in use) are still counted, but contribute no bytes:
// their size cannot be computed with proto.Size, and metrics collection must
// not panic inside the RPC path.
func (conn *streamingConn) reportReceive(message any) {
	conn.reporter.streamMsgReceived.WithLabelValues(conn.callType, conn.service, conn.method).Inc()

	if conn.reporter.bytesReceived == nil {
		return
	}

	// Checked assertion: an unchecked one panics on nil or non-proto messages.
	pm, ok := message.(proto.Message)
	if !ok {
		return
	}
	conn.reporter.bytesReceived.WithLabelValues(conn.callType, conn.service, conn.method).Add(float64(proto.Size(pm)))
}
// streamingClientConn wraps a connect.StreamingClientConn so that messages
// passing through Send and Receive are reported to the embedded streamingConn.
// Methods not overridden in this file are promoted from the wrapped connection.
type streamingClientConn struct {
	connect.StreamingClientConn
	streamingConn
	// onClose is invoked by CloseResponse with the error returned by the
	// wrapped connection's CloseResponse.
	onClose func(error)
}
// newStreamingClientConn wraps conn with metric reporting backed by the
// interceptor's client metrics. The embedded streamingConn is built from
// conn.Spec() via newStreamingConn, and onClose is stored for CloseResponse.
func newStreamingClientConn(conn connect.StreamingClientConn, i *Interceptor, onClose func(error)) *streamingClientConn {
	wrapped := new(streamingClientConn)
	wrapped.StreamingClientConn = conn
	wrapped.streamingConn = newStreamingConn(conn.Spec(), i.client)
	wrapped.onClose = onClose
	return wrapped
}
// Send forwards msg to the wrapped client connection and reports it as sent
// only when the underlying Send succeeds, mirroring how Receive only reports
// successfully received messages. The underlying error, if any, is returned
// unchanged.
func (conn *streamingClientConn) Send(msg any) error {
	err := conn.StreamingClientConn.Send(msg)
	if err == nil {
		// Count after the fact so failed sends do not inflate the metrics.
		conn.reportSend(msg)
	}
	return err
}
// Receive reads the next message from the wrapped client connection into msg.
// A successful read is reported via reportReceive; a failed read is returned
// to the caller without touching the metrics.
func (conn *streamingClientConn) Receive(msg any) error {
	if err := conn.StreamingClientConn.Receive(msg); err != nil {
		return err
	}
	conn.reportReceive(msg)
	return nil
}
// CloseResponse closes the response side of the wrapped client connection,
// passes the resulting error (nil on success) to the onClose callback, and
// then returns that same error.
//
// A nil onClose is tolerated so that a missing callback cannot panic after the
// underlying response has already been closed.
func (conn *streamingClientConn) CloseResponse() error {
	err := conn.StreamingClientConn.CloseResponse()
	if conn.onClose != nil {
		conn.onClose(err)
	}
	return err
}
// Compile-time assertion that *streamingClientConn implements connect.StreamingClientConn.
var _ connect.StreamingClientConn = (*streamingClientConn)(nil)
// streamingHandlerConn wraps a connect.StreamingHandlerConn so that messages
// passing through Send and Receive are reported to the embedded streamingConn.
// Methods not overridden in this file are promoted from the wrapped connection.
type streamingHandlerConn struct {
	connect.StreamingHandlerConn
	streamingConn
}
// newStreamingHandlerConn wraps conn with metric reporting backed by the
// interceptor's server metrics. The embedded streamingConn is built from
// conn.Spec() via newStreamingConn.
func newStreamingHandlerConn(conn connect.StreamingHandlerConn, i *Interceptor) *streamingHandlerConn {
	wrapped := new(streamingHandlerConn)
	wrapped.StreamingHandlerConn = conn
	wrapped.streamingConn = newStreamingConn(conn.Spec(), i.server)
	return wrapped
}
// Send forwards msg to the wrapped handler connection and reports it as sent
// only when the underlying Send succeeds, mirroring how Receive only reports
// successfully received messages. The underlying error, if any, is returned
// unchanged.
func (conn *streamingHandlerConn) Send(msg any) error {
	err := conn.StreamingHandlerConn.Send(msg)
	if err == nil {
		// Count after the fact so failed sends do not inflate the metrics.
		conn.reportSend(msg)
	}
	return err
}
// Receive reads the next message from the wrapped handler connection into msg.
// A successful read is reported via reportReceive; a failed read is returned
// to the caller without touching the metrics.
func (conn *streamingHandlerConn) Receive(msg any) error {
	if err := conn.StreamingHandlerConn.Receive(msg); err != nil {
		return err
	}
	conn.reportReceive(msg)
	return nil
}
// Compile-time assertion that *streamingHandlerConn implements connect.StreamingHandlerConn.
var _ connect.StreamingHandlerConn = (*streamingHandlerConn)(nil)