Skip to content

Commit

Permalink
fix(query): fix attach_query_str (#15066)
Browse files Browse the repository at this point in the history
* fix(query): fix attach_query_str

* fix(query): fix attach_query_str

* fix(query): fix attach_query_str

* fix(query): fix attach_query_str

* fix(query): fix attach_query_str

* update

* update
  • Loading branch information
sundy-li authored Mar 23, 2024
1 parent 171ada8 commit 9399794
Show file tree
Hide file tree
Showing 18 changed files with 78 additions and 32 deletions.
1 change: 1 addition & 0 deletions src/query/catalog/src/query_kind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub enum QueryKind {
Query,
Explain,
CopyIntoTable,
CopyIntoLocation,
Update,
Insert,
Other,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::DataBlock;
use databend_common_expression::DataSchema;
use databend_common_sql::get_query_kind;
use databend_common_sql::plans::Plan;
use databend_common_sql::PlanExtras;
use databend_common_sql::Planner;
Expand Down Expand Up @@ -95,7 +96,10 @@ impl FlightSqlServiceImpl {
.await
.map_err(|e| status!("Could not create_query_context", e))?;

context.attach_query_str(plan.kind(), plan_extras.statement.to_mask_sql());
context.attach_query_str(
get_query_kind(&plan_extras.statement),
plan_extras.statement.to_mask_sql(),
);
let interpreter = InterpreterFactory::get(context.clone(), plan).await?;

let mut blocks = interpreter.execute(context.clone()).await?;
Expand All @@ -120,7 +124,10 @@ impl FlightSqlServiceImpl {
.await
.map_err(|e| status!("Could not create_query_context", e))?;

context.attach_query_str(plan.kind(), plan_extras.statement.to_mask_sql());
context.attach_query_str(
get_query_kind(&plan_extras.statement),
plan_extras.statement.to_mask_sql(),
);
let interpreter = InterpreterFactory::get(context.clone(), plan).await?;

let data_schema = plan.schema();
Expand Down
3 changes: 0 additions & 3 deletions src/query/service/src/servers/http/clickhouse_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,6 @@ pub async fn clickhouse_handler_get(
.map_err(|err| err.display_with_sql(&sql))
.map_err(BadRequest)?;
let format = get_format_with_default(extras.format, default_format)?;

context.attach_query_str(plan.kind(), extras.statement.to_mask_sql());
let interpreter = InterpreterFactory::get(context.clone(), &plan)
.await
.map_err(|err| err.display_with_sql(&sql))
Expand Down Expand Up @@ -357,7 +355,6 @@ pub async fn clickhouse_handler_post(
.map_err(|err| err.display_with_sql(&sql))
.map_err(BadRequest)?;
let schema = plan.schema();
ctx.attach_query_str(plan.kind(), extras.statement.to_mask_sql());
let mut handle = None;
if let Plan::Insert(insert) = &mut plan {
if let InsertInputSource::StreamingWithFormat(format, start, input_context_ref) =
Expand Down
3 changes: 1 addition & 2 deletions src/query/service/src/servers/http/v1/load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,11 @@ pub async fn streaming_load(
.map_err(InternalServerError)?;

let mut planner = Planner::new(context.clone());
let (mut plan, extras) = planner
let (mut plan, _) = planner
.plan_sql(insert_sql)
.await
.map_err(|err| err.display_with_sql(insert_sql))
.map_err(InternalServerError)?;
context.attach_query_str(plan.kind(), extras.statement.to_mask_sql());

let schema = plan.schema();
match &mut plan {
Expand Down
3 changes: 1 addition & 2 deletions src/query/service/src/servers/http/v1/query/execute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,11 +310,10 @@ impl ExecuteState {
let entry = QueryEntry::create(&ctx)?;
let queue_guard = QueriesQueueManager::instance().acquire(entry).await?;

let (plan, plan_extras) = ExecuteState::plan_sql(&sql, ctx.clone())
let (plan, _) = ExecuteState::plan_sql(&sql, ctx.clone())
.await
.map_err(|err| err.display_with_sql(&sql))?;

ctx.attach_query_str(plan.kind(), plan_extras.statement.to_mask_sql());
{
// set_var may change settings
let mut guard = format_settings.write();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,9 +359,7 @@ impl InteractiveWorkerBase {
let entry = QueryEntry::create(&context)?;
let _guard = QueriesQueueManager::instance().acquire(entry).await?;
let mut planner = Planner::new(context.clone());
let (plan, extras) = planner.plan_sql(query).await?;

context.attach_query_str(plan.kind(), extras.statement.to_mask_sql());
let (plan, _) = planner.plan_sql(query).await?;
let interpreter = InterpreterFactory::get(context.clone(), &plan).await;

let has_result_set = plan.has_result_set();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,7 @@ impl SuggestedBackgroundTasksSource {
sql: String,
) -> Result<Option<RecordBatch>> {
let mut planner = Planner::new(ctx.clone());
let (plan, plan_extras) = planner.plan_sql(sql.as_str()).await?;
ctx.attach_query_str(plan.kind(), plan_extras.statement.to_mask_sql());
let (plan, _) = planner.plan_sql(sql.as_str()).await?;
let data_schema = plan.schema();
let interpreter = InterpreterFactory::get(ctx.clone(), &plan).await?;
let stream = interpreter.execute(ctx.clone()).await?;
Expand Down
1 change: 1 addition & 0 deletions src/query/service/src/test_kits/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ impl ConfigBuilder {
let tmp_dir = TempDir::new().expect("create tmp dir failed");
let root = tmp_dir.path().to_str().unwrap().to_string();
conf.storage.params = StorageParams::Fs(StorageFsConfig { root });
conf.storage.allow_insecure = true;

ConfigBuilder { conf }
}
Expand Down
4 changes: 1 addition & 3 deletions src/query/service/tests/it/api/http/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use databend_query::interpreters::InterpreterFactory;
use databend_query::sessions::QueryContext;
use databend_query::sessions::SessionManager;
use databend_query::sessions::SessionType;
use databend_query::sessions::TableContext;
use databend_query::sql::Planner;
use databend_query::test_kits::*;
use poem::get;
Expand Down Expand Up @@ -69,8 +68,7 @@ async fn run_query(query_ctx: &Arc<QueryContext>) -> Result<Arc<dyn Interpreter>
.set_authed_user(user, None)
.await?;
let mut planner = Planner::new(query_ctx.clone());
let (plan, extras) = planner.plan_sql(sql).await?;
query_ctx.attach_query_str(plan.kind(), extras.statement.to_mask_sql());
let (plan, _) = planner.plan_sql(sql).await?;
InterpreterFactory::get(query_ctx.clone(), &plan).await
}

Expand Down
5 changes: 2 additions & 3 deletions src/query/service/tests/it/sql/exec/get_table_bind_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -522,9 +522,7 @@ impl TableContext for CtxDelegation {
todo!()
}

fn attach_query_str(&self, _kind: QueryKind, _query: String) {
todo!()
}
fn attach_query_str(&self, _kind: QueryKind, _query: String) {}

fn get_query_str(&self) -> String {
todo!()
Expand Down Expand Up @@ -854,6 +852,7 @@ async fn test_get_same_table_once() -> Result<()> {

let mut planner = Planner::new(ctx.clone());
let (_, _) = planner.plan_sql(query.as_str()).await?;

assert_eq!(
ctx.table_without_cache
.load(std::sync::atomic::Ordering::SeqCst),
Expand Down
28 changes: 28 additions & 0 deletions src/query/service/tests/it/sql/planner/builders/binder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use databend_common_catalog::query_kind::QueryKind;
use databend_common_exception::Result;
use databend_common_sql::Planner;
use databend_common_storages_fuse::TableContext;
use databend_query::sessions::SessionType;
use databend_query::test_kits::TestFixture;

#[tokio::test(flavor = "multi_thread")]
async fn test_query_kind() -> Result<()> {
let fixture = TestFixture::setup().await?;
fixture.create_default_database().await?;
fixture.create_default_table().await?;

let http_session = fixture
.new_session_with_type(SessionType::HTTPQuery)
.await?;
let ctx = http_session.create_query_context().await?;
let mut planner = Planner::new(ctx.clone());
let sql = format!(
"COPY INTO {}.{} from @~/ pattern='.*' FILE_FORMAT = (TYPE = 'csv') PURGE=true FORCE=true max_files=10000;",
fixture.default_db_name(),
fixture.default_table_name()
);
let (_, _) = planner.plan_sql(&sql).await?;
let kind = ctx.get_query_kind();
assert_eq!(kind, QueryKind::CopyIntoTable);
Ok(())
}
1 change: 1 addition & 0 deletions src/query/service/tests/it/sql/planner/builders/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod binder;
mod select_builder;
Original file line number Diff line number Diff line change
Expand Up @@ -480,9 +480,7 @@ impl TableContext for CtxDelegation {
todo!()
}

fn attach_query_str(&self, _kind: QueryKind, _query: String) {
todo!()
}
fn attach_query_str(&self, _kind: QueryKind, _query: String) {}

fn get_query_str(&self) -> String {
todo!()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ DB.Table: 'system'.'configs', Table: configs-table_id:1, ver:0, Engine: SystemCo
| 'query' | 'tenant_id' | 'test' | '' |
| 'query' | 'udf_server_allow_list' | '' | '' |
| 'query' | 'users' | '{"name":"root","auth_type":"no_password","auth_string":null}' | '' |
| 'storage' | 'allow_insecure' | 'false' | '' |
| 'storage' | 'allow_insecure' | 'true' | '' |
| 'storage' | 'azblob.account_key' | '' | '' |
| 'storage' | 'azblob.account_name' | '' | '' |
| 'storage' | 'azblob.container' | '' | '' |
Expand Down
1 change: 1 addition & 0 deletions src/query/sql/src/planner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub use bloom_index::BloomIndexColumns;
pub use expression_parser::*;
pub use format::format_scalar;
pub use metadata::*;
pub use planner::get_query_kind;
pub use planner::PlanExtras;
pub use planner::Planner;
pub use plans::insert::InsertInputSource;
Expand Down
29 changes: 23 additions & 6 deletions src/query/sql/src/planner/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,6 @@ impl Planner {
return Err(ErrorCode::SyntaxException("convert prql to sql failed."));
}

if matches!(stmt, Statement::CopyIntoLocation(_)) {
// Indicate binder there is no need to collect column statistics for the binding table.
self.ctx
.attach_query_str(QueryKind::CopyIntoTable, String::new());
}

self.replace_stmt(&mut stmt, sql_dialect);

// Step 3: Bind AST with catalog, and generate a pure logical SExpr
Expand All @@ -153,7 +147,14 @@ impl Planner {
name_resolution_ctx,
metadata.clone(),
);

// Indicate binder there is no need to collect column statistics for the binding table.
self.ctx
.attach_query_str(get_query_kind(&stmt), stmt.to_mask_sql());
let plan = binder.bind(&stmt).await?;
// attach again to avoid the query kind is overwritten by the subquery
self.ctx
.attach_query_str(get_query_kind(&stmt), stmt.to_mask_sql());

// Step 4: Optimize the SExpr with optimizers, and generate optimized physical SExpr
let opt_ctx = OptimizerContext::new(self.ctx.clone(), metadata.clone())
Expand Down Expand Up @@ -236,3 +237,19 @@ impl Planner {
self.add_max_rows_limit(stmt);
}
}

pub fn get_query_kind(stmt: &Statement) -> QueryKind {
match stmt {
Statement::Query { .. } => QueryKind::Query,
Statement::CopyIntoTable(_) => QueryKind::CopyIntoTable,
Statement::CopyIntoLocation(_) => QueryKind::CopyIntoLocation,
Statement::Explain { .. } => QueryKind::Explain,
Statement::Insert(_) => QueryKind::Insert,
Statement::Replace(_)
| Statement::Delete(_)
| Statement::MergeInto(_)
| Statement::OptimizeTable(_)
| Statement::Update(_) => QueryKind::Update,
_ => QueryKind::Other,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,10 @@ impl ParquetRSTable {

// If the query is `COPY`, we don't need to collect column statistics.
// It's because the only transform could be contained in `COPY` command is projection.
let need_stats_provider = !matches!(ctx.get_query_kind(), QueryKind::CopyIntoTable);
let need_stats_provider = !matches!(
ctx.get_query_kind(),
QueryKind::CopyIntoTable | QueryKind::CopyIntoLocation
);
let settings = ctx.get_settings();
let max_threads = settings.get_max_threads()? as usize;
let max_memory_usage = settings.get_max_memory_usage()?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ c5 BIGINT 1 1
c2 BIGINT 1 2
c4 VARCHAR 1 3

query error get diff schema
query error
copy into t1 from @data/parquet/diff_schema/ file_format=(type=parquet) pattern='.*[.]parquet'

query
Expand Down

0 comments on commit 9399794

Please sign in to comment.