Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: fix typo of rebalance #688

Merged
merged 3 commits into from
Jun 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions cdc/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ type changeFeed struct {
toCleanTables map[model.TableID]model.Ts
moveTableJobs map[model.TableID]*model.MoveTableJob
manualMoveCommands []*model.MoveTableJob
rebanlanceNextTick bool
rebalanceNextTick bool

lastRebanlanceTime time.Time
lastRebalanceTime time.Time

etcdCli kv.CDCEtcdClient
}
Expand Down Expand Up @@ -242,21 +242,21 @@ func (c *changeFeed) updatePartition(tblInfo *timodel.TableInfo, startTs uint64)
}
}

func (c *changeFeed) tryBalance(ctx context.Context, captures map[string]*model.CaptureInfo, rebanlanceNow bool,
func (c *changeFeed) tryBalance(ctx context.Context, captures map[string]*model.CaptureInfo, rebalanceNow bool,
manualMoveCommands []*model.MoveTableJob) error {
err := c.balanceOrphanTables(ctx, captures)
if err != nil {
return errors.Trace(err)
}
c.manualMoveCommands = append(c.manualMoveCommands, manualMoveCommands...)
if rebanlanceNow {
c.rebanlanceNextTick = true
if rebalanceNow {
c.rebalanceNextTick = true
}
err = c.handleManualMoveTableJobs(ctx, captures)
if err != nil {
return errors.Trace(err)
}
err = c.rebanlanceTables(ctx, captures)
err = c.rebalanceTables(ctx, captures)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -422,7 +422,7 @@ func (c *changeFeed) handleManualMoveTableJobs(ctx context.Context, captures map
return nil
}

func (c *changeFeed) rebanlanceTables(ctx context.Context, captures map[model.CaptureID]*model.CaptureInfo) error {
func (c *changeFeed) rebalanceTables(ctx context.Context, captures map[model.CaptureID]*model.CaptureInfo) error {
if len(captures) == 0 {
return nil
}
Expand All @@ -434,14 +434,14 @@ func (c *changeFeed) rebanlanceTables(ctx context.Context, captures map[model.Ca
return nil
}
}
timeToRebanlance := time.Since(c.lastRebanlanceTime) > time.Duration(c.info.Config.Scheduler.PollingTime)*time.Minute
timeToRebanlance = timeToRebanlance && c.info.Config.Scheduler.PollingTime > 0
timeToRebalance := time.Since(c.lastRebalanceTime) > time.Duration(c.info.Config.Scheduler.PollingTime)*time.Minute
timeToRebalance = timeToRebalance && c.info.Config.Scheduler.PollingTime > 0

if !c.rebanlanceNextTick && !timeToRebanlance {
if !c.rebalanceNextTick && !timeToRebalance {
return nil
}
c.lastRebanlanceTime = time.Now()
c.rebanlanceNextTick = false
c.lastRebalanceTime = time.Now()
c.rebalanceNextTick = false

captureIDs := make(map[model.CaptureID]struct{}, len(captures))
for cid := range captures {
Expand Down
4 changes: 2 additions & 2 deletions cdc/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (s *Server) handleChangefeedAdmin(w http.ResponseWriter, req *http.Request)
handleOwnerResp(w, err)
}

func (s *Server) handleRebanlanceTrigger(w http.ResponseWriter, req *http.Request) {
func (s *Server) handleRebalanceTrigger(w http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodPost {
writeError(w, http.StatusBadRequest, errors.New("this api only supports POST method"))
return
Expand All @@ -129,7 +129,7 @@ func (s *Server) handleRebanlanceTrigger(w http.ResponseWriter, req *http.Reques
writeError(w, http.StatusBadRequest, errors.Errorf("invalid changefeed id: %s", changefeedID))
return
}
s.owner.TriggerRebanlance(changefeedID)
s.owner.TriggerRebalance(changefeedID)
handleOwnerResp(w, nil)
}

Expand Down
2 changes: 1 addition & 1 deletion cdc/http_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (s *Server) startStatusHTTP() error {
serverMux.HandleFunc("/debug/info", s.handleDebugInfo)
serverMux.HandleFunc("/capture/owner/resign", s.handleResignOwner)
serverMux.HandleFunc("/capture/owner/admin", s.handleChangefeedAdmin)
serverMux.HandleFunc("/capture/owner/rebanlance_trigger", s.handleRebanlanceTrigger)
serverMux.HandleFunc("/capture/owner/rebalance_trigger", s.handleRebalanceTrigger)
serverMux.HandleFunc("/capture/owner/move_table", s.handleMoveTable)

prometheus.DefaultGatherer = registry
Expand Down
90 changes: 45 additions & 45 deletions cdc/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@ import (

// Owner manages the cdc cluster
type Owner struct {
done chan struct{}
session *concurrency.Session
changeFeeds map[model.ChangeFeedID]*changeFeed
failedFeeds map[model.ChangeFeedID]struct{}
rebanlanceTigger map[model.ChangeFeedID]bool
rebanlanceForAllChangefeed bool
manualScheduleCommand map[model.ChangeFeedID][]*model.MoveTableJob
rebanlanceMu sync.Mutex
done chan struct{}
session *concurrency.Session
changeFeeds map[model.ChangeFeedID]*changeFeed
failedFeeds map[model.ChangeFeedID]struct{}
rebalanceTigger map[model.ChangeFeedID]bool
rebalanceForAllChangefeed bool
manualScheduleCommand map[model.ChangeFeedID][]*model.MoveTableJob
rebalanceMu sync.Mutex

cfRWriter ChangeFeedRWriter

Expand Down Expand Up @@ -87,7 +87,7 @@ func NewOwner(pdClient pd.Client, sess *concurrency.Session, gcTTL int64) (*Owne
changeFeeds: make(map[model.ChangeFeedID]*changeFeed),
failedFeeds: make(map[model.ChangeFeedID]struct{}),
captures: make(map[model.CaptureID]*model.CaptureInfo),
rebanlanceTigger: make(map[model.ChangeFeedID]bool),
rebalanceTigger: make(map[model.ChangeFeedID]bool),
manualScheduleCommand: make(map[model.ChangeFeedID][]*model.MoveTableJob),
pdEndpoints: endpoints,
cfRWriter: cli,
Expand All @@ -102,9 +102,9 @@ func (o *Owner) addCapture(info *model.CaptureInfo) {
o.l.Lock()
o.captures[info.ID] = info
o.l.Unlock()
o.rebanlanceMu.Lock()
o.rebanlanceForAllChangefeed = true
o.rebanlanceMu.Unlock()
o.rebalanceMu.Lock()
o.rebalanceForAllChangefeed = true
o.rebalanceMu.Unlock()
}

func (o *Owner) removeCapture(info *model.CaptureInfo) {
Expand Down Expand Up @@ -290,17 +290,17 @@ func (o *Owner) newChangeFeed(
ResolvedTs: 0,
CheckpointTs: checkpointTs,
},
scheduler: scheduler.NewScheduler(info.Config.Scheduler.Tp),
ddlState: model.ChangeFeedSyncDML,
ddlExecutedTs: checkpointTs,
targetTs: info.GetTargetTs(),
taskStatus: processorsInfos,
taskPositions: taskPositions,
etcdCli: o.etcdClient,
filter: filter,
sink: sink,
cyclicEnabled: info.Config.Cyclic.IsEnabled(),
lastRebanlanceTime: time.Now(),
scheduler: scheduler.NewScheduler(info.Config.Scheduler.Tp),
ddlState: model.ChangeFeedSyncDML,
ddlExecutedTs: checkpointTs,
targetTs: info.GetTargetTs(),
taskStatus: processorsInfos,
taskPositions: taskPositions,
etcdCli: o.etcdClient,
filter: filter,
sink: sink,
cyclicEnabled: info.Config.Cyclic.IsEnabled(),
lastRebalanceTime: time.Now(),
}
return cf, nil
}
Expand Down Expand Up @@ -405,30 +405,30 @@ func (o *Owner) loadChangeFeeds(ctx context.Context) error {
}

func (o *Owner) balanceTables(ctx context.Context) error {
rebanlanceForAllChangefeed := false
o.rebanlanceMu.Lock()
if o.rebanlanceForAllChangefeed {
rebanlanceForAllChangefeed = true
o.rebanlanceForAllChangefeed = false
rebalanceForAllChangefeed := false
o.rebalanceMu.Lock()
if o.rebalanceForAllChangefeed {
rebalanceForAllChangefeed = true
o.rebalanceForAllChangefeed = false
}
o.rebanlanceMu.Unlock()
o.rebalanceMu.Unlock()
for id, changefeed := range o.changeFeeds {
rebanlanceNow := false
rebalanceNow := false
var scheduleCommands []*model.MoveTableJob
o.rebanlanceMu.Lock()
if r, exist := o.rebanlanceTigger[id]; exist {
rebanlanceNow = r
delete(o.rebanlanceTigger, id)
o.rebalanceMu.Lock()
if r, exist := o.rebalanceTigger[id]; exist {
rebalanceNow = r
delete(o.rebalanceTigger, id)
}
if rebanlanceForAllChangefeed {
rebanlanceNow = true
if rebalanceForAllChangefeed {
rebalanceNow = true
}
if c, exist := o.manualScheduleCommand[id]; exist {
scheduleCommands = c
delete(o.manualScheduleCommand, id)
}
o.rebanlanceMu.Unlock()
err := changefeed.tryBalance(ctx, o.captures, rebanlanceNow, scheduleCommands)
o.rebalanceMu.Unlock()
err := changefeed.tryBalance(ctx, o.captures, rebalanceNow, scheduleCommands)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -764,18 +764,18 @@ func (o *Owner) EnqueueJob(job model.AdminJob) error {
return nil
}

// TriggerRebanlance triggers the rebalance in the specified changefeed
func (o *Owner) TriggerRebanlance(changefeedID model.ChangeFeedID) {
o.rebanlanceMu.Lock()
defer o.rebanlanceMu.Unlock()
o.rebanlanceTigger[changefeedID] = true
// TriggerRebalance triggers the rebalance in the specified changefeed
func (o *Owner) TriggerRebalance(changefeedID model.ChangeFeedID) {
o.rebalanceMu.Lock()
defer o.rebalanceMu.Unlock()
o.rebalanceTigger[changefeedID] = true
// TODO(leoppro) throw an error if the changefeed is not exist
}

// ManualSchedule moves the table from a capture to another capture
func (o *Owner) ManualSchedule(changefeedID model.ChangeFeedID, to model.CaptureID, tableID model.TableID) {
o.rebanlanceMu.Lock()
defer o.rebanlanceMu.Unlock()
o.rebalanceMu.Lock()
defer o.rebalanceMu.Unlock()
o.manualScheduleCommand[changefeedID] = append(o.manualScheduleCommand[changefeedID], &model.MoveTableJob{
To: to,
TableID: tableID,
Expand Down