Skip to content

Commit

Permalink
util: track the memory usage of rowPtrs in sortedRowContainer (pingca…
Browse files Browse the repository at this point in the history
  • Loading branch information
wshwsh12 authored and espresso98 committed Apr 25, 2022
1 parent 39d1428 commit 2e26d54
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 12 deletions.
24 changes: 14 additions & 10 deletions util/chunk/row_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,6 @@ func (m *mutexForRowContainer) RUnlock() {
type RowContainer struct {
m *mutexForRowContainer

fieldType []*types.FieldType
chunkSize int
numRow int

memTracker *memory.Tracker
diskTracker *disk.Tracker
actionSpill *SpillDiskAction
Expand All @@ -97,8 +93,6 @@ func NewRowContainer(fieldType []*types.FieldType, chunkSize int) *RowContainer
rLock: rLock,
wLocks: []*sync.RWMutex{rLock},
},
fieldType: fieldType,
chunkSize: chunkSize,
memTracker: memory.NewTracker(memory.LabelForRowContainer, -1),
diskTracker: disk.NewTracker(memory.LabelForRowContainer, -1),
}
Expand Down Expand Up @@ -438,20 +432,24 @@ type SortedRowContainer struct {
keyCmpFuncs []CompareFunc

actionSpill *SortAndSpillDiskAction
memTracker *memory.Tracker
}

// NewSortedRowContainer creates a new SortedRowContainer in memory.
func NewSortedRowContainer(fieldType []*types.FieldType, chunkSize int, ByItemsDesc []bool,
keyColumns []int, keyCmpFuncs []CompareFunc) *SortedRowContainer {
return &SortedRowContainer{RowContainer: NewRowContainer(fieldType, chunkSize),
src := SortedRowContainer{RowContainer: NewRowContainer(fieldType, chunkSize),
ByItemsDesc: ByItemsDesc, keyColumns: keyColumns, keyCmpFuncs: keyCmpFuncs}
src.memTracker = memory.NewTracker(memory.LabelForRowContainer, -1)
src.RowContainer.GetMemTracker().AttachTo(src.GetMemTracker())
return &src
}

// Close close the SortedRowContainer
func (c *SortedRowContainer) Close() error {
c.ptrM.Lock()
defer c.ptrM.Unlock()
c.GetMemTracker().Consume(int64(-8 * cap(c.ptrM.rowPtrs)))
c.GetMemTracker().Consume(int64(-8 * c.NumRow()))
c.ptrM.rowPtrs = nil
return c.RowContainer.Close()
}
Expand Down Expand Up @@ -486,7 +484,7 @@ func (c *SortedRowContainer) Sort() {
if c.ptrM.rowPtrs != nil {
return
}
c.ptrM.rowPtrs = make([]RowPtr, 0, c.NumRow())
c.ptrM.rowPtrs = make([]RowPtr, 0, c.NumRow()) // The memory usage has been tracked in SortedRowContainer.Add() function
for chkIdx := 0; chkIdx < c.NumChunks(); chkIdx++ {
rowChk, err := c.GetChunk(chkIdx)
// err must be nil, because the chunk is in memory.
Expand All @@ -498,7 +496,6 @@ func (c *SortedRowContainer) Sort() {
}
}
sort.Slice(c.ptrM.rowPtrs, c.keyColumnsLess)
c.GetMemTracker().Consume(int64(8 * c.numRow))
}

func (c *SortedRowContainer) sortAndSpillToDisk() {
Expand All @@ -513,6 +510,8 @@ func (c *SortedRowContainer) Add(chk *Chunk) (err error) {
if c.ptrM.rowPtrs != nil {
return ErrCannotAddBecauseSorted
}
// Consume the memory usage of rowPtrs in advance
c.GetMemTracker().Consume(int64(chk.NumRows() * 8))
return c.RowContainer.Add(chk)
}

Expand Down Expand Up @@ -544,6 +543,11 @@ func (c *SortedRowContainer) ActionSpillForTest() *SortAndSpillDiskAction {
return c.actionSpill
}

// GetMemTracker return the memory tracker for the sortedRowContainer
func (c *SortedRowContainer) GetMemTracker() *memory.Tracker {
return c.memTracker
}

// SortAndSpillDiskAction implements memory.ActionOnExceed for chunk.List. If
// the memory quota of a query is exceeded, SortAndSpillDiskAction.Action is
// triggered.
Expand Down
4 changes: 2 additions & 2 deletions util/chunk/row_container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,14 +156,14 @@ func TestSortedRowContainerSortSpillAction(t *testing.T) {
var tracker *memory.Tracker
var err error
tracker = rc.GetMemTracker()
tracker.SetBytesLimit(chk.MemoryUsage() + 1)
tracker.SetBytesLimit(chk.MemoryUsage() + int64(8*chk.NumRows()) + 1)
tracker.FallbackOldAndSetNewAction(rc.ActionSpillForTest())
require.False(t, rc.AlreadySpilledSafeForTest())
err = rc.Add(chk)
rc.actionSpill.WaitForTest()
require.NoError(t, err)
require.False(t, rc.AlreadySpilledSafeForTest())
require.Equal(t, chk.MemoryUsage(), rc.GetMemTracker().BytesConsumed())
require.Equal(t, chk.MemoryUsage()+int64(8*chk.NumRows()), rc.GetMemTracker().BytesConsumed())
// The following line is erroneous, since chk is already handled by rc, Add it again causes duplicated memory usage account.
// It is only for test of spill, do not double-add a chunk elsewhere.
err = rc.Add(chk)
Expand Down

0 comments on commit 2e26d54

Please sign in to comment.