-
Notifications
You must be signed in to change notification settings - Fork 12
/
context.go
184 lines (162 loc) · 4.97 KB
/
context.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
package gohalt
import (
"context"
"time"
)
// ghctxid is the private type of every key this package stores in a
// context.Context. Using a dedicated named type keeps these keys distinct
// from string keys that other packages may put into the same context.
type ghctxid string

// Context keys, listed in the same order as the With* helpers that set them.
const (
	// ghctxtimestamp keys the time.Time stored by WithTimestamp.
	ghctxtimestamp ghctxid = "gohalt_context_timestamp"
	// ghctxpriority keys the uint8 stored by WithPriority.
	ghctxpriority ghctxid = "gohalt_context_priority"
	// ghctxweight keys the int64 stored by WithWeight.
	ghctxweight ghctxid = "gohalt_context_weight"
	// ghctxkey keys the string stored by WithKey.
	ghctxkey ghctxid = "gohalt_context_key"
	// ghctxmessage keys the arbitrary value stored by WithMessage.
	ghctxmessage ghctxid = "gohalt_context_message"
	// ghctxmarshaler keys the Marshaler stored by WithMarshaler.
	ghctxmarshaler ghctxid = "gohalt_context_marshaler"
)
// WithTimestamp returns a child of ctx that carries ts under the package's
// private timestamp key.
// The timestamp is used to determine latency between `Acquire` and `Release`.
// Consumed by the `latency` and `percentile` throttlers.
func WithTimestamp(ctx context.Context, ts time.Time) context.Context {
	stamped := context.WithValue(ctx, ghctxtimestamp, ts)
	return stamped
}
// ctxTimestamp reads the timestamp stored in ctx by WithTimestamp and returns
// it converted to UTC. When ctx holds no value under the timestamp key, or the
// value is not a time.Time, the current time in UTC is returned instead.
func ctxTimestamp(ctx context.Context) time.Time {
	stored, ok := ctx.Value(ghctxtimestamp).(time.Time)
	if !ok {
		// Nothing usable in the context: fall back to "now".
		return time.Now().UTC()
	}
	return stored.UTC()
}
// WithPriority returns a child of ctx that carries priority under the
// package's private priority key, to distinguish `Acquire` priority levels.
// Consumed by the `priority` throttler.
func WithPriority(ctx context.Context, priority uint8) context.Context {
	prioritized := context.WithValue(ctx, ghctxpriority, priority)
	return prioritized
}
// ctxPriority reads the priority stored in ctx by WithPriority.
// The stored value is returned only when it is a uint8 within [1, limit];
// a missing value, a value of another type, zero, or a value above limit
// all yield the fallback priority 1.
func ctxPriority(ctx context.Context, limit uint8) uint8 {
	const fallback uint8 = 1
	level, ok := ctx.Value(ghctxpriority).(uint8)
	switch {
	case !ok, level == 0, level > limit:
		return fallback
	default:
		return level
	}
}
// WithWeight returns a child of ctx that carries weight under the package's
// private weight key, to distinguish `Acquire` weight levels.
// Consumed by the `before`, `after`, `timed`, `adaptive`, `semaphore`,
// `cellrate` and `bucket` throttlers.
func WithWeight(ctx context.Context, weight int64) context.Context {
	weighted := context.WithValue(ctx, ghctxweight, weight)
	return weighted
}
// ctxWeight reads the weight stored in ctx by WithWeight and returns it
// unchanged, including zero and negative values. When ctx holds no value
// under the weight key, or the value is not an int64, 1 is returned.
func ctxWeight(ctx context.Context) int64 {
	stored, ok := ctx.Value(ghctxweight).(int64)
	if !ok {
		return 1
	}
	return stored
}
// ctxWeightMod reads the weight stored in ctx by WithWeight and returns it
// only when it is strictly positive. A missing value, a value that is not an
// int64, zero, or a negative weight all yield 1.
func ctxWeightMod(ctx context.Context) int64 {
	stored, ok := ctx.Value(ghctxweight).(int64)
	if !ok || stored <= 0 {
		return 1
	}
	return stored
}
// WithKey returns a child of ctx that carries key under the package's private
// key identifier, giving the call an additional identifier.
// Consumed by the `pattern` and `generator` throttlers.
func WithKey(ctx context.Context, key string) context.Context {
	keyed := context.WithValue(ctx, ghctxkey, key)
	return keyed
}
// ctxKey reads the key stored in ctx by WithKey. When ctx holds no value under
// the key identifier, or the value is not a string, the empty string is
// returned.
func ctxKey(ctx context.Context) string {
	// A failed type assertion leaves key at the zero value "".
	key, _ := ctx.Value(ghctxkey).(string)
	return key
}
// WithMessage returns a child of ctx that carries message under the package's
// private message key.
// Consumed by the `enqueue` throttler.
// Used in pair with `WithMarshaler`.
func WithMessage(ctx context.Context, message interface{}) context.Context {
	carrying := context.WithValue(ctx, ghctxmessage, message)
	return carrying
}
// ctxMessage returns whatever value ctx holds under the message key, as set by
// WithMessage; the result is nil when no message is present.
func ctxMessage(ctx context.Context) interface{} {
	message := ctx.Value(ghctxmessage)
	return message
}
// WithMarshaler returns a child of ctx that carries mrsh under the package's
// private marshaler key, to be used for marshaling the context message.
// Consumed by the `enqueue` throttler.
// Used in pair with `WithMessage`.
func WithMarshaler(ctx context.Context, mrsh Marshaler) context.Context {
	carrying := context.WithValue(ctx, ghctxmarshaler, mrsh)
	return carrying
}
// ctxMarshaler returns the Marshaler attached to ctx by WithMarshaler.
//
// DefaultMarshaler is returned when ctx holds no value under the marshaler
// key, when the stored value is not a Marshaler, or when the stored Marshaler
// is nil. The nil case matters because WithParams forwards its marshaler
// argument unconditionally, so a caller passing nil there would otherwise get
// a nil Marshaler back instead of the default.
//
// NOTE(review): the nil comparison assumes Marshaler is a nil-comparable type
// (func or interface); its declaration is not in this file — confirm.
func ctxMarshaler(ctx context.Context) Marshaler {
	if marshaler, ok := ctx.Value(ghctxmarshaler).(Marshaler); ok && marshaler != nil {
		return marshaler
	}
	return DefaultMarshaler
}
// WithParams is a facade that decorates ctx with every supported parameter.
// The helpers are applied in this order, each wrapping the previous result:
//   - `WithTimestamp`
//   - `WithPriority`
//   - `WithWeight`
//   - `WithKey`
//   - `WithMessage`
//   - `WithMarshaler`
func WithParams(
	ctx context.Context,
	ts time.Time,
	priority uint8,
	weight int64,
	key string,
	message interface{},
	marshaler Marshaler,
) context.Context {
	// The innermost call runs first, so the nesting reads bottom-up.
	return WithMarshaler(
		WithMessage(
			WithKey(
				WithWeight(
					WithPriority(
						WithTimestamp(ctx, ts),
						priority,
					),
					weight,
				),
				key,
			),
			message,
		),
		marshaler,
	)
}
// ctxthr is a context.Context implementation that embeds a parent context
// and pairs it with a throttler. It defines its own Done and Err methods;
// every other context method is promoted from the embedded parent.
type ctxthr struct {
// Context is the embedded parent context.
context.Context
// thr is the throttler that Err passes to NewRunnerSync and that the
// Throttler method returns.
thr Throttler
// freq is the period that Done passes to loop.
freq time.Duration
}
// WithThrottler wraps ctx into a context implementation that uses the parent
// context plus thr internally: the done channel of the returned context is
// closed when the internal throttler throttles. freq is the period handed to
// the polling loop set up by Done.
func WithThrottler(ctx context.Context, thr Throttler, freq time.Duration) context.Context {
	wrapped := ctxthr{
		Context: ctx,
		thr:     thr,
		freq:    freq,
	}
	return wrapped
}
// Done returns a channel that is closed once the throttler-backed context
// reports an error.
//
// A fresh channel is allocated on every call. Err is checked once up front;
// if it already reports an error, the channel is returned closed. Otherwise a
// task built with loop(ctx.freq, ...) is handed to gorun; that task closes the
// channel and logs the error as soon as the Err of the context it receives is
// non-nil, and returns that error.
//
// NOTE(review): gorun and loop are defined outside this file — presumably
// gorun starts a goroutine and loop re-runs the task every freq until it
// returns an error; confirm. If so, every call to Done starts its own poller.
func (ctx ctxthr) Done() <-chan struct{} {
	done := make(chan struct{})
	// Proactively test the throttler once, so an already failing context
	// yields a closed channel immediately.
	if ctx.Err() != nil {
		close(done)
		return done
	}
	// Otherwise run long-lived polling for a throttler error.
	poll := func(polled context.Context) error {
		err := polled.Err()
		if err == nil {
			return nil
		}
		close(done)
		log("context is canceled due: %v", err)
		return err
	}
	gorun(ctx, loop(ctx.freq, poll))
	return done
}
// Err builds a runner from the parent context and the throttler via
// NewRunnerSync, runs nope through it, and returns the runner's result.
//
// NOTE(review): nope is defined outside this file — presumably a no-op task,
// so the result reflects only the parent context and the throttler; confirm.
func (ctx ctxthr) Err() (err error) {
	runner := NewRunnerSync(ctx.Context, ctx.thr)
	runner.Run(nope)
	return runner.Result()
}
// Throttler returns the throttler this context was created with by
// WithThrottler.
func (ctx ctxthr) Throttler() Throttler {
	thr := ctx.thr
	return thr
}