Skip to content

Commit

Permalink
Merge pull request #7374 from zhang2014/feat/max_execute_time
Browse files Browse the repository at this point in the history
feat(processor): support max execute time settings
  • Loading branch information
BohuTANG authored Aug 30, 2022
2 parents 869344f + 8b6a0a7 commit 08a9221
Show file tree
Hide file tree
Showing 20 changed files with 212 additions and 22 deletions.
12 changes: 9 additions & 3 deletions src/query/service/src/api/rpc/exchange/exchange_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use crate::api::InitNodesChannelPacket;
use crate::api::QueryFragmentsPlanPacket;
use crate::interpreters::QueryFragmentActions;
use crate::interpreters::QueryFragmentsActions;
use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::PipelineCompleteExecutor;
use crate::pipelines::PipelineBuildResult;
use crate::pipelines::QueryPipelineBuilder;
Expand Down Expand Up @@ -562,9 +563,14 @@ impl QueryCoordinator {

let async_runtime = GlobalIORuntime::instance();
let query_need_abort = info.query_ctx.query_need_abort();

let executor =
PipelineCompleteExecutor::from_pipelines(async_runtime, query_need_abort, pipelines)?;
let executor_settings = ExecutorSettings::try_create(&info.query_ctx.get_settings())?;

let executor = PipelineCompleteExecutor::from_pipelines(
async_runtime,
query_need_abort,
pipelines,
executor_settings,
)?;

self.fragment_exchanges.clear();
let info_mut = self.info.as_mut().expect("Query info is None");
Expand Down
3 changes: 3 additions & 0 deletions src/query/service/src/interpreters/async_insert_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use parking_lot::RwLock;
use super::InsertInterpreter;
use super::SelectInterpreter;
use crate::interpreters::Interpreter;
use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::PipelineCompleteExecutor;
use crate::pipelines::processors::port::InputPort;
use crate::pipelines::processors::port::OutputPort;
Expand Down Expand Up @@ -305,10 +306,12 @@ impl AsyncInsertManager {
}
pipeline.add_pipe(sink_pipeline_builder.finalize());

let executor_settings = ExecutorSettings::try_create(&settings)?;
let executor = PipelineCompleteExecutor::try_create(
GlobalIORuntime::instance(),
ctx.query_need_abort(),
pipeline,
executor_settings,
)
.unwrap();
executor.execute()?;
Expand Down
10 changes: 8 additions & 2 deletions src/query/service/src/interpreters/interpreter_call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use common_streams::SendableDataBlockStream;

use super::Interpreter;
use crate::interpreters::ProcessorExecutorStream;
use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::PipelinePullingExecutor;
use crate::procedures::ProcedureFactory;
use crate::sessions::QueryContext;
Expand Down Expand Up @@ -81,8 +82,13 @@ impl Interpreter for CallInterpreter {
let async_runtime = GlobalIORuntime::instance();
let query_need_abort = ctx.query_need_abort();
pipeline.set_max_threads(settings.get_max_threads()? as usize);
let executor =
PipelinePullingExecutor::try_create(async_runtime, query_need_abort, pipeline)?;
let executor_settings = ExecutorSettings::try_create(&settings)?;
let executor = PipelinePullingExecutor::try_create(
async_runtime,
query_need_abort,
pipeline,
executor_settings,
)?;

Ok(Box::pin(ProcessorExecutorStream::create(executor)?))
}
Expand Down
9 changes: 8 additions & 1 deletion src/query/service/src/interpreters/interpreter_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use futures::TryStreamExt;
use regex::Regex;
use tracing::warn;

use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::PipelineCompleteExecutor;
use crate::pipelines::processors::TransformAddOn;
use crate::sessions::QueryContext;
Expand Down Expand Up @@ -60,9 +61,15 @@ pub fn append2table(
table.append2(ctx.clone(), &mut pipeline)?;
let async_runtime = GlobalIORuntime::instance();
let query_need_abort = ctx.query_need_abort();
let executor_settings = ExecutorSettings::try_create(&ctx.get_settings())?;

pipeline.set_max_threads(ctx.get_settings().get_max_threads()? as usize);
let executor = PipelineCompleteExecutor::try_create(async_runtime, query_need_abort, pipeline)?;
let executor = PipelineCompleteExecutor::try_create(
async_runtime,
query_need_abort,
pipeline,
executor_settings,
)?;
executor.execute()
}

Expand Down
10 changes: 8 additions & 2 deletions src/query/service/src/interpreters/interpreter_copy_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use super::append2table;
use super::commit2table;
use crate::interpreters::Interpreter;
use crate::interpreters::SelectInterpreterV2;
use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::PipelineCompleteExecutor;
use crate::pipelines::Pipeline;
use crate::sessions::QueryContext;
Expand Down Expand Up @@ -160,8 +161,13 @@ impl CopyInterpreterV2 {

let async_runtime = GlobalIORuntime::instance();
let query_need_abort = ctx.query_need_abort();
let executor =
PipelineCompleteExecutor::try_create(async_runtime, query_need_abort, pipeline)?;
let executor_settings = ExecutorSettings::try_create(&settings)?;
let executor = PipelineCompleteExecutor::try_create(
async_runtime,
query_need_abort,
pipeline,
executor_settings,
)?;
executor.execute()?;

Ok(ctx.consume_precommit_blocks())
Expand Down
9 changes: 8 additions & 1 deletion src/query/service/src/interpreters/interpreter_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::interpreters::plan_schedulers;
use crate::interpreters::stream::ProcessorExecutorStream;
use crate::interpreters::Interpreter;
use crate::optimizers::Optimizers;
use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::PipelinePullingExecutor;
use crate::pipelines::Pipeline;
use crate::pipelines::PipelineBuildResult;
Expand Down Expand Up @@ -92,8 +93,14 @@ impl Interpreter for SelectInterpreter {
let build_res = self.build_pipeline().await?;
let async_runtime = GlobalIORuntime::instance();
let query_need_abort = self.ctx.query_need_abort();
let executor_settings = ExecutorSettings::try_create(&self.ctx.get_settings())?;
Ok(Box::pin(ProcessorExecutorStream::create(
PipelinePullingExecutor::from_pipelines(async_runtime, query_need_abort, build_res)?,
PipelinePullingExecutor::from_pipelines(
async_runtime,
query_need_abort,
build_res,
executor_settings,
)?,
)?))
}

Expand Down
4 changes: 4 additions & 0 deletions src/query/service/src/interpreters/interpreter_select_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use super::plan_schedulers::schedule_query_v2;
use crate::clusters::ClusterHelper;
use crate::interpreters::stream::ProcessorExecutorStream;
use crate::interpreters::Interpreter;
use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::PipelinePullingExecutor;
use crate::pipelines::Pipeline;
use crate::pipelines::PipelineBuildResult;
Expand Down Expand Up @@ -103,11 +104,14 @@ impl Interpreter for SelectInterpreterV2 {
.await;
}

let executor_settings = ExecutorSettings::try_create(&self.ctx.get_settings())?;

Ok(Box::pin(Box::pin(ProcessorExecutorStream::create(
PipelinePullingExecutor::from_pipelines(
GlobalIORuntime::instance(),
self.ctx.query_need_abort(),
build_res,
executor_settings,
)?,
)?)))
}
Expand Down
20 changes: 16 additions & 4 deletions src/query/service/src/interpreters/interpreter_table_optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use common_streams::DataBlockStream;
use common_streams::SendableDataBlockStream;

use crate::interpreters::Interpreter;
use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::PipelineCompleteExecutor;
use crate::pipelines::Pipeline;
use crate::sessions::QueryContext;
Expand Down Expand Up @@ -68,8 +69,14 @@ impl OptimizeTableInterpreter {

let async_runtime = GlobalIORuntime::instance();
let query_need_abort = ctx.query_need_abort();
let executor =
PipelineCompleteExecutor::try_create(async_runtime, query_need_abort, pipeline)?;
let executor_settings = ExecutorSettings::try_create(&settings)?;
let executor = PipelineCompleteExecutor::try_create(
async_runtime,
query_need_abort,
pipeline,
executor_settings,
)?;

executor.execute()?;
drop(executor);

Expand All @@ -91,8 +98,13 @@ impl OptimizeTableInterpreter {
pipeline.set_max_threads(settings.get_max_threads()? as usize);
let async_runtime = GlobalIORuntime::instance();
let query_need_abort = ctx.query_need_abort();
let executor =
PipelineCompleteExecutor::try_create(async_runtime, query_need_abort, pipeline)?;
let executor_settings = ExecutorSettings::try_create(&settings)?;
let executor = PipelineCompleteExecutor::try_create(
async_runtime,
query_need_abort,
pipeline,
executor_settings,
)?;
executor.execute()?;
drop(executor);

Expand Down
31 changes: 31 additions & 0 deletions src/query/service/src/pipelines/executor/executor_settings.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Duration;

use common_exception::Result;
use common_settings::Settings;

pub struct ExecutorSettings {
pub max_execute_time: Duration,
}

impl ExecutorSettings {
pub fn try_create(settings: &Settings) -> Result<ExecutorSettings> {
let max_execute_time = settings.get_max_execute_time()?;
Ok(ExecutorSettings {
max_execute_time: Duration::from_millis(max_execute_time),
})
}
}
2 changes: 2 additions & 0 deletions src/query/service/src/pipelines/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mod pipeline_executor;

mod executor_condvar;
mod executor_graph;
mod executor_settings;
mod executor_tasks;
mod executor_worker_context;
mod pipeline_complete_executor;
Expand All @@ -24,6 +25,7 @@ mod pipeline_pushing_executor;
mod processor_async_task;

pub use executor_graph::RunningGraph;
pub use executor_settings::ExecutorSettings;
pub use pipeline_complete_executor::PipelineCompleteExecutor;
pub use pipeline_executor::FinishedCallback;
pub use pipeline_executor::PipelineExecutor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use common_base::base::Runtime;
use common_exception::ErrorCode;
use common_exception::Result;

use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::PipelineExecutor;
use crate::pipelines::Pipeline;

Expand All @@ -33,21 +34,24 @@ impl PipelineCompleteExecutor {
async_runtime: Arc<Runtime>,
query_need_abort: Arc<AtomicBool>,
pipeline: Pipeline,
settings: ExecutorSettings,
) -> Result<PipelineCompleteExecutor> {
if !pipeline.is_complete_pipeline()? {
return Err(ErrorCode::LogicalError(
"Logical error, PipelineCompleteExecutor can only work on complete pipeline.",
));
}

let executor = PipelineExecutor::create(async_runtime, query_need_abort, pipeline)?;
let executor =
PipelineExecutor::create(async_runtime, query_need_abort, pipeline, settings)?;
Ok(PipelineCompleteExecutor { executor })
}

pub fn from_pipelines(
async_runtime: Arc<Runtime>,
query_need_abort: Arc<AtomicBool>,
pipelines: Vec<Pipeline>,
settings: ExecutorSettings,
) -> Result<Arc<PipelineCompleteExecutor>> {
for pipeline in &pipelines {
if !pipeline.is_complete_pipeline()? {
Expand All @@ -58,7 +62,7 @@ impl PipelineCompleteExecutor {
}

let executor =
PipelineExecutor::from_pipelines(async_runtime, query_need_abort, pipelines)?;
PipelineExecutor::from_pipelines(async_runtime, query_need_abort, pipelines, settings)?;
Ok(Arc::new(PipelineCompleteExecutor { executor }))
}

Expand Down
Loading

1 comment on commit 08a9221

@vercel
Copy link

@vercel vercel bot commented on 08a9221 Aug 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

databend – ./

databend.vercel.app
databend-git-main-databend.vercel.app
databend-databend.vercel.app
databend.rs

Please sign in to comment.