clippy
SkyFan2002 committed Mar 26, 2024
1 parent 891610d · commit 8c15684
Showing 6 changed files with 10 additions and 54 deletions.
@@ -12,13 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use databend_common_arrow::arrow::chunk::Chunk;
use databend_common_catalog::catalog::CATALOG_DEFAULT;
use databend_common_exception::Result;
use databend_common_expression::RemoteExpr;
use databend_common_sql::executor::physical_plans::ChunkAppendData;
use databend_common_sql::executor::physical_plans::ChunkCommitInsert;
use databend_common_sql::executor::physical_plans::ChunkMerge;
@@ -37,8 +34,6 @@ use crate::pipelines::PipelineBuildResult;
use crate::schedulers::build_query_pipeline_without_render_result_set;
use crate::sessions::QueryContext;
use crate::sessions::TableContext;
use crate::sql::executor::cast_expr_to_non_null_boolean;
use crate::sql::executor::physical_plans::ChunkFilter;
use crate::sql::executor::physical_plans::Duplicate;
use crate::sql::executor::physical_plans::Shuffle;
pub struct InsertMultiTableInterpreter {
@@ -82,7 +77,7 @@ impl InsertMultiTableInterpreter {
intos,
} = &self.plan;

let (source_plan, select_column_bindings, _metadata) = match input_source {
let (source_plan, _select_column_bindings, _metadata) = match input_source {
Plan::Query {
s_expr,
metadata,
@@ -115,7 +110,7 @@
projection,
casted_schema,
} = into;
let table = self.ctx.get_table(&catalog, &database, &table).await?;
let table = self.ctx.get_table(catalog, database, table).await?;
branches.push((table, Some(&when.condition), projection, casted_schema));
}
}
@@ -128,7 +123,7 @@
projection,
casted_schema,
} = into;
let table = self.ctx.get_table(&catalog, &database, &table).await?;
let table = self.ctx.get_table(catalog, database, table).await?;
branches.push((table, None, projection, casted_schema));
}
}
@@ -141,7 +136,7 @@
projection,
casted_schema,
} = into;
let table = self.ctx.get_table(&catalog, &database, &table).await?;
let table = self.ctx.get_table(catalog, database, table).await?;
branches.push((table, None, projection, casted_schema));
}
}
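The changes in this first file are clippy `needless_borrow` fixes (each `get_table(&catalog, &database, &table)` call loses its extra `&`) plus an unused binding renamed to `_select_column_bindings`. A minimal sketch of the borrow pattern, using hypothetical types rather than the real Databend API:

```rust
// Hypothetical types for illustration only; not the Databend API.
struct IntoClause {
    catalog: String,
    database: String,
    table: String,
}

fn get_table(catalog: &str, database: &str, table: &str) -> String {
    format!("{catalog}.{database}.{table}")
}

fn qualify(into: &IntoClause) -> String {
    // Destructuring through `&IntoClause` binds each field as `&String`.
    let IntoClause { catalog, database, table } = into;
    // Before: get_table(&catalog, &database, &table) -- the extra `&` makes
    // `&&String`, which clippy::needless_borrow flags as redundant.
    // After: the bindings already deref-coerce to `&str` on their own.
    get_table(catalog, database, table)
}

fn main() {
    let into = IntoClause {
        catalog: "default".into(),
        database: "db".into(),
        table: "t".into(),
    };
    assert_eq!(qualify(&into), "default.db.t");
}
```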
14 changes: 0 additions & 14 deletions src/query/service/src/pipelines/builders/builder_filter.rs
@@ -12,23 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::sync::Arc;

use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::filter::build_select_expr;
use databend_common_expression::type_check::check_function;
use databend_common_expression::types::DataType;
use databend_common_expression::RemoteExpr;
use databend_common_functions::BUILTIN_FUNCTIONS;
use databend_common_pipeline_core::processors::ProcessorPtr;
use databend_common_pipeline_core::DynTransformBuilder;
use databend_common_sql::executor::physical_plans::Filter;

use crate::pipelines::processors::transforms::TransformFilter;
use crate::pipelines::processors::InputPort;
use crate::pipelines::processors::OutputPort;
use crate::pipelines::PipelineBuilder;
impl PipelineBuilder {
pub(crate) fn build_filter(&mut self, filter: &Filter) -> Result<()> {
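builder_filter.rs only loses `use` items that nothing in the module references anymore. For reference, a self-contained illustration of the lint involved (default lint levels assumed):

```rust
// If nothing below references `HashSet`, rustc's `unused_imports` lint
// (surfaced when running `cargo clippy`) reports:
//     warning: unused import: `std::collections::HashSet`
// The fix is exactly what this hunk does: delete the `use` line.
use std::collections::HashSet; // flagged once the last use disappears

fn main() {
    // No HashSet here, so the import above triggers the warning.
    println!("build_filter no longer needs these imports");
}
```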
@@ -16,14 +16,7 @@ use std::collections::HashMap;
use std::collections::HashSet;

use databend_common_catalog::catalog::CatalogManager;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::block_thresholds;
use databend_common_expression::filter::build_select_expr;
use databend_common_expression::type_check::check_function;
use databend_common_expression::types::DataType;
use databend_common_functions::BUILTIN_FUNCTIONS;
use databend_common_pipeline_core::processors::ProcessorPtr;
use databend_common_pipeline_core::DynTransformBuilder;
use databend_common_pipeline_sinks::AsyncSinker;
@@ -35,11 +28,9 @@ use databend_common_sql::executor::physical_plans::ChunkFilter;
use databend_common_sql::executor::physical_plans::ChunkMerge;
use databend_common_sql::executor::physical_plans::ChunkProject;
use databend_common_sql::executor::physical_plans::Duplicate;
use databend_common_sql::executor::physical_plans::Filter;
use databend_common_sql::executor::physical_plans::Shuffle;
use databend_common_storages_fuse::operations::CommitMultiTableInsert;

use crate::pipelines::processors::transforms::TransformFilter;
use crate::pipelines::PipelineBuilder;
impl PipelineBuilder {
pub(crate) fn build_duplicate(&mut self, plan: &Duplicate) -> Result<()> {
@@ -64,7 +55,7 @@ impl PipelineBuilder {
for predicate in plan.predicates.iter() {
if let Some(predicate) = predicate {
f.push(Box::new(self.filter_transform_builder(
&vec![predicate.clone()],
&[predicate.clone()],
HashSet::default(),
)?));
} else {
@@ -75,17 +66,17 @@
Ok(())
}

pub(crate) fn build_chunk_project(&mut self, plan: &ChunkProject) -> Result<()> {
pub(crate) fn build_chunk_project(&mut self, _plan: &ChunkProject) -> Result<()> {
Ok(())
}

pub(crate) fn build_chunk_cast_schema(&mut self, plan: &ChunkCastSchema) -> Result<()> {
pub(crate) fn build_chunk_cast_schema(&mut self, _plan: &ChunkCastSchema) -> Result<()> {
Ok(())
}

pub(crate) fn build_chunk_fill_and_reorder(
&mut self,
plan: &ChunkFillAndReorder,
_plan: &ChunkFillAndReorder,
) -> Result<()> {
Ok(())
}
@@ -130,7 +121,7 @@ impl PipelineBuilder {
deduplicated_label,
targets,
} = plan;
self.build_pipeline(&input)?;
self.build_pipeline(input)?;
let mut serialize_segment_builders: Vec<DynTransformBuilder> =
Vec::with_capacity(targets.len());
let mut mutation_aggregator_builders: Vec<DynTransformBuilder> =
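Three lint patterns are fixed in this file: `&vec![…]` replaced by a slice literal (clippy `useless_vec`), the ignored plan arguments of the no-op chunk builders prefixed with `_`, and a needless borrow dropped in `self.build_pipeline(input)`. A minimal sketch of the first two, with hypothetical signatures rather than the real ones:

```rust
// Hypothetical signatures for illustration only.
fn sum_predicates(predicates: &[u32]) -> u32 {
    predicates.iter().sum()
}

// Unused-parameter fix: a leading underscore tells rustc the no-op builder
// ignores its plan on purpose, mirroring `_plan: &ChunkProject` above.
fn build_chunk_project(_plan: &str) { /* intentionally empty */ }

fn main() {
    let predicate = 7u32;
    // clippy::useless_vec: `&vec![predicate]` heap-allocates a Vec only to
    // borrow it as a slice; the literal `&[predicate]` is equivalent and
    // allocation-free.
    assert_eq!(sum_predicates(&[predicate]), 7);

    build_chunk_project("any plan");
}
```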
2 changes: 0 additions & 2 deletions src/query/service/src/pipelines/builders/transform_builder.rs
@@ -24,12 +24,10 @@ use databend_common_expression::BlockThresholds;
use databend_common_expression::RemoteExpr;
use databend_common_functions::BUILTIN_FUNCTIONS;
use databend_common_pipeline_core::processors::ProcessorPtr;
use databend_common_pipeline_core::DynTransformBuilder;
use databend_common_pipeline_transforms::processors::AsyncAccumulatingTransformer;
use databend_common_pipeline_transforms::processors::BlockCompactor;
use databend_common_pipeline_transforms::processors::TransformCompact;
use databend_common_pipeline_transforms::processors::TransformDummy;
use databend_common_sql::executor::physical_plans::Filter;
use databend_common_storages_factory::Table;
use databend_common_storages_fuse::operations::TableMutationAggregator;
use databend_common_storages_fuse::operations::TransformSerializeBlock;
@@ -293,7 +293,7 @@ impl AccumulatingTransform for TransformMergeCommitMeta {
if to_merged.is_empty() {
return Ok(vec![]);
}
let table_id = to_merged[0].table_id.clone();
let table_id = to_merged[0].table_id;
let merged = to_merged
.into_iter()
.fold(CommitMeta::empty(table_id), |acc, x| {
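The `table_id` field here is presumably a `Copy` integer, so clippy's `clone_on_copy` lint flags the `.clone()`. A sketch under that assumption (hypothetical struct, not the real `CommitMeta`):

```rust
// Assumed field type: a Copy integer such as u64.
#[derive(Clone, Copy)]
struct MergedMeta {
    table_id: u64,
}

fn main() {
    let to_merged = [MergedMeta { table_id: 42 }];
    // Before: let table_id = to_merged[0].table_id.clone(); // clone_on_copy
    // After: for `Copy` types a plain read is the idiomatic spelling.
    let table_id = to_merged[0].table_id;
    assert_eq!(table_id, 42);
}
```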
@@ -14,7 +14,6 @@

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use async_trait::async_trait;
use async_trait::unboxed_simple;
@@ -23,29 +22,16 @@ use databend_common_catalog::table::Table;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::types::StringType;
use databend_common_expression::BlockMetaInfoDowncast;
use databend_common_expression::BlockRowIndex;
use databend_common_expression::DataBlock;
use databend_common_expression::TableSchemaRef;
use databend_common_meta_app::schema::UpdateMultiTableMetaReq;
use databend_common_meta_app::schema::UpdateStreamMetaReq;
use databend_common_meta_app::schema::UpdateTableMetaReq;
use databend_common_meta_types::MatchSeq;
use databend_common_metrics::storage::*;
use databend_common_pipeline_core::processors::InputPort;
use databend_common_pipeline_core::processors::ProcessorPtr;
use databend_common_pipeline_sinks::AsyncSink;
use databend_common_pipeline_sinks::AsyncSinker;
use databend_storages_common_table_meta::meta::TableSnapshot;
use databend_storages_common_table_meta::meta::Versioned;
use futures::future::Fuse;
use opendal::Operator;

use crate::fuse_table;
use crate::io;
use crate::io::TableMetaLocationGenerator;
use crate::io::WriteSettings;
use crate::operations::common::CommitMeta;
use crate::operations::merge_commit_meta;
use crate::operations::AppendGenerator;
