Skip to content

Commit

Permalink
Merge pull request #54 from arronwy/stream
Browse files Browse the repository at this point in the history
Add async_read support for pull_blob API
  • Loading branch information
flavio authored Dec 14, 2022
2 parents 0466847 + 55037f2 commit 804cc54
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 1 deletion.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ trust-dns = ["reqwest/trust-dns"]
test-registry = []

[dependencies]
futures = "0.3"
futures-util = "0.3"
http = "0.2"
http-auth = { version = "0.1", default_features = false }
Expand All @@ -47,6 +48,7 @@ serde = { version = "1.0", features = ["derive"] }
sha2 = "0.10"
thiserror = "1.0"
tokio = { version = "1.21", features = ["macros", "fs"] }
tokio-util = { version = "0.7.4", features = ["compat"] }
tracing = { version = "0.1", features = ['log'] }
unicase = "2.6"

Expand Down
61 changes: 60 additions & 1 deletion src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::Reference;

use crate::errors::{OciDistributionError, Result};
use crate::token_cache::{RegistryOperation, RegistryToken, RegistryTokenType, TokenCache};
use futures::stream::TryStreamExt;
use futures_util::future;
use futures_util::stream::StreamExt;
use http::HeaderValue;
Expand All @@ -28,7 +29,8 @@ use serde::Serialize;
use sha2::Digest;
use std::collections::HashMap;
use std::convert::TryFrom;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tracing::{debug, trace, warn};

const MIME_TYPES_DISTRIBUTION_MANIFEST: &[&str] = &[
Expand Down Expand Up @@ -775,6 +777,28 @@ impl Client {
Ok(())
}

/// Stream a single layer from an OCI registry.
///
/// This is a streaming version of [`Client::pull_blob`].
/// Returns [`AsyncRead`](tokio::io::AsyncRead).
pub async fn async_pull_blob(
&self,
image: &Reference,
digest: &str,
) -> Result<impl AsyncRead + Unpin> {
let url = self.to_v2_blob_url(image.resolve_registry(), image.repository(), digest);
let stream = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
.apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
.apply_auth(image, RegistryOperation::Pull)?
.into_request_builder()
.send()
.await?
.bytes_stream()
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));

Ok(FuturesAsyncReadCompatExt::compat(stream.into_async_read()))
}

/// Begins a session to push an image to registry in a monolithical way
///
/// Returns URL with session UUID
Expand Down Expand Up @@ -1986,6 +2010,41 @@ mod test {
}
}

#[tokio::test]
async fn test_async_pull_blob() {
let mut c = Client::default();

for &image in TEST_IMAGES {
let reference = Reference::try_from(image).expect("failed to parse reference");
c.auth(
&reference,
&RegistryAuth::Anonymous,
RegistryOperation::Pull,
)
.await
.expect("authenticated");
let (manifest, _) = c
._pull_image_manifest(&reference)
.await
.expect("failed to pull manifest");

// Pull one specific layer
let mut file: Vec<u8> = Vec::new();
let layer0 = &manifest.layers[0];

let mut async_reader = c
.async_pull_blob(&reference, &layer0.digest)
.await
.expect("failed to pull blob with async read");
tokio::io::AsyncReadExt::read_to_end(&mut async_reader, &mut file)
.await
.unwrap();

// The manifest says how many bytes we should expect.
assert_eq!(file.len(), layer0.size as usize);
}
}

#[tokio::test]
async fn test_pull() {
for &image in TEST_IMAGES {
Expand Down

0 comments on commit 804cc54

Please sign in to comment.