Skip to content

Commit

Permalink
Merge branch 'main' into copy_optimize
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Oct 19, 2022
2 parents 49a2f46 + d0006ac commit 26e9207
Show file tree
Hide file tree
Showing 53 changed files with 1,300 additions and 271 deletions.
71 changes: 67 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 2 additions & 9 deletions docs/doc/30-reference/00-api/00-rest.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@ This handler return results in `pages` with long-polling.
of type `QueryResponse`.
2. Use fields of `QueryResponse` for further processing:
1. A `GET` to the `next_uri` returns the next `page` of query results. It returns `QueryResponse` too, processing it
the same way recursively until `next_uri` is nil.
2. A `GET` to the `final_uri` finally after all results are fetched (`next_uri = nil`) or the remaining results are not
needed. Returns an empty body.
the same way until `next_uri` is null.
2. (optional) A `GET` to the `kill_uri` to kill the query. Return empty body.
3. (optional) A `GET` to the `stats_uri` to get stats only at once (without long-polling), return `QueryResponse`
with empty `data` field.

Expand Down Expand Up @@ -81,17 +80,11 @@ you are expected to get JSON like this (formatted):
"running_time_ms": 466.85395800000003
},
"stats_uri": "/v1/query/3cd25ab7-c3a4-42ce-9e02-e1b354d91f06",
"final_uri": "/v1/query/3cd25ab7-c3a4-42ce-9e02-e1b354d91f06/kill?delete=true",
"next_uri": null
"affect": null
}
```

Note:

1. next_uri is null because all data is returned.
1. client should call final_uri to tell the server the client has received the results and server can delete them.


## Query Request

Expand Down
7 changes: 4 additions & 3 deletions src/common/base/src/base/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,9 +205,10 @@ impl Runtime {
{
let semaphore = Arc::new(semaphore);
let iter = futures.into_iter().map(|v| {
|permit| {
let _permit = permit;
v
|permit| async {
let r = v.await;
drop(permit);
r
}
});
self.try_spawn_batch_with_owned_semaphore(semaphore, iter)
Expand Down
2 changes: 2 additions & 0 deletions src/common/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ opendal = { version = "0.19", features = [
"layers-metrics",
"services-ipfs",
"services-ftp",
"services-moka",
"services-redis",
"compress",
] }
percent-encoding = "2.2.0"
Expand Down
8 changes: 8 additions & 0 deletions src/common/storage/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub enum StorageParams {
Http(StorageHttpConfig),
Ipfs(StorageIpfsConfig),
Memory,
Moka(StorageMokaConfig),
Obs(StorageObsConfig),
Oss(StorageOssConfig),
S3(StorageS3Config),
Expand Down Expand Up @@ -85,6 +86,7 @@ impl Display for StorageParams {
write!(f, "ipfs://endpoint={},root={}", c.endpoint_url, c.root)
}
StorageParams::Memory => write!(f, "memory://"),
StorageParams::Moka(_) => write!(f, "moka://"),
StorageParams::Obs(v) => write!(
f,
"obs://bucket={},root={},endpoint={}",
Expand Down Expand Up @@ -120,6 +122,7 @@ impl StorageParams {
StorageParams::Http(v) => v.endpoint_url.starts_with("https://"),
StorageParams::Ipfs(c) => c.endpoint_url.starts_with("https://"),
StorageParams::Memory => false,
StorageParams::Moka(_) => false,
StorageParams::Obs(v) => v.endpoint_url.starts_with("https://"),
StorageParams::Oss(v) => v.endpoint_url.starts_with("https://"),
StorageParams::S3(v) => v.endpoint_url.starts_with("https://"),
Expand Down Expand Up @@ -355,6 +358,7 @@ impl Debug for StorageObsConfig {
.finish()
}
}

/// config for Aliyun Object Storage Service
#[derive(Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct StorageOssConfig {
Expand All @@ -380,6 +384,10 @@ impl Debug for StorageOssConfig {
}
}

/// Config for the Moka backend.
///
/// NOTE(review): Moka is OpenDAL's in-memory cache service (enabled via the
/// `services-moka` feature), not an object storage service — the original
/// comment appears to be a copy-paste from the OSS config. No options yet.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct StorageMokaConfig {}

static SHARE_TABLE_CONFIG: OnceCell<Singleton<ShareTableConfig>> = OnceCell::new();

#[derive(Clone)]
Expand Down
1 change: 1 addition & 0 deletions src/common/storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub use config::STORAGE_S3_DEFAULT_ENDPOINT;

mod operator;
pub use operator::init_operator;
pub use operator::CacheOperator;
pub use operator::StorageOperator;

mod location;
Expand Down
77 changes: 64 additions & 13 deletions src/common/storage/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use opendal::services::ftp;
use opendal::services::gcs;
use opendal::services::http;
use opendal::services::memory;
use opendal::services::moka;
use opendal::services::obs;
use opendal::services::oss;
use opendal::services::s3;
Expand All @@ -45,6 +46,7 @@ use super::StorageParams;
use super::StorageS3Config;
use crate::config::StorageGcsConfig;
use crate::config::StorageHttpConfig;
use crate::config::StorageMokaConfig;
use crate::config::StorageObsConfig;
use crate::StorageConfig;
use crate::StorageOssConfig;
Expand All @@ -61,14 +63,15 @@ pub fn init_operator(cfg: &StorageParams) -> Result<Operator> {
StorageParams::Http(cfg) => init_http_operator(cfg)?,
StorageParams::Ipfs(cfg) => init_ipfs_operator(cfg)?,
StorageParams::Memory => init_memory_operator()?,
StorageParams::Moka(cfg) => init_moka_operator(cfg)?,
StorageParams::Obs(cfg) => init_obs_operator(cfg)?,
StorageParams::S3(cfg) => init_s3_operator(cfg)?,
StorageParams::Oss(cfg) => init_oss_operator(cfg)?,
};

let op = op
// Add retry
.layer(RetryLayer::new(ExponentialBackoff::default()))
.layer(RetryLayer::new(ExponentialBackoff::default().with_jitter()))
// Add metrics
.layer(MetricsLayer)
// Add logging
Expand Down Expand Up @@ -253,6 +256,29 @@ fn init_obs_operator(cfg: &StorageObsConfig) -> Result<Operator> {
Ok(Operator::new(builder.build()?))
}

/// init_oss_operator will init an opendal OSS operator with input oss config.
///
/// All connection parameters (endpoint, credentials, bucket, root) are taken
/// from the supplied `StorageOssConfig`.
fn init_oss_operator(cfg: &StorageOssConfig) -> Result<Operator> {
    let mut builder = oss::Builder::default();

    // Apply each setting from the config to the builder, one per line.
    builder.endpoint(&cfg.endpoint_url);
    builder.access_key_id(&cfg.access_key_id);
    builder.access_key_secret(&cfg.access_key_secret);
    builder.bucket(&cfg.bucket);
    builder.root(&cfg.root);

    Ok(Operator::new(builder.build()?))
}

/// init_moka_operator will init a moka operator.
///
/// The config currently carries no options, so the builder is used with its
/// defaults.
fn init_moka_operator(_: &StorageMokaConfig) -> Result<Operator> {
    let backend = moka::Builder::default().build()?;

    Ok(Operator::new(backend))
}

#[derive(Clone, Debug)]
pub struct StorageOperator {
operator: Operator,
Expand Down Expand Up @@ -324,18 +350,43 @@ impl StorageOperator {
}
}

/// init_oss_operator will init an opendal OSS operator with input oss config.
fn init_oss_operator(cfg: &StorageOssConfig) -> Result<Operator> {
let mut builder = oss::Builder::default();
/// The operator for cache.
#[derive(Clone, Debug)]
pub struct CacheOperator {
op: Operator,
}

// endpoint
let backend = builder
.endpoint(&cfg.endpoint_url)
.access_key_id(&cfg.access_key_id)
.access_key_secret(&cfg.access_key_secret)
.bucket(&cfg.bucket)
.root(&cfg.root)
.build()?;
impl Deref for CacheOperator {
type Target = Operator;

Ok(Operator::new(backend))
fn deref(&self) -> &Self::Target {
&self.op
}
}

static CACHE_OPERATOR: OnceCell<Singleton<CacheOperator>> = OnceCell::new();

impl CacheOperator {
    /// Initialize the process-wide cache operator singleton from the storage
    /// config, making it available via [`CacheOperator::instance`].
    pub async fn init(
        conf: &StorageConfig,
        v: Singleton<CacheOperator>,
    ) -> common_exception::Result<()> {
        v.init(Self::try_create(conf).await?)?;

        // `set` errors if the cell was already filled; ignore so repeated
        // init calls are a no-op rather than a failure.
        CACHE_OPERATOR.set(v).ok();
        Ok(())
    }

    /// Build a `CacheOperator` from the given storage config without
    /// registering it as the global singleton.
    pub async fn try_create(conf: &StorageConfig) -> common_exception::Result<CacheOperator> {
        let op = init_operator(&conf.params)?;

        Ok(CacheOperator { op })
    }

    /// Fetch the global cache operator.
    ///
    /// # Panics
    ///
    /// Panics if [`CacheOperator::init`] has not been called yet.
    pub fn instance() -> CacheOperator {
        match CACHE_OPERATOR.get() {
            // BUGFIX: message previously said "StorageOperator is not init",
            // a copy-paste from StorageOperator that misidentified which
            // singleton was missing.
            None => panic!("CacheOperator is not init"),
            Some(op) => op.get(),
        }
    }
}
Loading

0 comments on commit 26e9207

Please sign in to comment.