forked from fuyao-w/papillon
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathreplication.go
353 lines (336 loc) · 11 KB
/
replication.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
package papillon
import (
"errors"
. "github.com/fuyao-w/common-util"
"math"
"sync/atomic"
"time"
)
type (
// replication is the per-follower context state the leader keeps while replicating.
replication struct {
failures int // consecutive AppendEntries failures seen by Raft.replicateTo; drives its backoff and is reset to 0 on success
peer *LockItem[ServerInfo] // server info of the follower
nextIndex *atomic.Uint64 // index of the next log entry to send to the follower (original note: initialised from the leader's latest log index)
heartBeatStop, heartBeatDone, done chan struct{} // heartBeatStop: closed by Raft.replicate to stop the heartbeat loop; heartBeatDone: closed by Raft.heartbeat on exit; done: closed by Raft.replicate when it finishes
trigger chan *defaultDeferResponse // forces a replication round; send nil when the result is not needed
notifyCh chan struct{} // forces an immediate heartbeat
stop chan bool // replication stop notification; true asks for one more replication attempt before stopping
lastContact *LockItem[time.Time] // time of the last successful contact with the follower (original note: used for leadership calculation)
notify *LockItem[map[*verifyFuture]struct{}] // pending Raft.VerifyLeader requests awaiting a vote from this follower
// allowPipeline reports whether replication may go through Raft.pipelineReplicateHelper.
// It is set by Raft.replicateTo after a successful AppendEntries and cleared when
// Raft.pipelineReplicateHelper starts. Original note: replication normally starts over
// short connections (possibly sending a snapshot) and switches to a long-lived
// connection once the follower has caught up.
allowPipeline bool
}
)
// getNextIndex atomically loads the index of the next log entry to send to this follower.
func (fr *replication) getNextIndex() uint64 {
	next := fr.nextIndex.Load()
	return next
}
// setNextIndex atomically stores newNextIndex as the next log index to send to this follower.
func (fr *replication) setNextIndex(newNextIndex uint64) {
	cursor := fr.nextIndex
	cursor.Store(newNextIndex)
}
// notifyAll casts the given leadership verdict on every tracked verifyFuture and
// then replaces the tracking set with a fresh, empty one.
func (fr *replication) notifyAll(leadership bool) {
	fr.notify.Action(func(pending *map[*verifyFuture]struct{}) {
		for future := range *pending {
			future.vote(leadership)
		}
		*pending = make(map[*verifyFuture]struct{})
	})
}
// observe registers v in the set of Raft.VerifyLeader requests tracked for this follower.
func (fr *replication) observe(v *verifyFuture) {
	fr.notify.Action(func(pending *map[*verifyFuture]struct{}) {
		tracked := *pending
		tracked[v] = struct{}{}
	})
}
// setLastContact records the current time as the last contact with this follower.
func (fr *replication) setLastContact() {
	now := time.Now()
	fr.lastContact.Set(now)
}
// getLastContact returns the time recorded by the most recent setLastContact call.
func (fr *replication) getLastContact() time.Time {
	contactedAt := fr.lastContact.Get()
	return contactedAt
}
// heartbeat sends entry-less AppendEntries requests to the follower until
// fr.heartBeatStop fires, and closes fr.heartBeatDone when it exits.
//
// A round starts when the randomized timer (HeartbeatTimeout/10) expires or when
// fr.notifyCh is signalled. A failed RPC is followed by an exponential backoff wait
// that can be interrupted by fr.heartBeatStop; the error is logged after the wait.
func (r *Raft) heartbeat(fr *replication) {
	// The request is built once and reused for every round.
	request := &AppendEntryRequest{
		RPCHeader: r.buildRPCHeader(),
		Term:      r.getCurrentTerm(),
	}
	consecutiveFailures := 0
	defer close(fr.heartBeatDone)

	for {
		select {
		case <-fr.heartBeatStop:
			return
		case <-randomTimeout(r.Conf().HeartbeatTimeout / 10):
		case <-fr.notifyCh:
		}

		resp, err := r.rpc.AppendEntries(Ptr(fr.peer.Get()), request)
		if err == nil {
			consecutiveFailures = 0
			fr.setLastContact()
			// No entries are appended and no prev-log check is involved, so
			// resp.Success only reflects the term check; either value is a
			// trustworthy answer for pending VerifyLeader requests.
			fr.notifyAll(resp.Success)
			continue
		}

		consecutiveFailures++
		select {
		case <-time.After(exponentialBackoff(backoffBaseDuration, r.Conf().HeartbeatTimeout, consecutiveFailures, maxBackoffRounds)):
		case <-fr.heartBeatStop:
			return
		}
		r.logger.Errorf("AppendEntries :%s", err)
	}
}
// sendLatestSnapshot sends the first snapshot returned by the snapshot store
// (treated as the latest one) to the follower.
//
// It returns stop=true only when the follower answers with a term greater than the
// current term, in which case r.leaderLease is invoked. Every other outcome —
// listing/opening failure, no snapshot available, RPC error, or a normal reply —
// returns false.
//
// On a successful install the follower's nextIndex is moved to meta.Index+1 and its
// match index is updated to meta.Index.
func (r *Raft) sendLatestSnapshot(fr *replication) (stop bool) {
	peer := fr.peer.Get()
	list, err := r.snapshotStore.List()
	if err != nil {
		// Previously this failure was swallowed silently; log it like the other paths.
		r.logger.Errorf("sendLatestSnapshot|list snapshot err :%s", err)
		return false
	}
	if len(list) == 0 {
		r.logger.Errorf("sendLatestSnapshot|snapshot not exist")
		return false
	}
	latestID := list[0].ID
	meta, readCloser, err := r.snapshotStore.Open(latestID)
	if err != nil {
		r.logger.Errorf("sendLatestSnapshot|open :%s err :%s", latestID, err)
		return false
	}
	defer func() {
		// Best-effort close: the snapshot was only read, so the error is not actionable.
		_ = readCloser.Close()
	}()

	resp, err := r.rpc.InstallSnapShot(Ptr(fr.peer.Get()), &InstallSnapshotRequest{
		RPCHeader:    r.buildRPCHeader(),
		Term:         r.getCurrentTerm(),
		SnapshotMeta: meta,
	}, readCloser)
	if err != nil {
		r.logger.Errorf("sendLatestSnapshot|InstallSnapShot err :%s", err)
		return false
	}
	if resp.Term > r.getCurrentTerm() {
		r.leaderLease(fr)
		return true
	}
	if resp.Success {
		fr.setNextIndex(meta.Index + 1)
		r.updateMatchIndex(peer.ID, meta.Index)
	}
	fr.setLastContact()
	fr.notifyAll(resp.Success)
	r.logger.Debug("update next index ", fr.getNextIndex(), peer.ID)
	return false
}
// clacLatestIndex returns the highest log index to include in one replication
// batch: the smaller of latestIndex and nextIndex+maxAppendEntries-1.
//
// When nextIndex+maxAppendEntries-1 would exceed the uint64 range (for example
// maxAppendEntries set to math.MaxUint64 to mean "no limit"), the batch bound is
// treated as unbounded and latestIndex is returned, instead of a wrapped-around
// value that could fall below nextIndex.
func clacLatestIndex(nextIndex uint64, latestIndex, maxAppendEntries uint64) uint64 {
	// Overflow guard: nextIndex + (maxAppendEntries-1) > MaxUint64.
	if maxAppendEntries > 0 && nextIndex > math.MaxUint64-(maxAppendEntries-1) {
		return latestIndex
	}
	upper := nextIndex + maxAppendEntries - 1
	if upper < latestIndex {
		return upper
	}
	return latestIndex
}
// clacNextIndex 计算 replication.nextIndex
func clacNextIndex(nextIndex, peerLatestIndex uint64) uint64 {
return Max(1, Min(nextIndex-1, peerLatestIndex))
}
// replicateTo performs short-connection replication to one follower, looping until
// the follower's nextIndex has passed latestIndex so that it catches up quickly.
//
// If the request cannot be built because the needed log entry is missing
// (ErrNotFound, possibly wrapped), the latest snapshot is sent instead and that
// result is returned. Any other build error stops replication.
//
// Returns stop=true when replication to this follower must end: a non-ErrNotFound
// build error, a value received on fr.stop, or a reply carrying a higher term (after
// calling r.leaderLease). Returns false after an RPC failure (following a backoff
// wait) and once the follower has been sent everything up to latestIndex.
func (r *Raft) replicateTo(fr *replication, latestIndex uint64) (stop bool) {
	for {
		req, err := r.buildAppendEntryReq(fr.getNextIndex(), latestIndex)
		if err != nil {
			// errors.Is takes (err, target); the reversed order only matched
			// when err was exactly the sentinel and missed wrapped errors.
			if errors.Is(err, ErrNotFound) {
				return r.sendLatestSnapshot(fr)
			}
			r.logger.Errorf("buildAppendEntryReq err :%s ,latest index :%d", err, latestIndex)
			return true
		}

		resp, err := r.rpc.AppendEntries(Ptr(fr.peer.Get()), req)
		if err != nil {
			fr.failures++
			// Back off before letting the caller retry, unless told to stop.
			select {
			case <-fr.stop:
				return true
			case <-time.After(exponentialBackoff(backoffBaseDuration, time.Minute, fr.failures, math.MaxInt)):
			}
			r.logger.Errorf("AppendEntries :%s", err)
			return false
		}
		fr.failures = 0

		if resp.Term > r.getCurrentTerm() {
			r.leaderLease(fr)
			r.logger.Info("replicate to leader Lease", fr.peer.Get().ID)
			return true
		}
		fr.setLastContact()

		if resp.Success {
			r.updateLatestCommit(fr, req.Entries)
			// The follower accepted the entries, so pipeline replication may take over.
			fr.allowPipeline = true
		} else {
			fr.setNextIndex(clacNextIndex(fr.getNextIndex(), resp.LatestIndex))
		}

		// Non-blocking stop check between rounds.
		select {
		case <-fr.stop:
			return true
		default:
		}
		// Nothing left to send up to latestIndex.
		if fr.getNextIndex() > latestIndex {
			return false
		}
	}
}
// processPipelineResult consumes the responses of pipelined AppendEntries requests
// until finishCh fires or a reply with a higher term arrives (after which
// r.leaderLease is called). pipelineStopCh is closed when the function returns.
//
// A response error is logged and skipped. A successful reply advances the commit
// state via r.updateLatestCommit; a rejected reply rewinds the follower's nextIndex.
func (r *Raft) processPipelineResult(fr *replication, pipeline AppendEntryPipeline, finishCh, pipelineStopCh chan struct{}) {
	defer close(pipelineStopCh)
	for {
		select {
		case <-finishCh:
			return
		case future := <-pipeline.Consumer():
			resp, err := future.Response()
			switch {
			case err != nil:
				r.logger.Errorf("pipeline result err :%s, id :%s ", err, fr.peer.Get().ID)
			case resp.Term > r.getCurrentTerm():
				r.leaderLease(fr)
				return
			default:
				fr.setLastContact()
				if resp.Success {
					r.updateLatestCommit(fr, future.Request().Entries)
					continue
				}
				fr.setNextIndex(clacNextIndex(fr.getNextIndex(), resp.LatestIndex))
				r.logger.Debug("processPipelineResult", fr.getNextIndex(), resp.LatestIndex, fr.peer.Get().ID)
			}
		}
	}
}
// pipelineReplicateTo sends one batch of entries over the long-lived pipeline.
//
// Returns stop=true when the request cannot be built or the pipeline rejects the
// send. hasMore reports whether entries up to the leader's latest index are still
// waiting to be sent after this batch, so the caller can send again immediately.
func (r *Raft) pipelineReplicateTo(fr *replication, pipeline AppendEntryPipeline) (stop, hasMore bool) {
	req, err := r.buildAppendEntryReq(fr.getNextIndex(), r.getLatestIndex())
	if err != nil {
		r.logger.Errorf("pipelineReplicateTo|buildAppendEntryReq err:%s ,next index:%d ,latest index :%d ,%s", err, fr.getNextIndex(), r.getLatestIndex(), fr.peer.Get().ID)
		return true, false
	}
	if _, err = pipeline.AppendEntries(req); err != nil {
		r.logger.Errorf("pipelineReplicateTo|AppendEntries err:%s", err)
		return true, false
	}

	n := len(req.Entries)
	if n == 0 {
		return false, false
	}
	// Responses are handled asynchronously by processPipelineResult, so nextIndex
	// is advanced right away to avoid sending the same entries twice.
	fr.setNextIndex(req.Entries[n-1].Index + 1)
	r.logger.Debug("setNextIndex ", fr.getNextIndex(), fr.peer.Get().ID)
	// Compare directly: the unsigned difference latest-next wraps around when
	// next == latest+1 (fully caught up) and is zero when exactly one entry remains.
	return false, fr.getNextIndex() <= r.getLatestIndex()
}
// pipelineReplicateHelper drives replication over a long-lived pipeline connection.
// Original note: it only runs once the follower has been confirmed to have caught up,
// and it cannot send snapshots.
//
// It clears fr.allowPipeline on entry, opens a pipeline to the follower, starts
// processPipelineResult in a goroutine via r.goFunc, and then loops until a send
// fails, the result processor exits, or a value arrives on fr.stop.
//
// NOTE(review): the pipeline is not closed anywhere in this function — confirm whether
// AppendEntryPipeline requires an explicit close.
// NOTE(review): the value received from fr.stop is consumed here and this function
// returns nothing — confirm the caller still learns that replication should stop.
func (r *Raft) pipelineReplicateHelper(fr *replication) {
var (
// finishCh tells processPipelineResult to exit; pipelineStopCh is closed by it on exit.
finishCh, pipelineStopCh = make(chan struct{}), make(chan struct{})
hasMore bool
timeout = r.Conf().CommitTimeout
ticker = time.NewTimer(timeout)
)
defer ticker.Stop()
// Pipeline mode must be re-earned through replicateTo after this function returns.
fr.allowPipeline = false
peer := fr.peer.Get()
pipeline, err := r.rpc.AppendEntryPipeline(&peer)
if err != nil {
r.logger.Error("append entry pipeline err :", err, " peer id :", peer.ID)
return
}
r.goFunc(func() {
r.processPipelineResult(fr, pipeline, finishCh, pipelineStopCh)
})
for stop := false; !stop; {
select {
case <-pipelineStopCh:
// The result processor has exited, so stop sending.
stop = true
case shouldSend := <-fr.stop:
// true requests one last send before stopping; its result is ignored.
if shouldSend {
r.pipelineReplicateTo(fr, pipeline)
}
stop = true
case <-ticker.C: // trigger notifications may be lost (a slow RPC can skip the latest one), so a timer is used as a supplement
stop, hasMore = r.pipelineReplicateTo(fr, pipeline)
if hasMore { // speed up catch-up by firing again immediately
ticker.Reset(0)
} else {
ticker.Reset(timeout)
}
case fu := <-fr.trigger:
stop, _ = r.pipelineReplicateTo(fr, pipeline)
// A nil fu means the sender does not want the result.
if fu != nil {
if stop {
fu.fail(errors.New("replication failed"))
} else {
fu.success()
}
}
}
}
// Ask the result processor to exit, then wait for it or for shutdown.
close(finishCh)
select {
case <-pipelineStopCh:
case <-r.shutDown.C:
}
r.logger.Info("pipelineReplicateHelper stop", peer.ID)
return
}
// replicate runs replication to one follower until it is told to stop. It starts
// with short-connection replication (which can send snapshots) and hands over to
// pipeline replication whenever fr.allowPipeline has been set.
//
// On exit it closes fr.heartBeatStop, waits for fr.heartBeatDone, and finally
// closes fr.done.
func (r *Raft) replicate(fr *replication) {
	for {
		if r.replicateHelper(fr) {
			break
		}
		if fr.allowPipeline {
			r.pipelineReplicateHelper(fr)
		}
	}

	close(fr.heartBeatStop)
	<-fr.heartBeatDone
	r.logger.Info("replicate ", fr.peer.Get().ID, " stop")
	close(fr.done)
}
// replicateHelper is the trigger loop for short-connection replication. It calls
// replicateTo whenever the commit-timeout ticker fires or a request arrives on
// fr.trigger, and keeps going until replication must stop or fr.allowPipeline is set.
//
// A value on fr.stop ends the loop with stop=true; when that value is true, one
// final replicateTo is attempted first. The final stop value is logged on exit.
func (r *Raft) replicateHelper(fr *replication) (stop bool) {
	commitTicker := time.NewTicker(r.Conf().CommitTimeout)
	defer commitTicker.Stop()
	defer func() {
		r.logger.Info("replicateHelper end ,id:", fr.peer.Get().ID, ", stop:", stop)
	}()

	for {
		if stop || fr.allowPipeline {
			return stop
		}
		select {
		case flush := <-fr.stop:
			if flush {
				r.replicateTo(fr, r.getLatestIndex())
			}
			return true
		case <-commitTicker.C:
			// Trigger notifications can be lost (a slow RPC may skip the latest
			// one), so the ticker acts as a supplement.
			stop = r.replicateTo(fr, r.getLatestIndex())
		case future := <-fr.trigger:
			stop = r.replicateTo(fr, r.getLatestIndex())
			if future == nil {
				// The sender does not want the result.
				continue
			}
			if stop {
				future.fail(errors.New("replication failed"))
			} else {
				future.success()
			}
		}
	}
}