-
Notifications
You must be signed in to change notification settings - Fork 0
/
collector.go
159 lines (133 loc) · 3.45 KB
/
collector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package gomet
import (
"sync/atomic"
"time"
)
// c is the package-level channel that the Meter uses to send events.
// It is initialised with a closed channel (see closedChan) and is replaced
// by a live buffered channel when Setup is called.
var c = closedChan()
// closedChan creates an Event channel, closes it and returns it.
// It provides the initial value of the package-level channel c, so a send
// attempted before Setup has replaced that channel panics (send on a closed
// channel) and makes the missing Setup call obvious.
func closedChan() chan Event {
	ch := make(chan Event)
	close(ch)
	return ch
}
// Setup starts the internal metrics collector and returns the channel on
// which the resulting Ticks are delivered.
//
// The collector groups metrics into windows of the given period; a period
// shorter than one second is not recommended. Setup replaces the
// package-level event channel, so it should be executed only once, at
// program start.
func Setup(period time.Duration) chan Tick {
	// A buffer of 1000 events is intended to smooth out activity spikes.
	// If a profiler shows senders permanently waiting on c, the collector
	// should get one more goroutine reading from c.
	c = make(chan Event, 1000)
	return collector(c, period)
}
// collector wires up the two-stage pipeline: collect batches the events read
// from in once per period, and aggregate converts every batch into a Tick.
// It returns the channel on which aggregate publishes those Ticks.
func collector(in chan Event, period time.Duration) chan Tick {
	return aggregate(collect(in, period), period)
}
// collect starts a goroutine that batches the events read from in and returns
// the channel on which the batches are published.
//
// Every incoming event is stamped with the time it was received. An event
// whose Worker is zero (a chan event) is given a worker id from a per-group
// counter: one counter serves events with a non-empty State (ChanIn), another
// serves events with an empty State (ChanOut). Each time the ticker fires, the
// buffered events are copied into a new slice, one auxiliary event (Group
// "_auxiliary_") carrying the tick time is appended, and the slice is sent on
// the returned channel.
//
// The goroutine stops, and the returned channel is closed, once in is closed;
// events still buffered at that moment are not sent.
func collect(in chan Event, period time.Duration) chan []Event {
	batches := make(chan []Event, 100)

	go func() {
		defer close(batches)

		tk := time.NewTicker(period)
		defer tk.Stop()

		pending := make([]Event, 0, 100)
		// Last worker id issued per group, tracked separately for each direction.
		counters := make(map[string]struct{ top, low int64 })

		for {
			select {
			case tickTime := <-tk.C:
				// Send a copy so the pending buffer can be reused immediately.
				// The extra capacity slot is for the auxiliary event that
				// transfers the tick time downstream.
				batch := make([]Event, 0, len(pending)+1)
				batch = append(batch, pending...)
				batch = append(batch, Event{Group: "_auxiliary_", Time: tickTime})
				pending = pending[:0]
				batches <- batch

			case ev, open := <-in:
				if !open {
					return
				}

				// To keep the implementation simple, the time an event spends
				// waiting in the input channel is neglected.
				// todo: for high resolution stats this could matter and needs to be improved
				ev.Time = time.Now()

				// Chan events arrive without a worker id; assign the next one.
				if ev.Worker == 0 {
					cnt := counters[ev.Group]
					if ev.State == "" {
						ev.Worker = atomic.AddInt64(&cnt.low, 1) // ChanOut
					} else {
						ev.Worker = atomic.AddInt64(&cnt.top, 1) // ChanIn
					}
					// cnt is a copy of the map value, so store it back.
					counters[ev.Group] = cnt
				}

				pending = append(pending, ev)
			}
		}
	}()

	return batches
}
// aggregate starts a goroutine that turns every batch received on in into a
// Tick and returns the channel on which those Ticks are published.
//
// The last element of each batch is taken as the auxiliary event whose Time is
// the tick time; the remaining events are applied to the application state
// and recorded in the Tick. At most one Tick is offered to the output at a
// time; Ticks built while that one is still pending wait in a FIFO queue, so
// receiving batches is not blocked by a slow consumer.
//
// The goroutine stops, and the returned channel is closed, once in is closed;
// Ticks not yet delivered at that moment are dropped.
func aggregate(in chan []Event, period time.Duration) chan Tick {
	out := make(chan Tick, 100)

	go func() {
		defer close(out)

		app := newApp()

		// current is the Tick being offered on sendc. sendc is nil while there
		// is nothing to send, which disables the send case of the select.
		var current Tick
		var sendc chan Tick
		backlog := make([]Tick, 0, 1)

		for {
			select {
			case batch, open := <-in:
				if !open {
					return
				}

				// Split off the trailing auxiliary event to obtain the tick time.
				last := len(batch) - 1
				aux := batch[last]
				batch = batch[:last]

				t := newTick(aux.Time, period)
				for _, ev := range batch {
					// Resolve the goroutine state and its duration; an empty
					// state means there is nothing to record for this event.
					state, start, dur := app.update(ev)
					if state != "" {
						t.set(ev.Group, ev.Worker, state, start, dur, false)
					}
				}

				// States that are still running live in app but not in the tick;
				// states that have finished are in the tick but no longer in app.
				for gname, group := range app {
					for wid, w := range group {
						t.set(gname, wid, w.State, w.Start, t.Time.Sub(w.Start), true)
					}
				}

				// Nothing pending: offer this tick to the output directly.
				if sendc == nil {
					current, sendc = t, out
					continue
				}
				// Otherwise it waits its turn in the queue.
				backlog = append(backlog, t)

			case sendc <- current:
				if len(backlog) == 0 {
					sendc = nil
					continue
				}
				// Promote the oldest queued tick and shift the rest forward.
				current = backlog[0]
				backlog = backlog[:copy(backlog, backlog[1:])]
			}
		}
	}()

	return out
}