Commit 90807f0: Simulator push
Devesh Sarda committed Feb 12, 2024
1 parent d3a3073 commit 90807f0
Showing 24 changed files with 1,421 additions and 1,012 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -173,4 +173,5 @@ Thumbs.db
# End of https://www.toptal.com/developers/gitignore/api/python

src/cpp/third_party
test_datasets
test_datasets
simulator/datasets
2 changes: 1 addition & 1 deletion CMakeLists.txt
@@ -247,4 +247,4 @@ pybind11_add_module(_storage ${STORAGE_BINDINGS})
target_link_libraries(_storage PRIVATE ${PROJECT_NAME} ${TORCH_PYTHON_LIBRARY})

add_custom_target(bindings)
add_dependencies(bindings _config _data _manager _nn _pipeline _report _storage)
add_dependencies(bindings _config _data _manager _nn _pipeline _report _storage)
8 changes: 8 additions & 0 deletions simulator/configs/arvix.json
@@ -0,0 +1,8 @@
{
"dataset_name" : "ogbn_arxiv",
"features_stats" : {
"page_size" : "16 KB",
"feature_dimension" : 128,
"feature_size" : "float32"
}
}
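A quick sanity check of what this config implies (a sketch, not part of the commit; note that humanfriendly.parse_size in the loader below treats "16 KB" as decimal, i.e. 16,000 bytes, and ogbn_arxiv has 169,343 nodes):

page_size = 16000                                # humanfriendly.parse_size("16 KB")
node_feature_size = 128 * 4                      # 128 float32 values = 512 bytes per node
nodes_per_page = page_size // node_feature_size  # 31 nodes per page
total_pages = -(-169343 // nodes_per_page)       # ceiling division -> 5,463 feature pages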
3 changes: 3 additions & 0 deletions simulator/configs/papers.json
@@ -0,0 +1,3 @@
{
"dataset_name" : "ogbn_papers100m"
}
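Note that main.py below reads config["features_stats"] unconditionally, so this minimal config would raise a KeyError as committed. A hypothetical complete version (the page size and dtype here are illustrative assumptions, not values from this commit) might look like:

{
    "dataset_name" : "ogbn_papers100m",
    "features_stats" : {
        "page_size" : "16 KB",
        "feature_dimension" : 128,
        "feature_size" : "float16"
    }
}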
Binary file added simulator/images/arvix.png
52 changes: 52 additions & 0 deletions simulator/main.py
@@ -0,0 +1,52 @@
import os
import json
import argparse
import random

from src.dataset_loader import *
from src.features_loader import *
from src.sampler import *
from src.visualizer import *


def read_config_file(config_file):
    with open(config_file, "r") as reader:
        return json.load(reader)


def read_arguments():
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument("config_file", type=str, help="The config file containing the details for the simulation")
    return parser.parse_args()


IMAGES_SAVE_DIR = "images"


def main():
    arguments = read_arguments()
    config = read_config_file(arguments.config_file)

    # Create the loaders
    data_loader = DatasetLoader(config["dataset_name"])
    features_loader = FeaturesLoader(data_loader, config["features_stats"])
    sampler = SubgraphSampler(data_loader, features_loader)

    # Perform sampling over the nodes in a random order
    nodes_to_sample = list(range(data_loader.get_num_nodes()))
    random.shuffle(nodes_to_sample)

    pages_loaded = []
    for curr_node in nodes_to_sample:
        num_pages_read = sampler.perform_sampling_for_node(curr_node)
        if num_pages_read > 0:
            pages_loaded.append(num_pages_read)
    print("Got results for", len(pages_loaded), "nodes out of", len(nodes_to_sample), "nodes")

    # Save the histogram
    save_path = os.path.join(IMAGES_SAVE_DIR, os.path.basename(arguments.config_file).replace(".json", ".png"))
    visualize_results(pages_loaded, save_path, config["dataset_name"])


if __name__ == "__main__":
    main()
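Assuming the script is launched from the simulator/ directory (both DatasetLoader.SAVE_DIR and IMAGES_SAVE_DIR are relative paths, and the images/ directory must already exist for savefig to succeed), a typical invocation would be:

python main.py configs/arvix.json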
Empty file added simulator/src/__init__.py
Empty file.
52 changes: 52 additions & 0 deletions simulator/src/dataset_loader.py
@@ -0,0 +1,52 @@
import subprocess
import os
import numpy as np
from collections import defaultdict


class DatasetLoader:
    SAVE_DIR = "datasets"
    EDGES_PATH = "edges/train_edges.bin"

    def __init__(self, name):
        self.name = name
        os.makedirs(DatasetLoader.SAVE_DIR, exist_ok=True)
        self.save_dir = os.path.join(DatasetLoader.SAVE_DIR, self.name)
        if not os.path.exists(self.save_dir):
            self.create_dataset()
        self.load_dataset()

    def create_dataset(self):
        command_to_run = f"marius_preprocess --dataset {self.name} --output_directory {self.save_dir}"
        print("Running command", command_to_run)
        subprocess.check_output(command_to_run, shell=True)

    def load_dataset(self):
        # Load the binary edge file
        edges_path = os.path.join(self.save_dir, DatasetLoader.EDGES_PATH)
        with open(edges_path, "rb") as reader:
            edges_bytes = reader.read()

        # Create the adjacency map from the flat [source, target] int32 pairs
        edges_flattened_arr = np.frombuffer(edges_bytes, dtype=np.int32)
        self.nodes = set(edges_flattened_arr)
        edges_arr = edges_flattened_arr.reshape((-1, 2))
        self.num_edges = len(edges_arr)

        self.adjacency_map = defaultdict(list)
        for source, target in edges_arr:
            self.adjacency_map[source].append(target)

    def get_num_nodes(self):
        return len(self.nodes)

    def get_neighbors_for_node(self, node_id):
        # The membership test does not insert a new entry into the defaultdict
        if node_id not in self.adjacency_map:
            return []
        return self.adjacency_map[node_id]

    def get_num_edges(self):
        return self.num_edges
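A minimal sketch of the on-disk layout load_dataset assumes: edges/train_edges.bin is a flat array of int32 [source, target] pairs. The file name below is illustrative, not from the commit:

import numpy as np

edges = np.array([[0, 1], [0, 2], [1, 2]], dtype=np.int32)
edges.tofile("toy_edges.bin")  # same byte layout as edges/train_edges.bin

loaded = np.fromfile("toy_edges.bin", dtype=np.int32).reshape((-1, 2))
assert loaded.tolist() == [[0, 1], [0, 2], [1, 2]]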
21 changes: 21 additions & 0 deletions simulator/src/features_loader.py
@@ -0,0 +1,21 @@
import humanfriendly
import os
import math


class FeaturesLoader:
    def __init__(self, data_loader, features_stat):
        self.data_loader = data_loader
        self.page_size = humanfriendly.parse_size(features_stat["page_size"])

        # The digits in a dtype string such as "float32" are a size in bits,
        # so divide by 8 to get the bytes per feature value.
        feature_size_bits = int("".join(c for c in features_stat["feature_size"] if c.isdigit()))
        self.feature_size = feature_size_bits // 8
        self.node_feature_size = self.feature_size * features_stat["feature_dimension"]

        self.nodes_per_page = max(self.page_size // self.node_feature_size, 1)
        self.total_pages = int(math.ceil(data_loader.get_num_nodes() / (1.0 * self.nodes_per_page)))

    def get_node_page(self, node_id):
        return node_id // self.nodes_per_page

    def get_total_file_size(self):
        total_bytes = self.page_size * self.total_pages
        return humanfriendly.format_size(total_bytes)
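A small usage sketch under the arvix.json settings (the stub loader is an illustration, not part of the commit):

class StubDataLoader:
    def get_num_nodes(self):
        return 169343  # ogbn_arxiv

features = FeaturesLoader(StubDataLoader(), {"page_size": "16 KB", "feature_dimension": 128, "feature_size": "float32"})
print(features.nodes_per_page)       # 31 nodes fit in one 16,000-byte page
print(features.get_node_page(100))   # node 100 lives on page 3
print(features.get_total_file_size())  # roughly 87 MB across 5,463 pages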
10 changes: 10 additions & 0 deletions simulator/src/sampler.py
@@ -0,0 +1,10 @@
class SubgraphSampler:
    def __init__(self, data_loader, features_loader):
        self.data_loader = data_loader
        self.features_loader = features_loader

    def perform_sampling_for_node(self, node_id):
        # Count the distinct feature pages holding this node's one-hop neighbors
        pages_read = set()
        for neighbor in self.data_loader.get_neighbors_for_node(node_id):
            pages_read.add(self.features_loader.get_node_page(neighbor))
        return len(pages_read)
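A worked example with stub loaders (illustration only, not part of the commit): with 31 nodes per page, a node whose neighbors are [5, 37, 40] touches pages {0, 1}, so the sampler reports 2 pages.

class StubData:
    def get_neighbors_for_node(self, node_id):
        return [5, 37, 40]

class StubFeatures:
    def get_node_page(self, node_id):
        return node_id // 31  # 31 nodes per page, as in the arvix.json math

print(SubgraphSampler(StubData(), StubFeatures()).perform_sampling_for_node(0))  # pages {0, 1} -> 2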
18 changes: 18 additions & 0 deletions simulator/src/visualizer.py
@@ -0,0 +1,18 @@
import matplotlib.pyplot as plt
import os


def visualize_results(pages_loaded, save_path, dataset_name, num_bins=50):
    # Create the CDF plot (plt.ecdf requires matplotlib >= 3.8)
    plt.figure()
    plt.ecdf(pages_loaded, label="CDF")
    plt.hist(pages_loaded, bins=num_bins, histtype="step", density=True, cumulative=True, label="Cumulative histogram")
    plt.xlabel("Number of pages loaded for node inference")
    plt.ylabel("Fraction of nodes")
    plt.title("Number of pages loaded for node inference on " + dataset_name)
    plt.xlim(0, 50)
    plt.legend()

    # Save the result
    print("Saving the result to", save_path)
    plt.savefig(save_path)
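plt.ecdf only exists in matplotlib 3.8 and newer; on older versions, a manual ECDF is a drop-in sketch (assumes unweighted samples):

import numpy as np

xs = np.sort(pages_loaded)
ys = np.arange(1, len(xs) + 1) / len(xs)
plt.plot(xs, ys, drawstyle="steps-post", label="CDF")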
4 changes: 2 additions & 2 deletions src/cpp/src/data/dataloader.cpp
@@ -288,9 +288,9 @@ void DataLoader::setBufferOrdering() {
if (graph_storage_->useInMemorySubGraph()) {
auto tup = getEdgeBucketOrdering(options->edge_bucket_ordering, options->num_partitions, options->buffer_capacity, options->fine_to_coarse_ratio,
options->num_cache_partitions, options->randomly_assign_edge_buckets);

buffer_states_ = std::get<0>(tup);
edge_buckets_per_buffer_ = std::get<1>(tup);

edge_buckets_per_buffer_iterator_ = edge_buckets_per_buffer_.begin();

graph_storage_->setBufferOrdering(buffer_states_);
@@ -304,7 +304,6 @@ void DataLoader::setBufferOrdering() {
graph_storage_->getNumNodes(), options->num_partitions, options->buffer_capacity, options->fine_to_coarse_ratio, options->num_cache_partitions);
buffer_states_ = std::get<0>(tup);
node_ids_per_buffer_ = std::get<1>(tup);

node_ids_per_buffer_iterator_ = node_ids_per_buffer_.begin();

graph_storage_->setBufferOrdering(buffer_states_);
@@ -367,6 +366,7 @@ shared_ptr<Batch> DataLoader::getNextBatch() {
}
batch_lock.unlock();
batch_cv_->notify_all();

return batch;
}

4 changes: 2 additions & 2 deletions src/cpp/src/nn/encoders/encoder.cpp
@@ -120,7 +120,7 @@ shared_ptr<Layer> GeneralEncoder::initGNNLayer(std::shared_ptr<LayerConfig> laye
auto options = std::dynamic_pointer_cast<GNNLayerOptions>(layer_config->options);

std::shared_ptr<Layer> layer;

bool is_none = options->type == GNNLayerType::NONE;
if (options->type == GNNLayerType::GRAPH_SAGE) {
string name = "graph_sage_layer:" + std::to_string(stage_id) + "_" + std::to_string(layer_id);
layer = std::make_shared<GraphSageLayer>(layer_config, device_);
@@ -138,7 +138,7 @@ shared_ptr<Layer> GeneralEncoder::initGNNLayer(std::shared_ptr<LayerConfig> laye
layer = std::make_shared<RGCNLayer>(layer_config, num_relations_, device_);
register_module<RGCNLayer>(name, std::dynamic_pointer_cast<RGCNLayer>(layer));
} else {
throw std::runtime_error("Unrecognized GNN layer type");
throw std::runtime_error("Unrecognized GNN layer type of");
}

return layer;
5 changes: 5 additions & 0 deletions src/cpp/src/nn/layers/gnn/gat_layer.cpp
@@ -103,6 +103,11 @@ torch::Tensor GATLayer::forward(torch::Tensor inputs, DENSEGraph dense_graph, bo
nbr_atn_weights = nbr_atn_weights.transpose(0, 2); // [total_num_nbrs, 1, num_heads_]
self_atn_weights = self_atn_weights.transpose(0, 2); // [num_to_encode, 1, num_heads_]

std::cout << "nbr_atn_weights sizes " << nbr_atn_weights.sizes() << std::endl;
std::cout << "self_atn_weights sizes " << self_atn_weights.sizes() << std::endl;
std::cout << "neighbor_offsets sizes " << neighbor_offsets.sizes() << std::endl;
std::cout << "parent_ids sizes " << parent_ids.sizes() << std::endl;
std::cout << "total_neighbors sizes " << total_neighbors.sizes() << std::endl;
std::tie(nbr_atn_weights, self_atn_weights) = attention_softmax(nbr_atn_weights, self_atn_weights, neighbor_offsets, parent_ids, total_neighbors);

nbr_atn_weights = nbr_atn_weights.transpose(0, 2);
2 changes: 2 additions & 0 deletions src/cpp/src/nn/layers/gnn/layer_helpers.cpp
@@ -46,7 +46,9 @@ std::tuple<torch::Tensor, torch::Tensor> attention_softmax(torch::Tensor neighbo
torch::Tensor has_nbrs_mask = torch::not_equal(num_nbrs, 0);
has_nbrs_mask = has_nbrs_mask.reshape({-1, 1, 1});

std::cout << "Neighbor attention of size " << neighbor_attention.sizes() << ", and segment offsets of size " << segment_offsets.sizes() << std::endl;
torch::Tensor seg_max = segmented_max_with_offsets(neighbor_attention, segment_offsets);
std::cout << "seg_max of sizes " << seg_max.sizes() << ", and attention of sizes " << self_attention.sizes() << std::endl;
torch::Tensor attention_max = torch::where(has_nbrs_mask, torch::maximum(seg_max, self_attention), self_attention);

self_attention = torch::exp(self_attention - attention_max);
1 change: 0 additions & 1 deletion src/cpp/src/nn/model.cpp
@@ -300,7 +300,6 @@ void Model::train_batch(shared_ptr<Batch> batch, bool call_step) {

if (learning_task_ == LearningTask::LINK_PREDICTION) {
auto all_scores = forward_lp(batch, true);

torch::Tensor pos_scores = std::get<0>(all_scores);
torch::Tensor neg_scores = std::get<1>(all_scores);
torch::Tensor inv_pos_scores = std::get<2>(all_scores);
1 change: 0 additions & 1 deletion src/cpp/src/pipeline/trainer.cpp
@@ -118,7 +118,6 @@ void SynchronousTrainer::train(int num_epochs) {
while (dataloader_->hasNextBatch()) {
// gets data and parameters for the next batch
shared_ptr<Batch> batch = dataloader_->getBatch();

if (dataloader_->graph_storage_->embeddingsOffDevice()) {
// transfers batch to the GPU
batch->to(model_->device_);
1 change: 1 addition & 0 deletions src/cpp/src/storage/buffer.cpp
@@ -376,6 +376,7 @@ void PartitionBuffer::load() {
SPDLOG_ERROR("Unable to allocate buffer memory\nError: {}", errno);
throw std::runtime_error("");
}

memset_wrapper(buff_mem_, 0, capacity_ * partition_size_ * embedding_size_ * dtype_size_);
buffer_tensor_view_ = torch::from_blob(buff_mem_, {capacity_ * partition_size_, embedding_size_}, dtype_);

19 changes: 16 additions & 3 deletions src/cpp/src/storage/graph_storage.cpp
@@ -352,10 +352,19 @@ bool GraphModelStorage::embeddingsOffDevice() {
}
}

void printTensor(std::string tensor_name, torch::Tensor tensor) {
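    // Debugging helper: assumes a CPU tensor of dtype int64 (data_ptr<int64_t>() throws on other dtypes).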
auto data_ptr = tensor.data_ptr<int64_t>();
int num_elements = tensor.numel();
std::cout << "Tensor " << tensor_name << " has values: ";
for (int i = 0; i < num_elements; ++i) {
std::cout << data_ptr[i] << " ";
}
std::cout << std::endl;
}

void GraphModelStorage::initializeInMemorySubGraph(torch::Tensor buffer_state, int num_hash_maps) {
if (useInMemorySubGraph()) {
current_subgraph_state_ = std::make_shared<InMemorySubgraphState>();

buffer_state = buffer_state.to(torch::kInt64);

int buffer_size = buffer_state.size(0);
@@ -378,6 +387,7 @@ void GraphModelStorage::initializeInMemorySubGraph(torch::Tensor buffer_state, i
torch::Tensor edge_bucket_sizes = torch::from_blob(edge_bucket_sizes_.data(), {(int)edge_bucket_sizes_.size()}, torch::kInt64);
torch::Tensor edge_bucket_ends_disk = edge_bucket_sizes.cumsum(0);
torch::Tensor edge_bucket_starts_disk = edge_bucket_ends_disk - edge_bucket_sizes;

auto edge_bucket_sizes_accessor = edge_bucket_sizes.accessor<int64_t, 1>();
auto edge_bucket_starts_disk_accessor = edge_bucket_starts_disk.accessor<int64_t, 1>();

@@ -398,10 +408,9 @@ void GraphModelStorage::initializeInMemorySubGraph(torch::Tensor buffer_state, i
torch::Tensor in_mem_edge_bucket_starts = in_mem_edge_bucket_sizes.cumsum(0);
int64_t total_size = in_mem_edge_bucket_starts[-1].item<int64_t>();
in_mem_edge_bucket_starts = in_mem_edge_bucket_starts - in_mem_edge_bucket_sizes;

auto in_mem_edge_bucket_starts_accessor = in_mem_edge_bucket_starts.accessor<int64_t, 1>();

current_subgraph_state_->all_in_memory_edges_ = torch::empty({total_size, storage_ptrs_.edges->dim1_size_}, torch::kInt64);

if (hasEdgeWeights()) {
current_subgraph_state_->all_in_memory_edges_weights_ = torch::empty({total_size, storage_ptrs_.edges_weights->dim1_size_}, torch::kFloat32);
}
@@ -455,10 +464,13 @@ void GraphModelStorage::initializeInMemorySubGraph(torch::Tensor buffer_state, i
mapped_edges_weights = current_subgraph_state_->all_in_memory_edges_weights_.select(1, 0);
}

// Get the edges sorted by src
std::tuple<EdgeList, EdgeList> sorted_mapped_edges =
merge_sorted_edge_buckets(mapped_edges, mapped_edges_weights, in_mem_edge_bucket_starts, buffer_size, true);
mapped_edges = std::get<0>(sorted_mapped_edges);
mapped_edges_src_weights = std::get<1>(sorted_mapped_edges);

// Get the edges sorted by dst
std::tuple<EdgeList, EdgeList> sorted_mapped_edges_dst_sort =
merge_sorted_edge_buckets(mapped_edges, mapped_edges_weights, in_mem_edge_bucket_starts, buffer_size, false);
mapped_edges_dst_sort = std::get<0>(sorted_mapped_edges_dst_sort);
Expand All @@ -485,6 +497,7 @@ void GraphModelStorage::initializeInMemorySubGraph(torch::Tensor buffer_state, i
getNextSubGraph();
}
}

} else {
// Either nothing buffered (in memory training) or eval and doing full graph evaluation
current_subgraph_state_ = std::make_shared<InMemorySubgraphState>();
1 change: 0 additions & 1 deletion src/cpp/src/storage/io.cpp
@@ -207,7 +207,6 @@ std::map<std::string, shared_ptr<Storage>> initializeEdges(shared_ptr<StorageCon
storage_config->dataset->dataset_dir + PathConstants::edges_directory + PathConstants::test + PathConstants::edge_partition_offsets_file;

if (train_edge_storage != nullptr) {
std::cout << "Reading train edges partitions from file " << train_edges_partitions << std::endl;
train_edge_storage->readPartitionSizes(train_edges_partitions);
}

3 changes: 3 additions & 0 deletions src/cpp/src/storage/storage.cpp
@@ -75,6 +75,7 @@ PartitionBufferStorage::PartitionBufferStorage(string filename, int64_t dim0_siz
int64_t partition_size = ceil((double)dim0_size_ / options_->num_partitions);
device_ = torch::kCPU;

std::cout << "Creating buffer with file " << filename_ << std::endl;
buffer_ = new PartitionBuffer(options_->buffer_capacity, options_->num_partitions, options_->fine_to_coarse_ratio, partition_size, dim1_size_, dim0_size_,
dtype_, filename_, options_->prefetching);
}
@@ -91,6 +92,7 @@ PartitionBufferStorage::PartitionBufferStorage(string filename, torch::Tensor da
int64_t partition_size = ceil((double)dim0_size_ / options_->num_partitions);
device_ = torch::kCPU;

std::cout << "Creating buffer with file " << filename_ << std::endl;
buffer_ = new PartitionBuffer(options_->buffer_capacity, options_->num_partitions, options_->fine_to_coarse_ratio, partition_size, dim1_size_, dim0_size_,
dtype_, filename_, options_->prefetching);
}
@@ -105,6 +107,7 @@ PartitionBufferStorage::PartitionBufferStorage(string filename, shared_ptr<Parti
int64_t partition_size = ceil((double)dim0_size_ / options_->num_partitions);
device_ = torch::kCPU;

std::cout << "Creating buffer with file " << filename_ << std::endl;
buffer_ = new PartitionBuffer(options_->buffer_capacity, options_->num_partitions, options_->fine_to_coarse_ratio, partition_size, dim1_size_, dim0_size_,
dtype_, filename_, options_->prefetching);
}