Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: change interface of lightning package wrappers #53233

Merged
merged 13 commits into from
May 16, 2024
Merged
6 changes: 6 additions & 0 deletions pkg/ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,12 @@ func (dc *ddlCtx) writePhysicalTableRecord(
return errors.Trace(err)
}
defer scheduler.close(true)
if lit, ok := scheduler.(*ingestBackfillScheduler); ok {
if lit.finishedWritingNeedImport() {
return nil
}
}

err = scheduler.setupWorkers()
if err != nil {
return errors.Trace(err)
Expand Down
44 changes: 31 additions & 13 deletions pkg/ddl/backfilling_operators.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ func (w *indexIngestLocalWorker) HandleTask(rs IndexRecordChunk, send func(Index
}()
w.indexIngestBaseWorker.HandleTask(rs, send)
// needs to flush and import to avoid too much use of disk.
_, _, _, err := ingest.TryFlushAllIndexes(w.backendCtx, ingest.FlushModeAuto, w.indexIDs)
_, _, _, err := w.backendCtx.Flush(ingest.FlushModeAuto)
if err != nil {
w.ctx.onError(err)
return
Expand Down Expand Up @@ -726,8 +726,14 @@ func (w *indexIngestBaseWorker) initSessCtx() {
}

func (w *indexIngestBaseWorker) Close() {
// TODO(lance6716): unify the real write action for engineInfo and external
// writer.
for _, writer := range w.writers {
err := writer.Close(w.ctx)
ew, ok := writer.(*external.Writer)
if !ok {
break
}
err := ew.Close(w.ctx)
if err != nil {
w.ctx.onError(err)
}
Expand Down Expand Up @@ -827,24 +833,36 @@ func (s *indexWriteResultSink) flush() error {
failpoint.Inject("mockFlushError", func(_ failpoint.Value) {
failpoint.Return(errors.New("mock flush error"))
})
for _, index := range s.indexes {
idxInfo := index.Meta()
_, _, err := s.backendCtx.Flush(idxInfo.ID, ingest.FlushModeForceFlushAndImport)
if err != nil {
if common.ErrFoundDuplicateKeys.Equal(err) {
err = convertToKeyExistsErr(err, idxInfo, s.tbl.Meta())
return err
_, _, errIdxID, err := s.backendCtx.Flush(ingest.FlushModeForceFlushAndImport)
if err != nil {
if common.ErrFoundDuplicateKeys.Equal(err) {
var idxInfo table.Index
for _, idx := range s.indexes {
if idx.Meta().ID == errIdxID {
idxInfo = idx
break
}
}
logutil.Logger(s.ctx).Error("flush error",
zap.String("category", "ddl"), zap.Error(err))
return err
if idxInfo == nil {
logutil.Logger(s.ctx).Error("index not found", zap.Int64("indexID", errIdxID))
return kv.ErrKeyExists
}
return convertToKeyExistsErr(err, idxInfo.Meta(), s.tbl.Meta())
}
logutil.Logger(s.ctx).Error("flush error",
zap.String("category", "ddl"), zap.Error(err))
return err
}
return nil
}

func (s *indexWriteResultSink) Close() error {
return s.errGroup.Wait()
err := s.errGroup.Wait()
// for local pipeline
if bc := s.backendCtx; bc != nil {
bc.UnregisterEngines()
}
return err
}

func (*indexWriteResultSink) String() string {
Expand Down
25 changes: 11 additions & 14 deletions pkg/ddl/backfilling_read_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,7 @@ func (r *readIndexExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
if opCtx.OperatorErr() != nil {
return opCtx.OperatorErr()
}
if err != nil {
return err
}

r.bc.ResetWorkers(r.job.ID)
return nil
return err
}

func (r *readIndexExecutor) RealtimeSummary() *execute.SubtaskSummary {
Expand Down Expand Up @@ -226,15 +221,17 @@ func (r *readIndexExecutor) buildLocalStorePipeline(
return nil, err
}
d := r.d
engines := make([]ingest.Engine, 0, len(r.indexes))
indexIDs := make([]int64, 0, len(r.indexes))
for _, index := range r.indexes {
ei, err := r.bc.Register(r.job.ID, index.ID, r.job.SchemaName, r.job.TableName)
if err != nil {
tidblogutil.Logger(opCtx).Warn("cannot register new engine", zap.Error(err),
zap.Int64("job ID", r.job.ID), zap.Int64("index ID", index.ID))
return nil, err
}
engines = append(engines, ei)
indexIDs = append(indexIDs, index.ID)
}
engines, err := r.bc.Register(indexIDs, r.job.TableName)
if err != nil {
tidblogutil.Logger(opCtx).Error("cannot register new engine",
zap.Error(err),
zap.Int64("job ID", r.job.ID),
zap.Int64s("index IDs", indexIDs))
return nil, err
}
counter := metrics.BackfillTotalCounter.WithLabelValues(
metrics.GenerateReorgLabel("add_idx_rate", r.job.SchemaName, tbl.Meta().Name.O))
Expand Down
58 changes: 32 additions & 26 deletions pkg/ddl/backfilling_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,15 @@ func newIngestBackfillScheduler(
}, nil
}

func (b *ingestBackfillScheduler) finishedWritingNeedImport() bool {
job := b.reorgInfo.Job
bc, ok := ingest.LitBackCtxMgr.Load(job.ID)
if !ok {
return false
}
return bc.FinishedWritingNeedImport()
}

func (b *ingestBackfillScheduler) setupWorkers() error {
job := b.reorgInfo.Job
bc, ok := ingest.LitBackCtxMgr.Load(job.ID)
Expand All @@ -371,10 +380,26 @@ func (b *ingestBackfillScheduler) setupWorkers() error {
if err != nil {
return errors.Trace(err)
}

indexIDs := make([]int64, 0, len(b.reorgInfo.elements))
for _, e := range b.reorgInfo.elements {
indexIDs = append(indexIDs, e.ID)
}
engines, err := b.backendCtx.Register(indexIDs, job.TableName)
if err != nil {
return errors.Trace(err)
}

b.copReqSenderPool = copReqSenderPool
readerCnt, writerCnt := b.expectedWorkerSize()
writerPool := workerpool.NewWorkerPool[IndexRecordChunk]("ingest_writer",
poolutil.DDL, writerCnt, b.createWorker)
writerPool := workerpool.NewWorkerPool[IndexRecordChunk](
"ingest_writer",
poolutil.DDL,
writerCnt,
func() workerpool.Worker[IndexRecordChunk, workerpool.None] {
return b.createWorker(indexIDs, engines)
},
)
writerPool.Start(b.ctx)
b.writerPool = writerPool
b.copReqSenderPool.chunkSender = writerPool
Expand Down Expand Up @@ -406,13 +431,9 @@ func (b *ingestBackfillScheduler) close(force bool) {
})
}
close(b.resultCh)
if intest.InTest && len(b.copReqSenderPool.srcChkPool) != copReadChunkPoolSize() {
if intest.InTest && b.copReqSenderPool != nil && len(b.copReqSenderPool.srcChkPool) != copReadChunkPoolSize() {
panic(fmt.Sprintf("unexpected chunk size %d", len(b.copReqSenderPool.srcChkPool)))
}
if !force {
jobID := b.reorgInfo.ID
b.backendCtx.ResetWorkers(jobID)
}
}

func (b *ingestBackfillScheduler) sendTask(task *reorgBackfillTask) error {
Expand Down Expand Up @@ -446,32 +467,17 @@ func (b *ingestBackfillScheduler) adjustWorkerSize() error {
return nil
}

func (b *ingestBackfillScheduler) createWorker() workerpool.Worker[IndexRecordChunk, workerpool.None] {
func (b *ingestBackfillScheduler) createWorker(
indexIDs []int64,
engines []ingest.Engine,
) workerpool.Worker[IndexRecordChunk, workerpool.None] {
reorgInfo := b.reorgInfo
job := reorgInfo.Job
sessCtx, err := newSessCtx(reorgInfo.d.store, reorgInfo.ReorgMeta.SQLMode, reorgInfo.ReorgMeta.Location, reorgInfo.ReorgMeta.ResourceGroupName)
if err != nil {
b.sendResult(&backfillResult{err: err})
return nil
}
bcCtx := b.backendCtx
indexIDs := make([]int64, 0, len(reorgInfo.elements))
engines := make([]ingest.Engine, 0, len(reorgInfo.elements))
for _, elem := range reorgInfo.elements {
ei, err := bcCtx.Register(job.ID, elem.ID, job.SchemaName, job.TableName)
if err != nil {
// Return an error only if it is the first worker.
if b.writerMaxID == 0 {
b.sendResult(&backfillResult{err: err})
return nil
}
logutil.Logger(b.ctx).Warn("cannot create new writer", zap.Error(err),
zap.Int64("job ID", reorgInfo.ID), zap.Int64("index ID", elem.ID))
return nil
}
indexIDs = append(indexIDs, elem.ID)
engines = append(engines, ei)
}

worker, err := newAddIndexIngestWorker(
b.ctx, b.tbl, reorgInfo.d, engines, b.resultCh, job.ID,
Expand Down
1 change: 0 additions & 1 deletion pkg/ddl/ingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ go_library(
"engine.go",
"engine_mgr.go",
"env.go",
"flush.go",
"mem_root.go",
"message.go",
"mock.go",
Expand Down
Loading