Skip to content

Commit

Permalink
fix typo of rebalance (#688)
Browse files Browse the repository at this point in the history
  • Loading branch information
leoppro authored Jun 23, 2020
1 parent 40bcd29 commit 48d3b37
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 60 deletions.
24 changes: 12 additions & 12 deletions cdc/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ type changeFeed struct {
toCleanTables map[model.TableID]model.Ts
moveTableJobs map[model.TableID]*model.MoveTableJob
manualMoveCommands []*model.MoveTableJob
rebanlanceNextTick bool
rebalanceNextTick bool

lastRebanlanceTime time.Time
lastRebalanceTime time.Time

etcdCli kv.CDCEtcdClient
}
Expand Down Expand Up @@ -242,21 +242,21 @@ func (c *changeFeed) updatePartition(tblInfo *timodel.TableInfo, startTs uint64)
}
}

func (c *changeFeed) tryBalance(ctx context.Context, captures map[string]*model.CaptureInfo, rebanlanceNow bool,
func (c *changeFeed) tryBalance(ctx context.Context, captures map[string]*model.CaptureInfo, rebalanceNow bool,
manualMoveCommands []*model.MoveTableJob) error {
err := c.balanceOrphanTables(ctx, captures)
if err != nil {
return errors.Trace(err)
}
c.manualMoveCommands = append(c.manualMoveCommands, manualMoveCommands...)
if rebanlanceNow {
c.rebanlanceNextTick = true
if rebalanceNow {
c.rebalanceNextTick = true
}
err = c.handleManualMoveTableJobs(ctx, captures)
if err != nil {
return errors.Trace(err)
}
err = c.rebanlanceTables(ctx, captures)
err = c.rebalanceTables(ctx, captures)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -422,7 +422,7 @@ func (c *changeFeed) handleManualMoveTableJobs(ctx context.Context, captures map
return nil
}

func (c *changeFeed) rebanlanceTables(ctx context.Context, captures map[model.CaptureID]*model.CaptureInfo) error {
func (c *changeFeed) rebalanceTables(ctx context.Context, captures map[model.CaptureID]*model.CaptureInfo) error {
if len(captures) == 0 {
return nil
}
Expand All @@ -434,14 +434,14 @@ func (c *changeFeed) rebanlanceTables(ctx context.Context, captures map[model.Ca
return nil
}
}
timeToRebanlance := time.Since(c.lastRebanlanceTime) > time.Duration(c.info.Config.Scheduler.PollingTime)*time.Minute
timeToRebanlance = timeToRebanlance && c.info.Config.Scheduler.PollingTime > 0
timeToRebalance := time.Since(c.lastRebalanceTime) > time.Duration(c.info.Config.Scheduler.PollingTime)*time.Minute
timeToRebalance = timeToRebalance && c.info.Config.Scheduler.PollingTime > 0

if !c.rebanlanceNextTick && !timeToRebanlance {
if !c.rebalanceNextTick && !timeToRebalance {
return nil
}
c.lastRebanlanceTime = time.Now()
c.rebanlanceNextTick = false
c.lastRebalanceTime = time.Now()
c.rebalanceNextTick = false

captureIDs := make(map[model.CaptureID]struct{}, len(captures))
for cid := range captures {
Expand Down
4 changes: 2 additions & 2 deletions cdc/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (s *Server) handleChangefeedAdmin(w http.ResponseWriter, req *http.Request)
handleOwnerResp(w, err)
}

func (s *Server) handleRebanlanceTrigger(w http.ResponseWriter, req *http.Request) {
func (s *Server) handleRebalanceTrigger(w http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodPost {
writeError(w, http.StatusBadRequest, errors.New("this api only supports POST method"))
return
Expand All @@ -129,7 +129,7 @@ func (s *Server) handleRebanlanceTrigger(w http.ResponseWriter, req *http.Reques
writeError(w, http.StatusBadRequest, errors.Errorf("invalid changefeed id: %s", changefeedID))
return
}
s.owner.TriggerRebanlance(changefeedID)
s.owner.TriggerRebalance(changefeedID)
handleOwnerResp(w, nil)
}

Expand Down
2 changes: 1 addition & 1 deletion cdc/http_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (s *Server) startStatusHTTP() error {
serverMux.HandleFunc("/debug/info", s.handleDebugInfo)
serverMux.HandleFunc("/capture/owner/resign", s.handleResignOwner)
serverMux.HandleFunc("/capture/owner/admin", s.handleChangefeedAdmin)
serverMux.HandleFunc("/capture/owner/rebanlance_trigger", s.handleRebanlanceTrigger)
serverMux.HandleFunc("/capture/owner/rebalance_trigger", s.handleRebalanceTrigger)
serverMux.HandleFunc("/capture/owner/move_table", s.handleMoveTable)

prometheus.DefaultGatherer = registry
Expand Down
90 changes: 45 additions & 45 deletions cdc/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@ import (

// Owner manages the cdc cluster
type Owner struct {
done chan struct{}
session *concurrency.Session
changeFeeds map[model.ChangeFeedID]*changeFeed
failedFeeds map[model.ChangeFeedID]struct{}
rebanlanceTigger map[model.ChangeFeedID]bool
rebanlanceForAllChangefeed bool
manualScheduleCommand map[model.ChangeFeedID][]*model.MoveTableJob
rebanlanceMu sync.Mutex
done chan struct{}
session *concurrency.Session
changeFeeds map[model.ChangeFeedID]*changeFeed
failedFeeds map[model.ChangeFeedID]struct{}
rebalanceTigger map[model.ChangeFeedID]bool
rebalanceForAllChangefeed bool
manualScheduleCommand map[model.ChangeFeedID][]*model.MoveTableJob
rebalanceMu sync.Mutex

cfRWriter ChangeFeedRWriter

Expand Down Expand Up @@ -87,7 +87,7 @@ func NewOwner(pdClient pd.Client, sess *concurrency.Session, gcTTL int64) (*Owne
changeFeeds: make(map[model.ChangeFeedID]*changeFeed),
failedFeeds: make(map[model.ChangeFeedID]struct{}),
captures: make(map[model.CaptureID]*model.CaptureInfo),
rebanlanceTigger: make(map[model.ChangeFeedID]bool),
rebalanceTigger: make(map[model.ChangeFeedID]bool),
manualScheduleCommand: make(map[model.ChangeFeedID][]*model.MoveTableJob),
pdEndpoints: endpoints,
cfRWriter: cli,
Expand All @@ -102,9 +102,9 @@ func (o *Owner) addCapture(info *model.CaptureInfo) {
o.l.Lock()
o.captures[info.ID] = info
o.l.Unlock()
o.rebanlanceMu.Lock()
o.rebanlanceForAllChangefeed = true
o.rebanlanceMu.Unlock()
o.rebalanceMu.Lock()
o.rebalanceForAllChangefeed = true
o.rebalanceMu.Unlock()
}

func (o *Owner) removeCapture(info *model.CaptureInfo) {
Expand Down Expand Up @@ -290,17 +290,17 @@ func (o *Owner) newChangeFeed(
ResolvedTs: 0,
CheckpointTs: checkpointTs,
},
scheduler: scheduler.NewScheduler(info.Config.Scheduler.Tp),
ddlState: model.ChangeFeedSyncDML,
ddlExecutedTs: checkpointTs,
targetTs: info.GetTargetTs(),
taskStatus: processorsInfos,
taskPositions: taskPositions,
etcdCli: o.etcdClient,
filter: filter,
sink: sink,
cyclicEnabled: info.Config.Cyclic.IsEnabled(),
lastRebanlanceTime: time.Now(),
scheduler: scheduler.NewScheduler(info.Config.Scheduler.Tp),
ddlState: model.ChangeFeedSyncDML,
ddlExecutedTs: checkpointTs,
targetTs: info.GetTargetTs(),
taskStatus: processorsInfos,
taskPositions: taskPositions,
etcdCli: o.etcdClient,
filter: filter,
sink: sink,
cyclicEnabled: info.Config.Cyclic.IsEnabled(),
lastRebalanceTime: time.Now(),
}
return cf, nil
}
Expand Down Expand Up @@ -405,30 +405,30 @@ func (o *Owner) loadChangeFeeds(ctx context.Context) error {
}

func (o *Owner) balanceTables(ctx context.Context) error {
rebanlanceForAllChangefeed := false
o.rebanlanceMu.Lock()
if o.rebanlanceForAllChangefeed {
rebanlanceForAllChangefeed = true
o.rebanlanceForAllChangefeed = false
rebalanceForAllChangefeed := false
o.rebalanceMu.Lock()
if o.rebalanceForAllChangefeed {
rebalanceForAllChangefeed = true
o.rebalanceForAllChangefeed = false
}
o.rebanlanceMu.Unlock()
o.rebalanceMu.Unlock()
for id, changefeed := range o.changeFeeds {
rebanlanceNow := false
rebalanceNow := false
var scheduleCommands []*model.MoveTableJob
o.rebanlanceMu.Lock()
if r, exist := o.rebanlanceTigger[id]; exist {
rebanlanceNow = r
delete(o.rebanlanceTigger, id)
o.rebalanceMu.Lock()
if r, exist := o.rebalanceTigger[id]; exist {
rebalanceNow = r
delete(o.rebalanceTigger, id)
}
if rebanlanceForAllChangefeed {
rebanlanceNow = true
if rebalanceForAllChangefeed {
rebalanceNow = true
}
if c, exist := o.manualScheduleCommand[id]; exist {
scheduleCommands = c
delete(o.manualScheduleCommand, id)
}
o.rebanlanceMu.Unlock()
err := changefeed.tryBalance(ctx, o.captures, rebanlanceNow, scheduleCommands)
o.rebalanceMu.Unlock()
err := changefeed.tryBalance(ctx, o.captures, rebalanceNow, scheduleCommands)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -764,18 +764,18 @@ func (o *Owner) EnqueueJob(job model.AdminJob) error {
return nil
}

// TriggerRebanlance triggers the rebalance in the specified changefeed
func (o *Owner) TriggerRebanlance(changefeedID model.ChangeFeedID) {
o.rebanlanceMu.Lock()
defer o.rebanlanceMu.Unlock()
o.rebanlanceTigger[changefeedID] = true
// TriggerRebalance triggers the rebalance in the specified changefeed
func (o *Owner) TriggerRebalance(changefeedID model.ChangeFeedID) {
o.rebalanceMu.Lock()
defer o.rebalanceMu.Unlock()
o.rebalanceTigger[changefeedID] = true
// TODO(leoppro) throw an error if the changefeed is not exist
}

// ManualSchedule moves the table from a capture to another capture
func (o *Owner) ManualSchedule(changefeedID model.ChangeFeedID, to model.CaptureID, tableID model.TableID) {
o.rebanlanceMu.Lock()
defer o.rebanlanceMu.Unlock()
o.rebalanceMu.Lock()
defer o.rebalanceMu.Unlock()
o.manualScheduleCommand[changefeedID] = append(o.manualScheduleCommand[changefeedID], &model.MoveTableJob{
To: to,
TableID: tableID,
Expand Down

0 comments on commit 48d3b37

Please sign in to comment.