diff --git a/Cargo.toml b/Cargo.toml index a17f4423..5bb9a800 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,8 +35,8 @@ trust-dns = ["reqwest/trust-dns"] test-registry = [] [dependencies] +bytes = "1" chrono = { version = "0.4.23", features = ["serde"] } -futures = "0.3" futures-util = "0.3" http = "0.2" http-auth = { version = "0.1", default_features = false } @@ -50,7 +50,6 @@ serde = { version = "1.0", features = ["derive"] } sha2 = "0.10" thiserror = "1.0" tokio = { version = "1.21", features = ["macros", "io-util"] } -tokio-util = { version = "0.7.4", features = ["compat"] } tracing = { version = "0.1", features = ['log'] } unicase = "2.6" @@ -66,3 +65,4 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] } tempfile = "3.3" testcontainers = "0.14" tokio = { version = "1.21", features = ["macros", "fs", "rt-multi-thread"] } +tokio-util = { version = "0.7.4", features = ["compat"] } diff --git a/src/client.rs b/src/client.rs index 713e247d..e5742ca5 100644 --- a/src/client.rs +++ b/src/client.rs @@ -20,6 +20,7 @@ use crate::errors::{OciDistributionError, Result}; use crate::token_cache::{RegistryOperation, RegistryToken, RegistryTokenType, TokenCache}; use futures_util::future; use futures_util::stream::{self, StreamExt, TryStreamExt}; +use futures_util::Stream; use http::HeaderValue; use http_auth::{parser::ChallengeParser, ChallengeRef}; use olpc_cjson::CanonicalFormatter; @@ -29,8 +30,7 @@ use serde::Serialize; use sha2::Digest; use std::collections::HashMap; use std::convert::TryFrom; -use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; -use tokio_util::compat::FuturesAsyncReadCompatExt; +use tokio::io::{AsyncWrite, AsyncWriteExt}; use tracing::{debug, trace, warn}; const MIME_TYPES_DISTRIBUTION_MANIFEST: &[&str] = &[ @@ -820,12 +820,12 @@ impl Client { /// Stream a single layer from an OCI registry. /// /// This is a streaming version of [`Client::pull_blob`]. - /// Returns [`AsyncRead`](tokio::io::AsyncRead). - pub async fn async_pull_blob( + /// Returns [`Stream`](futures_util::Stream). + pub async fn pull_blob_stream( &self, image: &Reference, digest: &str, - ) -> Result { + ) -> Result>> { let url = self.to_v2_blob_url(image.resolve_registry(), image.repository(), digest); let stream = RequestBuilderWrapper::from_client(self, |client| client.get(&url)) .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)? @@ -836,7 +836,7 @@ impl Client { .bytes_stream() .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)); - Ok(FuturesAsyncReadCompatExt::compat(stream.into_async_read())) + Ok(stream) } /// Begins a session to push an image to registry in a monolithical way @@ -1533,6 +1533,8 @@ mod test { use std::path; use std::result::Result; use tempfile::TempDir; + use tokio::io::AsyncReadExt; + use tokio_util::io::StreamReader; #[cfg(feature = "test-registry")] use testcontainers::{ @@ -2065,7 +2067,7 @@ mod test { } #[tokio::test] - async fn test_async_pull_blob() { + async fn test_pull_blob_stream() { let mut c = Client::default(); for &image in TEST_IMAGES { @@ -2086,11 +2088,12 @@ mod test { let mut file: Vec = Vec::new(); let layer0 = &manifest.layers[0]; - let mut async_reader = c - .async_pull_blob(&reference, &layer0.digest) + let layer_stream = c + .pull_blob_stream(&reference, &layer0.digest) .await - .expect("failed to pull blob with async read"); - tokio::io::AsyncReadExt::read_to_end(&mut async_reader, &mut file) + .expect("failed to pull blob stream"); + + AsyncReadExt::read_to_end(&mut StreamReader::new(layer_stream), &mut file) .await .unwrap();