Commit 1e5bb2d

address comments
winoros committed Dec 4, 2019
1 parent 3a26798 commit 1e5bb2d
Showing 2 changed files with 8 additions and 74 deletions.
21 changes: 8 additions & 13 deletions executor/aggregate.go
@@ -90,8 +90,9 @@ type HashAggFinalWorker struct {
 
 // AfFinalResult indicates aggregation functions final result.
 type AfFinalResult struct {
-	chk *chunk.Chunk
-	err error
+	chk        *chunk.Chunk
+	err        error
+	giveBackCh chan *chunk.Chunk
 }
 
 // HashAggExec deals with all the aggregate functions.
@@ -150,7 +151,6 @@ type HashAggExec struct {
 
 	finishCh         chan struct{}
 	finalOutputCh    chan *AfFinalResult
-	finalInputCh     chan *chunk.Chunk
 	partialOutputChs []chan *HashAggIntermData
 	inputCh          chan *HashAggInput
 	partialInputChs  []chan *chunk.Chunk
@@ -271,10 +271,6 @@ func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) {
 	partialConcurrency := sessionVars.HashAggPartialConcurrency
 	e.isChildReturnEmpty = true
 	e.finalOutputCh = make(chan *AfFinalResult, finalConcurrency)
-	e.finalInputCh = make(chan *chunk.Chunk, finalConcurrency)
-	for i := 0; i < finalConcurrency; i++ {
-		e.finalInputCh <- newFirstChunk(e)
-	}
 	e.inputCh = make(chan *HashAggInput, partialConcurrency)
 	e.finishCh = make(chan struct{}, 1)
 
@@ -319,11 +315,12 @@ func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) {
 			groupSet:            set.NewStringSet(),
 			inputCh:             e.partialOutputChs[i],
 			outputCh:            e.finalOutputCh,
-			finalResultHolderCh: e.finalInputCh,
+			finalResultHolderCh: make(chan *chunk.Chunk, 1),
 			rowBuffer:           make([]types.Datum, 0, e.Schema().Len()),
 			mutableRow:          chunk.MutRowFromTypes(retTypes(e)),
 			groupKeys:           make([][]byte, 0, 8),
 		}
+		e.finalWorkers[i].finalResultHolderCh <- newFirstChunk(e)
 	}
 }

@@ -550,7 +547,7 @@ func (w *HashAggFinalWorker) getFinalResult(sctx sessionctx.Context) {
 			}
 		}
 	}
-	w.outputCh <- &AfFinalResult{chk: result}
+	w.outputCh <- &AfFinalResult{chk: result, giveBackCh: w.finalResultHolderCh}
 }
 
 func (w *HashAggFinalWorker) receiveFinalResultHolder() (*chunk.Chunk, bool) {
@@ -683,13 +680,11 @@ func (e *HashAggExec) parallelExec(ctx context.Context, chk *chunk.Chunk) error {
 		if result.err != nil {
 			return result.err
 		}
-		chk.Append(result.chk, 0, result.chk.NumRows())
+		chk.SwapColumns(result.chk)
 		result.chk.Reset()
-		e.finalInputCh <- result.chk
+		result.giveBackCh <- result.chk
 		if chk.NumRows() > 0 {
 			e.isChildReturnEmpty = false
-		}
-		if chk.IsFull() {
 			return nil
 		}
 	}
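
Taken together, the aggregate.go hunks replace the shared e.finalInputCh pool with one single-slot holder channel per final worker; every AfFinalResult now carries giveBackCh, so the main goroutine hands each chunk back to exactly the worker that produced it. Below is a minimal, self-contained sketch of this give-back pattern, with hypothetical names and plain []int buffers standing in for chunk.Chunk — an illustration of the idea, not TiDB's code:

package main

import (
	"fmt"
	"sync"
)

// result mirrors AfFinalResult: a payload plus the channel through
// which the consumer returns the buffer to its owning producer.
type result struct {
	data       []int
	giveBackCh chan []int // owned by exactly one producer
}

func main() {
	outputCh := make(chan *result, 4)
	var wg sync.WaitGroup

	// Each producer owns a 1-slot holder channel, seeded with one
	// reusable buffer (cf. finalResultHolderCh <- newFirstChunk(e)).
	for w := 0; w < 4; w++ {
		wg.Add(1)
		holderCh := make(chan []int, 1)
		holderCh <- make([]int, 0, 8)
		go func(id int, holderCh chan []int) {
			defer wg.Done()
			for i := 0; i < 3; i++ {
				buf := <-holderCh // block until the consumer gives the buffer back
				buf = append(buf, id*10+i)
				outputCh <- &result{data: buf, giveBackCh: holderCh}
			}
		}(w, holderCh)
	}

	go func() {
		wg.Wait()
		close(outputCh)
	}()

	for r := range outputCh {
		fmt.Println(r.data)
		r.giveBackCh <- r.data[:0] // recycle the buffer to its owner
	}
}

Because each holder channel has capacity 1 and a single owner, the give-back send never blocks, and a worker waits only for its own chunk instead of competing with every other worker for a shared pool.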
61 changes: 0 additions & 61 deletions executor/executor_required_rows_test.go
@@ -675,67 +675,6 @@ func (s *testExecSuite) TestStreamAggRequiredRows(c *C) {
 	}
 }
 
-func (s *testExecSuite) TestHashAggParallelRequiredRows(c *C) {
-	maxChunkSize := defaultCtx().GetSessionVars().MaxChunkSize
-	testCases := []struct {
-		totalRows      int
-		aggFunc        string
-		requiredRows   []int
-		expectedRows   []int
-		expectedRowsDS []int
-		gen            func(valType *types.FieldType) interface{}
-	}{
-		{
-			totalRows:      maxChunkSize,
-			aggFunc:        ast.AggFuncSum,
-			requiredRows:   []int{1, 2, 3, 4, 5, 6, 7},
-			expectedRows:   []int{1, 2, 3, 4, 5, 6, 7},
-			expectedRowsDS: []int{maxChunkSize, 0},
-			gen:            divGenerator(1),
-		},
-		{
-			totalRows:      maxChunkSize * 3,
-			aggFunc:        ast.AggFuncAvg,
-			requiredRows:   []int{1, 3},
-			expectedRows:   []int{1, 2},
-			expectedRowsDS: []int{maxChunkSize, maxChunkSize, maxChunkSize, 0},
-			gen:            divGenerator(maxChunkSize),
-		},
-		{
-			totalRows:      maxChunkSize * 3,
-			aggFunc:        ast.AggFuncAvg,
-			requiredRows:   []int{maxChunkSize, maxChunkSize},
-			expectedRows:   []int{maxChunkSize, maxChunkSize / 2},
-			expectedRowsDS: []int{maxChunkSize, maxChunkSize, maxChunkSize, 0},
-			gen:            divGenerator(2),
-		},
-	}
-
-	for _, hasDistinct := range []bool{false, true} {
-		for _, testCase := range testCases {
-			sctx := defaultCtx()
-			ctx := context.Background()
-			ds := newRequiredRowsDataSourceWithGenerator(sctx, testCase.totalRows, testCase.expectedRowsDS, testCase.gen)
-			childCols := ds.Schema().Columns
-			schema := expression.NewSchema(childCols...)
-			groupBy := []expression.Expression{childCols[1]}
-			aggFunc, err := aggregation.NewAggFuncDesc(sctx, testCase.aggFunc, []expression.Expression{childCols[0]}, hasDistinct)
-			c.Assert(err, IsNil)
-			aggFuncs := []*aggregation.AggFuncDesc{aggFunc}
-			exec := buildHashAggExecutor(sctx, ds, schema, aggFuncs, groupBy)
-			c.Assert(exec.Open(ctx), IsNil)
-			chk := newFirstChunk(exec)
-			for i := range testCase.requiredRows {
-				chk.SetRequiredRows(testCase.requiredRows[i], maxChunkSize)
-				c.Assert(exec.Next(ctx, chk), IsNil)
-				c.Assert(chk.NumRows(), Equals, testCase.expectedRows[i])
-			}
-			c.Assert(exec.Close(), IsNil)
-			c.Assert(ds.checkNumNextCalled(), IsNil)
-		}
-	}
-}
-
 func (s *testExecSuite) TestMergeJoinRequiredRows(c *C) {
 	justReturn1 := func(valType *types.FieldType) interface{} {
 		switch valType.Tp {
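
For context, the deleted TestHashAggParallelRequiredRows exercised required-rows propagation, which the old chk.Append path in parallelExec could honor by copying only part of a worker's result; after the switch to chk.SwapColumns, the caller's chunk takes a worker's columns wholesale, so a required-rows limit can no longer trim the output, and the test is dropped. A rough slice-based illustration of the two semantics (hypothetical helpers, not TiDB's chunk API):

package main

import "fmt"

// appendRows mimics chk.Append(result, 0, n): copy only up to the
// caller's required row count, so partial fills are possible.
func appendRows(dst, src []int, requiredRows int) []int {
	n := requiredRows - len(dst)
	if n > len(src) {
		n = len(src)
	}
	return append(dst, src[:n]...)
}

// swapColumns mimics chk.SwapColumns(result): the destination takes
// the source's backing data wholesale; no trimming can happen.
func swapColumns(dst, src *[]int) {
	*dst, *src = *src, *dst
}

func main() {
	worker := []int{1, 2, 3, 4, 5}

	dst := make([]int, 0, 3)
	dst = appendRows(dst, worker, 3) // honors a required-rows limit of 3
	fmt.Println(dst)                 // [1 2 3]

	var out []int
	swapColumns(&out, &worker) // the whole chunk moves; no limit applies
	fmt.Println(out)           // [1 2 3 4 5]
}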