Skip to content

Commit

Permalink
Merge pull request #6579 from fkuner/feature-global-settings
Browse files Browse the repository at this point in the history
feat(setting): support global setting
  • Loading branch information
mergify[bot] authored Jul 13, 2022
2 parents e6fdb99 + 2e77de4 commit cdd8703
Show file tree
Hide file tree
Showing 26 changed files with 547 additions and 30 deletions.
13 changes: 12 additions & 1 deletion common/ast/src/ast/statements/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub enum Statement<'a> {
},

SetVariable {
is_global: bool,
variable: Identifier<'a>,
value: Literal,
},
Expand Down Expand Up @@ -215,7 +216,17 @@ impl<'a> Display for Statement<'a> {
}
write!(f, " {object_id}")?;
}
Statement::SetVariable { variable, value } => write!(f, "SET {variable} = {value}")?,
Statement::SetVariable {
is_global,
variable,
value,
} => {
write!(f, "SET ")?;
if *is_global {
write!(f, "GLOBAL ")?;
}
write!(f, "{variable} = {value}")?;
}
Statement::ShowDatabases(stmt) => write!(f, "{stmt}")?,
Statement::ShowCreateDatabase(stmt) => write!(f, "{stmt}")?,
Statement::CreateDatabase(stmt) => write!(f, "{stmt}")?,
Expand Down
8 changes: 6 additions & 2 deletions common/ast/src/parser/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,13 @@ pub fn statement(i: Input) -> IResult<StatementMsg> {
);
let set_variable = map(
rule! {
SET ~ #ident ~ "=" ~ #literal
SET ~ (GLOBAL)? ~ #ident ~ "=" ~ #literal
},
|(_, opt_is_global, variable, _, value)| Statement::SetVariable {
is_global: opt_is_global.is_some(),
variable,
value,
},
|(_, variable, _, value)| Statement::SetVariable { variable, value },
);
let show_databases = map(
rule! {
Expand Down
2 changes: 2 additions & 0 deletions common/ast/src/parser/token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,8 @@ pub enum TokenKind {
FUSE,
#[token("GITHUB", ignore(ascii_case))]
GITHUB,
#[token("GLOBAL", ignore(ascii_case))]
GLOBAL,
#[token("GRAPH", ignore(ascii_case))]
GRAPH,
#[token("GROUP", ignore(ascii_case))]
Expand Down
3 changes: 3 additions & 0 deletions common/management/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mod cluster;
mod quota;
mod role;
mod serde;
mod setting;
mod stage;
mod udf;
mod user;
Expand All @@ -28,6 +29,8 @@ pub use role::RoleApi;
pub use role::RoleMgr;
pub use serde::deserialize_struct;
pub use serde::serialize_struct;
pub use setting::SettingApi;
pub use setting::SettingMgr;
pub use stage::StageApi;
pub use stage::StageMgr;
pub use udf::UdfApi;
Expand Down
19 changes: 19 additions & 0 deletions common/management/src/setting/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod setting_api;
mod setting_mgr;

pub use setting_api::SettingApi;
pub use setting_mgr::SettingMgr;
31 changes: 31 additions & 0 deletions common/management/src/setting/setting_api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_exception::Result;
use common_meta_types::SeqV;
use common_meta_types::UserSetting;

#[async_trait::async_trait]
pub trait SettingApi: Sync + Send {
// Add a setting to /tenant/cluster/setting-name.
async fn set_setting(&self, setting: UserSetting) -> Result<u64>;

// Get all the settings for tenant/cluster.
async fn get_settings(&self) -> Result<Vec<UserSetting>>;

async fn get_setting(&self, name: &str, seq: Option<u64>) -> Result<SeqV<UserSetting>>;

// Drop the setting by name.
async fn drop_setting(&self, name: &str, seq: Option<u64>) -> Result<()>;
}
113 changes: 113 additions & 0 deletions common/management/src/setting/setting_mgr.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_exception::ErrorCode;
use common_exception::Result;
use common_meta_api::KVApi;
use common_meta_types::IntoSeqV;
use common_meta_types::MatchSeq;
use common_meta_types::MatchSeqExt;
use common_meta_types::OkOrExist;
use common_meta_types::Operation;
use common_meta_types::SeqV;
use common_meta_types::UpsertKVReq;
use common_meta_types::UserSetting;

use crate::setting::SettingApi;

static USER_SETTING_API_KEY_PREFIX: &str = "__fd_settings";

pub struct SettingMgr {
kv_api: Arc<dyn KVApi>,
setting_prefix: String,
}

impl SettingMgr {
#[allow(dead_code)]
pub fn create(kv_api: Arc<dyn KVApi>, tenant: &str) -> Result<Self> {
Ok(SettingMgr {
kv_api,
setting_prefix: format!("{}/{}", USER_SETTING_API_KEY_PREFIX, tenant),
})
}
}

#[async_trait::async_trait]
impl SettingApi for SettingMgr {
async fn set_setting(&self, setting: UserSetting) -> Result<u64> {
// Upsert.
let seq = MatchSeq::Any;
let val = Operation::Update(serde_json::to_vec(&setting)?);
let key = format!("{}/{}", self.setting_prefix, setting.name);
let upsert = self
.kv_api
.upsert_kv(UpsertKVReq::new(&key, seq, val, None));

let res = upsert.await?.into_add_result()?;

match res.res {
OkOrExist::Ok(v) => Ok(v.seq),
OkOrExist::Exists(v) => Ok(v.seq),
}
}

async fn get_settings(&self) -> Result<Vec<UserSetting>> {
let values = self.kv_api.prefix_list_kv(&self.setting_prefix).await?;

let mut settings = Vec::with_capacity(values.len());
for (_, value) in values {
let setting = serde_json::from_slice::<UserSetting>(&value.data)?;
settings.push(setting);
}
Ok(settings)
}

async fn get_setting(&self, name: &str, seq: Option<u64>) -> Result<SeqV<UserSetting>> {
let key = format!("{}/{}", self.setting_prefix, name);
let kv_api = self.kv_api.clone();
let get_kv = async move { kv_api.get_kv(&key).await };
let res = get_kv.await?;
let seq_value =
res.ok_or_else(|| ErrorCode::UnknownVariable(format!("Unknown setting {}", name)))?;

match MatchSeq::from(seq).match_seq(&seq_value) {
Ok(_) => Ok(seq_value.into_seqv()?),
Err(_) => Err(ErrorCode::UnknownVariable(format!(
"Unknown setting {}",
name
))),
}
}

async fn drop_setting(&self, name: &str, seq: Option<u64>) -> Result<()> {
let key = format!("{}/{}", self.setting_prefix, name);
let kv_api = self.kv_api.clone();
let upsert_kv = async move {
kv_api
.upsert_kv(UpsertKVReq::new(&key, seq.into(), Operation::Delete, None))
.await
};
let res = upsert_kv.await?;
if res.prev.is_some() && res.result.is_none() {
Ok(())
} else {
Err(ErrorCode::UnknownVariable(format!(
"Unknown setting {}",
name
)))
}
}
}
1 change: 1 addition & 0 deletions common/management/tests/it/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

mod cluster;
mod setting;
mod stage;
mod udf;
mod user;
113 changes: 113 additions & 0 deletions common/management/tests/it/setting.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_base::base::tokio;
use common_datavalues::DataValue;
use common_exception::Result;
use common_management::*;
use common_meta_api::KVApi;
use common_meta_embedded::MetaEmbedded;
use common_meta_types::SeqV;
use common_meta_types::UserSetting;

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_set_setting() -> Result<()> {
let (kv_api, mgr) = new_setting_api().await?;

{
let setting = UserSetting::create("max_threads", DataValue::UInt64(3));
mgr.set_setting(setting.clone()).await?;
let value = kv_api
.get_kv("__fd_settings/databend_query/max_threads")
.await?;

match value {
Some(SeqV {
seq: 1,
meta: _,
data: value,
}) => {
assert_eq!(value, serde_json::to_vec(&setting)?);
}
catch => panic!("GetKVActionReply{:?}", catch),
}
}

// Set again.
{
let setting = UserSetting::create("max_threads", DataValue::UInt64(1));
mgr.set_setting(setting.clone()).await?;
let value = kv_api
.get_kv("__fd_settings/databend_query/max_threads")
.await?;

match value {
Some(SeqV {
seq: 2,
meta: _,
data: value,
}) => {
assert_eq!(value, serde_json::to_vec(&setting)?);
}
catch => panic!("GetKVActionReply{:?}", catch),
}
}

// Get settings.
{
let expect = vec![UserSetting::create("max_threads", DataValue::UInt64(1))];
let actual = mgr.get_settings().await?;
assert_eq!(actual, expect);
}

// Get setting.
{
let expect = UserSetting::create("max_threads", DataValue::UInt64(1));
let actual = mgr.get_setting("max_threads", None).await?;
assert_eq!(actual.data, expect);
}

// Drop setting.
{
mgr.drop_setting("max_threads", None).await?;
}

// Get settings.
{
let actual = mgr.get_settings().await?;
assert_eq!(0, actual.len());
}

// Get setting.
{
let res = mgr.get_setting("max_threads", None).await;
assert!(res.is_err());
}

// Drop setting not exists.
{
let res = mgr.drop_setting("max_threads_not", None).await;
assert!(res.is_err());
}

Ok(())
}

async fn new_setting_api() -> Result<(Arc<MetaEmbedded>, SettingMgr)> {
let test_api = Arc::new(MetaEmbedded::new_temp().await?);
let mgr = SettingMgr::create(test_api.clone(), "databend_query")?;
Ok((test_api, mgr))
}
2 changes: 2 additions & 0 deletions common/meta/types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ mod user_identity;
mod user_info;
mod user_privilege;
mod user_quota;
mod user_setting;
mod user_stage;

pub mod app_error;
Expand Down Expand Up @@ -179,4 +180,5 @@ pub use user_info::UserOptionFlag;
pub use user_privilege::UserPrivilegeSet;
pub use user_privilege::UserPrivilegeType;
pub use user_quota::UserQuota;
pub use user_setting::UserSetting;
pub use user_stage::*;
Loading

1 comment on commit cdd8703

@vercel
Copy link

@vercel vercel bot commented on cdd8703 Jul 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

databend – ./

databend.vercel.app
databend.rs
databend-git-main-databend.vercel.app
databend-databend.vercel.app

Please sign in to comment.