Skip to content

Commit

Permalink
Merge pull request #6602 from Xuanwo/refactor-stage
Browse files Browse the repository at this point in the history
feat: Allow create stage for different services
  • Loading branch information
BohuTANG authored Jul 13, 2022
2 parents 02edfba + 0449210 commit 80e09b3
Show file tree
Hide file tree
Showing 9 changed files with 176 additions and 184 deletions.
45 changes: 25 additions & 20 deletions common/ast/src/ast/statements/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,7 @@ pub enum CopyUnit<'a> {
/// UriLocation (a.k.a external location) can be used in `INTO` or `FROM`.
///
/// For example: `'s3://example/path/to/dir' CONNECTION = (AWS_ACCESS_ID="admin" AWS_SECRET_KEY="admin")`
UriLocation {
protocol: String,
name: String,
path: String,
connection: BTreeMap<String, String>,
},
UriLocation(UriLocation),
/// Query can only be used as `FROM`.
///
/// For example:`(SELECT field_a,field_b FROM table)`
Expand Down Expand Up @@ -148,23 +143,33 @@ impl Display for CopyUnit<'_> {
CopyUnit::StageLocation { name, path } => {
write!(f, "@{name}{path}")
}
CopyUnit::UriLocation {
protocol,
name,
path,
connection,
} => {
write!(f, "'{protocol}://{name}{path}'")?;
if !connection.is_empty() {
write!(f, " CONNECTION = ( ")?;
write_space_seperated_map(f, connection)?;
write!(f, " )")?;
}
Ok(())
}
CopyUnit::UriLocation(v) => v.fmt(f),
CopyUnit::Query(query) => {
write!(f, "({query})")
}
}
}
}

/// UriLocation (a.k.a. external location) can be used in `INTO` or `FROM`.
///
/// For example: `'s3://example/path/to/dir' CONNECTION = (AWS_ACCESS_ID="admin" AWS_SECRET_KEY="admin")`
///
/// Its `Display` form is `'<protocol>://<name><path>'`, followed by a
/// ` CONNECTION = ( ... )` clause when `connection` is not empty.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UriLocation {
/// Scheme written before `://` (`s3` in the example above).
pub protocol: String,
/// Name written immediately after `://` (`example` in the example above).
pub name: String,
/// Path written directly after `name`; no separator is inserted between
/// the two (`/path/to/dir` in the example above).
pub path: String,
/// Key/value connection options, kept in key order by the `BTreeMap`.
pub connection: BTreeMap<String, String>,
}

impl Display for UriLocation {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "'{}://{}{}'", self.protocol, self.name, self.path)?;
if !self.connection.is_empty() {
write!(f, " CONNECTION = ( ")?;
write_space_seperated_map(f, &self.connection)?;
write!(f, " )")?;
}
Ok(())
}
}
27 changes: 6 additions & 21 deletions common/ast/src/ast/statements/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ use std::collections::BTreeMap;
use std::fmt::Display;
use std::fmt::Formatter;

use crate::ast::UriLocation;

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CreateStageStmt {
pub if_not_exists: bool,
pub stage_name: String,

pub location: String,
pub credential_options: BTreeMap<String, String>,
pub encryption_options: BTreeMap<String, String>,
pub location: Option<UriLocation>,

pub file_format_options: BTreeMap<String, String>,
pub on_error: String,
Expand All @@ -40,24 +40,9 @@ impl Display for CreateStageStmt {
}
write!(f, " {}", self.stage_name)?;

if !self.location.is_empty() {
write!(f, " URL = '{}'", self.location)?;

if !self.credential_options.is_empty() {
write!(f, " CREDENTIALS = (")?;
for (k, v) in self.credential_options.iter() {
write!(f, " {} = '{}'", k, v)?;
}
write!(f, " )")?;
}

if !self.encryption_options.is_empty() {
write!(f, " ENCRYPTION = (")?;
for (k, v) in self.encryption_options.iter() {
write!(f, " {} = '{}'", k, v)?;
}
write!(f, " )")?;
}
if let Some(ul) = &self.location {
write!(f, " URL = ")?;
write!(f, "{ul}")?;
}

if !self.file_format_options.is_empty() {
Expand Down
101 changes: 48 additions & 53 deletions common/ast/src/parser/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,14 +565,11 @@ pub fn statement(i: Input) -> IResult<StatementMsg> {
);

// stages
let create_stage = map(
let create_stage = map_res(
rule! {
CREATE ~ STAGE ~ ( IF ~ NOT ~ EXISTS )?
~ #ident
~ ( URL ~ "=" ~ #literal_string
~ (CREDENTIALS ~ "=" ~ #options)?
~ (ENCRYPTION ~ "=" ~ #options)?
)?
~ ( URL ~ "=" ~ #uri_location)?
~ ( FILE_FORMAT ~ "=" ~ #options)?
~ ( ON_ERROR ~ "=" ~ #ident)?
~ ( SIZE_LIMIT ~ "=" ~ #literal_u64)?
Expand All @@ -591,22 +588,10 @@ pub fn statement(i: Input) -> IResult<StatementMsg> {
validation_mode_opt,
comment_opt,
)| {
let (location, credential_options, encryption_options) = url_opt
.map(|(_, _, url, c, e)| {
(
url,
c.map(|v| v.2).unwrap_or_default(),
e.map(|v| v.2).unwrap_or_default(),
)
})
.unwrap_or_default();

Statement::CreateStage(CreateStageStmt {
Ok(Statement::CreateStage(CreateStageStmt {
if_not_exists: opt_if_not_exists.is_some(),
stage_name: stage.to_string(),
location,
credential_options,
encryption_options,
location: url_opt.map(|v| v.2),
file_format_options: file_format_opt
.map(|(_, _, file_format_opt)| file_format_opt)
.unwrap_or_default(),
Expand All @@ -616,7 +601,7 @@ pub fn statement(i: Input) -> IResult<StatementMsg> {
.map(|v| v.2.to_string())
.unwrap_or_default(),
comments: comment_opt.map(|v| v.2).unwrap_or_default(),
})
}))
},
);

Expand Down Expand Up @@ -662,8 +647,8 @@ pub fn statement(i: Input) -> IResult<StatementMsg> {
let copy_into = map(
rule! {
COPY
~ INTO ~ #copy_target
~ FROM ~ #copy_target
~ INTO ~ #copy_unit
~ FROM ~ #copy_unit
~ ( FILES ~ "=" ~ "(" ~ #comma_separated_list0(literal_string) ~ ")")?
~ ( PATTERN ~ "=" ~ #literal_string)?
~ ( FILE_FORMAT ~ "=" ~ #options)?
Expand Down Expand Up @@ -1074,12 +1059,12 @@ pub fn kill_target(i: Input) -> IResult<KillTarget> {
))(i)
}

/// Parse input into `CopyTarget`
/// Parse input into `CopyUnit`
///
/// # Notes
///
/// It's required to parse stage location first. Or stage could be parsed as table.
pub fn copy_target(i: Input) -> IResult<CopyUnit> {
pub fn copy_unit(i: Input) -> IResult<CopyUnit> {
// Parse input like `@my_stage/path/to/dir`
let stage_location = |i| {
map(at_string, |location| {
Expand Down Expand Up @@ -1118,48 +1103,58 @@ pub fn copy_target(i: Input) -> IResult<CopyUnit> {
};

// Parse input like `'s3://example/path/to/dir' CREDENTIALS = (AWS_ACCESS_ID="admin" AWS_SECRET_KEY="admin")`
let uri_location = |i| {
let inner_uri_location = |i| {
map_res(
rule! {
#literal_string
~ (CONNECTION ~ "=" ~ #options)?
~ (CREDENTIALS ~ "=" ~ #options)?
~ (ENCRYPTION ~ "=" ~ #options)?
},
|(location, connection_opt, credentials_opt, encryption_opt)| {
let parsed =
Url::parse(&location).map_err(|_| ErrorKind::Other("invalid uri location"))?;

// TODO: We will use `CONNECTION` to replace `CREDENTIALS` and `ENCRYPTION`.
let mut conn = connection_opt.map(|v| v.2).unwrap_or_default();
conn.extend(credentials_opt.map(|v| v.2).unwrap_or_default());
conn.extend(encryption_opt.map(|v| v.2).unwrap_or_default());

Ok(CopyUnit::UriLocation {
protocol: parsed.scheme().to_string(),
name: parsed
.host_str()
.ok_or(ErrorKind::Other("invalid uri location"))?
.to_string(),
path: if parsed.path().is_empty() {
"/".to_string()
} else {
parsed.path().to_string()
},
connection: conn,
})
#uri_location
},
|v| Ok(CopyUnit::UriLocation(v)),
)(i)
};

rule!(
#stage_location: "@<stage_name> { <path> }"
| #uri_location: "'<protocol>://<name> {<path>} { CONNECTION = ({ AWS_ACCESS_KEY = 'aws_access_key' }) } '"
| #inner_uri_location: "'<protocol>://<name> {<path>} { CONNECTION = ({ AWS_ACCESS_KEY = 'aws_access_key' }) } '"
| #table: "{ { <catalog>. } <database>. }<table>"
| #query: "( <query> )"
)(i)
}

/// Parse a string literal plus its optional option groups into a `UriLocation`.
///
/// Accepted shape: a string literal holding the URI, optionally followed by
/// `CONNECTION = (...)`, `CREDENTIALS = (...)` and `ENCRYPTION = (...)`,
/// in that order. All three option groups end up in `UriLocation::connection`.
///
/// # Errors
///
/// Produces `ErrorKind::Other("invalid uri location")` when the literal is
/// rejected by `Url::parse` or when the parsed URL has no host.
pub fn uri_location(i: Input) -> IResult<UriLocation> {
    map_res(
        rule! {
            #literal_string
            ~ (CONNECTION ~ "=" ~ #options)?
            ~ (CREDENTIALS ~ "=" ~ #options)?
            ~ (ENCRYPTION ~ "=" ~ #options)?
        },
        |(location, connection_opt, credentials_opt, encryption_opt)| {
            let url =
                Url::parse(&location).map_err(|_| ErrorKind::Other("invalid uri location"))?;

            // TODO: We will use `CONNECTION` to replace `CREDENTIALS` and `ENCRYPTION`.
            // Merge order is CONNECTION, then CREDENTIALS, then ENCRYPTION, so a
            // key repeated in a later group overwrites the earlier value.
            let mut connection = connection_opt
                .map(|(_, _, opts)| opts)
                .unwrap_or_default();
            if let Some((_, _, opts)) = credentials_opt {
                connection.extend(opts);
            }
            if let Some((_, _, opts)) = encryption_opt {
                connection.extend(opts);
            }

            let protocol = url.scheme().to_string();
            let name = url
                .host_str()
                .ok_or(ErrorKind::Other("invalid uri location"))?
                .to_string();
            // An empty URL path is stored as the root path.
            let path = match url.path() {
                "" => "/".to_string(),
                other => other.to_string(),
            };

            Ok(UriLocation {
                protocol,
                name,
                path,
                connection,
            })
        },
    )(i)
}

pub fn show_limit(i: Input) -> IResult<ShowLimit> {
let limit_like = map(
rule! {
Expand Down
83 changes: 48 additions & 35 deletions common/ast/tests/it/testdata/statement.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4429,18 +4429,23 @@ Query(
---------- Input ----------
CREATE STAGE IF NOT EXISTS test_stage url='s3://load/files/' credentials=(aws_key_id='1a2b3c' aws_secret_key='4x5y6z') file_format=(FORMAT = CSV compression = GZIP record_delimiter=',')
---------- Output ---------
CREATE STAGE IF NOT EXISTS test_stage URL = 's3://load/files/' CREDENTIALS = ( aws_key_id = '1a2b3c' aws_secret_key = '4x5y6z' ) FILE_FORMAT = ( compression = 'GZIP' format = 'CSV' record_delimiter = ',' )
CREATE STAGE IF NOT EXISTS test_stage URL = 's3://load/files/' CONNECTION = ( aws_key_id='1a2b3c' aws_secret_key='4x5y6z' ) FILE_FORMAT = ( compression = 'GZIP' format = 'CSV' record_delimiter = ',' )
---------- AST ------------
CreateStage(
CreateStageStmt {
if_not_exists: true,
stage_name: "test_stage",
location: "s3://load/files/",
credential_options: {
"aws_key_id": "1a2b3c",
"aws_secret_key": "4x5y6z",
},
encryption_options: {},
location: Some(
UriLocation {
protocol: "s3",
name: "load",
path: "/files/",
connection: {
"aws_key_id": "1a2b3c",
"aws_secret_key": "4x5y6z",
},
},
),
file_format_options: {
"compression": "GZIP",
"format": "CSV",
Expand Down Expand Up @@ -5300,12 +5305,14 @@ COPY INTO mytable FROM 's3://mybucket/data.csv' FILE_FORMAT = ( field_delimiter
---------- AST ------------
Copy(
CopyStmt {
src: UriLocation {
protocol: "s3",
name: "mybucket",
path: "/data.csv",
connection: {},
},
src: UriLocation(
UriLocation {
protocol: "s3",
name: "mybucket",
path: "/data.csv",
connection: {},
},
),
dst: Table {
catalog: None,
database: None,
Expand Down Expand Up @@ -5348,14 +5355,16 @@ COPY INTO mytable FROM 's3://mybucket/data.csv' CONNECTION = ( endpoint_url='htt
---------- AST ------------
Copy(
CopyStmt {
src: UriLocation {
protocol: "s3",
name: "mybucket",
path: "/data.csv",
connection: {
"endpoint_url": "http://127.0.0.1:9900",
src: UriLocation(
UriLocation {
protocol: "s3",
name: "mybucket",
path: "/data.csv",
connection: {
"endpoint_url": "http://127.0.0.1:9900",
},
},
},
),
dst: Table {
catalog: None,
database: None,
Expand Down Expand Up @@ -5447,12 +5456,14 @@ Copy(
span: Ident(56..63),
},
},
dst: UriLocation {
protocol: "s3",
name: "mybucket",
path: "/data.csv",
connection: {},
},
dst: UriLocation(
UriLocation {
protocol: "s3",
name: "mybucket",
path: "/data.csv",
connection: {},
},
),
files: [],
pattern: "",
file_format: {
Expand Down Expand Up @@ -5533,16 +5544,18 @@ COPY INTO mytable FROM 's3://mybucket/data.csv' CONNECTION = ( aws_key_id='acces
---------- AST ------------
Copy(
CopyStmt {
src: UriLocation {
protocol: "s3",
name: "mybucket",
path: "/data.csv",
connection: {
"aws_key_id": "access_key",
"aws_secret_key": "secret_key",
"master_key": "master_key",
src: UriLocation(
UriLocation {
protocol: "s3",
name: "mybucket",
path: "/data.csv",
connection: {
"aws_key_id": "access_key",
"aws_secret_key": "secret_key",
"master_key": "master_key",
},
},
},
),
dst: Table {
catalog: None,
database: None,
Expand Down
Loading

1 comment on commit 80e09b3

@vercel
Copy link

@vercel vercel bot commented on 80e09b3 Jul 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

databend – ./

databend.rs
databend-git-main-databend.vercel.app
databend-databend.vercel.app
databend.vercel.app

Please sign in to comment.