From 83b8ab994abff1781190d678c6ebefe23f9b7e39 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 15 May 2024 18:12:51 +0800 Subject: [PATCH] address comment Signed-off-by: lance6716 --- pkg/ddl/ingest/backend.go | 20 +++++++++++--------- pkg/ddl/ingest/engine.go | 3 ++- pkg/ddl/ingest/engine_mgr.go | 4 +++- 3 files changed, 16 insertions(+), 11 deletions(-) diff --git a/pkg/ddl/ingest/backend.go b/pkg/ddl/ingest/backend.go index d4670bed36526..81d5d64779228 100644 --- a/pkg/ddl/ingest/backend.go +++ b/pkg/ddl/ingest/backend.go @@ -49,12 +49,20 @@ type BackendCtx interface { // BackendCtx. Register(indexIDs []int64, tableName string) ([]Engine, error) UnregisterEngines() + // FinishImport imports the engine of given index ID into the storage, collects + // the duplicate errors if the `unique` is true. The first call of FinishImport + // means no further data will be wrote to the engine. + // + // TODO(lance6716): refine the interface to let caller don't need to pass the + // indexID, and unify with CollectRemoteDuplicateRows. + FinishImport(indexID int64, unique bool, tbl table.Table) error // FinishedWritingNeedImport returns true only when all the engines are finished - // writing and only need import, that is to say, engines are closed. + // writing and only need import. Considering the calling usage of FinishImport, + // it will return true after a successful call of FinishImport and may return + // true after a failed call of FinishImport. FinishedWritingNeedImport() bool CollectRemoteDuplicateRows(indexID int64, tbl table.Table) error - FinishImport(indexID int64, unique bool, tbl table.Table) error FlushController Done() bool SetDone() @@ -192,15 +200,9 @@ func (bc *litBackendCtx) Flush(mode FlushMode) (flushed, imported bool, errIdxID } defer bc.flushing.Store(false) - unlockFn := make([]func(), 0, len(bc.engines)) - defer func() { - for _, fn := range unlockFn { - fn() - } - }() for indexID, ei := range bc.engines { ei.flushLock.Lock() - unlockFn = append(unlockFn, ei.flushLock.Unlock) + defer ei.flushLock.Unlock() if err = ei.Flush(); err != nil { return false, false, indexID, err diff --git a/pkg/ddl/ingest/engine.go b/pkg/ddl/ingest/engine.go index 65b9e656c14b3..1d3181f9b9359 100644 --- a/pkg/ddl/ingest/engine.go +++ b/pkg/ddl/ingest/engine.go @@ -53,7 +53,8 @@ type engineInfo struct { jobID int64 indexID int64 openedEngine *backend.OpenedEngine - // closedEngine is set only closed when all data is finished written. + // closedEngine is set only when all data is finished written and all writers are + // closed. closedEngine *backend.ClosedEngine uuid uuid.UUID cfg *backend.EngineConfig diff --git a/pkg/ddl/ingest/engine_mgr.go b/pkg/ddl/ingest/engine_mgr.go index 84de78754b734..8d1449658c1c8 100644 --- a/pkg/ddl/ingest/engine_mgr.go +++ b/pkg/ddl/ingest/engine_mgr.go @@ -117,7 +117,9 @@ func (bc *litBackendCtx) FinishedWritingNeedImport() bool { return false } for _, ei := range bc.engines { - return ei.closedEngine != nil + if ei.closedEngine != nil { + return true + } } return false }