Skip to content

Commit

Permalink
address comment
Browse files Browse the repository at this point in the history
Signed-off-by: lance6716 <lance6716@gmail.com>
  • Loading branch information
lance6716 committed May 15, 2024
1 parent dadc2c1 commit 83b8ab9
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 11 deletions.
20 changes: 11 additions & 9 deletions pkg/ddl/ingest/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,20 @@ type BackendCtx interface {
// BackendCtx.
Register(indexIDs []int64, tableName string) ([]Engine, error)
UnregisterEngines()
// FinishImport imports the engine of given index ID into the storage, collects
// the duplicate errors if the `unique` is true. The first call of FinishImport
// means no further data will be wrote to the engine.
//
// TODO(lance6716): refine the interface to let caller don't need to pass the
// indexID, and unify with CollectRemoteDuplicateRows.
FinishImport(indexID int64, unique bool, tbl table.Table) error
// FinishedWritingNeedImport returns true only when all the engines are finished
// writing and only need import, that is to say, engines are closed.
// writing and only need import. Considering the calling usage of FinishImport,
// it will return true after a successful call of FinishImport and may return
// true after a failed call of FinishImport.
FinishedWritingNeedImport() bool

CollectRemoteDuplicateRows(indexID int64, tbl table.Table) error
FinishImport(indexID int64, unique bool, tbl table.Table) error
FlushController
Done() bool
SetDone()
Expand Down Expand Up @@ -192,15 +200,9 @@ func (bc *litBackendCtx) Flush(mode FlushMode) (flushed, imported bool, errIdxID
}
defer bc.flushing.Store(false)

unlockFn := make([]func(), 0, len(bc.engines))
defer func() {
for _, fn := range unlockFn {
fn()
}
}()
for indexID, ei := range bc.engines {
ei.flushLock.Lock()
unlockFn = append(unlockFn, ei.flushLock.Unlock)
defer ei.flushLock.Unlock()

if err = ei.Flush(); err != nil {
return false, false, indexID, err
Expand Down
3 changes: 2 additions & 1 deletion pkg/ddl/ingest/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ type engineInfo struct {
jobID int64
indexID int64
openedEngine *backend.OpenedEngine
// closedEngine is set only closed when all data is finished written.
// closedEngine is set only when all data is finished written and all writers are
// closed.
closedEngine *backend.ClosedEngine
uuid uuid.UUID
cfg *backend.EngineConfig
Expand Down
4 changes: 3 additions & 1 deletion pkg/ddl/ingest/engine_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ func (bc *litBackendCtx) FinishedWritingNeedImport() bool {
return false
}
for _, ei := range bc.engines {
return ei.closedEngine != nil
if ei.closedEngine != nil {
return true
}
}
return false
}

0 comments on commit 83b8ab9

Please sign in to comment.