From 462c9dc5ca8e7d9fc1fc970f8a3a09051ee75968 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E8=B6=85?= Date: Mon, 16 Aug 2021 22:19:59 +0800 Subject: [PATCH] *: Add `temp_table_size` support for local temporary table (#27205) --- executor/batch_point_get.go | 6 +-- executor/ddl.go | 10 ++-- kv/kv.go | 3 ++ session/session.go | 4 +- session/session_test.go | 17 +++++++ sessionctx/variable/session.go | 71 +++++++++++++++++++++++++-- store/driver/txn/unionstore_driver.go | 4 ++ table/tables/tables.go | 25 +++++++--- 8 files changed, 120 insertions(+), 20 deletions(-) diff --git a/executor/batch_point_get.go b/executor/batch_point_get.go index ce29c4c49aacc..0b6c0073c946c 100644 --- a/executor/batch_point_get.go +++ b/executor/batch_point_get.go @@ -173,17 +173,17 @@ func (e *BatchPointGetExec) Open(context.Context) error { // temporaryTableSnapshot inherits kv.Snapshot and override the BatchGet methods to return empty. type temporaryTableSnapshot struct { kv.Snapshot - memBuffer kv.MemBuffer + sessionData variable.TemporaryTableData } func (s temporaryTableSnapshot) BatchGet(ctx context.Context, keys []kv.Key) (map[string][]byte, error) { values := make(map[string][]byte) - if s.memBuffer == nil { + if s.sessionData == nil { return values, nil } for _, key := range keys { - val, err := s.memBuffer.Get(ctx, key) + val, err := s.sessionData.Get(ctx, key) if err == kv.ErrNotExist { continue } diff --git a/executor/ddl.go b/executor/ddl.go index dd0ca83fbcc1e..e5172fedd2fdb 100644 --- a/executor/ddl.go +++ b/executor/ddl.go @@ -74,15 +74,15 @@ func (e *DDLExec) toErr(err error) error { } // deleteTemporaryTableRecords delete temporary table data. -func deleteTemporaryTableRecords(memData kv.MemBuffer, tblID int64) error { - if memData == nil { +func deleteTemporaryTableRecords(sessionData variable.TemporaryTableData, tblID int64) error { + if sessionData == nil { return kv.ErrNotExist } tblPrefix := tablecodec.EncodeTablePrefix(tblID) endKey := tablecodec.EncodeTablePrefix(tblID + 1) - iter, err := memData.Iter(tblPrefix, endKey) + iter, err := sessionData.Iter(tblPrefix, endKey) if err != nil { return err } @@ -92,7 +92,7 @@ func deleteTemporaryTableRecords(memData kv.MemBuffer, tblID int64) error { break } - err = memData.Delete(key) + err = sessionData.DeleteTableKey(tblID, key) if err != nil { return err } @@ -365,7 +365,7 @@ func (e *DDLExec) createSessionTemporaryTable(s *ast.CreateTableStmt) error { return err } - sessVars.TemporaryTableData = bufferTxn.GetMemBuffer() + sessVars.TemporaryTableData = variable.NewTemporaryTableData(bufferTxn.GetMemBuffer()) } err = localTempTables.AddTable(dbInfo.Name, tbl) diff --git a/kv/kv.go b/kv/kv.go index 11fe882f38c7a..292d2ed0e069a 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -133,6 +133,9 @@ type MemBuffer interface { // Len returns the number of entries in the DB. Len() int + + // Size returns sum of keys and values length. + Size() int } // LockCtx contains information for LockKeys method. diff --git a/session/session.go b/session/session.go index c5542e781ac1b..9e1a6bb58768e 100644 --- a/session/session.go +++ b/session/session.go @@ -631,9 +631,9 @@ func (s *session) commitTxnWithTemporaryData(ctx context.Context, txn kv.Transac value := iter.Value() if len(value) == 0 { - err = sessionData.Delete(key) + err = sessionData.DeleteTableKey(tblID, key) } else { - err = sessionData.Set(key, iter.Value()) + err = sessionData.SetTableKey(tblID, key, iter.Value()) } if err != nil { diff --git a/session/session_test.go b/session/session_test.go index 6b87fb554342e..574722fab4dfd 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -4749,6 +4749,8 @@ func (s *testSessionSuite) TestTMPTableSize(c *C) { tk.MustExec("use test") tk.MustExec("set tidb_enable_global_temporary_table=on") tk.MustExec("create global temporary table t (c1 int, c2 varchar(512)) on commit delete rows") + tk.MustExec("set tidb_enable_noop_functions=on") + tk.MustExec("create temporary table tl (c1 int, c2 varchar(512))") tk.MustQuery("select @@global.tmp_table_size").Check(testkit.Rows(strconv.Itoa(variable.DefTMPTableSize))) c.Assert(tk.Se.GetSessionVars().TMPTableSize, Equals, int64(variable.DefTMPTableSize)) @@ -4771,6 +4773,21 @@ func (s *testSessionSuite) TestTMPTableSize(c *C) { tk.MustExec("insert into t values (1, repeat('x', 512))") tk.MustExec("insert into t values (1, repeat('x', 512))") tk.MustGetErrCode("insert into t values (1, repeat('x', 512))", errno.ErrRecordFileFull) + tk.MustExec("rollback") + + // Check local temporary table + tk.MustExec("begin") + tk.MustExec("insert into tl values (1, repeat('x', 512))") + tk.MustExec("insert into tl values (1, repeat('x', 512))") + tk.MustGetErrCode("insert into tl values (1, repeat('x', 512))", errno.ErrRecordFileFull) + tk.MustExec("rollback") + + // Check local temporary table with some data in session + tk.MustExec("insert into tl values (1, repeat('x', 512))") + tk.MustExec("begin") + tk.MustExec("insert into tl values (1, repeat('x', 512))") + tk.MustGetErrCode("insert into tl values (1, repeat('x', 512))", errno.ErrRecordFileFull) + tk.MustExec("rollback") } func (s *testSessionSuite) TestTiDBEnableGlobalTemporaryTable(c *C) { diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index d7f97e657318c..81ef4cbf1f67c 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -357,6 +357,69 @@ func (r *RewritePhaseInfo) Reset() { r.PreprocessSubQueries = 0 } +// TemporaryTableData is a interface to maintain temporary data in session +type TemporaryTableData interface { + kv.Retriever + // Staging create a new staging buffer inside the MemBuffer. + // Subsequent writes will be temporarily stored in this new staging buffer. + // When you think all modifications looks good, you can call `Release` to public all of them to the upper level buffer. + Staging() kv.StagingHandle + // Release publish all modifications in the latest staging buffer to upper level. + Release(kv.StagingHandle) + // Cleanup cleanups the resources referenced by the StagingHandle. + // If the changes are not published by `Release`, they will be discarded. + Cleanup(kv.StagingHandle) + // GetTableSize get the size of a table + GetTableSize(tblID int64) int64 + // DeleteTableKey removes the entry for key k from table + DeleteTableKey(tblID int64, k kv.Key) error + // SetTableKey sets the entry for k from table + SetTableKey(tblID int64, k kv.Key, val []byte) error +} + +// temporaryTableData is used for store temporary table data in session +type temporaryTableData struct { + kv.MemBuffer + tblSize map[int64]int64 +} + +// NewTemporaryTableData creates a new TemporaryTableData +func NewTemporaryTableData(memBuffer kv.MemBuffer) TemporaryTableData { + return &temporaryTableData{ + MemBuffer: memBuffer, + tblSize: make(map[int64]int64), + } +} + +// GetTableSize get the size of a table +func (d *temporaryTableData) GetTableSize(tblID int64) int64 { + if tblSize, ok := d.tblSize[tblID]; ok { + return tblSize + } + return 0 +} + +// DeleteTableKey removes the entry for key k from table +func (d *temporaryTableData) DeleteTableKey(tblID int64, k kv.Key) error { + bufferSize := d.MemBuffer.Size() + defer d.updateTblSize(tblID, bufferSize) + + return d.MemBuffer.Delete(k) +} + +// SetTableKey sets the entry for k from table +func (d *temporaryTableData) SetTableKey(tblID int64, k kv.Key, val []byte) error { + bufferSize := d.MemBuffer.Size() + defer d.updateTblSize(tblID, bufferSize) + + return d.MemBuffer.Set(k, val) +} + +func (d *temporaryTableData) updateTblSize(tblID int64, beforeSize int) { + delta := int64(d.MemBuffer.Size() - beforeSize) + d.tblSize[tblID] = d.GetTableSize(tblID) + delta +} + const ( // oneShotDef means default, that is tx_isolation_one_shot not set. oneShotDef txnIsolationLevelOneShotState = iota @@ -876,7 +939,7 @@ type SessionVars struct { LocalTemporaryTables interface{} // TemporaryTableData stores committed kv values for temporary table for current session. - TemporaryTableData kv.MemBuffer + TemporaryTableData TemporaryTableData // MPPStoreLastFailTime records the lastest fail time that a TiFlash store failed. MPPStoreLastFailTime map[string]time.Time @@ -2251,16 +2314,16 @@ func (s *SessionVars) GetSeekFactor(tbl *model.TableInfo) float64 { // TemporaryTableSnapshotReader can read the temporary table snapshot data type TemporaryTableSnapshotReader struct { - memBuffer kv.MemBuffer + temporaryTableData TemporaryTableData } // Get gets the value for key k from snapshot. func (s *TemporaryTableSnapshotReader) Get(ctx context.Context, k kv.Key) ([]byte, error) { - if s.memBuffer == nil { + if s.temporaryTableData == nil { return nil, kv.ErrNotExist } - v, err := s.memBuffer.Get(ctx, k) + v, err := s.temporaryTableData.Get(ctx, k) if err != nil { return v, err } diff --git a/store/driver/txn/unionstore_driver.go b/store/driver/txn/unionstore_driver.go index e074ff9ca09b9..a60f5ddcf74c5 100644 --- a/store/driver/txn/unionstore_driver.go +++ b/store/driver/txn/unionstore_driver.go @@ -35,6 +35,10 @@ func newMemBuffer(m *tikv.MemDB) kv.MemBuffer { return &memBuffer{MemDB: m} } +func (m *memBuffer) Size() int { + return m.MemDB.Size() +} + func (m *memBuffer) Delete(k kv.Key) error { return m.MemDB.Delete(k) } diff --git a/table/tables/tables.go b/table/tables/tables.go index 0a957a659b27a..da32fcfbcf06c 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -331,8 +331,8 @@ func (t *TableCommon) UpdateRecord(ctx context.Context, sctx sessionctx.Context, if m := t.Meta(); m.TempTableType != model.TempTableNone { if tmpTable := addTemporaryTable(sctx, m); tmpTable != nil { - if tmpTable.GetSize() > sctx.GetSessionVars().TMPTableSize { - return table.ErrTempTableFull.GenWithStackByArgs(m.Name.O) + if err := checkTempTableSize(sctx, tmpTable, m); err != nil { + return err } defer handleTempTableSize(tmpTable, txn.Size(), txn) } @@ -617,6 +617,19 @@ func handleTempTableSize(t tableutil.TempTable, txnSizeBefore int, txn kv.Transa t.SetSize(newSize) } +func checkTempTableSize(ctx sessionctx.Context, tmpTable tableutil.TempTable, tblInfo *model.TableInfo) error { + tmpTableSize := tmpTable.GetSize() + if tempTableData := ctx.GetSessionVars().TemporaryTableData; tempTableData != nil { + tmpTableSize += tempTableData.GetTableSize(tblInfo.ID) + } + + if tmpTableSize > ctx.GetSessionVars().TMPTableSize { + return table.ErrTempTableFull.GenWithStackByArgs(tblInfo.Name.O) + } + + return nil +} + // AddRecord implements table.Table AddRecord interface. func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) { txn, err := sctx.Txn(true) @@ -631,8 +644,8 @@ func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts . if m := t.Meta(); m.TempTableType != model.TempTableNone { if tmpTable := addTemporaryTable(sctx, m); tmpTable != nil { - if tmpTable.GetSize() > sctx.GetSessionVars().TMPTableSize { - return nil, table.ErrTempTableFull.GenWithStackByArgs(m.Name.O) + if err := checkTempTableSize(sctx, tmpTable, m); err != nil { + return nil, err } defer handleTempTableSize(tmpTable, txn.Size(), txn) } @@ -1045,8 +1058,8 @@ func (t *TableCommon) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []type } if m := t.Meta(); m.TempTableType != model.TempTableNone { if tmpTable := addTemporaryTable(ctx, m); tmpTable != nil { - if tmpTable.GetSize() > ctx.GetSessionVars().TMPTableSize { - return table.ErrTempTableFull.GenWithStackByArgs(m.Name.O) + if err := checkTempTableSize(ctx, tmpTable, m); err != nil { + return err } defer handleTempTableSize(tmpTable, txn.Size(), txn) }