Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ISSUE-2361: remote backend for fuse table #2355

Merged
merged 2 commits into from
Oct 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions common/datavalues/src/data_value_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,15 @@ impl DataValue {
}
DataType::Boolean => try_build_array! {values},
DataType::String => try_build_array! {String, values},
DataType::Date16 => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Miss Date32

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it

try_build_array! {PrimitiveArrayBuilder, u16, UInt16, values}
}
DataType::Date32 => {
try_build_array! {PrimitiveArrayBuilder, i32, Int32, values}
}
DataType::DateTime32(_) => {
try_build_array! {PrimitiveArrayBuilder, u32, UInt32, values}
}
other => Result::Err(ErrorCode::BadDataValueType(format!(
"Unexpected type:{} for DataValue List",
other
Expand Down
1 change: 1 addition & 0 deletions common/exception/src/exception.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ build_exceptions! {

ConcurrentSnapshotInstall(2404),
IllegalSnapshot(2405),
TableVersionMissMatch(2406),

// KVSrv server error

Expand Down
11 changes: 6 additions & 5 deletions common/meta/api/src/meta_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
use std::sync::Arc;

use common_exception::Result;
use common_meta_types::CommitTableReply;
use common_meta_types::CreateDatabaseReply;
use common_meta_types::CreateTableReply;
use common_meta_types::DatabaseInfo;
use common_meta_types::MetaId;
use common_meta_types::MetaVersion;
use common_meta_types::TableInfo;
use common_meta_types::UpsertTableOptionReply;
use common_planners::CreateDatabasePlan;
use common_planners::CreateTablePlan;
use common_planners::DropDatabasePlan;
Expand Down Expand Up @@ -56,12 +56,13 @@ pub trait MetaApi: Send + Sync {
table_version: Option<MetaVersion>,
) -> Result<Arc<TableInfo>>;

async fn commit_table(
async fn upsert_table_option(
&self,
table_id: MetaId,
new_table_version: MetaVersion,
new_snapshot_location: String,
) -> Result<CommitTableReply>;
table_version: MetaVersion,
option_key: String,
option_value: String,
) -> Result<UpsertTableOptionReply>;
Comment on lines +59 to +65
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just define a upsert_table, to update an entire table instance?

An update of a single field is only useful with non-serialized concurrency control. E.g. two concurrent commutative update operations to a single table.

As MetaVersion must be matched, it is serialized. Updating the entire table simplifies the backend impl.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right, that is more reasonable. let's fix this in latter prs

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then everything else looks good enough to me.


fn name(&self) -> String;
}
12 changes: 7 additions & 5 deletions common/meta/embedded/src/meta_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ use common_exception::Result;
use common_meta_api::MetaApi;
use common_meta_raft_store::state_machine::AppliedState;
use common_meta_types::Cmd;
use common_meta_types::CommitTableReply;
use common_meta_types::CreateDatabaseReply;
use common_meta_types::CreateTableReply;
use common_meta_types::DatabaseInfo;
use common_meta_types::MetaId;
use common_meta_types::MetaVersion;
use common_meta_types::Table;
use common_meta_types::TableInfo;
use common_meta_types::UpsertTableOptionReply;
use common_planners::CreateDatabasePlan;
use common_planners::CreateTablePlan;
use common_planners::DropDatabasePlan;
Expand Down Expand Up @@ -123,6 +123,7 @@ impl MetaApi for MetaEmbedded {

let table = Table {
table_id: 0,
table_version: 0,
table_name: table_name.to_string(),
database_id: 0, // this field is unused during the creation of table
db_name: db_name.to_string(),
Expand Down Expand Up @@ -260,12 +261,13 @@ impl MetaApi for MetaEmbedded {
Ok(Arc::new(rst))
}

async fn commit_table(
async fn upsert_table_option(
&self,
_table_id: MetaId,
_new_table_version: MetaVersion,
_new_snapshot_location: String,
) -> Result<CommitTableReply> {
_table_version: MetaVersion,
_option_key: String,
_option_value: String,
) -> Result<UpsertTableOptionReply> {
todo!()
}

Expand Down
15 changes: 15 additions & 0 deletions common/meta/flight/src/flight_action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use common_meta_types::MetaVersion;
use common_meta_types::PrefixListReply;
use common_meta_types::TableInfo;
use common_meta_types::UpsertKVActionReply;
use common_meta_types::UpsertTableOptionReply;
use common_planners::CreateDatabasePlan;
use common_planners::CreateTablePlan;
use common_planners::DropDatabasePlan;
Expand Down Expand Up @@ -71,6 +72,7 @@ pub enum MetaFlightAction {
GetTableExt(GetTableExtReq),
GetTables(GetTablesAction),
GetDatabases(GetDatabasesAction),
CommitTable(UpsertTableOptionReq),

// general purpose kv
UpsertKV(UpsertKVAction),
Expand Down Expand Up @@ -254,6 +256,19 @@ action_declare!(
MetaFlightAction::GetTableExt
);

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)]
pub struct UpsertTableOptionReq {
pub table_id: MetaId,
pub table_version: MetaVersion,
pub option_key: String,
pub option_value: String,
}
action_declare!(
UpsertTableOptionReq,
UpsertTableOptionReply,
MetaFlightAction::CommitTable
);

// - get tables
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)]
pub struct GetTablesAction {
Expand Down
22 changes: 15 additions & 7 deletions common/meta/flight/src/impls/meta_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
use std::sync::Arc;

use common_meta_api::MetaApi;
use common_meta_types::CommitTableReply;
use common_meta_types::CreateDatabaseReply;
use common_meta_types::CreateTableReply;
use common_meta_types::DatabaseInfo;
use common_meta_types::MetaId;
use common_meta_types::MetaVersion;
use common_meta_types::TableInfo;
use common_meta_types::UpsertTableOptionReply;
use common_planners::CreateDatabasePlan;
use common_planners::CreateTablePlan;
use common_planners::DropDatabasePlan;
Expand All @@ -38,6 +38,7 @@ use crate::GetTableAction;
use crate::GetTableExtReq;
use crate::GetTablesAction;
use crate::MetaFlightClient;
use crate::UpsertTableOptionReq;

#[async_trait::async_trait]
impl MetaApi for MetaFlightClient {
Expand Down Expand Up @@ -101,13 +102,20 @@ impl MetaApi for MetaFlightClient {
self.do_action(GetTableExtReq { tbl_id, tbl_ver }).await
}

async fn commit_table(
async fn upsert_table_option(
&self,
_table_id: MetaId,
_new_table_version: MetaVersion,
_new_snapshot_location: String,
) -> common_exception::Result<CommitTableReply> {
todo!()
table_id: MetaId,
table_version: MetaVersion,
option_key: String,
option_value: String,
) -> common_exception::Result<UpsertTableOptionReply> {
self.do_action(UpsertTableOptionReq {
table_id,
table_version,
option_key,
option_value,
})
.await
}

fn name(&self) -> String {
Expand Down
1 change: 1 addition & 0 deletions common/meta/raft-store/src/state_machine/sm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ impl StateMachine {
let table_id = self.incr_seq(SEQ_TABLE_ID).await?;
let table = Table {
table_id,
table_version: 0,
table_name: table_name.to_string(),
database_id: dbi.database_id,
db_name: db_name.to_string(),
Expand Down
2 changes: 1 addition & 1 deletion common/meta/types/src/commit_table_reply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
// limitations under the License.
//

pub type CommitTableReply = ();
pub type UpsertTableOptionReply = ();
2 changes: 1 addition & 1 deletion common/meta/types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub use cluster::Node;
pub use cluster::NodeInfo;
pub use cluster::Slot;
pub use cmd::Cmd;
pub use commit_table_reply::CommitTableReply;
pub use commit_table_reply::UpsertTableOptionReply;
pub use common_meta_sled_store::KVMeta;
pub use common_meta_sled_store::KVValue;
pub use common_meta_sled_store::SeqValue;
Expand Down
2 changes: 2 additions & 0 deletions common/meta/types/src/table_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ use crate::MetaVersion;
pub struct Table {
pub table_id: u64,

pub table_version: u64,

/// name of this table
pub table_name: String,

Expand Down
1 change: 1 addition & 0 deletions metasrv/src/executor/action_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ impl ActionHandler {
MetaFlightAction::GetTable(a) => s.serialize(self.handle(a).await?),
MetaFlightAction::GetTables(a) => s.serialize(self.handle(a).await?),
MetaFlightAction::GetTableExt(a) => s.serialize(self.handle(a).await?),
MetaFlightAction::CommitTable(a) => s.serialize(self.handle(a).await?),
}
}
}
21 changes: 20 additions & 1 deletion metasrv/src/executor/meta_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use common_meta_flight::GetDatabasesAction;
use common_meta_flight::GetTableAction;
use common_meta_flight::GetTableExtReq;
use common_meta_flight::GetTablesAction;
use common_meta_flight::UpsertTableOptionReq;
use common_meta_raft_store::state_machine::AppliedState;
use common_meta_types::Cmd::CreateDatabase;
use common_meta_types::Cmd::CreateTable;
Expand All @@ -41,6 +42,7 @@ use common_meta_types::DatabaseInfo;
use common_meta_types::LogEntry;
use common_meta_types::Table;
use common_meta_types::TableInfo;
use common_meta_types::UpsertTableOptionReply;
use log::info;

use crate::executor::action_handler::RequestHandler;
Expand Down Expand Up @@ -156,6 +158,7 @@ impl RequestHandler<CreateTableAction> for ActionHandler {

let table = Table {
table_id: 0,
table_version: 0,
table_name: table_name.to_string(),
database_id: 0, // this field is unused during the creation of table
db_name: db_name.to_string(),
Expand Down Expand Up @@ -277,7 +280,7 @@ impl RequestHandler<GetTableAction> for ActionHandler {
let rst = TableInfo {
database_id: db.database_id,
table_id: table.table_id,
version: 0, // placeholder, not yet implemented in meta service
version: table.table_version,
db: db_name.clone(),
name: table_name.clone(),
schema: Arc::new(arrow_schema.into()),
Expand Down Expand Up @@ -348,3 +351,19 @@ impl RequestHandler<GetTablesAction> for ActionHandler {
Ok(res)
}
}
#[async_trait::async_trait]
impl RequestHandler<UpsertTableOptionReq> for ActionHandler {
async fn handle(
&self,
req: UpsertTableOptionReq,
) -> common_exception::Result<UpsertTableOptionReply> {
self.meta_node
.upsert_table_opt(
req.table_id,
req.table_version,
req.option_key,
req.option_value,
)
.await
}
}
29 changes: 29 additions & 0 deletions metasrv/src/meta_service/raftmeta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1027,6 +1027,35 @@ impl MetaNode {
sm.get_table(tid)
}

#[tracing::instrument(level = "debug", skip(self))]
pub async fn upsert_table_opt(
&self,
table_id: u64,
table_version: u64,
opt_key: String,
opt_value: String,
) -> common_exception::Result<()> {
// non-consensus modification, tobe fixed latter
let mut sm = self.sto.state_machine.write().await;
if let Some(tbl) = sm.tables.get_mut(&table_id) {
if tbl.table_version != table_version {
Err(ErrorCode::TableVersionMissMatch(format!(
"targeting version {}, current version {}",
table_version, tbl.table_version,
)))
} else {
tbl.table_options.insert(opt_key, opt_value);
tbl.table_version += 1;
Ok(())
}
} else {
Err(ErrorCode::UnknownTable(format!(
"unknown table of id {}",
table_id
)))
}
}

#[tracing::instrument(level = "debug", skip(self))]
pub async fn get_kv(&self, key: &str) -> common_exception::Result<Option<SeqValue<KVValue>>> {
// inconsistent get: from local state machine
Expand Down
9 changes: 5 additions & 4 deletions query/src/catalogs/backends/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
use std::sync::Arc;

use common_exception::Result;
use common_meta_types::CommitTableReply;
use common_meta_types::CreateDatabaseReply;
use common_meta_types::CreateTableReply;
use common_meta_types::DatabaseInfo;
use common_meta_types::MetaId;
use common_meta_types::MetaVersion;
use common_meta_types::TableInfo;
use common_meta_types::UpsertTableOptionReply;
use common_planners::CreateDatabasePlan;
use common_planners::CreateTablePlan;
use common_planners::DropDatabasePlan;
Expand Down Expand Up @@ -55,12 +55,13 @@ pub trait MetaApiSync: Send + Sync {
table_version: Option<MetaVersion>,
) -> Result<Arc<TableInfo>>;

fn commit_table(
fn upsert_table_option(
&self,
table_id: MetaId,
new_table_version: MetaVersion,
new_snapshot_location: String,
) -> Result<CommitTableReply>;
table_option_key: String,
table_option_value: String,
) -> Result<UpsertTableOptionReply>;

fn name(&self) -> String;
}
Loading