Add: Parallel View
mgevor committed Jul 25, 2023
1 parent c600ffd commit ed3f845
Showing 1 changed file with 39 additions and 24 deletions.
include/usearch/index.hpp (63 changes: 39 additions & 24 deletions)
@@ -2449,11 +2449,16 @@ class index_gt {
             total_node_bytes += node_head_bytes_() + node_neighbors_bytes_(node);
         }

-        // Firstly, serialize node offsets
+        // Firstly, calculate and serialize node offsets
         node_offsets_buffer_t offsets_buffer{};
         node_offsets_t offsets{offsets_buffer};
-        typename node_offsets_t::offset_t head_offset = state.size * sizeof(node_offsets_buffer_t);
-        typename node_offsets_t::offset_t vector_offset = head_offset + total_node_bytes;
+        typename node_offsets_t::offset_t head_offset = 0;
+        // Align vectors offset
+        typename node_offsets_t::offset_t vectors_base_offset =
+            sizeof(file_header_t) + size_ * sizeof(node_offsets_buffer_t) + total_node_bytes;
+        typename node_offsets_t::offset_t vectors_offset_shift =
+            config_.vector_alignment - vectors_base_offset % config_.vector_alignment;
+        typename node_offsets_t::offset_t vector_offset = total_node_bytes + vectors_offset_shift;
         for (std::size_t i = 0; i != state.size; ++i) {
             offsets.head = head_offset;
             offsets.vector = vector_offset;
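
The offset math above pads the vector region so that it begins on a config_.vector_alignment boundary. Below is a minimal standalone sketch of that padding rule, not part of the patch, with a 64-byte alignment assumed; note that an already-aligned base still receives a full alignment's worth of padding, since alignment - base % alignment is never zero.

    #include <cstddef>
    #include <cstdio>

    // Padding rule implied by vectors_offset_shift above: the shift always falls in (0, alignment].
    static std::size_t padding_for(std::size_t base_offset, std::size_t alignment) {
        return alignment - base_offset % alignment;
    }

    int main() {
        std::size_t const alignment = 64; // assumed stand-in for config_.vector_alignment
        std::size_t const bases[] = {100, 127, 128}; // 128 is already a multiple of 64
        for (std::size_t base : bases)
            std::printf("base=%zu shift=%zu padded_start=%zu\n", base, padding_for(base, alignment),
                        base + padding_for(base, alignment));
        return 0;
    }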
@@ -2475,8 +2480,7 @@ class index_gt {
         }

         // Finally, serialize vectors into aligned address
-        std::size_t offset = std::ftell(file);
-        std::fseek(file, config_.vector_alignment - offset % config_.vector_alignment, SEEK_CUR);
+        std::fseek(file, vectors_offset_shift, SEEK_CUR);
         for (std::size_t i = 0; i != state.size; ++i) {
             node_t node = node_with_id_(i);
             write_chunk(node.vector(), node_vector_bytes_(node));
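
Together with the offsets written above, the serializer now lays the file out in four regions: the file header, the per-node offsets table, the node tapes, and an alignment-padded vector block, with stored offsets relative to the start of the node-tape region. The standalone sketch below walks through where each region would start; the sizes are made-up stand-ins for sizeof(file_header_t), sizeof(node_offsets_buffer_t), and config_.vector_alignment, not usearch constants.

    #include <cstddef>
    #include <cstdio>

    // Rough sketch of the on-disk layout the serializer above appears to produce.
    int main() {
        std::size_t const header_bytes = 64;         // stand-in for sizeof(file_header_t)
        std::size_t const offsets_entry_bytes = 16;  // stand-in for sizeof(node_offsets_buffer_t)
        std::size_t const node_count = 1000;         // stand-in for size_
        std::size_t const total_node_bytes = 123456; // head + neighbor bytes summed over all nodes
        std::size_t const alignment = 64;            // stand-in for config_.vector_alignment

        std::size_t const offsets_begin = header_bytes;
        std::size_t const nodes_begin = offsets_begin + node_count * offsets_entry_bytes;
        std::size_t const vectors_base = nodes_begin + total_node_bytes;
        std::size_t const shift = alignment - vectors_base % alignment;
        std::size_t const vectors_begin = vectors_base + shift;

        // Stored head offsets start at 0 and vector offsets at total_node_bytes + shift,
        // both relative to nodes_begin, which is why the view path later adds
        // nodes_base_offset back onto them.
        std::printf("offsets table @ %zu, node tapes @ %zu, vectors @ %zu (multiple of %zu)\n",
                    offsets_begin, nodes_begin, vectors_begin, alignment);
        return 0;
    }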
@@ -2575,7 +2579,7 @@ class index_gt {
         }

         // Then, load vectors from aligned address
-        std::size_t offset = std::ftell(file);
+        typename node_offsets_t::offset_t offset = std::ftell(file);
         std::fseek(file, config_.vector_alignment - offset % config_.vector_alignment, SEEK_CUR);
         for (std::size_t i = 0; i != size_; ++i) {
             read_chunk(nodes_[i].vector(), node_vector_bytes_(nodes_[i].dim()));
@@ -2610,7 +2614,7 @@ class index_gt {
         if (file_handle == INVALID_HANDLE_VALUE)
             return result.failed("Opening file failed!");

-        size_t file_length = GetFileSize(file_handle, 0);
+        typename node_offsets_t::offset_t file_length = GetFileSize(file_handle, 0);
         HANDLE mapping_handle = CreateFileMapping(file_handle, 0, PAGE_READONLY, 0, 0, 0);
         if (mapping_handle == 0) {
             CloseHandle(file_handle);
@@ -2681,24 +2685,35 @@ class index_gt {
             entry_id_ = static_cast<id_t>(state.entry_idx);
         }

-        // TODO: Skip offsets for now, it may used in the feature for parallel loading.
-        std::size_t progress_bytes = sizeof(file_header_t) + size_ * sizeof(node_offsets_buffer_t);
-
-        // First, locate every node headers packed into files
-        for (std::size_t i = 0; i != size_; ++i) {
-            byte_t* tape = (byte_t*)(file + progress_bytes);
-            level_t level = misaligned_load<level_t>(tape + sizeof(label_t) + sizeof(dim_t));
-
-            nodes_[i] = node_t{tape, nullptr};
-            progress_bytes += node_head_bytes_() + node_neighbors_bytes_(level);
-        }
+        // Read nodes and vectors
+        // Divide tasks between threads
+        std::size_t threads_count = (std::min)(limits_.threads(), size_ / 1'000); // Use optimal thread count
+        std::size_t thread_tasks_count = size_ / (threads_count + 1); // + main thread
+        std::size_t main_thread_tasks_count = thread_tasks_count + (threads_count ? size_ % (threads_count + 1) : 0);
+
+        // Task
+        typename node_offsets_t::offset_t base_offset = sizeof(file_header_t);
+        typename node_offsets_t::offset_t nodes_base_offset = base_offset + size_ * sizeof(node_offsets_buffer_t);
+        auto task = [&](std::size_t start, std::size_t count) {
+            for (std::size_t i = start; i != start + count; ++i) {
+                node_offsets_t offsets{file + base_offset + i * sizeof(node_offsets_buffer_t)};
+                byte_t* tape = file + nodes_base_offset + offsets.head;
+                byte_t* vector = file + nodes_base_offset + offsets.vector;
+                nodes_[i] = node_t{tape, (scalar_t*)vector};
+            }
+        };

-        // Then, locate every vector packed into file. Note, vectors are serialized in aligned address
-        progress_bytes += config_.vector_alignment - progress_bytes % config_.vector_alignment;
-        for (std::size_t i = 0; i != size_; ++i) {
-            nodes_[i] = node_t{nodes_[i].tape(), (scalar_t*)(file + progress_bytes)};
-            progress_bytes += node_vector_bytes_(nodes_[i].dim());
-            progress(i, size_);
+        // Run threads
+        std::vector<std::thread> threads;
+        for (std::size_t i = 0; i < threads_count; ++i)
+            threads.push_back(std::thread(task, i * thread_tasks_count, thread_tasks_count));
+        task(threads_count * thread_tasks_count, main_thread_tasks_count);
+
+        // Wait to finish
+        progress(main_thread_tasks_count, size_);
+        for (std::size_t i = 0; i < threads_count; ++i) {
+            threads[i].join();
+            progress(main_thread_tasks_count + i * threads_count, size_);
         }

         return {};
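
The rewritten view path above splits the per-node pointer fix-up between worker threads and the calling thread: roughly one worker per 1,000 nodes, capped by limits_.threads(), with the main thread taking its own slice plus the remainder. The self-contained sketch below reproduces only that partitioning scheme with made-up sizes (it is not usearch code) and checks that every index is visited exactly once.

    #include <algorithm>
    #include <cstddef>
    #include <cstdio>
    #include <thread>
    #include <vector>

    // Work division mirroring the patch: threads_count workers plus the main thread,
    // each handling a contiguous [start, start + count) slice of the items.
    int main() {
        std::size_t const size = 12'345;  // stand-in for size_
        std::size_t const thread_cap = 8; // stand-in for limits_.threads()
        std::size_t const threads_count = (std::min)(thread_cap, size / 1'000);
        std::size_t const thread_tasks = size / (threads_count + 1); // + main thread
        std::size_t const main_tasks = thread_tasks + (threads_count ? size % (threads_count + 1) : 0);

        std::vector<char> visited(size, 0);
        auto task = [&](std::size_t start, std::size_t count) {
            for (std::size_t i = start; i != start + count; ++i)
                visited[i] = 1; // in the real code: rebuild nodes_[i] from the offsets table
        };

        std::vector<std::thread> threads;
        for (std::size_t i = 0; i < threads_count; ++i)
            threads.emplace_back(task, i * thread_tasks, thread_tasks);
        task(threads_count * thread_tasks, main_tasks); // main thread takes the remainder
        for (auto& t : threads)
            t.join();

        std::size_t covered = 0;
        for (char v : visited)
            covered += v;
        std::printf("covered %zu of %zu items using %zu worker threads\n", covered, size, threads_count);
        return 0;
    }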