From 53de2dcd48e839f425c59849c4c896ac78149074 Mon Sep 17 00:00:00 2001 From: Joel Dice Date: Thu, 16 Mar 2023 10:52:29 -0600 Subject: [PATCH 1/3] make pushes concurrent and limit pull concurrency Previously, pushes were serialized and pulls were unboundedly concurrent. This makes pushes concurrent (capped at 16 concurrent uploads) and also caps download concurrency at 16. Signed-off-by: Joel Dice --- src/client.rs | 86 ++++++++++++++++++++++++++++++--------------------- 1 file changed, 51 insertions(+), 35 deletions(-) diff --git a/src/client.rs b/src/client.rs index 5482ebf3..20213c55 100644 --- a/src/client.rs +++ b/src/client.rs @@ -18,9 +18,8 @@ use crate::Reference; use crate::errors::{OciDistributionError, Result}; use crate::token_cache::{RegistryOperation, RegistryToken, RegistryTokenType, TokenCache}; -use futures::stream::TryStreamExt; use futures_util::future; -use futures_util::stream::StreamExt; +use futures_util::stream::{self, StreamExt, TryStreamExt}; use http::HeaderValue; use http_auth::{parser::ChallengeParser, ChallengeRef}; use olpc_cjson::CanonicalFormatter; @@ -43,6 +42,8 @@ const MIME_TYPES_DISTRIBUTION_MANIFEST: &[&str] = &[ const PUSH_CHUNK_MAX_SIZE: usize = 4096 * 1024; +const MAX_PARALLEL_PUSH_AND_PULL: usize = 16; + /// The data for an image or module. #[derive(Clone)] pub struct ImageData { @@ -276,24 +277,26 @@ impl Client { self.validate_layers(&manifest, accepted_media_types) .await?; - let layers = manifest.layers.iter().map(|layer| { - // This avoids moving `self` which is &mut Self - // into the async block. We only want to capture - // as &Self - let this = &self; - async move { - let mut out: Vec = Vec::new(); - debug!("Pulling image layer"); - this.pull_blob(image, &layer.digest, &mut out).await?; - Ok::<_, OciDistributionError>(ImageLayer::new( - out, - layer.media_type.clone(), - layer.annotations.clone(), - )) - } - }); - - let layers = future::try_join_all(layers).await?; + let layers = stream::iter(&manifest.layers) + .map(|layer| { + // This avoids moving `self` which is &mut Self + // into the async block. We only want to capture + // as &Self + let this = &self; + async move { + let mut out: Vec = Vec::new(); + debug!("Pulling image layer"); + this.pull_blob(image, &layer.digest, &mut out).await?; + Ok::<_, OciDistributionError>(ImageLayer::new( + out, + layer.media_type.clone(), + layer.annotations.clone(), + )) + } + }) + .buffer_unordered(MAX_PARALLEL_PUSH_AND_PULL) + .try_collect() + .await?; Ok(ImageData { layers, @@ -332,22 +335,35 @@ impl Client { }; // Upload layers - for layer in layers { - let digest = layer.sha256_digest(); - match self - .push_blob_chunked(image_ref, &layer.data, &digest) - .await - { - Err(OciDistributionError::SpecViolationError(violation)) => { - warn!(?violation, "Registry is not respecting the OCI Distribution Specification when doing chunked push operations"); - warn!("Attempting monolithic push"); - self.push_blob_monolithically(image_ref, &layer.data, &digest) - .await?; + stream::iter(layers) + .map(|layer| { + let this = &self; + async move { + let digest = layer.sha256_digest(); + match this + .push_blob_chunked(image_ref, &layer.data, &digest) + .await + { + Err(OciDistributionError::SpecViolationError(violation)) => { + warn!( + ?violation, + "Registry is not respecting the OCI Distribution \ + Specification when doing chunked push operations" + ); + warn!("Attempting monolithic push"); + this.push_blob_monolithically(image_ref, &layer.data, &digest) + .await?; + } + Err(e) => return Err(e), + _ => {} + }; + + Ok(()) } - Err(e) => return Err(e), - _ => {} - }; - } + }) + .buffer_unordered(MAX_PARALLEL_PUSH_AND_PULL) + .try_for_each(future::ok) + .await?; let config_url = match self .push_blob_chunked(image_ref, &config.data, &manifest.config.digest) From eacc1e959e1b3512f1046f980e78af589e0313ca Mon Sep 17 00:00:00 2001 From: Joel Dice Date: Fri, 24 Mar 2023 14:00:01 -0600 Subject: [PATCH 2/3] optionally override default concurrency limits This introduces `pull_with_max_concurrency` and `push_with_max_concurrency`, allowing the application to specify download and upload concurrency limits, respectively. Signed-off-by: Joel Dice --- src/client.rs | 45 ++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 42 insertions(+), 3 deletions(-) diff --git a/src/client.rs b/src/client.rs index 20213c55..cc82de18 100644 --- a/src/client.rs +++ b/src/client.rs @@ -42,7 +42,8 @@ const MIME_TYPES_DISTRIBUTION_MANIFEST: &[&str] = &[ const PUSH_CHUNK_MAX_SIZE: usize = 4096 * 1024; -const MAX_PARALLEL_PUSH_AND_PULL: usize = 16; +const DEFAULT_MAX_CONCURRENT_UPLOAD: usize = 16; +const DEFAULT_MAX_CONCURRENT_DOWNLOAD: usize = 16; /// The data for an image or module. #[derive(Clone)] @@ -265,6 +266,23 @@ impl Client { image: &Reference, auth: &RegistryAuth, accepted_media_types: Vec<&str>, + ) -> Result { + self.pull_with_max_concurrency( + image, + auth, + accepted_media_types, + DEFAULT_MAX_CONCURRENT_DOWNLOAD, + ) + .await + } + + /// Like `pull`, but with a specified concurrent layer download limit. + pub async fn pull_with_max_concurrency( + &mut self, + image: &Reference, + auth: &RegistryAuth, + accepted_media_types: Vec<&str>, + max_concurrent_download: usize, ) -> Result { debug!("Pulling image: {:?}", image); let op = RegistryOperation::Pull; @@ -294,7 +312,7 @@ impl Client { )) } }) - .buffer_unordered(MAX_PARALLEL_PUSH_AND_PULL) + .buffer_unordered(max_concurrent_download) .try_collect() .await?; @@ -322,6 +340,27 @@ impl Client { config: Config, auth: &RegistryAuth, manifest: Option, + ) -> Result { + self.push_with_max_concurrency( + image_ref, + layers, + config, + auth, + manifest, + DEFAULT_MAX_CONCURRENT_UPLOAD, + ) + .await + } + + /// Like `push`, but with a specified concurrent layer upload limit. + pub async fn push_with_max_concurrency( + &mut self, + image_ref: &Reference, + layers: &[ImageLayer], + config: Config, + auth: &RegistryAuth, + manifest: Option, + max_concurrent_upload: usize, ) -> Result { debug!("Pushing image: {:?}", image_ref); let op = RegistryOperation::Push; @@ -361,7 +400,7 @@ impl Client { Ok(()) } }) - .buffer_unordered(MAX_PARALLEL_PUSH_AND_PULL) + .buffer_unordered(max_concurrent_upload) .try_for_each(future::ok) .await?; From 948922fa3e5f07cb26faba0be30f9c1e45816fa9 Mon Sep 17 00:00:00 2001 From: Joel Dice Date: Mon, 27 Mar 2023 08:17:45 -0600 Subject: [PATCH 3/3] add concurrency limit fields to `ClientConfig` ...and use them to determine maximum upload and download concurrency for pushes and pulls, respectively. I've also bumped the version up to 0.10.0 since this is a breaking API change (due to new fields being added to a public struct composed of all-public fields). Signed-off-by: Joel Dice --- Cargo.toml | 2 +- src/client.rs | 63 +++++++++++++++++---------------------------------- 2 files changed, 22 insertions(+), 43 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index da84389d..a17f4423 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,7 @@ license = "Apache-2.0" name = "oci-distribution" readme = "README.md" repository = "https://github.com/krustlet/oci-distribution" -version = "0.9.4" +version = "0.10.0" [badges] maintenance = {status = "actively-developed"} diff --git a/src/client.rs b/src/client.rs index cc82de18..713e247d 100644 --- a/src/client.rs +++ b/src/client.rs @@ -42,8 +42,11 @@ const MIME_TYPES_DISTRIBUTION_MANIFEST: &[&str] = &[ const PUSH_CHUNK_MAX_SIZE: usize = 4096 * 1024; -const DEFAULT_MAX_CONCURRENT_UPLOAD: usize = 16; -const DEFAULT_MAX_CONCURRENT_DOWNLOAD: usize = 16; +/// Default value for `ClientConfig::max_concurrent_upload` +pub const DEFAULT_MAX_CONCURRENT_UPLOAD: usize = 16; + +/// Default value for `ClientConfig::max_concurrent_download` +pub const DEFAULT_MAX_CONCURRENT_DOWNLOAD: usize = 16; /// The data for an image or module. #[derive(Clone)] @@ -266,23 +269,6 @@ impl Client { image: &Reference, auth: &RegistryAuth, accepted_media_types: Vec<&str>, - ) -> Result { - self.pull_with_max_concurrency( - image, - auth, - accepted_media_types, - DEFAULT_MAX_CONCURRENT_DOWNLOAD, - ) - .await - } - - /// Like `pull`, but with a specified concurrent layer download limit. - pub async fn pull_with_max_concurrency( - &mut self, - image: &Reference, - auth: &RegistryAuth, - accepted_media_types: Vec<&str>, - max_concurrent_download: usize, ) -> Result { debug!("Pulling image: {:?}", image); let op = RegistryOperation::Pull; @@ -312,7 +298,7 @@ impl Client { )) } }) - .buffer_unordered(max_concurrent_download) + .buffer_unordered(self.config.max_concurrent_download) .try_collect() .await?; @@ -340,27 +326,6 @@ impl Client { config: Config, auth: &RegistryAuth, manifest: Option, - ) -> Result { - self.push_with_max_concurrency( - image_ref, - layers, - config, - auth, - manifest, - DEFAULT_MAX_CONCURRENT_UPLOAD, - ) - .await - } - - /// Like `push`, but with a specified concurrent layer upload limit. - pub async fn push_with_max_concurrency( - &mut self, - image_ref: &Reference, - layers: &[ImageLayer], - config: Config, - auth: &RegistryAuth, - manifest: Option, - max_concurrent_upload: usize, ) -> Result { debug!("Pushing image: {:?}", image_ref); let op = RegistryOperation::Push; @@ -400,7 +365,7 @@ impl Client { Ok(()) } }) - .buffer_unordered(max_concurrent_upload) + .buffer_unordered(self.config.max_concurrent_upload) .try_for_each(future::ok) .await?; @@ -1349,6 +1314,18 @@ pub struct ClientConfig { /// If set to None, an error is raised if an Image Index manifest is received /// during an image pull. pub platform_resolver: Option>, + + /// Maximum number of concurrent uploads to perform during a `push` + /// operation. + /// + /// This defaults to [`DEFAULT_MAX_CONCURRENT_UPLOAD`]. + pub max_concurrent_upload: usize, + + /// Maximum number of concurrent downloads to perform during a `pull` + /// operation. + /// + /// This defaults to [`DEFAULT_MAX_CONCURRENT_DOWNLOAD`]. + pub max_concurrent_download: usize, } impl Default for ClientConfig { @@ -1360,6 +1337,8 @@ impl Default for ClientConfig { accept_invalid_certificates: false, extra_root_certificates: Vec::new(), platform_resolver: Some(Box::new(current_platform_resolver)), + max_concurrent_upload: DEFAULT_MAX_CONCURRENT_UPLOAD, + max_concurrent_download: DEFAULT_MAX_CONCURRENT_DOWNLOAD, } } }