From 351fc5167b8689b82ff16686f7adcc9fd0fb68d1 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Wed, 20 Oct 2021 23:05:55 +0800 Subject: [PATCH 1/2] remote backend for fuse table --- common/datavalues/src/data_value_ops.rs | 6 ++++ common/datavalues/src/macros.rs | 5 +++- common/exception/src/exception.rs | 1 + common/meta/api/src/meta_api.rs | 11 +++---- common/meta/embedded/src/meta_api_impl.rs | 12 ++++---- common/meta/flight/src/flight_action.rs | 15 ++++++++++ common/meta/flight/src/impls/meta_api_impl.rs | 22 +++++++++----- .../meta/raft-store/src/state_machine/sm.rs | 1 + common/meta/types/src/commit_table_reply.rs | 2 +- common/meta/types/src/lib.rs | 2 +- common/meta/types/src/table_info.rs | 2 ++ metasrv/src/executor/action_handler.rs | 1 + metasrv/src/executor/meta_handlers.rs | 21 +++++++++++++- metasrv/src/meta_service/raftmeta.rs | 29 +++++++++++++++++++ query/src/catalogs/backends/backend.rs | 9 +++--- .../backends/impls/embedded_backend.rs | 24 +++++++-------- .../catalogs/backends/impls/meta_cached.rs | 17 ++++++----- .../catalogs/backends/impls/meta_remote.rs | 19 +++++++----- .../src/catalogs/backends/impls/meta_sync.rs | 20 ++++++++----- .../catalogs/backends/impls/remote_backend.rs | 19 +++++++----- query/src/catalogs/catalog.rs | 11 +++---- .../impls/catalog/metastore_catalog.rs | 19 +++++++----- .../impls/catalog/overlaid_catalog.rs | 21 +++++++++----- .../catalogs/impls/catalog/system_catalog.rs | 11 +++---- .../datasources/table/fuse/io/block_reader.rs | 6 ++-- .../datasources/table/fuse/table_do_append.rs | 12 ++++++-- .../table/fuse/table_do_truncate.rs | 10 +++++-- .../table/fuse/util/col_encoding.rs | 22 +++++++++----- scripts/ci/ci-run-stateless-tests-cluster.sh | 2 +- .../ci/ci-run-stateless-tests-standalone.sh | 2 +- .../deploy/config/databend-query-node-1.toml | 3 +- .../deploy/config/databend-query-node-2.toml | 3 +- .../deploy/config/databend-query-node-3.toml | 3 +- .../09_0000_remote_create_table.result | 2 +- 
.../09_0000_remote_create_table.sql | 2 +- .../0_stateless/09_0001_remote_insert.sql | 4 +-- .../09_0002_remote_truncate_table.sql | 2 +- .../0_stateless/09_0003_remote_drop_table.sql | 8 ++--- 38 files changed, 261 insertions(+), 120 deletions(-) diff --git a/common/datavalues/src/data_value_ops.rs b/common/datavalues/src/data_value_ops.rs index e2363fdce6315..cc2ae70d8ae92 100644 --- a/common/datavalues/src/data_value_ops.rs +++ b/common/datavalues/src/data_value_ops.rs @@ -85,6 +85,12 @@ impl DataValue { } DataType::Boolean => try_build_array! {values}, DataType::String => try_build_array! {String, values}, + DataType::Date16 => { + try_build_array! {PrimitiveArrayBuilder, u16, UInt16, values} + } + DataType::DateTime32(_) => { + try_build_array! {PrimitiveArrayBuilder, u32, UInt32, values} + } other => Result::Err(ErrorCode::BadDataValueType(format!( "Unexpected type:{} for DataValue List", other diff --git a/common/datavalues/src/macros.rs b/common/datavalues/src/macros.rs index 2fa05cdadd7bd..8bfdf7c01c9f2 100644 --- a/common/datavalues/src/macros.rs +++ b/common/datavalues/src/macros.rs @@ -186,7 +186,10 @@ macro_rules! try_build_array { match value { DataValue::$SCALAR_TY(Some(v)) => builder.append_value(*v), DataValue::$SCALAR_TY(None) => builder.append_null(), - _ => unreachable!(), + dv => { + eprintln!("DV is {:?}", dv); + unreachable!() + } } } Ok(builder.finish().into_series()) diff --git a/common/exception/src/exception.rs b/common/exception/src/exception.rs index 612b21aca82df..957cdcbcb7245 100644 --- a/common/exception/src/exception.rs +++ b/common/exception/src/exception.rs @@ -211,6 +211,7 @@ build_exceptions! 
{ ConcurrentSnapshotInstall(2404), IllegalSnapshot(2405), + TableVersionMissMatch(2406), // KVSrv server error diff --git a/common/meta/api/src/meta_api.rs b/common/meta/api/src/meta_api.rs index ab042c205037f..61eb4b09cda46 100644 --- a/common/meta/api/src/meta_api.rs +++ b/common/meta/api/src/meta_api.rs @@ -16,13 +16,13 @@ use std::sync::Arc; use common_exception::Result; -use common_meta_types::CommitTableReply; use common_meta_types::CreateDatabaseReply; use common_meta_types::CreateTableReply; use common_meta_types::DatabaseInfo; use common_meta_types::MetaId; use common_meta_types::MetaVersion; use common_meta_types::TableInfo; +use common_meta_types::UpsertTableOptionReply; use common_planners::CreateDatabasePlan; use common_planners::CreateTablePlan; use common_planners::DropDatabasePlan; @@ -56,12 +56,13 @@ pub trait MetaApi: Send + Sync { table_version: Option, ) -> Result>; - async fn commit_table( + async fn upsert_table_option( &self, table_id: MetaId, - new_table_version: MetaVersion, - new_snapshot_location: String, - ) -> Result; + table_version: MetaVersion, + option_key: String, + option_value: String, + ) -> Result; fn name(&self) -> String; } diff --git a/common/meta/embedded/src/meta_api_impl.rs b/common/meta/embedded/src/meta_api_impl.rs index d55bf7f4307c3..51a2f764c896a 100644 --- a/common/meta/embedded/src/meta_api_impl.rs +++ b/common/meta/embedded/src/meta_api_impl.rs @@ -25,7 +25,6 @@ use common_exception::Result; use common_meta_api::MetaApi; use common_meta_raft_store::state_machine::AppliedState; use common_meta_types::Cmd; -use common_meta_types::CommitTableReply; use common_meta_types::CreateDatabaseReply; use common_meta_types::CreateTableReply; use common_meta_types::DatabaseInfo; @@ -33,6 +32,7 @@ use common_meta_types::MetaId; use common_meta_types::MetaVersion; use common_meta_types::Table; use common_meta_types::TableInfo; +use common_meta_types::UpsertTableOptionReply; use common_planners::CreateDatabasePlan; use 
common_planners::CreateTablePlan; use common_planners::DropDatabasePlan; @@ -123,6 +123,7 @@ impl MetaApi for MetaEmbedded { let table = Table { table_id: 0, + table_version: 0, table_name: table_name.to_string(), database_id: 0, // this field is unused during the creation of table db_name: db_name.to_string(), @@ -260,12 +261,13 @@ impl MetaApi for MetaEmbedded { Ok(Arc::new(rst)) } - async fn commit_table( + async fn upsert_table_option( &self, _table_id: MetaId, - _new_table_version: MetaVersion, - _new_snapshot_location: String, - ) -> Result { + _table_version: MetaVersion, + _option_key: String, + _option_value: String, + ) -> Result { todo!() } diff --git a/common/meta/flight/src/flight_action.rs b/common/meta/flight/src/flight_action.rs index 60842346f2677..2e12a4c90854a 100644 --- a/common/meta/flight/src/flight_action.rs +++ b/common/meta/flight/src/flight_action.rs @@ -30,6 +30,7 @@ use common_meta_types::MetaVersion; use common_meta_types::PrefixListReply; use common_meta_types::TableInfo; use common_meta_types::UpsertKVActionReply; +use common_meta_types::UpsertTableOptionReply; use common_planners::CreateDatabasePlan; use common_planners::CreateTablePlan; use common_planners::DropDatabasePlan; @@ -71,6 +72,7 @@ pub enum MetaFlightAction { GetTableExt(GetTableExtReq), GetTables(GetTablesAction), GetDatabases(GetDatabasesAction), + CommitTable(UpsertTableOptionReq), // general purpose kv UpsertKV(UpsertKVAction), @@ -254,6 +256,19 @@ action_declare!( MetaFlightAction::GetTableExt ); +#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)] +pub struct UpsertTableOptionReq { + pub table_id: MetaId, + pub table_version: MetaVersion, + pub option_key: String, + pub option_value: String, +} +action_declare!( + UpsertTableOptionReq, + UpsertTableOptionReply, + MetaFlightAction::CommitTable +); + // - get tables #[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)] pub struct GetTablesAction { diff --git 
a/common/meta/flight/src/impls/meta_api_impl.rs b/common/meta/flight/src/impls/meta_api_impl.rs index d4181d80e28bc..6c0a173b23ccc 100644 --- a/common/meta/flight/src/impls/meta_api_impl.rs +++ b/common/meta/flight/src/impls/meta_api_impl.rs @@ -16,13 +16,13 @@ use std::sync::Arc; use common_meta_api::MetaApi; -use common_meta_types::CommitTableReply; use common_meta_types::CreateDatabaseReply; use common_meta_types::CreateTableReply; use common_meta_types::DatabaseInfo; use common_meta_types::MetaId; use common_meta_types::MetaVersion; use common_meta_types::TableInfo; +use common_meta_types::UpsertTableOptionReply; use common_planners::CreateDatabasePlan; use common_planners::CreateTablePlan; use common_planners::DropDatabasePlan; @@ -38,6 +38,7 @@ use crate::GetTableAction; use crate::GetTableExtReq; use crate::GetTablesAction; use crate::MetaFlightClient; +use crate::UpsertTableOptionReq; #[async_trait::async_trait] impl MetaApi for MetaFlightClient { @@ -101,13 +102,20 @@ impl MetaApi for MetaFlightClient { self.do_action(GetTableExtReq { tbl_id, tbl_ver }).await } - async fn commit_table( + async fn upsert_table_option( &self, - _table_id: MetaId, - _new_table_version: MetaVersion, - _new_snapshot_location: String, - ) -> common_exception::Result { - todo!() + table_id: MetaId, + table_version: MetaVersion, + option_key: String, + option_value: String, + ) -> common_exception::Result { + self.do_action(UpsertTableOptionReq { + table_id, + table_version, + option_key, + option_value, + }) + .await } fn name(&self) -> String { diff --git a/common/meta/raft-store/src/state_machine/sm.rs b/common/meta/raft-store/src/state_machine/sm.rs index 27f1ce735f991..a16ba8e5f2fe0 100644 --- a/common/meta/raft-store/src/state_machine/sm.rs +++ b/common/meta/raft-store/src/state_machine/sm.rs @@ -401,6 +401,7 @@ impl StateMachine { let table_id = self.incr_seq(SEQ_TABLE_ID).await?; let table = Table { table_id, + table_version: 0, table_name: table_name.to_string(), 
database_id: dbi.database_id, db_name: db_name.to_string(), diff --git a/common/meta/types/src/commit_table_reply.rs b/common/meta/types/src/commit_table_reply.rs index a7a52f68c6665..196ab053cfe81 100644 --- a/common/meta/types/src/commit_table_reply.rs +++ b/common/meta/types/src/commit_table_reply.rs @@ -13,4 +13,4 @@ // limitations under the License. // -pub type CommitTableReply = (); +pub type UpsertTableOptionReply = (); diff --git a/common/meta/types/src/lib.rs b/common/meta/types/src/lib.rs index c0449f48787be..0eec3c096c8ef 100644 --- a/common/meta/types/src/lib.rs +++ b/common/meta/types/src/lib.rs @@ -18,7 +18,7 @@ pub use cluster::Node; pub use cluster::NodeInfo; pub use cluster::Slot; pub use cmd::Cmd; -pub use commit_table_reply::CommitTableReply; +pub use commit_table_reply::UpsertTableOptionReply; pub use common_meta_sled_store::KVMeta; pub use common_meta_sled_store::KVValue; pub use common_meta_sled_store::SeqValue; diff --git a/common/meta/types/src/table_info.rs b/common/meta/types/src/table_info.rs index 205983b640498..5f062296bfd01 100644 --- a/common/meta/types/src/table_info.rs +++ b/common/meta/types/src/table_info.rs @@ -26,6 +26,8 @@ use crate::MetaVersion; pub struct Table { pub table_id: u64, + pub table_version: u64, + /// name of this table pub table_name: String, diff --git a/metasrv/src/executor/action_handler.rs b/metasrv/src/executor/action_handler.rs index 1d0a3bc2a18b2..6efacaaa59337 100644 --- a/metasrv/src/executor/action_handler.rs +++ b/metasrv/src/executor/action_handler.rs @@ -73,6 +73,7 @@ impl ActionHandler { MetaFlightAction::GetTable(a) => s.serialize(self.handle(a).await?), MetaFlightAction::GetTables(a) => s.serialize(self.handle(a).await?), MetaFlightAction::GetTableExt(a) => s.serialize(self.handle(a).await?), + MetaFlightAction::CommitTable(a) => s.serialize(self.handle(a).await?), } } } diff --git a/metasrv/src/executor/meta_handlers.rs b/metasrv/src/executor/meta_handlers.rs index a3139fec2ab41..d59835e3f4c75 
100644 --- a/metasrv/src/executor/meta_handlers.rs +++ b/metasrv/src/executor/meta_handlers.rs @@ -30,6 +30,7 @@ use common_meta_flight::GetDatabasesAction; use common_meta_flight::GetTableAction; use common_meta_flight::GetTableExtReq; use common_meta_flight::GetTablesAction; +use common_meta_flight::UpsertTableOptionReq; use common_meta_raft_store::state_machine::AppliedState; use common_meta_types::Cmd::CreateDatabase; use common_meta_types::Cmd::CreateTable; @@ -41,6 +42,7 @@ use common_meta_types::DatabaseInfo; use common_meta_types::LogEntry; use common_meta_types::Table; use common_meta_types::TableInfo; +use common_meta_types::UpsertTableOptionReply; use log::info; use crate::executor::action_handler::RequestHandler; @@ -156,6 +158,7 @@ impl RequestHandler for ActionHandler { let table = Table { table_id: 0, + table_version: 0, table_name: table_name.to_string(), database_id: 0, // this field is unused during the creation of table db_name: db_name.to_string(), @@ -277,7 +280,7 @@ impl RequestHandler for ActionHandler { let rst = TableInfo { database_id: db.database_id, table_id: table.table_id, - version: 0, // placeholder, not yet implemented in meta service + version: table.table_version, db: db_name.clone(), name: table_name.clone(), schema: Arc::new(arrow_schema.into()), @@ -348,3 +351,19 @@ impl RequestHandler for ActionHandler { Ok(res) } } +#[async_trait::async_trait] +impl RequestHandler for ActionHandler { + async fn handle( + &self, + req: UpsertTableOptionReq, + ) -> common_exception::Result { + self.meta_node + .upsert_table_opt( + req.table_id, + req.table_version, + req.option_key, + req.option_value, + ) + .await + } +} diff --git a/metasrv/src/meta_service/raftmeta.rs b/metasrv/src/meta_service/raftmeta.rs index d65bedb1874ef..fb0a9fc34723e 100644 --- a/metasrv/src/meta_service/raftmeta.rs +++ b/metasrv/src/meta_service/raftmeta.rs @@ -1027,6 +1027,35 @@ impl MetaNode { sm.get_table(tid) } + #[tracing::instrument(level = "debug", 
skip(self))] + pub async fn upsert_table_opt( + &self, + table_id: u64, + table_version: u64, + opt_key: String, + opt_value: String, + ) -> common_exception::Result<()> { + // non-consensus modification, to be fixed later + let mut sm = self.sto.state_machine.write().await; + if let Some(tbl) = sm.tables.get_mut(&table_id) { + if tbl.table_version != table_version { + Err(ErrorCode::TableVersionMissMatch(format!( + "targeting version {}, current version {}", + table_version, tbl.table_version, + ))) + } else { + tbl.table_options.insert(opt_key, opt_value); + tbl.table_version += 1; + Ok(()) + } + } else { + Err(ErrorCode::UnknownTable(format!( + "unknown table of id {}", + table_id + ))) + } + } + #[tracing::instrument(level = "debug", skip(self))] pub async fn get_kv(&self, key: &str) -> common_exception::Result>> { // inconsistent get: from local state machine diff --git a/query/src/catalogs/backends/backend.rs b/query/src/catalogs/backends/backend.rs index 2c83a3f3bfe57..8e6f7566e8cec 100644 --- a/query/src/catalogs/backends/backend.rs +++ b/query/src/catalogs/backends/backend.rs @@ -16,13 +16,13 @@ use std::sync::Arc; use common_exception::Result; -use common_meta_types::CommitTableReply; use common_meta_types::CreateDatabaseReply; use common_meta_types::CreateTableReply; use common_meta_types::DatabaseInfo; use common_meta_types::MetaId; use common_meta_types::MetaVersion; use common_meta_types::TableInfo; +use common_meta_types::UpsertTableOptionReply; use common_planners::CreateDatabasePlan; use common_planners::CreateTablePlan; use common_planners::DropDatabasePlan; @@ -55,12 +55,13 @@ pub trait MetaApiSync: Send + Sync { table_version: Option, ) -> Result>; - fn commit_table( + fn upsert_table_option( &self, table_id: MetaId, new_table_version: MetaVersion, - new_snapshot_location: String, - ) -> Result; + table_option_key: String, + table_option_value: String, + ) -> Result; fn name(&self) -> String; } diff --git
a/query/src/catalogs/backends/impls/embedded_backend.rs b/query/src/catalogs/backends/impls/embedded_backend.rs index 2acdec63f7dbc..535c2edbedaa6 100644 --- a/query/src/catalogs/backends/impls/embedded_backend.rs +++ b/query/src/catalogs/backends/impls/embedded_backend.rs @@ -18,13 +18,13 @@ use std::sync::Arc; use common_exception::ErrorCode; use common_infallible::RwLock; -use common_meta_types::CommitTableReply; use common_meta_types::CreateDatabaseReply; use common_meta_types::CreateTableReply; use common_meta_types::DatabaseInfo; use common_meta_types::MetaId; use common_meta_types::MetaVersion; use common_meta_types::TableInfo; +use common_meta_types::UpsertTableOptionReply; use common_planners::CreateDatabasePlan; use common_planners::CreateTablePlan; use common_planners::DropDatabasePlan; @@ -317,12 +317,13 @@ impl MetaApiSync for MetaEmbeddedSync { ))) } - fn commit_table( + fn upsert_table_option( &self, table_id: MetaId, - new_table_version: MetaVersion, - new_snapshot_location: String, - ) -> common_exception::Result { + table_version: MetaVersion, + table_option_name: String, + table_option_value: String, + ) -> common_exception::Result { let mut map = self.databases.write(); for (_, tbl_idx) in map.values_mut() { match tbl_idx.id2meta.get(&table_id) { @@ -330,21 +331,18 @@ impl MetaApiSync for MetaEmbeddedSync { continue; } Some(tbl) => { - if tbl.version + 1 == new_table_version { + if tbl.version == table_version { let mut new_tbl_info = tbl.as_ref().clone(); new_tbl_info .options - // TODO constant - .insert("SNAPSHOT_LOC".to_string(), new_snapshot_location); - new_tbl_info.version = new_table_version; + .insert(table_option_name, table_option_value); + new_tbl_info.version += 1; tbl_idx.insert(new_tbl_info); return Ok(()); } else { return Err(ErrorCode::CommitTableError(format!( - "none-consecutive table version: prev[{}], committing [{}], table_id {}", - tbl.version, - new_table_version, - table_id + "expecting table version: [{}], but got [{}]. 
(table_id {})", + tbl.version, table_version, table_id ))); } } diff --git a/query/src/catalogs/backends/impls/meta_cached.rs b/query/src/catalogs/backends/impls/meta_cached.rs index 28a7c45ca927f..2374985b84af4 100644 --- a/query/src/catalogs/backends/impls/meta_cached.rs +++ b/query/src/catalogs/backends/impls/meta_cached.rs @@ -19,13 +19,13 @@ use common_cache::Cache; use common_cache::LruCache; use common_exception::Result; use common_meta_api::MetaApi; -use common_meta_types::CommitTableReply; use common_meta_types::CreateDatabaseReply; use common_meta_types::CreateTableReply; use common_meta_types::DatabaseInfo; use common_meta_types::MetaId; use common_meta_types::MetaVersion; use common_meta_types::TableInfo; +use common_meta_types::UpsertTableOptionReply; use common_planners::CreateDatabasePlan; use common_planners::CreateTablePlan; use common_planners::DropDatabasePlan; @@ -104,13 +104,16 @@ impl MetaApi for MetaCached { Ok(reply) } - async fn commit_table( + async fn upsert_table_option( &self, - _table_id: MetaId, - _new_table_version: MetaVersion, - _new_snapshot_location: String, - ) -> Result { - todo!() + table_id: MetaId, + table_version: MetaVersion, + option_key: String, + option_value: String, + ) -> Result { + self.inner + .upsert_table_option(table_id, table_version, option_key, option_value) + .await } fn name(&self) -> String { diff --git a/query/src/catalogs/backends/impls/meta_remote.rs b/query/src/catalogs/backends/impls/meta_remote.rs index d09a7267319fa..722646c300e10 100644 --- a/query/src/catalogs/backends/impls/meta_remote.rs +++ b/query/src/catalogs/backends/impls/meta_remote.rs @@ -18,13 +18,13 @@ use std::time::Duration; use common_exception::Result; use common_meta_api::MetaApi; -use common_meta_types::CommitTableReply; use common_meta_types::CreateDatabaseReply; use common_meta_types::CreateTableReply; use common_meta_types::DatabaseInfo; use common_meta_types::MetaId; use common_meta_types::MetaVersion; use 
common_meta_types::TableInfo; +use common_meta_types::UpsertTableOptionReply; use common_planners::CreateDatabasePlan; use common_planners::CreateTablePlan; use common_planners::DropDatabasePlan; @@ -124,13 +124,18 @@ impl MetaApi for MetaRemote { .await } - async fn commit_table( + async fn upsert_table_option( &self, - _table_id: MetaId, - _new_table_version: MetaVersion, - _new_snapshot_location: String, - ) -> Result { - todo!() + table_id: MetaId, + table_version: MetaVersion, + option_key: String, + option_value: String, + ) -> Result { + self.query_backend(move |cli| async move { + cli.upsert_table_option(table_id, table_version, option_key, option_value) + .await + }) + .await } fn name(&self) -> String { diff --git a/query/src/catalogs/backends/impls/meta_sync.rs b/query/src/catalogs/backends/impls/meta_sync.rs index d131b706be515..731f1df57a2bc 100644 --- a/query/src/catalogs/backends/impls/meta_sync.rs +++ b/query/src/catalogs/backends/impls/meta_sync.rs @@ -19,13 +19,13 @@ use common_base::BlockingWait; use common_base::Runtime; use common_exception::Result; use common_meta_api::MetaApi; -use common_meta_types::CommitTableReply; use common_meta_types::CreateDatabaseReply; use common_meta_types::CreateTableReply; use common_meta_types::DatabaseInfo; use common_meta_types::MetaId; use common_meta_types::MetaVersion; use common_meta_types::TableInfo; +use common_meta_types::UpsertTableOptionReply; use common_planners::CreateDatabasePlan; use common_planners::CreateTablePlan; use common_planners::DropDatabasePlan; @@ -108,16 +108,22 @@ impl MetaApiSync for MetaSync { .wait_in(&self.rt, self.timeout)? 
} - fn commit_table( + fn upsert_table_option( &self, table_id: MetaId, - new_table_version: MetaVersion, - new_snapshot_location: String, - ) -> Result { + table_version: MetaVersion, + table_option_key: String, + table_option_value: String, + ) -> Result { let x = self.inner.clone(); (async move { - x.commit_table(table_id, new_table_version, new_snapshot_location) - .await + x.upsert_table_option( + table_id, + table_version, + table_option_key, + table_option_value, + ) + .await }) .wait_in(&self.rt, self.timeout)? } diff --git a/query/src/catalogs/backends/impls/remote_backend.rs b/query/src/catalogs/backends/impls/remote_backend.rs index c0042b3a94954..f61fe4376d6b7 100644 --- a/query/src/catalogs/backends/impls/remote_backend.rs +++ b/query/src/catalogs/backends/impls/remote_backend.rs @@ -18,13 +18,13 @@ use std::sync::Arc; use std::time::Duration; use common_exception::Result; -use common_meta_types::CommitTableReply; use common_meta_types::CreateDatabaseReply; use common_meta_types::CreateTableReply; use common_meta_types::DatabaseInfo; use common_meta_types::MetaId; use common_meta_types::MetaVersion; use common_meta_types::TableInfo; +use common_meta_types::UpsertTableOptionReply; use common_planners::CreateDatabasePlan; use common_planners::CreateTablePlan; use common_planners::DropDatabasePlan; @@ -113,14 +113,19 @@ impl + Send + Sync> MetaApiSync for U { self.deref().get_table_by_id(table_id, table_version) } - fn commit_table( + fn upsert_table_option( &self, table_id: MetaId, - new_table_version: MetaVersion, - new_snapshot_location: String, - ) -> Result { - self.deref() - .commit_table(table_id, new_table_version, new_snapshot_location) + table_version: MetaVersion, + table_option_key: String, + table_option_value: String, + ) -> Result { + self.deref().upsert_table_option( + table_id, + table_version, + table_option_key, + table_option_value, + ) } fn name(&self) -> String { diff --git a/query/src/catalogs/catalog.rs 
b/query/src/catalogs/catalog.rs index 77588bd23dcf2..8fc2dc3275952 100644 --- a/query/src/catalogs/catalog.rs +++ b/query/src/catalogs/catalog.rs @@ -15,10 +15,10 @@ use std::sync::Arc; use common_exception::Result; -use common_meta_types::CommitTableReply; use common_meta_types::CreateDatabaseReply; use common_meta_types::MetaId; use common_meta_types::MetaVersion; +use common_meta_types::UpsertTableOptionReply; use common_planners::CreateDatabasePlan; use common_planners::CreateTablePlan; use common_planners::DropDatabasePlan; @@ -65,12 +65,13 @@ pub trait Catalog { unimplemented!() } - fn commit_table( + fn upsert_table_option( &self, table_id: MetaId, - new_table_version: MetaVersion, - new_snapshot_location: String, - ) -> common_exception::Result; + table_version: MetaVersion, + table_option_key: String, + table_option_value: String, + ) -> common_exception::Result; // Operation with database. fn create_database(&self, plan: CreateDatabasePlan) -> Result; diff --git a/query/src/catalogs/impls/catalog/metastore_catalog.rs b/query/src/catalogs/impls/catalog/metastore_catalog.rs index 7c9b9b78e1ac1..c9ec7ff951e51 100644 --- a/query/src/catalogs/impls/catalog/metastore_catalog.rs +++ b/query/src/catalogs/impls/catalog/metastore_catalog.rs @@ -21,12 +21,12 @@ use common_dal::InMemoryData; use common_exception::ErrorCode; use common_exception::Result; use common_infallible::RwLock; -use common_meta_types::CommitTableReply; use common_meta_types::CreateDatabaseReply; use common_meta_types::DatabaseInfo; use common_meta_types::MetaId; use common_meta_types::MetaVersion; use common_meta_types::TableInfo; +use common_meta_types::UpsertTableOptionReply; use common_planners::CreateDatabasePlan; use common_planners::CreateTablePlan; use common_planners::DropDatabasePlan; @@ -179,14 +179,19 @@ impl Catalog for MetaStoreCatalog { self.build_table_instance(table_info.as_ref().clone()) } - fn commit_table( + fn upsert_table_option( &self, table_id: MetaId, - 
new_table_version: MetaVersion, - new_snapshot_location: String, - ) -> Result { - self.meta - .commit_table(table_id, new_table_version, new_snapshot_location) + table_version: MetaVersion, + table_option_key: String, + table_option_value: String, + ) -> Result { + self.meta.upsert_table_option( + table_id, + table_version, + table_option_key, + table_option_value, + ) } fn create_table(&self, plan: CreateTablePlan) -> common_exception::Result<()> { diff --git a/query/src/catalogs/impls/catalog/overlaid_catalog.rs b/query/src/catalogs/impls/catalog/overlaid_catalog.rs index f50807e21050c..807a46c018c36 100644 --- a/query/src/catalogs/impls/catalog/overlaid_catalog.rs +++ b/query/src/catalogs/impls/catalog/overlaid_catalog.rs @@ -17,10 +17,10 @@ use std::collections::HashMap; use std::sync::Arc; use common_exception::ErrorCode; -use common_meta_types::CommitTableReply; use common_meta_types::CreateDatabaseReply; use common_meta_types::MetaId; use common_meta_types::MetaVersion; +use common_meta_types::UpsertTableOptionReply; use common_planners::CreateDatabasePlan; use common_planners::CreateTablePlan; use common_planners::DropDatabasePlan; @@ -147,15 +147,20 @@ impl Catalog for OverlaidCatalog { Ok(func) } - fn commit_table( + fn upsert_table_option( &self, table_id: MetaId, - new_table_version: MetaVersion, - new_snapshot_location: String, - ) -> common_exception::Result { - // commit table in BOTTOM layer only - self.bottom - .commit_table(table_id, new_table_version, new_snapshot_location) + table_version: MetaVersion, + table_option_key: String, + table_option_value: String, + ) -> common_exception::Result { + // upsert table option in BOTTOM layer only + self.bottom.upsert_table_option( + table_id, + table_version, + table_option_key, + table_option_value, + ) } fn create_database( diff --git a/query/src/catalogs/impls/catalog/system_catalog.rs b/query/src/catalogs/impls/catalog/system_catalog.rs index a77993e70eb85..1b663f706c6b1 100644 --- 
a/query/src/catalogs/impls/catalog/system_catalog.rs +++ b/query/src/catalogs/impls/catalog/system_catalog.rs @@ -17,10 +17,10 @@ use std::sync::Arc; use common_exception::ErrorCode; use common_exception::Result; -use common_meta_types::CommitTableReply; use common_meta_types::CreateDatabaseReply; use common_meta_types::MetaId; use common_meta_types::MetaVersion; +use common_meta_types::UpsertTableOptionReply; use common_planners::CreateDatabasePlan; use common_planners::CreateTablePlan; use common_planners::DropDatabasePlan; @@ -134,12 +134,13 @@ impl Catalog for SystemCatalog { Ok(table.clone()) } - fn commit_table( + fn upsert_table_option( &self, table_id: MetaId, - _new_table_version: MetaVersion, - _new_snapshot_location: String, - ) -> Result { + _table_version: MetaVersion, + _key: String, + _value: String, + ) -> Result { Err(ErrorCode::UnImplement(format!( "commit table not allowed for system catalog {}", table_id diff --git a/query/src/datasources/table/fuse/io/block_reader.rs b/query/src/datasources/table/fuse/io/block_reader.rs index c39aca198bcf7..1180f1dea6c21 100644 --- a/query/src/datasources/table/fuse/io/block_reader.rs +++ b/query/src/datasources/table/fuse/io/block_reader.rs @@ -56,6 +56,7 @@ mod cache_keys { pub type BlockMetaCache = Arc>>>; } +// TODO can we return a stream of DataBlock instead? 
pub async fn do_read( part: Part, data_accessor: Arc, @@ -99,8 +100,9 @@ pub async fn do_read( }); // TODO configuration of the buffer size - let n = std::cmp::min(10, col_num); - let data_cols = stream.buffer_unordered(n).try_collect().await?; + let buffer_size = 10; + let n = std::cmp::min(buffer_size, col_num); + let data_cols = stream.buffered(n).try_collect().await?; let block = DataBlock::create(Arc::new(DataSchema::from(arrow_schema)), data_cols); Ok(block) diff --git a/query/src/datasources/table/fuse/table_do_append.rs b/query/src/datasources/table/fuse/table_do_append.rs index d3bc6ba2ec4c1..d93d36de304c0 100644 --- a/query/src/datasources/table/fuse/table_do_append.rs +++ b/query/src/datasources/table/fuse/table_do_append.rs @@ -26,6 +26,7 @@ use common_planners::InsertIntoPlan; use uuid::Uuid; use crate::datasources::table::fuse::util; +use crate::datasources::table::fuse::util::TBL_OPT_KEY_SNAPSHOT_LOC; use crate::datasources::table::fuse::BlockAppender; use crate::datasources::table::fuse::FuseTable; use crate::datasources::table::fuse::SegmentInfo; @@ -79,7 +80,7 @@ impl FuseTable { // 5. commit let table_id = insert_plan.tbl_id; - commit(&io_ctx, table_id, self.table_info.version + 1, snapshot_loc)?; + commit(&io_ctx, table_id, self.table_info.version, snapshot_loc)?; } Ok(()) } @@ -109,7 +110,7 @@ fn merge_snapshot( fn commit( io_ctx: &TableIOContext, table_id: MetaId, - new_table_version: MetaVersion, + table_version: MetaVersion, new_snapshot_location: String, ) -> Result<()> { use crate::catalogs::Catalog; @@ -117,5 +118,10 @@ fn commit( .get_user_data()? 
.expect("DatabendQueryContext should not be None"); let catalog = ctx.get_catalog(); - catalog.commit_table(table_id, new_table_version, new_snapshot_location) + catalog.upsert_table_option( + table_id, + table_version, + TBL_OPT_KEY_SNAPSHOT_LOC.to_string(), + new_snapshot_location, + ) } diff --git a/query/src/datasources/table/fuse/table_do_truncate.rs b/query/src/datasources/table/fuse/table_do_truncate.rs index 2a53780049c47..bcb17771ebbf5 100644 --- a/query/src/datasources/table/fuse/table_do_truncate.rs +++ b/query/src/datasources/table/fuse/table_do_truncate.rs @@ -24,6 +24,7 @@ use uuid::Uuid; use crate::catalogs::Catalog; use crate::catalogs::Table; use crate::datasources::table::fuse::util; +use crate::datasources::table::fuse::util::TBL_OPT_KEY_SNAPSHOT_LOC; use crate::datasources::table::fuse::FuseTable; use crate::sessions::DatabendQueryContext; @@ -52,8 +53,13 @@ impl FuseTable { let catalog = ctx.get_catalog(); let table_id = self.get_id(); - let new_table_version = self.table_info.version + 1; - catalog.commit_table(table_id, new_table_version, new_snapshot_loc)? + // TODO backoff retry + catalog.upsert_table_option( + table_id, + self.table_info.version, + TBL_OPT_KEY_SNAPSHOT_LOC.to_string(), + new_snapshot_loc, + )? 
} Ok(()) diff --git a/query/src/datasources/table/fuse/util/col_encoding.rs b/query/src/datasources/table/fuse/util/col_encoding.rs index 21d8871a5f164..fcae70a7b690d 100644 --- a/query/src/datasources/table/fuse/util/col_encoding.rs +++ b/query/src/datasources/table/fuse/util/col_encoding.rs @@ -35,12 +35,18 @@ use common_arrow::parquet::encoding::Encoding; /// ) /// } /// ~~~ -pub fn col_encoding(data_type: &ArrowDataType) -> Encoding { - match data_type { - ArrowDataType::Binary - | ArrowDataType::LargeBinary - | ArrowDataType::Utf8 - | ArrowDataType::LargeUtf8 => Encoding::DeltaLengthByteArray, - _ => Encoding::Plain, - } +/// +/// +pub fn col_encoding(_data_type: &ArrowDataType) -> Encoding { + // Although encoding does work, parquet2 has not implemented decoding of DeltaLengthByteArray yet, we fallback to Plain + // From parquet2: Decoding "DeltaLengthByteArray"-encoded required V2 pages is not yet implemented for Binary. + // + //match data_type { + // ArrowDataType::Binary + // | ArrowDataType::LargeBinary + // | ArrowDataType::Utf8 + // | ArrowDataType::LargeUtf8 => Encoding::DeltaLengthByteArray, + // _ => Encoding::Plain, + //} + Encoding::Plain } diff --git a/scripts/ci/ci-run-stateless-tests-cluster.sh b/scripts/ci/ci-run-stateless-tests-cluster.sh index 78e1e10b6e877..f0faa9a4770e6 100755 --- a/scripts/ci/ci-run-stateless-tests-cluster.sh +++ b/scripts/ci/ci-run-stateless-tests-cluster.sh @@ -9,4 +9,4 @@ SCRIPT_PATH="$(cd "$(dirname "$0")" >/dev/null 2>&1 && pwd)" cd "$SCRIPT_PATH/../../tests" || exit echo "Starting databend-test" -./databend-test '^0[^4]_' --mode 'cluster' --skip '^09_*' +./databend-test '^0[^4]_' --mode 'cluster' diff --git a/scripts/ci/ci-run-stateless-tests-standalone.sh b/scripts/ci/ci-run-stateless-tests-standalone.sh index b82989f8d7979..2670aadecf734 100755 --- a/scripts/ci/ci-run-stateless-tests-standalone.sh +++ b/scripts/ci/ci-run-stateless-tests-standalone.sh @@ -9,4 +9,4 @@ SCRIPT_PATH="$(cd "$(dirname "$0")" >/dev/null 
2>&1 && pwd)" cd "$SCRIPT_PATH/../../tests" || exit echo "Starting databend-test" -./databend-test --mode 'standalone' --skip '^09_*' +./databend-test --mode 'standalone' diff --git a/scripts/deploy/config/databend-query-node-1.toml b/scripts/deploy/config/databend-query-node-1.toml index 9236b4d710f1b..c0425fcb2d831 100644 --- a/scripts/deploy/config/databend-query-node-1.toml +++ b/scripts/deploy/config/databend-query-node-1.toml @@ -40,10 +40,11 @@ meta_client_timeout_in_second = 60 # Storage config. [storage] # disk|s3 -storage_type = "" +storage_type = "disk" # DISK storage. [storage.disk] +data_path = "stateless_test_data" # S3 storage. [storage.s3] diff --git a/scripts/deploy/config/databend-query-node-2.toml b/scripts/deploy/config/databend-query-node-2.toml index 1defc865f2c8f..511e83ba3e6c0 100644 --- a/scripts/deploy/config/databend-query-node-2.toml +++ b/scripts/deploy/config/databend-query-node-2.toml @@ -38,10 +38,11 @@ meta_client_timeout_in_second = 60 # Storage config. [storage] # disk|s3 -storage_type = "" +storage_type = "disk" # DISK storage. [storage.disk] +data_path = "stateless_test_data" # S3 storage. [storage.s3] diff --git a/scripts/deploy/config/databend-query-node-3.toml b/scripts/deploy/config/databend-query-node-3.toml index ba0f9abd08cc5..51bac9f192a88 100644 --- a/scripts/deploy/config/databend-query-node-3.toml +++ b/scripts/deploy/config/databend-query-node-3.toml @@ -38,10 +38,11 @@ meta_client_timeout_in_second = 60 # Storage config. [storage] # disk|s3 -storage_type = "" +storage_type = "disk" # DISK storage. [storage.disk] +data_path = "stateless_test_data" # S3 storage. 
[storage.s3] diff --git a/tests/suites/0_stateless/09_0000_remote_create_table.result b/tests/suites/0_stateless/09_0000_remote_create_table.result index 7f78c55b7ff89..dc54193862615 100644 --- a/tests/suites/0_stateless/09_0000_remote_create_table.result +++ b/tests/suites/0_stateless/09_0000_remote_create_table.result @@ -1 +1 @@ -db1 t1 remote +db1 t1 fuse diff --git a/tests/suites/0_stateless/09_0000_remote_create_table.sql b/tests/suites/0_stateless/09_0000_remote_create_table.sql index 79922b0d6c9d3..e3cc30e7331ef 100644 --- a/tests/suites/0_stateless/09_0000_remote_create_table.sql +++ b/tests/suites/0_stateless/09_0000_remote_create_table.sql @@ -3,7 +3,7 @@ DROP DATABASE IF EXISTS db1; CREATE DATABASE db1; USE db1; -CREATE TABLE IF NOT EXISTS t1(a int, b varchar) Engine = remote; +CREATE TABLE IF NOT EXISTS t1(a int, b varchar) Engine = fuse; SELECT * FROM system.tables WHERE database='db1'; DROP TABLE t1; diff --git a/tests/suites/0_stateless/09_0001_remote_insert.sql b/tests/suites/0_stateless/09_0001_remote_insert.sql index 72bf3030aa7f4..c514ebd666c0a 100644 --- a/tests/suites/0_stateless/09_0001_remote_insert.sql +++ b/tests/suites/0_stateless/09_0001_remote_insert.sql @@ -2,13 +2,13 @@ DROP DATABASE IF EXISTS db1; CREATE DATABASE db1; USE db1; -CREATE TABLE IF NOT EXISTS t1(a UInt32, b UInt64, c String) Engine = remote ; +CREATE TABLE IF NOT EXISTS t1(a UInt32, b UInt64, c String) Engine = fuse; INSERT INTO t1 (a,b,c) values ( 1, 1, '1' ), (2, 2, '"2"-"2"'); SELECT * FROM t1; SELECT sum(a) from t1; -CREATE TABLE IF NOT EXISTS t2(a Boolean, b Timestamp, c Date) Engine = remote; +CREATE TABLE IF NOT EXISTS t2(a Boolean, b Timestamp, c Date) Engine = fuse; INSERT INTO t2 (a,b,c) values(true, '2021-09-07 21:38:35', '2021-09-07'), (false, 1631050715, 18877); SELECT * FROM t2; diff --git a/tests/suites/0_stateless/09_0002_remote_truncate_table.sql b/tests/suites/0_stateless/09_0002_remote_truncate_table.sql index 71703746f61da..de8e69ba98680 100644 --- 
a/tests/suites/0_stateless/09_0002_remote_truncate_table.sql +++ b/tests/suites/0_stateless/09_0002_remote_truncate_table.sql @@ -2,7 +2,7 @@ DROP DATABASE IF EXISTS db1; CREATE DATABASE db1; USE db1; -CREATE TABLE IF NOT EXISTS t(a varchar, b varchar) Engine = remote; +CREATE TABLE IF NOT EXISTS t(a varchar, b varchar) Engine = fuse; INSERT INTO t(a,b) VALUES('1', 'v1'),('2','v2'); SELECT * FROM t; TRUNCATE TABLE t; diff --git a/tests/suites/0_stateless/09_0003_remote_drop_table.sql b/tests/suites/0_stateless/09_0003_remote_drop_table.sql index 827fb0accd3be..7fedcb433eeb0 100644 --- a/tests/suites/0_stateless/09_0003_remote_drop_table.sql +++ b/tests/suites/0_stateless/09_0003_remote_drop_table.sql @@ -2,16 +2,16 @@ DROP DATABASE IF EXISTS db1; CREATE DATABASE db1; USE db1; -CREATE TABLE IF NOT EXISTS t(a varchar, b varchar) Engine = remote; +CREATE TABLE IF NOT EXISTS t(a varchar, b varchar) Engine = fuse; INSERT INTO t(a,b) VALUES('1', 'v1'),('2','v2'); SELECT * FROM t; -CREATE TABLE IF NOT EXISTS t2(a varchar, b varchar) Engine = remote; +CREATE TABLE IF NOT EXISTS t2(a varchar, b varchar) Engine = fuse; INSERT INTO t2(a,b) VALUES('t2_1', 't2_v1'),('t2_2','t2_v2'); SELECT * FROM t2; DROP TABLE t; -CREATE TABLE IF NOT EXISTS t(a varchar, b varchar) Engine = remote; +CREATE TABLE IF NOT EXISTS t(a varchar, b varchar) Engine = fuse; SELECT * FROM t; SELECT * FROM t2; @@ -20,7 +20,7 @@ DROP DATABASE db1; CREATE DATABASE db1; USE db1; -CREATE TABLE IF NOT EXISTS t2(a varchar, b varchar) Engine = remote; +CREATE TABLE IF NOT EXISTS t2(a varchar, b varchar) Engine = fuse; SELECT * FROM t2; DROP DATABASE IF EXISTS db1; From b8b128ee3fdc2d97309f82c3e93ccdffbd333b21 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Thu, 21 Oct 2021 12:15:45 +0800 Subject: [PATCH 2/2] add DataType::Date32 for `try_into_data_array` --- common/datavalues/src/data_value_ops.rs | 3 +++ common/datavalues/src/macros.rs | 5 +---- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git
a/common/datavalues/src/data_value_ops.rs b/common/datavalues/src/data_value_ops.rs index cc2ae70d8ae92..04475c5ff1b16 100644 --- a/common/datavalues/src/data_value_ops.rs +++ b/common/datavalues/src/data_value_ops.rs @@ -88,6 +88,9 @@ impl DataValue { DataType::Date16 => { try_build_array! {PrimitiveArrayBuilder, u16, UInt16, values} } + DataType::Date32 => { + try_build_array! {PrimitiveArrayBuilder, i32, Int32, values} + } DataType::DateTime32(_) => { try_build_array! {PrimitiveArrayBuilder, u32, UInt32, values} } diff --git a/common/datavalues/src/macros.rs b/common/datavalues/src/macros.rs index 8bfdf7c01c9f2..2fa05cdadd7bd 100644 --- a/common/datavalues/src/macros.rs +++ b/common/datavalues/src/macros.rs @@ -186,10 +186,7 @@ macro_rules! try_build_array { match value { DataValue::$SCALAR_TY(Some(v)) => builder.append_value(*v), DataValue::$SCALAR_TY(None) => builder.append_null(), - dv => { - eprintln!("DV is {:?}", dv); - unreachable!() - } + _ => unreachable!(), } } Ok(builder.finish().into_series())