Skip to content

Commit

Permalink
fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
D3Hunter committed Sep 18, 2023
1 parent e25413d commit a3e9817
Showing 1 changed file with 13 additions and 3 deletions.
16 changes: 13 additions & 3 deletions disttask/importinto/encode_and_sort_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (
"context"
"path"
"strconv"
"time"

"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/backend/external"
"github.com/pingcap/tidb/disttask/operator"
Expand All @@ -30,6 +32,10 @@ import (
"go.uber.org/zap"
)

const (
maxWaitDuration = 30 * time.Second
)

// encodeAndSortOperator is an operator that encodes and sorts data.
// this operator process data of a subtask, i.e. one engine, it contains a lot
// of data chunks, each chunk is a data file or part of it.
Expand Down Expand Up @@ -137,14 +143,16 @@ func newChunkWorker(ctx context.Context, op *encodeAndSortOperator, workerID int
op: op,
}
if op.tableImporter.IsGlobalSort() {
// in case on network partition, 2 nodes might run the same subtask.
workerUUID := uuid.New().String()
// sorted index kv storage path: /{taskID}/{subtaskID}/index/{indexID}/{workerID}
indexWriterFn := func(indexID int64) *external.Writer {
builder := external.NewWriterBuilder().
SetOnCloseFunc(func(summary *external.WriterSummary) {
op.sharedVars.mergeIndexSummary(indexID, summary)
})
prefix := path.Join(strconv.Itoa(int(op.taskID)), strconv.Itoa(int(op.subtaskID)))
writerID := path.Join("index", strconv.Itoa(int(indexID)), strconv.Itoa(int(workerID)))
writerID := path.Join("index", strconv.Itoa(int(indexID)), workerUUID)
writer := builder.Build(op.tableImporter.GlobalSortStore, prefix, writerID)
return writer
}
Expand All @@ -153,7 +161,7 @@ func newChunkWorker(ctx context.Context, op *encodeAndSortOperator, workerID int
builder := external.NewWriterBuilder().
SetOnCloseFunc(op.sharedVars.mergeDataSummary)
prefix := path.Join(strconv.Itoa(int(op.taskID)), strconv.Itoa(int(op.subtaskID)))
writerID := path.Join("data", strconv.Itoa(int(workerID)))
writerID := path.Join("data", workerUUID)
writer := builder.Build(op.tableImporter.GlobalSortStore, prefix, writerID)
w.dataWriter = external.NewEngineWriter(writer)

Expand All @@ -178,7 +186,9 @@ func (w *chunkWorker) Close() {
closeCtx := w.ctx
if closeCtx.Err() != nil {
// in case of context canceled, we need to create a new context to close writers.
closeCtx = context.Background()
newCtx, cancel := context.WithTimeout(context.Background(), maxWaitDuration)
closeCtx = newCtx
defer cancel()
}
if w.dataWriter != nil {
// Note: we cannot ignore close error as we're writing to S3 or GCS.
Expand Down

0 comments on commit a3e9817

Please sign in to comment.