Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

Permalink
[NSE-17]TPCDS Q72 optimization (#47)
Browse files Browse the repository at this point in the history
* Add an optimization when hashmap key is unique

Signed-off-by: Chendi Xue <chendi.xue@intel.com>

* Move hash calculation inside hashmap for further optimization

Signed-off-by: Chendi Xue <chendi.xue@intel.com>

* Cache most recent cache key for fast probe

Signed-off-by: Chendi Xue <chendi.xue@intel.com>

* switch left and right wrap sequence due to numOutput

Signed-off-by: Chendi Xue <chendi.xue@intel.com>

* Fix a possible name conflict issue with round_2

Signed-off-by: Chendi Xue <chendi.xue@intel.com>

* skip is_null check if !has_null

Signed-off-by: Chendi Xue <chendi.xue@intel.com>

* Add HasNull check in HashRelationColumn

Signed-off-by: Chendi Xue <chendi.xue@intel.com>
  • Loading branch information
xuechendi authored Jan 19, 2021
1 parent c03d469 commit a9ae3d8
Show file tree
Hide file tree
Showing 10 changed files with 408 additions and 191 deletions.
51 changes: 28 additions & 23 deletions core/src/main/java/com/intel/oap/vectorized/SerializableObject.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,14 @@ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundExcept
allocator = UnpooledByteBufAllocator.DEFAULT;
directAddrs = new ByteBuf[size_len];
for (int i = 0; i < size.length; i++) {
byte[] data = new byte[size[i]];
directAddrs[i] = allocator.directBuffer(size[i], size[i]);
OutputStream out = new ByteBufOutputStream(directAddrs[i]);
data = (byte[]) in.readObject();
out.write(data);
out.close();
if (size[i] > 0) {
byte[] data = new byte[size[i]];
data = (byte[]) in.readObject();
OutputStream out = new ByteBufOutputStream(directAddrs[i]);
out.write(data);
out.close();
}
}
}

Expand All @@ -88,13 +90,12 @@ public void writeExternal(ObjectOutput out) throws IOException {
out.writeInt(this.size.length);
out.writeObject(this.size);
for (int i = 0; i < size.length; i++) {
byte[] data = new byte[size[i]];
ByteBufInputStream in = new ByteBufInputStream(directAddrs[i]);
try {
if (size[i] > 0) {
byte[] data = new byte[size[i]];
ByteBufInputStream in = new ByteBufInputStream(directAddrs[i]);
in.read(data);
} catch (IOException e) {
out.writeObject(data);
}
out.writeObject(data);
}
}

Expand All @@ -106,14 +107,16 @@ public void read(Kryo kryo, Input in) {
allocator = UnpooledByteBufAllocator.DEFAULT;
directAddrs = new ByteBuf[size_len];
for (int i = 0; i < size.length; i++) {
byte[] data = new byte[size[i]];
directAddrs[i] = allocator.directBuffer(size[i], size[i]);
OutputStream out = new ByteBufOutputStream(directAddrs[i]);
try {
in.readBytes(data);
out.write(data);
out.close();
} catch (IOException e) {
if (size[i] > 0) {
byte[] data = new byte[size[i]];
OutputStream out = new ByteBufOutputStream(directAddrs[i]);
try {
in.readBytes(data);
out.write(data);
out.close();
} catch (IOException e) {
}
}
}
}
Expand All @@ -124,13 +127,15 @@ public void write(Kryo kryo, Output out) {
out.writeInt(this.size.length);
out.writeInts(this.size);
for (int i = 0; i < size.length; i++) {
byte[] data = new byte[size[i]];
ByteBufInputStream in = new ByteBufInputStream(directAddrs[i]);
try {
in.read(data);
} catch (IOException e) {
if (size[i] > 0) {
byte[] data = new byte[size[i]];
ByteBufInputStream in = new ByteBufInputStream(directAddrs[i]);
try {
in.read(data);
} catch (IOException e) {
}
out.writeBytes(data);
}
out.writeBytes(data);
}
}

Expand Down
116 changes: 78 additions & 38 deletions cpp/src/codegen/arrow_compute/ext/conditioned_merge_join_kernel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,11 @@ class ConditionedMergeJoinKernel::Impl {
<< GetTemplateString(relation_col_type, "TypedRelationColumn",
"Type", "arrow::")
<< "> " << relation_col_name << ";" << std::endl;
sort_define_ss << "bool " << relation_col_name << "_has_null;" << std::endl;
sort_prepare_ss << "RETURN_NOT_OK(" << relation_list_name << "->GetColumn(" << i
<< ", &" << relation_col_name << "));" << std::endl;
sort_prepare_ss << relation_col_name << "_has_null = " << relation_col_name
<< "->HasNull();" << std::endl;
}
idx++;
}
Expand Down Expand Up @@ -213,21 +216,21 @@ class ConditionedMergeJoinKernel::Impl {
<< std::endl;
function_define_ss << "if (!(" << right_validity_paramater << ")) return 1;"
<< std::endl;
auto left_tuple_name = left_paramater;
auto right_tuple_name = right_paramater;
if (project_output_list[0].size() > 1) {
function_define_ss << "auto left_tuple = std::make_tuple(" << left_paramater
<< " );" << std::endl;
} else {
function_define_ss << "auto left_tuple = " << left_paramater << ";" << std::endl;
left_tuple_name = "left_tuple";
}
if (project_output_list[1].size() > 1) {
function_define_ss << "auto right_tuple = std::make_tuple(" << right_paramater
<< " );" << std::endl;
} else {
function_define_ss << "auto right_tuple = " << right_paramater << ";" << std::endl;
right_tuple_name = "right_tuple";
}
function_define_ss
<< "return left_tuple == right_tuple ? 0 : (left_tuple < right_tuple ? -1 : 1);"
<< std::endl;
function_define_ss << "return " << left_tuple_name << " == " << right_tuple_name
<< " ? 0 : (" << left_tuple_name << " < " << right_tuple_name
<< " ? -1 : 1);" << std::endl;
function_define_ss << "}" << std::endl;
auto compare_function = function_define_ss.str();
codegen_ctx->function_list.push_back(compare_function);
Expand Down Expand Up @@ -297,7 +300,7 @@ class ConditionedMergeJoinKernel::Impl {
std::vector<arrow::ArrayVector> cached_;

arrow::Status GetInnerJoin(bool cond_check, bool use_relation_for_stream,
std::shared_ptr<CodeGenContext>* output) {
bool cache_right, std::shared_ptr<CodeGenContext>* output) {
std::stringstream shuffle_ss;
std::stringstream codes_ss;
std::stringstream finish_codes_ss;
Expand Down Expand Up @@ -335,25 +338,35 @@ class ConditionedMergeJoinKernel::Impl {
codes_ss << "}" << std::endl;
///////////////////////////

std::stringstream right_for_loop_codes;
if (use_relation_for_stream) {
codes_ss << "auto " << streamed_range_name << " = " << streamed_relation
<< "->GetSameKeyRange();" << std::endl;
codes_ss << "for (int " << streamed_range_id << " = 0; " << streamed_range_id
<< " < " << streamed_range_name << "; " << streamed_range_id << "++) {"
<< std::endl;
codes_ss << right_index_name << " = " << streamed_relation
<< "->GetItemIndexWithShift(" << streamed_range_id << ");" << std::endl;
right_for_loop_codes << "for (int " << streamed_range_id << " = 0; "
<< streamed_range_id << " < " << streamed_range_name << "; "
<< streamed_range_id << "++) {" << std::endl;
right_for_loop_codes << right_index_name << " = " << streamed_relation
<< "->GetItemIndexWithShift(" << streamed_range_id << ");"
<< std::endl;
std::stringstream prepare_ss;
prepare_ss << "ArrayItemIndexS " << right_index_name << ";" << std::endl;
(*output)->definition_codes += prepare_ss.str();
}
std::stringstream prepare_ss;
prepare_ss << "ArrayItemIndexS " << left_index_name << ";" << std::endl;
(*output)->definition_codes += prepare_ss.str();
if (cache_right) {
codes_ss << right_for_loop_codes.str();
codes_ss << "auto is_smj_" << relation_id << " = false;" << std::endl;
}
codes_ss << "for (int " << range_id << " = 0; " << range_id << " < " << range_name
<< "; " << range_id << "++) {" << std::endl;
codes_ss << left_index_name << " = " << build_relation << "->GetItemIndexWithShift("
<< range_id << ");" << std::endl;
if (!cache_right) {
codes_ss << "auto is_smj_" << relation_id << " = false;" << std::endl;
codes_ss << right_for_loop_codes.str();
}
if (cond_check) {
auto condition_name = "ConditionCheck_" + std::to_string(relation_id_[0]);
if (use_relation_for_stream) {
Expand All @@ -378,7 +391,7 @@ class ConditionedMergeJoinKernel::Impl {
return arrow::Status::OK();
}
arrow::Status GetOuterJoin(bool cond_check, bool use_relation_for_stream,
std::shared_ptr<CodeGenContext>* output) {
bool cache_right, std::shared_ptr<CodeGenContext>* output) {
std::stringstream shuffle_ss;
std::stringstream codes_ss;
std::stringstream finish_codes_ss;
Expand Down Expand Up @@ -413,14 +426,16 @@ class ConditionedMergeJoinKernel::Impl {
codes_ss << "}" << std::endl;
///////////////////////////

std::stringstream right_for_loop_codes;
if (use_relation_for_stream) {
codes_ss << "auto " << streamed_range_name << " = " << streamed_relation
<< "->GetSameKeyRange();" << std::endl;
codes_ss << "for (int " << streamed_range_id << " = 0; " << streamed_range_id
<< " < " << streamed_range_name << "; " << streamed_range_id << "++) {"
<< std::endl;
codes_ss << right_index_name << " = " << streamed_relation
<< "->GetItemIndexWithShift(" << streamed_range_id << ");" << std::endl;
right_for_loop_codes << "for (int " << streamed_range_id << " = 0; "
<< streamed_range_id << " < " << streamed_range_name << "; "
<< streamed_range_id << "++) {" << std::endl;
right_for_loop_codes << right_index_name << " = " << streamed_relation
<< "->GetItemIndexWithShift(" << streamed_range_id << ");"
<< std::endl;
std::stringstream prepare_ss;
prepare_ss << "ArrayItemIndexS " << right_index_name << ";" << std::endl;
(*output)->definition_codes += prepare_ss.str();
Expand All @@ -429,12 +444,20 @@ class ConditionedMergeJoinKernel::Impl {
prepare_ss << "ArrayItemIndexS " << left_index_name << ";" << std::endl;
prepare_ss << "bool " << fill_null_name << ";" << std::endl;
(*output)->definition_codes += prepare_ss.str();
if (cache_right) {
codes_ss << right_for_loop_codes.str();
codes_ss << "auto is_smj_" << relation_id << " = false;" << std::endl;
}
codes_ss << "for (int " << range_id << " = 0; " << range_id << " < " << range_name
<< "; " << range_id << "++) {" << std::endl;
codes_ss << "if(!" << fill_null_name << "){" << std::endl;
codes_ss << left_index_name << " = " << build_relation << "->GetItemIndexWithShift("
<< range_id << ");" << std::endl;
codes_ss << "}" << std::endl;
if (!cache_right) {
codes_ss << "auto is_smj_" << relation_id << " = false;" << std::endl;
codes_ss << right_for_loop_codes.str();
}
if (cond_check) {
auto condition_name = "ConditionCheck_" + std::to_string(relation_id_[0]);
if (use_relation_for_stream) {
Expand Down Expand Up @@ -552,7 +575,6 @@ class ConditionedMergeJoinKernel::Impl {
return arrow::Status::OK();
}
arrow::Status GetSemiJoin(bool cond_check, bool use_relation_for_stream,

std::shared_ptr<CodeGenContext>* output) {
std::stringstream shuffle_ss;
std::stringstream codes_ss;
Expand Down Expand Up @@ -752,6 +774,10 @@ class ConditionedMergeJoinKernel::Impl {
// define output list here, which will also be defined in class variables definition

int right_index_shift = 0;
std::vector<int> left_output_idx_list;
std::vector<int> right_output_idx_list;
std::stringstream define_ss;

for (int idx = 0; idx < result_schema_index_list_.size(); idx++) {
std::string name;
std::string arguments;
Expand All @@ -768,45 +794,52 @@ class ConditionedMergeJoinKernel::Impl {
type = left_field_list_[i]->type();
arguments = left_index_name + ".array_id, " + left_index_name + ".id";
if (join_type == 1) {
valid_ss << "auto " << output_validity << " = !" << fill_null_name << " && !"
<< name << "->IsNull(" << arguments << ");" << std::endl;
valid_ss << output_validity << " = !" << fill_null_name << " && !(" << name
<< "_has_null && " << name << "->IsNull(" << arguments << "));"
<< std::endl;
} else {
valid_ss << "auto " << output_validity << " = !" << name << "->IsNull("
<< arguments << ");" << std::endl;
valid_ss << output_validity << " = !(" << name << "_has_null && " << name
<< "->IsNull(" << arguments << "));" << std::endl;
}
valid_ss << "auto " << output_name << " = " << name << "->GetValue(" << arguments
<< ");" << std::endl;
valid_ss << output_name << " = " << name << "->GetValue(" << arguments << ");"
<< std::endl;

left_output_idx_list.push_back(idx);
define_ss << "bool " << output_name << "_validity = true;" << std::endl;
define_ss << GetCTypeString(type) << " " << output_name << ";" << std::endl;
} else { /*right(streamed) table*/
if (use_relation_for_stream) { /* use sort relation in streamed side*/
if (exist_index_ != -1 && exist_index_ == i) {
name =
"sort_relation_" + std::to_string(relation_id_[0]) + "_existence_value";
type = arrow::boolean();
valid_ss << "auto " << output_validity << " = " << name << "_validity;"
<< std::endl;
valid_ss << "auto " << output_name << " = " << name << ";" << std::endl;
valid_ss << output_validity << " = " << name << "_validity;" << std::endl;
valid_ss << output_name << " = " << name << ";" << std::endl;
right_index_shift = -1;
} else {
i += right_index_shift;
name = "sort_relation_" + std::to_string(relation_id_[1]) + "_" +
std::to_string(i);
type = right_field_list_[i]->type();
arguments = right_index_name + ".array_id, " + right_index_name + ".id";
valid_ss << "auto " << output_validity << " = !" << name << "->IsNull("
<< arguments << ");" << std::endl;
valid_ss << "auto " << output_name << " = " << name << "->GetValue("
<< arguments << ");" << std::endl;
valid_ss << output_validity << " = !(" << name << "_has_null && " << name
<< "->IsNull(" << arguments << "));" << std::endl;
valid_ss << output_name << " = " << name << "->GetValue(" << arguments << ");"
<< std::endl;
}
right_output_idx_list.push_back(idx);
define_ss << "bool " << output_name << "_validity = true;" << std::endl;
define_ss << GetCTypeString(type) << " " << output_name << ";" << std::endl;
} else { /* use previous output in streamed side*/
if (exist_index_ != -1 && exist_index_ == i) {
name =
"sort_relation_" + std::to_string(relation_id_[0]) + "_existence_value";
valid_ss << "auto " << output_validity << " = " << name << "_validity;"
<< std::endl;
valid_ss << "auto " << output_name << " = " << name << ";" << std::endl;
valid_ss << output_validity << " = " << name << "_validity;" << std::endl;
valid_ss << output_name << " = " << name << ";" << std::endl;
type = arrow::boolean();
right_index_shift = -1;
define_ss << "bool " << output_name << "_validity = true;" << std::endl;
define_ss << GetCTypeString(type) << " " << output_name << ";" << std::endl;
} else {
i += right_index_shift;
output_name = input[i].first.first;
Expand All @@ -819,13 +852,18 @@ class ConditionedMergeJoinKernel::Impl {
(*output)->output_list.push_back(
std::make_pair(std::make_pair(output_name, valid_ss.str()), type));
}
std::stringstream process_ss;
bool cache_right = true;
if (left_output_idx_list.size() > right_output_idx_list.size()) cache_right = false;

switch (join_type) {
case 0: { /* inner join */
RETURN_NOT_OK(GetInnerJoin(cond_check, use_relation_for_stream, output));
RETURN_NOT_OK(
GetInnerJoin(cond_check, use_relation_for_stream, cache_right, output));
} break;
case 1: { /* Outer join */
RETURN_NOT_OK(GetOuterJoin(cond_check, use_relation_for_stream, output));
RETURN_NOT_OK(
GetOuterJoin(cond_check, use_relation_for_stream, cache_right, output));
} break;
case 2: { /* Anti join */
RETURN_NOT_OK(GetAntiJoin(cond_check, use_relation_for_stream, output));
Expand All @@ -839,6 +877,8 @@ class ConditionedMergeJoinKernel::Impl {
default: {
} break;
}
(*output)->process_codes += process_ss.str();
(*output)->definition_codes += define_ss.str();

return arrow::Status::OK();
}
Expand Down
Loading

0 comments on commit a9ae3d8

Please sign in to comment.