-
Notifications
You must be signed in to change notification settings - Fork 5.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fleet_executor] Interceptor run op #37623
[fleet_executor] Interceptor run op #37623
Conversation
Thanks for your contribution! |
while (IsInputReady() && CanWriteOutput()) { | ||
// If there is no limit, source interceptor can be executed | ||
// an unlimited number of times. | ||
// Now source node can only run |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这个annotation是不是没写完😂
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
额,写了的,不知道咋漏了
// buffer is using | ||
if (out_buff.second.second != 0) return; | ||
} | ||
step_ = 0; // reset |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
咱们有RESET
message呀,这个置0应该是消息驱动的吧
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
除了source和若干特殊interceptor,大部分interceptor都是不需要reset的。如果需要reset,也是需要像stop那样,否则还没跑完就给reset了。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这个reset不应该放在carrier.start()里么,每一次外部调用fleet_executor.run()的时候,这个step就应该reset呀。这个step不是标明当前ministep里,跑了多少个micro step么。而且在运行过程中,reset source interceptor有什么特殊含义?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
那这个reset也得有一个数据流的流动,和当前的并不冲突。这一步的判断总归是要做的,只不过再加个reset的条件触发。否则刚执行完一个micro_step,来了个reset消息,还没执行完就给reset了。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
是的,应该加一个条件,不过,为啥会刚执行完一个mirco step就会收到reset 🤨
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
那在开头加上一个step的reset吗,没有下一步了咋办O_o
// source compute node, should we add a new SourceInterceptor? | ||
if (upstream.empty()) { | ||
is_source_ = true; | ||
PADDLE_ENFORCE_GT(node_->max_run_times(), 0, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
为啥这儿有一个判断,这个东西不是上层指定的,他说跑0次就跑0次呗
而且,如果这个要判断,是不是只要有下游的interceptor都要判断一下,因为都需要至少跑一次才可以触发下游
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
source interceptor,in degree为0,没有上游依赖,需要指定跑多少次,跑0次肯定是错的,这个max_run_times其实也是不准确的,应该就是run_times。 其它的compute interceptor,并不需要知道跑多少次,上游给消息就跑,除非有特殊功能,是不需要max_run_times的。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reasonable
attrs["shape"] = framework::vectorize<int>({2, 3}); | ||
attrs["value"] = 1.0f; | ||
|
||
auto zero_op = framework::OpRegistry::CreateOp("fill_constant", {}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
为啥叫zero呀🤔,值不是1么
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
从别处copy的,木有改 ╮(╯_╰)╭
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
😂,all right
framework::AttributeMap()); | ||
|
||
// NOTE: don't delete | ||
return {zero_op.release(), op.release()}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dangling ptr++
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
没有和线程或者carrier绑定起来,析构了就访问非法地址了-。-
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
PR types
Others
PR changes
Others
Describe
Interceptor run op