From 0e24cc8876e7bfd71d5003eae9c3a43b166a45db Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Thu, 24 Dec 2020 12:32:29 +0800 Subject: [PATCH 1/3] cherry pick #1210 to release-4.0 Signed-off-by: ti-srebot --- .gitignore | 2 + Makefile | 7 +- cdc/model/changefeed.go | 2 +- cdc/model/changefeed_test.go | 2 +- cdc/puller/sorter/backend_pool.go | 108 ++++- cdc/puller/sorter/backend_pool_test.go | 153 +++++++ cdc/puller/sorter/file_backend.go | 5 + cdc/puller/sorter/heap_sorter.go | 258 ++++++------ cdc/puller/sorter/merger.go | 37 +- cdc/puller/sorter/unified_sorter.go | 73 +++- cdc/puller/sorter_test.go | 40 +- cdc/server.go | 8 +- cmd/client_changefeed.go | 2 +- cmd/server.go | 24 +- errors.toml | 20 + pkg/config/sorter.go | 2 + pkg/errors/errors.go | 22 ++ pkg/workerpool/async_pool.go | 28 ++ pkg/workerpool/async_pool_impl.go | 179 +++++++++ pkg/workerpool/async_pool_test.go | 139 +++++++ pkg/workerpool/context.go | 114 ++++++ pkg/workerpool/context_test.go | 237 +++++++++++ pkg/workerpool/hash.go | 33 ++ pkg/workerpool/pool.go | 59 +++ pkg/workerpool/pool_impl.go | 373 ++++++++++++++++++ pkg/workerpool/pool_test.go | 370 +++++++++++++++++ .../many_sorters_test/many_sorters.go | 20 +- .../many_sorters_test/many_sorters_test.go | 20 + .../sorter_stress_test/sorter_stress.go | 14 + tests/_utils/run_cdc_server | 2 + tests/cdclog_s3/run.sh | 1 + tests/ddl_puller_lag/run.sh | 4 +- tests/kafka_messages/run.sh | 12 +- tests/processor_panic/run.sh | 2 +- tests/unified_sorter/conf/workload | 2 +- tests/unified_sorter/run.sh | 2 +- 36 files changed, 2190 insertions(+), 186 deletions(-) create mode 100644 cdc/puller/sorter/backend_pool_test.go create mode 100644 pkg/workerpool/async_pool.go create mode 100644 pkg/workerpool/async_pool_impl.go create mode 100644 pkg/workerpool/async_pool_test.go create mode 100644 pkg/workerpool/context.go create mode 100644 pkg/workerpool/context_test.go create mode 100644 pkg/workerpool/hash.go create mode 100644 pkg/workerpool/pool.go create mode 100644 pkg/workerpool/pool_impl.go create mode 100644 pkg/workerpool/pool_test.go create mode 100644 testing_utils/many_sorters_test/many_sorters_test.go diff --git a/.gitignore b/.gitignore index ab4e4eb158f..26856c215c2 100644 --- a/.gitignore +++ b/.gitignore @@ -45,3 +45,5 @@ docker/logs # Binary file when running intergration test integration/integration + +*.tmp diff --git a/Makefile b/Makefile index 18f334c6b74..ec2fe21b561 100644 --- a/Makefile +++ b/Makefile @@ -29,11 +29,16 @@ endif ARCH := "`uname -s`" LINUX := "Linux" MAC := "Darwin" -PACKAGE_LIST := go list ./...| grep -vE 'vendor|proto|ticdc\/tests|integration' +PACKAGE_LIST := go list ./...| grep -vE 'vendor|proto|ticdc\/tests|integration|testing_utils' PACKAGES := $$($(PACKAGE_LIST)) PACKAGE_DIRECTORIES := $(PACKAGE_LIST) | sed 's|github.com/pingcap/$(PROJECT)/||' +<<<<<<< HEAD FILES := $$(find . -name '*.go' -type f | grep -vE 'vendor' | grep -vE 'kv_gen') TEST_FILES := $$(find . -name '*_test.go' -type f | grep -vE 'vendor|kv_gen|integration') +======= +FILES := $$(find . -name '*.go' -type f | grep -vE 'vendor|kv_gen|proto') +TEST_FILES := $$(find . -name '*_test.go' -type f | grep -vE 'vendor|kv_gen|integration|testing_utils') +>>>>>>> a3fb52e... 
sorter: Stabilize Unified Sorter (#1210) CDC_PKG := github.com/pingcap/ticdc FAILPOINT_DIR := $$(for p in $(PACKAGES); do echo $${p\#"github.com/pingcap/$(PROJECT)/"}|grep -v "github.com/pingcap/$(PROJECT)"; done) FAILPOINT := bin/failpoint-ctl diff --git a/cdc/model/changefeed.go b/cdc/model/changefeed.go index 7f50659bd43..fb148bccaef 100644 --- a/cdc/model/changefeed.go +++ b/cdc/model/changefeed.go @@ -176,7 +176,7 @@ func (info *ChangeFeedInfo) Unmarshal(data []byte) error { func (info *ChangeFeedInfo) VerifyAndFix() error { defaultConfig := config.GetDefaultReplicaConfig() if info.Engine == "" { - info.Engine = SortInMemory + info.Engine = SortUnified } if info.Config.Filter == nil { info.Config.Filter = defaultConfig.Filter diff --git a/cdc/model/changefeed_test.go b/cdc/model/changefeed_test.go index f2ca0324115..10322f63e72 100644 --- a/cdc/model/changefeed_test.go +++ b/cdc/model/changefeed_test.go @@ -179,7 +179,7 @@ func (s *configSuite) TestVerifyAndFix(c *check.C) { err := info.VerifyAndFix() c.Assert(err, check.IsNil) - c.Assert(info.Engine, check.Equals, SortInMemory) + c.Assert(info.Engine, check.Equals, SortUnified) marshalConfig1, err := info.Config.Marshal() c.Assert(err, check.IsNil) diff --git a/cdc/puller/sorter/backend_pool.go b/cdc/puller/sorter/backend_pool.go index f32283d8cb2..e63732264fe 100644 --- a/cdc/puller/sorter/backend_pool.go +++ b/cdc/puller/sorter/backend_pool.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "os" + "path/filepath" "reflect" "runtime/debug" "sync" @@ -29,9 +30,14 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/config" + cerrors "github.com/pingcap/ticdc/pkg/errors" "go.uber.org/zap" ) +const ( + backgroundJobInterval = time.Second * 5 +) + var ( pool *backEndPool // this is the singleton instance of backEndPool poolMu sync.Mutex // this mutex is for delayed initialization of `pool` only @@ -44,6 +50,13 @@ type backEndPool struct { memPressure int32 cache [256]unsafe.Pointer dir string + filePrefix string + + // cancelCh needs to be unbuffered to prevent races + cancelCh chan struct{} + // cancelRWLock protects cache against races when the backEnd is exiting + cancelRWLock sync.RWMutex + isTerminating bool } func newBackEndPool(dir string, captureAddr string) *backEndPool { @@ -51,17 +64,25 @@ func newBackEndPool(dir string, captureAddr string) *backEndPool { memoryUseEstimate: 0, fileNameCounter: 0, dir: dir, + cancelCh: make(chan struct{}), + filePrefix: fmt.Sprintf("%s/sort-%d-", dir, os.Getpid()), } go func() { - ticker := time.NewTicker(5 * time.Second) + ticker := time.NewTicker(backgroundJobInterval) + defer ticker.Stop() metricSorterInMemoryDataSizeGauge := sorterInMemoryDataSizeGauge.WithLabelValues(captureAddr) metricSorterOnDiskDataSizeGauge := sorterOnDiskDataSizeGauge.WithLabelValues(captureAddr) metricSorterOpenFileCountGauge := sorterOpenFileCountGauge.WithLabelValues(captureAddr) for { - <-ticker.C + select { + case <-ret.cancelCh: + log.Info("Unified Sorter backEnd is being cancelled") + return + case <-ticker.C: + } metricSorterInMemoryDataSizeGauge.Set(float64(atomic.LoadInt64(&ret.memoryUseEstimate))) metricSorterOnDiskDataSizeGauge.Set(float64(atomic.LoadInt64(&ret.onDiskDataSize))) @@ -79,9 +100,10 @@ func newBackEndPool(dir string, captureAddr string) *backEndPool { memPressure := m.Used * 100 / m.Total atomic.StoreInt32(&ret.memPressure, int32(memPressure)) - if memPressure > 50 { - log.Debug("unified sorter: high memory pressure", zap.Uint64("memPressure", 
memPressure), - zap.Int64("usedBySorter", atomic.LoadInt64(&ret.memoryUseEstimate))) + + if memPressure := ret.memoryPressure(); memPressure > 50 { + log.Debug("unified sorter: high memory pressure", zap.Int32("memPressure", memPressure), + zap.Int64("usedBySorter", ret.sorterMemoryUsage())) // Increase GC frequency to avoid necessary OOM debug.SetGCPercent(10) } else { @@ -117,13 +139,20 @@ func newBackEndPool(dir string, captureAddr string) *backEndPool { func (p *backEndPool) alloc(ctx context.Context) (backEnd, error) { sorterConfig := config.GetSorterConfig() - if atomic.LoadInt64(&p.memoryUseEstimate) < int64(sorterConfig.MaxMemoryConsumption) && - atomic.LoadInt32(&p.memPressure) < int32(sorterConfig.MaxMemoryPressure) { + if p.sorterMemoryUsage() < int64(sorterConfig.MaxMemoryConsumption) && + p.memoryPressure() < int32(sorterConfig.MaxMemoryPressure) { ret := newMemoryBackEnd() return ret, nil } + p.cancelRWLock.RLock() + defer p.cancelRWLock.RUnlock() + + if p.isTerminating { + return nil, cerrors.ErrUnifiedSorterBackendTerminating.GenWithStackByArgs() + } + for i := range p.cache { ptr := &p.cache[i] ret := atomic.SwapPointer(ptr, nil) @@ -132,8 +161,10 @@ func (p *backEndPool) alloc(ctx context.Context) (backEnd, error) { } } - fname := fmt.Sprintf("%s/sort-%d-%d", p.dir, os.Getpid(), atomic.AddUint64(&p.fileNameCounter, 1)) - log.Debug("Unified Sorter: trying to create file backEnd", zap.String("filename", fname)) + fname := fmt.Sprintf("%s%d.tmp", p.filePrefix, atomic.AddUint64(&p.fileNameCounter, 1)) + log.Debug("Unified Sorter: trying to create file backEnd", + zap.String("filename", fname), + zap.String("table", tableNameFromCtx(ctx))) ret, err := newFileBackEnd(fname, &msgPackGenSerde{}) if err != nil { @@ -155,6 +186,13 @@ func (p *backEndPool) dealloc(backEnd backEnd) error { failpoint.Return(nil) } }) + p.cancelRWLock.RLock() + defer p.cancelRWLock.RUnlock() + + if p.isTerminating { + return cerrors.ErrUnifiedSorterBackendTerminating.GenWithStackByArgs() + } + for i := range p.cache { ptr := &p.cache[i] if atomic.CompareAndSwapPointer(ptr, nil, unsafe.Pointer(b)) { @@ -173,3 +211,55 @@ func (p *backEndPool) dealloc(backEnd backEnd) error { } return nil } + +func (p *backEndPool) terminate() { + p.cancelCh <- struct{}{} + defer close(p.cancelCh) + // the background goroutine can be considered terminated here + + p.cancelRWLock.Lock() + defer p.cancelRWLock.Unlock() + p.isTerminating = true + + // any new allocs and deallocs will not succeed from this point + // accessing p.cache without atomics is safe from now + + for i := range p.cache { + ptr := &p.cache[i] + backend := (*fileBackEnd)(*ptr) + if backend == nil { + continue + } + _ = backend.free() + } + + if p.filePrefix == "" { + // This should not happen. But to prevent accidents in production, we add this anyway. 
+ log.Panic("Empty filePrefix, please report a bug") + } + + files, err := filepath.Glob(p.filePrefix + "*") + if err != nil { + log.Warn("Unified Sorter clean-up failed", zap.Error(err)) + } + for _, file := range files { + err = os.RemoveAll(file) + if err != nil { + log.Warn("Unified Sorter clean-up failed: failed to remove", zap.String("file-name", file), zap.Error(err)) + } + } +} + +func (p *backEndPool) sorterMemoryUsage() int64 { + failpoint.Inject("memoryUsageInjectPoint", func(val failpoint.Value) { + failpoint.Return(int64(val.(int))) + }) + return atomic.LoadInt64(&p.memoryUseEstimate) +} + +func (p *backEndPool) memoryPressure() int32 { + failpoint.Inject("memoryPressureInjectPoint", func(val failpoint.Value) { + failpoint.Return(int32(val.(int))) + }) + return atomic.LoadInt32(&p.memPressure) +} diff --git a/cdc/puller/sorter/backend_pool_test.go b/cdc/puller/sorter/backend_pool_test.go new file mode 100644 index 00000000000..2c923142a9f --- /dev/null +++ b/cdc/puller/sorter/backend_pool_test.go @@ -0,0 +1,153 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package sorter + +import ( + "context" + "os" + "strconv" + "testing" + "time" + + "github.com/pingcap/check" + "github.com/pingcap/failpoint" + "github.com/pingcap/ticdc/pkg/config" + "github.com/pingcap/ticdc/pkg/util/testleak" +) + +func TestSuite(t *testing.T) { check.TestingT(t) } + +type backendPoolSuite struct{} + +var _ = check.Suite(&backendPoolSuite{}) + +func (s *backendPoolSuite) TestBasicFunction(c *check.C) { + defer testleak.AfterTest(c)() + + err := os.MkdirAll("/tmp/sorter", 0o755) + c.Assert(err, check.IsNil) + + config.SetSorterConfig(&config.SorterConfig{ + MaxMemoryPressure: 90, // 90% + MaxMemoryConsumption: 16 * 1024 * 1024 * 1024, // 16G + }) + + err = failpoint.Enable("github.com/pingcap/ticdc/cdc/puller/sorter/memoryPressureInjectPoint", "return(100)") + c.Assert(err, check.IsNil) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*20) + defer cancel() + + backEndPool := newBackEndPool("/tmp/sorter", "") + c.Assert(backEndPool, check.NotNil) + defer backEndPool.terminate() + + backEnd, err := backEndPool.alloc(ctx) + c.Assert(err, check.IsNil) + c.Assert(backEnd, check.FitsTypeOf, &fileBackEnd{}) + fileName := backEnd.(*fileBackEnd).fileName + c.Assert(fileName, check.Not(check.Equals), "") + + err = failpoint.Enable("github.com/pingcap/ticdc/cdc/puller/sorter/memoryPressureInjectPoint", "return(0)") + c.Assert(err, check.IsNil) + err = failpoint.Enable("github.com/pingcap/ticdc/cdc/puller/sorter/memoryUsageInjectPoint", "return(34359738368)") + c.Assert(err, check.IsNil) + + backEnd1, err := backEndPool.alloc(ctx) + c.Assert(err, check.IsNil) + c.Assert(backEnd1, check.FitsTypeOf, &fileBackEnd{}) + fileName1 := backEnd1.(*fileBackEnd).fileName + c.Assert(fileName1, check.Not(check.Equals), "") + c.Assert(fileName1, check.Not(check.Equals), fileName) + + err = failpoint.Enable("github.com/pingcap/ticdc/cdc/puller/sorter/memoryPressureInjectPoint", "return(0)") + c.Assert(err, check.IsNil) + err = 
failpoint.Enable("github.com/pingcap/ticdc/cdc/puller/sorter/memoryUsageInjectPoint", "return(0)") + c.Assert(err, check.IsNil) + + backEnd2, err := backEndPool.alloc(ctx) + c.Assert(err, check.IsNil) + c.Assert(backEnd2, check.FitsTypeOf, &memoryBackEnd{}) + + err = backEndPool.dealloc(backEnd) + c.Assert(err, check.IsNil) + + err = backEndPool.dealloc(backEnd1) + c.Assert(err, check.IsNil) + + err = backEndPool.dealloc(backEnd2) + c.Assert(err, check.IsNil) + + time.Sleep(backgroundJobInterval * 3 / 2) + + _, err = os.Stat(fileName) + c.Assert(os.IsNotExist(err), check.IsTrue) + + _, err = os.Stat(fileName1) + c.Assert(os.IsNotExist(err), check.IsTrue) +} + +func (s *backendPoolSuite) TestCleanUp(c *check.C) { + defer testleak.AfterTest(c)() + + err := os.MkdirAll("/tmp/sorter", 0o755) + c.Assert(err, check.IsNil) + + config.SetSorterConfig(&config.SorterConfig{ + MaxMemoryPressure: 90, // 90% + MaxMemoryConsumption: 16 * 1024 * 1024 * 1024, // 16G + }) + + err = failpoint.Enable("github.com/pingcap/ticdc/cdc/puller/sorter/memoryPressureInjectPoint", "return(100)") + c.Assert(err, check.IsNil) + + backEndPool := newBackEndPool("/tmp/sorter", "") + c.Assert(backEndPool, check.NotNil) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*20) + defer cancel() + + var fileNames []string + for i := 0; i < 20; i++ { + backEnd, err := backEndPool.alloc(ctx) + c.Assert(err, check.IsNil) + c.Assert(backEnd, check.FitsTypeOf, &fileBackEnd{}) + + fileName := backEnd.(*fileBackEnd).fileName + _, err = os.Stat(fileName) + c.Assert(err, check.IsNil) + + fileNames = append(fileNames, fileName) + } + + prefix := backEndPool.filePrefix + c.Assert(prefix, check.Not(check.Equals), "") + + for j := 100; j < 120; j++ { + fileName := prefix + strconv.Itoa(j) + ".tmp" + f, err := os.Create(fileName) + c.Assert(err, check.IsNil) + err = f.Close() + c.Assert(err, check.IsNil) + + fileNames = append(fileNames, fileName) + } + + backEndPool.terminate() + + for _, fileName := range fileNames { + _, err = os.Stat(fileName) + c.Assert(os.IsNotExist(err), check.IsTrue) + } +} diff --git a/cdc/puller/sorter/file_backend.go b/cdc/puller/sorter/file_backend.go index d93265e3544..ad0526fcc36 100644 --- a/cdc/puller/sorter/file_backend.go +++ b/cdc/puller/sorter/file_backend.go @@ -63,7 +63,11 @@ func newFileBackEnd(fileName string, serde serializerDeserializer) (*fileBackEnd } func (f *fileBackEnd) reader() (backEndReader, error) { +<<<<<<< HEAD fd, err := os.OpenFile(f.fileName, os.O_RDONLY, 0644) +======= + fd, err := os.OpenFile(f.fileName, os.O_RDWR, 0o644) +>>>>>>> a3fb52e... 
sorter: Stabilize Unified Sorter (#1210) if err != nil { return nil, errors.Trace(err) } @@ -121,6 +125,7 @@ func (f *fileBackEnd) free() error { } }) + log.Debug("Removing file", zap.String("file", f.fileName)) err := os.Remove(f.fileName) if err != nil { failpoint.Inject("sorterDebug", func() { diff --git a/cdc/puller/sorter/heap_sorter.go b/cdc/puller/sorter/heap_sorter.go index 82174fc8b8e..7758dcbe193 100644 --- a/cdc/puller/sorter/heap_sorter.go +++ b/cdc/puller/sorter/heap_sorter.go @@ -16,14 +16,17 @@ package sorter import ( "container/heap" "context" + "sync" "sync/atomic" "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/pingcap/ticdc/cdc/model" "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/util" + "github.com/pingcap/ticdc/pkg/workerpool" "go.uber.org/zap" ) @@ -50,6 +53,9 @@ type heapSorter struct { inputCh chan *model.PolymorphicEvent outputCh chan *flushTask heap sortHeap + + poolHandle workerpool.EventHandle + internalState *heapSorterInternalState } func newHeapSorter(id int, out chan *flushTask) *heapSorter { @@ -61,22 +67,35 @@ func newHeapSorter(id int, out chan *flushTask) *heapSorter { } } -// flush should only be called within the main loop in run(). +// flush should only be called in the same goroutine where the heap is being written to. func (h *heapSorter) flush(ctx context.Context, maxResolvedTs uint64) error { captureAddr := util.CaptureAddrFromCtx(ctx) changefeedID := util.ChangefeedIDFromCtx(ctx) _, tableName := util.TableIDFromCtx(ctx) sorterFlushCountHistogram.WithLabelValues(captureAddr, changefeedID, tableName).Observe(float64(h.heap.Len())) - - isEmptyFlush := h.heap.Len() == 0 - if isEmptyFlush { - return nil - } var ( backEnd backEnd lowerBound uint64 ) + if h.heap.Len() > 0 { + lowerBound = h.heap[0].entry.CRTs + } else { + return nil + } + + // We check if the heap contains only one entry and that entry is a ResolvedEvent. + // As an optimization, when the condition is true, we clear the heap and send an empty flush. + // Sending an empty flush saves CPU and potentially IO. + // Since when a table is mostly idle or near-idle, most flushes would contain one ResolvedEvent alone, + // this optimization will greatly improve performance when (1) total number of table is large, + // and (2) most tables do not have many events. 
+ if h.heap.Len() == 1 && h.heap[0].entry.RawKV.OpType == model.OpTypeResolved { + h.heap.Pop() + } + + isEmptyFlush := h.heap.Len() == 0 + var finishCh chan error if !isEmptyFlush { var err error backEnd, err = pool.alloc(ctx) @@ -84,7 +103,7 @@ func (h *heapSorter) flush(ctx context.Context, maxResolvedTs uint64) error { return errors.Trace(err) } - lowerBound = h.heap[0].entry.CRTs + finishCh = make(chan error, 1) } task := &flushTask{ @@ -93,7 +112,7 @@ func (h *heapSorter) flush(ctx context.Context, maxResolvedTs uint64) error { backend: backEnd, tsLowerBound: lowerBound, maxResolvedTs: maxResolvedTs, - finished: make(chan error, 2), + finished: finishCh, } h.taskCounter++ @@ -113,73 +132,75 @@ func (h *heapSorter) flush(ctx context.Context, maxResolvedTs uint64) error { return nil } } + failpoint.Inject("sorterDebug", func() { + log.Debug("Unified Sorter new flushTask", + zap.String("table", tableNameFromCtx(ctx)), + zap.Int("heap-id", task.heapSorterID), + zap.Uint64("resolvedTs", task.maxResolvedTs)) + }) - log.Debug("Unified Sorter new flushTask", - zap.String("table", tableNameFromCtx(ctx)), - zap.Int("heap-id", task.heapSorterID), - zap.Uint64("resolvedTs", task.maxResolvedTs)) - - go func() { - if isEmptyFlush { - return - } + if !isEmptyFlush { backEndFinal := backEnd - writer, err := backEnd.writer() - if err != nil { - if backEndFinal != nil { - _ = task.dealloc() + err := heapSorterIOPool.Go(ctx, func() { + writer, err := backEnd.writer() + if err != nil { + if backEndFinal != nil { + _ = task.dealloc() + } + task.finished <- errors.Trace(err) + return } - task.finished <- errors.Trace(err) - return - } - defer func() { - // handle errors (or aborts) gracefully to prevent resource leaking (especially FD's) - if writer != nil { - _ = writer.flushAndClose() - } - if backEndFinal != nil { - _ = task.dealloc() - } - close(task.finished) - }() + defer func() { + // handle errors (or aborts) gracefully to prevent resource leaking (especially FD's) + if writer != nil { + _ = writer.flushAndClose() + } + if backEndFinal != nil { + _ = task.dealloc() + } + close(task.finished) + }() - for oldHeap.Len() > 0 { - select { - case <-ctx.Done(): - task.finished <- ctx.Err() - default: + for oldHeap.Len() > 0 { + event := heap.Pop(&oldHeap).(*sortItem).entry + err := writer.writeNext(event) + if err != nil { + task.finished <- errors.Trace(err) + return + } } - event := heap.Pop(&oldHeap).(*sortItem).entry - err := writer.writeNext(event) + dataSize := writer.dataSize() + atomic.StoreInt64(&task.dataSize, int64(dataSize)) + eventCount := writer.writtenCount() + + writer1 := writer + writer = nil + err = writer1.flushAndClose() if err != nil { task.finished <- errors.Trace(err) return } - } - dataSize := writer.dataSize() - atomic.StoreInt64(&task.dataSize, int64(dataSize)) - eventCount := writer.writtenCount() + backEndFinal = nil - writer1 := writer - writer = nil - err = writer1.flushAndClose() + failpoint.Inject("sorterDebug", func() { + log.Debug("Unified Sorter flushTask finished", + zap.Int("heap-id", task.heapSorterID), + zap.String("table", tableNameFromCtx(ctx)), + zap.Uint64("resolvedTs", task.maxResolvedTs), + zap.Uint64("data-size", dataSize), + zap.Int("size", eventCount)) + }) + + task.finished <- nil // DO NOT access `task` beyond this point in this function + }) if err != nil { - task.finished <- errors.Trace(err) - return + close(task.finished) + return errors.Trace(err) } - - backEndFinal = nil - task.finished <- nil // DO NOT access `task` beyond this point in this 
function - log.Debug("Unified Sorter flushTask finished", - zap.Int("heap-id", task.heapSorterID), - zap.String("table", tableNameFromCtx(ctx)), - zap.Uint64("resolvedTs", task.maxResolvedTs), - zap.Uint64("data-size", dataSize), - zap.Int("size", eventCount)) - }() + } select { case <-ctx.Done(): @@ -189,63 +210,78 @@ func (h *heapSorter) flush(ctx context.Context, maxResolvedTs uint64) error { return nil } -func (h *heapSorter) run(ctx context.Context) error { - var ( - maxResolved uint64 - heapSizeBytesEstimate int64 - rateCounter int - ) +var ( + heapSorterPool workerpool.WorkerPool + heapSorterIOPool workerpool.AsyncPool + poolOnce sync.Once +) - rateTicker := time.NewTicker(1 * time.Second) - defer rateTicker.Stop() - - flushTicker := time.NewTicker(5 * time.Second) - defer flushTicker.Stop() - - sorterConfig := config.GetSorterConfig() - for { - select { - case <-ctx.Done(): - return ctx.Err() - case event := <-h.inputCh: - heap.Push(&h.heap, &sortItem{entry: event}) - isResolvedEvent := event.RawKV != nil && event.RawKV.OpType == model.OpTypeResolved - - if isResolvedEvent { - if event.RawKV.CRTs < maxResolved { - log.Panic("ResolvedTs regression, bug?", zap.Uint64("event-resolvedTs", event.RawKV.CRTs), - zap.Uint64("max-resolvedTs", maxResolved)) - } - maxResolved = event.RawKV.CRTs - } +type heapSorterInternalState struct { + maxResolved uint64 + heapSizeBytesEstimate int64 + rateCounter int + sorterConfig *config.SorterConfig + timerMultiplier int +} - if event.RawKV.CRTs < maxResolved { - log.Panic("Bad input to sorter", zap.Uint64("cur-ts", event.RawKV.CRTs), zap.Uint64("maxResolved", maxResolved)) +func (h *heapSorter) init(ctx context.Context, onError func(err error)) { + state := &heapSorterInternalState{ + sorterConfig: config.GetSorterConfig(), + } + + poolHandle := heapSorterPool.RegisterEvent(func(ctx context.Context, eventI interface{}) error { + event := eventI.(*model.PolymorphicEvent) + heap.Push(&h.heap, &sortItem{entry: event}) + isResolvedEvent := event.RawKV != nil && event.RawKV.OpType == model.OpTypeResolved + + if isResolvedEvent { + if event.RawKV.CRTs < state.maxResolved { + log.Panic("ResolvedTs regression, bug?", zap.Uint64("event-resolvedTs", event.RawKV.CRTs), + zap.Uint64("max-resolvedTs", state.maxResolved)) } + state.maxResolved = event.RawKV.CRTs + } - // 5 * 8 is for the 5 fields in PolymorphicEvent - heapSizeBytesEstimate += event.RawKV.ApproximateSize() + 40 - needFlush := heapSizeBytesEstimate >= int64(sorterConfig.ChunkSizeLimit) || - (isResolvedEvent && rateCounter < flushRateLimitPerSecond) + if event.RawKV.CRTs < state.maxResolved { + log.Panic("Bad input to sorter", zap.Uint64("cur-ts", event.RawKV.CRTs), zap.Uint64("maxResolved", state.maxResolved)) + } - if needFlush { - rateCounter++ - err := h.flush(ctx, maxResolved) - if err != nil { - return errors.Trace(err) - } - heapSizeBytesEstimate = 0 + // 5 * 8 is for the 5 fields in PolymorphicEvent + state.heapSizeBytesEstimate += event.RawKV.ApproximateSize() + 40 + needFlush := state.heapSizeBytesEstimate >= int64(state.sorterConfig.ChunkSizeLimit) || + (isResolvedEvent && state.rateCounter < flushRateLimitPerSecond) + + if needFlush { + state.rateCounter++ + err := h.flush(ctx, state.maxResolved) + if err != nil { + return errors.Trace(err) } - case <-flushTicker.C: - if rateCounter < flushRateLimitPerSecond { - err := h.flush(ctx, maxResolved) - if err != nil { - return errors.Trace(err) - } - heapSizeBytesEstimate = 0 + state.heapSizeBytesEstimate = 0 + } + + return nil + 
}).SetTimer(ctx, 1*time.Second, func(ctx context.Context) error { + state.rateCounter = 0 + state.timerMultiplier = (state.timerMultiplier + 1) % 5 + if state.timerMultiplier == 0 && state.rateCounter < flushRateLimitPerSecond { + err := h.flush(ctx, state.maxResolved) + if err != nil { + return errors.Trace(err) } - case <-rateTicker.C: - rateCounter = 0 + state.heapSizeBytesEstimate = 0 } - } + return nil + }).OnExit(onError) + + h.poolHandle = poolHandle + h.internalState = state +} + +func lazyInitWorkerPool() { + poolOnce.Do(func() { + sorterConfig := config.GetSorterConfig() + heapSorterPool = workerpool.NewDefaultWorkerPool(sorterConfig.NumWorkerPoolGoroutine) + heapSorterIOPool = workerpool.NewDefaultAsyncPool(sorterConfig.NumWorkerPoolGoroutine * 2) + }) } diff --git a/cdc/puller/sorter/merger.go b/cdc/puller/sorter/merger.go index 38f56840fce..1952274873b 100644 --- a/cdc/puller/sorter/merger.go +++ b/cdc/puller/sorter/merger.go @@ -21,6 +21,7 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/pingcap/ticdc/cdc/model" "github.com/pingcap/ticdc/pkg/util" @@ -184,11 +185,13 @@ func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out ch return nil } - if sortHeap.Len() > 0 { - log.Debug("Unified Sorter: start merging", - zap.String("table", tableNameFromCtx(ctx)), - zap.Uint64("minResolvedTs", minResolvedTs)) - } + failpoint.Inject("sorterDebug", func() { + if sortHeap.Len() > 0 { + log.Debug("Unified Sorter: start merging", + zap.String("table", tableNameFromCtx(ctx)), + zap.Uint64("minResolvedTs", minResolvedTs)) + } + }) counter := 0 for sortHeap.Len() > 0 { @@ -281,11 +284,13 @@ func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out ch continue } - if counter%10 == 0 { - log.Debug("Merging progress", - zap.String("table", tableNameFromCtx(ctx)), - zap.Int("counter", counter)) - } + failpoint.Inject("sorterDebug", func() { + if counter%10 == 0 { + log.Debug("Merging progress", + zap.String("table", tableNameFromCtx(ctx)), + zap.Int("counter", counter)) + } + }) heap.Push(sortHeap, &sortItem{ entry: event, @@ -297,11 +302,13 @@ func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out ch log.Panic("unified sorter: merging ended prematurely, bug?", zap.Uint64("resolvedTs", minResolvedTs)) } - if counter > 0 { - log.Debug("Unified Sorter: merging ended", - zap.String("table", tableNameFromCtx(ctx)), - zap.Uint64("resolvedTs", minResolvedTs), zap.Int("count", counter)) - } + failpoint.Inject("sorterDebug", func() { + if counter > 0 { + log.Debug("Unified Sorter: merging ended", + zap.String("table", tableNameFromCtx(ctx)), + zap.Uint64("resolvedTs", minResolvedTs), zap.Int("count", counter)) + } + }) err := sendResolvedEvent(minResolvedTs) if err != nil { return errors.Trace(err) diff --git a/cdc/puller/sorter/unified_sorter.go b/cdc/puller/sorter/unified_sorter.go index 10b7adf87da..e9d8385d48d 100644 --- a/cdc/puller/sorter/unified_sorter.go +++ b/cdc/puller/sorter/unified_sorter.go @@ -15,7 +15,9 @@ package sorter import ( "context" + "sync" + "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/pingcap/ticdc/cdc/model" @@ -45,6 +47,7 @@ func NewUnifiedSorter(dir string, tableName string, captureAddr string) *Unified pool = newBackEndPool(dir, captureAddr) } + lazyInitWorkerPool() return &UnifiedSorter{ inputCh: make(chan *model.PolymorphicEvent, 128000), outputCh: make(chan *model.PolymorphicEvent, 128000), @@ 
-54,6 +57,18 @@ func NewUnifiedSorter(dir string, tableName string, captureAddr string) *Unified } } +// UnifiedSorterCleanUp cleans up the files that might have been used. +func UnifiedSorterCleanUp() { + poolMu.Lock() + defer poolMu.Unlock() + + if pool != nil { + log.Info("Unified Sorter: starting cleaning up files") + pool.terminate() + pool = nil + } +} + // Run implements the EventSorter interface func (s *UnifiedSorter) Run(ctx context.Context) error { failpoint.Inject("sorterDebug", func() { @@ -73,20 +88,38 @@ func (s *UnifiedSorter) Run(ctx context.Context) error { // mergerCleanUp will consumer the remaining elements in heapSorterCollectCh to prevent any FD leak. defer mergerCleanUp(heapSorterCollectCh) - heapSorterErrg, subsubctx := errgroup.WithContext(subctx) + heapSorterErrCh := make(chan error, 1) + defer close(heapSorterErrCh) + heapSorterErrOnce := &sync.Once{} heapSorters := make([]*heapSorter, sorterConfig.NumConcurrentWorker) for i := range heapSorters { finalI := i heapSorters[finalI] = newHeapSorter(finalI, heapSorterCollectCh) - heapSorterErrg.Go(func() error { - return printError(heapSorters[finalI].run(subsubctx)) + heapSorters[finalI].init(subctx, func(err error) { + heapSorterErrOnce.Do(func() { + heapSorterErrCh <- err + }) }) } errg.Go(func() error { - // must wait for all writers to exit to close the channel. - defer close(heapSorterCollectCh) - return heapSorterErrg.Wait() + defer func() { + // cancelling the heapSorters from the outside + for _, hs := range heapSorters { + hs.poolHandle.Unregister() + } + // must wait for all writers to exit to close the channel. + close(heapSorterCollectCh) + }() + + for { + select { + case <-subctx.Done(): + return errors.Trace(subctx.Err()) + case err := <-heapSorterErrCh: + return errors.Trace(err) + } + } }) errg.Go(func() error { @@ -106,7 +139,11 @@ func (s *UnifiedSorter) Run(ctx context.Context) error { select { case <-subctx.Done(): return subctx.Err() - case sorter.inputCh <- event: + default: + } + err := sorter.poolHandle.AddEvent(subctx, event) + if err != nil { + return errors.Trace(err) } } continue @@ -118,7 +155,11 @@ func (s *UnifiedSorter) Run(ctx context.Context) error { select { case <-subctx.Done(): return subctx.Err() - case heapSorters[targetID].inputCh <- event: + default: + err := heapSorters[targetID].poolHandle.AddEvent(subctx, event) + if err != nil { + return errors.Trace(err) + } } } } @@ -141,6 +182,22 @@ func (s *UnifiedSorter) Output() <-chan *model.PolymorphicEvent { return s.outputCh } +// RunWorkerPool runs the worker pool used by the heapSorters +// It **must** be running for Unified Sorter to work. 
+func RunWorkerPool(ctx context.Context) error { + lazyInitWorkerPool() + errg, ctx := errgroup.WithContext(ctx) + errg.Go(func() error { + return errors.Trace(heapSorterPool.Run(ctx)) + }) + + errg.Go(func() error { + return errors.Trace(heapSorterIOPool.Run(ctx)) + }) + + return errors.Trace(errg.Wait()) +} + // tableNameFromCtx is used for retrieving the table's name from a context within the Unified Sorter func tableNameFromCtx(ctx context.Context) string { if sorter, ok := ctx.Value(ctxKey{}).(*UnifiedSorter); ok { diff --git a/cdc/puller/sorter_test.go b/cdc/puller/sorter_test.go index 8674ba2c2b5..cf3a2bf0fab 100644 --- a/cdc/puller/sorter_test.go +++ b/cdc/puller/sorter_test.go @@ -55,16 +55,23 @@ func generateMockRawKV(ts uint64) *model.RawKVEntry { func (s *sorterSuite) TestSorterBasic(c *check.C) { defer testleak.AfterTest(c)() + defer sorter2.UnifiedSorterCleanUp() + config.SetSorterConfig(&config.SorterConfig{ - NumConcurrentWorker: 8, - ChunkSizeLimit: 1 * 1024 * 1024 * 1024, - MaxMemoryPressure: 60, - MaxMemoryConsumption: 16 * 1024 * 1024 * 1024, + NumConcurrentWorker: 8, + ChunkSizeLimit: 1 * 1024 * 1024 * 1024, + MaxMemoryPressure: 60, + MaxMemoryConsumption: 16 * 1024 * 1024 * 1024, + NumWorkerPoolGoroutine: 4, }) +<<<<<<< HEAD err := os.MkdirAll("./sorter", 0755) +======= + err := os.MkdirAll("/tmp/sorter", 0o755) +>>>>>>> a3fb52e... sorter: Stabilize Unified Sorter (#1210) c.Assert(err, check.IsNil) - sorter := sorter2.NewUnifiedSorter("./sorter", "test", "0.0.0.0:0") + sorter := sorter2.NewUnifiedSorter("/tmp/sorter", "test", "0.0.0.0:0") ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) defer cancel() @@ -73,16 +80,23 @@ func (s *sorterSuite) TestSorterBasic(c *check.C) { func (s *sorterSuite) TestSorterCancel(c *check.C) { defer testleak.AfterTest(c)() + defer sorter2.UnifiedSorterCleanUp() + config.SetSorterConfig(&config.SorterConfig{ - NumConcurrentWorker: 8, - ChunkSizeLimit: 1 * 1024 * 1024 * 1024, - MaxMemoryPressure: 60, - MaxMemoryConsumption: 0, + NumConcurrentWorker: 8, + ChunkSizeLimit: 1 * 1024 * 1024 * 1024, + MaxMemoryPressure: 60, + MaxMemoryConsumption: 0, + NumWorkerPoolGoroutine: 4, }) +<<<<<<< HEAD err := os.MkdirAll("./sorter", 0755) +======= + err := os.MkdirAll("/tmp/sorter", 0o755) +>>>>>>> a3fb52e... 
sorter: Stabilize Unified Sorter (#1210) c.Assert(err, check.IsNil) - sorter := sorter2.NewUnifiedSorter("./sorter", "test", "0.0.0.0:0") + sorter := sorter2.NewUnifiedSorter("/tmp/sorter", "test", "0.0.0.0:0") ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -93,7 +107,7 @@ func (s *sorterSuite) TestSorterCancel(c *check.C) { close(finishedCh) }() - after := time.After(20 * time.Second) + after := time.After(30 * time.Second) select { case <-after: c.FailNow() @@ -115,6 +129,10 @@ func testSorter(ctx context.Context, c *check.C, sorter EventSorter, count int) return sorter.Run(ctx) }) + errg.Go(func() error { + return sorter2.RunWorkerPool(ctx) + }) + producerProgress := make([]uint64, numProducers) // launch the producers diff --git a/cdc/server.go b/cdc/server.go index 5ab1d25d900..03c969350a2 100644 --- a/cdc/server.go +++ b/cdc/server.go @@ -22,10 +22,10 @@ import ( "sync" "time" - "github.com/pingcap/ticdc/cdc/kv" - "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/pingcap/ticdc/cdc/kv" + "github.com/pingcap/ticdc/cdc/puller/sorter" cerror "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/security" "github.com/pingcap/ticdc/pkg/util" @@ -334,6 +334,10 @@ func (s *Server) run(ctx context.Context) (err error) { return s.campaignOwnerLoop(cctx) }) + wg.Go(func() error { + return sorter.RunWorkerPool(cctx) + }) + wg.Go(func() error { return s.capture.Run(cctx) }) diff --git a/cmd/client_changefeed.go b/cmd/client_changefeed.go index c0147a7a917..2a42530abd8 100644 --- a/cmd/client_changefeed.go +++ b/cmd/client_changefeed.go @@ -376,7 +376,7 @@ func changefeedConfigVariables(command *cobra.Command) { command.PersistentFlags().StringVar(&sinkURI, "sink-uri", "", "sink uri") command.PersistentFlags().StringVar(&configFile, "config", "", "Path of the configuration file") command.PersistentFlags().StringSliceVar(&opts, "opts", nil, "Extra options, in the `key=value` format") - command.PersistentFlags().StringVar(&sortEngine, "sort-engine", "memory", "sort engine used for data sort") + command.PersistentFlags().StringVar(&sortEngine, "sort-engine", "unified", "sort engine used for data sort") command.PersistentFlags().StringVar(&sortDir, "sort-dir", ".", "directory used for file sort") command.PersistentFlags().StringVar(&timezone, "tz", "SYSTEM", "timezone used when checking sink uri (changefeed timezone is determined by cdc server)") command.PersistentFlags().Uint64Var(&cyclicReplicaID, "cyclic-replica-id", 0, "(Expremental) Cyclic replication replica ID of changefeed") diff --git a/cmd/server.go b/cmd/server.go index 2749a3cdf73..3595b841540 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -17,6 +17,8 @@ import ( "context" "time" + "github.com/pingcap/ticdc/cdc/puller/sorter" + "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/ticdc/cdc" @@ -38,10 +40,11 @@ var ( logFile string logLevel string // variables for unified sorter - numConcurrentWorker int - chunkSizeLimit uint64 - maxMemoryPressure int - maxMemoryConsumption uint64 + numConcurrentWorker int + chunkSizeLimit uint64 + maxMemoryPressure int + maxMemoryConsumption uint64 + numWorkerPoolGoroutine int ownerFlushInterval time.Duration processorFlushInterval time.Duration @@ -74,7 +77,8 @@ func init() { serverCmd.Flags().DurationVar(&ownerFlushInterval, "owner-flush-interval", time.Millisecond*200, "owner flushes changefeed status interval") serverCmd.Flags().DurationVar(&processorFlushInterval, "processor-flush-interval", 
time.Millisecond*100, "processor flushes task status interval") - serverCmd.Flags().IntVar(&numConcurrentWorker, "sorter-num-concurrent-worker", 8, "sorter concurrency level") + serverCmd.Flags().IntVar(&numWorkerPoolGoroutine, "sorter-num-workerpool-goroutine", 16, "sorter workerpool size") + serverCmd.Flags().IntVar(&numConcurrentWorker, "sorter-num-concurrent-worker", 4, "sorter concurrency level") serverCmd.Flags().Uint64Var(&chunkSizeLimit, "sorter-chunk-size-limit", 1024*1024*1024, "size of heaps for sorting") serverCmd.Flags().IntVar(&maxMemoryPressure, "sorter-max-memory-percentage", 90, "system memory usage threshold for forcing in-disk sort") serverCmd.Flags().Uint64Var(&maxMemoryConsumption, "sorter-max-memory-consumption", 16*1024*1024*1024, "maximum memory consumption of in-memory sort") @@ -94,10 +98,11 @@ func runEServer(cmd *cobra.Command, args []string) error { } config.SetSorterConfig(&config.SorterConfig{ - NumConcurrentWorker: numConcurrentWorker, - ChunkSizeLimit: chunkSizeLimit, - MaxMemoryPressure: maxMemoryPressure, - MaxMemoryConsumption: maxMemoryConsumption, + NumConcurrentWorker: numConcurrentWorker, + ChunkSizeLimit: chunkSizeLimit, + MaxMemoryPressure: maxMemoryPressure, + MaxMemoryConsumption: maxMemoryConsumption, + NumWorkerPoolGoroutine: numWorkerPoolGoroutine, }) version.LogVersionInfo() @@ -121,6 +126,7 @@ func runEServer(cmd *cobra.Command, args []string) error { return errors.Annotate(err, "run server") } server.Close() + sorter.UnifiedSorterCleanUp() log.Info("cdc server exits successfully") return nil diff --git a/errors.toml b/errors.toml index 5a0251edf28..11ed12bd3c1 100755 --- a/errors.toml +++ b/errors.toml @@ -16,6 +16,11 @@ error = ''' Async broadcasts not supported ''' +["CDC:ErrAsyncPoolExited"] +error = ''' +asyncPool has exited. Report a bug if seen externally. 
+''' + ["CDC:ErrAvroEncodeFailed"] error = ''' encode to avro native data @@ -646,6 +651,11 @@ error = ''' url format is invalid ''' +["CDC:ErrUnifiedSorterBackendTerminating"] +error = ''' +unified sorter backend is terminating +''' + ["CDC:ErrUnknownKVEventType"] error = ''' unknown kv event type: %v, entry: %v @@ -676,6 +686,16 @@ error = ''' waiting processor to handle the operation finished timeout ''' +["CDC:ErrWorkerPoolEmptyTask"] +error = ''' +workerpool received an empty task, please report a bug +''' + +["CDC:ErrWorkerPoolHandleCancelled"] +error = ''' +workerpool handle is cancelled +''' + ["CDC:ErrWriteTsConflict"] error = ''' write ts conflict diff --git a/pkg/config/sorter.go b/pkg/config/sorter.go index ef76c980cd5..a1b81b4b375 100644 --- a/pkg/config/sorter.go +++ b/pkg/config/sorter.go @@ -25,6 +25,8 @@ type SorterConfig struct { MaxMemoryPressure int `toml:"max-memory-pressure" json:"max-memory-pressure"` // the maximum memory consumption allowed for in-memory sorting MaxMemoryConsumption uint64 `toml:"max-memory-consumption" json:"max-memory-consumption"` + // the size of workerpool + NumWorkerPoolGoroutine int `toml:"num-workerpool-goroutine" json:"num-workerpool-goroutine"` } var ( diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 151fc0c5c69..65284717d3d 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -175,4 +175,26 @@ var ( ErrInvalidAdminJobType = errors.Normalize("invalid admin job type: %d", errors.RFCCodeText("CDC:ErrInvalidAdminJobType")) ErrOwnerEtcdWatch = errors.Normalize("etcd watch returns error", errors.RFCCodeText("CDC:ErrOwnerEtcdWatch")) ErrOwnerCampaignKeyDeleted = errors.Normalize("owner campaign key deleted", errors.RFCCodeText("CDC:ErrOwnerCampaignKeyDeleted")) +<<<<<<< HEAD +======= + + // EtcdWorker related errors. Internal use only. + // ErrEtcdTryAgain is used by a PatchFunc to force a transaction abort. + ErrEtcdTryAgain = errors.Normalize("the etcd txn should be aborted and retried immediately", errors.RFCCodeText("CDC:ErrEtcdTryAgain")) + // ErrEtcdIgnore is used by a PatchFunc to signal that the reactor no longer wishes to update Etcd. + ErrEtcdIgnore = errors.Normalize("this patch should be excluded from the current etcd txn", errors.RFCCodeText("CDC:ErrEtcdIgnore")) + // ErrReactorFinished is used by reactor to signal a **normal** exit. + ErrReactorFinished = errors.Normalize("the reactor has done its job and should no longer be executed", errors.RFCCodeText("CDC:ErrReactorFinished")) + + // pipeline errors + ErrSendToClosedPipeline = errors.Normalize("pipeline is closed, cannot send message", errors.RFCCodeText("CDC:ErrSendToClosedPipeline")) + + // workerpool errors + ErrWorkerPoolHandleCancelled = errors.Normalize("workerpool handle is cancelled", errors.RFCCodeText("CDC:ErrWorkerPoolHandleCancelled")) + ErrWorkerPoolEmptyTask = errors.Normalize("workerpool received an empty task, please report a bug", errors.RFCCodeText("CDC:ErrWorkerPoolEmptyTask")) + ErrAsyncPoolExited = errors.Normalize("asyncPool has exited. Report a bug if seen externally.", errors.RFCCodeText("CDC:ErrAsyncPoolExited")) + + // unified sorter errors + ErrUnifiedSorterBackendTerminating = errors.Normalize("unified sorter backend is terminating", errors.RFCCodeText("CDC:ErrUnifiedSorterBackendTerminating")) +>>>>>>> a3fb52e... 
sorter: Stabilize Unified Sorter (#1210) ) diff --git a/pkg/workerpool/async_pool.go b/pkg/workerpool/async_pool.go new file mode 100644 index 00000000000..90e717b8cfb --- /dev/null +++ b/pkg/workerpool/async_pool.go @@ -0,0 +1,28 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package workerpool + +import "context" + +// AsyncPool provides a simple Goroutine pool, where the order in which jobs are run is non-deterministic. +type AsyncPool interface { + // Go mimics the semantics of the "go" keyword, with the only difference being the `ctx` parameter, + // which is used to cancel **the submission of task**. + // **All** tasks successfully submitted will be run eventually, as long as Run are called infinitely many times. + // Go might block when the AsyncPool is not running. + Go(ctx context.Context, f func()) error + + // Run runs the AsyncPool. + Run(ctx context.Context) error +} diff --git a/pkg/workerpool/async_pool_impl.go b/pkg/workerpool/async_pool_impl.go new file mode 100644 index 00000000000..81ad5b2067d --- /dev/null +++ b/pkg/workerpool/async_pool_impl.go @@ -0,0 +1,179 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package workerpool + +import ( + "context" + "sync" + "sync/atomic" + "time" + + "github.com/cenkalti/backoff" + "github.com/pingcap/errors" + cerrors "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/retry" + "golang.org/x/sync/errgroup" +) + +type defaultAsyncPoolImpl struct { + workers []*asyncWorker + nextWorkerID int32 + isRunning int32 + runningLock sync.RWMutex +} + +// NewDefaultAsyncPool creates a new AsyncPool that uses the default implementation +func NewDefaultAsyncPool(numWorkers int) AsyncPool { + return newDefaultAsyncPoolImpl(numWorkers) +} + +func newDefaultAsyncPoolImpl(numWorkers int) *defaultAsyncPoolImpl { + workers := make([]*asyncWorker, numWorkers) + + return &defaultAsyncPoolImpl{ + workers: workers, + } +} + +func (p *defaultAsyncPoolImpl) Go(ctx context.Context, f func()) error { + if p.doGo(ctx, f) == nil { + return nil + } + return errors.Trace(retry.Run(time.Millisecond*1, 25, func() error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + err := errors.Trace(p.doGo(ctx, f)) + if err != nil && cerrors.ErrAsyncPoolExited.NotEqual(errors.Cause(err)) { + return backoff.Permanent(err) + } + return err + })) +} + +func (p *defaultAsyncPoolImpl) doGo(ctx context.Context, f func()) error { + p.runningLock.RLock() + defer p.runningLock.RUnlock() + + if atomic.LoadInt32(&p.isRunning) == 0 { + return cerrors.ErrAsyncPoolExited.GenWithStackByArgs() + } + + task := &asyncTask{f: f} + worker := p.workers[int(atomic.AddInt32(&p.nextWorkerID, 1))%len(p.workers)] + + worker.chLock.RLock() + defer worker.chLock.RUnlock() + + if atomic.LoadInt32(&worker.isClosed) == 1 { + return cerrors.ErrAsyncPoolExited.GenWithStackByArgs() + } + + select { + case <-ctx.Done(): + return errors.Trace(ctx.Err()) + case worker.inputCh <- task: + } + + return nil +} + +func (p *defaultAsyncPoolImpl) Run(ctx context.Context) error { + p.prepare() + errg := errgroup.Group{} + + p.runningLock.Lock() + atomic.StoreInt32(&p.isRunning, 1) + p.runningLock.Unlock() + + defer func() { + p.runningLock.Lock() + atomic.StoreInt32(&p.isRunning, 0) + p.runningLock.Unlock() + }() + + errCh := make(chan error, len(p.workers)) + defer close(errCh) + + for _, worker := range p.workers { + workerFinal := worker + errg.Go(func() error { + err := workerFinal.run() + if err != nil && cerrors.ErrAsyncPoolExited.NotEqual(errors.Cause(err)) { + errCh <- err + } + return nil + }) + } + + errg.Go(func() error { + var err error + select { + case <-ctx.Done(): + err = ctx.Err() + case err = <-errCh: + } + + for _, worker := range p.workers { + worker.close() + } + + return err + }) + + return errors.Trace(errg.Wait()) +} + +func (p *defaultAsyncPoolImpl) prepare() { + for i := range p.workers { + p.workers[i] = newAsyncWorker() + } +} + +type asyncTask struct { + f func() +} + +type asyncWorker struct { + inputCh chan *asyncTask + isClosed int32 + chLock sync.RWMutex +} + +func newAsyncWorker() *asyncWorker { + return &asyncWorker{inputCh: make(chan *asyncTask, 12800)} +} + +func (w *asyncWorker) run() error { + for { + task := <-w.inputCh + if task == nil { + return cerrors.ErrAsyncPoolExited.GenWithStackByArgs() + } + task.f() + } +} + +func (w *asyncWorker) close() { + if atomic.SwapInt32(&w.isClosed, 1) == 1 { + return + } + + w.chLock.Lock() + defer w.chLock.Unlock() + + close(w.inputCh) +} diff --git a/pkg/workerpool/async_pool_test.go b/pkg/workerpool/async_pool_test.go new file mode 100644 index 00000000000..d1e2107c607 --- /dev/null +++ b/pkg/workerpool/async_pool_test.go @@ 
-0,0 +1,139 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package workerpool + +import ( + "context" + "math/rand" + "sync" + "sync/atomic" + "time" + + "github.com/pingcap/log" + + "github.com/pingcap/check" + "github.com/pingcap/errors" + "github.com/pingcap/ticdc/pkg/util/testleak" + "golang.org/x/sync/errgroup" +) + +type asyncPoolSuite struct{} + +var _ = check.Suite(&asyncPoolSuite{}) + +func (s *asyncPoolSuite) TestBasic(c *check.C) { + defer testleak.AfterTest(c)() + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + errg, ctx := errgroup.WithContext(ctx) + + pool := newDefaultAsyncPoolImpl(4) + errg.Go(func() error { + return pool.Run(ctx) + }) + + var sum int32 + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + finalI := i + err := pool.Go(ctx, func() { + time.Sleep(time.Millisecond * time.Duration(rand.Int()%100)) + atomic.AddInt32(&sum, int32(finalI+1)) + wg.Done() + }) + c.Assert(err, check.IsNil) + } + + wg.Wait() + c.Assert(sum, check.Equals, int32(5050)) + + cancel() + err := errg.Wait() + c.Assert(err, check.ErrorMatches, "context canceled") +} + +func (s *asyncPoolSuite) TestEventuallyRun(c *check.C) { + defer testleak.AfterTest(c)() + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + errg, ctx := errgroup.WithContext(ctx) + loopCtx, cancelLoop := context.WithCancel(ctx) + defer cancelLoop() + + pool := newDefaultAsyncPoolImpl(4) + errg.Go(func() error { + defer cancelLoop() + for i := 0; i < 10; i++ { + log.Info("running pool") + err := runForDuration(ctx, time.Millisecond*500, func(ctx context.Context) error { + return pool.Run(ctx) + }) + if err != nil { + return errors.Trace(err) + } + } + return nil + }) + + var sum int32 + var sumExpected int32 +loop: + for i := 0; ; i++ { + select { + case <-loopCtx.Done(): + break loop + default: + } + finalI := i + err := pool.Go(loopCtx, func() { + if rand.Int()%128 == 0 { + time.Sleep(2 * time.Millisecond) + } + atomic.AddInt32(&sum, int32(finalI+1)) + }) + if err != nil { + c.Assert(err, check.ErrorMatches, "context canceled") + } else { + sumExpected += int32(i + 1) + } + } + + cancel() + err := errg.Wait() + c.Assert(err, check.IsNil) + c.Assert(sum, check.Equals, sumExpected) +} + +func runForDuration(ctx context.Context, duration time.Duration, f func(ctx context.Context) error) error { + timedCtx, cancel := context.WithTimeout(ctx, duration) + defer cancel() + + errCh := make(chan error) + go func() { + errCh <- f(timedCtx) + }() + + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-errCh: + if errors.Cause(err) == context.DeadlineExceeded { + return nil + } + return errors.Trace(err) + } +} diff --git a/pkg/workerpool/context.go b/pkg/workerpool/context.go new file mode 100644 index 00000000000..59acf448bb2 --- /dev/null +++ b/pkg/workerpool/context.go @@ -0,0 +1,114 @@ +// Copyright 2020 PingCAP, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package workerpool + +import ( + "context" + "sync" + "sync/atomic" + "time" +) + +type mergedContext struct { + doneChMu sync.Mutex + hasDoneCh int32 + doneCh chan struct{} + isCancelled int32 + cancelCh chan struct{} + + ctx1 context.Context + ctx2 context.Context +} + +// MergeContexts merges two contexts. +// In case of conflicting errors or values, ctx1 takes priority. +func MergeContexts(ctx1, ctx2 context.Context) (context.Context, context.CancelFunc) { + cancelCh := make(chan struct{}) + ret := &mergedContext{ + ctx1: ctx1, + ctx2: ctx2, + cancelCh: cancelCh, + } + + cancelFunc := func() { + if atomic.SwapInt32(&ret.isCancelled, 1) == 0 { + close(cancelCh) + } + } + return ret, cancelFunc +} + +func (m *mergedContext) Deadline() (deadline time.Time, ok bool) { + ddl1, ok1 := m.ctx1.Deadline() + ddl2, ok2 := m.ctx2.Deadline() + + if ok1 && ok2 { + if ddl2.After(ddl1) { + return ddl1, true + } + return ddl2, true + } + + if ok1 { + return ddl1, true + } + + return ddl2, ok2 +} + +func (m *mergedContext) Done() <-chan struct{} { + // Using an atomic operation to check if the channel has already been created. + // The makes sure that all calls to Done, except for the first few, are block-free. + if atomic.LoadInt32(&m.hasDoneCh) == 1 { + // fast path + return m.doneCh + } + + // slow path using mutex + m.doneChMu.Lock() + defer m.doneChMu.Unlock() + + // Several goroutines could have attempted to lock the mutex, + // only the first one should create the channel. + // So we check if m.hasDoneCh is still 0. + if atomic.LoadInt32(&m.hasDoneCh) == 0 { + m.doneCh = make(chan struct{}) + atomic.StoreInt32(&m.hasDoneCh, 1) + + go func() { + select { + case <-m.ctx1.Done(): + case <-m.ctx2.Done(): + case <-m.cancelCh: + } + close(m.doneCh) + }() + } + + return m.doneCh +} + +func (m *mergedContext) Err() error { + if m.ctx1.Err() != nil { + return m.ctx1.Err() + } + return m.ctx2.Err() +} + +func (m *mergedContext) Value(key interface{}) interface{} { + if v1 := m.ctx1.Value(key); v1 != nil { + return v1 + } + return m.ctx2.Value(key) +} diff --git a/pkg/workerpool/context_test.go b/pkg/workerpool/context_test.go new file mode 100644 index 00000000000..ce1bda6f099 --- /dev/null +++ b/pkg/workerpool/context_test.go @@ -0,0 +1,237 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package workerpool + +import ( + "context" + "errors" + "sync" + "time" + + "github.com/pingcap/check" + "github.com/pingcap/ticdc/pkg/util/testleak" + "golang.org/x/sync/errgroup" +) + +type contextSuite struct{} + +var _ = check.Suite(&contextSuite{}) + +// TestDoneCase1 tests cancelling the first context in the merge. +func (s *contextSuite) TestDoneCase1(c *check.C) { + defer testleak.AfterTest(c)() + ctx, cancel0 := context.WithTimeout(context.Background(), time.Second*3) + defer cancel0() + + ctx1, cancel1 := context.WithCancel(ctx) + defer cancel1() + ctx2, cancel2 := context.WithCancel(ctx) + defer cancel2() + + merged, cancelM := MergeContexts(ctx1, ctx2) + defer cancelM() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + <-merged.Done() + wg.Done() + }() + + cancel1() + wg.Wait() +} + +// TestDoneCase2 tests cancelling the second context in the merge. +func (s *contextSuite) TestDoneCase2(c *check.C) { + defer testleak.AfterTest(c)() + ctx, cancel0 := context.WithTimeout(context.Background(), time.Second*3) + defer cancel0() + + ctx1, cancel1 := context.WithCancel(ctx) + defer cancel1() + ctx2, cancel2 := context.WithCancel(ctx) + defer cancel2() + + merged, cancelM := MergeContexts(ctx1, ctx2) + defer cancelM() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + <-merged.Done() + wg.Done() + }() + + cancel2() + wg.Wait() +} + +// TestDoneCaseCancel tests cancelling the merged context. +func (s *contextSuite) TestDoneCaseCancel(c *check.C) { + defer testleak.AfterTest(c)() + ctx, cancel0 := context.WithTimeout(context.Background(), time.Second*3) + defer cancel0() + + ctx1, cancel1 := context.WithCancel(ctx) + defer cancel1() + ctx2, cancel2 := context.WithCancel(ctx) + defer cancel2() + + merged, cancelM := MergeContexts(ctx1, ctx2) + defer cancelM() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + <-merged.Done() + wg.Done() + }() + + cancelM() + wg.Wait() +} + +func (s *contextSuite) TestDoneContention(c *check.C) { + defer testleak.AfterTest(c)() + ctx, cancel0 := context.WithTimeout(context.Background(), time.Second*10) + defer cancel0() + + errg, ctx := errgroup.WithContext(ctx) + + ctx1, cancel1 := context.WithCancel(ctx) + defer cancel1() + ctx2, cancel2 := context.WithCancel(ctx) + defer cancel2() + + merged, cancelM := MergeContexts(ctx1, ctx2) + defer cancelM() + + for i := 0; i < 32; i++ { + errg.Go(func() error { + <-merged.Done() + c.Assert(ctx.Err(), check.IsNil) + return nil + }) + } + + time.Sleep(time.Second * 1) + cancel1() + + err := errg.Wait() + c.Assert(err, check.IsNil) +} + +type mockContext struct { + deadline time.Time + err error + values map[string]string +} + +func (m *mockContext) Deadline() (deadline time.Time, ok bool) { + if m.deadline.IsZero() { + return time.Time{}, false + } + return m.deadline, true +} + +func (m *mockContext) Done() <-chan struct{} { + panic("not used") +} + +func (m *mockContext) Err() error { + return m.err +} + +func (m *mockContext) Value(key interface{}) interface{} { + if value, ok := m.values[key.(string)]; ok { + return value + } + return nil +} + +func (s *contextSuite) TestErr(c *check.C) { + defer testleak.AfterTest(c)() + + ctx1 := &mockContext{} + ctx2 := &mockContext{} + + mContext, cancelM := MergeContexts(ctx1, ctx2) + defer cancelM() + + c.Assert(mContext.Err(), check.IsNil) + + ctx1.err = errors.New("error1") + c.Assert(mContext.Err(), check.ErrorMatches, "error1") + + ctx2.err = errors.New("error2") + c.Assert(mContext.Err(), check.ErrorMatches, "error1") + + ctx1.err = nil + 
c.Assert(mContext.Err(), check.ErrorMatches, "error2") +} + +func (s *contextSuite) TestDeadline(c *check.C) { + defer testleak.AfterTest(c)() + + ctx1 := &mockContext{} + ctx2 := &mockContext{} + + mContext, cancelM := MergeContexts(ctx1, ctx2) + defer cancelM() + + _, ok := mContext.Deadline() + c.Assert(ok, check.IsFalse) + + startTime := time.Now() + + ctx1.deadline = startTime.Add(time.Minute * 1) + ddl, ok := mContext.Deadline() + c.Assert(ok, check.IsTrue) + c.Assert(ddl, check.Equals, startTime.Add(time.Minute*1)) + + ctx2.deadline = startTime.Add(time.Minute * 2) + ddl, ok = mContext.Deadline() + c.Assert(ok, check.IsTrue) + c.Assert(ddl, check.Equals, startTime.Add(time.Minute*1)) + + ctx1.deadline = startTime.Add(time.Minute * 3) + ddl, ok = mContext.Deadline() + c.Assert(ok, check.IsTrue) + c.Assert(ddl, check.Equals, startTime.Add(time.Minute*2)) + + ctx1.deadline = time.Time{} + ddl, ok = mContext.Deadline() + c.Assert(ok, check.IsTrue) + c.Assert(ddl, check.Equals, startTime.Add(time.Minute*2)) +} + +func (s *contextSuite) TestValues(c *check.C) { + defer testleak.AfterTest(c)() + + ctx1 := &mockContext{ + values: map[string]string{"a": "1a", "c": "1c"}, + } + ctx2 := &mockContext{ + values: map[string]string{"b": "2b", "c": "2c"}, + } + + mContext, cancelM := MergeContexts(ctx1, ctx2) + defer cancelM() + + c.Assert(mContext.Value("a"), check.Equals, "1a") + c.Assert(mContext.Value("b"), check.Equals, "2b") + c.Assert(mContext.Value("c"), check.Equals, "1c") + c.Assert(mContext.Value("d"), check.IsNil) +} diff --git a/pkg/workerpool/hash.go b/pkg/workerpool/hash.go new file mode 100644 index 00000000000..9af534ca8a5 --- /dev/null +++ b/pkg/workerpool/hash.go @@ -0,0 +1,33 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package workerpool + +// Hasher is actually a "placement driver" that balances the workload. +// Non-trivial Hashers will be useful if and when we implement dynamic resizing of WorkerPool. +type Hasher interface { + Hash(object Hashable) int64 +} + +// Hashable is an object that can be hashed. +type Hashable interface { + HashCode() int64 +} + +type defaultHasher struct { +} + +// Hash returns the hash value. +func (m *defaultHasher) Hash(object Hashable) int64 { + return object.HashCode() +} diff --git a/pkg/workerpool/pool.go b/pkg/workerpool/pool.go new file mode 100644 index 00000000000..c0b54051d8a --- /dev/null +++ b/pkg/workerpool/pool.go @@ -0,0 +1,59 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
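The Hasher just added is trivial (it returns the object's own HashCode), but its "placement driver" comment is the key point: the hash decides which worker an object is pinned to. A same-package sketch of that routing follows; pickWorker is a hypothetical helper, the real dispatch happens inside RegisterEvent in pool_impl.go further down.

```go
// pickWorker illustrates how a Hashable object could be routed to one of
// numWorkers workers. Because the hash of a given object is stable, the same
// object always lands on the same worker goroutine, which is what lets an
// EventHandle process its events without data races.
func pickWorker(h Hasher, obj Hashable, numWorkers int) int {
	return int(h.Hash(obj) % int64(numWorkers))
}
```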
+
+package workerpool
+
+import (
+	"context"
+	"time"
+)
+
+// WorkerPool runs a number of Goroutines that process the submitted events.
+// Each EventHandle is bound to a fixed worker thread to prevent data races.
+type WorkerPool interface {
+	// RegisterEvent returns a handle that can be used to trigger the execution of `f`.
+	// `f` will be called with a context that is a child of the context with which Run is called.
+	// TODO more reasonable usage of contexts, potentially involving context merging.
+	RegisterEvent(f func(ctx context.Context, event interface{}) error) EventHandle
+
+	// Run runs the WorkerPool.
+	// Internally several Goroutines are spawned.
+	Run(ctx context.Context) error
+}
+
+// EventHandle is a handle for a registered event.
+// Since events are executed asynchronously, errors should be collected from ErrCh().
+// EventHandles SHOULD NOT be assumed to be thread safe.
+type EventHandle interface {
+	// AddEvent adds an `event` object to the internal queue, so that the `f` used to register the handle can be called.
+	// Note: events are always processed in the order they are added.
+	// Unregistering the EventHandle MAY CAUSE EVENT LOSSES. However, if an event is lost, every event added after it is guaranteed to be lost as well.
+	AddEvent(ctx context.Context, event interface{}) error
+
+	// SetTimer provides a function that is called periodically, as long as the EventHandle has not been unregistered.
+	// The current implementation uses a ticker with the constant interval workerPoolDefaultClockSourceInterval as its base clock source.
+	// DO NOT set an interval less than workerPoolDefaultClockSourceInterval.
+	SetTimer(ctx context.Context, interval time.Duration, f func(ctx context.Context) error) EventHandle
+
+	// Unregister removes the EventHandle from the WorkerPool.
+	// Note: Unregister WILL block until the operation has taken effect, i.e. the handler will not be executed after
+	// Unregister returns. Unregister WILL NOT wait for pending events to complete, which means the last few events can be lost.
+	Unregister()
+
+	// ErrCh returns a channel that outputs the first non-nil result of events submitted to this EventHandle.
+	// Note that a non-nil result of an event cancels the EventHandle, so there is at most one error.
+	ErrCh() <-chan error
+
+	// OnExit provides a function that will be called when the handle exits abnormally.
+	OnExit(f func(err error)) EventHandle
+}
diff --git a/pkg/workerpool/pool_impl.go b/pkg/workerpool/pool_impl.go
new file mode 100644
index 00000000000..0ebc522e54e
--- /dev/null
+++ b/pkg/workerpool/pool_impl.go
@@ -0,0 +1,373 @@
+// Copyright 2020 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
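Before the pool_impl.go implementation that follows, a minimal end-to-end sketch of how the interface above is meant to be used, modeled on the tests added later in this patch (pool_test.go). The import paths are the ones that appear in this diff; everything else is illustrative only.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/pingcap/ticdc/pkg/workerpool"
	"golang.org/x/sync/errgroup"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	pool := workerpool.NewDefaultWorkerPool(4)
	errg, ctx := errgroup.WithContext(ctx)
	errg.Go(func() error {
		// Run spawns one goroutine per worker and blocks until ctx is cancelled.
		return pool.Run(ctx)
	})

	handle := pool.RegisterEvent(func(ctx context.Context, event interface{}) error {
		// Events added to the same handle are processed in order on one worker goroutine.
		fmt.Println("event:", event)
		return nil
	}).OnExit(func(err error) {
		// Called when the handle exits abnormally, e.g. the handler returned an error.
		fmt.Println("handle exited:", err)
	})

	for i := 0; i < 3; i++ {
		if err := handle.AddEvent(ctx, i); err != nil {
			// AddEvent fails once the handle is unregistered or ctx is done.
			break
		}
	}

	handle.Unregister() // blocks until the handler is guaranteed not to run again
	cancel()
	_ = errg.Wait() // the workers return a context-cancellation error here
}
```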
+ +package workerpool + +import ( + "context" + "sync" + "sync/atomic" + "time" + + "github.com/pingcap/log" + + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + cerrors "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/notify" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" +) + +const ( + workerPoolDefaultClockSourceInterval = time.Millisecond * 100 +) + +type defaultPoolImpl struct { + // assume the hasher to be the trivial hasher for now + hasher Hasher + // do not resize this slice after creating the pool + workers []*worker + // used to generate handler IDs, must be accessed atomically + nextHandlerID int64 +} + +// NewDefaultWorkerPool creates a new WorkerPool that uses the default implementation +func NewDefaultWorkerPool(numWorkers int) WorkerPool { + return newDefaultPoolImpl(&defaultHasher{}, numWorkers) +} + +func newDefaultPoolImpl(hasher Hasher, numWorkers int) *defaultPoolImpl { + workers := make([]*worker, numWorkers) + for i := 0; i < numWorkers; i++ { + workers[i] = newWorker() + } + return &defaultPoolImpl{ + hasher: hasher, + workers: workers, + } +} + +func (p *defaultPoolImpl) Run(ctx context.Context) error { + errg, ctx := errgroup.WithContext(ctx) + + for _, worker := range p.workers { + workerFinal := worker + errg.Go(func() error { + err := workerFinal.run(ctx) + if err != nil { + return errors.Trace(err) + } + return nil + }) + } + + return errg.Wait() +} + +func (p *defaultPoolImpl) RegisterEvent(f func(ctx context.Context, event interface{}) error) EventHandle { + handler := &defaultEventHandle{ + f: f, + errCh: make(chan error, 1), + id: atomic.AddInt64(&p.nextHandlerID, 1) - 1, + } + + workerID := p.hasher.Hash(handler) % int64(len(p.workers)) + p.workers[workerID].addHandle(handler) + handler.worker = p.workers[workerID] + + return handler +} + +type defaultEventHandle struct { + // the function to be run each time the event is triggered + f func(ctx context.Context, event interface{}) error + // whether this handle has been cancelled, must be accessed atomically + isCancelled int32 + // channel for the error returned by f + errCh chan error + // the worker that the handle is associated with + worker *worker + // identifier for this handle. No significant usage for now. + // Might be used to support consistent hashing in the future, + // so that the pool can be resized efficiently. + id int64 + + // whether there is a valid timer handler, must be accessed atomically + hasTimer int32 + // the time when timer was triggered the last time + lastTimer time.Time + // minimum interval between two timer calls + timerInterval time.Duration + // the handler for the timer + timerHandler func(ctx context.Context) error + + // whether this is a valid errorHandler, must be accessed atomically + hasErrorHandler int32 + // the error handler, called when the handle meets an error (which is returned by f) + errorHandler func(err error) +} + +func (h *defaultEventHandle) AddEvent(ctx context.Context, event interface{}) error { + if atomic.LoadInt32(&h.isCancelled) == 1 { + return cerrors.ErrWorkerPoolHandleCancelled.GenWithStackByArgs() + } + + failpoint.Inject("addEventDelayPoint", func() {}) + + task := &task{ + handle: h, + f: func(ctx1 context.Context) error { + // Here we merge the context passed down from WorkerPool.Run, + // with the context supplied by AddEvent, + // because we want operations to be cancellable by both contexts. + mContext, cancel := MergeContexts(ctx, ctx1) + // this cancels the merged context only. 
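+			// It does not touch ctx or ctx1; it only lets the goroutine behind the
+			// merged Done() channel (if one was started) exit after h.f returns.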
+ defer cancel() + return h.f(mContext, event) + }, + } + + select { + case <-ctx.Done(): + return errors.Trace(ctx.Err()) + case h.worker.taskCh <- task: + } + return nil +} + +func (h *defaultEventHandle) SetTimer(ctx context.Context, interval time.Duration, f func(ctx context.Context) error) EventHandle { + // mark the timer handler function as invalid + atomic.StoreInt32(&h.hasTimer, 0) + // wait for `hasTimer` to take effect, otherwise we might have a data race, if there was a previous handler. + h.worker.synchronize() + + h.timerInterval = interval + h.timerHandler = func(ctx1 context.Context) error { + mContext, cancel := MergeContexts(ctx, ctx1) + defer cancel() + return f(mContext) + } + // mark the timer handler function as valid + atomic.StoreInt32(&h.hasTimer, 1) + + return h +} + +func (h *defaultEventHandle) Unregister() { + if !atomic.CompareAndSwapInt32(&h.isCancelled, 0, 1) { + // already cancelled + return + } + + failpoint.Inject("unregisterDelayPoint", func() {}) + + // call synchronize so that all function executions related to this handle will be + // linearized BEFORE Unregister. + h.worker.synchronize() + + h.doCancel(cerrors.ErrWorkerPoolHandleCancelled.GenWithStackByArgs()) +} + +// callers of doCancel need to check h.isCancelled first. +// DO NOT call doCancel multiple times on the same handle. +func (h *defaultEventHandle) doCancel(err error) { + h.worker.removeHandle(h) + + if atomic.LoadInt32(&h.hasErrorHandler) == 1 { + h.errorHandler(err) + } + + h.errCh <- err + close(h.errCh) +} + +func (h *defaultEventHandle) ErrCh() <-chan error { + return h.errCh +} + +func (h *defaultEventHandle) OnExit(f func(err error)) EventHandle { + atomic.StoreInt32(&h.hasErrorHandler, 0) + h.worker.synchronize() + h.errorHandler = f + atomic.StoreInt32(&h.hasErrorHandler, 1) + return h +} + +func (h *defaultEventHandle) HashCode() int64 { + return h.id +} + +func (h *defaultEventHandle) cancelWithErr(err error) { + if !atomic.CompareAndSwapInt32(&h.isCancelled, 0, 1) { + // already cancelled + return + } + + h.doCancel(err) +} + +func (h *defaultEventHandle) durationSinceLastTimer() time.Duration { + return time.Since(h.lastTimer) +} + +func (h *defaultEventHandle) doTimer(ctx context.Context) error { + if atomic.LoadInt32(&h.hasTimer) == 0 { + return nil + } + + if h.durationSinceLastTimer() < h.timerInterval { + return nil + } + + err := h.timerHandler(ctx) + if err != nil { + return errors.Trace(err) + } + + h.lastTimer = time.Now() + + return nil +} + +type task struct { + handle *defaultEventHandle + f func(ctx context.Context) error +} + +type worker struct { + taskCh chan *task + handles map[*defaultEventHandle]struct{} + handleRWLock sync.RWMutex + // A message is passed to handleCancelCh when we need to wait for the + // current execution of handler to finish. Should be BLOCKING. + handleCancelCh chan struct{} + // must be accessed atomically + isRunning int32 + // notifies exits of run() + stopNotifier notify.Notifier +} + +func newWorker() *worker { + return &worker{ + taskCh: make(chan *task, 128000), + handles: make(map[*defaultEventHandle]struct{}), + handleCancelCh: make(chan struct{}), // this channel must be unbuffered, i.e. 
blocking + } +} + +func (w *worker) run(ctx context.Context) error { + ticker := time.NewTicker(workerPoolDefaultClockSourceInterval) + atomic.StoreInt32(&w.isRunning, 1) + defer func() { + ticker.Stop() + atomic.StoreInt32(&w.isRunning, 0) + w.stopNotifier.Notify() + }() + + for { + select { + case <-ctx.Done(): + return errors.Trace(ctx.Err()) + case task := <-w.taskCh: + if task == nil { + return cerrors.ErrWorkerPoolEmptyTask.GenWithStackByArgs() + } + if atomic.LoadInt32(&task.handle.isCancelled) == 1 { + // ignored cancelled handle + continue + } + + err := task.f(ctx) + if err != nil { + task.handle.cancelWithErr(err) + } + case <-ticker.C: + var handleErrs []struct { + h *defaultEventHandle + e error + } + + w.handleRWLock.RLock() + for handle := range w.handles { + if atomic.LoadInt32(&handle.isCancelled) == 1 { + // ignored cancelled handle + continue + } + err := handle.doTimer(ctx) + if err != nil { + handleErrs = append(handleErrs, struct { + h *defaultEventHandle + e error + }{handle, err}) + } + } + w.handleRWLock.RUnlock() + + // cancelWithErr must be called out side of the loop above, + // to avoid deadlock. + for _, handleErr := range handleErrs { + handleErr.h.cancelWithErr(handleErr.e) + } + case <-w.handleCancelCh: + } + } +} + +// synchronize waits for the worker to loop at least once, or to exit. +func (w *worker) synchronize() { + if atomic.LoadInt32(&w.isRunning) == 0 { + return + } + + receiver, err := w.stopNotifier.NewReceiver(time.Millisecond * 100) + if err != nil { + if cerrors.ErrOperateOnClosedNotifier.Equal(errors.Cause(err)) { + return + } + log.Panic("unexpected error", zap.Error(err)) + } + defer receiver.Stop() + + startTime := time.Now() + for { + workerHasFinishedLoop := false + select { + case w.handleCancelCh <- struct{}{}: + workerHasFinishedLoop = true + case <-receiver.C: + } + if workerHasFinishedLoop || atomic.LoadInt32(&w.isRunning) == 0 { + break + } + + if time.Since(startTime) > time.Second*10 { + // likely the workerpool has deadlocked, or there is a bug in the event handlers. + log.Warn("synchronize is taking too long, report a bug", zap.Duration("elapsed", time.Since(startTime))) + } + } +} + +func (w *worker) addHandle(handle *defaultEventHandle) { + w.handleRWLock.Lock() + defer w.handleRWLock.Unlock() + + w.handles[handle] = struct{}{} +} + +func (w *worker) removeHandle(handle *defaultEventHandle) { + w.handleRWLock.Lock() + defer w.handleRWLock.Unlock() + + delete(w.handles, handle) +} diff --git a/pkg/workerpool/pool_test.go b/pkg/workerpool/pool_test.go new file mode 100644 index 00000000000..2c53ff0ab19 --- /dev/null +++ b/pkg/workerpool/pool_test.go @@ -0,0 +1,370 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
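The tests that follow exercise the timer path implemented by doTimer above. As a condensed, self-contained sketch (not part of the patch; it uses only the exported API of this package):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/pingcap/ticdc/pkg/workerpool"
	"golang.org/x/sync/errgroup"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	pool := workerpool.NewDefaultWorkerPool(1)
	errg, ctx := errgroup.WithContext(ctx)
	errg.Go(func() error { return pool.Run(ctx) })

	pool.RegisterEvent(func(ctx context.Context, event interface{}) error {
		return nil // per-event work would go here
	}).SetTimer(ctx, 500*time.Millisecond, func(ctx context.Context) error {
		// Fires roughly every 500ms, quantized to the 100ms internal clock tick,
		// until the handle is unregistered or this callback returns an error.
		fmt.Println("tick at", time.Now().Format(time.StampMilli))
		return nil
	})

	<-ctx.Done()
	_ = errg.Wait()
}
```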
+ +package workerpool + +import ( + "context" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/pingcap/check" + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/log" + "github.com/pingcap/ticdc/pkg/util/testleak" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" +) + +func TestSuite(t *testing.T) { check.TestingT(t) } + +type workerPoolSuite struct{} + +var _ = check.Suite(&workerPoolSuite{}) + +func (s *workerPoolSuite) TestTaskError(c *check.C) { + defer testleak.AfterTest(c)() + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + pool := newDefaultPoolImpl(&defaultHasher{}, 4) + errg, ctx := errgroup.WithContext(ctx) + errg.Go(func() error { + return pool.Run(ctx) + }) + + handle := pool.RegisterEvent(func(ctx context.Context, event interface{}) error { + if event.(int) == 3 { + return errors.New("test error") + } + return nil + }).OnExit(func(err error) { + c.Assert(err, check.ErrorMatches, "test error") + }) + + errg.Go(func() error { + for i := 0; i < 10; i++ { + err := handle.AddEvent(ctx, i) + if err != nil { + c.Assert(err, check.ErrorMatches, ".*ErrWorkerPoolHandleCancelled.*") + return nil + } + } + return nil + }) + + select { + case <-ctx.Done(): + c.FailNow() + case err := <-handle.ErrCh(): + c.Assert(err, check.ErrorMatches, "test error") + } + cancel() + + err := errg.Wait() + c.Assert(err, check.ErrorMatches, "context canceled") +} + +func (s *workerPoolSuite) TestTimerError(c *check.C) { + defer testleak.AfterTest(c)() + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + pool := newDefaultPoolImpl(&defaultHasher{}, 4) + errg, ctx := errgroup.WithContext(ctx) + errg.Go(func() error { + return pool.Run(ctx) + }) + + counter := 0 + handle := pool.RegisterEvent(func(ctx context.Context, event interface{}) error { + return nil + }).SetTimer(ctx, time.Millisecond*200, func(ctx context.Context) error { + if counter == 3 { + return errors.New("timer error") + } + counter++ + return nil + }) + + select { + case <-ctx.Done(): + c.FailNow() + case err := <-handle.ErrCh(): + c.Assert(err, check.ErrorMatches, "timer error") + } + cancel() + + err := errg.Wait() + c.Assert(err, check.ErrorMatches, "context canceled") +} + +func (s *workerPoolSuite) TestMultiError(c *check.C) { + defer testleak.AfterTest(c)() + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + pool := newDefaultPoolImpl(&defaultHasher{}, 4) + errg, ctx := errgroup.WithContext(ctx) + errg.Go(func() error { + return pool.Run(ctx) + }) + + handle := pool.RegisterEvent(func(ctx context.Context, event interface{}) error { + if event.(int) >= 3 { + return errors.New("test error") + } + return nil + }) + + errg.Go(func() error { + for i := 0; i < 10; i++ { + err := handle.AddEvent(ctx, i) + if err != nil { + c.Assert(err, check.ErrorMatches, ".*ErrWorkerPoolHandleCancelled.*") + } + } + return nil + }) + + select { + case <-ctx.Done(): + c.FailNow() + case err := <-handle.ErrCh(): + c.Assert(err, check.ErrorMatches, "test error") + } + cancel() + + err := errg.Wait() + c.Assert(err, check.ErrorMatches, "context canceled") +} + +func (s *workerPoolSuite) TestCancelHandle(c *check.C) { + defer testleak.AfterTest(c)() + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + pool := newDefaultPoolImpl(&defaultHasher{}, 4) + errg, ctx := errgroup.WithContext(ctx) + errg.Go(func() error { + return pool.Run(ctx) + }) + + 
var num int32 + handle := pool.RegisterEvent(func(ctx context.Context, event interface{}) error { + atomic.StoreInt32(&num, int32(event.(int))) + return nil + }) + + errg.Go(func() error { + i := 0 + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + err := handle.AddEvent(ctx, i) + if err != nil { + c.Assert(err, check.ErrorMatches, ".*ErrWorkerPoolHandleCancelled.*") + c.Assert(i, check.GreaterEqual, 5000) + return nil + } + i++ + } + }) + + for { + select { + case <-ctx.Done(): + c.FailNow() + default: + } + if atomic.LoadInt32(&num) > 5000 { + break + } + } + + err := failpoint.Enable("github.com/pingcap/ticdc/pkg/workerpool/addEventDelayPoint", "1*sleep(500)") + c.Assert(err, check.IsNil) + defer func() { + _ = failpoint.Disable("github.com/pingcap/ticdc/pkg/workerpool/addEventDelayPoint") + }() + + handle.Unregister() + handle.Unregister() // Unregistering many times does not matter + handle.Unregister() + + lastNum := atomic.LoadInt32(&num) + for i := 0; i <= 1000; i++ { + c.Assert(atomic.LoadInt32(&num), check.Equals, lastNum) + } + + time.Sleep(1 * time.Second) + cancel() + + err = errg.Wait() + c.Assert(err, check.ErrorMatches, "context canceled") +} + +func (s *workerPoolSuite) TestCancelTimer(c *check.C) { + defer testleak.AfterTest(c)() + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + pool := newDefaultPoolImpl(&defaultHasher{}, 4) + errg, ctx := errgroup.WithContext(ctx) + errg.Go(func() error { + return pool.Run(ctx) + }) + + err := failpoint.Enable("github.com/pingcap/ticdc/pkg/workerpool/unregisterDelayPoint", "sleep(5000)") + c.Assert(err, check.IsNil) + defer func() { + _ = failpoint.Disable("github.com/pingcap/ticdc/pkg/workerpool/unregisterDelayPoint") + }() + + handle := pool.RegisterEvent(func(ctx context.Context, event interface{}) error { + return nil + }).SetTimer(ctx, 200*time.Millisecond, func(ctx context.Context) error { + return nil + }) + + errg.Go(func() error { + i := 0 + for { + err := handle.AddEvent(ctx, i) + if err != nil { + c.Assert(err, check.ErrorMatches, ".*ErrWorkerPoolHandleCancelled.*") + return nil + } + i++ + } + }) + + handle.Unregister() + + cancel() + err = errg.Wait() + c.Assert(err, check.ErrorMatches, "context canceled") +} + +func (s *workerPoolSuite) TestTimer(c *check.C) { + defer testleak.AfterTest(c)() + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + pool := newDefaultPoolImpl(&defaultHasher{}, 4) + errg, ctx := errgroup.WithContext(ctx) + errg.Go(func() error { + return pool.Run(ctx) + }) + + time.Sleep(200 * time.Millisecond) + + handle := pool.RegisterEvent(func(ctx context.Context, event interface{}) error { + if event.(int) == 3 { + return errors.New("test error") + } + return nil + }) + + var lastTime time.Time + count := 0 + handle.SetTimer(ctx, time.Second*1, func(ctx context.Context) error { + if !lastTime.IsZero() { + c.Assert(time.Since(lastTime), check.GreaterEqual, 900*time.Millisecond) + c.Assert(time.Since(lastTime), check.LessEqual, 1200*time.Millisecond) + } + if count == 3 { + cancel() + return nil + } + count++ + + lastTime = time.Now() + return nil + }) + + err := errg.Wait() + c.Assert(err, check.ErrorMatches, "context canceled") +} + +func (s *workerPoolSuite) TestBasics(c *check.C) { + defer testleak.AfterTest(c)() + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*20) + defer cancel() + + pool := newDefaultPoolImpl(&defaultHasher{}, 4) + errg, ctx := 
errgroup.WithContext(ctx) + + errg.Go(func() error { + return pool.Run(ctx) + }) + + var wg sync.WaitGroup + + wg.Add(16) + for i := 0; i < 16; i++ { + finalI := i + resultCh := make(chan int, 128) + handler := pool.RegisterEvent(func(ctx context.Context, event interface{}) error { + select { + case <-ctx.Done(): + return ctx.Err() + case resultCh <- event.(int): + } + log.Debug("result added", zap.Int("id", finalI), zap.Int("result", event.(int))) + return nil + }) + + errg.Go(func() error { + for j := 0; j < 256; j++ { + err := handler.AddEvent(ctx, j) + if err != nil { + return errors.Trace(err) + } + } + return nil + }) + + errg.Go(func() error { + defer wg.Done() + nextExpected := 0 + for n := range resultCh { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + log.Debug("result received", zap.Int("id", finalI), zap.Int("result", n)) + c.Assert(n, check.Equals, nextExpected) + nextExpected++ + if nextExpected == 256 { + break + } + } + return nil + }) + } + + wg.Wait() + cancel() + + err := errg.Wait() + c.Assert(err, check.ErrorMatches, "context canceled") +} diff --git a/testing_utils/many_sorters_test/many_sorters.go b/testing_utils/many_sorters_test/many_sorters.go index 0acfbc2428e..414de49cece 100644 --- a/testing_utils/many_sorters_test/many_sorters.go +++ b/testing_utils/many_sorters_test/many_sorters.go @@ -30,7 +30,9 @@ import ( "github.com/pingcap/ticdc/cdc/puller" pullerSorter "github.com/pingcap/ticdc/cdc/puller/sorter" "github.com/pingcap/ticdc/pkg/config" + cerrors "github.com/pingcap/ticdc/pkg/errors" "go.uber.org/zap" + "go.uber.org/zap/zapcore" "golang.org/x/sync/errgroup" ) @@ -41,17 +43,18 @@ var percentageResolves = flag.Int("percentage-resolve-events", 70, "percentage o func main() { flag.Parse() - log.SetLevel(zap.DebugLevel) err := failpoint.Enable("github.com/pingcap/ticdc/cdc/puller/sorter/sorterDebug", "return(true)") if err != nil { log.Fatal("Could not enable failpoint", zap.Error(err)) } + log.SetLevel(zapcore.DebugLevel) config.SetSorterConfig(&config.SorterConfig{ - NumConcurrentWorker: 4, - ChunkSizeLimit: 1 * 1024 * 1024 * 1024, - MaxMemoryPressure: 60, - MaxMemoryConsumption: 16 * 1024 * 1024 * 1024, + NumConcurrentWorker: 8, + ChunkSizeLimit: 1 * 1024 * 1024 * 1024, + MaxMemoryPressure: 60, + MaxMemoryConsumption: 16 * 1024 * 1024 * 1024, + NumWorkerPoolGoroutine: 16, }) go func() { @@ -67,6 +70,10 @@ func main() { ctx0, cancel := context.WithCancel(context.Background()) errg, ctx := errgroup.WithContext(ctx0) + errg.Go(func() error { + return pullerSorter.RunWorkerPool(ctx) + }) + var finishCount int32 for i := 0; i < *numSorters; i++ { sorters[i] = pullerSorter.NewUnifiedSorter(*sorterDir, fmt.Sprintf("test-%d", i), "0.0.0.0:0") @@ -142,7 +149,8 @@ func printError(err error) error { if err != nil && errors.Cause(err) != context.Canceled && errors.Cause(err) != context.DeadlineExceeded && !strings.Contains(err.Error(), "context canceled") && - !strings.Contains(err.Error(), "context deadline exceeded") { + !strings.Contains(err.Error(), "context deadline exceeded") && + cerrors.ErrWorkerPoolHandleCancelled.NotEqual(errors.Cause(err)) { log.Warn("Unified Sorter: Error detected", zap.Error(err)) } diff --git a/testing_utils/many_sorters_test/many_sorters_test.go b/testing_utils/many_sorters_test/many_sorters_test.go new file mode 100644 index 00000000000..bca7aaa063d --- /dev/null +++ b/testing_utils/many_sorters_test/many_sorters_test.go @@ -0,0 +1,20 @@ +// Copyright 2020 PingCAP, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import "testing" + +func TestFoo(t *testing.T) { + main() +} diff --git a/testing_utils/sorter_stress_test/sorter_stress.go b/testing_utils/sorter_stress_test/sorter_stress.go index 2f058397a6f..c75415f20c1 100644 --- a/testing_utils/sorter_stress_test/sorter_stress.go +++ b/testing_utils/sorter_stress_test/sorter_stress.go @@ -23,6 +23,7 @@ import ( "strings" "github.com/pingcap/failpoint" + "github.com/pingcap/log" "github.com/pingcap/ticdc/cdc/model" "github.com/pingcap/ticdc/cdc/puller" @@ -32,10 +33,19 @@ import ( "golang.org/x/sync/errgroup" ) +<<<<<<< HEAD var sorterDir = flag.String("dir", "./sorter", "temporary directory used for sorting") var numBatches = flag.Int("num-batches", 256, "number of batches of ordered events") var msgsPerBatch = flag.Int("num-messages-per-batch", 102400, "number of events in a batch") var bytesPerMsg = flag.Int("bytes-per-message", 1024, "number of bytes in an event") +======= +var ( + sorterDir = flag.String("dir", "./sorter", "temporary directory used for sorting") + numBatches = flag.Int("num-batches", 256, "number of batches of ordered events") + msgsPerBatch = flag.Int("num-messages-per-batch", 1024, "number of events in a batch") + bytesPerMsg = flag.Int("bytes-per-message", 1024, "number of bytes in an event") +) +>>>>>>> a3fb52e... sorter: Stabilize Unified Sorter (#1210) func main() { flag.Parse() @@ -67,6 +77,10 @@ func main() { eg, ctx := errgroup.WithContext(ctx1) + eg.Go(func() error { + return pullerSorter.RunWorkerPool(ctx) + }) + eg.Go(func() error { return sorter.Run(ctx) }) diff --git a/tests/_utils/run_cdc_server b/tests/_utils/run_cdc_server index ad55d087757..691e3c17818 100755 --- a/tests/_utils/run_cdc_server +++ b/tests/_utils/run_cdc_server @@ -86,6 +86,7 @@ if [[ "$restart" == "true" ]]; then GO_FAILPOINTS=$failpoint $binary -test.coverprofile="$OUT_DIR/cov.$TEST_NAME.$pid.out" server \ --log-file $workdir/cdc$logsuffix.log \ --log-level $log_level \ + --sorter-num-workerpool-goroutine 4 \ $tls \ $certcn \ $addr \ @@ -99,6 +100,7 @@ else GO_FAILPOINTS=$failpoint $binary -test.coverprofile="$OUT_DIR/cov.$TEST_NAME.$pid.out" server \ --log-file $workdir/cdc$logsuffix.log \ --log-level $log_level \ + --sorter-num-workerpool-goroutine 4 \ $tls \ $certcn \ $addr \ diff --git a/tests/cdclog_s3/run.sh b/tests/cdclog_s3/run.sh index 990c2a895c1..d4e5c45ec97 100644 --- a/tests/cdclog_s3/run.sh +++ b/tests/cdclog_s3/run.sh @@ -16,6 +16,7 @@ export AWS_SECRET_ACCESS_KEY=$MINIO_SECRET_KEY export S3_ENDPOINT=127.0.0.1:24927 rm -rf "$WORK_DIR" mkdir -p "$WORK_DIR" +pkill -9 minio || true bin/minio server --address $S3_ENDPOINT "$WORK_DIR/s3" & MINIO_PID=$! 
i=0 diff --git a/tests/ddl_puller_lag/run.sh b/tests/ddl_puller_lag/run.sh index 8ca06b70a06..b1c9b4d1925 100644 --- a/tests/ddl_puller_lag/run.sh +++ b/tests/ddl_puller_lag/run.sh @@ -22,7 +22,7 @@ function prepare() { run_sql "CREATE table test.ddl_puller_lag2(id int primary key, val int);" - run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --failpoint 'github.com/pingcap/ticdc/cdc/processorDDLResolved=1*sleep(600000)' + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --failpoint 'github.com/pingcap/ticdc/cdc/processorDDLResolved=1*sleep(180000)' TOPIC_NAME="ticdc-ddl-puller-lag-test-$RANDOM" case $SINK_TYPE in @@ -102,6 +102,6 @@ function sql_test() { trap stop_tidb_cluster EXIT prepare $* -sleep 600 +sleep 180 sql_test $* echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/kafka_messages/run.sh b/tests/kafka_messages/run.sh index 548331a495d..7c4dd19a86a 100755 --- a/tests/kafka_messages/run.sh +++ b/tests/kafka_messages/run.sh @@ -32,7 +32,7 @@ function run_length_limit() { TOPIC_NAME="ticdc-kafka-message-test-$RANDOM" SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&max-message-bytes=4096" - run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --sort-dir="$sort_dir" + run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" if [ "$SINK_TYPE" == "kafka" ]; then run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&max-message-bytes=4096" fi @@ -40,7 +40,7 @@ function run_length_limit() { # Add a check table to reduce check time, or if we check data with sync diff # directly, there maybe a lot of diff data at first because of the incremental scan run_sql "CREATE table kafka_message.check1(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - check_table_exists "kafka_message.USERTABLE" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_table_exists "kafka_message.USERTABLE" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90 check_table_exists "kafka_message.check1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90 check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml @@ -58,7 +58,7 @@ function run_length_limit() { run_sql "create table kafka_message.USERTABLE2 like kafka_message.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_sql "insert into kafka_message.USERTABLE2 select * from kafka_message.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_sql "create table kafka_message.check4(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - check_table_exists "kafka_message.USERTABLE2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_table_exists "kafka_message.USERTABLE2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90 check_table_exists "kafka_message.check4" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90 check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml @@ -88,7 +88,7 @@ function run_batch_size_limit() { TOPIC_NAME="ticdc-kafka-message-test-$RANDOM" SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&max-batch-size=3" - run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --sort-dir="$sort_dir" + run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" if [ "$SINK_TYPE" == "kafka" ]; then run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&max-batch-size=3" fi @@ -96,7 +96,7 @@ function run_batch_size_limit() { # Add a check table to reduce check time, or if we check data with sync diff # directly, there maybe a lot of diff data at first because of the incremental scan run_sql "CREATE table kafka_message.check1(id int primary key);" 
${UP_TIDB_HOST} ${UP_TIDB_PORT} - check_table_exists "kafka_message.USERTABLE" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_table_exists "kafka_message.USERTABLE" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90 check_table_exists "kafka_message.check1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90 check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml @@ -114,7 +114,7 @@ function run_batch_size_limit() { run_sql "create table kafka_message.USERTABLE2 like kafka_message.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_sql "insert into kafka_message.USERTABLE2 select * from kafka_message.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_sql "create table kafka_message.check4(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - check_table_exists "kafka_message.USERTABLE2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_table_exists "kafka_message.USERTABLE2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90 check_table_exists "kafka_message.check4" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90 check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml diff --git a/tests/processor_panic/run.sh b/tests/processor_panic/run.sh index 7ce20cb508a..a161e2be8b5 100644 --- a/tests/processor_panic/run.sh +++ b/tests/processor_panic/run.sh @@ -41,7 +41,7 @@ prepare $* cd "$(dirname "$0")" set -o pipefail GO111MODULE=on go run main.go -config ./config.toml 2>&1 | tee $WORK_DIR/tester.log -check_table_exists test.end_mark_table ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} +check_table_exists test.end_mark_table ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90 check_sync_diff $WORK_DIR $CUR/diff_config.toml cleanup_process $CDC_BINARY diff --git a/tests/unified_sorter/conf/workload b/tests/unified_sorter/conf/workload index 4c7f6583e40..88bdb41f85d 100644 --- a/tests/unified_sorter/conf/workload +++ b/tests/unified_sorter/conf/workload @@ -1,5 +1,5 @@ threadcount=10 -recordcount=45000 +recordcount=30000 operationcount=0 workload=core diff --git a/tests/unified_sorter/run.sh b/tests/unified_sorter/run.sh index a1519295ed2..de7e7517314 100755 --- a/tests/unified_sorter/run.sh +++ b/tests/unified_sorter/run.sh @@ -5,7 +5,7 @@ set -e CUR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd ) source $CUR/../_utils/test_prepare WORK_DIR=$OUT_DIR/$TEST_NAME -CDC_BINARY=cdc +CDC_BINARY=cdc.test SINK_TYPE=$1 CDC_COUNT=3 From 2bc2ef32c0e5081d75506d3173947c8f0c81a240 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Thu, 31 Dec 2020 23:31:49 +0800 Subject: [PATCH 2/3] merge --- Makefile | 5 ----- cdc/puller/sorter/file_backend.go | 4 ---- cdc/puller/sorter_test.go | 8 -------- pkg/errors/errors.go | 3 --- testing_utils/sorter_stress_test/sorter_stress.go | 10 +--------- 5 files changed, 1 insertion(+), 29 deletions(-) diff --git a/Makefile b/Makefile index ec2fe21b561..99d79189af3 100644 --- a/Makefile +++ b/Makefile @@ -32,13 +32,8 @@ MAC := "Darwin" PACKAGE_LIST := go list ./...| grep -vE 'vendor|proto|ticdc\/tests|integration|testing_utils' PACKAGES := $$($(PACKAGE_LIST)) PACKAGE_DIRECTORIES := $(PACKAGE_LIST) | sed 's|github.com/pingcap/$(PROJECT)/||' -<<<<<<< HEAD -FILES := $$(find . -name '*.go' -type f | grep -vE 'vendor' | grep -vE 'kv_gen') -TEST_FILES := $$(find . -name '*_test.go' -type f | grep -vE 'vendor|kv_gen|integration') -======= FILES := $$(find . -name '*.go' -type f | grep -vE 'vendor|kv_gen|proto') TEST_FILES := $$(find . -name '*_test.go' -type f | grep -vE 'vendor|kv_gen|integration|testing_utils') ->>>>>>> a3fb52e... 
sorter: Stabilize Unified Sorter (#1210) CDC_PKG := github.com/pingcap/ticdc FAILPOINT_DIR := $$(for p in $(PACKAGES); do echo $${p\#"github.com/pingcap/$(PROJECT)/"}|grep -v "github.com/pingcap/$(PROJECT)"; done) FAILPOINT := bin/failpoint-ctl diff --git a/cdc/puller/sorter/file_backend.go b/cdc/puller/sorter/file_backend.go index ad0526fcc36..42fc2a19082 100644 --- a/cdc/puller/sorter/file_backend.go +++ b/cdc/puller/sorter/file_backend.go @@ -63,11 +63,7 @@ func newFileBackEnd(fileName string, serde serializerDeserializer) (*fileBackEnd } func (f *fileBackEnd) reader() (backEndReader, error) { -<<<<<<< HEAD - fd, err := os.OpenFile(f.fileName, os.O_RDONLY, 0644) -======= fd, err := os.OpenFile(f.fileName, os.O_RDWR, 0o644) ->>>>>>> a3fb52e... sorter: Stabilize Unified Sorter (#1210) if err != nil { return nil, errors.Trace(err) } diff --git a/cdc/puller/sorter_test.go b/cdc/puller/sorter_test.go index cf3a2bf0fab..f55d22ad01c 100644 --- a/cdc/puller/sorter_test.go +++ b/cdc/puller/sorter_test.go @@ -65,11 +65,7 @@ func (s *sorterSuite) TestSorterBasic(c *check.C) { NumWorkerPoolGoroutine: 4, }) -<<<<<<< HEAD - err := os.MkdirAll("./sorter", 0755) -======= err := os.MkdirAll("/tmp/sorter", 0o755) ->>>>>>> a3fb52e... sorter: Stabilize Unified Sorter (#1210) c.Assert(err, check.IsNil) sorter := sorter2.NewUnifiedSorter("/tmp/sorter", "test", "0.0.0.0:0") @@ -90,11 +86,7 @@ func (s *sorterSuite) TestSorterCancel(c *check.C) { NumWorkerPoolGoroutine: 4, }) -<<<<<<< HEAD - err := os.MkdirAll("./sorter", 0755) -======= err := os.MkdirAll("/tmp/sorter", 0o755) ->>>>>>> a3fb52e... sorter: Stabilize Unified Sorter (#1210) c.Assert(err, check.IsNil) sorter := sorter2.NewUnifiedSorter("/tmp/sorter", "test", "0.0.0.0:0") diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 65284717d3d..f8b76664f85 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -175,8 +175,6 @@ var ( ErrInvalidAdminJobType = errors.Normalize("invalid admin job type: %d", errors.RFCCodeText("CDC:ErrInvalidAdminJobType")) ErrOwnerEtcdWatch = errors.Normalize("etcd watch returns error", errors.RFCCodeText("CDC:ErrOwnerEtcdWatch")) ErrOwnerCampaignKeyDeleted = errors.Normalize("owner campaign key deleted", errors.RFCCodeText("CDC:ErrOwnerCampaignKeyDeleted")) -<<<<<<< HEAD -======= // EtcdWorker related errors. Internal use only. // ErrEtcdTryAgain is used by a PatchFunc to force a transaction abort. @@ -196,5 +194,4 @@ var ( // unified sorter errors ErrUnifiedSorterBackendTerminating = errors.Normalize("unified sorter backend is terminating", errors.RFCCodeText("CDC:ErrUnifiedSorterBackendTerminating")) ->>>>>>> a3fb52e... 
sorter: Stabilize Unified Sorter (#1210) ) diff --git a/testing_utils/sorter_stress_test/sorter_stress.go b/testing_utils/sorter_stress_test/sorter_stress.go index c75415f20c1..eda06b0f1e8 100644 --- a/testing_utils/sorter_stress_test/sorter_stress.go +++ b/testing_utils/sorter_stress_test/sorter_stress.go @@ -23,7 +23,6 @@ import ( "strings" "github.com/pingcap/failpoint" - "github.com/pingcap/log" "github.com/pingcap/ticdc/cdc/model" "github.com/pingcap/ticdc/cdc/puller" @@ -33,19 +32,12 @@ import ( "golang.org/x/sync/errgroup" ) -<<<<<<< HEAD -var sorterDir = flag.String("dir", "./sorter", "temporary directory used for sorting") -var numBatches = flag.Int("num-batches", 256, "number of batches of ordered events") -var msgsPerBatch = flag.Int("num-messages-per-batch", 102400, "number of events in a batch") -var bytesPerMsg = flag.Int("bytes-per-message", 1024, "number of bytes in an event") -======= var ( - sorterDir = flag.String("dir", "./sorter", "temporary directory used for sorting") + sorterDir = flag.String("dir", "/tmp/sorter", "temporary directory used for sorting") numBatches = flag.Int("num-batches", 256, "number of batches of ordered events") msgsPerBatch = flag.Int("num-messages-per-batch", 1024, "number of events in a batch") bytesPerMsg = flag.Int("bytes-per-message", 1024, "number of bytes in an event") ) ->>>>>>> a3fb52e... sorter: Stabilize Unified Sorter (#1210) func main() { flag.Parse() From e50fc35fd5d89f5054e40555c7af46c445d4108c Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Wed, 6 Jan 2021 13:37:08 +0800 Subject: [PATCH 3/3] change the default sorter back to memory --- cdc/model/changefeed.go | 2 +- cdc/model/changefeed_test.go | 2 +- cmd/client_changefeed.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cdc/model/changefeed.go b/cdc/model/changefeed.go index fb148bccaef..7f50659bd43 100644 --- a/cdc/model/changefeed.go +++ b/cdc/model/changefeed.go @@ -176,7 +176,7 @@ func (info *ChangeFeedInfo) Unmarshal(data []byte) error { func (info *ChangeFeedInfo) VerifyAndFix() error { defaultConfig := config.GetDefaultReplicaConfig() if info.Engine == "" { - info.Engine = SortUnified + info.Engine = SortInMemory } if info.Config.Filter == nil { info.Config.Filter = defaultConfig.Filter diff --git a/cdc/model/changefeed_test.go b/cdc/model/changefeed_test.go index 10322f63e72..f2ca0324115 100644 --- a/cdc/model/changefeed_test.go +++ b/cdc/model/changefeed_test.go @@ -179,7 +179,7 @@ func (s *configSuite) TestVerifyAndFix(c *check.C) { err := info.VerifyAndFix() c.Assert(err, check.IsNil) - c.Assert(info.Engine, check.Equals, SortUnified) + c.Assert(info.Engine, check.Equals, SortInMemory) marshalConfig1, err := info.Config.Marshal() c.Assert(err, check.IsNil) diff --git a/cmd/client_changefeed.go b/cmd/client_changefeed.go index 2a42530abd8..c0147a7a917 100644 --- a/cmd/client_changefeed.go +++ b/cmd/client_changefeed.go @@ -376,7 +376,7 @@ func changefeedConfigVariables(command *cobra.Command) { command.PersistentFlags().StringVar(&sinkURI, "sink-uri", "", "sink uri") command.PersistentFlags().StringVar(&configFile, "config", "", "Path of the configuration file") command.PersistentFlags().StringSliceVar(&opts, "opts", nil, "Extra options, in the `key=value` format") - command.PersistentFlags().StringVar(&sortEngine, "sort-engine", "unified", "sort engine used for data sort") + command.PersistentFlags().StringVar(&sortEngine, "sort-engine", "memory", "sort engine used for data sort") command.PersistentFlags().StringVar(&sortDir, "sort-dir", ".", 
"directory used for file sort") command.PersistentFlags().StringVar(&timezone, "tz", "SYSTEM", "timezone used when checking sink uri (changefeed timezone is determined by cdc server)") command.PersistentFlags().Uint64Var(&cyclicReplicaID, "cyclic-replica-id", 0, "(Expremental) Cyclic replication replica ID of changefeed")