ci: add node failure test #355

Merged
merged 14 commits on May 7, 2022
121 changes: 82 additions & 39 deletions lib/fake/fake_master.go
@@ -48,8 +48,8 @@ type Config struct {
}

type Checkpoint struct {
Ticks map[int]int64 `json:"ticks"`
EtcdCheckpoints map[int]EtcdCheckpoint `json:"etcd-checkpoints"`
Ticks map[int]int64 `json:"ticks"`
Checkpoints map[int]workerCheckpoint `json:"checkpoints"`
}

func (cp *Checkpoint) String() string {
@@ -60,31 +60,52 @@ func (cp *Checkpoint) String() string {
return string(data)
}

// workerCheckpoint is used to resume a new worker from an old checkpoint
type workerCheckpoint struct {
Tick int64 `json:"tick"`
Revision int64 `json:"revision"`
MvccCount int `json:"mvcc-count"`
Value string `json:"value"`
}

func zeroWorkerCheckpoint() workerCheckpoint {
return workerCheckpoint{}
}

var _ lib.BaseJobMaster = (*Master)(nil)

type Master struct {
lib.BaseJobMaster

config *Config
bStatus *businessStatus

// workerID stores the ID of the Master AS A WORKER.
workerID libModel.WorkerID

workerListMu sync.Mutex
workerList []lib.WorkerHandle
workerID2BusinessID map[libModel.WorkerID]int
pendingWorkerSet map[libModel.WorkerID]int
statusRateLimiter *rate.Limiter
status map[libModel.WorkerID]*dummyWorkerStatus
finishedSet map[libModel.WorkerID]int
config *Config
statusCode struct {

// worker status
statusRateLimiter *rate.Limiter
statusCode struct {
sync.RWMutex
code libModel.WorkerStatusCode
}

ctx context.Context
clocker clock.Clock
initialized bool
}

type businessStatus struct {
sync.RWMutex
status map[libModel.WorkerID]*dummyWorkerStatus
}

func (m *Master) OnJobManagerFailover(reason lib.MasterFailoverReason) error {
log.L().Info("FakeMaster: OnJobManagerFailover", zap.Any("reason", reason))
return nil
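For context, this is roughly the JSON layout the master now persists under its checkpoint key after the rename from EtcdCheckpoints to Checkpoints. A minimal, self-contained sketch with made-up values; the struct definitions are trimmed local copies of the ones in this diff, not imports from the repository:

	package main

	import (
		"encoding/json"
		"fmt"
	)

	// Trimmed copies of the Checkpoint and workerCheckpoint structs above,
	// kept local so the sketch compiles on its own.
	type workerCheckpoint struct {
		Tick      int64  `json:"tick"`
		Revision  int64  `json:"revision"`
		MvccCount int    `json:"mvcc-count"`
		Value     string `json:"value"`
	}

	type Checkpoint struct {
		Ticks       map[int]int64            `json:"ticks"`
		Checkpoints map[int]workerCheckpoint `json:"checkpoints"`
	}

	func main() {
		cp := Checkpoint{
			Ticks:       map[int]int64{0: 42},
			Checkpoints: map[int]workerCheckpoint{0: {Tick: 42, Revision: 7, MvccCount: 3}},
		}
		data, err := json.MarshalIndent(&cp, "", "  ")
		if err != nil {
			panic(err)
		}
		// Prints the shape of the checkpoint document the fake master stores.
		fmt.Println(string(data))
	}
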
@@ -146,7 +167,7 @@ func (m *Master) Workload() model.RescUnit {

func (m *Master) InitImpl(ctx context.Context) error {
log.L().Info("FakeMaster: Init", zap.Any("config", m.config))
return m.createWorkers()
return m.initWorkers()
}

// This function is not thread-safe; it must be called with m.workerListMu locked.
@@ -163,7 +184,7 @@ func (m *Master) createWorker(wcfg *WorkerConfig) error {
return nil
}

func (m *Master) createWorkers() error {
func (m *Master) initWorkers() error {
m.workerListMu.Lock()
defer m.workerListMu.Unlock()
OUT:
@@ -174,7 +195,7 @@ OUT:
continue OUT
}
}
wcfg := m.genWorkerConfig(i, 0, 0)
wcfg := m.genWorkerConfig(i, zeroWorkerCheckpoint())
err := m.createWorker(wcfg)
if err != nil {
return err
@@ -235,14 +256,15 @@ func (m *Master) tickedCheckWorkers(ctx context.Context) error {
for i, worker := range m.workerList {
// create new worker for non-active worker
if worker == nil {
var startTick, startRevision int64
workerCkpt := zeroWorkerCheckpoint()
if tick, ok := ckpt.Ticks[i]; ok {
startTick = tick
workerCkpt.Tick = tick
}
if etcdCkpt, ok := ckpt.EtcdCheckpoints[i]; ok {
startRevision = etcdCkpt.Revision
if etcdCkpt, ok := ckpt.Checkpoints[i]; ok {
workerCkpt.Revision = etcdCkpt.Revision
workerCkpt.MvccCount = etcdCkpt.MvccCount
}
wcfg := m.genWorkerConfig(i, startTick, startRevision)
wcfg := m.genWorkerConfig(i, workerCkpt)
if err := m.createWorker(wcfg); err != nil {
return errors.Trace(err)
}
@@ -262,7 +284,9 @@ func (m *Master) tickedCheckWorkers(ctx context.Context) error {
return err
}
}
m.status[worker.ID()] = dws
m.bStatus.Lock()
m.bStatus.status[worker.ID()] = dws
m.bStatus.Unlock()
}
}

@@ -271,7 +295,9 @@ func (m *Master) tickedCheckWorkers(ctx context.Context) error {

func (m *Master) tickedCheckStatus(ctx context.Context) error {
if m.statusRateLimiter.Allow() {
log.L().Info("FakeMaster: Tick", zap.Any("status", m.status))
m.bStatus.RLock()
log.L().Info("FakeMaster: Tick", zap.Any("status", m.bStatus.status))
m.bStatus.RUnlock()
// save the checkpoint, which is used by the business logic only
_, metaErr := m.MetaKVClient().Put(ctx, CheckpointKey(m.workerID), m.genCheckpoint().String())
if metaErr != nil {
@@ -352,6 +378,10 @@ func (m *Master) OnWorkerOffline(worker lib.WorkerHandle, reason error) error {
return errors.Errorf("worker(%s) is not found in worker list", worker.ID())
}

m.bStatus.Lock()
delete(m.bStatus.status, worker.ID())
m.bStatus.Unlock()

if derrors.ErrWorkerFinish.Equal(reason) {
log.L().Info("FakeMaster: OnWorkerOffline: worker finished", zap.String("worker-id", worker.ID()))
m.finishedSet[worker.ID()] = index
@@ -360,16 +390,17 @@ func (m *Master) OnWorkerOffline(worker lib.WorkerHandle, reason error) error {

log.L().Info("FakeMaster: OnWorkerOffline",
zap.String("worker-id", worker.ID()), zap.Error(reason))
var startTick, startRevision int64
workerCkpt := zeroWorkerCheckpoint()
if ws, err := parseExtBytes(worker.Status().ExtBytes); err != nil {
log.L().Warn("failed to parse worker ext bytes", zap.Error(err))
} else {
startTick = ws.Tick
if ws.EtcdCheckpoint != nil {
startRevision = ws.EtcdCheckpoint.Revision
workerCkpt.Tick = ws.Tick
if ws.Checkpoint != nil {
workerCkpt.Revision = ws.Checkpoint.Revision
workerCkpt.MvccCount = ws.Checkpoint.MvccCount
}
}
wcfg := m.genWorkerConfig(index, startTick, startRevision)
wcfg := m.genWorkerConfig(index, workerCkpt)
m.workerListMu.Lock()
defer m.workerListMu.Unlock()
return m.createWorker(wcfg)
@@ -401,14 +432,21 @@ func (m *Master) OnMasterMessage(topic p2p.Topic, message p2p.MessageValue) erro
return nil
}

func (m *Master) Status() libModel.WorkerStatus {
bytes, err := json.Marshal(m.status)
func (m *Master) marshalBusinessStatus() []byte {
m.bStatus.RLock()
defer m.bStatus.RUnlock()
bytes, err := json.Marshal(m.bStatus.status)
if err != nil {
log.L().Panic("unexpected marshal error", zap.Error(err))
}
return bytes
}

func (m *Master) Status() libModel.WorkerStatus {
extBytes := m.marshalBusinessStatus()
return libModel.WorkerStatus{
Code: m.getStatusCode(),
ExtBytes: bytes,
ExtBytes: extBytes,
}
}
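The per-worker status map now lives behind businessStatus so that Tick, Status(), OnWorkerOffline and genCheckpoint() can all touch it from different call paths without racing. A minimal sketch of the same RWMutex-guarded-map pattern; the names workerStatus and snapshotJSON are illustrative only, not taken from the repository:

	package main

	import (
		"encoding/json"
		"fmt"
		"sync"
	)

	type workerStatus struct {
		Tick int64 `json:"tick"`
	}

	type businessStatus struct {
		sync.RWMutex
		status map[string]*workerStatus
	}

	// snapshotJSON marshals the map under a read lock so writers cannot
	// mutate it mid-encode.
	func (b *businessStatus) snapshotJSON() []byte {
		b.RLock()
		defer b.RUnlock()
		data, err := json.Marshal(b.status)
		if err != nil {
			panic(err) // marshaling a plain map of structs should not fail
		}
		return data
	}

	func main() {
		b := &businessStatus{status: map[string]*workerStatus{"worker-0": {Tick: 1}}}
		b.Lock()
		b.status["worker-1"] = &workerStatus{Tick: 2} // writers take the full lock
		b.Unlock()
		fmt.Println(string(b.snapshotJSON()))
	}
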

@@ -438,31 +476,36 @@ func (m *Master) genCheckpoint() *Checkpoint {
m.workerListMu.Lock()
defer m.workerListMu.Unlock()
cp := &Checkpoint{
Ticks: make(map[int]int64),
EtcdCheckpoints: make(map[int]EtcdCheckpoint),
Ticks: make(map[int]int64),
Checkpoints: make(map[int]workerCheckpoint),
}
for wid, status := range m.status {
m.bStatus.RLock()
defer m.bStatus.RUnlock()
for wid, status := range m.bStatus.status {
if businessID, ok := m.workerID2BusinessID[wid]; ok {
cp.Ticks[businessID] = status.Tick
if status.EtcdCheckpoint != nil {
cp.EtcdCheckpoints[businessID] = *status.EtcdCheckpoint
if status.Checkpoint != nil {
cp.Checkpoints[businessID] = *status.Checkpoint
} else {
cp.EtcdCheckpoints[businessID] = EtcdCheckpoint{}
cp.Checkpoints[businessID] = workerCheckpoint{}
}
}
}
return cp
}

func (m *Master) genWorkerConfig(index int, startTick, startRevision int64) *WorkerConfig {
func (m *Master) genWorkerConfig(index int, checkpoint workerCheckpoint) *WorkerConfig {
return &WorkerConfig{
ID: index,
StartTick: startTick,
TargetTick: int64(m.config.TargetTick),
EtcdWatchEnable: m.config.EtcdWatchEnable,
EtcdEndpoints: m.config.EtcdEndpoints,
EtcdWatchPrefix: m.config.EtcdWatchPrefix,
EtcdWatchRevision: startRevision,
ID: index,

// generated from fake master config
TargetTick: int64(m.config.TargetTick),
EtcdWatchEnable: m.config.EtcdWatchEnable,
EtcdEndpoints: m.config.EtcdEndpoints,
EtcdWatchPrefix: m.config.EtcdWatchPrefix,

// loaded from checkpoint if exists
Checkpoint: checkpoint,
}
}
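With genWorkerConfig taking a whole workerCheckpoint instead of separate startTick/startRevision values, both call sites follow the same shape: a fresh worker gets zeroWorkerCheckpoint(), a re-created worker gets a checkpoint rebuilt from persisted state, and the function simply embeds whatever it is given. A small sketch of that call pattern using trimmed stand-in types; the field values are made up:

	package main

	import "fmt"

	// Illustrative stand-ins for the types in this diff; only the fields
	// relevant to the call pattern are kept.
	type workerCheckpoint struct {
		Tick      int64
		Revision  int64
		MvccCount int
	}

	type WorkerConfig struct {
		ID         int
		Checkpoint workerCheckpoint
	}

	func zeroWorkerCheckpoint() workerCheckpoint { return workerCheckpoint{} }

	func genWorkerConfig(index int, ckpt workerCheckpoint) *WorkerConfig {
		return &WorkerConfig{ID: index, Checkpoint: ckpt}
	}

	func main() {
		// Fresh worker: no prior state, so the zero checkpoint is passed.
		fresh := genWorkerConfig(0, zeroWorkerCheckpoint())

		// Re-created worker: the checkpoint is rebuilt from persisted state
		// (tick plus etcd revision and mvcc count) before being handed over.
		recovered := genWorkerConfig(1, workerCheckpoint{Tick: 42, Revision: 7, MvccCount: 3})

		fmt.Println(fresh.Checkpoint, recovered.Checkpoint)
	}
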

@@ -476,7 +519,7 @@ func NewFakeMaster(ctx *dcontext.Context, workerID libModel.WorkerID, masterID l
workerID2BusinessID: make(map[libModel.WorkerID]int),
config: masterConfig,
statusRateLimiter: rate.NewLimiter(rate.Every(time.Second*3), 1),
status: make(map[libModel.WorkerID]*dummyWorkerStatus),
bStatus: &businessStatus{status: make(map[libModel.WorkerID]*dummyWorkerStatus)},
finishedSet: make(map[libModel.WorkerID]int),
ctx: ctx.Context,
clocker: clock.New(),
45 changes: 23 additions & 22 deletions lib/fake/fake_worker.go
@@ -32,18 +32,12 @@ type (
WorkerConfig struct {
ID int `json:"id"`
TargetTick int64 `json:"target-tick"`
StartTick int64 `json:"start-tick"`

EtcdWatchEnable bool `json:"etcd-watch-enable"`
EtcdEndpoints []string `json:"etcd-endpoints"`
EtcdWatchPrefix string `json:"etcd-watch-prefix"`
EtcdWatchRevision int64 `json:"etcd-watch-revision"`
}
EtcdWatchEnable bool `json:"etcd-watch-enable"`
EtcdEndpoints []string `json:"etcd-endpoints"`
EtcdWatchPrefix string `json:"etcd-watch-prefix"`

EtcdCheckpoint struct {
Revision int64 `json:"revision"`
Mvcc int `json:"mvcc"` // record mvcc version count
Value string `json:"value"`
Checkpoint workerCheckpoint `json:"checkpoint"`
}

dummyWorker struct {
@@ -66,9 +60,9 @@ type (

type dummyWorkerStatus struct {
sync.RWMutex
BusinessID int `json:"business-id"`
Tick int64 `json:"tick"`
EtcdCheckpoint *EtcdCheckpoint `json:"etcd-checkpoint"`
BusinessID int `json:"business-id"`
Tick int64 `json:"tick"`
Checkpoint *workerCheckpoint `json:"checkpoint"`
}

func (s *dummyWorkerStatus) tick() {
@@ -77,16 +71,16 @@ func (s *dummyWorkerStatus) tick() {
s.Tick++
}

func (s *dummyWorkerStatus) getEtcdCheckpoint() EtcdCheckpoint {
func (s *dummyWorkerStatus) getEtcdCheckpoint() workerCheckpoint {
s.RLock()
defer s.RUnlock()
return *s.EtcdCheckpoint
return *s.Checkpoint
}

func (s *dummyWorkerStatus) setEtcdCheckpoint(ckpt *EtcdCheckpoint) {
func (s *dummyWorkerStatus) setEtcdCheckpoint(ckpt *workerCheckpoint) {
s.Lock()
defer s.Unlock()
s.EtcdCheckpoint = ckpt
s.Checkpoint = ckpt
}

func (s *dummyWorkerStatus) Marshal() ([]byte, error) {
@@ -237,7 +231,11 @@ watchLoop:
return errors.Trace(ctx.Err())
default:
}
ch := cli.Watch(ctx, key, clientv3.WithRev(d.status.getEtcdCheckpoint().Revision))
opts := make([]clientv3.OpOption, 0)
if d.status.getEtcdCheckpoint().Revision > 0 {
opts = append(opts, clientv3.WithRev(d.status.getEtcdCheckpoint().Revision+1))
}
ch := cli.Watch(ctx, key, opts...)
for resp := range ch {
if resp.Err() != nil {
log.L().Warn("watch met error", zap.Error(resp.Err()))
Expand All @@ -247,7 +245,7 @@ watchLoop:
// no concurrent write of this checkpoint, so it is safe to read
// old value, change it and overwrite.
ckpt := d.status.getEtcdCheckpoint()
ckpt.Mvcc++
ckpt.MvccCount++
ckpt.Revision = event.Kv.ModRevision
switch event.Type {
case mvccpb.PUT:
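The watch is now resumed one revision past the checkpointed one (WithRev(Revision+1)), so the event that produced the checkpoint is not replayed, and a zero revision falls back to watching from the current state. A minimal, self-contained sketch of that resume logic, assuming the go.etcd.io/etcd/client/v3 client; the endpoint, key, and starting revision are placeholders:

	package main

	import (
		"context"
		"fmt"

		clientv3 "go.etcd.io/etcd/client/v3"
	)

	// watchFrom resumes a watch on key strictly after the checkpointed
	// revision; a zero checkpoint watches from the current state instead.
	func watchFrom(ctx context.Context, cli *clientv3.Client, key string, checkpointRev int64) clientv3.WatchChan {
		opts := make([]clientv3.OpOption, 0, 1)
		if checkpointRev > 0 {
			// +1 skips the event already reflected in the checkpoint.
			opts = append(opts, clientv3.WithRev(checkpointRev+1))
		}
		return cli.Watch(ctx, key, opts...)
	}

	func main() {
		cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}})
		if err != nil {
			panic(err)
		}
		defer cli.Close()

		ctx := context.Background()
		for resp := range watchFrom(ctx, cli, "/fake-job/demo-key", 7) {
			if resp.Err() != nil {
				fmt.Println("watch error:", resp.Err())
				return
			}
			for _, ev := range resp.Events {
				// ModRevision would feed the next checkpoint, mirroring the loop above.
				fmt.Println(ev.Type, string(ev.Kv.Key), ev.Kv.ModRevision)
			}
		}
	}
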
@@ -268,9 +266,12 @@ func NewDummyWorker(
) lib.WorkerImpl {
wcfg := cfg.(*WorkerConfig)
status := &dummyWorkerStatus{
BusinessID: wcfg.ID,
Tick: wcfg.StartTick,
EtcdCheckpoint: &EtcdCheckpoint{Revision: wcfg.EtcdWatchRevision},
BusinessID: wcfg.ID,
Tick: wcfg.Checkpoint.Tick,
Checkpoint: &workerCheckpoint{
Revision: wcfg.Checkpoint.Revision,
MvccCount: wcfg.Checkpoint.MvccCount,
},
}
return &dummyWorker{
statusRateLimiter: rate.NewLimiter(rate.Every(time.Second*3), 1),
2 changes: 1 addition & 1 deletion sample/config/executor.toml
@@ -1,4 +1,4 @@
keepalive-ttl = "10s"
keepalive-ttl = "20s"
keepalive-interval = "500ms"
session-ttl = 20
