@@ -104,33 +104,14 @@ Status LocalExchangeSinkLocalState::open(RuntimeState* state) {
104
104
return Status::OK ();
105
105
}
106
106
107
- Status LocalExchangeSinkLocalState::close (RuntimeState* state, Status exec_status) {
108
- if (_closed) {
109
- return Status::OK ();
110
- }
111
- RETURN_IF_ERROR (Base::close (state, exec_status));
112
- if (exec_status.ok ()) {
113
- DCHECK (_release_count || _exchanger == nullptr ||
114
- _exchanger->_running_source_operators == 0 )
115
- << " Do not finish correctly! " << debug_string (0 )
116
- << " state: { cancel = " << state->is_cancelled () << " , "
117
- << state->cancel_reason ().to_string ()
118
- << " } query ctx: { cancel = " << state->get_query_ctx ()->is_cancelled () << " , "
119
- << state->get_query_ctx ()->exec_status ().to_string ()
120
- << " } Exchanger: " << (void *)_exchanger;
121
- }
122
- return Status::OK ();
123
- }
124
-
125
107
std::string LocalExchangeSinkLocalState::debug_string (int indentation_level) const {
126
108
fmt::memory_buffer debug_string_buffer;
127
109
fmt::format_to (debug_string_buffer,
128
110
" {}, _channel_id: {}, _num_partitions: {}, _num_senders: {}, _num_sources: {}, "
129
- " _running_sink_operators: {}, _running_source_operators: {}, _release_count: {} " ,
111
+ " _running_sink_operators: {}, _running_source_operators: {}" ,
130
112
Base::debug_string (indentation_level), _channel_id, _exchanger->_num_partitions ,
131
113
_exchanger->_num_senders , _exchanger->_num_sources ,
132
- _exchanger->_running_sink_operators , _exchanger->_running_source_operators ,
133
- _release_count);
114
+ _exchanger->_running_sink_operators , _exchanger->_running_source_operators );
134
115
return fmt::to_string (debug_string_buffer);
135
116
}
136
117
@@ -143,13 +124,11 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
143
124
144
125
// If all exchange sources ended due to limit reached, current task should also finish
145
126
if (local_state._exchanger ->_running_source_operators == 0 ) {
146
- local_state._release_count = true ;
147
127
local_state._shared_state ->sub_running_sink_operators ();
148
128
return Status::EndOfFile (" receiver eof" );
149
129
}
150
130
if (eos) {
151
131
local_state._shared_state ->sub_running_sink_operators ();
152
- local_state._release_count = true ;
153
132
}
154
133
155
134
return Status::OK ();
0 commit comments