Skip to content

Commit

Permalink
Fix memory statistics of MemoryTracker when input stream released. (#…
Browse files Browse the repository at this point in the history
…6134)

close #6130
  • Loading branch information
JinheLin authored Oct 17, 2022
1 parent a57e024 commit 16aac42
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 3 deletions.
20 changes: 17 additions & 3 deletions dbms/src/Storages/DeltaMerge/ReadThread/MergedTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ class MergedTask
passive_merged_segments.fetch_sub(units.size() - 1, std::memory_order_relaxed);
GET_METRIC(tiflash_storage_read_thread_gauge, type_merged_task).Decrement();
GET_METRIC(tiflash_storage_read_thread_seconds, type_merged_task).Observe(sw.elapsedSeconds());
// `setAllStreamFinished` must be called to explicitly releasing all streams for updating memory statistics of `MemoryTracker`.
setAllStreamsFinished();
}

int readBlock();
Expand Down Expand Up @@ -116,13 +118,25 @@ class MergedTask
{
if (!isStreamFinished(i))
{
units[i].pool = nullptr;
units[i].task = nullptr;
units[i].stream = nullptr;
// `MergedUnit.stream` must be released explicitly for updating memory statistics of `MemoryTracker`.
auto & [pool, task, stream] = units[i];
{
MemoryTrackerSetter setter(true, pool->getMemoryTracker().get());
task = nullptr;
stream = nullptr;
}
pool = nullptr;
finished_count++;
}
}

void setAllStreamsFinished()
{
for (size_t i = 0; i < units.size(); ++i)
{
setStreamFinished(i);
}
}
uint64_t seg_id;
std::vector<MergedUnit> units;
bool inited;
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,11 @@ class SegmentReadTaskPool : private boost::noncopyable
return add_to_scheduler;
}

MemoryTrackerPtr & getMemoryTracker()
{
return mem_tracker;
}

private:
int64_t getFreeActiveSegmentCountUnlock();
bool exceptionHappened() const;
Expand Down

0 comments on commit 16aac42

Please sign in to comment.