Skip to content

Commit

Permalink
Merge pull request openvinotoolkit#37 from riverlijunjie/river/fix_gp…
Browse files Browse the repository at this point in the history
…u_mem_map_leak

Fix gpu memory leak caused by mem map
  • Loading branch information
WeldonWangwang authored Oct 8, 2024
2 parents 66f88b5 + c8438d8 commit 8402627
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class SubMemoryManager {
bool last_used;
std::shared_ptr<cldnn::stream> stream_ptr;
std::vector<cldnn::memory::ptr> recv_bufs;
std::vector<void*> remote_mems;
std::vector<cldnn::event::ptr> events;
cldnn::memory::ptr output;
cldnn::layout layout;
Expand All @@ -36,6 +37,7 @@ class SubMemoryManager {
memory_info.last_used = false;
memory_info.layout = cldnn::layout();
memory_info.recv_bufs.assign(_num_sub_streams, nullptr);
memory_info.remote_mems.assign(_num_sub_streams, nullptr);
memory_info.events.assign(_num_sub_streams, nullptr);
std::vector<MemoryInfo> memorys;
memorys.assign(_num_sub_streams, memory_info);
Expand Down
101 changes: 81 additions & 20 deletions src/plugins/intel_gpu/src/graph/impls/ocl/sync_tensor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,15 @@ struct sync_tensor_impl : public typed_primitive_impl<sync_tensor> {

sync_tensor_impl() : parent() {}

~sync_tensor_impl() {
for (auto& mem : all_gather_remote_dst) {
if (mem) {
release_remote_mems(static_cast<cl_mem>(mem));
}
}
all_gather_remote_dst.clear();
}

explicit sync_tensor_impl(const sync_tensor_node& outer) {
set_node_params(outer);
}
Expand Down Expand Up @@ -615,10 +624,6 @@ struct sync_tensor_impl : public typed_primitive_impl<sync_tensor> {
copy_list[idx] = 0;
// std::lock_guard<std::mutex> lock(sub_mem_mgr->_flagMutex);
sub_mem_mgr->_memorys_table[id][w_rank].events[idx] = nullptr;
// MUST release remote cl_mem, but it will cause remote map failed.
// cl_mem remote_mem =
// static_cast<cl_mem>(sub_mem_mgr->_memorys_table[id][idx].remote_mem[w_rank]);
// clReleaseMemObject(remote_mem);
}
}
}
Expand Down Expand Up @@ -657,14 +662,15 @@ struct sync_tensor_impl : public typed_primitive_impl<sync_tensor> {
}
}

void update_internal_buffer(sync_tensor_inst& instance,
bool update_internal_buffer(sync_tensor_inst& instance,
std::vector<cldnn::memory::ptr>& bufs,
cldnn::layout& last_layout,
cldnn::layout& layout,
size_t w_size,
size_t w_rank) {
auto& engine = instance.get_network().get_engine();
size_t required_size = layout.bytes_count();
bool allocated = false;
if (0) {
std::lock_guard<std::mutex> lock(debug_mutex);
std::cout << "Before update_internal_buffer: " << std::endl;
Expand Down Expand Up @@ -695,6 +701,7 @@ struct sync_tensor_impl : public typed_primitive_impl<sync_tensor> {
size_t origin_size = bufs[i] != nullptr ? bufs[i]->size() : 0;
bufs[i] = nullptr;
bufs[i] = engine.allocate_memory(layout, cldnn::allocation_type::cl_mem, false);
allocated = true;
if (debug_enable) {
std::lock_guard<std::mutex> lock(debug_mutex);
std::cout << "tensor_sync allocate: rank[" << w_rank << "]: layout[" << i
Expand All @@ -703,6 +710,19 @@ struct sync_tensor_impl : public typed_primitive_impl<sync_tensor> {
<< std::endl;
}
}
return allocated;
}

void release_remote_mems(cl_mem remote_mems) {
if (remote_mems) {
size_t data_size = 0;
auto _cl_mem = static_cast<cl_mem>(remote_mems);
if (debug_enable) {
clGetMemObjectInfo(_cl_mem, CL_MEM_SIZE, sizeof(size_t), &data_size, NULL);
std::cout << "clReleaseMemObject...size = " << data_size << std::endl;
}
clReleaseMemObject(_cl_mem);
}
}

event::ptr execute_impl(const std::vector<event::ptr>& events, sync_tensor_inst& instance) override {
Expand All @@ -713,6 +733,9 @@ struct sync_tensor_impl : public typed_primitive_impl<sync_tensor> {
auto w_rank = instance.get_network().get_program()->get_config().subStreamExecConfig.get_rank()[0];
auto w_size = instance.get_network().get_program()->get_config().get_context_for_tp().size();
auto is_all_reduce = instance.get_impl_params()->need_add == true;
if (!is_all_reduce && all_gather_remote_dst.size() == 0) {
all_gather_remote_dst.assign(w_size, nullptr);
}
auto start = perf_dump_start();
if (!pass_through_events) {
for (auto e : events) {
Expand All @@ -723,8 +746,9 @@ struct sync_tensor_impl : public typed_primitive_impl<sync_tensor> {
std::string("rank[") + std::to_string(w_rank) + std::string("] sync_tensor wait events"),
true);


auto sub_mem_mgr = instance.get_network().get_sub_mem_mgr();
auto id = sub_mem_mgr->get_memory_id(w_rank);
auto id = 0; //sub_mem_mgr->get_memory_id(w_rank);
sub_mem_mgr->set_memory_used(id, w_rank);
auto start_1 = perf_dump_start();
while (true) {
Expand Down Expand Up @@ -755,28 +779,45 @@ struct sync_tensor_impl : public typed_primitive_impl<sync_tensor> {
auto& ocl_stream = downcast<ocl::ocl_stream>(stream);
auto local_context = ocl_stream.get_engine().get_cl_context().get();
auto p2p_src_layout = instance.get_output_layout(0);
bool need_update_remote_mems = false;
if (is_all_reduce) {
OPENVINO_ASSERT(1 == instance.get_output_memorys().size(), "All reduce only has one output!");
sub_mem_mgr->_memorys_table[id][w_rank].recv_bufs[w_rank] = instance.get_output_memorys()[0];
sub_mem_mgr->_memorys_table[id][w_rank].output = instance.get_output_memorys()[0];
// Allocate or reuse buffer for P2P target, same shape with output[0]
p2p_src_layout = instance.get_output_layout(0);
update_internal_buffer(instance,
sub_mem_mgr->_memorys_table[id][w_rank].recv_bufs,
sub_mem_mgr->_memorys_table[id][w_rank].layout,
p2p_src_layout,
w_size,
w_rank);
need_update_remote_mems = update_internal_buffer(instance,
sub_mem_mgr->_memorys_table[id][w_rank].recv_bufs,
sub_mem_mgr->_memorys_table[id][w_rank].layout,
p2p_src_layout,
w_size,
w_rank);
} else {
OPENVINO_ASSERT(2 == instance.get_output_memorys().size(),
"All gather need additional buffer for concat result!");
sub_mem_mgr->_memorys_table[id][w_rank].recv_bufs[w_rank] = instance.get_output_memorys()[1];
sub_mem_mgr->_memorys_table[id][w_rank].output = instance.get_output_memorys()[0];
// All gather doesn't need intermediate buffer at all.
p2p_src_layout = instance.get_output_layout(1);
auto tmp =
std::dynamic_pointer_cast<const ocl::gpu_buffer>(instance.get_output_memorys()[0])->get_buffer().get();
if (tmp != all_gather_current_dst) {
need_update_remote_mems = true;
all_gather_current_dst = tmp;
}
}
sub_mem_mgr->_memorys_table[id][w_rank].flag = true;

// The mapped remote cl_mem will hold the original cl_mem, it should be released if the original cl_mem has been
// released, else it will cause gpu memory leak.
if (need_update_remote_mems) {
if (debug_enable) {
std::cout << "release_remote_mems: old_layout = "
<< sub_mem_mgr->_memorys_table[id][w_rank].layout.to_short_string()
<< ", new_layout = " << p2p_src_layout.to_short_string() << std::endl;
}
}

std::vector<int> wait_list(w_size, 1);
auto start_2 = perf_dump_start();
wait_list[w_rank] = 0; // no need to wait for itself
Expand All @@ -789,17 +830,34 @@ struct sync_tensor_impl : public typed_primitive_impl<sync_tensor> {
int wait_size = 0;
for (int idx = 0; idx < static_cast<int>(w_size); idx++) {
if (idx != w_rank && wait_list[idx] > 0 && sub_mem_mgr->_memorys_table[id][idx].flag) {
cldnn::memory::ptr dst_mem = nullptr;
cl_mem dst_cl_buf = nullptr;
if (is_all_reduce) {
dst_mem = sub_mem_mgr->_memorys_table[id][idx].recv_bufs[w_rank];
cldnn::memory::ptr dst_mem = sub_mem_mgr->_memorys_table[id][idx].recv_bufs[w_rank];
data_size = dst_mem->size();
auto dst_cl_buf_remote =
std::dynamic_pointer_cast<const ocl::gpu_buffer>(dst_mem)->get_buffer().get();
dst_cl_buf = static_cast<cl_mem>(sub_mem_mgr->_memorys_table[id][w_rank].remote_mems[idx]);
if (need_update_remote_mems || dst_cl_buf == nullptr) {
if (dst_cl_buf) {
release_remote_mems(dst_cl_buf);
}
dst_cl_buf = gpu_p2p_instance.map_remote_mem(local_context, dst_cl_buf_remote, data_size);
sub_mem_mgr->_memorys_table[id][w_rank].remote_mems[idx] = dst_cl_buf;
}
} else {
dst_mem = sub_mem_mgr->_memorys_table[id][idx].output;
cldnn::memory::ptr dst_mem = sub_mem_mgr->_memorys_table[id][idx].output;
data_size = dst_mem->size();
auto dst_cl_buf_remote =
std::dynamic_pointer_cast<const ocl::gpu_buffer>(dst_mem)->get_buffer().get();
dst_cl_buf = static_cast<cl_mem>(all_gather_remote_dst[idx]);
if (need_update_remote_mems || dst_cl_buf == nullptr) {
if (dst_cl_buf) {
release_remote_mems(dst_cl_buf);
}
dst_cl_buf = gpu_p2p_instance.map_remote_mem(local_context, dst_cl_buf_remote, data_size);
all_gather_remote_dst[idx] = dst_cl_buf;
}
}
auto dst_cl_buf_remote =
std::dynamic_pointer_cast<const ocl::gpu_buffer>(dst_mem)->get_buffer().get();

data_size = dst_mem->size();
auto dst_cl_buf = gpu_p2p_instance.map_remote_mem(local_context, dst_cl_buf_remote, data_size);
auto p2p_data_size = p2p_src_layout.bytes_count();
{
gpu_lock.acquire();
Expand Down Expand Up @@ -909,6 +967,9 @@ struct sync_tensor_impl : public typed_primitive_impl<sync_tensor> {
static std::unique_ptr<primitive_impl> create(const sync_tensor_node& arg, const kernel_impl_params& impl_param) {
return make_unique<sync_tensor_impl>();
}

std::vector<void*> all_gather_remote_dst;
cl_mem all_gather_current_dst;
};

namespace detail {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ TEST(TransformationTestsF1, FullyConnectedSplitInput11) {

// -------- Loading a model to the device --------
ov::Core core;
ov::CompiledModel compiled_model = core.compile_model(model, "GPU");
ov::CompiledModel compiled_model = core.compile_model(model, "GPU", ov::device::priorities("GPU.0,GPU.1"));

// -------- Create an infer request --------
ov::InferRequest infer_request = compiled_model.create_infer_request();
Expand Down Expand Up @@ -141,7 +141,7 @@ TEST(TransformationTestsF1, FullyConnectedSplitInput16) {

// -------- Loading a model to the device --------
ov::Core core;
ov::CompiledModel compiled_model = core.compile_model(model, "GPU");
ov::CompiledModel compiled_model = core.compile_model(model, "GPU", ov::device::priorities("GPU.0,GPU.1"));

// -------- Create an infer request --------
ov::InferRequest infer_request = compiled_model.create_infer_request();
Expand Down Expand Up @@ -183,12 +183,12 @@ TEST(TransformationTestsF1, FullyConnectedSplitInput1024) {
{
// -------- Construct model
unsigned long m = 1024, k = 2048, n = 13696;
auto input1 = std::make_shared<ov::op::v0::Parameter>(ov::element::f32, ov::Shape{m, n});
auto input1 = std::make_shared<ov::op::v0::Parameter>(ov::element::f32, ov::Shape{m, k});
std::vector<float> input_data(m * k, 1);
std::vector<float> weights(k * n, 2);
std::vector<float> weights(n * k, 2);
std::vector<float> result(m * n, 2);

auto input2 = ov::op::v0::Constant::create(ov::element::f32, ov::Shape{k, n}, weights.data());
auto input2 = ov::op::v0::Constant::create(ov::element::f32, ov::Shape{n, k}, weights.data());

std::cout << "input_shape: " << m << " * " << k << std::endl;
std::cout << "weight_shape: " << k << " * " << n << std::endl;
Expand All @@ -204,7 +204,7 @@ TEST(TransformationTestsF1, FullyConnectedSplitInput1024) {

// -------- Loading a model to the device --------
ov::Core core;
ov::CompiledModel compiled_model = core.compile_model(model, "GPU");
ov::CompiledModel compiled_model = core.compile_model(model, "GPU", ov::device::priorities("GPU.0,GPU.1"));

// -------- Create an infer request --------
ov::InferRequest infer_request = compiled_model.create_infer_request();
Expand Down

0 comments on commit 8402627

Please sign in to comment.