From d27bab6002110d2dbc5e7d60c298de05aaad9c21 Mon Sep 17 00:00:00 2001 From: Zhexuan Yang Date: Mon, 16 Jul 2018 18:15:19 +0800 Subject: [PATCH] executor, store: fixed daylight saving time issue (#6823) --- ddl/column_test.go | 2 +- distsql/select_result.go | 199 +++++++++++ distsql/stream.go | 2 +- executor/admin.go | 6 +- executor/analyze.go | 2 +- executor/builder.go | 352 +++++++++++++++----- executor/distsql.go | 17 +- executor/executor_test.go | 1 + executor/prepared.go | 2 +- expression/builtin_time.go | 14 +- expression/builtin_time_test.go | 2 +- expression/helper.go | 2 +- expression/integration_test.go | 19 +- sessionctx/variable/session.go | 4 +- store/mockstore/mocktikv/cop_handler_dag.go | 58 +++- store/tikv/gcworker/gc_worker.go | 49 +-- store/tikv/gcworker/gc_worker_test.go | 18 +- table/tables/tables.go | 4 +- util/admin/admin.go | 6 +- 19 files changed, 617 insertions(+), 142 deletions(-) create mode 100644 distsql/select_result.go diff --git a/ddl/column_test.go b/ddl/column_test.go index 7098d49beb5ca..a2b0791de98ef 100644 --- a/ddl/column_test.go +++ b/ddl/column_test.go @@ -275,7 +275,7 @@ func (s *testColumnSuite) checkColumnKVExist(ctx sessionctx.Context, t table.Tab } colMap := make(map[int64]*types.FieldType) colMap[col.ID] = &col.FieldType - rowMap, err := tablecodec.DecodeRow(data, colMap, ctx.GetSessionVars().GetTimeZone()) + rowMap, err := tablecodec.DecodeRow(data, colMap, ctx.GetSessionVars().Location()) if err != nil { return errors.Trace(err) } diff --git a/distsql/select_result.go b/distsql/select_result.go new file mode 100644 index 0000000000000..835104090594a --- /dev/null +++ b/distsql/select_result.go @@ -0,0 +1,199 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package distsql + +import ( + "time" + + "github.com/juju/errors" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/metrics" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/statistics" + "github.com/pingcap/tidb/terror" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/codec" + "github.com/pingcap/tidb/util/goroutine_pool" + "github.com/pingcap/tipb/go-tipb" + "golang.org/x/net/context" +) + +var ( + _ SelectResult = (*selectResult)(nil) + _ SelectResult = (*streamResult)(nil) +) + +var ( + selectResultGP = gp.New(time.Minute * 2) +) + +// SelectResult is an iterator of coprocessor partial results. +type SelectResult interface { + // Fetch fetches partial results from client. + Fetch(context.Context) + // NextRaw gets the next raw result. + NextRaw(context.Context) ([]byte, error) + // Next reads the data into chunk. + Next(context.Context, *chunk.Chunk) error + // Close closes the iterator. + Close() error +} + +type resultWithErr struct { + result kv.ResultSubset + err error +} + +type selectResult struct { + label string + resp kv.Response + + results chan resultWithErr + closed chan struct{} + + rowLen int + fieldTypes []*types.FieldType + ctx sessionctx.Context + + selectResp *tipb.SelectResponse + respChkIdx int + + feedback *statistics.QueryFeedback + partialCount int64 // number of partial results. +} + +func (r *selectResult) Fetch(ctx context.Context) { + selectResultGP.Go(func() { + r.fetch(ctx) + }) +} + +func (r *selectResult) fetch(ctx context.Context) { + startTime := time.Now() + defer func() { + close(r.results) + duration := time.Since(startTime) + metrics.DistSQLQueryHistgram.WithLabelValues(r.label).Observe(duration.Seconds()) + }() + for { + resultSubset, err := r.resp.Next(ctx) + if err != nil { + r.results <- resultWithErr{err: errors.Trace(err)} + return + } + if resultSubset == nil { + return + } + + select { + case r.results <- resultWithErr{result: resultSubset}: + case <-r.closed: + // If selectResult called Close() already, make fetch goroutine exit. + return + case <-ctx.Done(): + return + } + } +} + +// NextRaw returns the next raw partial result. +func (r *selectResult) NextRaw(ctx context.Context) ([]byte, error) { + re := <-r.results + r.partialCount++ + r.feedback.Invalidate() + if re.result == nil || re.err != nil { + return nil, errors.Trace(re.err) + } + return re.result.GetData(), nil +} + +// Next reads data to the chunk. +func (r *selectResult) Next(ctx context.Context, chk *chunk.Chunk) error { + chk.Reset() + for chk.NumRows() < r.ctx.GetSessionVars().MaxChunkSize { + if r.selectResp == nil || r.respChkIdx == len(r.selectResp.Chunks) { + err := r.getSelectResp() + if err != nil || r.selectResp == nil { + return errors.Trace(err) + } + } + err := r.readRowsData(chk) + if err != nil { + return errors.Trace(err) + } + if len(r.selectResp.Chunks[r.respChkIdx].RowsData) == 0 { + r.respChkIdx++ + } + } + return nil +} + +func (r *selectResult) getSelectResp() error { + r.respChkIdx = 0 + for { + re := <-r.results + if re.err != nil { + return errors.Trace(re.err) + } + if re.result == nil { + r.selectResp = nil + return nil + } + r.selectResp = new(tipb.SelectResponse) + err := r.selectResp.Unmarshal(re.result.GetData()) + if err != nil { + return errors.Trace(err) + } + if err := r.selectResp.Error; err != nil { + return terror.ClassTiKV.New(terror.ErrCode(err.Code), err.Msg) + } + for _, warning := range r.selectResp.Warnings { + r.ctx.GetSessionVars().StmtCtx.AppendWarning(terror.ClassTiKV.New(terror.ErrCode(warning.Code), warning.Msg)) + } + r.feedback.Update(re.result.GetStartKey(), r.selectResp.OutputCounts) + r.partialCount++ + if len(r.selectResp.Chunks) == 0 { + continue + } + return nil + } +} + +func (r *selectResult) readRowsData(chk *chunk.Chunk) (err error) { + rowsData := r.selectResp.Chunks[r.respChkIdx].RowsData + maxChunkSize := r.ctx.GetSessionVars().MaxChunkSize + decoder := codec.NewDecoder(chk, r.ctx.GetSessionVars().Location()) + for chk.NumRows() < maxChunkSize && len(rowsData) > 0 { + for i := 0; i < r.rowLen; i++ { + rowsData, err = decoder.DecodeOne(rowsData, i, r.fieldTypes[i]) + if err != nil { + return errors.Trace(err) + } + } + } + r.selectResp.Chunks[r.respChkIdx].RowsData = rowsData + return nil +} + +// Close closes selectResult. +func (r *selectResult) Close() error { + // Close this channel tell fetch goroutine to exit. + if r.feedback.Actual() >= 0 { + metrics.DistSQLScanKeysHistogram.Observe(float64(r.feedback.Actual())) + } + metrics.DistSQLPartialCountHistogram.Observe(float64(r.partialCount)) + close(r.closed) + return r.resp.Close() +} diff --git a/distsql/stream.go b/distsql/stream.go index b91066f36e305..d42b64c99bab4 100644 --- a/distsql/stream.go +++ b/distsql/stream.go @@ -113,7 +113,7 @@ func (r *streamResult) readDataIfNecessary(ctx context.Context) error { func (r *streamResult) flushToChunk(chk *chunk.Chunk) (err error) { remainRowsData := r.curr.RowsData maxChunkSize := r.ctx.GetSessionVars().MaxChunkSize - decoder := codec.NewDecoder(chk, r.ctx.GetSessionVars().GetTimeZone()) + decoder := codec.NewDecoder(chk, r.ctx.GetSessionVars().Location()) for chk.NumRows() < maxChunkSize && len(remainRowsData) > 0 { for i := 0; i < r.rowLen; i++ { remainRowsData, err = decoder.DecodeOne(remainRowsData, i, r.fieldTypes[i]) diff --git a/executor/admin.go b/executor/admin.go index 63e43a8cd8ec2..6fadef321aa20 100644 --- a/executor/admin.go +++ b/executor/admin.go @@ -121,7 +121,7 @@ func (e *CheckIndexRangeExec) Open(ctx context.Context) error { func (e *CheckIndexRangeExec) buildDAGPB() (*tipb.DAGRequest, error) { dagReq := &tipb.DAGRequest{} dagReq.StartTs = e.ctx.Txn().StartTS() - dagReq.TimeZoneOffset = timeZoneOffset(e.ctx) + dagReq.TimeZoneName, dagReq.TimeZoneOffset = zone(e.ctx) sc := e.ctx.GetSessionVars().StmtCtx dagReq.Flags = statementContextToFlags(sc) for i := range e.schema.Columns { @@ -219,7 +219,7 @@ func (e *RecoverIndexExec) constructLimitPB(count uint64) *tipb.Executor { func (e *RecoverIndexExec) buildDAGPB(txn kv.Transaction, limitCnt uint64) (*tipb.DAGRequest, error) { dagReq := &tipb.DAGRequest{} dagReq.StartTs = txn.StartTS() - dagReq.TimeZoneOffset = timeZoneOffset(e.ctx) + dagReq.TimeZoneName, dagReq.TimeZoneOffset = zone(e.ctx) sc := e.ctx.GetSessionVars().StmtCtx dagReq.Flags = statementContextToFlags(sc) for i := range e.columns { @@ -647,7 +647,7 @@ func (e *CleanupIndexExec) Open(ctx context.Context) error { func (e *CleanupIndexExec) buildIdxDAGPB(txn kv.Transaction) (*tipb.DAGRequest, error) { dagReq := &tipb.DAGRequest{} dagReq.StartTs = txn.StartTS() - dagReq.TimeZoneOffset = timeZoneOffset(e.ctx) + dagReq.TimeZoneName, dagReq.TimeZoneOffset = zone(e.ctx) sc := e.ctx.GetSessionVars().StmtCtx dagReq.Flags = statementContextToFlags(sc) for i := range e.idxCols { diff --git a/executor/analyze.go b/executor/analyze.go index 31669b5e0a0ea..4a81130c82ab1 100644 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -373,7 +373,7 @@ func (e *AnalyzeColumnsExec) buildStats() (hists []*statistics.Histogram, cms [] collectors[i].MergeSampleCollector(sc, statistics.SampleCollectorFromProto(rc)) } } - timeZone := e.ctx.GetSessionVars().GetTimeZone() + timeZone := e.ctx.GetSessionVars().Location() if e.pkInfo != nil { pkHist.ID = e.pkInfo.ID err = pkHist.DecodeTo(&e.pkInfo.FieldType, timeZone) diff --git a/executor/builder.go b/executor/builder.go index bb19feeff16f0..332615e8d6002 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -15,6 +15,7 @@ package executor import ( "bytes" + "fmt" "math" "sort" "strings" @@ -26,6 +27,7 @@ import ( "github.com/pingcap/tidb/ast" "github.com/pingcap/tidb/distsql" "github.com/pingcap/tidb/domain" + "github.com/pingcap/tidb/executor/aggfuncs" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/expression/aggregation" "github.com/pingcap/tidb/infoschema" @@ -40,27 +42,26 @@ import ( "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/admin" + "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/ranger" - tipb "github.com/pingcap/tipb/go-tipb" + "github.com/pingcap/tipb/go-tipb" "golang.org/x/net/context" ) // executorBuilder builds an Executor from a Plan. // The InfoSchema must not change during execution. type executorBuilder struct { - ctx sessionctx.Context - is infoschema.InfoSchema - priority int - startTS uint64 // cached when the first time getStartTS() is called + ctx sessionctx.Context + is infoschema.InfoSchema + startTS uint64 // cached when the first time getStartTS() is called // err is set when there is error happened during Executor building process. err error } -func newExecutorBuilder(ctx sessionctx.Context, is infoschema.InfoSchema, priority int) *executorBuilder { +func newExecutorBuilder(ctx sessionctx.Context, is infoschema.InfoSchema) *executorBuilder { return &executorBuilder{ - ctx: ctx, - is: is, - priority: priority, + ctx: ctx, + is: is, } } @@ -207,6 +208,8 @@ func (b *executorBuilder) buildShowDDL(v *plan.ShowDDL) Executor { func (b *executorBuilder) buildShowDDLJobs(v *plan.ShowDDLJobs) Executor { e := &ShowDDLJobsExec{ + jobNumber: v.JobNumber, + is: b.is, baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()), } return e @@ -226,7 +229,7 @@ func (b *executorBuilder) buildCheckIndex(v *plan.CheckIndex) Executor { b.err = errors.Trace(err) return nil } - readerExec.ranges = ranger.FullNewRange() + readerExec.ranges = ranger.FullRange() readerExec.isCheckOp = true e := &CheckIndexExec{ @@ -509,24 +512,22 @@ func (b *executorBuilder) buildInsert(v *plan.Insert) Executor { ivs := &InsertValues{ baseExecutor: baseExec, + Table: v.Table, Columns: v.Columns, Lists: v.Lists, - Setlist: v.Setlist, + SetList: v.SetList, GenColumns: v.GenCols.Columns, GenExprs: v.GenCols.Exprs, needFillDefaultValues: v.NeedFillDefaultValue, SelectExec: selectExec, } - ivs.Table = v.Table if v.IsReplace { return b.buildReplace(ivs) } insert := &InsertExec{ InsertValues: ivs, OnDuplicate: append(v.OnDuplicate, v.GenCols.OnDuplicates...), - Priority: v.Priority, - IgnoreErr: v.IgnoreErr, } return insert } @@ -550,18 +551,18 @@ func (b *executorBuilder) buildLoadData(v *plan.LoadData) Executor { b.err = errors.Trace(err) return nil } - loadDataExec := &LoadData{ + loadDataExec := &LoadDataExec{ baseExecutor: newBaseExecutor(b.ctx, nil, v.ExplainID()), IsLocal: v.IsLocal, loadDataInfo: &LoadDataInfo{ - row: make([]types.Datum, len(columns)), - insertVal: insertVal, - Path: v.Path, - Table: tbl, - FieldsInfo: v.FieldsInfo, - LinesInfo: v.LinesInfo, - Ctx: b.ctx, - columns: columns, + row: make([]types.Datum, len(columns)), + InsertValues: insertVal, + Path: v.Path, + Table: tbl, + FieldsInfo: v.FieldsInfo, + LinesInfo: v.LinesInfo, + Ctx: b.ctx, + columns: columns, }, } @@ -823,21 +824,135 @@ func (b *executorBuilder) buildHashJoin(v *plan.PhysicalHashJoin) Executor { return e } +// buildProjBelowAgg builds a ProjectionExec below AggregationExec. +// If all the args of `aggFuncs`, and all the item of `groupByItems` +// are columns or constants, we do not need to build the `proj`. +func (b *executorBuilder) buildProjBelowAgg(aggFuncs []*aggregation.AggFuncDesc, groupByItems []expression.Expression, src Executor) Executor { + hasScalarFunc := false + for i := 0; !hasScalarFunc && i < len(aggFuncs); i++ { + f := aggFuncs[i] + for _, arg := range f.Args { + _, isScalarFunc := arg.(*expression.ScalarFunction) + hasScalarFunc = hasScalarFunc || isScalarFunc + } + } + for i, isScalarFunc := 0, false; !hasScalarFunc && i < len(groupByItems); i++ { + _, isScalarFunc = groupByItems[i].(*expression.ScalarFunction) + hasScalarFunc = hasScalarFunc || isScalarFunc + } + if !hasScalarFunc { + return src + } + + b.ctx.GetSessionVars().PlanID++ + id := b.ctx.GetSessionVars().PlanID + projFromID := fmt.Sprintf("%s_%d", plan.TypeProj, id) + + projSchemaCols := make([]*expression.Column, 0, len(aggFuncs)+len(groupByItems)) + projExprs := make([]expression.Expression, 0, cap(projSchemaCols)) + cursor := 0 + for _, f := range aggFuncs { + for i, arg := range f.Args { + if _, isCnst := arg.(*expression.Constant); isCnst { + continue + } + projExprs = append(projExprs, arg) + newArg := &expression.Column{ + RetType: arg.GetType(), + ColName: model.NewCIStr(fmt.Sprintf("%s_%d", f.Name, i)), + Index: cursor, + } + projSchemaCols = append(projSchemaCols, newArg) + f.Args[i] = newArg + cursor++ + } + } + for i, item := range groupByItems { + if _, isCnst := item.(*expression.Constant); isCnst { + continue + } + projExprs = append(projExprs, item) + newArg := &expression.Column{ + RetType: item.GetType(), + ColName: model.NewCIStr(fmt.Sprintf("group_%d", i)), + Index: cursor, + } + projSchemaCols = append(projSchemaCols, newArg) + groupByItems[i] = newArg + cursor++ + } + + return &ProjectionExec{ + baseExecutor: newBaseExecutor(b.ctx, expression.NewSchema(projSchemaCols...), projFromID, src), + evaluatorSuit: expression.NewEvaluatorSuit(projExprs), + } +} + func (b *executorBuilder) buildHashAgg(v *plan.PhysicalHashAgg) Executor { src := b.build(v.Children()[0]) if b.err != nil { b.err = errors.Trace(b.err) return nil } + src = b.buildProjBelowAgg(v.AggFuncs, v.GroupByItems, src) + sessionVars := b.ctx.GetSessionVars() e := &HashAggExec{ baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), src), - sc: b.ctx.GetSessionVars().StmtCtx, + sc: sessionVars.StmtCtx, AggFuncs: make([]aggregation.Aggregation, 0, len(v.AggFuncs)), GroupByItems: v.GroupByItems, } + // We take `create table t(a int, b int);` as example. + // + // 1. If all the aggregation functions are FIRST_ROW, we do not need to set the defaultVal for them: + // e.g. + // mysql> select distinct a, b from t; + // 0 rows in set (0.00 sec) + // + // 2. If there exists group by items, we do not need to set the defaultVal for them either: + // e.g. + // mysql> select avg(a) from t group by b; + // Empty set (0.00 sec) + // + // mysql> select avg(a) from t group by a; + // +--------+ + // | avg(a) | + // +--------+ + // | NULL | + // +--------+ + // 1 row in set (0.00 sec) + if len(v.GroupByItems) != 0 || aggregation.IsAllFirstRow(v.AggFuncs) { + e.defaultVal = nil + } else { + e.defaultVal = chunk.NewChunkWithCapacity(e.retTypes(), 1) + } for _, aggDesc := range v.AggFuncs { - e.AggFuncs = append(e.AggFuncs, aggDesc.GetAggFunc()) + if aggDesc.HasDistinct { + e.isUnparallelExec = true + } } + // When we set both tidb_hashagg_final_concurrency and tidb_hashagg_partial_concurrency to 1, + // we do not need to parallelly execute hash agg, + // and this action can be a workaround when meeting some unexpected situation using parallelExec. + if finalCon, partialCon := sessionVars.HashAggFinalConcurrency, sessionVars.HashAggPartialConcurrency; finalCon <= 0 || partialCon <= 0 || finalCon == 1 && partialCon == 1 { + e.isUnparallelExec = true + } + for i, aggDesc := range v.AggFuncs { + if !e.isUnparallelExec { + if aggDesc.Mode == aggregation.CompleteMode { + aggDesc.Mode = aggregation.Partial1Mode + } else { + aggDesc.Mode = aggregation.Partial2Mode + } + } + aggFunc := aggDesc.GetAggFunc() + e.AggFuncs = append(e.AggFuncs, aggFunc) + if e.defaultVal != nil { + value := aggFunc.GetDefaultValue() + e.defaultVal.AppendDatum(i, &value) + } + } + metrics.ExecutorCounter.WithLabelValues("HashAggExec").Inc() return e } @@ -848,14 +963,38 @@ func (b *executorBuilder) buildStreamAgg(v *plan.PhysicalStreamAgg) Executor { b.err = errors.Trace(b.err) return nil } + src = b.buildProjBelowAgg(v.AggFuncs, v.GroupByItems, src) e := &StreamAggExec{ baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), src), StmtCtx: b.ctx.GetSessionVars().StmtCtx, AggFuncs: make([]aggregation.Aggregation, 0, len(v.AggFuncs)), GroupByItems: v.GroupByItems, } - for _, aggDesc := range v.AggFuncs { - e.AggFuncs = append(e.AggFuncs, aggDesc.GetAggFunc()) + if len(v.GroupByItems) != 0 || aggregation.IsAllFirstRow(v.AggFuncs) { + e.defaultVal = nil + } else { + e.defaultVal = chunk.NewChunkWithCapacity(e.retTypes(), 1) + } + newAggFuncs := make([]aggfuncs.AggFunc, 0, len(v.AggFuncs)) + for i, aggDesc := range v.AggFuncs { + aggFunc := aggDesc.GetAggFunc() + e.AggFuncs = append(e.AggFuncs, aggFunc) + if e.defaultVal != nil { + value := aggFunc.GetDefaultValue() + e.defaultVal.AppendDatum(i, &value) + } + // For new aggregate evaluation framework. + newAggFunc := aggfuncs.Build(aggDesc, i) + if newAggFunc != nil { + newAggFuncs = append(newAggFuncs, newAggFunc) + } + } + + // Once we have successfully build all the aggregate functions to the new + // aggregate function execution framework, we can store them to the stream + // aggregate operator to indicate it using the new execution framework. + if len(newAggFuncs) == len(v.AggFuncs) { + e.newAggFuncs = newAggFuncs } metrics.ExecutorCounter.WithLabelValues("StreamAggExec").Inc() return e @@ -882,9 +1021,17 @@ func (b *executorBuilder) buildProjection(v *plan.PhysicalProjection) Executor { } e := &ProjectionExec{ baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec), + numWorkers: b.ctx.GetSessionVars().ProjectionConcurrency, evaluatorSuit: expression.NewEvaluatorSuit(v.Exprs), calculateNoDelay: v.CalculateNoDelay, } + + // If the calculation row count for this Projection operator is smaller + // than a Chunk size, we turn back to the un-parallel Projection + // implementation to reduce the goroutine overhead. + if v.StatsInfo().Count() < int64(b.ctx.GetSessionVars().MaxChunkSize) { + e.numWorkers = 0 + } return e } @@ -923,7 +1070,6 @@ func (b *executorBuilder) buildMemTable(v *plan.PhysicalMemTable) Executor { t: tb, columns: v.Columns, seekHandle: math.MinInt64, - ranges: v.Ranges, isVirtualTable: tb.Type() == table.VirtualTable, } return e @@ -975,11 +1121,9 @@ func (b *executorBuilder) buildApply(apply *plan.PhysicalApply) *NestedLoopApply return nil } joinSchema := expression.MergeSchema(leftChild.Schema(), rightChild.Schema()) - for _, cond := range v.EqualConditions { - col0 := cond.GetArgs()[0].(*expression.Column) - col0.ResolveIndices(joinSchema) - col1 := cond.GetArgs()[1].(*expression.Column) - col1.ResolveIndices(joinSchema) + // TODO: remove this. Do this in Apply's ResolveIndices. + for i, cond := range v.EqualConditions { + v.EqualConditions[i] = cond.ResolveIndices(joinSchema).(*expression.ScalarFunction) } otherConditions := append(expression.ScalarFuncs2Exprs(v.EqualConditions), v.OtherConditions...) defaultValues := v.DefaultValues @@ -1062,7 +1206,6 @@ func (b *executorBuilder) buildUpdate(v *plan.Update) Executor { SelectExec: selExec, OrderedList: v.OrderedList, tblID2table: tblID2table, - IgnoreErr: v.IgnoreErr, } return updateExec } @@ -1088,29 +1231,27 @@ func (b *executorBuilder) buildDelete(v *plan.Delete) Executor { } func (b *executorBuilder) buildAnalyzeIndexPushdown(task plan.AnalyzeIndexTask) *AnalyzeIndexExec { + _, offset := zone(b.ctx) e := &AnalyzeIndexExec{ ctx: b.ctx, tblInfo: task.TableInfo, idxInfo: task.IndexInfo, concurrency: b.ctx.GetSessionVars().IndexSerialScanConcurrency, - priority: b.priority, analyzePB: &tipb.AnalyzeReq{ Tp: tipb.AnalyzeType_TypeIndex, StartTs: math.MaxUint64, Flags: statementContextToFlags(b.ctx.GetSessionVars().StmtCtx), - TimeZoneOffset: timeZoneOffset(b.ctx), + TimeZoneOffset: offset, }, } e.analyzePB.IdxReq = &tipb.AnalyzeIndexReq{ BucketSize: maxBucketSize, NumColumns: int32(len(task.IndexInfo.Columns)), } - if !task.IndexInfo.Unique { - depth := int32(defaultCMSketchDepth) - width := int32(defaultCMSketchWidth) - e.analyzePB.IdxReq.CmsketchDepth = &depth - e.analyzePB.IdxReq.CmsketchWidth = &width - } + depth := int32(defaultCMSketchDepth) + width := int32(defaultCMSketchWidth) + e.analyzePB.IdxReq.CmsketchDepth = &depth + e.analyzePB.IdxReq.CmsketchWidth = &width return e } @@ -1121,19 +1262,20 @@ func (b *executorBuilder) buildAnalyzeColumnsPushdown(task plan.AnalyzeColumnsTa keepOrder = true cols = append([]*model.ColumnInfo{task.PKInfo}, cols...) } + + _, offset := zone(b.ctx) e := &AnalyzeColumnsExec{ ctx: b.ctx, tblInfo: task.TableInfo, colsInfo: task.ColsInfo, pkInfo: task.PKInfo, concurrency: b.ctx.GetSessionVars().DistSQLScanConcurrency, - priority: b.priority, keepOrder: keepOrder, analyzePB: &tipb.AnalyzeReq{ Tp: tipb.AnalyzeType_TypeColumn, StartTs: math.MaxUint64, Flags: statementContextToFlags(b.ctx.GetSessionVars().StmtCtx), - TimeZoneOffset: timeZoneOffset(b.ctx), + TimeZoneOffset: offset, }, } depth := int32(defaultCMSketchDepth) @@ -1142,7 +1284,7 @@ func (b *executorBuilder) buildAnalyzeColumnsPushdown(task plan.AnalyzeColumnsTa BucketSize: maxBucketSize, SampleSize: maxRegionSampleSize, SketchSize: maxSketchSize, - ColumnsInfo: plan.ColumnsToProto(cols, task.TableInfo.PKIsHandle), + ColumnsInfo: model.ColumnsToProto(cols, task.TableInfo.PKIsHandle), CmsketchDepth: &depth, CmsketchWidth: &width, } @@ -1178,24 +1320,62 @@ func (b *executorBuilder) buildAnalyze(v *plan.Analyze) Executor { return e } -func (b *executorBuilder) constructDAGReq(plans []plan.PhysicalPlan) (*tipb.DAGRequest, bool, error) { - dagReq := &tipb.DAGRequest{} - dagReq.StartTs = b.getStartTS() - dagReq.TimeZoneOffset = timeZoneOffset(b.ctx) - sc := b.ctx.GetSessionVars().StmtCtx - dagReq.Flags = statementContextToFlags(sc) +func constructDistExec(sctx sessionctx.Context, plans []plan.PhysicalPlan) ([]*tipb.Executor, bool, error) { streaming := true + executors := make([]*tipb.Executor, 0, len(plans)) for _, p := range plans { - execPB, err := p.ToPB(b.ctx) + execPB, err := p.ToPB(sctx) if err != nil { return nil, false, errors.Trace(err) } if !plan.SupportStreaming(p) { streaming = false } - dagReq.Executors = append(dagReq.Executors, execPB) + executors = append(executors, execPB) + } + return executors, streaming, nil +} + +func (b *executorBuilder) constructDAGReq(plans []plan.PhysicalPlan) (dagReq *tipb.DAGRequest, streaming bool, err error) { + dagReq = &tipb.DAGRequest{} + dagReq.StartTs = b.getStartTS() + dagReq.TimeZoneName, dagReq.TimeZoneOffset = zone(b.ctx) + sc := b.ctx.GetSessionVars().StmtCtx + dagReq.Flags = statementContextToFlags(sc) + dagReq.Executors, streaming, err = constructDistExec(b.ctx, plans) + return dagReq, streaming, errors.Trace(err) +} + +func (b *executorBuilder) corColInDistPlan(plans []plan.PhysicalPlan) bool { + for _, p := range plans { + x, ok := p.(*plan.PhysicalSelection) + if !ok { + continue + } + for _, cond := range x.Conditions { + if len(expression.ExtractCorColumns(cond)) > 0 { + return true + } + } } - return dagReq, streaming, nil + return false +} + +// corColInAccess checks whether there's correlated column in access conditions. +func (b *executorBuilder) corColInAccess(p plan.PhysicalPlan) bool { + var access []expression.Expression + switch x := p.(type) { + case *plan.PhysicalTableScan: + access = x.AccessCondition + case *plan.PhysicalIndexScan: + access = x.AccessCondition + } + for _, cond := range access { + if len(expression.ExtractCorColumns(cond)) > 0 { + return true + } + } + return false } func (b *executorBuilder) buildIndexLookUpJoin(v *plan.PhysicalIndexJoin) Executor { @@ -1284,15 +1464,20 @@ func buildNoRangeTableReader(b *executorBuilder, v *plan.PhysicalTableReader) (* ts := v.TablePlans[0].(*plan.PhysicalTableScan) table, _ := b.is.TableByID(ts.Table.ID) e := &TableReaderExecutor{ - baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()), - dagPB: dagReq, - tableID: ts.Table.ID, - table: table, - keepOrder: ts.KeepOrder, - desc: ts.Desc, - columns: ts.Columns, - priority: b.priority, - streaming: streaming, + baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()), + dagPB: dagReq, + tableID: ts.Table.ID, + table: table, + keepOrder: ts.KeepOrder, + desc: ts.Desc, + columns: ts.Columns, + streaming: streaming, + corColInFilter: b.corColInDistPlan(v.TablePlans), + corColInAccess: b.corColInAccess(v.TablePlans[0]), + plans: v.TablePlans, + } + if isPartition, partitionID := ts.IsPartition(); isPartition { + e.tableID = partitionID } if containsLimit(dagReq.Executors) { e.feedback = statistics.NewQueryFeedback(0, nil, 0, ts.Desc) @@ -1331,16 +1516,20 @@ func buildNoRangeIndexReader(b *executorBuilder, v *plan.PhysicalIndexReader) (* is := v.IndexPlans[0].(*plan.PhysicalIndexScan) table, _ := b.is.TableByID(is.Table.ID) e := &IndexReaderExecutor{ - baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()), - dagPB: dagReq, - tableID: is.Table.ID, - table: table, - index: is.Index, - keepOrder: is.KeepOrder, - desc: is.Desc, - columns: is.Columns, - priority: b.priority, - streaming: streaming, + baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()), + dagPB: dagReq, + tableID: is.Table.ID, + table: table, + index: is.Index, + keepOrder: is.KeepOrder, + desc: is.Desc, + columns: is.Columns, + streaming: streaming, + corColInFilter: b.corColInDistPlan(v.IndexPlans), + corColInAccess: b.corColInAccess(v.IndexPlans[0]), + idxCols: is.IdxCols, + colLens: is.IdxColLens, + plans: v.IndexPlans, } if containsLimit(dagReq.Executors) { e.feedback = statistics.NewQueryFeedback(0, nil, 0, is.Desc) @@ -1398,10 +1587,16 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plan.PhysicalIndexLook desc: is.Desc, tableRequest: tableReq, columns: is.Columns, - priority: b.priority, indexStreaming: indexStreaming, tableStreaming: tableStreaming, dataReaderBuilder: &dataReaderBuilder{executorBuilder: b}, + corColInIdxSide: b.corColInDistPlan(v.IndexPlans), + corColInTblSide: b.corColInDistPlan(v.TablePlans), + corColInAccess: b.corColInAccess(v.IndexPlans[0]), + idxCols: is.IdxCols, + colLens: is.IdxColLens, + idxPlans: v.IndexPlans, + tblPlans: v.TablePlans, } if containsLimit(indexReq.Executors) { e.feedback = statistics.NewQueryFeedback(0, nil, 0, is.Desc) @@ -1448,7 +1643,7 @@ type dataReaderBuilder struct { } func (builder *dataReaderBuilder) buildExecutorForIndexJoin(ctx context.Context, datums [][]types.Datum, - IndexRanges []*ranger.NewRange, keyOff2IdxOff []int) (Executor, error) { + IndexRanges []*ranger.Range, keyOff2IdxOff []int) (Executor, error) { switch v := builder.Plan.(type) { case *plan.PhysicalTableReader: return builder.buildTableReaderForIndexJoin(ctx, v, datums) @@ -1479,7 +1674,6 @@ func (builder *dataReaderBuilder) buildTableReaderFromHandles(ctx context.Contex SetDAGRequest(e.dagPB). SetDesc(e.desc). SetKeepOrder(e.keepOrder). - SetPriority(e.priority). SetStreaming(e.streaming). SetFromSessionVars(e.ctx.GetSessionVars()). Build() @@ -1497,7 +1691,7 @@ func (builder *dataReaderBuilder) buildTableReaderFromHandles(ctx context.Contex } func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Context, v *plan.PhysicalIndexReader, - values [][]types.Datum, indexRanges []*ranger.NewRange, keyOff2IdxOff []int) (Executor, error) { + values [][]types.Datum, indexRanges []*ranger.Range, keyOff2IdxOff []int) (Executor, error) { e, err := buildNoRangeIndexReader(builder.executorBuilder, v) if err != nil { return nil, errors.Trace(err) @@ -1511,7 +1705,7 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte } func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context.Context, v *plan.PhysicalIndexLookUpReader, - values [][]types.Datum, indexRanges []*ranger.NewRange, keyOff2IdxOff []int) (Executor, error) { + values [][]types.Datum, indexRanges []*ranger.Range, keyOff2IdxOff []int) (Executor, error) { e, err := buildNoRangeIndexLookUpReader(builder.executorBuilder, v) if err != nil { return nil, errors.Trace(err) @@ -1525,7 +1719,7 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context } // buildKvRangesForIndexJoin builds kv ranges for index join when the inner plan is index scan plan. -func buildKvRangesForIndexJoin(sc *stmtctx.StatementContext, tableID, indexID int64, keyDatums [][]types.Datum, indexRanges []*ranger.NewRange, keyOff2IdxOff []int) ([]kv.KeyRange, error) { +func buildKvRangesForIndexJoin(sc *stmtctx.StatementContext, tableID, indexID int64, keyDatums [][]types.Datum, indexRanges []*ranger.Range, keyOff2IdxOff []int) ([]kv.KeyRange, error) { kvRanges := make([]kv.KeyRange, 0, len(indexRanges)*len(keyDatums)) for _, val := range keyDatums { for _, ran := range indexRanges { diff --git a/executor/distsql.go b/executor/distsql.go index 465e6746226a3..e7381076727bf 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -117,11 +117,20 @@ func closeAll(objs ...Closeable) error { return errors.Trace(err) } -// timeZoneOffset returns the local time zone offset in seconds. -func timeZoneOffset(ctx sessionctx.Context) int64 { - loc := ctx.GetSessionVars().GetTimeZone() +// zone returns the current timezone name and timezone offset in seconds. +// In compatible with MySQL, we change `Local` to `System`. +// TODO: Golang team plan to return system timezone name intead of +// returning `Local` when `loc` is `time.Local`. We need keep an eye on this. +func zone(sctx sessionctx.Context) (string, int64) { + loc := sctx.GetSessionVars().Location() _, offset := time.Now().In(loc).Zone() - return int64(offset) + var name string + name = loc.String() + if name == "Local" { + name = "System" + } + + return name, int64(offset) } // Flags are used by tipb.SelectRequest.Flags to handle execution mode, like how to handle truncate error. diff --git a/executor/executor_test.go b/executor/executor_test.go index cab1c0f64909a..8b8f4f2373554 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -1924,6 +1924,7 @@ func (s *testSuite) TestTimestampTimeZone(c *C) { r.Check(testkit.Rows("123381351 1734 2014-03-31 08:57:10 127.0.0.1")) // Cover IndexLookupExec // For issue https://github.com/pingcap/tidb/issues/3485 + tk.MustExec("set time_zone = 'Asia/Shanghai'") tk.MustExec("drop table if exists t1") tk.MustExec(`CREATE TABLE t1 ( id bigint(20) NOT NULL AUTO_INCREMENT, diff --git a/executor/prepared.go b/executor/prepared.go index ae92d12a25c57..45e83a1be209e 100644 --- a/executor/prepared.go +++ b/executor/prepared.go @@ -269,7 +269,7 @@ func CompileExecutePreparedStmt(ctx sessionctx.Context, ID uint32, args ...inter func ResetStmtCtx(ctx sessionctx.Context, s ast.StmtNode) { sessVars := ctx.GetSessionVars() sc := new(stmtctx.StatementContext) - sc.TimeZone = sessVars.GetTimeZone() + sc.TimeZone = sessVars.Location() sc.MemTracker = memory.NewTracker(s.Text(), sessVars.MemQuotaQuery) switch config.GetGlobalConfig().OOMAction { case config.OOMActionCancel: diff --git a/expression/builtin_time.go b/expression/builtin_time.go index 289c4a01f804d..773e2d831e963 100644 --- a/expression/builtin_time.go +++ b/expression/builtin_time.go @@ -1889,8 +1889,8 @@ func (b *builtinSysDateWithFspSig) evalTime(row types.Row) (d types.Time, isNull return types.Time{}, isNull, errors.Trace(err) } - tz := b.ctx.GetSessionVars().GetTimeZone() - now := time.Now().In(tz) + loc := b.ctx.GetSessionVars().Location() + now := time.Now().In(loc) result, err := convertTimeToMysqlTime(now, int(fsp)) if err != nil { return types.Time{}, true, errors.Trace(err) @@ -1911,7 +1911,7 @@ func (b *builtinSysDateWithoutFspSig) Clone() builtinFunc { // evalTime evals SYSDATE(). // See https://dev.mysql.com/doc/refman/5.7/en/date-and-time-functions.html#function_sysdate func (b *builtinSysDateWithoutFspSig) evalTime(row types.Row) (d types.Time, isNull bool, err error) { - tz := b.ctx.GetSessionVars().GetTimeZone() + tz := b.ctx.GetSessionVars().Location() now := time.Now().In(tz) result, err := convertTimeToMysqlTime(now, 0) if err != nil { @@ -1947,7 +1947,7 @@ func (b *builtinCurrentDateSig) Clone() builtinFunc { // evalTime evals CURDATE(). // See https://dev.mysql.com/doc/refman/5.7/en/date-and-time-functions.html#function_curdate func (b *builtinCurrentDateSig) evalTime(row types.Row) (d types.Time, isNull bool, err error) { - tz := b.ctx.GetSessionVars().GetTimeZone() + tz := b.ctx.GetSessionVars().Location() year, month, day := time.Now().In(tz).Date() result := types.Time{ Time: types.FromDate(year, int(month), day, 0, 0, 0, 0), @@ -2002,7 +2002,7 @@ func (b *builtinCurrentTime0ArgSig) Clone() builtinFunc { } func (b *builtinCurrentTime0ArgSig) evalDuration(row types.Row) (types.Duration, bool, error) { - tz := b.ctx.GetSessionVars().GetTimeZone() + tz := b.ctx.GetSessionVars().Location() dur := time.Now().In(tz).Format(types.TimeFormat) res, err := types.ParseDuration(dur, types.MinFsp) if err != nil { @@ -2026,7 +2026,7 @@ func (b *builtinCurrentTime1ArgSig) evalDuration(row types.Row) (types.Duration, if err != nil { return types.Duration{}, true, errors.Trace(err) } - tz := b.ctx.GetSessionVars().GetTimeZone() + tz := b.ctx.GetSessionVars().Location() dur := time.Now().In(tz).Format(types.TimeFSPFormat) res, err := types.ParseDuration(dur, int(fsp)) if err != nil { @@ -2310,7 +2310,7 @@ func evalNowWithFsp(ctx sessionctx.Context, fsp int) (types.Time, bool, error) { return types.Time{}, true, errors.Trace(err) } - err = result.ConvertTimeZone(time.Local, ctx.GetSessionVars().GetTimeZone()) + err = result.ConvertTimeZone(time.Local, ctx.GetSessionVars().Location()) if err != nil { return types.Time{}, true, errors.Trace(err) } diff --git a/expression/builtin_time_test.go b/expression/builtin_time_test.go index b74f41ed6874e..381a69aff0721 100644 --- a/expression/builtin_time_test.go +++ b/expression/builtin_time_test.go @@ -2212,7 +2212,7 @@ func (s *testEvaluatorSuite) TestLastDay(c *C) { func (s *testEvaluatorSuite) TestWithTimeZone(c *C) { sv := s.ctx.GetSessionVars() - originTZ := sv.GetTimeZone() + originTZ := sv.Location() sv.TimeZone, _ = time.LoadLocation("Asia/Tokyo") defer func() { sv.TimeZone = originTZ diff --git a/expression/helper.go b/expression/helper.go index 9210cf47dbafe..a7bc83fcb7770 100644 --- a/expression/helper.go +++ b/expression/helper.go @@ -59,7 +59,7 @@ func GetTimeValue(ctx sessionctx.Context, v interface{}, tp byte, fsp int) (d ty if upperX == strings.ToUpper(ast.CurrentTimestamp) { value.Time = types.FromGoTime(defaultTime) if tp == mysql.TypeTimestamp { - err = value.ConvertTimeZone(time.Local, ctx.GetSessionVars().GetTimeZone()) + err = value.ConvertTimeZone(time.Local, ctx.GetSessionVars().Location()) if err != nil { return d, errors.Trace(err) } diff --git a/expression/integration_test.go b/expression/integration_test.go index b84f37c572701..3a9ebbbdbaa0a 100644 --- a/expression/integration_test.go +++ b/expression/integration_test.go @@ -2666,7 +2666,7 @@ func (s *testIntegrationSuite) TestCompareBuiltin(c *C) { tk.MustExec(`insert into t2 values(1, 1.1, "2017-08-01 12:01:01", "12:01:01", "abcdef", 0b10101)`) result = tk.MustQuery("select coalesce(NULL, a), coalesce(NULL, b, a), coalesce(c, NULL, a, b), coalesce(d, NULL), coalesce(d, c), coalesce(NULL, NULL, e, 1), coalesce(f), coalesce(1, a, b, c, d, e, f) from t2") - result.Check(testkit.Rows(fmt.Sprintf("1 1.1 2017-08-01 12:01:01 12:01:01 %s 12:01:01 abcdef 21 1", time.Now().In(tk.Se.GetSessionVars().GetTimeZone()).Format("2006-01-02")))) + result.Check(testkit.Rows(fmt.Sprintf("1 1.1 2017-08-01 12:01:01 12:01:01 %s 12:01:01 abcdef 21 1", time.Now().In(tk.Se.GetSessionVars().Location()).Format("2006-01-02")))) // nullif result = tk.MustQuery(`SELECT NULLIF(NULL, 1), NULLIF(1, NULL), NULLIF(1, 1), NULLIF(NULL, NULL);`) @@ -2728,9 +2728,24 @@ func (s *testIntegrationSuite) TestCompareBuiltin(c *C) { result = tk.MustQuery(`select least(cast("2017-01-01" as datetime), "123", "234", cast("2018-01-01" as date)), least(cast("2017-01-01" as date), "123", null)`) result.Check(testkit.Rows("123 ")) tk.MustQuery("show warnings").Check(testutil.RowsWithSep("|", "Warning|1292|invalid time format: '123'", "Warning|1292|invalid time format: '234'", "Warning|1292|invalid time format: '123'")) - tk.MustQuery(`select 1 < 17666000000000000000, 1 > 17666000000000000000, 1 = 17666000000000000000`).Check(testkit.Rows("1 0 0")) + tk.MustExec("drop table if exists t") + // insert value at utc timezone + tk.MustExec("set time_zone = '+00:00'") + tk.MustExec("create table t(a timestamp)") + tk.MustExec("insert into t value('1991-05-06 04:59:28')") + // check daylight saving time in Asia/Shanghai + tk.MustExec("set time_zone='Asia/Shanghai'") + tk.MustQuery("select * from t").Check(testkit.Rows("1991-05-06 13:59:28")) + // insert an nonexistent time + tk.MustExec("set time_zone = 'America/Los_Angeles'") + _, err := tk.Exec("insert into t value('2011-03-13 02:00:00')") + c.Assert(err, NotNil) + // reset timezone to a +8 offset + tk.MustExec("set time_zone = '+08:00'") + tk.MustQuery("select * from t").Check(testkit.Rows("1991-05-06 12:59:28")) + tk.MustExec("drop table if exists t") tk.MustExec("create table t(a bigint unsigned)") tk.MustExec("insert into t value(17666000000000000000)") diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 79017225193f7..1c195e68d10eb 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -442,8 +442,8 @@ func (s *SessionVars) GetNextPreparedStmtID() uint32 { return s.preparedStmtID } -// GetTimeZone returns the value of time_zone session variable. -func (s *SessionVars) GetTimeZone() *time.Location { +// Location returns the value of time_zone session variable. If it is nil, then return time.Local. +func (s *SessionVars) Location() *time.Location { loc := s.TimeZone if loc == nil { loc = time.Local diff --git a/store/mockstore/mocktikv/cop_handler_dag.go b/store/mockstore/mocktikv/cop_handler_dag.go index 8c0d8fa6661b6..2b7e7df79172f 100644 --- a/store/mockstore/mocktikv/cop_handler_dag.go +++ b/store/mockstore/mocktikv/cop_handler_dag.go @@ -15,7 +15,9 @@ package mocktikv import ( "bytes" + "fmt" "io" + "sync" "time" "github.com/golang/protobuf/proto" @@ -43,6 +45,50 @@ import ( var dummySlice = make([]byte, 0) +// locCache is a simple map with lock. It stores all used timezone during the lifetime of tidb instance. +// Talked with Golang team about whether they can have some forms of cache policy available for programmer, +// they suggests that only programmers knows which one is best for their use case. +// For detail, please refer to: https://github.com/golang/go/issues/26106 +type locCache struct { + sync.RWMutex + // locMap stores locations used in past and can be retrieved by a timezone's name. + locMap map[string]*time.Location +} + +// init initializes `locCache`. +func init() { + LocCache = &locCache{} + LocCache.locMap = make(map[string]*time.Location) +} + +// LocCache is a simple cache policy to improve the performance of 'time.LoadLocation'. +var LocCache *locCache + +// getLoc first trying to load location from a cache map. If nothing found in such map, then call +// `time.LocadLocation` to get a timezone location. After trying both way, an error wil be returned +// if valid Location is not found. +func (lm *locCache) getLoc(name string) (*time.Location, error) { + if name == "System" { + name = "Local" + } + lm.RLock() + if v, ok := lm.locMap[name]; ok { + lm.RUnlock() + return v, nil + } + + if loc, err := time.LoadLocation(name); err == nil { + lm.RUnlock() + lm.Lock() + lm.locMap[name] = loc + lm.Unlock() + return loc, nil + } + + lm.RUnlock() + return nil, errors.New(fmt.Sprintf("invalid name for timezone %s", name)) +} + type dagContext struct { dagReq *tipb.DAGRequest keyRanges []*coprocessor.KeyRange @@ -100,7 +146,17 @@ func (h *rpcHandler) buildDAGExecutor(req *coprocessor.Request) (*dagContext, ex return nil, nil, nil, errors.Trace(err) } sc := flagsToStatementContext(dagReq.Flags) - sc.TimeZone = time.FixedZone("UTC", int(dagReq.TimeZoneOffset)) + + // retrieving timezone by name first. When name is set, it means we need + // consider daylight saving time. If it is not, we can use offset. + if dagReq.TimeZoneName != "" { + if sc.TimeZone, err = LocCache.getLoc(dagReq.TimeZoneName); err != nil { + return nil, nil, nil, errors.Trace(err) + } + dagReq.TimeZoneName = sc.TimeZone.String() + } else { + sc.TimeZone = time.FixedZone("UTC", int(dagReq.TimeZoneOffset)) + } ctx := &dagContext{ dagReq: dagReq, keyRanges: req.Ranges, diff --git a/store/tikv/gcworker/gc_worker.go b/store/tikv/gcworker/gc_worker.go index 62b5fa60dcefe..0614aa24b3cfb 100644 --- a/store/tikv/gcworker/gc_worker.go +++ b/store/tikv/gcworker/gc_worker.go @@ -240,7 +240,7 @@ func (w *GCWorker) leaderTick(ctx context.Context) error { return nil } -// prepare checks required conditions for starting a GC job. It returns a bool +// prepare checks preconditions for starting a GC job. It returns a bool // that indicates whether the GC job should start and the new safePoint. func (w *GCWorker) prepare() (bool, uint64, error) { now, err := w.getOracleTime() @@ -255,11 +255,11 @@ func (w *GCWorker) prepare() (bool, uint64, error) { if err != nil || newSafePoint == nil { return false, 0, errors.Trace(err) } - err = w.saveTime(gcLastRunTimeKey, now, w.session) + err = w.saveTime(gcLastRunTimeKey, now) if err != nil { return false, 0, errors.Trace(err) } - err = w.saveTime(gcSafePointKey, *newSafePoint, w.session) + err = w.saveTime(gcSafePointKey, *newSafePoint) if err != nil { return false, 0, errors.Trace(err) } @@ -282,7 +282,7 @@ func (w *GCWorker) checkGCInterval(now time.Time) (bool, error) { return false, errors.Trace(err) } gcConfigGauge.WithLabelValues(gcRunIntervalKey).Set(runInterval.Seconds()) - lastRun, err := w.loadTime(gcLastRunTimeKey, w.session) + lastRun, err := w.loadTime(gcLastRunTimeKey) if err != nil { return false, errors.Trace(err) } @@ -300,7 +300,7 @@ func (w *GCWorker) calculateNewSafePoint(now time.Time) (*time.Time, error) { return nil, errors.Trace(err) } gcConfigGauge.WithLabelValues(gcLifeTimeKey).Set(lifeTime.Seconds()) - lastSafePoint, err := w.loadTime(gcSafePointKey, w.session) + lastSafePoint, err := w.loadTime(gcSafePointKey) if err != nil { return nil, errors.Trace(err) } @@ -421,12 +421,12 @@ func (w *GCWorker) deleteRanges(ctx context.Context, safePoint uint64) error { } func (w *GCWorker) loadGCConcurrencyWithDefault() (int, error) { - str, err := w.loadValueFromSysTable(gcConcurrencyKey, w.session) + str, err := w.loadValueFromSysTable(gcConcurrencyKey) if err != nil { return gcDefaultConcurrency, errors.Trace(err) } if str == "" { - err = w.saveValueToSysTable(gcConcurrencyKey, strconv.Itoa(gcDefaultConcurrency), w.session) + err = w.saveValueToSysTable(gcConcurrencyKey, strconv.Itoa(gcDefaultConcurrency)) if err != nil { return gcDefaultConcurrency, errors.Trace(err) } @@ -752,7 +752,8 @@ func (w *GCWorker) checkLeader() (bool, error) { if err != nil { return false, errors.Trace(err) } - leader, err := w.loadValueFromSysTable(gcLeaderUUIDKey, se) + w.session = se + leader, err := w.loadValueFromSysTable(gcLeaderUUIDKey) if err != nil { _, err1 := se.Execute(ctx, "ROLLBACK") terror.Log(errors.Trace(err1)) @@ -760,7 +761,7 @@ func (w *GCWorker) checkLeader() (bool, error) { } log.Debugf("[gc worker] got leader: %s", leader) if leader == w.uuid { - err = w.saveTime(gcLeaderLeaseKey, time.Now().Add(gcWorkerLease), se) + err = w.saveTime(gcLeaderLeaseKey, time.Now().Add(gcWorkerLease)) if err != nil { _, err1 := se.Execute(ctx, "ROLLBACK") terror.Log(errors.Trace(err1)) @@ -780,7 +781,7 @@ func (w *GCWorker) checkLeader() (bool, error) { if err != nil { return false, errors.Trace(err) } - lease, err := w.loadTime(gcLeaderLeaseKey, se) + lease, err := w.loadTime(gcLeaderLeaseKey) if err != nil { return false, errors.Trace(err) } @@ -788,19 +789,19 @@ func (w *GCWorker) checkLeader() (bool, error) { log.Debugf("[gc worker] register %s as leader", w.uuid) gcWorkerCounter.WithLabelValues("register_leader").Inc() - err = w.saveValueToSysTable(gcLeaderUUIDKey, w.uuid, se) + err = w.saveValueToSysTable(gcLeaderUUIDKey, w.uuid) if err != nil { _, err1 := se.Execute(ctx, "ROLLBACK") terror.Log(errors.Trace(err1)) return false, errors.Trace(err) } - err = w.saveValueToSysTable(gcLeaderDescKey, w.desc, se) + err = w.saveValueToSysTable(gcLeaderDescKey, w.desc) if err != nil { _, err1 := se.Execute(ctx, "ROLLBACK") terror.Log(errors.Trace(err1)) return false, errors.Trace(err) } - err = w.saveTime(gcLeaderLeaseKey, time.Now().Add(gcWorkerLease), se) + err = w.saveTime(gcLeaderLeaseKey, time.Now().Add(gcWorkerLease)) if err != nil { _, err1 := se.Execute(ctx, "ROLLBACK") terror.Log(errors.Trace(err1)) @@ -827,13 +828,13 @@ func (w *GCWorker) saveSafePoint(kv tikv.SafePointKV, key string, t uint64) erro return nil } -func (w *GCWorker) saveTime(key string, t time.Time, s session.Session) error { - err := w.saveValueToSysTable(key, t.Format(gcTimeFormat), s) +func (w *GCWorker) saveTime(key string, t time.Time) error { + err := w.saveValueToSysTable(key, t.Format(gcTimeFormat)) return errors.Trace(err) } -func (w *GCWorker) loadTime(key string, s session.Session) (*time.Time, error) { - str, err := w.loadValueFromSysTable(key, s) +func (w *GCWorker) loadTime(key string) (*time.Time, error) { + str, err := w.loadValueFromSysTable(key) if err != nil { return nil, errors.Trace(err) } @@ -848,12 +849,12 @@ func (w *GCWorker) loadTime(key string, s session.Session) (*time.Time, error) { } func (w *GCWorker) saveDuration(key string, d time.Duration) error { - err := w.saveValueToSysTable(key, d.String(), w.session) + err := w.saveValueToSysTable(key, d.String()) return errors.Trace(err) } func (w *GCWorker) loadDuration(key string) (*time.Duration, error) { - str, err := w.loadValueFromSysTable(key, w.session) + str, err := w.loadValueFromSysTable(key) if err != nil { return nil, errors.Trace(err) } @@ -882,10 +883,10 @@ func (w *GCWorker) loadDurationWithDefault(key string, def time.Duration) (*time return d, nil } -func (w *GCWorker) loadValueFromSysTable(key string, s session.Session) (string, error) { +func (w *GCWorker) loadValueFromSysTable(key string) (string, error) { ctx := context.Background() stmt := fmt.Sprintf(`SELECT HIGH_PRIORITY (variable_value) FROM mysql.tidb WHERE variable_name='%s' FOR UPDATE`, key) - rs, err := s.Execute(ctx, stmt) + rs, err := w.session.Execute(ctx, stmt) if len(rs) > 0 { defer terror.Call(rs[0].Close) } @@ -906,15 +907,15 @@ func (w *GCWorker) loadValueFromSysTable(key string, s session.Session) (string, return value, nil } -func (w *GCWorker) saveValueToSysTable(key, value string, s session.Session) error { +func (w *GCWorker) saveValueToSysTable(key, value string) error { stmt := fmt.Sprintf(`INSERT INTO mysql.tidb VALUES ('%[1]s', '%[2]s', '%[3]s') ON DUPLICATE KEY UPDATE variable_value = '%[2]s', comment = '%[3]s'`, key, value, gcVariableComments[key]) - if s == nil { + if w.session == nil { return errors.New("[saveValueToSysTable session is nil]") } - _, err := s.Execute(context.Background(), stmt) + _, err := w.session.Execute(context.Background(), stmt) log.Debugf("[gc worker] save kv, %s:%s %v", key, value, err) return errors.Trace(err) } diff --git a/store/tikv/gcworker/gc_worker_test.go b/store/tikv/gcworker/gc_worker_test.go index 2197fb5d77300..f2d7da9b70600 100644 --- a/store/tikv/gcworker/gc_worker_test.go +++ b/store/tikv/gcworker/gc_worker_test.go @@ -86,10 +86,10 @@ func (s *testGCWorkerSuite) TestPrepareGC(c *C) { close(s.gcWorker.done) ok, _, err := s.gcWorker.prepare() c.Assert(err, IsNil) - lastRun, err := s.gcWorker.loadTime(gcLastRunTimeKey, s.gcWorker.session) + lastRun, err := s.gcWorker.loadTime(gcLastRunTimeKey) c.Assert(err, IsNil) c.Assert(lastRun, NotNil) - safePoint, err := s.gcWorker.loadTime(gcSafePointKey, s.gcWorker.session) + safePoint, err := s.gcWorker.loadTime(gcSafePointKey) c.Assert(err, IsNil) s.timeEqual(c, safePoint.Add(gcDefaultLifeTime), now, 2*time.Second) @@ -118,7 +118,7 @@ func (s *testGCWorkerSuite) TestPrepareGC(c *C) { ok, _, err = s.gcWorker.prepare() c.Assert(err, IsNil) c.Assert(ok, IsTrue) - safePoint, err = s.gcWorker.loadTime(gcSafePointKey, s.gcWorker.session) + safePoint, err = s.gcWorker.loadTime(gcSafePointKey) c.Assert(err, IsNil) s.timeEqual(c, safePoint.Add(time.Minute*30), now, 2*time.Second) @@ -127,19 +127,19 @@ func (s *testGCWorkerSuite) TestPrepareGC(c *C) { c.Assert(err, IsNil) c.Assert(concurrency, Equals, gcDefaultConcurrency) - err = s.gcWorker.saveValueToSysTable(gcConcurrencyKey, strconv.Itoa(gcMinConcurrency), s.gcWorker.session) + err = s.gcWorker.saveValueToSysTable(gcConcurrencyKey, strconv.Itoa(gcMinConcurrency)) c.Assert(err, IsNil) concurrency, err = s.gcWorker.loadGCConcurrencyWithDefault() c.Assert(err, IsNil) c.Assert(concurrency, Equals, gcMinConcurrency) - err = s.gcWorker.saveValueToSysTable(gcConcurrencyKey, strconv.Itoa(-1), s.gcWorker.session) + err = s.gcWorker.saveValueToSysTable(gcConcurrencyKey, strconv.Itoa(-1)) c.Assert(err, IsNil) concurrency, err = s.gcWorker.loadGCConcurrencyWithDefault() c.Assert(err, IsNil) c.Assert(concurrency, Equals, gcMinConcurrency) - err = s.gcWorker.saveValueToSysTable(gcConcurrencyKey, strconv.Itoa(1000000), s.gcWorker.session) + err = s.gcWorker.saveValueToSysTable(gcConcurrencyKey, strconv.Itoa(1000000)) c.Assert(err, IsNil) concurrency, err = s.gcWorker.loadGCConcurrencyWithDefault() c.Assert(err, IsNil) @@ -185,17 +185,17 @@ func (s *testGCWorkerSuite) TestDoGC(c *C) { gcSafePointCacheInterval = 1 - err = s.gcWorker.saveValueToSysTable(gcConcurrencyKey, strconv.Itoa(gcDefaultConcurrency), s.gcWorker.session) + err = s.gcWorker.saveValueToSysTable(gcConcurrencyKey, strconv.Itoa(gcDefaultConcurrency)) c.Assert(err, IsNil) err = s.gcWorker.doGC(ctx, 20) c.Assert(err, IsNil) - err = s.gcWorker.saveValueToSysTable(gcConcurrencyKey, strconv.Itoa(gcMinConcurrency), s.gcWorker.session) + err = s.gcWorker.saveValueToSysTable(gcConcurrencyKey, strconv.Itoa(gcMinConcurrency)) c.Assert(err, IsNil) err = s.gcWorker.doGC(ctx, 20) c.Assert(err, IsNil) - err = s.gcWorker.saveValueToSysTable(gcConcurrencyKey, strconv.Itoa(gcMaxConcurrency), s.gcWorker.session) + err = s.gcWorker.saveValueToSysTable(gcConcurrencyKey, strconv.Itoa(gcMaxConcurrency)) c.Assert(err, IsNil) err = s.gcWorker.doGC(ctx, 20) c.Assert(err, IsNil) diff --git a/table/tables/tables.go b/table/tables/tables.go index f486977e446c8..046911e5cb9d4 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -545,7 +545,7 @@ func DecodeRawRowData(ctx sessionctx.Context, meta *model.TableInfo, h int64, co } colTps[col.ID] = &col.FieldType } - rowMap, err := tablecodec.DecodeRow(value, colTps, ctx.GetSessionVars().GetTimeZone()) + rowMap, err := tablecodec.DecodeRow(value, colTps, ctx.GetSessionVars().Location()) if err != nil { return nil, rowMap, errors.Trace(err) } @@ -727,7 +727,7 @@ func (t *Table) IterRecords(ctx sessionctx.Context, startKey kv.Key, cols []*tab if err != nil { return errors.Trace(err) } - rowMap, err := tablecodec.DecodeRow(it.Value(), colMap, ctx.GetSessionVars().GetTimeZone()) + rowMap, err := tablecodec.DecodeRow(it.Value(), colMap, ctx.GetSessionVars().Location()) if err != nil { return errors.Trace(err) } diff --git a/util/admin/admin.go b/util/admin/admin.go index 761dbe5a3bddd..9f859737030ed 100644 --- a/util/admin/admin.go +++ b/util/admin/admin.go @@ -292,7 +292,7 @@ func checkIndexAndRecord(sessCtx sessionctx.Context, txn kv.Transaction, t table return errors.Trace(err) } - vals1, err = tablecodec.UnflattenDatums(vals1, fieldTypes, sessCtx.GetSessionVars().GetTimeZone()) + vals1, err = tablecodec.UnflattenDatums(vals1, fieldTypes, sessCtx.GetSessionVars().Location()) if err != nil { return errors.Trace(err) } @@ -492,7 +492,7 @@ func rowWithCols(sessCtx sessionctx.Context, txn kv.Retriever, t table.Table, h } colTps[col.ID] = &col.FieldType } - row, err := tablecodec.DecodeRow(value, colTps, sessCtx.GetSessionVars().GetTimeZone()) + row, err := tablecodec.DecodeRow(value, colTps, sessCtx.GetSessionVars().Location()) if err != nil { return nil, errors.Trace(err) } @@ -553,7 +553,7 @@ func iterRecords(sessCtx sessionctx.Context, retriever kv.Retriever, t table.Tab return errors.Trace(err) } - rowMap, err := tablecodec.DecodeRow(it.Value(), colMap, sessCtx.GetSessionVars().GetTimeZone()) + rowMap, err := tablecodec.DecodeRow(it.Value(), colMap, sessCtx.GetSessionVars().Location()) if err != nil { return errors.Trace(err) }