diff --git a/include/usearch/index.hpp b/include/usearch/index.hpp index e81b4af9..7a857aaf 100644 --- a/include/usearch/index.hpp +++ b/include/usearch/index.hpp @@ -2449,11 +2449,16 @@ class index_gt { total_node_bytes += node_head_bytes_() + node_neighbors_bytes_(node); } - // Firstly, serialize node offsets + // Firstly, calculate and serialize node offsets node_offsets_buffer_t offsets_buffer{}; node_offsets_t offsets{offsets_buffer}; - typename node_offsets_t::offset_t head_offset = state.size * sizeof(node_offsets_buffer_t); - typename node_offsets_t::offset_t vector_offset = head_offset + total_node_bytes; + typename node_offsets_t::offset_t head_offset = 0; + // Align vectors offset + typename node_offsets_t::offset_t vectors_base_offset = + sizeof(file_header_t) + size_ * sizeof(node_offsets_buffer_t) + total_node_bytes; + typename node_offsets_t::offset_t vectors_offset_shift = + config_.vector_alignment - vectors_base_offset % config_.vector_alignment; + typename node_offsets_t::offset_t vector_offset = total_node_bytes + vectors_offset_shift; for (std::size_t i = 0; i != state.size; ++i) { offsets.head = head_offset; offsets.vector = vector_offset; @@ -2475,8 +2480,7 @@ class index_gt { } // Finally, serialize vectors into aligned address - std::size_t offset = std::ftell(file); - std::fseek(file, config_.vector_alignment - offset % config_.vector_alignment, SEEK_CUR); + std::fseek(file, vectors_offset_shift, SEEK_CUR); for (std::size_t i = 0; i != state.size; ++i) { node_t node = node_with_id_(i); write_chunk(node.vector(), node_vector_bytes_(node)); @@ -2575,7 +2579,7 @@ class index_gt { } // Then, load vectors from aligned address - std::size_t offset = std::ftell(file); + typename node_offsets_t::offset_t offset = std::ftell(file); std::fseek(file, config_.vector_alignment - offset % config_.vector_alignment, SEEK_CUR); for (std::size_t i = 0; i != size_; ++i) { read_chunk(nodes_[i].vector(), node_vector_bytes_(nodes_[i].dim())); @@ -2610,7 +2614,7 @@ class index_gt { if (file_handle == INVALID_HANDLE_VALUE) return result.failed("Opening file failed!"); - size_t file_length = GetFileSize(file_handle, 0); + typename node_offsets_t::offset_t file_length = GetFileSize(file_handle, 0); HANDLE mapping_handle = CreateFileMapping(file_handle, 0, PAGE_READONLY, 0, 0, 0); if (mapping_handle == 0) { CloseHandle(file_handle); @@ -2681,24 +2685,35 @@ class index_gt { entry_id_ = static_cast(state.entry_idx); } - // TODO: Skip offsets for now, it may used in the feature for parallel loading. - std::size_t progress_bytes = sizeof(file_header_t) + size_ * sizeof(node_offsets_buffer_t); - - // First, locate every node headers packed into files - for (std::size_t i = 0; i != size_; ++i) { - byte_t* tape = (byte_t*)(file + progress_bytes); - level_t level = misaligned_load(tape + sizeof(label_t) + sizeof(dim_t)); - - nodes_[i] = node_t{tape, nullptr}; - progress_bytes += node_head_bytes_() + node_neighbors_bytes_(level); - } + // Read nodes and vectors + // Divide tasks between threads + std::size_t threads_count = (std::min)(limits_.threads(), size_ / 1'000); // Use optimal thread count + std::size_t thread_tasks_count = size_ / (threads_count + 1); // + main thread + std::size_t main_thread_tasks_count = thread_tasks_count + (threads_count ? size_ % (threads_count + 1) : 0); + + // Task + typename node_offsets_t::offset_t base_offset = sizeof(file_header_t); + typename node_offsets_t::offset_t nodes_base_offset = base_offset + size_ * sizeof(node_offsets_buffer_t); + auto task = [&](std::size_t start, std::size_t count) { + for (std::size_t i = start; i != start + count; ++i) { + node_offsets_t offsets{file + base_offset + i * sizeof(node_offsets_buffer_t)}; + byte_t* tape = file + nodes_base_offset + offsets.head; + byte_t* vector = file + nodes_base_offset + offsets.vector; + nodes_[i] = node_t{tape, (scalar_t*)vector}; + } + }; - // Then, locate every vector packed into file. Note, vectors are serialized in aligned address - progress_bytes += config_.vector_alignment - progress_bytes % config_.vector_alignment; - for (std::size_t i = 0; i != size_; ++i) { - nodes_[i] = node_t{nodes_[i].tape(), (scalar_t*)(file + progress_bytes)}; - progress_bytes += node_vector_bytes_(nodes_[i].dim()); - progress(i, size_); + // Run threads + std::vector threads; + for (std::size_t i = 0; i < threads_count; ++i) + threads.push_back(std::thread(task, i * thread_tasks_count, thread_tasks_count)); + task(threads_count * thread_tasks_count, main_thread_tasks_count); + + // Wait to finish + progress(main_thread_tasks_count, size_); + for (std::size_t i = 0; i < threads_count; ++i) { + threads[i].join(); + progress(main_thread_tasks_count + i * threads_count, size_); } return {};