[CLI] more robust load interface #2629

Merged 5 commits on Nov 3, 2021
Changes from all commits
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

5 changes: 5 additions & 0 deletions Makefile
@@ -18,6 +18,11 @@ miri:
cargo miri setup
MIRIFLAGS="-Zmiri-disable-isolation" cargo miri test

cluster: build
mkdir -p ./.databend/local/bin/test
cp ./target/release/databend-query ./.databend/local/bin/test/databend-query
cp ./target/release/databend-meta ./.databend/local/bin/test/databend-meta
./target/release/bendctl cluster create --databend_dir ./.databend --group local --version test --force
run: build
bash ./scripts/deploy/databend-query-standalone.sh release

1 change: 1 addition & 0 deletions cli/Cargo.toml
@@ -63,6 +63,7 @@ async-std = "1.10.0"
tryhard = "0.4.0"
rayon = "1.5.1"
tokio-util = "0.6.9"
csv = "1.1"

[dev-dependencies]
pretty_assertions = "1.0"
2 changes: 1 addition & 1 deletion cli/docs/ontime/build_table.sql
@@ -109,4 +109,4 @@ CREATE TABLE ontime
Div5LongestGTime String,
Div5WheelsOff String,
Div5TailNum String
) ENGINE = CSV location = {{ .csv_location }};
) ENGINE = FUSE;
106 changes: 55 additions & 51 deletions cli/src/cmds/loads/load.rs
@@ -23,21 +23,14 @@ use clap::App;
use clap::AppSettings;
use clap::Arg;
use clap::ArgMatches;
use common_base::tokio::fs::File;
use common_base::tokio::io::AsyncBufReadExt;
use common_base::tokio::io::AsyncRead;
use common_base::tokio::io::BufReader;
use common_base::tokio::macros::support::Pin;
use common_base::tokio::time;
// Lets us call into_async_read() to convert a futures::stream::Stream into a
// futures::io::AsyncRead.
use futures::stream::TryStreamExt;
use itertools::Itertools;
use lexical_util::num::AsPrimitive;
use num_format::Locale;
use num_format::ToFormattedString;
use rayon::prelude::*;
use tokio_util::compat::FuturesAsyncReadCompatExt;

use crate::cmds::clusters::cluster::ClusterProfile;
use crate::cmds::command::Command;
@@ -151,11 +144,11 @@ impl LoadCommand {
.required(false),
)
.arg(
Arg::new("skip-head-lines").long("skip-head-lines")
.about("skip head line in file for example: \
bendctl load test.csv --skip-head-lines 10 would ignore the first ten lines in csv file")
.takes_value(true)
.required(false),
Arg::new("with_header").long("with_header")
.about("state on whether CSV has dataset header for example: \
bendctl load test.csv --with_header true would ignore the first ten lines in csv file")
.required(false)
.takes_value(false),
)
.arg(
Arg::new("table").long("table")
@@ -169,17 +162,9 @@ impl LoadCommand {
async fn local_exec_match(&self, writer: &mut Writer, args: &ArgMatches) -> Result<()> {
match self.local_exec_precheck(args).await {
Ok(_) => {
let mut reader = build_reader(args.value_of("load")).await.lines();
for _ in 0..args
.value_of("skip-head-lines")
.unwrap_or("0")
.parse::<usize>()
.unwrap()
{
if reader.next_line().await?.is_none() {
return Ok(());
}
}
let mut reader =
build_reader(args.value_of("load"), args.value_of("with_header")).await;
let mut record = reader.records();
let table = args.value_of("table").unwrap();
let schema = args.value_of("schema");
let table_format = match schema {
@@ -202,9 +187,17 @@ impl LoadCommand {
let mut batch = vec![];
// possible optimization is to run iterator in parallel
for _ in 0..100_000 {
if let Some(line) = reader.next_line().await? {
batch.push(line);
count += 1;
if let Some(line) = record.next() {
if let Ok(line) = line {
batch.push(line);
count += 1;
} else {
writer.write_err(format!(
"cannot read csv line {}, error: {}",
count,
line.unwrap_err()
))
}
} else {
break;
}
@@ -215,6 +208,17 @@ impl LoadCommand {
let values = batch
.into_iter()
.par_bridge()
.map(|s| {
s.iter()
.map(|i| {
if i.trim().is_empty() {
"null".to_string()
} else {
"'".to_owned() + i + &*"'".to_owned()
}
})
.join(",")
})
.map(|e| format!("({})", e.trim()))
.filter(|e| !e.trim().is_empty())
.reduce_with(|a, b| format!("{}, {}", a, b));
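
The per-field mapping above is the heart of how a CSV record becomes a SQL VALUES tuple: empty fields turn into null, everything else is single-quoted, and the fields are joined with commas. A minimal standalone sketch of that rule follows; the helper name `record_to_values` and the sample record are ours, not part of the PR.

```rust
use csv::StringRecord;
use itertools::Itertools;

fn record_to_values(record: &StringRecord) -> String {
    let fields = record
        .iter()
        .map(|field| {
            if field.trim().is_empty() {
                "null".to_string()
            } else {
                // Equivalent, tidier spelling of "'".to_owned() + i + &*"'".to_owned()
                format!("'{}'", field)
            }
        })
        .join(",");
    format!("({})", fields.trim())
}

fn main() {
    let record = StringRecord::from(vec!["2021", "", "AA"]);
    assert_eq!(record_to_values(&record), "('2021',null,'AA')");
}
```
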
@@ -228,6 +232,7 @@ impl LoadCommand {
}
}
}

let elapsed = start.elapsed();
let time = elapsed.as_millis() as f64 / 1000f64;
writer.write_ok(format!(
@@ -238,6 +243,7 @@ impl LoadCommand {
.to_formatted_string(&Locale::en),
time
));

Ok(())
}
Err(e) => {
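
The summary line above converts the elapsed milliseconds to seconds and uses num_format to add thousands separators to the row count. A tiny sketch of that formatting, with made-up numbers:

```rust
use num_format::{Locale, ToFormattedString};

fn main() {
    let rows_loaded: usize = 1_234_567;
    let time_secs = 1500u128 as f64 / 1000f64; // milliseconds to seconds, as above
    // Prints: loaded 1,234,567 rows in 1.5 s
    println!(
        "loaded {} rows in {} s",
        rows_loaded.to_formatted_string(&Locale::en),
        time_secs
    );
}
```
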
@@ -281,43 +287,41 @@ impl LoadCommand {
}
}

async fn build_reader(load: Option<&str>) -> BufReader<Pin<Box<dyn AsyncRead + Send>>> {
async fn build_reader(
load: Option<&str>,
header: Option<&str>,
) -> csv::Reader<Box<dyn std::io::Read + Send + Sync>> {
let header = header.is_some();
match load {
Some(val) => {
if Path::new(val).exists() {
let f = File::open(val)
.await
.expect("cannot open file: permission denied");
BufReader::new(Box::pin(f))
let f = std::fs::File::open(val).expect("cannot open file: permission denied");
csv::ReaderBuilder::new()
.has_headers(header)
.from_reader(Box::new(f))
} else if val.contains("://") {
// Attempt to download the CSV from the given URL.
let target = reqwest::get(val)
.await
.expect("cannot connect to target url")
.error_for_status()
.expect("return code is not OK"); // generate an error if server didn't respond OK

// Convert the body of the response into a futures::io::Stream.
let target_stream = target.bytes_stream();

// Convert the stream into an futures::io::AsyncRead.
// We must first convert the reqwest::Error into an futures::io::Error.
let target_stream = target_stream
.map_err(|e| futures::io::Error::new(futures::io::ErrorKind::Other, e))
.into_async_read();

// Convert the futures::io::AsyncRead into a tokio::io::AsyncRead.
let target_stream = target_stream.compat();

BufReader::new(Box::pin(target_stream))
.expect("return code is not OK")
.text()
.await
.expect("cannot fetch for target"); // generate an error if server didn't respond
csv::ReaderBuilder::new()
.has_headers(header)
.from_reader(Box::new(Cursor::new(target)))
} else {
let bytes = val.to_string();
BufReader::new(Box::pin(Cursor::new(bytes.as_bytes().to_owned())))
csv::ReaderBuilder::new()
.has_headers(header)
.from_reader(Box::new(Cursor::new(val.to_string().as_bytes().to_owned())))
}
}
None => {
let io = common_base::tokio::io::stdin();
BufReader::new(Box::pin(io))
let io = std::io::stdin();
csv::ReaderBuilder::new()
.has_headers(header)
.from_reader(Box::new(io))
}
}
}
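
For context, here is how the csv::ReaderBuilder configuration in build_reader behaves, sketched against an in-memory byte slice instead of the file, URL, or stdin sources handled above; the sample data is invented.

```rust
fn main() {
    let data = "Year,Quarter,Carrier\n2021,,AA\n";
    let mut reader = csv::ReaderBuilder::new()
        // true when --with_header is passed: the first line is treated as a
        // header row and is not yielded by records()
        .has_headers(true)
        .from_reader(data.as_bytes());

    for record in reader.records() {
        // Each item is a Result<csv::StringRecord, csv::Error>, so malformed
        // lines can be reported without aborting the whole load.
        match record {
            Ok(row) => println!("{:?}", row),
            Err(e) => eprintln!("cannot read csv line, error: {}", e),
        }
    }
}
```
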
4 changes: 4 additions & 0 deletions cli/src/cmds/status.rs
@@ -496,6 +496,10 @@ impl LocalRuntime for LocalQueryConfig {
databend_query::configs::config_storage::STORAGE_TYPE,
conf.storage.storage_type,
)
.env(
databend_query::configs::config_meta::META_EMBEDDED_DIR,
"/tmp/embedded",
)
.env(
databend_query::configs::config_storage::DISK_STORAGE_DATA_PATH,
conf.storage.disk.data_path,
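
The status.rs change above injects META_EMBEDDED_DIR into the environment of the spawned query process. A hedged sketch of that pattern with std::process::Command; the binary path and values are placeholders, the real code builds the command through LocalQueryConfig.

```rust
use std::process::Command;

fn main() -> std::io::Result<()> {
    // Placeholder paths and values; the PR sets these through the config structs.
    let mut child = Command::new("./target/release/databend-query")
        .env("META_EMBEDDED_DIR", "/tmp/embedded")
        .env("DISK_STORAGE_DATA_PATH", "./.databend/data")
        .spawn()?;
    child.wait()?;
    Ok(())
}
```
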
3 changes: 2 additions & 1 deletion query/src/configs/config_meta.rs
@@ -23,6 +23,7 @@ use crate::configs::Config;
pub const META_ADDRESS: &str = "META_ADDRESS";
pub const META_USERNAME: &str = "META_USERNAME";
pub const META_PASSWORD: &str = "META_PASSWORD";
pub const META_EMBEDDED_DIR: &str = "META_EMBEDDED_DIR";
pub const META_RPC_TLS_SERVER_ROOT_CA_CERT: &str = "META_RPC_TLS_SERVER_ROOT_CA_CERT";
pub const META_RPC_TLS_SERVICE_DOMAIN_NAME: &str = "META_RPC_TLS_SERVICE_DOMAIN_NAME";

@@ -31,7 +32,7 @@ pub const META_RPC_TLS_SERVICE_DOMAIN_NAME: &str = "META_RPC_TLS_SERVICE_DOMAIN_NAME";
#[derive(Clone, serde::Serialize, serde::Deserialize, PartialEq, StructOpt, StructOptToml)]
pub struct MetaConfig {
/// The dir to store persisted meta state for an embedded meta store
#[structopt(long, default_value = "./_meta_embedded")]
#[structopt(long, env = META_EMBEDDED_DIR, default_value = "./_meta_embedded")]
#[serde(default)]
pub meta_embedded_dir: String,

Expand Down