diff --git a/src/butil/iobuf_profiler.cpp b/src/butil/iobuf_profiler.cpp index 0eddd60608..8887817489 100644 --- a/src/butil/iobuf_profiler.cpp +++ b/src/butil/iobuf_profiler.cpp @@ -91,8 +91,6 @@ IOBufProfiler* IOBufProfiler::GetInstance() { IOBufProfiler::IOBufProfiler() : butil::SimpleThread("IOBufProfiler") , _stop(false) - , _sample_head(NULL) - , _cur_sample(NULL) , _sleep_ms(MIN_SLEEP_MS) { CHECK_EQ(0, _stack_map.init(1024)); CHECK_EQ(0, _block_info_map.init(1024)); @@ -103,21 +101,13 @@ IOBufProfiler::~IOBufProfiler() { StopAndJoin(); _block_info_map.clear(); _stack_map.clear(); - IOBufSample* sample = _cur_sample.load(butil::memory_order_relaxed); - if (sample) { - // Release all sample. - do { - // req was consumed, skip it. - if (sample->next != NULL) { - IOBufSample* const saved_req = sample; - sample = sample->next; - IOBufSample::Destroy(saved_req); - } - } while (!IsComplete(sample)); - if (sample) { - IOBufSample::Destroy(sample); - } + + // Clear `_sample_queue'. + IOBufSample* sample = NULL; + while (_sample_queue.Dequeue(sample)) { + IOBufSample::Destroy(sample); } + } void IOBufProfiler::Submit(IOBufSample* s) { @@ -125,15 +115,7 @@ void IOBufProfiler::Submit(IOBufSample* s) { return; } - s->next = IOBufSample::UNCONNECTED; - IOBufSample* const prev_head = _sample_head.exchange( - s, butil::memory_order_release); - if (prev_head) { - s->next = prev_head; - return; - } - s->next = NULL; - _cur_sample.store(s, butil::memory_order_release); + _sample_queue.Enqueue(s); } void IOBufProfiler::Dump(IOBufSample* s) { @@ -284,77 +266,17 @@ void IOBufProfiler::Run() { } void IOBufProfiler::Consume() { - IOBufSample* sample = _cur_sample.load(butil::memory_order_relaxed); - if (!sample) { - // If there are no samples, exponentially increase the sleep time - _sleep_ms = std::min(_sleep_ms * 2, MAX_SLEEP_MS); - return; + IOBufSample* sample = NULL; + bool is_empty = true; + while (_sample_queue.Dequeue(sample)) { + Dump(sample); + IOBufSample::Destroy(sample); + is_empty = false; } - _sleep_ms = MIN_SLEEP_MS; - _cur_sample.store(NULL, butil::memory_order_release); - do { - // sample was consumed, skip it. - if (sample->next != NULL && !sample->block) { - IOBufSample* const saved_req = sample; - sample = sample->next; - IOBufSample::Destroy(saved_req); - } - - // Consume all samples. - while (sample->next != NULL) { - IOBufSample* const saved_req = sample; - sample = sample->next; - if (saved_req->block) { - Dump(saved_req); - } - // Release samples until last sample. - IOBufSample::Destroy(saved_req); - } - if (sample->block) { - Dump(sample); - } - - // Return when there's no more samples and samples are completely consumed. - if (IsComplete(sample)) { - IOBufSample::Destroy(sample); - return; - } - } while (true); -} - -bool IOBufProfiler::IsComplete(IOBufSample* old_head) { - CHECK(!old_head->next); - IOBufSample* new_head = old_head; - IOBufSample* desired = NULL; - if (_sample_head.compare_exchange_strong( - new_head, desired, butil::memory_order_acquire)) { - // No one added new samples. - return true; - } - CHECK_NE(new_head, old_head); - // Above acquire fence pairs release fence of exchange in Submit() to make - // sure that we see all fields of samples set. - - // Someone added new samples. - // Reverse the list until old_head. - IOBufSample* tail = NULL; - IOBufSample* sample = new_head; - do { - - while (sample->next == IOBufSample::UNCONNECTED) { - sched_yield(); - } - IOBufSample* const saved_next = sample->next; - sample->next = tail; - tail = sample; - sample = saved_next; - CHECK(NULL != sample); - } while (sample != old_head); - - // Link old list with new list. - old_head->next = tail; - return false; + _sleep_ms = !is_empty ? + MIN_SLEEP_MS : + std::min(_sleep_ms * 2, MAX_SLEEP_MS); } void SubmitIOBufSample(IOBuf::Block* block, int64_t ref) { diff --git a/src/butil/iobuf_profiler.h b/src/butil/iobuf_profiler.h index 1ea9a734ac..5a10064bfd 100644 --- a/src/butil/iobuf_profiler.h +++ b/src/butil/iobuf_profiler.h @@ -26,6 +26,7 @@ #include "butil/threading/simple_thread.h" #include "butil/memory/singleton.h" #include "butil/containers/flat_map.h" +#include "butil/containers/mpsc_queue.h" namespace butil { @@ -139,11 +140,8 @@ class IOBufProfiler : public butil::SimpleThread { void Consume(); - bool IsComplete(IOBufSample* old_head); - butil::atomic _stop; - butil::atomic _sample_head; - butil::atomic _cur_sample; + MPSCQueue _sample_queue; butil::IOBuf _disk_buf; // temp buf before saving the file. IOBufRefMap _stack_map; // combining same samples to make result smaller.