Skip to content

Commit

Permalink
disttask: add operator abstraction (#46279)
Browse files Browse the repository at this point in the history
ref #46258
  • Loading branch information
tangenta authored Aug 22, 2023
1 parent 1769f3a commit 1ea6499
Show file tree
Hide file tree
Showing 10 changed files with 492 additions and 21 deletions.
5 changes: 1 addition & 4 deletions ddl/backfilling_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,11 +308,8 @@ func (b *ingestBackfillScheduler) setupWorkers() error {
}
b.copReqSenderPool = copReqSenderPool
readerCnt, writerCnt := b.expectedWorkerSize()
writerPool, err := workerpool.NewWorkerPool[idxRecResult]("ingest_writer",
writerPool := workerpool.NewWorkerPool[idxRecResult]("ingest_writer",
poolutil.DDL, writerCnt, b.createWorker)
if err != nil {
return errors.Trace(err)
}
writerPool.Start()
b.writerPool = writerPool
b.copReqSenderPool.chunkSender = writerPool
Expand Down
27 changes: 27 additions & 0 deletions disttask/operator/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

# Library target for the disttask/operator package. It is built from the four
# Go sources listed in srcs and is importable as
# github.com/pingcap/tidb/disttask/operator by any package (public visibility).
go_library(
name = "operator",
srcs = [
"compose.go",
"operator.go",
"pipeline.go",
"wrapper.go",
],
importpath = "github.com/pingcap/tidb/disttask/operator",
visibility = ["//visibility:public"],
# Worker pool and its component identifiers come from resourcemanager;
# errgroup is an external dependency (golang.org/x/sync).
deps = [
"//resourcemanager/pool/workerpool",
"//resourcemanager/util",
"@org_golang_x_sync//errgroup",
],
)

# Test target for the operator package. pipeline_test.go is combined with the
# library sources through embed, and testify/require is the only extra
# dependency.
# NOTE(review): flaky = True lets Bazel re-run the test when it fails —
# confirm whether the pipeline test is actually nondeterministic or whether
# this attribute can be dropped.
go_test(
name = "operator_test",
timeout = "short",
srcs = ["pipeline_test.go"],
embed = [":operator"],
flaky = True,
deps = ["@com_github_stretchr_testify//require"],
)
58 changes: 58 additions & 0 deletions disttask/operator/compose.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package operator

// WithSource is implemented by operators whose input side can be attached to
// a DataChannel carrying values of type T.
type WithSource[T any] interface {
	// SetSource installs ch as the operator's source channel.
	SetSource(ch DataChannel[T])
}

// WithSink is implemented by operators whose output side can be attached to
// a DataChannel carrying values of type T.
type WithSink[T any] interface {
	// SetSink installs ch as the operator's sink channel.
	SetSink(ch DataChannel[T])
}

// Compose connects two operators: it creates one unbuffered channel, wraps it
// in a SimpleDataChannel, and installs that same channel as the sink of op1
// and as the source of op2.
func Compose[T any](op1 WithSink[T], op2 WithSource[T]) {
	var link DataChannel[T] = NewSimpleDataChannel(make(chan T))
	op1.SetSink(link)
	op2.SetSource(link)
}

// DataChannel abstracts the channel used to move values of type T from one
// operator to the next.
type DataChannel[T any] interface {
	// Channel returns the Go channel that carries the data.
	Channel() chan T
	// Finish marks the channel as finished. SimpleDataChannel implements
	// this by closing the underlying channel.
	Finish()
}

// SimpleDataChannel is a DataChannel backed directly by a plain Go channel.
type SimpleDataChannel[T any] struct {
	channel chan T
}

// NewSimpleDataChannel wraps ch in a SimpleDataChannel. The channel is stored
// as given; no buffering is added.
func NewSimpleDataChannel[T any](ch chan T) *SimpleDataChannel[T] {
	dc := new(SimpleDataChannel[T])
	dc.channel = ch
	return dc
}

// Channel returns the wrapped channel.
func (dc *SimpleDataChannel[T]) Channel() chan T { return dc.channel }

// Finish closes the wrapped channel. As with any Go channel, closing it a
// second time panics, so Finish must be called at most once.
func (dc *SimpleDataChannel[T]) Finish() { close(dc.channel) }
102 changes: 102 additions & 0 deletions disttask/operator/operator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package operator

import (
"fmt"

"github.com/pingcap/tidb/resourcemanager/pool/workerpool"
"github.com/pingcap/tidb/resourcemanager/util"
)

// Operator is the basic unit of work in task execution.
type Operator interface {
	// Open starts the operator.
	Open() error
	// Close shuts the operator down.
	Close() error
	// String returns a short description of the operator.
	String() string
}

// AsyncOperator processes data asynchronously through a worker pool that
// turns tasks of type T into results of type R.
//
// Example: when the sink of AsyncOperator op1 and the source of op2 share the
// same channel, op2's workers handle the results produced by op1.
type AsyncOperator[T, R any] struct {
	pool *workerpool.WorkerPool[T, R]
}

// NewAsyncOperatorWithTransform creates an AsyncOperator backed by a new
// worker pool. The pool is registered under name for the DistTask component,
// is created with workerNum as its worker count, and every worker applies
// transform to each task it handles.
func NewAsyncOperatorWithTransform[T, R any](name string, workerNum int, transform func(T) R) *AsyncOperator[T, R] {
	workerCtor := newAsyncWorkerCtor(transform)
	return NewAsyncOperator(workerpool.NewWorkerPool(name, util.DistTask, workerNum, workerCtor))
}

// NewAsyncOperator creates an AsyncOperator that runs on the given worker
// pool. The pool is stored as-is; it is not started here.
func NewAsyncOperator[T, R any](pool *workerpool.WorkerPool[T, R]) *AsyncOperator[T, R] {
	op := new(AsyncOperator[T, R])
	op.pool = pool
	return op
}

// Open implements Operator.Open by starting the underlying worker pool.
// It always returns nil.
func (c *AsyncOperator[T, R]) Open() error {
	c.pool.Start()
	return nil
}

// Close implements Operator.Close. It waits on the worker pool and then
// releases it. It always returns nil.
func (c *AsyncOperator[T, R]) Close() error {
	// Block until all tasks are done. The task channel is deliberately not
	// closed here: the worker pool is responsible for closing it.
	c.pool.Wait()
	c.pool.Release()
	return nil
}

// String returns the operator's display name in the form
// "AsyncOp[<task type>, <result type>]".
//
// The type names are obtained by formatting a typed nil pointer (*T)(nil)
// with %T and dropping the leading '*'. Formatting a zero value of T directly
// would print "<nil>" whenever T is an interface type (for example error or
// any), because a nil interface value carries no dynamic type.
func (*AsyncOperator[T, R]) String() string {
	taskType := fmt.Sprintf("%T", (*T)(nil))[1:]
	resultType := fmt.Sprintf("%T", (*R)(nil))[1:]
	return "AsyncOp[" + taskType + ", " + resultType + "]"
}

// SetSource makes the worker pool receive its tasks from the channel
// wrapped by ch.
func (c *AsyncOperator[T, R]) SetSource(ch DataChannel[T]) {
	tasks := ch.Channel()
	c.pool.SetTaskReceiver(tasks)
}

// SetSink makes the worker pool send its results to the channel wrapped
// by ch.
func (c *AsyncOperator[T, R]) SetSink(ch DataChannel[R]) {
	results := ch.Channel()
	c.pool.SetResultSender(results)
}

// asyncWorker is a worker-pool worker that handles each task by applying a
// fixed transform function to it.
type asyncWorker[T, R any] struct {
	transform func(T) R
}

// newAsyncWorkerCtor returns a constructor that builds a fresh asyncWorker on
// every call; all workers built this way share the same transform function.
func newAsyncWorkerCtor[T, R any](transform func(T) R) func() workerpool.Worker[T, R] {
	return func() workerpool.Worker[T, R] {
		w := new(asyncWorker[T, R])
		w.transform = transform
		return w
	}
}

// HandleTask applies the transform to task and returns the result.
func (w *asyncWorker[T, R]) HandleTask(task T) R { return w.transform(task) }

// Close is a no-op: the worker holds nothing that needs releasing.
func (*asyncWorker[T, R]) Close() {}
67 changes: 67 additions & 0 deletions disttask/operator/pipeline.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package operator

import "strings"

// AsyncPipeline is an ordered list of Operators. Data flows from the first
// operator to the last one.
type AsyncPipeline struct {
	ops []Operator
}

// NewAsyncPipeline builds a pipeline from ops, keeping the given order.
func NewAsyncPipeline(ops ...Operator) *AsyncPipeline {
	p := new(AsyncPipeline)
	p.ops = ops
	return p
}

// Execute opens every operator in order so that they start waiting for
// tasks. If an operator fails to open, the operators opened before it are
// closed in reverse order and the Open error is returned.
func (p *AsyncPipeline) Execute() error {
	for idx, op := range p.ops {
		err := op.Open()
		if err == nil {
			continue
		}
		// Roll back: close what was already opened, newest first. Errors
		// from these Close calls are dropped in favor of the Open error.
		for k := idx; k > 0; k-- {
			_ = p.ops[k-1].Close()
		}
		return err
	}
	return nil
}

// Close closes every operator in order, which waits for all tasks to finish.
// All operators are closed even if some fail; the first error encountered is
// returned.
func (p *AsyncPipeline) Close() error {
	var first error
	for i := range p.ops {
		if err := p.ops[i].Close(); err != nil && first == nil {
			first = err
		}
	}
	return first
}

// String renders the pipeline as "AsyncPipeline[op1 -> op2 -> ...]" using
// each operator's own String method.
func (p *AsyncPipeline) String() string {
	var sb strings.Builder
	sb.WriteString("AsyncPipeline[")
	for i, op := range p.ops {
		if i > 0 {
			sb.WriteString(" -> ")
		}
		sb.WriteString(op.String())
	}
	sb.WriteString("]")
	return sb.String()
}
101 changes: 101 additions & 0 deletions disttask/operator/pipeline_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package operator

import (
"regexp"
"strings"
"sync"
"testing"

"github.com/stretchr/testify/require"
)

// TestPipelineAsyncMultiOperators builds a five-stage pipeline
// (split -> lower-case -> trim -> count -> collect) over a sentence and checks
// both the pipeline's description and that the most frequent word is "hit".
func TestPipelineAsyncMultiOperators(t *testing.T) {
	words := `Bob hiT a ball, the hIt BALL flew far after it was hit.`
	var mostCommonWord string
	splitter := makeSplitter(words)
	lower := makeLower()
	trimmer := makeTrimmer()
	counter := makeCounter()
	collector := makeCollector(&mostCommonWord)

	// Wire each stage's sink to the next stage's source.
	Compose[string](splitter, lower)
	Compose[string](lower, trimmer)
	Compose[string](trimmer, counter)
	Compose[strCnt](counter, collector)

	pipeline := NewAsyncPipeline(splitter, lower, trimmer, counter, collector)
	// require.Equal takes (expected, actual); keeping that order makes the
	// failure output label the two values correctly.
	require.Equal(t, "AsyncPipeline[simpleSource -> simpleOperator(AsyncOp[string, string]) -> simpleOperator(AsyncOp[string, string]) -> simpleOperator(AsyncOp[string, operator.strCnt]) -> simpleSink]", pipeline.String())
	err := pipeline.Execute()
	require.NoError(t, err)
	// Close waits for all stages to finish, so the result is only read
	// afterwards.
	err = pipeline.Close()
	require.NoError(t, err)
	require.Equal(t, "hit", mostCommonWord)
}

// strCnt pairs a word with an occurrence count for that word.
type strCnt struct {
	str string // the word
	cnt int    // the count associated with str
}

// makeSplitter returns a source whose generator yields the words of s one at
// a time and returns "" once all words have been handed out.
//
// Words are split with strings.Fields rather than strings.Split(s, " ") so
// that repeated, leading or trailing whitespace never produces an empty
// token: the generator already uses "" to report exhaustion, so an empty
// word in the middle of the input would be indistinguishable from the end of
// the data.
func makeSplitter(s string) *simpleSource[string] {
	words := strings.Fields(s)
	return newSimpleSource(func() string {
		if len(words) == 0 {
			return ""
		}
		next := words[0]
		words = words[1:]
		return next
	})
}

// makeLower returns an operator that lower-cases each incoming string.
// The second argument passed to newSimpleOperator is 3.
func makeLower() *simpleOperator[string, string] {
	const workers = 3
	return newSimpleOperator(strings.ToLower, workers)
}

// makeTrimmer returns an operator that strips every run of characters other
// than ASCII letters and digits from each incoming string.
func makeTrimmer() *simpleOperator[string, string] {
	nonAlnum := regexp.MustCompile(`[^a-zA-Z0-9]+`)
	strip := func(s string) string {
		return nonAlnum.ReplaceAllString(s, "")
	}
	return newSimpleOperator(strip, 3)
}

// makeCounter returns an operator that counts how often each string has been
// seen so far and emits the string together with its updated count. The
// shared map is guarded by a mutex.
func makeCounter() *simpleOperator[string, strCnt] {
	var mu sync.Mutex
	seen := make(map[string]int)
	count := func(s string) strCnt {
		mu.Lock()
		defer mu.Unlock()
		seen[s]++
		return strCnt{str: s, cnt: seen[s]}
	}
	return newSimpleOperator(count, 3)
}

// makeCollector returns a sink that remembers the highest count it has
// received and stores the matching string in *v. Only a strictly larger
// count replaces the current one. Updates are serialized by a mutex.
func makeCollector(v *string) *simpleSink[strCnt] {
	var (
		mu   sync.Mutex
		best int
	)
	return newSimpleSink(func(sc strCnt) {
		mu.Lock()
		defer mu.Unlock()
		if sc.cnt <= best {
			return
		}
		best, *v = sc.cnt, sc.str
	})
}
Loading

0 comments on commit 1ea6499

Please sign in to comment.