Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Shen <overvenus@gmail.com>
  • Loading branch information
overvenus committed May 24, 2022
1 parent 9d5e7d6 commit fcea4d5
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 11 deletions.
14 changes: 6 additions & 8 deletions cdc/scheduler/internal/tp/capture_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,8 @@ func newCaptureStatus(rev schedulepb.OwnerRevision) *CaptureStatus {
func (c *CaptureStatus) handleHeartbeatResponse(
resp *schedulepb.HeartbeatResponse, epoch schedulepb.ProcessorEpoch,
) {
epochMismatch := c.State != CaptureStateUninitialize &&
c.Epoch.Epoch != epoch.Epoch
if epochMismatch {
// Check epoch for initialized captures.
if c.State != CaptureStateUninitialize && c.Epoch.Epoch != epoch.Epoch {
log.Warn("tpscheduler: ignore heartbeat response",
zap.String("epoch", c.Epoch.Epoch),
zap.String("respEpoch", epoch.Epoch),
Expand Down Expand Up @@ -95,14 +94,13 @@ func (c *captureManager) captureTableSets() map[model.CaptureID]*CaptureStatus {
return c.Captures
}

func (c *captureManager) checkCaptureInitialized() bool {
allInitialized := true
func (c *captureManager) checkAllCaptureInitialized() bool {
for _, captrueStatus := range c.Captures {
if captrueStatus.State == CaptureStateUninitialize {
allInitialized = false
return false
}
}
return allInitialized
return true
}

func (c *captureManager) tick() []*schedulepb.Message {
Expand Down Expand Up @@ -137,7 +135,7 @@ func (c *captureManager) poll(
msg.GetHeartbeatResponse(), msg.Header.ProcessorEpoch)
}
}
return outMsgs, c.checkCaptureInitialized()
return outMsgs, c.checkAllCaptureInitialized()
}

func (c *captureManager) onAliveCaptureUpdate(
Expand Down
2 changes: 1 addition & 1 deletion cdc/scheduler/internal/tp/capture_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func TestCaptureManagerTick(t *testing.T) {
for _, s := range []CaptureState{CaptureStateInitialized, CaptureStateStopping} {
cm.Captures["1"].State = s
cm.Captures["2"].State = s
require.True(t, cm.checkCaptureInitialized())
require.True(t, cm.checkAllCaptureInitialized())
msgs = cm.tick()
require.Empty(t, msgs)
msgs = cm.tick()
Expand Down
4 changes: 3 additions & 1 deletion cdc/scheduler/internal/tp/replication_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,9 @@ func (r *replicationManager) handleMessageHeartbeat(msg *schedulepb.Heartbeat) {
// TODO: build s.tables from Heartbeat message.
}

func (r *replicationManager) handleMessageDispatchTableResponse(msg *schedulepb.DispatchTableResponse) {
func (r *replicationManager) handleMessageDispatchTableResponse(
msg *schedulepb.DispatchTableResponse,
) {
// TODO: update s.tables from DispatchTableResponse message.
}

Expand Down
5 changes: 4 additions & 1 deletion scripts/check-diff-line-width.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ BASE_HASH=$(git --no-pager log -E --grep='\(#[0-9]+\)$' -n 1 --format=format:%H)
WARN_THRESHOLD=80
ERROR_THRESHOLD=100

git --no-pager diff $BASE_HASH -U0 -- cdc pkg cmd -- ':(exclude)*_gen.go' -- ':(exclude)*_gen_test.go' |
git --no-pager diff $BASE_HASH -U0 -- cdc pkg cmd \
-- ':(exclude)*_gen.go' \
-- ':(exclude)*_gen_test.go' \
-- ':(exclude)*.pb.go' |
grep -E '^\+' | grep -vE '^\+\+\+' |
sed 's/\t/ /g' |
awk "
Expand Down

0 comments on commit fcea4d5

Please sign in to comment.