-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathhandlers.go
323 lines (268 loc) · 7.75 KB
/
handlers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
package raft
import (
pb "github.com/bbengfort/raft/api/v1beta1"
"github.com/rs/zerolog/log"
)
func (r *Replica) onHeartbeatTimeout(e Event) error {
for _, peer := range r.remotes {
if err := peer.AppendEntries(r.leader, r.term, r.log); err != nil {
return err
}
}
return nil
}
func (r *Replica) onElectionTimeout(e Event) error {
r.setState(Candidate)
return nil
}
func (r *Replica) onCommitRequest(e Event) (err error) {
var (
ok bool
req *pb.CommitRequest
con chan *pb.CommitReply
entry *pb.LogEntry
)
if con, ok = e.Source().(chan *pb.CommitReply); !ok {
return ErrEventSourceError
}
// If the replica is not the leader, forward to the leader.
if r.leader != r.Name {
con <- r.makeRedirect()
return nil
}
// Otherwise append the entry and send out append entries.
if req, ok = e.Value().(*pb.CommitRequest); !ok {
return ErrEventTypeError
}
if entry, err = r.log.Create(req.Name, req.Value, r.term); err != nil {
return err
}
// Interrupt the heartbeat and send append entries
r.ticker.Interrupt(HeartbeatTimeout)
for _, peer := range r.remotes {
if err = peer.AppendEntries(r.leader, r.term, r.log); err != nil {
return err
}
}
// Tie the entry and the source together so reply is sent on commit/drop.
r.clients[entry.Index] = con
return nil
}
func (r *Replica) onAggregatedCommitRequest(ae Event) (err error) {
var (
ok bool
reqs []Event
req *pb.CommitRequest
con chan *pb.CommitReply
entry *pb.LogEntry
)
// Get all requests from the event
if reqs, ok = ae.Value().([]Event); !ok {
return ErrEventTypeError
}
// Tell the world about the aggregation
log.Info().Int("n_requests", len(reqs)).Msg("aggregating commit requests into a single append entry")
// Handle each request by redirecting to the leader or creating an entry
// in the log and associating the client with the entries index for reply.
for _, e := range reqs {
// Get the commit reply connection
if con, ok = e.Source().(chan *pb.CommitReply); !ok {
return ErrEventSourceError
}
// If the replica is not the leader, forward the client
if r.leader != r.Name {
con <- r.makeRedirect()
continue
}
// Otherwise create an entry in the log
if req, ok = e.Value().(*pb.CommitRequest); !ok {
return ErrEventTypeError
}
if entry, err = r.log.Create(req.Name, req.Value, r.term); err != nil {
return err
}
// Tie the entry and the source together so reply is sent on commit/drop.
r.clients[entry.Index] = con
}
// If we're not leader, we're done sending redirects, so exit
if r.leader != r.Name {
return nil
}
// Interrupt the heartbeat and send append entries with all new entries
r.ticker.Interrupt(HeartbeatTimeout)
for _, peer := range r.remotes {
if err = peer.AppendEntries(r.leader, r.term, r.log); err != nil {
return err
}
}
// Track the number of aggregations
r.Metrics.Aggregation(len(reqs))
return nil
}
func (r *Replica) onVoteRequest(e Event) (err error) {
var (
ok bool
req *pb.VoteRequest
con chan *pb.VoteReply
)
if req, ok = e.Value().(*pb.VoteRequest); !ok {
return ErrEventTypeError
}
if con, ok = e.Source().(chan *pb.VoteReply); !ok {
return ErrEventSourceError
}
log.Debug().
Str("name", r.Name).
Str("candidate", req.Candidate).
Uint64("term", req.Term).
Msg("received vote request")
// If RPC term is greater than local term, update and set state to follower.
if _, err := r.CheckRPCTerm(req.Term); err != nil {
return err
}
// Construct the reply
rep := &pb.VoteReply{Remote: r.Name, Term: r.term, Granted: false}
// Determine whether or not to grant the vote
if req.Term >= r.term {
if r.votedFor == "" || r.votedFor == req.Candidate {
// Ensure the log is as up to date as the candidate's.
if r.log.AsUpToDate(req.LastLogIndex, req.LastLogTerm) {
log.Info().Str("candidate", req.Candidate).Uint64("term", req.Term).Msg("voting for candidate")
r.ticker.Interrupt(ElectionTimeout)
rep.Granted = true
r.votedFor = req.Candidate
} else {
log.Debug().Str("candidate", req.Candidate).Msg("log is not as up to date as candidate")
}
} else {
log.Debug().Str("voted_for", r.votedFor).Uint64("term", r.term).Msg("already voted in current term")
}
}
// Send the reply
con <- rep
return nil
}
func (r *Replica) onVoteReply(e Event) error {
rep, ok := e.Value().(*pb.VoteReply)
if !ok {
return ErrEventTypeError
}
// If RPC term is greater than local term, update and set state to follower.
if _, err := r.CheckRPCTerm(rep.Term); err != nil {
return err
}
// If we're still a candidate, update vote and determine election
if r.state == Candidate {
log.Debug().
Str("candidate", r.Name).
Str("from", rep.Remote).
Bool("granted", rep.Granted).
Uint64("term", rep.Term).
Msg("received vote")
r.votes.Vote(rep.Remote, rep.Granted)
if r.votes.Passed() {
return r.setState(Leader)
}
}
return nil
}
func (r *Replica) onAppendRequest(e Event) error {
var (
ok bool
req *pb.AppendRequest
con chan *pb.AppendReply
)
if req, ok = e.Value().(*pb.AppendRequest); !ok {
return ErrEventTypeError
}
if con, ok = e.Source().(chan *pb.AppendReply); !ok {
return ErrEventSourceError
}
// If RPC term is greater than local term, update and set state to follower.
if _, err := r.CheckRPCTerm(req.Term); err != nil {
return err
}
if len(req.Entries) == 0 {
log.Trace().Str("leader", req.Leader).Uint64("term", req.Term).Msg("heartbeat received")
} else {
log.Debug().
Str("leader", req.Leader).
Uint64("term", req.Term).
Uint64("prev_log_index", req.PrevLogIndex).
Int("entries", len(req.Entries)).
Msg("appending entries after previous log index in term")
}
// Construct the reply
rep := &pb.AppendReply{
Remote: r.Name, Term: r.term, Success: false,
Index: r.log.LastApplied(), CommitIndex: r.log.CommitIndex(),
}
// Ensure reply is sent when function is concluded
defer func() { con <- rep }()
// Check to make sure that the append entires term is not stale
if req.Term < r.term {
log.Debug().Uint64("remote_term", req.Term).Uint64("local_term", r.term).Msg("append entries term is stale")
return nil
}
// Interrupt the election timeout and set sender as leader
r.ticker.Interrupt(ElectionTimeout)
r.leader = req.Leader
// Determine if we should append entries
if err := r.log.Truncate(req.PrevLogIndex, req.PrevLogTerm); err != nil {
log.Debug().Err(err).Msg("could not truncate log")
return nil
}
// Perform the append entries
if len(req.Entries) > 0 {
if err := r.log.Append(req.Entries...); err != nil {
return err
}
}
// If leader commit > local commit, update our commit index
if req.LeaderCommit > r.log.CommitIndex() {
var commitIndex uint64
if req.LeaderCommit > r.log.lastApplied {
commitIndex = r.log.lastApplied
} else {
commitIndex = req.LeaderCommit
}
if err := r.log.Commit(commitIndex); err != nil {
return err
}
}
// At this point, append entries is accepted
rep.Success = true
rep.Index = r.log.LastApplied()
rep.CommitIndex = r.log.CommitIndex()
return nil
}
func (r *Replica) onAppendReply(e Event) error {
rep, ok := e.Value().(*pb.AppendReply)
if !ok {
return ErrEventTypeError
}
// If RPC term is greater than local term, update and set state to follower.
if _, err := r.CheckRPCTerm(rep.Term); err != nil {
return err
}
// If we're no longer the leader, stop processing reply
if r.state != Leader {
return nil
}
// Update remote state based on success or failure
// TODO: review for correctness
peer := r.remotes[rep.Remote]
if rep.Success {
peer.nextIndex = rep.Index + 1
peer.matchIndex = rep.Index
} else {
if rep.Index < peer.nextIndex {
peer.nextIndex = rep.Index + 1
} else {
peer.nextIndex = rep.Index
}
return nil
}
// Decide if we can commit
return r.CheckCommits()
}