Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

util/rowcodec,tablecodec: remove stmtctx dependency from rowcodec and tablecodec #48816

Merged
merged 2 commits into from
Nov 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -1384,7 +1384,8 @@ func (w *updateColumnWorker) getRowRecord(handle kv.Handle, recordKey []byte, ra
}
checksums := w.calcChecksums()
sctx, rd := w.sessCtx.GetSessionVars().StmtCtx, &w.sessCtx.GetSessionVars().RowEncoder
newRowVal, err := tablecodec.EncodeRow(sctx, newRow, newColumnIDs, nil, nil, rd, checksums...)
newRowVal, err := tablecodec.EncodeRow(sctx.TimeZone(), newRow, newColumnIDs, nil, nil, rd, checksums...)
err = sctx.HandleError(err)
if err != nil {
return errors.Trace(err)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/errctx/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@ go_test(
"//pkg/types",
"@com_github_pingcap_errors//:errors",
"@com_github_stretchr_testify//require",
"@org_uber_go_multierr//:multierr",
],
)
20 changes: 20 additions & 0 deletions pkg/errctx/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,27 @@ func (ctx *Context) appendWarning(err error) {
}

// HandleError handles the error according to the context. See the comment of `HandleErrorWithAlias` for detailed logic.
//
// It also allows using `errors.ErrorGroup`, in this case, it'll handle each error in order, and return the first error
// it founds.
func (ctx *Context) HandleError(err error) error {
// The function of handling `errors.ErrorGroup` is placed in `HandleError` but not in `HandleErrorWithAlias`, because
// it's hard to give a proper error and warn alias for an error group.
if errs, ok := err.(errors.ErrorGroup); ok {
for _, singleErr := range errs.Errors() {
singleErr = ctx.HandleError(singleErr)
// If the one error is found, just return it.
// TODO: consider whether it's more appropriate to continue to handle other errors. For example, other errors
// may need to append warnings. The current behavior is same with TiDB original behavior before using
// `errctx` to handle multiple errors.
if singleErr != nil {
return singleErr
}
}

return nil
}

return ctx.HandleErrorWithAlias(err, err, err)
}

Expand Down
10 changes: 10 additions & 0 deletions pkg/errctx/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/tidb/pkg/errctx"
"github.com/pingcap/tidb/pkg/types"
"github.com/stretchr/testify/require"
"go.uber.org/multierr"
)

func TestContext(t *testing.T) {
Expand Down Expand Up @@ -50,4 +51,13 @@ func TestContext(t *testing.T) {
require.Equal(t, warn, testWarn)
// newCtx2 will return all errors
require.Equal(t, newCtx2.HandleErrorWithAlias(testInternalErr, testErr, testWarn), testErr)

// test `multierr`
testErrs := multierr.Append(testInternalErr, testErr)
require.Equal(t, ctx.HandleError(testErrs), testInternalErr)
require.Equal(t, newCtx.HandleError(testErrs), testErr)
require.Equal(t, warn, testInternalErr)

// test nil
require.Nil(t, ctx.HandleError(nil))
}
7 changes: 5 additions & 2 deletions pkg/executor/mem_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,9 @@ func buildMemTableReader(ctx context.Context, us *UnionScanExec, kvRanges []kv.K
if err != nil {
return nil, err
}
return tablecodec.EncodeValue(us.Ctx().GetSessionVars().StmtCtx, nil, d)
sctx := us.Ctx().GetSessionVars().StmtCtx
buf, err := tablecodec.EncodeValue(sctx.TimeZone(), nil, d)
return buf, sctx.HandleError(err)
}
cd := NewRowDecoder(us.Ctx(), us.Schema(), us.table.Meta())
rd := rowcodec.NewByteDecoder(colInfo, pkColIDs, defVal, us.Ctx().GetSessionVars().Location())
Expand Down Expand Up @@ -1164,7 +1166,8 @@ func getColIDAndPkColIDs(ctx sessionctx.Context, tbl table.Table, columns []*mod
if err != nil {
return nil, err
}
return tablecodec.EncodeValue(ctx.GetSessionVars().StmtCtx, nil, d)
buf, err := tablecodec.EncodeValue(ctx.GetSessionVars().StmtCtx.TimeZone(), nil, d)
return buf, ctx.GetSessionVars().StmtCtx.HandleError(err)
}
rd := rowcodec.NewByteDecoder(colInfos, pkColIDs, defVal, ctx.GetSessionVars().Location())
return colIDs, pkColIDs, rd
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/test/executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ func setColValue(t *testing.T, txn kv.Transaction, key kv.Key, v types.Datum) {
colIDs := []int64{2, 3}
sc := stmtctx.NewStmtCtxWithTimeZone(time.Local)
rd := rowcodec.Encoder{Enable: true}
value, err := tablecodec.EncodeRow(sc, row, colIDs, nil, nil, &rd)
value, err := tablecodec.EncodeRow(sc.TimeZone(), row, colIDs, nil, nil, &rd)
require.NoError(t, err)
err = txn.Set(key, value)
require.NoError(t, err)
Expand Down
3 changes: 2 additions & 1 deletion pkg/executor/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,14 +268,15 @@ func addUnchangedKeysForLockByRow(
return count, err
}
unchangedUniqueKey, _, err := tablecodec.GenIndexKey(
stmtCtx,
stmtCtx.TimeZone(),
idx.TableMeta(),
meta,
physicalID,
ukVals,
h,
nil,
)
err = stmtCtx.HandleError(err)
if err != nil {
return count, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/handler/tests/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,7 @@ func TestDecodeColumnValue(t *testing.T) {
}
rd := rowcodec.Encoder{Enable: true}
sc := stmtctx.NewStmtCtxWithTimeZone(time.UTC)
bs, err := tablecodec.EncodeRow(sc, row, colIDs, nil, nil, &rd)
bs, err := tablecodec.EncodeRow(sc.TimeZone(), row, colIDs, nil, nil, &rd)
require.NoError(t, err)
require.NotNil(t, bs)
bin := base64.StdEncoding.EncodeToString(bs)
Expand Down
4 changes: 4 additions & 0 deletions pkg/sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,10 @@ func (sc *StatementContext) HandleTruncate(err error) error {

// HandleError handles the error based on `ErrCtx()`
func (sc *StatementContext) HandleError(err error) error {
intest.AssertNotNil(sc)
if sc == nil {
return err
}
errCtx := sc.ErrCtx()
return errCtx.HandleError(err)
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/statistics/cmsketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"slices"
"sort"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand Down Expand Up @@ -259,10 +260,15 @@ func (c *CMSketch) SubValue(h1, h2 uint64, count uint64) {
// QueryValue is used to query the count of specified value.
func QueryValue(sctx sessionctx.Context, c *CMSketch, t *TopN, val types.Datum) (uint64, error) {
var sc *stmtctx.StatementContext
tz := time.UTC
if sctx != nil {
sc = sctx.GetSessionVars().StmtCtx
tz = sc.TimeZone()
}
rawData, err := tablecodec.EncodeValue(tz, nil, val)
if sc != nil {
err = sc.HandleError(err)
tangenta marked this conversation as resolved.
Show resolved Hide resolved
}
rawData, err := tablecodec.EncodeValue(sc, nil, val)
if err != nil {
return 0, errors.Trace(err)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/statistics/row_sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,8 @@ func (s *RowSampleBuilder) Collect() (RowSampleCollector, error) {
return nil, err
}
decodedVal.SetBytesAsString(s.Collators[i].Key(decodedVal.GetString()), decodedVal.Collation(), uint32(decodedVal.Length()))
encodedKey, err := tablecodec.EncodeValue(s.Sc, nil, decodedVal)
encodedKey, err := tablecodec.EncodeValue(s.Sc.TimeZone(), nil, decodedVal)
err = s.Sc.HandleError(err)
if err != nil {
return nil, err
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/statistics/sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,8 @@ func (s SampleBuilder) CollectColumnStats() ([]*SampleCollector, *SortedBuilder,
return nil, nil, err
}
decodedVal.SetBytesAsString(s.Collators[i].Key(decodedVal.GetString()), decodedVal.Collation(), uint32(decodedVal.Length()))
encodedKey, err := tablecodec.EncodeValue(s.Sc, nil, decodedVal)
encodedKey, err := tablecodec.EncodeValue(s.Sc.TimeZone(), nil, decodedVal)
err = s.Sc.HandleError(err)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -306,7 +307,8 @@ func (c *SampleCollector) ExtractTopN(numTop uint32, sc *stmtctx.StatementContex
if err != nil {
return err
}
data, err := tablecodec.EncodeValue(sc, nil, d)
data, err := tablecodec.EncodeValue(sc.TimeZone(), nil, d)
err = sc.HandleError(err)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/mockstore/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestClusterSplit(t *testing.T) {
colValue := types.NewStringDatum(strconv.Itoa(int(handle)))
// TODO: Should use session's TimeZone instead of UTC.
rd := rowcodec.Encoder{Enable: true}
rowValue, err1 := tablecodec.EncodeRow(sc, []types.Datum{colValue}, []int64{colID}, nil, nil, &rd)
rowValue, err1 := tablecodec.EncodeRow(sc.TimeZone(), []types.Datum{colValue}, []int64{colID}, nil, nil, &rd)
require.NoError(t, err1)
txn.Set(rowKey, rowValue)

Expand Down
3 changes: 2 additions & 1 deletion pkg/store/mockstore/unistore/cophandler/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,8 @@ func (e *analyzeColumnsExec) Process(key, value []byte) error {
continue
}

value, err := tablecodec.EncodeValue(e.evalCtx.sc, nil, d)
value, err := tablecodec.EncodeValue(e.evalCtx.sc.TimeZone(), nil, d)
err = e.evalCtx.sc.HandleError(err)
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func prepareTestTableData(keyNumber int, tableID int64) (*data, error) {
for i := 0; i < keyNumber; i++ {
datum := types.MakeDatums(i, "abc", 10.0)
rows[int64(i)] = datum
rowEncodedData, err := tablecodec.EncodeRow(stmtCtx, datum, colIds, nil, nil, encoder)
rowEncodedData, err := tablecodec.EncodeRow(stmtCtx.TimeZone(), datum, colIds, nil, nil, encoder)
if err != nil {
return nil, err
}
Expand Down
1 change: 0 additions & 1 deletion pkg/store/mockstore/unistore/tikv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/kv",
"//pkg/sessionctx/stmtctx",
"//pkg/store/mockstore/unistore/client",
"//pkg/store/mockstore/unistore/config",
"//pkg/store/mockstore/unistore/cophandler",
Expand Down
3 changes: 1 addition & 2 deletions pkg/store/mockstore/unistore/tikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/store/mockstore/unistore/config"
"github.com/pingcap/tidb/pkg/store/mockstore/unistore/lockstore"
"github.com/pingcap/tidb/pkg/store/mockstore/unistore/pd"
Expand Down Expand Up @@ -1001,7 +1000,7 @@ func encodeFromOldRow(oldRow, buf []byte) ([]byte, error) {
}
var encoder rowcodec.Encoder
buf = buf[:0]
return encoder.Encode(stmtctx.NewStmtCtx(), colIDs, datums, buf)
return encoder.Encode(time.UTC, colIDs, datums, buf)
}

func (store *MVCCStore) buildPrewriteLock(reqCtx *requestCtx, m *kvrpcpb.Mutation, item *badger.Item,
Expand Down
11 changes: 8 additions & 3 deletions pkg/table/tables/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ func (c *index) GenIndexKey(sc *stmtctx.StatementContext, indexedValues []types.
if c.idxInfo.Global {
idxTblID = c.tblInfo.ID
}
return tablecodec.GenIndexKey(sc, c.tblInfo, c.idxInfo, idxTblID, indexedValues, h, buf)
key, distinct, err = tablecodec.GenIndexKey(sc.TimeZone(), c.tblInfo, c.idxInfo, idxTblID, indexedValues, h, buf)
err = sc.HandleError(err)
return
}

// GenIndexValue generates the index value.
Expand All @@ -102,7 +104,9 @@ func (c *index) GenIndexValue(sc *stmtctx.StatementContext, distinct bool, index
c.initNeedRestoreData.Do(func() {
c.needRestoredData = NeedRestoredData(c.idxInfo.Columns, c.tblInfo.Columns)
})
return tablecodec.GenIndexValuePortal(sc, c.tblInfo, c.idxInfo, c.needRestoredData, distinct, false, indexedValues, h, c.phyTblID, restoredData, buf)
idx, err := tablecodec.GenIndexValuePortal(sc.TimeZone(), c.tblInfo, c.idxInfo, c.needRestoredData, distinct, false, indexedValues, h, c.phyTblID, restoredData, buf)
err = sc.HandleError(err)
return idx, err
}

// getIndexedValue will produce the result like:
Expand Down Expand Up @@ -233,8 +237,9 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue
c.initNeedRestoreData.Do(func() {
c.needRestoredData = NeedRestoredData(c.idxInfo.Columns, c.tblInfo.Columns)
})
idxVal, err := tablecodec.GenIndexValuePortal(sctx.GetSessionVars().StmtCtx, c.tblInfo, c.idxInfo,
idxVal, err := tablecodec.GenIndexValuePortal(sctx.GetSessionVars().StmtCtx.TimeZone(), c.tblInfo, c.idxInfo,
c.needRestoredData, distinct, opt.Untouched, value, h, c.phyTblID, handleRestoreData, nil)
err = sctx.GetSessionVars().StmtCtx.HandleError(err)
if err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/table/tables/mutation_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestCheckRowInsertionConsistency(t *testing.T) {
// mocked data
mockRowKey233 := tablecodec.EncodeRowKeyWithHandle(1, kv.IntHandle(233))
mockValue233, err := tablecodec.EncodeRow(
sessVars.StmtCtx, []types.Datum{types.NewIntDatum(233)}, []int64{101}, nil, nil, &rd,
sessVars.StmtCtx.TimeZone(), []types.Datum{types.NewIntDatum(233)}, []int64{101}, nil, nil, &rd,
)
require.Nil(t, err)
fakeRowInsertion := mutation{key: []byte{1, 1}, value: []byte{1, 1, 1}}
Expand Down Expand Up @@ -306,7 +306,7 @@ func TestCheckIndexKeysAndCheckHandleConsistency(t *testing.T) {
// test checkHandleConsistency
rowKey := tablecodec.EncodeRowKeyWithHandle(table.tableID, handle)
corruptedRowKey := tablecodec.EncodeRowKeyWithHandle(table.tableID, corruptedHandle)
rowValue, err := tablecodec.EncodeRow(sessVars.StmtCtx, rowToInsert, []int64{1, 2}, nil, nil, &rd)
rowValue, err := tablecodec.EncodeRow(sessVars.StmtCtx.TimeZone(), rowToInsert, []int64{1, 2}, nil, nil, &rd)
require.Nil(t, err)
rowMutation := mutation{key: rowKey, value: rowValue}
corruptedRowMutation := mutation{key: corruptedRowKey, value: rowValue}
Expand All @@ -327,14 +327,14 @@ func buildIndexKeyValue(index table.Index, rowToInsert []types.Datum, sessVars *
return nil, nil, err
}
key, distinct, err := tablecodec.GenIndexKey(
sessVars.StmtCtx, &tableInfo, indexInfo, 1, indexedValues, handle, nil,
sessVars.StmtCtx.TimeZone(), &tableInfo, indexInfo, 1, indexedValues, handle, nil,
)
if err != nil {
return nil, nil, err
}
rsData := TryGetHandleRestoredDataWrapper(table.meta, rowToInsert, nil, indexInfo)
value, err := tablecodec.GenIndexValuePortal(
sessVars.StmtCtx, &tableInfo, indexInfo, NeedRestoredData(indexInfo.Columns, tableInfo.Columns),
sessVars.StmtCtx.TimeZone(), &tableInfo, indexInfo, NeedRestoredData(indexInfo.Columns, tableInfo.Columns),
distinct, false, indexedValues, handle, 0, rsData, nil,
)
if err != nil {
Expand Down
21 changes: 14 additions & 7 deletions pkg/table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,8 @@ func (t *TableCommon) UpdateRecord(ctx context.Context, sctx sessionctx.Context,
key := t.RecordKey(h)
sc, rd := sessVars.StmtCtx, &sessVars.RowEncoder
checksums, writeBufs.RowValBuf = t.calcChecksums(sctx, h, checksumData, writeBufs.RowValBuf)
writeBufs.RowValBuf, err = tablecodec.EncodeRow(sc, row, colIDs, writeBufs.RowValBuf, writeBufs.AddRowValues, rd, checksums...)
writeBufs.RowValBuf, err = tablecodec.EncodeRow(sc.TimeZone(), row, colIDs, writeBufs.RowValBuf, writeBufs.AddRowValues, rd, checksums...)
err = sc.HandleError(err)
if err != nil {
return err
}
Expand Down Expand Up @@ -988,7 +989,8 @@ func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts .
zap.Stringer("key", key))
sc, rd := sessVars.StmtCtx, &sessVars.RowEncoder
checksums, writeBufs.RowValBuf = t.calcChecksums(sctx, recordID, checksumData, writeBufs.RowValBuf)
writeBufs.RowValBuf, err = tablecodec.EncodeRow(sc, row, colIDs, writeBufs.RowValBuf, writeBufs.AddRowValues, rd, checksums...)
writeBufs.RowValBuf, err = tablecodec.EncodeRow(sc.TimeZone(), row, colIDs, writeBufs.RowValBuf, writeBufs.AddRowValues, rd, checksums...)
err = sc.HandleError(err)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1395,7 +1397,8 @@ func (t *TableCommon) addInsertBinlog(ctx sessionctx.Context, h kv.Handle, row [
if err != nil {
return err
}
value, err := tablecodec.EncodeOldRow(ctx.GetSessionVars().StmtCtx, row, colIDs, nil, nil)
value, err := tablecodec.EncodeOldRow(ctx.GetSessionVars().StmtCtx.TimeZone(), row, colIDs, nil, nil)
err = ctx.GetSessionVars().StmtCtx.HandleError(err)
if err != nil {
return err
}
Expand All @@ -1406,11 +1409,13 @@ func (t *TableCommon) addInsertBinlog(ctx sessionctx.Context, h kv.Handle, row [
}

func (t *TableCommon) addUpdateBinlog(ctx sessionctx.Context, oldRow, newRow []types.Datum, colIDs []int64) error {
old, err := tablecodec.EncodeOldRow(ctx.GetSessionVars().StmtCtx, oldRow, colIDs, nil, nil)
old, err := tablecodec.EncodeOldRow(ctx.GetSessionVars().StmtCtx.TimeZone(), oldRow, colIDs, nil, nil)
err = ctx.GetSessionVars().StmtCtx.HandleError(err)
if err != nil {
return err
}
newVal, err := tablecodec.EncodeOldRow(ctx.GetSessionVars().StmtCtx, newRow, colIDs, nil, nil)
newVal, err := tablecodec.EncodeOldRow(ctx.GetSessionVars().StmtCtx.TimeZone(), newRow, colIDs, nil, nil)
err = ctx.GetSessionVars().StmtCtx.HandleError(err)
if err != nil {
return err
}
Expand All @@ -1422,7 +1427,8 @@ func (t *TableCommon) addUpdateBinlog(ctx sessionctx.Context, oldRow, newRow []t
}

func (t *TableCommon) addDeleteBinlog(ctx sessionctx.Context, r []types.Datum, colIDs []int64) error {
data, err := tablecodec.EncodeOldRow(ctx.GetSessionVars().StmtCtx, r, colIDs, nil, nil)
data, err := tablecodec.EncodeOldRow(ctx.GetSessionVars().StmtCtx.TimeZone(), r, colIDs, nil, nil)
err = ctx.GetSessionVars().StmtCtx.HandleError(err)
if err != nil {
return err
}
Expand Down Expand Up @@ -2316,7 +2322,8 @@ func SetPBColumnsDefaultValue(ctx sessionctx.Context, pbColumns []*tipb.ColumnIn
return err
}

pbColumns[i].DefaultVal, err = tablecodec.EncodeValue(sessVars.StmtCtx, nil, d)
pbColumns[i].DefaultVal, err = tablecodec.EncodeValue(sessVars.StmtCtx.TimeZone(), nil, d)
err = sessVars.StmtCtx.HandleError(err)
if err != nil {
return err
}
Expand Down
2 changes: 0 additions & 2 deletions pkg/tablecodec/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,12 @@ go_library(
importpath = "github.com/pingcap/tidb/pkg/tablecodec",
visibility = ["//visibility:public"],
deps = [
"//pkg/errctx",
"//pkg/errno",
"//pkg/kv",
"//pkg/parser/charset",
"//pkg/parser/model",
"//pkg/parser/mysql",
"//pkg/parser/terror",
"//pkg/sessionctx/stmtctx",
"//pkg/structure",
"//pkg/types",
"//pkg/util/codec",
Expand Down
Loading