Skip to content

Commit

Permalink
Merge branch 'develop' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
seiriosPlus authored Apr 7, 2021
2 parents ceebbf9 + a881b4d commit a537778
Show file tree
Hide file tree
Showing 22 changed files with 232 additions and 122 deletions.
46 changes: 9 additions & 37 deletions paddle/fluid/distributed/fleet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -146,41 +146,6 @@ void FleetWrapper::CreateClient2ClientConnection() {
client2client_max_retry_);
}

// Pull sparse feature values from the parameter server asynchronously.
//
// Gathers every non-zero int64 id from the tensors named in var_names
// (looked up in scope) into *fea_keys, sizes *fea_values to hold
// fea_value_dim floats per key, and issues a single pull_sparse RPC.
//
// Param<in>:  scope, table_id, var_names, fea_value_dim
// Param<out>: fea_keys, fea_values
// Returns:    a future that completes once the pulled values have been
//             written into *fea_values (0 on success).
//
// NOTE(review): fea_values is sized fea_keys->size() + 1 — the extra slot
// presumably keeps pull_result_ptr.data() valid when no keys are found;
// confirm before changing.
std::future<int32_t> FleetWrapper::PullSparseVarsAsync(
    const Scope& scope, const uint64_t table_id,
    const std::vector<std::string>& var_names, std::vector<uint64_t>* fea_keys,
    std::vector<std::vector<float>>* fea_values, int fea_value_dim) {
  fea_keys->clear();
  fea_keys->reserve(MAX_FEASIGN_NUM);
  // const ref: avoid copying each variable name (performance-for-range-copy).
  for (const auto& name : var_names) {
    Variable* var = scope.FindVar(name);
    if (var == nullptr) {
      // Missing variables are silently skipped; only present vars contribute.
      continue;
    }
    LoDTensor* tensor = var->GetMutable<LoDTensor>();
    CHECK(tensor != nullptr) << "tensor of var " << name << " is null";
    const int64_t* ids = tensor->data<int64_t>();
    size_t len = tensor->numel();
    for (size_t i = 0; i < len; ++i) {
      // id 0 is treated as padding and is not pulled.
      if (ids[i] == 0) {
        continue;
      }
      fea_keys->push_back(static_cast<uint64_t>(ids[i]));
    }
  }
  // One value buffer per key (plus one spare slot, see NOTE above), each
  // fea_value_dim floats wide; collect the raw pointers the RPC layer
  // writes through in the same pass.
  fea_values->resize(fea_keys->size() + 1);
  std::vector<float*> pull_result_ptr;
  pull_result_ptr.reserve(fea_values->size());
  for (auto& t : *fea_values) {
    t.resize(fea_value_dim);
    pull_result_ptr.push_back(t.data());
  }
  return pserver_ptr_->_worker_ptr->pull_sparse(
      pull_result_ptr.data(), table_id, fea_keys->data(), fea_keys->size());
}

void FleetWrapper::PullSparseVarsSync(
const Scope& scope, const uint64_t table_id,
const std::vector<std::string>& var_names, std::vector<uint64_t>* fea_keys,
Expand Down Expand Up @@ -224,8 +189,10 @@ void FleetWrapper::PullSparseVarsSync(
for (auto& t : *fea_values) {
pull_result_ptr.push_back(t.data());
}
bool training = true;
auto status = pserver_ptr_->_worker_ptr->pull_sparse(
pull_result_ptr.data(), table_id, fea_keys->data(), fea_keys->size());
pull_result_ptr.data(), table_id, fea_keys->data(), fea_keys->size(),
training);
pull_sparse_status.push_back(std::move(status));
for (auto& t : pull_sparse_status) {
t.wait();
Expand All @@ -238,9 +205,13 @@ void FleetWrapper::PullSparseVarsSync(
}
}

// is_training is true means training, false means inference, the behavior is
// different on pserver

void FleetWrapper::PullSparseToTensorSync(const uint64_t table_id, int fea_dim,
uint64_t padding_id,
platform::Place place,
bool is_training,
std::vector<const LoDTensor*>* inputs,
std::vector<LoDTensor*>* outputs) {
std::vector<uint64_t> fea_keys;
Expand Down Expand Up @@ -279,7 +250,8 @@ void FleetWrapper::PullSparseToTensorSync(const uint64_t table_id, int fea_dim,
}
auto* communicator = Communicator::GetInstance();
auto status = communicator->_worker_ptr->pull_sparse(
pull_result_ptr.data(), table_id, fea_keys.data(), fea_keys.size());
pull_result_ptr.data(), table_id, fea_keys.data(), fea_keys.size(),
is_training);
status.wait();
auto ret = status.get();
if (ret != 0) {
Expand Down
13 changes: 4 additions & 9 deletions paddle/fluid/distributed/fleet.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,19 +84,14 @@ class FleetWrapper {
int fea_dim,
const std::vector<std::string>& var_emb_names);

// Pull sparse variables from server in async mode
// Param<in>: scope, table_id, var_names, fea_keys, fea_dim
// Param<out>: fea_values std::future
std::future<int32_t> PullSparseVarsAsync(
const Scope& scope, const uint64_t table_id,
const std::vector<std::string>& var_names,
std::vector<uint64_t>* fea_keys,
std::vector<std::vector<float>>* fea_values, int fea_dim);

// Pull sparse variables from server in sync mode
// pull immediately to tensors
// is_training is true means training, false means inference, the behavior is
// different on pserver

void PullSparseToTensorSync(const uint64_t table_id, int fea_dim,
uint64_t padding_id, platform::Place place,
bool is_training,
std::vector<const LoDTensor*>* inputs, // NOLINT
std::vector<LoDTensor*>* outputs); // NOLINT

Expand Down
17 changes: 14 additions & 3 deletions paddle/fluid/distributed/service/brpc_ps_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -768,8 +768,8 @@ std::future<int32_t> BrpcPsClient::push_global_step(int table_id,

std::future<int32_t> BrpcPsClient::pull_sparse(float **select_values,
size_t table_id,
const uint64_t *keys,
size_t num) {
const uint64_t *keys, size_t num,
bool is_training) {
size_t request_call_num = _server_channels.size();

auto shard_sorted_kvs = std::make_shared<
Expand Down Expand Up @@ -837,16 +837,27 @@ std::future<int32_t> BrpcPsClient::pull_sparse(float **select_values,
uint32_t kv_request_count = 0;
size_t sorted_kv_size = sorted_kvs.size();
auto &request_buffer = closure->cntl(i)->request_attachment();

request_buffer.append((void *)&is_training, sizeof(bool));
std::vector<uint32_t> keys_counter;
keys_counter.reserve(sorted_kv_size);

for (size_t kv_idx = 0; kv_idx < sorted_kv_size; ++kv_idx) {
++kv_request_count;
uint32_t keys = 1;
last_key = sorted_kvs[kv_idx].first;
request_buffer.append((void *)&last_key, sizeof(uint64_t));
while (kv_idx < sorted_kv_size - 1 &&
last_key == sorted_kvs[kv_idx + 1].first) {
++kv_idx;
++keys;
}
keys_counter.push_back(keys);
}

request_buffer.append((void *)keys_counter.data(),
sizeof(uint32_t) * keys_counter.size());

if (kv_request_count == 0) {
closure->Run();
} else {
Expand Down Expand Up @@ -956,7 +967,7 @@ int32_t BrpcPsClient::recv_and_save_table(const uint64_t table_id,
}

auto status = pull_sparse((float **)save_vec.data(), table_id,
save_key.data(), save_key.size());
save_key.data(), save_key.size(), true);
status.wait();

// create lod tensor
Expand Down
3 changes: 2 additions & 1 deletion paddle/fluid/distributed/service/brpc_ps_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ class BrpcPsClient : public PSClient {

virtual std::future<int32_t> pull_sparse(float **select_values,
size_t table_id,
const uint64_t *keys, size_t num);
const uint64_t *keys, size_t num,
bool is_training);

virtual std::future<int32_t> print_table_stat(uint32_t table_id);

Expand Down
33 changes: 20 additions & 13 deletions paddle/fluid/distributed/service/brpc_ps_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "paddle/fluid/distributed/service/brpc_ps_server.h"
#include <thread> // NOLINT
#include "paddle/fluid/distributed/table/depends/sparse_utils.h"
#include "paddle/fluid/distributed/table/table.h"
#include "paddle/fluid/framework/archive.h"
#include "paddle/fluid/platform/profiler.h"
Expand Down Expand Up @@ -337,33 +338,39 @@ int32_t BrpcPsService::pull_sparse(Table *table,
brpc::Controller *cntl) {
platform::RecordEvent record_event("PsService->pull_sparse");
CHECK_TABLE_EXIST(table, request, response)
thread_local std::string push_sparse_request_buffer;

auto &req_io_buffer = cntl->request_attachment();
auto req_buffer_size = req_io_buffer.size();

if (req_buffer_size < 1) {
set_response_code(response, -1, "req attachment is empty");
return 0;
}

if (request.params_size() < 1) {
set_response_code(response, -1,
"PsRequestMessage.params is requeired at "
"least 1 for num of sparse_key");
return 0;
}

uint32_t num = *(uint32_t *)(request.params(0).c_str());
push_sparse_request_buffer.resize(0);
push_sparse_request_buffer.reserve(req_buffer_size);
const char *data = (const char *)cntl->request_attachment().fetch(
const_cast<char *>(push_sparse_request_buffer.data()), req_buffer_size);
/*
Attachment Content:
|---keysData---|
|---8*{num}B---|
*/
const uint64_t *keys = (const uint64_t *)data;
auto dim = table->value_accesor()->select_dim();

thread_local std::string req_buffer;
req_buffer.reserve(req_buffer_size);

const void *data = cntl->request_attachment().fetch(
const_cast<char *>(req_buffer.data()), req_buffer_size);

auto value = PullSparseValue(num, dim);

value.DeserializeFromBytes(const_cast<void *>(data));

std::vector<float> res_data;
res_data.resize(num * table->value_accesor()->select_size() / sizeof(float));
table->pull_sparse(res_data.data(), keys, num);
res_data.resize(num * dim);
table->pull_sparse(res_data.data(), value);

cntl->response_attachment().append((char *)res_data.data(),
res_data.size() * sizeof(float));
return 0;
Expand Down
4 changes: 3 additions & 1 deletion paddle/fluid/distributed/service/communicator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -320,9 +320,11 @@ void Communicator::RpcRecvSparse(const std::string &varname, int table_id,
push_g_vec.push_back(tensor->data<float>() + i * dim);
}

bool training = true;

auto status = _worker_ptr->pull_sparse(
(float **)push_g_vec.data(), table_id, // NOLINT
sparse_push_keys.data(), sparse_push_keys.size());
sparse_push_keys.data(), sparse_push_keys.size(), training);
status.wait();
return;
}
Expand Down
5 changes: 3 additions & 2 deletions paddle/fluid/distributed/service/ps_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,11 @@ class PSClient {
// The keys and values buffers must not be reused before the future completes.
// Keys requested by multiple threads are merged, then gathered and
// scatter-sent to the servers.
// After the results return, the buffer is traversed and the values are
// filled in.
// is_training distinguishes training requests from inference requests; the
// server handles features and feature admission differently for each.
virtual std::future<int32_t> pull_sparse(float **select_values,
size_t table_id,
const uint64_t *keys,
size_t num) = 0;
const uint64_t *keys, size_t num,
bool is_training) = 0;

virtual std::future<int32_t> print_table_stat(uint32_t table_id) = 0;

Expand Down
9 changes: 6 additions & 3 deletions paddle/fluid/distributed/table/common_graph_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,16 @@ class GraphTable : public SparseTable {

Node *find_node(uint64_t id);

virtual int32_t pull_sparse(float *values, const uint64_t *keys, size_t num) {
virtual int32_t pull_sparse(float *values,
const PullSparseValue &pull_value) {
return 0;
}

virtual int32_t push_sparse(const uint64_t *keys, const float *values,
size_t num) {
return 0;
}

virtual void clear() {}
virtual int32_t flush() { return 0; }
virtual int32_t shrink(const std::string &param) { return 0; }
Expand Down Expand Up @@ -140,5 +143,5 @@ class GraphTable : public SparseTable {

std::vector<std::shared_ptr<::ThreadPool>> _shards_task_pool;
};
}
};
} // namespace distributed
}; // namespace paddle
52 changes: 30 additions & 22 deletions paddle/fluid/distributed/table/common_sparse_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,6 @@ int32_t CommonSparseTable::initialize_value() {
}

auto accessor = _config.accessor();

std::vector<uint64_t> feasigns;

for (size_t x = 0; x < accessor.fea_dim(); ++x) {
Expand All @@ -271,9 +270,14 @@ int32_t CommonSparseTable::initialize_value() {
std::vector<uint64_t> ids(bucket_feasigns);
std::copy(feasigns.begin() + buckets[x], feasigns.begin() + buckets[x + 1],
ids.begin());

std::vector<uint32_t> fres;
fres.resize(ids.size(), 1);

auto pull_value = PullSparseValue(ids, fres, param_dim_);
std::vector<float> pulls;
pulls.resize(bucket_feasigns * param_dim_);
pull_sparse(pulls.data(), ids.data(), bucket_feasigns);
pull_sparse(pulls.data(), pull_value);
}

return 0;
Expand Down Expand Up @@ -399,32 +403,36 @@ int32_t CommonSparseTable::pour() {
return 0;
}

int32_t CommonSparseTable::pull_sparse(float* pull_values, const uint64_t* keys,
size_t num) {
int32_t CommonSparseTable::pull_sparse(float* pull_values,
const PullSparseValue& pull_value) {
rwlock_->RDLock();

std::vector<std::vector<uint64_t>> offset_bucket;
offset_bucket.resize(task_pool_size_);

for (int x = 0; x < num; ++x) {
auto y = keys[x] % task_pool_size_;
offset_bucket[y].push_back(x);
}

std::vector<std::future<int>> tasks(task_pool_size_);
auto shard_num = task_pool_size_;
std::vector<std::future<int>> tasks(shard_num);

for (int shard_id = 0; shard_id < task_pool_size_; ++shard_id) {
for (int shard_id = 0; shard_id < shard_num; ++shard_id) {
tasks[shard_id] = _shards_task_pool[shard_id]->enqueue(
[this, shard_id, &keys, &offset_bucket, &pull_values]() -> int {
[this, shard_id, shard_num, &pull_value, &pull_values]() -> int {
auto& block = shard_values_[shard_id];
auto& offsets = offset_bucket[shard_id];

for (int i = 0; i < offsets.size(); ++i) {
auto offset = offsets[i];
auto id = keys[offset];
auto* value = block->Init(id);
std::copy_n(value + param_offset_, param_dim_,
pull_values + param_dim_ * offset);
std::vector<int> offsets;
pull_value.Fission(shard_id, shard_num, &offsets);

if (pull_value.is_training_) {
for (auto& offset : offsets) {
auto feasign = pull_value.feasigns_[offset];
auto frequencie = pull_value.frequencies_[offset];
auto* value = block->Init(feasign, true, frequencie);
std::copy_n(value + param_offset_, param_dim_,
pull_values + param_dim_ * offset);
}
} else {
for (auto& offset : offsets) {
auto feasign = pull_value.feasigns_[offset];
auto* value = block->Init(feasign, false);
std::copy_n(value + param_offset_, param_dim_,
pull_values + param_dim_ * offset);
}
}

return 0;
Expand Down
3 changes: 1 addition & 2 deletions paddle/fluid/distributed/table/common_sparse_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ class CommonSparseTable : public SparseTable {
int32_t save(const std::string& path, const std::string& param);

virtual std::pair<int64_t, int64_t> print_table_stat();
virtual int32_t pull_sparse(float* pull_values, const uint64_t* keys,
size_t num);
virtual int32_t pull_sparse(float* values, const PullSparseValue& pull_value);

virtual int32_t push_sparse(const uint64_t* keys, const float* values,
size_t num);
Expand Down
8 changes: 4 additions & 4 deletions paddle/fluid/distributed/table/common_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ class DenseTable : public Table {
virtual ~DenseTable() {}

virtual void *get_shard(size_t shard_idx) { return 0; }
int32_t pull_sparse(float *values, const uint64_t *keys,
size_t num) override {
int32_t pull_sparse(float *values,
const PullSparseValue &pull_value) override {
return 0;
}
int32_t push_sparse(const uint64_t *keys, const float *values,
Expand All @@ -123,8 +123,8 @@ class BarrierTable : public Table {

int32_t push_dense(const float *values, size_t num) override { return 0; }

int32_t pull_sparse(float *values, const uint64_t *keys,
size_t num) override {
int32_t pull_sparse(float *values,
const PullSparseValue &pull_value) override {
return 0;
}
int32_t push_sparse(const uint64_t *keys, const float *values,
Expand Down
Loading

1 comment on commit a537778

@paddle-bot-old
Copy link

@paddle-bot-old paddle-bot-old bot commented on a537778 Apr 7, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🕵️ CI failures summary

🔍PR: #32103 Commit ID: a537778 contains failed CI.

Please sign in to comment.