diff --git a/.github/workflows/dataflow_engine_e2e.yaml b/.github/workflows/dataflow_engine_e2e.yaml index d8307af8..5fc42cbf 100644 --- a/.github/workflows/dataflow_engine_e2e.yaml +++ b/.github/workflows/dataflow_engine_e2e.yaml @@ -94,6 +94,46 @@ jobs: name: node-failure-workflow-logs path: logs/* + Worker-error-workflow: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v2 + + - uses: actions/setup-go@v3 + with: + go-version: 1.18 + + - name: Build images + run: $GITHUB_WORKSPACE/sample/prepare.sh + + - name: Run containers + run: docker-compose -f $GITHUB_WORKSPACE/sample/3m3e.yaml up -d + + - name: Run tests + run: | + cd $GITHUB_WORKSPACE/test/e2e + go test -count=1 -v -run=TestWorkerExit + + - name: Dump docker container logs on failure + if: ${{ failure() }} + uses: jwalton/gh-docker-logs@v2 + with: + tail: '100' + + - name: Collect docker logs on failure + if: ${{ failure() }} + uses: jwalton/gh-docker-logs@v2 + with: + dest: 'logs' + + - name: Upload logs to GitHub + if: ${{ failure() }} + uses: actions/upload-artifact@master + with: + name: node-failure-workflow-logs + path: logs/* + DM-workflow: runs-on: ubuntu-latest diff --git a/lib/base_jobmaster.go b/lib/base_jobmaster.go index 3a925e39..134d7c46 100644 --- a/lib/base_jobmaster.go +++ b/lib/base_jobmaster.go @@ -3,9 +3,6 @@ package lib import ( "context" - "github.com/hanfei1991/microcosm/pkg/errctx" - resourcemeta "github.com/hanfei1991/microcosm/pkg/externalresource/resourcemeta/model" - "github.com/pingcap/errors" "github.com/pingcap/tiflow/dm/pkg/log" @@ -13,6 +10,9 @@ import ( libModel "github.com/hanfei1991/microcosm/lib/model" "github.com/hanfei1991/microcosm/model" dcontext "github.com/hanfei1991/microcosm/pkg/context" + "github.com/hanfei1991/microcosm/pkg/errctx" + derror "github.com/hanfei1991/microcosm/pkg/errors" + resourcemeta "github.com/hanfei1991/microcosm/pkg/externalresource/resourcemeta/model" "github.com/hanfei1991/microcosm/pkg/meta/metaclient" "github.com/hanfei1991/microcosm/pkg/p2p" ) @@ -142,7 +142,10 @@ func (d *DefaultBaseJobMaster) Poll(ctx context.Context) error { return errors.Trace(err) } if err := d.worker.doPoll(ctx); err != nil { - return errors.Trace(err) + if derror.ErrWorkerHalfExit.NotEqual(err) { + return errors.Trace(err) + } + return nil } if err := d.impl.Tick(ctx); err != nil { return errors.Trace(err) diff --git a/lib/fake/fake_master.go b/lib/fake/fake_master.go index d3179804..d48df7d7 100644 --- a/lib/fake/fake_master.go +++ b/lib/fake/fake_master.go @@ -45,6 +45,8 @@ type Config struct { EtcdWatchEnable bool `json:"etcd-watch-enable"` EtcdEndpoints []string `json:"etcd-endpoints"` EtcdWatchPrefix string `json:"etcd-watch-prefix"` + + InjectErrorInterval time.Duration `json:"inject-error-interval"` } // Checkpoint defines the checkpoint of fake job @@ -436,6 +438,9 @@ func (m *Master) OnWorkerMessage(worker lib.WorkerHandle, topic p2p.Topic, messa // OnWorkerStatusUpdated implements MasterImpl.OnWorkerStatusUpdated func (m *Master) OnWorkerStatusUpdated(worker lib.WorkerHandle, newStatus *libModel.WorkerStatus) error { + log.L().Info("FakeMaster: worker status updated", + zap.String("worker-id", worker.ID()), + zap.Any("worker-status", newStatus)) return nil } @@ -532,7 +537,8 @@ func (m *Master) genWorkerConfig(index int, checkpoint workerCheckpoint) *Worker EtcdWatchPrefix: m.config.EtcdWatchPrefix, // loaded from checkpoint if exists - Checkpoint: checkpoint, + Checkpoint: checkpoint, + InjectErrorInterval: m.config.InjectErrorInterval, } } @@ -546,7 +552,7 @@ 
func NewFakeMaster(ctx *dcontext.Context, workerID libModel.WorkerID, masterID l workerList: make([]lib.WorkerHandle, masterConfig.WorkerCount), workerID2BusinessID: make(map[libModel.WorkerID]int), config: masterConfig, - statusRateLimiter: rate.NewLimiter(rate.Every(time.Second*3), 1), + statusRateLimiter: rate.NewLimiter(rate.Every(100*time.Millisecond), 1), bStatus: &businessStatus{status: make(map[libModel.WorkerID]*dummyWorkerStatus)}, finishedSet: make(map[libModel.WorkerID]int), ctx: ctx.Context, diff --git a/lib/fake/fake_worker.go b/lib/fake/fake_worker.go index da8eb877..a3e1295c 100644 --- a/lib/fake/fake_worker.go +++ b/lib/fake/fake_worker.go @@ -35,9 +35,10 @@ type ( ID int `json:"id"` TargetTick int64 `json:"target-tick"` - EtcdWatchEnable bool `json:"etcd-watch-enable"` - EtcdEndpoints []string `json:"etcd-endpoints"` - EtcdWatchPrefix string `json:"etcd-watch-prefix"` + EtcdWatchEnable bool `json:"etcd-watch-enable"` + EtcdEndpoints []string `json:"etcd-endpoints"` + EtcdWatchPrefix string `json:"etcd-watch-prefix"` + InjectErrorInterval time.Duration `json:"inject-error-interval"` Checkpoint workerCheckpoint `json:"checkpoint"` } @@ -57,6 +58,8 @@ type ( sync.RWMutex code libModel.WorkerStatusCode } + + startTime time.Time } ) @@ -102,6 +105,7 @@ func (d *dummyWorker) InitImpl(ctx context.Context) error { } d.init = true d.setStatusCode(libModel.WorkerStatusNormal) + d.startTime = time.Now() return nil } return errors.New("repeated init") @@ -144,6 +148,11 @@ func (d *dummyWorker) Tick(ctx context.Context) error { return d.Exit(ctx, d.Status(), nil) } + if d.config.InjectErrorInterval != 0 { + if time.Since(d.startTime) > d.config.InjectErrorInterval { + return errors.Errorf("injected error by worker: %d", d.config.ID) + } + } return nil } @@ -277,7 +286,7 @@ func NewDummyWorker( }, } return &dummyWorker{ - statusRateLimiter: rate.NewLimiter(rate.Every(time.Second*3), 1), + statusRateLimiter: rate.NewLimiter(rate.Every(100*time.Millisecond), 1), status: status, config: wcfg, errCh: make(chan error, 1), diff --git a/lib/master.go b/lib/master.go index 2ccb1469..19d0aafb 100644 --- a/lib/master.go +++ b/lib/master.go @@ -313,6 +313,7 @@ func (m *DefaultBaseMaster) registerMessageHandlers(ctx context.Context) error { ReplyTime: m.clock.Now(), ToWorkerID: msg.FromWorkerID, Epoch: m.currentEpoch.Load(), + IsFinished: msg.IsFinished, }) if err != nil { return err diff --git a/lib/master/worker_entry.go b/lib/master/worker_entry.go index 03187505..45dfb89e 100644 --- a/lib/master/worker_entry.go +++ b/lib/master/worker_entry.go @@ -6,6 +6,7 @@ import ( "time" "github.com/pingcap/tiflow/dm/pkg/log" + "go.uber.org/atomic" "go.uber.org/zap" libModel "github.com/hanfei1991/microcosm/lib/model" @@ -53,6 +54,8 @@ type workerEntry struct { expireAt time.Time state workerEntryState + receivedFinish atomic.Bool + statusMu sync.RWMutex status *libModel.WorkerStatus } @@ -97,7 +100,7 @@ func (e *workerEntry) MarkAsTombstone() { e.mu.Lock() defer e.mu.Unlock() - if e.state == workerEntryWait || e.state == workerEntryOffline { + if e.state == workerEntryWait || e.state == workerEntryOffline || e.IsFinished() { // Only workerEntryWait and workerEntryOffline are allowed // to transition to workerEntryTombstone. 
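+		// As an exception, an entry whose worker has reported IsFinished in a
+		// heartbeat (graceful exit) may also become a tombstone here.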
e.state = workerEntryTombstone @@ -167,3 +170,11 @@ func (e *workerEntry) ExpireTime() time.Time { return e.expireAt } + +func (e *workerEntry) SetFinished() { + e.receivedFinish.Store(true) +} + +func (e *workerEntry) IsFinished() bool { + return e.receivedFinish.Load() +} diff --git a/lib/master/worker_manager.go b/lib/master/worker_manager.go index 5072d020..7c6e7623 100644 --- a/lib/master/worker_manager.go +++ b/lib/master/worker_manager.go @@ -187,16 +187,17 @@ func (m *WorkerManager) InitAfterRecover(ctx context.Context) (retErr error) { zap.Duration("duration", m.clock.Since(startTime))) case <-timer.C: // Wait for the worker timeout to expire - m.mu.Lock() - for _, entry := range m.workerEntries { - if entry.State() == workerEntryWait { - entry.MarkAsTombstone() - } - } - m.mu.Unlock() } + m.mu.Lock() + for _, entry := range m.workerEntries { + if entry.State() == workerEntryWait || entry.IsFinished() { + entry.MarkAsTombstone() + } + } m.state = workerManagerReady + m.mu.Unlock() + return nil } @@ -222,6 +223,10 @@ func (m *WorkerManager) HandleHeartbeat(msg *libModel.HeartbeatPingMessage, from return } + if msg.IsFinished { + entry.SetFinished() + } + entry.SetExpireTime(m.nextExpireTime()) if m.state == workerManagerWaitingHeartbeat { @@ -463,12 +468,14 @@ func (m *WorkerManager) checkWorkerEntriesOnce() error { continue } - if entry.ExpireTime().After(m.clock.Now()) { - // Not timed out + hasTimedOut := entry.ExpireTime().Before(m.clock.Now()) + shouldGoOffline := hasTimedOut || entry.IsFinished() + if !shouldGoOffline { continue } - // The worker has timed out. + // The worker has timed out, or has received a heartbeat + // with IsFinished == true. entry.MarkAsOffline() var offlineError error diff --git a/lib/master/worker_manager_test.go b/lib/master/worker_manager_test.go index 36cd2376..da77ce6b 100644 --- a/lib/master/worker_manager_test.go +++ b/lib/master/worker_manager_test.go @@ -36,12 +36,13 @@ func (s *workerManageTestSuite) AdvanceClockBy(duration time.Duration) { } func (s *workerManageTestSuite) SimulateHeartbeat( - workerID libModel.WorkerID, epoch libModel.Epoch, node p2p.NodeID, + workerID libModel.WorkerID, epoch libModel.Epoch, node p2p.NodeID, isFinished bool, ) { s.manager.HandleHeartbeat(&libModel.HeartbeatPingMessage{ SendTime: s.clock.Mono(), FromWorkerID: workerID, Epoch: epoch, + IsFinished: isFinished, }, node) } @@ -218,12 +219,14 @@ func NewWorkerManageTestSuite(isInit bool) *workerManageTestSuite { } func TestCreateWorkerAndWorkerOnline(t *testing.T) { + t.Parallel() + suite := NewWorkerManageTestSuite(true) suite.manager.BeforeStartingWorker("worker-1", "executor-1") - suite.SimulateHeartbeat("worker-1", 1, "executor-1") - suite.SimulateHeartbeat("worker-1", 1, "executor-1") - suite.SimulateHeartbeat("worker-1", 1, "executor-1") + suite.SimulateHeartbeat("worker-1", 1, "executor-1", false) + suite.SimulateHeartbeat("worker-1", 1, "executor-1", false) + suite.SimulateHeartbeat("worker-1", 1, "executor-1", false) event := suite.WaitForEvent(t, "worker-1") require.Equal(t, workerOnlineEvent, event.Tp) @@ -231,6 +234,8 @@ func TestCreateWorkerAndWorkerOnline(t *testing.T) { } func TestCreateWorkerAndWorkerTimesOut(t *testing.T) { + t.Parallel() + suite := NewWorkerManageTestSuite(true) suite.manager.BeforeStartingWorker("worker-1", "executor-1") suite.AdvanceClockBy(30 * time.Second) @@ -246,10 +251,12 @@ func TestCreateWorkerAndWorkerTimesOut(t *testing.T) { } func TestCreateWorkerAndWorkerStatusUpdatedAndTimesOut(t *testing.T) { + t.Parallel() + 
suite := NewWorkerManageTestSuite(true) suite.manager.BeforeStartingWorker("worker-1", "executor-1") - suite.SimulateHeartbeat("worker-1", 1, "executor-1") + suite.SimulateHeartbeat("worker-1", 1, "executor-1", false) event := suite.WaitForEvent(t, "worker-1") require.Equal(t, workerOnlineEvent, event.Tp) @@ -273,6 +280,8 @@ func TestCreateWorkerAndWorkerStatusUpdatedAndTimesOut(t *testing.T) { } func TestRecoverAfterFailover(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -302,9 +311,9 @@ func TestRecoverAfterFailover(t *testing.T) { }() require.Eventually(t, func() bool { - suite.SimulateHeartbeat("worker-1", 1, "executor-1") - suite.SimulateHeartbeat("worker-2", 1, "executor-2") - suite.SimulateHeartbeat("worker-3", 1, "executor-3") + suite.SimulateHeartbeat("worker-1", 1, "executor-1", false) + suite.SimulateHeartbeat("worker-2", 1, "executor-2", false) + suite.SimulateHeartbeat("worker-3", 1, "executor-3", false) select { case <-doneCh: @@ -313,7 +322,7 @@ func TestRecoverAfterFailover(t *testing.T) { } suite.AdvanceClockBy(1 * time.Second) return false - }, 1*time.Second, 10*time.Millisecond) + }, 5*time.Second, 10*time.Millisecond) require.True(t, suite.manager.IsInitialized()) require.Len(t, suite.manager.GetWorkers(), 4) @@ -329,6 +338,8 @@ func TestRecoverAfterFailover(t *testing.T) { } func TestRecoverAfterFailoverFast(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -346,8 +357,7 @@ func TestRecoverAfterFailoverFast(t *testing.T) { }() require.Eventually(t, func() bool { - suite.SimulateHeartbeat("worker-1", 1, "executor-1") - + suite.SimulateHeartbeat("worker-1", 1, "executor-1", false) select { case <-doneCh: return true @@ -363,6 +373,8 @@ func TestRecoverAfterFailoverFast(t *testing.T) { } func TestRecoverWithNoWorker(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -379,6 +391,8 @@ func TestRecoverWithNoWorker(t *testing.T) { } func TestCleanTombstone(t *testing.T) { + t.Parallel() + ctx := context.Background() suite := NewWorkerManageTestSuite(true) @@ -408,3 +422,78 @@ func TestCleanTombstone(t *testing.T) { suite.Close() } + +func TestWorkerGracefulExit(t *testing.T) { + t.Parallel() + + suite := NewWorkerManageTestSuite(true) + suite.manager.BeforeStartingWorker("worker-1", "executor-1") + + suite.SimulateHeartbeat("worker-1", 1, "executor-1", false) + suite.SimulateHeartbeat("worker-1", 1, "executor-1", false) + suite.SimulateHeartbeat("worker-1", 1, "executor-1", false) + + event := suite.WaitForEvent(t, "worker-1") + require.Equal(t, workerOnlineEvent, event.Tp) + + suite.SimulateHeartbeat("worker-1", 1, "executor-1", true) + event = suite.WaitForEvent(t, "worker-1") + require.Equal(t, workerOfflineEvent, event.Tp) + + suite.Close() +} + +func TestWorkerGracefulExitOnFirstHeartbeat(t *testing.T) { + t.Parallel() + + suite := NewWorkerManageTestSuite(true) + suite.manager.BeforeStartingWorker("worker-1", "executor-1") + + suite.SimulateHeartbeat("worker-1", 1, "executor-1", true) + + // Now we expect there to be both workerOnlineEvent and workerOfflineEvent, + // in that order. 
+ event := suite.WaitForEvent(t, "worker-1") + require.Equal(t, workerOnlineEvent, event.Tp) + event = suite.WaitForEvent(t, "worker-1") + require.Equal(t, workerOfflineEvent, event.Tp) + + suite.Close() +} + +func TestWorkerGracefulExitAfterFailover(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + suite := NewWorkerManageTestSuite(false) + err := suite.PutMeta("worker-1", &libModel.WorkerStatus{ + Code: libModel.WorkerStatusNormal, + }) + require.NoError(t, err) + + doneCh := make(chan struct{}) + go func() { + defer close(doneCh) + err := suite.manager.InitAfterRecover(ctx) + require.NoError(t, err) + }() + + require.Eventually(t, func() bool { + suite.SimulateHeartbeat("worker-1", 1, "executor-1", true) + select { + case <-doneCh: + return true + default: + } + suite.AdvanceClockBy(1 * time.Second) + return false + }, 1*time.Second, 10*time.Millisecond) + + require.True(t, suite.manager.IsInitialized()) + require.Len(t, suite.manager.GetWorkers(), 1) + require.Contains(t, suite.manager.GetWorkers(), "worker-1") + require.NotNil(t, suite.manager.GetWorkers()["worker-1"].GetTombstone()) + suite.Close() +} diff --git a/lib/model/messages.go b/lib/model/messages.go index 0736f758..53309c71 100644 --- a/lib/model/messages.go +++ b/lib/model/messages.go @@ -29,6 +29,7 @@ type HeartbeatPingMessage struct { SendTime clock.MonotonicTime `json:"send-time"` FromWorkerID WorkerID `json:"from-worker-id"` Epoch Epoch `json:"epoch"` + IsFinished bool `json:"is-finished"` } // HeartbeatPongMessage ships information in heartbeat pong @@ -37,6 +38,7 @@ type HeartbeatPongMessage struct { ReplyTime time.Time `json:"reply-time"` ToWorkerID WorkerID `json:"to-worker-id"` Epoch Epoch `json:"epoch"` + IsFinished bool `json:"is-finished"` } // StatusChangeRequest ships information when updating worker status diff --git a/lib/worker.go b/lib/worker.go index bfa844ea..cacd28f1 100644 --- a/lib/worker.go +++ b/lib/worker.go @@ -8,6 +8,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tiflow/dm/pkg/log" "github.com/pingcap/tiflow/pkg/workerpool" + "go.uber.org/atomic" "go.uber.org/dig" "go.uber.org/zap" @@ -81,6 +82,14 @@ type BaseWorker interface { Exit(ctx context.Context, status libModel.WorkerStatus, err error) error } +type workerExitFsmState = int32 + +const ( + workerNormal = workerExitFsmState(iota + 1) + workerHalfExit + workerExited +) + // DefaultBaseWorker implements BaseWorker interface, it also embeds an Impl // which implements the WorkerImpl interface and passed from business logic. 
type DefaultBaseWorker struct { @@ -114,6 +123,8 @@ type DefaultBaseWorker struct { cancelBgTasks context.CancelFunc cancelPool context.CancelFunc + exitController *workerExitController + clock clock.Clock // user metastore prefix kvclient @@ -233,6 +244,7 @@ func (w *DefaultBaseWorker) doPreInit(ctx context.Context) error { })) }) + w.exitController = newWorkerExitController(w.masterClient, w.errCenter, w.clock) w.workerMetaClient = metadata.NewWorkerMetadataClient(w.masterID, w.frameMetaClient) w.statusSender = statusutil.NewWriter( @@ -269,11 +281,12 @@ func (w *DefaultBaseWorker) doPostInit(ctx context.Context) error { } func (w *DefaultBaseWorker) doPoll(ctx context.Context) error { - if err := w.messageHandlerManager.CheckError(ctx); err != nil { + err := w.exitController.PollExit() + if err != nil { return err } - if err := w.errCenter.CheckError(); err != nil { + if err := w.messageHandlerManager.CheckError(ctx); err != nil { return err } @@ -285,11 +298,14 @@ func (w *DefaultBaseWorker) Poll(ctx context.Context) error { ctx = w.errCenter.WithCancelOnFirstError(ctx) if err := w.doPoll(ctx); err != nil { - return errors.Trace(err) + if derror.ErrWorkerHalfExit.NotEqual(err) { + return err + } + return nil } if err := w.Impl.Tick(ctx); err != nil { - return errors.Trace(err) + w.errCenter.OnError(err) } return nil } @@ -390,7 +406,6 @@ func (w *DefaultBaseWorker) Exit(ctx context.Context, status libModel.WorkerStat func (w *DefaultBaseWorker) startBackgroundTasks() { ctx, cancel := context.WithCancel(context.Background()) - ctx = w.errCenter.WithCancelOnFirstError(ctx) w.cancelMu.Lock() w.cancelBgTasks = cancel @@ -415,12 +430,21 @@ func (w *DefaultBaseWorker) startBackgroundTasks() { func (w *DefaultBaseWorker) runHeartbeatWorker(ctx context.Context) error { ticker := w.clock.Ticker(w.timeoutConfig.WorkerHeartbeatInterval) + defer ticker.Stop() + for { select { case <-ctx.Done(): return errors.Trace(ctx.Err()) case <-ticker.C: - if err := w.masterClient.SendHeartBeat(ctx, w.clock); err != nil { + isFinished := false + if w.exitController.IsExiting() { + // If we are in the state workerHalfExit, + // we need to notify the master so that the master + // marks us as exited. 
+ isFinished = true + } + if err := w.masterClient.SendHeartBeat(ctx, w.clock, isFinished); err != nil { return errors.Trace(err) } } @@ -441,7 +465,9 @@ func (w *DefaultBaseWorker) runWatchDog(ctx context.Context) error { return errors.Trace(err) } if !isNormal { - return derror.ErrWorkerSuicide.GenWithStackByArgs(w.masterClient.MasterID()) + errOut := derror.ErrWorkerSuicide.GenWithStackByArgs(w.masterClient.MasterID()) + w.exitController.ForceExit(errOut) + return errOut } } } @@ -516,6 +542,10 @@ type masterClient struct { frameMetaClient pkgOrm.Client lastMasterAckedPingTime clock.MonotonicTime + // masterSideClosed records whether the master + // has marked us as closed + masterSideClosed atomic.Bool + timeoutConfig config.TimeoutConfig onMasterFailOver func() error @@ -619,6 +649,10 @@ func (m *masterClient) HandleHeartbeat(sender p2p.NodeID, msg *libModel.Heartbea m.masterEpoch = msg.Epoch m.masterNode = sender } + + if msg.IsFinished { + m.masterSideClosed.Store(true) + } m.lastMasterAckedPingTime = msg.SendTime } @@ -644,7 +678,7 @@ func (m *masterClient) CheckMasterTimeout(ctx context.Context, clock clock.Clock return false, nil } -func (m *masterClient) SendHeartBeat(ctx context.Context, clock clock.Clock) error { +func (m *masterClient) SendHeartBeat(ctx context.Context, clock clock.Clock, isFinished bool) error { m.mu.RLock() defer m.mu.RUnlock() @@ -657,6 +691,7 @@ func (m *masterClient) SendHeartBeat(ctx context.Context, clock clock.Clock) err SendTime: sendTime, FromWorkerID: m.workerID, Epoch: m.masterEpoch, + IsFinished: isFinished, } log.L().Debug("sending heartbeat", zap.String("worker", m.workerID)) @@ -672,9 +707,91 @@ func (m *masterClient) SendHeartBeat(ctx context.Context, clock clock.Clock) err return nil } +func (m *masterClient) IsMasterSideClosed() bool { + return m.masterSideClosed.Load() +} + // used in unit test only func (m *masterClient) getLastMasterAckedPingTime() clock.MonotonicTime { m.mu.RLock() defer m.mu.RUnlock() return m.lastMasterAckedPingTime } + +const ( + workerExitWaitForMasterTimeout = time.Second * 15 +) + +// workerExitController implements the exit sequence of +// a worker. This object is thread-safe. +// TODO move this to a separate file or package. +type workerExitController struct { + workerExitFsm atomic.Int32 + halfExitTime atomic.Time + errCenter *errctx.ErrCenter + masterClient *masterClient + + // clock is to facilitate unit testing. + clock clock.Clock +} + +func newWorkerExitController( + masterClient *masterClient, + errCenter *errctx.ErrCenter, + clock clock.Clock, +) *workerExitController { + return &workerExitController{ + workerExitFsm: *atomic.NewInt32(workerNormal), + errCenter: errCenter, + masterClient: masterClient, + clock: clock, + } +} + +// PollExit is called in each tick of the worker. +// Returning an error other than ErrWorkerHalfExit +// means that the worker is ready to exit. +func (c *workerExitController) PollExit() error { + err := c.errCenter.CheckError() + if err == nil { + return nil + } + + switch c.workerExitFsm.Load() { + case workerNormal: + c.workerExitFsm.CAS(workerNormal, workerHalfExit) + c.halfExitTime.Store(c.clock.Now()) + return derror.ErrWorkerHalfExit.FastGenByArgs() + case workerHalfExit: + if c.masterClient.IsMasterSideClosed() { + c.workerExitFsm.Store(workerExited) + return err + } + sinceStartExiting := c.clock.Since(c.halfExitTime.Load()) + if sinceStartExiting > workerExitWaitForMasterTimeout { + // TODO log worker ID and master ID. 
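+			// The master has not acknowledged the half-exit within
+			// workerExitWaitForMasterTimeout; stop waiting and surface the original error.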
+ log.L().Warn("Exiting worker cannot get acknowledgement from master") + return err + } + return derror.ErrWorkerHalfExit.FastGenByArgs() + case workerExited: + return err + default: + log.L().Panic("unreachable") + } + return nil +} + +// ForceExit forces a quick exit without notifying the +// master. It should be used when suicide is required when +// we have lost contact with the master. +func (c *workerExitController) ForceExit(errIn error) { + c.errCenter.OnError(errIn) + c.workerExitFsm.Store(workerExited) +} + +// IsExiting indicates whether the worker is performing +// an exit sequence. +func (c *workerExitController) IsExiting() bool { + return c.workerExitFsm.Load() == workerHalfExit +} diff --git a/lib/worker_test.go b/lib/worker_test.go index a0ffd83a..c67d5f27 100644 --- a/lib/worker_test.go +++ b/lib/worker_test.go @@ -5,15 +5,16 @@ import ( "testing" "time" + "github.com/pingcap/errors" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + runtime "github.com/hanfei1991/microcosm/executor/worker" "github.com/hanfei1991/microcosm/lib/config" libModel "github.com/hanfei1991/microcosm/lib/model" "github.com/hanfei1991/microcosm/lib/statusutil" "github.com/hanfei1991/microcosm/pkg/clock" pkgOrm "github.com/hanfei1991/microcosm/pkg/orm" - - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" ) var ( @@ -363,6 +364,126 @@ func TestWorkerSuicideAfterRuntimeDelay(t *testing.T) { require.Regexp(t, ".*Suicide.*", pollErr) } +func TestWorkerGracefulExit(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + worker := newMockWorkerImpl(workerID1, masterName) + worker.clock = clock.NewMock() + worker.clock.(*clock.Mock).Set(time.Now()) + putMasterMeta(ctx, t, worker.metaClient, &libModel.MasterMetaKVData{ + ID: masterName, + NodeID: masterNodeName, + Epoch: 1, + StatusCode: libModel.MasterStatusInit, + }) + + worker.On("InitImpl", mock.Anything).Return(nil) + worker.On("Status").Return(libModel.WorkerStatus{ + Code: libModel.WorkerStatusNormal, + }, nil) + + err := worker.Init(ctx) + require.NoError(t, err) + + worker.On("Tick", mock.Anything). + Return(errors.New("fake error")).Once() + + for { + err := worker.Poll(ctx) + require.NoError(t, err) + + // Make the heartbeat worker tick. 
+ worker.clock.(*clock.Mock).Add(time.Second) + + rawMsg, ok := worker.messageSender.TryPop(masterNodeName, libModel.HeartbeatPingTopic(masterName)) + if !ok { + continue + } + msg := rawMsg.(*libModel.HeartbeatPingMessage) + if msg.IsFinished { + pongMsg := &libModel.HeartbeatPongMessage{ + SendTime: msg.SendTime, + ReplyTime: time.Now(), + ToWorkerID: workerID1, + Epoch: 1, + IsFinished: true, + } + + err := worker.messageHandlerManager.InvokeHandler( + t, + libModel.HeartbeatPongTopic(masterName, workerID1), + masterNodeName, + pongMsg, + ) + require.NoError(t, err) + break + } + } + + err = worker.Poll(ctx) + require.Error(t, err) + require.Regexp(t, ".*fake error.*", err) +} + +func TestWorkerGracefulExitWhileTimeout(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + worker := newMockWorkerImpl(workerID1, masterName) + worker.clock = clock.NewMock() + worker.clock.(*clock.Mock).Set(time.Now()) + putMasterMeta(ctx, t, worker.metaClient, &libModel.MasterMetaKVData{ + ID: masterName, + NodeID: masterNodeName, + Epoch: 1, + StatusCode: libModel.MasterStatusInit, + }) + + worker.On("InitImpl", mock.Anything).Return(nil) + worker.On("Status").Return(libModel.WorkerStatus{ + Code: libModel.WorkerStatusNormal, + }, nil) + + err := worker.Init(ctx) + require.NoError(t, err) + + worker.On("Tick", mock.Anything). + Return(errors.New("fake error")).Once() + + for { + err := worker.Poll(ctx) + require.NoError(t, err) + + // Make the heartbeat worker tick. + worker.clock.(*clock.Mock).Add(time.Second) + + rawMsg, ok := worker.messageSender.TryPop(masterNodeName, libModel.HeartbeatPingTopic(masterName)) + if !ok { + continue + } + msg := rawMsg.(*libModel.HeartbeatPingMessage) + if msg.IsFinished { + break + } + } + + for { + err := worker.Poll(ctx) + worker.clock.(*clock.Mock).Add(time.Second) + + if err != nil { + require.Regexp(t, ".*fake error.*", err) + break + } + time.Sleep(10 * time.Millisecond) + } +} + func TestCloseBeforeInit(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 41b2c417..db9881e5 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -72,6 +72,7 @@ var ( ErrWorkerFinish = errors.Normalize("worker finished and exited", errors.RFCCodeText("DFLOW:ErrWorkerFinish")) ErrWorkerStop = errors.Normalize("worker is stopped", errors.RFCCodeText("DFLOW:ErrWorkerStop")) ErrTooManyStatusUpdates = errors.Normalize("there are too many pending worker status updates: %d", errors.RFCCodeText("DFLOW:ErrTooManyStatusUpdates")) + ErrWorkerHalfExit = errors.Normalize("the worker is in half-exited state", errors.RFCCodeText("DFLOW:ErrWorkerHalfExit")) // master etcd related errors ErrMasterEtcdCreateSessionFail = errors.Normalize("failed to create Etcd session", errors.RFCCodeText("DFLOW:ErrMasterEtcdCreateSessionFail")) diff --git a/sample/config/etcd.yml b/sample/config/etcd.yml index 37f31204..19625c06 100644 --- a/sample/config/etcd.yml +++ b/sample/config/etcd.yml @@ -135,6 +135,3 @@ log-outputs: [stderr] # Force to create a new one member cluster. 
force-new-cluster: false - -auto-compaction-mode: revision -auto-compaction-retention: "100" diff --git a/test/e2e/e2e_worker_exit_test.go b/test/e2e/e2e_worker_exit_test.go new file mode 100644 index 00000000..f5c937fc --- /dev/null +++ b/test/e2e/e2e_worker_exit_test.go @@ -0,0 +1,68 @@ +package e2e_test + +import ( + "context" + "encoding/json" + "testing" + "time" + + "github.com/pingcap/tiflow/dm/pkg/log" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + "github.com/hanfei1991/microcosm/lib/fake" + "github.com/hanfei1991/microcosm/pb" + "github.com/hanfei1991/microcosm/test/e2e" +) + +func TestWorkerExit(t *testing.T) { + // TODO: make the following variables configurable + var ( + masterAddrs = []string{"127.0.0.1:10245", "127.0.0.1:10246", "127.0.0.1:10247"} + userMetaAddrs = []string{"127.0.0.1:12479"} + userMetaAddrsInContainer = []string{"user-etcd-standalone:2379"} + ) + + ctx := context.Background() + cfg := &fake.Config{ + JobName: "test-node-failure", + WorkerCount: 4, + // use a large enough target tick to ensure the fake job long running + TargetTick: 10000000, + EtcdWatchEnable: true, + EtcdEndpoints: userMetaAddrsInContainer, + EtcdWatchPrefix: "/fake-job/test/", + + InjectErrorInterval: time.Second * 3, + } + cfgBytes, err := json.Marshal(cfg) + require.NoError(t, err) + + fakeJobCfg := &e2e.FakeJobConfig{ + EtcdEndpoints: userMetaAddrs, // reuse user meta KV endpoints + WorkerCount: cfg.WorkerCount, + KeyPrefix: cfg.EtcdWatchPrefix, + } + cli, err := e2e.NewUTCli(ctx, masterAddrs, userMetaAddrs, fakeJobCfg) + require.NoError(t, err) + + jobID, err := cli.CreateJob(ctx, pb.JobType_FakeJob, cfgBytes) + require.NoError(t, err) + + require.Eventually(t, func() bool { + // check tick increases to ensure all workers are online + // TODO modify the test case to use a "restart-count" as a terminating condition. + targetTick := int64(1000) + for jobIdx := 0; jobIdx < cfg.WorkerCount; jobIdx++ { + err := cli.CheckFakeJobTick(ctx, jobID, jobIdx, targetTick) + if err != nil { + log.L().Warn("check fake job tick failed", zap.Error(err)) + return false + } + } + return true + }, time.Second*300, time.Second*2) + + err = cli.PauseJob(ctx, jobID) + require.NoError(t, err) +}
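To reproduce the new e2e case locally, mirror the steps of the Worker-error-workflow job added above. A minimal sketch, assuming the sample containers build and start cleanly when run from the repository root:

    ./sample/prepare.sh
    docker-compose -f ./sample/3m3e.yaml up -d
    cd test/e2e && go test -count=1 -v -run=TestWorkerExit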