-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmaster.go
204 lines (157 loc) · 4.19 KB
/
master.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
// Package election is a utility to perform master election/failover using etcd.
package election
import (
"errors"
"sync"
"time"
etcd "github.com/coreos/etcd/client"
log "github.com/Sirupsen/logrus"
"golang.org/x/net/context"
)
// Handle is the callback signature the election loop invokes after each
// acquire/renew attempt. isMaster is true when this instance currently holds
// the lease, and false when another instance holds it or the lease was given up.
type Handle func(isMaster bool)
// Election holds the state of one candidate taking part in an etcd-based
// master election. Build it with New.
type Election struct {
// running reports whether Start has been called and Stop has not; guarded by mu.
running bool
// mu guards running.
mu *sync.RWMutex
// servers is the list of etcd endpoints used to build the client.
servers []string
// key is the etcd key whose value names the current master.
key string
// whoami is this candidate's name; it is the value written to key.
whoami string
// ttl is the TTL placed on key, and the bound used locally to decide that
// our own lease has expired.
ttl time.Duration
// sleep is the pause between iterations of the election loop.
sleep time.Duration
// lastLease is the last time we acquired the lease or confirmed we hold it.
lastLease time.Time
// handle is called with the current mastership state on each loop iteration.
handle Handle
}
// Config carries the settings New uses to build an Election.
type Config struct {
// Servers is the list of etcd endpoints; New rejects an empty list.
Servers []string
// Key is the etcd key used for the election; New rejects an empty string.
Key string
// Name identifies this candidate; New rejects an empty string.
Name string
// Ttl is the TTL applied to the election key.
Ttl time.Duration
// Sleep is the pause between election loop iterations.
Sleep time.Duration
// Handler is called with the mastership state; New rejects nil.
Handler Handle
}
// New validates config and returns an Election built from it.
//
// It returns an error when Servers is empty, Handler is nil, or Key or Name
// is the empty string. Ttl and Sleep are copied through without validation.
// The returned Election is idle until Start is called.
func New(config Config) (*Election, error) {
	if len(config.Servers) == 0 {
		return nil, errors.New("Etcd servers cannot be empty or nil")
	}
	if config.Handler == nil {
		return nil, errors.New("The handler cannot be nil")
	}
	// Todo: Need more validation
	if config.Key == "" {
		return nil, errors.New("Key cannot be blank or nil")
	}
	// Todo: Need more validation
	if config.Name == "" {
		return nil, errors.New("Name cannot be blank or nil")
	}
	return &Election{
		mu:      &sync.RWMutex{},
		servers: config.Servers,
		key:     config.Key,
		whoami:  config.Name,
		ttl:     config.Ttl,
		sleep:   config.Sleep,
		handle:  config.Handler,
	}, nil
}
// Test writes the value "test" under key with a one-second TTL, using a
// client built from the configured servers. It returns the error from
// creating the client or from the write, or nil when both succeed.
func (self *Election) Test(key string) error {
	client, err := etcd.New(etcd.Config{Endpoints: self.servers})
	if err != nil {
		return err
	}
	opts := &etcd.SetOptions{TTL: time.Second}
	_, setErr := etcd.NewKeysAPI(client).Set(context.TODO(), key, "test", opts)
	return setErr
}
// Start marks the election as running and drives the election loop on the
// calling goroutine. If the election is already running it logs an error and
// returns immediately. A failure to build the etcd client is fatal
// (log.Fatalf).
func (self *Election) Start() {
	// Test-and-set the running flag in a single critical section.
	self.mu.Lock()
	alreadyRunning := self.running
	if !alreadyRunning {
		self.running = true
	}
	self.mu.Unlock()

	if alreadyRunning {
		log.Errorln("The lock is already running.")
		return
	}

	cfg := etcd.Config{Endpoints: self.servers}
	client, err := etcd.New(cfg)
	if err != nil {
		log.Fatalf("misconfigured etcd: %v", err)
	}

	for self.isRunning() {
		self.leaseAndUpdateLoop(client)
	}
}
// Stop clears the running flag (under the mutex) that Start checks between
// calls to the election loop.
func (self *Election) Stop() {
	self.mu.Lock()
	self.running = false
	self.mu.Unlock()
}
// isRunning reports whether the election is currently marked as running.
// It only reads the flag, so it takes the shared (read) side of the RWMutex
// rather than the exclusive lock.
func (self *Election) isRunning() bool {
	self.mu.RLock()
	defer self.mu.RUnlock()
	return self.running
}
// leaseAndUpdateLoop runs the election loop: on every iteration it tries to
// acquire or renew the lease, reports the resulting mastership state to the
// handler, and sleeps for the configured interval. It returns once the
// running flag has been cleared (see Stop), which is checked at the top of
// each iteration.
func (self *Election) leaseAndUpdateLoop(etcdClient etcd.Client) {
	for self.isRunning() {
		master, err := self.acquireOrRenewLease(etcdClient)
		if err != nil {
			log.Errorf("Error in master election: %v", err)
			if time.Now().Sub(self.lastLease) < self.ttl {
				// By our own accounting the lease is still valid, so keep the
				// current state and retry. Pause first so an unreachable etcd
				// is not hammered in a tight loop.
				time.Sleep(self.sleep)
				continue
			}
			// Our lease has expired due to our own accounting, pro-actively give it
			// up, even if we couldn't contact etcd.
			log.Infof("Too much time has elapsed, giving up lease.")
			master = false
		}
		self.handle(master)
		time.Sleep(self.sleep)
	}
}
// acquireOrRenewLease either races to acquire a new master lease, or updates
// the existing master's lease.
//
// It returns true if we hold the lease, false if another candidate does, and
// a non-nil error if talking to etcd failed. lastLease is refreshed whenever
// we acquire the lease or confirm that we still hold it.
// TODO: use the master election utility once it is merged in.
func (self *Election) acquireOrRenewLease(etcdClient etcd.Client) (bool, error) {
	keysAPI := etcd.NewKeysAPI(etcdClient)
	resp, err := keysAPI.Get(context.TODO(), self.key, nil)
	if err != nil {
		if !etcd.IsKeyNotFound(err) {
			return false, err
		}
		// There is no current master, so try to become master. PrevNoExist
		// makes this an atomic create: the write fails if another candidate
		// created the key between our Get and this Set, so only one racer wins.
		_, err = keysAPI.Set(context.TODO(), self.key, self.whoami, &etcd.SetOptions{
			TTL:       self.ttl,
			PrevExist: etcd.PrevNoExist,
		})
		if err != nil {
			if cErr, ok := err.(etcd.Error); ok && cErr.Code == etcd.ErrorCodeNodeExist {
				// Lost the race: someone else is master now. Not an error.
				return false, nil
			}
			return false, err
		}
		self.lastLease = time.Now()
		return true, nil
	}

	if resp.Node.Value != self.whoami {
		log.Infof("key already exists, the master is %s, sleeping.", resp.Node.Value)
		return false, nil
	}

	log.Infof("key already exists, we are the master (%s)", resp.Node.Value)
	// we extend our lease @ 1/2 of the existing TTL, this ensures the master doesn't flap around
	expiration := resp.Node.Expiration
	if expiration == nil || expiration.Sub(time.Now()) < self.ttl/2 {
		// Compare-and-swap on both value and index so we only renew the exact
		// node we just read.
		opts := etcd.SetOptions{
			TTL:       self.ttl,
			PrevValue: self.whoami,
			PrevIndex: resp.Node.ModifiedIndex,
		}
		if _, err := keysAPI.Set(context.TODO(), self.key, self.whoami, &opts); err != nil {
			return false, err
		}
	}
	self.lastLease = time.Now()
	return true, nil
}