[Pglbox2.0] merge gpugraph to develop #49946

Merged · 52 commits · Jan 30, 2023

Commits (52)
8a70ba1
add set slot_num for psgpuwraper (#177)
chao9527 Nov 30, 2022
8650d4d
Add get_epoch_finish python interface (#182)
DesmonDay Dec 5, 2022
677c762
add unzip op (#183)
huwei02 Dec 8, 2022
e2aeb29
fix miss key for error dataset (#186)
chao9527 Dec 8, 2022
20852e4
add excluded_train_pair and infer_node_type (#187)
huwei02 Dec 10, 2022
91d951f
support return of degree (#188)
DesmonDay Dec 11, 2022
718db77
fix task stuck in barrier (#189)
chao9527 Dec 14, 2022
7830a09
check node/feature format when loading (#190)
huwei02 Dec 16, 2022
292f59b
degrade log (#191)
huwei02 Dec 19, 2022
ed25380
[PGLBOX]fix conflict
zmxdream Dec 24, 2022
d3ea91b
[PGLBOX]fix conflict
zmxdream Dec 24, 2022
ecb1163
[PGLBOX]replace LodTensor with phi::DenseTensor
zmxdream Dec 24, 2022
386ffee
[PGLBOX]fix gpu_primitives.h include path
zmxdream Dec 24, 2022
75f2258
[PGLBOX]from platform::PADDLE_CUDA_NUM_THREADS to phi::PADDLE_CUDA_NU…
zmxdream Dec 24, 2022
c45f685
[PGLBOX]fix unzip example code
zmxdream Dec 24, 2022
130a68f
[PGLBOX]fix unzip example code
zmxdream Dec 24, 2022
da9b965
[PGLBOX]fix unzip example code
zmxdream Dec 24, 2022
0cd6238
[PGLBOX]fix unzip example code
zmxdream Dec 24, 2022
daa0bd5
[PGLBOX]fix unzip ut
zmxdream Dec 24, 2022
e00b347
[PGLBOX]fix unzip ut
zmxdream Dec 24, 2022
1221fb8
[PGLBOX]fix code style
zmxdream Dec 24, 2022
67a47a3
[PGLBOX]fix code style
zmxdream Dec 24, 2022
803fdb4
[PGLBOX]fix code style
zmxdream Dec 24, 2022
679bdd9
fix code style
zmxdream Dec 25, 2022
e2a54c7
fix code style
zmxdream Dec 25, 2022
5a1163d
fix unzip ut
zmxdream Dec 25, 2022
0c3e21f
fix unzip ut
zmxdream Dec 25, 2022
2cd58d0
fix unzip ut
zmxdream Dec 25, 2022
93fbdac
fix unzip
zmxdream Dec 26, 2022
32a5d85
fix code stype
zmxdream Dec 26, 2022
cb43e47
add ut
zmxdream Dec 27, 2022
4e0c167
add c++ ut & fix train_mode_ set
zmxdream Dec 30, 2022
9390afe
Merge branch 'origin_dev' into pglbox
zmxdream Dec 30, 2022
7a7ce24
fix load into memory
zmxdream Dec 30, 2022
8f6ff17
fix c++ ut
zmxdream Dec 30, 2022
3948b6e
fix c++ ut
zmxdream Dec 31, 2022
b1cfc82
fix c++ ut
zmxdream Jan 1, 2023
f16519b
fix c++ ut
zmxdream Jan 1, 2023
45c2be2
fix code style
zmxdream Jan 3, 2023
d7d0f0d
fix collective
zmxdream Jan 6, 2023
bc54090
gpugraph merge develop
zmxdream Jan 7, 2023
e210464
fix unzip_op.cc
zmxdream Jan 8, 2023
cabfed9
fix barrier
zmxdream Jan 17, 2023
07db623
fix conflict
zmxdream Jan 17, 2023
32ff693
fix code style
zmxdream Jan 17, 2023
22f5bd7
fix barrier
zmxdream Jan 17, 2023
c41437c
fix barrier
zmxdream Jan 18, 2023
cf011f0
fix code styple
zmxdream Jan 18, 2023
f9358c8
fix unzip
zmxdream Jan 19, 2023
a05c2fa
add unzip.py
zmxdream Jan 29, 2023
b660edb
add unzip.py
zmxdream Jan 29, 2023
0e1caab
fix unzip.py
zmxdream Jan 29, 2023
4 changes: 2 additions & 2 deletions paddle/fluid/distributed/ps/service/ps_local_client.cc
@@ -306,10 +306,10 @@ ::std::future<int32_t> PsLocalClient::SaveCacheTable(uint32_t table_id,
                                                      size_t threshold) {
   auto* table_ptr = GetTable(table_id);
   std::pair<int64_t, int64_t> ret = table_ptr->PrintTableStat();
-  VLOG(0) << "table id: " << table_id << ", feasign size: " << ret.first
+  VLOG(1) << "table id: " << table_id << ", feasign size: " << ret.first
           << ", mf size: " << ret.second;
   if (ret.first > (int64_t)threshold) {
-    VLOG(0) << "run cache table";
+    VLOG(1) << "run cache table";
     table_ptr->CacheTable(pass_id);
   }
   return done();
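A note on the pattern above: glog-style VLOG(n) messages are emitted only when the verbosity flag is at least n, so demoting routine status lines from VLOG(0) to VLOG(1) silences them in default runs. A minimal sketch of the same idea, assuming glog is linked (the helper name is hypothetical):

```cpp
#include <glog/logging.h>

#include <sstream>
#include <vector>

// Build the log string only when verbose logging is on (--v=1 or GLOG_v=1),
// mirroring the FLAGS_v guard this PR adds around slot_feature_num_map logging.
void LogSlotFeatureCounts(const std::vector<int>& slot_counts) {
  if (FLAGS_v > 0) {  // skip the stringstream work entirely at default verbosity
    std::ostringstream ss;
    for (int c : slot_counts) ss << c << ' ';
    VLOG(1) << "slot_feature_num_map: " << ss.str();
  }
}
```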
115 changes: 96 additions & 19 deletions paddle/fluid/distributed/ps/table/common_graph_table.cc
@@ -124,11 +124,13 @@ paddle::framework::GpuPsCommGraphFea GraphTable::make_gpu_ps_graph_fea(
   }
   for (size_t i = 0; i < tasks.size(); i++) tasks[i].get();
 
-  std::stringstream ss;
-  for (int k = 0; k < slot_num; ++k) {
-    ss << slot_feature_num_map_[k] << " ";
-  }
-  VLOG(0) << "slot_feature_num_map: " << ss.str();
+  if (FLAGS_v > 0) {
+    std::stringstream ss;
+    for (int k = 0; k < slot_num; ++k) {
+      ss << slot_feature_num_map_[k] << " ";
+    }
+    VLOG(1) << "slot_feature_num_map: " << ss.str();
+  }
 
   tasks.clear();
@@ -137,7 +139,7 @@ paddle::framework::GpuPsCommGraphFea GraphTable::make_gpu_ps_graph_fea(
   for (size_t i = 0; i < shard_num; i++) {
     tot_len += feature_array[i].size();
   }
-  VLOG(0) << "Loaded feature table on cpu, feature_list_size[" << tot_len
+  VLOG(1) << "Loaded feature table on cpu, feature_list_size[" << tot_len
           << "] node_ids_size[" << node_ids.size() << "]";
   res.init_on_cpu(tot_len, (unsigned int)node_ids.size(), slot_num);
   unsigned int offset = 0, ind = 0;
@@ -494,6 +496,8 @@ void GraphTable::export_partition_files(int idx, std::string file_path) {
 
   for (size_t i = 0; i < tasks.size(); i++) tasks[i].get();
 }
+#endif
+
 void GraphTable::clear_graph(int idx) {
   for (auto p : edge_shards[idx]) {
     p->clear();
@@ -506,6 +510,7 @@ void GraphTable::clear_graph(int idx) {
   }
 }
 
+#ifdef PADDLE_WITH_HETERPS
 void GraphTable::release_graph() {
   // Before releasing graph, prepare for sampling ids and embedding keys.
   build_graph_type_keys();
@@ -545,6 +550,7 @@ void GraphTable::release_graph_node() {
     feature_shrink_to_fit();
   }
 }
+#endif
 
 void GraphTable::clear_edge_shard() {
   VLOG(0) << "begin clear edge shard";
@@ -590,6 +596,7 @@ void GraphTable::clear_feature_shard() {
   VLOG(0) << "finish clear feature shard";
 }
 
+#ifdef PADDLE_WITH_HETERPS
 void GraphTable::feature_shrink_to_fit() {
   std::vector<std::future<int>> tasks;
   for (auto &type_shards : feature_shards) {
@@ -619,13 +626,16 @@ void GraphTable::merge_feature_shard() {
   feature_shards.resize(1);
 }
 
+#endif
+
 void GraphTable::clear_graph() {
   VLOG(0) << "begin clear_graph";
   clear_edge_shard();
   clear_feature_shard();
   VLOG(0) << "finish clear_graph";
 }
 
+#ifdef PADDLE_WITH_HETERPS
 int32_t GraphTable::load_next_partition(int idx) {
   if (next_partition >= static_cast<int>(partitions[idx].size())) {
     VLOG(0) << "partition iteration is done";
@@ -1203,11 +1213,21 @@ int32_t GraphTable::Load(const std::string &path, const std::string &param) {
   if (load_edge) {
     bool reverse_edge = (param[1] == '<');
     std::string edge_type = param.substr(2);
-    return this->load_edges(path, reverse_edge, edge_type);
+    int ret = this->load_edges(path, reverse_edge, edge_type);
+    if (ret != 0) {
+      VLOG(0) << "Fail to load edges, path[" << path << "] edge_type["
+              << edge_type << "]";
+      return -1;
+    }
   }
   if (load_node) {
     std::string node_type = param.substr(1);
-    return this->load_nodes(path, node_type);
+    int ret = this->load_nodes(path, node_type);
+    if (ret != 0) {
+      VLOG(0) << "Fail to load nodes, path[" << path << "] node_type["
+              << node_type << "]";
+      return -1;
+    }
   }
   return 0;
 }
@@ -1319,10 +1339,19 @@ int32_t GraphTable::parse_node_and_load(std::string ntype2files,
     return 0;
   }
   if (FLAGS_graph_load_in_parallel) {
-    this->load_nodes(npath_str, "");
+    int ret = this->load_nodes(npath_str, "");
+    if (ret != 0) {
+      VLOG(0) << "Fail to load nodes, path[" << npath << "]";
+      return -1;
+    }
   } else {
     for (size_t j = 0; j < ntypes.size(); j++) {
-      this->load_nodes(npath_str, ntypes[j]);
+      int ret = this->load_nodes(npath_str, ntypes[j]);
+      if (ret != 0) {
+        VLOG(0) << "Fail to load nodes, path[" << npath << "], ntypes["
+                << ntypes[j] << "]";
+        return -1;
+      }
     }
   }
   return 0;
@@ -1397,17 +1426,30 @@ int32_t GraphTable::load_node_and_edge_file(std::string etype2files,
           return 0;
         }
         if (FLAGS_graph_load_in_parallel) {
-          this->load_nodes(npath_str, "");
+          int ret = this->load_nodes(npath_str, "");
+          if (ret != 0) {
+            VLOG(0) << "Fail to load nodes, path[" << npath_str << "]";
+            return -1;
+          }
         } else {
           for (size_t j = 0; j < ntypes.size(); j++) {
-            this->load_nodes(npath_str, ntypes[j]);
+            int ret = this->load_nodes(npath_str, ntypes[j]);
+            if (ret != 0) {
+              VLOG(0) << "Fail to load nodes, path[" << npath_str
+                      << "], ntypes[" << ntypes[j] << "]";
+              return -1;
+            }
           }
         }
       }
       return 0;
     }));
   }
   for (size_t i = 0; i < tasks.size(); i++) tasks[i].get();
+  if (is_parse_node_fail_) {
+    VLOG(0) << "Fail to load node_and_edge_file";
+    return -1;
+  }
   return 0;
 }

@@ -1499,7 +1541,12 @@ std::pair<uint64_t, uint64_t> GraphTable::parse_node_file(
         node->set_feature_size(feat_name[idx].size());
         for (int i = 1; i < num; ++i) {
           auto &v = vals[i];
-          parse_feature(idx, v.ptr, v.len, node);
+          int ret = parse_feature(idx, v.ptr, v.len, node);
+          if (ret != 0) {
+            VLOG(0) << "Fail to parse feature, node_id[" << id << "]";
+            is_parse_node_fail_ = true;
+            return {0, 0};
+          }
         }
       }
       local_valid_count++;
@@ -1551,7 +1598,12 @@ std::pair<uint64_t, uint64_t> GraphTable::parse_node_file(
     if (node != NULL) {
       for (int i = 2; i < num; ++i) {
         auto &v = vals[i];
-        parse_feature(idx, v.ptr, v.len, node);
+        int ret = parse_feature(idx, v.ptr, v.len, node);
+        if (ret != 0) {
+          VLOG(0) << "Fail to parse feature, node_id[" << id << "]";
+          is_parse_node_fail_ = true;
+          return {0, 0};
+        }
       }
     }
     local_valid_count++;
@@ -1603,6 +1655,11 @@ int32_t GraphTable::load_nodes(const std::string &path, std::string node_type) {
       valid_count += res.second;
     }
   }
+  if (is_parse_node_fail_) {
+    VLOG(0) << "Fail to load nodes, path[" << paths[0] << ".."
+            << paths[paths.size() - 1] << "] node_type[" << node_type << "]";
+    return -1;
+  }
 
   VLOG(0) << valid_count << "/" << count << " nodes in node_type[ " << node_type
           << "] are loaded successfully!";
@@ -2103,36 +2160,56 @@ int GraphTable::parse_feature(int idx,
     if (dtype == "feasign") {
       //    string_vector_2_string(fields.begin() + 1, fields.end(), ' ',
       //    fea_ptr);
-      FeatureNode::parse_value_to_bytes<uint64_t>(
+      int ret = FeatureNode::parse_value_to_bytes<uint64_t>(
           fea_fields.begin(), fea_fields.end(), fea_ptr);
+      if (ret != 0) {
+        VLOG(0) << "Fail to parse value";
+        return -1;
+      }
       return 0;
     } else if (dtype == "string") {
       string_vector_2_string(
           fea_fields.begin(), fea_fields.end(), ' ', fea_ptr);
       return 0;
     } else if (dtype == "float32") {
-      FeatureNode::parse_value_to_bytes<float>(
+      int ret = FeatureNode::parse_value_to_bytes<float>(
          fea_fields.begin(), fea_fields.end(), fea_ptr);
+      if (ret != 0) {
+        VLOG(0) << "Fail to parse value";
+        return -1;
+      }
       return 0;
     } else if (dtype == "float64") {
-      FeatureNode::parse_value_to_bytes<double>(
+      int ret = FeatureNode::parse_value_to_bytes<double>(
          fea_fields.begin(), fea_fields.end(), fea_ptr);
+      if (ret != 0) {
+        VLOG(0) << "Fail to parse value";
+        return -1;
+      }
       return 0;
     } else if (dtype == "int32") {
-      FeatureNode::parse_value_to_bytes<int32_t>(
+      int ret = FeatureNode::parse_value_to_bytes<int32_t>(
          fea_fields.begin(), fea_fields.end(), fea_ptr);
+      if (ret != 0) {
+        VLOG(0) << "Fail to parse value";
+        return -1;
+      }
       return 0;
     } else if (dtype == "int64") {
-      FeatureNode::parse_value_to_bytes<uint64_t>(
+      int ret = FeatureNode::parse_value_to_bytes<uint64_t>(
          fea_fields.begin(), fea_fields.end(), fea_ptr);
+      if (ret != 0) {
+        VLOG(0) << "Fail to parse value";
+        return -1;
+      }
       return 0;
     }
   } else {
     VLOG(2) << "feature_name[" << name << "] is not in feat_id_map, ntype_id["
             << idx << "] feat_id_map_size[" << feat_id_map.size() << "]";
   }
 
-  return -1;
+  return 0;
 }
 // thread safe shard vector merge
 class MergeShardVector {
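The common_graph_table.cc changes above apply one idiom throughout: parse/load helpers return an int status instead of being fire-and-forget, worker threads record failures in the new is_parse_node_fail_ member, and callers check it after joining all tasks. A self-contained sketch of that idiom under hypothetical names (the PR uses a plain bool member; an std::atomic is used here so the standalone example is race-free):

```cpp
#include <atomic>
#include <future>
#include <iostream>
#include <vector>

std::atomic<bool> g_parse_fail{false};  // stands in for is_parse_node_fail_

// A worker returns a status and also records failure in the shared flag, so
// an error inside any thread is still visible after all futures are drained.
int ParseShard(int shard_id) {
  bool ok = (shard_id != 3);  // placeholder for real node/feature parsing
  if (!ok) {
    g_parse_fail = true;
    return -1;
  }
  return 0;
}

int LoadAllShards(int num_shards) {
  std::vector<std::future<int>> tasks;
  for (int i = 0; i < num_shards; ++i) {
    tasks.push_back(std::async(std::launch::async, ParseShard, i));
  }
  for (auto& t : tasks) t.get();  // join everything before checking status
  if (g_parse_fail.load()) {
    std::cerr << "Fail to load shards\n";
    return -1;  // surface the failure once, at the call site
  }
  return 0;
}
```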
1 change: 1 addition & 0 deletions paddle/fluid/distributed/ps/table/common_graph_table.h
@@ -789,6 +789,7 @@ class GraphTable : public Table {
   std::string slot_feature_separator_ = std::string(" ");
   std::string feature_separator_ = std::string(" ");
   std::vector<int> slot_feature_num_map_;
+  bool is_parse_node_fail_ = false;
 };
 
 /*
8 changes: 7 additions & 1 deletion paddle/fluid/distributed/ps/table/graph/graph_node.h
@@ -255,7 +255,7 @@ class FeatureNode : public Node {
   }
 
   template <typename T>
-  static void parse_value_to_bytes(
+  static int parse_value_to_bytes(
       std::vector<paddle::string::str_ptr>::iterator feat_str_begin,
       std::vector<paddle::string::str_ptr>::iterator feat_str_end,
       std::string *output) {
@@ -269,8 +269,14 @@ class FeatureNode : public Node {
     thread_local paddle::string::str_ptr_stream ss;
     for (size_t i = 0; i < feat_str_size; i++) {
       ss.reset(*(feat_str_begin + i));
+      int len = ss.end - ss.ptr;
+      char *old_ptr = ss.ptr;
       ss >> fea_ptrs[i];
+      if (ss.ptr - old_ptr != len) {
+        return -1;
+      }
     }
+    return 0;
   }
 
  protected:
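The graph_node.h hunk is the heart of the new validation: parse_value_to_bytes now records each token's length before parsing and returns -1 unless the numeric parse consumed every byte. The same check in a self-contained form, using strtoull in place of Paddle's str_ptr_stream (helper name hypothetical):

```cpp
#include <cstdint>
#include <cstdlib>
#include <string>

// Accept a token only if the numeric parse consumed all of it, mirroring the
// "ss.ptr - old_ptr != len" test: leftover bytes mean a malformed feature.
bool ParseFullToken(const std::string& token, uint64_t* out) {
  if (token.empty()) return false;
  char* end = nullptr;
  *out = std::strtoull(token.c_str(), &end, 10);
  return end == token.c_str() + token.size();
}

// "123" parses; "123abc" and "" are rejected instead of silently truncated.
```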
3 changes: 2 additions & 1 deletion paddle/fluid/framework/CMakeLists.txt
@@ -865,7 +865,8 @@ if(WITH_DISTRIBUTE)
       fleet
       heter_server
       brpc
-      fleet_executor)
+      fleet_executor
+      flags)
   set(DISTRIBUTE_COMPILE_FLAGS
       "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor -Wno-error=parentheses"
   )
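Presumably the new flags dependency is what resolves the flag symbols (for example FLAGS_v and FLAGS_graph_load_in_parallel) that the added logging and loading guards reference from the distributed targets; the PR itself does not state the rationale.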
7 changes: 6 additions & 1 deletion paddle/fluid/framework/barrier.h
@@ -14,6 +14,11 @@
 
 #pragma once
 
+#if defined _WIN32 || defined __APPLE__
+#else
+#define __LINUX__
+#endif
+
 #ifdef __LINUX__
 #include <pthread.h>
 #include <semaphore.h>
@@ -48,7 +53,7 @@ class Barrier {
   void wait() {
 #ifdef __LINUX__
     int err = pthread_barrier_wait(&_barrier);
-    if (err != 0 && err != PTHREAD_BARRIER_SERIAL_THREAD)) {
+    if (err != 0 && err != PTHREAD_BARRIER_SERIAL_THREAD) {
      CHECK_EQ(1, 0);
    }
 #endif
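For context on the one-character fix above: pthread_barrier_wait returns PTHREAD_BARRIER_SERIAL_THREAD to exactly one waiter and 0 to every other, so both values mean success and anything else is an error. A minimal Linux-only sketch of the corrected check (wrapper name hypothetical):

```cpp
#include <pthread.h>

#include <cstdio>
#include <cstdlib>

// 0 and PTHREAD_BARRIER_SERIAL_THREAD are the two success returns; anything
// else (e.g. EINVAL for an uninitialized barrier) indicates a real failure.
void WaitAtBarrier(pthread_barrier_t* barrier) {
  int err = pthread_barrier_wait(barrier);
  if (err != 0 && err != PTHREAD_BARRIER_SERIAL_THREAD) {
    std::fprintf(stderr, "pthread_barrier_wait failed: %d\n", err);
    std::abort();
  }
}
```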
9 changes: 9 additions & 0 deletions paddle/fluid/framework/data_feed.cc
@@ -2112,15 +2112,24 @@ void SlotRecordInMemoryDataFeed::Init(const DataFeedDesc& data_feed_desc) {
 #if defined(PADDLE_WITH_GPU_GRAPH) && defined(PADDLE_WITH_HETERPS)
   gpu_graph_data_generator_.SetConfig(data_feed_desc);
 #endif
+  if (gpu_graph_mode_) {
+    train_mode_ = true;
+  } else {
+    train_mode_ = data_feed_desc.graph_config().gpu_graph_training();
+  }
 }
 
+#if defined(PADDLE_WITH_GPU_GRAPH) && defined(PADDLE_WITH_HETERPS)
 void SlotRecordInMemoryDataFeed::InitGraphResource() {
+#if defined(PADDLE_WITH_GPU_GRAPH) && defined(PADDLE_WITH_HETERPS)
   gpu_graph_data_generator_.AllocResource(thread_id_, feed_vec_);
+#endif
 }
 
 void SlotRecordInMemoryDataFeed::InitGraphTrainResource() {
 #if defined(PADDLE_WITH_GPU_GRAPH) && defined(PADDLE_WITH_HETERPS)
   gpu_graph_data_generator_.AllocTrainResource(thread_id_);
 #endif
 }
+#endif
 