Skip to content

Commit

Permalink
Use MPSC Queue
Browse files Browse the repository at this point in the history
  • Loading branch information
chenBright committed Feb 8, 2024
1 parent c47be11 commit b6775e1
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 98 deletions.
110 changes: 16 additions & 94 deletions src/butil/iobuf_profiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,6 @@ IOBufProfiler* IOBufProfiler::GetInstance() {
IOBufProfiler::IOBufProfiler()
: butil::SimpleThread("IOBufProfiler")
, _stop(false)
, _sample_head(NULL)
, _cur_sample(NULL)
, _sleep_ms(MIN_SLEEP_MS) {
CHECK_EQ(0, _stack_map.init(1024));
CHECK_EQ(0, _block_info_map.init(1024));
Expand All @@ -103,37 +101,21 @@ IOBufProfiler::~IOBufProfiler() {
StopAndJoin();
_block_info_map.clear();
_stack_map.clear();
IOBufSample* sample = _cur_sample.load(butil::memory_order_relaxed);
if (sample) {
// Release all sample.
do {
// req was consumed, skip it.
if (sample->next != NULL) {
IOBufSample* const saved_req = sample;
sample = sample->next;
IOBufSample::Destroy(saved_req);
}
} while (!IsComplete(sample));
if (sample) {
IOBufSample::Destroy(sample);
}

// Clear `_sample_queue'.
IOBufSample* sample = NULL;
while (_sample_queue.Dequeue(sample)) {
IOBufSample::Destroy(sample);
}

}

void IOBufProfiler::Submit(IOBufSample* s) {
if (_stop.load(butil::memory_order_relaxed)) {
return;
}

s->next = IOBufSample::UNCONNECTED;
IOBufSample* const prev_head = _sample_head.exchange(
s, butil::memory_order_release);
if (prev_head) {
s->next = prev_head;
return;
}
s->next = NULL;
_cur_sample.store(s, butil::memory_order_release);
_sample_queue.Enqueue(s);
}

void IOBufProfiler::Dump(IOBufSample* s) {
Expand Down Expand Up @@ -284,77 +266,17 @@ void IOBufProfiler::Run() {
}

void IOBufProfiler::Consume() {
IOBufSample* sample = _cur_sample.load(butil::memory_order_relaxed);
if (!sample) {
// If there are no samples, exponentially increase the sleep time
_sleep_ms = std::min(_sleep_ms * 2, MAX_SLEEP_MS);
return;
IOBufSample* sample = NULL;
bool is_empty = true;
while (_sample_queue.Dequeue(sample)) {
Dump(sample);
IOBufSample::Destroy(sample);
is_empty = false;
}

_sleep_ms = MIN_SLEEP_MS;
_cur_sample.store(NULL, butil::memory_order_release);
do {
// sample was consumed, skip it.
if (sample->next != NULL && !sample->block) {
IOBufSample* const saved_req = sample;
sample = sample->next;
IOBufSample::Destroy(saved_req);
}

// Consume all samples.
while (sample->next != NULL) {
IOBufSample* const saved_req = sample;
sample = sample->next;
if (saved_req->block) {
Dump(saved_req);
}
// Release samples until last sample.
IOBufSample::Destroy(saved_req);
}
if (sample->block) {
Dump(sample);
}

// Return when there's no more samples and samples are completely consumed.
if (IsComplete(sample)) {
IOBufSample::Destroy(sample);
return;
}
} while (true);
}

bool IOBufProfiler::IsComplete(IOBufSample* old_head) {
CHECK(!old_head->next);
IOBufSample* new_head = old_head;
IOBufSample* desired = NULL;
if (_sample_head.compare_exchange_strong(
new_head, desired, butil::memory_order_acquire)) {
// No one added new samples.
return true;
}
CHECK_NE(new_head, old_head);
// Above acquire fence pairs release fence of exchange in Submit() to make
// sure that we see all fields of samples set.

// Someone added new samples.
// Reverse the list until old_head.
IOBufSample* tail = NULL;
IOBufSample* sample = new_head;
do {

while (sample->next == IOBufSample::UNCONNECTED) {
sched_yield();
}
IOBufSample* const saved_next = sample->next;
sample->next = tail;
tail = sample;
sample = saved_next;
CHECK(NULL != sample);
} while (sample != old_head);

// Link old list with new list.
old_head->next = tail;
return false;
_sleep_ms = !is_empty ?
MIN_SLEEP_MS :
std::min(_sleep_ms * 2, MAX_SLEEP_MS);
}

void SubmitIOBufSample(IOBuf::Block* block, int64_t ref) {
Expand Down
6 changes: 2 additions & 4 deletions src/butil/iobuf_profiler.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "butil/threading/simple_thread.h"
#include "butil/memory/singleton.h"
#include "butil/containers/flat_map.h"
#include "butil/containers/mpsc_queue.h"

namespace butil {

Expand Down Expand Up @@ -139,11 +140,8 @@ class IOBufProfiler : public butil::SimpleThread {

void Consume();

bool IsComplete(IOBufSample* old_head);

butil::atomic<bool> _stop;
butil::atomic<IOBufSample*> _sample_head;
butil::atomic<IOBufSample*> _cur_sample;
MPSCQueue<IOBufSample*> _sample_queue;

butil::IOBuf _disk_buf; // temp buf before saving the file.
IOBufRefMap _stack_map; // combining same samples to make result smaller.
Expand Down

0 comments on commit b6775e1

Please sign in to comment.