65
65
66
66
// PumpInfos saves pumps' infomations in pumps client.
67
67
type PumpInfos struct {
68
- sync.RWMutex
69
68
// Pumps saves the map of pump's nodeID and pump status.
70
69
Pumps map [string ]* PumpStatus
71
70
@@ -88,6 +87,8 @@ func NewPumpInfos() *PumpInfos {
88
87
89
88
// PumpsClient is the client of pumps.
90
89
type PumpsClient struct {
90
+ sync.RWMutex
91
+
91
92
ctx context.Context
92
93
93
94
cancel context.CancelFunc
@@ -123,7 +124,7 @@ type PumpsClient struct {
123
124
124
125
// NewPumpsClient returns a PumpsClient.
125
126
// TODO: get strategy from etcd, and can update strategy in real-time. Use Range as default now.
126
- func NewPumpsClient (etcdURLs string , timeout time.Duration , securityOpt pd.SecurityOption ) (* PumpsClient , error ) {
127
+ func NewPumpsClient (etcdURLs , strategy string , timeout time.Duration , securityOpt pd.SecurityOption ) (* PumpsClient , error ) {
127
128
ectdEndpoints , err := utils .ParseHostPortAddr (etcdURLs )
128
129
if err != nil {
129
130
return nil , errors .Trace (err )
@@ -154,7 +155,7 @@ func NewPumpsClient(etcdURLs string, timeout time.Duration, securityOpt pd.Secur
154
155
ClusterID : clusterID ,
155
156
EtcdRegistry : node .NewEtcdRegistry (cli , DefaultEtcdTimeout ),
156
157
Pumps : NewPumpInfos (),
157
- Selector : NewSelector (Range ),
158
+ Selector : NewSelector (strategy ),
158
159
BinlogWriteTimeout : timeout ,
159
160
Security : security ,
160
161
nodePath : path .Join (node .DefaultRootPath , node .NodePrefix [node .PumpNode ]),
@@ -241,14 +242,19 @@ func (c *PumpsClient) getPumpStatus(pctx context.Context) (revision int64, err e
241
242
242
243
// WriteBinlog writes binlog to a situable pump. Tips: will never return error for commit/rollback binlog.
243
244
func (c * PumpsClient ) WriteBinlog (binlog * pb.Binlog ) error {
245
+ c .RLock ()
246
+ pumpNum := len (c .Pumps .AvaliablePumps )
247
+ selector := c .Selector
248
+ c .RUnlock ()
249
+
244
250
var choosePump * PumpStatus
245
251
meetError := false
246
252
defer func () {
247
253
if meetError {
248
254
c .checkPumpAvaliable ()
249
255
}
250
256
251
- c . Selector .Feedback (binlog .StartTs , binlog .Tp , choosePump )
257
+ selector .Feedback (binlog .StartTs , binlog .Tp , choosePump )
252
258
}()
253
259
254
260
commitData , err := binlog .Marshal ()
@@ -262,13 +268,9 @@ func (c *PumpsClient) WriteBinlog(binlog *pb.Binlog) error {
262
268
var resp * pb.WriteBinlogResp
263
269
startTime := time .Now ()
264
270
265
- c .Pumps .RLock ()
266
- pumpNum := len (c .Pumps .AvaliablePumps )
267
- c .Pumps .RUnlock ()
268
-
269
271
for {
270
272
if pump == nil || binlog .Tp == pb .BinlogType_Prewrite {
271
- pump = c . Selector .Select (binlog , retryTime )
273
+ pump = selector .Select (binlog , retryTime )
272
274
}
273
275
if pump == nil {
274
276
err = ErrNoAvaliablePump
@@ -335,11 +337,11 @@ func (c *PumpsClient) backoffWriteBinlog(req *pb.WriteBinlogReq, binlogType pb.B
335
337
}
336
338
337
339
unAvaliablePumps := make ([]* PumpStatus , 0 , 3 )
338
- c .Pumps . RLock ()
340
+ c .RLock ()
339
341
for _ , pump := range c .Pumps .UnAvaliablePumps {
340
342
unAvaliablePumps = append (unAvaliablePumps , pump )
341
343
}
342
- c .Pumps . RUnlock ()
344
+ c .RUnlock ()
343
345
344
346
var resp * pb.WriteBinlogResp
345
347
// send binlog to unavaliable pumps to retry again.
@@ -367,9 +369,9 @@ func (c *PumpsClient) backoffWriteBinlog(req *pb.WriteBinlogReq, binlogType pb.B
367
369
}
368
370
369
371
func (c * PumpsClient ) checkPumpAvaliable () {
370
- c .Pumps . RLock ()
372
+ c .RLock ()
371
373
allPumps := copyPumps (c .Pumps .Pumps )
372
- c .Pumps . RUnlock ()
374
+ c .RUnlock ()
373
375
374
376
for _ , pump := range allPumps {
375
377
if ! pump .IsUsable () {
@@ -380,8 +382,8 @@ func (c *PumpsClient) checkPumpAvaliable() {
380
382
381
383
// setPumpAvaliable set pump's isAvaliable, and modify UnAvaliablePumps or AvaliablePumps.
382
384
func (c * PumpsClient ) setPumpAvaliable (pump * PumpStatus , avaliable bool ) {
383
- c .Pumps . Lock ()
384
- defer c .Pumps . Unlock ()
385
+ c .Lock ()
386
+ defer c .Unlock ()
385
387
386
388
pump .Reset ()
387
389
@@ -403,7 +405,7 @@ func (c *PumpsClient) setPumpAvaliable(pump *PumpStatus, avaliable bool) {
403
405
404
406
// addPump add a new pump.
405
407
func (c * PumpsClient ) addPump (pump * PumpStatus , updateSelector bool ) {
406
- c .Pumps . Lock ()
408
+ c .Lock ()
407
409
408
410
if pump .IsUsable () {
409
411
c .Pumps .AvaliablePumps [pump .NodeID ] = pump
@@ -416,13 +418,26 @@ func (c *PumpsClient) addPump(pump *PumpStatus, updateSelector bool) {
416
418
c .Selector .SetPumps (copyPumps (c .Pumps .AvaliablePumps ))
417
419
}
418
420
419
- c .Pumps .Unlock ()
421
+ c .Unlock ()
422
+ }
423
+
424
+ // SetSelectStrategy sets the selector's strategy, strategy should be 'range' or 'hash' now.
425
+ func (c * PumpsClient ) SetSelectStrategy (strategy string ) error {
426
+ if strategy != Range && strategy != Hash {
427
+ return errors .Errorf ("strategy %s is not support" , strategy )
428
+ }
429
+
430
+ c .Lock ()
431
+ c .Selector = NewSelector (strategy )
432
+ c .Selector .SetPumps (copyPumps (c .Pumps .AvaliablePumps ))
433
+ c .Unlock ()
434
+ return nil
420
435
}
421
436
422
437
// updatePump update pump's status, and return whether pump's IsAvaliable should be changed.
423
438
func (c * PumpsClient ) updatePump (status * node.Status ) (pump * PumpStatus , avaliableChanged , avaliable bool ) {
424
439
var ok bool
425
- c .Pumps . Lock ()
440
+ c .Lock ()
426
441
if pump , ok = c .Pumps .Pumps [status .NodeID ]; ok {
427
442
if pump .Status .State != status .State {
428
443
if status .State == node .Online {
@@ -435,29 +450,29 @@ func (c *PumpsClient) updatePump(status *node.Status) (pump *PumpStatus, avaliab
435
450
}
436
451
pump .Status = * status
437
452
}
438
- c .Pumps . Unlock ()
453
+ c .Unlock ()
439
454
440
455
return
441
456
}
442
457
443
458
// removePump removes a pump, used when pump is offline.
444
459
func (c * PumpsClient ) removePump (nodeID string ) {
445
- c .Pumps . Lock ()
460
+ c .Lock ()
446
461
if pump , ok := c .Pumps .Pumps [nodeID ]; ok {
447
462
pump .Reset ()
448
463
}
449
464
delete (c .Pumps .Pumps , nodeID )
450
465
delete (c .Pumps .UnAvaliablePumps , nodeID )
451
466
delete (c .Pumps .AvaliablePumps , nodeID )
452
467
c .Selector .SetPumps (copyPumps (c .Pumps .AvaliablePumps ))
453
- c .Pumps . Unlock ()
468
+ c .Unlock ()
454
469
}
455
470
456
471
// exist returns true if pumps client has pump matched this nodeID.
457
472
func (c * PumpsClient ) exist (nodeID string ) bool {
458
- c .Pumps . RLock ()
473
+ c .RLock ()
459
474
_ , ok := c .Pumps .Pumps [nodeID ]
460
- c .Pumps . RUnlock ()
475
+ c .RUnlock ()
461
476
return ok
462
477
}
463
478
@@ -533,13 +548,13 @@ func (c *PumpsClient) detect() {
533
548
needCheckPumps := make ([]* PumpStatus , 0 , len (c .Pumps .UnAvaliablePumps ))
534
549
checkPassPumps := make ([]* PumpStatus , 0 , 1 )
535
550
req := & pb.WriteBinlogReq {ClusterID : c .ClusterID , Payload : nil }
536
- c .Pumps . RLock ()
551
+ c .RLock ()
537
552
for _ , pump := range c .Pumps .UnAvaliablePumps {
538
553
if pump .IsUsable () {
539
554
needCheckPumps = append (needCheckPumps , pump )
540
555
}
541
556
}
542
- c .Pumps . RUnlock ()
557
+ c .RUnlock ()
543
558
544
559
for _ , pump := range needCheckPumps {
545
560
_ , err := pump .WriteBinlog (req , c .BinlogWriteTimeout )
0 commit comments