diff --git a/src/plugins/intel_gpu/include/intel_gpu/plugin/sub_memory_manager.hpp b/src/plugins/intel_gpu/include/intel_gpu/plugin/sub_memory_manager.hpp
index b02107ca880f99..cc70fbf85b0063 100644
--- a/src/plugins/intel_gpu/include/intel_gpu/plugin/sub_memory_manager.hpp
+++ b/src/plugins/intel_gpu/include/intel_gpu/plugin/sub_memory_manager.hpp
@@ -23,6 +23,7 @@ class SubMemoryManager {
         bool last_used;
         std::shared_ptr<cldnn::stream> stream_ptr;
         std::vector<cldnn::memory::ptr> recv_bufs;
+        std::vector<void*> remote_mems;
         std::vector<cldnn::event::ptr> events;
         cldnn::memory::ptr output;
         cldnn::layout layout;
@@ -36,6 +37,7 @@ class SubMemoryManager {
         memory_info.last_used = false;
         memory_info.layout = cldnn::layout();
         memory_info.recv_bufs.assign(_num_sub_streams, nullptr);
+        memory_info.remote_mems.assign(_num_sub_streams, nullptr);
         memory_info.events.assign(_num_sub_streams, nullptr);
         std::vector<MemoryInfo> memorys;
         memorys.assign(_num_sub_streams, memory_info);
diff --git a/src/plugins/intel_gpu/src/graph/impls/ocl/sync_tensor.cpp b/src/plugins/intel_gpu/src/graph/impls/ocl/sync_tensor.cpp
index 3b992c6e8f87a4..19273c99ee9a42 100644
--- a/src/plugins/intel_gpu/src/graph/impls/ocl/sync_tensor.cpp
+++ b/src/plugins/intel_gpu/src/graph/impls/ocl/sync_tensor.cpp
@@ -574,6 +574,15 @@ struct sync_tensor_impl : public typed_primitive_impl<sync_tensor> {
 
     sync_tensor_impl() : parent() {}
 
+    ~sync_tensor_impl() {
+        for (auto& mem : all_gather_remote_dst) {
+            if (mem) {
+                release_remote_mems(static_cast<cl_mem>(mem));
+            }
+        }
+        all_gather_remote_dst.clear();
+    }
+
     explicit sync_tensor_impl(const sync_tensor_node& outer) {
         set_node_params(outer);
     }
@@ -615,10 +624,6 @@ struct sync_tensor_impl : public typed_primitive_impl<sync_tensor> {
                     copy_list[idx] = 0;
                     // std::lock_guard<std::mutex> lock(sub_mem_mgr->_flagMutex);
                     sub_mem_mgr->_memorys_table[id][w_rank].events[idx] = nullptr;
-                    // MUST release remote cl_mem, but it will cause remote map failed.
-                    // cl_mem remote_mem =
-                    //     static_cast<cl_mem>(sub_mem_mgr->_memorys_table[id][idx].remote_mem[w_rank]);
-                    // clReleaseMemObject(remote_mem);
                 }
             }
         }
@@ -657,7 +662,7 @@ struct sync_tensor_impl : public typed_primitive_impl<sync_tensor> {
         }
     }
 
-    void update_internal_buffer(sync_tensor_inst& instance,
+    bool update_internal_buffer(sync_tensor_inst& instance,
                                 std::vector<cldnn::memory::ptr>& bufs,
                                 cldnn::layout& last_layout,
                                 cldnn::layout& layout,
@@ -665,6 +670,7 @@ struct sync_tensor_impl : public typed_primitive_impl<sync_tensor> {
                                 size_t w_rank) {
         auto& engine = instance.get_network().get_engine();
         size_t required_size = layout.bytes_count();
+        bool allocated = false;
         if (0) {
             std::lock_guard<std::mutex> lock(debug_mutex);
             std::cout << "Before update_internal_buffer: " << std::endl;
@@ -695,6 +701,7 @@ struct sync_tensor_impl : public typed_primitive_impl<sync_tensor> {
             size_t origin_size = bufs[i] != nullptr ? bufs[i]->size() : 0;
             bufs[i] = nullptr;
             bufs[i] = engine.allocate_memory(layout, cldnn::allocation_type::cl_mem, false);
+            allocated = true;
             if (debug_enable) {
                 std::lock_guard<std::mutex> lock(debug_mutex);
                 std::cout << "tensor_sync allocate: rank[" << w_rank << "]: layout[" << i
@@ -703,6 +710,19 @@ struct sync_tensor_impl : public typed_primitive_impl<sync_tensor> {
                           << std::endl;
             }
         }
+        return allocated;
+    }
+
+    void release_remote_mems(cl_mem remote_mems) {
+        if (remote_mems) {
+            size_t data_size = 0;
+            auto _cl_mem = static_cast<cl_mem>(remote_mems);
+            if (debug_enable) {
+                clGetMemObjectInfo(_cl_mem, CL_MEM_SIZE, sizeof(size_t), &data_size, NULL);
+                std::cout << "clReleaseMemObject...size = " << data_size << std::endl;
+            }
+            clReleaseMemObject(_cl_mem);
+        }
     }
 
     event::ptr execute_impl(const std::vector<event::ptr>& events, sync_tensor_inst& instance) override {
@@ -713,6 +733,9 @@ struct sync_tensor_impl : public typed_primitive_impl<sync_tensor> {
         auto w_rank = instance.get_network().get_program()->get_config().subStreamExecConfig.get_rank()[0];
         auto w_size = instance.get_network().get_program()->get_config().get_context_for_tp().size();
         auto is_all_reduce = instance.get_impl_params()->need_add == true;
+        if (!is_all_reduce && all_gather_remote_dst.size() == 0) {
+            all_gather_remote_dst.assign(w_size, nullptr);
+        }
         auto start = perf_dump_start();
         if (!pass_through_events) {
             for (auto e : events) {
@@ -723,8 +746,9 @@ struct sync_tensor_impl : public typed_primitive_impl<sync_tensor> {
                        std::string("rank[") + std::to_string(w_rank) +
                            std::string("] sync_tensor wait events"),
                        true);
+
         auto sub_mem_mgr = instance.get_network().get_sub_mem_mgr();
-        auto id = sub_mem_mgr->get_memory_id(w_rank);
+        auto id = 0; //sub_mem_mgr->get_memory_id(w_rank);
         sub_mem_mgr->set_memory_used(id, w_rank);
         auto start_1 = perf_dump_start();
         while (true) {
@@ -755,18 +779,19 @@ struct sync_tensor_impl : public typed_primitive_impl<sync_tensor> {
         auto& ocl_stream = downcast<ocl::ocl_stream>(stream);
         auto local_context = ocl_stream.get_engine().get_cl_context().get();
         auto p2p_src_layout = instance.get_output_layout(0);
+        bool need_update_remote_mems = false;
         if (is_all_reduce) {
             OPENVINO_ASSERT(1 == instance.get_output_memorys().size(), "All reduce only has one output!");
             sub_mem_mgr->_memorys_table[id][w_rank].recv_bufs[w_rank] = instance.get_output_memorys()[0];
             sub_mem_mgr->_memorys_table[id][w_rank].output = instance.get_output_memorys()[0];
             // Allocate or reuse buffer for P2P target, same shape with output[0]
             p2p_src_layout = instance.get_output_layout(0);
-            update_internal_buffer(instance,
-                                   sub_mem_mgr->_memorys_table[id][w_rank].recv_bufs,
-                                   sub_mem_mgr->_memorys_table[id][w_rank].layout,
-                                   p2p_src_layout,
-                                   w_size,
-                                   w_rank);
+            need_update_remote_mems = update_internal_buffer(instance,
+                                                             sub_mem_mgr->_memorys_table[id][w_rank].recv_bufs,
+                                                             sub_mem_mgr->_memorys_table[id][w_rank].layout,
+                                                             p2p_src_layout,
+                                                             w_size,
+                                                             w_rank);
         } else {
             OPENVINO_ASSERT(2 == instance.get_output_memorys().size(),
                             "All gather need additional buffer for concat result!");
@@ -774,9 +799,25 @@ struct sync_tensor_impl : public typed_primitive_impl<sync_tensor> {
             sub_mem_mgr->_memorys_table[id][w_rank].output = instance.get_output_memorys()[0];
             // All gather doesn't need intermediate buffer at all.
             p2p_src_layout = instance.get_output_layout(1);
+            auto tmp =
+                std::dynamic_pointer_cast<const ocl::gpu_buffer>(instance.get_output_memorys()[0])->get_buffer().get();
+            if (tmp != all_gather_current_dst) {
+                need_update_remote_mems = true;
+                all_gather_current_dst = tmp;
+            }
         }
         sub_mem_mgr->_memorys_table[id][w_rank].flag = true;
 
+        // The mapped remote cl_mem will hold the original cl_mem, it should be released if the original cl_mem has been
+        // released, else it will cause gpu memory leak.
+        if (need_update_remote_mems) {
+            if (debug_enable) {
+                std::cout << "release_remote_mems: old_layout = "
+                          << sub_mem_mgr->_memorys_table[id][w_rank].layout.to_short_string()
+                          << ", new_layout = " << p2p_src_layout.to_short_string() << std::endl;
+            }
+        }
+
         std::vector<int> wait_list(w_size, 1);
         auto start_2 = perf_dump_start();
         wait_list[w_rank] = 0;  // no need to wait for itself
@@ -789,17 +830,34 @@ struct sync_tensor_impl : public typed_primitive_impl<sync_tensor> {
             int wait_size = 0;
             for (int idx = 0; idx < static_cast<int>(w_size); idx++) {
                 if (idx != w_rank && wait_list[idx] > 0 && sub_mem_mgr->_memorys_table[id][idx].flag) {
-                    cldnn::memory::ptr dst_mem = nullptr;
+                    cl_mem dst_cl_buf = nullptr;
                     if (is_all_reduce) {
-                        dst_mem = sub_mem_mgr->_memorys_table[id][idx].recv_bufs[w_rank];
+                        cldnn::memory::ptr dst_mem = sub_mem_mgr->_memorys_table[id][idx].recv_bufs[w_rank];
+                        data_size = dst_mem->size();
+                        auto dst_cl_buf_remote =
+                            std::dynamic_pointer_cast<const ocl::gpu_buffer>(dst_mem)->get_buffer().get();
+                        dst_cl_buf = static_cast<cl_mem>(sub_mem_mgr->_memorys_table[id][w_rank].remote_mems[idx]);
+                        if (need_update_remote_mems || dst_cl_buf == nullptr) {
+                            if (dst_cl_buf) {
+                                release_remote_mems(dst_cl_buf);
+                            }
+                            dst_cl_buf = gpu_p2p_instance.map_remote_mem(local_context, dst_cl_buf_remote, data_size);
+                            sub_mem_mgr->_memorys_table[id][w_rank].remote_mems[idx] = dst_cl_buf;
+                        }
                     } else {
-                        dst_mem = sub_mem_mgr->_memorys_table[id][idx].output;
+                        cldnn::memory::ptr dst_mem = sub_mem_mgr->_memorys_table[id][idx].output;
+                        data_size = dst_mem->size();
+                        auto dst_cl_buf_remote =
+                            std::dynamic_pointer_cast<const ocl::gpu_buffer>(dst_mem)->get_buffer().get();
+                        dst_cl_buf = static_cast<cl_mem>(all_gather_remote_dst[idx]);
+                        if (need_update_remote_mems || dst_cl_buf == nullptr) {
+                            if (dst_cl_buf) {
+                                release_remote_mems(dst_cl_buf);
+                            }
+                            dst_cl_buf = gpu_p2p_instance.map_remote_mem(local_context, dst_cl_buf_remote, data_size);
+                            all_gather_remote_dst[idx] = dst_cl_buf;
+                        }
                     }
-                    auto dst_cl_buf_remote =
-                        std::dynamic_pointer_cast<const ocl::gpu_buffer>(dst_mem)->get_buffer().get();
-
-                    data_size = dst_mem->size();
-                    auto dst_cl_buf = gpu_p2p_instance.map_remote_mem(local_context, dst_cl_buf_remote, data_size);
                     auto p2p_data_size = p2p_src_layout.bytes_count();
                     {
                         gpu_lock.acquire();
@@ -909,6 +967,9 @@ struct sync_tensor_impl : public typed_primitive_impl<sync_tensor> {
     static std::unique_ptr<primitive_impl> create(const sync_tensor_node& arg, const kernel_impl_params& impl_param) {
         return make_unique<sync_tensor_impl>();
     }
+
+    std::vector<void*> all_gather_remote_dst;
+    cl_mem all_gather_current_dst;
 };
 
 namespace detail {
diff --git a/src/plugins/intel_gpu/tests/unit/transformations/fc_all_reduce_test.cpp b/src/plugins/intel_gpu/tests/unit/transformations/fc_all_reduce_test.cpp
index e1998646aaf2e7..b1f698b69b68dd 100644
--- a/src/plugins/intel_gpu/tests/unit/transformations/fc_all_reduce_test.cpp
+++ b/src/plugins/intel_gpu/tests/unit/transformations/fc_all_reduce_test.cpp
@@ -55,7 +55,7 @@ TEST(TransformationTestsF1, FullyConnectedSplitInput11) {
 
     // -------- Loading a model to the device --------
    ov::Core core;
-    ov::CompiledModel compiled_model = core.compile_model(model, "GPU");
+    ov::CompiledModel compiled_model = core.compile_model(model, "GPU", ov::device::priorities("GPU.0,GPU.1"));
 
     // -------- Create an infer request --------
     ov::InferRequest infer_request = compiled_model.create_infer_request();
@@ -141,7 +141,7 @@ TEST(TransformationTestsF1, FullyConnectedSplitInput16) {
 
     // -------- Loading a model to the device --------
     ov::Core core;
-    ov::CompiledModel compiled_model = core.compile_model(model, "GPU");
+    ov::CompiledModel compiled_model = core.compile_model(model, "GPU", ov::device::priorities("GPU.0,GPU.1"));
 
     // -------- Create an infer request --------
     ov::InferRequest infer_request = compiled_model.create_infer_request();
@@ -183,12 +183,12 @@ TEST(TransformationTestsF1, FullyConnectedSplitInput1024) {
     {
         // -------- Construct model
         unsigned long m = 1024, k = 2048, n = 13696;
-        auto input1 = std::make_shared<ov::op::v0::Parameter>(ov::element::f32, ov::Shape{m, n});
+        auto input1 = std::make_shared<ov::op::v0::Parameter>(ov::element::f32, ov::Shape{m, k});
         std::vector<float> input_data(m * k, 1);
-        std::vector<float> weights(k * n, 2);
+        std::vector<float> weights(n * k, 2);
         std::vector<float> result(m * n, 2);
 
-        auto input2 = ov::op::v0::Constant::create(ov::element::f32, ov::Shape{k, n}, weights.data());
+        auto input2 = ov::op::v0::Constant::create(ov::element::f32, ov::Shape{n, k}, weights.data());
 
         std::cout << "input_shape: " << m << " * " << k << std::endl;
         std::cout << "weight_shape: " << k << " * " << n << std::endl;
@@ -204,7 +204,7 @@ TEST(TransformationTestsF1, FullyConnectedSplitInput1024) {
 
     // -------- Loading a model to the device --------
     ov::Core core;
-    ov::CompiledModel compiled_model = core.compile_model(model, "GPU");
+    ov::CompiledModel compiled_model = core.compile_model(model, "GPU", ov::device::priorities("GPU.0,GPU.1"));
 
     // -------- Create an infer request --------
     ov::InferRequest infer_request = compiled_model.create_infer_request();