Skip to content

Commit

Permalink
*: Add temp_table_size support for local temporary table (#27205)
Browse files Browse the repository at this point in the history
  • Loading branch information
lcwangchao authored Aug 16, 2021
1 parent e4dcef9 commit 462c9dc
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 20 deletions.
6 changes: 3 additions & 3 deletions executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,17 +173,17 @@ func (e *BatchPointGetExec) Open(context.Context) error {
// temporaryTableSnapshot inherits kv.Snapshot and override the BatchGet methods to return empty.
type temporaryTableSnapshot struct {
kv.Snapshot
memBuffer kv.MemBuffer
sessionData variable.TemporaryTableData
}

func (s temporaryTableSnapshot) BatchGet(ctx context.Context, keys []kv.Key) (map[string][]byte, error) {
values := make(map[string][]byte)
if s.memBuffer == nil {
if s.sessionData == nil {
return values, nil
}

for _, key := range keys {
val, err := s.memBuffer.Get(ctx, key)
val, err := s.sessionData.Get(ctx, key)
if err == kv.ErrNotExist {
continue
}
Expand Down
10 changes: 5 additions & 5 deletions executor/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,15 @@ func (e *DDLExec) toErr(err error) error {
}

// deleteTemporaryTableRecords delete temporary table data.
func deleteTemporaryTableRecords(memData kv.MemBuffer, tblID int64) error {
if memData == nil {
func deleteTemporaryTableRecords(sessionData variable.TemporaryTableData, tblID int64) error {
if sessionData == nil {
return kv.ErrNotExist
}

tblPrefix := tablecodec.EncodeTablePrefix(tblID)
endKey := tablecodec.EncodeTablePrefix(tblID + 1)

iter, err := memData.Iter(tblPrefix, endKey)
iter, err := sessionData.Iter(tblPrefix, endKey)
if err != nil {
return err
}
Expand All @@ -92,7 +92,7 @@ func deleteTemporaryTableRecords(memData kv.MemBuffer, tblID int64) error {
break
}

err = memData.Delete(key)
err = sessionData.DeleteTableKey(tblID, key)
if err != nil {
return err
}
Expand Down Expand Up @@ -365,7 +365,7 @@ func (e *DDLExec) createSessionTemporaryTable(s *ast.CreateTableStmt) error {
return err
}

sessVars.TemporaryTableData = bufferTxn.GetMemBuffer()
sessVars.TemporaryTableData = variable.NewTemporaryTableData(bufferTxn.GetMemBuffer())
}

err = localTempTables.AddTable(dbInfo.Name, tbl)
Expand Down
3 changes: 3 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ type MemBuffer interface {

// Len returns the number of entries in the DB.
Len() int

// Size returns sum of keys and values length.
Size() int
}

// LockCtx contains information for LockKeys method.
Expand Down
4 changes: 2 additions & 2 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,9 +631,9 @@ func (s *session) commitTxnWithTemporaryData(ctx context.Context, txn kv.Transac

value := iter.Value()
if len(value) == 0 {
err = sessionData.Delete(key)
err = sessionData.DeleteTableKey(tblID, key)
} else {
err = sessionData.Set(key, iter.Value())
err = sessionData.SetTableKey(tblID, key, iter.Value())
}

if err != nil {
Expand Down
17 changes: 17 additions & 0 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4749,6 +4749,8 @@ func (s *testSessionSuite) TestTMPTableSize(c *C) {
tk.MustExec("use test")
tk.MustExec("set tidb_enable_global_temporary_table=on")
tk.MustExec("create global temporary table t (c1 int, c2 varchar(512)) on commit delete rows")
tk.MustExec("set tidb_enable_noop_functions=on")
tk.MustExec("create temporary table tl (c1 int, c2 varchar(512))")

tk.MustQuery("select @@global.tmp_table_size").Check(testkit.Rows(strconv.Itoa(variable.DefTMPTableSize)))
c.Assert(tk.Se.GetSessionVars().TMPTableSize, Equals, int64(variable.DefTMPTableSize))
Expand All @@ -4771,6 +4773,21 @@ func (s *testSessionSuite) TestTMPTableSize(c *C) {
tk.MustExec("insert into t values (1, repeat('x', 512))")
tk.MustExec("insert into t values (1, repeat('x', 512))")
tk.MustGetErrCode("insert into t values (1, repeat('x', 512))", errno.ErrRecordFileFull)
tk.MustExec("rollback")

// Check local temporary table
tk.MustExec("begin")
tk.MustExec("insert into tl values (1, repeat('x', 512))")
tk.MustExec("insert into tl values (1, repeat('x', 512))")
tk.MustGetErrCode("insert into tl values (1, repeat('x', 512))", errno.ErrRecordFileFull)
tk.MustExec("rollback")

// Check local temporary table with some data in session
tk.MustExec("insert into tl values (1, repeat('x', 512))")
tk.MustExec("begin")
tk.MustExec("insert into tl values (1, repeat('x', 512))")
tk.MustGetErrCode("insert into tl values (1, repeat('x', 512))", errno.ErrRecordFileFull)
tk.MustExec("rollback")
}

func (s *testSessionSuite) TestTiDBEnableGlobalTemporaryTable(c *C) {
Expand Down
71 changes: 67 additions & 4 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,69 @@ func (r *RewritePhaseInfo) Reset() {
r.PreprocessSubQueries = 0
}

// TemporaryTableData is a interface to maintain temporary data in session
type TemporaryTableData interface {
kv.Retriever
// Staging create a new staging buffer inside the MemBuffer.
// Subsequent writes will be temporarily stored in this new staging buffer.
// When you think all modifications looks good, you can call `Release` to public all of them to the upper level buffer.
Staging() kv.StagingHandle
// Release publish all modifications in the latest staging buffer to upper level.
Release(kv.StagingHandle)
// Cleanup cleanups the resources referenced by the StagingHandle.
// If the changes are not published by `Release`, they will be discarded.
Cleanup(kv.StagingHandle)
// GetTableSize get the size of a table
GetTableSize(tblID int64) int64
// DeleteTableKey removes the entry for key k from table
DeleteTableKey(tblID int64, k kv.Key) error
// SetTableKey sets the entry for k from table
SetTableKey(tblID int64, k kv.Key, val []byte) error
}

// temporaryTableData is used for store temporary table data in session
type temporaryTableData struct {
kv.MemBuffer
tblSize map[int64]int64
}

// NewTemporaryTableData creates a new TemporaryTableData
func NewTemporaryTableData(memBuffer kv.MemBuffer) TemporaryTableData {
return &temporaryTableData{
MemBuffer: memBuffer,
tblSize: make(map[int64]int64),
}
}

// GetTableSize get the size of a table
func (d *temporaryTableData) GetTableSize(tblID int64) int64 {
if tblSize, ok := d.tblSize[tblID]; ok {
return tblSize
}
return 0
}

// DeleteTableKey removes the entry for key k from table
func (d *temporaryTableData) DeleteTableKey(tblID int64, k kv.Key) error {
bufferSize := d.MemBuffer.Size()
defer d.updateTblSize(tblID, bufferSize)

return d.MemBuffer.Delete(k)
}

// SetTableKey sets the entry for k from table
func (d *temporaryTableData) SetTableKey(tblID int64, k kv.Key, val []byte) error {
bufferSize := d.MemBuffer.Size()
defer d.updateTblSize(tblID, bufferSize)

return d.MemBuffer.Set(k, val)
}

func (d *temporaryTableData) updateTblSize(tblID int64, beforeSize int) {
delta := int64(d.MemBuffer.Size() - beforeSize)
d.tblSize[tblID] = d.GetTableSize(tblID) + delta
}

const (
// oneShotDef means default, that is tx_isolation_one_shot not set.
oneShotDef txnIsolationLevelOneShotState = iota
Expand Down Expand Up @@ -876,7 +939,7 @@ type SessionVars struct {
LocalTemporaryTables interface{}

// TemporaryTableData stores committed kv values for temporary table for current session.
TemporaryTableData kv.MemBuffer
TemporaryTableData TemporaryTableData

// MPPStoreLastFailTime records the lastest fail time that a TiFlash store failed.
MPPStoreLastFailTime map[string]time.Time
Expand Down Expand Up @@ -2251,16 +2314,16 @@ func (s *SessionVars) GetSeekFactor(tbl *model.TableInfo) float64 {

// TemporaryTableSnapshotReader can read the temporary table snapshot data
type TemporaryTableSnapshotReader struct {
memBuffer kv.MemBuffer
temporaryTableData TemporaryTableData
}

// Get gets the value for key k from snapshot.
func (s *TemporaryTableSnapshotReader) Get(ctx context.Context, k kv.Key) ([]byte, error) {
if s.memBuffer == nil {
if s.temporaryTableData == nil {
return nil, kv.ErrNotExist
}

v, err := s.memBuffer.Get(ctx, k)
v, err := s.temporaryTableData.Get(ctx, k)
if err != nil {
return v, err
}
Expand Down
4 changes: 4 additions & 0 deletions store/driver/txn/unionstore_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ func newMemBuffer(m *tikv.MemDB) kv.MemBuffer {
return &memBuffer{MemDB: m}
}

func (m *memBuffer) Size() int {
return m.MemDB.Size()
}

func (m *memBuffer) Delete(k kv.Key) error {
return m.MemDB.Delete(k)
}
Expand Down
25 changes: 19 additions & 6 deletions table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,8 @@ func (t *TableCommon) UpdateRecord(ctx context.Context, sctx sessionctx.Context,

if m := t.Meta(); m.TempTableType != model.TempTableNone {
if tmpTable := addTemporaryTable(sctx, m); tmpTable != nil {
if tmpTable.GetSize() > sctx.GetSessionVars().TMPTableSize {
return table.ErrTempTableFull.GenWithStackByArgs(m.Name.O)
if err := checkTempTableSize(sctx, tmpTable, m); err != nil {
return err
}
defer handleTempTableSize(tmpTable, txn.Size(), txn)
}
Expand Down Expand Up @@ -617,6 +617,19 @@ func handleTempTableSize(t tableutil.TempTable, txnSizeBefore int, txn kv.Transa
t.SetSize(newSize)
}

func checkTempTableSize(ctx sessionctx.Context, tmpTable tableutil.TempTable, tblInfo *model.TableInfo) error {
tmpTableSize := tmpTable.GetSize()
if tempTableData := ctx.GetSessionVars().TemporaryTableData; tempTableData != nil {
tmpTableSize += tempTableData.GetTableSize(tblInfo.ID)
}

if tmpTableSize > ctx.GetSessionVars().TMPTableSize {
return table.ErrTempTableFull.GenWithStackByArgs(tblInfo.Name.O)
}

return nil
}

// AddRecord implements table.Table AddRecord interface.
func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) {
txn, err := sctx.Txn(true)
Expand All @@ -631,8 +644,8 @@ func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts .

if m := t.Meta(); m.TempTableType != model.TempTableNone {
if tmpTable := addTemporaryTable(sctx, m); tmpTable != nil {
if tmpTable.GetSize() > sctx.GetSessionVars().TMPTableSize {
return nil, table.ErrTempTableFull.GenWithStackByArgs(m.Name.O)
if err := checkTempTableSize(sctx, tmpTable, m); err != nil {
return nil, err
}
defer handleTempTableSize(tmpTable, txn.Size(), txn)
}
Expand Down Expand Up @@ -1045,8 +1058,8 @@ func (t *TableCommon) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []type
}
if m := t.Meta(); m.TempTableType != model.TempTableNone {
if tmpTable := addTemporaryTable(ctx, m); tmpTable != nil {
if tmpTable.GetSize() > ctx.GetSessionVars().TMPTableSize {
return table.ErrTempTableFull.GenWithStackByArgs(m.Name.O)
if err := checkTempTableSize(ctx, tmpTable, m); err != nil {
return err
}
defer handleTempTableSize(tmpTable, txn.Size(), txn)
}
Expand Down

0 comments on commit 462c9dc

Please sign in to comment.