diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 36f2066c9abed7..e06b8028c9c730 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -227,14 +227,20 @@ bool PipelineTask::_wait_to_start() { _blocked_dep = _execution_dep->is_blocked_by(this); if (_blocked_dep != nullptr) { static_cast(_blocked_dep)->start_watcher(); - return !_wake_up_by_downstream; + if (_wake_up_by_downstream) { + _eos = true; + } + return true; } for (auto* op_dep : _filter_dependencies) { _blocked_dep = op_dep->is_blocked_by(this); if (_blocked_dep != nullptr) { _blocked_dep->start_watcher(); - return !_wake_up_by_downstream; + if (_wake_up_by_downstream) { + _eos = true; + } + return true; } } return false; @@ -249,7 +255,10 @@ bool PipelineTask::_is_blocked() { _blocked_dep = dep->is_blocked_by(this); if (_blocked_dep != nullptr) { _blocked_dep->start_watcher(); - return !_wake_up_by_downstream; + if (_wake_up_by_downstream) { + _eos = true; + } + return true; } } // If all dependencies are ready for this operator, we can execute this task if no datum is needed from upstream operators. @@ -268,7 +277,10 @@ bool PipelineTask::_is_blocked() { _blocked_dep = op_dep->is_blocked_by(this); if (_blocked_dep != nullptr) { _blocked_dep->start_watcher(); - return !_wake_up_by_downstream; + if (_wake_up_by_downstream) { + _eos = true; + } + return true; } } return false; diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index a60512bab48b39..36362e15813238 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -90,7 +90,7 @@ class PipelineTask { _blocked_dep = fin_dep->is_blocked_by(this); if (_blocked_dep != nullptr) { _blocked_dep->start_watcher(); - return !_wake_up_by_downstream; + return true; } } return false;