Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make pushes concurrent and limit pull concurrency #72

Merged
merged 3 commits into from
Apr 18, 2023
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 90 additions & 35 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ use crate::Reference;

use crate::errors::{OciDistributionError, Result};
use crate::token_cache::{RegistryOperation, RegistryToken, RegistryTokenType, TokenCache};
use futures::stream::TryStreamExt;
use futures_util::future;
use futures_util::stream::StreamExt;
use futures_util::stream::{self, StreamExt, TryStreamExt};
use http::HeaderValue;
use http_auth::{parser::ChallengeParser, ChallengeRef};
use olpc_cjson::CanonicalFormatter;
Expand All @@ -43,6 +42,9 @@ const MIME_TYPES_DISTRIBUTION_MANIFEST: &[&str] = &[

const PUSH_CHUNK_MAX_SIZE: usize = 4096 * 1024;

const DEFAULT_MAX_CONCURRENT_UPLOAD: usize = 16;
const DEFAULT_MAX_CONCURRENT_DOWNLOAD: usize = 16;

/// The data for an image or module.
#[derive(Clone)]
pub struct ImageData {
Expand Down Expand Up @@ -264,6 +266,23 @@ impl Client {
image: &Reference,
auth: &RegistryAuth,
accepted_media_types: Vec<&str>,
) -> Result<ImageData> {
self.pull_with_max_concurrency(
image,
auth,
accepted_media_types,
DEFAULT_MAX_CONCURRENT_DOWNLOAD,
)
.await
}

/// Like `pull`, but with a specified concurrent layer download limit.
pub async fn pull_with_max_concurrency(
&mut self,
image: &Reference,
auth: &RegistryAuth,
accepted_media_types: Vec<&str>,
max_concurrent_download: usize,
) -> Result<ImageData> {
debug!("Pulling image: {:?}", image);
let op = RegistryOperation::Pull;
Expand All @@ -276,24 +295,26 @@ impl Client {
self.validate_layers(&manifest, accepted_media_types)
.await?;

let layers = manifest.layers.iter().map(|layer| {
// This avoids moving `self` which is &mut Self
// into the async block. We only want to capture
// as &Self
let this = &self;
async move {
let mut out: Vec<u8> = Vec::new();
debug!("Pulling image layer");
this.pull_blob(image, &layer.digest, &mut out).await?;
Ok::<_, OciDistributionError>(ImageLayer::new(
out,
layer.media_type.clone(),
layer.annotations.clone(),
))
}
});

let layers = future::try_join_all(layers).await?;
let layers = stream::iter(&manifest.layers)
.map(|layer| {
// This avoids moving `self` which is &mut Self
// into the async block. We only want to capture
// as &Self
let this = &self;
async move {
let mut out: Vec<u8> = Vec::new();
debug!("Pulling image layer");
this.pull_blob(image, &layer.digest, &mut out).await?;
Ok::<_, OciDistributionError>(ImageLayer::new(
out,
layer.media_type.clone(),
layer.annotations.clone(),
))
}
})
.buffer_unordered(max_concurrent_download)
.try_collect()
.await?;

Ok(ImageData {
layers,
Expand All @@ -319,6 +340,27 @@ impl Client {
config: Config,
auth: &RegistryAuth,
manifest: Option<OciImageManifest>,
) -> Result<PushResponse> {
self.push_with_max_concurrency(
image_ref,
layers,
config,
auth,
manifest,
DEFAULT_MAX_CONCURRENT_UPLOAD,
)
.await
}

/// Like `push`, but with a specified concurrent layer upload limit.
pub async fn push_with_max_concurrency(
&mut self,
image_ref: &Reference,
layers: &[ImageLayer],
config: Config,
auth: &RegistryAuth,
manifest: Option<OciImageManifest>,
max_concurrent_upload: usize,
) -> Result<PushResponse> {
debug!("Pushing image: {:?}", image_ref);
let op = RegistryOperation::Push;
Expand All @@ -332,22 +374,35 @@ impl Client {
};

// Upload layers
for layer in layers {
let digest = layer.sha256_digest();
match self
.push_blob_chunked(image_ref, &layer.data, &digest)
.await
{
Err(OciDistributionError::SpecViolationError(violation)) => {
warn!(?violation, "Registry is not respecting the OCI Distribution Specification when doing chunked push operations");
warn!("Attempting monolithic push");
self.push_blob_monolithically(image_ref, &layer.data, &digest)
.await?;
stream::iter(layers)
.map(|layer| {
let this = &self;
async move {
let digest = layer.sha256_digest();
match this
.push_blob_chunked(image_ref, &layer.data, &digest)
.await
{
Err(OciDistributionError::SpecViolationError(violation)) => {
warn!(
?violation,
"Registry is not respecting the OCI Distribution \
Specification when doing chunked push operations"
);
warn!("Attempting monolithic push");
this.push_blob_monolithically(image_ref, &layer.data, &digest)
.await?;
}
Err(e) => return Err(e),
_ => {}
};

Ok(())
}
Err(e) => return Err(e),
_ => {}
};
}
})
.buffer_unordered(max_concurrent_upload)
.try_for_each(future::ok)
.await?;

let config_url = match self
.push_blob_chunked(image_ref, &config.data, &manifest.config.digest)
Expand Down