-
Notifications
You must be signed in to change notification settings - Fork 745
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Export env to python #7792
Export env to python #7792
Conversation
oneflow/core/vm/virtual_machine.cpp
Outdated
@@ -199,6 +202,15 @@ Maybe<void> VirtualMachine::Receive(vm::InstructionMsgList* instr_list) { | |||
// `ComputeInFuseMode` will be replaced by `Compute` soon. | |||
instr_msg->mut_instr_type_id()->instruction_type().ComputeInFuseMode(instr_msg); | |||
} | |||
} else if (IsShuttingDown()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
在shutting down阶段直接在main线程里处理指令。
@@ -210,7 +210,7 @@ def is_deprecated(func_or_class): | |||
|
|||
if not env_util.HasAllMultiClientEnvVars(): | |||
env_util.SetDefaultMultiClientEnvVars() | |||
env_util.api_env_init() | |||
_oneflow_global_unique_env_ = env_util.create_env() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
最核心改动。
@@ -13,17 +13,6 @@ | |||
See the License for the specific language governing permissions and | |||
limitations under the License. | |||
""" | |||
from oneflow.framework.env_util import api_all_device_placement as all_device_placement |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这些接口都是过时的
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
all_device_placement 这个接口不是过时的,不能删除
class TestCallWhenShuttingDown: | ||
def __init__(self): | ||
tensor = oneflow.ones((2, 2)) | ||
print(tensor) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
如果把这一行注释,行为就会和pytorch不一致,pytorch会执行成功,oneflow会报错。
但是这一问题和本次pr肯定没关系,我们新开issue讨论这一问题。
寻找在shutting down环节执行torch代码的规律首先考察pytorch。 # script0
import torch
device_type = "cpu"
class Foo:
def __init__(self):
pass
def __del__(self):
tensor = torch.ones((8, 8), device=torch.device(device_type))
print(tensor)
foo = Foo() 上述示例代码能正常工作,输入如下:
如果把device_type改成gpu,也就是示例代码如: # script1
import torch
device_type = "cuda"
class Foo:
def __init__(self):
pass
def __del__(self):
tensor = torch.ones((8, 8), device=torch.device(device_type))
print(tensor)
foo = Foo() 这就不能工作,输出如下:
但如果我们在外层作用域先执行一次torch.ones,示例代码如: # script2
import torch
device_type = "cuda"
torch.ones((32, 32), device=torch.device(device_type))
class Foo:
def __init__(self):
pass
def __del__(self):
tensor = torch.ones((8, 8), device=torch.device(device_type))
print(tensor)
foo = Foo() 这又是能正常工作的,同样会正常输出:
猜测原因猜测背后的规则应该非常简单:在shutting down的环节不能再实质import新的module,我们可以这样解释上述现象:
最终的原因只能查看python的文档或者代码。 |
关于py::gil_scoped_acquired在非主线程调用的问题我单独测试表明,py::gil_scoped_acquired可以在python解释器退出时,可以安全地在非主线程里调用。 // example.cpp
#include <pybind11/pybind11.h>
#include <pybind11/functional.h>
#include <thread>
#include <iostream>
#include <condition_variable>
#include <mutex>
#include <chrono>
void TestGILInNonMainThread() {
std::mutex mutex;
std::condition_variable cond;
std::thread thread([&]{
std::unique_lock<std::mutex> lock(mutex);
cond.wait(lock, []{ return true; });
std::cerr << "before_gil_scoped_acquire" << " ... ";
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
pybind11::gil_scoped_acquire lock_gil{};
std::cerr << "after_gil_scoped_acquire" << std::endl;
});
cond.notify_one();
pybind11::gil_scoped_release unlock_gil{};
thread.join();
}
PYBIND11_MODULE(example, m) {
m.def("TestGILInNonMainThread", &TestGILInNonMainThread);
} g++ -O3 -Wall -shared -std=c++11 -fPIC $(python3 -m pybind11 --includes) example.cpp -o example$(python3-config --extension-suffix) # a.py
import example
class Foo:
def __init__(self):
pass
def __del__(self):
example.TestGILInNonMainThread()
foo = Foo() 最后输出表明py::gil_scoped_acquired正常工作。 $ python3 a.py
before_gil_scoped_acquire ... after_gil_scoped_acquire |
已经移除了virtual_machine.cpp对shutting down的依赖,回滚到master的逻辑。 |
env_util.api_env_init() | ||
_unittest_env_initilized = True | ||
|
||
TestCase = unittest.TestCase |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
目前我们的TestCase相较于基类unittest.TestCase不需要多余的操作,所以直接导出。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@strint @caishenghang
彻底调查清楚py::gil_scoped_acquire 在python finalization阶段中被non-main调用的问题pybind11 的这个链接 pybind/pybind11#3274 已经完全说明清楚了问题,而且与我们的观察完全一致。 剩下的问题是为什么本pr上述 #7792 (comment) 又莫名其妙的正常工作,原因是上述代码使用的python版本是3.6,出问题的python版本是3.8。如果上述代码用如下编译方式编译出python包: g++ -O3 -Wall -shared -std=c++11 -fPIC $(python3.8 -m pybind11 --includes) example.cpp -o example$(python3.8-config --extension-suffix) 再执行python3 a.py就会复现这一python自身的BUG。
|
这个bug来源于python本身, https://bugs.python.org/issue42969 , 关联的PR:python/cpython#28525 还没合并,看起来他们还没就处理办法达成一致。也就是即使升级到python 3.11 也不能解决问题。 而我们要兼容Python 3.6, 3.7, 3.8, 3.9, 3.10,所以使用atexit来避免Python的这个bug是长期的。 |
c072305
to
86296cb
Compare
1873471
to
454f5e7
Compare
env_api.h is deleted by master
if (is_normal_exit) { | ||
JUST(vm::ClusterSync()); | ||
auto* vm = JUST(GlobalMaybe<VirtualMachine>()); | ||
JUST(vm->CloseVMThreads()); | ||
} | ||
JUST(env->init_is_normal_exit(is_normal_exit)); | ||
SetShuttingDown(true); | ||
return Maybe<void>::Ok(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
旧版的逻辑写在python层。如果遇到系统异常退出,则完全不执行DeleteEnv。为了对齐此逻辑,我们让EnvGlobalObjectsScope的析构在!is_normal_exit的时候不执行那一系列的Global::Delete();
@@ -229,6 +229,7 @@ Maybe<void> EnvGlobalObjectsScope::Init(const EnvProto& env_proto) { | |||
} | |||
|
|||
EnvGlobalObjectsScope::~EnvGlobalObjectsScope() { | |||
if (is_normal_exit_.has_value() && !CHECK_JUST(is_normal_exit_)) { return; } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
也许应该命名为is_abnormal_exit
std::mutex pending_instruction_mutex_; | ||
PendingInstructionMutexedList pending_instruction_list_; | ||
Notifier notifier_; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
完全去掉channel,使用list + notifier代替。原因是我们在finalization阶段会结束worker线程,让指令在main线程运行,channel和线程绑定得过深了,必须关闭channel才能让worker线程退出,而一旦关闭了channel,其他线程就没法再通过channel发送指令。而list + notifier相当于拆解了channel的功能,关闭notifier才能让线程退出,之后list可以继续使用。
while (thread_ctx->mut_notifier()->WaitAndClearNotifiedCnt() == kNotifierStatusSuccess) { | ||
while (thread_ctx->TryReceiveAndRun()) {} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
此处的逻辑非常类似scheduler线程和callback线程的处理。
@@ -115,7 +118,7 @@ VirtualMachine::VirtualMachine(const Resource& resource, int64_t this_machine_id | |||
// In order to notify threads in VirtualMachineEngine, a notify callback lambda should be take as | |||
// an argument for VirtualMachineEngine's constructor. | |||
vm_ = intrusive::make_shared<vm::VirtualMachineEngine>( | |||
vm::MakeVmDesc(resource, this_machine_id).Get(), [this]() { callback_notifier_.Notify(); }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
callback_notifier_被ScheduleCtx代替了。
Maybe<void> VirtualMachine::CloseVMThreads() { | ||
CHECK_OR_RETURN(!vm_threads_closed_); | ||
ControlSync(); | ||
pending_notifier_.Close(); | ||
schedule_thread_.join(); | ||
CHECK(!vm_); | ||
vm_threads_closed_ = true; | ||
return Maybe<void>::Ok(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
关闭VMThread线程,从此之后vm将以单线程的方式执行。
这部分的功能从VirtualMachine的析构里单独抽取出来,供python的atexit调用。
@@ -199,6 +212,8 @@ Maybe<void> VirtualMachine::Receive(vm::InstructionMsgList* instr_list) { | |||
// `ComputeInFuseMode` will be replaced by `Compute` soon. | |||
instr_msg->mut_instr_type_id()->instruction_type().ComputeInFuseMode(instr_msg); | |||
} | |||
} else if (unlikely(vm_threads_closed_)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
vm_threads_closed_在CloseVMThreads里被置为true
void OnGarbageMsgPending() const override { vm_->Callback(); } | ||
void OnWorkerLoadPending(vm::ThreadCtx* thread_ctx) const override { | ||
while (thread_ctx->TryReceiveAndRun() > 0) {} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
一旦接到任务,都是原地执行。
if hook.is_normal_exit(): | ||
oneflow._oneflow_internal.DestroyEnv() | ||
oneflow._oneflow_internal.SetShuttingDown() | ||
_oneflow_global_unique_env_.SwitchToShuttingDownPhase(hook.is_normal_exit()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
上边删掉的逻辑都放置在SwitchToShuttingDownPhase函数里。
… into export_env_to_python
Static analysis with clang failed. PR label automerge has been removed |
Speed stats:
|
Speed stats:
|
CI failed when running job: cuda-speed-test. PR label automerge has been removed |
Speed stats:
|
View latest API docs preview at: https://staging.oneflow.info/docs/Oneflow-Inc/oneflow/pr/7792/ |
oneflow的env生命周期应该是一个被oneflow python模块持有的对象,这样它的生命周期会持续到python解释器结束。