Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add build option debug-disk-pool to log disk buffers allocated by category #7792

Merged
merged 2 commits into from
Nov 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Jamfile
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,9 @@ feature.compose <export-extra>on : <define>TORRENT_EXPORT_EXTRA ;
feature debug-disk-thread : off on : composite propagated ;
feature.compose <debug-disk-thread>on : <define>DEBUG_DISK_THREAD=1 ;

feature debug-disk-pool : off on : composite propagated ;
feature.compose <debug-disk-pool>on : <define>TORRENT_DEBUG_BUFFER_POOL=1 ;

feature msvc-version-macro : off on : composite propagated link-incompatible ;
# ask the compiler to correctly set __cplusplus version
feature.compose <msvc-version-macro>on : <cxxflags>/Zc\:__cplusplus ;
Expand Down
3 changes: 3 additions & 0 deletions examples/custom_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,9 @@ struct temp_disk_io final : lt::disk_interface
// never free any buffer. We only return buffers owned by the storage
// object
}
#if TORRENT_DEBUG_BUFFER_POOL
void rename_buffer(char*, char const*) override {}
#endif

void update_stats_counters(lt::counters&) const override {}

Expand Down
19 changes: 15 additions & 4 deletions include/libtorrent/aux_/disk_buffer_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@ see LICENSE file.

#include "libtorrent/config.hpp"

#if TORRENT_USE_INVARIANT_CHECKS
#include <set>
#ifndef TORRENT_DEBUG_BUFFER_POOL
#define TORRENT_DEBUG_BUFFER_POOL 0
#endif

#if TORRENT_DEBUG_BUFFER_POOL
#include <map>
#endif
#include <vector>
#include <mutex>
Expand Down Expand Up @@ -56,6 +60,9 @@ namespace aux {

void set_settings(settings_interface const& sett);

#if TORRENT_DEBUG_BUFFER_POOL
void rename_buffer(char* buf, char const* category) override;
#endif
private:

void free_buffer_impl(char* buf, std::unique_lock<std::mutex>& l);
Expand Down Expand Up @@ -94,8 +101,12 @@ namespace aux {
// this is specifically exempt from release_asserts
// since it's a quite costly check. Only for debug
// builds.
#if TORRENT_USE_INVARIANT_CHECKS
std::set<char*> m_buffers_in_use;
#if TORRENT_DEBUG_BUFFER_POOL
std::map<char*, char const*> m_buffers_in_use;
std::map<std::string, int> m_histogram;
time_t m_last_log = std::time(nullptr);

void maybe_log();
#endif
#if TORRENT_USE_ASSERTS
int m_magic = 0x1337;
Expand Down
10 changes: 10 additions & 0 deletions include/libtorrent/disk_buffer_holder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ see LICENSE file.
#include "libtorrent/assert.hpp"
#include <utility>

#ifndef TORRENT_DEBUG_BUFFER_POOL
#define TORRENT_DEBUG_BUFFER_POOL 0
#endif

namespace libtorrent {

// the interface for freeing disk buffers, used by the disk_buffer_holder.
Expand All @@ -23,6 +27,9 @@ namespace libtorrent {
struct TORRENT_EXPORT buffer_allocator_interface
{
virtual void free_disk_buffer(char* b) = 0;
#if TORRENT_DEBUG_BUFFER_POOL
virtual void rename_buffer(char* buf, char const* category) = 0;
#endif
protected:
~buffer_allocator_interface() = default;
};
Expand Down Expand Up @@ -77,6 +84,9 @@ namespace libtorrent {

std::ptrdiff_t size() const { return m_size; }

#if TORRENT_DEBUG_BUFFER_POOL
void rename(char const* category);
#endif
private:

buffer_allocator_interface* m_allocator = nullptr;
Expand Down
3 changes: 3 additions & 0 deletions src/disabled_disk_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ struct TORRENT_EXTRA_EXPORT disabled_disk_io final
// since we just have a single zeroed buffer, we don't need to free anything
// here. The buffer is owned by the disabled_disk_io object itself
void free_disk_buffer(char*) override {}
#if TORRENT_DEBUG_BUFFER_POOL
void rename_buffer(char*, char const*) override {}
#endif

std::vector<open_file_state> get_status(storage_index_t) const override
{ return {}; }
Expand Down
8 changes: 8 additions & 0 deletions src/disk_buffer_holder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,13 @@ namespace libtorrent {
m_size = 0;
}

#if TORRENT_DEBUG_BUFFER_POOL
void disk_buffer_holder::rename(char const* category)
{
if (m_buf != nullptr)
m_allocator->rename_buffer(m_buf, category);
}
#endif

disk_buffer_holder::~disk_buffer_holder() { reset(); }
}
50 changes: 44 additions & 6 deletions src/disk_buffer_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,13 @@ namespace {
}

char* disk_buffer_pool::allocate_buffer_impl(std::unique_lock<std::mutex>& l
, char const*)
, char const* category)
{
TORRENT_ASSERT(m_settings_set);
TORRENT_ASSERT(m_magic == 0x1337);
TORRENT_ASSERT(l.owns_lock());
TORRENT_UNUSED(l);
TORRENT_UNUSED(category);

char* ret = static_cast<char*>(std::malloc(default_block_size));

Expand All @@ -127,17 +128,20 @@ namespace {

++m_in_use;

#if TORRENT_USE_INVARIANT_CHECKS
#if TORRENT_DEBUG_BUFFER_POOL
try
{
TORRENT_ASSERT(m_buffers_in_use.count(ret) == 0);
m_buffers_in_use.insert(ret);
auto const [it, added] = m_buffers_in_use.insert({ret, category});
TORRENT_UNUSED(it);
TORRENT_ASSERT(added);
}
catch (...)
{
free_buffer_impl(ret, l);
return nullptr;
}
m_histogram[category] += 1;
maybe_log();
#endif

if (m_in_use >= m_low_watermark + (m_max_use - m_low_watermark)
Expand Down Expand Up @@ -192,13 +196,47 @@ namespace {
void disk_buffer_pool::remove_buffer_in_use(char* buf)
{
TORRENT_UNUSED(buf);
#if TORRENT_USE_INVARIANT_CHECKS
std::set<char*>::iterator i = m_buffers_in_use.find(buf);
#if TORRENT_DEBUG_BUFFER_POOL
auto i = m_buffers_in_use.find(buf);
TORRENT_ASSERT(i != m_buffers_in_use.end());
TORRENT_ASSERT(m_histogram[i->second] > 0);
m_histogram[i->second] -= 1;
m_buffers_in_use.erase(i);
maybe_log();
#endif
}

#if TORRENT_DEBUG_BUFFER_POOL
void disk_buffer_pool::maybe_log()
{
time_t const now = std::time(nullptr);
if (now - m_last_log > 1)
{
m_last_log = now;
FILE* f = fopen("buffer_pool.log", "a");
fprintf(f, "%ld ", now);
for (auto it = m_histogram.begin(); it != m_histogram.end(); ++it)
{
fprintf(f, "%s:%d ", it->first.c_str(), it->second);
}
fputs("\n", f);
fclose(f);
}
}

void disk_buffer_pool::rename_buffer(char* buf, char const* category)
{
std::unique_lock<std::mutex> l(m_pool_mutex);
auto i = m_buffers_in_use.find(buf);
TORRENT_ASSERT(i != m_buffers_in_use.end());
TORRENT_ASSERT(m_histogram[i->second] > 0);
m_histogram[i->second] -= 1;
i->second = category;
m_histogram[category] += 1;
maybe_log();
}
#endif

void disk_buffer_pool::free_buffer_impl(char* buf, std::unique_lock<std::mutex>& l)
{
TORRENT_ASSERT(buf);
Expand Down
12 changes: 8 additions & 4 deletions src/mmap_disk_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ TORRENT_EXPORT std::unique_ptr<disk_interface> mmap_disk_io_constructor(

status_t mmap_disk_io::do_job(aux::job::read& a, aux::mmap_disk_job* j)
{
a.buf = disk_buffer_holder(m_buffer_pool, m_buffer_pool.allocate_buffer("send buffer"), default_block_size);
a.buf = disk_buffer_holder(m_buffer_pool, m_buffer_pool.allocate_buffer("send buffer (cache miss)"), default_block_size);
if (!a.buf)
{
j->error.ec = error::no_memory;
Expand Down Expand Up @@ -477,6 +477,10 @@ TORRENT_EXPORT std::unique_ptr<disk_interface> mmap_disk_io_constructor(

m_stats_counters.inc_stats_counter(counters::num_writing_threads, 1);

#if TORRENT_DEBUG_BUFFER_POOL
buffer.rename("flushing");
#endif

// the actual write operation
int const ret = j->storage->write(m_settings, b
, a.piece, a.offset, file_mode, j->flags, j->error);
Expand Down Expand Up @@ -554,7 +558,7 @@ TORRENT_EXPORT std::unique_ptr<disk_interface> mmap_disk_io_constructor(
int const ret = m_store_buffer.get2(loc1, loc2, [&](char const* buf1, char const* buf2)
{
buffer = disk_buffer_holder(m_buffer_pool
, m_buffer_pool.allocate_buffer("send buffer")
, m_buffer_pool.allocate_buffer("send buffer (cache hit)")
, r.length);
if (!buffer)
{
Expand Down Expand Up @@ -605,7 +609,7 @@ TORRENT_EXPORT std::unique_ptr<disk_interface> mmap_disk_io_constructor(
{
if (m_store_buffer.get({ storage, r.piece, block_offset }, [&](char const* buf)
{
buffer = disk_buffer_holder(m_buffer_pool, m_buffer_pool.allocate_buffer("send buffer"), r.length);
buffer = disk_buffer_holder(m_buffer_pool, m_buffer_pool.allocate_buffer("send buffer (cache hit)"), r.length);
if (!buffer)
{
ec.ec = error::no_memory;
Expand Down Expand Up @@ -641,7 +645,7 @@ TORRENT_EXPORT std::unique_ptr<disk_interface> mmap_disk_io_constructor(
TORRENT_ASSERT(valid_flags(flags));
bool exceeded = false;
disk_buffer_holder buffer(m_buffer_pool, m_buffer_pool.allocate_buffer(
exceeded, o, "receive buffer"), default_block_size);
exceeded, o, "store buffer"), default_block_size);
if (!buffer) aux::throw_ex<std::bad_alloc>();
std::memcpy(buffer.data(), buf, aux::numeric_cast<std::size_t>(r.length));

Expand Down
5 changes: 4 additions & 1 deletion src/peer_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3173,9 +3173,11 @@ namespace {
{
// if any other peer has a busy request to this block, we need
// to cancel it too
t->cancel_block(block_finished);
if (t->has_picker())
{
t->cancel_block(block_finished);
t->picker().write_failed(block_finished);
}

if (t->has_storage())
{
Expand Down Expand Up @@ -3724,6 +3726,7 @@ namespace {
TORRENT_ASSERT(block.piece_index != piece_block::invalid.piece_index);
TORRENT_ASSERT(block.piece_index < t->torrent_file().end_piece());
TORRENT_ASSERT(block.block_index < t->torrent_file().piece_size(block.piece_index));
TORRENT_ASSERT(t->has_picker());

// if all the peers that requested this block has been
// cancelled, then just ignore the cancel.
Expand Down
12 changes: 7 additions & 5 deletions test/websocket_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
logger.addHandler(logging.StreamHandler(sys.stdout))


async def handle(websocket, path):
async def handle(websocket):
try:
while True:
message = await websocket.recv()
Expand All @@ -40,10 +40,9 @@ async def handle(websocket, path):
print(e)


if __name__ == '__main__':
async def main() -> None:
port = int(sys.argv[1])
use_ssl = sys.argv[2] != '0'
min_interval = sys.argv[3]
print('python version: %s' % sys.version_info.__str__())

if use_ssl:
Expand All @@ -52,6 +51,9 @@ async def handle(websocket, path):
else:
ssl_context = None

start_server = websockets.serve(handle, '127.0.0.1', port, ssl=ssl_context)
asyncio.get_event_loop().run_until_complete(start_server)
await websockets.serve(handle, '127.0.0.1', port, ssl=ssl_context)

if __name__ == '__main__':
loop = asyncio.get_event_loop()
asyncio.get_event_loop().run_until_complete(main())
asyncio.get_event_loop().run_forever()
Loading