forked from ethereum/go-ethereum
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmeter.go
189 lines (163 loc) · 4.93 KB
/
meter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
package metrics
import (
"math"
"sync"
"sync/atomic"
"time"
)
// MeterSnapshot is the read-only view of a meter's values that is returned by
// Meter.Snapshot.
type MeterSnapshot interface {
// Count returns the number of events recorded at the time of the snapshot.
Count() int64
// Rate1 returns the one-minute moving average rate of events per second.
Rate1() float64
// Rate5 returns the five-minute moving average rate of events per second.
Rate5() float64
// Rate15 returns the fifteen-minute moving average rate of events per second.
Rate15() float64
// RateMean returns the mean rate of events per second.
RateMean() float64
}
// Meter counts events to produce exponentially-weighted moving average rates
// at one-, five-, and fifteen-minutes and a mean rate.
type Meter interface {
// Mark records the occurrence of the given number of events.
Mark(int64)
// Snapshot returns a read-only copy of the meter's current values.
Snapshot() MeterSnapshot
// Stop stops the meter; call it once the meter is of no further use.
Stop()
}
// GetOrRegisterMeter returns the Meter registered under name in r, or
// constructs and registers a new StandardMeter (via NewMeter) when there is
// none. A nil r selects DefaultRegistry.
//
// Unregister the meter from the registry once it is no longer needed so that
// it can be garbage collected.
func GetOrRegisterMeter(name string, r Registry) Meter {
	registry := r
	if registry == nil {
		registry = DefaultRegistry
	}
	metric := registry.GetOrRegister(name, NewMeter)
	return metric.(Meter)
}
// NewMeter returns a new StandardMeter that is ticked by the shared arbiter,
// launching the arbiter goroutine the first time it is needed. When metrics
// are disabled it returns a NilMeter instead.
// Call Stop() on the meter once it is of no use to allow for garbage collection.
func NewMeter() Meter {
	if !Enabled {
		return NilMeter{}
	}
	meter := newStandardMeter()

	arbiter.Lock()
	defer arbiter.Unlock()

	// Lazily start the single ticking goroutine; it cannot touch the meter set
	// before this function releases the lock.
	if !arbiter.started {
		arbiter.started = true
		go arbiter.tick()
	}
	arbiter.meters[meter] = struct{}{}
	return meter
}
// NewInactiveMeter returns a StandardMeter that is not handed to the arbiter:
// no goroutine is started for it and nothing ticks it automatically. When
// metrics are disabled it returns a NilMeter. It is mainly intended for
// testing.
func NewInactiveMeter() Meter {
	if Enabled {
		return newStandardMeter()
	}
	return NilMeter{}
}
// NewRegisteredMeter returns the Meter registered under name in r.
// It simply delegates to GetOrRegisterMeter, so, per that function's contract,
// an already registered meter is returned as-is and a new StandardMeter is
// only constructed when none exists.
// Be sure to unregister the meter from the registry once it is of no use to
// allow for garbage collection.
func NewRegisteredMeter(name string, r Registry) Meter {
return GetOrRegisterMeter(name, r)
}
// meterSnapshot holds a read-only copy of a meter's values, captured at a
// single point in time.
type meterSnapshot struct {
	count    int64
	rate1    float64
	rate5    float64
	rate15   float64
	rateMean float64
}

// Count reports how many events had been recorded when the snapshot was taken.
func (s *meterSnapshot) Count() int64 { return s.count }

// Rate1 reports the one-minute moving average rate, in events per second, as
// of the time the snapshot was taken.
func (s *meterSnapshot) Rate1() float64 { return s.rate1 }

// Rate5 reports the five-minute moving average rate, in events per second, as
// of the time the snapshot was taken.
func (s *meterSnapshot) Rate5() float64 { return s.rate5 }

// Rate15 reports the fifteen-minute moving average rate, in events per second,
// as of the time the snapshot was taken.
func (s *meterSnapshot) Rate15() float64 { return s.rate15 }

// RateMean reports the meter's mean rate, in events per second, as of the time
// the snapshot was taken.
func (s *meterSnapshot) RateMean() float64 { return s.rateMean }
// NilMeter is a no-op Meter.
type NilMeter struct{}
// Count always returns 0.
func (NilMeter) Count() int64 { return 0 }
// Mark discards n; nothing is recorded.
func (NilMeter) Mark(n int64) {}
// Snapshot returns a nil *emptySnapshot wrapped in the MeterSnapshot interface.
// NOTE(review): emptySnapshot is declared outside this view; presumably its
// methods tolerate a nil receiver — confirm.
func (NilMeter) Snapshot() MeterSnapshot { return (*emptySnapshot)(nil) }
// Stop is a no-op.
func (NilMeter) Stop() {}
// StandardMeter is the standard implementation of a Meter.
// Events are accumulated by Mark and folded into the count, the mean rate and
// the moving averages each time tick runs.
type StandardMeter struct {
// count is the number of events that tick has already accounted for.
count atomic.Int64
uncounted atomic.Int64 // not yet added to the EWMAs
// rateMean holds the mean rate as the bit pattern of a float64
// (math.Float64bits), which lets it live in an atomic.Uint64.
rateMean atomic.Uint64
// a1, a5 and a15 are the one-, five- and fifteen-minute moving averages.
a1, a5, a15 EWMA
// startTime is when the meter was created; the mean rate is measured from it.
startTime time.Time
// stopped is set by Stop so that removal from the arbiter happens only once.
stopped atomic.Bool
}
// newStandardMeter allocates a StandardMeter with fresh one-, five- and
// fifteen-minute EWMAs and its start time set to the current time. The meter
// is not registered with the arbiter.
func newStandardMeter() *StandardMeter {
	m := new(StandardMeter)
	m.a1 = NewEWMA1()
	m.a5 = NewEWMA5()
	m.a15 = NewEWMA15()
	m.startTime = time.Now()
	return m
}
// Stop marks the meter as stopped and removes it from the arbiter so that it
// is no longer ticked. Only the first call has any effect.
//
// Mark() is not gated on the stopped flag: events marked after Stop still
// accumulate and are included in Snapshot().Count(), but the arbiter no longer
// updates the meter's rates.
func (m *StandardMeter) Stop() {
	if alreadyStopped := m.stopped.Swap(true); alreadyStopped {
		return
	}
	arbiter.Lock()
	delete(arbiter.meters, m)
	arbiter.Unlock()
}
// Mark records the occurrence of n events.
// The events are only added to the pending (uncounted) total here; tick later
// folds them into the count and the moving averages. Mark does not check
// whether the meter has been stopped.
func (m *StandardMeter) Mark(n int64) {
m.uncounted.Add(n)
}
// Snapshot captures the meter's current count and rates in a read-only
// MeterSnapshot.
func (m *StandardMeter) Snapshot() MeterSnapshot {
	// Events marked since the last tick are not part of count yet, so they are
	// added in here.
	total := m.count.Load() + m.uncounted.Load()

	snap := &meterSnapshot{count: total}
	snap.rate1 = m.a1.Snapshot().Rate()
	snap.rate5 = m.a5.Snapshot().Rate()
	snap.rate15 = m.a15.Snapshot().Rate()
	// The mean rate is stored as float64 bits; decode it back.
	snap.rateMean = math.Float64frombits(m.rateMean.Load())
	return snap
}
// tick folds the events marked since the previous tick into the meter: it
// moves them from uncounted into count, refreshes the mean rate, and feeds
// them to the three EWMAs before ticking those so they recompute their rates.
func (m *StandardMeter) tick() {
	// Take the uncounted values, add to count
	n := m.uncounted.Swap(0)
	count := m.count.Add(n)

	// Mean rate = total events / seconds since the meter was created. Skip the
	// update when no time has elapsed (coarse clock, or a tick right after
	// construction): dividing by zero would store +Inf or NaN, which would then
	// be reported by Snapshot.
	if elapsed := time.Since(m.startTime).Seconds(); elapsed > 0 {
		m.rateMean.Store(math.Float64bits(float64(count) / elapsed))
	}

	// Update the EWMA's internal state
	m.a1.Update(n)
	m.a5.Update(n)
	m.a15.Update(n)

	// And trigger them to calculate the rates
	m.a1.Tick()
	m.a5.Tick()
	m.a15.Tick()
}
// meterArbiter ticks meters every 5s from a single goroutine.
// meters are referenced in a set for future stopping.
type meterArbiter struct {
// RWMutex guards started and meters.
sync.RWMutex
// started records whether the ticking goroutine has been launched.
started bool
// meters is the set of meters that are ticked on every interval.
meters map[*StandardMeter]struct{}
// ticker drives the tick loop.
ticker *time.Ticker
}
// arbiter is the package-wide meterArbiter that NewMeter registers meters
// with. Its ticker fires every five seconds.
var arbiter = meterArbiter{
	ticker: time.NewTicker(5 * time.Second),
	meters: make(map[*StandardMeter]struct{}),
}
// tick ticks all registered meters each time the arbiter's ticker fires.
// The loop only ends if the ticker channel is closed, which time.Ticker never
// does, so it is meant to run in its own goroutine.
func (ma *meterArbiter) tick() {
	ticks := ma.ticker.C
	for range ticks {
		ma.tickMeters()
	}
}
// tickMeters ticks every registered meter once. The read lock is held for the
// whole pass, so NewMeter and Stop, which take the write lock, wait until the
// pass has finished.
func (ma *meterArbiter) tickMeters() {
	ma.RLock()
	defer ma.RUnlock()

	for m := range ma.meters {
		m.tick()
	}
}