@@ -52,28 +52,16 @@ Status LocalExchangeSinkLocalState::open(RuntimeState* state) {
52
52
return Status::OK ();
53
53
}
54
54
55
- Status LocalExchangeSinkLocalState::close (RuntimeState* state, Status exec_status) {
56
- if (_closed) {
57
- return Status::OK ();
58
- }
59
- RETURN_IF_ERROR (Base::close (state, exec_status));
60
- if (exec_status.ok ()) {
61
- DCHECK (_release_count) << " Do not finish correctly! " << debug_string (0 );
62
- }
63
- return Status::OK ();
64
- }
65
-
66
55
std::string LocalExchangeSinkLocalState::debug_string (int indentation_level) const {
67
56
fmt::memory_buffer debug_string_buffer;
68
57
fmt::format_to (debug_string_buffer,
69
58
" {}, _use_global_shuffle: {}, _channel_id: {}, _num_partitions: {}, "
70
59
" _num_senders: {}, _num_sources: {}, "
71
- " _running_sink_operators: {}, _running_source_operators: {}, _release_count: {} " ,
60
+ " _running_sink_operators: {}, _running_source_operators: {}" ,
72
61
Base::debug_string (indentation_level),
73
62
_parent->cast <LocalExchangeSinkOperatorX>()._use_global_shuffle , _channel_id,
74
63
_exchanger->_num_partitions , _exchanger->_num_senders , _exchanger->_num_sources ,
75
- _exchanger->_running_sink_operators , _exchanger->_running_source_operators ,
76
- _release_count);
64
+ _exchanger->_running_sink_operators , _exchanger->_running_source_operators );
77
65
return fmt::to_string (debug_string_buffer);
78
66
}
79
67
@@ -121,13 +109,11 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
121
109
122
110
// If all exchange sources ended due to limit reached, current task should also finish
123
111
if (local_state._exchanger ->_running_source_operators == 0 ) {
124
- local_state._release_count = true ;
125
112
local_state._shared_state ->sub_running_sink_operators ();
126
113
return Status::EndOfFile (" receiver eof" );
127
114
}
128
115
if (eos) {
129
116
local_state._shared_state ->sub_running_sink_operators ();
130
- local_state._release_count = true ;
131
117
}
132
118
133
119
return Status::OK ();
0 commit comments