@@ -71,6 +71,17 @@ Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info)
71
71
RETURN_IF_ERROR (state->exec_env ()->result_mgr ()->create_sender (
72
72
state->fragment_instance_id (), p._result_sink_buffer_size_rows , &_sender, true , state));
73
73
((PipBufferControlBlock*)_sender.get ())->set_dependency (_dependency->shared_from_this ());
74
+
75
+ _output_vexpr_ctxs.resize (p._output_vexpr_ctxs .size ());
76
+ for (size_t i = 0 ; i < _output_vexpr_ctxs.size (); i++) {
77
+ RETURN_IF_ERROR (p._output_vexpr_ctxs [i]->clone (state, _output_vexpr_ctxs[i]));
78
+ }
79
+ if (p._sink_type == TResultSinkType::ARROW_FLIGHT_PROTOCAL) {
80
+ std::shared_ptr<arrow::Schema> arrow_schema;
81
+ RETURN_IF_ERROR (get_arrow_schema_from_expr_ctxs (_output_vexpr_ctxs, &arrow_schema,
82
+ state->timezone ()));
83
+ _sender->register_arrow_schema (arrow_schema);
84
+ }
74
85
return Status::OK ();
75
86
}
76
87
@@ -79,10 +90,6 @@ Status ResultSinkLocalState::open(RuntimeState* state) {
79
90
SCOPED_TIMER (_open_timer);
80
91
RETURN_IF_ERROR (Base::open (state));
81
92
auto & p = _parent->cast <ResultSinkOperatorX>();
82
- _output_vexpr_ctxs.resize (p._output_vexpr_ctxs .size ());
83
- for (size_t i = 0 ; i < _output_vexpr_ctxs.size (); i++) {
84
- RETURN_IF_ERROR (p._output_vexpr_ctxs [i]->clone (state, _output_vexpr_ctxs[i]));
85
- }
86
93
// create writer based on sink type
87
94
switch (p._sink_type ) {
88
95
case TResultSinkType::MYSQL_PROTOCAL: {
@@ -96,10 +103,6 @@ Status ResultSinkLocalState::open(RuntimeState* state) {
96
103
break ;
97
104
}
98
105
case TResultSinkType::ARROW_FLIGHT_PROTOCAL: {
99
- std::shared_ptr<arrow::Schema> arrow_schema;
100
- RETURN_IF_ERROR (get_arrow_schema_from_expr_ctxs (_output_vexpr_ctxs, &arrow_schema,
101
- state->timezone ()));
102
- _sender->register_arrow_schema (arrow_schema);
103
106
_writer.reset (new (std::nothrow) vectorized::VArrowFlightResultWriter (
104
107
_sender.get (), _output_vexpr_ctxs, _profile));
105
108
break ;
0 commit comments