cherry-pick: make tidb being aware of dst. #7303

Closed
wants to merge 1 commit
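For context on the diff below: requests pushed down to the coprocessor previously carried only a fixed UTC offset (TimeZoneOffset), which cannot represent a zone that observes daylight saving time, and row decoding used the same fixed offset. This cherry-pick switches decoding to the session's full *time.Location and additionally sends the zone name (TimeZoneName). A minimal, self-contained sketch of the underlying problem; the zone name below is only an example, not something taken from this PR:

package main

import (
	"fmt"
	"time"
)

func main() {
	// A DST-observing zone has two different UTC offsets over the year, so a
	// single fixed offset cannot stand in for the zone itself.
	loc, err := time.LoadLocation("America/Los_Angeles") // example zone only
	if err != nil {
		panic(err)
	}
	_, winterOff := time.Date(2018, time.January, 15, 12, 0, 0, 0, loc).Zone()
	_, summerOff := time.Date(2018, time.July, 15, 12, 0, 0, 0, loc).Zone()
	fmt.Println(winterOff, summerOff) // -28800 (UTC-8) vs. -25200 (UTC-7)
}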
2 changes: 1 addition & 1 deletion ddl/column_test.go
@@ -275,7 +275,7 @@ func (s *testColumnSuite) checkColumnKVExist(ctx sessionctx.Context, t table.Tab
}
colMap := make(map[int64]*types.FieldType)
colMap[col.ID] = &col.FieldType
- rowMap, err := tablecodec.DecodeRow(data, colMap, ctx.GetSessionVars().GetTimeZone())
+ rowMap, err := tablecodec.DecodeRow(data, colMap, ctx.GetSessionVars().Location())
if err != nil {
return errors.Trace(err)
}
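The test above now passes ctx.GetSessionVars().Location() instead of GetTimeZone(); the same substitution appears in distsql/stream.go and executor/analyze.go further down. A hedged sketch of what such an accessor amounts to, assuming the session keeps an optional *time.Location and falls back to the system zone when none is set (the struct and the fallback choice are illustrative, not the actual SessionVars code):

package sketch

import "time"

// sessionVars is a stand-in for the real session-variable struct; only the
// time-zone field matters here.
type sessionVars struct {
	TimeZone *time.Location
}

// Location always yields a usable *time.Location, so decoders keep full DST
// rules instead of a frozen numeric offset.
func (s *sessionVars) Location() *time.Location {
	if s.TimeZone == nil {
		return time.Local // assumption: default to the system time zone
	}
	return s.TimeZone
}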
199 changes: 199 additions & 0 deletions distsql/select_result.go
@@ -0,0 +1,199 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package distsql

import (
"time"

"github.com/juju/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/goroutine_pool"
"github.com/pingcap/tipb/go-tipb"
"golang.org/x/net/context"
)

var (
_ SelectResult = (*selectResult)(nil)
_ SelectResult = (*streamResult)(nil)
)

var (
selectResultGP = gp.New(time.Minute * 2)
)

// SelectResult is an iterator of coprocessor partial results.
type SelectResult interface {
// Fetch fetches partial results from client.
Fetch(context.Context)
// NextRaw gets the next raw result.
NextRaw(context.Context) ([]byte, error)
// Next reads the data into chunk.
Next(context.Context, *chunk.Chunk) error
// Close closes the iterator.
Close() error
}

type resultWithErr struct {
result kv.ResultSubset
err error
}

type selectResult struct {
label string
resp kv.Response

results chan resultWithErr
closed chan struct{}

rowLen int
fieldTypes []*types.FieldType
ctx sessionctx.Context

selectResp *tipb.SelectResponse
respChkIdx int

feedback *statistics.QueryFeedback
partialCount int64 // number of partial results.
}

func (r *selectResult) Fetch(ctx context.Context) {
selectResultGP.Go(func() {
r.fetch(ctx)
})
}

func (r *selectResult) fetch(ctx context.Context) {
startTime := time.Now()
defer func() {
close(r.results)
duration := time.Since(startTime)
metrics.DistSQLQueryHistgram.WithLabelValues(r.label).Observe(duration.Seconds())
}()
for {
resultSubset, err := r.resp.Next(ctx)
if err != nil {
r.results <- resultWithErr{err: errors.Trace(err)}
return
}
if resultSubset == nil {
return
}

select {
case r.results <- resultWithErr{result: resultSubset}:
case <-r.closed:
// If Close() was already called on selectResult, let the fetch goroutine exit.
return
case <-ctx.Done():
return
}
}
}

// NextRaw returns the next raw partial result.
func (r *selectResult) NextRaw(ctx context.Context) ([]byte, error) {
re := <-r.results
r.partialCount++
r.feedback.Invalidate()
if re.result == nil || re.err != nil {
return nil, errors.Trace(re.err)
}
return re.result.GetData(), nil
}

// Next reads the response data into chk.
func (r *selectResult) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
for chk.NumRows() < r.ctx.GetSessionVars().MaxChunkSize {
if r.selectResp == nil || r.respChkIdx == len(r.selectResp.Chunks) {
err := r.getSelectResp()
if err != nil || r.selectResp == nil {
return errors.Trace(err)
}
}
err := r.readRowsData(chk)
if err != nil {
return errors.Trace(err)
}
if len(r.selectResp.Chunks[r.respChkIdx].RowsData) == 0 {
r.respChkIdx++
}
}
return nil
}

func (r *selectResult) getSelectResp() error {
r.respChkIdx = 0
for {
re := <-r.results
if re.err != nil {
return errors.Trace(re.err)
}
if re.result == nil {
r.selectResp = nil
return nil
}
r.selectResp = new(tipb.SelectResponse)
err := r.selectResp.Unmarshal(re.result.GetData())
if err != nil {
return errors.Trace(err)
}
if err := r.selectResp.Error; err != nil {
return terror.ClassTiKV.New(terror.ErrCode(err.Code), err.Msg)
}
for _, warning := range r.selectResp.Warnings {
r.ctx.GetSessionVars().StmtCtx.AppendWarning(terror.ClassTiKV.New(terror.ErrCode(warning.Code), warning.Msg))
}
r.feedback.Update(re.result.GetStartKey(), r.selectResp.OutputCounts)
r.partialCount++
if len(r.selectResp.Chunks) == 0 {
continue
}
return nil
}
}

func (r *selectResult) readRowsData(chk *chunk.Chunk) (err error) {
rowsData := r.selectResp.Chunks[r.respChkIdx].RowsData
maxChunkSize := r.ctx.GetSessionVars().MaxChunkSize
decoder := codec.NewDecoder(chk, r.ctx.GetSessionVars().Location())
for chk.NumRows() < maxChunkSize && len(rowsData) > 0 {
for i := 0; i < r.rowLen; i++ {
rowsData, err = decoder.DecodeOne(rowsData, i, r.fieldTypes[i])
if err != nil {
return errors.Trace(err)
}
}
}
r.selectResp.Chunks[r.respChkIdx].RowsData = rowsData
return nil
}

// Close closes selectResult.
func (r *selectResult) Close() error {
if r.feedback.Actual() >= 0 {
metrics.DistSQLScanKeysHistogram.Observe(float64(r.feedback.Actual()))
}
metrics.DistSQLPartialCountHistogram.Observe(float64(r.partialCount))
// Closing this channel tells the fetch goroutine to exit.
close(r.closed)
return r.resp.Close()
}
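As a usage note for distsql/select_result.go above: a caller starts the background fetch once, then repeatedly fills chunks until an empty chunk signals that all partial results are consumed, and finally closes the result. A rough sketch under those assumptions, reusing the imports of the file above (the driver function itself is hypothetical):

// drainSelectResult is a hypothetical consumer: Fetch starts the producer
// goroutine, Next drains chunks, and Close stops the producer and releases
// the underlying kv.Response.
func drainSelectResult(ctx context.Context, result SelectResult, chk *chunk.Chunk) error {
	result.Fetch(ctx)
	defer result.Close() // error ignored in this sketch
	for {
		if err := result.Next(ctx, chk); err != nil {
			return errors.Trace(err)
		}
		if chk.NumRows() == 0 {
			return nil // no more data
		}
		// ... consume the rows in chk here ...
	}
}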
2 changes: 1 addition & 1 deletion distsql/stream.go
@@ -113,7 +113,7 @@ func (r *streamResult) readDataIfNecessary(ctx context.Context) error {
func (r *streamResult) flushToChunk(chk *chunk.Chunk) (err error) {
remainRowsData := r.curr.RowsData
maxChunkSize := r.ctx.GetSessionVars().MaxChunkSize
- decoder := codec.NewDecoder(chk, r.ctx.GetSessionVars().GetTimeZone())
+ decoder := codec.NewDecoder(chk, r.ctx.GetSessionVars().Location())
for chk.NumRows() < maxChunkSize && len(remainRowsData) > 0 {
for i := 0; i < r.rowLen; i++ {
remainRowsData, err = decoder.DecodeOne(remainRowsData, i, r.fieldTypes[i])
6 changes: 3 additions & 3 deletions executor/admin.go
@@ -121,7 +121,7 @@ func (e *CheckIndexRangeExec) Open(ctx context.Context) error {
func (e *CheckIndexRangeExec) buildDAGPB() (*tipb.DAGRequest, error) {
dagReq := &tipb.DAGRequest{}
dagReq.StartTs = e.ctx.Txn().StartTS()
- dagReq.TimeZoneOffset = timeZoneOffset(e.ctx)
+ dagReq.TimeZoneName, dagReq.TimeZoneOffset = zone(e.ctx)
sc := e.ctx.GetSessionVars().StmtCtx
dagReq.Flags = statementContextToFlags(sc)
for i := range e.schema.Columns {
@@ -219,7 +219,7 @@ func (e *RecoverIndexExec) constructLimitPB(count uint64) *tipb.Executor {
func (e *RecoverIndexExec) buildDAGPB(txn kv.Transaction, limitCnt uint64) (*tipb.DAGRequest, error) {
dagReq := &tipb.DAGRequest{}
dagReq.StartTs = txn.StartTS()
- dagReq.TimeZoneOffset = timeZoneOffset(e.ctx)
+ dagReq.TimeZoneName, dagReq.TimeZoneOffset = zone(e.ctx)
sc := e.ctx.GetSessionVars().StmtCtx
dagReq.Flags = statementContextToFlags(sc)
for i := range e.columns {
@@ -647,7 +647,7 @@ func (e *CleanupIndexExec) Open(ctx context.Context) error {
func (e *CleanupIndexExec) buildIdxDAGPB(txn kv.Transaction) (*tipb.DAGRequest, error) {
dagReq := &tipb.DAGRequest{}
dagReq.StartTs = txn.StartTS()
- dagReq.TimeZoneOffset = timeZoneOffset(e.ctx)
+ dagReq.TimeZoneName, dagReq.TimeZoneOffset = zone(e.ctx)
sc := e.ctx.GetSessionVars().StmtCtx
dagReq.Flags = statementContextToFlags(sc)
for i := range e.idxCols {
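The three hunks above replace timeZoneOffset(e.ctx) with zone(e.ctx), so the DAG request carries both TimeZoneName and TimeZoneOffset. A hypothetical sketch of what such a helper can compute from a *time.Location (an illustration of the idea, not the PR's actual implementation):

package sketch

import "time"

// zoneNameAndOffset derives the two fields pushed down in the DAG request:
// the zone name lets the receiving side load full DST rules, while the offset
// in seconds is kept as a plain numeric fallback.
func zoneNameAndOffset(loc *time.Location) (name string, offset int64) {
	if loc == nil {
		loc = time.Local // assumption: fall back to the system zone
	}
	_, off := time.Now().In(loc).Zone()
	return loc.String(), int64(off)
}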
2 changes: 1 addition & 1 deletion executor/analyze.go
@@ -373,7 +373,7 @@ func (e *AnalyzeColumnsExec) buildStats() (hists []*statistics.Histogram, cms []
collectors[i].MergeSampleCollector(sc, statistics.SampleCollectorFromProto(rc))
}
}
- timeZone := e.ctx.GetSessionVars().GetTimeZone()
+ timeZone := e.ctx.GetSessionVars().Location()
if e.pkInfo != nil {
pkHist.ID = e.pkInfo.ID
err = pkHist.DecodeTo(&e.pkInfo.FieldType, timeZone)