Skip to content

Commit

Permalink
[GLUTEN-1632][CH]Daily Update Clickhouse Version (20240227) (#4786)
Browse files Browse the repository at this point in the history
* [GLUTEN-1632][CH]Daily Update Clickhouse Version (20240227)

* Fix Build due to ClickHouse/ClickHouse#60082

* Fix UT due to ClickHouse/ClickHouse#60082

---------

Co-authored-by: kyligence-git <gluten@kyligence.io>
Co-authored-by: Chang Chen <baibaichen@gmail.com>
  • Loading branch information
3 people authored Feb 27, 2024
1 parent 285e6d3 commit 94c8e7e
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 24 deletions.
4 changes: 2 additions & 2 deletions cpp-ch/clickhouse.version
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
CH_ORG=Kyligence
CH_BRANCH=rebase_ch/20240225
CH_COMMIT=0c480a024a5
CH_BRANCH=rebase_ch/20240227
CH_COMMIT=5c829b8fe91
3 changes: 2 additions & 1 deletion cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,8 @@ QueryPlanStepPtr SerializedPlanParser::parseReadRealWithLocalFile(const substrai
const ActionsDAGPtr actions_dag = std::make_shared<ActionsDAG>(blockToNameAndTypeList(header));
const ActionsDAG::Node * filter_node = parseExpression(actions_dag, rel.filter());
actions_dag->addOrReplaceInOutputs(*filter_node);
source_step->addFilter(actions_dag, filter_node);
assert(filter_node == &(actions_dag->findInOutputs(filter_node->result_name)));
source_step->addFilter(actions_dag, filter_node->result_name);
}
return source_step;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,19 +86,9 @@ SubstraitFileSource::SubstraitFileSource(
}
}

void SubstraitFileSource::setKeyCondition(const DB::ActionsDAG::NodeRawConstPtrs & nodes, DB::ContextPtr context_)
void SubstraitFileSource::setKeyCondition(const DB::ActionsDAGPtr & filter_actions_dag, DB::ContextPtr context_)
{
const auto & keys = to_read_header;
std::unordered_map<std::string, DB::ColumnWithTypeAndName> node_name_to_input_column;
for (const auto & column : keys.getColumnsWithTypeAndName())
node_name_to_input_column.insert({column.name, column});

auto filter_actions_dag = DB::ActionsDAG::buildFilterActionsDAG(nodes, node_name_to_input_column);
key_condition = std::make_shared<const DB::KeyCondition>(
filter_actions_dag,
context_,
keys.getNames(),
std::make_shared<DB::ExpressionActions>(std::make_shared<DB::ActionsDAG>(keys.getColumnsWithTypeAndName())));
setKeyConditionImpl(filter_actions_dag, context_, to_read_header);
}

DB::Chunk SubstraitFileSource::generate()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class SubstraitFileSource : public DB::SourceWithKeyCondition

String getName() const override { return "SubstraitFileSource"; }

void setKeyCondition(const DB::ActionsDAG::NodeRawConstPtrs & nodes, DB::ContextPtr context_) override;
void setKeyCondition(const DB::ActionsDAGPtr & filter_actions_dag, DB::ContextPtr context_) override;

protected:
DB::Chunk generate() override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,26 @@ namespace ErrorCodes

namespace local_engine
{
namespace
{
/*
 * As discussed with the community, we need a dummy storage to be used in SubstraitFileSourceStep:
 * the step passes a storage snapshot obtained from this storage to its SourceStepWithFilter base class.
 */
class SubstraitFileStorage final : public DB::IStorage
{
public:
    /// Forwards the given storage id to the IStorage base class.
    explicit SubstraitFileStorage(const DB::StorageID & storage_id_) : IStorage(storage_id_) { }

    /// Always reports that conditions cannot be moved to PREWHERE.
    /// NOTE(review): presumably because the substrait file sources do not implement PREWHERE - confirm.
    bool canMoveConditionsToPrewhere() const override { return false; }

    /// Name reported for this storage.
    std::string getName() const override { return "SubstraitFile"; }
};

/// File-local instance of the dummy storage; the "dummy_db"/"dummy_table" id is a placeholder.
SubstraitFileStorage dummy_storage{DB::StorageID("dummy_db", "dummy_table")};

}

SubstraitFileSourceStep::SubstraitFileSourceStep(DB::ContextPtr context_, DB::Pipe pipe_, const String &)
: SourceStepWithFilter(DB::DataStream{.header = pipe_.getHeader()}), pipe(std::move(pipe_)), context(context_)
SubstraitFileSourceStep::SubstraitFileSourceStep(const DB::ContextPtr & context_, DB::Pipe pipe_, const String &)
: SourceStepWithFilter(
DB::DataStream{.header = pipe_.getHeader()}, {}, {}, dummy_storage.getStorageSnapshot(nullptr, nullptr), context_)
, pipe(std::move(pipe_))
{
}

Expand All @@ -56,12 +73,13 @@ void SubstraitFileSourceStep::initializePipeline(DB::QueryPipelineBuilder & pipe
pipeline.init(std::move(pipe));
}

void SubstraitFileSourceStep::applyFilters()
void SubstraitFileSourceStep::applyFilters(const DB::ActionDAGNodes added_filter_nodes)
{
filter_actions_dag = DB::ActionsDAG::buildFilterActionsDAG(added_filter_nodes.nodes);
for (const auto & processor : pipe.getProcessors())
{
if (auto * source = dynamic_cast<DB::SourceWithKeyCondition *>(processor.get()))
source->setKeyCondition(filter_nodes.nodes, context);
source->setKeyCondition(filter_actions_dag, context);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,16 @@ namespace local_engine
class SubstraitFileSourceStep : public DB::SourceStepWithFilter
{
public:
explicit SubstraitFileSourceStep(DB::ContextPtr context_, DB::Pipe pipe_, const String & name);
explicit SubstraitFileSourceStep(const DB::ContextPtr & context_, DB::Pipe pipe_, const String & name);

void applyFilters() override;
void applyFilters(DB::ActionDAGNodes added_filter_nodes) override;

String getName() const override { return "SubstraitFileSourceStep"; }

void initializePipeline(DB::QueryPipelineBuilder &, const DB::BuildQueryPipelineSettings &) override;

private:
DB::Pipe pipe;
DB::ContextPtr context;
};

}

0 comments on commit 94c8e7e

Please sign in to comment.