Skip to content

Commit

Permalink
client: Return Stream for async_pull_blob API
Browse files Browse the repository at this point in the history
Return Stream and allow the user to decide how to use it.

Fixes: #58

Signed-off-by: Wang, Arron <arron.wang@intel.com>
  • Loading branch information
arronwy committed May 21, 2023
1 parent 6498685 commit abff18a
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 13 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ trust-dns = ["reqwest/trust-dns"]
test-registry = []

[dependencies]
bytes = "1"
chrono = { version = "0.4.23", features = ["serde"] }
futures = "0.3"
futures-util = "0.3"
http = "0.2"
http-auth = { version = "0.1", default_features = false }
Expand All @@ -50,7 +50,6 @@ serde = { version = "1.0", features = ["derive"] }
sha2 = "0.10"
thiserror = "1.0"
tokio = { version = "1.21", features = ["macros", "io-util"] }
tokio-util = { version = "0.7.4", features = ["compat"] }
tracing = { version = "0.1", features = ['log'] }
unicase = "2.6"

Expand All @@ -66,3 +65,4 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tempfile = "3.3"
testcontainers = "0.14"
tokio = { version = "1.21", features = ["macros", "fs", "rt-multi-thread"] }
tokio-util = { version = "0.7.4", features = ["compat"] }
25 changes: 14 additions & 11 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::errors::{OciDistributionError, Result};
use crate::token_cache::{RegistryOperation, RegistryToken, RegistryTokenType, TokenCache};
use futures_util::future;
use futures_util::stream::{self, StreamExt, TryStreamExt};
use futures_util::Stream;
use http::HeaderValue;
use http_auth::{parser::ChallengeParser, ChallengeRef};
use olpc_cjson::CanonicalFormatter;
Expand All @@ -29,8 +30,7 @@ use serde::Serialize;
use sha2::Digest;
use std::collections::HashMap;
use std::convert::TryFrom;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tracing::{debug, trace, warn};

const MIME_TYPES_DISTRIBUTION_MANIFEST: &[&str] = &[
Expand Down Expand Up @@ -820,12 +820,12 @@ impl Client {
/// Stream a single layer from an OCI registry.
///
/// This is a streaming version of [`Client::pull_blob`].
/// Returns [`AsyncRead`](tokio::io::AsyncRead).
pub async fn async_pull_blob(
/// Returns [`Stream`](futures_util::Stream).
pub async fn pull_blob_stream(
&self,
image: &Reference,
digest: &str,
) -> Result<impl AsyncRead + Unpin> {
) -> Result<impl Stream<Item = std::result::Result<bytes::Bytes, std::io::Error>>> {
let url = self.to_v2_blob_url(image.resolve_registry(), image.repository(), digest);
let stream = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
.apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
Expand All @@ -836,7 +836,7 @@ impl Client {
.bytes_stream()
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));

Ok(FuturesAsyncReadCompatExt::compat(stream.into_async_read()))
Ok(stream)
}

/// Begins a session to push an image to registry in a monolithical way
Expand Down Expand Up @@ -1533,6 +1533,8 @@ mod test {
use std::path;
use std::result::Result;
use tempfile::TempDir;
use tokio::io::AsyncReadExt;
use tokio_util::io::StreamReader;

#[cfg(feature = "test-registry")]
use testcontainers::{
Expand Down Expand Up @@ -2065,7 +2067,7 @@ mod test {
}

#[tokio::test]
async fn test_async_pull_blob() {
async fn test_pull_blob_stream() {
let mut c = Client::default();

for &image in TEST_IMAGES {
Expand All @@ -2086,11 +2088,12 @@ mod test {
let mut file: Vec<u8> = Vec::new();
let layer0 = &manifest.layers[0];

let mut async_reader = c
.async_pull_blob(&reference, &layer0.digest)
let layer_stream = c
.pull_blob_stream(&reference, &layer0.digest)
.await
.expect("failed to pull blob with async read");
tokio::io::AsyncReadExt::read_to_end(&mut async_reader, &mut file)
.expect("failed to pull blob stream");

AsyncReadExt::read_to_end(&mut StreamReader::new(layer_stream), &mut file)
.await
.unwrap();

Expand Down

0 comments on commit abff18a

Please sign in to comment.