Skip to content

Commit

Permalink
*: fix time zone problems encountered when changing the other type co…
Browse files Browse the repository at this point in the history
…lumns to timestamp type columns (#31843)

close #29585
  • Loading branch information
zimulala authored Feb 21, 2022
1 parent 63e6f31 commit 1624123
Show file tree
Hide file tree
Showing 16 changed files with 278 additions and 66 deletions.
20 changes: 18 additions & 2 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/logutil"
decoder "github.com/pingcap/tidb/util/rowDecoder"
"github.com/pingcap/tidb/util/timeutil"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -536,6 +537,19 @@ func makeupDecodeColMap(sessCtx sessionctx.Context, t table.Table) (map[int64]de
return decodeColMap, nil
}

func setSessCtxLocation(sctx sessionctx.Context, info *reorgInfo) error {
// It is set to SystemLocation to be compatible with nil LocationInfo.
*sctx.GetSessionVars().TimeZone = *timeutil.SystemLocation()
if info.ReorgMeta.Location != nil {
loc, err := info.ReorgMeta.Location.GetLocation()
if err != nil {
return errors.Trace(err)
}
*sctx.GetSessionVars().TimeZone = *loc
}
return nil
}

// writePhysicalTableRecord handles the "add index" or "modify/change column" reorganization state for a non-partitioned table or a partition.
// For a partitioned table, it should be handled partition by partition.
//
Expand Down Expand Up @@ -606,8 +620,10 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba
// Simulate the sql mode environment in the worker sessionCtx.
sqlMode := reorgInfo.ReorgMeta.SQLMode
sessCtx.GetSessionVars().SQLMode = sqlMode
// TODO: skip set the timezone, it will cause data inconsistency when add index, since some reorg place using the timeUtil.SystemLocation() to do the time conversion. (need a more systemic plan)
// sessCtx.GetSessionVars().TimeZone = reorgInfo.ReorgMeta.Location
if err := setSessCtxLocation(sessCtx, reorgInfo); err != nil {
return errors.Trace(err)
}

sessCtx.GetSessionVars().StmtCtx.BadNullAsWarning = !sqlMode.HasStrictMode()
sessCtx.GetSessionVars().StmtCtx.TruncateAsWarning = !sqlMode.HasStrictMode()
sessCtx.GetSessionVars().StmtCtx.OverflowAsWarning = !sqlMode.HasStrictMode()
Expand Down
6 changes: 3 additions & 3 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ import (
"github.com/pingcap/tidb/util/logutil"
decoder "github.com/pingcap/tidb/util/rowDecoder"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/timeutil"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -1323,7 +1322,8 @@ func (w *updateColumnWorker) fetchRowColVals(txn kv.Transaction, taskRange reorg
}

func (w *updateColumnWorker) getRowRecord(handle kv.Handle, recordKey []byte, rawRow []byte) error {
_, err := w.rowDecoder.DecodeTheExistedColumnMap(w.sessCtx, handle, rawRow, time.UTC, w.rowMap)
sysTZ := w.sessCtx.GetSessionVars().StmtCtx.TimeZone
_, err := w.rowDecoder.DecodeTheExistedColumnMap(w.sessCtx, handle, rawRow, sysTZ, w.rowMap)
if err != nil {
return errors.Trace(errCantDecodeRecord.GenWithStackByArgs("column", err))
}
Expand Down Expand Up @@ -1369,7 +1369,7 @@ func (w *updateColumnWorker) getRowRecord(handle kv.Handle, recordKey []byte, ra
})

w.rowMap[w.newColInfo.ID] = newColVal
_, err = w.rowDecoder.EvalRemainedExprColumnMap(w.sessCtx, timeutil.SystemLocation(), w.rowMap)
_, err = w.rowDecoder.EvalRemainedExprColumnMap(w.sessCtx, w.rowMap)
if err != nil {
return errors.Trace(err)
}
Expand Down
89 changes: 88 additions & 1 deletion ddl/column_type_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ func (s *testColumnTypeChangeSuite) TestColumnTypeChangeFromIntegerToOthers(c *C
modifiedColumn = getModifyColumn(c, tk.Se, "test", "t", "e", false)
c.Assert(modifiedColumn, NotNil)
c.Assert(modifiedColumn.Tp, Equals, parser_mysql.TypeTimestamp)
tk.MustQuery("select e from t").Check(testkit.Rows("2001-11-11 00:00:00")) // the given number will be left-forward used.
tk.MustQuery("select e from t").Check(testkit.Rows("2001-11-10 16:00:00")) // the given number will be left-forward used.

// integer to datetime
tk.MustExec("alter table t modify f datetime")
Expand Down Expand Up @@ -2371,3 +2371,90 @@ func (s *testColumnTypeChangeSuite) TestColumnTypeChangeBetweenFloatAndDouble(c
tk.MustExec("alter table t modify a float(6,1)")
tk.MustQuery("select a from t;").Check(testkit.Rows("36.4", "24.1"))
}

func (s *testColumnTypeChangeSuite) TestColumnTypeChangeTimestampToInt(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")

// 1. modify a timestamp column to bigint
// 2. modify the bigint column to timestamp
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(id int primary key auto_increment, c1 timestamp default '2020-07-10 01:05:08');")
tk.MustExec("insert into t values();")
tk.MustQuery("select * from t").Check(testkit.Rows("1 2020-07-10 01:05:08"))
tk.MustExec("alter table t modify column c1 bigint;")
tk.MustQuery("select * from t").Check(testkit.Rows("1 20200710010508"))
tk.MustExec("alter table t modify c1 timestamp")
tk.MustExec("set @@session.time_zone=UTC")
tk.MustQuery("select * from t").Check(testkit.Rows("1 2020-07-09 17:05:08"))

// 1. modify a timestamp column to bigint
// 2. add the index
// 3. modify the bigint column to timestamp
// The current session.time_zone is '+00:00'.
tk.MustExec(`set time_zone = '+00:00'`)
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(id int primary key auto_increment, c1 timestamp default '2020-07-10 01:05:08', index idx(c1));")
tk.MustExec("insert into t values();")
tk.MustQuery("select * from t").Check(testkit.Rows("1 2020-07-10 01:05:08"))
tk.MustExec("alter table t modify column c1 bigint;")
tk.MustExec("alter table t add index idx1(id, c1);")
tk.MustQuery("select * from t").Check(testkit.Rows("1 20200710010508"))
tk.MustExec("admin check table t")
// change timezone
tk.MustExec("set @@session.time_zone='+5:00'")
tk.MustExec("alter table t modify c1 timestamp")
// change timezone
tk.MustQuery("select * from t").Check(testkit.Rows("1 2020-07-10 01:05:08"))
tk.MustExec("set @@session.time_zone='-8:00'")
tk.MustQuery("select * from t").Check(testkit.Rows("1 2020-07-09 12:05:08"))
tk.MustExec("admin check table t")
// test the timezone of "default" and "system"
// The current session.time_zone is '-8:00'.
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(id int primary key auto_increment, c1 timestamp default '2020-07-10 01:05:08', index idx(c1));")
tk.MustExec("insert into t values();")
tk.MustQuery("select * from t").Check(testkit.Rows("1 2020-07-10 01:05:08"))
tk.MustExec("alter table t modify column c1 bigint;")
tk.MustExec("alter table t add index idx1(id, c1);")
tk.MustQuery("select * from t").Check(testkit.Rows("1 20200710010508"))
tk.MustExec("admin check table t")
// change timezone
tk.MustExec("set @@session.time_zone= default")
tk.MustExec("alter table t modify c1 timestamp")
// change timezone
tk.MustQuery("select * from t").Check(testkit.Rows("1 2020-07-10 01:05:08"))
tk.MustExec("set @@session.time_zone='SYSTEM'")
tk.MustQuery("select * from t").Check(testkit.Rows("1 2020-07-10 01:05:08"))
tk.MustExec("admin check table t")

// tests DST
// 1. modify a timestamp column to bigint
// 2. modify the bigint column to timestamp
tk.MustExec("drop table if exists t")
tk.MustExec("set @@session.time_zone=UTC")
tk.MustExec("create table t(id int primary key auto_increment, c1 timestamp default '1990-04-15 18:00:00');")
tk.MustExec("insert into t values();")
tk.MustQuery("select * from t").Check(testkit.Rows("1 1990-04-15 18:00:00"))
tk.MustExec("set @@session.time_zone='Asia/Shanghai'")
tk.MustExec("alter table t modify column c1 bigint;")
tk.MustQuery("select * from t").Check(testkit.Rows("1 19900416030000"))
tk.MustExec("alter table t modify c1 timestamp default '1990-04-15 18:00:00'")
tk.MustExec("set @@session.time_zone=UTC")
tk.MustExec("insert into t values();")
tk.MustQuery("select * from t").Check(testkit.Rows("1 1990-04-15 18:00:00", "2 1990-04-15 09:00:00"))
// 1. modify a timestamp column to bigint
// 2. add the index
// 3. modify the bigint column to timestamp
// The current session.time_zone is '+00:00'.
tk.MustExec("drop table if exists t")
tk.MustExec("set @@session.time_zone='-8:00'")
tk.MustExec("create table t(id int primary key auto_increment, c1 timestamp default '2016-03-13 02:30:00', index idx(c1));")
tk.MustExec("insert into t values();")
tk.MustQuery("select * from t").Check(testkit.Rows("1 2016-03-13 02:30:00"))
tk.MustExec("set @@session.time_zone='America/Los_Angeles'")
tk.MustExec("alter table t modify column c1 bigint;")
tk.MustQuery("select * from t").Check(testkit.Rows("1 20160313033000"))
tk.MustExec("alter table t add index idx1(id, c1);")
tk.MustExec("admin check table t")
}
14 changes: 14 additions & 0 deletions ddl/db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,20 @@ func (s *testStateChangeSuite) TestWriteOnlyOnDupUpdateForAddColumns(c *C) {
s.runTestInSchemaState(c, model.StateWriteOnly, true, addColumnsSQL, sqls, expectQuery)
}

func (s *testStateChangeSuite) TestWriteReorgForModifyColumnTimestampToInt(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test_db_state")
tk.MustExec("drop table if exists tt")
tk.MustExec("create table tt(id int primary key auto_increment, c1 timestamp default '2020-07-10 01:05:08');")
tk.MustExec("insert into tt values();")

sqls := make([]sqlWithErr, 1)
sqls[0] = sqlWithErr{"insert into tt values();", nil}
modifyColumnSQL := "alter table tt modify column c1 bigint;"
expectQuery := &expectQuery{"select c1 from tt", []string{"20200710010508", "20200710010508"}}
s.runTestInSchemaState(c, model.StateWriteReorganization, true, modifyColumnSQL, sqls, expectQuery)
}

type idxType byte

const (
Expand Down
41 changes: 41 additions & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
testddlutil "github.com/pingcap/tidb/ddl/testutil"
ddlutil "github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/executor"
Expand Down Expand Up @@ -7771,3 +7772,43 @@ func (s *testSerialDBSuite) TestAddGeneratedColumnAndInsert(c *C) {
tk.MustQuery("select * from t1 order by a").Check(testkit.Rows("4 5", "10 11"))
c.Assert(checkErr, IsNil)
}

func (s *testDBSuite1) TestGetTimeZone(c *C) {
tk := testkit.NewTestKit(c, s.store)

testCases := []struct {
tzSQL string
tzStr string
tzName string
offset int
err string
}{
{"set time_zone = '+00:00'", "", "UTC", 0, ""},
{"set time_zone = '-00:00'", "", "UTC", 0, ""},
{"set time_zone = 'UTC'", "UTC", "UTC", 0, ""},
{"set time_zone = '+05:00'", "", "UTC", 18000, ""},
{"set time_zone = '-08:00'", "", "UTC", -28800, ""},
{"set time_zone = '+08:00'", "", "UTC", 28800, ""},
{"set time_zone = 'Asia/Shanghai'", "Asia/Shanghai", "Asia/Shanghai", 0, ""},
{"set time_zone = 'SYSTEM'", "Asia/Shanghai", "Asia/Shanghai", 0, ""},
{"set time_zone = DEFAULT", "Asia/Shanghai", "Asia/Shanghai", 0, ""},
{"set time_zone = 'GMT'", "GMT", "GMT", 0, ""},
{"set time_zone = 'GMT+1'", "GMT", "GMT", 0, "[variable:1298]Unknown or incorrect time zone: 'GMT+1'"},
{"set time_zone = 'Etc/GMT+12'", "Etc/GMT+12", "Etc/GMT+12", 0, ""},
{"set time_zone = 'Etc/GMT-12'", "Etc/GMT-12", "Etc/GMT-12", 0, ""},
{"set time_zone = 'EST'", "EST", "EST", 0, ""},
{"set time_zone = 'Australia/Lord_Howe'", "Australia/Lord_Howe", "Australia/Lord_Howe", 0, ""},
}
for _, tc := range testCases {
err := tk.ExecToErr(tc.tzSQL)
if err != nil {
c.Assert(err.Error(), Equals, tc.err)
} else {
c.Assert(tc.err, Equals, "")
}
c.Assert(tk.Se.GetSessionVars().TimeZone.String(), Equals, tc.tzStr, Commentf("sql: %s", tc.tzSQL))
tz, offset := ddlutil.GetTimeZone(tk.Se)
c.Assert(tc.tzName, Equals, tz, Commentf("sql: %s, offset: %d", tc.tzSQL, offset))
c.Assert(tc.offset, Equals, offset, Commentf("sql: %s", tc.tzSQL))
}
}
12 changes: 12 additions & 0 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl/label"
ddlutil "github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -4379,6 +4380,7 @@ func (d *ddl) getModifiableColumnJob(ctx context.Context, sctx sessionctx.Contex
return nil, errors.Trace(err)
}

tzName, tzOffset := ddlutil.GetTimeZone(sctx)
job := &model.Job{
SchemaID: schema.ID,
TableID: t.Meta().ID,
Expand All @@ -4389,6 +4391,7 @@ func (d *ddl) getModifiableColumnJob(ctx context.Context, sctx sessionctx.Contex
SQLMode: sctx.GetSessionVars().SQLMode,
Warnings: make(map[errors.ErrorID]*terror.Error),
WarningsCount: make(map[errors.ErrorID]int64),
Location: &model.TimeZoneLocation{Name: tzName, Offset: tzOffset},
},
CtxVars: []interface{}{needChangeColData},
Args: []interface{}{&newCol, originalColName, spec.Position, modifyColumnTp, newAutoRandBits},
Expand Down Expand Up @@ -4617,6 +4620,8 @@ func (d *ddl) RenameColumn(ctx sessionctx.Context, ident ast.Ident, spec *ast.Al
}
}

tzName, tzOffset := ddlutil.GetTimeZone(ctx)

newCol := oldCol.Clone()
newCol.Name = newColName
job := &model.Job{
Expand All @@ -4629,6 +4634,7 @@ func (d *ddl) RenameColumn(ctx sessionctx.Context, ident ast.Ident, spec *ast.Al
SQLMode: ctx.GetSessionVars().SQLMode,
Warnings: make(map[errors.ErrorID]*terror.Error),
WarningsCount: make(map[errors.ErrorID]int64),
Location: &model.TimeZoneLocation{Name: tzName, Offset: tzOffset},
},
Args: []interface{}{&newCol, oldColName, spec.Position, 0},
}
Expand Down Expand Up @@ -5429,6 +5435,8 @@ func (d *ddl) CreatePrimaryKey(ctx sessionctx.Context, ti ast.Ident, indexName m
return errors.Trace(err)
}

tzName, tzOffset := ddlutil.GetTimeZone(ctx)

unique := true
sqlMode := ctx.GetSessionVars().SQLMode
job := &model.Job{
Expand All @@ -5441,6 +5449,7 @@ func (d *ddl) CreatePrimaryKey(ctx sessionctx.Context, ti ast.Ident, indexName m
SQLMode: ctx.GetSessionVars().SQLMode,
Warnings: make(map[errors.ErrorID]*terror.Error),
WarningsCount: make(map[errors.ErrorID]int64),
Location: &model.TimeZoneLocation{Name: tzName, Offset: tzOffset},
},
Args: []interface{}{unique, indexName, indexPartSpecifications, indexOption, sqlMode, nil, global},
Priority: ctx.GetSessionVars().DDLReorgPriority,
Expand Down Expand Up @@ -5618,6 +5627,8 @@ func (d *ddl) CreateIndex(ctx sessionctx.Context, ti ast.Ident, keyType ast.Inde
if _, err = validateCommentLength(ctx.GetSessionVars(), indexName.String(), indexOption); err != nil {
return errors.Trace(err)
}

tzName, tzOffset := ddlutil.GetTimeZone(ctx)
job := &model.Job{
SchemaID: schema.ID,
TableID: t.Meta().ID,
Expand All @@ -5628,6 +5639,7 @@ func (d *ddl) CreateIndex(ctx sessionctx.Context, ti ast.Ident, keyType ast.Inde
SQLMode: ctx.GetSessionVars().SQLMode,
Warnings: make(map[errors.ErrorID]*terror.Error),
WarningsCount: make(map[errors.ErrorID]int64),
Location: &model.TimeZoneLocation{Name: tzName, Offset: tzOffset},
},
Args: []interface{}{unique, indexName, indexPartSpecifications, indexOption, hiddenCols, global},
Priority: ctx.GetSessionVars().DDLReorgPriority,
Expand Down
20 changes: 4 additions & 16 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import (
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/logutil"
decoder "github.com/pingcap/tidb/util/rowDecoder"
"github.com/pingcap/tidb/util/timeutil"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
Expand Down Expand Up @@ -1060,7 +1059,6 @@ var mockNotOwnerErrOnce uint32
// getIndexRecord gets index columns values use w.rowDecoder, and generate indexRecord.
func (w *baseIndexWorker) getIndexRecord(idxInfo *model.IndexInfo, handle kv.Handle, recordKey []byte) (*indexRecord, error) {
cols := w.table.WritableCols()
sysZone := timeutil.SystemLocation()
failpoint.Inject("MockGetIndexRecordErr", func(val failpoint.Value) {
if valStr, ok := val.(string); ok {
switch valStr {
Expand Down Expand Up @@ -1094,16 +1092,6 @@ func (w *baseIndexWorker) getIndexRecord(idxInfo *model.IndexInfo, handle kv.Han
return nil, errors.Trace(err)
}

if idxColumnVal.Kind() == types.KindMysqlTime {
t := idxColumnVal.GetMysqlTime()
if t.Type() == mysql.TypeTimestamp && sysZone != time.UTC {
err := t.ConvertTimeZone(sysZone, time.UTC)
if err != nil {
return nil, errors.Trace(err)
}
idxColumnVal.SetMysqlTime(t)
}
}
idxVal[j] = idxColumnVal
}

Expand All @@ -1129,9 +1117,9 @@ func (w *baseIndexWorker) getNextKey(taskRange reorgBackfillTask, taskDone bool)
return taskRange.endKey.Next()
}

func (w *baseIndexWorker) updateRowDecoder(handle kv.Handle, recordKey []byte, rawRecord []byte) error {
sysZone := timeutil.SystemLocation()
_, err := w.rowDecoder.DecodeAndEvalRowWithMap(w.sessCtx, handle, rawRecord, time.UTC, sysZone, w.rowMap)
func (w *baseIndexWorker) updateRowDecoder(handle kv.Handle, rawRecord []byte) error {
sysZone := w.sessCtx.GetSessionVars().StmtCtx.TimeZone
_, err := w.rowDecoder.DecodeAndEvalRowWithMap(w.sessCtx, handle, rawRecord, sysZone, w.rowMap)
if err != nil {
return errors.Trace(errCantDecodeRecord.GenWithStackByArgs("index", err))
}
Expand Down Expand Up @@ -1165,7 +1153,7 @@ func (w *baseIndexWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgBac
}

// Decode one row, generate records of this row.
err := w.updateRowDecoder(handle, recordKey, rawRow)
err := w.updateRowDecoder(handle, rawRow)
if err != nil {
return false, err
}
Expand Down
6 changes: 5 additions & 1 deletion ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,10 @@ func newContext(store kv.Storage) sessionctx.Context {
c := mock.NewContext()
c.Store = store
c.GetSessionVars().SetStatusFlag(mysql.ServerStatusAutocommit, false)
c.GetSessionVars().StmtCtx.TimeZone = time.UTC

tz := *time.UTC
c.GetSessionVars().TimeZone = &tz
c.GetSessionVars().StmtCtx.TimeZone = &tz
return c
}

Expand Down Expand Up @@ -199,6 +202,7 @@ func (w *worker) runReorgJob(t *meta.Meta, reorgInfo *reorgInfo, tblInfo *model.
SQLMode: mysql.ModeNone,
Warnings: make(map[errors.ErrorID]*terror.Error),
WarningsCount: make(map[errors.ErrorID]int64),
Location: &model.TimeZoneLocation{Name: time.UTC.String(), Offset: 0},
}
}
if w.reorgCtx.doneCh == nil {
Expand Down
Loading

0 comments on commit 1624123

Please sign in to comment.