Skip to content

Commit

Permalink
Move retry_forever_download_to_vec into GenericRemoteStorage
Browse files Browse the repository at this point in the history
  • Loading branch information
arpad-m committed Oct 21, 2024
1 parent 4063340 commit d4fec5f
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 32 deletions.
51 changes: 50 additions & 1 deletion libs/remote_storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ mod support;
use std::{
collections::HashMap,
fmt::Debug,
future::Future,
num::NonZeroU32,
ops::Bound,
pin::{pin, Pin},
Expand All @@ -36,7 +37,7 @@ use futures::{stream::Stream, StreamExt};
use itertools::Itertools as _;
use serde::{Deserialize, Serialize};
use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;
use tokio_util::{io::StreamReader, sync::CancellationToken};
use tracing::info;

pub use self::{
Expand Down Expand Up @@ -630,6 +631,32 @@ impl GenericRemoteStorage {
})
}

/// Download the specified object into a memory buffer with infinite retries
pub async fn retry_forever_download_to_vec<const WARN_THRESHOLD: u32>(
&self,
remote_path: &RemotePath,
download_opts: &DownloadOpts,
cancel: &CancellationToken,
) -> Result<(Vec<u8>, SystemTime), DownloadError> {
retry_forever::<_, _, _, WARN_THRESHOLD>(
|| async {
let download = self.download(remote_path, download_opts, cancel).await?;

let mut bytes = Vec::new();

let stream = download.download_stream;
let mut stream = StreamReader::new(stream);

tokio::io::copy_buf(&mut stream, &mut bytes).await?;

Ok((bytes, download.last_modified))
},
&format!("download {remote_path:?}"),
cancel,
)
.await
}

/// The name of the bucket/container/etc.
pub fn bucket_name(&self) -> Option<&str> {
match self {
Expand All @@ -641,6 +668,28 @@ impl GenericRemoteStorage {
}
}

async fn retry_forever<T, O, F, const WARN_THRESHOLD: u32>(
op: O,
description: &str,
cancel: &CancellationToken,
) -> Result<T, DownloadError>
where
O: FnMut() -> F,
F: Future<Output = Result<T, DownloadError>>,
{
utils::backoff::retry(
op,
DownloadError::is_permanent,
WARN_THRESHOLD,
u32::MAX,
description,
cancel,
)
.await
.ok_or_else(|| DownloadError::Cancelled)
.and_then(|x| x)
}

/// Extra set of key-value pairs that contain arbitrary metadata about the storage entry.
/// Immutable, cannot be changed once the file is created.
#[derive(Debug, Clone, PartialEq, Eq)]
Expand Down
42 changes: 11 additions & 31 deletions pageserver/src/tenant/remote_timeline_client/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use camino::{Utf8Path, Utf8PathBuf};
use pageserver_api::shard::TenantShardId;
use tokio::fs::{self, File, OpenOptions};
use tokio::io::{AsyncSeekExt, AsyncWriteExt};
use tokio_util::io::StreamReader;
use tokio_util::sync::CancellationToken;
use tracing::warn;
use utils::backoff;
Expand Down Expand Up @@ -339,32 +338,6 @@ pub async fn list_remote_timelines(
list_identifiers::<TimelineId>(storage, remote_path, cancel).await
}

async fn do_download_remote_path_retry_forever(
storage: &GenericRemoteStorage,
remote_path: &RemotePath,
cancel: &CancellationToken,
) -> Result<(Vec<u8>, SystemTime), DownloadError> {
download_retry_forever(
|| async {
let download = storage
.download(remote_path, &DownloadOpts::default(), cancel)
.await?;

let mut bytes = Vec::new();

let stream = download.download_stream;
let mut stream = StreamReader::new(stream);

tokio::io::copy_buf(&mut stream, &mut bytes).await?;

Ok((bytes, download.last_modified))
},
&format!("download {remote_path:?}"),
cancel,
)
.await
}

pub async fn do_download_tenant_manifest(
storage: &GenericRemoteStorage,
tenant_shard_id: &TenantShardId,
Expand All @@ -374,8 +347,10 @@ pub async fn do_download_tenant_manifest(
let generation = super::TENANT_MANIFEST_GENERATION;
let remote_path = remote_tenant_manifest_path(tenant_shard_id, generation);

let (manifest_bytes, _manifest_bytes_mtime) =
do_download_remote_path_retry_forever(storage, &remote_path, cancel).await?;
let (manifest_bytes, _manifest_bytes_mtime) = storage
.retry_forever_download_to_vec::<FAILED_DOWNLOAD_WARN_THRESHOLD>(&remote_path,
&Default::default(), cancel)
.await?;

let tenant_manifest = TenantManifest::from_json_bytes(&manifest_bytes)
.with_context(|| format!("deserialize tenant manifest file at {remote_path:?}"))
Expand All @@ -393,8 +368,13 @@ async fn do_download_index_part(
) -> Result<(IndexPart, Generation, SystemTime), DownloadError> {
let remote_path = remote_index_path(tenant_shard_id, timeline_id, index_generation);

let (index_part_bytes, index_part_mtime) =
do_download_remote_path_retry_forever(storage, &remote_path, cancel).await?;
let (index_part_bytes, index_part_mtime) = storage
.retry_forever_download_to_vec::<FAILED_DOWNLOAD_WARN_THRESHOLD>(
&remote_path,
&Default::default(),
cancel,
)
.await?;

let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
.with_context(|| format!("deserialize index part file at {remote_path:?}"))
Expand Down

0 comments on commit d4fec5f

Please sign in to comment.