Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make pushes concurrent and limit pull concurrency #72

Merged
merged 3 commits into from
Apr 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ license = "Apache-2.0"
name = "oci-distribution"
readme = "README.md"
repository = "https://github.com/krustlet/oci-distribution"
version = "0.9.4"
version = "0.10.0"

[badges]
maintenance = {status = "actively-developed"}
Expand Down
104 changes: 69 additions & 35 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ use crate::Reference;

use crate::errors::{OciDistributionError, Result};
use crate::token_cache::{RegistryOperation, RegistryToken, RegistryTokenType, TokenCache};
use futures::stream::TryStreamExt;
use futures_util::future;
use futures_util::stream::StreamExt;
use futures_util::stream::{self, StreamExt, TryStreamExt};
use http::HeaderValue;
use http_auth::{parser::ChallengeParser, ChallengeRef};
use olpc_cjson::CanonicalFormatter;
Expand All @@ -43,6 +42,12 @@ const MIME_TYPES_DISTRIBUTION_MANIFEST: &[&str] = &[

const PUSH_CHUNK_MAX_SIZE: usize = 4096 * 1024;

/// Default value for `ClientConfig::max_concurrent_upload`
pub const DEFAULT_MAX_CONCURRENT_UPLOAD: usize = 16;

/// Default value for `ClientConfig::max_concurrent_download`
pub const DEFAULT_MAX_CONCURRENT_DOWNLOAD: usize = 16;

/// The data for an image or module.
#[derive(Clone)]
pub struct ImageData {
Expand Down Expand Up @@ -276,24 +281,26 @@ impl Client {
self.validate_layers(&manifest, accepted_media_types)
.await?;

let layers = manifest.layers.iter().map(|layer| {
// This avoids moving `self` which is &mut Self
// into the async block. We only want to capture
// as &Self
let this = &self;
async move {
let mut out: Vec<u8> = Vec::new();
debug!("Pulling image layer");
this.pull_blob(image, &layer.digest, &mut out).await?;
Ok::<_, OciDistributionError>(ImageLayer::new(
out,
layer.media_type.clone(),
layer.annotations.clone(),
))
}
});

let layers = future::try_join_all(layers).await?;
let layers = stream::iter(&manifest.layers)
.map(|layer| {
// This avoids moving `self` which is &mut Self
// into the async block. We only want to capture
// as &Self
let this = &self;
async move {
let mut out: Vec<u8> = Vec::new();
debug!("Pulling image layer");
this.pull_blob(image, &layer.digest, &mut out).await?;
Ok::<_, OciDistributionError>(ImageLayer::new(
out,
layer.media_type.clone(),
layer.annotations.clone(),
))
}
})
.buffer_unordered(self.config.max_concurrent_download)
.try_collect()
.await?;

Ok(ImageData {
layers,
Expand Down Expand Up @@ -332,22 +339,35 @@ impl Client {
};

// Upload layers
for layer in layers {
let digest = layer.sha256_digest();
match self
.push_blob_chunked(image_ref, &layer.data, &digest)
.await
{
Err(OciDistributionError::SpecViolationError(violation)) => {
warn!(?violation, "Registry is not respecting the OCI Distribution Specification when doing chunked push operations");
warn!("Attempting monolithic push");
self.push_blob_monolithically(image_ref, &layer.data, &digest)
.await?;
stream::iter(layers)
.map(|layer| {
let this = &self;
async move {
let digest = layer.sha256_digest();
match this
.push_blob_chunked(image_ref, &layer.data, &digest)
.await
{
Err(OciDistributionError::SpecViolationError(violation)) => {
warn!(
?violation,
"Registry is not respecting the OCI Distribution \
Specification when doing chunked push operations"
);
warn!("Attempting monolithic push");
this.push_blob_monolithically(image_ref, &layer.data, &digest)
.await?;
}
Err(e) => return Err(e),
_ => {}
};

Ok(())
}
Err(e) => return Err(e),
_ => {}
};
}
})
.buffer_unordered(self.config.max_concurrent_upload)
.try_for_each(future::ok)
.await?;

let config_url = match self
.push_blob_chunked(image_ref, &config.data, &manifest.config.digest)
Expand Down Expand Up @@ -1294,6 +1314,18 @@ pub struct ClientConfig {
/// If set to None, an error is raised if an Image Index manifest is received
/// during an image pull.
pub platform_resolver: Option<Box<PlatformResolverFn>>,

/// Maximum number of concurrent uploads to perform during a `push`
/// operation.
///
/// This defaults to [`DEFAULT_MAX_CONCURRENT_UPLOAD`].
pub max_concurrent_upload: usize,

/// Maximum number of concurrent downloads to perform during a `pull`
/// operation.
///
/// This defaults to [`DEFAULT_MAX_CONCURRENT_DOWNLOAD`].
pub max_concurrent_download: usize,
}

impl Default for ClientConfig {
Expand All @@ -1305,6 +1337,8 @@ impl Default for ClientConfig {
accept_invalid_certificates: false,
extra_root_certificates: Vec::new(),
platform_resolver: Some(Box::new(current_platform_resolver)),
max_concurrent_upload: DEFAULT_MAX_CONCURRENT_UPLOAD,
max_concurrent_download: DEFAULT_MAX_CONCURRENT_DOWNLOAD,
}
}
}
Expand Down