Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pump client: support change select pump's strategy #221

Merged
merged 4 commits into from
Mar 21, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 35 additions & 25 deletions tidb-binlog/pump_client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ var (

// PumpInfos saves pumps' infomations in pumps client.
type PumpInfos struct {
sync.RWMutex
// Pumps saves the map of pump's nodeID and pump status.
Pumps map[string]*PumpStatus

Expand All @@ -88,6 +87,8 @@ func NewPumpInfos() *PumpInfos {

// PumpsClient is the client of pumps.
type PumpsClient struct {
sync.RWMutex

ctx context.Context

cancel context.CancelFunc
Expand Down Expand Up @@ -123,7 +124,7 @@ type PumpsClient struct {

// NewPumpsClient returns a PumpsClient.
// TODO: get strategy from etcd, and can update strategy in real-time. Use Range as default now.
func NewPumpsClient(etcdURLs string, timeout time.Duration, securityOpt pd.SecurityOption) (*PumpsClient, error) {
func NewPumpsClient(etcdURLs, strategy string, timeout time.Duration, securityOpt pd.SecurityOption) (*PumpsClient, error) {
ectdEndpoints, err := utils.ParseHostPortAddr(etcdURLs)
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -154,7 +155,7 @@ func NewPumpsClient(etcdURLs string, timeout time.Duration, securityOpt pd.Secur
ClusterID: clusterID,
EtcdRegistry: node.NewEtcdRegistry(cli, DefaultEtcdTimeout),
Pumps: NewPumpInfos(),
Selector: NewSelector(Range),
Selector: NewSelector(strategy),
BinlogWriteTimeout: timeout,
Security: security,
nodePath: path.Join(node.DefaultRootPath, node.NodePrefix[node.PumpNode]),
Expand Down Expand Up @@ -241,14 +242,19 @@ func (c *PumpsClient) getPumpStatus(pctx context.Context) (revision int64, err e

// WriteBinlog writes binlog to a situable pump. Tips: will never return error for commit/rollback binlog.
func (c *PumpsClient) WriteBinlog(binlog *pb.Binlog) error {
c.RLock()
pumpNum := len(c.Pumps.AvaliablePumps)
selector := c.Selector
Copy link
Collaborator

@IANTHEREAL IANTHEREAL Mar 20, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why putting it into a lock scope?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok,I think you can put lock in selector, and provide an update function. it's better

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when change strategy will create a new selector. And selector is a member of pump client. so I think lock pump client seems better

c.RUnlock()

var choosePump *PumpStatus
meetError := false
defer func() {
if meetError {
c.checkPumpAvaliable()
}

c.Selector.Feedback(binlog.StartTs, binlog.Tp, choosePump)
selector.Feedback(binlog.StartTs, binlog.Tp, choosePump)
}()

commitData, err := binlog.Marshal()
Expand All @@ -262,13 +268,9 @@ func (c *PumpsClient) WriteBinlog(binlog *pb.Binlog) error {
var resp *pb.WriteBinlogResp
startTime := time.Now()

c.Pumps.RLock()
pumpNum := len(c.Pumps.AvaliablePumps)
c.Pumps.RUnlock()

for {
if pump == nil || binlog.Tp == pb.BinlogType_Prewrite {
pump = c.Selector.Select(binlog, retryTime)
pump = selector.Select(binlog, retryTime)
}
if pump == nil {
err = ErrNoAvaliablePump
Expand Down Expand Up @@ -335,11 +337,11 @@ func (c *PumpsClient) backoffWriteBinlog(req *pb.WriteBinlogReq, binlogType pb.B
}

unAvaliablePumps := make([]*PumpStatus, 0, 3)
c.Pumps.RLock()
c.RLock()
for _, pump := range c.Pumps.UnAvaliablePumps {
unAvaliablePumps = append(unAvaliablePumps, pump)
}
c.Pumps.RUnlock()
c.RUnlock()

var resp *pb.WriteBinlogResp
// send binlog to unavaliable pumps to retry again.
Expand Down Expand Up @@ -367,9 +369,9 @@ func (c *PumpsClient) backoffWriteBinlog(req *pb.WriteBinlogReq, binlogType pb.B
}

func (c *PumpsClient) checkPumpAvaliable() {
c.Pumps.RLock()
c.RLock()
allPumps := copyPumps(c.Pumps.Pumps)
c.Pumps.RUnlock()
c.RUnlock()

for _, pump := range allPumps {
if !pump.IsUsable() {
Expand All @@ -380,8 +382,8 @@ func (c *PumpsClient) checkPumpAvaliable() {

// setPumpAvaliable set pump's isAvaliable, and modify UnAvaliablePumps or AvaliablePumps.
func (c *PumpsClient) setPumpAvaliable(pump *PumpStatus, avaliable bool) {
c.Pumps.Lock()
defer c.Pumps.Unlock()
c.Lock()
defer c.Unlock()

pump.Reset()

Expand All @@ -403,7 +405,7 @@ func (c *PumpsClient) setPumpAvaliable(pump *PumpStatus, avaliable bool) {

// addPump add a new pump.
func (c *PumpsClient) addPump(pump *PumpStatus, updateSelector bool) {
c.Pumps.Lock()
c.Lock()

if pump.IsUsable() {
c.Pumps.AvaliablePumps[pump.NodeID] = pump
Expand All @@ -416,13 +418,21 @@ func (c *PumpsClient) addPump(pump *PumpStatus, updateSelector bool) {
c.Selector.SetPumps(copyPumps(c.Pumps.AvaliablePumps))
}

c.Pumps.Unlock()
c.Unlock()
}

// SetSelectStrategy sets the selector's strategy, strategy should be 'range' or 'hash' now.
func (c *PumpsClient) SetSelectStrategy(strategy string) {
c.Lock()
c.Selector = NewSelector(strategy)
c.Selector.SetPumps(copyPumps(c.Pumps.AvaliablePumps))
c.Unlock()
}

// updatePump update pump's status, and return whether pump's IsAvaliable should be changed.
func (c *PumpsClient) updatePump(status *node.Status) (pump *PumpStatus, avaliableChanged, avaliable bool) {
var ok bool
c.Pumps.Lock()
c.Lock()
if pump, ok = c.Pumps.Pumps[status.NodeID]; ok {
if pump.Status.State != status.State {
if status.State == node.Online {
Expand All @@ -435,29 +445,29 @@ func (c *PumpsClient) updatePump(status *node.Status) (pump *PumpStatus, avaliab
}
pump.Status = *status
}
c.Pumps.Unlock()
c.Unlock()

return
}

// removePump removes a pump, used when pump is offline.
func (c *PumpsClient) removePump(nodeID string) {
c.Pumps.Lock()
c.Lock()
if pump, ok := c.Pumps.Pumps[nodeID]; ok {
pump.Reset()
}
delete(c.Pumps.Pumps, nodeID)
delete(c.Pumps.UnAvaliablePumps, nodeID)
delete(c.Pumps.AvaliablePumps, nodeID)
c.Selector.SetPumps(copyPumps(c.Pumps.AvaliablePumps))
c.Pumps.Unlock()
c.Unlock()
}

// exist returns true if pumps client has pump matched this nodeID.
func (c *PumpsClient) exist(nodeID string) bool {
c.Pumps.RLock()
c.RLock()
_, ok := c.Pumps.Pumps[nodeID]
c.Pumps.RUnlock()
c.RUnlock()
return ok
}

Expand Down Expand Up @@ -533,13 +543,13 @@ func (c *PumpsClient) detect() {
needCheckPumps := make([]*PumpStatus, 0, len(c.Pumps.UnAvaliablePumps))
checkPassPumps := make([]*PumpStatus, 0, 1)
req := &pb.WriteBinlogReq{ClusterID: c.ClusterID, Payload: nil}
c.Pumps.RLock()
c.RLock()
for _, pump := range c.Pumps.UnAvaliablePumps {
if pump.IsUsable() {
needCheckPumps = append(needCheckPumps, pump)
}
}
c.Pumps.RUnlock()
c.RUnlock()

for _, pump := range needCheckPumps {
_, err := pump.WriteBinlog(req, c.BinlogWriteTimeout)
Expand Down
24 changes: 19 additions & 5 deletions tidb-binlog/pump_client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,16 @@ var _ = Suite(&testClientSuite{})
type testClientSuite struct{}

func (t *testClientSuite) TestSelector(c *C) {
algorithms := []string{Hash, Range}
for _, algorithm := range algorithms {
t.testSelector(c, algorithm)
strategys := []string{Hash, Range}
for _, strategy := range strategys {
t.testSelector(c, strategy)
}
}

func (*testClientSuite) testSelector(c *C, algorithm string) {
func (*testClientSuite) testSelector(c *C, strategy string) {
pumpsClient := &PumpsClient{
Pumps: NewPumpInfos(),
Selector: NewSelector(algorithm),
Selector: NewSelector(strategy),
BinlogWriteTimeout: DefaultBinlogWriteTimeout,
}

Expand Down Expand Up @@ -139,6 +139,20 @@ func (*testClientSuite) testSelector(c *C, algorithm string) {
// prewrite binlog and commit binlog with same start ts should choose same pump
c.Assert(pump1.NodeID, Equals, pump2.NodeID)
pumpsClient.setPumpAvaliable(pump1, true)

// after change strategy, prewrite binlog and commit binlog will choose same pump
pump1 = pumpsClient.Selector.Select(prewriteBinlog, 0)
pumpsClient.Selector.Feedback(prewriteBinlog.StartTs, prewriteBinlog.Tp, pump1)
if strategy == Range {
pumpsClient.SetSelectStrategy(Hash)
} else {
pumpsClient.SetSelectStrategy(Range)
}
pump2 = pumpsClient.Selector.Select(commitBinlog, 0)
c.Assert(pump1.NodeID, Equals, pump2.NodeID)

// set back
pumpsClient.SetSelectStrategy(strategy)
}
}

Expand Down
Loading