Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support table options #1044

Merged
merged 5 commits into from
Feb 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions src/catalog/src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use table::engine::{EngineContext, TableEngineRef};
use table::metadata::{TableId, TableInfoRef};
use table::requests::{CreateTableRequest, DeleteRequest, InsertRequest, OpenTableRequest};
use table::requests::{
CreateTableRequest, DeleteRequest, InsertRequest, OpenTableRequest, TableOptions,
};
use table::{Table, TableRef};

use crate::error::{
Expand Down Expand Up @@ -109,7 +111,7 @@ impl SystemCatalogTable {
region_numbers: vec![0],
primary_key_indices: vec![ENTRY_TYPE_INDEX, KEY_INDEX],
create_if_not_exists: true,
table_options: HashMap::new(),
table_options: TableOptions::default(),
};

let table = engine
Expand Down
1 change: 1 addition & 0 deletions src/catalog/tests/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ impl TableEngine for MockTableEngine {
let table_id = TableId::from_str(
request
.table_options
.extra_options
.get("table_id")
.unwrap_or(&default_table_id),
)
Expand Down
10 changes: 7 additions & 3 deletions src/common/grpc-expr/src/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use datatypes::schema::{ColumnSchema, RawSchema};
use snafu::{ensure, OptionExt, ResultExt};
use table::metadata::TableId;
use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest, CreateTableRequest};
use table::requests::{
AddColumnRequest, AlterKind, AlterTableRequest, CreateTableRequest, TableOptions,
};

use crate::error::{
ColumnNotFoundSnafu, InvalidColumnDefSnafu, MissingFieldSnafu, MissingTimestampColumnSnafu,
Result,
Result, UnrecognizedTableOptionSnafu,
};

/// Convert an [`AlterExpr`] to an [`AlterTableRequest`]
Expand Down Expand Up @@ -163,6 +165,8 @@ pub fn create_expr_to_request(
expr.region_ids
};

let table_options =
TableOptions::try_from(&expr.table_options).context(UnrecognizedTableOptionSnafu)?;
Ok(CreateTableRequest {
id: table_id,
catalog_name,
Expand All @@ -173,7 +177,7 @@ pub fn create_expr_to_request(
region_numbers: region_ids,
primary_key_indices,
create_if_not_exists: expr.create_if_not_exists,
table_options: expr.table_options,
table_options,
})
}

Expand Down
7 changes: 7 additions & 0 deletions src/common/grpc-expr/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ pub enum Error {
#[snafu(backtrace)]
source: api::error::Error,
},

#[snafu(display("Unrecognized table option: {}", source))]
UnrecognizedTableOption {
MichaelScofield marked this conversation as resolved.
Show resolved Hide resolved
#[snafu(backtrace)]
source: table::error::Error,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand All @@ -104,6 +110,7 @@ impl ErrorExt for Error {
Error::MissingField { .. } => StatusCode::InvalidArguments,
Error::ColumnDefaultConstraint { source, .. } => source.status_code(),
Error::InvalidColumnDef { source, .. } => source.status_code(),
Error::UnrecognizedTableOption { .. } => StatusCode::InvalidArguments,
}
}
fn backtrace_opt(&self) -> Option<&Backtrace> {
Expand Down
7 changes: 7 additions & 0 deletions src/datanode/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,12 @@ pub enum Error {
backtrace: Backtrace,
source: object_store::Error,
},

#[snafu(display("Unrecognized table option: {}", source))]
UnrecognizedTableOption {
v0y4g3r marked this conversation as resolved.
Show resolved Hide resolved
#[snafu(backtrace)]
source: table::error::Error,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -463,6 +469,7 @@ impl ErrorExt for Error {
ColumnDefaultValue { source, .. } => source.status_code(),
CopyTable { source, .. } => source.status_code(),
TableScanExec { source, .. } => source.status_code(),
UnrecognizedTableOption { .. } => StatusCode::InvalidArguments,
}
}

Expand Down
39 changes: 37 additions & 2 deletions src/datanode/src/sql/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use common_telemetry::tracing::log::error;
use datatypes::schema::RawSchema;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use sql::ast::{ColumnOption, TableConstraint};
use sql::ast::{ColumnOption, SqlOption, TableConstraint, Value};
use sql::statements::column_def_to_schema;
use sql::statements::create::CreateTable;
use store_api::storage::consts::TIME_INDEX_NAME;
Expand All @@ -33,6 +33,7 @@ use crate::error::{
self, CatalogNotFoundSnafu, CatalogSnafu, ConstraintNotSupportedSnafu, CreateTableSnafu,
IllegalPrimaryKeysDefSnafu, InsertSystemCatalogSnafu, KeyColumnNotFoundSnafu,
RegisterSchemaSnafu, Result, SchemaExistsSnafu, SchemaNotFoundSnafu,
UnrecognizedTableOptionSnafu,
};
use crate::sql::SqlHandler;

Expand Down Expand Up @@ -238,6 +239,7 @@ impl SqlHandler {
})
.collect::<Result<Vec<_>>>()?;

let table_options = stmt_options_to_table_options(&stmt.options)?;
let schema = RawSchema::new(columns_schemas);
let request = CreateTableRequest {
id: table_id,
Expand All @@ -249,16 +251,32 @@ impl SqlHandler {
region_numbers: vec![0],
primary_key_indices: primary_keys,
create_if_not_exists: stmt.if_not_exists,
table_options: HashMap::new(),
table_options,
};
Ok(request)
}
}

fn stmt_options_to_table_options(opts: &[SqlOption]) -> error::Result<TableOptions> {
let mut map = HashMap::with_capacity(opts.len());
for SqlOption { name, value } in opts {
let value_str = match value {
Value::SingleQuotedString(s) => s.clone(),
Value::DoubleQuotedString(s) => s.clone(),
v0y4g3r marked this conversation as resolved.
Show resolved Hide resolved
_ => value.to_string(),
fengjiachun marked this conversation as resolved.
Show resolved Hide resolved
};
map.insert(name.value.clone(), value_str);
MichaelScofield marked this conversation as resolved.
Show resolved Hide resolved
}
let options = TableOptions::try_from(&map).context(UnrecognizedTableOptionSnafu)?;
Ok(options)
}

#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::time::Duration;

use common_base::readable_size::ReadableSize;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::Schema;
use sql::dialect::GenericDialect;
Expand All @@ -280,6 +298,23 @@ mod tests {
}
}

#[tokio::test]
async fn test_create_table_with_options() {
let sql = r#"CREATE TABLE demo_table (timestamp BIGINT TIME INDEX, value DOUBLE, host STRING PRIMARY KEY) engine=mito with(regions=1, ttl='7days',write_buffer_size='32MB',some='other');"#;
let parsed_stmt = sql_to_statement(sql);
let handler = create_mock_sql_handler().await;
let c = handler
.create_to_request(42, parsed_stmt, &TableReference::bare("demo_table"))
.unwrap();

assert_eq!(Some(Duration::from_secs(604800)), c.table_options.ttl);
assert_eq!(
Some(ReadableSize::mb(32)),
c.table_options.write_buffer_size
);
assert_eq!("other", c.table_options.extra_options.get("some").unwrap());
}

#[tokio::test]
pub async fn test_create_with_inline_primary_key() {
let handler = create_mock_sql_handler().await;
Expand Down
5 changes: 2 additions & 3 deletions src/datanode/src/tests/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
Expand All @@ -26,7 +25,7 @@ use query::QueryEngineFactory;
use servers::Mode;
use snafu::ResultExt;
use table::engine::{EngineContext, TableEngineRef};
use table::requests::CreateTableRequest;
use table::requests::{CreateTableRequest, TableOptions};
use tempdir::TempDir;

use crate::datanode::{DatanodeOptions, FileConfig, ObjectStoreConfig, WalConfig};
Expand Down Expand Up @@ -107,7 +106,7 @@ pub(crate) async fn create_test_table(
schema: RawSchema::new(column_schemas),
create_if_not_exists: true,
primary_key_indices: vec![0], // "host" is in primary keys
table_options: HashMap::new(),
table_options: TableOptions::default(),
region_numbers: vec![0],
},
)
Expand Down
7 changes: 7 additions & 0 deletions src/frontend/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,12 @@ pub enum Error {

#[snafu(display("Illegal primary keys definition: {}", msg))]
IllegalPrimaryKeysDef { msg: String, backtrace: Backtrace },

#[snafu(display("Unrecognized table option: {}", source))]
UnrecognizedTableOption {
v0y4g3r marked this conversation as resolved.
Show resolved Hide resolved
#[snafu(backtrace)]
source: table::error::Error,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -427,6 +433,7 @@ impl ErrorExt for Error {
Error::DeserializePartition { source, .. } | Error::FindTableRoute { source, .. } => {
source.status_code()
}
Error::UnrecognizedTableOption { .. } => StatusCode::InvalidArguments,
}
}

Expand Down
52 changes: 48 additions & 4 deletions src/frontend/src/expr_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@ use datanode::instance::sql::table_idents_to_full_name;
use datatypes::schema::ColumnSchema;
use session::context::QueryContextRef;
use snafu::{ensure, ResultExt};
use sql::ast::{ColumnDef, ColumnOption, TableConstraint};
use sql::ast::{ColumnDef, ColumnOption, SqlOption, TableConstraint, Value};
use sql::statements::column_def_to_schema;
use sql::statements::create::{CreateTable, TIME_INDEX};
use table::requests::TableOptions;

use crate::error::{
self, BuildCreateExprOnInsertionSnafu, ColumnDataTypeSnafu,
ConvertColumnDefaultConstraintSnafu, IllegalPrimaryKeysDefSnafu, InvalidSqlSnafu,
ParseSqlSnafu, Result,
ParseSqlSnafu, Result, UnrecognizedTableOptionSnafu,
};

pub type CreateExprFactoryRef = Arc<dyn CreateExprFactory + Send + Sync>;
Expand Down Expand Up @@ -82,6 +83,7 @@ pub(crate) fn create_to_expr(
.context(error::ExternalSnafu)?;

let time_index = find_time_index(&create.constraints)?;
let table_options = HashMap::from(&stmt_options_to_table_options(&create.options)?);
let expr = CreateTableExpr {
catalog_name,
schema_name,
Expand All @@ -91,8 +93,7 @@ pub(crate) fn create_to_expr(
time_index,
primary_keys: find_primary_keys(&create.columns, &create.constraints)?,
create_if_not_exists: create.if_not_exists,
// TODO(LFC): Fill in other table options.
table_options: HashMap::from([("engine".to_string(), create.engine.clone())]),
table_options,
table_id: None,
region_ids: vec![],
};
Expand Down Expand Up @@ -218,3 +219,46 @@ fn columns_to_expr(
})
.collect()
}

// TODO(hl): This function is intentionally duplicated with that one in src/datanode/src/sql/create.rs:261
// since we are going to remove the statement parsing stuff from datanode.
// Refer: https://github.com/GreptimeTeam/greptimedb/issues/1010
fn stmt_options_to_table_options(opts: &[SqlOption]) -> error::Result<TableOptions> {
let mut map = HashMap::with_capacity(opts.len());
for SqlOption { name, value } in opts {
let value_str = match value {
Value::SingleQuotedString(s) | Value::DoubleQuotedString(s) => s.clone(),
_ => value.to_string(),
};
map.insert(name.value.clone(), value_str);
}
let options = TableOptions::try_from(&map).context(UnrecognizedTableOptionSnafu)?;
Ok(options)
}

#[cfg(test)]
mod tests {
use session::context::QueryContext;
use sql::dialect::GenericDialect;
use sql::parser::ParserContext;
use sql::statements::statement::Statement;

use super::*;

#[test]
fn test_create_to_expr() {
let sql = "CREATE TABLE monitor (host STRING,ts TIMESTAMP,TIME INDEX (ts),PRIMARY KEY(host)) ENGINE=mito WITH(regions=1, ttl='3days', write_buffer_size='1024KB');";
let stmt = ParserContext::create_with_dialect(sql, &GenericDialect {})
.unwrap()
.pop()
.unwrap();

let Statement::CreateTable(create_table) = stmt else { unreachable!() };
let expr = create_to_expr(&create_table, Arc::new(QueryContext::default())).unwrap();
assert_eq!("3days", expr.table_options.get("ttl").unwrap());
assert_eq!(
"1.0MiB",
expr.table_options.get("write_buffer_size").unwrap()
);
}
}
6 changes: 4 additions & 2 deletions src/frontend/src/instance/distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ use sql::statements::create::Partitions;
use sql::statements::sql_value_to_value;
use sql::statements::statement::Statement;
use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType};
use table::requests::TableOptions;
use table::table::AlterContext;

use crate::catalog::FrontendCatalogManager;
Expand All @@ -62,7 +63,7 @@ use crate::error::{
ColumnDataTypeSnafu, DeserializePartitionSnafu, ParseSqlSnafu, PrimaryKeyNotFoundSnafu,
RequestDatanodeSnafu, RequestMetaSnafu, Result, SchemaExistsSnafu, SchemaNotFoundSnafu,
StartMetaClientSnafu, TableAlreadyExistSnafu, TableNotFoundSnafu, TableSnafu,
ToTableInsertRequestSnafu,
ToTableInsertRequestSnafu, UnrecognizedTableOptionSnafu,
};
use crate::expr_factory;
use crate::instance::parse_stmt;
Expand Down Expand Up @@ -605,7 +606,8 @@ fn create_table_info(create_table: &CreateTableExpr) -> Result<RawTableInfo> {
next_column_id: column_schemas.len() as u32,
region_numbers: vec![],
engine_options: HashMap::new(),
options: HashMap::new(),
options: TableOptions::try_from(&create_table.table_options)
.context(UnrecognizedTableOptionSnafu)?,
created_on: DateTime::default(),
};

Expand Down
3 changes: 2 additions & 1 deletion src/meta-client/examples/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use meta_client::rpc::{
PutRequest, RangeRequest, TableName,
};
use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType};
use table::requests::TableOptions;
use tracing::{event, subscriber, Level};
use tracing_subscriber::FmtSubscriber;

Expand Down Expand Up @@ -177,7 +178,7 @@ fn new_table_info() -> RawTableInfo {
next_column_id: 0,
region_numbers: vec![],
engine_options: HashMap::new(),
options: HashMap::new(),
options: TableOptions::default(),
created_on: DateTime::default(),
},
table_type: TableType::Base,
Expand Down
3 changes: 2 additions & 1 deletion src/meta-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ mod tests {
use meta_srv::selector::{Namespace, Selector};
use meta_srv::Result as MetaResult;
use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType};
use table::requests::TableOptions;

use super::*;
use crate::mocks;
Expand Down Expand Up @@ -469,7 +470,7 @@ mod tests {
next_column_id: 0,
region_numbers: vec![],
engine_options: HashMap::new(),
options: HashMap::new(),
options: TableOptions::default(),
created_on: DateTime::default(),
},
table_type: TableType::Base,
Expand Down
Loading