1
+ // Licensed to the Apache Software Foundation (ASF) under one
2
+ // or more contributor license agreements. See the NOTICE file
3
+ // distributed with this work for additional information
4
+ // regarding copyright ownership. The ASF licenses this file
5
+ // to you under the Apache License, Version 2.0 (the
6
+ // "License"); you may not use this file except in compliance
7
+ // with the License. You may obtain a copy of the License at
8
+ //
9
+ // http://www.apache.org/licenses/LICENSE-2.0
10
+ //
11
+ // Unless required by applicable law or agreed to in writing,
12
+ // software distributed under the License is distributed on an
13
+ // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
+ // KIND, either express or implied. See the License for the
15
+ // specific language governing permissions and limitations
16
+ // under the License.
17
+
18
+ #include " exec/repeat_node.h"
19
+
20
+ #include " exprs/expr.h"
21
+ #include " runtime/raw_value.h"
22
+ #include " runtime/row_batch.h"
23
+ #include " runtime/runtime_state.h"
24
+ #include " util/runtime_profile.h"
25
+
26
+ namespace doris {
27
+
28
+ RepeatNode::RepeatNode (ObjectPool* pool, const TPlanNode& tnode,
29
+ const DescriptorTbl& descs)
30
+ : ExecNode(pool, tnode, descs),
31
+ _slot_id_set_list (tnode.repeat_node.slot_id_set_list),
32
+ _repeat_id_list(tnode.repeat_node.repeat_id_list),
33
+ _output_tuple_id(tnode.repeat_node.output_tuple_id),
34
+ _tuple_desc(nullptr ),
35
+ _child_row_batch(nullptr ),
36
+ _child_eos(false ),
37
+ _repeat_id_idx(0 ),
38
+ _runtime_state(nullptr ) {
39
+ }
40
+
41
+ RepeatNode::~RepeatNode () {
42
+ }
43
+
44
+ Status RepeatNode::prepare (RuntimeState* state) {
45
+ SCOPED_TIMER (_runtime_profile->total_time_counter ());
46
+ RETURN_IF_ERROR (ExecNode::prepare (state));
47
+
48
+ _runtime_state = state;
49
+ _tuple_desc = state->desc_tbl ().get_tuple_descriptor (_output_tuple_id);
50
+ if (_tuple_desc == NULL ) {
51
+ return Status::InternalError (" Failed to get tuple descriptor." );
52
+ }
53
+
54
+ return Status::OK ();
55
+ }
56
+
57
+ Status RepeatNode::open (RuntimeState* state) {
58
+ SCOPED_TIMER (_runtime_profile->total_time_counter ());
59
+ RETURN_IF_ERROR (ExecNode::open (state));
60
+ RETURN_IF_CANCELLED (state);
61
+ RETURN_IF_ERROR (child (0 )->open (state));
62
+ return Status::OK ();
63
+ }
64
+
65
+ Status RepeatNode::get_repeated_batch (
66
+ RowBatch* child_row_batch, int repeat_id_idx, RowBatch* row_batch) {
67
+ DCHECK (repeat_id_idx >= 0 );
68
+ for (const std::vector<int64_t >& v : _repeat_id_list) {
69
+ DCHECK (repeat_id_idx <= (int )v.size ());
70
+ }
71
+ DCHECK (child_row_batch != nullptr );
72
+ DCHECK_EQ (row_batch->num_rows (), 0 );
73
+
74
+ // Fill all slots according to child
75
+ MemPool* tuple_pool = row_batch->tuple_data_pool ();
76
+ const vector<TupleDescriptor*>& src_tuple_descs = child_row_batch->row_desc ().tuple_descriptors ();
77
+ const vector<TupleDescriptor*>& dst_tuple_descs = row_batch->row_desc ().tuple_descriptors ();
78
+ vector<Tuple*> dst_tuples (src_tuple_descs.size (), nullptr );
79
+ for (int i = 0 ; i < child_row_batch->num_rows (); ++i) {
80
+ int row_idx = row_batch->add_row ();
81
+ TupleRow* dst_row = row_batch->get_row (row_idx);
82
+ TupleRow* src_row = child_row_batch->get_row (i);
83
+
84
+ auto src_it = src_tuple_descs.begin ();
85
+ auto dst_it = dst_tuple_descs.begin ();
86
+ for (int j = 0 ; src_it != src_tuple_descs.end () && dst_it != dst_tuple_descs.end ();
87
+ ++src_it, ++dst_it, ++j) {
88
+ Tuple* src_tuple = src_row->get_tuple (j);
89
+ if (src_tuple == NULL ) {
90
+ continue ;
91
+ }
92
+
93
+ if (dst_tuples[j] == nullptr ) {
94
+ int size = row_batch->capacity () * (*dst_it)->byte_size ();
95
+ void * tuple_buffer = tuple_pool->allocate (size);
96
+ if (tuple_buffer == nullptr ) {
97
+ return Status::InternalError (" Allocate memory for row batch failed." );
98
+ }
99
+ dst_tuples[j] = reinterpret_cast <Tuple*>(tuple_buffer);
100
+ } else {
101
+ char * new_tuple = reinterpret_cast <char *>(dst_tuples[j]);
102
+ new_tuple += (*dst_it)->byte_size ();
103
+ dst_tuples[j] = reinterpret_cast <Tuple*>(new_tuple);
104
+ }
105
+ dst_row->set_tuple (j, dst_tuples[j]);
106
+ memset (dst_tuples[j], 0 , (*dst_it)->num_null_bytes ());
107
+ src_tuple->deep_copy (dst_tuples[j], **dst_it, tuple_pool);
108
+ for (int k = 0 ; k < (*src_it)->slots ().size (); k++) {
109
+ SlotDescriptor* src_slot_desc = (*src_it)->slots ()[k];
110
+ SlotDescriptor* dst_slot_desc = (*dst_it)->slots ()[k];
111
+ DCHECK_EQ (src_slot_desc->type ().type , dst_slot_desc->type ().type );
112
+ DCHECK_EQ (src_slot_desc->col_name (), dst_slot_desc->col_name ());
113
+ if (_slot_id_set_list[0 ].find (src_slot_desc->id ()) != _slot_id_set_list[0 ].end ()) {
114
+ std::set<SlotId>& repeat_ids = _slot_id_set_list[repeat_id_idx];
115
+ if (repeat_ids.find (src_slot_desc->id ()) == repeat_ids.end ()) {
116
+ dst_tuples[j]->set_null (dst_slot_desc->null_indicator_offset ());
117
+ continue ;
118
+ }
119
+ }
120
+ }
121
+ }
122
+ row_batch->commit_last_row ();
123
+ }
124
+ Tuple *tuple = nullptr ;
125
+ // Fill grouping ID to tuple
126
+ for (int i = 0 ; i < child_row_batch->num_rows (); ++i) {
127
+ int row_idx = i;
128
+ TupleRow *row = row_batch->get_row (row_idx);
129
+
130
+ if (tuple == nullptr ) {
131
+ int size = row_batch->capacity () * _tuple_desc->byte_size ();
132
+ void *tuple_buffer = tuple_pool->allocate (size);
133
+ if (tuple_buffer == nullptr ) {
134
+ return Status::InternalError (" Allocate memory for row batch failed." );
135
+ }
136
+ tuple = reinterpret_cast <Tuple *>(tuple_buffer);
137
+ } else {
138
+ char *new_tuple = reinterpret_cast <char *>(tuple);
139
+ new_tuple += _tuple_desc->byte_size ();
140
+ tuple = reinterpret_cast <Tuple *>(new_tuple);
141
+ }
142
+
143
+ row->set_tuple (src_tuple_descs.size (), tuple);
144
+ memset (tuple, 0 , _tuple_desc->num_null_bytes ());
145
+
146
+ for (size_t slot_idx = 0 ; slot_idx < _repeat_id_list.size (); ++slot_idx) {
147
+ int64_t val = _repeat_id_list[slot_idx][repeat_id_idx];
148
+ const SlotDescriptor *slot_desc = _tuple_desc->slots ()[slot_idx];
149
+ tuple->set_not_null (slot_desc->null_indicator_offset ());
150
+ RawValue::write (&val, tuple, slot_desc, tuple_pool);
151
+ }
152
+ }
153
+
154
+ return Status::OK ();
155
+ }
156
+
157
+ Status RepeatNode::get_next (RuntimeState* state, RowBatch* row_batch, bool * eos) {
158
+ SCOPED_TIMER (_runtime_profile->total_time_counter ());
159
+ RETURN_IF_ERROR (exec_debug_action (TExecNodePhase::GETNEXT));
160
+ RETURN_IF_CANCELLED (state);
161
+
162
+ // current child has finished its repeat, get child's next batch
163
+ if (_child_row_batch.get () == nullptr ) {
164
+ if (_child_eos) {
165
+ *eos = true ;
166
+ return Status::OK ();
167
+ }
168
+
169
+ _child_row_batch.reset (
170
+ new RowBatch (child (0 )->row_desc (), state->batch_size (), mem_tracker ()));
171
+ RETURN_IF_ERROR (child (0 )->get_next (state, _child_row_batch.get (), &_child_eos));
172
+
173
+ if (_child_row_batch->num_rows () <= 0 ) {
174
+ _child_row_batch.reset (nullptr );
175
+ *eos = true ;
176
+ return Status::OK ();
177
+ }
178
+ }
179
+
180
+ DCHECK_EQ (row_batch->num_rows (), 0 );
181
+ RETURN_IF_ERROR (get_repeated_batch (_child_row_batch.get (), _repeat_id_idx, row_batch));
182
+ _repeat_id_idx++;
183
+
184
+ DCHECK_GT ( _repeat_id_list.size (), 0 );
185
+ int size = _repeat_id_list[0 ].size ();
186
+ if (_repeat_id_idx >= size) {
187
+ _child_row_batch.reset (nullptr );
188
+ _repeat_id_idx = 0 ;
189
+ }
190
+
191
+ return Status::OK ();
192
+ }
193
+
194
+ Status RepeatNode::close (RuntimeState* state) {
195
+ if (is_closed ()) {
196
+ return Status::OK ();
197
+ }
198
+ _child_row_batch.reset (nullptr );
199
+ RETURN_IF_ERROR (child (0 )->close (state));
200
+ return ExecNode::close (state);
201
+ }
202
+
203
+ void RepeatNode::debug_string (int indentation_level, std::stringstream* out) const {
204
+ *out << string (indentation_level * 2 , ' ' );
205
+ *out << " RepeatNode(" ;
206
+ // TODO output content of RepeatNode
207
+ ExecNode::debug_string (indentation_level, out);
208
+ *out << " )" ;
209
+ }
210
+
211
+ }
0 commit comments