From f4ea97324c1a6fc426d32de3129f9aad1636e10d Mon Sep 17 00:00:00 2001 From: Weiyue Su Date: Thu, 11 Mar 2021 22:32:27 +0800 Subject: [PATCH 01/15] batch random_sample --- .../distributed/service/graph_brpc_client.cc | 35 ++++++++++++++----- .../distributed/service/graph_brpc_server.cc | 25 ++++++++++--- .../distributed/table/common_graph_table.cc | 8 ++--- .../distributed/table/common_graph_table.h | 2 +- paddle/fluid/distributed/table/table.h | 4 +-- 5 files changed, 53 insertions(+), 21 deletions(-) diff --git a/paddle/fluid/distributed/service/graph_brpc_client.cc b/paddle/fluid/distributed/service/graph_brpc_client.cc index 16014c8dbf23f..dc51da6cd15a0 100644 --- a/paddle/fluid/distributed/service/graph_brpc_client.cc +++ b/paddle/fluid/distributed/service/graph_brpc_client.cc @@ -52,13 +52,27 @@ std::future GraphBrpcClient::sample(uint32_t table_id, size_t bytes_size = io_buffer_itr.bytes_left(); char *buffer = new char[bytes_size]; io_buffer_itr.copy_and_forward((void *)(buffer), bytes_size); - int start = 0; - while (start < bytes_size) { - GraphNode node; - node.recover_from_buffer(buffer + start); - start += node.get_size(); - res.push_back(node); + + size_t num_nodes = *(size_t *)buffer; + int *actual_sizes = (int *)(buffer + sizeof(size_t)); + char *node_buffer = buffer + sizeof(size_t) + sizeof(int) * num_nodes; + + std::vector > ress; + std::vector res_; + int offset = 0; + for (size_t idx = 0; idx < num_nodes; ++idx){ + int actual_size = actual_sizes[idx]; + int start = 0; + while (start < actual_size) { + GraphNode node; + node.recover_from_buffer(node_buffer + offset + start); + start += node.get_size(); + res_.push_back(node); + } + offset += actual_size; + ress.push_back(res_); } + res = ress[0]; } closure->set_promise_value(ret); }); @@ -70,8 +84,11 @@ std::future GraphBrpcClient::sample(uint32_t table_id, closure->request(0)->set_table_id(table_id); closure->request(0)->set_client_id(_client_id); // std::string type_str = GraphNode::node_type_to_string(type); - closure->request(0)->add_params((char *)&node_id, sizeof(uint64_t)); - // closure->request(0)->add_params(type_str.c_str(), type_str.size()); + std::vector node_ids; + node_ids.push_back(node_id); + size_t num_nodes = node_ids.size(); + + closure->request(0)->add_params((char *)node_ids.data(), sizeof(uint64_t)*num_nodes); closure->request(0)->add_params((char *)&sample_size, sizeof(int)); PsService_Stub rpc_stub(get_cmd_channel(server_index)); closure->cntl(0)->set_log_id(butil::gettimeofday_ms()); @@ -129,4 +146,4 @@ int32_t GraphBrpcClient::initialize() { return 0; } } -} \ No newline at end of file +} diff --git a/paddle/fluid/distributed/service/graph_brpc_server.cc b/paddle/fluid/distributed/service/graph_brpc_server.cc index ecb078c0ad400..14748bce78664 100644 --- a/paddle/fluid/distributed/service/graph_brpc_server.cc +++ b/paddle/fluid/distributed/service/graph_brpc_server.cc @@ -283,12 +283,27 @@ int32_t GraphBrpcService::graph_random_sample(Table *table, "graph_random_sample request requires at least 2 arguments"); return 0; } - uint64_t node_id = *(uint64_t *)(request.params(0).c_str()); + size_t num_nodes = request.params(0).size(); + uint64_t *node_data = (uint64_t *)(request.params(0).c_str()); int sample_size = *(uint64_t *)(request.params(1).c_str()); - char *buffer; - int actual_size; - table->random_sample(node_id, sample_size, buffer, actual_size); - cntl->response_attachment().append(buffer, actual_size); + + std::vector*> tasks; + std::vector buffers(num_nodes); + std::vector actual_sizes(num_nodes); + + for (size_t idx = 0; idx < num_nodes; ++idx){ + std::future task = table->random_sample(node_data[idx], sample_size, + buffers[idx], actual_sizes[idx]); + tasks.push_back(&task); + } + for (size_t idx = 0; idx < num_nodes; ++idx){ + tasks[idx]->get(); + } + cntl->response_attachment().append(&num_nodes, sizeof(size_t)); + cntl->response_attachment().append(actual_sizes.data(), sizeof(int)*num_nodes); + for (size_t idx = 0; idx < num_nodes; ++idx){ + cntl->response_attachment().append(buffers[idx], actual_sizes[idx]); + } return 0; } diff --git a/paddle/fluid/distributed/table/common_graph_table.cc b/paddle/fluid/distributed/table/common_graph_table.cc index 5c97ba631d57d..4d126c3818af3 100644 --- a/paddle/fluid/distributed/table/common_graph_table.cc +++ b/paddle/fluid/distributed/table/common_graph_table.cc @@ -140,7 +140,7 @@ GraphNode *GraphTable::find_node(uint64_t id) { uint32_t GraphTable::get_thread_pool_index(uint64_t node_id) { return node_id % shard_num_per_table % task_pool_size_; } -int32_t GraphTable::random_sample(uint64_t node_id, int sample_size, +std::future GraphTable::random_sample(uint64_t node_id, int sample_size, char *&buffer, int &actual_size) { return _shards_task_pool[get_thread_pool_index(node_id)] ->enqueue([&]() -> int { @@ -166,8 +166,8 @@ int32_t GraphTable::random_sample(uint64_t node_id, int sample_size, } actual_size = total_size; return 0; - }) - .get(); + }); + //.get(); // GraphNode *node = find_node(node_id, type); // if (node == NULL) { // actual_size = 0; @@ -275,4 +275,4 @@ int32_t GraphTable::initialize() { return 0; } } -}; \ No newline at end of file +}; diff --git a/paddle/fluid/distributed/table/common_graph_table.h b/paddle/fluid/distributed/table/common_graph_table.h index 1f2b8c86d363b..8a806f8278877 100644 --- a/paddle/fluid/distributed/table/common_graph_table.h +++ b/paddle/fluid/distributed/table/common_graph_table.h @@ -71,7 +71,7 @@ class GraphTable : public SparseTable { virtual ~GraphTable() {} virtual int32_t pull_graph_list(int start, int size, char *&buffer, int &actual_size); - virtual int32_t random_sample(uint64_t node_id, int sampe_size, char *&buffer, + virtual std::future random_sample(uint64_t node_id, int sampe_size, char *&buffer, int &actual_size); virtual int32_t initialize(); int32_t load(const std::string &path, const std::string ¶m); diff --git a/paddle/fluid/distributed/table/table.h b/paddle/fluid/distributed/table/table.h index 83da622f940fb..b389eb3acae98 100644 --- a/paddle/fluid/distributed/table/table.h +++ b/paddle/fluid/distributed/table/table.h @@ -93,9 +93,9 @@ class Table { return 0; } // only for graph table - virtual int32_t random_sample(uint64_t node_id, int sampe_size, char *&buffer, + virtual std::future random_sample(uint64_t node_id, int sampe_size, char *&buffer, int &actual_size) { - return 0; + return std::future(); } virtual int32_t pour() { return 0; } From 492e5511d13a82317a9932d24a99cd7e1d47a482 Mon Sep 17 00:00:00 2001 From: Weiyue Su Date: Thu, 11 Mar 2021 23:20:06 +0800 Subject: [PATCH 02/15] batch_sample_k --- .../distributed/service/graph_brpc_client.cc | 60 +++++++++++++++++++ .../distributed/service/graph_brpc_client.h | 3 + .../distributed/service/graph_py_service.h | 6 ++ paddle/fluid/distributed/service/ps_client.h | 9 +++ paddle/fluid/pybind/fleet_py.cc | 3 +- 5 files changed, 80 insertions(+), 1 deletion(-) diff --git a/paddle/fluid/distributed/service/graph_brpc_client.cc b/paddle/fluid/distributed/service/graph_brpc_client.cc index dc51da6cd15a0..da2e936819040 100644 --- a/paddle/fluid/distributed/service/graph_brpc_client.cc +++ b/paddle/fluid/distributed/service/graph_brpc_client.cc @@ -98,6 +98,66 @@ std::future GraphBrpcClient::sample(uint32_t table_id, return fut; } +std::future GraphBrpcClient::batch_sample(uint32_t table_id, + std::vector node_ids, int sample_size, + std::vector > &res) { + uint64_t node_id = node_ids[0]; + int server_index = get_server_index_by_id(node_id); + DownpourBrpcClosure *closure = new DownpourBrpcClosure(1, [&](void *done) { + int ret = 0; + auto *closure = (DownpourBrpcClosure *)done; + if (closure->check_response(0, PS_GRAPH_SAMPLE) != 0) { + ret = -1; + } else { + VLOG(0) << "check sample response: " + << " " << closure->check_response(0, PS_GRAPH_SAMPLE); + auto &res_io_buffer = closure->cntl(0)->response_attachment(); + butil::IOBufBytesIterator io_buffer_itr(res_io_buffer); + size_t bytes_size = io_buffer_itr.bytes_left(); + char *buffer = new char[bytes_size]; + io_buffer_itr.copy_and_forward((void *)(buffer), bytes_size); + + size_t num_nodes = *(size_t *)buffer; + int *actual_sizes = (int *)(buffer + sizeof(size_t)); + char *node_buffer = buffer + sizeof(size_t) + sizeof(int) * num_nodes; + + std::vector res_; + int offset = 0; + for (size_t idx = 0; idx < num_nodes; ++idx){ + int actual_size = actual_sizes[idx]; + int start = 0; + while (start < actual_size) { + GraphNode node; + node.recover_from_buffer(node_buffer + offset + start); + start += node.get_size(); + res_.push_back(node); + } + offset += actual_size; + res.push_back(res_); + } + } + closure->set_promise_value(ret); + }); + auto promise = std::make_shared>(); + closure->add_promise(promise); + std::future fut = promise->get_future(); + ; + closure->request(0)->set_cmd_id(PS_GRAPH_SAMPLE); + closure->request(0)->set_table_id(table_id); + closure->request(0)->set_client_id(_client_id); + // std::string type_str = GraphNode::node_type_to_string(type); + size_t num_nodes = node_ids.size(); + + closure->request(0)->add_params((char *)node_ids.data(), sizeof(uint64_t)*num_nodes); + closure->request(0)->add_params((char *)&sample_size, sizeof(int)); + PsService_Stub rpc_stub(get_cmd_channel(server_index)); + closure->cntl(0)->set_log_id(butil::gettimeofday_ms()); + rpc_stub.service(closure->cntl(0), closure->request(0), closure->response(0), + closure); + + return fut; +} + std::future GraphBrpcClient::pull_graph_list( uint32_t table_id, int server_index, int start, int size, std::vector &res) { diff --git a/paddle/fluid/distributed/service/graph_brpc_client.h b/paddle/fluid/distributed/service/graph_brpc_client.h index 8e472b96be94d..d0653120e07ce 100644 --- a/paddle/fluid/distributed/service/graph_brpc_client.h +++ b/paddle/fluid/distributed/service/graph_brpc_client.h @@ -38,6 +38,9 @@ class GraphBrpcClient : public BrpcPsClient { virtual std::future sample(uint32_t table_id, uint64_t node_id, int sample_size, std::vector &res); + virtual std::future batch_sample(uint32_t table_id, std::vector node_ids, + int sample_size, + std::vector > &res); virtual std::future pull_graph_list(uint32_t table_id, int server_index, int start, int size, diff --git a/paddle/fluid/distributed/service/graph_py_service.h b/paddle/fluid/distributed/service/graph_py_service.h index be946dc44e57d..0975460003436 100644 --- a/paddle/fluid/distributed/service/graph_py_service.h +++ b/paddle/fluid/distributed/service/graph_py_service.h @@ -149,6 +149,12 @@ class GraphPyService { status.wait(); return v; } + std::vector > batch_sample_k(std::vector node_ids, int sample_size) { + std::vector > v; + auto status = worker_ptr->batch_sample(table_id, node_ids, sample_size, v); + status.wait(); + return v; + } std::vector pull_graph_list(int server_index, int start, int size) { std::vector res; diff --git a/paddle/fluid/distributed/service/ps_client.h b/paddle/fluid/distributed/service/ps_client.h index b6014b9aea139..2122743197052 100644 --- a/paddle/fluid/distributed/service/ps_client.h +++ b/paddle/fluid/distributed/service/ps_client.h @@ -164,6 +164,15 @@ class PSClient { promise.set_value(-1); return fut; } + virtual std::future batch_sample(uint32_t table_id, std::vector node_ids, + int sample_size, + std::vector > &res) { + LOG(FATAL) << "Did not implement"; + std::promise promise; + std::future fut = promise.get_future(); + promise.set_value(-1); + return fut; + } virtual std::future pull_graph_list(uint32_t table_id, int server_index, int start, int size, diff --git a/paddle/fluid/pybind/fleet_py.cc b/paddle/fluid/pybind/fleet_py.cc index 68d8c7ca338fa..6b86da9664897 100644 --- a/paddle/fluid/pybind/fleet_py.cc +++ b/paddle/fluid/pybind/fleet_py.cc @@ -168,7 +168,8 @@ void BindGraphService(py::module* m) { .def("load_file", &GraphPyService::load_file) .def("set_up", &GraphPyService::set_up) .def("pull_graph_list", &GraphPyService::pull_graph_list) - .def("sample_k", &GraphPyService::sample_k); + .def("sample_k", &GraphPyService::sample_k) + .def("batch_sample_k", &GraphPyService::batch_sample_k); } } // end namespace pybind From 193203f421ec192a4abf83e9442429cb5bab96c9 Mon Sep 17 00:00:00 2001 From: Weiyue Su Date: Fri, 12 Mar 2021 00:40:47 +0800 Subject: [PATCH 03/15] fix num_nodes size --- .../fluid/distributed/service/graph_brpc_client.cc | 2 ++ .../fluid/distributed/service/graph_brpc_server.cc | 14 ++++++++------ .../fluid/distributed/table/common_graph_table.cc | 6 ++++-- .../fluid/distributed/table/common_graph_table.h | 4 +++- paddle/fluid/distributed/table/table.h | 8 ++++++-- 5 files changed, 23 insertions(+), 11 deletions(-) diff --git a/paddle/fluid/distributed/service/graph_brpc_client.cc b/paddle/fluid/distributed/service/graph_brpc_client.cc index da2e936819040..2156065caba38 100644 --- a/paddle/fluid/distributed/service/graph_brpc_client.cc +++ b/paddle/fluid/distributed/service/graph_brpc_client.cc @@ -123,8 +123,10 @@ std::future GraphBrpcClient::batch_sample(uint32_t table_id, std::vector res_; int offset = 0; + std::cout << "num_nodes: " << num_nodes << std::endl; for (size_t idx = 0; idx < num_nodes; ++idx){ int actual_size = actual_sizes[idx]; + std::cout << "actual_size: " << actual_size << std::endl; int start = 0; while (start < actual_size) { GraphNode node; diff --git a/paddle/fluid/distributed/service/graph_brpc_server.cc b/paddle/fluid/distributed/service/graph_brpc_server.cc index 14748bce78664..bf926d7fbe52c 100644 --- a/paddle/fluid/distributed/service/graph_brpc_server.cc +++ b/paddle/fluid/distributed/service/graph_brpc_server.cc @@ -283,7 +283,7 @@ int32_t GraphBrpcService::graph_random_sample(Table *table, "graph_random_sample request requires at least 2 arguments"); return 0; } - size_t num_nodes = request.params(0).size(); + size_t num_nodes = request.params(0).size() / sizeof(uint64_t); uint64_t *node_data = (uint64_t *)(request.params(0).c_str()); int sample_size = *(uint64_t *)(request.params(1).c_str()); @@ -292,13 +292,15 @@ int32_t GraphBrpcService::graph_random_sample(Table *table, std::vector actual_sizes(num_nodes); for (size_t idx = 0; idx < num_nodes; ++idx){ - std::future task = table->random_sample(node_data[idx], sample_size, + //std::future task = table->random_sample(node_data[idx], sample_size, + //buffers[idx], actual_sizes[idx]); + table->random_sample(node_data[idx], sample_size, buffers[idx], actual_sizes[idx]); - tasks.push_back(&task); - } - for (size_t idx = 0; idx < num_nodes; ++idx){ - tasks[idx]->get(); + //tasks.push_back(&task); } + //for (size_t idx = 0; idx < num_nodes; ++idx){ + //tasks[idx]->get(); + //} cntl->response_attachment().append(&num_nodes, sizeof(size_t)); cntl->response_attachment().append(actual_sizes.data(), sizeof(int)*num_nodes); for (size_t idx = 0; idx < num_nodes; ++idx){ diff --git a/paddle/fluid/distributed/table/common_graph_table.cc b/paddle/fluid/distributed/table/common_graph_table.cc index 4d126c3818af3..9b9e9a88b84a7 100644 --- a/paddle/fluid/distributed/table/common_graph_table.cc +++ b/paddle/fluid/distributed/table/common_graph_table.cc @@ -140,7 +140,9 @@ GraphNode *GraphTable::find_node(uint64_t id) { uint32_t GraphTable::get_thread_pool_index(uint64_t node_id) { return node_id % shard_num_per_table % task_pool_size_; } -std::future GraphTable::random_sample(uint64_t node_id, int sample_size, +//std::future GraphTable::random_sample(uint64_t node_id, int sample_size, + //char *&buffer, int &actual_size) { +int32_t GraphTable::random_sample(uint64_t node_id, int sample_size, char *&buffer, int &actual_size) { return _shards_task_pool[get_thread_pool_index(node_id)] ->enqueue([&]() -> int { @@ -166,7 +168,7 @@ std::future GraphTable::random_sample(uint64_t node_id, int sample_size, } actual_size = total_size; return 0; - }); + }).get(); //.get(); // GraphNode *node = find_node(node_id, type); // if (node == NULL) { diff --git a/paddle/fluid/distributed/table/common_graph_table.h b/paddle/fluid/distributed/table/common_graph_table.h index 8a806f8278877..18fdfdb5d8f66 100644 --- a/paddle/fluid/distributed/table/common_graph_table.h +++ b/paddle/fluid/distributed/table/common_graph_table.h @@ -71,7 +71,9 @@ class GraphTable : public SparseTable { virtual ~GraphTable() {} virtual int32_t pull_graph_list(int start, int size, char *&buffer, int &actual_size); - virtual std::future random_sample(uint64_t node_id, int sampe_size, char *&buffer, + //virtual std::future random_sample(uint64_t node_id, int sampe_size, char *&buffer, + //int &actual_size); + int32_t random_sample(uint64_t node_id, int sampe_size, char *&buffer, int &actual_size); virtual int32_t initialize(); int32_t load(const std::string &path, const std::string ¶m); diff --git a/paddle/fluid/distributed/table/table.h b/paddle/fluid/distributed/table/table.h index b389eb3acae98..d9b6835a31f29 100644 --- a/paddle/fluid/distributed/table/table.h +++ b/paddle/fluid/distributed/table/table.h @@ -93,10 +93,14 @@ class Table { return 0; } // only for graph table - virtual std::future random_sample(uint64_t node_id, int sampe_size, char *&buffer, + virtual int32_t random_sample(uint64_t node_id, int sampe_size, char *&buffer, int &actual_size) { - return std::future(); + return 0; } + //virtual std::future random_sample(uint64_t node_id, int sampe_size, char *&buffer, + //int &actual_size) { + //return std::future(); + //} virtual int32_t pour() { return 0; } virtual void clear() = 0; From b55bf426b73f8dc7fa8c69e19d32d919518ef14c Mon Sep 17 00:00:00 2001 From: Weiyue Su Date: Fri, 12 Mar 2021 01:59:43 +0800 Subject: [PATCH 04/15] batch brpc --- .../distributed/service/graph_brpc_client.cc | 119 +++++++++++------- 1 file changed, 71 insertions(+), 48 deletions(-) diff --git a/paddle/fluid/distributed/service/graph_brpc_client.cc b/paddle/fluid/distributed/service/graph_brpc_client.cc index 2156065caba38..1db5a30274dca 100644 --- a/paddle/fluid/distributed/service/graph_brpc_client.cc +++ b/paddle/fluid/distributed/service/graph_brpc_client.cc @@ -53,14 +53,14 @@ std::future GraphBrpcClient::sample(uint32_t table_id, char *buffer = new char[bytes_size]; io_buffer_itr.copy_and_forward((void *)(buffer), bytes_size); - size_t num_nodes = *(size_t *)buffer; + size_t node_num = *(size_t *)buffer; int *actual_sizes = (int *)(buffer + sizeof(size_t)); - char *node_buffer = buffer + sizeof(size_t) + sizeof(int) * num_nodes; + char *node_buffer = buffer + sizeof(size_t) + sizeof(int) * node_num; std::vector > ress; std::vector res_; int offset = 0; - for (size_t idx = 0; idx < num_nodes; ++idx){ + for (size_t idx = 0; idx < node_num; ++idx){ int actual_size = actual_sizes[idx]; int start = 0; while (start < actual_size) { @@ -86,9 +86,9 @@ std::future GraphBrpcClient::sample(uint32_t table_id, // std::string type_str = GraphNode::node_type_to_string(type); std::vector node_ids; node_ids.push_back(node_id); - size_t num_nodes = node_ids.size(); + size_t node_num = node_ids.size(); - closure->request(0)->add_params((char *)node_ids.data(), sizeof(uint64_t)*num_nodes); + closure->request(0)->add_params((char *)node_ids.data(), sizeof(uint64_t)*node_num); closure->request(0)->add_params((char *)&sample_size, sizeof(int)); PsService_Stub rpc_stub(get_cmd_channel(server_index)); closure->cntl(0)->set_log_id(butil::gettimeofday_ms()); @@ -101,61 +101,84 @@ std::future GraphBrpcClient::sample(uint32_t table_id, std::future GraphBrpcClient::batch_sample(uint32_t table_id, std::vector node_ids, int sample_size, std::vector > &res) { - uint64_t node_id = node_ids[0]; - int server_index = get_server_index_by_id(node_id); + + std::vector > node_id_buckets; + std::vector > query_idx_buckets; + std::vector request2server; + std::vector server2request(server_size, -1); + + for (int query_idx = 0; query_idx < node_ids.size(); ++query_idx){ + int server_index = get_server_index_by_id(node_ids[query_idx]); + if(server2request[server_index] == -1){ + server2request[server_index] = request2server.size(); + request2server.push_back(server_index); + node_id_buckets.push_back(std::vector ()); + query_idx_buckets.push_back(std::vector ()); + } + int request_idx = server2request[server_index]; + node_id_buckets[request_idx].push_back(node_ids[query_idx]); + query_idx_buckets[request_idx].push_back(query_idx); + res.push_back(std::vector()); + } + size_t request_call_num = request2server.size(); + DownpourBrpcClosure *closure = new DownpourBrpcClosure(1, [&](void *done) { int ret = 0; auto *closure = (DownpourBrpcClosure *)done; - if (closure->check_response(0, PS_GRAPH_SAMPLE) != 0) { - ret = -1; - } else { - VLOG(0) << "check sample response: " - << " " << closure->check_response(0, PS_GRAPH_SAMPLE); - auto &res_io_buffer = closure->cntl(0)->response_attachment(); - butil::IOBufBytesIterator io_buffer_itr(res_io_buffer); - size_t bytes_size = io_buffer_itr.bytes_left(); - char *buffer = new char[bytes_size]; - io_buffer_itr.copy_and_forward((void *)(buffer), bytes_size); + for (int request_idx = 0; request_idx < request_call_num; ++request_idx){ + if (closure->check_response(request_idx, PS_GRAPH_SAMPLE) != 0) { + ret = -1; + } else { + VLOG(0) << "check sample response: " + << " " << closure->check_response(request_idx, PS_GRAPH_SAMPLE); + auto &res_io_buffer = closure->cntl(request_idx)->response_attachment(); + butil::IOBufBytesIterator io_buffer_itr(res_io_buffer); + size_t bytes_size = io_buffer_itr.bytes_left(); + char *buffer = new char[bytes_size]; + io_buffer_itr.copy_and_forward((void *)(buffer), bytes_size); - size_t num_nodes = *(size_t *)buffer; - int *actual_sizes = (int *)(buffer + sizeof(size_t)); - char *node_buffer = buffer + sizeof(size_t) + sizeof(int) * num_nodes; - - std::vector res_; - int offset = 0; - std::cout << "num_nodes: " << num_nodes << std::endl; - for (size_t idx = 0; idx < num_nodes; ++idx){ - int actual_size = actual_sizes[idx]; - std::cout << "actual_size: " << actual_size << std::endl; - int start = 0; - while (start < actual_size) { - GraphNode node; - node.recover_from_buffer(node_buffer + offset + start); - start += node.get_size(); - res_.push_back(node); + size_t node_num = *(size_t *)buffer; + int *actual_sizes = (int *)(buffer + sizeof(size_t)); + char *node_buffer = buffer + sizeof(size_t) + sizeof(int) * node_num; + + //std::vector res_; + int offset = 0; + for (size_t node_idx = 0; node_idx < node_num; ++node_idx){ + int query_idx = query_idx_buckets[request_idx][node_idx]; + int actual_size = actual_sizes[node_idx]; + int start = 0; + while (start < actual_size) { + GraphNode node; + node.recover_from_buffer(node_buffer + offset + start); + start += node.get_size(); + res[query_idx].push_back(node); + } + offset += actual_size; } - offset += actual_size; - res.push_back(res_); } } closure->set_promise_value(ret); }); + auto promise = std::make_shared>(); closure->add_promise(promise); std::future fut = promise->get_future(); - ; - closure->request(0)->set_cmd_id(PS_GRAPH_SAMPLE); - closure->request(0)->set_table_id(table_id); - closure->request(0)->set_client_id(_client_id); - // std::string type_str = GraphNode::node_type_to_string(type); - size_t num_nodes = node_ids.size(); - - closure->request(0)->add_params((char *)node_ids.data(), sizeof(uint64_t)*num_nodes); - closure->request(0)->add_params((char *)&sample_size, sizeof(int)); - PsService_Stub rpc_stub(get_cmd_channel(server_index)); - closure->cntl(0)->set_log_id(butil::gettimeofday_ms()); - rpc_stub.service(closure->cntl(0), closure->request(0), closure->response(0), - closure); + + for (int request_idx = 0; request_idx < request_call_num; ++request_idx){ + int server_index = request2server[request_idx]; + closure->request(request_idx)->set_cmd_id(PS_GRAPH_SAMPLE); + closure->request(request_idx)->set_table_id(table_id); + closure->request(request_idx)->set_client_id(_client_id); + // std::string type_str = GraphNode::node_type_to_string(type); + size_t node_num = node_id_buckets[request_idx].size(); + + closure->request(request_idx)->add_params((char *)node_id_buckets[request_idx].data(), sizeof(uint64_t)*node_num); + closure->request(request_idx)->add_params((char *)&sample_size, sizeof(int)); + PsService_Stub rpc_stub(get_cmd_channel(server_index)); + closure->cntl(request_idx)->set_log_id(butil::gettimeofday_ms()); + rpc_stub.service(closure->cntl(request_idx), closure->request(request_idx), closure->response(request_idx), + closure); + } return fut; } From 102ae95bdb324f3f573dc5dd4f5f9836a3db6268 Mon Sep 17 00:00:00 2001 From: Weiyue Su Date: Fri, 12 Mar 2021 03:24:41 +0800 Subject: [PATCH 05/15] batch brpc --- .../distributed/service/graph_brpc_client.cc | 25 +++++++++++-------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/paddle/fluid/distributed/service/graph_brpc_client.cc b/paddle/fluid/distributed/service/graph_brpc_client.cc index 1db5a30274dca..bb63487496f48 100644 --- a/paddle/fluid/distributed/service/graph_brpc_client.cc +++ b/paddle/fluid/distributed/service/graph_brpc_client.cc @@ -102,27 +102,31 @@ std::future GraphBrpcClient::batch_sample(uint32_t table_id, std::vector node_ids, int sample_size, std::vector > &res) { - std::vector > node_id_buckets; - std::vector > query_idx_buckets; std::vector request2server; std::vector server2request(server_size, -1); - for (int query_idx = 0; query_idx < node_ids.size(); ++query_idx){ int server_index = get_server_index_by_id(node_ids[query_idx]); if(server2request[server_index] == -1){ server2request[server_index] = request2server.size(); request2server.push_back(server_index); - node_id_buckets.push_back(std::vector ()); - query_idx_buckets.push_back(std::vector ()); } + res.push_back(std::vector()); + } + size_t request_call_num = request2server.size(); + std::vector > node_id_buckets(request_call_num); + std::vector > query_idx_buckets(request_call_num); + for (int query_idx = 0; query_idx < node_ids.size(); ++query_idx){ + int server_index = get_server_index_by_id(node_ids[query_idx]); int request_idx = server2request[server_index]; node_id_buckets[request_idx].push_back(node_ids[query_idx]); query_idx_buckets[request_idx].push_back(query_idx); - res.push_back(std::vector()); } - size_t request_call_num = request2server.size(); + + for (int request_idx = 0; request_idx < request_call_num; ++request_idx){ + + } - DownpourBrpcClosure *closure = new DownpourBrpcClosure(1, [&](void *done) { + DownpourBrpcClosure *closure = new DownpourBrpcClosure(request_call_num, [&, node_id_buckets, query_idx_buckets, request_call_num](void *done) { int ret = 0; auto *closure = (DownpourBrpcClosure *)done; for (int request_idx = 0; request_idx < request_call_num; ++request_idx){ @@ -130,7 +134,7 @@ std::future GraphBrpcClient::batch_sample(uint32_t table_id, ret = -1; } else { VLOG(0) << "check sample response: " - << " " << closure->check_response(request_idx, PS_GRAPH_SAMPLE); + << " " << closure->check_response(request_idx, PS_GRAPH_SAMPLE); auto &res_io_buffer = closure->cntl(request_idx)->response_attachment(); butil::IOBufBytesIterator io_buffer_itr(res_io_buffer); size_t bytes_size = io_buffer_itr.bytes_left(); @@ -141,10 +145,9 @@ std::future GraphBrpcClient::batch_sample(uint32_t table_id, int *actual_sizes = (int *)(buffer + sizeof(size_t)); char *node_buffer = buffer + sizeof(size_t) + sizeof(int) * node_num; - //std::vector res_; int offset = 0; for (size_t node_idx = 0; node_idx < node_num; ++node_idx){ - int query_idx = query_idx_buckets[request_idx][node_idx]; + int query_idx = query_idx_buckets.at(request_idx).at(node_idx); int actual_size = actual_sizes[node_idx]; int start = 0; while (start < actual_size) { From 3106b2a8bc0a1d63bb307ceda7a1adfe44c4a008 Mon Sep 17 00:00:00 2001 From: Weiyue Su Date: Fri, 12 Mar 2021 03:34:33 +0800 Subject: [PATCH 06/15] add test --- paddle/fluid/distributed/service/graph_brpc_client.cc | 7 +------ paddle/fluid/distributed/test/graph_node_test.cc | 3 +++ 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/paddle/fluid/distributed/service/graph_brpc_client.cc b/paddle/fluid/distributed/service/graph_brpc_client.cc index bb63487496f48..a5134c8614d47 100644 --- a/paddle/fluid/distributed/service/graph_brpc_client.cc +++ b/paddle/fluid/distributed/service/graph_brpc_client.cc @@ -57,8 +57,6 @@ std::future GraphBrpcClient::sample(uint32_t table_id, int *actual_sizes = (int *)(buffer + sizeof(size_t)); char *node_buffer = buffer + sizeof(size_t) + sizeof(int) * node_num; - std::vector > ress; - std::vector res_; int offset = 0; for (size_t idx = 0; idx < node_num; ++idx){ int actual_size = actual_sizes[idx]; @@ -67,19 +65,16 @@ std::future GraphBrpcClient::sample(uint32_t table_id, GraphNode node; node.recover_from_buffer(node_buffer + offset + start); start += node.get_size(); - res_.push_back(node); + res.push_back(node); } offset += actual_size; - ress.push_back(res_); } - res = ress[0]; } closure->set_promise_value(ret); }); auto promise = std::make_shared>(); closure->add_promise(promise); std::future fut = promise->get_future(); - ; closure->request(0)->set_cmd_id(PS_GRAPH_SAMPLE); closure->request(0)->set_table_id(table_id); closure->request(0)->set_client_id(_client_id); diff --git a/paddle/fluid/distributed/test/graph_node_test.cc b/paddle/fluid/distributed/test/graph_node_test.cc index 2aa3ff5599068..5c3e62ae9d427 100644 --- a/paddle/fluid/distributed/test/graph_node_test.cc +++ b/paddle/fluid/distributed/test/graph_node_test.cc @@ -211,6 +211,7 @@ void RunBrpcPushSparse() { pull_status.wait(); std::vector v; + std::vector > vs; pull_status = worker_ptr_->sample(0, 37, 4, v); pull_status.wait(); // for (auto g : v) { @@ -253,6 +254,8 @@ void RunBrpcPushSparse() { v.clear(); v = gps2.sample_k(96, 4); ASSERT_EQ(v.size(), 3); + vs = gps2.batch_sample_k([96, 37], 4); + ASSERT_EQ(vs.size(), 2); // to test in python,try this: // from paddle.fluid.core import GraphPyService // ips_str = "127.0.0.1:4211;127.0.0.1:4212" From 51ed7d8791e5d41fe1eab55cf465f782a20a1cb1 Mon Sep 17 00:00:00 2001 From: Weiyue Su Date: Fri, 12 Mar 2021 03:39:58 +0800 Subject: [PATCH 07/15] add test --- paddle/fluid/distributed/test/graph_node_test.cc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/paddle/fluid/distributed/test/graph_node_test.cc b/paddle/fluid/distributed/test/graph_node_test.cc index 5c3e62ae9d427..e522af9910ec5 100644 --- a/paddle/fluid/distributed/test/graph_node_test.cc +++ b/paddle/fluid/distributed/test/graph_node_test.cc @@ -254,7 +254,10 @@ void RunBrpcPushSparse() { v.clear(); v = gps2.sample_k(96, 4); ASSERT_EQ(v.size(), 3); - vs = gps2.batch_sample_k([96, 37], 4); + std::vector node_ids; + node_ids.push_back(96); + node_ids.push_back(37); + vs = gps2.batch_sample_k(node_ids, 4); ASSERT_EQ(vs.size(), 2); // to test in python,try this: // from paddle.fluid.core import GraphPyService From db6ee862c56c9af33ad01ae720f6e114aa3e1ba9 Mon Sep 17 00:00:00 2001 From: suweiyue Date: Mon, 15 Mar 2021 14:48:39 +0800 Subject: [PATCH 08/15] merge pair type --- .../distributed/service/graph_brpc_client.cc | 24 +-- .../distributed/service/graph_brpc_client.h | 7 +- .../distributed/service/graph_py_service.cc | 13 +- .../distributed/service/graph_py_service.h | 93 ++++++++---- paddle/fluid/distributed/service/ps_client.h | 6 +- .../distributed/table/common_graph_table.cc | 142 ++++++++---------- paddle/fluid/distributed/table/graph_node.cc | 3 +- paddle/fluid/distributed/table/graph_node.h | 10 +- .../distributed/table/weighted_sampler.cc | 15 +- .../distributed/table/weighted_sampler.h | 19 +-- .../fluid/distributed/test/graph_node_test.cc | 65 +++++--- paddle/fluid/pybind/fleet_py.cc | 3 +- 12 files changed, 223 insertions(+), 177 deletions(-) diff --git a/paddle/fluid/distributed/service/graph_brpc_client.cc b/paddle/fluid/distributed/service/graph_brpc_client.cc index a5134c8614d47..616d310f07fbc 100644 --- a/paddle/fluid/distributed/service/graph_brpc_client.cc +++ b/paddle/fluid/distributed/service/graph_brpc_client.cc @@ -12,15 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include "paddle/fluid/distributed/service/graph_brpc_client.h" #include #include #include #include +#include #include #include "Eigen/Dense" - #include "paddle/fluid/distributed/service/brpc_ps_client.h" -#include "paddle/fluid/distributed/service/graph_brpc_client.h" #include "paddle/fluid/distributed/table/table.h" #include "paddle/fluid/framework/archive.h" #include "paddle/fluid/string/string_helper.h" @@ -35,9 +35,9 @@ int GraphBrpcClient::get_server_index_by_id(uint64_t id) { return id % shard_num / shard_per_server; } // char* &buffer,int &actual_size -std::future GraphBrpcClient::sample(uint32_t table_id, - uint64_t node_id, int sample_size, - std::vector &res) { +std::future GraphBrpcClient::sample( + uint32_t table_id, uint64_t node_id, int sample_size, + std::vector> &res) { int server_index = get_server_index_by_id(node_id); DownpourBrpcClosure *closure = new DownpourBrpcClosure(1, [&](void *done) { int ret = 0; @@ -45,8 +45,6 @@ std::future GraphBrpcClient::sample(uint32_t table_id, if (closure->check_response(0, PS_GRAPH_SAMPLE) != 0) { ret = -1; } else { - VLOG(0) << "check sample response: " - << " " << closure->check_response(0, PS_GRAPH_SAMPLE); auto &res_io_buffer = closure->cntl(0)->response_attachment(); butil::IOBufBytesIterator io_buffer_itr(res_io_buffer); size_t bytes_size = io_buffer_itr.bytes_left(); @@ -62,10 +60,13 @@ std::future GraphBrpcClient::sample(uint32_t table_id, int actual_size = actual_sizes[idx]; int start = 0; while (start < actual_size) { - GraphNode node; - node.recover_from_buffer(node_buffer + offset + start); - start += node.get_size(); - res.push_back(node); + //GraphNode node; + //node.recover_from_buffer(node_buffer + offset + start); + //start += node.get_size(); + //res.push_back(node); + res.push_back({*(uint64_t *)(node_buffer + offset + start), + *(float *)(node_buffer + offset + start + GraphNode::id_size)}); + start += GraphNode::id_size + GraphNode::weight_size; } offset += actual_size; } @@ -84,6 +85,7 @@ std::future GraphBrpcClient::sample(uint32_t table_id, size_t node_num = node_ids.size(); closure->request(0)->add_params((char *)node_ids.data(), sizeof(uint64_t)*node_num); + //closure->request(0)->add_params((char *)&node_id, sizeof(uint64_t)); closure->request(0)->add_params((char *)&sample_size, sizeof(int)); PsService_Stub rpc_stub(get_cmd_channel(server_index)); closure->cntl(0)->set_log_id(butil::gettimeofday_ms()); diff --git a/paddle/fluid/distributed/service/graph_brpc_client.h b/paddle/fluid/distributed/service/graph_brpc_client.h index d0653120e07ce..4778d5261219f 100644 --- a/paddle/fluid/distributed/service/graph_brpc_client.h +++ b/paddle/fluid/distributed/service/graph_brpc_client.h @@ -18,6 +18,7 @@ #include #include +#include #include "brpc/channel.h" #include "brpc/controller.h" #include "brpc/server.h" @@ -35,9 +36,9 @@ class GraphBrpcClient : public BrpcPsClient { public: GraphBrpcClient() {} virtual ~GraphBrpcClient() {} - virtual std::future sample(uint32_t table_id, uint64_t node_id, - int sample_size, - std::vector &res); + virtual std::future sample( + uint32_t table_id, uint64_t node_id, int sample_size, + std::vector> &res); virtual std::future batch_sample(uint32_t table_id, std::vector node_ids, int sample_size, std::vector > &res); diff --git a/paddle/fluid/distributed/service/graph_py_service.cc b/paddle/fluid/distributed/service/graph_py_service.cc index 86d2d54193bd9..04c04930e43e8 100644 --- a/paddle/fluid/distributed/service/graph_py_service.cc +++ b/paddle/fluid/distributed/service/graph_py_service.cc @@ -27,11 +27,16 @@ std::vector GraphPyService::split(std::string &str, } void GraphPyService::set_up(std::string ips_str, int shard_num, int rank, - int client_id, uint32_t table_id) { + int client_id, std::vector edge_types) { set_shard_num(shard_num); set_client_Id(client_id); set_rank(rank); - this->table_id = table_id; + + this -> table_id_map[std::string("")] = 0; + // Table 0 are for nodes + for(size_t table_id = 0; table_id < edge_types.size(); table_id ++ ) { + this -> table_id_map[edge_types[table_id]] = int(table_id + 1); + } server_thread = client_thread = NULL; std::istringstream stream(ips_str); std::string ip; @@ -47,10 +52,10 @@ void GraphPyService::set_up(std::string ips_str, int shard_num, int rank, host_sign_list.push_back(ph_host.serialize_to_string()); index++; } - VLOG(0) << "IN set up rank = " << rank; + //VLOG(0) << "IN set up rank = " << rank; start_client(); start_server(server_list[rank], std::stoul(port_list[rank])); sleep(1); } } -} \ No newline at end of file +} diff --git a/paddle/fluid/distributed/service/graph_py_service.h b/paddle/fluid/distributed/service/graph_py_service.h index 0975460003436..e4f049f2eb26a 100644 --- a/paddle/fluid/distributed/service/graph_py_service.h +++ b/paddle/fluid/distributed/service/graph_py_service.h @@ -21,6 +21,7 @@ #include #include #include // NOLINT +#include #include #include "google/protobuf/text_format.h" @@ -46,7 +47,7 @@ class GraphPyService { std::vector keys; std::vector server_list, port_list, host_sign_list; int server_size, shard_num, rank, client_id; - uint32_t table_id; + std::unordered_map table_id_map; std::thread *server_thread, *client_thread; std::shared_ptr pserver_ptr; @@ -67,7 +68,8 @@ class GraphPyService { int get_shard_num() { return shard_num; } void set_shard_num(int shard_num) { this->shard_num = shard_num; } void GetDownpourSparseTableProto( - ::paddle::distributed::TableParameter* sparse_table_proto) { + ::paddle::distributed::TableParameter* sparse_table_proto, + uint32_t table_id) { sparse_table_proto->set_table_id(table_id); sparse_table_proto->set_table_class("GraphTable"); sparse_table_proto->set_shard_num(shard_num); @@ -96,9 +98,12 @@ class GraphPyService { server_service_proto->set_start_server_port(0); server_service_proto->set_server_thread_num(12); - ::paddle::distributed::TableParameter* sparse_table_proto = - downpour_server_proto->add_downpour_table_param(); - GetDownpourSparseTableProto(sparse_table_proto); + for (auto& tuple : this->table_id_map) { + ::paddle::distributed::TableParameter* sparse_table_proto = + downpour_server_proto->add_downpour_table_param(); + GetDownpourSparseTableProto(sparse_table_proto, tuple.second); + } + return server_fleet_desc; } @@ -111,9 +116,11 @@ class GraphPyService { ::paddle::distributed::DownpourWorkerParameter* downpour_worker_proto = worker_proto->mutable_downpour_worker_param(); - ::paddle::distributed::TableParameter* worker_sparse_table_proto = - downpour_worker_proto->add_downpour_table_param(); - GetDownpourSparseTableProto(worker_sparse_table_proto); + for (auto& tuple : this->table_id_map) { + ::paddle::distributed::TableParameter* worker_sparse_table_proto = + downpour_worker_proto->add_downpour_table_param(); + GetDownpourSparseTableProto(worker_sparse_table_proto, tuple.second); + } ::paddle::distributed::ServerParameter* server_proto = worker_fleet_desc.mutable_server_param(); @@ -127,9 +134,11 @@ class GraphPyService { server_service_proto->set_start_server_port(0); server_service_proto->set_server_thread_num(12); - ::paddle::distributed::TableParameter* server_sparse_table_proto = - downpour_server_proto->add_downpour_table_param(); - GetDownpourSparseTableProto(server_sparse_table_proto); + for (auto& tuple : this->table_id_map) { + ::paddle::distributed::TableParameter* sparse_table_proto = + downpour_server_proto->add_downpour_table_param(); + GetDownpourSparseTableProto(sparse_table_proto, tuple.second); + } return worker_fleet_desc; } @@ -137,30 +146,58 @@ class GraphPyService { int get_server_size(int server_size) { return server_size; } std::vector split(std::string& str, const char pattern); - void load_file(std::string filepath) { - auto status = - get_ps_client()->load(table_id, std::string(filepath), std::string("")); - status.wait(); + void load_edge_file(std::string name, std::string filepath, bool reverse) { + std::string params = "edge"; + if (reverse) { + params += "|reverse"; + } + if (this->table_id_map.count(name)) { + uint32_t table_id = this->table_id_map[name]; + auto status = + get_ps_client()->load(table_id, std::string(filepath), params); + status.wait(); + } + } + + void load_node_file(std::string name, std::string filepath) { + std::string params = "node"; + if (this->table_id_map.count(name)) { + uint32_t table_id = this->table_id_map[name]; + auto status = + get_ps_client()->load(table_id, std::string(filepath), params); + status.wait(); + } } - std::vector sample_k(uint64_t node_id, int sample_size) { - std::vector v; - auto status = worker_ptr->sample(table_id, node_id, sample_size, v); - status.wait(); + std::vector> sample_k(std::string name, + uint64_t node_id, + int sample_size) { + std::vector> v; + if (this->table_id_map.count(name)) { + uint32_t table_id = this->table_id_map[name]; + auto status = worker_ptr->sample(table_id, node_id, sample_size, v); + status.wait(); + } return v; } - std::vector > batch_sample_k(std::vector node_ids, int sample_size) { + std::vector > batch_sample_k(std::string name, std::vector node_ids, int sample_size) { std::vector > v; - auto status = worker_ptr->batch_sample(table_id, node_ids, sample_size, v); - status.wait(); + if (this->table_id_map.count(name)) { + uint32_t table_id = this->table_id_map[name]; + auto status = worker_ptr->batch_sample(table_id, node_ids, sample_size, v); + status.wait(); + } return v; } - std::vector pull_graph_list(int server_index, int start, - int size) { + std::vector pull_graph_list(std::string name, int server_index, + int start, int size) { std::vector res; - auto status = - worker_ptr->pull_graph_list(table_id, server_index, start, size, res); - status.wait(); + if (this->table_id_map.count(name)) { + uint32_t table_id = this->table_id_map[name]; + auto status = + worker_ptr->pull_graph_list(table_id, server_index, start, size, res); + status.wait(); + } return res; } void start_server(std::string ip, uint32_t port) { @@ -203,7 +240,7 @@ class GraphPyService { worker_ptr->configure(worker_proto, dense_regions, _ps_env, client_id); } void set_up(std::string ips_str, int shard_num, int rank, int client_id, - uint32_t table_id); + std::vector edge_types); void set_keys(std::vector keys) { // just for test this->keys = keys; } diff --git a/paddle/fluid/distributed/service/ps_client.h b/paddle/fluid/distributed/service/ps_client.h index 2122743197052..78ef708ffe8b4 100644 --- a/paddle/fluid/distributed/service/ps_client.h +++ b/paddle/fluid/distributed/service/ps_client.h @@ -155,9 +155,9 @@ class PSClient { promise.set_value(-1); return fut; } - virtual std::future sample(uint32_t table_id, uint64_t node_id, - int sample_size, - std::vector &res) { + virtual std::future sample( + uint32_t table_id, uint64_t node_id, int sample_size, + std::vector> &res) { LOG(FATAL) << "Did not implement"; std::promise promise; std::future fut = promise.get_future(); diff --git a/paddle/fluid/distributed/table/common_graph_table.cc b/paddle/fluid/distributed/table/common_graph_table.cc index 9b9e9a88b84a7..4910ba125355a 100644 --- a/paddle/fluid/distributed/table/common_graph_table.cc +++ b/paddle/fluid/distributed/table/common_graph_table.cc @@ -13,14 +13,18 @@ // limitations under the License. #include "paddle/fluid/distributed/table/common_graph_table.h" +#include #include +#include #include #include "paddle/fluid/distributed/common/utils.h" #include "paddle/fluid/string/printf.h" #include "paddle/fluid/string/string_helper.h" namespace paddle { namespace distributed { + int GraphShard::bucket_low_bound = 11; + std::vector GraphShard::get_batch(int start, int total_size) { if (start < 0) start = 0; int size = 0, cur_size; @@ -51,6 +55,7 @@ std::vector GraphShard::get_batch(int start, int total_size) { } return res; } + size_t GraphShard::get_size() { size_t res = 0; for (int i = 0; i < bucket_size; i++) { @@ -58,69 +63,79 @@ size_t GraphShard::get_size() { } return res; } + std::list::iterator GraphShard::add_node(GraphNode *node) { if (node_location.find(node->get_id()) != node_location.end()) return node_location.find(node->get_id())->second; + int index = node->get_id() % shard_num % bucket_size; + std::list::iterator iter = bucket[index].insert(bucket[index].end(), node); + node_location[node->get_id()] = iter; return iter; } + void GraphShard::add_neighboor(uint64_t id, GraphEdge *edge) { (*add_node(new GraphNode(id, std::string(""))))->add_edge(edge); } + GraphNode *GraphShard::find_node(uint64_t id) { if (node_location.find(id) == node_location.end()) return NULL; return *(node_location[id]); } + int32_t GraphTable::load(const std::string &path, const std::string ¶m) { + auto cmd = paddle::string::split_string(param, "|"); + std::set cmd_set(cmd.begin(), cmd.end()); + bool load_edge = cmd_set.count(std::string("edge")); + bool reverse_edge = cmd_set.count(std::string("reverse")); + VLOG(0) << "Reverse Edge " << reverse_edge; + auto paths = paddle::string::split_string(path, ";"); VLOG(0) << paths.size(); + int count = 0; + for (auto path : paths) { std::ifstream file(path); std::string line; while (std::getline(file, line)) { auto values = paddle::string::split_string(line, "\t"); + count++; if (values.size() < 2) continue; - auto id = std::stoull(values[0]); - size_t shard_id = id % shard_num; - if (shard_id >= shard_end || shard_id < shard_start) { - VLOG(0) << "will not load " << id << " from " << path + auto src_id = std::stoull(values[0]); + auto dst_id = std::stoull(values[1]); + if (reverse_edge) { + std::swap(src_id, dst_id); + } + float weight = 0; + if (values.size() == 3) { + weight = std::stof(values[2]); + } + size_t src_shard_id = src_id % shard_num; + + if (src_shard_id >= shard_end || src_shard_id < shard_start) { + VLOG(0) << "will not load " << src_id << " from " << path << ", please check id distribution"; continue; } - size_t index = shard_id - shard_start; - // GraphNodeType type = GraphNode::get_graph_node_type(values[1]); - // VLOG(0)<<"shards's size = "<get_id(); - for (size_t i = 2; i < values.size(); i++) { - auto edge_arr = - paddle::string::split_string(values[i], ";"); - if (edge_arr.size() == 2) { - // VLOG(0)<<"edge content "<::iterator iter = bucket[i].begin(); - while (iter != bucket[i].end()) { - auto node = *iter; - node->build_sampler(); - iter++; - } + } + VLOG(0) << "Load Finished Total Edge Count " << count; + + // Build Sampler j + for (auto &shard : shards) { + auto bucket = shard.get_bucket(); + for (int i = 0; i < bucket.size(); i++) { + std::list::iterator iter = bucket[i].begin(); + while (iter != bucket[i].end()) { + auto node = *iter; + node->build_sampler(); + iter++; } } } @@ -132,8 +147,6 @@ GraphNode *GraphTable::find_node(uint64_t id) { return NULL; } size_t index = shard_id - shard_start; - // VLOG(0)<<"try to find node-id "<enqueue([&]() -> int { + GraphNode *node = find_node(node_id); if (node == NULL) { actual_size = 0; @@ -153,48 +167,22 @@ int32_t GraphTable::random_sample(uint64_t node_id, int sample_size, } std::vector res = node->sample_k(sample_size); std::vector node_list; - int total_size = 0; - for (auto x : res) { - GraphNode temp; - temp.set_id(x->id); - total_size += temp.get_size(); - node_list.push_back(temp); - } - buffer = new char[total_size]; - int index = 0; - for (auto x : node_list) { - x.to_buffer(buffer + index); - index += x.get_size(); + actual_size = + res.size() * (GraphNode::id_size + GraphNode::weight_size); + buffer = new char[actual_size]; + int offset = 0; + uint64_t id; + float weight; + for (auto &x : res) { + id = x->get_id(); + weight = x->get_weight(); + memcpy(buffer + offset, &id, GraphNode::id_size); + offset += GraphNode::id_size; + memcpy(buffer + offset, &weight, GraphNode::weight_size); + offset += GraphNode::weight_size; } - actual_size = total_size; - return 0; - }).get(); - //.get(); - // GraphNode *node = find_node(node_id, type); - // if (node == NULL) { - // actual_size = 0; - // rwlock_->UNLock(); - // return 0; - // } - // std::vector res = node->sample_k(sample_size); - // std::vector node_list; - // int total_size = 0; - // for (auto x : res) { - // GraphNode temp; - // temp.set_id(x->id); - // temp.set_graph_node_type(x->type); - // total_size += temp.get_size(); - // node_list.push_back(temp); - // } - // buffer = new char[total_size]; - // int index = 0; - // for (auto x : node_list) { - // x.to_buffer(buffer + index); - // index += x.get_size(); - // } - // actual_size = total_size; - // rwlock_->UNLock(); - // return 0; + }) + .get(); } int32_t GraphTable::pull_graph_list(int start, int total_size, char *&buffer, int &actual_size) { diff --git a/paddle/fluid/distributed/table/graph_node.cc b/paddle/fluid/distributed/table/graph_node.cc index 78a586d507ef4..c63fff8883636 100644 --- a/paddle/fluid/distributed/table/graph_node.cc +++ b/paddle/fluid/distributed/table/graph_node.cc @@ -16,9 +16,8 @@ #include namespace paddle { namespace distributed { -int GraphNode::enum_size = sizeof(int); +int GraphNode::weight_size = sizeof(float); int GraphNode::id_size = sizeof(uint64_t); -int GraphNode::double_size = sizeof(double); int GraphNode::int_size = sizeof(int); int GraphNode::get_size() { return feature.size() + id_size + int_size; } void GraphNode::build_sampler() { diff --git a/paddle/fluid/distributed/table/graph_node.h b/paddle/fluid/distributed/table/graph_node.h index 218d14e01edc1..a8fe5eca3e824 100644 --- a/paddle/fluid/distributed/table/graph_node.h +++ b/paddle/fluid/distributed/table/graph_node.h @@ -20,11 +20,13 @@ namespace distributed { // enum GraphNodeType { user = 0, item = 1, query = 2, unknown = 3 }; class GraphEdge : public WeightedObject { public: - double weight; - uint64_t id; // GraphNodeType type; GraphEdge() {} - GraphEdge(uint64_t id, double weight) : weight(weight), id(id) {} + GraphEdge(uint64_t id, float weight) : id(id), weight(weight) {} + uint64_t get_id() { return id; } + float get_weight() { return weight; } + uint64_t id; + float weight; }; class GraphNode { public: @@ -35,7 +37,7 @@ class GraphNode { : id(id), feature(feature), sampler(NULL) {} virtual ~GraphNode() {} std::vector get_graph_edge() { return edges; } - static int enum_size, id_size, int_size, double_size; + static int id_size, int_size, weight_size; uint64_t get_id() { return id; } void set_id(uint64_t id) { this->id = id; } // GraphNodeType get_graph_node_type() { return type; } diff --git a/paddle/fluid/distributed/table/weighted_sampler.cc b/paddle/fluid/distributed/table/weighted_sampler.cc index c93bc551f54f3..09ecdc2b642e4 100644 --- a/paddle/fluid/distributed/table/weighted_sampler.cc +++ b/paddle/fluid/distributed/table/weighted_sampler.cc @@ -13,6 +13,7 @@ // limitations under the License. #include "paddle/fluid/distributed/table/weighted_sampler.h" +#include namespace paddle { namespace distributed { void WeightedSampler::build(WeightedObject **v, int start, int end) { @@ -37,11 +38,11 @@ std::vector WeightedSampler::sample_k(int k) { k = count; } std::vector sample_result; - double subtract; - std::unordered_map subtract_weight_map; + float subtract; + std::unordered_map subtract_weight_map; std::unordered_map subtract_count_map; while (k--) { - double query_weight = rand() % 100000 / 100000.0; + float query_weight = rand() % 100000 / 100000.0; query_weight *= weight - subtract_weight_map[this]; sample_result.push_back(sample(query_weight, subtract_weight_map, subtract_count_map, subtract)); @@ -49,10 +50,10 @@ std::vector WeightedSampler::sample_k(int k) { return sample_result; } WeightedObject *WeightedSampler::sample( - double query_weight, - std::unordered_map &subtract_weight_map, + float query_weight, + std::unordered_map &subtract_weight_map, std::unordered_map &subtract_count_map, - double &subtract) { + float &subtract) { if (left == NULL) { subtract_weight_map[this] = weight; subtract = weight; @@ -61,7 +62,7 @@ WeightedObject *WeightedSampler::sample( } int left_count = left->count - subtract_count_map[left]; int right_count = right->count - subtract_count_map[right]; - double left_subtract = subtract_weight_map[left]; + float left_subtract = subtract_weight_map[left]; WeightedObject *return_id; if (right_count == 0 || left_count > 0 && left->weight - left_subtract >= query_weight) { diff --git a/paddle/fluid/distributed/table/weighted_sampler.h b/paddle/fluid/distributed/table/weighted_sampler.h index 53bfaa8d30119..9ed2cc04649de 100644 --- a/paddle/fluid/distributed/table/weighted_sampler.h +++ b/paddle/fluid/distributed/table/weighted_sampler.h @@ -22,15 +22,8 @@ class WeightedObject { public: WeightedObject() {} virtual ~WeightedObject() {} - virtual unsigned long long get_id() { return id; } - virtual double get_weight() { return weight; } - - virtual void set_id(unsigned long long id) { this->id = id; } - virtual void set_weight(double weight) { this->weight = weight; } - - private: - unsigned long long id; - double weight; + virtual uint64_t get_id() = 0; + virtual float get_weight() = 0; }; class WeightedSampler { @@ -38,16 +31,16 @@ class WeightedSampler { WeightedSampler *left, *right; WeightedObject *object; int count; - double weight; + float weight; void build(WeightedObject **v, int start, int end); std::vector sample_k(int k); private: WeightedObject *sample( - double query_weight, - std::unordered_map &subtract_weight_map, + float query_weight, + std::unordered_map &subtract_weight_map, std::unordered_map &subtract_count_map, - double &subtract); + float &subtract); }; } } diff --git a/paddle/fluid/distributed/test/graph_node_test.cc b/paddle/fluid/distributed/test/graph_node_test.cc index e522af9910ec5..8921d35bb8ccb 100644 --- a/paddle/fluid/distributed/test/graph_node_test.cc +++ b/paddle/fluid/distributed/test/graph_node_test.cc @@ -18,6 +18,7 @@ limitations under the License. */ #include #include #include // NOLINT +#include #include #include "google/protobuf/text_format.h" @@ -49,10 +50,18 @@ namespace memory = paddle::memory; namespace distributed = paddle::distributed; void testGraphToBuffer(); -std::string nodes[] = {std::string("37\taa\t45;0.34\t145;0.31\t112;0.21"), - std::string("96\tfeature\t48;1.4\t247;0.31\t111;1.21"), - std::string("59\ttreat\t45;0.34\t145;0.31\t112;0.21"), - std::string("97\tfood\t48;1.4\t247;0.31\t111;1.21")}; +// std::string nodes[] = {std::string("37\taa\t45;0.34\t145;0.31\t112;0.21"), +// std::string("96\tfeature\t48;1.4\t247;0.31\t111;1.21"), +// std::string("59\ttreat\t45;0.34\t145;0.31\t112;0.21"), +// std::string("97\tfood\t48;1.4\t247;0.31\t111;1.21")}; + +std::string nodes[] = { + std::string("37\t45\t0.34"), std::string("37\t145\t0.31"), + std::string("37\t112\t0.21"), std::string("96\t48\t1.4"), + std::string("96\t247\t0.31"), std::string("96\t111\t1.21"), + std::string("59\t45\t0.34"), std::string("59\t145\t0.31"), + std::string("59\t122\t0.21"), std::string("97\t48\t0.34"), + std::string("97\t247\t0.31"), std::string("97\t111\t0.21")}; char file_name[] = "nodes.txt"; void prepare_file(char file_name[]) { std::ofstream ofile; @@ -210,8 +219,8 @@ void RunBrpcPushSparse() { worker_ptr_->load(0, std::string(file_name), std::string("")); pull_status.wait(); - std::vector v; std::vector > vs; + std::vector> v; pull_status = worker_ptr_->sample(0, 37, 4, v); pull_status.wait(); // for (auto g : v) { @@ -221,43 +230,51 @@ void RunBrpcPushSparse() { v.clear(); pull_status = worker_ptr_->sample(0, 96, 4, v); pull_status.wait(); + std::unordered_set s = { 111, 48, 247 }; + ASSERT_EQ(3, v.size()); for (auto g : v) { - std::cout << g.get_id() << std::endl; + // std::cout << g.first << std::endl; + ASSERT_EQ(true, s.find(g.first) != s.end()); } - // ASSERT_EQ(v.size(),3); v.clear(); - pull_status = worker_ptr_->pull_graph_list(0, 0, 0, 1, v); + std::vector nodes; + pull_status = worker_ptr_->pull_graph_list(0, 0, 0, 1, nodes); pull_status.wait(); - ASSERT_EQ(v.size(), 1); - ASSERT_EQ(v[0].get_id(), 37); + ASSERT_EQ(nodes.size(), 1); + ASSERT_EQ(nodes[0].get_id(), 37); // for (auto g : v) { // std::cout << g.get_id() << " " << g.get_graph_node_type() << std::endl; // } // ASSERT_EQ(v.size(),1); - v.clear(); - pull_status = worker_ptr_->pull_graph_list(0, 0, 1, 4, v); + nodes.clear(); + pull_status = worker_ptr_->pull_graph_list(0, 0, 1, 4, nodes); pull_status.wait(); - ASSERT_EQ(v.size(), 1); - ASSERT_EQ(v[0].get_id(), 59); - for (auto g : v) { + ASSERT_EQ(nodes.size(), 1); + ASSERT_EQ(nodes[0].get_id(), 59); + for (auto g : nodes) { std::cout << g.get_id() << std::endl; } distributed::GraphPyService gps1, gps2; std::string ips_str = "127.0.0.1:4211;127.0.0.1:4212"; - gps1.set_up(ips_str, 127, 0, 0, 0); - gps2.set_up(ips_str, 127, 1, 1, 0); - gps1.load_file(std::string(file_name)); - v.clear(); - v = gps2.pull_graph_list(0, 1, 4); - ASSERT_EQ(v[0].get_id(), 59); - v.clear(); - v = gps2.sample_k(96, 4); + std::vector edge_types = {std::string("user2item")}; + gps1.set_up(ips_str, 127, 0, 0, edge_types); + gps2.set_up(ips_str, 127, 1, 1, edge_types); + gps1.load_edge_file(std::string("user2item"), std::string(file_name), 0); + nodes.clear(); + nodes = gps2.pull_graph_list(std::string("user2item"), 0, 1, 4); + ASSERT_EQ(nodes[0].get_id(), 59); + nodes.clear(); + v = gps2.sample_k(std::string("user2item"), 96, 4); ASSERT_EQ(v.size(), 3); + std::cout << "sample result" << std::endl; + for (auto p : v) { + std::cout << p.first << " " << p.second << std::endl; + } std::vector node_ids; node_ids.push_back(96); node_ids.push_back(37); - vs = gps2.batch_sample_k(node_ids, 4); + vs = gps2.batch_sample_k(std::string("user2item"), node_ids, 4); ASSERT_EQ(vs.size(), 2); // to test in python,try this: // from paddle.fluid.core import GraphPyService diff --git a/paddle/fluid/pybind/fleet_py.cc b/paddle/fluid/pybind/fleet_py.cc index 6b86da9664897..ef0535293e438 100644 --- a/paddle/fluid/pybind/fleet_py.cc +++ b/paddle/fluid/pybind/fleet_py.cc @@ -165,7 +165,8 @@ void BindGraphNode(py::module* m) { void BindGraphService(py::module* m) { py::class_(*m, "GraphPyService") .def(py::init<>()) - .def("load_file", &GraphPyService::load_file) + .def("load_edge_file", &GraphPyService::load_edge_file) + .def("load_node_file", &GraphPyService::load_node_file) .def("set_up", &GraphPyService::set_up) .def("pull_graph_list", &GraphPyService::pull_graph_list) .def("sample_k", &GraphPyService::sample_k) From abb0ee0407312d39b7022367ddbeda06c8041b28 Mon Sep 17 00:00:00 2001 From: suweiyue Date: Mon, 15 Mar 2021 15:48:54 +0800 Subject: [PATCH 09/15] fix --- .../distributed/service/graph_brpc_client.cc | 16 ++++++++++------ .../distributed/service/graph_brpc_client.h | 3 ++- .../distributed/service/graph_py_service.cc | 10 ++++++++++ .../fluid/distributed/service/graph_py_service.h | 2 ++ paddle/fluid/distributed/service/ps_client.h | 2 +- .../fluid/distributed/table/common_graph_table.h | 2 +- paddle/fluid/distributed/test/graph_node_test.cc | 4 ++-- paddle/fluid/pybind/fleet_py.cc | 9 ++------- 8 files changed, 30 insertions(+), 18 deletions(-) diff --git a/paddle/fluid/distributed/service/graph_brpc_client.cc b/paddle/fluid/distributed/service/graph_brpc_client.cc index 616d310f07fbc..1c9583e440bb9 100644 --- a/paddle/fluid/distributed/service/graph_brpc_client.cc +++ b/paddle/fluid/distributed/service/graph_brpc_client.cc @@ -97,7 +97,7 @@ std::future GraphBrpcClient::sample( std::future GraphBrpcClient::batch_sample(uint32_t table_id, std::vector node_ids, int sample_size, - std::vector > &res) { + std::vector > > &res) { std::vector request2server; std::vector server2request(server_size, -1); @@ -107,7 +107,8 @@ std::future GraphBrpcClient::batch_sample(uint32_t table_id, server2request[server_index] = request2server.size(); request2server.push_back(server_index); } - res.push_back(std::vector()); + //res.push_back(std::vector()); + res.push_back(std::vector>()); } size_t request_call_num = request2server.size(); std::vector > node_id_buckets(request_call_num); @@ -148,10 +149,13 @@ std::future GraphBrpcClient::batch_sample(uint32_t table_id, int actual_size = actual_sizes[node_idx]; int start = 0; while (start < actual_size) { - GraphNode node; - node.recover_from_buffer(node_buffer + offset + start); - start += node.get_size(); - res[query_idx].push_back(node); + //GraphNode node; + //node.recover_from_buffer(node_buffer + offset + start); + //start += node.get_size(); + //res[query_idx].push_back(node); + res[query_idx].push_back({*(uint64_t *)(node_buffer + offset + start), + *(float *)(node_buffer + offset + start + GraphNode::id_size)}); + start += GraphNode::id_size + GraphNode::weight_size; } offset += actual_size; } diff --git a/paddle/fluid/distributed/service/graph_brpc_client.h b/paddle/fluid/distributed/service/graph_brpc_client.h index 4778d5261219f..9bc7505a7009d 100644 --- a/paddle/fluid/distributed/service/graph_brpc_client.h +++ b/paddle/fluid/distributed/service/graph_brpc_client.h @@ -41,7 +41,8 @@ class GraphBrpcClient : public BrpcPsClient { std::vector> &res); virtual std::future batch_sample(uint32_t table_id, std::vector node_ids, int sample_size, - std::vector > &res); + std::vector>> &res); + //std::vector > &res); virtual std::future pull_graph_list(uint32_t table_id, int server_index, int start, int size, diff --git a/paddle/fluid/distributed/service/graph_py_service.cc b/paddle/fluid/distributed/service/graph_py_service.cc index 2df3843c96afe..64a572677962a 100644 --- a/paddle/fluid/distributed/service/graph_py_service.cc +++ b/paddle/fluid/distributed/service/graph_py_service.cc @@ -193,6 +193,16 @@ std::vector> GraphPyClient::sample_k( } return v; } +std::vector > > GraphPyClient::batch_sample_k( + std::string name, std::vector node_ids, int sample_size) { + std::vector > > v; + if (this->table_id_map.count(name)) { + uint32_t table_id = this->table_id_map[name]; + auto status = worker_ptr->batch_sample(table_id, node_ids, sample_size, v); + status.wait(); + } + return v; +} std::vector GraphPyClient::pull_graph_list(std::string name, int server_index, int start, int size) { diff --git a/paddle/fluid/distributed/service/graph_py_service.h b/paddle/fluid/distributed/service/graph_py_service.h index 56b47d71093f6..8e3764fe62c46 100644 --- a/paddle/fluid/distributed/service/graph_py_service.h +++ b/paddle/fluid/distributed/service/graph_py_service.h @@ -122,6 +122,8 @@ class GraphPyClient : public GraphPyService { std::vector> sample_k(std::string name, uint64_t node_id, int sample_size); + std::vector > > batch_sample_k( + std::string name, std::vector node_ids, int sample_size); std::vector pull_graph_list(std::string name, int server_index, int start, int size); ::paddle::distributed::PSParameter GetWorkerProto(); diff --git a/paddle/fluid/distributed/service/ps_client.h b/paddle/fluid/distributed/service/ps_client.h index 78ef708ffe8b4..21f0ef51aa76d 100644 --- a/paddle/fluid/distributed/service/ps_client.h +++ b/paddle/fluid/distributed/service/ps_client.h @@ -166,7 +166,7 @@ class PSClient { } virtual std::future batch_sample(uint32_t table_id, std::vector node_ids, int sample_size, - std::vector > &res) { + std::vector>> &res) { LOG(FATAL) << "Did not implement"; std::promise promise; std::future fut = promise.get_future(); diff --git a/paddle/fluid/distributed/table/common_graph_table.h b/paddle/fluid/distributed/table/common_graph_table.h index 18fdfdb5d8f66..68f265685b9d9 100644 --- a/paddle/fluid/distributed/table/common_graph_table.h +++ b/paddle/fluid/distributed/table/common_graph_table.h @@ -73,7 +73,7 @@ class GraphTable : public SparseTable { int &actual_size); //virtual std::future random_sample(uint64_t node_id, int sampe_size, char *&buffer, //int &actual_size); - int32_t random_sample(uint64_t node_id, int sampe_size, char *&buffer, + virtual int32_t random_sample(uint64_t node_id, int sampe_size, char *&buffer, int &actual_size); virtual int32_t initialize(); int32_t load(const std::string &path, const std::string ¶m); diff --git a/paddle/fluid/distributed/test/graph_node_test.cc b/paddle/fluid/distributed/test/graph_node_test.cc index 491d72655f762..aeaa32656bae1 100644 --- a/paddle/fluid/distributed/test/graph_node_test.cc +++ b/paddle/fluid/distributed/test/graph_node_test.cc @@ -219,7 +219,7 @@ void RunBrpcPushSparse() { worker_ptr_->load(0, std::string(file_name), std::string("")); pull_status.wait(); - std::vector > vs; + std::vector > > vs; std::vector> v; pull_status = worker_ptr_->sample(0, 37, 4, v); pull_status.wait(); @@ -286,7 +286,7 @@ void RunBrpcPushSparse() { std::vector node_ids; node_ids.push_back(96); node_ids.push_back(37); - vs = gps2.batch_sample_k(std::string("user2item"), node_ids, 4); + vs = client1.batch_sample_k(std::string("user2item"), node_ids, 4); ASSERT_EQ(vs.size(), 2); // to test in python,try this: // from paddle.fluid.core import GraphPyService diff --git a/paddle/fluid/pybind/fleet_py.cc b/paddle/fluid/pybind/fleet_py.cc index 086bb9ca161df..5dbb8cdd56d69 100644 --- a/paddle/fluid/pybind/fleet_py.cc +++ b/paddle/fluid/pybind/fleet_py.cc @@ -176,18 +176,13 @@ void BindGraphPyServer(py::module* m) { void BindGraphPyClient(py::module* m) { py::class_(*m, "GraphPyClient") .def(py::init<>()) - .def("load_edge_file", &GraphPyService::load_edge_file) - .def("load_node_file", &GraphPyService::load_node_file) - .def("set_up", &GraphPyService::set_up) - .def("pull_graph_list", &GraphPyService::pull_graph_list) - .def("sample_k", &GraphPyService::sample_k) .def("load_edge_file", &GraphPyClient::load_edge_file) .def("load_node_file", &GraphPyClient::load_node_file) .def("set_up", &GraphPyClient::set_up) .def("pull_graph_list", &GraphPyClient::pull_graph_list) .def("sample_k", &GraphPyClient::sample_k) - .def("start_client", &GraphPyClient::start_client); - .def("batch_sample_k", &GraphPyService::batch_sample_k); + .def("start_client", &GraphPyClient::start_client) + .def("batch_sample_k", &GraphPyClient::batch_sample_k); } } // end namespace pybind From 583e8de7e431f0512b0e4398f45db4eb713032f5 Mon Sep 17 00:00:00 2001 From: suweiyue Date: Mon, 15 Mar 2021 22:06:41 +0800 Subject: [PATCH 10/15] random_sample return 0 --- paddle/fluid/distributed/table/common_graph_table.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/paddle/fluid/distributed/table/common_graph_table.cc b/paddle/fluid/distributed/table/common_graph_table.cc index 7a848925d3a7a..aa011bbfcbf2e 100644 --- a/paddle/fluid/distributed/table/common_graph_table.cc +++ b/paddle/fluid/distributed/table/common_graph_table.cc @@ -228,6 +228,7 @@ int32_t GraphTable::random_sample(uint64_t node_id, int sample_size, memcpy(buffer + offset, &weight, GraphNode::weight_size); offset += GraphNode::weight_size; } + return 0; }) .get(); } From 32558de1c1e40c7ff952f911802af0d1f748287d Mon Sep 17 00:00:00 2001 From: suweiyue Date: Mon, 15 Mar 2021 22:11:19 +0800 Subject: [PATCH 11/15] rm useless loop --- paddle/fluid/distributed/service/graph_brpc_client.cc | 4 ---- 1 file changed, 4 deletions(-) diff --git a/paddle/fluid/distributed/service/graph_brpc_client.cc b/paddle/fluid/distributed/service/graph_brpc_client.cc index 1c9583e440bb9..65c012c060ddd 100644 --- a/paddle/fluid/distributed/service/graph_brpc_client.cc +++ b/paddle/fluid/distributed/service/graph_brpc_client.cc @@ -120,10 +120,6 @@ std::future GraphBrpcClient::batch_sample(uint32_t table_id, query_idx_buckets[request_idx].push_back(query_idx); } - for (int request_idx = 0; request_idx < request_call_num; ++request_idx){ - - } - DownpourBrpcClosure *closure = new DownpourBrpcClosure(request_call_num, [&, node_id_buckets, query_idx_buckets, request_call_num](void *done) { int ret = 0; auto *closure = (DownpourBrpcClosure *)done; From 8643616c3b9be19a3ae5f0537d6b17176ae381c2 Mon Sep 17 00:00:00 2001 From: suweiyue Date: Mon, 15 Mar 2021 22:36:31 +0800 Subject: [PATCH 12/15] test:load edge --- paddle/fluid/distributed/test/graph_node_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paddle/fluid/distributed/test/graph_node_test.cc b/paddle/fluid/distributed/test/graph_node_test.cc index aeaa32656bae1..b419d3ae06356 100644 --- a/paddle/fluid/distributed/test/graph_node_test.cc +++ b/paddle/fluid/distributed/test/graph_node_test.cc @@ -216,7 +216,7 @@ void RunBrpcPushSparse() { /*-----------------------Test Server Init----------------------------------*/ auto pull_status = - worker_ptr_->load(0, std::string(file_name), std::string("")); + worker_ptr_->load(0, std::string(file_name), std::string("edge")); pull_status.wait(); std::vector > > vs; From a21d599d24fad623b7568b25724471bf6c68bf6a Mon Sep 17 00:00:00 2001 From: suweiyue Date: Mon, 15 Mar 2021 22:52:44 +0800 Subject: [PATCH 13/15] fix ret -1 --- paddle/fluid/distributed/service/graph_brpc_client.cc | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/paddle/fluid/distributed/service/graph_brpc_client.cc b/paddle/fluid/distributed/service/graph_brpc_client.cc index 65c012c060ddd..3cda11c6a36d8 100644 --- a/paddle/fluid/distributed/service/graph_brpc_client.cc +++ b/paddle/fluid/distributed/service/graph_brpc_client.cc @@ -123,9 +123,10 @@ std::future GraphBrpcClient::batch_sample(uint32_t table_id, DownpourBrpcClosure *closure = new DownpourBrpcClosure(request_call_num, [&, node_id_buckets, query_idx_buckets, request_call_num](void *done) { int ret = 0; auto *closure = (DownpourBrpcClosure *)done; + int fail_num = 0; for (int request_idx = 0; request_idx < request_call_num; ++request_idx){ if (closure->check_response(request_idx, PS_GRAPH_SAMPLE) != 0) { - ret = -1; + ++fail_num; } else { VLOG(0) << "check sample response: " << " " << closure->check_response(request_idx, PS_GRAPH_SAMPLE); @@ -145,10 +146,6 @@ std::future GraphBrpcClient::batch_sample(uint32_t table_id, int actual_size = actual_sizes[node_idx]; int start = 0; while (start < actual_size) { - //GraphNode node; - //node.recover_from_buffer(node_buffer + offset + start); - //start += node.get_size(); - //res[query_idx].push_back(node); res[query_idx].push_back({*(uint64_t *)(node_buffer + offset + start), *(float *)(node_buffer + offset + start + GraphNode::id_size)}); start += GraphNode::id_size + GraphNode::weight_size; @@ -156,6 +153,9 @@ std::future GraphBrpcClient::batch_sample(uint32_t table_id, offset += actual_size; } } + if (fail_num == request_call_num){ + ret = -1; + } } closure->set_promise_value(ret); }); From dd14309a6ac7ff99526eebac14a3ab4a97ffef6e Mon Sep 17 00:00:00 2001 From: suweiyue Date: Mon, 15 Mar 2021 23:06:56 +0800 Subject: [PATCH 14/15] test: rm sample --- .../distributed/service/graph_brpc_client.cc | 1 + .../distributed/service/graph_brpc_client.h | 1 - .../fluid/distributed/test/graph_node_test.cc | 26 ++++++++++--------- 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/paddle/fluid/distributed/service/graph_brpc_client.cc b/paddle/fluid/distributed/service/graph_brpc_client.cc index 3cda11c6a36d8..f30c8fad08c35 100644 --- a/paddle/fluid/distributed/service/graph_brpc_client.cc +++ b/paddle/fluid/distributed/service/graph_brpc_client.cc @@ -101,6 +101,7 @@ std::future GraphBrpcClient::batch_sample(uint32_t table_id, std::vector request2server; std::vector server2request(server_size, -1); + res.clear(); for (int query_idx = 0; query_idx < node_ids.size(); ++query_idx){ int server_index = get_server_index_by_id(node_ids[query_idx]); if(server2request[server_index] == -1){ diff --git a/paddle/fluid/distributed/service/graph_brpc_client.h b/paddle/fluid/distributed/service/graph_brpc_client.h index 9bc7505a7009d..01eb11a393c1a 100644 --- a/paddle/fluid/distributed/service/graph_brpc_client.h +++ b/paddle/fluid/distributed/service/graph_brpc_client.h @@ -42,7 +42,6 @@ class GraphBrpcClient : public BrpcPsClient { virtual std::future batch_sample(uint32_t table_id, std::vector node_ids, int sample_size, std::vector>> &res); - //std::vector > &res); virtual std::future pull_graph_list(uint32_t table_id, int server_index, int start, int size, diff --git a/paddle/fluid/distributed/test/graph_node_test.cc b/paddle/fluid/distributed/test/graph_node_test.cc index b419d3ae06356..f10507c71147a 100644 --- a/paddle/fluid/distributed/test/graph_node_test.cc +++ b/paddle/fluid/distributed/test/graph_node_test.cc @@ -220,20 +220,22 @@ void RunBrpcPushSparse() { pull_status.wait(); std::vector > > vs; - std::vector> v; - pull_status = worker_ptr_->sample(0, 37, 4, v); + //std::vector> v; + //pull_status = worker_ptr_->sample(0, 37, 4, v); + pull_status = worker_ptr_->batch_sample(0, std::vector(1, 37), 4, vs); pull_status.wait(); - ASSERT_EQ(v.size(), 3); - v.clear(); - pull_status = worker_ptr_->sample(0, 96, 4, v); + ASSERT_EQ(vs[0].size(), 3); + vs.clear(); + //pull_status = worker_ptr_->sample(0, 96, 4, v); + pull_status = worker_ptr_->batch_sample(0, std::vector(1, 96), 4, vs); pull_status.wait(); std::unordered_set s = {111, 48, 247}; - ASSERT_EQ(3, v.size()); - for (auto g : v) { + ASSERT_EQ(3, vs[0].size()); + for (auto g : vs[0]) { // std::cout << g.first << std::endl; ASSERT_EQ(true, s.find(g.first) != s.end()); } - v.clear(); + vs.clear(); std::vector nodes; pull_status = worker_ptr_->pull_graph_list(0, 0, 0, 1, nodes); pull_status.wait(); @@ -277,10 +279,10 @@ void RunBrpcPushSparse() { nodes = client2.pull_graph_list(std::string("user2item"), 0, 1, 4); ASSERT_EQ(nodes[0].get_id(), 59); nodes.clear(); - v = client1.sample_k(std::string("user2item"), 96, 4); - ASSERT_EQ(v.size(), 3); - std::cout << "sample result" << std::endl; - for (auto p : v) { + vs = client1.batch_sample_k(std::string("user2item"), std::vector(1, 96), 4); + ASSERT_EQ(vs[0].size(), 3); + std::cout << "batch sample result" << std::endl; + for (auto p : vs[0]) { std::cout << p.first << " " << p.second << std::endl; } std::vector node_ids; From 86ff4d9f1641b78ecefe872341a55e8049e63a84 Mon Sep 17 00:00:00 2001 From: suweiyue Date: Mon, 15 Mar 2021 23:12:33 +0800 Subject: [PATCH 15/15] rm sample --- .../distributed/service/graph_brpc_client.cc | 60 ------------------- .../distributed/service/graph_brpc_client.h | 3 - .../distributed/service/graph_py_service.cc | 10 ---- .../distributed/service/graph_py_service.h | 3 - paddle/fluid/distributed/service/ps_client.h | 9 --- paddle/fluid/pybind/fleet_py.cc | 1 - 6 files changed, 86 deletions(-) diff --git a/paddle/fluid/distributed/service/graph_brpc_client.cc b/paddle/fluid/distributed/service/graph_brpc_client.cc index f30c8fad08c35..b3c5540265a23 100644 --- a/paddle/fluid/distributed/service/graph_brpc_client.cc +++ b/paddle/fluid/distributed/service/graph_brpc_client.cc @@ -35,66 +35,6 @@ int GraphBrpcClient::get_server_index_by_id(uint64_t id) { return id % shard_num / shard_per_server; } // char* &buffer,int &actual_size -std::future GraphBrpcClient::sample( - uint32_t table_id, uint64_t node_id, int sample_size, - std::vector> &res) { - int server_index = get_server_index_by_id(node_id); - DownpourBrpcClosure *closure = new DownpourBrpcClosure(1, [&](void *done) { - int ret = 0; - auto *closure = (DownpourBrpcClosure *)done; - if (closure->check_response(0, PS_GRAPH_SAMPLE) != 0) { - ret = -1; - } else { - auto &res_io_buffer = closure->cntl(0)->response_attachment(); - butil::IOBufBytesIterator io_buffer_itr(res_io_buffer); - size_t bytes_size = io_buffer_itr.bytes_left(); - char *buffer = new char[bytes_size]; - io_buffer_itr.copy_and_forward((void *)(buffer), bytes_size); - - size_t node_num = *(size_t *)buffer; - int *actual_sizes = (int *)(buffer + sizeof(size_t)); - char *node_buffer = buffer + sizeof(size_t) + sizeof(int) * node_num; - - int offset = 0; - for (size_t idx = 0; idx < node_num; ++idx){ - int actual_size = actual_sizes[idx]; - int start = 0; - while (start < actual_size) { - //GraphNode node; - //node.recover_from_buffer(node_buffer + offset + start); - //start += node.get_size(); - //res.push_back(node); - res.push_back({*(uint64_t *)(node_buffer + offset + start), - *(float *)(node_buffer + offset + start + GraphNode::id_size)}); - start += GraphNode::id_size + GraphNode::weight_size; - } - offset += actual_size; - } - } - closure->set_promise_value(ret); - }); - auto promise = std::make_shared>(); - closure->add_promise(promise); - std::future fut = promise->get_future(); - closure->request(0)->set_cmd_id(PS_GRAPH_SAMPLE); - closure->request(0)->set_table_id(table_id); - closure->request(0)->set_client_id(_client_id); - // std::string type_str = GraphNode::node_type_to_string(type); - std::vector node_ids; - node_ids.push_back(node_id); - size_t node_num = node_ids.size(); - - closure->request(0)->add_params((char *)node_ids.data(), sizeof(uint64_t)*node_num); - //closure->request(0)->add_params((char *)&node_id, sizeof(uint64_t)); - closure->request(0)->add_params((char *)&sample_size, sizeof(int)); - PsService_Stub rpc_stub(get_cmd_channel(server_index)); - closure->cntl(0)->set_log_id(butil::gettimeofday_ms()); - rpc_stub.service(closure->cntl(0), closure->request(0), closure->response(0), - closure); - - return fut; -} - std::future GraphBrpcClient::batch_sample(uint32_t table_id, std::vector node_ids, int sample_size, std::vector > > &res) { diff --git a/paddle/fluid/distributed/service/graph_brpc_client.h b/paddle/fluid/distributed/service/graph_brpc_client.h index 01eb11a393c1a..3cd1d3eb753e3 100644 --- a/paddle/fluid/distributed/service/graph_brpc_client.h +++ b/paddle/fluid/distributed/service/graph_brpc_client.h @@ -36,9 +36,6 @@ class GraphBrpcClient : public BrpcPsClient { public: GraphBrpcClient() {} virtual ~GraphBrpcClient() {} - virtual std::future sample( - uint32_t table_id, uint64_t node_id, int sample_size, - std::vector> &res); virtual std::future batch_sample(uint32_t table_id, std::vector node_ids, int sample_size, std::vector>> &res); diff --git a/paddle/fluid/distributed/service/graph_py_service.cc b/paddle/fluid/distributed/service/graph_py_service.cc index 64a572677962a..3ba35fda0bf39 100644 --- a/paddle/fluid/distributed/service/graph_py_service.cc +++ b/paddle/fluid/distributed/service/graph_py_service.cc @@ -183,16 +183,6 @@ void GraphPyClient::load_node_file(std::string name, std::string filepath) { status.wait(); } } -std::vector> GraphPyClient::sample_k( - std::string name, uint64_t node_id, int sample_size) { - std::vector> v; - if (this->table_id_map.count(name)) { - uint32_t table_id = this->table_id_map[name]; - auto status = worker_ptr->sample(table_id, node_id, sample_size, v); - status.wait(); - } - return v; -} std::vector > > GraphPyClient::batch_sample_k( std::string name, std::vector node_ids, int sample_size) { std::vector > > v; diff --git a/paddle/fluid/distributed/service/graph_py_service.h b/paddle/fluid/distributed/service/graph_py_service.h index 8e3764fe62c46..8f6c9f0ad0b64 100644 --- a/paddle/fluid/distributed/service/graph_py_service.h +++ b/paddle/fluid/distributed/service/graph_py_service.h @@ -119,9 +119,6 @@ class GraphPyClient : public GraphPyService { int get_client_id() { return client_id; } void set_client_id(int client_id) { this->client_id = client_id; } void start_client(); - std::vector> sample_k(std::string name, - uint64_t node_id, - int sample_size); std::vector > > batch_sample_k( std::string name, std::vector node_ids, int sample_size); std::vector pull_graph_list(std::string name, int server_index, diff --git a/paddle/fluid/distributed/service/ps_client.h b/paddle/fluid/distributed/service/ps_client.h index 21f0ef51aa76d..00ef4c5c8caef 100644 --- a/paddle/fluid/distributed/service/ps_client.h +++ b/paddle/fluid/distributed/service/ps_client.h @@ -155,15 +155,6 @@ class PSClient { promise.set_value(-1); return fut; } - virtual std::future sample( - uint32_t table_id, uint64_t node_id, int sample_size, - std::vector> &res) { - LOG(FATAL) << "Did not implement"; - std::promise promise; - std::future fut = promise.get_future(); - promise.set_value(-1); - return fut; - } virtual std::future batch_sample(uint32_t table_id, std::vector node_ids, int sample_size, std::vector>> &res) { diff --git a/paddle/fluid/pybind/fleet_py.cc b/paddle/fluid/pybind/fleet_py.cc index 5dbb8cdd56d69..57e47461caf68 100644 --- a/paddle/fluid/pybind/fleet_py.cc +++ b/paddle/fluid/pybind/fleet_py.cc @@ -180,7 +180,6 @@ void BindGraphPyClient(py::module* m) { .def("load_node_file", &GraphPyClient::load_node_file) .def("set_up", &GraphPyClient::set_up) .def("pull_graph_list", &GraphPyClient::pull_graph_list) - .def("sample_k", &GraphPyClient::sample_k) .def("start_client", &GraphPyClient::start_client) .def("batch_sample_k", &GraphPyClient::batch_sample_k); }